"""Scrape trading-card product prices into a workbook and email profitability alerts.

Each cycle loads the sourcing workbook, scrapes every configured retailer page
with Playwright (one browser per domain, rate limited), writes costs/stock
status back to the workbook, recalculates formulas with LibreOffice and emails
an alert whenever a profit figure crosses zero.
"""

import asyncio
import html
import os
import random
import re
import smtplib
import subprocess
import time
import traceback
from collections import defaultdict  # noqa: F401  (kept: may be used by code outside this view)
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
import requests
from aioconsole import ainput  # noqa: F401  (used by the commented-out debugging pauses)
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from playwright.async_api import async_playwright
from playwright.sync_api import Browser, Page, sync_playwright  # noqa: F401

CYCLE_INTERVAL_HOURS = 4


class Email_Notifier:
    """Send HTML notification emails through an SMTP server."""

    # Upper bound on SMTP socket operations so a dead server cannot hang the
    # (blocking) send forever.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self, sender_email, sender_password, recipient_email,
                 smtp_host='smtp.gmail.com', smtp_port=587):
        self.sender_email = sender_email
        self.sender_password = sender_password
        self.recipient_email = recipient_email
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port

    def send_email(self, subject, body_html):
        """Send one HTML email; return True on success, False on any failure.

        Port 465 uses implicit TLS (SMTP_SSL); every other port connects in
        plain text and upgrades with STARTTLS before logging in.
        """
        msg = MIMEMultipart('alternative')
        msg['From'] = self.sender_email
        msg['To'] = self.recipient_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body_html, 'html'))

        use_implicit_tls = self.smtp_port == 465
        smtp_class = smtplib.SMTP_SSL if use_implicit_tls else smtplib.SMTP
        try:
            with smtp_class(self.smtp_host, self.smtp_port,
                            timeout=self.SMTP_TIMEOUT_SECONDS) as server:
                if not use_implicit_tls:
                    server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
        except Exception as e:
            # Best effort: notification failures must not stop the scraper.
            print(f"Error sending email: {e}")
            return False
        print(f"Email sent: {subject}")
        return True


