SpringCloud-Stream-1

构建微服务架构(Stream 消息驱动 篇)

该篇文档,前置代码下载:下载
该篇文档,全部完成后的代码下载:下载

原文链接:https://blog.csdn.net/u011863024/article/details/114298270

Stream 为什么被引入

常见 MQ(消息中间件):

ActiveMQ
RabbitMQ
RocketMQ
Kafka

有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)

Cloud Stream 是什么?

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

Stream是什么及Binder介绍

官方文档1:https://spring.io/projects/spring-cloud-stream#overview

Cloud Stream 中文指导手册:https://m.wang1314.com/doc/webapp/topic/20971999.html

什么是 Spring Cloud Stream?

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过inputs或者 outputs 来与 Spring Cloud Stream 中 binder 对象交互。

通过我们配置来 binding(绑定),而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持 RabbitMQ、 Kafka。

Stream 的设计思想

标准 MQ

生产者/消费者之间靠消息媒介传递信息内容
消息必须走特定的通道 - 消息通道 Message Channel
消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息处理器所订阅。

为什么用 Cloud Stream?

比方说我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange,kafka 有 Topic 和 Partitions 分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 Spring Cloud Stream 给我们提供了—种解耦合的方式。

Stream 凭什么可以统一底层差异?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder:

INPUT 对应于消费者

OUTPUT 对应于生产者

Stream 中的消息通信方式遵循了发布-订阅模式

Topic 主题进行广播

在 RabbitMQ 就是 Exchange
在 Kakfa 中就是 Topic

Stream 编码常用注解简介
Spring Cloud Stream 标准流程套路

Binder - 很方便的连接中间件,屏蔽差异。

Channel - 通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过 Channel 对队列进行配置。

Source 和 Sink - 简单的可理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入。

编码 API 和常用注解

组成 说明
Middleware 中间件,目前只支持 RabbitMQ 和 Kafka
Binder Binder 是应用与消息中间件之间的封装,目前实行了 Kafka 和 RabbitMQ 的 Binder,通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型(对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输乎通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道 channel 和 exchange 绑定在一起

案例说明

准备 RabbitMQ 环境

工程中新建三个子模块

cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
cloud-stream-rabbitmq-consumer8802,作为消息接收模块
cloud-stream-rabbitmq-consumer8803,作为消息接收模块

1. Stream 消息驱动之生产者(cloud-stream-rabbitmq-provider8801)

1. 新建 Module:cloud-stream-rabbitmq-provider8801

2. cloud-stream-rabbitmq-provider8801 项目 pom.xml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud2021</artifactId>
<groupId>com.sevattal.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

3. cloud-stream-rabbitmq-provider8801 项目 application.yml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址

注: defaultRabbit 和 binders 在 stream 下。

4. cloud-stream-rabbitmq-provider8801 项目 主启动类 文件

1
2
3
4
5
6
7
8
9
package com.sevattal.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}

5. cloud-stream-rabbitmq-provider8801 项目 业务类消息接口 文件

1
2
3
4
package com.sevattal.springcloud.service;
public interface IMessageProvider {
public String send();
}

6. cloud-stream-rabbitmq-provider8801 项目 发送消息接口实现类 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.sevattal.springcloud.service.impl;
import com.sevattal.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}

7. cloud-stream-rabbitmq-provider8801 项目 Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.sevattal.springcloud.controller;
import com.sevattal.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}

8. 测试

启动 7001eureka
启动 RabpitMq(79_Bus之RabbitMQ环境配置)

rabbitmq-plugins enable rabbitmq_management
http://localhost:15672/

启动 8801
访问 - http://localhost:8801/sendMessage
后台将打印 serial: UUID 字符串

2. Stream消息驱动之消费者(cloud-stream-rabbitmq-consumer8802)

1. 新建Module:cloud-stream-rabbitmq-consumer8802

2. cloud-stream-rabbitmq-consumer8802 项目 pom.xml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud2021</artifactId>
<groupId>com.sevattal.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

3. cloud-stream-rabbitmq-consumer8802 项目 application.yml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址

4. cloud-stream-rabbitmq-consumer8802 项目 主启动类 文件

1
2
3
4
5
6
7
8
9
package com.sevattal.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}

5. cloud-stream-rabbitmq-consumer8802 项目 Controller 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.sevattal.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}

6. 测试

启动 EurekaMain7001

启动 StreamMQMain8801

