Introduction
After quite some time, I have decided to pick up my concurrency library again, and I hope to make a first release within a month. The library extends JSR-166 (the excellent concurrency library that is part of Java 5 and higher) and contains some goodies I have used in various server-side projects, like:
- Repeater: a structure that keeps repeating a task over and over again. The standard implementation is the ThreadPoolRepeater (it uses a pool of threads to run the task concurrently). Repeaters are great for setting up processes that need to block while waiting for input/output, like batch processes.
- BlockingExecutor: an Executor with more control over the blocking and timeout behavior.
- AwaitableReference: a synchronization structure that makes it easy to wait for a (non null) reference.
- LendableReference: a synchronization structure that looks a lot like the AwaitableReference, but the value needs to be returned before a new one can be set (although this depends on the implementation).
- a configurable ThreadFactory implementation. JSR-166 contains a ThreadFactory interface, but not a configurable implementation. This is not very handy if you need a lot of control over threads (especially on the server side), e.g. when you want to run batch threads at a much lower priority than interactive threads.
The library will be open sourced and released under the MIT/BSD license.
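To make the configurable ThreadFactory from the list above a bit more concrete, here is a minimal sketch of what such a factory could look like. The class name ConfigurableThreadFactory and its constructor parameters are assumptions made for illustration, not the library's actual API; only ThreadFactory itself comes from JSR-166.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the name and the parameters are assumptions,
// not the library's actual class.
class ConfigurableThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final int priority;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    ConfigurableThreadFactory(String namePrefix, int priority, boolean daemon) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("priority out of range: " + priority);
        }
        this.namePrefix = namePrefix;
        this.priority = priority;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Every thread gets a numbered name plus the configured settings.
        Thread thread = new Thread(task, namePrefix + "-" + counter.getAndIncrement());
        thread.setPriority(priority);
        thread.setDaemon(daemon);
        return thread;
    }
}
```

A batch pool could then be given a factory created with Thread.MIN_PRIORITY, while an interactive pool gets one created with Thread.NORM_PRIORITY.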
WaitPoint
One of the things that annoyed me while implementing this library is that I had to reimplement waiting logic, for example to make the ThreadPoolRepeater and the ThreadPoolBlockingExecutor pausable. Finally it occurred to me that I needed a structure threads can wait on: the WaitPoint. The WaitPoint is nothing more than a synchronization structure a Thread needs to pass to continue:
Example:
    while (true) {
        waitpoint.pass();
        System.out.println("hello");
    }
As long as the waitpoint allows passage, hello is printed. As soon as it doesn’t allow passage, the thread blocks and nothing is printed. In essence the WaitPoint is just the waiting part of a Condition.
CloseableWaitPoint
The CloseableWaitPoint is a WaitPoint implementation that can be opened and closed. If it is open, pass requests won’t block and the threads can continue what they were doing. But if it is closed, all threads that want to pass block for as long as it stays closed. When it is opened again, the threads can continue.
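The open/close behavior described above could be sketched with a lock and a Condition. This is a minimal sketch under assumptions, not the library's implementation: the shape of the WaitPoint interface (a single pass() that may throw InterruptedException), the open(), close() and isOpen() method names, and the choice to start in the open state are all guesses for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Assumed shape of the interface; the post only shows pass().
interface WaitPoint {
    void pass() throws InterruptedException;
}

// Hypothetical sketch, not the library's implementation.
class CloseableWaitPoint implements WaitPoint {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open = true; // assumption: a new waitpoint starts open

    @Override
    public void pass() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // The loop guards against spurious wakeups.
            while (!open) {
                opened.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll(); // release every thread waiting to pass
        } finally {
            lock.unlock();
        }
    }

    public void close() {
        lock.lock();
        try {
            open = false;
        } finally {
            lock.unlock();
        }
    }

    public boolean isOpen() {
        lock.lock();
        try {
            return open;
        } finally {
            lock.unlock();
        }
    }
}
```

Note that in this sketch closing only affects threads that call pass() afterwards; a thread that has already passed is not pulled back.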
But the CloseableWaitPoint is still a low-level concurrency structure. A higher-level structure is the BlockingQueue: excellent for sharing data between threads. I also made a new BlockingQueue implementation (it is a decorator): one where all puts go through a front-waitpoint and all takes go through a back-waitpoint (the front of the queue is where the puts take place, and the back of the queue is where the takes take place). Using these two waitpoints, you can control threads that want to put messages in the queue, or want to take them from the queue.
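The core of such a decorator can be sketched in a few lines. The class below is a simplified stand-in, not the library's decorator: the name WaitPointQueue is hypothetical, the WaitPoint interface shape is assumed, and only put and take are shown, whereas a real decorator would implement the whole BlockingQueue interface and delegate the remaining methods.

```java
import java.util.concurrent.BlockingQueue;

// Assumed shape of the interface; the post only shows pass().
interface WaitPoint {
    void pass() throws InterruptedException;
}

// Hypothetical, simplified sketch: only put and take are gated here.
class WaitPointQueue<E> {
    private final BlockingQueue<E> target;
    private final WaitPoint frontWaitPoint;
    private final WaitPoint backWaitPoint;

    WaitPointQueue(BlockingQueue<E> target, WaitPoint frontWaitPoint, WaitPoint backWaitPoint) {
        this.target = target;
        this.frontWaitPoint = frontWaitPoint;
        this.backWaitPoint = backWaitPoint;
    }

    // Puts must pass the front waitpoint before reaching the target queue.
    public void put(E item) throws InterruptedException {
        frontWaitPoint.pass();
        target.put(item);
    }

    // Takes must pass the back waitpoint before reaching the target queue.
    public E take() throws InterruptedException {
        backWaitPoint.pass();
        return target.take();
    }
}
```

One consequence of this simple form: a thread that is already blocked inside target.take() has passed the back waitpoint, so closing the waitpoint afterwards will not hold it back.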
Pausable Executors
The cool thing is that this technique can be used to make a ThreadPoolExecutor pausable:
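    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    BlockingQueue<Runnable> targetQueue = new LinkedBlockingQueue<Runnable>(10);
    CloseableWaitPoint frontCloseableWaitPoint = new CloseableWaitPoint();
    CloseableWaitPoint backCloseableWaitPoint = new CloseableWaitPoint();
    // Decorator: puts pass the front waitpoint, takes pass the back waitpoint.
    BlockingQueue<Runnable> workQueue = new BlockingQueueWithWaitingTakesAndPuts<Runnable>(
            targetQueue, frontCloseableWaitPoint, backCloseableWaitPoint);
    // Pool size and keep-alive time are illustrative values.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 5, 0L, TimeUnit.MILLISECONDS, workQueue);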
If you want to stop the acceptance of new tasks, do:
    frontCloseableWaitPoint.close();
This gives the executor a chance to execute all outstanding tasks, but new tasks aren’t accepted.
If you want to stop execution of tasks, you can close the back-end of the queue:
    backCloseableWaitPoint.close();
If you leave the frontCloseableWaitPoint open, you still accept new tasks, but they won’t be processed as long as the backCloseableWaitPoint is closed. I have used a similar approach with a LendableReference and the ThreadPoolRepeater to make that structure pausable.
Other usages
By creating different WaitPoints you can customize waiting behaviour to a high degree without having to integrate it into the structure (Inversion of Control rocks). Because the waiting functionality is extracted into a separate object, this object can be shared between a lot of structures: you can control a lot of structures with one WaitPoint. Another usage is throttling: you could create a ThrottlingWaitPoint to control the period between passes, for example. By setting the minimum delay to 10 milliseconds, at most 100 passes per second are allowed. By using this waitpoint, you can control the ‘speed’ of task execution of the Repeater, for example, but I guess it can be used for a lot of things.
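A throttling waitpoint along these lines might look like the sketch below. It is an illustration under assumptions, not library code: the WaitPoint interface shape and the constructor signature are guesses, and the technique used here (each caller reserves the next free time slot, then sleeps until that slot) is just one possible way to do it. Because sleeping is not exact, the spacing between two individual passes is approximate; what the sketch bounds is the overall rate.

```java
import java.util.concurrent.TimeUnit;

// Assumed shape of the interface; the post only shows pass().
interface WaitPoint {
    void pass() throws InterruptedException;
}

// Hypothetical sketch of a throttling waitpoint.
class ThrottlingWaitPoint implements WaitPoint {
    private final long minDelayNanos;
    private long nextAllowedNanos; // guarded by this

    ThrottlingWaitPoint(long minDelay, TimeUnit unit) {
        if (minDelay < 0) {
            throw new IllegalArgumentException("minDelay must not be negative");
        }
        this.minDelayNanos = unit.toNanos(minDelay);
        this.nextAllowedNanos = System.nanoTime();
    }

    @Override
    public void pass() throws InterruptedException {
        long waitNanos;
        synchronized (this) {
            long now = System.nanoTime();
            // Compare via subtraction, since System.nanoTime() values may wrap.
            long slot = (nextAllowedNanos - now > 0) ? nextAllowedNanos : now;
            // Reserve this slot and push the next one minDelay further out.
            nextAllowedNanos = slot + minDelayNanos;
            waitNanos = slot - now;
        }
        // Sleep outside the lock so other callers can reserve their slots.
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

With new ThrottlingWaitPoint(10, TimeUnit.MILLISECONDS), the reserved slots are at least 10 milliseconds apart, which corresponds to the limit of about 100 passes per second mentioned above.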
ps:
I’m sitting on the bus to work, and another usage came to mind: a WaitPoint can also be used to regulate the capacity of structures that contain elements of some sort. There are already bounded implementations of the BlockingQueue, but moving this behavior into a separate structure makes it reusable and reduces the complexity.
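One way this capacity idea could be sketched is with a Semaphore: passing the waitpoint claims a free slot, and whoever removes an element hands the slot back. Everything here is hypothetical (the CapacityWaitPoint name, the elementRemoved() and remaining() methods, and the assumed WaitPoint interface shape); it only illustrates the idea and is not part of the library.

```java
import java.util.concurrent.Semaphore;

// Assumed shape of the interface; the post only shows pass().
interface WaitPoint {
    void pass() throws InterruptedException;
}

// Hypothetical sketch: a waitpoint that lets at most 'capacity'
// elements into a structure until slots are handed back.
class CapacityWaitPoint implements WaitPoint {
    private final Semaphore freeSlots;

    CapacityWaitPoint(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.freeSlots = new Semaphore(capacity);
    }

    // Called before adding an element; blocks while the structure is full.
    @Override
    public void pass() throws InterruptedException {
        freeSlots.acquire();
    }

    // Called after an element has been removed, freeing one slot.
    public void elementRemoved() {
        freeSlots.release();
    }

    public int remaining() {
        return freeSlots.availablePermits();
    }
}
```

Used as the front-waitpoint of an unbounded queue, with elementRemoved() called after each take, this would give the queue a bound without the queue itself knowing about it.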