From d38a19e03eb373d519bee8fcf7b890cd5f462455 Mon Sep 17 00:00:00 2001 From: whisper67265 Date: Thu, 2 Jul 2026 10:00:02 -0600 Subject: [PATCH 1/2] endpoint test depth: fuzz/adversarial ORM at trust boundary --- src/boost_weblate/endpoint/serializers.py | 76 +++- src/boost_weblate/endpoint/validators.py | 26 ++ tests/endpoint/test_serializers.py | 119 ++++++ tests/endpoint/test_views.py | 456 ++++++++++++++++++++++ 4 files changed, 675 insertions(+), 2 deletions(-) diff --git a/src/boost_weblate/endpoint/serializers.py b/src/boost_weblate/endpoint/serializers.py index 01ad283..46d9aa3 100644 --- a/src/boost_weblate/endpoint/serializers.py +++ b/src/boost_weblate/endpoint/serializers.py @@ -17,7 +17,12 @@ boost_validation_errors, to_error_dict, ) -from boost_weblate.endpoint.validators import validate_repo_segment +from boost_weblate.endpoint.validators import ( + MAX_ADD_OR_UPDATE_LANGS, + MAX_SUBMODULES_PER_LANG, + validate_language_code, + validate_repo_segment, +) class DrfValidationCode(StrEnum): @@ -190,15 +195,56 @@ def validate_organization(self, value: str) -> str: ) raise serializers.ValidationError(str(exc)) from exc + def validate_version(self, value: str) -> str: + """Reject version strings with unsafe characters or excessive length.""" + try: + return validate_repo_segment(value, field="version") + except ValidationError as exc: + self._custom_error_fields.add(RequestField.VERSION) + self._custom_validation_errors.extend( + boost_validation_errors( + [ + ( + BoostEndpointErrorCode.INVALID_CLONE_URL, + str(exc), + {"field": RequestField.VERSION}, + ) + ] + ) + ) + raise serializers.ValidationError(str(exc)) from exc + def validate_extensions(self, value: list[str] | None) -> list[str] | None: """Strip entries and remove blanks so all-empty input does not filter files.""" if value is None: return None - return [v.strip() for v in value if v.strip()] + cleaned: list[str] = [] + for entry in value: + if not isinstance(entry, str): + raise serializers.ValidationError( + "Each extension must be a string.", + code=DrfValidationCode.NOT_A_LIST, + ) + stripped = entry.strip() + if stripped: + cleaned.append(stripped) + return cleaned or None def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]: """Require non-empty string language keys and non-empty submodule lists.""" items: list[tuple[BoostEndpointErrorCode, str, dict[str, Any]]] = [] + if len(value) > MAX_ADD_OR_UPDATE_LANGS: + items.append( + ( + BoostEndpointErrorCode.INVALID_LANGUAGE_CODE, + ( + f"add_or_update: exceeds maximum of " + f"{MAX_ADD_OR_UPDATE_LANGS} language keys " + f"(got {len(value)})." + ), + {"field": RequestField.ADD_OR_UPDATE}, + ) + ) for lang_code, submodules in value.items(): if not isinstance(lang_code, str) or lang_code.strip() == "": items.append( @@ -215,6 +261,20 @@ def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]: ) ) continue + try: + validate_language_code(lang_code) + except ValidationError as exc: + items.append( + ( + BoostEndpointErrorCode.INVALID_LANGUAGE_CODE, + str(exc), + { + "field": RequestField.ADD_OR_UPDATE, + "language": lang_code, + }, + ) + ) + continue if not isinstance(submodules, list): items.append( ( @@ -238,6 +298,18 @@ def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]: {"field": RequestField.ADD_OR_UPDATE, "language": lang_code}, ) ) + elif len(submodules) > MAX_SUBMODULES_PER_LANG: + items.append( + ( + BoostEndpointErrorCode.INVALID_SUBMODULE_LIST, + ( + f"add_or_update: key {lang_code!r} exceeds maximum of " + f"{MAX_SUBMODULES_PER_LANG} submodules " + f"(got {len(submodules)})." + ), + {"field": RequestField.ADD_OR_UPDATE, "language": lang_code}, + ) + ) else: for submodule in submodules: if not isinstance(submodule, str): diff --git a/src/boost_weblate/endpoint/validators.py b/src/boost_weblate/endpoint/validators.py index aa456bc..9d35609 100644 --- a/src/boost_weblate/endpoint/validators.py +++ b/src/boost_weblate/endpoint/validators.py @@ -14,16 +14,29 @@ from django.conf import settings from django.core.exceptions import ValidationError +MAX_SEGMENT_LEN = 256 +MAX_ADD_OR_UPDATE_LANGS = 50 +MAX_SUBMODULES_PER_LANG = 100 + _REPO_SEGMENT_RE = re.compile(r"^[A-Za-z0-9._-]+$") +_LANGUAGE_CODE_RE = re.compile(r"^[a-zA-Z0-9_-]+$") # SCP-style SSH: git@host:path/to/repo.git _SCP_SSH_RE = re.compile(r"^git@([^:/]+):(.+)$") +def _check_segment_length(name: str, *, field: str) -> None: + if len(name) > MAX_SEGMENT_LEN: + raise ValidationError( + f"{field}: exceeds maximum length of {MAX_SEGMENT_LEN} characters" + ) + + def validate_repo_segment(name: str, *, field: str) -> str: """Restrict organization/submodule to safe GitHub path segments.""" if not name or not name.strip(): raise ValidationError(f"{field}: must be a non-empty string") + _check_segment_length(name, field=field) if not _REPO_SEGMENT_RE.fullmatch(name): raise ValidationError( f"{field}: invalid characters in {name!r}; " @@ -32,6 +45,19 @@ def validate_repo_segment(name: str, *, field: str) -> str: return name +def validate_language_code(code: str) -> str: + """Restrict language codes to safe Weblate-style identifiers.""" + if not code or not code.strip(): + raise ValidationError("language: must be a non-empty string") + _check_segment_length(code, field="language") + if not _LANGUAGE_CODE_RE.fullmatch(code): + raise ValidationError( + f"language: invalid characters in {code!r}; " + "allowed: letters, digits, '_', '-'" + ) + return code + + def _normalize_clone_url(url: str) -> str: """Convert SCP-style SSH URLs to HTTPS for validation; reject other forms.""" url = url.strip() diff --git a/tests/endpoint/test_serializers.py b/tests/endpoint/test_serializers.py index 22d8894..e12a97e 100644 --- a/tests/endpoint/test_serializers.py +++ b/tests/endpoint/test_serializers.py @@ -232,3 +232,122 @@ def test_invalid_organization_still_flattens_other_drf_errors() -> None: assert org_errors[0]["code"] == BoostEndpointErrorCode.INVALID_CLONE_URL.value assert len(version_errors) == 1 assert version_errors[0]["metadata"]["drf_code"] == "required" + + +def test_add_or_update_serializer_rejects_invalid_version() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "../evil", + "add_or_update": {"zh_Hans": ["json"]}, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in _error_codes( + ser.structured_errors + ) + version_errors = [ + e for e in ser.structured_errors if e["metadata"]["field"] == "version" + ] + assert len(version_errors) == 1 + + +def test_add_or_update_serializer_rejects_sql_injection_lang_code() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": {"'; DROP TABLE--": ["json"]}, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in _error_codes( + ser.structured_errors + ) + + +def test_add_or_update_serializer_rejects_whitespace_lang_code() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": {" ": ["json"]}, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in _error_codes( + ser.structured_errors + ) + + +def test_add_or_update_serializer_rejects_non_string_submodule() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": {"zh_Hans": [["json"]]}, + } + ) + assert not ser.is_valid() + assert any(e["metadata"]["field"] == "add_or_update" for e in ser.structured_errors) + + +def test_add_or_update_serializer_rejects_oversized_organization() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o" * 10_000, + "version": "v", + "add_or_update": {"zh_Hans": ["json"]}, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in _error_codes( + ser.structured_errors + ) + + +def test_add_or_update_serializer_rejects_too_many_languages() -> None: + from boost_weblate.endpoint.validators import MAX_ADD_OR_UPDATE_LANGS + + langs = {f"lang{i}": ["json"] for i in range(MAX_ADD_OR_UPDATE_LANGS + 1)} + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": langs, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in _error_codes( + ser.structured_errors + ) + + +def test_add_or_update_serializer_rejects_too_many_submodules() -> None: + from boost_weblate.endpoint.validators import MAX_SUBMODULES_PER_LANG + + submodules = [f"mod{i}" for i in range(MAX_SUBMODULES_PER_LANG + 1)] + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": {"zh_Hans": submodules}, + } + ) + assert not ser.is_valid() + assert BoostEndpointErrorCode.INVALID_SUBMODULE_LIST.value in _error_codes( + ser.structured_errors + ) + + +def test_add_or_update_serializer_rejects_extensions_dict() -> None: + ser = AddOrUpdateRequestSerializer( + data={ + "organization": "o", + "version": "v", + "add_or_update": {"zh_Hans": ["json"]}, + "extensions": {".md": True}, + } + ) + assert not ser.is_valid() + assert any(e["metadata"]["field"] == "extensions" for e in ser.structured_errors) diff --git a/tests/endpoint/test_views.py b/tests/endpoint/test_views.py index e3f6b59..e70dae9 100644 --- a/tests/endpoint/test_views.py +++ b/tests/endpoint/test_views.py @@ -529,3 +529,459 @@ def test_boost_endpoint_info_user_throttle_can_429( response = view(request) assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS assert "Retry-After" in response + + +# --------------------------------------------------------------------------- +# Adversarial / trust-boundary tests +# --------------------------------------------------------------------------- + +_SQL_INJECTION_PAYLOADS = ( + "'; DROP TABLE auth_user; --", + "1 OR 1=1", + "' UNION SELECT", +) + +_PATH_TRAVERSAL_PAYLOADS = ( + "../evil", + "org/../../etc", +) + +_CONTROL_BYTE_PAYLOADS = ( + "org\x00evil", + "bad\r\norg", +) + +_VALID_ADD_OR_UPDATE_BODY = { + "organization": "CppDigest", + "version": "boost-1.90.0", + "add_or_update": {"zh_Hans": ["json"]}, +} + + +class TestPluginPingAdversarial: + def test_post_returns_405(self) -> None: + request = RequestFactory().post("/plugin-ping/") + response = plugin_ping(request) + assert response.status_code == 405 + + def test_oversized_query_string_still_ok(self) -> None: + query = "x" * 8192 + request = RequestFactory().get("/plugin-ping/", data={"q": query}) + response = plugin_ping(request) + assert response.status_code == 200 + assert response.content == b"ok" + + def test_sql_injection_query_param_still_ok(self) -> None: + request = RequestFactory().get("/plugin-ping/", data={"x": "'; DROP TABLE--"}) + response = plugin_ping(request) + assert response.status_code == 200 + assert response.content == b"ok" + + +class TestBoostEndpointInfoAdversarial: + def test_post_returns_405(self) -> None: + factory = APIRequestFactory() + request = factory.post( + "/info/", + {"filter": "' OR 1=1--"}, + format="json", + ) + user = User(username="t_adv_info", pk=201) + force_authenticate(request, user=user) + response = BoostEndpointInfo.as_view()(request) + assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED + + def test_anonymous_malformed_auth_token_returns_401( + self, + weblate_anonymous_user_no_db: None, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + from rest_framework.authentication import TokenAuthentication + from rest_framework.exceptions import AuthenticationFailed + + def reject_invalid_token(_self, request): # noqa: ANN001 + auth = request.META.get("HTTP_AUTHORIZATION", "") + if auth.startswith("Token "): + raise AuthenticationFailed("Invalid token.") + return None + + monkeypatch.setattr(TokenAuthentication, "authenticate", reject_invalid_token) + + factory = APIRequestFactory() + request = factory.get( + "/info/", + HTTP_AUTHORIZATION="Token not-a-valid-token", + ) + response = BoostEndpointInfo.as_view()(request) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_authenticated_sql_injection_query_param_ignored(self) -> None: + factory = APIRequestFactory() + request = factory.get("/info/", data={"filter": "' OR 1=1--"}) + user = User(username="t_adv_info2", pk=202) + force_authenticate(request, user=user) + response = BoostEndpointInfo.as_view()(request) + assert response.status_code == status.HTTP_200_OK + assert response.data["module"] == "cppa-weblate-plugin" + assert "info" in response.data["capabilities"] + + +class TestAddOrUpdateAdversarial: + @pytest.fixture(autouse=True) + def _high_throttle_limits(self): + rest_framework = _throttle_rest_framework( + user="10000/hour", + info="10000/minute", + **{"add-or-update": "10000/hour"}, + ) + with _isolated_throttle_rates(rest_framework): + yield + + @staticmethod + def _authenticated_post(body, *, format="json", content_type=None, data=None): + factory = APIRequestFactory() + kwargs: dict = {} + if content_type is not None: + kwargs["content_type"] = content_type + if data is not None: + request = factory.post("/add-or-update/", data=data, **kwargs) + else: + request = factory.post("/add-or-update/", body, format=format, **kwargs) + user = User(username="t_adv_aou", pk=301) + force_authenticate(request, user=user) + return request, user + + def test_non_json_body_rejected_without_enqueue( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + request, _ = self._authenticated_post( + None, content_type="application/json", data="not-json" + ) + response = AddOrUpdateView.as_view()(request) + assert response.status_code in ( + status.HTTP_400_BAD_REQUEST, + status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, + ) + delay_mock.assert_not_called() + + def test_add_or_update_string_type_rejected( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "add_or_update": "not-a-dict", + } + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "errors" in response.data + delay_mock.assert_not_called() + + def test_extensions_dict_type_rejected( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "extensions": {"not": "a-list"}, + } + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "errors" in response.data + delay_mock.assert_not_called() + + def test_oversized_organization_rejected( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "organization": "o" * 10_000, + } + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + codes = [e["code"] for e in response.data["errors"]] + assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes + delay_mock.assert_not_called() + + def test_oversized_version_rejected(self, monkeypatch: pytest.MonkeyPatch) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "version": "v" * 10_000, + } + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + codes = [e["code"] for e in response.data["errors"]] + assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes + delay_mock.assert_not_called() + + def test_oversized_add_or_update_lang_count_rejected( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + from boost_weblate.endpoint.validators import MAX_ADD_OR_UPDATE_LANGS + + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + langs = {f"lang{i}": ["json"] for i in range(MAX_ADD_OR_UPDATE_LANGS + 1)} + body = {**_VALID_ADD_OR_UPDATE_BODY, "add_or_update": langs} + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + codes = [e["code"] for e in response.data["errors"]] + assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes + delay_mock.assert_not_called() + + @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) + def test_sql_injection_in_organization_rejected( + self, payload: str, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + codes = [e["code"] for e in response.data["errors"]] + assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes + delay_mock.assert_not_called() + + @pytest.mark.parametrize("payload", _PATH_TRAVERSAL_PAYLOADS) + def test_path_traversal_in_organization_rejected( + self, payload: str, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + delay_mock.assert_not_called() + + @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) + def test_sql_injection_in_version_rejected( + self, payload: str, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = {**_VALID_ADD_OR_UPDATE_BODY, "version": payload} + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + delay_mock.assert_not_called() + + @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) + def test_sql_injection_in_lang_code_rejected( + self, payload: str, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "add_or_update": {payload: ["json"]}, + } + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + codes = [e["code"] for e in response.data["errors"]] + assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes + delay_mock.assert_not_called() + + @pytest.mark.parametrize("payload", _CONTROL_BYTE_PAYLOADS) + def test_control_bytes_in_organization_rejected( + self, payload: str, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} + request, _ = self._authenticated_post(body) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + delay_mock.assert_not_called() + + +class TestOrmTrustBoundary: + @pytest.fixture(autouse=True) + def _high_throttle_limits(self): + rest_framework = _throttle_rest_framework( + user="10000/hour", + info="10000/minute", + **{"add-or-update": "10000/hour"}, + ) + with _isolated_throttle_rates(rest_framework): + yield + + def test_rejected_injection_never_reaches_celery( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + factory = APIRequestFactory() + request = factory.post( + "/add-or-update/", + { + "organization": "'; DROP TABLE--", + "version": "boost-1.0", + "add_or_update": {"zh_Hans": ["json"]}, + }, + format="json", + ) + force_authenticate(request, user=User(username="t_orm", pk=401)) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_400_BAD_REQUEST + delay_mock.assert_not_called() + + def test_valid_payload_passes_literal_strings_to_celery( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock(return_value=MagicMock(id="task-orm")) + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + factory = APIRequestFactory() + request = factory.post( + "/add-or-update/", + _VALID_ADD_OR_UPDATE_BODY, + format="json", + ) + force_authenticate(request, user=User(username="t_orm2", pk=402)) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_202_ACCEPTED + delay_mock.assert_called_once_with( + organization="CppDigest", + add_or_update={"zh_Hans": ["json"]}, + version="boost-1.90.0", + extensions=None, + user_id=402, + ) + + def test_user_id_from_auth_not_request_body( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + delay_mock = MagicMock(return_value=MagicMock(id="task-uid")) + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + factory = APIRequestFactory() + body = { + **_VALID_ADD_OR_UPDATE_BODY, + "user_id": 99999, + } + request = factory.post("/add-or-update/", body, format="json") + force_authenticate(request, user=User(username="t_orm3", pk=403)) + response = AddOrUpdateView.as_view()(request) + assert response.status_code == status.HTTP_202_ACCEPTED + assert delay_mock.call_args.kwargs["user_id"] == 403 + + def test_language_get_receives_literal_lang_code( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + from weblate.lang.models import Language + + from boost_weblate.endpoint import tasks as tasks_mod + + lang_code = "zh_Hans" + user = MagicMock() + monkeypatch.setattr(tasks_mod.User.objects, "get", lambda pk: user) + + lang_get_mock = MagicMock() + monkeypatch.setattr(Language.objects, "get", lang_get_mock) + + class FakeService: + def __init__(self, **kw): # noqa: ANN003 + self.lang_code = kw["lang_code"] + self.organization = kw["organization"] + self.version = kw["version"] + self.extensions = kw["extensions"] + + def process_all(self, _submodules, *, user, request=None): # noqa: ANN001 + Language.objects.get(code=self.lang_code) + return {} + + monkeypatch.setattr(tasks_mod, "BoostComponentService", FakeService) + + tasks_mod.boost_add_or_update_task.run( + organization="org", + add_or_update={lang_code: ["json"]}, + version="boost-1.0", + extensions=None, + user_id=1, + ) + + lang_get_mock.assert_called_once_with(code=lang_code) + + def test_get_or_create_project_uses_literal_lang_in_slug( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + from boost_weblate.endpoint import services as services_mod + + lang_code = "zh_Hans" + slug_calls: list[str] = [] + + def capture_get_or_create(*, slug: str, defaults: dict): + slug_calls.append(slug) + project = MagicMock() + project.post_create = MagicMock() + return project, True + + monkeypatch.setattr( + services_mod.Project.objects, "get_or_create", capture_get_or_create + ) + + service = services_mod.BoostComponentService( + organization="org", + lang_code=lang_code, + version="boost-1.0", + extensions=None, + ) + service.get_or_create_project("json", user=MagicMock()) + + assert slug_calls == [f"boost-json-documentation-{lang_code}"] From 2c4c408e056bcf1e9861d68b5ea29eb8d0d4ece9 Mon Sep 17 00:00:00 2001 From: whisper67265 Date: Thu, 2 Jul 2026 10:18:32 -0600 Subject: [PATCH 2/2] fix the coderabbitai review comments --- src/boost_weblate/endpoint/serializers.py | 3 + src/boost_weblate/endpoint/validators.py | 47 ++++--- tests/endpoint/test_views.py | 146 ++++++++-------------- 3 files changed, 79 insertions(+), 117 deletions(-) diff --git a/src/boost_weblate/endpoint/serializers.py b/src/boost_weblate/endpoint/serializers.py index 46d9aa3..b4eca5e 100644 --- a/src/boost_weblate/endpoint/serializers.py +++ b/src/boost_weblate/endpoint/serializers.py @@ -245,6 +245,9 @@ def validate_add_or_update(self, value: dict[str, Any]) -> dict[str, Any]: {"field": RequestField.ADD_OR_UPDATE}, ) ) + self._custom_error_fields.add(RequestField.ADD_OR_UPDATE) + self._custom_validation_errors.extend(boost_validation_errors(items)) + raise serializers.ValidationError({RequestField.ADD_OR_UPDATE: "invalid"}) for lang_code, submodules in value.items(): if not isinstance(lang_code, str) or lang_code.strip() == "": items.append( diff --git a/src/boost_weblate/endpoint/validators.py b/src/boost_weblate/endpoint/validators.py index 9d35609..52b2afb 100644 --- a/src/boost_weblate/endpoint/validators.py +++ b/src/boost_weblate/endpoint/validators.py @@ -25,37 +25,44 @@ _SCP_SSH_RE = re.compile(r"^git@([^:/]+):(.+)$") -def _check_segment_length(name: str, *, field: str) -> None: - if len(name) > MAX_SEGMENT_LEN: +def _validate_segment( + value: str, + *, + field: str, + pattern: re.Pattern[str], + allowed_chars: str, +) -> str: + if not value or not value.strip(): + raise ValidationError(f"{field}: must be a non-empty string") + if len(value) > MAX_SEGMENT_LEN: raise ValidationError( f"{field}: exceeds maximum length of {MAX_SEGMENT_LEN} characters" ) + if not pattern.fullmatch(value): + raise ValidationError( + f"{field}: invalid characters in {value!r}; allowed: {allowed_chars}" + ) + return value def validate_repo_segment(name: str, *, field: str) -> str: """Restrict organization/submodule to safe GitHub path segments.""" - if not name or not name.strip(): - raise ValidationError(f"{field}: must be a non-empty string") - _check_segment_length(name, field=field) - if not _REPO_SEGMENT_RE.fullmatch(name): - raise ValidationError( - f"{field}: invalid characters in {name!r}; " - "allowed: letters, digits, '.', '_', '-'" - ) - return name + return _validate_segment( + name, + field=field, + pattern=_REPO_SEGMENT_RE, + allowed_chars="letters, digits, '.', '_', '-'", + ) def validate_language_code(code: str) -> str: """Restrict language codes to safe Weblate-style identifiers.""" - if not code or not code.strip(): - raise ValidationError("language: must be a non-empty string") - _check_segment_length(code, field="language") - if not _LANGUAGE_CODE_RE.fullmatch(code): - raise ValidationError( - f"language: invalid characters in {code!r}; " - "allowed: letters, digits, '_', '-'" - ) - return code + return _validate_segment( + code, + field="language", + pattern=_LANGUAGE_CODE_RE, + allowed_chars="letters, digits, '_', '-'", + ) def _normalize_clone_url(url: str) -> str: diff --git a/tests/endpoint/test_views.py b/tests/endpoint/test_views.py index e70dae9..1293e97 100644 --- a/tests/endpoint/test_views.py +++ b/tests/endpoint/test_views.py @@ -558,6 +558,29 @@ def test_boost_endpoint_info_user_throttle_can_429( } +@pytest.fixture +def high_throttle_limits(): + """Relax scoped/user throttles for adversarial and ORM trust-boundary tests.""" + rest_framework = _throttle_rest_framework( + user="10000/hour", + info="10000/minute", + **{"add-or-update": "10000/hour"}, + ) + with _isolated_throttle_rates(rest_framework): + yield + + +@pytest.fixture +def mock_add_or_update_delay(monkeypatch: pytest.MonkeyPatch) -> MagicMock: + """Patch AddOrUpdateView Celery enqueue for rejection assertions.""" + delay_mock = MagicMock() + monkeypatch.setattr( + "boost_weblate.endpoint.views.boost_add_or_update_task.delay", + delay_mock, + ) + return delay_mock + + class TestPluginPingAdversarial: def test_post_returns_405(self) -> None: request = RequestFactory().post("/plugin-ping/") @@ -626,17 +649,8 @@ def test_authenticated_sql_injection_query_param_ignored(self) -> None: assert "info" in response.data["capabilities"] +@pytest.mark.usefixtures("high_throttle_limits") class TestAddOrUpdateAdversarial: - @pytest.fixture(autouse=True) - def _high_throttle_limits(self): - rest_framework = _throttle_rest_framework( - user="10000/hour", - info="10000/minute", - **{"add-or-update": "10000/hour"}, - ) - with _isolated_throttle_rates(rest_framework): - yield - @staticmethod def _authenticated_post(body, *, format="json", content_type=None, data=None): factory = APIRequestFactory() @@ -652,13 +666,8 @@ def _authenticated_post(body, *, format="json", content_type=None, data=None): return request, user def test_non_json_body_rejected_without_enqueue( - self, monkeypatch: pytest.MonkeyPatch + self, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) request, _ = self._authenticated_post( None, content_type="application/json", data="not-json" ) @@ -667,16 +676,11 @@ def test_non_json_body_rejected_without_enqueue( status.HTTP_400_BAD_REQUEST, status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, ) - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() def test_add_or_update_string_type_rejected( - self, monkeypatch: pytest.MonkeyPatch + self, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = { **_VALID_ADD_OR_UPDATE_BODY, "add_or_update": "not-a-dict", @@ -685,16 +689,11 @@ def test_add_or_update_string_type_rejected( response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "errors" in response.data - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() def test_extensions_dict_type_rejected( - self, monkeypatch: pytest.MonkeyPatch + self, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = { **_VALID_ADD_OR_UPDATE_BODY, "extensions": {"not": "a-list"}, @@ -703,16 +702,11 @@ def test_extensions_dict_type_rejected( response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST assert "errors" in response.data - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() def test_oversized_organization_rejected( - self, monkeypatch: pytest.MonkeyPatch + self, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = { **_VALID_ADD_OR_UPDATE_BODY, "organization": "o" * 10_000, @@ -722,14 +716,11 @@ def test_oversized_organization_rejected( assert response.status_code == status.HTTP_400_BAD_REQUEST codes = [e["code"] for e in response.data["errors"]] assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() - def test_oversized_version_rejected(self, monkeypatch: pytest.MonkeyPatch) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) + def test_oversized_version_rejected( + self, mock_add_or_update_delay: MagicMock + ) -> None: body = { **_VALID_ADD_OR_UPDATE_BODY, "version": "v" * 10_000, @@ -739,18 +730,13 @@ def test_oversized_version_rejected(self, monkeypatch: pytest.MonkeyPatch) -> No assert response.status_code == status.HTTP_400_BAD_REQUEST codes = [e["code"] for e in response.data["errors"]] assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() def test_oversized_add_or_update_lang_count_rejected( - self, monkeypatch: pytest.MonkeyPatch + self, mock_add_or_update_delay: MagicMock ) -> None: from boost_weblate.endpoint.validators import MAX_ADD_OR_UPDATE_LANGS - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) langs = {f"lang{i}": ["json"] for i in range(MAX_ADD_OR_UPDATE_LANGS + 1)} body = {**_VALID_ADD_OR_UPDATE_BODY, "add_or_update": langs} request, _ = self._authenticated_post(body) @@ -758,64 +744,44 @@ def test_oversized_add_or_update_lang_count_rejected( assert response.status_code == status.HTTP_400_BAD_REQUEST codes = [e["code"] for e in response.data["errors"]] assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) def test_sql_injection_in_organization_rejected( - self, payload: str, monkeypatch: pytest.MonkeyPatch + self, payload: str, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} request, _ = self._authenticated_post(body) response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST codes = [e["code"] for e in response.data["errors"]] assert BoostEndpointErrorCode.INVALID_CLONE_URL.value in codes - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() @pytest.mark.parametrize("payload", _PATH_TRAVERSAL_PAYLOADS) def test_path_traversal_in_organization_rejected( - self, payload: str, monkeypatch: pytest.MonkeyPatch + self, payload: str, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} request, _ = self._authenticated_post(body) response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) def test_sql_injection_in_version_rejected( - self, payload: str, monkeypatch: pytest.MonkeyPatch + self, payload: str, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = {**_VALID_ADD_OR_UPDATE_BODY, "version": payload} request, _ = self._authenticated_post(body) response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() @pytest.mark.parametrize("payload", _SQL_INJECTION_PAYLOADS) def test_sql_injection_in_lang_code_rejected( - self, payload: str, monkeypatch: pytest.MonkeyPatch + self, payload: str, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = { **_VALID_ADD_OR_UPDATE_BODY, "add_or_update": {payload: ["json"]}, @@ -825,35 +791,21 @@ def test_sql_injection_in_lang_code_rejected( assert response.status_code == status.HTTP_400_BAD_REQUEST codes = [e["code"] for e in response.data["errors"]] assert BoostEndpointErrorCode.INVALID_LANGUAGE_CODE.value in codes - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() @pytest.mark.parametrize("payload", _CONTROL_BYTE_PAYLOADS) def test_control_bytes_in_organization_rejected( - self, payload: str, monkeypatch: pytest.MonkeyPatch + self, payload: str, mock_add_or_update_delay: MagicMock ) -> None: - delay_mock = MagicMock() - monkeypatch.setattr( - "boost_weblate.endpoint.views.boost_add_or_update_task.delay", - delay_mock, - ) body = {**_VALID_ADD_OR_UPDATE_BODY, "organization": payload} request, _ = self._authenticated_post(body) response = AddOrUpdateView.as_view()(request) assert response.status_code == status.HTTP_400_BAD_REQUEST - delay_mock.assert_not_called() + mock_add_or_update_delay.assert_not_called() +@pytest.mark.usefixtures("high_throttle_limits") class TestOrmTrustBoundary: - @pytest.fixture(autouse=True) - def _high_throttle_limits(self): - rest_framework = _throttle_rest_framework( - user="10000/hour", - info="10000/minute", - **{"add-or-update": "10000/hour"}, - ) - with _isolated_throttle_rates(rest_framework): - yield - def test_rejected_injection_never_reaches_celery( self, monkeypatch: pytest.MonkeyPatch ) -> None: