Blocking Stacks and Queues in an STM

I’m currently working on a Java-based STM implementation with deferred writes and object granularity. It relies heavily on multiversioning (just like Oracle) and less on pessimistic locking. This weekend I completed the STM version of condition variables, and I’m glad I got my Stack and Queue up and running inside a multithreaded producer/consumer example. Writing ‘concurrent’ code for STMs is a lot different from the traditional approach with locks, conditions and the like.

This is an example of the Stack that can be used safely inside an STM.

public class Stack<E> {

    private Node<E> head;

    public void push(E item) {
        if (item == null) throw new NullPointerException();
        head = new Node<E>(item, head);
    }

    public E peek() {
        if (head == null)
            return null;

        // Look at the top item without removing it.
        return head.value;
    }

    public E pop() {
        // Empty stack: abort the transaction and wait until the stack has changed.
        if (head == null)
            retry();

        return removeTopItem();
    }

    private E removeTopItem() {
        Node<E> oldHead = head;
        head = head.parent;
        return oldHead.value;
    }

    public int size() {
        return head == null ? 0 : head.size();
    }

    public boolean isEmpty() {
        return size() == 0;
    }

    public static class Node<E> {
        final E value;
        final Node<E> parent;

        Node(E value, Node<E> prev) {
            this.value = value;
            this.parent = prev;
        }

        int size() {
            if (parent == null)
                return 1;
            else
                return parent.size() + 1;
        }
    }
}

As you can see, there is no concurrency logic here apart from the retry call (which works like a condition variable). At the moment I need to instrument the classes manually so they can be part of the STM. After instrumentation the class looks like this:

public class Stack<E> implements Citizen {

    private Node<E> head;

    public void push(E item) {
        if (item == null) throw new NullPointerException();
        head = new Node<E>(item, head);
    }

    public E peek() {
        if (head == null)
            return null;

        // Look at the top item without removing it.
        return head.value;
    }

    public E pop() {
        // Empty stack: abort the transaction and wait until the stack has changed.
        if (head == null)
            retry();

        return removeTopItem();
    }

    private E removeTopItem() {
        Node<E> oldHead = head;
        head = head.parent;
        return oldHead.value;
    }

    public int size() {
        return head == null ? 0 : head.size();
    }

    public boolean isEmpty() {
        return size() == 0;
    }

    public static class Node<E> {
        final E value;
        final Node<E> parent;

        Node(E value, Node<E> prev) {
            this.value = value;
            this.parent = prev;
        }

        int size() {
            if (parent == null)
                return 1;
            else
                return parent.size() + 1;
        }
    }

    //================== generated  ======================

    private Node<E> head_initial;
    private long ptr;
    private MultiversionedStm.MultiversionedTransaction transaction;

    public Iterator<Citizen> ___findNewlyborns() {
        return EmptyIterator.INSTANCE;
    }

    public void ___onAttach(MultiversionedStm.MultiversionedTransaction transaction) {
        this.transaction = transaction;
    }

    public MultiversionedStm.MultiversionedTransaction ___getTransaction() {
        return transaction;
    }

    public long ___getPointer() {
        return ptr;
    }

    public void ___setPointer(long ptr) {
        this.ptr = ptr;
    }

    public HydratedStack ___hydrate() {
        return new HydratedStack(head);
    }

    public boolean ___isDirty() {
        return head != head_initial;
    }

    public static class HydratedStack implements HydratedCitizen {
        private final Node head;

        public HydratedStack(Node head) {
            this.head = head;
        }

        public Stack dehydrate(long ptr, MultiversionedStm.MultiversionedTransaction transaction) {
            Stack stack = new Stack();
            stack.head = head;
            stack.head_initial = head;
            stack.transaction = transaction;
            stack.ptr = ptr;
            return stack;
        }
    }
}

Based on this stack, it was easy to create a Queue that is quite concurrent: pushes and pops can execute concurrently in most situations (as long as the poppers see a non-empty readyToPopStack).

public class Queue<E> {

    private Stack<E> readyToPopStack = new Stack<E>();
    private Stack<E> pushedStack = new Stack<E>();

    public E pop() {
        if (!readyToPopStack.isEmpty())
            return readyToPopStack.pop();

        while (!pushedStack.isEmpty()) {
            E item = pushedStack.pop();
            readyToPopStack.push(item);
        }

        return readyToPopStack.pop();
    }

    public void push(E value) {
        pushedStack.push(value);
    }

    public boolean isEmpty() {
        return size() == 0;
    }

    public int size() {
        return readyToPopStack.size() + pushedStack.size();
    }
}
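
To see why two stacks give FIFO order, here is a minimal single-threaded sketch of the same idea outside the STM. It uses java.util.ArrayDeque as a stand-in for the Stack, and the names TwoStackQueue and TwoStackQueueDemo are made up for this illustration. Unlike the STM version, pop() returns null on an empty queue instead of calling retry().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Single-threaded sketch of the two-stack queue idea; no STM involved.
final class TwoStackQueue<E> {
    private final Deque<E> readyToPop = new ArrayDeque<>();
    private final Deque<E> pushed = new ArrayDeque<>();

    void push(E value) {
        pushed.push(value);
    }

    // Returns null when the queue is empty (assumption: the STM version would retry instead).
    E pop() {
        if (readyToPop.isEmpty()) {
            // Moving every item across reverses the order, so the oldest item ends up on top.
            while (!pushed.isEmpty()) {
                readyToPop.push(pushed.pop());
            }
        }
        return readyToPop.poll();
    }

    int size() {
        return readyToPop.size() + pushed.size();
    }
}

final class TwoStackQueueDemo {
    static List<Integer> run() {
        TwoStackQueue<Integer> queue = new TwoStackQueue<>();
        List<Integer> out = new ArrayList<>();
        queue.push(1);
        queue.push(2);
        queue.push(3);
        out.add(queue.pop()); // triggers the transfer between the stacks
        queue.push(4);        // lands on the push stack, behind 2 and 3
        out.add(queue.pop());
        out.add(queue.pop());
        out.add(queue.pop());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4]
    }
}
```

The transfer only happens when readyToPop is empty, which is why a push that arrives in between cannot overtake older items.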

As you can see, with STMs there is a lot less worrying about thread safety, and we can focus on the core logic again. I think that STMs could mean for Enterprise Java what OR-mappers meant for Java many years ago.

6 Responses to Blocking Stacks and Queues in an STM

  1. How does this ensure that the FIFO order of the queue is preserved in pop()? Does pop() run in a single transaction? If not, then you might get a new item pushed on to the pushedStack whilst the popping thread is only half-way through transferring the items to the readyToPopStack.

    If it *does* run in a single transaction, how do you handle the contention between a pop() call that needs to transfer from the pushedStack and a separate push()? Does your “retry()” call in the waiting thread block until the other thread has completed its transaction?

  2. pveentjer says:

    The pop of the queue runs inside a single transaction, so it can’t get out of order (it can’t observe changes made by other transactions because that would violate the isolation). If you look at the size method of the queue, within a transaction it always returns the correct number of items (it can’t happen that it shows changes made by other threads).

    How does it handle contention?
    If there is ‘contention’ (in my STM that would be a write conflict), one of the transactions is aborted and retried. The STM at the moment doesn’t rely on pessimistic locking, by the way; I use multiversioning in combination with optimistic locking. Pessimistic locking is planned (I’m going to use an approach similar to the one used in Oracle and other MVCC databases).
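
As a rough illustration of what a write conflict means here, the sketch below validates a transaction at commit time: the commit fails if any cell it wants to write was committed by someone else after the transaction started. ConflictHeap and its methods are hypothetical names, a global lock guards the commit, and none of this is the actual STM code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of commit-time write-conflict detection (first committer wins).
final class ConflictHeap {
    private final Map<Long, Object> values = new HashMap<>();
    private final Map<Long, Long> versions = new HashMap<>(); // address -> version of last commit
    private long clock = 0;

    // The version a newly started transaction would observe.
    synchronized long currentVersion() {
        return clock;
    }

    synchronized Object read(long address) {
        return values.get(address);
    }

    // Applies the deferred writes atomically, or returns false on a write conflict.
    synchronized boolean commit(long startVersion, Map<Long, Object> writes) {
        for (Long address : writes.keySet()) {
            long lastCommit = versions.getOrDefault(address, 0L);
            if (lastCommit > startVersion) {
                return false; // another transaction committed this cell after we started
            }
        }
        clock++;
        for (Map.Entry<Long, Object> entry : writes.entrySet()) {
            values.put(entry.getKey(), entry.getValue());
            versions.put(entry.getKey(), clock);
        }
        return true;
    }
}

final class ConflictDemo {
    public static void main(String[] args) {
        ConflictHeap heap = new ConflictHeap();
        long t1 = heap.currentVersion();
        long t2 = heap.currentVersion();

        Map<Long, Object> w1 = new HashMap<>();
        w1.put(1L, "from t1");
        Map<Long, Object> w2 = new HashMap<>();
        w2.put(1L, "from t2");

        System.out.println(heap.commit(t1, w1));                    // true
        System.out.println(heap.commit(t2, w2));                    // false: write conflict
        System.out.println(heap.commit(heap.currentVersion(), w2)); // true: retried on fresh state
    }
}
```

The failed commit leaves the heap untouched, so the losing transaction can simply be started again against the newer version.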

    The retry method throws a RetryException and the whole transaction is aborted. Based on the version of the transaction and the cells it has read, a new transaction is started only once one of those cells has received an update, so the new transaction is likely to make more progress.

    This is the TransactionTemplate:

    import java.util.concurrent.atomic.AtomicInteger;

    public abstract class TransactionTemplate {

        private final Stm stm;
        private final AtomicInteger retryCount = new AtomicInteger();

        protected TransactionTemplate(Stm stm) {
            if (stm == null) throw new NullPointerException();
            this.stm = stm;
        }

        public long getRetryCount() {
            return retryCount.longValue();
        }

        public Stm getStm() {
            return stm;
        }

        abstract protected Object execute(Transaction t) throws Exception;

        public final Object execute() {
            try {
                boolean success = false;
                long[] addresses = null;
                long version = -1;
                Object result = null;
                do {
                    // After a retry, wait until one of the previously read addresses has changed.
                    Transaction transaction = addresses == null
                            ? stm.startTransaction()
                            : stm.startTransaction(addresses, version);
                    try {
                        version = -1;
                        addresses = null;
                        result = execute(transaction);
                        transaction.commit();
                        success = true;
                    } catch (RetryException ex) {
                        System.out.println(Thread.currentThread() + " retried");
                        transaction.abort();
                        addresses = transaction.getReadAddresses();
                        version = transaction.getVersion();
                    } catch (AbortedException ex) {
                        System.out.println(Thread.currentThread() + " aborted");
                        transaction.abort();
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                } while (!success);

                return result;
            } catch (InterruptedException ex) {
                // Restore the interrupt flag rather than clearing it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
    }

    Here you can see the waiting functionality:
    public MultiversionedTransaction startTransaction(long[] addresses, long version) throws InterruptedException {
        // Block until one of the given addresses has received an update.
        Latch latch = heap.listen(addresses, version);
        latch.await();
        return startTransaction();
    }
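
One way such a listen call could work is sketched below: the heap records a version per address and hands out a latch that opens as soon as one of the watched addresses is written with a newer version. This is an assumption about the mechanism, not the real heap; ListeningHeap is a made-up name and java.util.concurrent.CountDownLatch stands in for the Latch type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of "wait until one of the cells I read has changed".
final class ListeningHeap {
    private static final class Waiter {
        final long[] addresses;
        final CountDownLatch latch = new CountDownLatch(1);

        Waiter(long[] addresses) {
            this.addresses = addresses;
        }

        boolean watches(long address) {
            for (long a : addresses) {
                if (a == address) return true;
            }
            return false;
        }
    }

    private final Map<Long, Long> versions = new HashMap<>(); // address -> version of last write
    private final List<Waiter> waiters = new ArrayList<>();
    private long clock = 0;

    // Returns a latch that is open once any address has a version newer than 'version'.
    synchronized CountDownLatch listen(long[] addresses, long version) {
        Waiter waiter = new Waiter(addresses.clone());
        for (long address : addresses) {
            if (versions.getOrDefault(address, 0L) > version) {
                waiter.latch.countDown(); // an update already happened, no need to wait
                return waiter.latch;
            }
        }
        waiters.add(waiter);
        return waiter.latch;
    }

    // Records a write and wakes every waiter that watches this address.
    synchronized void write(long address) {
        versions.put(address, ++clock);
        for (Iterator<Waiter> it = waiters.iterator(); it.hasNext(); ) {
            Waiter waiter = it.next();
            if (waiter.watches(address)) {
                waiter.latch.countDown();
                it.remove();
            }
        }
    }
}

final class ListenDemo {
    public static void main(String[] args) throws InterruptedException {
        ListeningHeap heap = new ListeningHeap();
        CountDownLatch latch = heap.listen(new long[] {1L}, 0);
        Thread writer = new Thread(() -> heap.write(1L));
        writer.start();
        latch.await(); // blocks until the writer has touched address 1
        writer.join();
        System.out.println("woken up after a write to address 1");
    }
}
```

Checking for an already-newer version inside listen matters: without it, a write that lands between the abort and the listen call would be missed and the waiter could sleep forever.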

  3. OK, that makes sense. So, in this case if a pop() is in progress (and transferring items from one stack to the other), a push() would discover that there was a write conflict when it tried to update the pushedStack. It would then wait (in heap.listen) until there was a new version in pushedStack.head, which requires the pop() transaction to complete. Right?

    My concern with such a scheme is that if the pop() calls are too infrequent then the transfer of items from pushedStack to readyToPopStack will take a long time and thus block both push() threads and pop() threads in the meantime. Maybe that doesn’t matter in practice.

  4. Raoul Duke says:

    i’m very excited about alternative approaches to dealing with concurrency. but some folks apparently worry that maybe it will just lead to lots of hard to debug and fix contention?

  5. pveentjer says:

    @Anthony Williams
    answer to your question: That is correct

    Your concern: it could be a problem. To be honest, I don’t have enough experience to see if that is a problem. The main goal behind the STM is to gain more insights and to get such questions answered.

    @ Raoul:
    Hi Raoul, it could be a problem. But on the other hand, the approach I’m using (MVCC) has been used in databases like Oracle for quite a long time. STMs are just ACID databases without the D 🙂

    At the moment I’m creating a new implementation (once you have done the first, the second one is a lot easier) where the ‘stop the world’ lock is removed. The first implementation requires such a lock for committing changes to the heap and checking for write conflicts. The new implementation uses a non-blocking approach where each transaction builds its own new view of reality and tries to set that as the active view. I do a lot with immutable objects, so it isn’t as expensive as it sounds.
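
The idea of each transaction building its own view and trying to install it can be sketched with a single compare-and-set on an immutable snapshot. SnapshotStm is a hypothetical name, and copying the whole map on every commit is a simplification; a real implementation would share structure between snapshots and would not have to abort on every concurrent commit.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of lock-free commits: publish a new immutable view with one CAS.
final class SnapshotStm {
    // Immutable view of the whole heap at one version.
    static final class Snapshot {
        final long version;
        final Map<Long, Object> cells;

        Snapshot(long version, Map<Long, Object> cells) {
            this.version = version;
            this.cells = Collections.unmodifiableMap(cells);
        }
    }

    private final AtomicReference<Snapshot> active =
            new AtomicReference<>(new Snapshot(0, new HashMap<Long, Object>()));

    // The view a new transaction starts from.
    Snapshot current() {
        return active.get();
    }

    // Builds a private successor of 'base' and tries to make it the active view.
    // Fails if another transaction published a new view first.
    boolean tryCommit(Snapshot base, Map<Long, Object> writes) {
        Map<Long, Object> next = new HashMap<>(base.cells);
        next.putAll(writes);
        return active.compareAndSet(base, new Snapshot(base.version + 1, next));
    }
}

final class SnapshotDemo {
    public static void main(String[] args) {
        SnapshotStm stm = new SnapshotStm();
        SnapshotStm.Snapshot base = stm.current();

        System.out.println(stm.tryCommit(base, Collections.singletonMap(1L, (Object) "a"))); // true
        System.out.println(stm.tryCommit(base, Collections.singletonMap(2L, (Object) "b"))); // false
        System.out.println(stm.current().version);                                           // 1
    }
}
```

Because a published snapshot never changes, a transaction that still holds an older one keeps reading a consistent state no matter how many commits happen after it started.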

    The new implementation also piggybacks on the JVM’s garbage collector instead of requiring a hand-written one. All dehydrated objects maintain references to other dehydrated objects, and all dehydrated root objects are part of the current snapshot (which is stored in the STM). Objects that can only be reached from older snapshots will be removed because the snapshot itself will be removed. Each transaction gets a snapshot when it starts, so transactions keep older dehydrated objects alive. Once the transaction is garbage collected, the snapshot can be garbage collected, and therefore so can the dehydrated objects. It leads to increased memory usage, but I hope also to increased concurrency. It is a nice experiment.

  6. Guy Korland says:

    Hi,

    I think you might want to try DeuceSTM.
    This project hasn’t been publicly released yet but already has a few users and developers.
    Currently, it provides two leading STM algorithms (TL2 & LSA).
    But the main thing here is that it provides an easy way for other algorithms to be implemented.
    All you have to do is extend one context class and you get bytecode manipulation for free, which weaves the code with your algorithm.

    You can contact me for more information gkorland *at* gmail *dot* com
