第一种模型<直连>
AMQP协议的回顾
RabbitMQ支持的消息模型
引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
第一种模型(直连)
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
开发开发者
public class DirectConnection_Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接mq的连接工厂对象(factory就像一个人工,将要建造下面这些东西)
ConnectionFactory factory=new ConnectionFactory();
// 设置连接rabbitmq主机(Server)
factory.setHost("192.168.80.33");
// 设置端口号
factory.setPort(5672);
// 设置连接哪个虚拟主机(VirtualHost)
factory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名密码
factory.setUsername("ems");
factory.setPassword("123");
// 获取连接对象
Connection connection=factory.newConnection();
// 获取连接中的通道
Channel channel=connection.createChannel();
/**
* 通道绑定对应的消息队列
* 参数1 queue:队列名称
* 参数2 durable:用来定义队列特性 是否需要持久化
* 参数3 exclusive:是否为独占队列
* 参数4 autoDelete:是否在完成消费后自动删除队列
* 参数5 arguments:额外附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
/**
* 发布消息
* 参数1:交换机名称,因为这里用的是第一种模式没用到交换机 所以为空
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello",null,"hello SmallRabbit!!".getBytes());
channel.close();
connection.close();
}
}
开发消费者
public class DirectConnection_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
/* ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.80.33");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("ems");
factory.setPassword("123");
Connection connection=factory.newConnection();*/
//使用工具类创建连接
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=====消息内容:"+new String(body));
}
});
}
}
工具类
public class RabbitMqUtils {
private static ConnectionFactory factory;
static {/*之所以放到static代码块中 是因为factory是一个重量级资源,频繁的开关会消耗资源,所以放在static代码块
中就只会在类加载的时候创建一次就可以了。*/
factory=new ConnectionFactory();
factory.setHost("192.168.80.33");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("ems");
factory.setPassword("123");
}
public static Connection getConnection(){
try {
return factory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void close(Channel channel,Connection connection){
try {
if (channel!=null){
channel.close();
}
if (connection!=null){
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}