Files
tcg_web_scraper/product_scraping/product_scraper.py

992 lines
45 KiB
Python

import pandas as pd
from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
import requests
import re
import time
import random
from playwright.sync_api import sync_playwright, Browser, Page
from playwright.async_api import async_playwright
import asyncio
from aioconsole import ainput
from collections import defaultdict
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import subprocess
import os
# Hours to sleep between full scrape cycles (consumed by Cost_Fetcher.run_continuous).
CYCLE_INTERVAL_HOURS = 4
class Email_Notifier:
    """Sends HTML notification emails over SMTP.

    Port 465 is treated as implicit SSL; any other port is assumed to
    support STARTTLS.
    """

    def __init__(self, sender_email, sender_password, recipient_email, smtp_host='smtp.gmail.com', smtp_port=587):
        self.sender_email = sender_email
        self.sender_password = sender_password
        self.recipient_email = recipient_email
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port

    def send_email(self, subject, body_html):
        """Build and send one HTML email; return True on success, False on any error."""
        message = MIMEMultipart('alternative')
        message['From'] = self.sender_email
        message['To'] = self.recipient_email
        message['Subject'] = subject
        message.attach(MIMEText(body_html, 'html'))
        try:
            if self.smtp_port == 465:
                # Implicit-SSL transport.
                with smtplib.SMTP_SSL(self.smtp_host, self.smtp_port) as connection:
                    connection.login(self.sender_email, self.sender_password)
                    connection.send_message(message)
            else:
                # Plain connection upgraded via STARTTLS.
                with smtplib.SMTP(self.smtp_host, self.smtp_port) as connection:
                    connection.starttls()
                    connection.login(self.sender_email, self.sender_password)
                    connection.send_message(message)
        except Exception as e:
            print(f"Error sending email: {e}")
            return False
        print(f"Email sent: {subject}")
        return True
class Profitability_Monitor:
    """Reads profit/margin figures from the workbook and detects crossings of
    the zero-profit threshold between two snapshots."""

    MTG_SET_WORKSHEET_NAME = 'MTG Set'
    MTG_SET_DATA_START_ROW = 3  # Row 1 is a merged group header, row 2 has column names
    # MTG Set sheet column indices (1-based)
    COL_MTG_SET_NAME = 2
    COL_PLAY_PROFIT = 45
    COL_PLAY_MARGIN = 46
    COL_COLLECTOR_PROFIT = 53
    COL_COLLECTOR_MARGIN = 54
    COL_PLAY_SINGLES_PROFIT = 59
    COL_PLAY_SINGLES_MARGIN = 60
    COL_COLLECTOR_SINGLES_PROFIT = 73
    COL_COLLECTOR_SINGLES_MARGIN = 74
    # Product sheet column indices (1-based) for precons
    COL_PRODUCT_NAME = 2
    COL_PRODUCT_IS_PRECON = 7
    COL_PRODUCT_MIN_COST = 8
    COL_PRODUCT_PROFIT = 10
    # One entry per profit metric tracked on the MTG Set sheet.
    PROFIT_CHECKS = [
        {'profit_col': COL_PLAY_PROFIT, 'margin_col': COL_PLAY_MARGIN, 'action_buy': 'Buy Play Booster', 'action_no_buy': 'DO NOT Buy Play Booster'},
        {'profit_col': COL_COLLECTOR_PROFIT, 'margin_col': COL_COLLECTOR_MARGIN, 'action_buy': 'Buy Collector Booster', 'action_no_buy': 'DO NOT Buy Collector Booster'},
        {'profit_col': COL_PLAY_SINGLES_PROFIT, 'margin_col': COL_PLAY_SINGLES_MARGIN, 'action_buy': 'Split Play Booster', 'action_no_buy': 'DO NOT Split Play Booster'},
        {'profit_col': COL_COLLECTOR_SINGLES_PROFIT, 'margin_col': COL_COLLECTOR_SINGLES_MARGIN, 'action_buy': 'Split Collector Booster', 'action_no_buy': 'DO NOT Split Collector Booster'},
    ]

    @staticmethod
    def _profit_entry(profit, margin):
        # An entry counts as profitable only when the profit cell holds a positive number.
        return {
            'profit': profit,
            'margin': margin,
            'is_profitable': isinstance(profit, (int, float)) and profit > 0,
        }

    def read_states(self, workbook_path):
        """Load the workbook with data_only=True to read formula-calculated profit values."""
        workbook = load_workbook(workbook_path, data_only=True)
        set_sheet = workbook[self.MTG_SET_WORKSHEET_NAME]
        mtg_set_states = {}
        for row_index in range(self.MTG_SET_DATA_START_ROW, set_sheet.max_row + 1):
            set_name = set_sheet.cell(row_index, self.COL_MTG_SET_NAME).value
            if not set_name:
                continue
            # One profit/margin entry per tracked metric, keyed by profit column.
            mtg_set_states[set_name] = {
                check['profit_col']: self._profit_entry(
                    set_sheet.cell(row_index, check['profit_col']).value,
                    set_sheet.cell(row_index, check['margin_col']).value,
                )
                for check in self.PROFIT_CHECKS
            }
        precon_states = {}
        product_sheet = workbook['Product']
        for row_index in range(2, product_sheet.max_row + 1):
            if not product_sheet.cell(row_index, self.COL_PRODUCT_IS_PRECON).value:
                continue
            product_name = product_sheet.cell(row_index, self.COL_PRODUCT_NAME).value
            if not product_name:
                continue
            profit = product_sheet.cell(row_index, self.COL_PRODUCT_PROFIT).value
            min_cost = product_sheet.cell(row_index, self.COL_PRODUCT_MIN_COST).value
            # Margin is derivable only when both figures are numeric and cost is non-zero.
            margin = None
            if isinstance(profit, (int, float)) and isinstance(min_cost, (int, float)) and min_cost != 0:
                margin = profit / min_cost
            precon_states[product_name] = self._profit_entry(profit, margin)
        workbook.close()
        return {'mtg_set': mtg_set_states, 'precon': precon_states}

    @staticmethod
    def _crossing_action(old_entry, new_entry, action_buy, action_no_buy):
        # Return the action string when profitability flipped, else None.
        # A missing baseline (old state None) never raises an alert.
        old_profitable = old_entry.get('is_profitable', None)
        new_profitable = new_entry.get('is_profitable', False)
        if old_profitable is None or old_profitable == new_profitable:
            return None
        return action_buy if new_profitable else action_no_buy

    @staticmethod
    def _format_margin(margin):
        # Render a ratio as a percentage string, or "N/A" when not numeric.
        if isinstance(margin, (int, float)):
            return f"{margin * 100:.1f}%"
        return "N/A"

    def find_changes(self, old_states, new_states):
        """Compare old and new profit states; return list of alert dicts for any crossings of the 0 threshold."""
        alerts = []
        old_sets = old_states.get('mtg_set', {})
        for check in self.PROFIT_CHECKS:
            column = check['profit_col']
            for set_name, set_data in new_states['mtg_set'].items():
                new_entry = set_data.get(column, {})
                old_entry = old_sets.get(set_name, {}).get(column, {})
                action = self._crossing_action(old_entry, new_entry, check['action_buy'], check['action_no_buy'])
                if action is not None:
                    alerts.append({'name': set_name, 'action': action, 'margin': self._format_margin(new_entry.get('margin'))})
        old_precons = old_states.get('precon', {})
        for product_name, new_entry in new_states['precon'].items():
            action = self._crossing_action(old_precons.get(product_name, {}), new_entry, 'Buy Precon', 'DO NOT Buy Precon')
            if action is not None:
                alerts.append({'name': product_name, 'action': action, 'margin': self._format_margin(new_entry.get('margin'))})
        return alerts

    def format_email_html(self, alerts):
        """Render the alert list as a simple HTML table for the notification email."""
        rows = ''.join(
            f"<tr><td style='padding:6px 12px'>{alert['name']}</td>"
            f"<td style='padding:6px 12px'>{alert['action']}</td>"
            f"<td style='padding:6px 12px;text-align:right'>{alert['margin']}</td></tr>"
            for alert in alerts
        )
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return (
            "<html><body>"
            "<h2 style='font-family:sans-serif'>TCG Profitability Alert</h2>"
            "<table border='1' cellpadding='0' cellspacing='0' style='border-collapse:collapse;font-family:sans-serif'>"
            "<tr style='background:#ddd'>"
            "<th style='padding:6px 12px'>MTG Set</th>"
            "<th style='padding:6px 12px'>Action</th>"
            "<th style='padding:6px 12px'>Margin</th>"
            "</tr>"
            f"{rows}"
            "</table>"
            f"<p style='font-family:sans-serif;color:#666'><small>Generated {timestamp}</small></p>"
            "</body></html>"
        )
