Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ env:
REGISTRY_OLD: docker.io
BASE_IMAGE_USER: ghcr.io/driplineorg
BASE_IMAGE_REPO: dripline-cpp
DEV_SUFFIX: '-dev'
BASE_IMAGE_TAG: 'v2.10.11'
# BASE_IMAGE_TAG: 'dlcpp-hf2.10.8'
# DEV_SUFFIX: ''
# DEV_SUFFIX: '-dev'
# BASE_IMAGE_TAG: 'v2.10.11'
BASE_IMAGE_TAG: 'gha-test-new-amqp'
DEV_SUFFIX: ''

jobs:

Expand Down
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ include( GNUInstallDirs )
# We need to find Boost here so that all of the components have already been found
# before each dependency's config
# Here's the master list of required boost components
# Requirements come from: SimpleAmqpClient, and Scarab
list( APPEND boost_components chrono filesystem system )
# Boost (1.47 required for SimpleAmqpClient)
find_package( Boost 1.47.0 REQUIRED COMPONENTS ${boost_components} )
# Requirements come from: rmqcpp (via Dripline), and Scarab
list( APPEND boost_components filesystem )
# Boost (>=1.70 required for Scarab)
find_package( Boost 1.70.0 REQUIRED COMPONENTS ${boost_components} )

set( PUBLIC_EXT_LIBS )

Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ARG img_user=ghcr.io/driplineorg
ARG img_repo=dripline-cpp
#ARG img_tag=dlcpp-hf2.10.8
# TODO: update to the first dripline-cpp tag built with rmqcpp (v2.10.11 was the last SimpleAmqpClient-based release)
ARG img_tag=v2.10.11

FROM ${img_user}/${img_repo}:${img_tag} AS deps
Expand Down
6 changes: 3 additions & 3 deletions dripline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

def __get_version():
import scarab
import pkg_resources
import importlib.metadata
#TODO: this all needs to be populated from setup.py and gita
version = scarab.VersionSemantic()
logger.info('version should be: {}'.format(pkg_resources.get_distribution('dripline').version))
version.parse(pkg_resources.get_distribution('dripline').version)
logger.info('version should be: {}'.format(importlib.metadata.version('dripline')))
version.parse(importlib.metadata.version('dripline'))
version.package = 'driplineorg/dripline-python'
version.commit = 'na'
core.add_version('dripline-python', version)
Expand Down
18 changes: 12 additions & 6 deletions dripline/core/alert_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ def __init__(self, alert_keys=None, alert_key_parser_re='', **kwargs):
'''
Service.__init__(self, **kwargs)
self._alert_keys = ["#"] if alert_keys is None else alert_keys
self._alert_key_parser_re= alert_key_parser_re
self._alert_key_parser_re = alert_key_parser_re
self._alerts_queue = None # set during add_queues()

def add_queues(self):
logger.debug("in python's add_queues")
Service.add_queues(self)
logger.debug(f"adding ephemeral alerts queue for '{self.name}'")
self._alerts_queue = self.add_alerts_ephemeral_queue(self.name)

def bind_keys(self):
logger.debug("in python's bind keys")
to_return = Service.bind_keys(self);
logger.debug("in python's bind_keys")
Service.bind_keys(self)
for a_key in self._alert_keys:
logger.debug(f" binding alert key {a_key}")
to_return = to_return and self.bind_key(self.alerts_exchange, a_key)
return to_return
logger.debug(f" binding alert key '{a_key}'")
self.bind_alerts_key(self.name, a_key, self._alerts_queue)

def on_alert_message(self, an_alert):
logger.debug("in python's on alert")
Expand Down
12 changes: 12 additions & 0 deletions dripline/core/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ def __init__(self, name):
'''
_Endpoint.__init__(self, name)

def do_run_request(self, a_request_message):
'''
Default function for handling an OP_RUN request message addressed to this endpoint.

.. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from
the C++ base class. It intentionally just calls the C++ base implementation.

Args:
a_request_message (MsgRequest): the message received by this endpoint
'''
return _Endpoint.do_run_request(self, a_request_message)

