How DDD helped us build new revisions in pizzerias

In pizzerias, it is important to build a system of accounting and inventory management. The system is needed in order not to lose products, not to carry out unnecessary write-offs and to correctly forecast purchases for the next month. An important role in accounting for revisions. They help you check food balances and check the actual quantity and what is in the system.







The audit at Dodo is not paper-based: the auditor has a tablet where the auditor notes all products and creates reports. But until 2020, the revision in pizzerias was carried out precisely on pieces of paper - simply because it was easier and easier that way. This, of course, led to inaccurate data, errors and losses - people make mistakes, pieces of paper are lost, and there are many more. We decided to fix this problem and improve the tablet way. The implementation decided to use DDD. How we did it, we will tell you further.



First, briefly about business processes in order to understand the context. Let's consider the scheme of the movement of products, and where in it the revisions, and then move on to the technical details, which will be many.



Scheme of the movement of products and why a revision is needed



There are more than 600 pizzerias in our network (and this number will continue to grow). Every day there is a movement of raw materials in each of them: from the preparation and sale of products, write-off of ingredients by expiration date, to the movement of raw materials to other pizzerias of the chain. The balance of the pizzeria constantly contains about 120 items necessary for the production of products, and in addition a lot of consumables, household materials and chemicals to keep the pizzeria clean. All this requires "accounting" in order to know which raw materials are in abundance and which are lacking. 



"Accounting" describes any movement of raw materials in pizzerias. Delivery is a plus on the balance sheet, and write-off is a minus. For example, when we order a pizza, the cashier accepts the order and sends it for processing. The dough is then rolled out and stuffed with ingredients such as cheese, tomato sauce and pepperoni. All these products go into production - are written off. Also, write-off can occur when the expiration date ends.



As a result of deliveries and write-offs, "warehouse balances" are formed. This is a report that reflects how much raw materials are on the balance sheet based on operations in the information system. All this is the "settlement balance". But there is an "actual value" - how much raw material is actually in stock now.



Revisions



To calculate the actual value, "revisions" are used (they are also called "inventories"). 



Audits help to accurately calculate the quantity of raw materials for purchases. Too many purchases will freeze working capital, and the risk of writing off excess products will increase, which also leads to losses. Not only the surplus of raw materials is dangerous, but also its shortage - this can lead to a halt in the production of some products, which will lead to a decrease in revenue. Audits help to see how much profit a business is losing due to recorded and unaccounted losses of raw materials, and to work to reduce costs.



Revisions share their data with due regard for further processing, for example, building reports.



Problems in the revision process, or How old revisions worked



Revisions are a laborious process. It takes a lot of time and consists of several stages: counting and fixing the remains of raw materials, summing up the results of raw materials by storage areas, entering the results into the Dodo IS information system.



Previously, audits were carried out with a pen and paper form, on which there was a list of raw materials. When manually summarizing, reconciling and transferring results to Dodo IS, there is a possibility of making a mistake. In a full audit, more than 100 items of raw materials are counted, and the calculation itself is often carried out in the late evening or early morning, from which concentration can suffer.



How to solve the problem



Our Game of Threads team is developing accounting in pizzerias. We decided to launch a project called an “auditor's tablet”, which will simplify the audit of pizzerias. We decided to do everything in our own information system Dodo IS, in which the main components for accounting are implemented, so we do not need integrations with third-party systems. In addition, all countries of our presence will be able to use the tool without resorting to additional integrations.



Even before starting work on the project, we in the team discussed the desire to apply DDD in practice. Fortunately, one of the projects has already successfully applied this approach, so we had an example that you can look at - this is the “ cash desk ” project.



In this article, I will talk about the tactical DDD patterns that we used in development: aggregates, commands, domain events, application service, and bounded contexts integration. We will not describe the strategic patterns and fundamentals of DDD, otherwise the article will be very long. We already talked about this in the article “ What can you learn about Domain Driven Design in 10 minutes? "



New version of revisions



Before starting the audit, you need to know what exactly to count. For this we need revision templates . They are configured by the "office manager" role. Revision template is an InventoryTemplate entity. It contains the following fields:



  • template identifier;

  • pizzeria ID;

  • template name;

  • revision category: monthly, weekly, daily;

  • units;

  • storage areas and raw materials in this storage area 



For this entity, a CRUD functionality has been implemented and we will not dwell on it in detail.



