Code Artistry - Message-Passing Communication with Queues
Introduction:
Message-Passing Communication (MPC) establishes a channel between processes that communicate by sending messages. A
well-known example is the communication between browsers and web servers, which exchange request and reply messages
using HTTP.
The simplest of these MPC systems is the client-server model, where a client sends a request message to a specified
server, and the server processes the message and sends back an appropriate reply message. This request / reply sequence
constitutes a talk protocol. The client initiates all message exchanges and the server simply responds. The benefit is
a very simple message exchange sequence; the liability is that the client must wait for each response.
Channels:
Another style is based on one-way asynchronous message channels between a sender on one peer and a receiver on another, as shown
in the diagram below. Each of these "peers" might be located on separate machines or in separate processes on the same
machine.
Each peer contains a sender and receiver package. The sender communicates with one receiver at a time. Each receiver
handles concurrent senders by accepting their messages in a receive queue. Often those messages are processed sequentially
by a single thread. This means that the receiver's message processing does not have to be thread-safe, as long as
the receive queue is thread-safe.
End Points:
Each receiver has a listener assigned to a specific port. We call the combination of IP address and port number an endpoint,
which we represent as an attribute pair ip : port. The endpoint is an address for a particular service provided by a peer.
Note that the diagram emphasizes that the communication endpoints are identical, even though the processing that is provided
at a particular endpoint may be quite different from that at another endpoint. The communication endpoints are peers, but
the machines are not.
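As a minimal sketch of the ip : port idea, the following C++ fragment models an endpoint and parses it from text. The names EndPoint and parseEndPoint are hypothetical and are not taken from the course code; the sketch also assumes IPv4 addresses or host names, since IPv6 literals contain colons.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical endpoint type: an IP address (or host name) plus a port number.
struct EndPoint {
  std::string address;
  std::uint16_t port = 0;

  // Render the endpoint in "address:port" form.
  std::string toString() const { return address + ":" + std::to_string(port); }
};

// Parse "address:port". Returns std::nullopt when the text is malformed.
// Assumption: IPv4 addresses or host names only.
inline std::optional<EndPoint> parseEndPoint(const std::string& text) {
  const auto pos = text.rfind(':');
  if (pos == std::string::npos || pos == 0 || pos + 1 == text.size())
    return std::nullopt;
  const std::string portText = text.substr(pos + 1);
  if (portText.size() > 5) return std::nullopt;
  unsigned long port = 0;
  for (char ch : portText) {
    if (ch < '0' || ch > '9') return std::nullopt;  // digits only
    port = port * 10 + static_cast<unsigned long>(ch - '0');
  }
  if (port == 0 || port > 65535) return std::nullopt;
  return EndPoint{text.substr(0, pos), static_cast<std::uint16_t>(port)};
}
```

A caller might write parseEndPoint("localhost:8080") and check the returned optional before attempting to connect.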
Talk Protocol:
The resulting talk protocol is very simple because messages only flow one way in each channel. A sender can send to an
endpoint at any time, and a receiver can handle enqueued messages at any time. This makes a very flexible and fluid style
of communication. The sender does not wait for a response. After sending a message it may send messages to other peers
or do other processing. Eventually the receiver of the message may elect to send back a reply message to the sender, but
is not required to do so. The exchange is much like the email threads we all use.
Messages:
Messages contain a destination address so that the sender can connect to that endpoint. If the receiver will eventually
reply, then, since there will be multiple senders, the message needs a return address. The message also needs to define
the requested operation and provide any parameters needed to carry out the requested action.
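The message contents described above might be sketched in C++ as follows. This is an illustration only: the Message type, its field names, and the makeReply helper are assumptions, not the message class used in the course projects.

```cpp
#include <map>
#include <string>

// Hypothetical message type: addresses, a requested operation, and parameters.
struct Message {
  std::string to;       // destination endpoint, e.g. "localhost:8080"
  std::string from;     // return address, needed because there are many senders
  std::string command;  // the requested operation
  std::map<std::string, std::string> parameters;  // inputs for the operation
};

// Build a reply addressed to the request's return address.
// The reply starts with no parameters; the caller adds whatever it needs.
inline Message makeReply(const Message& request, const std::string& command) {
  Message reply;
  reply.to = request.from;
  reply.from = request.to;
  reply.command = command;
  return reply;
}
```

Because the return address travels with every message, the receiver needs no other record of who sent it.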
WCF Communication:
If we use a communication framework like Windows Communication Foundation (WCF), then each message will consist of a
Simple Object Access Protocol (SOAP) wrapper around a serialized instance of a data class that defines the request, the
to and from addresses, and any parameters needed to execute the request.
The diagram at the right shows one possible design for peer-to-peer communication using WCF. Each Receiver instantiates a
WCF service. The service objects simply enqueue incoming messages for processing by the dequeuing thread. If a reply
is appropriate, the message processor builds a reply message using the incoming message's return address and passes it
to its sender, usually with a PostMessage(msg) invocation.
Socket Communication:
If we craft our own communication infrastructure with sockets, we're likely to use HTTP-style messages. HTTP messages
contain a header consisting of text lines, where each line is an attribute pair. The message header is terminated by
a blank line. If the header contains a content-length:483 attribute, that implies that the header is followed by a body
containing 483 bytes of data, which need not be text.
Each receiver instantiates a socket listener running on its own thread. When a connection is established, the listener
passes a work item, containing the connected socket and a reference to the blocking queue, to a thread pool. When a
pool thread is available, it begins framing messages by pulling bytes out of the socket: it reads lines from the message
header and then extracts the message body. It builds a message instance from the bytes it receives and enqueues it for
processing by the Message Processing Thread.
Message Queues:
Each receiver has a thread-safe blocking queue that is shared by all senders to that endpoint. If we are using WCF, we
make the queue a static member of the service class so that every service instance shares the same queue. That is, each
sender to that endpoint gets a service instance, and the service simply enqueues messages for the endpoint's processing
thread to dequeue and process. This is the design we use for the last of the CSE681 - Software Modeling and Analysis projects.
If we are building a socket-based communication system, the socket listener hands each connection to a dedicated client
handler, running on a thread pool thread, along with the connected socket and a reference to the shared blocking queue.
In CSE687 - Object Oriented Design, we will use this design for some projects.
File Transfer:
To send a file between endpoints we could send blocks of bytes from the file in a sequence of messages, but it would be
somewhat more efficient to send a beginning message that identifies the file name, length, and block size, and then a sequence of
blocks of bytes, perhaps followed by a terminating message. Note that, while performing a file transfer, the service object
simply writes incoming blocks into a file and does not post them to the receive queue. When the transfer is complete, it posts
a message to the queue so the message processor knows a file has arrived.
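The second scheme, a beginning message followed by raw blocks, might look like the following C++ sketch. It is an in-memory illustration only: FileHeader, splitIntoBlocks, and FileAssembler are hypothetical names, and a std::string stands in for the file that a real receiver would write to disk.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical contents of the beginning message.
struct FileHeader {
  std::string name;
  std::size_t length = 0;     // total bytes in the file
  std::size_t blockSize = 0;  // bytes per block; the last block may be shorter
};

// Sender side: cut the file contents into blocks of at most blockSize bytes.
inline std::vector<std::string> splitIntoBlocks(const std::string& bytes,
                                                std::size_t blockSize) {
  std::vector<std::string> blocks;
  if (blockSize == 0) return blocks;  // nothing sensible to do
  for (std::size_t pos = 0; pos < bytes.size(); pos += blockSize)
    blocks.push_back(bytes.substr(pos, blockSize));
  return blocks;
}

// Receiver side: accumulate blocks until header.length bytes have arrived.
class FileAssembler {
public:
  explicit FileAssembler(FileHeader header) : header_(std::move(header)) {
    contents_.reserve(header_.length);
  }
  // Append one block. Returns true once the whole file has arrived, which is
  // the point at which a "file arrived" message would be posted to the queue.
  bool addBlock(const std::string& block) {
    contents_ += block;
    return complete();
  }
  bool complete() const { return contents_.size() >= header_.length; }
  const std::string& contents() const { return contents_; }
private:
  FileHeader header_;
  std::string contents_;
};
```

Because the header announces the total length, the receiver can detect completion without a terminating message.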
Channel Structure
We show, in Figure 4., details of the channel processing. Each peer has a Comm component that contains both a Sender
and a Receiver, so the Peers are depicted as mirror images in the diagram.
Socket Listeners are shown as boxes labeled SL. Receiving sockets are shown as boxes labeled S, and Sending sockets are
shown as boxes labeled with SC. These labels correspond to classes in the socket library, e.g., Socket, SocketListener,
and SocketConnector.
The gray clouds are ClientHandler instances connected to Peers not shown in the diagram. The white clouds are
ClientHandler instances used for a two-way conversation between the Peers shown in the diagram.
Remember that a sender does not wait for a response. It simply sends a message describing an activity to be conducted
by the receiver or information needed by the receiver, along with the destination address and its own return address for replies.
Should the receiver need to reply, it simply sends a message to
the endpoint specified by the return address of the message it received.
This asynchronous message-passing communication is very fluid. Each peer carries on conversations with other peers, much
like email exchanges. Of course, this flexibility is bought at the expense of sockets and threads at each end, but the
channel is very easy to set up and use.
Message Structure
Messages are similar to those used for HTTP communication. We add "to" and "from" attributes
and frequently use custom attributes to provide information needed by the receiver. Our messages start with a
command line, but its contents often differ from the HTTP command lines holding GET, POST, PUT, DELETE, etc.
When sending binary data we usually use chunking. That may entail sending one chunk each in a sequence of messages,
or could be one initiating message describing the length of the binary body, and then sending a sequence of binary blocks
until the specified content-length has been sent. Any given application would use one or the other.
There is only one type of message used in our channels. This message type accommodates an open-ended
number of attributes, allowing us to send whatever information and commands are required by the other end.
What we have done is to push all of the application specific details out of the communication channel, and
into messages that are composed by application senders and interpreted by application receivers.
This encourages a very flexible "duck-typing" style of design concentrated in message handling,
while keeping strict typing for all of the non-message communication parts.
Essentially, we get the best of
both worlds - very mutable message contents, supporting whatever the application needs, but strict control
of the channel typing, which eliminates type errors and failures to connect components
according to their interfaces.
When messages are received, they are dispatched to methods that are appropriate for handling that specific
message. We provide each message with a key - the command (see Figure 5.) - that is used to look up a functor
or lambda to handle processing of that message.
When we want to add new functionality to a server, we just design a new message format and a lambda to handle
that message. Nothing else changes, so we get virtually no breakage when responding to new requirements.
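The command-keyed lookup described above can be sketched in C++ with a map from command strings to std::function objects. This is a minimal illustration under assumptions: messages are modeled as a plain attribute map, and the names Attributes, Dispatcher, addHandler, and dispatch are hypothetical rather than taken from the course code.

```cpp
#include <functional>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical message model: an open-ended set of name/value attributes,
// one of which ("command") selects the handler.
using Attributes = std::map<std::string, std::string>;

class Dispatcher {
public:
  using Handler = std::function<Attributes(const Attributes&)>;

  // Register (or replace) the handler for a command.
  void addHandler(const std::string& command, Handler handler) {
    handlers_[command] = std::move(handler);
  }

  // Look up the handler for msg's command and run it.
  // Returns false if the message has no command or no handler is registered.
  bool dispatch(const Attributes& msg, Attributes& reply) const {
    const auto cmd = msg.find("command");
    if (cmd == msg.end()) return false;
    const auto it = handlers_.find(cmd->second);
    if (it == handlers_.end()) return false;
    reply = it->second(msg);
    return true;
  }

private:
  std::unordered_map<std::string, Handler> handlers_;
};
```

Supporting a new request then amounts to one more addHandler call with a new lambda; the dispatch code itself does not change.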
Message-Passing Comm Classes
Each Peer contains a Comm instance. The Comm class composes a Sender and a Receiver instance, both of
which use a thread-safe blocking queue and a socket library with a base Socket class and derived
SocketListener and SocketConnecter classes.
Each Receiver contains an instance of SocketListener, running on its own thread. When a connection is
made, the SocketListener instance creates an instance of a ClientHandler thread, passing it a Socket
instance connected to the sender. The ClientHandler is responsible for framing messages drawn from its
Socket instance and enqueuing them for server processing.
Each Sender instance has a send thread that inspects input messages to get the destination address,
and attempts to connect to that address, or uses an already open connection. Send and ClientHandler
threads also collaborate to handle transmission of file chunks.
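The "connect, or use an already open connection" step of the send thread might be sketched as a small cache keyed by destination address. Everything here is an assumption made for illustration: Connection is a stand-in for a connected socket, and ConnectionCache with its injected connect function is not part of the socket library described in this article.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a connected socket.
struct Connection {
  std::string endpoint;
};

class ConnectionCache {
public:
  // The connect function is injected so the sketch does not depend on any
  // particular socket API. It returns nullptr when the attempt fails.
  using Connector =
      std::function<std::shared_ptr<Connection>(const std::string&)>;

  explicit ConnectionCache(Connector connect) : connect_(std::move(connect)) {}

  // Return the open connection for an endpoint, connecting only on first use.
  // Failed attempts are not cached, so a later message can retry.
  std::shared_ptr<Connection> get(const std::string& endpoint) {
    const auto it = open_.find(endpoint);
    if (it != open_.end()) return it->second;
    auto conn = connect_(endpoint);
    if (conn) open_[endpoint] = conn;
    return conn;
  }

  std::size_t openCount() const { return open_.size(); }

private:
  Connector connect_;
  std::map<std::string, std::shared_ptr<Connection>> open_;
};
```

The cache is used by the send thread alone, so in this sketch it needs no locking.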
Server Threading
As shown in Figure 4., a server dequeues and processes messages with a single thread. Multiple clients
are still served concurrently, because each ClientHandler thread frames messages and/or receives files
for its own client. This style of processing is often referred to as a Single
Threaded Apartment (STA), a term that was coined to describe one threading model for Microsoft COM
components. Virtually all Graphical User Interfaces, Windows message handling, and JavaScript (and
Node.js) execution use this form of processing.
The advantage is that there won't be any thread conflicts when the server accesses a data store or opens files,
because only one thread processes all messages. Should we decide that, for performance reasons, we need
to use multi-threading for message processing, the server can always spawn new threads, and lock any
shared resources.
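If message processing does move to several threads, each shared resource needs a lock. The C++ sketch below illustrates the idea with a shared table of per-command counts. The names SharedStore and processConcurrently are hypothetical, and the striped division of work is just one simple choice.

```cpp
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical shared resource: per-command counts guarded by a mutex.
class SharedStore {
public:
  void record(const std::string& command) {
    std::lock_guard<std::mutex> lock(mtx_);  // one writer at a time
    ++counts_[command];
  }
  int count(const std::string& command) const {
    std::lock_guard<std::mutex> lock(mtx_);
    const auto it = counts_.find(command);
    return it == counts_.end() ? 0 : it->second;
  }
private:
  mutable std::mutex mtx_;
  std::map<std::string, int> counts_;
};

// Process commands on several worker threads.
// Worker w handles items w, w + workers, w + 2 * workers, and so on.
inline void processConcurrently(const std::vector<std::string>& commands,
                                SharedStore& store, std::size_t workers) {
  if (workers == 0) workers = 1;
  std::vector<std::thread> pool;
  for (std::size_t w = 0; w < workers; ++w) {
    pool.emplace_back([&commands, &store, w, workers] {
      for (std::size_t i = w; i < commands.size(); i += workers)
        store.record(commands[i]);
    });
  }
  for (auto& t : pool) t.join();  // wait for all workers to finish
}
```

With a single processing thread the mutex would be unnecessary, which is the simplicity the single-threaded design buys.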
Code Resources
You will find all the C++ code for a demonstration of Asynchronous Message-Passing in
CppCommWithFileXfer. This was written using Visual Studio 2017
and its Visual C++ compiler and standard libraries. The Socket Library is also included with that project.
You will also find C# code that implements an Asynchronous Message-Passing Communication channel in
CsMessagePassingComm. That channel uses the .NET
Windows Communication Foundation (WCF) framework instead of directly handling sockets. Note that you must run
Visual Studio in administrator mode for this project to run successfully. That is a policy set by WCF.
Note that Figures 1., 3., 4. and 6. are specific to the C++ channel. Figure 2. is specific to the C# WCF channel,
and Figure 5. applies to both. The C# channel processing is similar to that shown in Figure 4., but some of the
details are different, e.g., we use WCF instead of sockets.