Writing an Executor using STM

I’m working on the 0.3 release of Multiverse, a Java-based STM implementation, and I am creating replicas of concurrent data structures to better understand how it should evolve. Java’s Executor is a different kind of structure from the standard queues and stacks (already available), because it also needs to manage threads.

That is why I decided to create an example STM-based implementation of the Executor interface. It creates threads lazily and has a dynamic pool size.

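// JDK imports; the Multiverse imports (AtomicObject, AtomicMethod, retry, ...) are omitted here.
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

@AtomicObject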
public class StmExecutor implements Executor {

    private final DoubleLinkedList workerThreads;
    private final DoubleLinkedQueue workQueue;
    private StmExecutorState state;
    private int poolSize;

    public StmExecutor(int poolSize, int maxCapacity) {
        if (poolSize < 0) {
            throw new IllegalArgumentException();
        }

        this.workQueue = new DoubleLinkedQueue(maxCapacity);
        this.state = StmExecutorState.started;
        this.workerThreads = new DoubleLinkedList();
        this.poolSize = poolSize;
    }

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }

        if (state != StmExecutorState.started) {
            throw new RejectedExecutionException();
        }

        workQueue.push(task);

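        // Lazily start one more worker while the pool is below its target size.
        // The queue holds at least the task just pushed, so the first task also gets a worker.
        if (poolSize > workerThreads.size() && workQueue.size() > 0) {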
            createWorkers(1);
        }
    }

    public int getPoolSize() {
        return poolSize;
    }

    public int getActualPoolsize() {
        return workerThreads.size();
    }

    public boolean isShutdown() {
        return state == StmExecutorState.shutdown;
    }

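    public void setPoolSize(int newPoolSize) {
        if (newPoolSize < 0) {
            throw new IllegalArgumentException();
        }

        if (state != StmExecutorState.started) {
            throw new IllegalStateException();
        }

        // Grow eagerly; when shrinking, surplus workers remove themselves (see isTooMany).
        createWorkers(newPoolSize - poolSize);
        this.poolSize = newPoolSize;
    }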

    private void createWorkers(int workerCount) {
        if (workerCount > 0) {
            DoubleLinkedList newWorkers = new DoubleLinkedList();
            for (int k = 0; k < workerCount; k++) {
                newWorkers.add(new WorkerThread(this));
            }
            workerThreads.addAll(newWorkers);
            executePostCommit(new StartWorkerThreadsTask(newWorkers));
        }
    }

    public void shutdown() {
        if (workerThreads.isEmpty()) {
            state = StmExecutorState.shutdown;
        } else {
            state = StmExecutorState.shutdownInProgress;
        }
    }

    public void awaitShutdown() {
        if (!isShutdown()) {
            retry();
        }
    }

    //needed to overcome an instrumentation issue
    private StmExecutorState getState() {
        return state;
    }

    //needed to overcome an instrumentation issue
    private void setState(StmExecutorState state) {
        this.state = state;
    }

    //needed to overcome an instrumentation issue
    private DoubleLinkedList getWorkerThreads() {
        return workerThreads;
    }

    //needs to be a static inner class for now.
    static class WorkerThread extends Thread {
        private final StmExecutor executor;

        public WorkerThread(StmExecutor executor) {
            setName("Worker");
            this.executor = executor;
        }

        public void run() {
            boolean again;

            do {
                try {
                    again = runTask();
                } catch (Throwable ex) {
                    ex.printStackTrace();
                    again = true;
                }
            } while (again);
        }

        @AtomicMethod
        private boolean runTask() {
            if (isShuttingDown() || isTooMany()) {
                executor.getWorkerThreads().remove(this);

                if (isShuttingDown() && executor.getWorkerThreads().isEmpty()) {
                    executor.setState(StmExecutorState.shutdown);
                }
                return false;
            }

            Runnable task = executor.workQueue.take();

            task.run();
            return true;
        }

        private boolean isTooMany() {
            return executor.getActualPoolsize() > executor.getPoolSize();
        }

        private boolean isShuttingDown() {
            return executor.getState() == StmExecutorState.shutdownInProgress;
        }
    }

    static class StartWorkerThreadsTask implements Runnable {

        final DoubleLinkedList workerList;

        StartWorkerThreadsTask(DoubleLinkedList workerList) {
            this.workerList = workerList;
        }

        @Override
        public void run() {
            for (WorkerThread thread : workerList) {
                thread.start();
            }
        }
    }
}

enum StmExecutorState {
    started, shutdownInProgress, shutdown
}

As you can see, it is quite easy, and it certainly is a lot better than writing one using old-school concurrency. I have written a few implementations for the Prometheus project, so I know how complex they are to write, especially the shutdown part. Another cool thing is that this executor can participate in already running transactions. So you don’t need to worry that work stays on the queue when the calling thread fails later on: the push is rolled back together with the rest of the transaction, so you get atomicity for free as well.
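To make the atomicity point a bit more concrete, here is a toy model in plain Java. It is only a sketch of the idea: ToyTransaction and all of its methods are hypothetical names made up for this illustration and are not the Multiverse API, and the class is not thread-safe. It buffers queue pushes and post-commit tasks (like the executePostCommit call in createWorkers) and only makes them visible when the transaction commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: hypothetical names, NOT the Multiverse API, and not thread-safe.
final class ToyTransaction {
    private final Queue<Runnable> sharedQueue;
    // Pushes made inside the transaction; invisible to others until commit.
    private final List<Runnable> pendingPushes = new ArrayList<>();
    // Side effects (such as starting threads) that may only run after a commit.
    private final List<Runnable> postCommitTasks = new ArrayList<>();

    ToyTransaction(Queue<Runnable> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    void push(Runnable task) {
        pendingPushes.add(task);
    }

    void executePostCommit(Runnable task) {
        postCommitTasks.add(task);
    }

    // Publish the buffered pushes, then run the deferred side effects.
    void commit() {
        sharedQueue.addAll(pendingPushes);
        pendingPushes.clear();
        for (Runnable task : postCommitTasks) {
            task.run();
        }
        postCommitTasks.clear();
    }

    // Drop everything: the shared queue never sees the work.
    void abort() {
        pendingPushes.clear();
        postCommitTasks.clear();
    }
}
```

If the caller fails and the transaction aborts, the shared queue stays empty and no worker thread is started. That is the behaviour the executor above gets from the STM without any extra code.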

There are still some issues, though. If you look at the work queue, there is a lot of contention on the head and tail; a queue simply has too strict an ordering behaviour. Some kind of pessimistic locking mechanism (perhaps a less extreme version of shared/exclusive locking, where you are still allowed to do a write if others have already obtained the lock) could help transactions find non-conflicting executions. This could be combined with something like striping.
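As a rough sketch of what striping could look like, here is a small example in plain java.util.concurrent rather than STM. StripedWorkQueue is a hypothetical class written for this post and is not part of Multiverse. The work is spread over several independent sub-queues, so concurrent producers and consumers rarely touch the same head or tail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of striping; not part of Multiverse.
final class StripedWorkQueue<T> {
    private final List<ConcurrentLinkedQueue<T>> stripes = new ArrayList<>();

    StripedWorkQueue(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        for (int i = 0; i < stripeCount; i++) {
            stripes.add(new ConcurrentLinkedQueue<>());
        }
    }

    // Producers pick a random stripe, so two pushes seldom hit the same tail.
    void push(T item) {
        int index = ThreadLocalRandom.current().nextInt(stripes.size());
        stripes.get(index).add(item);
    }

    // Consumers start at a random stripe and scan the rest.
    // Returns null if every stripe was empty when it was visited.
    T poll() {
        int start = ThreadLocalRandom.current().nextInt(stripes.size());
        for (int i = 0; i < stripes.size(); i++) {
            T item = stripes.get((start + i) % stripes.size()).poll();
            if (item != null) {
                return item;
            }
        }
        return null;
    }
}
```

The price is that global FIFO order is gone: items only stay ordered within a single stripe. For an executor that is usually acceptable, since the ordering of a plain queue is stricter than the tasks really need.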
