The background service portion of our client (service, layer0, layer1) originally used a combination of Thrift transports for RPC and ZeroMQ (via NetMQ) for pub/sub. Consequently, we had a ton of ports and connections to manage.

Multiplexing the Thrift clients with TMultiplexedProtocol helped. Our technical director then had the idea of using ZeroMQ itself as the transport for Thrift, in effect multiplexing all our communicating agents over ZeroMQ.
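
For context, TMultiplexedProtocol tags each call with a service name so several generated clients can share one transport. A minimal sketch, assuming an already-open transport and two hypothetical generated services (Calculator and Scanner):

using Thrift.Protocols; // TBinaryProtocol, TMultiplexedProtocol

// transport: any open Thrift transport shared by both clients
var protocol = new TBinaryProtocol(transport);

// Each wrapper stamps its service name onto outgoing calls,
// so a single connection can serve both generated clients.
var calculator = new Calculator.Client(new TMultiplexedProtocol(protocol, "Calculator"));
var scanner = new Scanner.Client(new TMultiplexedProtocol(protocol, "Scanner"));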

We also leverage the Majordomo protocol (from the NetMQ samples) to provide additional service-oriented functionality atop ZeroMQ.
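
Majordomo gives us named services behind a single broker socket: workers register a service name with the broker, and clients address requests to that name. A bare-bones request through the sample MDPClient looks roughly like this (the broker endpoint, client id, and service name are placeholders):

using System.Text;
using MajordomoProtocol; // MDPClient, from the NetMQ samples
using NetMQ;

using (var client = new MDPClient("tcp://localhost:5555", Encoding.UTF8.GetBytes("client-1")))
{
    var request = new NetMQMessage();
    request.Append("ping");

    // Send() blocks until the broker routes a reply back, or returns null on timeout.
    NetMQMessage reply = client.Send("echo", request);
    var payload = reply?.Last.ConvertToString();
}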

Transport

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MajordomoProtocol;
using NetMQ;
using Thrift.Transports.Client;

public class ZeroMqTransport : TStreamClientTransport
{
    public string Channel { get; set; }
    private MDPClient Client { get; set; }

    public ZeroMqTransport(MDPClient client)
    {
        Client = client;

        InputStream = null; // only set the input stream once a reply arrives
        OutputStream = Common.MemoryPool.GetStream(); // outgoing messages are serialized into this
    }

    public override async Task FlushAsync(CancellationToken token)
    {
        await base.FlushAsync(token);

        try
        {
            // Nothing to send if no message was written since the last flush.
            if (!(OutputStream is MemoryStream ostream) || ostream.Length <= 0)
                return;

            InputStream = null;

            // Wrap the serialized Thrift message in a single NetMQ frame.
            var req = new NetMQMessage();
            req.Append(ostream.ToArray());

            // TODO: rep is null if the request times out
            var rep = Client.Send(Channel, req);

            InputStream = rep == null ? null : Common.MemoryPool.GetStream(rep.Last.Buffer);
            OutputStream = Common.MemoryPool.GetStream();
        }
        catch (Exception ex)
        {
            Logging.Logger.Log($"zmq transport flush: {ex.Message}", Logging.LogLevel.Warn);
            Logging.Logger.Log($"zmq transport flush: {ex.Message} \n {ex.StackTrace}", Logging.LogLevel.Debug);
        }
    }
}

We’re overriding FlushAsync(). If you take a look at the code generated by Thrift:

await oprot.WriteMessageBeginAsync(..., cancellationToken);
// Write the message...
await oprot.WriteMessageEndAsync(cancellationToken);
await oprot.Transport.FlushAsync(cancellationToken);

The transport’s FlushAsync() method is called at the end of every message. We use that to handle a complete message:

  1. Construct a NetMQMessage containing the serialized Thrift message
  2. Send it over ZeroMQ with MDPClient.Send()
  3. If there is a reply, point TStreamClientTransport.InputStream at it

Lastly, Common.MemoryPool.GetStream() is a wrapper around Microsoft.IO.RecyclableMemoryStreamManager.GetStream().
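
The wrapper itself isn’t shown in this post, but here’s a minimal sketch of what it might look like, assuming a single shared manager instance:

using System.IO;
using Microsoft.IO;

namespace Common
{
    public static class MemoryPool
    {
        // One shared manager; it pools and reuses the underlying buffers
        // so per-message streams don't churn the large-object heap.
        private static readonly RecyclableMemoryStreamManager Manager =
            new RecyclableMemoryStreamManager();

        public static MemoryStream GetStream() => Manager.GetStream();

        // Returns a pooled stream pre-populated with a copy of buffer.
        public static MemoryStream GetStream(byte[] buffer) =>
            Manager.GetStream(null, buffer, 0, buffer.Length);
    }
}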

Protocol

The protocol isn’t particularly interesting: it inherits its behaviour from TBinaryProtocol and simply wires the MDPClient into a ZeroMqTransport. Here it is for completeness:

public class ZeroMqProtocol : TBinaryProtocol
{
    private MDPClient syncClient = null;
    private ZeroMqTransport zmqTransport = null;

    public ZeroMqProtocol(string brokerAddr, byte[] clientId) : base(null)
    {
        // The MDP client holds the single connection to the broker;
        // the transport just rides on it.
        syncClient = new MDPClient(brokerAddr, clientId);
        zmqTransport = new ZeroMqTransport(syncClient);
        Trans = zmqTransport;
    }

    protected override void Dispose(bool disposing)
    {
        if (zmqTransport != null)
        {
            zmqTransport.Close();
            zmqTransport.Dispose();
            zmqTransport = null;
        }
        if (syncClient != null)
        {
            syncClient.Dispose();
            syncClient = null;
        }
        base.Dispose(disposing);
    }

    // Majordomo service name that subsequent requests are addressed to.
    public string Channel
    {
        set { zmqTransport.Channel = value; }
    }
}
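
With that in place, a generated client rides on the protocol like any other. Roughly what the wiring looks like (the endpoint, channel, and EchoService client below are hypothetical):

using System.Text;
using System.Threading;

var protocol = new ZeroMqProtocol("tcp://localhost:5555", Encoding.UTF8.GetBytes("client-1"))
{
    Channel = "echo" // Majordomo service name to address
};

var client = new EchoService.Client(protocol);
var reply = await client.echoAsync("hello", CancellationToken.None);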

Footnotes

This may or may not be the correct way to do this; it’s one of our earlier pieces of code, written while we were still cutting our teeth on both ZeroMQ and Thrift.

In any case, it greatly simplified our architecture because there’s only one source of connections: ZeroMQ.