From 268cacd2c1cabf969aa46815ea84d2c919bd9d91 Mon Sep 17 00:00:00 2001 From: hekaiyu <13673834656@163.com> Date: Mon, 13 Oct 2025 18:16:45 +0800 Subject: [PATCH] =?UTF-8?q?oracle=E5=90=8C=E6=AD=A5=E5=88=B0pg=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/jeecg/common/util/DBUtil.java | 4 + .../impl/StasSyncStrategyServiceImpl.java | 208 +++++++++++++-- .../controller/StasTaskConfigController.java | 12 +- .../org/jeecg/taskConfig/job/SyncDataJob.java | 250 ++++++++++-------- 4 files changed, 339 insertions(+), 135 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java index f24e632..218dc18 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java @@ -30,6 +30,10 @@ public class DBUtil { return String.format("%s%s:%s%s", DBUtil.ORACLE_URL_PREFIX, ip, port, serveId); } + public static String getPgUrl(String ip, Integer port, String serveId){ + return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId); + } + public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{ try { Class.forName(Driver); diff --git a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java index 72698f7..ae1f42d 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java @@ -1,6 +1,7 @@ package org.jeecg.stasSyncStrategy.service.impl; import lombok.RequiredArgsConstructor; +import org.jeecg.common.constant.enums.SourceDataTypeEnum; import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.StasDataSource; @@ -16,6 +17,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.transaction.annotation.Transactional; import java.sql.*; +import java.util.ArrayList; +import java.util.List; /** * @Description: 同步策略表 @@ -48,14 +51,14 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl pkColumns = new ArrayList<>(); + + while (rs.next()) { + pkColumns.add(rs.getString("column_name")); + } + + if (!pkColumns.isEmpty()) { + sqlBuilder.append(",\n PRIMARY KEY ("); + for (int i = 0; i < pkColumns.size(); i++) { + if (i > 0) { + sqlBuilder.append(", "); + } + sqlBuilder.append("\"").append(pkColumns.get(i)).append("\""); + } + sqlBuilder.append(")"); + } + } + + sqlBuilder.append("\n)"); + return sqlBuilder.toString(); + } + + /** + * Oracle数据类型到PostgreSQL的映射 + */ + private static String mapOracleTypeToPg(String oracleType, int length, int precision, int scale) { + switch (oracleType.toUpperCase()) { + case "VARCHAR2": + return "VARCHAR(" + (length > 0 ? length : 255) + ")"; + case "NVARCHAR2": + return "VARCHAR(" + (length > 0 ? length : 255) + ")"; + case "CHAR": + return "CHAR(" + (length > 0 ? length : 1) + ")"; + case "NCHAR": + return "CHAR(" + (length > 0 ? length : 1) + ")"; + case "NUMBER": + if (precision == 0 && scale == 0) { + return "NUMERIC"; // 未指定精度的NUMBER + } else if (scale == 0) { + return "NUMERIC(" + precision + ")"; // 整数 + } else { + return "NUMERIC(" + precision + "," + scale + ")"; // 小数 + } + case "FLOAT": + return "DOUBLE PRECISION"; + case "DATE": + return "TIMESTAMP"; + case "TIMESTAMP": + return "TIMESTAMP"; + case "CLOB": + return "TEXT"; + case "BLOB": + return "BYTEA"; + case "RAW": + return "BYTEA"; + case "LONG": + return "TEXT"; + case "LONG RAW": + return "BYTEA"; + default: + return "TEXT"; // 默认转换为TEXT + } } /** @@ -125,15 +271,28 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl 0; } @@ -146,15 +305,26 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl 0; } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java index abf675f..4791534 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java @@ -168,8 +168,10 @@ public class StasTaskConfigController extends JeecgController pause(@RequestParam(name="taskId",required=true) String taskId) { - StasTaskConfig byId = stasTaskConfigService.getById(taskId); - stasTaskConfigService.pauseQuartzJob(byId.getQuartzId()); + StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId); + stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey()); + stasTaskConfigService.updateById(stasTaskConfig); + stasTaskConfigService.pauseQuartzJob(stasTaskConfig.getQuartzId()); return Result.OK("暂停成功!"); } @@ -183,8 +185,10 @@ public class StasTaskConfigController extends JeecgController resume(@RequestParam(name="taskId",required=true) String taskId) { - StasTaskConfig byId = stasTaskConfigService.getById(taskId); - stasTaskConfigService.resumeQuartzJob(byId.getQuartzId()); + StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId); + stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey()); + stasTaskConfigService.updateById(stasTaskConfig); + stasTaskConfigService.resumeQuartzJob(stasTaskConfig.getQuartzId()); return Result.OK("启动成功!"); } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java index 6ace6f3..ea0bd57 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.jeecg.common.constant.enums.SourceDataTypeEnum; import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.StasDataSource; @@ -25,9 +26,7 @@ import java.util.Date; import java.util.List; /** - * 示例带参定时任务 - * - * @Author Scott + * 数据同步任务(支持Oracle和PostgreSQL) */ @Slf4j public class SyncDataJob implements Job { @@ -39,9 +38,6 @@ public class SyncDataJob implements Job { @Resource private StasSyncStrategyMapper stasSyncStrategyMapper; - /** - * 若参数变量名修改 QuartzJobController中也需对应修改 - */ private String parameter; public void setParameter(String parameter) { @@ -50,102 +46,106 @@ public class SyncDataJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - try { - syncDataByFieldType(parameter); - } catch (SQLException e) { - throw new JeecgBootException(e.getMessage()); - } - } + try { + syncDataByFieldType(parameter); + } catch (SQLException e) { + throw new JeecgBootException(e.getMessage()); + } + } /** * 根据字段类型同步数据 - * @param taskId 任务id - * @throws SQLException 数据库异常 */ public void syncDataByFieldType(String taskId) throws SQLException { StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId); StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId()); StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId()); + String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); - String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); + String targetUrl; + if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){ + targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); + }else{ + targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); + } + try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword()); Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) { // 设置目标连接为批量提交模式 targetConn.setAutoCommit(false); - List stasSyncStrategies = stasSyncStrategyMapper. - selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); + List stasSyncStrategies = stasSyncStrategyMapper + .selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); + for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { - System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")"); + log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()); long startTime = System.currentTimeMillis(); try { - if (isDateColumn(sourceConn,stasSyncStrategy)) { - syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay()); + if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) { + syncByDateRange(sourceConn, targetConn, stasSyncStrategy, + stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType()); } else { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount()); + syncByIdRange(sourceConn, targetConn, stasSyncStrategy, + stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType()); } - } catch (SQLException e) { - System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage()); + } catch (SQLException | ParseException e) { + log.error("同步表 {} 时出错: {}", stasSyncStrategy.getTableName(), e.getMessage()); targetConn.rollback(); throw new JeecgBootException(e.getMessage()); - } catch (ParseException e) { - throw new JeecgBootException(e.getMessage()); } long endTime = System.currentTimeMillis(); - System.out.println("表 " + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒"); + log.info("表 {} 同步耗时: {} 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)); } } } - public boolean isDateColumn(Connection sourceConn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT data_type FROM all_tab_columns " - + "WHERE owner = ? AND table_name = ? AND column_name = ?"; + /** + * 判断字段是否为日期类型 + */ + public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + String sql; + if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { + sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?"; + } else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) { + sql = "SELECT data_type FROM information_schema.columns " + + "WHERE table_schema = ? AND table_name = ? AND column_name = ?"; + } else { + throw new SQLException("不支持的数据库类型: " + dbType); + } - try (PreparedStatement pstmt = sourceConn.prepareStatement(sql)) { + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase()); pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase()); pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase()); ResultSet rs = pstmt.executeQuery(); if (rs.next()) { - String dataType = rs.getString("data_type"); + String dataType = rs.getString("data_type").toUpperCase(); // 判断是否为日期/时间类型 - return dataType != null && - (dataType.equalsIgnoreCase("DATE") || - dataType.equalsIgnoreCase("TIMESTAMP") || - dataType.equalsIgnoreCase("TIMESTAMP WITH TIME ZONE") || - dataType.equalsIgnoreCase("TIMESTAMP WITH LOCAL TIME ZONE")); + return dataType.contains("DATE") || dataType.contains("TIME"); } - } catch (SQLException e) { - e.printStackTrace(); - throw e; // 重新抛出异常,让调用方处理 } - return false; } /** * 按日期范围同步数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param stasSyncStrategy 表配置 - * @param syncCount 同步起始位置 - * @throws SQLException 数据库异常 */ public void syncByDateRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException, ParseException { + StasSyncStrategy stasSyncStrategy, Integer syncCount, + Integer sourceDbType, Integer targetDbType) throws SQLException, ParseException { // 获取最小和最大日期 - DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy); - System.out.println("日期范围: " + dateRange.getMinDate() + " 至 " + dateRange.getMaxDate()); + DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); + log.info("日期范围: {} 至 {}", dateRange.getMinDate(), dateRange.getMaxDate()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 获取上次同步的位置 String syncOrigin = stasSyncStrategy.getSyncOrigin(); - Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(stasSyncStrategy.getSyncOrigin()) : dateRange.getMinDate(); + Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate(); Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? lastSyncedDate : dateRange.getMinDate(); @@ -154,36 +154,40 @@ public class SyncDataJob implements Job { currentEnd = dateRange.getMaxDate(); } - String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + - "', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')"; + // 根据数据库类型构建不同的日期条件 + String whereClause; + if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { + whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + + "', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')"; + } else { + whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TIMESTAMP '" + sdf.format(currentStart) + + "' AND TIMESTAMP '" + sdf.format(currentEnd) + "'"; + } - System.out.println("同步日期范围: " + sdf.format(currentStart) + " 至 " + sdf.format(currentEnd)); + log.info("同步日期范围: {} 至 {}", sdf.format(currentStart), sdf.format(currentEnd)); int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause); - System.out.println("已同步 " + rowsSynced + " 行数据"); + stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), + whereClause, sourceDbType, targetDbType); + log.info("已同步 {} 行数据", rowsSynced); - // 只在最后记录同步位置 + // 更新同步位置 if (currentEnd != null) { stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd)); stasSyncStrategyMapper.updateById(stasSyncStrategy); - System.out.println("最终同步位置已更新为: " + sdf.format(currentEnd)); + log.info("最终同步位置已更新为: {}", sdf.format(currentEnd)); } } /** * 按ID范围同步数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param stasSyncStrategy 表配置 - * @param syncCount 每次同步的记录数 - * @throws SQLException 数据库异常 */ public void syncByIdRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException { + StasSyncStrategy stasSyncStrategy, Integer syncCount, + Integer sourceDbType, Integer targetDbType) throws SQLException { // 获取最小和最大ID - IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy); - System.out.println("ID范围: " + idRange.getMinId() + " 至 " + idRange.getMaxId()); + IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); + log.info("ID范围: {} 至 {}", idRange.getMinId(), idRange.getMaxId()); // 获取上次同步的位置 String syncOrigin = stasSyncStrategy.getSyncOrigin(); @@ -198,35 +202,40 @@ public class SyncDataJob implements Job { String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; - System.out.println("同步ID范围: " + currentStart + " 至 " + currentEnd); + log.info("同步ID范围: {} 至 {}", currentStart, currentEnd); int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause); - System.out.println("已同步 " + rowsSynced + " 行数据"); + stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), + whereClause, sourceDbType, targetDbType); + log.info("已同步 {} 行数据", rowsSynced); - // 只在最后记录同步位置 + // 更新同步位置 if (currentEnd > 0) { stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd)); stasSyncStrategyMapper.updateById(stasSyncStrategy); - System.out.println("最终同步位置已更新为: " + currentEnd); + log.info("最终同步位置已更新为: {}", currentEnd); } } /** * 获取表的日期范围 - * @param conn 数据库连接 - * @param stasSyncStrategy 表配置 - * @return 日期范围对象 - * @throws SQLException 数据库异常 */ - public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, MAX(" + stasSyncStrategy.getColumnName() + ") as max_date FROM " + - stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName(); + public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + String sql; + if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + + "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + } else { + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + + "FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + } try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) { if (rs.next()) { - return new DateRangeVO(rs.getDate("min_date"), rs.getDate("max_date")); + return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date")); } } throw new SQLException("无法获取日期范围"); @@ -234,14 +243,18 @@ public class SyncDataJob implements Job { /** * 获取表的ID范围 - * @param conn 数据库连接 - * @param stasSyncStrategy 表配置 - * @return ID范围对象 - * @throws SQLException 数据库异常 */ - public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, MAX(" + stasSyncStrategy.getColumnName() + ") as max_id FROM " + - stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName(); + public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + String sql; + if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + + "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + } else { + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + + "FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + } try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) { @@ -253,50 +266,66 @@ public class SyncDataJob implements Job { } /** - * 同步一批数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param tableName 表名 - * @param whereClause 条件子句 - * @return 同步的行数 - * @throws SQLException 数据库异常 + * 同步一批数据(支持跨数据库类型) */ public int syncDataBatch(Connection sourceConn, Connection targetConn, String sourceOwner, String targetOwner, - String tableName, String whereClause) throws SQLException { + String tableName, String whereClause, + Integer sourceDbType, Integer targetDbType) throws SQLException { int totalRows = 0; + String selectSql; - // 从源表读取数据(使用带引号的表名保持大小写) - String selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + - "\" WHERE " + whereClause; + // 构建查询SQL(根据源数据库类型) + if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { + selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + + "\" WHERE " + whereClause; + } else { + selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName + + "\" WHERE " + whereClause; + } try (Statement sourceStmt = sourceConn.createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = sourceStmt.executeQuery(selectSql)) { - // 设置获取大小为5000,优化大表读取 - sourceStmt.setFetchSize(5000); - - // 获取列信息 + sourceStmt.setFetchSize(5000); // 优化大表读取 ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); - // 准备插入语句(使用带引号的表名保持大小写) - StringBuilder insertSql = new StringBuilder("INSERT INTO \"") - .append(targetOwner.toUpperCase()).append("\".\"").append(tableName).append("\" VALUES ("); - for (int i = 1; i <= columnCount; i++) { - if (i > 1) insertSql.append(", "); - insertSql.append("?"); + // 构建插入SQL(根据目标数据库类型) + String insertSql; + if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { + insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; + } else { + insertSql = "INSERT INTO \"" + tableName + "\" VALUES ("; } - insertSql.append(")"); + + for (int i = 1; i <= columnCount; i++) { + if (i > 1) insertSql += ", "; + insertSql += "?"; + } + insertSql += ")"; // 批量插入 - try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql.toString())) { + try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) { int batchSize = 0; while (rs.next()) { for (int i = 1; i <= columnCount; i++) { - pstmt.setObject(i, rs.getObject(i)); + Object value = rs.getObject(i); + + // 特殊处理Oracle的TIMESTAMP类型到PostgreSQL + if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) { + String columnType = metaData.getColumnTypeName(i); + if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { + Timestamp ts = rs.getTimestamp(i); + if (ts != null) { + pstmt.setTimestamp(i, ts); + continue; + } + } + } + pstmt.setObject(i, value); } pstmt.addBatch(); batchSize++; @@ -319,12 +348,9 @@ public class SyncDataJob implements Job { /** * 计算指定日期加上指定天数后的日期 - * @param date 基准日期 - * @param days 要添加的天数 - * @return 计算后的日期 */ private static Date addDays(Date date, int days) { long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000; return new Date(time); } -} +} \ No newline at end of file