Java实现高效数据批量处理的全套解决方案

2025年09月08日/ 浏览 5

Java实现高效数据批量处理的全套解决方案

一、批量导入导出的核心实现

1.1 基于POI的Excel批处理

java
// 使用SXSSFWorkbook处理百万级数据
SXSSFWorkbook workbook = new SXSSFWorkbook(1000); // 保持1000行在内存中
Sheet sheet = workbook.createSheet(“数据导入”);

// 数据批写入优化
List dataList = getBatchData();
for(int i=0; i<dataList.size(); i++){
Row row = sheet.createRow(i);
row.createCell(0).setCellValue(dataList.get(i).getField1());
// 每1000行刷新到磁盘
if(i % 1000 == 0){
((SXSSFSheet)sheet).flushRows(1000);
}
}

1.2 数据库批量操作

java
// JDBC批处理示例
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(“INSERT INTO table VALUES(?,?)”);

final int BATCH_SIZE = 2000;
for(int i=0; i<dataList.size(); i++){
ps.setString(1, dataList.get(i).getField1());
ps.addBatch();

if(i%BATCH_SIZE == 0){
    ps.executeBatch();
    conn.commit();
}

}
ps.executeBatch(); // 处理剩余记录

二、提升处理效率的6个关键技巧

2.1 内存分块处理

采用分页查询+批处理组合方案:
java
int pageSize = 5000;
int total = getTotalCount();
for(int page=1; page<=total/pageSize+1; page++){
List<Data> chunk = queryByPage(page,pageSize);
processBatch(chunk); // 处理当前分页数据
System.gc(); // 主动触发垃圾回收
}

2.2 多线程并行处理

java
ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()*2);

List<Future> futures = new ArrayList<>();
for(DataBatch batch : splitToBatches(dataList)){
futures.add(executor.submit(() -> processSingleBatch(batch)));
}

// 合并处理结果
List results = futures.stream()
.map(f -> {
try { return f.get(); }
catch(Exception e) { return null; }
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

三、异常处理与事务控制

3.1 健壮的异常恢复机制

java
try {
// 批量处理逻辑
} catch (BatchUpdateException e) {
int[] successCount = e.getUpdateCounts();
// 构建失败记录重试队列
List<Data> retryList = new ArrayList<>();
for(int i=0; i<successCount.length; i++){
if(successCount[i] == Statement.EXECUTE_FAILED){
retryList.add(dataList.get(i));
}
}
// 记录失败日志并报警
logRetryRecords(retryList);
}

3.2 分布式事务方案

java
// 使用Spring的编程式事务
TransactionTemplate template = new TransactionTemplate(transactionManager);
template.setPropagationBehavior(TransactionDefinition.PROPAGATIONREQUIRESNEW);

template.execute(status -> {
try {
importService.processBatch(batchData);
fileService.markProcessed(fileId);
return true;
} catch(Exception e) {
status.setRollbackOnly();
throw new BatchException(“批量导入失败”,e);
}
});

四、性能优化实战建议

  1. 缓冲区调优:根据服务器内存调整JVM参数,特别是-Xmx和-XX:MaxDirectMemorySize

  2. 连接池配置:properties

Tomcat JDBC连接池优化

spring.datasource.tomcat.max-active=50
spring.datasource.tomcat.max-idle=20
spring.datasource.tomcat.min-idle=10
spring.datasource.tomcat.max-wait=30000

  1. JVM层优化

– 添加-XX:+UseG1GC启用G1垃圾回收器
– 设置-XX:MaxGCPauseMillis=200控制GC停顿时间

  1. 异步日志方案:采用Log4j2的异步日志写入,避免I/O阻塞
    xml
    <AsyncLogger name="importLogger" level="info"
    includeLocation="true">
    <AppenderRef ref="ImportFileAppender"/>
    </AsyncLogger>

五、扩展应用场景

5.1 与消息队列结合

java
// 处理完成后发送MQ通知
batchImportResult.setProcessTime(System.currentTimeMillis());
rabbitTemplate.convertAndSend(
"import.finish.queue",
result,
message -> {
message.getMessageProperties()
.setHeader("retry-count", 0);
return message;
}
);

5.2 断点续传实现

java
// 记录处理进度
public class ImportProgress {
private Long jobId;
private Integer totalCount;
private Integer processed;
private String checkpointFile;
private LocalDateTime lastUpdate;

// 每处理1000条更新一次进度
public void updateProgress(int batchSize){
    this.processed += batchSize;
    saveCheckpoint();
}

}

通过合理组合这些技术方案,可以实现每日千万级数据的高效处理。实际项目中需要根据具体业务场景进行参数调优,建议在预发布环境进行充分的压力测试。

picture loss