-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathencrypted_secrets.py
More file actions
67 lines (49 loc) · 2.48 KB
/
encrypted_secrets.py
File metadata and controls
67 lines (49 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""EncryptedSecrets — per-scope credential storage over a :class:`SqliteStore` kv.
Real symmetric encryption via :class:`cryptography.fernet.Fernet` (AES-128-CBC +
HMAC). Each secret value is encrypted to a Fernet token before it lands in the
kv table, so a stolen database file yields no plaintext DSNs/API keys without
the key. The :class:`SecretsPort` boundary is unchanged, so callers (concierge,
``/connect``) need no edits.
Key management:
* If env ``LANG2SQL_SECRET_KEY`` holds a urlsafe-base64 Fernet key, that is the
encryption key — point every deployment instance at the same value so secrets
decrypt across restarts and across machines.
* Otherwise a key is generated once and persisted in the kv table under the
reserved scope ``__secrets__`` / key ``fernet_key``. This keeps a single
sqlite file self-contained (secrets survive restart) but is only as private
as the file itself — set ``LANG2SQL_SECRET_KEY`` for real key separation.
"""
from __future__ import annotations
import os
from cryptography.fernet import Fernet
from ..adapters.storage.sqlite_store import SqliteStore
# Reserved kv location for the auto-generated key (when env key is absent).
_KEY_SCOPE = "__secrets__"
_KEY_NAME = "fernet_key"
_ENV_KEY = "LANG2SQL_SECRET_KEY"
def _resolve_key(store: SqliteStore) -> bytes:
"""Pick the Fernet key: env override, else persisted, else freshly generated."""
env_key = os.environ.get(_ENV_KEY)
if env_key:
return env_key.encode("ascii")
stored = store.kv_get(_KEY_SCOPE, _KEY_NAME)
if stored is not None:
return stored.encode("ascii")
key = Fernet.generate_key()
store.kv_set(_KEY_SCOPE, _KEY_NAME, key.decode("ascii"))
return key
class EncryptedSecrets:
"""Implements :class:`SecretsPort` on top of a kv-capable store."""
def __init__(self, store: SqliteStore, *, key: bytes | None = None) -> None:
self._store = store
self._fernet = Fernet(key if key is not None else _resolve_key(store))
async def get(self, scope: str, key: str) -> str | None:
blob = self._store.kv_get(scope, key)
if blob is None:
return None
return self._fernet.decrypt(blob.encode("ascii")).decode("utf-8")
async def set(self, scope: str, key: str, value: str) -> None:
token = self._fernet.encrypt(value.encode("utf-8")).decode("ascii")
self._store.kv_set(scope, key, token)
async def delete(self, scope: str, key: str) -> None:
self._store.kv_delete(scope, key)