Managing entity attributes in Apache Kafka

Introduction



When working on machine learning problems with online data, there is a need to combine attributes of various entities into a single record for further analysis and evaluation. The collection process should be convenient and fast, and it should also provide a seamless transition from development to production use without additional routine work. The Feature Store approach can be used to solve this problem; it is described in detail in Meet Michelangelo: Uber's Machine Learning Platform. This article describes how the feature management part of that solution can be implemented as a prototype.







Feature Store for online streaming



A Feature Store can be viewed as a service that must perform its functions strictly according to its specification. Before defining this specification, it is worth walking through a simple example.







Example



Let the following entities be given.







A movie that has an ID and a title.







A movie rating, which has its own identifier, a movie identifier, and a rating value. The rating changes over time.







A rating source, which has its own identifier, a rating identifier, and a value. It also changes over time.

These entities need to be combined into one.







Here's what happens.







[Figure: Entity diagram]







As you can see, the merge is based on entity keys: for each movie, all of its ratings are found, and for each movie rating, all of its source ratings.
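
For illustration (all identifiers and values here are made up), the merge could look like this:

movie:        id=1, title=Movie1
movieRating:  id=10, movieId=1, value=7.5
ratingSource: id=100, ratingId=10, source=Source1

merged:       movieId=1, title=Movie1, rating=7.5, source=Source1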







Generalization of the example



Now the example can be generalized.







Let there be kafka topics that contain entities: A, B… NN.

Combinations of these entities are required: AB, BCD… NM.

Let us call the solution that produces such combinations the Feature Stream Engine.







The Feature Stream Engine is built on kafka topics and consists of two parts, the Feature Stream Store and the Feature Stream Center, which are described below.







[Figure: Feature Stream Engine]







Feature Stream Store



The Feature Stream Store is the storage part of the solution: the entity attributes are kept in kafka topics.

An attribute of an entity is a feature.

Each feature is stored together with its entity key, which makes it possible to combine entities by key.







Feature Stream Center



The Feature Stream Center is the processing part: it combines the features from the input topics and writes the result to the output topic.







Feature Stream Engine



The Feature Stream Engine brings the Feature Stream Store and the Feature Stream Center together into a single solution.







[Figure: Feature Stream Engine]







Implementing the Feature Stream Engine



The Feature Stream Engine can be implemented as a prototype with fairly little code.







The implementation of the Feature Stream Engine includes the following steps.







Reading the configuration.

Building the kafka streams.

Creating the topology.

Running the topology (the entry point).

Auxiliary classes for re-keying and joining.







[Figure: Feature Stream Engine]









Let us go through these steps in more detail.







Everything starts with the configuration.







At startup, the settings are read from a file ("configuration.properties").







The configuration specifies:

the input kafka topics, separated by ",";

the key in each input topic, separated by ",";

the output topic.
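
For illustration, such a file might look as follows; the property names and topic names here are assumptions for this sketch, the actual constants are defined in the source code.

features.descriptor.sources=movies,movie-ratings
features.descriptor.keys=id,movieId
features.descriptor.sink=movie-features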







Based on these settings, feature descriptors are created:







public static FeaturesDescriptor createFromProperties(Properties properties) {
    String sources = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_SOURCES);
    String keys = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_KEYS);
    String sinkSource = properties.getProperty(FEATURES_DESCRIPTOR_SINK_SOURCE);
    // Both lists are comma-separated; the i-th topic is paired with the i-th key.
    String[] sourcesArray = sources.split(",");
    String[] keysArray = keys.split(",");
    List<FeatureDescriptor> featureDescriptors = new ArrayList<>();
    for (int i = 0; i < sourcesArray.length; i++) {
        FeatureDescriptor featureDescriptor =
                new FeatureDescriptor(sourcesArray[i], keysArray[i]);
        featureDescriptors.add(featureDescriptor);
    }
    return new FeaturesDescriptor(featureDescriptors, sinkSource);
}
      
      





public static class FeatureDescriptor {
    public final String source;
    public final String key;

    public FeatureDescriptor(String source, String key) {
        this.source = source;
        this.key = key;
    }
}
      
      





public static class FeaturesDescriptor {
    public final List<FeatureDescriptor> featureDescriptors;
    public final String sinkSource;

    public FeaturesDescriptor(List<FeatureDescriptor> featureDescriptors, String sinkSource) {
        this.featureDescriptors = featureDescriptors;
        this.sinkSource = sinkSource;
    }
}
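
As a usage sketch (assuming the factory method lives in FeaturesDescriptor and the properties file from the configuration example above):

Properties properties = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("configuration.properties"))) {
    properties.load(in);
}
FeaturesDescriptor featuresDescriptor = FeaturesDescriptor.createFromProperties(properties);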
      
      





The streams are built in a dedicated method:







void buildStreams(StreamsBuilder builder)
      
      





First, a stream is created for each input topic, and its records are re-keyed with the configured key:







Serde<String> stringSerde = Serdes.String();

List<KStream<String, String>> streams = new ArrayList<>();

for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
    KStream<String, String> stream =
            builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                    .map(new KeyValueMapperSimple(featureDescriptor.key));
    streams.add(stream);
}
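
The KeyValueMapperSimple class is not listed in the article. A minimal sketch, assuming the value is a flat "field=value;field=value" string from which the configured key field is promoted to the record key, might look like this:

public static class KeyValueMapperSimple
        implements KeyValueMapper<String, String, KeyValue<String, String>> {
    private final String key;

    public KeyValueMapperSimple(String key) {
        this.key = key;
    }

    @Override
    public KeyValue<String, String> apply(String recordKey, String value) {
        // Hypothetical extraction: find the configured field and use its value as the new key.
        for (String pair : value.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2 && kv[0].equals(key)) {
                return KeyValue.pair(kv[1], value);
            }
        }
        // Fall back to the original key if the field is missing.
        return KeyValue.pair(recordKey, value);
    }
}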
      
      





Then the streams are combined pairwise with a left join:







KStream<String, String> pref = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
    KStream<String, String> cur = streams.get(i);
    pref = pref.leftJoin(cur,
            new ValueJoinerSimple(),
            JoinWindows.of(Duration.ofSeconds(1)),
            StreamJoined.with(
                    Serdes.String(),
                    Serdes.String(),
                    Serdes.String())
    );
}
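
The ValueJoinerSimple class is not listed either. A minimal sketch that simply concatenates the two values (the right value can be null in a left join) could be:

public static class ValueJoinerSimple implements ValueJoiner<String, String, String> {
    @Override
    public String apply(String left, String right) {
        // Keep the left value as-is when there is no matching right record.
        return right == null ? left : left + ";" + right;
    }
}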
      
      





Finally, the resulting stream is written to the output topic:







pref.to(featuresDescriptor.sinkSource);
      
      





The complete method:







public void buildStreams(StreamsBuilder builder) {
    Serde<String> stringSerde = Serdes.String();

    List<KStream<String, String>> streams = new ArrayList<>();

    for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {
        KStream<String, String> stream =
                builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))
                        .map(new KeyValueMapperSimple(featureDescriptor.key));
        streams.add(stream);
    }

    if (streams.size() > 0) {
        if (streams.size() == 1) {
            KStream<String, String> stream = streams.get(0);
            stream.to(featuresDescriptor.sinkSource);
        } else {
            KStream<String, String> pref = streams.get(0);
            for (int i = 1; i < streams.size(); i++) {
                KStream<String, String> cur = streams.get(i);
                pref = pref.leftJoin(cur,
                        new ValueJoinerSimple(),
                        JoinWindows.of(Duration.ofSeconds(1)),
                        StreamJoined.with(
                                Serdes.String(),
                                Serdes.String(),
                                Serdes.String())
                );
            }
            pref.to(featuresDescriptor.sinkSource);
        }
    }
}
      
      





The streams are launched in a separate method:







void run(Properties config)
      
      





First, a stream object is created (based on the given configuration):







FeaturesStream featuresStream = new FeaturesStream(config);
      
      





Then the topology is built and a kafka streams client is created for it:







StreamsBuilder builder = new StreamsBuilder();

featuresStream.buildStreams(builder);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
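
The buildStreamsProperties method is not shown in the article. A minimal sketch, with an assumed application id and broker address, could be:

public Properties buildStreamsProperties() {
    Properties props = new Properties();
    // The application id and broker address are assumptions for this sketch.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "features-stream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    return props;
}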
      
      





Then the streams are started:







streams.start();
      
      





The complete method:







public static void run(Properties config) {
    StreamsBuilder builder = new StreamsBuilder();
    FeaturesStream featuresStream = new FeaturesStream(config);
    featuresStream.buildStreams(builder);
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());
    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        streams.close();
        latch.countDown();
    }));
    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
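
An entry point matching the launch command below is not shown in the article; a hypothetical one (assumed to sit in the same class as run) could parse the -c argument and call run:

public static void main(String[] args) throws IOException {
    // Expected arguments: -c <path-to-properties> (an assumption based on the launch command).
    Properties config = new Properties();
    try (InputStream in = Files.newInputStream(Paths.get(args[1]))) {
        config.load(in);
    }
    run(config);
}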
      
      





The prototype is launched from the command line:







java -jar features-stream-1.0.0.jar -c plain.properties
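
To try the prototype end to end, you can feed the input topics and watch the output topic with the standard kafka console tools (the topic names are the hypothetical ones from the configuration example above):

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic movies
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic movie-features --from-beginning --property print.key=true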
      
      





Language: Java 1.8.

Libraries: kafka 2.6.0, jsoup 1.13.1.









Conclusion

The described solution has the following advantages.







First: it allows you to quickly construct a union of topics.

Second: it allows you to quickly launch the merging in different environments.







It is worth noting that the solution imposes a constraint on the structure of the input data: the topics must have a tabular structure. To overcome this limitation, you can introduce an additional layer that reduces various structures to a tabular one.







For an industrial implementation of the full functionality, it is worth paying attention to a very powerful and, most importantly, flexible tool: KSQL.







Links and Resources



Source code;

Meet Michelangelo: Uber's Machine Learning Platform.







