diff --git a/README.md b/README.md index 2c1f39a..6f7c9cc 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,56 @@ export REDISVL_VECTOR_DIMENSIONS=768 # Required for Ollama See **[LLM Providers](https://redis.github.io/agent-memory-server/llm-providers/)** for complete configuration options. +## Redis Cloud Agent Memory migration + +Redis Cloud Agent Memory Service stores long-term memories in a Cloud service key layout, while local `agent-memory-server` stores records in the RedisVL index layout. If you copy Redis data from a Cloud Agent Memory store into a local Redis deployment, run the migration command before rebuilding the local search index. + +Cloud exports observed in Redis use keys like: + +```text +memory:<store-id>:ltm:<memory-id> +``` + +with fields such as `id`, `owner_id`, and `text_vector`. Local `agent-memory-server` indexes keys like: + +```text +memory_idx:<memory-id> +``` + +with fields such as `id_`, `user_id`, and `vector`. The migration copies source hashes into local-shaped hashes, converts Cloud millisecond timestamps to seconds, and downcasts Cloud float64 vector blobs to RedisVL's local FLOAT32 vector format when needed. Source keys are not deleted. + +Dry run first: + +```bash +uv run agent-memory migrate-cloud-long-term-memory \ + --store-id <store-id> +``` + +Apply the migration: + +```bash +uv run agent-memory migrate-cloud-long-term-memory \ + --store-id <store-id> \ + --apply +``` + +Then rebuild the local index: + +```bash +uv run agent-memory rebuild-index +``` + +Useful options: + +```text +--source-pattern Override the source SCAN pattern. +--target-prefix Override the local target prefix, default memory_idx. +--batch-size Tune SCAN/write batch size. +--overwrite Replace existing target keys instead of skipping them. +``` + +See [Redis Cloud Agent Memory migration](docs/redis-cloud-migration.md) for the full schema mapping and Cloud-to-local workflow. 
@cli.command("migrate-cloud-long-term-memory")
@click.option(
    "--store-id",
    default=None,
    help=(
        "Redis Cloud Agent Memory store ID. "
        "Builds source pattern memory:<store-id>:ltm:*."
    ),
)
@click.option(
    "--source-pattern",
    default=None,
    help=(
        "Explicit source SCAN pattern. Defaults to memory:<store-id>:ltm:* "
        "when --store-id is given, otherwise memory:*:ltm:*."
    ),
)
@click.option(
    "--target-prefix",
    default=None,
    help="Local RedisVL memory key prefix. Defaults to REDIS_MEMORY_REDISVL_INDEX_PREFIX/memory_idx.",
)
@click.option(
    "--batch-size",
    default=500,
    show_default=True,
    # Reject zero/negative values at the CLI boundary with a usage error
    # instead of a ValueError traceback raised from inside asyncio.run().
    type=click.IntRange(min=1),
    help="Number of keys requested per SCAN call.",
)
@click.option(
    "--apply",
    "apply_changes",
    is_flag=True,
    help=(
        "Actually write migrated memory_idx:<memory-id> hashes. "
        "Without this, runs dry-run only."
    ),
)
@click.option(
    "--overwrite",
    is_flag=True,
    help="Overwrite existing target keys. By default existing local records are skipped.",
)
def migrate_cloud_long_term_memory_command(
    store_id: str | None,
    source_pattern: str | None,
    target_prefix: str | None,
    batch_size: int,
    apply_changes: bool,
    overwrite: bool,
):
    """Copy Redis Cloud Agent Memory Service LTM hashes into local AMS schema.

    This migrates cloud-exported hashes shaped as
    memory:<store-id>:ltm:<memory-id> with fields id/owner_id/text_vector into
    the local RedisVL schema shaped as memory_idx:<memory-id> with fields
    id_/user_id/vector. Source keys are never deleted.

    The command prints the migration counters as JSON, together with the
    effective source pattern, target prefix, and dry-run/overwrite flags.
    """
    import asyncio

    from agent_memory_server.redis_cloud_migration import (
        migrate_cloud_long_term_memory,
    )

    configure_logging()

    # Resolve the effective pattern and prefix exactly once and hand those same
    # values to the migration, so the printed report can never disagree with
    # what was actually scanned/written (e.g. for an empty --source-pattern).
    effective_pattern = source_pattern or (
        f"memory:{store_id}:ltm:*" if store_id else "memory:*:ltm:*"
    )
    effective_prefix = target_prefix or settings.redisvl_index_prefix
    dry_run = not apply_changes

    async def run_migration():
        redis = await get_redis_conn()
        return await migrate_cloud_long_term_memory(
            redis,
            store_id=store_id,
            source_pattern=effective_pattern,
            target_prefix=effective_prefix,
            batch_size=batch_size,
            dry_run=dry_run,
            overwrite=overwrite,
        )

    stats = asyncio.run(run_migration())

    result = stats.as_dict()
    result["dry_run"] = dry_run
    result["overwrite"] = overwrite
    result["source_pattern"] = effective_pattern
    result["target_prefix"] = effective_prefix
    click.echo(json.dumps(result, indent=2, sort_keys=True))
    if dry_run:
        click.echo("Dry run only. Re-run with --apply to write migrated target keys.")
