Suppose we have the following task.
There is a source of stock-market trades. This source sends us trades over a REST interface.
We need to receive these trades, save them to a database and build a convenient in-memory storage.
This storage should support the following operations:
- return a list of trades;
- return the full position, i.e. a table of "instrument" to "current number of securities";
- return a position for a given instrument.
How do we approach this task?
Following the microservice fashion, we split the task into microservice components:
- receiving a trade over REST;
- saving the trade to the database;
- in-memory storage for presenting position data.
We will build the first and third services in this tutorial and leave the second for part two (write in the comments if you are interested).
So, we have two microservices.
The first receives data from the outside.
The second processes this data and responds to incoming requests.
Of course, we want horizontal scaling, zero-downtime updates and the other benefits of microservices.
What is the hard problem in front of us?
There are actually many, but for now let's talk about how data will flow between these microservices. We could use REST between them, we could put a queue in between, and there are plenty of other options, each with its pros and cons.
Let's look at one possible approach: asynchronous communication through the Axon Framework.
What are the advantages of this solution?
First, asynchronous communication increases flexibility (yes, there is a downside too, but for now we are only talking about the upsides).
Second, we get Event Sourcing and CQRS out of the box.
Third, Axon provides a ready-made infrastructure, and we only need to focus on developing business logic.
Let's get started.
The project is built with Gradle and has three modules:
- common: a module with shared data structures (we don't like copy-paste);
- tradeCreator: a module with the microservice that accepts trades over REST;
- tradeQueries: a module with the microservice that serves position data.
We take Spring Boot as the basis and add the Axon starter.
Axon works fine without Spring, but we will use them together.
Here we need to pause and say a few words about Axon.
It is a client-server system. The server is a separate application; we will run it in Docker.
The clients are embedded in the microservices.
The picture is as follows: first the Axon server is started (in Docker), then our microservices.
At startup, the microservices look for the server and start interacting with it. The interaction can loosely be divided into two kinds: technical and business.
The technical kind is the exchange of "I'm alive" messages (they are visible when logging at debug level).
The business kind consists of messages such as "new trade".
An important feature: after a microservice starts, it can ask the Axon server "what happened?", and the server sends it the accumulated events. Thus a microservice can be restarted relatively safely, without losing data.
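To get a feel for this catch-up behaviour, here is a minimal sketch in plain Java. It is not Axon's API: ReplaySketch, EventLog and CatchUpConsumer are hypothetical names, and the sketch ignores persistence and networking. The only idea it shows is that the log keeps everything, and each consumer remembers how far it has read.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplaySketch {

    /** Toy append-only event log (hypothetical stand-in for the server's event store). */
    static final class EventLog {
        private final List<String> events = new ArrayList<>();

        synchronized void append(String event) {
            events.add(event);
        }

        /** Returns a copy of all events at position fromIndex and later. */
        synchronized List<String> readFrom(int fromIndex) {
            int start = Math.min(fromIndex, events.size());
            return new ArrayList<>(events.subList(start, events.size()));
        }
    }

    /** Toy consumer that remembers how far it has read. */
    static final class CatchUpConsumer {
        private final EventLog log;
        private final List<String> seen = new ArrayList<>();
        private int nextIndex = 0;

        CatchUpConsumer(EventLog log) {
            this.log = log;
        }

        /** Asks the log "what happened since I last looked?" and consumes the answer. */
        void catchUp() {
            List<String> missed = log.readFrom(nextIndex);
            seen.addAll(missed);
            nextIndex += missed.size();
        }

        List<String> seen() {
            return List.copyOf(seen);
        }
    }
}
```

A freshly started consumer begins at index 0, so its first catchUp() call delivers everything that accumulated while it was down.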
With this exchange scheme, we can very easily run many instances of the microservices, even on different hosts.
Yes, a single Axon Server instance is not reliable, but it will do for now.
We work in the Event Sourcing and CQRS paradigms. This means that we need "commands", "events" and "queries".
We will have one command, "create trade"; one event, "trade created"; and three queries: "show all trades", "show position" and "show position for an instrument".
The flow is as follows:
- The tradeCreator microservice accepts a trade over REST.
- The tradeCreator microservice creates a "create trade" command and sends it to the Axon server.
- The Axon server receives the command and forwards it to the interested recipient; in our case this is the tradeCreator microservice itself.
- The tradeCreator microservice receives the command, generates a "trade created" event and sends it to the Axon server.
- The Axon server receives the event and forwards it to the interested subscribers.
- For now there is only one interested subscriber: the tradeQueries microservice.
- The tradeQueries microservice receives the event and updates its internal data.
(Importantly, the tradeQueries microservice may be unavailable at the moment the event is produced, but it will receive the event as soon as it starts.)
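The steps above can be sketched as a tiny in-process hub in plain Java. This is only an illustration of the routing idea, not how Axon is implemented: FlowSketch and ToyHub are hypothetical names, the two records carry a single assumed field, and in the real setup every hop goes over the network through the Axon server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class FlowSketch {

    record CreateTradeCommand(String tradeId) {}

    record CreatedTradeEvent(String tradeId) {}

    /** Toy hub: exactly one handler per command type, any number of event subscribers. */
    static final class ToyHub {
        private final Map<Class<?>, Function<Object, Object>> commandHandlers = new HashMap<>();
        private final List<Consumer<Object>> eventSubscribers = new ArrayList<>();

        /** The handler turns a command of the given type into the event it causes. */
        <C> void registerCommandHandler(Class<C> type, Function<C, Object> handler) {
            commandHandlers.put(type, command -> handler.apply(type.cast(command)));
        }

        void subscribe(Consumer<Object> subscriber) {
            eventSubscribers.add(subscriber);
        }

        /** Routes the command to its handler, then fans the resulting event out. */
        void send(Object command) {
            Function<Object, Object> handler = commandHandlers.get(command.getClass());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + command.getClass());
            }
            Object event = handler.apply(command);
            eventSubscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }
}
```

In terms of the list above, registerCommandHandler plays the role of tradeCreator announcing that it handles "create trade", and subscribe plays the role of tradeQueries listening for "trade created".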
Yes, the Axon server sits at the center of communication; all messages go through it.
Let's move on to coding.
To avoid cluttering the post with code, I will show only fragments here; a link to the complete example is at the end.
Let's start with the common module.
Its shared parts include the event (class CreatedTradeEvent). Note the name: it is essentially the name of the command that produced the event, but in the past tense. Past tense, because the command comes first and leads to the creation of the event.
Other shared structures are the classes describing a position (class Position), a trade (class Trade) and the side of a trade (enum Side), i.e. buy or sell.
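As a rough idea of what these shared structures might look like, here is a sketch using Java records. The field names tradeId, shortName, size and side come from the fragments in this post; the field types, the wrapper class CommonSketch and the toTrade() helper are assumptions made for illustration. The real classes use builders and may differ.

```java
final class CommonSketch {

    /** Side of a trade: buying or selling. */
    enum Side { BUY, SELL }

    /** A trade as it arrives over REST (field types are assumed). */
    record Trade(String tradeId, String shortName, long size, Side side) {}

    /** One row of the position table: instrument and current number of securities. */
    record Position(String shortName, long size) {}

    /** The event, named after the command that caused it, in the past tense. */
    record CreatedTradeEvent(String tradeId, String shortName, long size, Side side) {

        /** Hypothetical helper: rebuilds the trade that this event describes. */
        Trade toTrade() {
            return new Trade(tradeId, shortName, size, side);
        }
    }
}
```

Keeping these types in one module means both microservices compile against the same definitions instead of copying them.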
Let's move on to the tradeCreator module.
This module has a REST interface (class TradeController) for accepting trades.
A "create trade" command is built from the received trade and sent to the Axon server.
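@PostMapping("/trade")
public ResponseEntity<String> create(@RequestBody Trade trade) {
    // Map the incoming REST payload onto the command.
    var createTradeCommand = CreateTradeCommand.builder()
            .tradeId(trade.getTradeId())
            ...
            .build();
    // sendAndWait blocks for up to 3 seconds and returns the handler's result directly,
    // so there is nothing to unwrap with get().
    var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
    return ResponseEntity.ok(String.valueOf(result));
}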
The TradeAggregate class processes the command.
For Axon to find it, we mark it with the @Aggregate annotation.
The command handler looks like this (abridged):
@CommandHandler
public TradeAggregate(CreateTradeCommand command) {
    log.info("command: {}", command);
    var event = CreatedTradeEvent.builder()
            .tradeId(command.tradeId())
            ....
            .build();
    AggregateLifecycle.apply(event);
}
An event is built from the command and sent to the server.
The command itself is defined in the CreateTradeCommand class.
Now let's look at the last module, tradeQueries.
The queries are described in the queries package.
This module also has a REST interface (class TradeController).
For example, let's look at how the "show all trades" request is handled.
@GetMapping("/trade/all")
public List<Trade> findAllTrades() {
    return queryGateway.query(new FindAllTradesQuery(),
            ResponseTypes.multipleInstancesOf(Trade.class)).join();
}
A query is created and sent to the server.
The TradesEventHandler class handles the queries.
For example, the "show position" query is handled by a method annotated with @QueryHandler:
@QueryHandler
public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)
This method is responsible for fetching the data from the in-memory storage.
The question is how the data in this storage gets updated.
First of all, the storage is just a set of ConcurrentHashMaps, each tailored to a specific query.
They are updated by the following method:
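@EventHandler
public void on(CreatedTradeEvent event) {
    log.info("event:{}", event);
    var trade = Trade.builder()
            ...
            .build();
    trades.put(event.tradeId(), trade);
    // Sign the size first: merge() stores the value as-is when the key is absent,
    // so the first SELL for an instrument must already be negative.
    var delta = event.side() == Side.BUY ? event.size() : -event.size();
    position.merge(event.shortName(), delta,
            (oldValue, value) -> oldValue + value);
}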
It receives the "trade created" event and updates the maps.
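For readers who want to try the idea without Axon, here is a self-contained sketch of such a store with one update method and the three queries from the task. ProjectionSketch and its method names are hypothetical, the field types are assumed, and the update takes a Trade directly instead of an event to keep the example short.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ProjectionSketch {

    enum Side { BUY, SELL }

    record Trade(String tradeId, String shortName, long size, Side side) {}

    record Position(String shortName, long size) {}

    private final Map<String, Trade> trades = new ConcurrentHashMap<>();
    private final Map<String, Long> position = new ConcurrentHashMap<>();

    /** Update path: called once for every created trade. */
    void on(Trade trade) {
        trades.put(trade.tradeId(), trade);
        // BUY increases the position, SELL decreases it.
        long delta = trade.side() == Side.BUY ? trade.size() : -trade.size();
        position.merge(trade.shortName(), delta, Long::sum);
    }

    /** Query: "show all trades". */
    List<Trade> findAllTrades() {
        return List.copyOf(trades.values());
    }

    /** Query: "show position" as instrument / number-of-securities rows. */
    List<Position> findCurrentPosition() {
        return position.entrySet().stream()
                .map(entry -> new Position(entry.getKey(), entry.getValue()))
                .toList();
    }

    /** Query: "show position for an instrument"; unknown instruments count as 0. */
    long findPositionFor(String shortName) {
        return position.getOrDefault(shortName, 0L);
    }
}
```

The sketch signs the size before calling merge because merge stores the given value unchanged when the key is absent, so a first SELL would otherwise be recorded as a positive position.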
These are the main points of developing the microservices.
What about Axon's drawbacks?
First, the infrastructure becomes more complex: a new point of failure appears, the Axon server, since all communication goes through it.
Second, a drawback typical of such distributed systems shows up very clearly: temporary data inconsistency. In our case, an unacceptably long time may pass between receiving a new trade and updating the data used by the queries.
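The inconsistency window is easy to reproduce in a toy model. In the sketch below, LagSketch, TradeCreated and the method names are all hypothetical; a plain queue stands in for events that have been accepted but not yet delivered to the read side.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

final class LagSketch {

    /** Hypothetical event carrying an already signed size (negative for a sale). */
    record TradeCreated(String shortName, long signedSize) {}

    private final Queue<TradeCreated> inFlight = new ArrayDeque<>();
    private final Map<String, Long> position = new ConcurrentHashMap<>();

    /** Write side: the trade is accepted, but the read model has not seen it yet. */
    void accept(TradeCreated event) {
        inFlight.add(event);
    }

    /** Read side catching up some time later. */
    void deliverPending() {
        TradeCreated event;
        while ((event = inFlight.poll()) != null) {
            position.merge(event.shortName(), event.signedSize(), Long::sum);
        }
    }

    /** Query against the read model; may be stale until deliverPending() has run. */
    long positionFor(String shortName) {
        return position.getOrDefault(shortName, 0L);
    }
}
```

Between accept and deliverPending a query still returns the old position. In the real system the length of that window depends on the network, the Axon server and the load on the tradeQueries microservice.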
What was left out?
Nothing has been said here about Event Sourcing and CQRS, what they are and what they are for.
Without an explanation of these concepts, some points may be unclear.
Some code fragments may also need clarification.
We covered this in an open webinar.
Complete example.