Apache Kafka — , 30% Fortune 500. Kafka , , , , Kafka.
, — . , , Kafka, .
1.
KafkaProducer
acks
. acks
, , . :
none
— . .
one
— , .
all
— .
, : , , .
acks=all
. acks
all Kafka, Kafka — . , . , . :
,
.
acks=all
. , . . - , . . , , ! , , .
:
- ,
, «»
.
acks=all
, . , , , .
, , . acks=all
. .
: min.insync.replicas
. min.insync.replicas
, , . min.insync.replicas
, . min.insync.replicas
— 1. , , 2.
:
! min.insync.replicas=2 , .
, . NotEnoughReplicasException
NotEnoughReplicasAfterAppendException
, . , , , .
min.insync.replicas
acks , .
— Kafka. API Kafka , .
2. sticky partitioner API
Kafka , . Kafka -, null. Kafka . . . Partitioner .
ProducerRecord ProducerRecord. .
, ProducerRecord , . , .
ProducerRecord , Kafka (round-robin). 0, — 1 , . 0 .
:
. round robin , .
. : . , . , .
. . , , :
: 2 0.
.
round robin. .
, . — , .
Apache Kafka 2.4.0 sticky partitioner, . Sticky partitioner round robin , , . , , sticky partitioner:
sticky partitioner, 0. — 1. — 2, .
, , . , partitioner . , , .
, Apache Kafka sticky partitioner, KIP-480.
.
3. stop-the-world
Kafka — , , . , Kafka , — , . , , Kafka , .
Kafka 2.4 — . , .
, , . group.id . . .
:
, . , ? , ? , — .
:
, 2 - . . () . , 2 , . , , .
. , , stop-the-world. , ConsumerPartitionAssignor
, , , .
eager rebalancing
. — , . , .
. , , — (incremental cooperative rebalancing). Kafka Connect Apache Kafka 2.3, . . . , , .
, , , , . , .
, , . , . CooperativeStickyAssignor
. CooperativeStickyAssignor
, .
, partition.assignment.strategy
CooperativeStickyAssignor
. . , . Kafka Streams, . Kafka Streams , .
4.
Apache Kafka bin
. . console-consumer
, console-producer
, dump-log
delete-records
.
Kafka console producer
console-producer . -, . console-producer:
kafka-console-producer --topic \ --broker-list <broker-host:port>
. Enter, .
, . , — :
kafka-console-producer --topic \ --broker-list <broker-host:port> \ --property parse.key=true \ --property key.separator=":"
key.separator
. . - . Confluent Schema Registry, CLI Avro, Protobuf JSON.
: .
Kafka console consumer
console-consumer . . , . , , :
kafka-console-consumer --topic \ --bootstrap-server <broker-host:port>
( ). , --from-beginning
.
kafka-console-consumer --topic <topic-name> \ --bootstrap-server <broker-host:port> \ --from-beginning
Schema Registry, CLI Avro, Protobuf JSON. Schema Registry Avro, Protobuf JSON, Java: String, Long, Double, Integer . . String.
, --key-deserializer
--value-deserializer
(FQN) .
, , . , :
kafka-console-consumer --topic \ --bootstrap-server <broker-host:port> \ --property print.key=true --property key.separator=":"
, .
Dump log
Kafka — - . kafka-dump-log
. , example
:
kafka-dump-log \ --print-data-log \ --files ./var/lib/kafka/data/example-0/00000000000000000000.log
-
--print-data-log
. -
--files
. , .
,kafka-dump-log
--help
.
:
Dumping ./var/lib/kafka/data/example-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1599775774460 size: 81 magic: 2 compresscodec: NONE crc: 3162584294 isvalid: true | offset: 0 CreateTime: 1599775774460 keysize: 3 valuesize: 10 sequence: -1 headerKeys: [] key: 887 payload: -2.1510235 baseOffset: 1 lastOffset: 9 count: 9 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1599775774468 size: 252 magic: 2 compresscodec: NONE crc: 2796351311 isvalid: true | offset: 1 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 5 payload: 33.440664 | offset: 2 CreateTime: 1599775774463 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 60024247 payload: 9.1408728 | offset: 3 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 1 payload: 45.348946 | offset: 4 CreateTime: 1599775774464 keysize: 6 valuesize: 10 sequence: -1 headerKeys: [] key: 241795 payload: -63.786373 | offset: 5 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 53596698 payload: 69.431393 | offset: 6 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 33219463 payload: 88.307875 | offset: 7 CreateTime: 1599775774466 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 0 payload: 39.940350 | offset: 8 CreateTime: 1599775774467 keysize: 5 valuesize: 9 sequence: -1 headerKeys: [] key: 78496 payload: 74.180098 | offset: 9 CreateTime: 1599775774468 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 89866187 payload: 79.459314
dump-log
, key
, payload
, . -, 10 . . — . dump-log , --key-decoder-class
--value-decoder-class
.
Delete records
Kafka . . , . , Kafka :
-
log.retention.hours
. 168 ( ). -
log.retention.bytes
, .
log.retention.bytes
-1, . , . . , . , Kafka .
kafka-delete-records
:
--bootstrap-server
— ;--offset-json-file
— JSON .
JSON:
{ "partitions": [ {"topic": "example", "partition": 0, "offset": -1} ], "version":1 }
, JSON . JSON. JSON :
- topic — , ;
- partition — , ;
- offset — , , .
dump-log, JSON. , JSON .
offset
JSON? 10 , . . , , , 42. -1
, high watermark
, . high watermark
— ( ).
, :
kafka-delete-records --bootstrap-server <broker-host:port> \ --offset-json-file offsets.json
:
Executing records delete operation Records delete operation completed: partition: example-0 low_watermark: 10
, Kafka example-0
. low_watermark
, 10, , . example topic
10 , 0 9, . : KIP-107 KIP-204.
5.
Apache Kafka 0.11 . Kafka, - . , , . , .
, ? .
- -, compacted , , . compaction , .
- -, , , . . , .
. KIP, , :
- .
- APM (, Appdynamics Dynatrace) , .
- , , .
- , .
, , , Kafka.
Kafka
Java ProducerRecord
:
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value");
producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8));
producerRecord.headers().add("data-file", "incoming-data.txt".getBytes(StandardCharsets.UTF_8));
// Details left out for clarity
producer.send(producerRecord);
-
ProducerRecord
. -
ProducerRecord.headers()
. - .
? header String
. . .
ProducerRecord
, Iterable<Header>
. , Header
, Iterable
. , .
, , .
:
//Details left out for clarity ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { for (Header header : consumerRecord.headers()) { System.out.println("header key " + header.key() + "header value " + new String(header.value())); } }
-
ConsumerRecords
. -
ConsumerRecord
. - .
, ConsumerRecord.headers()
, . . , . KIP-431 ConsoleConsumer. Apache Kafka 2.7.0.
kafkacat. :
kafkacat -b kafka-broker:9092 -t my_topic_name -C \ -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
Apache Kafka :
Apache Kafka
Kafka
Kafka ? ,
Kafka
Apache Kafka 200 ?