fix: tables schema command replace manual _delta_log JSON parsing with DeltaTable.schema() from the deltalake library #229
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| kind: fixed | ||
| body: Refactor `fab tables schema` to use the `deltalake` Python library for schema extraction via ABFSS URI instead of manually parsing Delta log commit files | ||
| time: 2026-04-30T13:05:58.364670843Z | ||
| custom: | ||
| Author: pkontek | ||
| AuthorLink: https://github.com/pkontek |
|
Collaborator
Please prove (via test) that the new URI works for all listed item types, or add validation that fails in code for item types the DeltaTable flow does not support.
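A fail-fast guard for the second option could look roughly like the sketch below. Everything named here is hypothetical: the supported set, the `item_type` argument and the `UnsupportedItemTypeError` class are placeholders, and the real code would presumably raise FabricCLIError with a suitable error code instead.

```python
# Hypothetical guard; the names and the supported set are assumptions, not fabric_cli APIs.
SUPPORTED_DELTA_ITEM_TYPES = frozenset({"Lakehouse"})  # assumed: list only verified types


class UnsupportedItemTypeError(ValueError):
    """Stand-in for FabricCLIError in this sketch."""


def ensure_delta_supported(item_type: str) -> None:
    """Fail fast when the item type has not been verified against the DeltaTable flow."""
    if item_type not in SUPPORTED_DELTA_ITEM_TYPES:
        supported = ", ".join(sorted(SUPPORTED_DELTA_ITEM_TYPES))
        raise UnsupportedItemTypeError(
            f"Schema extraction is not supported for item type '{item_type}'. "
            f"Supported types: {supported}"
        )
```

Calling the guard before the URI is built keeps unsupported items from reaching delta-rs at all, so the failure is a clear CLI error instead of a storage-level one.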
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,78 +3,55 @@ | |
|
|
||
| import json | ||
| from argparse import Namespace | ||
| from typing import Optional | ||
|
|
||
| from fabric_cli.client import fab_api_onelake as onelake_api | ||
| from deltalake import DeltaTable | ||
| from deltalake.exceptions import DeltaError | ||
|
|
||
| from fabric_cli.core import fab_constant | ||
| from fabric_cli.core import fab_handle_context as handle_context | ||
| from fabric_cli.core.fab_auth import FabAuth | ||
| from fabric_cli.core.fab_exceptions import FabricCLIError | ||
| from fabric_cli.core.hiearchy.fab_hiearchy import OneLakeItem | ||
| from fabric_cli.utils import fab_ui | ||
| from fabric_cli.utils import fab_util as utils | ||
|
|
||
|
|
||
| def exec_command(args: Namespace) -> None: | ||
| schema = _extract_schema_from_commit_logs(args) | ||
| if schema: | ||
| fab_ui.print_grey("Schema extracted successfully") | ||
| _schema = json.loads(schema)["fields"] | ||
| fab_ui.print_output_format(args, data=_schema, show_headers=True) | ||
| schema_fields = _get_table_schema(args) | ||
| fab_ui.print_grey("Schema extracted successfully") | ||
| fab_ui.print_output_format(args, data=schema_fields, show_headers=True) | ||
|
|
||
|
pkontek marked this conversation as resolved.
|
||
| else: | ||
|
|
||
| def _get_table_schema(args: Namespace) -> list[dict]: | ||
| token = FabAuth().get_access_token(fab_constant.SCOPE_ONELAKE_DEFAULT) | ||
| if token is None: | ||
| raise FabricCLIError( | ||
| "Failed to extract the table schema. Please ensure the path points to a valid Delta table", | ||
| fab_constant.ERROR_INVALID_DETLA_TABLE, | ||
| "Failed to obtain access token.", | ||
|
Collaborator
"Failed to obtain access token." already exists as AuthErrors.access_token_failed(). Per commands.instructions.md, reuse messages from src/fabric_cli/errors/.py: from fabric_cli.errors import ErrorMessages
||
| fab_constant.ERROR_AUTHENTICATION_FAILED, | ||
| ) | ||
|
|
||
|
|
||
| def _get_commit_logs(args: Namespace) -> Optional[list[str]]: | ||
| _delta_log_path = args.path | ||
| _delta_log_path[-1] = _delta_log_path[-1] + "/_delta_log" | ||
|
|
||
| _context = handle_context.get_command_context(_delta_log_path, raise_error=True) | ||
| assert isinstance(_context, OneLakeItem) | ||
| onelake: OneLakeItem = _context | ||
| workspace_id = onelake.workspace.id | ||
| item_id = onelake.item.id | ||
| local_path = onelake.local_path | ||
|
|
||
| local_path = utils.remove_dot_suffix(local_path) | ||
| args.directory = f"{workspace_id}/?recursive=false&resource=filesystem&directory={item_id}/{local_path}&getShortcutMetadata=true" | ||
| response = onelake_api.list_tables_files_recursive(args) | ||
|
|
||
| if response.status_code in {200, 201}: | ||
| file_names = [f["name"] for f in response.json().get("paths", [])] | ||
| json_files = [ | ||
| f"{workspace_id}/{item_id}/{f.split('/', 1)[1]}" | ||
| for f in file_names | ||
| if f.endswith(".json") and f != "_temporary" | ||
| ] | ||
| json_files.sort(reverse=True) | ||
| return json_files | ||
| return None | ||
|
|
||
|
|
||
| def _extract_schema_from_commit_logs(args: Namespace) -> Optional[str]: | ||
| commit_logs = _get_commit_logs(args) | ||
|
|
||
| if not commit_logs: | ||
| return None | ||
|
|
||
| for log in commit_logs: | ||
| args.from_path = log | ||
| args.wait = True | ||
| response = onelake_api.read(args) | ||
|
|
||
| if response.status_code in {200, 201}: | ||
| json_string = response.text | ||
| json_objects = json_string.strip().split("\n") | ||
|
|
||
| for obj in json_objects: | ||
| commit_data = json.loads(obj) | ||
| if "metaData" in commit_data: | ||
| metadata = commit_data["metaData"] | ||
| schema = metadata["schemaString"] | ||
|
Collaborator
The old code returned json.loads(metaData.schemaString)["fields"], the Delta protocol's field schema. The new code returns json.loads(DeltaTable.schema().to_json())["fields"], delta-rs's serialization of the Arrow schema. For complex types (nested struct, decimal, timestamp, map), the JSON shapes can differ ("type": "long" vs "type": "integer", nested vs string representation, etc.). Could you please add a unit test with at least one nested/decimal/timestamp field and assert the exact output structure to lock the contract? Otherwise this is a silent breaking change for users who pipe
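A contract test along these lines might start from a schemaString in the Delta protocol's serialization format and pin the exact shape of each field. The sketch below only exercises the old behaviour (`json.loads(schemaString)["fields"]`) through a hypothetical helper, `fields_from_schema_string`; in the real test that helper would be replaced by the output of `_get_table_schema` for a table with this schema, and whether delta-rs produces the same JSON is exactly what the assertions would establish.

```python
import json

# Sample schemaString in the Delta protocol's schema serialization format.
SCHEMA_STRING = json.dumps(
    {
        "type": "struct",
        "fields": [
            {"name": "id", "type": "long", "nullable": False, "metadata": {}},
            {"name": "amount", "type": "decimal(10,2)", "nullable": True, "metadata": {}},
            {"name": "created_at", "type": "timestamp", "nullable": True, "metadata": {}},
            {
                "name": "address",
                "type": {
                    "type": "struct",
                    "fields": [
                        {"name": "city", "type": "string", "nullable": True, "metadata": {}}
                    ],
                },
                "nullable": True,
                "metadata": {},
            },
        ],
    }
)


def fields_from_schema_string(schema_string: str) -> list:
    """Hypothetical helper mirroring the old code: json.loads(schemaString)["fields"]."""
    return json.loads(schema_string)["fields"]


def test_schema_fields_contract() -> None:
    fields = fields_from_schema_string(SCHEMA_STRING)
    by_name = {field["name"]: field for field in fields}
    # Field order and the primitive type names are part of the output contract.
    assert [field["name"] for field in fields] == ["id", "amount", "created_at", "address"]
    assert by_name["id"]["type"] == "long"
    assert by_name["amount"]["type"] == "decimal(10,2)"
    assert by_name["created_at"]["type"] == "timestamp"
    # Nested structs are objects with their own "fields" list, not strings.
    assert by_name["address"]["type"]["type"] == "struct"
    assert by_name["address"]["type"]["fields"][0]["name"] == "city"
```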
||
| return schema | ||
|
|
||
| return None | ||
| if args.schema: | ||
| local_path = f"Tables/{args.schema}/{args.table_name}" | ||
| else: | ||
| local_path = f"Tables/{args.table_name}" | ||
|
Comment on lines +29 to +32
Collaborator
Please pass the resolved context.local_path from fab_tables.py::schema_command (either directly or via an args.table_local_path populated in add_table_props_to_args). Drop the manual f"Tables/..." reconstruction.
Comment on lines +29 to +32
Collaborator
The old code defensively called utils.remove_dot_suffix(local_path) (default suffix .Shortcut). add_table_props_to_args sets args.table_name = table_path[-1] without stripping .Shortcut. If a user runs fab table schema /ws.Workspace/lh.Lakehouse/Tables/my_table.Shortcut, the new URI becomes …/Tables/my_table.Shortcut, which won't exist in storage. Please add a regression test for the .Shortcut path and either (a) call remove_dot_suffix(args.table_name) defensively here, or (b) confirm in add_table_props_to_args that the suffix is normalized upstream.
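The regression test for option (a) could be as small as the sketch below. `strip_dot_suffix` is a hypothetical stand-in for `utils.remove_dot_suffix` (the real helper's behaviour may differ in edge cases), `build_table_uri` is an illustrative name, and the endpoint and IDs are dummy values.

```python
def strip_dot_suffix(name: str, suffix: str = ".Shortcut") -> str:
    """Hypothetical stand-in for utils.remove_dot_suffix: drop a trailing suffix if present."""
    return name[: -len(suffix)] if name.endswith(suffix) else name


def build_table_uri(ws_id: str, endpoint: str, item_id: str, local_path: str) -> str:
    """Assemble the abfss URI in the shape used in the diff, normalising the last segment."""
    head, _, last = local_path.rpartition("/")
    normalised = f"{head}/{strip_dot_suffix(last)}" if head else strip_dot_suffix(last)
    return f"abfss://{ws_id}@{endpoint}/{item_id}/{normalised}"


def test_shortcut_suffix_is_stripped() -> None:
    uri = build_table_uri("ws-id", "onelake.example", "item-id", "Tables/my_table.Shortcut")
    assert uri == "abfss://ws-id@onelake.example/item-id/Tables/my_table"


def test_plain_table_name_is_unchanged() -> None:
    uri = build_table_uri("ws-id", "onelake.example", "item-id", "Tables/dbo/my_table")
    assert uri == "abfss://ws-id@onelake.example/item-id/Tables/dbo/my_table"
```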
||
|
|
||
| table_uri = ( | ||
| f"abfss://{args.ws_id}@{fab_constant.API_ENDPOINT_ONELAKE}" | ||
| f"/{args.lakehouse_id}/{local_path}" | ||
|
pkontek marked this conversation as resolved.
|
||
| ) | ||
|
|
||
| try: | ||
| table = DeltaTable( | ||
| table_uri, | ||
| storage_options={ | ||
| "bearer_token": token, | ||
| "use_fabric_endpoint": "true", | ||
|
pkontek marked this conversation as resolved.
|
||
| }, | ||
| ) | ||
| schema_json = table.schema().to_json() | ||
| schema_dict = json.loads(schema_json) | ||
| schema_fields = schema_dict.get("fields") | ||
| if not isinstance(schema_fields, list): | ||
| raise ValueError("Delta table schema JSON does not contain a valid 'fields' list.") | ||
| return schema_fields | ||
| except (DeltaError, json.JSONDecodeError, ValueError) as exc: | ||
| raise FabricCLIError( | ||
| f"Failed to extract the table schema. Please ensure the path points to a valid Delta table: {exc}", | ||
|
Collaborator
I believe exception strings can include the full abfss://… URI (containing workspace & item GUIDs), internal Rust diagnostics, and storage-backend details. That violates the supportability/security guidance in commands.instructions.md ("Never log tokens, cookies, correlation IDs, or PII").
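One possible way to address this is to show the user a fixed message and pass exception detail only through a redaction step before it reaches any debug output. The sketch below is an assumption about how that could look; `redact` and `GENERIC_MESSAGE` are illustrative names, and the patterns cover only abfss URIs and GUIDs, not every sensitive value delta-rs might emit.

```python
import re

# Matches an abfss URI up to the next whitespace character.
_ABFSS_URI = re.compile(r"abfss://\S+")
# Matches a canonical 8-4-4-4-12 hexadecimal GUID.
_GUID = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)

# Fixed user-facing text; the exception string is deliberately not interpolated.
GENERIC_MESSAGE = (
    "Failed to extract the table schema. "
    "Please ensure the path points to a valid Delta table"
)


def redact(text: str) -> str:
    """Mask abfss URIs and GUIDs so the text is safer to include in debug output."""
    text = _ABFSS_URI.sub("<redacted-uri>", text)
    return _GUID.sub("<redacted-id>", text)
```

With this split, the FabricCLIError would carry `GENERIC_MESSAGE` only, while `redact(str(exc))` could be sent to a debug log if the extra detail is needed for support.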
||
| fab_constant.ERROR_INVALID_DELTA_TABLE, | ||
|
pkontek marked this conversation as resolved.
|
||
| ) from exc | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| interactions: | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: GET | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces | ||
| response: | ||
| body: | ||
| string: '{"value": [{"id": "94da8ea5-0bd6-4a9e-b717-5fdb482f4c71", "displayName": | ||
| "My workspace", "description": "", "type": "Personal"}]}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: GET | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces | ||
| response: | ||
| body: | ||
| string: '{"value": [{"id": "94da8ea5-0bd6-4a9e-b717-5fdb482f4c71", "displayName": | ||
| "My workspace", "description": "", "type": "Personal"}]}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: GET | ||
| uri: https://api.fabric.microsoft.com/v1/capacities | ||
| response: | ||
| body: | ||
| string: '{"value": [{"id": "00000000-0000-0000-0000-000000000004", "displayName": | ||
| "mocked_fabriccli_capacity_name", "sku": "F2", "region": "West Europe", "state": | ||
| "Active"}]}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| - request: | ||
| body: '{"displayName": "fabriccli_WorkspacePerTestclass_000001", "capacityId": "00000000-0000-0000-0000-000000000004"}' | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: POST | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces | ||
| response: | ||
| body: | ||
| string: '{"id": "d5e6f7a8-b9c0-d1e2-f3a4-b5c6d7e8f9a0", "displayName": "fabriccli_WorkspacePerTestclass_000001", | ||
| "type": "Workspace", "capacityId": "00000000-0000-0000-0000-000000000004"}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 201 | ||
| message: Created | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: GET | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces | ||
| response: | ||
| body: | ||
| string: '{"value": [{"id": "94da8ea5-0bd6-4a9e-b717-5fdb482f4c71", "displayName": | ||
| "My workspace", "description": "", "type": "Personal"}, {"id": "d5e6f7a8-b9c0-d1e2-f3a4-b5c6d7e8f9a0", | ||
| "displayName": "fabriccli_WorkspacePerTestclass_000001", "type": "Workspace", "capacityId": | ||
| "00000000-0000-0000-0000-000000000004"}]}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: GET | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces/d5e6f7a8-b9c0-d1e2-f3a4-b5c6d7e8f9a0/items | ||
| response: | ||
| body: | ||
| string: '{"value": []}' | ||
| headers: | ||
| Content-Type: | ||
| - application/json; charset=utf-8 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| - request: | ||
| body: null | ||
| headers: | ||
| Accept: | ||
| - '*/*' | ||
| Accept-Encoding: | ||
| - gzip, deflate | ||
| Connection: | ||
| - keep-alive | ||
| Content-Length: | ||
| - '0' | ||
| Content-Type: | ||
| - application/json | ||
| User-Agent: | ||
| - ms-fabric-cli-test/1.0.0 | ||
| method: DELETE | ||
| uri: https://api.fabric.microsoft.com/v1/workspaces/d5e6f7a8-b9c0-d1e2-f3a4-b5c6d7e8f9a0 | ||
| response: | ||
| body: | ||
| string: '' | ||
| headers: | ||
| Content-Type: | ||
| - application/octet-stream | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| version: 1 |
The previous implementation went through fab_api_onelake (centralized retries, timeouts, telemetry, request-id surfacing, and respect for FAB_API_ENDPOINT_ONELAKE testing hooks via fab_api_client.py). The new code constructs an abfss:// URI and hands the bearer token directly to delta-rs's Rust object_store.
That conflicts with commands.instructions.md - "All API calls must be invoked via centralized request utilities", "Use the common API client and common retry/backoff policy - don't hand-roll session logic".
Concretely we lose:
--timeout semantics
Please wrap the DeltaTable(...) construction in a thin helper under fabric_cli/client/ (e.g., fab_delta_client.py) that owns storage_options assembly and the mapping of exceptions to FabricCLIError, and provides a single seam for future retry/timeout/telemetry.
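A minimal sketch of such a helper follows, under several assumptions: `get_schema_fields`, `DeltaClientError` and the `table_factory` parameter are hypothetical names, `DeltaClientError` stands in for FabricCLIError, and the injectable factory exists only so the helper can be tested without network access or the deltalake package. The storage option keys are the ones used in the diff.

```python
import json
from typing import Any, Callable, Optional


class DeltaClientError(Exception):
    """Stand-in for FabricCLIError in this sketch."""


def _default_factory(table_uri: str, storage_options: dict) -> Any:
    # Imported lazily so the helper can be exercised without deltalake installed.
    from deltalake import DeltaTable

    return DeltaTable(table_uri, storage_options=storage_options)


def get_schema_fields(
    table_uri: str,
    token: str,
    table_factory: Optional[Callable[[str, dict], Any]] = None,
) -> list:
    """Open a Delta table and return the 'fields' list of its schema JSON."""
    factory = table_factory or _default_factory
    # Same option keys as the diff; owning them here keeps them out of command code.
    storage_options = {"bearer_token": token, "use_fabric_endpoint": "true"}
    try:
        table = factory(table_uri, storage_options)
        fields = json.loads(table.schema().to_json()).get("fields")
    except Exception as exc:  # a real helper would narrow this to DeltaError and similar
        # Fixed message: the exception text may contain the URI, so it is not interpolated.
        raise DeltaClientError("Failed to extract the table schema.") from exc
    if not isinstance(fields, list):
        raise DeltaClientError("Delta table schema JSON has no 'fields' list.")
    return fields
```

Because the command would call only `get_schema_fields`, retries, timeouts or telemetry could later be added inside the helper without touching the command module.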