TL;DR — A picture paints a thousand words

Part of a series

Previous article: Standing on business

Next article: Just eyeball it

What we have so far is good enough for unit tests, but there are two other aspects of data access that we need to consider before we can use it in any meaningful way: efficiency and safety. A useful system is a messy system. For us, this means that a useful system creates many many many many events. And many many many many events mean that using the system takes a long long long long long long time.

Conventional wisdom says we should refrain from optimising until we have the numbers to back it up. I don’t have any numbers, but there is a difference between shaving off a few milliseconds just to make boxes on a chart taller and making a difference users can actually perceive. We don’t usually need to invest time improving an area that is already performing fine, but we do need to spend some time evaluating areas that considerably impact user experience.

General API response times (taken from here): having most endpoints respond in around 300 ms or less, and some in around 500 ms, is fine most of the time.

Quick wins are always good, though, and in the case of event sourcing the easiest way to speed things up is to create a special event which captures the current state of the entity at that time. This event is known as a snapshot event. The next time we need to load the entity, we no longer have to start at the first revision and work our way to the latest revision; we can just start at the latest snapshot.
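The idea fits in a few lines of plain Clojure, with no database involved. This is a minimal sketch under a few assumptions: events are maps with :revision, :event-agent and :event-data, a snapshot’s :event-data holds the entire state at that point, and the hypothetical apply-event transformer swaps the state for the snapshot’s data when it meets one. Replaying from the first revision and replaying from the snapshot arrive at the same state; the second simply folds fewer events.

```clojure
;; Hypothetical agent identifier for snapshot events; the series keeps the
;; real one in `agents/system-agents`.
(def snapshot-agent :snapshot)

(defn apply-event
  "Hypothetical transformer: a snapshot event replaces the state wholesale,
   any other event merges its data into the state."
  [state {:keys [event-agent event-data]}]
  (if (= event-agent snapshot-agent)
    event-data
    (merge state event-data)))

(defn replay
  "Fold events into a state in ascending order of revision."
  [events]
  (reduce apply-event {} (sort-by :revision events)))

(def history
  [{:revision 1 :event-agent :user :event-data {:name "Ada"}}
   {:revision 2 :event-agent :user :event-data {:email "ada@example.com"}}
   ;; the snapshot captures the state as of revision 2
   {:revision 3 :event-agent snapshot-agent
    :event-data {:name "Ada" :email "ada@example.com"}}
   {:revision 4 :event-agent :user :event-data {:email "ada@example.org"}}])

;; Replaying everything folds four events...
(replay history)
;; => {:name "Ada", :email "ada@example.org"}

;; ...while starting at the snapshot folds two and lands in the same place.
(replay (drop 2 history))
;; => {:name "Ada", :email "ada@example.org"}
```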

Breaking a problem down is always a good place to start, so let’s break down what we need to do. First, we need to find the latest snapshot event for the entity. Then we need to load all events which occurred after the snapshot event. Finally, we return the snapshot event and all the events which occurred after it in ascending order of revision. That wasn’t too bad. Improving load-by-entity-id to support this looks like this (the new parts are the :snapshots common table expression, the revision filter on :events and the :union that stitches the two together):

(require '[honey.sql :as sql]
         '[next.jdbc :as jdbc])

;; `agents/system-agents` comes from earlier in the series; the `agents`
;; alias is assumed to be required already.
(defn load-by-entity-id
  "Load all events for an entity by its id.

   Events are ordered by the time they occurred and then by their revision.

   If a snapshot is available, loading starts from the latest snapshot."
  [connectable entity-id]
  (let [columns [:entity-id :revision :event-agent
                 :time-occurred :time-observed :event-data]
        query (-> {:with [;; new: the latest snapshot event for the entity, if there is one
                          [[:snapshots {:columns columns}]
                           {:select columns
                            :from :event-journal
                            :where [:and
                                    [:= :entity-id entity-id]
                                    [:= :event-agent (:snapshot agents/system-agents)]]
                            :order-by [[:revision :desc]]
                            :limit 1}]
                          ;; every event after the snapshot (all events when there is none)
                          [[:events {:columns columns}]
                           {:select columns
                            :from :event-journal
                            :where [:and
                                    [:= :entity-id entity-id]
                                    ;; new: skip everything up to and including the snapshot
                                    [:> :revision [:coalesce
                                                   {:select [[[:max :revision]]]
                                                    :from :snapshots}
                                                   0]]]}]]
                   ;; new: the snapshot followed by the events that came after it
                   :union [{:select [:*] :from :snapshots}
                           {:select [:*] :from :events}]
                   ;; order the combined result, not just one side of the union
                   :order-by [[:time-occurred :asc] [:revision :asc]]}
                  (sql/format))]
    (jdbc/execute! connectable query)))
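Stripped of SQL, the three steps map onto the query roughly as follows. This is an in-memory mirror for illustration only, assuming one entity’s events as plain maps and a hypothetical snapshot-agent keyword standing in for (:snapshot agents/system-agents). It orders by revision, as the three steps describe, whereas the query orders by time occurred first.

```clojure
;; Hypothetical agent identifier for snapshot events; in the series this is
;; `(:snapshot agents/system-agents)`.
(def snapshot-agent :snapshot)

(defn events-from-snapshot
  "In-memory mirror of the load-by-entity-id query for one entity's events:
   the latest snapshot (if any) followed by every event with a higher
   revision, in ascending order of revision."
  [events]
  (let [;; step 1: the latest snapshot, like the :snapshots CTE with LIMIT 1
        snapshot (->> events
                      (filter #(= snapshot-agent (:event-agent %)))
                      (sort-by :revision >)
                      first)
        ;; COALESCE(MAX(revision), 0): no snapshot means start from the beginning
        since (or (:revision snapshot) 0)
        ;; step 2: everything strictly after the snapshot, like the :events CTE
        after (filter #(> (:revision %) since) events)]
    ;; step 3: the UNION of both, ordered by revision
    (sort-by :revision (concat (when snapshot [snapshot]) after))))

(def journal
  [{:revision 1 :event-agent :user}
   {:revision 2 :event-agent snapshot-agent}
   {:revision 3 :event-agent :user}
   {:revision 4 :event-agent snapshot-agent}
   {:revision 5 :event-agent :user}])

(map :revision (events-from-snapshot journal))
;; => (4 5)

(map :revision (events-from-snapshot (take 1 journal)))
;; => (1)
```

The strict `>` in step 2 matters: the snapshot already accounts for its own revision, so the filter must not pick it up a second time, and older snapshots (revision 2 above) fall away with the events they summarise.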

That’s all it takes to speed up fetching the events associated with an entity. Before we can load a snapshot event, though, we need to create one, and now I will show you how to fish: determine the current state of the entity and persist a snapshot event with that state as its data.

(require '[honey.sql :as sql]
         '[next.jdbc :as jdbc]
         '[taoensso.truss :as truss]
         '[java-time.api :as jt]
         '[clj-uuid :as uuid])

;; Convert an event map into a row for the :values clause. A missing entity id
;; gets a fresh UUID and a missing occurrence time defaults to now.
(def ->sql-value #(vector (:event-agent %)
                          (str (or (:entity-id %) (uuid/v7)))
                          (or (:time-occurred %) (jt/instant))
                          (:time-observed %)
                          [:lift (:event-data %)]
                          (:revision %)))

(defn persist!
  "Persist a stream of events."
  [connectable events]
  (let [query! (-> {:insert-into [:event-journal]
                    :columns [:event-agent :entity-id :time-occurred :time-observed :event-data :revision]
                    :values (map ->sql-value events)
                    :returning :*}
                   (sql/format))]
    (jdbc/execute! connectable query!)))

;; `valid-stream?` and `latest-revision` were defined earlier in the series.
(defn aggregate
  "Determine the current state of an entity from its events ordered by revision."
  [transformer events]
  (let [reduced (reduce transformer {} (sort-by :revision (truss/have valid-stream? events)))]
    (assoc reduced :revision (latest-revision events))))

(defn next-revision
  "Get the next revision from an aggregate."
  [aggregate]
  (inc' (or (:revision aggregate) 0)))

;; `agents/system-agents` comes from earlier in the series.
(defn snapshot!
  "Creates and persists a snapshot event of the current state of the entity.

   Snapshot events are valuable when there are many events for an entity. If a
   snapshot exists then it is the starting point when events are loaded."
  [connectable transformer events]
  (let [state (aggregate transformer events)
        ;; assumes every event in the stream belongs to the same entity
        entity-id (:entity-id (first events))
        now (jt/instant)
        snapshot-event {:event-agent (:snapshot agents/system-agents)
                        :entity-id entity-id
                        :time-occurred now
                        :time-observed now
                        ;; the revision lives on the event, not in its data
                        :event-data (dissoc state :revision)
                        :revision (next-revision state)}]
    (persist! connectable [snapshot-event])
    snapshot-event))
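snapshot! mixes building the event with persisting it, which makes it awkward to check on its own. One option is to pull the pure half out. The sketch below does that under stated assumptions: ->snapshot-event, transformer and snapshot-agent are hypothetical names, the simplified aggregate skips the truss validation used above, and the transformer is assumed to treat a snapshot’s data as the whole state. It also shows why the transformer matters: loading hands it the snapshot event first, so it has to know what to do with one.

```clojure
;; Hypothetical stand-ins: the series uses `(:snapshot agents/system-agents)`
;; and a domain-specific transformer.
(def snapshot-agent :snapshot)

(defn transformer
  "Hypothetical transformer that treats a snapshot's data as the whole state."
  [state {:keys [event-agent event-data]}]
  (if (= event-agent snapshot-agent)
    event-data
    (merge state event-data)))

(defn aggregate
  "Simplified aggregate: fold events in revision order, remember the latest revision."
  [transformer events]
  (let [sorted (sort-by :revision events)]
    (assoc (reduce transformer {} sorted) :revision (:revision (last sorted)))))

(defn ->snapshot-event
  "The pure half of snapshot!: build, but do not persist, a snapshot event."
  [transformer events now]
  (let [state (aggregate transformer events)]
    {:event-agent snapshot-agent
     :entity-id (:entity-id (first events))
     :time-occurred now
     :time-observed now
     :event-data (dissoc state :revision)
     :revision (inc' (or (:revision state) 0))}))

(def events
  [{:entity-id "e-1" :revision 1 :event-agent :user :event-data {:name "Ada"}}
   {:entity-id "e-1" :revision 2 :event-agent :user :event-data {:email "ada@example.com"}}])

(def snapshot (->snapshot-event transformer events (java.time.Instant/now)))

(:revision snapshot)
;; => 3

;; Replaying just the snapshot gives back the same state, apart from :revision.
(= (dissoc (aggregate transformer events) :revision)
   (dissoc (aggregate transformer [snapshot]) :revision))
;; => true
```

Taking `now` as an argument instead of calling jt/instant inside keeps the function deterministic, which is what makes it easy to test.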

There’s a gotcha: creating a snapshot event requires us to determine the current state of the entity. To determine the current state we need to give form to what is formless. Version. Version. Version. I need to sort out a few bugs.