1. 前言 这段时间在进行 MySQL 到 PostgreSQL 数据库迁移工作.
主要包含三部分工作, 其一是构建数据库对象, 包括表, 视图, 存储过程的构建, 这部分由于我在项目早期就引入了 liquibase, 迁移工作很简单, 所以没有总结文章.
其二是代码修改, 让代码适配新的数据库, 这部分已经总结发布到了鹏叔的技术博客 - 从 Mariadb 迁移到 postgresql .
其三是数据迁移, 数据迁移也有一些现成的工具, 但是配置起来比较麻烦, 工具比想象中的复杂太多, 用起来也不是太顺手, 与其花时间在熟悉一次性的工具上, 不如自己写一个迁移工具. 于是就有了这篇文章.
2. 目标 写一个通用的工具, 尽量是一键式完成数据迁移. 用户不需要提供太多信息, 最多提供源和目标数据库的信息, 确认需要迁移的表后自动完成数据迁移工作.
3. 思路 首先需要连接两个异构数据库. 然后从源数据库批量读出数据. 最后将其写入目标数据库. 尽量通过查询数据库的元数据创建查询和更新工作. 4. 实现 4.1. 创建 gradle 项目 使用 gradle 创建 java 或 springboot 项目结构, 如何创建 springboot 或 java 工程可以到我的博客空间查找. 这里只是列出简单的命令.
4.2. 引入依赖 1 2 runtimeOnly 'mysql:mysql-connector-java:5.1.37' runtimeOnly 'org.postgresql:postgresql:42.5.1'
4.3. 创建数据源 这里使用 spring boot 管理.
Config.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import javax.sql.DataSource;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.jdbc.DataSourceBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;@Configuration public class Config { @Bean @ConfigurationProperties(prefix = "spring.target-db") public DataSource targetDatasource () { return DataSourceBuilder.create().build(); } @Bean @ConfigurationProperties(prefix = "spring.source-db") @Primary public DataSource sourceDataSource () { return DataSourceBuilder.create().build(); } }
application.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 spring.target-db.jdbcUrl =jdbc:postgresql://localhost:5432/test spring.target-db.username =postgres spring.target-db.password =password spring.target-db.driverClassName =org.postgresql.Driver spring.source-db.jdbcUrl =jdbc:mysql://localhost:3306/test spring.source-db.username =root spring.source-db.password =password spring.source-db.driverClassName =com.mysql.jdbc.Driver
4.4. 抽象出迁移步骤 首先获取表数据总数, 然后分批查询源数据库, 批量写入目标数据库.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import java.util.List;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;abstract public class CommonMigration { private static Logger LOG = LoggerFactory.getLogger(CommonMigration.class); public void migrate () throws Exception { int totalRecords = getTotalRecords(); int stepLength = getStepLength(); LOG.info("start to migrate data from source db to target db" ); for (int offset = getInitialOffset(); offset < totalRecords; offset = offset + stepLength) { List<Map<String, Object>> rows = queryForList(getQuerySql(), offset, stepLength); batchInsert(rows); LOG.info("moved {} records" , offset); } } abstract protected List<Map<String, Object>> queryForList (String querySql, int offset, int stepLength) ; abstract protected String getQuerySql () ; abstract protected void batchInsert (List<Map<String, Object>> collocMaps) throws Exception; protected int getStepLength () { return 100 ; } protected int getInitialOffset () { return 0 ; } abstract protected int getTotalRecords () ; }
4.5. 具体实现细节 DataTableMigration.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 import java.sql.SQLException;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;import java.util.stream.IntStream;import javax.sql.DataSource;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.util.Assert;public class DataTableMigration extends CommonMigration { private final JdbcTemplate targetJdbc; private final JdbcTemplate sourceJdbc; private final String tableName; private final String primaryKey; private final String[] columnNamesInSourceDB; private final String[] columnNamesInTargetDB; private final Map<String, String> columnMappings; public DataTableMigration (DataSource sourceDataSource, String tableName, DataSource targetDataSource) throws SQLException { this (sourceDataSource, targetDataSource, tableName, new HashMap <>()); } public DataTableMigration (DataSource sourceDataSource, DataSource targetDataSource, String tableName, Map<String, String> columnMappings) throws SQLException { this .tableName = tableName.toLowerCase(); this .sourceJdbc = new JdbcTemplate (sourceDataSource); this .targetJdbc = new JdbcTemplate (targetDataSource); this .primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceDataSource.getConnection(), this .tableName); this .columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceDataSource.getConnection(), this .tableName); Assert.isTrue(this .columnNamesInSourceDB != null && this .columnNamesInSourceDB.length > 0 , "can't find column infor from source db for the table " + this .tableName); this .columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetDataSource.getConnection(), this 
.tableName); Assert.isTrue(this .columnNamesInTargetDB != null && this .columnNamesInTargetDB.length > 0 , "can't find column infor from target db for the table " + this .tableName); this .columnMappings = columnMappings; } protected JdbcTemplate getSourceJdbc () { return this .sourceJdbc; } protected JdbcTemplate getTargetJdbc () { return this .targetJdbc; } @Override protected List<Map<String, Object>> queryForList (String querySql, int offset, int stepLength) { return getSourceJdbc().queryForList(querySql, offset, stepLength); } @Override protected void batchInsert (List<Map<String, Object>> rows) throws SQLException { getTargetJdbc().batchUpdate(getInsertSQL(), rows.stream().map(this ::rowToParam) .collect(Collectors.toList())); } private Object[] rowToParam(Map<String, Object> row) { return Arrays.stream(columnNamesInTargetDB) .map(colInSource -> columnMappings.getOrDefault(colInSource, colInSource)) .map(row::get) .toArray(); } protected String getInsertSQL () { return String.format("insert into %s (%s) values(%s)" , this .tableName, String.join("," , columnNamesInTargetDB), IntStream.range(0 , columnNamesInTargetDB.length) .mapToObj(n -> "?" ) .collect(Collectors.joining("," ))); } @Override protected String getQuerySql () { return String.format("select %s" + " from %s" + " order by %s asc " + " limit ?, ?" , String.join("," , columnNamesInSourceDB), this .tableName, this .primaryKey); } @Override protected int getStepLength () { return 100 ; } @Override protected int getTotalRecords () { int count = getSourceJdbc().queryForObject( "select count(1) from " + tableName, Integer.class); return count; } }
所使用的工具类
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Helpers that read table structure through JDBC {@link DatabaseMetaData}.
 *
 * <p>The methods do not close the connection passed in; that stays the caller's job.
 */
