28 commits
8212945
Add gRPC resiliency design spec
Apr 24, 2026
8b25b56
Ignore local worktrees
Apr 24, 2026
fd07d1a
Add gRPC resiliency implementation plan
Apr 24, 2026
774004a
Add gRPC resiliency option types
Apr 24, 2026
4e8d0bb
Add grpc resiliency validation tests
Apr 24, 2026
c09def6
Thread gRPC resiliency options through constructors
Apr 24, 2026
a6bc855
Strengthen retained client state tests
Apr 24, 2026
a2ff52b
Add shared gRPC resiliency helpers
Apr 24, 2026
c049899
Add completion long-poll resiliency test
Apr 24, 2026
19e3d71
Add grpc resiliency edge-case tests
Apr 24, 2026
008be65
Harden worker gRPC stream reconnect behavior
Apr 24, 2026
c4a98e9
Fix worker channel cleanup on teardown
Apr 24, 2026
950892b
Add worker silent disconnect tests
Apr 24, 2026
09697bb
Add sync client gRPC channel recreation
Apr 24, 2026
d89bc10
Reset sync client long-poll failure tracking
Apr 24, 2026
937231d
Add sync client recreation input test
Apr 24, 2026
2834177
Add sync client recreation test coverage
Apr 24, 2026
4f4b3a3
Add async client gRPC channel recreation
Apr 24, 2026
762b247
Add async channel recreation transport test
Apr 24, 2026
2363926
Add gRPC connection resiliency
Apr 24, 2026
2401224
Remove repo-wide pytest importlib addopts
Apr 24, 2026
bbaf2b0
Update gRPC resiliency plan tracking
Apr 24, 2026
72191f1
Fix worker channel retirement for in-flight completions
Apr 24, 2026
a49f5ce
Fix worker shutdown channel draining
Apr 24, 2026
32f383d
Rename Azure Managed gRPC resiliency test module
Apr 24, 2026
b6f3b43
Fix sync client channel cleanup
Apr 24, 2026
1f9545f
Address automated review feedback
Apr 24, 2026
bdb37e2
Remove superpowers docs from PR
Apr 24, 2026
4 changes: 3 additions & 1 deletion .gitignore
@@ -131,5 +131,7 @@ dmypy.json

# IDEs
.idea
.worktrees/
docs/superpowers/

coverage.lcov
coverage.lcov
28 changes: 26 additions & 2 deletions CHANGELOG.md
@@ -16,8 +16,32 @@ ADDED
`TaskHubGrpcClient`, `AsyncTaskHubGrpcClient`, and `TaskHubGrpcWorker` to
support pre-configured channel passthrough and low-level gRPC channel
customization.
- Added `get_orchestration_history()` and `list_instance_ids()` to the sync and async gRPC clients.
- Added in-memory backend support for `StreamInstanceHistory` and `ListInstanceIds` so local orchestration tests can retrieve history and page terminal instance IDs by completion window.
- Added `GrpcWorkerResiliencyOptions` and `GrpcClientResiliencyOptions`, plus
`resiliency_options` constructor parameters on `TaskHubGrpcClient`,
`AsyncTaskHubGrpcClient`, and `TaskHubGrpcWorker`, to configure hello
deadlines, silent-disconnect detection, reconnect backoff, and channel
recreation thresholds for SDK-managed gRPC connections.
- Added `get_orchestration_history()` and `list_instance_ids()` to the sync
and async gRPC clients.
- Added in-memory backend support for `StreamInstanceHistory` and
`ListInstanceIds` so local orchestration tests can retrieve history and page
terminal instance IDs by completion window.

FIXED

- Improved `TaskHubGrpcWorker` recovery from stale or disconnected gRPC streams
so configured hello timeouts apply on fresh connections, received work resets
failure tracking, SDK-owned channels are refreshed and cleaned up safely, and
caller-owned channels are never recreated or closed during reconnects.
- Fixed `TaskHubGrpcWorker` so in-flight and queued work item completions keep
draining across graceful gRPC stream resets and worker shutdown before the
worker retires an SDK-owned channel.
- Improved sync and async gRPC clients so repeated transport failures recreate
SDK-owned channels, while long-poll deadlines, successful replies, and
application-level RPC errors do not trigger unnecessary channel replacement.
- Fixed `TaskHubGrpcClient.close()` so explicit sync client shutdown now closes
any previously retired SDK-owned gRPC channels immediately instead of waiting
for the delayed cleanup timer.

## v1.4.0

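The client-side FIXED entry above distinguishes transport failures, which count toward channel recreation, from long-poll deadlines, successful replies, and application-level RPC errors, which do not. A minimal sketch of that bookkeeping follows. `Outcome`, `FailureTracker`, and `threshold` are hypothetical names for illustration, not the SDK's types, and leaving the counter unchanged on a long-poll deadline is an assumption of this sketch rather than confirmed behavior.

```python
from dataclasses import dataclass
from enum import Enum, auto


class Outcome(Enum):
    """Hypothetical classification of a finished RPC (not an SDK type)."""
    SUCCESS = auto()
    TRANSPORT_FAILURE = auto()   # e.g. the connection dropped or was refused
    LONG_POLL_DEADLINE = auto()  # expected timeout on a long-poll call
    APPLICATION_ERROR = auto()   # the server answered with an RPC-level error


@dataclass
class FailureTracker:
    """Hypothetical helper that counts consecutive transport failures."""
    threshold: int = 3
    consecutive_failures: int = 0

    def record(self, outcome: Outcome) -> bool:
        """Return True when the caller should recreate its SDK-owned channel."""
        if outcome is Outcome.TRANSPORT_FAILURE:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.threshold:
                # Start counting afresh for the replacement channel.
                self.consecutive_failures = 0
                return True
            return False
        if outcome is Outcome.LONG_POLL_DEADLINE:
            # Assumption: an expected long-poll timeout neither counts as a
            # failure nor proves the channel healthy, so leave the count alone.
            return False
        # A success or an application-level error means the server replied,
        # so the transport works and the failure streak is over.
        self.consecutive_failures = 0
        return False
```

With `threshold=2`, two transport failures separated only by a long-poll deadline would request recreation, while a successful reply in between would reset the streak.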
4 changes: 4 additions & 0 deletions durabletask-azuremanaged/CHANGELOG.md
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`DurableTaskSchedulerClient`, `AsyncDurableTaskSchedulerClient`, and
`DurableTaskSchedulerWorker` to allow combining custom gRPC interceptors with
DTS defaults and to support pre-configured/customized gRPC channels.
- Added pass-through `resiliency_options` support on
`DurableTaskSchedulerClient`, `AsyncDurableTaskSchedulerClient`, and
`DurableTaskSchedulerWorker` so Azure Managed applications can use the core
SDK's gRPC resiliency option types through their constructors.
- Added `workerid` gRPC metadata on Durable Task Scheduler worker calls for
improved worker identity and observability.
- Improved sync access token refresh concurrency handling to avoid duplicate
11 changes: 10 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/client.py
@@ -15,7 +15,10 @@
DTSDefaultClientInterceptorImpl,
)
from durabletask.client import AsyncTaskHubGrpcClient, TaskHubGrpcClient
from durabletask.grpc_options import GrpcChannelOptions
from durabletask.grpc_options import (
GrpcChannelOptions,
GrpcClientResiliencyOptions,
)
import durabletask.internal.shared as shared
from durabletask.payload.store import PayloadStore

@@ -30,6 +33,7 @@ def __init__(self, *,
secure_channel: bool = True,
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
channel_options: Optional[GrpcChannelOptions] = None,
resiliency_options: Optional[GrpcClientResiliencyOptions] = None,
default_version: Optional[str] = None,
payload_store: Optional[PayloadStore] = None,
log_handler: Optional[logging.Handler] = None,
@@ -54,6 +58,7 @@ def __init__(self, *,
log_formatter=log_formatter,
interceptors=resolved_interceptors,
channel_options=channel_options,
resiliency_options=resiliency_options,
default_version=default_version,
payload_store=payload_store)

@@ -74,6 +79,8 @@ class AsyncDurableTaskSchedulerClient(AsyncTaskHubGrpcClient):
If None, anonymous authentication will be used.
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
Defaults to True.
resiliency_options (Optional[GrpcClientResiliencyOptions], optional): Client-side
gRPC resiliency settings forwarded to the base async client.
default_version (Optional[str], optional): Default version string for orchestrations.
payload_store (Optional[PayloadStore], optional): A payload store for
externalizing large payloads. If None, payloads are sent inline.
@@ -104,6 +111,7 @@ def __init__(self, *,
secure_channel: bool = True,
interceptors: Optional[Sequence[shared.AsyncClientInterceptor]] = None,
channel_options: Optional[GrpcChannelOptions] = None,
resiliency_options: Optional[GrpcClientResiliencyOptions] = None,
default_version: Optional[str] = None,
payload_store: Optional[PayloadStore] = None,
log_handler: Optional[logging.Handler] = None,
@@ -128,5 +136,6 @@ def __init__(self, *,
log_formatter=log_formatter,
interceptors=resolved_interceptors,
channel_options=channel_options,
resiliency_options=resiliency_options,
default_version=default_version,
payload_store=payload_store)
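The client.py hunks above add a keyword-only `resiliency_options` parameter and forward it unchanged to the base constructor. A self-contained sketch of that pass-through pattern follows. It uses stand-in classes only: `ResiliencyOptions`, its `channel_recreate_failure_threshold` field, `BaseClient`, and `ManagedClient` are hypothetical and do not mirror the real option fields or constructor signatures.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResiliencyOptions:
    """Stand-in options type; the field name is hypothetical."""
    channel_recreate_failure_threshold: int = 3


class BaseClient:
    """Stand-in for a base gRPC client that owns the resiliency behavior."""

    def __init__(self, *, host_address: str,
                 resiliency_options: Optional[ResiliencyOptions] = None):
        self.host_address = host_address
        # None means "use the defaults", so callers can omit the argument.
        if resiliency_options is None:
            resiliency_options = ResiliencyOptions()
        self.resiliency_options = resiliency_options


class ManagedClient(BaseClient):
    """Stand-in for a wrapper that adds its own parameters on top."""

    def __init__(self, *, host_address: str, taskhub: str,
                 resiliency_options: Optional[ResiliencyOptions] = None):
        self.taskhub = taskhub
        # Forward the value untouched; defaulting stays in the base class.
        super().__init__(host_address=host_address,
                         resiliency_options=resiliency_options)


client = ManagedClient(
    host_address="localhost:4001",
    taskhub="default",
    resiliency_options=ResiliencyOptions(channel_recreate_failure_threshold=5),
)
```

Keeping the default handling in the base class means the wrapper never needs to know what the defaults are, which is one reason to forward `None` as-is.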
9 changes: 8 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/worker.py
@@ -13,7 +13,10 @@

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
DTSDefaultClientInterceptorImpl
from durabletask.grpc_options import GrpcChannelOptions
from durabletask.grpc_options import (
GrpcChannelOptions,
GrpcWorkerResiliencyOptions,
)
import durabletask.internal.shared as shared
from durabletask.payload.store import PayloadStore
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
@@ -34,6 +37,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
If None, anonymous authentication will be used.
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
Defaults to True.
resiliency_options (Optional[GrpcWorkerResiliencyOptions], optional): Worker-side
gRPC resiliency settings forwarded to the base worker.
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
for controlling worker concurrency limits. If None, default concurrency
settings will be used.
@@ -74,6 +79,7 @@ def __init__(self, *,
secure_channel: bool = True,
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
channel_options: Optional[GrpcChannelOptions] = None,
resiliency_options: Optional[GrpcWorkerResiliencyOptions] = None,
concurrency_options: Optional[ConcurrencyOptions] = None,
payload_store: Optional[PayloadStore] = None,
log_handler: Optional[logging.Handler] = None,
Expand Down Expand Up @@ -101,6 +107,7 @@ def __init__(self, *,
log_formatter=log_formatter,
interceptors=resolved_interceptors,
channel_options=channel_options,
resiliency_options=resiliency_options,
concurrency_options=concurrency_options,
# DTS natively supports long timers so chunking is unnecessary
maximum_timer_interval=None,
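The worker options described in the CHANGELOG include reconnect backoff. The diff shown here does not include the worker's actual backoff code, so the following is only a generic sketch of capped exponential backoff with jitter. `reconnect_delay` and its parameters `base`, `cap`, and `jitter` are hypothetical names, not SDK settings.

```python
import random
from typing import Optional


def reconnect_delay(attempt: int, *, base: float = 1.0, cap: float = 30.0,
                    jitter: float = 0.2,
                    rng: Optional[random.Random] = None) -> float:
    """Return the seconds to wait before reconnect attempt number `attempt`.

    Hypothetical helper: the delay doubles per attempt, is capped at `cap`,
    and is then reduced by a random fraction of up to `jitter`.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    if not 0.0 <= jitter <= 1.0:
        raise ValueError("jitter must be between 0 and 1")
    if rng is None:
        rng = random.Random()
    # Clamp the exponent so a very long outage cannot overflow the float math.
    ceiling = min(cap, base * (2 ** min(attempt, 32)))
    # Spread workers out so they do not all reconnect at the same instant.
    return rng.uniform(ceiling * (1.0 - jitter), ceiling)
```

A caller would typically reset `attempt` to zero once the stream delivers work again, in the spirit of the changelog note that received work resets failure tracking.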