feat: add async support to MemorySessionManager#478
Conversation
✅ No Breaking Changes DetectedNo public API breaking changes found in this PR. |
c11771c to
228790d
Compare
Hweinstock
left a comment
There was a problem hiding this comment.
changes LGTM! I think an integ test for this flag would be awesome (perhaps as a follow-up), and perhaps worth getting @jariy17 to take a look.
| "Sync invocation will raise RuntimeError from Strands' hook registry." | ||
| ) | ||
|
|
||
| registry.add_callback(AgentInitializedEvent, lambda event: self.initialize(event.agent)) |
There was a problem hiding this comment.
note (for my own understanding): because this path doesn't call RepositorySessionManager.register_hooks, we must manually register the initialize hook: https://github.com/strands-agents/sdk-python/blob/main/src/strands/session/session_manager.py#L43.
In other words, we pick this synchronous hook from that implementation and leave out the rest to overwrite them with our own async hooks.
| if self.config.batch_size > 1: | ||
| registry.add_callback(AfterInvocationEvent, lambda event: self._flush_messages()) | ||
|
|
||
| async def _on_after_invocation_flush(event: AfterInvocationEvent) -> None: |
There was a problem hiding this comment.
nit: could we add a small helper to reduce boilerplate here?
jariy17
left a comment
There was a problem hiding this comment.
Everything is good but look into the _flush_agent_state bug and missing bidi hooks.
| RepositorySessionManager.register_hooks(self, registry, **kwargs) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) | ||
| if not self.config.async_mode: | ||
| RepositorySessionManager.register_hooks(self, registry, **kwargs) |
There was a problem hiding this comment.
note: We're manually registering hooks in the async path instead of going through RepositorySessionManager.register_hooks(), so if strands adds new hooks upstream, we won't pick them up here.
| """ | ||
| RepositorySessionManager.register_hooks(self, registry, **kwargs) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) | ||
| if not self.config.async_mode: |
There was a problem hiding this comment.
Nit: I prefer if self.config.async_mode: — easier to read when the primary condition isn't negated.
|
|
||
| registry.add_callback(MultiAgentInitializedEvent, _on_multi_agent_initialized) | ||
| registry.add_callback(AfterNodeCallEvent, _on_after_node_call) | ||
| registry.add_callback(AfterMultiAgentInvocationEvent, _on_after_multi_agent_invocation) |
There was a problem hiding this comment.
nit: Do we support BidiAgent? The sync path picks up BidiAgent hooks through the parent, but async mode doesn't register them. Should we add them here or explicitly document that BidiAgent + async_mode is unsupported?
| logger.info("Flushed %d message events to AgentCore Memory", len(results)) | ||
| return results | ||
|
|
||
| def _flush_agent_states_only(self) -> list[dict[str, Any]]: |
There was a problem hiding this comment.
This function copies the buffer under one lock acquisition (L1142-1143), does network I/O, then clears it under a separate lock acquisition (L1175-1176). Anything appended
between the copy and the clear gets destroyed without ever being sent.
With async mode, multiple to_thread workers can be calling create_agent concurrently, which makes the window between copy and clear much easier to hit. Let's copy the same locking mechanism in _flush_messages_only() where it sends and clears under the same lock.
There was a problem hiding this comment.
great find, agree we should copy and clear within the same lock acquire.
Issue #, if available:
#452
Description of changes:
async_mode: bool = Falseon AgentCoreMemoryConfig. Opt-in; default preserves existing behavior.register_hooksinstalls async callbacks that wrap the existing sync methods inasyncio.to_thread()register_hookstime whenasync_mode=Truepointing users tostream_async/invoke_async— sync invocation will raise RuntimeError from Strands.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.