From 71bb92f4a98a1007ffdb778cc3039ae97fea9192 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 14 Apr 2026 08:42:05 +0200 Subject: [PATCH 1/5] Replace lockfile with upload last_ping db column This is a refactoring that replaces the filesystem-based Toucher/lockfile mechanism with a database-based heartbeat approach for tracking active uploads. This eliminates the NFS-specific hack (os.access() + os.utime()) that was needed to bust NFS attribute caches in the old approach. The new approach is cleaner and more cloud-native. --- server/mergin/sync/models.py | 77 ++++++++++++++++--- server/mergin/sync/public_api.yaml | 2 +- server/mergin/sync/public_api_controller.py | 10 +-- .../mergin/sync/public_api_v2_controller.py | 3 +- server/mergin/sync/utils.py | 47 ----------- .../mergin/tests/test_project_controller.py | 9 ++- server/mergin/tests/test_public_api_v2.py | 1 - .../e3a7f2b1c94d_add_upload_last_ping.py | 29 +++++++ 8 files changed, 105 insertions(+), 73 deletions(-) create mode 100644 server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index f1c6cbfd..4d9d42f1 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -2,12 +2,14 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from __future__ import annotations +from contextlib import contextmanager import json import logging import os +import threading import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from typing import Optional, List, Dict, Set, Tuple from dataclasses import dataclass, asdict @@ -21,7 +23,7 @@ from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError -from flask import current_app +from flask import Flask, current_app from .files import ( DeltaChangeMerged, @@ -44,7 +46,6 @@ LOG_BASE, Checkpoint, generate_checksum, - Toucher, get_chunk_location, get_project_path, is_supported_type, @@ -1805,6 +1806,8 @@ class Upload(db.Model): db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True ) created = db.Column(db.DateTime, default=datetime.utcnow) + # last ping time to determine if upload is still active + last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) user = db.relationship("User") project = db.relationship( @@ -1827,17 +1830,67 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int): def upload_dir(self): return os.path.join(self.project.storage.project_dir, "tmp", self.id) - @property - def lockfile(self): - return os.path.join(self.upload_dir, "lockfile") - def is_active(self): - """Check if upload is still active because there was a ping (lockfile update) from underlying process""" - return os.path.exists(self.lockfile) and ( - time.time() - os.path.getmtime(self.lockfile) - < current_app.config["LOCKFILE_EXPIRATION"] + """Check if upload is still active because there was a ping from underlying process""" + return datetime.now(tz=timezone.utc) < self.last_ping.replace( + tzinfo=timezone.utc + ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) + + def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int): + """ + Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type. + Uses a fresh engine connection to stay pool-efficient. 
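+        Each ping runs in its own short transaction (db.engine.begin()), so the
+        refreshed last_ping is committed and visible to other requests immediately.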
+        """
+        # manual context push is required for background execution
+        with app.app_context():
+            while not stop_event.is_set():
+                try:
+                    # db.engine.begin() runs a short isolated transaction and returns the connection to the pool as soon as it exits
+                    with db.engine.begin() as conn:
+                        conn.execute(
+                            db.text(
+                                "UPDATE upload SET last_ping = NOW() WHERE id = :id"
+                            ),
+                            {"id": self.id},
+                        )
+                except Exception as e:
+                    logging.exception(
+                        f"Upload heartbeat failed for project {self.project_id} and version {self.version}: {e}"
+                    )
+
+                # wait for timeout seconds, but wake up immediately if stop_event is set
+                stop_event.wait(timeout)
+
+    @contextmanager
+    def heartbeat(self, timeout: int = 5):
+        """
+        Context manager to be used inside a Flask route.
+
+        Example of usage:
+        -----------------
+        with upload.heartbeat(timeout):
+            do_something_slow
+        """
+        # we need to pass a real Flask app object to the thread
+        app = current_app._get_current_object()
+        stop_event = threading.Event()
+
+        bg = threading.Thread(
+            target=self._heartbeat_task, args=(app, stop_event, timeout), daemon=True
         )
+        bg.start()
+        try:
+            yield
+        finally:
+            # signal the loop to stop
+            stop_event.set()
+
+            # wait for the task to finish its last SQL call.
+            # in Gevent, this yields to other requests (non-blocking), while in Sync, it blocks the current thread for up to 2s
+            # this protects the main thread / greenlet from zombie background tasks
+            bg.join(timeout=2)
+
     def clear(self):
         """Clean up pending upload.
         Uploaded files and table records are removed, and another upload can start.
@@ -1864,7 +1917,7 @@ def process_chunks(
         to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
         current_files = [f for f in self.project.files if f.path not in to_remove]
 
-        with Toucher(self.lockfile, 5):
+        with self.heartbeat(5):
             for f in file_changes:
                 if f.change == PushChangeType.DELETE:
                     continue
diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml
index 5227b562..157e8262 100644
--- a/server/mergin/sync/public_api.yaml
+++ b/server/mergin/sync/public_api.yaml
@@ -699,7 +699,7 @@ paths:
         - do integrity check comparing uploaded file sizes with what was expected
         - move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset)
         - bump up version in database
-        - remove artifacts (chunks, lockfile) by moving them to tmp directory"
+        - remove artifacts (chunks) by moving them to tmp directory"
      operationId: push_finish
      parameters:
        - name: transaction_id
diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index 8e2b0ea8..e27527cd 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -75,7 +75,6 @@
 )
 from .utils import (
     generate_checksum,
-    Toucher,
     get_ip,
     get_user_agent,
     generate_location,
@@ -849,11 +848,10 @@ def project_push(namespace, project_name):
             logging.error(f"Failed to create upload session: {str(err)}")
             abort(422, "Failed to create upload session. 
Please try later.") - # Create transaction folder and lockfile + # Create transaction folder os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit + # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 file_changes = files_changes_from_upload( @@ -920,7 +918,7 @@ def chunk_upload(transaction_id, chunk_id): abort(404) dest = os.path.join(upload_dir, "chunks", chunk_id) - with Toucher(upload.lockfile, 30): + with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) @@ -945,7 +943,7 @@ def push_finish(transaction_id): - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory + - remove artifacts (chunks) by moving them to tmp directory :param transaction_id: Transaction id. :type transaction_id: str diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 9a82a211..2fc7a855 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -327,9 +327,8 @@ def create_project_version(id): logging.error(f"Failed to create upload session: {str(err)}") return AnotherUploadRunning().response(409) - # Create transaction folder and lockfile + # Create transaction folder os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 9e89eb7c..48966457 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -57,53 +57,6 @@ def generate_checksum(file, chunk_size=4096): checksum.update(chunk) -class Toucher: - """ - Helper class to periodically update modification time of file during - execution of longer lasting task. - - Example of usage: - ----------------- - with Toucher(file, interval): - do_something_slow - - """ - - def __init__(self, lockfile, interval): - self.lockfile = lockfile - self.interval = interval - self.running = False - self.timer = None - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, tb): # pylint: disable=W0612,W0622 - self.release() - - def release(self): - self.running = False - if self.timer: - self.timer.cancel() - self.timer = None - - def acquire(self): - self.running = True - self.touch_lockfile() - - def touch_lockfile(self): - # do an NFS ACCESS procedure request to clear the attribute cache (for various pods to actually see the file) - # https://docs.aws.amazon.com/efs/latest/ug/troubleshooting-efs-general.html#custom-nfs-settings-write-delays - os.access(self.lockfile, os.W_OK) - with open(self.lockfile, "a"): - os.utime(self.lockfile, None) - - sleep(0) # to unblock greenlet - if self.running: - self.timer = Timer(self.interval, self.touch_lockfile) - self.timer.start() - - def is_qgis(path: str) -> bool: """ Check if file is a QGIS project file. 
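
A minimal sketch of the call-site migration (the 5-second interval and
do_something_slow are placeholders; process_chunks and chunk_upload hold the
real call sites):

    # before: keep refreshing the lockfile mtime while a long task runs
    with Toucher(upload.lockfile, 5):
        do_something_slow()

    # after: refresh upload.last_ping in the database instead
    with upload.heartbeat(5):
        do_something_slow()
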
diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 25e0e055..e5f61c18 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1294,7 +1294,6 @@ def create_transaction(username, changes, version=1): db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) os.makedirs(upload_dir) - open(os.path.join(upload_dir, "lockfile"), "w").close() return upload, upload_dir @@ -1320,6 +1319,7 @@ def test_chunk_upload(client, app): resp = client.post(url, data=data, headers=headers) assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() + assert os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests to send bigger chunk than allowed app.config["MAX_CHUNK_SIZE"] = 10 * CHUNK_SIZE @@ -1332,6 +1332,8 @@ def test_chunk_upload(client, app): failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() assert failure.error_type == "chunk_upload" assert failure.error_details == "Too big chunk" + # residual after upload was removed + assert not os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests with transaction with no uploads expected changes = _get_changes(test_project_dir) @@ -1342,9 +1344,8 @@ def test_chunk_upload(client, app): resp2 = client.post(url, data=data, headers=headers) assert resp2.status_code == 404 assert SyncFailuresHistory.query.count() == 1 - - # cleanup - shutil.rmtree(upload_dir) + # we do not have any chunks, so parent dir was removed as well + assert not os.path.exists(os.path.join(upload_dir)) def upload_chunks(upload_dir, changes, src_dir=test_project_dir): diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 7a87b1d0..f3c91539 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1035,7 +1035,6 @@ def test_create_version_failures(client): db.session.add(upload) db.session.commit() os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py new file mode 100644 index 00000000..dd727e5a --- /dev/null +++ b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py @@ -0,0 +1,29 @@ +"""Add last_ping to upload + +Revision ID: e3a7f2b1c94d +Revises: 4b4648483770 +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = "e3a7f2b1c94d" +down_revision = "4b4648483770" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True)) + # backfill existing rows before adding NOT NULL constraint + op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL") + op.alter_column("upload", "last_ping", nullable=False) + + +def downgrade(): + # drop the column but required lockfiles will be missing - make sure all uploads are gone + op.drop_column("upload", "last_ping") From dde191f037e5b504b378c34cdd86ee921d83bb09 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 14 Apr 2026 13:11:20 +0200 Subject: [PATCH 2/5] Rework concurrent upload using upsert strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the old try/except IntegrityError + cleanup loop pattern with an atomic upsert in Upload.create_upload(). Decouple the upload directory name from the DB primary key via transaction_id. Upload directory is created at this stage, no need to take care for it later. With the upsert logic, the ObjectDeletedError scenario which happened because a concurrent request could delete a stale upload row mid-operation is eliminated: - During push_finish, the heartbeat context manager continuously updates last_ping, keeping the upload fresh throughout the operation - A concurrent request can only take over an upload whose last_ping has expired - Since heartbeat prevents expiry, no other request can claim the row while push_finish is running - The upload object therefore stays valid for the full lifetime of the request — ObjectDeletedError becomes impossible --- server/mergin/sync/models.py | 95 +++++++++++++- server/mergin/sync/permissions.py | 7 +- server/mergin/sync/public_api_controller.py | 119 +++++++---------- .../mergin/sync/public_api_v2_controller.py | 118 +++++++---------- server/mergin/sync/schemas.py | 4 +- server/mergin/tests/test_db_hooks.py | 3 +- .../mergin/tests/test_project_controller.py | 122 ++++++++++++------ server/mergin/tests/test_public_api_v2.py | 56 +------- .../f1d9e4a7b823_add_upload_transaction_id.py | 35 +++++ 9 files changed, 308 insertions(+), 251 deletions(-) create mode 100644 server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 4d9d42f1..08ff6217 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -19,7 +19,7 @@ from flask_login import current_user from pygeodiff import GeoDiff from sqlalchemy import text, null, desc, nullslast, tuple_ -from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM +from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError @@ -1808,6 +1808,7 @@ class Upload(db.Model): created = db.Column(db.DateTime, default=datetime.utcnow) # last ping time to determine if upload is still active last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + transaction_id = db.Column(db.String, unique=True, nullable=False, index=True) user = db.relationship("User") project = db.relationship( @@ -1825,10 +1826,98 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int): self.version = version self.changes = ChangesSchema().dump(changes) self.user_id = user_id + 
self.transaction_id = str(uuid.uuid4()) + + @classmethod + def create_upload( + cls, project_id: str, version: int, changes: dict, user_id: int + ) -> Upload | None: + """Create upload session, it can either create a new record or handover an existing one but with new transaction id + Old transaction folder is removed and new one is created. + """ + now = datetime.now(timezone.utc) + expiration = current_app.config["LOCKFILE_EXPIRATION"] + new_tx_id = str(uuid.uuid4()) + + # CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot) + # NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload + existing_cte = ( + db.select(Upload.transaction_id) + .where( + Upload.project_id == project_id, + Upload.version == version, + ) + .cte("existing") + ) + + stmt = ( + insert(Upload) + .values( + id=str(uuid.uuid4()), + transaction_id=new_tx_id, + project_id=project_id, + version=version, + user_id=user_id, + last_ping=now, + changes=ChangesSchema().dump(changes), + ) + .add_cte(existing_cte) + ) + + upsert_stmt = stmt.on_conflict_do_update( + constraint="uq_upload_project_id", + set_={ + "transaction_id": new_tx_id, + "user_id": user_id, + "last_ping": now, + "changes": ChangesSchema().dump(changes), + }, + # ONLY update if the existing row is stale + where=(Upload.last_ping < (now - timedelta(seconds=expiration))), + ) + + upsert_stmt = upsert_stmt.returning( + Upload, + db.select(existing_cte.c.transaction_id) + .scalar_subquery() + .label("old_transaction_id"), + ) + + result = db.session.execute(upsert_stmt).fetchone() + db.session.commit() + + # if nothing returned, it means the WHERE clause failed (active upload) + if not result: + return + + upload = result.Upload + old_transaction_id = result.old_transaction_id + os.makedirs(upload.upload_dir) + + # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload + if old_transaction_id: + upload.project.sync_failed( + "", "push_lost", "Push artefact removed by subsequent push", user_id + ) + if os.path.exists( + os.path.join( + upload.project.storage.project_dir, "tmp", old_transaction_id + ) + ): + move_to_tmp( + os.path.join( + upload.project.storage.project_dir, "tmp", old_transaction_id + ), + old_transaction_id, + ) + + return upload @property def upload_dir(self): - return os.path.join(self.project.storage.project_dir, "tmp", self.id) + return os.path.join( + self.project.storage.project_dir, "tmp", self.transaction_id + ) def is_active(self): """Check if upload is still active because there was a ping from underlying process""" @@ -1896,7 +1985,7 @@ def clear(self): Uploaded files and table records are removed, and another upload can start. 
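+        Called from push finish/cancel endpoints and from failed upload processing.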
""" try: - move_to_tmp(self.upload_dir, self.id) + move_to_tmp(self.upload_dir, self.transaction_id) db.session.delete(self) db.session.commit() except Exception: diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py index e155020a..880c0e33 100644 --- a/server/mergin/sync/permissions.py +++ b/server/mergin/sync/permissions.py @@ -271,8 +271,8 @@ def check_project_permissions( return None -def get_upload(transaction_id): - upload = Upload.query.get_or_404(transaction_id) +def get_upload_or_fail(transaction_id: str) -> Upload: + upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404() # upload to 'removed' projects is forbidden if upload.project.removed_at: abort(404) @@ -280,8 +280,7 @@ def get_upload(transaction_id): if upload.user_id != current_user.id: abort(403, "You do not have permissions for ongoing upload") - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id) - return upload, upload_dir + return upload def projects_query(permission, as_admin=True, public=True): diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index e27527cd..894101b3 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -25,7 +25,7 @@ from pygeodiff import GeoDiffLibError from flask_login import current_user from sqlalchemy import and_, desc, asc -from sqlalchemy.exc import IntegrityError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from gevent import sleep import base64 from werkzeug.exceptions import HTTPException, Conflict @@ -70,7 +70,7 @@ require_project, projects_query, ProjectPermissions, - get_upload, + get_upload_or_fail, require_project_by_uuid, ) from .utils import ( @@ -774,13 +774,6 @@ def project_push(namespace, project_name): if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") - # reject upload early if there is another one already running - pending_upload = Upload.query.filter_by( - project_id=project.id, version=version - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. Please try later.") - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -812,44 +805,20 @@ def project_push(namespace, project_name): if requested_storage > ws.storage: return StorageLimitHit(current_usage, ws.storage).response(422) - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) try: - # Creating upload transaction with different project's version is possible. - db.session.commit() + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: + abort(400, "Another process is running. Please try later.") + logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.transaction_id} created for project: {project.id}, version: {version}" ) - except IntegrityError: + except (IntegrityError, SQLAlchemyError) as err: db.session.rollback() - # check and clean dangling uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - abort(400, "Another process is running. 
Please try later.") - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - # Try again after cleanup - db.session.add(upload) - try: - db.session.commit() - logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" - ) - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") - abort(422, "Failed to create upload session. Please try later.") - - # Create transaction folder - os.makedirs(upload.upload_dir) + logging.exception(f"Failed to create upload: {str(err)}") + abort(422, "Failed to create upload session. Please try later.") # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): @@ -874,7 +843,7 @@ def project_push(namespace, project_name): db.session.commit() logging.info( f"A project version {ProjectVersion.to_v_name(next_version)} for project: {project.id} created. " - f"Transaction id: {upload.id}. No upload." + f"Transaction id: {upload.transaction_id}. No upload." ) project_version_created.send(pv) push_finished.send(pv) @@ -882,7 +851,7 @@ def project_push(namespace, project_name): except IntegrityError as err: db.session.rollback() logging.exception( - f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}" + f"Failed to upload a new project version using transaction id: {upload.transaction_id}: {str(err)}" ) abort(422, "Failed to upload a new project version. Please try later.") except gevent.timeout.Timeout: @@ -891,7 +860,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id}, 200 + return {"transaction": upload.transaction_id}, 200 @auth_required @@ -908,7 +877,7 @@ def chunk_upload(transaction_id, chunk_id): :rtype: Dict """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project chunks = [] for file in upload.changes["added"] + upload.changes["updated"]: @@ -917,7 +886,7 @@ def chunk_upload(transaction_id, chunk_id): if chunk_id not in chunks: abort(404) - dest = os.path.join(upload_dir, "chunks", chunk_id) + dest = os.path.join(upload.upload_dir, "chunks", chunk_id) with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue @@ -950,7 +919,7 @@ def push_finish(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project project = upload.project next_version = project.next_version() @@ -989,7 +958,7 @@ def push_finish(transaction_id): abort(422, f"Failed to create new version: {msg}") - files_dir = os.path.join(upload_dir, "files", v_next_version) + files_dir = os.path.join(upload.upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): pv = ProjectVersion.query.filter_by( @@ -1007,29 +976,31 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - user_agent = get_user_agent(request) - device_id = get_device_id(request) - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - user_agent, - device_id, - ) - 
db.session.add(pv) - db.session.add(project) - db.session.commit() + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + user_agent = get_user_agent(request) + device_id = get_device_id(request) + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + user_agent, + device_id, + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() - # let's move uploaded files where they are expected to be - os.renames(files_dir, version_dir) + # let's move uploaded files where they are expected to be + os.renames(files_dir, version_dir) - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." - ) - project_version_created.send(pv) - push_finished.send(pv) + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." + ) + project_version_created.send(pv) + push_finished.send(pv) except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: db.session.rollback() logging.exception( @@ -1059,10 +1030,8 @@ def push_cancel(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) - db.session.delete(upload) - db.session.commit() - move_to_tmp(upload_dir) + upload = get_upload_or_fail(transaction_id) + upload.clear() return NoContent, 200 diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 2fc7a855..64d8ce84 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -15,8 +15,7 @@ from flask import abort, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import ObjectDeletedError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from .schemas_v2 import BatchErrorSchema, ProjectSchema as ProjectSchemaV2 from ..app import db @@ -296,87 +295,65 @@ def create_project_version(id): return NoContent, 204 try: - # while processing data, block other uploads - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - # Creating blocking upload can fail, e.g. 
in case of racing condition - db.session.commit() - except IntegrityError: - db.session.rollback() - # check and clean dangling blocking uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - return AnotherUploadRunning().response(409) - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - try: - # Try again after cleanup - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - db.session.commit() - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: return AnotherUploadRunning().response(409) + except (IntegrityError, SQLAlchemyError) as err: + db.session.rollback() + logging.exception(f"Failed to create upload: {str(err)}") + return UploadError().response(422) - # Create transaction folder - os.makedirs(upload.upload_dir) - + # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() return DataSyncError(failed_files=errors).response(422) - upload_deleted = False try: - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - get_user_agent(request), - get_device_id(request), - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() - - # let's move uploaded files where they are expected to be - if to_be_added_files or to_be_updated_files: - temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) - os.renames(temp_files_dir, version_dir) - - # remove used chunks - # get chunks from added and updated files - chunks_ids = [] - for file in to_be_added_files + to_be_updated_files: - file_chunks = file.get("chunks", []) - chunks_ids.extend(file_chunks) - remove_transaction_chunks.delay(chunks_ids) - - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}." - ) - project_version_created.send(pv) - push_finished.send(pv) + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() + + # let's move uploaded files where they are expected to be + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join( + upload.upload_dir, "files", v_next_version + ) + os.renames(temp_files_dir, version_dir) + + # remove used chunks + # get chunks from added and updated files + chunks_ids = [] + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}." 
+ ) + project_version_created.send(pv) + push_finished.send(pv) except ( psycopg2.Error, FileNotFoundError, IntegrityError, - ObjectDeletedError, ) as err: db.session.rollback() - upload_deleted = isinstance(err, ObjectDeletedError) logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}: {str(err)}" ) @@ -400,9 +377,8 @@ def create_project_version(id): move_to_tmp(version_dir) raise finally: - # remove artifacts only if upload object is still valid - if not upload_deleted: - upload.clear() + # remove upload artifacts + upload.clear() result = ProjectSchemaV2().dump(project) result["files"] = ProjectFileSchema( diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 4eecca0c..da18f7db 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -132,7 +132,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.project.uploads.all()] + return [u.transaction_id for u in obj.project.uploads.all()] def _permissions(self, obj): return project_user_permissions(obj.project) @@ -180,7 +180,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.uploads.all()] + return [u.transaction_id for u in obj.uploads.all()] class Meta: model = Project diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 044294c5..27aadb5b 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -114,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - upload = Upload(diff_project, 10, [], mergin_user.id) - db.session.add(upload) + upload = Upload.create_upload(diff_project.id, 10, [], mergin_user.id) project_id = diff_project.id user = add_user("user", "user") access_request = AccessRequest(diff_project, user.id) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index e5f61c18..71cd0fe2 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -404,7 +404,7 @@ def test_add_project(client, app, data, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # add TEMPLATES user and make him creator of test_project (to become template) @@ -508,7 +508,7 @@ def test_delete_project(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # try force delete for active project @@ -1120,7 +1120,7 @@ def test_push_to_new_project(client): assert resp.status_code == 200 upload_id = resp.json["transaction"] - upload = Upload.query.filter_by(id=upload_id).first() + upload = Upload.query.filter_by(transaction_id=upload_id).first() blacklisted_file = all( added["path"] != "test_dir/test4.txt" for added in upload.changes["added"] ) @@ -1211,6 +1211,52 @@ def test_push_integrity_error(client, app): assert failure.error_details == "No changes" +def 
test_stale_upload_takeover(client, app): + """Stale upload (last_ping expired) is atomically replaced by a new one. + + Verifies that: + - the new upload gets a fresh transaction_id + - the old upload directory is cleaned up + - a push_lost failure is recorded for the abandoned upload + """ + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + user = User.query.filter_by(username="mergin").first() + changes = _get_changes(test_project_dir) + changes["added"] = changes["removed"] = [] + + # create initial upload and record its identity + upload = Upload.create_upload(project.id, 1, changes, user.id) + old_tx_id = upload.transaction_id + old_upload_dir = upload.upload_dir + assert os.path.exists(old_upload_dir) + + # backdate last_ping to make the upload appear stale + db.session.execute( + db.text( + "UPDATE upload SET last_ping = NOW() - :expiry * INTERVAL '1 second' WHERE id = :id" + ), + { + "id": upload.id, + "expiry": client.application.config["LOCKFILE_EXPIRATION"] + 1, + }, + ) + db.session.commit() + + # takeover — should succeed and replace the stale upload + new_upload = Upload.create_upload(project.id, 1, changes, user.id) + assert new_upload is not None + assert new_upload.transaction_id != old_tx_id + assert os.path.exists(new_upload.upload_dir) + # old directory was moved away + assert not os.path.exists(old_upload_dir) + # push_lost was recorded for the abandoned upload + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + assert failure.error_type == "push_lost" + assert failure.error_details == "Push artefact removed by subsequent push" + + def test_exceed_data_limit(client): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1289,12 +1335,8 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload = Upload(project, version, changes, user.id) - db.session.add(upload) - db.session.commit() - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) - os.makedirs(upload_dir) - return upload, upload_dir + upload = Upload.create_upload(project.id, version, changes, user.id) + return upload, upload.upload_dir def remove_transaction(transaction_id): @@ -1310,7 +1352,7 @@ def test_chunk_upload(client, app): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) chunk_id = upload.changes["added"][0]["chunks"][0] - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_project_dir, "test_dir", "test4.txt"), "rb") as file: data = file.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1367,7 +1409,9 @@ def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1388,7 +1432,7 @@ def test_push_finish(client): chunks.append(chunk) resp2 = client.post( - f"/v1/project/push/finish/{upload.id}", + f"/v1/project/push/finish/{upload.transaction_id}", headers={**json_headers, "User-Agent": 
"Werkzeug"}, ) assert resp2.status_code == 200 @@ -1415,7 +1459,7 @@ def test_push_finish(client): db.session.commit() upload, upload_dir = create_transaction(user.username, changes) - url = "/v1/project/push/finish/{}".format(upload.id) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) db.session.add(upload) db.session.commit() # still log in as mergin user @@ -1429,7 +1473,7 @@ def test_push_finish(client): def test_push_close(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/cancel/{}".format(upload.id) + url = "/v1/project/push/cancel/{}".format(upload.transaction_id) resp = client.post(url) assert resp.status_code == 200 @@ -1472,12 +1516,12 @@ def test_whole_push_process(client): assert resp.status_code == 200 assert "transaction" in resp.json.keys() - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.id in resp.json["uploads"] + assert upload.transaction_id in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" @@ -1488,7 +1532,7 @@ def test_whole_push_process(client): # push upload: upload file chunks for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_dir, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1500,7 +1544,7 @@ def test_whole_push_process(client): assert resp.json["checksum"] == checksum.hexdigest() # push finish: call server to concatenate chunks and finish upload - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1528,7 +1572,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check there are not any changes between local modified file and server patched file (using geodiff) geodiff = GeoDiff() @@ -1552,7 +1596,7 @@ def test_push_diff_finish(client): upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert ( "GEODIFF ERROR: Nothing inserted (this should never happen)" @@ -1561,10 +1605,10 @@ def test_push_diff_finish(client): error = resp.json["detail"] # try again to make sure geodiff logs are related only to recent event - client.post("/v1/project/push/cancel/{}".format(upload.id)) + client.post("/v1/project/push/cancel/{}".format(upload.transaction_id)) upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = 
client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert resp.json["detail"] == error @@ -1572,7 +1616,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff_0_size(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, 3) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 @@ -1598,7 +1642,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check diff file was generated by server, and it is in file history latest_version = upload.project.get_latest_version() @@ -1638,7 +1682,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() assert all( @@ -1710,7 +1754,7 @@ def test_clone_project(client, data, username, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 endpoint = "/v1/project/clone/{}/{}".format(test_workspace_name, test_project) @@ -1852,7 +1896,7 @@ def test_optimize_storage(app, client, diff_project): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 assert os.path.exists(optimize_v4) @@ -2214,16 +2258,16 @@ def test_inactive_project(client, diff_project): upload, upload_dir = create_transaction("mergin", _get_changes(test_project_dir)) chunk_id = upload.changes["added"][0]["chunks"][0] resp = client.post( - f"/v1/project/push/chunk/{upload.id}/{chunk_id}", + f"/v1/project/push/chunk/{upload.transaction_id}/{chunk_id}", data=data, headers={"Content-Type": "application/octet-stream"}, ) assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/cancel/{upload.id}") + resp = client.post(f"/v1/project/push/cancel/{upload.transaction_id}") assert resp.status_code == 404 # delete project again @@ -2324,7 +2368,7 @@ def test_project_version_integrity(client): "__init__", side_effect=IntegrityError("Project version already exists", None, None), ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert "Failed to create new version" in resp.json["detail"] 
failure = SyncFailuresHistory.query.filter_by( @@ -2383,7 +2427,7 @@ def _get_user_agent(): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 @@ -2419,12 +2463,12 @@ def test_delete_diff_file(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") fh = FileHistory.query.filter_by( project_version_name=upload.project.latest_version, @@ -2570,12 +2614,12 @@ def test_supported_file_upload(client): headers=json_headers, ) assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # Even chunks are correctly uploaded for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(TMP_DIR, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -2586,7 +2630,7 @@ def test_supported_file_upload(client): assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() # Unsupported file type is revealed when reconstructed from chunks - based on the mime type - and upload is refused - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 400 assert ( resp.json["detail"] @@ -2619,8 +2663,8 @@ def test_locked_project(client, diff_project): assert resp.headers["Content-Type"] == "application/problem+json" assert resp.json["code"] == "ProjectLocked" # to play safe push finish is also blocked - upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) + upload, _ = create_transaction("mergin", changes) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) resp = client.post(url, headers=json_headers) assert resp.status_code == 422 diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index f3c91539..56caa7ff 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1031,11 +1031,7 @@ def test_create_version_failures(client): data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} # somebody else is syncing - upload = Upload(project, 1, _get_changes(test_project_dir), 1) - db.session.add(upload) - db.session.commit() - os.makedirs(upload.upload_dir) - + upload = Upload.create_upload(project.id, 1, _get_changes(test_project_dir), 1) response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 assert response.json["code"] == AnotherUploadRunning.code @@ -1072,16 +1068,6 @@ def test_create_version_failures(client): assert 
response.status_code == 422 assert response.json["code"] == UploadError.code - # try to finish the transaction which would fail on existing Upload integrity error, e.g. race conditions - with patch.object( - Upload, - "__init__", - side_effect=IntegrityError("Cannot insert upload", None, None), - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - assert response.status_code == 409 - assert response.json["code"] == AnotherUploadRunning.code - # try to finish the transaction which would fail on unexpected integrity error # patch of ChangesSchema is just a workaround to trigger and error with patch.object( @@ -1093,46 +1079,6 @@ def test_create_version_failures(client): assert response.status_code == 409 -def test_create_version_object_deleted_error(client): - """Test that ObjectDeletedError during push returns 422 without secondary exception""" - project = Project.query.filter_by( - workspace_id=test_workspace_id, name=test_project - ).first() - - data = { - "version": "v1", - "changes": { - "added": [], - "removed": [ - file_info(test_project_dir, "base.gpkg"), - ], - "updated": [], - }, - } - - # Create a real ObjectDeletedError by using internal SQLAlchemy state - def raise_object_deleted(*args, **kwargs): - # Create a minimal state-like object that ObjectDeletedError can use - class FakeState: - class_ = Upload - - def obj(self): - return None - - raise ObjectDeletedError(FakeState()) - - with patch.object( - ProjectVersion, - "__init__", - side_effect=raise_object_deleted, - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - - # Should return 422 UploadError, not 500 from secondary exception - assert response.status_code == 422 - assert response.json["code"] == UploadError.code - - def test_upload_chunk(client): """Test pushing a chunk to a project""" project = Project.query.filter_by( diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py new file mode 100644 index 00000000..5e799a86 --- /dev/null +++ b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py @@ -0,0 +1,35 @@ +"""Add transaction_id to upload + +Revision ID: f1d9e4a7b823 +Revises: e3a7f2b1c94d +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = "f1d9e4a7b823" +down_revision = "e3a7f2b1c94d" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("upload", sa.Column("transaction_id", sa.String(), nullable=True)) + # backfill existing rows before adding NOT NULL constraint + op.execute( + "UPDATE upload SET transaction_id = id::text WHERE transaction_id IS NULL" + ) + op.alter_column("upload", "transaction_id", nullable=False) + op.create_index( + op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True + ) + + +def downgrade(): + op.drop_index(op.f("ix_upload_transaction_id"), table_name="upload") + # column is dropped but there could be orphan transaction folders, make sure upload table is empty + op.drop_column("upload", "transaction_id") From b5378642b88719ffa60d48b5b2ee595ca4fb9729 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Thu, 16 Apr 2026 13:27:55 +0200 Subject: [PATCH 3/5] Update revision branch hash --- .../migrations/community/e3a7f2b1c94d_add_upload_last_ping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py index dd727e5a..d53d4440 100644 --- a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py +++ b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py @@ -1,7 +1,7 @@ """Add last_ping to upload Revision ID: e3a7f2b1c94d -Revises: 4b4648483770 +Revises: e3f1a9b2c4d6 Create Date: 2026-04-14 00:00:00.000000 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "e3a7f2b1c94d" -down_revision = "4b4648483770" +down_revision = "e3f1a9b2c4d6" branch_labels = None depends_on = None From 257e16d7bf8c26e6adedb3bad21c02be10eb0556 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Fri, 17 Apr 2026 14:04:45 +0200 Subject: [PATCH 4/5] Concurrent upload fixes - guard against transaction folder errors - make last_ping tz naive - set transaction id to uuid type - remove redundant is_active method --- server/mergin/sync/models.py | 61 +++++++++++-------- server/mergin/sync/permissions.py | 2 + .../mergin/sync/public_api_v2_controller.py | 8 +-- .../mergin/tests/test_project_controller.py | 2 +- .../f1d9e4a7b823_add_upload_transaction_id.py | 9 +-- 5 files changed, 46 insertions(+), 36 deletions(-) diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 08ff6217..5f4aa967 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -1808,7 +1808,9 @@ class Upload(db.Model): created = db.Column(db.DateTime, default=datetime.utcnow) # last ping time to determine if upload is still active last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - transaction_id = db.Column(db.String, unique=True, nullable=False, index=True) + transaction_id = db.Column( + UUID(as_uuid=True), unique=True, nullable=False, index=True + ) user = db.relationship("User") project = db.relationship( @@ -1835,7 +1837,7 @@ def create_upload( """Create upload session, it can either create a new record or handover an existing one but with new transaction id Old transaction folder is removed and new one is created. 
""" - now = datetime.now(timezone.utc) + now = datetime.now(timezone.utc).replace(tzinfo=None) expiration = current_app.config["LOCKFILE_EXPIRATION"] new_tx_id = str(uuid.uuid4()) @@ -1892,42 +1894,49 @@ def create_upload( upload = result.Upload old_transaction_id = result.old_transaction_id - os.makedirs(upload.upload_dir) - # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload - if old_transaction_id: - upload.project.sync_failed( - "", "push_lost", "Push artefact removed by subsequent push", user_id - ) - if os.path.exists( - os.path.join( - upload.project.storage.project_dir, "tmp", old_transaction_id + try: + os.makedirs(upload.upload_dir) + + # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload + if old_transaction_id: + upload.project.sync_failed( + "", "push_lost", "Push artefact removed by subsequent push", user_id ) - ): - move_to_tmp( + if os.path.exists( os.path.join( - upload.project.storage.project_dir, "tmp", old_transaction_id - ), - old_transaction_id, - ) + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ) + ): + move_to_tmp( + os.path.join( + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ), + str(old_transaction_id), + ) + except OSError as err: + # filesystem setup failed after the DB row was already committed. + # delete the row immediately so the next attempt isn't blocked until expiration. + db.session.delete(upload) + db.session.commit() + logging.error(f"Failed to create upload directory: {err}") + return return upload @property def upload_dir(self): return os.path.join( - self.project.storage.project_dir, "tmp", self.transaction_id + self.project.storage.project_dir, "tmp", str(self.transaction_id) ) - def is_active(self): - """Check if upload is still active because there was a ping from underlying process""" - return datetime.now(tz=timezone.utc) < self.last_ping.replace( - tzinfo=timezone.utc - ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) - def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int): """ - Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type. + Background task: Runs as a Thread, it is compatible with Sync (direct) or Gevent (monkey-patch) worker type. Uses a fresh engine connection to stay pool-efficient. """ # manual context push is required for background execution @@ -1985,7 +1994,7 @@ def clear(self): Uploaded files and table records are removed, and another upload can start. 
""" try: - move_to_tmp(self.upload_dir, self.transaction_id) + move_to_tmp(self.upload_dir, str(self.transaction_id)) db.session.delete(self) db.session.commit() except Exception: diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py index 880c0e33..4fe3ad54 100644 --- a/server/mergin/sync/permissions.py +++ b/server/mergin/sync/permissions.py @@ -272,6 +272,8 @@ def check_project_permissions( def get_upload_or_fail(transaction_id: str) -> Upload: + if not is_valid_uuid(transaction_id): + abort(404) upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404() # upload to 'removed' projects is forbidden if upload.project.removed_at: diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 64d8ce84..0d34c9c3 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -240,11 +240,6 @@ def create_project_version(id): if pv and pv.name != version: return ProjectVersionExists(version, pv.name).response(409) - # reject push if there is another one already running - pending_upload = Upload.query.filter_by(project_id=project.id).first() - if pending_upload and pending_upload.is_active(): - return AnotherUploadRunning().response(409) - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -304,6 +299,9 @@ def create_project_version(id): db.session.rollback() logging.exception(f"Failed to create upload: {str(err)}") return UploadError().response(422) + except OSError as err: + logging.exception(f"Failed to create upload directory: {str(err)}") + return UploadError().response(422) # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 7b14fbf3..07d8fe77 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1520,7 +1520,7 @@ def test_whole_push_process(client): # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.transaction_id in resp.json["uploads"] + assert str(upload.transaction_id) in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py index 5e799a86..d3797b8b 100644 --- a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py +++ b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py @@ -8,6 +8,7 @@ from alembic import op import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID # revision identifiers, used by Alembic. 
@@ -18,11 +19,11 @@
 
 
 def upgrade():
-    op.add_column("upload", sa.Column("transaction_id", sa.String(), nullable=True))
-    # backfill existing rows before adding NOT NULL constraint
-    op.execute(
-        "UPDATE upload SET transaction_id = id::text WHERE transaction_id IS NULL"
+    op.add_column(
+        "upload", sa.Column("transaction_id", UUID(as_uuid=True), nullable=True)
     )
+    # backfill existing rows before adding NOT NULL constraint; the explicit
+    # cast is required since Postgres will not assign varchar to uuid implicitly
+    op.execute("UPDATE upload SET transaction_id = id::uuid WHERE transaction_id IS NULL")
     op.alter_column("upload", "transaction_id", nullable=False)
     op.create_index(
         op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True

From 683eaf71e58ff240ef6ac8af1c791991390cdf36 Mon Sep 17 00:00:00 2001
From: Martin Varga
Date: Fri, 17 Apr 2026 14:33:56 +0200
Subject: [PATCH 5/5] Fix: Swap file rename and DB project version commit to
 avoid orphaned versions in DB

If os.renames() failed while moving the uploaded data, we would end up
with a broken project: a version recorded in the DB but no actual data
on disk.
---
 server/mergin/sync/public_api_controller.py    | 24 +++++++++++++++----
 .../mergin/sync/public_api_v2_controller.py    | 20 ++++++++++++----
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py
index 894101b3..8f142e71 100644
--- a/server/mergin/sync/public_api_controller.py
+++ b/server/mergin/sync/public_api_controller.py
@@ -991,26 +991,42 @@ def push_finish(transaction_id):
         )
         db.session.add(pv)
         db.session.add(project)
-        db.session.commit()
 
-        # let's move uploaded files where they are expected to be
-        os.renames(files_dir, version_dir)
+        # move files before committing so a filesystem failure leaves the DB clean
+        if os.path.exists(files_dir):
+            os.renames(files_dir, version_dir)
+
+        db.session.commit()
 
         logging.info(
             f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
) project_version_created.send(pv) push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + except (psycopg2.Error, OSError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) raise finally: # remove artifacts diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 0d34c9c3..ebd909ad 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -310,6 +310,15 @@ def create_project_version(id): upload.clear() return DataSyncError(failed_files=errors).response(422) + if os.path.exists(version_dir): + if ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count(): + return UploadError( + error=f"Version {v_next_version} already exists" + ).response(409) + move_to_tmp(version_dir) + try: # let's keep upload alive until all work is done so no one else can claim it with upload.heartbeat(5): @@ -324,17 +333,18 @@ def create_project_version(id): ) db.session.add(pv) db.session.add(project) - db.session.commit() - # let's move uploaded files where they are expected to be + # move files before committing so a filesystem failure leaves the DB clean if to_be_added_files or to_be_updated_files: temp_files_dir = os.path.join( upload.upload_dir, "files", v_next_version ) os.renames(temp_files_dir, version_dir) - # remove used chunks - # get chunks from added and updated files + db.session.commit() + + # remove used chunks only after commit — chunks belong to the now-committed version + if to_be_added_files or to_be_updated_files: chunks_ids = [] for file in to_be_added_files + to_be_updated_files: file_chunks = file.get("chunks", []) @@ -348,7 +358,7 @@ def create_project_version(id): push_finished.send(pv) except ( psycopg2.Error, - FileNotFoundError, + OSError, IntegrityError, ) as err: db.session.rollback()
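
For reference, a minimal self-contained sketch of the ordering both endpoints
now follow; the finalize_version name and the shutil/tempfile cleanup are
illustrative stand-ins (the real code uses move_to_tmp() and the
Flask-SQLAlchemy session), not part of the codebase:

    import os
    import shutil
    import tempfile


    def finalize_version(session, files_dir: str, version_dir: str) -> None:
        # 1) do the irreversible filesystem step first: if the rename fails,
        #    the pending DB transaction can simply be rolled back and no
        #    orphaned version row is ever created
        if os.path.exists(files_dir):
            os.renames(files_dir, version_dir)
        try:
            # 2) only now make the new version visible in the database
            session.commit()
        except Exception:
            session.rollback()
            # park the already-moved files aside so no version directory is
            # left behind for a version that was never committed
            shutil.move(version_dir, tempfile.mkdtemp(prefix="push_rollback_"))
            raise

A failure in step 1 leaves only upload artifacts to clean up; a failure in
step 2 leaves neither a DB row nor a version directory, which is exactly the
invariant the cleanup blocks in the hunks above restore.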