Multiverse: STM for Scala? Part I

August 31, 2009

Multiverse is a Java-based STM implementation, and last week I played with integrating it into Scala. Since I have no practical experience with Scala, and the Java language itself is not going anywhere (any more), it sounded like a nice opportunity to get started with Scala.

Multiverse supports two models for creating atomic objects (objects managed by the STM) in the Java language. The simplest approach is adding an @AtomicObject annotation to a POJO:

@AtomicObject
class Person{
   private int age;
   public int getAge(){return age;}
   public void setAge(int newAge){this.age = newAge;}
}

All changes made to a Person are atomic and isolated, and the cool thing is that the get/setAge methods can participate in larger transactions (this solves the composability problem that lock-based approaches suffer from). Using an @AtomicObject annotation is simple because you can still write normal POJOs.

The second model is using explicit refs (similar to Clojure refs):

class Person{
   private final Ref<Integer> age = new Ref<Integer>();
   public int getAge(){return age.get();}
   public void setAge(int newAge){age.set(newAge);}
}

Multiverse doesn’t care which model is used. The first has a nicer syntax and will probably perform better when the POJO has more than one field (although it could also lead to false write conflicts, because writes to different fields of the same object are treated as conflicting). But the second model is very easy to integrate with Scala and doesn’t rely on instrumentation. In the future the first model is going to be supported in Scala as well, but first I need a better understanding of how Scala is compiled to bytecode.
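To make the false write-conflict remark concrete, here is a toy model in plain Java. It is not Multiverse internals; the class name and the "object.field" naming of writes are hypothetical. It assumes, as the remark implies, that an @AtomicObject is one unit of conflict detection as a whole, while each Ref is its own unit. Two transactions that write different fields of the same Person then conflict under the first model but not under the second.

```java
import java.util.Set;

// Toy model (hypothetical, not Multiverse internals): a write is named
// "object.field", e.g. "person1.age". Two concurrent transactions conflict
// when their write sets overlap at the chosen granularity.
public final class ConflictGranularity {

    // Per-object granularity keeps only the object part ("person1");
    // per-field granularity keeps the whole name ("person1.age").
    static String unit(String write, boolean perObject) {
        return perObject ? write.substring(0, write.indexOf('.')) : write;
    }

    public static boolean conflicts(Set<String> writesA, Set<String> writesB, boolean perObject) {
        for (String a : writesA) {
            for (String b : writesB) {
                if (unit(a, perObject).equals(unit(b, perObject))) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Writing person1.age in one transaction and person1.name in another is a conflict at object granularity (the @AtomicObject model) but not at field granularity (one Ref per field).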

One of the cool things about Scala is that it is very easy to add new language constructs using closures. If we look at the atomic functionality (wrapping a transaction around a set of operations) in Java, we need to write a lot of verbose code:

int sum = new AtomicTemplate<Integer>(){
    public Integer execute(Transaction t){
        return person1.getAge()+person2.getAge();
    }
}.execute();

But in Scala you can say:

val sum = atomic{person1.age+person2.age}

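Using the following function, which takes the body as a by-name parameter (so it is evaluated inside the transaction instead of before the call):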

 def atomic[E](body: => E): E = {
    new AtomicTemplate[E] {
      def execute(t: Transaction) = body
    }.execute()
  }

Isn’t that cool?
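For comparison, the same template-plus-closure shape can be sketched in Java 8 and later, where a lambda plays the role of Scala’s by-name parameter. This is a toy under loud assumptions: ToyAtomicTemplate is hypothetical and is not Multiverse’s AtomicTemplate, and a single global lock stands in for a real transaction, so it gives atomicity and isolation without any of the concurrency an STM offers.

```java
import java.util.function.Supplier;

// Hypothetical toy, not Multiverse: one global lock stands in for a transaction.
public abstract class ToyAtomicTemplate<E> {
    private static final Object GLOBAL_LOCK = new Object();

    // The work that should run "inside the transaction"
    // (the role of the overridden method in the Java snippet above).
    protected abstract E body();

    // Template method: wraps body() in the toy transaction. The lock is
    // reentrant, so nested atomic calls simply run inside the outer one.
    public final E execute() {
        synchronized (GLOBAL_LOCK) {
            return body();
        }
    }

    // Closure-style helper: a Java 8+ counterpart of the Scala atomic{...}.
    public static <T> T atomic(Supplier<T> work) {
        return new ToyAtomicTemplate<T>() {
            @Override
            protected T body() {
                return work.get();
            }
        }.execute();
    }
}
```

With it, the sum becomes `int sum = ToyAtomicTemplate.atomic(() -> person1.getAge() + person2.getAge());`, which is close to the Scala version in length.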

In the following blog posts I’ll explain the support for the retry and orElse mechanisms.


Spring AOP: Too complex for day to day use.

August 17, 2009

I have done quite a lot with Spring and AOP in the past (I have been a long-time Spring supporter), but every time I need it in a project and look at the reference documentation, I find it more confusing and too complex for day-to-day use (especially if not every team member is a Spring expert). It is not because I don’t understand AOP:
– I have used it for quite some time
– I do quite a lot of bytecode manipulation on the Multiverse project.

What is wrong with Spring AOP?
– everything can be combined with everything in all kinds of different ways (too many permutations).
– there are 10,000 ways to configure it, and the configuration can be spread all over the place (including in the source).
– the Spring documentation (which is very detailed) is not written to get you up and running fast (there is no complete, working copy/paste hello world example).
– AspectJ bytecode weaving causes problems: load-time weaving involves the administrators and can influence other applications on the same JVM, while compile-time weaving in combination with Maven also causes problems because the bytecode is modified before the unit tests are executed.

This doesn’t mean that Spring AOP is bad from a technology point of view, but it certainly is bad from a productivity point of view. I love solving complex problems, but I don’t like solving problems that are needlessly complex. And Spring AOP, IMHO, is a good example of Spring becoming the problem it tried to solve: needless complexity.


Writing an Executor using STM

August 9, 2009

I’m working on the 0.3 release of Multiverse, a Java-based STM implementation, and I’m creating replicas of concurrent data structures to better understand how it should evolve. The Java Executor is a different kind of data structure than the standard queues and stacks (which are already available), because it also needs to manage threads.

That is why I decided to create an example STM-based implementation of the Executor interface. It creates threads lazily and has a dynamic pool size.

@AtomicObject
public class StmExecutor implements Executor {

    private final DoubleLinkedList<WorkerThread> workerThreads;
    private final DoubleLinkedQueue<Runnable> workQueue;
    private StmExecutorState state;
    private int poolSize;

    public StmExecutor(int poolSize, int maxCapacity) {
        if (poolSize < 0) {
            throw new IllegalArgumentException();
        }

        this.workQueue = new DoubleLinkedQueue(maxCapacity);
        this.state = StmExecutorState.started;
        this.workerThreads = new DoubleLinkedList();
        this.poolSize = poolSize;
    }

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }

        if (state != StmExecutorState.started) {
            throw new RejectedExecutionException();
        }

        workQueue.push(task);

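        // lazily add a worker: when there is none yet (otherwise the first task
        // would never run), or when tasks are starting to queue up
        if (workerThreads.size() < poolSize && (workerThreads.isEmpty() || workQueue.size() > 1)) {
            createWorkers(1);
        }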
    }

    public int getPoolSize() {
        return poolSize;
    }

    public int getActualPoolsize() {
        return workerThreads.size();
    }

    public boolean isShutdown() {
        return state == StmExecutorState.shutdown;
    }

    public void setPoolSize(int newPoolSize) {
        if (state != StmExecutorState.started) {
            throw new IllegalStateException();
        }

        createWorkers(newPoolSize - poolSize);
        this.poolSize = newPoolSize;
    }

    private void createWorkers(int workerCount) {
        if (workerCount > 0) {
            DoubleLinkedList<WorkerThread> newWorkers = new DoubleLinkedList<WorkerThread>();
            for (int k = 0; k < workerCount; k++) {
                newWorkers.add(new WorkerThread(this));
            }
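            workerThreads.addAll(newWorkers);
            // the threads are only started after the transaction commits, so an
            // aborted transaction never leaves running threads behind
            executePostCommit(new StartWorkerThreadsTask(newWorkers));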
        }
    }

    public void shutdown() {
        if (workerThreads.isEmpty()) {
            state = StmExecutorState.shutdown;
        } else {
            state = StmExecutorState.shutdownInProgress;
        }
    }

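    public void awaitShutdown() {
        // retry() blocks this transaction until something it read (here: state)
        // changes; like executePostCommit, it is a statically imported Multiverse
        // utility (imports are omitted in this listing)
        if (!isShutdown()) {
            retry();
        }
    }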

    //needed to overcome an instrumentation issue
    private StmExecutorState getState() {
        return state;
    }

    //needed to overcome an instrumentation issue
    private void setState(StmExecutorState state) {
        this.state = state;
    }

    //needed to overcome an instrumentation issue
    private DoubleLinkedList getWorkerThreads() {
        return workerThreads;
    }

    //needs to be a static inner class for now.
    static class WorkerThread extends Thread {
        private final StmExecutor executor;

        public WorkerThread(StmExecutor executor) {
            setName("Worker");
            this.executor = executor;
        }

        public void run() {
            boolean again;

            do {
                try {
                    again = runTask();
                } catch (Throwable ex) {
                    ex.printStackTrace();
                    again = true;
                }
            } while (again);
        }

        @AtomicMethod
        private boolean runTask() {
            if (isShuttingDown() || isTooMany()) {
                executor.getWorkerThreads().remove(this);

                if (isShuttingDown() && executor.getWorkerThreads().isEmpty()) {
                    executor.setState(StmExecutorState.shutdown);
                }
                return false;
            }

            Runnable task = executor.workQueue.take();

            task.run();
            return true;
        }

        private boolean isTooMany() {
            return executor.getActualPoolsize() > executor.getPoolSize();
        }

        private boolean isShuttingDown() {
            return executor.getState() == StmExecutorState.shutdownInProgress;
        }
    }

    static class StartWorkerThreadsTask implements Runnable {

        final DoubleLinkedList<WorkerThread> workerList;

        StartWorkerThreadsTask(DoubleLinkedList<WorkerThread> workerList) {
            this.workerList = workerList;
        }

        @Override
        public void run() {
            for (WorkerThread thread : workerList) {
                thread.start();
            }
        }
    }
}

enum StmExecutorState {
    started, shutdownInProgress, shutdown
}

As you can see, it is quite easy, and it certainly is a lot better than writing one using old-school concurrency. I have written a few implementations for the Prometheus project, so I know how complex they are to write, especially the shutdown part. Another cool thing is that this executor can participate in already running transactions. So you don’t need to worry that work was placed on the queue even though the calling thread later fails: the push is undone together with the rest of the transaction, so you also get atomicity for free.
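The atomicity argument relies on side effects being deferred: in createWorkers the threads are only started through executePostCommit. A minimal sketch of that deferral, assuming (as the name suggests) that executePostCommit runs its task only after a successful commit, and using a hypothetical ToyTransaction class that is not Multiverse’s API and only tracks post-commit actions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy transaction, not Multiverse's API: it only shows why side
// effects such as Thread.start() are deferred until the transaction commits.
public final class ToyTransaction {
    private final List<Runnable> postCommitTasks = new ArrayList<>();
    private boolean finished;

    // Registers an action that must only happen if the transaction commits.
    public void executePostCommit(Runnable task) {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
        postCommitTasks.add(task);
    }

    // On commit, the deferred actions run in registration order.
    public void commit() {
        finish();
        for (Runnable task : postCommitTasks) {
            task.run();
        }
        postCommitTasks.clear();
    }

    // On abort, the deferred actions are discarded: nothing observable happened.
    public void abort() {
        finish();
        postCommitTasks.clear();
    }

    private void finish() {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
        finished = true;
    }
}
```

If the caller’s transaction aborts after execute() pushed a task and created a worker, the worker is simply never started; starting it eagerly would leak a live thread that no rollback can undo.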

There are still some issues, though. If you look at the work queue, there is a lot of contention on the head and tail: a queue simply has too strict an ordering. So some kind of pessimistic locking mechanism (perhaps a less extreme version than shared/exclusive locking, where you are still allowed to write even if others have already obtained the lock) could help transactions find non-conflicting executions. This could be combined with something like striping.
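The striping idea can be illustrated outside the STM: instead of one queue with a single hot head and tail, items are spread over several sub-queues, so concurrent pushes and takes mostly touch different stripes. This is a minimal plain-Java sketch with one lock per stripe, not Multiverse code; StripedQueue and its methods are hypothetical. Note what it gives up: there is no longer one global FIFO order, which is exactly the strict ordering identified above as the source of contention.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical striped queue: plain Java with one lock per stripe, not STM code.
public final class StripedQueue<E> {
    private final ArrayDeque<E>[] stripes;
    // Round-robin counter that spreads pushes over the stripes.
    private final AtomicInteger nextStripe = new AtomicInteger();

    @SuppressWarnings("unchecked")
    public StripedQueue(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = (ArrayDeque<E>[]) new ArrayDeque[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ArrayDeque<E>();
        }
    }

    // Appends to the next stripe; concurrent pushes usually take different locks.
    // Null items are rejected by ArrayDeque.
    public void push(E item) {
        ArrayDeque<E> stripe = stripes[Math.floorMod(nextStripe.getAndIncrement(), stripes.length)];
        synchronized (stripe) {
            stripe.addLast(item);
        }
    }

    // Takes from the preferred stripe first, then scans the others.
    // Returns null when every stripe is empty. Ordering is FIFO per stripe only.
    public E poll(int preferredStripe) {
        for (int i = 0; i < stripes.length; i++) {
            ArrayDeque<E> stripe = stripes[Math.floorMod(preferredStripe + i, stripes.length)];
            synchronized (stripe) {
                E item = stripe.pollFirst();
                if (item != null) {
                    return item;
                }
            }
        }
        return null;
    }
}
```

A worker thread could pass its own index as preferredStripe, so that different workers mostly drain different stripes and only fall back to the others when their own stripe is empty.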


Remapping a method with ASM

August 2, 2009

For the last few weeks I have been working at home on Multiverse, a Java-based STM implementation (it’s summer vacation). I use ASM-based instrumentation to transform POJOs (with some additional annotations) so that certain interfaces and method implementations are added.

With Multiverse 0.2, I did all the method generation by hand (manually written bytecode), which is a very time-consuming and error-prone task. That is why I came up with a different idea for 0.3: make an abstract class that contains (most of) the implementation, and copy the code from that class into the instrumented class (essentially, the abstract class becomes a mixin). Copying the methods and fields, instead of making the mixin a superclass, avoids imposing limitations on the class hierarchy. Luckily, ASM has functionality for this: the RemappingMethodAdapter. The problem is that it is designed for ASM’s visitor API and not the Tree API, and I’m using the latter.

So I wrote a function that iterates over the bytecode and transforms it. The downside is that this means more code to maintain and test. Rémi Forax of the ASM discussion group suggested that the RemappingMethodAdapter can be used with the Tree API, because MethodNode has an accept method.

To make a long story short, this is the code I’m using to remap a method from one class to another. I hope it helps other people struggling with the same problem:

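import org.objectweb.asm.commons.Remapper;
import org.objectweb.asm.commons.RemappingMethodAdapter;
import org.objectweb.asm.tree.MethodNode;

// Creates a copy of originalMethod in which all type references are
// translated by the remapper (e.g. from the mixin class to the target class).
public static MethodNode remap(MethodNode originalMethod, Remapper remapper) {
    String[] exceptions = getExceptions(originalMethod);

    MethodNode mappedMethod = new MethodNode(
            originalMethod.access,
            originalMethod.name,
            remapper.mapMethodDesc(originalMethod.desc),
            remapper.mapSignature(originalMethod.signature, false),
            remapper.mapTypes(exceptions));

    // The adapter remaps the type references in every instruction it visits
    // and forwards the result to mappedMethod.
    RemappingMethodAdapter remapVisitor = new RemappingMethodAdapter(
            mappedMethod.access,
            mappedMethod.desc,
            mappedMethod,
            remapper);
    // MethodNode.accept replays the Tree API method as visitor events.
    originalMethod.accept(remapVisitor);
    return mappedMethod;
}

// Converts the (possibly null) exceptions list of a MethodNode to an array.
public static String[] getExceptions(MethodNode originalMethod) {
    if (originalMethod.exceptions == null) {
        return new String[]{};
    }

    String[] exceptions = new String[originalMethod.exceptions.size()];
    originalMethod.exceptions.toArray(exceptions);
    return exceptions;
}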
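To see what remapper.mapMethodDesc contributes in remap, here is a toy, dependency-free version of method descriptor remapping. It is not ASM’s implementation, and the class name and the org/example internal names in the usage are hypothetical. Every object type `L<internalName>;` found in the mapping is replaced; everything else (primitive type codes, array prefixes, parentheses) is copied unchanged.

```java
import java.util.Map;

// Toy descriptor remapper (hypothetical, not ASM code). It handles method and
// field descriptors only, not generic signatures.
public final class DescriptorRemapDemo {

    public static String mapDesc(String desc, Map<String, String> mapping) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < desc.length()) {
            char c = desc.charAt(i);
            if (c == 'L') {
                // Object type: 'L' + internal name + ';'
                int end = desc.indexOf(';', i);
                if (end < 0) {
                    throw new IllegalArgumentException("malformed descriptor: " + desc);
                }
                String internalName = desc.substring(i + 1, end);
                out.append('L').append(mapping.getOrDefault(internalName, internalName)).append(';');
                i = end + 1;
            } else {
                // Primitive type code, '[' array prefix, '(' or ')': copied unchanged
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }
}
```

With a mapping from org/example/PersonMixin to org/example/Person, the descriptor `(Lorg/example/PersonMixin;I)Ljava/lang/String;` becomes `(Lorg/example/Person;I)Ljava/lang/String;`: a method copied from the mixin now takes the target type. ASM’s Remapper does the same for descriptors, signatures and the type operands of instructions.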