forked from EndogenAI/dogma
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidate_session_state.py
More file actions
287 lines (230 loc) · 9.97 KB
/
Copy pathvalidate_session_state.py
File metadata and controls
287 lines (230 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""scripts/validate_session_state.py
Validator for Phase/gate transitions in scratchpad session files, and YAML
phase-status block parser for structured session state tracking.
Purpose:
Two modes of operation:
1. FSM validation (default): Enforce proper sequencing of phase execution
and review gates. Detects phase skipping, missing review gates between
domains, and FSM violations.
2. YAML phase-status (--yaml-state): Parse the ## Session State YAML block
written by prune_scratchpad.py --init, validate its structure, and print
a human-readable phase status table.
Checks (FSM mode):
1. All phases follow numerically (no skipping: 1→2→3, not 1→3).
2. Each Phase N is followed by a Review gate before proceeding to next domain.
3. No duplicate phases.
4. Session contains at least Phase 1 (session started).
YAML block schema (--yaml-state, Candidate C extended):
branch: string
date: string or null (optional, ISO date)
active_phase: string or null
active_issues: list (optional, GitHub issue numbers)
blockers: list (optional, open blocker strings)
last_agent: string or null (optional, last delegated agent)
phases: list of {name: str, status: str, commit: str}
Inputs:
[file ...] Path to session .md file (positional, optional).
--yaml-state Parse and display the ## Session State YAML block.
Outputs:
stdout: Human-readable pass/fail summary (FSM) or phase table (--yaml-state).
Exit codes:
0 All checks passed / YAML block valid.
1 One or more checks failed / YAML block missing or malformed.
Usage examples:
uv run python scripts/validate_session_state.py .tmp/branch/2026-03-11.md
uv run python scripts/validate_session_state.py --yaml-state .tmp/branch/2026-03-16.md
"""
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
def extract_yaml_state_block(text: str) -> str | None:
    """
    Find and extract the YAML content inside the ## Session State fenced block.

    Looks for a pattern of the form:

        ## Session State

        ```yaml
        ...
        ```

    The opening fence may be followed by trailing spaces or tabs. The closing
    fence is only recognised at the start of a line (optionally indented), so
    a literal ``` appearing inside a YAML value does not cut the block short.

    Args:
        text: Full text of the scratchpad file.

    Returns:
        The raw YAML content string (without the fences), or None if no
        Session State block is found. Only the first matching block is used.
    """
    pattern = re.compile(
        # heading, blank lines, opening fence (trailing blanks tolerated),
        # lazily captured body, closing fence anchored to a line start.
        r"^## Session State\s*\n```yaml[ \t]*\n(.*?)^[ \t]*```",
        re.MULTILINE | re.DOTALL,
    )
    match = pattern.search(text)
    return match.group(1) if match else None
def parse_yaml_block(yaml_content: str) -> tuple[dict | None, str | None]:
    """
    Parse YAML content and validate required keys.

    Expected schema (Candidate C extended):
        branch: string
        date: string or null (optional)
        active_phase: string or null
        active_issues: list (optional)
        blockers: list (optional)
        last_agent: string or null (optional)
        phases: list of {name, status, commit}

    Each entry of ``phases`` must be a mapping whose ``name`` is a string;
    ``status`` and ``commit`` are not validated here.

    Args:
        yaml_content: Raw YAML text (the body of the Session State fence).

    Returns:
        (data_dict, error_message) — error_message is None on success, and
        data_dict is None whenever error_message is set.
    """
    # Import lazily so the FSM mode of this script works without PyYAML, and
    # report a missing dependency as such instead of as a malformed document.
    try:
        import yaml  # type: ignore[import-untyped]
    except ImportError as exc:
        return None, f"YAML parse error: PyYAML is not installed ({exc})"

    try:
        data = yaml.safe_load(yaml_content)
    except Exception as exc:
        # Deliberately broad: presumably safe_load can also raise errors that
        # are not yaml.YAMLError while constructing scalars — TODO confirm.
        return None, f"YAML parse error: {exc}"

    if not isinstance(data, dict):
        return None, "Session State block must be a YAML mapping"

    for key in ("branch", "active_phase", "phases"):
        if key not in data:
            return None, f"Missing required key: {key}"

    if not isinstance(data["branch"], str) or not data["branch"]:
        return None, "'branch' must be a non-empty string"
    if data["active_phase"] is not None and not isinstance(data["active_phase"], str):
        return None, "'active_phase' must be a string or null"
    if not isinstance(data["phases"], list):
        return None, "'phases' must be a YAML list"

    for i, phase in enumerate(data["phases"]):
        if not isinstance(phase, dict):
            return None, f"phases[{i}] must be a YAML mapping"
        if "name" not in phase:
            return None, f"phases[{i}] is missing required field 'name'"
        # A bare `name:` parses to None; reject it (and any other non-string)
        # here so consumers can safely format the name as text.
        if not isinstance(phase["name"], str):
            return None, f"phases[{i}] field 'name' must be a string"

    # Validate optional Candidate C fields when present
    if "date" in data and data["date"] is not None and not isinstance(data["date"], str):
        return None, "'date' must be a string or null"
    if "active_issues" in data and not isinstance(data["active_issues"], list):
        return None, "'active_issues' must be a list"
    if "blockers" in data and not isinstance(data["blockers"], list):
        return None, "'blockers' must be a list"
    if "last_agent" in data and data["last_agent"] is not None and not isinstance(data["last_agent"], str):
        return None, "'last_agent' must be a string or null"

    return data, None
def display_phase_table(data: dict) -> None:
    """
    Print a human-readable phase status table from parsed YAML state data.

    Optional header fields (date, last agent, active issues, blockers) are
    printed only when they are present and non-empty. Entries of ``phases``
    that are not mappings are skipped.

    Args:
        data: Parsed Session State mapping; missing keys fall back to
            placeholder text or are omitted from the output.
    """
    branch = data.get("branch") or "(unknown)"
    active_phase = data.get("active_phase")
    session_date = data.get("date")
    active_issues = data.get("active_issues") or []
    blockers = data.get("blockers") or []
    last_agent = data.get("last_agent")
    phases = data.get("phases", [])

    print(f"Branch: {branch}")
    if session_date:
        print(f"Date: {session_date}")
    print(f"Active phase: {active_phase or '(none)'}")
    if last_agent:
        print(f"Last agent: {last_agent}")
    if active_issues:
        print(f"Active issues: {', '.join(str(i) for i in active_issues)}")
    if blockers:
        print("Blockers:")
        for blocker in blockers:
            print(f" - {blocker}")

    print("Phases:")
    if not phases:
        print(" (none)")
        return

    for phase in phases:
        if not isinstance(phase, dict):
            continue
        raw_name = phase.get("name")
        # The "<30" format spec below is only supported by some types (None,
        # lists and dicts raise TypeError), so always format a plain string.
        name = "(unnamed)" if raw_name is None else str(raw_name)
        status = phase.get("status", "(unknown)")
        commit = phase.get("commit") or ""
        commit_str = f" {commit}" if commit else ""
        print(f" {name:<30} [{status}]{commit_str}")
def validate_yaml_state(file_path: Path) -> tuple[bool, str]:
    """
    Parse and validate the ## Session State YAML block in a scratchpad file.

    On success the phase status table is printed to stdout as a side effect.

    Args:
        file_path: Path to the scratchpad (.md) file.

    Returns:
        (success: bool, message: str) — message is "OK" on success, otherwise
        a description of why the file could not be read or validated.
    """
    # Read first and handle the failure, rather than checking exists() up
    # front: a directory or an unreadable / non-UTF-8 file would otherwise
    # escape as an unhandled exception.
    try:
        text = file_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return False, f"File not found: {file_path}"
    except (OSError, UnicodeDecodeError) as exc:
        return False, f"Cannot read {file_path}: {exc}"

    yaml_content = extract_yaml_state_block(text)
    if yaml_content is None:
        return False, "## Session State block not found in scratchpad"

    data, error = parse_yaml_block(yaml_content)
    if error:
        return False, f"Invalid Session State block: {error}"

    display_phase_table(data)
    return True, "OK"
def validate(file_path: Path) -> tuple[bool, list[str]]:
    """
    Validate session state FSM (phase sequencing and review gates).

    Headings of the form ``### Phase N ...`` are collected and checked:
        1. No phase number has more than one non-review ("domain") heading.
        2. At least one phase heading exists.
        3. The lowest phase is Phase 1 and numbers are consecutive.
        4. Every phase has a ``### Phase N ... Review`` heading.

    All heading patterns are confined to a single line: only spaces and tabs
    are accepted between the heading tokens, so body text on a later line can
    never be mistaken for part of a heading.

    Args:
        file_path: Path to the session (.md) file.

    Returns:
        (passed, list_of_failure_messages) — the list is empty when passed.
    """
    # Read first and handle the failure, rather than checking exists() up
    # front: a directory or an unreadable / non-UTF-8 file would otherwise
    # escape as an unhandled exception.
    try:
        text = file_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return False, [f"File not found: {file_path}"]
    except (OSError, UnicodeDecodeError) as exc:
        return False, [f"Cannot read {file_path}: {exc}"]

    failures: list[str] = []

    # --- Check 1: Extract all phase numbers ---
    phase_pattern = re.compile(r"^###[ \t]+Phase[ \t]+(\d+)", re.MULTILINE)
    # Domain-only phases: exclude headings that contain "Review" after the
    # number. "Review" is matched case-insensitively so this agrees with the
    # (case-insensitive) review gate pattern used in Check 4; otherwise a
    # "... review" heading would be both a gate and a duplicate domain phase.
    domain_pattern = re.compile(
        r"^###[ \t]+Phase[ \t]+(\d+)(?!.*\b(?i:Review)\b)",
        re.MULTILINE,
    )
    domain_phase_nums = [int(m) for m in domain_pattern.findall(text)]

    # Check for duplicates among domain phases before de-duplicating
    seen: set[int] = set()
    for num in domain_phase_nums:
        if num in seen:
            failures.append(f"Duplicate Phase {num} heading found")
        seen.add(num)

    phases = sorted({int(m) for m in phase_pattern.findall(text)})
    if not phases:
        failures.append("No phases found (expected ### Phase N headings)")
        return False, failures

    # --- Check 2: Phases must start at 1 ---
    if phases[0] != 1:
        failures.append(f"First phase must be Phase 1, not Phase {phases[0]}")

    # --- Check 3: Phases must be sequential (no skipping) ---
    for i, phase_num in enumerate(phases, 1):
        if phase_num != i:
            failures.append(f"Phase sequence broken at position {i}: expected Phase {i}, got Phase {phase_num}")

    # --- Check 4: Review gates between domains ---
    # Each phase should have a "### Phase N ... Review" heading or similar.
    for phase_num in phases:
        # [ \t]+ after the number keeps the match on the heading line (and
        # stops Phase 1 from matching "Phase 12").
        review_pattern = re.compile(
            rf"^###[ \t]+Phase[ \t]+{phase_num}[ \t]+.*?Review",
            re.MULTILINE | re.IGNORECASE,
        )
        if not review_pattern.search(text):
            failures.append(
                f"Missing review gate marker for Phase {phase_num} "
                f"(expected '### Phase {phase_num} — ... Review' or similar)"
            )

    return not failures, failures
def main(argv: list[str] | None = None) -> int:
    """
    Command-line entry point.

    Runs either the YAML phase-status check (--yaml-state) or the default FSM
    validation over every file given on the command line.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv.

    Returns:
        0 when every file passed, 1 when no file was given or any file failed.
    """
    cli = argparse.ArgumentParser(description="Validate Phase/gate transitions in session files")
    cli.add_argument("files", nargs="*", help="Path to session .md file(s) to validate")
    cli.add_argument(
        "--yaml-state",
        action="store_true",
        help="Parse and display the ## Session State YAML block; exit 1 if missing or malformed",
    )
    options = cli.parse_args(argv)

    if not options.files:
        print("Please provide at least one session file to validate", file=sys.stderr)
        return 1

    exit_code = 0
    for raw_path in options.files:
        target = Path(raw_path)

        # YAML mode: the table is printed by the validator itself on success,
        # so only failures need reporting here.
        if options.yaml_state:
            ok, detail = validate_yaml_state(target)
            if not ok:
                print(f"ERROR: {detail}", file=sys.stderr)
                exit_code = 1
            continue

        # FSM mode: one summary line per passing file, a bullet list otherwise.
        ok, problems = validate(target)
        if ok:
            print(f"{target}: ✓ OK")
            continue
        exit_code = 1
        print(f"{target}:")
        for problem in problems:
            print(f" ✗ {problem}")

    return exit_code
# Run as a script: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())