接着上一节说另外几种模型:

(3)、广播模型

10.PNG
有了交换机的概念,工作模式是消费者们共用一个消息队列,消息用完就没了,广播模式则是生产者会通过交换机把消息发送到所有与之绑定的消息队列供消费者消费,所以所有消费者都可以拿到相同的消息,实现一条消息被多个消费者消费。可以参考springcloud bus 消息总线吧。

生产者,参数含义参考注释:

package fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author jinyunlong
 * @date 2021/8/24 15:32
 * @profession ICBC锅炉房保安
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();

        //将通道声明指定的交换机  //参数1:交换机名称  参数2:交换机类型 fanout 广播类型
        channel.exchangeDeclare("fanfanfan","fanout");
        //发送消息
        channel.basicPublish("fanfanfan","",null,"fanout type message".getBytes(StandardCharsets.UTF_8));
        //释放资源
        RabbitMQConnection.closeConnectionAndChanel(channel,connection);
    }
}

消费者123内容一样,都是接广播的,这里注意调用 channel.queueDeclare().getQueue();这个方法是创建临时队列的,关闭连接后就没了:

package fanout;

import com.rabbitmq.client.*;
import utils.RabbitMQConnection;

import java.io.IOException;

/**
 * @author jinyunlong
 * @date 2021/8/24 15:44
 * @profession ICBC锅炉房保安
 */
public class Customer1 {
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("fanfanfan","fanout");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queue,"fanfanfan","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

(4)、路由模型

11.PNG

简言之和广播模式相比,路由模式就是指哪发哪,因为生产者发送时指定了路由key,所以消费者消费时需要对应的绑定关系才可以去消费消息。

生产者,需要注意的就是在发消息方法里要指定路由key了:

package direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author jinyunlong
 * @date 2021/8/24 16:37
 * @profession ICBC锅炉房保安
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQConnection.getConnection();
        //获取连接通道对象
        Channel channel = connection.createChannel();
        //通过通道声明交换机
        channel.exchangeDeclare("logs_direct","direct");
        //发送消息
        String routingkey = "warning";
        channel.basicPublish("logs_direct",routingkey,null,
                ("这是direct模型发布的基于route_key:["+routingkey+"]发送的消息").getBytes(StandardCharsets.UTF_8));
        //关闭资源
        RabbitMQConnection.closeConnectionAndChanel(channel,connection);

    }
}

两个消费者,指定了不同的路由key,1是只能消费error key所以消息不到生产者发出的消息,2的话指定了3个key,有一个满足生产者的warning key,所以可以消费到消息:

package direct;

import com.rabbitmq.client.*;
import utils.RabbitMQConnection;

import java.io.IOException;

/**
 * @author jinyunlong
 * @date 2021/8/24 16:48
 * @profession ICBC锅炉房保安
 */
public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "logs_direct";
        //通道声明交换机以及交换的类型
        channel.exchangeDeclare(exchangeName,"direct");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于route key 绑定队列和交换机
        channel.queueBind(queue,exchangeName,"error");
        //获取消费的消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });

    }
}

package direct;

import com.rabbitmq.client.*;
import utils.RabbitMQConnection;

import java.io.IOException;

/**
 * @author jinyunlong
 * @date 2021/8/24 16:48
 * @profession ICBC锅炉房保安
 */
public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "logs_direct";
        //通道声明交换机以及交换的类型
        channel.exchangeDeclare(exchangeName,"direct");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于route key 绑定队列和交换机
        channel.queueBind(queue,exchangeName,"info");
        channel.queueBind(queue,exchangeName,"error");
        channel.queueBind(queue,exchangeName,"warning");
        //获取消费的消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });

    }
}

(5)、订阅模型

12.PNG
更灵活版的路由模型,可以使用通配符了,*比#更加严格一点:

* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
# (hash) can substitute for zero or more words.  匹配一个或多个词

生产者,写法和路由差不多,交换机类型要换一下:

package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author jinyunlong
 * @date 2021/8/25 9:07
 * @profession ICBC锅炉房保安
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型  topic
        channel.exchangeDeclare("topics","topic");
        //发布消息
        String routekey = "user.save";
        channel.basicPublish("topics",routekey,null,
                ("这里是topic动态路由模型,routekey:["+routekey+"]").getBytes(StandardCharsets.UTF_8));
        //关闭资源
        RabbitMQConnection.closeConnectionAndChanel(channel,connection);
    }
}

消费者12,下头的写法是都能消费消息的,但比如在user.*后面再来个.save,这样就消费不到了,多试几个就明白了:

package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQConnection;

import java.io.IOException;

/**
 * @author jinyunlong
 * @date 2021/8/25 9:16
 * @profession ICBC锅炉房保安
 */
public class Customer1 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        channel.exchangeDeclare("topics","topic");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列和交换机 动态通配符形式route key
        channel.queueBind(queue,"topics","user.*");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQConnection;

import java.io.IOException;

/**
 * @author jinyunlong
 * @date 2021/8/25 9:16
 * @profession ICBC锅炉房保安
 */
