深入解析中间件之-RocketMQ

Apache RocketMQ: http://rocketmq.apache.org/

QuickStart

分别启动NameServer、Broker、生产者、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
> nohup sh bin/mqnamesrv &
> nohup sh bin/mqbroker -n localhost:9876 &

> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D36B0000, offsetMsgId=AC112A0100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D3F50001, offsetMsgId=AC112A0100002A9F00000000000000B2, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D3FB0002, offsetMsgId=AC112A0100002A9F0000000000000164, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D4000003, offsetMsgId=AC112A0100002A9F0000000000000216, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D4040004, offsetMsgId=AC112A0100002A9F00000000000002C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D4080005, offsetMsgId=AC112A0100002A9F000000000000037A, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D40C0006, offsetMsgId=AC112A0100002A9F000000000000042C, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC112A0140641B6D35866042D4100007, offsetMsgId=AC112A0100002A9F00000000000004DE, messageQueue=MessageQueue [topic=TopicTest, brokerName=dp0652, queueId=2], queueOffset=1]

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1508402192396, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192398, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F000000000000042C, commitLogOffset=1068, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D40C0006, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=1, storeSize=179, queueOffset=2, sysFlag=0, bornTimestamp=1508402192410, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192412, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F00000000000006F4, commitLogOffset=1780, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243399, UNIQ_KEY=AC112A0140641B6D35866042D41A000A, WAIT=true, TAGS=TagA}, body=17]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1508402192384, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192386, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F0000000000000216, commitLogOffset=534, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D4000003, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1508402192373, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192377, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F00000000000000B2, commitLogOffset=178, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D3F50001, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1508402192236, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192319, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243397, UNIQ_KEY=AC112A0140641B6D35866042D36B0000, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1508402192388, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192390, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F00000000000002C8, commitLogOffset=712, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D4040004, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1508402192400, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192401, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F00000000000004DE, commitLogOffset=1246, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D4100007, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1508402192379, bornHost=/172.17.42.1:55844, storeTimestamp=1508402192382, storeHost=/172.17.42.1:10911, msgId=AC112A0100002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1508402243398, UNIQ_KEY=AC112A0140641B6D35866042D3FB0002, WAIT=true, TAGS=TagA}, body=16]]]

17884 org.apache.rocketmq.namesrv.NamesrvStartup
17965 org.apache.rocketmq.broker.BrokerStartup -n localhost:9876

RocketMQ的数据目录在store下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[qihuang.zheng@dp0652 ~]$ tree store
store
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   └── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   └── TopicTest
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   ├── 2
│   │   └── 00000000000000000000
│   └── 3
│   └── 00000000000000000000
└── index
└── 20171019163632344

数据相关的文件夹有三个:

  • commitlog:提交日志
  • consumequeue:消费队列
  • index:索引文件

查看commitlog的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[qihuang.zheng@dp0652 ~]$ strings store/commitlog/00000000000000000000 | head -30
Hello RocketMQ 0 TopicTest
>UNIQ_KEY
AC112A0140641B6D35866042D36B0000
WAIT
true
TAGS
TagA
Hello RocketMQ 1 TopicTest
>UNIQ_KEY
AC112A0140641B6D35866042D3F50001
WAIT
true
TAGS
TagA
Hello RocketMQ 2 TopicTest
>UNIQ_KEY
AC112A0140641B6D35866042D3FB0002
WAIT
true
TAGS
TagA
Hello RocketMQ 3 TopicTest

消费者的相关配置:

  • 消费者对订阅主题的消费进度存储在consumerOffset.json配置文件中
  • 消费者所属的消费组信息存储在subscriptionGroup.json配置文件中
  • 消费者订阅的主题存储在topics.json配置文件中

Kafka中消费者订阅信息存储在ZooKeeper中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
[qihuang.zheng@dp0652 ~]$ cat store/config/consumerFilter.json
{
"filterDataByTopic":{}
}
[qihuang.zheng@dp0652 ~]$ cat store/config/delayOffset.json
{
"offsetTable":{}
}

[qihuang.zheng@dp0652 ~]$ cat store/config/consumerOffset.json
{
"offsetTable":{
"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250
},
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
}
}
}

