Before I get started, I would like to point out that this is just a short quick start tutorial for those who, like me, have never used Kafka in practice.
And so let's get started!
The only broker Kafka and the ZooKeeper required for its operation I will run in Docker .
Let's create a separate network first kafkanet
docker network create kafkanet
Running a container with ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Running a container with Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
In order to make sure that there are no errors, you can display the log docker logs kafka
Kafka, , ,
kafka
docker exec -it kafka bash
demo-topic
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-topics --list --zookeeper zookeeper:2181
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
.NET : KafkaProducer
, , KafkaConsumer
, . Confluent.Kafka Microsoft.Extensions.Hosting.
KafkaProducer
KafkaProducerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaProducer
{
public class KafkaProducerService : IHostedService
{
private readonly ILogger<KafkaProducerService> _logger;
private readonly IProducer<Null, string> _producer;
public KafkaProducerService(ILogger<KafkaProducerService> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
for (var i = 0; i < 5; i++)
{
var value = $"Event N {i}";
_logger.LogInformation($"Sending >> {value}");
await _producer.ProduceAsync(
"demo-topic",
new Message<Null, string> { Value = value },
cancellationToken);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
_logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
return Task.CompletedTask;
}
}
}
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaProducer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaProducerService>());
}
}
KafkaConsumer
KafkaConsumerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
public class KafkaConsumerService : IHostedService
{
private readonly ILogger<KafkaConsumerService> _logger;
private readonly IConsumer<Ignore, string> _consumer;
public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "demo-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, string>(config).Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_consumer.Subscribe("demo-topic");
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = _consumer.Consume(cancellationToken);
_logger.LogInformation($"Received >> {consumeResult.Message.Value}");
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_consumer?.Dispose();
_logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
return Task.CompletedTask;
}
}
}
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaConsumerService>());
}
}