Spring Integration - dynamic data flows

Hello, Habr! Today we will look at a rather specific area: streaming data processing with the Spring Integration framework, and how to create these flows at runtime, without preliminary initialization in the application context. A complete sample application is available on GitHub.



Introduction



Spring Integration is an enterprise integration framework built around Enterprise Integration Patterns (EIP) that, under the hood, uses messaging between adapters for different protocols / integration systems, based on message channels (conceptually, queues). Well-known analogues are Camel, Mule, and NiFi.



Our test case: build a REST service that reads the received request parameters, queries our database (for example, Postgres), updates and fetches table data according to the parameters received from the source, and sends the result back to the reply queue (request/response). It should also support running multiple instances with different request paths.



The data flow diagram will look roughly like this:



[Data flow diagram]



Next, I will show how you can do this easily, without much fuss, using IntegrationFlowContext, with REST-controlled component/flow endpoints. All the main project code lives in the repository; here I will show only a few excerpts. If you are interested, read on.



Tools



Let's start with the dependency block. Mainly we will need Spring Boot projects (for the REST-based approach to flow/component management) and spring-integration (to build our case on channels and adapters).



Next, let's think about what else we need to reproduce the case. In addition to the core dependencies, we will need the subprojects integration-http, integration-jdbc, and integration-groovy (the latter provides dynamically configurable data transformers based on Groovy scripts). Separately, I will note that in this example we will not actually use the Groovy converter, but we will provide the ability to plug it in from outside.



Dependency list
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Under the hood



Let's move on to creating the necessary system components (wrappers / models). We will need channel, bean, httpInboundGateway, handler, jdbcOutboundGateway and result models.



  • bean - a helper object needed by adapters and the flow
  • channel - a channel for delivering messages to / from flow components
  • httpInboundGateway - an HTTP access point to which we will send a request with data for further processing
  • handler - a generic handler type (Groovy transformers, various adapters, etc.)
  • jdbcOutboundGateway - a JDBC adapter
  • result - a handler for sending information to a specific channel


We will need wrappers to store parameters and correctly initialize the components of a whole flow, so we immediately create a component store plus additional functionality: JSON -> definition-model converters. Direct field mapping with Jackson onto objects was not applicable in my case - we have yet another custom wheel for a specific communication protocol.



Let's do it nicely right away, using annotations:



StreamComponent - identifies a class as the configuration model of a stream component and carries service information: the component name, the component type, whether the component is nested, and a description;



SettingClass - responsible for additional model-scanning options, such as scanning superclass fields and ignoring fields when initializing values;



SettingValue - identifies a class field as configurable from outside, with JSON naming settings, a description, a type converter, a required-field flag, and an internal-object flag for informational purposes;
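To make the three annotations concrete, here is a minimal plain-Java sketch of how they might be declared and read via reflection. The annotation and field names mirror the article's descriptions, but the exact attributes and defaults are my assumptions, not the project's actual code:

```java
import java.lang.annotation.*;
import java.lang.reflect.Field;

// Hypothetical sketch of the three marker annotations described above.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface StreamComponent {
    String name();
    String type();
    boolean nested() default false;
    String description() default "";
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SettingClass {
    boolean scanSuperClass() default false;
    String[] ignoredFields() default {};
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SettingValue {
    String name();
    String description() default "";
    boolean required() default false;
}

// Example configuration model using the annotations (names assumed).
@StreamComponent(name = "message-channel", type = "source")
@SettingClass
class ChannelDefinition {
    @SettingValue(name = "id", required = true)
    String id;

    @SettingValue(name = "channel-type")
    String channelType = "direct";
}

public class AnnotationDemo {
    public static void main(String[] args) {
        StreamComponent meta = ChannelDefinition.class.getAnnotation(StreamComponent.class);
        System.out.println(meta.name() + "/" + meta.type());
        // Scan fields the same way a model scanner might
        for (Field f : ChannelDefinition.class.getDeclaredFields()) {
            SettingValue sv = f.getAnnotation(SettingValue.class);
            System.out.println(sv.name() + " required=" + sv.required());
        }
    }
}
```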



Component storage manager



Helper methods for working with models for REST controllers



Base model - an abstraction with a set of auxiliary fields / model methods



Current flow configuration models



Mapper JSON -> Definition Model



The groundwork is done. Now let's implement the services responsible for the life cycle, storage, and initialization of flows. We will immediately build in the idea that one flow with the same naming can be parallelized into several instances, which means we need unique identifiers (GUIDs) for all components of a flow; otherwise collisions with other singleton components (beans, channels, etc.) may occur in the application context. But first, let's write mappers for two components - http and jdbc - that turn the models made earlier into actual flow components (HttpRequestHandlerEndpointSpec and JdbcOutboundGateway).
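The GUID-based uniqueness idea can be sketched in plain Java. The naming pattern below mirrors the channel names visible in the metrics output later in the article (`<flowName>_<uuid>_<componentId>`); the helper class and method names are my assumptions:

```java
import java.util.UUID;

// Sketch: every flow instance gets its own UUID, and each component/bean id
// is prefixed with "<flowName>_<uuid>_" so parallel instances of the same
// flow never collide in the application context.
public class FlowNaming {
    private final String flowName;
    private final String streamId;

    public FlowNaming(String flowName) {
        this.flowName = flowName;
        this.streamId = UUID.randomUUID().toString();
    }

    public String getStreamId() { return streamId; }

    // e.g. "jdbcReqChannel" -> "Rest Postgres stream_<uuid>_jdbcReqChannel"
    public String qualify(String componentId) {
        return flowName + "_" + streamId + "_" + componentId;
    }

    public static void main(String[] args) {
        FlowNaming a = new FlowNaming("Rest Postgres stream");
        FlowNaming b = new FlowNaming("Rest Postgres stream");
        System.out.println(a.qualify("jdbcReqChannel"));
        // Same logical component, different instances -> different bean names
        System.out.println(a.qualify("jdbcReqChannel").equals(b.qualify("jdbcReqChannel")));
    }
}
```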



HttpRegistry



JdbcRegistry



The central management service (StreamDeployingService) stores running/inactive flows, registers new ones, and starts, stops, and removes flows entirely from the application context. An important feature of the service is the injected IntegrationFlowBuilderRegistry dependency, which is what makes the application dynamic (you may remember those kilometer-long configuration XML files or DSL classes). According to the flow specification, a flow must always start with an inbound component or a channel, so we take this into account in the implementation of the registerStreamContext method.



The auxiliary manager (IntegrationFlowBuilderRegistry) both maps models to flow components and initializes the flow itself using IntegrationFlowBuilder. I also implemented a log handler in the flow pipeline, a service for collecting flow channel metrics (a toggleable option), and a possible implementation of flow message converters based on Groovy. (If this example ever becomes the basis for production code, Groovy scripts must be precompiled at the flow initialization stage; otherwise you will run into RAM problems under load tests, no matter how many cores and how much power you have.) Depending on the log-stages and log-level parameters in the model's configuration, logging will be active after each transmission of a message from component to component. Monitoring is enabled and disabled by a parameter in application.yml:



monitoring:
  injection:
    default: true


Now we have all the mechanics for initializing dynamic data processing flows; we can additionally write mappers for various protocols and adapters such as RabbitMQ, Kafka, TCP, FTP, etc. Moreover, in most cases you don't need to write anything by hand (except, of course, the configuration models and auxiliary methods) - a fairly large number of components are already present in the repository.
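The mapper idea, stripped of Spring specifics, boils down to a registry keyed by component name. The sketch below uses placeholder factories returning strings; in the real project they would build actual flow components such as `Http.inboundGateway(...)` or `JdbcOutboundGateway`. Class and method names here are my assumptions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of a registry mapping a component name from the JSON definition
// to a factory producing the corresponding flow component.
public class ComponentRegistry {
    private final Map<String, Function<Map<String, Object>, Object>> factories = new HashMap<>();

    public void register(String componentName, Function<Map<String, Object>, Object> factory) {
        factories.put(componentName, factory);
    }

    public Object create(String componentName, Map<String, Object> params) {
        Function<Map<String, Object>, Object> f = factories.get(componentName);
        if (f == null) {
            throw new IllegalArgumentException("Unknown component: " + componentName);
        }
        return f.apply(params);
    }

    public static void main(String[] args) {
        ComponentRegistry registry = new ComponentRegistry();
        // Placeholder factories standing in for HttpRegistry / JdbcRegistry
        registry.register("http-inbound-gateway", p -> "http-gateway:" + p.get("request-channel"));
        registry.register("jdbc-outbound-adapter", p -> "jdbc-adapter:" + p.get("data-source"));
        System.out.println(registry.create("http-inbound-gateway",
                Map.of("request-channel", "jdbcReqChannel")));
    }
}
```

Adding RabbitMQ, Kafka, TCP, or FTP support would then be a matter of registering one more factory rather than touching the flow-building core.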



The final stage will be the implementation of controllers for obtaining information about existing system components, managing flows and obtaining metrics.



ComponentsController - provides information about all components in a human-readable model, and one component by name and type.



StreamController - provides full flow management, namely the initialization of new JSON models, starting, stopping, deleting and issuing metrics by identifier.



Final product



We start the resulting application and describe the test case in JSON format.



Sample data stream

First, prepare the test tables and data:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


Note: the order parameter matters here, since it determines the sequence in which the components are initialized and wired into the flow.



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}
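The order fields in the definition above determine the wiring sequence: before chaining components into the flow builder, they are sorted in ascending order. A minimal plain-Java sketch of that step (the model type is hypothetical):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderDemo {
    // Simplified stand-in for the component definition model
    record ComponentDef(String name, int order) {}

    public static void main(String[] args) {
        // Components as they appear in the JSON, deliberately shuffled
        List<ComponentDef> defs = new ArrayList<>(List.of(
                new ComponentDef("handler", 3),
                new ComponentDef("message-channel", 1),
                new ComponentDef("result", 4),
                new ComponentDef("http-inbound-gateway", 2)));
        // Sort ascending by 'order' before wiring into the flow builder
        defs.sort(Comparator.comparingInt(ComponentDef::order));
        defs.forEach(d -> System.out.println(d.order() + ": " + d.name()));
    }
}
```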





Testing:



1) We initialize a new stream using the POST /stream/deploy method, with our JSON in the request body.



If everything is correct, the system responds with success; otherwise an error message is returned:



{
    "status": "SUCCESS",
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b"
}


2) We start the stream using the method GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/start, passing the identifier of the stream initialized earlier.



If everything is correct, the system responds with success; otherwise an error message is returned:



{
    "status": "SUCCESS"
}


3) How do we call a stream by its identifier? In the HttpRegistry model mapper, I wrote the condition



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


where the http-inbound-path parameter is taken into account: if it is explicitly specified in the component's configuration, it is used; otherwise the system call path is set. In our case, it will be:
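This fallback is easy to isolate and test on its own. The ternary expression is taken from the condition quoted above; the surrounding helper class and method names are my assumptions:

```java
import java.util.UUID;

public class PathResolver {
    // An explicit http-inbound-path wins; otherwise fall back to /stream/<uuid>/call
    static String resolvePath(String localPath, String uuid) {
        return localPath != null ? localPath : String.format("/stream/%s/call", uuid);
    }

    public static void main(String[] args) {
        String uuid = UUID.randomUUID().toString();
        System.out.println(resolvePath(null, uuid));          // system default path
        System.out.println(resolvePath("/custom/api", uuid)); // explicit path wins
    }
}
```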



POST /stream/ece4d4ac-3b46-4952-b0a6-8cf334074b99/call - where the stream identifier is present, with the request body:



{
    "accountId": 1
}


If all processing stages worked correctly, in response we will receive a flat structure of records from the account_data and account_info tables:



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


A specific feature of the JdbcOutboundGateway adapter: if you specify the update-query parameter, an additional handler is registered, which first updates the data and only then fetches by the query parameter.



If you specify the same path manually, it will no longer be possible to launch components with HttpInboundGateway as the access point to a stream in multiple instances, because the system will not allow registering a duplicate path.



4) Let's look at the metrics using GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/metrics



Response content - per-channel send counts, send durations/rates, and error counters for the stream:



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Conclusion



Thus, I have shown how, by spending a little more time and effort up front, you can write one application for integration with various systems, instead of writing additional manual handlers (pipelines) of 200-500 lines of code in your application every time you need to integrate with another system.



In the current example, identical flows can be parallelized into several instances by means of unique identifiers, avoiding collisions in the global application context between flow dependencies (beans, channels, etc.).



In addition, you can develop the project:



  • save flows to the database;
  • add support for all the integration components that the Spring and Spring Integration community provides;
  • make workers that process flows on a schedule;
  • make a sane UI for configuring flows with drag-and-drop component blocks (by the way, this example was partially inspired by the github.com/spring-cloud/spring-cloud-dataflow-ui project).


And once again, here is the link to the repository.


