异步消息队列zeromq实现服务器间高性能通信
ZeroMQ 是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于会话层之上,应用层之下,有了它,你甚至不需要自己写一行的socket函数调用就能完成复杂的网络通信工作。
一、三种基本模式
zeromq 有多种模式,常用的有三种:请求应答模式、订阅发布模式、push pull模式 。
1. 请求应答模式(req 和 rep)
消息双向的,有来有往,req端请求的消息,rep端必须答复给req端
2. 订阅发布模式 (sub 和 pub)
消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。
3. push pull模式
消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.
后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。
4.阻塞 和 非阻塞
以上三种基本模式都支持阻塞模式和非阻塞模式。req 和 rep的阻塞模式是这样的(其实跟原生的socket实现也非常像)。大家用过socket的,客户端要是先启动的话,会连接失败,或者是短时间内有超时问题。
二、示例代码
如果使用ActiveMQ/RabbitMQ之类的有代理MQ系统,只要保证MQ代理最先启动, 就可以保证系统的正常运行。而对于无代理的ZeroMQ来说,似乎比较难办。 在刚刚开始使用ZeroMQ时,我也一直担心这个问题,总是小心翼翼地首先启动调 用bind指令的程序,然后启动执行connect指令的程序。这样其实只是利用了 ZeroMQ的高速数据传输能力,以及ZeroMQ对IPC和socket的良好封装特性,还是 没有解决进程启动顺序的问题。后来,偶然实验了一下,发现bind程序和 connect程序无论谁先启动,其实都不影响整个系统的正常运行。
服务端
1import zmq
2context = zmq.Context()
3# Socket to talk to server
4print "Connecting to hello world server…"
5socket = context.socket(zmq.REQ)
6socket.connect ("tcp://localhost:5555")
7# Do 10 requests, waiting each time for a response
8for request in range (10):
9 print "Sending request ", request,"…"
10 socket.send ("Hello")
11 # Get the reply.
12 message = socket.recv()
13 print "Received reply ", request, "[", message, "]"
客户端
1import zmq
2import time
3context = zmq.Context()
4socket = context.socket(zmq.REP)
5socket.bind("tcp://*:5555")
6while True:
7 # Wait for next request from client
8 message = socket.recv()
9 print "Received request: ", message
10 # Do some 'work'
11 time.sleep (1) # Do some 'work'
12 # Send reply back to client
13 socket.send("World")
可以看出发布者绑定绑定一个端口,订阅者通过连接发布者接受订阅的消息。这里的Publish-Subscribe模型是一个很典型的PUB-SUB模型,即发布者(Publisher)只能发送数据,它发送时指明发送数据的类型,而订阅者(Subscriber)则只接收它关心的类型的消息。
1. pub/sub模式下,sub事实上可以连接多个pub,每次只连接一个connect,所以接收到的消息可以是叫错的,以至于不会单个pub掩盖了其他pub
2. 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
3. 如果你采用tcp的连接方式,sub很慢,消息将会堆积在pub,后期会对该问题有个较好的解决
4. 目前的而版本,过滤发生在sub端,而不是pub端,意思就是说一个pub会发送所有的消息到所有的sub, 由sub决定是要drop这个msg.
zeromq是lib库,部署完成后可自行编写server和client,编译时指定-lzmq即可
为题提高性能 可以用gevent框架:
1import gevent
2from gevent_zeromq import zmq
3# Global Context
4context = zmq.Context() #它是GreenContext的一个简写,确保greenlet化socket
5def server():
6 server_socket = context.socket(zmq.REQ) #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等
7 server_socket.bind("tcp://127.0.0.1:5000") #绑定socket
8 for request in range(1,10):
9 server_socket.send("Hello")
10 print('Switched to Server for ', request)
11 server_socket.recv() #这里发生上下文切换
12def client():
13 client_socket = context.socket(zmq.REP) (客户端是回复)
14 client_socket.connect("tcp://127.0.0.1:5000") #连接server的socket端口
15 for request in range(1,10):
16 client_socket.recv()
17 print('Switched to Client for ', request)
18 client_socket.send("World")
19 publisher = gevent.spawn(server)
20 client = gevent.spawn(client)
21 gevent.joinall([publisher, client])
捐赠本站(Donate)
如您感觉文章有用,可扫码捐赠本站!(If the article useful, you can scan the QR code to donate))
- Author: shisekong
- Link: https://blog.361way.com/async-zeromq-communication/4288.html
- License: This work is under a 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议. Kindly fulfill the requirements of the aforementioned License when adapting or creating a derivative of this work.