First touch of Kafka

Before I get started, I would like to point out that this is just a short quick start tutorial for those who, like me, have never used Kafka in practice.





And so let's get started!





The only broker Kafka and the ZooKeeper required for its operation I will run in Docker .





Let's create a separate network first kafkanet







docker network create kafkanet
      
      



Running a container with ZooKeeper





docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
      
      



Running a container with Kafka





docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
      
      



In order to make sure that there are no errors, you can display the log docker logs kafka







Kafka, , ,





kafka







docker exec -it kafka bash
      
      



demo-topic







/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
      
      







/bin/kafka-topics --list --zookeeper zookeeper:2181
      
      







/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
      
      







/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
      
      







/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
      
      



.NET : KafkaProducer



, , KafkaConsumer



, . Confluent.Kafka Microsoft.Extensions.Hosting.





KafkaProducer



KafkaProducerService







using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaProducer
{
    public class KafkaProducerService : IHostedService
    {
        private readonly ILogger<KafkaProducerService> _logger;
        private readonly IProducer<Null, string> _producer;

        public KafkaProducerService(ILogger<KafkaProducerService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            for (var i = 0; i < 5; i++)
            {
                var value = $"Event N {i}";
                _logger.LogInformation($"Sending >> {value}");
                await _producer.ProduceAsync(
                    "demo-topic",
                    new Message<Null, string> { Value = value },
                    cancellationToken);
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _producer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
            return Task.CompletedTask;
        }
    }
}
      
      



Program.cs







using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaProducerService>());
    }
}
      
      



KafkaConsumer



KafkaConsumerService







using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaConsumer
{
    public class KafkaConsumerService : IHostedService
    {
        private readonly ILogger<KafkaConsumerService> _logger;
        private readonly IConsumer<Ignore, string> _consumer;

        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "demo-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("demo-topic");
            while (!cancellationToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(cancellationToken);
                _logger.LogInformation($"Received >> {consumeResult.Message.Value}");
            }
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _consumer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
            return Task.CompletedTask;
        }
    }
}
      
      



Program.cs







using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaConsumerService>());
    }
}
      
      



( )








All Articles