【MySQL】利用MySQL的binlog日志实现数据同步
参考了多篇文章(已转载在博客中),实现了一个粗糙版本的数据同步。
准备
MySQL版本最好一致,开启binlog日志,并选择ROW模式,下面是my.ini配置文件
[mysql]
# 设置mysql客户端默认字符集
default-character-set=utf8mb4
[mysqld]
#设置3306端口
port = 3306
# 设置mysql的安装目录
basedir=D:\dir\mysql5.7
datadir=D:\dir\mysql5.7\data
# 允许最大连接数
max_connections=200
# 服务端使用的字符集(默认是8比特编码的latin1,这里改为utf8mb4)
character-set-server=utf8mb4
# 创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
# 相比5.7默认的sql_mode去掉了NO_ZERO_IN_DATE、NO_ZERO_DATE等,避免建表时报 ERROR 1067 (42000): Invalid default value for 'CREATE_TIME'
sql_mode=STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# 打开binlog日志:https://blog.csdn.net/king_kgh/article/details/74800513
log-bin=mysql-bin
binlog-format=ROW #选择ROW模式
server-id=12345
原理
考虑一个简单的场景:系统S是一个跨网络的系统,且两个网络之间物理隔离。我们将S在这两个网络上的分系统分别称为S1和S2,S1和S2都使用MySQL数据库,其中S1仅在企业内部使用,它生成的数据需要同步到S2上供客户访问。因为网络不通,这两个数据库之间无法使用MySQL自身的主从复制机制。当然也可以在应用层面做这件事,比如将S1生成的数据打包成XML或JSON,再在S2上还原,但缺点是与具体应用相关、不通用,还会增加应用层面的工作量等。而利用binlog日志,将S1上的binlog文件同步到S2上,再用工具将其在S2的MySQL上还原,就可以做到与具体应用无关、通用,同时减少应用层的工作量,也更易于维护。
具体实现
这里只列出一些关键代码,实际实现时需要考虑的问题会更多。涉及的步骤包括:
1.S1的binlog文件下载
2.拷贝到S2并上传
3.在S2上还原binlog文件
上面第3步是关键,这里利用mysql-binlog-connector-java进行还原。首先是两个用于解析binlog文件的辅助类:
BinlogParser
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
/**
 * Reads a MySQL binlog file and extracts its row-change events (insert/update/delete).
 * Each event is paired with the TABLE_MAP event that describes the table its rows belong to.
 *
 * @author admin
 */
public class BinlogParser {

    private final File binlogFile;

    /**
     * @param binlogFile the binlog file to read, e.g. {@code mysql-bin.000002}
     */
    public BinlogParser(File binlogFile) {
        this.binlogFile = binlogFile;
    }

