diff --git a/dimos/core/coordination/blueprints.py b/dimos/core/coordination/blueprints.py index f21ff3fe30..3eb792c9fe 100644 --- a/dimos/core/coordination/blueprints.py +++ b/dimos/core/coordination/blueprints.py @@ -85,6 +85,9 @@ class BlueprintAtom: module: type[ModuleBase] streams: tuple[StreamRef, ...] module_refs: tuple[ModuleRef, ...] + # new + namespace: str | None = None + # end @classmethod def create(cls, module: type[ModuleBase], kwargs: dict[str, Any]) -> Self: @@ -176,6 +179,14 @@ def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": blueprint = BlueprintAtom.create(module, kwargs) return cls(blueprints=(blueprint,)) + # new + def namespace(self, ns: str) -> "Blueprint": + """Recursively apply namespace to all blueprint atoms.""" + new_atoms = tuple(replace(a, namespace=ns) for a in self.blueprints) + return replace(self, blueprints=new_atoms) + + # end + def disabled_modules(self, *modules: type[ModuleBase]) -> "Blueprint": return replace(self, disabled_modules_tuple=self.disabled_modules_tuple + modules) @@ -251,7 +262,10 @@ def _eliminate_duplicates(blueprints: list[BlueprintAtom]) -> list[BlueprintAtom seen = set() unique_blueprints = [] for bp in reversed(blueprints): - if bp.module not in seen: - seen.add(bp.module) + # new + key = (bp.namespace, bp.module) + if key not in seen: + seen.add(key) unique_blueprints.append(bp) + # end return list(reversed(unique_blueprints)) diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index a560152133..3b92dece34 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -41,6 +41,10 @@ logger = setup_logger() +class InstanceKey(NamedTuple): + namespace: str | None + module: type[ModuleBase] + class ModuleDescriptor(NamedTuple): """Returned by `Coordinator/list_modules` so a remote client can build a proxy.""" @@ -53,7 +57,8 @@ class ModuleDescriptor(NamedTuple): class ModuleCoordinator(Resource): _managers: dict[str, WorkerManager] _global_config: GlobalConfig - _deployed_modules: dict[type[ModuleBase], ModuleProxyProtocol] + + _deployed_modules: dict[InstanceKey, ModuleProxyProtocol] def __init__( self, @@ -63,11 +68,11 @@ def __init__( manager_types: list[type[WorkerManager]] = [WorkerManagerPython] self._managers = {cls.deployment_identifier: cls(g=g) for cls in manager_types} self._deployed_modules = {} - self._deployed_atoms: dict[type[ModuleBase], BlueprintAtom] = {} - self._resolved_module_refs: dict[tuple[type[ModuleBase], str], type[ModuleBase]] = {} - self._transport_registry: dict[tuple[str, type], PubSubTransport[Any]] = {} - self._class_aliases: dict[type[ModuleBase], type[ModuleBase]] = {} - self._module_transports: dict[type[ModuleBase], dict[str, PubSubTransport[Any]]] = {} + self._deployed_atoms: dict[InstanceKey, BlueprintAtom] = {} + self._resolved_module_refs: dict[tuple[InstanceKey, str], InstanceKey] = {} + self._transport_registry: dict[tuple[str | None, str, type], PubSubTransport[Any]] = {} + self._class_aliases: dict[InstanceKey, InstanceKey] = {} + self._module_transports: dict[InstanceKey, dict[str, PubSubTransport[Any]]] = {} self._started = False self._modules_lock = threading.RLock() self._coordinator_rpc: CoordinatorRPC | None = None @@ -85,13 +90,13 @@ def stop(self) -> None: self._coordinator_rpc.stop() self._coordinator_rpc = None - for module_class, module in reversed(self._deployed_modules.items()): - logger.info("Stopping module...", module=module_class.__name__) + for key, module in reversed(self._deployed_modules.items()): + logger.info("Stopping module...", module=key.module.__name__) try: module.stop() except Exception: - logger.error("Error stopping module", module=module_class.__name__, exc_info=True) - logger.info("Module stopped.", module=module_class.__name__) + logger.error("Error stopping module", module=key.module.__name__, exc_info=True) + logger.info("Module stopped.", module=key.module.__name__) def _stop_manager(m: WorkerManager) -> None: try: @@ -125,11 +130,13 @@ def ping(self) -> str: def list_modules(self) -> list[ModuleDescriptor]: with self._modules_lock: descriptors: list[ModuleDescriptor] = [] - for cls in self._deployed_modules: + for key in self._deployed_modules: + cls = key.module qualified = f"{cls.__module__}.{cls.__name__}" + ns_prefix = f"{key.namespace}/" if key.namespace else "" descriptors.append( ModuleDescriptor( - class_name=cls.__name__, + class_name=f"{ns_prefix}{cls.__name__}", qualified_path=qualified, rpc_names=list(cls.rpcs.keys()), ) @@ -144,7 +151,10 @@ def load_blueprint_by_name(self, name: str) -> None: def list_module_names(self) -> list[str]: with self._modules_lock: - return [cls.__name__ for cls in self._deployed_modules] + return [ + f"{k.namespace}/{k.module.__name__}" if k.namespace else k.module.__name__ + for k in self._deployed_modules + ] def health_check(self) -> bool: return all(m.health_check() for m in self._managers.values()) @@ -169,8 +179,16 @@ def deploy( deployed_module = self._managers[module_class.deployment].deploy( module_class, global_config, kwargs ) + + # patch the proxy hardcoded remote name so it correctly targets the namespace + rpc_name = kwargs.get("__dimos_rpc_name__") + if rpc_name and hasattr(deployed_module, "remote_name"): + deployed_module.remote_name = rpc_name + with self._modules_lock: - self._deployed_modules[module_class] = deployed_module + ns = kwargs.get("__dimos_namespace__") + key = InstanceKey(ns, module_class) + self._deployed_modules[key] = deployed_module return deployed_module # type: ignore[return-value] def deploy_parallel( @@ -192,8 +210,16 @@ def deploy_parallel( def _deploy_group(dep: str) -> None: deployed = self._managers[dep].deploy_parallel(specs_by_deployment[dep], blueprint_args) - for index, module in zip(indices_by_deployment[dep], deployed, strict=True): - results[index] = module + for i, (original_index, module) in enumerate( + zip(indices_by_deployment[dep], deployed, strict=True) + ): + if module is not None: + spec_kwargs = specs_by_deployment[dep][i][2] + rpc_name = spec_kwargs.get("__dimos_rpc_name__") + if rpc_name and hasattr(module, "remote_name"): + module.remote_name = rpc_name + + results[original_index] = module try: safe_thread_map(list(specs_by_deployment.keys()), _deploy_group) @@ -204,8 +230,8 @@ def _deploy_group(dep: str) -> None: with self._modules_lock: self._deployed_modules.update( { - cls: mod - for (cls, _, _), mod in zip(module_specs, results, strict=True) + InstanceKey(kwargs.get("__dimos_namespace__"), cls): mod + for (cls, _, kwargs), mod in zip(module_specs, results, strict=True) if mod is not None } ) @@ -237,11 +263,17 @@ def start_all_modules(self) -> None: self._send_on_system_modules() - def _resolve_class(self, cls: type[ModuleBase]) -> type[ModuleBase]: - return self._class_aliases.get(cls, cls) + def _resolve_key(self, key: InstanceKey) -> InstanceKey: + return self._class_aliases.get(key, key) + + def get_instance_by_key(self, key_or_cls: InstanceKey | type[ModuleBase]) -> ModuleProxy: + key = key_or_cls if isinstance(key_or_cls, InstanceKey) else InstanceKey(None, key_or_cls) + return self._deployed_modules.get(self._resolve_key(key)) # type: ignore[return-value] - def get_instance(self, module: type[ModuleBase]) -> ModuleProxy: - return self._deployed_modules.get(self._resolve_class(module)) # type: ignore[return-value] + def get_instance(self, module_or_key: type[ModuleBase] | InstanceKey) -> ModuleProxy: + if isinstance(module_or_key, InstanceKey): + return self.get_instance_by_key(module_or_key) + return self.get_instance_by_key(InstanceKey(None, module_or_key)) def _send_on_system_modules(self) -> None: modules = list(self._deployed_modules.values()) @@ -250,32 +282,35 @@ def _send_on_system_modules(self) -> None: module.on_system_modules(modules) def _connect_streams(self, blueprint: Blueprint) -> None: - streams: dict[tuple[str, type], list[tuple[type, str]]] = defaultdict(list) + streams: dict[tuple[str | None, str, type], list[tuple[InstanceKey, str]]] = defaultdict( + list + ) for bp in blueprint.active_blueprints: + key = InstanceKey(bp.namespace, bp.module) for conn in bp.streams: remapped_name = blueprint.remapping_map.get((bp.module, conn.name), conn.name) if isinstance(remapped_name, str): - streams[remapped_name, conn.type].append((bp.module, conn.name)) + streams[bp.namespace, remapped_name, conn.type].append((key, conn.name)) - for remapped_name, stream_type in streams.keys(): - key = (remapped_name, stream_type) - if key in self._transport_registry: - transport = self._transport_registry[key] + for namespace, remapped_name, stream_type in streams.keys(): + map_key = (namespace, remapped_name, stream_type) + if map_key in self._transport_registry: + transport = self._transport_registry[map_key] else: - transport = _get_transport_for(blueprint, remapped_name, stream_type) - self._transport_registry[key] = transport - for module, original_name in streams[key]: - instance = self.get_instance(module) # type: ignore[assignment] + transport = _get_transport_for(blueprint, remapped_name, stream_type, namespace) + self._transport_registry[map_key] = transport + for key, original_name in streams[namespace, remapped_name, stream_type]: + instance = self.get_instance_by_key(key) # type: ignore[assignment] instance.set_transport(original_name, transport) # type: ignore[union-attr] - self._module_transports.setdefault(module, {})[original_name] = transport + self._module_transports.setdefault(key, {})[original_name] = transport logger.info( "Transport", name=remapped_name, original_name=original_name, topic=str(getattr(transport, "topic", None)), type=f"{stream_type.__module__}.{stream_type.__qualname__}", - module=module.__name__, + module=key.module.__name__, transport=transport.__class__.__name__, ) @@ -353,9 +388,10 @@ def _load_blueprint( # Reject duplicate modules. for bp in blueprint.active_blueprints: - if bp.module in self._deployed_modules: + key = InstanceKey(bp.namespace, bp.module) + if key in self._deployed_modules: raise ValueError( - f"{bp.module.__name__} is already deployed; cannot load the same module twice" + f"{bp.module.__name__} in namespace {bp.namespace} is already deployed; cannot load the same module twice" ) before = set(self._deployed_modules) @@ -364,7 +400,7 @@ def _load_blueprint( self._connect_streams(blueprint) _connect_module_refs(blueprint, self, existing_modules=before) - new_modules = [proxy for cls, proxy in self._deployed_modules.items() if cls not in before] + new_modules = [proxy for key, proxy in self._deployed_modules.items() if key not in before] if new_modules: safe_thread_map(new_modules, lambda m: m.build()) @@ -389,18 +425,20 @@ def unload_module(self, module_class: type[ModuleBase]) -> None: are responsible for rewiring. """ with self._modules_lock: - self._unload_module(module_class) + self._unload_module(InstanceKey(None, module_class)) - def _unload_module(self, module_class: type[ModuleBase]) -> None: - module_class = self._resolve_class(module_class) - if module_class not in self._deployed_modules: + def _unload_module(self, key_or_cls: InstanceKey | type[ModuleBase]) -> None: + key = key_or_cls if isinstance(key_or_cls, InstanceKey) else InstanceKey(None, key_or_cls) + key = self._resolve_key(key) + module_class = key.module + if key not in self._deployed_modules: raise ValueError(f"{module_class.__name__} is not deployed") if module_class.deployment != "python": raise NotImplementedError( f"unload_module only supports python deployment, got {module_class.deployment!r}" ) - proxy = self._deployed_modules[module_class] + proxy = self._deployed_modules[key] try: proxy.stop() @@ -421,30 +459,30 @@ def _unload_module(self, module_class: type[ModuleBase]) -> None: exc_info=True, ) - del self._deployed_modules[module_class] - self._deployed_atoms.pop(module_class, None) - self._module_transports.pop(module_class, None) - self._class_aliases = { - k: v for k, v in self._class_aliases.items() if v is not module_class - } + del self._deployed_modules[key] + self._deployed_atoms.pop(key, None) + self._module_transports.pop(key, None) + self._class_aliases = {k: v for k, v in self._class_aliases.items() if v != key} self._resolved_module_refs = { - key: target - for key, target in self._resolved_module_refs.items() - if key[0] is not module_class and target is not module_class + k: target + for k, target in self._resolved_module_refs.items() + if k[0] != key and target != key } def restart_module_by_class_name( self, class_name: str, *, + namespace: str | None = None, reload_source: bool = True, ) -> None: with self._modules_lock: - for cls in self._deployed_modules: - if cls.__name__ == class_name: - self._restart_module(cls, reload_source=reload_source) + for key in self._deployed_modules: + if key.module.__name__ == class_name and key.namespace == namespace: + self._restart_module(key, reload_source=reload_source) return - raise ValueError(f"No deployed module with class name {class_name!r}") + ns_prefix = f"{namespace}/" if namespace else "" + raise ValueError(f"No deployed module with name {ns_prefix}{class_name!r}") def restart_module( self, @@ -461,37 +499,42 @@ def restart_module( held a reference to it. """ with self._modules_lock: - return self._restart_module(module_class, reload_source=reload_source) + return self._restart_module( + InstanceKey(None, module_class), reload_source=reload_source + ) def _restart_module( self, - module_class: type[ModuleBase], + key_or_cls: InstanceKey | type[ModuleBase], *, reload_source: bool = True, ) -> ModuleProxyProtocol: - module_class = self._resolve_class(module_class) - if module_class not in self._deployed_modules: + key = key_or_cls if isinstance(key_or_cls, InstanceKey) else InstanceKey(None, key_or_cls) + key = self._resolve_key(key) + module_class = key.module + if key not in self._deployed_modules: raise ValueError(f"{module_class.__name__} is not deployed") if module_class.deployment != "python": raise NotImplementedError( f"restart_module only supports python deployment, got {module_class.deployment!r}" ) - old_atom = self._deployed_atoms[module_class] + old_atom = self._deployed_atoms[key] kwargs = dict(old_atom.kwargs) - saved_transports = dict(self._module_transports.get(module_class, {})) + saved_transports = dict(self._module_transports.get(key, {})) + inbound_refs = [ - (consumer, ref_name) - for (consumer, ref_name), target in self._resolved_module_refs.items() - if target is module_class + (consumer_key, ref_name) + for (consumer_key, ref_name), target in self._resolved_module_refs.items() + if target == key ] outbound_refs = [ (ref_name, target) - for (consumer, ref_name), target in self._resolved_module_refs.items() - if consumer is module_class + for (consumer_key, ref_name), target in self._resolved_module_refs.items() + if consumer_key == key ] - self.unload_module(module_class) + self._unload_module(key) if reload_source: source_mod = sys.modules.get(module_class.__module__) @@ -502,43 +545,54 @@ def _restart_module( else: new_class = module_class - if new_class is not module_class: - for old_cls in list(self._class_aliases): - if self._class_aliases[old_cls] is module_class: - self._class_aliases[old_cls] = new_class - self._class_aliases[module_class] = new_class + new_key = InstanceKey(key.namespace, new_class) + if new_key != key: + for old_key in list(self._class_aliases): + if self._class_aliases[old_key] == key: + self._class_aliases[old_key] = new_key + self._class_aliases[key] = new_key + + kwargs["__dimos_namespace__"] = key.namespace + kwargs["__dimos_rpc_name__"] = ( + f"{key.namespace}/{new_class.__name__}" if key.namespace else new_class.__name__ + ) python_wm = cast("WorkerManagerPython", self._managers["python"]) new_proxy = python_wm.deploy_fresh(new_class, self._global_config, kwargs) - self._deployed_modules[new_class] = new_proxy + if hasattr(new_proxy, "remote_name"): + new_proxy.remote_name = kwargs["__dimos_rpc_name__"] - new_bp = new_class.blueprint(**kwargs) + self._deployed_modules[new_key] = new_proxy + + # Strip internal framework keys before calling blueprint() so they don't pollute kwargs + clean_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("__dimos_")} + new_bp = new_class.blueprint(**clean_kwargs) new_atom = new_bp.active_blueprints[0] - self._deployed_atoms[new_class] = new_atom + self._deployed_atoms[new_key] = new_atom for stream_ref in new_atom.streams: transport = saved_transports.get(stream_ref.name) if transport is not None: new_proxy.set_transport(stream_ref.name, transport) - self._module_transports[new_class] = { + self._module_transports[new_key] = { s.name: t for s in new_atom.streams if (t := saved_transports.get(s.name)) is not None } - for consumer_class, ref_name in inbound_refs: - consumer_proxy = self._deployed_modules.get(consumer_class) + for consumer_key, ref_name in inbound_refs: + consumer_proxy = self._deployed_modules.get(consumer_key) if consumer_proxy is None: continue setattr(consumer_proxy, ref_name, new_proxy) consumer_proxy.set_module_ref(ref_name, new_proxy) # type: ignore[attr-defined] - self._resolved_module_refs[consumer_class, ref_name] = new_class + self._resolved_module_refs[consumer_key, ref_name] = new_key - for ref_name, target_class in outbound_refs: - target_proxy = self._deployed_modules.get(target_class) + for ref_name, target_key in outbound_refs: + target_proxy = self._deployed_modules.get(target_key) if target_proxy is None: continue setattr(new_proxy, ref_name, target_proxy) new_proxy.set_module_ref(ref_name, target_proxy) # type: ignore[attr-defined] - self._resolved_module_refs[new_class, ref_name] = target_class + self._resolved_module_refs[new_key, ref_name] = target_key new_proxy.build() new_proxy.start() @@ -567,19 +621,30 @@ def _all_name_types(blueprint: Blueprint) -> set[tuple[str, type]]: return result -def _is_name_unique(blueprint: Blueprint, name: str) -> bool: - return sum(1 for n, _ in _all_name_types(blueprint) if n == name) == 1 +def _is_name_unique(blueprint: Blueprint, name: str, namespace: str | None = None) -> bool: + count = 0 + for bp in blueprint.active_blueprints: + if bp.namespace == namespace: + for conn in bp.streams: + remapped = blueprint.remapping_map.get((bp.module, conn.name), conn.name) + if remapped == name: + count += 1 + return count == 1 -def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> PubSubTransport[Any]: +def _get_transport_for( + blueprint: Blueprint, name: str, stream_type: type, namespace: str | None = None +) -> PubSubTransport[Any]: transport = blueprint.transport_map.get((name, stream_type), None) if transport: return transport use_pickled = getattr(stream_type, "lcm_encode", None) is None - topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}" - transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type) + topic_base = f"/{namespace}/{name}" if namespace else f"/{name}" + topic = topic_base if _is_name_unique(blueprint, name, namespace) else f"/{short_id()}" + + transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type) return transport @@ -590,8 +655,9 @@ def _verify_no_name_conflicts(blueprint: Blueprint) -> None: for bp in blueprint.active_blueprints: for conn in bp.streams: stream_name = blueprint.remapping_map.get((bp.module, conn.name), conn.name) - name_to_types[stream_name].add(conn.type) - name_to_modules[stream_name].append((bp.module, conn.type)) + isolated_name = f"{bp.namespace}/{stream_name}" if bp.namespace else stream_name + name_to_types[isolated_name].add(conn.type) + name_to_modules[isolated_name].append((bp.module, conn.type)) conflicts: dict[Any, dict[type, list[type]]] = {} for conn_name, types in name_to_types.items(): @@ -621,21 +687,21 @@ def _verify_no_name_conflicts(blueprint: Blueprint) -> None: def _verify_no_conflicts_with_existing( blueprint: Blueprint, - existing_registry: dict[tuple[str, type], PubSubTransport[Any]], + existing_registry: dict[tuple[str | None, str, type], PubSubTransport[Any]], ) -> None: """Check that a new blueprint's streams don't conflict with already-registered transports.""" if not existing_registry: return - existing_names: dict[str, set[type]] = defaultdict(set) - for name, stream_type in existing_registry: - existing_names[name].add(stream_type) + existing_names: dict[tuple[str | None, str], set[type]] = defaultdict(set) + for namespace, name, stream_type in existing_registry: + existing_names[(namespace, name)].add(stream_type) for bp in blueprint.active_blueprints: for conn in bp.streams: remapped_name = blueprint.remapping_map.get((bp.module, conn.name), conn.name) - if isinstance(remapped_name, str) and remapped_name in existing_names: - for existing_type in existing_names[remapped_name]: + if isinstance(remapped_name, str) and (bp.namespace, remapped_name) in existing_names: + for existing_type in existing_names[(bp.namespace, remapped_name)]: if existing_type != conn.type: raise ValueError( f"Stream '{remapped_name}' in {bp.module.__name__} has type " @@ -685,12 +751,18 @@ def _deploy_all_modules( ) -> None: module_specs: list[ModuleSpec] = [] for bp in blueprint.active_blueprints: - module_specs.append((bp.module, gc, bp.kwargs.copy())) + kwargs = bp.kwargs.copy() + kwargs["__dimos_namespace__"] = bp.namespace + kwargs["__dimos_rpc_name__"] = ( + f"{bp.namespace}/{bp.module.__name__}" if bp.namespace else bp.module.__name__ + ) + module_specs.append((bp.module, gc, kwargs)) module_coordinator.deploy_parallel(module_specs, blueprint_args) for bp in blueprint.active_blueprints: - module_coordinator._deployed_atoms[bp.module] = bp + key = InstanceKey(bp.namespace, bp.module) + module_coordinator._deployed_atoms[key] = bp def _ref_msg(module_name: str, ref: object, spec_name: str, detail: str) -> str: @@ -706,7 +778,7 @@ def _resolve_single_ref( spec: Any, blueprint: Blueprint, disabled_set: set[type], - existing_modules: set[type[ModuleBase]] | None = None, + existing_modules: set[InstanceKey] | None = None, ) -> Any: """Resolve a module ref to its provider. @@ -718,20 +790,20 @@ def _resolve_single_ref( s = module_ref.spec.__name__ possible = [ - other.module + InstanceKey(other.namespace, other.module) for other in blueprint.active_blueprints if other != bp and spec_structural_compliance(other.module, spec) ] if existing_modules: - bp_module_set = {o.module for o in blueprint.active_blueprints} - for mod_cls in existing_modules: + bp_module_set = {InstanceKey(o.namespace, o.module) for o in blueprint.active_blueprints} + for key in existing_modules: if ( - mod_cls != bp.module - and mod_cls not in bp_module_set - and spec_structural_compliance(mod_cls, spec) + key != InstanceKey(bp.namespace, bp.module) + and key not in bp_module_set + and spec_structural_compliance(key.module, spec) ): - possible.append(mod_cls) - valid = [c for c in possible if spec_annotation_compliance(c, spec)] + possible.append(key) + valid = [c for c in possible if spec_annotation_compliance(c.module, spec)] if not possible: if module_ref.optional: @@ -762,9 +834,9 @@ def _resolve_single_ref( m, module_ref, s, - f"{possible[0].__name__} met the spec structurally but had " + f"{possible[0].module.__name__} met the spec structurally but had " f"annotation mismatches.\nPlease either change the {s} spec " - f"or the {possible[0].__name__} module.", + f"or the {possible[0].module.__name__} module.", ) ) return possible[0] @@ -785,7 +857,7 @@ def _resolve_single_ref( ) ) - names = ", ".join(c.__name__ for c in possible) + names = ", ".join(c.module.__name__ for c in possible) raise Exception( _ref_msg( m, @@ -799,32 +871,28 @@ def _resolve_single_ref( def _connect_module_refs( blueprint: Blueprint, module_coordinator: ModuleCoordinator, - existing_modules: set[type[ModuleBase]] | None = None, + existing_modules: set[InstanceKey] | None = None, ) -> None: from dimos.core.coordination.blueprints import DisabledModuleProxy from dimos.core.module import is_module_type from dimos.core.rpc_client import AsyncSpecProxy - mod_and_mod_ref_to_proxy = { - (module, name): replacement - for (module, name), replacement in blueprint.remapping_map.items() - if is_spec(replacement) or is_module_type(replacement) - } - - # Track the consumer's declared spec for each ref so we can wrap the proxy - # below if the spec contains async-declared methods. - declared_spec: dict[tuple[type[ModuleBase], str], Any] = {} - - disabled_ref_proxies: dict[tuple[type[ModuleBase], str], DisabledModuleProxy] = {} + mod_and_mod_ref_to_proxy: dict[tuple[InstanceKey, str], InstanceKey] = {} + declared_spec: dict[tuple[InstanceKey, str], Any] = {} + disabled_ref_proxies: dict[tuple[InstanceKey, str], DisabledModuleProxy] = {} disabled_set = set(blueprint.disabled_modules_tuple) for bp in blueprint.active_blueprints: + base_key = InstanceKey(bp.namespace, bp.module) for module_ref in bp.module_refs: - declared_spec[bp.module, module_ref.name] = module_ref.spec - spec = mod_and_mod_ref_to_proxy.get((bp.module, module_ref.name), module_ref.spec) + declared_spec[base_key, module_ref.name] = module_ref.spec + spec = blueprint.remapping_map.get((bp.module, module_ref.name), module_ref.spec) if is_module_type(spec): - mod_and_mod_ref_to_proxy[bp.module, module_ref.name] = spec + target_key = InstanceKey(bp.namespace, cast("type[ModuleBase]", spec)) + if target_key not in module_coordinator._deployed_modules: + target_key = InstanceKey(None, cast("type[ModuleBase]", spec)) + mod_and_mod_ref_to_proxy[base_key, module_ref.name] = target_key continue result = _resolve_single_ref( @@ -833,24 +901,41 @@ def _connect_module_refs( if result is None: continue if isinstance(result, DisabledModuleProxy): - disabled_ref_proxies[bp.module, module_ref.name] = result + disabled_ref_proxies[base_key, module_ref.name] = result else: - mod_and_mod_ref_to_proxy[bp.module, module_ref.name] = result - - for (base_module, ref_name), target_module in mod_and_mod_ref_to_proxy.items(): - base_instance = module_coordinator.get_instance(base_module) - target_instance: Any = module_coordinator.get_instance(target_module) # type: ignore[arg-type] - async_methods = _async_methods_of_spec(declared_spec.get((base_module, ref_name))) - if async_methods: - target_instance = AsyncSpecProxy(target_instance, async_methods) - setattr(base_instance, ref_name, target_instance) - base_instance.set_module_ref(ref_name, target_instance) - module_coordinator._resolved_module_refs[base_module, ref_name] = cast( - "type[ModuleBase]", target_module - ) + target_key = result + if target_key not in module_coordinator._deployed_modules: + target_key = InstanceKey(None, target_key.module) + mod_and_mod_ref_to_proxy[base_key, module_ref.name] = target_key + + for bp in blueprint.active_blueprints: + base_key = InstanceKey(bp.namespace, bp.module) + for module_ref in bp.module_refs: + ref_name = module_ref.name + if (base_key, ref_name) not in mod_and_mod_ref_to_proxy: + continue + + target_key = mod_and_mod_ref_to_proxy[base_key, ref_name] + + base_instance = module_coordinator.get_instance_by_key(base_key) + target_instance: Any = module_coordinator.get_instance_by_key(target_key) + + if target_instance is None: + logger.error( + f"Failed to wire {base_key} -> {ref_name}. Target {target_key} not found." + ) + continue + + async_methods = _async_methods_of_spec(declared_spec.get((base_key, ref_name))) + if async_methods: + target_instance = AsyncSpecProxy(target_instance, async_methods) + + setattr(base_instance, ref_name, target_instance) + base_instance.set_module_ref(ref_name, target_instance) + module_coordinator._resolved_module_refs[base_key, ref_name] = target_key - for (base_module, ref_name), proxy in disabled_ref_proxies.items(): - base_instance = module_coordinator.get_instance(base_module) + for (base_key, ref_name), proxy in disabled_ref_proxies.items(): + base_instance = module_coordinator.get_instance_by_key(base_key) setattr(base_instance, ref_name, proxy) base_instance.set_module_ref(ref_name, cast("Any", proxy)) @@ -891,4 +976,4 @@ def _log_blueprint_graph(blueprint: Blueprint, module_coordinator: ModuleCoordin bridge = module_coordinator.get_instance(RerunBridgeModule) # type: ignore[arg-type] bridge.log_blueprint_graph(dot_code, module_names) except Exception: - logger.error("Failed to log blueprint graph to Rerun", exc_info=True) + logger.error("Failed to log blueprint graph to Rerun", exc_info=True) \ No newline at end of file diff --git a/dimos/core/coordination/test_module_coordinator.py b/dimos/core/coordination/test_module_coordinator.py index c1baad17b2..b75b533cf7 100644 --- a/dimos/core/coordination/test_module_coordinator.py +++ b/dimos/core/coordination/test_module_coordinator.py @@ -502,7 +502,7 @@ def test_load_blueprint_conflict_with_existing() -> None: """Loading a blueprint whose stream name clashes (different type) raises ValueError.""" from dimos.core.transport import pLCMTransport - registry: dict[tuple[str, type], object] = {("data1", Data1): pLCMTransport("/data1")} + registry: dict[tuple[str | None, str, type], object] = {(None, "data1", Data1): pLCMTransport("/data1")} class ConflictModule(Module): data1: In[Data2] # same name, different type @@ -613,12 +613,12 @@ def test_restart_module_preserves_stream_wiring(dynamic_coordinator) -> None: c = dynamic_coordinator.get_instance(ModuleC) assert c is not None topic_before = c.data3.transport.topic - registry_before = dynamic_coordinator._transport_registry[("data3", Data3)] + registry_before = dynamic_coordinator._transport_registry[(None, "data3", Data3)] dynamic_coordinator.restart_module(ModuleC, reload_source=False) # Transport in the registry is the same parent-side object. - assert dynamic_coordinator._transport_registry[("data3", Data3)] is registry_before + assert dynamic_coordinator._transport_registry[(None, "data3", Data3)] is registry_before c_after = dynamic_coordinator.get_instance(ModuleC) assert c_after is not None @@ -765,12 +765,12 @@ def test_restart_preserves_remapped_streams(dynamic_coordinator) -> None: dynamic_coordinator.load_blueprint(bp) target = dynamic_coordinator.get_instance(TargetModule) - registry_before = dynamic_coordinator._transport_registry[("remapped_data", Data1)] + registry_before = dynamic_coordinator._transport_registry[(None, "remapped_data", Data1)] dynamic_coordinator.restart_module(SourceModule, reload_source=False) # The coordinator-side transport object in the registry is unchanged. - assert dynamic_coordinator._transport_registry[("remapped_data", Data1)] is registry_before + assert dynamic_coordinator._transport_registry[(None, "remapped_data", Data1)] is registry_before # The restarted proxy sees the same topic as the target. source_after = dynamic_coordinator.get_instance(SourceModule) assert source_after.color_image.transport.topic == target.remapped_data.transport.topic @@ -789,4 +789,4 @@ def test_list_module_names(dynamic_coordinator) -> None: assert dynamic_coordinator.list_module_names() == [] dynamic_coordinator.load_module(ModuleA) dynamic_coordinator.load_module(ModuleC) - assert set(dynamic_coordinator.list_module_names()) == {"ModuleA", "ModuleC"} + assert set(dynamic_coordinator.list_module_names()) == {"ModuleA", "ModuleC"} \ No newline at end of file diff --git a/dimos/core/module.py b/dimos/core/module.py index 26a2b6f893..e967253038 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -144,6 +144,10 @@ class ModuleBase(Configurable, CompositeResource): _tools_lock: threading.Lock def __init__(self, config_args: dict[str, Any]) -> None: + # new + self._dimos_rpc_name = config_args.pop("__dimos_rpc_name__", None) + self._dimos_namespace = config_args.pop("__dimos_namespace__", None) + # end super().__init__(**config_args) self._module_closed_lock = threading.Lock() self._tools = {} @@ -154,7 +158,9 @@ def __init__(self, config_args: dict[str, Any]) -> None: rpc_timeouts=self.config.rpc_timeouts, default_rpc_timeout=self.config.default_rpc_timeout, ) - self.rpc.serve_module_rpc(self) + # new + self.rpc.serve_module_rpc(self, name=self._dimos_rpc_name or self.name) + # end self.rpc.start() # type: ignore[attr-defined] except ValueError: ... diff --git a/dimos/core/tests/test_blueprint_namespaces.py b/dimos/core/tests/test_blueprint_namespaces.py new file mode 100644 index 0000000000..c4acb0d0e1 --- /dev/null +++ b/dimos/core/tests/test_blueprint_namespaces.py @@ -0,0 +1,16 @@ +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.coordination.test_blueprints import ModuleA, ModuleB + + +def test_namespaces_allow_duplicates_and_scope(dynamic_coordinator): + """ + verify that we can deploy the same module classes multiple times as long as they are isolated within different namespaces. + """ + robot1_bp = autoconnect(ModuleA.blueprint(), ModuleB.blueprint()).namespace("robot1") + robot2_bp = autoconnect(ModuleA.blueprint(), ModuleB.blueprint()).namespace("robot2") + system_bp = autoconnect(robot1_bp, robot2_bp) + assert len(system_bp.blueprints) == 4, "system blueprint should contain exactly 4 atoms" + dynamic_coordinator.load_blueprint(system_bp) + assert dynamic_coordinator.n_modules == 4, ( + "coordinator should be tracking exactly 4 active modules" + ) diff --git a/dimos/porcelain/local_module_source.py b/dimos/porcelain/local_module_source.py index 8dea71154f..1b4327e8a3 100644 --- a/dimos/porcelain/local_module_source.py +++ b/dimos/porcelain/local_module_source.py @@ -42,8 +42,8 @@ def list_module_names(self) -> list[str]: return self._coordinator.list_module_names() def get_module(self, name: str) -> Any: - for cls, proxy in self._coordinator._deployed_modules.items(): - if cls.__name__ == name: + for key, proxy in self._coordinator._deployed_modules.items(): + if key.module.__name__ == name: return proxy raise KeyError(name) @@ -51,4 +51,4 @@ def invalidate(self, name: str) -> None: return None def close(self) -> None: - return None + return None \ No newline at end of file diff --git a/dimos/porcelain/test_dimos.py b/dimos/porcelain/test_dimos.py index b9ab867195..75e02d73e0 100644 --- a/dimos/porcelain/test_dimos.py +++ b/dimos/porcelain/test_dimos.py @@ -215,4 +215,4 @@ def test_connected_dir(client): def test_connected_skills(client): result = client.skills.ping() - assert result == "pong" + assert result == "pong" \ No newline at end of file