[qihuang.zheng@dp0652 ~]$ cat store/config/subscriptionGroup.json
{
"dataVersion":{
"counter":1,
"timestamp":1508402243205
},
"subscriptionGroupTable":{
"please_rename_unique_group_name_4":{
"brokerId":0,
"consumeBroadcastEnable":true,
"consumeEnable":true,
"consumeFromMinEnable":true,
"groupName":"please_rename_unique_group_name_4",
"notifyConsumerIdsChangedEnable":true,
"retryMaxTimes":16,
"retryQueueNums":1,
"whichBrokerWhenConsumeSlowly":1
}
}
}

[qihuang.zheng@dp0652 ~]$ cat store/config/topics.json
{
"dataVersion":{
"counter":2,
"timestamp":1508402243219
},
"topicConfigTable":{
"TopicTest":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"topicSysFlag":0,
"writeQueueNums":4
}
}
}

在本机测试时,没有遇到问题。但是IDE连接远程机器时,报错连接不上,这是因为服务端装了docker导致IP有问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6915]ms, Topic: TopicTestA, BrokersSent: [dp0652, dp0652, dp0652]
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:544)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1065)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:212)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10909> failed

172.17.42.1这个IP地址是docker的

1
2
3
4
5
6
[qihuang.zheng@dp0652 rocketmq]$ ifconfig
docker0 Link encap:Ethernet HWaddr CA:3E:ED:C2:67:20
inet addr:172.17.42.1 Bcast:0.0.0.0 Mask:255.255.0.0

em1 Link encap:Ethernet HWaddr B0:83:FE:C7:02:B3
inet addr:192.168.6.52 Bcast:192.168.6.255 Mask:255.255.255.0

用模板生成,可以看到brokerIP1就是docker的IP:

1
2
3
4
5
6
7
8
[qihuang.zheng@dp0652 rocketmq]$ sh bin/mqbroker -m > broker.p

[qihuang.zheng@dp0652 rocketmq]$ cat broker.p
2017-10-19 17\:58\:00 INFO main - namesrvAddr=localhost:9876
2017-10-19 17\:58\:00 INFO main - brokerIP1=172.17.42.1
2017-10-19 17\:58\:00 INFO main - brokerName=dp0652
2017-10-19 17\:58\:00 INFO main - brokerClusterName=DefaultCluster
2017-10-19 17\:58\:00 INFO main - brokerId=0

接下来重启broker:

1
2
3
4
[qihuang.zheng@dp0652 rocketmq]$ sh bin/mqshutdown broker
The mqbroker(29723) is running...
Send shutdown request to mqbroker(29723) OK
[qihuang.zheng@dp0652 rocketmq]$ nohup sh bin/mqbroker -n localhost:9876 -c broker.properties &

重启后发送消息正常,这里把Topic改成TopicTestA:

1
SendResult [sendStatus=SEND_OK, msgId=0A39F12CF5A6355DA25460935C280000, offsetMsgId=C0A8063400002A9F000000000002BEB2, messageQueue=MessageQueue [topic=TopicTestA, brokerName=dp0652, queueId=0], queueOffset=0]

查看store,可以看到commitlog没有新增文件夹,而consumequeue则新增了TopicTestA文件夹:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
├── commitlog
│   └── 00000000000000000000
├── consumequeue
│   ├── TopicTest
│   │   ├── 0
│   │   │   └── 00000000000000000000
│   │   ├── 1
│   │   │   └── 00000000000000000000
│   │   ├── 2
│   │   │   └── 00000000000000000000
│   │   └── 3
│   │   └── 00000000000000000000
│   └── TopicTestA
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   ├── 2
│   │   └── 00000000000000000000
│   └── 3
│   └── 00000000000000000000

API示例

生产者

同步的生产者:http://rocketmq.apache.org/docs/simple-example/

1
2
3
4
5
6
7
8
9
10
11
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.6.52:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTestA", "TagA",
("RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();

异步的生产者:

1
2
3
4
5
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
});

一次性的生产者,主要用于日志收集:

一个 RPC 调用,通常是这样一个过程:

  1. 客户端发送请求到服务器
  2. 服务器处理该请求
  3. 服务器向客户端返回应答

