Skip to content

feat: retry messages after reconnecting#9

Open
tcely wants to merge 3 commits into
hat-open:masterfrom
tcely:patch-4
Open

feat: retry messages after reconnecting#9
tcely wants to merge 3 commits into
hat-open:masterfrom
tcely:patch-4

Conversation

@tcely
Copy link
Copy Markdown
Contributor

@tcely tcely commented May 15, 2026

This is a simple change to prevent broken connections from causing generated/queued messages from being lost.

@bozokopic
Copy link
Copy Markdown
Contributor

i think that additional comments from this commit are not necessary and are not related to added functionality

regarding described functionality, i think this is something that should be incorporated into master branch but i don't like proposed implementation. as simple alternative, you could move "state.dropped[0] = 0" from 'after _create_message' to 'after send'. this would cause invalidation of dropped counter after message is successfully sent.

@tcely
Copy link
Copy Markdown
Contributor Author

tcely commented Jun 6, 2026

i think that additional comments from this commit are not necessary and are not related to added functionality

Fair enough. Dropping the additional comments won't be any trouble.

regarding described functionality, i think this is something that should be incorporated into master branch but i don't like proposed implementation. as simple alternative, you could move "state.dropped[0] = 0" from 'after _create_message' to 'after send'. this would cause invalidation of dropped counter after message is successfully sent.

What advantage do you hope to achieve by changing the counter reset from after the summary message is created and enqueued to after it is sent??

From my perspective, this seems to be a way to desynchronize the state for no good purpose.

@bozokopic
Copy link
Copy Markdown
Contributor

i was thinking about something like this:

--- a/src_py/hat/syslog/handler.py
+++ b/src_py/hat/syslog/handler.py
@@ -160,10 +160,11 @@ def _logging_handler_thread(state):
                     if state.closed.is_set():
                         return

-                    if state.dropped[0]:
+                    dropped = state.dropped[0]
+
+                    if dropped:
                         msg = _create_dropped_msg(
-                            state.dropped[0], '_logging_handler_thread', 0)
-                        state.dropped[0] = 0
+                            dropped, '_logging_handler_thread', 0)

                     else:
                         msg = state.queue.popleft()
@@ -176,6 +177,10 @@ def _logging_handler_thread(state):
                 else:
                     s.send(f'{len(msg_bytes)} '.encode() + msg_bytes)

+                if dropped:
+                    with state.cv:
+                        state.dropped[0] -= dropped
+
         except Exception:
             pass

in this change, number of dropped messages is not reset (decremented to 0) immediately when dropped message is formatted. dropped counter is decremented after message is sent which means that send error will not reset dropped counter.

@tcely
Copy link
Copy Markdown
Contributor Author

tcely commented Jun 6, 2026

in this change, number of dropped messages is not reset (decremented to 0) immediately when dropped message is formatted. dropped counter is decremented after message is sent which means that send error will not reset dropped counter.

Yes, I understand what you were proposing. However, it is not advantageous to do this as far as I understand.

Why do you want to disconnect the dropped messages counter manipulation from the reporting message creation at all?

For the case where the message is not sent, the reporting message will be saved in the queue anyway. If you didn't adjust the counter at message creation, but waited until a successful send, the reporting message would be created over and over again.

@bozokopic
Copy link
Copy Markdown
Contributor

bozokopic commented Jun 6, 2026

first of all, unconditional adding of message to queue from consumer side violates queue_size limit. it is possible for queue to grow without limit if send repeatedly fails and dropped counter is incremented - each time "connection loop" detects new dropped message, it will add new message to queue.

message reporting other dropped messages should not be persisted in queue. exact number of dropped messages precisely before send attempt is irrelevant to user - eventual total number of dropped messages can have significance. as user, i would rather receive one message informing me of 10 dropped messages, then receive two consecutive messages where each informs me of 5 dropped messages.

i think that remembering of message that could not be sent is good thing but i would limit this returning of unsent messages to only those that were previously removed from the queue. if we return to queue only those messages that where previously removed from queue, queue size will exceed queue_size only by one and we wont have possibility of limitless queue growth

@bozokopic
Copy link
Copy Markdown
Contributor

please disregard my previous conclusion of unbounded message queue growth.

looking at code, producer (emit method) contains while loop that will drop messages from queue - even those that were set by "connection loop"

@bozokopic
Copy link
Copy Markdown
Contributor

consider this edge case

lets call emit method producer and _logging_handler_thread consumer. if consumer calls appendleft on queue when send fails, and send in not locked by mutex associated with shared conditional variable, following scenario is possible:

  • produces appends messages to queue until queue is full
  • consumer popleft single message
  • consumer call send and waits for it to finish
  • producer appends single message to queue
  • producer appends another message to queue but queue size is exceeded so dropped counter is incremented and popleft is called
  • consumers call to send finishes with exception so it appends left message that it was tring to send
  • consumer detects drop counter >0 and sends message information of 1 dropped message
  • consumer popleft queue (retrieves message for which send failed) and sends it

in this case, information about dropped messages is out of order - it is reported before "older" message

@tcely
Copy link
Copy Markdown
Contributor Author

tcely commented Jun 6, 2026

I have added a commit to attempt sending the created message while still holding the lock to address the edge case that you described.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants