#!/usr/bin/env python3 """ Trellis configuration reader. Reads settings from .trellis/config.yaml with sensible defaults. """ from __future__ import annotations import sys from pathlib import Path from .paths import DIR_WORKFLOW, get_repo_root # ============================================================================= # YAML Simple Parser (no dependencies) # ============================================================================= def _unquote(s: str) -> str: """Remove exactly one layer of matching surrounding quotes. Unlike str.strip('"'), this only removes the outermost pair, preserving any nested quotes inside the value. Examples: _unquote('"hello"') -> 'hello' _unquote("'hello'") -> 'hello' _unquote('"echo \\'hi\\'"') -> "echo 'hi'" _unquote('hello') -> 'hello' _unquote('"hello\\'') -> '"hello\\'' (mismatched, unchanged) """ if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"): return s[1:-1] return s def parse_simple_yaml(content: str) -> dict: """Parse simple YAML with nested dict support (no dependencies). Supports: - key: value (string) - key: (followed by list items) - item1 - item2 - key: (followed by nested dict) nested_key: value nested_key2: - item Uses indentation to detect nesting (2+ spaces deeper = child). Args: content: YAML content string. Returns: Parsed dict (values can be str, list[str], or dict). """ lines = content.splitlines() result: dict = {} _parse_yaml_block(lines, 0, 0, result) return result def _parse_yaml_block( lines: list[str], start: int, min_indent: int, target: dict ) -> int: """Parse a YAML block into target dict, returning next line index.""" i = start current_list: list | None = None while i < len(lines): line = lines[i] stripped = line.strip() # Skip empty lines and comments if not stripped or stripped.startswith("#"): i += 1 continue # Calculate indentation indent = len(line) - len(line.lstrip()) # If dedented past our block, we're done if indent < min_indent: break if stripped.startswith("- "): if current_list is not None: current_list.append(_unquote(stripped[2:].strip())) i += 1 elif ":" in stripped: key, _, value = stripped.partition(":") key = key.strip() value = _unquote(value.strip()) current_list = None if value: # key: value target[key] = value i += 1 else: # key: (no value) — peek ahead to determine list vs nested dict next_i, next_line = _next_content_line(lines, i + 1) if next_i >= len(lines): target[key] = {} i = next_i elif next_line.strip().startswith("- "): # It's a list current_list = [] target[key] = current_list i += 1 else: next_indent = len(next_line) - len(next_line.lstrip()) if next_indent > indent: # It's a nested dict nested: dict = {} target[key] = nested i = _parse_yaml_block(lines, i + 1, next_indent, nested) else: # Empty value, same or less indent follows target[key] = {} i += 1 else: i += 1 return i def _next_content_line(lines: list[str], start: int) -> tuple[int, str]: """Find the next non-empty, non-comment line.""" i = start while i < len(lines): stripped = lines[i].strip() if stripped and not stripped.startswith("#"): return i, lines[i] i += 1 return i, "" # Defaults DEFAULT_SESSION_COMMIT_MESSAGE = "chore: record journal" DEFAULT_MAX_JOURNAL_LINES = 2000 CONFIG_FILE = "config.yaml" def _is_true_config_value(value: object) -> bool: """Return True when a config value represents an enabled flag.""" if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() == "true" return False def _get_config_path(repo_root: Path | None = None) -> Path: """Get path to config.yaml.""" root = repo_root or get_repo_root() return root / DIR_WORKFLOW / CONFIG_FILE def _load_config(repo_root: Path | None = None) -> dict: """Load and parse config.yaml. Returns empty dict on any error.""" config_file = _get_config_path(repo_root) try: content = config_file.read_text(encoding="utf-8") return parse_simple_yaml(content) except (OSError, IOError): return {} def get_session_commit_message(repo_root: Path | None = None) -> str: """Get the commit message for auto-committing session records.""" config = _load_config(repo_root) return config.get("session_commit_message", DEFAULT_SESSION_COMMIT_MESSAGE) def get_max_journal_lines(repo_root: Path | None = None) -> int: """Get the maximum lines per journal file.""" config = _load_config(repo_root) value = config.get("max_journal_lines", DEFAULT_MAX_JOURNAL_LINES) try: return int(value) except (ValueError, TypeError): return DEFAULT_MAX_JOURNAL_LINES def get_hooks(event: str, repo_root: Path | None = None) -> list[str]: """Get hook commands for a lifecycle event. Args: event: Event name (e.g. "after_create", "after_archive"). repo_root: Repository root path. Returns: List of shell commands to execute, empty if none configured. """ config = _load_config(repo_root) hooks = config.get("hooks") if not isinstance(hooks, dict): return [] commands = hooks.get(event) if isinstance(commands, list): return [str(c) for c in commands] return [] # ============================================================================= # Monorepo / Packages # ============================================================================= def get_packages(repo_root: Path | None = None) -> dict[str, dict] | None: """Get monorepo package declarations. Returns: Dict mapping package name to its config (path, type, etc.), or None if not configured (single-repo mode). Example return: {"cli": {"path": "packages/cli"}, "docs-site": {"path": "docs-site", "type": "submodule"}} """ config = _load_config(repo_root) packages = config.get("packages") if not isinstance(packages, dict): return None # Ensure each value is a dict (filter out scalar entries) filtered = {k: v for k, v in packages.items() if isinstance(v, dict)} if not filtered: return None return filtered def get_default_package(repo_root: Path | None = None) -> str | None: """Get the default package name from config. Returns: Package name string, or None if not configured. """ config = _load_config(repo_root) value = config.get("default_package") return str(value) if value else None def get_submodule_packages(repo_root: Path | None = None) -> dict[str, str]: """Get packages that are git submodules. Returns: Dict mapping package name to its path for submodule-type packages. Empty dict if none configured. Example return: {"docs-site": "docs-site"} """ packages = get_packages(repo_root) if packages is None: return {} return { name: cfg.get("path", name) for name, cfg in packages.items() if cfg.get("type") == "submodule" } def get_git_packages(repo_root: Path | None = None) -> dict[str, str]: """Get packages that have their own independent git repository. These are sub-directories with their own .git (not submodules), marked with ``git: true`` in config.yaml. Returns: Dict mapping package name to its path for git-repo packages. Empty dict if none configured. Example config:: packages: backend: path: iqs git: true Example return:: {"backend": "iqs"} """ packages = get_packages(repo_root) if packages is None: return {} return { name: cfg.get("path", name) for name, cfg in packages.items() if _is_true_config_value(cfg.get("git")) } def is_monorepo(repo_root: Path | None = None) -> bool: """Check if the project is configured as a monorepo (has packages in config).""" return get_packages(repo_root) is not None def get_spec_base(package: str | None = None, repo_root: Path | None = None) -> str: """Get the spec directory base path relative to .trellis/. Single-repo: returns "spec" Monorepo with package: returns "spec/" Monorepo without package: returns "spec" (caller should specify package) """ if package and is_monorepo(repo_root): return f"spec/{package}" return "spec" def validate_package(package: str, repo_root: Path | None = None) -> bool: """Check if a package name is valid in this project. Single-repo (no packages configured): always returns True. Monorepo: returns True only if package exists in config.yaml packages. """ packages = get_packages(repo_root) if packages is None: return True # Single-repo, no validation needed return package in packages def resolve_package( task_package: str | None = None, repo_root: Path | None = None, ) -> str | None: """Resolve package from inferred sources with validation. Checks in order: task_package → default_package. Invalid inferred values print a warning to stderr and are skipped. Returns: Resolved package name, or None if no valid package found. Note: CLI --package should be validated separately by the caller (fail-fast with available packages list on error). """ packages = get_packages(repo_root) if packages is None: return None # Single-repo, no package needed # Try task_package (guard against non-string values from malformed JSON) if task_package and isinstance(task_package, str): if task_package in packages: return task_package print( f"Warning: task.json package '{task_package}' not found in config, skipping", file=sys.stderr, ) # Try default_package default = get_default_package(repo_root) if default: if default in packages: return default print( f"Warning: default_package '{default}' not found in config, skipping", file=sys.stderr, ) return None def get_spec_scope(repo_root: Path | None = None) -> list[str] | str | None: """Get session.spec_scope configuration. Returns: list[str]: Package names to include in spec scanning. str: "active_task" to use current task's package. None: No scope configured (scan all packages). """ config = _load_config(repo_root) session = config.get("session") if not isinstance(session, dict): return None scope = session.get("spec_scope") if scope is None: return None if isinstance(scope, str): return scope # e.g. "active_task" if isinstance(scope, list): return [str(s) for s in scope] return None