所以一个 RPC 的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,
例如日志收集类应用,此类应用可以采用 oneway 形式调用,oneway 形式只发送请求不等待应答,
而发送请求在客户端实现层面仅仅是一个 os 系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒级。

1
producer.sendOneway(msg);

有序的生产者:http://rocketmq.apache.org/docs/order-example/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
MQProducer producer = new DefaultMQProducer("example_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//Create a message instance, specifying topic, tag , message key and body.
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId); // 最后一个参数orderId作为第二个参数的arg值
System.out.printf("%s%n", sendResult);
}
producer.shutdown();

定时生产者:http://rocketmq.apache.org/docs/schedule-example/

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
如果要支持任意的时间精度,在 Broker 局面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s,10s,1m 等。
定时消息是在生产者端设置DelayTimeLevel,消费者端不做任何处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}

}

批量消息:http://rocketmq.apache.org/docs/batch-example/

简单的批量消息只需要构造List,调用producer.send()即可。不过在一个Batch中消息大小不能超过1Mib,需要程序手动进行切分。

消费者

拉取消费者(PullConsumer):

  • 首先根据Topic获取订阅的MessageQueue
  • 对每个MessageQueue,都会调用pullBlockIfNotFound方法消费这个队列里的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: " + mq + "%n");
SINGLE_MQ:
while (true) {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
}
consumer.shutdown();
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}

推送消费者(PushConsumer):

  • 订阅方法的第二个参数为*,表示所有的Tag,不进行过滤
  • Push推送方式采用注册消息监听器的方式,当收到Broker推送的消息,就会触发监听器的回调
1
2
3
4
5
6
7
8
9
10
11
12
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("Jodie_topic_1023", "*");

// 下面几个采用Push模式的消费者的监听器都一样
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");

广播模式的推送消费者,相比上一个示例增加了设置消息模型(setMessageModel),其他没有变化。

1
2
3
4
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");

过滤器的消费者。过滤器采用Push方式时,过滤逻辑在Broker实现,Broker把过滤过的数据发送给消费者。
如果过滤器采用Pull模式,所有的数据都会传送到消费者,然后在消费者端执行过滤逻辑。

1
2
3
4
5
6
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
String filterCode = MixAll.file2String(classFile);
// 订阅方法的第二个参数是过滤器的实现类,而前面示例的第二个参数是Tag过滤
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl", filterCode);

SQL消费者(生产者发送消息时通过putUserProperty可以指定自定义的属性,除了Tag外,自定义属性也可以被过滤):

1
2
3
4
5
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 订阅方法的第二个参数是消息选择器
consumer.subscribe("TopicTest", MessageSelector.bySql(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 3)"));

有序的消费者:前面几种消费者注册的监听器是:MessageListenerConcurrently,这里是MessageListenerOrderly。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

消费者的监听器有两种形式:并发和有序。参考:http://rocketmq.apache.org/docs/best-practice-consumer/

监听器 上下文 返回状态 返回码
MessageListenerConcurrently ConsumeConcurrentlyContext ConsumeConcurrentlyStatus CONSUME_SUCCESS
MessageListenerOrderly ConsumeOrderlyContext ConsumeOrderlyStatus SUCCESS、ROLLBACK、COMMIT、SUSPEND_CURRENT_QUEUE_A_MOMENT

消息消费的顺序问题:

  • 并发情况下,返回RECONSUME_LATER,表示过一会儿再消费,先去消费其他消息
  • 有序情况下,返回SUSPEND_CURRENT_QUEUE_A_MOMENT,表示等一会儿再消费,无法消费其他消息

基本流程

Remoting RPC示例

rocketmq-remoting模块采用Netty封装了RPC的调用,包括客户端和服务端之间的交互。

不同分布式系统在通信上都会实现RPC模块,比如Kafka、Hadoop等都有各自的RPC实现。

先来查看测试用例RemotingServerTest的使用方法:

  • 启动RemotingServer和RemotingClient
  • 调用RemotingClient的invokeAsync()或者invokeSync()、invokeOneway()方法

以异步调用为例,RemotingClient的invokeAsync()方法主要有三个参数:

  • 服务端地址,RPC调用需要指定服务端的地址,这样客户端才能发送请求,让服务端处理
  • 远程指令(RemotingCommand),即客户端发送的请求
  • 回调对象(InvokeCallback),即客户端收到服务端返回的响应结果后,如何处理

RPC调用的具体步骤如下:

  • 启动客户端和服务端
  • 客户端构造远程指令对象
  • 客户端通过RemotingClient同步或者异步调用
  • 服务端在启动时注册的处理器,会处理客户端发送的请求,即调用处理器的processRequest()方法
  • 服务端处理完请求后,返回响应给客户端
  • 客户端收到服务端返回的响应结果,会触发回调对象调用operationComplete()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress());
