Durable Functions Patterns
En el anterior post hablábamos de las Durable Function, de los tipos que había y cómo gracias al Framework de las Durable Function se podía realizar tareas de ejecución prolongada (long-running).
En este post vamos a ver los distintos patrones que existen para así poder definir una estrategia que se adapte al escenario que estemos desarrollando.
Patrones
El principal caso de uso de las Durable Function es simplificar la complejidad, y la coordinación del estado con los distintos requerimientos, para lograr este objetivo, vemos los principales patrones:
- Function chaining
- Fan-out/fan-in
- Async HTTP APIs
- Monitoring
- Human interaction
- Aggregator (stateful entities)
A continuación, pasamos a detallar la definición de cada uno.
Function chaining
Es un encadenamiento de ejecución de Functions en un orden específico. El output de una Function podría ser el input de la siguiente.
En el ejemplo siguiente, cuatro Functions son desencadenadas de forma que la siguiente recibe por parámetro el valor resultante de la ejecución anterior.
[FunctionName("ChainingPatternFunction")]
public static async Task<object> Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var a = await context.CallActivityAsync<object>("FunctionA", null);
var b = await context.CallActivityAsync<object>("FunctionB", a);
var c = await context.CallActivityAsync<object>("FunctionC", b);
var d = await context.CallActivityAsync<object>("FunctionD", c);
return d;
}
catch (Exception ex)
{
Debug.WriteLine(ex);
throw;
}
}
Como vimos en el anterior post, el objeto context
puede ser utilizado para llamar a otras Functions por nombre, pasar parámetros o devolver el resultado de la ejecución.
Fan-out/fan-in (Distribución ramificada de salida y de entrada)
En este patrón, ejecutamos múltiples Functions en paralelo y esperamos a que todas finalicen.
En el siguiente gráfico de ejemplo, las Functions B, C y D son ejecutadas en paralelo y luego pasan a ejecutar la Function E, quién podría utilizar el resultado de las otras tres para realizar su procesamiento.
Una primera Function recuperará el literal de la cadena de conexión que utilizarán las Functions B, C y D en paralelo para llamar a un repositorio de datos y hacer operaciones sobre filas de datos. Una vez terminen cada función reporta el número de filas que se han visto afectadas, y la FunctionE totaliza el número de filas.
[FunctionName("FanOutFanInPatternFunction")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var tasks = new List<Task<int>>();
var connectionString = await context.CallActivityAsync<string>("FunctionA", null);
tasks.Add(context.CallActivityAsync<int>("FunctionB", connectionString));
tasks.Add(context.CallActivityAsync<int>("FunctionC", connectionString));
tasks.Add(context.CallActivityAsync<int>("FunctionD", connectionString));
await Task.WhenAll(tasks);
int totalRows = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("FunctionE", sum);
}
Async HTTP APIs
En este patrón se resuelven posibles problemas de coordinación con el estado de las operaciones long-running, para ello, una forma es implementar una Function que desencadene la operativa, y a continuación el cliente se redirige a otra Function que le reporta el estado de las tareas, y así puede saber cómo se encuentra la ejecución.
El Framework de las Durable Function, tras iniciar una instancia, expone un webhook para que se pueda comprobar el estado de la ejecución sin necesidad de tener que realizar un desarrollo específico de comprobación de los estados.
Monitoring
Este patrón consistiría en una supervisión periódica y flexible de un workflow, por ejemplo, realizando comprobaciones hasta que se cumple una condición. En este patrón podríamos invertir el de la API asíncrona, y en lugar de exponer un punto para que un cliente externo supervise la operación, el propio monitor consume el punto de conexión y espera a que se produzca un cambio de estado o se cumpla la condición.
En este ejemplo, la Function Get Status
en lugar de exponer un API para consultar el estado de Results
, está verificando de forma periódica el estado de los resultados y hasta que no se dan las condiciones de salida (en este caso que se complete la ejecución), no se finaliza el proceso.
En este ejemplo, el while
está ejecutándose mientras no se complete el trabajo de otra Function o la fecha y hora actual sea mayor que la fecha de expiración. Cada 5 segundos hará una nueva iteración para comprobar el estado.
[FunctionName("MonitorJobStatus")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var id = context.GetInput<int>();
var intervalSeconds = 5;
var maxDateTime = DateTime.UtcNow.AddHours(1);
while (context.CurrentUtcDateTime < maxDateTime)
{
var status = await context.CallActivityAsync<string>("GetStatus", id);
if (status == "Completed")
{
await context.CallActivityAsync("End", machineId);
break;
}
var nextCheck = context.CurrentUtcDateTime.AddSeconds(intervalSeconds);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
}
Human interaction
En determinados casos, un workflow requiere de la intervención humana para proseguir con su ejecución. Esta acción manual, en un proceso automatizado ralentiza significativamente la ejecución, pues las personas no tienen la disponibilidad que tienen las máquinas. Para poder realizar escenarios de este tipo se deben implementar tiempos de expiración y lógicas de compensación.
En este ejemplo se inicia la ejecución y se solicita a un humano la aprobación, si el usuario no responde a tiempo se escala, pero si contesta se continúa con el proceso gestionando la respuesta que haya facilitado para la Aprobación o Rechazo.
En este snippet si la Function no recibe el resultado de la aprobación o rechazo del usuario en 72 horas, se continúa la ejecución llamando a otra Function para que gestione la falta de respuesta. Por otro lado, RaiseEventToOrchestration
envía un aprobado a la orquestación para responder a la solicitud de aprobación.
[FunctionName("ApprovalFunction")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
var dueTime = context.CurrentUtcDateTime.AddHours(72);
var durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
var approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run([HttpTrigger] string instanceId, [DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
Aggregator (stateful entities)
Este patrón trata de agregar datos o lotes de datos de distintos eventos e incluso orígenes, durante un período determinado de tiempo en una Entidad expuesta por una Durable Entity. El proceso podría ir realizando operaciones sobre los datos que recibe durante la ejecución del proceso y también es posible que durante la ejecución, clientes externos puedan consultar los datos agregados.
Implementar este patrón con funciones normales es casi imposible, ya que es necesiario gestionar el estado o la simultaneidad. Para simplificar y poder llevar a cabo este patrón de una forma sencilla en una única Function se utilizan las Durable Entities.
Cada vez que un EventHub
desencadene el trigger se creará una entidad y se agregará un delta con los datos recibidos.
[FunctionName("EventHubTriggerFunction")]
public static async Task Run([EventHubTrigger("device-sensor-events")] EventData eventData, [DurableClient] IDurableOrchestrationClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
public class CounterFunction
{
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
ctx.SetState(ctx.GetState<int>() + ctx.GetInput<int>());
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(ctx.GetState<int>());
break;
}
}
}
[JsonObject(MemberSerialization.OptIn)]
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx) => ctx.DispatchAsync<Counter>();
}