logger = logging.getLogger(__name__)

# Epoch values above this are treated as milliseconds: 1e11 seconds is roughly
# the year 5138, while 1e11 milliseconds is early 1973, so real timestamps in
# either unit fall unambiguously on one side.
_EPOCH_MILLIS_THRESHOLD = 100_000_000_000

# Number of example source/target keys kept in the stats for operator review.
_MAX_SAMPLE_KEYS = 5


@dataclass
class CloudLongTermMemoryMigrationStats:
    """Counters from a cloud long-term-memory schema migration.

    ``scanned`` counts every key returned by SCAN. ``eligible`` counts records
    that passed validation and the existing-target check. ``migrated`` counts
    records actually written, so it stays 0 in a dry run. The ``skipped_*``
    counters record why a scanned key was not eligible, and ``failed`` counts
    keys whose read, transformation, or write raised.
    """

    scanned: int = 0
    eligible: int = 0
    migrated: int = 0
    skipped_existing: int = 0
    skipped_missing_id: int = 0
    skipped_missing_text: int = 0
    skipped_missing_vector: int = 0
    failed: int = 0
    sample_source_keys: list[str] = field(default_factory=list)
    sample_target_keys: list[str] = field(default_factory=list)

    def as_dict(self) -> dict[str, Any]:
        """Return the counters and sample keys as a JSON-serializable dict."""
        return {
            "scanned": self.scanned,
            "eligible": self.eligible,
            "migrated": self.migrated,
            "skipped_existing": self.skipped_existing,
            "skipped_missing_id": self.skipped_missing_id,
            "skipped_missing_text": self.skipped_missing_text,
            "skipped_missing_vector": self.skipped_missing_vector,
            "failed": self.failed,
            "sample_source_keys": self.sample_source_keys,
            "sample_target_keys": self.sample_target_keys,
        }


def _decode(value: Any) -> str:
    """Return *value* as text: ``None`` becomes ``""``, bytes are UTF-8 decoded."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        # "replace" keeps the migration going on malformed UTF-8 instead of raising.
        return value.decode("utf-8", "replace")
    return str(value)


def _source_field(source: dict[Any, Any], name: str, default: Any = None) -> Any:
    """Read *name* from a hash keyed by bytes or, failing that, by str."""
    return source.get(name.encode(), source.get(name, default))


def _coerce_int(value: Any, default: int = 0) -> int:
    """Parse an integer counter/flag field, returning *default* when unusable.

    Accepts plain integers and float-formatted integers such as ``"1.0"``.
    Empty, non-numeric, and non-finite values yield *default* rather than
    raising, so one odd field cannot abort a whole migration run.
    """
    text = _decode(value).strip()
    if not text:
        return default
    try:
        return int(text)
    except ValueError:
        pass
    try:
        # int(float("nan")) raises ValueError, int(float("inf")) OverflowError.
        return int(float(text))
    except (ValueError, OverflowError):
        return default


def _coerce_timestamp(value: Any) -> float | None:
    """Convert Redis Cloud timestamps to local AMS seconds.

    Cloud exports observed in Redis store millisecond epoch values. Local
    agent-memory-server stores seconds. If a value is already seconds, preserve
    it. Empty, unparseable, and non-finite (``nan``/``inf``) values return None
    so callers can omit the field.
    """
    import math  # function-scope import: only this helper needs it

    if value in (None, b"", ""):
        return None
    try:
        number = float(_decode(value))
    except (TypeError, ValueError):
        return None
    if not math.isfinite(number):
        return None

    if number > _EPOCH_MILLIS_THRESHOLD:
        number = number / 1000.0
    return number


def _coerce_vector(value: Any, *, dimensions: int | None = None) -> bytes | None:
    """Convert Cloud vector bytes to local RedisVL float32 bytes.

    Redis Cloud Agent Memory exports observed locally store 1536-dimensional
    vectors as float64 blobs (12288 bytes). Local agent-memory-server's RedisVL
    schema expects FLOAT32 blobs (1536 * 4 = 6144 bytes). If the vector is
    already float32-sized, preserve it. If it is float64-sized, downcast little
    endian doubles to little endian floats.

    Returns None when *value* is not a non-empty bytes-like object, when its
    length matches neither encoding for the expected dimensionality, or when a
    double is too large to be represented as a float32.

    *dimensions* defaults to ``settings.redisvl_vector_dimensions``.
    """
    if isinstance(value, (bytearray, memoryview)):
        value = bytes(value)
    if not isinstance(value, bytes) or not value:
        return None

    dims = dimensions or int(settings.redisvl_vector_dimensions)
    if len(value) == dims * 4:
        return value
    if len(value) != dims * 8:
        return None

    try:
        return struct.pack(f"<{dims}f", *struct.unpack(f"<{dims}d", value))
    except (OverflowError, struct.error):
        # struct refuses to pack doubles outside the float32 range; treat the
        # record as having no usable vector instead of aborting the migration.
        return None


def _required_fields(source: dict[Any, Any]) -> tuple[str, str, bytes | None]:
    """Extract the mandatory ``(id, text, vector)`` triple from a Cloud hash."""
    memory_id = _decode(_source_field(source, "id")).strip()
    text = _decode(_source_field(source, "text")).strip()
    vector = _coerce_vector(_source_field(source, "text_vector"))
    return memory_id, text, vector


def _build_local_hash(
    source: dict[Any, Any], memory_id: str, text: str, vector: bytes
) -> dict[str, Any]:
    """Build the local RedisVL hash from already validated mandatory fields."""

    def text_of(name: str, default: Any = None) -> str:
        return _decode(_source_field(source, name, default))

    def tags_of(name: str) -> Any:
        # Round-trip through the tag codec to normalize the stored separator.
        return encode_tag_values(decode_tag_values(text_of(name)))

    # Cloud records carry the owner in ``owner_id``; prefer ``user_id`` if set.
    user_id = text_of("user_id").strip() or text_of("owner_id").strip()

    created_at = _coerce_timestamp(_source_field(source, "created_at"))
    updated_at = _coerce_timestamp(_source_field(source, "updated_at"))
    last_accessed = _coerce_timestamp(_source_field(source, "last_accessed"))
    if last_accessed is None:
        last_accessed = updated_at or created_at

    target: dict[str, Any] = {
        "id_": memory_id,
        "text": text,
        "vector": vector,
        "session_id": text_of("session_id"),
        "user_id": user_id,
        "namespace": text_of("namespace"),
        "memory_type": text_of("memory_type", "episodic").strip() or "episodic",
        "topics": tags_of("topics"),
        "entities": tags_of("entities"),
        "extracted_from": tags_of("extracted_from"),
        "memory_hash": text_of("memory_hash"),
        "discrete_memory_extracted": text_of("discrete_memory_extracted", "f")
        or "f",
        "pinned": _coerce_int(_source_field(source, "pinned")),
        "access_count": _coerce_int(_source_field(source, "access_count")),
    }

    if not target["memory_hash"]:
        # No hash in the export: derive a deterministic one from the content.
        content_fields = {
            "text": target["text"],
            "user_id": target["user_id"],
            "session_id": target["session_id"],
            "namespace": target["namespace"],
            "memory_type": target["memory_type"],
        }
        target["memory_hash"] = hashlib.sha256(
            json.dumps(content_fields, sort_keys=True).encode()
        ).hexdigest()

    # Timestamps are optional: omit the field entirely when there is no value.
    if created_at is not None:
        target["created_at"] = created_at
    if updated_at is not None:
        target["updated_at"] = updated_at
    if last_accessed is not None:
        target["last_accessed"] = last_accessed
    for name in ("persisted_at", "event_date"):
        stamp = _coerce_timestamp(_source_field(source, name))
        if stamp is not None:
            target[name] = stamp

    return target


def cloud_hash_to_local_hash(source: dict[Any, Any]) -> dict[str, Any] | None:
    """Map one Redis Cloud Agent Memory hash to local AMS RedisVL fields.

    Returns None when a mandatory source field (``id``, ``text``, or a usable
    ``text_vector``) is missing. The existing vector is reused, downcast from
    float64 to float32 when needed, so no embedding provider/API call is
    required.

    Field handling: ``owner_id`` is used when ``user_id`` is empty;
    millisecond timestamps are converted to seconds; ``last_accessed`` falls
    back to ``updated_at`` then ``created_at``; a missing ``memory_hash`` is
    generated from text/user/session/namespace/type; unparseable ``pinned`` and
    ``access_count`` values default to 0.
    """
    memory_id, text, vector = _required_fields(source)
    if not memory_id or not text or vector is None:
        return None
    return _build_local_hash(source, memory_id, text, vector)


async def _write_target(
    redis: Any, target_key: str, target: dict[str, Any], *, overwrite: bool
) -> None:
    """Write *target* to *target_key*, fully replacing it when *overwrite* is set."""
    if not overwrite:
        await redis.hset(target_key, mapping=target)
        return
    # A bare HSET merges into an existing hash and would keep stale fields the
    # new record omits. DEL + HSET in one MULTI/EXEC makes it a real replace.
    async with redis.pipeline(transaction=True) as pipe:
        pipe.delete(target_key)
        pipe.hset(target_key, mapping=target)
        await pipe.execute()


async def _migrate_record(
    redis: Any,
    key: Any,
    stats: CloudLongTermMemoryMigrationStats,
    *,
    target_prefix: str,
    dry_run: bool,
    overwrite: bool,
) -> None:
    """Validate, transform, and optionally write one source key, updating *stats*."""
    key_str = _decode(key)
    stats.scanned += 1
    if len(stats.sample_source_keys) < _MAX_SAMPLE_KEYS:
        stats.sample_source_keys.append(key_str)

    try:
        source = await redis.hgetall(key)
    except Exception:
        # e.g. a key matching the pattern that is not a hash; count and move on.
        stats.failed += 1
        logger.exception("Failed to read source hash %s", key_str)
        return

    memory_id, text, vector = _required_fields(source)
    if not memory_id:
        stats.skipped_missing_id += 1
        return
    if not text:
        stats.skipped_missing_text += 1
        return
    if vector is None:
        stats.skipped_missing_vector += 1
        return

    target_key = f"{target_prefix}:{memory_id}"
    if len(stats.sample_target_keys) < _MAX_SAMPLE_KEYS:
        stats.sample_target_keys.append(target_key)

    if not overwrite and await redis.exists(target_key):
        stats.skipped_existing += 1
        return

    stats.eligible += 1
    try:
        target = _build_local_hash(source, memory_id, text, vector)
    except Exception:
        stats.failed += 1
        logger.exception("Failed to transform %s", key_str)
        return

    if dry_run:
        return

    try:
        await _write_target(redis, target_key, target, overwrite=overwrite)
    except Exception:
        stats.failed += 1
        logger.exception("Failed to migrate %s to %s", key_str, target_key)
        return

    stats.migrated += 1


async def migrate_cloud_long_term_memory(
    redis: Any,
    *,
    store_id: str | None = None,
    source_pattern: str | None = None,
    target_prefix: str | None = None,
    batch_size: int = 500,
    dry_run: bool = True,
    overwrite: bool = False,
) -> CloudLongTermMemoryMigrationStats:
    """Copy Redis Cloud Agent Memory Service hashes into local AMS schema.

    Args:
        redis: Async Redis client. NOTE(review): vectors are only recognized
            as bytes, so this presumably needs a client that does not decode
            responses - confirm against ``get_redis_conn``.
        store_id: Cloud store ID used to build the default source pattern
            ``memory:{store_id}:ltm:*``.
        source_pattern: Explicit SCAN pattern; overrides *store_id*. When both
            are omitted, ``memory:*:ltm:*`` is used.
        target_prefix: Local key prefix; defaults to
            ``settings.redisvl_index_prefix``.
        batch_size: COUNT hint passed to each SCAN call.
        dry_run: When true, nothing is written and ``migrated`` stays 0.
        overwrite: When true, existing target keys are replaced instead of
            skipped.

    Returns:
        Counters describing what was scanned, skipped, written, and failed.
        Per-key read, transform, and write errors are logged and counted in
        ``failed`` rather than aborting the run. Source keys are never deleted.

    Raises:
        ValueError: If *batch_size* is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be greater than zero")

    if source_pattern is None:
        source_pattern = f"memory:{store_id}:ltm:*" if store_id else "memory:*:ltm:*"
    target_prefix = target_prefix or settings.redisvl_index_prefix

    stats = CloudLongTermMemoryMigrationStats()
    cursor = 0
    while True:
        cursor, keys = await redis.scan(
            cursor=cursor, match=source_pattern, count=batch_size
        )
        for key in keys:
            await _migrate_record(
                redis,
                key,
                stats,
                target_prefix=target_prefix,
                dry_run=dry_run,
                overwrite=overwrite,
            )
        # SCAN signals completion by returning cursor 0.
        if cursor == 0:
            break

    return stats