def do_get_request(self, a_request_message):
'''
Default function for handling an OP_GET request message addressed to this endpoint.
Expand Down
29 changes: 17 additions & 12 deletions dripline/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,13 @@ class Service(_Service, ObjectCreator, RequestSender, RequestHandler):
A service has a number of key characteristics (most of which come from its parent classes):
* `core` -- Has all of the basic AMQP capabilities, sending messages, and making and manipulating connections
* `endpoint` -- Handles Dripline messages
* `listener_receiver` -- Asynchronously recieves AMQP messages and turns them into Dripline messages
* `message_dispatcher` -- Receives AMQP messages via rmqcpp callbacks and dispatches them as Dripline messages
* `heartbeater` -- Sends periodic heartbeat messages
* `scheduler` -- Can schedule events

As is apparent from the above descriptions, a service is responsible for a number of threads
when it executes:
* Listening -- grabs AMQP messages off the channel when they arrive
* Message-wait -- any incomplete multi-part Dripline message will setup a thread to wait
* until the message is complete, and then submits it for handling
* Receiver -- grabs completed Dripline messages and handles it
* Async endpoint listening -- same as abovefor each asynchronous endpoint
* Async endpoint message-wait -- same as above for each asynchronous endpoint
* Async endpoint receiver -- same as above for each asynchronous endpoint
* Heatbeater -- sends regular heartbeat messages

Message delivery is handled by rmqcpp's internal thread pool via callbacks.
dripline-cpp manages only the following threads:
* Heartbeater -- sends regular heartbeat messages
* Scheduler -- executes scheduled events

In addition to receiving messages from the broker, a user or client code can give messages directly to the service
Expand Down Expand Up @@ -150,6 +143,18 @@ def add_endpoints_from_config(self):
logger.debug("queue up start logging for '{}'".format(an_endpoint.name))
an_endpoint.start_logging()

def do_run_request(self, a_request_message):
'''
Default function for handling an OP_RUN request message addressed to this service.

.. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from
the C++ base class. It intentionally just calls the C++ base implementation.

Args:
a_request_message (MsgRequest): the message received by this service
'''
return _Service.do_run_request(self, a_request_message)

def do_get_request(self, a_request_message):
'''
Default function for handling an OP_GET request message addressed to this service.
Expand Down
2 changes: 2 additions & 0 deletions module_bindings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ set( PB_DRIPLINE_CORE_HEADERFILES
dripline_core/constants_pybind.hh
dripline_core/dripline_config_pybind.hh
dripline_core/error_pybind.hh
dripline_core/message_dispatcher_pybind.hh
dripline_core/message_pybind.hh
dripline_core/message_virtual_pybind.hh
dripline_core/receiver_pybind.hh

dripline_core/reply_cache_pybind.hh
dripline_core/return_codes.hh
dripline_core/return_code_trampoline.hh
Expand Down
57 changes: 41 additions & 16 deletions module_bindings/dripline_core/_service_pybind.hh
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
#include "_service_trampoline.hh"

#include "core.hh"
#include "message_dispatcher.hh"
#include "service.hh"

#include "rmqt_queue.h"

#include "authentication.hh"
#include "param_binding_helpers.hh"

Expand All @@ -26,6 +29,7 @@ namespace dripline_pybind
_service_trampoline,
dripline::core,
dripline::endpoint,
dripline::message_dispatcher,
dripline::receiver,
dripline::scheduler<>,
scarab::cancelable
Expand All @@ -45,8 +49,7 @@ namespace dripline_pybind

// Notes on send() bindings
// The Service.send() functions are useful because they set the sender service name in the message before sending.
// The bound functions use lambdas because the dripline::service functions include amqp_ptr_t arguments which aren't known to pybind11.
// Therefore when called from Python, the send process will use the default parameter, a new AMQP connection.
// The bound functions use lambdas to avoid exposing rmqcpp internal types to pybind11.
// The bindings to these functions are not included in the trampoline class because we're not directly overriding the C++ send() functions.
.def( "send",
[](_service& a_service, dripline::request_ptr_t a_request){return a_service.send(a_request);},
Expand All @@ -69,20 +72,6 @@ namespace dripline_pybind
//TODO: need to deal with lr_ptr_t to bind this
//.def_property_readonly( "async_children", &dripline::service::async_children )

.def( "bind_keys",
&_service::bind_keys,
"overridable method to create all desired key bindings, overrides should still call this version",
DL_BIND_CALL_GUARD_STREAMS
)
.def( "bind_key",
// Note, need to take a service pointer so that we can accept derived types... I think
[](_service* an_obj, std::string& an_exchange, std::string& a_key){return _service::bind_key(an_obj->channel(), an_exchange, an_obj->name(), a_key);},
pybind11::arg( "exchange" ),
pybind11::arg( "key" ),
"bind the service's message queue to a particular exchange and key",
DL_BIND_CALL_GUARD_STREAMS
)

.def( "run", &dripline::service::run, DL_BIND_CALL_GUARD_STREAMS_AND_GIL )
.def( "start", &dripline::service::start, DL_BIND_CALL_GUARD_STREAMS )
.def( "listen", &dripline::service::listen, DL_BIND_CALL_GUARD_STREAMS_AND_GIL )
Expand All @@ -92,7 +81,43 @@ namespace dripline_pybind
//.def( "noisy_func", []() { pybind11::scoped_ostream_redirect stream(std::cout, pybind11::module::import("sys").attr("stdout"));})

.def( "on_request_message", &_service::on_request_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL )

// Message handler overrides
.def( "on_reply_message", &_service::on_reply_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"callback to execute when a new reply message is received; available for override" )
.def( "on_alert_message", &_service::on_alert_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"callback to execute when a new alert message is received; available for override" )

// Request handler overrides
.def( "do_run_request", &_service::do_run_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"overridable method for implementing run handling behavior" )
.def( "do_get_request", &_service::do_get_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"overridable method for implementing get handling behavior" )
.def( "do_set_request", &_service::do_set_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"overridable method for implementing set handling behavior" )
.def( "do_cmd_request", &_service::do_cmd_request, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"overridable method for implementing cmd handling behavior" )

// Service lifecycle hook overrides (called by start() in order)
.def( "open_channels", &_service::open_channels, DL_BIND_CALL_GUARD_STREAMS,
"virtual hook: open the AMQP connection; called first by start()" )
.def( "add_queues", &_service::add_queues, DL_BIND_CALL_GUARD_STREAMS,
"virtual hook: declare AMQP queues in the topology; called second by start()" )
.def( "bind_keys", &_service::bind_keys, DL_BIND_CALL_GUARD_STREAMS,
"virtual hook: bind routing keys to queues; called third by start()" )
.def( "start_threads", &_service::start_threads, DL_BIND_CALL_GUARD_STREAMS,
"virtual hook: start heartbeat and scheduler threads; called fourth by start()" )
.def( "stop_threads", &_service::stop_threads, DL_BIND_CALL_GUARD_STREAMS,
"virtual hook: join heartbeat and scheduler threads; called by listen() and stop()" )

// Message dispatcher override
.def( "submit_message", &_service::submit_message, DL_BIND_CALL_GUARD_STREAMS_AND_GIL,
"virtual hook: dispatch an assembled Dripline message; called by the rmqcpp callback thread" )

// Note: the "queue" property is inherited from the MessageDispatcher base class
// (registered in message_dispatcher_pybind.hh)
;

return all_items;
}
} /* namespace dripline_pybind */
Expand Down
38 changes: 31 additions & 7 deletions module_bindings/dripline_core/_service_trampoline.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ namespace dripline_pybind
using dripline::service::service;

