In this article we will look at how to implement an orchestrator for infinite tasks using queues. The end goal is a distributed system that manages long-lived tasks: each group of tasks is hosted on a specific server, and if that server fails, its tasks are automatically redistributed to the free ones.
Most enterprise development comes down to fulfilling the same requirements: an application (in the sense of a request or order) is created, it goes through a life cycle that depends on its type, and at the end of that life cycle we receive (or do not receive) what we wanted. An application can be anything: an online purchase, a money order, or the calculation of a ballistic missile's trajectory. Each application has its own life path, and the important characteristic is its lifetime: the shorter, the better. In other words, the sooner my wire transfer completes, the better. The requirements are similar too: more RPC operations per second, lower latency, and a system that is fault-tolerant, scalable and ready yesterday. There are a million tools, hundreds of databases, and many approaches and patterns. Everything was written long ago; we only have to use the ready-made technologies correctly in our projects.
The topic of task orchestration is not new, but to my surprise I found no ready-made solution for managing infinite tasks (tasks whose lifetime is unlimited) that can also redistribute them across the active servers. Therefore, we will implement our own. But first things first.
, . β (Job), , . . , ββ, . : , . , , , . ββ- WebSocket , connected. , , , , . , ββ Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task ββ.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
, "Hello World" :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
    // email-sending logic (omitted)
}
, , await SendEmailAsync.
foreach (var email in emails)
{
    if (token.IsCancellationRequested)
        break;
    _emailSender.SendEmailAsync(email, token); // note: the call is not awaited
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
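For contrast with the fire-and-forget loop above, here is a minimal sketch of the awaited variant, in which exceptions stay observable and the loop honours the token. The names `Email`, `EmailBatch` and `SendAllAsync` are hypothetical and exist only for this illustration; the delegate `send` stands in for `_emailSender.SendEmailAsync`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical message type, introduced only for this sketch.
public record Email(string To, string Body);

public static class EmailBatch
{
    // Sends emails one at a time and returns the ones that failed,
    // so the caller can decide whether to retry them.
    public static async Task<List<Email>> SendAllAsync(
        IEnumerable<Email> emails,
        Func<Email, CancellationToken, Task> send,
        CancellationToken token)
    {
        var failed = new List<Email>();
        foreach (var email in emails)
        {
            if (token.IsCancellationRequested)
                break;
            try
            {
                // Awaiting surfaces the exception here instead of losing it.
                await send(email, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation is not a delivery failure
            }
            catch (Exception)
            {
                failed.Add(email);
            }
        }
        return failed;
    }
}
```

Awaiting each call makes the loop sequential. That is the price of seeing every failure at the point where it happens.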
Schedulers
.NET , .
. , ( , β , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
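var jobIds = new List<string>();
foreach (var email in emails)
{
    if (token.IsCancellationRequested)
        break;
    // An async lambda cannot be converted to an expression tree,
    // so pass the call itself; Hangfire awaits the returned Task.
    jobIds.Add(BackgroundJob.Enqueue(
        () => _emailSender.SendEmailAsync(email, token)));
}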
, , . RetryPolicy , . , , .
, . , ββ :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// Put the job into a specific, named queue.
var state = new EnqueuedState("unique-queue-name");
client.Create(() => _observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// Start a Hangfire server instance that listens only to this queue.
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
    WorkerCount = 10,
    Queues = new[] { "unique-queue-name" },
    ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( Hangfire WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db β - , Hangfire MsSql, PostgreSql Redis.
β . ββ.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , βdefaultβ . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , β , , . , ? , . , , . , .
? ββ. - PrefetchCount .
Ready.
Consumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"β, :
Observer-services , , Round-robin.
msg1 . , βObserver 1β. Unacked , .
msg2 . βObserver 1β , , βObserver 2β.
, βObserver-service 1β , ( - β ... ?β).
, , acknowledgement Unacked Ready. . , , .
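The scenario above (Ready and Unacked states, PrefetchCount of 1, round-robin delivery, messages returning to Ready when a service dies) can be imitated with a toy in-memory model. This is only a sketch for reasoning about the behaviour. `ToyQueue` and all of its members are hypothetical names and are not part of RabbitMQ or MassTransit.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a queue with Ready/Unacked states; not a RabbitMQ client.
public sealed class ToyQueue
{
    private readonly Queue<string> _ready = new();
    private readonly Dictionary<string, List<string>> _unacked = new();
    private readonly List<string> _consumers = new();
    private readonly int _prefetchCount;
    private int _next; // round-robin cursor

    public ToyQueue(int prefetchCount) => _prefetchCount = prefetchCount;

    public int ReadyCount => _ready.Count;

    public IReadOnlyList<string> UnackedBy(string consumer) =>
        _unacked.TryGetValue(consumer, out var list) ? list : new List<string>();

    public void AddConsumer(string name)
    {
        _consumers.Add(name);
        _unacked[name] = new List<string>();
        Dispatch();
    }

    public void Publish(string message)
    {
        _ready.Enqueue(message);
        Dispatch();
    }

    // The consumer finished the message: it leaves the queue for good.
    public void Ack(string consumer, string message)
    {
        _unacked[consumer].Remove(message);
        Dispatch();
    }

    // The consumer died without acknowledging: its messages return to Ready.
    public void RemoveConsumer(string name)
    {
        var orphaned = _unacked[name];
        _unacked.Remove(name);
        _consumers.Remove(name);
        foreach (var message in orphaned) _ready.Enqueue(message);
        Dispatch();
    }

    // Hands Ready messages to consumers that still have prefetch capacity.
    private void Dispatch()
    {
        while (_ready.Count > 0)
        {
            int target = -1;
            for (int i = 0; i < _consumers.Count; i++)
            {
                int candidate = (_next + i) % _consumers.Count;
                if (_unacked[_consumers[candidate]].Count < _prefetchCount)
                {
                    target = candidate;
                    break;
                }
            }
            if (target < 0) return; // everyone is at capacity
            _unacked[_consumers[target]].Add(_ready.Dequeue());
            _next = (target + 1) % _consumers.Count;
        }
    }
}
```

With a prefetch count of 1, an infinite task that never acknowledges its message occupies its consumer's only slot. The message stays in Unacked for as long as the service lives and returns to Ready only when the service disappears.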
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
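To make the idea of an interval-based retry policy concrete, here is a library-independent sketch. It is not MassTransit's API, and it reads the options listed above only loosely. `Retry` and `WithIntervalsAsync` are hypothetical names, and the intervals are whatever the caller supplies.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Retry
{
    // Runs the action; after each failure waits for the next interval and
    // tries again. When the intervals are used up, the last exception
    // propagates to the caller.
    public static async Task<T> WithIntervalsAsync<T>(
        Func<CancellationToken, Task<T>> action,
        IReadOnlyList<TimeSpan> intervals,
        CancellationToken token)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await action(token);
            }
            catch (Exception) when (attempt < intervals.Count
                                    && !token.IsCancellationRequested)
            {
                // Task.Delay throws if the token is cancelled while waiting.
                await Task.Delay(intervals[attempt], token);
            }
        }
    }
}
```

A call such as `Retry.WithIntervalsAsync(work, new[] { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(10) }, token)` makes up to four attempts in total. Here `work` is a placeholder for the operation being retried.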
? ββ, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 ββ, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id | Name | WorkerCount | CreatedAt | ModifyAt | IsDeleted
{Unique id} | Observer service 1 | 10 | {some date} | null | false
{Unique id} | Observer service 2 | 10 | {some date} | null | false
{Unique id} | Observer service 3 | 10 | {some date} | null | false
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill β9, ). , Docker. , , . ββ, , , . , - β¦.
ββ API. ( , βState queueβ ). ββ , , , , - .
, , ββ. , , , , .
, , ββ Created.
Id | Name | CreatedAt | ModifyAt | ServiceId | Status
{Observer id} | My_new_observer | {created date} | null | null | Created
, , , Processing .
Id | Name | CreatedAt | ModifyAt | ServiceId | Status
{Observer id} | My_new_observer | {created date} | {modify date} | {Observer service 1 id} | Processing
ββ .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , ββ , .
, β , ... β β.
Observer-service , . , ββ CancellationToken. ββ .
ββ . , id . , .
Created, ββ . - , ββ.
OnDeleting Deleted, - ββ , .
Processing, ββ OnDeleting . . , ββ, CancellationToken βstate queueβ. , OnDeleting Deleted.
Id | Name | CreatedAt | ModifyAt | ServiceId | Status
{Observer id} | My_new_observer | {created date} | {modify date} | {Observer service 1 id} | Deleted
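The four statuses and the handling of a delete request can be summarised as a small state machine. This is a sketch under assumptions: `ObserverLifecycle`, `RequestDelete` and `ConfirmCancelled` are hypothetical names, and the direct move from Created to Deleted is an assumed reading of the Created case rather than something the text confirms.

```csharp
using System;

public enum ObserverStatus { Created, Processing, OnDeleting, Deleted }

public static class ObserverLifecycle
{
    // Status after a delete request arrives.
    public static ObserverStatus RequestDelete(ObserverStatus current) => current switch
    {
        // Assumption: a task nobody has picked up yet has nothing to cancel.
        ObserverStatus.Created => ObserverStatus.Deleted,
        // A running task must first be stopped through its CancellationToken.
        ObserverStatus.Processing => ObserverStatus.OnDeleting,
        // OnDeleting or Deleted: the request changes nothing.
        _ => current
    };

    // Status after the hosting service confirms that the task was cancelled.
    public static ObserverStatus ConfirmCancelled(ObserverStatus current) =>
        current == ObserverStatus.OnDeleting ? ObserverStatus.Deleted : current;
}
```

Keeping the transitions in pure functions makes them easy to test without a database or a message broker.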
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , ββ, . ββ, . ( , .)
3) .
, "β, . "β, , .
4) . , "β.
. - , , .
5) ββ, , .
, , ββ . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "β, . , , , PrefetchCount. , , .