async def count_local_long_term_memory(redis: Redis) -> int:
    """Count local AMS canonical long-term-memory keys for the current prefix."""
    total = 0
    cursor = 0
    pattern = Keys.memory_key("*")
    while True:
        cursor, keys = await redis.scan(cursor=cursor, match=pattern, count=1000)
        total += len(keys)
        if cursor == 0:
            break
    return total
+ +## Schema difference + +Redis Cloud Agent Memory Service exports observed in Redis use this shape: + +```text +memory:<store-id>:ltm:<memory-id> +``` + +with hash fields: + +```text +id +text +text_vector +owner_id +session_id +namespace +memory_type +topics +created_at +updated_at +``` + +The local RedisVL backend uses the configured `redisvl_index_prefix`, defaulting to: + +```text +memory_idx:<memory-id> +``` + +with canonical hash fields: + +```text +id_ +text +vector +user_id +session_id +namespace +memory_type +topics +entities +memory_hash +discrete_memory_extracted +pinned +access_count +created_at +updated_at +last_accessed +``` + +The local RediSearch index is created from `agent_memory_server/memory_vector_db_factory.py` and configured by these settings in `agent_memory_server/config.py`: + +```text +REDIS_MEMORY_REDISVL_INDEX_NAME default: memory_records +REDIS_MEMORY_REDISVL_INDEX_PREFIX default: memory_idx +REDIS_MEMORY_REDISVL_VECTOR_DIMENSIONS default: 1536 +``` + +Setting `REDIS_MEMORY_REDISVL_INDEX_PREFIX` to the Cloud prefix is not enough: the server's query code expects the local field names (`id_`, `user_id`, `vector`) and search queries use `vector` as the vector field. + +## Migration command + +Use the built-in migration command to copy Cloud-shaped records into local AMS-shaped records. Source keys are never deleted. 
+ +Dry run first: + +```bash +agent-memory migrate-cloud-long-term-memory \ + --store-id <store-id> +``` + +Apply: + +```bash +agent-memory migrate-cloud-long-term-memory \ + --store-id <store-id> \ + --apply +``` + +Then rebuild the local RediSearch index: + +```bash +agent-memory rebuild-index +``` + +The migration performs these transformations: + +```text +memory:<store-id>:ltm:<memory-id> -> memory_idx:<memory-id> +id -> id_ +owner_id -> user_id +text_vector -> vector, downcast from float64 to RedisVL FLOAT32 when needed +created_at / updated_at -> seconds epoch if Cloud exported milliseconds +missing memory_hash -> generated from text/user/session/namespace/type +missing local metadata -> safe defaults +``` + +Useful options: + +```text +--source-pattern Override the source SCAN pattern. +--target-prefix Override the local target prefix. +--batch-size Tune scan/write batch size. +--overwrite Replace existing target keys. Default is skip. +``` + +## Cloud to local workflow + +1. Export/copy Redis data from Cloud into local Redis. +2. Start local `agent-memory-server` pointed at that Redis. +3. Run the migration dry-run and check `eligible`, `skipped_*`, and sample keys. +4. Run with `--apply`. +5. Run `agent-memory rebuild-index`. +6. Verify with `/v1/long-term-memory/search` or the SDK/client. + +## Local to Cloud workflow + +The inverse direction should avoid direct key copying into Redis Cloud service internals unless the service contract explicitly supports it. Prefer the public Agent Memory API: read local `memory_idx:*` hashes, map local fields back to API `MemoryRecord` payloads, and write them through the Cloud endpoint so the service owns key layout and indexing. 
def test_cloud_hash_to_local_hash_maps_cloud_fields_to_local_schema():
    """A complete Cloud record maps onto the local field names and formats."""
    # Cloud exports store 1536 float64 values; only the first two are non-zero.
    cloud_vector = struct.pack("<1536d", 0.5, -0.25, *([0.0] * 1534))
    source = {
        b"id": b"abc123",
        b"text": b"remember the important thing",
        b"text_vector": cloud_vector,
        b"owner_id": b"john",
        b"session_id": b"session-1",
        b"namespace": b"hermes-dev-default",
        b"topics": b"alpha|beta",
        b"created_at": b"1780000803492",
        b"updated_at": b"1780000805492",
        b"memory_type": b"episodic",
    }

    target = cloud_hash_to_local_hash(source)

    assert target is not None
    assert target["id_"] == "abc123"
    assert target["text"] == "remember the important thing"
    assert len(target["vector"]) == 1536 * 4
    assert struct.unpack("<2f", target["vector"][:8]) == pytest.approx((0.5, -0.25))
    assert target["user_id"] == "john"
    assert target["session_id"] == "session-1"
    assert target["namespace"] == "hermes-dev-default"
    assert target["topics"] == "alpha,beta"
    assert target["memory_type"] == "episodic"
    assert target["discrete_memory_extracted"] == "f"
    assert target["pinned"] == 0
    assert target["access_count"] == 0
    assert target["created_at"] == pytest.approx(1780000803.492)
    assert target["updated_at"] == pytest.approx(1780000805.492)
    # last_accessed is absent from the source, so it falls back to updated_at.
    assert target["last_accessed"] == pytest.approx(1780000805.492)
    assert len(target["memory_hash"]) == 64


