Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,16 @@ root_folder (or any name you prefer)
| --ppn hostname:slotcount | Number of processes per node | N/A | YES (minimal 4) | YES (minimal 4) |
| --num-processes | Total number of processes | Node local: 8<br>Global: the value in Table 1 | NO | YES |
| --checkpoint-folder | The folder to save the checkpoint data | checkpoint/{workload} | YES | YES |
| --num-checkpoints-write | Number of write checkpoints | 10 or 0** | NO | NO |
| --num-checkpoints-read | Number of write checkpoints | 10 or 0** | NO | NO |
| --num-checkpoints-write | Number of write checkpoints | 10 (or 0**) | Only 10 or 0** | YES |
| --num-checkpoints-read | Number of read checkpoints | 10 (or 0**) | Only 10 or 0** | YES |

**NOTE: In the ``--ppn`` syntax above, the ``slotcount`` value means the number of processes per node to run.**

**\*\* NOTE: In CLOSED submissions, ``--num-checkpoints-write`` and ``--num-checkpoints-read`` may be set to ``0`` only as part of the two-invocation cache-flush workflow described in §4.7.1: one invocation runs the write phase with ``--num-checkpoints-read=0`` and the next runs the read phase with ``--num-checkpoints-write=0``. The default for both flags is 10 and the total work performed across both invocations must still be 10 writes followed by 10 reads.**

## 4.7. Storage System Must Be Simultaneously R/W or _Remappable_

4.7.1. **checkpointCacheFlushValidation** -- If a submitter needs to issue a cache flush operation between the write phase and the read phase of a checkpoint benchmark run, then the validator must check that ``--num-checkpoints-read=0`` was set during the write phase, that there was a short pause of up to 30 seconds maximum, then the write phase was started with ``--num-checkpoints-write=0`` set.
4.7.1. **checkpointCacheFlushValidation** -- A cache flush between the write and read phases is only required when the client node has enough memory to cache all of the checkpoints written by that client during the run. The benchmark writes 10 sequential checkpoints specifically to overfill typical filesystem caches; on most submission configurations the early checkpoints have already been evicted by the time the read phase begins, so no flush is required. As a rule of thumb (see `checkpointing/README.md`), a flush is required when the total checkpoint size written per client is less than 3× the client node's memory capacity. When a flush is required, the submitter must execute the run in two invocations: the write phase with ``--num-checkpoints-read=0``, followed by the cache flush during a pause of no more than 30 seconds, then the read phase with ``--num-checkpoints-write=0``. The validator must confirm this split occurred and that the inter-phase gap did not exceed 30 seconds.

4.7.2. **checkpointTotalTestDuration** -- The validator must verify that the total test duration starts from the timestamp of the first checkpoint written and ends at the ending timestamp of the last checkpoint read, notably including the "remapping" time.

Expand Down
42 changes: 32 additions & 10 deletions mlpstorage_py/cli/checkpointing_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ def _add_checkpointing_core_args(parser, command):
loops=1,
params='',
allow_invalid_params=False,
num_checkpoints_read=10,
num_checkpoints_write=10,
dlio_bin_path=None,
checkpoint_folder=None,
)
Expand Down Expand Up @@ -117,14 +115,9 @@ def _add_checkpointing_core_args(parser, command):
help=HELP_MESSAGES['checkpoint_folder']
)


def _add_checkpointing_open_args(parser, command):
"""Add open/whatif-only checkpointing arguments.

Args:
parser: The subcommand parser to add arguments to.
command: The subcommand name.
"""
# num-checkpoints-read/write are available in all modes so closed submitters
# can split write and read into two invocations (set =0 on one side) with a
# cache flush in between — see Rules.md §4.7.1.
parser.add_argument(
'--num-checkpoints-read', '-ncr',
type=int,
Expand All @@ -137,6 +130,15 @@ def _add_checkpointing_open_args(parser, command):
default=10,
help=HELP_MESSAGES['num_checkpoints']
)


