We have finally made it! This will be the fifth and last article in our Kafka Learning Series, and the culmination of our theoretical learning with a real demo!
This is what we covered so far:
Bank
Our demo bank application is composed by 3 projects:
Producer.API:
.NET Core 5.0 Web API with a Transactions API to simulate the creation of transactions. These transactions are sent to Kafka via Idempotent producer which produces to the transaction topic. It uses UserId as the message key (ensuring messages from the same user will always be sent to the same topic partition).
Consumer:
Worker which consumes messages from the transactions Topic. This consumer disables auto commit. Messages are processed and offsets committed separately.
Contracts:
Holds the Transaction Message type.
Producer:
Application Settings:
Registration:
We register the IProducer as a service so that we can tidy up the code a bit more on the Producer application. As you can see, we define our producer as Idempotent
and to Ack=All
.
We also created a Transaction service so that our controller is quite light and doesn't have any infrastructure code and concerns.
The message is defined to take the UserId as its Key, and then the payload of the transaction is serialized into the actual content of the message.
Our controller is as simple as:
Consumer:
Application Settings:
Consumer worker:
As you can see, we will be looping through until the Worker actually stops. We subscribe to the transactions
topic, and we consume from it. Kafka doesn't have an async API for the Consume method (you can check the thread here), so this call will actually block the current thread, one can specify the amount of time to block until it exits the Consume
method. There are a few important configurations in the Consumer:
var config = new ConsumerConfig
{
BootstrapServers = bootstrapperServer,
GroupId = consumerGroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
EnableAutoOffsetStore = false
};
Its set to disable auto commits, meaning its our responsibility to commit the offset. Offset position is to be Earliest when there is no committed offset.
Two other important statements are:
consumer.StoreOffset(consumerResult);
consumer.Commit(consumerResult);
StoreOffset
Stores offsets for a single partition based on the topic/partition/offset of a consume result. The offset will be committed according to EnableAutoCommit
and AutoCommitIntervalMs
.
Commit
Commits an offset based on the topic/partition/offset of a ConsumeResult.
As you can see its quite easy to work with Kafka .NET Client. You can find the repo here.
This concludes our Kafka Learning series, I hope you have liked it, and learned something from it! If you have please, give the Like button a click or share it to help me out produce more content.