Files
2026-04-27 18:40:30 +08:00

390 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Trellis configuration reader.
Reads settings from .trellis/config.yaml with sensible defaults.
"""
from __future__ import annotations
import sys
from pathlib import Path
from .paths import DIR_WORKFLOW, get_repo_root
# =============================================================================
# YAML Simple Parser (no dependencies)
# =============================================================================
def _unquote(s: str) -> str:
"""Remove exactly one layer of matching surrounding quotes.
Unlike str.strip('"'), this only removes the outermost pair,
preserving any nested quotes inside the value.
Examples:
_unquote('"hello"') -> 'hello'
_unquote("'hello'") -> 'hello'
_unquote('"echo \\'hi\\'"') -> "echo 'hi'"
_unquote('hello') -> 'hello'
_unquote('"hello\\'') -> '"hello\\'' (mismatched, unchanged)
"""
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
return s[1:-1]
return s
def parse_simple_yaml(content: str) -> dict:
"""Parse simple YAML with nested dict support (no dependencies).
Supports:
- key: value (string)
- key: (followed by list items)
- item1
- item2
- key: (followed by nested dict)
nested_key: value
nested_key2:
- item
Uses indentation to detect nesting (2+ spaces deeper = child).
Args:
content: YAML content string.
Returns:
Parsed dict (values can be str, list[str], or dict).
"""
lines = content.splitlines()
result: dict = {}
_parse_yaml_block(lines, 0, 0, result)
return result
def _parse_yaml_block(
lines: list[str], start: int, min_indent: int, target: dict
) -> int:
"""Parse a YAML block into target dict, returning next line index."""
i = start
current_list: list | None = None
while i < len(lines):
line = lines[i]
stripped = line.strip()
# Skip empty lines and comments
if not stripped or stripped.startswith("#"):
i += 1
continue
# Calculate indentation
indent = len(line) - len(line.lstrip())
# If dedented past our block, we're done
if indent < min_indent:
break
if stripped.startswith("- "):
if current_list is not None:
current_list.append(_unquote(stripped[2:].strip()))
i += 1
elif ":" in stripped:
key, _, value = stripped.partition(":")
key = key.strip()
value = _unquote(value.strip())
current_list = None
if value:
# key: value
target[key] = value
i += 1
else:
# key: (no value) — peek ahead to determine list vs nested dict
next_i, next_line = _next_content_line(lines, i + 1)
if next_i >= len(lines):
target[key] = {}
i = next_i
elif next_line.strip().startswith("- "):
# It's a list
current_list = []
target[key] = current_list
i += 1
else:
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent > indent:
# It's a nested dict
nested: dict = {}
target[key] = nested
i = _parse_yaml_block(lines, i + 1, next_indent, nested)
else:
# Empty value, same or less indent follows
target[key] = {}
i += 1
else:
i += 1
return i
def _next_content_line(lines: list[str], start: int) -> tuple[int, str]:
"""Find the next non-empty, non-comment line."""
i = start
while i < len(lines):
stripped = lines[i].strip()
if stripped and not stripped.startswith("#"):
return i, lines[i]
i += 1
return i, ""
# Defaults
DEFAULT_SESSION_COMMIT_MESSAGE = "chore: record journal"
DEFAULT_MAX_JOURNAL_LINES = 2000
CONFIG_FILE = "config.yaml"
def _is_true_config_value(value: object) -> bool:
"""Return True when a config value represents an enabled flag."""
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() == "true"
return False
def _get_config_path(repo_root: Path | None = None) -> Path:
"""Get path to config.yaml."""
root = repo_root or get_repo_root()
return root / DIR_WORKFLOW / CONFIG_FILE
def _load_config(repo_root: Path | None = None) -> dict:
"""Load and parse config.yaml. Returns empty dict on any error."""
config_file = _get_config_path(repo_root)
try:
content = config_file.read_text(encoding="utf-8")
return parse_simple_yaml(content)
except (OSError, IOError):
return {}
def get_session_commit_message(repo_root: Path | None = None) -> str:
"""Get the commit message for auto-committing session records."""
config = _load_config(repo_root)
return config.get("session_commit_message", DEFAULT_SESSION_COMMIT_MESSAGE)
def get_max_journal_lines(repo_root: Path | None = None) -> int:
"""Get the maximum lines per journal file."""
config = _load_config(repo_root)
value = config.get("max_journal_lines", DEFAULT_MAX_JOURNAL_LINES)
try:
return int(value)
except (ValueError, TypeError):
return DEFAULT_MAX_JOURNAL_LINES
def get_hooks(event: str, repo_root: Path | None = None) -> list[str]:
"""Get hook commands for a lifecycle event.
Args:
event: Event name (e.g. "after_create", "after_archive").
repo_root: Repository root path.
Returns:
List of shell commands to execute, empty if none configured.
"""
config = _load_config(repo_root)
hooks = config.get("hooks")
if not isinstance(hooks, dict):
return []
commands = hooks.get(event)
if isinstance(commands, list):
return [str(c) for c in commands]
return []
# =============================================================================
# Monorepo / Packages
# =============================================================================
def get_packages(repo_root: Path | None = None) -> dict[str, dict] | None:
"""Get monorepo package declarations.
Returns:
Dict mapping package name to its config (path, type, etc.),
or None if not configured (single-repo mode).
Example return:
{"cli": {"path": "packages/cli"}, "docs-site": {"path": "docs-site", "type": "submodule"}}
"""
config = _load_config(repo_root)
packages = config.get("packages")
if not isinstance(packages, dict):
return None
# Ensure each value is a dict (filter out scalar entries)
filtered = {k: v for k, v in packages.items() if isinstance(v, dict)}
if not filtered:
return None
return filtered
def get_default_package(repo_root: Path | None = None) -> str | None:
"""Get the default package name from config.
Returns:
Package name string, or None if not configured.
"""
config = _load_config(repo_root)
value = config.get("default_package")
return str(value) if value else None
def get_submodule_packages(repo_root: Path | None = None) -> dict[str, str]:
"""Get packages that are git submodules.
Returns:
Dict mapping package name to its path for submodule-type packages.
Empty dict if none configured.
Example return:
{"docs-site": "docs-site"}
"""
packages = get_packages(repo_root)
if packages is None:
return {}
return {
name: cfg.get("path", name)
for name, cfg in packages.items()
if cfg.get("type") == "submodule"
}
def get_git_packages(repo_root: Path | None = None) -> dict[str, str]:
"""Get packages that have their own independent git repository.
These are sub-directories with their own .git (not submodules),
marked with ``git: true`` in config.yaml.
Returns:
Dict mapping package name to its path for git-repo packages.
Empty dict if none configured.
Example config::
packages:
backend:
path: iqs
git: true
Example return::
{"backend": "iqs"}
"""
packages = get_packages(repo_root)
if packages is None:
return {}
return {
name: cfg.get("path", name)
for name, cfg in packages.items()
if _is_true_config_value(cfg.get("git"))
}
def is_monorepo(repo_root: Path | None = None) -> bool:
"""Check if the project is configured as a monorepo (has packages in config)."""
return get_packages(repo_root) is not None
def get_spec_base(package: str | None = None, repo_root: Path | None = None) -> str:
"""Get the spec directory base path relative to .trellis/.
Single-repo: returns "spec"
Monorepo with package: returns "spec/<package>"
Monorepo without package: returns "spec" (caller should specify package)
"""
if package and is_monorepo(repo_root):
return f"spec/{package}"
return "spec"
def validate_package(package: str, repo_root: Path | None = None) -> bool:
"""Check if a package name is valid in this project.
Single-repo (no packages configured): always returns True.
Monorepo: returns True only if package exists in config.yaml packages.
"""
packages = get_packages(repo_root)
if packages is None:
return True # Single-repo, no validation needed
return package in packages
def resolve_package(
task_package: str | None = None,
repo_root: Path | None = None,
) -> str | None:
"""Resolve package from inferred sources with validation.
Checks in order: task_package → default_package.
Invalid inferred values print a warning to stderr and are skipped.
Returns:
Resolved package name, or None if no valid package found.
Note:
CLI --package should be validated separately by the caller
(fail-fast with available packages list on error).
"""
packages = get_packages(repo_root)
if packages is None:
return None # Single-repo, no package needed
# Try task_package (guard against non-string values from malformed JSON)
if task_package and isinstance(task_package, str):
if task_package in packages:
return task_package
print(
f"Warning: task.json package '{task_package}' not found in config, skipping",
file=sys.stderr,
)
# Try default_package
default = get_default_package(repo_root)
if default:
if default in packages:
return default
print(
f"Warning: default_package '{default}' not found in config, skipping",
file=sys.stderr,
)
return None
def get_spec_scope(repo_root: Path | None = None) -> list[str] | str | None:
"""Get session.spec_scope configuration.
Returns:
list[str]: Package names to include in spec scanning.
str: "active_task" to use current task's package.
None: No scope configured (scan all packages).
"""
config = _load_config(repo_root)
session = config.get("session")
if not isinstance(session, dict):
return None
scope = session.get("spec_scope")
if scope is None:
return None
if isinstance(scope, str):
return scope # e.g. "active_task"
if isinstance(scope, list):
return [str(s) for s in scope]
return None