Once the auditor has a list of templates, he can start the audit . This usually happens when the pizzeria is closed. At this moment, there are no orders and the raw materials are not moving - you can reliably get data on balances.



Starting the audit, the auditor selects a zone, for example, a refrigerator, and goes to count the raw materials there. In the refrigerator he sees 5 packs of cheese, 10 kg each, enters 10 kg * 5 into the calculator, presses "Enter more". Then he notices 2 more packs on the top shelf, and clicks "Add". As a result, he has 2 measurements - 50 and 20 kg each.



Meteringwe call the entered quantity of raw materials by the inspector in a certain area, but not necessarily the total. The inspector can enter two measurements of one kilogram or just two kilograms in one measurement - any combination can be. The main thing is that the auditor himself should be clear.





Calculator interface.



So, step by step, the auditor considers all the raw materials in 1-2 hours, and then completes the audit.



The algorithm of actions is quite simple:



  • the auditor can start the audit;

  • the auditor can add measurements in the started revision;

  • the auditor can complete the audit.



From this algorithm, business requirements for the system are formed.



Implementation of the first version of the aggregate, commands and events of the domain



First, let's define the terms that are included in the DDD tactical template set. We will refer to them in this article.



Tactical DDD Templates



Aggregate is a cluster of entity and value objects. Objects in a cluster are a single entity in terms of data modification. Each aggregate has a root element through which entities and values ​​are accessed. Units should not be designed too large. They will consume a lot of memory, and the likelihood of a successful transaction will decrease.



Aggregate boundary is a set of objects that must be consistent within a single transaction: all invariants within this cluster must be observed.



Invariants are business rules that cannot be inconsistent.



CommandIs some kind of action on the unit. As a result of this action, the state of the aggregate can be changed, and one or more domain events can be generated.



A domain event is a notification of a change in the state of an aggregate, needed to maintain consistency. The aggregate ensures transactional consistency: all data must be changed here and now. The resulting consistency guarantees consistency in the end - the data will change, but not here and now, but after an indefinite period of time. This interval depends on many factors: the congestion of message queues, the readiness of external services to process these messages, the network.



Root elementIs an entity with a unique global identifier. Child elements can only have local identity within a whole aggregate. They can refer to each other and can only reference their root element.



Teams and events



Let's describe the business requirement as a team. Commands are just DTOs with descriptive fields.



The command "add measurement" has the following fields:



  • measurement value - the amount of raw materials in a certain unit of measurement, can be null if the measurement has been deleted;

  • version - the measurement can be edited, so a version is needed;

  • raw material identifier;

  • unit of measure: kg / g, l / ml, pieces;

  • storage area identifier.



Measurement adding command code
public sealed class AddMeasurementCommand
{
    // ctor

    public double? Value { get; }
    public int Version { get; }
    public UUId MaterialTypeId { get; }
    public UUId MeasurementId { get; }
    public UnitOfMeasure UnitOfMeasure { get; }
    public UUId InventoryZoneId { get; }
}




We also need an event that will result from the execution of these commands. We mark the event with an interface IPublicInventoryEvent- we will need it for integration with external consumers in the future.



In the event "measurement" the fields are the same as in the command "Add measurement", except that the event also stores the identifier of the unit on which it occurred and its version.



Event code "frozen"
public class MeasurementEvent : IPublicInventoryEvent
{
    public UUId MaterialTypeId { get; set; }
    public double? Value { get; set; }
	
    public UUId MeasurementId { get; set; }
    public int MeasurementVersion { get; set; }
    public UUId AggregateId { get; set; }
    public int Version { get; set; }
    public UnitOfMeasure UnitOfMeasure { get; set; }
    public UUId InventoryZoneId { get; set; }
}




When we have described commands and events, we can implement the aggregate Inventory.



Implementing the Inventory Aggregate





UML aggregate diagram Inventory.



The approach is this: the beginning of the revision initiates the creation of the aggregate Inventory, for this we use the factory method Createand start the revision with the command StartInventoryCommand.



Each command mutates the state of the aggregate and saves events in the list changes, which will be sent to the storage for recording. Also, based on these changes, events for the outside world will be generated.



When the aggregate has Inventorybeen created, we can restore it for each subsequent request to change its state.



  • Changes ( changes) are stored since the last time the unit was restored.

  • The state is restored by a method Restorethat plays all previous events, sorted by version, on the current instance of the aggregate Inventory.



