""" Project: Magic Tracker Author: Edward Middleton-Smith Shuffle & Skirmish Technology: DataStores Feature: Base DataStore Description: Datastore for Store """ # internal # from routes import bp_home import lib.argument_validation as av from business_objects.sql_error import SQL_Error, Parameters_SQL_Error from business_objects.tcg.user import User # from helpers.helper_db_sql import Helper_DB_SQL # from models.model_view_store_checkout import Model_View_Store_Checkout # circular! from extensions import db from helpers.helper_app import Helper_App # external from sqlalchemy import text from flask import Flask, session, current_app from pydantic import BaseModel, ConfigDict import time from sqlalchemy.exc import OperationalError from typing import ClassVar import uuid as uuid_lib class DataStore_Base(BaseModel): def __init__(self, **kwargs): super().__init__(**kwargs) @staticmethod def db_procedure_execute(proc_name, argument_dict_list = None, argument_types_dict = None): """Execute a PostgreSQL procedure using autocommit to allow internal COMMIT/ROLLBACK""" _m = 'DataStore_Base.db_procedure_execute' av.val_str(proc_name, 'proc_name', _m) proc_string = f'CALL {proc_name}(' has_arguments = argument_dict_list is not None if has_arguments: arg_keys = list(argument_dict_list.keys()) for i in range(len(arg_keys)): param_name = arg_keys[i] # Add explicit PostgreSQL CAST for typed parameters if argument_types_dict and param_name in argument_types_dict: type_name = argument_types_dict[param_name].__name__ pg_type = DataStore_Base.TYPE_CAST_MAP.get(type_name) if pg_type: param_expr = f'CAST(:{param_name} AS {pg_type})' else: param_expr = f':{param_name}' else: param_expr = f':{param_name}' proc_string += f'{"" if i == 0 else ", "}{param_name} := {param_expr}' proc_string += ')' stmt = text(proc_string) # Helper_App.console_log(f'{_m}\nproc_string: {stmt}\nargs: {argument_dict_list}') rows = [] with db.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: if has_arguments: result = conn.execute(stmt, argument_dict_list) else: result = conn.execute(stmt) # Fetch all rows as mappings before connection closes if result.returns_rows: rows = result.mappings().fetchall() # Helper_App.console_log(f'result: {rows}') return rows # Map SQLAlchemy types to PostgreSQL type names for CAST() TYPE_CAST_MAP: ClassVar[dict[str, str]] = { 'String': 'varchar', 'Text': 'text', 'Integer': 'integer', 'Boolean': 'boolean', 'UUID': 'uuid', 'Uuid': 'uuid', } @staticmethod def db_function_execute(func_name, argument_dict_list = None, argument_types_dict = None): """Execute a PostgreSQL function that returns a table using SELECT * FROM function_name()""" _m = 'DataStore_Base.db_function_execute' av.val_str(func_name, 'func_name', _m) func_string = f'SELECT * FROM {func_name}(' has_arguments = argument_dict_list is not None if has_arguments: arg_keys = list(argument_dict_list.keys()) for i in range(len(arg_keys)): param_name = arg_keys[i] # Add explicit PostgreSQL CAST for typed parameters if argument_types_dict and param_name in argument_types_dict: type_name = argument_types_dict[param_name].__name__ pg_type = DataStore_Base.TYPE_CAST_MAP.get(type_name) if pg_type: param_expr = f'CAST(:{param_name} AS {pg_type})' else: param_expr = f':{param_name}' else: param_expr = f':{param_name}' func_string += f'{"" if i == 0 else ", "}{param_name} := {param_expr}' func_string += ')' stmt = text(func_string) # Helper_App.console_log(f'{_m}\nfunc_string: {stmt}\nargs: {argument_dict_list}') if has_arguments: result = db.session.execute(stmt, argument_dict_list) else: result = db.session.execute(stmt) # Helper_App.console_log(f'result: {result}') return result @staticmethod def db_cursor_clear(cursor): while cursor.nextset(): Helper_App.console_log(f'db_cursor_clear\nunexpected result set: {cursor.fetchall()}') @staticmethod def get_user_session(): # Helper_App.console_log('DataStore_Base.get_user_session') user = User.from_json(session.get(User.FLAG_USER)) """ if user.user_id <= 0: user.user_id = 3 """ # Helper_App.console_log(f'User: {user}') return user @staticmethod def upload_bulk(permanent_table_name, records, batch_size): _m = 'DataStore_Base.upload_bulk' # Helper_App.console_log(f'{_m}\nstarting...') # Helper_App.console_log(f'permanent_table_name: {permanent_table_name}') if db.session.dirty or db.session.new or db.session.deleted: # Helper_App.console_log("Session is not clean") return # Assuming `permanent_table_name` is a string representing the table name table_object = db.metadata.tables.get(permanent_table_name) # Helper_App.console_log(f'Tables: {list(db.metadata.tables.keys())}') if table_object is None: # Helper_App.console_log(f"Table {permanent_table_name} not found in metadata.") return # else: # expected_columns = set(column.name for column in db.inspect(table_object).columns) # Helper_App.console_log(f'table name: {table_object.name}') # Helper_App.console_log(f'expected_columns: {expected_columns}') max_retries = 3 initial_backoff = 1 for i in range(0, len(records), batch_size): batch = records[i:i + batch_size] try: retries = 0 while retries < max_retries: try: # Helper_App.console_log(f'Before upload batch.') db.session.add_all(batch) # db.session.bulk_save_objects(batch) # Helper_App.console_log(f'Before commit batch.') db.session.commit() # Helper_App.console_log(f'Batch uploaded.') break except OperationalError as e: if "Lock wait timeout exceeded" not in str(e) or retries == max_retries - 1: raise wait_time = initial_backoff * (2 ** retries) current_app.logger.warning(f"Lock timeout encountered. Retrying in {wait_time} seconds... (Attempt {retries + 1}/{max_retries})") time.sleep(wait_time) retries += 1 # Ensure the session is clean for the retry db.session.rollback() except Exception as e: db.session.rollback() raise e # Helper_App.console_log(f'Records uploaded in batches.') @classmethod def get_many_error(cls, guid): _m = f'{cls.__qualname__}.get_many_error' # user = cls.get_user_session() arguments = Parameters_SQL_Error.get_default(guid) argument_dict = arguments.to_json() argument_types = Parameters_SQL_Error.get_type_hints() # Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}') errors = [] try: error_result = cls.db_function_execute('tcg.public.FN_Error_Get_Many', argument_dict, argument_types) error_result_set = error_result.fetchall() # Helper_App.console_log(f'raw errors: {error_result_set}') errors = [] error_indexes = {} for row in error_result_set: new_error = SQL_Error.from_db_error(row) error_indexes[new_error.error_id] = len(errors) errors.append(new_error) # Helper_App.console_log(f'error {str(type(new_error))}: {new_error}') except Exception as e: # Helper_App.console_log(f'Error: {str(e)}') error = SQL_Error() error.msg = str(e) errors.append(error) return errors @classmethod def clear_error(cls, guid): _m = f'{cls.__qualname__}.clear_error' # user = cls.get_user_session() arguments = Parameters_SQL_Error.get_default(guid) argument_dict = arguments.to_json() argument_types = Parameters_SQL_Error.get_type_hints() # Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}') cls.db_procedure_execute('tcg.public.USP_Error_Clear', argument_dict, argument_types)