Tuesday, August 21, 2007

Queue size monitoring

One of the things we would like to build into the server, before moving on to any features and improvements, is monitoring. First of all, we will measure the growth of the queues.
Monitoring is always a trade-off between overhead and precision. Our monitoring class, QueueSizeMonitor, will have a parameter called granularity, measured in milliseconds. It determines how often measurements are taken. What does "taking a measurement" mean here?
Our monitoring of queue sizes is based on counting. The messaging server calls the count(long delta, String key) method on the monitoring object every time the size of any queue changes. The delta argument is the value of the change. The key argument would normally be an event name (like "queue X grows" or "queue X shrinks"). This method accumulates values in thread-local storage (i.e. every thread has its own copy of the counter). Every now and then (depending on the granularity value), the data from the thread-local storage is promoted to the common storage. This operation entails some locking, which is why the smaller the granularity, the bigger the monitoring overhead.
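Here is a minimal sketch of how such a counter might look. The class name and the count signature come from the text above; the promotion details (per-thread timestamps, a ConcurrentHashMap as the common storage) are my assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a thread-local counting monitor; details are guesses. */
public class QueueSizeMonitor {
    private final long granularity; // ms between promotions to common storage

    // Common storage; the background flusher thread would read this
    // every granularity * window ms and write the XML file.
    private final ConcurrentHashMap<String, AtomicLong> common =
            new ConcurrentHashMap<String, AtomicLong>();

    // Thread-local accumulators: the hot path takes no locks.
    private final ThreadLocal<LocalCounts> local = new ThreadLocal<LocalCounts>() {
        protected LocalCounts initialValue() { return new LocalCounts(); }
    };

    private static class LocalCounts {
        final Map<String, Long> counts = new HashMap<String, Long>();
        long lastPromoted = System.currentTimeMillis();
    }

    public QueueSizeMonitor(long granularity) {
        this.granularity = granularity;
    }

    /** Called by the server every time a queue size changes. */
    public void count(long delta, String key) {
        LocalCounts lc = local.get();
        Long old = lc.counts.get(key);
        lc.counts.put(key, old == null ? delta : old + delta);

        long now = System.currentTimeMillis();
        if (now - lc.lastPromoted >= granularity) {
            promote(lc);
            lc.lastPromoted = now;
        }
    }

    // Merge this thread's counters into the common storage.
    private void promote(LocalCounts lc) {
        for (Map.Entry<String, Long> e : lc.counts.entrySet()) {
            AtomicLong total = common.get(e.getKey());
            if (total == null) {
                total = new AtomicLong();
                AtomicLong raced = common.putIfAbsent(e.getKey(), total);
                if (raced != null) total = raced;
            }
            total.addAndGet(e.getValue());
        }
        lc.counts.clear();
    }
}

The hot path touches only the calling thread's own HashMap; contention on the shared map happens at most once per granularity interval per thread.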
A background thread periodically flushes the collected data to a file. The frequency of this flushing is determined by the window parameter: flushing interval = granularity * window. The file is written in an XML format that looks like the following:


<series>
  <serie timestamp="1187734787600">
    <item key="10|poll" value="54"/>
    <item key="1|poll" value="165"/>
    <item key="2|poll" value="4"/>
    <item key="Distribution|poll" value="112"/>
    <item key="Distribution|put" value="109"/>
  </serie>
  <serie timestamp="1187734787700">
    <item key="1|poll" value="193"/>
    <item key="2|poll" value="258"/>
    <item key="4|poll" value="18"/>
    <item key="5|poll" value="430"/>
    <item key="7|poll" value="4"/>
    <item key="8|poll" value="2"/>
    <item key="Distribution|poll" value="452"/>
    <item key="Distribution|put" value="454"/>
  </serie>
</series>

Although the queue size monitor appends data dynamically, the XML file it produces is always well-formed. Before each flush it seeks back 9 bytes (the length of the closing </series> tag), writes the new <serie> elements, and then appends </series> at the end. In most cases the XML will be well-formed even if the server crashes.
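The seek-back trick could look roughly like this (a sketch using RandomAccessFile; the actual writer is in the repository):

import java.io.IOException;
import java.io.RandomAccessFile;

/** Sketch of appending <serie> blocks while keeping the file well-formed. */
public class XmlAppender {
    private static final String CLOSING = "</series>"; // 9 bytes

    public static void append(RandomAccessFile file, String serieXml)
            throws IOException {
        if (file.length() == 0) {
            file.writeBytes("<series>\n");
        } else {
            // Invariant: the file ends with the closing tag written
            // by the previous flush; step back over it.
            file.seek(file.length() - CLOSING.length());
        }
        file.writeBytes(serieXml);   // one or more <serie>...</serie> blocks
        file.writeBytes(CLOSING);    // re-close, so the file stays well-formed
    }
}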

I haven't yet worked out the best set of values to monitor or the best way of representing them. However, as a start, I added a simple class, Chart, that parses the XML produced by the queue size monitor and shows a line chart. It doesn't look very useful at the moment.

Monday, August 13, 2007

First files checked in

My project at sourceforge has been approved, so I have uploaded the first source code:
http://diymiddleware.cvs.sourceforge.net/diymiddleware/
There are three files there:
  • Server1.java. You can run the server (it does not require any arguments). It will try to bind to port 22221 and listen for incoming client connections. The server never stops on its own, so you have to terminate it forcibly.
  • Client1.java. A non-runnable, API-like client for the server. To use it, instantiate it, passing the host name of the server (a DNS name or IP address) as the argument. It creates a thread, which is used to invoke your callbacks when you want to receive messages. The "close" method should be called at the end; it interrupts that thread. Three methods of this class correspond to the commands that the server understands (see the usage sketch after this list):
    1. consume. Sends the "REC" command. Takes a regex pattern and a callback. When a message is sent, part of which matches the pattern, the callback is invoked (by only one thread per client instance).
    2. stop. Sends the "STOP" command, canceling the current "REC" subscription.
    3. send. Sends the "M" command, which actually publishes a message to the server.
  • Test1.java. A test program that helps obtain some benchmarks. Currently, it tries to send 100 thousand messages of a specified length to the server, using a specified number of senders (each sender is an instance of the Client1 class). There is also a specified number of receivers (each receiver is an instance of Client1 too). Senders send messages concurrently. Each sender sends an equal number of messages, with the total around 100,000. The first sender includes the string "P1P" in its messages, the second one includes "P2P", and so on. The first receiver subscribes to the pattern "P1P", the second one to "P2P", and so on. Hence, if the number of senders is greater than the number of receivers, some messages will not be received by anyone. The test measures 3 things:
    1. Average sending rate (messages per second). This is the total number of sent messages divided by the total time spent on sending (only sending, not delivering).
    2. Average message delay. Every sender inserts the current timestamp into each message. Every receiver reads it and calculates the delay.
    3. Maximum message delay.
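
To make the API shape concrete, here is a hypothetical sender/receiver pair. Client1 is the class described above, but the Callback interface, the message layout (timestamp first) and the exact method signatures are my guesses:

/** Hypothetical usage of Client1; the Callback interface is assumed. */
public class Example {
    public static void main(String[] args) throws Exception {
        Client1 receiver = new Client1("localhost");
        receiver.consume("P1P", new Callback() {            // sends "REC"
            public void onMessage(String message) {
                // Assumes the sender put its timestamp first, the way
                // Test1 measures delays.
                long sent = Long.parseLong(
                        message.substring(0, message.indexOf(' ')));
                System.out.println("delay ms: "
                        + (System.currentTimeMillis() - sent));
            }
        });

        Client1 sender = new Client1("localhost");
        sender.send(System.currentTimeMillis() + " hello P1P world"); // "M"

        Thread.sleep(1000);   // give the message time to arrive
        receiver.stop();      // sends "STOP"
        receiver.close();
        sender.close();
    }
}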

Sunday, August 12, 2007

What is this all about

Last Thursday I spent a few hours writing a sample messaging middleware in Java (for a presentation on high-rate messaging in Java). I called its parts "DIY messaging server", "DIY messaging client" and "DIY messaging test". The features were:
  1. All messages are Java Strings
  2. A client can do 3 things to the server (a raw-socket sketch of the protocol follows this list):
    1. "REC" (followed by a regular expression) - express interest in receiving all messages containing given regular expression
    2. "STOP" stop receiving any messages
    3. "M" (followed by a string) - send message for delivery
  3. Only one "subscription" per connected client is possible - any "REC" command cancels the effect of the previous one.
  4. There is no acknowledgment and no flow control - i.e. senders do not get any feedback from the server
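
Since the protocol is plain text, it can be exercised even without a client library. Here is a raw-socket sketch; the command names and the port come from this post, but the line-based framing is my assumption:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class RawClient {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("localhost", 22221);
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));

        out.println("REC hello.*");        // subscribe to a regex
        out.println("M hello world");      // publish a message
        System.out.println(in.readLine()); // the message should come back
        out.println("STOP");               // cancel the subscription
        socket.close();
    }
}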
The implementation used old-fashioned blocking I/O, with 2 threads for every connected client (one thread reads commands and accepts sent messages, the other writes to subscribers to deliver messages to them).
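
A bare-bones version of that threading model might look as follows (illustrative only, not the actual Server1 code; it echoes lines back instead of parsing commands):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoThreadsPerClient {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(22221);
        while (true) {
            final Socket socket = server.accept();
            final BlockingQueue<String> outputQueue =
                    new LinkedBlockingQueue<String>();

            // Reader thread: accepts commands and sent messages.
            new Thread() {
                public void run() {
                    try {
                        BufferedReader in = new BufferedReader(
                                new InputStreamReader(socket.getInputStream()));
                        String line;
                        while ((line = in.readLine()) != null) {
                            // A real server would parse REC/STOP/M here.
                            outputQueue.put(line);
                        }
                    } catch (Exception ignored) { }
                }
            }.start();

            // Writer thread: delivers messages from the output queue.
            new Thread() {
                public void run() {
                    try {
                        PrintWriter out = new PrintWriter(
                                socket.getOutputStream(), true);
                        while (true) {
                            out.println(outputQueue.take());
                        }
                    } catch (Exception ignored) { }
                }
            }.start();
        }
    }
}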

Here is the architecture (originally shown as a diagram). Clients can be senders, receivers, or both at the same time. Every client has an open socket to the server. Within the server, every client connection is serviced by two threads. When a client is sending messages, the corresponding thread reads them off the input stream of the socket and places them into the distribution queue. The distribution queue is accessible to a number of distributor threads (four in the diagram). Using their routing rules, which are updated by the "REC" and "STOP" commands, these distributor threads place every message into zero, one, or many output queues (there is one output queue per connected client that is currently interested in messages). The output queues are dispatched by the threads attached to the output streams of the sockets.
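
The routing step of a distributor thread could be sketched like this (the routes map and the queue wiring are illustrative):

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;

/** Sketch of one distributor thread fanning messages out to output queues. */
public class Distributor implements Runnable {
    private final BlockingQueue<String> distributionQueue;

    // One entry per subscribed client: its current pattern and output queue.
    // "REC" replaces a client's pattern, "STOP" removes its entry.
    private final Map<Pattern, BlockingQueue<String>> routes;

    public Distributor(BlockingQueue<String> distributionQueue,
                       Map<Pattern, BlockingQueue<String>> routes) {
        this.distributionQueue = distributionQueue;
        this.routes = routes;
    }

    public void run() {
        try {
            while (true) {
                String message = distributionQueue.take();
                // A message goes to zero, one, or many output queues,
                // depending on which patterns match a part of it.
                for (Map.Entry<Pattern, BlockingQueue<String>> route
                        : routes.entrySet()) {
                    if (route.getKey().matcher(message).find()) {
                        route.getValue().put(message);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shut down quietly
        }
    }
}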

My next steps:
  1. Upload the current code to sourceforge and get Subversion access to it, so I can specify a revision number for every blog entry
  2. Simple monitoring of the distribution queue and output queues. Monitoring will include the size of the queues and also the contribution of every sender
  3. A flow control post + a simple flow control implementation. The flow control decision will be based on monitoring values and some parameters known as "watermark values"
  4. A post about the unfairness of flow control
  5. Re-writing the code to use non-blocking I/O, first of all to compare performance and scalability (with flow control on)
  6. A post about TTL (Time-To-Live) as an alternative to flow control: it is more fair, but could lead to data loss, and there are concerns about memory usage because of keeping TTL values
  7. Flushing messages to disk to avoid excessive memory consumption when using a long TTL. Rotating the files in order to avoid indexing, embedded databases, etc.
  8. Some other cool stuff to come...