Feat: Multiplayer sessions added using CRUD database.
This commit is contained in:
11
datastores/__init__.py
Normal file
11
datastores/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""
|
||||
Project: PARTS Website
|
||||
Author: Edward Middleton-Smith
|
||||
Precision And Research Technology Systems Limited
|
||||
|
||||
Technology: Module Initialisation
|
||||
Feature: DataStores
|
||||
|
||||
Description:
|
||||
Initialises datastores module.
|
||||
"""
|
||||
225
datastores/datastore_base.py
Normal file
225
datastores/datastore_base.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""
|
||||
Project: PARTS Website
|
||||
Author: Edward Middleton-Smith
|
||||
Precision And Research Technology Systems Limited
|
||||
|
||||
Technology: DataStores
|
||||
Feature: Base DataStore
|
||||
|
||||
Description:
|
||||
Datastore for Store
|
||||
"""
|
||||
|
||||
# internal
|
||||
# from routes import bp_home
|
||||
import lib.argument_validation as av
|
||||
from business_objects.sql_error import SQL_Error, Parameters_SQL_Error
|
||||
from business_objects.tcg.user import User
|
||||
# from helpers.helper_db_sql import Helper_DB_SQL
|
||||
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
|
||||
from extensions import db
|
||||
from helpers.helper_app import Helper_App
|
||||
# external
|
||||
from sqlalchemy import text
|
||||
from flask import Flask, session, current_app
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
import time
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from typing import ClassVar
|
||||
import uuid as uuid_lib
|
||||
|
||||
|
||||
|
||||
class DataStore_Base(BaseModel):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def db_procedure_execute(proc_name, argument_dict_list = None, argument_types_dict = None):
|
||||
"""Execute a PostgreSQL procedure using autocommit to allow internal COMMIT/ROLLBACK"""
|
||||
_m = 'DataStore_Base.db_procedure_execute'
|
||||
av.val_str(proc_name, 'proc_name', _m)
|
||||
proc_string = f'CALL {proc_name}('
|
||||
has_arguments = argument_dict_list is not None
|
||||
if has_arguments:
|
||||
arg_keys = list(argument_dict_list.keys())
|
||||
for i in range(len(arg_keys)):
|
||||
param_name = arg_keys[i]
|
||||
# Add explicit PostgreSQL CAST for typed parameters
|
||||
if argument_types_dict and param_name in argument_types_dict:
|
||||
type_name = argument_types_dict[param_name].__name__
|
||||
pg_type = DataStore_Base.TYPE_CAST_MAP.get(type_name)
|
||||
if pg_type:
|
||||
param_expr = f'CAST(:{param_name} AS {pg_type})'
|
||||
else:
|
||||
param_expr = f':{param_name}'
|
||||
else:
|
||||
param_expr = f':{param_name}'
|
||||
proc_string += f'{"" if i == 0 else ", "}{param_name} := {param_expr}'
|
||||
proc_string += ')'
|
||||
stmt = text(proc_string)
|
||||
Helper_App.console_log(f'{_m}\nproc_string: {stmt}\nargs: {argument_dict_list}')
|
||||
|
||||
rows = []
|
||||
with db.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
|
||||
if has_arguments:
|
||||
result = conn.execute(stmt, argument_dict_list)
|
||||
else:
|
||||
result = conn.execute(stmt)
|
||||
# Fetch all rows as mappings before connection closes
|
||||
if result.returns_rows:
|
||||
rows = result.mappings().fetchall()
|
||||
Helper_App.console_log(f'result: {rows}')
|
||||
return rows
|
||||
|
||||
# Map SQLAlchemy types to PostgreSQL type names for CAST()
|
||||
TYPE_CAST_MAP: ClassVar[dict[str, str]] = {
|
||||
'String': 'varchar',
|
||||
'Text': 'text',
|
||||
'Integer': 'integer',
|
||||
'Boolean': 'boolean',
|
||||
'UUID': 'uuid',
|
||||
'Uuid': 'uuid',
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def db_function_execute(func_name, argument_dict_list = None, argument_types_dict = None):
|
||||
"""Execute a PostgreSQL function that returns a table using SELECT * FROM function_name()"""
|
||||
_m = 'DataStore_Base.db_function_execute'
|
||||
av.val_str(func_name, 'func_name', _m)
|
||||
func_string = f'SELECT * FROM {func_name}('
|
||||
has_arguments = argument_dict_list is not None
|
||||
if has_arguments:
|
||||
arg_keys = list(argument_dict_list.keys())
|
||||
for i in range(len(arg_keys)):
|
||||
param_name = arg_keys[i]
|
||||
# Add explicit PostgreSQL CAST for typed parameters
|
||||
if argument_types_dict and param_name in argument_types_dict:
|
||||
type_name = argument_types_dict[param_name].__name__
|
||||
pg_type = DataStore_Base.TYPE_CAST_MAP.get(type_name)
|
||||
if pg_type:
|
||||
param_expr = f'CAST(:{param_name} AS {pg_type})'
|
||||
else:
|
||||
param_expr = f':{param_name}'
|
||||
else:
|
||||
param_expr = f':{param_name}'
|
||||
func_string += f'{"" if i == 0 else ", "}{param_name} := {param_expr}'
|
||||
func_string += ')'
|
||||
stmt = text(func_string)
|
||||
|
||||
Helper_App.console_log(f'{_m}\nfunc_string: {stmt}\nargs: {argument_dict_list}')
|
||||
|
||||
if has_arguments:
|
||||
result = db.session.execute(stmt, argument_dict_list)
|
||||
else:
|
||||
result = db.session.execute(stmt)
|
||||
Helper_App.console_log(f'result: {result}')
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def db_cursor_clear(cursor):
|
||||
while cursor.nextset():
|
||||
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
|
||||
|
||||
@staticmethod
|
||||
def get_user_session():
|
||||
Helper_App.console_log('DataStore_Base.get_user_session')
|
||||
user = User.from_json(session.get(User.FLAG_USER))
|
||||
"""
|
||||
if user.user_id <= 0:
|
||||
user.user_id = 3
|
||||
"""
|
||||
Helper_App.console_log(f'User: {user}')
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
def upload_bulk(permanent_table_name, records, batch_size):
|
||||
_m = 'DataStore_Base.upload_bulk'
|
||||
Helper_App.console_log(f'{_m}\nstarting...')
|
||||
Helper_App.console_log(f'permanent_table_name: {permanent_table_name}')
|
||||
if db.session.dirty or db.session.new or db.session.deleted:
|
||||
Helper_App.console_log("Session is not clean")
|
||||
return
|
||||
# Assuming `permanent_table_name` is a string representing the table name
|
||||
table_object = db.metadata.tables.get(permanent_table_name)
|
||||
Helper_App.console_log(f'Tables: {list(db.metadata.tables.keys())}')
|
||||
if table_object is None:
|
||||
Helper_App.console_log(f"Table {permanent_table_name} not found in metadata.")
|
||||
return
|
||||
else:
|
||||
expected_columns = set(column.name for column in db.inspect(table_object).columns)
|
||||
Helper_App.console_log(f'table name: {table_object.name}')
|
||||
Helper_App.console_log(f'expected_columns: {expected_columns}')
|
||||
max_retries = 3
|
||||
initial_backoff = 1
|
||||
for i in range(0, len(records), batch_size):
|
||||
batch = records[i:i + batch_size]
|
||||
try:
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
try:
|
||||
# Helper_App.console_log(f'Before upload batch.')
|
||||
db.session.add_all(batch)
|
||||
# db.session.bulk_save_objects(batch)
|
||||
# Helper_App.console_log(f'Before commit batch.')
|
||||
db.session.commit()
|
||||
# Helper_App.console_log(f'Batch uploaded.')
|
||||
break
|
||||
except OperationalError as e:
|
||||
if "Lock wait timeout exceeded" not in str(e) or retries == max_retries - 1:
|
||||
raise
|
||||
|
||||
wait_time = initial_backoff * (2 ** retries)
|
||||
current_app.logger.warning(f"Lock timeout encountered. Retrying in {wait_time} seconds... (Attempt {retries + 1}/{max_retries})")
|
||||
time.sleep(wait_time)
|
||||
retries += 1
|
||||
|
||||
# Ensure the session is clean for the retry
|
||||
db.session.rollback()
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
raise e
|
||||
Helper_App.console_log(f'Records uploaded in batches.')
|
||||
|
||||
@classmethod
|
||||
def get_many_error(cls, guid):
|
||||
_m = f'{cls.__qualname__}.get_many_error'
|
||||
# user = cls.get_user_session()
|
||||
arguments = Parameters_SQL_Error.get_default(guid)
|
||||
argument_dict = arguments.to_json()
|
||||
argument_types = Parameters_SQL_Error.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
errors = []
|
||||
try:
|
||||
error_result = cls.db_function_execute('tcg.public.FN_Error_Get_Many', argument_dict, argument_types)
|
||||
error_result_set = error_result.fetchall()
|
||||
Helper_App.console_log(f'raw errors: {error_result_set}')
|
||||
errors = []
|
||||
error_indexes = {}
|
||||
for row in error_result_set:
|
||||
new_error = SQL_Error.from_db_error(row)
|
||||
error_indexes[new_error.error_id] = len(errors)
|
||||
errors.append(new_error)
|
||||
Helper_App.console_log(f'error {str(type(new_error))}: {new_error}')
|
||||
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return errors
|
||||
|
||||
@classmethod
|
||||
def clear_error(cls, guid):
|
||||
_m = f'{cls.__qualname__}.clear_error'
|
||||
# user = cls.get_user_session()
|
||||
arguments = Parameters_SQL_Error.get_default(guid)
|
||||
argument_dict = arguments.to_json()
|
||||
argument_types = Parameters_SQL_Error.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
cls.db_procedure_execute('tcg.public.USP_Error_Clear', argument_dict, argument_types)
|
||||
|
||||
341
datastores/datastore_mtg.py
Normal file
341
datastores/datastore_mtg.py
Normal file
@@ -0,0 +1,341 @@
|
||||
"""
|
||||
Project: PARTS Website
|
||||
Author: Edward Middleton-Smith
|
||||
Precision And Research Technology Systems Limited
|
||||
|
||||
Technology: DataStores
|
||||
Feature: MTG DataStore
|
||||
|
||||
Description:
|
||||
Datastore for MTG game data
|
||||
"""
|
||||
|
||||
# internal
|
||||
import lib.argument_validation as av
|
||||
from business_objects.tcg.mtg_game import MTG_Game, Parameters_MTG_Game
|
||||
from business_objects.tcg.mtg_game_player import MTG_Game_Player, Parameters_MTG_Game_Player, MTG_Game_Player_Temp
|
||||
from business_objects.tcg.mtg_game_round import MTG_Game_Round, Parameters_MTG_Game_Round, MTG_Game_Round_Temp
|
||||
from business_objects.tcg.mtg_game_round_player_damage import MTG_Game_Round_Player_Damage, Parameters_MTG_Game_Round_Player_Damage, MTG_Game_Round_Player_Damage_Temp
|
||||
from business_objects.tcg.mtg_deck import MTG_Deck, Parameters_MTG_Deck
|
||||
from business_objects.sql_error import SQL_Error, Parameters_SQL_Error
|
||||
from datastores.datastore_base import DataStore_Base
|
||||
from helpers.helper_app import Helper_App
|
||||
from helpers.helper_db_sql import Helper_DB_SQL
|
||||
from extensions import db
|
||||
# external
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from sqlalchemy import Uuid
|
||||
from sqlalchemy.types import Text, Boolean, Integer
|
||||
from sqlalchemy.dialects.postgresql import TIMESTAMP
|
||||
from datetime import datetime
|
||||
|
||||
db = SQLAlchemy()
|
||||
|
||||
|
||||
class DataStore_MTG(DataStore_Base):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
@classmethod
|
||||
def get_many_mtg_game(cls, game_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_mtg_game'
|
||||
# user = cls.get_user_session()
|
||||
argument_dict = game_filters.to_json()
|
||||
argument_types = Parameters_MTG_Game.get_type_hints()
|
||||
Helper_App.console_log(f'argument_dict: {argument_dict}')
|
||||
|
||||
games = []
|
||||
errors = []
|
||||
try:
|
||||
game_result = cls.db_function_execute('tcg.public.FN_TCG_MTG_Game_Get_Many', argument_dict, argument_types)
|
||||
game_result_set = game_result.fetchall()
|
||||
Helper_App.console_log(f'raw games: {game_result_set}')
|
||||
games = []
|
||||
game_indexes = {}
|
||||
for row in game_result_set:
|
||||
new_game = MTG_Game.from_db_mtg_game(row)
|
||||
game_indexes[new_game.game_id] = len(games)
|
||||
games.append(new_game)
|
||||
Helper_App.console_log(f'game {str(type(new_game))}: {new_game}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return games, errors
|
||||
|
||||
@classmethod
|
||||
def save_mtg_game(cls, game):
|
||||
_m = f'{cls.__qualname__}.save_mtg_game'
|
||||
user = cls.get_user_session()
|
||||
guid = Helper_DB_SQL.create_guid()
|
||||
game_id = None
|
||||
success = None
|
||||
argument_dict = {
|
||||
'a_comment': 'Save game'
|
||||
, 'a_end_on': game.end_on
|
||||
, 'a_game_id': game.game_id
|
||||
, 'a_guid': guid
|
||||
, 'a_location_name': game.location_name
|
||||
, 'a_notes': game.notes
|
||||
, 'a_starting_life': game.starting_life
|
||||
, 'a_start_on': game.start_on
|
||||
, 'a_user_id': user.user_id
|
||||
, 'a_do_delete': False
|
||||
, 'a_is_commander': True
|
||||
, 'a_is_draft': False
|
||||
, 'a_is_sealed': False
|
||||
, 'o_game_id': game_id
|
||||
, 'o_success': success
|
||||
}
|
||||
argument_type_hints = {
|
||||
'a_comment': Text
|
||||
, 'a_end_on': TIMESTAMP
|
||||
, 'a_game_id': Integer
|
||||
, 'a_guid': Uuid
|
||||
, 'a_location_name': Text
|
||||
, 'a_notes': Text
|
||||
, 'a_starting_life': Integer
|
||||
, 'a_start_on': TIMESTAMP
|
||||
, 'a_user_id': Integer
|
||||
, 'a_do_delete': Boolean
|
||||
, 'a_is_commander': Boolean
|
||||
, 'a_is_draft': Boolean
|
||||
, 'a_is_sealed': Boolean
|
||||
, 'o_game_id': Integer
|
||||
, 'o_success': Boolean
|
||||
}
|
||||
Helper_App.console_log(f'argument_dict: {argument_dict}')
|
||||
|
||||
errors = []
|
||||
try:
|
||||
rows = cls.db_procedure_execute('tcg.public.USP_TCG_MTG_Game_Save', argument_dict, argument_type_hints)
|
||||
row = rows[0] if rows else None
|
||||
game_id = row['o_game_id'] if row else None
|
||||
success = row['o_success'] if row else False
|
||||
Helper_App.console_log(f'Game ID: {game_id}')
|
||||
Helper_App.console_log(f'Success: {success}')
|
||||
if not success:
|
||||
errors = cls.get_many_error(guid = guid)
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
cls.clear_error(guid = guid)
|
||||
|
||||
return game_id, errors
|
||||
|
||||
@classmethod
|
||||
def get_many_mtg_game_player(cls, player_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_mtg_game_player'
|
||||
argument_dict = player_filters.to_json()
|
||||
argument_types = Parameters_MTG_Game_Player.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
players = []
|
||||
errors = []
|
||||
try:
|
||||
result = cls.db_function_execute('tcg.public.FN_TCG_MTG_Game_Player_Get_Many', argument_dict, argument_types)
|
||||
result_set = result.fetchall()
|
||||
Helper_App.console_log(f'raw players: {result_set}')
|
||||
for row in result_set:
|
||||
new_player = MTG_Game_Player.from_db_mtg_game_player(row)
|
||||
players.append(new_player)
|
||||
Helper_App.console_log(f'player {str(type(new_player))}: {new_player}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return players, errors
|
||||
|
||||
@classmethod
|
||||
def get_many_mtg_game_round(cls, round_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_mtg_game_round'
|
||||
argument_dict = round_filters.to_json()
|
||||
argument_types = Parameters_MTG_Game_Round.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
rounds = []
|
||||
errors = []
|
||||
try:
|
||||
result = cls.db_function_execute('tcg.public.FN_TCG_MTG_Game_Round_Get_Many', argument_dict, argument_types)
|
||||
result_set = result.fetchall()
|
||||
Helper_App.console_log(f'raw rounds: {result_set}')
|
||||
for row in result_set:
|
||||
new_round = MTG_Game_Round.from_db_mtg_game_round(row)
|
||||
rounds.append(new_round)
|
||||
Helper_App.console_log(f'round {str(type(new_round))}: {new_round}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return rounds, errors
|
||||
|
||||
@classmethod
|
||||
def get_many_mtg_game_round_player_damage(cls, damage_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_mtg_game_round_player_damage'
|
||||
argument_dict = damage_filters.to_json()
|
||||
argument_types = Parameters_MTG_Game_Round_Player_Damage.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
damages = []
|
||||
errors = []
|
||||
try:
|
||||
result = cls.db_function_execute('tcg.public.FN_TCG_MTG_Game_Round_Player_Damage_Get_Many', argument_dict, argument_types)
|
||||
result_set = result.fetchall()
|
||||
Helper_App.console_log(f'raw damages: {result_set}')
|
||||
for row in result_set:
|
||||
new_damage = MTG_Game_Round_Player_Damage.from_db_mtg_game_round_player_damage(row)
|
||||
damages.append(new_damage)
|
||||
Helper_App.console_log(f'damage {str(type(new_damage))}: {new_damage.damage_id}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return damages, errors
|
||||
|
||||
@classmethod
|
||||
def get_many_mtg_deck(cls, deck_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_mtg_deck'
|
||||
argument_dict = deck_filters.to_json()
|
||||
argument_types = Parameters_MTG_Deck.get_type_hints()
|
||||
Helper_App.console_log(f'{_m}\nargument_dict: {argument_dict}')
|
||||
|
||||
decks = []
|
||||
errors = []
|
||||
try:
|
||||
result = cls.db_function_execute('tcg.public.FN_TCG_MTG_Deck_Get_Many', argument_dict, argument_types)
|
||||
result_set = result.fetchall()
|
||||
Helper_App.console_log(f'raw decks: {result_set}')
|
||||
for row in result_set:
|
||||
new_deck = MTG_Deck.from_db_mtg_deck(row)
|
||||
decks.append(new_deck)
|
||||
Helper_App.console_log(f'deck {str(type(new_deck))}: {new_deck}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return decks, errors
|
||||
|
||||
|
||||
@classmethod
|
||||
def save_mtg_game_player(cls, players):
|
||||
_m = f'{cls.__qualname__}.save_mtg_game_player'
|
||||
user = cls.get_user_session()
|
||||
guid = Helper_DB_SQL.create_guid()
|
||||
success = None
|
||||
argument_dict = {
|
||||
'a_comment': 'Save game player'
|
||||
, 'a_guid': guid
|
||||
, 'a_user_id': user.user_id
|
||||
, 'o_success': success
|
||||
}
|
||||
argument_type_hints = {
|
||||
'a_comment': Text
|
||||
, 'a_guid': Uuid
|
||||
, 'a_user_id': Integer
|
||||
, 'o_success': Boolean
|
||||
}
|
||||
Helper_App.console_log(f'argument_dict: {argument_dict}')
|
||||
|
||||
objs_player_temp = []
|
||||
for player in players:
|
||||
obj_player_temp = MTG_Game_Player_Temp.from_player(player = player, guid = guid)
|
||||
objs_player_temp.append(obj_player_temp)
|
||||
|
||||
success = False
|
||||
errors = []
|
||||
try:
|
||||
cls.upload_bulk(
|
||||
permanent_table_name = MTG_Game_Player_Temp.__tablename__
|
||||
, records = objs_player_temp
|
||||
, batch_size = 1000
|
||||
)
|
||||
rows = cls.db_procedure_execute('tcg.public.USP_TCG_MTG_Game_Player_Save', argument_dict, argument_type_hints)
|
||||
row = rows[0] if rows else None
|
||||
success = row['o_success'] if row else False
|
||||
Helper_App.console_log(f'Success: {success}')
|
||||
if not success:
|
||||
errors = cls.get_many_error(guid = guid)
|
||||
Helper_App.console_log(f'Errors: {str(errors)}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
cls.clear_error(guid = guid)
|
||||
|
||||
return success, errors
|
||||
|
||||
@classmethod
|
||||
def save_mtg_game_round_player_damage(cls, rounds, damages):
|
||||
_m = f'{cls.__qualname__}.save_mtg_game_round_player_damage'
|
||||
user = cls.get_user_session()
|
||||
guid = Helper_DB_SQL.create_guid()
|
||||
success = None
|
||||
argument_dict = {
|
||||
'a_comment': 'Save game player'
|
||||
, 'a_guid': guid
|
||||
, 'a_user_id': user.user_id
|
||||
, 'o_success': success
|
||||
}
|
||||
argument_type_hints = {
|
||||
'a_comment': Text
|
||||
, 'a_guid': Uuid
|
||||
, 'a_user_id': Integer
|
||||
, 'o_success': Boolean
|
||||
}
|
||||
Helper_App.console_log(f'argument_dict: {argument_dict}')
|
||||
|
||||
objs_round_temp = []
|
||||
for round in rounds:
|
||||
obj_round_temp = MTG_Game_Round_Temp.from_round(round = round, guid = guid)
|
||||
objs_round_temp.append(obj_round_temp)
|
||||
|
||||
objs_damage_temp = []
|
||||
for damage in damages:
|
||||
obj_damage_temp = MTG_Game_Round_Player_Damage_Temp.from_damage(damage = damage, guid = guid)
|
||||
objs_damage_temp.append(obj_damage_temp)
|
||||
|
||||
success = False
|
||||
errors = []
|
||||
try:
|
||||
cls.upload_bulk(
|
||||
permanent_table_name = MTG_Game_Round_Temp.__tablename__
|
||||
, records = objs_round_temp
|
||||
, batch_size = 1000
|
||||
)
|
||||
cls.upload_bulk(
|
||||
permanent_table_name = MTG_Game_Round_Player_Damage_Temp.__tablename__
|
||||
, records = objs_damage_temp
|
||||
, batch_size = 1000
|
||||
)
|
||||
rows = cls.db_procedure_execute('tcg.public.USP_TCG_MTG_Game_Round_Damage_Save', argument_dict, argument_type_hints)
|
||||
row = rows[0] if rows else None
|
||||
success = row['o_success'] if row else False
|
||||
Helper_App.console_log(f'Success: {success}')
|
||||
if not success:
|
||||
errors = cls.get_many_error(guid = guid)
|
||||
Helper_App.console_log(f'Errors: {str(errors)}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
cls.clear_error(guid = guid)
|
||||
|
||||
return success, errors
|
||||
107
datastores/datastore_user.py
Normal file
107
datastores/datastore_user.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""
|
||||
Project: PARTS Website
|
||||
Author: Edward Middleton-Smith
|
||||
Precision And Research Technology Systems Limited
|
||||
|
||||
Technology: DataStores
|
||||
Feature: User DataStore
|
||||
|
||||
Description:
|
||||
Datastore for Users
|
||||
"""
|
||||
|
||||
# internal
|
||||
# from routes import bp_home
|
||||
import lib.argument_validation as av
|
||||
from business_objects.sql_error import SQL_Error
|
||||
from business_objects.tcg.user import User, Parameters_User
|
||||
from datastores.datastore_base import DataStore_Base
|
||||
from helpers.helper_app import Helper_App
|
||||
from helpers.helper_db_sql import Helper_DB_SQL
|
||||
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
|
||||
from extensions import db
|
||||
# external
|
||||
# from abc import ABC, abstractmethod, abstractproperty
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from sqlalchemy import text
|
||||
import stripe
|
||||
import os
|
||||
from flask import Flask, session, current_app
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from typing import ClassVar
|
||||
from datetime import datetime
|
||||
|
||||
db = SQLAlchemy()
|
||||
|
||||
|
||||
class DataStore_User(DataStore_Base):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
@classmethod
|
||||
def get_many_user(cls, user_filters):
|
||||
_m = f'{cls.__qualname__}.get_many_user'
|
||||
Helper_App.console_log(_m)
|
||||
Helper_App.console_log(f'user_filters: {user_filters}')
|
||||
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
|
||||
|
||||
argument_dict = user_filters.to_json()
|
||||
Helper_App.console_log(f'argument_dict: {argument_dict}')
|
||||
|
||||
users = []
|
||||
errors = []
|
||||
try:
|
||||
user_result = cls.db_function_execute('tcg.public.FN_TCG_User_Get_Many', argument_dict, Parameters_User.get_type_hints())
|
||||
user_result_set = user_result.fetchall()
|
||||
Helper_App.console_log(f'raw users: {user_result_set}')
|
||||
|
||||
for row in user_result_set:
|
||||
Helper_App.console_log(f'row: {row}')
|
||||
user = User.from_db_user(row)
|
||||
users.append(user)
|
||||
Helper_App.console_log(f'user {str(type(user))}: {user}')
|
||||
except Exception as e:
|
||||
Helper_App.console_log(f'Error: {str(e)}')
|
||||
error = SQL_Error()
|
||||
error.msg = str(e)
|
||||
errors.append(error)
|
||||
|
||||
return users, errors
|
||||
|
||||
@classmethod
|
||||
def login_user(cls, user):
|
||||
_m = f'{cls}.login_user'
|
||||
# av.val_str(comment, 'comment', _m)
|
||||
|
||||
guid = Helper_DB_SQL.create_guid_str()
|
||||
|
||||
Helper_App.console_log(f'login user: {user}')
|
||||
|
||||
success = False
|
||||
user_id = None
|
||||
try:
|
||||
argument_dict_list = {
|
||||
'a_user_auth0_id': user.user_auth0_id
|
||||
, 'a_email': user.email
|
||||
, 'a_guid': guid
|
||||
, 'a_is_email_verified': user.is_email_verified
|
||||
, 'o_success': success
|
||||
, 'o_user_id': user_id
|
||||
}
|
||||
rows = cls.db_procedure_execute('tcg.public.USP_TCG_User_Login', argument_dict_list)
|
||||
row = rows[0] if rows else None
|
||||
success = row['o_success'] if row else False
|
||||
user_id = row['o_user_id'] if row else None
|
||||
Helper_App.console_log('User logged in')
|
||||
Helper_App.console_log(f'Success: {success}\nUser ID: {user_id}')
|
||||
|
||||
user.user_id = user_id
|
||||
user_filters = Parameters_User.from_user(user)
|
||||
users, errors = cls.get_many_user(user_filters = user_filters)
|
||||
user = users[0] if len(users) > 0 else None
|
||||
if user is not None:
|
||||
user.is_logged_in = True
|
||||
return user, errors
|
||||
except Exception as e:
|
||||
return None, SQL_Error.from_exception(e)
|
||||
|
||||
Reference in New Issue
Block a user