启动 StreamMQMain8802

8801 发送 8802 接收消息

3. Stream 之消息重复消费

依照 8802,克隆出来一份运行 8803 - cloud-stream-rabbitmq-consumer8803。

启动

RabbitMQ
服务注册 - 7001
消息生产 - 8801
消息消费 - 8802
消息消费 - 8803

运行后有两个问题

有重复消费问题
消息持久化问题

消费

http://localhost:8801/sendMessage
目前是 8802/8803 同时都收到了,存在重复消费问题
如何解决:分组和持久化属性 group(重要)

生产实际案例

比如在如下场景中,订单系统我们做集群部署,都会从 RabbitMQ 中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用 Stream 中的消息分组来解决。

注意在 Stream 中处于同一个 group 中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

Stream 之 group 解决消息重复消费

原理

微服务应用放置于同一个 group 中,就能够保证消息只会被其中一个应用消费一次。

不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

8802/8803 都变成不同组,group 两个不同

group: A_Group、B_Group

8802 修改 YML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的 rabbitmq 的服务信息;
defaultRabbit: # 表示定义的名称,用于于 binding 整合
type: rabbit # 消息组件类型
environment: # 设置 rabbitmq 的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的 Exchange 名称定义
content-type: application/json # 设置消息类型,本次为 json,文本则设置"text/plain"
group: A_Group # <----------------------------------------关键

8803 修改 YML(与 8802 的类似位置 group: B_Group)

结论:还是重复消费

8802/8803 实现了轮询分组,每次只有一个消费者,8801 模块的发的消息只能被 8802 或 8803 其中一个接收到,这样避免了重复消费。

8802/8803 都变成相同组,group 两个相同

group: A_Group

8802修改YMLgroup: A_Group

8803修改YMLgroup: A_Group

结论:同一个组的多个微服务实例,每次只会有一个拿到

Stream 之消息持久化

通过上述,解决了重复消费问题,再看看持久化。

停止 8802/8803, 并去除掉 8802 的分组 group: A_Group, 8803 的分组 group: A_Group 没有去掉。

8801 先发送 4 条消息到 RabbitMq。

先启动 8802,无分组属性配置,后台没有打出来消息。

再启动 8803,有分组属性配置,后台打出来了 MQ 上的消息。(消息持久化体现)

Contents
  1. 1. 构建微服务架构(Stream 消息驱动 篇)
    1. 1.1. Stream 为什么被引入
    2. 1.2. Stream是什么及Binder介绍
    3. 1.3. Stream 的设计思想
    4. 1.4. 案例说明
      1. 1.4.1. 1. Stream 消息驱动之生产者(cloud-stream-rabbitmq-provider8801)
        1. 1.4.1.1. 1. 新建 Module:cloud-stream-rabbitmq-provider8801
        2. 1.4.1.2. 2. cloud-stream-rabbitmq-provider8801 项目 pom.xml 文件
        3. 1.4.1.3. 3. cloud-stream-rabbitmq-provider8801 项目 application.yml 文件
        4. 1.4.1.4. 4. cloud-stream-rabbitmq-provider8801 项目 主启动类 文件
        5. 1.4.1.5. 5. cloud-stream-rabbitmq-provider8801 项目 业务类消息接口 文件
        6. 1.4.1.6. 6. cloud-stream-rabbitmq-provider8801 项目 发送消息接口实现类 文件
        7. 1.4.1.7. 7. cloud-stream-rabbitmq-provider8801 项目 Controller
        8. 1.4.1.8. 8. 测试
      2. 1.4.2. 2. Stream消息驱动之消费者(cloud-stream-rabbitmq-consumer8802)
        1. 1.4.2.1. 1. 新建Module:cloud-stream-rabbitmq-consumer8802
        2. 1.4.2.2. 2. cloud-stream-rabbitmq-consumer8802 项目 pom.xml 文件
        3. 1.4.2.3. 3. cloud-stream-rabbitmq-consumer8802 项目 application.yml 文件
        4. 1.4.2.4. 4. cloud-stream-rabbitmq-consumer8802 项目 主启动类 文件
        5. 1.4.2.5. 5. cloud-stream-rabbitmq-consumer8802 项目 Controller 文件
        6. 1.4.2.6. 6. 测试
      3. 1.4.3. 3. Stream 之消息重复消费
    5. 1.5. Stream 之 group 解决消息重复消费
    6. 1.6. Stream 之消息持久化
|