return request;
}
}, Executors.newCachedThreadPool());
remotingServer.start();
return remotingServer;
}

public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

final CountDownLatch latch = new CountDownLatch(1);
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark("messi");
remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
public void operationComplete(ResponseFuture responseFuture) {
latch.countDown();
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
});
latch.await();
}

RemotingServer的registerProcessor()方法有三个参数:

  • 请求编码,比如SEND_MESSAGE表示(生产者)客户端发送消息的请求
  • 请求处理器,比如服务端如何处理客户端发送消息的处理器,实现类为:SendMessageProcessor
  • 处理线程,每种请求编码都对应一个处理线程池。如果没有指定,则使用默认的线程池

客户端调用服务端有三种方式:同步(Sync)、异步(Async)、一次性(OneWay)。前两种有响应结果,最后一种不产生响应结果。

Netty RPC

NettyRemotingServer在启动时,会绑定NettyServerHandler。Netty RPC的特点如下:

  • 请求和响应都是用RemotingCommand对象来表示
  • 服务端(NettyRemotingServer)和客户端(NettyRemotingClient)实现了抽象的NettyRemotingAbstract
  • 抽象类根据不同的指令类型调用不同的处理方法,比如处理请求调用processRequestCommand,处理响应调用processResponseCommand

下面举例客户端和服务端执行一次RPC调用链路的过程:

  • 客户端发送请求给服务端,通过Netty的Channel发送请求给服务端
  • 服务端处理客户端发送的请求,NettyServerHandler接收的消息类型为REQUEST_COMMAND,调用processRequestCommand方法
  • 服务端处理完成后,通过Netty的Channel发送响应结果给客户端
  • 客户端处理服务端发送的响应,NettyClientHandler接收的消息类型为RESPONSE_COMMAND,调用processResponseCommand方法

NettyRemotingAbstract用processorTable变量记录了请求编码、处理器、线程池之间的关系。

  • 每个请求编码都对应了一种唯一的处理器,相同请求编码的处理器是相同的
  • 由于处理器与线程池组成一对,所以相同请求编码的请求在相同的线程池中执行

不同的请求编码在不同的线程池中运行,以发送消息和消费消息为例:

请求编码(request code) 处理器 线程池
SEND_MESSAGE SendMessageProcessor ExecutorService#1
GET_MESSAGE PullMessageProcessor ExecutorService#2

以经典的RPC通信模型来看,客户端向服务端发起RPC调用请求。那么processorTable主要针对服务端,responseTable则主要针对客户端。

  • 客户端发起RPC调动时,会创建异步的响应对象,并放入将opaque和ResponseFuture的映射关系放入responseTable
  • 当客户端收到服务端发送的响应结果后,会将opaque以及ResponseFuture从responseTable中移除

那么opaque是如何在请求和响应之间进行关联的呢?下面代码中的注释说明了opaque在请求和响应之间的设置和获取流程。

opaque表示:请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

// 处理请求,比如服务端处理客户端发送的请求,NettyServerHandler会调用到这里
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 4. 从请求对象中获取opaque,那么什么时候opaque设置到请求中?
// 这里的cmd实际上是步骤3的request,因为步骤1已经有opaque,所以这里也能取到opaque
final int opaque = cmd.getOpaque();
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
// 5. 将opaque设置到响应对象中
response.setOpaque(opaque);
// 6. 发送响应对象给客户端
ctx.writeAndFlush(response);
}

