From 54d7a51e4093ef89e5d722d5a5c904acddb1ac75 Mon Sep 17 00:00:00 2001 From: Noah Oblath Date: Fri, 5 Jun 2026 10:47:56 -0700 Subject: [PATCH 1/5] Adapted Python bindings and documentation to rmqcpp-based dripline-cpp --- CMakeLists.txt | 8 ++++---- Dockerfile | 1 + dripline/core/service.py | 17 +++++------------ .../dripline_core/_service_pybind.hh | 19 +++---------------- .../dripline_core/_service_trampoline.hh | 7 ------- module_bindings/dripline_core/core_pybind.hh | 12 +----------- 6 files changed, 14 insertions(+), 50 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 965bafd4..409e5e96 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,10 +12,10 @@ include( GNUInstallDirs ) # We need to find Boost here so that all of the components have already been found # before each dependency's config # Here's the master list of required boost components -# Requirements come from: SimpleAmqpClient, and Scarab -list( APPEND boost_components chrono filesystem system ) -# Boost (1.47 required for SimpleAmqpClient) -find_package( Boost 1.47.0 REQUIRED COMPONENTS ${boost_components} ) +# Requirements come from: rmqcpp (via Dripline), and Scarab +list( APPEND boost_components filesystem ) +# Boost (>=1.70 required for Scarab) +find_package( Boost 1.70.0 REQUIRED COMPONENTS ${boost_components} ) set( PUBLIC_EXT_LIBS ) diff --git a/Dockerfile b/Dockerfile index 4411937e..aed02ebf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,7 @@ ARG img_user=ghcr.io/driplineorg ARG img_repo=dripline-cpp #ARG img_tag=dlcpp-hf2.10.8 +# TODO: update to the first dripline-cpp tag built with rmqcpp (v2.10.11 was the last SimpleAmqpClient-based release) ARG img_tag=v2.10.11 FROM ${img_user}/${img_repo}:${img_tag} AS deps diff --git a/dripline/core/service.py b/dripline/core/service.py index ad4574bd..0b308a7d 100644 --- a/dripline/core/service.py +++ b/dripline/core/service.py @@ -37,20 +37,13 @@ class Service(_Service, ObjectCreator, RequestSender, RequestHandler): A service has a number of key characteristics (most of which come from its parent classes): * `core` -- Has all of the basic AMQP capabilities, sending messages, and making and manipulating connections * `endpoint` -- Handles Dripline messages - * `listener_receiver` -- Asynchronously recieves AMQP messages and turns them into Dripline messages + * `message_dispatcher` -- Receives AMQP messages via rmqcpp callbacks and dispatches them as Dripline messages * `heartbeater` -- Sends periodic heartbeat messages * `scheduler` -- Can schedule events - - As is apparent from the above descriptions, a service is responsible for a number of threads - when it executes: - * Listening -- grabs AMQP messages off the channel when they arrive - * Message-wait -- any incomplete multi-part Dripline message will setup a thread to wait - * until the message is complete, and then submits it for handling - * Receiver -- grabs completed Dripline messages and handles it - * Async endpoint listening -- same as abovefor each asynchronous endpoint - * Async endpoint message-wait -- same as above for each asynchronous endpoint - * Async endpoint receiver -- same as above for each asynchronous endpoint - * Heatbeater -- sends regular heartbeat messages + + Message delivery is handled by rmqcpp's internal thread pool via callbacks. + dripline-cpp manages only the following threads: + * Heartbeater -- sends regular heartbeat messages * Scheduler -- executes scheduled events In addition to receiving messages from the broker, a user or client code can give messages directly to the service diff --git a/module_bindings/dripline_core/_service_pybind.hh b/module_bindings/dripline_core/_service_pybind.hh index 555fc156..088a55c4 100644 --- a/module_bindings/dripline_core/_service_pybind.hh +++ b/module_bindings/dripline_core/_service_pybind.hh @@ -5,6 +5,7 @@ #include "_service_trampoline.hh" #include "core.hh" +#include "message_dispatcher.hh" #include "service.hh" #include "authentication.hh" @@ -26,6 +27,7 @@ namespace dripline_pybind _service_trampoline, dripline::core, dripline::endpoint, + dripline::message_dispatcher, dripline::receiver, dripline::scheduler<>, scarab::cancelable @@ -45,8 +47,7 @@ namespace dripline_pybind // Notes on send() bindings // The Service.send() functions are useful because they set the sender service name in the message before sending. - // The bound functions use lambdas because the dripline::service functions include amqp_ptr_t arguments which aren't known to pybind11. - // Therefore when called from Python, the send process will use the default parameter, a new AMQP connection. + // The bound functions use lambdas to avoid exposing rmqcpp internal types to pybind11. // The bindings to these functions are not included in the trampoline class because we're not directly overriding the C++ send() functions. .def( "send", [](_service& a_service, dripline::request_ptr_t a_request){return a_service.send(a_request);}, @@ -69,20 +70,6 @@ namespace dripline_pybind //TODO: need to deal with lr_ptr_t to bind this //.def_property_readonly( "async_children", &dripline::service::async_children ) - .def( "bind_keys", - &_service::bind_keys, - "overridable method to create all desired key bindings, overrides should still call this version", - DL_BIND_CALL_GUARD_STREAMS - ) - .def( "bind_key", - // Note, need to take a service pointer so that we can accept derived types... I think - [](_service* an_obj, std::string& an_exchange, std::string& a_key){return _service::bind_key(an_obj->channel(), an_exchange, an_obj->name(), a_key);}, - pybind11::arg( "exchange" ), - pybind11::arg( "key" ), - "bind the service's message queue to a particular exchange and key", - DL_BIND_CALL_GUARD_STREAMS - ) - .def( "run", &dripline::service::run, DL_BIND_CALL_GUARD_STREAMS_AND_GIL ) .def( "start", &dripline::service::start, DL_BIND_CALL_GUARD_STREAMS ) .def( "listen", &dripline::service::listen, DL_BIND_CALL_GUARD_STREAMS_AND_GIL ) diff --git a/module_bindings/dripline_core/_service_trampoline.hh b/module_bindings/dripline_core/_service_trampoline.hh index f0400d07..742353c0 100644 --- a/module_bindings/dripline_core/_service_trampoline.hh +++ b/module_bindings/dripline_core/_service_trampoline.hh @@ -15,8 +15,6 @@ namespace dripline_pybind using dripline::service::service; //make methods public for use in overload macro - using dripline::service::bind_keys; - using dripline::core::bind_key; using dripline::service::on_request_message; }; @@ -28,11 +26,6 @@ namespace dripline_pybind //_service_trampoline(_service &&base) : _service(std::move(base)) {} // Local overrides - bool bind_keys() override - { - PYBIND11_OVERRIDE( bool, _service, bind_keys, ); - } - void run() override { PYBIND11_OVERRIDE( void, _service, run, ); diff --git a/module_bindings/dripline_core/core_pybind.hh b/module_bindings/dripline_core/core_pybind.hh index 280df256..b1fcc3ed 100644 --- a/module_bindings/dripline_core/core_pybind.hh +++ b/module_bindings/dripline_core/core_pybind.hh @@ -41,8 +41,7 @@ namespace dripline_pybind ) // Notes on send() bindings - // The bound functions use lambdas because the dripline::core functions include amqp_ptr_t arguments which aren't known to pybind11. - // Therefore when called from Python, the send process will use the default parameter, a new AMQP connection. + // The bound functions use lambdas to avoid exposing rmqcpp internal types to pybind11. // The bindings to these functions are not included in a trampoline class because we're not directly overriding the C++ send() functions. // Therefore calls to send() from a base-class pointer will not redirect appropriately to the derived-class versions of send(). .def( "send", @@ -73,15 +72,6 @@ namespace dripline_pybind .def_property( "max_connection_attempts", &dripline::core::get_max_connection_attempts, &dripline::core::set_max_connection_attempts ) ; - // bind core's internal types - pybind11::enum_(t_core, "PostListenStatus") - .value("Unknown", dripline::core::post_listen_status::unknown) - .value("MessageReceived", dripline::core::post_listen_status::message_received) - .value("Timeout", dripline::core::post_listen_status::timeout) - .value("SoftError", dripline::core::post_listen_status::soft_error) - .value("HardError", dripline::core::post_listen_status::hard_error) - ; - return all_items; } } /* namespace dripline_pybind */ From 758c788218f5f284f0d897ec2d7caa152e9b3c09 Mon Sep 17 00:00:00 2001 From: Noah Oblath Date: Tue, 16 Jun 2026 15:49:55 -0700 Subject: [PATCH 2/5] Updating binding code and downstream Python code --- dripline/core/alert_consumer.py | 18 ++++-- dripline/core/endpoint.py | 12 ++++ dripline/core/service.py | 12 ++++ module_bindings/CMakeLists.txt | 2 + .../dripline_core/_service_pybind.hh | 38 +++++++++++++ .../dripline_core/_service_trampoline.hh | 31 ++++++++++ module_bindings/dripline_core/core_pybind.hh | 57 +++++++++++++++++++ .../dripline_core_namespace_pybind.cc | 4 ++ .../message_dispatcher_pybind.hh | 37 ++++++++++++ .../dripline_core/message_trampoline.hh | 7 ++- 10 files changed, 209 insertions(+), 9 deletions(-) create mode 100644 module_bindings/dripline_core/message_dispatcher_pybind.hh diff --git a/dripline/core/alert_consumer.py b/dripline/core/alert_consumer.py index 9434dd99..ac9647c0 100644 --- a/dripline/core/alert_consumer.py +++ b/dripline/core/alert_consumer.py @@ -28,15 +28,21 @@ def __init__(self, alert_keys=None, alert_key_parser_re='', **kwargs): ''' Service.__init__(self, **kwargs) self._alert_keys = ["#"] if alert_keys is None else alert_keys - self._alert_key_parser_re= alert_key_parser_re + self._alert_key_parser_re = alert_key_parser_re + self._alerts_queue = None # set during add_queues() + + def add_queues(self): + logger.debug("in python's add_queues") + Service.add_queues(self) + logger.debug(f"adding ephemeral alerts queue for '{self.name}'") + self._alerts_queue = self.add_alerts_ephemeral_queue(self.name) def bind_keys(self): - logger.debug("in python's bind keys") - to_return = Service.bind_keys(self); + logger.debug("in python's bind_keys") + Service.bind_keys(self) for a_key in self._alert_keys: - logger.debug(f" binding alert key {a_key}") - to_return = to_return and self.bind_key(self.alerts_exchange, a_key) - return to_return + logger.debug(f" binding alert key '{a_key}'") + self.bind_alerts_key(self.name, a_key, self._alerts_queue) def on_alert_message(self, an_alert): logger.debug("in python's on alert") diff --git a/dripline/core/endpoint.py b/dripline/core/endpoint.py index e1250a80..964b4120 100644 --- a/dripline/core/endpoint.py +++ b/dripline/core/endpoint.py @@ -23,6 +23,18 @@ def __init__(self, name): ''' _Endpoint.__init__(self, name) + def do_run_request(self, a_request_message): + ''' + Default function for handling an OP_RUN request message addressed to this endpoint. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the C++ base implementation. + + Args: + a_request_message (MsgRequest): the message received by this endpoint + ''' + return _Endpoint.do_run_request(self, a_request_message) + def do_get_request(self, a_request_message): ''' Default function for handling an OP_GET request message addressed to this endpoint. diff --git a/dripline/core/service.py b/dripline/core/service.py index 0b308a7d..5253ec95 100644 --- a/dripline/core/service.py +++ b/dripline/core/service.py @@ -143,6 +143,18 @@ def add_endpoints_from_config(self): logger.debug("queue up start logging for '{}'".format(an_endpoint.name)) an_endpoint.start_logging() + def do_run_request(self, a_request_message): + ''' + Default function for handling an OP_RUN request message addressed to this service. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the C++ base implementation. + + Args: + a_request_message (MsgRequest): the message received by this service + ''' + return _Service.do_run_request(self, a_request_message) + def do_get_request(self, a_request_message): ''' Default function for handling an OP_GET request message addressed to this service. diff --git a/module_bindings/CMakeLists.txt b/module_bindings/CMakeLists.txt index c0cc02a5..1c3f2571 100644 --- a/module_bindings/CMakeLists.txt +++ b/module_bindings/CMakeLists.txt @@ -12,9 +12,11 @@ set( PB_DRIPLINE_CORE_HEADERFILES dripline_core/constants_pybind.hh dripline_core/dripline_config_pybind.hh dripline_core/error_pybind.hh + dripline_core/message_dispatcher_pybind.hh dripline_core/message_pybind.hh dripline_core/message_virtual_pybind.hh dripline_core/receiver_pybind.hh + dripline_core/reply_cache_pybind.hh dripline_core/return_codes.hh dripline_core/return_code_trampoline.hh diff --git a/module_bindings/dripline_core/_service_pybind.hh b/module_bindings/dripline_core/_service_pybind.hh index 088a55c4..7b1d6e9a 100644 --- a/module_bindings/dripline_core/_service_pybind.hh +++ b/module_bindings/dripline_core/_service_pybind.hh @@ -8,6 +8,8 @@ #include "message_dispatcher.hh" #include "service.hh" +#include "rmqt_queue.h" + #include "authentication.hh" #include "param_binding_helpers.hh" @@ -79,7 +81,43 @@ namespace dripline_pybind //.def( "noisy_func", []() { pybind11::scoped_ostream_redirect stream(std::cout, pybind11::module::import("sys").attr("stdout"));}) .def( "on_request_message", &_service::on_request_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL ) + + // Message handler overrides + .def( "on_reply_message", &_service::on_reply_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "callback to execute when a new reply message is received; available for override" ) + .def( "on_alert_message", &_service::on_alert_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "callback to execute when a new alert message is received; available for override" ) + + // Request handler overrides + .def( "do_run_request", &_service::do_run_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "overridable method for implementing run handling behavior" ) + .def( "do_get_request", &_service::do_get_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "overridable method for implementing get handling behavior" ) + .def( "do_set_request", &_service::do_set_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "overridable method for implementing set handling behavior" ) + .def( "do_cmd_request", &_service::do_cmd_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "overridable method for implementing cmd handling behavior" ) + + // Service lifecycle hook overrides (called by start() in order) + .def( "open_channels", &_service::open_channels, DL_BIND_CALL_GUARD_STREAMS, + "virtual hook: open the AMQP connection; called first by start()" ) + .def( "add_queues", &_service::add_queues, DL_BIND_CALL_GUARD_STREAMS, + "virtual hook: declare AMQP queues in the topology; called second by start()" ) + .def( "bind_keys", &_service::bind_keys, DL_BIND_CALL_GUARD_STREAMS, + "virtual hook: bind routing keys to queues; called third by start()" ) + .def( "start_threads", &_service::start_threads, DL_BIND_CALL_GUARD_STREAMS, + "virtual hook: start heartbeat and scheduler threads; called fourth by start()" ) + .def( "stop_threads", &_service::stop_threads, DL_BIND_CALL_GUARD_STREAMS, + "virtual hook: join heartbeat and scheduler threads; called by listen() and stop()" ) + + // Message dispatcher override + .def( "submit_message", &_service::submit_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "virtual hook: dispatch an assembled Dripline message; called by the rmqcpp callback thread" ) + + // Note: the "queue" property is inherited from the MessageDispatcher base class + // (registered in message_dispatcher_pybind.hh) ; + return all_items; } } /* namespace dripline_pybind */ diff --git a/module_bindings/dripline_core/_service_trampoline.hh b/module_bindings/dripline_core/_service_trampoline.hh index 742353c0..3364a13b 100644 --- a/module_bindings/dripline_core/_service_trampoline.hh +++ b/module_bindings/dripline_core/_service_trampoline.hh @@ -16,6 +16,7 @@ namespace dripline_pybind //make methods public for use in overload macro using dripline::service::on_request_message; + using dripline::service::submit_message; }; @@ -47,6 +48,10 @@ namespace dripline_pybind } // Overrides for virtual do_[request-type]_request + dripline::reply_ptr_t do_run_request( const dripline::request_ptr_t a_request ) override + { + PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_run_request, a_request ); + } dripline::reply_ptr_t do_get_request( const dripline::request_ptr_t a_request ) override { PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_get_request, a_request ); @@ -60,6 +65,32 @@ namespace dripline_pybind PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_cmd_request, a_request ); } + // Overrides for virtual service lifecycle hooks + void open_channels() override + { + PYBIND11_OVERRIDE( void, _service, open_channels, ); + } + void add_queues() override + { + PYBIND11_OVERRIDE( void, _service, add_queues, ); + } + void bind_keys() override + { + PYBIND11_OVERRIDE( void, _service, bind_keys, ); + } + void start_threads() override + { + PYBIND11_OVERRIDE( void, _service, start_threads, ); + } + void stop_threads() override + { + PYBIND11_OVERRIDE( void, _service, stop_threads, ); + } + void submit_message( dripline::message_ptr_t a_message ) override + { + PYBIND11_OVERRIDE( void, _service, submit_message, a_message ); + } + }; } /* namespace dripline_pybind */ diff --git a/module_bindings/dripline_core/core_pybind.hh b/module_bindings/dripline_core/core_pybind.hh index b1fcc3ed..e8166295 100644 --- a/module_bindings/dripline_core/core_pybind.hh +++ b/module_bindings/dripline_core/core_pybind.hh @@ -8,6 +8,8 @@ #include "authentication.hh" +#include "rmqt_queue.h" + #include "pybind11/pybind11.h" #include "pybind11/stl.h" #include "pybind11/iostream.h" @@ -18,6 +20,13 @@ namespace dripline_pybind { std::list< std::string > all_items; + all_items.push_back( "QueueHandle" ); + pybind11::class_< BloombergLP::rmqt::QueueHandle >( mod, "QueueHandle", + "Opaque handle to an AMQP queue. " + "Returned by add_requests_*_queue() and add_alerts_*_queue(); " + "passed to bind_requests_key(), bind_alerts_key(), and the queue property." ) + ; + all_items.push_back( "SentMessagePackage" ); pybind11::classh< dripline::sent_msg_pkg >( mod, "SentMessagePackage", "Data structure for sent messages" ) .def_property_readonly( "successful_send", [](const dripline::sent_msg_pkg& an_obj){ return an_obj.f_successful_send; } ) @@ -70,6 +79,54 @@ namespace dripline_pybind .def_property( "max_payload_size", &dripline::core::get_max_payload_size, &dripline::core::set_max_payload_size ) .def_property( "make_connection", &dripline::core::get_make_connection, &dripline::core::set_make_connection ) .def_property( "max_connection_attempts", &dripline::core::get_max_connection_attempts, &dripline::core::set_max_connection_attempts ) + + // Topology helpers (public in core) + .def( "open_connection", + &dripline::core::open_connection, + DL_BIND_CALL_GUARD_STREAMS, + "Open the RabbitMQ connection and declare both exchanges in the topology. " + "Must be called before any add_*_queue() or bind_*_key() call." ) + .def( "add_requests_durable_queue", + &dripline::core::add_requests_durable_queue, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), + "Declare a durable queue on the requests exchange. " + "Must be called after open_connection(). " + "Returns a QueueHandle to pass to bind_requests_key()." ) + .def( "add_requests_ephemeral_queue", + &dripline::core::add_requests_ephemeral_queue, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), + "Declare an ephemeral (auto-delete) queue on the requests exchange. " + "Must be called after open_connection(). " + "Returns a QueueHandle to pass to bind_requests_key()." ) + .def( "add_alerts_durable_queue", + &dripline::core::add_alerts_durable_queue, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), + "Declare a durable queue on the alerts exchange. " + "Must be called after open_connection(). " + "Returns a QueueHandle to pass to bind_alerts_key()." ) + .def( "add_alerts_ephemeral_queue", + &dripline::core::add_alerts_ephemeral_queue, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), + "Declare an ephemeral (auto-delete) queue on the alerts exchange. " + "Must be called after open_connection(). " + "Returns a QueueHandle to pass to bind_alerts_key()." ) + .def( "bind_requests_key", + &dripline::core::bind_requests_key, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), pybind11::arg("routing_key"), pybind11::arg("queue"), + "Bind a queue to the requests exchange with the given routing key. " + "Must be called after add_requests_*_queue()." ) + .def( "bind_alerts_key", + &dripline::core::bind_alerts_key, + DL_BIND_CALL_GUARD_STREAMS, + pybind11::arg("queue_name"), pybind11::arg("routing_key"), pybind11::arg("queue"), + "Bind a queue to the alerts exchange with the given routing key. " + "Must be called after add_alerts_*_queue() (or add_requests_*_queue() " + "if sharing a single queue across both exchanges, as monitor does)." ) ; return all_items; diff --git a/module_bindings/dripline_core/dripline_core_namespace_pybind.cc b/module_bindings/dripline_core/dripline_core_namespace_pybind.cc index 30a6c4e0..1cb30864 100644 --- a/module_bindings/dripline_core/dripline_core_namespace_pybind.cc +++ b/module_bindings/dripline_core/dripline_core_namespace_pybind.cc @@ -6,8 +6,10 @@ #include "_endpoint_pybind.hh" #include "_endpoint_trampoline.hh" #include "error_pybind.hh" +#include "message_dispatcher_pybind.hh" #include "message_pybind.hh" #include "receiver_pybind.hh" + #include "reply_cache_pybind.hh" #include "return_codes_pybind.hh" #include "scheduler_pybind.hh" @@ -31,7 +33,9 @@ PYBIND11_MODULE( _dripline, dripline_mod ) all_members.splice( all_members.end(), dripline_pybind::export_error( dripline_core_mod ) ); all_members.splice( all_members.end(), dripline_pybind::export_message( dripline_core_mod ) ); all_members.splice( all_members.end(), dripline_pybind::export_receiver( dripline_core_mod ) ); + all_members.splice( all_members.end(), dripline_pybind::export_message_dispatcher( dripline_core_mod ) ); all_members.splice( all_members.end(), dripline_pybind::export_return_codes( dripline_core_mod ) ); + all_members.splice( all_members.end(), dripline_pybind::export_scheduler( dripline_core_mod ) ); //all_members.splice( all_members.end(), dripline_pybind::export_run_simple_service( dripline_core_mod ) ); all_members.splice( all_members.end(), dripline_pybind::export_specifier( dripline_core_mod ) ); diff --git a/module_bindings/dripline_core/message_dispatcher_pybind.hh b/module_bindings/dripline_core/message_dispatcher_pybind.hh new file mode 100644 index 00000000..ac4bae33 --- /dev/null +++ b/module_bindings/dripline_core/message_dispatcher_pybind.hh @@ -0,0 +1,37 @@ +#ifndef DRIPLINE_PYBIND_MESSAGE_DISPATCHER_HH_ +#define DRIPLINE_PYBIND_MESSAGE_DISPATCHER_HH_ + +#include "binding_helpers.hh" + +#include "message_dispatcher.hh" + +#include "rmqt_queue.h" + +#include "pybind11/pybind11.h" + +namespace dripline_pybind +{ + std::list< std::string > export_message_dispatcher( pybind11::module& mod ) + { + std::list< std::string > all_members; + + all_members.push_back( "MessageDispatcher" ); + // message_dispatcher is abstract (submit_message() is pure-virtual), so no constructor is bound. + // It must be registered here because it is listed as a base class of _Service. + pybind11::classh< dripline::message_dispatcher, dripline::receiver >( mod, "MessageDispatcher", + "Receives assembled Dripline messages from an AMQP consumer and dispatches them" ) + + // Queue handle property + .def_property( "queue", + &dripline::message_dispatcher::get_queue, + &dripline::message_dispatcher::set_queue, + "The QueueHandle for this dispatcher's AMQP queue. " + "Set by add_queues() (via add_requests_ephemeral_queue()), " + "and passed to start_listening() during listen()." ) + + ; + + return all_members; + } +} /* namespace dripline_pybind */ +#endif /* DRIPLINE_PYBIND_MESSAGE_DISPATCHER_HH_ */ diff --git a/module_bindings/dripline_core/message_trampoline.hh b/module_bindings/dripline_core/message_trampoline.hh index 003df584..70e14ada 100644 --- a/module_bindings/dripline_core/message_trampoline.hh +++ b/module_bindings/dripline_core/message_trampoline.hh @@ -28,16 +28,17 @@ namespace dripline_pybind } // don't know if we plan to override in python, but need this method to not be pure-virtual - void derived_modify_amqp_message( dripline::amqp_message_ptr a_amqp_msg, AmqpClient::Table& a_properties ) const override + void derived_modify_amqp_message( BloombergLP::rmqt::FieldTable& a_headers ) const override { - PYBIND11_OVERRIDE_PURE( void, dripline::message, derived_modify_amqp_message, a_amqp_msg, a_properties ); + PYBIND11_OVERRIDE_PURE( void, dripline::message, derived_modify_amqp_message, a_headers ); } void derived_modify_message_param( scarab::param_node& a_node ) const override { - PYBIND11_OVERRIDE_PURE( void, dripline::message, derived_modify_message_body, a_node ); + PYBIND11_OVERRIDE_PURE( void, dripline::message, derived_modify_message_param, a_node ); } + dripline::msg_t message_type() const override { PYBIND11_OVERRIDE_PURE( dripline::msg_t, dripline::message, message_type, ); From 3f3b2cb3ccde6c600f7d1cac28e63da12c91837e Mon Sep 17 00:00:00 2001 From: Noah Oblath Date: Tue, 16 Jun 2026 15:59:57 -0700 Subject: [PATCH 3/5] Use a test branch for the base container --- .github/workflows/publish.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 8b4d4f3f..bda09ed8 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -12,10 +12,10 @@ env: REGISTRY_OLD: docker.io BASE_IMAGE_USER: ghcr.io/driplineorg BASE_IMAGE_REPO: dripline-cpp - DEV_SUFFIX: '-dev' - BASE_IMAGE_TAG: 'v2.10.11' -# BASE_IMAGE_TAG: 'dlcpp-hf2.10.8' -# DEV_SUFFIX: '' +# DEV_SUFFIX: '-dev' +# BASE_IMAGE_TAG: 'v2.10.11' + BASE_IMAGE_TAG: 'test-new-amqp' + DEV_SUFFIX: '' jobs: From 44363b1b58b1d50c98029517e2bc7ae62514a29b Mon Sep 17 00:00:00 2001 From: Noah Oblath Date: Tue, 16 Jun 2026 18:06:27 -0700 Subject: [PATCH 4/5] Fixing the dl-cpp image tag for the test build --- .github/workflows/publish.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index bda09ed8..e70c3788 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -14,7 +14,7 @@ env: BASE_IMAGE_REPO: dripline-cpp # DEV_SUFFIX: '-dev' # BASE_IMAGE_TAG: 'v2.10.11' - BASE_IMAGE_TAG: 'test-new-amqp' + BASE_IMAGE_TAG: 'gha-test-new-amqp' DEV_SUFFIX: '' jobs: From e42f8a28bf5b77688b77ee5f8b288857cb5c9088 Mon Sep 17 00:00:00 2001 From: Noah Oblath Date: Fri, 19 Jun 2026 12:55:33 -0700 Subject: [PATCH 5/5] Use importlib.metadata instead of pkg_resources --- dripline/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dripline/__init__.py b/dripline/__init__.py index 09cf17ba..d5bb19a6 100644 --- a/dripline/__init__.py +++ b/dripline/__init__.py @@ -7,11 +7,11 @@ def __get_version(): import scarab - import pkg_resources + import importlib.metadata #TODO: this all needs to be populated from setup.py and gita version = scarab.VersionSemantic() - logger.info('version should be: {}'.format(pkg_resources.get_distribution('dripline').version)) - version.parse(pkg_resources.get_distribution('dripline').version) + logger.info('version should be: {}'.format(importlib.metadata.version('dripline'))) + version.parse(importlib.metadata.version('dripline')) version.package = 'driplineorg/dripline-python' version.commit = 'na' core.add_version('dripline-python', version)