class Product_Scraper:
    """Playwright-based scraper that extracts a product's cost text and stock
    status from a retailer's product page.

    One instance is bound to a single retailer domain. The parse_cost_*
    helpers convert each retailer's price text into a float.
    """

    domain: str      # retailer this scraper instance is bound to
    page: "Page"     # current Playwright page; replaced on every scrape call

    def __init__(self, domain):
        print("Setting up browser automation")
        self.domain = domain

    @staticmethod
    def parse_cost(cost_text):
        """Parse a GBP price string (e.g. '£12.99') into a float.

        Strips everything except digits and commas (so '£12.99' -> '1299'),
        then divides by 100. Returns None for empty input or when a surviving
        comma makes float() fail (e.g. thousands separators).
        """
        if not cost_text:
            return None
        cost_clean = re.sub(r'[^\d,]', '', cost_text)
        try:
            return float(cost_clean) / 100
        except ValueError:
            return None

    @classmethod
    def parse_cost_from_pennies(cls, cost_text):
        """Like parse_cost, but for values quoted in pennies: divides by a further 100."""
        if not cost_text:
            return None
        cost_clean = cls.parse_cost(cost_text=cost_text)
        if cost_clean is not None:
            cost_clean = cost_clean / 100
        return cost_clean

    @classmethod
    def parse_cost_chaoscards(cls, cost_text):
        """Chaos Cards prices use the standard GBP format."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_cardmarket(cls, cost_text):
        """Convert '141,30 €' format to float in EUR"""
        if not cost_text:
            return None
        cost_clean = re.sub(r'[^\d,]', '', cost_text)
        # European decimal comma -> dot.
        cost_clean = cost_clean.replace(',', '.')
        try:
            return float(cost_clean)
        except ValueError:
            return None

    @classmethod
    def parse_cost_gameslore(cls, cost_text):
        """Games Lore prices use the standard GBP format."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_magicmadhouse(cls, cost_text):
        """Magic Madhouse prices use the standard GBP format."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_newrealitiesgaming(cls, cost_text):
        """New Realities Gaming prices use the standard GBP format."""
        return cls.parse_cost(cost_text=cost_text)

    async def scrape_cost_and_active_playwright(self, browser: "Browser", url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
        """Open `url` in a fresh page and read cost text and an in-stock flag.

        Returns (cost_text, active). cost_text is the raw text of
        `cost_selector` (or None). When `active_selector` is None the product
        is active whenever a cost was found; otherwise the first matching
        element's stripped text must not appear in `invalid_active_statuses`
        (no matching element at all counts as active). Returns (None, None)
        on any page-level error. The page is always closed.
        """
        print(f" Loading page...")
        self.page = await browser.new_page()
        cost = None
        active = None
        try:
            await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000)
            # Long randomized delay: lets JS-rendered content settle and keeps
            # request pacing irregular.
            await asyncio.sleep(random.uniform(20, 25))
            # NOTE(review): locator() is lazy and this result is unused — a
            # wait_for() on the page-load element was probably intended.
            # Kept as-is to preserve behavior; confirm before changing.
            element = self.page.locator(selector=page_load_element_selector)
            page_title = await self.page.title()
            print(f" Page title: {page_title}")
            element = self.page.locator(selector=cost_selector)
            text = await element.text_content()
            print(f" Text: '{text}'")
            cost = text
            active = None
            if active_selector is None:
                active = (cost is not None)
            else:
                try:
                    elements = await self.page.query_selector_all(selector=active_selector)
                    print(f'# active elements: {len(elements)}')
                    if len(elements) == 0:
                        # No stock-status element on the page -> assume available.
                        active = True
                    else:
                        text = await elements[0].text_content()
                        text = text.strip()
                        print(f" Text: '{text}'")
                        active = (invalid_active_statuses is None or text not in invalid_active_statuses)
                except Exception as e:
                    print(f" Selector failed: {e}")
            if cost is None or active is None:
                print(f" ✗ No cost found")
            # await ainput("Press Enter to continue to next URL...")
            print(f"Cost: {cost}, Active: {active}")
        except Exception as e:
            print(f" Error: {e}")
            # await ainput("Press Enter to continue to next URL...")
            return None, None
        finally:
            await self.page.close()
        return cost, active

    async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
        """Scrape the cheapest Cardmarket offer; convert EUR -> GBP and add a
        banded shipping estimate (2/8/20 by EUR price bracket)."""
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser
            , url=url
            , page_load_element_selector=page_load_element_selector
            , cost_selector=cost_selector
            , active_selector=None
            , invalid_active_statuses=[]
        )
        cost = Product_Scraper.parse_cost_cardmarket(cost_text=cost_text)
        if cost is not None:
            # Banded shipping estimate, keyed off the EUR item price.
            if cost < 10:
                item_shipping_cost_in = 2
            elif cost < 100:
                item_shipping_cost_in = 8
            else:
                item_shipping_cost_in = 20
            cost = cost * eur_to_gbp_rate + item_shipping_cost_in
        active = (cost is not None)
        return cost, active

    async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
        """Scrape cost and stock status from a Chaos Cards product page."""
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser
            , url=url
            , page_load_element_selector=cost_selector
            , cost_selector=cost_selector
            , active_selector=active_selector
            , invalid_active_statuses=["Out of stock", "Coming soon"]
        )
        cost = Product_Scraper.parse_cost_chaoscards(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
        """Scrape cost and stock status from a Games Lore product page."""
        cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
        active_selector = '.stock > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser
            , url=url
            , page_load_element_selector=cost_selector
            , cost_selector=cost_selector
            , active_selector=active_selector
            , invalid_active_statuses=["OUT OF STOCK"]
        )
        cost = Product_Scraper.parse_cost_gameslore(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
        """Scrape cost and stock status from a Magic Madhouse product page."""
        page_load_element_selector = '.productView-title'
        cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
        active_selector = '.alertBox.alertBox--error'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser
            , url=url
            , page_load_element_selector=page_load_element_selector
            , cost_selector=cost_selector
            , active_selector=active_selector
            , invalid_active_statuses=[]
        )
        cost = Product_Scraper.parse_cost_magicmadhouse(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url):
        """Scrape cost and stock status from a New Realities Gaming product page."""
        button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button'
        page_load_element_selector = button_selector
        cost_selector = f'{button_selector} span:nth-child(2)'
        active_selector = f'{button_selector} span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser
            , url=url
            , page_load_element_selector=page_load_element_selector
            , cost_selector=cost_selector
            , active_selector=active_selector
            , invalid_active_statuses=['Out of stock']
        )
        # Consistency: use this domain's own parser (identical behavior — both
        # delegate to parse_cost; previously called parse_cost_magicmadhouse).
        cost = Product_Scraper.parse_cost_newrealitiesgaming(cost_text=cost_text)
        return cost, active

    async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: "Browser", url, eur_to_gbp_rate):
        """Collect (price, quantity) pairs from every visible Cardmarket offer.

        Prices are converted EUR -> GBP with `eur_to_gbp_rate`. Returns a
        possibly-empty list of {'price': float, 'quantity': float} dicts; the
        page is always closed.
        """
        offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'
        print(f" Loading page...")
        self.page = await browser.new_page()
        try:
            await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(random.uniform(20, 25))
            page_title = await self.page.title()
            print(f" Page title: {page_title}")
            price_quantity_pairs = []
            try:
                offer_containers = await self.page.query_selector_all(offer_container_selector)
                print(f" Offer container selector: Found {len(offer_containers)} elements")
                for offer_container in offer_containers:
                    price_element = await offer_container.query_selector(price_selector)
                    price_text = await price_element.text_content()
                    # Fixed: the original checked `'' in price_text`, which is
                    # always true. A valid Cardmarket price (docstring format
                    # '141,30 €') must contain the euro sign and a digit.
                    if '€' in price_text and re.search(r'\d', price_text):
                        print(f" ✓ Found price: {price_text}")
                    else:
                        price_text = None
                    quantity_element = await offer_container.query_selector(quantity_selector)
                    quantity_text = await quantity_element.text_content()
                    if price_text is None or quantity_text is None:
                        continue
                    price_quantity_pairs.append({
                        'price': Product_Scraper.parse_cost_cardmarket(cost_text=price_text) * eur_to_gbp_rate
                        , 'quantity': Product_Scraper.parse_cost_cardmarket(cost_text=quantity_text)
                    })
            except Exception as e:
                print(f" Price selector failed: {e}")
                # await ainput("Press enter to continue to next URL...")
                return []
        finally:
            await self.page.close()
        return price_quantity_pairs
class TCG_Sole_Trader_Workbook_Container:
    """Wraps the 'TCG Sole Trader Copy.xlsx' workbook: locates the Sourcing and
    Product table headers, exposes sourcing rows as a DataFrame, and writes
    scraped costs/prices back to the Sourcing sheet."""

    # Column names of the DataFrame returned by get_sourcing_entries()
    NAME_COLUMN_ACTIVE: str = 'Active'
    NAME_COLUMN_INDEX_ROW: str = 'Index Row'
    NAME_COLUMN_LINK: str = 'Link'
    NAME_COLUMN_PRODUCT_ID: str = 'Product Id'
    NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster'
    NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box'
    NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon'
    NAME_COLUMN_SOURCE_NAME: str = 'Source Name'
    NAME_COLUMN_UNIT_COST: str = 'Cost'
    NAME_COLUMN_UNIT_PRICE: str = 'Price'
    PRODUCT_WORKSHEET_NAME = 'Product'
    SOURCING_WORKSHEET_NAME = 'Sourcing'
    WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'

    # 1-based sheet coordinates discovered in __init__ (None until located)
    index_column_active_sourcing: int
    index_column_is_booster_product: int
    index_column_is_booster_box_product: int
    index_column_is_precon_product: int
    index_column_link_sourcing: int
    index_column_name_sourcing: int
    index_column_product_id_product: int
    index_column_product_id_sourcing: int
    index_column_unit_cost_sourcing: int
    index_column_unit_price_sourcing: int
    index_row_header_product: int
    index_row_header_sourcing: int
    product_sheet: "Worksheet"
    sourcing_sheet: "Worksheet"
    workbook: "Workbook"

    def __init__(self):
        """Open the workbook and locate all required header rows/columns.

        On any failure this prints an error and returns early, leaving the
        not-yet-located coordinates as None.
        """
        print("Loading workbook...")
        # Pre-set every discovered coordinate to None so that a missing header
        # is reported by the final validity check below instead of raising
        # AttributeError on an unset attribute.
        self.index_row_header_sourcing = None
        self.index_row_header_product = None
        self.index_column_name_sourcing = None
        self.index_column_link_sourcing = None
        self.index_column_unit_cost_sourcing = None
        self.index_column_unit_price_sourcing = None
        self.index_column_active_sourcing = None
        self.index_column_product_id_sourcing = None
        self.index_column_product_id_product = None
        self.index_column_is_booster_product = None
        self.index_column_is_booster_box_product = None
        self.index_column_is_precon_product = None
        self.workbook = load_workbook(self.WORKBOOK_NAME)
        if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found")
            return
        if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found")
            return
        self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
        self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]
        # Locate the Sourcing header row: a 'tbl_Sourcing' marker in column 1
        # or a 'Source Name' header in column 3.
        sourcing_table_found = False
        for row in range(1, self.sourcing_sheet.max_row + 1):
            if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
                self.index_row_header_sourcing = row
                sourcing_table_found = True
                break
        if not sourcing_table_found or not self.index_row_header_sourcing:
            # Fallback: scan only the first rows for the 'Source Name' header.
            for row in range(1, min(20, self.sourcing_sheet.max_row + 1)):
                if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
                    self.index_row_header_sourcing = row
                    sourcing_table_found = True
                    break
        if not sourcing_table_found:
            print("Error: Could not find table 'tbl_Sourcing'")
            return
        # Locate the Product header row: a 'tbl_Product' marker or a
        # 'Product Id' header in column 1.
        product_table_found = False
        for row in range(1, self.product_sheet.max_row + 1):
            if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value):
                self.index_row_header_product = row
                product_table_found = True
                break
        if not product_table_found:
            print("Error: Could not find table 'tbl_Product'")
            return
        # Map Sourcing header names to column indices.
        for index_column in range(1, self.sourcing_sheet.max_column + 1):
            header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip()
            if 'Source Name' == header:
                self.index_column_name_sourcing = index_column
            elif 'Source Link' == header:
                self.index_column_link_sourcing = index_column
            elif 'Source Unit Cost' == header:
                self.index_column_unit_cost_sourcing = index_column
            elif 'Sale Price' == header:
                self.index_column_unit_price_sourcing = index_column
            elif 'Active' == header:
                self.index_column_active_sourcing = index_column
            elif 'Product Id' == header:
                self.index_column_product_id_sourcing = index_column
        # Map Product header names to column indices.
        for index_column in range(1, self.product_sheet.max_column + 1):
            header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip()
            if 'Is Booster Box' == header:
                self.index_column_is_booster_box_product = index_column
            elif 'Is Booster' == header:
                self.index_column_is_booster_product = index_column
            elif 'Is Precon' == header:
                self.index_column_is_precon_product = index_column
            elif 'Product Id' == header:
                self.index_column_product_id_product = index_column
        print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
        print(f"Sourcing header row: {self.index_row_header_sourcing}")
        print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
        print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
        print(f"Product max row: {self.product_sheet.max_row}")
        print(f"Product header row: {self.index_row_header_product}")
        # Fixed: this log line was mislabelled "Sourcing header 1" (copy-paste).
        print(f"Product header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
        print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")
        # Any column still None (or 0) means a header was not found.
        if not all([
            self.index_column_name_sourcing
            , self.index_column_link_sourcing
            , self.index_column_unit_cost_sourcing
            , self.index_column_unit_price_sourcing
            , self.index_column_product_id_sourcing
            , self.index_column_active_sourcing
            , self.index_column_product_id_product
            , self.index_column_is_booster_product
            , self.index_column_is_booster_box_product
            , self.index_column_is_precon_product
        ]):
            print("Error: Could not find required columns")
            return

    @classmethod
    def create_product_source_df(cls):
        """Return an empty DataFrame with the product-source column layout."""
        return pd.DataFrame(columns=[
            cls.NAME_COLUMN_INDEX_ROW
            , cls.NAME_COLUMN_PRODUCT_ID
            , cls.NAME_COLUMN_SOURCE_NAME
            , cls.NAME_COLUMN_LINK
            , cls.NAME_COLUMN_PRODUCT_IS_BOOSTER
            , cls.NAME_COLUMN_UNIT_COST
            , cls.NAME_COLUMN_UNIT_PRICE
            , cls.NAME_COLUMN_ACTIVE
        ])

    def get_sourcing_entries(self):
        """Return a DataFrame with one row per sourcing entry, sorted by source
        name. Rows missing a name or link are skipped; cost/price/active are
        left as None for the scraper to fill in."""
        product_sources = self.create_product_source_df()
        try:
            # Build the Product-Id -> Is-Booster lookup once (first matching
            # row wins, as the old per-entry scan did) instead of rescanning
            # the whole Product sheet for every sourcing row.
            booster_by_product_id = {}
            for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
                product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
                if product_id not in booster_by_product_id:
                    is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper()
                    booster_by_product_id[product_id] = (is_booster_text == "TRUE")
            for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
                source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
                source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
                source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
                if not source_name or not source_link:
                    continue
                print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
                product_is_booster = booster_by_product_id.get(source_product_id, False)
                print(f"product is booster: {product_is_booster}")
                product_sources.loc[len(product_sources)] = [
                    index_row
                    , source_product_id
                    , source_name
                    , source_link
                    , product_is_booster
                    , None  # cost
                    , None  # price
                    , None  # active
                ]
        except Exception as e:
            print(f"Error: {e}")
        # Fixed: sort_values() returns a new DataFrame; the original discarded
        # the result and returned the rows unsorted.
        product_sources = product_sources.sort_values(self.NAME_COLUMN_SOURCE_NAME)
        return product_sources

    def clear_row_sourcing_sheet(self, index_row):
        """Blank a sourcing row's cost and mark it inactive (pre-scrape reset)."""
        self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
        self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"

    def update_row_sourcing_sheet(self, index_row, unit_cost=None, unit_price=None, active=None):
        """Write scraped values into a sourcing row; None arguments leave the cell untouched."""
        if unit_cost is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost
        if unit_price is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price
        if active is not None:
            # The sheet stores booleans as the strings "TRUE"/"FALSE".
            self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"

    def save_workbook(self):
        """Persist the workbook back to its original file path."""
        print(f"\n{'='*60}")
        print(f"Saving workbook...")
        self.workbook.save(self.WORKBOOK_NAME)
