pom
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
BinlogDto
package com.example.demo.dto;
/**
 * Simple message carrier pairing an event key with the payload that belongs to it.
 *
 * <p>NOTE(review): the package declared above this class is {@code com.example.demo.dto},
 * while {@code BinlogClientRunner} imports {@code com.mxc.binlog.dto.BinlogDto};
 * the two need to be aligned for the runner to compile.
 */
public class BinlogDto {

    /** Key identifying the event this message describes. */
    private String event;

    /** Payload attached to the event; may be any object. */
    private Object value;

    /** Creates an empty message; both fields stay {@code null} until set. */
    public BinlogDto() {
    }

    /**
     * Creates a fully populated message.
     *
     * @param event key identifying the event
     * @param value payload attached to the event
     */
    public BinlogDto(String event, Object value) {
        this.event = event;
        this.value = value;
    }

    /** @return the event key, or {@code null} if none was set */
    public String getEvent() {
        return this.event;
    }

    /** @param event the event key to store */
    public void setEvent(String event) {
        this.event = event;
    }

    /** @return the payload, or {@code null} if none was set */
    public Object getValue() {
        return this.value;
    }

    /** @param value the payload to store */
    public void setValue(Object value) {
        this.value = value;
    }
}
BinlogClientRunner
package com.mxc.binlog.runner;
import com.alibaba.fastjson.JSON;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.mxc.binlog.dto.BinlogDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Connects to MySQL as a binlog client at application startup and logs every
 * insert, update and delete that touches one of the configured tables.
 *
 * <p>Each changed row is wrapped in a {@link BinlogDto} whose event key is
 * {@code <database>.<table>.<insert|update|delete>}, serialized to JSON and
 * written to the log.
 */
@Component
public class BinlogClientRunner implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(BinlogClientRunner.class);

    @Value("${mysql.binlog.host}")
    private String host;

    @Value("${mysql.binlog.port}")
    private int port;

    @Value("${mysql.binlog.user}")
    private String user;

    @Value("${mysql.binlog.password}")
    private String password;

    /**
     * Server id this client presents to MySQL.
     *
     * <p>NOTE(review): the connector's documentation asks for an id that is unique
     * within the replication group (i.e. different from the MySQL server's own
     * {@code server_id}) - confirm the configured value.
     */
    @Value("${mysql.server.id}")
    private long serverId;

    /** Comma-separated list of monitored tables, each written as {@code database.table}. */
    @Value("${mysql.binlog.database.table}")
    private String databaseTable;

    /**
     * Builds the binlog client, registers the row-event listener and connects.
     *
     * <p>NOTE(review): {@code BinaryLogClient.connect()} is documented as blocking the
     * calling thread, and {@code @Async} only takes effect when async support is
     * enabled (e.g. {@code @EnableAsync}) - confirm it is, otherwise this runner
     * never returns to the startup thread.
     *
     * @param args application command-line arguments (unused)
     * @throws Exception if connecting to the MySQL server fails
     */
    @Async
    @Override
    public void run(String... args) throws Exception {
        // Split on commas while tolerating surrounding whitespace, so that
        // "db.a, db.b" monitors both tables instead of silently dropping " db.b".
        List<String> watchedTables = Arrays.asList(databaseTable.trim().split("\\s*,\\s*"));

        // Row events only carry a numeric table id; the preceding TABLE_MAP event
        // supplies the id -> "database.table" mapping, which is remembered here.
        Map<Long, String> tableNamesById = new HashMap<>();

        BinaryLogClient client = new BinaryLogClient(host, port, user, password);
        client.setServerId(serverId);
        client.registerEventListener(
                event -> handleEvent(event.getData(), tableNamesById, watchedTables));
        client.connect();
    }

    /**
     * Dispatches a single binlog event payload: records table mappings and logs
     * row changes for monitored tables. Other event types are ignored.
     */
    private void handleEvent(EventData data, Map<Long, String> tableNamesById,
            List<String> watchedTables) {
        if (data == null) {
            return;
        }
        if (data instanceof TableMapEventData) {
            TableMapEventData mapping = (TableMapEventData) data;
            tableNamesById.put(mapping.getTableId(),
                    mapping.getDatabase() + "." + mapping.getTable());
        } else if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData update = (UpdateRowsEventData) data;
            String tableName = tableNamesById.get(update.getTableId());
            if (isWatched(tableName, watchedTables)) {
                String eventKey = tableName + ".update";
                for (Map.Entry<Serializable[], Serializable[]> row : update.getRows()) {
                    // Only the entry's value is published; the key is not used.
                    logRow("更新:{}", eventKey, row.getValue());
                }
            }
        } else if (data instanceof WriteRowsEventData) {
            WriteRowsEventData insert = (WriteRowsEventData) data;
            String tableName = tableNamesById.get(insert.getTableId());
            if (isWatched(tableName, watchedTables)) {
                String eventKey = tableName + ".insert";
                for (Serializable[] row : insert.getRows()) {
                    logRow("插入:{}", eventKey, row);
                }
            }
        } else if (data instanceof DeleteRowsEventData) {
            DeleteRowsEventData delete = (DeleteRowsEventData) data;
            String tableName = tableNamesById.get(delete.getTableId());
            if (isWatched(tableName, watchedTables)) {
                String eventKey = tableName + ".delete";
                for (Serializable[] row : delete.getRows()) {
                    logRow("删除:{}", eventKey, row);
                }
            }
        }
    }

    /** Returns whether the table name is known and is one of the monitored tables. */
    private static boolean isWatched(String tableName, List<String> watchedTables) {
        return tableName != null && watchedTables.contains(tableName);
    }

    /** Serializes one changed row as a {@link BinlogDto} JSON message and logs it. */
    private void logRow(String logFormat, String eventKey, Object row) {
        String msg = JSON.toJSONString(new BinlogDto(eventKey, row));
        log.info(logFormat, msg);
    }
}
完成上述操作的前提是在mysql的配置文件中进行配置。
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
然后再在我们自己的项目中进行配置
# binlog配置
mysql:
  server:
    id: 1
  binlog:
    host: 127.0.0.1
    port: 3306
    user: root
    password: root
    # 指定监听的表格
    database:
      table: rboot.sys_dept,rboot.sys_user
demo
下载:https://mengxc.lanzouh.com/iCTxr0271saj
密码:2vbj