Feat: Contact Form MySQL database created and hooked up to web app on form submission. \n Fix: Removal of ERP and otherwise deprecated database and server code..

This commit is contained in:
2025-03-21 11:12:03 +00:00
parent 63776954e1
commit 4f5037b504
477 changed files with 4298 additions and 102117 deletions

View File

@@ -13,11 +13,8 @@ Datastore for Store
# internal
# from routes import bp_home
import lib.argument_validation as av
from business_objects.access_level import Access_Level
from business_objects.region import Region
from business_objects.sql_error import SQL_Error
from business_objects.unit_measurement import Unit_Measurement
from business_objects.user import User, Parameters_User, User_Permission_Evaluation
from business_objects.project_hub.user import User
# from helpers.helper_db_mysql import Helper_DB_MySQL
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
from extensions import db
@@ -25,52 +22,26 @@ from forms.access_level import Filters_Access_Level
from forms.unit_measurement import Filters_Unit_Measurement
from helpers.helper_app import Helper_App
# external
# from abc import ABC, abstractmethod, abstractproperty
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import stripe
import os
from flask import Flask, session, current_app
from pydantic import BaseModel, ConfigDict
from typing import ClassVar
from datetime import datetime
import time
from sqlalchemy.exc import OperationalError
# db = SQLAlchemy()
class DataStore_Base(BaseModel):
# Global constants
# Attributes
"""
app: Flask = None
db: SQLAlchemy = None
session: object = None
"""
# model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Constructor
"""
self.db = db
self.app = current_app
with self.app.app_context():
self.session = session
"""
@staticmethod
def db_procedure_execute(proc_name, argument_dict_list = None):
# Argument validation
_m = 'DataStore_Base.db_procedure_execute'
av.val_str(proc_name, 'proc_name', _m)
has_arguments = not str(type(argument_dict_list)) == "<class 'NoneType'>"
if has_arguments:
# av.val_list_instances(argument_dict_list, 'argument_dict_list', _m, dict)
pass
# Methods
proc_string = f'CALL {proc_name}('
has_arguments = not str(type(argument_dict_list)) == "<class 'NoneType'>"
if has_arguments:
arg_keys = list(argument_dict_list.keys())
for i in range(len(arg_keys)):
@@ -79,9 +50,6 @@ class DataStore_Base(BaseModel):
proc_string = text(proc_string)
Helper_App.console_log(f'{_m}\nproc_string: {proc_string}\nargs: {argument_dict_list}')
# with self.db.session.begin() as session:
# conn = Helper_DB_MySQL(self.app).get_db_connection()
if has_arguments:
result = db.session.execute(proc_string, argument_dict_list)
else:
@@ -94,60 +62,14 @@ class DataStore_Base(BaseModel):
def db_cursor_clear(cursor):
while cursor.nextset():
Helper_App.console_log(f'new result set: {cursor.fetchall()}')
@staticmethod
def get_user_session():
Helper_App.console_log('DataStore_Base.get_user_session')
user = User.from_json(session.get(User.FLAG_USER))
if user.is_logged_in:
filters_user = Parameters_User.get_default()
filters_user.ids_user = user.id_user
users = DataStore_Base.get_many_user(filters_user)
if user.id_user <= 0:
user.id_user = 3
return user
@classmethod
def get_many_user(cls, filters=None):
_m = 'DataStore_Store_Base.get_many_access_level'
user = User.from_json(session.get(User.FLAG_USER))
if filters is None:
filters_user = Parameters_User.get_default()
filters_user.ids_user = user.id_user if user.is_logged_in else None
av.val_instance(filters, 'filters', _m, Parameters_User)
argument_dict = filters.to_json()
argument_dict = {
'a_id_user': user.id_user,
'a_id_user_auth0': user.id_user_auth0,
**argument_dict,
'a_debug': 0,
}
Helper_App.console_log(f'argument_dict: {argument_dict}')
Helper_App.console_log('executing p_get_many_user')
result = cls.db_procedure_execute('p_get_many_user', argument_dict)
cursor = result.cursor
Helper_App.console_log('data received')
# users
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw users: {result_set_1}')
users = []
for row in result_set_1:
new_user = User.from_DB_user(row)
users.append(new_user)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Base.db_cursor_clear(cursor)
cursor.close()
return users, errors
@staticmethod
def upload_bulk(permanent_table_name, records, batch_size):
@@ -190,81 +112,3 @@ class DataStore_Base(BaseModel):
except Exception as e:
db.session.rollback()
raise e
@classmethod
def get_many_access_level(cls, filters=None):
_m = 'DataStore_Store_Base.get_many_access_level'
if filters is None:
filters = Filters_Access_Level()
av.val_instance(filters, 'filters', _m, Filters_Access_Level)
argument_dict = filters.to_json()
result = cls.db_procedure_execute('p_shop_get_many_access_level', argument_dict)
cursor = result.cursor
result_set_1 = cursor.fetchall()
access_levels = []
for row in result_set_1:
new_access_level = Access_Level.from_DB_access_level(row)
access_levels.append(new_access_level)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Base.db_cursor_clear(cursor)
cursor.close()
return access_levels, errors
@classmethod
def get_many_unit_measurement(cls, filters=None):
_m = 'DataStore_Store_Base.get_many_unit_measurement'
if filters is None:
filters = Filters_Unit_Measurement()
av.val_instance(filters, 'filters', _m, Filters_Unit_Measurement)
argument_dict = filters.to_json()
result = cls.db_procedure_execute('p_shop_get_many_unit_measurement', argument_dict)
cursor = result.cursor
result_set_1 = cursor.fetchall()
units = []
for row in result_set_1:
new_unit = Unit_Measurement.from_DB_unit_measurement(row)
units.append(new_unit)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Base.db_cursor_clear(cursor)
cursor.close()
return units, errors
@classmethod
def get_many_region(cls, get_inactive = False):
_m = 'DataStore_Store_Base.get_many_region'
_m_db_region = 'p_shop_get_many_region'
argument_dict_list_region = {
'a_get_inactive_region': 1 if get_inactive else 0
}
result = cls.db_procedure_execute(_m_db_region, argument_dict_list_region)
cursor = result.cursor
result_set_1 = cursor.fetchall()
regions = []
for row in result_set_1:
region = Region.from_DB_region(row)
regions.append(region)
DataStore_Base.db_cursor_clear(cursor)
cursor.close()
return regions

