diff --git a/pmapi/pom.xml b/pmapi/pom.xml index f4e7b97..7b3b966 100644 --- a/pmapi/pom.xml +++ b/pmapi/pom.xml @@ -258,7 +258,10 @@ system ${basedir}/src/lib/zwdd-sdk-java-1.2.0.jar - + + com.ningdatech + nd-rabbitmq-starter + org.apache.httpcomponents httpclient @@ -267,13 +270,11 @@ joda-time joda-time - 2.10.6 org.springframework.statemachine spring-statemachine-core - 2.0.1.RELEASE diff --git a/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/DirectRabbitConfig.java b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/DirectRabbitConfig.java new file mode 100644 index 0000000..0ce2b91 --- /dev/null +++ b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/DirectRabbitConfig.java @@ -0,0 +1,64 @@ +package com.ningdatech.pmapi.rabbitmq.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author zpf + * @version 1.0.0 + * @Description 创建direct类型的交换机 + * @createTime 2023年02月17日 + */ +@Slf4j +@Configuration +public class DirectRabbitConfig { + + + private static final String QUEUE = "TestDirectQueue"; + private static final String EXCHANGE = "TestDirectExchange"; + private static final String ROUTING_KEY = "TestDirectRouting"; + + /** + * 创建一个名为TestDirectQueue的队列 + * + * @return + */ + @Bean + public Queue testDirectQueue() { + // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 + // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable + // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。 + // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。 + return new Queue(QUEUE, true); + } + + /** + * 创建一个名为TestDirectExchange的Direct类型的交换机 + * + * @return + */ + @Bean + public DirectExchange testDirectExchange() { + // durable:是否持久化,默认是false,持久化交换机。 + // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。 + // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机 + return new DirectExchange(EXCHANGE, true, false); + } + + /** + * 绑定交换机和队列 + * + * @return + */ + @Bean + public Binding bindingDirect() { + //bind队列to交换机中with路由key(routing key) + return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY); + } +} + diff --git a/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/RabbitConfig.java b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/RabbitConfig.java new file mode 100644 index 0000000..d5a2ac1 --- /dev/null +++ b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/RabbitConfig.java @@ -0,0 +1,47 @@ +package com.ningdatech.pmapi.rabbitmq.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +public class RabbitConfig { + + @Bean + public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(); + rabbitTemplate.setConnectionFactory(connectionFactory); + + //设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。 + //我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除) + rabbitTemplate.setMandatory(true); + + rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack); + } else { + log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause); + } + } + }); + + rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.info("ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}" + , returnedMessage.getMessage(), returnedMessage.getReplyCode() + , returnedMessage.getReplyText(), returnedMessage.getExchange() + , returnedMessage.getRoutingKey()); + } + }); + + return rabbitTemplate; + } +} diff --git a/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/controller/TestController.java b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/controller/TestController.java new file mode 100644 index 0000000..aeecc81 --- /dev/null +++ b/pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/controller/TestController.java @@ -0,0 +1,59 @@ +package com.ningdatech.pmapi.rabbitmq.controller; + +import com.alibaba.fastjson.JSON; +import com.ningdatech.pmapi.user.entity.UserInfo; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +/** + * @author zpf + * @version 1.0.0 + * @Description + * @createTime 2023年02月17日 + */ +@RestController +public class TestController { + + @Autowired + RabbitTemplate rabbitTemplate; + + + @GetMapping("/test") + public String test() { + return "producer ok"; + } + + @GetMapping("/push") + public String push() { + for (int i = 1; i <= 5; i++) { + //这个参数是用来做消息的唯一标识 + //发布消息时使用,存储在消息的headers中 + UserInfo user = new UserInfo(); + user.setId(1L); + user.setRealName("汪涵"); + // 关联的数据,可以用在消息投递失败的时候,作为一个线索,比如我把当前用户的id放进去,如果user消息投递失败 + // 我后面可以根据id再找到user,再次投递数据 + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().concat("-") + i); + if (i == 2) { + //故意把交换机写错,演示 confirmCallback + rabbitTemplate.convertAndSend("TestDirectExchange_111", "TestDirectRouting", + JSON.toJSONString(user), correlationData); + } else if (i == 3) { + //故意把路由键写错,演示 returnCallback + rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting_111", + JSON.toJSONString(user), correlationData); + } else { + //正常发送 + rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", + JSON.toJSONString(user), correlationData); + } + } + return "producer push ok"; + } +} + diff --git a/pmapi/src/test/resources/application-dev.yml b/pmapi/src/test/resources/application-dev.yml index 7727198..616bd7f 100644 --- a/pmapi/src/test/resources/application-dev.yml +++ b/pmapi/src/test/resources/application-dev.yml @@ -87,6 +87,18 @@ spring: wall: config: multi-statement-allow: true + # rabbitmq 配置信息 + rabbitmq: + host: 110.40.194.60 + port: 5672 + username: admin + password: admin + #1、确保消息从发送端到服务端投递可靠(分为以下两个步骤) + #1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlate + publisher-confirm-type: correlated + #1.2、确认消息从交换机中到队列中 + publisher-returns: true + mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl diff --git a/pom.xml b/pom.xml index beccbd0..e6d82f5 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,22 @@ nd-log-starter 1.0.0 + + com.ningdatech + nd-rabbitmq-starter + 1.0.0 + + + joda-time + joda-time + 2.10.6 + + + + org.springframework.statemachine + spring-statemachine-core + 2.0.1.RELEASE +