dft-core/dft/base/connector.py

51 lines
1.5 KiB
Python

import yaml
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from .config import config as cfg
import pandas as pd
class SQLConnector():
def __init__(self, src_name, filepath):
super().__init__(src_name, filepath)
def _get_source_url(self):
try:
with open(os.path.expanduser("~")+"/.dft/sources.yml",'r') as file:
data = yaml.safe_load(file)
target = data[cfg["profile"]]['target']
src_info = data[cfg["profile"]][target]["extract_from"][self.name]
src_url = URL.create(
src_info["connector"],
username=src_info["user"],
password=src_info["password"],
host=src_info["host"],
port=src_info["port"],
database=src_info["database"],
query={
"driver": src_info["driver"],
},
)
except Exception as e:
return e
else:
return src_url
def _create_engine(self):
try:
engine = create_engine(self._get_source_url())
except Exception as e:
return e
else:
return engine
def read_query(self, query) -> pd.DataFrame:
try:
df = pd.read_sql(query, self._create_engine())
except Exception as e:
return e
else:
return df