2. 中国科学院 沈阳计算技术研究所, 沈阳 110168
Shenyang Institute of Computing Technology, Chinese Academy of Sciences, Shenyang 110168, China
随着智能终端普及率的提高, 大部分网民已经从传统的PC接入网络变为移动终端接入, 即时通信的发展前景一片光明. 然而基于私有通信协议开发的即时通信类应用, 严重影响了扩展网络功能和技术的进步的步伐, 因此急需一种基于标准协议并能满足大众业务需求的消息推送平台. MQTT协议是Android系统中消息推送的实现技术之一, 由于此协议的简单、便捷, 目前已得到了众多应用. 但传统MQTT服务器在分布式部署方面存在较大不足, 基于RocketMQ具有高性能、高可靠、高实时、分布式的特点[1], 本文提出了一种基于RocketMQ的MQTT消息推送服务器并实现了分布式部署.
本文主要分为4个部分, 第1部分对RocketMQ进行了概述. 第2部分设计并实现了基于RocketMQ的MQTT消息推送服务器. 第3部分介绍了服务器的分布式部署情况. 第4部分对本文设计的消息推送服务器进行了测试和分析. 第5部分对本文进行了总结以及对未来工作进行了概述.
1 RocketMQ介绍 1.1 RocketMQ介绍RocketMQ是阿里巴巴消息中间团队开发的一种基于队列模型的消息中间件, 具有高性能、高可靠、高实时、分布式特点[1], 其中Producer、Consumer以及队列都可以实现分布式. 发送消息时, Producer以此发送消息到各个队列, 这个队列集合称为Topic. 一个Consumer实例消费一个Topic对应的所有队列的情况称为广播消费, 多个Consumer实例平均消费一个topic对应的队列集合的情况称为集群消费[2]. 其通信组件使用了Netty-4.0.9.Final, 并在此之上进行了简单的协议封装. 其消息格式如表1所示.
其中length为4字节整数, 其数值等于后3部分长度的和. Header length为4字节整数, 其数值等于header data的长度; Header data部分使用json序列化数据; Body data使用阿里自定义的二进制序列化数据.
2 基于RocketMQ的消息推送服务器设计 2.1 系统结构设计本文所设计的消息推送服务器结构如图1所示. 整个服务器分为3层. 第一层为MQTT Broker, 主要负责将客户端与服务器进行连接, 并且基于pub/sub机制, 实现MQTT协议的相关功能; 第二层为协议转换层, 主要将MQTT消息包转换为RocketMQ支持的消息格式, 其负责实现MQTT Broker与RocketMQ的对接, 将消息推送到RocketMQ的消息队列中以及将消息推送到MQTT Broker端; 第三层为RocketMQ broker, 主要负责将第二层发送过来的消息发送到总线当中, 与分布式服务器进行通讯, 完成消息的分发与接收.
2.2 服务器功能本文服务器的功能主要是实现消息推送与pub/sub功能, 并且在该功能的基础上, 实现用户个体之间的即时通信、预订阅等相关功能. 即时通信可以传输文本、图片、语音、视频和文件等消息, 预订阅用于客户端第一次连接到代理服务器时, 帮助用户预先订阅话题, 从而达到节省流量, 优化用户体验的目的.
2.3 消息推送模块的实现
现如今, 比较主流的MQTT协议代理有IBM Websphere MQ Telemetry和Mosquitto[3]. 本文选取了Mosquitto作为MQTT的消息代理. Mosquitto是一款轻量级开源的消息代理, 它相对完整的实现了MQTT协议中的各项功能, 并能够完美运行在Linux系统中. 所以, 本文选取并在其开源项目的基础上, 对其进行了优化, 使其能够实现多终端同步.
通过对比MQTT协议的格式以及RocketMQ消息的格式, 不难发现两者都是基于Topic模块, 而且两者参数存在很多相同. 所以, 我们通过协议转换模块, 将mosquitto接收到的Topic、QoS及payload等相关参数, 转换成RocketMQ的Topic、QoS及Body等参数, 并经由producer发送至总线中, 由Consumer负责消费. 流程图如图2所示.
消息推送模块完成消息发布到被消费的具体流程:
1) 当客户端发布消息时, 由轻量级Mosquitto消息代理将消息接受并且返回给客户端消息送达状态.
2) 将MQTT消息包内的Topic、payload以及Qos等相关信息进行提取, Topic在原有基础上增加_rocketmq组成新的Topic传递给rocketmq; 将payload格式由mqtt_string转换为string, 并赋值给Body传递给rocketmq; 将MQTT的QoS参数直接传递给Rocketmq的QoS参数. 组成新的数据包之后, 经由RocketMQ broker将消息推送到消息总线中, 并由总线返回消息送达状态.
3) RocketMQ的Consumer根据Topic对总线中的消息进行消费, 并将RocketMQ的Topic直接传递给MQTT的Topic, 即topic_rocketmq. 并将Body和QoS重新按照MQTT格式进行封装, 并且推送给订阅了该主题的客户端.
3 分布式部署结构 3.1 服务器角色分配
对服务器的架构角色分配如表2所示.
3.2 服务器物理结构图3中, 由于Name Server是一个几乎无状态节点, 故部署为集群, 节点之间无任何消息同步[4]. Producer与Consumer完全无状态, 集群部署, Producer选择Name Server集群中的一个随机节点建立长连接[5]. Consumer基本与Producer相同, 只是其要想提供Topic服务的Master、Slave建立长连接并定时发送心跳. 其即可以从Master订阅消息, 也可以从Slave订阅消息.
Broker可分为Master与Slave. 通过指定相同的BrokerName, 不同的BrokerId来定义Master与Slave的对应关系. 想要一个代理成为Master, 设置BrokerId为0, 设置BrokerId的值非0则为Slave. Master可以部署多个. 每个Broker与NameServer集群中的所有节点建立长连接, 定时向所有的NameServer注册Topic信息[3].
3.3 服务器配置
Host文件配置:
# vim/etc/hosts
172.17.21.8 mqnameserver1
172.17.21.10 mqnameserver2
172.17.21.14 mqnameserver3
172.17.21.8 rocketmq-master2
172.17.21.9 rocketmq-slave2
172.17.21.10 rocketmq-master1
172.17.21.11 rocketmq-slave1
由于篇幅有限, 本文只举例说明Master1服务器配置, 其余配置基本相同.
# vim/usr/local/rocketmq/conf/2m-noslave/broker-1.properties
brokerName=broker-1
# brokerId为0为Master, 非零为Slave
brokerId=0
namesrvAddr=mqnameserver1:9876;mqnameserver2:9876;mqnameserver3:9876
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
4 消息服务器测试 4.1 单Broker测试本部分将对本文所设计的服务器在增加负载的情况下进行性能和鲁棒性的测试. 服务器配置如表3所示.
本文通过使用Erlang语言开发的, 开源测试工具emqtt_benchmark模拟客户端登录并发送消息对服务器进行负载压力测试. 测试网络结构图如图4所示. 其中, 黑色连接线代表网络连接, 箭头表示数据流向.
两台terminal模拟用户起大量线程, 不间断的向broker发送大小为2 K的消息, 测试结果如图5所示.
通过测试, 单机Broker的TPS可达到15 900左右, 继续增大消息发送量时, 可以看到TPS有下降的趋势. 通过测试可以观测服务器在大量消息情况下依然能够保持一个稳定的状态, 基本能够满足日常服务器需求.
4.2 双Master、双Slave测试接下来模拟大量用户进行连接, 并在连接完成后进行消息收发, 测试其CPU占有率及消息全部送达所需时间. 测试结果如表4所示.
由表中数据可以得出, 本文所设计的消息推送服务器, 响应速度基本满足当前移动互联网领域中的需求, 服务器负载也在可接受范围内, 连接并发数量尚可, 服务器性能稳定.
5 总结与展望本文在研究MQTT协议以及RocketMQ的基础上, 论述了基于RocketMQ的MQTT消息推送服务器的设计, 并对其进行了实现. 本文重点阐述了消息推送服务器的消息接收和发送、消息格式转换功能、消息推送服务器的性能测试, 同时与客户端结合完成了消息推送服务器的功能测试. 最后根据测试结果可以看出, 本文设计的服务器完成了消息收发、协议转换等功能要求, 同时具有良好的抗压能力和鲁棒性.
下一步的工作主要集中在提高服务器的并发处理能力上, 同时进一步提高服务器的性能及鲁棒性.
[1] |
架构说: 阿里中间件技术: 消息中间件篇. http://www.jiagoushuo.com/article/1000141.html. [2016-04-18].
|
[2] |
Apache RocketMQ: An open source distributed messaging and streaming data platform. http://rocketmq.apache.org. [2016-12-29].
|
[3] |
Mosquitto. An open source MQTT v3.1/v3.1.1 broker. http://mosquitto.org/. [2015].
|
[4] |
Kobejayandy. RocketMQ入门. http://blog.csdn.net/kobejayandy/article/details/52831213. [2016-10-16].
|
[5] |
欧志芳. 基于RocketMQ实现异构数据库同步. 网络安全技术与应用, 2016(12): 99-100. DOI:10.3969/j.issn.1009-6833.2016.12.066 |