Source code for timescale_access.read

from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine


[docs] def get_existing_timestamps( engine: Engine, schema_name: str, table_name: str, column: str, ) -> List[datetime]: """ Return a sorted list of distinct timestamps from a given column. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. table_name (str): Table name. column (str): Column name. Returns: List[datetime]: Sorted list of timestamps. """ full_table = f"{schema_name}.{table_name}" query = text(f"SELECT DISTINCT {column} FROM {full_table} ORDER BY {column}") with engine.connect() as conn: result = conn.execute(query) return [row[0] for row in result]
[docs] def get_table_names(engine: Engine, schema_name: str) -> List[str]: """ Return all table names in a given schema. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. Returns: List[str]: List of table names. """ return inspect(engine).get_table_names(schema_name)
[docs] def get_column_names(engine: Engine, schema_name: str, table_name: str) -> List[str]: """ Return all column names of a given table. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. table_name (str): Table name. Returns: List[str]: List of column names. """ inspector = inspect(engine) return [col["name"] for col in inspector.get_columns(table_name, schema_name)]
[docs] def get_distinct_values( engine: Engine, schema_name: str, table_name: str, column_name: str, ) -> List[str]: """ Return all distinct, non-null values of a column in a table. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. table_name (str): Table name. column_name (str): Column name. Returns: List[str]: List of distinct values in the column. """ query = text( f""" SELECT DISTINCT "{column_name}" FROM "{schema_name}"."{table_name}" WHERE "{column_name}" IS NOT NULL """ ) with engine.connect() as conn: result = conn.execute(query) return [row[0] for row in result]
[docs] def get_indexes(engine: Engine, schema_name: str, table_name: str) -> List[Dict[str, Any]]: """ Return index information for a given table. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. table_name (str): Table name. Returns: List[dict]: List of index metadata dictionaries. """ inspector = inspect(engine) return inspector.get_indexes(table_name, schema_name)
[docs] def get_table( engine: Engine, schema_name: str, table_name: str, filters: Optional[Dict[str, Union[Any, Dict[str, Any]]]] = None, ) -> pd.DataFrame: """ Load a table as a pandas DataFrame with optional filter conditions. Args: engine (Engine): SQLAlchemy engine. schema_name (str): Schema name. table_name (str): Table name. filters (Optional[Dict[str, Union[Any, Dict[str, Any]]]], optional): Filter specification for the WHERE clause. Examples: - Single/multiple values: ``{"instrument_name": ["BTC-14MAR25", "ETH-14MAR25"]}`` - Range filter: ``{"trade_seq": {"between": (100, 200)}}`` Returns: pd.DataFrame: Filtered table. """ full_table = f"{schema_name}.{table_name}" base_query = f"SELECT * FROM {full_table}" conditions: List[str] = [] if filters: for column, value in filters.items(): # Range filter if isinstance(value, dict) and "between" in value: low, high = value["between"] conditions.append(f"{column} BETWEEN {low} AND {high}") # Multiple values (IN) elif isinstance(value, (list, tuple, set)): value_list = ", ".join( f"'{v}'" if isinstance(v, str) else str(v) for v in value ) conditions.append(f"{column} IN ({value_list})") # Single equality else: value_str = f"'{value}'" if isinstance(value, str) else str(value) conditions.append(f"{column} = {value_str}") where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else "" query = base_query + where_clause + " ORDER BY trade_seq" return pd.read_sql(query, engine)
[docs] def get_databases(engine: Engine) -> List[str]: """ Return all non-template databases in the PostgreSQL instance. Args: engine (Engine): SQLAlchemy engine. Returns: List[str]: List of database names. """ with engine.connect() as conn: result = conn.execute( text("SELECT datname FROM pg_database WHERE datistemplate = false;") ) return [row[0] for row in result]
[docs] def get_roles(engine: Engine) -> List[Dict[str, Any]]: """ Return all roles and their basic privileges. Args: engine (Engine): SQLAlchemy engine. Returns: List[dict]: List of dictionaries with role metadata. """ with engine.connect() as conn: result = conn.execute( text( """ SELECT rolname, rolsuper, rolcreaterole, rolcreatedb FROM pg_roles; """ ) ) return [dict(row._mapping) for row in result]
[docs] def get_role_memberships(engine: Engine) -> List[Dict[str, Any]]: """ Return all role memberships in the database. Args: engine (Engine): SQLAlchemy engine. Returns: List[dict]: List of mappings from member to role. """ with engine.connect() as conn: result = conn.execute( text( """ SELECT r.rolname AS role_name, m.rolname AS member_name FROM pg_auth_members am JOIN pg_roles r ON r.oid = am.roleid JOIN pg_roles m ON m.oid = am.member; """ ) ) return [dict(row._mapping) for row in result]
[docs] def get_active_connections(engine: Engine) -> List[Dict[str, Any]]: """ Return information about active database connections. Args: engine (Engine): SQLAlchemy engine. Returns: List[dict]: List of active connections with database name, user and client IP. """ with engine.connect() as conn: result = conn.execute( text("SELECT datname, usename, client_addr FROM pg_stat_activity;") ) return [dict(row._mapping) for row in result]
[docs] def get_schemas(engine: Engine) -> List[str]: """ Return a list of user-defined schemas, excluding system and TimescaleDB internals. Args: engine (Engine): SQLAlchemy engine. Returns: List[str]: List of schema names. """ ignored_schemas = ( "pg_catalog", "information_schema", "pg_toast", "timescaledb_information", "timescaledb_experimental", "_timescaledb_internal", "_timescaledb_functions", "_timescaledb_debug", "_timescaledb_config", "_timescaledb_catalog", "_timescaledb_cache", ) query = text( """ SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN :ignored; """ ) with engine.connect() as conn: result = conn.execute(query, {"ignored": ignored_schemas}) return [row[0] for row in result]