Initial commit

This commit is contained in:
Zzzz
2026-04-27 18:40:30 +08:00
commit 2120774b05
112 changed files with 12308 additions and 0 deletions
+84
View File
@@ -0,0 +1,84 @@
"""
Common utilities for Trellis workflow scripts.
This module provides shared functionality used by other Trellis scripts.
"""
import io
import sys
# =============================================================================
# Windows Encoding Fix (MUST be at top, before any other output)
# =============================================================================
# On Windows, stdout defaults to the system code page (often GBK/CP936).
# This causes UnicodeEncodeError when printing non-ASCII characters.
#
# Any script that imports from common will automatically get this fix.
# =============================================================================
def _configure_stream(stream: object) -> object:
"""Configure a stream for UTF-8 encoding on Windows."""
# Try reconfigure() first (Python 3.7+, more reliable)
if hasattr(stream, "reconfigure"):
stream.reconfigure(encoding="utf-8", errors="replace") # type: ignore[union-attr]
return stream
# Fallback: detach and rewrap with TextIOWrapper
elif hasattr(stream, "detach"):
return io.TextIOWrapper(
stream.detach(), # type: ignore[union-attr]
encoding="utf-8",
errors="replace",
)
return stream
if sys.platform == "win32":
sys.stdout = _configure_stream(sys.stdout) # type: ignore[assignment]
sys.stderr = _configure_stream(sys.stderr) # type: ignore[assignment]
sys.stdin = _configure_stream(sys.stdin) # type: ignore[assignment]
def configure_encoding() -> None:
"""
Configure stdout/stderr/stdin for UTF-8 encoding on Windows.
This is automatically called when importing from common,
but can be called manually for scripts that don't import common.
Safe to call multiple times.
"""
global sys
if sys.platform == "win32":
sys.stdout = _configure_stream(sys.stdout) # type: ignore[assignment]
sys.stderr = _configure_stream(sys.stderr) # type: ignore[assignment]
sys.stdin = _configure_stream(sys.stdin) # type: ignore[assignment]
from .paths import (
DIR_WORKFLOW,
DIR_WORKSPACE,
DIR_TASKS,
DIR_ARCHIVE,
DIR_SPEC,
DIR_SCRIPTS,
FILE_DEVELOPER,
FILE_CURRENT_TASK,
FILE_TASK_JSON,
FILE_JOURNAL_PREFIX,
get_repo_root,
get_developer,
check_developer,
get_tasks_dir,
get_workspace_dir,
get_active_journal_file,
count_lines,
get_current_task,
get_current_task_abs,
normalize_task_ref,
resolve_task_ref,
set_current_task,
clear_current_task,
has_current_task,
generate_task_date_prefix,
)
+776
View File
@@ -0,0 +1,776 @@
"""
CLI Adapter for Multi-Platform Support.
Abstracts differences between Claude Code, OpenCode, Cursor, iFlow, Codex, Kilo, Kiro Code, Gemini CLI, Antigravity, Windsurf, Qoder, CodeBuddy, GitHub Copilot, and Factory Droid interfaces.
Supported platforms:
- claude: Claude Code (default)
- opencode: OpenCode
- cursor: Cursor IDE
- iflow: iFlow CLI
- codex: Codex CLI (skills-based)
- kilo: Kilo CLI
- kiro: Kiro Code (skills-based)
- gemini: Gemini CLI
- antigravity: Antigravity (workflow-based)
- windsurf: Windsurf (workflow-based)
- qoder: Qoder
- codebuddy: CodeBuddy
- copilot: GitHub Copilot (VS Code)
- droid: Factory Droid (commands-based)
Usage:
from common.cli_adapter import CLIAdapter
adapter = CLIAdapter("opencode")
cmd = adapter.build_run_command(
agent="dispatch",
session_id="abc123",
prompt="Start the pipeline"
)
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar, Literal
Platform = Literal[
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"windsurf",
"qoder",
"codebuddy",
"copilot",
"droid",
]
@dataclass
class CLIAdapter:
"""Adapter for different AI coding CLI tools."""
platform: Platform
# =========================================================================
# Agent Name Mapping
# =========================================================================
# OpenCode has built-in agents that cannot be overridden
# See: https://github.com/sst/opencode/issues/4271
# Note: Class-level constant, not a dataclass field
_AGENT_NAME_MAP: ClassVar[dict[Platform, dict[str, str]]] = {
"claude": {}, # No mapping needed
"opencode": {
"plan": "trellis-plan", # 'plan' is built-in in OpenCode
},
}
def get_agent_name(self, agent: str) -> str:
"""Get platform-specific agent name.
Args:
agent: Original agent name (e.g., 'plan', 'dispatch')
Returns:
Platform-specific agent name (e.g., 'trellis-plan' for OpenCode)
"""
mapping = self._AGENT_NAME_MAP.get(self.platform, {})
return mapping.get(agent, agent)
# =========================================================================
# Agent Path
# =========================================================================
@property
def config_dir_name(self) -> str:
"""Get platform-specific config directory name.
Returns:
Directory name ('.claude', '.opencode', '.cursor', '.iflow', '.codex', '.kilocode', '.kiro', '.gemini', '.agent', '.windsurf', '.qoder', or '.codebuddy')
"""
if self.platform == "opencode":
return ".opencode"
elif self.platform == "cursor":
return ".cursor"
elif self.platform == "iflow":
return ".iflow"
elif self.platform == "codex":
return ".codex"
elif self.platform == "kilo":
return ".kilocode"
elif self.platform == "kiro":
return ".kiro"
elif self.platform == "gemini":
return ".gemini"
elif self.platform == "antigravity":
return ".agent"
elif self.platform == "windsurf":
return ".windsurf"
elif self.platform == "qoder":
return ".qoder"
elif self.platform == "codebuddy":
return ".codebuddy"
elif self.platform == "copilot":
return ".github/copilot"
elif self.platform == "droid":
return ".factory"
else:
return ".claude"
def get_config_dir(self, project_root: Path) -> Path:
"""Get platform-specific config directory.
Args:
project_root: Project root directory
Returns:
Path to config directory (.claude, .opencode, .cursor, .iflow, .codex, .kilocode, .kiro, .gemini, .agent, .windsurf, .qoder, or .codebuddy)
"""
return project_root / self.config_dir_name
def get_agent_path(self, agent: str, project_root: Path) -> Path:
"""Get path to agent definition file.
Args:
agent: Agent name (original, before mapping)
project_root: Project root directory
Returns:
Path to agent definition file (.md for most platforms, .toml for Codex)
"""
mapped_name = self.get_agent_name(agent)
if self.platform == "codex":
return self.get_config_dir(project_root) / "agents" / f"{mapped_name}.toml"
return self.get_config_dir(project_root) / "agents" / f"{mapped_name}.md"
def get_commands_path(self, project_root: Path, *parts: str) -> Path:
"""Get path to commands directory or specific command file.
Args:
project_root: Project root directory
*parts: Additional path parts (e.g., 'trellis', 'finish-work.md')
Returns:
Path to commands directory or file
Note:
Cursor uses prefix naming: .cursor/commands/trellis-<name>.md
Antigravity uses workflow directory: .agent/workflows/<name>.md
Windsurf uses workflow directory: .windsurf/workflows/trellis-<name>.md
Copilot uses prompt files: .github/prompts/<name>.prompt.md
Claude/OpenCode use subdirectory: .claude/commands/trellis/<name>.md
"""
if self.platform == "windsurf":
workflow_dir = self.get_config_dir(project_root) / "workflows"
if not parts:
return workflow_dir
if len(parts) >= 2 and parts[0] == "trellis":
filename = parts[-1]
return workflow_dir / f"trellis-{filename}"
return workflow_dir / Path(*parts)
if self.platform in ("antigravity", "kilo"):
workflow_dir = self.get_config_dir(project_root) / "workflows"
if not parts:
return workflow_dir
if len(parts) >= 2 and parts[0] == "trellis":
filename = parts[-1]
return workflow_dir / filename
return workflow_dir / Path(*parts)
if self.platform == "copilot":
prompts_dir = project_root / ".github" / "prompts"
if not parts:
return prompts_dir
if len(parts) >= 2 and parts[0] == "trellis":
filename = parts[-1]
if filename.endswith(".md"):
filename = filename[:-3]
return prompts_dir / f"{filename}.prompt.md"
return prompts_dir / Path(*parts)
if not parts:
return self.get_config_dir(project_root) / "commands"
# Cursor uses prefix naming instead of subdirectory
if self.platform == "cursor" and len(parts) >= 2 and parts[0] == "trellis":
# Convert trellis/<name>.md to trellis-<name>.md
filename = parts[-1]
return (
self.get_config_dir(project_root) / "commands" / f"trellis-{filename}"
)
return self.get_config_dir(project_root) / "commands" / Path(*parts)
def get_trellis_command_path(self, name: str) -> str:
"""Get relative path to a trellis command file.
Args:
name: Command name without extension (e.g., 'finish-work', 'check')
Returns:
Relative path string for use in JSONL entries
Note:
Cursor: .cursor/commands/trellis-<name>.md
Codex: .agents/skills/trellis-<name>/SKILL.md
Kiro: .kiro/skills/trellis-<name>/SKILL.md
Gemini: .gemini/commands/trellis/<name>.toml
Antigravity: .agent/workflows/<name>.md
Windsurf: .windsurf/workflows/trellis-<name>.md
Others: .{platform}/commands/trellis/<name>.md
"""
if self.platform == "cursor":
return f".cursor/commands/trellis-{name}.md"
elif self.platform == "codex":
# 0.5.0-beta.0 renamed all skill dirs to add the `trellis-` prefix
# (see that release's manifest for the 60+ rename entries).
return f".agents/skills/trellis-{name}/SKILL.md"
elif self.platform == "kiro":
return f".kiro/skills/trellis-{name}/SKILL.md"
elif self.platform == "gemini":
return f".gemini/commands/trellis/{name}.toml"
elif self.platform == "antigravity":
return f".agent/workflows/{name}.md"
elif self.platform == "windsurf":
return f".windsurf/workflows/trellis-{name}.md"
elif self.platform == "kilo":
return f".kilocode/workflows/{name}.md"
elif self.platform == "copilot":
return f".github/prompts/{name}.prompt.md"
elif self.platform == "droid":
return f".factory/commands/trellis/{name}.md"
else:
return f"{self.config_dir_name}/commands/trellis/{name}.md"
# =========================================================================
# Environment Variables
# =========================================================================
def get_non_interactive_env(self) -> dict[str, str]:
"""Get environment variables for non-interactive mode.
Returns:
Dict of environment variables to set
"""
if self.platform == "opencode":
return {"OPENCODE_NON_INTERACTIVE": "1"}
elif self.platform == "iflow":
return {"IFLOW_NON_INTERACTIVE": "1"}
elif self.platform == "codex":
return {"CODEX_NON_INTERACTIVE": "1"}
elif self.platform == "kiro":
return {"KIRO_NON_INTERACTIVE": "1"}
elif self.platform == "gemini":
return {} # Gemini CLI doesn't have a non-interactive env var
elif self.platform == "antigravity":
return {}
elif self.platform == "windsurf":
return {}
elif self.platform == "qoder":
return {}
elif self.platform == "codebuddy":
return {}
elif self.platform == "copilot":
return {}
elif self.platform == "droid":
return {}
else:
return {"CLAUDE_NON_INTERACTIVE": "1"}
# =========================================================================
# CLI Command Building
# =========================================================================
def build_run_command(
self,
agent: str,
prompt: str,
session_id: str | None = None,
skip_permissions: bool = True,
verbose: bool = True,
json_output: bool = True,
) -> list[str]:
"""Build CLI command for running an agent.
Args:
agent: Agent name (will be mapped if needed)
prompt: Prompt to send to the agent
session_id: Optional session ID (Claude Code only for creation)
skip_permissions: Whether to skip permission prompts
verbose: Whether to enable verbose output
json_output: Whether to use JSON output format
Returns:
List of command arguments
"""
mapped_agent = self.get_agent_name(agent)
if self.platform == "opencode":
cmd = ["opencode", "run"]
cmd.extend(["--agent", mapped_agent])
# Note: OpenCode 'run' mode is non-interactive by default
# No equivalent to Claude Code's --dangerously-skip-permissions
# See: https://github.com/anomalyco/opencode/issues/9070
if json_output:
cmd.extend(["--format", "json"])
if verbose:
cmd.extend(["--log-level", "DEBUG", "--print-logs"])
# Note: OpenCode doesn't support --session-id on creation
# Session ID must be extracted from logs after startup
cmd.append(prompt)
elif self.platform == "iflow":
cmd = ["iflow", "-y", "-p"]
cmd.append(f"${mapped_agent} {prompt}")
elif self.platform == "codex":
cmd = ["codex", "exec"]
cmd.append(prompt)
elif self.platform == "kiro":
cmd = ["kiro", "run", prompt]
elif self.platform == "gemini":
cmd = ["gemini"]
cmd.append(prompt)
elif self.platform == "antigravity":
raise ValueError(
"Antigravity workflows are UI slash commands; CLI agent run is not supported."
)
elif self.platform == "windsurf":
raise ValueError(
"Windsurf workflows are UI slash commands; CLI agent run is not supported."
)
elif self.platform == "qoder":
cmd = ["qodercli", "-p", prompt]
elif self.platform == "codebuddy":
raise ValueError(
"CodeBuddy does not support non-interactive mode (no CLI agent)"
)
elif self.platform == "copilot":
raise ValueError(
"GitHub Copilot is IDE-only; CLI agent run is not supported."
)
elif self.platform == "droid":
raise ValueError(
"Factory Droid CLI agent run is not yet supported."
)
else: # claude
cmd = ["claude", "-p"]
cmd.extend(["--agent", mapped_agent])
if session_id:
cmd.extend(["--session-id", session_id])
if skip_permissions:
cmd.append("--dangerously-skip-permissions")
if json_output:
cmd.extend(["--output-format", "stream-json"])
if verbose:
cmd.append("--verbose")
cmd.append(prompt)
return cmd
def build_resume_command(self, session_id: str) -> list[str]:
"""Build CLI command for resuming a session.
Args:
session_id: Session ID to resume (ignored for iFlow)
Returns:
List of command arguments
"""
if self.platform == "opencode":
return ["opencode", "run", "--session", session_id]
elif self.platform == "iflow":
# iFlow uses -c to continue most recent conversation
# session_id is ignored as iFlow doesn't support session IDs
return ["iflow", "-c"]
elif self.platform == "codex":
return ["codex", "resume", session_id]
elif self.platform == "kiro":
return ["kiro", "resume", session_id]
elif self.platform == "gemini":
return ["gemini", "--resume", session_id]
elif self.platform == "antigravity":
raise ValueError(
"Antigravity workflows are UI slash commands; CLI resume is not supported."
)
elif self.platform == "windsurf":
raise ValueError(
"Windsurf workflows are UI slash commands; CLI resume is not supported."
)
elif self.platform == "qoder":
return ["qodercli", "--resume", session_id]
elif self.platform == "codebuddy":
raise ValueError(
"CodeBuddy does not support non-interactive mode (no CLI agent)"
)
elif self.platform == "copilot":
raise ValueError(
"GitHub Copilot is IDE-only; CLI resume is not supported."
)
elif self.platform == "droid":
raise ValueError(
"Factory Droid CLI resume is not yet supported."
)
else:
return ["claude", "--resume", session_id]
def get_resume_command_str(self, session_id: str, cwd: str | None = None) -> str:
"""Get human-readable resume command string.
Args:
session_id: Session ID to resume
cwd: Optional working directory to cd into
Returns:
Command string for display
"""
cmd = self.build_resume_command(session_id)
cmd_str = " ".join(cmd)
if cwd:
return f"cd {cwd} && {cmd_str}"
return cmd_str
# =========================================================================
# Platform Detection Helpers
# =========================================================================
@property
def is_opencode(self) -> bool:
"""Check if platform is OpenCode."""
return self.platform == "opencode"
@property
def is_claude(self) -> bool:
"""Check if platform is Claude Code."""
return self.platform == "claude"
@property
def is_cursor(self) -> bool:
"""Check if platform is Cursor."""
return self.platform == "cursor"
@property
def is_iflow(self) -> bool:
"""Check if platform is iFlow CLI."""
return self.platform == "iflow"
@property
def cli_name(self) -> str:
"""Get CLI executable name.
Note: Cursor doesn't have a CLI tool, returns None-like value.
"""
if self.is_opencode:
return "opencode"
elif self.is_cursor:
return "cursor" # Note: Cursor is IDE-only, no CLI
elif self.platform == "iflow":
return "iflow"
elif self.platform == "kiro":
return "kiro"
elif self.platform == "gemini":
return "gemini"
elif self.platform == "antigravity":
return "agy"
elif self.platform == "windsurf":
return "windsurf"
elif self.platform == "qoder":
return "qodercli"
elif self.platform == "codebuddy":
return "codebuddy"
elif self.platform == "copilot":
return "copilot"
elif self.platform == "droid":
return "droid"
else:
return "claude"
@property
def supports_cli_agents(self) -> bool:
"""Check if platform supports running agents via CLI.
Claude Code, OpenCode, iFlow, and Codex support CLI agent execution.
Cursor is IDE-only and doesn't support CLI agents.
"""
return self.platform in ("claude", "opencode", "iflow", "codex")
@property
def requires_agent_definition_file(self) -> bool:
"""Check if platform requires an agent definition file (.md/.toml) to run.
Claude Code, OpenCode, iFlow: require agent .md files (--agent flag).
Codex: auto-discovers agents from .codex/agents/*.toml, no --agent flag.
"""
return self.platform in ("claude", "opencode", "iflow")
# =========================================================================
# Session ID Handling
# =========================================================================
@property
def supports_session_id_on_create(self) -> bool:
"""Check if platform supports specifying session ID on creation.
Claude Code: Yes (--session-id)
OpenCode: No (auto-generated, extract from logs)
iFlow: No (no session ID support)
"""
return self.platform == "claude"
def extract_session_id_from_log(self, log_content: str) -> str | None:
"""Extract session ID from log output (OpenCode only).
OpenCode generates session IDs in format: ses_xxx
Args:
log_content: Log file content
Returns:
Session ID if found, None otherwise
"""
import re
# OpenCode session ID pattern
match = re.search(r"ses_[a-zA-Z0-9]+", log_content)
if match:
return match.group(0)
return None
# =============================================================================
# Factory Function
# =============================================================================
def get_cli_adapter(platform: str = "claude") -> CLIAdapter:
"""Get CLI adapter for the specified platform.
Args:
platform: Platform name ('claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', 'windsurf', 'qoder', or 'codebuddy')
Returns:
CLIAdapter instance
Raises:
ValueError: If platform is not supported
"""
if platform not in (
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"windsurf",
"qoder",
"codebuddy",
"copilot",
"droid",
):
raise ValueError(
f"Unsupported platform: {platform} (must be 'claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', 'windsurf', 'qoder', 'codebuddy', 'copilot', or 'droid')"
)
return CLIAdapter(platform=platform) # type: ignore
_ALL_PLATFORM_CONFIG_DIRS = (
".claude",
".cursor",
".iflow",
".opencode",
".codex",
".kilocode",
".kiro",
".gemini",
".agent",
".windsurf",
".qoder",
".codebuddy",
".github/copilot",
".factory",
)
"""Platform-specific config directory names used by detect_platform exclusion
checks. `.agents/skills/` is NOT listed here: it is a shared cross-platform
layer (written by Codex, also consumed by Amp/Cline/Warp/etc. via the
agentskills.io standard), not a single-platform signal. Its presence must not
block detection of Kiro, Antigravity, Windsurf, or other platforms."""
def _has_other_platform_dir(project_root: Path, exclude: set[str]) -> bool:
"""Check if any platform config dir exists besides those in *exclude*."""
return any(
(project_root / d).is_dir()
for d in _ALL_PLATFORM_CONFIG_DIRS
if d not in exclude
)
def detect_platform(project_root: Path) -> Platform:
"""Auto-detect platform based on existing config directories.
Detection order:
1. TRELLIS_PLATFORM environment variable (if set)
2. .opencode directory exists → opencode
3. .iflow directory exists → iflow
4. .cursor directory exists (without .claude) → cursor
5. .codex exists and no other platform dirs → codex
6. .kilocode directory exists → kilo
7. .kiro/skills exists and no other platform dirs → kiro
8. .gemini directory exists → gemini
9. .agent/workflows exists and no other platform dirs → antigravity
10. .windsurf/workflows exists and no other platform dirs → windsurf
11. .codebuddy directory exists → codebuddy
12. .qoder directory exists → qoder
13. Default → claude
Args:
project_root: Project root directory
Returns:
Detected platform ('claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', 'windsurf', 'qoder', 'codebuddy', or default 'claude')
"""
import os
# Check environment variable first
env_platform = os.environ.get("TRELLIS_PLATFORM", "").lower()
if env_platform in (
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"windsurf",
"qoder",
"codebuddy",
"copilot",
"droid",
):
return env_platform # type: ignore
# Check for .opencode directory (OpenCode-specific)
if (project_root / ".opencode").is_dir():
return "opencode"
# Check for .iflow directory (iFlow-specific)
if (project_root / ".iflow").is_dir():
return "iflow"
# Check for .cursor directory (Cursor-specific)
# Only detect as cursor if .claude doesn't exist (to avoid confusion)
if (project_root / ".cursor").is_dir() and not (project_root / ".claude").is_dir():
return "cursor"
# Check for .gemini directory (Gemini CLI-specific)
if (project_root / ".gemini").is_dir():
return "gemini"
# Check for .codex directory (Codex-specific)
# .agents/skills/ alone does NOT trigger codex detection (it's a shared standard)
if (project_root / ".codex").is_dir() and not _has_other_platform_dir(
project_root, {".codex", ".agents"}
):
return "codex"
# Check for .kilocode directory (Kilo-specific)
if (project_root / ".kilocode").is_dir():
return "kilo"
# Check for Kiro skills directory only when no other platform config exists
if (project_root / ".kiro" / "skills").is_dir() and not _has_other_platform_dir(
project_root, {".kiro"}
):
return "kiro"
# Check for Antigravity workflow directory only when no other platform config exists
if (
project_root / ".agent" / "workflows"
).is_dir() and not _has_other_platform_dir(
project_root, {".agent", ".gemini"}
):
return "antigravity"
# Check for Windsurf workflow directory only when no other platform config exists
if (
project_root / ".windsurf" / "workflows"
).is_dir() and not _has_other_platform_dir(
project_root, {".windsurf"}
):
return "windsurf"
# Check for .codebuddy directory (CodeBuddy-specific)
if (project_root / ".codebuddy").is_dir():
return "codebuddy"
# Check for .qoder directory (Qoder-specific)
if (project_root / ".qoder").is_dir():
return "qoder"
# Check for .github/copilot directory (GitHub Copilot-specific)
if (project_root / ".github" / "copilot").is_dir():
return "copilot"
# Check for .factory directory (Factory Droid-specific)
if (project_root / ".factory").is_dir():
return "droid"
# Fallback: checkout only has the Codex shared-skills layer
# (.agents/skills/trellis-* dirs) and no explicit platform config dir.
# Happens on fresh clones where .codex/ is gitignored/absent but the
# shared skills were committed to git. Must guard against the case
# where .claude/ or any other platform dir also exists — .agents/skills/
# can legitimately coexist with any platform as a shared consumption
# layer for Amp/Cline/Warp/etc.
agents_skills = project_root / ".agents" / "skills"
if agents_skills.is_dir() and not _has_other_platform_dir(
project_root, set()
):
try:
for entry in agents_skills.iterdir():
if entry.is_dir() and entry.name.startswith("trellis-"):
return "codex"
except OSError:
pass
return "claude"
def get_cli_adapter_auto(project_root: Path) -> CLIAdapter:
"""Get CLI adapter with auto-detected platform.
Args:
project_root: Project root directory
Returns:
CLIAdapter instance for detected platform
"""
platform = detect_platform(project_root)
return CLIAdapter(platform=platform)
+389
View File
@@ -0,0 +1,389 @@
#!/usr/bin/env python3
"""
Trellis configuration reader.
Reads settings from .trellis/config.yaml with sensible defaults.
"""
from __future__ import annotations
import sys
from pathlib import Path
from .paths import DIR_WORKFLOW, get_repo_root
# =============================================================================
# YAML Simple Parser (no dependencies)
# =============================================================================
def _unquote(s: str) -> str:
"""Remove exactly one layer of matching surrounding quotes.
Unlike str.strip('"'), this only removes the outermost pair,
preserving any nested quotes inside the value.
Examples:
_unquote('"hello"') -> 'hello'
_unquote("'hello'") -> 'hello'
_unquote('"echo \\'hi\\'"') -> "echo 'hi'"
_unquote('hello') -> 'hello'
_unquote('"hello\\'') -> '"hello\\'' (mismatched, unchanged)
"""
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
return s[1:-1]
return s
def parse_simple_yaml(content: str) -> dict:
"""Parse simple YAML with nested dict support (no dependencies).
Supports:
- key: value (string)
- key: (followed by list items)
- item1
- item2
- key: (followed by nested dict)
nested_key: value
nested_key2:
- item
Uses indentation to detect nesting (2+ spaces deeper = child).
Args:
content: YAML content string.
Returns:
Parsed dict (values can be str, list[str], or dict).
"""
lines = content.splitlines()
result: dict = {}
_parse_yaml_block(lines, 0, 0, result)
return result
def _parse_yaml_block(
lines: list[str], start: int, min_indent: int, target: dict
) -> int:
"""Parse a YAML block into target dict, returning next line index."""
i = start
current_list: list | None = None
while i < len(lines):
line = lines[i]
stripped = line.strip()
# Skip empty lines and comments
if not stripped or stripped.startswith("#"):
i += 1
continue
# Calculate indentation
indent = len(line) - len(line.lstrip())
# If dedented past our block, we're done
if indent < min_indent:
break
if stripped.startswith("- "):
if current_list is not None:
current_list.append(_unquote(stripped[2:].strip()))
i += 1
elif ":" in stripped:
key, _, value = stripped.partition(":")
key = key.strip()
value = _unquote(value.strip())
current_list = None
if value:
# key: value
target[key] = value
i += 1
else:
# key: (no value) — peek ahead to determine list vs nested dict
next_i, next_line = _next_content_line(lines, i + 1)
if next_i >= len(lines):
target[key] = {}
i = next_i
elif next_line.strip().startswith("- "):
# It's a list
current_list = []
target[key] = current_list
i += 1
else:
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent > indent:
# It's a nested dict
nested: dict = {}
target[key] = nested
i = _parse_yaml_block(lines, i + 1, next_indent, nested)
else:
# Empty value, same or less indent follows
target[key] = {}
i += 1
else:
i += 1
return i
def _next_content_line(lines: list[str], start: int) -> tuple[int, str]:
"""Find the next non-empty, non-comment line."""
i = start
while i < len(lines):
stripped = lines[i].strip()
if stripped and not stripped.startswith("#"):
return i, lines[i]
i += 1
return i, ""
# Defaults
DEFAULT_SESSION_COMMIT_MESSAGE = "chore: record journal"
DEFAULT_MAX_JOURNAL_LINES = 2000
CONFIG_FILE = "config.yaml"
def _is_true_config_value(value: object) -> bool:
"""Return True when a config value represents an enabled flag."""
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() == "true"
return False
def _get_config_path(repo_root: Path | None = None) -> Path:
"""Get path to config.yaml."""
root = repo_root or get_repo_root()
return root / DIR_WORKFLOW / CONFIG_FILE
def _load_config(repo_root: Path | None = None) -> dict:
"""Load and parse config.yaml. Returns empty dict on any error."""
config_file = _get_config_path(repo_root)
try:
content = config_file.read_text(encoding="utf-8")
return parse_simple_yaml(content)
except (OSError, IOError):
return {}
def get_session_commit_message(repo_root: Path | None = None) -> str:
"""Get the commit message for auto-committing session records."""
config = _load_config(repo_root)
return config.get("session_commit_message", DEFAULT_SESSION_COMMIT_MESSAGE)
def get_max_journal_lines(repo_root: Path | None = None) -> int:
"""Get the maximum lines per journal file."""
config = _load_config(repo_root)
value = config.get("max_journal_lines", DEFAULT_MAX_JOURNAL_LINES)
try:
return int(value)
except (ValueError, TypeError):
return DEFAULT_MAX_JOURNAL_LINES
def get_hooks(event: str, repo_root: Path | None = None) -> list[str]:
"""Get hook commands for a lifecycle event.
Args:
event: Event name (e.g. "after_create", "after_archive").
repo_root: Repository root path.
Returns:
List of shell commands to execute, empty if none configured.
"""
config = _load_config(repo_root)
hooks = config.get("hooks")
if not isinstance(hooks, dict):
return []
commands = hooks.get(event)
if isinstance(commands, list):
return [str(c) for c in commands]
return []
# =============================================================================
# Monorepo / Packages
# =============================================================================
def get_packages(repo_root: Path | None = None) -> dict[str, dict] | None:
"""Get monorepo package declarations.
Returns:
Dict mapping package name to its config (path, type, etc.),
or None if not configured (single-repo mode).
Example return:
{"cli": {"path": "packages/cli"}, "docs-site": {"path": "docs-site", "type": "submodule"}}
"""
config = _load_config(repo_root)
packages = config.get("packages")
if not isinstance(packages, dict):
return None
# Ensure each value is a dict (filter out scalar entries)
filtered = {k: v for k, v in packages.items() if isinstance(v, dict)}
if not filtered:
return None
return filtered
def get_default_package(repo_root: Path | None = None) -> str | None:
"""Get the default package name from config.
Returns:
Package name string, or None if not configured.
"""
config = _load_config(repo_root)
value = config.get("default_package")
return str(value) if value else None
def get_submodule_packages(repo_root: Path | None = None) -> dict[str, str]:
"""Get packages that are git submodules.
Returns:
Dict mapping package name to its path for submodule-type packages.
Empty dict if none configured.
Example return:
{"docs-site": "docs-site"}
"""
packages = get_packages(repo_root)
if packages is None:
return {}
return {
name: cfg.get("path", name)
for name, cfg in packages.items()
if cfg.get("type") == "submodule"
}
def get_git_packages(repo_root: Path | None = None) -> dict[str, str]:
"""Get packages that have their own independent git repository.
These are sub-directories with their own .git (not submodules),
marked with ``git: true`` in config.yaml.
Returns:
Dict mapping package name to its path for git-repo packages.
Empty dict if none configured.
Example config::
packages:
backend:
path: iqs
git: true
Example return::
{"backend": "iqs"}
"""
packages = get_packages(repo_root)
if packages is None:
return {}
return {
name: cfg.get("path", name)
for name, cfg in packages.items()
if _is_true_config_value(cfg.get("git"))
}
def is_monorepo(repo_root: Path | None = None) -> bool:
"""Check if the project is configured as a monorepo (has packages in config)."""
return get_packages(repo_root) is not None
def get_spec_base(package: str | None = None, repo_root: Path | None = None) -> str:
"""Get the spec directory base path relative to .trellis/.
Single-repo: returns "spec"
Monorepo with package: returns "spec/<package>"
Monorepo without package: returns "spec" (caller should specify package)
"""
if package and is_monorepo(repo_root):
return f"spec/{package}"
return "spec"
def validate_package(package: str, repo_root: Path | None = None) -> bool:
"""Check if a package name is valid in this project.
Single-repo (no packages configured): always returns True.
Monorepo: returns True only if package exists in config.yaml packages.
"""
packages = get_packages(repo_root)
if packages is None:
return True # Single-repo, no validation needed
return package in packages
def resolve_package(
task_package: str | None = None,
repo_root: Path | None = None,
) -> str | None:
"""Resolve package from inferred sources with validation.
Checks in order: task_package → default_package.
Invalid inferred values print a warning to stderr and are skipped.
Returns:
Resolved package name, or None if no valid package found.
Note:
CLI --package should be validated separately by the caller
(fail-fast with available packages list on error).
"""
packages = get_packages(repo_root)
if packages is None:
return None # Single-repo, no package needed
# Try task_package (guard against non-string values from malformed JSON)
if task_package and isinstance(task_package, str):
if task_package in packages:
return task_package
print(
f"Warning: task.json package '{task_package}' not found in config, skipping",
file=sys.stderr,
)
# Try default_package
default = get_default_package(repo_root)
if default:
if default in packages:
return default
print(
f"Warning: default_package '{default}' not found in config, skipping",
file=sys.stderr,
)
return None
def get_spec_scope(repo_root: Path | None = None) -> list[str] | str | None:
"""Get session.spec_scope configuration.
Returns:
list[str]: Package names to include in spec scanning.
str: "active_task" to use current task's package.
None: No scope configured (scan all packages).
"""
config = _load_config(repo_root)
session = config.get("session")
if not isinstance(session, dict):
return None
scope = session.get("spec_scope")
if scope is None:
return None
if isinstance(scope, str):
return scope # e.g. "active_task"
if isinstance(scope, list):
return [str(s) for s in scope]
return None
+190
View File
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
Developer management utilities.
Provides:
init_developer - Initialize developer
ensure_developer - Ensure developer is initialized (exit if not)
show_developer_info - Show developer information
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
from .paths import (
DIR_WORKFLOW,
DIR_WORKSPACE,
DIR_TASKS,
FILE_DEVELOPER,
FILE_JOURNAL_PREFIX,
get_repo_root,
get_developer,
check_developer,
)
# =============================================================================
# Developer Initialization
# =============================================================================
def init_developer(name: str, repo_root: Path | None = None) -> bool:
"""Initialize developer.
Creates:
- .trellis/.developer file with developer info
- .trellis/workspace/<name>/ directory structure
- Initial journal file and index.md
Args:
name: Developer name.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success, False on error.
"""
if not name:
print("Error: developer name is required", file=sys.stderr)
return False
if repo_root is None:
repo_root = get_repo_root()
dev_file = repo_root / DIR_WORKFLOW / FILE_DEVELOPER
workspace_dir = repo_root / DIR_WORKFLOW / DIR_WORKSPACE / name
# Create .developer file
initialized_at = datetime.now().isoformat()
try:
dev_file.write_text(
f"name={name}\ninitialized_at={initialized_at}\n",
encoding="utf-8"
)
except (OSError, IOError) as e:
print(f"Error: Failed to create .developer file: {e}", file=sys.stderr)
return False
# Create workspace directory structure
try:
workspace_dir.mkdir(parents=True, exist_ok=True)
except (OSError, IOError) as e:
print(f"Error: Failed to create workspace directory: {e}", file=sys.stderr)
return False
# Create initial journal file
journal_file = workspace_dir / f"{FILE_JOURNAL_PREFIX}1.md"
if not journal_file.exists():
today = datetime.now().strftime("%Y-%m-%d")
journal_content = f"""# Journal - {name} (Part 1)
> AI development session journal
> Started: {today}
---
"""
try:
journal_file.write_text(journal_content, encoding="utf-8")
except (OSError, IOError) as e:
print(f"Error: Failed to create journal file: {e}", file=sys.stderr)
return False
# Create index.md with markers for auto-update
index_file = workspace_dir / "index.md"
if not index_file.exists():
index_content = f"""# Workspace Index - {name}
> Journal tracking for AI development sessions.
---
## Current Status
<!-- @@@auto:current-status -->
- **Active File**: `journal-1.md`
- **Total Sessions**: 0
- **Last Active**: -
<!-- @@@/auto:current-status -->
---
## Active Documents
<!-- @@@auto:active-documents -->
| File | Lines | Status |
|------|-------|--------|
| `journal-1.md` | ~0 | Active |
<!-- @@@/auto:active-documents -->
---
## Session History
<!-- @@@auto:session-history -->
| # | Date | Title | Commits | Branch |
|---|------|-------|---------|--------|
<!-- @@@/auto:session-history -->
---
## Notes
- Sessions are appended to journal files
- New journal file created when current exceeds 2000 lines
- Use `add_session.py` to record sessions
"""
try:
index_file.write_text(index_content, encoding="utf-8")
except (OSError, IOError) as e:
print(f"Error: Failed to create index.md: {e}", file=sys.stderr)
return False
print(f"Developer initialized: {name}")
print(f" .developer file: {dev_file}")
print(f" Workspace dir: {workspace_dir}")
return True
def ensure_developer(repo_root: Path | None = None) -> None:
"""Ensure developer is initialized, exit if not.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
if repo_root is None:
repo_root = get_repo_root()
if not check_developer(repo_root):
print("Error: Developer not initialized.", file=sys.stderr)
print(f"Run: python3 ./{DIR_WORKFLOW}/scripts/init_developer.py <your-name>", file=sys.stderr)
sys.exit(1)
def show_developer_info(repo_root: Path | None = None) -> None:
"""Show developer information.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if not developer:
print("Developer: (not initialized)")
else:
print(f"Developer: {developer}")
print(f"Workspace: {DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/")
print(f"Tasks: {DIR_WORKFLOW}/{DIR_TASKS}/")
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
show_developer_info()
+31
View File
@@ -0,0 +1,31 @@
"""
Git command execution utility.
Single source of truth for running git commands across all Trellis scripts.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def run_git(args: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
"""Run a git command and return (returncode, stdout, stderr).
Uses UTF-8 encoding with -c i18n.logOutputEncoding=UTF-8 to ensure
consistent output across all platforms (Windows, macOS, Linux).
"""
try:
git_args = ["git", "-c", "i18n.logOutputEncoding=UTF-8"] + args
result = subprocess.run(
git_args,
cwd=cwd,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
return result.returncode, result.stdout, result.stderr
except Exception as e:
return 1, "", str(e)
+101
View File
@@ -0,0 +1,101 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git and Session Context utilities.
Entry shim — delegates to session_context and packages_context.
Provides:
output_json - Output context in JSON format
output_text - Output context in text format
"""
from __future__ import annotations
import json
from .git import run_git
from .session_context import (
get_context_json,
get_context_text,
get_context_record_json,
get_context_text_record,
output_json,
output_text,
)
from .packages_context import (
get_context_packages_text,
get_context_packages_json,
)
from .workflow_phase import (
filter_platform,
get_phase_index,
get_step,
)
# Backward-compatible alias — external modules import this name
_run_git_command = run_git
# =============================================================================
# Main Entry
# =============================================================================
def main() -> None:
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(description="Get Session Context for AI Agent")
parser.add_argument(
"--json",
"-j",
action="store_true",
help="Output in JSON format (works with any --mode)",
)
parser.add_argument(
"--mode",
"-m",
choices=["default", "record", "packages", "phase"],
default="default",
help="Output mode: default (full context), record (for record-session), packages (package info only), phase (workflow step extraction)",
)
parser.add_argument(
"--step",
help="Step id for --mode phase, e.g. 1.1, 2.2. Omit to get the Phase Index.",
)
parser.add_argument(
"--platform",
help="Platform name for --mode phase, e.g. cursor, claude-code. Filters platform-tagged blocks.",
)
args = parser.parse_args()
if args.mode == "record":
if args.json:
print(json.dumps(get_context_record_json(), indent=2, ensure_ascii=False))
else:
print(get_context_text_record())
elif args.mode == "packages":
if args.json:
print(json.dumps(get_context_packages_json(), indent=2, ensure_ascii=False))
else:
print(get_context_packages_text())
elif args.mode == "phase":
content = get_step(args.step) if args.step else get_phase_index()
if not content.strip():
if args.step:
parser.exit(2, f"Step not found: {args.step}\n")
else:
parser.exit(2, "Phase Index section not found in workflow.md\n")
if args.platform:
content = filter_platform(content, args.platform)
print(content, end="")
else:
if args.json:
output_json()
else:
output_text()
if __name__ == "__main__":
main()
+37
View File
@@ -0,0 +1,37 @@
"""
JSON file I/O utilities.
Provides read_json and write_json as the single source of truth
for JSON file operations across all Trellis scripts.
"""
from __future__ import annotations
import json
from pathlib import Path
def read_json(path: Path) -> dict | None:
"""Read and parse a JSON file.
Returns None if the file doesn't exist, is invalid JSON, or can't be read.
"""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
def write_json(path: Path, data: dict) -> bool:
"""Write dict to JSON file with pretty formatting.
Returns True on success, False on error.
"""
try:
path.write_text(
json.dumps(data, indent=2, ensure_ascii=False),
encoding="utf-8",
)
return True
except (OSError, IOError):
return False
+45
View File
@@ -0,0 +1,45 @@
"""
Terminal output utilities: colors and structured logging.
Single source of truth for Colors and log_* functions
used across all Trellis scripts.
"""
from __future__ import annotations
class Colors:
"""ANSI color codes for terminal output."""
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[0;34m"
CYAN = "\033[0;36m"
DIM = "\033[2m"
NC = "\033[0m" # No Color / Reset
def colored(text: str, color: str) -> str:
"""Apply ANSI color to text."""
return f"{color}{text}{Colors.NC}"
def log_info(msg: str) -> None:
"""Print info-level message with [INFO] prefix."""
print(f"{Colors.BLUE}[INFO]{Colors.NC} {msg}")
def log_success(msg: str) -> None:
"""Print success message with [SUCCESS] prefix."""
print(f"{Colors.GREEN}[SUCCESS]{Colors.NC} {msg}")
def log_warn(msg: str) -> None:
"""Print warning message with [WARN] prefix."""
print(f"{Colors.YELLOW}[WARN]{Colors.NC} {msg}")
def log_error(msg: str) -> None:
"""Print error message with [ERROR] prefix."""
print(f"{Colors.RED}[ERROR]{Colors.NC} {msg}")
+238
View File
@@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Package discovery and context output.
Provides:
get_packages_info - Get structured package info
get_packages_section - Build PACKAGES text section
get_context_packages_text - Full packages text output (--mode packages)
get_context_packages_json - Full packages JSON output (--mode packages --json)
"""
from __future__ import annotations
from pathlib import Path
from .config import _is_true_config_value, get_default_package, get_packages, get_spec_scope
from .paths import (
DIR_SPEC,
DIR_WORKFLOW,
get_current_task,
get_repo_root,
)
from .tasks import load_task
# =============================================================================
# Internal Helpers
# =============================================================================
def _scan_spec_layers(spec_dir: Path, package: str | None = None) -> list[str]:
"""Scan spec directory for available layers (subdirectories).
For monorepo: scans spec/<package>/
For single-repo: scans spec/
"""
target = spec_dir / package if package else spec_dir
if not target.is_dir():
return []
return sorted(
d.name for d in target.iterdir() if d.is_dir() and d.name != "guides"
)
def _get_active_task_package(repo_root: Path) -> str | None:
"""Get the package field from the active task's task.json."""
current = get_current_task(repo_root)
if not current:
return None
ct = load_task(repo_root / current)
return ct.package if ct and ct.package else None
def _resolve_scope_set(
packages: dict,
spec_scope,
task_pkg: str | None,
default_pkg: str | None,
) -> set | None:
"""Resolve spec_scope to a set of allowed package names, or None for full scan."""
if not packages:
return None
if spec_scope is None:
return None
if isinstance(spec_scope, str) and spec_scope == "active_task":
if task_pkg and task_pkg in packages:
return {task_pkg}
if default_pkg and default_pkg in packages:
return {default_pkg}
return None
if isinstance(spec_scope, list):
valid = {e for e in spec_scope if e in packages}
if valid:
return valid
# All invalid: fallback
if task_pkg and task_pkg in packages:
return {task_pkg}
if default_pkg and default_pkg in packages:
return {default_pkg}
return None
return None
# =============================================================================
# Public Functions
# =============================================================================
def get_packages_info(repo_root: Path) -> list[dict]:
"""Get structured package info for monorepo projects.
Returns list of dicts with keys: name, path, type, default, specLayers,
isSubmodule, isGitRepo.
Returns empty list for single-repo projects.
"""
packages = get_packages(repo_root)
if not packages:
return []
default_pkg = get_default_package(repo_root)
spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
result = []
for pkg_name, pkg_config in packages.items():
pkg_path = pkg_config.get("path", pkg_name) if isinstance(pkg_config, dict) else str(pkg_config)
pkg_type = pkg_config.get("type", "local") if isinstance(pkg_config, dict) else "local"
pkg_git = pkg_config.get("git", False) if isinstance(pkg_config, dict) else False
layers = _scan_spec_layers(spec_dir, pkg_name)
result.append({
"name": pkg_name,
"path": pkg_path,
"type": pkg_type,
"default": pkg_name == default_pkg,
"specLayers": layers,
"isSubmodule": pkg_type == "submodule",
"isGitRepo": _is_true_config_value(pkg_git),
})
return result
def get_packages_section(repo_root: Path) -> str:
"""Build the PACKAGES section for text output."""
spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
pkg_info = get_packages_info(repo_root)
lines: list[str] = []
lines.append("## PACKAGES")
if not pkg_info:
lines.append("(single-repo mode)")
layers = _scan_spec_layers(spec_dir)
if layers:
lines.append(f"Spec layers: {', '.join(layers)}")
return "\n".join(lines)
default_pkg = get_default_package(repo_root)
for pkg in pkg_info:
layers_str = f" [{', '.join(pkg['specLayers'])}]" if pkg["specLayers"] else ""
submodule_tag = " (submodule)" if pkg["isSubmodule"] else ""
git_repo_tag = " (git repo)" if pkg["isGitRepo"] else ""
default_tag = " *" if pkg["default"] else ""
lines.append(
f"- {pkg['name']:<16} {pkg['path']:<20}{layers_str}{submodule_tag}{git_repo_tag}{default_tag}"
)
if default_pkg:
lines.append(f"Default package: {default_pkg}")
return "\n".join(lines)
def get_context_packages_text(repo_root: Path | None = None) -> str:
"""Get packages context as formatted text (for --mode packages)."""
if repo_root is None:
repo_root = get_repo_root()
pkg_info = get_packages_info(repo_root)
lines: list[str] = []
if not pkg_info:
spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
lines.append("Single-repo project (no packages configured)")
lines.append("")
layers = _scan_spec_layers(spec_dir)
if layers:
lines.append(f"Spec layers: {', '.join(layers)}")
return "\n".join(lines)
# Resolve scope for annotations
packages_dict = get_packages(repo_root) or {}
default_pkg = get_default_package(repo_root)
spec_scope = get_spec_scope(repo_root)
task_pkg = _get_active_task_package(repo_root)
scope_set = _resolve_scope_set(packages_dict, spec_scope, task_pkg, default_pkg)
lines.append("## PACKAGES")
lines.append("")
for pkg in pkg_info:
default_tag = " (default)" if pkg["default"] else ""
type_tag = f" [{pkg['type']}]" if pkg["type"] != "local" else ""
git_tag = " [git repo]" if pkg["isGitRepo"] else ""
# Scope annotation
scope_tag = ""
if scope_set is not None and pkg["name"] not in scope_set:
scope_tag = " (out of scope)"
lines.append(f"### {pkg['name']}{default_tag}{type_tag}{git_tag}{scope_tag}")
lines.append(f"Path: {pkg['path']}")
if pkg["specLayers"]:
lines.append(f"Spec layers: {', '.join(pkg['specLayers'])}")
for layer in pkg["specLayers"]:
lines.append(f" - .trellis/spec/{pkg['name']}/{layer}/index.md")
else:
lines.append("Spec: not configured")
lines.append("")
# Also show shared guides
guides_dir = repo_root / DIR_WORKFLOW / DIR_SPEC / "guides"
if guides_dir.is_dir():
lines.append("### Shared Guides (always included)")
lines.append("Path: .trellis/spec/guides/index.md")
lines.append("")
return "\n".join(lines)
def get_context_packages_json(repo_root: Path | None = None) -> dict:
"""Get packages context as a dictionary (for --mode packages --json)."""
if repo_root is None:
repo_root = get_repo_root()
pkg_info = get_packages_info(repo_root)
if not pkg_info:
spec_dir = repo_root / DIR_WORKFLOW / DIR_SPEC
layers = _scan_spec_layers(spec_dir)
return {
"mode": "single-repo",
"specLayers": layers,
}
default_pkg = get_default_package(repo_root)
spec_scope = get_spec_scope(repo_root)
task_pkg = _get_active_task_package(repo_root)
return {
"mode": "monorepo",
"packages": pkg_info,
"defaultPackage": default_pkg,
"specScope": spec_scope,
"activeTaskPackage": task_pkg,
}
+444
View File
@@ -0,0 +1,444 @@
#!/usr/bin/env python3
"""
Common path utilities for Trellis workflow.
Provides:
get_repo_root - Get repository root directory
get_developer - Get developer name
get_workspace_dir - Get developer workspace directory
get_tasks_dir - Get tasks directory
get_active_journal_file - Get current journal file
"""
from __future__ import annotations
import re
from datetime import datetime
from pathlib import Path
# =============================================================================
# Path Constants (change here to rename directories)
# =============================================================================
# Directory names
DIR_WORKFLOW = ".trellis"
DIR_WORKSPACE = "workspace"
DIR_TASKS = "tasks"
DIR_ARCHIVE = "archive"
DIR_SPEC = "spec"
DIR_SCRIPTS = "scripts"
# File names
FILE_DEVELOPER = ".developer"
FILE_CURRENT_TASK = ".current-task"
FILE_TASK_JSON = "task.json"
FILE_JOURNAL_PREFIX = "journal-"
# =============================================================================
# Repository Root
# =============================================================================
def get_repo_root(start_path: Path | None = None) -> Path:
"""Find the nearest directory containing .trellis/ folder.
This handles nested git repos correctly (e.g., test project inside another repo).
Args:
start_path: Starting directory to search from. Defaults to current directory.
Returns:
Path to repository root, or current directory if no .trellis/ found.
"""
current = (start_path or Path.cwd()).resolve()
while current != current.parent:
if (current / DIR_WORKFLOW).is_dir():
return current
current = current.parent
# Fallback to current directory if no .trellis/ found
return Path.cwd().resolve()
# =============================================================================
# Developer
# =============================================================================
def get_developer(repo_root: Path | None = None) -> str | None:
"""Get developer name from .developer file.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Developer name or None if not initialized.
"""
if repo_root is None:
repo_root = get_repo_root()
dev_file = repo_root / DIR_WORKFLOW / FILE_DEVELOPER
if not dev_file.is_file():
return None
try:
content = dev_file.read_text(encoding="utf-8")
for line in content.splitlines():
if line.startswith("name="):
return line.split("=", 1)[1].strip()
except (OSError, IOError):
pass
return None
def check_developer(repo_root: Path | None = None) -> bool:
"""Check if developer is initialized.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if developer is initialized.
"""
return get_developer(repo_root) is not None
# =============================================================================
# Tasks Directory
# =============================================================================
def get_tasks_dir(repo_root: Path | None = None) -> Path:
"""Get tasks directory path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to tasks directory.
"""
if repo_root is None:
repo_root = get_repo_root()
return repo_root / DIR_WORKFLOW / DIR_TASKS
# =============================================================================
# Workspace Directory
# =============================================================================
def get_workspace_dir(repo_root: Path | None = None) -> Path | None:
"""Get developer workspace directory.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to workspace directory or None if developer not set.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if developer:
return repo_root / DIR_WORKFLOW / DIR_WORKSPACE / developer
return None
# =============================================================================
# Journal File
# =============================================================================
def get_active_journal_file(repo_root: Path | None = None) -> Path | None:
"""Get the current active journal file.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to active journal file or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
workspace_dir = get_workspace_dir(repo_root)
if workspace_dir is None or not workspace_dir.is_dir():
return None
latest: Path | None = None
highest = 0
for f in workspace_dir.glob(f"{FILE_JOURNAL_PREFIX}*.md"):
if not f.is_file():
continue
# Extract number from filename
name = f.stem # e.g., "journal-1"
match = re.search(r"(\d+)$", name)
if match:
num = int(match.group(1))
if num > highest:
highest = num
latest = f
return latest
def count_lines(file_path: Path) -> int:
"""Count lines in a file.
Args:
file_path: Path to file.
Returns:
Number of lines, or 0 if file doesn't exist.
"""
if not file_path.is_file():
return 0
try:
return len(file_path.read_text(encoding="utf-8").splitlines())
except (OSError, IOError):
return 0
# =============================================================================
# Current Task Management
# =============================================================================
def _get_current_task_file(repo_root: Path | None = None) -> Path:
"""Get .current-task file path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to .current-task file.
"""
if repo_root is None:
repo_root = get_repo_root()
return repo_root / DIR_WORKFLOW / FILE_CURRENT_TASK
def normalize_task_ref(task_ref: str) -> str:
"""Normalize a task ref for stable storage in .current-task.
Stored refs should prefer repo-relative POSIX paths like
`.trellis/tasks/03-27-my-task`, even on Windows. Absolute paths are preserved
unless they can later be converted back to repo-relative form by callers.
"""
normalized = task_ref.strip()
if not normalized:
return ""
path_obj = Path(normalized)
if path_obj.is_absolute():
return str(path_obj)
normalized = normalized.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
if normalized.startswith(f"{DIR_TASKS}/"):
return f"{DIR_WORKFLOW}/{normalized}"
return normalized
def resolve_task_ref(task_ref: str, repo_root: Path | None = None) -> Path | None:
"""Resolve a task ref from .current-task to an absolute task directory path."""
if repo_root is None:
repo_root = get_repo_root()
normalized = normalize_task_ref(task_ref)
if not normalized:
return None
path_obj = Path(normalized)
if path_obj.is_absolute():
return path_obj
if normalized.startswith(f"{DIR_WORKFLOW}/"):
return repo_root / path_obj
return repo_root / DIR_WORKFLOW / DIR_TASKS / path_obj
def get_current_task(repo_root: Path | None = None) -> str | None:
"""Get current task directory path (relative to repo_root).
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Relative path to current task directory or None.
"""
current_file = _get_current_task_file(repo_root)
if not current_file.is_file():
return None
try:
content = current_file.read_text(encoding="utf-8").strip()
return normalize_task_ref(content) if content else None
except (OSError, IOError):
return None
def get_current_task_abs(repo_root: Path | None = None) -> Path | None:
"""Get current task directory absolute path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Absolute path to current task directory or None.
"""
if repo_root is None:
repo_root = get_repo_root()
relative = get_current_task(repo_root)
if relative:
return resolve_task_ref(relative, repo_root)
return None
def set_current_task(task_path: str, repo_root: Path | None = None) -> bool:
"""Set current task.
Args:
task_path: Task directory path (relative to repo_root).
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success, False on error.
"""
if repo_root is None:
repo_root = get_repo_root()
normalized = normalize_task_ref(task_path)
if not normalized:
return False
# Verify task directory exists
full_path = resolve_task_ref(normalized, repo_root)
if full_path is None or not full_path.is_dir():
return False
try:
normalized = full_path.relative_to(repo_root).as_posix()
except ValueError:
normalized = str(full_path)
current_file = _get_current_task_file(repo_root)
try:
current_file.write_text(normalized, encoding="utf-8")
return True
except (OSError, IOError):
return False
def clear_current_task(repo_root: Path | None = None) -> bool:
"""Clear current task.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success.
"""
current_file = _get_current_task_file(repo_root)
try:
if current_file.is_file():
current_file.unlink()
return True
except (OSError, IOError):
return False
def has_current_task(repo_root: Path | None = None) -> bool:
"""Check if has current task.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if current task is set.
"""
return get_current_task(repo_root) is not None
# =============================================================================
# Task ID Generation
# =============================================================================
def generate_task_date_prefix() -> str:
"""Generate task ID based on date (MM-DD format).
Returns:
Date prefix string (e.g., "01-21").
"""
return datetime.now().strftime("%m-%d")
# =============================================================================
# Monorepo / Package Paths
# =============================================================================
def get_spec_dir(package: str | None = None, repo_root: Path | None = None) -> Path:
"""Get the spec directory path.
Single-repo: .trellis/spec
Monorepo with package: .trellis/spec/<package>
Uses lazy import to avoid circular dependency with config.py.
"""
if repo_root is None:
repo_root = get_repo_root()
from .config import get_spec_base
base = get_spec_base(package, repo_root)
return repo_root / DIR_WORKFLOW / base
def get_package_path(package: str, repo_root: Path | None = None) -> Path | None:
"""Get a package's source directory absolute path from config.
Returns:
Absolute path to the package directory, or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
from .config import get_packages
packages = get_packages(repo_root)
if not packages or package not in packages:
return None
info = packages[package]
if isinstance(info, dict):
rel_path = info.get("path", package)
else:
rel_path = str(info)
return repo_root / rel_path
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
repo = get_repo_root()
print(f"Repository root: {repo}")
print(f"Developer: {get_developer(repo)}")
print(f"Tasks dir: {get_tasks_dir(repo)}")
print(f"Workspace dir: {get_workspace_dir(repo)}")
print(f"Journal file: {get_active_journal_file(repo)}")
print(f"Current task: {get_current_task(repo)}")
+562
View File
@@ -0,0 +1,562 @@
#!/usr/bin/env python3
"""
Session context generation (default + record modes).
Provides:
get_context_json - JSON output for default mode
get_context_text - Text output for default mode
get_context_record_json - JSON for record mode
get_context_text_record - Text for record mode
output_json - Print JSON
output_text - Print text
"""
from __future__ import annotations
import json
from pathlib import Path
from .config import get_git_packages
from .git import run_git
from .packages_context import get_packages_section
from .tasks import iter_active_tasks, load_task, get_all_statuses, children_progress
from .paths import (
DIR_SCRIPTS,
DIR_SPEC,
DIR_TASKS,
DIR_WORKFLOW,
DIR_WORKSPACE,
count_lines,
get_active_journal_file,
get_current_task,
get_developer,
get_repo_root,
get_tasks_dir,
)
# =============================================================================
# Helpers
# =============================================================================
def _collect_package_git_info(repo_root: Path) -> list[dict]:
"""Collect git status and recent commits for packages with independent git repos.
Only packages marked with ``git: true`` in config.yaml are included.
Returns:
List of dicts with keys: name, path, branch, isClean,
uncommittedChanges, recentCommits.
Empty list if no git-repo packages are configured.
"""
git_pkgs = get_git_packages(repo_root)
if not git_pkgs:
return []
result = []
for pkg_name, pkg_path in git_pkgs.items():
pkg_dir = repo_root / pkg_path
if not (pkg_dir / ".git").exists():
continue
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=pkg_dir)
branch = branch_out.strip() or "unknown"
_, status_out, _ = run_git(["status", "--porcelain"], cwd=pkg_dir)
changes = len([l for l in status_out.splitlines() if l.strip()])
_, log_out, _ = run_git(["log", "--oneline", "-5"], cwd=pkg_dir)
commits = []
for line in log_out.splitlines():
if line.strip():
parts = line.split(" ", 1)
if len(parts) >= 2:
commits.append({"hash": parts[0], "message": parts[1]})
elif len(parts) == 1:
commits.append({"hash": parts[0], "message": ""})
result.append({
"name": pkg_name,
"path": pkg_path,
"branch": branch,
"isClean": changes == 0,
"uncommittedChanges": changes,
"recentCommits": commits,
})
return result
def _append_package_git_context(lines: list[str], package_git_info: list[dict]) -> None:
"""Append Git status and recent commits for package repositories."""
for pkg in package_git_info:
lines.append(f"## GIT STATUS ({pkg['name']}: {pkg['path']})")
lines.append(f"Branch: {pkg['branch']}")
if pkg["isClean"]:
lines.append("Working directory: Clean")
else:
lines.append(
f"Working directory: {pkg['uncommittedChanges']} uncommitted change(s)"
)
lines.append("")
lines.append(f"## RECENT COMMITS ({pkg['name']}: {pkg['path']})")
if pkg["recentCommits"]:
for commit in pkg["recentCommits"]:
lines.append(f"{commit['hash']} {commit['message']}")
else:
lines.append("(no commits)")
lines.append("")
# =============================================================================
# JSON Output
# =============================================================================
def get_context_json(repo_root: Path | None = None) -> dict:
"""Get context as a dictionary.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Context dictionary.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
tasks_dir = get_tasks_dir(repo_root)
journal_file = get_active_journal_file(repo_root)
journal_lines = 0
journal_relative = ""
if journal_file and developer:
journal_lines = count_lines(journal_file)
journal_relative = (
f"{DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/{journal_file.name}"
)
# Git info
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
_, status_out, _ = run_git(["status", "--porcelain"], cwd=repo_root)
git_status_count = len([line for line in status_out.splitlines() if line.strip()])
is_clean = git_status_count == 0
# Recent commits
_, log_out, _ = run_git(["log", "--oneline", "-5"], cwd=repo_root)
commits = []
for line in log_out.splitlines():
if line.strip():
parts = line.split(" ", 1)
if len(parts) >= 2:
commits.append({"hash": parts[0], "message": parts[1]})
elif len(parts) == 1:
commits.append({"hash": parts[0], "message": ""})
# Tasks
tasks = [
{
"dir": t.dir_name,
"name": t.name,
"status": t.status,
"children": list(t.children),
"parent": t.parent,
}
for t in iter_active_tasks(tasks_dir)
]
# Package git repos (independent sub-repositories)
pkg_git_info = _collect_package_git_info(repo_root)
result = {
"developer": developer or "",
"git": {
"branch": branch,
"isClean": is_clean,
"uncommittedChanges": git_status_count,
"recentCommits": commits,
},
"tasks": {
"active": tasks,
"directory": f"{DIR_WORKFLOW}/{DIR_TASKS}",
},
"journal": {
"file": journal_relative,
"lines": journal_lines,
"nearLimit": journal_lines > 1800,
},
}
if pkg_git_info:
result["packageGit"] = pkg_git_info
return result
def output_json(repo_root: Path | None = None) -> None:
"""Output context in JSON format.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
context = get_context_json(repo_root)
print(json.dumps(context, indent=2, ensure_ascii=False))
# =============================================================================
# Text Output
# =============================================================================
def get_context_text(repo_root: Path | None = None) -> str:
"""Get context as formatted text.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Formatted text output.
"""
if repo_root is None:
repo_root = get_repo_root()
lines = []
lines.append("========================================")
lines.append("SESSION CONTEXT")
lines.append("========================================")
lines.append("")
developer = get_developer(repo_root)
# Developer section
lines.append("## DEVELOPER")
if not developer:
lines.append(
f"ERROR: Not initialized. Run: python3 ./{DIR_WORKFLOW}/{DIR_SCRIPTS}/init_developer.py <name>"
)
return "\n".join(lines)
lines.append(f"Name: {developer}")
lines.append("")
# Git status
lines.append("## GIT STATUS")
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
lines.append(f"Branch: {branch}")
_, status_out, _ = run_git(["status", "--porcelain"], cwd=repo_root)
status_lines = [line for line in status_out.splitlines() if line.strip()]
status_count = len(status_lines)
if status_count == 0:
lines.append("Working directory: Clean")
else:
lines.append(f"Working directory: {status_count} uncommitted change(s)")
lines.append("")
lines.append("Changes:")
_, short_out, _ = run_git(["status", "--short"], cwd=repo_root)
for line in short_out.splitlines()[:10]:
lines.append(line)
lines.append("")
# Recent commits
lines.append("## RECENT COMMITS")
_, log_out, _ = run_git(["log", "--oneline", "-5"], cwd=repo_root)
if log_out.strip():
for line in log_out.splitlines():
lines.append(line)
else:
lines.append("(no commits)")
lines.append("")
# Package git repos — independent sub-repositories
_append_package_git_context(lines, _collect_package_git_info(repo_root))
# Current task
lines.append("## CURRENT TASK")
current_task = get_current_task(repo_root)
if current_task:
current_task_dir = repo_root / current_task
lines.append(f"Path: {current_task}")
ct = load_task(current_task_dir)
if ct:
lines.append(f"Name: {ct.name}")
lines.append(f"Status: {ct.status}")
lines.append(f"Created: {ct.raw.get('createdAt', 'unknown')}")
if ct.description:
lines.append(f"Description: {ct.description}")
# Check for prd.md
prd_file = current_task_dir / "prd.md"
if prd_file.is_file():
lines.append("")
lines.append("[!] This task has prd.md - read it for task details")
else:
lines.append("(none)")
lines.append("")
# Active tasks
lines.append("## ACTIVE TASKS")
tasks_dir = get_tasks_dir(repo_root)
task_count = 0
# Collect all task data for hierarchy display
all_tasks = {t.dir_name: t for t in iter_active_tasks(tasks_dir)}
all_statuses = {name: t.status for name, t in all_tasks.items()}
def _print_task_tree(name: str, indent: int = 0) -> None:
nonlocal task_count
t = all_tasks[name]
progress = children_progress(t.children, all_statuses)
prefix = " " * indent
lines.append(f"{prefix}- {name}/ ({t.status}){progress} @{t.assignee or '-'}")
task_count += 1
for child in t.children:
if child in all_tasks:
_print_task_tree(child, indent + 1)
for dir_name in sorted(all_tasks.keys()):
if not all_tasks[dir_name].parent:
_print_task_tree(dir_name)
if task_count == 0:
lines.append("(no active tasks)")
lines.append(f"Total: {task_count} active task(s)")
lines.append("")
# My tasks
lines.append("## MY TASKS (Assigned to me)")
my_task_count = 0
for t in all_tasks.values():
if t.assignee == developer and t.status != "done":
progress = children_progress(t.children, all_statuses)
lines.append(f"- [{t.priority}] {t.title} ({t.status}){progress}")
my_task_count += 1
if my_task_count == 0:
lines.append("(no tasks assigned to you)")
lines.append("")
# Journal file
lines.append("## JOURNAL FILE")
journal_file = get_active_journal_file(repo_root)
if journal_file:
journal_lines = count_lines(journal_file)
relative = f"{DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/{journal_file.name}"
lines.append(f"Active file: {relative}")
lines.append(f"Line count: {journal_lines} / 2000")
if journal_lines > 1800:
lines.append("[!] WARNING: Approaching 2000 line limit!")
else:
lines.append("No journal file found")
lines.append("")
# Packages
packages_text = get_packages_section(repo_root)
if packages_text:
lines.append(packages_text)
lines.append("")
# Paths
lines.append("## PATHS")
lines.append(f"Workspace: {DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/")
lines.append(f"Tasks: {DIR_WORKFLOW}/{DIR_TASKS}/")
lines.append(f"Spec: {DIR_WORKFLOW}/{DIR_SPEC}/")
lines.append("")
lines.append("========================================")
return "\n".join(lines)
# =============================================================================
# Record Mode
# =============================================================================
def get_context_record_json(repo_root: Path | None = None) -> dict:
"""Get record-mode context as a dictionary.
Focused on: my active tasks, git status, current task.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
tasks_dir = get_tasks_dir(repo_root)
# Git info
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
_, status_out, _ = run_git(["status", "--porcelain"], cwd=repo_root)
git_status_count = len([line for line in status_out.splitlines() if line.strip()])
_, log_out, _ = run_git(["log", "--oneline", "-5"], cwd=repo_root)
commits = []
for line in log_out.splitlines():
if line.strip():
parts = line.split(" ", 1)
if len(parts) >= 2:
commits.append({"hash": parts[0], "message": parts[1]})
# My tasks (single pass — collect statuses and filter by assignee)
all_tasks_list = list(iter_active_tasks(tasks_dir))
all_statuses = {t.dir_name: t.status for t in all_tasks_list}
my_tasks = []
for t in all_tasks_list:
if t.assignee == developer:
done = sum(
1 for c in t.children
if all_statuses.get(c) in ("completed", "done")
)
my_tasks.append({
"dir": t.dir_name,
"title": t.title,
"status": t.status,
"priority": t.priority,
"children": list(t.children),
"childrenDone": done,
"parent": t.parent,
"meta": t.meta,
})
# Current task
current_task_info = None
current_task = get_current_task(repo_root)
if current_task:
ct = load_task(repo_root / current_task)
if ct:
current_task_info = {
"path": current_task,
"name": ct.name,
"status": ct.status,
}
# Package git repos
pkg_git_info = _collect_package_git_info(repo_root)
result = {
"developer": developer or "",
"git": {
"branch": branch,
"isClean": git_status_count == 0,
"uncommittedChanges": git_status_count,
"recentCommits": commits,
},
"myTasks": my_tasks,
"currentTask": current_task_info,
}
if pkg_git_info:
result["packageGit"] = pkg_git_info
return result
def get_context_text_record(repo_root: Path | None = None) -> str:
"""Get context as formatted text for record-session mode.
Focused output: MY ACTIVE TASKS first (with [!!!] emphasis),
then GIT STATUS, RECENT COMMITS, CURRENT TASK.
"""
if repo_root is None:
repo_root = get_repo_root()
lines: list[str] = []
lines.append("========================================")
lines.append("SESSION CONTEXT (RECORD MODE)")
lines.append("========================================")
lines.append("")
developer = get_developer(repo_root)
if not developer:
lines.append(
f"ERROR: Not initialized. Run: python3 ./{DIR_WORKFLOW}/{DIR_SCRIPTS}/init_developer.py <name>"
)
return "\n".join(lines)
# MY ACTIVE TASKS — first and prominent
lines.append(f"## [!!!] MY ACTIVE TASKS (Assigned to {developer})")
lines.append("[!] Review whether any should be archived before recording this session.")
lines.append("")
tasks_dir = get_tasks_dir(repo_root)
my_task_count = 0
# Single pass — collect all tasks and filter by assignee
all_statuses = get_all_statuses(tasks_dir)
for t in iter_active_tasks(tasks_dir):
if t.assignee == developer:
progress = children_progress(t.children, all_statuses)
lines.append(f"- [{t.priority}] {t.title} ({t.status}){progress}{t.dir_name}")
my_task_count += 1
if my_task_count == 0:
lines.append("(no active tasks assigned to you)")
lines.append("")
# GIT STATUS
lines.append("## GIT STATUS")
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
lines.append(f"Branch: {branch}")
_, status_out, _ = run_git(["status", "--porcelain"], cwd=repo_root)
status_lines = [line for line in status_out.splitlines() if line.strip()]
status_count = len(status_lines)
if status_count == 0:
lines.append("Working directory: Clean")
else:
lines.append(f"Working directory: {status_count} uncommitted change(s)")
lines.append("")
lines.append("Changes:")
_, short_out, _ = run_git(["status", "--short"], cwd=repo_root)
for line in short_out.splitlines()[:10]:
lines.append(line)
lines.append("")
# RECENT COMMITS
lines.append("## RECENT COMMITS")
_, log_out, _ = run_git(["log", "--oneline", "-5"], cwd=repo_root)
if log_out.strip():
for line in log_out.splitlines():
lines.append(line)
else:
lines.append("(no commits)")
lines.append("")
# Package git repos — independent sub-repositories
_append_package_git_context(lines, _collect_package_git_info(repo_root))
# CURRENT TASK
lines.append("## CURRENT TASK")
current_task = get_current_task(repo_root)
if current_task:
lines.append(f"Path: {current_task}")
ct = load_task(repo_root / current_task)
if ct:
lines.append(f"Name: {ct.name}")
lines.append(f"Status: {ct.status}")
else:
lines.append("(none)")
lines.append("")
lines.append("========================================")
return "\n".join(lines)
def output_text(repo_root: Path | None = None) -> None:
"""Output context in text format.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
print(get_context_text(repo_root))
+223
View File
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Task JSONL context management.
Provides:
cmd_add_context - Add entry to JSONL context file
cmd_validate - Validate JSONL context files
cmd_list_context - List JSONL context entries
Note:
``cmd_init_context`` was removed in v0.5.0-beta.12. JSONL context files
are now seeded at ``task.py create`` time with a self-describing
``_example`` line; the AI agent curates real entries during Phase 1.3 of
the workflow. See ``.trellis/workflow.md`` Phase 1.3 for the current
instructions.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from .log import Colors, colored
from .paths import get_repo_root
from .task_utils import resolve_task_dir
# =============================================================================
# Command: add-context
# =============================================================================
def cmd_add_context(args: argparse.Namespace) -> int:
"""Add entry to JSONL context file."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
jsonl_name = args.file
path = args.path
reason = args.reason or "Added manually"
if not target_dir.is_dir():
print(colored(f"Error: Directory not found: {target_dir}", Colors.RED))
return 1
# Support shorthand
if not jsonl_name.endswith(".jsonl"):
jsonl_name = f"{jsonl_name}.jsonl"
jsonl_file = target_dir / jsonl_name
full_path = repo_root / path
entry_type = "file"
if full_path.is_dir():
entry_type = "directory"
if not path.endswith("/"):
path = f"{path}/"
elif not full_path.is_file():
print(colored(f"Error: Path not found: {path}", Colors.RED))
return 1
# Check if already exists
if jsonl_file.is_file():
content = jsonl_file.read_text(encoding="utf-8")
if f'"{path}"' in content:
print(colored(f"Warning: Entry already exists for {path}", Colors.YELLOW))
return 0
# Add entry
entry: dict
if entry_type == "directory":
entry = {"file": path, "type": "directory", "reason": reason}
else:
entry = {"file": path, "reason": reason}
with jsonl_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(colored(f"Added {entry_type}: {path}", Colors.GREEN))
return 0
# =============================================================================
# Command: validate
# =============================================================================
def cmd_validate(args: argparse.Namespace) -> int:
"""Validate JSONL context files."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
if not target_dir.is_dir():
print(colored("Error: task directory required", Colors.RED))
return 1
print(colored("=== Validating Context Files ===", Colors.BLUE))
print(f"Target dir: {target_dir}")
print()
total_errors = 0
for jsonl_name in ["implement.jsonl", "check.jsonl"]:
jsonl_file = target_dir / jsonl_name
errors = _validate_jsonl(jsonl_file, repo_root)
total_errors += errors
print()
if total_errors == 0:
print(colored("✓ All validations passed", Colors.GREEN))
return 0
else:
print(colored(f"✗ Validation failed ({total_errors} errors)", Colors.RED))
return 1
def _validate_jsonl(jsonl_file: Path, repo_root: Path) -> int:
"""Validate a single JSONL file.
Seed rows (no ``file`` field — typically ``{"_example": "..."}``) are
skipped silently; they are self-describing comments, not real entries.
"""
file_name = jsonl_file.name
errors = 0
if not jsonl_file.is_file():
print(f" {colored(f'{file_name}: not found (skipped)', Colors.YELLOW)}")
return 0
line_num = 0
real_entries = 0
for line in jsonl_file.read_text(encoding="utf-8").splitlines():
line_num += 1
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
print(f" {colored(f'{file_name}:{line_num}: Invalid JSON', Colors.RED)}")
errors += 1
continue
file_path = data.get("file")
entry_type = data.get("type", "file")
if not file_path:
# Seed / comment row — skip silently
continue
real_entries += 1
full_path = repo_root / file_path
if entry_type == "directory":
if not full_path.is_dir():
print(f" {colored(f'{file_name}:{line_num}: Directory not found: {file_path}', Colors.RED)}")
errors += 1
else:
if not full_path.is_file():
print(f" {colored(f'{file_name}:{line_num}: File not found: {file_path}', Colors.RED)}")
errors += 1
if errors == 0:
print(f" {colored(f'{file_name}: ✓ ({real_entries} entries)', Colors.GREEN)}")
else:
print(f" {colored(f'{file_name}: ✗ ({errors} errors)', Colors.RED)}")
return errors
# =============================================================================
# Command: list-context
# =============================================================================
def cmd_list_context(args: argparse.Namespace) -> int:
"""List JSONL context entries."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
if not target_dir.is_dir():
print(colored("Error: task directory required", Colors.RED))
return 1
print(colored("=== Context Files ===", Colors.BLUE))
print()
for jsonl_name in ["implement.jsonl", "check.jsonl"]:
jsonl_file = target_dir / jsonl_name
if not jsonl_file.is_file():
continue
print(colored(f"[{jsonl_name}]", Colors.CYAN))
count = 0
seed_only = True
for line in jsonl_file.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
continue
file_path = data.get("file")
if not file_path:
# Seed / comment row — don't count as a real entry
continue
seed_only = False
count += 1
entry_type = data.get("type", "file")
reason = data.get("reason", "-")
if entry_type == "directory":
print(f" {colored(f'{count}.', Colors.GREEN)} [DIR] {file_path}")
else:
print(f" {colored(f'{count}.', Colors.GREEN)} {file_path}")
print(f" {colored('', Colors.YELLOW)} {reason}")
if seed_only:
print(f" {colored('(no curated entries yet — only seed row)', Colors.YELLOW)}")
print()
return 0
+188
View File
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""
Task queue utility functions.
Provides:
list_tasks_by_status - List tasks by status
list_pending_tasks - List tasks with pending status
list_tasks_by_assignee - List tasks by assignee
list_my_tasks - List tasks assigned to current developer
get_task_stats - Get P0/P1/P2/P3 counts
"""
from __future__ import annotations
from pathlib import Path
from .paths import (
get_repo_root,
get_developer,
get_tasks_dir,
)
from .tasks import iter_active_tasks
# =============================================================================
# Internal helper
# =============================================================================
def _task_to_dict(t) -> dict:
"""Convert TaskInfo to the dict format callers expect."""
return {
"priority": t.priority,
"id": t.raw.get("id", ""),
"title": t.title,
"status": t.status,
"assignee": t.assignee or "-",
"dir": t.dir_name,
"children": list(t.children),
"parent": t.parent,
}
# =============================================================================
# Public Functions
# =============================================================================
def list_tasks_by_status(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks by status.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts with keys: priority, id, title, status, assignee.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
for t in iter_active_tasks(tasks_dir):
if filter_status and t.status != filter_status:
continue
results.append(_task_to_dict(t))
return results
def list_pending_tasks(repo_root: Path | None = None) -> list[dict]:
"""List pending tasks.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
return list_tasks_by_status("planning", repo_root)
def list_tasks_by_assignee(
assignee: str,
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to a specific developer.
Args:
assignee: Developer name.
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
for t in iter_active_tasks(tasks_dir):
if (t.assignee or "-") != assignee:
continue
if filter_status and t.status != filter_status:
continue
results.append(_task_to_dict(t))
return results
def list_my_tasks(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to current developer.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
Raises:
ValueError: If developer not set.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if not developer:
raise ValueError("Developer not set")
return list_tasks_by_assignee(developer, filter_status, repo_root)
def get_task_stats(repo_root: Path | None = None) -> dict[str, int]:
"""Get task statistics.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Dict with keys: P0, P1, P2, P3, Total.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
stats = {"P0": 0, "P1": 0, "P2": 0, "P3": 0, "Total": 0}
for t in iter_active_tasks(tasks_dir):
if t.priority in stats:
stats[t.priority] += 1
stats["Total"] += 1
return stats
def format_task_stats(stats: dict[str, int]) -> str:
"""Format task stats as string.
Args:
stats: Stats dict from get_task_stats.
Returns:
Formatted string like "P0:0 P1:1 P2:2 P3:0 Total:3".
"""
return f"P0:{stats['P0']} P1:{stats['P1']} P2:{stats['P2']} P3:{stats['P3']} Total:{stats['Total']}"
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
stats = get_task_stats()
print(format_task_stats(stats))
print()
print("Pending tasks:")
for task in list_pending_tasks():
print(f" {task['priority']}|{task['id']}|{task['title']}|{task['status']}|{task['assignee']}")
+598
View File
@@ -0,0 +1,598 @@
#!/usr/bin/env python3
"""
Task CRUD operations.
Provides:
ensure_tasks_dir - Ensure tasks directory exists
cmd_create - Create a new task
cmd_archive - Archive completed task
cmd_set_branch - Set git branch for task
cmd_set_base_branch - Set PR target branch
cmd_set_scope - Set scope for PR title
cmd_add_subtask - Link child task to parent
cmd_remove_subtask - Unlink child task from parent
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from .config import (
get_packages,
is_monorepo,
resolve_package,
validate_package,
)
from .git import run_git
from .io import read_json, write_json
from .log import Colors, colored
from .paths import (
DIR_ARCHIVE,
DIR_TASKS,
DIR_WORKFLOW,
FILE_TASK_JSON,
clear_current_task,
generate_task_date_prefix,
get_current_task,
get_developer,
get_repo_root,
get_tasks_dir,
)
from .task_utils import (
archive_task_complete,
find_task_by_name,
resolve_task_dir,
run_task_hooks,
)
# =============================================================================
# Helper Functions
# =============================================================================
def _slugify(title: str) -> str:
"""Convert title to slug (only works with ASCII)."""
result = title.lower()
result = re.sub(r"[^a-z0-9]", "-", result)
result = re.sub(r"-+", "-", result)
result = result.strip("-")
return result
def ensure_tasks_dir(repo_root: Path) -> Path:
"""Ensure tasks directory exists."""
tasks_dir = get_tasks_dir(repo_root)
archive_dir = tasks_dir / "archive"
if not tasks_dir.exists():
tasks_dir.mkdir(parents=True)
print(colored(f"Created tasks directory: {tasks_dir}", Colors.GREEN), file=sys.stderr)
if not archive_dir.exists():
archive_dir.mkdir(parents=True)
return tasks_dir
# =============================================================================
# Sub-agent platform detection + JSONL seeding
# =============================================================================
# Config directories of platforms that consume implement.jsonl / check.jsonl.
# Keep in sync with src/types/ai-tools.ts AI_TOOLS entries — these are the
# platforms listed in workflow.md's "agent-capable" Skill Routing block
# (Class-1 hook-inject + Class-2 pull-based preludes). Kilo / Antigravity /
# Windsurf are NOT in this list: they do not consume JSONL.
_SUBAGENT_CONFIG_DIRS: tuple[str, ...] = (
".claude",
".cursor",
".codex",
".kiro",
".gemini",
".opencode",
".qoder",
".codebuddy",
".factory", # Factory Droid
".github/copilot",
)
_SEED_EXAMPLE = (
"Fill with {\"file\": \"<path>\", \"reason\": \"<why>\"}. "
"Put spec/research files only — no code paths. "
"Run `python3 .trellis/scripts/get_context.py --mode packages` to list available specs. "
"Delete this line once real entries are added."
)
def _has_subagent_platform(repo_root: Path) -> bool:
"""Return True if any sub-agent-capable platform is configured.
Detected by probing well-known config directories at the repo root. Used
only to decide whether ``task.py create`` should seed empty
``implement.jsonl`` / ``check.jsonl`` files.
"""
for config_dir in _SUBAGENT_CONFIG_DIRS:
if (repo_root / config_dir).is_dir():
return True
return False
def _write_seed_jsonl(path: Path) -> None:
"""Write a one-line seed JSONL file with a self-describing ``_example``.
The seed row has no ``file`` field, so downstream consumers (hooks +
preludes) that iterate entries via ``item.get("file")`` naturally skip
it. The row exists purely as an in-file prompt for the AI curator.
"""
seed = {"_example": _SEED_EXAMPLE}
path.write_text(json.dumps(seed, ensure_ascii=False) + "\n", encoding="utf-8")
# =============================================================================
# Command: create
# =============================================================================
def cmd_create(args: argparse.Namespace) -> int:
"""Create a new task."""
repo_root = get_repo_root()
if not args.title:
print(colored("Error: title is required", Colors.RED), file=sys.stderr)
return 1
# Validate --package (CLI source: fail-fast)
package: str | None = getattr(args, "package", None)
if not is_monorepo(repo_root):
# Single-repo: ignore --package, no package prefix
if package:
print(colored(f"Warning: --package ignored in single-repo project", Colors.YELLOW), file=sys.stderr)
package = None
elif package:
if not validate_package(package, repo_root):
packages = get_packages(repo_root)
available = ", ".join(sorted(packages.keys())) if packages else "(none)"
print(colored(f"Error: unknown package '{package}'. Available: {available}", Colors.RED), file=sys.stderr)
return 1
else:
# Inferred: default_package → None (no task.json yet for create)
package = resolve_package(repo_root=repo_root)
# Default assignee to current developer
assignee = args.assignee
if not assignee:
assignee = get_developer(repo_root)
if not assignee:
print(colored("Error: No developer set. Run init_developer.py first or use --assignee", Colors.RED), file=sys.stderr)
return 1
ensure_tasks_dir(repo_root)
# Get current developer as creator
creator = get_developer(repo_root) or assignee
# Generate slug if not provided
slug = args.slug or _slugify(args.title)
if not slug:
print(colored("Error: could not generate slug from title", Colors.RED), file=sys.stderr)
return 1
# Create task directory with MM-DD-slug format
tasks_dir = get_tasks_dir(repo_root)
date_prefix = generate_task_date_prefix()
dir_name = f"{date_prefix}-{slug}"
task_dir = tasks_dir / dir_name
task_json_path = task_dir / FILE_TASK_JSON
if task_dir.exists():
print(colored(f"Warning: Task directory already exists: {dir_name}", Colors.YELLOW), file=sys.stderr)
else:
task_dir.mkdir(parents=True)
today = datetime.now().strftime("%Y-%m-%d")
# Record current branch as base_branch (PR target)
_, branch_out, _ = run_git(["branch", "--show-current"], cwd=repo_root)
current_branch = branch_out.strip() or "main"
task_data = {
"id": slug,
"name": slug,
"title": args.title,
"description": args.description or "",
"status": "planning",
"dev_type": None,
"scope": None,
"package": package,
"priority": args.priority,
"creator": creator,
"assignee": assignee,
"createdAt": today,
"completedAt": None,
"branch": None,
"base_branch": current_branch,
"worktree_path": None,
"commit": None,
"pr_url": None,
"subtasks": [],
"children": [],
"parent": None,
"relatedFiles": [],
"notes": "",
"meta": {},
}
write_json(task_json_path, task_data)
# Seed implement.jsonl / check.jsonl for sub-agent-capable platforms.
# Agent curates real entries in Phase 1.3 (see .trellis/workflow.md).
# Agent-less platforms (Kilo / Antigravity / Windsurf) skip this — they
# load specs via the trellis-before-dev skill instead of JSONL.
seeded_jsonl = False
if _has_subagent_platform(repo_root):
for jsonl_name in ("implement.jsonl", "check.jsonl"):
jsonl_path = task_dir / jsonl_name
if not jsonl_path.exists():
_write_seed_jsonl(jsonl_path)
seeded_jsonl = True
# Handle --parent: establish bidirectional link
if args.parent:
parent_dir = resolve_task_dir(args.parent, repo_root)
parent_json_path = parent_dir / FILE_TASK_JSON
if not parent_json_path.is_file():
print(colored(f"Warning: Parent task.json not found: {args.parent}", Colors.YELLOW), file=sys.stderr)
else:
parent_data = read_json(parent_json_path)
if parent_data:
# Add child to parent's children list
parent_children = parent_data.get("children", [])
if dir_name not in parent_children:
parent_children.append(dir_name)
parent_data["children"] = parent_children
write_json(parent_json_path, parent_data)
# Set parent in child's task.json
task_data["parent"] = parent_dir.name
write_json(task_json_path, task_data)
print(colored(f"Linked as child of: {parent_dir.name}", Colors.GREEN), file=sys.stderr)
print(colored(f"Created task: {dir_name}", Colors.GREEN), file=sys.stderr)
print("", file=sys.stderr)
print(colored("Next steps:", Colors.BLUE), file=sys.stderr)
print(" 1. Create prd.md with requirements", file=sys.stderr)
if seeded_jsonl:
print(
" 2. Curate implement.jsonl / check.jsonl (spec + research files only — "
"see .trellis/workflow.md Phase 1.3)",
file=sys.stderr,
)
print(" 3. Run: python3 task.py start <dir>", file=sys.stderr)
else:
print(" 2. Run: python3 task.py start <dir>", file=sys.stderr)
print("", file=sys.stderr)
# Output relative path for script chaining
print(f"{DIR_WORKFLOW}/{DIR_TASKS}/{dir_name}")
run_task_hooks("after_create", task_json_path, repo_root)
return 0
# =============================================================================
# Command: archive
# =============================================================================
def cmd_archive(args: argparse.Namespace) -> int:
"""Archive completed task."""
repo_root = get_repo_root()
task_name = args.name
if not task_name:
print(colored("Error: Task name is required", Colors.RED), file=sys.stderr)
return 1
tasks_dir = get_tasks_dir(repo_root)
# Find task directory
task_dir = find_task_by_name(task_name, tasks_dir)
if not task_dir or not task_dir.is_dir():
print(colored(f"Error: Task not found: {task_name}", Colors.RED), file=sys.stderr)
print("Active tasks:", file=sys.stderr)
# Import lazily to avoid circular dependency
from .tasks import iter_active_tasks
for t in iter_active_tasks(tasks_dir):
print(f" - {t.dir_name}/", file=sys.stderr)
return 1
dir_name = task_dir.name
task_json_path = task_dir / FILE_TASK_JSON
# Update status before archiving
today = datetime.now().strftime("%Y-%m-%d")
if task_json_path.is_file():
data = read_json(task_json_path)
if data:
data["status"] = "completed"
data["completedAt"] = today
write_json(task_json_path, data)
# Handle subtask relationships on archive
task_parent = data.get("parent")
task_children = data.get("children", [])
# If this is a child, remove from parent's children list
if task_parent:
parent_dir = find_task_by_name(task_parent, tasks_dir)
if parent_dir:
parent_json = parent_dir / FILE_TASK_JSON
if parent_json.is_file():
parent_data = read_json(parent_json)
if parent_data:
parent_children = parent_data.get("children", [])
if dir_name in parent_children:
parent_children.remove(dir_name)
parent_data["children"] = parent_children
write_json(parent_json, parent_data)
# If this is a parent, clear parent field in all children
if task_children:
for child_name in task_children:
child_dir_path = find_task_by_name(child_name, tasks_dir)
if child_dir_path:
child_json = child_dir_path / FILE_TASK_JSON
if child_json.is_file():
child_data = read_json(child_json)
if child_data:
child_data["parent"] = None
write_json(child_json, child_data)
# Clear if current task
current = get_current_task(repo_root)
if current and dir_name in current:
clear_current_task(repo_root)
# Archive
result = archive_task_complete(task_dir, repo_root)
if "archived_to" in result:
archive_dest = Path(result["archived_to"])
year_month = archive_dest.parent.name
print(colored(f"Archived: {dir_name} -> archive/{year_month}/", Colors.GREEN), file=sys.stderr)
# Auto-commit unless --no-commit
if not getattr(args, "no_commit", False):
_auto_commit_archive(dir_name, repo_root)
# Return the archive path
print(f"{DIR_WORKFLOW}/{DIR_TASKS}/{DIR_ARCHIVE}/{year_month}/{dir_name}")
# Run hooks with the archived path
archived_json = archive_dest / FILE_TASK_JSON
run_task_hooks("after_archive", archived_json, repo_root)
return 0
return 1
def _auto_commit_archive(task_name: str, repo_root: Path) -> None:
"""Stage .trellis/tasks/ changes and commit after archive."""
tasks_rel = f"{DIR_WORKFLOW}/{DIR_TASKS}"
run_git(["add", "-A", tasks_rel], cwd=repo_root)
# Check if there are staged changes
rc, _, _ = run_git(
["diff", "--cached", "--quiet", "--", tasks_rel], cwd=repo_root
)
if rc == 0:
print("[OK] No task changes to commit.", file=sys.stderr)
return
commit_msg = f"chore(task): archive {task_name}"
rc, _, err = run_git(["commit", "-m", commit_msg], cwd=repo_root)
if rc == 0:
print(f"[OK] Auto-committed: {commit_msg}", file=sys.stderr)
else:
print(f"[WARN] Auto-commit failed: {err.strip()}", file=sys.stderr)
# =============================================================================
# Command: add-subtask
# =============================================================================
def cmd_add_subtask(args: argparse.Namespace) -> int:
"""Link a child task to a parent task."""
repo_root = get_repo_root()
parent_dir = resolve_task_dir(args.parent_dir, repo_root)
child_dir = resolve_task_dir(args.child_dir, repo_root)
parent_json_path = parent_dir / FILE_TASK_JSON
child_json_path = child_dir / FILE_TASK_JSON
if not parent_json_path.is_file():
print(colored(f"Error: Parent task.json not found: {args.parent_dir}", Colors.RED), file=sys.stderr)
return 1
if not child_json_path.is_file():
print(colored(f"Error: Child task.json not found: {args.child_dir}", Colors.RED), file=sys.stderr)
return 1
parent_data = read_json(parent_json_path)
child_data = read_json(child_json_path)
if not parent_data or not child_data:
print(colored("Error: Failed to read task.json", Colors.RED), file=sys.stderr)
return 1
# Check if child already has a parent
existing_parent = child_data.get("parent")
if existing_parent:
print(colored(f"Error: Child task already has a parent: {existing_parent}", Colors.RED), file=sys.stderr)
return 1
# Add child to parent's children list
parent_children = parent_data.get("children", [])
child_dir_name = child_dir.name
if child_dir_name not in parent_children:
parent_children.append(child_dir_name)
parent_data["children"] = parent_children
# Set parent in child's task.json
child_data["parent"] = parent_dir.name
# Write both
write_json(parent_json_path, parent_data)
write_json(child_json_path, child_data)
print(colored(f"Linked: {child_dir.name} -> {parent_dir.name}", Colors.GREEN), file=sys.stderr)
return 0
# =============================================================================
# Command: remove-subtask
# =============================================================================
def cmd_remove_subtask(args: argparse.Namespace) -> int:
"""Unlink a child task from a parent task."""
repo_root = get_repo_root()
parent_dir = resolve_task_dir(args.parent_dir, repo_root)
child_dir = resolve_task_dir(args.child_dir, repo_root)
parent_json_path = parent_dir / FILE_TASK_JSON
child_json_path = child_dir / FILE_TASK_JSON
if not parent_json_path.is_file():
print(colored(f"Error: Parent task.json not found: {args.parent_dir}", Colors.RED), file=sys.stderr)
return 1
if not child_json_path.is_file():
print(colored(f"Error: Child task.json not found: {args.child_dir}", Colors.RED), file=sys.stderr)
return 1
parent_data = read_json(parent_json_path)
child_data = read_json(child_json_path)
if not parent_data or not child_data:
print(colored("Error: Failed to read task.json", Colors.RED), file=sys.stderr)
return 1
# Remove child from parent's children list
parent_children = parent_data.get("children", [])
child_dir_name = child_dir.name
if child_dir_name in parent_children:
parent_children.remove(child_dir_name)
parent_data["children"] = parent_children
# Clear parent in child's task.json
child_data["parent"] = None
# Write both
write_json(parent_json_path, parent_data)
write_json(child_json_path, child_data)
print(colored(f"Unlinked: {child_dir.name} from {parent_dir.name}", Colors.GREEN), file=sys.stderr)
return 0
# =============================================================================
# Command: set-branch
# =============================================================================
def cmd_set_branch(args: argparse.Namespace) -> int:
"""Set git branch for task."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
branch = args.branch
if not branch:
print(colored("Error: Missing arguments", Colors.RED))
print("Usage: python3 task.py set-branch <task-dir> <branch-name>")
return 1
task_json = target_dir / FILE_TASK_JSON
if not task_json.is_file():
print(colored(f"Error: task.json not found at {target_dir}", Colors.RED))
return 1
data = read_json(task_json)
if not data:
return 1
data["branch"] = branch
write_json(task_json, data)
print(colored(f"✓ Branch set to: {branch}", Colors.GREEN))
return 0
# =============================================================================
# Command: set-base-branch
# =============================================================================
def cmd_set_base_branch(args: argparse.Namespace) -> int:
"""Set the base branch (PR target) for task."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
base_branch = args.base_branch
if not base_branch:
print(colored("Error: Missing arguments", Colors.RED))
print("Usage: python3 task.py set-base-branch <task-dir> <base-branch>")
print("Example: python3 task.py set-base-branch <dir> develop")
print()
print("This sets the target branch for PR (the branch your feature will merge into).")
return 1
task_json = target_dir / FILE_TASK_JSON
if not task_json.is_file():
print(colored(f"Error: task.json not found at {target_dir}", Colors.RED))
return 1
data = read_json(task_json)
if not data:
return 1
data["base_branch"] = base_branch
write_json(task_json, data)
print(colored(f"✓ Base branch set to: {base_branch}", Colors.GREEN))
print(f" PR will target: {base_branch}")
return 0
# =============================================================================
# Command: set-scope
# =============================================================================
def cmd_set_scope(args: argparse.Namespace) -> int:
"""Set scope for PR title."""
repo_root = get_repo_root()
target_dir = resolve_task_dir(args.dir, repo_root)
scope = args.scope
if not scope:
print(colored("Error: Missing arguments", Colors.RED))
print("Usage: python3 task.py set-scope <task-dir> <scope>")
return 1
task_json = target_dir / FILE_TASK_JSON
if not task_json.is_file():
print(colored(f"Error: task.json not found at {target_dir}", Colors.RED))
return 1
data = read_json(task_json)
if not data:
return 1
data["scope"] = scope
write_json(task_json, data)
print(colored(f"✓ Scope set to: {scope}", Colors.GREEN))
return 0
+274
View File
@@ -0,0 +1,274 @@
#!/usr/bin/env python3
"""
Task utility functions.
Provides:
is_safe_task_path - Validate task path is safe to operate on
find_task_by_name - Find task directory by name
resolve_task_dir - Resolve task directory from name, relative, or absolute path
archive_task_dir - Archive task to monthly directory
run_task_hooks - Run lifecycle hooks for task events
"""
from __future__ import annotations
import shutil
import sys
from datetime import datetime
from pathlib import Path
from .paths import get_repo_root, get_tasks_dir
# =============================================================================
# Path Safety
# =============================================================================
def is_safe_task_path(task_path: str, repo_root: Path | None = None) -> bool:
"""Check if a relative task path is safe to operate on.
Args:
task_path: Task path (relative to repo_root).
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if safe, False if dangerous.
"""
if repo_root is None:
repo_root = get_repo_root()
normalized = task_path.replace("\\", "/")
# Check empty or null
if not normalized or normalized == "null":
print("Error: empty or null task path", file=sys.stderr)
return False
# Reject absolute paths
if Path(task_path).is_absolute():
print(f"Error: absolute path not allowed: {task_path}", file=sys.stderr)
return False
# Reject ".", "..", paths starting with "./" or "../", or containing ".."
if normalized in (".", "..") or normalized.startswith("./") or normalized.startswith("../") or ".." in normalized:
print(f"Error: path traversal not allowed: {task_path}", file=sys.stderr)
return False
# Final check: ensure resolved path is not the repo root
abs_path = repo_root / Path(normalized)
if abs_path.exists():
try:
resolved = abs_path.resolve()
root_resolved = repo_root.resolve()
if resolved == root_resolved:
print(f"Error: path resolves to repo root: {task_path}", file=sys.stderr)
return False
except (OSError, IOError):
pass
return True
# =============================================================================
# Task Lookup
# =============================================================================
def find_task_by_name(task_name: str, tasks_dir: Path) -> Path | None:
"""Find task directory by name (exact or suffix match).
Args:
task_name: Task name to find.
tasks_dir: Tasks directory path.
Returns:
Absolute path to task directory, or None if not found.
"""
if not task_name or not tasks_dir or not tasks_dir.is_dir():
return None
# Try exact match first
exact_match = tasks_dir / task_name
if exact_match.is_dir():
return exact_match
# Try suffix match (e.g., "my-task" matches "01-21-my-task")
for d in tasks_dir.iterdir():
if d.is_dir() and d.name.endswith(f"-{task_name}"):
return d
return None
# =============================================================================
# Archive Operations
# =============================================================================
def archive_task_dir(task_dir_abs: Path, repo_root: Path | None = None) -> Path | None:
"""Archive a task directory to archive/{YYYY-MM}/.
Args:
task_dir_abs: Absolute path to task directory.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to archived directory, or None on error.
"""
if not task_dir_abs.is_dir():
print(f"Error: task directory not found: {task_dir_abs}", file=sys.stderr)
return None
# Get tasks directory (parent of the task)
tasks_dir = task_dir_abs.parent
archive_dir = tasks_dir / "archive"
year_month = datetime.now().strftime("%Y-%m")
month_dir = archive_dir / year_month
# Create archive directory
try:
month_dir.mkdir(parents=True, exist_ok=True)
except (OSError, IOError) as e:
print(f"Error: Failed to create archive directory: {e}", file=sys.stderr)
return None
# Move task to archive
task_name = task_dir_abs.name
dest = month_dir / task_name
try:
shutil.move(str(task_dir_abs), str(dest))
except (OSError, IOError, shutil.Error) as e:
print(f"Error: Failed to move task to archive: {e}", file=sys.stderr)
return None
return dest
def archive_task_complete(
task_dir_abs: Path,
repo_root: Path | None = None
) -> dict[str, str]:
"""Complete archive workflow: archive directory.
Args:
task_dir_abs: Absolute path to task directory.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Dict with archive result info.
"""
if not task_dir_abs.is_dir():
print(f"Error: task directory not found: {task_dir_abs}", file=sys.stderr)
return {}
archive_dest = archive_task_dir(task_dir_abs, repo_root)
if archive_dest:
return {"archived_to": str(archive_dest)}
return {}
# =============================================================================
# Task Directory Resolution
# =============================================================================
def resolve_task_dir(target_dir: str, repo_root: Path) -> Path:
"""Resolve task directory to absolute path.
Supports:
- Absolute path: /path/to/task
- Relative path: .trellis/tasks/01-31-my-task
- Task name: my-task (uses find_task_by_name for lookup)
Args:
target_dir: Task directory specification.
repo_root: Repository root path.
Returns:
Resolved absolute path.
"""
if not target_dir:
return Path()
normalized = target_dir.replace("\\", "/")
while normalized.startswith("./"):
normalized = normalized[2:]
# Absolute path
if Path(target_dir).is_absolute():
return Path(target_dir)
# Relative path (contains path separator or starts with .trellis)
if "/" in normalized or normalized.startswith(".trellis"):
return repo_root / Path(normalized)
# Task name - try to find in tasks directory
tasks_dir = get_tasks_dir(repo_root)
found = find_task_by_name(target_dir, tasks_dir)
if found:
return found
# Fallback to treating as relative path
return repo_root / Path(normalized)
# =============================================================================
# Lifecycle Hooks
# =============================================================================
def run_task_hooks(event: str, task_json_path: Path, repo_root: Path) -> None:
"""Run lifecycle hooks for a task event.
Args:
event: Event name (e.g. "after_create").
task_json_path: Absolute path to the task's task.json.
repo_root: Repository root for cwd and config lookup.
"""
import os
import subprocess
from .config import get_hooks
from .log import Colors, colored
commands = get_hooks(event, repo_root)
if not commands:
return
env = {**os.environ, "TASK_JSON_PATH": str(task_json_path)}
for cmd in commands:
try:
result = subprocess.run(
cmd,
shell=True,
cwd=repo_root,
env=env,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
if result.returncode != 0:
print(
colored(f"[WARN] Hook failed ({event}): {cmd}", Colors.YELLOW),
file=sys.stderr,
)
if result.stderr.strip():
print(f" {result.stderr.strip()}", file=sys.stderr)
except Exception as e:
print(
colored(f"[WARN] Hook error ({event}): {cmd}{e}", Colors.YELLOW),
file=sys.stderr,
)
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
repo = get_repo_root()
tasks = get_tasks_dir(repo)
print(f"Tasks dir: {tasks}")
print(f"is_safe_task_path('.trellis/tasks/test'): {is_safe_task_path('.trellis/tasks/test', repo)}")
print(f"is_safe_task_path('../test'): {is_safe_task_path('../test', repo)}")
+109
View File
@@ -0,0 +1,109 @@
"""
Task data access layer.
Single source of truth for loading and iterating task directories.
Replaces scattered task.json parsing across 9+ files.
Provides:
load_task — Load a single task by directory path
iter_active_tasks — Iterate all non-archived tasks (sorted)
get_all_statuses — Get {dir_name: status} map for children progress
"""
from __future__ import annotations
from collections.abc import Iterator
from pathlib import Path
from .io import read_json
from .paths import FILE_TASK_JSON
from .types import TaskInfo
def load_task(task_dir: Path) -> TaskInfo | None:
"""Load task from a directory containing task.json.
Args:
task_dir: Absolute path to the task directory.
Returns:
TaskInfo if task.json exists and is valid, None otherwise.
"""
task_json = task_dir / FILE_TASK_JSON
if not task_json.is_file():
return None
data = read_json(task_json)
if not data:
return None
return TaskInfo(
dir_name=task_dir.name,
directory=task_dir,
title=data.get("title") or data.get("name") or "unknown",
status=data.get("status", "unknown"),
assignee=data.get("assignee", ""),
priority=data.get("priority", "P2"),
children=tuple(data.get("children", [])),
parent=data.get("parent"),
package=data.get("package"),
raw=data,
)
def iter_active_tasks(tasks_dir: Path) -> Iterator[TaskInfo]:
"""Iterate all active (non-archived) tasks, sorted by directory name.
Skips the "archive" directory and directories without valid task.json.
Args:
tasks_dir: Path to the tasks directory.
Yields:
TaskInfo for each valid task.
"""
if not tasks_dir.is_dir():
return
for d in sorted(tasks_dir.iterdir()):
if not d.is_dir() or d.name == "archive":
continue
info = load_task(d)
if info is not None:
yield info
def get_all_statuses(tasks_dir: Path) -> dict[str, str]:
"""Get a {dir_name: status} mapping for all active tasks.
Useful for computing children progress without loading full TaskInfo.
Args:
tasks_dir: Path to the tasks directory.
Returns:
Dict mapping directory names to status strings.
"""
return {t.dir_name: t.status for t in iter_active_tasks(tasks_dir)}
def children_progress(
children: tuple[str, ...] | list[str],
all_statuses: dict[str, str],
) -> str:
"""Format children progress string like " [2/3 done]".
Args:
children: List of child directory names.
all_statuses: Status map from get_all_statuses().
Returns:
Formatted string, or "" if no children.
"""
if not children:
return ""
done = sum(
1 for c in children
if all_statuses.get(c) in ("completed", "done")
)
return f" [{done}/{len(children)} done]"
+110
View File
@@ -0,0 +1,110 @@
"""
Core type definitions for Trellis task data.
Provides:
TaskData — TypedDict for task.json shape (read-path type hints only)
TaskInfo — Frozen dataclass for loaded task (the public API type)
AgentRecord — TypedDict for registry.json agent entries
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import TypedDict
# =============================================================================
# task.json shape (TypedDict — used only for read-path type hints)
# =============================================================================
class TaskData(TypedDict, total=False):
"""Shape of task.json on disk.
Used only for type annotations when reading task.json.
Writes must use the original dict to avoid losing unknown fields.
"""
id: str
name: str
title: str
description: str
status: str
dev_type: str
scope: str | None
package: str | None
priority: str
creator: str
assignee: str
createdAt: str
completedAt: str | None
branch: str | None
base_branch: str | None
worktree_path: str | None
commit: str | None
pr_url: str | None
subtasks: list[str]
children: list[str]
parent: str | None
relatedFiles: list[str]
notes: str
meta: dict
# =============================================================================
# Loaded task object (frozen dataclass — the public API type)
# =============================================================================
@dataclass(frozen=True)
class TaskInfo:
"""Immutable view of a loaded task.
Created by load_task() / iter_active_tasks().
Contains the commonly accessed fields; the original dict
is preserved in `raw` for write-back and uncommon field access.
"""
dir_name: str
directory: Path
title: str
status: str
assignee: str
priority: str
children: tuple[str, ...]
parent: str | None
package: str | None
raw: dict # original dict — use for writes and uncommon fields
@property
def name(self) -> str:
"""Task name (id or name field)."""
return self.raw.get("name") or self.raw.get("id") or self.dir_name
@property
def description(self) -> str:
return self.raw.get("description", "")
@property
def branch(self) -> str | None:
return self.raw.get("branch")
@property
def meta(self) -> dict:
return self.raw.get("meta", {})
# =============================================================================
# registry.json agent entry
# =============================================================================
class AgentRecord(TypedDict, total=False):
"""Shape of an agent entry in registry.json."""
id: str
pid: int
task_dir: str
worktree_path: str
branch: str
platform: str
started_at: str
status: str
+176
View File
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Workflow Phase Extraction.
Extracts step-level content from .trellis/workflow.md and optionally filters
platform-specific blocks.
Platform marker syntax in workflow.md:
[Claude Code, Cursor, ...]
agent-capable content
[/Claude Code, Cursor, ...]
Provides:
get_phase_index - Extract the Phase Index section (no --step)
get_step - Extract a single step (#### X.X) section
filter_platform - Strip platform blocks that don't include the given name
"""
from __future__ import annotations
import re
from .paths import DIR_WORKFLOW, get_repo_root
def _workflow_md_path():
return get_repo_root() / DIR_WORKFLOW / "workflow.md"
# Match a line that *is* a platform marker: "[A, B, C]" or "[/A, B, C]"
_MARKER_RE = re.compile(r"^\[(/?)([A-Za-z][^\[\]]*)\]\s*$")
# Step heading: "#### 1.0 Title" or "#### 1.0 ..."
_STEP_HEADING_RE = re.compile(r"^####\s+(\d+\.\d+)\b.*$")
# Phase Index starts here; Phase 1/2/3 step bodies follow; ends at Breadcrumbs.
_PHASE_INDEX_HEADING = "## Phase Index"
def _read_workflow() -> str:
path = _workflow_md_path()
if not path.exists():
raise FileNotFoundError(f"workflow.md not found: {path}")
return path.read_text(encoding="utf-8")
def _parse_marker(line: str) -> tuple[bool, list[str]] | None:
"""Parse a platform marker line.
Returns:
(is_closing, [platform_names]) if line is a marker, else None.
"""
m = _MARKER_RE.match(line)
if not m:
return None
is_closing = m.group(1) == "/"
names = [p.strip() for p in m.group(2).split(",") if p.strip()]
return is_closing, names
def get_phase_index() -> str:
"""Return Phase Index + Phase 1/2/3 step bodies from workflow.md.
Matches what the SessionStart hook injects into the `<workflow>` block:
starts at `## Phase Index`, continues through `## Phase 1: Plan`,
`## Phase 2: Execute`, `## Phase 3: Finish`, stops at
`## Workflow State Breadcrumbs` (consumed by UserPromptSubmit hook).
"""
text = _read_workflow()
lines = text.splitlines()
start: int | None = None
end: int | None = None
for i, line in enumerate(lines):
stripped = line.strip()
if start is None and stripped == _PHASE_INDEX_HEADING:
start = i
continue
if start is not None and stripped == "## Workflow State Breadcrumbs":
end = i
break
if start is None:
return ""
if end is None:
end = len(lines)
return "\n".join(lines[start:end]).rstrip() + "\n"
def get_step(step_id: str) -> str:
"""Return the `#### X.X` section matching step_id (header + body).
Body ends at the next `####` or `---` or `##` heading (whichever comes first).
"""
text = _read_workflow()
lines = text.splitlines()
start: int | None = None
for i, line in enumerate(lines):
m = _STEP_HEADING_RE.match(line)
if m and m.group(1) == step_id:
start = i
break
if start is None:
return ""
end: int = len(lines)
for j in range(start + 1, len(lines)):
line = lines[j]
if line.startswith("#### "):
end = j
break
if line.startswith("## "):
end = j
break
# Horizontal rule at column 0
if line.strip() == "---":
end = j
break
return "\n".join(lines[start:end]).rstrip() + "\n"
def _platform_matches(platform: str, block_names: list[str]) -> bool:
"""Case-insensitive fuzzy match: accept 'cursor', 'Cursor', 'claude-code', 'Claude Code'."""
needle = platform.lower().replace("-", "").replace("_", "").replace(" ", "")
for name in block_names:
hay = name.lower().replace("-", "").replace("_", "").replace(" ", "")
if needle == hay:
return True
return False
def filter_platform(content: str, platform: str) -> str:
"""Keep lines outside any `[...]` block + lines inside blocks that include platform.
Marker lines themselves are dropped from the output.
"""
lines = content.splitlines()
out: list[str] = []
in_block = False
keep_block = False
for line in lines:
marker = _parse_marker(line)
if marker is not None:
is_closing, names = marker
if not is_closing:
in_block = True
keep_block = _platform_matches(platform, names)
else:
in_block = False
keep_block = False
continue # drop the marker line itself
if in_block:
if keep_block:
out.append(line)
continue
out.append(line)
# Collapse runs of 3+ blank lines that may arise from dropped markers
collapsed: list[str] = []
blank_run = 0
for line in out:
if line.strip() == "":
blank_run += 1
if blank_run <= 2:
collapsed.append(line)
else:
blank_run = 0
collapsed.append(line)
return "\n".join(collapsed).rstrip() + "\n"