Daniel Tisdall


concurrency | correctness | sw eng

Large-scale Incremental Processing Using Distributed Transactions and Notifications a.k.a. Percolator

Apparently this paper made a splash in the transaction processing world. Google presents Percolator, a system that provides random concurrent access to a multi-petabyte repository while maintaining invariants. The paper is from 2010, so it might not be state of the art anymore.

Percolator requires a storage layer beneath it (Bigtable, in the paper). The layer stores values under tuple-shaped keys, each made from a row, a column and a timestamp, and it provides atomic read/write access to single rows. Additionally, Percolator requires a global timestamp oracle that hands out strictly increasing timestamps.
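
To make those two requirements concrete, here is a minimal Python sketch of such a base layer. The names `TimestampOracle`, `BaseLayer`, `get_timestamp` and `mutate_row` are my own for illustration and are not taken from the paper or from any real storage API; a single global mutex stands in for per-row atomicity.

```python
import itertools
import threading


class TimestampOracle:
    """Hands out strictly increasing integer timestamps (hypothetical helper)."""

    def __init__(self):
        self._counter = itertools.count(1)
        self._mutex = threading.Lock()

    def get_timestamp(self):
        with self._mutex:
            return next(self._counter)


class BaseLayer:
    """Toy stand-in for the storage layer: cells keyed by (row, column, timestamp)."""

    def __init__(self):
        self._cells = {}
        # One global mutex for simplicity; a real store would only serialize per row.
        self._mutex = threading.Lock()

    def read(self, row, column, timestamp):
        with self._mutex:
            return self._cells.get((row, column, timestamp))

    def mutate_row(self, row, check, updates):
        """Atomically apply `updates` to one row, but only if `check(row_view)` holds.

        `row_view` and `updates` both map (column, timestamp) to a value.
        An update value of None deletes the cell.
        """
        with self._mutex:
            row_view = {(c, ts): v for (r, c, ts), v in self._cells.items() if r == row}
            if not check(row_view):
                return False
            for (column, timestamp), value in updates.items():
                if value is None:
                    self._cells.pop((row, column, timestamp), None)
                else:
                    self._cells[(row, column, timestamp)] = value
            return True


oracle = TimestampOracle()
store = BaseLayer()
ts = oracle.get_timestamp()
store.mutate_row("item0", lambda row: True, {("data", ts): 7})
```

The check-then-update shape of `mutate_row` is the only atomicity the rest of the algorithm relies on: everything that spans more than one row has to be built on top of it.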

Given a reliable base layer, Percolator provides atomic multi-row transactions with snapshot isolation, in the presence of client failures.

Definition reminder

Snapshot Isolation: each transaction reads from a snapshot of the data taken at a start timestamp. The start timestamp is taken between the start of the transaction and the transaction’s first read. A transaction running in snapshot isolation is never blocked while reading. Write-write conflicts are not allowed: if two concurrent transactions write the same item, at least one of them aborts.
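
As a small illustration of the definition, the sketch below models one item as a dict from commit timestamp to value. The function names are hypothetical, and I assume timestamps are unique, so a version is never committed exactly at another transaction's start timestamp.

```python
def snapshot_read(versions, start_ts):
    """Return the value of the newest version committed at or before start_ts."""
    visible = [ts for ts in versions if ts <= start_ts]
    if not visible:
        return None  # nothing had been committed when the snapshot was taken
    return versions[max(visible)]


def has_write_conflict(versions, start_ts):
    """True if some other transaction committed a version after our snapshot."""
    return any(ts > start_ts for ts in versions)


versions = {5: "a", 9: "b"}          # commit timestamp -> value
print(snapshot_read(versions, 7))     # a transaction that started at time 7
print(has_write_conflict(versions, 7))
```

A transaction that started at time 7 keeps reading the version committed at time 5, and it must abort if it also tries to write this item, because a newer version was committed at time 9.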

Algorithm

The algorithm has a two-phase commit structure. Writers non-atomically lock all the items they are interested in, starting with a primary lock. Each lock is associated with data. Once all locks have been acquired, the writer atomically replaces the primary lock with a pointer to the corresponding data. After the atomic replacement, the write is committed. The transaction completes by roll-forward: deleting the remaining non-primary locks and replacing them with pointers to the corresponding data.
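
The steps above can be sketched in Python roughly as follows. This is a single-threaded toy under my own assumptions, not the paper's implementation: the `Store` class and function names are hypothetical, each function body is assumed to run as one atomic single-row operation, and the caller supplies the start and commit timestamps.

```python
class Store:
    """Three columns keyed by (item, timestamp), named after the spec's variables."""

    def __init__(self):
        self.data = {}   # (item, start_ts)  -> staged or committed value
        self.lock = {}   # (item, start_ts)  -> primary item of the owning transaction
        self.write = {}  # (item, commit_ts) -> start_ts of the data it makes visible


def prewrite(store, item, value, primary, start_ts):
    """Phase one for a single item: lock it and stage the new value."""
    locked = any(i == item for (i, _) in store.lock)
    newer_write = any(i == item and ts >= start_ts for (i, ts) in store.write)
    if locked or newer_write:
        return False  # write-write conflict: the caller must abort
    store.data[(item, start_ts)] = value
    store.lock[(item, start_ts)] = primary
    return True


def commit_primary(store, primary, start_ts, commit_ts):
    """Commit point: swap the primary lock for a write pointer."""
    if (primary, start_ts) not in store.lock:
        return False  # the lock is gone, so someone rolled this transaction back
    del store.lock[(primary, start_ts)]
    store.write[(primary, commit_ts)] = start_ts
    return True


def run_transaction(store, writes, primary, start_ts, commit_ts):
    """Run both phases for `writes` (item -> value); return True if committed."""
    items = [primary] + [i for i in writes if i != primary]
    for item in items:
        if not prewrite(store, item, writes[item], primary, start_ts):
            for staged in items:  # abort: drop whatever was staged so far
                store.lock.pop((staged, start_ts), None)
                store.data.pop((staged, start_ts), None)
            return False
    if not commit_primary(store, primary, start_ts, commit_ts):
        return False
    for item in items[1:]:  # roll forward the non-primary items
        if store.lock.pop((item, start_ts), None) is not None:
            store.write[(item, commit_ts)] = start_ts
    return True


store = Store()
run_transaction(store, {"a": 1, "b": 2}, primary="a", start_ts=2, commit_ts=4)
```

The only step that decides the outcome is `commit_primary`. Everything after it is cleanup that any other process could perform on the writer's behalf.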

The algorithm is made more interesting by the multi-versioning. Each item has multiple versions. When an item is locked, the associated data is written to a new version, but concurrent readers will continue to read the old version until the write transaction commits and a pointer to the new version is made visible.

The diagram shows a state with three items. Some transactions have committed, writing pointers in the write column pointing into the data column. However, there is an uncompleted (but committed) transaction. This is evident from the lock pointer at (item 2, time 42) which points to an (absent) lock at (item 0, time 42). A pointer to an absent lock indicates that the transaction either committed or was rolled back. If there is data associated with the absent lock then the transaction committed, otherwise it rolled back. In this case, there is associated data at (item 0, time 42), so the transaction committed.
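
The decision rule in that paragraph can be written down as a short Python sketch. It follows the rule as stated here (data present at the primary means committed), uses plain dicts keyed by (item, time) for the three columns, and the function name and return labels are my own. The commit time 43 in the example is made up for illustration.

```python
def resolve_secondary(data, lock, write, item, start_ts):
    """Clean up a non-primary lock found at (item, start_ts).

    Returns "pending", "rolled_forward" or "rolled_back".
    """
    primary = lock[(item, start_ts)]
    if (primary, start_ts) in lock:
        return "pending"  # primary lock still present: the outcome is undecided
    if (primary, start_ts) in data:
        # Committed: copy the commit time from the primary's write pointer.
        commit_ts = next(
            (ts for (i, ts), start in write.items() if i == primary and start == start_ts),
            None,
        )
        if commit_ts is None:
            raise ValueError("primary has data but no write pointer")
        del lock[(item, start_ts)]
        write[(item, commit_ts)] = start_ts
        return "rolled_forward"
    # Rolled back: remove the lock and the staged data.
    del lock[(item, start_ts)]
    data.pop((item, start_ts), None)
    return "rolled_back"


# The state described above: item 2 holds a lock pointing at item 0, whose lock is gone.
data = {("item0", 42): "x", ("item2", 42): "y"}
lock = {("item2", 42): "item0"}
write = {("item0", 43): 42}
print(resolve_secondary(data, lock, write, "item2", 42))
```

After the call, item 2 has a write pointer at the same commit time as item 0, which is exactly the state the crashed writer would have produced had it finished.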

Rollback and rollforward

The algorithm is cooperative in the sense that readers roll crashed writes forward or backward. There’s no way for a reader to definitively know whether a writer has crashed, so it is always correct for a reader to cooperate. From a modeling point of view, it’s then simpler to only model this cooperation, and not have the writers do any cleanup work at all: that is, shift the work entirely to the readers.

Property checking

Properties like snapshot isolation aren’t trivial to model check [5], and doing so for this model would probably require writing at least as much TLA+ again as is already written. Right now I’d rather move on to learning more algorithms. I would like to revisit this at some point and check a simpler property like transaction atomicity.

Code

VARIABLES
    \* @type: TIME;
    time,
    \* @type: (IID, TIME) => Int; Value
    data,
    \* @type: (IID, TIME) => IID; Points to IID of primary or 0 for Null
    lock,
    \* @type: (IID, TIME) => TIME; Points to time of data, or 0 for Null
    write,
    \* @type: Set(WRITE_TRANSACTION);
    write_transactions,
    \* @type: Set(READ_TRANSACTION);
    read_transactions

IsLockedFrom(iid, t) ==         \E k \in {iid} \X TIME_RANGE : t <= k[2] /\ lock[k] # NullInt
IsWritFrom(iid, t) ==           \E k \in {iid} \X TIME_RANGE : t <= k[2] /\ write[k] # NullInt
IsLockedInRange(iid, t0, t1) == \E k \in {iid} \X TIME_RANGE : t0 <= k[2] /\ k[2] <= t1 /\ lock[k] # NullInt
IsWritInRange(iid, t0, t1) ==   \E k \in {iid} \X TIME_RANGE : t0 <= k[2] /\ k[2] <= t1 /\ write[k] # NullInt

Init == 
    /\ time = 1
    /\ data = [k \in KEYS |-> NullInt]
    /\ lock = [k \in KEYS |-> NullInt]
    /\ write = [k \in KEYS |-> NullInt]
    /\ write_transactions = {}
    /\ read_transactions = {}

NewWriteTransaction(p) ==
    /\ UNCHANGED data
    /\ UNCHANGED lock
    /\ UNCHANGED write
    /\ \E f \in WRITES : 
        \E primary \in DOMAIN f:
            write_transactions' = write_transactions \cup {[
                pid |-> p,
                start |-> time,
                value |-> f,
                iids |-> DOMAIN f,
                primary |-> primary,
                prewritten |-> {}
            ]}

NewReadTransaction(p) ==
    /\ UNCHANGED data
    /\ UNCHANGED lock
    /\ UNCHANGED write
    /\ \E iid \in IIDS : 
        read_transactions' = read_transactions \cup {[
            pid |-> p,
            start |-> time,
            iid |-> iid
        ]}

Write(t) ==
    LET
    Prewrite(iid) ==
        IF IsWritFrom(iid, t.start) \/ IsLockedFrom(iid, 0)
        THEN 
            (*Abort*)
            /\ UNCHANGED data
            /\ UNCHANGED lock
            /\ UNCHANGED write
            /\ write_transactions' = write_transactions \ {t}
        ELSE
            (*Succeed*)
            /\ data' = [data EXCEPT ![iid, t.start] = t.value[iid]]
            /\ lock' = [lock EXCEPT ![iid, t.start] = t.primary]
            /\ UNCHANGED write
            /\ write_transactions' = (write_transactions \ {t}) \cup {[t EXCEPT !.prewritten = @ \cup {iid}]}

    Commit(iid) ==
        IF lock[iid, t.start] = NullInt
        THEN 
            (*Abort*)
            /\ UNCHANGED data
            /\ UNCHANGED lock
            /\ UNCHANGED write
            /\ write_transactions' = write_transactions \ {t}
        ELSE
            (*Succeed*)
            /\ UNCHANGED data
            /\ lock' = [lock EXCEPT ![iid, t.start] = NullInt]
            /\ write' = [write EXCEPT ![iid, time] = t.start]
            /\ write_transactions' = write_transactions \ {t}

    IN
    CASE 
    (*Try lock the primary*)
        t.primary \notin t.prewritten 
                                                    -> Prewrite(t.primary)
    []
    (*Try lock a non primary*)
        /\ t.primary \in t.prewritten
        /\ t.iids \ t.prewritten # {}
                                                    -> 
                                                    LET 
                                                    iid == CHOOSE iid \in (t.iids \ t.prewritten) : TRUE
                                                    IN
                                                    Prewrite(iid)
    []
    (*Try commit the primary*)
        \A iid \in t.iids : iid \in t.prewritten
                                                    -> Commit(t.primary)
    
Read(t) ==
    LET
    DoRead == 
        \* ... could do something with the latest write time in range(0, time), for verification purposes
        \* but I'll skip that as more interested in learning the algorithm than verifying it.
        /\ UNCHANGED data
        /\ UNCHANGED lock
        /\ UNCHANGED write
        /\ read_transactions' = read_transactions \ {t}
    RollBack == 
        /\ data' = [data EXCEPT ![t.iid, t.start] = NullInt]
        /\ lock' = [lock EXCEPT ![t.iid, t.start] = NullInt]
        /\ UNCHANGED write
        /\ UNCHANGED read_transactions
    RollForward(primaryIID) == 
        (*
        We must know the IID of the primary in order to locate the entry in the write column
        that commits the write. We take the time of that committing entry and use it to rollforward.
        *)
        LET 
            CommitTime == CHOOSE ct \in TIME_RANGE : write[primaryIID, ct] = t.start
        IN
        /\ UNCHANGED data
        /\ lock' = [lock EXCEPT ![t.iid, t.start] = NullInt]
        /\ write' = [write EXCEPT ![t.iid, CommitTime] = t.start]
        /\ UNCHANGED read_transactions
    DoNothing == 
        /\ UNCHANGED data
        /\ UNCHANGED lock
        /\ UNCHANGED write
        /\ UNCHANGED read_transactions
    IN
    CASE
    (*Actually read*)
        /\ ~IsLockedInRange(t.iid, 1, t.start)
        /\ IsWritInRange(t.iid, 1, t.start)
                                                    -> DoRead
    []
    (*
        Find a primary lock.
        Delete the lock and the data.
    *)
        /\ lock[t.iid, t.start] = t.iid 
                                                    -> RollBack
    []
    (*
        Find a non primary lock pointing to deleted primary lock.
        Delete the non primary lock and the data.
    *)
        /\ lock[t.iid, t.start] # NullInt
        /\ lock[t.iid, t.start] # t.iid
        /\ lock[lock[t.iid, t.start], t.start] = NullInt
        /\ data[lock[t.iid, t.start], t.start] = NullInt
                                                    -> RollBack
    []
    (*
        Find a non primary lock pointing to deleted primary lock.
        The primary row has data (so there must be a write pointing to the data)
        Replace the lock with a write pointing to the data, at the commit time.
    *)
        /\ lock[t.iid, t.start] # NullInt
        /\ lock[t.iid, t.start] # t.iid
        /\ lock[lock[t.iid, t.start], t.start] = NullInt
        /\ data[lock[t.iid, t.start], t.start] # NullInt
                                                    -> RollForward(lock[t.iid, t.start])

    [] OTHER -> DoNothing \* If find non primary lock pointing to a non-deleted primary lock, for example.

WriterCrash(p, t) == 
    /\ UNCHANGED data
    /\ UNCHANGED lock
    /\ UNCHANGED write
    /\ write_transactions' = write_transactions \ {t}
    /\ UNCHANGED read_transactions

NextTransition == 
    \E p \in PIDS : 
        \/
            /\ ~(\E t \in write_transactions : t.pid = p)
            /\ NewWriteTransaction(p)
            /\ UNCHANGED read_transactions
        \/
            /\ ~(\E t \in read_transactions : t.pid = p)
            /\ NewReadTransaction(p)
            /\ UNCHANGED write_transactions
        \/ \E t \in write_transactions :
            /\ t.pid = p
            /\ Write(t)
            /\ UNCHANGED read_transactions
        \/ \E t \in  read_transactions :
            /\ t.pid = p
            /\ Read(t)
            /\ UNCHANGED write_transactions
        \/ \E t \in write_transactions :
            /\ t.pid = p
            /\ WriterCrash(p, t)

Next == 
    \/ /\ time < MAX_TIME
       /\ time' = time + 1
       /\ NextTransition
    (*Loop back to allow TLC to exhaust the state space.*)
    \/ /\ time = MAX_TIME
       /\ time' = 1
       /\ data' = [k \in KEYS |-> NullInt]
       /\ lock' = [k \in KEYS |-> NullInt]
       /\ write' = [k \in KEYS |-> NullInt]
       /\ write_transactions' = {}
       /\ read_transactions' = {}