
In the Engineering Services team at Atlassian, we’re busily building out a microservice-based architecture for our applications. This is a massive change for us, and it is imperative that our changes are ‘safe’: we want to prove, as far as possible, that we cannot inadvertently destroy data, and that we can recover from any data issues we do encounter. This led us to implement an event sourcing model for our entity store in Scala, where we store the full history of changes to entities so that we can recover to any point in time. We were able to build this on top of Amazon’s DynamoDB, a highly available and scalable key-value store with a basic API compared to a relational database.

Aside: What are we building?

We’re on a long journey re-architecting our applications to be a composition of stateless microservices: small pieces of functionality that we can compose together to create highly available and horizontally scalable applications. In terms of technology stack, we’re building our microservices in Scala using Functional Programming principles to reap the safety and productivity benefits of immutability. Our microservices will be running on AWS infrastructure.

What is event sourcing?

Rather than storing the view of an entity that can be inserted, updated or deleted, event sourcing involves storing the sequence of events that produce the current view of the entity. An event represents an operation or transformation from one view of the entity to another. Events are therefore tied to a starting view or snapshot, thus making any transformation idempotent. While events may just be simple ‘set’ or ‘delta’ operations, the usual principle is to relate events back to the business domain (though I won’t get into the whole domain-driven design argument).

To get to the current view of an entity, one would only need to read in all events for that particular entity and then replay the transformations in order.
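
As a rough illustration (a self-contained sketch with deliberately simplified, hypothetical types, not the ones used later in this post), replaying events is just a fold over the ordered event list:
[cc lang=”scala”]
// Hypothetical, simplified sketch of event replay: the current view of a
// key-value mapping is a fold over its ordered events.
sealed trait MappingEvent
case class Inserted(key: String, value: String) extends MappingEvent
case class Deleted(key: String) extends MappingEvent

object Replay {
  def currentView(events: List[MappingEvent]): Map[String, String] =
    events.foldLeft(Map.empty[String, String]) {
      case (view, Inserted(k, v)) => view + (k -> v)
      case (view, Deleted(k))     => view - k
    }
}
[/cc]
Replaying only a prefix of the events gives the view of the entity at an earlier point in time, which is exactly how the point-in-time reads later in this post work.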

The concept actually isn’t new; databases with their transaction logs have been doing it for years, and if you believe accountants they’ve been doing it even longer with double-entry bookkeeping. Now that space is (relatively) plentiful, we can afford to implement the concept at an entity level, and store events instead of just current snapshots of entities.

Why is event sourcing a good idea?

Event sourcing separates the storage of entities from the querying of entities, and fits nicely with concepts like CQRS (command query responsibility segregation). This means that for a given store of events for an entity, any number of consumers can produce their own custom view of the entity, one most suitable for their application, simply by providing their own way of interpreting the transformations upon replay (which is the reason why business-domain-driven events are better than data-driven events).

In the simplest case, and what we’re using for our project, event sourcing gives us the ability to compute the view of an entity at any given point in time. This is great if we ever need to recover data in case an entity was ever inadvertently deleted or modified. Also, we get a full audit trail of changes to entities for free.

Show me the code!

The code in this blog is in Scala, and is accessible at https://bitbucket.org/sshek/scalasyd-scalaz-stream/. The interesting files are [cci lang=”scala”]EventSource.scala[/cci] and [cci lang=”scala”]MappingEventSource.scala[/cci] in the [cci lang=”scala”]eventsource[/cci] package.

Let’s consider a simple entity – a key-value mapping, with the value containing most of an entity’s information. To start with, we have an [cci lang=”scala”]EventSource[K, V][/cci] trait that provides an interface to an event source for a [cci lang=”scala”]K[/cci] key to [cci lang=”scala”]V[/cci] value mapping. An [cci lang=”scala”]EventSource[/cci] has the following types:

  • [cci lang=”scala”]Event[/cci] – wraps up an event with a unique [cci lang=”scala”]EventId[/cci] identifier (the [cci lang=”scala”]K[/cci] key, and a [cci lang=”scala”]Sequence[/cci] that is an incrementing number). An event is a [cci lang=”scala”]Transform[V][/cci], which for our example just includes ‘inserting’ or ‘deleting’ a mapping but could also store some delta. Strictly speaking we could introduce a [cci lang=”scala”]Commit[/cci] type that wraps up a series of events, but for our case a single event is sufficient.
  • [cci lang=”scala”]Snapshot[/cci] – represents a view of a value [cci lang=”scala”]V[/cci] at an [cci lang=”scala”]EventId[/cci].
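
To make those types a little more concrete, here is a rough, simplified sketch (the real definitions are in [cci lang=”scala”]EventSource.scala[/cci] in the linked repository and differ in detail; field names here are assumptions):
[cc lang=”scala”]
// Rough sketch only - simplified stand-ins for the types described above,
// wrapped in a trait so K and V are in scope as they are in EventSource[K, V].
import org.joda.time.DateTime

trait EventSourceTypesSketch[K, V] {
  case class Sequence(seq: Long) { def next: Sequence = Sequence(seq + 1) }

  // Uniquely identifies an event: the entity key plus an incrementing sequence.
  case class EventId(key: K, sequence: Sequence)

  // The transformation carried by an event; for the mapping example this is
  // just "insert this value" or "delete the mapping", but it could be a delta.
  sealed trait Transform
  case class Insert(value: V) extends Transform
  case object Delete extends Transform

  case class Event(id: EventId, time: DateTime, operation: Transform)

  // A view of the value at a particular EventId (None if nothing exists yet,
  // or the latest event deleted the mapping).
  case class Snapshot(value: Option[V], at: Option[EventId])
}
[/cc]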

Representing storage of Events

Events are stored and retrieved from implementations of [cci lang=”scala”]Storage[F][/cci]:
[cc lang=”scala” line_numbers=”on” lines=”25″]
trait Storage[F[_]] {
  def M: Monad[F]
  def C: Catchable[F]

  /**
   * Retrieve a stream of events from the underlying data store.
   * This stream should take care of pagination and cleanup of
   * any underlying resources (e.g. closing connections if required).
   * @param key The key
   * @return Stream of events.
   */
  def get(key: K): Process[F, Event]

  /**
   * Save the given event.
   * @param event The event to save.
   * @return Either an EventSourceError or the event that was saved.
   *         Other non-specific errors should be available through the container F.
   */
  def put(event: Event): F[EventSourceError \/ Event]
}
[/cc]

Essentially a [cci]Storage[/cci] implementation needs to provide a [cci]get[/cci] that returns a ([cci]scalaz-stream[/cci]) ‘stream of [cci]Events[/cci]’ for a given key, and a [cci]put[/cci] for saving an event, which may return an error (primarily when a concurrent save with the same [cci]EventId[/cci] is attempted). Generation of [cci]Events[/cci] and [cci]EventId[/cci]s is handled elsewhere (we’ll get to it in a minute). The [cci]F[/cci] type parameter is just some container for wrapping operations on the underlying data store; it needs to be a [cci]Monad[/cci] for sequencing, and [cci]Catchable[/cci] to deal with errors in underlying calls to the data store. Typically [cci]F[/cci] would be a [cci]scalaz.concurrent.Task[/cci], which works nicely with [cci]scalaz-stream[/cci].
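
To make the contract concrete, here’s the rough shape of a naive in-memory implementation (a sketch only, assuming [cci]F[/cci] is [cci]Task[/cci], with simplified stand-in types and a single mutable map; it illustrates the [cci]get[/cci]/[cci]put[/cci] contract rather than being production code):
[cc lang=”scala”]
import scalaz.{\/, \/-, -\/}
import scalaz.concurrent.Task
import scalaz.stream.Process

// Sketch of an in-memory event store; the real Storage trait is parameterised
// over F and the EventSource types, which are simplified here.
object InMemoryStorageSketch {
  type Key = String
  case class Event(key: Key, sequence: Long, payload: String)
  sealed trait EventSourceError
  case object DuplicateEvent extends EventSourceError

  private var events: Map[(Key, Long), Event] = Map.empty

  // Stream all events for a key in sequence order. No pagination is needed
  // here, but a real implementation would hide it inside this method.
  def get(key: Key): Process[Task, Event] =
    Process.eval(Task.delay {
      events.values.filter(_.key == key).toList.sortBy(_.sequence)
    }).flatMap(es => Process.emitAll(es))

  // 'Conditional put': refuse to overwrite an existing (key, sequence) pair,
  // which is what lets concurrent writers detect collisions.
  def put(event: Event): Task[EventSourceError \/ Event] = Task.delay {
    synchronized {
      if (events.contains((event.key, event.sequence))) -\/(DuplicateEvent)
      else { events += ((event.key, event.sequence) -> event); \/-(event) }
    }
  }
}
[/cc]
The only real requirement on the underlying data store is that conditional-put behaviour; everything else stays hidden behind [cci]get[/cci] and [cci]put[/cci].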

[cci]scalaz-stream[/cci] for the win!

The really cool thing about [cci lang=”scala”]Storage[/cci] is that a series of [cci lang=”scala”]Events[/cci] for an entity is just a [cci]scalaz-stream[/cci], which provides a nice abstraction over streams from any source, minimises in-memory buffering, and comes with lots of cool features and examples from the [cci]scalaz-stream[/cci] team for creating and processing streams. For example, the [cci]applyEvents[/cci] function in [cci]Storage[/cci], which replays a stream of events to generate a [cci]Snapshot[/cci], is simply:
[cc lang=”scala” line_numbers=”on”]
def applyEvents(events: Process[F, Event]): F[Snapshot] =
  events.pipe {
    process1.fold(Snapshot.zero)(Snapshot.update)
  }.runLastOr(Snapshot.zero)(M, C)
[/cc]
Describing [cci]scalaz-stream[/cci] is worthy of multiple posts, but the crux of [cci]applyEvents[/cci] is:

  • [cci lang=”scala”]process1.fold[/cci] produces a ‘single input transducer’. Basically, when the transducer sees an input, it can produce zero or more outputs. In our case, our transducer takes an input of [cci lang=”scala”]Event[/cci], and produces exactly one output of [cci lang=”scala”]Snapshot[/cci]. [cci lang=”scala”]fold[/cci] is what you would expect from a traversable fold; given a starting accumulator (in our case [cci lang=”scala”]Snapshot.zero[/cci] that returns an ‘empty’ entity), run the function [cci lang=”scala”]Snapshot.update[/cci] over each value in the stream, ‘adding’ each value to the accumulator (a rough sketch of [cci lang=”scala”]Snapshot.zero[/cci] and [cci lang=”scala”]Snapshot.update[/cci] follows after this list).
  • [cci lang=”scala”]pipe[/cci] runs the stream of events through the transducer.
  • [cci lang=”scala”]runLastOr[/cci] ‘runs’ the stream, basically turning the stream into an [cci lang=”scala”]F[/cci] that can be ‘run’ to generate the [cci lang=”scala”]Snapshot[/cci] resulting from accumulating all the events. We use [cci lang=”scala”]runLastOr[/cci] to return the ‘empty’ snapshot in case there are no events. The [cci lang=”scala”]M[/cci] and [cci lang=”scala”]C[/cci] references are just explicit references to the [cci lang=”scala”]Monad[/cci] and [cci lang=”scala”]Catchable[/cci] instances that the stream functions require.
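
For reference, [cci lang=”scala”]Snapshot.zero[/cci] and [cci lang=”scala”]Snapshot.update[/cci] might look roughly like this (a simplified, self-contained sketch; the real version also records the event’s timestamp and full [cci lang=”scala”]EventId[/cci]):
[cc lang=”scala”]
// Simplified stand-ins, repeated here so the sketch compiles on its own.
sealed trait Transform[+V]
case class Insert[V](value: V) extends Transform[V]
case object Delete extends Transform[Nothing]

case class Event[V](sequence: Long, operation: Transform[V])
case class Snapshot[V](value: Option[V], atSequence: Option[Long])

object Snapshot {
  // The accumulator before any events have been applied: an 'empty' entity.
  def zero[V]: Snapshot[V] = Snapshot(None, None)

  // Applying one event: an insert replaces the value, a delete clears it,
  // and either way we record how far the replay has progressed.
  def update[V](s: Snapshot[V], e: Event[V]): Snapshot[V] =
    e.operation match {
      case Insert(v) => Snapshot(Some(v), Some(e.sequence))
      case Delete    => Snapshot(None, Some(e.sequence))
    }
}
[/cc]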

The nice thing about [cci lang=”scala”]applyEvents[/cci] is that it doesn’t care about how the stream of events is obtained, just that it is a stream of events. Importantly, it leaves implementation details like pagination to the implementor of the source of events (i.e. [cci lang=”scala”]Storage[/cci]). We’ll show an example of this when we get to the Dynamo implementation.

The API for consumers

To wrap up [cci lang=”scala”]EventSource[/cci], the consumer accesses it through an implementation of [cci lang=”scala”]API[F][/cci]:
[cc lang=”scala” line_numbers=”on” lines=”40″]
/**
 * This is the main interface for consumers of the Event source.
 * Implementations will contain logic to create a transform given a value to
 * save. Upon construction of an API, a suitable Storage store needs to be
 * provided.
 *
 * @tparam F Container type for API operations. It needs to be a Monad and
 *           a Catchable (e.g. scalaz Task)
 */
trait API[F[_]] {
  def M: Monad[F]
  def C: Catchable[F]

  /**
   * @return Underlying store of events
   */
  def store: Storage[F]

  /**
   * Create a suitable transform from 'old' to 'newValue'.
   * @param old The old value
   * @param newValue The new value
   * @return The suitable transform.
   */
  def createTransform(old: Option[V], newValue: Option[V]): Transform[V]

  /**
   * Return the current view of the data for key 'key'.
   * @param key the key
   * @return current view of the data
   */
  def get(key: K): F[Option[V]] =
    getAtUsing(key, _ => true)

  /**
   * Return the view of the data for the key 'key' at the specified
   * sequence number.
   * @param key the key
   * @param seq the sequence number of the event at which we want
   *            to see the view of the data.
   * @return view of the data at event with sequence 'seq'
   */
  def getAt(key: K, seq: Sequence): F[Option[V]] =
    getAtUsing(key, { _.id.sequence.seq <= seq.seq })

  /**
   * Return the view of the data for the key 'key' at the specified timestamp.
   * @param key The key
   * @param time The timestamp at which we want to see the view of the data
   * @return view of the data with events up to the given time stamp.
   */
  def getAt(key: K, time: DateTime): F[Option[V]] = {
    import com.github.nscala_time.time.Implicits._
    getAtUsing(key, { _.time <= time })
  }

  /**
   * Save the given value for the given key.
   * @param key The key
   * @param value The value to save
   * @return The previous value if there was one.
   */
  def save(key: K, value: Option[V]): F[Option[V]] =
    saveUsing(key, value)
}
[/cc]
Consumers of the API can save or get values of type [cci lang=”scala”]V[/cci] for a given key [cci lang=”scala”]K[/cci]. They don’t have to handle anything to do with events (or [cci lang=”scala”]scalaz-stream[/cci] for that matter).
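
From a consumer’s point of view, using it might look something like this (a hypothetical sketch; [cci lang=”scala”]MappingsApi[/cci] is a simplified stand-in for an [cci lang=”scala”]API[Task][/cci] specialised to [cci lang=”scala”]String[/cci] keys and values):
[cc lang=”scala”]
import org.joda.time.DateTime
import scalaz.concurrent.Task

// Simplified stand-in for the API trait above, specialised to String keys
// and values; only the operations a consumer would call are shown.
trait MappingsApi {
  def get(key: String): Task[Option[String]]
  def getAt(key: String, time: DateTime): Task[Option[String]]
  def save(key: String, value: Option[String]): Task[Option[String]]
}

object ApiUsageSketch {
  // A consumer just saves and reads values; events, streams and pagination
  // never appear in its code.
  def example(mappings: MappingsApi): Task[(Option[String], Option[String])] =
    for {
      _        <- mappings.save("some-key", Some("first value"))
      _        <- mappings.save("some-key", Some("second value"))
      current  <- mappings.get("some-key")                              // Some("second value")
      lastWeek <- mappings.getAt("some-key", DateTime.now.minusDays(7)) // view as of a week ago
    } yield (current, lastWeek)
}
[/cc]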

Implementations of [cci lang=”scala”]API[/cci] only need to provide [cci lang=”scala”]createTransform[/cci] (a way of generating an event, or [cci lang=”scala”]Transform[/cci], from the existing view of a value to the new view that the consumer wants to save and publish to others) and a [cci lang=”scala”]Storage[/cci] implementation.
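
For the simple mapping entity, [cci lang=”scala”]createTransform[/cci] can be trivial. Here’s a rough sketch using the simplified [cci lang=”scala”]Transform[/cci] from earlier (the real logic lives in [cci lang=”scala”]MappingEventSource.scala[/cci]):
[cc lang=”scala”]
// Sketch: saving Some(value) becomes an insert/replace, saving None becomes
// a delete. Richer entities could diff 'old' against 'newValue' to build a delta.
object MappingTransformsSketch {
  sealed trait Transform[+V]
  case class Insert[V](value: V) extends Transform[V]
  case object Delete extends Transform[Nothing]

  def createTransform[V](old: Option[V], newValue: Option[V]): Transform[V] =
    newValue match {
      case Some(v) => Insert(v)
      case None    => Delete
    }
}
[/cc]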

Replaying events to get values – [cci lang=”scala”]scalaz-stream[/cci] for the win again!

There are several ‘get’ functions provided: [cci lang=”scala”]get[/cci] the current value, [cci lang=”scala”]getAt[/cci] a given event [cci lang=”scala”]Sequence[/cci], and [cci lang=”scala”]getAt[/cci] a given timestamp. These all delegate to [cci lang=”scala”]getAtUsing[/cci]:
[cc lang=”scala” line_numbers=”on”]
/**
 * All a 'get' is doing is taking events up to a condition
 * (e.g. sequence number or a date) and then applying
 * them in order. This is quite trivial using something like Scalaz Stream.
 * @param key The key for which to retrieve events.
 * @param cond Conditional function for filtering events.
 * @return View of the data obtained from applying all events in the stream
 *         up until the given condition is not met.
 */
private[store] def getAtUsing(key: K, cond: Event => Boolean): F[Option[V]] =
  M.apply(store.applyEvents(store.get(key).takeWhile(cond))) {
    s: Snapshot => s.value
  }
[/cc]
A [cci lang=”scala”]scalaz-stream Process[/cci] provides a [cci lang=”scala”]takeWhile(predicate)[/cci] function that produces another stream containing elements from the original stream for as long as the predicate holds. So in each of the [cci lang=”scala”]getAt[/cci] cases, we can stop at the appropriate sequence number or timestamp, and then pass the resulting stream to [cci lang=”scala”]Storage.applyEvents[/cci] to get a [cci lang=”scala”]Snapshot[/cci].

No sign of any pagination logic, buffering, or caring about how the stream is stored or retrieved. How easy is that!

Saving events

To save an event, we basically need to get the current view of the value, create a suitable transform, create a new [cci lang=”scala”]Event[/cci] with that transform and an incremented [cci lang=”scala”]EventId[/cci], and then save it provided no one else has written ahead of us (i.e. a conditional put). If there was a collision, retrying should eventually resolve it:
[cc lang=”scala” line_numbers=”on” lines=”25″]
/**
 * To save a new value, we need to get the latest snapshot in order to get the
 * existing view of data and the latest event Id. Then we create a suitable
 * transform and event and try to save it. Upon duplicate events,
 * try the operation again (highly unlikely that this situation would occur).
 * @param key The key
 * @param value The value to store
 * @return The previous view of the data.
 */
private[store] def saveUsing(key: K, value: Option[V]): F[Option[V]] = {
  import scalaz.syntax.monad._
  implicit def MonadF = M
  for {
    // Replay existing events to get the latest snapshot (and hence latest event Id).
    latestSnapshot <- store.applyEvents(store.get(key))
    newCommit       = Event.next(key, latestSnapshot, createTransform(latestSnapshot.value, value))
    putResult      <- store.put(newCommit)
    lastValue      <- putResult match {
                        case -\/(EventSourceError.DuplicateCommit) => save(key, value)
                        case \/-(c)                                => latestSnapshot.value.point[F]
                      }
  } yield lastValue
}
[/cc]

There’s probably value in making the recursive call trampoline, but given it’s unlikely to happen more than once for a record, it shouldn’t be a big issue.

DynamoDB implementation – pagination, what pagination?

The DynamoDB implementation can be found in [cci lang=”scala”]MappingEventSource[/cci], most of which is DynamoDB-specific rigamarole for mapping between the Scala data types and DynamoDB columns. The really nice function is the [cci lang=”scala”]get[/cci] that handles pagination, shown below using [cci lang=”scala”]Task[/cci] as the concrete implementation of [cci lang=”scala”]F[/cci] just to highlight the important parts:
[cc lang=”scala” line_numbers=”on” lines=”30″]
/**
 * To return a stream of events from Dynamo, we first need to execute a query,
 * then emit results, and then optionally recursively execute the next query
 * until there is nothing more to query.
 *
 * @param key The key
 * @return Stream of events.
 */
override def get(key: MappedKey): Process[Task, Event] = {
  val query = Query.forHash(key)
  import Process._

  def requestPage(q: Query[Event]): Task[Page[Event]] = Task.suspend {
    DynamoDB.query(q).run(client)
  }

  def loop(pt: Task[Page[Event]]): Process[Task, Event] =
    await(pt) { page =>
      emitAll(page.result) ++ {
        page.next match {
          case None            => halt
          case Some(nextQuery) => loop(requestPage(nextQuery))
        }
      }
    }

  loop(requestPage(query))
}
[/cc]

Normally when running a query against DynamoDB, you get a list of result records (up to 1MB) and a [cci lang=”scala”]lastEvaluatedKey[/cci] that you can use to re-run the query to get the next batch. The result of a query is represented by [cci lang=”scala”]Page[/cci].
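
[cci lang=”scala”]Page[/cci] is essentially a batch of results plus an optional follow-up query built from [cci lang=”scala”]lastEvaluatedKey[/cci]. A rough sketch (the [cci lang=”scala”]result[/cci] and [cci lang=”scala”]next[/cci] field names match the code above; the rest is assumed):
[cc lang=”scala”]
// Rough sketch of a query result page: the records returned by one DynamoDB
// query, plus an optional query for the next batch (None when there was no
// lastEvaluatedKey, i.e. we've seen everything).
case class Query[A](hashKey: String, exclusiveStartKey: Option[String])
case class Page[A](result: List[A], next: Option[Query[A]])
[/cc]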

To generate a [cci lang=”scala”]scalaz-stream[/cci] of [cci lang=”scala”]Events[/cci], we basically want to run a query, emit all the [cci lang=”scala”]Events[/cci] from the query result, and then when we get asked for another [cci lang=”scala”]Event[/cci] beyond the current page, run a new query to get the next page of results.

Execution of the query is done by [cci lang=”scala”]requestPage[/cci] (strictly speaking [cci lang=”scala”]requestPage[/cci] provides a [cci lang=”scala”]Task[/cci] that when run will execute the query).

The loop of running the query, emitting, waiting and then running the next query is handled by the [cci lang=”scala”]loop[/cci] function. [cci lang=”scala”]loop[/cci] generates a [cci]scalaz-stream[/cci] ‘in waiting’ (i.e. it doesn’t eagerly load anything from the data store). We use [cci]scalaz-stream[/cci]’s [cci lang=”scala”]Process.await[/cci] to create a stream given:

  • A value generator ([cci lang=”scala”]Task[Page[Event]][/cci]) – The generator is run when there is a request on the stream for some values, and there aren’t any existing values in memory. The generator then generates some interim value, in our case a [cci lang=”scala”]Page[Event][/cci], or a page of events.
  • A value processor – Given the interim value, the value processor generates a suitable stream of actual event values via the [cci lang=”scala”]Process.emitAll[/cci] function. After all the events have been emitted, we may need to prepare to get the next page of events from the data store. A page has an optional query ([cci lang=”scala”]next[/cci]) to run to get the next page, so if there is a next query to run, we can generate another ‘waiting’ stream by calling [cci lang=”scala”]loop[/cci] again; otherwise ‘halt’ is returned to signify the end of the stream of events. Note that [cci lang=”scala”]loop[/cci] appears recursive, but actually isn’t, because it returns immediately with a ‘waiting’ stream; the next call to [cci lang=”scala”]loop[/cci] is run by the function that processes the entire stream.

Lots of pagination code completely hidden away from the caller of get! Also, if we did have to close down the database connection (DynamoDB doesn’t need this, but other implementations may), we can pass through a suitable ‘cleanup’ function to [cci lang=”scala”]Process.await[/cci] to be run whenever the stream is finished, either because there are no more events to emit, or the caller of get no longer needs the stream.
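
As a sketch of what that could look like for a hypothetical connection-based store (the names here are made up, and we attach the cleanup with [cci lang=”scala”]onComplete[/cci] rather than the [cci lang=”scala”]await[/cci] cleanup argument mentioned above, but the effect is the same):
[cc lang=”scala”]
import scalaz.concurrent.Task
import scalaz.stream.Process

object CleanupSketch {
  // 'openConnection' and 'streamEvents' are hypothetical; the point is that
  // the cleanup step runs when the stream terminates, whether it finished
  // normally, failed, or the caller stopped consuming early.
  def getWithCleanup[Event](openConnection: Task[java.sql.Connection])
                           (streamEvents: java.sql.Connection => Process[Task, Event]): Process[Task, Event] =
    Process.await(openConnection) { conn =>
      streamEvents(conn).onComplete(Process.eval_(Task.delay(conn.close())))
    }
}
[/cc]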

What about Cassandra? or Riak?

The nice thing about [cci lang=”scala”]EventSource[/cci] is that it doesn’t require anything from the underlying data store except for a conditional put or a way to deterministically resolve conflicts for colliding writes. We can wrap up implementation details, such as how gets and queries work with or without pagination, in the Storage implementation. This means we can replace our DynamoDB implementation with another store such as Cassandra or Riak.

NB If we were to use something like Cassandra or Riak, we perhaps could optimise [cci lang=”scala”]EventSource.API.saveUsing[/cci] to make use of CRDT/CRDT-like functionality provided by these stores, but that is for another day…

Performance considerations

Reading in all events and replaying them for each read (and write) sounds slow, and it can be in general, especially if retrieving all events requires multiple round trips to the data store.

For our particular use case, we’re not expecting a huge number of updates and with DynamoDB we can retrieve a bunch of events with a single query, so we’re very likely to need just one round trip to calculate a snapshot. We still need a query before a write, so that does increase the required read capacity units and latency slightly, but since the operations on DynamoDB are from our microservice running in AWS the actual impact that we’ve measured is not too bad. It’s a price to pay for having a log of all entity changes.

In the general case, we may instead want to store snapshots in addition to events. Instead of having to retrieve all events for every entity read, we would retrieve the latest snapshot of the entity and only the events after that snapshot, thus significantly reducing the number of data store reads required (a rough sketch of this read path follows the list below). What’s cool is that:

  • We already have a model of snapshots in our code; we just need to expose a way of saving/retrieving them much like events.
  • Snapshots can be written asynchronously (e.g. by a scheduled task), and hence do not need to impact normal read/write operations. We will still generate the correct view of an entity even with an old snapshot; it will just require reading more events from the data store.
  • We don’t even need to store more than one snapshot, meaning snapshot retrieval is just a simple get instead of a query.
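
Here is a rough sketch of that read path (hypothetical: [cci lang=”scala”]latestStoredSnapshot[/cci] and [cci lang=”scala”]eventsAfter[/cci] are assumed extensions to [cci lang=”scala”]Storage[/cci], not part of the code above):
[cc lang=”scala”]
import scalaz.concurrent.Task
import scalaz.stream.{Process, process1}

object SnapshotOptimisationSketch {
  // Simplified stand-ins: a stored snapshot knows which sequence it covers,
  // and an event knows how to transform the current value.
  case class Snapshot[V](value: Option[V], afterSequence: Long)
  case class Event[V](sequence: Long, transform: Option[V] => Option[V])

  def currentView[K, V](key: K,
                        latestStoredSnapshot: K => Task[Snapshot[V]],
                        eventsAfter: (K, Long) => Process[Task, Event[V]]): Task[Option[V]] =
    for {
      snap  <- latestStoredSnapshot(key)
      // Only events written after the stored snapshot are read and replayed,
      // so an old snapshot still yields a correct (just slightly slower) read.
      value <- eventsAfter(key, snap.afterSequence)
                 .pipe(process1.fold(snap.value)((v: Option[V], e: Event[V]) => e.transform(v)))
                 .runLastOr(snap.value)
    } yield value
}
[/cc]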

Summary

Event sourcing, where all changes to entities are recorded instead of an entity being continually updated in place, is a great model for storing data in today’s world of abundant storage. In our use case, it gives us the ability to replay transactions to recover state at a specific point in time, and for more sophisticated entities it would be possible to re-interpret events to ‘query’ data in ways beyond the original data model design. It also gives us a history, or audit trail of changes, for free. In this post, we described how easy it is to implement an event sourcing system on top of a key-value store with a basic API like DynamoDB. This is yet another example of immutability in action and its benefits.

[cci]scalaz-stream[/cci] also gave us a great abstraction over accessing events from the data store so that consumers don’t need to worry about overhead such as resource management and pagination. We’re using [cci]scalaz-stream[/cci] in a few other places, and while it is early days for the library, we’ve found it quite reliable and pleasant to use. It is definitely something to keep an eye on.
