The Three Musketeers - Event Sourcing, Event Storming and the Event Store - Enter the Battle: Part 1 - Trying Out the Event Store Database





Hello, Habr! I decided to step away from Scala, Idris and other FP for a while and talk a little about Event Store, a database in which events are saved to event streams. As in the good old book, there are in fact four Musketeers, and the fourth is DDD. First, I use Event Storming to identify the commands, the events and the entities associated with them. Then, based on those events, I will save the state of an object and restore it. In this article I will build an ordinary TodoList. For details, read on below the cut.





Links



Sources

Docker image

Event Store

Event Sourcing

Event Storming



Event Store is a database designed to store events. It can also create subscriptions to events so that they can be processed in some way. There are also projections, which likewise react to events and accumulate data based on them. For example, on each TodoCreated event a projection can increment a Count counter. For now, in this part, I will use Event Store as both the read and the write database. In the following articles I will create a separate database for reading, populated from the events stored in the write database, Event Store. There will also be an example of "time travel": rolling the system back to a state it had in the past.
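To make the counter idea concrete, here is a minimal sketch of a projection as a plain in-memory fold over events. It is not the Event Store projection API (the server's built-in projections are written in JavaScript); the types `ISketchEvent`, `TodoCreatedSketch`, `TodoCompletedSketch` and `TodoCountProjection` are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified event types used only in this sketch.
public interface ISketchEvent { }
public sealed class TodoCreatedSketch : ISketchEvent { public string Name { get; set; } }
public sealed class TodoCompletedSketch : ISketchEvent { }

// A projection sees events one at a time and accumulates some data from them.
public sealed class TodoCountProjection
{
    public int Count { get; private set; }

    public void Apply(ISketchEvent e)
    {
        // Only TodoCreated events change the counter; all other events are ignored.
        if (e is TodoCreatedSketch)
            Count++;
    }

    // Builds the projection from scratch by replaying a sequence of events.
    public static TodoCountProjection Replay(IEnumerable<ISketchEvent> events)
    {
        var projection = new TodoCountProjection();
        foreach (var e in events)
            projection.Apply(e);
        return projection;
    }
}
```

Calling `TodoCountProjection.Replay(events)` over any event sequence yields the number of created Todos, and the same `Apply` method could equally be fed from a live subscription.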

Now let's start Event Storming. Usually, all stakeholders and domain experts are gathered for it, and they describe which events occur in the subject area that the software will model. For example, for factory software: ProductManufactured. For a game: DamageTaken. For financial software: MoneyCreditedToAccount, and so on. Since our subject area, a TodoList, is about as simple as it gets, we will have few events. So, let's write the events of our subject area (domain) on the board.







Now let's add the commands that trigger these events.







Next, let's group these events and commands around the entity whose state changes they are associated with.







My commands will simply turn into service method names. Let's get down to implementation.
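As a rough summary of the board, the mapping from commands to the events they trigger can be written down as a small lookup table. This is only a sketch: the command names are the service method names used later in the article (Create, Complete, Remove), the event names are the event classes defined below, and the `TodoBoard` class itself is a hypothetical helper that the rest of the code does not use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: a textual summary of the Event Storming board.
public static class TodoBoard
{
    // Command (service method name) -> event that the command triggers.
    public static readonly IReadOnlyDictionary<string, string> CommandToEvent =
        new Dictionary<string, string>
        {
            ["Create"] = "TodoCreated",
            ["Complete"] = "TodoCompleted",
            ["Remove"] = "TodoRemoved"
        };

    // All three commands and events are grouped around a single entity.
    public const string Entity = "Todo";
}
```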



First, let's describe the Events in code.



    public interface IDomainEvent
    {
        // Unique id of the event in the Event Store
        Guid EventId { get; }
        // Sequence number of the event within its Event Store stream
        long EventNumber { get; set; }
    }

    public sealed class TodoCreated : IDomainEvent
    {
        // Id of the Todo
        public Guid Id { get; set; }
        // Name of the Todo
        public string Name { get; set; }
        public Guid EventId => Id;
        public long EventNumber { get; set; }
    }

    public sealed class TodoRemoved : IDomainEvent
    {
        public Guid EventId { get; set; }
        public long EventNumber { get; set; }
    }

    public sealed class TodoCompleted: IDomainEvent
    {
        public Guid EventId { get; set; }
        public long EventNumber { get; set; }
    }


Now the core of our domain, the entity:



    public sealed class Todo : IEntity<TodoId>
    {
        private readonly List<IDomainEvent> _events;

        public static Todo CreateFrom(string name)
        {
            var id = Guid.NewGuid();
            var e = new List<IDomainEvent>(){new TodoCreated()
                {
                    Id = id,
                    Name = name
                }};
            return new Todo(new TodoId(id), e, name, false);
        }

        public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
        {
            var id = Guid.Empty;
            var name = String.Empty;
            var completed = false;
            var ordered = events.OrderBy(e => e.EventNumber).ToList();
            if (ordered.Count == 0)
                return null;
            foreach (var @event in ordered)
            {
                switch (@event)
                {
                    case TodoRemoved _:
                        return null;
                    case TodoCreated created:
                        name = created.Name;
                        id = created.Id;
                        break;
                    case TodoCompleted _:
                        completed = true;
                        break;
                    default: break;
                }
            }
            if (id == default)
                return null;
            return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
        }

        private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
        {
            Id = id;
            _events = events;
            Name = name;
            IsCompleted = isCompleted;
            Validate();
        }

        public TodoId Id { get; }
        public IReadOnlyList<IDomainEvent> Events => _events;
        public string Name { get; }
        public bool IsCompleted { get; private set; }

        public void Complete()
        {
            if (!IsCompleted)
            {
                IsCompleted = true;
                _events.Add(new TodoCompleted()
                {
                    EventId = Guid.NewGuid()
                });
            }
        }

        public void Delete()
        {
            _events.Add(new TodoRemoved()
            {
                EventId = Guid.NewGuid()
            });
        }

        private void Validate()
        {
            if (Events == null)
                throw new ApplicationException("The events list must not be null");
            if (string.IsNullOrWhiteSpace(Name))
                throw new ApplicationException("The Todo name must not be empty");
            if (Id == default)
                throw new ApplicationException("The Todo id must not be empty");
        }
    }
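The replay logic of `CreateFrom(IEnumerable<IDomainEvent>)` can be seen in miniature in the self-contained sketch below. All types in it (`ReplayEvent`, `Created`, `Completed`, `Removed`, `TodoState`, `TodoReplay`) are hypothetical stand-ins rather than the article's classes, and the `upTo` cut-off is my assumption about how the "time travel" mentioned earlier could work: replay only the events up to a given number.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the domain events, reduced to what the replay needs.
public abstract class ReplayEvent { public long Number { get; set; } }
public sealed class Created : ReplayEvent { public string Name { get; set; } }
public sealed class Completed : ReplayEvent { }
public sealed class Removed : ReplayEvent { }

public sealed class TodoState
{
    public string Name { get; set; }
    public bool IsCompleted { get; set; }
}

public static class TodoReplay
{
    // Rebuilds the state from the events numbered <= upTo, in stream order.
    // Returns null if the Todo was never created or was removed in that range.
    public static TodoState Restore(IEnumerable<ReplayEvent> events, long upTo = long.MaxValue)
    {
        TodoState state = null;
        foreach (var e in events.Where(x => x.Number <= upTo).OrderBy(x => x.Number))
        {
            switch (e)
            {
                case Created c:
                    state = new TodoState { Name = c.Name };
                    break;
                case Completed _:
                    if (state != null) state.IsCompleted = true;
                    break;
                case Removed _:
                    return null;
            }
        }
        return state;
    }
}
```

With a history of Created (number 0), Completed (number 1) and Removed (number 2), `Restore(history, 0)` gives a Todo that is not yet completed, `Restore(history, 1)` gives a completed one, and `Restore(history)` gives null because the Todo has been removed.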


We connect to the Event Store:



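            services.AddSingleton(sp =>
            {
                // Connect over TCP on the default port 1113 with the default admin credentials.
                // The connection is opened once, on first resolution, and then shared as a singleton.
                var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
                con.ConnectAsync().Wait();
                return con;
            });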


And now the main part: storing events in the Event Store and reading them back.



    public sealed class EventsRepository : IEventsRepository
    {
        private readonly IEventStoreConnection _connection;

        public EventsRepository(IEventStoreConnection connection)
        {
            _connection = connection;
        }

        public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
        {
            var eventPayload = events.Select(e => new EventData(
                // Event id
                e.EventId,
                // Event type name
                e.GetType().Name,
                // Whether the payload is JSON (true|false)
                true,
                // Event payload
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
                // Event metadata (here, the full CLR type name)
                Encoding.UTF8.GetBytes((string)e.GetType().FullName)
            ));
            // Append the events to the stream named after the collection id
            var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
            return res.NextExpectedVersion;
        }

        public async Task<List<IDomainEvent>> Get(Guid collectionId)
        {
            var results = new List<IDomainEvent>();
            long start = 0L;
            while (true)
            {
                var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
                if (events.Status != SliceReadStatus.Success)
                    return results;
                results.AddRange(Deserialize(events.Events));
                if (events.IsEndOfStream)
                    return results;
                start = events.NextEventNumber;
            }
        }

        public async Task<List<T>> GetAll<T>() where T : IDomainEvent
        {
            var results = new List<IDomainEvent>();
            Position start = Position.Start;
            while (true)
            {
                var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
                results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
                if (events.IsEndOfStream)
                    return results.OfType<T>().ToList();
                start = events.NextPosition;
            }
        }

        private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
            events
                .Where(e => IsEvent(e.Event.EventType))
                .Select(e =>
                {
                    var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
                    result.EventNumber = e.Event.EventNumber;
                    return result;
                })
                .ToList();

        private static bool IsEvent(string eventName)
        {
            return eventName switch
            {
                nameof(TodoCreated) => true,
                nameof(TodoCompleted) => true,
                nameof(TodoRemoved) => true,
                _ => false
            };
        }
        private static Type ToType(string eventName)
        {
            return eventName switch
            {
                nameof(TodoCreated) => typeof(TodoCreated),
                nameof(TodoCompleted) => typeof(TodoCompleted),
                nameof(TodoRemoved) => typeof(TodoRemoved),
                _ => throw new NotImplementedException(eventName)
            };
        }
    }
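The paging loop used in `Get` and `GetAll` (read a slice, collect its events, stop at the end of the stream, otherwise continue from the next event number) can be tried without a running Event Store. The sketch below is an in-memory stand-in under stated assumptions: `Slice`, `InMemoryStream` and `PagedReader` are hypothetical types that only mimic the shape of the client's slice result, and events are plain strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one page of a stream read.
public sealed class Slice
{
    public List<string> Events { get; set; }
    public long NextEventNumber { get; set; }
    public bool IsEndOfStream { get; set; }
}

// Hypothetical in-memory stream; events are plain strings for brevity.
public sealed class InMemoryStream
{
    private readonly List<string> _events = new List<string>();

    public void Append(IEnumerable<string> events) => _events.AddRange(events);

    // Returns up to 'count' events starting at event number 'start'.
    public Slice ReadForward(long start, int count)
    {
        var page = _events.Skip((int)start).Take(count).ToList();
        var next = start + page.Count;
        return new Slice
        {
            Events = page,
            NextEventNumber = next,
            IsEndOfStream = next >= _events.Count
        };
    }
}

public static class PagedReader
{
    // Same loop shape as in the repository: read page by page until the end.
    public static List<string> ReadAll(InMemoryStream stream, int pageSize)
    {
        if (pageSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(pageSize));
        var results = new List<string>();
        long start = 0;
        while (true)
        {
            var slice = stream.ReadForward(start, pageSize);
            results.AddRange(slice.Events);
            if (slice.IsEndOfStream)
                return results;
            start = slice.NextEventNumber;
        }
    }
}
```

Reading five appended events with a page size of two takes three slices and returns the events in the order they were written.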


The entity repository is very simple. We either read the entity's events from the Event Store and restore the entity from them, or simply save the entity's new events.



    public sealed class TodoRepository : ITodoRepository
    {
        private readonly IEventsRepository _eventsRepository;

        public TodoRepository(IEventsRepository eventsRepository)
        {
            _eventsRepository = eventsRepository;
        }

        public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);

        public async Task<Todo> GetAsync(TodoId id)
        {
            var events = await _eventsRepository.Get(id.Value);
            return Todo.CreateFrom(events);
        }

        public async Task<List<Todo>> GetAllAsync()
        {
            var events = await _eventsRepository.GetAll<TodoCreated>();
            var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
            return res.Where(t => t != null).ToList();
        }
    }


The service that works with the repository and the entity:



    public sealed class TodoService : ITodoService
    {
        private readonly ITodoRepository _repository;

        public TodoService(ITodoRepository repository)
        {
            _repository = repository;
        }

        public async Task<TodoId> Create(TodoCreateDto dto)
        {
            var todo = Todo.CreateFrom(dto.Name);
            await _repository.SaveAsync(todo);
            return todo.Id;
        }

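        public async Task Complete(TodoId id)
        {
            var todo = await _repository.GetAsync(id);
            // GetAsync returns null for a Todo that was never created or was removed
            if (todo == null)
                throw new ApplicationException("Todo not found");
            todo.Complete();
            await _repository.SaveAsync(todo);
        }

        public async Task Remove(TodoId id)
        {
            var todo = await _repository.GetAsync(id);
            // GetAsync returns null for a Todo that was never created or was removed
            if (todo == null)
                throw new ApplicationException("Todo not found");
            todo.Delete();
            await _repository.SaveAsync(todo);
        }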

        public async Task<List<TodoReadDto>> GetAll()
        {
            var todos = await _repository.GetAllAsync();
            return todos.Select(t => new TodoReadDto()
            {
                Id = t.Id.Value,
                Name = t.Name,
                IsComplete = t.IsCompleted
            }).ToList();
        }

        public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
        {
            var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
            return todos.Where(t => t != null).Select(t => new TodoReadDto()
            {
                Id = t.Id.Value,
                Name = t.Name,
                IsComplete = t.IsCompleted
            }).ToList();
        }
    }


So far, admittedly, nothing impressive. In the next article, when I add a separate database for reading, everything will start to shine. That will immediately saddle us with eventual consistency. Event Store and the SQL databases will work on the master-slave principle: one white ES and many black MS SQL databases from which data is read.



A lyrical digression. In light of recent events, I could not resist joking about master and slave, black and white. Eh, an era is passing; we will tell our grandchildren that we lived at a time when databases in replication were called master and slave.



In systems with many reads and few writes (which is most of them), this will speed things up. Master-slave replication itself slows down writes (as indexes do), but in return reads become faster because the load is spread across several databases.