This is the implementation of the idea Event Sourcingwithin the unit. We will Event Sourcingtalk about how to implement the idea within the framework of the repository a little later. There is a nice illustration from Vaughn Vernon's book: The





state of the unit is restored by applying events in the order they occur.



Then several measurements are made by the team AddMeasurementCommand. The audit ends with a command FinishInventoryCommand. The aggregate validates its state in mutating methods to comply with its invariants.



It is important to note that the unit is Inventoryfully versioned, as well as each measurement. With measurements it is more difficult - you have to resolve conflicts in the event handling method When(MeasurementEvent e). In the code, I will only show the processing of the command AddMeasurementCommand.



Aggregate Inventory Code
public sealed class Inventory : IEquatable<Inventory>
{
    private readonly List<IInventoryEvent> _changes = new List<IInventoryEvent>();

    private readonly List<InventoryMeasurement> _inventoryMeasurements = new List<InventoryMeasurement>();

    internal Inventory(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
        UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc)
	
        : this(id)
    {
        Version = version;
        UnitId = unitId;
        InventoryTemplateId = inventoryTemplateId;
        StartedBy = startedBy;
        State = state;
        StartedAtUtc = startedAtUtc;
        FinishedAtUtc = finishedAtUtc;
	
    }

    private Inventory(UUId id)
    {
        Id = id;
        Version = 0;
        State = InventoryState.Unknown;
    }
	
    public UUId Id { get; private set; }
    public int Version { get; private set; }
    public UUId UnitId { get; private set; }
    public UUId InventoryTemplateId { get; private set; }
    public UUId StartedBy { get; private set; }
    public InventoryState State { get; private set; }
    public DateTime StartedAtUtc { get; private set; }
    public DateTime? FinishedAtUtc { get; private set; }
    public ReadOnlyCollection<IInventoryEvent> Changes => _changes.AsReadOnly();
	
    public ReadOnlyCollection<InventoryMeasurement> Measurements => _inventoryMeasurements.AsReadOnly();

    public static Inventory Restore(UUId inventoryId, IInventoryEvent[] events)
    {
        var inventory = new Inventory(inventoryId);
        inventory.ReplayEvents(events);
        return inventory;
    }

    public static Inventory Restore(UUId id, int version, UUId unitId, UUId inventoryTemplateId,
        UUId startedBy, InventoryState state, DateTime startedAtUtc, DateTime? finishedAtUtc,
        InventoryMeasurement[] measurements)
    {
        var inventory = new Inventory(id, version, unitId, inventoryTemplateId,
            startedBy, state, startedAtUtc, finishedAtUtc);

        inventory._inventoryMeasurements.AddRange(measurements);

        return inventory;
    }

    public static Inventory Create(UUId inventoryId)
    {
        if (inventoryId == null)
        {
            throw new ArgumentNullException(nameof(inventoryId));
        }

        return new Inventory(inventoryId);
    }

    public void ReplayEvents(params IInventoryEvent[] events)
    {
        if (events == null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        foreach (var @event in events.OrderBy(e => e.Version))
        {
            Mutate(@event);
        }
    }

    public void AddMeasurement(AddMeasurementCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException(nameof(command));
        }

        Apply(new MeasurementEvent
        {
            AggregateId = Id,
            Version = Version + 1,
            UnitId = UnitId,
            Value = command.Value,
            MeasurementVersion = command.Version,
            MaterialTypeId = command.MaterialTypeId,
            MeasurementId = command.MeasurementId,
            UnitOfMeasure = command.UnitOfMeasure,
            InventoryZoneId = command.InventoryZoneId
        });
    }

    private void Apply(IInventoryEvent @event)
    {
        Mutate(@event);
        _changes.Add(@event);
    }

    private void Mutate(IInventoryEvent @event)
    {
        When((dynamic) @event);
        Version = @event.Version;
    }

