Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions Developer.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ print("Published:", job_id)
``` python
import time
from omniq.client import OmniqClient
from omniq.types import JobCtx

def handler(ctx):
def handler(ctx: JobCtx):
print("Processing:", ctx.job_id)
time.sleep(2)
print("Done")
Expand Down Expand Up @@ -283,7 +284,7 @@ The heartbeat call renews the lease of the currently running job.

## Handler Context

### Inside handler(ctx):
### Inside handler(ctx: JobCtx):

| Field | Description |
|----------------|--------------------------------------------|
Expand All @@ -292,6 +293,7 @@ The heartbeat call renews the lease of the currently running job.
| payload | Deserialized payload |
| payload_raw | Raw JSON |
| attempt | Current attempt number |
| max_attempts | Maximum number of attempts for the job |
| lock_until_ms | Lease expiration timestamp |
| lease_token | Required token for ACK/heartbeat |
| gid | Group identifier |
Expand Down Expand Up @@ -398,6 +400,8 @@ Can be used externally or inside a handler via ctx.exec.
## Parent/Child Flow

``` python
from omniq.types import JobCtx

remaining = ctx.exec.child_ack(completion_key)

if remaining == 0:
Expand All @@ -406,7 +410,7 @@ if remaining == 0:
**Returns:**
- Pending children -> `> 0`
- Last Child -> `0`
- Counter error -> `1`
- Counter error -> `-1`

**Properties:**
- Idempotent
Expand Down
26 changes: 20 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ import time

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def my_actions(ctx):
def my_actions(ctx: JobCtx):
print("Waiting 2 seconds")
time.sleep(2)
print("Done")
Expand All @@ -174,16 +175,27 @@ omniq.consume(

## Handler Context

Inside `handler(ctx)`:
Inside `handler(ctx: JobCtx)`:
- `queue`
- `job_id`
- `payload_raw`
- `payload`
- `attempt`
- `max_attempts`
- `lock_until_ms`
- `lease_token`
- `gid`
- `exec` - execution layer (`ctx.exex`)
- `exec` - execution layer (`ctx.exec`)

Example:

```python
from omniq.types import JobCtx

def my_actions(ctx: JobCtx):
is_last_attempt = ctx.attempt >= ctx.max_attempts
print("Last attempt?", is_last_attempt)
```

------------------------------------------------------------------------

Expand Down Expand Up @@ -358,9 +370,10 @@ import time

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def page_worker(ctx):
def page_worker(ctx: JobCtx):

page = ctx.payload["page"]
# getting the unique key to track the childs
Expand Down Expand Up @@ -449,9 +462,10 @@ import time

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def pause_unpause_example(ctx):
def pause_unpause_example(ctx: JobCtx):
print("Waiting 2 seconds")

# checking if this queue it is paused (spoiler: it's not)
Expand Down Expand Up @@ -518,4 +532,4 @@ in the `/examples` folder.

## License

See the repository license.
See the repository license.
3 changes: 2 additions & 1 deletion examples/childs/consumer1.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def document_worker(ctx):
def document_worker(ctx: JobCtx):

# getting the data from payload
document_id = ctx.payload["document_id"]
Expand Down
6 changes: 3 additions & 3 deletions examples/childs/consumer2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def page_worker(ctx):
def page_worker(ctx: JobCtx):

page = ctx.payload["page"]
# getting the unique key to track the childs
Expand All @@ -16,8 +17,7 @@ def page_worker(ctx):
# acking itself as a child the number of remaining jobs are returned so we can say when the last job was executed
remaining = ctx.exec.child_ack(completion_key)

print(f"[page_worker] Page {page} done. Remaining={remaining}")

print(f"[page_worker] Page {page} done. Remaining={remaining}")

# remaining will be 0 ONLY when this is the last job
# will return > 0 when are still jobs to process
Expand Down
38 changes: 38 additions & 0 deletions examples/max_attempts/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx


# creating your handler (ctx will have all the job information and actions)
def fail_until_last_attempt(ctx: JobCtx):
is_last_attempt = ctx.attempt >= ctx.max_attempts

print(
f"[max_attempts] job_id={ctx.job_id} "
f"attempt={ctx.attempt}/{ctx.max_attempts} "
f"last_attempt={is_last_attempt}"
)

if not is_last_attempt:
print("[max_attempts] Failing on purpose to force a retry.")
raise RuntimeError("Intentional failure before the last attempt")

print("[max_attempts] Last attempt reached. Finishing successfully.")
time.sleep(1)


# creating OmniQ passing redis information
omniq = OmniqClient(
host="omniq-redis",
port=6379,
)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
queue="max-attempts",
handler=fail_until_last_attempt,
verbose=True,
drain=False,
)
19 changes: 19 additions & 0 deletions examples/max_attempts/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# importing the lib
from omniq.client import OmniqClient

# creating OmniQ passing redis information
omniq = OmniqClient(
host="omniq-redis",
port=6379,
)