class Profitability_Monitor:
    """Read profit figures from the workbook and detect crossings of zero."""

    MTG_SET_WORKSHEET_NAME = 'MTG Set'
    MTG_SET_DATA_START_ROW = 3  # Row 1 is a merged group header, row 2 has column names

    # MTG Set sheet column indices (1-based)
    COL_MTG_SET_NAME = 2
    COL_PLAY_PROFIT = 45
    COL_PLAY_MARGIN = 46
    COL_COLLECTOR_PROFIT = 53
    COL_COLLECTOR_MARGIN = 54
    COL_PLAY_SINGLES_PROFIT = 59
    COL_PLAY_SINGLES_MARGIN = 60
    COL_COLLECTOR_SINGLES_PROFIT = 73
    COL_COLLECTOR_SINGLES_MARGIN = 74

    # Product sheet column indices (1-based) for precons
    COL_PRODUCT_NAME = 2
    COL_PRODUCT_IS_PRECON = 7
    COL_PRODUCT_MIN_COST = 8
    COL_PRODUCT_PROFIT = 10

    PROFIT_CHECKS = [
        {'profit_col': COL_PLAY_PROFIT, 'margin_col': COL_PLAY_MARGIN,
         'action_buy': 'Buy Play Booster', 'action_no_buy': 'DO NOT Buy Play Booster'},
        {'profit_col': COL_COLLECTOR_PROFIT, 'margin_col': COL_COLLECTOR_MARGIN,
         'action_buy': 'Buy Collector Booster', 'action_no_buy': 'DO NOT Buy Collector Booster'},
        {'profit_col': COL_PLAY_SINGLES_PROFIT, 'margin_col': COL_PLAY_SINGLES_MARGIN,
         'action_buy': 'Split Play Booster', 'action_no_buy': 'DO NOT Split Play Booster'},
        {'profit_col': COL_COLLECTOR_SINGLES_PROFIT, 'margin_col': COL_COLLECTOR_SINGLES_MARGIN,
         'action_buy': 'Split Collector Booster', 'action_no_buy': 'DO NOT Split Collector Booster'},
    ]

    @staticmethod
    def _is_number(value):
        """Return True when *value* is an int or float (as stored by openpyxl)."""
        return isinstance(value, (int, float))

    @staticmethod
    def _cell_is_true(value):
        """Interpret a boolean-like cell.

        Text cells count as true only when they read "TRUE" (the same
        convention this file uses when writing the Active column), so a
        literal "FALSE" string is no longer treated as truthy. Non-text
        values keep plain truthiness.
        """
        if isinstance(value, str):
            return value.strip().upper() == 'TRUE'
        return bool(value)

    def read_states(self, workbook_path):
        """Load the workbook with data_only=True to read formula-calculated profit values.

        Returns:
            ``{'mtg_set': {set_name: {profit_col: entry}}, 'precon': {name: entry}}``
            where each entry has the keys ``profit``, ``margin`` and
            ``is_profitable``.

        Raises:
            KeyError: if the 'MTG Set' or 'Product' worksheet is missing.
        """
        wb = load_workbook(workbook_path, data_only=True)
        try:
            mtg_set_states = {}
            mtg_sheet = wb[self.MTG_SET_WORKSHEET_NAME]
            for row in range(self.MTG_SET_DATA_START_ROW, mtg_sheet.max_row + 1):
                set_name = mtg_sheet.cell(row, self.COL_MTG_SET_NAME).value
                if not set_name:
                    continue
                set_data = {}
                for check in self.PROFIT_CHECKS:
                    profit = mtg_sheet.cell(row, check['profit_col']).value
                    margin = mtg_sheet.cell(row, check['margin_col']).value
                    set_data[check['profit_col']] = {
                        'profit': profit,
                        'margin': margin,
                        'is_profitable': self._is_number(profit) and profit > 0,
                    }
                mtg_set_states[set_name] = set_data

            precon_states = {}
            prod_sheet = wb['Product']
            for row in range(2, prod_sheet.max_row + 1):
                is_precon = prod_sheet.cell(row, self.COL_PRODUCT_IS_PRECON).value
                if not self._cell_is_true(is_precon):
                    continue
                name = prod_sheet.cell(row, self.COL_PRODUCT_NAME).value
                if not name:
                    continue
                profit = prod_sheet.cell(row, self.COL_PRODUCT_PROFIT).value
                min_cost = prod_sheet.cell(row, self.COL_PRODUCT_MIN_COST).value
                # Margin is derived here because the Product sheet columns read
                # by this method only provide profit and minimum cost.
                can_compute_margin = (
                    self._is_number(profit)
                    and self._is_number(min_cost)
                    and min_cost != 0
                )
                precon_states[name] = {
                    'profit': profit,
                    'margin': (profit / min_cost) if can_compute_margin else None,
                    'is_profitable': self._is_number(profit) and profit > 0,
                }
        finally:
            # Release the file handle even when a sheet is missing.
            wb.close()
        return {'mtg_set': mtg_set_states, 'precon': precon_states}

    @classmethod
    def _make_alert(cls, name, old_entry, new_entry, action_buy, action_no_buy):
        """Return an alert dict if profitability flipped between entries, else None.

        Items with no previous state (``old_entry`` lacks ``is_profitable``)
        never alert, so newly added rows do not trigger an email.
        """
        old_profitable = old_entry.get('is_profitable', None)
        new_profitable = new_entry.get('is_profitable', False)
        if old_profitable is None or old_profitable == new_profitable:
            return None
        margin = new_entry.get('margin')
        margin_str = f"{margin * 100:.1f}%" if cls._is_number(margin) else "N/A"
        return {
            'name': name,
            'action': action_buy if new_profitable else action_no_buy,
            'margin': margin_str,
        }

    def find_changes(self, old_states, new_states):
        """Compare old and new profit states; return list of alert dicts for any crossings of the 0 threshold."""
        alerts = []
        old_sets = old_states.get('mtg_set', {})
        for check in self.PROFIT_CHECKS:
            col = check['profit_col']
            for set_name, new_set_data in new_states['mtg_set'].items():
                alert = self._make_alert(
                    set_name,
                    old_sets.get(set_name, {}).get(col, {}),
                    new_set_data.get(col, {}),
                    check['action_buy'],
                    check['action_no_buy'],
                )
                if alert is not None:
                    alerts.append(alert)

        old_precons = old_states.get('precon', {})
        for product_name, new_entry in new_states['precon'].items():
            alert = self._make_alert(
                product_name,
                old_precons.get(product_name, {}),
                new_entry,
                'Buy Precon',
                'DO NOT Buy Precon',
            )
            if alert is not None:
                alerts.append(alert)
        return alerts

    def format_email_html(self, alerts):
        """Render the alerts as an HTML table for the notification email.

        NOTE(review): the original markup was lost from this file (only the
        visible text survived); the tags below are a reconstruction around
        that text - confirm against the intended layout.
        """
        rows = ''.join(
            "<tr>"
            f"<td>{html.escape(str(a['name']))}</td>"
            f"<td>{html.escape(str(a['action']))}</td>"
            f"<td>{html.escape(str(a['margin']))}</td>"
            "</tr>"
            for a in alerts
        )
        return (
            "<html><body>"
            "<h2>TCG Profitability Alert</h2>"
            "<table border='1' cellpadding='6' cellspacing='0'>"
            "<thead><tr><th>MTG Set</th><th>Action</th><th>Margin</th></tr></thead>"
            "<tbody>"
            f"{rows}"
            "</tbody></table>"
            f"<p>Generated {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>"
            "</body></html>"
        )


