Feat: \n 1. Contact Us page form submission success page created. \n 2. Contact Us page styling and CAPTCHA text content. \n 3. Removal of ERP, Google CAPTCHA, and ALTCHA API code and left over comments in JavaScript, Python.

This commit is contained in:
2025-03-16 16:56:15 +00:00
parent 4add7e626e
commit dd608022cd
68 changed files with 597 additions and 3921 deletions

View File

@@ -89,60 +89,11 @@ class DataStore_Base(BaseModel):
Helper_App.console_log(f'result: {result}')
# conn.session.remove()
return result
cursor = result.cursor
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'categories: {result_set_1}')
cursor.nextset()
result_set_2 = cursor.fetchall()
Helper_App.console_log(f'products: {result_set_2}')
@staticmethod
def db_cursor_clear(cursor):
while cursor.nextset():
Helper_App.console_log(f'new result set: {cursor.fetchall()}')
@classmethod
def get_many_region_and_currency(cls):
_m = 'DataStore_Base.get_many_region_and_currency'
_m_db_currency = 'p_shop_get_many_currency'
_m_db_region = 'p_shop_get_many_region'
argument_dict_list_currency = {
'a_get_inactive_currency': 0
}
argument_dict_list_region = {
'a_get_inactive_currency': 0
}
Helper_App.console_log(f'executing {_m_db_currency}')
result = cls.db_procedure_execute(_m_db_currency, argument_dict_list_currency)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
currencies = []
for row in result_set_1:
currency = Currency.make_from_DB_currency(row)
currencies.append(currency)
Helper_App.console_log(f'currencies: {currencies}')
DataStore_Base.db_cursor_clear(cursor)
Helper_App.console_log(f'executing {_m_db_region}')
result = cls.db_procedure_execute(_m_db_region, argument_dict_list_region)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
regions = []
for row in result_set_1:
region = Region.make_from_DB_region(row)
regions.append(region)
Helper_App.console_log(f'regions: {regions}')
DataStore_Base.db_cursor_clear(cursor)
cursor.close()
return regions, currencies
@staticmethod
def get_user_session():
Helper_App.console_log('DataStore_Base.get_user_session')
@@ -214,30 +165,6 @@ class DataStore_Base(BaseModel):
else:
expected_columns = set(column.name for column in db.inspect(table_object).columns)
Helper_App.console_log(f'expected_columns: {expected_columns}')
""" v1, v2
try:
for i in range(0, len(records), batch_size):
"" v1
batch = records[i:i+batch_size]
Helper_App.console_log(f'batch: {batch}')
db.session.bulk_save_objects(batch)
""
""
data = [object.to_json() for object in batch]
Helper_App.console_log(f'data: {data}')
for row in data:
row_keys = set(row.keys())
if row_keys != expected_columns:
Helper_App.console_log(f"Column mismatch in row: {row}")
Helper_App.console_log(f'missing columns: {expected_columns - row_keys}')
Helper_App.console_log(f'extra columns: {row_keys - expected_columns}')
# db.session.bulk_insert_mappings(permanent_table_name, data)
""
except Exception as e:
Helper_App.console_log(f'{_m}\n{e}')
db.session.rollback()
raise e
"""
max_retries = 3
initial_backoff = 1
for i in range(0, len(records), batch_size):
@@ -271,17 +198,9 @@ class DataStore_Base(BaseModel):
filters = Filters_Access_Level()
av.val_instance(filters, 'filters', _m, Filters_Access_Level)
argument_dict = filters.to_json()
# user = cls.get_user_session()
# argument_dict['a_id_user'] = 1 # 'auth0|6582b95c895d09a70ba10fef' # id_user
Helper_App.console_log(f'argument_dict: {argument_dict}')
Helper_App.console_log('executing p_shop_get_many_access_level')
result = cls.db_procedure_execute('p_shop_get_many_access_level', argument_dict)
cursor = result.cursor
Helper_App.console_log('data received')
# access_levels
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw access levels: {result_set_1}')
access_levels = []
for row in result_set_1:
new_access_level = Access_Level.from_DB_access_level(row)
@@ -290,7 +209,6 @@ class DataStore_Base(BaseModel):
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
@@ -308,17 +226,9 @@ class DataStore_Base(BaseModel):
filters = Filters_Unit_Measurement()
av.val_instance(filters, 'filters', _m, Filters_Unit_Measurement)
argument_dict = filters.to_json()
# user = cls.get_user_session()
# argument_dict['a_id_user'] = 1 # 'auth0|6582b95c895d09a70ba10fef' # id_user
Helper_App.console_log(f'argument_dict: {argument_dict}')
Helper_App.console_log('executing p_shop_get_many_unit_measurement')
result = cls.db_procedure_execute('p_shop_get_many_unit_measurement', argument_dict)
cursor = result.cursor
Helper_App.console_log('data received')
# units of measurement
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw units of measurement: {result_set_1}')
units = []
for row in result_set_1:
new_unit = Unit_Measurement.from_DB_unit_measurement(row)
@@ -327,7 +237,6 @@ class DataStore_Base(BaseModel):
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
@@ -348,18 +257,13 @@ class DataStore_Base(BaseModel):
'a_get_inactive_region': 1 if get_inactive else 0
}
Helper_App.console_log(f'executing {_m_db_region}')
result = cls.db_procedure_execute(_m_db_region, argument_dict_list_region)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
regions = []
for row in result_set_1:
region = Region.from_DB_region(row)
regions.append(region)
Helper_App.console_log(f'regions: {regions}')
DataStore_Base.db_cursor_clear(cursor)
cursor.close()

