SourceTermAnalysisSystem_java/jeecg-module-sync/src/main/java/org/jeecg/OracleSync.java

531 lines
20 KiB
Java
Raw Normal View History

2025-09-28 14:45:34 +08:00
package org.jeecg;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
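/**
 * Oracle database synchronization utility.
 * Copies data in batches keyed on a configured column (a date or an ID)
 * and records the position reached so that a later run can resume.
 */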
public class OracleSync {
// Database connection settings.
// NOTE(review): URLs and credentials are hard-coded in source; consider moving them to external configuration.
private static final String SOURCE_URL = "jdbc:oracle:thin:@//127.0.0.1:1521/orcl";
private static final String TARGET_URL = "jdbc:oracle:thin:@//127.0.0.1:1521/orcl";
// Source login; also used as the owner (schema) name when querying source tables.
private static final String USERNAME = "idctest";
private static final String PASSWORD = "12345678";
// Target login; also used as the owner (schema) name when inserting into target tables.
private static final String TARGET_USER = "REB";
private static final String TARGET_PASSWORD = "REB";
// Batch sizing.
private static final int BATCH_SIZE_ID = 10000; // width of the ID range copied per batch
private static final int BATCH_SIZE_DAYS = 30; // number of days copied per date batch
// Path of the properties file that records sync progress.
private static final String SYNC_STATUS_FILE = "sync_status.properties";
/**
 * Describes one table to synchronize: the table name, the column that
 * drives batching, and the kind of value that column holds.
 */
static class TableConfig {
    /** Name of the table to synchronize. */
    String tableName;

    /** Name of the column used to split the table into batches. */
    String fieldName;

    /** Kind of the batching column (date or ID). */
    FieldType fieldType;

    /**
     * Creates a table configuration.
     *
     * @param tableName name of the table
     * @param fieldName name of the batching column
     * @param fieldType kind of the batching column
     */
    public TableConfig(String tableName, String fieldName, FieldType fieldType) {
        this.fieldType = fieldType;
        this.fieldName = fieldName;
        this.tableName = tableName;
    }
}
/**
 * Kinds of column that can drive batched synchronization.
 */
enum FieldType {
    /** The batching column holds dates. */
    DATE,

    /** The batching column holds IDs. */
    ID
}
/**
 * Holds the earliest and latest date found in a batching column.
 */
static class DateRange {
    /** Earliest date. */
    Date minDate;

    /** Latest date. */
    Date maxDate;

    /**
     * @param minDate earliest date
     * @param maxDate latest date
     */
    public DateRange(Date minDate, Date maxDate) {
        this.maxDate = maxDate;
        this.minDate = minDate;
    }
}
/**
 * Holds the smallest and largest ID found in a batching column.
 */
static class IdRange {
    /** Smallest ID. */
    long minId;

    /** Largest ID. */
    long maxId;

    /**
     * @param minId smallest ID
     * @param maxId largest ID
     */
    public IdRange(long minId, long maxId) {
        this.maxId = maxId;
        this.minId = minId;
    }
}
/**
 * Program entry point. Validates the configured tables, creates any that
 * are missing on the target, copies their data in batches, and always
 * writes the sync status back to disk before exiting.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    // Tables to synchronize, each with the column that drives batching.
    List<TableConfig> requested = Arrays.asList(
            new TableConfig("TABLE1", "CREATE_DATE", FieldType.DATE),
            new TableConfig("TABLE2", "ID", FieldType.ID));

    // Progress recorded by a previous run, if any.
    Properties syncStatus = loadSyncStatus();

    try {
        // Step 1: keep only the tables that exist and have the batching column.
        List<TableConfig> usable = validateTables(requested);
        if (!usable.isEmpty()) {
            // Step 2: create missing tables on the target.
            createTargetTables(usable);
            // Step 3: copy the data batch by batch.
            syncDataByFieldType(usable, syncStatus);
            System.out.println("所有指定表同步完成");
        } else {
            System.out.println("没有有效的表需要同步");
        }
    } catch (SQLException dbError) {
        System.err.println("数据库操作错误: " + dbError.getMessage());
        dbError.printStackTrace();
    } catch (Exception otherError) {
        System.err.println("系统错误: " + otherError.getMessage());
        otherError.printStackTrace();
    } finally {
        // Persist progress on every exit path.
        saveSyncStatus(syncStatus);
    }
}
/**
 * Filters the configured tables down to those that exist in the source
 * schema and contain their batching column. Rejected tables are reported
 * on standard output and skipped.
 *
 * @param tableConfigs table configurations to check
 * @return the configurations that passed both checks, in input order
 * @throws SQLException if the source database cannot be queried
 */
private static List<TableConfig> validateTables(List<TableConfig> tableConfigs) throws SQLException {
    List<TableConfig> accepted = new ArrayList<>();
    try (Connection source = DriverManager.getConnection(SOURCE_URL, USERNAME, PASSWORD)) {
        for (TableConfig candidate : tableConfigs) {
            // Guard 1: the table itself must exist.
            if (!isTableExists(source, USERNAME, candidate.tableName)) {
                System.out.println("警告: 表 " + candidate.tableName + " 在源数据库中不存在,已跳过");
                continue;
            }
            // Guard 2: the batching column must exist on that table.
            if (!isColumnExists(source, USERNAME, candidate.tableName, candidate.fieldName)) {
                System.out.println("警告: 表 " + candidate.tableName + " 中不存在字段 " + candidate.fieldName + ",已跳过");
                continue;
            }
            accepted.add(candidate);
            System.out.println("" + candidate.tableName + " 存在,将根据字段 " + candidate.fieldName + " 分批同步");
        }
    }
    return accepted;
}
/**
 * Ensures each configured table exists on the target, creating an empty
 * copy of the source structure when it is missing.
 *
 * @param tableConfigs tables to create on the target
 * @throws SQLException if a connection cannot be opened, or if checking
 *         for the table or building the DDL fails
 */
private static void createTargetTables(List<TableConfig> tableConfigs) throws SQLException {
    try (Connection source = DriverManager.getConnection(SOURCE_URL, USERNAME, PASSWORD);
         Connection target = DriverManager.getConnection(TARGET_URL, TARGET_USER, TARGET_PASSWORD)) {
        for (TableConfig config : tableConfigs) {
            try {
                // Nothing to do when the target already has the table.
                if (isTableExists(target, TARGET_USER, config.tableName)) {
                    System.out.println("" + config.tableName + " 已存在,跳过创建");
                    continue;
                }
                String ddl = getCreateTableSql(source, config.tableName);
                try (Statement ddlStatement = target.createStatement()) {
                    ddlStatement.execute(ddl);
                    System.out.println("" + config.tableName + " 创建成功");
                } catch (Exception creationFailure) {
                    // A failed CREATE is only printed; processing continues with the next table.
                    creationFailure.printStackTrace();
                }
            } catch (SQLException lookupFailure) {
                System.err.println("处理表 " + config.tableName + " 时出错: " + lookupFailure.getMessage());
                throw lookupFailure;
            }
        }
    }
}
/**
 * Copies every configured table from source to target, dispatching on the
 * kind of its batching column, and reports the elapsed time per table.
 *
 * @param tableConfigs tables to synchronize
 * @param syncStatus   progress store, read and updated by the per-table sync
 * @throws SQLException if a connection cannot be opened or a table fails
 *         to sync; the target connection is rolled back before rethrowing
 */
private static void syncDataByFieldType(List<TableConfig> tableConfigs, Properties syncStatus) throws SQLException {
    try (Connection source = DriverManager.getConnection(SOURCE_URL, USERNAME, PASSWORD);
         Connection target = DriverManager.getConnection(TARGET_URL, TARGET_USER, TARGET_PASSWORD)) {
        // Commits on the target are issued explicitly by the batch copier.
        target.setAutoCommit(false);

        for (TableConfig config : tableConfigs) {
            System.out.println("开始同步表: " + config.tableName + " (依据字段: " + config.fieldName + ")");
            final long startedAt = System.currentTimeMillis();
            try {
                FieldType kind = config.fieldType;
                if (kind == FieldType.DATE) {
                    syncByDateRange(source, target, config, syncStatus);
                } else if (kind == FieldType.ID) {
                    syncByIdRange(source, target, config, syncStatus);
                }
            } catch (SQLException failure) {
                System.err.println("同步表 " + config.tableName + " 时出错: " + failure.getMessage());
                target.rollback();
                throw failure;
            }
            final long finishedAt = System.currentTimeMillis();
            System.out.println("" + config.tableName + " 同步耗时: " + (finishedAt - startedAt) + " 毫秒");
        }
    }
}
/**
 * Copies one table in consecutive date windows of at most
 * {@code BATCH_SIZE_DAYS} days, saving a checkpoint after every window.
 *
 * <p>Windows are half-open, {@code [start, end)}, at day granularity. The
 * overall upper bound is the day after the newest value, so rows that carry
 * a time-of-day on the last day are included, a table whose rows all fall
 * on a single day is still copied, and no row sits in two windows.
 * The stored checkpoint is the exclusive end of the last completed window
 * and is the first day the next run will read.
 *
 * <p>Rows added later to a day that has already been checkpointed are not
 * picked up by a subsequent run.
 *
 * @param sourceConn connection to the source database
 * @param targetConn connection to the target database
 * @param config     table configuration (table name and date column)
 * @param syncStatus progress store; updated and saved after each window
 * @throws SQLException if reading the range or copying a window fails
 */
private static void syncByDateRange(Connection sourceConn, Connection targetConn,
        TableConfig config, Properties syncStatus) throws SQLException {
    DateRange dateRange = getDateRange(sourceConn, config);

    // MIN/MAX are NULL when the table is empty or the column holds only NULLs.
    if (dateRange.minDate == null || dateRange.maxDate == null) {
        System.out.println("表 " + config.tableName + " 的字段 " + config.fieldName
                + " 没有可用于同步的日期值,已跳过");
        return;
    }
    System.out.println("日期范围: " + dateRange.minDate + " ~ " + dateRange.maxDate);

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    // Exclusive upper bound: the day after the newest row.
    Date endExclusive = addDays(dateRange.maxDate, 1);

    // Resume from the checkpoint when it lies beyond the oldest row.
    Date lastSyncedDate = getLastSyncedDate(config.tableName, syncStatus);
    Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.minDate))
            ? lastSyncedDate
            : dateRange.minDate;

    while (currentStart.before(endExclusive)) {
        Date currentEnd = addDays(currentStart, BATCH_SIZE_DAYS);
        if (currentEnd.after(endExclusive)) {
            currentEnd = endExclusive;
        }

        String startDay = sdf.format(currentStart);
        String endDay = sdf.format(currentEnd);
        // Half-open window so that adjacent windows never overlap.
        String whereClause = config.fieldName + " >= TO_DATE('" + startDay + "', 'YYYY-MM-DD') AND "
                + config.fieldName + " < TO_DATE('" + endDay + "', 'YYYY-MM-DD')";

        System.out.println("同步日期范围: [" + startDay + ", " + endDay + ")");
        int rowsSynced = syncDataBatch(sourceConn, targetConn, config.tableName, whereClause);
        System.out.println("已同步 " + rowsSynced + " 行数据");

        // Record the position and persist it immediately.
        syncStatus.setProperty(config.tableName + ".lastSyncedDate", endDay);
        saveSyncStatus(syncStatus);

        currentStart = currentEnd;
    }
}
/**
 * Copies one table in consecutive inclusive ID ranges of at most
 * {@code BATCH_SIZE_ID} values, saving a checkpoint after every range.
 *
 * <p>The checkpoint holds the last ID already copied, so a resumed run
 * starts at the following ID. When the checkpoint already covers the
 * largest ID in the source, nothing is copied.
 *
 * @param sourceConn connection to the source database
 * @param targetConn connection to the target database
 * @param config     table configuration (table name and ID column)
 * @param syncStatus progress store; updated and saved after each range
 * @throws SQLException if reading the range or copying a batch fails
 */
private static void syncByIdRange(Connection sourceConn, Connection targetConn,
        TableConfig config, Properties syncStatus) throws SQLException {
    IdRange idRange = getIdRange(sourceConn, config);
    System.out.println("ID范围: " + idRange.minId + " ~ " + idRange.maxId);

    // 0 means "no checkpoint recorded".
    long lastSyncedId = getLastSyncedId(config.tableName, syncStatus);

    long currentStart = idRange.minId;
    if (lastSyncedId > 0 && lastSyncedId >= idRange.minId) {
        if (lastSyncedId >= idRange.maxId) {
            System.out.println("表 " + config.tableName + " 没有新的数据需要同步");
            return;
        }
        // The checkpointed ID was already copied; resume one past it.
        // Safe from overflow because lastSyncedId < maxId here.
        currentStart = lastSyncedId + 1;
    }

    while (currentStart <= idRange.maxId) {
        long currentEnd = currentStart + BATCH_SIZE_ID - 1;
        // Clamp to the largest ID; the second test catches arithmetic overflow.
        if (currentEnd > idRange.maxId || currentEnd < currentStart) {
            currentEnd = idRange.maxId;
        }

        String whereClause = config.fieldName + " BETWEEN " + currentStart + " AND " + currentEnd;
        System.out.println("同步ID范围: " + currentStart + " ~ " + currentEnd);
        int rowsSynced = syncDataBatch(sourceConn, targetConn, config.tableName, whereClause);
        System.out.println("已同步 " + rowsSynced + " 行数据");

        // Record the position and persist it immediately.
        syncStatus.setProperty(config.tableName + ".lastSyncedId", String.valueOf(currentEnd));
        saveSyncStatus(syncStatus);

        if (currentEnd == idRange.maxId) {
            break; // done; also avoids overflowing currentEnd + 1
        }
        currentStart = currentEnd + 1;
    }
}
/**
 * Reads the earliest and latest value of the batching column.
 *
 * <p>The table is referenced with a quoted, upper-cased schema and a quoted
 * table name, the same form the batch copier uses for its SELECT.
 *
 * @param conn   connection to the source database
 * @param config table configuration (table name and date column)
 * @return the date range; both dates are {@code null} when the table is
 *         empty or the column holds only NULLs
 * @throws SQLException if the query fails or returns no row
 */
private static DateRange getDateRange(Connection conn, TableConfig config) throws SQLException {
    // Locale.ROOT keeps the schema name stable under locales with special case rules.
    String qualifiedTable = "\"" + USERNAME.toUpperCase(Locale.ROOT) + "\".\"" + config.tableName + "\"";
    String sql = "SELECT MIN(" + config.fieldName + ") as min_date, MAX(" + config.fieldName
            + ") as max_date FROM " + qualifiedTable;
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        if (rs.next()) {
            return new DateRange(rs.getDate("min_date"), rs.getDate("max_date"));
        }
    }
    throw new SQLException("无法获取日期范围");
}
/**
 * Reads the smallest and largest value of the batching column.
 *
 * <p>The table is referenced with a quoted, upper-cased schema and a quoted
 * table name, the same form the batch copier uses for its SELECT.
 *
 * @param conn   connection to the source database
 * @param config table configuration (table name and ID column)
 * @return the ID range; both bounds are 0 when the table is empty or the
 *         column holds only NULLs, because {@code getLong} maps SQL NULL to 0
 * @throws SQLException if the query fails or returns no row
 */
private static IdRange getIdRange(Connection conn, TableConfig config) throws SQLException {
    // Locale.ROOT keeps the schema name stable under locales with special case rules.
    String qualifiedTable = "\"" + USERNAME.toUpperCase(Locale.ROOT) + "\".\"" + config.tableName + "\"";
    String sql = "SELECT MIN(" + config.fieldName + ") as min_id, MAX(" + config.fieldName
            + ") as max_id FROM " + qualifiedTable;
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        if (rs.next()) {
            return new IdRange(rs.getLong("min_id"), rs.getLong("max_id"));
        }
    }
    throw new SQLException("无法获取ID范围");
}
/**
 * Copies the rows matching {@code whereClause} from the source table to
 * the same-named table on the target.
 *
 * <p>Rows are sent to the target in JDBC batches of 1000, and the whole
 * range is committed once at the end. A failure part-way through therefore
 * leaves nothing from this range committed (the caller rolls back), which
 * keeps the target consistent with the per-range checkpoint.
 *
 * @param sourceConn  connection to the source database
 * @param targetConn  connection to the target database; expected to have
 *                    auto-commit disabled
 * @param tableName   table to copy (used verbatim inside double quotes)
 * @param whereClause SQL predicate selecting the rows of this range
 * @return number of rows copied
 * @throws SQLException if reading, inserting, or committing fails
 */
private static int syncDataBatch(Connection sourceConn, Connection targetConn,
        String tableName, String whereClause) throws SQLException {
    final int rowsPerFlush = 1000;
    int totalRows = 0;

    // Quoted identifiers preserve the exact case of the table name.
    String selectSql = "SELECT * FROM \"" + USERNAME.toUpperCase(Locale.ROOT) + "\".\"" + tableName
            + "\" WHERE " + whereClause;

    try (Statement sourceStmt = sourceConn.createStatement(
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // Must be set before the query runs to apply to its result set.
        sourceStmt.setFetchSize(1000);

        try (ResultSet rs = sourceStmt.executeQuery(selectSql)) {
            int columnCount = rs.getMetaData().getColumnCount();

            // INSERT with one positional placeholder per source column.
            StringBuilder insertSql = new StringBuilder("INSERT INTO \"")
                    .append(TARGET_USER.toUpperCase(Locale.ROOT)).append("\".\"")
                    .append(tableName).append("\" VALUES (");
            for (int i = 1; i <= columnCount; i++) {
                if (i > 1) {
                    insertSql.append(", ");
                }
                insertSql.append("?");
            }
            insertSql.append(")");

            try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql.toString())) {
                int pending = 0;
                while (rs.next()) {
                    for (int i = 1; i <= columnCount; i++) {
                        pstmt.setObject(i, rs.getObject(i));
                    }
                    pstmt.addBatch();
                    pending++;
                    totalRows++;
                    if (pending == rowsPerFlush) {
                        pstmt.executeBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    pstmt.executeBatch();
                }
            }

            // One commit per range, so a range is either fully copied or not at all.
            if (totalRows > 0) {
                targetConn.commit();
            }
        }
    }
    return totalRows;
}
/**
 * Checks whether a table exists in the given schema. The table name is
 * matched both upper-cased and exactly as given.
 *
 * @param conn      database connection
 * @param schema    owner (schema) name; compared upper-cased
 * @param tableName table name
 * @return {@code true} if {@code all_tables} lists a matching table
 * @throws SQLException if the lookup fails
 */
private static boolean isTableExists(Connection conn, String schema, String tableName) throws SQLException {
    String sql = "SELECT COUNT(*) FROM all_tables WHERE owner = ? AND (table_name = ? OR table_name = ?)";
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        // Locale.ROOT: identifier case mapping must not depend on the JVM's default locale.
        pstmt.setString(1, schema.toUpperCase(Locale.ROOT));
        pstmt.setString(2, tableName.toUpperCase(Locale.ROOT));
        pstmt.setString(3, tableName);
        try (ResultSet rs = pstmt.executeQuery()) {
            return rs.next() && rs.getInt(1) > 0;
        }
    }
}
/**
 * Checks whether a column exists on a table in the given schema. Schema,
 * table, and column names are all compared upper-cased.
 *
 * <p>NOTE(review): the table name is matched upper-cased only, so a table
 * created with a quoted mixed-case name is reported as lacking the column.
 *
 * @param conn       database connection
 * @param schema     owner (schema) name
 * @param tableName  table name
 * @param columnName column name
 * @return {@code true} if {@code all_tab_columns} lists a matching column
 * @throws SQLException if the lookup fails
 */
private static boolean isColumnExists(Connection conn, String schema, String tableName, String columnName) throws SQLException {
    String sql = "SELECT COUNT(*) FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        // Locale.ROOT: identifier case mapping must not depend on the JVM's default locale.
        pstmt.setString(1, schema.toUpperCase(Locale.ROOT));
        pstmt.setString(2, tableName.toUpperCase(Locale.ROOT));
        pstmt.setString(3, columnName.toUpperCase(Locale.ROOT));
        try (ResultSet rs = pstmt.executeQuery()) {
            return rs.next() && rs.getInt(1) > 0;
        }
    }
}
/**
 * Builds a CREATE TABLE ... AS SELECT statement that copies the structure
 * of a source table without any rows (the SELECT is filtered by 1=0).
 *
 * @param conn  connection used to confirm the table exists in the source schema
 * @param table table name
 * @return the DDL statement text
 * @throws SQLException if the table is not found in the source schema or
 *         the existence check fails
 */
private static String getCreateTableSql(Connection conn, String table) throws SQLException {
    boolean presentInSource = isTableExists(conn, USERNAME, table);
    if (presentInSource) {
        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE \"").append(table);
        ddl.append("\" AS SELECT * FROM \"").append(USERNAME.toUpperCase());
        ddl.append("\".\"").append(table).append("\" WHERE 1=0");
        return ddl.toString();
    }
    throw new SQLException("" + table + " 在源数据库中不存在");
}
/**
 * Loads the sync status written by a previous run.
 *
 * <p>A missing file is normal on the first run and yields an empty result
 * silently. Any other failure is reported on standard error, because
 * starting without the recorded position makes the sync begin again from
 * the start of each table.
 *
 * @return the loaded properties; empty when no status file exists
 */
private static Properties loadSyncStatus() {
    Properties props = new Properties();
    // try-with-resources closes the stream, which was previously leaked.
    try (FileInputStream in = new FileInputStream(SYNC_STATUS_FILE)) {
        props.load(in);
    } catch (FileNotFoundException ignored) {
        // Expected when no checkpoint has been written yet.
    } catch (Exception e) {
        System.err.println("无法读取同步状态: " + e.getMessage());
    }
    return props;
}
/**
 * Writes the sync status to the status file. Failures are reported on
 * standard error and otherwise ignored, so a write problem does not stop
 * the sync.
 *
 * @param syncStatus properties to persist
 */
private static void saveSyncStatus(Properties syncStatus) {
    // This runs after every batch, so the stream must be closed each time
    // to avoid accumulating open file handles.
    try (FileOutputStream out = new FileOutputStream(SYNC_STATUS_FILE)) {
        syncStatus.store(out, "Oracle Sync Status - " + new Date());
    } catch (Exception e) {
        System.err.println("无法保存同步状态: " + e.getMessage());
    }
}
/**
 * Looks up the date checkpoint recorded for a table.
 *
 * @param tableName  table name; the key read is {@code <tableName>.lastSyncedDate}
 * @param syncStatus progress store
 * @return the recorded date, or {@code null} when none is stored or the
 *         stored text cannot be parsed as {@code yyyy-MM-dd}
 */
private static Date getLastSyncedDate(String tableName, Properties syncStatus) {
    Date checkpoint = null;
    try {
        String stored = syncStatus.getProperty(tableName + ".lastSyncedDate");
        if (stored != null) {
            SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
            checkpoint = dayFormat.parse(stored);
        }
    } catch (Exception e) {
        // An unreadable checkpoint is reported and treated as absent.
        System.err.println("解析上次同步日期失败: " + e.getMessage());
    }
    return checkpoint;
}
/**
 * Looks up the ID checkpoint recorded for a table.
 *
 * @param tableName  table name; the key read is {@code <tableName>.lastSyncedId}
 * @param syncStatus progress store
 * @return the recorded ID, or 0 when none is stored or the stored text is
 *         not a valid long
 */
private static long getLastSyncedId(String tableName, Properties syncStatus) {
    long checkpoint = 0;
    try {
        String stored = syncStatus.getProperty(tableName + ".lastSyncedId");
        if (stored != null) {
            checkpoint = Long.parseLong(stored);
        }
    } catch (Exception e) {
        // An unreadable checkpoint is reported and treated as absent.
        System.err.println("解析上次同步ID失败: " + e.getMessage());
    }
    return checkpoint;
}
/**
 * Returns the given date shifted by a number of calendar days.
 *
 * <p>The shift uses calendar arithmetic in the default time zone, so the
 * wall-clock time of day is preserved across daylight-saving transitions.
 * Adding a fixed 24 hours per day would drift by an hour there and could
 * land on the wrong calendar day.
 *
 * @param date base date
 * @param days number of days to add; may be negative
 * @return a new date {@code days} calendar days after {@code date}
 */
private static Date addDays(Date date, int days) {
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(date);
    calendar.add(Calendar.DAY_OF_MONTH, days);
    return calendar.getTime();
}
}