class Product_Scraper:
    """Playwright-based scraper for one retailer domain."""

    domain: str
    page: Page

    def __init__(self, domain):
        print("Setting up browser automation")
        self.domain = domain

    @staticmethod
    def parse_cost(cost_text):
        """Parse a price such as '£12.99' into a float.

        All non-digit characters are dropped and the remaining digits are
        read as pennies, so the text must carry exactly two decimal places.
        Stripping commas too means thousands separators ('£1,299.00') parse
        instead of returning None.
        """
        if not cost_text:
            return None
        digits = re.sub(r'\D', '', cost_text)
        try:
            return float(digits) / 100
        except ValueError:
            return None

    @classmethod
    def parse_cost_from_pennies(cls, cost_text):
        """Parse via parse_cost and divide by 100 again.

        NOTE(review): parse_cost already divides by 100, so this divides
        twice; not called anywhere in this file - confirm intended scale.
        """
        if not cost_text:
            return None
        cost_clean = cls.parse_cost(cost_text=cost_text)
        if cost_clean is not None:
            cost_clean = cost_clean / 100
        return cost_clean

    @classmethod
    def parse_cost_chaoscards(cls, cost_text):
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_cardmarket(cls, cost_text):
        """Convert '141,30 €' format to float in EUR"""
        if not cost_text:
            return None
        cost_clean = re.sub(r'[^\d,]', '', cost_text).replace(',', '.')
        try:
            return float(cost_clean)
        except ValueError:
            return None

    @classmethod
    def parse_cost_gameslore(cls, cost_text):
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_magicmadhouse(cls, cost_text):
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_newrealitiesgaming(cls, cost_text):
        return cls.parse_cost(cost_text=cost_text)

    async def scrape_cost_and_active_playwright(self, browser: Browser, url,
                                                page_load_element_selector, cost_selector,
                                                active_selector, invalid_active_statuses):
        """Open *url* in a new page and return ``(cost_text, active)``.

        ``cost_text`` is the raw text of the first ``cost_selector`` match.
        ``active`` is True when no ``active_selector`` element exists or its
        text is not listed in ``invalid_active_statuses``; with no
        ``active_selector`` it is True whenever a cost was found. Returns
        ``(None, None)`` if loading the page or reading the cost fails.
        """
        print(" Loading page...")
        self.page = await browser.new_page()
        cost = None
        active = None
        try:
            await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000)
            # Fixed settle delay instead of waiting on a selector.
            # NOTE(review): page_load_element_selector is accepted but the
            # original never waited on it; behavior kept as is.
            await asyncio.sleep(random.uniform(20, 25))
            page_title = await self.page.title()
            print(f" Page title: {page_title}")

            cost_element = self.page.locator(selector=cost_selector)
            text = await cost_element.text_content()
            print(f" Text: '{text}'")
            cost = text

            if active_selector is None:
                active = (cost is not None)
            else:
                try:
                    elements = await self.page.query_selector_all(selector=active_selector)
                    print(f'# active elements: {len(elements)}')
                    if not elements:
                        active = True
                    else:
                        text = await elements[0].text_content()
                        text = text.strip()
                        print(f" Text: '{text}'")
                        active = (invalid_active_statuses is None
                                  or text not in invalid_active_statuses)
                except Exception as e:
                    # Leaves active as None so the caller sees "unknown".
                    print(f" Selector failed: {e}")

            if cost is None or active is None:
                print(" ✗ No cost found")
                # await ainput("Press Enter to continue to next URL...")
            print(f"Cost: {cost}, Active: {active}")
        except Exception as e:
            print(f" Error: {e}")
            # await ainput("Press Enter to continue to next URL...")
            return None, None
        finally:
            await self.page.close()
        return cost, active

    async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
        """Scrape the first Card Market offer; return (cost in GBP incl. shipping, active)."""
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = (
            '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1)'
            ' > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        )
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=None,
            invalid_active_statuses=[],
        )
        cost = Product_Scraper.parse_cost_cardmarket(cost_text=cost_text)
        if cost is not None:
            # Flat inbound shipping estimate, tiered on the EUR item price.
            if cost < 10:
                item_shipping_cost_in = 2
            elif cost < 100:
                item_shipping_cost_in = 8
            else:
                item_shipping_cost_in = 20
            cost = cost * eur_to_gbp_rate + item_shipping_cost_in
        active = (cost is not None)
        return cost, active

    async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
        """Scrape cost and stock status from a Chaos Cards product page."""
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = (
            '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock'
            ' > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)'
        )
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["Out of stock", "Coming soon"],
        )
        cost = Product_Scraper.parse_cost_chaoscards(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
        """Scrape cost and stock status from a Games Lore product page."""
        cost_selector = (
            'div.columns > div.column.main > div.product-info-main > div.product-info-price'
            ' > div.price-box > span.special-price > span.price-container > span.price-wrapper'
            ' > span.price'
        )
        active_selector = '.stock > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["OUT OF STOCK"],
        )
        cost = Product_Scraper.parse_cost_gameslore(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
        """Scrape cost and stock status from a Magic Madhouse product page.

        NOTE(review): with an empty invalid-status list, any matched
        '.alertBox--error' element still yields active=True - confirm that
        is intended.
        """
        page_load_element_selector = '.productView-title'
        cost_selector = (
            'div.body > div.container > div > div.productView > section.productView-details'
            ' > div.productView-options > form > div.productView-options-selections'
            ' > div.productView-product > div.productView-info > div.price-rating'
            ' > div.productView-price > div.price-section.actual-price > span.price'
        )
        active_selector = '.alertBox.alertBox--error'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=[],
        )
        cost = Product_Scraper.parse_cost_magicmadhouse(cost_text=cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url):
        """Scrape cost and stock status from a New Realities Gaming product page."""
        button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button'
        page_load_element_selector = button_selector
        cost_selector = f'{button_selector} span:nth-child(2)'
        active_selector = f'{button_selector} span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=['Out of stock'],
        )
        # Use this site's own parser (the original called the Magic Madhouse one).
        cost = Product_Scraper.parse_cost_newrealitiesgaming(cost_text=cost_text)
        return cost, active

    async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url,
                                                                 eur_to_gbp_rate):
        """Return ``[{'price': <GBP>, 'quantity': <float>}]`` for matched Card Market offers.

        Returns an empty list when reading the offers fails; errors while
        loading the page itself propagate after the page is closed.
        """
        offer_container_selector = (
            '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1)'
            ' > div.col-offer'
        )
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'

        print(" Loading page...")
        self.page = await browser.new_page()
        try:
            await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(random.uniform(20, 25))
            page_title = await self.page.title()
            print(f" Page title: {page_title}")

            price_quantity_pairs = []
            try:
                offer_containers = await self.page.query_selector_all(offer_container_selector)
                print(f" Offer container selector: Found {len(offer_containers)} elements")
                for offer_container in offer_containers:
                    price_element = await offer_container.query_selector(price_selector)
                    price_text = await price_element.text_content()
                    if '€' in price_text and re.search(r'\d', price_text):
                        print(f" ✓ Found price: {price_text}")
                    else:
                        price_text = None

                    quantity_element = await offer_container.query_selector(quantity_selector)
                    quantity_text = await quantity_element.text_content()

                    if price_text is None or quantity_text is None:
                        continue
                    price_quantity_pairs.append({
                        'price': Product_Scraper.parse_cost_cardmarket(cost_text=price_text)
                        * eur_to_gbp_rate,
                        'quantity': Product_Scraper.parse_cost_cardmarket(cost_text=quantity_text),
                    })
            except Exception as e:
                print(f" Price selector failed: {e}")
                # await ainput("Press enter to continue to next URL...")
                return []
        finally:
            await self.page.close()
        return price_quantity_pairs