// 处理响应,比如客户端处理服务端发送的响应,NettyClientHandler会调用到这里
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// 7. 从响应对象中获取opaque,那么什么时候opaque设置到响应里?答案在步骤5中
// 这里的cmd是步骤5的response,而response的opaque来自于request
final int opaque = cmd.getOpaque();
// 8. 根据opaque从responseTable中获取出对应的ResponseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseFuture.release();
// 9. 将opaque与ResponseFuture的映射关系从responseTable中移除,与步骤2互相对应
responseTable.remove(opaque);
// 执行客户端在发送RPC调用时定义的回调函数
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
}
}

// 客户端发起RPC调用
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request) {
// 1. 从请求中获取opaque
final int opaque = request.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
// 2. 创建ResponseFuture,并记录到responseTable
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 3. 发起RPC调用
channel.writeAndFlush(request);
}

生产者发送消息

以example/quickstart下的Producer发送消息为例,入口方法走到DefaultMQProducerImpl的sendDefaultImpl()方法。

发送消息过程涉及下面几个步骤:

  • tryToFindTopicPublishInfo():根据消息的Topic获取TopicPublishInfo
  • selectOneMessageQueue():选择一个MessageQueue
  • sendKernelImpl():调用内核的发送方法
  • 如果是同步调用,返回SendResult,否则返回空

接下来进入DefaultMQProducerImpl的内核发送方法,主要的参数有:Message、MessageQueue、TopicPublishInfo

  • 如果有Hook,构造SendMessageContext,将Message、MessageQueue等都设置为上下文对象的成员变量
  • 构造SendMessageRequestHeader
  • 从MQClientFactory获取getMQClientAPIImpl()实现类MQClientAPIImpl,调用sendMessage()方法

接下来进入MQClientAPIImpl的sendMessage()方法

  • 根据RequestCode.SEND_MESSAGE(请求编码)和SendMessageRequestHeader(请求头)创建RemotingCommand对象
  • 设置请求的body为消息内容:request.setBody(msg.getBody())
  • 调用remotingClient.invokeAsync()或者invokeSync()方法
  • 对于同步调用,因为要等待结果返回,所以会立即调用processSendResponse()
  • processSendResponse()方法返回一个SendResult对象
1
2
3
4
5
6
7
8
9
10
11
12
private SendResult sendMessageSync(
final String addr, // Broker的地址
final String brokerName, // Broker的名字
final Message msg, // 消息内容
final long timeoutMillis,
final RemotingCommand request // 请求对象
) {
// RPC调用示例,这里的客户端是生产者,通过MQClientAPIImpl调用
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

生产者通过MQClientAPIImpl发起RPC调用,request请求对象的编码是SEND_MESSAGE。这里的地址指的是Broker的地址,而不是NameServer。
虽然生产者连接的是NameServer,但这中间会有选择MessageQueue,再选择Broker的过程,由于这里先关注整体的流程,暂时不去分析具体的细节。

客户端通过RemotingClient调用了服务端Broker,接下来看服务端BrokerController的处理。

BrokerController启动时会为各种请求类型注册不同的请求处理器,比如SEND_MESSAGE注册了SendMessageProcessor处理器:

1
2
3
4
5
6
7
public void registerProcessor() {
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// SendMessageProcessor有两个Hook:发送消息和消费消息的Hook。
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
}

SendMessageProcessor的processRequest()方法会处理生产者客户端发送的SEND_MESSAGE请求。

客户端在发送请求之前构建了SendMessageContextSendMessageRequestHeader,这里对应的会首先从RemotingCommand反解析出着两个对象

  • 解析请求的body,创建MessageExtBrokerInner对象
  • 获取MessageStore,并调用putMessage方法,传入MessageExtBrokerInner对象
  • 返回PutMessageResult,并调用handlePutMessageResult方法
  • 最后返回的是一个RemotingCommand响应对象,会返回给客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default: // SEND_MESSAGE的处理逻辑...
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}

接下来进入DefaultMessageStore的putMessage()方法,这个方法会调用CommitLog的putMessage()方法

  • BrokerController和SendMessageProcessor都在broker模块
  • MessageStore和CommitLog则在store模块

CommitLog首先获取最近的MappedFile,然后追加消息到映射文件中。

  • 追加消息的回调类DefaultAppendMessageCallback是执行数据写入文件的真正方法。
  • 追加完成后,有多种的磁盘刷写方式,比如同步和异步
1
2
3
4
5
6
7
8
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback);
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}

