Streaming
Publish, subscribe, and consume streams.
Streaming
Section titled “Streaming”Guide to Orleans streaming with F#-idiomatic APIs.
What you’ll learn
Section titled “What you’ll learn”- How to publish events to streams
- How to subscribe with callbacks or pull-based TaskSeq
- How to use broadcast channels for fan-out
- How to rewind and resume stream consumption
- Implicit stream subscriptions on grains
Overview
Section titled “Overview”Orleans.FSharp wraps Orleans streams with typed StreamRef<'T> references and functional APIs in the Stream module. Broadcast channels get their own BroadcastChannel module.
Configure a stream provider in your silo:
open Orleans.FSharp.Runtime
let config = siloConfig { useLocalhostClustering addMemoryStorage "Default" addMemoryStreams "StreamProvider"}Publishing
Section titled “Publishing”Get a stream reference and publish events:
open Orleans.FSharp.Streaming
let streamProvider = client.GetStreamProvider("StreamProvider")
let stream = Stream.getStream<OrderEvent> streamProvider "orders" "us-east"
do! Stream.publish stream (OrderPlaced { OrderId = "123"; Total = 99.99m })do! Stream.publish stream (OrderShipped { OrderId = "123"; TrackingNumber = "ABC" })Stream.getStream is a purely local operation — it creates a reference without contacting the silo.
Subscribing (Push-based)
Section titled “Subscribing (Push-based)”Subscribe with a callback handler:
let! subscription = Stream.subscribe stream (fun event -> task { printfn "Received: %A" event })
// Later, unsubscribedo! Stream.unsubscribe subscriptionThe subscription is durable and persists beyond grain deactivation.
Consuming as TaskSeq (Pull-based)
Section titled “Consuming as TaskSeq (Pull-based)”Convert a stream to a TaskSeq<'T> for pull-based consumption with backpressure:
open FSharp.Control
let events = Stream.asTaskSeq stream
// Process events as they arrivefor event in events do processEvent eventInternally, asTaskSeq uses a bounded Channel with capacity 1000 and BoundedChannelFullMode.Wait for backpressure when the consumer falls behind.
Rewinding / Resuming
Section titled “Rewinding / Resuming”Subscribe from a specific sequence token to resume processing from a checkpoint:
let! subscription = Stream.subscribeFrom stream savedToken (fun event -> task { processEvent event // Save the token for future recovery })This is only supported by rewindable stream providers (e.g., Event Hubs).
Resuming Subscriptions After Reactivation
Section titled “Resuming Subscriptions After Reactivation”After a grain reactivates, existing durable subscriptions need new handlers:
do! Stream.resumeAll stream (fun event -> task { processEvent event })Listing Subscriptions
Section titled “Listing Subscriptions”Get all active subscriptions for a stream:
let! subscriptions = Stream.getSubscriptions stream
for sub in subscriptions do printfn "Active subscription"Broadcast Channels
Section titled “Broadcast Channels”Broadcast channels deliver messages to ALL subscriber grains (fan-out), unlike streams which target individual consumers.
let config = siloConfig { useLocalhostClustering addBroadcastChannel "Notifications"}Publishing
Section titled “Publishing”open Orleans.FSharp.BroadcastChannel
let provider = client.ServiceProvider.GetRequiredService<IBroadcastChannelProvider>()let channel = BroadcastChannel.getChannel<string> provider "alerts" "global"
do! BroadcastChannel.publish channel "System maintenance at midnight"Consuming
Section titled “Consuming”Broadcast channel consumers are grains that implement IOnBroadcastChannelSubscribed with the [ImplicitChannelSubscription] attribute. This is handled by the C# CodeGen.
Implicit Stream Subscriptions
Section titled “Implicit Stream Subscriptions”Use implicitStreamSubscription in the grain { } CE to auto-subscribe a grain to a stream namespace:
let orderProcessor = grain { defaultState { ProcessedCount = 0 } handle myHandler persist "Default"
implicitStreamSubscription "OrderEvents" (fun state event -> task { let orderEvent = event :?> OrderEvent return { state with ProcessedCount = state.ProcessedCount + 1 } }) }The grain is automatically subscribed when activated. Each grain ID receives events from the stream with the matching key.
Stream Providers
Section titled “Stream Providers”Event Hubs
Section titled “Event Hubs”open Orleans.FSharp.StreamProviders
let configFn = StreamProviders.addEventHubStreams "EventHub" connStr "my-hub"Azure Queue
Section titled “Azure Queue”let configFn = StreamProviders.addAzureQueueStreams "AzureQueue" connStrApply these to the ISiloBuilder directly or via addCustomStorage in the silo config.
Complete Example
Section titled “Complete Example”open Orleans.FSharp.Runtimeopen Orleans.FSharp.Streaming
// Configurelet config = siloConfig { useLocalhostClustering addMemoryStorage "Default" addMemoryStreams "Events" addBroadcastChannel "Alerts"}
// Publish from a grain handlerlet publisher = grain { defaultState () handleWithContext (fun ctx state msg -> task { let streamProvider = GrainContext.getService<IClusterClient> ctx |> fun c -> c.GetStreamProvider("Events") let stream = Stream.getStream<string> streamProvider "logs" "app" do! Stream.publish stream $"Event: {msg}" return (), box () }) }
// Subscribe from client codelet streamProvider = client.GetStreamProvider("Events")let stream = Stream.getStream<string> streamProvider "logs" "app"
let! sub = Stream.subscribe stream (fun msg -> task { printfn "Log: %s" msg })
// Pull-based consumptionlet events = Stream.asTaskSeq streamfor event in events do printfn "Pulled: %s" eventNext steps
Section titled “Next steps”- Grain Definition —
implicitStreamSubscriptionand other grain features - Silo Configuration — configure stream providers
- Event Sourcing — CQRS pattern with event streams