Orchestrator of endless tasks

In this article we will talk about how to implement an infinite task orchestrator using queues. As an ultimate goal, we need to implement a system that can manage tasks with a long lifespan, a distributed system, where a group of tasks are hosted on a specific server, and if this server fails, the tasks are automatically redistributed to free ones. 





In most cases, all enterprise development comes down to fulfilling the same requirements: an application is created, depending on the type of application, it has some kind of life cycle, at the end of the life of the application, we receive (... or do not receive) what we want. By an application, we can mean anything, from an online purchase of a product, a money order, or calculating the trajectory of a ballistic missile. Each application has its own life path, and what is important to note is  the lifetime , and the shorter this time, the better. In other words, the sooner my wire transfer is completed, the better. The requirements are also similar, more  RPC  operations per second, less  Latency , the system must be fault-tolerant, scalable and ready  yesterday... There are a million tools, hundreds of databases, different approaches and patterns. And everything has been written for a long time, we just have to use the ready-made technologies correctly in our projects. 





The topic of task orchestration is not new, but to my surprise, there are simply no ready-made solutions for managing infinite tasks (whose lifetime is unlimited), with the possibility of redistributing tasks across active servers. Therefore, we will implement our own solution. But first things first…. 






, . β€” (Job), , . . , β€œβ€, . : , . , , , . β€œβ€-  WebSocket ,  connected. , , , , . , β€œβ€  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  β€œβ€.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   β€œHello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , β€” , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , β€œβ€ :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(β€œunique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { β€œunique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db β€“ -  , Hangfire   MsSql,  PostgreSql   Redis.





β€”    . β€œβ€.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , β€œdefault” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , β€” , , . , ? , . , , . , . 





? β€œβ€.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , β€œObserver 1”.  Unacked  ,  . 





  • msg2  . β€œObserver 1”  ,  ,  β€œObserver 2”. 





, β€œObserver-service 1”  , ( - β€œ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? β€œβ€, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 β€œβ€, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill β€“9, ). ,  Docker. , , . β€œβ€, , , . , - …. 





β€œβ€ API. ( , β€œState queue” ). β€œβ€ , , , , - .





, , β€œβ€. , , , , . 





,  ,  β€œβ€  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





β€œβ€ . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  β€œβ€ , . 





, β€” , ... β€œ ”.  





 Observer-service  , . , β€œβ€  CancellationToken. β€œβ€ . 





β€œβ€ . ,  id . , .  





  •  Created, β€œβ€ . - , β€œβ€. 





  •  OnDeleting  Deleted, - β€œβ€ , . 





  •  Processing, β€œβ€  OnDeleting  .    . , β€œβ€,  CancellationToken  β€œstate queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , β€œβ€, . β€œβ€, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) β€œβ€, , . 





, , β€œβ€ . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles