RabbitMQ的使用
约 866 字大约 3 分钟
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
2、配置rabbitmq的连接信息等
生产者配置
rabbitmq: host: 110.40.209.16 port: 5672 username: zhuoye password: zy521 virtual-host: /
消费者配置
rabbitmq: host: 110.40.209.16 port: 5672 username: zhuoye password: zy521 virtual-host: / listener: simple: prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息
3、设置消息转换器
目的:默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用
Jackson的序列化代替JDk的序列化。
在生产者和消费者的启动类上加上如下代码:
@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {
public static void main( String[] args ) {
SpringApplication.run(ConsumerApp.class, args);
String fozuStr = "ICAgICAgICAgICAgICAgICAgIF9vb09vb18KICAgICAgICAgICAgICAgICAgbzg4ODg4ODhvCiAgICAgICAgICAgICAgICAgIDg4IiAuICI4OAogICAgICAgICAgICAgICAgICAofCAtXy0gfCkKICAgICAgICAgICAgICAgICAgT1wgID0gIC9PCiAgICAgICAgICAgICAgIF9fX18vYC0tLSdcX19fXwogICAgICAgICAgICAgLicgIFxcfCAgICAgfC8vICBgLgogICAgICAgICAgICAvICBcXHx8fCAgOiAgfHx8Ly8gIFwKICAgICAgICAgICAvICBffHx8fHwgLTotIHx8fHx8LSAgXAogICAgICAgICAgIHwgICB8IFxcXCAgLSAgLy8vIHwgICB8CiAgICAgICAgICAgfCBcX3wgICcnXC0tLS8nJyAgfCAgIHwKICAgICAgICAgICBcICAuLVxfXyAgYC1gICBfX18vLS4gLwogICAgICAgICBfX19gLiAuJyAgLy0tLi0tXCAgYC4gLiBfXwogICAgICAuIiIgJzwgIGAuX19fXF88fD5fL19fXy4nICA+JyIiLgogICAgIHwgfCA6ICBgLSBcYC47YFwgXyAvYDsuYC8gLSBgIDogfCB8CiAgICAgXCAgXCBgLS4gICBcXyBfX1wgL19fIF8vICAgLi1gIC8gIC8KPT09PT09YC0uX19fX2AtLl9fX1xfX19fXy9fX18uLWBfX19fLi0nPT09PT09CiAgICAgICAgICAgICAgICAgICBgPS0tLT0nCl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXl5eXgogICAgICAgICAgICAgICAgIOS9m+elluS/neS9kSAgICAgICDmsLjml6BCVUc=";
byte[] decode = Base64.decode(fozuStr);
System.out.println(new String(decode));
}
//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化
@Bean
public MessageConverter jacksonMessageConvertor(){
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能
return jackson2JsonMessageConverter;
}
}
4、生产者代码示例
- 配置文件
@Configuration
public class RabbitMqConfig {
private static String EXCHANGE_NAME="amq.topic";
private static String QUEUE_NAME="alarm.data.topic.queue";
private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";
/**
* 声明交换机
*/
@Bean
public TopicExchange exchange(){
// durable:是否持久化,默认是false
// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。
return new TopicExchange(EXCHANGE_NAME,true,false);
}
/**
* 声明告警队列
* @return
*/
@Bean("alarmQueue")
public Queue alarmQueue(){
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new Queue(QUEUE_NAME,true,false,false);
}
/**
* 声明确认告警队列
* @return
*/
@Bean("confirmAlarmQueue")
public Queue confirmAlarmQueue(){
return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);
}
/**
* 声明告警队列绑定关系
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");
}
/**
* 声明确认告警队列绑定关系
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");
}
生产消息代码
@Autowired private RabbitTemplate rabbitTemplate; private static String EXCHANGE_NAME="amq.topic"; private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue"; @Test void producerAlarmMsg() { String msg = "发送一条告警消息"; rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg); System.out.println("msg = " + msg); } @Test void producerConfirmAlarmMsg() { String msg = "发送一条确认告警消息"; rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg); System.out.println("msg = " + msg); }
5、消费者代码示例
@Component
public class AlarmConsumer {
@Autowired
private IAlarmService alarmService;
@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")
public void getAlarmInfo(String data){
alarmService.dealAlarmData(data);
}
@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")
public void getConfirmAlarmInfo(String data){
alarmService.dealConfirmAlarmData(data);
}
}
@Service
public class IAlarmServiceImpl implements IAlarmService {
@Override
public void dealAlarmData(String data) {
EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);
List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();
DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);
dceEquipAlarmDto.setCreateTime(new Date());
dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);
//查询出需要新增或者更新的数据
Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();
//开启事务,保证新增、更新、删除的原子性
TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
List<DceEquipAlarmDto> list=new ArrayList<>();
list.add(dceEquipAlarmDto);
try {
//新增
if (!flag) {
dceEquipAlarmMapper.insertBatch(list);
}
//更新
if (flag) {
dceEquipAlarmMapper.updateBatch(list);
}
//提交事务
transactionManager.commit(transaction);
} catch (Exception e) {
//回滚
transactionManager.rollback(transaction);
log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);
}
}
@Override
public void dealConfirmAlarmData(String data) {
EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);
Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));
alarmResp.setConfirmTime(confirmTime);
dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());
}
}