diff --git a/.github/workflows/build-libra.yml b/.github/workflows/build-libra.yml index 0232776..ecff190 100644 --- a/.github/workflows/build-libra.yml +++ b/.github/workflows/build-libra.yml @@ -6,7 +6,7 @@ on: jobs: docker: env: - IMAGE_TAG: "1.8.1" + IMAGE_TAG: "1.10" runs-on: ubuntu-latest steps: - name: Get branch names diff --git a/.java-version b/.java-version new file mode 100644 index 0000000..b4de394 --- /dev/null +++ b/.java-version @@ -0,0 +1 @@ +11 diff --git a/changelog.md b/changelog.md index 5b2eaad..0d5b047 100644 --- a/changelog.md +++ b/changelog.md @@ -1,14 +1,27 @@ # Changelog +## Release 1.10 [Unreleased] +Brief summary: -## Release 1.9 [unreleased] +### Breaking changes + +### Other changes + +--- +## Release 1.9 [Released 9/2/2025] +Brief summary - Load Tableau database with biopsy_tracker data - Insert dlu_upload_type to dlu_package_inventory table - Create recalled packages endpoint - Update Multi Modal package name - Tweak bulk upload fields +### Breaking changes +- changed column names in tables +- added new columns to dlu_package_inventory table + +--- ## Release 1.8.1 [Released 11/8/2024] Brief summary of what's in this release: diff --git a/data_management/.BulkUploader.swp b/data_management/.BulkUploader.swp new file mode 100644 index 0000000..05aea8a Binary files /dev/null and b/data_management/.BulkUploader.swp differ diff --git a/data_management/BulkUploader b/data_management/BulkUploader new file mode 100644 index 0000000..355cfb5 --- /dev/null +++ b/data_management/BulkUploader @@ -0,0 +1,25 @@ +FROM python:3.10-slim-bullseye + +WORKDIR /usr/src/app + +ENV FLASK_APP=app.py +ENV FLASK_RUN_HOST=0.0.0.0 + +RUN apt-get update \ + && apt-get install -y curl + +COPY requirements.txt ./ + +RUN pip3 config --user set global.progress_bar off +RUN pip3 install --no-cache-dir -r requirements.txt +RUN pip3 install -U flask-cors + +COPY lib/ ./lib +COPY main.py ./ +COPY app.py ./ +COPY process_bulk_uploads.py ./ +COPY services/ ./services 
+COPY model/ ./model +COPY .env ./.env + +ENTRYPOINT [] \ No newline at end of file diff --git a/data_management/DluWatcher b/data_management/DluWatcher index fa8cada..c44ab9e 100644 --- a/data_management/DluWatcher +++ b/data_management/DluWatcher @@ -1,5 +1,7 @@ FROM python:3.10-slim-bullseye +USER root + COPY requirements.txt ./ RUN pip3 install --progress-bar off --no-cache-dir -r requirements.txt @@ -10,6 +12,7 @@ COPY ./services/dlu_filesystem.py ./services/dlu_filesystem.py COPY ./services/dlu_package_inventory.py ./services/dlu_package_inventory.py COPY ./services/dlu_state.py ./services/dlu_state.py COPY ./services/dlu_management.py ./services/dlu_management.py +COPY ./services/slide_management.py ./services/slide_management.py COPY ./services/dlu_mongo.py ./services/dlu_mongo.py COPY ./model ./model COPY ./watch_files.py ./ diff --git a/data_management/Dockerfile b/data_management/Dockerfile index ba017e7..cb6adef 100644 --- a/data_management/Dockerfile +++ b/data_management/Dockerfile @@ -31,5 +31,5 @@ COPY app.py ./ COPY process_bulk_uploads.py ./ COPY services/ ./services -ENTRYPOINT ["gunicorn", "-b", ":5000", "app:app"] +ENTRYPOINT ["gunicorn", "-b", ":5000", "app:app", "-t", "1200"] diff --git a/data_management/app.py b/data_management/app.py index e0507a6..c8a649e 100644 --- a/data_management/app.py +++ b/data_management/app.py @@ -82,7 +82,7 @@ def recall_dlu_package(package_id): return error_msg dlu_data_directory = '/data/package_' + package_id - directory_info = DirectoryInfo(dlu_data_directory) + directory_info = DirectoryInfo(dlu_data_directory, calculate_checksums = False) file_list = None if directory_info.file_count == 0 and directory_info.subdir_count == 0: error_msg = "Error: package " + package_id + " has no files or top level subdirectory" @@ -92,9 +92,9 @@ def recall_dlu_package(package_id): if directory_info.file_count == 0 and directory_info.subdir_count == 1: contents = "".join(directory_info.dir_contents) top_level_subdir = 
package_id + "/" + contents - file_list = dlu_file_handler.match_files(top_level_subdir) + file_list = dlu_file_handler.match_files(top_level_subdir,False) else: - file_list = dlu_file_handler.match_files(package_id) + file_list = dlu_file_handler.match_files(package_id,False) dlu_files = [] for file in directory_info.file_details: @@ -117,4 +117,4 @@ def get_package_status(package_id): dlu_package_inventory = DLUPackageInventory() dlu_package_inventory.reconnect() status = dlu_package_inventory.get_package_status(package_id) - return status[0]["globus_dlu_status"] if len(status) > 0 and status[0]["globus_dlu_status"] is not None else "" \ No newline at end of file + return status[0]["globus_dlu_status"] if len(status) > 0 and status[0]["globus_dlu_status"] is not None else "" diff --git a/data_management/generate_sc_rnaseq_yaml.py b/data_management/generate_sc_rnaseq_yaml.py new file mode 100644 index 0000000..ecbcf7e --- /dev/null +++ b/data_management/generate_sc_rnaseq_yaml.py @@ -0,0 +1,38 @@ +import os +import yaml +import sys + +yamlData = { + "package_type": "Single-cell RNA-Seq", + "tis": "Michigan/Broad/Princeton", + "data_generators": "Rajasree Menon", + "dataset_description": "" +} +experiments = [] + +if len(sys.argv) == 1: + print("Error. 
Please specify directory: python3 generate_sc_rnaseq_yaml.py /path/to/bulk/upload") + exit(1) + +dir = sys.argv[1] +for root, dirs, files in os.walk(dir): + if root == dir: + continue + sample_id = os.path.split(root)[1] + experiment = { + "internal_experiment_id": sample_id, + "files": [] + } + for file in files: + experiment['files'].append({ + 'redcap_id': sample_id, + 'spectrack_sample_id': sample_id, + 'relative_file_path_and_name': sample_id + '/' + file, + 'file_metadata': "" + }) + experiments.append({ + "experiment": experiment + }) +yamlData["experiments"] = experiments +with open(os.path.join(dir, 'bulk-manifest.yaml'), 'w') as file: + yaml.dump(yamlData, file) diff --git a/data_management/lib/mysql_connection.py b/data_management/lib/mysql_connection.py index 302ec9f..cd20f44 100644 --- a/data_management/lib/mysql_connection.py +++ b/data_management/lib/mysql_connection.py @@ -86,7 +86,7 @@ def get_db_connection(self): self.database.get_warnings = True return self.database except Exception as error: - logger.error("Can't connect to MySQL: ", exec_info=error) + logger.exception("Can't connect to MySQL: ", error) os.sys.exit() def get_tableau_db_connection(self): @@ -102,7 +102,7 @@ def get_tableau_db_connection(self): self.database.get_warnings = True return self.database except Exception as error: - logger.error("Can't connect to MySQL: ", exc_info=error) + logger.exception("Can't connect to MySQL: ", error) os.sys.exit() def insert_data(self, sql, data): @@ -123,6 +123,20 @@ def insert_data(self, sql, data): finally: self.database.commit() self.cursor.close() + + def insert_data_no_alert(self, sql, data): + try: + self.get_db_cursor() + self.cursor.execute(sql, data) + warning = self.cursor.fetchwarnings() + if warning is not None: + print(warning) + except: + message = f"Error: Cannot insert with query: {sql}; and the data: {data}" + logger.error(message) + finally: + self.database.commit() + self.cursor.close() def get_data(self, sql, 
query_data=None): try: @@ -132,13 +146,12 @@ def get_data(self, sql, query_data=None): for row in self.cursor: data.append(row) return data - except: - message = "Error: Can't get data_management data." - logger.error(message) + except Exception as error: + logger.error(str(error)) requests.post( slack_url, headers={'Content-type': 'application/json', }, - data='{"text":"' + message + '"}' + data='{"text":"' + "Error: " + str(error) + '"}' ) finally: self.cursor.close() diff --git a/data_management/main.py b/data_management/main.py index 9f47c8e..c637dcd 100644 --- a/data_management/main.py +++ b/data_management/main.py @@ -43,6 +43,9 @@ def upsert_new_spectrack_specimens(self): def load_biopsy_tracking(self): return self.tableau.load_biopsy_tracking() + def load_biopsy_tracking_long(self): + return self.tableau.load_biopsy_tracking_long() + def load_data_manager_data(self): return self.tableau.load_data_manager_data() @@ -88,6 +91,7 @@ def update_biomarker_tracking_redcap_ids(self): if args.action == "insert" or args.action == "update": records_modified = main.load_biopsy_tracking() records_modified = records_modified + main.load_data_manager_data() + records_modified = records_modified + main.load_biopsy_tracking_long() if "records_modified" in locals(): logger.info(f"{records_modified} records modified") diff --git a/data_management/model/__init__.py b/data_management/model/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/data_management/model/dlu_package.py b/data_management/model/dlu_package.py index 7c74e7b..649112a 100644 --- a/data_management/model/dlu_package.py +++ b/data_management/model/dlu_package.py @@ -19,6 +19,7 @@ def __init__(self): self.dlu_protocol = None self.dlu_data_generators = None self.dlu_files = [] + self.dlu_upload_type = None self.submitter_name = None self.known_specimen = None self.redcap_id = None @@ -61,6 +62,7 @@ def get_dmd_dpi_tuple(self): self.dlu_subject_id, self.dlu_error, self.dlu_lfu, + 
self.dlu_upload_type, self.globus_dlu_status ) diff --git a/data_management/process_bulk_uploads.py b/data_management/process_bulk_uploads.py index f449316..5bc1459 100644 --- a/data_management/process_bulk_uploads.py +++ b/data_management/process_bulk_uploads.py @@ -27,8 +27,8 @@ class ProcessBulkUploads: def __init__(self, data_directory: str, globus_only: bool = False, globus_root: str = None, preserve_path: bool = False, bypass_dup_check: bool = False): try: self.dlu_management = DluManagement() - except: - logger.error("There was a problem loading the Data Management library.") + except Exception as e: + logger.exception("There was a problem loading the Data Management library.", e) try: self.submitter = os.environ["mongo_submitter_id"] self.submitter_name = os.environ["submitter_name"] @@ -68,12 +68,12 @@ def process_files(self, manifest_files_arr: list) -> list: logger.info(file_full_path) size = os.path.getsize(file_full_path) file_info = self.dlu_file_handler.split_path(file_path, self.preserve_path) - if file["file_metadata"] and "md5_hash" in file["file_metadata"]: + if "file_metadata" in file and "md5_hash" in file["file_metadata"]: checksum = file["file_metadata"]["md5_hash"] del file["file_metadata"]["md5_hash"] else: checksum = calculate_checksum(file_full_path) - if file["file_metadata"]: + if "file_metadata" in file: metadata = file["file_metadata"] else: metadata = {} @@ -81,7 +81,29 @@ def process_files(self, manifest_files_arr: list) -> list: dlu_files.append(dlu_file) return dlu_files + def process_globus_only_files(self, manifest_files_arr: list) -> list: + logger.info("globus only file processing") + files = [] + for file in manifest_files_arr: + file_path = file["relative_file_path_and_name"] + file_full_path = os.path.join(self.data_directory, file_path) + file_info = self.dlu_file_handler.split_path(file_path, self.preserve_path) + if "file_metadata" in file and "md5_hash" in file["file_metadata"]: + checksum = 
file["file_metadata"]["md5_hash"] + del file["file_metadata"]["md5_hash"] + if "file_metadata" in file: + metadata = file["file_metadata"] + else: + metadata = {} + + # Since this is going directly to globus, we don't need to calc checksum or filesize, and we need + # the path to the file on disk to actually copy it + dlu_file = DLUFile(file_info["file_name"], file_full_path, '', 0, metadata) + files.append(dlu_file) + return files + def process_bulk_uploads(self): + logger.info("in process bulk uploads") for manifest_name in MANIFEST_FILE_NAMES: manifest_file_path = os.path.join(self.data_directory, manifest_name) if os.path.isfile(manifest_file_path): @@ -93,13 +115,14 @@ def process_bulk_uploads(self): manifest_data = yaml.safe_load(stream) if manifest_data["package_type"] == "EM Images": package_type = PackageType.ELECTRON_MICROSCOPY - elif manifest_data["package_type"] == "Segmentation Masks": + elif manifest_data["package_type"] == "Segmentation Masks & Pathomics Vectors": package_type = PackageType.SEGMENTATION elif manifest_data["package_type"] == "Multimodal Images": package_type = PackageType.MULTI_MODAL elif manifest_data["package_type"] == "Single-cell RNA-Seq": package_type = PackageType.SINGLE_CELL else: + logger.info("package type is: ", manifest_data["package_type"]) package_type = PackageType.OTHER if "tis" in manifest_data: tis = manifest_data["tis"] @@ -111,12 +134,13 @@ def process_bulk_uploads(self): redcap_id = experiment["files"][0]["redcap_id"] sample_id = experiment["files"][0]["spectrack_sample_id"] if redcap_id and redcap_id.startswith("S-"): + logger.info("found redcap id starting with S-") sample_id = redcap_id redcap_results = self.dlu_management.get_redcapid_by_subjectid(sample_id) - if redcap_results is not None and len(redcap_results) == 1: + if redcap_results is not None and len(redcap_results) > 1: redcap_id = redcap_results else: - redcap_id = "" + redcap_id = None if not sample_id: sample_id = redcap_id @@ -124,7 +148,10 @@ def 
process_bulk_uploads(self): if (sample_id and len(self.dlu_management.get_participant_by_redcap_id(redcap_id)) > 0) or \ (self.globus_only and sample_id): logger.info(f"Trying to add package for {redcap_id} / {sample_id}") - dlu_file_list = self.process_files(experiment["files"]) + if self.globus_only: + dlu_file_list = self.process_globus_only_files(experiment["files"]) + else: + dlu_file_list = self.process_files(experiment["files"]) if package_type == PackageType.SEGMENTATION: dlu_file_list.append(self.get_single_file(SEGMENTATION_README)) tis = "UFL" @@ -150,6 +177,7 @@ def process_bulk_uploads(self): package.dlu_version = 4 package.dlu_dataset_information_version = 1 package.dlu_error = 0 + package.dlu_upload_type = 'KPMP Biopsy' if self.globus_only: package.globus_dlu_status = None else: diff --git a/data_management/rebuild.sh b/data_management/rebuild.sh index 985f891..5b70a1d 100644 --- a/data_management/rebuild.sh +++ b/data_management/rebuild.sh @@ -1,2 +1,2 @@ python3 setup.py install --user -docker build -t kingstonduo/data-management:1.8.1 . +docker build -t kingstonduo/data-management:1.10 . 
diff --git a/data_management/services/dlu_filesystem.py b/data_management/services/dlu_filesystem.py index 82dd329..f1b9d44 100644 --- a/data_management/services/dlu_filesystem.py +++ b/data_management/services/dlu_filesystem.py @@ -8,6 +8,7 @@ from zarr_checksum import compute_zarr_checksum from zarr_checksum.generators import yield_files_local from mmap import mmap, ACCESS_READ +import subprocess logger = logging.getLogger("DLUFilesystem") logger.setLevel(logging.INFO) @@ -114,6 +115,27 @@ def chown_dir(self, package_id: str, files: list[DLUFile], user_id): if os.stat(subdir_path).st_uid != user_id or os.stat(subdir_path).st_gid != int(os.environ['dlu_group']): os.chown(subdir_path, user_id, int(os.environ['dlu_group'])) + def rename_and_move_files(self, file_list: list[DLUFile], slide_name_map, package_id ): + dluFiles = [] + dest_package_directory = os.path.join(self.dlu_data_directory, self.dlu_package_dir_prefix + package_id) + if os.path.exists(dest_package_directory): + shutil.rmtree(dest_package_directory) + if not os.path.exists(dest_package_directory): + logger.info("Creating directory " + dest_package_directory) + os.makedirs(dest_package_directory, exist_ok=True) + + source_package_directory = self.globus_data_directory + '/' + self.globus_dir_prefix + package_id + for file in file_list: + dest_file = os.path.join(dest_package_directory, slide_name_map[file.name]) + logger.info("Copying file " + os.path.join(source_package_directory, file.name) + " to " + + os.path.join(dest_package_directory, slide_name_map[file.name])) + shutil.copy(os.path.join(source_package_directory, file.name), + dest_file) + file = DLUFile(name=slide_name_map[file.name], path=dest_package_directory, + checksum=calculate_checksum(dest_file), size=os.path.getsize(dest_file)) + dluFiles.append(file) + return dluFiles + def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: bool = False, no_src_package: bool = False): files_copied = 0 source_wd = os.getcwd() 
@@ -121,8 +143,10 @@ def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: b if os.path.exists(dest_package_directory): shutil.rmtree(dest_package_directory) for file in file_list: + source_package_directory = self.globus_data_directory + '/' + self.globus_dir_prefix # I.e. isn't a bulk upload that doesn't already have a package ID. + logger.info(source_package_directory) if not no_src_package: source_package_directory = source_package_directory + package_id if file.path and os.path.isdir(file.path): @@ -136,7 +160,6 @@ def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: b if os.path.isdir(os.path.join(source_package_directory, o))] dir = "".join(subdirs) if len(os.listdir(source_package_directory)) == 1 and os.path.isdir(source_package_directory) and os.path.isdir(dir): - os.chdir(dir) allfiles = os.listdir(dir) for f in allfiles: @@ -167,6 +190,10 @@ def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: b elif os.path.isfile(source_file): logger.info("Copying file to " + dest_file) shutil.copy(source_file, dest_file) + else: + source_file = os.path.join(source_package_directory, file.path) + logger.info("Copying file to " + dest_file) + shutil.copy(source_file, dest_file) files_copied = files_copied + 1 else: logger.warning(dest_file + " already exists. 
Skipping.") @@ -183,46 +210,53 @@ def validate_package_directories(self, package_id: str): logger.error("Directory for package " + package_id + " failed validation.") return success - def process_globus_directory(self, directoryListing, globusDirectories: list[DirectoryInfo], packageId, initialDir): - for dir in globusDirectories: + def process_globus_directory(self, directory_listing, globus_directories: list[DirectoryInfo], package_id, + initial_dir, calculate_checksums: bool = True): + for dir in globus_directories: prefix = "" - if not initialDir == "": - prefix = initialDir + "/" - currentDir = prefix + os.path.basename(dir.directory_path) + if not initial_dir == "": + prefix = initial_dir + "/" + current_dir = prefix + os.path.basename(dir.directory_path) - globusFiles = [] - globusDirectories = [] + globus_files = [] + globus_directories = [] for item in dir.file_details: if os.path.isdir(item.path): - globusDirectories.append(DirectoryInfo(item.path)) + globus_directories.append(DirectoryInfo(item.path, calculate_checksums=calculate_checksums)) else: - globusFiles.append(item) - directoryListing[currentDir] = globusFiles - if len(globusDirectories) > 0: - self.process_globus_directory(directoryListing, globusDirectories, packageId, currentDir) - return directoryListing - - def match_files(self, packageId) -> list[DLUFile]: - topLevelDir = DirectoryInfo(self.globus_data_directory + '/' + self.globus_dir_prefix + packageId) - globusFiles = [] - globusDirectories = [] - for obj in topLevelDir.file_details: + globus_files.append(item) + directory_listing[current_dir] = globus_files + if len(globus_directories) > 0: + self.process_globus_directory(directory_listing, globus_directories, package_id, current_dir, + calculate_checksums) + return directory_listing + + def match_files(self, package_id: str, calculate_checksums: bool = True) -> list[DLUFile]: + top_level_dir = DirectoryInfo(self.globus_data_directory + '/' + self.globus_dir_prefix + package_id, + 
calculate_checksums=calculate_checksums) + globus_files = [] + globus_directories = [] + for obj in top_level_dir.file_details: if os.path.isdir(obj.path): - directory = DirectoryInfo(obj.path) - globusDirectories.append(directory) + directory = DirectoryInfo(obj.path, calculate_checksums=calculate_checksums) + globus_directories.append(directory) else: - globusFiles.append(obj) - filesInGlobusDirectories = {} - filesInGlobusDirectories[""] = globusFiles - currentDir = "" - filesInGlobusDirectories = self.process_globus_directory(filesInGlobusDirectories, globusDirectories, packageId, currentDir) - return self.get_globus_file_paths(filesInGlobusDirectories) - - def get_globus_file_paths(self, filesInGlobusDirectories: dict[str, list[DLUFile]]) -> list[DLUFile]: + globus_files.append(obj) + files_in_globus_directories = {} + files_in_globus_directories[""] = globus_files + current_dir = "" + files_in_globus_directories = self.process_globus_directory(files_in_globus_directories, globus_directories, + package_id, current_dir, calculate_checksums) + return self.get_globus_file_paths(files_in_globus_directories) + + def get_globus_file_paths(self, files_in_globus_directories: dict[str, list[DLUFile]]) -> list[DLUFile]: fileList = [] - for dir, files in filesInGlobusDirectories.items(): + for dir, files in files_in_globus_directories.items(): for file in files: prefix = dir + "/" if dir else "" file.name = prefix + file.name fileList.append(file) - return fileList \ No newline at end of file + return fileList + + def validate_all_wsi_files_present(self, ): + return True \ No newline at end of file diff --git a/data_management/services/dlu_management.py b/data_management/services/dlu_management.py index 2a59102..1bb0dc0 100644 --- a/data_management/services/dlu_management.py +++ b/data_management/services/dlu_management.py @@ -101,6 +101,11 @@ def update_dlu_package(self, package_id: str, fields_values: dict): values = query_info["values"][0:] + (package_id,) query = 
"UPDATE data_manager_data_v SET " + query_info["set_clause"] + " WHERE dlu_package_id = %s" self.db.insert_data(query, values) + + def update_missing_slides(self, redcap_id: str): + return self.db.get_data( + "update slide_scan_curation set missing_slides = 1 where redcap_id = %s", (redcap_id,) + ), def insert_dlu_file(self, values): query = "INSERT INTO dlu_file (dlu_fileName, dlu_package_id, dlu_file_id, dlu_filesize, dlu_md5checksum, dlu_modified_at, dlu_metadata) VALUES(%s, %s, %s, %s, %s, %s, %s)" @@ -150,7 +155,7 @@ def find_all_files(self): ) def update_md5(self, file_id: str, checksum: str, package_id: str): - self.db.insert_data("UPDATE dlu_file SET dlu_md5checksum = %s WHERE dlu_file_id = %s and dlu_package_id = %s", + return self.db.insert_data("UPDATE dlu_file SET dlu_md5checksum = %s WHERE dlu_file_id = %s and dlu_package_id = %s", (checksum, file_id,package_id)) def move_globus_files_to_dlu(self, package_id: str): @@ -186,12 +191,22 @@ def get_biopsy_tracking(self): ) return result + def get_biopsy_tracking_long(self): + result = self.db.get_data( + "select * from biopsy_tracking_long_v" + ) + return result + def get_data_manager_data(self): result = self.db.get_data( """ - select dm.id, dm.dlu_package_id, dm.dlu_created, dm.dlu_submitter, dm.dlu_tis, dm.dlu_packageType, dm.dlu_subject_id, dm.dlu_error, dm.redcap_id, dm.known_specimen, dm.user_package_ready, dm.package_validated, dm.ready_to_move_from_globus, dm.globus_dlu_status, dm.package_status, dm.current_owner, dm.ar_promotion_status, dm.sv_promotion_status, dm.release_version, r.release_date, dm.removed_from_globus, dm.notes - from data_manager_data_v dm - left outer join `release` r on dm.release_version = r.release_version + SELECT dm.id, dm.dlu_package_id, dm.dlu_created, dm.dlu_submitter, dm.dlu_tis, dm.dlu_packageType, + dm.dlu_subject_id, dm.dlu_error, dm.redcap_id, dm.known_specimen, dm.user_package_ready, + dm.package_validated, dm.ready_to_move_from_globus, dm.globus_dlu_status, 
dm.dlu_upload_type, dm.upload_type_detail, dm.atlas_status, + dm.current_owner, dm.ar_promotion_status, dm.sv_promotion_status, dm.release_version, r.release_date, + dm.removed_from_globus, dm.notes + FROM data_manager_data_v dm + LEFT OUTER JOIN `release` r on dm.release_version = r.release_version """ ) return result @@ -210,24 +225,85 @@ def delete_files_by_package_id(self, package_id: str): return self.db.get_data("DELETE FROM dlu_file WHERE dlu_package_id = %s", (package_id,)) def get_equal_num_rows(self): - result = self.db.get_data("SELECT (SELECT COUNT(*) FROM slide_manifest_import) = (SELECT COUNT(*) FROM slide_scan_curation) AS equal_num_rows") + result = self.db.get_data("SELECT (SELECT COUNT(*) FROM slide_manifest_import) = " + "(SELECT COUNT(*) FROM slide_scan_curation) AS equal_num_rows") return result[0]["equal_num_rows"] def get_new_slide_manifest_import_rows(self): - return self.db.get_data("SELECT * FROM slide_manifest_import WHERE image_id NOT IN (SELECT image_id FROM slide_scan_curation)") + return self.db.get_data("SELECT * FROM slide_manifest_import WHERE image_id NOT IN " + "(SELECT image_id FROM slide_scan_curation)") def get_spectrack_redcap_record_id(self, kit_id): - result = self.db.get_data("SELECT spectrack_redcap_record_id FROM spectrack_specimen WHERE spectrack_specimen_kit_id = %s LIMIT 1", (kit_id,)) + result = self.db.get_data("SELECT spectrack_redcap_record_id FROM spectrack_specimen " + "WHERE spectrack_specimen_kit_id = %s LIMIT 1", (kit_id,)) if len(result) > 0 and "spectrack_redcap_record_id" in result[0]: return result[0]["spectrack_redcap_record_id"] else: return None + + def get_redcap_ids_with_null_package_id(self): + return self.db.get_data( + "select unique redcap_id from slide_scan_curation where dlu_package_id is null and error_message is null and redcap_id is not null", + (None), + ) + + def get_package_ids_for_redcap_id(self, redcap_id): + return self.db.get_data( + "select dlu_package_id from dlu_package_inventory 
where dlu_subject_id = %s and globus_dlu_status IS NULL", (redcap_id,) + ) + + def update_package_ids_in_slide_scan_curation(self, redcap_id, package_id): + return self.db.insert_data( + "update slide_scan_curation set dlu_package_id = %s where redcap_id = %s and dlu_package_id is null and error_message is null", + (package_id, redcap_id,)) def insert_into_slide_scan_curation(self, values): - query = "INSERT INTO slide_scan_curation (image_id, kit_id, redcap_id) VALUES (%s, %s, %s)" + query = "INSERT INTO slide_scan_curation (image_id, kit_id, redcap_id, new_file_name, source_file_name, " \ + "source_folder_name) VALUES (%s, %s, %s, %s, %s, %s)" self.db.insert_data(query, values) return query % values + def get_slide_manifest_import_by_kit(self, kit_id, stain): + return self.db.get_data("SELECT * FROM slide_manifest_import WHERE outside_acc= %s AND stain = %s " + "ORDER BY stain, block_id", + (kit_id,stain,)) + + def set_error_message_slide_scan_curation(self, error, image_id): + return self.db.insert_data("UPDATE slide_scan_curation set error_message = %s where image_id = %s", + (error, image_id,)) + + def set_error_message_slide_scan_curation_redcap_id(self, error, redcap_id): + return self.db.insert_data_no_alert("UPDATE slide_scan_curation set error_message = %s where redcap_id = %s", + (error, redcap_id,)) + + def find_slide_scan_info_by_package_id(self, package_id): + return self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s", + (package_id,)) + + def is_package_missing_slides(self, package_id): + return self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s and missing_slides = 1", + (package_id,)) + + def slides_marked_missing_by_redcap_id(self, redcap_id: str): + return self.db.get_data("SELECT * FROM slide_scan_v WHERE redcap_id = %s AND missing_slides = 1", + (redcap_id,)) + + def get_missing_slides_from_view(self, redcap_id: str): + return self.db.get_data("select * from missing_slides_v where 
spectrack_redcap_record_id = %s", + (redcap_id,)) + + def is_slides_in_error(self, package_id): + return self.db.get_data("SELECT * FROM slide_scan_curation WHERE dlu_package_id = %s and error_message IS NOT NULL", + (package_id,)) + + def find_not_approved_filenames(self, package_id): + return self.db.get_data("SELECT * FROM slide_scan_curation WHERE approve_file_name = 'yes' AND dlu_package_id = %s", + (package_id,)) + + def update_missing_slide_flag(self, image_id): + return self.db.insert_data("UPDATE slide_scan_curation SET missing_slides = 0 WHERE image_id = %s", + (image_id,)) + if __name__ == "__main__": dlu_management = DluManagement() dlu_management.get_data_management_tables() diff --git a/data_management/services/dlu_mongo.py b/data_management/services/dlu_mongo.py index 8830300..f1ef6e7 100644 --- a/data_management/services/dlu_mongo.py +++ b/data_management/services/dlu_mongo.py @@ -15,7 +15,7 @@ class PackageType(Enum): ELECTRON_MICROSCOPY = "Electron Microscopy Imaging" - SEGMENTATION = "Segmentation Masks" + SEGMENTATION = "Segmentation Masks & Pathomics Vectors" MULTI_MODAL = "Multimodal Imaging Mass Spectrometry" SINGLE_CELL = "Single-cell RNA-Seq" OTHER = "Other" diff --git a/data_management/services/dlu_package_inventory.py b/data_management/services/dlu_package_inventory.py index 03fc734..8809d9d 100644 --- a/data_management/services/dlu_package_inventory.py +++ b/data_management/services/dlu_package_inventory.py @@ -17,19 +17,19 @@ def reconnect(self): self.db = MYSQLConnection() self.database = self.db.get_db_connection() - def get_dlu_file(self, status): + def get_dlu_package(self, status): return self.db.get_data( 'SELECT * FROM data_management.data_manager_data_v WHERE ready_to_move_from_globus = %s AND (globus_dlu_status IS NULL OR globus_dlu_status = "recalled")', (status,) ) - def set_dlu_file_waiting(self, status, package_id): + def set_dlu_package_waiting(self, status, package_id): return self.db.insert_data( 'UPDATE 
data_management.data_manager_data_v SET globus_dlu_status = "waiting" WHERE ready_to_move_from_globus = %s AND dlu_package_id = %s', (status, package_id,) ) - def get_waiting_files(self): + def get_waiting_packages(self): return self.db.get_data( 'Select * from data_management.data_manager_data_v where globus_dlu_status = "waiting" and ready_to_move_from_globus = "yes"' ) diff --git a/data_management/services/dlu_state.py b/data_management/services/dlu_state.py index 26edbcd..af92949 100644 --- a/data_management/services/dlu_state.py +++ b/data_management/services/dlu_state.py @@ -41,7 +41,7 @@ def set_package_state(self, package_id: str, state: PackageState, codicil = None if e and e.strerror: logger.error("There was an error updating the state: " + e.strerror) else: - logger.error("There was an error updating the state.") + logger.exception("There was a problem updating state", e) def clear_cache(self): requests.get(self.cache_clear_url) diff --git a/data_management/services/redcap.py b/data_management/services/redcap.py index a36183a..8cee072 100644 --- a/data_management/services/redcap.py +++ b/data_management/services/redcap.py @@ -82,7 +82,7 @@ def parse_redcap_records_by_participant(self, redcap_id, field_name): return "Intra-operative Needle Biopsy" else: logger.error( - f'Error: unknown value for record: {record["record"]} with field_name: {record["field_name"]} value: {record["value"]}' + f'Error: unknown value for record with field_name: {record["field_name"]} value: {record["value"]}' ) os.sys.exit() logger.debug("End: parse_redcap_records_by_participant") @@ -195,7 +195,7 @@ def parse_participant_records(self): else: logger.error( - f'Error: Additional fields found we are not mapping: {record["record"]} with field_name: {record["field_name"]} value: {record["value"]}' + f'Error: Additional fields found we are not mapping: record with field_name: {record["field_name"]}' ) participant["redcap_tissue_source"] = "KPMP Recruitment Site" # hard-coded value 
"""Slide-manifest curation: derive curated slide file names and keep the
missing-slide bookkeeping in sync with newly imported manifest rows."""
import logging
from pathlib import PureWindowsPath

