diff --git a/.env.example b/.env.example index 3f6ad1d..4ab8ca8 100644 --- a/.env.example +++ b/.env.example @@ -18,9 +18,6 @@ POSTGRES_PORT=5432 DATABASE_URL=postgresql+asyncpg://techwatch:techwatch@postgres:5432/techwatch DATABASE_SYNC_URL=postgresql://techwatch:techwatch@postgres:5432/techwatch -# Redis -REDIS_URL=redis://redis:6379 - # App exposure / admin FRONTEND_URL=http://localhost:3000 CORS_ORIGINS=http://localhost:3000,http://127.0.0.1:3000,http://localhost:5173,http://127.0.0.1:5173 diff --git a/.github/workflows/ci-frontend.yml b/.github/workflows/ci-frontend.yml new file mode 100644 index 0000000..af96002 --- /dev/null +++ b/.github/workflows/ci-frontend.yml @@ -0,0 +1,43 @@ +name: Frontend CI + +on: + push: + branches: [main, master, dev, 'feat/**', 'fix/**', 'chore/**', 'hotfix/**'] + paths: + - 'frontend/**' + - '.github/workflows/ci-frontend.yml' + pull_request: + branches: [main, master, dev] + paths: + - 'frontend/**' + +env: + NODE_VERSION: '20' + +jobs: + build: + name: Type-check & Build + runs-on: ubuntu-latest + defaults: + run: + working-directory: frontend + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-node@v4 + with: + node-version: ${{ env.NODE_VERSION }} + cache: 'npm' + cache-dependency-path: frontend/package-lock.json + + - name: Install dependencies + run: npm ci + + - name: Type-check + run: npx tsc --noEmit + + - name: Build + run: npm run build + env: + VITE_API_URL: http://localhost:8000 diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml new file mode 100644 index 0000000..8611d3d --- /dev/null +++ b/.github/workflows/ci-python.yml @@ -0,0 +1,93 @@ +name: Python CI + +on: + push: + branches: [main, master, dev, 'feat/**', 'fix/**', 'chore/**', 'hotfix/**'] + paths: + - 'app/**' + - 'tests/**' + - 'alembic/**' + - 'pyproject.toml' + - '.github/workflows/ci-python.yml' + pull_request: + branches: [main, master, dev] + paths: + - 'app/**' + - 'tests/**' + - 'alembic/**' + - 'pyproject.toml' + +env: + PYTHON_VERSION: '3.11' + +jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dev dependencies + run: | + uv venv .venv --python ${{ env.PYTHON_VERSION }} + uv pip install --python .venv/bin/python -e ".[dev]" + echo "$GITHUB_WORKSPACE/.venv/bin" >> $GITHUB_PATH + + - name: ruff check + run: ruff check . + + test: + name: Test + runs-on: ubuntu-latest + services: + postgres: + image: pgvector/pgvector:pg16 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: techwatch_test + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + DATABASE_URL: postgresql+asyncpg://postgres:postgres@localhost:5432/techwatch_test + DATABASE_SYNC_URL: postgresql://postgres:postgres@localhost:5432/techwatch_test + ADMIN_API_TOKEN: ci-test-token + CONFIG_ENCRYPTION_KEY: "" + LLM_API_KEY: ci-test-key + + steps: + - uses: actions/checkout@v4 + + - uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: | + uv venv .venv --python ${{ env.PYTHON_VERSION }} + uv pip install --python .venv/bin/python -e ".[dev]" + echo "$GITHUB_WORKSPACE/.venv/bin" >> $GITHUB_PATH + + - name: Run migrations + run: alembic upgrade head + + - name: Run tests + run: pytest tests/ -v --cov=app --cov-report=xml + + - name: Upload coverage + uses: codecov/codecov-action@v4 + with: + file: ./coverage.xml + fail_ci_if_error: false diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index cf9dd72..0000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,175 +0,0 @@ -name: CI - -on: - push: - branches: [main, master, dev, 'feat/**', 'fix/**', 'chore/**', 'hotfix/**'] - pull_request: - branches: [main, master, dev] - -env: - PYTHON_VERSION: '3.11' - NODE_VERSION: '20' - -jobs: - lint: - name: Lint - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - uses: actions/setup-python@v5 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Cache pip - uses: actions/cache@v4 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-dev-${{ hashFiles('pyproject.toml') }} - restore-keys: ${{ runner.os }}-pip-dev- - - - name: Install dev dependencies - run: pip install -e ".[dev]" - - - name: ruff check - run: ruff check . - - test: - name: Test - runs-on: ubuntu-latest - services: - postgres: - image: pgvector/pgvector:pg16 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: techwatch_test - ports: - - 5432:5432 - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - - env: - DATABASE_URL: postgresql+asyncpg://postgres:postgres@localhost:5432/techwatch_test - DATABASE_SYNC_URL: postgresql://postgres:postgres@localhost:5432/techwatch_test - ADMIN_API_TOKEN: ci-test-token - CONFIG_ENCRYPTION_KEY: "" - LLM_API_KEY: ci-test-key - - steps: - - uses: actions/checkout@v4 - - - uses: actions/setup-python@v5 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Cache pip - uses: actions/cache@v4 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('pyproject.toml') }} - restore-keys: ${{ runner.os }}-pip- - - - name: Install dependencies - run: pip install -e ".[dev]" - - - name: Run migrations - run: alembic upgrade head - - - name: Run tests - run: pytest tests/ -v --cov=app --cov-report=xml - - - name: Upload coverage - uses: codecov/codecov-action@v4 - with: - file: ./coverage.xml - fail_ci_if_error: false - - docker-build: - name: Docker Build - runs-on: ubuntu-latest - needs: [lint, test] - if: github.event_name == 'push' - permissions: - contents: read - packages: write - steps: - - uses: actions/checkout@v4 - - - uses: docker/setup-buildx-action@v3 - - - name: Login to GitHub Container Registry - uses: docker/login-action@v3 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata - id: meta - uses: docker/metadata-action@v5 - with: - images: ghcr.io/${{ github.repository }} - tags: | - type=ref,event=branch - type=sha,prefix=,format=short - type=raw,value=latest,enable={{is_default_branch}} - - - name: Build and push - uses: docker/build-push-action@v6 - with: - context: . - file: ./docker/Dockerfile - push: true - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max - - docker-smoke: - name: Docker Smoke Test - runs-on: ubuntu-latest - needs: [docker-build] - steps: - - uses: actions/checkout@v4 - - - name: Prepare env file - run: cp docker/env.docker .env - - - name: Start infrastructure - run: | - docker compose -f docker/docker-compose.yml up -d postgres redis searxng - docker compose -f docker/docker-compose.yml ps - - - name: Wait for postgres - run: | - for i in $(seq 1 20); do - docker compose -f docker/docker-compose.yml exec -T postgres \ - pg_isready -U techwatch -d techwatch && break - echo "Waiting for postgres... ($i/20)" - sleep 3 - done - - - name: Build and start API - run: docker compose -f docker/docker-compose.yml up -d --build api - - - name: Health check - run: | - for attempt in $(seq 1 15); do - if curl -fsS http://localhost:8000/health; then - echo "API is healthy" - exit 0 - fi - echo "Waiting for API... ($attempt/15)" - sleep 5 - done - echo "--- API logs ---" - docker compose -f docker/docker-compose.yml logs api - exit 1 - - - name: Stop services - if: always() - run: docker compose -f docker/docker-compose.yml down -v --remove-orphans diff --git a/app/agents/deep_research/graph.py b/app/agents/deep_research/graph.py index ce88779..632a644 100644 --- a/app/agents/deep_research/graph.py +++ b/app/agents/deep_research/graph.py @@ -65,7 +65,7 @@ def build(self, checkpointer=None) -> CompiledStateGraph: # Build the main agent graph main_graph = StateGraph( DeepResearchAgentState, - config_schema=DeepResearchConfig, + context_schema=DeepResearchConfig, ) # Add main workflow nodes @@ -98,7 +98,7 @@ def _build_supervisor_subgraph(self) -> CompiledStateGraph: """ supervisor_graph = StateGraph( SupervisorState, - config_schema=DeepResearchConfig, + context_schema=DeepResearchConfig, ) supervisor_graph.add_node("supervisor", self.nodes.supervisor) diff --git a/app/agents/deep_research/nodes.py b/app/agents/deep_research/nodes.py index afa2ab8..7d26778 100644 --- a/app/agents/deep_research/nodes.py +++ b/app/agents/deep_research/nodes.py @@ -18,6 +18,8 @@ import re from typing import Any, Optional +from langgraph.types import RunnableConfig + from langchain_core.messages import ( AIMessage, HumanMessage, @@ -496,7 +498,7 @@ async def _generate_completion( async def clarify_with_user( self, state: DeepResearchAgentState, - config: Optional[dict] = None, + config: Optional[RunnableConfig] = None, ) -> dict[str, Any]: """Analyze user messages and ask clarifying questions if needed.""" from langgraph.types import Command @@ -538,7 +540,7 @@ async def clarify_with_user( async def write_research_brief( self, state: DeepResearchAgentState, - config: Optional[dict] = None, + config: Optional[RunnableConfig] = None, ) -> dict[str, Any]: """Transform user messages into a structured research brief.""" from langgraph.types import Command @@ -584,7 +586,7 @@ async def write_research_brief( async def supervisor( self, state: SupervisorState, - config: Optional[dict] = None, + config: Optional[RunnableConfig] = None, ) -> dict[str, Any]: """Lead research supervisor that plans and delegates research.""" from langgraph.types import Command @@ -652,7 +654,7 @@ class SupervisorDecision(BaseModel): async def supervisor_tools( self, state: SupervisorState, - config: Optional[dict] = None, + config: Optional[RunnableConfig] = None, ) -> dict[str, Any]: """Execute supervisor tools (research delegation).""" from langgraph.types import Command @@ -862,7 +864,7 @@ async def merge_results( async def final_report_generation( self, state: DeepResearchAgentState, - config: Optional[dict] = None, + config: Optional[RunnableConfig] = None, ) -> dict[str, Any]: """Generate the final research report.""" research_brief = state.get("research_brief", "") diff --git a/app/agents/orchestrator/graph.py b/app/agents/orchestrator/graph.py index 12e80dc..4dafb32 100644 --- a/app/agents/orchestrator/graph.py +++ b/app/agents/orchestrator/graph.py @@ -160,6 +160,7 @@ def build(self) -> StateGraph: "proceed": "analyzer", "retry": "dispatcher", "bypass": "analyzer", + "pause": END, } ) @@ -211,7 +212,7 @@ def _route_after_validation(self, state: OrchestratorState) -> Literal["retry", return "retry" return "approval" - def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", "retry", "bypass"]: + def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", "retry", "bypass", "pause"]: approval_result = state.get("approval_result", "pending") autonomous = state.get("autonomous", False) @@ -220,6 +221,10 @@ def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", if approval_result == "approved": return "proceed" + + if approval_result == "awaiting_approval": + return "pause" + return "retry" diff --git a/app/agents/orchestrator/nodes.py b/app/agents/orchestrator/nodes.py index a0be67a..d2f3a32 100644 --- a/app/agents/orchestrator/nodes.py +++ b/app/agents/orchestrator/nodes.py @@ -48,6 +48,7 @@ SYNTHESIZER_SYSTEM, EMAILER_USER, ) +from langgraph.types import RunnableConfig from app.services.llm import ChatCompletionClient from app.services.llm.health import LLMHealthManager, get_health_manager from app.tools.registry import get_global_registry @@ -597,12 +598,30 @@ async def planner(self, state: OrchestratorState) -> OrchestratorState: if isinstance(watch_ctx, dict): watch_ctx = WatchContext(**watch_ctx) - import random from datetime import datetime as _dt - # Add a per-session seed so the LLM generates different plans across runs session_seed = state.get("session_id", "")[-6:] if state.get("session_id") else "" run_ts = _dt.now().strftime("%H:%M:%S") + # Inject vector memory context so the planner avoids repeating past research + memory_block = "" + try: + from app.tools.memory.search_memory import SearchMemoryTool + _mem = SearchMemoryTool() + _mem_result = await _mem.execute({"query": task, "top_k": 8, "min_score": 0.25}) + if _mem_result.get("success"): + _recent = _mem_result.get("data", {}).get("results", []) + if _recent: + _covered_topics = list({r.get("topic", "") for r in _recent if r.get("topic")}) + _titles = [r["title"] for r in _recent[:6] if r.get("title")] + memory_block = "\n\n## Previously researched (avoid repeating)\n" + if _covered_topics: + memory_block += f"Topics already covered: {', '.join(_covered_topics[:6])}\n" + memory_block += "Recent articles already in memory:\n" + memory_block += "\n".join(f"- {t}" for t in _titles) + memory_block += "\nFocus on NEW angles, sources, and developments not yet covered." + except Exception as _mem_exc: + logger.debug("Planner memory fetch failed (non-blocking): %s", _mem_exc) + prompt = PLANNER_USER.format( task=task, watch_context=watch_ctx.to_prompt_block(), @@ -613,7 +632,7 @@ async def planner(self, state: OrchestratorState) -> OrchestratorState: current_month=watch_ctx.month_name, topic=", ".join(topics) if isinstance(topics, list) else str(topics), ) - # Append a uniqueness hint so repeated tasks yield different plans + prompt += memory_block prompt += f"\n\n[session={session_seed} ts={run_ts} vary_approach=true]" try: @@ -1086,10 +1105,41 @@ async def collector(self, state: OrchestratorState) -> OrchestratorState: data.setdefault("source", tool) all_articles_data.append(data) + # 1b. Inter-session deduplication: separate fresh articles from already-known ones. + # Fresh articles (not yet in the Article table) are ranked first; known ones are + # appended as fallback so the session is never empty if all results are familiar. + fresh_articles_data = all_articles_data + known_articles_data: list[dict] = [] + candidate_urls = [a.get("url", "") for a in all_articles_data if a.get("url")] + if candidate_urls: + try: + from sqlalchemy import select + from app.db.base import get_db_context + from app.db.models import Article as _ArticleDB + async with get_db_context() as _db: + rows = await _db.execute( + select(_ArticleDB.url).where(_ArticleDB.url.in_(candidate_urls)) + ) + known_urls: set[str] = {r[0] for r in rows} + fresh_articles_data = [a for a in all_articles_data if a.get("url") not in known_urls] + known_articles_data = [a for a in all_articles_data if a.get("url") in known_urls] + if fresh_articles_data: + logger.info( + "Collector dedup: %d new, %d already seen (suppressed)", + len(fresh_articles_data), len(known_articles_data), + ) + else: + # Nothing new — fall back to all articles to avoid empty synthesis + fresh_articles_data = all_articles_data + known_articles_data = [] + logger.info("Collector dedup: all %d articles already seen, keeping all", len(all_articles_data)) + except Exception as _dedup_exc: + logger.debug("Inter-session dedup skipped (non-blocking): %s", _dedup_exc) + # 2. Convert to Article objects for ranking from app.core.models import Article as ArticleModel articles_to_rank = [] - for a in all_articles_data: + for a in fresh_articles_data: articles_to_rank.append(ArticleModel( title=a.get("title", a.get("name", "")), summary=a.get("summary", a.get("description", "")), @@ -1414,25 +1464,35 @@ async def validator(self, state: OrchestratorState) -> OrchestratorState: async def human_approval(self, state: OrchestratorState) -> OrchestratorState: """Human-in-the-loop approval checkpoint. - This node pauses execution and waits for human approval. - In a real implementation, this would use LangGraph's interrupt - mechanism or a message queue to pause and wait for user input. - - For now, it automatically approves if quality score is above threshold. + In autonomous mode: auto-approves if quality >= threshold. + In interactive mode (autonomous=False): pauses the graph when quality + is below threshold so the user can review research results before synthesis. + The streaming service detects approval_result="awaiting_approval" and + emits an `approval_required` SSE event, then saves session state. """ quality_score = state.get("quality_score", 0.0) approval_threshold = state.get("approval_threshold", 0.7) - - state["approval_status"] = "auto_approved" if quality_score >= approval_threshold else "needs_review" + autonomous = state.get("autonomous", True) if quality_score >= approval_threshold: logger.info("Auto-approved: quality %.2f >= threshold %.2f", quality_score, approval_threshold) state["approval_result"] = "approved" + state["approval_status"] = "auto_approved" + state["approved_at"] = datetime.now().isoformat() + elif autonomous: + # Autonomous mode: bypass regardless of quality + logger.warning("Bypassing approval in autonomous mode (quality %.2f < %.2f)", + quality_score, approval_threshold) + state["approval_result"] = "approved" + state["approval_status"] = "auto_approved_autonomous" + state["approved_at"] = datetime.now().isoformat() else: - logger.warning("Needs review: quality %.2f < threshold %.2f", quality_score, approval_threshold) - state["approval_result"] = "pending" - - state["approved_at"] = datetime.now().isoformat() if quality_score >= approval_threshold else None + # Interactive mode + low quality: pause for human review + logger.info("Interactive mode: pausing for human approval (quality %.2f < %.2f)", + quality_score, approval_threshold) + state["approval_result"] = "awaiting_approval" + state["approval_status"] = "needs_review" + state["approved_at"] = None return state @@ -1504,7 +1564,7 @@ async def analyzer(self, state: OrchestratorState) -> OrchestratorState: return state - async def synthesizer(self, state: OrchestratorState, config: Optional[dict] = None) -> OrchestratorState: + async def synthesizer(self, state: OrchestratorState, config: Optional[RunnableConfig] = None) -> OrchestratorState: """Create the final comprehensive report. Also stores the report in ResearchSession for future reference. diff --git a/app/api/main.py b/app/api/main.py index c181820..ebebb86 100644 --- a/app/api/main.py +++ b/app/api/main.py @@ -223,18 +223,32 @@ async def lifespan(app: FastAPI): except Exception as _exc: logger.warning("Stale session cleanup skipped: %s", _exc) - # Start in-process profile scheduler - scheduler_task = asyncio.create_task(_profile_scheduler_loop()) + # Start persistent APScheduler (falls back silently if apscheduler not installed) + from app.scheduler.persistent import get_profile_scheduler + profile_scheduler = get_profile_scheduler() + try: + await profile_scheduler.start() + except Exception as _sched_exc: + logger.warning("APScheduler init failed, falling back to polling loop: %s", _sched_exc) + profile_scheduler = None + + # Fallback polling loop if APScheduler unavailable + scheduler_task = None + if not (profile_scheduler and profile_scheduler.is_ready): + scheduler_task = asyncio.create_task(_profile_scheduler_loop()) yield # Shutdown logger.info("Shutting down tech-watch-agent API") - scheduler_task.cancel() - try: - await scheduler_task - except asyncio.CancelledError: - pass + if profile_scheduler and profile_scheduler.is_ready: + await profile_scheduler.stop() + if scheduler_task: + scheduler_task.cancel() + try: + await scheduler_task + except asyncio.CancelledError: + pass try: await close_db() except Exception as exc: diff --git a/app/api/routers/newsletter.py b/app/api/routers/newsletter.py index 7a2c43b..a7aa5e2 100644 --- a/app/api/routers/newsletter.py +++ b/app/api/routers/newsletter.py @@ -4,65 +4,76 @@ from app.db.base import async_session_factory from app.db.repositories import NewsletterRunRepository -from app.agents.newsletter.agent import create_newsletter_agent from app.config.settings import get_settings from app.api.models import NewsletterGenerateRequest, NewsletterGenerateResponse from app.core.logging import get_logger -from app.delivery.service import ReportDeliveryService +from app.core.research_brief import build_research_brief, derive_session_title logger = get_logger(__name__) router = APIRouter(prefix="/newsletter", tags=["Newsletter"]) + @router.post("/generate", response_model=NewsletterGenerateResponse) async def generate_newsletter( payload: NewsletterGenerateRequest, _background_tasks: BackgroundTasks, ) -> NewsletterGenerateResponse: - """Generate newsletter content and optionally deliver it.""" + """Generate newsletter content via the orchestrator pipeline and optionally deliver it.""" try: - resolved_settings = get_settings() - agent = create_newsletter_agent(resolved_settings) + from app.agents.orchestrator.agent import OrchestratorAgent + + topics = payload.topics or [] + subject = f"Newsletter: {', '.join(topics[:3])}" if topics else "Newsletter Tech Watch" + task = build_research_brief( + subject=subject, + topics=topics or None, + research_instructions=None, + ) - # Run synchronously for now (background_tasks not fully implemented) + agent = OrchestratorAgent() result = await agent.execute({ - "topics": payload.topics, + "task": task, + "subject": subject, + "title": derive_session_title(subject=subject, task=task), + "topics": topics, + "send_email": payload.send_email, + "autonomous": True, }) if not result.success: - raise HTTPException(status_code=500, detail=result.errors[0] if result.errors else "Generation failed") + raise HTTPException( + status_code=500, + detail=result.errors[0] if result.errors else "Generation failed", + ) - output = result.output - newsletter = output.get("newsletter", "") - article_count = result.metadata.get("article_count", 0) - - # Get first line as subject - subject = newsletter.split("\n")[0] if newsletter else "Tech Watch Newsletter" - subject = subject.replace("#", "").strip() - delivery = ReportDeliveryService(resolved_settings).deliver( - report=newsletter, - subject=subject, - send=payload.send_email, - ) + output = result.output or {} + report = output.get("report") or "" + article_count = len(output.get("research_results", [])) + subject_line = report.split("\n")[0].replace("#", "").strip() if report else subject return NewsletterGenerateResponse( run_id=str(result.session_id) if result.session_id else str(uuid.uuid4()), - subject=subject, + subject=subject_line, article_count=article_count, status="completed", - preview=newsletter[:500], - email_sent=delivery.sent, - delivery_message=delivery.message, + preview=report[:500], + email_sent=output.get("email_sent", False), + delivery_message="Newsletter générée via orchestrateur", ) + except HTTPException: + raise except Exception as exc: logger.error("Newsletter generation failed: %s", exc) raise HTTPException(status_code=500, detail=str(exc)) + @router.post("/generate/sync", response_model=NewsletterGenerateResponse) async def generate_newsletter_sync(payload: NewsletterGenerateRequest) -> NewsletterGenerateResponse: - """Generate a newsletter synchronously.""" + """Generate a newsletter synchronously (alias for /generate).""" return await generate_newsletter(payload, BackgroundTasks()) + @router.get("/history") async def newsletter_history( user_id: Optional[str] = None, @@ -71,10 +82,8 @@ async def newsletter_history( """Get newsletter generation history.""" async with async_session_factory() as session: repo = NewsletterRunRepository(session) - user_uuid = uuid.UUID(user_id) if user_id else None runs = await repo.get_recent(user_uuid, limit) - return [ { "id": str(run.id), @@ -88,6 +97,7 @@ async def newsletter_history( for run in runs ] + @router.get("/stats") async def newsletter_stats(user_id: Optional[str] = None) -> dict[str, Any]: """Get newsletter statistics.""" diff --git a/app/api/routers/orchestrator.py b/app/api/routers/orchestrator.py index 9c9b65f..b318f2b 100644 --- a/app/api/routers/orchestrator.py +++ b/app/api/routers/orchestrator.py @@ -1,6 +1,7 @@ -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from typing import Optional +from app.api.security import require_admin_access from app.config.settings import get_settings from app.api.models import OrchestratorRequest, OrchestratorResponse from app.core.research_brief import build_research_brief, derive_session_title @@ -136,6 +137,141 @@ async def setup_scheduled_task( } +@router.post("/sessions", response_model=dict) +async def create_session(payload: OrchestratorRequest) -> dict: + """Create a pending session and return its ID. + + Allows the client to obtain a session_id synchronously, navigate to the + session page, then start streaming via GET /orchestrator/stream?session_id={id}. + """ + import uuid as _uuid + from app.services.session_manager import create_session as _create_session + + session_uuid = _uuid.uuid4() + effective_task = ( + build_research_brief(payload.subject, payload.topics, payload.research_instructions) + if payload.subject + else payload.task + ) + + try: + await _create_session( + task=effective_task, + topics=payload.topics, + session_id=session_uuid, + meta_data={ + "title": derive_session_title(title=payload.title, subject=payload.subject, task=effective_task), + "subject": payload.subject or derive_session_title(task=effective_task), + "research_instructions": payload.research_instructions, + "send_email": payload.send_email, + "autonomous": payload.autonomous, + }, + ) + except Exception as exc: + logger.warning("Could not pre-create session record: %s", exc) + + return { + "session_id": str(session_uuid), + "status": "pending", + "stream_url": f"/orchestrator/stream?session_id={session_uuid}", + } + + +@router.post("/sessions/{session_id}/approve", dependencies=[Depends(require_admin_access)]) +async def approve_session(session_id: str) -> StreamingResponse: + """Approve a paused session and resume synthesis + email delivery. + + Re-runs the orchestrator from the analyzer node using the research + results already collected and saved in the session checkpoint. + """ + import uuid as _uuid + from app.db.base import async_session_factory + from app.db.repositories import ResearchSessionRepository + from app.services.streaming_service import StreamingOrchestratorService + + try: + session_uuid = _uuid.UUID(session_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid session ID format") + + async with async_session_factory() as db: + repo = ResearchSessionRepository(db) + session = await repo.get_by_id(session_uuid) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session.status != "awaiting_approval": + raise HTTPException( + status_code=400, + detail=f"Session is not awaiting approval (status: {session.status})" + ) + # Mark approved before resuming so the node routes correctly + session.status = "running" + await db.commit() + + # Resume: inject pre-loaded research results and force approval bypass + research_results = session.research_results or [] + plan = session.plan or [] + # Mark all plan steps as done so dispatcher skips them + for step in plan: + step["status"] = "done" + + service = StreamingOrchestratorService() + task = session.research_brief or "" + meta = session.meta_data or {} + + return StreamingResponse( + service.stream_run( + task=task, + session_id=session_id, + send_email=meta.get("send_email", False), + autonomous=True, # bypass approval on resume + subject=meta.get("subject"), + title=meta.get("title"), + research_instructions=meta.get("research_instructions"), + # Pre-loaded state injected via topics (the service will merge) + _preloaded_research=research_results, + _preloaded_plan=plan, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@router.post("/sessions/{session_id}/reject", dependencies=[Depends(require_admin_access)]) +async def reject_session(session_id: str) -> dict: + """Reject a paused session — marks it as failed so the user can re-run.""" + import uuid as _uuid + from app.db.base import async_session_factory + from app.db.repositories import ResearchSessionRepository + + try: + session_uuid = _uuid.UUID(session_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid session ID format") + + async with async_session_factory() as db: + repo = ResearchSessionRepository(db) + session = await repo.get_by_id(session_uuid) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session.status != "awaiting_approval": + raise HTTPException( + status_code=400, + detail=f"Session is not awaiting approval (status: {session.status})" + ) + session.status = "failed" + meta = dict(session.meta_data or {}) + meta["rejection_reason"] = "Rejected by user after review" + session.meta_data = meta + await db.commit() + + return {"session_id": session_id, "status": "rejected"} + + @router.get("/status", response_model=dict) async def get_orchestrator_status() -> dict: """Get the current runtime status of the orchestrator.""" diff --git a/app/api/routers/watch_profiles.py b/app/api/routers/watch_profiles.py index 9ef0e26..af7177b 100644 --- a/app/api/routers/watch_profiles.py +++ b/app/api/routers/watch_profiles.py @@ -171,7 +171,13 @@ async def create_profile(body: WatchProfileCreate) -> WatchProfileResponse: raise HTTPException(status_code=404, detail=str(exc)) from exc await db.commit() refreshed = await repo.get_by_id(created.id) - return WatchProfileResponse.from_model(refreshed or created) + result = refreshed or created + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_created(result) + except Exception: + pass + return WatchProfileResponse.from_model(result) @router.get("/{profile_id}", response_model=WatchProfileResponse) @@ -204,6 +210,11 @@ async def update_profile(profile_id: str, body: WatchProfileUpdate) -> WatchProf updated = await repo.update(profile) await db.commit() + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_updated(updated) + except Exception: + pass return WatchProfileResponse.from_model(updated) @@ -215,6 +226,11 @@ async def delete_profile(profile_id: str) -> None: if not deleted: raise HTTPException(status_code=404, detail="Profile not found") await db.commit() + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_deleted(profile_id) + except Exception: + pass @router.post("/{profile_id}/run") diff --git a/app/scheduler/persistent.py b/app/scheduler/persistent.py new file mode 100644 index 0000000..a30fb82 --- /dev/null +++ b/app/scheduler/persistent.py @@ -0,0 +1,277 @@ +""" +Persistent APScheduler-based scheduler for WatchProfiles. + +Replaces the polling loop with real cron jobs persisted in PostgreSQL. +Each active WatchProfile gets an APScheduler job keyed by its UUID. +Jobs survive API restarts and fire at exact scheduled times. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Optional + +from app.core.logging import get_logger + +logger = get_logger(__name__) + +_scheduler: Optional["ProfileScheduler"] = None + + +def get_profile_scheduler() -> "ProfileScheduler": + global _scheduler + if _scheduler is None: + _scheduler = ProfileScheduler() + return _scheduler + + +class ProfileScheduler: + """APScheduler wrapper that persists jobs in PostgreSQL.""" + + _JOB_PREFIX = "profile_" + + def __init__(self) -> None: + self._scheduler = None + self._ready = False + + def _build_scheduler(self, sync_db_url: str): + try: + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore + except ImportError: + logger.warning("apscheduler not installed — falling back to polling loop") + return None + + jobstores = {"default": SQLAlchemyJobStore(url=sync_db_url)} + return AsyncIOScheduler(jobstores=jobstores, timezone="UTC") + + async def start(self) -> None: + """Start the scheduler and load all active profiles from DB.""" + from app.config.settings import get_settings + settings = get_settings() + + self._scheduler = self._build_scheduler(settings.database_sync_url) + if self._scheduler is None: + return + + self._scheduler.start() + self._ready = True + logger.info("APScheduler started with PostgreSQL jobstore") + + # Load all active profiles and schedule them + await self._reload_all_profiles() + + async def stop(self) -> None: + if self._scheduler and self._ready: + self._scheduler.shutdown(wait=False) + self._ready = False + logger.info("APScheduler stopped") + + async def _reload_all_profiles(self) -> None: + """Load all active WatchProfiles from DB and schedule their jobs.""" + if not self._ready: + return + try: + from app.db.base import async_session_factory + from app.db.repositories import WatchProfileRepository + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + profiles = await repo.list_all(active_only=True) + + count = 0 + for profile in profiles: + if profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + count += 1 + logger.info("Scheduled %d watch profile jobs", count) + except Exception as exc: + logger.warning("Could not reload profile jobs: %s", exc) + + def _upsert_job(self, profile) -> None: + """Add or replace the APScheduler job for a WatchProfile.""" + if not self._ready or self._scheduler is None: + return + + job_id = f"{self._JOB_PREFIX}{profile.id}" + trigger = self._build_trigger(profile) + if trigger is None: + self._remove_job(str(profile.id)) + return + + try: + if self._scheduler.get_job(job_id): + self._scheduler.reschedule_job(job_id, trigger=trigger) + logger.debug("Rescheduled job for profile '%s'", profile.name) + else: + self._scheduler.add_job( + _run_profile_job, + trigger=trigger, + id=job_id, + name=f"WatchProfile: {profile.name}", + args=[str(profile.id), profile.name], + replace_existing=True, + misfire_grace_time=300, # 5 min grace for missed fires + ) + logger.info("Scheduled job for profile '%s' (type=%s, time=%s)", + profile.name, profile.schedule_type, profile.schedule_time) + except Exception as exc: + logger.warning("Could not schedule profile '%s': %s", profile.name, exc) + + def _build_trigger(self, profile): + """Build APScheduler trigger from profile schedule config.""" + try: + from apscheduler.triggers.cron import CronTrigger + from apscheduler.triggers.date import DateTrigger + except ImportError: + return None + + if not profile.schedule_time: + return None + + try: + hour, minute = profile.schedule_time.split(":") + hour_int, minute_int = int(hour), int(minute) + except (ValueError, AttributeError): + logger.warning("Invalid schedule_time '%s' for profile '%s'", + profile.schedule_time, profile.name) + return None + + stype = (profile.schedule_type or "weekly").lower() + + if stype == "once": + if not profile.schedule_date: + return None + try: + run_date = datetime.strptime( + f"{profile.schedule_date} {profile.schedule_time}", "%Y-%m-%d %H:%M" + ) + return DateTrigger(run_date=run_date) + except ValueError: + return None + + if stype == "weekly": + days = profile.schedule_days or [] + day_map = { + "monday": "mon", "tuesday": "tue", "wednesday": "wed", + "thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun", + "lundi": "mon", "mardi": "tue", "mercredi": "wed", + "jeudi": "thu", "vendredi": "fri", "samedi": "sat", "dimanche": "sun", + } + cron_days = ",".join(day_map.get(d.lower(), d[:3]) for d in days) if days else "mon-fri" + return CronTrigger(day_of_week=cron_days, hour=hour_int, minute=minute_int) + + if stype == "monthly": + return CronTrigger(day=1, hour=hour_int, minute=minute_int) + + if stype == "custom": + interval = max(1, profile.schedule_interval_months or 1) + # APScheduler doesn't have a native N-month interval, use IntervalTrigger in weeks + from apscheduler.triggers.interval import IntervalTrigger + return IntervalTrigger(weeks=interval * 4) + + # Fallback to daily + return CronTrigger(hour=hour_int, minute=minute_int) + + def _remove_job(self, profile_id: str) -> None: + if not self._ready or self._scheduler is None: + return + job_id = f"{self._JOB_PREFIX}{profile_id}" + try: + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + logger.debug("Removed APScheduler job for profile %s", profile_id) + except Exception as exc: + logger.debug("Could not remove job %s: %s", job_id, exc) + + def on_profile_created(self, profile) -> None: + if profile.is_active and profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + + def on_profile_updated(self, profile) -> None: + if profile.is_active and profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + else: + self._remove_job(str(profile.id)) + + def on_profile_deleted(self, profile_id: str) -> None: + self._remove_job(profile_id) + + def get_jobs_info(self) -> list[dict]: + if not self._ready or self._scheduler is None: + return [] + jobs = [] + for job in self._scheduler.get_jobs(): + if job.id.startswith(self._JOB_PREFIX): + jobs.append({ + "job_id": job.id, + "profile_id": job.id[len(self._JOB_PREFIX):], + "name": job.name, + "next_run": job.next_run_time.isoformat() if job.next_run_time else None, + }) + return jobs + + @property + def is_ready(self) -> bool: + return self._ready + + +async def _run_profile_job(profile_id: str, profile_name: str) -> None: + """APScheduler job function: runs a WatchProfile through the orchestrator.""" + logger.info("APScheduler firing profile '%s'", profile_name) + try: + from app.core.watch_context import WatchContext + from app.db.base import async_session_factory + from app.db.repositories import EmailGroupRepository, WatchProfileRepository + from app.scheduler.service import OrchestratorScheduler + from app.config.settings import get_settings + from app.core.research_brief import build_research_brief + + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + group_repo = EmailGroupRepository(db) + profile = await repo.get_by_id(uuid.UUID(profile_id)) + if not profile or not profile.is_active: + logger.info("Profile '%s' not active or not found, skipping", profile_name) + return + + recipients = await group_repo.resolve_recipients_for_profile(profile) + await repo.touch_last_run(uuid.UUID(profile_id)) + await db.commit() + + ctx = WatchContext.from_profile(profile) + task = build_research_brief( + profile.subject or profile.name, + ctx.topics or None, + profile.focus, + ) + + scheduler = OrchestratorScheduler(mode="v2", settings=get_settings()) + result = await scheduler.run_task( + task=task, + topics=ctx.topics or None, + send_email=bool(recipients), + autonomous=True, + watch_context=ctx, + recipients_override=recipients or None, + ) + + if result.get("success"): + logger.info("APScheduler: profile '%s' completed (session=%s)", + profile_name, result.get("session_id")) + else: + logger.error("APScheduler: profile '%s' failed: %s", + profile_name, result.get("errors")) + + # Deactivate once-profiles after firing + if (profile.schedule_type or "").lower() == "once": + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + p = await repo.get_by_id(uuid.UUID(profile_id)) + if p: + p.is_active = False + await repo.update(p) + await db.commit() + + except Exception as exc: + logger.error("APScheduler profile '%s' raised: %s", profile_name, exc) diff --git a/app/services/streaming_service.py b/app/services/streaming_service.py index 9241489..03bee43 100644 --- a/app/services/streaming_service.py +++ b/app/services/streaming_service.py @@ -69,6 +69,36 @@ async def _finalize_session( await manager.close() +async def _pause_session_for_approval(session_uuid: uuid.UUID, state: dict) -> None: + """Persist research state and mark session as awaiting human approval.""" + manager = None + try: + from app.services.session_manager import SessionManager, SessionPhase + manager = SessionManager(session_uuid) + await manager.initialize() + if manager.session is None: + return + # Save a checkpoint so the state can be resumed after approval + await manager.create_checkpoint( + phase=SessionPhase.COLLECTION, + state_snapshot=state, + articles=state.get("articles", []), + results=state.get("research_results", []), + ) + # Mark session as awaiting_approval + await manager.update_session( + status="awaiting_approval", + research_results=state.get("research_results", []), + ) + logger.info("Session %s paused for human approval (%d results)", + session_uuid, len(state.get("research_results", []))) + except Exception as exc: + logger.warning("Could not pause session %s for approval: %s", session_uuid, exc) + finally: + if manager is not None: + await manager.close() + + async def cleanup_stale_running_sessions(max_age_hours: int = 3) -> int: """Mark sessions stuck in 'running' state for too long as 'failed'. @@ -124,6 +154,8 @@ async def stream_run( subject: Optional[str] = None, title: Optional[str] = None, research_instructions: Optional[str] = None, + _preloaded_research: Optional[list] = None, + _preloaded_plan: Optional[list] = None, ) -> AsyncGenerator[str, None]: """Run the orchestrator and yield SSE-formatted events. @@ -177,10 +209,10 @@ async def stream_run( "topics": topics or [], "send_email": send_email, "metadata": {"topics": topics or [], "subject": subject, "research_instructions": research_instructions, "title": derive_session_title(title=title, subject=subject, task=task)}, - "plan": [], - "current_step_index": 0, + "plan": _preloaded_plan or [], + "current_step_index": len(_preloaded_plan) if _preloaded_plan else 0, "articles": [], - "research_results": [], + "research_results": _preloaded_research or [], "analysis_results": "", "synthesis_result": "", "final_report": "", @@ -189,6 +221,7 @@ async def stream_run( "iteration_count": 0, "max_iterations": 5, "autonomous": autonomous, + "approval_result": "approved" if _preloaded_research else "", } # Yield the first event synchronously before entering the queue loop @@ -291,7 +324,21 @@ async def _produce() -> None: final_report = final_state.get("final_report") errors = final_state.get("errors") - if isinstance(final_report, str) and final_report.strip(): + approval_result = final_state.get("approval_result") + + if approval_result == "awaiting_approval": + # Interactive mode: save research state and pause for user review + await _pause_session_for_approval(session_uuid, final_state) + research_count = len(final_state.get("research_results", [])) + quality = final_state.get("quality_score", 0.0) + await queue.put(self._format_sse("approval_required", { + "session_id": session_id, + "research_count": research_count, + "quality_score": quality, + "message": f"{research_count} sources collectées — en attente de validation avant synthèse", + "timestamp": datetime.now().isoformat() + })) + elif isinstance(final_report, str) and final_report.strip(): await _finalize_session(session_uuid, "completed", final_report=final_report) await queue.put(self._format_sse("session_completed", { "session_id": session_id, diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 59490a5..f8d1077 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -18,21 +18,6 @@ services: networks: - tech-watch-network - redis: - image: redis:7-alpine - ports: - - "6380:6379" - volumes: - - ../.volumes/redis:/data - healthcheck: - test: ["CMD", "redis-cli", "ping"] - interval: 5s - timeout: 5s - retries: 10 - restart: unless-stopped - networks: - - tech-watch-network - searxng: image: searxng/searxng:latest ports: @@ -100,7 +85,6 @@ services: environment: DATABASE_URL: ${DATABASE_URL:-postgresql+asyncpg://techwatch:techwatch@postgres:5432/techwatch} DATABASE_SYNC_URL: ${DATABASE_SYNC_URL:-postgresql://techwatch:techwatch@postgres:5432/techwatch} - REDIS_URL: ${REDIS_URL:-redis://redis:6379} SEARXNG_URL: ${SEARXNG_URL:-http://searxng:8080} FRONTEND_URL: ${FRONTEND_URL:-http://localhost:3000} CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000,http://127.0.0.1:3000} @@ -113,8 +97,6 @@ services: depends_on: postgres: condition: service_healthy - redis: - condition: service_healthy searxng: condition: service_started volumes: @@ -151,7 +133,6 @@ services: environment: DATABASE_URL: ${DATABASE_URL:-postgresql+asyncpg://techwatch:techwatch@postgres:5432/techwatch} DATABASE_SYNC_URL: ${DATABASE_SYNC_URL:-postgresql://techwatch:techwatch@postgres:5432/techwatch} - REDIS_URL: ${REDIS_URL:-redis://redis:6379} SEARXNG_URL: ${SEARXNG_URL:-http://searxng:8080} FRONTEND_URL: ${FRONTEND_URL:-http://localhost:3000} CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000,http://127.0.0.1:3000} @@ -164,8 +145,6 @@ services: depends_on: postgres: condition: service_healthy - redis: - condition: service_healthy searxng: condition: service_started volumes: @@ -193,7 +172,6 @@ services: environment: DATABASE_URL: ${DATABASE_URL:-postgresql+asyncpg://techwatch:techwatch@postgres:5432/techwatch} DATABASE_SYNC_URL: ${DATABASE_SYNC_URL:-postgresql://techwatch:techwatch@postgres:5432/techwatch} - REDIS_URL: ${REDIS_URL:-redis://redis:6379} SEARXNG_URL: ${SEARXNG_URL:-http://searxng:8080} FRONTEND_URL: ${FRONTEND_URL:-http://localhost:3000} CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000,http://127.0.0.1:3000} @@ -206,8 +184,6 @@ services: depends_on: postgres: condition: service_healthy - redis: - condition: service_healthy searxng: condition: service_started volumes: diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 5295f22..1b5a2bc 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -8,6 +8,7 @@ import { NewsletterPage } from './pages/NewsletterPage'; import { SettingsPage } from './pages/SettingsPage'; import { SourcesPage } from './pages/SourcesPage'; import { EmailGroupsPage } from './pages/EmailGroupsPage'; +import { WatchProfilesPage } from './pages/WatchProfilesPage'; import { LiveRunModal } from './components/LiveRunModal'; import { ApiService } from './services/api'; import type { ActiveSessionInfo, ResearchSession, SessionLaunchPayload } from './types'; @@ -46,7 +47,7 @@ class ErrorBoundary extends Component<{ children: ReactNode }, { error: Error | } } -type Page = 'home' | 'sessions' | 'newsletter' | 'sources' | 'email-groups' | 'settings' | 'detail'; +type Page = 'home' | 'sessions' | 'watch-profiles' | 'newsletter' | 'sources' | 'email-groups' | 'settings' | 'detail'; const SESSION_EVENT_TYPES = [ 'session_created', 'phase_transition', 'plan_updated', @@ -121,9 +122,18 @@ function App() { return () => subscribers.current.get(sessionId)?.delete(cb); }, []); - // Start a new session stream in the background (no navigation) - const handleRunLive = useCallback((payload: SessionLaunchPayload) => { - const sessionId = crypto.randomUUID(); + // Start a new session: pre-create DB record to get a stable ID, navigate to its + // detail page immediately, then open the SSE stream so events start flowing. + const handleRunLive = useCallback(async (payload: SessionLaunchPayload) => { + // Optimistic local ID while the POST is in flight + let sessionId: string = crypto.randomUUID(); + try { + const created = await ApiService.createSession(payload); + sessionId = created.session_id; + } catch { + // Fall back to client-generated UUID — stream will still create the record + } + const streamUrl = ApiService.getStreamUrl(payload, sessionId); buffers.current.set(sessionId, []); @@ -147,7 +157,9 @@ function App() { articleCount: 0, })); setIsModalOpen(false); - setCurrentPage('sessions'); + // Navigate directly to the session detail page with the known ID + setSelectedSessionId(sessionId); + setCurrentPage('detail'); }, [emit]); const handleSessionClick = useCallback((id: string) => { @@ -227,6 +239,7 @@ function App() { )} + {currentPage === 'watch-profiles' && } {currentPage === 'newsletter' && } {currentPage === 'settings' && } {currentPage === 'email-groups' && } diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 92ba90c..dcc7cf7 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -1,5 +1,5 @@ import React, { useEffect, useState } from 'react'; -import { LayoutDashboard, Mail, Settings, Zap, History, Search, Command, Shield, Users } from 'lucide-react'; +import { LayoutDashboard, Mail, Settings, Zap, History, Search, Command, Shield, Users, CalendarDays } from 'lucide-react'; import { ApiService } from '../services/api'; interface SidebarProps { @@ -9,8 +9,9 @@ interface SidebarProps { const navItems = [ { id: 'home', label: 'Accueil', icon: LayoutDashboard }, - { id: 'sessions', label: 'Sessions', icon: History }, - { id: 'newsletter',label: 'Newsletter', icon: Mail }, + { id: 'sessions', label: 'Sessions', icon: History }, + { id: 'watch-profiles', label: 'Profils', icon: CalendarDays }, + { id: 'newsletter', label: 'Newsletter', icon: Mail }, { id: 'sources', label: 'Sources', icon: Search }, { id: 'email-groups', label: 'Email Groups', icon: Users }, { id: 'settings', label: 'Paramètres', icon: Settings }, diff --git a/frontend/src/hooks/useOrchestratorStream.ts b/frontend/src/hooks/useOrchestratorStream.ts index 9e26ac7..f9401b7 100644 --- a/frontend/src/hooks/useOrchestratorStream.ts +++ b/frontend/src/hooks/useOrchestratorStream.ts @@ -20,6 +20,13 @@ export interface StepResult { export interface BusEvent { type: string; data: any } +export interface ApprovalInfo { + sessionId: string; + researchCount: number; + qualityScore: number; + message: string; +} + /** * Subscribe function type: caller provides a callback and receives an unsubscribe fn. * Used to connect to a shared EventSource managed by App.tsx instead of opening a new one. @@ -35,9 +42,10 @@ export const useOrchestratorStream = ( const [articles, setArticles] = useState([]); const [stepResults, setStepResults] = useState>({}); const [phase, setPhase] = useState('idle'); - const [status, setStatus] = useState<'idle' | 'running' | 'completed' | 'failed'>('idle'); + const [status, setStatus] = useState<'idle' | 'running' | 'completed' | 'failed' | 'awaiting_approval'>('idle'); const [error, setError] = useState(null); const [sessionId, setSessionId] = useState(null); + const [approvalInfo, setApprovalInfo] = useState(null); const esRef = useRef(null); useEffect(() => { @@ -47,8 +55,8 @@ export const useOrchestratorStream = ( setStepResults({}); setError(null); setSessionId(null); + setApprovalInfo(null); - // Only mark as running when actually connected to a stream if (url || subscribe) { setPhase('initializing'); setStatus('running'); @@ -91,10 +99,18 @@ export const useOrchestratorStream = ( setError(data.error ?? 'Erreur inconnue'); setStatus('failed'); setPhase('failed'); + } else if (type === 'approval_required') { + setStatus('awaiting_approval'); + setPhase('awaiting_approval'); + setApprovalInfo({ + sessionId: data.session_id ?? '', + researchCount: data.research_count ?? 0, + qualityScore: data.quality_score ?? 0, + message: data.message ?? 'En attente de validation', + }); } }; - // Subscribe mode: use the shared event bus from App.tsx if (subscribe) { const unsubscribe = subscribe((evt) => { try { handleEvent(evt.type, evt.data); } catch { /* noop */ } @@ -104,14 +120,13 @@ export const useOrchestratorStream = ( if (!url) return; - // Normal mode: open a dedicated EventSource const es = new EventSource(url); esRef.current = es; const EVENT_TYPES = [ 'session_created', 'phase_transition', 'plan_updated', 'research_result', 'report_chunk', 'report_completed', - 'session_completed', 'session_failed', + 'session_completed', 'session_failed', 'approval_required', ] as const; for (const type of EVENT_TYPES) { @@ -132,5 +147,5 @@ export const useOrchestratorStream = ( }; }, [url, subscribe]); - return { report, plan, articles, stepResults, phase, status, error, sessionId }; + return { report, plan, articles, stepResults, phase, status, error, sessionId, approvalInfo }; }; diff --git a/frontend/src/pages/SessionDetailPage.tsx b/frontend/src/pages/SessionDetailPage.tsx index 5cfdee8..67bbf48 100644 --- a/frontend/src/pages/SessionDetailPage.tsx +++ b/frontend/src/pages/SessionDetailPage.tsx @@ -7,7 +7,7 @@ import { ApiService } from '../services/api'; import { CheckCircle2, Circle, AlertCircle, Globe, GitBranch, Play, MessageCircle, FileText, ExternalLink, Download, - ChevronRight, Loader2, Clock, + ChevronRight, Loader2, Clock, ThumbsUp, ThumbsDown, } from 'lucide-react'; interface SessionDetailPageProps { @@ -287,6 +287,7 @@ export const SessionDetailPage: React.FC = ({ streamUrl, status, error: streamError, sessionId: streamedSessionId, + approvalInfo, } = useOrchestratorStream(subscribe ? null : (streamUrl || null), subscribe); const [activeTab, setActiveTab] = useState('report'); @@ -295,6 +296,7 @@ export const SessionDetailPage: React.FC = ({ streamUrl, const [loadingSession, setLoadingSession] = useState(false); const [selectedStepId, setSelectedStepId] = useState(null); const [sourceFilter, setSourceFilter] = useState('all'); + const [approvalAction, setApprovalAction] = useState<'idle' | 'approving' | 'rejecting' | 'done'>('idle'); // Load session from DB (history view, not when streaming) useEffect(() => { @@ -362,6 +364,35 @@ export const SessionDetailPage: React.FC = ({ streamUrl, const effectiveError = streamError ?? (session?.status === 'failed' ? 'La session a échoué.' : null); const isStreaming = (!!streamUrl || !!subscribe) && status === 'running'; + const resolvedApprovalId = approvalInfo?.sessionId || session?.id || streamedSessionId; + const isAwaitingApproval = effectiveStatus === 'awaiting_approval' || session?.status === 'awaiting_approval'; + + const handleApprove = async () => { + if (!resolvedApprovalId || approvalAction !== 'idle') return; + setApprovalAction('approving'); + try { + await ApiService.approveSession(resolvedApprovalId); + setApprovalAction('done'); + } catch (err: any) { + alert(err.message); + setApprovalAction('idle'); + } + }; + + const handleReject = async () => { + if (!resolvedApprovalId || approvalAction !== 'idle') return; + if (!confirm('Rejeter cette session ? Elle sera marquée comme échouée.')) return; + setApprovalAction('rejecting'); + try { + await ApiService.rejectSession(resolvedApprovalId); + setApprovalAction('done'); + if (session) setSession({ ...session, status: 'failed' }); + } catch (err: any) { + alert(err.message); + setApprovalAction('idle'); + } + }; + // Source filter counts const filterCounts = useMemo(() => ({ all: allSources.length, @@ -400,6 +431,11 @@ export const SessionDetailPage: React.FC = ({ streamUrl, {effectiveError} )} + {isAwaitingApproval && ( +
+ Validation requise +
+ )} {finalReport && ( + + + + )} + {isAwaitingApproval && approvalAction === 'done' && ( +
+ Action enregistrée — la session va reprendre. +
+ )} +
diff --git a/frontend/src/pages/WatchProfilesPage.tsx b/frontend/src/pages/WatchProfilesPage.tsx new file mode 100644 index 0000000..db5ca65 --- /dev/null +++ b/frontend/src/pages/WatchProfilesPage.tsx @@ -0,0 +1,643 @@ +import React, { useEffect, useMemo, useState } from 'react'; +import { CalendarDays, Clock, Play, Plus, Save, Trash2 } from 'lucide-react'; +import { ApiService } from '../services/api'; +import type { EmailGroup, WatchProfile } from '../types'; + +type DepthOption = 'brief' | 'standard' | 'deep'; +type FormatOption = 'digest' | 'report' | 'newsletter'; +type ScheduleType = 'none' | 'once' | 'weekly' | 'monthly'; + +type DraftProfile = { + id?: string; + name: string; + subject: string; + topics: string; + depth: DepthOption; + format: FormatOption; + language: string; + is_active: boolean; + schedule_type: ScheduleType; + schedule_time: string; + schedule_days: string[]; + email_group_ids: string[]; +}; + +const WEEK_DAYS = [ + { id: 'lun', label: 'Lun' }, + { id: 'mar', label: 'Mar' }, + { id: 'mer', label: 'Mer' }, + { id: 'jeu', label: 'Jeu' }, + { id: 'ven', label: 'Ven' }, + { id: 'sam', label: 'Sam' }, + { id: 'dim', label: 'Dim' }, +]; + +const emptyDraft = (): DraftProfile => ({ + name: '', + subject: '', + topics: '', + depth: 'standard', + format: 'digest', + language: 'fr', + is_active: true, + schedule_type: 'none', + schedule_time: '08:00', + schedule_days: [], + email_group_ids: [], +}); + +function formatDate(iso?: string): string { + if (!iso) return '—'; + try { + return new Date(iso).toLocaleDateString('fr-FR', { + day: '2-digit', + month: 'short', + year: 'numeric', + hour: '2-digit', + minute: '2-digit', + }); + } catch { + return iso; + } +} + +export const WatchProfilesPage: React.FC = () => { + const [profiles, setProfiles] = useState([]); + const [emailGroups, setEmailGroups] = useState([]); + const [selectedId, setSelectedId] = useState(null); + const [draft, setDraft] = useState(emptyDraft()); + const [loading, setLoading] = useState(true); + const [saving, setSaving] = useState(false); + const [running, setRunning] = useState(false); + const [error, setError] = useState(null); + const [successMessage, setSuccessMessage] = useState(null); + + const selectedProfile = useMemo( + () => profiles.find((p) => p.id === selectedId) ?? null, + [profiles, selectedId], + ); + + const loadData = async () => { + setLoading(true); + setError(null); + try { + const [profilesData, groupsData] = await Promise.all([ + ApiService.getWatchProfiles(), + ApiService.getEmailGroups(), + ]); + setProfiles(profilesData); + setEmailGroups(groupsData); + } catch (err: any) { + setError(err.message); + } finally { + setLoading(false); + } + }; + + useEffect(() => { void loadData(); }, []); + + useEffect(() => { + if (!selectedProfile) return; + const rawSchedule = selectedProfile.schedule_type ?? 'none'; + const scheduleType: ScheduleType = + rawSchedule === 'weekly' || rawSchedule === 'once' || rawSchedule === 'monthly' + ? rawSchedule + : rawSchedule === 'none' + ? 'none' + : 'weekly'; // map 'custom' or unknown → 'weekly' + setDraft({ + id: selectedProfile.id, + name: selectedProfile.name, + subject: selectedProfile.subject ?? '', + topics: selectedProfile.topics.join(', '), + depth: selectedProfile.depth, + format: selectedProfile.format, + language: selectedProfile.language ?? 'fr', + is_active: selectedProfile.is_active, + schedule_type: scheduleType, + schedule_time: selectedProfile.schedule_time ?? '08:00', + schedule_days: selectedProfile.schedule_days ?? [], + email_group_ids: selectedProfile.email_groups.map((g) => g.id), + }); + }, [selectedProfile]); + + const handleNew = () => { + setSelectedId(null); + setDraft(emptyDraft()); + setError(null); + setSuccessMessage(null); + }; + + const showSuccess = (msg: string) => { + setSuccessMessage(msg); + window.setTimeout(() => setSuccessMessage(null), 4000); + }; + + const handleSave = async () => { + setSaving(true); + setError(null); + setSuccessMessage(null); + + const name = draft.name.trim(); + const subject = draft.subject.trim(); + if (!name) { + setError('Le nom du profil est requis.'); + setSaving(false); + return; + } + if (!subject) { + setError('Le sujet est requis.'); + setSaving(false); + return; + } + + const topics = draft.topics + .split(',') + .map((t) => t.trim()) + .filter(Boolean); + + const payload = { + name, + subject, + topics, + depth: draft.depth, + format: draft.format, + language: draft.language || undefined, + is_active: draft.is_active, + schedule_type: draft.schedule_type === 'none' ? undefined : draft.schedule_type, + schedule_time: draft.schedule_type !== 'none' ? draft.schedule_time : undefined, + schedule_days: draft.schedule_type === 'weekly' ? draft.schedule_days : [], + email_group_ids: draft.email_group_ids, + }; + + try { + const saved = draft.id + ? await ApiService.updateWatchProfile(draft.id, payload) + : await ApiService.createWatchProfile({ ...payload, subject }); + await loadData(); + setSelectedId(saved.id); + showSuccess(draft.id ? 'Profil mis à jour.' : 'Profil créé.'); + } catch (err: any) { + setError(err.message); + } finally { + setSaving(false); + } + }; + + const handleDelete = async () => { + if (!draft.id) return; + if (!window.confirm(`Supprimer le profil "${draft.name}" ?`)) return; + setSaving(true); + setError(null); + try { + await ApiService.deleteWatchProfile(draft.id); + await loadData(); + setSelectedId(null); + setDraft(emptyDraft()); + showSuccess('Profil supprimé.'); + } catch (err: any) { + setError(err.message); + } finally { + setSaving(false); + } + }; + + const handleRun = async () => { + if (!draft.id) return; + setRunning(true); + setError(null); + setSuccessMessage(null); + try { + const result = await ApiService.runProfile(draft.id, { send_email: draft.email_group_ids.length > 0 }); + if (result.success) { + const parts: string[] = ['Analyse lancée avec succès.']; + if (result.session_id) parts.push(`Session : ${result.session_id.slice(0, 8)}…`); + if (result.email_sent) parts.push('Email envoyé.'); + showSuccess(parts.join(' ')); + await loadData(); + } else { + setError(result.error ?? 'Échec du lancement.'); + } + } catch (err: any) { + setError(err.message); + } finally { + setRunning(false); + } + }; + + const toggleDay = (day: string) => { + setDraft((current) => ({ + ...current, + schedule_days: current.schedule_days.includes(day) + ? current.schedule_days.filter((d) => d !== day) + : [...current.schedule_days, day], + })); + }; + + const toggleEmailGroup = (groupId: string) => { + setDraft((current) => ({ + ...current, + email_group_ids: current.email_group_ids.includes(groupId) + ? current.email_group_ids.filter((id) => id !== groupId) + : [...current.email_group_ids, groupId], + })); + }; + + return ( +
+
+
+

Profils de veille

+

+ Configurez et planifiez vos analyses de veille technologique automatiques. +

+
+ +
+ + {error && ( +
+ {error} +
+ )} + + {successMessage && ( +
+ {successMessage} +
+ )} + +
+ {/* Left panel — profile list */} +
+
+
+ + Profils +
+ {profiles.length} +
+ + {loading ? ( +
Chargement…
+ ) : profiles.length === 0 ? ( +
+ Aucun profil pour le moment. Créez-en un pour automatiser vos analyses. +
+ ) : ( + profiles.map((profile) => { + const active = profile.id === selectedId; + return ( + + ); + }) + )} +
+ + {/* Right panel — form */} +
+
+
+
{draft.id ? 'Modifier le profil' : 'Nouveau profil'}
+
+ {draft.id ? 'Modifiez les paramètres de ce profil de veille.' : 'Définissez un profil pour automatiser vos analyses.'} +
+
+ +
+ + {/* Basic fields */} +
+
+
+ + setDraft((c) => ({ ...c, name: e.target.value }))} + placeholder="Veille IA hebdomadaire" + style={inputStyle} + /> +
+
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+ +