Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/Functions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Net;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.Logging;

namespace WorkItemFiltering;

// =============================================================================
// Orchestrations
// =============================================================================

/// <summary>
/// A simple orchestration that calls an activity and returns the result.
/// With work item filtering enabled, DTS will only dispatch this orchestration
/// to workers that have it registered.
/// </summary>
public static class GreetingOrchestration
{
[Function(nameof(GreetingOrchestration))]
public static async Task<string> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(GreetingOrchestration)).LogInformation("GreetingOrchestration started");
return await ctx.CallActivityAsync<string>(nameof(SayHello), "World");
}

[Function(nameof(GreetingOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/greeting")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(GreetingOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// A fan-out/fan-in orchestration that calls the same activity in parallel.
/// Demonstrates that activity work items are also filtered.
/// </summary>
public static class FanOutOrchestration
{
[Function(nameof(FanOutOrchestration))]
public static async Task<string[]> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(FanOutOrchestration)).LogInformation("FanOutOrchestration: fanning out to 3 activities");
return await Task.WhenAll(
ctx.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
ctx.CallActivityAsync<string>(nameof(SayHello), "London"),
ctx.CallActivityAsync<string>(nameof(SayHello), "Seattle"));
}

[Function(nameof(FanOutOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/fanout")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(FanOutOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// A parent orchestration that calls a child orchestration.
/// Sub-orchestration dispatch is also governed by work item filters.
/// </summary>
public static class ParentOrchestration
{
[Function(nameof(ParentOrchestration))]
public static async Task<string> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(ParentOrchestration)).LogInformation("Calling sub-orchestration");
string result = await ctx.CallSubOrchestratorAsync<string>(nameof(GreetingOrchestration));
return $"Parent received: {result}";
}

[Function(nameof(ParentOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/parent")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(ParentOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// An orchestration that interacts with a durable entity.
/// Entity work items are also filtered.
/// </summary>
public static class CounterOrchestration
{
[Function(nameof(CounterOrchestration))]
public static async Task<int> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
var logger = ctx.CreateReplaySafeLogger(nameof(CounterOrchestration));
var entityId = new EntityInstanceId(nameof(CounterEntity), "sample-counter");

await ctx.Entities.CallEntityAsync(entityId, "Add", 10);
await ctx.Entities.CallEntityAsync(entityId, "Add", 20);
int value = await ctx.Entities.CallEntityAsync<int>(entityId, "Get");

logger.LogInformation("Counter value = {Value}", value);
return value;
}

[Function(nameof(CounterOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/counter")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(CounterOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

// =============================================================================
// Activities
// =============================================================================

public static class SayHello
{
[Function(nameof(SayHello))]
public static string Run([ActivityTrigger] string name) => $"Hello, {name}!";
}

// =============================================================================
// Entities
// =============================================================================

public class CounterEntity : TaskEntity<int>
{
public void Add(int amount) => this.State += amount;
public void Reset() => this.State = 0;
public int Get() => this.State;

[Function(nameof(CounterEntity))]
public static Task Dispatch([EntityTrigger] TaskEntityDispatcher dispatcher)
=> dispatcher.DispatchAsync<CounterEntity>();
}

// =============================================================================
// Generic starter (for cross-app filter isolation tests)
// =============================================================================

public static class GenericStarter
{
[Function("StartOrchestration")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start/{name}")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
string name)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(name);
return client.CreateCheckStatusResponse(req, instanceId);
}
}
5 changes: 5 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Hosting;

FunctionsApplicationBuilder builder = FunctionsApplication.CreateBuilder(args);
builder.Build().Run();
130 changes: 130 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Work Item Filtering with Durable Functions (.NET)

.NET | Durable Functions

## Description

Demonstrates the **work item filtering** feature for Durable Functions with the Durable Task Scheduler (DTS) backend. When multiple Function apps share the same DTS task hub, work item filtering ensures each app only receives work items for the functions it has registered — preventing dispatch failures.

This sample includes orchestrations, activities, entities, sub-orchestrations, and fan-out/fan-in patterns — all governed by work item filters.

## Prerequisites

1. [.NET 8 SDK](https://dotnet.microsoft.com/download/dotnet/8.0)
2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the emulator and Azurite)
3. [Azure Functions Core Tools v4](https://learn.microsoft.com/azure/azure-functions/functions-run-local)

## Quick Run

1. Start the Durable Task Scheduler emulator:
```bash
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
```

2. Start Azurite (Azure Storage emulator):
```bash
docker run --name azurite -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite
```

3. Start the Function app:
```bash
func start
```

4. Trigger an orchestration:
```bash
curl -X POST http://localhost:7071/api/orchestrators/greeting
```

5. Test filter isolation — schedule an orchestration this app does NOT have:
```bash
curl -X POST http://localhost:7071/api/start/SomeOtherOrchestration
```
Check the status — it should stay `Pending` because no worker has `SomeOtherOrchestration` in its filter. Without filtering, this would fail with *"function doesn't exist"*.

## Expected Output

```
# Matching orchestration → Completed
{"name":"GreetingOrchestration","runtimeStatus":"Completed","output":"Hello, World!"}

# Unknown orchestration → stays Pending (filter isolation working)
{"name":"SomeOtherOrchestration","runtimeStatus":"Pending"}
```

## How It Works

The key configuration in [`host.json`](host.json):

```json
{
"extensions": {
"durableTask": {
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
"workItemFilteringEnabled": true
}
}
}
}
```

When `workItemFilteringEnabled` is `true`:
1. The Durable Functions extension discovers registered orchestrators, activities, and entities during function indexing
2. These names are sent to DTS as `WorkItemFilters` on the `GetWorkItems` gRPC stream
3. DTS only dispatches work items that match the worker's registered functions
4. Unmatched work items stay in the DTS queue until a matching worker connects

No code changes are needed — filtering is automatic based on the functions registered in the app.

## Registered Functions

| Type | Function | Description |
|---------------|---------------------------|-------------------------------------|
| Orchestration | `GreetingOrchestration` | Simple activity call |
| Orchestration | `FanOutOrchestration` | Parallel fan-out to 3 activities |
| Orchestration | `ParentOrchestration` | Calls GreetingOrchestration as sub |
| Orchestration | `CounterOrchestration` | Interacts with CounterEntity |
| Activity | `SayHello` | Returns a greeting string |
| Entity | `CounterEntity` | Counter with Add/Reset/Get |

## Multi-App Scenario

To see filter isolation in action across two apps:

1. Create a second Function app with **different** orchestrations/activities
2. Point both apps to the **same** DTS task hub
3. Enable `workItemFilteringEnabled: true` in both
4. Schedule orchestrations — each app only processes its own functions

## Viewing in the Dashboard

- **Emulator:** Navigate to http://localhost:8082 → select the "default" task hub
- **Azure:** Navigate to your Scheduler resource in the Azure Portal → Task Hub → Dashboard URL

## Using a Deployed Scheduler (Azure)

To use a Durable Task Scheduler in Azure instead of the emulator:

1. Update `local.settings.json`:
```json
{
"Values": {
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://<your-scheduler>.durabletask.io;Authentication=ManagedIdentity"
}
}
```

2. Run the sample using the same commands above.

## Related Samples

- [WorkItemFilteringSplitActivities](../../../scenarios/WorkItemFilteringSplitActivities/) — Multi-worker scenario using Durable Task SDK
- [Fan-out/Fan-in (Python)](../../python/fan-out-fan-in/) — Fan-out pattern in Python
- [HelloCities (.NET)](../HelloCities/) — Basic Durable Functions quickstart

## Learn More

- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-task-hubs)
- [Durable Functions patterns](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.16.3" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" OutputItemType="Analyzer" />
</ItemGroup>
</Project>
19 changes: 19 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.AzureStorage": "Warning",
"DurableTask.Core": "Warning"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
"workItemFilteringEnabled": true
}
}
}
}
Loading