[Fix] Fix race condition with memory consumption by using ray_init_fixture#1787

Open
SumanthRH wants to merge 4 commits into main from fix-moe-test-new-inf

@SumanthRH

Member

What does this PR do?

Quick follow-up to #1737. For context, please see: #1737 (comment)

GPU CI fails with changes in #1737

2026-06-14 05:05:22.796 | INFO     | skyrl.train.utils.utils:print_mem:933 - After offload - Allocated: 0.02 GiB, Reserved: 0.02 GiB, Free: 17.29 GiB, Total: 22.03 GiB
Removing checkpoint directory: /home/ray/ckpts/test/global_step_1
.

=================================== FAILURES ===================================
_____________ test_worker_wrap_load_weights_preserves_moe_forward ______________

    @pytest.mark.asyncio
    async def test_worker_wrap_load_weights_preserves_moe_forward() -> None:
        """Weight sync must not corrupt MoE forward output.
    
        Runs decode -> weight sync -> decode and asserts upon the outputs.
        Decodes are greedy for test determinism/repeatability.
    
        Regression test for https://github.com/NovaSky-AI/SkyRL/issues/1680.
        """
>       engine = vllm.AsyncLLMEngine.from_engine_args(
            vllm.AsyncEngineArgs(
                model=MOE_MODEL,
                # Cap the max seq length per request to fit the below 40% budget of GPU RAM
                max_model_len=4096,
                # `reload_weights` calls `initialize_layerwise_reload` to materialize fresh
                # per-layer GPU buffers; leave headroom for these buffers alongside the model
                gpu_memory_utilization=0.4,
                # Skip CUDAGraph capture as the test only needs two short prefills
                enforce_eager=True,
                worker_extension_cls=f"{WorkerWrapRepro.__module__}.{WorkerWrapRepro.__name__}",
            )
        )

tests/backends/skyrl_train/gpu/gpu_ci/inference_servers/test_weight_sync.py:525: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/async_llm.py:246: in from_engine_args
    return cls(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/async_llm.py:146: in __init__
    self.engine_core = EngineCoreClient.make_async_mp_client(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/tracing/otel.py:178: in sync_wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:130: in make_async_mp_client
    return AsyncMPClient(*client_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/tracing/otel.py:178: in sync_wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:900: in __init__
    super().__init__(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:535: in __init__
    with launch_core_engines(
/home/ray/anaconda3/lib/python3.12/contextlib.py:144: in __exit__
    next(self.gen)
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/utils.py:1119: in launch_core_engines
    wait_for_engine_startup(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

handshake_socket = <zmq.Socket(zmq.ROUTER) at 0x71daddda2ba0 closed>
addresses = EngineZmqAddresses(inputs=['ipc:///tmp/778ab18d-fa09-4f4e-b0b5-90ad93392ed8'], outputs=['ipc:///tmp/5018d6c0-5a9c-4c55-b7aa-2a59e44cd20e'], coordinator_input=None, coordinator_output=None, frontend_stats_publish_address=None)
core_engines = [<vllm.v1.engine.utils.CoreEngine object at 0x71d9f883fd10>]
parallel_config = ParallelConfig(pipeline_parallel_size=1, tensor_parallel_size=1, prefill_context_parallel_size=1, data_parallel_size=1..._comm_backend='ag_rs', cp_kv_cache_interleave_size=1, data_parallel_index=0, _api_process_count=1, _api_process_rank=0)
coordinated_dp = False
cache_config = CacheConfig(block_size=16, user_specified_block_size=False, user_specified_mamba_block_size=False, hash_block_size=Non...=False, kv_cache_memory_bytes=None, kv_offloading_size=None, kv_offloading_backend='native', _block_size_resolved=True)
proc_manager = <vllm.v1.engine.utils.CoreEngineProcManager object at 0x71da703220f0>
coord_process = None

    def wait_for_engine_startup(
        handshake_socket: zmq.Socket,
        addresses: EngineZmqAddresses,
        core_engines: list[CoreEngine],
        parallel_config: ParallelConfig,
        coordinated_dp: bool,
        cache_config: CacheConfig,
        proc_manager: CoreEngineProcManager | None,
        coord_process: Process | None,
    ):
        # Wait for engine core process(es) to send ready messages.
        local_count = parallel_config.data_parallel_size_local
        remote_count = len(core_engines) - local_count
        # [local, remote] counts
        conn_pending, start_pending = [local_count, remote_count], [0, 0]
        poller = zmq.Poller()
        poller.register(handshake_socket, zmq.POLLIN)
    
        remote_should_be_headless = (
            not parallel_config.data_parallel_hybrid_lb
            and not parallel_config.data_parallel_external_lb
        )
    
        if proc_manager is not None:
            for sentinel in proc_manager.sentinels():
                poller.register(sentinel, zmq.POLLIN)
        if coord_process is not None:
            poller.register(coord_process.sentinel, zmq.POLLIN)
        while any(conn_pending) or any(start_pending):
            events = poller.poll(STARTUP_POLL_PERIOD_MS)
            if not events:
                if any(conn_pending):
                    logger.debug(
                        "Waiting for %d local, %d remote core engine proc(s) to connect.",
                        *conn_pending,
                    )
                if any(start_pending):
                    logger.debug(
                        "Waiting for %d local, %d remote core engine proc(s) to start.",
                        *start_pending,
                    )
                continue
            if len(events) > 1 or events[0][0] != handshake_socket:
                # One of the local core processes exited.
                finished = proc_manager.finished_procs() if proc_manager else {}
                if coord_process is not None and coord_process.exitcode is not None:
                    finished[coord_process.name] = coord_process.exitcode
>               raise RuntimeError(
                    "Engine core initialization failed. "
                    "See root cause above. "
                    f"Failed core proc(s): {finished}"
                )
E               RuntimeError: Engine core initialization failed. See root cause above. Failed core proc(s): {}

/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/utils.py:1178: RuntimeError
___________ test_worker_wrap_multichunk_reload_preserves_moe_forward ___________

capfd = <_pytest.capture.CaptureFixture object at 0x71d9f883c2f0>
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x71d9f883e300>

    @pytest.mark.asyncio
    async def test_worker_wrap_multichunk_reload_preserves_moe_forward(
        capfd: pytest.CaptureFixture[str],
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Multi-chunk weight sync through the real CUDA-IPC legacy sender must not corrupt MoE.
    
        Regression test for https://github.com/NovaSky-AI/SkyRL/issues/1680.
        """
        # Test is covering the legacy sender path, so force it here
        monkeypatch.setattr(
            "skyrl.backends.skyrl_train.weight_sync.cuda_ipc_strategy._SKYRL_USE_NEW_INFERENCE",
            False,
        )
    
>       engine = vllm.AsyncLLMEngine.from_engine_args(
            vllm.AsyncEngineArgs(
                model=MOE_MODEL,
                # See rationale for these hypers in the above
                # `test_worker_wrap_load_weights_preserves_moe_forward`
                max_model_len=4096,
                gpu_memory_utilization=0.4,
                enforce_eager=True,
                worker_extension_cls=f"{WorkerWrap.__module__}.{WorkerWrap.__name__}",
            )
        )

tests/backends/skyrl_train/gpu/gpu_ci/inference_servers/test_weight_sync.py:587: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/async_llm.py:246: in from_engine_args
    return cls(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/async_llm.py:146: in __init__
    self.engine_core = EngineCoreClient.make_async_mp_client(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/tracing/otel.py:178: in sync_wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:130: in make_async_mp_client
    return AsyncMPClient(*client_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/tracing/otel.py:178: in sync_wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:900: in __init__
    super().__init__(
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/core_client.py:535: in __init__
    with launch_core_engines(
/home/ray/anaconda3/lib/python3.12/contextlib.py:144: in __exit__
    next(self.gen)
/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/utils.py:1119: in launch_core_engines
    wait_for_engine_startup(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

handshake_socket = <zmq.Socket(zmq.ROUTER) at 0x71d9f8f3ac10 closed>
addresses = EngineZmqAddresses(inputs=['ipc:///tmp/bc9761db-b067-4d1a-b2be-4c82e107e7f4'], outputs=['ipc:///tmp/f38e312f-ed57-4d57-9b09-e9e6457602d6'], coordinator_input=None, coordinator_output=None, frontend_stats_publish_address=None)
core_engines = [<vllm.v1.engine.utils.CoreEngine object at 0x71d9f88b6180>]
parallel_config = ParallelConfig(pipeline_parallel_size=1, tensor_parallel_size=1, prefill_context_parallel_size=1, data_parallel_size=1..._comm_backend='ag_rs', cp_kv_cache_interleave_size=1, data_parallel_index=0, _api_process_count=1, _api_process_rank=0)
coordinated_dp = False
cache_config = CacheConfig(block_size=16, user_specified_block_size=False, user_specified_mamba_block_size=False, hash_block_size=Non...=False, kv_cache_memory_bytes=None, kv_offloading_size=None, kv_offloading_backend='native', _block_size_resolved=True)
proc_manager = <vllm.v1.engine.utils.CoreEngineProcManager object at 0x71d9f88b57f0>
coord_process = None

    def wait_for_engine_startup(
        handshake_socket: zmq.Socket,
        addresses: EngineZmqAddresses,
        core_engines: list[CoreEngine],
        parallel_config: ParallelConfig,
        coordinated_dp: bool,
        cache_config: CacheConfig,
        proc_manager: CoreEngineProcManager | None,
        coord_process: Process | None,
    ):
        # Wait for engine core process(es) to send ready messages.
        local_count = parallel_config.data_parallel_size_local
        remote_count = len(core_engines) - local_count
        # [local, remote] counts
        conn_pending, start_pending = [local_count, remote_count], [0, 0]
        poller = zmq.Poller()
        poller.register(handshake_socket, zmq.POLLIN)
    
        remote_should_be_headless = (
            not parallel_config.data_parallel_hybrid_lb
            and not parallel_config.data_parallel_external_lb
        )
    
        if proc_manager is not None:
            for sentinel in proc_manager.sentinels():
                poller.register(sentinel, zmq.POLLIN)
        if coord_process is not None:
            poller.register(coord_process.sentinel, zmq.POLLIN)
        while any(conn_pending) or any(start_pending):
            events = poller.poll(STARTUP_POLL_PERIOD_MS)
            if not events:
                if any(conn_pending):
                    logger.debug(
                        "Waiting for %d local, %d remote core engine proc(s) to connect.",
                        *conn_pending,
                    )
                if any(start_pending):
                    logger.debug(
                        "Waiting for %d local, %d remote core engine proc(s) to start.",
                        *start_pending,
                    )
                continue
            if len(events) > 1 or events[0][0] != handshake_socket:
                # One of the local core processes exited.
                finished = proc_manager.finished_procs() if proc_manager else {}
                if coord_process is not None and coord_process.exitcode is not None:
                    finished[coord_process.name] = coord_process.exitcode
>               raise RuntimeError(
                    "Engine core initialization failed. "
                    "See root cause above. "
                    f"Failed core proc(s): {finished}"
                )
E               RuntimeError: Engine core initialization failed. See root cause above. Failed core proc(s): {}

/home/ray/.cache/uv/builds-v0/.tmpQN77ho/lib/python3.12/site-packages/vllm/v1/engine/utils.py:1178: RuntimeError
=============================== warnings summary ===

The reason is that the new tests don't have proper init/teardown. The previous test still occupies GPU memory when the next MoE test starts. Temporary buffers or other transient allocations from the previous test are released while the second test is running memory profiling. vLLM does not permit free memory on the worker to change while it is profiling, so it errors out.
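To make the race easier to picture, here is a toy simulation in plain Python. It is only a sketch: `FakeGpu` and `profile_with_stable_memory_check` are hypothetical names, the sizes are made up, and the check is a simplified model of the behavior described above, not vLLM's actual profiling code.

```python
from dataclasses import dataclass


@dataclass
class FakeGpu:
    """Toy stand-in for a GPU shared by several processes (sizes in GiB)."""

    total: float
    used_by_others: float = 0.0
    used_by_us: float = 0.0

    @property
    def free(self) -> float:
        return self.total - self.used_by_others - self.used_by_us


def profile_with_stable_memory_check(gpu: FakeGpu, during_profile=None) -> float:
    """Hypothetical profiling step that rejects free-memory changes it did not cause."""
    free_before = gpu.free
    gpu.used_by_us += 2.0  # pretend the profiling run allocates activations
    if during_profile is not None:
        during_profile(gpu)  # another process acts while we are profiling
    gpu.used_by_us -= 2.0  # profiling buffers are released again
    free_after = gpu.free
    if free_after != free_before:
        raise RuntimeError(
            f"free memory changed from {free_before:.2f} to {free_after:.2f} GiB "
            "during profiling"
        )
    return free_after


def late_teardown(gpu: FakeGpu) -> None:
    # The previous test finally releases its leftover buffers mid-profile.
    gpu.used_by_others -= 3.0


# No interference: profiling succeeds.
stable_free = profile_with_stable_memory_check(FakeGpu(total=22.0, used_by_others=4.0))

# Leftover memory from the previous test is freed mid-profile: profiling fails.
try:
    profile_with_stable_memory_check(FakeGpu(total=22.0, used_by_others=4.0), late_teardown)
    race_error = None
except RuntimeError as exc:
    race_error = str(exc)
```

In this model the failure depends purely on timing: the same test passes when the previous test's memory is fully released before profiling starts.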

The fix is to use ray_init_fixture for proper init and teardown. I also separated out the tests into a standalone module because it's a specific regression test.
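As a rough illustration of what "proper init and teardown" buys us, the sketch below uses a yield-style context manager, which has the same shape as a yield-based pytest fixture. `cluster_fixture`, `run_test`, and `held_gib` are hypothetical names for this example; this is not the implementation of `ray_init_fixture`.

```python
from contextlib import contextmanager

# Toy registry standing in for GPU memory held by live workers (GiB).
held_gib = {}


@contextmanager
def cluster_fixture(name):
    """Hypothetical yield-style fixture: set up before the test, always tear down after."""
    held_gib[name] = 0.0
    try:
        yield name
    finally:
        # Runs even if the test body raises, so nothing leaks into the next test.
        held_gib.pop(name, None)


def run_test(name, gib, fail=False):
    with cluster_fixture(name) as handle:
        held_gib[handle] = gib  # the test "allocates GPU memory"
        if fail:
            raise AssertionError("test failed")


run_test("first_moe_test", 9.0)
leftover_after_pass = dict(held_gib)

try:
    run_test("second_moe_test", 9.0, fail=True)
except AssertionError:
    pass
leftover_after_fail = dict(held_gib)
```

Because teardown sits in the `finally` branch, the registry is empty after both the passing and the failing test, so the next test starts from a clean state.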

I don't have push access to the contributor's repo and they don't have access either, so I'm adding the fixes in this follow-up PR.

Signed-off-by: SumanthRH <sumanthrh99@gmail.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request refactors the weight synchronization tests by moving the Mixture of Experts (MoE) model weight sync tests from test_weight_sync.py into a dedicated test file test_weight_sync_moe.py. Feedback on the new test file highlights that the vllm.AsyncLLMEngine and torch.distributed process groups are initialized but never properly shut down or destroyed, which can lead to GPU memory leaks and background process pollution. It is recommended to wrap these test routines in try...finally blocks to ensure proper resource cleanup.
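A minimal sketch of the suggested `try...finally` shape inside an async test body follows. `FakeEngine` and its `shutdown` method are hypothetical stand-ins used for illustration; the real engine's cleanup call is not shown in this thread and may differ.

```python
import asyncio


class FakeEngine:
    """Hypothetical stand-in for an inference engine that owns GPU memory."""

    live = 0  # number of engines that have not been shut down yet

    def __init__(self):
        FakeEngine.live += 1

    async def generate(self, prompt):
        if not prompt:
            raise ValueError("empty prompt")
        return prompt.upper()

    def shutdown(self):
        FakeEngine.live -= 1


async def decode_once(prompt):
    engine = FakeEngine()
    try:
        return await engine.generate(prompt)
    finally:
        # Always runs, whether generate() returned or raised.
        engine.shutdown()


ok_output = asyncio.run(decode_once("hi"))

try:
    asyncio.run(decode_once(""))
except ValueError:
    pass
live_after_failure = FakeEngine.live
```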

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

x
Signed-off-by: SumanthRH <sumanthrh99@gmail.com>
@SumanthRH

Member Author

Triggered a GPU CI run here

…e_resume

Signed-off-by: SumanthRH <sumanthrh99@gmail.com>
Signed-off-by: SumanthRH <sumanthrh99@gmail.com>
@SumanthRH

Member Author

I encountered some more errors while running GPU CI. I had an agent categorize them:

| # | Test | Type | Root error |
|---|---|---|---|
| 1 | `integrations/test_pd_routing.py::test_pd_routing_verification` | FAILED | `RuntimeError: NIXL is not available` |
| 2 | `test_engine_generation.py::test_pd_generation` | FAILED | `RuntimeError: NIXL is not available` |
| 3 | `test_engine_generation.py::test_pd_generation_non_colocated[1P1D_non_colocated]` | FAILED | `RuntimeError: NIXL is not available` |
| 4 | `test_weight_sync.py::TestWeightUpdateFlow::test_update_weights_flow[pd_1P1D_non_colocated]` | ERROR (setup) | `RuntimeError: NIXL is not available` |
| 5 | `test_skyrl_gym_generator.py::test_generator_formatting_no_use_conversation_multi_turn[unsloth/Llama-3.2-1B-Instruct]` | FAILED | `assert 0 == 1` (no EOS token in loss-masked-in response) |
| 6 | `test_inference_server_group.py::TestServerGroupAndRouter::test_pause_resume` | ERROR (teardown) | `RuntimeError: There is no current event loop in thread 'MainThread'` |

Two fixes were needed; the third failure is a flaky test:

  1. `NIXL is not available`: This is an interesting one. The root cause is that we only install nixl-cu12, but vllm imports _api from the nixl PyPI package. Several recent fixes touched this package. [chore] Exclude nixl-cu13 from packages #1756 excluded nixl-cu13 from the toml. We then noticed some CI errors, and [chore] pin nixl-cu12 #1759 changed it to installing nixl-cu12 directly instead of excluding nixl-cu13. The failure is fixed by reverting [chore] pin nixl-cu12 #1759.

  2. `RuntimeError: There is no current event loop in thread 'MainThread'`: this is a simple fix for proper teardown.

  3. `test_skyrl_gym_generator.py::test_generator_formatting_no_use_conversation_multi_turn[unsloth/Llama-3.2-1B-Instruct]`: this is a flaky test.
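For anyone debugging the NIXL failure locally, a small diagnostic like the one below can show which of the distributions mentioned above are actually installed in the environment. It is a sketch that uses only the standard library; the helper name `installed_versions` is made up for this example, and it only reports installed distributions, not what vllm imports.

```python
from importlib import metadata


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


report = installed_versions(["nixl", "nixl-cu12", "nixl-cu13"])
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```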

Fixes for 1. and 2. have been added in 164064c
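For context on the event-loop error, the sketch below shows one common way it arises and one way to avoid it. This is an assumption about the general failure mode, not a copy of the change in the commit: teardown code that relies on an implicit current event loop fails once an earlier `asyncio.run` has closed and unset the loop, while teardown that creates its own loop does not. All function names here are hypothetical.

```python
import asyncio


async def close_resources():
    await asyncio.sleep(0)
    return "closed"


def teardown_with_implicit_loop():
    # Relies on a current event loop already being set for this thread.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(close_resources())


def teardown_with_own_loop():
    # asyncio.run creates a fresh loop and closes it afterwards.
    return asyncio.run(close_resources())


# Simulate a test runner that has already run and closed its event loop.
asyncio.run(asyncio.sleep(0))

try:
    teardown_with_implicit_loop()
    implicit_error = None
except RuntimeError as exc:
    implicit_error = str(exc)

own_loop_result = teardown_with_own_loop()
```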

I re-ran the failing tests after these fixes:

https://console.anyscale.com/cld_hxkifz7xa22mwicp21nzkds1lw/prj_4b6c498rypyq6g7yhk6vzgjevt/jobs/prodjob_m71sz4nf9jc1qig4xecgrrt2xq?job-tab=overview&job-logs-section-tabs=application_logs

and now only the MoE forward test is failing:

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED tests/backends/skyrl_train/gpu/gpu_ci/inference_servers/test_weight_sync_moe.py::test_worker_wrap_load_weights_preserves_moe_forward - RuntimeError: Engine core initialization failed. See root cause above. Failed core proc(s): {}
====== 1 failed, 33 passed, 6 skipped, 108 warnings in 1251.71s (0:20:51) ======
sys:1: DeprecationWarning: builtin type swigvarlink has no __module__ attribute