    private void When(MeasurementEvent e)
    {
        var existMeasurement = _inventoryMeasurements.SingleOrDefault(x => x.MeasurementId == e.MeasurementId);
        if (existMeasurement is null)
    {
        _inventoryMeasurements.Add(new InventoryMeasurement
        {
            Value = e.Value,
            MeasurementId = e.MeasurementId,
            MeasurementVersion = e.MeasurementVersion,
            PreviousValue = e.PreviousValue,
            MaterialTypeId = e.MaterialTypeId,
            UserId = e.By,
            UnitOfMeasure = e.UnitOfMeasure,
            InventoryZoneId = e.InventoryZoneId
        });
    }
    else
    {
        if (!existMeasurement.Value.HasValue)
        {
            throw new InventoryInvalidStateException("Change removed measurement");
        }

        if (existMeasurement.MeasurementVersion == e.MeasurementVersion - 1)
        {
            existMeasurement.Value = e.Value;
            existMeasurement.MeasurementVersion = e.MeasurementVersion;
            existMeasurement.UnitOfMeasure = e.UnitOfMeasure;
            existMeasurement.InventoryZoneId = e.InventoryZoneId;
        }
        else if (existMeasurement.MeasurementVersion < e.MeasurementVersion)
        {
            throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
        }
        else if (existMeasurement.MeasurementVersion == e.MeasurementVersion &&
            existMeasurement.Value != e.Value)
        {
            throw new MeasurementConcurrencyException(Id, e.MeasurementId, e.Value);
        }
        else
        {
            throw new NotChangeException();
        }
    }
}

// Equals
// GetHashCode
}




When the “Measured” event occurs, the presence of an existing measurement with this identifier is checked. If this is not the case, a new measurement is added.



If so, additional checks are needed:



  • you cannot edit a remote measurement;

  • the incoming version must be larger than the previous one.



If the conditions are met, we can set a new value and a new version for the existing measurement. If the version is smaller, then this is a conflict. For this, we throw an exception MeasurementConcurrencyException. If the version matches and the values ​​differ, then this is also a conflict situation. Well, if both the version and the value match, then no changes have occurred. Such situations usually do not arise.



The "measurement" entity contains exactly the same fields as the "Add measurement" command.



Entity code "froze"
public class InventoryMeasurement
{
    public UUId MeasurementId { get; set; }
    public UUId MaterialTypeId { get; set; }
    public UUId UserId { get; set; }
    public double? Value { get; set; }

    public int MeasurementVersion { get; set; }

    public UnitOfMeasure UnitOfMeasure { get; set; }

    public UUId InventoryZoneId { get; set; }
}




The use of public aggregate methods is well demonstrated by Unit tests.



Unit test code "adding a measurement after the start of the revision"
[Fact]
public void WhenAddMeasurementAfterStartInventory_ThenInventoryHaveOneMeasurement()
{
    var inventoryId = UUId.NewUUId();
    var inventory = Domain.Inventories.Entities.Inventory.Create(inventoryId);
    var unitId = UUId.NewUUId();
    inventory.StartInventory(Create.StartInventoryCommand()
        .WithUnitId(unitId)
        .Please());

    var materialTypeId = UUId.NewUUId();
    var measurementId = UUId.NewUUId();
    var measurementVersion = 1;
    var value = 500;
    var cmd = Create.AddMeasurementCommand()
        .WithMaterialTypeId(materialTypeId)
        .WithMeasurement(measurementId, measurementVersion)
        .WithValue(value)
        .Please();
    inventory.AddMeasurement(cmd);

    inventory.Measurements.Should().BeEquivalentTo(new InventoryMeasurement
    {
        MaterialTypeId = materialTypeId,
        MeasurementId = measurementId,
        MeasurementVersion = measurementVersion,
        Value = value,
        UnitOfMeasure = UnitOfMeasure.Quantity
    });
}




Putting it all together: commands, events, Inventory aggregate





Inventory aggregate life cycle when running Finish Inventory.



The diagram shows the process of command processing FinishInventoryCommand. Before processing, it is necessary to restore the state of the unit Inventoryat the time of the command execution. To do this, we load all the events that have been performed on this unit into memory and play them (p. 1). 



At the time of the completion of the revision, we already have the following events - the beginning of the revision and the addition of three measurements. These events appeared as a result of command processing StartInventoryCommandand AddMeasurementCommand, accordingly. In the database, each row in the table contains the revision ID, version, and body of the event itself.



At this stage, we execute the commandFinishInventoryCommand(p. 2). This command will first check the validity of the current state of the unit - that the revision is in a state InProgress, and then will generate a new state change by adding an event FinishInventoryEventto the list changes(item 3).



When the command completes, all changes will be saved to the database. As a result, a new line with the event FinishInventoryEventand the latest version of the unit will appear in the database (p. 4).



Type Inventory(revision) - aggregate and root element in relation to their nested entities. Thus, the type Inventorydefines the boundaries of the unit. The aggregate boundaries include a list of entities of type Measurement(measurement), and a list of all events performed on the aggregate ( changes).