View File

@@ -1,147 +0,0 @@
"""
Project: PARTS Website
Author: Edward Middleton-Smith
Precision And Research Technology Systems Limited
Technology: DataStores
Feature: User DataStore
Description:
Datastore for Users
"""
# internal
# from routes import bp_home
import lib.argument_validation as av
from business_objects.sql_error import SQL_Error
from business_objects.user import User, Parameters_User, User_Permission_Evaluation
from datastores.datastore_base import DataStore_Base
from helpers.helper_app import Helper_App
from helpers.helper_db_mysql import Helper_DB_MySQL
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
from extensions import db
# external
# from abc import ABC, abstractmethod, abstractproperty
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import stripe
import os
from flask import Flask, session, current_app
from pydantic import BaseModel, ConfigDict
from typing import ClassVar
from datetime import datetime
db = SQLAlchemy()
class DataStore_User(DataStore_Base):
# Global constants
# Attributes
def __init__(self):
super().__init__()
def edit_user(self):
_m = 'DataStore_User.edit_user'
argument_dict_list = {
'a_id_user': self.info_user.get('sub'),
'a_name': self.info_user.get('name'),
'a_email': self.info_user.get('email'),
'a_email_verified': 1 if self.info_user.get('email_verified') == 'True' else 0
}
result = self.db_procedure_execute('p_shop_edit_user', argument_dict_list)
cursor = result.cursor
result_set_1 = cursor.fetchall()
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_2]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return (result_set_1[0][1] == b'\x01')
def get_many_user(self, user_filters, user=None):
_m = 'DataStore_User.get_many_user'
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
guid = Helper_DB_MySQL.create_guid()
if user is None:
user = self.get_user_session()
argument_dict_list = {
# 'a_guid': guid
'a_id_user': user.id_user
, 'a_id_user_auth0': user.id_user_auth0
, **user_filters.to_json()
, 'a_debug': 0
}
result = self.db_procedure_execute('p_get_many_user', argument_dict_list)
cursor = result.cursor
result_set = cursor.fetchall()
users = []
if len(result_set) > 0:
for row in result_set:
Helper_App.console_log(f'row: {row}')
user = User.from_DB_user(row)
users.append(user)
Helper_App.console_log(f'user {str(type(user))}: {user}')
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return users, errors
def get_many_user(self, user_filters, user=None):
_m = 'DataStore_User.get_many_user'
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
guid = Helper_DB_MySQL.create_guid()
if user is None:
user = self.get_user_session()
argument_dict_list = {
# 'a_guid': guid
'a_id_user': user.id_user
, 'a_id_user_auth0': user.id_user_auth0
, **user_filters.to_json()
, 'a_debug': 0
}
result = self.db_procedure_execute('p_get_many_user', argument_dict_list)
cursor = result.cursor
result_set = cursor.fetchall()
users = []
if len(result_set) > 0:
for row in result_set:
Helper_App.console_log(f'row: {row}')
user = User.from_DB_user(row)
users.append(user)
Helper_App.console_log(f'user {str(type(user))}: {user}')
Helper_App.console_log(f'type users: {str(type(users))}\n type user 0: {str(type(None if len(users) == 0 else users[0]))}')
# error_list, cursor = self.get_error_list_from_cursor(cursor)
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return users, errors

