#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Workflow Phase Extraction. Extracts step-level content from .trellis/workflow.md and optionally filters platform-specific blocks. Platform marker syntax in workflow.md: [Claude Code, Cursor, ...] agent-capable content [/Claude Code, Cursor, ...] Provides: get_phase_index - Extract the Phase Index section (no --step) get_step - Extract a single step (#### X.X) section filter_platform - Strip platform blocks that don't include the given name """ from __future__ import annotations import re from .paths import DIR_WORKFLOW, get_repo_root def _workflow_md_path(): return get_repo_root() / DIR_WORKFLOW / "workflow.md" # Match a line that *is* a platform marker: "[A, B, C]" or "[/A, B, C]" _MARKER_RE = re.compile(r"^\[(/?)([A-Za-z][^\[\]]*)\]\s*$") # Step heading: "#### 1.0 Title" or "#### 1.0 ..." _STEP_HEADING_RE = re.compile(r"^####\s+(\d+\.\d+)\b.*$") # Phase Index starts here; Phase 1/2/3 step bodies follow; ends at Breadcrumbs. _PHASE_INDEX_HEADING = "## Phase Index" def _read_workflow() -> str: path = _workflow_md_path() if not path.exists(): raise FileNotFoundError(f"workflow.md not found: {path}") return path.read_text(encoding="utf-8") def _parse_marker(line: str) -> tuple[bool, list[str]] | None: """Parse a platform marker line. Returns: (is_closing, [platform_names]) if line is a marker, else None. """ m = _MARKER_RE.match(line) if not m: return None is_closing = m.group(1) == "/" names = [p.strip() for p in m.group(2).split(",") if p.strip()] return is_closing, names def get_phase_index() -> str: """Return Phase Index + Phase 1/2/3 step bodies from workflow.md. Matches what the SessionStart hook injects into the `` block: starts at `## Phase Index`, continues through `## Phase 1: Plan`, `## Phase 2: Execute`, `## Phase 3: Finish`, stops at `## Workflow State Breadcrumbs` (consumed by UserPromptSubmit hook). """ text = _read_workflow() lines = text.splitlines() start: int | None = None end: int | None = None for i, line in enumerate(lines): stripped = line.strip() if start is None and stripped == _PHASE_INDEX_HEADING: start = i continue if start is not None and stripped == "## Workflow State Breadcrumbs": end = i break if start is None: return "" if end is None: end = len(lines) return "\n".join(lines[start:end]).rstrip() + "\n" def get_step(step_id: str) -> str: """Return the `#### X.X` section matching step_id (header + body). Body ends at the next `####` or `---` or `##` heading (whichever comes first). """ text = _read_workflow() lines = text.splitlines() start: int | None = None for i, line in enumerate(lines): m = _STEP_HEADING_RE.match(line) if m and m.group(1) == step_id: start = i break if start is None: return "" end: int = len(lines) for j in range(start + 1, len(lines)): line = lines[j] if line.startswith("#### "): end = j break if line.startswith("## "): end = j break # Horizontal rule at column 0 if line.strip() == "---": end = j break return "\n".join(lines[start:end]).rstrip() + "\n" def _platform_matches(platform: str, block_names: list[str]) -> bool: """Case-insensitive fuzzy match: accept 'cursor', 'Cursor', 'claude-code', 'Claude Code'.""" needle = platform.lower().replace("-", "").replace("_", "").replace(" ", "") for name in block_names: hay = name.lower().replace("-", "").replace("_", "").replace(" ", "") if needle == hay: return True return False def filter_platform(content: str, platform: str) -> str: """Keep lines outside any `[...]` block + lines inside blocks that include platform. Marker lines themselves are dropped from the output. """ lines = content.splitlines() out: list[str] = [] in_block = False keep_block = False for line in lines: marker = _parse_marker(line) if marker is not None: is_closing, names = marker if not is_closing: in_block = True keep_block = _platform_matches(platform, names) else: in_block = False keep_block = False continue # drop the marker line itself if in_block: if keep_block: out.append(line) continue out.append(line) # Collapse runs of 3+ blank lines that may arise from dropped markers collapsed: list[str] = [] blank_run = 0 for line in out: if line.strip() == "": blank_run += 1 if blank_run <= 2: collapsed.append(line) else: blank_run = 0 collapsed.append(line) return "\n".join(collapsed).rstrip() + "\n"