The bunny_farm library now includes two queue-based implementations of generic server behaviours. They are named gen_qserver and gen_qfsm, which should be self-explanatory. With these behaviours, all queue semantics are built in and hidden from view, so module developers can focus on building modules and not worry about the details of bus communication.

Messages sent over the bus behave like Erlang messages, with both asynchronous and synchronous semantics. The connections to the message queue are configured when the process starts, through an additional parameter passed to start_link and init.

Connections

A connection used only for publishing needs just the exchange name, given as either a binary or a string:

  • <<"exchange">>
  • "exchange"

The string form will be converted to a binary string, but the handle (for retrieving the connection later) will remain as a string.

For consuming messages, both an exchange and a routing key must be provided. When the server starts, it will automatically consume messages for these connections and deliver messages using the callbacks defined below.

  • {<<"exchange">>, <<"routing.key">>}
  • {"exchange", "routing.key"}
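To make the two spec shapes concrete, here is a minimal sketch of how a spec could be classified and converted to binaries. The module conn_spec_sketch and the function normalize/1 are hypothetical and are not part of bunny_farm. Using the original tuple as the handle for a consuming connection is also an assumption; the text above only states that a string exchange keeps its string handle.

```erlang
-module(conn_spec_sketch).
-export([normalize/1]).

%% Hypothetical helper, not bunny_farm's own code.
%% Strings become binaries; binaries pass through unchanged.
to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L).

%% Consuming spec: exchange plus routing key.
%% Assumption: the original tuple serves as the handle.
normalize({Exchange, Key}) ->
    {subscribe, {Exchange, Key}, to_bin(Exchange), to_bin(Key)};
%% Publish-only spec: the handle stays exactly as given,
%% while the exchange name is converted to a binary.
normalize(Exchange) when is_binary(Exchange); is_list(Exchange) ->
    {publish, Exchange, to_bin(Exchange)}.
```

The practical consequence of keeping the handle as given is that a connection declared as "exchange" would be looked up later with the string, not with the binary.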

The RabbitMQ connection specs are then passed to gen_qserver:start_link.

start_link() ->
  ConnSpecs = [ <<"pub.1.exch">>, <<"pub.2.exch">>, {<<"sub.1.exch">>, <<"key.1">>} ],
  gen_qserver:start_link(?MODULE, [], [], ConnSpecs).

The init/2 callback has an additional argument: the pid of the cache process, which maintains the queue connections. A connection can be retrieved from the cache in code as

Bus = qcache:get_bus(CachePid, <<"pub.1.exch">>),
bunny_farm:publish("my message", <<"route.2">>, Bus),

To use the CachePid later, save it in the implementation’s state. If a server only receives messages, this argument can be ignored.
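Putting the pieces together, a server that keeps the CachePid in its state might look like the sketch below. It assumes that init/2 receives its arguments in the order (Args, CachePid) and returns {ok, State} as a gen_server would, and that consumed messages arrive as a {RoutingKey, Payload} tuple. The module name qserver_sketch is hypothetical, and only the callbacks relevant here are shown.

```erlang
-module(qserver_sketch).
-behaviour(gen_qserver).
-export([start_link/0, init/2, handle_cast/2]).

start_link() ->
    ConnSpecs = [<<"pub.1.exch">>, {<<"sub.1.exch">>, <<"key.1">>}],
    gen_qserver:start_link(?MODULE, [], [], ConnSpecs).

%% Assumption: the argument order is (Args, CachePid) and the
%% return value follows the gen_server convention.
init(_Args, CachePid) ->
    {ok, #{cache_pid => CachePid}}.

%% Forward anything consumed with key.1 to the publishing exchange.
handle_cast({<<"key.1">>, Payload}, #{cache_pid := CachePid} = State) ->
    Bus = qcache:get_bus(CachePid, <<"pub.1.exch">>),
    bunny_farm:publish(Payload, <<"route.2">>, Bus),
    {noreply, State};
handle_cast(_Other, State) ->
    {noreply, State}.
```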

gen_qserver

These servers dispatch to handle_cast/2 and handle_call/3 as shown below.

  • publish -> handle_cast/2
  • RPC -> handle_call/3
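As a rough illustration of this split, the sketch below keeps a running total: a published message updates it through handle_cast/2, and an RPC reads it back through handle_call/3. The routing keys counter.add and counter.get and the module name are made up for the example. It also assumes that queue messages arrive as a {RoutingKey, Payload} tuple with the payload already decoded to an Erlang term, and that return values follow the gen_server conventions.

```erlang
-module(qserver_dispatch_sketch).
-export([handle_cast/2, handle_call/3]).

%% publish -> handle_cast/2: no reply is sent to the publisher.
handle_cast({<<"counter.add">>, N}, Total) when is_integer(N) ->
    {noreply, Total + N};
handle_cast(_Other, Total) ->
    {noreply, Total}.

%% RPC -> handle_call/3: the reply travels back to the caller.
handle_call({<<"counter.get">>, _Payload}, _From, Total) ->
    {reply, Total, Total};
handle_call(_Other, _From, Total) ->
    {reply, {error, unknown_request}, Total}.
```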

gen_qfsm

These servers dispatch to Module:StateName/2 and Module:StateName/3 as shown below.

  • publish -> Module:StateName/2
  • RPC -> Module:StateName/3
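For the state machine variant, a two-state sketch could look like the following. It assumes gen_qfsm follows the gen_fsm return conventions ({next_state, ...} and {reply, ...}) and that events arrive as a {RoutingKey, Payload} tuple. The states idle and busy, the job.* routing keys, and the module name are all hypothetical.

```erlang
-module(qfsm_sketch).
-export([idle/2, idle/3, busy/2, busy/3]).

%% publish -> StateName/2
idle({<<"job.start">>, Job}, Data) ->
    {next_state, busy, Data#{job => Job}};
idle(_Event, Data) ->
    {next_state, idle, Data}.

busy({<<"job.done">>, _Payload}, Data) ->
    {next_state, idle, maps:remove(job, Data)};
busy(_Event, Data) ->
    {next_state, busy, Data}.

%% RPC -> StateName/3: reply with the current status.
idle({<<"job.status">>, _Payload}, _From, Data) ->
    {reply, idle, idle, Data}.

busy({<<"job.status">>, _Payload}, _From, Data) ->
    {reply, {busy, maps:get(job, Data)}, busy, Data}.
```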

Messages from the queue are wrapped in a tuple so that pattern matching can be done on the routing key. The format is {RoutingKey, Payload}, where Payload is equivalent to the standard message body that gets sent to the callback. Hence, if you don’t care about the routing key, all messages can be passed through via an implementation like this:

handle_cast({<<_B/binary>>, Payload}, State) ->
  handle_cast(Payload, State);
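When the routing key does matter, the same tuple format lets each clause pick out the keys it cares about. The sketch below is one way this could look; the routing keys orders.created and audit.*, the map-based state, and the module name are invented for illustration.

```erlang
-module(routing_sketch).
-export([handle_cast/2]).

%% Match one exact routing key.
handle_cast({<<"orders.created">>, Payload}, State) ->
    {noreply, State#{last_order => Payload}};
%% Match every routing key that starts with "audit.".
handle_cast({<<"audit.", _Rest/binary>>, _Payload}, State) ->
    Count = maps:get(audit_count, State, 0),
    {noreply, State#{audit_count => Count + 1}};
%% Any other routing key: unwrap and handle the bare payload.
handle_cast({Key, Payload}, State) when is_binary(Key) ->
    handle_cast(Payload, State);
%% Bare payloads are ignored in this sketch.
handle_cast(_Payload, State) ->
    {noreply, State}.
```

Clause order matters here: the specific keys come first, and the catch-all unwrapping clause comes after them.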

More examples will follow.