线程池的使用
约 694 字大约 2 分钟
线程池的使用
创建线程池
@Component
public class ThreadUtil {
@Bean(value = "executorService")
public ExecutorService getExecutorService() {
return new ThreadPoolExecutor(20, 20, 3600, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000));
}
}
使用线程池
- 不需要获取结果
@Autowired
private ExecutorService executorService;
@Override
public void queryECSMetricInfo() {
//发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
List<Message> messages = Collections.synchronizedList(new ArrayList<>());
boolean flag = true;
//获取上次查询时间
Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
Long endTime = System.currentTimeMillis();
try {
//查询出所有的运行中的实例
List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
if (CollectionUtils.isEmpty(cloudInstances)) {
return;
}
//定义计数器
CountDownLatch latch = new CountDownLatch(cloudInstances.size());
//遍历查询
for (CloudInstanceAssetDto instance : cloudInstances) {
executorService.submit(() -> {
try {
//获取内网流出带宽,并将结果封装到消息集合中
dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
startTime, endTime, instance, messages);
} catch (Exception e) {
log.error("获取ECS的指标数据-多线程处理任务异常!", e);
} finally {
latch.countDown();
}
});
}
//等待任务执行完毕
latch.await();
// service.shutdown();
//将最终的消息集合发送到kafka
if (CollectionUtils.isNotEmpty(messages)) {
Map<String, Object> tagMap = new HashMap<>();
tagMap.put("group", "ecs-metric");
for (int i = 0; i < messages.size(); i++) {
if (StringUtils.isNotBlank(messages.get(i).getValue())
&& "noSuchInstance".equals(messages.get(i).getValue())) {
continue;
}
Message message = messages.get(i);
OpenTSDBUtils.putData(message.getCode() + "-" + message.getHost(), Long.parseLong(message.getTime()),
Double.valueOf(message.getValue()), tagMap);
}
}
} catch (Exception e) {
flag = false;
log.error("获取ECS的指标数据失败", e);
}
//更新记录上次查询时间
if (flag) {
QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
}
}
- 需要获取线程执行结果
@Autowired
private ExecutorService executorService;
@Override
public void queryECSMetricInfo() {
//发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
List<Future<List<Message>>> futures =new ArrayList<>();
List<Message> messages = new ArrayList<>();
boolean flag = true;
//获取上次查询时间
Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
Long endTime = System.currentTimeMillis();
try {
//查询出所有的运行中的实例
List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
if (CollectionUtils.isEmpty(cloudInstances)) {
return;
}
//定义计数器
CountDownLatch latch = new CountDownLatch(cloudInstances.size());
//遍历查询
for (CloudInstanceAssetDto instance : cloudInstances) {
Future<Message> future = executorService.submit(() -> {
try {
//获取内网流出带宽,并将结果封装到消息集合中
List<Message> curMessages = dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
startTime, endTime, instance);
return curMessages;
} catch (Exception e) {
log.error("获取ECS的指标数据-多线程处理任务异常!", e);
} finally {
latch.countDown();
}
});
futures.add(future);
}
//等待任务执行完毕
latch.await();
// service.shutdown();
//处理线程执行结果
for(Future<List<Message>> future:futures){
messages.add(future.get());
}
//将最终的消息集合发送到kafka
if (CollectionUtils.isNotEmpty(messages)) {
Map<String, Object> tagMap = new HashMap<>();
tagMap.put("group", "ecs-metric");
for (int i = 0; i < messages.size(); i++) {
if (StringUtils.isNotBlank(messages.get(i).getValue())
&& "noSuchInstance".equals(messages.get(i).getValue())) {
continue;
}
Message message = messages.get(i);
OpenTSDBUtils.putData(message.getCode() + "-" + message.getHost(), Long.parseLong(message.getTime()),
Double.valueOf(message.getValue()), tagMap);
}
}
} catch (Exception e) {
flag = false;
log.error("获取ECS的指标数据失败", e);
}
//更新记录上次查询时间
if (flag) {
QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
}
}