5 changes: 5 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/__init__.py
@@ -7,6 +7,11 @@
 import logging
 
 from gooddata_sdk._version import __version__
+from gooddata_sdk.catalog.ai_lake.entity_model.column_expression import (
+    CatalogColumnExpression,
+    ColumnExpressionFunction,
+)
+from gooddata_sdk.catalog.ai_lake.entity_model.object_storage import CatalogObjectStorageInfo
 from gooddata_sdk.catalog.ai_lake.service import (
     CatalogAILakeOperation,
     CatalogAILakeOperationError,
@@ -0,0 +1 @@
# (C) 2026 GoodData Corporation
50 changes: 50 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/catalog/ai_lake/entity_model/column_expression.py
@@ -0,0 +1,50 @@
# (C) 2026 GoodData Corporation
"""SDK model for AI Lake pipe-table ColumnExpression projections."""

from __future__ import annotations

from typing import Literal

import attrs
from gooddata_api_client.model.column_expression import ColumnExpression

ColumnExpressionFunction = Literal["HLL_HASH", "BITMAP_HASH", "BITMAP_HASH64", "TO_BITMAP"]
"""StarRocks transform functions supported in pipe-table column projection overrides."""


@attrs.define(kw_only=True)
class CatalogColumnExpression:
"""Single column projection override for a pipe table.

Each instance produces ``<function>(<column>) AS <target_column>`` in the
``SELECT`` list of the generated ``CREATE PIPE … AS INSERT`` statement.
    Required for AGGREGATE-KEY tables that include native HLL or BITMAP columns,
    because StarRocks rejects inserting raw VARBINARY values into those column types.

Pass a mapping of ``{target_column: CatalogColumnExpression}`` as the
``column_expressions`` argument to
:py:meth:`~gooddata_sdk.catalog.ai_lake.service.CatalogAILakeService.create_pipe_table`.

Example::

from gooddata_sdk import CatalogColumnExpression

exprs = {
"user_hll": CatalogColumnExpression(column="user_id", function="HLL_HASH"),
"page_bmp": CatalogColumnExpression(column="page_id", function="TO_BITMAP"),
}
"""

column: str
"""Source column produced by parquet schema inference (after ``columnOverrides``)."""

function: ColumnExpressionFunction
"""StarRocks transform to apply to *column* when projecting it."""

def as_api_model(self) -> ColumnExpression:
"""Serialize to the auto-generated ``ColumnExpression`` API model."""
return ColumnExpression(
column=self.column,
function=self.function,
_check_type=False,
)
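
A minimal sketch of how one projection serializes, using only the class defined above (the column and target names are illustrative):

    from gooddata_sdk import CatalogColumnExpression

    # Paired with target key "user_hll" in column_expressions, this emits
    # HLL_HASH(user_id) AS user_hll in the generated CREATE PIPE ... AS INSERT
    # SELECT list.
    expr = CatalogColumnExpression(column="user_id", function="HLL_HASH")
    api_model = expr.as_api_model()
    # api_model is the auto-generated ColumnExpression carrying
    # column="user_id" and function="HLL_HASH", built with _check_type=False
    # as in the diff above.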
46 changes: 46 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/catalog/ai_lake/entity_model/object_storage.py
@@ -0,0 +1,46 @@
# (C) 2026 GoodData Corporation
"""SDK model for AI Lake ObjectStorage descriptors."""

from __future__ import annotations

from typing import Any

import attrs


@attrs.define(kw_only=True)
class CatalogObjectStorageInfo:
"""Descriptor of a registered AI Lake ObjectStorage.

Provider credentials are stripped server-side — only safe descriptors
(id, name, type, and provider-specific metadata like bucket/region) are
returned. Use :attr:`name` as ``source_storage_name`` when calling
:py:meth:`~gooddata_sdk.catalog.ai_lake.service.CatalogAILakeService.create_pipe_table`,
or pass :attr:`storage_id` to the ``storageIds`` list of a
``ProvisionDatabase`` request.
"""

name: str
"""Human-readable name of the storage configuration."""

storage_id: str
"""Stable UUID identifier of the storage configuration."""

storage_type: str
"""Provider type (e.g. ``S3``, ``MINIO``, ``ADLS``)."""

storage_config: dict[str, str] = attrs.field(factory=dict)
"""Provider-specific descriptors (bucket, region, endpoint, …).

Credential references (keys ending in ``_env``) are stripped by the server.
"""

@classmethod
def from_dict(cls, data: dict[str, Any]) -> CatalogObjectStorageInfo:
"""Construct from a snake_case dict as returned by the API client's ``to_dict()``."""
return cls(
name=data["name"],
storage_id=data["storage_id"],
storage_type=data["storage_type"],
storage_config=data.get("storage_config") or {},
)
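
A short sketch of the from_dict contract, using a hypothetical payload shaped like the API client's snake_case to_dict() output (all values are illustrative):

    from gooddata_sdk import CatalogObjectStorageInfo

    payload = {
        "name": "prod-landing",  # hypothetical storage name
        "storage_id": "00000000-0000-0000-0000-000000000001",
        "storage_type": "S3",
        "storage_config": {"bucket": "events-landing", "region": "us-east-1"},
    }
    storage = CatalogObjectStorageInfo.from_dict(payload)
    assert storage.storage_config["bucket"] == "events-landing"

    # An absent or None storage_config normalizes to an empty dict:
    bare = CatalogObjectStorageInfo.from_dict({**payload, "storage_config": None})
    assert bare.storage_config == {}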
121 changes: 110 additions & 11 deletions packages/gooddata-sdk/src/gooddata_sdk/catalog/ai_lake/service.py
@@ -1,19 +1,16 @@
 # (C) 2026 GoodData Corporation
-"""SDK wrapper for the AI Lake long-running-operation surface.
+"""SDK wrapper for the AI Lake API surface.
 
-Today this exposes only the operations needed by aggregate-aware LDMs:
+Currently exposed operations:
 
-- `analyze_statistics` triggers `ANALYZE TABLE` over a database instance so
-  CBO statistics catch up after a schema or data change. Required after
-  registering a pre-aggregation table whose dim attributes the platform will
-  later resolve via filter pushdown.
+- `list_object_storages` lists ObjectStorages registered for the organization.
+  Use the returned names as ``source_storage_name`` in `create_pipe_table`.
+- `create_pipe_table` registers a pipe table in a database instance, with
+  optional `CatalogColumnExpression` overrides for HLL / BITMAP columns.
+- `analyze_statistics` triggers ``ANALYZE TABLE`` over a database instance so
+  CBO statistics catch up after a schema or data change.
 - `get_operation` and `wait_for_operation` cover the polling side of the
   long-running operation contract that `analyze_statistics` returns.
-
-The full AI Lake API surface (database provisioning, pipe-table
-registration, service commands) is not yet wrapped here; consumers that
-need those should call `client.ai_lake_api.<method>` directly until a
-ticket adds typed wrappers.
 """
 
 from __future__ import annotations
@@ -25,7 +25,10 @@
 from attrs import define
 from gooddata_api_client.api.ai_lake_api import AILakeApi
 from gooddata_api_client.model.analyze_statistics_request import AnalyzeStatisticsRequest
+from gooddata_api_client.model.create_pipe_table_request import CreatePipeTableRequest
 
+from gooddata_sdk.catalog.ai_lake.entity_model.column_expression import CatalogColumnExpression
+from gooddata_sdk.catalog.ai_lake.entity_model.object_storage import CatalogObjectStorageInfo
 from gooddata_sdk.catalog.base import Base
 from gooddata_sdk.client import GoodDataApiClient
@@ -76,6 +76,105 @@ def __init__(self, api_client: GoodDataApiClient) -> None:
self._client = api_client
self._ai_lake_api: AILakeApi = AILakeApi(api_client._api_client)

# ------------------------------------------------------------------
# ObjectStorage listing
# ------------------------------------------------------------------

def list_object_storages(self) -> list[CatalogObjectStorageInfo]:
"""List ObjectStorages registered for the organization.

Provider credentials are stripped server-side — only safe descriptors
(id, name, type, bucket, region, endpoint, …) are returned.

Use the returned :attr:`~CatalogObjectStorageInfo.name` as
``source_storage_name`` when calling :meth:`create_pipe_table`, or
pass :attr:`~CatalogObjectStorageInfo.storage_id` to the
``ProvisionDatabase`` ``storageIds`` list.

Returns:
List of :class:`CatalogObjectStorageInfo`, ordered by name.
"""
response = self._ai_lake_api.list_ai_lake_object_storages(_check_return_type=False)
data = response.to_dict() if hasattr(response, "to_dict") else dict(response)
return [CatalogObjectStorageInfo.from_dict(s) for s in data.get("storages", [])]

# ------------------------------------------------------------------
# Pipe-table management
# ------------------------------------------------------------------

def create_pipe_table(
self,
instance_id: str,
table_name: str,
source_storage_name: str,
path_prefix: str,
*,
column_expressions: dict[str, CatalogColumnExpression] | None = None,
column_overrides: dict[str, str] | None = None,
aggregation_overrides: dict[str, str] | None = None,
max_varchar_length: int | None = None,
polling_interval_seconds: int | None = None,
table_properties: dict[str, str] | None = None,
) -> None:
"""Register a new pipe table in an AI Lake database instance.

Args:
instance_id: Database instance name (preferred) or UUID.
table_name: OLAP table name. Must match ``^[a-z][a-z0-9_-]{0,62}$``.
source_storage_name: Name of a registered ObjectStorage (use
:meth:`list_object_storages` to discover available names).
path_prefix: Path prefix to the parquet files in the storage
(e.g. ``'my-dataset/year=2024/'``).
column_expressions: Per-target-column projection overrides. Each
key is the target column name; the value is a
:class:`CatalogColumnExpression` that emits
``<function>(<column>) AS <key>`` in the generated
``CREATE PIPE … AS INSERT`` SELECT list. Required for
AGGREGATE-KEY tables that include native HLL or BITMAP columns.
column_overrides: Override inferred column types, e.g.
``{"year": "INT", "event_date": "DATE"}``.
aggregation_overrides: Maps non-key column names to their StarRocks
aggregation function (``SUM``, ``MIN``, ``MAX``, ``REPLACE``,
``HLL_UNION``, ``BITMAP_UNION``, …). Required for every
non-key column when ``key_config`` type is ``'aggregate'``.
max_varchar_length: Cap VARCHAR(N) columns to this length; 0 means
no cap.
polling_interval_seconds: How often (in seconds) the pipe polls for
new files; 0 or ``None`` uses the server default.
table_properties: ``CREATE TABLE PROPERTIES`` key-value pairs.
Defaults to ``{"replication_num": "1"}`` server-side.
"""
kwargs: dict[str, Any] = {}
if column_expressions is not None:
kwargs["column_expressions"] = {k: v.as_api_model() for k, v in column_expressions.items()}
if column_overrides is not None:
kwargs["column_overrides"] = column_overrides
if aggregation_overrides is not None:
kwargs["aggregation_overrides"] = aggregation_overrides
if max_varchar_length is not None:
kwargs["max_varchar_length"] = max_varchar_length
if polling_interval_seconds is not None:
kwargs["polling_interval_seconds"] = polling_interval_seconds
if table_properties is not None:
kwargs["table_properties"] = table_properties

request = CreatePipeTableRequest(
table_name=table_name,
source_storage_name=source_storage_name,
path_prefix=path_prefix,
_check_type=False,
**kwargs,
)
self._ai_lake_api.create_ai_lake_pipe_table(
instance_id,
request,
_check_return_type=False,
)

# ------------------------------------------------------------------
# Statistics
# ------------------------------------------------------------------

def analyze_statistics(
self,
instance_id: str,
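Taken together, the new wrappers compose like the sketch below. Instance names, table schema, and paths are hypothetical, and the analyze_statistics / wait_for_operation calls follow the contract described in the module docstring (their full signatures are collapsed in this diff):

    from gooddata_sdk import CatalogColumnExpression, GoodDataSdk

    sdk = GoodDataSdk.create(host_="https://example.gooddata.com", token_="<token>")

    # Discover registered ObjectStorages and pick one by name.
    storages = sdk.catalog_ai_lake.list_object_storages()
    source = storages[0].name

    # Register an AGGREGATE-KEY pipe table: HLL/BITMAP targets need
    # column_expressions, and non-key columns need aggregation overrides.
    sdk.catalog_ai_lake.create_pipe_table(
        instance_id="analytics",
        table_name="daily_events_agg",
        source_storage_name=source,
        path_prefix="events/year=2024/",
        column_overrides={"event_date": "DATE"},
        aggregation_overrides={"user_hll": "HLL_UNION", "event_count": "SUM"},
        column_expressions={
            "user_hll": CatalogColumnExpression(column="user_id", function="HLL_HASH"),
        },
    )

    # Refresh CBO statistics, then poll the returned long-running operation
    # (sketched per the module docstring; exact signatures not shown here).
    operation = sdk.catalog_ai_lake.analyze_statistics("analytics")
    sdk.catalog_ai_lake.wait_for_operation(operation)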
73 changes: 73 additions & 0 deletions packages/gooddata-sdk/tests/catalog/fixtures/ai_lake/test_list_ai_lake_object_storages.yaml
@@ -0,0 +1,73 @@
interactions:
- request:
body: null
headers:
Accept:
- application/json
Accept-Encoding:
- br, gzip, deflate
X-GDC-VALIDATE-RELATIONS:
- 'true'
X-Requested-With:
- XMLHttpRequest
method: GET
uri: http://localhost:3000/api/v1/ailake/object-storages
response:
body:
string:
detail: Your organization does not have the necessary entitlement to use
AI Lake APIs.
status: 403
title: Forbidden
traceId: NORMALIZED_TRACE_ID_000000000000
headers:
Content-Type:
- application/problem+json
DATE: &id001
- PLACEHOLDER
Expires:
- '0'
Pragma:
- no-cache
X-Content-Type-Options:
- nosniff
X-GDC-TRACE-ID: *id001
status:
code: 403
message: Forbidden
- request:
body: null
headers:
Accept:
- application/json
Accept-Encoding:
- br, gzip, deflate
X-GDC-VALIDATE-RELATIONS:
- 'true'
X-Requested-With:
- XMLHttpRequest
method: GET
uri: http://localhost:3000/api/v1/ailake/object-storages
response:
body:
string:
detail: Your organization does not have the necessary entitlement to use
AI Lake APIs.
status: 403
title: Forbidden
traceId: NORMALIZED_TRACE_ID_000000000000
headers:
Content-Type:
- application/problem+json
DATE: *id001
Expires:
- '0'
Pragma:
- no-cache
X-Content-Type-Options:
- nosniff
X-GDC-TRACE-ID: *id001
status:
code: 403
message: Forbidden
version: 1
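
The cassette above records the 403 returned when the organization lacks the AI Lake entitlement. A sketch of surfacing that condition to callers, assuming the generated client raises its conventional ApiException (the exception path is not shown in this diff):

    from gooddata_api_client.exceptions import ApiException

    from gooddata_sdk import GoodDataSdk

    sdk = GoodDataSdk.create(host_="https://example.gooddata.com", token_="<token>")
    try:
        storages = sdk.catalog_ai_lake.list_object_storages()
    except ApiException as exc:
        # Mirrors the recorded response: application/problem+json, status 403.
        if exc.status == 403:
            raise RuntimeError("Organization is not entitled to AI Lake APIs") from exc
        raise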
39 changes: 39 additions & 0 deletions packages/gooddata-sdk/tests/catalog/test_catalog_ai_lake.py
@@ -0,0 +1,39 @@
# (C) 2026 GoodData Corporation
"""Integration tests for AI Lake SDK methods backed by VCR cassettes.

These tests exercise the HTTP surface of `CatalogAILakeService`. Each test
needs a cassette recorded against a live stack; the cassette files are created
by the recorder and are not hand-edited.
"""

from __future__ import annotations

from pathlib import Path

import pytest
from gooddata_sdk import CatalogObjectStorageInfo, GoodDataSdk
from tests_support.vcrpy_utils import get_vcr

gd_vcr = get_vcr()

_current_dir = Path(__file__).parent.absolute()
_fixtures_dir = _current_dir / "fixtures" / "ai_lake"

_cassette_list_object_storages = _fixtures_dir / "test_list_ai_lake_object_storages.yaml"


@gd_vcr.use_cassette(str(_cassette_list_object_storages))
def test_list_ai_lake_object_storages(test_config):
"""List registered AI Lake ObjectStorages and verify the response shape."""
if not _cassette_list_object_storages.exists():
pytest.skip("Cassette not yet recorded — requires an AI Lake-enabled environment")
sdk = GoodDataSdk.create(host_=test_config["host"], token_=test_config["token"])
storages = sdk.catalog_ai_lake.list_object_storages()

assert isinstance(storages, list)
for storage in storages:
assert isinstance(storage, CatalogObjectStorageInfo)
assert storage.name, "name must be non-empty"
assert storage.storage_id, "storage_id must be non-empty"
assert storage.storage_type, "storage_type must be non-empty"
assert isinstance(storage.storage_config, dict)