logger = logging.getLogger("services-dlu_package_watcher")
logger.setLevel(logging.INFO)

# Block id that is counted (and, for H&E, named) separately from all others.
# NOTE(review): presumably marks frozen sections, given the "FRZ" code -- confirm.
_OCT_BLOCK_ID = "OCT"

# Stain descriptions whose file-name code does not depend on the block id.
_STAIN_CODES = {
    "TRICHRM": "TRI",
    "PAS": "PAS",
    "Toluidine Blue": "TOL",
    "Jones Methenamine Silver (SIL)": "SIL",
}


class SlideScanModel:
    """Plain value holder for one row of the slide_scan_curation table."""

    def __init__(self, image_id: str, redcap_id: str, kit_id: str, new_file_name: str, source_file_name: str,
                 source_folder_name: str):
        self.image_id = image_id
        self.redcap_id = redcap_id
        self.kit_id = kit_id
        self.new_file_name = new_file_name
        self.source_file_name = source_file_name
        self.source_folder_name = source_folder_name

    def get_dmd_tuple(self) -> tuple:
        """Return the fields in the order handed to the insert call.

        Note that kit_id comes before redcap_id here, unlike in the
        constructor signature.
        """
        return (
            self.image_id,
            self.kit_id,
            self.redcap_id,
            self.new_file_name,
            self.source_file_name,
            self.source_folder_name,
        )


