PoffyZhang преди 1 година
родител
ревизия
7aecaae837
променени са 6 файла, в които са добавени 202 реда и са изтрити 3 реда
  1. +4
    -3
      pmapi/pom.xml
  2. +64
    -0
      pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/DirectRabbitConfig.java
  3. +47
    -0
      pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/RabbitConfig.java
  4. +59
    -0
      pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/controller/TestController.java
  5. +12
    -0
      pmapi/src/test/resources/application-dev.yml
  6. +16
    -0
      pom.xml

+ 4
- 3
pmapi/pom.xml Целия файл

@@ -258,7 +258,10 @@
<scope>system</scope>
<systemPath>${basedir}/src/lib/zwdd-sdk-java-1.2.0.jar</systemPath>
</dependency>

<dependency>
<groupId>com.ningdatech</groupId>
<artifactId>nd-rabbitmq-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@@ -267,13 +270,11 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.6</version>
</dependency>
<!--状态机-->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>

</dependencies>


+ 64
- 0
pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/DirectRabbitConfig.java Целия файл

@@ -0,0 +1,64 @@
package com.ningdatech.pmapi.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author zpf
* @version 1.0.0
* @Description 创建direct类型的交换机
* @createTime 2023年02月17日
*/
@Slf4j
@Configuration
public class DirectRabbitConfig {


private static final String QUEUE = "TestDirectQueue";
private static final String EXCHANGE = "TestDirectExchange";
private static final String ROUTING_KEY = "TestDirectRouting";

/**
* 创建一个名为TestDirectQueue的队列
*
* @return
*/
@Bean
public Queue testDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
return new Queue(QUEUE, true);
}

/**
* 创建一个名为TestDirectExchange的Direct类型的交换机
*
* @return
*/
@Bean
public DirectExchange testDirectExchange() {
// durable:是否持久化,默认是false,持久化交换机。
// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
return new DirectExchange(EXCHANGE, true, false);
}

/**
* 绑定交换机和队列
*
* @return
*/
@Bean
public Binding bindingDirect() {
//bind队列to交换机中with路由key(routing key)
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);
}
}


+ 47
- 0
pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/config/RabbitConfig.java Целия файл

@@ -0,0 +1,47 @@
package com.ningdatech.pmapi.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);

//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
} else {
log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
}
}
});

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}"
, returnedMessage.getMessage(), returnedMessage.getReplyCode()
, returnedMessage.getReplyText(), returnedMessage.getExchange()
, returnedMessage.getRoutingKey());
}
});

return rabbitTemplate;
}
}

+ 59
- 0
pmapi/src/main/java/com/ningdatech/pmapi/rabbitmq/controller/TestController.java Целия файл

@@ -0,0 +1,59 @@
package com.ningdatech.pmapi.rabbitmq.controller;

import com.alibaba.fastjson.JSON;
import com.ningdatech.pmapi.user.entity.UserInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
* @author zpf
* @version 1.0.0
* @Description
* @createTime 2023年02月17日
*/
@RestController
public class TestController {

@Autowired
RabbitTemplate rabbitTemplate;


@GetMapping("/test")
public String test() {
return "producer ok";
}

@GetMapping("/push")
public String push() {
for (int i = 1; i <= 5; i++) {
//这个参数是用来做消息的唯一标识
//发布消息时使用,存储在消息的headers中
UserInfo user = new UserInfo();
user.setId(1L);
user.setRealName("汪涵");
// 关联的数据,可以用在消息投递失败的时候,作为一个线索,比如我把当前用户的id放进去,如果user消息投递失败
// 我后面可以根据id再找到user,再次投递数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().concat("-") + i);
if (i == 2) {
//故意把交换机写错,演示 confirmCallback
rabbitTemplate.convertAndSend("TestDirectExchange_111", "TestDirectRouting",
JSON.toJSONString(user), correlationData);
} else if (i == 3) {
//故意把路由键写错,演示 returnCallback
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting_111",
JSON.toJSONString(user), correlationData);
} else {
//正常发送
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",
JSON.toJSONString(user), correlationData);
}
}
return "producer push ok";
}
}


+ 12
- 0
pmapi/src/test/resources/application-dev.yml Целия файл

@@ -87,6 +87,18 @@ spring:
wall:
config:
multi-statement-allow: true
# rabbitmq 配置信息
rabbitmq:
host: 110.40.194.60
port: 5672
username: admin
password: admin
#1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)
#1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlate
publisher-confirm-type: correlated
#1.2、确认消息从交换机中到队列中
publisher-returns: true

mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl


+ 16
- 0
pom.xml Целия файл

@@ -149,6 +149,22 @@
<artifactId>nd-log-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.ningdatech</groupId>
<artifactId>nd-rabbitmq-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.6</version>
</dependency>
<!--状态机-->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement>



Loading…
Отказ
Запис