def test_cloud_hash_to_local_hash_requires_id_text_and_vector():
    """Each mandatory field is individually required."""
    # The vector must be valid. With an invalid blob every variant would be
    # rejected for the vector alone and the id/text cases would pass vacuously.
    base = {
        b"id": b"abc123",
        b"text": b"memory",
        b"text_vector": struct.pack("<1536d", *([0.0] * 1536)),
    }
    assert cloud_hash_to_local_hash(base) is not None

    for missing in [b"id", b"text", b"text_vector"]:
        source = dict(base)
        source.pop(missing)
        assert cloud_hash_to_local_hash(source) is None

    # A vector blob of the wrong size counts as missing too.
    assert cloud_hash_to_local_hash({**base, b"text_vector": b"vector"}) is None


def test_coerce_vector_preserves_float32_and_downcasts_float64():
    """float32 blobs pass through, float64 blobs are downcast, others rejected."""
    float32_vector = struct.pack("<2f", 1.0, -2.0)
    float64_vector = struct.pack("<2d", 1.0, -2.0)

    assert _coerce_vector(float32_vector, dimensions=2) == float32_vector
    assert _coerce_vector(float64_vector, dimensions=2) == float32_vector
    assert _coerce_vector(b"bad", dimensions=2) is None


def test_coerce_timestamp_preserves_seconds_and_converts_milliseconds():
    """Seconds are kept, milliseconds are scaled, junk becomes None."""
    assert _coerce_timestamp("1780000803.492") == pytest.approx(1780000803.492)
    assert _coerce_timestamp("1780000803492") == pytest.approx(1780000803.492)
    assert _coerce_timestamp("") is None
    assert _coerce_timestamp("not-a-number") is None