Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ on: [push, pull_request]

jobs:
tests:
runs-on: ubuntu-latest
timeout-minutes: 15
runs-on: ${{ matrix.os }}
timeout-minutes: ${{ (matrix.os == 'windows-latest' && 30) || 15 }}

defaults:
run:
shell: bash -l {0}

strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
os: [ubuntu-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12"]

concurrency:
group: ci-tests-${{ matrix.python-version }}-${{ github.ref }}
group: ci-tests-${{ matrix.os }}-${{ matrix.python-version }}-${{ github.ref }}
cancel-in-progress: true

steps:
Expand All @@ -31,22 +32,28 @@ jobs:
auto-update-conda: true
conda-solver: libmamba

- name: Linting & Tests
- name: Install dependencies
run: |
pip install poetry poetry-plugin-export

poetry config virtualenvs.create false

poetry export --with dev --extras dbc --format requirements.txt --output reqs.txt --without-hashes

pip install -r reqs.txt
pip install -e ".[dbc]"

pre-commit run --files pysus/**/*

make test-pysus-with-coverage
if [ "${{ runner.os }}" = "Linux" ]; then
poetry install --without dev --extras dbc
pip install pre-commit
else
poetry install --without dev
fi
pip install pytest pytest-timeout pytest-retry pytest-asyncio pytest-cov

- name: Linting
if: matrix.os == 'ubuntu-latest'
run: pre-commit run --files pysus/**/*

- name: Tests
run: |
poetry run pytest -vv pysus/tests/ --retries 3 --retry-delay 15 --cov=pysus --cov-report=xml:coverage.xml --cov-report=term-missing

- name: Upload coverage to Codecov
if: matrix.os == 'ubuntu-latest'
uses: codecov/codecov-action@v5
with:
files: ./coverage.xml
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ jobs:
make html

- name: Configure GitHub Pages
if: github.ref == 'refs/heads/main'
uses: actions/configure-pages@v5

- name: Upload artifact
if: github.ref == 'refs/heads/main'
uses: actions/upload-pages-artifact@v3
with:
path: docs/build/html
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ sudo apt install libffi-dev
pip install pysus[dbc]
```

For the terminal user interface (TUI):
```bash
pip install pysus[tui]
```

## Quick Start

### Simplified Database Functions (New in 2.0)
Expand Down
2 changes: 1 addition & 1 deletion conda/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ channels:
- defaults
dependencies:
- docker-compose
- python>=3.10,<3.14
- python>=3.10,<3.13
- jupyter
- make
- pip
264 changes: 102 additions & 162 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ python = ">=3.10,<3.14"
python-dateutil = "2.8.2"
fastparquet = ">=2023.10.1,<=2024.11.0"
pyarrow = ">=11.0.0"
numpy = ">1,<3"
numpy = ">=1.22,<2"
tqdm = ">=4.67.0"
wget = "^3.2"
loguru = "^0.6.0"
Expand All @@ -31,23 +31,25 @@ pydantic = "^2.12.5"
duckdb = "^1.4.4"
duckdb-engine = "^0.17.0"
sqlalchemy = "^2.0.48"
textual = {extras = ["syntax"], version = "^8.2.1"}
python-magic = "^0.4.27"
chardet = "^7.4.0.post2"
anyio = "^4.13.0"
humanize = "^4.8.0"
httpx = ">=0.28.0"
aioftp = "^0.21.4"
dbfread = "2.0.7"
bigtree = "^0.12.2"

pyreaddbc = { version = ">=1.1.0", optional = true }
pycparser = { version = "2.21", optional = true }
textual = { extras = ["syntax"], version = "^8.2.1", optional = true }
humanize = { version = "^4.8.0", optional = true }
dotenv = "^0.9.9"
boto3 = "^1.42.89"
typer = "^0.24.1"

[tool.poetry.extras]
dbc = ["pyreaddbc", "pycparser"]
tui = ["textual", "humanize"]

[tool.poetry.group.dev.dependencies]
pytest = ">=6.1.0"
Expand All @@ -64,6 +66,7 @@ pytest-cov = "^7.1.0"

[tool.poetry.group.docs.dependencies]
sphinx = "^5.1.1"
standard-imghdr = "*"
nbmake = "^1.4.1"
matplotlib = "^3.7.1"
jupyterlab = "^4.0.5"
Expand Down Expand Up @@ -101,6 +104,12 @@ testpaths = [

exclude = ["*.git", "docs/"]

[tool.coverage.run]
omit = [
"pysus/management/client.py",
"pysus/tui/*",
]

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
Expand Down
76 changes: 68 additions & 8 deletions pysus/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from pathlib import Path
from typing import TYPE_CHECKING, Literal

import anyio
import duckdb
import pandas as pd
from pysus import CACHEPATH
from sqlalchemy import DateTime, Enum, Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
Expand Down Expand Up @@ -235,14 +237,26 @@ async def download(
file: BaseRemoteFile,
token: str | None = None,
callback: Callable | None = None,
timeout: float | None = None,
) -> BaseLocalFile:
"""Download a remote file and return a local file handle."""
"""Download a remote file and return a local file handle.

Parameters
----------
timeout : float | None
Maximum seconds to wait for the download. ``None`` (default) means
no timeout – use this when the socket-level timeout on the
underlying client is sufficient.
"""

from pysus.api.extensions import ExtensionFactory

existing_local = await self.get_local_file(file)
if existing_local and existing_local.path.exists():
return existing_local
if existing_local.size == file.size:
return existing_local
await self._delete_record(str(existing_local.path))
existing_local.path.unlink(missing_ok=True)

client_name = file.client.name.lower()
remote_path = file.path
Expand Down Expand Up @@ -271,7 +285,11 @@ async def download(
f"No download logic for client: {client_name}",
)

await client._download_file(file, local_path, callback)
if timeout is not None:
with anyio.fail_after(timeout):
await client._download_file(file, local_path, callback)
else:
await client._download_file(file, local_path, callback)

await self._update_state(
local_path=local_path,
Expand Down Expand Up @@ -311,18 +329,22 @@ async def download_to_parquet(
file: BaseRemoteFile,
token: str | None = None,
callback: Callable[[int, int], None] | None = None,
timeout: float | None = None,
add_dv: bool = True,
) -> Parquet:
"""Download a file and convert it to Parquet format."""

local_file = await self.download(
file=file,
token=token,
callback=callback,
timeout=timeout,
)

if hasattr(local_file, "to_parquet"):
original_path = local_file.path
parquet_file = await local_file.to_parquet(callback=callback)
parquet_file.add_dv = add_dv

await self._update_state(
local_path=parquet_file.path,
Expand All @@ -346,7 +368,9 @@ async def download_to_parquet(
)

def get_local_hierarchy(self):
"""Build a nested dict of cached files grouped by client and dataset."""
"""
Build a nested dict of cached files grouped by client and dataset.
"""

with self.Session() as session:
records = session.query(LocalFileState).all()
Expand Down Expand Up @@ -414,8 +438,20 @@ def read_parquet(
paths: list[Path],
sql: str | None = None,
mode: Literal["union", "intersection", "strict"] = "union",
) -> "DuckDBPyConnection":
"""Read Parquet files with optional schema handling and SQL filter."""
add_dv: bool = True,
) -> "DuckDBPyConnection | pd.DataFrame":
"""Read Parquet files with optional schema handling and SQL filter.

Parameters
----------
add_dv : bool
When True, automatically applies the IBGE verification digit to
municipality code columns (those matched by ``is_geocode_column``).
The result of ``duckdb.execute`` is returned in either case; no
DataFrame is built here.
"""

from pysus.api.utils import add_dv as _add_dv_fn
from pysus.api.utils import is_geocode_column

if not paths:
raise ValueError("No paths provided")
Expand Down Expand Up @@ -452,8 +488,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
else:
paths_str = ", ".join(f"'{p}'" for p in paths)
query = (
f"SELECT * FROM read_parquet([{paths_str}], "
"union_by_name=True)"
f"SELECT * FROM read_parquet([{paths_str}], union_by_name=True)"
)

if sql:
Expand All @@ -462,4 +497,29 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
else:
query = f"SELECT {sql} FROM ({query}) AS t"

base = duckdb.execute(query)

if not add_dv:
return base

geocode_cols = [
col[0] for col in base.description if is_geocode_column(col[0])
]
if not geocode_cols:
return base

duckdb.create_function(
"__pysus_add_dv",
_add_dv_fn,
null_handling="special",
)
selects = [
(
f'__pysus_add_dv("{c[0]}") AS "{c[0]}"'
if c[0] in geocode_cols
else f'"{c[0]}"'
)
for c in base.description
]
query = f"SELECT {', '.join(selects)} FROM ({query}) AS _t"
return duckdb.execute(query)
28 changes: 26 additions & 2 deletions pysus/api/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from typing import ClassVar

import chardet
import magic

try:
import magic
except (ImportError, OSError):
magic = None # type: ignore[assignment]

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
Expand Down Expand Up @@ -188,6 +193,7 @@ class Parquet(BaseTabularFile):
"""Represents a Parquet file with optional date and integer type parsing."""

type: FileType = Field("PARQUET")
add_dv: bool = True

@property
def schema(self) -> pa.Schema:
Expand All @@ -204,12 +210,26 @@ def rows(self) -> int:
"""Return the number of rows from the Parquet metadata."""
return pq.read_metadata(self.path).num_rows

@staticmethod
def _apply_add_dv(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the IBGE verification digit to the geocode columns of *df*.

    Columns are selected with ``is_geocode_column``. Each selected column
    is cast to ``str`` and passed value by value through ``add_dv``. The
    frame is modified in place and the same object is returned.
    """
    from pysus.api.utils import add_dv, is_geocode_column

    # Collect the matching column names before any column is reassigned.
    targets = list(filter(is_geocode_column, df.columns))
    for name in targets:
        as_text = df[name].astype(str)
        df[name] = as_text.apply(add_dv)
    return df

async def load(self, parse: bool = True) -> pd.DataFrame:
"""Read the entire Parquet file into a DataFrame."""

def _load():
df = pd.read_parquet(self.path, engine="pyarrow")
return self.parse_dftypes(df) if parse else df
if parse:
df = self.parse_dftypes(df)
if self.add_dv:
df = self._apply_add_dv(df)
return df

return await to_thread.run_sync(_load)

Expand All @@ -226,6 +246,8 @@ async def stream(
df = batch.to_pandas()
if parse:
df = self.parse_dftypes(df)
if self.add_dv:
df = self._apply_add_dv(df)
yield df
await asyncio.sleep(0)

Expand Down Expand Up @@ -815,6 +837,8 @@ class ExtensionFactory:
@classmethod
async def _identify(cls, path: Path) -> type[BaseLocalFile] | None:
"""Identify the file class by its MIME type."""
if magic is None:
return None
try:
mime = await to_thread.run_sync(
magic.from_file,
Expand Down
3 changes: 2 additions & 1 deletion pysus/api/ftp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class FTP(BaseRemoteClient):
"""Async FTP client for navigating and downloading DATASUS data."""

host: str = "ftp.datasus.gov.br"
timeout: int = 60

_ftp: FTPLib | None = PrivateAttr(default=None)

Expand Down Expand Up @@ -77,7 +78,7 @@ async def connect(self) -> None:

def _connect():
if self.ftp is None:
self._ftp = FTPLib(self.host)
self._ftp = FTPLib(self.host, timeout=self.timeout)
self.ftp.login()

await to_thread.run_sync(_connect)
Expand Down
Loading
Loading