微信号:imooc-com

介绍:慕课网是一个超酷的互联网、IT技术免费学习平台,创新的网络一站式学习、实践体验;服务及时贴心,内容专业、有趣易学。专注服务互联网工程师快速成为技术高手!

RabbitMQ集群整合SpringBoot2.x

2018-08-02 08:35 阿神_


RabbitMQ相信大家已经再熟悉不过了,作为业界四大主流消息中间件之一(Apache RocketMQ、Apache Kafka、Apache ActiveMQ、RabbitMQ),它具有非常好的性能和可靠性的集群模式,不仅仅在各大互联网大厂中广泛使用(比如同程艺龙、美团点评等),而且在互联网金融行业也常常被作为首选!


同时MQ技术也是晋级技术骨干、团队核心的必备技术。


SpringBoot作为互联网开发利器已经不需要我再过多介绍什么,接下来我们一起从零开始构建RabbitMQ、并且与SpringBoot2.x的整合吧!



一、安装RabbitMQ集群十步走!(3.6.5版本、采用rpm安装方式,我们要安装3个节点的集群,192.168.11.71 192.168.11.72 192.168.11.73、以71节点为例:)

第一步:下载所需依赖包

Rabbitmq基于erlang语言编写,所以必须要有。

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

socat包为秘钥包,如果没有会提示缺少此依赖。

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

rabbitmq-server-3.6.5核心服务包。

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm


第二步:安装并配置

安装命令:

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm


rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm


rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装成功后:修改配置

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件

修改:loopback_users 中的 <<"guest">>,只保留guest


第三步:3个节点同时进行前两步骤操作:(71、72、73)

第四步:启停单点服务与开启管控台

/etc/init.d/rabbitmq-server start  |  stop   |  status   |    restart

分别启动三个节点,然后执行命令启动控制台插件如下:

rabbitmq-plugins enable  rabbitmq_management最后使用 guest/guest登录成功即可!


第五步:接下来进行集群构建:

71、72、73任意一个节点为Master(这里选择71为Master)

也就是说我们需要把71的Cookie文件同步到72、73节点上去,先停止所有服务器:/etc/init.d/rabbitmq-server stop

然后进入制定目录(/var/lib/rabbitmq/)并远程copy 文件到72、73节点,如下操作:scp /var/lib/rabbitmq/.erlang.cookie到192.168.11.72和192.168.11.73中


第六步:组成集群

首先启动三个节点:rabbitmq-server -detached

然后把72和73分别加入到71中,组成集群 [--ram]为节点以什么方式加入到集群中

ram为内存 存储 默认不加为disk磁盘存储,操作如下:

node72:rabbitmqctl stop_app

node72:rabbitmqctl join_cluster [--ram] rabbit@bhz71

node72:rabbitmqctl start_app


第七步:修改集群名称:

rabbitmqctl set_cluster_name rabbitmq_cluster1


第八步:查看集群状态:

rabbitmqctl cluster_status ,如下所示表示集群构建OK!



第九步:构建镜像队列,任意节点执行命令如下:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'


第十步:查看管控台发现集群已经构建成功



二.RabbitMQ与SpringBoot2.x整合

生产者端:

第一步:pom.xml配置如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bfxy</groupId>
    <artifactId>rabbitmq-springboot-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-springboot-producer</name>
    <description>rabbitmq-springboot-producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>


第二步:application.properties配置文件

spring.rabbitmq.addresses=192.168.11.71:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true  #confirm模式
spring.rabbitmq.publisher-returns=true   #return机制
spring.rabbitmq.template.mandatory=true  #与return机制结合配置次属性


第三步:编写RabbitSender生产端代码

package com.bfxy.springboot.producer;

import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  

    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };

    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey)
 
{
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1""springboot.abc", msg, correlationData);
    }

}


消费者:

第一步:pom文件同生产者一致

第二步:application.properties配置文件

spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手工签收
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10


第三步:RabbitRecever消费端代码

package com.bfxy.springboot.conusmer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1"
            durable="true")
,
            exchange = @Exchange(value = "exchange-1"
            durable="true"
            type= "topic"
            ignoreDeclarationExceptions = "true")
,
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}


生产端测试:

package com.bfxy.springboot;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.bfxy.springboot.producer.RabbitSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Test
    public void testSender1() throws Exception {
         Map<String, Object> properties = new HashMap<>();
         properties.put("number""12345");
         properties.put("send_time", simpleDateFormat.format(new Date()));
         rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    }

}



若你想要全面学习RabbitMQ消息中间件,

这门《RabbitMQ消息中间件技术精讲》都是你理想的选择

无论你是刚开始java开发的初级工程师,

还是希望更多接触大厂开发技术,

甚至是寻求职业晋升途径的Java开发者,


点击下图,抢先订阅吧



 • end • 


[ 若你有优质的文章想与大家分享,欢迎投稿。]


联系方式:微信(lsm20130823)
邮箱(lisy@imooc.com)



 
慕课网 更多文章 作为50 岁的开发者,面对被裁的命运,我如何绝地求生? 专栏 | 凉凉了,Python工程师凭什么拿到这么高薪资? 专栏 | 你想学习的Vue技能知识点,都在这里 [杂谈] 了解一些额外知识,让前端开发锦上添花 完全掌握这些React知识点,20K以上薪资没问题
猜您喜欢 写给“凡人”看的区块链入门指南 HTML 5 Web Workers 这可能是最适合C++学习者的书单 复利时间压缩机 测试人员在敏捷团队中扮演的角色