Add support for async workflow activities#1053
Draft
seherv wants to merge 2 commits into
Draft
Conversation
seherv
commented
May 25, 2026
| f"Activity '{req.name}#{req.taskId}' result is too large to deliver " | ||
| f'(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}' | ||
| ) | ||
| failure_res = pb.ActivityResponse( |
Contributor
Author
There was a problem hiding this comment.
This nesting got hard to follow, I needed to refactor this file to understand the logic better.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1053 +/- ##
==========================================
- Coverage 86.63% 82.57% -4.06%
==========================================
Files 84 149 +65
Lines 4473 14834 +10361
==========================================
+ Hits 3875 12249 +8374
- Misses 598 2585 +1987 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
Comment on lines
+37
to
+53
| asyncio's **default executor**, which is process-wide and sized to | ||
| `min(32, cpu_count + 4)`. It is *not* `maximum_thread_pool_workers`. | ||
|
|
||
| If the sidecar takes >5 ms to acknowledge and the worker runs >30 concurrent | ||
| async activities, response delivery serializes through the default executor and | ||
| tail latency inflates. Install a larger default executor before starting: | ||
|
|
||
| ```python | ||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(max_workers=200)) | ||
| ``` | ||
|
|
||
| This goes away when the worker migrates to `grpc.aio`. Until then, the default | ||
| executor is a separate knob from `maximum_thread_pool_workers`. | ||
|
|
Comment on lines
+114
to
+121
| - **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. No thread pool involvement. The gRPC response is delivered via `loop.run_in_executor(None, stub.CompleteActivityTask, ...)` (asyncio's default executor). | ||
| - **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread. The thread pool size is controlled by `maximum_thread_pool_workers`. | ||
|
|
||
| Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async. | ||
|
|
||
| **Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them. | ||
|
|
||
| **`maximum_thread_pool_workers` gotcha.** This knob sizes the sync-activity thread pool only. Async-activity response delivery uses asyncio's default executor (process-wide, lazily sized to `min(32, cpu_count + 4)`), which is not capped by this knob. Strict thread-count bounds for async response delivery require calling `asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(max_workers=N))` before `wfr.start()`. A future PR may migrate the worker to `grpc.aio` and remove this caveat by sending responses without any thread pool. |
|
|
||
| **Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them. | ||
|
|
||
| **`maximum_thread_pool_workers` gotcha.** This knob sizes the sync-activity thread pool only. Async-activity response delivery uses asyncio's default executor (process-wide, lazily sized to `min(32, cpu_count + 4)`), which is not capped by this knob. Strict thread-count bounds for async response delivery require calling `asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(max_workers=N))` before `wfr.start()`. A future PR may migrate the worker to `grpc.aio` and remove this caveat by sending responses without any thread pool. |
Comment on lines
+171
to
+176
| class IsAsyncCallableTest(unittest.TestCase): | ||
| """Pin the contract of ``_is_async_callable`` against decorator shapes that bare | ||
| ``inspect.iscoroutinefunction`` would miss. These are the patterns the fix for finding | ||
| #5 was meant to address. Without coverage, a future refactor can silently regress | ||
| async-activity routing for any of them. | ||
| """ |
Comment on lines
+11
to
+16
| - **Timestamp**: 2026-05-25 20:40:09 UTC | ||
| - **Git commit**: `8f13da0-dirty` | ||
| - **Python**: CPython 3.13.12 | ||
| - **OS**: Darwin 25.5.0 (arm64) on Apple M3 Pro (12 logical cores), 36.0 GB | ||
| - **asyncio default executor**: `max_workers=16` (`min(32, cpu_count + 4)`) | ||
| - **CI environment**: no |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Workflow activities can now be async, and the runtime will automatically dispatch them to the event loop. Sync activities are still dispatched to the thread pool. The user-facing API remains exactly the same.
Also added a benchmark suite to verify performance locally.
Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #834 #897 #975
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: