Introduction
Spring Integration is an enterprise integration framework built around the Enterprise Integration Patterns (EIP): under the hood it uses messaging between adapters for different protocols / integration systems, based on message channels (in essence, queues). Well-known analogues are Apache Camel, Mule, and NiFi.
Our test case: build a REST service that reads the received request parameters, goes to our database (PostgreSQL, for example), updates and fetches table data according to the parameters received from the source, and sends the result back to the reply queue (request/response) - plus the ability to run multiple instances with different request paths.
Conventionally, the data flow diagram will look like this:
Next, I will show how you can do this simply and without much hassle, using IntegrationFlowContext, with REST-managed flow/component endpoints. All the main project code lives in the repository; here I will show only a few excerpts. If you are interested, read on.
Tools
Let's start with the required dependencies. Basically, we will need the spring-boot projects - for REST-style flow/component management - and spring-integration - to build our case on top of channels and adapters.
Now let's think about what else we need to reproduce the case. In addition to the core dependencies, we will need the subprojects integration-http, integration-jdbc, and integration-groovy (the latter provides dynamically customizable data transformers based on Groovy scripts). To be clear, in this example we will not actually use the Groovy converter, since it is not needed here, but we will provide the ability to configure it from outside.
Dependency list
<!-- Spring block -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<!-- Db block -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!-- Utility block -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
Under the hood
Let's move on to creating the necessary system components (wrappers / models). We will need channel, bean, httpInboundGateway, handler, jdbcOutboundGateway and result models.
- bean - a helper object needed by adapters and the flow;
- channel - a channel for delivering messages to/from flow components;
- httpInboundGateway - an HTTP entry point to which we will send requests with data for further processing;
- handler - a generic handler type (Groovy transformers, various adapters, etc.);
- jdbcOutboundGateway - a JDBC adapter;
- result - a handler for sending information to a specific channel.
We will need wrappers to store parameters and correctly initialize the components of a whole flow, so we immediately create a component store plus additional functionality: JSON -> definition-model converters. Direct field mapping between JSON and objects with Jackson was not applicable in my case - we have yet another homegrown convention for a specific communication protocol.
Let's do it nicely right away, using annotations:
StreamComponent - identifies a class as the configuration model of a flow component and carries service information: the component name, the component type, whether the component is nested, and a description;
SettingClass - defines additional options for scanning the model, such as scanning superclass fields and ignoring certain fields when initializing values;
SettingValue - identifies a class field as customizable from outside, with settings for its name in JSON, a description, a type converter, a required-field flag, and an internal-object flag for informational purposes.
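For illustration, here is a minimal sketch of what these three annotations might look like (my own reconstruction from the description above; the exact attributes in the repository may differ):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Identifies a class as the configuration model of a flow component.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface StreamComponent {
    String name();                      // component name, e.g. "http-inbound-gateway"
    String type();                      // component type, e.g. "source"
    boolean nested() default false;     // whether the component is nested
    String description() default "";
}

// Additional options for scanning the model class.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SettingClass {
    boolean scanSuperClass() default false;  // also scan superclass fields
    String[] ignoredFields() default {};     // fields skipped on initialization
}

// Identifies a field as customizable from outside via JSON.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SettingValue {
    String jsonName() default "";            // name of the setting in JSON
    String description() default "";
    Class<?> converter() default void.class; // optional type converter
    boolean required() default false;
    boolean internalObject() default false;  // informational flag
}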
Component storage manager
Helper methods for working with models for REST controllers
Base model - an abstraction with a set of auxiliary fields / model methods
Current flow configuration models
Mapper JSON -> Definition Model
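For the last item, a hedged sketch of the mapping idea, assuming the annotations sketched above: model classes are discovered via the reflections library from the dependency list, and @SettingValue fields are filled from the componentParameters map of the flow JSON (class and method names here are my own):

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

import org.reflections.Reflections;

class DefinitionModelMapper {

    private final Map<String, Class<?>> modelsByName = new HashMap<>();

    DefinitionModelMapper(String basePackage) {
        // discover all component models by their @StreamComponent annotation
        for (Class<?> type : new Reflections(basePackage).getTypesAnnotatedWith(StreamComponent.class)) {
            modelsByName.put(type.getAnnotation(StreamComponent.class).name(), type);
        }
    }

    // "componentName" and "componentParameters" come from the flow JSON
    Object toModel(String componentName, Map<String, Object> parameters) throws ReflectiveOperationException {
        Class<?> type = modelsByName.get(componentName);
        Object model = type.getDeclaredConstructor().newInstance();
        for (Field field : type.getDeclaredFields()) {
            SettingValue setting = field.getAnnotation(SettingValue.class);
            if (setting != null && parameters.containsKey(setting.jsonName())) {
                field.setAccessible(true);
                field.set(model, parameters.get(setting.jsonName()));
            }
        }
        return model;
    }
}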
The main groundwork is done. Now let's get down to implementing the services that will be responsible for the life cycle, storage, and initialization of flows. We will build in the idea right away that one flow with the same naming can be parallelized into several instances, i.e. we will need unique identifiers (GUIDs) for all components of a flow, otherwise collisions with other singleton components (beans, channels, etc.) may occur in the application context. Below is a sketch of that naming idea; after it, we will write the mappers for two components - http and jdbc - i.e. the translation of the models made earlier into the flow's actual components (HttpRequestHandlerEndpointSpec and JdbcOutboundGateway).
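A minimal sketch of the naming idea (my own illustration; it matches the channel names you will see in the metrics output at the end of the article):

import java.util.UUID;

// Hypothetical helper: collision-free component ids for one flow instance.
final class FlowIds {

    private FlowIds() {
    }

    static String newFlowId() {
        return UUID.randomUUID().toString();
    }

    // e.g. "Rest Postgres stream_2bf65d9d-..._jdbcReqChannel"
    static String componentId(String flowName, String flowId, String componentName) {
        return flowName + "_" + flowId + "_" + componentName;
    }
}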
HttpRegistry
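The full registry lives in the repository; a hedged sketch of its core idea, with the model fields passed in as plain parameters, looks roughly like this:

import org.springframework.http.HttpMethod;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.dsl.HttpRequestHandlerEndpointSpec;

class HttpRegistry {

    // Maps the http-inbound-gateway model onto an endpoint spec; if no path
    // is configured, the system call path /stream/{uuid}/call is used.
    HttpRequestHandlerEndpointSpec toEndpointSpec(String localPath, String uuid, Class<?> payloadType,
                                                  String requestChannel, String replyChannel) {
        String path = localPath != null ? localPath : String.format("/stream/%s/call", uuid);
        return Http.inboundGateway(path)
                .requestMapping(m -> m.methods(HttpMethod.POST))  // http-inbound-supported-methods
                .requestPayloadType(payloadType)                  // payload-type
                .requestChannel(requestChannel)                   // request-channel
                .replyChannel(replyChannel);                      // reply-channel
    }
}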
JdbcRegistry
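And a similar sketch for the JDBC side, built on the JdbcOutboundGateway constructor from spring-integration-jdbc (parameter names mirror the JSON configuration shown later):

import javax.sql.DataSource;
import org.springframework.integration.jdbc.JdbcOutboundGateway;

class JdbcRegistry {

    // data-source, update-query, query and jdbc-reply-channel come from the model
    JdbcOutboundGateway toGateway(DataSource dataSource, String updateQuery,
                                  String selectQuery, String replyChannelName) {
        // with both queries set, the gateway runs the update first,
        // then produces the reply from the select
        JdbcOutboundGateway gateway = new JdbcOutboundGateway(dataSource, updateQuery, selectQuery);
        gateway.setOutputChannelName(replyChannelName);
        return gateway;
    }
}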
The central management service (StreamDeployingService) stores active/inactive flows, registers new ones, and starts, stops, and removes flows entirely from the application context. An important feature of the service is the injected IntegrationFlowBuilderRegistry dependency, which helps keep the application dynamic (you may remember those kilometer-long XML configuration files or DSL classes). Per the flow specification, a flow must always start with an inbound component or a channel, and we take this into account in the implementation of the registerStreamContext method.
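In case it helps, here is a minimal sketch of such a service built directly on Spring Integration's IntegrationFlowContext API (the real service in the repository also validates models and keeps separate active/inactive registries):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.stereotype.Service;

@Service
class StreamDeployingService {

    private final IntegrationFlowContext flowContext;
    private final Map<String, IntegrationFlowContext.IntegrationFlowRegistration> streams =
            new ConcurrentHashMap<>();

    StreamDeployingService(IntegrationFlowContext flowContext) {
        this.flowContext = flowContext;
    }

    String registerStreamContext(IntegrationFlow flow) {
        String streamId = UUID.randomUUID().toString();
        // register the dynamically built flow without starting it yet
        streams.put(streamId, flowContext.registration(flow).id(streamId).autoStartup(false).register());
        return streamId;
    }

    void start(String streamId) {
        streams.get(streamId).start();
    }

    void stop(String streamId) {
        streams.get(streamId).stop();
    }

    void remove(String streamId) {
        streams.remove(streamId);
        flowContext.remove(streamId); // destroys all beans of the flow in the context
    }
}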
The auxiliary manager (IntegrationFlowBuilderRegistry) acts both as a mapper from models to flow components and as the initializer of the flow itself via IntegrationFlowBuilder. I also implemented a log handler in the flow pipeline, a service for collecting flow channel metrics (a toggleable option), and a possible implementation of flow message converters based on Groovy (if this example ever becomes the basis of something that goes to production, precompilation of the Groovy scripts must be done at flow initialization time, otherwise you will run into RAM problems under load tests no matter how many cores and how much power you have). Depending on the model's log-stages and log-level parameters, logging is activated after each hop of a message from component to component. Monitoring is enabled and disabled by a parameter in application.yml:
monitoring:
injection:
default: true
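The log stage itself is a one-liner in the DSL; here is a hedged sketch of how the builder can weave it in after each component, driven by the model's log-stages and log-level parameters (the helper name is my own):

import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.handler.LoggingHandler;

final class LogStageSupport {

    private LogStageSupport() {
    }

    // Adds a logging stage after the current component when log-stages is true,
    // e.g. addLogStage(builder, true, "INFO").
    static IntegrationFlowBuilder addLogStage(IntegrationFlowBuilder builder,
                                              boolean logStages, String logLevel) {
        return logStages
                ? builder.log(LoggingHandler.Level.valueOf(logLevel), "stream-stage")
                : builder;
    }
}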
Now we have all the mechanics for initializing dynamic data-processing flows; on top of that, we can write mappers for various protocols and adapters such as RabbitMQ, Kafka, TCP, FTP, etc. Moreover, in most cases you don't need to write anything by hand (except, of course, configuration models and auxiliary methods) - a fairly large number of components are already present in the repository.
The final stage is implementing the controllers for obtaining information about existing system components, managing flows, and fetching metrics.
ComponentsController - provides information about all available components in a human-readable form, as well as a single component by name and type.
StreamController - provides full flow management, namely: initialization of new flows from JSON models, and starting, stopping, deleting, and fetching metrics for a flow by its identifier.
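A hedged sketch of StreamController's surface, matching the endpoints used in the testing section below (the deploy and metrics endpoints are omitted here; they delegate to the JSON mapper and the metrics collector respectively):

import java.util.Collections;
import java.util.Map;

import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/stream")
class StreamController {

    private final StreamDeployingService deployingService;

    StreamController(StreamDeployingService deployingService) {
        this.deployingService = deployingService;
    }

    @GetMapping("/{streamId}/start")
    Map<String, String> start(@PathVariable String streamId) {
        deployingService.start(streamId);
        return Collections.singletonMap("status", "SUCCESS");
    }

    @GetMapping("/{streamId}/stop")
    Map<String, String> stop(@PathVariable String streamId) {
        deployingService.stop(streamId);
        return Collections.singletonMap("status", "SUCCESS");
    }

    @DeleteMapping("/{streamId}")
    Map<String, String> remove(@PathVariable String streamId) {
        deployingService.remove(streamId);
        return Collections.singletonMap("status", "SUCCESS");
    }
}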
Final product
We start the resulting application and describe the test case in JSON format.
Sample data flow
The DDL and seed data for the test tables:
CREATE TABLE IF NOT EXISTS account_data
(
id INT NOT NULL,
accountname VARCHAR(45) NOT NULL,
password VARCHAR(128),
email VARCHAR(255),
last_ip VARCHAR(15) DEFAULT NULL NOT NULL
);
CREATE UNIQUE INDEX account_data_username_uindex
ON account_data (accountname);
ALTER TABLE account_data
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME account_data_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
ALTER TABLE account_data
ADD CONSTRAINT account_data_pk
PRIMARY KEY (id);
CREATE TABLE IF NOT EXISTS account_info
(
id INT NOT NULL,
banned BOOLEAN DEFAULT FALSE,
premium_points INT DEFAULT 0,
premium_type SMALLINT DEFAULT -1
);
ALTER TABLE account_info
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME account_info_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
ALTER TABLE account_info
ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE account_info
ADD CONSTRAINT account_info_pk
PRIMARY KEY (id);
INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);
And now the flow definition itself in JSON. Note the order parameter on each component: it sets the order in which the components are assembled into the flow.
{
"flowName": "Rest Postgres stream",
"components": [
{
"componentName": "bean",
"componentType": "other",
"componentParameters": {
"id": "pgDataSource",
"bean-type": "com.zaxxer.hikari.HikariDataSource",
"property-args": [
{
"property-name": "username",
"property-value": "postgres"
},
{
"property-name": "password",
"property-value": "postgres"
},
{
"property-name": "jdbcUrl",
"property-value": "jdbc:postgresql://localhost:5432/test"
},
{
"property-name": "driverClassName",
"property-value": "org.postgresql.Driver"
}
]
}
},
{
"componentName": "message-channel",
"componentType": "source",
"componentParameters": {
"id": "jdbcReqChannel",
"order": 1,
"channel-type": "direct",
"max-subscribers": 1000
}
},
{
"componentName": "message-channel",
"componentType": "source",
"componentParameters": {
"id": "jdbcRepChannel",
"order": 1,
"channel-type": "direct"
}
},
{
"componentName": "http-inbound-gateway",
"componentType": "source",
"componentParameters": {
"order": 2,
"http-inbound-supported-methods": [
"POST"
],
"payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
"log-stages": true,
"log-level": "INFO",
"request-channel": "jdbcReqChannel",
"reply-channel": "jdbcRepChannel"
}
},
{
"componentName": "handler",
"componentType": "processor",
"componentParameters": {
"order": 3,
"handler-definition": {
"componentName": "jdbc-outbound-adapter",
"componentType": "app",
"componentParameters": {
"data-source": "pgDataSource",
"query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
"update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
"jdbc-reply-channel": "jdbcRepChannel",
"log-stages": true,
"log-level": "INFO"
}
}
}
},
{
"componentName": "result",
"componentType": "app",
"componentParameters": {
"order": 4,
"cancel": false,
"result-channel": "jdbcRepChannel"
}
}
]
}
Testing:
1) We initialize a new flow via the POST /stream/deploy method, with our JSON in the request body.
If everything is correct, the response will be as follows; otherwise, an error message will be returned:
{
    "status": "SUCCESS",
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b"
}
2) We start the flow via the method:
GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/start, where we pass the identifier of the flow initialized earlier.
If everything is correct, the response will be as follows; otherwise, an error message will be returned:
{
    "status": "SUCCESS"
}
3) How do we call a flow by its identifier? In the HttpRegistry model mapper, I wrote the condition
Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))
Here the http-inbound-path parameter is taken into account: if it is not explicitly specified in the component's configuration, the system call path is used instead. In our case, it will be:
POST /stream/ece4d4ac-3b46-4952-b0a6-8cf334074b99/call - with the stream identifier in the path, and the request body:
{
"accountId": 1
}
If all stages of request processing worked correctly, in response we will receive a flat structure combining the records of the account_data and account_info tables:
{
"accountname": "test",
"password": "test",
"email": "test@test",
"last_ip": "127.0.0.1",
"banned": true,
"premium_points": 1000,
"premium_type": 1
}
A peculiarity of the JdbcOutboundGateway adapter is that if you specify the update-query parameter, an additional handler is registered, which first updates the data and only then fetches by the query parameter.
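Concretely, for our test case this means the gateway is built with both queries from the JSON above (a sketch using the spring-integration-jdbc constructor; the factory class is my own):

import javax.sql.DataSource;
import org.springframework.integration.jdbc.JdbcOutboundGateway;

class BanAccountGatewayFactory {

    // update-query runs first (bans the account), then query fetches the joined row
    static JdbcOutboundGateway create(DataSource pgDataSource) {
        return new JdbcOutboundGateway(
                pgDataSource,
                "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
                "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type"
                        + " FROM account_data d INNER JOIN account_info i ON d.id = i.id"
                        + " WHERE d.id = :payload.accountId");
    }
}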
If you specify identical paths manually, the ability to launch several instances of components with HttpInboundGateway as a flow's entry point disappears, because the system will not allow registering the same path twice.
4) Let's look at the metrics via GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/metrics
Response content (send duration, send rate, and error statistics for each channel of the flow):
[
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
"sendDuration": {
"count": 1,
"min": 153.414,
"max": 153.414,
"mean": 153.414,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 153.414,
"minSendDuration": 153.414,
"meanSendDuration": 153.414,
"meanSendRate": 0.001195117818082359,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 1.1102230246251565E-16
},
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
"sendDuration": {
"count": 1,
"min": 0.1431,
"max": 0.1431,
"mean": 0.1431,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 0.1431,
"minSendDuration": 0.1431,
"meanSendDuration": 0.1431,
"meanSendRate": 0.005382436008121413,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 0.0
},
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
"sendDuration": {
"count": 1,
"min": 0.0668,
"max": 0.0668,
"mean": 0.0668,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 0.0668,
"minSendDuration": 0.0668,
"meanSendDuration": 0.0668,
"meanSendRate": 0.001195118373693797,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 1.1102230246251565E-16
}
]
Conclusion
Thus, I have shown how, by spending a bit more time and effort, you can write one application for integration with various systems, instead of writing additional manual handlers (pipelines) of 200-500 lines of code in your application every time you need to integrate with yet another system.
In the current example, you can parallelize the work of identical flows across several instances by means of unique identifiers, avoiding collisions in the global application context between flow dependencies (beans, channels, etc.).
In addition, you could develop the project further:
- save flows to a database;
- add support for all the integration components that the Spring and spring-integration community provides;
- make workers that run flows on a schedule;
- make a sane UI for configuring flows with the proverbial "mouse and component boxes" (by the way, this example was partially modeled on the github.com/spring-cloud/spring-cloud-dataflow-ui project).
And once again, here is the link to the repository.