255 lines
10 KiB
Python
255 lines
10 KiB
Python
"""
|
|
Project: PARTS Website
|
|
Author: Edward Middleton-Smith
|
|
Precision And Research Technology Systems Limited
|
|
|
|
Technology: App Routing
|
|
Feature: User Routes
|
|
|
|
Description:
|
|
Defines the routes_user blueprint: the Auth0 login and logout routes with their callbacks, the user account page, the user accounts list page, and the endpoint that saves users.
|
|
"""
|
|
|
|
# IMPORTS
|
|
# internal
|
|
from models.model_view_base import Model_View_Base
|
|
from models.model_view_user import Model_View_User
|
|
from business_objects.tcg.user import User, Parameters_User
|
|
from datastores.datastore_user import DataStore_User
|
|
from forms.tcg.user import Filters_User
|
|
from helpers.helper_app import Helper_App
|
|
import lib.argument_validation as av
|
|
# external
|
|
from flask import Flask, render_template, jsonify, request, render_template_string, send_from_directory, redirect, url_for, session, Blueprint, current_app
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy import exc
|
|
from flask_wtf.csrf import generate_csrf
|
|
from werkzeug.exceptions import BadRequest
|
|
from extensions import oauth # db,
|
|
from urllib.parse import quote_plus, urlencode
|
|
from authlib.integrations.flask_client import OAuth
|
|
from authlib.integrations.base_client import OAuthError
|
|
from urllib.parse import quote, urlparse, parse_qs
|
|
from functools import wraps
|
|
|
|
# Flask-SQLAlchemy handle whose session handle_db_disconnect removes and rolls back before retrying.
# NOTE(review): the `db` import from `extensions` is commented out above, so this is a separate
# instance -- confirm it is initialised against the app and shares the session the datastores use.
db = SQLAlchemy()
|
|
|
|
# Blueprint on which the routes defined below are registered.
routes_user = Blueprint('routes_user', __name__)
|
|
|
|
def handle_db_disconnect(f):
    """Decorate ``f`` so that a dropped MySQL connection triggers one retry.

    The wrapped callable is invoked normally. If it raises
    ``sqlalchemy.exc.OperationalError`` whose text contains
    "MySQL server has gone away", the module-level ``db`` session is removed
    and rolled back and ``f`` is called once more with the same arguments; an
    error raised by that second call propagates to the caller. Any other
    ``OperationalError`` is re-raised unchanged.

    NOTE(review): ``db`` is the ``SQLAlchemy()`` instance created in this
    module -- confirm it is the instance the datastores actually use,
    otherwise the session reset here does not touch the failed connection.

    Args:
        f: The callable to protect.

    Returns:
        A wrapper with the same metadata as ``f`` (via ``functools.wraps``).
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except exc.OperationalError as error:
            # Only the "gone away" disconnect is worth retrying.
            if "MySQL server has gone away" not in str(error):
                raise
            # Drop the stale scoped session so the retry starts clean.
            db.session.remove()
            db.session.rollback()
            # Second and final attempt.
            return f(*args, **kwargs)
    return wrapper
|
|
|
|
# User authentication
|
|
@routes_user.route("/login", methods=['POST', 'OPTIONS']) # required endpoint for Auth0
def login():
    """Start the Auth0 login flow and hand the authorisation URL back as JSON.

    Reads an optional JSON body whose ``Model_View_Base.FLAG_CALLBACK`` entry
    names the page to return to after login (defaulting to
    ``Model_View_Base.HASH_PAGE_MTG_HOME``), asks the registered ``auth0``
    OAuth client for an authorisation redirect carrying that value as the
    ``state``, and returns the redirect's ``Location`` URL to the caller
    instead of redirecting directly.

    Returns:
        On success, a JSON response containing ``'Success'``,
        ``Model_View_Base.FLAG_STATUS`` and, under
        ``Model_View_Base.FLAG_CALLBACK``, the Auth0 authorisation URL.
        On any failure, ``({'status': 'error', 'message': ...}, 400)``.
    """
    oauth = current_app.extensions['authlib.integrations.flask_client']
    try:
        # silent=True gives None for a missing, non-JSON or malformed body
        # (e.g. an OPTIONS pre-flight) instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Covers None as well as valid JSON that is not an object.
            data = {}
        hash_callback = data.get(Model_View_Base.FLAG_CALLBACK, Model_View_Base.HASH_PAGE_MTG_HOME)

        uri_redirect = url_for('routes_user.login_callback', _external=True)

        # A view always runs inside an application context, so no extra
        # app_context() needs to be pushed around the OAuth call.
        try:
            red = oauth.auth0.authorize_redirect(
                redirect_uri=uri_redirect,
                state=quote(hash_callback),
            )
        except Exception as e:
            Helper_App.console_log(f"Error: {str(e)}")
            # Re-raise so the caller receives the real OAuth error rather
            # than a secondary "variable not assigned" failure further down.
            raise

        url_authorize = red.headers['Location']
        parsed_url = urlparse(url_authorize)
        query_params = parse_qs(parsed_url.query)
        Helper_App.console_log(f"""
        OAuth Authorize Redirect URL:

        Base URL: {parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}
        {parsed_url}

        Query Parameters: {query_params}
        """)
        return jsonify({'Success': True, Model_View_Base.FLAG_STATUS: Model_View_Base.FLAG_SUCCESS, f'{Model_View_Base.FLAG_CALLBACK}': url_authorize})

    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 400
|
|
|
|
|
|
@routes_user.route("/login_callback")
@handle_db_disconnect
def login_callback():
    """Complete the Auth0 login flow and redirect back to the requested page.

    Steps:
      1. Exchange the authorisation response for a token via the ``auth0``
         OAuth client and store it on the session under
         ``current_app.config['ID_TOKEN_USER']`` (``None`` if the exchange
         failed).
      2. Work out the page to return to: the token's ``'hash_callback'``
         entry, else the ``state`` query parameter, else
         ``Model_View_Base.HASH_PAGE_MTG_HOME``.
      3. If the request carries an OAuth error parameter, log it and
         redirect straight back to that page.
      4. Otherwise build a ``User`` from the token, log it in through
         ``DataStore_User`` and store the saved user's JSON on the session
         under ``Model_View_Base.FLAG_USER``. A failure in this step is
         logged and the redirect still happens, without a user on the
         session.

    NOTE(review): every exception raised after the first line is caught
    inside this function, so ``handle_db_disconnect`` never observes an
    ``OperationalError`` from this route.

    Returns:
        A redirect to ``URL_HOST`` + the resolved page hash, or a JSON
        failure payload (``FLAG_STATUS`` / ``FLAG_MESSAGE``) on a controller
        error, including the case where no token could be obtained and
        Auth0 reported no error.
    """
    oauth = current_app.extensions['authlib.integrations.flask_client']
    try:
        token = None
        try:
            token = oauth.auth0.authorize_access_token()
        except Exception as e:
            Helper_App.console_log(f"Error: {str(e)}")
        session[current_app.config['ID_TOKEN_USER']] = token

        # Resolve the return page without assuming a token was obtained, so
        # the error redirect below always has a destination.
        hash_callback = token.get('hash_callback') if token else None
        if hash_callback is None:
            Helper_App.console_log('hash is none')
            state = request.args.get('state')
            Helper_App.console_log(f'state: {state}')
            hash_callback = state
        if hash_callback is None:
            # Same default that login() uses when no callback is supplied.
            hash_callback = Model_View_Base.HASH_PAGE_MTG_HOME
        Helper_App.console_log(f'hash_callback: {hash_callback}')

        error_state = request.args.get(Model_View_User.FLAG_ERROR_OAUTH)
        if error_state is not None:
            error_description = request.args.get(Model_View_User.FLAG_ERROR_DESCRIPTION_OAUTH)
            error_text = f'Error: {error_state}: {error_description}'
            Helper_App.console_log(error_text)
            return redirect(f"{current_app.config['URL_HOST']}{hash_callback}")

        if token is None:
            # Nothing to build a user from; report through the common handler.
            raise ValueError('No access token was returned by the OAuth provider.')

        user = User.from_json_auth0(token)
        Helper_App.console_log(f'user: {user}')

        datastore_user = DataStore_User()
        # Pre-bind so the handler below can log them even if login_user raises.
        saved_user = None
        errors = []
        try:
            saved_user, errors = datastore_user.login_user(user)
            if len(errors) > 0:
                raise ValueError(f'Database errors: {errors}')
            Helper_App.console_log('User logged in')
            Helper_App.console_log(f'user ({str(type(saved_user))}): {saved_user}')
            Helper_App.console_log(f'user key: {Model_View_Base.FLAG_USER}')
            saved_user_json = saved_user.to_json()
            Helper_App.console_log(f'User JSON: {saved_user_json}')
            session[Model_View_Base.FLAG_USER] = saved_user_json
            Helper_App.console_log(f'user stored on session')
        except Exception as e:
            Helper_App.console_log(f'User not found: {saved_user}\nDatabase query error: {errors}')
            Helper_App.console_log(f'Error: {e}')

        Helper_App.console_log(f'user session: {session.get(Model_View_Base.FLAG_USER, "(Key not found)")}')
        return redirect(f"{current_app.config['URL_HOST']}{hash_callback}")
    except Exception as e:
        return jsonify({Model_View_Base.FLAG_STATUS: Model_View_Base.FLAG_FAILURE, Model_View_Base.FLAG_MESSAGE: f'Controller error.\n{e}'})
|
|
|
|
@routes_user.route("/logout")
def logout():
    """Clear the Flask session and redirect the browser to Auth0's logout URL.

    The URL targets ``/v2/logout`` on ``current_app.config['DOMAIN_AUTH0']``
    and carries two query parameters: ``returnTo`` (the external URL of
    ``routes_user.logout_callback``) and ``client_id``
    (``current_app.config['ID_AUTH0_CLIENT']``).

    Returns:
        A redirect response to the assembled logout URL.
    """
    session.clear()
    endpoint_logout = f"https://{current_app.config['DOMAIN_AUTH0']}/v2/logout?"
    params_logout = {
        "returnTo": url_for("routes_user.logout_callback", _external=True),
        "client_id": current_app.config['ID_AUTH0_CLIENT'],
    }
    url_logout = endpoint_logout + urlencode(params_logout)
    Helper_App.console_log(f"Redirecting to {url_logout}")
    return redirect(url_logout)
|
|
|
|
@routes_user.route("/logout_callback")
@handle_db_disconnect
def logout_callback():
    """Redirect the browser to the ``routes_mtg_game.home`` endpoint.

    This route is the ``returnTo`` target that ``logout`` passes to Auth0.

    Returns:
        A redirect response to the home page URL.
    """
    url_home = url_for('routes_mtg_game.home')
    return redirect(url_home)
|
|
|
|
@routes_user.route(Model_View_User.HASH_PAGE_USER_ACCOUNT)
def user():
    """Render the account page for the logged-in user.

    A visitor whose session user is not logged in is redirected to
    ``routes_mtg_game.home``. Otherwise a ``Model_View_User`` is built from
    the default ``Filters_User`` form, its title is set to the user's first
    name, and ``pages/user/_user.html`` is rendered with it.

    Returns:
        The rendered HTML, a redirect to the home page, or -- if anything
        raises -- the exception text as the response body.
    """
    try:
        session_user = Model_View_User.get_user_session()
        if session_user.get_is_logged_in():
            filters = Filters_User.get_default()
            view_model = Model_View_User(form_filters_old = filters)
            view_model._title = view_model.user.firstname
            return render_template('pages/user/_user.html', model = view_model)
        # Not logged in: nothing to show on this page.
        return redirect(url_for('routes_mtg_game.home'))
    except Exception as error:
        return str(error)
|
|
|
|
@routes_user.route(Model_View_User.HASH_PAGE_USER_ACCOUNTS)
def users():
    """Render the user accounts list page.

    Access requires a session user that is logged in and has
    ``can_admin_user`` set; anyone else is redirected to
    ``routes_mtg_game.home``. Filters are parsed from the query string and
    fall back to the default ``Filters_User`` form if parsing raises.

    Returns:
        The rendered ``pages/user/_users.html``, a redirect to the home
        page, or -- if anything raises -- the exception text as the response
        body.
    """
    try:
        Helper_App.console_log(f'request_args: {request.args}')
        session_user = Model_View_User.get_user_session()
        is_permitted = session_user.get_is_logged_in() and session_user.can_admin_user
        if not is_permitted:
            return redirect(url_for('routes_mtg_game.home'))
        try:
            filters = Filters_User.from_json(request.args)
        except Exception as error_filters:
            # Unusable query string: log it and show the unfiltered view.
            Helper_App.console_log(f'Error: {error_filters}')
            filters = Filters_User.get_default()
        view_model = Model_View_User(filters, hash_page_current = Model_View_User.HASH_PAGE_USER_ACCOUNTS)
        return render_template('pages/user/_users.html', model = view_model)
    except Exception as error:
        return str(error)
|
|
|
|
|
|
@routes_user.route(Model_View_User.HASH_SAVE_USER_USER, methods=['POST'])
def save_user():
    """Save the users posted by the client and report the outcome as JSON.

    The request data must contain the filters form under
    ``Model_View_User.FLAG_FORM_FILTERS`` and a collection of user JSON
    objects under ``Model_View_User.FLAG_USER``; an optional ``'comment'``
    entry is forwarded to the datastore (default ``'No comment'``).

    Returns:
        * a failure payload (``FLAG_STATUS`` / ``FLAG_MESSAGE``) when the
          filters form is invalid, no users were sent, the datastore
          reports errors, or any exception is raised while processing;
        * a redirect to ``routes_mtg_game.home`` when the user is not
          logged in;
        * a success payload whose ``FLAG_DATA`` is the JSON form of
          ``model_return.users`` (the view model is constructed before the
          save is performed).
    """
    def respond_failure(message):
        # Common failure envelope shared by every error exit below.
        return jsonify({
            Model_View_User.FLAG_STATUS: Model_View_User.FLAG_FAILURE,
            Model_View_User.FLAG_MESSAGE: message
        })

    data = Helper_App.get_request_data(request)
    try:
        form_filters = Filters_User.from_json(data[Model_View_User.FLAG_FORM_FILTERS])
        if not form_filters.validate_on_submit():
            return respond_failure(f'Filters form invalid.\n{form_filters.errors}')

        model_return = Model_View_User(form_filters_old=form_filters)
        if not model_return.is_user_logged_in:
            return redirect(url_for('routes_mtg_game.home'))

        json_users = data[Model_View_User.FLAG_USER]
        if len(json_users) == 0:
            return respond_failure(f'No users.')

        objs_user = [User.from_json(json_user) for json_user in json_users]
        Helper_App.console_log(f'objs_user={objs_user}')

        comment = data.get('comment', 'No comment')
        errors = DataStore_User.save_users(comment, objs_user)
        if len(errors) > 0:
            return respond_failure(f'Error saving users.\n{model_return.convert_list_objects_to_json(errors)}')

        return jsonify({
            Model_View_User.FLAG_STATUS: Model_View_User.FLAG_SUCCESS,
            Model_View_User.FLAG_DATA: Model_View_User.convert_list_objects_to_json(model_return.users)
        })
    except Exception as error:
        return respond_failure(f'Bad data received by controller.\n{error}')