[MySQL] Implementing data synchronization with the MySQL binlog
Based on a number of articles (reposted elsewhere on this blog), this implements a rough version of data synchronization.
Preparation
The MySQL versions on both sides should ideally be the same. Enable the binlog and choose ROW format; the my.ini configuration file is shown below.
[mysql]
# default character set for the mysql client
default-character-set=utf8mb4
[mysqld]
# listen on port 3306
port=3306
# MySQL installation and data directories
basedir=D:\dir\mysql5.7
datadir=D:\dir\mysql5.7\data
# maximum number of connections
max_connections=200
# the server character set defaults to the 8-bit latin1; use utf8mb4 instead
character-set-server=utf8mb4
# default storage engine for newly created tables
default-storage-engine=INNODB
# avoids: ERROR 1067 (42000): Invalid default value for 'CREATE_TIME'
sql_mode=STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# enable the binlog: https://blog.csdn.net/king_kgh/article/details/74800513
log-bin=mysql-bin
binlog-format=ROW	# use ROW format
server-id=12345
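Whether the binlog is actually enabled and in ROW format can be checked from a JDBC connection before going further. The following is a minimal sketch under the assumption that the server from the configuration above is running locally; the URL, user and password are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings
        String url = "jdbc:mysql://localhost:3306/mysql";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             Statement stmt = conn.createStatement()) {
            // expect log_bin = ON and binlog_format = ROW
            for (String variable : new String[] { "log_bin", "binlog_format" }) {
                try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '" + variable + "'")) {
                    while (rs.next()) {
                        System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
                    }
                }
            }
        }
    }
}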
How it works
Consider a simple scenario: system S spans two networks that are physically isolated from each other. Call its subsystems on the two networks S1 and S2; both use MySQL. S1 is used only inside the enterprise, and the data it produces has to be synchronized to S2, where customers access it. Because there is no network connection between them, the two databases cannot use MySQL's built-in replication. This could of course be handled at the application level by packaging the data produced on S1 as XML or JSON and restoring it on S2, but that approach is tied to the specific application, is not reusable, and adds work at the application layer. By shipping S1's binlog files to S2 instead and replaying them against S2's MySQL with a tool, the synchronization becomes application-agnostic and generic, reduces the work in the application layer, and is easier to maintain.
Implementation
Only the key code is listed here; a real implementation will need to handle quite a bit more. The steps involved are:
1. Export the binlog files from S1 (a sketch of this step follows the list)
2. Copy them to S2 and upload them there
3. Replay the binlog files on S2
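For steps 1 and 2 only one possible approach is sketched here: rotate the binlog on S1 so that the file currently being written is closed, list the files with SHOW BINARY LOGS, and copy the closed files out of the data directory. This assumes the program runs on S1 with read access to the data directory; the connection settings and the export path are placeholders, and getting the copied files across the isolated networks is environment specific.
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class BinlogExporter {

    public static void main(String[] args) throws Exception {
        // placeholder settings; adjust to the S1 environment
        String url = "jdbc:mysql://localhost:3306/mysql";
        Path dataDir = Paths.get("D:\\dir\\mysql5.7\\data");
        Path exportDir = Paths.get("D:\\export");
        Files.createDirectories(exportDir);

        List<String> binlogFiles = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             Statement stmt = conn.createStatement()) {
            // close the binlog file currently being written so it can be copied safely
            stmt.execute("FLUSH BINARY LOGS");
            // list all binlog files known to the server
            try (ResultSet rs = stmt.executeQuery("SHOW BINARY LOGS")) {
                while (rs.next()) {
                    binlogFiles.add(rs.getString("Log_name"));
                }
            }
        }

        // copy every file except the last one (the server is still writing to it)
        for (int i = 0; i < binlogFiles.size() - 1; i++) {
            Path source = dataDir.resolve(binlogFiles.get(i));
            Files.copy(source, exportDir.resolve(source.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}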
Step 3 above is the key part; mysql-binlog-connector-java (the com.github.shyiko artifact whose classes appear in the imports below) is used to parse the binlog files and turn the row events back into SQL. First come two helper classes that assist with parsing the binlog file.
BinlogParser
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
/**
 *
 * @author admin
 */
public class BinlogParser {
    
    private final File binlogFile;
    
    public BinlogParser(File binlogFile) {
        this.binlogFile = binlogFile;
    }
    
    /**
     * Reads the binlog file and collects every row event together with the
     * TABLE_MAP event that precedes it.
     *
     * @param startPosition only events at or after this binlog position are returned
     * @return the collected (TABLE_MAP event, row event) pairs, in file order
     * @throws IOException if the binlog file cannot be read or parsed
     */
    public List<EventPair> parse(long startPosition) throws IOException {
        
        List<EventPair> list = new ArrayList<>();
        
        EventDeserializer eventDeserializer = new EventDeserializer();
        // https://blog.csdn.net/jiangzeyin_/article/details/79442984
        // the CRC32 checksum type must be set, otherwise an EventDataDeserializationException is thrown
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        
        try (BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer)) {
            Event precedeTableMapEvent = null;
            for (Event event; (event = reader.readEvent()) != null;) {
                EventHeaderV4 eventHeader = event.getHeader();
                if(eventHeader.getPosition() < startPosition) {
                    continue;
                }
                
                if (eventHeader.getEventType() != null) {
                    switch (eventHeader.getEventType()) {
                        case TABLE_MAP:
                            // remember the latest TABLE_MAP event; it describes the table
                            // that the following row events belong to
                            precedeTableMapEvent = event;
                            break;
                        case EXT_WRITE_ROWS:    // inserted rows (within a single table)
                        case WRITE_ROWS:
                        case EXT_UPDATE_ROWS:   // updated rows (within a single table)
                        case UPDATE_ROWS:
                        case EXT_DELETE_ROWS:   // deleted rows (within a single table)
                        case DELETE_ROWS:
                            list.add(new EventPair(precedeTableMapEvent, event));
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        
        return list;
    }
}
EventPair
import com.github.shyiko.mysql.binlog.event.Event;
/**
 * Holds a row event (write/update/delete) together with the TABLE_MAP event
 * that immediately precedes it in the binlog.
 *
 * @author admin
 */
public class EventPair {
    private final Event precedeTableMapEvent;
    private final Event rowOperationEvent;
    public EventPair(Event precedeTableMapEvent, Event rowOperationEvent) {
        this.precedeTableMapEvent = precedeTableMapEvent;
        this.rowOperationEvent = rowOperationEvent;
    }
    public Event getPrecedeTableMapEvent() {
        return precedeTableMapEvent;
    }
    public Event getRowOperationEvent() {
        return rowOperationEvent;
    }
    
}
Basic structure
import com.xxx.binlog.BinlogParser;
import com.xxx.binlog.EventPair;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
 *
 * @author admin
 */
public class Main {
    
    public static void main(String[] args) throws IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        
        BinlogParser parser = new BinlogParser(new File("D:\\dir\\mysql5.7\\data\\mysql-bin.000002"));
        
        List<EventPair> list = parser.parse(0);
        for(EventPair eventPair : list) {
            Event tableMapEvent = eventPair.getPrecedeTableMapEvent();
            Event rowOperationEvent = eventPair.getRowOperationEvent();
            
            if(tableMapEvent != null && rowOperationEvent != null) {
                TableMapEventData tableMapEventData = tableMapEvent.getData();
                // fully qualified table name; the SQL-generation snippets below refer to it
                String tableName = tableMapEventData.getDatabase() + "." + tableMapEventData.getTable();
                
                EventHeaderV4 header = rowOperationEvent.getHeader();
                
                if(header.getEventType() == EventType.EXT_WRITE_ROWS || header.getEventType() == EventType.WRITE_ROWS) {
                    WriteRowsEventData data = rowOperationEvent.getData();
                    
                    // TODO: ① insert (see section ① below)
                } else if(header.getEventType() == EventType.EXT_UPDATE_ROWS || header.getEventType() == EventType.UPDATE_ROWS) {
                    UpdateRowsEventData data = rowOperationEvent.getData();
                    // TODO: ② update (see section ② below)
                } else if(header.getEventType() == EventType.EXT_DELETE_ROWS || header.getEventType() == EventType.DELETE_ROWS) {
                    DeleteRowsEventData data = rowOperationEvent.getData();
                    // TODO: ③ delete (see section ③ below)
                } else {
                    // ignore
                }
            }
        }
    }
}
① Insert
Convert EventType.WRITE_ROWS events into INSERT statements. Only strings, dates, byte arrays (corresponding to database text columns) and numbers are handled here. Note that the INSERT statement does not list column names; the values are inserted in the order the columns were created.
WriteRowsEventData data = rowOperationEvent.getData();
List<Serializable[]> rows = data.getRows();
for (Object[] row : rows) {
	StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES (");
	for(int i=0; i<row.length; i++) {
		if(row[i] == null) {
			sb.append("null");
		} else if(row[i] instanceof String) {
			String value = (String) row[i];
			sb.append("'").append(value).append("'");
		} else if(row[i] instanceof byte[]) {
			// byte[] values are decoded as UTF-8 strings first (corresponds to text columns)
			String val = new String((byte[])row[i], "utf-8");
			sb.append("'").append(val).append("'");
		} else if(row[i] instanceof Date) {
			String value = sdf.format((Date)row[i]);
			sb.append("'").append(value).append("'");
		} else {
			// numbers, etc.
			sb.append(row[i]);
		}

		if(i < row.length-1) {
			sb.append(", ");
		}
	}
	sb.append(")");
	System.out.println(sb.toString());
}
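Note that the generated statements embed values directly, so a single quote inside a string value would break the INSERT (the same applies to the UPDATE and DELETE statements below). The following is a minimal escaping sketch that could be applied before appending string values; the helper name escapeValue is made up for this example, and parameterized statements would be the more robust choice.
/**
 * Doubles single quotes and backslashes so the value can be embedded
 * in a single-quoted MySQL string literal (assuming the default sql_mode,
 * where backslash acts as the escape character).
 */
static String escapeValue(String raw) {
	return raw.replace("\\", "\\\\").replace("'", "''");
}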
② Update
Convert EventType.UPDATE_ROWS events into UPDATE statements; again only strings, dates, byte arrays (corresponding to database text columns) and numbers are handled. An UPDATE statement needs the actual column names. They can either be configured per table in column order (which leads to configuration bloat when there are many tables) or queried from the MySQL server (covered at the end of this article). For simplicity, the first column of every table is assumed to be the primary key and to be named id (otherwise some configuration is needed to identify the primary key).
UpdateRowsEventData data = rowOperationEvent.getData();
List<Map.Entry<Serializable[], Serializable[]>> rows = data.getRows();
for(Map.Entry<Serializable[], Serializable[]> mapEntry : rows) {
	Object[] key = mapEntry.getKey();
	Object[] value = mapEntry.getValue();
	
	StringBuilder sb = new StringBuilder("UPDATE ").append(tableName).append(" SET ");
	for(int i=0; i<key.length; i++) {
		String columnName = tableColumnMap.get(tableName + "." + (i+1));
		System.out.println("i=" + i + ", " + columnName + ", key=" + key[i] + ", value=" + value[i]);
		
		// compare key[i] (the value before the update) with value[i] (the value after the update)
		if(key[i] == null && value[i] == null) {
			continue;
		}
		
		if(value[i] instanceof String) {
			// value[i] is not null here, because null instanceof String is false
			if(!StringUtils.equals((String)key[i], (String)value[i])) {
				String val = (String) value[i];
				sb.append(columnName).append("='").append(val).append("', ");
			}
			
		} else if(value[i] instanceof Date) {
			// value[i] is not null here, because null instanceof Date is false
			if(!value[i].equals(key[i])) {
				String date = sdf.format((Date)value[i]);
				sb.append(columnName).append("='").append(date).append("', ");
			}
		} else if(value[i] instanceof byte[]) {
			if(!Arrays.equals((byte[])key[i], (byte[])value[i])) {
				// byte[] values are decoded as UTF-8 strings first (corresponds to text columns)
				String val = new String((byte[])value[i], "utf-8");
				sb.append(columnName).append("='").append(val).append("', ");
			}
		} else {
			// numbers, NULLs, etc.; compare by value rather than by reference
			if(!java.util.Objects.equals(key[i], value[i])) {
				sb.append(columnName).append("=").append(value[i]).append(", ");
			}
		}
	}
	// if no column actually changed, skip this row
	if (!sb.toString().endsWith(", ")) {
		continue;
	}
	String updateSQL = sb.substring(0, sb.length() - 2);

	// append the WHERE clause (the first column is assumed to be the primary key "id")
	updateSQL = updateSQL + " WHERE id='" + String.valueOf(key[0]) + "'";
	System.out.println(updateSQL);
}
③ Delete
Convert EventType.DELETE_ROWS events into DELETE statements. For simplicity, the first column of every table is again assumed to be the primary key and to be named id (otherwise some configuration is needed to identify the primary key).
DeleteRowsEventData data = rowOperationEvent.getData();
List<Serializable[]> rows = data.getRows();
for (Object[] row : rows) {
	// the first column is assumed to be the primary key id
	StringBuilder sb = new StringBuilder("DELETE FROM ")
			.append(tableMapEventData.getDatabase()).append(".").append(tableMapEventData.getTable())
			.append(" WHERE id='").append(row[0].toString()).append("'");
	System.out.println(sb.toString());
}
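The snippets above only print the generated statements. To actually replay them against S2's MySQL, they can be executed over a plain JDBC connection. The following is a minimal sketch: the JDBC URL, user and password are placeholders, and a real implementation should add error handling and keep track of how far it has progressed.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

public class SqlReplayer {

    /**
     * Executes the generated statements against the target (S2) database in one batch.
     */
    public static void replay(List<String> statements) throws SQLException {
        String url = "jdbc:mysql://localhost:3306/targetdb";   // placeholder
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement stmt = conn.createStatement()) {
            conn.setAutoCommit(false);          // apply the whole batch in one transaction
            for (String sql : statements) {
                stmt.addBatch(sql);
            }
            stmt.executeBatch();
            conn.commit();
        }
    }
}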
Querying the column names of a table
Connection connection = DataSourceUtils.getConnection(dataSource);
System.out.println("obtained JDBC connection: " + connection);
DatabaseMetaData metaData = connection.getMetaData();
// with MySQL Connector/J's default settings the database maps to the JDBC catalog, so it is passed as the catalog argument
ResultSet tableResultSet = metaData.getTables(database, null, null, new String[] { "TABLE" });
try {
	while (tableResultSet.next()) {
		// @see https://www.cnblogs.com/dnn179/p/DatabaseMetaData.html (note the "TABLE_SCHEM" spelling)
		String plainTableName = tableResultSet.getString("TABLE_NAME");
		String tableName = database + "." + plainTableName;
		// getColumns expects the bare table name, without the database prefix
		ResultSet columnResultSet = metaData.getColumns(database, null, plainTableName, null);
		try {
			while (columnResultSet.next()) {
				tableColumnMap.put(tableName + "." + columnResultSet.getLong("ORDINAL_POSITION"), columnResultSet.getString("COLUMN_NAME"));
			}
		} finally {
			columnResultSet.close();
		}
	}
} finally {
	tableResultSet.close();
}
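Sections ② and ③ assume that the first column of every table is the primary key and is named id. When that assumption does not hold, the primary key columns can be read from the same DatabaseMetaData, inside the while loop over tableResultSet above. The following is a minimal sketch; the map tablePrimaryKeyMap is a name made up for this example and would be declared alongside tableColumnMap.
// maps "database.table" to its primary key column name(s), declared next to tableColumnMap
Map<String, List<String>> tablePrimaryKeyMap = new HashMap<>();

// inside the while loop over tableResultSet:
ResultSet pkResultSet = metaData.getPrimaryKeys(database, null, plainTableName);
try {
	while (pkResultSet.next()) {
		tablePrimaryKeyMap
				.computeIfAbsent(tableName, k -> new ArrayList<String>())
				.add(pkResultSet.getString("COLUMN_NAME"));
	}
} finally {
	pkResultSet.close();
}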