Transactional Extensibility for Crux

Serialise updates, enforce integrity constraints and answer 'What if?' queries

Published August 13, 2020 by James Henderson

In the latest releases we added 'transaction functions' and 'speculative transactions' to Crux - a potent combination of features!

Transaction functions are user-supplied functions that run on the individual Crux nodes as a transaction is being processed. As Crux only ever ingests transactions in serial, in the order in which they appear on the transaction log, this is an ideal time to (for example) safely check the current database state before applying a transaction, run integrity checks, or patch an entity.

Speculative transactions allow you to see what the results of your queries would be if a new transaction were to be applied, without persisting the changes or affecting other users of the database.

“What if” queries are a frequent requirement; they’re used to explore the effect of making certain changes without actually having to make (and subsequently unmake, possibly) the changes in question.

— SQL and Relational Theory
2nd Edition by C.J. Date

This capability is also valuable during REPL-based development and for expressing integrity constraints using Datalog.

For those who like to dive a bit deeper, we’ve also made it easier to write your own transaction consumer - more on this below.

Transaction Functions

In the most recent dev diary we shared some details on why and how we implemented transaction functions, but let’s recap on their day-to-day usage.

We first submit the transaction function to the database as we do any other document. Transaction function documents have a :crux.db/id to identify the function, and a :crux.db/fn key containing the function body (currently only Clojure is supported).

A transaction function takes a 'context' parameter as its first argument, which you can use to obtain a database value using crux.api/db - we use it here to get the current version of the entity. A transaction function can also accept any number of additional parameters, and when invoked it is normally expected to return a vector of basic transaction operations which get indexed in the usual way. If the transaction function invocation returns false or throws an exception, the whole transaction will roll back.

Here’s an example of a transaction function that increments an entity’s :age key:

(crux/submit-tx node [[:crux.tx/put {:crux.db/id :increment-age
                                     ;; note that the function body is quoted.
                                     ;; and function calls are fully qualified
                                     :crux.db/fn '(fn [ctx eid]
                                                    (let [db (crux.api/db ctx)
                                                          entity (crux.api/entity db eid)]
                                                      [[:crux.tx/put (update entity :age inc)]]))}]])

If we didn’t use a transaction function for this, we might be tempted to run a normal query to get the entity’s age, then submit a transaction with the incremented age - but this would be vulnerable to a race condition if two processes both tried to change the same entity at the same time. A :crux.tx/match transaction operation could be used to atomically validate that the previous age value is unchanged, but that would not be an efficient or elegant solution for something as primitive as a counter. By using a transaction function here instead, we can ensure that each process will always read-then-write the latest version of the entity.
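To make the comparison concrete, here is a rough sketch of the operations a :crux.tx/match based approach might submit. The `increment-age-ops` helper is a hypothetical name of ours, not part of Crux, and it only builds the operations from a version of the entity that was read earlier:

```clojure
;; Hypothetical helper (not part of Crux): builds the operations for an
;; optimistic increment from the version of the entity we read earlier.
(defn increment-age-ops [entity]
  [;; the transaction only proceeds if the stored document still equals
   ;; the one we read; otherwise the whole transaction is aborted
   [:crux.tx/match (:crux.db/id entity) entity]
   [:crux.tx/put (update entity :age inc)]])

(increment-age-ops {:crux.db/id :ivan, :age 40})
;; => [[:crux.tx/match :ivan {:crux.db/id :ivan, :age 40}]
;;     [:crux.tx/put {:crux.db/id :ivan, :age 41}]]
```

The caller would still have to read the entity, submit these operations, wait for the transaction to be indexed, check whether it committed, and start again from the read if it did not. That round trip is what the transaction function avoids.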

We may then invoke this transaction function in a later transaction using the :crux.tx/fn operation, passing it any expected parameters:

(crux/submit-tx node [[:crux.tx/put {:crux.db/id :ivan, :age 40}]])

;; `[[:crux.tx/fn <id-of-fn> <args*>]]`
(crux/submit-tx node [[:crux.tx/fn :increment-age :ivan]])

When invoked by the indexer, the transaction function will return a vector containing a single put operation, as specified by our function body. This vector is equivalent to what the indexer will see and subsequently go on to process as normal:

;; [[:crux.tx/put {:crux.db/id :ivan, :age 41}]]

After those three transactions have been indexed we should see that Ivan’s age has been incremented:

(crux/entity (crux/db node) :ivan)
;; => {:crux.db/id :ivan, :age 41}

Job done!
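Transaction functions are also a natural home for the integrity checks mentioned earlier. As a sketch only, assuming hypothetical account documents with a :balance key and a made-up :transfer-funds function id, a function might return false to roll the transaction back when a constraint would be violated:

```clojure
;; Hypothetical document: :transfer-funds and :balance are illustrative names,
;; not anything Crux defines. The body is quoted data, so this form can be
;; evaluated without Crux on the classpath.
(def transfer-fn-doc
  {:crux.db/id :transfer-funds
   :crux.db/fn '(fn [ctx from-id to-id amount]
                  (let [db   (crux.api/db ctx)
                        from (crux.api/entity db from-id)
                        to   (crux.api/entity db to-id)]
                    (if (and from to (>= (:balance from 0) amount))
                      ;; both accounts exist and funds are sufficient
                      [[:crux.tx/put (update from :balance - amount)]
                       [:crux.tx/put (update to :balance (fnil + 0) amount)]]
                      ;; returning false rolls back the whole transaction
                      false)))})

;; Submitted and invoked like the :increment-age example above:
;; (crux/submit-tx node [[:crux.tx/put transfer-fn-doc]])
;; (crux/submit-tx node [[:crux.tx/fn :transfer-funds :alice :bob 25]])
```

Because the check and the two puts run inside the same serially-indexed transaction, no other transaction can change either balance between the check and the write.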

Another intriguing aspect is that the returned vector of operations could in turn include further transaction function invocations which will also get processed. This basic recursive property allows you to create rich hierarchies of transaction function logic that ultimately expands into a standard vector of basic operations.
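The recursive expansion can be pictured with a small toy model in plain Clojure. This is not how the Crux indexer is implemented: `expand-ops`, `example-fns`, `:greet` and `:greet-all` are all hypothetical names, the functions live in an ordinary map rather than in documents, and the context argument is omitted for brevity.

```clojure
(defn expand-ops
  "Toy model: recursively expands [:crux.tx/fn fn-id & args] operations using
  `fns` (a map of fn-id -> plain Clojure function) until only basic
  operations remain."
  [fns ops]
  (vec (mapcat (fn [[op fn-id & args :as tx-op]]
                 (if (= :crux.tx/fn op)
                   ;; invoke the function, then expand whatever it returned
                   (expand-ops fns (apply (get fns fn-id) args))
                   ;; basic operations pass through unchanged
                   [tx-op]))
               ops)))

(def example-fns
  {:greet     (fn [eid] [[:crux.tx/put {:crux.db/id eid, :greeted? true}]])
   ;; returns further function invocations rather than basic operations
   :greet-all (fn [& eids] (mapv (fn [eid] [:crux.tx/fn :greet eid]) eids))})

(expand-ops example-fns [[:crux.tx/fn :greet-all :ivan :petr]])
;; => [[:crux.tx/put {:crux.db/id :ivan, :greeted? true}]
;;     [:crux.tx/put {:crux.db/id :petr, :greeted? true}]]
```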

Behind the scenes the 'argument documents' supplied to the invocation operations (arguments get stored as documents!) are replaced with the resulting operations the first time they are processed. This allows Crux to avoid needlessly re-executing transaction functions during subsequent processing of the transaction log.

Speculative Transactions

When you apply a speculative transaction a new database value is synchronously returned. You can then use this value to make queries and entity requests as you would any normal database value. Only this local value observes the effects of your speculative transaction - neither the transaction nor its effects are submitted to the cluster, and they’re not visible to any other database value in your application.

We apply these transactions to a database value using with-tx:

(let [real-tx (crux/submit-tx node [[:crux.tx/put {:crux.db/id :ivan, :name "Ivan"}]])
      _ (crux/await-tx node real-tx)
      all-names '{:find [?name], :where [[?e :name ?name]]}
      db (crux/db node)]

  (crux/q db all-names) ; => #{["Ivan"]}

  (let [speculative-db (crux/with-tx db
                         [[:crux.tx/put {:crux.db/id :petr, :name "Petr"}]])]
    (crux/q speculative-db all-names) ; => #{["Petr"] ["Ivan"]}
    )

  ;; we haven't impacted the original db value, nor the node
  (crux/q db all-names) ; => #{["Ivan"]}
  (crux/q (crux/db node) all-names) ; => #{["Ivan"]}
  )

The 'making-of'

Behind the scenes, both of these features have benefited from refactoring how Crux indexes transactions. Even before these changes, to correctly index a transaction, we’ve needed to ensure that a later transaction operation sees the updated indices from earlier operations within the transaction - but also that these updates aren’t persisted before the transaction atomically commits. Indeed, transaction operations themselves need to make similar (albeit lower level) history requests of the indices, to ascertain the current state of the entities in question before making their updates.

For example, a put operation with a start and end valid-time specified looks at the current timeline of the entity between those times (using similar functions to those found on our public history API), and then uses this to assert a new timeline at the current transaction time. This allows queries after the transaction to reflect the new timeline, while still letting you go back in transaction time and see the timeline before that update - the 'crux' of our bitemporality!

This internal history API, then, needed to reflect both the persisted state of the indices and the transient updates introduced by earlier transaction operations. It did this, essentially, by merging two indices (one on-disk, one in-memory) to make them look like one index - our indices are ordered trie structures, so this is a relatively straightforward operation. When a transaction is ready to commit, we write the changes accumulated within the in-memory indices to the on-disk indices.

We then realised that if we threw these changes away rather than applying them to the on-disk indices, this looks a lot like 'speculative transactions' - handy!
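A toy model may help picture this. It is only a sketch under heavy simplification: the real indices are bitemporal and far richer than one sorted map from entity id to document, and `on-disk`, `overlay`, `apply-puts` and `merged-view` are hypothetical names rather than Crux internals.

```clojure
;; Stands in for the persisted, on-disk index.
(def on-disk
  (sorted-map :ivan {:crux.db/id :ivan, :name "Ivan"}))

(defn apply-puts
  "Records put operations in the given in-memory overlay and returns it."
  [overlay ops]
  (reduce (fn [acc [_op doc]] (assoc acc (:crux.db/id doc) doc))
          overlay
          ops))

(defn merged-view
  "Makes the two indices look like one; overlay entries take precedence."
  [on-disk overlay]
  (merge on-disk overlay))

(def overlay
  (apply-puts (sorted-map) [[:crux.tx/put {:crux.db/id :petr, :name "Petr"}]]))

;; Later operations (and speculative queries) read through the merged view.
(keys (merged-view on-disk overlay)) ;; => (:ivan :petr)

;; Committing would persist the merged view as the new on-disk state;
;; a speculative transaction drops the overlay instead.
(keys on-disk) ;; => (:ivan)
```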

In refactoring this area, we’ve also managed to surface a few of the lower level primitives of Crux transaction consumption. The main primitives here are begin-tx, index-tx-events, commit and abort, called as follows:

(require '[crux.db :as db])

(let [tx (db/begin-tx tx-ingester {:crux.tx/tx-time ..., :crux.tx/tx-id ...})]
  (if (db/index-tx-events tx evts)
    (db/commit tx)
    (db/abort tx)))

This can then be incorporated into a transaction log implementation as required. For example, Kafka and JDBC both have something akin to this in a polling loop, while standalone calls it directly from a single-threaded queue consumer; other suitable transaction logs may work in a more reactive style. For a complete example, have a look at the JDBC transaction log’s usage of the Crux polling consumer.

The existing transaction consumers are all now written in terms of these primitives - but, more importantly, this change opens up the possibility of new implementations from explorers willing to dive into the Crux internals. Indeed, if there’s demand from the community, we can look to stabilise this as a public API. As always, let us know - we’re happy to help out, and we’re certainly excited to see what you build!

Getting in touch

We can be contacted at crux@juxt.pro, or through our Zulip support channel. We also frequent the #crux channel on Clojurians' Slack.

Cheers!

The Crux Team.