Skip to content

Add Realm execution support for parallel operators (Replicate, Repartition, Combine, Reduction)#1641

Open
seemamirch wants to merge 5 commits intoflexflow:masterfrom
seemamirch:sm/realm-parallel-operators-all
Open

Add Realm execution support for parallel operators (Replicate, Repartition, Combine, Reduction)#1641
seemamirch wants to merge 5 commits intoflexflow:masterfrom
seemamirch:sm/realm-parallel-operators-all

Conversation

@seemamirch
Copy link
Copy Markdown

@seemamirch seemamirch commented Apr 23, 2026

Summary

This PR implements support for all four parallel operators (Replicate, Repartition, Combine, Reduction) CPU based in the Realm-based execution backend.

Motivation

Parallel operators are fundamental to distributed training in FlexFlow — they handle data redistribution between devices (repartition), replication (replicate), gathering (combine), and partial sum reduction (reduction). Previously only the graph pass infrastructure existed; this PR adds the Realm execution layer.

Approach

Each parallel op is executed as a Realm copy operation rather than a regular op task, since they involve data movement between devices rather than computation.

Data Movement

Op FWD BWD
Replicate broadcast copy to all replicas sum-reduce replica gradients
Repartition scatter full tensor into shards gather shards into full tensor
Combine gather shards into full tensor scatter gradient into shards
Reduction sum-reduce partial tensors broadcast gradient to all partials

Key Design Decisions

Offset index space allocation: Each shard instance is allocated with a non-zero origin rect reflecting its position in the full tensor (e.g. shard 1 of a [10,16] tensor split along dim 0 is allocated at [5..9, 0..15]). This allows plain Realm copies between shards and combined tensors to work correctly.

CopyDomain: A new CopyDomain enum (SRC/DST) selects which instance's index space is used as the copy domain. SRC is used when the source is the smaller piece; DST when the destination is the smaller piece.

get_per_device_shape: A new function that correctly computes the per-device tensor size by dividing each dimension by its shard degree, used for instance allocation.

ManyToOne collision fix: Combine FWD and Reduction FWD produce multiple invocations with the same output DynamicValueAttrs. Only the first is registered in the producer map to avoid collision while still marking the value as having a producer.

Changes

New files

  • lib/task-spec/include/task-spec/dynamic_graph/parallel_op_utils.h — shared is_parallel_op_attrs helper
  • lib/realm-execution/test/src/realm-execution/test_op_combine.cc — combine op e2e test
  • lib/realm-execution/test/src/realm-execution/test_op_reduce.cc — reduction op e2e test
  • lib/realm-execution/test/src/realm-execution/test_op_repartition.cc — repartition op e2e test

Modified files

  • shard_expansion.cc — add perform_shard_expansion_one_to_many and perform_shard_expansion_many_to_one generic functions covering all eight FWD/BWD combinations
  • pass_expansion.cc — add perform_pass_expansion_for_parallel_op
  • copy_insertion.cc — guard parallel ops in copy insertion
  • make_dynamic_open_dataflow_graph_from_mapped_pcg.cc — dispatch parallel ops to build_parallel_op_invocation
  • dynamic_open_dataflow_graph.cc — fix ManyToOne collision for combine/reduction FWD
  • pcg_instance.cc — add Realm copy dispatch for all parallel ops
  • realm_context.cc/h — add CopyDomain, create_instance_with_offset, update issue_copy to use get_indexspace()
  • instance_allocation.cc — use offset index spaces for sharded tensor allocation
  • parallel_tensor_dims.cc/h — add get_per_device_dims
  • parallel_tensor_shape.cc/h — add get_per_device_shape
  • task_id_t.cc — return nullopt for all parallel ops in get_init_task_id_for_op_attrs

Testing

Added e2e tests for each parallel op running on 2 CPU devices:

  • test_op_combine.cc — repartition → combine → relu
  • test_op_reduce.cc — repartition → linear → reduction → relu
  • test_op_repartition.cc — repartition → relu

This change is Reviewable

Seema Mirchandaney added 5 commits April 9, 2026 15:49
- Add perform_pass_expansion_for_replicate for fwd/bwd pass expansion
- Add perform_shard_expansion_for_replicate and _bwd for shard expansion
- Add build_replicate_invocation in make_dynamic_open_dataflow_graph
- Add is_replicate_attrs helper and guard replicate in copy_insertion
- Add ReplicateAttrs to TrainingOperationAttrs
- Add SumReductionFloat/Double for backward replicate reduce operation
- Add issue_replicate_bwd in spawn_dynamic_node_invocation
- Fix per_device_op_state init race condition with direct write
- Fix .value() calls on optional per_device_op_state across op impls
- Update issue_copy to support optional reduction op
- Add testcase for replicate op
- Add perform_pass_expansion_for_replicate for fwd/bwd pass expansion
- Add perform_shard_expansion_for_replicate and _bwd for shard expansion
- Add build_replicate_invocation in make_dynamic_open_dataflow_graph
- Add is_replicate_attrs helper and guard replicate in copy_insertion
- Add ReplicateAttrs to TrainingOperationAttrs
- Add SumReductionFloat/Double for backward replicate reduce operation
- Add issue_replicate_bwd in spawn_dynamic_node_invocation
- Fix per_device_op_state init race condition with direct write
- Fix .value() calls on optional per_device_op_state across op impls
- Update issue_copy to support optional reduction op
- Add testcase for replicate op
…ion) in Realm backend (CPU versions)

Each parallel op is handled via Realm copies rather than op tasks:
- Replicate FWD: broadcast copy; BWD: sum-reduce replica gradients
- Repartition FWD: scatter into shards; BWD: gather shards into full tensor
- Combine FWD: gather shards into full tensor; BWD: scatter gradient into shards
- Reduction FWD: sum-reduce partials; BWD: broadcast gradient to all partials

Key implementation details:
- Parallel ops have no ComputationGraphOpAttrs equivalent
- Instance allocation uses offset index spaces for sharded tensors
- issue_copy uses actual instance index space via get_indexspace()
- Add CopyDomain::SRC/DST to select correct copy domain
- Combine FWD and Reduction FWD register only first invocation in ManyToOne
- Add get_per_device_shape() for correct per-device tensor size
- Add perform_shard_expansion_one_to_many and _many_to_one generic functions
- Add parallel_op_utils.h shared header for is_parallel_op_attrs
- Add CopyDomain enum and create_instance_with_offset to RealmContext
- Add multi-cpu tests for the parallel operators
@seemamirch
Copy link
Copy Markdown
Author

@lockshaw @elliottslaughter - please review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant