Spring Cloud Stream 是一个用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务的框架。
该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的 Spring 习惯用法和最佳实践,包括对持久化的 pub/sub 语义、消费者组和有状态分区的支持。
这里记录下 Spring Cloud Stream 3.1 版本的实现。@StreamListener 等注解在 3.1 版本后已不建议使用,查阅官方文档后发现官方推荐使用新的函数式写法,即基于 java.util.function.[Supplier/Function/Consumer] 的方式。
Binder Implementations
Spring Cloud Stream 的核心构件是:
- Destination Binders: 负责提供与外部消息传递系统集成的组件。
- Destination Bindings: 外部消息传递系统与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
- Message: 生产者和消费者使用的规范数据结构,用于与目标绑定器通信(并进而通过外部消息传递系统与其他应用程序通信)。
代码实现
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the demo: a Spring Boot web application that pulls in both the Kafka and the RabbitMQ Spring Cloud Stream binders. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent; the Boot starters and lombok below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.qxw</groupId>
<artifactId>cloud</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cloud</name>
<description>Demo project for Spring Boot</description>
<!-- Versions referenced by the BOM imports in the dependencyManagement section. -->
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
<spring-cloud-alibaba.version>2021.1</spring-cloud-alibaba.version>
</properties>
<dependencies>
<!-- Web starter: provides the REST endpoints exposed by the application class. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: supplies the @Slf4j logger used by the application class. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Bootstrap starter; presumably included so that the bootstrap.yml listing is loaded (confirm against the Spring Cloud 2020.0 release notes). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<!-- Spring Cloud Stream binder for Kafka. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Spring Cloud Stream binder for RabbitMQ. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- Spring Cloud BOM: manages the versions of the org.springframework.cloud starters declared above. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Cloud Alibaba BOM. NOTE(review): no com.alibaba.cloud dependency is declared in this pom, so this import looks unused here; confirm before keeping it. -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!-- Spring Boot Maven plugin (version inherited from the parent). -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
bootstrap.yml
# Configuration for the demo: broker connections, two named binders (Kafka and RabbitMQ),
# the bindings that map function/bridge names to destinations, and the function definition.
# NOTE(review): the nesting indentation of this listing appears to have been lost;
# it must be restored before the file can be parsed as YAML.
spring:
application:
name: cloud
# Kafka connection and client settings
kafka:
bootstrap-servers: localhost:9092
producer:
acks: 1
consumer:
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# RabbitMQ connection settings
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
# Two binders are declared; every binding below names the binder it uses.
binders:
defaultKafka:
type: kafka
environment:
spring.kafka: ${spring.kafka}
defaultRabbit:
type: rabbit
environment:
spring.rabbitmq: ${spring.rabbitmq}
default-binder: defaultKafka
bindings:
# Binding naming convention:
# input: <functionName>-in-<index>
# output: <functionName>-out-<index>
# Kafka bindings: one output plus two inputs on the same destination, in different groups
outputKafka-out-0:
binder: defaultKafka
destination: destination-test-kafka
inputKafka1-in-0:
binder: defaultKafka
destination: destination-test-kafka
group: group1
inputKafka2-in-0:
binder: defaultKafka
destination: destination-test-kafka
group: group2
# RabbitMQ bindings: one output plus two inputs on the same destination, in different groups
outputRabbit-out-0:
binder: defaultRabbit
destination: destination-test-rabbit
inputRabbit1-in-0:
binder: defaultRabbit
destination: destination-test-rabbit
group: group1
inputRabbit2-in-0:
binder: defaultRabbit
destination: destination-test-rabbit
group: group2
# NOTE(review): with indentation lost it is unclear whether this key belongs under
# spring.cloud or spring.cloud.stream; verify against the Spring Cloud Stream documentation.
function:
# Each functionName listed here corresponds to a bean of that name in the service
definition: inputKafka1;inputKafka2;inputRabbit1;inputRabbit2
CloudApplication.java
package com.qxw.cloud;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.function.Consumer;
/**
 * Demo application for the functional programming model of Spring Cloud Stream.
 *
 * <p>The class is the Spring Boot entry point, a REST controller exposing two
 * endpoints that publish the request body through {@link StreamBridge}, and the
 * holder of four {@code Consumer<String>} beans that log every message they receive.
 */
@SpringBootApplication
@RestController
@RequestMapping
@Slf4j
public class CloudApplication {

    /** Bridge used to send payloads to an output binding identified by its name. */
    @Autowired
    private StreamBridge bridge;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments handed to {@link SpringApplication}
     */
    public static void main(String[] args) {
        SpringApplication.run(CloudApplication.class, args);
    }

    /**
     * Sends the request body to the {@code outputKafka-out-0} binding.
     *
     * @param map JSON request body
     * @return a 200 response whose body is the boolean returned by {@code StreamBridge.send}
     */
    @PostMapping("sendKafka")
    public ResponseEntity<Boolean> sendKafka(@RequestBody Map<String, Object> map) {
        return ResponseEntity.ok(bridge.send("outputKafka-out-0", map));
    }

    /**
     * Sends the request body to the {@code outputRabbit-out-0} binding.
     *
     * @param map JSON request body
     * @return a 200 response whose body is the boolean returned by {@code StreamBridge.send}
     */
    @PostMapping("sendRabbit")
    public ResponseEntity<Boolean> sendRabbit(@RequestBody Map<String, Object> map) {
        return ResponseEntity.ok(bridge.send("outputRabbit-out-0", map));
    }

    /**
     * Consumer bean named {@code inputKafka1}.
     *
     * @return a consumer that logs each received message
     */
    @Bean
    public Consumer<String> inputKafka1() {
        // Any processing of a received message belongs in this callback.
        return message -> log.info("inputKafka1 message: {}", message);
    }

    /**
     * Consumer bean named {@code inputKafka2}.
     *
     * @return a consumer that logs each received message
     */
    @Bean
    public Consumer<String> inputKafka2() {
        return message -> log.info("inputKafka2 message: {}", message);
    }

    /**
     * Consumer bean named {@code inputRabbit1}.
     *
     * @return a consumer that logs each received message
     */
    @Bean
    public Consumer<String> inputRabbit1() {
        return message -> log.info("inputRabbit1 message: {}", message);
    }

    /**
     * Consumer bean named {@code inputRabbit2}.
     *
     * @return a consumer that logs each received message
     */
    @Bean
    public Consumer<String> inputRabbit2() {
        return message -> log.info("inputRabbit2 message: {}", message);
    }
}