同样,我们省略了具体写入到CommitLog中的细节,以及如何处理磁盘的刷写、HA等细枝末节。实际上,到这里为止,
生产者客户端发起RPC调用,到服务端处理请求,服务端返回响应,客户端接收响应结果,这个过程已经分析完毕了。

Pull Consumer

PULL_MESSAGE对应的处理器是PullMessageProcessor。与生产消息调用MessageStore的putMessage()类似,
消费消息调用MessageStore的getMessage()方法,并返回GetMessageResult。

请求编码 消息处理器 消息存储 结果
SEND_MESSAGE SendMessageProcessor putMessage() PutMessageResult
PULL_MESSAGE PullMessageProcessor getMessage() GetMessageResult

消费者还需要提交偏移量,对应ConsumerOffsetManager的commitOffset()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), // 消费组
requestHeader.getTopic(), // 主题
requestHeader.getQueueId(), // 队列编号
requestHeader.getQueueOffset(), // 队列的偏移量
requestHeader.getMaxMsgNums(), // 最大的消息数量
messageFilter); // 过滤器
// .......................................................
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());
}
}

存储层设计到文件操作时,生产消息会写到CommitLog,消费消息则会调用getMessage方法,给定偏移量和大小。

设计

架构

arch

  • NameServer Cluster
    Name Servers provide lightweight service discovery and routing.
    Each Name Server records full routing information(路由信息表),
    provides corresponding reading and writing service,
    and supports fast storage expansion.
  • Broker Cluster
    Brokers take care of message storage by providing lightweight TOPIC and QUEUE mechanisms.
    They support the Push and Pull model, contains fault tolerance mechanism (2 copies or 3 copies),
    and provides strong padding of peaks and capacity of accumulating hundreds of billion messages in their original time order.
    In addition, Brokers provide disaster recovery, rich metrics statistics, and alert mechanisms, all of which are lacking in traditional messaging systems.
  • Producer Cluster
    Producers support distributed deployment.
    Distributed Producers send messages to the Broker cluster through multiple load balancing modes.
    The sending processes support fast failure and have low latency.
  • Consumer Cluster
    Consumers support distributed deployment in the Push and Pull model as well.
    It also supports cluster consumption(集群消费) and message broadcasting(消息广播).
    It provides real-time message subscription mechanism and can meet most consumer requirements. R

NameServer is a fully functional server, which mainly includes two features:

  • Broker Management, NameServer accepts the register from Broker cluster and provides heartbeat mechanism to check whether a broker is alive.
  • Routing Management, each NameServer will hold whole routing info about the broker cluster and the queue info for clients query.

Broker server is responsible for message store and delivery, message query, HA guarantee, and so on.

  • Remoting Module, the entry of broker, handles the requests from clients(处理客户端请求).
  • Client Manager, manages the clients (Producer/Consumer) and maintains topic subscription of consumer(维护消费者的主题订阅).
  • Store Service, provides simple APIs to store or query message in physical disk(磁盘文件存储和查询消息).
  • HA Service, provides data sync feature between master broker and slave broker(主从节点的数据同步).
  • Index Service, builds index for messages by specified key and provides quick message query(构建消息索引).

Name server follows the share-nothing design paradigm. Brokers send heartbeat data to all name servers.
Producers and consumers can query meta data from any of name servers available while sending / consuming messages.

Brokers can be divided into two categories according to their roles: master and slave.
Master brokers provide RW access while slave brokers only accept read access.

To deploy a high-availability RocketMQ cluster with no single point of failure, a series of broker sets should be deployed.
A broker set contains one master with brokerId set to 0 and several slaves with non-zero brokerIDs.
All of the brokers in one set have the same brokerName. In serious scenarios,
we should have at least two brokers in one broker set. Each topic resides in two or more brokers.

Broker is a major component of the RocketMQ system.
It receives messages sent from producers, store them and prepare to handle pull requests from consumers.
It also stores message related meta data, including consumer groups, consuming progress offsets and topic / queue info.

mqs

物理部署结构(服务端)

