德彪,不如让我们重新开始:
简单回看下rabbitmq,以前用仅局限于去控制台配置,而且应该只用最简单的那种直连模型,这次更加全面的对这个消息中间件做个学习了解,优点有三:解耦、异步、削峰。
1、MQ的概念:
MQ(Message Quene) : 翻译为 消息队列
,通过典型的 生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
RabbitMQ基于AMQP协议,AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
除了RabbitMQ,其他主流的MQ:ActiveMQ、Kafka、RocketMQ等,各有特点,相对来说,这次看的RabbitMQ在这几个里头是最稳定可靠的。
总结RabbitMQ:基于AMQP
协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。 后半句他自己官网说的。。。。。。
官方教程:https://www.rabbitmq.com/#getstarted
2、安装RabbitMQ
linux有linux的装法,windows有windows的 ,(废话)听君一席话,如听一席话。我用windows装的,但是听说可能不稳定,所以以后还是用linux装吧。比较简单,因为是erlang开发的所以先装个erlang环境,然后直接启动bat就行了,随便去百度首页搜个安装教程就行。
RabbitMQ的命令还是不少的,也记不全,windows下就直接去安装目录sbin下执行rabbitmqctl help看就完事了。(rabbitmqctl status 看状态 rabbitmqctl version 看版本)
启动后访问localhost:15672就可以访问图形化界面了(注意是15672端口,5672是amqp协议端口,25672不知道),输入默认用户名密码guest guest可以到控制台首页。
管理界面的几个功能都顾名思义,overview就是概览;其他几个都是对amqp协议数据通信的可视化呈现:
- `connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况`
- `channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。`
- `Exchanges:交换机,用来实现消息的路由`
- `Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。`
Admin里面功能创建用户、给用户赋权、创建虚拟主机并绑定用户和虚拟主机。为了日后生产者消费者和RabbitMQ建立连接。创建后可见。
也可以在其他功能中观察到用户和虚拟主机的对照关系(例在连接中):
3、RabbitMQ支持的消息模型
有6种,一种一种来看,首先先从maven引个依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
(1)、直连模型
比较直接,生产者发,消息队列存,消费者取,看看代码实现:
工具类,把加载资源写在静态代码块里了,这种东西一般不会太变吧。。然后在工具类里封装创建连接的方法和关闭连接和通道的方法,都是rabbitmq包里封好的了,所以拿来调就完了;方法都默认需要异常处理。
package utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jinyunlong
* @date 2021/8/23 18:57
* @profession ICBC锅炉房保安
*/
public class RabbitMQConnection {
private static ConnectionFactory connectionFactory;
static {
//重量级资源 静态代码块类加载执行 只执行一次
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("jinyunlong");
connectionFactory.setUsername("jinyunlong");
connectionFactory.setPassword("xxxxxxxxx");
}
//定义提供链接对象的方法
public static Connection getConnection(){
try {
Connection connection = connectionFactory.newConnection();
return connection;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
//关闭通道和关闭连接工具方法
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if(channel!=null) {
channel.close();
}
if(connection!=null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
生产者:
package helloWorld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author jinyunlong
* @date 2021/8/23 14:41
* @profession ICBC锅炉房保安
*/
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
//通过工具类获取连接对象
Connection connection = RabbitMQConnection.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1:队列名称 如果队列不存在自动创建
//参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化 不持久化重启mq消息就丢了
//参数3:excusive 是否独占队列 true 独占队列 false 不独占 独占只能和以一个通道绑定
//参数4:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5:额外附加参数
//生产者消费者参数一定要对应!!!
channel.queueDeclare("hello",false,false,false,null);
//发布消息
//参数1:交换机名称 没有就空着
//参数2:队列名称
//参数3:传递消息的额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN 即使重启,也要持久
//参数4:消息的具体内容
channel.basicPublish("","hello",null,"hello rabbitMQ".getBytes(StandardCharsets.UTF_8));
RabbitMQConnection.closeConnectionAndChanel(channel,connection);
}
}
既然连接关闭都封好了,那么生产者代码就关键的就三行
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("","hello",null,"hello rabbitMQ".getBytes(StandardCharsets.UTF_8));
建通道,建队列,往队列里发消息,俩方法里参数含义参考生产者类里面的注释
消费者;
package helloWorld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jinyunlong
* @date 2021/8/23 15:39
* @profession ICBC锅炉房保安
*/
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
{
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1:队列名称 如果队列不存在自动创建
//参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化 不持久化重启mq消息就丢了
//参数3:excusive 是否独占队列 true 独占队列 false 不独占 独占只能和以一个通道绑定
//参数4:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5:额外附加参数
//生产者消费者参数一定要对应!!!
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数1:消费哪个队列的消息 队列名称
//参数2:开始消息的自动确认机制 消息自动确认 true 消费者自动向rabbitMQ确认消费信息 建议别用:比如分配消息队列已经向消费者分配了5个消息,处理了3个时消费者挂了,剩下2个也没法处理了
//参数3:消费时的回调接口
channel.basicConsume("hello",true, new DefaultConsumer(channel){
@Override //最后一个参数:消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("new String(body) = " + new String(body));
}
});
}
}
}
也和生产者写法差不多,唯一不同的就是消费消息时的一个重写方法。有一点一定要注意就是生产消费者的创建消息队列方法中的参数一定要保持一致!! 还有消费方法中第二个参数 消息自动确认机制,一般建议是false 因为如果是自动确认,比如消息队列已经向消费者分配了5个消息,然后消息队列就不管了默认5条就都处理了,但是消费者消费者处理完3个挂了,剩下2个也没法处理了,导致消息丢失。在接下来work quene模型中可以看到手动确认消息的例子。(感觉就像是消息的一种等待)
(2)、任务模型(work quene)
其实和直连差不多,就是增加了消费者,但是消息是共同被消费的,比如1消费了,2就消费不到了。图中的假设是两个消费者一个处理消息慢一个快,假如说性能一样的话,处理消息的数量是一样的(默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环)。这里的代码模拟的就是一快一慢的场景,而且增加了消息的手动确认。
生产者,加了个for循环,跑一次发100条消息到消息队列:
package workquene;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @author jinyunlong
* @date 2021/8/24 8:50
* @profession ICBC锅炉房保安
*/
public class Provider {
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connection = RabbitMQConnection.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通过通道声明队列
channel.queueDeclare("work",false,false,false,null);
//生产消息
for (int i = 0; i <=100; i++) {
channel.basicPublish("","work",null,(i+"hello work quene").getBytes(StandardCharsets.UTF_8));
}
RabbitMQConnection.closeConnectionAndChanel(channel,connection);
}
}
消费者1和消费者2,内容一样,1加了一个sleep,就只贴1了:
package workquene;
import com.rabbitmq.client.*;
import utils.RabbitMQConnection;
import java.io.IOException;
/**
* @author jinyunlong
* @date 2021/8/24 8:56
* @profession ICBC锅炉房保安
*/
public class Customer1 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQConnection.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",false,false,false,null);
//false 不会自动确认消息
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1::::"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
手动消息确认的关键在这几行:
channel.basicQos(1); //一次只接受一条未确认的消息
channel.basicConsume("work",false,new DefaultConsumer(channel) //参数2:关闭自动确认消息
channel.basicAck(envelope.getDeliveryTag(),false); //手动确认消息
消费者只有调用basicAck方法后才会去消费下一条消息。假设一个场景是消费者1和2 1比2慢,然后生产者发了100条消息,2在处理了59条时,1处理了2条,2在处理62条时挂了,这时basicAck就调不到了,然后从62开始的消息就全去消费者1里调用了。
如图,1sleep了1s ,2正常处理,我在2获取62条时把消费者2停了,所以62就会去1接着获取,然后接着往下:
这么看的话,相比于自动消息确认,手动消息确认在消费者挂了以后更不容易丢消息。本质是等待。
1w字了,后面的模式后面说,不年轻还叫年轻人吗?