
Code Artistry - Message-Passing Communication with Queues

Introduction:

Message-Passing Communication (MPC) establishes a channel over which processes communicate by sending messages. A well-known example is the communication between browsers and web servers, which exchange request and reply messages using the HTTP protocol. The simplest of these MPC systems is the client-server model, where a client sends a request message to a specified server, and the server processes the message and sends back an appropriate reply message. This request/reply sequence constitutes a talk protocol. The client initiates all message exchanges and the server simply responds. This model has the benefit of a very simple message exchange sequence, but the liability that the client must wait for a response.

Channels:

Another style is based on one-way asynchronous message channels between a sender on one peer and a receiver on another, as shown in the diagram below. Each of these "peers" might be located on separate machines or in separate processes on the same machine.
Each peer contains a sender and a receiver package. The sender communicates with one receiver at a time. Each receiver handles concurrent senders by accepting their messages in a receive queue. Often those messages are processed sequentially by a single thread. This means that the receiver and its subsequent processing do not have to be thread-safe, as long as the receive queue is thread-safe.

End Points:

Each receiver has a listener assigned to a specific port. We describe the IP address and port number as an endpoint, which we represent as an attribute pair ip : port. The endpoint is an address for a particular service provided by a peer. Note that the diagram emphasizes that the communication endpoints are identical, even though the processing provided at one endpoint may be quite different from that at another. The communication endpoints are peers, but the machines are not.
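
As a minimal sketch of the ip : port pair described above, the following C++ fragment parses and prints an endpoint. The names Endpoint, parseEndpoint, and toString are assumptions made for illustration; they are not taken from the projects discussed here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical endpoint type: an address (ip) plus a port number.
struct Endpoint {
  std::string ip;
  std::uint16_t port = 0;
};

// Parse "ip:port" text. Returns std::nullopt when the text is malformed
// or the port is outside 1..65535.
std::optional<Endpoint> parseEndpoint(const std::string& text) {
  const std::size_t pos = text.rfind(':');
  if (pos == std::string::npos || pos == 0 || pos + 1 == text.size())
    return std::nullopt;
  const std::string portText = text.substr(pos + 1);
  if (portText.size() > 5) return std::nullopt;
  unsigned long port = 0;
  for (char c : portText) {
    if (c < '0' || c > '9') return std::nullopt;
    port = port * 10 + static_cast<unsigned long>(c - '0');
  }
  if (port == 0 || port > 65535) return std::nullopt;
  return Endpoint{text.substr(0, pos), static_cast<std::uint16_t>(port)};
}

// Format an endpoint back into "ip:port" text.
std::string toString(const Endpoint& ep) {
  return ep.ip + ":" + std::to_string(ep.port);
}

// Usage sketch: round-trip a loopback endpoint.
inline void endpointExample() {
  const auto ep = parseEndpoint("127.0.0.1:8080");
  assert(ep && ep->ip == "127.0.0.1" && ep->port == 8080);
  assert(toString(*ep) == "127.0.0.1:8080");
}
```

Keeping the endpoint as a small value type means the same representation can serve as both the destination and the return address of a message.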

Talk Protocol:

The resulting talk protocol is very simple because messages only flow one way in each channel. A sender can send to an endpoint at any time, and a receiver can handle enqueued messages at any time. This makes for a very flexible and fluid style of communication. The sender does not wait for a response. After sending a message it may send messages to other peers or do other processing. Eventually the receiver of the message may elect to send a reply message back to the sender, but it is not required to do so. The exchange is much like the email threads that we all use.

Messages:

Messages contain a destination address so that the sender can connect to that endpoint. If the receiver will eventually reply, then, since there will be multiple senders, the message also needs a return address. Finally, the message needs to define the requested operation and provide any parameters needed to carry it out.
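
The parts of a message listed above can be sketched as a small C++ struct. This is only an illustration: the field names and the makeReply helper are assumptions, not the message class used in the course projects.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical message type; field names are illustrative.
struct Message {
  std::string to;       // destination endpoint, "ip:port"
  std::string from;     // return address for replies, "ip:port"
  std::string command;  // requested operation
  std::map<std::string, std::string> params;  // parameters for the operation
};

// Build a reply addressed to the sender of the request: the request's
// return address becomes the reply's destination, and vice versa.
Message makeReply(const Message& request, const std::string& command) {
  Message reply;
  reply.to = request.from;
  reply.from = request.to;
  reply.command = command;
  return reply;
}

// Usage sketch: a request and the reply that answers it.
inline void messageExample() {
  const Message request{"localhost:8080", "localhost:8081", "getFiles",
                        {{"path", "."}}};
  const Message reply = makeReply(request, "files");
  assert(reply.to == "localhost:8081");
  assert(reply.from == "localhost:8080");
}
```

Because the return address travels with every message, the receiver needs no other record of who sent it.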

WCF Communication:

If we use a communication framework like Windows Communication Foundation (WCF), then each message will consist of a Simple Object Access Protocol (SOAP) wrapper around a serialized instance of a data class that defines the request, the to and from addresses, and any parameters needed to execute the request. The diagram at the right shows one possible design for peer-to-peer communication using WCF. Each Receiver instantiates a WCF service. The service objects simply enqueue incoming messages for processing by the dequeuing thread. If a reply is appropriate, the message processor builds a reply message, using the return address of the incoming message, and passes it to its sender, usually with a PostMessage(msg) invocation.

Socket Communication:

If we craft our own communication infrastructure with sockets, we're likely to use HTTP-style messages. The HTTP protocol uses messages that contain a header consisting of text lines, where each line is an attribute pair. The message header is terminated with a blank line. If the header contains a content-length:483 attribute, the header is followed by a body containing 483 bytes of data, which need not be text.
Each receiver instantiates a socket listener, running on its own thread. When a connection is established, the listener hands a work item, containing the connected socket and a reference to the blocking queue, to a thread pool. When a thread pool thread becomes available, it begins framing messages by pulling bytes out of the socket, reading lines from the message header, and pulling out the message body. It builds a message instance from the bytes it receives and enqueues it for processing by the Message Processing Thread.
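
The framing step can be sketched without any socket code by working on a buffer of bytes already read from the socket. The sketch below makes several assumptions: header lines end with a single newline rather than HTTP's CR LF, attributes are written as name:value with no surrounding spaces, and the names FramedMessage and tryFrame are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <sstream>
#include <string>

// Hypothetical result of framing: header attributes plus an optional body.
struct FramedMessage {
  std::map<std::string, std::string> attributes;
  std::string body;  // may hold binary data
};

// Try to frame one message from the front of buffer. On success the framed
// bytes are removed from buffer. Returns std::nullopt, leaving buffer
// untouched, when the header or body has not fully arrived yet.
// Note: std::stoul throws if content-length is not numeric.
std::optional<FramedMessage> tryFrame(std::string& buffer) {
  const std::size_t headerEnd = buffer.find("\n\n");  // blank line ends header
  if (headerEnd == std::string::npos) return std::nullopt;

  FramedMessage msg;
  std::istringstream header(buffer.substr(0, headerEnd));
  std::string line;
  while (std::getline(header, line)) {
    const std::size_t colon = line.find(':');
    if (colon == std::string::npos) continue;  // skip lines without name:value
    msg.attributes[line.substr(0, colon)] = line.substr(colon + 1);
  }

  std::size_t bodyLength = 0;
  const auto it = msg.attributes.find("content-length");
  if (it != msg.attributes.end()) bodyLength = std::stoul(it->second);

  const std::size_t bodyStart = headerEnd + 2;
  if (buffer.size() - bodyStart < bodyLength) return std::nullopt;

  msg.body = buffer.substr(bodyStart, bodyLength);
  buffer.erase(0, bodyStart + bodyLength);
  return msg;
}

// Usage sketch: a message whose body arrives in two pieces.
inline void framingExample() {
  std::string buffer = "command:echo\ncontent-length:5\n\nhel";
  assert(!tryFrame(buffer));  // body incomplete, nothing consumed
  buffer += "lo";
  const auto msg = tryFrame(buffer);
  assert(msg && msg->body == "hello" && buffer.empty());
}
```

A client handler thread would append each batch of received bytes to the buffer and call a function like this in a loop, enqueuing every message it returns.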

Message Queues:

Each receiver has a thread-safe blocking queue that is shared by all senders to that endpoint. If we are using WCF, we make the queue a static member of the service class so that every service instance shares the same queue. That is, each sender to that endpoint gets a service instance, and the service simply enqueues messages for the endpoint's processing thread to dequeue and process. This is the design we use for the last of the CSE681 - Software Modeling and Analysis projects. If we are building a socket-based communication system, the socket listener, using threads from a thread pool, provides each dedicated client handler thread with the connected socket and a reference to the shared blocking queue. In CSE687 - Object Oriented Design, we will use this design for some projects.
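
A thread-safe blocking queue of the kind described above can be sketched with a mutex and a condition variable. The class and function names here are assumptions for illustration, not the queue shipped with the course code. The demonstration function starts several sender threads that share one queue and a single processing thread that dequeues every message.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking queue: dequeue() waits until an item is available.
template <typename T>
class BlockingQueue {
public:
  void enqueue(T item) {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      items_.push(std::move(item));
    }
    cv_.notify_one();  // wake one waiting consumer, if any
  }

  T dequeue() {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    T item = std::move(items_.front());
    items_.pop();
    return item;
  }

private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::queue<T> items_;
};

// Several sender threads enqueue into one shared queue while a single
// processing thread dequeues. Returns the number of messages processed.
std::size_t demoSharedQueue(std::size_t senders, std::size_t perSender) {
  BlockingQueue<std::string> queue;
  const std::size_t expected = senders * perSender;
  std::size_t processed = 0;  // touched only by the processing thread

  std::thread processor([&] {
    for (std::size_t i = 0; i < expected; ++i) {
      queue.dequeue();
      ++processed;  // no lock needed: only this thread updates the counter
    }
  });

  std::vector<std::thread> threads;
  for (std::size_t s = 0; s < senders; ++s)
    threads.emplace_back([&queue, s, perSender] {
      for (std::size_t i = 0; i < perSender; ++i)
        queue.enqueue("sender " + std::to_string(s) + " msg " +
                      std::to_string(i));
    });

  for (auto& t : threads) t.join();
  processor.join();
  return processed;
}
```

Only the queue is synchronized. The processing thread's own state needs no locking, which is the property the single-threaded processing design relies on.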

File Transfer:

To send a file between endpoints we could send blocks of bytes from the file in a sequence of messages, but it would be somewhat more efficient to send a beginning message that identifies the file name, length, and block size, followed by a sequence of blocks of bytes, perhaps ending with a terminating message. Note that, while performing a file transfer, the service objects simply write incoming blocks into a file and do not post them to the receive queue. When the transfer is complete, the service object posts a message to the queue so the message processor knows a file has arrived.
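
The second scheme, a beginning message followed by raw blocks, can be sketched as follows. To keep the example self-contained it works on bytes held in memory rather than on a real file and socket, and the receiver accumulates blocks in a string instead of writing them to disk. All names (FileHeader, splitIntoBlocks, FileReceiver) are assumptions for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical contents of the beginning message of a file transfer.
struct FileHeader {
  std::string name;
  std::size_t length = 0;     // total bytes in the file
  std::size_t blockSize = 0;  // bytes per block; the last block may be shorter
};

// Number of blocks the receiver should expect for this header.
std::size_t blockCount(const FileHeader& h) {
  assert(h.blockSize > 0);
  return (h.length + h.blockSize - 1) / h.blockSize;
}

// Sender side: split the file bytes into blocks of at most blockSize bytes.
std::vector<std::string> splitIntoBlocks(const std::string& bytes,
                                         std::size_t blockSize) {
  assert(blockSize > 0);
  std::vector<std::string> blocks;
  for (std::size_t pos = 0; pos < bytes.size(); pos += blockSize)
    blocks.push_back(bytes.substr(pos, blockSize));
  return blocks;
}

// Receiver side: collect blocks until the announced length has arrived.
class FileReceiver {
public:
  explicit FileReceiver(FileHeader header) : header_(std::move(header)) {}

  // Append one block. Returns true once the whole file has been received,
  // which is the point where a "file arrived" message would be posted.
  bool accept(const std::string& block) {
    data_ += block;
    return data_.size() >= header_.length;
  }

  const std::string& data() const { return data_; }

private:
  FileHeader header_;
  std::string data_;
};
```

For example, a 10-byte file sent with a block size of 4 yields blocks of 4, 4, and 2 bytes, and accept returns true only for the third block.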

Channel Structure

Figure 4 shows details of the channel processing. Each peer has a Comm component that contains both a Sender and a Receiver, so the Peers are depicted as mirror images in the diagram. Socket listeners are shown as boxes labeled SL, receiving sockets as boxes labeled S, and sending sockets as boxes labeled SC. These labels correspond to classes in the socket library, e.g., Socket, SocketListener, and SocketConnector. The gray clouds are ClientHandler instances connected to Peers not shown in the diagram. The white clouds are ClientHandler instances used for a two-way conversation between the Peers shown in the diagram.
Remember that a sender does not wait for a response. It simply sends a message describing an activity to be conducted by the receiver, or information needed by the receiver, along with the destination address and its own return address for replies. Should the receiver need to reply, it simply sends a message to the endpoint specified by the return address of the message it received. This asynchronous message-passing communication is very fluid. It establishes conversations with its peers, much like email exchanges. Of course, this flexibility is bought at the expense of sockets and threads at each end, but it is very easy to set up and use.

Message Structure

Messages are similar to those used for HTTP communication. We add "to" and "from" attributes and frequently use custom attributes to provide information needed by the receiver. Our messages start with a command line, but that line often differs from the HTTP command lines holding GET, POST, PUT, DELETE, etc. When sending binary data we usually use chunking. That may entail sending one chunk in each of a sequence of messages, or sending one initiating message that describes the length of the binary body, followed by a sequence of binary blocks until the specified content-length has been sent. Any given application would use one or the other.
There is only one type of message used in our channels, as this message type accommodates an open-ended number of attributes, allowing us to send whatever information and commands are required by the other end. What we have done is to push all of the application-specific details out of the communication channel and into messages that are composed by application senders and interpreted by application receivers. This encourages a very flexible "duck-typing" style of design concentrated in message handling, while keeping strict typing for all of the non-message communication parts. Essentially, we get the best of both worlds - very mutable message contents, supporting whatever the application needs, but strict control of the channel typing, which eliminates type errors and failures to connect components according to their interfaces.
When messages are received, they are dispatched to methods that are appropriate for handling that specific message. We provide each message with a key - the command (see Figure 5) - that is used to look up a functor or lambda to handle processing of that message. When we want to add new functionality to a server, we just design a new message format and a lambda to handle that message. Nothing else changes, so we get virtually no breakage when responding to new requirements.
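
The command-keyed dispatch described above can be sketched with a map from command strings to std::function handlers. This is a minimal illustration under assumed names (Message, Dispatcher, registerHandler, dispatch); the course code may organize its dispatcher differently.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical message: a command key plus open-ended attributes.
struct Message {
  std::string command;
  std::map<std::string, std::string> attributes;
};

// Looks up a handler by the message's command and invokes it.
class Dispatcher {
public:
  using Handler = std::function<std::string(const Message&)>;

  void registerHandler(const std::string& command, Handler handler) {
    handlers_[command] = std::move(handler);
  }

  // Runs the handler registered for msg.command and stores its result.
  // Returns false, leaving result unchanged, if no handler is registered.
  bool dispatch(const Message& msg, std::string& result) const {
    const auto it = handlers_.find(msg.command);
    if (it == handlers_.end()) return false;
    result = it->second(msg);
    return true;
  }

private:
  std::map<std::string, Handler> handlers_;
};

// Usage sketch: adding a feature means registering one more lambda.
inline std::string dispatchExample() {
  Dispatcher dispatcher;
  dispatcher.registerHandler("echo", [](const Message& m) -> std::string {
    const auto it = m.attributes.find("text");
    return it == m.attributes.end() ? std::string() : it->second;
  });
  std::string result;
  const bool handled =
      dispatcher.dispatch(Message{"echo", {{"text", "hello"}}}, result);
  assert(handled);
  return result;
}
```

The channel code never inspects the attributes. Only the registered lambdas know what a given command expects, which is what keeps new message kinds from disturbing existing code.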

Message-Passing Comm Classes

Each Peer contains a Comm instance. The Comm class composes a Sender and a Receiver instance, both of which use a thread-safe blocking queue and a socket library with a base Socket class and derived SocketListener and SocketConnecter classes. Each Receiver contains an instance of SocketListener, running on its own thread. When a connection is made, the SocketListener instance creates a ClientHandler thread, passing it a Socket instance connected to the sender. The ClientHandler is responsible for framing messages drawn from its Socket instance and enqueuing them for server processing. Each Sender instance has a send thread that inspects outgoing messages to get the destination address, and either attempts to connect to that address or uses an already open connection. Send and ClientHandler threads also collaborate to handle transmission of file chunks.

Server Threading

As shown in Figure 4, a server dequeues and processes messages with a single thread. Multiple clients are still served concurrently, because each ClientHandler thread frames messages and/or receives files for its own client. This style of processing is often referred to as a Single Threaded Apartment (STA), a term that was coined to describe one type of processing in Microsoft COM components. Virtually all Graphical User Interfaces, Windows message handling, and JavaScript (and Node.js) execution use this form of processing. The advantage is that there won't be any thread conflicts when servers access a data store or open files, because only one thread processes all messages. Should we decide that, for performance reasons, we need to use multi-threading for message processing, the server can always spawn new threads and lock any shared resources.

Code Resources

You will find all the C++ code for a demonstration of Asynchronous Message-Passing in CppCommWithFileXfer. This was written using Visual Studio 2017 and its Visual C++ compiler and standard libraries. The Socket Library is also included with that project. You will also find C# code that implements an Asynchronous Message-Passing Communication channel in CsMessagePassingComm. That channel uses the .NET Windows Communication Foundation (WCF) framework instead of directly handling sockets. Note that you must run Visual Studio in administrator mode for this project to run successfully. That is a policy set by WCF. Note that Figures 1, 3, 4, and 6 are specific to the C++ channel. Figure 2 is specific to the C# WCF channel, and Figure 5 applies to both. The C# channel processing is similar to that shown in Figure 4, but some of the details are different, e.g., we use WCF instead of sockets.