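Open question
Every producer and consumer here has to be declared individually in the yml file, which looks cumbersome. I do not know whether there is a simpler way to configure them.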
Add the dependency to the pom
Option 1
<!-- Spring Cloud Stream RabbitMQ binder dependency -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>4.1.0</version>
</dependency>
Option 2
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>4.1.0</version>
</dependency>
yml configuration file
RabbitMQ configuration
spring:
  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment: # RabbitMQ connection settings for this binder
            spring:
              rabbitmq:
                host: ip
                username: admin
                password: admin
                virtual-host: my_vhost
For this test I put the producer and the consumers in the same project. They can just as well live in separate projects.
Consumer configuration
spring:
  cloud:
    stream:
      bindings:
        # in = consumer (input) bindings
        test-in-0:
          content-type: application/json
          destination: test-destination
          group: test-group
          binder: rabbit
        test1-in-0:
          content-type: application/json
          destination: test1-destination
          group: test1-group
          binder: rabbit
        test2-in-0:
          content-type: application/json
          destination: test2-destination
          group: test2-group # queue
          binder: rabbit
    function:
      # function beans to bind, separated by ';'
      definition: test;test1;test2
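Consumer
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ConsumerTest {

    /**
     * Note: the method name (test here) must match the binding name in the
     * configuration file, i.e. spring.cloud.stream.bindings.test-in-0.
     * The -in-0 suffix is a fixed convention: "in" marks an input (consumer)
     * binding and 0 is the index of the input.
     */
    @Bean
    public Consumer<Person> test() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    @Bean
    public Consumer<String> test1() {
        return msg -> {
            System.out.println("Received: " + msg);
        };
    }

    @Bean
    public Consumer<Person> test2() {
        return msg -> {
            System.out.println("Received: " + msg);
        };
    }
}

public class Person {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return this.name;
    }
}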
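The naming rule described in the comment above can be made concrete with a small, Spring-free sketch. The `BindingNames` class and its methods are hypothetical helpers written only for illustration (they are not part of Spring Cloud Stream); they simply reproduce the `<functionName>-in-<index>` convention for the function definition `test;test1;test2` used in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring Cloud Stream API: it only mirrors the
// <functionName>-in-<index> naming convention for input bindings.
class BindingNames {

    // Name of the input binding for a function bean with the given name.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Splits a definition such as "test;test1;test2" and returns the input
    // binding name that each single-input function is expected to use.
    static List<String> inputBindings(String definition) {
        List<String> names = new ArrayList<>();
        for (String functionName : definition.split(";")) {
            String trimmed = functionName.trim();
            if (!trimmed.isEmpty()) {
                names.add(inputBinding(trimmed, 0));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [test-in-0, test1-in-0, test2-in-0]
        System.out.println(inputBindings("test;test1;test2"));
    }
}
```

If a bean is renamed, the key under `spring.cloud.stream.bindings` has to be renamed with it, otherwise the configured destination and group are not applied to that function.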
After startup, the generated topic exchanges and queues are visible in the RabbitMQ management console.
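To know what to look for in the console, the sketch below lists the names that should appear for the three consumer bindings. It assumes the RabbitMQ binder's default naming, where the exchange is named after the `destination` and the queue is `<destination>.<group>`, with no `prefix` and without `queueNameGroupOnly` set. `RabbitNames` is a hypothetical helper for illustration only, not a binder API.

```java
// Hypothetical helper: reproduces the RabbitMQ binder's default naming
// (exchange = destination, queue = destination.group). It assumes no
// prefix and no queueNameGroupOnly setting on the bindings.
class RabbitNames {

    // The topic exchange carries the destination name unchanged.
    static String exchange(String destination) {
        return destination;
    }

    // The consumer-group queue is named "<destination>.<group>".
    static String queue(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        String[][] bindings = {
            {"test-destination", "test-group"},
            {"test1-destination", "test1-group"},
            {"test2-destination", "test2-group"},
        };
        for (String[] binding : bindings) {
            System.out.println("exchange=" + exchange(binding[0])
                    + " queue=" + queue(binding[0], binding[1]));
        }
    }
}
```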
Configure the producer
yml
spring:
  cloud:
    stream:
      bindings:
        # out = producer (output) bindings
        test-out-0:
          content-type: application/json
          destination: test-destination # topic
          binder: rabbit
        test1-out-0:
          content-type: application/json
          destination: test1-destination
          binder: rabbit
        test2-out-0:
          content-type: application/json
          destination: test2-destination
          binder: rabbit
Test code
package com.example.demorabbit;

import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class ProducerController {

    // Injected through the constructor generated by @RequiredArgsConstructor
    private final StreamBridge streamBridge;

    // delay is only used by the commented-out delayed-message example below
    @GetMapping("sendMsg")
    public String sendMsg(int delay, String name) {
        Person person = new Person();
        person.setName(name);
        // Message<SpringcloudstreamDemo1Application.Person> message = MessageBuilder.withPayload(person)
        //         .setHeader("x-delay", delay).build();
        // // send a delayed message
        // streamBridge.send("demo2-out-0", message);

        // Send the same payload through two of the output bindings configured above
        streamBridge.send("test1-out-0", person);
        streamBridge.send("test-out-0", person);
        return "Sent successfully";
    }
}
Test sending a message
http://localhost:5656/sendMsg?delay=10000&name=zhangsan
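The same request can be issued from code instead of a browser. The following is a minimal sketch using the JDK's built-in `java.net.http.HttpClient` (Java 11+). `SendMsgClient` and `sendMsgUri` are hypothetical names introduced here; the host, port, path, and parameters are taken from the URL above, and `main` assumes the application is already running locally.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for the sendMsg endpoint shown in this post.
class SendMsgClient {

    // Builds the request URI; the name is URL-encoded so that spaces or
    // non-ASCII characters do not break the query string.
    static URI sendMsgUri(String baseUrl, int delay, String name) {
        String query = "delay=" + delay
                + "&name=" + URLEncoder.encode(name, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/sendMsg?" + query);
    }

    public static void main(String[] args) throws Exception {
        // Assumes the Spring Boot application is listening on port 5656.
        URI uri = sendMsgUri("http://localhost:5656", 10000, "zhangsan");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status code followed by the controller's return value.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```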
The messages were sent and received successfully.