View File

@@ -1,351 +0,0 @@
"""
Project: PARTS Website
Author: Edward Middleton-Smith
Precision And Research Technology Systems Limited
Technology: DataStores
Feature: Base Store DataStore
Description:
Datastore for Store
"""
# internal
# from routes import bp_home
from business_objects.store.basket import Basket, Basket_Item
from business_objects.store.product_category import Product_Category_Container, Product_Category
from business_objects.currency import Currency
from business_objects.store.image import Image
from business_objects.store.delivery_option import Delivery_Option
from business_objects.region import Region
from business_objects.store.discount import Discount
from business_objects.store.order import Order
from business_objects.store.plant import Plant
from business_objects.store.product import Product, Product_Permutation, Parameters_Product
from business_objects.sql_error import SQL_Error
from business_objects.store.stock_item import Stock_Item
from business_objects.store.storage_location import Storage_Location
from business_objects.store.product_variation import Product_Variation, Parameters_Product_Variation
from business_objects.store.product_variation_type import Product_Variation_Type
from datastores.datastore_base import DataStore_Base
from extensions import db
from helpers.helper_app import Helper_App
from helpers.helper_db_mysql import Helper_DB_MySQL
import lib.argument_validation as av
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
# external
# from abc import ABC, abstractmethod, abstractproperty
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import stripe
import os
from flask import Flask, session, current_app
from pydantic import BaseModel, ConfigDict
from typing import ClassVar
from datetime import datetime
# db = SQLAlchemy()
class DataStore_Store_Base(DataStore_Base):
# Global constants
KEY_BASKET: ClassVar[str] = Basket.KEY_BASKET
KEY_IS_INCLUDED_VAT: ClassVar[str] = Basket.KEY_IS_INCLUDED_VAT # 'is_included_VAT'
KEY_ID_CURRENCY: ClassVar[str] = Basket.KEY_ID_CURRENCY # 'id_currency'
KEY_ID_REGION_DELIVERY: ClassVar[str] = Basket.KEY_ID_REGION_DELIVERY # 'id_region_delivery'
# Attributes
def __init__(self, **kwargs):
super().__init__(**kwargs)
@classmethod
def get_many_product(cls, product_filters):
# redundant argument validation?
_m = 'DataStore_Store_Base.get_many_product'
av.val_instance(product_filters, 'product_filters', _m, Parameters_Product)
argument_dict = product_filters.to_json()
user = cls.get_user_session()
"""
argument_dict['a_id_user'] = user.id_user # 'auth0|6582b95c895d09a70ba10fef' # id_user
argument_dict['a_debug'] = 0
"""
argument_dict = {
'a_id_user': user.id_user
, **argument_dict
, 'a_debug': 0
}
Helper_App.console_log(f'argument_dict: {argument_dict}')
Helper_App.console_log('executing p_shop_get_many_product')
result = cls.db_procedure_execute('p_shop_get_many_product', argument_dict)
cursor = result.cursor
Helper_App.console_log('data received')
category_list = Product_Category_Container()
Helper_App.console_log(f'initial category_list: {category_list}')
# Categories
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw categories: {result_set_1}')
for row in result_set_1:
new_category = Product_Category.from_DB_get_many_product_catalogue(row)
Helper_App.console_log(f'new_category: {new_category}')
category_list.add_product_category(new_category)
Helper_App.console_log(f'category-loaded category_list: {category_list}')
# Products
cursor.nextset()
result_set_2 = cursor.fetchall()
Helper_App.console_log(f'raw products: {result_set_2}')
for row in result_set_2:
Helper_App.console_log(f'row: {row}')
new_product = Product.from_DB_get_many_product_catalogue(row)
Helper_App.console_log(f'new_product: {new_product}')
try:
category_list.add_product(new_product)
except Exception as e:
Helper_App.console_log(f'Error adding product: {e}')
# Permutations
cursor.nextset()
result_set_3 = cursor.fetchall()
for row in result_set_3:
new_permutation = Product_Permutation.from_DB_get_many_product_catalogue(row)
try:
category_list.add_product_permutation(new_permutation)
except Exception as e:
Helper_App.console_log(f'Error adding permutation: {e}')
# Product_Variations
cursor.nextset()
result_set_4 = cursor.fetchall()
for row in result_set_4:
new_variation_type = Product_Variation_Type.from_DB_get_many_product_catalogue(row)
try:
category_list.add_product_variation_type(new_variation_type)
except Exception as e:
Helper_App.console_log(f'Error adding variation: {e}')
# Images
cursor.nextset()
result_set_5 = cursor.fetchall()
for row in result_set_5:
new_image = Image.from_DB_get_many_product_catalogue(row)
category_list.add_product_image(new_image)
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
errors = []
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # (row[0], row[1])
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
category_list.get_all_product_variation_trees()
"""
for category in category_list.categories:
Helper_App.console_log(f'category: {category.name}')
for product in category.products:
permutation = product.get_permutation_selected()
Helper_App.console_log(f'product: {product.name}\nselected permutation: {permutation}')
"""
if len(errors) > 0:
for error in errors:
if error.code == 'PRODUCT_AVAILABILITY':
ids_permutation_unavailable = DataStore_Store_Base.get_ids_permutation_from_error_availability(error.msg)
for id_permutation in ids_permutation_unavailable:
index_category = category_list.get_index_category_from_id_permutation(id_permutation)
category = category_list.categories[index_category]
index_product = category.get_index_product_from_id_permutation(id_permutation)
product = category.products[index_product]
index_permutation = product.get_index_permutation_from_id(id_permutation)
permutation = product.permutations[index_permutation]
permutation.is_available = False
if 'region' in error.msg or 'currency' in error.msg:
permutation.is_unavailable_in_currency_or_region = True
DataStore_Store_Base.db_cursor_clear(cursor)
cursor.close()
Helper_App.console_log(f'get many category_list: {category_list}')
return category_list, errors # categories, category_index
"""
def get_many_id_price(self, product_ids):
_m = 'DataStore_Store_Base.get_many_id_price'
av.val_str(product_ids, 'product_ids', _m)
price_ids = []
for product_id in product_ids.split(','):
if product_id == 'prod_PB0NUOSEs06ymG':
price_ids.append() # get price id
return price_ids
"""
@staticmethod
def get_ids_permutation_from_error_availability(msg_error_availability):
ids_permutation = []
index_colon = msg_error_availability.find(':', msg_error_availability.find(':'))
msg_error_availability = msg_error_availability[index_colon + 1:]
index_comma = 0
while index_comma > -1:
msg_error_availability = msg_error_availability[index_comma:]
index_comma = msg_error_availability.find(',')
ids_permutation.append(msg_error_availability[:index_comma])
return ids_permutation
@classmethod
def get_many_plant(cls, get_inactive = False):
_m = 'DataStore_Store_Base.get_many_plant'
_m_db_plant = 'p_shop_get_many_plant'
argument_dict_list_plant = {
'a_get_inactive_plant': 1 if get_inactive else 0
}
Helper_App.console_log(f'executing {_m_db_plant}')
result = cls.db_procedure_execute(_m_db_plant, argument_dict_list_plant)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
plants = []
for row in result_set_1:
plant = Plant.from_DB_plant(row)
plants.append(plant)
Helper_App.console_log(f'plants: {plants}')
DataStore_Store_Base.db_cursor_clear(cursor)
cursor.close()
return plants
@classmethod
def get_many_storage_location(self, get_inactive = False):
_m = 'DataStore_Store_Base.get_many_storage_location'
_m_db_storage_location = 'p_shop_get_many_storage_location'
argument_dict_list_storage_location = {
'a_get_inactive_storage_location': 1 if get_inactive else 0
}
Helper_App.console_log(f'executing {_m_db_storage_location}')
result = self.db_procedure_execute(_m_db_storage_location, argument_dict_list_storage_location)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
storage_locations = []
for row in result_set_1:
storage_location = Storage_Location.from_DB_storage_location(row)
storage_locations.append(storage_location)
Helper_App.console_log(f'storage_locations: {storage_locations}')
DataStore_Store_Base.db_cursor_clear(cursor)
cursor.close()
return storage_locations
@classmethod
def get_many_currency(cls, get_inactive = False):
_m = 'DataStore_Store_Base.get_many_currency'
_m_db_currency = 'p_shop_get_many_currency'
argument_dict_list_currency = {
'a_get_inactive_currency': 1 if get_inactive else 0
}
Helper_App.console_log(f'executing {_m_db_currency}')
result = cls.db_procedure_execute(_m_db_currency, argument_dict_list_currency)
cursor = result.cursor
Helper_App.console_log('data received')
# cursor.nextset()
result_set_1 = cursor.fetchall()
currencies = []
for row in result_set_1:
currency = Currency.from_DB_currency(row)
currencies.append(currency)
Helper_App.console_log(f'currencies: {currencies}')
DataStore_Store_Base.db_cursor_clear(cursor)
return currencies
@classmethod
def get_many_region_and_currency(cls, get_inactive_currency = False, get_inactive_region = False):
_m = 'DataStore_Store_Base.get_many_region_and_currency'
currencies = cls.get_many_currency(get_inactive_currency)
regions = cls.get_many_region(get_inactive_region)
return regions, currencies
@classmethod
def get_many_product_variation(cls, variation_filters):
_m = 'DataStore_Store_Base.get_many_product_variation'
Helper_App.console_log(_m)
av.val_instance(variation_filters, 'variation_filters', _m, Parameters_Product_Variation)
guid = Helper_DB_MySQL.create_guid()
# now = datetime.now()
# user = self.get_user_session()
"""
argument_dict_list = {
'a_id_user': id_user,
'a_comment': comment,
'a_guid': guid
}
"""
user = cls.get_user_session()
argument_dict_list = {
# 'a_guid': guid
'a_id_user': user.id_user
, **variation_filters.to_json()
, 'a_debug': 0
}
# argument_dict_list['a_guid'] = guid
result = cls.db_procedure_execute('p_shop_get_many_product_variation', argument_dict_list)
cursor = result.cursor
result_set_vt = cursor.fetchall()
# Product_Variation Types
# variation_container = Product_Variation_Container()
variation_types = []
index_variation_type = {}
for row in result_set_vt:
new_variation_type = Product_Variation_Type.from_DB_get_many_product_variation(row)
# variation_container.add_product_variation_type(new_variation_type)
index_variation_type[new_variation_type.id_type] = len(variation_types)
variation_types.append(new_variation_type)
Helper_App.console_log(f'index_variation_type: {index_variation_type}')
# Product_Variations
cursor.nextset()
result_set_v = cursor.fetchall()
# variations = Product_Variation_Container()
variations = []
for row in result_set_v:
new_variation = Product_Variation.from_DB_get_many_product_variation(row)
# new_variation.variation_type = variation_types_dict[new_variation.id_type]
# variation_container.add_product_variation(new_variation)
variation_types[index_variation_type[new_variation.id_type]].variations.append(new_variation)
variations.append(new_variation)
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Store_Base.db_cursor_clear(cursor)
cursor.close()
return variation_types, variations, errors

