Can your threads wait?

Introduction

After quite some time, I have decided to pick up my concurrency library again, and I hope to make a first release within a month. The library extends JSR-166 (the excellent concurrency library that is part of Java 5 and higher) and contains some goodies I have used in various server-side projects, such as:

  1. Repeater: a structure that keeps repeating a task over and over again. The standard implementation is the ThreadPoolRepeater (it uses a pool of threads to run the task concurrently). Repeaters are great for setting up processes that need to block while waiting for input/output, like batch processes.
  2. BlockingExecutor: an Executor with more control over the blocking and timeout behavior.
  3. AwaitableReference: a synchronization structure that makes it easy to wait for a (non null) reference.
  4. LendableReference: a synchronization structure that looks a lot like the AwaitableReference, but the value needs to be returned before a new one can be set (although this depends on the implementation).
  5. A configurable ThreadFactory implementation. JSR-166 contains a ThreadFactory interface, but not a configurable implementation. This is not very handy if you need a lot of control over threads (especially on the server side), e.g. when you want to run batch threads at a much lower priority than interactive threads.
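To give an idea of the last item, here is a minimal sketch of what a configurable ThreadFactory could look like. The class name, the constructor parameters, and the naming scheme are assumptions made for illustration; this is not the library's actual implementation.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a configurable ThreadFactory (not the library's class). */
final class ConfigurableThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final int priority;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    ConfigurableThreadFactory(String namePrefix, int priority, boolean daemon) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("priority out of range: " + priority);
        }
        this.namePrefix = namePrefix;
        this.priority = priority;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Every thread gets a numbered name plus the configured priority and daemon flag.
        Thread thread = new Thread(task, namePrefix + "-" + counter.getAndIncrement());
        thread.setPriority(priority);
        thread.setDaemon(daemon);
        return thread;
    }
}
```

A batch pool with low-priority threads could then be created with something like Executors.newFixedThreadPool(4, new ConfigurableThreadFactory("batch", Thread.MIN_PRIORITY, true)).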

The library will be open sourced and released under the MIT/BSD license.

WaitPoint

One of the things that annoyed me while implementing this library is that I had to reimplement waiting logic repeatedly, for example to make the ThreadPoolRepeater and the ThreadPoolBlockingExecutor pausable. Finally it occurred to me that I needed a structure threads can wait on: the WaitPoint. The WaitPoint is nothing more than a synchronization structure a thread needs to pass in order to continue:

example:

while(true){
	waitpoint.pass();
	System.out.println("hello");
}

As long as the waitpoint allows passage, hello is printed. As soon as it no longer allows passage, the thread blocks and nothing is printed. In essence, the WaitPoint is just the waiting part of a Condition.

CloseableWaitPoint

The CloseableWaitPoint is a WaitPoint implementation that can be opened and closed. If it is open, pass requests don't block and the threads can continue what they were doing. But if it is closed, all threads that want to pass block for as long as it stays closed. When it is opened again, the threads can continue.
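The behavior described above can be sketched with a ReentrantLock and a Condition. This is only a minimal illustration, not the library's code: the WaitPoint interface with a single interruptible pass() method, the boolean constructor argument, and the isOpen() method are assumptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Assumed shape of the WaitPoint interface: one blocking, interruptible call. */
interface WaitPoint {
    void pass() throws InterruptedException;
}

/** Hypothetical sketch of a WaitPoint that can be opened and closed. */
final class CloseableWaitPoint implements WaitPoint {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open;

    CloseableWaitPoint(boolean open) {
        this.open = open;
    }

    @Override
    public void pass() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // Loop to guard against spurious wakeups and a close that follows an open.
            while (!open) {
                opened.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll(); // wake up every thread blocked in pass()
        } finally {
            lock.unlock();
        }
    }

    public void close() {
        lock.lock();
        try {
            open = false;
        } finally {
            lock.unlock();
        }
    }

    public boolean isOpen() {
        lock.lock();
        try {
            return open;
        } finally {
            lock.unlock();
        }
    }
}
```

Note that in this sketch close() only affects threads that call pass() afterwards; a thread that has already passed simply keeps running.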

But the CloseableWaitPoint is still a low-level concurrency structure. A higher-level structure is the BlockingQueue, which is excellent for sharing data between threads. I also made a new BlockingQueue implementation (it is a decorator): one where all puts go through a front-waitpoint and all takes go through a back-waitpoint (the front of the queue is where the puts take place, and the back of the queue is where the takes take place). Using these two waitpoints, you can control the threads that want to put messages in the queue or take them from the queue.
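The core of such a decorator can be sketched as follows. To keep it short, the sketch only exposes put and take instead of implementing the complete BlockingQueue interface; a real decorator would also have to route offer, poll, and the other methods through the waitpoints. The class name GatedQueue and the WaitPoint interface shown here are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;

/** Assumed shape of the WaitPoint interface: one blocking, interruptible call. */
interface WaitPoint {
    void pass() throws InterruptedException;
}

/** Hypothetical sketch: puts pass the front waitpoint, takes pass the back waitpoint. */
final class GatedQueue<E> {
    private final BlockingQueue<E> target;
    private final WaitPoint frontWaitPoint;
    private final WaitPoint backWaitPoint;

    GatedQueue(BlockingQueue<E> target, WaitPoint frontWaitPoint, WaitPoint backWaitPoint) {
        this.target = target;
        this.frontWaitPoint = frontWaitPoint;
        this.backWaitPoint = backWaitPoint;
    }

    public void put(E item) throws InterruptedException {
        frontWaitPoint.pass(); // blocks while the front waitpoint does not allow passage
        target.put(item);      // may still block if the target queue is full
    }

    public E take() throws InterruptedException {
        backWaitPoint.pass();  // blocks while the back waitpoint does not allow passage
        return target.take();  // may still block if the target queue is empty
    }
}
```

One consequence of this simple design: a thread that has already passed the back waitpoint and is blocked in target.take() will still receive the next item, even if the waitpoint stops allowing passage in the meantime.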

Pausable Executors

The cool thing is that this technique can be used to make a ThreadPoolExecutor pausable:

BlockingQueue<Runnable> targetQueue = new LinkedBlockingQueue<Runnable>(10);
CloseableWaitPoint frontCloseableWaitPoint = new CloseableWaitPoint();
CloseableWaitPoint backCloseableWaitPoint = new CloseableWaitPoint();
BlockingQueue<Runnable> workQueue = new
 	BlockingQueueWithWaitingTakesAndPuts<Runnable>(
 		targetQueue,
 		frontCloseableWaitPoint,
 		backCloseableWaitPoint);
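// The pool sizes and keep-alive time below are placeholder values.
ThreadPoolExecutor executor = new ThreadPoolExecutor(
	5, 5, 0L, TimeUnit.MILLISECONDS, workQueue);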

If you want to stop the acceptance of new tasks, do:

frontCloseableWaitPoint.close();

This gives the executor a chance to execute all outstanding tasks, but new tasks aren’t accepted.

If you want to stop execution of tasks, you can close the back-end of the queue:

backCloseableWaitPoint.close();

If you leave the frontCloseableWaitPoint open, you still accept new tasks, but they won’t be processed as long as the backCloseableWaitPoint is closed. I have used a similar approach with a LendableReference and the ThreadPoolRepeater to make the latter pausable.

Other usages

By creating different WaitPoints you can customize waiting behaviour to a high degree without having to integrate it into the structure (Inversion of Control rocks). Because the waiting functionality is extracted into a separate object, this object can be shared between many structures: you can control a lot of structures with one WaitPoint. Another usage is throttling: you could create a ThrottlingWaitPoint to control the period between passes, for example. By setting the minimum delay to 10 milliseconds, at most 100 passes per second are allowed. By using this waitpoint, you can control the ‘speed’ of task execution of the Repeater, for example, but I guess it can be used for a lot of things.
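A throttling waitpoint along these lines could be sketched as below. It is only an illustration under assumptions: the WaitPoint interface, the constructor signature, and the slot-reservation approach are not taken from the library. Each caller reserves the next free time slot and then sleeps until that slot arrives, so consecutive passes are at least the minimum delay apart.

```java
import java.util.concurrent.TimeUnit;

/** Assumed shape of the WaitPoint interface: one blocking, interruptible call. */
interface WaitPoint {
    void pass() throws InterruptedException;
}

/** Hypothetical sketch: enforces a minimum delay between consecutive passes. */
final class ThrottlingWaitPoint implements WaitPoint {
    private final long minDelayNanos;
    private long nextSlotNanos = System.nanoTime();

    ThrottlingWaitPoint(long minDelay, TimeUnit unit) {
        if (minDelay < 0) {
            throw new IllegalArgumentException("minDelay must not be negative");
        }
        this.minDelayNanos = unit.toNanos(minDelay);
    }

    @Override
    public void pass() throws InterruptedException {
        long sleepNanos;
        synchronized (this) {
            long now = System.nanoTime();
            // Compare via subtraction, as recommended for System.nanoTime() values.
            long slot = (nextSlotNanos - now > 0) ? nextSlotNanos : now;
            nextSlotNanos = slot + minDelayNanos; // reserve the slot after ours
            sleepNanos = slot - now;
        }
        // Sleep outside the lock so other threads can reserve their own slots.
        if (sleepNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(sleepNanos);
        }
    }
}
```

With new ThrottlingWaitPoint(10, TimeUnit.MILLISECONDS), the slots are 10 milliseconds apart, which gives the limit of at most 100 passes per second mentioned above (1000 ms / 10 ms = 100).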

ps:
I’m sitting on the bus to work, and another usage came to mind: a WaitPoint can also be used to regulate the capacity of structures that contain elements of some sort. There are already bounded implementations of the BlockingQueue, but moving this behavior into a separate structure makes it reusable and reduces complexity.


2 Responses to Can your threads wait?

  1. There is a suggestion I would make based on our own experience with a similar concept. Basically, instead of “pass”, you should have both “enter” and “exit”, so that if the operation that occurs after the “pass” call needs to complete before the “close”, you can be certain of that. We modeled it after a walled city with a gate, in which one can enter and exit the city, but when the gate is closed, no one can enter. (We also required all threads other than the gate closer to leave before the gates finished closing.)

    Here’s from the JavaDoc:

    * Use this class in cases that large numbers of threads can operate
    * concurrently with an additional requirement that all threads be blocked for
    * certain operations. The algorithm is based on a gate concept, allowing
    * threads in (enter) and out (exit), but occasionally shutting the gate (close)
    * such that other threads cannot enter and exit. However, since threads may
    * “be inside”, the gate cannot fully close until they leave (exit). Once all
    * threads are out, the gate is closed, and can be re-opened (open) or
    * permanently closed (destroy).

    Peace,

    Cameron.

  2. pveentjer says:

    Hi Cameron,

    I think you have raised a good point. At the moment I’ll leave the WaitPoint as it is (so only passing, not entering/exiting), but as soon as I need it, I’m going to implement it. One good usage would be to place a limit on the number of concurrently lent values from the LendableReference (this can’t be done with the standard WaitPoint because it can’t deal with ‘returning’).

    And the cool thing is that a WaitPoint could be made from an EnteringWaitPoint (does anyone have a better name?):

    class FooWaitPoint implements WaitPoint {
        private EnteringWaitPoint enteringWaitPoint;

        // Passing is simply entering and immediately exiting.
        public void pass() {
            enteringWaitPoint.enter();
            enteringWaitPoint.exit();
        }
    }