def determine_stain(stain_info, block_id):
    """Map a stain description (and block id) to its file-name code.

    "H&E" yields "FRZ" for the OCT block and "HE" for every other block;
    the remaining known stains map independently of the block. Returns
    None when the stain is not recognised.
    """
    if stain_info == "H&E":
        return "FRZ" if block_id == _OCT_BLOCK_ID else "HE"
    return _STAIN_CODES.get(stain_info)


def _in_same_block_group(block_id, other_block_id) -> bool:
    """Return True when both ids are OCT or both are non-OCT."""
    return (block_id == _OCT_BLOCK_ID) == (other_block_id == _OCT_BLOCK_ID)


def calculate_denominator(slides_for_kit, block_id) -> int:
    """Count the slides of the kit that fall in the same OCT/non-OCT group as block_id."""
    return sum(1 for slide in slides_for_kit if _in_same_block_group(block_id, slide['block_id']))


def calculate_numerator(block_id, sample_id, slides_for_kit) -> int:
    """Return the 1-based position of sample_id among the slides of its group.

    Slides are walked in the given order and counting stops at the first
    slide whose accession equals sample_id. If it is never found, the
    result is one more than the number of slides in the group.
    """
    numerator = 1
    for slide in slides_for_kit:
        if slide['accession'] == sample_id:
            break
        if _in_same_block_group(block_id, slide['block_id']):
            numerator += 1
    return numerator


