Writing Your Own MySQL-to-PostgreSQL Database Migration Tool

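1. Preface

I have recently been working on migrating a database from MySQL to PostgreSQL.

The work falls into three parts. The first is building the database objects: tables, views and stored procedures. Because I introduced liquibase early in the project, this part was easy, so I did not write it up.

The second is changing the code so that it works with the new database. I have already written this up on Uncle Peng's tech blog (鹏叔的技术博客), in the post "从 Mariadb 迁移到 postgresql" (Migrating from MariaDB to PostgreSQL).

The third is data migration. Ready-made tools exist, but they are troublesome to configure, far more complex than I expected, and awkward to use. Rather than spend time learning a tool I would only use once, I decided to write my own migration tool. Hence this article.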

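2. Goal

Write a general-purpose tool that completes the data migration in as close to one step as possible. The user should not need to provide much information: at most the connection details of the source and target databases. Once the user has confirmed which tables to migrate, the tool does the rest automatically.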

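3. Approach

  1. First, connect to the two heterogeneous databases.
  2. Then read data from the source database in batches.
  3. Finally, write it into the target database.
  4. Wherever possible, build the query and insert statements from the database metadata.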
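
The first three steps can be sketched without any database. The following is a minimal in-memory illustration, not the tool itself: two lists stand in for the source and target tables, and the names `BatchCopySketch` and `copyInBatches` are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the read-batch / write-batch loop; no database involved.
class BatchCopySketch {

    // Copies `source` into `target` in slices of at most `batchSize` rows
    // and returns the number of batches that were written.
    static <T> int copyInBatches(List<T> source, List<T> target, int batchSize) {
        int batches = 0;
        for (int offset = 0; offset < source.size(); offset += batchSize) {
            int end = Math.min(offset + batchSize, source.size());
            List<T> batch = source.subList(offset, end); // plays the role of the batched select
            target.addAll(batch);                        // plays the role of the batch insert
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            source.add(i);
        }
        List<Integer> target = new ArrayList<>();
        int batches = copyInBatches(source, target, 100);
        System.out.println(batches + " batches, " + target.size() + " rows copied");
    }
}
```

Running it prints `3 batches, 250 rows copied`: two full batches of 100 and a final batch of 50.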

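4. Implementation

4.1. Create a Gradle project

Use Gradle to create a Java or Spring Boot project skeleton. Instructions for creating a Spring Boot or Java project can be found on my blog; only the basic command is listed here.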

gradle init

4.2. Add dependencies

runtimeOnly 'mysql:mysql-connector-java:5.1.37'
runtimeOnly 'org.postgresql:postgresql:42.5.1'

4.3. Create the data sources

Spring Boot manages the data sources here.

Config.java

import javax.sql.DataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class Config {

    // Target (PostgreSQL) data source, bound to the spring.target-db.* properties.
    @Bean
    @ConfigurationProperties(prefix = "spring.target-db")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }

    // Source (MySQL) data source, bound to the spring.source-db.* properties.
    // @Primary makes it the default when a DataSource is injected without a qualifier.
    @Bean
    @ConfigurationProperties(prefix = "spring.source-db")
    @Primary
    public DataSource sourceDataSource() {
        return DataSourceBuilder.create().build();
    }
}

application.properties

#target db
spring.target-db.jdbcUrl=jdbc:postgresql://localhost:5432/test
spring.target-db.username=postgres
spring.target-db.password=password
spring.target-db.driverClassName=org.postgresql.Driver

#source db
spring.source-db.jdbcUrl=jdbc:mysql://localhost:3306/test
spring.source-db.username=root
spring.source-db.password=password
spring.source-db.driverClassName=com.mysql.jdbc.Driver

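4.4. Abstract the migration steps

First get the total number of rows in the table, then query the source database in batches and write each batch into the target database.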

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CommonMigration {

    private static final Logger LOG = LoggerFactory.getLogger(CommonMigration.class);

    public void migrate() throws Exception {

        int totalRecords = getTotalRecords();
        int stepLength = getStepLength();
        LOG.info("start to migrate data from source db to target db");
        for (int offset = getInitialOffset(); offset < totalRecords; offset = offset + stepLength) {
            // Read one batch from the source and write it to the target.
            List<Map<String, Object>> rows = queryForList(getQuerySql(), offset, stepLength);
            batchInsert(rows);
            // Log the running position after the batch has been written.
            LOG.info("moved {} records", offset + rows.size());
        }
    }

    protected abstract List<Map<String, Object>> queryForList(String querySql, int offset, int stepLength);

    protected abstract String getQuerySql();

    protected abstract void batchInsert(List<Map<String, Object>> rows) throws Exception;

    // Number of rows read and written per batch.
    protected int getStepLength() {
        return 100;
    }

    // Offset of the first row to migrate.
    protected int getInitialOffset() {
        return 0;
    }

    protected abstract int getTotalRecords();

}
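
To see which batches the loop in `migrate()` requests, the offsets can be computed on their own. The sketch below is an illustration only: `BatchWindowSketch` and `windows` are hypothetical names, and using a non-zero `getInitialOffset()` to resume an interrupted run is an assumption about how that hook might be used, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

// Lists the {offset, rowsInBatch} pairs in the order the migrate() loop would request them.
class BatchWindowSketch {

    static List<int[]> windows(int totalRecords, int stepLength, int initialOffset) {
        List<int[]> result = new ArrayList<>();
        for (int offset = initialOffset; offset < totalRecords; offset += stepLength) {
            // The last batch may be shorter than stepLength.
            int rows = Math.min(stepLength, totalRecords - offset);
            result.add(new int[] { offset, rows });
        }
        return result;
    }

    public static void main(String[] args) {
        // 250 rows, batches of 100, starting at offset 100 (hypothetical resume point).
        for (int[] w : windows(250, 100, 100)) {
            System.out.println("offset=" + w[0] + " rows=" + w[1]);
        }
    }
}
```

This prints `offset=100 rows=100` followed by `offset=200 rows=50`. Resuming this way would only be safe if the source table has not changed since the interrupted run and the earlier rows are already in the target.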


4.5. Implementation details

DataTableMigration.java

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

public class DataTableMigration extends CommonMigration {

    private final JdbcTemplate targetJdbc;
    private final JdbcTemplate sourceJdbc;
    private final String tableName;
    private final String primaryKey;
    private final String[] columnNamesInSourceDB;
    private final String[] columnNamesInTargetDB;

    // Maps a target column name to the source column that feeds it.
    // Columns without an entry are read from the source column of the same name.
    private final Map<String, String> columnMappings;

    public DataTableMigration(DataSource sourceDataSource, String tableName, DataSource targetDataSource)
            throws SQLException {
        this(sourceDataSource, targetDataSource, tableName, new HashMap<>());
    }

    public DataTableMigration(DataSource sourceDataSource, DataSource targetDataSource, String tableName,
            Map<String, String> columnMappings) throws SQLException {
        this.tableName = tableName.toLowerCase();
        this.sourceJdbc = new JdbcTemplate(sourceDataSource);
        this.targetJdbc = new JdbcTemplate(targetDataSource);
        // Read the metadata, then close both connections so they are not leaked.
        try (Connection sourceConn = sourceDataSource.getConnection();
                Connection targetConn = targetDataSource.getConnection()) {
            this.primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceConn, this.tableName);
            this.columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceConn, this.tableName);
            this.columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetConn, this.tableName);
        }
        // The paging query orders by the primary key, so the table must have one.
        Assert.hasText(this.primaryKey,
                "can't find primary key from source db for the table " + this.tableName);
        Assert.isTrue(this.columnNamesInSourceDB != null && this.columnNamesInSourceDB.length > 0,
                "can't find column info from source db for the table " + this.tableName);
        Assert.isTrue(this.columnNamesInTargetDB != null && this.columnNamesInTargetDB.length > 0,
                "can't find column info from target db for the table " + this.tableName);
        this.columnMappings = columnMappings;
    }

    protected JdbcTemplate getSourceJdbc() {
        return this.sourceJdbc;
    }

    protected JdbcTemplate getTargetJdbc() {
        return this.targetJdbc;
    }

    @Override
    protected List<Map<String, Object>> queryForList(String querySql, int offset, int stepLength) {
        return getSourceJdbc().queryForList(querySql, offset, stepLength);
    }

    @Override
    protected void batchInsert(List<Map<String, Object>> rows) throws SQLException {
        getTargetJdbc().batchUpdate(getInsertSQL(),
                rows.stream().map(this::rowToParam)
                        .collect(Collectors.toList()));
    }

    // Builds the insert parameters in target-column order, looking each value up
    // in the source row under the mapped (or identical) source column name.
    private Object[] rowToParam(Map<String, Object> row) {
        return Arrays.stream(columnNamesInTargetDB)
                .map(colInTarget -> columnMappings.getOrDefault(colInTarget, colInTarget))
                .map(row::get)
                .toArray();
    }

    protected String getInsertSQL() {
        return String.format("insert into %s (%s) values(%s)",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length)
                        .mapToObj(n -> "?")
                        .collect(Collectors.joining(",")));
    }

    // MySQL paging syntax: "limit <offset>, <row count>".
    @Override
    protected String getQuerySql() {
        return String.format("select %s"
                + " from %s"
                + " order by %s asc"
                + " limit ?, ?",
                String.join(",", columnNamesInSourceDB),
                this.tableName,
                this.primaryKey);
    }

    @Override
    protected int getStepLength() {
        return 100;
    }

    @Override
    protected int getTotalRecords() {
        Integer count = getSourceJdbc().queryForObject(
                "select count(1) from " + tableName, Integer.class);
        return count == null ? 0 : count;
    }

}
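
To make the generated statements concrete, the sketch below builds them for a hypothetical table `users` with columns `id`, `name` and `email` (all made-up names). It also shows a keyset-paging variant of the select. That variant is an alternative technique, not what the class above does: with `limit ?, ?` MySQL generally still has to read and skip the offset rows, so late batches of a large table tend to get slower, whereas a `where pk > ?` filter can start from the last key seen. The variant assumes a single-column primary key.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Builds the kinds of statements DataTableMigration generates, for a hypothetical table.
class SqlSketch {

    static String insertSql(String table, List<String> columns) {
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + String.join(",", columns) + ") values(" + placeholders + ")";
    }

    // Offset paging, MySQL syntax: the parameters are (offset, rowCount).
    static String offsetSelectSql(String table, List<String> columns, String primaryKey) {
        return "select " + String.join(",", columns) + " from " + table
                + " order by " + primaryKey + " asc limit ?, ?";
    }

    // Keyset paging (alternative technique): the parameters are (lastSeenKey, rowCount).
    // Assumes a single-column primary key.
    static String keysetSelectSql(String table, List<String> columns, String primaryKey) {
        return "select " + String.join(",", columns) + " from " + table
                + " where " + primaryKey + " > ? order by " + primaryKey + " asc limit ?";
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "email");
        System.out.println(insertSql("users", columns));
        System.out.println(offsetSelectSql("users", columns, "id"));
        System.out.println(keysetSelectSql("users", columns, "id"));
    }
}
```

The three printed statements are `insert into users (id,name,email) values(?,?,?)`, `select id,name,email from users order by id asc limit ?, ?` and `select id,name,email from users where id > ? order by id asc limit ?`.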

The utility class used above (MigrationUtils.java):

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class MigrationUtils {

    // Returns the primary key column(s) of the matching table, joined with commas.
    // The result is an empty string if the table has no primary key.
    public static String getPrimaryKeyByTableName(Connection conn, String tableNamePattern) throws SQLException {

        DatabaseMetaData dbMetaData = conn.getMetaData();
        List<String> pkColList = new ArrayList<>();

        // try-with-resources closes the metadata result sets.
        try (ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] { "TABLE" })) {
            while (tabs.next()) {
                try (ResultSet resultSet = dbMetaData.getPrimaryKeys(null, tabs.getString("TABLE_SCHEM"),
                        tabs.getString("TABLE_NAME"))) {
                    while (resultSet.next()) {
                        pkColList.add(resultSet.getString("COLUMN_NAME"));
                    }
                }
            }
        }

        return String.join(",", pkColList);
    }

    // Returns the column names of the matching table.
    public static String[] getColumnsByTableName(Connection conn, String tableNamePattern) throws SQLException {

        DatabaseMetaData dbMetaData = conn.getMetaData();
        List<String> columnList = new ArrayList<>();

        try (ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] { "TABLE" })) {
            while (tabs.next()) {
                try (ResultSet resultSet = dbMetaData.getColumns(null, tabs.getString("TABLE_SCHEM"),
                        tabs.getString("TABLE_NAME"), null)) {
                    while (resultSet.next()) {
                        columnList.add(resultSet.getString("COLUMN_NAME"));
                    }
                }
            }
        }
        return columnList.toArray(new String[0]);
    }
}
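
One detail worth knowing about the utility class: `getTables` treats its table-name argument as a pattern, in which `_` matches any single character and `%` matches any sequence of characters. Looking up `user_role` could therefore also match a table such as `userXrole`. One way to guard against this, sketched here with the hypothetical names `PatternEscapeSketch` and `escapePattern`, is to escape the wildcards first. The sketch assumes the escape string would be obtained from `DatabaseMetaData.getSearchStringEscape()`; the example passes a backslash directly so it runs without a database.

```java
// Escapes JDBC metadata wildcards so that a table name is matched literally.
class PatternEscapeSketch {

    // `escape` would normally come from DatabaseMetaData.getSearchStringEscape().
    // Simplification: an escape string that occurs inside the name itself is not handled.
    static String escapePattern(String name, String escape) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (c == '_' || c == '%') {
                sb.append(escape); // prefix each wildcard with the escape string
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Backslash is used here as an assumed escape string.
        System.out.println(escapePattern("user_role", "\\"));
    }
}
```

With a backslash as the escape string this prints `user\_role`. The escaped value would be passed as the pattern to `getTables`, while the unescaped name would still be used in the generated SQL.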

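Finally, the main method. The list of tables could be obtained in full from DatabaseMetaData, but every data migration project has its own requirements, so adapt this part to your own project.

The program has two preconditions:

  1. The target tables are empty; otherwise primary key conflicts will occur.
  2. The column names and types of each table are the same in both databases.

The program has had only limited testing between MySQL and PostgreSQL. Feel free to take the code, but you need to test it yourself. Feedback on any problems is welcome.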

// sourceDataSource and targetDataSource are the beans defined in Config.java;
// obtain them from the Spring context before running the migrations.
public static void main(String[] args) throws Exception {
    new DataTableMigration(sourceDataSource, "TABLE1", targetDataSource).migrate();
    new DataTableMigration(sourceDataSource, "TABLE2", targetDataSource).migrate();
    new DataTableMigration(sourceDataSource, "TABLE3", targetDataSource).migrate();
    new DataTableMigration(sourceDataSource, "TABLE4", targetDataSource).migrate();
    new DataTableMigration(sourceDataSource, "TABLE5", targetDataSource).migrate();
}
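
The precondition that column names match can be partly checked before any data is moved. The sketch below is one possible pre-check, not part of the original tool: `ColumnCheckSketch` and `missingInTarget` are hypothetical names, and the input arrays stand in for the results of `MigrationUtils.getColumnsByTableName` on each side. It compares names only, not types, and ignores case because PostgreSQL folds unquoted identifiers to lower case.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Pre-check for the "same column names" precondition; types are not compared.
class ColumnCheckSketch {

    // Returns the source columns that have no same-named column in the target,
    // comparing names case-insensitively.
    static List<String> missingInTarget(String[] sourceColumns, String[] targetColumns) {
        Set<String> target = new HashSet<>();
        for (String column : targetColumns) {
            target.add(column.toLowerCase(Locale.ROOT));
        }
        List<String> missing = new ArrayList<>();
        for (String column : sourceColumns) {
            if (!target.contains(column.toLowerCase(Locale.ROOT))) {
                missing.add(column);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String[] source = { "id", "Name", "created_at" }; // made-up column names
        String[] target = { "id", "name" };
        System.out.println(missingInTarget(source, target));
    }
}
```

For the made-up columns above it prints `[created_at]`. A non-empty result would mean the table needs a column mapping or a schema fix before it is migrated.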

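5. Afterword

After finishing this data migration tool, I realized that the database structure could also be migrated with generic code. At the moment I am busy keeping up with the project schedule; once that work is done, I will come back and cover migrating the table structure.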

6. References

Java DatabaseMetaData getPrimaryKeys() method with examples

liquibase changelog explained