Hello everyone! My name is Vitaly, I am a developer at Web3Tech. In this post, I will introduce the basic concepts and constructs of the Spring Cloud Stream framework for supporting and working with Kafka message brokers, with a full cycle of contextual unit testing. We use such a scheme in our project of the all-Russian electronic voting on the Waves Enterprise blockchain platform .
As part of the Spring Cloud project team, Spring Cloud Stream is based on Spring Boot and uses Spring Integration to provide communication with message brokers. However, it easily integrates with various message brokers and requires minimal configuration to create event-driven or message-driven microservices.
Configuration and dependencies
First, we need to add the spring-cloud-starter-stream-kafka dependency to build.gradle :
dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
In the configuration of the Spring Cloud Stream project, you need to include the Kafka broker URL, the queue name (topic), and other binding parameters. Here is an example YAML configuration for the application.yaml service :
spring:
application:
name: cloud-stream-binding-kafka-app
cloud:
stream:
kafka:
binder:
brokers: 0.0.0.0:8080
configuration:
auto-offset-reset: latest
bindings:
customChannel: #Channel name
destination: 0.0.0.0:8080 #Destination to which the message is sent (topic)
group: input-group-N
contentType: application/json
consumer:
max-attempts: 1
autoCommitOffset: true
autoCommitOnError: false
Concept and classes
, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):
@EnableBinding(ProducerBinding::class)
@SpringBootApplication
class SpringCloudStreamBindingKafkaApp
fun main(args: Array<String>) {
SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}
@EnableBinding , .
.
Binding β , .
Binder β middleware .
Channel β middleware .
StreamListeners β (beans), , MessageConverter middleware βDTOβ.
Message Schema β , . .
send/receive, producer consumer. , Spring Cloud Stream.
Producer Kafka, (ProducerBinding.kt):
interface ProducerBinding {
@Output(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
onsumer Kafka .
ConsumerBinding.kt:
interface ConsumerBinding {
companion object {
const val BINDING_TARGET_NAME = "customChannel"
}
@Input(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
Consumer.kt:
@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {
@StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
fun process(
@Payload message: Map<String, Any?>,
@Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
) {
messageService.consume(message)
}
}
Kafka . Kafka, spring-kafka-test.
MessageCollector
, . ProducerBinding payload ProducerTest.kt:
@SpringBootTest
class ProducerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var messageCollector: MessageCollector
@Test
fun `should produce somePayload to channel`() {
// ARRANGE
val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)
// ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
val payload = messageCollector.forChannel(producerBinding.messageChannel())
.poll()
.payload
// ASSERT
val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
assertTrue(request.entries.stream().allMatch { re ->
re.value == payloadAsMap[re.key.toString()]
})
messageCollector.forChannel(producerBinding.messageChannel()).clear()
}
}
Embedded Kafka
@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):
@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var objectMapper: ObjectMapper
@MockBean
lateinit var messageService: MessageService
companion object {
@ClassRule @JvmField
var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
}
@Test
fun `should consume via txConsumer process`() {
// ACT
val request = mapOf(1 to "foo", 2 to "bar")
producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
.setHeader("someHeaderName", "someHeaderValue")
.build())
// ASSERT
val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
runBlocking {
delay(20)
verify(messageService).consume(requestAsMap)
}
}
}
In this post, I demonstrated the capabilities of Spring Cloud Stream and how to use it with Kafka. Spring Cloud Stream offers a user-friendly interface with simplified nuances of broker configuration, is quickly implemented, works stably and supports modern popular brokers such as Kafka. As a result, I gave a number of examples with unit testing based on the EmbeddedKafkaRule using the MessageCollector.
All sources can be found on Github . Thanks for reading!