TL;DR — I am just thisclose to crashing out
We now have some confidence in the data that gets into our system. The next area of focus is retrieving that data efficiently. Our current implementation mainly caters for a single entity: we can retrieve the event stream for a single entity, apply new events and then commit the changes to our event log. This quickly falls apart with more complex flows, where we need to load the event streams for multiple entities, apply changes to some of them and then commit those changes.
We’ll start with loading the event streams for multiple entities. The solution looks a lot like the one we used to load the stream for a single entity, which was introduced in Standing on business bar one operator:
(defn load-by-entity-ids
  "Load all events for entities by entity ids.
  Events are ordered by the time occurred and their revision.
  If snapshots are available starts from the snapshot."
  [connectable entity-ids]
  (let [query (-> {:with [[[:snapshots {:columns [:entity-id :revision :event-agent
                                                  :time-occurred :time-observed :event-data]}]
                           {:select [:entity-id :revision :event-agent
                                     :time-occurred :time-observed :event-data]
                            :from :event-journal
                            :where [:and
                                    [:in :entity-id entity-ids]
                                    [:= :event-agent (:snapshot agents/system-agents)]]
                            :order-by [[:entity-id :asc] [:time-occurred :asc] [:revision :asc]]}]
                          [[:events {:columns [:entity-id :revision :event-agent
                                               :time-occurred :time-observed :event-data]}]
                           {:select [:entity-id :revision :event-agent
                                     :time-occurred :time-observed :event-data]
                            :from :event-journal
                            :where [:and
                                    [:in :entity-id entity-ids]
                                    [:> :revision [:coalesce
                                                   {:select [[[:max :revision]]]
                                                    :from :snapshots
                                                    ;; correlate on entity-id so each entity is cut off
                                                    ;; at its own latest snapshot revision
                                                    :where [:= :event-journal.entity-id :snapshots.entity-id]}
                                                   0]]]
                            :order-by [[:entity-id :asc] [:time-occurred :asc] [:revision :asc]]}]]
                   :union [{:select [:*]
                            :from :snapshots}
                           {:select [:*]
                            :from :events
                            :order-by [[:entity-id :asc] [:time-occurred :asc] [:revision :asc]]}]}
                  (sql/format))
        result (jdbc/execute! connectable query)]
    result))
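To make the selection rule easier to reason about, here is a minimal in-memory sketch of what the query appears to select. It is a model, not the real implementation: `select-streams`, `journal` and the `:snapshot` keyword standing in for `(:snapshot agents/system-agents)` are all assumptions made for illustration.

```clojure
;; Hypothetical stand-in for (:snapshot agents/system-agents).
(def snapshot-agent :snapshot)

(defn select-streams
  "In-memory model of the query: for each requested entity keep its snapshot
  rows plus every event whose revision is greater than that entity's highest
  snapshot revision (0 when the entity has no snapshot)."
  [journal entity-ids]
  (let [wanted    (set entity-ids)
        rows      (filter #(contains? wanted (:entity-id %)) journal)
        snapshots (filter #(= snapshot-agent (:event-agent %)) rows)
        ;; per-entity cut-off: the role played by the correlated sub-select
        cut-off   (reduce (fn [acc {:keys [entity-id revision]}]
                            (update acc entity-id (fnil max 0) revision))
                          {}
                          snapshots)
        events    (filter #(> (:revision %) (get cut-off (:entity-id %) 0)) rows)]
    (->> (concat snapshots events)
         distinct
         (sort-by (juxt :entity-id :time-occurred :revision)))))

;; Hypothetical journal: "a" has a snapshot at revision 2, "b" has none.
(def journal
  [{:entity-id "a" :revision 1 :event-agent :user     :time-occurred 1}
   {:entity-id "a" :revision 2 :event-agent :snapshot :time-occurred 2}
   {:entity-id "a" :revision 3 :event-agent :user     :time-occurred 3}
   {:entity-id "b" :revision 1 :event-agent :user     :time-occurred 1}
   {:entity-id "b" :revision 2 :event-agent :user     :time-occurred 2}
   {:entity-id "c" :revision 1 :event-agent :user     :time-occurred 1}])

(map (juxt :entity-id :revision) (select-streams journal ["a" "b"]))
;; => (["a" 2] ["a" 3] ["b" 1] ["b" 2])
```

In this model, if the cut-off were a single maximum across all snapshots instead of one per entity, both events for "b" would be dropped, because neither revision exceeds 2. That is why the sub-select has to be tied to the entity id.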
Now we can work on the second part: getting an aggregate that we can operate on. We can already do this using this form of aggregate:
(defn aggregate
  ([entity transformer {:keys [committed-events uncommitted-events] :as _opts}]
   (when (or (and (some? committed-events) (not-empty committed-events))
             (and (some? uncommitted-events) (not-empty uncommitted-events)))
     (let [current-state (apply/aggregate transformer (into [] cat [committed-events uncommitted-events]))
           schema (truss/have ((keyword entity) @schema-registry))]
       (when (truss/have (partial s/valid? schema) current-state)
         {:aggregate current-state :events committed-events :uncommitted-events uncommitted-events})))))
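As a rough illustration of the shape this returns, here is a stripped-down sketch with the schema validation left out. It assumes `apply/aggregate` behaves like a plain `reduce` over the events, which the code above does not confirm. `fold-events`, `merge-event` and `aggregate-sketch` are hypothetical names.

```clojure
(defn fold-events
  "Hypothetical stand-in for apply/aggregate: folds events into a state map."
  [transformer events]
  (reduce transformer {} events))

(defn merge-event
  "Hypothetical transformer: merges each event's data into the state."
  [state {:keys [event-data]}]
  (merge state event-data))

(defn aggregate-sketch
  "Like aggregate above, minus the schema lookup and validation."
  [transformer {:keys [committed-events uncommitted-events]}]
  (when (or (seq committed-events) (seq uncommitted-events))
    {:aggregate          (fold-events transformer
                                      (into [] cat [committed-events uncommitted-events]))
     :events             committed-events
     :uncommitted-events uncommitted-events}))

(:aggregate
 (aggregate-sketch merge-event
                   {:committed-events   [{:event-data {:title "Passport"}}]
                    :uncommitted-events [{:event-data {:status :submitted}}]}))
;; => {:title "Passport", :status :submitted}

(aggregate-sketch merge-event {})
;; => nil
```

Committed events are applied before uncommitted ones, so the state reflects pending changes on top of what is already in the log.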
commit! already operates on an aggregate, so nothing needs to change there.
Say our use case is that a user can create multiple applications for some document that they need. We need to load all the applications for that user to show a list on the landing page:
(defn get-all-forms-by-user-uuid [user-uuid]
  (let [user (get-user-by-uuid user-uuid)
        document-uuids (get-document-ids user-uuid)
        ;; one query for all documents, then split the rows per entity
        document-events (group-by :entity-id (store/load-by-entity-ids (:ds-opts @db) document-uuids))
        documents (update-vals document-events #(entity/aggregate :document transformers/reducer {:committed-events %}))]
    {:user (get-in user [:aggregate :uuid])
     :documents (map :aggregate (vals documents))}))
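The grouping step can be tried in isolation. The sketch below uses made-up rows shaped like the result of `load-by-entity-ids` and a hypothetical `merge-event` reducer in place of `transformers/reducer`. Note that `update-vals` requires Clojure 1.11 or later.

```clojure
;; Hypothetical rows for two documents, interleaved as they might arrive.
(def rows
  [{:entity-id "doc-1" :revision 1 :event-data {:title "Passport"}}
   {:entity-id "doc-2" :revision 1 :event-data {:title "Visa"}}
   {:entity-id "doc-1" :revision 2 :event-data {:status :submitted}}])

(defn merge-event
  "Hypothetical reducer: merges each event's data into the state."
  [state {:keys [event-data]}]
  (merge state event-data))

(defn aggregates-by-entity
  "Splits rows into one event stream per entity, then folds each stream."
  [rows]
  (-> (group-by :entity-id rows)
      (update-vals #(reduce merge-event {} %))))

(aggregates-by-entity rows)
;; => {"doc-1" {:title "Passport", :status :submitted}, "doc-2" {:title "Visa"}}
```

`group-by` keeps the rows of each entity in the order they were returned, so the ordering applied by the query carries over into each stream.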
In the example above, there are a few ways to get all the document IDs associated with the user. One way is a constraints table: every time we add a document, we add an entry to the constraints table referencing the ID of the document. We can’t do any database-level checks to verify that the document ID is valid, because it’s just a key-value pair. A constraints table also lets us quickly check whether a document is associated with a user without having to load all the events associated with the user.
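A minimal in-memory sketch of the idea, using an atom holding a set of pairs in place of a real table. All names here (`constraints`, `add-constraint!`, `document-ids-for`, `owns-document?`) are hypothetical, and a real version would live in the database next to the event journal.

```clojure
;; Hypothetical stand-in for the constraints table: a set of
;; [user-uuid document-uuid] pairs.
(def constraints (atom #{}))

(defn add-constraint!
  "Records that a document belongs to a user. Nothing checks that the
  document id refers to a real entity; it is just a key-value pair."
  [user-uuid document-uuid]
  (swap! constraints conj [user-uuid document-uuid]))

(defn document-ids-for
  "All document ids recorded for a user."
  [user-uuid]
  (into #{}
        (comp (filter #(= user-uuid (first %)))
              (map second))
        @constraints))

(defn owns-document?
  "Quick membership check that does not touch the event journal."
  [user-uuid document-uuid]
  (contains? @constraints [user-uuid document-uuid]))

(add-constraint! "user-1" "doc-1")
(add-constraint! "user-1" "doc-2")
(add-constraint! "user-2" "doc-3")

(document-ids-for "user-1")        ;; => #{"doc-1" "doc-2"}
(owns-document? "user-1" "doc-3")  ;; => false
```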