class TCG_Sole_Trader_Workbook_Container:
    """Wrap the sourcing workbook: locate tables/columns and read/write sourcing rows."""

    NAME_COLUMN_ACTIVE: str = 'Active'
    NAME_COLUMN_INDEX_ROW: str = 'Index Row'
    NAME_COLUMN_LINK: str = 'Link'
    NAME_COLUMN_PRODUCT_ID: str = 'Product Id'
    NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster'
    NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box'
    NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon'
    NAME_COLUMN_SOURCE_NAME: str = 'Source Name'
    NAME_COLUMN_UNIT_COST: str = 'Cost'
    NAME_COLUMN_UNIT_PRICE: str = 'Price'
    PRODUCT_WORKSHEET_NAME = 'Product'
    SOURCING_WORKSHEET_NAME = 'Sourcing'
    WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'

    # Sheet header text -> name of the attribute that stores its column index.
    _SOURCING_HEADER_ATTRS = {
        'Source Name': 'index_column_name_sourcing',
        'Source Link': 'index_column_link_sourcing',
        'Source Unit Cost': 'index_column_unit_cost_sourcing',
        'Sale Price': 'index_column_unit_price_sourcing',
        'Active': 'index_column_active_sourcing',
        'Product Id': 'index_column_product_id_sourcing',
    }
    _PRODUCT_HEADER_ATTRS = {
        'Is Booster Box': 'index_column_is_booster_box_product',
        'Is Booster': 'index_column_is_booster_product',
        'Is Precon': 'index_column_is_precon_product',
        'Product Id': 'index_column_product_id_product',
    }

    index_column_active_sourcing: int
    index_column_is_booster_product: int
    index_column_is_booster_box_product: int
    index_column_is_precon_product: int
    index_column_link_sourcing: int
    index_column_name_sourcing: int
    index_column_product_id_product: int
    index_column_product_id_sourcing: int
    index_column_unit_cost_sourcing: int
    index_column_unit_price_sourcing: int
    index_row_header_product: int
    index_row_header_sourcing: int
    product_sheet: Worksheet
    sourcing_sheet: Worksheet
    workbook: Workbook

    def __init__(self):
        """Load the workbook and locate header rows and required columns.

        On a missing sheet, table or column an error is printed and
        construction stops early (no exception is raised), leaving the
        unresolved indices as None.
        """
        # Initialise every index so a missing header is reported by the
        # "required columns" check instead of raising AttributeError.
        self.index_row_header_sourcing = None
        self.index_row_header_product = None
        for attribute_name in (*self._SOURCING_HEADER_ATTRS.values(),
                               *self._PRODUCT_HEADER_ATTRS.values()):
            setattr(self, attribute_name, None)

        print("Loading workbook...")
        self.workbook = load_workbook(self.WORKBOOK_NAME)

        if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found")
            return
        if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found")
            return

        self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
        self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]

        # The original also re-scanned the first 20 rows with the same
        # 'Source Name' test; this full scan already covers that, so the
        # fallback could never succeed and was dropped.
        for row in range(1, self.sourcing_sheet.max_row + 1):
            if (self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing'
                    or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value)):
                self.index_row_header_sourcing = row
                break
        if self.index_row_header_sourcing is None:
            print("Error: Could not find table 'tbl_Sourcing'")
            return

        for row in range(1, self.product_sheet.max_row + 1):
            if (self.product_sheet.cell(row, 1).value == 'tbl_Product'
                    or 'Product Id' in str(self.product_sheet.cell(row, 1).value)):
                self.index_row_header_product = row
                break
        if self.index_row_header_product is None:
            print("Error: Could not find table 'tbl_Product'")
            return

        self._locate_columns(self.sourcing_sheet, self.index_row_header_sourcing,
                             self._SOURCING_HEADER_ATTRS)
        self._locate_columns(self.product_sheet, self.index_row_header_product,
                             self._PRODUCT_HEADER_ATTRS)

        print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
        print(f"Sourcing header row: {self.index_row_header_sourcing}")
        print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
        print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
        print(f"Product max row: {self.product_sheet.max_row}")
        print(f"Product header row: {self.index_row_header_product}")
        print(f"Product header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
        print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")

        required_indices = [
            getattr(self, attribute_name)
            for attribute_name in (*self._SOURCING_HEADER_ATTRS.values(),
                                   *self._PRODUCT_HEADER_ATTRS.values())
        ]
        if not all(required_indices):
            print("Error: Could not find required columns")
            return

    def _locate_columns(self, sheet, header_row, header_to_attribute):
        """Store the 1-based column index of each known header found in *header_row*."""
        for index_column in range(1, sheet.max_column + 1):
            header = str(sheet.cell(header_row, index_column).value).strip()
            attribute_name = header_to_attribute.get(header)
            if attribute_name is not None:
                setattr(self, attribute_name, index_column)

    @classmethod
    def create_product_source_df(cls):
        """Return an empty DataFrame with the product-source columns."""
        return pd.DataFrame(columns=[
            cls.NAME_COLUMN_INDEX_ROW,
            cls.NAME_COLUMN_PRODUCT_ID,
            cls.NAME_COLUMN_SOURCE_NAME,
            cls.NAME_COLUMN_LINK,
            cls.NAME_COLUMN_PRODUCT_IS_BOOSTER,
            cls.NAME_COLUMN_UNIT_COST,
            cls.NAME_COLUMN_UNIT_PRICE,
            cls.NAME_COLUMN_ACTIVE,
        ])

    def get_sourcing_entries(self):
        """Return one row per sourcing entry that has a name and a link, sorted by source name.

        Errors while reading the sheets are printed and whatever rows were
        collected up to that point are returned.
        """
        product_sources = self.create_product_source_df()
        try:
            # Build the product lookup once instead of rescanning the Product
            # sheet for every sourcing row; the first row per id wins, as in
            # the original linear search.
            is_booster_by_product_id = {}
            for product_row in range(self.index_row_header_product + 1,
                                     self.product_sheet.max_row + 1):
                product_id = self.product_sheet.cell(
                    product_row, self.index_column_product_id_product).value
                if product_id in is_booster_by_product_id:
                    continue
                flag_text = str(self.product_sheet.cell(
                    product_row, self.index_column_is_booster_product).value).upper()
                is_booster_by_product_id[product_id] = (flag_text == "TRUE")

            for index_row in range(self.index_row_header_sourcing + 1,
                                   self.sourcing_sheet.max_row + 1):
                source_name = self.sourcing_sheet.cell(
                    index_row, self.index_column_name_sourcing).value
                source_link = self.sourcing_sheet.cell(
                    index_row, self.index_column_link_sourcing).value
                source_product_id = self.sourcing_sheet.cell(
                    index_row, self.index_column_product_id_sourcing).value
                if not source_name or not source_link:
                    continue
                print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
                product_is_booster = is_booster_by_product_id.get(source_product_id, False)
                print(f"product is booster: {product_is_booster}")
                product_sources.loc[len(product_sources)] = [
                    index_row,
                    source_product_id,
                    source_name,
                    source_link,
                    product_is_booster,
                    None,  # cost
                    None,  # price
                    None,  # active
                ]
        except Exception as e:
            print(f"Error: {e}")
        # sort_values returns a new frame; the original discarded it. A stable
        # sort keeps sheet order within each source.
        product_sources = product_sources.sort_values(
            self.NAME_COLUMN_SOURCE_NAME, kind='stable')
        return product_sources

    def clear_row_sourcing_sheet(self, index_row):
        """Blank the unit cost and mark the row inactive before a fresh scrape."""
        self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
        self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"

    def update_row_sourcing_sheet(self, index_row, unit_cost=None, unit_price=None, active=None):
        """Write whichever of cost, price and active are not None into the row."""
        if unit_cost is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost
        if unit_price is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price
        if active is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = (
                "TRUE" if active else "FALSE")

    def save_workbook(self):
        """Save the workbook back to WORKBOOK_NAME."""
        print(f"\n{'='*60}")
        print("Saving workbook...")
        self.workbook.save(self.WORKBOOK_NAME)


class Cost_Fetcher:
    """Run scrape cycles: fetch costs per domain, update the workbook, send alerts."""

    ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
    ACTIVE_FLAG: str = 'Active'
    COST_FLAG: str = 'Cost'
    DATA_FLAG: str = 'Data'
    ERROR_FLAG: str = 'Error'
    INDEX_DOMAIN_FLAG: str = 'Index Domain'
    INDEX_ROW_FLAG: str = 'Index Row'
    NAME_DOMAIN_CARD_MARKET: str = 'Card Market'
    NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'
    NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'
    NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'
    NAME_DOMAIN_NEW_REALITIES_GAMING: str = 'New Realities Gaming'
    NAME_FLAG: str = 'Name'
    PRICE_FLAG: str = 'Price'
    SUCCESS_FLAG: str = 'Success'
    URL_FLAG: str = 'Url'

    domain_names: list[str]
    eur_to_gbp_rate: float
    product_scrapers: list[Product_Scraper]
    product_sources: pd.DataFrame
    workbook_container: TCG_Sole_Trader_Workbook_Container

    def __init__(self, email_notifier=None):
        self.email_notifier = email_notifier
        self.profitability_monitor = Profitability_Monitor()
        # Serialises workbook mutation and saving across the per-domain tasks.
        self.workbook_save_lock = asyncio.Lock()
        # Card Market and New Realities Gaming are currently disabled; add
        # NAME_DOMAIN_CARD_MARKET / NAME_DOMAIN_NEW_REALITIES_GAMING here (and
        # to the dispatch table in fetch_single_with_browser) to re-enable.
        self.domain_names = [
            self.NAME_DOMAIN_CHAOS_CARDS,
            self.NAME_DOMAIN_GAMES_LORE,
            self.NAME_DOMAIN_MAGIC_MADHOUSE,
        ]
        self.domain_details = {
            domain_name: {
                self.NAME_FLAG: domain_name,
                self.INDEX_DOMAIN_FLAG: index_domain,
                self.ACCESSED_LAST_ON_FLAG: 0,
            }
            for index_domain, domain_name in enumerate(self.domain_names)
        }
        self.product_scrapers = [Product_Scraper(domain) for domain in self.domain_names]
        self.workbook_container = None
        self.eur_to_gbp_rate = 0.85

    def get_index_domain_from_name(self, domain_name):
        """Return the position of *domain_name* in domain_names.

        Raises:
            ValueError: if the domain is not configured.
        """
        for index_domain, candidate in enumerate(self.domain_names):
            if candidate == domain_name:
                return index_domain
        raise ValueError(f'Domain does not exist: {domain_name}')

    def get_eur_to_gbp_rate(self):
        """Refresh eur_to_gbp_rate from the exchange-rate API, falling back to 0.85."""
        try:
            response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
            # Treat HTTP errors like any other failure instead of parsing an error body.
            response.raise_for_status()
            data = response.json()
            self.eur_to_gbp_rate = data['rates']['GBP']
        except Exception as e:
            print(f"Error fetching exchange rate: {e}")
            print("Using fallback rate: 0.85")
            self.eur_to_gbp_rate = 0.85

    async def fetch_all(self):
        """Run one full cycle: scrape every source, save results, email profitability changes.

        All exceptions are caught and printed so the continuous loop survives.
        """
        try:
            if self.email_notifier:
                sent = self.email_notifier.send_email(
                    subject=f"TCG Profitability Scanner Boot - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
                    # NOTE(review): original markup was lost; only the text
                    # "Booted" survived, the tags are a reconstruction.
                    body_html="<html><body><p>Booted</p></body></html>",
                )
                if sent:
                    print("Sent boot test email")
                else:
                    print("Error sending boot test email")

            # Reload workbook and exchange rate fresh each cycle
            self.get_eur_to_gbp_rate()
            self.workbook_container = TCG_Sole_Trader_Workbook_Container()
            self.product_sources = self.workbook_container.get_sourcing_entries()
            workbook_path = os.path.abspath(TCG_Sole_Trader_Workbook_Container.WORKBOOK_NAME)

            # Snapshot profitability before any scraping
            print("Reading current profitability states...")
            old_profit_states = self.profitability_monitor.read_states(workbook_path)

            # Group product sources by domain
            domain_groups = {domain: [] for domain in self.domain_names}
            for _, product_source in self.product_sources.iterrows():
                source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
                if source_name in domain_groups:
                    domain_groups[source_name].append(product_source)

            # Create one browser per domain and process all URLs; saves workbook after each item
            processed_count = 0
            updated_count = 0
            async with async_playwright() as p:
                active_domain_names = []
                domain_tasks = []
                for domain_name in self.domain_names:
                    if domain_groups[domain_name]:
                        browser = await p.chromium.launch(headless=False)
                        active_domain_names.append(domain_name)
                        domain_tasks.append(
                            self.process_domain_urls(browser, domain_name, domain_groups[domain_name]))
                # return_exceptions=True: one failing domain must not abandon
                # the others mid-run while the Playwright context shuts down.
                all_domain_results = await asyncio.gather(*domain_tasks, return_exceptions=True)
                for domain_name, domain_results in zip(active_domain_names, all_domain_results):
                    if isinstance(domain_results, BaseException):
                        print(f"Error processing domain '{domain_name}': {domain_results}")
                        continue
                    for result in domain_results:
                        processed_count += 1
                        if result[self.ACTIVE_FLAG]:
                            updated_count += 1
            print(f"\nComplete! Processed: {processed_count} entries, Updated: {updated_count} costs")

            # Recalculate spreadsheet formulas and check for profitability changes
            if self.email_notifier:
                recalculated = self.recalculate_workbook(workbook_path)
                if recalculated:
                    new_profit_states = self.profitability_monitor.read_states(workbook_path)
                    alerts = self.profitability_monitor.find_changes(old_profit_states, new_profit_states)
                    if alerts:
                        alert_html = self.profitability_monitor.format_email_html(alerts)
                        self.email_notifier.send_email(
                            subject=f"TCG Profitability Alert - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
                            body_html=alert_html,
                        )
                        print(f"Sent {len(alerts)} profitability alert(s).")
                    else:
                        print("No profitability changes detected.")
        except Exception as e:
            print(f"Error in fetch_all: {e}")
            traceback.print_exc()

    async def process_domain_urls(self, browser, domain_name, product_sources):
        """Process all URLs for a single domain sequentially with rate limiting.

        Saves the workbook immediately after each item is cleared (before
        scrape) and again after each result is written (after scrape). The
        browser is always closed on exit. Returns the list of result dicts.
        """
        results = []
        last_access_time = 0
        try:
            for product_source in product_sources:
                # Rate limiting: wait between requests to the same domain
                time_since_last = time.time() - last_access_time
                if time_since_last < 45:
                    wait_time = 45 - time_since_last + random.uniform(0, 5)
                    print(f" [{domain_name}] Waiting {wait_time:.1f}s before next request...")
                    await asyncio.sleep(wait_time)

                index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]

                # Clear stale data and persist before scraping
                async with self.workbook_save_lock:
                    self.workbook_container.clear_row_sourcing_sheet(index_row)
                    self.workbook_container.save_workbook()

                result = await self.fetch_single_with_browser(browser, domain_name, product_source)

                # Write fresh data and persist immediately
                async with self.workbook_save_lock:
                    if result[self.ACTIVE_FLAG]:
                        self.workbook_container.update_row_sourcing_sheet(
                            index_row=result[self.INDEX_ROW_FLAG],
                            unit_cost=result[self.COST_FLAG],
                            unit_price=result[self.PRICE_FLAG],
                            active=result[self.ACTIVE_FLAG],
                        )
                    self.workbook_container.save_workbook()

                results.append(result)
                last_access_time = time.time()
        finally:
            await browser.close()
        return results

    async def fetch_single_with_browser(self, browser, domain_name, product_source):
        """Fetch a single URL using the provided browser

        Returns the result dict from make_result_data_json; cost, price and
        active stay None when scraping fails or the domain has no scraper.

        Raises:
            ValueError: if *domain_name* is not a configured domain.
        """
        index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
        source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK]
        Cost_Fetcher.log_processing_new_row(index_row=index_row, source_link=source_link)

        index_domain = self.get_index_domain_from_name(domain_name)
        scraper = self.product_scrapers[index_domain]
        # Card Market (cost plus sale price from offer quantities) and New
        # Realities Gaming (unverified) are disabled and deliberately absent.
        scrape_by_domain = {
            self.NAME_DOMAIN_CHAOS_CARDS: scraper.scrape_cost_and_active_playwright_chaoscards,
            self.NAME_DOMAIN_GAMES_LORE: scraper.scrape_cost_and_active_playwright_gameslore,
            self.NAME_DOMAIN_MAGIC_MADHOUSE: scraper.scrape_cost_and_active_playwright_magicmadhouse,
        }

        cost = None
        price = None
        active = None
        try:
            scrape = scrape_by_domain.get(domain_name)
            if scrape is not None:
                cost, active = await scrape(browser=browser, url=source_link)
                if (cost is None and price is None) or active is None:
                    print(" Error: Could not find cost on page")
        except Exception as e:
            print(f" Error processing {source_link}: {e}")

        return self.make_result_data_json(
            index_row=index_row,
            cost=cost,
            price=price,
            active=active,
        )

    @classmethod
    def make_result_data_json(cls, index_row, cost=None, price=None, active=None):
        """Bundle one scrape outcome into a dict keyed by the *_FLAG constants."""
        return {
            cls.INDEX_ROW_FLAG: index_row,
            cls.COST_FLAG: cost,
            cls.PRICE_FLAG: price,
            cls.ACTIVE_FLAG: active,
        }

    def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs):
        """Pick a sale price from offers: first with quantity >= 8, else the largest-quantity offer.

        NOTE(review): scrape_prices_and_quantities_playwright_cardmarket
        already multiplies prices by the EUR->GBP rate, so the conversion
        here looks like a second conversion. Only the disabled Card Market
        path reaches this - confirm before re-enabling.
        """
        if not price_quantity_pairs:
            return None

        max_quantity = 0
        price = None

        # First pass: look for quantity >= 8
        for price_quantity_pair in price_quantity_pairs:
            eur_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            print(f" Found price: €{eur_price}")
            print(f" Found quantity: {quantity}")
            # An unparseable quantity arrives as None; count it as 0 instead
            # of crashing in max()/>=.
            quantity = quantity or 0
            max_quantity = max(max_quantity, quantity)
            if quantity >= 8 and eur_price:
                price = eur_price * self.eur_to_gbp_rate
                print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
                return price

        # Second pass: use max quantity if no quantity >= 8
        print("Offer with quantity >= 8 not found")
        for price_quantity_pair in price_quantity_pairs:
            eur_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity'] or 0
            if (max_quantity <= 2 or quantity == max_quantity) and eur_price:
                price = eur_price * self.eur_to_gbp_rate
                print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
                return price
        return price

    def recalculate_workbook(self, workbook_path):
        """Run LibreOffice headless to recalculate all formula cells after saving new data.

        Returns True if recalculation succeeded, False otherwise.
        """
        workbook_dir = os.path.dirname(workbook_path)
        workbook_name = os.path.basename(workbook_path)
        lock_file = os.path.join(workbook_dir, f'.~lock.{workbook_name}#')
        if os.path.exists(lock_file):
            print(f"Warning: '{workbook_name}' is open in LibreOffice — skipping recalculation to avoid conflict.")
            return False

        print("Recalculating workbook formulas with LibreOffice headless...")
        try:
            result = subprocess.run(
                ['libreoffice', '--headless', '--norestore', '--convert-to', 'xlsx',
                 '--outdir', workbook_dir, workbook_path],
                capture_output=True, text=True, timeout=120,
            )
            if result.returncode == 0:
                print("Recalculation complete.")
                return True
            print(f"Recalculation failed (exit {result.returncode}): {result.stderr.strip()}")
        except subprocess.TimeoutExpired:
            print("LibreOffice recalculation timed out.")
        except Exception as e:
            print(f"Error during recalculation: {e}")
        return False

    async def run_continuous(self):
        """Run fetch_all in an infinite loop, sleeping CYCLE_INTERVAL_HOURS between cycles."""
        while True:
            print(f"\n{'='*60}")
            print(f"Cycle started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            try:
                await self.fetch_all()
            except Exception as e:
                print(f"Unhandled cycle error: {e}")
                traceback.print_exc()
            next_run = datetime.now() + timedelta(hours=CYCLE_INTERVAL_HOURS)
            print(f"Next cycle: {next_run.strftime('%Y-%m-%d %H:%M:%S')} (in {CYCLE_INTERVAL_HOURS}h)")
            await asyncio.sleep(CYCLE_INTERVAL_HOURS * 3600)

    @staticmethod
    def log_processing_new_row(index_row, source_link):
        """Print a banner announcing the row about to be scraped."""
        print(f"\n{'='*60}")
        print(f"Processing row {index_row}: {source_link}")
        print(f"{'='*60}")


async def main():
    """Configure optional email notifications from the environment and run forever."""
    sender_email = os.environ.get('EMAIL_SENDER')
    sender_password = os.environ.get('EMAIL_PASSWORD')
    recipient_email = os.environ.get('EMAIL_RECIPIENT')
    smtp_host = os.environ.get('SMTP_HOST', 'smtp.gmail.com')
    smtp_port = int(os.environ.get('SMTP_PORT', '587'))

    email_notifier = None
    if sender_email and sender_password and recipient_email:
        email_notifier = Email_Notifier(
            sender_email=sender_email,
            sender_password=sender_password,
            recipient_email=recipient_email,
            smtp_host=smtp_host,
            smtp_port=smtp_port,
        )
        print(f"Email notifications enabled: {sender_email} → {recipient_email}")
    else:
        print(
            "Email notifications disabled.\n"
            "Set EMAIL_SENDER, EMAIL_PASSWORD, and EMAIL_RECIPIENT environment variables to enable.\n"
            "For Gmail, use an App Password (https://myaccount.google.com/apppasswords)."
        )

    cost_fetcher = Cost_Fetcher(email_notifier=email_notifier)
    await cost_fetcher.run_continuous()


if __name__ == "__main__":
    asyncio.run(main())