View File

@@ -1,195 +0,0 @@
"""
Project: PARTS Website
Author: Edward Middleton-Smith
Precision And Research Technology Systems Limited
Technology: DataStores
Feature: Store Stripe DataStore
Description:
Datastore for Store Stripe service
"""
# internal
# from routes import bp_home
import lib.argument_validation as av
from business_objects.store.basket import Basket, Basket_Item
from business_objects.store.product import Product, Product_Permutation, Product_Price, Parameters_Product
from business_objects.sql_error import SQL_Error
from datastores.datastore_store_base import DataStore_Store_Base
# from helpers.helper_db_mysql import Helper_DB_MySQL
# from models.model_view_store_checkout import Model_View_Store_Checkout # circular!
from extensions import db
from helpers.helper_app import Helper_App
# external
# from abc import ABC, abstractmethod, abstractproperty
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import stripe
import os
from flask import Flask, session, current_app
from pydantic import BaseModel, ConfigDict
from typing import ClassVar
from datetime import datetime
# db = SQLAlchemy()
class DataStore_Store_Stripe(DataStore_Store_Base):
# Global constants
# Attributes
key_public_stripe: str = None
key_secret_stripe: str = None
def __init__(self):
super().__init__()
self.key_secret_stripe = os.environ.get("KEY_SECRET_STRIPE")
self.key_public_stripe = os.environ.get("KEY_PUBLIC_STRIPE")
# For sample support and debugging, not required for production:
stripe.set_app_info(
'stripe-samples/checkout-one-time-payments',
version='0.0.1',
url='https://github.com/stripe-samples/checkout-one-time-payments')
stripe.api_key = self.key_secret_stripe
def get_many_stripe_product_new(self):
_m = 'DataStore_Store_Stripe.get_many_stripe_product_new'
_m_db = 'p_shop_get_many_stripe_product_new'
# av.val_str(id_user)
# validation conducted by server
argument_dict_list = {
'a_id_user': self.info_user
}
Helper_App.console_log(f'executing {_m_db}')
result = self.db_procedure_execute(_m_db, argument_dict_list)
cursor = result.cursor
Helper_App.console_log('data received')
# Products
cursor.nextset()
result_set_1 = cursor.fetchall()
products = []
for row in result_set_1:
new_product = Product.from_DB_Stripe_product(row) # Product(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19])
products.append(new_product)
Helper_App.console_log(f'products: {products}')
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Store_Stripe.db_cursor_clear(cursor)
return products
def get_many_stripe_price_new(self):
_m = 'DataStore_Store_Stripe.get_many_stripe_price_new'
_m_db = 'p_shop_get_many_stripe_price_new'
# av.val_str(id_user)
# validation conducted by server
argument_dict_list = {
'a_id_user': self.info_user
}
Helper_App.console_log(f'executing {_m_db}')
result = self.db_procedure_execute(_m_db, argument_dict_list)
cursor = result.cursor
Helper_App.console_log('data received')
# Products
cursor.nextset()
result_set_1 = cursor.fetchall()
products = []
for row in result_set_1:
new_product = Product.from_DB_Stripe_price(row) # Product(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], row[13], row[14], row[15], row[16], row[17], row[18], row[19])
products.append(new_product)
Helper_App.console_log(f'products: {products}')
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_Store_Stripe.db_cursor_clear(cursor)
return products
def get_many_product_new(self):
_m = 'DataStore_Store_Stripe.get_many_product_new'
# Stripe
new_products = self.get_many_stripe_product_new()
for product in new_products:
product.id_stripe_product = self.create_stripe_product(product)
return new_products
def get_many_price_new(self):
_m = 'DataStore_Store_Stripe.get_many_product_new'
# Stripe
new_products = self.get_many_stripe_price_new()
for product in new_products:
product.id_stripe_price = self.create_stripe_price(product)
return new_products
# Stripe
def create_stripe_product(self, product): # _name, product_description):
_m = 'DataStore_Store_Stripe_Checkout.create_stripe_product'
# av.val_str(product_name, 'product_name', _m)
# av.val_str(product_description, 'product_description', _m)
av.val_instance(product, 'product', _m, Product)
Helper_App.console_log(f'stripe.api_key = {stripe.api_key}')
new_product = stripe.Product.create(
name = product.name,
description = product.description,
)
# Save these identifiers
Helper_App.console_log(f"Success! Here is your new Stripe product id: {new_product.id}")
return new_product.id
def create_stripe_price(self, product, currency): # product_id, product_price, product_currency, product_is_subscription, product_recurring_interval = '', product_interval_count = 0):
_m = 'DataStore_Store_Stripe_Checkout.create_stripe_price'
"""
av.val_str(p_id, 'p_id', _m)
av.full_val_float(p_price, 'p_price', _m, 0.01)
p_price = round(p_price, 2)
av.val_str(p_currency, 'p_currency', _m)
av.full_val_bool(p_is_subscription, 'p_is_subscription', _m)
p_is_subscription = bool(p_is_subscription)
av.val_str(p_recurring_interval, 'p_recurring_interval', _m)
av.full_val_int(p_interval_count, 'p_interval_count', _m, 1 if p_is_subscription else 0)
p_interval_count = int(p_interval_count)
"""
av.val_instance(product, 'product', _m, Product)
av.val_str(currency, 'currency', _m)
Helper_App.console_log(f'stripe.api_key = {stripe.api_key}')
new_product_price = stripe.Price.create(
unit_amount = product.unit_price,
currency = currency,
recurring = { "interval": product.name_recurring_interval, "interval_count": product.count_recurring_interval } if product.is_subscription else None,
product = product.id_stripe_product
)
# Save these identifiers
Helper_App.console_log(f"Success! Here is your Stripe product price id: {new_product_price.id} for {product.name}")
return new_product_price.id

