1. The Problem: Why Use foreach with Caution?
When doing batch inserts in MyBatis, many developers habitually use the <foreach> tag to splice all the values into a single SQL statement:
```xml
<insert id="batchInsert" parameterType="java.util.List">
    INSERT INTO user (id, name, email) VALUES
    <foreach collection="list" item="item" index="index" separator=",">
        (#{item.id}, #{item.name}, #{item.email})
    </foreach>
</insert>
```

This approach looks clean and efficient, but once you feed it thousands or tens of thousands of rows, it brings serious performance problems and hidden risks.
2. How foreach Batch Insert Works
2.1 SQL Statement Generation
When 1,000 records are passed in, the generated SQL looks like this:
```sql
INSERT INTO user (id, name, email) VALUES
(1, '张三', 'zhangsan@example.com'),
(2, '李四', 'lisi@example.com'),
...  -- 1000 rows in total
(1000, '王五', 'wangwu@example.com');
```
2.2 The JDBC Execution Flow
1. The SQL statement is sent to the database
2. The database parses it (lexical, syntax, and semantic analysis)
3. An execution plan is generated
4. The rows are inserted
5. The result is returned
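For contrast, here is a minimal plain-JDBC sketch of the batching alternative that the rest of this article builds on. It is a sketch under assumptions: the DataSource wiring and the User class (with a long id) mirror the article's other examples. The point is that the statement text is parsed once, and afterwards only parameter sets are queued and shipped:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import javax.sql.DataSource;

public class PlainJdbcBatchSketch {

    private final DataSource dataSource;  // wiring assumed, as elsewhere in this article

    public PlainJdbcBatchSketch(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void insertAll(List<User> users) throws Exception {
        String sql = "INSERT INTO user (id, name, email) VALUES (?, ?, ?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            conn.setAutoCommit(false);
            for (User u : users) {
                ps.setLong(1, u.getId());
                ps.setString(2, u.getName());
                ps.setString(3, u.getEmail());
                ps.addBatch();      // queue one parameter set; the SQL is parsed only once
            }
            ps.executeBatch();      // ship the queued parameter sets together
            conn.commit();
        }
    }
}
```

This is exactly the mechanism that MyBatis's BATCH executor (section 5.1) wraps for you.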
3. Five Problems with foreach Batch Insert
3.1 Oversized SQL Statements
Symptom:
```sql
-- At roughly 100 characters per record, 10,000 records is about 1,000,000 characters.
-- MySQL's default max_allowed_packet is 4MB,
-- so the statement easily exceeds the limit and fails with:
-- Packet for query is too large (xxxx > 4194304)
```
Affected databases:
MySQL: the max_allowed_packet limit (4MB by default)
Oracle: SQL text length limits
SQL Server: batch size limits
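If you suspect the MySQL limit, you can inspect (and, with admin privileges, raise) max_allowed_packet before restructuring any code. A minimal sketch through plain JDBC; the 64MB value is purely illustrative, not a recommendation:

```java
public void checkPacketLimit(javax.sql.DataSource dataSource) throws java.sql.SQLException {
    try (java.sql.Connection conn = dataSource.getConnection();
         java.sql.Statement stmt = conn.createStatement();
         java.sql.ResultSet rs =
             stmt.executeQuery("SHOW VARIABLES LIKE 'max_allowed_packet'")) {
        if (rs.next()) {
            System.out.println("max_allowed_packet = " + rs.getString("Value") + " bytes");
        }
        // Raising it requires admin privileges and takes effect for new connections:
        // stmt.execute("SET GLOBAL max_allowed_packet = 67108864"); // 64MB, illustrative
    }
}
```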
3.2 Database Connection Timeouts

```java
// Executing one extremely long-running SQL statement causes:
// 1. The database connection to be occupied for a long time
// 2. Connection-pool exhaustion
// 3. Blocked application-server threads
try {
    // Batch insert of tens of thousands of rows
    userMapper.batchInsert(hugeList);  // may run for several minutes
} catch (Exception e) {
    // java.sql.SQLException: Timeout expired
}
```

3.3 Transaction Management Risks
Problem scenario:

```java
@Transactional
public void batchProcess() {
    // Insert 10,000 rows -- takes far too long
    batchInsert(dataList);
    // Other operations...
    // The transaction holds its locks the whole time, hurting concurrency
}
```

3.4 Excessive Memory Consumption
```java
// Memory consumed while MyBatis builds the SQL
StringBuilder sqlBuilder = new StringBuilder();
for (User user : userList) {  // 10,000 entries
    // Every append grows the buffer further
    sqlBuilder.append("(")
              .append(user.getId())
              .append(",")
              .append(user.getName())
              .append("),");
}
// The final SQL string occupies a large amount of memory
```

3.5 Difficult Error Handling
```sql
-- If row 5000 violates a unique constraint,
-- the whole insert rolls back,
-- and pinpointing the offending row is hard
INSERT INTO user VALUES
(1, 'A'),      -- ✓
(2, 'B'),      -- ✓
...
(5000, 'A'),   -- ✗ duplicate, violates the unique constraint
...
(10000, 'Z');  -- none of these take effect
```
4. Performance Comparison
4.1 Test Environment
Data volume: 10,000 user records
Database: MySQL 8.0
Network: local machine, so network latency is excluded
4.2 Results
| Insert method | Execution time | Peak memory | SQL length | Success/failure handling |
|---|---|---|---|---|
| foreach splicing | 12.3s | 350MB | 1.2MB | all or nothing |
| JDBC batching | 4.8s | 50MB | short | partial success possible |
| Chunked inserts | 5.2s | 60MB | short | flexible control |
4.3 Benchmark Code

```java
public class BatchInsertBenchmark {

    @Test
    public void testForeachPerformance() {
        List<User> dataList = generateTestData(10000);

        long start = System.currentTimeMillis();
        // foreach-based insert
        userMapper.batchInsertWithForeach(dataList);
        long end = System.currentTimeMillis();

        System.out.println("foreach took: " + (end - start) + "ms");
    }

    @Test
    public void testBatchExecutorPerformance() {
        List<User> dataList = generateTestData(10000);

        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
        UserMapper mapper = sqlSession.getMapper(UserMapper.class);

        long start = System.currentTimeMillis();
        for (User user : dataList) {
            mapper.insert(user);
        }
        sqlSession.commit();
        long end = System.currentTimeMillis();

        System.out.println("Batch executor took: " + (end - start) + "ms");
    }
}
```

5. Alternatives in Detail
5.1 The MyBatis BATCH Executor (Recommended)
Usage:

```java
// Open a BATCH-mode SqlSession
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
    UserMapper mapper = sqlSession.getMapper(UserMapper.class);
    for (int i = 0; i < dataList.size(); i++) {
        mapper.insert(dataList.get(i));
        // Commit every 1000 rows to avoid running out of memory
        if ((i + 1) % 1000 == 0 || i == dataList.size() - 1) {
            sqlSession.commit();
            // Clear the local cache to keep memory in check
            sqlSession.clearCache();
        }
    }
} finally {
    sqlSession.close();
}
```

Spring integration:
```yaml
# application.yml
mybatis:
  executor-type: batch  # use the BATCH executor globally
```

```java
// Or scope batching to a single method: with the batch executor configured,
// every insert inside the transaction is queued
@Transactional
public void batchInsert(List<User> users) {
    for (User user : users) {
        // Each call adds one statement to the JDBC batch
        userMapper.insert(user);
    }
    // The batch executes when the transaction commits
}
```

5.2 Chunked foreach Inserts
```xml
<!-- The chunk size is configurable -->
<insert id="batchInsertByChunk">
    INSERT INTO user (id, name, email) VALUES
    <foreach collection="list" item="item" index="index" separator=",">
        (#{item.id}, #{item.name}, #{item.email})
    </foreach>
</insert>
```

```java
@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    // Chunk size, configurable
    private static final int BATCH_SIZE = 1000;

    public void batchInsert(List<User> userList) {
        if (CollectionUtils.isEmpty(userList)) {
            return;
        }
        int total = userList.size();
        int pageCount = (total + BATCH_SIZE - 1) / BATCH_SIZE;
        for (int i = 0; i < pageCount; i++) {
            int fromIndex = i * BATCH_SIZE;
            int toIndex = Math.min((i + 1) * BATCH_SIZE, total);
            List<User> subList = userList.subList(fromIndex, toIndex);
            // Execute one chunk per statement
            userMapper.batchInsertByChunk(subList);
        }
    }
}
```

5.3 Multiple INSERT Statements in One Round Trip
```xml
<insert id="batchInsertMultiValues">
    <foreach collection="list" item="chunk" separator=";">
        INSERT INTO user (id, name, email) VALUES
        <foreach collection="chunk" item="item" separator=",">
            (#{item.id}, #{item.name}, #{item.email})
        </foreach>
    </foreach>
</insert>
```

Note that on MySQL, semicolon-separated statements only work when the JDBC URL enables allowMultiQueries=true.

5.4 Database-Specific Bulk Operations
MySQL's LOAD DATA INFILE:

```java
public void batchInsertWithLoadData(List<User> users) throws IOException {
    // 1. Write a CSV file
    File tempFile = File.createTempFile("batch_", ".csv");
    try (PrintWriter writer = new PrintWriter(tempFile)) {
        for (User user : users) {
            writer.println(String.format("%s,%s,%s",
                user.getId(), user.getName(), user.getEmail()));
        }
    }

    // 2. Run the LOAD DATA command
    String sql = String.format(
        "LOAD DATA LOCAL INFILE '%s' INTO TABLE user " +
        "FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'",
        tempFile.getAbsolutePath());
    jdbcTemplate.execute(sql);

    // 3. Delete the temp file
    tempFile.delete();
}
```

6. Transaction and Error-Handling Strategies
6.1 Flexible Transaction Control

```java
@Service
public class UserBatchService {

    @Autowired
    private PlatformTransactionManager transactionManager;

    public void batchInsertWithTransactionControl(List<User> users) {
        // One independent transaction per chunk
        int batchSize = 1000;
        for (int i = 0; i < users.size(); i += batchSize) {
            int end = Math.min(i + batchSize, users.size());
            List<User> batch = users.subList(i, end);

            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                userMapper.batchInsert(batch);
                transactionManager.commit(status);
            } catch (Exception e) {
                transactionManager.rollback(status);
                // Record the failed chunk and keep processing the rest
                log.error("Chunk {}-{} failed to insert: {}", i, end, e.getMessage());
                // Optionally retry the failed chunk in smaller pieces
                retryWithSmallerBatch(batch);
            }
        }
    }

    private void retryWithSmallerBatch(List<User> failedBatch) {
        // Retry logic: fall back to single inserts (or smaller chunks)
        for (User user : failedBatch) {
            try {
                userMapper.insert(user);
            } catch (Exception e) {
                log.error("Failed to insert user: {}, error: {}", user, e.getMessage());
                // Push to a failure queue for manual handling
            }
        }
    }
}
```

6.2 Recording and Skipping Bad Rows
```java
public class BatchInsertResult<T> {
    private int successCount;
    private int failureCount;
    private List<T> successItems;
    private List<FailedItem<T>> failedItems;  // details of failed rows

    public static class FailedItem<T> {
        private T item;
        private Exception error;
        private String errorMessage;
    }
}

@Service
public class SafeBatchService {

    public BatchInsertResult<User> safeBatchInsert(List<User> users) {
        BatchInsertResult<User> result = new BatchInsertResult<>();
        for (User user : users) {
            try {
                userMapper.insert(user);
                result.getSuccessItems().add(user);
                result.setSuccessCount(result.getSuccessCount() + 1);
            } catch (Exception e) {
                // Record the failure without interrupting the flow
                BatchInsertResult.FailedItem<User> failedItem =
                    new BatchInsertResult.FailedItem<>();
                failedItem.setItem(user);
                failedItem.setError(e);
                failedItem.setErrorMessage(e.getMessage());
                result.getFailedItems().add(failedItem);
                result.setFailureCount(result.getFailureCount() + 1);
            }
        }
        return result;
    }
}
```

7. New Features in MyBatis 3.5+
7.1 Enhanced Batch Insert Support

```xml
<!-- Pairs with the MySQL driver's rewriteBatchedStatements parameter below -->
<insert id="batchInsert" useGeneratedKeys="true" keyProperty="id">
    INSERT INTO user (name, email) VALUES (#{name}, #{email})
</insert>
```

```java
// Tune the JDBC connection parameters to match
@Configuration
public class DataSourceConfig {

    @Bean
    public DataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/test?"
            + "rewriteBatchedStatements=true"   // rewrite batches into multi-row inserts
            + "&useServerPrepStmts=true"        // server-side prepared statements
            + "&cachePrepStmts=true"            // cache prepared statements
            + "&prepStmtCacheSize=250"          // prepared-statement cache size
            + "&prepStmtCacheSqlLimit=2048");   // max SQL length eligible for caching
        return dataSource;
    }
}
```

7.2 Streaming Inserts
```java
@Mapper
public interface UserMapper {

    @Insert({
        "<script>",
        "INSERT INTO user (name, email) VALUES",
        "<foreach collection='list' item='item' separator=','>",
        "(#{item.name}, #{item.email})",
        "</foreach>",
        "</script>"
    })
    @Options(flushCache = Options.FlushCachePolicy.TRUE)
    void batchInsertStream(@Param("list") Collection<User> users);
}

// Process large datasets with the Stream API
public void batchInsertLargeData(List<User> hugeList) {
    try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
        UserMapper mapper = sqlSession.getMapper(UserMapper.class);

        // Stream the data in groups to keep memory in check
        hugeList.stream()
            .collect(Collectors.groupingBy(user -> user.getId() % 10))  // group into 10 buckets
            .forEach((key, batch) -> {
                mapper.batchInsertStream(batch);
                sqlSession.commit();
                sqlSession.clearCache();
            });
    }
}
```

8. Enterprise Best Practices
8.1 A Configurable Batch-Processing Framework

```java
@Component
@Slf4j
public class BatchProcessor<T> {

    @Value("${batch.size:1000}")
    private int batchSize;

    @Value("${batch.max.retry:3}")
    private int maxRetry;

    @Value("${batch.timeout:300}")
    private long timeoutSeconds;

    /**
     * Generic batch-processing entry point.
     */
    public BatchResult<T> process(
            List<T> dataList,
            BatchOperation<T> operation,
            ErrorHandler<T> errorHandler) {

        BatchResult<T> result = new BatchResult<>();
        List<Future<BatchChunkResult<T>>> futures = new ArrayList<>();

        // Thread pool for processing chunks in parallel
        ExecutorService executor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );

        try {
            // Split the data into chunks
            List<List<T>> batches = Lists.partition(dataList, batchSize);
            for (List<T> batch : batches) {
                Callable<BatchChunkResult<T>> task =
                    () -> processBatch(batch, operation, errorHandler);
                futures.add(executor.submit(task));
            }

            // Collect the results
            for (Future<BatchChunkResult<T>> future : futures) {
                try {
                    BatchChunkResult<T> chunkResult = future.get(
                        timeoutSeconds, TimeUnit.SECONDS
                    );
                    result.merge(chunkResult);
                } catch (TimeoutException e) {
                    log.error("Chunk processing timed out", e);
                    result.addTimeoutError();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("Chunk processing failed", e);
                }
            }
        } finally {
            executor.shutdown();
        }
        return result;
    }

    private BatchChunkResult<T> processBatch(
            List<T> batch,
            BatchOperation<T> operation,
            ErrorHandler<T> errorHandler) {

        BatchChunkResult<T> result = new BatchChunkResult<>();

        for (int retry = 0; retry < maxRetry; retry++) {
            try {
                operation.execute(batch);
                result.setSuccess(true);
                break;
            } catch (Exception e) {
                if (retry == maxRetry - 1) {
                    result.setSuccess(false);
                    result.setError(e);
                    // Fall back to per-item execution so one bad row
                    // does not sink the whole chunk
                    for (T item : batch) {
                        try {
                            operation.execute(Collections.singletonList(item));
                        } catch (Exception singleError) {
                            errorHandler.handle(item, singleError);
                        }
                    }
                } else {
                    // Exponential backoff between retries
                    try {
                        Thread.sleep((long) (Math.pow(2, retry) * 1000));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        return result;
    }

    // Batch operation callback
    public interface BatchOperation<T> {
        void execute(List<T> items) throws Exception;
    }

    // Error handler callback
    public interface ErrorHandler<T> {
        void handle(T item, Exception error);
    }
}
```

8.2 Monitoring and Alerting
```java
@Component
@Slf4j
public class BatchInsertMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    private Timer batchInsertTimer;
    private DistributionSummary batchSizeSummary;
    private Counter successCounter;
    private Counter failureCounter;

    @PostConstruct
    public void init() {
        batchInsertTimer = Timer.builder("batch.insert.duration")
            .description("Batch insert execution time")
            .register(meterRegistry);

        batchSizeSummary = DistributionSummary.builder("batch.insert.size")
            .description("Batch insert row count")
            .register(meterRegistry);

        successCounter = Counter.builder("batch.insert.success")
            .description("Number of successful batch inserts")
            .register(meterRegistry);

        failureCounter = Counter.builder("batch.insert.failure")
            .description("Number of failed batch inserts")
            .register(meterRegistry);
    }

    // Returns the result so callers can pass the operation straight through
    public <T> BatchResult<T> monitorBatchInsert(
            String operationName,
            List<T> data,
            Supplier<BatchResult<T>> operation) {

        batchSizeSummary.record(data.size());
        Timer.Sample sample = Timer.start();

        try {
            BatchResult<T> result = operation.get();
            sample.stop(batchInsertTimer);

            if (result.isSuccess()) {
                successCounter.increment();
                log.info("Batch insert succeeded: {}, rows: {}", operationName, data.size());
            } else {
                failureCounter.increment();
                log.warn("Batch insert partially failed: {}, succeeded: {}, failed: {}",
                    operationName, result.getSuccessCount(), result.getFailureCount());
            }
            return result;
        } catch (Exception e) {
            failureCounter.increment();
            log.error("Batch insert threw an exception: {}, error: {}",
                operationName, e.getMessage(), e);
            throw e;
        }
    }
}
```

8.3 Dynamic Batch-Size Adjustment
```java
@Component
@Slf4j
public class AdaptiveBatchProcessor {

    private int currentBatchSize = 1000;
    private long lastProcessTime = 0;
    private double averageProcessTime = 0;
    private int sampleCount = 0;

    public void adaptiveBatchInsert(List<User> users) {
        List<List<User>> batches = partitionByAdaptiveSize(users);

        for (List<User> batch : batches) {
            long startTime = System.currentTimeMillis();
            try {
                batchInsert(batch);

                // Update the statistics
                long duration = System.currentTimeMillis() - startTime;
                updateStatistics(duration, batch.size());

                // Adjust the batch size dynamically
                adjustBatchSize(duration);
            } catch (Exception e) {
                // On failure, shrink the batch size and retry
                reduceBatchSize();
                retryWithSmallerBatches(batch);
            }
        }
    }

    private void updateStatistics(long duration, int batchSize) {
        lastProcessTime = duration;
        averageProcessTime =
            (averageProcessTime * sampleCount + duration) / (sampleCount + 1);
        sampleCount++;
    }

    private void adjustBatchSize(long duration) {
        // Shrink the batch if processing takes too long
        if (duration > 5000) {  // more than 5 seconds
            currentBatchSize = Math.max(100, currentBatchSize / 2);
            log.info("Reduced batch size to: {}", currentBatchSize);
        }
        // Grow the batch if processing is fast
        else if (duration < 1000 && currentBatchSize < 10000) {
            currentBatchSize = Math.min(10000, currentBatchSize * 2);
            log.info("Increased batch size to: {}", currentBatchSize);
        }
    }

    private void reduceBatchSize() {
        currentBatchSize = Math.max(100, currentBatchSize / 2);
        log.warn("Reduced batch size after failure: {}", currentBatchSize);
    }
}
```

9. Database-Specific Optimization Strategies
9.1 MySQL

```java
// MySQL-specific configuration
@Configuration
public class MySQLBatchConfig {

    @Bean
    @Primary
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();

        // Key batching parameters
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");

        // Connection-pool tuning
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);

        return new HikariDataSource(config);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);

        // Set the default executor type
        org.apache.ibatis.session.Configuration configuration =
            new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);
        sessionFactory.setConfiguration(configuration);

        return sessionFactory.getObject();
    }
}
```

9.2 PostgreSQL
```java
// PostgreSQL bulk insert via COPY
public class PostgreSQLBatchInserter {

    public void batchInsertPostgreSQL(List<User> users) {
        // COPY is far more efficient than row-by-row INSERT in PostgreSQL
        String sql = "COPY user (name, email) FROM STDIN WITH (FORMAT CSV)";

        try (Connection conn = dataSource.getConnection()) {
            // CopyManager is not AutoCloseable, so it lives outside the resource list
            CopyManager copyManager =
                new CopyManager(conn.unwrap(BaseConnection.class));

            // Build the CSV payload
            StringWriter writer = new StringWriter();
            CSVPrinter csvPrinter = new CSVPrinter(writer,
                CSVFormat.DEFAULT.withRecordSeparator("\n"));
            for (User user : users) {
                csvPrinter.printRecord(user.getName(), user.getEmail());
            }
            csvPrinter.flush();

            // Run the COPY command
            copyManager.copyIn(sql,
                new ByteArrayInputStream(
                    writer.toString().getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new RuntimeException("Bulk insert failed", e);
        }
    }
}
```

9.3 Oracle
```java
// Oracle bulk insert
public class OracleBatchInserter {

    public void batchInsertOracle(List<User> users) {
        // Oracle's INSERT ALL syntax
        StringBuilder sql = new StringBuilder("INSERT ALL ");
        for (User user : users) {
            sql.append("INTO user (id, name, email) VALUES (")
               .append(user.getId()).append(", '")
               .append(user.getName()).append("', '")
               .append(user.getEmail()).append("') ");
        }
        sql.append("SELECT 1 FROM DUAL");

        // Note: Oracle also limits SQL text length, so chunk the input
        jdbcTemplate.execute(sql.toString());
    }

    // Or use JDBC batching through JdbcTemplate
    public void batchInsertWithOracleBatch(List<User> users) {
        String sql = "INSERT INTO user (id, name, email) VALUES (?, ?, ?)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                User user = users.get(i);
                ps.setLong(1, user.getId());
                ps.setString(2, user.getName());
                ps.setString(3, user.getEmail());
            }

            @Override
            public int getBatchSize() {
                return users.size();
            }
        });
    }
}
```

10. Case Study: Bulk Inserting E-Commerce Orders
10.1 Scenario
Around a million order rows must be inserted every day
Data arrives from multiple channels
Both data consistency and insert performance must be guaranteed
Real-time monitoring and alerting are required
10.2 Solution

```java
@Service
@Slf4j
public class OrderBatchService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private BatchProcessor<Order> batchProcessor;

    @Autowired
    private BatchInsertMonitor monitor;

    @Autowired
    private ErrorRecordMapper errorRecordMapper;

    @Value("${order.batch.size:500}")
    private int orderBatchSize;

    /**
     * Main entry point for bulk order inserts.
     */
    public BatchResult<Order> batchInsertOrders(List<Order> orders) {
        return monitor.monitorBatchInsert(
            "order.batch.insert",
            orders,
            () -> batchProcessor.process(
                orders,
                this::insertOrderBatch,
                this::handleOrderInsertError
            )
        );
    }

    /**
     * Insert one chunk of orders.
     */
    private void insertOrderBatch(List<Order> batch) {
        // Chunked foreach insert
        for (List<Order> chunk : Lists.partition(batch, orderBatchSize)) {
            try {
                orderMapper.batchInsertOrders(chunk);
            } catch (DataIntegrityViolationException e) {
                // Handle unique-constraint conflicts
                handleDuplicateOrders(chunk, e);
            } catch (Exception e) {
                log.error("Order chunk insert failed", e);
                throw new BatchInsertException("Order insert failed", e);
            }
        }
    }

    /**
     * Handle duplicate orders.
     */
    private void handleDuplicateOrders(List<Order> orders, Exception cause) {
        // 1. Find the duplicates
        List<Order> duplicates = findDuplicateOrders(orders);

        // 2. Retry without them
        List<Order> uniqueOrders = orders.stream()
            .filter(order -> !duplicates.contains(order))
            .collect(Collectors.toList());
        if (!uniqueOrders.isEmpty()) {
            orderMapper.batchInsertOrders(uniqueOrders);
        }

        // 3. Record the duplicates
        logDuplicateOrders(duplicates, cause);
    }

    /**
     * Per-order error handling.
     */
    private void handleOrderInsertError(Order order, Exception error) {
        // 1. Write to the error table
        ErrorRecord errorRecord = new ErrorRecord();
        errorRecord.setDataType("ORDER");
        errorRecord.setDataId(order.getId());
        errorRecord.setErrorMessage(error.getMessage());
        errorRecord.setRawData(JsonUtils.toJson(order));
        errorRecord.setCreateTime(new Date());
        errorRecordMapper.insert(errorRecord);

        // 2. Send an alert
        sendAlert(order, error);

        // 3. Log it
        log.error("Order insert failed: {}, order id: {}",
            error.getMessage(), order.getId(), error);
    }

    /**
     * Asynchronous bulk insert for high-concurrency scenarios.
     */
    @Async("orderBatchExecutor")
    public CompletableFuture<BatchResult<Order>> asyncBatchInsertOrders(
            List<Order> orders, String batchId) {

        log.info("Starting async bulk order insert, batch id: {}, orders: {}",
            batchId, orders.size());

        BatchResult<Order> result = batchInsertOrders(orders);

        // Notify once the batch completes
        sendBatchCompleteNotification(batchId, result);

        return CompletableFuture.completedFuture(result);
    }
}

@Configuration
@EnableAsync
public class AsyncBatchConfig {

    @Bean("orderBatchExecutor")
    public Executor orderBatchExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("order-batch-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
```

10.3 Sharded Bulk Inserts
```java
@Service
public class ShardingOrderBatchService {

    /**
     * Bulk insert routed by sharding key.
     */
    public void batchInsertWithSharding(List<Order> orders) {
        // 1. Group by sharding key
        Map<String, List<Order>> shardingMap = orders.stream()
            .collect(Collectors.groupingBy(this::getShardingKey));

        // 2. Process each shard in parallel
        shardingMap.entrySet().parallelStream().forEach(entry -> {
            String shardingKey = entry.getKey();
            List<Order> shardOrders = entry.getValue();

            // 3. Look up the data source / mapper for the shard
            SqlSessionFactory shardSessionFactory =
                shardingManager.getSessionFactory(shardingKey);

            try (SqlSession sqlSession =
                     shardSessionFactory.openSession(ExecutorType.BATCH)) {
                OrderMapper shardMapper = sqlSession.getMapper(OrderMapper.class);

                // 4. Insert in chunks
                for (List<Order> batch : Lists.partition(shardOrders, 1000)) {
                    shardMapper.batchInsertOrders(batch);
                    sqlSession.commit();
                    sqlSession.clearCache();
                }
            }
        });
    }

    private String getShardingKey(Order order) {
        // Shard by user id modulo
        long userId = order.getUserId();
        int shardCount = 16;  // 16 shards
        return "shard_" + (userId % shardCount);
    }
}
```

11. Performance Optimization Summary
11.1 Core Principles
Small batches, frequent commits: never push too much data through a single operation
Use batch mode: exploit JDBC batching to the full
Pick a sensible batch size: tune it to the database and network environment
Monitor and tune: watch performance continuously and adjust the strategy dynamically
Isolate errors: one failed row must not break the whole run
11.2 Configuration Suggestions

```yaml
# application.yml batch-insert settings
batch:
  insert:
    enabled: true
    size: 1000          # rows per chunk
    timeout: 30         # timeout in seconds
  retry:
    enabled: true
    max-attempts: 3
    backoff: 1000       # retry interval in milliseconds
  executor:
    type: BATCH             # MyBatis executor type
    flush-statements: true  # flush queued statements
    flush-cache: true       # flush the cache
  jdbc:
    rewrite-batched-statements: true  # MySQL batch rewrite
    use-server-prep-stmts: true       # server-side prepared statements
    cache-prep-stmts: true            # cache prepared statements
    prep-stmt-cache-size: 250         # prepared-statement cache size
    prep-stmt-cache-sql-limit: 2048   # max SQL length for the cache
```
11.3 Coding Conventions

```java
/**
 * Sample coding conventions for batch inserts.
 */
public class BatchInsertCodeStandard {

    /**
     * Good practice: BATCH executor with periodic commits.
     */
    public void goodPractice(List<Data> dataList) {
        try (SqlSession sqlSession =
                 sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            Mapper mapper = sqlSession.getMapper(Mapper.class);

            int batchSize = 1000;
            for (int i = 0; i < dataList.size(); i++) {
                mapper.insert(dataList.get(i));
                // Commit periodically to avoid running out of memory
                if ((i + 1) % batchSize == 0 || i == dataList.size() - 1) {
                    sqlSession.commit();
                    sqlSession.clearCache();
                }
            }
        }
    }

    /**
     * Bad practice: foreach over a huge list.
     */
    public void badPractice(List<Data> dataList) {
        // Warning: a large dataList produces an oversized SQL statement
        mapper.batchInsertWithForeach(dataList);  // use with care!
    }

    /**
     * Compromise: foreach over bounded chunks.
     */
    public void compromisePractice(List<Data> dataList) {
        int batchSize = 1000;
        for (int i = 0; i < dataList.size(); i += batchSize) {
            int end = Math.min(i + batchSize, dataList.size());
            List<Data> batch = dataList.subList(i, end);
            // Each chunk still uses foreach, but its size is bounded
            mapper.batchInsertWithForeach(batch);
        }
    }
}
```

12. FAQ and Troubleshooting
12.1 Q&A
Q1: What is the practical limit for foreach batch inserts?
A: It depends on database configuration. With MySQL's default max_allowed_packet of 4MB, keeping a single statement under about 1,000 rows is a safe rule of thumb.
Q2: Why is batch mode faster than foreach?
A: Batch mode goes through JDBC's addBatch() and executeBatch(): the database parses the SQL statement only once, then executes it repeatedly with different parameter sets.
Q3: How do I get auto-generated IDs during a batch insert?
A: In batch mode, generated IDs become available only after the batch has been flushed and committed. Configure useGeneratedKeys and keyProperty on the insert statement; see the sketch below.
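A minimal sketch of that setup, assuming a MySQL auto-increment id column and the UserMapper from earlier sections; in BATCH mode the id fields are written back when the queued statements are flushed:

```java
@Mapper
public interface UserMapper {
    @Insert("INSERT INTO user (name, email) VALUES (#{name}, #{email})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(User user);
}

// The ids land on the User objects once the batch is flushed
try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
    UserMapper mapper = session.getMapper(UserMapper.class);
    for (User user : users) {
        mapper.insert(user);   // queued; id not assigned yet
    }
    session.flushStatements(); // executes the batch; ids are populated here
    session.commit();
    users.forEach(u -> System.out.println("assigned id: " + u.getId()));
}
```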
Q4: What if batch inserts cause deadlocks?
A: Reduce the batch size, adjust the transaction isolation level, or insert rows in a fixed order, as in the sketch below.
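As a rough illustration of the fixed-order point, sorting by primary key before chunking makes concurrent writers acquire row locks in the same order; batchInsertByChunk is the mapper method from section 5.2, and the 500-row chunk size is an arbitrary example:

```java
// A stable key order gives every writer the same lock-acquisition order
dataList.sort(Comparator.comparing(User::getId));
for (List<User> batch : Lists.partition(dataList, 500)) {
    userMapper.batchInsertByChunk(batch);  // smaller chunks also hold locks more briefly
}
```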
Q5: How do I monitor batch-insert performance?
A: Use tools such as Micrometer and Prometheus to track execution time, batch size, success rate, and similar metrics.
12.2 Failure-Handling Playbook

```java
@Component
@Slf4j
public class BatchInsertFailover {

    private static final int MAX_RETRY = 3;
    private static final int INITIAL_BATCH_SIZE = 1000;

    /**
     * Batch insert with graceful degradation.
     */
    public BatchResult batchInsertWithFallback(List<Data> dataList) {
        // 1. Try the optimal path first (BATCH executor)
        try {
            return tryBatchInsert(dataList);
        } catch (Exception e1) {
            log.warn("Batch mode failed, falling back to chunked inserts", e1);

            // 2. Fallback: chunked inserts
            try {
                return tryChunkInsert(dataList);
            } catch (Exception e2) {
                log.warn("Chunked inserts failed, falling back to single inserts", e2);

                // 3. Last resort: single inserts (slowest but most robust)
                return trySingleInsert(dataList);
            }
        }
    }

    /**
     * Emergency data recovery.
     */
    public void emergencyRecovery(List<Data> failedData) {
        // 1. Dump the failed rows to a temp file
        File tempFile = writeToTempFile(failedData);
        try {
            // 2. Import with the database's native tooling
            importWithDatabaseTool(tempFile);
        } catch (Exception e) {
            // 3. Escalate for manual handling
            notifyAdministrator(failedData, e);
        } finally {
            // 4. Clean up the temp file
            tempFile.delete();
        }
    }
}
```

Conclusion
Batch inserting in MyBatis is a common requirement, but one that must be handled carefully. The foreach tag is convenient, yet it has clear performance and stability problems on large datasets. To summarize the analysis in this article:
foreach batch inserts suit small datasets (a few hundred rows at most)
Large datasets should go through MyBatis's batch mode
Batch size must be chosen to fit the business scenario
Solid error handling and monitoring are indispensable
Each database calls for its own optimization strategy