diff --git a/Developer.md b/Developer.md index d20d474..25dcf6e 100644 --- a/Developer.md +++ b/Developer.md @@ -119,8 +119,9 @@ print("Published:", job_id) ``` python import time from omniq.client import OmniqClient +from omniq.types import JobCtx -def handler(ctx): +def handler(ctx: JobCtx): print("Processing:", ctx.job_id) time.sleep(2) print("Done") @@ -283,7 +284,7 @@ The heartbeat call renews the lease of the currently running job. ## Handler Context -### Inside handler(ctx): +### Inside handler(ctx: JobCtx): | Field | Description | |----------------|--------------------------------------------| @@ -292,6 +293,7 @@ The heartbeat call renews the lease of the currently running job. | payload | Deserialized payload | | payload_raw | Raw JSON | | attempt | Current attempt number | +| max_attempts | Maximum number of attempts for the job | | lock_until_ms | Lease expiration timestamp | | lease_token | Required token for ACK/heartbeat | | gid | Group identifier | @@ -398,6 +400,8 @@ Can be used externally or inside a handler via ctx.exec. ## Parent/Child Flow ``` python +from omniq.types import JobCtx + remaining = ctx.exec.child_ack(completion_key) if remaining == 0: @@ -406,7 +410,7 @@ if remaining == 0: **Returns:** - Pending children -> `> 0` - Last Child -> `0` -- Counter error -> `1` +- Counter error -> `-1` **Properties:** - Idempotent diff --git a/README.md b/README.md index 7ea38a3..ddac94b 100644 --- a/README.md +++ b/README.md @@ -148,9 +148,10 @@ import time # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def my_actions(ctx): +def my_actions(ctx: JobCtx): print("Waiting 2 seconds") time.sleep(2) print("Done") @@ -174,16 +175,27 @@ omniq.consume( ## Handler Context -Inside `handler(ctx)`: +Inside `handler(ctx: JobCtx)`: - `queue` - `job_id` - `payload_raw` - `payload` - `attempt` +- `max_attempts` - `lock_until_ms` - `lease_token` - `gid` -- `exec` - execution layer (`ctx.exex`) +- `exec` - execution layer (`ctx.exec`) + +Example: + +```python +from omniq.types import JobCtx + +def my_actions(ctx: JobCtx): + is_last_attempt = ctx.attempt >= ctx.max_attempts + print("Last attempt?", is_last_attempt) +``` ------------------------------------------------------------------------ @@ -358,9 +370,10 @@ import time # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def page_worker(ctx): +def page_worker(ctx: JobCtx): page = ctx.payload["page"] # getting the unique key to track the childs @@ -449,9 +462,10 @@ import time # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def pause_unpause_example(ctx): +def pause_unpause_example(ctx: JobCtx): print("Waiting 2 seconds") # checking if this queue it is paused (spoiler: it's not) @@ -518,4 +532,4 @@ in the `/examples` folder. ## License -See the repository license. \ No newline at end of file +See the repository license. diff --git a/examples/childs/consumer1.py b/examples/childs/consumer1.py index 7069c90..99524de 100644 --- a/examples/childs/consumer1.py +++ b/examples/childs/consumer1.py @@ -1,8 +1,9 @@ # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def document_worker(ctx): +def document_worker(ctx: JobCtx): # getting the data from payload document_id = ctx.payload["document_id"] diff --git a/examples/childs/consumer2.py b/examples/childs/consumer2.py index bb54aca..1db02b5 100644 --- a/examples/childs/consumer2.py +++ b/examples/childs/consumer2.py @@ -2,9 +2,10 @@ # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def page_worker(ctx): +def page_worker(ctx: JobCtx): page = ctx.payload["page"] # getting the unique key to track the childs @@ -16,8 +17,7 @@ def page_worker(ctx): # acking itself as a child the number of remaining jobs are returned so we can say when the last job was executed remaining = ctx.exec.child_ack(completion_key) - print(f"[page_worker] Page {page} done. Remaining={remaining}") - + print(f"[page_worker] Page {page} done. Remaining={remaining}") # remaining will be 0 ONLY when this is the last job # will return > 0 when are still jobs to process diff --git a/examples/max_attempts/consumer.py b/examples/max_attempts/consumer.py new file mode 100644 index 0000000..25d77d8 --- /dev/null +++ b/examples/max_attempts/consumer.py @@ -0,0 +1,38 @@ +import time + +# importing the lib +from omniq.client import OmniqClient +from omniq.types import JobCtx + + +# creating your handler (ctx will have all the job information and actions) +def fail_until_last_attempt(ctx: JobCtx): + is_last_attempt = ctx.attempt >= ctx.max_attempts + + print( + f"[max_attempts] job_id={ctx.job_id} " + f"attempt={ctx.attempt}/{ctx.max_attempts} " + f"last_attempt={is_last_attempt}" + ) + + if not is_last_attempt: + print("[max_attempts] Failing on purpose to force a retry.") + raise RuntimeError("Intentional failure before the last attempt") + + print("[max_attempts] Last attempt reached. Finishing successfully.") + time.sleep(1) + + +# creating OmniQ passing redis information +omniq = OmniqClient( + host="omniq-redis", + port=6379, +) + +# creating the consumer that will listen and execute the actions in your handler +omniq.consume( + queue="max-attempts", + handler=fail_until_last_attempt, + verbose=True, + drain=False, +) diff --git a/examples/max_attempts/publish.py b/examples/max_attempts/publish.py new file mode 100644 index 0000000..3bda7dd --- /dev/null +++ b/examples/max_attempts/publish.py @@ -0,0 +1,19 @@ +# importing the lib +from omniq.client import OmniqClient + +# creating OmniQ passing redis information +omniq = OmniqClient( + host="omniq-redis", + port=6379, +) + +# publishing the job +job_id = omniq.publish( + queue="max-attempts", + payload={"hello": "world"}, + max_attempts=3, + backoff_ms=1_000, + timeout_ms=30_000, +) + +print("OK", job_id) diff --git a/examples/pause_resume/consumer-pause.py b/examples/pause_resume/consumer-pause.py index 72040a3..fe989e1 100644 --- a/examples/pause_resume/consumer-pause.py +++ b/examples/pause_resume/consumer-pause.py @@ -2,9 +2,10 @@ # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def pause_unpause_example(ctx): +def pause_unpause_example(ctx: JobCtx): print("Waiting 2 seconds") # checking if this queue it is paused (spoiler: it's not) @@ -14,7 +15,6 @@ def pause_unpause_example(ctx): print("Is paused", is_paused) time.sleep(2) - print("Pausing") # pausing this queue (this job it's and others active jobs will be not affected but not new job will be start until queue is resumed) diff --git a/examples/simple/consumer.py b/examples/simple/consumer.py index b37650f..a84e1d4 100644 --- a/examples/simple/consumer.py +++ b/examples/simple/consumer.py @@ -2,9 +2,12 @@ # importing the lib from omniq.client import OmniqClient +from omniq.types import JobCtx # creating your handler (ctx will have all the job information and actions) -def my_actions(ctx): +def my_actions(ctx: JobCtx): + is_last_attempt = ctx.attempt >= ctx.max_attempts + print("Last attempt?", is_last_attempt) print("Waiting 2 seconds") time.sleep(2) print("Done") diff --git a/pyproject.toml b/pyproject.toml index 48907f1..23eaa52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "omniq" -version = "3.0.0" +version = "3.1.0" authors = [ { name="Not Empty Foundation", email="dev@not-empty.org" }, ] diff --git a/src/omniq/_ops.py b/src/omniq/_ops.py index 1afa8dd..346296f 100644 --- a/src/omniq/_ops.py +++ b/src/omniq/_ops.py @@ -144,7 +144,7 @@ def reserve(self, *, queue: str, now_ms_override: int = 0) -> ReserveResult: if res[0] == "PAUSED": return ReservePaused() - if res[0] != "JOB" or len(res) < 7: + if res[0] != "JOB" or len(res) < 8: raise RuntimeError(f"Unexpected RESERVE response: {res}") return ReserveJob( @@ -153,8 +153,9 @@ def reserve(self, *, queue: str, now_ms_override: int = 0) -> ReserveResult: payload=str(res[2]), lock_until_ms=int(res[3]), attempt=int(res[4]), - gid=str(res[5] or ""), - lease_token=str(res[6] or ""), + max_attempts=int(res[5]), + gid=str(res[6] or ""), + lease_token=str(res[7] or ""), ) def heartbeat(self, *, queue: str, job_id: str, lease_token: str, now_ms_override: int = 0) -> int: diff --git a/src/omniq/client.py b/src/omniq/client.py index 3305fe4..f0a6dcf 100644 --- a/src/omniq/client.py +++ b/src/omniq/client.py @@ -4,7 +4,7 @@ from ._ops import OmniqOps from .scripts import load_scripts, default_scripts_dir from .transport import RedisConnOpts, build_redis_client, RedisLike -from .types import ReserveResult, AckFailResult +from .types import ReserveResult, AckFailResult, JobCtx from .helper import queue_base def _safe_close_redis(r: Any) -> None: @@ -209,7 +209,7 @@ def consume( self, *, queue: str, - handler: Callable[[Any], None], + handler: Callable[[JobCtx], None], poll_interval_s: float = 0.05, promote_interval_s: float = 1.0, promote_batch: int = 1000, diff --git a/src/omniq/consumer.py b/src/omniq/consumer.py index c946e24..f49f222 100644 --- a/src/omniq/consumer.py +++ b/src/omniq/consumer.py @@ -196,6 +196,7 @@ def on_sigint(signum, frame): payload_raw=res.payload, payload=payload_obj, attempt=res.attempt, + max_attempts=res.max_attempts, lock_until_ms=res.lock_until_ms, lease_token=res.lease_token, gid=res.gid, diff --git a/src/omniq/core/scripts/reserve.lua b/src/omniq/core/scripts/reserve.lua index 3e17f22..9b8268e 100644 --- a/src/omniq/core/scripts/reserve.lua +++ b/src/omniq/core/scripts/reserve.lua @@ -57,6 +57,7 @@ local function lease_job(job_id) if timeout_ms <= 0 then timeout_ms = 60000 end local attempt = to_i(redis.call("HGET", k_job, "attempt")) + 1 + local max_attempts = to_i(redis.call("HGET", k_job, "max_attempts")) local lock_until = now_ms + timeout_ms local payload = redis.call("HGET", k_job, "payload") or "" @@ -91,7 +92,7 @@ local function lease_job(job_id) "last_reserve_ms", tostring(now_ms) ) - return {"JOB", job_id, payload, tostring(lock_until), tostring(attempt), gid, lease_token} + return {"JOB", job_id, payload, tostring(lock_until), tostring(attempt), tostring(max_attempts), gid, lease_token} end local function try_ungrouped() @@ -184,4 +185,4 @@ else redis.call("SET", k_rr, "0") end -return res \ No newline at end of file +return res diff --git a/src/omniq/types.py b/src/omniq/types.py index 0a8ba9b..5d0a0ec 100644 --- a/src/omniq/types.py +++ b/src/omniq/types.py @@ -1,5 +1,8 @@ from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple, Union, Literal, List +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union, Literal, List + +if TYPE_CHECKING: + from .exec import Exec PayloadT = Union[Dict[str, Any], list, str] @@ -10,10 +13,11 @@ class JobCtx: payload_raw: str payload: PayloadT attempt: int + max_attempts: int lock_until_ms: int lease_token: str + exec: "Exec" gid: str = "" - exec: Any = None @dataclass(frozen=True) class ReservePaused: @@ -26,6 +30,7 @@ class ReserveJob: payload: str lock_until_ms: int attempt: int + max_attempts: int gid: str lease_token: str