Description
ZeroMQ (aka ZMQ or 0MQ) is a great library that provides “super sockets”.
These sockets are actually intelligent wrappers around normal TCP or
UDP sockets. For example, the most noticeable feature provided by this
library is that you can connect a socket to some TCP endpoint even if
nothing is listening on the other end yet! Connection and disconnection
are done transparently: you can interconnect some components, totally
remove one of them, then reconnect it, and everything keeps working. That
means that bind() does not have to be done before connect().
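A minimal sketch of connect-before-bind with pyzmq, assuming a PUSH/PULL pair on the loopback interface. The helper free_tcp_port() is hypothetical: it picks a port by briefly binding a plain socket, which is slightly racy but fine for a demo. The PUSH socket connects and sends before anything listens; the message waits in its queue and is delivered once the PULL socket binds.

```python
import socket

import zmq


def free_tcp_port() -> int:
    # Hypothetical helper: ask the OS for an unused port, then release it
    # so that ZMQ can bind it later.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


ctx = zmq.Context()
endpoint = f"tcp://127.0.0.1:{free_tcp_port()}"

# connect() first: nothing is listening on this port yet.
sender = ctx.socket(zmq.PUSH)
sender.connect(endpoint)
sender.send(b"sent before anyone was listening")  # queued, does not fail

# bind() afterwards: ZMQ retries the connection in the background and
# delivers the queued message once the listener exists.
receiver = ctx.socket(zmq.PULL)
receiver.bind(endpoint)
receiver.setsockopt(zmq.RCVTIMEO, 5000)  # raise zmq.Again instead of hanging
message = receiver.recv()
print(message)

sender.close(linger=0)
receiver.close(linger=0)
ctx.term()
```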
What you transmit on the sockets are actually messages, not just a stream of data. When you read from a ZMQ socket, you know that you get a whole message.
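To illustrate message framing, here is a small sketch assuming pyzmq and a PAIR socket pair over loopback TCP (bind_to_random_port() is a pyzmq helper). A one-megabyte payload comes out of recv() in one piece, and a multipart message comes out of recv_multipart() with all of its frames at once.

```python
import zmq

ctx = zmq.Context()
receiver = ctx.socket(zmq.PAIR)
port = receiver.bind_to_random_port("tcp://127.0.0.1")
receiver.setsockopt(zmq.RCVTIMEO, 5000)  # raise zmq.Again instead of hanging

sender = ctx.socket(zmq.PAIR)
sender.connect(f"tcp://127.0.0.1:{port}")

payload = b"x" * 1_000_000                   # much larger than one TCP segment
sender.send(payload)
sender.send_multipart([b"header", b"body"])  # two frames, one message

whole = receiver.recv()             # always the full payload, never a fragment
frames = receiver.recv_multipart()  # every frame of the message together

print(len(whole), frames)  # 1000000 [b'header', b'body']

sender.close(linger=0)
receiver.close(linger=0)
ctx.term()
```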
Also, many types of sockets are provided, to let you architect your network in a natural way. For example, you can create a socket of type PUB and any number of SUB sockets, and all data sent on the PUB socket will be received by every connected SUB socket that subscribed to it (none, one, or hundreds of them). Everything is done transparently behind the scenes inside the ZeroMQ library. You also get in-process (inproc) and inter-process (IPC) transports, etc.
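A sketch of PUB/SUB fan-out, again assuming pyzmq and loopback TCP, with three subscribers chosen arbitrarily. Subscriptions reach the publisher asynchronously, so the sketch keeps publishing until every SUB socket holds a copy; without that loop, the first messages may be silently dropped.

```python
import time

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
port = pub.bind_to_random_port("tcp://127.0.0.1")

subs = []
for _ in range(3):
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: accept every message
    sub.connect(f"tcp://127.0.0.1:{port}")
    subs.append(sub)

# PUB drops messages for peers whose subscription has not arrived yet
# ("slow joiner"), so publish repeatedly until each SUB has received one.
received = [None] * len(subs)
for _ in range(100):
    pub.send(b"hello subscribers")
    time.sleep(0.05)
    for i, sub in enumerate(subs):
        if received[i] is None:
            try:
                received[i] = sub.recv(zmq.NOBLOCK)
            except zmq.Again:
                pass  # this subscriber has nothing yet
    if all(r is not None for r in received):
        break

print(received)

pub.close(linger=0)
for sub in subs:
    sub.close(linger=0)
ctx.term()
```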
Of course, these sockets are an additional layer above TCP. You cannot
connect a ZMQ socket to a TCP socket and expect everything to work. ZMQ
provides its own zmq_poll() function that acts like the standard poll(),
but for ZMQ sockets.
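pyzmq exposes zmq_poll() through zmq.Poller. As a sketch, assuming pyzmq on a POSIX system, the same poller can watch a ZMQ socket and an ordinary file descriptor (here the read end of an os.pipe()), since zmq_poll() also accepts plain fds.

```python
import os

import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
port = pull.bind_to_random_port("tcp://127.0.0.1")
push = ctx.socket(zmq.PUSH)
push.connect(f"tcp://127.0.0.1:{port}")

read_fd, write_fd = os.pipe()  # an ordinary, non-ZMQ file descriptor

poller = zmq.Poller()  # pyzmq's wrapper around zmq_poll()
poller.register(pull, zmq.POLLIN)
poller.register(read_fd, zmq.POLLIN)

push.send(b"from zmq")
os.write(write_fd, b"from pipe")

seen = []
while len(seen) < 2:
    events = poller.poll(timeout=5000)  # milliseconds
    if not events:
        raise TimeoutError("nothing arrived within 5 s")
    for obj, _event in events:
        if obj is pull:
            seen.append(pull.recv())  # POLLIN here means a whole message
        elif obj == read_fd:
            seen.append(os.read(read_fd, 100))

print(sorted(seen))  # [b'from pipe', b'from zmq']

os.close(read_fd)
os.close(write_fd)
push.close(linger=0)
pull.close(linger=0)
ctx.term()
```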
ZMQ inside a standard poll loop
I love events, timeouts, and polling sockets or file descriptors, so I
naturally need to poll ZMQ sockets along with my other, more standard
objects. Fortunately, there is a simple way to wait for events on a ZMQ
socket with poll(), epoll() or any other poller, without having to hand
the polling loop over to zmq_poll().
I’ll give examples in Python because it is much less verbose and easier to read than C, but everything is doable in any language that has ZMQ bindings.
import zmq
import select
# Create your ZMQ socket
ctx = zmq.Context()
zsocket = ctx.socket(zmq.PULL)
# Create a “normal” poller
poller = select.poll()
# Now you need to get the file descriptor internally
# used by zmq
fd = zsocket.getsockopt(zmq.FD)
# and then use that directly inside your poller
poller.register(fd, select.POLLIN)
poller.poll()
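The same trick works with other pollers. Below is a sketch using Python's selectors module (epoll on Linux, kqueue on BSD/macOS) that dispatches to one handler per fd. It assumes pyzmq on a POSIX system, a PUSH socket on loopback as the message source, and a pipe as the “other” fd; the handler names are hypothetical. The ZMQ handler keeps reading while zmq.EVENTS reports POLLIN, and it is also called once before the first select(), because messages may already be queued before the fd ever becomes readable.

```python
import os
import selectors
import time

import zmq

ctx = zmq.Context()
zsocket = ctx.socket(zmq.PULL)
port = zsocket.bind_to_random_port("tcp://127.0.0.1")
pusher = ctx.socket(zmq.PUSH)
pusher.connect(f"tcp://127.0.0.1:{port}")

pipe_r, pipe_w = os.pipe()  # stands in for any other, non-ZMQ fd
received = []


def on_zread():
    # Hypothetical handler: fetch every complete message that is ready.
    while zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        received.append(zsocket.recv())


def on_pipe():
    # Hypothetical handler for the plain fd.
    received.append(os.read(pipe_r, 100))


sel = selectors.DefaultSelector()
sel.register(zsocket.getsockopt(zmq.FD), selectors.EVENT_READ, on_zread)
sel.register(pipe_r, selectors.EVENT_READ, on_pipe)

for i in range(3):
    pusher.send(b"msg %d" % i)
os.write(pipe_w, b"pipe data")

on_zread()  # check once up front: messages may already be waiting
deadline = time.monotonic() + 5
while len(received) < 4 and time.monotonic() < deadline:
    for key, _mask in sel.select(timeout=0.5):
        key.data()  # call the handler stored at register() time

print(sorted(received))

sel.close()
os.close(pipe_r)
os.close(pipe_w)
pusher.close(linger=0)
zsocket.close(linger=0)
ctx.term()
```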
At this point, you have your normal poll() call that will notify you
whenever something happens underneath the ZMQ socket. Note that the fd
returned by zmq.FD is not the TCP socket itself: it is an internal
notification descriptor that libzmq makes readable when there is activity
on the ZMQ socket (such as data arriving on the underlying TCP
connection). Remember that a ZMQ socket is NOT a TCP socket used in a
special way, but some interface around a TCP socket. Also remember that
what you recv() from a ZMQ socket is a whole message. This means that the
fd may be readable (poll() returns to inform you of that fact), but
calling zsocket.recv() would block because you do not have a full message
yet. The trick here is to check for a message on the ZMQ socket each time
poll() reports the fd as readable. This is done like this:
def on_zread(zsocket):
    """
    Called whenever poll reports some data to read
    on the internal fd.
    """
    # Keep reading as long as ZMQ says a whole message is ready.
    while zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        message = zsocket.recv()
        # Do something with the message.
        ...
By getting the value of zmq.EVENTS for your ZMQ socket, you’ll know if you can read a message or not.
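A small sketch, assuming pyzmq and loopback TCP, showing the POLLIN bit of zmq.EVENTS change as a message arrives and is read. The readable() helper is hypothetical, and the sleep loop only keeps the demo short; a real program would wait in poll() on the fd as described above.

```python
import time

import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
port = pull.bind_to_random_port("tcp://127.0.0.1")
push = ctx.socket(zmq.PUSH)
push.connect(f"tcp://127.0.0.1:{port}")


def readable(sock):
    # zmq.EVENTS is a bitmask; POLLIN means recv() will not block.
    return bool(sock.getsockopt(zmq.EVENTS) & zmq.POLLIN)


before = readable(pull)  # nothing has been sent yet

push.send(b"ping")
deadline = time.monotonic() + 5
while not readable(pull) and time.monotonic() < deadline:
    time.sleep(0.01)  # demo only: a real loop would block in poll()
during = readable(pull)

msg = pull.recv()  # does not block: EVENTS said a message is ready
after = readable(pull)

print(before, during, msg, after)  # False True b'ping' False

push.close(linger=0)
pull.close(linger=0)
ctx.term()
```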
You may now ask “but what if some data is available on the TCP socket but
the whole ZMQ message never entirely arrives, won’t poll() return
immediately every time, causing your program to take the entire CPU with
an active wait?”. And the answer is no: the trick is that calling
zsocket.getsockopt(zmq.EVENTS) will actually make the ZMQ socket process
whatever is pending on its internal fd and reset it, while any partial
message stays buffered inside ZMQ. This means that after the on_zread()
call, poll() won’t report the fd as readable (until some new data
arrives, eventually completing the ZMQ message), avoiding the active wait
that you would have expected.
One other important point is to use while instead of if. Imagine that
you receive two whole ZMQ messages. poll() will return and you will call
on_zread(). With a simple if zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN,
the EVENTS check clears the readable state of the fd, but zsocket.recv()
will retrieve only one of the two available messages. The next call to
poll() will block because the fd it is polling has nothing left to
report. The message you missed is actually still inside the ZMQ socket,
but you didn’t fetch it, and you will not get it until some new data
arrives. When you use a standard poller with a ZMQ socket, it’s actually
bad to leave an unread message inside the ZMQ socket once the fd has been
cleared. That’s why you need to consume all available messages in one
on_zread() call.
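The difference between if and while can be reproduced without ZMQ at all. The sketch below is a toy model, not libzmq: ToyZmqSocket (hypothetical) queues whole messages and raises a readable flag when new data arrives, and, like checking zmq.EVENTS, clears that flag whenever its state is checked. Driven by the same poll-style loop, the if version leaves the second message stranded while the while version fetches both.

```python
from collections import deque


class ToyZmqSocket:
    """Hypothetical stand-in, NOT libzmq: models an edge-triggered fd."""

    def __init__(self):
        self.queue = deque()      # complete messages held inside "ZMQ"
        self.fd_readable = False  # what poll() would report on zmq.FD

    def deliver(self, *messages):
        # New data arrives: messages are queued and the fd signals once.
        self.queue.extend(messages)
        self.fd_readable = True

    def events_pollin(self):
        # Like getsockopt(zmq.EVENTS): checking clears the fd's signal.
        self.fd_readable = False
        return bool(self.queue)

    def recv(self):
        return self.queue.popleft()


def on_zread_if(sock, out):
    if sock.events_pollin():
        out.append(sock.recv())


def on_zread_while(sock, out):
    while sock.events_pollin():
        out.append(sock.recv())


def run(handler):
    sock, out = ToyZmqSocket(), []
    sock.deliver(b"first", b"second")  # two whole messages in one burst
    while sock.fd_readable:            # stand-in for the poll() loop
        handler(sock, out)
    return out, len(sock.queue)        # (fetched, still stranded)


print(run(on_zread_if))     # ([b'first'], 1)
print(run(on_zread_while))  # ([b'first', b'second'], 0)
```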
This design (polling anything, including ZMQ sockets, with a standard poll/epoll/select/kqueue) is one of the ideas behind slixmpp.