A pipeline (aka ‘Pipes and Filters’) is a great architectural pattern for building a complex system composed of a sequence of simpler processes. But one of the weaknesses of a pipeline is that it doesn’t have a low response time when it is under load. In this post I describe a solution to this limitation.
A month ago, I was working an system that needed to be very flexible, required a sequence of steps to be executed and should be highly scalable, so a pipeline was the first thing that came to mind. A pipeline is a chain of producers and consumers of messages connected by queues (aka channels, pipes). A message is put on the first queue by some producer, and a consumer will take the message from the queue, does some processing and place a message (could be the same) on the following queue.
One advantage of pipelines is that they make it very easy to loosely couple processes:
- a producer of messages, only needs a reference to a pipe to put messages on.
- a consumer of messages, only needs a reference to a pipe to take messages from.
This means, that a producer is not coupled to a consumer behind it, and a consumer not to a producer in front of it. This makes it very easy to replace components or alter their behaviour.
Another big advantage of pipelines is that it is quite easy to make them multi-threaded (so great for using all cores/cpu’s of a system):
- every producer and consumer can run on its own thread. You can even use multiple threads for running a single producer or consumer. Tip: try to externalize the threading from the producers and consumers, and hook it up from outside.
- messages are ‘touched’ by at most a single producer/consumer at any given moment. This means, that messages are used in the isolation of a single thread, and this reduces the need to make them thread safe. With the introduction of the new Java Memory Model (JSR-133) in Java 5, save hand of also is a great new property of these structures because it helps to prevent visibility problems.
- by making queues blocking, producers block if they try to put an element on a full queue (great for graceful degradation), and consumers block if they try to take an element from an empty queue. Blocking calls help to reduce concurrency control related complexity, because this programming model makes synchronization with other threads largely transparent.
What is the problem
The problem with pipelines, is that they have a low response time when the system is under load. The cause of this problem, is that a message is not processed immediately, but put on a queue first, and it has to wait until all proceeding messages are processed. If there are many unprocessed messages (usually the case when a system is under load), the response-time will increase. If there are multiple queues (often the case in pipelines), the response time increases even more because the wait times are accumulated.
In the system I was working on, there were 2 message producers:
- scheduled process: on certain times, the scheduler triggers a process. This process could place a large amount of queries that need to be processed.
- user: the user also can place queries that need to be processed.
The problem is, that a standard pipeline is not responsive to user queries, when there are a lot of queries created by the scheduled process. This is not acceptable, because one of the non-functional requirements is that user queries should have a very small response time.
The solution is quite simple: add priorities to the messages and make the queues aware of these priorities: higher priority messages should be passed to consumers before lower priority messages. As soon as a high priority message is placed on the queue, it will be processed before all lower priority messages in that queue. This means that the response time of higher priority messages decreases, and response time for lower priority messages increases (in our case this behaviour is acceptable).
Luckily in Java there already is such a queue implementation: PriorityBlockingQueue. Warning: one of the problems of the PriorityBlockingQueue, is that it isn’t bounded: placing a limit on the maximum size is not available out of the box.
Also make sure that all the queues are sensitive to the priorities, and if a new message is created (based on an previous message), don’t forget to pass the priority to this new message.