import pandas as pd from sqlalchemy import create_engine, inspect, text from sqlalchemy.exc import ProgrammingError from dotenv import load_dotenv import os load_dotenv() POSTGRES_DATABASE = os.getenv('POSTGRES_DATABASE') POSTGRES_USERNAME = os.getenv('POSTGRES_USERNAME') POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD') CSV_FILE_PATH = '../data/2015-street-tree-census-tree-data.csv' DATASET_TABLE = "street_tree_census_tree_data" # Функция преобразует тип данных pandas в соответствующий тип для PostgreSQL def dtype_to_postgres(dtype): type_mapping = { 'object': 'text', 'int64': 'bigint', 'float64': 'double precision', 'float32': 'real', 'bool': 'boolean', 'datetime64[ns]': 'timestamp without time zone', 'timedelta[ns]': 'interval' } # Проверяем, существует ли соответствие для данного типа данных if dtype.name in type_mapping: return type_mapping[dtype.name] else: raise ValueError(f"Unsupported dtype: {dtype.name}") # Создаем соединение с базой данных database_url = f'postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@localhost/{POSTGRES_DATABASE}' engine = create_engine(database_url) # Читаем CSV-файл df = pd.read_csv(CSV_FILE_PATH) # Получаем имена столбцов из DataFrame column_names = df.columns.tolist() # Создаем SQL-запрос для создания таблицы columns_str = ', '.join( [f'{col.replace(" ", "_")} {dtype_to_postgres(df[col].dtype)}' for col in column_names] ) table_creation_query = f"CREATE TABLE {DATASET_TABLE} ({columns_str});" # Создаем инспектора и проверяем существование таблицы inspector = inspect(engine) if inspector.has_table(DATASET_TABLE): text = f"Таблица '{DATASET_TABLE}' уже существует, перезапись первичного датасета запрещена!" raise ProgrammingError(text, params={}, orig=None) # Выполняем запрос на создание таблицы with engine.connect() as connection: connection.execute(text(table_creation_query)) # Загружаем данные из DataFrame в таблицу df.to_sql(DATASET_TABLE, engine, if_exists='replace', index=False)