pom

<!-- MySQL binlog client library; supplies the com.github.shyiko.mysql.binlog classes imported by BinlogClientRunner -->
<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.21.0</version>
</dependency>

BinlogDto

package com.mxc.binlog.dto;

/**
 * Mutable data carrier that pairs an event identifier with an arbitrary payload.
 *
 * <p>Both properties are exposed through conventional getters and setters so that
 * bean-based serializers can read and write them.
 */
public class BinlogDto {

    /** Identifier of the event this DTO describes. */
    private String event;

    /** Payload attached to the event; may be any object. */
    private Object value;

    /** Creates an empty DTO; both properties start out as {@code null}. */
    public BinlogDto() {
    }

    /**
     * Creates a DTO with both properties set.
     *
     * @param event identifier of the event
     * @param value payload attached to the event
     */
    public BinlogDto(String event, Object value) {
        this.event = event;
        this.value = value;
    }

    /** @return the event identifier, or {@code null} if none was set */
    public String getEvent() {
        return this.event;
    }

    /** @param event the event identifier to store */
    public void setEvent(String event) {
        this.event = event;
    }

    /** @return the payload, or {@code null} if none was set */
    public Object getValue() {
        return this.value;
    }

    /** @param value the payload to store */
    public void setValue(Object value) {
        this.value = value;
    }
}

BinlogClientRunner

package com.mxc.binlog.runner;

import com.alibaba.fastjson.JSON;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.mxc.binlog.dto.BinlogDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Starts a MySQL binlog client when the application boots and logs every row-level
 * insert, update and delete on the configured tables as a JSON-serialized {@link BinlogDto}.
 *
 * <p>Connection settings and the list of watched tables are injected from the
 * {@code mysql.*} properties.
 */
@Component
public class BinlogClientRunner implements CommandLineRunner {

	private static final Logger log = LoggerFactory.getLogger(BinlogClientRunner.class);

	@Value("${mysql.binlog.host}")
	private String host;

	@Value("${mysql.binlog.port}")
	private int port;

	@Value("${mysql.binlog.user}")
	private String user;

	@Value("${mysql.binlog.password}")
	private String password;

	// Server id handed to BinaryLogClient#setServerId.
	@Value("${mysql.server.id}")
	private long serverId;

	// Comma-separated list of tables to watch, each written as "database.table".
	@Value("${mysql.binlog.database.table}")
	private String databaseTable;

	/**
	 * Builds the binlog client, registers the row-event listener and connects.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception if the client fails to connect
	 */
	@Async
	@Override
	public void run(String... args) throws Exception {
		// Split on commas while tolerating surrounding whitespace, so a value such as
		// "db.a, db.b" still matches both tables.
		List<String> watchedTables = Arrays.asList(databaseTable.trim().split("\\s*,\\s*"));
		// Table id -> "database.table", filled from the table-map events seen so far.
		Map<Long, String> tableNamesById = new HashMap<>();

		BinaryLogClient client = new BinaryLogClient(host, port, user, password);
		client.setServerId(serverId);
		client.registerEventListener(event -> handleEvent(event.getData(), watchedTables, tableNamesById));
		// NOTE(review): BinaryLogClient#connect() is documented to block until the client
		// disconnects, and @Async only takes effect when async support is enabled
		// (@EnableAsync), which is not visible in this file. Confirm that start-up is not
		// held up by this call.
		client.connect();
	}

	/**
	 * Dispatches one binlog event payload: records table-map events and logs the rows of
	 * update / insert / delete events that belong to a watched table.
	 *
	 * @param data           event payload; {@code null} and unrelated payload types are ignored
	 * @param watchedTables  "database.table" names whose changes are logged
	 * @param tableNamesById table id to "database.table" mapping, updated by table-map events
	 */
	private static void handleEvent(EventData data, List<String> watchedTables,
			Map<Long, String> tableNamesById) {
		if (data instanceof TableMapEventData) {
			TableMapEventData tableMap = (TableMapEventData) data;
			tableNamesById.put(tableMap.getTableId(), tableMap.getDatabase() + "." + tableMap.getTable());
		} else if (data instanceof UpdateRowsEventData) {
			UpdateRowsEventData update = (UpdateRowsEventData) data;
			String eventKey = eventKeyFor(update.getTableId(), "update", watchedTables, tableNamesById);
			if (eventKey != null) {
				for (Map.Entry<Serializable[], Serializable[]> row : update.getRows()) {
					// Only the entry value is logged; presumably this is the row image after
					// the update (key = image before) — verify against the connector docs.
					logRow("更新:{}", eventKey, row.getValue());
				}
			}
		} else if (data instanceof WriteRowsEventData) {
			WriteRowsEventData insert = (WriteRowsEventData) data;
			String eventKey = eventKeyFor(insert.getTableId(), "insert", watchedTables, tableNamesById);
			if (eventKey != null) {
				for (Serializable[] row : insert.getRows()) {
					logRow("插入:{}", eventKey, row);
				}
			}
		} else if (data instanceof DeleteRowsEventData) {
			DeleteRowsEventData delete = (DeleteRowsEventData) data;
			String eventKey = eventKeyFor(delete.getTableId(), "delete", watchedTables, tableNamesById);
			if (eventKey != null) {
				for (Serializable[] row : delete.getRows()) {
					logRow("删除:{}", eventKey, row);
				}
			}
		}
	}

	/**
	 * Resolves the event key ("database.table.action") for a row event.
	 *
	 * @return the event key, or {@code null} when the table id has not been mapped yet or
	 *         the table is not in the watched list
	 */
	private static String eventKeyFor(long tableId, String action, List<String> watchedTables,
			Map<Long, String> tableNamesById) {
		String tableName = tableNamesById.get(tableId);
		if (tableName == null || !watchedTables.contains(tableName)) {
			return null;
		}
		return tableName + "." + action;
	}

	/** Logs one changed row as a JSON-serialized {@link BinlogDto} using the given log format. */
	private static void logRow(String format, String eventKey, Object row) {
		log.info(format, JSON.toJSONString(new BinlogDto(eventKey, row)));
	}
}

完成上述操作的前提是在mysql的配置文件中进行配置。

log-bin=mysql-bin   # 开启binlog
binlog-format=ROW   # 设置Binary Log记录方式为Row
server_id=1         # 记住id 后续开发会使用

然后再在我们自己的项目中进行配置

# binlog settings read by BinlogClientRunner
mysql:  
  server: 
    # NOTE(review): the binlog connector's docs ask for a client server id that is unique
    # within the replication group; 1 equals the server_id shown in the MySQL config — confirm.
    id: 1
  binlog: 
    host: 127.0.0.1
    port: 3306
    user: root
    password: root
  # tables to watch: comma-separated "database.table" entries
    database: 
      table:  rboot.sys_dept,rboot.sys_user

demo

下载:https://mengxc.lanzouh.com/iCTxr0271saj

密码:2vbj