diff --git a/README.md b/README.md index 82772ee..6d97d56 100644 --- a/README.md +++ b/README.md @@ -51,12 +51,81 @@ Then run: Inside TruShell, type ‘help’ for a list of commands. Type ‘exit’ or Ctrl‑D to quit. -To run from source: -``` - git clone https://github.com/AkshajSinghal/trushell - cd trushell - pip install -e . - PYTHONPATH=. python -m trushell +## Commands + +### Todo + +| Command | Description | +| --- | --- | +| `addtask "" ""` | Add a new todo. | +| `deletetask ` | Delete by the number shown in `showtasks`. | +| `updatetask "" ""` | Update task text and category. | +| `completetask ` | Mark a task as done. | +| `showtasks` | Print the current todo list. | + +### Time + +| Command | Description | +| --- | --- | +| `now` | Show current local time. | +| `time` | Show the configured ASCII clock. | +| `world` | Show saved time zones. | +| `tz list` | List saved time zones. | +| `tz add ` | Add a time zone such as `Europe/London`. | +| `tz remove ` | Remove a saved time zone. | +| `alarm list` | List alarms. | +| `alarm add "" --label "Name"` | Add an alarm. | +| `alarm remove ` | Remove an alarm by ID. | +| `sw start`, `sw pause`, `sw lap`, `sw reset`, `sw show` | Stopwatch controls. | + +### Shell And Settings + +| Command | Description | +| --- | --- | +| `settings` | Change persisted preferences. | +| `edit ` | Open the built-in Textual editor. | +| `cd ` | Change TruShell's current directory. | +| `z [options] [pattern]` | Jump to or list frequently used directories by fuzzy path matching. | +| `help` | Print command help. | +| `exit` or `quit` | Leave the REPL. | + +Unrecognized commands are executed directly through the host OS without shell +operator expansion. Commands containing pipes, redirects, or chained operators +are rejected for now because they need a proper parser before they can be passed +through safely. + +## Storage + +Todos and application preferences are stored in SQLite under the platform's user +data directory. Older JSON state files are migrated into SQLite on first load and +renamed to a `.bak` file so the original settings are not silently discarded. + +## Architecture Notes + +TruShell uses a few terminal libraries, each for a narrow job: + +- Typer owns command parsing and CLI entry points. +- Rich owns formatted terminal output such as tables and styled status text. +- Textual is used only for the full-screen editor, where a widget toolkit is + more appropriate than line-by-line terminal output. + +The main modules are: + +```text +trushell/ + cli.py direct CLI commands + project.py interactive REPL and host-command fallback + todocli.py todo commands + database.py SQLite connection and persistence helpers + settings.py prompt-based preference editor + pyfunny.py jokes, cowsay rendering, and sound selection + chronoterm/ + shell.py time-related commands + state.py SQLite-backed app state with JSON migration + alarms.py alarm scheduling + timezones.py world clock helpers + stopwatch.py stopwatch state + sound.py platform-specific audio fallback ``` Core commands (most useful) diff --git a/tests/test_z.py b/tests/test_z.py new file mode 100644 index 0000000..5ea81f7 --- /dev/null +++ b/tests/test_z.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import time +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from trushell import project +from trushell.chronoterm.state import StateStore + + +def test_frecency_score() -> None: + now = 1_700_000_000.0 + assert project._frecency_score(10, now, now=now) == 10.0 + decayed = project._frecency_score(10, now - 86400.0, now=now) + assert decayed == pytest.approx(10 * 0.95, rel=1e-6) + + +def test_add_z_path_updates_state(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + + directory = tmp_path / "home" + directory.mkdir() + project._add_z_path(str(directory)) + + state = project._load_z_state() + normalized = project._normalize_directory(str(directory)) + assert normalized in state.z_dirs + assert state.z_dirs[normalized]["count"] == 1 + + project._add_z_path(str(directory)) + state = project._load_z_state() + assert state.z_dirs[normalized]["count"] == 2 + + +def test_handle_z_command_add_option(tmp_path: Path, monkeypatch, capsys) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + + directory = tmp_path / "projects" + directory.mkdir() + + assert project._handle_z_command("z", ["--add", str(directory)]) is True + captured = capsys.readouterr() + assert "Added:" in captured.out + + state = project._load_z_state() + normalized = project._normalize_directory(str(directory)) + assert normalized in state.z_dirs + assert state.z_dirs[normalized]["count"] == 1 + + +def test_find_z_matches_prefers_higher_frecency(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + + path_one = tmp_path / "one" + path_two = tmp_path / "two" + path_one.mkdir() + path_two.mkdir() + + state = project._load_z_state() + now = time.time() + state.z_dirs = { + project._normalize_directory(str(path_one)): { + "count": 1, + "last_accessed": now, + }, + project._normalize_directory(str(path_two)): { + "count": 10, + "last_accessed": now - 86400.0 * 10, + }, + } + project._save_z_state(state) + + matches = project._find_z_matches("two", current_only=False, recency_only=False) + assert matches[0][0] == project._normalize_directory(str(path_two)) + + recent_matches = project._find_z_matches("two", current_only=False, recency_only=True) + assert recent_matches[0][0] == project._normalize_directory(str(path_two)) + + +def test_find_z_matches_ignores_missing_directories(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + + existing_path = tmp_path / "exists" + existing_path.mkdir() + missing_path = tmp_path / "missing" + + state = project._load_z_state() + state.z_dirs = { + project._normalize_directory(str(existing_path)): {"count": 1, "last_accessed": time.time()}, + project._normalize_directory(str(missing_path)): {"count": 10, "last_accessed": time.time()}, + } + project._save_z_state(state) + + matches = project._find_z_matches(None, current_only=False, recency_only=False) + assert len(matches) == 1 + assert matches[0][0] == project._normalize_directory(str(existing_path)) + + +def test_cd_command_adds_path_to_z_history(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + project._PREVIOUS_CWD = None + + target = tmp_path / "directory" + target.mkdir() + + monkeypatch.setattr(project.os, "chdir", lambda path: None) + monkeypatch.setattr(project.os, "getcwd", lambda: str(target)) + monkeypatch.setattr(project, "_run_external_command", lambda command, shell, check, cwd=None: SimpleNamespace(returncode=0)) + + command, arguments = project._split_command(f"cd {target}") + assert project._handle_cd_command(command, arguments) is True + + state = project._load_z_state() + normalized = project._normalize_directory(str(target)) + assert normalized in state.z_dirs + assert state.z_dirs[normalized]["count"] == 1 + + +def test_mkdir_command_creates_directories(tmp_path: Path) -> None: + target_a = tmp_path / "test_a" + target_b = tmp_path / "test_b" + + command, arguments = project._split_command(f"mkdir -p {target_a} {target_b}") + assert project._handle_mkdir_command(command, arguments) is True + + assert target_a.exists() and target_a.is_dir() + assert target_b.exists() and target_b.is_dir() + + +def test_cd_command_revisits_existing_path_and_increments_count(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + project._PREVIOUS_CWD = None + + target = tmp_path / "directory" + target.mkdir() + + monkeypatch.setattr(project.os, "chdir", lambda path: None) + monkeypatch.setattr(project.os, "getcwd", lambda: str(target)) + monkeypatch.setattr(project, "_run_external_command", lambda command, shell, check, cwd=None: SimpleNamespace(returncode=0)) + + command, arguments = project._split_command(f"cd {target}") + assert project._handle_cd_command(command, arguments) is True + assert project._handle_cd_command(command, arguments) is True + + state = project._load_z_state() + normalized = project._normalize_directory(str(target)) + assert state.z_dirs[normalized]["count"] == 2 + + +def test_cd_dash_switches_to_previous_directory(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + project._PREVIOUS_CWD = None + + first_dir = tmp_path / "first" + second_dir = tmp_path / "second" + first_dir.mkdir() + second_dir.mkdir() + + cwd_state = {"cwd": str(first_dir)} + + def fake_chdir(path: str) -> None: + cwd_state["cwd"] = str(project._normalize_directory(path)) + + monkeypatch.setattr(project.os, "chdir", fake_chdir) + monkeypatch.setattr(project.os, "getcwd", lambda: cwd_state["cwd"]) + monkeypatch.setattr(project, "_run_external_command", lambda command, shell, check, cwd=None: SimpleNamespace(returncode=0)) + + command, arguments = project._split_command(f"cd {second_dir}") + assert project._handle_cd_command(command, arguments) is True + command, arguments = project._split_command("cd -") + assert project._handle_cd_command(command, arguments) is True + assert cwd_state["cwd"] == project._normalize_directory(str(first_dir)) + + +def test_z_command_updates_current_directory_and_supports_bang_pwd(tmp_path: Path, monkeypatch) -> None: + history_path = tmp_path / "state.json" + monkeypatch.setattr(project, "_Z_STATE_STORE", StateStore(path=history_path)) + + target = tmp_path / "target" + target.mkdir() + + # Add target to z history so z can match it. + project._add_z_path(str(target)) + + cwd_state = {"cwd": str(project._normalize_directory(target))} + + def fake_chdir(path: str) -> None: + cwd_state["cwd"] = project._normalize_directory(path) + + monkeypatch.setattr(project.os, "chdir", fake_chdir) + monkeypatch.setattr(project.os, "getcwd", lambda: cwd_state["cwd"]) + + executed = {} + def fake_run(command, shell, check, cwd=None): + executed["command"] = command + executed["shell"] = shell + executed["cwd"] = cwd + return SimpleNamespace(returncode=0) + + monkeypatch.setattr(project, "_run_external_command", fake_run) + + command, arguments = project._split_command("z target") + assert project._handle_z_command(command, arguments) is True + assert project._CURRENT_DIR == project._normalize_directory(str(target)) + + assert project.parse_and_execute_command("!pwd") is True + assert executed["command"] == "pwd" + assert executed["shell"] is True + assert executed["cwd"] == project._CURRENT_DIR diff --git a/trushell/chronoterm/state.py b/trushell/chronoterm/state.py index de9d31f..86b53d3 100644 --- a/trushell/chronoterm/state.py +++ b/trushell/chronoterm/state.py @@ -23,12 +23,15 @@ class AppState: joke_sound: str = "cow-sound.mp3" version: int = 1 updated_at_iso: str | None = None + z_dirs: dict[str, dict[str, float | int]] | None = None def __post_init__(self) -> None: if self.timezones is None: self.timezones = [] if self.alarms is None: self.alarms = [] + if self.z_dirs is None: + self.z_dirs = {} def touch(self) -> None: self.updated_at_iso = datetime.now().astimezone().isoformat(timespec="seconds") @@ -54,6 +57,7 @@ def load(self) -> AppState: state.joke_sound = file_data.get("joke_sound", "cow-sound.mp3") state.version = file_data.get("version", 1) state.updated_at_iso = file_data.get("updated_at_iso") + state.z_dirs = file_data.get("z_dirs", {}) except FileNotFoundError: return state except Exception: @@ -75,6 +79,7 @@ def save(self, state: AppState) -> None: "joke_sound": state.joke_sound, "version": state.version, "updated_at_iso": state.updated_at_iso, + "z_dirs": state.z_dirs, }, state_file, indent=2, diff --git a/trushell/project.py b/trushell/project.py index c3e6b2f..6305f56 100644 --- a/trushell/project.py +++ b/trushell/project.py @@ -16,6 +16,7 @@ from .settings import launch_settings from .todocli import addtask, delete_todo, update_todo, complete_todo, showtask from .chronoterm.shell import app as chronoterm_app +from .chronoterm.state import StateStore try: import psutil @@ -25,9 +26,226 @@ HELP_TEXT = ( "Available commands: joke, joke_trex, " "addtask, deletetask, updatetask, completetask, showtask, " - "now, time, world, tz, alarm, sw, settings, exit, help" + "now, time, world, tz, alarm, sw, settings, mkdir, z, exit, help" ) +_Z_STATE_STORE = StateStore() +_Z_ENTRY_LIMIT = 1000 +_PREVIOUS_CWD: str | None = None +_CURRENT_DIR: str = os.getcwd() + + +def _normalize_directory(path: str) -> str: + return os.path.normpath(os.path.abspath(os.path.expanduser(path))) + + +def _set_current_dir(path: str) -> None: + global _CURRENT_DIR + _CURRENT_DIR = _normalize_directory(path) + + +def _resolve_cd_target(raw_target: str) -> str: + global _PREVIOUS_CWD + if raw_target == "-": + if _PREVIOUS_CWD is None: + raise ValueError("No previous directory available.") + return _PREVIOUS_CWD + return os.path.expanduser(raw_target) + + +def _ensure_z_state(state): + if getattr(state, "z_dirs", None) is None: + state.z_dirs = {} + return state + + +def _load_z_state(): + state = _Z_STATE_STORE.load() + return _ensure_z_state(state) + + +def _save_z_state(state) -> None: + _ensure_z_state(state) + _Z_STATE_STORE.save(state) + + +def _frecency_score(count: int, last_accessed: float, now: float | None = None) -> float: + if now is None: + now = time.time() + days_since = max(0.0, (now - last_accessed) / 86400) + return count * (0.95 ** days_since) + + +def _is_descendant(path: str, root: str) -> bool: + normalized_path = _normalize_directory(path) + normalized_root = _normalize_directory(root) + if normalized_path == normalized_root: + return True + try: + return os.path.commonpath([normalized_root, normalized_path]) == normalized_root + except ValueError: + return False + + +def _cleanup_z_entries(z_dirs: dict[str, dict[str, float | int]]) -> None: + now = time.time() + valid_entries: list[tuple[str, dict[str, float | int], float]] = [] + + for path, entry in z_dirs.items(): + if not os.path.isdir(path): + continue + count = int(entry.get("count", 0)) + last_accessed = float(entry.get("last_accessed", now)) + score = _frecency_score(count, last_accessed, now) + valid_entries.append((path, entry, score)) + + valid_entries.sort(key=lambda item: (-item[2], -float(item[1].get("last_accessed", 0)))) + kept_entries = valid_entries[:_Z_ENTRY_LIMIT] + + z_dirs.clear() + for path, entry, _ in kept_entries: + z_dirs[path] = entry + + +def _add_z_path(path: str) -> None: + normalized_path = _normalize_directory(path) + if not os.path.isdir(normalized_path): + return + + state = _load_z_state() + entry = state.z_dirs.get(normalized_path, {"count": 0, "last_accessed": 0.0}) + entry["count"] = int(entry.get("count", 0)) + 1 + entry["last_accessed"] = time.time() + state.z_dirs[normalized_path] = entry + _cleanup_z_entries(state.z_dirs) + _save_z_state(state) + + +def _find_z_matches( + pattern: str | None, + current_only: bool, + recency_only: bool, +) -> list[tuple[str, dict[str, float | int], float]]: + state = _load_z_state() + now = time.time() + normalized_pattern = pattern.lower() if pattern else None + cwd = os.getcwd() + matches: list[tuple[str, dict[str, float | int], float]] = [] + + for path, entry in state.z_dirs.items(): + if not os.path.isdir(path): + continue + if current_only and not _is_descendant(path, cwd): + continue + if normalized_pattern and normalized_pattern not in path.lower(): + continue + + count = int(entry.get("count", 0)) + last_accessed = float(entry.get("last_accessed", now)) + score = _frecency_score(count, last_accessed, now) + matches.append((path, entry, score)) + + if recency_only: + matches.sort(key=lambda item: -float(item[1].get("last_accessed", 0))) + else: + matches.sort(key=lambda item: (-item[2], -float(item[1].get("last_accessed", 0)))) + + return matches + + +def _format_z_list(entries: list[tuple[str, dict[str, float | int], float]]) -> list[str]: + lines: list[str] = [] + for path, entry, score in entries: + last_accessed = float(entry.get("last_accessed", 0.0)) + lines.append(f"{score:.4f} {last_accessed:.0f} {path}") + return lines + + +def _handle_z_command(command: str, arguments: list[str]) -> bool: + if command != "z": + return False + + list_only = False + current_only = False + recency_only = False + help_requested = False + add_path: str | None = None + pattern_parts: list[str] = [] + it = iter(arguments) + + for arg in it: + if arg == "-l": + list_only = True + elif arg == "-c": + current_only = True + elif arg == "-r": + recency_only = True + elif arg == "-h": + help_requested = True + elif arg == "--add": + try: + add_path = next(it) + except StopIteration: + typer.secho("❌ --add requires a path.", fg=typer.colors.RED) + return True + elif arg.startswith("-"): + typer.secho(f"❌ Unknown option: {arg}", fg=typer.colors.RED) + return True + else: + pattern_parts.append(arg) + + if help_requested: + typer.echo( + "Usage: z [options] [pattern]\n" + "Options:\n" + " -l list matching directories with scores\n" + " -c only match under current directory\n" + " -r sort by recency instead of frecency\n" + " -h show this help text\n" + " --add manually add a directory to z history\n" + "If no pattern is provided, z lists the top 10 directories by score." + ) + return True + + if add_path: + normalized_path = _normalize_directory(add_path) + if not os.path.isdir(normalized_path): + typer.secho(f"❌ Cannot add path: {normalized_path} is not a directory.", fg=typer.colors.RED) + return True + _add_z_path(normalized_path) + typer.echo(f"Added: {normalized_path}") + return True + + pattern = " ".join(pattern_parts) if pattern_parts else None + matches = _find_z_matches(pattern, current_only=current_only, recency_only=recency_only) + + if not pattern and not list_only: + matches = matches[:10] + + if not matches: + if pattern or list_only: + typer.secho("❌ No matching directories found.", fg=typer.colors.YELLOW) + else: + typer.echo("No directories tracked yet.") + return True + + lines = _format_z_list(matches if list_only or not pattern else matches) + if list_only or not pattern: + for line in lines: + typer.echo(line) + return True + + target = matches[0][0] + try: + os.chdir(target) + _set_current_dir(os.getcwd()) + _add_z_path(os.getcwd()) + _run_external_command(["ls"], shell=False, check=False, cwd=_CURRENT_DIR) + typer.echo(target) + except (FileNotFoundError, NotADirectoryError, PermissionError, OSError) as error: + typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) + return True + def _split_command(user_input: str) -> tuple[str, list[str]]: stripped = user_input.strip() @@ -79,6 +297,9 @@ def _run_external_command( When shell=False, the command must be provided as a list of arguments so it is executed directly without shell interpretation. """ + if cwd is None: + cwd = _CURRENT_DIR + if psutil is None: return subprocess.run(command, shell=shell, check=check, cwd=cwd) @@ -224,6 +445,54 @@ def _handle_edit_command(command: str, arguments: list[str]) -> bool: return True +def _handle_mkdir_command(command: str, arguments: list[str]) -> bool: + if command != "mkdir": + return False + + if not arguments: + typer.secho( + '⚠️ Syntax: mkdir [-p] [ ...]', + fg=typer.colors.YELLOW, + ) + return True + + create_parents = False + directories: list[str] = [] + for arg in arguments: + if arg == "-p": + create_parents = True + else: + directories.append(arg) + + if not directories: + typer.secho( + '⚠️ Syntax: mkdir [-p] [ ...]', + fg=typer.colors.YELLOW, + ) + return True + + for directory in directories: + try: + os.makedirs(os.path.expanduser(directory), exist_ok=create_parents) + except OSError as error: + typer.secho(f"❌ mkdir failed: {error}", fg=typer.colors.RED) + return True + + +def _handle_ls_command(command: str, arguments: list[str]) -> bool: + if command != "ls": + return False + + directory = os.getcwd() if not arguments else os.path.expanduser(arguments[0]) + try: + entries = sorted(os.listdir(directory)) + for entry in entries: + typer.echo(entry) + except OSError as error: + typer.secho(f"❌ ls failed: {error}", fg=typer.colors.RED) + return True + + def _handle_local_command(command: str, arguments: list[str]) -> str: if command == "addtask" and len(arguments) < 2: typer.secho( @@ -238,6 +507,10 @@ def _handle_local_command(command: str, arguments: list[str]) -> str: return "handled" if _handle_todo_command(command, arguments): return "handled" + if _handle_mkdir_command(command, arguments): + return "handled" + if _handle_ls_command(command, arguments): + return "handled" if command == "settings": launch_settings() return "handled" @@ -260,6 +533,7 @@ def _handle_chronoterm_command(raw_command: str, normalized_command: str) -> boo def _handle_cd_command(command: str, arguments: list[str]) -> bool: """Handle cd natively so the shell's working directory changes permanently.""" + global _PREVIOUS_CWD if command != "cd": return False @@ -267,11 +541,17 @@ def _handle_cd_command(command: str, arguments: list[str]) -> bool: typer.secho("Syntax: cd ", fg=typer.colors.YELLOW) return True - target = os.path.expanduser(arguments[0]) - + target_arg = arguments[0] try: + target = _resolve_cd_target(target_arg) + previous = os.getcwd() os.chdir(target) - _run_external_command(["ls"], shell=False, check=False, cwd=os.getcwd()) + _PREVIOUS_CWD = previous + _set_current_dir(os.getcwd()) + _add_z_path(os.getcwd()) + _run_external_command(["ls"], shell=False, check=False, cwd=_CURRENT_DIR) + except ValueError as error: + typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) except (FileNotFoundError, NotADirectoryError, PermissionError) as error: typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) except OSError as error: @@ -304,6 +584,18 @@ def _handle_os_fallback(raw_command: str) -> bool: if not command: return False + if command.startswith("!"): + shell_command = command[1:].strip() + if not shell_command: + typer.secho("❌ No command provided after '!'.", fg=typer.colors.RED) + return True + try: + _run_external_command(shell_command, shell=True, check=False, cwd=_CURRENT_DIR) + except (OSError, subprocess.SubprocessError) as error: + typer.secho("❓ Command not recognized by TruShell or your host OS.", fg=typer.colors.YELLOW) + typer.secho(f"OS fallback error: {error}", fg=typer.colors.RED) + return True + if is_dangerous_command(command): typer.secho( "⚠️ TruShell blocks shell operators and expansions for safety.", @@ -364,6 +656,8 @@ def parse_and_execute_command(raw_command: str) -> bool: if _handle_chronoterm_command(stripped, command): return True + if _handle_z_command(command, arguments): + return True if _handle_cd_command(command, arguments): return True if _handle_edit_command(command, arguments):