第三种模型(fanout)


第三种模型< fanout >

fanout 扇出 也称为广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

开发生产者


public class Provider {
    public static void main(String[] args) throws IOException {
//        获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
//        获取通道
        Channel channel=connection.createChannel();

//        用通道声明指定交换机。 参数1:交换机名称 参数2:交换机类型
        channel.exchangeDeclare("logs","fanout");

//        发送消息
        channel.basicPublish("logs","",null,"Test fanout".getBytes());

//        释放资源
        RabbitMqUtils.close(channel,connection);
    }
}

开发消费者1

package cn.duck.fanout;

import cn.duck.Utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {

    //        获取连接对象
    Connection connection = RabbitMqUtils.getConnection();
    //        获取通道
    Channel channel=connection.createChannel();

//    通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

//        临时队列
        String TempQueueName = channel.queueDeclare().getQueue();

//        绑定交换机和临时队列
        channel.queueBind(TempQueueName,"logs","");

//        消费消息
        channel.basicConsume(TempQueueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));

            }
        });
    }
}

开发消费者2

package cn.duck.fanout;

import cn.duck.Utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {

    //        获取连接对象
    Connection connection = RabbitMqUtils.getConnection();
    //        获取通道
    Channel channel=connection.createChannel();

//    通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

//        临时队列
        String TempQueueName = channel.queueDeclare().getQueue();

//        绑定交换机和临时队列
        channel.queueBind(TempQueueName,"logs","");

//        消费消息
        channel.basicConsume(TempQueueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));

            }
        });
    }
}

开发消费者3

package cn.duck.fanout;

import cn.duck.Utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer3 {
    public static void main(String[] args) throws IOException {

    //        获取连接对象
    Connection connection = RabbitMqUtils.getConnection();
    //        获取通道
    Channel channel=connection.createChannel();

//    通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

//        临时队列
        String TempQueueName = channel.queueDeclare().getQueue();

//        绑定交换机和临时队列
        channel.queueBind(TempQueueName,"logs","");

//        消费消息
        channel.basicConsume(TempQueueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者3:"+new String(body));

            }
        });
    }
}

结果

在这里插入图片描述
三个消费者都能拿到生产者生产的消息。


文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录