brokers

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker 部署相对复杂,Broker 分为 Master 与 Slave,
一个 Master 可以对应多个 Slave, 但是一个 Slave 只能对应一个 Master,
Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,
BrokerId为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。
每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,
定期从 Name Server 取 Topic 路由信息,
向提供 Topic 服务的 Master 建立长连接,
且定时向 Master 发送心跳。
Producer 完全无状态,可集群部署。

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,
定期从 Name Server 取 Topic 路由信息,
向提供 Topic 服务的 Master、Slave 建立长连接,
且定时向 Master、Slave 发送心跳。
Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

逻辑部署结构(客户端)

logical

Producer Group 用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,
可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个 Producer 对象。
一个 Producer Group 可以发送多个 Topic 消息,Producer Group 作用如下:

  1. 标识一类 Producer
  2. 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例
  3. 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态

Consumer Group 用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,
可以是多台机器,也可以是多个进程,或者是一个进程的多个 Consumer 对象。
一个 Consumer Group 下的多个 Consumer 以均摊/集群(CLUSTER)方式消费消息,
如果设置为广播方式(BROADCAST),那么这个 Consumer Group 下的每个实例都消费全量数据。

存储结构

store

store

  1. 所有数据单独存储到一个 Commit Log,完全顺序写,随机读。
    RocketMQ 的所有消息都是持久化,先写入系统 PAGECACHE,然后刷盘,可以保证内存不磁盘都有一份数据, 访问时,直接从内存读叏。
  2. 对最终用户展现的队列(ConsumeQueue)实际只存储消息在CommitLog的位置信息,并且串行方式刷盘。
  3. 消费者的读取流程是:先读ConsumeQueue,再读CommitLog
  4. 由于ConsumeQueue存储数据量极少,并且是顺序读,在PAGECACHE预读作用下,
    ConsumeQueue的读性能几乎与内存一致,即使堆积情况下。所以可认为 Consume Queue 完全不会阻碍读性能。
  5. 要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。
    Commit Log 中存储了所有的元信息,包含消息体,类似于 Mysql、Oracle 的 redolog,
    所以只要有 Commit Log 在,Consume Queue 即使数据丢失,仍然可以恢复出来。

总结一句话:生产消息时先写入PageCache,然后刷写到磁盘。

pagecache

同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PAGECACHE 直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:

  1. 写入 PAGECACHE 后,线程等待,通知刷盘线程刷盘。
  2. 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
  3. 前端等待线程向用户返回成功。

读取消息的ConsumeQueue文件也会加载到PageCache,读PageCache和内存速度差不多。

pc

  1. Producer 发送消息,消息从 socket 进入 java 堆。
  2. Producer 发送消息,消息从 java 堆转入 PAGACACHE,物理内存。
  3. Producer 发送消息,由异步线程刷盘,消息从 PAGECACHE 刷入磁盘。
  4. Consumer 拉消息(正常消费),消息直接从PAGECACHE(数据在物理内存)转入socket,到达consumer,不经过 java 堆。
    这种消费场景最多,线上 96G 物理内存,按照 1K 消息算,可以在物理内存缓存 1 亿条消息。
  5. Consumer 拉消息(异常消费),消息直接从 PAGECACHE(数据在虚拟内存)转入 socket。
  6. Consumer 拉消息(异常消费),由于 socket 访问了虚拟内存,产生缺页中断,此时会产生磁盘 IO,
    从磁盘 Load 消息到 PAGECACHE,然后直接从 socket 发出去。
  7. 同5
  8. 同6

负载均衡(7.8/7.9)

loadbalance

消息查询(TODO 7.3)

按照MessageId查询

按照MessageKey查询

消息过滤(TODO 7.4)

有两种类型的消息过滤:

  • Broker 端消息过滤:在 Broker 中,按照 Consumer 的要求做过滤,优点是减少了对于 Consumer 无用消息的网络传输。缺点是增加了 Broker 的负担,实现相对复杂。
  • Consumer 端消息过滤:这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到 Consumer 端。

长轮询Pull(TODO 7.5)

RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消息,
RocketMQ 使用长轮询方式,可以保证消息实时性同 Push 方式一致。简单说就是长轮询Pull = Push

顺序消息(TODO 7.6)

消息有序指的是一类消息消费时,能按照发送的顺序来消费。
例如:一个订单产生了 3 条消息,分别是订单创建,订单付款,订单完成。
消费时,要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。

order