def _split_source_location(file_location):
    """Return (file name, parent folder name) of a Windows path.

    Returns (None, None) when the location is missing or has fewer than two
    backslashes. Sometimes the file location gets copied in with a single
    leading slash, in which case file and folder cannot be told apart.
    """
    if file_location is None or file_location.count("\\") < 2:
        return None, None
    path = PureWindowsPath(file_location)
    return path.name, path.parent.name


class SlideManagement:
    """Turns slide_manifest_import rows into slide_scan_curation rows.

    All persistence goes through the ``db`` object handed to the constructor.
    """

    def __init__(self, db):
        self.db = db

    def process_slide_manifest_imports(self):
        """Create a slide_scan_curation row for every new manifest import row.

        Rows whose kit has no redcap id are logged and skipped. Rows with a
        missing sample id or an unusable file location are still inserted,
        and the problem is recorded as the row's error message.
        """
        new_records = self.db.get_new_slide_manifest_import_rows()
        redcap_ids_processed = []
        for record in new_records:
            errors = []
            kit_id = record["outside_acc"]
            image_id = record["image_id"]
            redcap_id = self.db.get_spectrack_redcap_record_id(kit_id)
            if redcap_id is None:
                # %-style args keep this from raising when kit_id itself is None.
                logger.error("No redcap_id found for kit_id %s; ", kit_id)
                continue

            if record["accession"] is not None:
                new_file_name = self.determine_new_slide_name(sample_id=record["accession"], kit_id=kit_id,
                                                              stain_info=record["stain"], block_id=record["block_id"])
            else:
                new_file_name = None
                errors.append("Missing sample id, unable to determine file name; ")

            source_file_name, source_folder_name = _split_source_location(record["file_location"])
            if source_file_name is None:
                errors.append("Unable to determine source file or folder; ")

            slide_scan = SlideScanModel(image_id=image_id, redcap_id=redcap_id, kit_id=kit_id,
                                        new_file_name=new_file_name, source_file_name=source_file_name,
                                        source_folder_name=source_folder_name)
            self.db.insert_into_slide_scan_curation(slide_scan.get_dmd_tuple())

            check_missing_slides = self.db.get_missing_slides_from_view(redcap_id)
            redcap_ids_processed.append(redcap_id)
            # NOTE(review): all([]) is True, so this also fires when the view
            # returns no rows -- confirm that is the intended condition.
            if all(check_missing_slides):
                self.db.update_missing_slides(redcap_id)

            if errors:
                self.db.set_error_message_slide_scan_curation(image_id=image_id, error="".join(errors))
        logger.info("Processed %s new slide_manifest_import records.", len(new_records))

        # One pass per distinct redcap id is enough; dict.fromkeys keeps the
        # first-seen order while dropping repeats.
        for redcap_id in dict.fromkeys(redcap_ids_processed):
            self.update_missing_slides(redcap_id)

    def update_missing_slides(self, redcap_id: str):
        """Clear the missing-slide flag on every flagged slide of redcap_id.

        Nothing happens while the missing-slides view still returns rows for
        this redcap id.
        """
        # This MAY seem redundant, however this will ensure that we unmark any
        # missing slides records that just got the missing one added
        if self.db.get_missing_slides_from_view(redcap_id):
            return
        for slide in self.db.slides_marked_missing_by_redcap_id(redcap_id) or []:
            self.db.update_missing_slide_flag(slide['image_id'])

    def determine_new_slide_name(self, sample_id: str, kit_id: str, stain_info: str, block_id: str):
        """Build "<sample>_<stain>_<n>of<m>.svs" for one slide.

        Returns None when the stain cannot be mapped to a code, so the new
        file name is left blank.
        """
        slides_for_kit = self.db.get_slide_manifest_import_by_kit(kit_id, stain_info)
        denominator = calculate_denominator(slides_for_kit, block_id)
        numerator = calculate_numerator(block_id, sample_id, slides_for_kit)

        stain_type = determine_stain(stain_info, block_id)
        if stain_type is None:
            # Lazy %-formatting: stain_info / block_id may be None here, and
            # string concatenation would raise instead of logging.
            logger.info("Unable to determine stain type from stain: %s and block_id: %s", stain_info, block_id)
            return None
        return sample_id + "_" + stain_type + "_" + str(numerator) + "of" + str(denominator) + ".svs"

    def fill_in_package_ids(self):
        """Fill in the package id for redcap ids that do not have one yet.

        The id is written only when exactly one package is found for the
        redcap id; when several are found an error message is recorded instead.
        """
        for row in self.db.get_redcap_ids_with_null_package_id():
            redcap_id = row['redcap_id']
            package_id_list = self.db.get_package_ids_for_redcap_id(redcap_id)
            if len(package_id_list) == 1 and None not in package_id_list:
                package_id = package_id_list[0]['dlu_package_id']
                self.db.update_package_ids_in_slide_scan_curation(redcap_id=redcap_id, package_id=package_id)
                logger.info("Updated package id %s for redcap id %s", package_id, redcap_id)
            elif len(package_id_list) > 1:
                error_message = f"Multiple dlu_package_ids found for redcap_id {redcap_id}, unable to fill in package id."
                logger.info(error_message)
                self.db.set_error_message_slide_scan_curation_redcap_id(error=error_message, redcap_id=redcap_id)
