springCloud→Stream:实战


SpringCloud Stream实战

请添加图片描述
如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们将使用SpringCloud Stream实现该功能。
项目结构图:
在这里插入图片描述

注:本次实战默认已经安装好了rabbitMQ

生产者(支付服务)

1)引入依赖

hailtaxi-pay中引入依赖:

<!--stream-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2)配置MQ服务

修改hailtaxi-payapplication.yml添加如下配置:

    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

完整配置如下:

server:
  port: 18083
spring:
  application:
    name: hailtaxi-pay
  cloud:
    #Consul配置
    consul:
      host: localhost
      port: 8500
      discovery:
        #注册到Consul中的服务名字
        service-name: ${spring.application.name}
    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定

创建com.itheima.pay.mq.MessageSender代码如下:

package com.itheima.pay.mq;

/***
 * 负责向MQ发送消息
 */
@EnableBinding(Source.class)
public class MessageSender {

    @Resource
    private MessageChannel output;//消息发送管道

    /***
     * 发送消息
     * @param message
     * @return
     */
    public Boolean send(Object message) {
        //消息发送
        boolean bo = output.send(MessageBuilder.withPayload(message).build());
        System.out.println("*******send message: "+message);
        return bo;
    }
}

参数说明:

Source.class: 绑定一个输出消息通道Channel。
MessageChannel: 消息发送对象,默认是DirectWithAttributesChannel,发消息在AbstractMessageChannel中完成。
MessageBuilder.withPayload: 构建消息。

4)消息发送

com.itheima.pay.controller.TaxiPayController中创建支付方法用于发送消息,代码如下:

@RestController
@RequestMapping(value = "/pay")
public class TaxiPayController {

    @Autowired
    private MessageSender messageSender;

    /***
     * 支付  http://localhost:18083/pay/wxpay/1
     * @return
     */
    @GetMapping(value = "/wxpay/{id}")
    public TaxiPay pay(@PathVariable(value = "id")String id){
        //支付操作
        TaxiPay taxiPay = new TaxiPay(id,310,3);
        //发送消息
        messageSender.send(taxiPay);
        return taxiPay;
    }
}

消费者(订单服务)

1)修改配置

修改hailtaxi-order的核心配置文件application.yml,在文件中配置要监听的MQ信息:

    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

2)消息监听

hailtaxi-order中创建消息监听对象com.itheima.order.mq.MessageReceiver,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {

    @Value("${server.port}")
    private String port;

    /****
     * 消息监听
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(String message) {
        System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);
    }
}

参数说明:

Sink.class: 绑定消费者管道信息。
@StreamListener(Sink.INPUT): 监听消息配置,指定了消息为application中的input。

测试

我们请求http://localhost:18083/pay/wxpay/1测试效果如下:
在这里插入图片描述
在这里插入图片描述


文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录