ZeroMQ

Description

ZeroMQ (aka ZMQ or 0MQ) is a great library that provides “super sockets”.

These sockets are actually intelligent wrappers around normal TCP or UDP sockets. The most noticeable feature provided by this library is that you can connect a socket to some TCP port even if nothing is listening on the other end yet! Connection and disconnection are handled transparently: you can interconnect several components, remove one of them entirely, then bring it back, and everything keeps working. In other words, bind() does not have to be done before connect().
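The connect-before-bind behaviour can be sketched as follows. This is only an illustration under a few assumptions: it uses pyzmq, a PUSH/PULL pair, and a free local port picked by the hypothetical helper free_tcp_port(); none of these names come from the text above.

```python
import socket
import zmq


def free_tcp_port():
    """Hypothetical helper: ask the OS for a currently unused local TCP port."""
    with socket.socket() as probe:
        probe.bind(("127.0.0.1", 0))
        return probe.getsockname()[1]


def connect_before_bind():
    ctx = zmq.Context()
    endpoint = "tcp://127.0.0.1:%d" % free_tcp_port()
    push = ctx.socket(zmq.PUSH)
    pull = ctx.socket(zmq.PULL)
    try:
        # Nothing is listening on the endpoint yet; connect() still succeeds.
        push.connect(endpoint)
        # The message is queued inside ZMQ until a peer shows up.
        push.send(b"hello")
        # Only now does the other side start listening.
        pull.bind(endpoint)
        # Safety net so the sketch cannot hang forever.
        pull.setsockopt(zmq.RCVTIMEO, 5000)
        return pull.recv()
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(connect_before_bind())
```

The receive may take a fraction of a second, since the connecting side retries in the background until the bind has happened.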

What you transmit on the sockets are actually messages, and not just a stream of bytes. When you read on a ZMQ socket, you know that you get a whole message.
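As a small illustration of that message orientation, the sketch below sends several payloads and reads them back one recv() at a time. It assumes pyzmq and a PAIR socket pair over a made-up in-process endpoint name; each recv() returns exactly one of the messages that were sent, never a fragment or a concatenation.

```python
import zmq


def send_and_receive(payloads):
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PAIR)
    sender = ctx.socket(zmq.PAIR)
    try:
        # "inproc://framing-demo" is a hypothetical endpoint name.
        receiver.bind("inproc://framing-demo")
        sender.connect("inproc://framing-demo")
        # Safety net so the sketch cannot hang forever.
        receiver.setsockopt(zmq.RCVTIMEO, 2000)
        for payload in payloads:
            sender.send(payload)
        # One recv() per message that was sent.
        return [receiver.recv() for _ in payloads]
    finally:
        sender.close(linger=0)
        receiver.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(send_and_receive([b"first", b"a much longer second message"]))
```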

Also, many types of sockets are provided, so that you can lay out your network architecture in a natural way. For example you can create one socket of type PUB and any number of SUB sockets, and all data sent on the PUB socket will be received by every connected and subscribed SUB socket (none, one, or hundreds of them). Everything is done transparently behind the scenes inside the ZeroMQ library. Besides TCP, you also have in-process (inproc) and inter-process (ipc) transports, etc.
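A minimal PUB/SUB fan-out might look like the sketch below. It assumes pyzmq and a made-up in-process endpoint name. Because a PUB socket silently drops messages for subscribers whose subscription has not reached it yet, the sketch simply republishes until every subscriber has seen one message; that retry loop is an assumption of this example, not something the library requires.

```python
import time
import zmq


def fan_out(n_subscribers=3):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    # "inproc://fanout-demo" is a hypothetical endpoint name.
    pub.bind("inproc://fanout-demo")
    subs = []
    for _ in range(n_subscribers):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive everything
        sub.setsockopt(zmq.RCVTIMEO, 100)   # short wait per attempt
        sub.connect("inproc://fanout-demo")
        subs.append(sub)

    received = [None] * n_subscribers
    deadline = time.monotonic() + 5
    try:
        # Republish until each subscriber got one copy (or we give up).
        while None in received and time.monotonic() < deadline:
            pub.send(b"news")
            for i, sub in enumerate(subs):
                if received[i] is None:
                    try:
                        received[i] = sub.recv()
                    except zmq.Again:
                        pass  # this subscriber was not ready yet
        return received
    finally:
        for sub in subs:
            sub.close(linger=0)
        pub.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(fan_out())
```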

Of course, these sockets are an additional layer above TCP. You cannot connect a ZMQ socket to a plain TCP socket and expect everything to work. For the same reason, ZMQ provides its own zmq_poll() function that acts like the standard poll(), but for ZMQ sockets.
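In pyzmq, zmq_poll() is exposed through the zmq.Poller class. The sketch below, which assumes a PAIR socket pair over a made-up in-process endpoint, waits for a message with it before calling recv().

```python
import zmq


def wait_for_message(timeout_ms=1000):
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PAIR)
    sender = ctx.socket(zmq.PAIR)
    try:
        # "inproc://zpoll-demo" is a hypothetical endpoint name.
        receiver.bind("inproc://zpoll-demo")
        sender.connect("inproc://zpoll-demo")

        poller = zmq.Poller()
        poller.register(receiver, zmq.POLLIN)

        sender.send(b"ping")

        # poll() returns (socket, event mask) pairs; the timeout is in ms.
        events = dict(poller.poll(timeout_ms))
        if events.get(receiver, 0) & zmq.POLLIN:
            return receiver.recv()
        return None
    finally:
        sender.close(linger=0)
        receiver.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(wait_for_message())
```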

ZMQ inside a standard poll loop

I love events, timeouts, and polling sockets or file descriptors, so I naturally wanted to poll ZMQ sockets along with my other, more standard objects. Fortunately, there is a simple way to wait for events on a ZMQ socket with poll(), epoll() or any other poller, without using the ZMQ library's own poller inside that polling loop.

I’ll give examples in Python because it is much less verbose and easier to read than C, but everything is doable in any language that has ZMQ bindings.

import zmq
import select

# Create your ZMQ socket
ctx = zmq.Context()
zsocket = ctx.socket(zmq.PULL)

# Create a “normal” poller
poller = select.poll()

# Now you need to get the file descriptor internally
# used by zmq
fd = zsocket.getsockopt(zmq.FD)

# and then use that directly inside your poller
poller.register(fd, select.POLLIN)

poller.poll()

At this point, you have your normal poll() call that will notify you whenever there’s something to read on the TCP socket used by the ZMQ socket. Remember that a ZMQ socket is NOT a TCP socket used in a special way, but some interface around a TCP socket. Also remember that what you recv() from a ZMQ socket is a whole message. This means that you may have data available to read on the TCP socket (poll() returns to inform you of that fact), but calling zsocket.recv() would block because you do not have a full message yet. The trick here is to check for data on the ZMQ socket each time you have data on the TCP socket to read. This is done like this:

def on_zread(zsocket):
    """
    Called whenever poll reports some data to read
    on the internal fd.
    """
    while zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        message = zsocket.recv()
        ...
        # Do something with the data.

By getting the value of zmq.EVENTS for your ZMQ socket, you’ll know if you can read a message or not.

You may now ask: “but what if some data is available on the TCP socket and the rest of the ZMQ message never arrives? Won’t poll() return immediately every time, making the program burn the entire CPU in a busy wait?” The answer is no. The trick is that calling zsocket.getsockopt(zmq.EVENTS) actually makes the ZMQ socket consume whatever is pending and keep it internally (in its own buffer or what not). This means that after the on_zread() call, poll() won’t report that there is anything to read until some new data arrives (possibly completing the ZMQ message), which avoids the busy wait you might have expected. (Strictly speaking, the ZMQ documentation only promises that the zmq.FD descriptor signals pending events in an edge-triggered fashion; it need not be the TCP socket itself, but the consequences for the poll loop are the same.)

One other important point is to use while instead of if. Imagine that you receive two whole ZMQ messages. poll() will return and you will call on_zread(). With a simple if zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN, you will consume all the data available on the internal descriptor, but zsocket.recv() will retrieve only one of the two available messages. The next call to poll() will block because everything on the fd it is polling has been consumed. The message you missed is still inside the ZMQ socket, but you didn’t fetch it, and you will not get it until some new data is received. When you use a standard poller with a ZMQ socket, it is a mistake to consume all the data on the descriptor but leave an unread message inside the ZMQ socket. That’s why you need to consume all available messages in one on_zread() call.
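Putting the pieces together, a complete loop could look like the sketch below. It is only an illustration under several assumptions: pyzmq, a platform where select.poll() exists, a PUSH socket in a background thread standing in for the remote peer, and a made-up in-process endpoint name. The helper names drain(), sender() and collect() are hypothetical. Note that it drains the socket once before the first poll(), so that messages which arrived before the loop started are not left waiting.

```python
import select
import threading
import zmq

ENDPOINT = "inproc://poll-demo"  # hypothetical endpoint name


def drain(zsocket):
    """Read every complete message currently available (the 'while' rule)."""
    messages = []
    while zsocket.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        messages.append(zsocket.recv(zmq.NOBLOCK))
    return messages


def sender(ctx, payloads):
    """Stand-in for a remote peer; owns its socket inside this thread."""
    out = ctx.socket(zmq.PUSH)
    out.connect(ENDPOINT)
    for payload in payloads:
        out.send(payload)
    out.close()


def collect(payloads, timeout_ms=2000):
    ctx = zmq.Context()
    zsocket = ctx.socket(zmq.PULL)
    zsocket.bind(ENDPOINT)

    # A standard poller watching the descriptor exposed by ZMQ.
    poller = select.poll()
    poller.register(zsocket.getsockopt(zmq.FD), select.POLLIN)

    worker = threading.Thread(target=sender, args=(ctx, payloads))
    worker.start()
    try:
        # Check once before polling, then after every wake-up.
        received = drain(zsocket)
        while len(received) < len(payloads):
            if not poller.poll(timeout_ms):
                break  # timed out; return what we have
            # A wake-up does not guarantee a full message: drain() may
            # legitimately return an empty list.
            received.extend(drain(zsocket))
        return received
    finally:
        worker.join()
        zsocket.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(collect([b"one", b"two", b"three"]))
```

In a real program the same poller would also watch the other file descriptors, and drain() would be called from whatever callback handles the ZMQ descriptor.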

This design (polling anything, including ZMQ sockets, with a standard poll/epoll/select/kqueue) is one of the ideas behind slixmpp.