Migration commit from heyvince.co to heyvince.ca

vgoineau 2024-05-03 09:24:48 -04:00
commit 2c898da04f
24 changed files with 679 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
__pycache__/
*.egg-info

0
README.md Normal file

19
cli.py Normal file

@@ -0,0 +1,19 @@
import click

from dft.commands.migrate import createmigration, migrate
from dft.commands.load import load


@click.group()
def cli():
    pass


#cli.add_command(init)
#cli.add_command(debug)
cli.add_command(createmigration)
cli.add_command(migrate)
cli.add_command(load)
#cli.add_command(deploy)

if __name__ == "__main__":
    cli()
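
For context, a minimal, self-contained sketch of the same click group/command pattern used above (the demo names here are illustrative, not part of this commit):

import click

@click.group()
def demo_cli():
    pass

@click.command()
def hello():
    click.echo("hello from a registered subcommand")

demo_cli.add_command(hello)

if __name__ == "__main__":
    demo_cli()  # invoked as: python demo.py hello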

1
dft/__init__.py Normal file

@@ -0,0 +1 @@
from . import base

1
dft/base/__init__.py Normal file

@@ -0,0 +1 @@
from . import mixin

17
dft/base/config.py Normal file

@@ -0,0 +1,17 @@
import yaml


def get_config_from_yml(filepath="dft_project.yml"):
    try:
        with open(filepath, 'r') as file:
            data = yaml.safe_load(file)
    except Exception:
        # Every dft command depends on this file; re-raise instead of
        # returning the exception object as if it were the config.
        raise
    else:
        return data


def load_config():
    data = get_config_from_yml()
    return data


config = load_config()
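
A short usage sketch, assuming a dft_project.yml like the sample later in this commit sits in the working directory and the dft package is importable:

# Hypothetical usage of the module-level config loaded above.
from dft.base.config import config as cfg

print(cfg["profile"])         # e.g. 'profile-name'
print(cfg["query-paths"])     # e.g. ['queries']
print(cfg["migration-path"])  # e.g. 'migration'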

50
dft/base/connector.py Normal file

@@ -0,0 +1,50 @@
import yaml
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from .config import config as cfg
import pandas as pd


class SQLConnector():
    # Mixed into SQLSource (see sources.py); relies on self.name being set
    # by Modeler.__init__, so it needs no __init__ override of its own.

    def _get_source_url(self):
        try:
            with open(os.path.expanduser("~") + "/.dft/sources.yml", 'r') as file:
                data = yaml.safe_load(file)
                target = data[cfg["profile"]]['target']
                src_info = data[cfg["profile"]][target]["extract_from"][self.name]
                src_url = URL.create(
                    src_info["connector"],
                    username=src_info["user"],
                    password=src_info["password"],
                    host=src_info["host"],
                    port=src_info["port"],
                    database=src_info["database"],
                    query={
                        "driver": src_info["driver"],
                    },
                )
        except Exception as e:
            return e
        else:
            return src_url

    def _create_engine(self):
        try:
            engine = create_engine(self._get_source_url())
        except Exception as e:
            return e
        else:
            return engine

    def read_query(self, query) -> pd.DataFrame:
        try:
            df = pd.read_sql(query, self._create_engine())
        except Exception as e:
            return e
        else:
            return df
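
To illustrate the URL that _get_source_url assembles, a standalone sketch with dummy credentials (all values are placeholders):

from sqlalchemy.engine import URL

# Same shape as the URL built above from ~/.dft/sources.yml.
url = URL.create(
    "mssql+pyodbc",
    username="user_read",
    password="yourpassword",
    host="localhost",
    port=1433,
    database="database_name",
    query={"driver": "ODBC Driver 17 for SQL Server"},
)
print(url)
# mssql+pyodbc://user_read:yourpassword@localhost:1433/database_name?driver=ODBC+Driver+17+for+SQL+Server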

103
dft/base/files.py Normal file

@@ -0,0 +1,103 @@
import os
import re
from jinja2 import Environment, PackageLoader
from .config import config as cfg


def _list_to_dict(input_list):
    result_dict = {}
    for item in input_list:
        if '=' in item:
            # Split the item on the first '=' and strip spaces and double quotes
            key, value = item.split('=', 1)
            key = key.strip()
            value = value.strip().strip('"')
            result_dict[key] = value
    return result_dict


def delete_file(directory, donot=()):
    for root, dirs, files in os.walk(directory, topdown=False):
        for name in files:
            file_path = os.path.join(root, name)
            if name not in donot:
                try:
                    os.unlink(file_path)
                    print(f"Removing file: {file_path}")
                except Exception as e:
                    print(f"Failed to remove {file_path}. Reason: {e}")


def generate_init(query_paths):
    def get_declaration(query_paths):
        model_names = []
        for filepath in query_paths:
            bn, filename, filetype = get_filename_from_path(filepath)
            model_names.append(f'from .{filename} import {filename}')
        return model_names

    def render_init(declarations):
        try:
            env = Environment(loader=PackageLoader('dft', 'templates'))
            template = env.get_template('model_init.jinja2')
            # Render the template with variables
            rendered_content = template.render(
                model_names=declarations
            )
        except Exception as e:
            return e
        else:
            print("Init is rendered")
            return rendered_content

    lst_model = get_declaration(query_paths)
    content = render_init(lst_model)
    try:
        with open(f'{cfg["migration-path"]}/models/__init__.py', 'w') as f:
            f.write(content)
    except Exception as e:
        return e
    else:
        print("Init file is updated")
        return True


def get_filename_from_path(filepath):
    try:
        bn = os.path.basename(filepath)
        filename = os.path.splitext(bn)
    except Exception as e:
        return e
    else:
        return bn, filename[0], filename[1]


def get_filepaths(directories):
    lst_filepaths = []
    # Walk through all directories and files in the given directories
    for directory in directories:
        for root, dirs, files in os.walk(directory):
            for file in files:
                # Create full path by joining root with file name
                full_path = os.path.join(root, file)
                lst_filepaths.append(full_path)
    return lst_filepaths


def process_query_file(filepath):
    with open(filepath, 'r') as file:
        content = file.read()
    # Extract key-value pairs from within '{{ }}'
    match = re.search(r"{{\s*config\s*=\s*(\[[^\]]+\])\s*}}", content)
    if match:
        # Parse the extracted content into a Python dictionary
        config_str = match.group(1)
        config_str = re.sub(r'\s+', ' ', config_str).strip()
        config_str = config_str.strip('[]')
        config_lst = config_str.split(',')
        config = _list_to_dict(config_lst)
    else:
        config = None
    content = re.sub(r"{{.*?}}", "", content, flags=re.DOTALL)
    content = re.sub(r'\s+', ' ', content).strip()
    return config, content
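
A worked example of the query-file convention process_query_file parses, replicating the same regex logic standalone (the sample query text is made up):

import re

sample = '''{{ config = [ extract_from = "sql_server" ] }}
select *
from orders
'''

match = re.search(r"{{\s*config\s*=\s*(\[[^\]]+\])\s*}}", sample)
pairs = match.group(1).strip('[]').split(',')
config = {}
for item in pairs:
    key, value = item.split('=', 1)
    config[key.strip()] = value.strip().strip('"')
body = re.sub(r"{{.*?}}", "", sample, flags=re.DOTALL)
body = re.sub(r"\s+", " ", body).strip()

print(config)  # {'extract_from': 'sql_server'}
print(body)    # select * from orders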

19
dft/base/loader.py Normal file

@@ -0,0 +1,19 @@
import re


class SQLLoader():
    # Mixed into SQLSource (see sources.py); holds no state of its own,
    # so it needs no __init__ override.

    def _get_initial_load_query(self, query_name, query):
        # Pull the body of the CTE named `query_name` out of the query.
        match = re.search(r'with ' + query_name + r' as \((.*?)\)', query, re.DOTALL)
        if match:
            initial_load_query = match.group(1).strip()
            return initial_load_query
        else:
            return None

    def init_load(self, query_name, query):
        #TODO: _is_table_empty is not defined anywhere in this commit yet
        self._is_table_empty()
        self._get_initial_load_query(query_name, query)
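
_get_initial_load_query extracts the body of the named CTE; a standalone illustration of the regex (query text is made up):

import re

query = "with orders as (select * from raw_orders) select * from orders"
match = re.search(r"with orders as \((.*?)\)", query, re.DOTALL)
print(match.group(1).strip())  # select * from raw_orders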

24
dft/base/migrator.py Normal file

@@ -0,0 +1,24 @@
from alembic.config import Config
from alembic import command
from .config import config as cfg
import yaml
import os


def get_target_url():
    with open(os.path.expanduser("~") + "/.dft/sources.yml", 'r') as file:
        data = yaml.safe_load(file)
        target = data[cfg["profile"]]['target']
        conn_info = data[cfg["profile"]][target]["load_to"][cfg["target-name"]]
        connection_url = f'mssql+pyodbc://{conn_info["user"]}:{conn_info["password"]}@{conn_info["host"]}:{conn_info["port"]}/{conn_info["database"]}?driver={conn_info["driver"].replace(" ", "+")}'
    return connection_url


def create_migration_file(script_message):
    alembic_cfg = Config(f'{cfg["migration-path"]}/alembic.ini')
    alembic_cfg.set_main_option("sqlalchemy.url", get_target_url())
    command.revision(alembic_cfg, autogenerate=True, message=script_message)


def migrate():
    alembic_cfg = Config(f'{cfg["migration-path"]}/alembic.ini')
    alembic_cfg.set_main_option("sqlalchemy.url", get_target_url())
    command.upgrade(alembic_cfg, "head")
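
For reference, the connection string get_target_url builds has this shape (placeholder values):

conn_info = {
    "user": "user_write", "password": "password", "host": "localhost",
    "port": 1433, "database": "database_name",
    "driver": "ODBC Driver 17 for SQL Server",
}
url = (f'mssql+pyodbc://{conn_info["user"]}:{conn_info["password"]}'
       f'@{conn_info["host"]}:{conn_info["port"]}/{conn_info["database"]}'
       f'?driver={conn_info["driver"].replace(" ", "+")}')
print(url)
# mssql+pyodbc://user_write:password@localhost:1433/database_name?driver=ODBC+Driver+17+for+SQL+Server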

9
dft/base/mixin.py Normal file

@@ -0,0 +1,9 @@
from sqlalchemy import Column, Integer


class DFTMixin():
    #TODO: add repeatable field for slow dim etc.
    # Surrogate key and batch tag stamped onto every generated model.
    Pk = Column("Pk", Integer, primary_key=True)
    batch_id = Column("BatchID", Integer, nullable=True)
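
A small sketch of how the mixin stamps its columns onto a declarative model; DemoTable is hypothetical, but generated models inherit the same way (see the model_declaration template later in this commit):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from dft.base.mixin import DFTMixin

Base = declarative_base()

class DemoTable(Base, DFTMixin):  # hypothetical model
    __tablename__ = "demo_table"
    name = Column("Name", String)

print(sorted(c.name for c in DemoTable.__table__.columns))
# ['BatchID', 'Name', 'Pk']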

91
dft/base/modeler.py Normal file

@@ -0,0 +1,91 @@
from jinja2 import Environment, PackageLoader
from sqlalchemy import text
from datetime import datetime, date, time
from decimal import Decimal
from .config import config as cfg
from . import files


class Modeler():
    def __init__(self, src_name, filepath):
        self.name = src_name
        self.filepath = filepath
        self.bn, self.filename, self.filetype = files.get_filename_from_path(self.filepath)

    def infer_type_from_value(self, value):
        # bool must be tested before int: isinstance(True, int) is True,
        # so booleans would otherwise be mapped to Integer.
        if isinstance(value, bool):
            return 'Boolean'  # Maps to BOOLEAN in SQL
        elif isinstance(value, int):
            return 'Integer'  # Maps to INTEGER in SQL
        elif isinstance(value, str):
            return 'String'  # Maps to VARCHAR in SQL
        elif isinstance(value, float):
            return 'Float'  # Maps to FLOAT in SQL
        elif isinstance(value, Decimal):
            # Fixed precision and scale for decimal values
            return 'Numeric(12, 3)'  # Maps to NUMERIC in SQL
        elif isinstance(value, datetime):
            return 'DateTime'  # Maps to DATETIME in SQL
        elif isinstance(value, date):
            return 'Date'  # Maps to DATE in SQL
        elif isinstance(value, time):
            return 'Time'  # Maps to TIME in SQL
        elif isinstance(value, bytes):
            return 'LargeBinary'  # Maps to BLOB or BYTEA in SQL
        else:
            return 'String'  # Default fall-back type

    def render_model(self, columns):
        version = datetime.now().strftime("%Y%m%d%H%M%S")
        try:
            env = Environment(loader=PackageLoader('dft', 'templates'))
            template = env.get_template('model_declaration.jinja2')
            rendered_content = template.render(
                from_file=f"{version}_{str(self.filepath)}",
                migration_path=cfg["migration-path"],
                model_name=self.filename,
                revision_id=version,
                create_date=datetime.now().isoformat(),
                columns=columns
            )
        except Exception as e:
            return e
        else:
            print(f"Model {self.filename} content is rendered")
            return rendered_content

    def write_model_to_file(self, content):
        to_file = cfg["migration-path"] + f"/models/{self.filename}.py"
        try:
            # Write the rendered content to a file
            with open(to_file, 'w') as f:
                f.write(content)
        except Exception as e:
            return e
        else:
            print(f"Model {self.filename} has been written to file {to_file} successfully!")
            return True


class SQLModeler(Modeler):
    def __init__(self, src_name, filepath):
        super().__init__(src_name, filepath)

    def infer_model_columns(self, result):
        first_row = result.fetchone()  # assumes the query returned at least one row
        columns = []
        for column, value in zip(result.keys(), first_row):
            sql_type = self.infer_type_from_value(value)
            declaration = f'{column} = Column("{column}", {sql_type})'
            columns.append(declaration)
        return columns

    def generate_model(self, query):
        with self._create_engine().connect() as cnt:
            rst = cnt.execute(text(query))
            columns = self.infer_model_columns(rst)
            content = self.render_model(columns)
            self.write_model_to_file(content)
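
To show what the inference produces, a simplified standalone sketch of the infer_model_columns loop over a made-up first row (the mapping below covers only the types in the sample):

from datetime import datetime
from decimal import Decimal

row = {"OrderID": 42, "Amount": Decimal("19.990"),
       "PlacedAt": datetime(2024, 5, 3), "Note": "rush"}

type_map = {int: "Integer", Decimal: "Numeric(12, 3)",
            datetime: "DateTime", str: "String"}
for column, value in row.items():
    print(f'{column} = Column("{column}", {type_map[type(value)]})')
# OrderID = Column("OrderID", Integer)
# Amount = Column("Amount", Numeric(12, 3))
# PlacedAt = Column("PlacedAt", DateTime)
# Note = Column("Note", String)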

30
dft/base/sources.py Normal file

@@ -0,0 +1,30 @@
import yaml
import os
from .connector import SQLConnector
from .modeler import SQLModeler
from .loader import SQLLoader
from .config import config as cfg


def get_source_type(cnt_name, action="extract_from"):
    with open(os.path.expanduser("~") + "/.dft/sources.yml", 'r') as file:
        data = yaml.safe_load(file)
        target = data[cfg["profile"]]['target']
        return data[cfg["profile"]][target][action][cnt_name]["type"]


def get_source(src_name, filepath, action="extract_from"):
    src_type = get_source_type(src_name, action)
    #TODO: select a connector depending on other source types
    if src_type == "sql":
        return SQLSource(src_name, filepath)
    else:
        raise NotImplementedError("Source type not supported for now.")


class SQLSource(SQLModeler, SQLConnector, SQLLoader):
    # MRO is SQLModeler -> Modeler -> SQLConnector -> SQLLoader, and
    # Modeler.__init__ sets self.name / self.filepath for all the mixins.
    type = "sql"

    def __init__(self, src_name, filepath) -> None:
        super().__init__(src_name, filepath)

0
dft/commands/__init__.py Normal file

7
dft/commands/deploy.py Normal file

@@ -0,0 +1,7 @@
import click


@click.command()
@click.option('-c', '--config', default="dft_project.yml", help='Build your flows with a scheduler and deploy them to your server.')
def deploy(config):
    # The long option name is required for click to pass the value as `config`.
    click.echo("Deploying your query files to server...")

16
dft/commands/init.py Normal file

@@ -0,0 +1,16 @@
import click


@click.command()
def init():
    click.echo("Initializing your dft environment...")
    # Create folder .dft inside home
    # Add profile.yml file inside .dft folder
    # Init Alembic with folder migrations
    # Move alembic.ini
    # Create models folder inside migrations
    # Add base.py inside migration/models/
    # Create files inside folder models
    # Generate first init.py with dftbatch + base
    # Add to env.py: from migration import models + models.Base.metadata
    # Create folder query
    # Add dft_project.yml file
    # (A sketch of these steps follows below.)
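
A minimal sketch of what that checklist could look like in code; the paths and stub contents are assumptions based on the rest of this commit, not an implementation that ships here:

from pathlib import Path

def scaffold():  # hypothetical helper
    # ~/.dft with a sources.yml stub (read by connector.py and migrator.py)
    dft_home = Path.home() / ".dft"
    dft_home.mkdir(exist_ok=True)
    (dft_home / "sources.yml").touch()
    # Project layout matching the dft_project.yml defaults
    for folder in ("queries", "tests", "migration/models"):
        Path(folder).mkdir(parents=True, exist_ok=True)
    Path("dft_project.yml").touch()
    # The Alembic init and the env.py edits from the checklist stay manual here.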

35
dft/commands/load.py Normal file

@@ -0,0 +1,35 @@
import click
from ..base import files, sources
from ..base.config import config as cfg


@click.command()
def load():
    click.echo("Loading data from your query to your target db...")
    #TODO: Validate init is done
    # Iterate over query-paths
    query_paths = files.get_filepaths(cfg["query-paths"])
    for filepath in query_paths:
        conf, query = files.process_query_file(filepath)
        # Insert the data into the target db
        src = sources.get_source(conf["extract_from"], filepath)
        rst_src = src.read_query(query)
        cnt_to = sources.get_source(
            cfg["target-name"],
            filepath,
            action="load_to",  # the target connection lives under load_to
        )
        # add batch_id to df
        #df["BatchID"] = 20100
        # Manipulate the DataFrame if necessary (e.g., filtering, adding new columns)
        # Example: df['new_column'] = df['column1'] * 2
        #df.to_sql(cnt_to.filename, con=cnt_to.engine, if_exists='append', index=False)
        # (a hedged sketch of these remaining steps follows below)
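
One way the commented-out tail of load() could be completed, assuming the target source's engine comes from _create_engine() and the table is named after the query file; this is a sketch, not behavior this commit ships:

from datetime import datetime

# Hypothetical continuation inside the loop above; rst_src and cnt_to
# are the names bound in load().
df = rst_src
df["BatchID"] = int(datetime.now().strftime("%Y%m%d%H%M%S"))  # tag the batch
df.to_sql(
    cnt_to.filename,               # table named after the query file
    con=cnt_to._create_engine(),   # engine resolved from ~/.dft/sources.yml
    if_exists="append",
    index=False,
)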

32
dft/commands/migrate.py Normal file

@@ -0,0 +1,32 @@
import click
from ..base import files, sources
from ..base.config import config as cfg
from ..base import migrator as mig


@click.command()
@click.option('-m', default="", help='Description of your migration')
def createmigration(m):
    #TODO: Validate init is done
    click.echo("Generating migration files... but first, let's remove your models")
    # Delete files from the models folder
    files.delete_file(cfg["migration-path"] + "/models", donot=["base.py"])
    # Iterate over query-paths
    query_paths = files.get_filepaths(cfg["query-paths"])
    for filepath in query_paths:
        conf, query = files.process_query_file(filepath)
        src = sources.get_source(conf["extract_from"], filepath)
        src.generate_model(query)
    # Generate the models __init__ file
    files.generate_init(query_paths)
    mig.create_migration_file(script_message=str(m))
    click.echo("Migration creation is done!")


@click.command()
def migrate():
    mig.migrate()

32
sources.yml Normal file

@@ -0,0 +1,32 @@
profile-name:
  target: dev
  dev:
    extract_from:
      sql_server:
        type: sql
        connector: mssql+pyodbc
        driver: 'ODBC Driver 17 for SQL Server'
        host: localhost
        port: 1433
        database: database_name
        schema: dbo
        user: user_read
        password: yourpassword
        encrypt: false
      # source2:
      #   connector: test
    load_to:
      sql_server:
        type: sql
        connector: mssql+pyodbc
        driver: 'ODBC Driver 17 for SQL Server'
        host: localhost
        port: 1433
        database: database_name
        schema: dbo
        user: user_write
        password: password
        encrypt: false
  # prod:
  #   extract_from:
  #   load_to:

16
dft_project.yml Normal file

@@ -0,0 +1,16 @@
# Name your project! Project names should contain only lowercase characters
name: 'waffle_shop'
version: '1.0.0'
config-version: 2

# Just like dbt, this setting configures which "profile" dft uses for this project.
# Do not forget to configure your sources.yml file in the ~/.dft folder
profile: 'profile-name'
target-name: 'target_connection_name' # Connection name under load_to in ~/.dft/sources.yml

# These configurations specify where dft should look for different types of files.
# The `query-paths` config, for example, states that queries in this project can be
# found in the "queries/" directory. You probably won't need to change these!
test-paths: ["tests"]
query-paths: ["queries"]
migration-path: "migration"

15
dft/templates/model_declaration.jinja2 Normal file

@@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, Float, DateTime, Numeric, Date, Time, Boolean, LargeBinary
from dft.base.mixin import DFTMixin
from .base import Base

####
# Model generated by DFT
# - From : {{ from_file }}
# - Version : {{ revision_id }}
# - Created Date : {{ create_date }}
####
class {{ model_name }}(Base, DFTMixin):
    __tablename__ = '{{ model_name }}'
    {% for item in columns %}
    {{ item }}{% endfor %}

3
dft/templates/model_init.jinja2 Normal file

@@ -0,0 +1,3 @@
from .base import Base
{% for item in model_names %}
{{ item }}{% endfor %}

131
requirements.txt Normal file

@@ -0,0 +1,131 @@
agate==1.7.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.5
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
azure-core==1.30.1
azure-identity==1.16.0
Babel==2.14.0
build==1.2.1
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.3
cryptography==42.0.5
dateparser==1.2.0
dbt-core==1.7.11
dbt-extractor==0.5.1
dbt-fabric==1.7.4
dbt-semantic-interfaces==0.4.4
dbt-sqlserver==1.7.4
Deprecated==1.2.14
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.0
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.42.1
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib-metadata==6.11.0
importlib_resources==6.1.3
isodate==0.6.1
itsdangerous==2.1.2
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
leather==0.4.0
Logbook==1.5.3
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mashumaro==3.12
mdurl==0.1.2
minimal-snowplow-tracker==0.0.2
more-itertools==10.2.0
msal==1.28.0
msal-extensions==1.1.0
msgpack==1.0.8
networkx==3.3
numpy==1.26.4
oauthlib==3.2.2
opentelemetry-api==1.24.0
orjson==3.10.0
packaging==24.0
pandas==2.2.2
parsedatetime==2.6
pathspec==0.11.2
pendulum==2.1.2
portalocker==2.8.2
prefect==2.16.9
protobuf==4.25.3
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.0
pydantic_core==2.18.1
Pygments==2.17.2
PyJWT==2.8.0
pyodbc==5.0.1
pyproject_hooks==1.0.0
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytimeparse==1.1.8
pytz==2024.1
pytzdata==2020.1
PyYAML==6.0.1
readchar==4.0.6
referencing==0.34.0
regex==2023.12.25
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
sqlparse==0.4.4
text-unidecode==1.3
toml==0.10.2
tomli==2.0.1
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==1.26.18
uvicorn==0.28.1
websocket-client==1.7.0
websockets==12.0
wrapt==1.16.0
zipp==3.18.1

26
setup.py Normal file

@@ -0,0 +1,26 @@
from setuptools import setup, find_packages

setup(
    name='dft',
    version='0.1.0',
    packages=find_packages(),
    py_modules=['cli'],  # cli.py sits at the repo root, outside any package
    include_package_data=True,
    description='Data flow tools',
    long_description='Use dft to do extraction and load in your data pipeline',
    author='Vincent Goineau',
    author_email='vgoineau@ctocollective.com',
    package_data={
        'dft': ['templates/*.jinja2'],  # the templates are .jinja2, not .jinja
    },
    install_requires=[
        'click',
        'sqlalchemy',
        'alembic',
        'jinja2',
        'pandas',  # used by dft.base.connector
        'PyYAML',  # used by dft.base.config and the source loaders
    ],
    entry_points={
        'console_scripts': [
            'dft = cli:cli',
        ],
    },
)