diff --git a/mtg_commander_life_tracker.html b/mtg_commander_life_tracker.html deleted file mode 100644 index 18c813d..0000000 --- a/mtg_commander_life_tracker.html +++ /dev/null @@ -1,764 +0,0 @@ - - - - - - Commander Tracker - - - - -
-
-

Commander

-
Life & Damage Tracker
-
- -
-
- - - - -
-
-
Player Names
-
-
-
- -
-
- - -
- -
Game Saved
- - - - \ No newline at end of file diff --git a/product_scraping/TCG Sole Trader Copy (copy).xlsx b/product_scraping/TCG Sole Trader Copy (copy).xlsx new file mode 100644 index 0000000..0c9dbd6 Binary files /dev/null and b/product_scraping/TCG Sole Trader Copy (copy).xlsx differ diff --git a/product_scraping/TCG Sole Trader Copy - dead.xlsx b/product_scraping/TCG Sole Trader Copy - dead.xlsx deleted file mode 100644 index 1401153..0000000 Binary files a/product_scraping/TCG Sole Trader Copy - dead.xlsx and /dev/null differ diff --git a/product_scraping/TCG Sole Trader Copy.xlsx b/product_scraping/TCG Sole Trader Copy.xlsx index 9604431..81ea1db 100644 Binary files a/product_scraping/TCG Sole Trader Copy.xlsx and b/product_scraping/TCG Sole Trader Copy.xlsx differ diff --git a/product_scraping/__init__.py b/product_scraping/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/product_scraping/fix_excel_for_libreoffice.py b/product_scraping/fix_excel_for_libreoffice.py new file mode 100644 index 0000000..eb12ef5 --- /dev/null +++ b/product_scraping/fix_excel_for_libreoffice.py @@ -0,0 +1,69 @@ +""" +fix_excel_for_libreoffice.py +---------------------------- +Run this once on an xlsx downloaded from Excel Online before opening it in +LibreOffice Calc. It removes the _xlfn. / _xlws. prefixes that Excel Online +injects into formulas — those prefixes make LibreOffice show #NAME? errors. + +Usage: + python3 fix_excel_for_libreoffice.py "TCG Sole Trader Copy.xlsx" + +The original file is left untouched (a .bak copy is kept). +""" + +import sys +import os +import re +import zipfile +import shutil + +FORMULA_PREFIXES = re.compile(r'_xlfn\.|_xlws\.') + + +def fix_xlsx(path: str) -> None: + if not os.path.exists(path): + print(f"File not found: {path}") + sys.exit(1) + + bak_path = path + '.bak' + tmp_path = path + '.tmp' + + shutil.copy2(path, bak_path) + print(f"Backup saved → {bak_path}") + + changes = 0 + + with zipfile.ZipFile(path, 'r') as zin, \ + zipfile.ZipFile(tmp_path, 'w', zipfile.ZIP_DEFLATED) as zout: + + for item in zin.infolist(): + data = zin.read(item.filename) + + # Only patch sheet XML — leave everything else byte-for-byte identical + if item.filename.startswith('xl/worksheets/sheet') and item.filename.endswith('.xml'): + text = data.decode('utf-8') + new_text, n = FORMULA_PREFIXES.subn('', text) + if n: + print(f" {item.filename}: removed {n} prefix(es)") + changes += n + data = new_text.encode('utf-8') + + zout.writestr(item, data) + + os.replace(tmp_path, path) + + if changes: + print(f"\nDone — {changes} prefix(es) removed. Open {path} in LibreOffice Calc.") + else: + print("\nNo _xlfn./_xlws. prefixes found — file was already clean.") + os.remove(bak_path) + + +if __name__ == '__main__': + if len(sys.argv) < 2: + # Default to the workbook the scraper uses + target = 'TCG Sole Trader Copy.xlsx' + else: + target = sys.argv[1] + + fix_xlsx(target) diff --git a/product_scraping/mtg_booster_expected_value_fetcher_botbox.py b/product_scraping/mtg_booster_expected_value_fetcher_botbox.py new file mode 100644 index 0000000..63ecb42 --- /dev/null +++ b/product_scraping/mtg_booster_expected_value_fetcher_botbox.py @@ -0,0 +1,213 @@ +import pandas as pd +from openpyxl import load_workbook +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.chrome.options import Options +import re +import time +import random + +def setup_driver(headless=True): + chrome_options = Options() + if headless: + chrome_options.add_argument('--headless') + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36') + chrome_options.add_argument('--window-size=1920,1080') + + try: + driver = webdriver.Chrome(options=chrome_options) + return driver + except Exception as e: + print(f"Error setting up Chrome driver: {e}") + print("Make sure Chrome and chromedriver are installed") + return None + +def parse_price_value(text): + if not text: + return None + # Remove currency symbols and extract numbers + cleaned = re.sub(r'[^\d,.\-]', '', text) + # Replace comma with full-stop for decimal + cleaned = cleaned.replace(',', '.') + try: + return float(cleaned) + except ValueError: + return None + +def scrape_mtg_stocks_values(driver, url): + cost = None + expected_value = None + try: + print(f" Loading page...") + driver.get(url) + + time.sleep(random.uniform(5, 10)) + + cost_element_id = 'cost-value' + expected_value_element_id = 'ev-value' + + cost_element = driver.find_element(By.ID, cost_element_id) + cost_text = cost_element.text.strip() + + expected_value_element = driver.find_element(By.ID, expected_value_element_id) + expected_value_text = expected_value_element.text.strip() + + print(f" Cost: '{cost_text}'") + print(f" Expected Value: '{expected_value_text}'") + + cost = parse_price_value(cost_text) + expected_value = parse_price_value(expected_value_text) + + return { + 'expected_value': expected_value, + 'market_value': cost + } + + except Exception as ex: + print(f" ✗ Failed:\nEV: {expected_value}\nCost: {cost}") + return { + 'expected_value': expected_value, + 'market_value': cost + } + +def main(): + workbook_name = 'TCG Sole Trader Copy.xlsx' + sheet_name = 'MTG Set' + + print("Loading workbook...") + wb = load_workbook(workbook_name) + + if sheet_name not in wb.sheetnames: + print(f"Error: Sheet '{sheet_name}' not found") + return + + sheet = wb[sheet_name] + table_found = False + start_row = None + header_row = None + + + for row in range(2, max(50, sheet.max_row + 1)): + cell_value = str(sheet.cell(row, 1).value) + + for col in range(1, max(10, sheet.max_column + 1)): + cell_value = str(sheet.cell(row, col).value) + if 'EV Play BotBox Link' in cell_value: + header_row = row + start_row = row + 1 + table_found = True + break + if table_found: + break + + if not table_found: + print("Error: Could not find 'EV Play BotBox Link' column") + return + + print(f"Found table header at row {header_row}") + print(f"Starting from row {start_row}") + + play_ev_link_col = None + collector_ev_link_col = None + play_expected_value_col = None + play_market_value_col = None + collector_expected_value_col = None + collector_market_value_col = None + + for col in range(1, sheet.max_column + 1): + header = str(sheet.cell(header_row, col).value).strip() + if 'EV Play BotBox Link' in header: + play_ev_link_col = col + elif 'EV Collector BotBox Link' in header: + collector_ev_link_col = col + elif 'Play Booster Expected Market Value BotBox' in header: + play_expected_value_col = col + elif 'Play Booster Sealed Market Value BotBox' in header: + play_market_value_col = col + elif 'Collector Booster Expected Market Value BotBox' in header: + collector_expected_value_col = col + elif 'Collector Booster Sealed Market Value BotBox' in header: + collector_market_value_col = col + + print(f"Columns - Play EV Link: {play_ev_link_col}, Collector EV Link: {collector_ev_link_col}, Play Expected Value: {play_expected_value_col}, Play Market Value: {play_market_value_col}, Collector Expected Value: {collector_expected_value_col}, Collector Market Value: {collector_market_value_col}") + + if not all([play_ev_link_col, collector_ev_link_col, play_expected_value_col, play_market_value_col, collector_expected_value_col, collector_market_value_col]): + print("Error: Could not find all required columns") + print(f" EV Play BotBox Link: {'Found' if play_ev_link_col else 'NOT FOUND'}") + print(f" EV Collector BotBox Link: {'Found' if collector_ev_link_col else 'NOT FOUND'}") + print(f" Play Booster Expected Market Value BotBox: {'Found' if play_expected_value_col else 'NOT FOUND'}") + print(f" Play Booster Sealed Market Value BotBox: {'Found' if play_market_value_col else 'NOT FOUND'}") + print(f" Collector Booster Expected Market Value BotBox: {'Found' if collector_expected_value_col else 'NOT FOUND'}") + print(f" Collector Booster Sealed Market Value BotBox : {'Found' if collector_market_value_col else 'NOT FOUND'}") + return + + print("Setting up browser automation...") + driver = setup_driver(headless=False) + if not driver: + return + + try: + processed_count = 0 + play_updated_count = 0 + collector_updated_count = 0 + play_cleared_count = 0 + collector_cleared_count = 0 + + for row in range(start_row, sheet.max_row + 1): + play_ev_link = sheet.cell(row, play_ev_link_col).value + collector_ev_link = sheet.cell(row, collector_ev_link_col).value + + if play_ev_link is None or play_ev_link == '' or collector_ev_link is None or collector_ev_link == '': + continue + + processed_count += 1 + print(f"\n{'='*80}") + print(f"Processing row {row}:\n{play_ev_link}\n{collector_ev_link}") + print(f"{'='*80}") + + play_result = scrape_mtg_stocks_values(driver, play_ev_link) + play_cost = play_result['market_value'] + play_ev = play_result['expected_value'] + + sheet.cell(row, play_market_value_col).value = play_cost if play_cost is not None else '' + sheet.cell(row, play_expected_value_col).value = play_ev if play_ev is not None else '' + if play_cost is not None or play_ev is not None: + play_updated_count += 1 + print(f" ✓ Play updated - Expected: {play_ev}, Market: {play_cost}") + else: + play_cleared_count += 1 + print(f" ✗ Play cleared values - no matching booster type found") + + collector_result = scrape_mtg_stocks_values(driver, collector_ev_link) + collector_cost = collector_result['market_value'] + collector_ev = collector_result['expected_value'] + sheet.cell(row, collector_market_value_col).value = collector_cost if collector_cost is not None else '' + sheet.cell(row, collector_expected_value_col).value = collector_ev if collector_ev is not None else '' + if collector_cost is not None or collector_ev is not None: + collector_updated_count += 1 + print(f" ✓ Collector updated - Expected: {collector_ev}, Market: {collector_cost}") + else: + collector_cleared_count += 1 + print(f" ✗ Collector cleared values - no matching booster type found") + + print(f"\n{'='*80}") + print(f"Saving workbook...") + wb.save(workbook_name) + + print(f"\nComplete!") + print(f"Processed: {processed_count} entries") + print(f"Play updated: {play_updated_count} entries") + print(f"Collector updated: {collector_updated_count} entries") + print(f"Play fields cleared: {play_cleared_count} entries (no matching data)") + print(f"Collector fields cleared: {collector_cleared_count} entries (no matching data)") + + finally: + driver.quit() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/product_scraping/mtg_booster_expected_value_fetcher.py b/product_scraping/mtg_booster_expected_value_fetcher_mtg_stocks.py similarity index 90% rename from product_scraping/mtg_booster_expected_value_fetcher.py rename to product_scraping/mtg_booster_expected_value_fetcher_mtg_stocks.py index 1cd23be..5397823 100644 --- a/product_scraping/mtg_booster_expected_value_fetcher.py +++ b/product_scraping/mtg_booster_expected_value_fetcher_mtg_stocks.py @@ -171,13 +171,9 @@ def main(): sheet = wb[sheet_name] - # Find table boundaries and columns table_found = False start_row = None header_row = None - - # Search for table header - print("max sheet column: ", str(sheet.max_column)) for row in range(2, max(50, sheet.max_row + 1)): cell_value = str(sheet.cell(row, 1).value) @@ -210,13 +206,13 @@ def main(): header = str(sheet.cell(header_row, col).value).strip() if 'EV MTG Stocks Link' in header: ev_link_col = col - elif 'Play Booster Expected Market Value' in header: + elif 'Play Booster Expected Market Value MTG Stocks' in header: play_expected_value_col = col - elif 'Play Boost Sealed Market Value' in header: + elif 'Play Booster Sealed Market Value MTG Stocks' in header: play_market_value_col = col - elif 'Collector Booster Expected Market Value' in header: + elif 'Collector Booster Expected Market Value MTG Stocks' in header: collector_expected_value_col = col - elif 'Collector Boost Sealed Market Value' in header: + elif 'Collector Booster Sealed Market Value MTG Stocks' in header: collector_market_value_col = col print(f"Columns - EV Link: {ev_link_col}, Play Expected Value: {play_expected_value_col}, Play Market Value: {play_market_value_col}, Collector Expected Value: {collector_expected_value_col}, Collector Market Value: {collector_market_value_col}") @@ -225,19 +221,19 @@ def main(): print("Error: Could not find all required columns") print(f" EV MTG Stocks Link: {'Found' if ev_link_col else 'NOT FOUND'}") print(f" Play Booster Expected Market Value: {'Found' if play_expected_value_col else 'NOT FOUND'}") - print(f" Play Boost Sealed Market Value: {'Found' if play_market_value_col else 'NOT FOUND'}") + print(f" Play Booster Sealed Market Value: {'Found' if play_market_value_col else 'NOT FOUND'}") print(f" Collector Booster Expected Market Value: {'Found' if collector_expected_value_col else 'NOT FOUND'}") - print(f" Collector Boost Sealed Market Value: {'Found' if collector_market_value_col else 'NOT FOUND'}") + print(f" Collector Booster Sealed Market Value: {'Found' if collector_market_value_col else 'NOT FOUND'}") return - # Setup Selenium driver print("Setting up browser automation...") - driver = setup_driver(headless=False) # Set to False to see browser + driver = setup_driver(headless=False) if not driver: return + print(f'Sheet dimensions: {sheet.max_row} rows x {sheet.max_column} columns') + try: - # Process rows processed_count = 0 updated_count = 0 play_cleared_count = 0 @@ -245,24 +241,11 @@ def main(): for row in range(start_row, sheet.max_row + 1): ev_link = sheet.cell(row, ev_link_col).value - - # Check if row is empty - if not ev_link: - # Check if we've passed the end of the table - empty_count = 0 - for check_col in range(1, min(10, sheet.max_column + 1)): - if not sheet.cell(row, check_col).value: - empty_count += 1 - if empty_count >= 5: # If most columns are empty, assume end of table - break - continue - processed_count += 1 print(f"\n{'='*80}") print(f"Processing row {row}: {ev_link}") print(f"{'='*80}") - # Scrape values result = scrape_mtg_stocks_values(driver, ev_link) if result['found_play']: diff --git a/product_scraping/product_scraper manual backup.py b/product_scraping/product_scraper manual backup.py new file mode 100644 index 0000000..5eb56a7 --- /dev/null +++ b/product_scraping/product_scraper manual backup.py @@ -0,0 +1,708 @@ +import pandas as pd +from openpyxl import load_workbook, Workbook +from openpyxl.worksheet.worksheet import Worksheet +import requests +import re +import time +import random +from playwright.sync_api import sync_playwright, Browser, Page +from playwright.async_api import async_playwright +import asyncio +from aioconsole import ainput +from collections import defaultdict +from datetime import datetime, timedelta + +class Product_Scraper: + domain: str + page: Page + + def __init__(self, domain): + print("Setting up browser automation") + self.domain = domain + + @staticmethod + def parse_cost(cost_text): + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + try: + return float(cost_clean) / 100 + except ValueError: + return None + @classmethod + def parse_cost_from_pennies(cls, cost_text): + if not cost_text: + return None + cost_clean = cls.parse_cost(cost_text = cost_text) + if cost_clean is not None: + cost_clean = cost_clean / 100 + return cost_clean + + @classmethod + def parse_cost_chaoscards(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_cardmarket(cls, cost_text): + """Convert '141,30 €' format to float in EUR""" + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + cost_clean = cost_clean.replace(',', '.') + try: + return float(cost_clean) + except ValueError: + return None + @classmethod + def parse_cost_gameslore(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_magicmadhouse(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_newrealitiesgaming(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + + async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses): + print(f" Loading page...") + self.page = await browser.new_page() + await self.page.goto(url = url) + await asyncio.sleep(random.uniform(20, 25)) + cost = None + active = None + try: + element = self.page.locator(selector = page_load_element_selector) + page_title = await self.page.title() + print(f" Page title: {page_title}") + + element = self.page.locator(selector = cost_selector) + text = await element.text_content() + print(f" Text: '{text}'") + cost = text + + active = None + if active_selector is None: + active = (cost is not None) + else: + try: + elements = await self.page.query_selector_all(selector = active_selector) + print(f'# active elements: {len(elements)}') + if len(elements) == 0: + active = True + else: + text = await elements[0].text_content() + text = text.strip() + print(f" Text: '{text}'") + active = (invalid_active_statuses is None or text not in invalid_active_statuses) + except Exception as e: + print(f" Selector failed: {e}") + + if cost is None or active is None: + print(f" ✗ No cost found") + # await ainput("Press Enter to continue to next URL...") + print(f"Cost: {cost}, Active: {active}") + + except Exception as e: + print(f" Error: {e}") + # await ainput("Press Enter to continue to next URL...") + return None, None + finally: + await self.page.close() + return cost, active + + async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate): + page_load_element_selector = "body > main.container > div.page-title-container" + cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = None + , invalid_active_statuses = [] + ) + cost = Product_Scraper.parse_cost_cardmarket(cost_text = cost_text) + if cost is not None: + item_shipping_cost_in = 0 + if cost < 10: + item_shipping_cost_in = 2 + elif cost < 100: + item_shipping_cost_in = 8 + else: + item_shipping_cost_in = 20 + cost = cost * eur_to_gbp_rate + item_shipping_cost_in + active = (cost is not None) + return cost, active + + async def scrape_cost_and_active_playwright_chaoscards(self, browser, url): + cost_selector = '.price_inc > span:nth-child(2)' + active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = cost_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["Out of stock", "Coming soon"] + ) + cost = Product_Scraper.parse_cost_chaoscards(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_gameslore(self, browser, url): + cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' + active_selector = '.stock > span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = cost_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["OUT OF STOCK"] + ) + cost = Product_Scraper.parse_cost_gameslore(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url): + page_load_element_selector = '.productView-title' + cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price' + active_selector = '.alertBox.alertBox--error' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = [] + ) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url): + button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button' + page_load_element_selector = button_selector + cost_selector = f'{button_selector} span:nth-child(2)' + active_selector = f'{button_selector} span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ['Out of stock'] + ) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) + return cost, active + + async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate): + offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer' + price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)' + quantity_selector = 'div.amount-container > span:nth-child(1)' + + print(f" Loading page...") + self.page = await browser.new_page() + await self.page.goto(url = url) + await asyncio.sleep(random.uniform(20, 25)) + + try: + page_title = await self.page.title() + print(f" Page title: {page_title}") + + price_quantity_pairs = [] + try: + offer_containers = await self.page.query_selector_all(offer_container_selector) + print(f" Offer container selector: Found {len(offer_containers)} elements") + for offer_container in offer_containers: + price_element = await offer_container.query_selector(price_selector) + price_text = await price_element.text_content() + if '€' in price_text and re.search(r'\d', price_text): + print(f" ✓ Found price: {price_text}") + else: + price_text = None + + quantity_element = await offer_container.query_selector(quantity_selector) + quantity_text = await quantity_element.text_content() + + if price_text is None or quantity_text is None: + continue + price_quantity_pairs.append({ + 'price': Product_Scraper.parse_cost_cardmarket(cost_text = price_text) * eur_to_gbp_rate + , 'quantity': Product_Scraper.parse_cost_cardmarket(cost_text = quantity_text) + }) + except Exception as e: + print(f" Price selector failed: {e}") + # await ainput("Press enter to continue to next URL...") + return [] + finally: + await self.page.close() + return price_quantity_pairs + + +class TCG_Sole_Trader_Workbook_Container: + NAME_COLUMN_ACTIVE: str = 'Active' + NAME_COLUMN_INDEX_ROW: str = 'Index Row' + NAME_COLUMN_LINK: str = 'Link' + NAME_COLUMN_PRODUCT_ID: str = 'Product Id' + NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster' + NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box' + NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon' + NAME_COLUMN_SOURCE_NAME: str = 'Source Name' + NAME_COLUMN_UNIT_COST: str = 'Cost' + NAME_COLUMN_UNIT_PRICE: str = 'Price' + PRODUCT_WORKSHEET_NAME = 'Product' + SOURCING_WORKSHEET_NAME = 'Sourcing' + WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx' + + index_column_active_sourcing: int + index_column_is_booster_product: int + index_column_is_booster_box_product: int + index_column_is_precon_product: int + index_column_link_sourcing: int + index_column_name_sourcing: int + index_column_product_id_product: int + index_column_product_id_sourcing: int + index_column_unit_cost_sourcing: int + index_column_unit_price_sourcing: int + index_row_header_product: int + index_row_header_sourcing: int + product_sheet: Worksheet + sourcing_sheet: Worksheet + workbook: Workbook + + def __init__(self): + print("Loading workbook...") + self.workbook = load_workbook(self.WORKBOOK_NAME) + + if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames: + print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found") + return + if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames: + print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found") + return + + self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME] + self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME] + + sourcing_table_found = False + for row in range(1, self.sourcing_sheet.max_row + 1): + if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value): + self.index_row_header_sourcing = row + sourcing_table_found = True + break + + if not sourcing_table_found or not self.index_row_header_sourcing: + for row in range(1, min(20, self.sourcing_sheet.max_row + 1)): + if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value): + self.index_row_header_sourcing = row + sourcing_table_found = True + break + + if not sourcing_table_found: + print("Error: Could not find table 'tbl_Sourcing'") + return + + product_table_found = False + for row in range(1, self.product_sheet.max_row + 1): + if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value): + self.index_row_header_product = row + product_table_found = True + break + + if not product_table_found: + print("Error: Could not find table 'tbl_Product'") + return + + for index_column in range(1, self.sourcing_sheet.max_column + 1): + header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip() + if 'Source Name' == header: + self.index_column_name_sourcing = index_column + elif 'Source Link' == header: + self.index_column_link_sourcing = index_column + elif 'Source Unit Cost' == header: + self.index_column_unit_cost_sourcing = index_column + elif 'Sale Price' == header: + self.index_column_unit_price_sourcing = index_column + elif 'Active' == header: + self.index_column_active_sourcing = index_column + elif 'Product Id' == header: + self.index_column_product_id_sourcing = index_column + + for index_column in range(1, self.product_sheet.max_column + 1): + header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip() + if 'Is Booster Box' == header: + self.index_column_is_booster_box_product = index_column + elif 'Is Booster' == header: + self.index_column_is_booster_product = index_column + elif 'Is Precon' == header: + self.index_column_is_precon_product = index_column + elif 'Product Id' == header: + self.index_column_product_id_product = index_column + + print(f"Sourcing max row: {self.sourcing_sheet.max_row}") + print(f"Sourcing header row: {self.index_row_header_sourcing}") + print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}") + print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}") + print(f"Product max row: {self.product_sheet.max_row}") + print(f"Product header row: {self.index_row_header_product}") + print(f"Sourcing header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}") + print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}") + + if not all([ + self.index_column_name_sourcing + , self.index_column_link_sourcing + , self.index_column_unit_cost_sourcing + , self.index_column_unit_price_sourcing + , self.index_column_product_id_sourcing + , self.index_column_active_sourcing + , self.index_column_product_id_product + , self.index_column_is_booster_product + , self.index_column_is_booster_box_product + , self.index_column_is_precon_product + ]): + print("Error: Could not find required columns") + return + + @classmethod + def create_product_source_df(cls): + return pd.DataFrame(columns = [ + cls.NAME_COLUMN_INDEX_ROW + , cls.NAME_COLUMN_PRODUCT_ID + , cls.NAME_COLUMN_SOURCE_NAME + , cls.NAME_COLUMN_LINK + , cls.NAME_COLUMN_PRODUCT_IS_BOOSTER + , cls.NAME_COLUMN_UNIT_COST + , cls.NAME_COLUMN_UNIT_PRICE + , cls.NAME_COLUMN_ACTIVE + ]) + + def get_sourcing_entries(self): + product_sources = self.create_product_source_df() + try: + for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1): + source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value + source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value + source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value + if not source_name or not source_link: + continue + print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}") + product_is_booster = False + for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1): + product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value + if product_id == source_product_id: + product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper() + product_is_booster = (product_is_booster_text == "TRUE") + break + print(f"product is booster: {product_is_booster}") + + product_sources.loc[len(product_sources)] = [ + index_row + , source_product_id + , source_name + , source_link + , product_is_booster + , None # cost + , None # price + , None # active + ] + except Exception as e: + print(f"Error: {e}") + product_sources.sort_values(self.NAME_COLUMN_SOURCE_NAME) + return product_sources + + def clear_row_sourcing_sheet(self, index_row): + self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None + self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE" + + def update_row_sourcing_sheet(self, index_row, unit_cost = None, unit_price = None, active = None): + if unit_cost is not None: + self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost + if unit_price is not None: + self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price + if active is not None: + self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE" + + def save_workbook(self): + print(f"\n{'='*60}") + print(f"Saving workbook...") + self.workbook.save(self.WORKBOOK_NAME) + + +class Cost_Fetcher: + ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On' + ACTIVE_FLAG: str = 'Active' + COST_FLAG: str = 'Cost' + DATA_FLAG: str = 'Data' + ERROR_FLAG: str = 'Error' + INDEX_DOMAIN_FLAG: str = 'Index Domain' + INDEX_ROW_FLAG: str = 'Index Row' + NAME_DOMAIN_CARD_MARKET: str = 'Card Market' + NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards' + NAME_DOMAIN_GAMES_LORE: str = 'Games Lore' + NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse' + NAME_DOMAIN_NEW_REALITIES_GAMING: str = 'New Realities Gaming' + NAME_FLAG: str = 'Name' + PRICE_FLAG: str = 'Price' + SUCCESS_FLAG: str = 'Success' + URL_FLAG: str = 'Url' + + domain_names: list[str] + eur_to_gbp_rate: float + product_scrapers: list[Product_Scraper] + product_sources: pd.DataFrame + workbook_container: TCG_Sole_Trader_Workbook_Container + + def __init__(self): + self.domain_names = [ + self.NAME_DOMAIN_CARD_MARKET + , self.NAME_DOMAIN_CHAOS_CARDS + , self.NAME_DOMAIN_GAMES_LORE + , self.NAME_DOMAIN_MAGIC_MADHOUSE + , self.NAME_DOMAIN_NEW_REALITIES_GAMING + ] + self.domain_details = { + self.NAME_DOMAIN_CARD_MARKET: { + self.NAME_FLAG: self.NAME_DOMAIN_CARD_MARKET + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CARD_MARKET) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + , self.NAME_DOMAIN_CHAOS_CARDS: { + self.NAME_FLAG: self.NAME_DOMAIN_CHAOS_CARDS + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CHAOS_CARDS) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + , self.NAME_DOMAIN_GAMES_LORE: { + self.NAME_FLAG: self.NAME_DOMAIN_GAMES_LORE + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_GAMES_LORE) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + , self.NAME_DOMAIN_MAGIC_MADHOUSE: { + self.NAME_FLAG: self.NAME_DOMAIN_MAGIC_MADHOUSE + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_MAGIC_MADHOUSE) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + , self.NAME_DOMAIN_NEW_REALITIES_GAMING: { + self.NAME_FLAG: self.NAME_DOMAIN_NEW_REALITIES_GAMING + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_NEW_REALITIES_GAMING) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + } + product_scrapers = [] + for index_domain in range(len(self.domain_names)): + domain = self.domain_names[index_domain] + product_scraper = Product_Scraper(domain) + product_scrapers.append(product_scraper) + self.product_scrapers = product_scrapers + self.workbook_container = TCG_Sole_Trader_Workbook_Container() + self.get_eur_to_gbp_rate() + + def get_index_domain_from_name(self, domain_name): + for index_domain in range(len(self.domain_names)): + if (self.domain_names[index_domain] == domain_name): + return index_domain + raise ValueError(f'Domain does not exist: {domain_name}') + + def get_eur_to_gbp_rate(self): + try: + response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10) + data = response.json() + self.eur_to_gbp_rate = data['rates']['GBP'] + except Exception as e: + print(f"Error fetching exchange rate: {e}") + print("Using fallback rate: 0.85") + self.eur_to_gbp_rate = 0.85 + + async def fetch_all(self): + try: + processed_count = 0 + updated_count = 0 + self.product_sources = self.workbook_container.get_sourcing_entries() + + # Group product sources by domain + domain_groups = {domain: [] for domain in self.domain_names} + for index_product_source, product_source in self.product_sources.iterrows(): + source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME] + if source_name in domain_groups: + domain_groups[source_name].append(product_source) + + # Create one browser per domain and process all URLs for that domain + async with async_playwright() as p: + # Create one browser per domain that has URLs + domain_tasks = [] + for domain_name in self.domain_names: + if domain_groups[domain_name]: # Only if there are URLs for this domain + browser = await p.chromium.launch(headless=False) + task = self.process_domain_urls(browser, domain_name, domain_groups[domain_name]) + domain_tasks.append(task) + + # Process all domains in parallel + all_sourced_products = await asyncio.gather(*domain_tasks) + + # Flatten results from all domains + sourced_products = [] + for domain_results in all_sourced_products: + sourced_products.extend(domain_results) + + # Process results + for sourced_product in sourced_products: + index_row = sourced_product[self.INDEX_ROW_FLAG] + unit_cost = sourced_product[self.COST_FLAG] + unit_price = sourced_product[self.PRICE_FLAG] + active = sourced_product[self.ACTIVE_FLAG] + processed_count += 1 + if not active: + continue + updated_count += 1 + self.workbook_container.update_row_sourcing_sheet( + index_row = index_row + , unit_cost = unit_cost + , unit_price = unit_price + , active = active + ) + self.workbook_container.save_workbook() + print(f"\nComplete!") + print(f"Processed: {processed_count} entries") + print(f"Updated: {updated_count} costs") + except Exception as e: + print(f"Error: {e}") + + async def process_domain_urls(self, browser, domain_name, product_sources): + """Process all URLs for a single domain sequentially with rate limiting""" + results = [] + last_access_time = 0 + + try: + for product_source in product_sources: + # Rate limiting: wait 60s between requests to same domain + time_since_last = time.time() - last_access_time + if time_since_last < 45: + wait_time = 45 - time_since_last + random.uniform(0, 5) # 45-50s + print(f" [{domain_name}] Waiting {wait_time:.1f}s before next request...") + await asyncio.sleep(wait_time) + + # Process the URL + result = await self.fetch_single_with_browser(browser, domain_name, product_source) + results.append(result) + last_access_time = time.time() + + finally: + await browser.close() + + return results + + async def fetch_single_with_browser(self, browser, domain_name, product_source): + """Fetch a single URL using the provided browser""" + index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] + source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK] + + self.workbook_container.clear_row_sourcing_sheet(index_row = index_row) + Cost_Fetcher.log_processing_new_row( + index_row = index_row + , source_link = source_link + ) + + index_domain = self.get_index_domain_from_name(domain_name) + cost = None + price = None + active = None + + try: + if domain_name == self.NAME_DOMAIN_CARD_MARKET: + if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]: + price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket( + browser = browser + , url = source_link + , eur_to_gbp_rate = self.eur_to_gbp_rate + ) + price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs) + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket( + browser = browser + , url = source_link + , eur_to_gbp_rate = self.eur_to_gbp_rate + ) + elif domain_name == self.NAME_DOMAIN_CHAOS_CARDS: + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards( + browser = browser + , url = source_link + ) + elif domain_name == self.NAME_DOMAIN_GAMES_LORE: + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore( + browser = browser + , url = source_link + ) + elif domain_name == self.NAME_DOMAIN_MAGIC_MADHOUSE: + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse( + browser = browser + , url = source_link + ) + """ unverified + elif domain_name == self.NAME_DOMAIN_NEW_REALITIES_GAMING: + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_newrealitiesgaming( + browser = browser + , url = source_link + ) + """ + if ((cost is None and price is None) or active is None): + print(f" Error: Could not find cost on page") + + except Exception as e: + print(f" Error processing {source_link}: {e}") + + return self.make_result_data_json( + index_row = index_row + , cost = cost + , price = price + , active = active + ) + + @classmethod + def make_result_data_json(cls, index_row, cost = None, price = None, active = None): + return { + cls.INDEX_ROW_FLAG: index_row + , cls.COST_FLAG: cost + , cls.PRICE_FLAG: price + , cls.ACTIVE_FLAG: active + } + + def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs): + if not price_quantity_pairs: + return None + + max_quantity = 0 + price = None + + # First pass: look for quantity >= 8 + for price_quantity_pair in price_quantity_pairs: + eur_price = price_quantity_pair['price'] + quantity = price_quantity_pair['quantity'] + print(f" Found price: €{eur_price}") + print(f" Found quantity: {quantity}") + max_quantity = max(max_quantity, quantity) + + if quantity >= 8 and eur_price: + price = eur_price * self.eur_to_gbp_rate + print(f" Converted: €{eur_price:.2f} → £{price:.2f}") + return price + + # Second pass: use max quantity if no quantity >= 8 + print("Offer with quantity >= 8 not found") + for price_quantity_pair in price_quantity_pairs: + eur_price = price_quantity_pair['price'] + quantity = price_quantity_pair['quantity'] + + if (max_quantity <= 2 or quantity == max_quantity) and eur_price: + price = eur_price * self.eur_to_gbp_rate + print(f" Converted: €{eur_price:.2f} → £{price:.2f}") + return price + + return price + + @staticmethod + def log_processing_new_row(index_row, source_link): + print(f"\n{'='*60}") + print(f"Processing row {index_row}: {source_link}") + print(f"{'='*60}") + +async def main(): + cost_fetcher = Cost_Fetcher() + await cost_fetcher.fetch_all() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/product_scraping/product_scraper.py b/product_scraping/product_scraper.py index 20bb66e..7babd05 100644 --- a/product_scraping/product_scraper.py +++ b/product_scraping/product_scraper.py @@ -2,28 +2,177 @@ import pandas as pd from openpyxl import load_workbook, Workbook from openpyxl.worksheet.worksheet import Worksheet import requests -""" -from selenium import webdriver -from selenium.common.exceptions import StaleElementReferenceException -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.chrome.service import Service -""" import re import time import random from playwright.sync_api import sync_playwright, Browser, Page from playwright.async_api import async_playwright -# import playwright -# import undetected_chromedriver as uc -# from undetected_chromedriver import Chrome - import asyncio from aioconsole import ainput from collections import defaultdict from datetime import datetime, timedelta +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +import subprocess +import os + +CYCLE_INTERVAL_HOURS = 4 + + +class Email_Notifier: + def __init__(self, sender_email, sender_password, recipient_email, smtp_host='smtp.gmail.com', smtp_port=587): + self.sender_email = sender_email + self.sender_password = sender_password + self.recipient_email = recipient_email + self.smtp_host = smtp_host + self.smtp_port = smtp_port + + def send_email(self, subject, body_html): + msg = MIMEMultipart('alternative') + msg['From'] = self.sender_email + msg['To'] = self.recipient_email + msg['Subject'] = subject + msg.attach(MIMEText(body_html, 'html')) + try: + if self.smtp_port == 465: + with smtplib.SMTP_SSL(self.smtp_host, self.smtp_port) as server: + server.login(self.sender_email, self.sender_password) + server.send_message(msg) + else: + with smtplib.SMTP(self.smtp_host, self.smtp_port) as server: + server.starttls() + server.login(self.sender_email, self.sender_password) + server.send_message(msg) + print(f"Email sent: {subject}") + return True + except Exception as e: + print(f"Error sending email: {e}") + return False + + +class Profitability_Monitor: + MTG_SET_WORKSHEET_NAME = 'MTG Set' + MTG_SET_DATA_START_ROW = 3 # Row 1 is a merged group header, row 2 has column names + + # MTG Set sheet column indices (1-based) + COL_MTG_SET_NAME = 2 + COL_PLAY_PROFIT = 45 + COL_PLAY_MARGIN = 46 + COL_COLLECTOR_PROFIT = 53 + COL_COLLECTOR_MARGIN = 54 + COL_PLAY_SINGLES_PROFIT = 59 + COL_PLAY_SINGLES_MARGIN = 60 + COL_COLLECTOR_SINGLES_PROFIT = 73 + COL_COLLECTOR_SINGLES_MARGIN = 74 + + # Product sheet column indices (1-based) for precons + COL_PRODUCT_NAME = 2 + COL_PRODUCT_IS_PRECON = 7 + COL_PRODUCT_MIN_COST = 8 + COL_PRODUCT_PROFIT = 10 + + PROFIT_CHECKS = [ + {'profit_col': COL_PLAY_PROFIT, 'margin_col': COL_PLAY_MARGIN, 'action_buy': 'Buy Play Booster', 'action_no_buy': 'DO NOT Buy Play Booster'}, + {'profit_col': COL_COLLECTOR_PROFIT, 'margin_col': COL_COLLECTOR_MARGIN, 'action_buy': 'Buy Collector Booster', 'action_no_buy': 'DO NOT Buy Collector Booster'}, + {'profit_col': COL_PLAY_SINGLES_PROFIT, 'margin_col': COL_PLAY_SINGLES_MARGIN, 'action_buy': 'Split Play Booster', 'action_no_buy': 'DO NOT Split Play Booster'}, + {'profit_col': COL_COLLECTOR_SINGLES_PROFIT, 'margin_col': COL_COLLECTOR_SINGLES_MARGIN, 'action_buy': 'Split Collector Booster','action_no_buy': 'DO NOT Split Collector Booster'}, + ] + + def read_states(self, workbook_path): + """Load the workbook with data_only=True to read formula-calculated profit values.""" + wb = load_workbook(workbook_path, data_only=True) + mtg_set_states = {} + precon_states = {} + + mtg_sheet = wb[self.MTG_SET_WORKSHEET_NAME] + for row in range(self.MTG_SET_DATA_START_ROW, mtg_sheet.max_row + 1): + set_name = mtg_sheet.cell(row, self.COL_MTG_SET_NAME).value + if not set_name: + continue + set_data = {} + for check in self.PROFIT_CHECKS: + profit = mtg_sheet.cell(row, check['profit_col']).value + margin = mtg_sheet.cell(row, check['margin_col']).value + set_data[check['profit_col']] = { + 'profit': profit, + 'margin': margin, + 'is_profitable': isinstance(profit, (int, float)) and profit > 0, + } + mtg_set_states[set_name] = set_data + + prod_sheet = wb['Product'] + for row in range(2, prod_sheet.max_row + 1): + is_precon = prod_sheet.cell(row, self.COL_PRODUCT_IS_PRECON).value + if not is_precon: + continue + name = prod_sheet.cell(row, self.COL_PRODUCT_NAME).value + profit = prod_sheet.cell(row, self.COL_PRODUCT_PROFIT).value + min_cost = prod_sheet.cell(row, self.COL_PRODUCT_MIN_COST).value + if not name: + continue + margin = (profit / min_cost) if (isinstance(profit, (int, float)) and isinstance(min_cost, (int, float)) and min_cost != 0) else None + precon_states[name] = { + 'profit': profit, + 'margin': margin, + 'is_profitable': isinstance(profit, (int, float)) and profit > 0, + } + + wb.close() + return {'mtg_set': mtg_set_states, 'precon': precon_states} + + def find_changes(self, old_states, new_states): + """Compare old and new profit states; return list of alert dicts for any crossings of the 0 threshold.""" + alerts = [] + + for check in self.PROFIT_CHECKS: + col = check['profit_col'] + for set_name, new_set_data in new_states['mtg_set'].items(): + new_entry = new_set_data.get(col, {}) + old_entry = old_states.get('mtg_set', {}).get(set_name, {}).get(col, {}) + old_profitable = old_entry.get('is_profitable', None) + new_profitable = new_entry.get('is_profitable', False) + if old_profitable is None or old_profitable == new_profitable: + continue + action = check['action_buy'] if new_profitable else check['action_no_buy'] + margin = new_entry.get('margin') + margin_str = f"{margin * 100:.1f}%" if isinstance(margin, (int, float)) else "N/A" + alerts.append({'name': set_name, 'action': action, 'margin': margin_str}) + + for product_name, new_entry in new_states['precon'].items(): + old_entry = old_states.get('precon', {}).get(product_name, {}) + old_profitable = old_entry.get('is_profitable', None) + new_profitable = new_entry.get('is_profitable', False) + if old_profitable is None or old_profitable == new_profitable: + continue + action = 'Buy Precon' if new_profitable else 'DO NOT Buy Precon' + margin = new_entry.get('margin') + margin_str = f"{margin * 100:.1f}%" if isinstance(margin, (int, float)) else "N/A" + alerts.append({'name': product_name, 'action': action, 'margin': margin_str}) + + return alerts + + def format_email_html(self, alerts): + rows = ''.join( + f"{a['name']}" + f"{a['action']}" + f"{a['margin']}" + for a in alerts + ) + return ( + "" + "

