2. 中国科学院大学, 北京 100049
2. University of Chinese Academy of Sciences, Beijing 100049, China
RabbitMQ是开源的基于Erlang的高效部署分布式消息队列, 实现了AMQP协议, 具有良好的可靠性、稳定性, 可运行在多种操作系统, 便于集群运行[1–6]. RabbitMQ支持多种编程语言的客户端, 可以通过安装插件扩展功能. RabbitMQ可以解耦应用程序, 将不同语言开发的程序粘合在一起, 在完全不同的应用之间共享数据.
Erlang采用轻量级并发模型, 用于高并发、分布式“软实时系统”编程, 支持运行系统中的软件升级[7]. Erlang程序简短紧凑, 采用函数式编程, 自动存储管理. Erlang进程不共享内存, 进程间通信通过消息传递进行.
RabbitMQ投递消息的速度受软硬件配置影响. 硬件方面有: 处理器、内存、磁盘、网络配置等; 软件方面有: 消息持久化机制、消息确认机制、交换器类型等. 要以高速度向消费者投递消息, 应尽可能保持队列为空.
对RabbitMQ进行优化有较多方法. 如对生产者确认机制的优化: 直接建立channel与消息存储之间的联系, 减少插入、删除、消息传递等操作, 可以大幅降低处理时间[8]. 还可以优化topic路由匹配算法; 批量发送消息; 优化消息持久化机制; 开启Erlang HiPE编译选项; 使用位运算等.
生产者发送一条消息到broker, 消息可能被n个消费者接收. 同时启用生产者确认与消费者确认, 生产者仍无法获知n个消费者是否全部接收到消息. 本文对小消息情况下RabbitMQ的确认机制进行优化, 在broker收到n个消费者的确认消息后, 向生产者发送确认消息. 生产者收到确认消息则表明消费者已成功接收到消息. 若消息丢失, 由生产者负责重发消息. 对不同生产者、消费者、队列数量的情况进行测试, 分析比较优化前与优化后的持久化小消息发送速率.
1 RabbitMQ架构与相关模块简介 1.1 RabbitMQ架构如图1所示, 生产者发送消息到交换器, 队列通过路由键绑定到交换器, 根据交换器类型与路由键将消息路由到队列, 消费者从队列接收消息. 常用的交换器类型有direct、fanout和topic. 对于direct交换器, 如果路由键匹配, 消息就被投入相应的队列; fanout交换器将收到的消息广播到绑定的队列; topic交换器对路由键进行模式匹配, 消息被路由到匹配的队列. 消费者通过basic.consume命令自动从队列获取下一条消息; 通过basic.get命令获取单条消息. 队列具有多个消费者时, 采用round-robin方式向消费者发送消息. broker是消息队列服务器实体, 一个broker中可以有多个虚拟主机.
1.2 相关模块功能1) channel接收reader解析的来自客户端的协议帧; 使用writer向客户端发送帧; 路由消息给队列进程; 处理AMQP方法; 发出AMQP命令. 一条TCP连接中可以有多个channel.
2) 支持队列(backing queue, BQ), 一般情况下默认为rabbit_variable_queue. 队列进程使用BQ实现队列功能. 队列中消息具有4种状态: alpha、beta、gamma、delta. 持久化消息只可能处于alpha、gamma、delta三种状态之一. BQ具有5个内部队列: q1、q2、q3、q4、delta. q1和q4中只有alpha状态的消息; q2和q3包含beta和gamma状态的消息; delta队列不在内存中, 只有delta状态的消息.
3) 队列索引(queue index)用于在磁盘上记录队列中消息的顺序. 每个队列有一个队列索引. 消息依次被发布、投递、确认. 发布记录包括消息ID、消息在队列中的序列号等内容. 发布记录也可能包括完整的消息. 投递和确认记录只包括消息在队列中的序列号. 队列索引使用日志文件(journal)避免过多磁盘寻址. 日志文件具有固定的长度, 默认为32 768, 由queue_index_max_journal_entries参数配置.
4) 消息存储(message store)用于将消息写入磁盘或将消息从磁盘加载到内存. 存储的消息是引用计数的, ID相同的消息多次写入时只会存储一次.
1.3 小消息嵌入队列索引
RabbitMQ 3.5.0版本引入小消息嵌入队列索引. 小于queue_index_embed_msgs_below参数值的消息属于小消息, 该参数默认值为4096 bytes. 小消息的持久化操作直接在队列进程中进行, 不使用消息存储, 只需要写一次磁盘, 可以减少I/O和内存消耗, 提高10%左右的性能[9].
如果小消息被一个交换器路由到多个队列, 这条消息需要被写入多个队列索引; 若使用消息存储, 则只需要写一次. 从磁盘读取消息时, 每个队列索引需要在内存中保持至少1个段文件. 段文件包含16 384条消息记录. 因此queue_index_embed_msgs_below参数的少量增加会导致大量的内存使用[10].
2 RabbitMQ消息确认过程分析如图2所示, 生产者确认是异步的, 生产者发送消息到broker, 可以在等待确认的同时发送下一条. 为了在broker重启或崩溃时不丢失消息, 消息投递给消费者前需要进行持久化, 消息写入磁盘后向生产者发送确认消息. 消费者收到消息后必须进行确认, 可以发送basic.ack命令进行显示确认, 也可以使用自动确认. 若使用自动确认, 消费者接收到消息, 即视其确认了消息. broker收到消费者发送的确认消息, 将确认记录追加到队列索引的日志文件.
开启生产者确认与消费者确认, 持久化小消息在生产者、消费者、RabbitMQ相关模块间的传递过程如图3所示. 1–6: 生产者发送消息到消费者; 7–10: 消费者确认相关过程; 11–14: 生产者确认相关过程.
2.1 生产者确认过程分析
生产者确认过程会依次在channel、队列进程、队列索引、BQ处记录生产者确认相关信息. 消息写入磁盘后, 已确认的消息ID从队列索引依次传递给BQ、队列进程、channel, 各处均会将已确认记录删除.
channel收到生产者发送的消息, 为消息分配一个唯一的序列号, 组装#delivery, 获取需要投递的队列记录列表Qs, 将#delivery投递到Qs中的队列. channel使用dtree记录消息被投递到哪些队列, 格式为: {消息在channel中的序列号, 队列进程pid列表, 交换器名称}. 若channel将消息投递到m个队列, channel收到相应的m个队列发送的确认消息才会向生产者发送确认消息.
队列进程收到消息, 判断队列的消费者是否满足消息投递条件. 若有消费者满足投递条件且消息队列为空, 则消息不会进入队列, 而是直接投递给消费者端channel. 需要组装消息状态(message status). 将包含小消息的发布记录与只包括消息在队列中序列号的投递记录追加到队列索引的日志文件.
若没有消费者满足投递条件或消息队列非空, 则将消息进队. 需要组装消息状态. 将发布记录追加到队列索引的日志文件. 将消息状态加入消息队列. 从消息队列中取消息时, 若消息队列非空且有消费者满足消息投递条件, 则将消息从消息队列中移除. 将投递记录追加到队列索引的日志文件. 将取出的消息投递给消费者端channel.
队列进程收到生产者端channel投递的消息, 使用gb_trees记录未确认的消息ID、发送消息的channel和该消息在channel中的序列号, 格式为: {消息ID, {channel pid, 消息在channel中的序列号}}.
在发布记录写入日志文件前, 队列索引使用gb_sets记录未确认的消息ID. 队列索引的日志文件可能在两种情况下写入磁盘.
1) 队列进程设置同步定时器, 每200毫秒向自身发送sync_timeout消息. 队列进程收到消息后, 对队列索引的日志文件执行sync操作.
2) 当日志文件中记录数目达到一定数量时, 将内存中预分割的日志文件写入段文件.
消息持久化操作完成后, BQ使用gb_sets记录未确认的消息ID.
2.2 消费者确认过程分析消费者确认过程会依次在BQ、队列进程、channel处记录消费者确认相关信息. 收到消费者发送的确认消息后, 会按照相反的顺序从未确认记录中删除已确认记录, 最终将包括消息在队列中序列号的确认记录追加到队列索引的日志文件.
消息到达队列进程直接投递给消费者端channel时或从消息队列中取消息时, 会在BQ相应的gb_trees中添加未确认记录, 格式为: {消息在队列中的序列号, 消息状态}.
队列进程将消息投递给channel, 在Erlang queue中添加未确认记录, 格式为: {消息在队列中的序列号, 消费者标签}.
channel使用writer将消息投递给消费者, 在Erlang queue中添加未确认记录, 格式为: {投递标签, 消费者标签, {队列进程pid, 消息在队列中的序列号}}. 收到消费者发送的确认消息后, channel根据确认消息中的投递标签与multiple字段从未确认记录中获取已确认记录, 将已确认的消息序列号发送给相应的队列进程.
2.3 对性能的影响生产者确认与消费者确认过程涉及较多dtree、gb_trees、gb_sets和Erlang queue操作, 包括插入、删除、查找、集合运算等. 队列索引的日志文件会定时地或在记录达到一定数量时写入磁盘, 对性能影响较大.
使用RabbitMQ 2.8.1进行简单测试, CPU为双Xeon E5530, RAM为40GB, Erlang R15B, 开启HiPE, 1个生产者, 1个消费者[11]. 不使用生产者确认与消费者确认, 不进行消息持久化, 消息发送速率为: 44824 msg/s; 开启消费者确认后: 32005 msg/s; 接着开启生产者确认: 26103 msg/s; 在此基础上对消息进行持久化: 4725 msg/s[12]. 可见消息确认机制对消息发送速率有一定影响, 消息持久化机制对消息发送速率有较大影响.
3 优化方法 3.1 小消息确认机制的优化如图4所示, 优化后持久化与非持久化小消息的确认过程是相同的. 需要将生产者确认过程与消费者确认过程衔接起来. 生产者发送消息到broker, 消息投递给消费者前, 不进行消息持久化操作. 不会向日志文件追加记录, 不写段文件; 不会设置同步定时器, 不执行代价较大的sync操作. 队列索引与BQ不记录未确认的消息ID. 消费者收到消息后, 向broker发送确认消息. broker收到消费者确认消息, 向生产者发送确认消息. 若消费者没有收到消息, 生产者不会收到确认消息, 此时由生产者重发该消息. 该方法保证了在生产者收到确认消息时消费者已成功接收到消息.
1) 小消息到达队列进程, 队列进程需要记录生产者确认相关信息. 需要修改队列进程模块的send_or_record_confirms/2函数. SenderPid是发送消息给队列进程的channel pid, MsgSeqNo是消息在channel中的序列号, MTC用于记录未确认消息ID对应的{SenderPid, MsgSeqNo}. 添加未确认记录后, 更新队列进程状态.
send_or_record_confirms(#delivery{confirm = true,
sender = SenderPid, msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}},
State = #q{msg_id_to_channel = MTC}) ->
MTC1 = gb_trees:insert(MsgId, {SenderPid,
MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel =
MTC1}};
2) 队列进程在消费者确认过程结束后, 向生产者端channel发送确认消息. 需要修改队列进程模块的ack/3函数. MsgIds是已确认的消息ID列表. 需要获取MsgIds对应的channel pid和消息在channel中的序列号, 将包含消息在channel中序列号的确认消息发送给相应的channel. 更新队列进程状态.
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
MTC1 = confirm_messages(MsgIds, MTC),
State1#q{backing_queue_state = BQS1,
msg_id_to_channel = MTC1}
end
3.2 继续优化消费者确认过程可以在上述优化的基础上减少消费者确认过程中的插入、删除等操作, 提高性能, 减少内存使用. BQ和队列进程不记录消费者确认相关消息, 消费者端channel记录: {消息投递标签, 消费者标签, {队列进程pid, 消息ID}}.
1) 队列进程向channel投递消息, 格式为:{deliver, ConsumerTag, AckRequired, Msg}. ConsumerTag是消费者标签, AckRequired取值为true或false. Msg类型为rabbit_amqqueue:qmsg(), 格式为: {队列名称, 队列进程pid, 消息在队列中的序列号, Redelivered, Message}. Redelivered取值为true或false, Message类型为#basic_message. 使用模式匹配从Message中提取消息ID. 需要修改channel模块的record_sent/4函数.
#basic_message{id = MsgId} = Message
2) 消费者端channel向队列进程发送的消息中包括已确认的消息ID列表MsgIds. 队列进程收到确认消息, 向生产者端channel发送相应的确认消息. 更新队列进程状态. 需要修改队列进程模块的handle_cast/2函数.
handle_cast({ack, MsgIds, _ChPid},
State = #q{msg_id_to_channel = MTC}) ->
MTC1 = confirm_messages(MsgIds, MTC),
noreply(State#q{msg_id_to_channel = MTC1});
4 性能测试 4.1 测试环境与方法生产者、消费者、RabbitMQ在同一台机器上. 开启生产者确认与消费者确认. 持久化小消息, 消息大小为1500 bytes. 性能测试工具为PerfTest. 测试环境配置如表1所示.
在1个虚拟主机中启动不同数量的持久化队列, 每个队列有2个生产者、3个消费者, 每个生产者连接中有2个channel, 每个消费者连接中有3个channel. 队列数量小于等于15时, 绑定到同一个持久化direct交换器; 队列数量大于15时, 绑定到两个持久化direct交换器. 开启management插件与top插件. 分别记录优化前与优化后的消息发送速率, 每种情况测试10分钟, 测试3次取平均值.
消息发送速率提高百分比的计算方法为: (优化后消息发送速率–优化前消息发送速率)/优化前消息发送速率*100%.
消息发送速率平均提高百分比为: 不同队列数量时, 消息发送速率提高百分比的算术平均值.
4.2 测试结果与分析如图5所示, 在1个虚拟主机中, 随着队列数量增加, 消息发送速率先增加然后缓慢下降. 优化后消息发送速率提高的比例是逐渐下降的. 1个队列时, 优化后的消息发送速率是优化前的3.08倍; 2个队列时, 优化后的消息发送速率是优化前的2.01倍; 15个队列时, 消息发送速率提高40.9%; 30个队列时, 消息发送速率提高40.3%. 队列数量大于3时, 消息发送速率平均提高42.9%.
如图6所示, 对消费者确认过程进一步优化后, 在同样的测试环境中, 随着队列数量增加, 优化后的消息发送速率逐渐下降. 1个队列时, 优化后的消息发送速率取得最大值, 是优化前的3.48倍; 2个队列时, 优化后的消息发送速率是优化前的2.16倍; 15个队列时, 消息发送速率提高52.6%; 30个队列时, 消息发送速率提高53.4%. 队列数量大于3时, 消息发送速率平均提高56.5%.
优化后不执行消息持久化操作, 减少部分内存操作, 使消息发送速率得到提高. 继续优化后, 不会在BQ和队列进程处记录消费者确认相关信息, 减少了插入、查找、删除等操作, 进一步提高了消息发送速率, 但是在队列数量较多时可靠性略有下降. 上述两种优化方法需要确保每个队列至少有一个消费者, 适用于消费速度很快的情形. 在生产者、消费者、队列数量较少时可以获得更大的性能提升. 生产者重发消息的策略, 可以根据实际应用场景确定. 与改进前类似, 在异常情况下, 消费者可能收到重复的消息. 第一种优化方法保留了完整的消费者确认过程, 能够较好地处理basic.reject与basic.nack等命令, 消费者可以拒绝接收某些消息. 第二种优化方法简化了消费者确认过程, 无法处理消费者拒绝消息的情况, 适用于消费者只对消息进行确认的情况. 可以根据应用程序对性能、可靠性的不同需求使用相应的优化方法.
5 结语本文详细分析了RabbitMQ中持久化小消息的确认过程, 将生产者确认过程与消费者确认过程结合起来进行优化, 使生产者可以获知消费者成功接收到消息, 提高了持久化小消息的发送速率. 要使消息发送速率得到根本提高, 可以重新设计RabbitMQ的架构: 使用多个轻量级进程实现逻辑队列与逻辑channel, 或集群部署使用.
[1] |
Rostanski M, Grochla K, Seman A. Evaluation of highly available and fault-tolerant middleware clustered architectures using RabbitMQ. Federated Conference on Computer Science and Information Systems. Warsaw, Poland. 2014. 879–884.
|
[2] |
Videla A, Williams JJW. RabbitMQ实战: 高效部署分布式消息队列. 汪佳南, 译. 北京: 电子工业出版社, 2015.
|
[3] |
Ionescu VM. The analysis of the performance of RabbitMQ and ActiveMQ. 14th RoEduNet International Conference - Networking in Education and Research (RoEduNet NER). Craiova, Romania. 2015. 132–137.
|
[4] |
Vandikas K, Tsiatsis V. Performance evaluation of an IoT platform. 8th International Conference on Next Generation Mobile Apps, Services and Technologies. Oxford, UK. 2014. 141–146.
|
[5] |
Dawar S, Fallon E, Bennet T, et al. An extensible architecture for mobile network management event distribution and rule processing - a performance evaluation. 1st International Conference on Artificial Intelligence, Modelling and Simulation. Kota Kinabalu, Malaysia. 2013. 451–456.
|
[6] |
Yang WJ, Liu XG, Zhang L, et al. Big data real-time processing based on storm. 12th IEEE International Conference on Trust, Security and Privacy in Computing and Communications. Melbourne, VIC, Australia. 2013. 1784–1787.
|
[7] |
Cesarini F, Thompson S. Erlang编程指南. 慕尼黑Isar工作组, 杨剑, 译. 北京: 机械工业出版社, 2011.
|
[8] |
袁佳. 基于主机日志的入侵检测系统的设计与实现[硕士学位论文]. 北京: 北京邮电大学, 2014.
|
[9] |
李帅. RabbitMQ进程结构分析与性能调优. https://www. qcloud.com/community/article/164816001481011847. [2016-10-10].
|
[10] |
Persistence Configuration. http://www.rabbitmq.com/persistence-conf.html. [2017-06-01]
|
[11] |
RabbitMQ performance measurements, part 1. http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/. [2012-04-17]
|
[12] |
RabbitMQ performance measurements, part 2. http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/. [2012-04-25]
|