Event Sourcing
Guide to the eventSourcedGrain computation expression for CQRS event sourcing.
Event Sourcing
Section titled “Event Sourcing”Guide to the eventSourcedGrain { } computation expression.
What you’ll learn
Section titled “What you’ll learn”- How to define event-sourced grains with pure functions
- The CQRS pattern: commands in, events out, state by folding
- How to replay events and test with FsCheck
- Log consistency providers
Overview
Section titled “Overview”Event-sourced grains separate state changes into a sequence of events. The eventSourcedGrain { } CE builds an EventSourcedGrainDefinition<'State, 'Event, 'Command> with three key functions:
apply— a pure fold:state -> event -> statehandle— a command handler:state -> command -> event listdefaultState— the initial state before any events
The Orleans.FSharp.CodeGen package generates a C# JournaledGrain that delegates to these F# functions.
Installation
Section titled “Installation”dotnet add package Orleans.FSharp.EventSourcingBank Account Example
Section titled “Bank Account Example”Define the types
Section titled “Define the types”open Orleans.FSharp.EventSourcing
[<GenerateSerializer>]type BankAccountState = { Balance: decimal TransactionCount: int }
[<GenerateSerializer>]type BankAccountEvent = | [<Id(0u)>] Deposited of amount: decimal | [<Id(1u)>] Withdrawn of amount: decimal | [<Id(2u)>] TransferSent of amount: decimal * toAccount: string | [<Id(3u)>] TransferReceived of amount: decimal * fromAccount: string
[<GenerateSerializer>]type BankAccountCommand = | [<Id(0u)>] Deposit of amount: decimal | [<Id(1u)>] Withdraw of amount: decimal | [<Id(2u)>] Transfer of amount: decimal * toAccount: string | [<Id(3u)>] GetBalanceDefine the grain
Section titled “Define the grain”let bankAccount = eventSourcedGrain { defaultState { Balance = 0m; TransactionCount = 0 }
apply (fun state event -> match event with | Deposited amount -> { state with Balance = state.Balance + amount TransactionCount = state.TransactionCount + 1 } | Withdrawn amount -> { state with Balance = state.Balance - amount TransactionCount = state.TransactionCount + 1 } | TransferSent(amount, _) -> { state with Balance = state.Balance - amount TransactionCount = state.TransactionCount + 1 } | TransferReceived(amount, _) -> { state with Balance = state.Balance + amount TransactionCount = state.TransactionCount + 1 })
handle (fun state cmd -> match cmd with | Deposit amount when amount > 0m -> [ Deposited amount ] | Withdraw amount when amount > 0m && state.Balance >= amount -> [ Withdrawn amount ] | Transfer(amount, toAccount) when amount > 0m && state.Balance >= amount -> [ TransferSent(amount, toAccount) ] | GetBalance -> [] // No events -- this is a query | _ -> []) // Reject invalid commands silently
logConsistencyProvider "LogStorage" }Key points:
applymust be pure and deterministic. No side effects, no I/O. The same event applied to the same state always produces the same result.handlereturns an empty list to reject a command or signal a query (no state change).logConsistencyProvidernames the Orleans log consistency provider. Common values:"LogStorage"(log-based),"StateStorage"(state snapshot-based).
How it works
Section titled “How it works”Command --> handle(state, cmd) --> Event list | v apply(state, event) --> New State | v Events persisted to log- A command arrives at the grain.
- The
handlefunction produces zero or more events. - Each event is applied to the current state via
apply. - Events are persisted to the log consistency provider.
- On recovery, all events are replayed through
applyto rebuild state.
Replaying Events
Section titled “Replaying Events”Use EventSourcedGrainDefinition.foldEvents to replay event history:
let events = [ Deposited 100m Withdrawn 30m Deposited 50m]
let finalState = EventSourcedGrainDefinition.foldEvents bankAccount { Balance = 0m; TransactionCount = 0 } events
// finalState = { Balance = 120m; TransactionCount = 3 }Handling Commands Programmatically
Section titled “Handling Commands Programmatically”Use EventSourcedGrainDefinition.handleCommand to process a command and get both the events and the resulting state:
let currentState = { Balance = 100m; TransactionCount = 0 }let command = Withdraw 30m
let newState, events = EventSourcedGrainDefinition.handleCommand bankAccount currentState command
// newState = { Balance = 70m; TransactionCount = 1 }// events = [ Withdrawn 30m ]Testing with FsCheck
Section titled “Testing with FsCheck”Event-sourced grains are highly testable because apply and handle are pure functions:
open FsCheckopen FsCheck.Xunitopen Orleans.FSharp.Testing
let balanceInvariant state = state.Balance >= 0m
let applyCommand state cmd = let newState, _ = EventSourcedGrainDefinition.handleCommand bankAccount state cmd newState
[<Property>]let ``balance is never negative for any command sequence`` () = let arb = GrainArbitrary.forCommands<BankAccountCommand>() Prop.forAll arb (fun commands -> FsCheckHelpers.stateMachineProperty { Balance = 0m; TransactionCount = 0 } applyCommand balanceInvariant commands)You can also test applyEvent in isolation:
[<Property>]let ``deposits always increase balance`` (amount: decimal) = amount > 0m ==> lazy let state = { Balance = 50m; TransactionCount = 0 } let newState = EventSourcedGrainDefinition.applyEvent bankAccount state (Deposited amount) newState.Balance = state.Balance + amountEvent-Sourcing API Reference
Section titled “Event-Sourcing API Reference”The EventSourcedGrainDefinition module provides helper functions for testing and programmatic use:
| Function | Description |
|---|---|
foldEvents def state events | Replay a list of events through apply to rebuild state |
handleCommand def state cmd | Process a command: returns (newState, events) |
applyEvent def state event | Apply a single event to state |
// Handle a command programmaticallylet currentState = { Balance = 100m; TransactionCount = 0 }let newState, events = EventSourcedGrainDefinition.handleCommand bankAccount currentState (Withdraw 30m)// newState = { Balance = 70m; TransactionCount = 1 }// events = [ Withdrawn 30m ]
// Apply a single eventlet newState2 = EventSourcedGrainDefinition.applyEvent newState (Withdrawn 20m)// newState2 = { Balance = 50m; TransactionCount = 2 }
// Replay event historylet finalState = EventSourcedGrainDefinition.foldEvents bankAccount { Balance = 0m; TransactionCount = 0 } [ Deposited 100m; Withdrawn 30m; Deposited 50m ]// finalState = { Balance = 120m; TransactionCount = 3 }These are also used internally by the generated C# JournaledGrain class.
Log Consistency Providers
Section titled “Log Consistency Providers”Orleans provides several built-in log consistency providers:
| Provider | Description |
|---|---|
"LogStorage" | Events stored in a log; state rebuilt by replay |
"StateStorage" | Full state snapshot stored; events for recent changes |
If you omit logConsistencyProvider, the silo’s default provider is used.
Complete Example
Section titled “Complete Example”open Orleans.FSharp.EventSourcingopen Orleans.FSharp.Runtime
// Types[<GenerateSerializer>]type InventoryState = { Items: Map<string, int> }
[<GenerateSerializer>]type InventoryEvent = | [<Id(0u)>] ItemAdded of sku: string * qty: int | [<Id(1u)>] ItemRemoved of sku: string * qty: int
[<GenerateSerializer>]type InventoryCommand = | [<Id(0u)>] AddStock of sku: string * qty: int | [<Id(1u)>] RemoveStock of sku: string * qty: int | [<Id(2u)>] CheckStock of sku: string
// Grain definitionlet inventory = eventSourcedGrain { defaultState { Items = Map.empty }
apply (fun state event -> match event with | ItemAdded(sku, qty) -> let current = state.Items |> Map.tryFind sku |> Option.defaultValue 0 { state with Items = state.Items |> Map.add sku (current + qty) } | ItemRemoved(sku, qty) -> let current = state.Items |> Map.tryFind sku |> Option.defaultValue 0 let newQty = max 0 (current - qty) { state with Items = state.Items |> Map.add sku newQty })
handle (fun state cmd -> match cmd with | AddStock(sku, qty) when qty > 0 -> [ ItemAdded(sku, qty) ] | RemoveStock(sku, qty) when qty > 0 -> let available = state.Items |> Map.tryFind sku |> Option.defaultValue 0 if available >= qty then [ ItemRemoved(sku, qty) ] else [] | CheckStock _ -> [] | _ -> [])
logConsistencyProvider "LogStorage" }
// Silo configurationlet config = siloConfig { useLocalhostClustering addMemoryStorage "Default"}Next steps
Section titled “Next steps”- Grain Definition — standard
grain { }CE for non-event-sourced grains - Testing — property testing of event-sourced grains
- Advanced — transactions, state migration, and more