TCG Profitability Alert

" + "" + "" + "" + "" + "" + "" + f"{rows}" + "
MTG SetActionMargin
" + f"

Generated {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

" + "" + ) class Product_Scraper: domain: str @@ -50,12 +199,12 @@ class Product_Scraper: if cost_clean is not None: cost_clean = cost_clean / 100 return cost_clean + @classmethod def parse_cost_chaoscards(cls, cost_text): return cls.parse_cost(cost_text = cost_text) @classmethod def parse_cost_cardmarket(cls, cost_text): - # return cls.parse_cost(cost_text = cost_text) """Convert '141,30 €' format to float in EUR""" if not cost_text: return None @@ -71,15 +220,18 @@ class Product_Scraper: @classmethod def parse_cost_magicmadhouse(cls, cost_text): return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_newrealitiesgaming(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) - async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay = 0): + async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses): print(f" Loading page...") self.page = await browser.new_page() - await self.page.goto(url = url) cost = None active = None try: - # Automatically waits up to 30s by default + await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000) + await asyncio.sleep(random.uniform(20, 25)) element = self.page.locator(selector = page_load_element_selector) page_title = await self.page.title() print(f" Page title: {page_title}") @@ -94,11 +246,13 @@ class Product_Scraper: active = (cost is not None) else: try: - elements = await self.page.query_selector_all(selector = cost_selector) + elements = await self.page.query_selector_all(selector = active_selector) + print(f'# active elements: {len(elements)}') if len(elements) == 0: active = True else: text = await elements[0].text_content() + text = text.strip() print(f" Text: '{text}'") active = (invalid_active_statuses is None or text not in invalid_active_statuses) except Exception as e: @@ -106,14 +260,15 @@ class Product_Scraper: if cost is None or active is None: print(f" ✗ No cost found") - print(f"Cost: {cost}, Active: {active}") - await ainput("Press Enter to continue to next URL...") + # await ainput("Press Enter to continue to next URL...") + print(f"Cost: {cost}, Active: {active}") except Exception as e: print(f" Error: {e}") - await ainput("Press Enter to continue to next URL...") + # await ainput("Press Enter to continue to next URL...") return None, None finally: + await self.page.close() return cost, active async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate): @@ -126,9 +281,8 @@ class Product_Scraper: , cost_selector = cost_selector , active_selector = None , invalid_active_statuses = [] - , min_delay = 15 ) - cost = Product_Scraper.parse_cost_cardmarket(cost_text) + cost = Product_Scraper.parse_cost_cardmarket(cost_text = cost_text) if cost is not None: item_shipping_cost_in = 0 if cost < 10: @@ -142,34 +296,31 @@ class Product_Scraper: return cost, active async def scrape_cost_and_active_playwright_chaoscards(self, browser, url): - # page_load_element_selector = '#prod_title' cost_selector = '.price_inc > span:nth-child(2)' - active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)' + active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)' cost_text, active = await self.scrape_cost_and_active_playwright( browser = browser , url = url - , page_load_element_selector = cost_selector # page_load_element_selector + , page_load_element_selector = cost_selector , cost_selector = cost_selector , active_selector = active_selector , invalid_active_statuses = ["Out of stock", "Coming soon"] - , min_delay = 15 ) - cost = Product_Scraper.parse_cost_chaoscards(cost_text) + cost = Product_Scraper.parse_cost_chaoscards(cost_text = cost_text) return cost, active async def scrape_cost_and_active_playwright_gameslore(self, browser, url): - # page_load_element_selector = '.page-title' cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' active_selector = '.stock > span:nth-child(1)' cost_text, active = await self.scrape_cost_and_active_playwright( browser = browser , url = url - , page_load_element_selector = cost_selector # page_load_element_selector + , page_load_element_selector = cost_selector , cost_selector = cost_selector , active_selector = active_selector , invalid_active_statuses = ["OUT OF STOCK"] ) - cost = Product_Scraper.parse_cost_gameslore(cost_text) + cost = Product_Scraper.parse_cost_gameslore(cost_text = cost_text) return cost, active async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url): @@ -184,7 +335,23 @@ class Product_Scraper: , active_selector = active_selector , invalid_active_statuses = [] ) - cost = Product_Scraper.parse_cost_magicmadhouse(cost_text) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url): + button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button' + page_load_element_selector = button_selector + cost_selector = f'{button_selector} span:nth-child(2)' + active_selector = f'{button_selector} span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ['Out of stock'] + ) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) return cost, active async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate): @@ -194,13 +361,12 @@ class Product_Scraper: print(f" Loading page...") self.page = await browser.new_page() - await self.page.goto(url = url) - try: - # Automatically waits up to 30s by default + await self.page.goto(url=url, wait_until="domcontentloaded", timeout=30000) + await asyncio.sleep(random.uniform(20, 25)) page_title = await self.page.title() print(f" Page title: {page_title}") - + price_quantity_pairs = [] try: offer_containers = await self.page.query_selector_all(offer_container_selector) @@ -219,14 +385,15 @@ class Product_Scraper: if price_text is None or quantity_text is None: continue price_quantity_pairs.append({ - 'price': Product_Scraper.parse_cost_cardmarket(price_text = price_text) * eur_to_gbp_rate - , 'quantity': Product_Scraper.parse_cost_cardmarket(quantity_text = quantity_text) + 'price': Product_Scraper.parse_cost_cardmarket(cost_text = price_text) * eur_to_gbp_rate + , 'quantity': Product_Scraper.parse_cost_cardmarket(cost_text = quantity_text) }) except Exception as e: print(f" Price selector failed: {e}") - await ainput("Press enter to continue to next URL...") + # await ainput("Press enter to continue to next URL...") return [] finally: + await self.page.close() return price_quantity_pairs @@ -370,7 +537,6 @@ class TCG_Sole_Trader_Workbook_Container: def get_sourcing_entries(self): product_sources = self.create_product_source_df() try: - # products = [] for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1): source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value @@ -381,15 +547,12 @@ class TCG_Sole_Trader_Workbook_Container: product_is_booster = False for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1): product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value - # print(f"found product: id {product_id}") if product_id == source_product_id: product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper() - # print(f"product is booster: {product_is_booster_text}, type: {str(type(product_is_booster_text))}") product_is_booster = (product_is_booster_text == "TRUE") break print(f"product is booster: {product_is_booster}") - # products.append((index_row, source_product_id, source_name, source_link, product_is_booster)) product_sources.loc[len(product_sources)] = [ index_row , source_product_id @@ -435,32 +598,31 @@ class Cost_Fetcher: NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards' NAME_DOMAIN_GAMES_LORE: str = 'Games Lore' NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse' + NAME_DOMAIN_NEW_REALITIES_GAMING: str = 'New Realities Gaming' NAME_FLAG: str = 'Name' PRICE_FLAG: str = 'Price' SUCCESS_FLAG: str = 'Success' URL_FLAG: str = 'Url' - active_row_indices = list[int] domain_names: list[str] eur_to_gbp_rate: float product_scrapers: list[Product_Scraper] product_sources: pd.DataFrame workbook_container: TCG_Sole_Trader_Workbook_Container - def __init__(self): + def __init__(self, email_notifier=None): + self.email_notifier = email_notifier + self.profitability_monitor = Profitability_Monitor() + self.workbook_save_lock = asyncio.Lock() self.domain_names = [ - self.NAME_DOMAIN_CARD_MARKET - , self.NAME_DOMAIN_CHAOS_CARDS + # self.NAME_DOMAIN_CARD_MARKET + self.NAME_DOMAIN_CHAOS_CARDS , self.NAME_DOMAIN_GAMES_LORE , self.NAME_DOMAIN_MAGIC_MADHOUSE + # , self.NAME_DOMAIN_NEW_REALITIES_GAMING ] self.domain_details = { - self.NAME_DOMAIN_CARD_MARKET: { - self.NAME_FLAG: self.NAME_DOMAIN_CARD_MARKET - , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CARD_MARKET) - , self.ACCESSED_LAST_ON_FLAG: 0 - } - , self.NAME_DOMAIN_CHAOS_CARDS: { + self.NAME_DOMAIN_CHAOS_CARDS: { self.NAME_FLAG: self.NAME_DOMAIN_CHAOS_CARDS , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CHAOS_CARDS) , self.ACCESSED_LAST_ON_FLAG: 0 @@ -476,19 +638,28 @@ class Cost_Fetcher: , self.ACCESSED_LAST_ON_FLAG: 0 } } + """ + self.NAME_DOMAIN_CARD_MARKET: { + self.NAME_FLAG: self.NAME_DOMAIN_CARD_MARKET + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CARD_MARKET) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + """ + """ + , self.NAME_DOMAIN_NEW_REALITIES_GAMING: { + self.NAME_FLAG: self.NAME_DOMAIN_NEW_REALITIES_GAMING + , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_NEW_REALITIES_GAMING) + , self.ACCESSED_LAST_ON_FLAG: 0 + } + """ product_scrapers = [] - active_row_indices = [] for index_domain in range(len(self.domain_names)): domain = self.domain_names[index_domain] product_scraper = Product_Scraper(domain) product_scrapers.append(product_scraper) - active_row_indices.append(None) self.product_scrapers = product_scrapers - self.active_row_indices = active_row_indices - self.workbook_container = TCG_Sole_Trader_Workbook_Container() - self.get_eur_to_gbp_rate() - - self.domain_locks = defaultdict(asyncio.Lock) + self.workbook_container = None + self.eur_to_gbp_rate = 0.85 def get_index_domain_from_name(self, domain_name): for index_domain in range(len(self.domain_names)): @@ -508,134 +679,198 @@ class Cost_Fetcher: async def fetch_all(self): try: + if self.email_notifier: + sent = self.email_notifier.send_email( + subject=f"TCG Profitability Scanner Boot - {datetime.now().strftime('%Y-%m-%d %H:%M')}" + , body_html="

Booted

" + ) + if sent: + print("Sent boot test email") + else: + print("Error sending boot test email") + # Reload workbook and exchange rate fresh each cycle + self.get_eur_to_gbp_rate() + self.workbook_container = TCG_Sole_Trader_Workbook_Container() + self.product_sources = self.workbook_container.get_sourcing_entries() + + workbook_path = os.path.abspath(TCG_Sole_Trader_Workbook_Container.WORKBOOK_NAME) + + # Snapshot profitability before any scraping + print("Reading current profitability states...") + old_profit_states = self.profitability_monitor.read_states(workbook_path) + + # Group product sources by domain + domain_groups = {domain: [] for domain in self.domain_names} + for _, product_source in self.product_sources.iterrows(): + source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME] + if source_name in domain_groups: + domain_groups[source_name].append(product_source) + + # Create one browser per domain and process all URLs; saves workbook after each item processed_count = 0 updated_count = 0 - self.product_sources = self.workbook_container.get_sourcing_entries() - # sourced_products = await self.scrape_with_browser_pool() - # for sourced_product in sourced_products: - # for product_source in self.product_sources: - # sourced_product = await self.fetch_single(product_source = product_source) - # for index_product_source, product_source in self.product_sources.iterrows(): - # sourced_product = await self.fetch_single(product_source = product_source) - # Create tasks for parallel execution - tasks = [] - for index_product_source, product_source in self.product_sources.iterrows(): - task = self.fetch_single(product_source = product_source) - tasks.append(task) + async with async_playwright() as p: + domain_tasks = [] + for domain_name in self.domain_names: + if domain_groups[domain_name]: + browser = await p.chromium.launch(headless=False) + task = self.process_domain_urls(browser, domain_name, domain_groups[domain_name]) + domain_tasks.append(task) - # Execute all tasks in parallel - sourced_products = await asyncio.gather(*tasks) + all_domain_results = await asyncio.gather(*domain_tasks) - # Process results - for sourced_product in sourced_products: - index_row = sourced_product[self.workbook_container.NAME_COLUMN_INDEX_ROW] - unit_cost = sourced_product[self.workbook_container.NAME_COLUMN_UNIT_COST] - unit_price = sourced_product[self.workbook_container.index_column_unit_price_sourcing] - active = sourced_product[self.workbook_container.index_column_active_sourcing] - processed_count += 1 - if not active: - continue - updated_count += 1 - self.workbook_container.update_row_sourcing_sheet( - index_row = index_row - , unit_cost = unit_cost - , unit_price = unit_price - , active = active - ) - self.workbook_container.save_workbook() - print(f"\nComplete!") - print(f"Processed: {processed_count} entries") - print(f"Updated: {updated_count} costs") + for domain_results in all_domain_results: + for result in domain_results: + processed_count += 1 + if result[self.ACTIVE_FLAG]: + updated_count += 1 + + print(f"\nComplete! Processed: {processed_count} entries, Updated: {updated_count} costs") + + # Recalculate spreadsheet formulas and check for profitability changes + if self.email_notifier: + recalculated = self.recalculate_workbook(workbook_path) + if recalculated: + new_profit_states = self.profitability_monitor.read_states(workbook_path) + alerts = self.profitability_monitor.find_changes(old_profit_states, new_profit_states) + if alerts: + html = self.profitability_monitor.format_email_html(alerts) + self.email_notifier.send_email( + subject=f"TCG Profitability Alert - {datetime.now().strftime('%Y-%m-%d %H:%M')}" + , body_html=html + ) + print(f"Sent {len(alerts)} profitability alert(s).") + else: + print("No profitability changes detected.") except Exception as e: - print(f"Error: {e}") - """ - async def scrape_with_browser_pool(self): - count_domains = len(self.domain_names) - - async with async_playwright() as p: - browsers = [await p.chromium.launch(headless = False) for _ in range(count_domains)] - - try: - tasks = [] - # for i, url in enumerate(urls): - for index_product_source in range(len(self.product_sources)): - product_source = self.product_sources.loc[index_product_source] - browser = browsers[index_product_source % count_domains] - tasks.append(self.fetch_single(browser, product_source)) - - return await asyncio.gather(*tasks) - finally: - for browser in browsers: - await browser.close() - """ - async def fetch_single(self, product_source): # browser, - async with async_playwright() as p: - browser = await p.chromium.launch(headless = False) - print(f'Product source: {product_source}') - index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] - source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME] - source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK] - index_domain = None - try: - index_domain = self.get_index_domain_from_name(source_name) - except: - await browser.close() - return self.make_result_data_json(index_row = index_row) - domain_details = self.domain_details[source_name] - self.workbook_container.clear_row_sourcing_sheet(index_row = index_row) - Cost_Fetcher.log_processing_new_row( - index_row = index_row - , source_link = source_link - ) - - cost = None - price = None - active = None - if source_name == self.NAME_DOMAIN_CARD_MARKET: - while (self.active_row_indices[index_domain] is None or time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)): - await asyncio.sleep(random.uniform(3, 5)) - self.active_row_indices[index_domain] = index_row - if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]: - price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(browser = browser, url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate) - price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs) - cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(browser = browser, url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate) - elif source_name == self.NAME_DOMAIN_CHAOS_CARDS: - while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)): - await asyncio.sleep(random.uniform(3, 5)) - self.active_row_indices[index_domain] = index_row - cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(browser = browser, url = source_link) - elif source_name == self.NAME_DOMAIN_GAMES_LORE: - while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)): - await asyncio.sleep(random.uniform(3, 5)) - self.active_row_indices[index_domain] = index_row - cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(browser = browser, url = source_link) - elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE: - while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)): - await asyncio.sleep(random.uniform(3, 5)) - self.active_row_indices[index_domain] = index_row - cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(browser = browser, url = source_link) - - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] = time.time() - self.active_row_indices[index_domain] = None - - if ((cost is None and price is None) or active is None): - print(f" Error: Could not find cost on page") - + import traceback + print(f"Error in fetch_all: {e}") + traceback.print_exc() + + async def process_domain_urls(self, browser, domain_name, product_sources): + """Process all URLs for a single domain sequentially with rate limiting. + Saves the workbook immediately after each item is cleared (before scrape) + and again after each result is written (after scrape).""" + results = [] + last_access_time = 0 + + try: + for product_source in product_sources: + # Rate limiting: wait between requests to the same domain + time_since_last = time.time() - last_access_time + if time_since_last < 45: + wait_time = 45 - time_since_last + random.uniform(0, 5) + print(f" [{domain_name}] Waiting {wait_time:.1f}s before next request...") + await asyncio.sleep(wait_time) + + index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] + + # Clear stale data and persist before scraping + async with self.workbook_save_lock: + self.workbook_container.clear_row_sourcing_sheet(index_row) + self.workbook_container.save_workbook() + + result = await self.fetch_single_with_browser(browser, domain_name, product_source) + + # Write fresh data and persist immediately + async with self.workbook_save_lock: + if result[self.ACTIVE_FLAG]: + self.workbook_container.update_row_sourcing_sheet( + index_row=result[self.INDEX_ROW_FLAG] + , unit_cost=result[self.COST_FLAG] + , unit_price=result[self.PRICE_FLAG] + , active=result[self.ACTIVE_FLAG] + ) + self.workbook_container.save_workbook() + + results.append(result) + last_access_time = time.time() + + finally: await browser.close() - return self.make_result_data_json( - index_row = index_row - , cost = cost - , price = price - , active = active - ) - @classmethod - def make_result(cls, url, success, data, error): - return { - cls.URL_FLAG: url - , cls.SUCCESS_FLAG: success - , cls.DATA_FLAG: data - , cls.ERROR_FLAG: error - } + + return results + + async def fetch_single_with_browser(self, browser, domain_name, product_source): + """Fetch a single URL using the provided browser""" + index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] + source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK] + + Cost_Fetcher.log_processing_new_row( + index_row = index_row + , source_link = source_link + ) + + index_domain = self.get_index_domain_from_name(domain_name) + cost = None + price = None + active = None + + try: + did_attempt = False + """ + if domain_name == self.NAME_DOMAIN_CARD_MARKET: + if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]: + price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket( + browser = browser + , url = source_link + , eur_to_gbp_rate = self.eur_to_gbp_rate + ) + price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs) + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket( + browser = browser + , url = source_link + , eur_to_gbp_rate = self.eur_to_gbp_rate + ) + el""" + if domain_name == self.NAME_DOMAIN_CHAOS_CARDS: + did_attempt = True + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards( + browser = browser + , url = source_link + ) + elif domain_name == self.NAME_DOMAIN_GAMES_LORE: + did_attempt = True + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore( + browser = browser + , url = source_link + ) + elif domain_name == self.NAME_DOMAIN_MAGIC_MADHOUSE: + did_attempt = True + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse( + browser = browser + , url = source_link + ) + """ unverified + elif domain_name == self.NAME_DOMAIN_NEW_REALITIES_GAMING: + cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_newrealitiesgaming( + browser = browser + , url = source_link + ) + """ + + if ( + did_attempt + and ( + ( + cost is None + and price is None + ) + or active is None + ) + ): + print(f" Error: Could not find cost on page") + + except Exception as e: + print(f" Error processing {source_link}: {e}") + + return self.make_result_data_json( + index_row = index_row + , cost = cost + , price = price + , active = active + ) + @classmethod def make_result_data_json(cls, index_row, cost = None, price = None, active = None): return { @@ -644,44 +879,82 @@ class Cost_Fetcher: , cls.PRICE_FLAG: price , cls.ACTIVE_FLAG: active } + def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs): - if not price_quantity_pairs: return None, False + if not price_quantity_pairs: + return None + max_quantity = 0 - updated_row_price = False + price = None + + # First pass: look for quantity >= 8 for price_quantity_pair in price_quantity_pairs: eur_price = price_quantity_pair['price'] quantity = price_quantity_pair['quantity'] print(f" Found price: €{eur_price}") print(f" Found quantity: {quantity}") max_quantity = max(max_quantity, quantity) - if quantity >= 8: - if eur_price: - price = eur_price * self.eur_to_gbp_rate - print(f" Converted: €{eur_price:.2f} → £{price:.2f}") - # self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_price_sourcing).value = gbp_price - updated_row_price = True - # print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}") - break - else: - print(f" Error: Could not parse price") - if not updated_row_price: - print("Offer with quantity >= 8 not found") - for price_quantity_pair in price_quantity_pairs: - eur_price = price_quantity_pair['price'] - quantity = price_quantity_pair['quantity'] - print(f" Found price: €{eur_price}") - print(f" Found quantity: {quantity}") - if max_quantity <= 2 or quantity == max_quantity: - if eur_price: - price = eur_price * self.eur_to_gbp_rate - print(f" Converted: €{eur_price:.2f} → £{price:.2f}") - # self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value = gbp_price - updated_row_price = True - # print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}") - break - else: - print(f" Error: Could not parse price") - return price, True + + if quantity >= 8 and eur_price: + price = eur_price * self.eur_to_gbp_rate + print(f" Converted: €{eur_price:.2f} → £{price:.2f}") + return price + + # Second pass: use max quantity if no quantity >= 8 + print("Offer with quantity >= 8 not found") + for price_quantity_pair in price_quantity_pairs: + eur_price = price_quantity_pair['price'] + quantity = price_quantity_pair['quantity'] + + if (max_quantity <= 2 or quantity == max_quantity) and eur_price: + price = eur_price * self.eur_to_gbp_rate + print(f" Converted: €{eur_price:.2f} → £{price:.2f}") + return price + + return price + + def recalculate_workbook(self, workbook_path): + """Run LibreOffice headless to recalculate all formula cells after saving new data. + Returns True if recalculation succeeded, False otherwise.""" + workbook_dir = os.path.dirname(workbook_path) + workbook_name = os.path.basename(workbook_path) + lock_file = os.path.join(workbook_dir, f'.~lock.{workbook_name}#') + + if os.path.exists(lock_file): + print(f"Warning: '{workbook_name}' is open in LibreOffice — skipping recalculation to avoid conflict.") + return False + + print("Recalculating workbook formulas with LibreOffice headless...") + try: + result = subprocess.run( + ['libreoffice', '--headless', '--norestore', '--convert-to', 'xlsx', '--outdir', workbook_dir, workbook_path] + , capture_output=True, text=True, timeout=120 + ) + if result.returncode == 0: + print("Recalculation complete.") + return True + print(f"Recalculation failed (exit {result.returncode}): {result.stderr.strip()}") + except subprocess.TimeoutExpired: + print("LibreOffice recalculation timed out.") + except Exception as e: + print(f"Error during recalculation: {e}") + return False + + async def run_continuous(self): + """Run fetch_all in an infinite loop, sleeping CYCLE_INTERVAL_HOURS between cycles.""" + while True: + print(f"\n{'='*60}") + print(f"Cycle started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + try: + await self.fetch_all() + except Exception as e: + import traceback + print(f"Unhandled cycle error: {e}") + traceback.print_exc() + next_run = datetime.now() + timedelta(hours=CYCLE_INTERVAL_HOURS) + print(f"Next cycle: {next_run.strftime('%Y-%m-%d %H:%M:%S')} (in {CYCLE_INTERVAL_HOURS}h)") + await asyncio.sleep(CYCLE_INTERVAL_HOURS * 3600) + @staticmethod def log_processing_new_row(index_row, source_link): print(f"\n{'='*60}") @@ -689,8 +962,31 @@ class Cost_Fetcher: print(f"{'='*60}") async def main(): - cost_fetcher = Cost_Fetcher() - await cost_fetcher.fetch_all() + sender_email = os.environ.get('EMAIL_SENDER') + sender_password = os.environ.get('EMAIL_PASSWORD') + recipient_email = os.environ.get('EMAIL_RECIPIENT') + smtp_host = os.environ.get('SMTP_HOST', 'smtp.gmail.com') + smtp_port = int(os.environ.get('SMTP_PORT', '587')) + + email_notifier = None + if sender_email and sender_password and recipient_email: + email_notifier = Email_Notifier( + sender_email=sender_email + , sender_password=sender_password + , recipient_email=recipient_email + , smtp_host=smtp_host + , smtp_port=smtp_port + ) + print(f"Email notifications enabled: {sender_email} → {recipient_email}") + else: + print( + "Email notifications disabled.\n" + "Set EMAIL_SENDER, EMAIL_PASSWORD, and EMAIL_RECIPIENT environment variables to enable.\n" + "For Gmail, use an App Password (https://myaccount.google.com/apppasswords)." + ) + + cost_fetcher = Cost_Fetcher(email_notifier=email_notifier) + await cost_fetcher.run_continuous() if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/product_scraping/single run/__init__.py b/product_scraping/single run/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/product_scraping/single run/product_scraper.py b/product_scraping/single run/product_scraper.py new file mode 100644 index 0000000..1b713a3 --- /dev/null +++ b/product_scraping/single run/product_scraper.py @@ -0,0 +1,237 @@ +""" +Project: Shuffle & Skirmish Market Scraper +Author: Edward Middleton-Smith + Shuffle & Skirmish + +Technology: Business Objects +Feature: Product Scraper Class +""" + +import pandas as pd +from openpyxl import load_workbook, Workbook +from openpyxl.worksheet.worksheet import Worksheet +import requests +import re +import time +import random +from playwright.sync_api import sync_playwright, Browser, Page +from playwright.async_api import async_playwright +import asyncio +from aioconsole import ainput +from collections import defaultdict +from datetime import datetime, timedelta + +class Product_Scraper: + domain: str + page: Page + + def __init__(self, domain): + print("Setting up browser automation") + self.domain = domain + + @staticmethod + def parse_cost(cost_text): + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + try: + return float(cost_clean) / 100 + except ValueError: + return None + + @classmethod + def parse_cost_chaoscards(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_cardmarket(cls, cost_text): + """Convert '141,30 €' format to float in EUR""" + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + cost_clean = cost_clean.replace(',', '.') + try: + return float(cost_clean) + except ValueError: + return None + @classmethod + def parse_cost_gameslore(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_magicmadhouse(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_newrealitiesgaming(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + + async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses): + print(f" Loading page...") + self.page = await browser.new_page() + await self.page.goto(url = url) + await asyncio.sleep(random.uniform(20, 25)) + cost = None + active = None + try: + element = self.page.locator(selector = page_load_element_selector) + page_title = await self.page.title() + print(f" Page title: {page_title}") + + element = self.page.locator(selector = cost_selector) + text = await element.text_content() + print(f" Text: '{text}'") + cost = text + + active = None + if active_selector is None: + active = (cost is not None) + else: + try: + elements = await self.page.query_selector_all(selector = active_selector) + print(f'# active elements: {len(elements)}') + if len(elements) == 0: + active = True + else: + text = await elements[0].text_content() + text = text.strip() + print(f" Text: '{text}'") + active = (invalid_active_statuses is None or text not in invalid_active_statuses) + except Exception as e: + print(f" Selector failed: {e}") + + if cost is None or active is None: + print(f" ✗ No cost found") + # await ainput("Press Enter to continue to next URL...") + print(f"Cost: {cost}, Active: {active}") + + except Exception as e: + print(f" Error: {e}") + # await ainput("Press Enter to continue to next URL...") + return None, None + finally: + await self.page.close() + return cost, active + + async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate): + page_load_element_selector = "body > main.container > div.page-title-container" + cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = None + , invalid_active_statuses = [] + ) + cost = Product_Scraper.parse_cost_cardmarket(cost_text = cost_text) + if cost is not None: + item_shipping_cost_in = 0 + if cost < 10: + item_shipping_cost_in = 2 + elif cost < 100: + item_shipping_cost_in = 8 + else: + item_shipping_cost_in = 20 + cost = cost * eur_to_gbp_rate + item_shipping_cost_in + active = (cost is not None) + return cost, active + + async def scrape_cost_and_active_playwright_chaoscards(self, browser, url): + cost_selector = '.price_inc > span:nth-child(2)' + active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = cost_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["Out of stock", "Coming soon"] + ) + cost = Product_Scraper.parse_cost_chaoscards(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_gameslore(self, browser, url): + cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' + active_selector = '.stock > span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = cost_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["OUT OF STOCK"] + ) + cost = Product_Scraper.parse_cost_gameslore(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url): + page_load_element_selector = '.productView-title' + cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price' + active_selector = '.alertBox.alertBox--error' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = [] + ) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) + return cost, active + + async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url): + button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button' + page_load_element_selector = button_selector + cost_selector = f'{button_selector} span:nth-child(2)' + active_selector = f'{button_selector} span:nth-child(1)' + cost_text, active = await self.scrape_cost_and_active_playwright( + browser = browser + , url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ['Out of stock'] + ) + cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text) + return cost, active + + async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate): + offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer' + price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)' + quantity_selector = 'div.amount-container > span:nth-child(1)' + + print(f" Loading page...") + self.page = await browser.new_page() + await self.page.goto(url = url) + await asyncio.sleep(random.uniform(20, 25)) + + try: + page_title = await self.page.title() + print(f" Page title: {page_title}") + + price_quantity_pairs = [] + try: + offer_containers = await self.page.query_selector_all(offer_container_selector) + print(f" Offer container selector: Found {len(offer_containers)} elements") + for offer_container in offer_containers: + price_element = await offer_container.query_selector(price_selector) + price_text = await price_element.text_content() + if '€' in price_text and re.search(r'\d', price_text): + print(f" ✓ Found price: {price_text}") + else: + price_text = None + + quantity_element = await offer_container.query_selector(quantity_selector) + quantity_text = await quantity_element.text_content() + + if price_text is None or quantity_text is None: + continue + price_quantity_pairs.append({ + 'price': Product_Scraper.parse_cost_cardmarket(cost_text = price_text) * eur_to_gbp_rate + , 'quantity': Product_Scraper.parse_cost_cardmarket(cost_text = quantity_text) + }) + except Exception as e: + print(f" Price selector failed: {e}") + # await ainput("Press enter to continue to next URL...") + return [] + finally: + await self.page.close() + return price_quantity_pairs \ No newline at end of file diff --git a/product_scraping/tcg-scraper.service b/product_scraping/tcg-scraper.service new file mode 100644 index 0000000..a6bbd0f --- /dev/null +++ b/product_scraping/tcg-scraper.service @@ -0,0 +1,33 @@ +[Unit] +Description=TCG Sole Trader Price Scraper +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=teddy +WorkingDirectory=/media/teddy/3_6GB-SSD-Storage/Documents/Programming/python/Yu-Gi-Oh_Card_Fetcher/product_scraping +ExecStart=/media/teddy/3_6GB-SSD-Storage/Documents/Programming/python/Yu-Gi-Oh_Card_Fetcher/env_api/bin/python3 product_scraper.py + +# Email credentials — fill these in before installing +Environment=EMAIL_SENDER=bot@partsltd.co.uk +Environment=EMAIL_PASSWORD=@sJt1b5Cliu6 +Environment=EMAIL_RECIPIENT=teddy@shuffleandskirmish.co.uk +# ,teddy@partsltd.co.uk,edward.middletonsmith@gmail.com +# Optional overrides (defaults shown): +Environment=SMTP_HOST=mail.partsltd.co.uk +Environment=SMTP_PORT=465 + +# Restart automatically if it crashes, but not if you stop it intentionally +Restart=on-failure +RestartSec=30 + +# Give the browser enough time to shut down cleanly on stop +TimeoutStopSec=30 + +# Send stdout/stderr to the journal (view with: journalctl -fu tcg-scraper) +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target diff --git a/requirements.txt b/requirements.txt index f3b121c..334fd65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,34 @@ xlsxwriter # selenium # undetected_chromedriver playwright -aioconsole \ No newline at end of file +aioconsole + +""" Call +EMAIL_SENDER=bot@partsltd.co.uk \ +EMAIL_PASSWORD=@sJt1b5Cliu6 \ +EMAIL_RECIPIENT=teddy@shuffleandskirmish.co.uk \ +python3 product_scraper.py + + + +One-time setup (edit credentials first): + + +# 1. Fill in your email details in the service file +nano product_scraping/tcg-scraper.service + +# 2. Copy to systemd and enable +sudo cp product_scraping/tcg-scraper.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable tcg-scraper # auto-start on boot +sudo systemctl start tcg-scraper +Day-to-day controls: + + +sudo systemctl stop tcg-scraper # stop immediately +sudo systemctl start tcg-scraper # start again +sudo systemctl restart tcg-scraper # restart (e.g. after editing the .py) +sudo systemctl status tcg-scraper # is it running? last exit code? +journalctl -fu tcg-scraper # live log tail (Ctrl+C to exit) +journalctl -u tcg-scraper --since today # today's log +""" \ No newline at end of file