From e4287393c4ca9c091d9f8aa6f85ba8a36ca53c00 Mon Sep 17 00:00:00 2001 From: tcely Date: Fri, 15 May 2026 18:07:48 -0400 Subject: [PATCH 1/3] feat: retry messages after reconnecting --- src_py/hat/syslog/handler.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src_py/hat/syslog/handler.py b/src_py/hat/syslog/handler.py index 098c148..938c7d0 100644 --- a/src_py/hat/syslog/handler.py +++ b/src_py/hat/syslog/handler.py @@ -129,7 +129,9 @@ def _logging_handler_thread(state): ctx.check_hostname = False ctx.verify_mode = ssl.VerifyMode.CERT_NONE + # keep looping until signaled that the handler is closed while not state.closed.is_set(): + # connect to the endpoint try: if state.comm_type == common.CommType.UDP: s = socket.socket(type=socket.SOCK_DGRAM) @@ -151,6 +153,8 @@ def _logging_handler_thread(state): time.sleep(state.reconnect_delay) continue + # the connection was established + msg = None try: while True: with state.cv: @@ -176,10 +180,17 @@ def _logging_handler_thread(state): else: s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) + msg = None + msg_bytes = None + except Exception: - pass + # When failing with msg assigned, put the msg back. + # After reconnecting we will try it again. + if msg is not None: + state.queue.appendleft(msg) finally: + # close the connection to avoid leaking it with contextlib.suppress(Exception): s.close() From 8df6d6f26d14a3368e4223d0a0e984d770c524da Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 6 Jun 2026 06:02:53 -0400 Subject: [PATCH 2/3] chore: remove unrelated comments --- src_py/hat/syslog/handler.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src_py/hat/syslog/handler.py b/src_py/hat/syslog/handler.py index 938c7d0..73c3aa6 100644 --- a/src_py/hat/syslog/handler.py +++ b/src_py/hat/syslog/handler.py @@ -129,9 +129,7 @@ def _logging_handler_thread(state): ctx.check_hostname = False ctx.verify_mode = ssl.VerifyMode.CERT_NONE - # keep looping until signaled that the handler is closed while not state.closed.is_set(): - # connect to the endpoint try: if state.comm_type == common.CommType.UDP: s = socket.socket(type=socket.SOCK_DGRAM) @@ -153,7 +151,6 @@ def _logging_handler_thread(state): time.sleep(state.reconnect_delay) continue - # the connection was established msg = None try: while True: @@ -190,7 +187,6 @@ def _logging_handler_thread(state): state.queue.appendleft(msg) finally: - # close the connection to avoid leaking it with contextlib.suppress(Exception): s.close() From 19ee558bb457d59f9a44e32b5ff66bf896b21397 Mon Sep 17 00:00:00 2001 From: tcely Date: Sat, 6 Jun 2026 19:51:03 -0400 Subject: [PATCH 3/3] fix: send dropped message before adjusting the counter --- src_py/hat/syslog/handler.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src_py/hat/syslog/handler.py b/src_py/hat/syslog/handler.py index 73c3aa6..ff6e7a6 100644 --- a/src_py/hat/syslog/handler.py +++ b/src_py/hat/syslog/handler.py @@ -162,23 +162,18 @@ def _logging_handler_thread(state): return if state.dropped[0]: - msg = _create_dropped_msg( + msg = None + dropped_msg = _create_dropped_msg( state.dropped[0], '_logging_handler_thread', 0) + _send_message(s, dropped_msg, state.comm_type == common.CommType.UDP) state.dropped[0] = 0 + continue else: msg = state.queue.popleft() - msg_bytes = encoder.msg_to_str(msg).encode() - - if state.comm_type == common.CommType.UDP: - s.send(msg_bytes) - - else: - s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) - + _send_message(s, msg, state.comm_type == common.CommType.UDP) msg = None - msg_bytes = None except Exception: # When failing with msg assigned, put the msg back. @@ -219,6 +214,14 @@ def _record_to_msg(record): msg=record.getMessage()) +def _send_message(s, msg, only_bytes=False): + msg_bytes = encoder.msg_to_str(msg).encode() + if only_bytes: + s.send(msg_bytes) + else: + s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) + + def _create_dropped_msg(dropped, func_name, lineno): hat_data = {'name': __name__, 'thread': str(threading.get_ident()),