RabbitMQ使用

一、 Windows安装教程

下载并安装erlang:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。

下载地址:http://www.erlang.org/downloads

1.对应系统版本下载:

01

2.下载后就一路下一步,然后就是配置Erlang环境变量:

添加系统环境变量ERLANG_HOME,值为安装目录

02

修改系统环境变量Path,在PATH变量中添加“%ERLANG_HOME%\bin”

03

3.添加成功后运行erl,查看是否添加成功

erl

08

4.下载RabbitMQ,对应自己的系统下载

下载地址:http://www.rabbitmq.com/download.html

09

10

5.安装,默认一路下一步。

6.启动服务

进入到安装目录sbin目录,执行“rabbitmq-plugins enable rabbitmq_management”命令

rabbitmq-plugins enable rabbitmq_management

13

再输入“ rabbitmqctl status”命令

14

然后双击"rabbitmq-server.bat"

04

会弹出一个黑窗口不要关闭

05

使用浏览器登录管理页面:http://127.0.0.1:15672/

06

使用用户名密码(guest guest)登录

07


二、Docker安装

1.创建容器

docker run -it -d  --name rabbitmq -p 5672:5672 -p 15672:15672  rabbitmq:management

2.端口映射说明

https://www.rabbitmq.com/networking.html


3.启动访问

用户名,密码都是:guest


三、ubuntu安装

1.  配置环境,进入官网

https://www.rabbitmq.com/#getstarted


查看与erlang对应关系,安装时需要版本对应


进入ubuntu安装页面




2. 将脚本复制下来,到sudo apt-get update -y就可以了,后面安装部分我们手动安装


3.创建vim install-mq.sh 文件,将内容粘贴进去,并给与执行权限

chmod o=rxw,u=rwx,g=rwx install-mq.sh

4.执行 ./install-mq.sh 文件

5.安装erlang,查看可以安装的版本

apt list -a erlang


6.选择版本进行安装

sudo apt-get install -y erlang=1:26.1.2-1

7.安装RabbitMQ,查看可以安装的版本

apt list -a rabbitmq-server


8.选择版本安装

sudo apt-get install -y rabbitmq-server=3.12.12-1

9.安装管理界面

rabbitmq-plugins enable rabbitmq_management

10. 访问页面


11.添加远程账号

rabbitmqctl add_user username password

12.为账号设置角色

rabbitmqctl set_user_tags admin administrator
角色说明:

administrator:超级管理员角色,可以登录控制台查看所有信息,并可以对用户、策略操作

monitoring:监控者角色,可以登录控制台查看rabbitmq节点相关信息,无法对策略管理

management:普通管理者角色,仅可以登录控制台,无法看到节点信息

13.设置权限

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

14.其他操作

#停止

rabbitmqctl stop

#重启
rabbitmq-server -detached
rabbitmqctl start_app

#状态

rabbitmqctl status

#查看账户列表

rabbitmqctl list_users

#删除角色

rabbitmqctl delete_user username

#修改密码

rabbitmqctl change_password username password

 

四、集群

1.拷贝主服务器cookie文件到其他机器

scp /var/lib/rabbitmq/.erlang.cookie root@192.168.137.102:/var/lib/rabbitmq/

2.所有机器重启

rabbitmq-server -detached

3.节点2上面执行

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@主机名1
rabbitmqctl start_app

4.节点3上面执行

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@j节点2
rabbitmqctl start_app

5.查看集群状态

rabbitmqctl cluster_status


6.解除集群节点

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(在node1节点上面执行)

7.镜像队列



8.标识启动

查看标识:
rabbitmqctl list_feature_flags
启动:
rabbitmqctl enable_feature_flag all

9.端口开通

管理控制台:15672

客户端:5672

集群通信:25672

五、控制台使用

1.功能介绍


2.交换机类型

002-01 交换机类型

002-02 交换机类型

3.交换机创建

003-01 交换机创建

4.队列创建

003-02 队列创建


5.点击交换机添加绑定

003-03-01 交换机绑定队列

6.Topic交换机绑定

003-03-02 交换机绑定队列-Topic

六、Spring Boot使用

1.配置文件

