TL;DR — Ezpz lemon sqzy
Part of a series
Previous article: I gotta see the goods
Next article: Time is money
Calculating the current state of an entity is straightforward: we find all the events associated with the entity and reduce them down to a single result. What is left for the application to decide is what "reducing down to a single result" means, and that freedom is what makes event sourcing such a powerful data access pattern.
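As a rough sketch of that freedom, the same events can be reduced in more than one way. The event shape below (plain maps with `:revision` and `:data`) and the names `current-state` and `name-history` are assumptions made for illustration, not the journal schema used later in this article:

```clojure
;; Hypothetical in-memory events, already ordered by revision.
(def events
  [{:revision 1 :data {:name "johndoe"}}
   {:revision 2 :data {:name "test"}}])

(defn current-state
  "Reduce events to the latest known value of every field."
  [events]
  (reduce (fn [state event] (merge state (:data event))) {} events))

(defn name-history
  "Reduce the same events to every name the entity has ever had."
  [events]
  (reduce (fn [names event]
            (if-let [n (get-in event [:data :name])]
              (conj names n)
              names))
          []
          events))

(current-state events) ;; => {:name "test"}
(name-history events)  ;; => ["johndoe" "test"]
```

Both functions read the same events; only the reducing function differs.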
While a document storage system like MongoDB does not have an explicit schema, there is still an implicit schema that documents follow. We can't just dump documents of any shape into a collection and call it a day. To handle schema changes we can either version documents or use a tool like https://github.com/DoubleCiti/mongodb-migrations to migrate existing data. The first option is the same as versioning events: we support every version when reading, but we write new documents according to the latest schema. The tricky and potentially complex part is updating older documents once the schema has changed. The second option requires us to migrate all existing documents to conform to the new schema and is riskier. Compare that to event sourcing, where existing events are left untouched and we decide at runtime which shape makes the most sense for our use case. Instead of getting bogged down in considerations of risk and data integrity, all we have to do is specify a new mapping to support the new version of an event.
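One way to picture "specify a new mapping" is a multimethod that dispatches on the event type and version. This is only a sketch: the `api<-event` name, the `:type` and `:version` keys, and both field layouts are hypothetical and not taken from this article's schema.

```clojure
(defmulti api<-event
  "Map stored event data to the shape the application wants today."
  (fn [event-data] [(:type event-data) (:version event-data)]))

;; Assumed version 1 layout: a single :full-name field.
(defmethod api<-event [:user-created 1]
  [{:keys [full-name]}]
  {:display-name full-name})

;; Assumed version 2 layout: the name split into two fields.
(defmethod api<-event [:user-created 2]
  [{:keys [first-name last-name]}]
  {:display-name (str first-name " " last-name)})

(api<-event {:type :user-created :version 1 :full-name "John Doe"})
;; => {:display-name "John Doe"}
(api<-event {:type :user-created :version 2 :first-name "John" :last-name "Doe"})
;; => {:display-name "John Doe"}
```

Old events stay exactly as they were stored; supporting a new version means adding one more `defmethod`.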
The thing to watch out for is that events have to be applied in ascending order of their revision. Consider:
UserCreated (revision: 1): johndoe
UserUpdated (revision: 2): test
It should be pretty obvious what happens to the final result if the events are not ordered correctly: it would be incorrect, because older changes would overwrite newer ones. We would also not hear about it until we get yelled at.
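To make the failure mode concrete, here is a small sketch. It assumes the two events above carry a name of `johndoe` and `test` respectively; the map shape and the `replay` function are hypothetical.

```clojure
;; Assumed payloads for the two events in the example.
(def user-created {:revision 1 :data {:name "johndoe"}})
(def user-updated {:revision 2 :data {:name "test"}})

(defn replay
  "Merge event payloads in the order given, with no ordering check."
  [events]
  (reduce (fn [state event] (merge state (:data event))) {} events))

(replay [user-created user-updated])
;; => {:name "test"}

;; Wrong order: the stale name wins and nothing complains.
(replay [user-updated user-created])
;; => {:name "johndoe"}

;; Sorting by revision before reducing restores the correct result.
(replay (sort-by :revision [user-updated user-created]))
;; => {:name "test"}
```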
A basic implementation to determine the current state of an entity looks like this:
(require '[honey.sql :as sql]
         '[next.jdbc :as jdbc]
         '[taoensso.truss :as truss])

(defn load-by-entity-id
  "Load all events for an entity by its id.
  Events are ordered by the time they occurred and their revision."
  [connectable entity-id]
  (let [query  (-> {:with     [[[:events {:columns [:entity-id :revision :event-agent
                                                     :time-occurred :time-observed :event-data]}]
                                {:select [:entity-id :revision :event-agent
                                          :time-occurred :time-observed :event-data]
                                 :from   :event-journal
                                 :where  [:= :entity-id entity-id]}]]
                    :select   [:*]
                    :from     :events
                    :order-by [[:time-occurred :asc] [:revision :asc]]}
                   (sql/format))
        result (jdbc/execute! connectable query)]
    result))

(defn latest-revision
  "Get the latest revision from a stream of unordered events"
  [events]
  ;; truss/have throws when events is empty, so max-key always has input.
  (:revision (apply max-key :revision (truss/have seq events))))

(defn valid-stream?
  "A stream of events is valid if:
  - The latest revision in the stream is equal to the number of events
  - The revision of each event increases monotonically in steps of 1"
  [events]
  (let [events (truss/have seq events)]
    (and (= (latest-revision events) (count events))
         (= (map :revision (sort-by :revision events))
            (range (:revision (apply min-key :revision events))
                   (inc (count events))
                   1)))))

(defn aggregate
  "Determine the current state of an entity from its events ordered by revision"
  [transformer events]
  ;; Sort by revision before reducing so newer changes always win.
  (let [reduced (reduce transformer {} (sort-by :revision (truss/have valid-stream? events)))]
    (assoc reduced :revision (latest-revision events))))
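To see which streams the two checks accept, here is a dependency-free restatement of the same rule in plain Clojure. The name `contiguous-from-one?` is hypothetical, and unlike `valid-stream?` it returns false for an empty stream instead of throwing.

```clojure
(defn contiguous-from-one?
  "True when the revisions are exactly 1, 2, ..., n in some order."
  [events]
  (and (boolean (seq events))
       (= (sort (map :revision events))
          (range 1 (inc (count events))))))

(contiguous-from-one? [{:revision 2} {:revision 1}]) ;; => true, order does not matter
(contiguous-from-one? [{:revision 1} {:revision 3}]) ;; => false, revision 2 is missing
(contiguous-from-one? [{:revision 1} {:revision 1}]) ;; => false, duplicate revision
(contiguous-from-one? [{:revision 2} {:revision 3}]) ;; => false, does not start at 1
```

A stream that fails this check is missing events or contains duplicates, so reducing it would silently produce a wrong state.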
Most transformer implementations are going to look roughly like this: we map the event data to an API model and deep merge it into the result accumulated so far. There are usually three different models to map between: database model → what we send to other modules (API model) → what we send over the wire (contract). My personal preference is to collapse them down to one when I can, but it depends on the use case.
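;; event-types, api<-event-user-created, api<-event-user-updated and
;; deep-merge are not shown here.
(defn transformer
  ([] {})
  ([acc] acc)
  ([acc {{:keys [type _version]} :event-data :as event}]
   (cond
     (= (:user-created event-types) type) (->> (api<-event-user-created event)
                                               (deep-merge acc))
     (= (:user-updated event-types) type) (->> (api<-event-user-updated event)
                                               (deep-merge acc))
     ;; Unknown event types leave the accumulated state unchanged.
     :else acc)))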
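The `deep-merge` helper is not shown in this article. One common way to write it, offered here as an assumption about its behavior rather than the actual implementation, is a recursive `merge-with`:

```clojure
(defn deep-merge
  "Recursively merge maps; for non-map values the right-most value wins."
  [& maps]
  (apply merge-with
         (fn [left right]
           (if (and (map? left) (map? right))
             (deep-merge left right)
             right))
         maps))

(deep-merge {:name "johndoe" :address {:city "Oslo" :zip "0150"}}
            {:name "test" :address {:city "Bergen"}})
;; => {:name "test", :address {:city "Bergen", :zip "0150"}}
```

With a plain `merge` the second `:address` map would replace the first one entirely and `:zip` would be lost, which is why a deep merge suits events that only carry the fields that changed.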