Producer / Consumer on Kafka and Kotlin

The translation of the article was prepared on the eve of the start of the course "Backend development in Kotlin"








In this article, we'll talk about how to create a simple Spring Boot application with Kafka and Kotlin.



Introduction



Start by visiting https://start.spring.io and add the following dependencies:



Groovy



implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")


Gradle. Maven.



. IntelliJ IDEA.



Apache Kafka



Apache Kafka . Windows 10. Kafka «too many lines encountered». Kafka . , - Power Shell.



Kafka, :



Shell



.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties


/bin/windows.



Kafka, Zookeeper. Zookeeper – Apache, .



Spring Boot



IDE , KafkaDemoApplication.kt. Spring, .



:



Kotlin



import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KafkaDemoApplication 

fun main(args: Array<String>) {
   runApplication<KafkaDemoApplication>(*args)
}




. .



-, . KafkaController.kt. :



Kotlin



var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic"

@GetMapping("/send")
fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
    var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!
    var sendResult: SendResult<String, String> = lf.get()
    return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}


, test_topic, KafkaTemplate. ListenableFuture, . , .





Kafka – KafkaProducer. :



Kotlin



@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
    var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)

    val map = mutableMapOf<String, String>()
    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    var producer = KafkaProducer<String, String>(map as Map<String, Any>?)
    var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
    return ResponseEntity.ok(" message sent to " + future.get().topic());
}


.



KafkaProduce Map, . , StringSerializer.



, Serializer – Kafka, . Apache Kafka , ByteArraySerializer, ByteSerializer, FloatSerializer .



map StringSerializer.



Kotlin



map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"


– bootstrap-, Kafka.



Kotlin



map["bootstrap.servers"] = "localhost:9092"


, KafkaProducer.



ProducerRecord . :



Kotlin



var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)


:



Kotlin



var future:Future<RecordMetadata> = producer?.send(producerRecord)!!


future , .





, . . , , .

MessageConsumer.kt Service.



Kotlin



@KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit {
    println(" message received from topic : $message");
}


@KafkaListener , . , , .

GitHub.






«Backend- Kotlin»







All Articles