""" Project: PARTS Website Author: Edward Middleton-Smith Precision And Research Technology Systems Limited Technology: DataStores Feature: Base DataStore Description: Datastore for Store """ # internal # from routes import bp_home import lib.argument_validation as av from business_objects.sql_error import SQL_Error from business_objects.project_hub.user import User # from helpers.helper_db_mysql import Helper_DB_MySQL # from models.model_view_store_checkout import Model_View_Store_Checkout # circular! from extensions import db from forms.access_level import Filters_Access_Level from forms.unit_measurement import Filters_Unit_Measurement from helpers.helper_app import Helper_App # external from sqlalchemy import text from flask import Flask, session, current_app from pydantic import BaseModel, ConfigDict import time from sqlalchemy.exc import OperationalError class DataStore_Base(BaseModel): def __init__(self, **kwargs): super().__init__(**kwargs) @staticmethod def db_procedure_execute(proc_name, argument_dict_list = None): # Argument validation _m = 'DataStore_Base.db_procedure_execute' av.val_str(proc_name, 'proc_name', _m) proc_string = f'CALL {proc_name}(' has_arguments = not str(type(argument_dict_list)) == "" if has_arguments: arg_keys = list(argument_dict_list.keys()) for i in range(len(arg_keys)): proc_string += f'{"" if i == 0 else ", "}:{arg_keys[i]}' proc_string += ')' proc_string = text(proc_string) Helper_App.console_log(f'{_m}\nproc_string: {proc_string}\nargs: {argument_dict_list}') if has_arguments: result = db.session.execute(proc_string, argument_dict_list) else: result = db.session.execute(proc_string) Helper_App.console_log(f'result: {result}') # conn.session.remove() return result @staticmethod def db_cursor_clear(cursor): while cursor.nextset(): Helper_App.console_log(f'new result set: {cursor.fetchall()}') @staticmethod def get_user_session(): Helper_App.console_log('DataStore_Base.get_user_session') user = User.from_json(session.get(User.FLAG_USER)) if user.id_user <= 0: user.id_user = 3 return user @staticmethod def upload_bulk(permanent_table_name, records, batch_size): _m = 'DataStore_Base.upload_bulk' Helper_App.console_log(f'{_m}\nstarting...') Helper_App.console_log(f'permanent_table_name: {permanent_table_name}') if db.session.dirty or db.session.new or db.session.deleted: Helper_App.console_log("Session is not clean") return # Assuming `permanent_table_name` is a string representing the table name table_object = db.metadata.tables.get(permanent_table_name) if table_object is None: Helper_App.console_log(f"Table {permanent_table_name} not found in metadata.") return else: expected_columns = set(column.name for column in db.inspect(table_object).columns) Helper_App.console_log(f'expected_columns: {expected_columns}') max_retries = 3 initial_backoff = 1 for i in range(0, len(records), batch_size): batch = records[i:i + batch_size] try: retries = 0 while retries < max_retries: try: db.session.bulk_save_objects(batch) db.session.commit() break except OperationalError as e: if "Lock wait timeout exceeded" not in str(e) or retries == max_retries - 1: raise wait_time = initial_backoff * (2 ** retries) current_app.logger.warning(f"Lock timeout encountered. Retrying in {wait_time} seconds... (Attempt {retries + 1}/{max_retries})") time.sleep(wait_time) retries += 1 # Ensure the session is clean for the retry db.session.rollback() except Exception as e: db.session.rollback() raise e