Component manager shutdown can leak formula actors if a cache stop fails #1412

@shsms

Manager shutdown can still leak formula actors if a cache stop fails

Severity: Medium
Location: src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_component_manager.py, _stop_all_unreachable_power_subscriptions()

The single-subscription teardown path has been fixed correctly: _stop_unreachable_power_subscription() uses try/finally, so the pooled formula is stopped even if subscription.cache.stop() raises.
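
To make the single-subscription guarantee concrete, here is a minimal sketch of the try/finally pattern. `FakeCache`, `FakeFormula`, and `stop_one()` are hypothetical stand-ins, not the SDK's real types or the actual body of _stop_unreachable_power_subscription(). The point is only that the formula's stop() still runs when the cache's stop() raises, and that the error still propagates to the caller.

```python
from __future__ import annotations

import asyncio


class FakeCache:
    """Hypothetical cache stand-in whose stop() always raises."""

    async def stop(self) -> None:
        raise RuntimeError("cache stop failed")


class FakeFormula:
    """Hypothetical pooled-formula stand-in that records whether it was stopped."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


async def stop_one(cache: FakeCache, formula: FakeFormula) -> None:
    """Single-subscription teardown: stop the cache, then always the formula."""
    try:
        await cache.stop()
    finally:
        # Runs even though cache.stop() raised; the error then propagates.
        await formula.stop()


async def main() -> tuple[bool, str]:
    formula = FakeFormula()
    try:
        await stop_one(FakeCache(), formula)
    except RuntimeError as err:
        return formula.stopped, str(err)
    return formula.stopped, "no error"


print(asyncio.run(main()))  # (True, 'cache stop failed')
```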

But the bulk shutdown path still does this:

```python
async def _stop_all_unreachable_power_subscriptions(self) -> None:
    """Stop every unreachable-power subscription and the owned formula pool."""
    for subscription in self._unreachable_power_subscriptions.values():
        await subscription.cache.stop()
    self._unreachable_power_subscriptions.clear()
    if self._unreachable_power_pool is not None:
        await self._unreachable_power_pool.stop()
        self._unreachable_power_pool = None
```

If any subscription.cache.stop() raises, the method exits immediately: the remaining caches are not stopped, the subscriptions dict is not cleared, and, more importantly, self._unreachable_power_pool.stop() never runs. The formula-evaluating actors owned by the pool can therefore keep running after manager shutdown. This is the same class of leak the PR is trying to fix, just in the manager-level teardown path.
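
The leak can be reproduced outside the SDK with a minimal sketch that mirrors the shape of the bulk teardown above. `FakeCache`, `FakePool`, and `LeakyManager` are hypothetical stand-ins, and subscriptions are simplified to map directly to caches. With the first cache failing, neither the pool nor the second cache is stopped, and the subscriptions are never cleared.

```python
from __future__ import annotations

import asyncio


class FakeCache:
    """Hypothetical cache stand-in; stop() raises when `fail` is True."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.stopped = False

    async def stop(self) -> None:
        if self.fail:
            raise RuntimeError("cache stop failed")
        self.stopped = True


class FakePool:
    """Hypothetical FormulaPool stand-in that records whether stop() ran."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


class LeakyManager:
    """Hypothetical manager mirroring the current bulk teardown shape."""

    def __init__(self, caches: list[FakeCache]) -> None:
        # Simplification: subscriptions map directly to caches.
        self._subscriptions = dict(enumerate(caches))
        self._pool: FakePool | None = FakePool()

    async def stop_all(self) -> None:
        for cache in self._subscriptions.values():
            await cache.stop()  # the first failure aborts the method here
        self._subscriptions.clear()
        if self._pool is not None:
            await self._pool.stop()
            self._pool = None


async def main() -> tuple[bool, bool, int]:
    second = FakeCache(fail=False)
    manager = LeakyManager([FakeCache(fail=True), second])
    pool = manager._pool
    assert pool is not None
    try:
        await manager.stop_all()
    except RuntimeError:
        pass
    # Pool not stopped, later cache not stopped, subscriptions not cleared.
    return pool.stopped, second.stopped, len(manager._subscriptions)


print(asyncio.run(main()))  # (False, False, 2)
```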

Impact

During normal stop/teardown of BatteryManager or PVManager, a failure in one cache's stop() can prevent the owned FormulaPool from being stopped. Both managers call _stop_all_unreachable_power_subscriptions() from stop(): the battery manager after stopping its component caches and the status tracker, the PV manager after stopping its inverter caches and the status tracker.

In practice, this can leave formula actors and resampler subscriptions running after a component manager has been stopped. If managers are recreated and the failure recurs, the leaked actors accumulate, so CPU and memory usage can still grow over time.

Suggested fix

Make the pool cleanup happen in a finally, and avoid losing later cleanup because one cache failed. For example:

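```python
async def _stop_all_unreachable_power_subscriptions(self) -> None:
    """Stop every unreachable-power subscription and the owned formula pool."""
    # Detach all subscriptions up front so a failure cannot leave stale entries.
    subscriptions = list(self._unreachable_power_subscriptions.values())
    self._unreachable_power_subscriptions.clear()

    cache_stop_errors: list[BaseException] = []
    try:
        for subscription in subscriptions:
            try:
                await subscription.cache.stop()
            except BaseException as err:
                # Keep going: one failing cache must not skip the others.
                cache_stop_errors.append(err)
    finally:
        # Safety net: stop the pool no matter how the loop exits.
        if self._unreachable_power_pool is not None:
            try:
                await self._unreachable_power_pool.stop()
            finally:
                self._unreachable_power_pool = None

    # Surface the first cache-stop failure only after the pool is stopped.
    if cache_stop_errors:
        raise cache_stop_errors[0]
```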

A variant using asyncio.gather(..., return_exceptions=True) would also be fine, as long as the formula pool is always stopped and an error is still surfaced.
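
A minimal sketch of the asyncio.gather(..., return_exceptions=True) variant follows, again using hypothetical stand-in classes (`FakeCache`, `FakePool`, `GatherManager`) instead of the real manager. The pool is stopped in a finally, and the first cache-stop failure is re-raised afterwards.

```python
from __future__ import annotations

import asyncio


class FakeCache:
    """Hypothetical cache stand-in; stop() raises when `fail` is True."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.stopped = False

    async def stop(self) -> None:
        if self.fail:
            raise RuntimeError("cache stop failed")
        self.stopped = True


class FakePool:
    """Hypothetical FormulaPool stand-in that records whether stop() ran."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


class GatherManager:
    """Hypothetical manager; only the teardown logic is of interest."""

    def __init__(self, caches: list[FakeCache]) -> None:
        self._subscriptions = dict(enumerate(caches))
        self._pool: FakePool | None = FakePool()

    async def stop_all(self) -> None:
        caches = list(self._subscriptions.values())
        self._subscriptions.clear()
        try:
            # return_exceptions=True lets every stop() run to completion;
            # failures come back as values instead of aborting gather().
            results = await asyncio.gather(
                *(cache.stop() for cache in caches), return_exceptions=True
            )
        finally:
            # Runs even if gather() itself is interrupted (e.g. cancelled).
            if self._pool is not None:
                try:
                    await self._pool.stop()
                finally:
                    self._pool = None
        errors = [r for r in results if isinstance(r, BaseException)]
        if errors:
            raise errors[0]  # surface the first cache-stop failure


async def main() -> tuple[bool, bool, bool, str]:
    second = FakeCache(fail=False)
    manager = GatherManager([FakeCache(fail=True), second])
    pool = manager._pool
    assert pool is not None
    try:
        await manager.stop_all()
    except RuntimeError as err:
        return pool.stopped, second.stopped, manager._pool is None, str(err)
    return pool.stopped, second.stopped, manager._pool is None, "no error"


print(asyncio.run(main()))  # (True, True, True, 'cache stop failed')
```

One design difference: gather() runs all cache stops concurrently, whereas the loop in the suggested fix stops them one at a time. If cache teardown order ever matters, the sequential version is the safer choice.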

Suggested test

Add a test parallel to the existing single-subscription failure test, but exercise manager.stop() / _stop_all_unreachable_power_subscriptions():

  1. Create two unreachable subscriptions.
  2. Make the first LatestValueCache.stop() raise.
  3. Assert the owned pool’s stop() is still called.
  4. Assert the pool reference is cleared or otherwise left in a retryable, well-defined state.
  5. Assert the original cache-stop error is not silently swallowed.
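
The steps above can be sketched as a self-contained test. It does not use the SDK: `FailingCache`, `RecordingCache`, `RecordingPool`, and `Manager` are hypothetical fakes, and `Manager.stop_all()` reproduces the logic of the suggested fix so the assertions map onto steps 1-5. A real test would instead drive BatteryManager or PVManager through stop(), presumably reusing whatever setup the existing single-subscription failure test uses.

```python
from __future__ import annotations

import asyncio


class FailingCache:
    """Hypothetical cache whose stop() always raises (step 2)."""

    async def stop(self) -> None:
        raise RuntimeError("boom")


class RecordingCache:
    """Hypothetical cache that records that stop() was called."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


class RecordingPool:
    """Hypothetical FormulaPool stand-in counting stop() calls."""

    def __init__(self) -> None:
        self.stop_calls = 0

    async def stop(self) -> None:
        self.stop_calls += 1


class Manager:
    """Hypothetical manager whose teardown follows the suggested fix."""

    def __init__(
        self, caches: list[FailingCache | RecordingCache], pool: RecordingPool
    ) -> None:
        self._subscriptions = dict(enumerate(caches))
        self._pool: RecordingPool | None = pool

    async def stop_all(self) -> None:
        caches = list(self._subscriptions.values())
        self._subscriptions.clear()
        errors: list[BaseException] = []
        try:
            for cache in caches:
                try:
                    await cache.stop()
                except BaseException as err:
                    errors.append(err)
        finally:
            if self._pool is not None:
                try:
                    await self._pool.stop()
                finally:
                    self._pool = None
        if errors:
            raise errors[0]


def test_pool_stopped_when_a_cache_stop_fails() -> None:
    pool = RecordingPool()
    second = RecordingCache()
    # Steps 1-2: two subscriptions, the first cache's stop() raises.
    manager = Manager([FailingCache(), second], pool)

    raised: BaseException | None = None
    try:
        asyncio.run(manager.stop_all())
    except RuntimeError as err:
        raised = err

    assert pool.stop_calls == 1  # step 3: the pool is still stopped
    assert manager._pool is None  # step 4: the reference is cleared
    assert second.stopped  # later caches are still stopped
    assert not manager._subscriptions  # subscriptions are cleared
    assert isinstance(raised, RuntimeError) and str(raised) == "boom"  # step 5


test_pool_stopped_when_a_cache_stop_fails()
```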

Originally posted by @llucax in #1411 (review)
