Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cbc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
author: Anna Beckers
author: Anna Beckers
description: Query DB to a CSV
docker_image: ghcr.io/rwth-time/database-interactions/database-interactions
name: Database Interactions
Expand All @@ -7,6 +7,7 @@ entrypoints:
description: Run the query using a file containing the query
envs:
DB_DSN: null
DB_SCHEMA: null
inputs:
query_file:
config:
Expand All @@ -31,12 +32,13 @@ entrypoints:
csv_output_S3_HOST: null
csv_output_S3_PORT: null
csv_output_S3_SECRET_KEY: null
description: CSV of the queries result
description: CSV of the queries result
type: file
run_query_from_string:
description: Run the query directly by passing an SQL Query
description: Run the query directly by passing an SQL Query
envs:
DB_DSN: null
DB_SCHEMA: null
inputs:
query_str:
config:
Expand All @@ -54,5 +56,5 @@ entrypoints:
csv_output_S3_HOST: null
csv_output_S3_PORT: null
csv_output_S3_SECRET_KEY: null
description: CSV of the queries result
description: CSV of the queries result
type: file
29 changes: 8 additions & 21 deletions interactions/query.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,18 @@
import logging
import csv
import sys
from sqlalchemy import create_engine, text

from scystream.sdk.env.settings import (
EnvSettings,
from scystream.sdk.database_handling.database_manager import (
PandasDatabaseOperations,
)


def execute_query_to_csv(
    query: str, dsn: str, output_file: str, schema: str | None
) -> None:
    """Run *query* against the database and write the result to a CSV file.

    Args:
        query: SQL text handed to ``PandasDatabaseOperations.read``.
        dsn: Connection string used to construct the database handle.
        output_file: Path of the CSV file to write.
        schema: Optional schema name forwarded to the database handle;
            ``None`` is passed through unchanged.

    Any exception raised while connecting, querying, or writing the file is
    logged and the process exits with status 1.
    """
    try:
        db = PandasDatabaseOperations(dsn, schema)
        # NOTE(review): `read` presumably returns a pandas DataFrame (the
        # result exposes `to_csv`) -- confirm against the SDK.
        df = db.read(query=query)
        # index=False keeps the row index out of the CSV.
        df.to_csv(output_file, index=False)
    except Exception as e:
        logging.error(f"Database query failed: {e}")
        sys.exit(1)
    # No `finally: conn.close()` here: this body never binds a `conn`, so
    # that clause would raise NameError on every call.
61 changes: 31 additions & 30 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
FileSettings,
)
from scystream.sdk.file_handling.s3_manager import S3Operations
from interactions.query import query_db
from interactions.query import execute_query_to_csv


def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:
Expand All @@ -22,7 +22,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:
f"{output_settings.FILE_PATH}/"
f"{output_settings.FILE_NAME}."
f"{output_settings.FILE_EXT}"
)
),
)
except Exception as e:
logging.error(f"Failed to upload CSV to S3: {e}")
Expand All @@ -31,7 +31,7 @@ def upload_to_s3(local_file_path: str, output_settings: FileSettings) -> None:

def read_query_file(file_path: str) -> str:
try:
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
return f.read().strip()
except Exception as e:
logging.error(f"Failed to read query file: {e}")
Expand All @@ -54,68 +54,69 @@ class CSVOutput(FileSettings, OutputSettings):

class QueryDatabaseFromFileEntrypointSettings(EnvSettings):
    """Settings for the entrypoint that runs a query stored in a file."""

    # Database connection string; required (no default).
    DB_DSN: str
    # Optional schema name; None when not provided.
    DB_SCHEMA: str | None = None

    # Input settings describing where the query file is fetched from.
    query_file: QueryFileInput
    # Output settings describing where the resulting CSV is uploaded.
    csv_output: CSVOutput


class QueryDatabaseEntrypointSettings(EnvSettings):
    """Settings for the entrypoint that runs a query passed as a string."""

    # Database connection string; required (no default).
    DB_DSN: str
    # Optional schema name; None when not provided.
    DB_SCHEMA: str | None = None

    # Input settings carrying the SQL text (read as `query_str.QUERY`).
    query_str: QueryStrInput
    # Output settings describing where the resulting CSV is uploaded.
    csv_output: CSVOutput


@entrypoint(QueryDatabaseEntrypointSettings)
def run_query_from_string(settings):
    """Execute the SQL string from *settings* and upload the result as CSV.

    The query text is read from ``settings.query_str.QUERY``, executed via
    ``execute_query_to_csv`` into a local ``output.csv``, and that file is
    then passed to ``upload_to_s3`` together with ``settings.csv_output``.
    """
    target_csv = "output.csv"
    execute_query_to_csv(
        query=settings.query_str.QUERY,
        dsn=settings.DB_DSN,
        output_file=target_csv,
        schema=settings.DB_SCHEMA,
    )
    upload_to_s3(target_csv, settings.csv_output)


@entrypoint(QueryDatabaseFromFileEntrypointSettings)
def run_query_from_file(settings):
    """Download a query file, execute its contents, and upload the CSV.

    The file described by ``settings.query_file`` is downloaded to a local
    ``query_file.txt``; a failed download is logged and exits the process
    with status 1. The file's text is then read with ``read_query_file``,
    executed via ``execute_query_to_csv`` into a local ``output.csv``, and
    that file is passed to ``upload_to_s3`` with ``settings.csv_output``.
    """
    local_file = "query_file.txt"

    try:
        S3Operations.download(settings.query_file, local_file)
    except Exception as e:
        logging.error(f"Failed to download query file: {e}")
        sys.exit(1)

    query = read_query_file(local_file)
    target_csv = "output.csv"

    execute_query_to_csv(
        query=query,
        dsn=settings.DB_DSN,
        output_file=target_csv,
        schema=settings.DB_SCHEMA,
    )
    upload_to_s3(target_csv, settings.csv_output)


"""
if __name__ == "__main__":
test = QueryDatabaseEntrypointSettings(
DB_DSN="postgresql://guest:guest@localhost:5432/patstat",
query_str=QueryStrInput(
QUERY="SELECT name FROM employees;"
),
csv_output=CSVOutput(
S3_HOST="http://localhost",
S3_PORT="9000",
S3_ACCESS_KEY="minioadmin",
S3_SECRET_KEY="minioadmin",
BUCKET_NAME="output-bucket",
FILE_PATH="output_file_path",
FILE_NAME="csv_file",
FILE_EXT="csv"
)
DB_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/postgres",
query_str=QueryStrInput(QUERY="SELECT * FROM test_table;"),
csv_output=CSVOutput(
S3_HOST="http://localhost",
S3_PORT="9000",
S3_ACCESS_KEY="minioadmin",
S3_SECRET_KEY="minioadmin",
BUCKET_NAME="output-bucket",
FILE_PATH="output_file_path",
FILE_NAME="csv_file",
FILE_EXT="csv",
),
)

run_query_from_string(test)
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
scystream-sdk==1.2.1
SQLAlchemy==2.0.43
scystream-sdk[all]==1.5.0
psycopg2-binary==2.9.10
PyMySQL==1.1.2
duckdb==1.4.1
Expand Down
Loading