Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions flocks/channel/inbound/session_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,33 @@ async def list_bindings(
rows = await cursor.fetchall()
return [self._row_to_binding(r) for r in rows]

async def latest_active_user_binding(
self,
*,
channel_id: str,
account_id: Optional[str] = None,
chat_id: Optional[str] = None,
) -> Optional[SessionBinding]:
"""Return the binding only when a channel target resolves uniquely."""
from flocks.session.session import Session

candidates = await self.list_bindings(channel_id=channel_id)
if account_id:
candidates = [b for b in candidates if b.account_id == account_id]
if chat_id:
candidates = [b for b in candidates if b.chat_id == chat_id]

active_candidates: list[SessionBinding] = []
for binding in candidates:
session = await Session.get_by_id(binding.session_id)
if (
session
and session.status == "active"
and session.category == "user"
):
active_candidates.append(binding)
return active_candidates[0] if len(active_candidates) == 1 else None

# --- internal helpers ---

async def _find_binding(
Expand Down
9 changes: 6 additions & 3 deletions flocks/cli/commands/skill.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,12 @@ async def _search_safeskill(query: str) -> list:
name = item.get("name") or item.get("slug") or source
if not source or not name:
continue
install_hint = source if str(source).startswith("safeskill://") else f"safeskill:{source}"
results.append({
"name": str(name),
"description": str(item.get("description") or ""),
"source": "safeskill.cn",
"install_hint": f"safeskill:{source}",
"install_hint": install_hint,
})
return results
except Exception:
Expand All @@ -389,11 +390,12 @@ def _parse_safeskill_text_results(text: str) -> list:
continue
source = match.group(1).rstrip(",.;")
name = source.rstrip("/").split("/")[-1]
install_hint = source if source.startswith("safeskill://") else f"safeskill:{source}"
results.append({
"name": name,
"description": clean,
"source": "safeskill.cn",
"install_hint": f"safeskill:{source}",
"install_hint": install_hint,
})
return results

Expand Down Expand Up @@ -472,7 +474,8 @@ def install_skill(
"Install source:\n"
" clawhub:<name> – clawhub.com registry\n"
" skills-sh:<id> – skills.sh identifier (owner/repo/skill)\n"
" safeskill:<source> – SafeSkill Hub/GitHub/local source via SafeSkill CLI\n"
" safeskill://... – SafeSkill package URI\n"
" safeskill:<source> – SafeSkill source alias via SafeSkill CLI\n"
" github:<owner>/<repo> – GitHub repository\n"
" <owner>/<repo> – GitHub shorthand\n"
" https://... – direct SKILL.md URL\n"
Expand Down
244 changes: 232 additions & 12 deletions flocks/console/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,80 @@ def _shared_console_session_path() -> Path:
return Path(raw).expanduser() / "run" / "console-session.json"


def _flocks_root() -> Path:
return Path(os.getenv("FLOCKS_ROOT", str(Path.home() / ".flocks"))).expanduser()


def _read_json_file(path: Path) -> dict[str, Any]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return payload if isinstance(payload, dict) else {}


def _read_pro_bundle_marker() -> dict[str, Any]:
return _read_json_file(_flocks_root() / "run" / "pro-bundle-installed.json")


def _local_pro_license_path() -> Path:
return _flocks_root() / "flockspro" / "license.json"


def _read_local_pro_license_state() -> dict[str, Any]:
return _read_json_file(_local_pro_license_path())


def _read_local_pro_license_id() -> str:
state = _read_local_pro_license_state()
payload = state.get("payload") if isinstance(state.get("payload"), dict) else {}
return str(state.get("license_id") or payload.get("license_id") or "").strip()


def _read_local_pro_license_status() -> str:
state = _read_local_pro_license_state()
payload = state.get("payload") if isinstance(state.get("payload"), dict) else {}
return str(
state.get("license_status")
or state.get("status")
or payload.get("license_status")
or payload.get("status")
or ""
).strip()


def _pending_pro_bundle_install_receipt_path() -> Path:
return _flocks_root() / "run" / "pro-bundle-install-receipt-pending.json"


def _sync_local_pro_license_from_heartbeat_response(data: dict[str, Any]) -> None:
license_path = _local_pro_license_path()
state = _read_json_file(license_path)
now_ts = int(datetime.now(UTC).timestamp())
if state:
changed = False
patch_token = data.get("license_patch") or data.get("latest_patch")
if isinstance(patch_token, str) and patch_token:
patches = state.get("patches") if isinstance(state.get("patches"), list) else []
if patch_token not in patches:
state["patches"] = [*patches, patch_token]
changed = True
if state.get("last_sync_at") != now_ts:
state["last_sync_at"] = now_ts
state["last_heartbeat_ok_at"] = now_ts
changed = True
if changed:
license_path.parent.mkdir(parents=True, exist_ok=True)
license_path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")

revoked_license_ids = data.get("revoked_license_ids")
if isinstance(revoked_license_ids, list):
revocation_path = _flocks_root() / "flockspro" / "revocation.json"
revocation_path.parent.mkdir(parents=True, exist_ok=True)
payload = {"revoked_license_ids": sorted({str(item) for item in revoked_license_ids})}
revocation_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def _write_shared_console_session(session: dict[str, Any]) -> None:
path = _shared_console_session_path()
path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -42,6 +116,29 @@ def _write_shared_console_session(session: dict[str, Any]) -> None:
pass


def read_shared_console_session() -> dict[str, Any] | None:
path = _shared_console_session_path()
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, OSError, json.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
token = str(payload.get("console_session_token") or "").strip()
fingerprint = str(payload.get("fingerprint") or "").strip()
install_id = str(payload.get("install_id") or "").strip()
if not token or not fingerprint or not install_id:
return None
expires_at = str(payload.get("expires_at") or "").strip()
if expires_at:
try:
if _parse_iso(expires_at) <= datetime.now(UTC):
return None
except ValueError:
return None
return payload


def _delete_shared_console_session() -> None:
path = _shared_console_session_path()
try:
Expand Down Expand Up @@ -284,39 +381,162 @@ def _runtime_version() -> str:
return str(__version__).lstrip("v")

@classmethod
async def send_heartbeat(cls) -> dict[str, Any]:
session = await cls._require_session()
console_base = cls.console_base_url()
def runtime_version_payload(cls, *, pro_component_version: str | None = None) -> dict[str, str]:
marker = _read_pro_bundle_marker()
core_version = str(
marker.get("core_version")
or cls._runtime_version()
).strip()
bundle_version = str(
marker.get("bundle_version")
or ""
).strip()
pro_component_version = str(marker.get("flockspro_component_version") or pro_component_version or "").strip()
has_pro_bundle = bool(bundle_version or pro_component_version)
edition = "flockspro" if has_pro_bundle else "oss"
payload = {
"fingerprint": session["fingerprint"],
"install_id": session["install_id"],
"edition": edition,
}
if core_version:
payload["core_version"] = core_version
if edition == "flockspro" and bundle_version:
payload["bundle_version"] = bundle_version
if edition == "flockspro" and pro_component_version:
payload["flockspro_component_version"] = pro_component_version
return {key: value for key, value in payload.items() if value}

@classmethod
def heartbeat_payload(
cls,
session: dict[str, Any],
*,
status: str = "ok",
license_id: str | None = None,
pro_component_version: str | None = None,
) -> dict[str, Any]:
version_payload = cls.runtime_version_payload(pro_component_version=pro_component_version)
return {
"fingerprint": session.get("fingerprint"),
"install_id": session.get("install_id"),
"console_login_id": session.get("console_login_id"),
"sent_at": _now_iso(),
"status": "ok",
"status": status,
"license_id": license_id or None,
**version_payload,
}
if not console_base:

@classmethod
async def send_heartbeat_for_session(
cls,
*,
session: dict[str, Any],
status: str = "ok",
license_id: str | None = None,
heartbeat_url: str | None = None,
report_install_receipt: bool = False,
pro_component_version: str | None = None,
) -> dict[str, Any]:
console_base = cls.console_base_url()
payload = cls.heartbeat_payload(
session,
status=status,
license_id=license_id,
pro_component_version=pro_component_version,
)
target_url = heartbeat_url or (f"{console_base}/v1/heartbeats" if console_base else "")
if not target_url:
return {"ok": True, "mode": "mock", "node": payload}
token = str(session.get("console_session_token") or "").strip()
if not token:
raise ValueError("console_session_token 缺失,无法发送心跳")
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{console_base}/v1/heartbeats",
target_url,
json=payload,
headers={"Authorization": f"Bearer {session['console_session_token']}"},
headers={"Authorization": f"Bearer {token}"},
)
if resp.status_code in {401, 403}:
raise ValueError("console 会话已失效,请重新登录")
resp.raise_for_status()
return resp.json()
data = resp.json()
_sync_local_pro_license_from_heartbeat_response(data)
if report_install_receipt:
await cls._report_pending_pro_bundle_install_receipt(client=client, session=session)
return data

@classmethod
async def send_heartbeat(cls) -> dict[str, Any]:
session = await cls._require_session()
return await cls.send_heartbeat_for_session(
session=session,
status=_read_local_pro_license_status() or "ok",
license_id=_read_local_pro_license_id() or None,
report_install_receipt=True,
)

@classmethod
async def report_pending_pro_bundle_install_receipt(cls) -> bool:
try:
session = await cls._require_session()
except Exception:
session = read_shared_console_session()
if not session:
return False
async with httpx.AsyncClient(timeout=10) as client:
return await cls._report_pending_pro_bundle_install_receipt(client=client, session=session)

@classmethod
def _console_base_url_for_session(cls, session: dict[str, Any]) -> str:
console_base = cls.console_base_url() or str(session.get("console_base_url") or "").strip().rstrip("/")
if console_base:
return console_base
shared_session = read_shared_console_session()
return str((shared_session or {}).get("console_base_url") or "").strip().rstrip("/")

@classmethod
async def _report_pending_pro_bundle_install_receipt(
cls,
*,
client: httpx.AsyncClient,
session: dict[str, Any],
) -> bool:
console_base = cls._console_base_url_for_session(session)
token = str(session.get("console_session_token") or "").strip()
if not console_base or not token:
return False
path = _pending_pro_bundle_install_receipt_path()
payload = _read_json_file(path)
if not payload:
return False
payload = {
**payload,
"fingerprint": session.get("fingerprint"),
"install_id": session.get("install_id"),
"license_id": payload.get("license_id") or _read_local_pro_license_id() or None,
}
try:
resp = await client.post(
f"{console_base}/v1/pro-bundles/installations",
json=payload,
headers={"Authorization": f"Bearer {token}"},
)
if resp.status_code in {200, 201, 202}:
path.unlink(missing_ok=True)
return True
except Exception:
return False
return False

@classmethod
async def sync_node_profile(cls, *, force: bool = False, source: str = "scheduled") -> dict[str, Any]:
_ = force
session = await cls._require_session()
console_base = cls.console_base_url()
version_payload = cls.runtime_version_payload()
payload = {
"fingerprint": session["fingerprint"],
"install_id": session["install_id"],
"edition": cls._edition(),
"version": cls._runtime_version(),
**version_payload,
"source": source,
"sent_at": _now_iso(),
}
Expand Down
Loading
Loading