Files
dog_training/datastores/datastore_user.py

307 lines
10 KiB
Python

"""
Project: PARTS Website
Author: Edward Middleton-Smith
Precision And Research Technology Systems Limited
Technology: DataStores
Feature: User DataStore
Description:
Datastore for Users
"""
# internal
# from routes import bp_home
import lib.argument_validation as av
from business_objects.sql_error import SQL_Error
from business_objects.dog.company import Company, Parameters_Company, Company_Temp
from business_objects.dog.role import Role, Parameters_Role
from business_objects.dog.user import User, User_Temp, Parameters_User
from datastores.datastore_base import DataStore_Base
from helpers.helper_app import Helper_App
from helpers.helper_db_mysql import Helper_DB_MySQL
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
from extensions import db
# external
# from abc import ABC, abstractmethod, abstractproperty
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import stripe
import os
from flask import Flask, session, current_app
from pydantic import BaseModel, ConfigDict
from typing import ClassVar
from datetime import datetime
db = SQLAlchemy()
class DataStore_User(DataStore_Base):
def __init__(self):
super().__init__()
@classmethod
def get_many_user(cls, user_filters, user=None):
_m = f'{cls.__qualname__}.get_many_user'
Helper_App.console_log(_m)
Helper_App.console_log(f'user_filters: {user_filters}')
# Helper_App.console_log(f"valid user_filters: {av.val_instance(user_filters, 'user_filters', _m, Parameters_User)}")
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
# guid = Helper_DB_MySQL.create_guid()
# Helper_App.console_log(f'user: {user}')
if user is None:
user = cls.get_user_session()
Helper_App.console_log(f'user: {user}')
user_filters.id_user = user.id_user
user_filters.auth0_id_user = user.id_user_auth0
argument_dict_list = {
**user_filters.to_json()
, 'a_debug': 0
}
Helper_App.console_log(f'argument_dict_list: {argument_dict_list}')
result = cls.db_procedure_execute('p_dog_get_many_user', argument_dict_list)
cursor = result.cursor
result_set = cursor.fetchall()
Helper_App.console_log(f'raw users: {result_set}')
users = []
if len(result_set) > 0:
for row in result_set:
Helper_App.console_log(f'row: {row}')
user = User.from_db_user(row)
users.append(user)
Helper_App.console_log(f'user {str(type(user))}: {user}')
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}\nlength: {len(result_set_e)}')
if len(result_set_e) > 0:
for row in result_set_e:
Helper_App.console_log(f'row: {row}')
error = SQL_Error.from_db_record(row)
errors.append(error)
error.__repr__()
Helper_App.console_log(f'Error: {error}\nErrors: {errors}')
for error in errors:
Helper_App.console_log(f"Error [{error.code_type}]: {error.msg}")
# errors = None
# cls.db_cursor_clear(cursor)
try:
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
except Exception as e:
Helper_App.console_log(f'Error: {str(e)}')
return users, errors
@classmethod
def save_users(cls, comment, users):
_m = f'{cls}.save_users'
av.val_str(comment, 'comment', _m)
guid = Helper_DB_MySQL.create_guid_str()
now = datetime.now()
user = cls.get_user_session()
Helper_App.console_log(f'saving users: {users}')
rows = []
for obj_user in users:
row = User_Temp.from_user(obj_user)
row.guid = guid
rows.append(row)
cls.upload_bulk(User_Temp.__tablename__, rows, 1000)
Helper_App.console_log('Users uploaded')
argument_dict_list = {
'a_comment': comment
, 'a_guid': guid
, 'a_id_user': user.id_user
, 'a_debug': 0
}
result = cls.db_procedure_execute('p_dog_save_user', argument_dict_list)
Helper_App.console_log('Users saved')
# Errors
cursor = result.cursor
cursor.nextset()
result_set_e = cursor.fetchall()
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_db_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
# cls.db_cursor_clear(cursor)
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
return errors
@classmethod
def save_new_founding_partner(cls, user):
_m = f'{cls}.save_new_founding_partner'
# av.val_str(comment, 'comment', _m)
guid = Helper_DB_MySQL.create_guid_str()
Helper_App.console_log(f'saving new founding partner: {user}')
argument_dict_list = {
'a_comment': 'save new founding partner'
, 'a_guid': guid
, 'a_id_user_auth0': user.id_user_auth0
, 'a_email_user': user.email
, 'a_debug': 0
}
result = cls.db_procedure_execute('p_dog_save_new_founding_partner', argument_dict_list)
Helper_App.console_log('Users saved')
# Errors
cursor = result.cursor
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_db_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
# cls.db_cursor_clear(cursor)
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
return errors
@classmethod
def get_many_role(cls, role_filters):
_m = f'{cls.__qualname__}.get_many_role'
user = cls.get_user_session()
argument_dict = {
'a_id_user': user.id_user
, **role_filters.to_json()
, 'a_debug': 0
}
Helper_App.console_log(f'argument_dict: {argument_dict}')
result = cls.db_procedure_execute('p_dog_get_many_role', argument_dict)
cursor = result.cursor
# Roles
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw roles: {result_set_1}')
roles = []
role_indexes = {}
for row in result_set_1:
Helper_App.console_log(f'Raw role: {row}')
new_role = Role.from_db_role(row)
role_indexes[new_role.id_role] = len(roles)
roles.append(new_role)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_db_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
# cls.db_cursor_clear(cursor)
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
return roles, errors
@classmethod
def get_many_company(cls, company_filters):
_m = f'{cls.__qualname__}.get_many_company'
user = cls.get_user_session()
argument_dict = {
'a_id_user': user.id_user
, **company_filters.to_json()
, 'a_debug': 0
}
Helper_App.console_log(f'argument_dict: {argument_dict}')
result = cls.db_procedure_execute('p_dog_get_many_company', argument_dict)
cursor = result.cursor
# Companies
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw companies: {result_set_1}')
companies = []
company_indexes = {}
for row in result_set_1:
Helper_App.console_log(f'Raw company: {row}')
new_company = Company.from_db_company(row)
company_indexes[new_company.id_company] = len(companies)
companies.append(new_company)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_db_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
# cls.db_cursor_clear(cursor)
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
return companies, errors
@classmethod
def save_companies(cls, comment, companies):
_m = f'{cls}.save_companies'
av.val_str(comment, 'comment', _m)
guid = Helper_DB_MySQL.create_guid_str()
now = datetime.now()
user = cls.get_user_session()
Helper_App.console_log(f'saving command categories: {companies}')
rows = []
for company in companies:
row = Company_Temp.from_company(company)
row.guid = guid
rows.append(row)
cls.upload_bulk(Company_Temp.__tablename__, rows, 1000)
Helper_App.console_log('Commands uploaded')
argument_dict_list = {
'a_comment': comment,
'a_guid': guid,
'a_id_user': user.id_user,
'a_debug': 0
}
result = cls.db_procedure_execute('p_dog_save_company', argument_dict_list)
Helper_App.console_log('Companies saved')
# Errors
cursor = result.cursor
cursor.nextset()
result_set_e = cursor.fetchall()
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_db_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
# cls.db_cursor_clear(cursor)
while cursor.nextset():
Helper_App.console_log(f'unexpected result set: {cursor.fetchall()}')
return errors