    /**
     * Parses the binlog file and returns its row-change events in file order.
     *
     * <p>TABLE_MAP events are recorded for the whole file, including the part before
     * {@code startPosition}. Only row events are filtered by position. Each row event is paired
     * with the TABLE_MAP event carrying the same table id. This stays correct when one statement
     * touches several tables, where all TABLE_MAP events are written before the row events.
     *
     * @param startPosition row events whose position ({@code EventHeaderV4.getPosition()}) is
     *     below this value are skipped; pass 0 to keep all of them
     * @return the row events paired with their table map. The table-map side of a pair is
     *     {@code null} if no TABLE_MAP event with that table id was seen in this file.
     * @throws IOException if the file cannot be read
     */
    public List<EventPair> parse(long startPosition) throws IOException {
        List<EventPair> list = new ArrayList<>();
        EventDeserializer eventDeserializer = new EventDeserializer();
        // https://blog.csdn.net/jiangzeyin_/article/details/79442984
        // The CRC32 checksum type must be set, otherwise reading throws
        // com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        Map<Long, Event> tableMapEventsByTableId = new HashMap<>();
        try (BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer)) {
            for (Event event; (event = reader.readEvent()) != null;) {
                EventHeaderV4 eventHeader = event.getHeader();
                if (eventHeader.getEventType() == null) {
                    continue;
                }
                switch (eventHeader.getEventType()) {
                    case TABLE_MAP:
                        // Record every table map, even those before startPosition. A row event at
                        // or after startPosition may refer to a table map written just before it.
                        TableMapEventData tableMapData = event.getData();
                        if (tableMapData != null) {
                            tableMapEventsByTableId.put(tableMapData.getTableId(), event);
                        }
                        break;
                    case EXT_WRITE_ROWS: // inserted rows (within a single table)
                    case WRITE_ROWS:
                    case EXT_UPDATE_ROWS: // updated rows (within a single table)
                    case UPDATE_ROWS:
                    case EXT_DELETE_ROWS: // deleted rows (within a single table)
                    case DELETE_ROWS:
                        if (eventHeader.getPosition() >= startPosition) {
                            // HashMap.get(null) returns null, so an unknown table id yields a null table map.
                            Event tableMapEvent = tableMapEventsByTableId.get(rowsEventTableId(event));
                            list.add(new EventPair(tableMapEvent, event));
                        }
                        break;
                    default:
                        break;
                }
            }
        }
        return list;
    }

    /**
     * Returns the table id referenced by a write/update/delete rows event.
     * Returns null when the event carries any other kind of data.
     */
    private static Long rowsEventTableId(Event rowsEvent) {
        EventData data = rowsEvent.getData();
        if (data instanceof WriteRowsEventData) {
            return ((WriteRowsEventData) data).getTableId();
        }
        if (data instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) data).getTableId();
        }
        if (data instanceof DeleteRowsEventData) {
            return ((DeleteRowsEventData) data).getTableId();
        }
        return null;
    }
}
EventPair
import com.github.shyiko.mysql.binlog.event.Event;
/**
 * Holder that couples a row-change event with the TABLE_MAP event describing its table.
 * Both references are fixed at construction time.
 *
 * @author admin
 */
public class EventPair {

    /** TABLE_MAP event for the table the rows belong to; may be null. */
    private final Event precedeTableMapEvent;

    /** The WRITE/UPDATE/DELETE rows event itself. */
    private final Event rowOperationEvent;

    /**
     * @param precedeTableMapEvent the TABLE_MAP event for the rows' table (may be null)
     * @param rowOperationEvent the row-change event
     */
    public EventPair(Event precedeTableMapEvent, Event rowOperationEvent) {
        this.rowOperationEvent = rowOperationEvent;
        this.precedeTableMapEvent = precedeTableMapEvent;
    }

    /** @return the TABLE_MAP event paired with the row event, possibly null */
    public Event getPrecedeTableMapEvent() {
        return this.precedeTableMapEvent;
    }

    /** @return the row-change event */
    public Event getRowOperationEvent() {
        return this.rowOperationEvent;
    }
}
基本结构
import com.xxx.binlog.BinlogParser;
import com.xxx.binlog.EventPair;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
 * Skeleton for restoring one binlog file.
 * It parses the file's row events and dispatches each one to the insert / update / delete branch.
 * Those branches are left as TODOs here.
 *
 * <p>Usage: {@code Main [binlogFile [startPosition]]}. Without arguments the file
 * {@code D:\dir\mysql5.7\data\mysql-bin.000002} is read from position 0.
 *
 * @author admin
 */