spring:
  #RabbitMQ配置
  rabbitmq:
    host: 192.168.137.188
    port: 5672
    username: guest
    password: guest
    #开启消息抵达服务器确认-交互模式,publisher-confirms过时
    publisher-confirm-type: correlated
    #开启消息抵达队列确认
    publisher-returns: true
    #只要抵达队列,以异步方式回调我们returnconfirm
    template:
      mandatory:
    #手动ACK消息,不让它自动回复,不然消息未成功处理消息就丢失了
    listener:
      simple:
        acknowledge-mode: manual

2.依赖引用

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.配置类

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    @Bean
    /**
     * 对象格式化Json
     */
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

} 

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@RequiredArgsConstructor
public class RabbitTemplateConfig {
    private final RabbitTemplate rabbitTemplate;
    /**
     * 定制RabbitTemplate
     * @PostConstruct:对象创建完成以后执行这个方法
     */
    @PostConstruct
    public void initRabbitTemplate(){

        /**
         * 设置抵达服务器确认回调
         *
         * 配置:
         *    spring.rabbitmq.publisher-confirm-type: correlated
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达服务器 ack=true
             * @param correlationData:当前消息的唯一关联数据(这个消息的唯一ID)
             * @param ack:消息是否成功搜道
             * @param cause:失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("--------------[消息抵达服务器回调]----------------------");
                System.out.println("ID:"+correlationData+",是否成功:"+ack+",失败原因:"+cause);
            }

        });

        /**
         * 设置消息未抵达队列的回调
         *
         *  配置:
         *     #开启消息抵达队列确认
         *     spring.rabbitmq.publisher-returns: true
         *     #只要抵达队列,以异步方式回调我们returnconfirm
         *     spring.rabbitmq.template.mandatory: true
         */
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
            /**
             * 只要消息没有投递到指定队列,就触发这个失败回调
             * returnedMessage.getMessage():投递失败的消息详细信息
             * returnedMessage.getReplyCode():回复的状态码
             * returnedMessage.getReplyText():回复的文本内容
             * returnedMessage.getExchange():当时这个消息发给哪个交换机
             * returnedMessage.getRoutingKey():当时这个消息用的哪个路由键
             */
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("=============[消息未抵达队列回调]======================");
                System.out.println("getMessage:"+returnedMessage.getMessage());
                System.out.println("getReplyCode:"+returnedMessage.getReplyCode());
                System.out.println("getReplyText:"+returnedMessage.getReplyText());
                System.out.println("getExchange:"+returnedMessage.getExchange());
                System.out.println("getRoutingKey:"+returnedMessage.getRoutingKey());
            }
        });


    }
}

4.测试类发送消息


import com.cpc.user.entity.CustomerInfoUserEntity;
import com.cpc.utils.emailUtils.EmailUtil;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class EmailApplicationTests {

    @Autowired
    private EmailUtil emailService;

    

    @Autowired
    AmqpAdmin amqpAdmin;

    //创建交换机
    @Test
    public void createExchanges(){
        TopicExchange topicExchange=new TopicExchange("lry.test",true,false);
        amqpAdmin.deleteExchange("lry.test");
        amqpAdmin.declareExchange(topicExchange);
    }


    //创建交换机
    @Test
    public void createQueues(){
        Queue queue=new Queue("lry.test",true);
        amqpAdmin.deleteQueue("lry.test");
        amqpAdmin.declareQueue(queue);
    }

    //交换机绑定队列
    @Test
    public void exchangeBindQueue(){
        //String destination=目的地
        //DestinationType destinationType=绑定类型
        //String exchange=交换机名称
        //String routingKey=路由键
        // @Nullable Map<String, Object> arguments=参数
        Binding binding=new Binding("lry.test", Binding.DestinationType.QUEUE,"lry.test","#.test",null);
        amqpAdmin.removeBinding(binding);
        amqpAdmin.declareBinding(binding);
    }


    @Autowired
    RabbitTemplate rabbitTemplate;

    //发送消息
    @Test
    public void sendMesage(){
        CustomerInfoUserEntity ciue=new CustomerInfoUserEntity();
        ciue.setGid("aaa");
        ciue.setPoc("POC");
        ciue.setRegion("CPC");
        ciue.setUser_name("aaaaaaa");
        rabbitTemplate.convertAndSend("lry.test","#.test",ciue);
    }



}

5.消息接收监听

