From 933f49b4a3d0d4e29940c7b5d8e72b987b1efa5b Mon Sep 17 00:00:00 2001 From: EngineerProjects Date: Thu, 11 Jun 2026 14:41:34 +0200 Subject: [PATCH 1/5] feat: Watch Profiles page, persistent scheduler, human approval flow - Add Watch Profiles React page (full CRUD: create, update, delete, run now) with schedule config, email group multi-select, sidebar nav entry - Replace in-memory polling scheduler with APScheduler + PostgreSQL job store; falls back to polling loop if apscheduler unavailable - Add human-in-the-loop approval: orchestrator pauses after research phase when autonomous=False and quality below threshold; session saved as awaiting_approval with checkpoint; new /approve and /reject endpoints resume or cancel synthesis; SSE approval_required event drives frontend banner with Approve/Reject buttons in SessionDetailPage - Add ApiService.approveSession and rejectSession methods --- app/agents/orchestrator/graph.py | 7 +- app/agents/orchestrator/nodes.py | 32 +- app/api/main.py | 28 +- app/api/routers/orchestrator.py | 98 ++- app/api/routers/watch_profiles.py | 18 +- app/scheduler/persistent.py | 277 +++++++++ app/services/streaming_service.py | 55 +- frontend/src/App.tsx | 4 +- frontend/src/components/Sidebar.tsx | 7 +- frontend/src/hooks/useOrchestratorStream.ts | 27 +- frontend/src/pages/SessionDetailPage.tsx | 72 ++- frontend/src/pages/WatchProfilesPage.tsx | 643 ++++++++++++++++++++ frontend/src/services/api.ts | 8 + pyproject.toml | 1 + 14 files changed, 1241 insertions(+), 36 deletions(-) create mode 100644 app/scheduler/persistent.py create mode 100644 frontend/src/pages/WatchProfilesPage.tsx diff --git a/app/agents/orchestrator/graph.py b/app/agents/orchestrator/graph.py index 12e80dc..4dafb32 100644 --- a/app/agents/orchestrator/graph.py +++ b/app/agents/orchestrator/graph.py @@ -160,6 +160,7 @@ def build(self) -> StateGraph: "proceed": "analyzer", "retry": "dispatcher", "bypass": "analyzer", + "pause": END, } ) @@ -211,7 +212,7 @@ def _route_after_validation(self, state: OrchestratorState) -> Literal["retry", return "retry" return "approval" - def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", "retry", "bypass"]: + def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", "retry", "bypass", "pause"]: approval_result = state.get("approval_result", "pending") autonomous = state.get("autonomous", False) @@ -220,6 +221,10 @@ def _route_after_approval(self, state: OrchestratorState) -> Literal["proceed", if approval_result == "approved": return "proceed" + + if approval_result == "awaiting_approval": + return "pause" + return "retry" diff --git a/app/agents/orchestrator/nodes.py b/app/agents/orchestrator/nodes.py index a0be67a..6123a89 100644 --- a/app/agents/orchestrator/nodes.py +++ b/app/agents/orchestrator/nodes.py @@ -1414,25 +1414,35 @@ async def validator(self, state: OrchestratorState) -> OrchestratorState: async def human_approval(self, state: OrchestratorState) -> OrchestratorState: """Human-in-the-loop approval checkpoint. - This node pauses execution and waits for human approval. - In a real implementation, this would use LangGraph's interrupt - mechanism or a message queue to pause and wait for user input. - - For now, it automatically approves if quality score is above threshold. + In autonomous mode: auto-approves if quality >= threshold. + In interactive mode (autonomous=False): pauses the graph when quality + is below threshold so the user can review research results before synthesis. + The streaming service detects approval_result="awaiting_approval" and + emits an `approval_required` SSE event, then saves session state. """ quality_score = state.get("quality_score", 0.0) approval_threshold = state.get("approval_threshold", 0.7) - - state["approval_status"] = "auto_approved" if quality_score >= approval_threshold else "needs_review" + autonomous = state.get("autonomous", True) if quality_score >= approval_threshold: logger.info("Auto-approved: quality %.2f >= threshold %.2f", quality_score, approval_threshold) state["approval_result"] = "approved" + state["approval_status"] = "auto_approved" + state["approved_at"] = datetime.now().isoformat() + elif autonomous: + # Autonomous mode: bypass regardless of quality + logger.warning("Bypassing approval in autonomous mode (quality %.2f < %.2f)", + quality_score, approval_threshold) + state["approval_result"] = "approved" + state["approval_status"] = "auto_approved_autonomous" + state["approved_at"] = datetime.now().isoformat() else: - logger.warning("Needs review: quality %.2f < threshold %.2f", quality_score, approval_threshold) - state["approval_result"] = "pending" - - state["approved_at"] = datetime.now().isoformat() if quality_score >= approval_threshold else None + # Interactive mode + low quality: pause for human review + logger.info("Interactive mode: pausing for human approval (quality %.2f < %.2f)", + quality_score, approval_threshold) + state["approval_result"] = "awaiting_approval" + state["approval_status"] = "needs_review" + state["approved_at"] = None return state diff --git a/app/api/main.py b/app/api/main.py index c181820..ebebb86 100644 --- a/app/api/main.py +++ b/app/api/main.py @@ -223,18 +223,32 @@ async def lifespan(app: FastAPI): except Exception as _exc: logger.warning("Stale session cleanup skipped: %s", _exc) - # Start in-process profile scheduler - scheduler_task = asyncio.create_task(_profile_scheduler_loop()) + # Start persistent APScheduler (falls back silently if apscheduler not installed) + from app.scheduler.persistent import get_profile_scheduler + profile_scheduler = get_profile_scheduler() + try: + await profile_scheduler.start() + except Exception as _sched_exc: + logger.warning("APScheduler init failed, falling back to polling loop: %s", _sched_exc) + profile_scheduler = None + + # Fallback polling loop if APScheduler unavailable + scheduler_task = None + if not (profile_scheduler and profile_scheduler.is_ready): + scheduler_task = asyncio.create_task(_profile_scheduler_loop()) yield # Shutdown logger.info("Shutting down tech-watch-agent API") - scheduler_task.cancel() - try: - await scheduler_task - except asyncio.CancelledError: - pass + if profile_scheduler and profile_scheduler.is_ready: + await profile_scheduler.stop() + if scheduler_task: + scheduler_task.cancel() + try: + await scheduler_task + except asyncio.CancelledError: + pass try: await close_db() except Exception as exc: diff --git a/app/api/routers/orchestrator.py b/app/api/routers/orchestrator.py index 9c9b65f..92a36cf 100644 --- a/app/api/routers/orchestrator.py +++ b/app/api/routers/orchestrator.py @@ -1,6 +1,7 @@ -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from typing import Optional +from app.api.security import require_admin_access from app.config.settings import get_settings from app.api.models import OrchestratorRequest, OrchestratorResponse from app.core.research_brief import build_research_brief, derive_session_title @@ -136,6 +137,101 @@ async def setup_scheduled_task( } +@router.post("/sessions/{session_id}/approve", dependencies=[Depends(require_admin_access)]) +async def approve_session(session_id: str) -> StreamingResponse: + """Approve a paused session and resume synthesis + email delivery. + + Re-runs the orchestrator from the analyzer node using the research + results already collected and saved in the session checkpoint. + """ + import uuid as _uuid + from app.db.base import async_session_factory + from app.db.repositories import ResearchSessionRepository + from app.services.streaming_service import StreamingOrchestratorService + + try: + session_uuid = _uuid.UUID(session_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid session ID format") + + async with async_session_factory() as db: + repo = ResearchSessionRepository(db) + session = await repo.get_by_id(session_uuid) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session.status != "awaiting_approval": + raise HTTPException( + status_code=400, + detail=f"Session is not awaiting approval (status: {session.status})" + ) + # Mark approved before resuming so the node routes correctly + session.status = "running" + await db.commit() + + # Resume: inject pre-loaded research results and force approval bypass + research_results = session.research_results or [] + plan = session.plan or [] + # Mark all plan steps as done so dispatcher skips them + for step in plan: + step["status"] = "done" + + service = StreamingOrchestratorService() + task = session.research_brief or "" + meta = session.meta_data or {} + + return StreamingResponse( + service.stream_run( + task=task, + session_id=session_id, + send_email=meta.get("send_email", False), + autonomous=True, # bypass approval on resume + subject=meta.get("subject"), + title=meta.get("title"), + research_instructions=meta.get("research_instructions"), + # Pre-loaded state injected via topics (the service will merge) + _preloaded_research=research_results, + _preloaded_plan=plan, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@router.post("/sessions/{session_id}/reject", dependencies=[Depends(require_admin_access)]) +async def reject_session(session_id: str) -> dict: + """Reject a paused session — marks it as failed so the user can re-run.""" + import uuid as _uuid + from app.db.base import async_session_factory + from app.db.repositories import ResearchSessionRepository + + try: + session_uuid = _uuid.UUID(session_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid session ID format") + + async with async_session_factory() as db: + repo = ResearchSessionRepository(db) + session = await repo.get_by_id(session_uuid) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session.status != "awaiting_approval": + raise HTTPException( + status_code=400, + detail=f"Session is not awaiting approval (status: {session.status})" + ) + session.status = "failed" + meta = dict(session.meta_data or {}) + meta["rejection_reason"] = "Rejected by user after review" + session.meta_data = meta + await db.commit() + + return {"session_id": session_id, "status": "rejected"} + + @router.get("/status", response_model=dict) async def get_orchestrator_status() -> dict: """Get the current runtime status of the orchestrator.""" diff --git a/app/api/routers/watch_profiles.py b/app/api/routers/watch_profiles.py index 9ef0e26..af7177b 100644 --- a/app/api/routers/watch_profiles.py +++ b/app/api/routers/watch_profiles.py @@ -171,7 +171,13 @@ async def create_profile(body: WatchProfileCreate) -> WatchProfileResponse: raise HTTPException(status_code=404, detail=str(exc)) from exc await db.commit() refreshed = await repo.get_by_id(created.id) - return WatchProfileResponse.from_model(refreshed or created) + result = refreshed or created + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_created(result) + except Exception: + pass + return WatchProfileResponse.from_model(result) @router.get("/{profile_id}", response_model=WatchProfileResponse) @@ -204,6 +210,11 @@ async def update_profile(profile_id: str, body: WatchProfileUpdate) -> WatchProf updated = await repo.update(profile) await db.commit() + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_updated(updated) + except Exception: + pass return WatchProfileResponse.from_model(updated) @@ -215,6 +226,11 @@ async def delete_profile(profile_id: str) -> None: if not deleted: raise HTTPException(status_code=404, detail="Profile not found") await db.commit() + try: + from app.scheduler.persistent import get_profile_scheduler + get_profile_scheduler().on_profile_deleted(profile_id) + except Exception: + pass @router.post("/{profile_id}/run") diff --git a/app/scheduler/persistent.py b/app/scheduler/persistent.py new file mode 100644 index 0000000..a30fb82 --- /dev/null +++ b/app/scheduler/persistent.py @@ -0,0 +1,277 @@ +""" +Persistent APScheduler-based scheduler for WatchProfiles. + +Replaces the polling loop with real cron jobs persisted in PostgreSQL. +Each active WatchProfile gets an APScheduler job keyed by its UUID. +Jobs survive API restarts and fire at exact scheduled times. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Optional + +from app.core.logging import get_logger + +logger = get_logger(__name__) + +_scheduler: Optional["ProfileScheduler"] = None + + +def get_profile_scheduler() -> "ProfileScheduler": + global _scheduler + if _scheduler is None: + _scheduler = ProfileScheduler() + return _scheduler + + +class ProfileScheduler: + """APScheduler wrapper that persists jobs in PostgreSQL.""" + + _JOB_PREFIX = "profile_" + + def __init__(self) -> None: + self._scheduler = None + self._ready = False + + def _build_scheduler(self, sync_db_url: str): + try: + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore + except ImportError: + logger.warning("apscheduler not installed — falling back to polling loop") + return None + + jobstores = {"default": SQLAlchemyJobStore(url=sync_db_url)} + return AsyncIOScheduler(jobstores=jobstores, timezone="UTC") + + async def start(self) -> None: + """Start the scheduler and load all active profiles from DB.""" + from app.config.settings import get_settings + settings = get_settings() + + self._scheduler = self._build_scheduler(settings.database_sync_url) + if self._scheduler is None: + return + + self._scheduler.start() + self._ready = True + logger.info("APScheduler started with PostgreSQL jobstore") + + # Load all active profiles and schedule them + await self._reload_all_profiles() + + async def stop(self) -> None: + if self._scheduler and self._ready: + self._scheduler.shutdown(wait=False) + self._ready = False + logger.info("APScheduler stopped") + + async def _reload_all_profiles(self) -> None: + """Load all active WatchProfiles from DB and schedule their jobs.""" + if not self._ready: + return + try: + from app.db.base import async_session_factory + from app.db.repositories import WatchProfileRepository + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + profiles = await repo.list_all(active_only=True) + + count = 0 + for profile in profiles: + if profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + count += 1 + logger.info("Scheduled %d watch profile jobs", count) + except Exception as exc: + logger.warning("Could not reload profile jobs: %s", exc) + + def _upsert_job(self, profile) -> None: + """Add or replace the APScheduler job for a WatchProfile.""" + if not self._ready or self._scheduler is None: + return + + job_id = f"{self._JOB_PREFIX}{profile.id}" + trigger = self._build_trigger(profile) + if trigger is None: + self._remove_job(str(profile.id)) + return + + try: + if self._scheduler.get_job(job_id): + self._scheduler.reschedule_job(job_id, trigger=trigger) + logger.debug("Rescheduled job for profile '%s'", profile.name) + else: + self._scheduler.add_job( + _run_profile_job, + trigger=trigger, + id=job_id, + name=f"WatchProfile: {profile.name}", + args=[str(profile.id), profile.name], + replace_existing=True, + misfire_grace_time=300, # 5 min grace for missed fires + ) + logger.info("Scheduled job for profile '%s' (type=%s, time=%s)", + profile.name, profile.schedule_type, profile.schedule_time) + except Exception as exc: + logger.warning("Could not schedule profile '%s': %s", profile.name, exc) + + def _build_trigger(self, profile): + """Build APScheduler trigger from profile schedule config.""" + try: + from apscheduler.triggers.cron import CronTrigger + from apscheduler.triggers.date import DateTrigger + except ImportError: + return None + + if not profile.schedule_time: + return None + + try: + hour, minute = profile.schedule_time.split(":") + hour_int, minute_int = int(hour), int(minute) + except (ValueError, AttributeError): + logger.warning("Invalid schedule_time '%s' for profile '%s'", + profile.schedule_time, profile.name) + return None + + stype = (profile.schedule_type or "weekly").lower() + + if stype == "once": + if not profile.schedule_date: + return None + try: + run_date = datetime.strptime( + f"{profile.schedule_date} {profile.schedule_time}", "%Y-%m-%d %H:%M" + ) + return DateTrigger(run_date=run_date) + except ValueError: + return None + + if stype == "weekly": + days = profile.schedule_days or [] + day_map = { + "monday": "mon", "tuesday": "tue", "wednesday": "wed", + "thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun", + "lundi": "mon", "mardi": "tue", "mercredi": "wed", + "jeudi": "thu", "vendredi": "fri", "samedi": "sat", "dimanche": "sun", + } + cron_days = ",".join(day_map.get(d.lower(), d[:3]) for d in days) if days else "mon-fri" + return CronTrigger(day_of_week=cron_days, hour=hour_int, minute=minute_int) + + if stype == "monthly": + return CronTrigger(day=1, hour=hour_int, minute=minute_int) + + if stype == "custom": + interval = max(1, profile.schedule_interval_months or 1) + # APScheduler doesn't have a native N-month interval, use IntervalTrigger in weeks + from apscheduler.triggers.interval import IntervalTrigger + return IntervalTrigger(weeks=interval * 4) + + # Fallback to daily + return CronTrigger(hour=hour_int, minute=minute_int) + + def _remove_job(self, profile_id: str) -> None: + if not self._ready or self._scheduler is None: + return + job_id = f"{self._JOB_PREFIX}{profile_id}" + try: + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + logger.debug("Removed APScheduler job for profile %s", profile_id) + except Exception as exc: + logger.debug("Could not remove job %s: %s", job_id, exc) + + def on_profile_created(self, profile) -> None: + if profile.is_active and profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + + def on_profile_updated(self, profile) -> None: + if profile.is_active and profile.schedule_time and profile.schedule_type: + self._upsert_job(profile) + else: + self._remove_job(str(profile.id)) + + def on_profile_deleted(self, profile_id: str) -> None: + self._remove_job(profile_id) + + def get_jobs_info(self) -> list[dict]: + if not self._ready or self._scheduler is None: + return [] + jobs = [] + for job in self._scheduler.get_jobs(): + if job.id.startswith(self._JOB_PREFIX): + jobs.append({ + "job_id": job.id, + "profile_id": job.id[len(self._JOB_PREFIX):], + "name": job.name, + "next_run": job.next_run_time.isoformat() if job.next_run_time else None, + }) + return jobs + + @property + def is_ready(self) -> bool: + return self._ready + + +async def _run_profile_job(profile_id: str, profile_name: str) -> None: + """APScheduler job function: runs a WatchProfile through the orchestrator.""" + logger.info("APScheduler firing profile '%s'", profile_name) + try: + from app.core.watch_context import WatchContext + from app.db.base import async_session_factory + from app.db.repositories import EmailGroupRepository, WatchProfileRepository + from app.scheduler.service import OrchestratorScheduler + from app.config.settings import get_settings + from app.core.research_brief import build_research_brief + + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + group_repo = EmailGroupRepository(db) + profile = await repo.get_by_id(uuid.UUID(profile_id)) + if not profile or not profile.is_active: + logger.info("Profile '%s' not active or not found, skipping", profile_name) + return + + recipients = await group_repo.resolve_recipients_for_profile(profile) + await repo.touch_last_run(uuid.UUID(profile_id)) + await db.commit() + + ctx = WatchContext.from_profile(profile) + task = build_research_brief( + profile.subject or profile.name, + ctx.topics or None, + profile.focus, + ) + + scheduler = OrchestratorScheduler(mode="v2", settings=get_settings()) + result = await scheduler.run_task( + task=task, + topics=ctx.topics or None, + send_email=bool(recipients), + autonomous=True, + watch_context=ctx, + recipients_override=recipients or None, + ) + + if result.get("success"): + logger.info("APScheduler: profile '%s' completed (session=%s)", + profile_name, result.get("session_id")) + else: + logger.error("APScheduler: profile '%s' failed: %s", + profile_name, result.get("errors")) + + # Deactivate once-profiles after firing + if (profile.schedule_type or "").lower() == "once": + async with async_session_factory() as db: + repo = WatchProfileRepository(db) + p = await repo.get_by_id(uuid.UUID(profile_id)) + if p: + p.is_active = False + await repo.update(p) + await db.commit() + + except Exception as exc: + logger.error("APScheduler profile '%s' raised: %s", profile_name, exc) diff --git a/app/services/streaming_service.py b/app/services/streaming_service.py index 9241489..03bee43 100644 --- a/app/services/streaming_service.py +++ b/app/services/streaming_service.py @@ -69,6 +69,36 @@ async def _finalize_session( await manager.close() +async def _pause_session_for_approval(session_uuid: uuid.UUID, state: dict) -> None: + """Persist research state and mark session as awaiting human approval.""" + manager = None + try: + from app.services.session_manager import SessionManager, SessionPhase + manager = SessionManager(session_uuid) + await manager.initialize() + if manager.session is None: + return + # Save a checkpoint so the state can be resumed after approval + await manager.create_checkpoint( + phase=SessionPhase.COLLECTION, + state_snapshot=state, + articles=state.get("articles", []), + results=state.get("research_results", []), + ) + # Mark session as awaiting_approval + await manager.update_session( + status="awaiting_approval", + research_results=state.get("research_results", []), + ) + logger.info("Session %s paused for human approval (%d results)", + session_uuid, len(state.get("research_results", []))) + except Exception as exc: + logger.warning("Could not pause session %s for approval: %s", session_uuid, exc) + finally: + if manager is not None: + await manager.close() + + async def cleanup_stale_running_sessions(max_age_hours: int = 3) -> int: """Mark sessions stuck in 'running' state for too long as 'failed'. @@ -124,6 +154,8 @@ async def stream_run( subject: Optional[str] = None, title: Optional[str] = None, research_instructions: Optional[str] = None, + _preloaded_research: Optional[list] = None, + _preloaded_plan: Optional[list] = None, ) -> AsyncGenerator[str, None]: """Run the orchestrator and yield SSE-formatted events. @@ -177,10 +209,10 @@ async def stream_run( "topics": topics or [], "send_email": send_email, "metadata": {"topics": topics or [], "subject": subject, "research_instructions": research_instructions, "title": derive_session_title(title=title, subject=subject, task=task)}, - "plan": [], - "current_step_index": 0, + "plan": _preloaded_plan or [], + "current_step_index": len(_preloaded_plan) if _preloaded_plan else 0, "articles": [], - "research_results": [], + "research_results": _preloaded_research or [], "analysis_results": "", "synthesis_result": "", "final_report": "", @@ -189,6 +221,7 @@ async def stream_run( "iteration_count": 0, "max_iterations": 5, "autonomous": autonomous, + "approval_result": "approved" if _preloaded_research else "", } # Yield the first event synchronously before entering the queue loop @@ -291,7 +324,21 @@ async def _produce() -> None: final_report = final_state.get("final_report") errors = final_state.get("errors") - if isinstance(final_report, str) and final_report.strip(): + approval_result = final_state.get("approval_result") + + if approval_result == "awaiting_approval": + # Interactive mode: save research state and pause for user review + await _pause_session_for_approval(session_uuid, final_state) + research_count = len(final_state.get("research_results", [])) + quality = final_state.get("quality_score", 0.0) + await queue.put(self._format_sse("approval_required", { + "session_id": session_id, + "research_count": research_count, + "quality_score": quality, + "message": f"{research_count} sources collectées — en attente de validation avant synthèse", + "timestamp": datetime.now().isoformat() + })) + elif isinstance(final_report, str) and final_report.strip(): await _finalize_session(session_uuid, "completed", final_report=final_report) await queue.put(self._format_sse("session_completed", { "session_id": session_id, diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 5295f22..b443b90 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -8,6 +8,7 @@ import { NewsletterPage } from './pages/NewsletterPage'; import { SettingsPage } from './pages/SettingsPage'; import { SourcesPage } from './pages/SourcesPage'; import { EmailGroupsPage } from './pages/EmailGroupsPage'; +import { WatchProfilesPage } from './pages/WatchProfilesPage'; import { LiveRunModal } from './components/LiveRunModal'; import { ApiService } from './services/api'; import type { ActiveSessionInfo, ResearchSession, SessionLaunchPayload } from './types'; @@ -46,7 +47,7 @@ class ErrorBoundary extends Component<{ children: ReactNode }, { error: Error | } } -type Page = 'home' | 'sessions' | 'newsletter' | 'sources' | 'email-groups' | 'settings' | 'detail'; +type Page = 'home' | 'sessions' | 'watch-profiles' | 'newsletter' | 'sources' | 'email-groups' | 'settings' | 'detail'; const SESSION_EVENT_TYPES = [ 'session_created', 'phase_transition', 'plan_updated', @@ -227,6 +228,7 @@ function App() { )} + {currentPage === 'watch-profiles' && } {currentPage === 'newsletter' && } {currentPage === 'settings' && } {currentPage === 'email-groups' && } diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 92ba90c..dcc7cf7 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -1,5 +1,5 @@ import React, { useEffect, useState } from 'react'; -import { LayoutDashboard, Mail, Settings, Zap, History, Search, Command, Shield, Users } from 'lucide-react'; +import { LayoutDashboard, Mail, Settings, Zap, History, Search, Command, Shield, Users, CalendarDays } from 'lucide-react'; import { ApiService } from '../services/api'; interface SidebarProps { @@ -9,8 +9,9 @@ interface SidebarProps { const navItems = [ { id: 'home', label: 'Accueil', icon: LayoutDashboard }, - { id: 'sessions', label: 'Sessions', icon: History }, - { id: 'newsletter',label: 'Newsletter', icon: Mail }, + { id: 'sessions', label: 'Sessions', icon: History }, + { id: 'watch-profiles', label: 'Profils', icon: CalendarDays }, + { id: 'newsletter', label: 'Newsletter', icon: Mail }, { id: 'sources', label: 'Sources', icon: Search }, { id: 'email-groups', label: 'Email Groups', icon: Users }, { id: 'settings', label: 'Paramètres', icon: Settings }, diff --git a/frontend/src/hooks/useOrchestratorStream.ts b/frontend/src/hooks/useOrchestratorStream.ts index 9e26ac7..f9401b7 100644 --- a/frontend/src/hooks/useOrchestratorStream.ts +++ b/frontend/src/hooks/useOrchestratorStream.ts @@ -20,6 +20,13 @@ export interface StepResult { export interface BusEvent { type: string; data: any } +export interface ApprovalInfo { + sessionId: string; + researchCount: number; + qualityScore: number; + message: string; +} + /** * Subscribe function type: caller provides a callback and receives an unsubscribe fn. * Used to connect to a shared EventSource managed by App.tsx instead of opening a new one. @@ -35,9 +42,10 @@ export const useOrchestratorStream = ( const [articles, setArticles] = useState([]); const [stepResults, setStepResults] = useState>({}); const [phase, setPhase] = useState('idle'); - const [status, setStatus] = useState<'idle' | 'running' | 'completed' | 'failed'>('idle'); + const [status, setStatus] = useState<'idle' | 'running' | 'completed' | 'failed' | 'awaiting_approval'>('idle'); const [error, setError] = useState(null); const [sessionId, setSessionId] = useState(null); + const [approvalInfo, setApprovalInfo] = useState(null); const esRef = useRef(null); useEffect(() => { @@ -47,8 +55,8 @@ export const useOrchestratorStream = ( setStepResults({}); setError(null); setSessionId(null); + setApprovalInfo(null); - // Only mark as running when actually connected to a stream if (url || subscribe) { setPhase('initializing'); setStatus('running'); @@ -91,10 +99,18 @@ export const useOrchestratorStream = ( setError(data.error ?? 'Erreur inconnue'); setStatus('failed'); setPhase('failed'); + } else if (type === 'approval_required') { + setStatus('awaiting_approval'); + setPhase('awaiting_approval'); + setApprovalInfo({ + sessionId: data.session_id ?? '', + researchCount: data.research_count ?? 0, + qualityScore: data.quality_score ?? 0, + message: data.message ?? 'En attente de validation', + }); } }; - // Subscribe mode: use the shared event bus from App.tsx if (subscribe) { const unsubscribe = subscribe((evt) => { try { handleEvent(evt.type, evt.data); } catch { /* noop */ } @@ -104,14 +120,13 @@ export const useOrchestratorStream = ( if (!url) return; - // Normal mode: open a dedicated EventSource const es = new EventSource(url); esRef.current = es; const EVENT_TYPES = [ 'session_created', 'phase_transition', 'plan_updated', 'research_result', 'report_chunk', 'report_completed', - 'session_completed', 'session_failed', + 'session_completed', 'session_failed', 'approval_required', ] as const; for (const type of EVENT_TYPES) { @@ -132,5 +147,5 @@ export const useOrchestratorStream = ( }; }, [url, subscribe]); - return { report, plan, articles, stepResults, phase, status, error, sessionId }; + return { report, plan, articles, stepResults, phase, status, error, sessionId, approvalInfo }; }; diff --git a/frontend/src/pages/SessionDetailPage.tsx b/frontend/src/pages/SessionDetailPage.tsx index 5cfdee8..67bbf48 100644 --- a/frontend/src/pages/SessionDetailPage.tsx +++ b/frontend/src/pages/SessionDetailPage.tsx @@ -7,7 +7,7 @@ import { ApiService } from '../services/api'; import { CheckCircle2, Circle, AlertCircle, Globe, GitBranch, Play, MessageCircle, FileText, ExternalLink, Download, - ChevronRight, Loader2, Clock, + ChevronRight, Loader2, Clock, ThumbsUp, ThumbsDown, } from 'lucide-react'; interface SessionDetailPageProps { @@ -287,6 +287,7 @@ export const SessionDetailPage: React.FC = ({ streamUrl, status, error: streamError, sessionId: streamedSessionId, + approvalInfo, } = useOrchestratorStream(subscribe ? null : (streamUrl || null), subscribe); const [activeTab, setActiveTab] = useState('report'); @@ -295,6 +296,7 @@ export const SessionDetailPage: React.FC = ({ streamUrl, const [loadingSession, setLoadingSession] = useState(false); const [selectedStepId, setSelectedStepId] = useState(null); const [sourceFilter, setSourceFilter] = useState('all'); + const [approvalAction, setApprovalAction] = useState<'idle' | 'approving' | 'rejecting' | 'done'>('idle'); // Load session from DB (history view, not when streaming) useEffect(() => { @@ -362,6 +364,35 @@ export const SessionDetailPage: React.FC = ({ streamUrl, const effectiveError = streamError ?? (session?.status === 'failed' ? 'La session a échoué.' : null); const isStreaming = (!!streamUrl || !!subscribe) && status === 'running'; + const resolvedApprovalId = approvalInfo?.sessionId || session?.id || streamedSessionId; + const isAwaitingApproval = effectiveStatus === 'awaiting_approval' || session?.status === 'awaiting_approval'; + + const handleApprove = async () => { + if (!resolvedApprovalId || approvalAction !== 'idle') return; + setApprovalAction('approving'); + try { + await ApiService.approveSession(resolvedApprovalId); + setApprovalAction('done'); + } catch (err: any) { + alert(err.message); + setApprovalAction('idle'); + } + }; + + const handleReject = async () => { + if (!resolvedApprovalId || approvalAction !== 'idle') return; + if (!confirm('Rejeter cette session ? Elle sera marquée comme échouée.')) return; + setApprovalAction('rejecting'); + try { + await ApiService.rejectSession(resolvedApprovalId); + setApprovalAction('done'); + if (session) setSession({ ...session, status: 'failed' }); + } catch (err: any) { + alert(err.message); + setApprovalAction('idle'); + } + }; + // Source filter counts const filterCounts = useMemo(() => ({ all: allSources.length, @@ -400,6 +431,11 @@ export const SessionDetailPage: React.FC = ({ streamUrl, {effectiveError} )} + {isAwaitingApproval && ( +
+ Validation requise +
+ )} {finalReport && ( + + + + )} + {isAwaitingApproval && approvalAction === 'done' && ( +
+ Action enregistrée — la session va reprendre. +
+ )} +
diff --git a/frontend/src/pages/WatchProfilesPage.tsx b/frontend/src/pages/WatchProfilesPage.tsx new file mode 100644 index 0000000..db5ca65 --- /dev/null +++ b/frontend/src/pages/WatchProfilesPage.tsx @@ -0,0 +1,643 @@ +import React, { useEffect, useMemo, useState } from 'react'; +import { CalendarDays, Clock, Play, Plus, Save, Trash2 } from 'lucide-react'; +import { ApiService } from '../services/api'; +import type { EmailGroup, WatchProfile } from '../types'; + +type DepthOption = 'brief' | 'standard' | 'deep'; +type FormatOption = 'digest' | 'report' | 'newsletter'; +type ScheduleType = 'none' | 'once' | 'weekly' | 'monthly'; + +type DraftProfile = { + id?: string; + name: string; + subject: string; + topics: string; + depth: DepthOption; + format: FormatOption; + language: string; + is_active: boolean; + schedule_type: ScheduleType; + schedule_time: string; + schedule_days: string[]; + email_group_ids: string[]; +}; + +const WEEK_DAYS = [ + { id: 'lun', label: 'Lun' }, + { id: 'mar', label: 'Mar' }, + { id: 'mer', label: 'Mer' }, + { id: 'jeu', label: 'Jeu' }, + { id: 'ven', label: 'Ven' }, + { id: 'sam', label: 'Sam' }, + { id: 'dim', label: 'Dim' }, +]; + +const emptyDraft = (): DraftProfile => ({ + name: '', + subject: '', + topics: '', + depth: 'standard', + format: 'digest', + language: 'fr', + is_active: true, + schedule_type: 'none', + schedule_time: '08:00', + schedule_days: [], + email_group_ids: [], +}); + +function formatDate(iso?: string): string { + if (!iso) return '—'; + try { + return new Date(iso).toLocaleDateString('fr-FR', { + day: '2-digit', + month: 'short', + year: 'numeric', + hour: '2-digit', + minute: '2-digit', + }); + } catch { + return iso; + } +} + +export const WatchProfilesPage: React.FC = () => { + const [profiles, setProfiles] = useState([]); + const [emailGroups, setEmailGroups] = useState([]); + const [selectedId, setSelectedId] = useState(null); + const [draft, setDraft] = useState(emptyDraft()); + const [loading, setLoading] = useState(true); + const [saving, setSaving] = useState(false); + const [running, setRunning] = useState(false); + const [error, setError] = useState(null); + const [successMessage, setSuccessMessage] = useState(null); + + const selectedProfile = useMemo( + () => profiles.find((p) => p.id === selectedId) ?? null, + [profiles, selectedId], + ); + + const loadData = async () => { + setLoading(true); + setError(null); + try { + const [profilesData, groupsData] = await Promise.all([ + ApiService.getWatchProfiles(), + ApiService.getEmailGroups(), + ]); + setProfiles(profilesData); + setEmailGroups(groupsData); + } catch (err: any) { + setError(err.message); + } finally { + setLoading(false); + } + }; + + useEffect(() => { void loadData(); }, []); + + useEffect(() => { + if (!selectedProfile) return; + const rawSchedule = selectedProfile.schedule_type ?? 'none'; + const scheduleType: ScheduleType = + rawSchedule === 'weekly' || rawSchedule === 'once' || rawSchedule === 'monthly' + ? rawSchedule + : rawSchedule === 'none' + ? 'none' + : 'weekly'; // map 'custom' or unknown → 'weekly' + setDraft({ + id: selectedProfile.id, + name: selectedProfile.name, + subject: selectedProfile.subject ?? '', + topics: selectedProfile.topics.join(', '), + depth: selectedProfile.depth, + format: selectedProfile.format, + language: selectedProfile.language ?? 'fr', + is_active: selectedProfile.is_active, + schedule_type: scheduleType, + schedule_time: selectedProfile.schedule_time ?? '08:00', + schedule_days: selectedProfile.schedule_days ?? [], + email_group_ids: selectedProfile.email_groups.map((g) => g.id), + }); + }, [selectedProfile]); + + const handleNew = () => { + setSelectedId(null); + setDraft(emptyDraft()); + setError(null); + setSuccessMessage(null); + }; + + const showSuccess = (msg: string) => { + setSuccessMessage(msg); + window.setTimeout(() => setSuccessMessage(null), 4000); + }; + + const handleSave = async () => { + setSaving(true); + setError(null); + setSuccessMessage(null); + + const name = draft.name.trim(); + const subject = draft.subject.trim(); + if (!name) { + setError('Le nom du profil est requis.'); + setSaving(false); + return; + } + if (!subject) { + setError('Le sujet est requis.'); + setSaving(false); + return; + } + + const topics = draft.topics + .split(',') + .map((t) => t.trim()) + .filter(Boolean); + + const payload = { + name, + subject, + topics, + depth: draft.depth, + format: draft.format, + language: draft.language || undefined, + is_active: draft.is_active, + schedule_type: draft.schedule_type === 'none' ? undefined : draft.schedule_type, + schedule_time: draft.schedule_type !== 'none' ? draft.schedule_time : undefined, + schedule_days: draft.schedule_type === 'weekly' ? draft.schedule_days : [], + email_group_ids: draft.email_group_ids, + }; + + try { + const saved = draft.id + ? await ApiService.updateWatchProfile(draft.id, payload) + : await ApiService.createWatchProfile({ ...payload, subject }); + await loadData(); + setSelectedId(saved.id); + showSuccess(draft.id ? 'Profil mis à jour.' : 'Profil créé.'); + } catch (err: any) { + setError(err.message); + } finally { + setSaving(false); + } + }; + + const handleDelete = async () => { + if (!draft.id) return; + if (!window.confirm(`Supprimer le profil "${draft.name}" ?`)) return; + setSaving(true); + setError(null); + try { + await ApiService.deleteWatchProfile(draft.id); + await loadData(); + setSelectedId(null); + setDraft(emptyDraft()); + showSuccess('Profil supprimé.'); + } catch (err: any) { + setError(err.message); + } finally { + setSaving(false); + } + }; + + const handleRun = async () => { + if (!draft.id) return; + setRunning(true); + setError(null); + setSuccessMessage(null); + try { + const result = await ApiService.runProfile(draft.id, { send_email: draft.email_group_ids.length > 0 }); + if (result.success) { + const parts: string[] = ['Analyse lancée avec succès.']; + if (result.session_id) parts.push(`Session : ${result.session_id.slice(0, 8)}…`); + if (result.email_sent) parts.push('Email envoyé.'); + showSuccess(parts.join(' ')); + await loadData(); + } else { + setError(result.error ?? 'Échec du lancement.'); + } + } catch (err: any) { + setError(err.message); + } finally { + setRunning(false); + } + }; + + const toggleDay = (day: string) => { + setDraft((current) => ({ + ...current, + schedule_days: current.schedule_days.includes(day) + ? current.schedule_days.filter((d) => d !== day) + : [...current.schedule_days, day], + })); + }; + + const toggleEmailGroup = (groupId: string) => { + setDraft((current) => ({ + ...current, + email_group_ids: current.email_group_ids.includes(groupId) + ? current.email_group_ids.filter((id) => id !== groupId) + : [...current.email_group_ids, groupId], + })); + }; + + return ( +
+
+
+

Profils de veille

+

+ Configurez et planifiez vos analyses de veille technologique automatiques. +

+
+ +
+ + {error && ( +
+ {error} +
+ )} + + {successMessage && ( +
+ {successMessage} +
+ )} + +
+ {/* Left panel — profile list */} +
+
+
+ + Profils +
+ {profiles.length} +
+ + {loading ? ( +
Chargement…
+ ) : profiles.length === 0 ? ( +
+ Aucun profil pour le moment. Créez-en un pour automatiser vos analyses. +
+ ) : ( + profiles.map((profile) => { + const active = profile.id === selectedId; + return ( + + ); + }) + )} +
+ + {/* Right panel — form */} +
+
+
+
{draft.id ? 'Modifier le profil' : 'Nouveau profil'}
+
+ {draft.id ? 'Modifiez les paramètres de ce profil de veille.' : 'Définissez un profil pour automatiser vos analyses.'} +
+
+ +
+ + {/* Basic fields */} +
+
+
+ + setDraft((c) => ({ ...c, name: e.target.value }))} + placeholder="Veille IA hebdomadaire" + style={inputStyle} + /> +
+
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+ +