# Source code for timescale_access.client

from datetime import datetime
from types import TracebackType
from typing import Any, Dict, List, Optional, Type

import pandas as pd
from sqlalchemy.engine import Engine

from . import analysis, engine, read, write


class TimescaleAccess:
    """
    Convenience wrapper around database operations using SQLAlchemy.

    Every method forwards to a function in the package's ``engine``,
    ``read``, ``write`` or ``analysis`` module, passing the SQLAlchemy engine
    created in ``__init__``.

    Instances can be used as a context manager. The engine is disposed when
    the ``with`` block exits, whether or not an exception was raised::

        with TimescaleAccess(db_url) as db:
            schemas = db.get_schemas()
    """

    def __init__(self, db_url: str) -> None:
        """
        Create the SQLAlchemy engine for ``db_url`` via ``engine.get_engine``.

        Args:
            db_url (str): Database URL.
        """
        # ``engine`` on the right-hand side is the package module imported at
        # file level; the resulting Engine object is stored as ``self.engine``.
        self.engine: Engine = engine.get_engine(db_url)

    def __enter__(self) -> "TimescaleAccess":
        """
        Enter a ``with`` block.

        Returns:
            TimescaleAccess: This instance, so it can be bound with ``as``.
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Dispose the underlying engine when leaving a ``with`` block.

        Returns ``None`` (falsy), so any exception raised inside the block is
        propagated to the caller rather than suppressed.
        """
        self.dispose_connection()

    def dispose_connection(self) -> None:
        """
        Explicitly dispose the underlying SQLAlchemy engine and free resources.
        """
        self.engine.dispose()

    def check_connection(self) -> bool:
        """
        Test whether a connection to the database can be established.

        Returns:
            bool: True if the connection check succeeds, False otherwise.
        """
        # Module-level helper ``engine.check_connection``, not a method of
        # the Engine stored in ``self.engine``.
        return engine.check_connection(self.engine)

    def insert_hypertable(
        self,
        schema_name: str,
        table_name: str,
        df: pd.DataFrame,
        index: bool = False,
        chunksize: int = 500,
        time_column: str = "timestamp",
    ) -> None:
        """
        Insert a pandas DataFrame into a TimescaleDB hypertable.

        This method is tailored to time-series data. If the target table does
        not exist, it is automatically created as a TimescaleDB hypertable.
        The time column is set to ``"timestamp"`` by default.

        Args:
            schema_name (str): Name of the target schema.
            table_name (str): Name of the target table.
            df (pd.DataFrame): DataFrame to insert. Must contain a time column.
            index (bool, optional): Whether to persist the pandas index
                (default: False).
            chunksize (int, optional): Number of rows per batch insert
                (default: 500).
            time_column (str, optional): Name of the time column in the
                DataFrame (default: ``"timestamp"``).

        Raises:
            ValueError: If the given time column is not present in ``df``.
            RuntimeError: If insertion or hypertable creation fails.
        """
        write.insert_hypertable(
            engine=self.engine,
            schema_name=schema_name,
            table_name=table_name,
            df=df,
            index=index,
            chunksize=chunksize,
            time_column=time_column,
        )

    def drop_table(self, schema_name: str, table_name: str) -> None:
        """
        Drop a table in the given schema.

        Args:
            schema_name (str): Schema name.
            table_name (str): Name of the table to drop.
        """
        write.drop_table(self.engine, schema_name, table_name)

    def get_existing_timestamps(
        self,
        schema_name: str,
        table_name: str,
        column: str = "timestamp",
    ) -> List[datetime]:
        """
        Return a sorted list of distinct timestamps from a specific column.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.
            column (str, optional): Column name to query
                (default: ``"timestamp"``).

        Returns:
            List[datetime]: Sorted list of timestamps.
        """
        return read.get_existing_timestamps(
            self.engine, schema_name, table_name, column
        )

    def get_table_names(self, schema_name: str) -> List[str]:
        """
        Return all table names in the given schema.

        Args:
            schema_name (str): Schema name.

        Returns:
            List[str]: List of table names.
        """
        return read.get_table_names(self.engine, schema_name)

    def get_column_names(self, schema_name: str, table_name: str) -> List[str]:
        """
        Return all column names for a given table.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            List[str]: List of column names.
        """
        return read.get_column_names(self.engine, schema_name, table_name)

    def get_distinct_values(
        self,
        schema_name: str,
        table_name: str,
        column_name: str,
    ) -> List[str]:
        """
        Return all distinct values of a column in a table.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.
            column_name (str): Column name.

        Returns:
            List[str]: List of distinct values in the column.
        """
        return read.get_distinct_values(
            self.engine, schema_name, table_name, column_name
        )

    def get_indexes(
        self, schema_name: str, table_name: str
    ) -> List[Dict[str, Any]]:
        """
        Return all indexes for a given table.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            List[dict]: List of index metadata dictionaries.
        """
        return read.get_indexes(self.engine, schema_name, table_name)

    def get_table(
        self,
        schema_name: str,
        table_name: str,
        filters: Optional[Dict[str, Any]] = None,
    ) -> pd.DataFrame:
        """
        Load a table as a pandas DataFrame with optional filters.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.
            filters (Optional[Dict[str, Any]], optional): Filter specification
                for the WHERE clause. Examples:

                - Single/multiple values:
                  ``{"instrument_name": ["BTC-14MAR25", "ETH-14MAR25"]}``
                - Range filter: ``{"trade_seq": {"between": (100, 200)}}``

        Returns:
            pd.DataFrame: Filtered table contents.
        """
        return read.get_table(self.engine, schema_name, table_name, filters)

    def get_databases(self) -> List[str]:
        """
        Return all non-template databases in the PostgreSQL instance.

        Returns:
            List[str]: List of database names.
        """
        return read.get_databases(self.engine)

    def get_roles(self) -> List[Dict[str, Any]]:
        """
        Return all roles and their privileges.

        Returns:
            List[dict]: List of role metadata, including superuser and
            createdb flags.
        """
        return read.get_roles(self.engine)

    def get_role_memberships(self) -> List[Dict[str, Any]]:
        """
        Return all role memberships in the database.

        Returns:
            List[dict]: List of mappings from member to role.
        """
        return read.get_role_memberships(self.engine)

    def get_active_connections(self) -> List[Dict[str, Any]]:
        """
        Return information about active database connections.

        Returns:
            List[dict]: List of active connections with database name, user
            and client IP.
        """
        return read.get_active_connections(self.engine)

    def get_schemas(self) -> List[str]:
        """
        Return a list of all user-defined schemas.

        Returns:
            List[str]: List of schema names.
        """
        return read.get_schemas(self.engine)

    def ensure_schema_exists(self, schema_name: str) -> None:
        """
        Create the given schema if it does not already exist.

        Args:
            schema_name (str): Name of the schema to ensure.
        """
        write.ensure_schema_exists(self.engine, schema_name)

    def get_missing_trade_seq(
        self, schema_name: str, table_name: str
    ) -> pd.DataFrame:
        """
        Return all expected but missing ``trade_seq`` values per
        ``instrument_name``.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            pd.DataFrame: Rows describing missing sequence values per
            instrument.
        """
        return analysis.get_missing_trade_seq(self.engine, schema_name, table_name)

    def get_nonconsecutive_trade_seq(
        self,
        schema_name: str,
        table_name: str,
    ) -> pd.DataFrame:
        """
        Return all ``trade_seq`` rows where the sequence is not consecutive.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            pd.DataFrame: Rows with gaps in ``trade_seq`` per instrument.
        """
        return analysis.get_nonconsecutive_trade_seq(
            self.engine, schema_name, table_name
        )

    def get_duplicate_rows(self, schema_name: str, table_name: str) -> pd.DataFrame:
        """
        Return all duplicate rows where ``(instrument_name, trade_seq)`` occurs
        multiple times.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            pd.DataFrame: DataFrame containing all duplicate rows.
        """
        return analysis.get_duplicate_rows(self.engine, schema_name, table_name)

    def get_null_summary(self, schema_name: str, table_name: str) -> pd.DataFrame:
        """
        Create (if necessary) and invoke a function that summarizes NULL values
        in a table.

        The created function is named according to the pattern:
        ``check_nulls_in_{schema_name}_{table_name}``.

        Args:
            schema_name (str): Schema name.
            table_name (str): Table name.

        Returns:
            pd.DataFrame: Summary of NULL counts per column and instrument name.
        """
        return analysis.get_null_summary(self.engine, schema_name, table_name)

    def get_hypertable_size(self, schema_name: str, table_name: str) -> str:
        """
        Return the total size of a TimescaleDB hypertable as a formatted string
        (for example ``"123 MB"``).

        The implementation:

        1. Retrieves the internal hypertable ID from
           ``_timescaledb_catalog.hypertable``.
        2. Summarizes the size of all chunks in ``_timescaledb_internal``.

        Args:
            schema_name (str): Schema name of the hypertable
                (for example ``"raw_data"``).
            table_name (str): Hypertable name (for example ``"btc_weekly"``).

        Returns:
            str: Total size formatted by ``pg_size_pretty``
            (for example ``"512 MB"``, ``"3 GB"``).

        Raises:
            ValueError: If the hypertable cannot be found.
        """
        return analysis.get_hypertable_size(self.engine, schema_name, table_name)

    def get_row_count(self, schema_name: str, table_name: str) -> int:
        """
        Return the number of rows in a table.

        Args:
            schema_name (str): Schema name (for example ``"raw_data"``).
            table_name (str): Table name (for example ``"btc_weekly"``).

        Returns:
            int: Number of rows in the table.
        """
        return analysis.get_row_count(self.engine, schema_name, table_name)

    def insert_hypertable_on_conflict(
        self,
        schema_name: str,
        table_name: str,
        df: pd.DataFrame,
        time_column: str = "timestamp",
    ) -> None:
        """
        Insert a DataFrame into a TimescaleDB hypertable and avoid duplicates
        via ``ON CONFLICT``.

        The function creates the table and columns as needed and ensures a
        UNIQUE index on ``(instrument_name, trade_seq, timestamp)``. All index
        columns must be present in the DataFrame.

        Args:
            schema_name (str): Target schema.
            table_name (str): Target table name.
            df (pd.DataFrame): Data to insert.
            time_column (str): Name of the time column used by TimescaleDB.
        """
        analysis.insert_hypertable_on_conflict(
            self.engine,
            schema_name,
            table_name,
            df,
            time_column,
        )