def _add_checkpointing_open_args(parser, command):
"""Add open/whatif-only checkpointing arguments.

Args:
parser: The subcommand parser to add arguments to.
command: The subcommand name.
"""
parser.add_argument(
'--loops',
type=int,
Expand Down Expand Up @@ -174,6 +176,26 @@ def validate_checkpointing_arguments(args):
if args.num_checkpoints_read < 0 or args.num_checkpoints_write < 0:
error_messages.append("Number of checkpoints read and write must be non-negative")

# CLOSED mode: each flag must be exactly 10 or 0, and both can't be 0.
# Two-invocation split (10/0 then 0/10) is the only deviation from 10/10
# allowed by Rules.md §4.7.1.
if getattr(args, 'mode', None) == 'closed':
if args.num_checkpoints_write not in (10, 0):
error_messages.append(
"CLOSED submissions require --num-checkpoints-write to be 10 or 0 "
f"(got {args.num_checkpoints_write}). See Rules.md §4.7.1."
)
if args.num_checkpoints_read not in (10, 0):
error_messages.append(
"CLOSED submissions require --num-checkpoints-read to be 10 or 0 "
f"(got {args.num_checkpoints_read}). See Rules.md §4.7.1."
)
if args.num_checkpoints_write == 0 and args.num_checkpoints_read == 0:
error_messages.append(
"CLOSED submissions cannot set both --num-checkpoints-write=0 and "
"--num-checkpoints-read=0 in the same invocation."
)

if error_messages:
for msg in error_messages:
print(msg)
Expand Down
18 changes: 12 additions & 6 deletions mlpstorage_py/cli/help_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,19 @@
--client-host-memory-in-gb/-cm N
Optional:
--hosts/-s HOST... (default: 127.0.0.1)
--num-checkpoints-read/-ncr N (default: 10; closed allows 10 or 0)
--num-checkpoints-write/-ncw N (default: 10; closed allows 10 or 0)
+ CORE_STD (--results-dir optional)
Note: --num-checkpoints-read/write fixed at 10 in closed; not shown
Note: closed runs use 10/10 by default. Use 10/0 then 0/10 in two
invocations when a cache flush is required between phases
(see Rules.md §4.7.1 and checkpointing/README.md).

CK_DATASIZE_OPEN
= CK_DATASIZE_CLOSED plus:
--num-checkpoints-read/-ncr N (default: 10)
--num-checkpoints-write/-ncw N (default: 10)
--dlio-bin-path/-dp PATH
--params/-p KEY=VALUE...
+ OPEN_STD
Note: open allows any non-negative integer for --num-checkpoints-read/-write

CK_DATASIZE_WHATIF
= CK_DATASIZE_OPEN (model positional choices identical; flags identical)
Expand All @@ -288,9 +291,13 @@
Optional:
--exec-type/-et {mpi,docker} (default: mpi)
--hosts/-s HOST... (default: 127.0.0.1)
--num-checkpoints-read/-ncr N (default: 10; closed allows 10 or 0)
--num-checkpoints-write/-ncw N (default: 10; closed allows 10 or 0)
+ MPI_ARGS
+ CORE_STD
Note: --num-checkpoints-read/write fixed at 10 in closed; not shown
Note: closed runs use 10/10 by default. Use 10/0 then 0/10 in two
invocations when a cache flush is required between phases
(see Rules.md §4.7.1 and checkpointing/README.md).

Closed rank constraints by model:
llama3-1t: 8 or 1024
Expand All @@ -300,13 +307,12 @@

CK_RUN_OPEN
= CK_RUN_CLOSED plus:
--num-checkpoints-read/-ncr N (default: 10)
--num-checkpoints-write/-ncw N (default: 10)
--dlio-bin-path/-dp PATH
--params/-p KEY=VALUE...
+ OPEN_STD
+ TIMESERIES
Note: open allows any multiple of the per-model GPU-per-DP-instance count
and any non-negative integer for --num-checkpoints-read/-write

CK_RUN_WHATIF
= CK_RUN_OPEN (model positional choices identical; flags identical)
Expand Down
7 changes: 7 additions & 0 deletions mlpstorage_py/rules/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class BenchmarkRunData:
metrics: Optional[Dict[str, Any]] = None
result_dir: Optional[str] = None
accelerator: Optional[str] = None
end_datetime: str = ""


@dataclass
Expand Down Expand Up @@ -755,6 +756,7 @@ def parse(self, result_dir: str, metadata: Optional[Dict] = None) -> BenchmarkRu
model=model,
command=command,
run_datetime=summary.get("start", ""),
end_datetime=summary.get("end", ""),
num_processes=summary.get("num_accelerators", 0),
parameters=hydra_workload_config,
override_parameters=override_parameters,
Expand Down Expand Up @@ -849,6 +851,7 @@ def _from_metadata(self, metadata: Dict, result_dir: str) -> BenchmarkRunData:
model=metadata.get('model'),
command=metadata.get('command'),
run_datetime=metadata.get('run_datetime', ''),
end_datetime=metadata.get('end_datetime', ''),
num_processes=metadata.get('num_processes', 0),
parameters=metadata.get('parameters', {}),
override_parameters=metadata.get('override_parameters', {}),
Expand Down Expand Up @@ -944,6 +947,10 @@ def command(self):
def run_datetime(self):
return self._data.run_datetime

@property
def end_datetime(self):
return self._data.end_datetime

@property
def num_processes(self):
return self._data.num_processes
Expand Down
179 changes: 179 additions & 0 deletions mlpstorage_py/rules/submission_checkers/checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,32 @@
Validates checkpointing benchmark submissions (multiple runs).
"""

from datetime import datetime
from typing import Optional, List

from mlpstorage_py.config import BENCHMARK_TYPES, LLM_MODELS, PARAM_VALIDATION
from mlpstorage_py.rules.issues import Issue
from mlpstorage_py.rules.submission_checkers.base import MultiRunRulesChecker


# Maximum allowed pause between the write-phase end and the read-phase start
# in a two-invocation CLOSED submission (Rules.md §4.7.1).
MAX_INTER_PHASE_GAP_SECONDS = 30


def _parse_summary_timestamp(value):
"""Parse a DLIO summary 'start'/'end' timestamp into a datetime, or None."""
if not value:
return None
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
try:
return datetime.strptime(value, "%Y%m%d_%H%M%S")
except (TypeError, ValueError):
return None


class CheckpointSubmissionRulesChecker(MultiRunRulesChecker):
"""Rules checker for checkpointing benchmark submissions."""

Expand Down Expand Up @@ -83,3 +102,163 @@ def check_num_runs(self) -> List[Issue]:
))

return issues

def check_invocation_structure(self) -> List[Issue]:
"""Enforce the CLOSED-mode invocation pattern from Rules.md §4.7.1.

A CLOSED checkpointing submission must consist of either:
- a single invocation with --num-checkpoints-write=10 and
--num-checkpoints-read=10; or
- two invocations: the first with 10 writes / 0 reads, followed
within MAX_INTER_PHASE_GAP_SECONDS by a second invocation with
0 writes / 10 reads (the gap covers the cache flush).

Any other arrangement (e.g. ten 1-write runs, overlapping phases, or
a >30s gap between the write and read phases) is INVALID for CLOSED.
"""
issues = []

checkpoint_runs = [
run for run in self.benchmark_runs
if run.benchmark_type == BENCHMARK_TYPES.checkpointing
]
if not checkpoint_runs:
return issues

# Only enforce the structural pattern for CLOSED submissions; OPEN
# may freely choose any non-negative read/write counts.
categories = {run.category for run in checkpoint_runs}
if categories != {PARAM_VALIDATION.CLOSED}:
return issues

def _wr(run):
params = run.parameters.get('checkpoint', {})
return (
params.get('num_checkpoints_write', 0),
params.get('num_checkpoints_read', 0),
)

if len(checkpoint_runs) == 1:
writes, reads = _wr(checkpoint_runs[0])
if writes == self.REQUIRED_WRITES and reads == self.REQUIRED_READS:
issues.append(Issue(
validation=PARAM_VALIDATION.CLOSED,
message=(
"Single-invocation CLOSED submission with "
f"{self.REQUIRED_WRITES} writes and {self.REQUIRED_READS} reads"
),
parameter="checkpoint.invocation_structure",
expected="single run with 10 writes and 10 reads",
actual=f"writes={writes}, reads={reads}",
))
else:
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
"Single-invocation CLOSED submission must use 10 writes "
f"and 10 reads (got writes={writes}, reads={reads})."
),
parameter="checkpoint.invocation_structure",
expected="single run with 10 writes and 10 reads",
actual=f"writes={writes}, reads={reads}",
))
return issues

if len(checkpoint_runs) == 2:
ordered = sorted(
checkpoint_runs,
key=lambda r: _parse_summary_timestamp(r.run_datetime) or datetime.min,
)
first_w, first_r = _wr(ordered[0])
second_w, second_r = _wr(ordered[1])

if (first_w, first_r) != (self.REQUIRED_WRITES, 0) or \
(second_w, second_r) != (0, self.REQUIRED_READS):
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
"Two-invocation CLOSED submission must run the write phase "
"(writes=10, reads=0) followed by the read phase "
f"(writes=0, reads=10). Got first=(writes={first_w}, reads={first_r}), "
f"second=(writes={second_w}, reads={second_r})."
),
parameter="checkpoint.invocation_structure",
expected="run 1: writes=10/reads=0; run 2: writes=0/reads=10",
actual=(
f"run 1: writes={first_w}/reads={first_r}; "
f"run 2: writes={second_w}/reads={second_r}"
),
))
return issues

write_end = _parse_summary_timestamp(ordered[0].end_datetime)
read_start = _parse_summary_timestamp(ordered[1].run_datetime)
if write_end is None or read_start is None:
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
"Two-invocation CLOSED submission is missing parseable "
"start/end timestamps for the write or read phase, so the "
"inter-phase cache-flush gap (≤30s) cannot be verified."
),
parameter="checkpoint.invocation_structure",
expected="parseable write-phase end and read-phase start",
actual=(
f"write_end={ordered[0].end_datetime!r}, "
f"read_start={ordered[1].run_datetime!r}"
),
))
return issues

gap_seconds = (read_start - write_end).total_seconds()
if gap_seconds < 0:
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
f"Read phase started {-gap_seconds:.1f}s before the write "
"phase ended; the two phases must not overlap."
),
parameter="checkpoint.invocation_structure",
expected="read-phase start after write-phase end",
actual=f"gap={gap_seconds:.1f}s",
))
return issues

if gap_seconds > MAX_INTER_PHASE_GAP_SECONDS:
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
f"Gap between write-phase end and read-phase start is "
f"{gap_seconds:.1f}s, exceeding the {MAX_INTER_PHASE_GAP_SECONDS}s "
"maximum required by Rules.md §4.7.1."
),
parameter="checkpoint.invocation_structure",
expected=f"≤ {MAX_INTER_PHASE_GAP_SECONDS}s",
actual=f"{gap_seconds:.1f}s",
))
return issues

issues.append(Issue(
validation=PARAM_VALIDATION.CLOSED,
message=(
"Two-invocation CLOSED submission: write phase (10/0) followed by "
f"read phase (0/10) with a {gap_seconds:.1f}s inter-phase gap."
),
parameter="checkpoint.invocation_structure",
expected="write 10/0 then read 0/10 within 30s",
actual=f"gap={gap_seconds:.1f}s",
))
return issues

# 0 runs handled above; here len(checkpoint_runs) >= 3 (or == 0 already returned)
issues.append(Issue(
validation=PARAM_VALIDATION.INVALID,
message=(
f"CLOSED checkpointing submission must consist of 1 or 2 invocations "
f"(got {len(checkpoint_runs)}). See Rules.md §4.7.1."
),
parameter="checkpoint.invocation_structure",
expected="1 or 2 invocations",
actual=str(len(checkpoint_runs)),
))
return issues
Loading
Loading