def load_biopsy_tracking_long(self):
    """Reload the biopsy_tracking_long table from scratch.

    Truncates the table, then inserts every row returned by
    ``dlu_management.get_biopsy_tracking_long()``.

    Returns:
        The number of rows handed to ``db_tableau.insert_data``.
    """
    self.truncate_biopsy_tracking_long()
    bt_results = self.dlu_management.get_biopsy_tracking_long()
    query = "INSERT INTO biopsy_tracking_long(redcap_id, specimen_id, dlu_tis, dlu_packageType, status) VALUES(%s, %s, %s, %s, %s)"
    records_modified = 0
    for result in bt_results:
        # Values are bound positionally, so each row's key order has to match
        # the column list above -- presumably guaranteed by the source query.
        self.db_tableau.insert_data(query, tuple(result.values()))
        records_modified += 1
    return records_modified


def load_data_manager_data(self):
    """Reload the data_manager_data table from scratch.

    Truncates the table, then inserts every row returned by
    ``dlu_management.get_data_manager_data()`` with ``dlu_created`` rendered
    as a ``YYYY-MM-DD HH:MM:SS`` string.

    Returns:
        The number of rows handed to ``db_tableau.insert_data``.
    """
    self.truncate_data_manager_data()
    results = self.dlu_management.get_data_manager_data()
    columns = (
        "id", "dlu_package_id", "dlu_created", "dlu_submitter", "dlu_tis", "dlu_packageType",
        "dlu_subject_id", "dlu_error", "redcap_id", "known_specimen", "user_package_ready",
        "package_validated", "ready_to_move_from_globus", "globus_dlu_status", "upload_type",
        "upload_type_detail", "atlas_status", "current_owner", "ar_promotion_status",
        "sv_promotion_status", "release_version", "release_date", "removed_from_globus", "notes",
    )
    # Derive the placeholder list from the columns so the two cannot drift apart.
    placeholders = ", ".join(["%s"] * len(columns))
    query = ("INSERT INTO kpmp_dvc_integration.data_manager_data(" + ", ".join(columns)
             + ") VALUES(" + placeholders + ")")
    records_modified = 0
    for result in results:
        # Work on a copy so the caller's rows are left untouched; re-assigning
        # an existing key keeps its position, so the value order is unchanged.
        row = dict(result)
        row["dlu_created"] = row["dlu_created"].strftime('%Y-%m-%d %H:%M:%S')
        self.db_tableau.insert_data(query, tuple(row.values()))
        records_modified += 1
    return records_modified
