System Architecture: Messaging Business Events
Originally published on April 14, 2017 at https://www.pluralsight.com/tech-blog/system-architecture--messaging-business-events
When we started on our journey towards bounded contexts, we wanted to maintain the focused, small-team feel that we had enjoyed as a single team supporting a monolith. One strategy we adopted was to limit dependencies between the teams and, by extension, between the different bounded contexts. We did not want to introduce temporal runtime coupling between components developed by different teams, because that would reduce the autonomy of these teams and limit our ability to develop the products we wanted in the ways we wanted.
For example, imagine one team is developing an API with a goal of a 50 millisecond response time for 98% of requests. If they have to take a runtime dependency on another team's API that responds in 20 milliseconds for 90% of requests but averages 200 milliseconds for the remaining 10%, the first team has no hope of reasonably accomplishing their goal on their own. If the slow requests cluster near that average, about one request in ten is over budget before the first team does any work of its own. The most they can do is ask for technical work to be put on the backlog of the team that provides the API they depend on.
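To make the arithmetic concrete, here is a small sketch. It assumes, purely for illustration, that every slow upstream request takes exactly the 200 millisecond average and that the calling team adds 5 milliseconds of its own work; neither number comes from a real system.

```python
# Hypothetical latency profile of the upstream API, per 100 requests:
# 90 fast requests at 20 ms and 10 slow requests at the 200 ms average.
upstream_ms = [20] * 90 + [200] * 10

OWN_WORK_MS = 5   # assumed processing time added by the calling team
TARGET_MS = 50    # the calling team's goal for 98% of requests

# Every request to the calling API has to wait on the upstream call.
total_ms = [latency + OWN_WORK_MS for latency in upstream_ms]

within_target = sum(1 for latency in total_ms if latency <= TARGET_MS)
fraction_within_target = within_target / len(total_ms)

print(f"{fraction_within_target:.0%} of requests finish within {TARGET_MS} ms")
```

Under these assumptions only 90% of requests meet the target, and no amount of tuning inside the calling team's own code can lift that to 98%.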
To avoid cross-context coupling and to limit the possibility of partial outages turning into cascading failures as we disaggregated our monolith into independent bounded contexts, we chose an asynchronous-first model of integration. At that point, we had a small team with limited experience with message-based architectures, so we didn’t set very many rules around how this should be accomplished. What we knew at the time was that we needed a common messaging infrastructure, that naming is really important, that messages should be idempotent, and that we would have to adapt as we gained experience.
How do we broadcast messages?
All of our messages are broadcast using a publish/subscribe messaging pattern.
We chose RabbitMQ as our message broker. A couple of our team members had experience running it in production and it seemed like a reasonable choice for us based on our criteria:
- Avoid a single point of failure (supports clustering)
- Publish/Subscribe (via fanout exchanges)
- Reliable delivery (with durable messages)
- Polyglot development teams (AMQP is widely supported)
We did not make any rules around what shape messages should be or how they should be used.
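The publish/subscribe behavior we rely on can be sketched without a broker. The class below is an in-memory stand-in, not RabbitMQ client code: it only models the fanout rule that every bound queue receives its own copy of each message, and it does not model durability or clustering. The subscriber names (search, billing, indexer, auditor) are hypothetical.

```python
from collections import deque


class FanoutExchange:
    """In-memory stand-in for a fanout exchange: every bound queue gets a copy."""

    def __init__(self, name):
        self.name = name
        self.queues = {}

    def bind(self, queue_name):
        # Binding creates the queue if needed and hands it to the subscriber.
        return self.queues.setdefault(queue_name, deque())

    def publish(self, body):
        # The publisher does not know or care who is subscribed.
        for queue in self.queues.values():
            queue.append(body)


exchange = FanoutExchange("ps.monolith.course-updated.v1")
search_queue = exchange.bind("ps.monolith.course-updated.v1=>ps.search.indexer")
billing_queue = exchange.bind("ps.monolith.course-updated.v1=>ps.billing.auditor")

exchange.publish('{"courseId": "abc"}')
```

Each subscriber consumes from its own queue, so a slow or offline subscriber does not affect the publisher or the other subscribers.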
How do we name messages?
We initially chose to think of messages as business events that occur within a bounded context that might be of interest within another bounded context. Our naming strategy reflects that. We were also in a period of rapid growth through acquisition and we didn’t know how we would integrate all of our new partners. Rather than guess, we took that into account with our naming convention.
We name all of our business events/messages/publisher exchanges as follows:
ps.bounded-context.event-name.vX
- ps is the organization or company that is the source of this message.
- bounded-context is the name of the context (and these can be mapped to teams).
- event-name is a past-tense, semantic name for what happened.
- X is an integer value indicating the version. v0 is a draft message.
We name the queues that subscribe to those exchanges as follows:
ps.bounded-context.event-name.vX=>org.other-context.listener-name
- ps.bounded-context.event-name.vX is the full name of the exchange the queue is bound to.
- org is the organization or company that is consuming this message.
- other-context is the name of the subscribing context (and these can be mapped to teams).
- listener-name is a string used by the subscriber to identify which process is receiving the messages.
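Because the convention is mechanical, names can be built and checked in code. The helpers below are a sketch with hypothetical function names; the restriction of each segment to lower-case letters, digits, and hyphens is an assumption drawn from the examples, not a stated rule.

```python
import re

# Assumed segment syntax: lower-case words joined by hyphens, e.g. "course-updated".
_SEGMENT = r"[a-z0-9]+(?:-[a-z0-9]+)*"
_EXCHANGE_RE = re.compile(rf"^({_SEGMENT})\.({_SEGMENT})\.({_SEGMENT})\.v(\d+)$")


def exchange_name(org, context, event, version):
    """Build an exchange name such as ps.monolith.course-updated.v1."""
    name = f"{org}.{context}.{event}.v{version}"
    if not _EXCHANGE_RE.match(name):
        raise ValueError(f"not a valid exchange name: {name!r}")
    return name


def queue_name(exchange, org, context, listener):
    """Build the name of a queue bound to the given exchange."""
    if not _EXCHANGE_RE.match(exchange):
        raise ValueError(f"not a valid exchange name: {exchange!r}")
    return f"{exchange}=>{org}.{context}.{listener}"


def parse_exchange_name(name):
    """Split an exchange name into its parts; version 0 marks a draft message."""
    match = _EXCHANGE_RE.match(name)
    if match is None:
        raise ValueError(f"not a valid exchange name: {name!r}")
    org, context, event, version = match.groups()
    return {"org": org, "context": context, "event": event,
            "version": int(version), "draft": int(version) == 0}


course_updated = exchange_name("ps", "monolith", "course-updated", 1)
indexer_queue = queue_name(course_updated, "ps", "search", "indexer")
```

Putting the exchange name at the front of the queue name means anyone looking at the broker can see which subscribers still depend on a given message version.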
Versioning Messages
As any experienced engineer knows, your first guess is rarely your best guess, so we started with the assumption that we would need to change our messages over time. As we are using JSON message bodies, adding fields is a non-breaking change, but removing fields, renaming fields, or changing data types typically is breaking. When a team makes a non-breaking change to a message, there is no need to increment the version number.
When a breaking change is required, that team will publish the new message to a new exchange while continuing to publish the old message. This dual-publishing phase allows the consumers to migrate from old to new without serious impact. We expect that each consumer of a message will be a good citizen in the system and migrate to new messages in a timely fashion. When there are no more consumers of the old message, we stop publishing it and clean up the code and infrastructure. No team should ever publish 3 versions of the same message at the same time.
When a message is net new to the system, it can be flagged as a v0. These messages are considered to be draft messages and can evolve rapidly without regard for any impact on consumers. When the publisher and the subscribers have reached an agreement on the shape of the message, we increment the version to v1 and the standard rules for changes come into effect.
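The dual-publishing phase can be sketched as a publisher that sends each event to every live version of its exchange and refuses to carry a third. The class name, the send callback, and the field rename used as the breaking change are all hypothetical.

```python
import json


class VersionedPublisher:
    """Publishes one event to every currently live version of its exchange."""

    MAX_LIVE_VERSIONS = 2  # never three versions of the same message at once

    def __init__(self, base_name, send):
        self.base_name = base_name  # e.g. "ps.monolith.course-updated"
        self.send = send            # assumed callable(exchange_name, body)
        self.serializers = {}       # version -> function(event) -> dict

    def add_version(self, version, serializer):
        is_new = version not in self.serializers
        if is_new and len(self.serializers) >= self.MAX_LIVE_VERSIONS:
            raise RuntimeError("retire an old version before adding a new one")
        self.serializers[version] = serializer

    def retire_version(self, version):
        # Call this once the old exchange has no consumers left.
        del self.serializers[version]

    def publish(self, event):
        for version, serializer in sorted(self.serializers.items()):
            self.send(f"{self.base_name}.v{version}", json.dumps(serializer(event)))


sent = []
publisher = VersionedPublisher(
    "ps.monolith.course-updated", lambda exchange, body: sent.append((exchange, body))
)
# v1 called the field "name"; v2 renames it to "title", which is a breaking change.
publisher.add_version(1, lambda course: {"id": course["id"], "name": course["title"]})
publisher.add_version(2, lambda course: {"id": course["id"], "title": course["title"]})
publisher.publish({"id": "c-1", "title": "Intro to Messaging"})
```

Consumers of v1 keep working untouched while they migrate, and retiring v1 is a one-line change once its exchange has no queues bound to it.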
What happened next?
Over the next few years, we disassembled our monolith into more than 30 separate bounded contexts. The teams responsible for each of them started publishing messages. Other teams subscribed to those messages. Everything seemed to be working.
We found that teams were using messages primarily for two purposes:
- Process choreography
- Data replication
Messages for Process Choreography
Using messages for asynchronous process choreography is a clear win. When you can deconstruct your processes temporally, you can deliver value more quickly and easily from a distributed system. For an example, please see the Gregor Hohpe classic: Your Coffee Shop Doesn’t Use Two-Phase Commit.
Messages of this type tend to have names like:
- user-signed-in
- credit-card-charged
- user-location-detected
- course-added-to-channel
- course-removed-from-channel
The body of a message such as course-added-to-channel contains:
- the ID of the channel
- the ID of the course
- a URI to the channel resource (which aids a bounded context that doesn’t have any information for this channel, yet)
- the timestamp when the course was added, of course
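As an illustration, the fields listed above could be carried in a JSON body like the one below, here assumed to belong to course-added-to-channel. The field names, IDs, and URI are made up. The handler shows one way to keep processing idempotent: it writes state keyed by the IDs, so a redelivered message changes nothing.

```python
import json

# Hypothetical message body; field names and values are for illustration only.
message_body = json.dumps({
    "channelId": "channel-42",
    "courseId": "course-7",
    "channelUri": "https://example.com/channels/channel-42",
    "addedAt": "2017-04-14T12:00:00Z",
})

# Local view of channel membership: channel ID -> {course ID: added-at timestamp}.
channel_courses = {}


def handle_course_added_to_channel(body):
    message = json.loads(body)
    courses = channel_courses.setdefault(message["channelId"], {})
    # Writing the same key again leaves the state unchanged, so a duplicate
    # delivery of the same message is harmless.
    courses[message["courseId"]] = message["addedAt"]


handle_course_added_to_channel(message_body)
handle_course_added_to_channel(message_body)  # simulated duplicate delivery
```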
Messages for Data Replication
Sharing a database with another team is often a recipe for pain. Making changes to the shared resource requires planning and coordination and, more often than not, leads to delays in delivering features. When the barriers to change become too high, smart people come up with clever ideas like storing multiple kinds of data in a single field. Because of many painful experiences with this type of behavior, we imposed a hard rule on ourselves that we would avoid the structural, protocol, temporal, and access coupling that come from sharing a database schema. Of course, this boundary created a new problem: how do teams that need the same data share it?
One example is our course catalog. Only one Bounded Context is the source of truth for courses, but most of the system needs read-only access to course data. A simple, architecturally consistent answer immediately presented itself. We can just publish a message every time a course is updated! This is how ps.monolith.course-updated.v1 was born.
Naming messages of the data replication style is so easy that you can do it programmatically:
- user-account-updated
- course-updated
- channel-updated
The body of each of these messages contains:
- a single JSON-serialized entity
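Deriving such a name programmatically might look like the sketch below, which turns an entity's class name into an event name and serializes the entity as the body. The UserAccount class, its fields, and the default context name are hypothetical.

```python
import json
import re
from dataclasses import asdict, dataclass


@dataclass
class UserAccount:
    id: str
    email: str


def update_event_name(entity):
    # "UserAccount" -> ["User", "Account"] -> "user-account-updated"
    words = re.findall(r"[A-Z][a-z0-9]*", type(entity).__name__)
    return "-".join(word.lower() for word in words) + "-updated"


def build_update_message(entity, context="monolith"):
    # The exchange follows the naming convention; the body is the whole entity.
    exchange = f"ps.{context}.{update_event_name(entity)}.v1"
    body = json.dumps(asdict(entity))
    return exchange, body


exchange, body = build_update_message(UserAccount(id="u-1", email="a@example.com"))
```

The convenience is also the warning sign: a name that can be generated from a class says what data changed, not what happened in the business.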
Bootstrapping the data
The first problem we encountered is how to get the initial set of data. If you are subscribed to a stream, you will get all the new updates, but if something hasn’t changed, you won’t even know that it exists. The obvious solution was to use well-known ETL patterns to get started and rely on the message stream to stay current.
Missed Messages
If you miss a message, you can end up with missing records in your local data store. When this happens, you need a way to heal your local database. We evolved our system to include resource APIs that could be called by message subscribers if (and only if) they were missing data in their local database. This led to the following pattern for all data that is mastered in another Bounded Context:
    // Message handler: store the entity carried by each update message.
    public void ProcessMessage(FooUpdatedMessage message)
    {
        WriteFooToDB(BuildFooFromMessage(message));
    }

    // Read path: call the owning context's API only when the entity is
    // missing locally, then store it so that the next read is local.
    public Foo LoadFoo(string fooId)
    {
        var foo = GetFooFromDB(fooId);
        if (foo == null)
        {
            foo = GetFooFromAPI(fooId);
            WriteFooToDB(foo);
        }
        return foo;
    }
Out of Order Messages
After resolving the missing data issue, we learned that sometimes multiple updates can be published for the same entity in quick succession. With multiple message processors on different machines, it is possible that we process these messages out of order, leading to bad data in the local database. If there are no additional messages to trigger another update, the data on disk is not eventually consistent, it’s just eternally wrong.
Our solution to this problem was to add a TTL to each record. If you read data that is past the TTL, then go back to the API as though you had no data for that entity.
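A minimal sketch of the TTL check follows, using an in-memory dictionary in place of the local database. The one-hour TTL, the function names, and the injectable clock (the now parameter, there to make the example deterministic) are assumptions.

```python
import time

TTL_SECONDS = 3600  # assumed value; choose one that suits the data

local_db = {}  # foo ID -> (foo, time the record was written)


def write_foo_to_db(foo_id, foo, now=None):
    local_db[foo_id] = (foo, time.time() if now is None else now)


def load_foo(foo_id, get_foo_from_api, now=None):
    now = time.time() if now is None else now
    record = local_db.get(foo_id)
    if record is not None:
        foo, written_at = record
        if now - written_at <= TTL_SECONDS:
            return foo
    # Missing or expired: treat both cases the same and return to the API.
    foo = get_foo_from_api(foo_id)
    write_foo_to_db(foo_id, foo, now)
    return foo


# A record written at time 0 has expired by the time it is read.
write_foo_to_db("foo-1", {"name": "stale"}, now=0)
fresh = load_foo("foo-1", lambda foo_id: {"name": "current"}, now=TTL_SECONDS + 1)
```

This bounds how long a wrong record can survive to one TTL period, at the cost of extra API calls for data that was actually fine.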
Some teams tried another option for avoiding the out-of-order processing issue. Rather than include the serialized entity in their foo-updated messages, they include only the URI of the resource. This makes message processing a lot more chatty, as every message requires an HTTP request to get the data, but you always retrieve the latest version of the data, which makes your data replication messages more idempotent.
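The URI-only variant can be sketched as a handler that ignores everything about the message except where to fetch the entity. The fetch callable stands in for an HTTP GET and is hypothetical, as are the URI and the entity's fields.

```python
import json

local_db = {}  # resource URI -> latest known entity


def handle_foo_updated(body, fetch):
    """Handle a foo-updated message that carries only the resource URI.

    `fetch` is a hypothetical callable(uri) -> dict that performs the HTTP GET.
    """
    uri = json.loads(body)["uri"]
    # Whatever order messages arrive in, we store what the API returns now.
    local_db[uri] = fetch(uri)


# Simulated source of truth: the entity has already reached version 2.
source_of_truth = {"https://example.com/foos/1": {"id": "1", "version": 2}}
message = json.dumps({"uri": "https://example.com/foos/1"})

# Two updates were published; processing them in either order, or twice,
# leaves the same final state.
handle_foo_updated(message, source_of_truth.get)
handle_foo_updated(message, source_of_truth.get)
```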
Current Plans for Data Replication
After we’d gained experience with how our teams were implementing the asynchronous-first guideline, we realized that the biggest problem we have is data replication. It was also clear that our existing solutions were not up to the task. We are currently exploring more robust data replication mechanisms, including use of the Apache Kafka distributed commit log and read slaves which rely on traditional database replication. As these nascent experiments develop and grow in adoption with more teams, we will continue to learn and share our learning.
Another strategy we considered is creating shared caches of common data. For example, most parts of our application need access to our content library. Should we have created a SQL DB with read-only slaves in each bounded context? Or perhaps uploaded the current state of each content object to an S3 bucket? We avoided this because of the type of coupling it introduces as well as the difficulty of succeeding at enterprise-wide data modelling. Even if you get your enterprise data model right the first time, evolving it becomes very difficult.