Spring Cloud Stream 是一个用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务的框架。

这里记录下Spring Cloud Stream 3.1版本的实现。@StreamListener 等注解在3.1版本后不建议使用推荐使用,摸索官方文档发现使用新的写法,使用java.util.function.[Supplier/Function/Consumer]的方式

Binder Implementations

Spring Cloud Stream 的核心构件是:

  • Destination Binders: 负责提供与外部消息传递系统集成的组件。
  • Destination Bindings: 外部消息传递系统与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • Message: 生产者和使用者使用的规范数据结构,用于与目标绑定器(以及通过外部消息传递系统与其他应用程序进行通信)进行通信。

官方GitHub项目示例

代码实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<artifactId>xm-demo-cloud-alibaba-stream</artifactId>
	<name>xm-demo-cloud-alibaba-stream</name>
	<description>晓梦-测试-阿里微服务-Stream函数式编程</description>

	<parent>
		<groupId>com.xm.demo.cloud.alibaba</groupId>
		<artifactId>xm-demo-cloud-alibaba</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>

	<dependencies>

		<!-- WEB守护进程启动springboot -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- processor -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<!-- 工程主入口 -->
					<mainClass>com.xm.demo.alicloud.stream.XmAlicloudStreamApp</mainClass>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<configuration>
					<skip>true</skip>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

bootstrap.yml

server: 
  port: 8080
  
spring: 
  application:  
    name: xm-demo-cloud-alibaba-stream
  cloud:  
    stream: 
      default-binder:  rocketMQ
      binders:                                            # 需要绑定的rabbitmq的服务信息
        rabbitMQ:                                         # 定义的名称,用于binding整合
          type: rabbit                                    # 消息组件类型
          environment:                                    # 环境配置
            spring:
              rabbitmq:
                host: 192.168.1.211
                port: 5672
                username: admin
                password: 123456
        rocketMQ: 
          type: rocketmq
          environment:                                    # 环境配置spring.cloud.stream.rocketmq.binder
            spring: 
              cloud:  
                stream: 
                  rocketmq: 
                    binder:  
                      name-server: 192.168.1.211:9876
                      group: mygroup                      # 必须配 否则报错
      bindings:
        rocket-out-0:
          binder: rocketMQ
          destination: rocket-topic
        rocket-in-0:
          binder: rocketMQ
          destination: rocket-topic
          group: rocket-group                      
    function: 
      definition: rocket

controller.java

package com.xm.demo.alicloud.stream.controller;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

	Logger log = LoggerFactory.getLogger(ProducerController.class);

	@Autowired
	private StreamBridge bridge;

	@GetMapping("/test")
	public String test(String id) {
		log.info("请求参数:{}", id);
		Map<String, Object> map = new HashMap<>();
		map.put("id", id);
		boolean send = bridge.send("rocket-out-0", MessageBuilder.withPayload(map)
                .setHeader("DELAY", 2)
                .build());
		log.info("发送消息:{}", send ? "成功" : "失败");
		return id + "-" + send;
	}
}

listener.java

package com.xm.demo.alicloud.stream.listener;

import java.util.function.Consumer;
//import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
//import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
//import reactor.core.publisher.Flux;
//import reactor.core.publisher.Mono;

@Service
public class ConsumerListener {

	Logger log = LoggerFactory.getLogger(ConsumerListener.class);

// 据说是官方推荐
//	@Bean
//	public Function<Flux<Message<String>>, Mono<Void>> rocket() {
//		return flux -> flux.map(message -> {
//			System.out.println(message.getPayload());
//			return message;
//		}).then();
//	}

	@Bean
	public Consumer<String> rocket() {
		return msg -> {
			log.info(msg);
		};
	}
}

DEMO:

下载:https://mengxc.lanzouh.com/ikTTF026zqhc

密码:9jcb