diff --git a/data_management/sql/data_manager_data.sql b/data_management/sql/data_manager_data.sql index 7778095..12504dc 100644 --- a/data_management/sql/data_manager_data.sql +++ b/data_management/sql/data_manager_data.sql @@ -1,27 +1,28 @@ -- kpmp_dvc_integration.data_manager_data definition CREATE TABLE `data_manager_data` ( - `id` int(11) DEFAULT 0, - `dlu_package_id` varchar(100) DEFAULT NULL, - `dlu_created` datetime NOT NULL, - `dlu_submitter` varchar(100) DEFAULT NULL, - `dlu_tis` varchar(100) DEFAULT NULL, - `dlu_packageType` varchar(100) DEFAULT NULL, - `dlu_subject_id` varchar(200) DEFAULT NULL, - `dlu_error` tinyint(1) DEFAULT 0, - `redcap_id` text DEFAULT NULL, - `known_specimen` text DEFAULT NULL, - `user_package_ready` char(1) DEFAULT 'N', - `package_validated` text DEFAULT NULL, - `ready_to_move_from_globus` varchar(100) DEFAULT NULL, - `globus_dlu_status` varchar(255) DEFAULT NULL, - `package_status` text DEFAULT NULL, - `current_owner` varchar(100) DEFAULT NULL, - `ar_promotion_status` varchar(100) DEFAULT NULL, - `sv_promotion_status` varchar(100) DEFAULT NULL, - `release_version` varchar(100) DEFAULT NULL, - `removed_from_globus` varchar(100) DEFAULT NULL, - `notes` text DEFAULT NULL + `id` int(11) DEFAULT 0, + `dlu_package_id` varchar(100) DEFAULT NULL, + `dlu_created` datetime NOT NULL, + `dlu_submitter` varchar(100) DEFAULT NULL, + `dlu_tis` varchar(100) DEFAULT NULL, + `dlu_packageType` varchar(100) DEFAULT NULL, + `dlu_subject_id` varchar(200) DEFAULT NULL, + `dlu_error` tinyint(1) DEFAULT 0, + `redcap_id` text DEFAULT NULL, + `known_specimen` text DEFAULT NULL, + `user_package_ready` char(1) DEFAULT 'N', + `package_validated` text DEFAULT NULL, + `ready_to_move_from_globus` varchar(100) DEFAULT NULL, + `globus_dlu_status` varchar(255) DEFAULT NULL, + `upload_type` varchar(100) DEFAULT NULL, + `upload_type_detail` text DEFAULT NULL, + `atlas_status` text DEFAULT NULL, + `current_owner` varchar(100) DEFAULT NULL, + `ar_promotion_status` 
varchar(100) DEFAULT NULL, + `sv_promotion_status` varchar(100) DEFAULT NULL, + `release_version` varchar(100) DEFAULT NULL, + `release_date` datetime DEFAULT NULL, + `removed_from_globus` varchar(100) DEFAULT NULL, + `notes` text DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci; - -ALTER TABLE data_manager_data ADD release_date DATETIME AFTER release_version; \ No newline at end of file diff --git a/data_management/watch_files.py b/data_management/watch_files.py index 9b9d866..8caafed 100644 --- a/data_management/watch_files.py +++ b/data_management/watch_files.py @@ -5,6 +5,7 @@ from services.dlu_management import DluManagement from model.dlu_package import DLUPackage from services.dlu_mongo import DLUMongo +from services.slide_management import SlideManagement from dotenv import load_dotenv import logging @@ -28,16 +29,17 @@ def __init__ (self, db: DLUPackageInventory = None): self.dlu_file_handler = DLUFileHandler() self.dluPackage = DLUPackage() self.dlu_state = DLUState() + self.slide_management = SlideManagement(self.dlu_management) - def watch_for_files(self): - files = self.db.get_dlu_file("yes") - if len(files) == 0: + def watch_for_packages(self): + packages = self.db.get_dlu_package("yes") + if len(packages) == 0: logger.info( "No records were found with status 'yes' " ) else: - self.update_files_for_globus(files) - self.move_packages_to_DLU(files) + self.update_packages_for_globus(packages) + self.move_packages_to_DLU(packages) def watch_for_side_manifest_records(self): equal_num_rows = self.dlu_management.get_equal_num_rows() @@ -48,17 +50,12 @@ def watch_for_side_manifest_records(self): def update_slide_scan_curation(self): logger.info("Importing new row(s) into slide_scan_curation") - new_records = self.dlu_management.get_new_slide_manifest_import_rows() - for record in new_records: - redcap_id = self.dlu_management.get_spectrack_redcap_record_id(record["outside_acc"]) - slide_scan_tuple = (record["image_id"], 
record["outside_acc"], redcap_id) - query_string = self.dlu_management.insert_into_slide_scan_curation(slide_scan_tuple) - logger.info(query_string) + self.slide_management.process_slide_manifest_imports() - def update_files_for_globus(self, files): - for index, file_result in enumerate(files): - logger.info("Setting file status to 'waiting' on package " + file_result['dlu_package_id']) - self.db.set_dlu_file_waiting("yes", file_result['dlu_package_id']) + def update_packages_for_globus(self, packages): + for index, package_result in enumerate(packages): + logger.info("Setting file status to 'waiting' on package " + package_result['dlu_package_id']) + self.db.set_dlu_package_waiting("yes", package_result['dlu_package_id']) def process_file_paths(self, file_list: list[DLUFile]) -> list: dlu_files = [] @@ -67,18 +64,20 @@ def process_file_paths(self, file_list: list[DLUFile]) -> list: dlu_files.append(file) return dlu_files - def pickup_waiting_files(self): - files_in_waiting = self.db.get_waiting_files() - if len(files_in_waiting) == 0: + def pickup_waiting_packages(self): + packages_in_waiting = self.db.get_waiting_packages() + if len(packages_in_waiting) == 0: return logger.info( "No records were found with status 'waiting'" ) else: - self.move_packages_to_DLU(files_in_waiting) + self.move_packages_to_DLU(packages_in_waiting) def move_packages_to_DLU(self, packages): file_list = None + for _, package in enumerate(packages): + skip_copy = False package_id = package['dlu_package_id'] logger.info("Moving package " + package_id) @@ -90,37 +89,120 @@ def move_packages_to_DLU(self, packages): self.dlu_management.update_dlu_package(package_id, { "globus_dlu_status": error_msg }) continue - directory_info = DirectoryInfo(globus_data_directory) + if package['dlu_packageType'] == 'Whole Slide Images' and package['globus_dlu_status'] != 'recalled': + success = self.do_wsi_file_renames(globus_data_directory, package_id) + if not success: + continue + else: + skip_copy = True 
def fill_in_null_package_ids(self):
    """Delegate to the slide-management service to fill in missing package ids."""
    self.slide_management.fill_in_package_ids()


def do_wsi_file_renames(self, globus_data_directory: str, package_id: str):
    """Rename and move the files of a Whole Slide Images package.

    The package is processed only when all of the following hold:
    slide-scan info exists for it, it has no missing slides, no slides in
    error and no unapproved file names, its directory is valid, and the
    files on disk match the expected source file names one for one.

    Args:
        globus_data_directory: Directory holding the uploaded package files.
        package_id: The DLU package id being processed.

    Returns:
        True when the files were moved and the package was marked as
        succeeded; False when any pre-check or the move itself failed.
    """
    logger.info("starting rename process")
    slide_scan_info = self.dlu_management.find_slide_scan_info_by_package_id(package_id)
    if not slide_scan_info:
        self.log_err_message_slide_rename("Error: Package not found in slide_scan_v", package_id)
        return False

    if self.dlu_management.is_package_missing_slides(package_id):
        self.log_err_message_slide_rename("Error: Package is missing slides", package_id)
        return False

    if self.dlu_management.is_slides_in_error(package_id):
        self.log_err_message_slide_rename("Error: Package has some slides in error", package_id)
        return False

    if self.dlu_management.find_not_approved_filenames(package_id):
        self.log_err_message_slide_rename("Error: Package has unapproved filenames", package_id)
        return False

    directory_info = DirectoryInfo(globus_data_directory, calculate_checksums=False)
    if not self.is_directory_valid(directory_info, package_id):
        # is_directory_valid already logged and recorded the problem.
        return False

    if directory_info.file_count == 0 or directory_info.file_count != len(slide_scan_info):
        self.log_err_message_slide_rename("Error: Globus file count does not match expectation", package_id)
        return False

    # No need to calc checksums here, we just need the list of files
    file_list = self.dlu_file_handler.match_files(package_id, calculate_checksums=False)
    # Maps each expected source name to its curated name; the keys double as
    # the set of file names allowed in the directory.
    slide_name_map = {slide['source_file_name']: slide['new_file_name'] for slide in slide_scan_info}
    if any(file.name not in slide_name_map for file in file_list):
        # Routed through the helper so this failure is logged like the others.
        self.log_err_message_slide_rename(
            "Error: Filenames in directory do not match slide_scan_curation info", package_id)
        return False

    copied_files = self.dlu_file_handler.rename_and_move_files(file_list, slide_name_map, package_id)
    if len(copied_files) == 0:
        # NOTE(review): no package status is written here; presumably
        # rename_and_move_files reports its own failure -- confirm.
        logger.error("No files were renamed and moved for package: " + package_id)
        return False

    self.dlu_file_handler.chown_dir(package_id, copied_files, int(os.environ['dlu_user']))
    file_info = self.dlu_management.insert_dlu_files(package_id=package_id, file_list=copied_files)
    self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": "success"})
    self.dlu_management.update_dlu_package(package_id, {"ready_to_move_from_globus": "done"})
    self.dlu_mongo.update_package_files(package_id, file_info)

    self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED)
    self.dlu_state.clear_cache()

    return True


def log_err_message_slide_rename(self, error_msg, package_id):
    """Log error_msg for the package and store it as its globus_dlu_status."""
    logger.error(error_msg + " for package: " + package_id)
    self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg})


def is_directory_valid(self, directory_info, package_id):
    """Return False (and record the error) when the directory has no files and no subdirectories."""
    logger.info("checking if directory is valid")
    if directory_info.file_count == 0 and directory_info.subdir_count == 0:
        error_msg = "Error: package " + package_id + " has no files or top level subdirectory"
        logger.info(error_msg + " Skipping.")
        self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg})
        return False
    return True
"__main__": dlu_watcher = DLUWatcher() - dlu_watcher.pickup_waiting_files() - while True: - dlu_watcher.watch_for_files() + dlu_watcher.pickup_waiting_packages() + while True: + dlu_watcher.watch_for_packages() + dlu_watcher.watch_for_side_manifest_records() + dlu_watcher.fill_in_null_package_ids() time.sleep(60) - dlu_watcher.watch_for_side_manifest_records() \ No newline at end of file