In this article, we will go through the not yet considered libraries for working with distributed transactions, queues and databases, which can be found in our repository on GitHub (the source is here ), and Nuget packages are here .
ViennaNET.Sagas
When a project transitions to DDD and a microservice architecture, then when the business logic is spread across different services, a problem arises associated with the need to implement the mechanism of distributed transactions, because many scenarios often affect several domains at once. You can learn more about such mechanisms, for example, in the book "Microservices Patterns" by Chris Richardson .
In our projects, we have implemented a simple but useful mechanism: a saga, or rather a saga based on orchestration. Its essence is as follows: there is a certain business scenario in which it is necessary to sequentially perform operations in different services, while, in case of any problems at any step, it is necessary to call the rollback procedure of all previous steps, where it is provided. Thus, at the end of the saga, regardless of success, we get consistent data across all domains.
Our implementation is still basic and not tied to the use of any methods of interaction with other services. It is not difficult to use it: it is enough to inherit from the base abstract class SagaBase <T>, where T is your context class, in which you can store the initial data required for the saga to work, as well as some intermediate results. The context instance will be forwarded to all steps at runtime. The saga itself is a stateless class, so the instance can be placed in the DI as a Singleton to get the required dependencies.
Example declaration:
public class ExampleSaga : SagaBase<ExampleContext>
{
public ExampleSaga()
{
Step("Step 1")
.WithAction(c => ...)
.WithCompensation(c => ...);
AsyncStep("Step 2")
.WithAction(async c => ...);
}
}
Call example:
var saga = new ExampleSaga();
var context = new ExampleContext();
await saga.Execute(context);
Full examples of different implementations can be found here and in the assembly with tests .
ViennaNET.Orm. *
A set of libraries for working with various databases through Nhibernate. We use the DB-First approach with the use of Liquibase, so there is only functionality for working with data in the finished database.
ViennaNET.Orm.Seedwork ViennaNET.Orm
- main assemblies containing base interfaces and their implementations, respectively. Let's dwell on their content in more detail.
The interface
IEntityFactoryService
and its implementation EntityFactoryService
are the main starting point for working with the database, since the Unit of Work, repositories for working with specific entities, as well as executors of commands and direct SQL queries are created here. Sometimes it is convenient to restrict the capabilities of a class for working with a database, for example, to enable read-only data. For such cases, it IEntityFactoryService
has an ancestor - an interface IEntityRepositoryFactory
in which only the method for creating repositories is declared.
For direct access to the database, the provider mechanism is used. For each used in our database teams have their own implementation:
ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql
.
At the same time, several providers can be registered in one application at the same time, which allows, for example, within the framework of one service, without any costs for updating the infrastructure, a step-by-step migration from one DBMS to another. The mechanism for selecting the required connection and, therefore, the provider for a specific entity class (for which the mapping to the database tables is written) is implemented through registering the entity in the BoundedContext class (contains a method for registering domain entities) or its successor ApplicationContext (contains methods for registering application entities , direct requests and commands), where the connection identifier from the configuration is taken as an argument:
"db": [
{
"nick": "mssql_connection",
"dbServerType": "MSSQL",
"ConnectionString": "...",
"useCallContext": true
},
{
"nick": "oracle_connection",
"dbServerType": "Oracle",
"ConnectionString": "..."
}
],
Example ApplicationContext:
internal sealed class DbContext : ApplicationContext
{
public DbContext()
{
AddEntity<SomeEntity>("mssql_connection");
AddEntity<MigratedSomeEntity>("oracle_connection");
AddEntity<AnotherEntity>("oracle_connection");
}
}
If no connection identifier is specified, the connection named "default" will be used.
Direct mapping of entities to database tables is implemented using standard NHibernate tools. You can use the description both through xml files and through classes. For convenient writing stub repositories in Unit tests, there is a library
ViennaNET.TestUtils.Orm
.
Full examples of using ViennaNET.Orm. * Can be found here .
ViennaNET.Messaging. *
A set of libraries for working with queues.
To work with queues, the same approach was chosen as with various DBMS, namely, the maximum possible unified approach in terms of working with the library, regardless of the queue manager used. The library
ViennaNET.Messaging
is just responsible for this unification, and ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue ViennaNET.Messaging.KafkaQueue
contains the adapter implementations for IBM MQ, RabbitMQ, and Kafka, respectively.
There are two processes in working with queues: receiving a message and sending.
Consider getting. There are 2 options here: for constant listening and for receiving a single message. To constantly listen to the queue, you first need to describe the processor class inherited from
IMessageProcessor
, which will be responsible for processing the incoming message. Further, it must be "tied" to a specific queue, this is done by registering IQueueReactorFactory
with the queue identifier from the configuration:
"messaging": {
"ApplicationName": "MyApplication"
},
"rabbitmq": {
"queues": [
{
"id": "myQueue",
"queuename": "lalala",
...
}
]
},
An example of starting listening:
_queueReactorFactory.Register<MyMessageProcessor>("myQueue");
var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue");
queueReactor.StartProcessing();
Then, when the service starts and the method is called to start listening, all messages from the specified queue will go to the corresponding processor.
To receive a single message in the factory interface,
IMessagingComponentFactory
there is a method CreateMessageReceiver
that will create a recipient waiting for a message from the specified queue:
using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue"))
{
var message = receiver.Receive();
}
To send a message, you must use the same
IMessagingComponentFactory
and create a message sender:
using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue"))
{
sender.SendMessage(new MyMessage { Value = ...});
}
There are three ready-made options for serializing and deserializing a message: just text, XML and JSON, but if necessary, you can safely make your own implementations of the interfaces
IMessageSerializer IMessageDeserializer
.
We tried to preserve the unique capabilities of each queue manager, for example,
ViennaNET.Messaging.MQSeriesQueue
it allows sending not only text messages, but also byte messages, and ViennaNET.Messaging.RabbitMQQueue
supports routing and queuing on the fly. Our adapter wrapper for RabbitMQ also implements some semblance of RPC: we send a message and wait for a response from a special temporary queue that is created for only one response message.
Here is an example of using queues with basic connection nuances .
ViennaNET.CallContext
We use queues not only for integration between different systems, but also for communication between microservices of one application, for example, within the framework of a saga. This led to the need to transfer along with the message such auxiliary data as username, request ID for end-to-end logging, source ip-address and authorization data. To implement the forwarding of this data, a library was developed
ViennaNET.CallContext
that allows storing data from a request entering the service. In this case, how the request was made, through the queue or through Http, does not matter. Then, before sending an outbound request or message, the data is taken out of the context and placed in the headers. Thus, the next service receives auxiliary data and disposes of them in the same way.
Thank you for your attention, we are looking forward to your comments and pull requests!