-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
56 lines (46 loc) · 1.55 KB
/
Copy pathworker.py
File metadata and controls
56 lines (46 loc) · 1.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import logging
import os
from pathlib import Path
from typing import Optional

from celery import Celery

from config import settings
from db.database import SessionLocal
from db.models import KYCTask
from services.ocr_service import process_id_document
from utils.file_utils import save_json_to_s3
# Celery application for the OCR worker.
# The same Redis instance (settings.redis_url) plays two roles:
#   - broker:  carries task messages between FastAPI and Celery
#   - backend: stores the results of completed tasks
REDIS_URL = settings.redis_url

celery_app = Celery("ocr_worker", broker=REDIS_URL, backend=REDIS_URL)
@celery_app.task(name="process_id_task", bind=True)
def process_id_task(self, file_path_str: str):
    """Background task to run KYC extraction on an ID document.

    Runs ``process_id_document`` on the document, saves the extracted data
    as JSON via ``save_json_to_s3``, then writes the outcome (status and
    extracted fields) to the ``KYCTask`` row whose ``task_id`` equals this
    Celery task's id.

    Errors raised by extraction or the JSON save are NOT re-raised. They are
    logged with traceback, stored as ``{"error": "<message>"}`` with status
    ``"FAILURE"``, and returned like a normal result. Errors raised while
    updating the database are not caught and propagate to Celery.

    Args:
        file_path_str: Location of the ID document; passed unchanged to
            ``process_id_document`` and ``save_json_to_s3``.

    Returns:
        dict with keys:
            ``"kyc_data"``: the extracted data, or the error dict on failure.
            ``"saved_json_uri"``: whatever ``save_json_to_s3`` returned, or
            ``None`` on failure.
    """
    logger = logging.getLogger(__name__)
    task_id = self.request.id

    try:
        # Run the Textract AI extraction, then save the result as JSON to S3.
        extracted_data = process_id_document(file_path_str)
        json_s3_uri = save_json_to_s3(extracted_data, file_path_str)
    except Exception as exc:
        # Deliberate catch-all boundary: any extraction/upload failure is
        # recorded on the KYC task instead of failing the Celery task. Log
        # the full traceback here because the DB only keeps str(exc).
        logger.exception("KYC extraction failed for task %s", task_id)
        extracted_data = {"error": str(exc)}
        json_s3_uri = None
        status = "FAILURE"
    else:
        status = "SUCCESS"

    # Persist the outcome to PostgreSQL.
    db = SessionLocal()
    try:
        task_record = (
            db.query(KYCTask).filter(KYCTask.task_id == task_id).first()
        )
        if task_record is None:
            # NOTE(review): if the caller inserts the KYCTask row only after
            # enqueueing this task, the row may not exist yet when we get
            # here and this update is lost — confirm insert/enqueue ordering.
            logger.warning(
                "No KYCTask row for task %s; status %s not recorded",
                task_id,
                status,
            )
        else:
            task_record.status = status
            task_record.extracted_fields = extracted_data
            db.commit()
    finally:
        db.close()

    return {
        "kyc_data": extracted_data,
        "saved_json_uri": json_s3_uri,
    }