103 lines
3.3 KiB
Python
103 lines
3.3 KiB
Python
from prefect import task
|
|
import os
|
|
import re
|
|
from jinja2 import Environment, PackageLoader
|
|
from .config import config as cfg
|
|
|
|
|
|
def _list_to_dict(input_list):
|
|
result_dict = {}
|
|
for item in input_list:
|
|
if '=' in item:
|
|
# Split the item on the first '=' and strip spaces and double quotes
|
|
key, value = item.split('=', 1)
|
|
key = key.strip()
|
|
value = value.strip().strip('"')
|
|
result_dict[key] = value
|
|
return result_dict
|
|
|
|
def delete_file(directory, donot=None):
    """Delete every file found under ``directory``, recursively.

    Only files are removed; the directories themselves are left in place.
    Removal is best-effort: a file that cannot be deleted is reported on
    stdout and the walk continues with the next file.

    Args:
        directory: Root directory whose files should be removed.
        donot: Optional collection of bare file names (not paths) that must
            be kept. A file is skipped when its name is ``in`` this
            collection, whatever sub-directory it lives in. Defaults to
            keeping nothing.

    Returns:
        None.
    """
    # ``None`` replaces the former mutable ``[]`` default so that no list
    # object is shared between calls.
    keep = () if donot is None else donot

    for root, _subdirs, files in os.walk(directory, topdown=False):
        for name in files:
            if name in keep:
                continue
            file_path = os.path.join(root, name)
            try:
                os.unlink(file_path)
                print(f"Removing file: {file_path}")
            except OSError as e:
                # Keep going: one undeletable file must not abort the sweep.
                print(f"Failed to remove {file_path}. Reason: {e}")
|
|
|
|
def generate_init(query_paths):
    """Regenerate ``models/__init__.py`` from a list of query file paths.

    For every path an import line ``from .<name> import <name>`` is built,
    where ``<name>`` is the file name without its extension. The lines are
    rendered through the ``model_init.jinja2`` template (loaded from the
    ``templates`` folder of the ``dft`` package) and the result is written
    to ``<cfg["migration-path"]>/models/__init__.py``.

    Args:
        query_paths: Iterable of paths to query files.

    Returns:
        ``True`` when the file was written. On failure the caught exception
        object is returned (not raised): either the rendering error or the
        error hit while writing the file.
    """

    def get_declaration(query_paths):
        """Build one relative-import line per query file."""
        model_names = []
        for filepath in query_paths:
            _, filename, _ = get_filename_from_path(filepath)
            model_names.append(f'from .{filename} import {filename}')
        return model_names

    def render_init(declarations):
        """Render the init template; return the text, or the exception."""
        try:
            env = Environment(loader=PackageLoader('dft', 'templates'))
            template = env.get_template('model_init.jinja2')

            # Render the template with variables
            rendered_content = template.render(
                model_names=declarations
            )
        except Exception as e:
            return e
        else:
            print("Init is rendered")
            return rendered_content

    lst_model = get_declaration(query_paths)
    content = render_init(lst_model)

    if isinstance(content, Exception):
        # Rendering failed. Stop before the target is opened in 'w' mode:
        # opening would truncate the existing __init__.py even though there
        # is nothing valid to write into it.
        return content

    try:
        # Python source files are UTF-8 by default, so write the module
        # with that encoding instead of the platform's locale encoding.
        with open(
            f'{cfg["migration-path"]}/models/__init__.py',
            'w',
            encoding='utf-8',
        ) as f:
            f.write(content)
    except Exception as e:
        return e
    else:
        print("Init file is updated")
        return True
|
|
|
|
def get_filename_from_path(filepath):
    """Split a path into base name, stem and extension.

    Args:
        filepath: Path to split.

    Returns:
        A tuple ``(basename, stem, extension)`` where ``basename`` is the
        final path component, and ``stem`` / ``extension`` are the two
        parts produced by ``os.path.splitext`` on it (the extension keeps
        its leading dot). If the path cannot be processed, the caught
        exception object is returned instead of being raised.
    """
    try:
        base = os.path.basename(filepath)
        stem, extension = os.path.splitext(base)
    except Exception as err:
        return err
    return base, stem, extension
|
|
|
|
def get_filepaths(directories):
    """Collect the paths of all files under the given directories.

    Each directory is walked recursively with ``os.walk``. Paths are
    returned in the order the directories are given and, within one
    directory, in the order ``os.walk`` yields them. Directories that do
    not exist contribute nothing.

    Args:
        directories: Iterable of directory paths. A single path (``str``,
            ``bytes`` or ``os.PathLike``) is also accepted and treated as
            one directory.

    Returns:
        A list of file paths, each built by joining the walked folder with
        the file name.
    """
    # A bare string is iterable too; without this guard it would be walked
    # one character at a time.
    if isinstance(directories, (str, bytes, os.PathLike)):
        directories = [directories]

    lst_filepaths = []
    for directory in directories:
        for root, _subdirs, files in os.walk(directory):
            lst_filepaths.extend(os.path.join(root, name) for name in files)
    return lst_filepaths
|
|
|
|
def process_query_file(filepath):
    """Read a query file and separate its config header from its text.

    The file may contain a block of the form
    ``{{ config = [key="value", other="value"] }}``. When present, the
    bracketed list is split on commas and handed to ``_list_to_dict`` to
    become a dictionary. Every ``{{ ... }}`` block (config or not) is then
    removed from the text and all runs of whitespace are collapsed to a
    single space.

    Args:
        filepath: Path of the query file to read (decoded as UTF-8).

    Returns:
        A tuple ``(config, content)``. ``config`` is the parsed dictionary,
        or ``None`` when no config block is found. ``content`` is the
        remaining text on a single line.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(filepath, 'r', encoding='utf-8') as file:
        content = file.read()

    # Extract key-value pairs from within '{{ }}'
    match = re.search(r"{{\s*config\s*=\s*(\[[^\]]+\])\s*}}", content)
    if match:
        # Flatten the bracketed list onto one line, drop the brackets, then
        # split it into 'key=value' items.
        config_str = re.sub(r'\s+', ' ', match.group(1)).strip()
        config_str = config_str.strip('[]')
        # NOTE: a plain comma split means a value that itself contains a
        # comma is cut apart, even inside double quotes.
        config = _list_to_dict(config_str.split(','))
    else:
        config = None

    # Remove every template block, then normalise whitespace.
    content = re.sub(r"{{.*?}}", "", content, flags=re.DOTALL)
    content = re.sub(r'\s+', ' ', content).strip()

    return config, content