226 lines
9.2 KiB
Python
226 lines
9.2 KiB
Python
"""
Project:        Magic Tracker
Author:         Edward Middleton-Smith
                Shuffle & Skirmish

Technology:     DataStores
Feature:        Base DataStore

Description:
Datastore for Store
"""
|
|
|
|
# internal
|
|
# from routes import bp_home
|
|
import lib.argument_validation as av
|
|
from business_objects.sql_error import SQL_Error, Parameters_SQL_Error
|
|
from business_objects.tcg.user import User
|
|
# from helpers.helper_db_sql import Helper_DB_SQL
|
|
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
|
|
from extensions import db
|
|
from helpers.helper_app import Helper_App
|
|
# external
|
|
from sqlalchemy import text
|
|
from flask import Flask, session, current_app
|
|
from pydantic import BaseModel, ConfigDict
|
|
import time
|
|
from sqlalchemy.exc import OperationalError
|
|
from typing import ClassVar
|
|
import uuid as uuid_lib
|
|
|
|
|
|
|
|
class DataStore_Base(BaseModel):
    """Base datastore with shared helpers for talking to the database.

    Provides static helpers to call PostgreSQL procedures and table
    functions with named (and optionally CAST) bind parameters, a batched
    ORM upload helper, access to the session user, and helpers to read and
    clear SQL error records identified by a guid.
    """

    # Map SQLAlchemy type names to PostgreSQL type names for CAST()
    TYPE_CAST_MAP: ClassVar[dict[str, str]] = {
        'String': 'varchar',
        'Text': 'text',
        'Integer': 'integer',
        'Boolean': 'boolean',
        'UUID': 'uuid',
        'Uuid': 'uuid',
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @staticmethod
    def _build_named_arguments(argument_dict_list, argument_types_dict):
        """Render the named-argument list placed between the call's parentheses.

        Each key of ``argument_dict_list`` becomes ``name := :name``. When
        ``argument_types_dict`` holds a type for that key and the type's name
        is in ``TYPE_CAST_MAP``, the bind parameter is wrapped as
        ``name := CAST(:name AS pg_type)`` instead.

        Args:
            argument_dict_list: dict of parameter name -> value, or None.
            argument_types_dict: optional dict of parameter name -> type.

        Returns:
            The comma-separated argument SQL; '' when there are no arguments.
        """
        if argument_dict_list is None:
            return ''
        fragments = []
        for param_name in argument_dict_list:
            param_expr = f':{param_name}'
            # Add explicit PostgreSQL CAST for typed parameters
            if argument_types_dict and param_name in argument_types_dict:
                declared_type = argument_types_dict[param_name]
                # Accept a type class or an instance of one: instances have no
                # __name__, so fall back to the name of their class.
                type_name = getattr(declared_type, '__name__', type(declared_type).__name__)
                pg_type = DataStore_Base.TYPE_CAST_MAP.get(type_name)
                if pg_type:
                    param_expr = f'CAST(:{param_name} AS {pg_type})'
            fragments.append(f'{param_name} := {param_expr}')
        return ', '.join(fragments)

    @staticmethod
    def db_procedure_execute(proc_name, argument_dict_list = None, argument_types_dict = None):
        """Execute a PostgreSQL procedure using autocommit to allow internal COMMIT/ROLLBACK.

        Args:
            proc_name: name of the procedure; interpolated directly into the
                SQL text, so it must be a trusted identifier.
            argument_dict_list: optional dict of parameter name -> value,
                passed as bind parameters using named notation.
            argument_types_dict: optional dict of parameter name -> type used
                to add an explicit CAST (see TYPE_CAST_MAP).

        Returns:
            The rows returned by the call as mappings, fetched before the
            connection closes; an empty list when the call returns no rows.
        """
        _m = 'DataStore_Base.db_procedure_execute'
        av.val_str(proc_name, 'proc_name', _m)
        arguments_sql = DataStore_Base._build_named_arguments(argument_dict_list, argument_types_dict)
        stmt = text(f'CALL {proc_name}({arguments_sql})')

        rows = []
        with db.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            if argument_dict_list is not None:
                result = conn.execute(stmt, argument_dict_list)
            else:
                result = conn.execute(stmt)
            # Fetch all rows as mappings before connection closes
            if result.returns_rows:
                rows = result.mappings().fetchall()
        return rows

    @staticmethod
    def db_function_execute(func_name, argument_dict_list = None, argument_types_dict = None):
        """Execute a PostgreSQL function that returns a table using SELECT * FROM function_name().

        Args:
            func_name: name of the function; interpolated directly into the
                SQL text, so it must be a trusted identifier.
            argument_dict_list: optional dict of parameter name -> value,
                passed as bind parameters using named notation.
            argument_types_dict: optional dict of parameter name -> type used
                to add an explicit CAST (see TYPE_CAST_MAP).

        Returns:
            The result object produced by ``db.session.execute``; the caller
            is responsible for fetching rows from it.
        """
        _m = 'DataStore_Base.db_function_execute'
        av.val_str(func_name, 'func_name', _m)
        arguments_sql = DataStore_Base._build_named_arguments(argument_dict_list, argument_types_dict)
        stmt = text(f'SELECT * FROM {func_name}({arguments_sql})')

        if argument_dict_list is not None:
            return db.session.execute(stmt, argument_dict_list)
        return db.session.execute(stmt)

    @staticmethod
    def db_cursor_clear(cursor):
        """Drain every remaining result set on ``cursor``, logging each one found."""
        while cursor.nextset():
            Helper_App.console_log(f'db_cursor_clear\nunexpected result set: {cursor.fetchall()}')

    @staticmethod
    def get_user_session():
        """Return the User built from the value stored in the Flask session under User.FLAG_USER."""
        return User.from_json(session.get(User.FLAG_USER))

    @staticmethod
    def upload_bulk(permanent_table_name, records, batch_size):
        """Add ``records`` to the session and commit them in batches.

        Nothing is uploaded (and a warning is logged) when the session already
        holds pending changes or when ``permanent_table_name`` is not present
        in ``db.metadata``. A commit that fails with an OperationalError whose
        text contains "Lock wait timeout exceeded" is retried with exponential
        backoff, up to three attempts per batch.

        Args:
            permanent_table_name: name of the target table in ``db.metadata``.
            records: sliceable sequence of ORM objects to insert.
            batch_size: number of records per commit; expected to be a
                positive integer (it is used as the ``range`` step).

        Raises:
            OperationalError: for any other operational error, or when the
                final retry attempt also fails.
            Exception: any other error raised while adding or committing; the
                session is rolled back before it propagates.
        """
        _m = 'DataStore_Base.upload_bulk'
        if db.session.dirty or db.session.new or db.session.deleted:
            current_app.logger.warning(f'{_m}: session is not clean; upload skipped.')
            return
        # `permanent_table_name` is a string key into the metadata table registry
        table_object = db.metadata.tables.get(permanent_table_name)
        if table_object is None:
            current_app.logger.warning(f'{_m}: table {permanent_table_name} not found in metadata; upload skipped.')
            return

        max_retries = 3
        initial_backoff = 1
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            try:
                retries = 0
                while retries < max_retries:
                    try:
                        db.session.add_all(batch)
                        db.session.commit()
                        break
                    except OperationalError as e:
                        if "Lock wait timeout exceeded" not in str(e) or retries == max_retries - 1:
                            raise

                        # Release the failed transaction before backing off so
                        # the session is clean for the retry.
                        db.session.rollback()
                        wait_time = initial_backoff * (2 ** retries)
                        current_app.logger.warning(f"Lock timeout encountered. Retrying in {wait_time} seconds... (Attempt {retries + 1}/{max_retries})")
                        time.sleep(wait_time)
                        retries += 1
            except Exception:
                db.session.rollback()
                raise

    @classmethod
    def get_many_error(cls, guid):
        """Fetch the SQL errors recorded for ``guid``.

        Calls ``tcg.public.FN_Error_Get_Many`` with the default
        Parameters_SQL_Error for ``guid`` and converts each returned row with
        ``SQL_Error.from_db_error``.

        Returns:
            A list of SQL_Error objects. If anything raises while fetching or
            converting, an SQL_Error whose ``msg`` is the exception text is
            appended to whatever was collected so far, instead of propagating.
        """
        arguments = Parameters_SQL_Error.get_default(guid)
        argument_dict = arguments.to_json()
        argument_types = Parameters_SQL_Error.get_type_hints()

        errors = []
        try:
            error_result = cls.db_function_execute('tcg.public.FN_Error_Get_Many', argument_dict, argument_types)
            for row in error_result.fetchall():
                errors.append(SQL_Error.from_db_error(row))
        except Exception as e:
            # Deliberate best-effort: report the failure as an error record
            error = SQL_Error()
            error.msg = str(e)
            errors.append(error)

        return errors

    @classmethod
    def clear_error(cls, guid):
        """Clear the SQL errors recorded for ``guid`` by calling ``tcg.public.USP_Error_Clear``."""
        arguments = Parameters_SQL_Error.get_default(guid)
        argument_dict = arguments.to_json()
        argument_types = Parameters_SQL_Error.get_type_hints()

        cls.db_procedure_execute('tcg.public.USP_Error_Clear', argument_dict, argument_types)
|
|
|