-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
66 lines (50 loc) · 1.71 KB
/
worker.py
File metadata and controls
66 lines (50 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Temporal worker application for background workflows and activities."""
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from src.app.tasks.user_tasks import SendWelcomeEmailWorkflow, send_welcome_email_activity
from src.infrastructure.config import get_settings
from src.infrastructure.logging.config import configure_logging, get_logger
# Get settings
settings = get_settings()
# Configure logging
configure_logging(settings)
logger = get_logger(__name__)
async def main() -> None:
"""Start Temporal worker."""
try:
# Connect to Temporal server
logger.info(
"connecting_to_temporal",
host=settings.temporal_host,
namespace=settings.temporal_namespace,
)
client = await Client.connect(
settings.temporal_host,
namespace=settings.temporal_namespace,
)
logger.info(
"temporal_connected",
host=settings.temporal_host,
namespace=settings.temporal_namespace,
)
# Create worker
worker = Worker(
client,
task_queue=settings.temporal_task_queue,
workflows=[SendWelcomeEmailWorkflow],
activities=[send_welcome_email_activity],
)
logger.info(
"temporal_worker_starting",
task_queue=settings.temporal_task_queue,
workflows=["SendWelcomeEmailWorkflow"],
activities=["send_welcome_email_activity"],
)
# Run worker
await worker.run()
except Exception as exc:
logger.exception("temporal_worker_error", error=str(exc))
raise
if __name__ == "__main__":
asyncio.run(main())