Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 81 additions & 37 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1738,10 +1738,11 @@ async def get_metadata(self) -> GetMetadataResponse:
mcp_servers=mcp_servers,
)

async def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
async def schedule_job(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Schedules a job to be triggered at a specified time or interval.

This is an Alpha API and is subject to change.
Calls the stable ScheduleJob RPC and falls back to ScheduleJobAlpha1 when
the sidecar predates the stable Jobs API.

Args:
job (Job): The job to schedule. Must have a name and either schedule or due_time.
Expand All @@ -1754,32 +1755,46 @@ async def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprRe
ValueError: If job name is empty or both schedule and due_time are missing.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=job.name)

if not job.schedule and not job.due_time:
raise ValueError('Job must have either schedule or due_time specified')

# Convert job to proto using the Job class private method
job_proto = job._get_proto()
request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)

try:
call = self._stub.ScheduleJobAlpha1(request)
call = self._stub.ScheduleJob(request)
await call
return DaprResponse(headers=await call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprGrpcError(err) from err
except AioRpcError as err:
if err.code() == StatusCode.UNIMPLEMENTED:
try:
call = self._stub.ScheduleJobAlpha1(request)
await call
except AioRpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return DaprResponse(headers=await call.initial_metadata())

async def get_job_alpha1(self, name: str) -> Job:
async def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Deprecated: use :meth:`schedule_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`schedule_job`.
"""
warn(
'schedule_job_alpha1 is deprecated; use schedule_job instead.',
DeprecationWarning,
stacklevel=2,
)
return await self.schedule_job(job, overwrite)

async def get_job(self, name: str) -> Job:
"""Gets a scheduled job by name.

This is an Alpha API and is subject to change.
Calls the stable GetJob RPC and falls back to GetJobAlpha1 when the
sidecar predates the stable Jobs API.

Args:
name (str): The name of the job to retrieve.
Expand All @@ -1791,27 +1806,42 @@ async def get_job_alpha1(self, name: str) -> Job:
ValueError: If job name is empty.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=name)

request = api_v1.GetJobRequest(name=name)

try:
call = self._stub.GetJobAlpha1(request)
call = self._stub.GetJob(request)
response = await call
return Job._from_proto(response.job)
except grpc.aio.AioRpcError as err:
raise DaprGrpcError(err) from err
except AioRpcError as err:
if err.code() == StatusCode.UNIMPLEMENTED:
try:
call = self._stub.GetJobAlpha1(request)
response = await call
except AioRpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return Job._from_proto(response.job)

async def delete_job_alpha1(self, name: str) -> DaprResponse:
async def get_job_alpha1(self, name: str) -> Job:
"""Deprecated: use :meth:`get_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`get_job`.
"""
warn(
'get_job_alpha1 is deprecated; use get_job instead.',
DeprecationWarning,
stacklevel=2,
)
return await self.get_job(name)

async def delete_job(self, name: str) -> DaprResponse:
"""Deletes a scheduled job by name.

This is an Alpha API and is subject to change.
Calls the stable DeleteJob RPC and falls back to DeleteJobAlpha1 when the
sidecar predates the stable Jobs API.

Args:
name (str): The name of the job to delete.
Expand All @@ -1823,22 +1853,36 @@ async def delete_job_alpha1(self, name: str) -> DaprResponse:
ValueError: If job name is empty.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=name)

request = api_v1.DeleteJobRequest(name=name)

try:
call = self._stub.DeleteJobAlpha1(request)
call = self._stub.DeleteJob(request)
await call
return DaprResponse(headers=await call.initial_metadata())
except grpc.aio.AioRpcError as err:
raise DaprGrpcError(err) from err
except AioRpcError as err:
if err.code() == StatusCode.UNIMPLEMENTED:
try:
call = self._stub.DeleteJobAlpha1(request)
await call
except AioRpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return DaprResponse(headers=await call.initial_metadata())

async def delete_job_alpha1(self, name: str) -> DaprResponse:
"""Deprecated: use :meth:`delete_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`delete_job`.
"""
warn(
'delete_job_alpha1 is deprecated; use delete_job instead.',
DeprecationWarning,
stacklevel=2,
)
return await self.delete_job(name)