Implementation of the entire feature



By features, we mean the implementation of a specific business requirement. In our example, we will consider the Add Measurement feature. To implement the feature, we need to understand the concept of "application service" ( ApplicationService).



An application service is a direct client of the domain model. Application Services guarantee transactions when using the ACID database, ensuring that state transitions are preserved atomic. In addition, application services also address security concerns.



We already have a unitInventory... To implement the entire feature, we will use the application service entirely. In it, you need to check the presence of all connected entities, as well as the access rights of the user. Only after all the conditions are met, it is possible to save the current state of the unit and send events to the outside world. To implement an application service, we use MediatR.



Feature code "adding measurement"
public class AddMeasurementChangeHandler 
    : IRequestHandler<AddMeasurementChangeRequest, AddMeasurementChangeResponse>
{
    // dependencies
    // ctor

    public async Task<AddMeasurementChangeResponse> Handle(
        AddMeasurementChangeRequest request,
        CancellationToken ct)
    {
        var inventory =
            await _inventoryRepository.GetAsync(request.AddMeasurementChange.InventoryId, ct);
        if (inventory == null)
        {
            throw new NotFoundException($"Inventory {request.AddMeasurementChange.InventoryId} is not found");
        }

        var user = await _usersRepository.GetAsync(request.UserId, ct);
        if (user == null)
        {
            throw new SecurityException();
        }

        var hasPermissions =
        await _authPermissionService.HasPermissionsAsync(request.CountryId, request.Token, inventory.UnitId, ct);
        if (!hasPermissions)
        {
            throw new SecurityException();
        }

        var unit = await _unitRepository.GetAsync(inventory.UnitId, ct);
        if (unit == null)
        {
            throw new InvalidRequestDataException($"Unit {inventory.UnitId} is not found");
        }

        var unitOfMeasure =

Enum.Parse<UnitOfMeasure>(request.AddMeasurementChange.MaterialTypeUnitOfMeasure);


        var addMeasurementCommand = new AddMeasurementCommand(	
            request.AddMeasurementChange.Value,
            request.AddMeasurementChange.Version,
            request.AddMeasurementChange.MaterialTypeId,
            request.AddMeasurementChange.Id,
            unitOfMeasure,
            request.AddMeasurementChange.InventoryZoneId);

        inventory.AddMeasurement(addMeasurementCommand);

        await HandleAsync(inventory, ct);

        return new AddMeasurementChangeResponse(request.AddMeasurementChange.Id, user.Id, user.GetName());
    }

    private async Task HandleAsync(Domain.Inventories.Entities.Inventory inventory, CancellationToken ct)
    {
            await _inventoryRepository.AppendEventsAsync(inventory.Changes, ct);
 
            try
            {
                await _localQueueDataService.Publish(inventory.Changes, ct);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "error occured while handling action");
            }
    }
}




Event sourcing



During implementation, we decided to choose the ES approach for several reasons:



  • Dodo has examples of successful use of this approach.

  • ES makes it easier to understand the problem during an incident - all user actions are stored.

  • If you take the traditional approach, you won't be able to move to ES.



The idea of ​​implementation is quite simple - we add all new events that appeared as a result of commands to the database. To restore the aggregate, we get all the events and play them on the instance. In order not to get a large batch of events every time, we remove the states every N events and play the rest of this snapshot.



Inventory Aggregate Store ID
internal sealed class InventoryRepository : IInventoryRepository
{
    // dependencies
    // ctor

    static InventoryRepository()
    {
        EventTypes = typeof(IEvent)
            .Assembly.GetTypes().Where(x => typeof(IEvent).IsAssignableFrom(x))
            .ToDictionary(t => t.FullName, x => x);
    }

    public async Task AppendAsync(IReadOnlyCollection<IEvent> events, CancellationToken ct)
    {
        using (var session = await _dbSessionFactory.OpenAsync())
        {
            if (events.Count == 0) return;

            try
            {
                foreach (var @event in events)
                {
                    await session.ExecuteAsync(Sql.AppendEvent,
                        new
                        {
                            @event.AggregateId,
                            @event.Version,
                            @event.UnitId,
                            Type = @event.GetType().FullName,
                            Data = JsonConvert.SerializeObject(@event),
                            CreatedDateTimeUtc = DateTime.UtcNow
                        }, cancellationToken: ct);
                }
            }
            catch (MySqlException e)
                when (e.Number == (int) MySqlErrorCode.DuplicateKeyEntry)
            {
                throw new OptimisticConcurrencyException(events.First().AggregateId, "");
            }
        }
    }

