记一次多线程批量处理耗时问题

吴书松
吴书松
发布于 2025-09-30 / 5 阅读
0
0

记一次多线程批量处理耗时问题

场景:数据库1000万条数据,可以根据批次分组成4000个小组,然后通过多线程,去处理4000个小组,每个小组处理一批数据,将数据同步到redis

线程池配置:

*     1、corePoolSize=10

*     2、maxPoolSize=128

*     3、queueCapacity=1000
// 4000个分组

List<String> groupList = new ArrayList<>();

List<Future<Integer>> futureList = new ArrayList<>();

for (int groupId = 0; groupId < 4000; groupId++) {

    Future<Integer> submit = ThreadUtils.POOL_EXECUTOR.submit(new SyncData2RedisCallable<>(groupId));

    futureList.add(submit);

}

//SyncData2RedisCallable 内容

        int idLast = 0;

        List<DbEntity> dbList = new ArrayList<>();

        while (true) {

            //根据id分页

            dbList = dbService.queryList(idLast, 4000);

            if (CollUtil.isEmpty(dbList)) {

                break;

            }

            idLast = dbList.get(dbList.size() - 1).getId();

            

            //4000 分批 -》 1000*4

            List<List<DbEntity>> dbListList = ListUtil.partition(dbList, 1000);

            

            //方案0 不用线程

            for (List<IotCardBaseInfoEntity> batch : dbListList) {

                this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);

                //已处理数量

                result.setNum(result.getNum() + batch.size());

            }

            //方案1 自定义线程池

//          ExecutorService executor = Executors.newFixedThreadPool(4);

//          int doNum = processData(dbListList,executor);

//          result.setNum(result.getNum() + doNum);

            //方案2、异步非堵塞Flux

//          int doNum = processReactive(dbListList);

//          result.setNum(result.getNum() + doNum);

            //方案3、并行流处理

//          int subTotal = dbListList.parallelStream()

//          .mapToInt(batch -> {

//              if(CollUtil.isEmpty(batch)){

//                  return 0;

//              }

//              this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);

//              return batch.size();

//          })

//          .sum();

//          result.setNum(result.getNum() + subTotal);

            if (dbList.size() < NumFilterConstant.DB_MAX_BATCH_NUM) {

                dbList.clear();

                break;

            } else{

                dbList.clear();

            }

        }

        

        public int processData(List<List<DbEntity>> dbListList, ExecutorService executor) {

            List<CompletableFuture<Integer>> futures = dbListList.stream()

                    .map(batch -> CompletableFuture.supplyAsync(() -> {

                        try {

                            if(CollUtil.isEmpty(batch)){

                                return 0;

                            }

                            this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);

                            return batch.size();

                        } catch (Exception e) {

                            log.error("处理批次数据失败", e);

                            return 0;

                        }

                    }, null == executor?ThreadUtils.POOL_EXECUTOR:executor))

                    .collect(Collectors.toList());

            // 使用allOf等待所有任务完成,然后汇总结果

            CompletableFuture<Void> allFutures = CompletableFuture.allOf(

                    futures.toArray(new CompletableFuture[0]));

            return allFutures.thenApply(v ->

                    futures.stream()

                            .map(CompletableFuture::join)

                            .mapToInt(Integer::intValue)

                            .sum()

            ).join();

        }

        public int processReactive(List<List<DbEntity>> dbListList) {

            Integer num = Flux.fromIterable(dbListList)

                    .flatMap(batch -> Mono.fromCallable(() -> {

                                        if(CollUtil.isEmpty(batch)){

                                            return 0;

                                        }

                                        this.iBloomService.doSyncData(batch, this.interCode, this.overFlog);

                                        return batch.size();

                                    })

                                    // 专为阻塞操作设计的调度器

                                    .subscribeOn(Schedulers.boundedElastic())

                            // 控制并发度

                            , 4)

                    .reduce(0, Integer::sum)

                    .onErrorReturn(0)

                    .block();

            return num;

        }

上面四种方式存在问题:

  • 1、不使用线程,循环同步,耗时

  • 2、使用自定义线程池,需要注意线程池的配置,用完后要销毁,线程池大小配置,因为是在外层循环中,所以线程池大小配置,不能太大,否则容易导致资源耗尽

  • 3、使用异步非堵塞Flux,内部其实用的也是一个内部线程池,所以线程池配置,不能太大,否则容易导致资源耗尽

Flux内部线程池配置: .subscribeOn(Schedulers.boundedElastic()):

Schedulers.immediate()无线程的线程池,类似no-op的一个封装类,执行Runnable时会立即用当前线程执行,只是为了某种占位时用。也就是使用当前主线程来执行

Schedulers.single()单线程的线程池,只会有一个线程在执行,可以用于一些低优先级场景的情况下。If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.

Schedulers.elastic()无界的线程池,可以创建无数多的线程,只要有任务需要执行且没有空闲的线程时,就会创建新线程,类似于jdk中的Executors.newCachedThreadPool()方式。由于,可创建的线程数不受控制,当任务量大的时候,会导致系统资源被耗尽,进而引起崩溃。

 Schedulers.boundedElastic()有界的线程池,是对于Schedulers.elastic()的改进,在保证能尽可能创建线程的同时,又限定了总数,减少系统崩溃的可能性。默认最多可创建CPU核数的10倍,同时当线程空闲的时候会被回收(默认60s),在线程数达到极限后,单个线程最多会让(默认)100000个任务进入队列,所以总共可容纳的任务数是线程数100000,基本上已经可以容纳足够的任务数了,效果基本等同于elastic。因此,boundedElastic()相对于elastic()更可控,所以通常推荐使用boundedElastic()。

Schedulers.parallel()固定的线程池,用于执行无阻塞的任务,因此线程数默认等同于CPU核数,类似于类似于jdk中的Executors.newFixedThreadPool()。
  • 4、并行流处理

parallelStream和completablefuture 并行流,内部使用的是一个固定大小的公用线程池,ForkJoinPool.commonPool(); 因为是公用的,所以容易耗尽,如果这个线程池耗尽,则会阻塞,导致程序卡顿。

注意: 外层也是一个多线程处理任务,线程内部使用parallelStream,其内部使用一个公用的线程池,如果这个线程池耗尽,则会导致外层整个循环阻塞,导致程序卡顿。可以使用自定义线程池,来处理parallelStream,避免线程池耗尽,导致外层循环阻塞。但是自定义线程池,仍旧需要注意方式2存在的问题。

本次问题:

  • 问题1:外层线程池配置不合理,queueCapacity=5000,配置过大,因为总共4000个分组,达不到5000个堵塞队列,线程池不会创建新的线程去处理任务,仍旧使用的是corePoolSize=10个线程去处理任务,maxPoolSize=128没生效,线程池不会扩容。解决:将queueCapacity=1000,超过1000,就创建新线程处理任务,线程池会扩容。

  • 问题2:外层多线程循环处理,线程池内部使用了parallelStream,内部使用一个公用的线程池,如果这个线程池耗尽,则会导致外层整个循环阻塞,导致程序卡顿。解决:线程内部,使用单线程处理问题(方式1),或者使用自定义线程池,或者使用Flux(仍旧需要注意线程池配置,不能太大,否则容易导致资源耗尽)


评论