Project author: gaevoy

Project description:
A tiny library for building durable tasks, sagas, and process managers using async/await. Inspired by the Azure Durable Task Framework.
Language: C#
Repository: git://github.com/gaevoy/Gaev.DurableTask.git
Created: 2017-04-11T20:35:02Z
Project page: https://github.com/gaevoy/Gaev.DurableTask

Gaev.DurableTask is a tiny library for building durable tasks, sagas, and process managers using async/await. The main idea is to fit a saga or process manager into a single async method. Inspired by the Azure Durable Task Framework.

Imagine writing regular async/await code that can run for a long time, say a week or a year. Moreover, if the application crashes, the durable task resumes execution exactly where it left off.
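
One way to picture the resume behaviour: every step gets a stable id and its result is recorded under that id, so when the method runs again after a restart, completed steps return the recorded result instead of executing again. The sketch below illustrates that idea only. `ReplaySketch` and its in-memory dictionary are hypothetical names chosen for this example and are not the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a durable process; not the library's real type.
public sealed class ReplaySketch
{
    // Step id -> recorded result. A real host would serialize this to durable storage.
    private readonly Dictionary<string, object> _saved;

    public ReplaySketch(Dictionary<string, object> saved) => _saved = saved;

    // Runs the step the first time; on later runs returns the recorded result.
    public async Task<T> Do<T>(Func<Task<T>> step, string id)
    {
        if (_saved.TryGetValue(id, out var recorded))
            return (T)recorded;

        var result = await step();
        _saved[id] = result;
        return result;
    }
}
```

Creating a second `ReplaySketch` over the same dictionary plays the role of a restart: a call to `Do` with an id that is already recorded skips the delegate entirely.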

A durable task needs storage for its current state in order to resume execution after a restart or crash. MS SQL and file system based storage providers are available. You can also write your own provider by implementing IProcessStorage.
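
To give a feel for what a storage provider has to do, here is a minimal sketch of a store that keeps one file per step. The members of `IProcessStorage` are not shown in this document, so the `IStepStore` interface below is a hypothetical contract invented for illustration; the real interface may look quite different, and this is not the library's file system provider.

```csharp
using System.IO;

// Hypothetical contract for illustration only; the real IProcessStorage may differ.
public interface IStepStore
{
    bool TryLoad(string processId, string stepId, out string value);
    void Save(string processId, string stepId, string value);
}

// Keeps one text file per step under <root>/<processId>/.
// Assumes process and step ids are valid file names.
public sealed class FileStepStore : IStepStore
{
    private readonly string _root;

    public FileStepStore(string root) => _root = root;

    private string PathFor(string processId, string stepId) =>
        Path.Combine(_root, processId, stepId + ".txt");

    public bool TryLoad(string processId, string stepId, out string value)
    {
        var path = PathFor(processId, stepId);
        value = File.Exists(path) ? File.ReadAllText(path) : null;
        return value != null;
    }

    public void Save(string processId, string stepId, string value)
    {
        var path = PathFor(processId, stepId);
        Directory.CreateDirectory(Path.GetDirectoryName(path));
        File.WriteAllText(path, value);
    }
}
```

Because the state lives on disk, a new `FileStepStore` pointed at the same root directory sees everything saved before a restart.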

To estimate memory usage, a simple durable task was hosted in a console application. One instance of the durable task uses about 4.3 KB in a 32-bit process or 9 KB in a 64-bit process, so 250,000 instances occupy about 1 GB in a 32-bit console app.
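
The estimate is plain multiplication, and the same arithmetic can be reused for other instance counts. The helper below is a hypothetical convenience for this document, assuming 1 GB = 1024 × 1024 KB.

```csharp
public static class MemoryEstimate
{
    // Total size in GB for a number of instances of a given size in KB.
    public static double TotalGb(int instances, double kbPerInstance) =>
        instances * kbPerInstance / (1024.0 * 1024.0);
}
```

`MemoryEstimate.TotalGb(250000, 4.3)` gives roughly 1.03, which matches the 32-bit figure above. The same count at 9 KB per instance gives roughly 2.15.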

Caveat: exception types are limited because exceptions go through serialization/deserialization. Any exception is wrapped in ProcessException, so ProcessException is the only exception type you can catch in a durable task (see the Rollback logic example below).
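
A rough sketch of why a single wrapper type is a natural fit: what gets persisted for a failed step is plain data such as the exception's type name and message, and after a restart that data is surfaced through one exception class. `WrappedException` and `ExceptionRoundTrip` are hypothetical stand-ins written for this example; only the existence of a `Type` property holding the short type name is taken from the rollback example in this README.

```csharp
using System;

// Hypothetical stand-in for the library's ProcessException.
public sealed class WrappedException : Exception
{
    // Short type name of the original exception, e.g. "TransferFailedException".
    public string Type { get; }

    public WrappedException(string type, string message) : base(message) => Type = type;
}

public static class ExceptionRoundTrip
{
    // What is kept for storage: the type name and the message, as plain strings.
    public static (string Type, string Message) Capture(Exception ex) =>
        (ex.GetType().Name, ex.Message);

    // After a restart the stored strings are surfaced through the single wrapper type.
    public static WrappedException Restore((string Type, string Message) saved) =>
        new WrappedException(saved.Type, saved.Message);
}
```

With this shape, a filter such as `when (ex.Type == nameof(TransferFailedException))` compares strings rather than relying on the original exception class being reconstructed.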

Let's take a closer look at the durable task. It is easiest to explain with examples:

Saga, process manager complete example

    async Task DurableTask(string id, string email = null)
    {
        using (var proc = _host.Spawn(id))
        {
            // Save the email so it is not lost if the durable task resumes
            email = await proc.Get(email, "SaveEmail");
            // Register the user
            var userId = await proc.Do(() => _service.RegisterUser(email), "RegisterUser");
            // Generate a secret for email verification
            var secret = await proc.Get(Guid.NewGuid(), "secret");
            // Send an email with the secret to the user for verification
            await proc.Do(() => _service.VerifyEmail(email, secret), "VerifyEmail");
            // Wait until the user receives the verification email and sends the secret back; this can take a couple of days
            await proc.Do(() => _service.WaitForEmailVerification(secret), "WaitForEmailVerification");
            // Activate the user in the system
            await proc.Do(() => _service.ActivateUser(userId), "ActivateUser");
        }
    }

Schedule complete example

    async Task DurableTask(string id, string email = null)
    {
        using (var proc = _host.Spawn(id))
        {
            // Save the email so it is not lost if the durable task resumes
            email = await proc.Get(email, "SaveEmail");
            await proc.Do(() => _smtp.Send(email, "Welcome!"), "Welcome");
            // Wait 1 month
            await proc.Delay(TimeSpan.FromDays(30), "Wait1m");
            await proc.Do(() => _smtp.Send(email, "Your 1st month with us. Congrats!"), "CongratsMonth");
            // Wait 11 more months
            await proc.Delay(TimeSpan.FromDays(365 - 30), "Wait1y");
            await proc.Do(() => _smtp.Send(email, "Your 1st year with us. Congrats!"), "CongratsYear");
        }
    }

Rollback logic complete example

    async Task DurableTask(string id, string srcAccount = null, string destAccount = null, decimal amount = 0)
    {
        using (var proc = _host.Spawn(id))
        {
            // Save the values so they are not lost if the durable task resumes
            srcAccount = await proc.Get(srcAccount, "SaveSrcAccount");
            destAccount = await proc.Get(destAccount, "SaveDestAccount");
            amount = await proc.Get(amount, "SaveAmount");
            var srcTranId = Guid.Empty;
            var destTranId = Guid.Empty;
            try
            {
                // Start transferring the money
                srcTranId = await proc.Do(() => _service.StartTransfer(srcAccount, -amount), "StartTransfer1");
                destTranId = await proc.Do(() => _service.StartTransfer(destAccount, +amount), "StartTransfer2");
                // Complete transferring the money
                await proc.Do(() => _service.CompleteTransfer(srcAccount, srcTranId), "CompleteTransfer1");
                await proc.Do(() => _service.CompleteTransfer(destAccount, destTranId), "CompleteTransfer2");
            }
            catch (ProcessException ex) when (ex.Type == nameof(TransferFailedException))
            {
                // Rollback logic: undo only the transfers that were actually started
                if (srcTranId != Guid.Empty)
                    await proc.Do(() => _service.RollbackTransfer(srcAccount, srcTranId), "RollbackTransfer1");
                if (destTranId != Guid.Empty)
                    await proc.Do(() => _service.RollbackTransfer(destAccount, destTranId), "RollbackTransfer2");
                throw;
            }
        }
    }

Sending a message to the durable task complete example

    async Task DurableTask(string processId, string companyId = null, string creditCard = null)
    {
        using (var proc = _host.Spawn(processId).As<CreditCardProcess>())
        {
            // Save the inputs so they are not lost if the durable task resumes
            companyId = await proc.Get(companyId, "1");
            creditCard = await proc.Get(creditCard, "2");
            Console.WriteLine($"CreditCardFlow is up for companyId={companyId} creditCard={creditCard}");
            var email = await proc.Do(() => GetEmail(companyId), "3");
            await proc.Do(() => SendEmail(email, $"{creditCard} was assigned to you"), "4");
            // Start a timer and two waits for external messages, without awaiting them yet
            var onCheckTime = proc.Delay(TimeSpan.FromMinutes(5), "5");
            var onFirstTransaction = proc.Do(() => proc.OnTransactionAppeared(), "6");
            var onDeleted = proc.Do(() => proc.OnCreditCardDeleted(), "7");
            // When the timer fires, warn if the card has not been used and still exists
            Task.Run(async () =>
            {
                await onCheckTime;
                if (onDeleted.IsCompleted) return;
                if (!onFirstTransaction.IsCompleted)
                    await proc.Do(() => SendEmail(email, $"{creditCard} is inactive long time"), "8");
            });
            // When the first transaction arrives, notify unless the card was already deleted
            Task.Run(async () =>
            {
                await onFirstTransaction;
                if (onDeleted.IsCompleted) return;
                await proc.Do(() => SendEmail(email, $"{creditCard} received 1st transaction"), "9");
            });
            // The durable task ends once the card is deleted
            await onDeleted;
            await proc.Do(() => SendEmail(email, $"{creditCard} was deleted"), "10");
        }
    }

    static void Main(string[] args)
    {
        // ...
        var procId = creditCardFlow.Start("1111-1111-1111-1111", "user1@gmail.com");
        // Send messages to the running durable task
        host.Get(procId).As<CreditCardProcess>()?.RaiseOnTransactionAppeared();
        host.Get(procId).As<CreditCardProcess>()?.RaiseOnCreditCardDeleted();
        // ...
    }
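
The pairing of an awaitable `On...` method with a `RaiseOn...` method can be built on `TaskCompletionSource`. The sketch below shows one possible shape of such a process type. `SignalSketch` is a hypothetical class written for this example; how `CreditCardProcess` is actually implemented is not shown in this document.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-in for a process type such as CreditCardProcess.
public sealed class SignalSketch
{
    // Completed from outside when the message arrives. Continuations run asynchronously
    // so the caller of Raise... is not blocked by the waiting code.
    private readonly TaskCompletionSource<bool> _transactionAppeared =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Awaited inside the long-running method.
    public Task OnTransactionAppeared() => _transactionAppeared.Task;

    // Called from outside the method; safe to call more than once.
    public void RaiseOnTransactionAppeared() => _transactionAppeared.TrySetResult(true);
}
```

The long-running method awaits `OnTransactionAppeared()`, and any other part of the application that holds the same instance can complete that wait by calling `RaiseOnTransactionAppeared()`.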