# publishing the job
job_id = omniq.publish(
queue="max-attempts",
payload={"hello": "world"},
max_attempts=3,
backoff_ms=1_000,
timeout_ms=30_000,
)

print("OK", job_id)
4 changes: 2 additions & 2 deletions examples/pause_resume/consumer-pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def pause_unpause_example(ctx):
def pause_unpause_example(ctx: JobCtx):
print("Waiting 2 seconds")

# checking if this queue it is paused (spoiler: it's not)
Expand All @@ -14,7 +15,6 @@ def pause_unpause_example(ctx):
print("Is paused", is_paused)
time.sleep(2)


print("Pausing")

# pausing this queue (this job it's and others active jobs will be not affected but not new job will be start until queue is resumed)
Expand Down
5 changes: 4 additions & 1 deletion examples/simple/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

# importing the lib
from omniq.client import OmniqClient
from omniq.types import JobCtx

# creating your handler (ctx will have all the job information and actions)
def my_actions(ctx):
def my_actions(ctx: JobCtx):
is_last_attempt = ctx.attempt >= ctx.max_attempts
print("Last attempt?", is_last_attempt)
print("Waiting 2 seconds")
time.sleep(2)
print("Done")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "omniq"
version = "3.0.0"
version = "3.1.0"
authors = [
{ name="Not Empty Foundation", email="dev@not-empty.org" },
]
Expand Down
7 changes: 4 additions & 3 deletions src/omniq/_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def reserve(self, *, queue: str, now_ms_override: int = 0) -> ReserveResult:
if res[0] == "PAUSED":
return ReservePaused()

if res[0] != "JOB" or len(res) < 7:
if res[0] != "JOB" or len(res) < 8:
raise RuntimeError(f"Unexpected RESERVE response: {res}")

return ReserveJob(
Expand All @@ -153,8 +153,9 @@ def reserve(self, *, queue: str, now_ms_override: int = 0) -> ReserveResult:
payload=str(res[2]),
lock_until_ms=int(res[3]),
attempt=int(res[4]),
gid=str(res[5] or ""),
lease_token=str(res[6] or ""),
max_attempts=int(res[5]),
gid=str(res[6] or ""),
lease_token=str(res[7] or ""),
)

def heartbeat(self, *, queue: str, job_id: str, lease_token: str, now_ms_override: int = 0) -> int:
Expand Down
4 changes: 2 additions & 2 deletions src/omniq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ._ops import OmniqOps
from .scripts import load_scripts, default_scripts_dir
from .transport import RedisConnOpts, build_redis_client, RedisLike
from .types import ReserveResult, AckFailResult
from .types import ReserveResult, AckFailResult, JobCtx
from .helper import queue_base

def _safe_close_redis(r: Any) -> None:
Expand Down Expand Up @@ -209,7 +209,7 @@ def consume(
self,
*,
queue: str,
handler: Callable[[Any], None],
handler: Callable[[JobCtx], None],
poll_interval_s: float = 0.05,
promote_interval_s: float = 1.0,
promote_batch: int = 1000,
Expand Down
1 change: 1 addition & 0 deletions src/omniq/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def on_sigint(signum, frame):
payload_raw=res.payload,
payload=payload_obj,
attempt=res.attempt,
max_attempts=res.max_attempts,
lock_until_ms=res.lock_until_ms,
lease_token=res.lease_token,
gid=res.gid,
Expand Down
5 changes: 3 additions & 2 deletions src/omniq/core/scripts/reserve.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ local function lease_job(job_id)
if timeout_ms <= 0 then timeout_ms = 60000 end

local attempt = to_i(redis.call("HGET", k_job, "attempt")) + 1
local max_attempts = to_i(redis.call("HGET", k_job, "max_attempts"))
local lock_until = now_ms + timeout_ms

local payload = redis.call("HGET", k_job, "payload") or ""
Expand Down Expand Up @@ -91,7 +92,7 @@ local function lease_job(job_id)
"last_reserve_ms", tostring(now_ms)
)

return {"JOB", job_id, payload, tostring(lock_until), tostring(attempt), gid, lease_token}
return {"JOB", job_id, payload, tostring(lock_until), tostring(attempt), tostring(max_attempts), gid, lease_token}
end

local function try_ungrouped()
Expand Down Expand Up @@ -184,4 +185,4 @@ else
redis.call("SET", k_rr, "0")
end

return res
return res
9 changes: 7 additions & 2 deletions src/omniq/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple, Union, Literal, List
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union, Literal, List

if TYPE_CHECKING:
from .exec import Exec

PayloadT = Union[Dict[str, Any], list, str]

Expand All @@ -10,10 +13,11 @@ class JobCtx:
payload_raw: str
payload: PayloadT
attempt: int
max_attempts: int
lock_until_ms: int
lease_token: str
exec: "Exec"
gid: str = ""
exec: Any = None

@dataclass(frozen=True)
class ReservePaused:
Expand All @@ -26,6 +30,7 @@ class ReserveJob:
payload: str
lock_until_ms: int
attempt: int
max_attempts: int
gid: str
lease_token: str

Expand Down
Loading