"""Module to interact with s3"""
import os
from datetime import datetime
import boto3
from src.postgres_interface import PostgresInterface
from src.utils import custom_logger
from config import BACKUP_TABLES
class S3Interface:
    """
    Class to interact with s3

    Holds a boto3 S3 client, a boto3 S3 resource and a logger that are
    created once in ``__init__`` and reused by the methods.
    """

    def __init__(self):
        self.s3_client = boto3.client("s3")
        self.s3_resource = boto3.resource("s3")
        self.logger = custom_logger(logger_name="s3_interface")

    def get_bucket_names(self) -> list:
        """
        Method to get the names of the buckets in s3

        Returns
        -------
        list
            list with the names of the buckets in s3; empty when the
            response carries no "Buckets" entry
        """
        response = self.s3_client.list_buckets()
        # Tolerate a response without a "Buckets" key instead of raising
        # KeyError; a normal response is handled exactly as before.
        return [bucket["Name"] for bucket in response.get("Buckets", [])]

    def backup_tables(self, bucket_name: str = "ahnazary-finance-prod") -> None:
        """
        Method to backup tables to s3

        Every table listed in ``BACKUP_TABLES`` is read through
        ``PostgresInterface.read_table_to_df``, written as a parquet file to
        ``<cwd>/finance/src/data/<table>.parquet`` and then uploaded to
        ``backup/<table>/<YYYY-MM-DD>.parquet`` in the target bucket.
        The local parquet file is left in place after the upload.

        Parameters
        ----------
        bucket_name : str
            name of the s3 bucket that receives the backup files
        """
        pg_interface = PostgresInterface()

        # Computed once so that all tables of a single run land under the
        # same date key, even if the run crosses midnight.
        current_date = datetime.now().strftime("%Y-%m-%d")

        # The data directory may not exist yet (fresh checkout, different
        # working directory); create it so the parquet write cannot fail on
        # a missing parent folder.
        data_dir = f"{os.getcwd()}/finance/src/data"
        os.makedirs(data_dir, exist_ok=True)

        for table_name in BACKUP_TABLES:
            # presumably a pandas DataFrame -- only ``to_parquet`` is used
            df = pg_interface.read_table_to_df(table=table_name)
            file_path = f"{data_dir}/{table_name}.parquet"

            # write df to parquet file in data directory
            df.to_parquet(file_path)

            # upload parquet file to s3
            self.s3_client.upload_file(
                Filename=file_path,
                Bucket=bucket_name,
                Key=f"backup/{table_name}/{current_date}.parquet",
            )
            self.logger.info(f"Uploaded {table_name} to s3")