Message Bus¶
A convenient method of sending messages to other processes is via a message bus; one can be contructed quite easily using gproc in Erlang and for ease a package erl-simplebus does just that.
Defining a bus¶
A bus is just a ‘name’ with a phantom type associated with it onto which messages of that type can be placed and received by multiple listeners
data BusMessage = StreamStarted String
| Data Binary
| Eof
bus :: SimpleBus.Bus String BusMessage
bus = SimpleBus.bus "file_reading_bus"
In the above example, we define a bus called file_reading_bus, which will be capable of distributing messages of BusMessage, The convention is that a module wishing to expose a bus will just export it via its module definition. By keeping the constructor for the ADT private, only the owner of the bus will be able to place messages on it.
module StreamReader ( bus
, BusMessage
) where
To place a message onto the bus, the module that ‘owns’ the bus need only call send, passing in the bus involved and a constructed message.
_ <- SimpleBus.send bus $ StreamStarted "stargate.ts"
_ <- SimpleBus.send bus Eof
Subscribing to a bus¶
From another process or module, we only need call subscribe, passing in the bus and a callback to receive the messages. In a Genserver, this would look like this
data Msg = Tick
| DoSomething String
| StreamMessage StreamReader.BusMessage
init :: InitFn Unit Unit Msg State
init = do
self <- self
_ <- liftEffect $ SimpleBus.subscribe StreamReader.bus $ send self <<< StreamMessage
pure $ InitOk {}
handleInfo :: InfoFn Unit Unit Msg State
handleInfo msg state = do
case msg of
Tick ->
handleTick state
DoSomething what ->
handleSomething what state
ReaderMessage msg ->
handleReaderMessage msg state
We can see clearly here the pattern of lifting an external module’s message into our own type so we can handle it in our handleInfo dispatch loop.
Unsubscribing from a bus¶
It’s actually rare that we’ll ever unsubscribe from a bus; most of the time we’ll subscribe on a process startup and then allow the subscription to be automatically cleaned up on process termination.
However, it’s worth pointing out that SimpleBus.subscribe actually returns a reference of type SubscriptionRef which we can stash in our process state for use later on.
init :: InitFn Unit Unit Msg State
init = do
self <- self
busRef <- liftEffect $ SimpleBus.subscribe StreamReader.bus $ send self <<< StreamMessage
pure $ InitOk { busRef: Just busRef }
unsubscribe :: State -> Effect State
unsubscribe s@{ busRef: Nothing } = pure s
unsubscribe s@( busRef: Just ref } = do
void SimpleBus.unsubscribe ref
pure s { busRef = Nothing }
When to use a bus¶
A bus is an extremely lazy way of sending messages about the place and care must be taken not to overuse them in complicated orchestration scenarios. In general they’re really good for distributing events to multiple subscribers to let them know something has already happened and not commands that tell things to happen.