diff --git a/Rules.md b/Rules.md index 9350f098..616a86a8 100644 --- a/Rules.md +++ b/Rules.md @@ -472,14 +472,16 @@ root_folder (or any name you prefer) | --ppn hostname:slotcount | Number of processes per node | N/A | YES (minimal 4) | YES (minimal 4) | | --num-processes | Total number of processes | Node local: 8
Global: the value in Table 1 | NO | YES | | --checkpoint-folder | The folder to save the checkpoint data | checkpoint/{workload} | YES | YES | -| --num-checkpoints-write | Number of write checkpoints | 10 or 0** | NO | NO | -| --num-checkpoints-read | Number of write checkpoints | 10 or 0** | NO | NO | +| --num-checkpoints-write | Number of write checkpoints | 10 (or 0**) | Only 10 or 0** | YES | +| --num-checkpoints-read | Number of read checkpoints | 10 (or 0**) | Only 10 or 0** | YES | **NOTE: In the ``--ppn`` syntax above, the ``slotcount`` value means the number of processes per node to run.** +**\*\* NOTE: In CLOSED submissions, ``--num-checkpoints-write`` and ``--num-checkpoints-read`` may be set to ``0`` only as part of the two-invocation cache-flush workflow described in §4.7.1: one invocation runs the write phase with ``--num-checkpoints-read=0`` and the next runs the read phase with ``--num-checkpoints-write=0``. The default for both flags is 10 and the total work performed across both invocations must still be 10 writes followed by 10 reads.** + ## 4.7. Storage System Must Be Simultaneously R/W or _Remappable_ -4.7.1. **checkpointCacheFlushValidation** -- If a submitter needs to issue a cache flush operation between the write phase and the read phase of a checkpoint benchmark run, then the validator must check that ``--num-checkpoints-read=0`` was set during the write phase, that there was a short pause of up to 30 seconds maximum, then the write phase was started with ``--num-checkpoints-write=0`` set. +4.7.1. **checkpointCacheFlushValidation** -- A cache flush between the write and read phases is only required when the client node has enough memory to cache all of the checkpoints written by that client during the run. The benchmark writes 10 sequential checkpoints specifically to overfill typical filesystem caches; on most submission configurations the early checkpoints have already been evicted by the time the read phase begins, so no flush is required. As a rule of thumb (see `checkpointing/README.md`), a flush is required when the total checkpoint size written per client is less than 3× the client node's memory capacity. When a flush is required, the submitter must execute the run in two invocations: the write phase with ``--num-checkpoints-read=0``, followed by the cache flush during a pause of no more than 30 seconds, then the read phase with ``--num-checkpoints-write=0``. The validator must confirm this split occurred and that the inter-phase gap did not exceed 30 seconds. 4.7.2. **checkpointTotalTestDuration** -- The validator must verify that the total test duration starts from the timestamp of the first checkpoint written and ends at the ending timestamp of the last checkpoint read, notably including the "remapping" time. diff --git a/mlpstorage_py/cli/checkpointing_args.py b/mlpstorage_py/cli/checkpointing_args.py index 53a4aafb..171a2715 100755 --- a/mlpstorage_py/cli/checkpointing_args.py +++ b/mlpstorage_py/cli/checkpointing_args.py @@ -62,8 +62,6 @@ def _add_checkpointing_core_args(parser, command): loops=1, params='', allow_invalid_params=False, - num_checkpoints_read=10, - num_checkpoints_write=10, dlio_bin_path=None, checkpoint_folder=None, ) @@ -117,14 +115,9 @@ def _add_checkpointing_core_args(parser, command): help=HELP_MESSAGES['checkpoint_folder'] ) - -def _add_checkpointing_open_args(parser, command): - """Add open/whatif-only checkpointing arguments. - - Args: - parser: The subcommand parser to add arguments to. - command: The subcommand name. - """ + # num-checkpoints-read/write are available in all modes so closed submitters + # can split write and read into two invocations (set =0 on one side) with a + # cache flush in between — see Rules.md §4.7.1. parser.add_argument( '--num-checkpoints-read', '-ncr', type=int, @@ -137,6 +130,15 @@ def _add_checkpointing_open_args(parser, command): default=10, help=HELP_MESSAGES['num_checkpoints'] ) + + +def _add_checkpointing_open_args(parser, command): + """Add open/whatif-only checkpointing arguments. + + Args: + parser: The subcommand parser to add arguments to. + command: The subcommand name. + """ parser.add_argument( '--loops', type=int, @@ -174,6 +176,26 @@ def validate_checkpointing_arguments(args): if args.num_checkpoints_read < 0 or args.num_checkpoints_write < 0: error_messages.append("Number of checkpoints read and write must be non-negative") + # CLOSED mode: each flag must be exactly 10 or 0, and both can't be 0. + # Two-invocation split (10/0 then 0/10) is the only deviation from 10/10 + # allowed by Rules.md §4.7.1. + if getattr(args, 'mode', None) == 'closed': + if args.num_checkpoints_write not in (10, 0): + error_messages.append( + "CLOSED submissions require --num-checkpoints-write to be 10 or 0 " + f"(got {args.num_checkpoints_write}). See Rules.md §4.7.1." + ) + if args.num_checkpoints_read not in (10, 0): + error_messages.append( + "CLOSED submissions require --num-checkpoints-read to be 10 or 0 " + f"(got {args.num_checkpoints_read}). See Rules.md §4.7.1." + ) + if args.num_checkpoints_write == 0 and args.num_checkpoints_read == 0: + error_messages.append( + "CLOSED submissions cannot set both --num-checkpoints-write=0 and " + "--num-checkpoints-read=0 in the same invocation." + ) + if error_messages: for msg in error_messages: print(msg) diff --git a/mlpstorage_py/cli/help_formatter.py b/mlpstorage_py/cli/help_formatter.py index 56022c5c..c9fcb951 100644 --- a/mlpstorage_py/cli/help_formatter.py +++ b/mlpstorage_py/cli/help_formatter.py @@ -262,16 +262,19 @@ --client-host-memory-in-gb/-cm N Optional: --hosts/-s HOST... (default: 127.0.0.1) + --num-checkpoints-read/-ncr N (default: 10; closed allows 10 or 0) + --num-checkpoints-write/-ncw N (default: 10; closed allows 10 or 0) + CORE_STD (--results-dir optional) - Note: --num-checkpoints-read/write fixed at 10 in closed; not shown + Note: closed runs use 10/10 by default. Use 10/0 then 0/10 in two + invocations when a cache flush is required between phases + (see Rules.md §4.7.1 and checkpointing/README.md). CK_DATASIZE_OPEN = CK_DATASIZE_CLOSED plus: - --num-checkpoints-read/-ncr N (default: 10) - --num-checkpoints-write/-ncw N (default: 10) --dlio-bin-path/-dp PATH --params/-p KEY=VALUE... + OPEN_STD + Note: open allows any non-negative integer for --num-checkpoints-read/-write CK_DATASIZE_WHATIF = CK_DATASIZE_OPEN (model positional choices identical; flags identical) @@ -288,9 +291,13 @@ Optional: --exec-type/-et {mpi,docker} (default: mpi) --hosts/-s HOST... (default: 127.0.0.1) + --num-checkpoints-read/-ncr N (default: 10; closed allows 10 or 0) + --num-checkpoints-write/-ncw N (default: 10; closed allows 10 or 0) + MPI_ARGS + CORE_STD - Note: --num-checkpoints-read/write fixed at 10 in closed; not shown + Note: closed runs use 10/10 by default. Use 10/0 then 0/10 in two + invocations when a cache flush is required between phases + (see Rules.md §4.7.1 and checkpointing/README.md). Closed rank constraints by model: llama3-1t: 8 or 1024 @@ -300,13 +307,12 @@ CK_RUN_OPEN = CK_RUN_CLOSED plus: - --num-checkpoints-read/-ncr N (default: 10) - --num-checkpoints-write/-ncw N (default: 10) --dlio-bin-path/-dp PATH --params/-p KEY=VALUE... + OPEN_STD + TIMESERIES Note: open allows any multiple of the per-model GPU-per-DP-instance count + and any non-negative integer for --num-checkpoints-read/-write CK_RUN_WHATIF = CK_RUN_OPEN (model positional choices identical; flags identical) diff --git a/mlpstorage_py/rules/models.py b/mlpstorage_py/rules/models.py index 85addf44..d78c519f 100755 --- a/mlpstorage_py/rules/models.py +++ b/mlpstorage_py/rules/models.py @@ -83,6 +83,7 @@ class BenchmarkRunData: metrics: Optional[Dict[str, Any]] = None result_dir: Optional[str] = None accelerator: Optional[str] = None + end_datetime: str = "" @dataclass @@ -755,6 +756,7 @@ def parse(self, result_dir: str, metadata: Optional[Dict] = None) -> BenchmarkRu model=model, command=command, run_datetime=summary.get("start", ""), + end_datetime=summary.get("end", ""), num_processes=summary.get("num_accelerators", 0), parameters=hydra_workload_config, override_parameters=override_parameters, @@ -849,6 +851,7 @@ def _from_metadata(self, metadata: Dict, result_dir: str) -> BenchmarkRunData: model=metadata.get('model'), command=metadata.get('command'), run_datetime=metadata.get('run_datetime', ''), + end_datetime=metadata.get('end_datetime', ''), num_processes=metadata.get('num_processes', 0), parameters=metadata.get('parameters', {}), override_parameters=metadata.get('override_parameters', {}), @@ -944,6 +947,10 @@ def command(self): def run_datetime(self): return self._data.run_datetime + @property + def end_datetime(self): + return self._data.end_datetime + @property def num_processes(self): return self._data.num_processes diff --git a/mlpstorage_py/rules/submission_checkers/checkpointing.py b/mlpstorage_py/rules/submission_checkers/checkpointing.py index 57868792..ac7d832f 100755 --- a/mlpstorage_py/rules/submission_checkers/checkpointing.py +++ b/mlpstorage_py/rules/submission_checkers/checkpointing.py @@ -4,6 +4,7 @@ Validates checkpointing benchmark submissions (multiple runs). """ +from datetime import datetime from typing import Optional, List from mlpstorage_py.config import BENCHMARK_TYPES, LLM_MODELS, PARAM_VALIDATION @@ -11,6 +12,24 @@ from mlpstorage_py.rules.submission_checkers.base import MultiRunRulesChecker +# Maximum allowed pause between the write-phase end and the read-phase start +# in a two-invocation CLOSED submission (Rules.md §4.7.1). +MAX_INTER_PHASE_GAP_SECONDS = 30 + + +def _parse_summary_timestamp(value): + """Parse a DLIO summary 'start'/'end' timestamp into a datetime, or None.""" + if not value: + return None + try: + return datetime.fromisoformat(value) + except (TypeError, ValueError): + try: + return datetime.strptime(value, "%Y%m%d_%H%M%S") + except (TypeError, ValueError): + return None + + class CheckpointSubmissionRulesChecker(MultiRunRulesChecker): """Rules checker for checkpointing benchmark submissions.""" @@ -83,3 +102,163 @@ def check_num_runs(self) -> List[Issue]: )) return issues + + def check_invocation_structure(self) -> List[Issue]: + """Enforce the CLOSED-mode invocation pattern from Rules.md §4.7.1. + + A CLOSED checkpointing submission must consist of either: + - a single invocation with --num-checkpoints-write=10 and + --num-checkpoints-read=10; or + - two invocations: the first with 10 writes / 0 reads, followed + within MAX_INTER_PHASE_GAP_SECONDS by a second invocation with + 0 writes / 10 reads (the gap covers the cache flush). + + Any other arrangement (e.g. ten 1-write runs, overlapping phases, or + a >30s gap between the write and read phases) is INVALID for CLOSED. + """ + issues = [] + + checkpoint_runs = [ + run for run in self.benchmark_runs + if run.benchmark_type == BENCHMARK_TYPES.checkpointing + ] + if not checkpoint_runs: + return issues + + # Only enforce the structural pattern for CLOSED submissions; OPEN + # may freely choose any non-negative read/write counts. + categories = {run.category for run in checkpoint_runs} + if categories != {PARAM_VALIDATION.CLOSED}: + return issues + + def _wr(run): + params = run.parameters.get('checkpoint', {}) + return ( + params.get('num_checkpoints_write', 0), + params.get('num_checkpoints_read', 0), + ) + + if len(checkpoint_runs) == 1: + writes, reads = _wr(checkpoint_runs[0]) + if writes == self.REQUIRED_WRITES and reads == self.REQUIRED_READS: + issues.append(Issue( + validation=PARAM_VALIDATION.CLOSED, + message=( + "Single-invocation CLOSED submission with " + f"{self.REQUIRED_WRITES} writes and {self.REQUIRED_READS} reads" + ), + parameter="checkpoint.invocation_structure", + expected="single run with 10 writes and 10 reads", + actual=f"writes={writes}, reads={reads}", + )) + else: + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + "Single-invocation CLOSED submission must use 10 writes " + f"and 10 reads (got writes={writes}, reads={reads})." + ), + parameter="checkpoint.invocation_structure", + expected="single run with 10 writes and 10 reads", + actual=f"writes={writes}, reads={reads}", + )) + return issues + + if len(checkpoint_runs) == 2: + ordered = sorted( + checkpoint_runs, + key=lambda r: _parse_summary_timestamp(r.run_datetime) or datetime.min, + ) + first_w, first_r = _wr(ordered[0]) + second_w, second_r = _wr(ordered[1]) + + if (first_w, first_r) != (self.REQUIRED_WRITES, 0) or \ + (second_w, second_r) != (0, self.REQUIRED_READS): + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + "Two-invocation CLOSED submission must run the write phase " + "(writes=10, reads=0) followed by the read phase " + f"(writes=0, reads=10). Got first=(writes={first_w}, reads={first_r}), " + f"second=(writes={second_w}, reads={second_r})." + ), + parameter="checkpoint.invocation_structure", + expected="run 1: writes=10/reads=0; run 2: writes=0/reads=10", + actual=( + f"run 1: writes={first_w}/reads={first_r}; " + f"run 2: writes={second_w}/reads={second_r}" + ), + )) + return issues + + write_end = _parse_summary_timestamp(ordered[0].end_datetime) + read_start = _parse_summary_timestamp(ordered[1].run_datetime) + if write_end is None or read_start is None: + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + "Two-invocation CLOSED submission is missing parseable " + "start/end timestamps for the write or read phase, so the " + "inter-phase cache-flush gap (≤30s) cannot be verified." + ), + parameter="checkpoint.invocation_structure", + expected="parseable write-phase end and read-phase start", + actual=( + f"write_end={ordered[0].end_datetime!r}, " + f"read_start={ordered[1].run_datetime!r}" + ), + )) + return issues + + gap_seconds = (read_start - write_end).total_seconds() + if gap_seconds < 0: + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + f"Read phase started {-gap_seconds:.1f}s before the write " + "phase ended; the two phases must not overlap." + ), + parameter="checkpoint.invocation_structure", + expected="read-phase start after write-phase end", + actual=f"gap={gap_seconds:.1f}s", + )) + return issues + + if gap_seconds > MAX_INTER_PHASE_GAP_SECONDS: + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + f"Gap between write-phase end and read-phase start is " + f"{gap_seconds:.1f}s, exceeding the {MAX_INTER_PHASE_GAP_SECONDS}s " + "maximum required by Rules.md §4.7.1." + ), + parameter="checkpoint.invocation_structure", + expected=f"≤ {MAX_INTER_PHASE_GAP_SECONDS}s", + actual=f"{gap_seconds:.1f}s", + )) + return issues + + issues.append(Issue( + validation=PARAM_VALIDATION.CLOSED, + message=( + "Two-invocation CLOSED submission: write phase (10/0) followed by " + f"read phase (0/10) with a {gap_seconds:.1f}s inter-phase gap." + ), + parameter="checkpoint.invocation_structure", + expected="write 10/0 then read 0/10 within 30s", + actual=f"gap={gap_seconds:.1f}s", + )) + return issues + + # 0 runs handled above; here len(checkpoint_runs) >= 3 (or == 0 already returned) + issues.append(Issue( + validation=PARAM_VALIDATION.INVALID, + message=( + f"CLOSED checkpointing submission must consist of 1 or 2 invocations " + f"(got {len(checkpoint_runs)}). See Rules.md §4.7.1." + ), + parameter="checkpoint.invocation_structure", + expected="1 or 2 invocations", + actual=str(len(checkpoint_runs)), + )) + return issues diff --git a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py index 84e2a5ed..0d33e5ac 100644 --- a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py +++ b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py @@ -51,6 +51,7 @@ def init_checks(self): self.subset_run_validation, self.open_mpi_processes, # CHKPT-01 (Task 2a) self.cache_flush_validation, # CHKPT-02 (Task 2a) + self.checkpoint_invocation_structure, # 4.7.1 strict structural enforcement self.total_test_duration, # CHKPT-03 (Task 2a) self.remapping_time_reporting, # CHKPT-04 (Task 2b) self.simultaneous_rw_support, # CHKPT-05 (Task 2b) @@ -617,6 +618,106 @@ def cache_flush_validation(self): valid = False return valid + @rule("4.7.1", "checkpointCacheFlushValidation") + def checkpoint_invocation_structure(self): + """Enforce the strict invocation structure for CLOSED checkpointing (Rules.md 4.7.1). + + A CLOSED submission must consist of either: + - a single invocation with --num-checkpoints-write=10 AND + --num-checkpoints-read=10; or + - exactly two invocations: a write phase (10/0) followed by a read + phase (0/10), with the read phase starting AFTER the write phase + has ended (no overlap). + + The 30-second upper bound on the inter-phase gap is enforced separately + by ``cache_flush_validation``. OPEN submissions may use any non-negative + integer for --num-checkpoints-* per Rules.md Table 3, so this check is + a no-op for them. + """ + valid = True + if self.mode != "checkpointing": + return valid + + closed_runs = [ + (summary, metadata, ts) + for summary, metadata, ts in self._iter_valid_files() + if (metadata or {}).get("verification") == "closed" + ] + if not closed_runs: + return valid + + REQUIRED = 10 + + def _wr(metadata): + args = (metadata or {}).get("args", {}) or {} + return ( + int(args.get("num_checkpoints_write", 0) or 0), + int(args.get("num_checkpoints_read", 0) or 0), + ) + + if len(closed_runs) == 1: + _summary, metadata, ts = closed_runs[0] + writes, reads = _wr(metadata) + if (writes, reads) != (REQUIRED, REQUIRED): + self.log_violation( + "4.7.1", "checkpointCacheFlushValidation", self.path, + "single-invocation CLOSED submission must use " + "--num-checkpoints-write=10 AND --num-checkpoints-read=10 " + "(got writes=%d, reads=%d in %s)", + writes, reads, ts, + ) + valid = False + return valid + + if len(closed_runs) == 2: + # Sort by ts (YYYYMMDD_HHmmss is lexicographically chronological). + ordered = sorted(closed_runs, key=lambda e: e[2]) + first_summary, first_md, first_ts = ordered[0] + second_summary, second_md, second_ts = ordered[1] + + first_w, first_r = _wr(first_md) + second_w, second_r = _wr(second_md) + + if (first_w, first_r) != (REQUIRED, 0) or (second_w, second_r) != (0, REQUIRED): + self.log_violation( + "4.7.1", "checkpointCacheFlushValidation", self.path, + "two-invocation CLOSED submission must be write-phase (10/0) " + "followed by read-phase (0/10); got first=(writes=%d,reads=%d) " + "in %s, second=(writes=%d,reads=%d) in %s", + first_w, first_r, first_ts, second_w, second_r, second_ts, + ) + valid = False + return valid + + # Overlap check: read phase must start after write phase ends. + write_end = (first_summary or {}).get("end_time") or (first_summary or {}).get("end") + read_start = (second_summary or {}).get("start_time") or (second_summary or {}).get("start") + if write_end and read_start: + try: + gap_seconds = _parse_iso_gap(write_end, read_start) + except (ValueError, TypeError): + # Parse errors are already reported by cache_flush_validation; + # don't double-log. + return valid + if gap_seconds < 0: + self.log_violation( + "4.7.1", "checkpointCacheFlushValidation", self.path, + "read phase started %.1fs before write phase ended; " + "the two phases must not overlap (write_end=%s, read_start=%s)", + -gap_seconds, write_end, read_start, + ) + valid = False + return valid + + # 3+ invocations — disallowed. + self.log_violation( + "4.7.1", "checkpointCacheFlushValidation", self.path, + "CLOSED checkpointing submission must consist of 1 or 2 invocations " + "(got %d). See Rules.md 4.7.1.", + len(closed_runs), + ) + return False + @rule("4.7.2", "checkpointTotalTestDuration") def total_test_duration(self): """Compute and log total checkpoint test duration. (Rules.md 4.7.2) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 24d1b7ac..7e896e0e 100755 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -26,6 +26,7 @@ add_history_arguments, add_universal_arguments, add_mpi_arguments, + validate_checkpointing_arguments, HELP_MESSAGES, PROGRAM_DESCRIPTIONS, ) @@ -441,15 +442,40 @@ def test_closed_mode_rejects_loops(self, parser): with pytest.raises(SystemExit): parser.parse_args(self.RUN_ARGS + ['--loops', '3']) - def test_closed_mode_rejects_num_checkpoints_read(self, parser): - """Closed checkpointing must reject --num-checkpoints-read.""" + def test_closed_mode_accepts_num_checkpoints_read_zero(self, parser): + """Closed checkpointing accepts --num-checkpoints-read=0 (read-only split).""" + args = parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-read', '0']) + assert args.num_checkpoints_read == 0 + assert args.num_checkpoints_write == 10 + + def test_closed_mode_accepts_num_checkpoints_write_zero(self, parser): + """Closed checkpointing accepts --num-checkpoints-write=0 (write-only split).""" + args = parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-write', '0']) + assert args.num_checkpoints_write == 0 + assert args.num_checkpoints_read == 10 + + def test_closed_mode_rejects_arbitrary_num_checkpoints_read(self, parser): + """validate_checkpointing_arguments must reject non-{10,0} read counts in closed mode.""" + args = parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-read', '20']) + args.mode = 'closed' with pytest.raises(SystemExit): - parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-read', '20']) + validate_checkpointing_arguments(args) - def test_closed_mode_rejects_num_checkpoints_write(self, parser): - """Closed checkpointing must reject --num-checkpoints-write.""" + def test_closed_mode_rejects_arbitrary_num_checkpoints_write(self, parser): + """validate_checkpointing_arguments must reject non-{10,0} write counts in closed mode.""" + args = parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-write', '20']) + args.mode = 'closed' + with pytest.raises(SystemExit): + validate_checkpointing_arguments(args) + + def test_closed_mode_rejects_both_zero(self, parser): + """validate_checkpointing_arguments must reject write=0 and read=0 together in closed mode.""" + args = parser.parse_args( + self.RUN_ARGS + ['--num-checkpoints-write', '0', '--num-checkpoints-read', '0'] + ) + args.mode = 'closed' with pytest.raises(SystemExit): - parser.parse_args(self.RUN_ARGS + ['--num-checkpoints-write', '20']) + validate_checkpointing_arguments(args) def test_closed_mode_namespace_has_open_defaults(self, parser): """Closed-mode parse must supply all open-gated attrs via set_defaults."""