Introduction
There are various ways to write tests using Apache Kafka. For example, TestContainers and EmbeddedKafka can be used. You can read about these approaches, for example, here: The pitfalls of testing Kafka Streams. But there is also the option of writing tests using KafkaServer.
What will be tested?
Suppose you need to develop a service for sending messages through various channels: email, Telegram, etc.
Let the service be called SenderService.
The service must listen to the specified channel, select the messages it needs, parse them, and hand them over to the desired channel for final delivery.
To test the service, a message intended for the email channel is formed, and it is verified that the message was handed over for final delivery.
Of course, in real-world applications tests will be more complex, but to illustrate the chosen approach such a test is sufficient.
The service and test are implemented using: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.
Service
The service can be started and stopped:
void start()
void stop()
At the start, you must set at least the following parameters:
String bootstrapServers
String senderTopic
EmailService emailService
bootstrapServers – the address of the Kafka broker.
senderTopic – the topic from which the service reads messages.
emailService – the service that performs the final sending of email messages.
These parameters are passed to the service when it is created.
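The service constructor itself is not shown; a minimal sketch of the class skeleton, assuming the three parameters are simply stored as fields (the nested EmailService interface is shown again later), could look like this:
// Sketch: class skeleton assumed from the constructor call used later in the test.
public class SenderService {
    private final String bootstrapServers;   // kafka broker address
    private final String senderTopic;        // topic to read messages from
    private final EmailService emailService; // final email-sending service

    public SenderService(String bootstrapServers, String senderTopic, EmailService emailService) {
        this.bootstrapServers = bootstrapServers;
        this.senderTopic = senderTopic;
        this.emailService = emailService;
    }

    void start() { /* create the «consumer loops», see below */ }

    void stop() { /* close the «loops» and shut down the executors */ }

    interface EmailService {
        void send(String recipients, String title, String message);
    }
}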
At startup, several «consumer loops» are created. Each «loop» runs in its own thread (the senderTasksExecutor pool), reads messages from the topic, and submits the actual sending work to a separate pool (tasksExecutorService). The «loop» approach is described, for example, here: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.
Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
SenderConsumerLoop senderConsumerLoop =
new SenderConsumerLoop(
bootstrapServers,
senderTopic,
"sender",
"sender",
tasksExecutorService,
emailService
);
closeables.add(senderConsumerLoop);
senderTasksExecutor.submit(senderConsumerLoop);
}
When stopping, all created «loops» must be closed and the executor pools shut down.
For this, a shutdown hook is registered at the end of «start»: on JVM shutdown it closes the «loops», shuts down the executors, and stops the service:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (AutoCloseable autoCloseable : closeables) {
try {
autoCloseable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
senderTasksExecutor.shutdown();
tasksExecutorService.shutdown();
stop();
try {
senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
This completes the start and stop of the service.
Consumer loop
The «loop» implements two methods:
void run()
void close()
The main method: run.
@Override
public void run() {
kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
kafkaConsumer.subscribe(Collections.singleton(topic));
while (true) {
calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
}
}
First, a «kafka consumer» is created. The «kafka consumer» subscribes to the specified topic. Then, in an infinite loop, records are polled from the topic and passed on for processing.
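The «close» method is not shown here; a common pattern, given only as a sketch, is to interrupt «poll» with KafkaConsumer.wakeup() and let the loop handle the resulting WakeupException:
// Sketch: stopping the «loop» via wakeup(); requires org.apache.kafka.common.errors.WakeupException
// and java.util.concurrent.atomic.AtomicBoolean.
private final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void close() {
    closed.set(true);
    if (kafkaConsumer != null) {
        kafkaConsumer.wakeup(); // safe to call from another thread, interrupts poll()
    }
}

// run() adjusted accordingly (sketch):
@Override
public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    kafkaConsumer.subscribe(Collections.singleton(topic));
    try {
        while (!closed.get()) {
            calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
        }
    } catch (WakeupException e) {
        // expected when close() is called
    } finally {
        kafkaConsumer.close();
    }
}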
Messages are expected in JSON format; from a message it is determined whether it needs to be processed and how it should be sent.
Message example:
{
"subject": {
"subject_type": "send"
},
"body": {
"method": "email",
"recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
"title": "42",
"message": "73"
}
}
subject_type – the message type; only «send» is processed.
method – the delivery method; «email» means sending by email.
recipients – the list of recipients.
title – the subject of the email.
message – the text of the email.
Processing of the polled records:
void calculate(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
calculate(record);
}
}
Processing of a single record:
void calculate(ConsumerRecord<String, String> record) {
JSONParser jsonParser = new JSONParser();
Object parsedObject = null;
try {
parsedObject = jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
if (parsedObject instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) parsedObject;
JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
if (SEND.equals(subjectType)) {
JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
calculate(jsonBody);
}
}
}
Processing of the message body:
void calculate(JSONObject jsonBody) {
String method = jsonBody.get(METHOD).toString();
if (EMAIL_METHOD.equals(method)) {
String recipients = jsonBody.get(RECIPIENTS).toString();
String title = jsonBody.get(TITLE).toString();
String message = jsonBody.get(MESSAGE).toString();
sendEmail(recipients, title, message);
}
}
Sending the email:
void sendEmail(String recipients, String title, String message) {
tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}
The sending itself is submitted to a separate pool of tasks so that the «loop» is not blocked.
This completes the message processing.
Creation of the «kafka consumer»:
static KafkaConsumer<String, String> createKafkaConsumerStringString(
String bootstrapServers,
String clientId,
String groupId
) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(properties);
}
The interface of the final email-sending service:
interface EmailService {
void send(String recipients, String title, String message);
}
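In the test below a mock is used instead of a real implementation; for a manual run, any implementation will do, for example a trivial logging stub (illustrative only, not part of the article):
// Illustrative stub: "sends" an email by printing it to the console.
class ConsoleEmailService implements SenderService.EmailService {
    @Override
    public void send(String recipients, String title, String message) {
        System.out.printf("email to %s: [%s] %s%n", recipients, title, message);
    }
}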
This completes the service.
Test
The test uses a «kafka server» started directly in the test process.
Work with the «kafka server» is wrapped in a helper class, which is described in its own section below.
First, the «kafka server» is started and the topic is created. The test skeleton:
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
}
}
}
The «kafka server» is created in a try-with-resources block: it is started, the topic is created, and at the end of the test the «kafka server» is closed automatically. Now the test body can be filled in.
A «mock» is created for the email service:
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
The service is created and started:
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
The message data is prepared:
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
The message is sent to the topic:
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Wait for the message to be processed:
Thread.sleep(6000);
Finally, it is verified that the email service was called with the expected arguments:
verify(emailService).send(recipients, title, message);
Stop the service:
senderService.stop();
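A fixed Thread.sleep makes the test slower and more brittle than necessary. Since Mockito is already used for the mock, one possible alternative (a sketch, assuming Mockito's timeout() verification mode is acceptable here) is:
// timeout() is a static import from org.mockito.Mockito;
// waits up to 10 seconds for the call instead of always sleeping 6 seconds.
verify(emailService, timeout(10_000)).send(recipients, title, message);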
The complete test:
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Thread.sleep(6000);
verify(emailService).send(recipients, title, message);
senderService.stop();
}
}
}
Helper class for creating keys and messages:
public class SenderFactory {
public static final String SUBJECT = "subject";
public static final String SUBJECT_TYPE = "subject_type";
public static final String BODY = "body";
public static final String METHOD = "method";
public static final String EMAIL_METHOD = "email";
public static final String RECIPIENTS = "recipients";
public static final String TITLE = "title";
public static final String MESSAGE = "message";
public static final String SEND = "send";
public static String key() {
return UUID.randomUUID().toString();
}
public static String createMessage(String method, String recipients, String title, String message) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> subject = new HashMap<>();
Map<String, Object> body = new HashMap<>();
map.put(SUBJECT, subject);
subject.put(SUBJECT_TYPE, SEND);
map.put(BODY, body);
body.put(METHOD, method);
body.put(RECIPIENTS, recipients);
body.put(TITLE, title);
body.put(MESSAGE, message);
return JSONObject.toJSONString(map);
}
}
«kafka server»
Main methods:
void start()
void close()
void createTopic(String topic)
The «start» method performs several steps.
Create "zookeeper" and save its address:
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
Create a zookeeper client:
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Setting properties for the server:
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
Server creation:
kafkaServer = TestUtils.createServer(config, new MockTime());
Putting it all together:
public void start() {
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
kafkaServer = TestUtils.createServer(config, new MockTime());
}
Stopping the service:
@Override
public void close() {
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}
Topic creation:
public void createTopic(String topic) {
AdminUtils.createTopic(
zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
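The «send» method used in the test is not shown above; a minimal sketch, assuming a short-lived KafkaProducer with String serializers, could look like this:
// Sketch: sending a single record to the embedded broker.
// Requires org.apache.kafka.clients.producer.* and java.util.concurrent.ExecutionException.
public void send(String topic, String key, String value) {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
    properties.setProperty(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        // get() blocks until the broker acknowledges the record
        producer.send(new ProducerRecord<>(topic, key, value)).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}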
Conclusion
In conclusion, it should be noted that the code given here only illustrates the chosen method.
To create and test services using "kafka", you can refer to the following resource:
kafka-streams-examples