import os
from typing import Any

import pandas as pd
from sqlalchemy import create_engine

from core.config import settings


class HybridDataRepository:
    def __init__(self) -> None:
        self.mode = settings.data_source_mode.upper()
        self._engine = None

    def _db_engine(self):
        if self._engine is None:
            dsn = (
                f"mysql+pymysql://{settings.mysql_user}:{settings.mysql_password}"
                f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
            )
            self._engine = create_engine(dsn, pool_pre_ping=True)
        return self._engine

    def _read_csv(self, path: str) -> pd.DataFrame:
        if not os.path.exists(path):
            return pd.DataFrame()
        return pd.read_csv(path)

    def _read_db_events(self) -> pd.DataFrame:
        query = """
        SELECT
            td.id AS event_id,
            td.dispatch_site_id AS site_id,
            td.vehicle_number AS vehicle_id,
            td.material_weight AS dispatch_weight,
            td.received_weight,
            td.dispatch_date,
            td.status,
            COALESCE(TIMESTAMPDIFF(MINUTE, td.created_at, td.arrived_at), 0) AS actual_duration_minutes
        FROM transport_dispatches td
        """
        return pd.read_sql(query, self._db_engine())

    def load_events(self) -> pd.DataFrame:
        if self.mode == "CSV_ONLY":
            return self._normalize(self._read_csv(settings.events_csv_path))

        if self.mode == "DB_FIRST":
            return self._normalize(self._read_db_events())

        # HYBRID
        try:
            db_df = self._read_db_events()
        except Exception:
            db_df = pd.DataFrame()

        csv_df = self._read_csv(settings.events_csv_path)
        if db_df.empty and csv_df.empty:
            return pd.DataFrame()
        if db_df.empty:
            return self._normalize(csv_df)
        if csv_df.empty:
            return self._normalize(db_df)
        merged = pd.concat([db_df, csv_df], ignore_index=True).drop_duplicates(subset=["event_id"], keep="last")
        return self._normalize(merged)

    def _normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        if df.empty:
            return df
        out = df.copy()
        for col in ["dispatch_weight", "received_weight", "actual_duration_minutes"]:
            if col in out.columns:
                out[col] = pd.to_numeric(out[col], errors="coerce").fillna(0.0)
        if "status" not in out.columns:
            out["status"] = "unknown"
        if "site_id" not in out.columns:
            out["site_id"] = 0
        if "vehicle_id" not in out.columns:
            out["vehicle_id"] = "unknown"
        return out


def dict_to_frame(payload: dict[str, Any]) -> pd.DataFrame:
    return pd.DataFrame([payload])
