The outbox pattern is a proven and scalable software design pattern often used in a distributed environment to publish events reliably and enforce data consistency. This article presents an overview of the outbox pattern and examines how it can be implemented using Kafka and C# in ASP.NET Core applications.
To work with the code examples discussed in this article, you need the following installed on your system:
- Visual Studio 2022
- .NET 8.0
- ASP.NET Core 8.0 Runtime
If you don't already have Visual Studio 2022 installed on your computer, you can download it from here: https://visualstudio.microsoft.com/downloads/.
In this article, I'll examine the following points:
- An overview of the outbox pattern and its benefits and downsides
- An introduction to Kafka as a message broker
- How the transaction outbox pattern works
- How to implement the outbox pattern in ASP.NET Core, Kafka, and C#
- How to implement the consumer application to consume messages stored in Kafka
A Real-World Use Case
Typically, microservices-based applications maintain their own data store to persist and retrieve data. This strategy helps improve the application's scalability and promotes agility in the deployment process. Additionally, these applications take advantage of messaging to facilitate asynchronous communication between the services. There are several messaging solutions for you to choose from such as Kafka, Azure Service Bus, RabbitMQ, etc.
Understanding the Problem
Consider the flow of a typical Order Processing application at a glance:
- The client application sends a request to the API to persist the data to the underlying database.
- The API validates the incoming request, executes the necessary business logic, and persists the data in the database, as shown in Figure 1.
- Once the data is stored in the database, the API calls a message broker to publish the event so that other services can consume it if needed, as shown in Figure 2.
What's wrong with this? Although the flow may appear straightforward, complications can arise. For instance, the message broker might be unavailable, publishing the message might fail, or a runtime exception might occur in the application after the data has been saved. In each of these cases, the order is stored in the database but the event never reaches the other services, so the ordering service and the data can become inconsistent, as shown in Figure 3.
Addressing this issue can be challenging, as writing code to solve it may lead to a violation of the Single Responsibility Principle, potentially requiring additional responsibilities for the OrderService.
Here's where the transactional outbox pattern comes to the rescue.
An Overview of the Outbox Pattern
The outbox pattern is a technique used in microservices architectures to guarantee reliable communication between services. For this purpose, a temporary storage area, often referred to as an outbox table, is created in the database of a microservice, and the outgoing data changes are recorded in it. The data is serialized before being sent from the outbox table to other external systems or services. In distributed systems, the dual-write problem occurs when an operation must both update a local database and notify other services, because the two writes can't be made atomic across separate systems.
The outbox design solves this problem by recording the messages/events in the same database transaction that updates the local state, making both actions atomic: Either both are committed or neither is. In the outbox pattern, you don't publish an event to the message broker directly. Instead, a record is inserted into a special database table named Outbox. This table is stored in the same database as the application's data. An asynchronous process then executes in the background and periodically checks the table for unpublished messages. When it finds one, it publishes the message to a message broker (e.g., Apache Kafka or RabbitMQ), as shown in Figure 4.
Components of the Outbox Pattern
The outbox pattern is comprised of the following key components:
- Events: At runtime, an application generates events that are stored in the Outbox table.
- Outbox table: This data store is used to store the messages until they are processed by the publisher.
- Publisher: This component is used to process messages from the outbox table and then dispatch them to the appropriate destinations.
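To make the interplay of these three components concrete, here is a minimal in-memory sketch. The names InMemoryOrderStore, OutboxRecord, SaveOrder, and PublishPending are hypothetical and aren't part of this article's project. A lock stands in for the database transaction, so the sketch illustrates the flow only, not real atomicity or durability.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical outbox row: an event payload plus a dispatched flag.
public record OutboxRecord(int Id, string Payload)
{
    public bool Dispatched { get; set; }
}

// Hypothetical in-memory stand-in for the service's database.
public class InMemoryOrderStore
{
    private readonly object _gate = new();
    public List<string> Orders { get; } = new();
    public List<OutboxRecord> Outbox { get; } = new();

    // Event creation: the order and its outbox record are written together.
    // The lock is only a stand-in for a database transaction; it gives
    // mutual exclusion, not rollback.
    public void SaveOrder(string order)
    {
        lock (_gate)
        {
            Orders.Add(order);
            Outbox.Add(new OutboxRecord(Outbox.Count + 1, order));
        }
    }

    // Publisher: sends every record that hasn't been dispatched yet and
    // returns how many records it picked up.
    public int PublishPending(Action<string> send)
    {
        lock (_gate)
        {
            var pending = Outbox.Where(r => !r.Dispatched).ToList();
            foreach (var record in pending)
            {
                send(record.Payload);     // if this throws, the record stays pending
                record.Dispatched = true; // marked only after a successful send
            }
            return pending.Count;
        }
    }
}
```

Calling SaveOrder and then PublishPending with a delegate that forwards the payload to a broker mirrors the event, outbox table, and publisher roles described above.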
Benefits and Downsides
The outbox pattern is a proven technique widely used for enhancing data consistency and reliability in microservices architectures. However, like any architectural pattern, it has its benefits and downsides.
Benefits
The following are the benefits of the outbox pattern:
- Transactional consistency: The outbox pattern ensures that any changes to the database residing locally and the publishing of events/messages are part of the same database transaction. This prevents data inconsistencies that might occur due to operation failures after the database changes are committed but before the events are published. By sending the messages as part of the same transaction, the outbox pattern maintains atomicity and consistency. Moreover, the outbox pattern enables events to be retried until they have been successfully processed.
- Reliable message publishing: The outbox pattern ensures reliable message sending by taking advantage of a persistent storage mechanism where the messages or events are stored in the database, thereby preventing message loss due to failures in the event publishing mechanism or network issues. This persistent storage of messages coupled with asynchronous message sending helps decouple the message dispatch process from the application's response, thereby minimizing the chances of message loss and improving resiliency.
- Scalability: The outbox pattern enhances scalability of an application by separating the concerns of event creation from event distribution. It enables an application to focus on processing incoming requests without interruption by offloading message transmission to a different process or service.
- Performance: The outbox pattern improves performance by sending messages asynchronously, thereby processing the messages without any delays to enhance responsiveness and throughput. Storing the events in the same database transaction as the business operations minimizes the number of total transactions executed, which can enhance performance.
Downsides
Despite the benefits of the outbox pattern stated earlier, there are certain downsides as well:
- Increased complexity: The outbox pattern can introduce additional complexity to your architecture, application design, and development. By implementing this pattern, you need to manage the outbox processor and verify whether the messages are recorded correctly, thereby making your application difficult to manage, maintain, and debug.
- Latency: When using the outbox pattern, there might be a delay between the time an event occurs and the time it's published to other services, because the outbox processor usually publishes events in batches at a regular time interval. This delay may be unacceptable in situations where immediate or real-time communication is needed.
- Database coupling: Because the outbox pattern depends on the database to store events before they're published, there's an inherent coupling with the database, thereby making future changes more challenging.
- Scalability: Although the outbox pattern ensures that microservices are not tightly coupled and can be independently scaled, scaling your application may still be challenging. If an outbox processor isn't working as expected or there's too much traffic, you might have to consider scaling your infrastructure for processing outboxes.
- Resiliency: Although the outbox pattern ensures that events aren't lost, there's no built-in mechanism to handle failures while processing the events. You need to write your code to handle duplicate events, idempotency, retries, and failure recovery.
- Operational overhead: Monitoring the outbox repository regularly and adopting the strategy to transmit messages and potential retries can be an additional overhead. Keeping an eye on the outbox storage, ensuring that the messages are delivered, and dealing with errors or retries may be challenging.
- Integration challenges: You must take the necessary steps to ensure reliable message delivery. This may involve implementing additional configurations and dependencies when connecting to external systems like RabbitMQ and Apache Kafka.
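Because a message can be published more than once (for example, when the processor crashes after sending but before marking the message as dispatched), consumers are usually written to tolerate duplicates. The following is a minimal sketch of an idempotent handler, assuming each event carries a unique ID. The IdempotentHandler class is hypothetical, and it keeps the processed IDs in memory; a real consumer would typically store them durably alongside the data it updates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that ignores events it has already handled.
public class IdempotentHandler
{
    private readonly HashSet<int> _processedIds = new();
    private readonly Action<string> _handle;

    public IdempotentHandler(Action<string> handle) => _handle = handle;

    // Returns true if the event was handled, false if it was a duplicate.
    public bool Handle(int eventId, string payload)
    {
        if (_processedIds.Contains(eventId))
        {
            return false; // already processed: skip the side effect
        }

        _handle(payload);
        _processedIds.Add(eventId); // recorded only after the handler succeeds
        return true;
    }
}
```

With this in place, delivering the same event twice triggers the wrapped handler only once.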
How Does the Outbox Pattern Work?
Assume an order processing application in which an order service is used to handle orders placed by the customer. As soon as a new order is placed, the application must update the order database table and also notify the inventory service that the stock should be updated. The complete flow is illustrated in the sequence of steps of a typical order processing use case given below:
- Create Order: A customer uses the application to place a new order. Next, the Order Service picks up this order and processes it.
- Update Order database table: The Order Service stores the new order in the Order database table with information that includes the order details and the customer details together with the Order status.
- Create Stock Update message: Besides updating the Order database table, the Order Service creates a message to indicate that the Stock database table needs to be updated. This message is persisted in the Outbox database table within the context of the same transaction.
- Commit transaction: Next, the transaction that comprises updating the Order database table and insertion of the message into the Outbox database table is committed.
- Check Outbox database table periodically: A process named Message Relay executes in the background to check the Outbox database table for new messages.
- Dispatch message to Stock service: When the Message Relay detects a new message in the Outbox database table, it forwards the message to the Stock service for processing via the Message queue.
- Stock update: Upon receipt of a new message, the Stock service updates the Stock database table accordingly.
In the outbox pattern, when an event occurs, instead of a message being sent to a message broker such as Kafka, a new record that contains the details of the event is stored in the Outbox database table. It should be noted that typically the Outbox table is part of the same database and this all happens as part of the same transaction. Figure 5 illustrates how this all works together.
Key Terminologies
Here are the key terminologies you often come across when exploring the Outbox pattern:
- Event log or the Outbox: Each microservice maintains an event log or an Outbox where the events that have to be published are first recorded to ensure that the events are never lost. Essentially, an Outbox is a database table to store events or messages before the events are published.
- Transactional operation: For the purpose of ensuring that only successful business operations are captured in the event log, events are added to the log as part of the same transaction as the business operations that generated them.
- Outbox processor: An outbox processor runs as a separate background task and scans the event log at regular intervals. It collects the events that haven't yet been published and then publishes them to the event bus or message broker.
- Message broker or event bus: A message broker or an event bus receives those events that have been published. The message broker or event bus provides durability and reliability and distributes these events to the subscribers or the other microservices.
Introduction to Kafka as a Message Broker
Apache Kafka is a distributed, high-performance, open source, scalable, and versatile stream-processing software. It's written in Java and Scala, and has become popular in recent times for building systems that are adept at handling massive volumes of data. Kafka is a messaging platform based on the publish/subscribe model. It comes with built-in features for replication, partitioning, fault tolerance, and improved throughput, compared to applications without message brokers.
Kafka is a good option for applications that need vast data processing capabilities. The main use of Kafka is in creating real-time streaming data pipelines. For this reason, Kafka includes stream processing and fault-tolerant storage functionalities, which enable storing and analyzing historical and real-time data. Figure 6 shows how Kafka fits in a typical implementation of the outbox pattern.
Getting Started with Apache Kafka
To get started with Apache Kafka, follow the steps outlined in the next few sections.
Download and Extract Kafka
You can download Apache Kafka from here: https://kafka.apache.org/downloads.
Alternatively, you can run the following command at the terminal to download Kafka to your computer:
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
Once Kafka has been downloaded, extract the .tgz archive by running the following command at the terminal window:
tar -xzf kafka_2.13-3.7.0.tgz
Start ZooKeeper
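In this setup, you need ZooKeeper to run Kafka on your computer. Once you've downloaded and extracted Apache Kafka, start ZooKeeper by running the following commands at the terminal window:
cd kafka_2.13-3.7.0
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
With ZooKeeper running, open a second terminal window in the same directory and start the Kafka broker:
.\bin\windows\kafka-server-start.bat .\config\server.properties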
Create a Topic
Now that ZooKeeper and Kafka are up and running on your computer, you can start creating topics. You can create a new topic by running the following command at the terminal window.
.\bin\windows\kafka-topics.bat --create --topic mynewtopic --bootstrap-server localhost:9092
Display All Kafka Messages in a Topic
To display all messages in a particular topic, use the following command:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mynewtopic --from-beginning
Display all Topics
To list all topics, you can use the following command:
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Start the Producer
You can run the following command at the terminal window to start a producer.
.\bin\windows\kafka-console-producer.bat --topic mynewtopic --bootstrap-server localhost:9092
Start the Consumer
In Kafka, you need a consumer to read the messages produced by a producer. You can run the following command at the terminal window to read all messages sent by the producer as shown in Figure 7.
.\bin\windows\kafka-console-consumer.bat --topic mynewtopic --from-beginning --bootstrap-server localhost:9092
Shut Down ZooKeeper and Kafka
To shut down ZooKeeper, use the zookeeper-server-stop.bat script, as shown below:
bin\windows\zookeeper-server-stop.bat
To shut down Kafka, use the kafka-server-stop.bat script, as shown below:
bin\windows\kafka-server-stop.bat
Implementing the Outbox Pattern in ASP.NET Core, Kafka, and C#
In this section, I'll examine how to implement the outbox pattern in an ASP.NET Core application using Kafka. In this example, you'll use the following interfaces and classes.
- OutboxMessage: This is the model class.
- IKafkaProducer: This is the interface for the KafkaProducer class.
- IKafkaConsumer: This is the interface for the KafkaConsumer class.
- IOrderService: This interface contains the declaration of methods supported by the OrderService.
- IOutboxMessageRepository: This represents the interface for the OutboxMessageRepository class.
- KafkaProducer: This class is used to send messages to Kafka.
- KafkaConsumer: This class is used to consume messages in Kafka.
- OrderService: The OrderService class implements the IOrderService interface and encapsulates the logic for retrieving orders and for storing new orders together with their outbox messages.
- KafkaMessageProcessor: This is a background service that calls KafkaConsumer at regular intervals of time to consume messages residing in Kafka.
- OutboxMessageProcessor: This is yet another background service used to invoke KafkaProducer to transmit messages to Kafka.
- OutboxMessageRepository: This class contains methods to retrieve messages from the Outbox database table and also update a message as needed.
- ApplicationDbContext: This class represents the data context that acts as a gateway to connect to the underlying database being used by the application.
- OrderController: This represents the Order API that can be consumed by clients to retrieve existing orders or create new orders.
Create a New ASP.NET Core 8 Project in Visual Studio 2022
You can create a project in Visual Studio 2022 in several ways, such as from the Visual Studio 2022 Developer Command Prompt or by launching the Visual Studio 2022 IDE. When you launch Visual Studio 2022, you'll see the Start window. You can choose “Continue without code” to launch the main screen of the Visual Studio 2022 IDE.
Now that you know the basics, let's start setting up the project. To create a new ASP.NET Core 8 Project in Visual Studio 2022:
- Start the Visual Studio 2022 IDE.
- In the “Create a new project” window, select “ASP.NET Core Web API” and click Next to move on.
- In the “Configure your new project” window, specify the project name as “Outbox_Pattern_Demo” and the path where it should be created.
- If you want the solution file and project to be created in the same directory, you can optionally check the “Place solution and project in the same directory” checkbox. Click Next to move on.
- In the next screen, specify the target framework and authentication type as well. Ensure that the “Configure for HTTPS,” “Enable Docker Support,” “Do not use top-level statements,” and the “Enable OpenAPI support” checkboxes are unchecked because you won't use any of these in this example.
- Remember to leave the “Use controllers” checkbox checked because you won't use minimal API in this example.
- Click Create to complete the process.
A new ASP.NET Core Web API project is created. You'll use this project to implement the outbox pattern using Kafka in ASP.NET Core and C#.
Create the Outbox Database Table
First off, create the Outbox database table to store messages. To do this, create a new database called Outbox_Pattern_Demo using the database script given below:
CREATE DATABASE Outbox_Pattern_Demo;
Next, create a new database table called OutboxMessage using the following database script.
CREATE TABLE OutboxMessage
(
    -- INT matches the int Event_Id property of the OutboxMessage class
    Event_Id INT IDENTITY PRIMARY KEY,
    Event_Payload NVARCHAR(MAX) NOT NULL,
    Event_Date DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    IsMessageDispatched BIT NOT NULL DEFAULT 0
);
Create another database table in the same database named Order using the following script.
CREATE TABLE [Order]
(
Order_Id BIGINT IDENTITY PRIMARY KEY,
Customer_Id INT NOT NULL,
Order_Date DATETIME,
Order_Amount MONEY NOT NULL
);
Install NuGet Package(s)
In this example, you'll take advantage of Entity Framework Core to interact with the database and perform CRUD (an acronym for Create, Read, Update, Delete) operations. You'll also need a Kafka client package to produce and consume messages. To work with Apache Kafka, you'll use the Confluent.Kafka NuGet package. To install the required packages into your project, right-click on the solution and then select Manage NuGet Packages for Solution... Now search for the Microsoft.EntityFrameworkCore.SqlServer and Confluent.Kafka packages in the search box and install them. Alternatively, you can type the commands shown below at the NuGet Package Manager Command Prompt:
PM> Install-Package Microsoft.EntityFrameworkCore.SqlServer
PM> Install-Package Confluent.Kafka
You can also install these packages by executing the following commands at the Windows Shell:
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package Confluent.Kafka
Create the OutboxMessage Class
Create a new C# class named OutboxMessage in a file named OutboxMessage.cs and write the following code in there.
public class OutboxMessage
{
public int Event_Id { get; set; }
public string Event_Payload { get; set; }
public DateTime Event_Date { get; set; }
public bool IsMessageDispatched { get; set; }
}
Create the Order Model Class
Create a new class named Order in a file having the same name with a .cs extension and write the following code in there:
public class Order
{
    public long Order_Id { get; set; }
    public int Customer_Id { get; set; }
    public DateTime Order_Date { get; set; }
    // Named to match the Order_Amount column of the Order table
    public decimal Order_Amount { get; set; }
}
Create the Data Context
In Entity Framework Core (EF Core), a data context is a component used by an application to interact with the database and manage database connections, and to query and persist data in the database. You'll now create a data context class named ApplicationDbContext. To create a data context, create a class that extends the DbContext class of EF Core, as shown below:
public class ApplicationDbContext : DbContext
{
    public ApplicationDbContext(
        DbContextOptions<ApplicationDbContext> options) : base(options)
    {
    }
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
    }
    public DbSet<OutboxMessage> OutboxMessages { get; set; }
    public DbSet<Order> Orders { get; set; }
}
You should specify the primary keys and the table names for your entities in the OnModelCreating method, as shown in the code snippet given below:
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Order>(entity =>
{
entity.ToTable("Order");
entity.HasKey(e => e.Order_Id);
});
modelBuilder.Entity<OutboxMessage>(entity =>
{
entity.ToTable("OutboxMessage");
entity.HasKey(e => e.Event_Id);
});
base.OnModelCreating(modelBuilder);
}
The complete source of the data context class is given in Listing 1.
Listing 1: The ApplicationDbContext class
using Microsoft.EntityFrameworkCore;
namespace Outbox_Pattern_Demo
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(
            DbContextOptions<ApplicationDbContext> options) : base(options)
        {
        }
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<Order>(entity =>
            {
                entity.ToTable("Order");
                entity.HasKey(e => e.Order_Id);
            });
            modelBuilder.Entity<OutboxMessage>(entity =>
            {
                entity.ToTable("OutboxMessage");
                entity.HasKey(e => e.Event_Id);
            });
            base.OnModelCreating(modelBuilder);
        }
        public DbSet<OutboxMessage> OutboxMessages { get; set; }
        public DbSet<Order> Orders { get; set; }
    }
}
Create the OrderService Class
Now, create a new class named OrderService in a file having the same name with a .cs extension. Write the following code in there:
public class OrderService : IOrderService
{
}
The OrderService class shown above implements the methods of the IOrderService interface. Here's how the IOrderService interface should look:
public interface IOrderService
{
public Task<List<Order>> GetAllOrdersAsync();
public Task<Order> GetOrderAsync(int Id);
public Task CreateOrderAsync(Order order);
}
The CreateOrderAsync method of the OrderService class stores the order and the corresponding outbox message within the same database transaction:
public async Task CreateOrderAsync(Order order)
{
    // Both inserts share one transaction, so they succeed or fail together.
    await using var transaction =
        await _context.Database.BeginTransactionAsync();
    try
    {
        _context.Orders.Add(order);
        // Save first so that the generated Order_Id is part of the payload.
        await _context.SaveChangesAsync();
        var outboxMessage = new OutboxMessage
        {
            Event_Payload = JsonSerializer.Serialize(order),
            Event_Date = DateTime.UtcNow,
            IsMessageDispatched = false
        };
        _context.OutboxMessages.Add(outboxMessage);
        await _context.SaveChangesAsync();
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}
The complete source code of the OrderService class is given in Listing 2.
Listing 2: The OrderService Class
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
namespace Outbox_Pattern_Demo
{
    public class OrderService : IOrderService
    {
        private readonly ApplicationDbContext _context;
        public OrderService(ApplicationDbContext context)
        {
            _context = context;
        }
        public async Task<List<Order>> GetAllOrdersAsync()
        {
            // Query asynchronously instead of wrapping a blocking call.
            return await _context.Orders.ToListAsync();
        }
        public async Task<Order> GetOrderAsync(int Id)
        {
            return await _context.Orders
                .FirstOrDefaultAsync(x => x.Order_Id == Id);
        }
        public async Task CreateOrderAsync(Order order)
        {
            // Both inserts share one transaction, so they succeed or fail together.
            await using var transaction =
                await _context.Database.BeginTransactionAsync();
            try
            {
                _context.Orders.Add(order);
                // Save first so that the generated Order_Id is part of the payload.
                await _context.SaveChangesAsync();
                var outboxMessage = new OutboxMessage
                {
                    Event_Payload = JsonSerializer.Serialize(order),
                    Event_Date = DateTime.UtcNow,
                    IsMessageDispatched = false
                };
                _context.OutboxMessages.Add(outboxMessage);
                await _context.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
}
Create the OutboxMessageProcessor Background Service
You'll now create a service that runs in the background and checks for messages in the OutboxMessage database table. If one or more messages are found, it sends them to Kafka for further processing. To create a background service in .NET Core, you can take advantage of the IHostedService interface. You can create your background service class by implementing the IHostedService interface.
This interface contains two methods, StartAsync and StopAsync, as shown below:
public interface IHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}
You can also use the abstract helper class called BackgroundService that's available in .NET Core. Because this class already implements the IHostedService interface, you can create your background service class by extending this class in lieu of implementing the IHostedService interface directly. The BackgroundService class is defined in the Microsoft.Extensions.Hosting namespace as shown below:
public abstract class BackgroundService : IHostedService, IDisposable
{
public virtual void Dispose();
public virtual Task StartAsync(CancellationToken cancellationToken);
public virtual Task StopAsync(CancellationToken cancellationToken);
protected abstract Task ExecuteAsync(CancellationToken stoppingToken);
}
The following code snippet shows how you can create your background service class in this way.
public class OutboxMessageProcessor : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken
cancellationToken)
{
}
}
The PublishOutboxMessagesAsync method of the OutboxMessageProcessor class is responsible for sending the outbox messages to Kafka. The foreach loop iterates through all messages residing in the Outbox database table that are yet to be dispatched. Such messages are sent to Kafka by making a call to the SendMessageToKafkaAsync method of the KafkaProducer class.
private async Task PublishOutboxMessagesAsync(
    CancellationToken cancellationToken)
{
    // The DbContext is scoped, so resolve it from a fresh scope on each pass.
    using var scope = _scopeFactory.CreateScope();
    var dbContext = scope.ServiceProvider
        .GetRequiredService<ApplicationDbContext>();
    // Select only the messages that have not been dispatched yet.
    List<OutboxMessage> messages = dbContext.OutboxMessages
        .Where(om => !om.IsMessageDispatched).ToList();
    foreach (OutboxMessage outboxMessage in messages)
    {
        try
        {
            await _producer.SendMessageToKafkaAsync(outboxMessage);
            outboxMessage.IsMessageDispatched = true;
            outboxMessage.Event_Date = DateTime.UtcNow;
            await dbContext.SaveChangesAsync(cancellationToken);
        }
        catch (Exception e)
        {
            // The message stays undispatched and is picked up on the next pass.
            Console.WriteLine(e);
        }
    }
    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
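The catch block in this method only logs a failed send and leaves the message for the next polling pass. If you'd rather retry a transient failure a few times before giving up on the current pass, a small helper along the following lines could wrap the send call. This is a sketch under stated assumptions: the Retry class and its WithBackoffAsync method are hypothetical and not part of the article's project or of any library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries an action with exponentially growing delays.
public static class Retry
{
    // Runs the action up to maxAttempts times and returns the attempt number
    // that succeeded. The last failure is rethrown to the caller.
    public static async Task<int> WithBackoffAsync(
        Func<Task> action,
        int maxAttempts,
        TimeSpan baseDelay,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts
                && ex is not OperationCanceledException)
            {
                // Wait baseDelay, then 2x, 4x, ... before the next attempt.
                var delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

For example, the send could be wrapped as Retry.WithBackoffAsync(() => _producer.SendMessageToKafkaAsync(outboxMessage), 3, TimeSpan.FromMilliseconds(200), cancellationToken). Because a retried send can still deliver a message twice, consumers should remain tolerant of duplicates.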
The complete source code of the OutboxMessageProcessor class is given in Listing 3.
Listing 3: The OutboxMessageProcessor class
namespace Outbox_Pattern_Demo
{
    public class OutboxMessageProcessor : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly IKafkaProducer _producer;
        public OutboxMessageProcessor(IServiceScopeFactory scopeFactory,
            IKafkaProducer producer)
        {
            _scopeFactory = scopeFactory;
            _producer = producer;
        }
        protected override async Task ExecuteAsync(
            CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await PublishOutboxMessagesAsync(cancellationToken);
            }
        }
        private async Task PublishOutboxMessagesAsync(
            CancellationToken cancellationToken)
        {
            // The DbContext is scoped, so resolve it from a fresh scope on each pass.
            using var scope = _scopeFactory.CreateScope();
            var dbContext = scope.ServiceProvider
                .GetRequiredService<ApplicationDbContext>();
            // Select only the messages that have not been dispatched yet.
            List<OutboxMessage> messages = dbContext.OutboxMessages
                .Where(om => !om.IsMessageDispatched).ToList();
            foreach (OutboxMessage outboxMessage in messages)
            {
                try
                {
                    await _producer.SendMessageToKafkaAsync(outboxMessage);
                    outboxMessage.IsMessageDispatched = true;
                    outboxMessage.Event_Date = DateTime.UtcNow;
                    await dbContext.SaveChangesAsync(cancellationToken);
                }
                catch (Exception e)
                {
                    // The message stays undispatched and is picked up on the next pass.
                    Console.WriteLine(e);
                }
            }
            await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        }
    }
}
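Listing 3 paces its polling loop with Task.Delay. As an alternative sketch, .NET 6 and later provide PeriodicTimer, which ticks at a fixed interval and stops cleanly on cancellation. The Poller class and its RunAsync method below are hypothetical names used only for illustration. Note that this loop waits for the first tick before polling, whereas Listing 3 polls immediately.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: invokes pollOnce at a fixed interval until cancelled.
public static class Poller
{
    // Returns the number of completed polling passes.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> pollOnce,
        TimeSpan interval,
        CancellationToken stoppingToken)
    {
        int iterations = 0;
        using var timer = new PeriodicTimer(interval);
        try
        {
            // WaitForNextTickAsync completes once per interval and throws
            // OperationCanceledException when the token is cancelled.
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                await pollOnce(stoppingToken);
                iterations++;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way for the loop to end.
        }
        return iterations;
    }
}
```

Inside ExecuteAsync, a call such as Poller.RunAsync(PublishPassAsync, TimeSpan.FromSeconds(5), stoppingToken) could replace the while loop, where PublishPassAsync stands for a publishing method that no longer contains its own delay.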
Create the OutboxMessageRepository
Let's now create the OutboxMessageRepository that will be used to get the messages from the Outbox database table and also update a message in the same table. The OutboxMessageRepository implements the IOutboxMessageRepository interface. The following code snippet shows the IOutboxMessageRepository interface:
namespace Outbox_Pattern_Demo
{
    public interface IOutboxMessageRepository
    {
        Task<IReadOnlyCollection<OutboxMessage>> GetUnsentMessagesAsync();
        Task<IReadOnlyCollection<OutboxMessage>> GetMessagesByIdsAsync(
            IEnumerable<int> ids);
        Task UpdateAsync(OutboxMessage message, bool status);
    }
}
The complete source code of the OutboxMessageRepository class is given in Listing 4.
Listing 4: The OutboxMessageRepository class
using System.Collections.ObjectModel;
using Microsoft.EntityFrameworkCore;
namespace Outbox_Pattern_Demo
{
    public class OutboxMessageRepository : IOutboxMessageRepository
    {
        private readonly ApplicationDbContext _context;
        private IReadOnlyCollection<OutboxMessage>? _outboxMessages;
        public OutboxMessageRepository(ApplicationDbContext context)
        {
            _context = context;
        }
        // Loads all outbox messages once and caches them for this instance.
        public IReadOnlyCollection<OutboxMessage> OutboxMessages
        {
            get
            {
                return _outboxMessages ??= new ReadOnlyCollection<OutboxMessage>(
                    _context.OutboxMessages.ToList());
            }
        }
        public async Task<IReadOnlyCollection<OutboxMessage>>
            GetUnsentMessagesAsync()
        {
            List<OutboxMessage> unsentMessages = await _context.OutboxMessages
                .Where(e => !e.IsMessageDispatched).ToListAsync();
            return new ReadOnlyCollection<OutboxMessage>(unsentMessages);
        }
        public async Task<IReadOnlyCollection<OutboxMessage>>
            GetMessagesByIdsAsync(IEnumerable<int> ids)
        {
            // Return only the messages whose IDs were requested.
            List<OutboxMessage> messages = await _context.OutboxMessages
                .Where(m => ids.Contains(m.Event_Id)).ToListAsync();
            return new ReadOnlyCollection<OutboxMessage>(messages);
        }
        public async Task UpdateAsync(OutboxMessage message, bool status)
        {
            var entity = await _context.OutboxMessages
                .FirstOrDefaultAsync(o => o.Event_Id == message.Event_Id);
            if (entity != null)
            {
                entity.Event_Date = message.Event_Date;
                entity.Event_Payload = message.Event_Payload;
                // Apply the dispatch status requested by the caller.
                entity.IsMessageDispatched = status;
                await _context.SaveChangesAsync();
            }
        }
    }
}
The OrderController Class
Next, create a new API controller class named OrderController and replace the generated code with the code given in Listing 5. The OrderController class contains three action methods: GetOrder, GetOrders, and CreateOrder. The GetOrders action method returns a list of Order instances, and the GetOrder action method returns one Order based on the order ID passed to the action method as a parameter.
Listing 5: The OrderController Class
using Microsoft.AspNetCore.Mvc;
namespace Outbox_Pattern_Demo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        private readonly IOrderService _orderService;
        public OrderController(IOrderService orderService)
        {
            _orderService = orderService;
        }
        [HttpGet("GetOrders")]
        public async Task<List<Order>> GetOrders()
        {
            return await _orderService.GetAllOrdersAsync();
        }
        [HttpGet("{id}")]
        public async Task<Order> GetOrder(int id)
        {
            return await _orderService.GetOrderAsync(id);
        }
        [HttpPost("CreateOrder")]
        public async Task<IActionResult> CreateOrder([FromBody] Order order)
        {
            await _orderService.CreateOrderAsync(order);
            return Ok();
        }
    }
}
The CreateOrder action method given below creates a new order record in the order database table.
[HttpPost("CreateOrder")]
public async Task<IActionResult> CreateOrder([FromBody] Order order)
{
await _orderService.CreateOrderAsync(order);
return Ok();
}
Send Messages to Apache Kafka
Create an interface named IKafkaProducer in a file named IKafkaProducer.cs and write the following code in there:
public interface IKafkaProducer
{
Task SendMessageToKafkaAsync(OutboxMessage message);
};
Next, create a new class named KafkaProducer in a file having the same name with a .cs extension and write the code given in Listing 6 in there.
Listing 6: The KafkaProducer class
using Confluent.Kafka;
using System.Net;
namespace Outbox_Pattern_Demo
{
    public class KafkaProducer : IKafkaProducer
    {
        private readonly ProducerConfig _producerConfig;
        private readonly IOutboxRepository _outboxRepository;
        private readonly string topic = "test";
        public KafkaProducer(IOutboxRepository outboxRepository)
        {
            _outboxRepository = outboxRepository;
            _producerConfig = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                ClientId = Dns.GetHostName()
            };
        }
        public async Task SendMessageToKafkaAsync(OutboxMessage message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }
            using var producer = new ProducerBuilder<Null,
                string>(_producerConfig).Build();
            try
            {
                var result = await producer.ProduceAsync
                    (topic, new Message<Null, string>
                    {
                        // Event_Payload is the property name the repository uses.
                        Value = message.Event_Payload
                    });
                if (result.Status == PersistenceStatus.Persisted)
                {
                    // Mark the message as dispatched once the broker confirms it.
                    await _outboxRepository.UpdateAsync(message, true);
                }
            }
            catch (Exception)
            {
                // Leave the message undispatched so that it can be retried later.
                await _outboxRepository.UpdateAsync(message, false);
            }
        }
    }
}
In addition, you should register the instances of type IOutboxMessageRepository and IKafkaProducer with the service collection, as shown below:
builder.Services.AddScoped<IOutboxMessageRepository,
OutboxMessageRepository>();
builder.Services.AddScoped<IKafkaProducer, KafkaProducer>();
You should also register the hosted service in the Program.cs file, as shown in the code snippet given below:
builder.Services.AddSingleton<IHostedService, OutboxMessageProcessor>();
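The hosted service registered above is the piece that drains the outbox. As a rough illustration of what one polling pass might look like, here is a minimal in-memory sketch. It is not the article's OutboxMessageProcessor: the OutboxEntry class, the OutboxDispatcher class, and the sendAsync delegate are hypothetical names introduced only for this example, and the list stands in for the outbox table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an outbox row (not the article's OutboxMessage).
public sealed class OutboxEntry
{
    public int Id { get; init; }
    public string Payload { get; init; } = "";
    public bool IsDispatched { get; set; }
}

public static class OutboxDispatcher
{
    // One polling pass: try to send every undispatched entry and flag it
    // only when the send delegate reports success. Entries that fail stay
    // pending, so the next pass retries them (at-least-once delivery).
    public static async Task<int> DispatchPendingAsync(
        IEnumerable<OutboxEntry> outbox,
        Func<OutboxEntry, Task<bool>> sendAsync)
    {
        var dispatched = 0;
        foreach (var entry in outbox.Where(e => !e.IsDispatched).ToList())
        {
            bool sent;
            try
            {
                sent = await sendAsync(entry);
            }
            catch (Exception)
            {
                // Treat a broker error like a failed send and retry later.
                sent = false;
            }

            if (sent)
            {
                entry.IsDispatched = true;
                dispatched++;
            }
        }
        return dispatched;
    }
}
```

In a real application, the delegate would wrap a call such as SendMessageToKafkaAsync, and the pass would run on a timer inside the background service. Because a message can be sent and then fail to be flagged, consumers should be prepared to see the same event more than once.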
Sequence Diagram of the Complete Flow
The sequence diagram of the complete flow is shown in Figure 8.
Create the Message Consumer Application
Let's create a message consumer to consume the messages stored in Kafka. To do this, create another ASP.NET Core application by following the steps mentioned earlier in this article. Create a new interface named IKafkaConsumer and write the following code in there:
namespace Outbox_Pattern_Demo
{
public interface IKafkaConsumer
{
public Task ConsumeMessagesAsync(CancellationToken cancellationToken);
}
}
Next, create a new class named KafkaConsumer that implements the IKafkaConsumer interface, as shown in Listing 8. You now need a service that runs in the background and consumes the messages residing in Kafka. To do this, create a class named KafkaMessageProcessor that extends the BackgroundService class, as shown in Listing 9.
Listing 8: The KafkaConsumer class
using Confluent.Kafka;
using System.Text.Json;
namespace Outbox_Pattern_Demo
{
    public class KafkaConsumer : IKafkaConsumer
    {
        private readonly string topic = "test";
        private readonly string groupId = "test_group";
        private readonly string bootstrapServers = "localhost:9092";
        public Task ConsumeMessagesAsync(CancellationToken
            cancellationToken)
        {
            var config = new ConsumerConfig
            {
                GroupId = groupId,
                BootstrapServers = bootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            // Consume() blocks the calling thread, so run the loop on a
            // thread-pool thread to avoid holding up application startup.
            return Task.Run(() =>
            {
                using var consumer = new ConsumerBuilder
                    <Ignore, string>(config).Build();
                consumer.Subscribe(topic);
                try
                {
                    // Honor the caller's token so that the loop ends on shutdown.
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var result = consumer.Consume(cancellationToken);
                        var order = JsonSerializer.Deserialize
                            <Order>(result.Message.Value);
                        // Process the deserialized order here.
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected when the application is shutting down.
                }
                finally
                {
                    // Leave the consumer group cleanly.
                    consumer.Close();
                }
            });
        }
    }
}
Listing 9: The KafkaMessageProcessor class
namespace Outbox_Pattern_Demo
{
    public class KafkaMessageProcessor : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
        public KafkaMessageProcessor(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;
        }
        protected override async Task ExecuteAsync
            (CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // IKafkaConsumer is registered as a scoped service, so resolve it
                // from a new scope instead of injecting it into this singleton.
                using var scope = _scopeFactory.CreateScope();
                var consumer = scope.ServiceProvider
                    .GetRequiredService<IKafkaConsumer>();
                await consumer.ConsumeMessagesAsync(cancellationToken);
            }
        }
    }
}
Register the instance of type IKafkaConsumer as a scoped service and the instance of KafkaMessageProcessor as a singleton hosted service.
builder.Services.AddScoped<IKafkaConsumer, KafkaConsumer>();
builder.Services.AddSingleton<IHostedService, KafkaMessageProcessor>();
Figure 9 illustrates the complete flow visually.
Listing 10 shows the Program.cs file of the consumer application.
Listing 10: The Program.cs file (Consumer Application)
// Namespace that declares IKafkaConsumer, KafkaConsumer, and KafkaMessageProcessor.
using Outbox_Pattern_Demo;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddScoped<IKafkaConsumer, KafkaConsumer>();
builder.Services.AddSingleton<IHostedService, KafkaMessageProcessor>();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseAuthorization();
app.MapControllers();
app.Run();
Execute the Application
Run the producer and the consumer applications separately because they're part of different solutions. You should also set appropriate breakpoints in the source code of both applications so that you can debug them. To run the application, follow the steps outlined below:
- Start ZooKeeper in a terminal window.
- Start Kafka in another terminal window.
- Execute the producer application.
- Execute the consumer application.
- Launch Postman.
- Send an HTTP POST request to the producer API using Postman.
Figure 10 shows how you can invoke the API endpoint in Postman.
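If you'd rather script the request than use Postman, the same call can be made with HttpClient. The sketch below is only an illustration under stated assumptions: the route api/Order/CreateOrder follows from the attributes on OrderController in Listing 5, but the CreateOrderClient class is a hypothetical helper, and the base address and the fields of the order object you pass in depend on your own launch settings and Order class.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, not part of the article's projects.
public static class CreateOrderClient
{
    // Builds POST {baseAddress}api/Order/CreateOrder with a JSON body.
    public static HttpRequestMessage BuildRequest(Uri baseAddress, object order)
    {
        var json = JsonSerializer.Serialize(order);
        var uri = new Uri(baseAddress, "api/Order/CreateOrder");
        return new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
    }

    // Sends the request and reports whether the API answered with a 2xx status.
    public static async Task<bool> SendAsync(
        HttpClient client, Uri baseAddress, object order)
    {
        using var request = BuildRequest(baseAddress, order);
        using var response = await client.SendAsync(request);
        return response.IsSuccessStatusCode;
    }
}
```

For example, you could call CreateOrderClient.SendAsync(new HttpClient(), new Uri("https://localhost:5001/"), order), where the port and the shape of order are placeholders for the values your producer application actually uses.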
Real-World Use Cases for the Outbox Pattern
The outbox pattern has several real-world use cases, including real-time notifications and data synchronization. The next two sections discuss these.
Real-Time Notifications
You might often want real-time notifications in your application to send instant alerts, messages, etc. You can use the outbox pattern to implement them: when a business operation completes, the application records an event in the outbox as part of the same transaction. These events are then processed asynchronously, thereby triggering real-time notifications to the logged-in users of the application.
Data Synchronization
Real-time data synchronization is yet another use case where the outbox pattern fits in. For example, when a customer places an order in an e-commerce application, both the order and the stock database tables are updated in the same transaction. Additionally, the order, product, and cart data must be synchronized asynchronously across all devices in use.
Alternatives to the Transactional Outbox Pattern
Although the outbox pattern is a popular strategy for handling distributed transactions efficiently and ensuring reliable and consistent communication between microservices, there are certain downsides to using it as well. I've discussed them earlier in this article. The Two-Phase Commit and the Saga Pattern are two popular alternatives to this strategy.
Two-Phase Commit
This approach performs an atomic transaction across multiple resources in two phases. In the first (prepare) phase, the transaction coordinator asks every participating resource whether it can execute the transaction and waits for each one to vote. In the second (commit) phase, the coordinator instructs all resources to commit if every vote was positive, or to roll back otherwise. The major drawback of this approach is that it's a blocking protocol: if any resource fails or there are network issues, the whole transaction stalls while locks remain held on the participating resources.
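To make the two phases concrete, here is a minimal in-memory sketch of a coordinator. It's a simplification for illustration only: the IParticipant interface, InMemoryParticipant, and TwoPhaseCoordinator are hypothetical names, and a real implementation would also need durable logs, timeouts, and recovery after a coordinator crash.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical participant contract for this sketch.
public interface IParticipant
{
    bool Prepare();   // Phase 1: vote yes (true) or no (false).
    void Commit();    // Phase 2 when every vote was yes.
    void Rollback();  // Phase 2 when any vote was no.
}

public sealed class InMemoryParticipant : IParticipant
{
    private readonly bool _canCommit;
    public string State { get; private set; } = "Idle";

    public InMemoryParticipant(bool canCommit) => _canCommit = canCommit;

    public bool Prepare()
    {
        State = _canCommit ? "Prepared" : "Refused";
        return _canCommit;
    }

    public void Commit() => State = "Committed";
    public void Rollback() => State = "RolledBack";
}

public static class TwoPhaseCoordinator
{
    // Returns true when the transaction was committed by every participant.
    public static bool Execute(IReadOnlyList<IParticipant> participants)
    {
        var prepared = new List<IParticipant>();

        // Phase 1: collect votes, stopping at the first refusal.
        foreach (var participant in participants)
        {
            if (!participant.Prepare())
            {
                // Undo the participants that had already prepared.
                foreach (var p in prepared)
                {
                    p.Rollback();
                }
                return false;
            }
            prepared.Add(participant);
        }

        // Phase 2: every vote was yes, so commit everywhere.
        foreach (var participant in participants)
        {
            participant.Commit();
        }
        return true;
    }
}
```

Note that a prepared participant has to hold its locks until the coordinator's decision arrives, which is where the blocking behavior of this protocol comes from.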
Saga Pattern
This approach splits a transaction into multiple steps and completes each step as a separate local transaction. If a step fails, the steps that have already completed are undone by running compensating transactions. This design is best suited for long-running transactions or those that span microservices. However, it can be difficult to determine which steps must be compensated when an error occurs, because the timing and sequence of the individual transactions can be hard to reason about.
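The following sketch shows one simple way to keep track of what must be undone: run the steps in order and, on failure, execute the compensating actions of the completed steps in reverse. It's an in-process illustration under stated assumptions, with SagaStep and SagaRunner as hypothetical names; a real saga spans services and would persist its progress between steps.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical step type: an action plus the action that undoes it.
public sealed record SagaStep(string Name, Action Execute, Action Compensate);

public static class SagaRunner
{
    // Runs the steps in order. If a step throws, the compensations of the
    // steps that already completed run in reverse order.
    // Returns true when every step completed.
    public static bool Run(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                step.Execute();
                completed.Push(step);
            }
            catch (Exception)
            {
                // The stack pops the most recently completed step first.
                while (completed.Count > 0)
                {
                    completed.Pop().Compensate();
                }
                return false;
            }
        }
        return true;
    }
}
```

For an order workflow, the steps might be "reserve stock", "charge payment", and "create shipment", with "release stock" and "refund payment" as the matching compensations. These step names are examples only.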
Where Do We Go from Here?
The outbox pattern is a reliable way to publish events consistently in microservices-based applications, and it can make them more stable and scalable. It decouples the services that exchange events and isolates business operations from event publishing. However, it also has certain downsides, so you should understand these constraints before deciding on your application's design. I'll discuss more on the outbox pattern in a future article.