View File

@@ -0,0 +1,113 @@
"""
Project: PARTS Website
Author: Edward Middleton-Smith
Precision And Research Technology Systems Limited
Technology: DataStores
Feature: User DataStore
Description:
Datastore for Users
"""
# internal
# from routes import bp_home
import lib.argument_validation as av
from business_objects.sql_error import SQL_Error
from parts_website.business_objects.project_hub.contact_form import Contact_Form, Contact_Form_Temp
from datastores.datastore_base import DataStore_Base
from helpers.helper_app import Helper_App
from helpers.helper_db_mysql import Helper_DB_MySQL
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
from extensions import db
# external
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
db = SQLAlchemy()
class DataStore_Contact_Form(DataStore_Base):
def __init__(self):
super().__init__()
@classmethod
def get_many_contact_form(cls):
_m = f'{cls.__qualname__}.get_many_contact_form'
user = cls.get_user_session()
argument_dict = {
'a_id_user': user.id_user
, 'a_debug': 0
}
Helper_App.console_log(f'argument_dict: {argument_dict}')
result = cls.db_procedure_execute('p_ph_get_many_contact_form', argument_dict)
cursor = result.cursor
# Contact_Forms
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw contact_forms: {result_set_1}')
contact_forms = []
contact_form_indexes = {}
for row in result_set_1:
new_contact_form = Contact_Form.from_DB_contact_form(row)
contact_form_indexes[new_contact_form.id_contact_form] = len(contact_forms)
contact_forms.append(new_contact_form)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
cls.db_cursor_clear(cursor)
return contact_forms, errors
@classmethod
def save_contact_forms(cls, comment, contact_forms):
_m = f'{cls}.save_contact_forms'
av.val_str(comment, 'comment', _m)
guid = Helper_DB_MySQL.create_guid_str()
now = datetime.now()
user = cls.get_user_session()
Helper_App.console_log(f'saving contact forms: {contact_forms}')
rows = []
for contact_form in contact_forms:
row = Contact_Form_Temp.from_contact_form(contact_form)
row.guid = guid
rows.append(row)
cls.upload_bulk(Contact_Form_Temp.__tablename__, rows, 1000)
Helper_App.console_log('Contact Forms uploaded')
argument_dict_list = {
'a_comment': comment,
'a_guid': guid,
'a_id_user': user.id_user,
'a_debug': 0
}
result = cls.db_procedure_execute('p_ph_save_contact_form', argument_dict_list)
Helper_App.console_log('Contact Forms saved')
# Errors
cursor = result.cursor
cursor.nextset()
result_set_e = cursor.fetchall()
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
cls.db_cursor_clear(cursor)
return errors