import random
import re
import time

import pandas as pd
import requests
from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# import undetected_chromedriver as uc
from undetected_chromedriver import Chrome


class Cost_Fetcher_Base:
    """Scrape supplier prices with Selenium and write them into the workbook.

    Rows of the 'Sourcing' sheet whose source name is "Card Market",
    "Chaos Cards", "Games Lore" or "Magic Madhouse" are visited in a Chrome
    browser; the scraped unit cost and active flag are written back to the
    row and the workbook is saved at the end of the run.
    """

    PRODUCT_WORKSHEET_NAME = 'Product'
    SOURCING_WORKSHEET_NAME = 'Sourcing'
    WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'

    # Major Chrome version handed to undetected_chromedriver.
    CHROME_VERSION_MAIN = 133
    # Rate used when the exchange-rate API cannot be reached.
    FALLBACK_EUR_TO_GBP_RATE = 0.85
    # Attempts made to read the cost element before giving up.
    MAX_COST_SELECTOR_ATTEMPTS = 10
    # Attempts made to read the page-load element when it keeps going stale.
    MAX_STALE_ELEMENT_ATTEMPTS = 10

    driver: Chrome  # webdriver.Chrome
    eur_to_gbp_rate: float
    index_column_active_sourcing: int
    index_column_is_booster_product: int
    index_column_is_booster_box_product: int
    index_column_is_precon_product: int
    index_column_link_sourcing: int
    index_column_name_sourcing: int
    index_column_product_id_product: int
    index_column_product_id_sourcing: int
    index_column_unit_cost_sourcing: int
    index_row_header_product: int
    index_row_header_sourcing: int
    product_sheet: Worksheet
    sourcing_sheet: Worksheet
    wait: WebDriverWait
    workbook: Workbook

    @staticmethod
    def parse_cost(cost_text):
        """Parse a price string whose digits represent hundredths.

        Every non-digit character (currency symbol, decimal point, thousands
        separator) is dropped and the remaining digits are divided by 100,
        so '£12.99' -> 12.99 and '£1,299.99' -> 1299.99.

        Returns None for empty input or when no digits are present.
        """
        if not cost_text:
            return None
        # Commas are stripped as well: leaving a thousands separator in the
        # string made float() fail and the price was silently lost.
        digits = re.sub(r'\D', '', cost_text)
        try:
            return float(digits) / 100
        except ValueError:
            return None

    @classmethod
    def parse_cost_from_pennies(cls, cost_text):
        """Parse with :meth:`parse_cost` and divide the result by 100 again.

        NOTE(review): parse_cost already divides the digits by 100, so the
        value returned here is the digit string / 10000 - confirm intended.
        """
        if not cost_text:
            return None
        cost_clean = cls.parse_cost(cost_text=cost_text)
        if cost_clean is not None:
            cost_clean = cost_clean / 100
        return cost_clean

    @classmethod
    def parse_cost_chaoscards(cls, cost_text):
        """Parse a Chaos Cards price string (delegates to parse_cost)."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_cardmarket(cls, cost_text):
        """Convert '141,30 €' format to float in EUR.

        Returns None for empty input or text that cannot be parsed.
        """
        if not cost_text:
            return None
        # Keep digits and the decimal comma only, then swap the comma for a
        # point so float() accepts it.
        cost_clean = re.sub(r'[^\d,]', '', cost_text).replace(',', '.')
        try:
            return float(cost_clean)
        except ValueError:
            return None

    @classmethod
    def parse_cost_gameslore(cls, cost_text):
        """Parse a Games Lore price string (delegates to parse_cost)."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_magicmadhouse(cls, cost_text):
        """Parse a Magic Madhouse price string (delegates to parse_cost)."""
        return cls.parse_cost(cost_text=cost_text)

    def get_eur_to_gbp_rate(self):
        """Fetch the EUR->GBP rate into ``self.eur_to_gbp_rate``.

        Any failure (network error, HTTP error status, unexpected payload)
        is reported and the fallback rate is used instead.
        """
        try:
            response = requests.get(
                'https://api.exchangerate-api.com/v4/latest/EUR', timeout=10
            )
            # An error page would otherwise surface as a confusing JSON or
            # key error; fail here so the message names the HTTP problem.
            response.raise_for_status()
            self.eur_to_gbp_rate = float(response.json()['rates']['GBP'])
        except Exception as e:  # best effort: always fall back to a rate
            print(f"Error fetching exchange rate: {e}")
            print(f"Using fallback rate: {Cost_Fetcher_Base.FALLBACK_EUR_TO_GBP_RATE}")
            self.eur_to_gbp_rate = Cost_Fetcher_Base.FALLBACK_EUR_TO_GBP_RATE

    def setup_driver(self):
        """Start Chrome and create the 15 second explicit wait.

        On failure ``self.driver`` and ``self.wait`` are both set to None so
        callers can test ``if not self.driver`` without an AttributeError.
        """
        print("Starting driver")
        self.driver = None
        self.wait = None
        try:
            self.driver = Chrome(version_main=Cost_Fetcher_Base.CHROME_VERSION_MAIN)
        except Exception as e:
            print(f"Error setting up Chrome driver: {e}")
            print("Make sure Chrome and chromedriver are installed")
            return
        self.wait = WebDriverWait(self.driver, 15)

    def _load_page_and_wait(self, url, selector):
        """Open ``url`` and wait until ``selector`` is present, clickable and readable.

        The load is retried once if the first attempt fails; the failure of
        the second attempt propagates. Raises ValueError when the element is
        still stale after MAX_STALE_ELEMENT_ATTEMPTS reads.
        """
        locator = (By.CSS_SELECTOR, selector)
        for is_last_try in (False, True):
            try:
                self.driver.get(url)
                self.wait.until(EC.presence_of_element_located(locator))
                self.wait.until(EC.element_to_be_clickable(locator))
                break
            except Exception:
                if is_last_try:
                    raise

        max_attempts = Cost_Fetcher_Base.MAX_STALE_ELEMENT_ATTEMPTS
        for attempt in range(max_attempts):
            try:
                # Reading .text forces a round trip that raises if the
                # element was replaced after the waits above succeeded.
                self.driver.find_element(By.CSS_SELECTOR, selector).text
                print(f"✓ Element loaded successfully on attempt {attempt + 1}")
                break
            except StaleElementReferenceException:
                print(f"Stale element on attempt {attempt + 1}, retrying...")
                if attempt < max_attempts - 1:
                    time.sleep(1)
                else:
                    raise ValueError("StaleElementReferenceException")
        print(f" Page title: {self.driver.title}")

    def scrape_cost_and_active_selenium(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
        """Load ``url`` and read the raw cost text and the active flag.

        Args:
            url: Product page to open.
            page_load_element_selector: CSS selector that must be present
                and clickable before the page counts as loaded.
            cost_selector: CSS selector of the element holding the price.
            active_selector: CSS selector of the stock-status element, or
                None to treat "a cost was found" as active.
            invalid_active_statuses: Status texts that mean inactive; None
                accepts every status.

        Returns:
            ``(cost_text, active)``. Either value is None when it could not
            be determined; ``(None, None)`` on an unexpected error. In both
            failure cases the user is prompted to press Enter first.
        """
        try:
            print(" Loading page...")
            self._load_page_and_wait(url, page_load_element_selector)

            cost = None
            max_attempts = Cost_Fetcher_Base.MAX_COST_SELECTOR_ATTEMPTS
            for attempt in range(1, max_attempts + 1):
                try:
                    text = self.driver.find_element(By.CSS_SELECTOR, cost_selector).text
                    print(f" Text: '{text}'")
                    cost = text
                    break
                except Exception as e:
                    print(f" Selector failed: {e}")
                    if attempt < max_attempts:
                        time.sleep(random.uniform(2, 4))
            else:
                print(f"{max_attempts} cost selector fails")

            active = None
            if active_selector is None:
                active = cost is not None
            else:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, active_selector)
                    if not elements:
                        # No status element on the page counts as active.
                        active = True
                    else:
                        text = elements[0].text
                        print(f" Text: '{text}'")
                        active = (
                            invalid_active_statuses is None
                            or text not in invalid_active_statuses
                        )
                except Exception as e:
                    print(f" Selector failed: {e}")

            if cost is None or active is None:
                print(" ✗ No cost found")
                print(f"Cost: {cost}, Active: {active}")
                input("Press Enter to continue to next URL...")
            return cost, active
        except Exception as e:
            print(f" Error: {e}")
            input("Press Enter to continue to next URL...")
            return None, None

    def scrape_cost_and_active_selenium_cardmarket(self, url):
        """Scrape the first Card Market offer and return ``(cost_gbp, active)``.

        The EUR price is converted with ``self.eur_to_gbp_rate`` and a flat
        amount of 2, 8 or 20 is added depending on whether the EUR price is
        below 10, below 100, or higher. ``active`` is True exactly when a
        cost was parsed.
        """
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        cost_text, active = self.scrape_cost_and_active_selenium(
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=None,
            invalid_active_statuses=[],
        )
        cost = Cost_Fetcher_Base.parse_cost_cardmarket(cost_text)
        if cost is not None:
            if cost < 10:
                item_shipping_cost_in = 2
            elif cost < 100:
                item_shipping_cost_in = 8
            else:
                item_shipping_cost_in = 20
            cost = cost * self.eur_to_gbp_rate + item_shipping_cost_in
        active = cost is not None
        return cost, active

    def scrape_cost_and_active_selenium_chaoscards(self, url):
        """Scrape a Chaos Cards product page and return ``(cost, active)``."""
        # The price element doubles as the page-load marker.
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)'
        cost_text, active = self.scrape_cost_and_active_selenium(
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["Out of stock", "Coming soon"],
        )
        cost = Cost_Fetcher_Base.parse_cost_chaoscards(cost_text)
        return cost, active

    def scrape_cost_and_active_selenium_gameslore(self, url):
        """Scrape a Games Lore product page and return ``(cost, active)``."""
        # The price element doubles as the page-load marker.
        cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
        active_selector = '.stock > span:nth-child(1)'
        cost_text, active = self.scrape_cost_and_active_selenium(
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["OUT OF STOCK"],
        )
        cost = Cost_Fetcher_Base.parse_cost_gameslore(cost_text)
        return cost, active

    def scrape_cost_and_active_selenium_magicmadhouse(self, url):
        """Scrape a Magic Madhouse product page and return ``(cost, active)``.

        NOTE(review): with an empty ``invalid_active_statuses`` list the
        active flag is True whether or not the error alert box is present -
        confirm whether the presence of the alert should mean inactive.
        """
        page_load_element_selector = '.productView-title'
        cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
        active_selector = '.alertBox.alertBox--error'
        cost_text, active = self.scrape_cost_and_active_selenium(
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=[],
        )
        cost = Cost_Fetcher_Base.parse_cost_magicmadhouse(cost_text)
        return cost, active

    def scrape_prices_and_quantities_selenium_cardmarket(self, url):
        """Return the Card Market offers on ``url`` as price/quantity dicts.

        Each entry is ``{'price': <float EUR>, 'quantity': <int>}``. Offers
        whose price or quantity cannot be parsed are skipped. If loading or
        scraping fails, the error is printed and the offers collected so far
        (possibly none) are returned; this method does not raise.
        """
        # NOTE(review): ':nth-child(1)' on the article row looks like it
        # limits the match to the first offer only - confirm against the
        # page, since the caller compares quantities across several offers.
        offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'

        price_quantity_pairs = []
        try:
            print(" Loading page...")
            self._load_page_and_wait(url, offer_container_selector)

            offer_containers = self.driver.find_elements(By.CSS_SELECTOR, offer_container_selector)
            print(f" Offer container selector: Found {len(offer_containers)} elements")
            for offer_container in offer_containers:
                price_text = offer_container.find_element(By.CSS_SELECTOR, price_selector).text
                if '€' not in price_text or not re.search(r'\d', price_text):
                    continue
                print(f" ✓ Found price: {price_text}")

                quantity_text = offer_container.find_element(By.CSS_SELECTOR, quantity_selector).text
                price = Cost_Fetcher_Base.parse_cost_cardmarket(price_text)
                quantity_digits = re.sub(r'\D', '', quantity_text or '')
                if price is None or not quantity_digits:
                    continue
                price_quantity_pairs.append({
                    'price': price,
                    'quantity': int(quantity_digits),
                })
        except Exception as e:
            print(f" Price selector failed: {e}")
        return price_quantity_pairs

    def load_tcg_sole_trader_workbook(self):
        """Open the workbook and locate the sheets, header rows and columns.

        Populates ``workbook``, ``sourcing_sheet``, ``product_sheet``, the
        two ``index_row_header_*`` attributes and every ``index_column_*``
        attribute (left as None when its header is not found).

        Returns:
            True when everything required was found, False otherwise (an
            error message is printed in that case).
        """
        print("Loading workbook...")
        self.workbook = load_workbook(Cost_Fetcher_Base.WORKBOOK_NAME)

        for sheet_name in (
            Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME,
            Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME,
        ):
            if sheet_name not in self.workbook.sheetnames:
                print(f"Error: Sheet '{sheet_name}' not found")
                return False

        self.sourcing_sheet = self.workbook[Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME]
        self.product_sheet = self.workbook[Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME]

        self.index_row_header_sourcing = None
        for row in range(1, self.sourcing_sheet.max_row + 1):
            if (
                self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing'
                or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value)
            ):
                self.index_row_header_sourcing = row
                break
        if self.index_row_header_sourcing is None:
            print("Error: Could not find table 'tbl_Sourcing'")
            return False

        self.index_row_header_product = None
        for row in range(1, self.product_sheet.max_row + 1):
            first_cell = self.product_sheet.cell(row, 1).value
            if first_cell == 'tbl_Product' or 'Product Id' in str(first_cell):
                self.index_row_header_product = row
                break
        if self.index_row_header_product is None:
            print("Error: Could not find table 'tbl_Product'")
            return False

        # Header text -> attribute that receives the 1-based column index.
        sourcing_columns = {
            'Source Name': 'index_column_name_sourcing',
            'Source Link': 'index_column_link_sourcing',
            'Source Unit Cost': 'index_column_unit_cost_sourcing',
            'Active': 'index_column_active_sourcing',
            'Product Id': 'index_column_product_id_sourcing',
        }
        product_columns = {
            'Is Booster Box': 'index_column_is_booster_box_product',
            'Is Booster': 'index_column_is_booster_product',
            'Is Precon': 'index_column_is_precon_product',
            'Product Id': 'index_column_product_id_product',
        }
        self._map_header_columns(self.sourcing_sheet, self.index_row_header_sourcing, sourcing_columns)
        self._map_header_columns(self.product_sheet, self.index_row_header_product, product_columns)

        print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
        print(f"Sourcing header row: {self.index_row_header_sourcing}")
        print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
        print(
            f"Sourcing Columns - Name: {self.index_column_name_sourcing}, "
            f"Link: {self.index_column_link_sourcing}, "
            f"Unit Cost: {self.index_column_unit_cost_sourcing}, "
            f"Active: {self.index_column_active_sourcing}, "
            f"Product Id: {self.index_column_product_id_sourcing}"
        )
        print(f"Product max row: {self.product_sheet.max_row}")
        print(f"Product header row: {self.index_row_header_product}")
        print(f"Product header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
        print(
            f"Product Columns - Id: {self.index_column_product_id_product}, "
            f"Is Booster: {self.index_column_is_booster_product}, "
            f"Is Booster Box: {self.index_column_is_booster_box_product}, "
            f"Is Precon: {self.index_column_is_precon_product}"
        )

        required = list(sourcing_columns.values()) + list(product_columns.values())
        if not all(getattr(self, attribute) for attribute in required):
            print("Error: Could not find required columns")
            return False
        return True

    def _map_header_columns(self, sheet, header_row, attribute_by_header):
        """Store the column index of each known header on ``self``.

        Every target attribute is first reset to None so a missing header is
        detectable instead of raising AttributeError on first access.
        """
        for attribute in attribute_by_header.values():
            setattr(self, attribute, None)
        for index_column in range(1, sheet.max_column + 1):
            header = str(sheet.cell(header_row, index_column).value).strip()
            attribute = attribute_by_header.get(header)
            if attribute is not None:
                setattr(self, attribute, index_column)

    def _is_booster_product(self, source_product_id):
        """Return True when the matching product row has 'Is Booster' == TRUE.

        The first product row whose id equals ``source_product_id`` decides;
        False is returned when no row matches.
        """
        for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
            product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
            if product_id == source_product_id:
                flag_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value)
                return flag_text.upper() == "TRUE"
        return False

    @staticmethod
    def _wait_for_rate_limit(last_accessed_on, min_gap_seconds, max_gap_seconds):
        """Sleep in 3-5 s steps until a random gap since the last visit has passed.

        The required gap is re-drawn from [min_gap_seconds, max_gap_seconds]
        on every check.
        """
        while time.time() - last_accessed_on < random.uniform(min_gap_seconds, max_gap_seconds):
            time.sleep(random.uniform(3, 5))

    @staticmethod
    def _select_cardmarket_booster_price(price_quantity_pairs):
        """Pick the EUR price to use for a booster product, or None.

        First choice is the first offer with a quantity of at least 8. If
        there is none, the first offer whose quantity equals the largest
        quantity seen is used (or simply the first offer when that largest
        quantity is 2 or less).
        """
        max_quantity = 0
        for pair in price_quantity_pairs:
            eur_price = pair['price']
            quantity = pair['quantity']
            print(f" Found price: €{eur_price}")
            print(f" Found quantity: {quantity}")
            max_quantity = max(max_quantity, quantity)
            if quantity >= 8:
                if eur_price:
                    return eur_price
                print(" Error: Could not parse price")

        print("Offer with quantity >= 8 not found")
        for pair in price_quantity_pairs:
            eur_price = pair['price']
            quantity = pair['quantity']
            print(f" Found price: €{eur_price}")
            print(f" Found quantity: {quantity}")
            if max_quantity <= 2 or quantity == max_quantity:
                if eur_price:
                    return eur_price
                print(" Error: Could not parse price")
        return None

    def scrape_all_costs(self):
        """Scrape every supported sourcing row and save the workbook.

        For each row with a supported source name the unit cost and active
        cells are cleared, the page is scraped (respecting a per-site delay)
        and the results are written back. The browser is restarted before a
        Chaos Cards or Games Lore row if that site was already visited since
        the last restart. Unexpected errors are printed, not raised; in that
        case the workbook is not saved.
        """
        if getattr(self, 'driver', None) is None:
            print("Error: browser driver is not available")
            return

        supported_sources = ("Card Market", "Chaos Cards", "Games Lore", "Magic Madhouse")
        try:
            processed_count = 0
            updated_count = 0
            cardmarket_accessed_last_on = 0
            chaoscards_accessed_last_on = 0
            gameslore_accessed_last_on = 0
            magicmadhouse_accessed_last_on = 0
            did_restart_since_last_chaos_cards_visit = True
            did_restart_since_last_games_lore_visit = True

            for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
                source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
                source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
                source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
                if not source_name or not source_link:
                    continue
                print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")

                product_is_booster = self._is_booster_product(source_product_id)
                print(f"product is booster: {product_is_booster}")

                needs_restart = (
                    (source_name == "Chaos Cards" and not did_restart_since_last_chaos_cards_visit)
                    or (source_name == "Games Lore" and not did_restart_since_last_games_lore_visit)
                )
                if needs_restart:
                    self.stop_driver()
                    self.setup_driver()
                    if not self.driver:
                        # Stop scraping but still fall through to the save
                        # below so rows already processed are not lost.
                        print("Error: could not restart driver, stopping early")
                        break
                    did_restart_since_last_chaos_cards_visit = True
                    did_restart_since_last_games_lore_visit = True

                if source_name not in supported_sources:
                    continue

                self.clear_row_sourcing_sheet(index_row=index_row)
                processed_count += 1
                Cost_Fetcher_Base.log_processing_new_row(
                    index_row=index_row,
                    source_link=source_link,
                )

                cost = None
                active = None
                # Set when the booster branch has already written a price,
                # so the generic "not found" message is not printed for it.
                row_price_written = False

                if source_name == "Card Market":
                    self._wait_for_rate_limit(cardmarket_accessed_last_on, 10, 20)
                    if product_is_booster:
                        price_quantity_pairs = self.scrape_prices_and_quantities_selenium_cardmarket(url=source_link)
                        if price_quantity_pairs:
                            self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE"
                            eur_price = self._select_cardmarket_booster_price(price_quantity_pairs)
                            if eur_price:
                                gbp_price = eur_price * self.eur_to_gbp_rate
                                print(f" Converted: €{eur_price:.2f} → £{gbp_price:.2f}")
                                self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = gbp_price
                                updated_count += 1
                                row_price_written = True
                                print(f"output row: {index_row}, value: {self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value}")
                    else:
                        cost, active = self.scrape_cost_and_active_selenium_cardmarket(url=source_link)
                    cardmarket_accessed_last_on = time.time()
                elif source_name == "Chaos Cards":
                    self._wait_for_rate_limit(chaoscards_accessed_last_on, 20, 30)
                    cost, active = self.scrape_cost_and_active_selenium_chaoscards(url=source_link)
                    chaoscards_accessed_last_on = time.time()
                    did_restart_since_last_chaos_cards_visit = False
                elif source_name == "Games Lore":
                    self._wait_for_rate_limit(gameslore_accessed_last_on, 10, 20)
                    cost, active = self.scrape_cost_and_active_selenium_gameslore(url=source_link)
                    gameslore_accessed_last_on = time.time()
                    did_restart_since_last_games_lore_visit = False
                elif source_name == "Magic Madhouse":
                    self._wait_for_rate_limit(magicmadhouse_accessed_last_on, 10, 20)
                    cost, active = self.scrape_cost_and_active_selenium_magicmadhouse(url=source_link)
                    magicmadhouse_accessed_last_on = time.time()

                if cost is not None and active is not None:
                    print(f" Found cost: {cost}, active: {active}")
                    self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = cost
                    self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"
                    updated_count += 1
                elif not row_price_written:
                    print(" Error: Could not find cost on page")

            # Save workbook
            print(f"\n{'='*60}")
            print("Saving workbook...")
            self.workbook.save(Cost_Fetcher_Base.WORKBOOK_NAME)
            print("\nComplete!")
            print(f"Processed: {processed_count} entries")
            print(f"Updated: {updated_count} costs")
        except Exception as e:
            print(f"Error: {e}")

    def clear_row_sourcing_sheet(self, index_row):
        """Blank the unit cost and set Active to "FALSE" for ``index_row``."""
        self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
        self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"

    @staticmethod
    def log_processing_new_row(index_row, source_link):
        """Print a banner announcing the row about to be processed."""
        print(f"\n{'='*60}")
        print(f"Processing row {index_row}: {source_link}")
        print(f"{'='*60}")

    def __init__(self):
        """Start the browser, load the workbook and fetch the exchange rate.

        Workbook loading and the rate lookup are skipped when the browser
        could not be started.
        """
        print("Setting up browser automation (browser will not be visible)...")
        self.setup_driver()
        if not self.driver:
            return
        self.load_tcg_sole_trader_workbook()
        self.get_eur_to_gbp_rate()

    def stop_driver(self):
        """Quit the browser if one is running; do nothing otherwise."""
        driver = getattr(self, 'driver', None)
        if driver is not None:
            driver.quit()


def main():
    """Run one full scrape and always close the browser afterwards."""
    cost_fetcher = Cost_Fetcher_Base()
    try:
        cost_fetcher.scrape_all_costs()
    finally:
        cost_fetcher.stop_driver()


if __name__ == "__main__":
    main()