dft-core/dft/base/migrator.py

25 lines
1.0 KiB
Python

from prefect import task
from alembic.config import Config
from alembic import command
from .config import config as cfg
import yaml
import os
def get_target_url():
    """Build the SQLAlchemy connection URL for the configured load target.

    Reads ``~/.dft/sources.yml``, selects the profile named by
    ``cfg["profile"]``, follows that profile's ``target`` entry and picks the
    ``load_to`` connection named by ``cfg["target-name"]``.

    Returns:
        str: A ``mssql+pyodbc://`` URL. User, password and driver are
        percent-encoded, so the result may contain ``%`` characters; callers
        that pass it to a ConfigParser-backed API must escape them as ``%%``.

    Raises:
        OSError: If the sources file cannot be opened.
        KeyError: If the profile, target, ``load_to`` entry or one of the
            connection fields is missing.
    """
    # Local import keeps this block self-contained without touching the
    # module's import section.
    from urllib.parse import quote_plus

    sources_path = os.path.join(os.path.expanduser("~"), ".dft", "sources.yml")
    with open(sources_path, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)

    profile = data[cfg["profile"]]
    target = profile["target"]
    conn_info = profile[target]["load_to"][cfg["target-name"]]

    # Credentials may contain URL-reserved characters ('@', ':', '/', '%'),
    # which would otherwise be parsed as URL delimiters. str() guards against
    # YAML having parsed a purely numeric value as an int.
    user = quote_plus(str(conn_info["user"]))
    password = quote_plus(str(conn_info["password"]))
    # quote_plus turns spaces into '+', as the previous replace() did, and
    # also encodes any other reserved characters in the driver name.
    driver = quote_plus(str(conn_info["driver"]))

    return (
        f"mssql+pyodbc://{user}:{password}"
        f"@{conn_info['host']}:{conn_info['port']}"
        f"/{conn_info['database']}?driver={driver}"
    )
def create_migration_file(script_message):
    """Autogenerate a new Alembic revision file against the target database.

    Loads ``alembic.ini`` from ``cfg["migration-path"]``, overrides its
    ``sqlalchemy.url`` with the URL returned by ``get_target_url()`` and runs
    ``alembic revision --autogenerate``.

    Args:
        script_message: Message passed to Alembic for the new revision.
    """
    alembic_cfg = Config(f'{cfg["migration-path"]}/alembic.ini')
    # set_main_option goes through ConfigParser interpolation, so a literal
    # '%' in the URL (e.g. from a percent-encoded password) must be doubled
    # or Alembic raises an interpolation syntax error.
    target_url = get_target_url().replace("%", "%%")
    alembic_cfg.set_main_option("sqlalchemy.url", target_url)
    command.revision(alembic_cfg, autogenerate=True, message=script_message)
def migrate():
    """Upgrade the target database to the latest Alembic revision.

    Loads ``alembic.ini`` from ``cfg["migration-path"]``, overrides its
    ``sqlalchemy.url`` with the URL returned by ``get_target_url()`` and runs
    ``alembic upgrade head``.
    """
    alembic_cfg = Config(f'{cfg["migration-path"]}/alembic.ini')
    # set_main_option goes through ConfigParser interpolation, so a literal
    # '%' in the URL (e.g. from a percent-encoded password) must be doubled
    # or Alembic raises an interpolation syntax error.
    target_url = get_target_url().replace("%", "%%")
    alembic_cfg.set_main_option("sqlalchemy.url", target_url)
    command.upgrade(alembic_cfg, "head")