public class Main {
    public static void main(String[] args) throws IOException {
        // MySQL only recognizes '.' as the delimiter before fractional seconds in a DATETIME literal;
        // "HH:mm:ss:SSS" produced values that strict sql_mode rejects.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        String binlogPath = args.length > 0 ? args[0] : "D:\\dir\\mysql5.7\\data\\mysql-bin.000002";
        long startPosition = args.length > 1 ? Long.parseLong(args[1]) : 0L;
        BinlogParser parser = new BinlogParser(new File(binlogPath));
        List<EventPair> list = parser.parse(startPosition);
        for (EventPair eventPair : list) {
            Event tableMapEvent = eventPair.getPrecedeTableMapEvent();
            Event rowOperationEvent = eventPair.getRowOperationEvent();
            if (rowOperationEvent == null) {
                continue;
            }
            if (tableMapEvent == null) {
                // Without a table map the target table is unknown; report it instead of dropping it silently.
                System.err.println("Skipping row event without TABLE_MAP: " + rowOperationEvent.getHeader());
                continue;
            }
            TableMapEventData tableMapEventData = tableMapEvent.getData();
            EventHeaderV4 header = rowOperationEvent.getHeader();
            switch (header.getEventType()) {
                case EXT_WRITE_ROWS:
                case WRITE_ROWS: {
                    WriteRowsEventData data = rowOperationEvent.getData();
                    // TODO: (1) insert - convert the rows into INSERT statements
                    break;
                }
                case EXT_UPDATE_ROWS:
                case UPDATE_ROWS: {
                    UpdateRowsEventData data = rowOperationEvent.getData();
                    // TODO: (2) update - convert the rows into UPDATE statements
                    break;
                }
                case EXT_DELETE_ROWS:
                case DELETE_ROWS: {
                    DeleteRowsEventData data = rowOperationEvent.getData();
                    // TODO: (3) delete - convert the rows into DELETE statements
                    break;
                }
                default:
                    // ignore
                    break;
            }
        }
    }
}
①插入
将EventType.WRITE_ROWS事件转换为insert语句,这里只处理了字符串、日期、字节数组(对应数据库text类型)和数字类型。注意这里insert语句没有指定字段名称,而是按照字段在表中的定义顺序插入,因此要求S1和S2上对应表的字段及其顺序完全一致。
WriteRowsEventData data = rowOperationEvent.getData();
// Qualified "db.table" name, built from the table map of this row event.
String tableName = tableMapEventData.getDatabase() + "." + tableMapEventData.getTable();
List<Serializable[]> rows = data.getRows();
for (Object[] row : rows) {
    // Values are written in column-definition order (no column list in the INSERT).
    StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES (");
    for (int i = 0; i < row.length; i++) {
        if (row[i] == null) {
            sb.append("null");
        } else if (row[i] instanceof String) {
            // NOTE(review): re-encodes with the platform default charset and decodes as UTF-8.
            // This presumably undoes the library decoding the column bytes with the platform
            // charset. Confirm for the library version in use.
            String value = new String(row[i].toString().getBytes(), "utf-8");
            // Escape backslashes and quotes so values such as O'Brien do not break the statement.
            sb.append("'").append(value.replace("\\", "\\\\").replace("'", "''")).append("'");
        } else if (row[i] instanceof byte[]) {
            // byte[] is converted to a string first (database TEXT columns).
            // NOTE(review): genuinely binary columns (BLOB) are corrupted by this conversion.
            String val = new String((byte[]) row[i], "utf-8");
            sb.append("'").append(val.replace("\\", "\\\\").replace("'", "''")).append("'");
        } else if (row[i] instanceof Date) {
            String value = sdf.format((Date) row[i]);
            sb.append("'").append(value).append("'");
        } else {
            // numbers etc.
            sb.append(row[i]);
        }
        if (i < row.length - 1) {
            sb.append(", ");
        }
    }
    sb.append(")");
    System.out.println(sb.toString());
}
②更新
将EventType.UPDATE_ROWS事件转换为update语句,这里只处理了字符串、日期、字节数组(对应数据库text类型)和数字类型。update语句需要具体的字段名称,字段名称可以根据表中字段创建顺序进行配置(表很多的话会造成配置膨胀),也可以从MySQL服务器查询(文章后面会提到)。为简单起见,这里假定表的第一个字段都是主键,且名称为id(否则需要做些配置来识别主键)。
UpdateRowsEventData data = rowOperationEvent.getData();
// Qualified "db.table" name; tableColumnMap uses the same format for its keys.
String tableName = tableMapEventData.getDatabase() + "." + tableMapEventData.getTable();
List<Map.Entry<Serializable[], Serializable[]>> rows = data.getRows();
for (Map.Entry<Serializable[], Serializable[]> mapEntry : rows) {
    // key = row image before the update, value = row image after the update
    Object[] key = mapEntry.getKey();
    Object[] value = mapEntry.getValue();
    StringBuilder sb = new StringBuilder("UPDATE ").append(tableName).append(" SET ");
    int changedColumns = 0;
    for (int i = 0; i < key.length; i++) {
        String columnName = tableColumnMap.get(tableName + "." + (i + 1));
        System.out.println("i=" + i + ", " + columnName + ", key=" + key[i] + ", value=" + value[i]);
        // Only columns whose value changed go into the SET clause.
        if (key[i] == null && value[i] == null) {
            continue;
        }
        if (value[i] instanceof String) {
            // value[i] is non-null here, because null instanceof String is false
            if (!StringUtils.equals((String) key[i], (String) value[i])) {
                String val = new String(value[i].toString().getBytes(), "utf-8");
                sb.append(columnName).append("='").append(val.replace("\\", "\\\\").replace("'", "''")).append("', ");
                changedColumns++;
            }
        } else if (value[i] instanceof Date) {
            // value[i] is non-null here, because null instanceof Date is false
            if (!value[i].equals(key[i])) {
                String date = sdf.format((Date) value[i]);
                sb.append(columnName).append("='").append(date).append("', ");
                changedColumns++;
            }
        } else if (value[i] instanceof byte[]) {
            if (!Arrays.equals((byte[]) key[i], (byte[]) value[i])) {
                // byte[] is converted to a string first (database TEXT columns)
                String val = new String((byte[]) value[i], "utf-8");
                sb.append(columnName).append("='").append(val.replace("\\", "\\\\").replace("'", "''")).append("', ");
                changedColumns++;
            }
        } else {
            // Numbers etc., or a value that became null. Compare by value, not by reference:
            // equal boxed numbers are usually distinct objects.
            if (!java.util.Objects.equals(key[i], value[i])) {
                sb.append(columnName).append("=").append(value[i]).append(", ");
                changedColumns++;
            }
        }
    }
    if (changedColumns == 0) {
        // Nothing to SET. Trimming the trailing ", " below would otherwise cut into " SET ".
        continue;
    }
    String updateSQL = sb.substring(0, sb.length() - 2);
    // Assumes the first column is the primary key named id; key[0] is its value before the update.
    // String.valueOf (instead of a String cast) also works for numeric ids.
    String id = String.valueOf(key[0]).replace("\\", "\\\\").replace("'", "''");
    updateSQL = updateSQL + " WHERE id='" + id + "'";
    System.out.println(updateSQL);
}
③删除
将EventType.DELETE_ROWS事件转换为delete语句。为简单起见,这里假定表的第一个字段都是主键,且名称为id(否则需要做些配置来识别主键)。
DeleteRowsEventData data = rowOperationEvent.getData();
List<Serializable[]> rows = data.getRows();
for (Object[] row : rows) {
    // Assumes the first column is the primary key and is named id.
    // Escape backslashes and quotes so the id cannot break the statement.
    String id = row[0].toString().replace("\\", "\\\\").replace("'", "''");
    StringBuilder sb = new StringBuilder("DELETE FROM ").append(tableMapEventData.getDatabase()).append(".").append(tableMapEventData.getTable()).append(" WHERE id='").append(id).append("'");
    System.out.println(sb.toString());
}
查询表中字段名称
// Fills tableColumnMap with "db.table.<ordinal position>" -> column name (used by the UPDATE example).
Connection connection = DataSourceUtils.getConnection(dataSource);
System.out.println("service diy jdbc::::::::::::" + connection.toString());
try {
    DatabaseMetaData metaData = connection.getMetaData();
    // With MySQL Connector/J (default databaseTerm=CATALOG) a database is a JDBC catalog,
    // so the database name goes into the catalog argument; "public" is a PostgreSQL schema name.
    // @see https://www.cnblogs.com/dnn179/p/DatabaseMetaData.html (note "TABLE_SCHEM")
    try (ResultSet tableResultSet = metaData.getTables(database, null, "%", new String[] { "TABLE" })) {
        while (tableResultSet.next()) {
            String rawTableName = tableResultSet.getString("TABLE_NAME");
            // Map keys use the qualified "db.table" name...
            String tableName = database + "." + rawTableName;
            // ...but getColumns needs the bare table name: a "db.table" pattern matches no table.
            // NOTE: '_' and '%' in a table name act as wildcards in this pattern.
            try (ResultSet columnResultSet = metaData.getColumns(database, null, rawTableName, "%")) {
                while (columnResultSet.next()) {
                    tableColumnMap.put(tableName + "." + columnResultSet.getLong("ORDINAL_POSITION"), columnResultSet.getString("COLUMN_NAME"));
                }
            }
        }
    }
} finally {
    // Release the connection; Spring closes it unless it is bound to a managed transaction.
    DataSourceUtils.releaseConnection(connection, dataSource);
}