Hey! My name is Andrey Serebryansky, and I am a data engineer on the Data Operations team. Our team is responsible for loading data into our Snowflake data warehouse and for making sure the other teams have real-time data. For example, the transaction feed (customer purchases, transfers, and cashback received) is built from our data.
For all of these tasks we use Kafka and, most importantly, Kafka Streams. Today I will talk about the kinds of tasks Kafka Streams is suited for and show the code for a few of our simple examples. This should be useful for those who use Kafka but have not tried Kafka Streams yet. If you want to keep state while processing Kafka topics, or have been looking for a simple syntax for enriching some topics with information from others, I will show how to do this easily and practically out of the box.
Article outline
- Why do we need Kafka Streams at all
- Case 1. Enriching our customers' purchases with brand information
- Case 2. Bringing customer data from the Origination team into our storage
- A little about the scalability of Kafka Streams
A little about Kafka Streams
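Kafka Streams is a Java library. It works on top of Kafka and is used from a Java or Scala application.
It supports exactly-once processing, which is built on Kafka transactions.
Kafka Streams also supports stateful processing.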
Why do we need Kafka Streams at all?
In short, it helps whenever you need to keep state while processing Kafka topics or to enrich one topic with data from another. The two cases below show both.
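Case 1. Enriching our customers' purchases with brand information
Each event in the authorization-events topic carries a brand identifier (brand_id). The join below looks up the matching record in the brands topic and adds the brand name to the event.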
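builder.stream("authorization-events")
    .join(
        builder.globalTable("brands"),
        (key, auth) -> auth.get("brand_id"), // key selector: the brand id to look up in the brands table
        (auth, brand) -> auth.set("brandName", brand.get("name")) // joiner: add the brand name to the event
    );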
What is builder? It is an instance of StreamsBuilder, created like this:
import org.apache.kafka.streams.StreamsBuilder;
...
StreamsBuilder builder = new StreamsBuilder();
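Note that Kafka Streams matches records by id: the value returned by the key selector must be equal to the key of the record in the brands topic.
A table created with builder.globalTable(topicName) is read in full by every instance of the application, so the lookup is served from a local copy.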
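To make the semantics of this join more concrete, here is a minimal Kafka-free sketch in plain Java. It models the brands table as an in-memory map keyed by brand id and the event as a map of strings. The class BrandJoinSketch, the method enrich, and the map-based record shape are assumptions made for illustration only; they are not part of our code or of the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free model of the stream-to-global-table join.
final class BrandJoinSketch {

    // Returns a copy of the event with brandName added, or null when the event
    // has no matching brand (an inner join drops such events).
    static Map<String, String> enrich(Map<String, String> authEvent,
                                      Map<String, Map<String, String>> brandsById) {
        String brandId = authEvent.get("brand_id");            // key selector
        Map<String, String> brand =
                brandId == null ? null : brandsById.get(brandId); // local lookup by key
        if (brand == null) {
            return null;
        }
        Map<String, String> enriched = new HashMap<>(authEvent); // joiner
        enriched.put("brandName", brand.get("name"));
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, String> brand = new HashMap<>();
        brand.put("name", "Example Brand");
        Map<String, Map<String, String>> brandsById = new HashMap<>();
        brandsById.put("42", brand);

        Map<String, String> event = new HashMap<>();
        event.put("brand_id", "42");
        event.put("amount", "10.00");

        System.out.println(enrich(event, brandsById).get("brandName"));
    }
}
```

The lookup only succeeds when the value extracted from the event is exactly equal to the key in the table, which is why the format of the id matters.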
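Case 2. Bringing customer data from the Origination team into our storage
Origination is the Vivid Money team whose customer data we load into our storage.
We read this data with Kafka Connect and an open-source DynamoDB connector, which delivers it as JSON.
For the rest of the pipeline we convert it to Apache Avro.
Avro schema: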
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}
The conversion itself looks like this:
Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);
builder.stream("origination-json-topic")
.mapValues(val -> avroConverter.convert(val))
.to("origination-avro-topic");
AvroConverter is our own class. It is built on the open-source library https://github.com/allegro/json-avro-converter.
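We also need to know what changed in each record, that is, the difference (diff) from the previous state of the same key.
That requires keeping the previous state somewhere, and Kafka Streams provides state stores for this.
The store is declared and attached to the stream like this: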
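import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.Stores;
...
KStream<String, GenericRecord> changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("state-store"), // name of the persistent store
    Serdes.Bytes(), // serde for the store keys
    new GenericAvroSerde() // serde for the stored Avro values; it must be configured (Schema Registry URL) before use
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // give the transformer access to the store by name
    .to(outputTopic);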
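ChangeTransformer itself looks like this:
public class ChangeTransformer implements Transformer<String, GenericRecord, KeyValue<String, GenericRecord>> {
    private KeyValueStore<Bytes, GenericRecord> stateStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        // look up the store that was registered in the topology under the same name
        this.stateStore = (KeyValueStore<Bytes, GenericRecord>) processorContext.getStateStore("state-store");
    }

    @Override
    public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
        // the store is keyed by Bytes, so wrap the record key before the lookup
        Bytes storeKey = Bytes.wrap(recordKey.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        GenericRecord prevState = stateStore.get(storeKey);
        stateStore.put(storeKey, record); // remember the latest state for the next record with this key
        return extractDiff(prevState, record);
    }
    ...
}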
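The body of extractDiff is not shown above. As a rough illustration of the idea, here is a minimal sketch in plain Java that keeps the previous state per key and returns only the fields whose values changed. Everything in it is an assumption made for the example: the class DiffSketch, the method process, the use of a HashMap instead of a real state store, and records modeled as maps rather than Avro GenericRecord.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, Kafka-free model of "compare with previous state, emit the diff".
final class DiffSketch {

    // In-memory stand-in for the state store: last seen record per key.
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    // Mirrors transform(): read the previous state, save the new one, return the diff.
    Map<String, Object> process(String key, Map<String, Object> current) {
        Map<String, Object> previous = store.get(key);
        store.put(key, new HashMap<>(current));
        return extractDiff(previous, current);
    }

    // Returns the fields of current that are new or have a different value.
    // With no previous state, every field counts as changed.
    // Fields that disappeared from current are not reported in this sketch.
    static Map<String, Object> extractDiff(Map<String, Object> previous,
                                           Map<String, Object> current) {
        Map<String, Object> diff = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : current.entrySet()) {
            boolean changed = previous == null
                    || !Objects.equals(previous.get(field.getKey()), field.getValue());
            if (changed) {
                diff.put(field.getKey(), field.getValue());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        DiffSketch sketch = new DiffSketch();

        Map<String, Object> first = new HashMap<>();
        first.put("firstName", "Anna");
        first.put("lastName", "Lee");
        System.out.println(sketch.process("client-1", first).size());

        Map<String, Object> second = new HashMap<>(first);
        second.put("lastName", "Smith");
        System.out.println(sketch.process("client-1", second));
    }
}
```

The important detail is that the new state is written back on every record; without that write, the previous state would always be empty.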
How do you run a Kafka Streams application?
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-input-topic")
    .filter(...)
    .map(...)
    .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // non-blocking: processing runs in background threads
...
kafkaStreams.close(); // stop processing and release resources
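The properties object passed to KafkaStreams is not shown above. A minimal sketch of what it could contain is given below, using plain java.util.Properties with the standard Kafka Streams configuration keys. The class StreamsPropertiesSketch, the method build, and the example values are assumptions for illustration, not our actual configuration. The exactly_once_v2 value assumes Kafka Streams 3.0 or newer; older versions use exactly_once.

```java
import java.util.Properties;

// Hypothetical helper that assembles a minimal Kafka Streams configuration.
final class StreamsPropertiesSketch {

    static Properties build(String applicationId, String bootstrapServers) {
        Properties properties = new Properties();
        // Identifies the application; instances sharing this id split the work between them.
        properties.setProperty("application.id", applicationId);
        // Kafka brokers to connect to.
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Default serde for record keys (here: strings).
        properties.setProperty("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // Enables exactly-once processing (assumes Kafka Streams 3.0+).
        properties.setProperty("processing.guarantee", "exactly_once_v2");
        return properties;
    }

    public static void main(String[] args) {
        // Example values only.
        Properties properties = build("my-streams-app", "localhost:9092");
        System.out.println(properties.getProperty("application.id"));
    }
}
```

A value serde is left out on purpose: in the examples above it would be an Avro serde, which also needs the Schema Registry address.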
A little about the scalability of Kafka Streams
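Kafka Streams scales by topic partitions. If the input topic has 16 partitions, up to 16 instances of the application can process it in parallel. Any instances beyond that will stay idle.
Keep in mind that if you use a state store (as in ChangeTransformer), each instance holds the state only for the partitions assigned to it, not for the whole topic!
More details: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model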
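The arithmetic behind this limit can be sketched in a few lines of Java. This is a deliberate simplification: it assumes a single input topic, one processing thread per instance, and an even spread of partitions, and it does not reproduce the real assignment algorithm. The class ParallelismSketch and its method names are hypothetical.

```java
// Hypothetical helper illustrating the partition-based parallelism limit.
final class ParallelismSketch {

    // Instances that receive at least one partition.
    static int activeInstances(int partitions, int instances) {
        return Math.min(partitions, instances);
    }

    // Instances that receive nothing because there are no partitions left.
    static int idleInstances(int partitions, int instances) {
        return Math.max(0, instances - partitions);
    }

    // Largest number of partitions a single instance handles under an even spread.
    static int maxPartitionsPerInstance(int partitions, int instances) {
        return (partitions + instances - 1) / instances; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(activeInstances(16, 20));         // prints 16
        System.out.println(idleInstances(16, 20));           // prints 4
        System.out.println(maxPartitionsPerInstance(16, 5)); // prints 4
    }
}
```

In other words, adding instances helps only up to the number of partitions; to scale further, the topic needs more partitions.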
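To sum up, Kafka Streams:
- supports stateful operations (join, getting the previous state);
- offers a DSL with map, filter, and join, and for anything the DSL does not cover there is transform(), which is how ChangeTransformer is plugged in.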