class Cost_Fetcher:
    """Top-level orchestrator for one or more scrape cycles.

    Groups sourcing rows by retailer domain, scrapes each URL with per-domain
    rate limiting (one concurrent browser per domain), persists results to the
    workbook after every item, then recalculates the workbook with LibreOffice
    and emails any profitability threshold crossings.
    """

    # Dictionary keys used for result and domain-detail records
    ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
    ACTIVE_FLAG: str = 'Active'
    COST_FLAG: str = 'Cost'
    DATA_FLAG: str = 'Data'
    ERROR_FLAG: str = 'Error'
    INDEX_DOMAIN_FLAG: str = 'Index Domain'
    INDEX_ROW_FLAG: str = 'Index Row'
    # Retailer names — must match the 'Source Name' values in the Sourcing sheet
    NAME_DOMAIN_CARD_MARKET: str = 'Card Market'
    NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'
    NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'
    NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'
    NAME_DOMAIN_NEW_REALITIES_GAMING: str = 'New Realities Gaming'
    NAME_FLAG: str = 'Name'
    PRICE_FLAG: str = 'Price'
    SUCCESS_FLAG: str = 'Success'
    URL_FLAG: str = 'Url'
    domain_names: list[str]
    eur_to_gbp_rate: float
    product_scrapers: list[Product_Scraper]
    product_sources: pd.DataFrame
    workbook_container: TCG_Sole_Trader_Workbook_Container

    def __init__(self, email_notifier=None):
        """Set up scrapers and state. email_notifier may be None, in which case
        boot emails and profitability alerts are skipped in fetch_all()."""
        self.email_notifier = email_notifier
        self.profitability_monitor = Profitability_Monitor()
        # Serialises workbook clear/write/save operations across the
        # concurrently-running per-domain tasks.
        self.workbook_save_lock = asyncio.Lock()
        # Enabled retailers; Card Market and New Realities Gaming are
        # currently switched off (commented out).
        self.domain_names = [
            # self.NAME_DOMAIN_CARD_MARKET
            self.NAME_DOMAIN_CHAOS_CARDS
            , self.NAME_DOMAIN_GAMES_LORE
            , self.NAME_DOMAIN_MAGIC_MADHOUSE
            # , self.NAME_DOMAIN_NEW_REALITIES_GAMING
        ]
        self.domain_details = {
            self.NAME_DOMAIN_CHAOS_CARDS: {
                self.NAME_FLAG: self.NAME_DOMAIN_CHAOS_CARDS
                , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CHAOS_CARDS)
                , self.ACCESSED_LAST_ON_FLAG: 0
            }
            , self.NAME_DOMAIN_GAMES_LORE: {
                self.NAME_FLAG: self.NAME_DOMAIN_GAMES_LORE
                , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_GAMES_LORE)
                , self.ACCESSED_LAST_ON_FLAG: 0
            }
            , self.NAME_DOMAIN_MAGIC_MADHOUSE: {
                self.NAME_FLAG: self.NAME_DOMAIN_MAGIC_MADHOUSE
                , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_MAGIC_MADHOUSE)
                , self.ACCESSED_LAST_ON_FLAG: 0
            }
        }
        # The two string literals below are disabled domain_details entries
        # kept as reference for re-enabling Card Market / New Realities Gaming.
        """
        self.NAME_DOMAIN_CARD_MARKET: {
            self.NAME_FLAG: self.NAME_DOMAIN_CARD_MARKET
            , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CARD_MARKET)
            , self.ACCESSED_LAST_ON_FLAG: 0
        }
        """
        """
        , self.NAME_DOMAIN_NEW_REALITIES_GAMING: {
            self.NAME_FLAG: self.NAME_DOMAIN_NEW_REALITIES_GAMING
            , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_NEW_REALITIES_GAMING)
            , self.ACCESSED_LAST_ON_FLAG: 0
        }
        """
        # One scraper instance per enabled domain, parallel to domain_names.
        product_scrapers = []
        for index_domain in range(len(self.domain_names)):
            domain = self.domain_names[index_domain]
            product_scraper = Product_Scraper(domain)
            product_scrapers.append(product_scraper)
        self.product_scrapers = product_scrapers
        # Loaded fresh at the start of every fetch_all() cycle.
        self.workbook_container = None
        # Fallback FX rate; refreshed from the network in get_eur_to_gbp_rate().
        self.eur_to_gbp_rate = 0.85

    def get_index_domain_from_name(self, domain_name):
        """Return the position of domain_name in domain_names; raise ValueError if absent."""
        for index_domain in range(len(self.domain_names)):
            if (self.domain_names[index_domain] == domain_name):
                return index_domain
        raise ValueError(f'Domain does not exist: {domain_name}')

    def get_eur_to_gbp_rate(self):
        """Refresh self.eur_to_gbp_rate from exchangerate-api.com; keep 0.85 on failure."""
        try:
            response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
            data = response.json()
            self.eur_to_gbp_rate = data['rates']['GBP']
        except Exception as e:
            print(f"Error fetching exchange rate: {e}")
            print("Using fallback rate: 0.85")
            self.eur_to_gbp_rate = 0.85

    async def fetch_all(self):
        """Run one complete cycle: boot email, reload workbook + FX rate,
        snapshot profitability, scrape all URLs (one browser per domain,
        concurrently), then recalculate formulas and email any alerts."""
        try:
            if self.email_notifier:
                sent = self.email_notifier.send_email(
                    subject=f"TCG Profitability Scanner Boot - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
                    , body_html="<html><body><h2>Booted</h2></body></html>"
                )
                if sent:
                    print("Sent boot test email")
                else:
                    print("Error sending boot test email")
            # Reload workbook and exchange rate fresh each cycle
            self.get_eur_to_gbp_rate()
            self.workbook_container = TCG_Sole_Trader_Workbook_Container()
            self.product_sources = self.workbook_container.get_sourcing_entries()
            workbook_path = os.path.abspath(TCG_Sole_Trader_Workbook_Container.WORKBOOK_NAME)
            # Snapshot profitability before any scraping
            print("Reading current profitability states...")
            old_profit_states = self.profitability_monitor.read_states(workbook_path)
            # Group product sources by domain
            domain_groups = {domain: [] for domain in self.domain_names}
            for _, product_source in self.product_sources.iterrows():
                source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
                # Rows whose source name is not an enabled domain are silently skipped.
                if source_name in domain_groups:
                    domain_groups[source_name].append(product_source)
            # Create one browser per domain and process all URLs; saves workbook after each item
            processed_count = 0
            updated_count = 0
            async with async_playwright() as p:
                domain_tasks = []
                for domain_name in self.domain_names:
                    if domain_groups[domain_name]:
                        # NOTE(review): headless=False opens visible browser
                        # windows — presumably deliberate; confirm before
                        # switching to headless.
                        browser = await p.chromium.launch(headless=False)
                        task = self.process_domain_urls(browser, domain_name, domain_groups[domain_name])
                        domain_tasks.append(task)
                # Domains run concurrently; each domain's URLs run sequentially.
                all_domain_results = await asyncio.gather(*domain_tasks)
                for domain_results in all_domain_results:
                    for result in domain_results:
                        processed_count += 1
                        if result[self.ACTIVE_FLAG]:
                            updated_count += 1
            print(f"\nComplete! Processed: {processed_count} entries, Updated: {updated_count} costs")
            # Recalculate spreadsheet formulas and check for profitability changes
            if self.email_notifier:
                recalculated = self.recalculate_workbook(workbook_path)
                if recalculated:
                    new_profit_states = self.profitability_monitor.read_states(workbook_path)
                    alerts = self.profitability_monitor.find_changes(old_profit_states, new_profit_states)
                    if alerts:
                        html = self.profitability_monitor.format_email_html(alerts)
                        self.email_notifier.send_email(
                            subject=f"TCG Profitability Alert - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
                            , body_html=html
                        )
                        print(f"Sent {len(alerts)} profitability alert(s).")
                    else:
                        print("No profitability changes detected.")
        except Exception as e:
            import traceback
            print(f"Error in fetch_all: {e}")
            traceback.print_exc()

    async def process_domain_urls(self, browser, domain_name, product_sources):
        """Process all URLs for a single domain sequentially with rate limiting.
        Saves the workbook immediately after each item is cleared (before scrape)
        and again after each result is written (after scrape). The browser is
        always closed, even on error."""
        results = []
        last_access_time = 0
        try:
            for product_source in product_sources:
                # Rate limiting: wait between requests to the same domain
                time_since_last = time.time() - last_access_time
                if time_since_last < 45:
                    # At least 45s between hits, plus up to 5s of jitter.
                    wait_time = 45 - time_since_last + random.uniform(0, 5)
                    print(f" [{domain_name}] Waiting {wait_time:.1f}s before next request...")
                    await asyncio.sleep(wait_time)
                index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
                # Clear stale data and persist before scraping
                async with self.workbook_save_lock:
                    self.workbook_container.clear_row_sourcing_sheet(index_row)
                    self.workbook_container.save_workbook()
                result = await self.fetch_single_with_browser(browser, domain_name, product_source)
                # Write fresh data and persist immediately
                async with self.workbook_save_lock:
                    # Only active results are written; inactive rows keep the
                    # cleared state from above.
                    if result[self.ACTIVE_FLAG]:
                        self.workbook_container.update_row_sourcing_sheet(
                            index_row=result[self.INDEX_ROW_FLAG]
                            , unit_cost=result[self.COST_FLAG]
                            , unit_price=result[self.PRICE_FLAG]
                            , active=result[self.ACTIVE_FLAG]
                        )
                    self.workbook_container.save_workbook()
                results.append(result)
                last_access_time = time.time()
        finally:
            await browser.close()
        return results

    async def fetch_single_with_browser(self, browser, domain_name, product_source):
        """Fetch a single URL using the provided browser and return a result
        dict (index row, cost, price, active); cost/price/active are None on
        failure or for unrecognised domains."""
        index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
        source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK]
        Cost_Fetcher.log_processing_new_row(
            index_row=index_row
            , source_link=source_link
        )
        index_domain = self.get_index_domain_from_name(domain_name)
        cost = None
        price = None
        active = None
        try:
            did_attempt = False
            # The string literal below is the disabled Card Market branch,
            # kept as reference (note the trailing 'el' of its former 'elif').
            """
            if domain_name == self.NAME_DOMAIN_CARD_MARKET:
            if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
            price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(
            browser = browser
            , url = source_link
            , eur_to_gbp_rate = self.eur_to_gbp_rate
            )
            price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs)
            cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(
            browser = browser
            , url = source_link
            , eur_to_gbp_rate = self.eur_to_gbp_rate
            )
            el"""
            if domain_name == self.NAME_DOMAIN_CHAOS_CARDS:
                did_attempt = True
                cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(
                    browser=browser
                    , url=source_link
                )
            elif domain_name == self.NAME_DOMAIN_GAMES_LORE:
                did_attempt = True
                cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(
                    browser=browser
                    , url=source_link
                )
            elif domain_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
                did_attempt = True
                cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(
                    browser=browser
                    , url=source_link
                )
            # The string literal below is a disabled, unverified branch.
            """ unverified
            elif domain_name == self.NAME_DOMAIN_NEW_REALITIES_GAMING:
            cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_newrealitiesgaming(
            browser = browser
            , url = source_link
            )
            """
            # An attempt that produced neither cost/price nor a definite
            # active status is logged as a miss.
            if (
                did_attempt
                and (
                    (
                        cost is None
                        and price is None
                    )
                    or active is None
                )
            ):
                print(f" Error: Could not find cost on page")
        except Exception as e:
            print(f" Error processing {source_link}: {e}")
        return self.make_result_data_json(
            index_row=index_row
            , cost=cost
            , price=price
            , active=active
        )

    @classmethod
    def make_result_data_json(cls, index_row, cost=None, price=None, active=None):
        """Package one scrape outcome as a flat result dict keyed by the *_FLAG constants."""
        return {
            cls.INDEX_ROW_FLAG: index_row
            , cls.COST_FLAG: cost
            , cls.PRICE_FLAG: price
            , cls.ACTIVE_FLAG: active
        }

    def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs):
        """Pick a representative sale price from Cardmarket offer pairs: the
        first offer with quantity >= 8, else fall back to the max-quantity
        offer (or any priced offer when all quantities are <= 2).

        NOTE(review): scrape_prices_and_quantities_playwright_cardmarket
        already multiplies each pair's price by eur_to_gbp_rate, and this
        method multiplies by the rate again — an apparent double conversion.
        The Cardmarket path is currently disabled (see the string-literal
        block in fetch_single_with_browser); confirm which conversion to keep
        before re-enabling.
        """
        if not price_quantity_pairs:
            return None
        max_quantity = 0
        price = None
        # First pass: look for quantity >= 8
        for price_quantity_pair in price_quantity_pairs:
            eur_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            print(f" Found price: €{eur_price}")
            print(f" Found quantity: {quantity}")
            max_quantity = max(max_quantity, quantity)
            if quantity >= 8 and eur_price:
                price = eur_price * self.eur_to_gbp_rate
                print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
                return price
        # Second pass: use max quantity if no quantity >= 8
        print("Offer with quantity >= 8 not found")
        for price_quantity_pair in price_quantity_pairs:
            eur_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            if (max_quantity <= 2 or quantity == max_quantity) and eur_price:
                price = eur_price * self.eur_to_gbp_rate
                print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
                return price
        return price

    def recalculate_workbook(self, workbook_path):
        """Run LibreOffice headless to recalculate all formula cells after saving new data.
        Returns True if recalculation succeeded, False otherwise. Skips (and
        returns False) when a LibreOffice lock file shows the workbook is open."""
        workbook_dir = os.path.dirname(workbook_path)
        workbook_name = os.path.basename(workbook_path)
        # LibreOffice creates '.~lock.<name>#' while a document is open.
        lock_file = os.path.join(workbook_dir, f'.~lock.{workbook_name}#')
        if os.path.exists(lock_file):
            print(f"Warning: '{workbook_name}' is open in LibreOffice — skipping recalculation to avoid conflict.")
            return False
        print("Recalculating workbook formulas with LibreOffice headless...")
        try:
            # Converting xlsx -> xlsx in place forces a formula recalculation.
            result = subprocess.run(
                ['libreoffice', '--headless', '--norestore', '--convert-to', 'xlsx', '--outdir', workbook_dir, workbook_path]
                , capture_output=True, text=True, timeout=120
            )
            if result.returncode == 0:
                print("Recalculation complete.")
                return True
            print(f"Recalculation failed (exit {result.returncode}): {result.stderr.strip()}")
        except subprocess.TimeoutExpired:
            print("LibreOffice recalculation timed out.")
        except Exception as e:
            print(f"Error during recalculation: {e}")
        return False

    async def run_continuous(self):
        """Run fetch_all in an infinite loop, sleeping CYCLE_INTERVAL_HOURS between cycles."""
        while True:
            print(f"\n{'='*60}")
            print(f"Cycle started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            try:
                await self.fetch_all()
            except Exception as e:
                import traceback
                print(f"Unhandled cycle error: {e}")
                traceback.print_exc()
            next_run = datetime.now() + timedelta(hours=CYCLE_INTERVAL_HOURS)
            print(f"Next cycle: {next_run.strftime('%Y-%m-%d %H:%M:%S')} (in {CYCLE_INTERVAL_HOURS}h)")
            await asyncio.sleep(CYCLE_INTERVAL_HOURS * 3600)

    @staticmethod
    def log_processing_new_row(index_row, source_link):
        """Print a banner marking the start of work on one sourcing row."""
        print(f"\n{'='*60}")
        print(f"Processing row {index_row}: {source_link}")
        print(f"{'='*60}")
async def main():
    """Entry point: configure optional email notifications from environment
    variables, then run the cost fetcher forever.

    Notifications require EMAIL_SENDER, EMAIL_PASSWORD, and EMAIL_RECIPIENT;
    SMTP_HOST and SMTP_PORT are optional overrides.
    """
    sender_email = os.environ.get('EMAIL_SENDER')
    sender_password = os.environ.get('EMAIL_PASSWORD')
    recipient_email = os.environ.get('EMAIL_RECIPIENT')
    smtp_host = os.environ.get('SMTP_HOST', 'smtp.gmail.com')
    smtp_port = int(os.environ.get('SMTP_PORT', '587'))
    email_notifier = None
    if sender_email and sender_password and recipient_email:
        email_notifier = Email_Notifier(
            sender_email=sender_email
            , sender_password=sender_password
            , recipient_email=recipient_email
            , smtp_host=smtp_host
            , smtp_port=smtp_port
        )
        # Fixed: the original log line ran sender and recipient together with
        # no separator between the two addresses.
        print(f"Email notifications enabled: {sender_email} -> {recipient_email}")
    else:
        print(
            "Email notifications disabled.\n"
            "Set EMAIL_SENDER, EMAIL_PASSWORD, and EMAIL_RECIPIENT environment variables to enable.\n"
            "For Gmail, use an App Password (https://myaccount.google.com/apppasswords)."
        )
    cost_fetcher = Cost_Fetcher(email_notifier=email_notifier)
    await cost_fetcher.run_continuous()


if __name__ == "__main__":
    asyncio.run(main())