public class MigrationUtils {

    /**
     * Looks up the primary key columns of the tables matching the given name pattern.
     *
     * @param conn open connection to the database to inspect
     * @param tableNamePattern table name pattern as accepted by {@link DatabaseMetaData#getTables}
     * @return the primary key column names joined with {@code ","}; an empty string if none
     *     are found
     * @throws SQLException if the metadata cannot be read
     */
    public static String getPrimaryKeyByTableName(Connection conn, String tableNamePattern) throws SQLException {
        DatabaseMetaData dbMetaData = conn.getMetaData();
        List<String> pkColList = new ArrayList<>();
        // try-with-resources: metadata result sets hold driver/server resources until closed.
        try (ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] {"TABLE"})) {
            while (tabs.next()) {
                try (ResultSet pkColumns = dbMetaData.getPrimaryKeys(null,
                        tabs.getString("TABLE_SCHEM"), tabs.getString("TABLE_NAME"))) {
                    while (pkColumns.next()) {
                        pkColList.add(pkColumns.getString("COLUMN_NAME"));
                    }
                }
            }
        }
        return String.join(",", pkColList);
    }

    /**
     * Lists the column names of the tables matching the given name pattern.
     *
     * @param conn open connection to the database to inspect
     * @param tableNamePattern table name pattern as accepted by {@link DatabaseMetaData#getTables}
     * @return the column names in the order the driver reports them; empty if none are found
     * @throws SQLException if the metadata cannot be read
     */
    public static String[] getColumnsByTableName(Connection conn, String tableNamePattern) throws SQLException {
        DatabaseMetaData dbMetaData = conn.getMetaData();
        String escape = dbMetaData.getSearchStringEscape();
        List<String> columnList = new ArrayList<>();
        try (ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] {"TABLE"})) {
            while (tabs.next()) {
                // getColumns takes *patterns* for schema and table. The values read here are
                // concrete names, so "_" and "%" in them must be escaped; otherwise a table
                // such as "user_info" would also pull in the columns of "userXinfo".
                try (ResultSet columns = dbMetaData.getColumns(null,
                        escapePattern(tabs.getString("TABLE_SCHEM"), escape),
                        escapePattern(tabs.getString("TABLE_NAME"), escape),
                        null)) {
                    while (columns.next()) {
                        columnList.add(columns.getString("COLUMN_NAME"));
                    }
                }
            }
        }
        return columnList.toArray(new String[0]);
    }

    /**
     * Escapes the JDBC pattern wildcards in a concrete identifier so that it matches only itself.
     *
     * @param identifier the name to escape; may be {@code null}
     * @param escape the driver's search string escape; {@code null} or empty disables escaping
     * @return the escaped name, or the input unchanged when nothing can be escaped
     */
    private static String escapePattern(String identifier, String escape) {
        if (identifier == null || escape == null || escape.isEmpty()) {
            return identifier;
        }
        return identifier
                .replace(escape, escape + escape) // escape the escape character itself first
                .replace("_", escape + "_")
                .replace("%", escape + "%");
    }
}
main 方法, 这里的数据库表, 可以通过 DatabaseMetaData 全部获取, 但是数据迁移项目需求各不相同, 可以加以改进适配到自己的项目中.
程序运行有两个前提: 一, 目标数据库表是空的, 否则会有主键冲突的状况; 二, 数据库表各字段的名和类型需要一致. 程序在 MySQL 和 PostgreSQL 之间进行了有限测试, 代码拿走不谢, 但是测试需要自己完成, 有问题欢迎反馈.
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { new DataTableMigration (sourceDataSource, "TABLE1" , targetDataSource).migrate(); new DataTableMigration (sourceDataSource, "TABLE2" , targetDataSource).migrate(); new DataTableMigration (sourceDataSource, "TABLE3" , targetDataSource).migrate(); new DataTableMigration (sourceDataSource, "TABLE4" , targetDataSource).migrate(); new DataTableMigration (sourceDataSource, "TABLE5" , targetDataSource).migrate(); }
5. 后记 写完这部分数据迁移工具后, 发现数据库结构的迁移也是可以通过通用代码来完成的, 目前忙于赶项目进度. 如果忙完手头的工作, 再来补充数据库表结构的迁移工作.
6. 参考文档 Java DatabaseMetaData getPrimaryKeys()方法与示例
liquibase 的 changelog 详解