Initial commit.
This commit is contained in:
114
datastores/datastore_base.py
Normal file
114
datastores/datastore_base.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""
|
||||
Project: PARTS Website
|
||||
Author: Edward Middleton-Smith
|
||||
Precision And Research Technology Systems Limited
|
||||
|
||||
Technology: DataStores
|
||||
Feature: Base DataStore
|
||||
|
||||
Description:
|
||||
Datastore for Store
|
||||
"""
|
||||
|
||||
# internal
|
||||
# from routes import bp_home
|
||||
import lib.argument_validation as av
|
||||
from business_objects.sql_error import SQL_Error
|
||||
from business_objects.project_hub.user import User
|
||||
# from helpers.helper_db_mysql import Helper_DB_MySQL
|
||||
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
|
||||
from extensions import db
|
||||
from forms.access_level import Filters_Access_Level
|
||||
from forms.unit_measurement import Filters_Unit_Measurement
|
||||
from helpers.helper_app import Helper_App
|
||||
# external
|
||||
from sqlalchemy import text
|
||||
from flask import Flask, session, current_app
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
import time
|
||||
from sqlalchemy.exc import OperationalError
|
||||
|
||||
|
||||
|
||||
class DataStore_Base(BaseModel):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def db_procedure_execute(proc_name, argument_dict_list = None):
|
||||
# Argument validation
|
||||
_m = 'DataStore_Base.db_procedure_execute'
|
||||
av.val_str(proc_name, 'proc_name', _m)
|
||||
proc_string = f'CALL {proc_name}('
|
||||
has_arguments = not str(type(argument_dict_list)) == "<class 'NoneType'>"
|
||||
if has_arguments:
|
||||
arg_keys = list(argument_dict_list.keys())
|
||||
for i in range(len(arg_keys)):
|
||||
proc_string += f'{"" if i == 0 else ", "}:{arg_keys[i]}'
|
||||
proc_string += ')'
|
||||
proc_string = text(proc_string)
|
||||
Helper_App.console_log(f'{_m}\nproc_string: {proc_string}\nargs: {argument_dict_list}')
|
||||
|
||||
if has_arguments:
|
||||
result = db.session.execute(proc_string, argument_dict_list)
|
||||
else:
|
||||
result = db.session.execute(proc_string)
|
||||
Helper_App.console_log(f'result: {result}')
|
||||
# conn.session.remove()
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def db_cursor_clear(cursor):
|
||||
while cursor.nextset():
|
||||
Helper_App.console_log(f'new result set: {cursor.fetchall()}')
|
||||
|
||||
@staticmethod
|
||||
def get_user_session():
|
||||
Helper_App.console_log('DataStore_Base.get_user_session')
|
||||
user = User.from_json(session.get(User.FLAG_USER))
|
||||
if user.id_user <= 0:
|
||||
user.id_user = 3
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
def upload_bulk(permanent_table_name, records, batch_size):
|
||||
_m = 'DataStore_Base.upload_bulk'
|
||||
Helper_App.console_log(f'{_m}\nstarting...')
|
||||
Helper_App.console_log(f'permanent_table_name: {permanent_table_name}')
|
||||
if db.session.dirty or db.session.new or db.session.deleted:
|
||||
Helper_App.console_log("Session is not clean")
|
||||
return
|
||||
# Assuming `permanent_table_name` is a string representing the table name
|
||||
table_object = db.metadata.tables.get(permanent_table_name)
|
||||
if table_object is None:
|
||||
Helper_App.console_log(f"Table {permanent_table_name} not found in metadata.")
|
||||
return
|
||||
else:
|
||||
expected_columns = set(column.name for column in db.inspect(table_object).columns)
|
||||
Helper_App.console_log(f'expected_columns: {expected_columns}')
|
||||
max_retries = 3
|
||||
initial_backoff = 1
|
||||
for i in range(0, len(records), batch_size):
|
||||
batch = records[i:i + batch_size]
|
||||
try:
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
try:
|
||||
db.session.bulk_save_objects(batch)
|
||||
db.session.commit()
|
||||
break
|
||||
except OperationalError as e:
|
||||
if "Lock wait timeout exceeded" not in str(e) or retries == max_retries - 1:
|
||||
raise
|
||||
|
||||
wait_time = initial_backoff * (2 ** retries)
|
||||
current_app.logger.warning(f"Lock timeout encountered. Retrying in {wait_time} seconds... (Attempt {retries + 1}/{max_retries})")
|
||||
time.sleep(wait_time)
|
||||
retries += 1
|
||||
|
||||
# Ensure the session is clean for the retry
|
||||
db.session.rollback()
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
raise e
|
||||
Reference in New Issue
Block a user