The translation of the article was prepared on the eve of the start of the course "Backend development in Kotlin"
In this article, we'll talk about how to create a simple Spring Boot application with Kafka and Kotlin.
Introduction
Start by visiting https://start.spring.io and add the following dependencies:
Groovy
implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
Gradle. Maven.
. IntelliJ IDEA.
Apache Kafka
Apache Kafka . Windows 10. Kafka «too many lines encountered». Kafka . , - Power Shell.
Kafka, :
Shell
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties
/bin/windows.
Kafka, Zookeeper. Zookeeper – Apache, .
Spring Boot
IDE , KafkaDemoApplication.kt. Spring, .
:
Kotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class KafkaDemoApplication
fun main(args: Array<String>) {
runApplication<KafkaDemoApplication>(*args)
}
. .
-, . KafkaController.kt. :
Kotlin
var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic"
@GetMapping("/send")
fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!
var sendResult: SendResult<String, String> = lf.get()
return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}
, test_topic, KafkaTemplate. ListenableFuture, . , .
Kafka – KafkaProducer. :
Kotlin
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
val map = mutableMapOf<String, String>()
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["bootstrap.servers"] = "localhost:9092"
var producer = KafkaProducer<String, String>(map as Map<String, Any>?)
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
return ResponseEntity.ok(" message sent to " + future.get().topic());
}
.
KafkaProduce Map, . , StringSerializer.
, Serializer – Kafka, . Apache Kafka , ByteArraySerializer, ByteSerializer, FloatSerializer .
map StringSerializer.
Kotlin
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
– bootstrap-, Kafka.
Kotlin
map["bootstrap.servers"] = "localhost:9092"
, KafkaProducer.
ProducerRecord . :
Kotlin
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
:
Kotlin
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
future , .
, . . , , .
MessageConsumer.kt Service.
Kotlin
@KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit {
println(" message received from topic : $message");
}
@KafkaListener , . , , .
GitHub.