在本地安装RabbitMQ服务或者任意个人服务器安装server
-
系统环境
- JDK1.8
- Centos7-64
- Erlang-OTP 23
- RabbitMQ-3.8.5
-
通过rpm 安装Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
-
安装Erlang
yum install -y erlang
-
查看erl版本号
erl
- Erlang 安装完成
- 导入key
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
rpm --import https://packagecloud.io/gpg.key
- 设置RabbitMQ 前置条件
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
- 下载RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 访问链接下载后,将rpm包上传至服务器-导入key
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
- 安装socat
yum -y install epel-release
yum -y install socat
- 安装RabbitMQ rpm 文件
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
- 启用管理平台插件,启用插件后,可以可视化管理RabbitMQ
rabbitmq-plugins enable rabbitmq_management
- 启动MQ
systemctl start rabbitmq-server
- 访问控制图形化界面 -> IP:15672
- MQ 默认账号guest 密码 guest
- 创建专属账号进行赋权使用-> 账号:admin 密码:admin
rabbitmqctl add_user admin admin
- 设置admin为超级管理员
rabbitmqctl set_user_tags admin administrator
- 授权远程访问(也可以登录后,可视化配置)
rabbitmqctl set_permissions -p / admin "." "." ".*"
- 创建完成后,重启RabbitMQ
systemctl restart rabbitmq-server
<!-- rabbit mq--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.10.RELEASE</version></dependency>
spring: port: 8088 rabbitmq: first: host: 42.156.222.164 port: 5672 username: admin password: admin #消费端配置 listener: simple: concurrency: 10 #消费端 max-concurrency: 20 #最大消费端数 acknowledge-mode: auto #自动签收auto 手动 manual prefetch: 1 #限流(海量数据,同时只能过来一条) second: host: localhost port: 5672 username: admin password: admin
package com.study;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class MqApplication { public static void main(String[] args) { SpringApplication.run(MqApplication.class, args); }}
package com.study.mq.rabbitmqConfig;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;@Configurationpublic class RabbitConfig { @Bean(name = "firstConnectionFactory") @Primary public ConnectionFactory firstConnectionFactory( @Value("${spring.rabbitmq.first.host}") String host, @Value("${spring.rabbitmq.first.port}") int port, @Value("${spring.rabbitmq.first.username}") String username, @Value("${spring.rabbitmq.first.password}") String password ) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory( @Value("${spring.rabbitmq.second.host}") String host, @Value("${spring.rabbitmq.second.port}") int port, @Value("${spring.rabbitmq.second.username}") String username, @Value("${spring.rabbitmq.second.password}") String password ) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } @Bean(name = "firstRabbitTemplate") @Primary public RabbitTemplate firstRabbitTemplate( @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory ) { return new RabbitTemplate(connectionFactory); } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate( @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory ) { return new RabbitTemplate(connectionFactory); } @Bean(name = "firstFactory") public SimpleRabbitListenerContainerFactory firstFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; }}
package com.study.mq.Receiver;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.study.mq.web.bean.entity.MqMsg;import com.study.mq.web.mapper.MqMsgMapper;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * RabbitMQ中的消费者,接收first RabbitMQ中的队列first的数据 */@Componentpublic class Receiver { @Resource private MqMsgMapper mapper; @RabbitListener(queues = "first", containerFactory = "firstFactory") @RabbitHandler public void process(String msg) { final JSONObject object = JSONUtil.parseObj(msg); final String context = object.getStr("context"); System.out.println("Receiver : " + context); final MqMsg mqMsg = new MqMsg(); mqMsg.setContext(context); mapper.insert(mqMsg); }}
package com.study.mq.Receiver;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * RabbitMQ中的消费者,接收second RabbitMQ中的队列second的数据 */@Componentpublic class Receiver2 { @RabbitListener(queues = "second", containerFactory = "secondFactory") @RabbitHandler public void process(String msg) { System.out.println("Receiver : " + msg); }}
package com.study.mq.sender;import java.util.Date;import java.util.HashMap;import javax.annotation.Resource;import cn.hutool.core.map.MapUtil;import cn.hutool.json.JSONUtil;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;/** * RabbitMQ中的生产者,发送消息到RabbitMQ中first队列 */@Componentpublic class FirstSender { @Resource(name="firstRabbitTemplate") private RabbitTemplate firstRabbitTemplate; public void send1() { final HashMap<Object, Object> map = MapUtil.newHashMap(); map.put("context", "一条消息"); final String msg = JSONUtil.toJsonStr(map); this.firstRabbitTemplate.convertAndSend("first","firstDirectRouting", msg); }}
package com.study.mq.sender;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.util.Date;/** * RabbitMQ中的生产者,发送消息到RabbitMQ中的second队列 *///@Component@RestControllerpublic class SecondSender { @Resource(name = "secondRabbitTemplate") private RabbitTemplate secondRabbitTemplate; public void send1() { String context = "第一次发送 " + new Date(); System.out.println("Sender : " + context); this.secondRabbitTemplate.convertAndSend("second","secondRoutingKey", context); }}
package com.study.mq;import com.study.mq.sender.FirstSender;import com.study.mq.sender.SecondSender;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestControllerpublic class TestDemo { @Resource private FirstSender firstSender; @Resource private SecondSender secondSender; @GetMapping("/firstSend") public void firstSend() throws Exception { firstSender.send1(); } @GetMapping("/secondSend") public void secondSend() throws Exception { secondSender.send1(); }}
- 启动项目, 调用 /firstSend 接口向第一个first 源发送一条消息
- 测试项目时先注释消费者 @RabbitListener 监听注解,这样发送消息后不至于被马上消费
- 消息发送成功后,再放开@RabbitListener注释,重新启动项目可以查看到消息已被消费