How do we use Kafka Streams in the Vivid Money data warehouse team?

Hey! My name is Andrey Serebryansky, and I'm a data engineer on the Data Operations team. Our team is responsible for populating our Snowflake data warehouse and for making sure the rest of the teams have real-time data. For example, the transaction feed (customer purchases, transfers, the cashback they receive) is built on top of our data.





For all these tasks we use Kafka and, most importantly, Kafka Streams. Today I'll talk about which tasks Kafka Streams is good for and show the code for a few of our simple examples. This will be useful for those who already use Kafka but haven't tried Kafka Streams yet. If you'd like to preserve state while processing Kafka topics, or have been looking for a simple syntax for enriching one topic with information from others, today I'll show how to do this easily, practically out of the box.





Article outline

  1. A little about Kafka Streams





  2. Why do we need Kafka Streams at all





  3. Case No. 1. Enriching our customers' purchases with brand information





  4. Case No. 2. Bringing customer data from the Origination team into our warehouse





  5. How to run all this?





  6. A little about the scalability of Kafka Streams





  7. Conclusions





A little about Kafka Streams

Kafka Streams is a Java library for processing data in Kafka. Kafka itself is written in Java/Scala, so the two fit together naturally.





It supports exactly-once processing, implemented on top of Kafka transactions.
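Turning this on is a single configuration setting. A minimal sketch (exactly_once_v2 is the guarantee name in Kafka Streams 2.8+; older versions use exactly_once):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties properties = new Properties();
// consume, process and produce within a single Kafka transaction
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);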





And Kafka Streams makes stateful processing easy (for example, joins, aggregations, or keeping the previous state of a record).
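For example, a stateful count of events per key is just a few lines of the DSL. A minimal sketch (the topic names are illustrative, and we assume string keys and values):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("purchases")
    .groupByKey() // group records by their key, e.g. by client id
    .count()      // stateful: the counters live in a local state store
    .toStream()
    .to("purchase-counts", Produced.with(Serdes.String(), Serdes.Long()));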





Why do we need Kafka Streams at all?

Our typical task looks like this: an event arrives carrying only identifiers, while the end result has to contain human-readable data that lives in several other teams' services: brand names, customer details, and so on.





Without Kafka Streams, we would have to call each of those services for every incoming event. Every such request is a network round trip: it adds latency, creates load on someone else's service, and if that service is slow or down, our processing stalls with it.





And the more such sources there are, the more fragile the whole chain becomes: any one of them failing is enough to stall processing.





We sequentially fetch data from different sources, waiting if something went wrong in the source

We wanted to get away from this synchronous scheme altogether.





Too many friends

With every new source this setup gets slower and more fragile. Kafka Streams lets us turn it around: instead of calling other services while processing, we subscribe to their topics and keep the data we need at hand.





Kafka Streams pulls up the necessary data in advance

Kafka Streams reads the topics we depend on ahead of time and keeps a continuously updated local copy, so enriching an event becomes a fast local lookup instead of a network call.





Case No. 1. Enriching our customers' purchases with brand information

Our clients make purchases, and each purchase message carries only the brand identifier (brand_id). For the transaction feed we need the brand name (and its logo).





Top brands





Authorization topic

Purchase authorizations arrive in their own topic, and brand information lives in a separate brands topic. In Kafka Streams, enriching one with the other looks like this:









builder.stream("authorization-events")
    .join(
        builder.globalTable("brands"),
        (key, auth) -> auth.get("brand_id"),                       // which key to look up in the brands table
        (auth, brand) -> auth.set("brandName", brand.get("name"))  // add the brand name to the authorization
    );

      
      



Where does the builder come from? It has to be created beforehand, like this:





import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

      
      



Note that the join happens by key: Kafka Streams takes the brand_id from the authorization and looks it up among the keys of the brands topic (so the messages there must be keyed by brand id).
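In other words, whichever service fills the brands topic must publish its messages with the brand id as the message key. A hypothetical producer-side sketch (the topic name comes from the example above; producerProps, brandId and brandJson are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// hypothetical: brand reference data is published keyed by brand id
try (var producer = new KafkaProducer<String, String>(producerProps)) {
    producer.send(new ProducerRecord<>("brands", brandId, brandJson)); // key = brand id
}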





What if a record with the required id hasn't arrived yet?

That's exactly what builder.globalTable(topicName) is for: before processing starts, Kafka Streams reads the entire topic into a local table, and then keeps that table up to date as new records arrive. So by the time we process an authorization, the brand data is already in place.





Under the hood the table is kept in a local state store (RocksDB by default). Conceptually it's just a different view of the topic: every new record with the same key overwrites the previous value, so the table always holds the latest value for each key. This is the stream-table duality that Kafka Streams is built around.
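A toy illustration (the brand values are made up):

// changelog stream (key, value):    resulting table state:
// (brand-1, "Adidas")               brand-1 -> "Adidas"
// (brand-2, "Zara")                 brand-2 -> "Zara"
// (brand-1, "Adidas AG")            brand-1 -> "Adidas AG"   (the newer value wins)

You can read more about stream-table duality here: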





https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

This is how Kafka Streams pulls up all the data it needs in advance.





Case No. 2. Bringing customer data from the Origination team into our warehouse

When a customer registers with Vivid Money, they enter their first name, last name and other details. Origination is the team responsible for client registration at Vivid.





Information about the first and last name goes to the Origination team database

The Origination team keeps this data in DynamoDB. With an open-source Kafka Connect connector for DynamoDB, we pull it into our Kafka as JSON.





We take data from dynamodb to our kafka

Raw JSON doesn't suit us, though: we want every topic in the warehouse to carry a strict schema, so that consumers always know which fields to expect and of what type. For that we convert the messages to Apache Avro. Here is the schema:





Avro
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

      
      



The conversion itself looks like this:





Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val)) // JSON in, Avro GenericRecord out
    .to("origination-avro-topic");

      
      



AvroConverter here is our own small wrapper class that performs the conversion. It's built on top of the open-source https://github.com/allegro/json-avro-converter library, which turned out to be very convenient.
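For reference, a minimal sketch of what such a wrapper might look like on top of the allegro library (the wrapper itself is our guess at the implementation; JsonAvroConverter and convertToGenericDataRecord are that library's real API):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class AvroConverter {
    private final Schema schema;
    private final JsonAvroConverter converter = new JsonAvroConverter();

    public AvroConverter(Schema schema) {
        this.schema = schema;
    }

    public GenericData.Record convert(byte[] json) {
        // parse the JSON payload and build an Avro record matching the schema
        return converter.convertToGenericDataRecord(json, schema);
    }
}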





There's one more task here: the source sends the full snapshot of a record on every change, while downstream we only want to know what actually changed. So we compute the difference (diff) between the previous and the current state of each record. And to do that, we need to keep the previous state somewhere.





Keeping it in application memory won't do: it would be lost on every restart. Running a separate database just for this felt like overkill. Fortunately, Kafka Streams provides state stores out of the box: the state lives in a local store and is backed up to Kafka in a changelog topic, so after a restart it's restored automatically.





The state store is declared like this:





import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.Stores;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("state-store"), // the name the transformer will look the store up by
    Serdes.Bytes(),                                // serde for the keys
    new GenericAvroSerde()                         // serde for the values (Avro records)
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // give the transformer access to the store
    .to(outputTopic);

      
      



And the ChangeTransformer itself:





public class ChangeTransformer implements Transformer<Bytes, GenericRecord, KeyValue<Bytes, GenericRecord>> {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
    // look the store up by the name it was registered under
    this.stateStore = (KeyValueStore<Bytes, GenericRecord>) processorContext.getStateStore("state-store");
  }

  @Override
  public KeyValue<Bytes, GenericRecord> transform(Bytes recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey); // previous state of this record, or null
    stateStore.put(recordKey, record);                   // remember the current state for the next change
    return KeyValue.pair(recordKey, extractDiff(prevState, record));
  }
  ...
}
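The extractDiff implementation is elided here. A naive, purely illustrative version could compare the two records field by field (this is our assumption, not the team's actual code; it relies on all fields being nullable, as in the schema above):

import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// hypothetical: a record of the same schema that carries only the changed fields
private GenericRecord extractDiff(GenericRecord prev, GenericRecord current) {
    GenericData.Record diff = new GenericData.Record(current.getSchema());
    for (Schema.Field field : current.getSchema().getFields()) {
        Object oldValue = (prev == null) ? null : prev.get(field.name());
        Object newValue = current.get(field.name());
        if (!Objects.equals(oldValue, newValue)) {
            diff.put(field.name(), newValue); // keep only what changed
        }
    }
    return diff;
}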

      
      



How to run all this?

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // identifies this application in Kafka
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // where to reach the brokers

StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // processing runs on background threads
...
kafkaStreams.close(); // shut down gracefully

      
      



A little about the scalability of Kafka Streams

Kafka Streams scales very easily. The unit of parallelism is a topic partition: if the input topic has 16 partitions, we can run up to 16 instances of the application, and the partitions will be split between them. Instances can come and go, and Kafka rebalances the work between the remaining ones automatically.
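Scaling out is just starting another copy of the same application: instances are tied together by application.id. A sketch (the id value is illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties properties = new Properties();
// all instances sharing one application.id form a single consumer group,
// and Kafka splits the input partitions between however many are running
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "origination-change-extractor");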





But what happens to the state store (say, the one from our ChangeTransformer) if an instance dies? Don't worry, nothing is lost! Every state store is backed by a changelog topic in Kafka, so the state is replayed and restored on whichever instance takes over.





You can read more about the parallelism model here: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model





Conclusions

Kafka Streams lets us:





  • Do stateful processing (joins, getting the previous state of a record). The state is kept locally and backed up to Kafka, so it survives restarts.





  • Write concise code. map, filter and join from the DSL cover most of our needs, and when they don't, transform() lets us plug in arbitrary custom logic, like the ChangeTransformer above.





  • Scale easily. Parallelism is limited only by the number of partitions in the input topic; to handle more load, we simply start more instances.





P.S. If you have questions, ask them in the comments. I'll be happy to answer!







