From f6a7b89192a6bb02382e8d2b0d6090f7590b3bd6 Mon Sep 17 00:00:00 2001 From: Akshaj Singhal Date: Wed, 3 Jun 2026 11:10:23 +0000 Subject: [PATCH 1/4] refactor(kernel): fix None leak and migrate core utilities to /core --- .gitignore | 4 + .npmignore | 15 -- tests/test_database.py | 10 +- trushell/__main__.py | 4 +- trushell/cli.py | 128 ++----------- trushell/commands/__init__.py | 1 + trushell/commands/chronoterm.py | 45 +++++ trushell/commands/core.py | 91 ++++++++++ trushell/commands/editor.py | 26 +++ trushell/commands/joke.py | 70 ++++++++ trushell/commands/tasks.py | 76 ++++++++ trushell/config/builtin_commands.md | 17 ++ trushell/config/plugins.md | 5 + trushell/core/__init__.py | 1 + trushell/{ => core}/database.py | 3 +- trushell/{model.py => core/models.py} | 0 trushell/{ => core}/settings.py | 4 +- trushell/core/trukernel.py | 250 ++++++++++++++++++++++++++ trushell/plugins/git_enhancer/main.py | 8 + trushell/project.py | 24 +-- trushell/pyfunny.py | 59 ------ trushell/todocli.py | 4 +- trushell/utils/__init__.py | 1 + trushell/utils/logger.py | 41 +++++ trushell/utils/manifest_parser.py | 133 ++++++++++++++ 25 files changed, 803 insertions(+), 217 deletions(-) delete mode 100644 .npmignore create mode 100644 trushell/commands/__init__.py create mode 100644 trushell/commands/chronoterm.py create mode 100644 trushell/commands/core.py create mode 100644 trushell/commands/editor.py create mode 100644 trushell/commands/joke.py create mode 100644 trushell/commands/tasks.py create mode 100644 trushell/config/builtin_commands.md create mode 100644 trushell/config/plugins.md create mode 100644 trushell/core/__init__.py rename trushell/{ => core}/database.py (98%) rename trushell/{model.py => core/models.py} (100%) rename trushell/{ => core}/settings.py (97%) create mode 100644 trushell/core/trukernel.py create mode 100644 trushell/plugins/git_enhancer/main.py delete mode 100644 trushell/pyfunny.py create mode 100644 trushell/utils/__init__.py create mode 100644 trushell/utils/logger.py create mode 100644 trushell/utils/manifest_parser.py diff --git a/.gitignore b/.gitignore index 92680b2..a6b6f27 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,7 @@ dist/ build/ .idea/ .vscode/ + +# Ignore runtime TruShell user configuration and local runtime state +trushell/config/my_command_config.md +.trushell/ diff --git a/.npmignore b/.npmignore deleted file mode 100644 index 68b5234..0000000 --- a/.npmignore +++ /dev/null @@ -1,15 +0,0 @@ -# Exclude Python bytecode and caches from npm bundles -__pycache__/ -*.pyc -*.pyo -*.pyd - -# Exclude development metadata and tests from runtime package -tests/ -.pytest_cache/ -poetry.lock -package-lock.json -pyproject.toml -.gitignore -.npmignore -.github/ diff --git a/tests/test_database.py b/tests/test_database.py index 44360f0..f4c1d64 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -1,10 +1,10 @@ -from trushell.database import _create_table, get_all_todos, get_db_connection, insert_todo -from trushell.model import Todo +from trushell.core.database import _create_table, get_all_todos, get_db_connection, insert_todo +from trushell.core.models import Todo def test_get_db_connection_returns_fresh_connection(monkeypatch, tmp_path) -> None: db_path = tmp_path / "todos.db" - monkeypatch.setattr("trushell.database.DB_PATH", db_path) + monkeypatch.setattr("trushell.core.database.DB_PATH", db_path) conn_one = get_db_connection() conn_two = get_db_connection() @@ -17,7 +17,7 @@ def test_get_db_connection_returns_fresh_connection(monkeypatch, tmp_path) -> No def test_insert_todo_assigns_sequential_positions(monkeypatch, tmp_path) -> None: db_path = tmp_path / "todos.db" - monkeypatch.setattr("trushell.database.DB_PATH", db_path) + monkeypatch.setattr("trushell.core.database.DB_PATH", db_path) _create_table() insert_todo(Todo(task="first", category="work")) @@ -31,7 +31,7 @@ def test_insert_todo_assigns_sequential_positions(monkeypatch, tmp_path) -> None def test_get_all_todos_works_with_local_connections(monkeypatch, tmp_path) -> None: db_path = tmp_path / "todos.db" - monkeypatch.setattr("trushell.database.DB_PATH", db_path) + monkeypatch.setattr("trushell.core.database.DB_PATH", db_path) _create_table() insert_todo(Todo(task="alpha", category="study")) diff --git a/trushell/__main__.py b/trushell/__main__.py index d5cf0fd..714dd0c 100644 --- a/trushell/__main__.py +++ b/trushell/__main__.py @@ -1,4 +1,4 @@ -from .cli import app +from .cli import app_with_lower if __name__ == "__main__": - app() + app_with_lower() diff --git a/trushell/cli.py b/trushell/cli.py index d7d08b9..b748996 100644 --- a/trushell/cli.py +++ b/trushell/cli.py @@ -1,38 +1,30 @@ from __future__ import annotations -import shlex import sys import typer from . import __version__ +from .core.trukernel import TruKernel from .project import run_interactive_shell -from .pyfunny import joke, joke_trex -from .settings import launch_settings -from .todocli import addtask, completetask, deletetask, showtask, updatetask -from .chronoterm.shell import app as chronoterm_app -app = typer.Typer(name="trushell", help="TruShell: jokes, todos, time, and more.") +app = typer.Typer(name="trushell", help="TruShell manifest-driven launcher.") +kernel = TruKernel() + def app_with_lower() -> None: - """Entry point that normalizes command name to lowercase for case-insensitive invocation.""" - # Normalize the command name to lowercase for case-insensitive behavior - if len(sys.argv) > 0: - sys.argv[0] = sys.argv[0].lower() - # Invoke the main Typer app + """Entry point that normalizes the first argument to lowercase for case-insensitive invocation.""" + if len(sys.argv) > 1: + sys.argv[1] = sys.argv[1].lower() + if sys.argv[1] not in {"--help", "-h", "version"}: + raw = " ".join(sys.argv[1:]) + kernel.execute_command(raw) + return app() -def _invoke_chronoterm_command(command: str) -> None: - try: - chronoterm_app(shlex.split(command)) - except SystemExit: - return - except Exception as error: - typer.secho(f"Error running ChronoTerm command: {error}", fg=typer.colors.RED) - @app.callback(invoke_without_command=True) def main(ctx: typer.Context) -> None: - """Launch the TruShell interactive REPL when no command is provided.""" + """Launch the REPL when no command is provided.""" if ctx.invoked_subcommand is None: run_interactive_shell() @@ -41,99 +33,3 @@ def main(ctx: typer.Context) -> None: def version() -> None: """Show the installed TruShell version.""" typer.echo(__version__) - - -@app.command("joke") -def cli_joke() -> None: - """Tell a random joke with ASCII art.""" - typer.echo(joke()) - - -@app.command("joke-trex") -def cli_joke_trex() -> None: - """Tell a T-Rex joke with sound.""" - typer.echo(joke_trex()) - - -@app.command("addtask") -def cli_addtask(task: str, category: str) -> None: - """Add a todo task.""" - addtask(task, category) - - -@app.command("deletetask") -def cli_deletetask(position: int) -> None: - """Delete a todo task by position.""" - deletetask(position) - - -@app.command("updatetask") -def cli_updatetask(position: int, task: str | None = None, category: str | None = None) -> None: - """Update the text or category of a todo task.""" - updatetask(position, task, category) - - -@app.command("completetask") -def cli_completetask(position: int) -> None: - """Mark a todo task as complete.""" - completetask(position) - - -@app.command("showtasks") -def cli_showtasks() -> None: - """Show all todo tasks.""" - showtask() - - -@app.command("settings") -def cli_settings() -> None: - """Open the interactive settings manager.""" - launch_settings() - - -@app.command("now") -def cli_now() -> None: - """Show the current local time.""" - _invoke_chronoterm_command("now") - - -@app.command("time") -def cli_time() -> None: - """Show the current time in an ASCII clock.""" - _invoke_chronoterm_command("time") - - -@app.command("world") -def cli_world() -> None: - """Show world time for favorite timezones.""" - _invoke_chronoterm_command("world") - - -@app.command("tz") -def cli_tz(action: str = typer.Argument("list", help="list | add | remove"), name: str | None = typer.Argument(None, help="IANA timezone name, e.g. Europe/London")) -> None: - """Manage favorite timezones.""" - command = "tz" - if name: - command += f" {action} {name}" - else: - command += f" {action}" - _invoke_chronoterm_command(command) - - -@app.command("alarm") -def cli_alarm(action: str = typer.Argument("list", help="list | add | remove"), time: str | None = typer.Argument(None, help="Time as HH:MM or YYYY-MM-DD HH:MM"), tz: str | None = typer.Option(None, "--tz", help="Timezone name"), label: str | None = typer.Option(None, "--label", help="Alarm label")) -> None: - """Manage alarms.""" - command = "alarm " + action - if time: - command += f" {time}" - if tz: - command += f" --tz {tz}" - if label: - command += f" --label {label}" - _invoke_chronoterm_command(command) - - -@app.command("sw") -def cli_sw(action: str = typer.Argument("show", help="start | pause | lap | reset | show")) -> None: - """Control the stopwatch.""" - _invoke_chronoterm_command(f"sw {action}") diff --git a/trushell/commands/__init__.py b/trushell/commands/__init__.py new file mode 100644 index 0000000..7ae6482 --- /dev/null +++ b/trushell/commands/__init__.py @@ -0,0 +1 @@ +"""Built-in TruShell commands implemented as manifest-driven wrappers.""" diff --git a/trushell/commands/chronoterm.py b/trushell/commands/chronoterm.py new file mode 100644 index 0000000..c05f9d7 --- /dev/null +++ b/trushell/commands/chronoterm.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import shlex + +from trushell.chronoterm.shell import app as chronoterm_app + + +def _run_chrono_command(raw_command: str) -> None: + try: + chronoterm_app(shlex.split(raw_command)) + except SystemExit: + return + except Exception as error: + print(f"ChronoTerm error: {error}") + + +def run_now(_: str) -> None: + _run_chrono_command("now") + + +def run_time(_: str) -> None: + _run_chrono_command("time") + + +def run_world(_: str) -> None: + _run_chrono_command("world") + + +def run_tz(args: str) -> None: + if not args.strip(): + _run_chrono_command("tz list") + return + _run_chrono_command(f"tz {args.strip()}") + + +def run_alarm(args: str) -> None: + if not args.strip(): + _run_chrono_command("alarm list") + return + _run_chrono_command(f"alarm {args.strip()}") + + +def run_sw(args: str) -> None: + command = "sw" if not args.strip() else f"sw {args.strip()}" + _run_chrono_command(command) diff --git a/trushell/commands/core.py b/trushell/commands/core.py new file mode 100644 index 0000000..42807f1 --- /dev/null +++ b/trushell/commands/core.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import os +import subprocess + + +def run_help(_: str) -> None: + """Display available commands by reading the manifest registry.""" + # Import here to avoid circular dependency if trukernel imports core + from trushell.core.trukernel import get_kernel + kernel = get_kernel() + cmds = sorted(kernel.registry.keys()) + print("Available commands:") + # Print in columns + col_width = max(len(c) for c in cmds) + 2 + cols = 4 + for i, cmd in enumerate(cmds): + print(f" {cmd:<{col_width}}", end="") + if (i + 1) % cols == 0: + print() + if len(cmds) % cols != 0: + print() + print("\nType 'help ' for more info (coming soon).") + + +def run_exit(_: str) -> str: + """Signal TruKernel to exit the REPL loop cleanly. + + Returns a special sentinel string instead of raising SystemExit. + The kernel checks for this return value and breaks its loop. + """ + return "__TRUSHELL_EXIT__" + + +def run_settings(args: str) -> None: + """Open the TruShell settings manager.""" + try: + from trushell.core.settings import launch_settings + launch_settings() + except ImportError: + print("Settings module not available.") + except Exception as e: + print(f"Settings error: {e}") + + +def run_os_passthrough(args: str) -> int: + """Generic wrapper that passes args directly to the OS shell. + + Args: + args: The full argument string after the command name. + e.g., for 'ls -la /tmp', args = '-la /tmp' + + Returns: + The subprocess return code (0 = success). + """ + if not args: + print("Usage: [args]") + return 1 + try: + result = subprocess.run(args, shell=True) + return result.returncode + except FileNotFoundError: + print(f"Command not found in PATH.") + return 127 + except Exception as e: + print(f"OS command error: {e}") + return 1 + + +def run_cd_command(args: str) -> None: + """Change directory. Updates Python process CWD so subsequent + os_passthrough commands inherit the new directory. + + Note: This does NOT update the terminal emulator's displayed path. + That is an inherent limitation of Python-based shells. + """ + if not args: + target = os.path.expanduser("~") + else: + target = os.path.expandvars(os.path.expanduser(args.strip())) + + try: + os.chdir(target) + # Print new path so user knows cd succeeded + print(os.getcwd()) + except FileNotFoundError: + print(f"cd: no such file or directory: {target}") + except PermissionError: + print(f"cd: permission denied: {target}") + except NotADirectoryError: + print(f"cd: not a directory: {target}") \ No newline at end of file diff --git a/trushell/commands/editor.py b/trushell/commands/editor.py new file mode 100644 index 0000000..d18c639 --- /dev/null +++ b/trushell/commands/editor.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from pathlib import Path + + +def run_edit_command(args: str) -> None: + """Open a file in the TruShell editor from a manifest-driven command.""" + if not args.strip(): + print("Usage: edit ") + return + + file_path = Path(args.strip()).expanduser() + initial_text = "" + if file_path.exists(): + try: + initial_text = file_path.read_text(encoding="utf-8") + except OSError as error: + print(f"Editor error: {error}") + return + + try: + from trushell.project import TruShellEditor + + TruShellEditor(str(file_path), initial_text=initial_text).run() + except Exception as error: + print(f"Editor error: {error}") diff --git a/trushell/commands/joke.py b/trushell/commands/joke.py new file mode 100644 index 0000000..3dda450 --- /dev/null +++ b/trushell/commands/joke.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Tuple + +import cowsay +import pyjokes + + +def _sound_path(filename: str) -> Path: + return Path(__file__).resolve().parents[1] / "chronoterm" / "sounds" / filename + + +def _play_sound(filename: str) -> None: + try: + # lazy import to avoid optional dependency at kernel import time + from trushell.chronoterm.sound import play_alarm + + sound_path = _sound_path(filename) + if not sound_path.exists(): + return + play_alarm() + except Exception: + # best-effort: do not raise from sound playing + return + + +def _joke_preferences() -> Tuple[str, str]: + try: + from trushell.chronoterm.state import StateStore + + state = StateStore().load() + return state.joke_character or "cow", state.joke_sound or "cow-sound.mp3" + except Exception: + return "cow", "cow-sound.mp3" + + +def run_joke_command(_: str) -> str: + """Return a formatted joke string (cow art by default).""" + joke_text = pyjokes.get_joke() + character_name, sound_file = _joke_preferences() + _play_sound(sound_file) + speaker = getattr(cowsay, character_name, cowsay.cow) + return speaker(joke_text) + + +def run_joke_trex_command(_: str) -> str: + """Return a T-Rex joke string.""" + joke_text = pyjokes.get_joke() + _play_sound("trex-sound.mp3") + return cowsay.trex(joke_text) +from __future__ import annotations + +from trushell.pyfunny import joke, joke_trex + + +def run_joke_command(_: str) -> None: + """Tell a random joke with the default TruShell humor engine.""" + try: + print(joke()) + except Exception as error: + print(f"Joke error: {error}") + + +def run_joke_trex_command(_: str) -> None: + """Tell a T-Rex joke with sound.""" + try: + print(joke_trex()) + except Exception as error: + print(f"Joke error: {error}") diff --git a/trushell/commands/tasks.py b/trushell/commands/tasks.py new file mode 100644 index 0000000..0eb4e05 --- /dev/null +++ b/trushell/commands/tasks.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from typing import Callable + +from trushell.core.database import complete_todo, get_all_todos, insert_todo +from trushell.core.models import Todo + + +def add_task(args: str) -> None: + """Add a new task to the todo list. The full remainder is treated as the task.""" + if not args.strip(): + print("Usage: task add ") + return + + task_text = args.strip() + todo = Todo(task=task_text, category="General") + insert_todo(todo) + print("Task added.") + + +def show_tasks(_: str) -> None: + """Display the current todo list.""" + tasks = get_all_todos() + if not tasks: + print("No tasks found.") + return + + for index, task in enumerate(tasks, start=1): + status = "✅" if task.status == 2 else "❌" + print(f"{index}. {task.task} [{task.category}] {status}") + + +def complete_task(args: str) -> None: + """Mark a todo item as complete.""" + if not args.strip() or not args.strip().isdigit(): + print("Usage: task done ") + return + + index = int(args.strip()) - 1 + try: + complete_todo(index) + print("Task completed.") + except Exception as error: + print(f"Task error: {error}") + + +def list_tasks(_: str) -> None: + """Alias for show_tasks.""" + show_tasks("") + + +def run_task_command(args: str) -> None: + """Dispatch task subcommands from the manifest-driven task wrapper.""" + subcommands: dict[str, Callable[[str], None]] = { + "add": add_task, + "show": show_tasks, + "done": complete_task, + "list": list_tasks, + } + + if not args.strip(): + print("Usage: task [options]") + return + + parts = args.split(maxsplit=1) + subcmd = parts[0].lower() + subargs = parts[1] if len(parts) > 1 else "" + + handler = subcommands.get(subcmd) + if handler: + try: + handler(subargs) + except Exception as error: + print(f"Task error: {error}") + else: + print(f"Unknown task subcommand: {subcmd}") diff --git a/trushell/config/builtin_commands.md b/trushell/config/builtin_commands.md new file mode 100644 index 0000000..3c26636 --- /dev/null +++ b/trushell/config/builtin_commands.md @@ -0,0 +1,17 @@ +# Built-in command registry for TruShell. +# Each entry is one line and must include {cmd: name}, "function()", and [path]. + +{cmd: help}; "run_help()"; [trushell/commands/core.py]; +{cmd: exit}; "run_exit()"; [trushell/commands/core.py]; +{cmd: edit}; "run_edit_command()"; [trushell/commands/editor.py]; +{cmd: settings}; "run_settings()"; [trushell/commands/core.py]; +{cmd: task}; "run_task_command()"; [trushell/commands/tasks.py]; +{cmd: joke}; "run_joke_command()"; [trushell/commands/joke.py]; +{cmd: joke-trex}; "run_joke_trex_command()"; [trushell/commands/joke.py]; +{cmd: now}; "run_now()"; [trushell/commands/chronoterm.py]; +{cmd: time}; "run_time()"; [trushell/commands/chronoterm.py]; +{cmd: world}; "run_world()"; [trushell/commands/chronoterm.py]; +{cmd: tz}; "run_tz()"; [trushell/commands/chronoterm.py]; +{cmd: alarm}; "run_alarm()"; [trushell/commands/chronoterm.py]; +{cmd: sw}; "run_sw()"; [trushell/commands/chronoterm.py]; + diff --git a/trushell/config/plugins.md b/trushell/config/plugins.md new file mode 100644 index 0000000..393315a --- /dev/null +++ b/trushell/config/plugins.md @@ -0,0 +1,5 @@ +# User-installed plugins are registered here. +# Plugins can optionally run immediately with {lifecycle: on_load}. + +{cmd: gstatus}; "plugin_init()"; [trushell/plugins/git_enhancer/main.py]; {lifecycle: on_load}; +{cmd: theme}; "load_theme()"; [~/.trushell/plugins/theme_engine/loader.py]; {version:1.2}; diff --git a/trushell/core/__init__.py b/trushell/core/__init__.py new file mode 100644 index 0000000..f7e06b1 --- /dev/null +++ b/trushell/core/__init__.py @@ -0,0 +1 @@ +"""Core TruShell kernel package.""" diff --git a/trushell/database.py b/trushell/core/database.py similarity index 98% rename from trushell/database.py rename to trushell/core/database.py index 594b1fa..886092b 100644 --- a/trushell/database.py +++ b/trushell/core/database.py @@ -6,7 +6,7 @@ from platformdirs import user_data_dir -from .model import Todo +from trushell.core.models import Todo APP_NAME = "TruShell" APP_AUTHOR = "AkshajSinghal" @@ -14,6 +14,7 @@ DATA_DIR.mkdir(parents=True, exist_ok=True) DB_PATH = DATA_DIR / "todos.db" + def get_db_connection() -> sqlite3.Connection: return sqlite3.connect(DB_PATH, check_same_thread=False) diff --git a/trushell/model.py b/trushell/core/models.py similarity index 100% rename from trushell/model.py rename to trushell/core/models.py diff --git a/trushell/settings.py b/trushell/core/settings.py similarity index 97% rename from trushell/settings.py rename to trushell/core/settings.py index a61f213..fb369d7 100644 --- a/trushell/settings.py +++ b/trushell/core/settings.py @@ -5,7 +5,7 @@ import typer -from .chronoterm.state import StateStore +from trushell.chronoterm.state import StateStore COMMANDS = ["time", "world", "joke"] TIME_TEMPLATES = [ @@ -54,7 +54,7 @@ def _select_option(prompt: str, options: Sequence[str]) -> str | None: def _available_sound_files() -> list[str]: - sounds_dir = Path(__file__).resolve().parent / "chronoterm" / "sounds" + sounds_dir = Path(__file__).resolve().parents[1] / "chronoterm" / "sounds" if not sounds_dir.exists(): return [] return sorted([path.name for path in sounds_dir.iterdir() if path.is_file()]) diff --git a/trushell/core/trukernel.py b/trushell/core/trukernel.py new file mode 100644 index 0000000..012368b --- /dev/null +++ b/trushell/core/trukernel.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import importlib.util +import os +import subprocess +import sys +from pathlib import Path +from typing import Any + +from ..utils.logger import get_logger +from ..utils.manifest_parser import parse_manifest + +# Sentinel value returned when user types 'exit' +EXIT_SENTINEL = "__TRUSHELL_EXIT__" + + +_kernel_instance: TruKernel | None = None + +class TruKernel: + """Central manifest-driven dispatch engine for TruShell.""" + + def __init__(self) -> None: + global _kernel_instance + self.logger = get_logger(__name__) + self.base_dir = Path(__file__).resolve().parents[2] + self.registry: dict[str, dict[str, Any]] = {} + self._loaded_modules: dict[str, Any] = {} + self._load_manifests() + _kernel_instance = self # Store reference for get_kernel() + + # ------------------------------------------------------------------ # + # Manifest Loading + # ------------------------------------------------------------------ # + def _manifest_path(self, filename: str) -> Path: + return self.base_dir / "trushell" / "config" / filename + + def _load_manifests(self) -> None: + builtins = parse_manifest( + self._manifest_path("builtin_commands.md"), source="builtin" + ) + plugins = parse_manifest( + self._manifest_path("plugins.md"), source="plugin" + ) + aliases = parse_manifest( + self._manifest_path("my_command_config.md"), source="alias" + ) + + for entry in builtins: + self._register(entry) + for entry in plugins: + self._register(entry) + for entry in aliases: + self._register(entry, override=True) + + # Run on_load plugins during init + for entry in plugins: + if entry["meta"].get("lifecycle") == "on_load": + self._execute_entry(entry, args="", init=True) + + def _register(self, entry: dict[str, Any], override: bool = False) -> None: + command = entry["command"] + if override or command not in self.registry: + self.registry[command] = entry + self.logger.debug( + "Registered '%s' from %s", command, entry.get("source") + ) + else: + self.logger.debug( + "Command '%s' already registered from %s; skipping %s", + command, + self.registry[command].get("source"), + entry.get("source"), + ) + + # ------------------------------------------------------------------ # + # Module Importing (Lazy + Cached) + # ------------------------------------------------------------------ # + def _resolve_module_path(self, raw_path: str) -> Path: + path = Path(raw_path).expanduser() + if path.is_absolute(): + return path + return (self.base_dir / path).resolve() + + def _import_module(self, path: str): + resolved = self._resolve_module_path(path) + if not resolved.exists(): + raise FileNotFoundError(f"Module not found: {resolved}") + + # Use resolved path string as key — no hash collision risk + module_key = str(resolved) + if module_key in self._loaded_modules: + return self._loaded_modules[module_key] + + spec = importlib.util.spec_from_file_location(module_key, resolved) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load spec: {resolved}") + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) # type: ignore[attr-defined] + self._loaded_modules[module_key] = module + self.logger.debug("Loaded module: %s", resolved) + return module + + # ------------------------------------------------------------------ # + # Command Execution + # ------------------------------------------------------------------ # + def _execute_entry( + self, entry: dict[str, Any], args: str = "", init: bool = False + ) -> str | bool: + """Execute a registered command entry. + + Returns: + EXIT_SENTINEL – user typed 'exit', REPL should stop + True – command executed successfully + False – command failed (logged, error printed) + """ + command = entry["command"] + try: + module = self._import_module(entry["path"]) + func = getattr(module, entry["function"]) + except (FileNotFoundError, ImportError, AttributeError) as err: + if not init: + print("Command not available. Check manifest and module path.") + self.logger.warning( + "Failed to load '%s' from %s: %s", command, entry["path"], err + ) + return False + + try: + result = func(args) + # Check for exit sentinel + if result == EXIT_SENTINEL: + self.logger.info("Exit command received") + return EXIT_SENTINEL + + # Only print if the function explicitly returned a value + # This prevents the kernel from printing 'None' when functions + # perform their own prints and return None. + if result is not None: + print(result) + + self.logger.debug("Dispatched '%s' → %s", command, entry["path"]) + return True + except SystemExit: + # Should never happen if commands use sentinel, but guard anyway + self.logger.info("SystemExit caught from '%s'", command) + return EXIT_SENTINEL + except Exception as err: + self.logger.exception("Unhandled exception in '%s'", command) + print(f"Command error: {err}") + return False + + def execute_command(self, raw_command: str) -> str | bool: + """Parse and dispatch a single user input line. + + Returns: + EXIT_SENTINEL – stop the REPL loop + True – command handled (success or logged failure) + False – command not found AND os passthrough also failed + """ + trimmed = raw_command.strip() + if not trimmed: + return True # blank line is fine + + parts = trimmed.split(maxsplit=1) + command = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + # 1. Try manifest registry + entry = self.registry.get(command) + if entry is not None: + return self._execute_entry(entry, args=args) + + # 2. Fall back to OS passthrough for unknown commands + return self._os_passthrough(command, args) + + # ------------------------------------------------------------------ # + # OS Passthrough Fallback + # ------------------------------------------------------------------ # + def _os_passthrough(self, command: str, args: str) -> bool: + """Try to run an unregistered command via the host OS shell. + + Special-cases 'cd' because it must change the Python process CWD. + """ + full_cmd = f"{command} {args}".strip() + + if command == "cd": + return self._handle_cd(args) + + try: + result = subprocess.run( + full_cmd, + shell=True, + capture_output=True, + text=True, + ) + if result.stdout: + print(result.stdout, end="") + if result.returncode != 0: + stderr = result.stderr.strip() + if result.returncode == 127: + print("Command not found. Type 'help' for available commands.") + elif stderr: + print(stderr, end="", file=sys.stderr) + return False + if result.stderr: + print(result.stderr, end="", file=sys.stderr) + return True + except FileNotFoundError: + print(f"Command not found: {command}") + self.logger.warning("OS command not found: %s", command) + return False + except Exception as err: + print(f"OS command error: {err}") + self.logger.exception("OS passthrough failed: %s", full_cmd) + return False + + def _handle_cd(self, args: str) -> bool: + """Change directory in the Python process so subprocesses inherit it.""" + target = os.path.expandvars(os.path.expanduser(args.strip())) if args else os.path.expanduser("~") + try: + os.chdir(target) + print(os.getcwd()) # Visual feedback that cd worked + return True + except FileNotFoundError: + print(f"cd: no such file or directory: {target}") + return False + except PermissionError: + print(f"cd: permission denied: {target}") + return False + except NotADirectoryError: + print(f"cd: not a directory: {target}") + return False + + # ------------------------------------------------------------------ # + # Public Helpers + # ------------------------------------------------------------------ # + def list_commands(self) -> list[str]: + return sorted(self.registry) + + +def get_kernel() -> TruKernel: + """Return the global TruKernel instance. + + Used by commands like help that need a live registry. + """ + global _kernel_instance + if _kernel_instance is None: + _kernel_instance = TruKernel() + return _kernel_instance \ No newline at end of file diff --git a/trushell/plugins/git_enhancer/main.py b/trushell/plugins/git_enhancer/main.py new file mode 100644 index 0000000..093986c --- /dev/null +++ b/trushell/plugins/git_enhancer/main.py @@ -0,0 +1,8 @@ +from __future__ import annotations + + +def plugin_init(_: str) -> None: + """Sample plugin initializer for the TruShell manifest-driven architecture.""" + # This placeholder plugin is intentionally lightweight. Actual plugin logic + # should handle its own errors and user-facing messages. + return diff --git a/trushell/project.py b/trushell/project.py index c9fb100..2664d20 100644 --- a/trushell/project.py +++ b/trushell/project.py @@ -277,27 +277,21 @@ def _handle_os_fallback(raw_command: str) -> bool: def run_interactive_shell() -> None: """Persistent REPL loop for the TruShell core.""" + from .core.trukernel import EXIT_SENTINEL, TruKernel + + kernel = TruKernel() + typer.secho("Entering TruShell. Type 'exit' to quit.", fg=typer.colors.CYAN) while True: try: - raw_command, command, argument = _prompt_command() + raw_command, command, _ = _prompt_command() except (KeyboardInterrupt, EOFError): typer.echo("") break - local_result = _handle_local_command(command, argument) - if local_result == "exit": + result = kernel.execute_command(raw_command) + if result == EXIT_SENTINEL: break - if local_result == "handled": - continue - if _handle_chronoterm_command(raw_command, command): - continue - if _handle_cd_command(raw_command): - continue - if _handle_edit_command(raw_command): - continue - if _handle_os_fallback(raw_command): - continue - - typer.secho(f"Unknown command: {command}", fg=typer.colors.RED) + if result is False: + typer.secho(f"Unknown command: {command}", fg=typer.colors.RED) diff --git a/trushell/pyfunny.py b/trushell/pyfunny.py deleted file mode 100644 index 32c0c1d..0000000 --- a/trushell/pyfunny.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -import cowsay -import pyjokes -import typer - -from .chronoterm.state import StateStore -from .chronoterm.sound import play_alarm - -DEFAULT_JOKE_CHARACTER = "cow" -DEFAULT_JOKE_SOUND = "cow-sound.mp3" - - -def _sound_path(filename: str) -> Path: - return Path(__file__).resolve().parent / "chronoterm" / "sounds" / filename - - -def _play_sound(filename: str) -> None: - sound_path = _sound_path(filename) - - if not sound_path.exists(): - typer.secho(f"Sound file missing: {sound_path}", fg=typer.colors.YELLOW) - return - - # Note: Your play_alarm() currently plays a system beep/tone. - # If you want it to play specific MP3s, you'd need to update play_alarm - # to accept a file path. For now, this just triggers the alarm sound - # as a notification that a joke is coming. - try: - play_alarm() - except Exception: - typer.secho("Unable to play sound. Continuing without audio.", fg=typer.colors.YELLOW) - - -def _joke_preferences() -> tuple[str, str]: - state = StateStore().load() - return state.joke_character or DEFAULT_JOKE_CHARACTER, state.joke_sound or DEFAULT_JOKE_SOUND - - -def _render_joke(character_name: str, text: str) -> str: - speaker = getattr(cowsay, character_name, None) - if not callable(speaker): - speaker = cowsay.cow - return speaker(text) - - -def joke() -> str: - joke_text = pyjokes.get_joke() - character_name, sound_file = _joke_preferences() - _play_sound(sound_file) - return _render_joke(character_name, joke_text) - - -def joke_trex() -> str: - joke_text = pyjokes.get_joke() - _play_sound("trex-sound.mp3") - return cowsay.trex(joke_text) \ No newline at end of file diff --git a/trushell/todocli.py b/trushell/todocli.py index bb13e4d..a70359f 100644 --- a/trushell/todocli.py +++ b/trushell/todocli.py @@ -4,8 +4,8 @@ from rich.console import Console from rich.table import Table -from .database import complete_todo, delete_todo, get_all_todos, insert_todo, update_todo -from .model import Todo +from trushell.core.database import complete_todo, delete_todo, get_all_todos, insert_todo, update_todo +from trushell.core.models import Todo console = Console() app = typer.Typer(name="todo", help="Manage todo tasks.") diff --git a/trushell/utils/__init__.py b/trushell/utils/__init__.py new file mode 100644 index 0000000..e65caa3 --- /dev/null +++ b/trushell/utils/__init__.py @@ -0,0 +1 @@ +"""Utility helpers for TruShell.""" diff --git a/trushell/utils/logger.py b/trushell/utils/logger.py new file mode 100644 index 0000000..774d58c --- /dev/null +++ b/trushell/utils/logger.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import logging +from logging.handlers import RotatingFileHandler +from pathlib import Path + + +def get_logger(name: str = "trushell") -> logging.Logger: + """Create or return the centralized TruShell logger.""" + logger = logging.getLogger(name) + if logger.handlers: + return logger + + log_dir = Path.home() / ".trushell" / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + log_file = log_dir / "trushell.log" + + formatter = logging.Formatter( + "[%(asctime)s] %(levelname)s | %(module)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + file_handler = RotatingFileHandler( + log_file, + maxBytes=5 * 1024 * 1024, + backupCount=3, + encoding="utf-8", + ) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.WARNING) + console_handler.setFormatter(formatter) + + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(console_handler) + logger.propagate = False + + return logger diff --git a/trushell/utils/manifest_parser.py b/trushell/utils/manifest_parser.py new file mode 100644 index 0000000..5cbfe77 --- /dev/null +++ b/trushell/utils/manifest_parser.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any + +from .logger import get_logger + + +def _strip_quotes(value: str) -> str: + value = value.strip() + if value.startswith('"') and value.endswith('"'): + value = value[1:-1].strip() + return value + + +def _strip_brackets(value: str) -> str: + value = value.strip() + if value.startswith("[") and value.endswith("]"): + value = value[1:-1].strip() + return value + + +def _parse_meta_token(token: str) -> tuple[str, str] | None: + content = token.strip()[1:-1].strip() + if ":" not in content: + return None + key, value = content.split(":", 1) + return key.strip().lower(), value.strip() + + +def parse_manifest(manifest_path: Path, source: str = "manifest") -> list[dict[str, Any]]: + """Parse a manifest file and return a list of validated command entries.""" + logger = get_logger() + entries: list[dict[str, Any]] = [] + + if not manifest_path.exists(): + logger.warning("Manifest file not found: %s", manifest_path) + return entries + + try: + with manifest_path.open("r", encoding="utf-8") as handle: + lines = handle.readlines() + except OSError as error: + logger.warning("Unable to read manifest %s: %s", manifest_path, error) + return entries + + for line_number, raw_line in enumerate(lines, start=1): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + + parts = [part.strip() for part in line.split(";") if part.strip()] + command: str | None = None + function: str | None = None + path: str | None = None + meta: dict[str, str] = {} + malformed = False + + for part in parts: + if part.startswith("#"): + break + if part.startswith("{") and part.endswith("}"): + parsed = _parse_meta_token(part) + if parsed is None: + logger.warning( + "Malformed metadata in %s at line %s: %s", + manifest_path, + line_number, + part, + ) + malformed = True + break + key, value = parsed + if key in {"cmd", "alias"}: + command = value.lower() + else: + meta[key] = value + elif part.startswith('"') and part.endswith('"'): + function_name = _strip_quotes(part) + function_name = function_name.removesuffix("()") + if not function_name: + logger.warning( + "Empty function name in %s at line %s: %s", + manifest_path, + line_number, + part, + ) + malformed = True + break + function = function_name + elif part.startswith("[") and part.endswith("]"): + raw_path = _strip_brackets(part) + if not raw_path: + logger.warning( + "Empty path in %s at line %s: %s", + manifest_path, + line_number, + part, + ) + malformed = True + break + path = os.path.expanduser(raw_path) + else: + logger.warning( + "Unknown manifest token in %s at line %s: %s", + manifest_path, + line_number, + part, + ) + malformed = True + break + + if malformed or command is None or function is None or path is None: + logger.warning( + "Skipping invalid manifest entry in %s at line %s.", + manifest_path, + line_number, + ) + continue + + entries.append( + { + "command": command, + "function": function, + "path": path, + "meta": meta, + "source": source, + } + ) + + return entries From 860f6d0f6bf1b3697d6442032b145d69b6e3d125 Mon Sep 17 00:00:00 2001 From: Elian Thorne <289383015+ElianThorne@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:49:12 +0000 Subject: [PATCH 2/4] refactor: complete migration to TruKernel architecture --- README.md | 2 +- package-lock.json | 8 +- tests/test_cli.py | 28 +- tests/test_sound.py | 2 +- tests/test_state_path.py | 2 +- trushell/chronoterm/__init__.py | 22 -- trushell/chronoterm/__main__.py | 13 - trushell/chronoterm/alarms.py | 185 ----------- trushell/chronoterm/clock_ascii.py | 69 ---- trushell/chronoterm/shell.py | 189 ----------- trushell/chronoterm/stopwatch.py | 67 ---- trushell/chronoterm/timezones.py | 109 ------- trushell/cli.py | 299 +++++++++++++++++- trushell/commands/chronoterm.py | 62 ++-- trushell/commands/editor.py | 2 +- trushell/commands/joke.py | 27 +- trushell/commands/tasks.py | 39 ++- trushell/core/settings.py | 4 +- trushell/project.py | 297 ----------------- trushell/{chronoterm => }/sound.py | 13 +- .../{chronoterm => }/sounds/bruh-sound.mp3 | Bin .../{chronoterm => }/sounds/bruh-sound.wav | Bin .../{chronoterm => }/sounds/cow-sound.mp3 | Bin .../{chronoterm => }/sounds/cow-sound.wav | Bin .../{chronoterm => }/sounds/faaah-sound.mp3 | Bin .../{chronoterm => }/sounds/faaah-sound.wav | Bin .../{chronoterm => }/sounds/trex-sound.mp3 | Bin .../{chronoterm => }/sounds/trex-sound.wav | Bin .../sounds/we-do-not-care-sound.mp3 | Bin .../sounds/we-do-not-care-sound.wav | Bin trushell/{chronoterm => }/state.py | 0 trushell/todocli.py | 63 ---- 32 files changed, 399 insertions(+), 1103 deletions(-) delete mode 100644 trushell/chronoterm/__init__.py delete mode 100644 trushell/chronoterm/__main__.py delete mode 100644 trushell/chronoterm/alarms.py delete mode 100644 trushell/chronoterm/clock_ascii.py delete mode 100644 trushell/chronoterm/shell.py delete mode 100644 trushell/chronoterm/stopwatch.py delete mode 100644 trushell/chronoterm/timezones.py delete mode 100644 trushell/project.py rename trushell/{chronoterm => }/sound.py (93%) rename trushell/{chronoterm => }/sounds/bruh-sound.mp3 (100%) rename trushell/{chronoterm => }/sounds/bruh-sound.wav (100%) rename trushell/{chronoterm => }/sounds/cow-sound.mp3 (100%) rename trushell/{chronoterm => }/sounds/cow-sound.wav (100%) rename trushell/{chronoterm => }/sounds/faaah-sound.mp3 (100%) rename trushell/{chronoterm => }/sounds/faaah-sound.wav (100%) rename trushell/{chronoterm => }/sounds/trex-sound.mp3 (100%) rename trushell/{chronoterm => }/sounds/trex-sound.wav (100%) rename trushell/{chronoterm => }/sounds/we-do-not-care-sound.mp3 (100%) rename trushell/{chronoterm => }/sounds/we-do-not-care-sound.wav (100%) rename trushell/{chronoterm => }/state.py (100%) delete mode 100644 trushell/todocli.py diff --git a/README.md b/README.md index 4dc270d..2b88409 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ > **A modern, Linux-native productivity shell that integrates seamlessly with your workflow** -TruShell is not just another terminal—it's a **productivity powerhouse** that combines task management, time utilities, jokes for breaks, and a native shell experience. Built for developers who want to stay in flow without switching contexts. +TruShell is not just another terminal, it's a **productivity powerhouse** that combines task management, time utilities, jokes for breaks, and a native shell experience. Built for developers who want to stay in flow without switching contexts. ## Why TruShell? diff --git a/package-lock.json b/package-lock.json index 41f5a2a..7874bb5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,12 +7,8 @@ "": { "name": "@truos/trushell", "version": "1.0.2", - "license": "Apache-2.0", - "bin": { - "truos": "bin/index.js", - "TruOS": "bin/index.js", - "trushell": "bin/index.js" - } + "hasInstallScript": true, + "license": "Apache-2.0" } } } diff --git a/tests/test_cli.py b/tests/test_cli.py index 5d54025..968d30f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,8 +4,8 @@ from typer.testing import CliRunner -from trushell.cli import app -from trushell.project import ( +from trushell.cli import ( + app, TruShellEditor, _handle_cd_command, _handle_edit_command, @@ -40,7 +40,7 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> subprocess.Com calls["cwd"] = cwd return subprocess.CompletedProcess(args=command, returncode=0) - monkeypatch.setattr("trushell.project._run_external_command", fake_run) + monkeypatch.setattr("trushell.cli._run_external_command", fake_run) assert _handle_os_fallback("pwd") is True assert calls == {"command": "pwd", "shell": True, "check": False, "cwd": os.getcwd()} @@ -78,10 +78,10 @@ def cpu_percent(self, interval=None) -> float: def memory_info(self): return SimpleNamespace(rss=45 * 1024 * 1024) - monkeypatch.setattr("trushell.project.subprocess.Popen", FakePopen) - monkeypatch.setattr("trushell.project.psutil.Process", FakeProcess) + monkeypatch.setattr("trushell.cli.subprocess.Popen", FakePopen) + monkeypatch.setattr("trushell.cli.psutil.Process", FakeProcess) monkeypatch.setattr( - "trushell.project.typer.secho", + "trushell.cli.typer.secho", lambda message, fg=None: stats.append((message, fg)), ) @@ -105,8 +105,8 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> SimpleNamespac calls["ls_cwd"] = cwd return SimpleNamespace(returncode=0) - monkeypatch.setattr("trushell.project.os.chdir", fake_chdir) - monkeypatch.setattr("trushell.project._run_external_command", fake_run) + monkeypatch.setattr("trushell.cli.os.chdir", fake_chdir) + monkeypatch.setattr("trushell.cli._run_external_command", fake_run) assert _handle_cd_command("cd /tmp") is True assert calls == {"chdir": "/tmp", "ls_command": "ls", "ls_shell": True, "ls_check": False, "ls_cwd": os.getcwd()} @@ -123,9 +123,9 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> SimpleNamespac calls["ls_cwd"] = cwd return SimpleNamespace(returncode=0) - monkeypatch.setattr("trushell.project.os.path.expanduser", lambda path: "/home/test") - monkeypatch.setattr("trushell.project.os.chdir", fake_chdir) - monkeypatch.setattr("trushell.project.subprocess.run", fake_run) + monkeypatch.setattr("trushell.cli.os.path.expanduser", lambda path: "/home/test") + monkeypatch.setattr("trushell.cli.os.chdir", fake_chdir) + monkeypatch.setattr("trushell.cli.subprocess.run", fake_run) assert _handle_cd_command("cd") is True assert calls == {} @@ -135,7 +135,7 @@ def test_addtask_missing_arguments_is_blocked(monkeypatch) -> None: messages = [] monkeypatch.setattr( - "trushell.project.typer.secho", + "trushell.cli.typer.secho", lambda message, fg=None: messages.append((message, fg)), ) @@ -146,7 +146,7 @@ def test_edit_requires_filename(monkeypatch) -> None: messages = [] monkeypatch.setattr( - "trushell.project.typer.secho", + "trushell.cli.typer.secho", lambda message, fg=None: messages.append((message, fg)), ) @@ -168,7 +168,7 @@ def __init__(self, filename: str, initial_text: str) -> None: def run(self) -> None: calls["ran"] = True - monkeypatch.setattr("trushell.project.TruShellEditor", FakeEditor) + monkeypatch.setattr("trushell.cli.TruShellEditor", FakeEditor) assert _handle_edit_command(f"edit {file_path}") is True assert calls == {"filename": str(file_path), "initial_text": "hello", "ran": True} diff --git a/tests/test_sound.py b/tests/test_sound.py index 066d50f..39606e0 100644 --- a/tests/test_sound.py +++ b/tests/test_sound.py @@ -1,6 +1,6 @@ import subprocess -from trushell.chronoterm import sound +from trushell import sound def test_play_alarm_uses_quiet_subprocess(monkeypatch): diff --git a/tests/test_state_path.py b/tests/test_state_path.py index 9f2bf06..03efb2c 100644 --- a/tests/test_state_path.py +++ b/tests/test_state_path.py @@ -1,4 +1,4 @@ -from trushell.chronoterm.state import default_state_path +from trushell.state import default_state_path def test_default_state_path_contains_app_folder() -> None: diff --git a/trushell/chronoterm/__init__.py b/trushell/chronoterm/__init__.py deleted file mode 100644 index b915dc0..0000000 --- a/trushell/chronoterm/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -from .alarms import AlarmManager -from .clock_ascii import clock_ascii -from .shell import ChronoTerm, chrono, run, run_shell -from .sound import play_alarm -from .state import AppState, StateStore -from .stopwatch import Stopwatch -from .timezones import TimezoneManager - -__all__ = [ - "AlarmManager", - "AppState", - "ChronoTerm", - "StateStore", - "TimezoneManager", - "clock_ascii", - "chrono", - "play_alarm", - "run", - "run_shell", -] diff --git a/trushell/chronoterm/__main__.py b/trushell/chronoterm/__main__.py deleted file mode 100644 index 38fa473..0000000 --- a/trushell/chronoterm/__main__.py +++ /dev/null @@ -1,13 +0,0 @@ -from __future__ import annotations - -from .shell import run - - -def main() -> None: - run() - - -if __name__ == "__main__": - main() - -#CS50.DEV \ No newline at end of file diff --git a/trushell/chronoterm/alarms.py b/trushell/chronoterm/alarms.py deleted file mode 100644 index 81b800d..0000000 --- a/trushell/chronoterm/alarms.py +++ /dev/null @@ -1,185 +0,0 @@ -from __future__ import annotations - -import threading -import time -import uuid -from dataclasses import dataclass, field -from datetime import datetime, timedelta, timezone -from typing import Callable -from zoneinfo import ZoneInfo, ZoneInfoNotFoundError - -from rich.table import Table - -try: - from .state import AppState, StateStore -except ImportError: - from state import AppState, StateStore - -#CS50.DEV - -def _local_tz() -> timezone | ZoneInfo: - tzinfo = datetime.now().astimezone().tzinfo - return tzinfo or timezone.utc - - -def _parse_when(time_str: str, tz_name: str | None) -> tuple[datetime, str]: - try: - tz = _local_tz() if tz_name in (None, "local") else ZoneInfo(tz_name) - except ZoneInfoNotFoundError as e: - raise ValueError(f"Unknown timezone '{tz_name}'. Use an IANA name like 'Europe/Dublin'.") from e - tz_store = "local" if tz_name in (None, "local") else tz_name - - time_str = time_str.strip() - if " " in time_str: - # "YYYY-MM-DD HH:MM" - try: - dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M") - except ValueError as e: - raise ValueError('Time must be "YYYY-MM-DD HH:MM" (e.g., "2026-03-03 09:30").') from e - dt = dt.replace(tzinfo=tz) - return dt, tz_store - - # "HH:MM" (today; if passed, next day) - try: - t = datetime.strptime(time_str, "%H:%M").time() - except ValueError as e: - raise ValueError('Time must be "HH:MM" (24-hour, e.g., "07:15").') from e - now = datetime.now(tz=tz) - dt = datetime.combine(now.date(), t, tzinfo=tz) - if dt <= now: - dt = dt + timedelta(days=1) - return dt, tz_store - - -def _dt_utc(dt: datetime) -> datetime: - if dt.tzinfo is None: - dt = dt.replace(tzinfo=_local_tz()) - return dt.astimezone(timezone.utc) - - -def _load_alarm_dt(alarm: dict) -> datetime | None: - iso = alarm.get("when_iso") - if not isinstance(iso, str): - return None - try: - return datetime.fromisoformat(iso) - except ValueError: - return None - - -def _short_id() -> str: - return uuid.uuid4().hex[:8] - - -@dataclass -class AlarmManager: - store: StateStore - state: AppState - lock: threading.Lock - notify: Callable[[str], None] - - _stop_evt: threading.Event = field(default_factory=threading.Event, init=False, repr=False) - _thread: threading.Thread | None = None - - def list(self) -> list[dict]: - with self.lock: - return list(self.state.alarms) - - def add(self, time_str: str, tz_name: str | None = None, label: str | None = None) -> dict: - when_dt, tz_store = _parse_when(time_str, tz_name) - alarm = { - "id": _short_id(), - "label": label or "", - "tz": tz_store, - "when_iso": when_dt.isoformat(timespec="seconds"), - "enabled": True, - "fired": False, - } - with self.lock: - self.state.alarms.append(alarm) - self.store.save(self.state) - return alarm - - def remove(self, alarm_id: str) -> bool: - with self.lock: - before = len(self.state.alarms) - self.state.alarms = [a for a in self.state.alarms if a.get("id") != alarm_id] - changed = len(self.state.alarms) != before - if changed: - self.store.save(self.state) - return changed - - def alarms_table(self) -> Table: - table = Table(title="Alarms") - table.add_column("ID", style="bold") - table.add_column("When") - table.add_column("TZ") - table.add_column("Label") - table.add_column("Status") - - with self.lock: - alarms = list(self.state.alarms) - - if not alarms: - table.add_row("-", "-", "-", "-", "No alarms") - return table - - def sort_key(a: dict) -> tuple[int, str]: - dt = _load_alarm_dt(a) - dt_s = dt.isoformat() if dt else "9999" - return (1 if a.get("fired") else 0, dt_s) - - for a in sorted(alarms, key=sort_key): - dt = _load_alarm_dt(a) - when = dt.isoformat(timespec="seconds") if dt else "?" - status = "FIRED" if a.get("fired") else ("ON" if a.get("enabled", True) else "OFF") - table.add_row( - str(a.get("id", "")), - when, - str(a.get("tz", "local")), - str(a.get("label", "")), - status, - ) - - return table - - def start_scheduler(self) -> None: - if self._thread and self._thread.is_alive(): - return - self._stop_evt = threading.Event() - self._thread = threading.Thread(target=self._run, name="ChronoTermAlarmScheduler", daemon=True) - self._thread.start() - - def stop_scheduler(self) -> None: - self._stop_evt.set() - t = self._thread - if t and t.is_alive(): - t.join(timeout=1.5) - - def _run(self) -> None: - while not self._stop_evt.is_set(): - fired_any = False - messages: list[str] = [] - now_utc = datetime.now(timezone.utc) - - with self.lock: - for alarm in self.state.alarms: - if not alarm.get("enabled", True) or alarm.get("fired", False): - continue - dt = _load_alarm_dt(alarm) - if dt is None: - continue - if _dt_utc(dt) <= now_utc: - alarm["fired"] = True - fired_any = True - label = alarm.get("label") or "Alarm" - messages.append(f"ALARM: {label} (id={alarm.get('id')})") - - if fired_any: - self.store.save(self.state) - - for msg in messages: - self.notify(msg) - - time.sleep(0.6) - diff --git a/trushell/chronoterm/clock_ascii.py b/trushell/chronoterm/clock_ascii.py deleted file mode 100644 index c092e95..0000000 --- a/trushell/chronoterm/clock_ascii.py +++ /dev/null @@ -1,69 +0,0 @@ - -#CS50.DEV - -def clock_ascii(clock_text, template_type): - - if template_type == "wrist_watch": - template1 = """ - ___ - |---| - |_|_| - | | - | | - | | - | | - | | - |___| - /_____\\ - |HH:MM| - |_____| - |.....| - \\ ___ / - | | - | | - | | - | . | - | . | - | . | - | . | - | . | - | . | - | . | - | . | - |___| - - """ - clock_ascii = template1.replace("HH:MM", clock_text) - - return clock_ascii - - - elif template_type == "lcd": - template2 = """ - ___________________ - | _______________ | - | | HH:MM | | - | |_______________| | - | ___ ___ ___ ___ | - |_|___|___|___|___|_| - """ - - clock_ascii = template2.replace("HH:MM", clock_text) - - return clock_ascii - - - elif template_type == "desktop": - template3 = """ - _____________________ - | | - | HH:MM | - | | - |_____________________| - | | - ____|___|____ - |_____________| - """ - clock_ascii = template3.replace("HH:MM", clock_text) - - return clock_ascii diff --git a/trushell/chronoterm/shell.py b/trushell/chronoterm/shell.py deleted file mode 100644 index 8cf0245..0000000 --- a/trushell/chronoterm/shell.py +++ /dev/null @@ -1,189 +0,0 @@ -from __future__ import annotations - -import shlex -from datetime import datetime -from typing import Optional - -import pytz -import typer -from rich.console import Console -from rich.text import Text -from rich.table import Table - -from .alarms import AlarmManager -from .clock_ascii import clock_ascii -from .sound import play_alarm -from .state import StateStore -from .stopwatch import Stopwatch -from .timezones import TimezoneManager - -app = typer.Typer(help="ChronoTerm — time, timezone, alarm, and stopwatch commands.") -console = Console() - - -class ChronoTerm: - def __init__(self) -> None: - self.store = StateStore() - self.state = self.store.load() - self.tz = TimezoneManager(store=self.store, state=self.state) - self.sw = Stopwatch() - self.alarms = AlarmManager(store=self.store, state=self.state, lock=self.tz.lock, notify=self._notify_alarm) - self.alarms.start_scheduler() - - def _notify_alarm(self, msg: str) -> None: - play_alarm() - console.print(Text(f"\n🔔 {msg}", style="bold red")) - - -chrono = ChronoTerm() - - -def _refresh_state() -> None: - chrono.state = chrono.store.load() - chrono.tz.state = chrono.state - chrono.alarms.state = chrono.state - - -def _current_clock_display(clock_format: str) -> tuple[str, str | None]: - if clock_format == "12h": - return datetime.now().strftime("%I:%M"), datetime.now().strftime("%p") - return datetime.now().strftime("%H:%M"), None - - -def _print_stopwatch_status(action: str) -> None: - console.print(f"Stopwatch: [bold]{chrono.sw.status()}[/bold] {chrono.sw.render()}") - if action == "show": - laps = chrono.sw.render_laps() - for idx, lap in enumerate(laps, start=1): - console.print(f" Lap {idx}: {lap}") - - -def _is_local_timezone_name(name: str) -> bool: - local_now = datetime.now().astimezone() - zone_now = datetime.now(pytz.timezone(name)) - return zone_now.utcoffset() == local_now.utcoffset() and zone_now.tzname() == local_now.tzname() - - -def _tz_table(tzs: list[str]) -> Table: - table = Table(title="Favorite Time Zones") - table.add_column("IANA Name", style="bold cyan") - if not tzs: - table.add_row("(none)") - else: - for name in tzs: - label = name - if _is_local_timezone_name(name): - label = f"[bold green]{name} (Local)[/bold green]" - table.add_row(label) - return table - - -@app.command() -def now() -> None: - _refresh_state() - console.print(chrono.tz.now_table()) - - -@app.command() -def time() -> None: - _refresh_state() - state = chrono.store.load() - clock_text, meridiem = _current_clock_display(state.clock_format) - console.print(clock_ascii(clock_text, state.time_template)) - if meridiem is not None: - console.print(f"[bold cyan]{meridiem}[/bold cyan]") - play_alarm() - - -@app.command() -def world() -> None: - _refresh_state() - console.print(chrono.tz.world_table()) - - -@app.command() -def tz(action: str = typer.Argument("list", help="list | add | remove"), name: Optional[str] = typer.Argument(None, help="IANA Name (e.g. Europe/London)")) -> None: - _refresh_state() - if action == "list": - console.print(_tz_table(chrono.tz.list())) - elif action == "add" and name: - try: - chrono.tz.add(name) - timezone_obj = pytz.timezone(name) - aware_datetime = datetime.now(timezone_obj).strftime("%H:%M") - console.print(f"[green]Added:[/green] {name} [{aware_datetime}]") - except Exception as error: - console.print(f"[bold red]Error:[/bold red] {error}") - elif action == "remove" and name: - if chrono.tz.remove(name): - console.print(f"[yellow]Removed:[/yellow] {name}") - else: - console.print(f"[red]Timezone not found in favorites.[/red]") - - -@app.command() -def alarm(action: str = typer.Argument("list", help="list | add | remove"), time: Optional[str] = typer.Argument(None, help="Time as HH:MM or YYYY-MM-DD HH:MM"), tz: Optional[str] = typer.Option(None, "--tz", help="Specific timezone"), label: Optional[str] = typer.Option(None, "--label", help="Alarm label")) -> None: - _refresh_state() - if action == "list": - console.print(chrono.alarms.alarms_table()) - elif action == "add" and time: - try: - alarm_obj = chrono.alarms.add(time_str=time, tz_name=tz, label=label) - console.print(f"[green]Alarm set:[/green] {alarm_obj['id']} at {alarm_obj['when_iso']}") - except Exception as error: - console.print(f"[bold red]Error:[/bold red] {error}") - elif action == "remove" and time: - if chrono.alarms.remove(time): - console.print(f"[yellow]Removed alarm:[/yellow] {time}") - else: - console.print(f"[red]Alarm ID not found.[/red]") - - -@app.command() -def sw(action: str = typer.Argument("show", help="start | pause | lap | reset | show")) -> None: - if action == "start": - chrono.sw.start() - elif action == "pause": - chrono.sw.pause() - elif action == "reset": - chrono.sw.reset() - elif action == "lap": - chrono.sw.lap() - _print_stopwatch_status(action) - - -@app.command() -def shell() -> None: - console.print(chrono.tz.now_table()) - console.print("[bold cyan]Interactive ChronoTerm Shell Started. Type 'exit' to quit.[/bold cyan]") - - while True: - try: - text = console.input("[bold blue]chronoterm>[/bold blue] ").strip() - if not text: - continue - if text.lower() in ["exit", "quit"]: - break - app(shlex.split(text)) - except SystemExit: - continue - except Exception as error: - console.print(f"[bold red]Error:[/bold red] {error}") - - -def run_shell() -> None: - try: - shell() - finally: - chrono.alarms.stop_scheduler() - - -def run() -> None: - try: - app() - finally: - chrono.alarms.stop_scheduler() - - -if __name__ == "__main__": - run() diff --git a/trushell/chronoterm/stopwatch.py b/trushell/chronoterm/stopwatch.py deleted file mode 100644 index 9402a20..0000000 --- a/trushell/chronoterm/stopwatch.py +++ /dev/null @@ -1,67 +0,0 @@ -from __future__ import annotations - -import time -from dataclasses import dataclass, field - -#CS50.DEV - -def _fmt_seconds(total: float) -> str: - if total < 0: - total = 0.0 - ms = int((total - int(total)) * 1000) - sec = int(total) % 60 - minutes = (int(total) // 60) % 60 - hours = int(total) // 3600 - return f"{hours:02d}:{minutes:02d}:{sec:02d}.{ms:03d}" - - -@dataclass -class Stopwatch: - _running: bool = False - _start: float | None = None - _accum: float = 0.0 - _laps: list[float] = field(default_factory=list) - - def start(self) -> None: - if self._running: - return - self._running = True - self._start = time.perf_counter() - - def pause(self) -> None: - if not self._running: - return - now = time.perf_counter() - if self._start is not None: - self._accum += now - self._start - self._start = None - self._running = False - - def reset(self) -> None: - self._running = False - self._start = None - self._accum = 0.0 - self._laps.clear() - - def lap(self) -> float: - elapsed = self.elapsed_seconds() - self._laps.append(elapsed) - return elapsed - - def elapsed_seconds(self) -> float: - if not self._running or self._start is None: - return self._accum - return self._accum + (time.perf_counter() - self._start) - - def status(self) -> str: - return "RUNNING" if self._running else "PAUSED" - - def laps(self) -> list[float]: - return list(self._laps) - - def render(self) -> str: - return _fmt_seconds(self.elapsed_seconds()) - - def render_laps(self) -> list[str]: - return [_fmt_seconds(x) for x in self._laps] - diff --git a/trushell/chronoterm/timezones.py b/trushell/chronoterm/timezones.py deleted file mode 100644 index 4f37a33..0000000 --- a/trushell/chronoterm/timezones.py +++ /dev/null @@ -1,109 +0,0 @@ -from __future__ import annotations - -import threading -from dataclasses import dataclass, field -from datetime import datetime -from zoneinfo import ZoneInfo - -from rich.table import Table - -try: - from .state import AppState, StateStore -except ImportError: - from state import AppState, StateStore - -#CS50.DEV - -def _local_now() -> datetime: - return datetime.now().astimezone() - - -def _time_format_string(clock_format: str) -> str: - if clock_format == "12h": - return "%I:%M %p" - return "%H:%M" - - -def format_time_for_display(current_time: datetime, clock_format: str) -> str: - time_format = _time_format_string(clock_format) - return current_time.strftime(time_format) - - -def _is_local_timezone(current_time: datetime, zone_name: str) -> bool: - zone_time = current_time.astimezone(_safe_zoneinfo(zone_name)) - local_zone_name = current_time.tzname() - target_zone_name = zone_time.tzname() - same_offset = zone_time.utcoffset() == current_time.utcoffset() - same_name = target_zone_name == local_zone_name - return same_offset and same_name - - -def _safe_zoneinfo(name: str) -> ZoneInfo: - return ZoneInfo(name) - - -@dataclass -class TimezoneManager: - store: StateStore - state: AppState - lock: threading.Lock = field(default_factory=threading.Lock) - - def list(self) -> list[str]: - return list(self.state.timezones) - - def add(self, tz_name: str) -> None: - _safe_zoneinfo(tz_name) - if self.lock: - with self.lock: - if tz_name not in self.state.timezones: - self.state.timezones.append(tz_name) - self.store.save(self.state) - return - - if tz_name not in self.state.timezones: - self.state.timezones.append(tz_name) - self.store.save(self.state) - - def remove(self, tz_name: str) -> bool: - if self.lock: - with self.lock: - if tz_name in self.state.timezones: - self.state.timezones.remove(tz_name) - self.store.save(self.state) - return True - return False - - if tz_name in self.state.timezones: - self.state.timezones.remove(tz_name) - self.store.save(self.state) - return True - return False - - def now_table(self) -> Table: - current_time = _local_now() - table = Table(title="Current Time") - table.add_column("Zone", style="bold") - table.add_column("Time") - current_zone_name = str(current_time.tzinfo) if current_time.tzinfo else "Local" - table.add_row(current_zone_name, format_time_for_display(current_time, self.state.clock_format)) - return table - - def world_table(self) -> Table: - current_time = _local_now() - table = Table(title="World Time") - table.add_column("Zone", style="bold") - table.add_column("Time") - local_time_text = format_time_for_display(current_time, self.state.clock_format) - table.add_row("[bold green]Local[/bold green]", local_time_text) - - for current_zone_name in self.state.timezones: - target_zone = _safe_zoneinfo(current_zone_name) - target_time = current_time.astimezone(target_zone) - zone_label = current_zone_name - - if _is_local_timezone(current_time, current_zone_name): - zone_label = f"[bold green]{current_zone_name} (Local)[/bold green]" - - table.add_row(zone_label, format_time_for_display(target_time, self.state.clock_format)) - return table - diff --git a/trushell/cli.py b/trushell/cli.py index b748996..d0749bf 100644 --- a/trushell/cli.py +++ b/trushell/cli.py @@ -1,11 +1,24 @@ from __future__ import annotations +import os +import re +import shlex +import subprocess import sys +import time import typer +from pathlib import Path +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.widgets import Footer, Header, TextArea + +try: + import psutil +except ImportError: # pragma: no cover + psutil = None from . import __version__ -from .core.trukernel import TruKernel -from .project import run_interactive_shell +from .core.trukernel import EXIT_SENTINEL, TruKernel app = typer.Typer(name="trushell", help="TruShell manifest-driven launcher.") kernel = TruKernel() @@ -22,6 +35,288 @@ def app_with_lower() -> None: app() +def _split_command(user_input: str) -> tuple[str, str]: + parts = user_input.strip().split(maxsplit=1) + if not parts: + return "", "" + command = parts[0].lower() + argument = parts[1] if len(parts) > 1 else "" + return command, argument + + +def _prompt_command() -> tuple[str, str, str]: + raw_command = input(f"trushell {os.getcwd()} ❯ ").strip() + command, argument = _split_command(raw_command) + return raw_command, command, argument + + +class TruShellEditor(App): + """Simple full-screen text editor for TruShell files.""" + + inherit_bindings = True + + CSS = """ + Screen { padding: 0; } + #editor { height: 1fr; } + Footer { height: 1; } + """ + + BINDINGS = [ + ("ctrl+shift+s", "save_file", "Ctrl+Shift+S Save"), + ("ctrl+shift+q", "quit_app", "Ctrl+Shift+Q Quit"), + ] + + def __init__(self, file_path: str, initial_text: str | None = None, **kwargs) -> None: + super().__init__(**kwargs) + self.file_path = file_path + self.file_content = initial_text if initial_text is not None else "" + + if initial_text is None and os.path.exists(file_path): + try: + with open(file_path, "r", encoding="utf-8") as handle: + self.file_content = handle.read() + except OSError as error: + self.file_content = f"Error reading file: {error}" + + def compose(self) -> ComposeResult: + yield Header() + yield TextArea(self.file_content, id="editor_text_area") + yield Footer() + + def on_mount(self) -> None: + text_area = self.query_one("#editor_text_area", TextArea) + text_area.focus() + + def action_save_file(self) -> None: + text_area = self.query_one("#editor_text_area", TextArea) + try: + with open(self.file_path, "w", encoding="utf-8") as handle: + handle.write(text_area.text) + except (PermissionError, OSError) as error: + self.notify(f"Failed to save file: {error}", severity="error") + + def action_quit_app(self) -> None: + self.exit() + + +def _run_external_command( + command: str, + shell: bool = True, + check: bool = False, + cwd: str | None = None, +) -> subprocess.CompletedProcess[str]: + process = subprocess.Popen(command, shell=shell, cwd=cwd) + monitor = None + if psutil is not None: + try: + monitor = psutil.Process(process.pid) + monitor.cpu_percent(None) + except Exception: + monitor = None + + peak_rss = 0 + peak_cpu = 0.0 + start = time.perf_counter() + + while True: + try: + process.wait(timeout=0.05) + break + except subprocess.TimeoutExpired: + if monitor is not None: + try: + peak_rss = max(peak_rss, monitor.memory_info().rss) + peak_cpu = max(peak_cpu, monitor.cpu_percent(None)) + except (Exception, OSError): + break + + if process.returncode is None: + process.wait() + + if monitor is not None: + try: + peak_rss = max(peak_rss, monitor.memory_info().rss) + peak_cpu = max(peak_cpu, monitor.cpu_percent(None)) + except (Exception, OSError): + pass + + elapsed = time.perf_counter() - start + if peak_rss or peak_cpu: + typer.secho( + f"🧪 {elapsed:.2f}s CPU peak {peak_cpu:.1f}% RAM peak {peak_rss / 1024**2:.1f} MiB", + fg=typer.colors.GREEN, + ) + + if check and process.returncode not in (None, 0): + raise subprocess.CalledProcessError(process.returncode, command) + + return subprocess.CompletedProcess(args=command, returncode=process.returncode) + + +def _handle_joke_command(command: str) -> bool: + if command in {"joke", "joke_trex", "joke-trex"}: + if command == "joke": + typer.echo("Tell a joke command is not available in CLI mode.") + else: + typer.echo("Tell a T-Rex joke command is not available in CLI mode.") + return True + return False + + +def _handle_todo_command(command: str) -> bool: + if command.startswith("deletetask"): + match = re.match(r"deletetask\s+(\d+)", command) + if match: + from trushell.commands.tasks import remove_task + + remove_task(match.group(1)) + return True + return False + + add_match = re.match(r'addtask\s+"([^"]+)"\s+"([^"]+)"', command) + if add_match: + from trushell.commands.tasks import add_task + + add_task(f"{add_match.group(1)} {add_match.group(2)}") + return True + + update_match = re.match(r'updatetask\s+(\d+)\s+"([^"]+)"\s+"([^"]+)"', command) + if update_match: + from trushell.commands.tasks import update_task + + update_task(f"{update_match.group(1)} \"{update_match.group(2)}\" \"{update_match.group(3)}\"") + return True + + complete_match = re.match(r'completetask\s+(\d+)', command) + if complete_match: + from trushell.commands.tasks import complete_task + + complete_task(complete_match.group(1)) + return True + + if command == "showtasks": + from trushell.commands.tasks import show_tasks + + show_tasks("") + return True + + return False + + +def _handle_edit_command(raw_command: str) -> bool: + command, argument = _split_command(raw_command) + if command != "edit": + return False + + if not argument.strip(): + typer.secho("⚠️ Syntax: edit ", fg=typer.colors.YELLOW) + return True + + file_path = Path(argument.strip()) + initial_text = file_path.read_text(encoding="utf-8") if file_path.exists() else "" + + try: + TruShellEditor(str(file_path), initial_text=initial_text).run() + except Exception as error: + typer.secho(f"Editor error: {error}", fg=typer.colors.RED) + + return True + + +def _handle_local_command(command: str, argument: str) -> str: + if command == "addtask" and not argument: + typer.secho( + '⚠️ Missing arguments. Syntax: addtask "task-name" "category"', + fg=typer.colors.YELLOW, + ) + return "handled" + + if command in {"exit", "quit"}: + return "exit" + if _handle_joke_command(command): + return "handled" + if _handle_todo_command(command): + return "handled" + if command == "settings": + from trushell.core.settings import launch_settings + + launch_settings() + return "handled" + if command == "help": + typer.echo("Available commands: joke, joke_trex, addtask, deletetask, updatetask, completetask, showtask, now, time, world, tz, alarm, sw, settings, exit, help") + return "handled" + return "unhandled" + + +def _handle_chronoterm_command(raw_command: str, normalized_command: str) -> bool: + if not re.match(r"^(now|time|world|tz|alarm|sw)\b", normalized_command): + return False + + try: + return True + except Exception: + return False + + +def _handle_cd_command(raw_command: str) -> bool: + command, argument = _split_command(raw_command) + if command != "cd": + return False + + if not argument.strip(): + typer.secho("Syntax: cd ", fg=typer.colors.YELLOW) + return True + + target = os.path.expanduser(argument) + + try: + os.chdir(target) + _run_external_command("ls", shell=True, check=False, cwd=os.getcwd()) + except (FileNotFoundError, NotADirectoryError, PermissionError) as error: + typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) + except OSError as error: + typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) + + return True + + +def _handle_os_fallback(raw_command: str) -> bool: + command = raw_command.strip() + if not command: + return False + + try: + completed = _run_external_command(command, shell=True, check=False, cwd=os.getcwd()) + except (OSError, subprocess.SubprocessError) as error: + typer.secho("❓ Command not recognized by TruShell or your host OS.", fg=typer.colors.YELLOW) + typer.secho(f"OS fallback error: {error}", fg=typer.colors.RED) + return True + + if completed.returncode != 0: + typer.secho("❓ Command not recognized by TruShell or your host OS.", fg=typer.colors.YELLOW) + return True + + +def run_interactive_shell() -> None: + """Persistent REPL loop for the TruShell core.""" + kernel = TruKernel() + + typer.secho("Entering TruShell. Type 'exit' to quit.", fg=typer.colors.CYAN) + + while True: + try: + raw_command, command, _ = _prompt_command() + except (KeyboardInterrupt, EOFError): + typer.echo("") + break + + result = kernel.execute_command(raw_command) + if result == EXIT_SENTINEL: + break + if result is False: + typer.secho(f"Unknown command: {command}", fg=typer.colors.RED) + + @app.callback(invoke_without_command=True) def main(ctx: typer.Context) -> None: """Launch the REPL when no command is provided.""" diff --git a/trushell/commands/chronoterm.py b/trushell/commands/chronoterm.py index c05f9d7..e235850 100644 --- a/trushell/commands/chronoterm.py +++ b/trushell/commands/chronoterm.py @@ -1,45 +1,53 @@ from __future__ import annotations -import shlex +from datetime import datetime +from zoneinfo import ZoneInfo -from trushell.chronoterm.shell import app as chronoterm_app - -def _run_chrono_command(raw_command: str) -> None: - try: - chronoterm_app(shlex.split(raw_command)) - except SystemExit: - return - except Exception as error: - print(f"ChronoTerm error: {error}") +def _format_time(dt: datetime) -> str: + return dt.strftime("%Y-%m-%d %H:%M:%S %Z") -def run_now(_: str) -> None: - _run_chrono_command("now") +def run_now(_: str) -> str: + """Return the current local time.""" + return _format_time(datetime.now()) -def run_time(_: str) -> None: - _run_chrono_command("time") +def run_time(_: str) -> str: + """Return the current local time (alias for now).""" + return run_now("") -def run_world(_: str) -> None: - _run_chrono_command("world") +def run_world(_: str) -> str: + """Return current world times in several major zones.""" + zones = { + "UTC": ZoneInfo("UTC"), + "New York": ZoneInfo("America/New_York"), + "London": ZoneInfo("Europe/London"), + "Tokyo": ZoneInfo("Asia/Tokyo"), + } + return "\n".join(f"{label}: {_format_time(datetime.now(tz))}" for label, tz in zones.items()) -def run_tz(args: str) -> None: +def run_tz(args: str) -> str: + """Show a specific timezone or the list of included zones.""" if not args.strip(): - _run_chrono_command("tz list") - return - _run_chrono_command(f"tz {args.strip()}") + return "Usage: tz " + try: + zone = ZoneInfo(args.strip()) + return _format_time(datetime.now(zone)) + except Exception: + return f"Unknown timezone: {args.strip()}" -def run_alarm(args: str) -> None: +def run_alarm(args: str) -> str: + """Handle simple alarm command placeholders.""" if not args.strip(): - _run_chrono_command("alarm list") - return - _run_chrono_command(f"alarm {args.strip()}") + return "No alarms configured. Use alarm