springboot整合canal
小码农叔叔 人气:15前言
在Mysql到Elasticsearch高效实时同步Debezium实现,分析并实战演示了如何利用debezium完成数据从mysql到es的准实时同步过程。
本篇将基于已经构建好的canal服务,演示在代码中如何利用canal完成一些业务场景的使用
环境准备
- 已经搭建好的canal服务
- 两个不同环境(IP)下的mysql服务
一、快速搭建canal服务
为方便后文的演示和学习,以便看到的同学能体验到完整的操作流程,在正式编写代码之前,先基于centos7环境快速搭建起一个canal服务
搭建步骤
1、服务器使用docker快速安装一个mysql并开启binlog日志
具体可参考:docker开启mysql的binlog日志解决数据卷问题
2、上传canal安装包并解压
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
3、进入到第二步解压后的文件目录,并修改配置文件
进入conf目录,需要的修改的配置文件为:canal.properties
################################################# ######### common argument ############# ################################################# canal.id = 1 canal.ip = canal.port = 11111 canal.metrics.pull.port = 11112 canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ canal.serverMode = tcp # flush meta cursor/parse position to file
说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的
输出 model,默认 tcp,改为输出到 kafka
重点关注上面的:canal.serverMode = tcp 这个配置,默认情况,如果是使用mysql,可以不做修改,如果需要将数据同步到kafka,或者rocketmq,可以分别修改即可,此处暂不做修改
进入example目录,需要的修改的配置文件为:instance.properties
################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=20 #只要和mysql的master的不一样即可 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=127.0.0.1:3306
- canal.instance.mysql.slaveId=20 #只要和mysql的master的不一样即可
- canal.instance.master.address=127.0.0.1:3306 ,监听的mysql的master节点信息
配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
4、启动canal服务
返回到bin目录,直接: startup.sh
二、与springboot整合
搭建好了canal服务,如何在业务中使用呢?其实在很场景下,可以考虑借助canal实现一些诸如数据同步、灾备、同城双活等,比如来考虑这样一种场景,一些比较大的电商页面上,都有商品搜索服务,用户输入商品关键字可以快速检索到商品
基本上搜索服务都是采用了诸如es这样的搜索引擎,思考一下,网站的所有上架的商品数据开始肯定是存放在mysql这样的关系型数据库,但是搜索走mysql的话肯定不可能,所以需要定期或者准实时的将mysql的数据同步到es
在之前的某一篇中,我们可以直接基于canal做配置,将mysql的数据同步到es中
但是考虑到数据并非所有的都同步,比如说要对同步到es的数据进行分类、筛选、过滤等操作,纯粹的配置就很难胜任了
于是,可以考虑在程序中,通过某种机制监听到mysql中的商品上架数据的变化然后触发程序,再通过程序将数据写入到es,实现准实时同步
在上面的这种业务场景下,canal就是一种很好的选择
1、Java中使用canal
导入基本的依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency>
2、编写一个demo
通过客户端,连接canal的信息,可以在程序中监听到mysql的master节点数据变化
下面直接贴出核心代码:
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws Exception{ //1.获取 canal 连接对象 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal所在服务器IP", 11111), "example", "", ""); System.out.println("canal启动并开始监听数据 ...... "); while (true){ canalConnector.connect(); //订阅表 canalConnector.subscribe("shop001.*"); //获取数据 Message message = canalConnector.get(100); //解析message List<CanalEntry.Entry> entries = message.getEntries(); if(entries.size() <=0){ System.out.println("未检测到数据"); Thread.sleep(1000); } for(CanalEntry.Entry entry : entries){ //1、获取表名 String tableName = entry.getHeader().getTableName(); //2、获取类型 CanalEntry.EntryType entryType = entry.getEntryType(); //3、获取序列化后的数据 ByteString storeValue = entry.getStoreValue(); //判断是否rowdata类型数据 if(CanalEntry.EntryType.ROWDATA.equals(entryType)){ //对第三步中的数据进行解析 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //获取当前事件的操作类型 CanalEntry.EventType eventType = rowChange.getEventType(); //获取数据集 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //便利数据 for(CanalEntry.RowData rowData : rowDatasList){ //数据变更之前的内容 JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getAfterColumnsList(); for(CanalEntry.Column column : beforeColumnsList){ beforeData.put(column.getName(),column.getValue()); } //数据变更之后的内容 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); JSONObject afterData = new JSONObject(); for(CanalEntry.Column column : afterColumnsList){ afterData.put(column.getName(),column.getValue()); } System.out.println("Table :" + tableName + ",eventType :" + eventType + ",beforeData :" + beforeData + ",afterData : " + afterData); } }else { System.out.println("当前操作类型为:" + entryType); } } } } }
关于API的使用,可以参考官方的demo示例代码,核心的代码处理步骤大概如下:
- 建立连接
- 订阅指定数据库(或者所有数据库,或某个库下的表)
- 检测到数据变更
- 提取binlog中的元数据,解析变更数据类型,解析元数据中的信息
- 基于变更数据做自身的业务逻辑或其他业务
下面运行上面的代码,这时候我们去数据库中修改一下本次订阅的数据库下的某个表的数据
接下来去 shop001数据库中给 user_info表新增一条数据
执行sql,然后观察控制台日志输出
我们再次修改其中一条数据,很快控制台上输出了数据更改前和修改后的数据信息日志
由此我们得知,基于上面解析出来的信息,可以检测到数据库中某些表的变化情况,从而将变化后的数据做同步或者接入其他的中间件进行消息通知等
3、与springboot整合
接下来,我们仍然以一个具体的业务场景为例
需求描述:将从canal中读取到的数据同步变更到另一个数据库下相同的表中
导入下面依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.14</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-dbutils/commons-dbutils --> <dependency> <groupId>commons-dbutils</groupId> <artifactId>commons-dbutils</artifactId> <version>1.7</version> </dependency> <!--canal客户端连接--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies>
4、application.yml 配置文件
注意,这里连接的数据库地址是目标数据库,即从canal中读取并解析后的数据即将写入的服务器地址
server: port: 8083 logging: config: classpath:logback-spring.xml #日志 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://IP:3306/bank1?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false username: root password: root druid: max-active: 100 initial-size: 10 max-wait: 60000 min-idle: 5
5、核心工具类
其实我们可以直接拿上面的演示代码,在里面做业务逻辑的处理也可以,不过在实际项目中,这样做不便于代码的维护性和可阅读性,因此需要根据功能封装一些方法形成可复用的工具类
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.dbutils.DbUtils; import org.apache.commons.dbutils.QueryRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.net.InetSocketAddress; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @Component public class CanalClient { private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); @Resource private DataSource dataSource; /** * canal入库方法 */ public void handleMessages() { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("canal所在的服务IP", 11111), "example", "", ""); int batchSize = 1000; System.out.println("canal启动并开始监听数据 ...... "); try { connector.connect(); connector.subscribe("shop001.*"); connector.rollback(); try { while (true) { //尝试从master那边拉去数据batchSize条记录,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { System.out.println("未检测到任何数据变化......"); Thread.sleep(2000); } else { dataHandle(message.getEntries()); } connector.ack(batchId); //当队列里面堆积的sql大于一定数值的时候就模拟执行 if (SQL_QUEUE.size() >= 1) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } finally { connector.disconnect(); } } /** * 模拟执行队列里面的sql语句 */ public void executeQueueSql() { int size = SQL_QUEUE.size(); for (int i = 0; i < size; i++) { String sql = SQL_QUEUE.poll(); System.out.println("[sql]----> " + sql); this.execute(sql.toString()); } } /** * 数据处理 * @param entrys */ private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 保存更新语句 * @param entry */ private void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); for (int i = 0; i < newColumnList.size(); i++) { sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'"); if (i != newColumnList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { //暂时只支持单一主键 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存删除语句 * * @param entry */ private void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { //暂时只支持单一主键 sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存插入语句 * * @param entry */ private void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into "+ entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 入库 * @param sql */ public void execute(String sql) { Connection con = null; try { if(null == sql) return; con = dataSource.getConnection(); QueryRunner qr = new QueryRunner(); int row = qr.execute(con, sql); System.out.println("update: "+ row); } catch (SQLException e) { e.printStackTrace(); } finally { DbUtils.closeQuietly(con); } } }
6、提供一个配置类,在程序启动后监听数据变化
@Configuration public class InitConfig implements CommandLineRunner { @Resource private CanalClient canalClient; @Override public void run(String... args) throws Exception { canalClient.handleMessages(); } }
7、启动类
@SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class,args); } }
提前在目标写入的数据库中准备一张相同的表
启动程序后,往canal服务监听的mysql服务数据库的user_info表中插入上面的一条新数据,然后观察控制台输出日志信息,
执行数据插入,这时候控制台检测到了数据变化
同时目标数据表中也新增了一条数据,
通过上面的操作,就完成了预期的需求,当然基于此业务逻辑,还可以衍生出更多的需求场景,比如只监听变化的数据,然后通知下游的其他应用等。
加载全部内容