public class Customer2 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        channel.exchangeDeclare("topics","topic");
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列和交换机 动态通配符形式route key
        channel.queueBind(queue,"topics","user.#");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}

4、SpringBoot集成RabbitMQ

总的来说硬编码看rabbitmq就很简单了,用springboot集成更简单,用注解就完事了:

依赖和配置:

 <!--引入与rabbitma集成的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: 你的
    password: 你的
    virtual-host: 你的

生产者,里面把上述5种模式的消息发布都写了,在springboot中,只需要注入模板类rabbitTemplate然后调用里面的方法就行了,参数配置也很简单了,重载了一堆方法,看用哪个模式写哪些方法吧:

package com.jin.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author jinyunlong
 * @date 2021/8/25 14:44
 * @profession ICBC锅炉房保安
 */
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //topic 动态路由 订阅模式
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","user.save","user.save 路由消息");
    }

    //route 路由模式
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
    }

    //fanout 广播
    @Test
    public void testFanout(){
        rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的消息");
    }

    //work
    @Test
    public void testWork(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","work 模型");
        }
    }

    //hello world
    @Test
    public void testHello(){
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}

消费者们,@RabbitListener既可以放在类上也可以放在方法上,一般放在方法上就行,那个@RabbitHandler好像没啥用把,想不起来了,然后@Queue注解、@Exchange注解不额外写参数的话用的应该就是最常用的默认值好像,忘了忘了。。。

package com.jin.rabbitmq.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author jinyunlong
 * @date 2021/8/25 16:57
 * @profession ICBC锅炉房保安
 */
@Component  //持久化 非独占 不是自动删除队列
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {
    @RabbitHandler
    public void receivce(String message){
        System.out.println("message =" + message);
    }
}

package com.jin.rabbitmq.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author jinyunlong
 * @date 2021/8/25 17:46
 * @profession ICBC锅炉房保安
 */
@Component
public class WorkCustomer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("message1 ="+message);
    }

    //第二个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("message2 ="+message);
    }
}

package com.jin.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author jinyunlong
 * @date 2021/8/25 18:22
 * @profession ICBC锅炉房保安
 */
@Component
public class FanoutCustomer {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
                                              exchange = @Exchange(value = "logs",type = "fanout")  //绑定的交换和类型
             )
    })
    public void receive1(String message){
        System.out.println("message1 = " +message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "logs",type = "fanout")  //绑定的交换机
    )
    })
    public void receive2(String message){
        System.out.println("message2 = " +message);
    }
}

package com.jin.rabbitmq.route;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author jinyunlong
 * @date 2021/8/25 18:33
 * @profession ICBC锅炉房保安
 */
@Component
public class RouteCustomer {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "directs",type = "direct"),  //绑定的交换机和类型
            key = {"info","error","waring"}
    )
    })
    public void receive1(String message){
        System.out.println("message1 = "+message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "directs",type = "direct"),  //绑定的交换机和类型
            key = {"info","waring"}
    )
    })
    public void receive2(String message){
        System.out.println("message2 = "+message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "directs",type = "direct"),  //绑定的交换机和类型
            key = {"error"}
    )
    })
    public void receive3(String message){
        System.out.println("message3 = "+message);
    }
}

package com.jin.rabbitmq.topics;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author jinyunlong
 * @date 2021/8/25 18:44
 * @profession ICBC锅炉房保安
 */
@Component
public class TopicCustomer {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "topics",type = "topic"),  //绑定的交换机和类型
            key = {"user.save","user.*"}
    )
    })
    public void receive1(String message){
        System.out.println("message1 = "+message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "topics",type = "topic"),  //绑定的交换机和类型
            key = {"order.#"}
    )
    })
    public void receive2(String message){
        System.out.println("message2 = "+message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定名称就是生成临时队列
            exchange = @Exchange(value = "topics",type = "topic"),  //绑定的交换机和类型
            key = {"user.#"}
    )
    })
    public void receive3(String message){
        System.out.println("message3 = "+message);
    }
}

都很简单,以后写时参照上面格式来就完事了。

5、RabbitMQ的应用场景

解耦:双11购物节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口,但是要是库存系统突然挂了,好多订单就失败了;引入RabbitMQ,就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

异步:指对业务流程性能优化,比如用户注册业务,注册成功后会去调用发短信和发邮件的业务接口,串行的话很费时间,将发邮件和发短信并行的话虽说能优化流程但也不完美,引入RabbitMQ后,相当于就不管发短信和发邮件了,因为这两个业务对注册成功来说是非必要业务,将注册成功的消息放到消息队列后,以后让他们自己来取就行了。

削峰:引入RabbitMQ后,用户的请求,服务器收到之后,首先写入消息队列,假如消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。总之就是设置程序可处理的最大阈值,超过则直接失败。(这就是我们秒杀从未成功的原因吗😅 )

6、RabbitMQ集群

没有做,就跟K8S没有学一样,太吃资源了,我机器不行。。。

分主从集群和镜像集群,一般用镜像集群就完事了。


标题:RabbitMQ(2)
作者:jyl
地址:http://jinyunlong.xyz/articles/2021/08/27/1630034967400.html