, . , . , . , .
, . Kafka Streams Microservices Framework, , . streaming-ops , . , , , streaming-ops, .
, . :
" ".
, , .
, , , .
Spring Boot
, , Spring Boot ( Spring) .
Spring Boot , , , , , - , Apache Kafka. Spring Boot Kafka Streams.
streaming-ops - , , , Kafka Streams. Spring Boot, GitHub. .
Kafka
Spring for Apache Kafka Spring Kafka, Kafka Streams DSL Processor API. Spring dependency injection (DI) framework. Kafka Streams, :
@Autowired
public void orderTable(final StreamsBuilder builder) {
logger.info("Building orderTable");
builder
.table(
this.topic,
Consumed.with(Serdes.String(), orderValueSerde()),
Materialized.as(STATE_STORE))
.toStream()
.peek((k,v) -> logger.info("Table Peek: {}", v));
}
@Autowired
Spring DI , StreamsBuilder
, DSL- Kafka Streams. -, Kafka Streams .
Spring , Apache Kafka. application.properties, Profiles Spring. application.properties :
# ###############################################
# For Kafka, the following values can be
# overridden by a 'traditional' Kafka
# properties file
bootstrap.servers=localhost:9092
...
# Spring Kafka
spring.kafka.properties.bootstrap.servers=${bootstrap.servers}
...
, spring.kafka.properties.bootstrap.servers
bootstrap.servers
${var.name}
.
Spring config
. , , application-<profile-name>.properties
, . , spring.profiles.active
, . streaming-ops
, , SPRING_PROFILES_ACTIVE
.
Spring Gradle Spring. dependency-management plugin
, build.gradle:
plugins {
id 'org.springframework.boot' version '2.3.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.10.RELEASE'
id 'java'
}
Spring , :
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
...
REST-
Spring REST- Java HTTP. , API Kafka Streams. , Spring, , HTTP-:
@GetMapping(value = "/orders/{id}", produces = "application/json")
public DeferredResult<ResponseEntity> getOrder(
@PathVariable String id,
@RequestParam Optional timeout) {
final DeferredResult<ResponseEntity> httpResult =
new DeferredResult<>(timeout.orElse(5000L));
...
Confluent , Spring Apache Kafka (, Advanced Testing Techniques for Spring for Apache Kafka). , Java-, Spring DI, Kafka Kafka, Kafka Streams AdminClient
:
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class OrderProducerTests {
...
Spring DI , Kafka, :
@Autowired
private OrderProducer producer;
...
@Test
public void testSend() throws Exception {
...
List producedOrders = List.of(o1, o2);
producedOrders.forEach(producer::produceOrder);
...
dev
, ; CI, PR . , , , dev , .
streaming-ops Kubernetes GitOps . dev, dev, Kustomize Deployment, PR .
PR , GitOps, . Kubernetes , .
, , REST-, . REST, Kubernetes - Makefile
, curl
HTTP:
$ make prompt
bash-5.0# curl -XGET http://orders-service
curl: (7) Failed to connect to orders-service port 80: Connection refused
HTTP , :
kubectl logs deployments/orders-service | grep ERROR
2020-11-22 20:56:30.243 ERROR 21 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [order-table-4cca220a-53cb-4bd5-8c34-d00a5aa77e63-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: order-table
, , . , , . GitOps . GitHub PR, PR, .
PR , GitOps , . . dev
.
, . , .
PR, . , Java Archive (JAR).
application.yaml
HTTP- :
Server:
Port: 18894
application.properties
( Spring Apache Kafka) ID Kafka Streams , Confluent Cloud ACL:
spring.kafka.streams.application-id=OrdersService
PR , CI/CD GitHub Actions . PR Action Docker- .
PR dev
. , .
$ make prompt
bash-5.0# curl http://orders-service/actuator/health
{"status":"UP","groups":["liveness","readiness"]}
bash-5.0# curl -XGET http://orders-service/v1/orders/284298
{"id":"284298","customerId":0,"state":"FAILED","product":"JUMPERS","quantity":1,"price":1.0}
, Confluent Cloud CLI orders
Avro (. Confluent Cloud CLI CLI).
➜ ccloud kafka topic consume orders --value-format avro
Starting Kafka Consumer. ^C or ^D to exit
{"quantity":1,"price":1,"id":"284320","customerId":5,"state":"CREATED","product":"UNDERPANTS"}
{"id":"284320","customerId":1,"state":"FAILED","product":"STOCKINGS","quantity":1,"price":1}
{"id":"284320","customerId":1,"state":"FAILED","product":"STOCKINGS","quantity":1,"price":1}
^CStopping Consumer.
prd
, , . GitOps . , .
-, . Kustomize Kubernetes, . streaming-ops Makefile :
➜ make test-prd test-dev >/dev/null; diff .test/dev.yaml .test/prd.yaml | grep "orders-service"
< image: cnfldemos/orders-service:sha-82165db
> image: cnfldemos/orders-service:sha-93c0516
, Docker dev
prd
. PR, prd
dev
. , , dev
. dev
- , dev. PR prd:
, , , diff
grep
:
➜ make test-prd test-dev >/dev/null; diff .test/dev.yaml .test/prd.yaml | grep "orders-service"
➜
PR , FluxCD prd
. jq
kubectl
--context
, dev
prd
:
➜ kubectl --context= get deployments/orders-service -o json | jq -r '.spec.template.spec.containers | .[].image'
cnfldemos/orders-service:sha-82165db
➜ kubectl --context= get deployments/orders-service -o json | jq -r '.spec.template.spec.containers | .[].image'
cnfldemos/orders-service:sha-82165db
curl
, , . kubectl
:
➜ kubectl config use-context <your-prd-k8s-context>
Switched to context "kafka-devops-prd".
- prd
, REST- :
➜ make prompt
Launching-util-pod--------------------------------
➜ kubectl run --tty -i --rm util --image=cnfldemos/util:0.0.5 --restart=Never --serviceaccount=in-cluster-sa --namespace=default
If you don't see a command prompt, try pressing enter.
bash-5.0#
(“” - health) :
bash-5.0# curl -XGET http://orders-service/actuator/health
{"status":"UP","groups":["liveness","readiness"]}
bash-5.0# exit
, , , orders-and-payments-simulator
:
➜ kubectl logs deployments/orders-and-payments-simulator | tail -n 5
Getting order from: http://orders-service/v1/orders/376087 .... Posted order 376087 equals returned order: OrderBean{id='376087', customerId=2, state=CREATED, product=STOCKINGS, quantity=1, price=1.0}
Posting order to: http://orders-service/v1/orders/ .... Response: 201
Getting order from: http://orders-service/v1/orders/376088 .... Posted order 376088 equals returned order: OrderBean{id='376088', customerId=5, state=CREATED, product=STOCKINGS, quantity=1, price=1.0}
Posting order to: http://orders-service/v1/orders/ .... Response: 201
Getting order from: http://orders-service/v1/orders/376089 .... Posted order 376089 equals returned order: OrderBean{id='376089', customerId=1, state=CREATED, product=JUMPERS, quantity=1, price=1.0}
REST , /v1/validated
. 201 , , , Kafka Streams.
. , . GitOps , . , DevOps, , , , , - PRs !
«Microservice Architecture». « , ». , , .