跳至主要內容

线程池的使用

Mr.Xu约 694 字大约 2 分钟

线程池的使用

创建线程池

@Component
public class ThreadUtil {
    @Bean(value = "executorService")
    public ExecutorService getExecutorService() {
        return new ThreadPoolExecutor(20, 20, 3600, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000));
    }
}

使用线程池

  • 不需要获取结果
    @Autowired
    private ExecutorService executorService;
    @Override
    public void queryECSMetricInfo() {
        //发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
        List<Message> messages = Collections.synchronizedList(new ArrayList<>());
        boolean flag = true;
        //获取上次查询时间
        Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
        Long endTime = System.currentTimeMillis();
        try {
            //查询出所有的运行中的实例
            List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
            if (CollectionUtils.isEmpty(cloudInstances)) {
                return;
            }
            //定义计数器
            CountDownLatch latch = new CountDownLatch(cloudInstances.size());
            //遍历查询
            for (CloudInstanceAssetDto instance : cloudInstances) {
                executorService.submit(() -> {
                    try {
                        //获取内网流出带宽,并将结果封装到消息集合中
                        dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
                                startTime, endTime, instance, messages);
                    } catch (Exception e) {
                        log.error("获取ECS的指标数据-多线程处理任务异常!", e);
                    } finally {
                        latch.countDown();
                    }

                });
            }
            //等待任务执行完毕
            latch.await();
          //  service.shutdown();
            //将最终的消息集合发送到kafka
            if (CollectionUtils.isNotEmpty(messages)) {
                Map<String, Object> tagMap = new HashMap<>();
                tagMap.put("group", "ecs-metric");
                for (int i = 0; i < messages.size(); i++) {
                    if (StringUtils.isNotBlank(messages.get(i).getValue())
                            && "noSuchInstance".equals(messages.get(i).getValue())) {
                        continue;
                    }
                    Message message = messages.get(i);
                    OpenTSDBUtils.putData(message.getCode() + "-" + message.getHost(), Long.parseLong(message.getTime()),
                            Double.valueOf(message.getValue()), tagMap);
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("获取ECS的指标数据失败", e);
        }
        //更新记录上次查询时间
        if (flag) {
            QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
            queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
            queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
        }
    }
  • 需要获取线程执行结果
    @Autowired
    private ExecutorService executorService;
    @Override
    public void queryECSMetricInfo() {
        //发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
        List<Future<List<Message>>> futures  =new ArrayList<>();
        List<Message> messages = new ArrayList<>();
        boolean flag = true;
        //获取上次查询时间
        Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
        Long endTime = System.currentTimeMillis();
        try {
            //查询出所有的运行中的实例
            List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
            if (CollectionUtils.isEmpty(cloudInstances)) {
                return;
            }
            //定义计数器
            CountDownLatch latch = new CountDownLatch(cloudInstances.size());
            //遍历查询
            for (CloudInstanceAssetDto instance : cloudInstances) {
               Future<Message> future = executorService.submit(() -> {
                    try {
                        //获取内网流出带宽,并将结果封装到消息集合中
                        List<Message> curMessages = dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
                                startTime, endTime, instance);
                        return curMessages;
                    } catch (Exception e) {
                        log.error("获取ECS的指标数据-多线程处理任务异常!", e);
                    } finally {
                        latch.countDown();
                    }
                
                });
              
                futures.add(future);
            }
            //等待任务执行完毕
            latch.await();
          //  service.shutdown();
            //处理线程执行结果
            for(Future<List<Message>> future:futures){
                messages.add(future.get());
            }
            //将最终的消息集合发送到kafka
            if (CollectionUtils.isNotEmpty(messages)) {
                Map<String, Object> tagMap = new HashMap<>();
                tagMap.put("group", "ecs-metric");
                for (int i = 0; i < messages.size(); i++) {
                    if (StringUtils.isNotBlank(messages.get(i).getValue())
                            && "noSuchInstance".equals(messages.get(i).getValue())) {
                        continue;
                    }
                    Message message = messages.get(i);
                    OpenTSDBUtils.putData(message.getCode() + "-" + message.getHost(), Long.parseLong(message.getTime()),
                            Double.valueOf(message.getValue()), tagMap);
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("获取ECS的指标数据失败", e);
        }
        //更新记录上次查询时间
        if (flag) {
            QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
            queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
            queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
        }
    }