A backbone for message-driven Python applications.

Max Kossatz
5 min read · Sep 11, 2021

In the last couple of weeks, I developed a Python library designed to act as a software-level message broker, enabling a lightweight implementation of the publish-subscribe design pattern.

The Latin word apostolus, from the Ancient Greek ἀπόστολος, refers to a messenger. So I named it apos!

The library can be found on PyPI: https://pypi.org/project/apos/.
This post was published with apos v0.2.1.

Context

I am a huge fan of Hexagonal Architecture! Put simply, it means keeping a clean application core. Anything that interacts with an external entity, or gives one access to the application, lives in a separate layer of components called adapters. These adapters implement interfaces, called ports, that are defined in the application core.

https://herbertograca.com/2017/11/16/explicit-architecture-01-ddd-hexagonal-onion-clean-cqrs-how-i-put-it-all-together/
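To make the port and adapter terms concrete, here is a minimal sketch in plain Python. All names in it (UserRepository, InMemoryUserRepository, create_user) are hypothetical and chosen only for illustration; none of this is part of apos.

```python
from typing import Protocol, Set


class UserRepository(Protocol):
    """Port: an interface defined inside the application core."""

    def exists(self, user_name: str) -> bool: ...

    def save(self, user_name: str) -> None: ...


class InMemoryUserRepository:
    """Adapter: lives in the outer layer and implements the port."""

    def __init__(self) -> None:
        self._users: Set[str] = set()

    def exists(self, user_name: str) -> bool:
        return user_name in self._users

    def save(self, user_name: str) -> None:
        self._users.add(user_name)


def create_user(user_name: str, repository: UserRepository) -> bool:
    """Core business function: it only knows the port, never the adapter."""
    if repository.exists(user_name):
        return False
    repository.save(user_name)
    return True


repo = InMemoryUserRepository()
created_first = create_user("Max", repo)   # new user, gets saved
created_again = create_user("Max", repo)   # already present, rejected
```

Swapping the in-memory adapter for a database-backed one would not require touching create_user, which is the property the rest of this post builds on.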

apos does not depend on this architectural style, but it is inspired by it, and the two work well together. Hexagonal Architecture separates concerns by decoupling layers; apos takes that a step further.

apos was born to accomplish two objectives:

  • Decouple the application layer from any interface adapters
  • Develop reactive business functions

With apos, you can develop a message-driven application: commands, events, and queries are sent to apos, which in turn executes the functions that subscribe to these messages. An adapter providing an external interface, be it a web API or a CLI, does not call application functions directly. Instead, it sends a message to apos. Equally, a business function does not call other business functions. It publishes an event, which other business functions can subscribe to and act upon, all controlled through apos.

Decoupling the application core

Using apos, an adapter implementation in the outer layer is not aware of any business functions in the application core, because it does not call functions directly. It publishes a command, event, or query object to apos, and apos in turn executes the functions that subscribe to these messages. This decouples the adapters almost completely from the application core. In practice, most of the application core can be extended, refactored, or replaced without changing any dependencies in the interface implementations. The adapters still depend on the messages, but not on how those messages are handled.

apos.publish_command(CreateUserCommand(user_name="Max"))
events: List[apos.IEvent] = apos.get_published_events()

The example above shows how an adapter implementation could use apos to publish a command. apos will execute the Callable mapped to the name of the command Class. At the same time, it will record the event published by the executed business function, as well as any other events published by business functions that run as a result of subscribing to those earlier events. These events can be retrieved from apos using the get_published_events method. It is up to the adapter what it does with these events; for example, it could return them in an HTTP response or publish them to an external message broker like Kafka or RabbitMQ.
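The recording behaviour described above can be sketched with a small stand-in class. RecordingBroker, WelcomeEmailSentEvent, and the handler functions are hypothetical, and this is not how apos is implemented internally. In particular, clearing the record at the start of every command is an assumption made for this sketch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class CreateUserCommand:
    user_name: str


@dataclass
class UserCreatedEvent:
    user_name: str


@dataclass
class WelcomeEmailSentEvent:  # hypothetical follow-up event
    user_name: str


class RecordingBroker:
    """Hypothetical stand-in for illustration; not the apos implementation."""

    def __init__(self) -> None:
        self._command_handlers: Dict[str, Callable[[Any], None]] = {}
        self._event_handlers: Dict[str, List[Callable[[Any], None]]] = {}
        self._published_events: List[Any] = []

    def subscribe_command(self, command_class: type, handler: Callable[[Any], None]) -> None:
        self._command_handlers[command_class.__name__] = handler

    def subscribe_event(self, event_class: type, handlers: List[Callable[[Any], None]]) -> None:
        self._event_handlers.setdefault(event_class.__name__, []).extend(handlers)

    def publish_command(self, command: Any) -> None:
        # Assumption: every command starts with an empty event record.
        self._published_events = []
        self._command_handlers[type(command).__name__](command)

    def publish_event(self, event: Any) -> None:
        # Record first, then dispatch, so follow-up events are appended in order.
        self._published_events.append(event)
        for handler in self._event_handlers.get(type(event).__name__, []):
            handler(event)

    def get_published_events(self) -> List[Any]:
        return list(self._published_events)


broker = RecordingBroker()


def create_user(command: CreateUserCommand) -> None:
    broker.publish_event(UserCreatedEvent(user_name=command.user_name))


def email_user(event: UserCreatedEvent) -> None:
    broker.publish_event(WelcomeEmailSentEvent(user_name=event.user_name))


broker.subscribe_command(CreateUserCommand, create_user)
broker.subscribe_event(UserCreatedEvent, [email_user])

broker.publish_command(CreateUserCommand(user_name="Max"))
events = broker.get_published_events()
```

After the command completes, events holds both the UserCreatedEvent and the follow-up WelcomeEmailSentEvent, in the order they were published. An adapter could serialize that list into its response.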

response: RetrievedUserResponse = apos.publish_query(
    RetrieveUserQuery(user_name="Max"))

The example above shows that apos equally supports query messages. Publishing queries works like publishing commands, the only difference being that it will return the response object returned by the business function that subscribes to the query.

apos.subscribe_command(CreateUserCommand, create_user)
apos.subscribe_event(UserCreatedEvent, [email_user])
apos.subscribe_query(RetrieveUserQuery, retrieve_user)

The example above shows how apos is used to subscribe functions to message Classes. apos will record these subscriptions by mapping the Class names to the Callables. Be mindful of the following restrictions:

  • Commands and queries are mapped one-to-one, allowing only one subscription per command or query
  • Events are mapped one-to-many, allowing multiple subscriptions per event

Reactive Business Functions

Looking at a typical, straightforward implementation of an application core, you will usually see business functions calling other business functions. This becomes problematic when it is done across aggregate boundaries in the domain core. For example, let's say we have a recruitment platform where deactivating a user also requires withdrawing the user's job applications and sending an alert email. We would likely have a business function for deactivating a user, which calls the business functions for withdrawing job applications and for sending the alert email. With this, we have coupled business functions of separate aggregates and contexts.

In large applications that separate code by model aggregates and bounded contexts, coupling them through direct function calls is not viable. The better approach is to have reactive business functions: when one function completes, it emits an event that other business functions can subscribe to, that is, react to. Coming back to the example, withdrawing the job applications and sending the alert email become business functions that run in reaction to the user being deactivated. This way of developing reactive applications improves maintainability and extensibility, because any business function can be added, removed, or refactored independently of the others, as long as the events are defined.

On a side note, using this pub-sub pattern is great when you cannot or don't want to go down the microservices path to separate bounded contexts. You can separate your application logic into packages by context and connect them using messages, the same way you would connect microservices via messages over an external message broker.

apos.publish_event(
    UserDeactivatedEvent(user_name="Max"))

The example above shows how an event can be published. In this case, the business function for deactivating a user would publish the UserDeactivatedEvent upon completion.

apos.subscribe_event(
    UserDeactivatedEvent, [withdraw_job_applications])

The example above shows how apos can be used to subscribe a business function to an event. In this case, the business function for withdrawing job applications subscribes to the UserDeactivatedEvent. This means that when the user deactivation business function from the earlier example completes by publishing a UserDeactivatedEvent, apos reacts by executing the business function for withdrawing the job applications. When the function is called, the event object is passed to it as a parameter.
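The whole deactivation scenario can be played through with a tiny event registry in plain Python. The subscribe_event and publish_event functions below are hypothetical module-level helpers that only mimic the idea; they are not the apos API, and send_alert_email and the actions list exist purely for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List


@dataclass(frozen=True)
class UserDeactivatedEvent:
    user_name: str


# Hypothetical in-process registry, keyed by event class name.
subscribers: DefaultDict[str, List[Callable]] = defaultdict(list)


def subscribe_event(event_class: type, handlers: List[Callable]) -> None:
    subscribers[event_class.__name__].extend(handlers)


def publish_event(event: object) -> None:
    # Each subscriber receives the event object as its only argument.
    for handler in list(subscribers[type(event).__name__]):
        handler(event)


actions: List[str] = []  # stands in for real side effects


def deactivate_user(user_name: str) -> None:
    actions.append(f"deactivated {user_name}")
    # The function announces what happened; it does not know who reacts.
    publish_event(UserDeactivatedEvent(user_name=user_name))


def withdraw_job_applications(event: UserDeactivatedEvent) -> None:
    actions.append(f"withdrew applications of {event.user_name}")


def send_alert_email(event: UserDeactivatedEvent) -> None:
    actions.append(f"sent alert email to {event.user_name}")


subscribe_event(UserDeactivatedEvent, [withdraw_job_applications])
# A second reaction is added later without touching deactivate_user.
subscribe_event(UserDeactivatedEvent, [send_alert_email])

deactivate_user("Max")
```

Note that deactivate_user never references the other two functions. Removing the alert email would mean deleting one subscription line, not editing the deactivation logic.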

Examples

You can find simple examples in the examples directory of the project's repository. At the time of writing, there is one very simple example to make it easier to get started: https://github.com/mkossatz/apos/tree/main/examples

Thank you a lot for reading my post! I hope I was able to excite or intrigue you at least a little.
