Files
Book-management-system/.trellis/scripts/common/workflow_phase.py
T
2026-04-27 18:40:30 +08:00

177 lines
5.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Workflow Phase Extraction.
Extracts step-level content from .trellis/workflow.md and optionally filters
platform-specific blocks.
Platform marker syntax in workflow.md:
[Claude Code, Cursor, ...]
agent-capable content
[/Claude Code, Cursor, ...]
Provides:
get_phase_index - Extract the Phase Index section (no --step)
get_step - Extract a single step (#### X.X) section
filter_platform - Strip platform blocks that don't include the given name
"""
from __future__ import annotations
import re
from .paths import DIR_WORKFLOW, get_repo_root
def _workflow_md_path():
return get_repo_root() / DIR_WORKFLOW / "workflow.md"
# Match a line that *is* a platform marker: "[A, B, C]" or "[/A, B, C]"
_MARKER_RE = re.compile(r"^\[(/?)([A-Za-z][^\[\]]*)\]\s*$")
# Step heading: "#### 1.0 Title" or "#### 1.0 ..."
_STEP_HEADING_RE = re.compile(r"^####\s+(\d+\.\d+)\b.*$")
# Phase Index starts here; Phase 1/2/3 step bodies follow; ends at Breadcrumbs.
_PHASE_INDEX_HEADING = "## Phase Index"
def _read_workflow() -> str:
path = _workflow_md_path()
if not path.exists():
raise FileNotFoundError(f"workflow.md not found: {path}")
return path.read_text(encoding="utf-8")
def _parse_marker(line: str) -> tuple[bool, list[str]] | None:
"""Parse a platform marker line.
Returns:
(is_closing, [platform_names]) if line is a marker, else None.
"""
m = _MARKER_RE.match(line)
if not m:
return None
is_closing = m.group(1) == "/"
names = [p.strip() for p in m.group(2).split(",") if p.strip()]
return is_closing, names
def get_phase_index() -> str:
"""Return Phase Index + Phase 1/2/3 step bodies from workflow.md.
Matches what the SessionStart hook injects into the `<workflow>` block:
starts at `## Phase Index`, continues through `## Phase 1: Plan`,
`## Phase 2: Execute`, `## Phase 3: Finish`, stops at
`## Workflow State Breadcrumbs` (consumed by UserPromptSubmit hook).
"""
text = _read_workflow()
lines = text.splitlines()
start: int | None = None
end: int | None = None
for i, line in enumerate(lines):
stripped = line.strip()
if start is None and stripped == _PHASE_INDEX_HEADING:
start = i
continue
if start is not None and stripped == "## Workflow State Breadcrumbs":
end = i
break
if start is None:
return ""
if end is None:
end = len(lines)
return "\n".join(lines[start:end]).rstrip() + "\n"
def get_step(step_id: str) -> str:
"""Return the `#### X.X` section matching step_id (header + body).
Body ends at the next `####` or `---` or `##` heading (whichever comes first).
"""
text = _read_workflow()
lines = text.splitlines()
start: int | None = None
for i, line in enumerate(lines):
m = _STEP_HEADING_RE.match(line)
if m and m.group(1) == step_id:
start = i
break
if start is None:
return ""
end: int = len(lines)
for j in range(start + 1, len(lines)):
line = lines[j]
if line.startswith("#### "):
end = j
break
if line.startswith("## "):
end = j
break
# Horizontal rule at column 0
if line.strip() == "---":
end = j
break
return "\n".join(lines[start:end]).rstrip() + "\n"
def _platform_matches(platform: str, block_names: list[str]) -> bool:
"""Case-insensitive fuzzy match: accept 'cursor', 'Cursor', 'claude-code', 'Claude Code'."""
needle = platform.lower().replace("-", "").replace("_", "").replace(" ", "")
for name in block_names:
hay = name.lower().replace("-", "").replace("_", "").replace(" ", "")
if needle == hay:
return True
return False
def filter_platform(content: str, platform: str) -> str:
"""Keep lines outside any `[...]` block + lines inside blocks that include platform.
Marker lines themselves are dropped from the output.
"""
lines = content.splitlines()
out: list[str] = []
in_block = False
keep_block = False
for line in lines:
marker = _parse_marker(line)
if marker is not None:
is_closing, names = marker
if not is_closing:
in_block = True
keep_block = _platform_matches(platform, names)
else:
in_block = False
keep_block = False
continue # drop the marker line itself
if in_block:
if keep_block:
out.append(line)
continue
out.append(line)
# Collapse runs of 3+ blank lines that may arise from dropped markers
collapsed: list[str] = []
blank_run = 0
for line in out:
if line.strip() == "":
blank_run += 1
if blank_run <= 2:
collapsed.append(line)
else:
blank_run = 0
collapsed.append(line)
return "\n".join(collapsed).rstrip() + "\n"