本文最后更新于2 分钟前,文中所描述的信息可能已发生改变。
简介
RocketMQ为Alibaba开发的消息队列。优点:实现应用解耦、异步提速(响应速度)、削峰填谷(囤积消息慢慢处理)。缺点:可用性降低(如何保证中间件的可靠)、复杂性提高(耦合了中间件的代码)、一致性问题(即分布式事务)。
工作过程
包含消息生产者(producer)集群P、命名服务器集群(类似注册中心)B、消息服务器集群(部署了RocketMQ的集群)R、消费者(consumer)集群C。
集群启动时会将自己的ip信息注册到命名服务器集群并建立长连接,生产者集群通过询问命名服务器集群来确定将消息发送到哪个消息服务器中。消费者也会和消息服务器建立长连接。
环境搭建(windows)
windows 下将下好的二进制包解压到无中文目录下(例如:D:\mysoftware\rocketmq-all-5.1.0-bin-release
),并将该应用的目录添加到环境变量中(系统变量或用户变量都可),变量名为 ROCKETMQ_HOME 。
测试
- 启动命名服务器:在其bin目录下打开cmd并输入命令
start mqnamesrv.cmd
。jdk8+版本需要将runserver.cmd
文件中的set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
和set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
两行注释掉。
set CLASSPATH=.;%BASE_DIR%conf;%BASE_DIR%lib\*;%CLASSPATH%
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
rem set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
rem set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages"
set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp "%CLASSPATH%""
且runbroker.cmd
也需要进行更改,将JVM的内存大小改小一点并注释掉三行
rem ===========================================================================================
rem JVM Configuration
rem ===========================================================================================
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g" rem 第一行
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m" rem 修改分配给JVM的内存大小
rem set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 rem 第二行-XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
rem set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:%USERPROFILE%\mq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" rem 第三行
rem set "JAVA_OPT=%JAVA_OPT% -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch"
set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking"
set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true"
set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%"
- 启动消息服务器:输入命令
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
其中参数-n
表示指定命名服务器的地址。 - 测试发送消息与接收消息:输入命令
tools.cmd org.apache.rocketmq.example.quickstart.Producer
发送消息。输入命令tools.cmd org.apache.rocketmq.example.quickstart.Consumer
接收消息。
图形化界面安装
打包成jar包再运行。
RocketMQ生产者与消费者示例
在消息服务器与命名服务器都启动的情况下。
- 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.2.0</version>
</dependency>
- 生产者(producer)示例
public static void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("127.0.0.1:9876");//设置命名服务器地址
producer.start();
Message message = new Message("topic1", "hello world!".getBytes(StandardCharsets.UTF_8));//参数为topic和要发送的消息
SendResult result = producer.send(message);
System.out.println("result = " + result);
producer.shutdown();
}
- 消费者(consumer)示例
public static void consumer() throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group1");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("topic1","*");//订阅一个消息,"*"匹配规则
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(messageExt -> {
System.out.println("messageExt = " + messageExt);
System.out.println("body = " + new String(messageExt.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
}
一(producer)对多(consumer)的消息发送模式
- 消费者属于同一组并监听同一个主题的消息:默认是负载均衡模式(即消费者会争抢消息),可以设置成广播模式(每一个消费者都收到全部消息)。使用
pushConsumer.setMessageModel(MessageModel.BROADCASTING);
即可设置成广播模式。 - 消费者属于不同一组并监听同一个主题的消息:每一个消费者都收到全部消息。
消息类型
- 同步消息:消息发送完需要等待返回结果看消息是否发送成功,是及时性强的消息。默认的 send 方法发送的就是此类消息。
- 异步消息:消息发送完不需要等待返回结果,通过回调函数来异步通知结果。
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功的结果 = " + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败的异常 = " + throwable);
}
});
注意:不能关闭生产者。
- 单向消息:不需要关系消息发送的返回结果,即不关心消息是否发送成功。使用
producer.sendOneway(message);
来发送。 - 延时消息:可以设置消息延时多久后发送,通过
message.setDelayTimeLevel(3);
可以设置消息延时等级,3级代表延时10s发送。各等级对应时间"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。 - 批量消息:在一次连接中发送多个消息,用
List<Message>
来存储多个消息。注意批量发送的消息不能是延时消息,消息主题需要一致且消息内容总长度不超过4M。
消息过滤
消费者过滤接收某种类型的消息。
- 通过标签来过滤:可以在使用
pushConsumer.subscribe("topic1","tag1 || tag2")
的第二个参数来设置接收消息的标签类型。在生产者处设置消息的标签。 - sql过滤:可以在生产者处给消息添加一些自定义属性
message.putUserProperty("key","value");
,并在消费者处进行指定要满足要求的消息,即对设置的属性进行sql过滤。想要开启sql过滤需要在配置文件broker.conf
中添加enablePropertyFilter=true
。
整合SpringBoot
- 依赖引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<!-- 版本在父工程中管理了为2.3.0 -->
</dependency>
- 编写生产者与消费者
生产者
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String sendMsg(){
rocketMQTemplate.convertAndSend("topic1:tag1","小石潭记");
SendResult sendResult = rocketMQTemplate.syncSend("topic1:tag1", MessageBuilder.withPayload("同步消息").build());
System.out.println("sendResult = " + sendResult);
return "Ok!";
}
}
消费者
@Service
@RocketMQMessageListener(topic = "topic1",consumerGroup = "group1",selectorExpression = "tag1")
public class ConsumerServer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消息 = " + s);
}
}
消息顺序同步
在生产者端使用rocketMQTemplate.syncSendOrderly("topic1","有顺序的消息内容","通常是一些id");
来发送消息,第三个参数可以让有顺序的消息发往同一个队列。在消费者端的注解@RocketMQMessageListener(topic = "topic1",consumerGroup = "group1",selectorExpression = "tag1",consumeMode = ConsumeMode.ORDERLY)
上指定consumeMode = ConsumeMode.ORDERLY
,因为消费者端接收消息默认是并发多线程去接收的,设置后变为单线程顺序接收消息。
事务消息
未整合SpringBoot代码示例:
public static void producerTransaction() throws MQClientException{
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("127.0.0.1:9876");//设置命名服务器地址
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//模拟
String isOk = "OK";
LocalTransactionState state = null;
if(isOk.equals("OK")){
//正常事务执行
state = LocalTransactionState.COMMIT_MESSAGE;//返回值为此会通知消息服务器将消息提交到消息队列当中
}else if(isOk.equals("NO")){
//事务异常执行
state = LocalTransactionState.ROLLBACK_MESSAGE;//会通知消息服务器让其删除消息
}else {
state = LocalTransactionState.UNKNOW;//会触发事务补偿
}
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//补偿事务过程
System.out.println(" 事务补偿 ");
//根据检测结果返回LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW 当也为UNKNOW时则人工介入。
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message message = new Message("topic1","tag1", "hello world!".getBytes(StandardCharsets.UTF_8));//参数为topic和要发送的消息
SendResult result = producer.sendMessageInTransaction(message,null);
System.out.println("result = " + result);
//发送事务消息不能关闭生产者
}
高级特性
- 读写快:事先会申请一块大的连续的内存空间,保证能顺序读写,提高速度。
- "零复制":减少了一次数据的复制。
- 同步刷盘(读入消息到内存会马上写入磁盘)与异步刷盘(等读入内存的消息达到一定量的时候才会写入磁盘,节约了建立连接的资源)。
- 高可用性:
- 命名服务器:无状态(相互独立)+全服务注册(每个服务都会向其注册)
- 消息服务器:主从架构(2M-2S)
- 消息生产者:生产者将相同的 topic 绑定到多个 group ,保障某个主节点挂掉后其他主节点还能正常接收。
- 消息消费:会根据主消息服务器的压力决定是否由从节点来提供数据读取的工作,即不仅可以从主节点上读取消息也可以从从节点上读取消息,因为他们的消息是同步的,可以由此来减轻主节点的压力。
- 负载均衡:
- 生产者->消息服务器:按顺序轮流发到队列中。
- 消息服务器->消费者:按照循环平均分配来接收消息。
- 消息重试:
- 有序消息(消息服务器->消费者):消息服务器每隔一秒会自动重发,会阻塞需要监控。
- 无序消息(消息服务器->消费者):总共会重发16次,间隔时长越来越大,如果最后发完还没有接收到ACK则将消息加入死信队列当中只保存3天。
- 消息重复消费:设置消息幂等性(以业务id)解决。