缺点:

  • 发送顺序消息无法利用集群 FailOver 特性
    􏰀- 消费顺序消息的并行度依赖于队列数量(MessageQueue的数量)
    􏰀- 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题 􏰀
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停(等一段时间再消费)

消费线程(单队列并行消费, 7.10)

singlequeue

单队列并行消费采用滑动窗口方式并行消费,如图所示,3~7的消息在一个滑动窗口区间,可以有多个线程并行消费,但是每次提交的 Offset 都是最小 Offset,例如 3。

修改消费并行度的两种方法:

  1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(超过订阅队列数的 Consumer 实例无效)。
    可以通过加机器,或者在已有机器启动多个进程的方式。
  2. 提高单个 Consumer 的消费并行线程,通过修改两个参数:consumeThreadMin/consumeThreadMax。

批量方式消费:

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吏量,例如订单扣款类应用,
一次处理一个订单耗时 1 秒钟,一次处理 10 个订单可能也只耗时 2 秒钟,这样即可大幅度提高消费的吞吏量。
通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,
默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的 消息数小于等于 N。

消息堆积、消息重试

  • 消息堆积(4.12)和消息重试(4.15)
  • 解决办法(7.15)
  • 跳过非重要消息(14.3)

事务(TODO)

分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要 KV 存储的支持,
因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据 Key 去查找 Message 的动作。
RocketMQ 在第二阶段绕过了根据 Key 去查找 Message 的问题,
采用第一阶段发送 Prepared 消息时,拿到了消息的 Offset,
第二阶段通过 Offset 去访问消息, 并修改状态,Offset 就是数据的地址。

RocketMQ 这种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,
存在一个显著缺陷,即通过 Offset 更改数据,会令系统的脏页过多,需要特别关注。

trans

Producer Group

Producers of the same role are grouped together.
A different producer instance of the same producer group
may be contacted by a broker to commit or roll back a transaction
in case the original producer crashed after the transaction.

Warning: Considering the provided producer is sufficiently powerful at sending messages,
only one instance is allowed per producer group to avoid unnecessary initialization of producer instances.

扩容

扩容是整个系统中的很重要的一个环节。在保证顺序的情况下进行扩容的难度会更大。
基本的策略是让向一个队列写入数据的消息发送者能够知道应该把消息写入迁移到新的队列中,
并且需要让消息的订阅者知道,当前的队列消费完数据后需要迁移到新队列去消费消息。关键点如下:

  • 原队列在开始扩容后需要有一个标志,即便有新消息过来,也不再接收。
  • 通知消息发送端新的队列的位置。
  • 对于消息接受端,对原来队列的定位会收到新旧两个位置,当旧队列的数据接受完毕后,则会只关心新队列的位置,完成切换。

那么对于Metaq顺序消息,如何做到不停写扩容呢?我说说自己的看法:
在队列扩容的时候考虑到需要处理最新的消息服务,为了不丢失这部分消息,
可以采取让Producer暂存消息在本地磁盘设备中,
等扩容完成后再与Broker交互。这是我目前能想到的不停写扩容方式。

参考文档


文章目录
  1. 1. QuickStart
  2. 2. API示例
    1. 2.1. 生产者
    2. 2.2. 消费者
  3. 3. 基本流程
    1. 3.1. Remoting RPC示例
    2. 3.2. Netty RPC
    3. 3.3. 生产者发送消息
    4. 3.4. Pull Consumer
  4. 4. 设计
    1. 4.1. 架构
    2. 4.2. 物理部署结构(服务端)
    3. 4.3. 逻辑部署结构(客户端)
    4. 4.4. 存储结构
    5. 4.5. 负载均衡(7.8/7.9)
    6. 4.6. 消息查询(TODO 7.3)
      1. 4.6.1. 按照MessageId查询
      2. 4.6.2. 按照MessageKey查询
    7. 4.7. 消息过滤(TODO 7.4)
    8. 4.8. 长轮询Pull(TODO 7.5)
    9. 4.9. 顺序消息(TODO 7.6)
    10. 4.10. 消费线程(单队列并行消费, 7.10)
    11. 4.11. 消息堆积、消息重试
    12. 4.12. 事务(TODO)
      1. 4.12.1. Producer Group
    13. 4.13. 扩容
  5. 5. 参考文档