async def set_metadata(self, attributeName: str, attributeValue: str) -> DaprResponse:
"""Adds a custom (extended) metadata attribute to the Dapr sidecar
Expand Down
115 changes: 81 additions & 34 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1668,10 +1668,11 @@ def converse_alpha2(
except RpcError as err:
raise DaprGrpcError(err) from err

def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
def schedule_job(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Schedules a job to be triggered at a specified time or interval.

This is an Alpha API and is subject to change.
Calls the stable ScheduleJob RPC and falls back to ScheduleJobAlpha1 when
the sidecar predates the stable Jobs API.

Args:
job (Job): The job to schedule. Must have a name and either schedule or due_time.
Expand All @@ -1684,31 +1685,46 @@ def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse
ValueError: If job name is empty or both schedule and due_time are missing.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=job.name)

if not job.schedule and not job.due_time:
raise ValueError('Job must have either schedule or due_time specified')

# Convert job to proto using the Job class private method
job_proto = job._get_proto()
request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)

try:
_, call = self.retry_policy.run_rpc(self._stub.ScheduleJobAlpha1.with_call, request)
return DaprResponse(headers=call.initial_metadata())
_, call = self.retry_policy.run_rpc(self._stub.ScheduleJob.with_call, request)
except RpcError as err:
raise DaprGrpcError(err) from err
if err.code() == StatusCode.UNIMPLEMENTED:
try:
_, call = self.retry_policy.run_rpc(
self._stub.ScheduleJobAlpha1.with_call, request
)
except RpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return DaprResponse(headers=call.initial_metadata())

def get_job_alpha1(self, name: str) -> Job:
def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Deprecated: use :meth:`schedule_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`schedule_job`.
"""
warn(
'schedule_job_alpha1 is deprecated; use schedule_job instead.',
DeprecationWarning,
stacklevel=2,
)
return self.schedule_job(job, overwrite)

def get_job(self, name: str) -> Job:
"""Gets a scheduled job by name.

This is an Alpha API and is subject to change.
Calls the stable GetJob RPC and falls back to GetJobAlpha1 when the
sidecar predates the stable Jobs API.

Args:
name (str): The name of the job to retrieve.
Expand All @@ -1720,26 +1736,42 @@ def get_job_alpha1(self, name: str) -> Job:
ValueError: If job name is empty.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=name)

request = api_v1.GetJobRequest(name=name)

try:
response, call = self.retry_policy.run_rpc(self._stub.GetJobAlpha1.with_call, request)
return Job._from_proto(response.job)
response, _ = self.retry_policy.run_rpc(self._stub.GetJob.with_call, request)
except RpcError as err:
raise DaprGrpcError(err) from err
if err.code() == StatusCode.UNIMPLEMENTED:
try:
response, _ = self.retry_policy.run_rpc(
self._stub.GetJobAlpha1.with_call, request
)
except RpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return Job._from_proto(response.job)

def delete_job_alpha1(self, name: str) -> DaprResponse:
def get_job_alpha1(self, name: str) -> Job:
"""Deprecated: use :meth:`get_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`get_job`.
"""
warn(
'get_job_alpha1 is deprecated; use get_job instead.',
DeprecationWarning,
stacklevel=2,
)
return self.get_job(name)

def delete_job(self, name: str) -> DaprResponse:
"""Deletes a scheduled job by name.

This is an Alpha API and is subject to change.
Calls the stable DeleteJob RPC and falls back to DeleteJobAlpha1 when the
sidecar predates the stable Jobs API.

Args:
name (str): The name of the job to delete.
Expand All @@ -1751,21 +1783,36 @@ def delete_job_alpha1(self, name: str) -> DaprResponse:
ValueError: If job name is empty.
DaprGrpcError: If the Dapr runtime returns an error.
"""
# Warnings and input validation
warn(
'The Jobs API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(job_name=name)

request = api_v1.DeleteJobRequest(name=name)

try:
_, call = self.retry_policy.run_rpc(self._stub.DeleteJobAlpha1.with_call, request)
return DaprResponse(headers=call.initial_metadata())
_, call = self.retry_policy.run_rpc(self._stub.DeleteJob.with_call, request)
except RpcError as err:
raise DaprGrpcError(err) from err
if err.code() == StatusCode.UNIMPLEMENTED:
try:
_, call = self.retry_policy.run_rpc(
self._stub.DeleteJobAlpha1.with_call, request
)
except RpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err
return DaprResponse(headers=call.initial_metadata())

def delete_job_alpha1(self, name: str) -> DaprResponse:
"""Deprecated: use :meth:`delete_job`.

The Jobs API graduated to stable in Dapr 1.18; this Alpha1 alias remains
for backwards compatibility and forwards to :meth:`delete_job`.
"""
warn(
'delete_job_alpha1 is deprecated; use delete_job instead.',
DeprecationWarning,
stacklevel=2,
)
return self.delete_job(name)

def wait(self, timeout_s: float):
"""Waits for sidecar to be available within the timeout.
Expand Down
Loading
Loading