    public async Task<Domain.Models.Inventory> GetInventoryAsync(
        UUId inventoryId,
        CancellationToken ct)
    {
        var events = await GetEventsAsync(inventoryId, 0, ct);

        if (events.Any()) return Domain.Models.Inventory.Restore(inventoryId, events);

        return null;
    }
    
    private async Task<IEvent[]> GetEventsAsync(
        UUId id,
        int snapshotVersion,
        CancellationToken ct)
    {
        using (var session = await _dbSessionFactory.OpenAsync())
    {
            var snapshot = await GetInventorySnapshotAsync(session, inventoryId, ct);
            var version = snapshot?.Version ?? 0;
        
            var events = await GetEventsAsync(session, inventoryId, version, ct);
            if (snapshot != null)
            {
                snapshot.ReplayEvents(events);
                return snapshot;
            }

            if (events.Any())
            {
                return Domain.Inventories.Entities.Inventory.Restore(inventoryId, events);
            }

            return null;
        }
    }

    private async Task<Inventory> GetInventorySnapshotAsync(
        IDbSession session,
        UUId id,
        CancellationToken ct)
    {
        var record =
            await session.QueryFirstOrDefaultAsync<InventoryRecord>(Sql.GetSnapshot, new {AggregateId = id},
                cancellationToken: ct);
        return record == null ? null : Map(record);
    }

    private async Task<IInventoryEvent[]> GetEventsAsync(
        IDbSession session,
        UUId id,
        int snapshotVersion,
        CancellationToken ct)
    {
        var rows = await session.QueryAsync<EventRecord>(Sql.GetEvents,
            new
            {
                AggregateId = id,
                Version = snapshotVersion
            }, cancellationToken: ct);
        return rows.Select(Map).ToArray();
    }

    private static IEvent Map(EventRecord e)
    {
        var type = EventTypes[e.Type];
        return (IEvent) JsonConvert.DeserializeObject(e.Data, type);
    }
}

internal class EventRecord
{
    public string Type { get; set; }
    public string Data { get; set; }
}




After several months of operation, we realized that we do not have a big need to store all user actions on the unit instance. The business does not use this information in any way. That being said, there is an overhead in maintaining this approach. Having evaluated all the pros and cons, we plan to move away from ES to the traditional approach - to replace the sign Eventswith Inventoriesand Measurements.



Integration with external bounded contexts



This is how the bounded context interacts Inventorywith the outside world.





Interaction of the revision context with other contexts. The diagram shows contexts, services and their belonging to each other.



In the case of Auth, Inventoryand Datacatalog, there is one bounded context for each service. The monolith performs several functions, but now we are only interested in the accounting functionality in pizzerias. In addition to revisions, accounting also includes the movement of raw materials in pizzerias: receipts, transfers, write-offs.



HTTP



The service Inventoryinteracts with Authover HTTP. First of all, the user is faced with Auth, which prompts the user to choose one of the roles available to him.



  • The system has a role "auditor", which the user chooses during the audit.

  • .

  • .



At the last stage, the user has a token from Auth. The revision service must validate this token, so it asks Authfor validation. Authwill check whether the token's lifetime has expired, whether it belongs to the owner, or whether it has the necessary access rights. If all is well, then it Inventorysaves the stamps in the cookies - user ID, login, pizzeria ID and sets the cookie lifetime.



Note . AuthWe described in more detail how the service works in the article " Subtleties of authorization: an overview of the OAuth 2.0 technology ".



It Inventoryinteracts with other services through message queues. The company uses RabbitMQ as a message broker, as well as the binding above it - MassTransit.



RMQ: Consuming Events



Directory service - Datacatalog- will provide Inventoryall the necessary entities: raw materials for accounting, countries, divisions and pizzerias.



Without going into details of the infrastructure, I will describe the basic idea of ​​consuming events. On the side of the directory service, everything is already ready for publishing events, let's look at the example of the raw material entity.



Datacatalog Event Contract Code
namespace Dodo.DataCatalog.Contracts.Products.v1
{
    public class MaterialType
    {
        public UUId Id { get; set; }
        public int Version { get; set; }
        public int CountryId { get; set; }
        public UUId DepartmentId { get; set; }

