Kafka with .NET Core: Quick Start Guide
Installation
CentOS Installation
Prerequisites
- Java 8 or later (both Kafka and ZooKeeper run on the JVM; verify with java -version)
Step 1: Download & Extract
# Download Kafka
wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
tar -zxvf kafka_2.12-2.1.1.tgz
mv kafka_2.12-2.1.1 /data/kafka
# Download ZooKeeper
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8-bin /data/zookeeper
Step 2: Start ZooKeeper
cd /data/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
# Modify configuration if needed
vim zoo.cfg
# Service Management
./bin/zkServer.sh start # Start
./bin/zkServer.sh status # Check status
./bin/zkServer.sh stop # Stop
./bin/zkServer.sh restart # Restart
# Test connection
./bin/zkCli.sh -server localhost:2181
quit
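Beyond zkCli, ZooKeeper's four-letter-word commands give a quick health check from the shell. A minimal sketch, noting that ZooKeeper 3.5+ blocks most of these commands unless they are whitelisted in zoo.cfg:

```shell
# Whitelist the commands (ZooKeeper 3.5+), then restart to apply
echo '4lw.commands.whitelist=ruok,stat' >> /data/zookeeper/conf/zoo.cfg
./bin/zkServer.sh restart

# "Are you OK?" — a healthy server answers "imok"
echo ruok | nc localhost 2181

# Connection and mode summary (standalone vs. leader/follower)
echo stat | nc localhost 2181
```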
Step 3: Configure & Start Kafka
cd /data/kafka/config
cp server.properties server.properties_backup
# Edit critical configurations
vim server.properties
"""
# Cluster-unique broker ID
broker.id=0
# Internal listener
listeners=PLAINTEXT://<internal_ip>:9092
# External advertised address
advertised.listeners=PLAINTEXT://<public_ip>:9092
# Default partitions per topic
num.partitions=3
# ZooKeeper connection
zookeeper.connect=localhost:2181
"""
# Start Kafka (run from the Kafka root so the relative paths resolve)
cd /data/kafka
./bin/kafka-server-start.sh -daemon config/server.properties
# Verify process
ps -ef | grep kafka
jps
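With the broker up, a quick smoke test creates a topic and round-trips a message. Note that in Kafka 2.1, kafka-topics.sh still talks to ZooKeeper (the --bootstrap-server option for topic management arrived in later releases); the topic name test-topic is arbitrary:

```shell
cd /data/kafka
# Create a topic with 3 partitions (matches num.partitions above)
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 3 --topic test-topic
# Confirm it exists
./bin/kafka-topics.sh --list --zookeeper localhost:2181

# Terminal 1: type messages to produce
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
# Terminal 2: read them back
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning
```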
Docker Installation
# ZooKeeper
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# Kafka
docker run -d --name kafka \
-p 9092:9092 \
--link zookeeper \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=<host_ip> \
-e KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka
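The two docker run commands can also be captured in a compose file so both services start together. A sketch assuming the same wurstmeister images; replace <host_ip> as in the command above:

```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: <host_ip>
      KAFKA_ADVERTISED_PORT: 9092
    depends_on:
      - zookeeper
```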
Core Concepts
| Term | Description |
|---|---|
| Broker | Kafka server node. Multiple brokers form a cluster. |
| Topic | Logical message category (e.g., page_views, click_streams). |
| Partition | Physical subdivision of a topic. Each partition maintains an ordered message sequence. |
| Segment | Physical storage unit within a partition. |
| Offset | Unique sequential ID for each message within a partition. |
Consumer-Partition Relationship
- Consumers > Partitions: extra consumers sit idle (within a group, Kafka allows only one consumer per partition)
- Consumers < Partitions: a single consumer handles multiple partitions (ensure even distribution)
- Optimal: partition count is an integer multiple of consumer count
- Order guaranteed only within partitions
- Cluster changes trigger consumer rebalancing
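The arithmetic behind these rules can be illustrated with a tiny sketch of round-robin assignment (a hypothetical helper, not Confluent's actual assignor): with 6 partitions and 4 consumers, two consumers get 2 partitions and two get 1, which is why an integer multiple keeps load even.

```csharp
using System;
using System.Linq;

class PartitionMath
{
    // Round-robin sketch: partition p goes to consumer p % consumerCount.
    // Returns how many partitions each consumer ends up with.
    static int[] Assign(int partitions, int consumers) =>
        Enumerable.Range(0, consumers)
            .Select(c => Enumerable.Range(0, partitions).Count(p => p % consumers == c))
            .ToArray();

    static void Main()
    {
        Console.WriteLine(string.Join(",", Assign(6, 3))); // even load: 2,2,2
        Console.WriteLine(string.Join(",", Assign(6, 4))); // skewed: 2,2,1,1
        Console.WriteLine(string.Join(",", Assign(2, 3))); // idle consumer: 1,1,0
    }
}
```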
.NET Integration
NuGet Package
Install-Package Confluent.Kafka
# or, from the .NET CLI:
dotnet add package Confluent.Kafka
Service Interface
public interface IKafkaService
{
    Task PublishAsync<TMessage>(string topic, TMessage message) where TMessage : class;
    Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> handler,
        CancellationToken cancellationToken) where TMessage : class;
}
Producer Implementation
using Confluent.Kafka;
using Newtonsoft.Json;

// partial: the consumer half of this class is shown below
public partial class KafkaService : IKafkaService
{
    public async Task PublishAsync<TMessage>(string topic, TMessage message) where TMessage : class
    {
        var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" };
        // Producers are expensive to create; in production, build one and reuse it
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topic, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = JsonConvert.SerializeObject(message)
        });
    }
}
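Calling the service is then a one-liner. A hypothetical usage sketch (the Order type and the "orders" topic are made up for illustration, and a broker must be listening on 127.0.0.1:9092):

```csharp
public record Order(int Id, decimal Amount);

// Somewhere in application code:
IKafkaService kafka = new KafkaService();
await kafka.PublishAsync("orders", new Order(1, 9.99m));
```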
Consumer Implementation
using Confluent.Kafka;
using Newtonsoft.Json;

// partial: the producer half of this class is shown above
public partial class KafkaService : IKafkaService
{
    public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> handler,
        CancellationToken cancellationToken) where TMessage : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            GroupId = "app-consumers",
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
            .SetPartitionsAssignedHandler((c, partitions) =>
                Console.WriteLine($"Assigned: {string.Join(", ", partitions)}"))
            .Build();
        consumer.Subscribe(topics);
        await Task.Yield(); // move the blocking consume loop off the caller's context
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var result = consumer.Consume(cancellationToken);
                var message = JsonConvert.DeserializeObject<TMessage>(result.Message.Value);
                handler(message);
                consumer.Commit(result); // manual commit after handling => at-least-once
            }
        }
        catch (OperationCanceledException)
        {
            // expected on shutdown
        }
        finally
        {
            consumer.Close(); // leave the group cleanly so partitions are reassigned promptly
        }
    }
}
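Subscribing from a console host might then look like this (a hypothetical sketch; assumes an Order message type and a running broker):

```csharp
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

IKafkaService kafka = new KafkaService();
await kafka.SubscribeAsync<Order>(
    new[] { "orders" },
    order => Console.WriteLine($"Handling order {order.Id}"),
    cts.Token);
```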
Practical Examples
Producer Console App
static async Task Main(string[] args)
{
    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    using var producer = new ProducerBuilder<string, string>(config).Build();
    while (true)
    {
        Console.Write("> ");
        var input = Console.ReadLine();
        if (string.IsNullOrEmpty(input)) break; // empty line / EOF exits
        await producer.ProduceAsync("demo-topic", new Message<string, string>
        {
            Key = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString(),
            Value = input
        });
    }
}
Consumer Console App
static void Main(string[] args)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "console-group",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };
    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe("demo-topic");
    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // keep the process alive so the loop can exit cleanly
        cts.Cancel();
    };
    try
    {
        while (!cts.IsCancellationRequested)
        {
            var result = consumer.Consume(cts.Token);
            Console.WriteLine($"Received: {result.Message.Value}");
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        consumer.Close();
    }
}
Key Takeaways
- Always configure advertised.listeners for external access
- Match consumer count to partition count for optimal throughput
- Use EnableAutoCommit = false for at-least-once delivery guarantees
- Monitor consumer lag in production environments
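Consumer lag can be checked with the stock tooling; for example, for the app-consumers group used above (requires a running broker):

```shell
cd /data/kafka
# LAG column = log end offset minus committed offset, per partition
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group app-consumers
```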