import com.cpc.email.service.RabbitMQAlertMailService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequiredArgsConstructor
@RequestMapping("/mq")
public class RabbitMQAlertMailController {

    private final RabbitMQAlertMailService service;

    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(@RequestParam(value="num",required = false,defaultValue = "10") Integer num){
        return service.sendMessage(num);
    }
}


import com.cpc.email.entity.AlertEmailInfoEntity;
import com.cpc.user.entity.CustomerInfoUserEntity;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.UUID;

@Service
@RequiredArgsConstructor
@RabbitListener(queues = {"lry.test"})
public class RabbitMQAlertMailService {

    private final RabbitTemplate rabbitTemplate;

    public String sendMessage(Integer num){
        for(int i=0;i<num;i++){
            CustomerInfoUserEntity ciue=new CustomerInfoUserEntity();
            AlertEmailInfoEntity aeie=new AlertEmailInfoEntity();
            if(i%3==0){//成功:类型是CustomerInfoUserEntity
                ciue.setGid("GID-"+i);
                ciue.setPoc("POC-"+i);
                ciue.setRegion("CPC-"+i);
                ciue.setUser_name("Name-"+i);
                rabbitTemplate.convertAndSend("lry.test","#.test",ciue,new CorrelationData(UUID.randomUUID().toString()));
            }else if(i%4==0){ //成功:类型是AlertEmailInfoEntity
                aeie.setGid("GID-"+i);
                rabbitTemplate.convertAndSend("lry.test","#.test",aeie,new CorrelationData(UUID.randomUUID().toString()));
            }else if(i%5==0){//失败,交换机错误:进入ConfirmCallback
                ciue.setGid("GID-"+i);
                ciue.setPoc("POC-"+i);
                ciue.setRegion("CPC-"+i);
                ciue.setUser_name("Name-"+i);
                rabbitTemplate.convertAndSend("lry.test1","#.test",ciue,new CorrelationData(UUID.randomUUID().toString()));
            }else{//失败,路由键错误:进入ReturnsCallback
                aeie.setGid("GID-"+i);
                rabbitTemplate.convertAndSend("lry.test","#.tes00t",aeie,new CorrelationData(UUID.randomUUID().toString()));
            }
        }
        return "OK";
    }

    //@RabbitListener(queues = {"lry.test"})
    /**
     * Message:原生消息详细信息,头+体
     * CustomerInfoUserEntity:发送消息的类型
     * Channel:当前传输数据的通道
     *
     * @RabbitListener:可以标注到类和方法上面
     * @RabbitHandler:只能标注方法上面,如果多个类型(就是第二个参数 CustomerInfoUserEntity),可以把@RabbitListener标注在类上面,@RabbitHandler标注在方法上面。
     */
    @RabbitHandler
    public void recieveMessage(Message message, CustomerInfoUserEntity entity, Channel channel){
        //消息头
        MessageProperties messageProperties = message.getMessageProperties();
        //消息体
        byte[] body = message.getBody();

        //System.out.println("消息头:"+messageProperties);
        //System.out.println("消息体:"+body);
        System.out.println("+++++++++++++++++[CustomerInfoUserEntity类型消息]++++++++++++++++++++++++++");
        System.out.println("消息体Json[CustomerInfoUserEntity]:"+entity);


        //手动ACK需要的参数,channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //确认消息,第二个参数是否批量确认
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            //只有网络中断才会报异常
            e.printStackTrace();
        }
    }

    @RabbitHandler
    public void recieveMessage(Message message, AlertEmailInfoEntity entity, Channel channel){
        //消息头
        MessageProperties messageProperties = message.getMessageProperties();
        //消息体
        byte[] body = message.getBody();

        //System.out.println("消息头:"+messageProperties);
        //System.out.println("消息体:"+body);
        System.out.println("//////////////////////[AlertEmailInfoEntity类型消息]//////////////////////////////");
        System.out.println("消息体Json[AlertEmailInfoEntity]:"+entity);


        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //拒绝:参数一:deliveryTag,参数二:是否批量拒绝,参数三:是否重新回到队列,如果是false就是直接丢弃这个消息
            channel.basicNack(deliveryTag,false,true);
        } catch (IOException e) {
            //只有网络中断才会报异常
            e.printStackTrace();
        }

    }
}












(1)