        public string Name { get; set; }
        public MaterialCategory Category { get; set; }
        public UnitOfMeasure BasicUnitOfMeasure { get; set; }
        public bool IsRemoved { get; set; }
    }

    public enum UnitOfMeasure
    {
        Quantity = 1,
        Gram = 5,
        Milliliter = 7,
        Meter = 8,
    }

    public enum MaterialCategory
    {
        Ingredient = 1,
        SemiFinishedProduct = 2,
        FinishedProduct = 3,
        Inventory = 4,
        Packaging = 5,
        Consumables = 6
    }
}




This post is published in exchange. Each service can create its own bundle exchange-queueto consume events.





Scheme for publishing an event and its consumption through the RMQ primitives.



Ultimately, there is a queue for each entity that the service can subscribe to. All that remains is to save the new version to the database.



Event consumer code from Datacatalog
public class MaterialTypeConsumer : IConsumer<Dodo.DataCatalog.Contracts.Products.v1.MaterialType>
{
    private readonly IMaterialTypeRepository _materialTypeRepository;

    public MaterialTypeConsumer(IMaterialTypeRepository materialTypeRepository)
    {
         _materialTypeRepository = materialTypeRepository;
    }
 
    public async Task Consume(ConsumeContext<Dodo.DataCatalog.Contracts.Products.v1.MaterialType> context)
    {
        var materialType = new AddMaterialType(context.Message.Id,
            context.Message.Name,
            (int)context.Message.Category,
            (int)context.Message.BasicUnitOfMeasure,
            context.Message.CountryId,
            context.Message.DepartmentId,
            context.Message.IsRemoved,
            context.Message.Version);
    
        await _materialTypeRepository.SaveAsync(materialType, context.CancellationToken);
    }
}




RMQ: Publishing Events



The accounting part of the monolith consumes data Inventoryto support the rest of the functionality that requires revision data. All events that we want to notify other services about, we marked with the interface IPublicInventoryEvent. When an event of this kind occurs, we extract them from the changelog ( changes) and send them to the dispatch queue. For this, two tables are used publicqueueand publicqueue_archive.



To guarantee the delivery of messages, we use a pattern that we usually call "local queue", meaning Transactional outbox pattern. Saving the state of the aggregate Inventoryand sending events to the local queue occurs in one transaction. As soon as the transaction is committed, we immediately try to send messages to the broker.



If the message was sent, then it is removed from the queue publicqueue. If not, an attempt will be made to send the message later. Subscribers to the monolith and data pipelines then consume messages. The table publicqueue_archivestores data forever for convenient re-dispatch of events if it is required at some point.



Code for publishing events to the message broker
internal sealed class BusDataService : IBusDataService
{
    private readonly IPublisherControl _publisherControl;
    private readonly IPublicQueueRepository _repository;
    private readonly EventMapper _eventMapper;

    public BusDataService(
        IPublicQueueRepository repository,
        IPublisherControl publisherControl,
        EventMapper eventMapper)
    {
        _repository = repository;
        _publisherControl = publisherControl;
        _eventMapper = eventMapper;
    }

    public async Task ConsumePublicQueueAsync(int batchEventSize, CancellationToken cancellationToken)
    {
        var events = await _repository.GetAsync(batchEventSize, cancellationToken);
        await Publish(events, cancellationToken);
    }

    public async Task Publish(IEnumerable<IPublicInventoryEvent> events, CancellationToken ct)
    {
        foreach (var @event in events)
        {
            var publicQueueEvent = _eventMapper.Map((dynamic) @event);
            await _publisherControl.Publish(publicQueueEvent, ct);
            await _repository.DeleteAsync(@event, ct);
       }
    }
}




We send events to the monolith for reports. Loss and surplus report allows you to compare any two revisions with each other. In addition, there is an important report "warehouse balances", which was already mentioned earlier. 



Why send events to the data pipeline? All the same - for reports, but only on new rails. Previously, all reports lived in a monolith, but now they are taken out. This shares two responsibilities - storage and processing of production and analytical data: OLTP and OLAP. This is important both in terms of infrastructure and development.



Conclusion



By following the principles and practices of Domain-Driven Design, we have managed to build a reliable and flexible system that meets the business needs of users. We got not only a decent product, but also good code that is easy to modify. We hope that in your projects there will be a place for using Domain-Driven Design.



You can find more information about DDD on our DDDevotion community and on the DDDevotion Youtube channel . You can discuss the article in Telegram at Dodo Engineering chat .



All Articles