# Source code for finance.src.postgres_interface

"""
Module to interact with postgres databases

It contains generic methods to interact with postgres databases regardless of
the data they contain
"""

import os

import pandas as pd
import sqlalchemy
from dotenv import load_dotenv
from sqlalchemy import MetaData, Table, select, text
from sqlalchemy.dialects.postgresql import insert
from src.utils import emit_log

from config import LOG_LEVEL


class PostgresInterface:
    """
    Class to interact with postgres databases

    Connection settings are read from environment variables (optionally
    loaded from a ``.env`` file) whose names are prefixed with the value of
    the ``PROVIDER`` environment variable, e.g. ``<PROVIDER>_POSTGRES_HOST``.
    """

    def __init__(self):
        # populate os.environ from a .env file, if one is found
        load_dotenv()
        # prefix used to build every connection-related env var name
        # NOTE(review): if PROVIDER is unset this is None and lookups target
        # "None_POSTGRES_..." variables — confirm that is never intended
        self.provider = os.environ.get("PROVIDER")

    def _env(self, suffix: str):
        """
        Read the provider-prefixed environment variable ``<PROVIDER>_<suffix>``

        Parameters
        ----------
        suffix : str
            variable name without the provider prefix, e.g. "POSTGRES_HOST"

        Returns
        -------
        str or None
            the variable's value, or None when it is not set
        """
        return os.environ.get(f"{self.provider}_{suffix}")

    def create_engine(self) -> sqlalchemy.engine.Engine:
        """
        Create an engine to connect to the provider's postgres database

        If ``<PROVIDER>_POSTGRES_CONNECTION_STRING`` is set it is used as-is.
        Otherwise the URL is assembled from ``<PROVIDER>_POSTGRES_USER``,
        ``_POSTGRES_PASSWORD``, ``_POSTGRES_HOST``, ``_POSTGRES_PORT`` and
        ``_POSTGRES_DB``, plus a ``sslmode`` query parameter when
        ``<PROVIDER>_SSL_MODE`` is set to a non-empty value.

        Returns
        -------
        sqlalchemy.engine.Engine
            engine to connect to the database

        Raises
        ------
        ValueError
            if ``<PROVIDER>_POSTGRES_PORT`` is set but is not an integer
        """
        conn_str = self._env("POSTGRES_CONNECTION_STRING")
        if conn_str:
            return sqlalchemy.create_engine(conn_str)

        port = self._env("POSTGRES_PORT")
        ssl_mode = self._env("SSL_MODE")
        # URL.create escapes special characters (e.g. "@", "/", ":") in the
        # credentials, which a hand-built f-string URL would not
        url = sqlalchemy.engine.URL.create(
            drivername="postgresql",
            username=self._env("POSTGRES_USER"),
            password=self._env("POSTGRES_PASSWORD"),
            host=self._env("POSTGRES_HOST"),
            port=int(port) if port else None,
            database=self._env("POSTGRES_DB"),
            # only append ?sslmode=... when an SSL mode is configured
            query={"sslmode": ssl_mode} if ssl_mode else {},
        )
        return sqlalchemy.create_engine(url)

    def create_table_object(
        self, table_name: str, engine: sqlalchemy.engine.Engine, schema: str = "finance"
    ) -> sqlalchemy.Table:
        """
        Method to create a table object by reflecting it from the database

        Parameters
        ----------
        table_name : str
            name of the table to create the object for
        engine : sqlalchemy.engine.Engine
            engine used to reflect the table definition
        schema : str
            schema of the table, default: "finance"

        Returns
        -------
        sqlalchemy.Table
            table object with columns loaded from the database
        """
        metadata = MetaData()
        return Table(table_name, metadata, autoload_with=engine, schema=schema)

    def insert_batch(
        self, table: sqlalchemy.Table, batch: list, conn: sqlalchemy.engine.Connection
    ) -> None:
        """
        Method to insert a batch of rows into a table and commit

        Rows that violate a unique constraint are skipped
        (``ON CONFLICT DO NOTHING``). An empty batch is a no-op.

        Parameters
        ----------
        table : sqlalchemy.Table
            table to insert data into
        batch : list
            rows to insert, in a form accepted by ``Insert.values`` (dicts
            keyed by column name, or tuples in column order)
        conn : sqlalchemy.engine.Connection
            open connection used to execute the insert; it is committed here

        Returns
        -------
        None
        """
        if not batch:
            # nothing to insert; avoid issuing an INSERT with no rows
            return
        emit_log(f"Inserting batch of {len(batch)} rows", log_level=LOG_LEVEL)
        insert_statement = insert(table).values(batch).on_conflict_do_nothing()
        conn.execute(insert_statement)
        conn.commit()

    def read_table_to_df(self, table: str, schema: str = "finance") -> pd.DataFrame:
        """
        Method to read a whole table into a dataframe

        Parameters
        ----------
        table : str
            name of the table to read
        schema : str
            schema of the table, default: "finance"

        Returns
        -------
        pd.DataFrame
            dataframe with the data from the table, one column per table column
        """
        engine = self.create_engine()
        try:
            table_obj = self.create_table_object(table, engine, schema)
            with engine.connect() as conn:
                rows = conn.execute(select(table_obj)).fetchall()
        finally:
            # the engine (and its connection pool) is private to this call,
            # so release it instead of leaking pooled connections
            engine.dispose()
        return pd.DataFrame(rows, columns=table_obj.columns.keys())