View File

@@ -42,9 +42,7 @@ class DataStore_User(DataStore_Base):
super().__init__()
def edit_user(self):
# redundant argument validation?
_m = 'DataStore_User.edit_user'
# av.val_instance(filters, 'filters', _m, Filters_Product_Category)
argument_dict_list = {
'a_id_user': self.info_user.get('sub'),
@@ -57,12 +55,10 @@ class DataStore_User(DataStore_Base):
cursor = result.cursor
result_set_1 = cursor.fetchall()
Helper_App.console_log(f'raw user data: {result_set_1}')
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_2]
for error in errors:
@@ -71,67 +67,12 @@ class DataStore_User(DataStore_Base):
DataStore_User.db_cursor_clear(cursor)
return (result_set_1[0][1] == b'\x01')
"""
def get_many_user_order(self, id_user, ids_order, n_order_max, id_checkout_session):
_m = 'DataStore_User.get_many_user_order'
# av.val_str(id_user)
# validation conducted by server
argument_dict_list = {
'a_id_user': id_user,
'a_ids_order': ids_order,
'a_n_order_max': n_order_max,
'a_id_checkout_session': id_checkout_session
}
Helper_App.console_log('executing p_shop_get_many_user_order')
result = self.db_procedure_execute('p_shop_get_many_user_order', argument_dict_list)
cursor = result.cursor
Helper_App.console_log('data received')
# Discount Delivery Regions
cursor.nextset()
result_set_1 = cursor.fetchall()
orders = []
for row in result_set_1:
new_order = Order(row[0], row[1], row[2], row[3], row[4], row[5], row[6])
orders.append(new_order)
Helper_App.console_log(f'orders: {orders}')
# Errors
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return orders
"""
def get_many_user(self, user_filters, user=None):
_m = 'DataStore_User.get_many_user'
Helper_App.console_log(_m)
# av.val_str(user_filters, 'user_filters', _m)
# av.val_list(permutations, 'list_permutations', _m, Product_Permutation, 1)
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
guid = Helper_DB_MySQL.create_guid()
# now = datetime.now()
# user = self.get_user_session()
"""
argument_dict_list = {
'a_id_user': id_user,
'a_comment': comment,
'a_guid': guid
}
"""
if user is None:
user = self.get_user_session()
argument_dict_list = {
@@ -142,92 +83,47 @@ class DataStore_User(DataStore_Base):
, 'a_debug': 0
}
# argument_dict_list['a_guid'] = guid
result = self.db_procedure_execute('p_get_many_user', argument_dict_list)
"""
query = text(f"SELECT * FROM Shop_Calc_User_Temp UE_T WHERE UE_T.guid = '{guid}'")
result = self.db.session.execute(query)
"""
cursor = result.cursor
result_set = cursor.fetchall()
Helper_App.console_log(f'raw users: {result_set}')
Helper_App.console_log(f'type result set: {str(type(result_set))}')
Helper_App.console_log(f'len result set: {len(result_set)}')
"""
user_permission_evals = []
for row in result_set:
user_permission_eval = User_Permission_Evaluation.from_DB_user_eval(row)
user_permission_evals.append(user_permission_eval)
Helper_App.console_log(f'user_permission_evals: {user_permission_evals}')
"""
users = []
if len(result_set) > 0:
for row in result_set:
Helper_App.console_log(f'row: {row}')
user = User.from_DB_user(row)
users.append(user)
Helper_App.console_log(f'user {str(type(user))}: {user}')
Helper_App.console_log(f'type users: {str(type(users))}\n type user 0: {str(type(None if len(users) == 0 else users[0]))}')
# error_list, cursor = self.get_error_list_from_cursor(cursor)
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return users, errors
def get_many_user(self, user_filters, user=None):
_m = 'DataStore_User.get_many_user'
Helper_App.console_log(_m)
# av.val_str(user_filters, 'user_filters', _m)
# av.val_list(permutations, 'list_permutations', _m, Product_Permutation, 1)
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
guid = Helper_DB_MySQL.create_guid()
# now = datetime.now()
# user = self.get_user_session()
"""
argument_dict_list = {
'a_id_user': id_user,
'a_comment': comment,
'a_guid': guid
}
"""
if user is None:
user = self.get_user_session()
argument_dict_list = {
# 'a_guid': guid
'a_id_user': user.id_user
, 'a_id_user_auth0': user.id_user_auth0
, **user_filters.to_json()
, 'a_debug': 0
}
# argument_dict_list['a_guid'] = guid
result = self.db_procedure_execute('p_get_many_user', argument_dict_list)
"""
query = text(f"SELECT * FROM Shop_Calc_User_Temp UE_T WHERE UE_T.guid = '{guid}'")
result = self.db.session.execute(query)
"""
cursor = result.cursor
result_set = cursor.fetchall()
Helper_App.console_log(f'raw users: {result_set}')
Helper_App.console_log(f'type result set: {str(type(result_set))}')
Helper_App.console_log(f'len result set: {len(result_set)}')
"""
user_permission_evals = []
for row in result_set:
user_permission_eval = User_Permission_Evaluation.from_DB_user_eval(row)
user_permission_evals.append(user_permission_eval)
Helper_App.console_log(f'user_permission_evals: {user_permission_evals}')
"""
users = []
if len(result_set) > 0:
for row in result_set:
Helper_App.console_log(f'row: {row}')
user = User.from_DB_user(row)
users.append(user)
Helper_App.console_log(f'user {str(type(user))}: {user}')
errors = []
cursor.nextset()
result_set_e = cursor.fetchall()
Helper_App.console_log(f'raw errors: {result_set_e}')
if len(result_set_e) > 0:
errors = [SQL_Error.from_DB_record(row) for row in result_set_e] # [SQL_Error(row[0], row[1]) for row in result_set_e]
for error in errors:
Helper_App.console_log(f"Error [{error.code}]: {error.msg}")
DataStore_User.db_cursor_clear(cursor)
return users, errors
def get_many_user(self, user_filters, user=None):
_m = 'DataStore_User.get_many_user'
av.val_instance(user_filters, 'user_filters', _m, Parameters_User)
guid = Helper_DB_MySQL.create_guid()
if user is None:
user = self.get_user_session()
argument_dict_list = {
# 'a_guid': guid
'a_id_user': user.id_user
, 'a_id_user_auth0': user.id_user_auth0
, **user_filters.to_json()
, 'a_debug': 0
}
result = self.db_procedure_execute('p_get_many_user', argument_dict_list)
cursor = result.cursor
result_set = cursor.fetchall()
users = []
if len(result_set) > 0:
for row in result_set: