第一种模型(直连)


第一种模型<直连>

AMQP协议的回顾
在这里插入图片描述

RabbitMQ支持的消息模型

在这里插入图片描述
在这里插入图片描述

引入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

第一种模型(直连)

在这里插入图片描述
在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

开发开发者

public class DirectConnection_Provider {
    public static void main(String[] args) throws IOException, TimeoutException {

//        创建连接mq的连接工厂对象(factory就像一个人工,将要建造下面这些东西)
        ConnectionFactory factory=new ConnectionFactory();
//        设置连接rabbitmq主机(Server)
        factory.setHost("192.168.80.33");
//        设置端口号
        factory.setPort(5672);
//        设置连接哪个虚拟主机(VirtualHost)
        factory.setVirtualHost("/ems");
//        设置访问虚拟主机的用户名密码
        factory.setUsername("ems");
        factory.setPassword("123");

//        获取连接对象
        Connection connection=factory.newConnection();

//        获取连接中的通道
        Channel channel=connection.createChannel();

/**
 * 通道绑定对应的消息队列
 * 参数1  queue:队列名称
 * 参数2  durable:用来定义队列特性 是否需要持久化
 * 参数3  exclusive:是否为独占队列
 * 参数4  autoDelete:是否在完成消费后自动删除队列
 * 参数5  arguments:额外附加参数
 */
        channel.queueDeclare("hello",false,false,false,null);

/**
 *    发布消息
 *    参数1:交换机名称,因为这里用的是第一种模式没用到交换机 所以为空
 *    参数2:队列名称
 *    参数3:传递消息额外设置
 *    参数4:消息的具体内容
 */
        channel.basicPublish("","hello",null,"hello SmallRabbit!!".getBytes());

        channel.close();
        connection.close();
    }
}

开发消费者

public class DirectConnection_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
      /*  ConnectionFactory factory=new ConnectionFactory();

        factory.setHost("192.168.80.33");
        factory.setPort(5672);
        factory.setVirtualHost("/ems");
        factory.setUsername("ems");
        factory.setPassword("123");

        Connection connection=factory.newConnection();*/
        //使用工具类创建连接
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel=connection.createChannel();
        channel.queueDeclare("hello",false,false,false,null);

        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("=====消息内容:"+new String(body));
            }
        });

    }
}

在这里插入图片描述

工具类

public class RabbitMqUtils {
    private  static  ConnectionFactory factory;
    static {/*之所以放到static代码块中 是因为factory是一个重量级资源,频繁的开关会消耗资源,所以放在static代码块
        中就只会在类加载的时候创建一次就可以了。*/
        factory=new ConnectionFactory();
        factory.setHost("192.168.80.33");
        factory.setPort(5672);
        factory.setVirtualHost("/ems");
        factory.setUsername("ems");
        factory.setPassword("123");
    }

    public static Connection getConnection(){

        try {
            return  factory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Channel channel,Connection connection){

            try {
                if (channel!=null){
                channel.close();
                }

                if (connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    }
}

文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录