//make methods public for use in overload macro
using dripline::service::bind_keys;
using dripline::core::bind_key;
using dripline::service::on_request_message;
using dripline::service::submit_message;

};

Expand All @@ -28,11 +27,6 @@ namespace dripline_pybind
//_service_trampoline(_service &&base) : _service(std::move(base)) {}

// Local overrides
bool bind_keys() override
{
PYBIND11_OVERRIDE( bool, _service, bind_keys, );
}

void run() override
{
PYBIND11_OVERRIDE( void, _service, run, );
Expand All @@ -54,6 +48,10 @@ namespace dripline_pybind
}

// Overrides for virtual do_[request-type]_request
dripline::reply_ptr_t do_run_request( const dripline::request_ptr_t a_request ) override
{
PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_run_request, a_request );
}
dripline::reply_ptr_t do_get_request( const dripline::request_ptr_t a_request ) override
{
PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_get_request, a_request );
Expand All @@ -67,6 +65,32 @@ namespace dripline_pybind
PYBIND11_OVERRIDE( dripline::reply_ptr_t, _service, do_cmd_request, a_request );
}

// Overrides for virtual service lifecycle hooks
void open_channels() override
{
PYBIND11_OVERRIDE( void, _service, open_channels, );
}
void add_queues() override
{
PYBIND11_OVERRIDE( void, _service, add_queues, );
}
void bind_keys() override
{
PYBIND11_OVERRIDE( void, _service, bind_keys, );
}
void start_threads() override
{
PYBIND11_OVERRIDE( void, _service, start_threads, );
}
void stop_threads() override
{
PYBIND11_OVERRIDE( void, _service, stop_threads, );
}
void submit_message( dripline::message_ptr_t a_message ) override
{
PYBIND11_OVERRIDE( void, _service, submit_message, a_message );
}

};

} /* namespace dripline_pybind */
Expand Down
Loading
Loading