diff --git a/product_scraping/.~lock.TCG Sole Trader Copy.xlsx# b/product_scraping/.~lock.TCG Sole Trader Copy.xlsx# deleted file mode 100644 index dfc9a02..0000000 --- a/product_scraping/.~lock.TCG Sole Trader Copy.xlsx# +++ /dev/null @@ -1 +0,0 @@ -,teddy,lord-T-1024,09.01.2026 15:38,file:///home/teddy/.config/libreoffice/4; \ No newline at end of file diff --git a/product_scraping/TCG Sole Trader Copy.xlsx b/product_scraping/TCG Sole Trader Copy.xlsx index 3f5e57e..9604431 100644 Binary files a/product_scraping/TCG Sole Trader Copy.xlsx and b/product_scraping/TCG Sole Trader Copy.xlsx differ diff --git a/product_scraping/cost_fetcher_base.py b/product_scraping/cost_fetcher_base.py deleted file mode 100644 index 54be02e..0000000 --- a/product_scraping/cost_fetcher_base.py +++ /dev/null @@ -1,593 +0,0 @@ -import pandas as pd -from openpyxl import load_workbook, Workbook -from openpyxl.worksheet.worksheet import Worksheet -import requests -from selenium import webdriver -from selenium.common.exceptions import StaleElementReferenceException -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.chrome.service import Service -import re -import time -import random - -# import undetected_chromedriver as uc -from undetected_chromedriver import Chrome - - -class Cost_Fetcher_Base: - PRODUCT_WORKSHEET_NAME = 'Product' - SOURCING_WORKSHEET_NAME = 'Sourcing' - WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx' - - driver: Chrome # webdriver.Chrome - eur_to_gbp_rate: float - index_column_active_sourcing: int - index_column_is_booster_product: int - index_column_is_booster_box_product: int - index_column_is_precon_product: int - index_column_link_sourcing: int - index_column_name_sourcing: int - index_column_product_id_product: int - index_column_product_id_sourcing: int - 
index_column_unit_cost_sourcing: int - index_row_header_product: int - index_row_header_sourcing: int - product_sheet: Worksheet - sourcing_sheet: Worksheet - wait: WebDriverWait - workbook: Workbook - - @staticmethod - def parse_cost(cost_text): - if not cost_text: - return None - cost_clean = re.sub(r'[^\d,]', '', cost_text) - try: - return float(cost_clean) / 100 - except ValueError: - return None - @classmethod - def parse_cost_from_pennies(cls, cost_text): - if not cost_text: - return None - cost_clean = cls.parse_cost(cost_text = cost_text) - if cost_clean is not None: - cost_clean = cost_clean / 100 - return cost_clean - @classmethod - def parse_cost_chaoscards(cls, cost_text): - return cls.parse_cost(cost_text = cost_text) - @classmethod - def parse_cost_cardmarket(cls, cost_text): - # return cls.parse_cost(cost_text = cost_text) - """Convert '141,30 €' format to float in EUR""" - if not cost_text: - return None - cost_clean = re.sub(r'[^\d,]', '', cost_text) - cost_clean = cost_clean.replace(',', '.') - try: - return float(cost_clean) - except ValueError: - return None - @classmethod - def parse_cost_gameslore(cls, cost_text): - return cls.parse_cost(cost_text = cost_text) - @classmethod - def parse_cost_magicmadhouse(cls, cost_text): - return cls.parse_cost(cost_text = cost_text) - - def get_eur_to_gbp_rate(self): - try: - response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10) - data = response.json() - self.eur_to_gbp_rate = data['rates']['GBP'] - except Exception as e: - print(f"Error fetching exchange rate: {e}") - print("Using fallback rate: 0.85") - self.eur_to_gbp_rate = 0.85 - - def setup_driver(self): - print("Starting driver") - """ - chrome_options = Options() - # Remove headless mode to see the browser - # chrome_options.add_argument('--headless') - chrome_options.add_argument('--no-sandbox') - chrome_options.add_argument('--disable-dev-shm-usage') - 
chrome_options.add_argument('--disable-blink-features=AutomationControlled') - chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36') - chrome_options.add_argument('--window-size=1920,1080') - """ - try: - self.driver = Chrome(version_main=133) # webdriver.Chrome(options=chrome_options) - # return driver - except Exception as e: - print(f"Error setting up Chrome driver: {e}") - print("Make sure Chrome and chromedriver are installed") - # return None - self.wait = WebDriverWait(self.driver, 15) - - def scrape_cost_and_active_selenium(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses): - try: - print(f" Loading page...") - # time.sleep(random.uniform(6, 10)) - try: - self.driver.get(url) - element = self.wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) - ) - element = self.wait.until( - EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector)) - ) - except Exception as e: - self.driver.get(url) - element = self.wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) - ) - element = self.wait.until( - EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector)) - ) - - max_attempts = 10 - for attempt in range(max_attempts): - try: - element = None - element = self.driver.find_element(By.CSS_SELECTOR, page_load_element_selector) - text = element.text - print(f"✓ Element loaded successfully on attempt {attempt + 1}") - # return True - break - except StaleElementReferenceException: - print(f"Stale element on attempt {attempt + 1}, retrying...") - if attempt < max_attempts - 1: - time.sleep(1) - else: - raise ValueError("StaleElementReferenceException") - - print(f" Page title: {self.driver.title}") - - cost = None - element = None - counter = 0 - while cost is None: - counter += 1 - try: - element = 
self.driver.find_element(By.CSS_SELECTOR, cost_selector) - text = element.text - print(f" Text: '{text}'") - cost = text - except Exception as e: - print(f" Selector failed: {e}") - cost = None - time.sleep(random.uniform(2, 4)) - if counter > 10: - print("10 cost selector fails") - break - - active = None - if active_selector is None: # or invalid_active_statuses is None or invalid_active_statuses == []: - active = (cost is not None) - else: - try: - elements = None - elements = self.driver.find_elements(By.CSS_SELECTOR, active_selector) - if len(elements) == 0: - active = True - else: - text = elements[0].text - print(f" Text: '{text}'") - active = (invalid_active_statuses is None or text not in invalid_active_statuses) - except Exception as e: - print(f" Selector failed: {e}") - - if cost is None or active is None: - print(f" ✗ No cost found") - print(f"Cost: {cost}, Active: {active}") - input("Press Enter to continue to next URL...") - return cost, active - - except Exception as e: - print(f" Error: {e}") - input("Press Enter to continue to next URL...") - return None, None - - def scrape_cost_and_active_selenium_cardmarket(self, url): - page_load_element_selector = "body > main.container > div.page-title-container" - cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)' - cost_text, active = self.scrape_cost_and_active_selenium( - url = url - , page_load_element_selector = page_load_element_selector - , cost_selector = cost_selector - , active_selector = None - , invalid_active_statuses = [] - ) - cost = Cost_Fetcher_Base.parse_cost_cardmarket(cost_text) - if cost is not None: - item_shipping_cost_in = 0 - if cost < 10: - item_shipping_cost_in = 2 - elif cost < 100: - item_shipping_cost_in = 8 - else: - item_shipping_cost_in = 20 - cost = cost * self.eur_to_gbp_rate + item_shipping_cost_in - active = (cost is not None) - return cost, 
active - - def scrape_cost_and_active_selenium_chaoscards(self, url): - # page_load_element_selector = '#prod_title' - cost_selector = '.price_inc > span:nth-child(2)' - active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)' - cost_text, active = self.scrape_cost_and_active_selenium( - url = url - , page_load_element_selector = cost_selector # page_load_element_selector - , cost_selector = cost_selector - , active_selector = active_selector - , invalid_active_statuses = ["Out of stock", "Coming soon"] - ) - cost = Cost_Fetcher_Base.parse_cost_chaoscards(cost_text) - return cost, active - - def scrape_cost_and_active_selenium_gameslore(self, url): - # page_load_element_selector = '.page-title' - cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' - active_selector = '.stock > span:nth-child(1)' - cost_text, active = self.scrape_cost_and_active_selenium( - url = url - , page_load_element_selector = cost_selector # page_load_element_selector - , cost_selector = cost_selector - , active_selector = active_selector - , invalid_active_statuses = ["OUT OF STOCK"] - ) - cost = Cost_Fetcher_Base.parse_cost_gameslore(cost_text) - return cost, active - - def scrape_cost_and_active_selenium_magicmadhouse(self, url): - page_load_element_selector = '.productView-title' - cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price' - active_selector = '.alertBox.alertBox--error' - cost_text, active = self.scrape_cost_and_active_selenium( - url = url - , page_load_element_selector = page_load_element_selector - 
, cost_selector = cost_selector - , active_selector = active_selector - , invalid_active_statuses = [] - ) - cost = Cost_Fetcher_Base.parse_cost_magicmadhouse(cost_text) - return cost, active - - def scrape_prices_and_quantities_selenium_cardmarket(self, url): - offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer' - price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)' - quantity_selector = 'div.amount-container > span:nth-child(1)' - - try: - print(f" Loading page...") - # time.sleep(random.uniform(6, 10)) - try: - self.driver.get(url) - element = self.wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, offer_container_selector)) - ) - element = self.wait.until( - EC.element_to_be_clickable((By.CSS_SELECTOR, offer_container_selector)) - ) - except Exception as e: - self.driver.get(url) - element = self.wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, offer_container_selector)) - ) - element = self.wait.until( - EC.element_to_be_clickable((By.CSS_SELECTOR, offer_container_selector)) - ) - - max_attempts = 10 - for attempt in range(max_attempts): - try: - element = None - element = self.driver.find_element(By.CSS_SELECTOR, offer_container_selector) - text = element.text - print(f"✓ Element loaded successfully on attempt {attempt + 1}") - # return True - break - except StaleElementReferenceException: - print(f"Stale element on attempt {attempt + 1}, retrying...") - if attempt < max_attempts - 1: - time.sleep(1) - else: - raise ValueError("StaleElementReferenceException") - - print(f" Page title: {self.driver.title}") - - price_quantity_pairs = [] - try: - offer_containers = self.driver.find_elements(By.CSS_SELECTOR, offer_container_selector) - print(f" Offer container selector: Found {len(offer_containers)} elements") - for offer_container in offer_containers: - price_element = offer_container.find_element(By.CSS_SELECTOR, price_selector) - 
price_text = price_element.text - if '€' in price_text and re.search(r'\d', price_text): - print(f" ✓ Found price: {price_text}") - else: - price_text = None - - quantity_element = offer_container.find_element(By.CSS_SELECTOR, quantity_selector) - quantity_text = quantity_element.text - - if price_text is None or quantity_text is None: - continue - price_quantity_pairs.append({ - 'price': Cost_Fetcher_Base.parse_cost_cardmarket(price_text = price_text) - , 'quantity': Cost_Fetcher_Base.parse_cost_cardmarket(quantity_text = quantity_text) - }) - except Exception as e: - print(f" Price selector failed: {e}") - return [] - finally: - return price_quantity_pairs - - def load_tcg_sole_trader_workbook(self): - print("Loading workbook...") - self.workbook = load_workbook(Cost_Fetcher_Base.WORKBOOK_NAME) - - if Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames: - print(f"Error: Sheet '{Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME}' not found") - return - if Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames: - print(f"Error: Sheet '{Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME}' not found") - return - - self.sourcing_sheet = self.workbook[Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME] - self.product_sheet = self.workbook[Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME] - - sourcing_table_found = False - for row in range(1, self.sourcing_sheet.max_row + 1): - if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value): - self.index_row_header_sourcing = row - sourcing_table_found = True - break - - if not sourcing_table_found or not self.index_row_header_sourcing: - for row in range(1, min(20, self.sourcing_sheet.max_row + 1)): - if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value): - self.index_row_header_sourcing = row - sourcing_table_found = True - break - - if not sourcing_table_found: - print("Error: Could not find table 'tbl_Sourcing'") - return - - 
product_table_found = False - for row in range(1, self.product_sheet.max_row + 1): - if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value): - self.index_row_header_product = row - product_table_found = True - break - - if not product_table_found: - print("Error: Could not find table 'tbl_Product'") - return - - for index_column in range(1, self.sourcing_sheet.max_column + 1): - header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip() - if 'Source Name' == header: - self.index_column_name_sourcing = index_column - elif 'Source Link' == header: - self.index_column_link_sourcing = index_column - elif 'Source Unit Cost' == header: - self.index_column_unit_cost_sourcing = index_column - elif 'Active' == header: - self.index_column_active_sourcing = index_column - elif 'Product Id' == header: - self.index_column_product_id_sourcing = index_column - - for index_column in range(1, self.product_sheet.max_column + 1): - header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip() - if 'Is Booster Box' == header: - self.index_column_is_booster_box_product = index_column - elif 'Is Booster' == header: - self.index_column_is_booster_product = index_column - elif 'Is Precon' == header: - self.index_column_is_precon_product = index_column - elif 'Product Id' == header: - self.index_column_product_id_product = index_column - - print(f"Sourcing max row: {self.sourcing_sheet.max_row}") - print(f"Sourcing header row: {self.index_row_header_sourcing}") - print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}") - print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}") - print(f"Product max row: 
{self.product_sheet.max_row}") - print(f"Product header row: {self.index_row_header_product}") - print(f"Sourcing header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}") - print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}") - - if not all([ - self.index_column_name_sourcing - , self.index_column_link_sourcing - , self.index_column_unit_cost_sourcing - , self.index_column_product_id_sourcing - , self.index_column_active_sourcing - , self.index_column_product_id_product - , self.index_column_is_booster_product - , self.index_column_is_booster_box_product - , self.index_column_is_precon_product - ]): - print("Error: Could not find required columns") - return - - def scrape_all_costs(self): - try: - processed_count = 0 - updated_count = 0 - cardmarket_accessed_last_on = 0 - chaoscards_accessed_last_on = 0 - gameslore_accessed_last_on = 0 - magicmadhouse_accessed_last_on = 0 - did_restart_since_last_chaos_cards_visit = True - did_restart_since_last_games_lore_visit = True - for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1): - # print(f"index_row: {index_row}") - # print(f"{self.sourcing_sheet.cell(index_row, 1).value}, {self.sourcing_sheet.cell(index_row, 2).value}, {self.sourcing_sheet.cell(index_row, 3).value}, {self.sourcing_sheet.cell(index_row, 4).value}, {self.sourcing_sheet.cell(index_row, 5).value}, {self.sourcing_sheet.cell(index_row, 6).value}, {self.sourcing_sheet.cell(index_row, 7).value}, {self.sourcing_sheet.cell(index_row, 8).value}, {self.sourcing_sheet.cell(index_row, 9).value}, {self.sourcing_sheet.cell(index_row, 10).value}, {self.sourcing_sheet.cell(index_row, 11).value}, {self.sourcing_sheet.cell(index_row, 12).value}, {self.sourcing_sheet.cell(index_row, 13).value}, {self.sourcing_sheet.cell(index_row, 
14).value}, {self.sourcing_sheet.cell(index_row, 15).value}, {self.sourcing_sheet.cell(index_row, 16).value}, {self.sourcing_sheet.cell(index_row, 17).value}, {self.sourcing_sheet.cell(index_row, 18).value}, {self.sourcing_sheet.cell(index_row, 19).value}") - source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value - source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value - source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value - - if not source_name or not source_link: # or not str(source_link).strip(): - continue - - print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}") - - product_is_booster = False - for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1): - product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value - # print(f"found product: id {product_id}") - if product_id == source_product_id: - product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper() - # print(f"product is booster: {product_is_booster_text}, type: {str(type(product_is_booster_text))}") - product_is_booster = (product_is_booster_text == "TRUE") - break - print(f"product is booster: {product_is_booster}") - - if ( - ( - source_name == "Chaos Cards" - and not did_restart_since_last_chaos_cards_visit - ) - or ( - source_name == "Games Lore" - and not did_restart_since_last_games_lore_visit - ) - ): - self.stop_driver() - self.setup_driver() - if not self.driver: - return - did_restart_since_last_chaos_cards_visit = True - did_restart_since_last_games_lore_visit = True - - if source_name in ["Card Market", "Chaos Cards", "Games Lore", "Magic Madhouse"]: - self.clear_row_sourcing_sheet(index_row = index_row) - processed_count += 1 - Cost_Fetcher_Base.log_processing_new_row( - index_row = index_row - , source_link = 
source_link - ) - - cost = None - active = None - if source_name == "Card Market": - while (time.time() - cardmarket_accessed_last_on < random.uniform(10, 20)): - time.sleep(random.uniform(3, 5)) - if product_is_booster: - price_quantity_pairs = self.scrape_prices_and_quantities_selenium_cardmarket(url = source_link) - if price_quantity_pairs: - self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" - max_quantity = 0 - updated_row_price = False - for price_quantity_pair in price_quantity_pairs: - eur_price = price_quantity_pair['price'] - quantity = price_quantity_pair['quantity'] - print(f" Found price: €{eur_price}") - print(f" Found quantity: {quantity}") - max_quantity = max(max_quantity, quantity) - if quantity >= 8: - if eur_price: - gbp_price = eur_price * self.eur_to_gbp_rate - print(f" Converted: €{eur_price:.2f} → £{gbp_price:.2f}") - self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = gbp_price - updated_count += 1 - updated_row_price = True - print(f"output row: {index_row}, value: {self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value}") - break - else: - print(f" Error: Could not parse price") - if not updated_row_price: - print("Offer with quantity >= 8 not found") - for price_quantity_pair in price_quantity_pairs: - eur_price = price_quantity_pair['price'] - quantity = price_quantity_pair['quantity'] - print(f" Found price: €{eur_price}") - print(f" Found quantity: {quantity}") - if max_quantity <= 2 or quantity == max_quantity: - if eur_price: - gbp_price = eur_price * self.eur_to_gbp_rate - print(f" Converted: €{eur_price:.2f} → £{gbp_price:.2f}") - self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = gbp_price - updated_count += 1 - updated_row_price = True - print(f"output row: {index_row}, value: {self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value}") - break - else: - print(f" Error: Could not parse 
price") - else: - cost, active = self.scrape_cost_and_active_selenium_cardmarket(url = source_link) - cardmarket_accessed_last_on = time.time() - elif source_name == "Chaos Cards": - while (time.time() - chaoscards_accessed_last_on < random.uniform(20, 30)): - time.sleep(random.uniform(3, 5)) - cost, active = self.scrape_cost_and_active_selenium_chaoscards(url = source_link) - chaoscards_accessed_last_on = time.time() - did_restart_since_last_chaos_cards_visit = False - elif source_name == "Games Lore": - while (time.time() - gameslore_accessed_last_on < random.uniform(10, 20)): - time.sleep(random.uniform(3, 5)) - cost, active = self.scrape_cost_and_active_selenium_gameslore(url = source_link) - gameslore_accessed_last_on = time.time() - did_restart_since_last_games_lore_visit = False - elif source_name == "Magic Madhouse": - while (time.time() - magicmadhouse_accessed_last_on < random.uniform(10, 20)): - time.sleep(random.uniform(3, 5)) - cost, active = self.scrape_cost_and_active_selenium_magicmadhouse(url = source_link) - magicmadhouse_accessed_last_on = time.time() - - if (cost is not None and active is not None): - print(f" Found cost: {cost}, active: {active}") - - self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = cost - self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE" - updated_count += 1 - else: - print(f" Error: Could not find cost on page") - # Save workbook - print(f"\n{'='*60}") - print(f"Saving workbook...") - self.workbook.save(Cost_Fetcher_Base.WORKBOOK_NAME) - - print(f"\nComplete!") - print(f"Processed: {processed_count} entries") - print(f"Updated: {updated_count} costs") - except Exception as e: - print(f"Error: {e}") - - def clear_row_sourcing_sheet(self, index_row): - self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None - self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE" - - 
@staticmethod - def log_processing_new_row(index_row, source_link): - print(f"\n{'='*60}") - print(f"Processing row {index_row}: {source_link}") - print(f"{'='*60}") - - def __init__(self): - print("Setting up browser automation (browser will not be visible)...") - self.setup_driver() - if not self.driver: - return - self.load_tcg_sole_trader_workbook() - self.get_eur_to_gbp_rate() - - def stop_driver(self): - self.driver.quit() - -def main(): - cost_fetcher = Cost_Fetcher_Base() - cost_fetcher.scrape_all_costs() - cost_fetcher.stop_driver() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/product_scraping/product_scraper.py b/product_scraping/product_scraper.py new file mode 100644 index 0000000..aabaf44 --- /dev/null +++ b/product_scraping/product_scraper.py @@ -0,0 +1,689 @@ +import pandas as pd +from openpyxl import load_workbook, Workbook +from openpyxl.worksheet.worksheet import Worksheet +import requests +""" +from selenium import webdriver +from selenium.common.exceptions import StaleElementReferenceException +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +""" +import re +import time +import random +from playwright.sync_api import sync_playwright, Browser, Page +# import playwright +# import undetected_chromedriver as uc +# from undetected_chromedriver import Chrome + +import asyncio +from collections import defaultdict +from datetime import datetime, timedelta + +class Product_Scraper: + browser: Browser # sync_playwright.chromium + domain: str + # driver: Chrome # webdriver.Chrome + page: Page + # wait: WebDriverWait + + def __init__(self, domain): + print("Setting up browser automation") + self.domain = domain + """ + self.setup_browser() + if not self.browser: + return + """ + + def 
stop_browser(self, min_delay = 0): + if (min_delay > 0): + time.sleep(random.uniform(min_delay, min_delay * 1.5)) + self.browser.close() + + @staticmethod + def parse_cost(cost_text): + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + try: + return float(cost_clean) / 100 + except ValueError: + return None + @classmethod + def parse_cost_from_pennies(cls, cost_text): + if not cost_text: + return None + cost_clean = cls.parse_cost(cost_text = cost_text) + if cost_clean is not None: + cost_clean = cost_clean / 100 + return cost_clean + @classmethod + def parse_cost_chaoscards(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_cardmarket(cls, cost_text): + # return cls.parse_cost(cost_text = cost_text) + """Convert '141,30 €' format to float in EUR""" + if not cost_text: + return None + cost_clean = re.sub(r'[^\d,]', '', cost_text) + cost_clean = cost_clean.replace(',', '.') + try: + return float(cost_clean) + except ValueError: + return None + @classmethod + def parse_cost_gameslore(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + @classmethod + def parse_cost_magicmadhouse(cls, cost_text): + return cls.parse_cost(cost_text = cost_text) + + """ + def setup_driver(self): + print("Starting driver") + " "" + chrome_options = Options() + # Remove headless mode to see the browser + # chrome_options.add_argument('--headless') + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36') + chrome_options.add_argument('--window-size=1920,1080') + " "" + try: + self.driver = Chrome(version_main=133) # webdriver.Chrome(options=chrome_options) + # return driver + except Exception as e: + print(f"Error setting up 
Chrome driver: {e}") + print("Make sure Chrome and chromedriver are installed") + # return None + self.wait = WebDriverWait(self.driver, 15) + " "" + def setup_browser(self): + print("Starting browser") + with sync_playwright() as p: + self.browser = p.chromium.launch() + self.page = self.browser.new_page() + """ + def scrape_cost_and_active_playwright(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay = 0): + print(f" Loading page...") + # time.sleep(random.uniform(6, 10)) + """ + try: + self.driver.get(url) + element = self.wait.until( + EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) + ) + element = self.wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector)) + ) + except Exception as e: + self.driver.get(url) + element = self.wait.until( + EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) + ) + element = self.wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector)) + ) + + max_attempts = 10 + for attempt in range(max_attempts): + try: + element = None + element = self.driver.find_element(By.CSS_SELECTOR, page_load_element_selector) + text = element.text + print(f"✓ Element loaded successfully on attempt {attempt + 1}") + # return True + break + except StaleElementReferenceException: + print(f"Stale element on attempt {attempt + 1}, retrying...") + if attempt < max_attempts - 1: + time.sleep(1) + else: + raise ValueError("StaleElementReferenceException") + """ + with sync_playwright() as p: + self.browser = p.chromium.launch(headless=False) + self.page = self.browser.new_page() + self.page.goto(url = url) + try: + # Automatically waits up to 30s by default + element = self.page.locator(selector = page_load_element_selector) + print(f" Page title: {self.page.title()}") + + element = self.page.locator(selector = cost_selector) + text = element.text_content() + print(f" Text: '{text}'") + 
cost = text + + active = None + if active_selector is None: # or invalid_active_statuses is None or invalid_active_statuses == []: + active = (cost is not None) + else: + try: + elements = self.page.query_selector_all(selector = cost_selector) + if len(elements) == 0: + active = True + else: + text = elements[0].text_content() + print(f" Text: '{text}'") + active = (invalid_active_statuses is None or text not in invalid_active_statuses) + except Exception as e: + print(f" Selector failed: {e}") + + if cost is None or active is None: + print(f" ✗ No cost found") + print(f"Cost: {cost}, Active: {active}") + input("Press Enter to continue to next URL...") + + except Exception as e: + print(f" Error: {e}") + input("Press Enter to continue to next URL...") + self.stop_browser(min_delay = min_delay) + return None, None + finally: + self.stop_browser(min_delay = min_delay) + return cost, active + + def scrape_cost_and_active_playwright_cardmarket(self, url, eur_to_gbp_rate): + page_load_element_selector = "body > main.container > div.page-title-container" + cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)' + cost_text, active = self.scrape_cost_and_active_playwright( + url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = None + , invalid_active_statuses = [] + , min_delay = 15 + ) + cost = Product_Scraper.parse_cost_cardmarket(cost_text) + if cost is not None: + item_shipping_cost_in = 0 + if cost < 10: + item_shipping_cost_in = 2 + elif cost < 100: + item_shipping_cost_in = 8 + else: + item_shipping_cost_in = 20 + cost = cost * eur_to_gbp_rate + item_shipping_cost_in + active = (cost is not None) + return cost, active + + def scrape_cost_and_active_playwright_chaoscards(self, url): + # page_load_element_selector = '#prod_title' + cost_selector = '.price_inc > 
span:nth-child(2)' + active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)' + cost_text, active = self.scrape_cost_and_active_playwright( + url = url + , page_load_element_selector = cost_selector # page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["Out of stock", "Coming soon"] + , min_delay = 15 + ) + cost = Product_Scraper.parse_cost_chaoscards(cost_text) + return cost, active + + def scrape_cost_and_active_playwright_gameslore(self, url): + # page_load_element_selector = '.page-title' + cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' + active_selector = '.stock > span:nth-child(1)' + cost_text, active = self.scrape_cost_and_active_playwright( + url = url + , page_load_element_selector = cost_selector # page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = ["OUT OF STOCK"] + ) + cost = Product_Scraper.parse_cost_gameslore(cost_text) + return cost, active + + def scrape_cost_and_active_playwright_magicmadhouse(self, url): + page_load_element_selector = '.productView-title' + cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price' + active_selector = '.alertBox.alertBox--error' + cost_text, active = self.scrape_cost_and_active_playwright( + url = url + , page_load_element_selector = page_load_element_selector + , cost_selector = cost_selector + , active_selector = active_selector + , invalid_active_statuses = [] + ) + cost = 
def scrape_prices_and_quantities_playwright_cardmarket(self, url, eur_to_gbp_rate):
    """Collect price/quantity pairs from the offers on a Cardmarket page.

    Opens ``url`` in a visible Chromium window, reads the price and quantity
    of every element matched by the offer selector, and always calls
    ``self.stop_browser(min_delay = 15)`` before returning.

    Args:
        url: Product page to load.
        eur_to_gbp_rate: Multiplier applied to each parsed price.

    Returns:
        A list of ``{'price': ..., 'quantity': ...}`` dicts, where ``price``
        is already multiplied by ``eur_to_gbp_rate``. Offers whose price or
        quantity is missing or unparsable are skipped. An empty list is
        returned when scraping the offers raised.
    """
    # NOTE(review): the ':nth-child(1)' on the row looks like it restricts the
    # match to the first offer only, although the loop below expects several.
    # Confirm against the live page before changing the selector.
    offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
    price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
    quantity_selector = 'div.amount-container > span:nth-child(1)'

    price_quantity_pairs = []
    scrape_failed = False
    print(f" Loading page...")
    with sync_playwright() as p:
        self.browser = p.chromium.launch(headless=False)
        self.page = self.browser.new_page()
        self.page.goto(url = url)

        try:
            print(f" Page title: {self.page.title()}")
            try:
                offer_containers = self.page.query_selector_all(offer_container_selector)
                print(f" Offer container selector: Found {len(offer_containers)} elements")
                for offer_container in offer_containers:
                    price_element = offer_container.query_selector(price_selector)
                    quantity_element = offer_container.query_selector(quantity_selector)
                    # A missing element must only skip this offer, not abort
                    # the whole page via AttributeError.
                    if price_element is None or quantity_element is None:
                        continue
                    price_text = price_element.text_content()
                    quantity_text = quantity_element.text_content()
                    if price_text is None or quantity_text is None:
                        continue
                    if '€' not in price_text or not re.search(r'\d', price_text):
                        continue
                    print(f" ✓ Found price: {price_text}")

                    # Positional call, like the other call sites in this file.
                    # The previous 'price_text=' / 'quantity_text=' keywords
                    # presumably did not match the parser's parameter name
                    # (verify against Product_Scraper.parse_cost_cardmarket).
                    price = Product_Scraper.parse_cost_cardmarket(price_text)
                    quantity = Product_Scraper.parse_cost_cardmarket(quantity_text)
                    if price is None or quantity is None:
                        continue
                    price_quantity_pairs.append({
                        'price': price * eur_to_gbp_rate
                        , 'quantity': quantity
                    })
            except Exception as e:
                print(f" Price selector failed: {e}")
                input("Press enter to continue to next URL...")
                scrape_failed = True
        finally:
            # Single shutdown point: the handler above no longer stops the
            # browser itself, so stop_browser is not run twice on failure.
            self.stop_browser(min_delay = 15)
    return [] if scrape_failed else price_quantity_pairs
def __init__(self):
    """Load the workbook and locate the sourcing/product headers.

    Finds the header row of each sheet, then records the 1-based column index
    of every header this class needs. On any lookup failure an error is
    printed and initialisation stops early, leaving the remaining indexes as
    ``None``.
    """
    sourcing_header_attributes = {
        'Source Name': 'index_column_name_sourcing',
        'Source Link': 'index_column_link_sourcing',
        'Source Unit Cost': 'index_column_unit_cost_sourcing',
        'Sale Price': 'index_column_unit_price_sourcing',
        'Active': 'index_column_active_sourcing',
        'Product Id': 'index_column_product_id_sourcing',
    }
    product_header_attributes = {
        'Is Booster Box': 'index_column_is_booster_box_product',
        'Is Booster': 'index_column_is_booster_product',
        'Is Precon': 'index_column_is_precon_product',
        'Product Id': 'index_column_product_id_product',
    }

    # The class only annotates these names, so give each one a value up front.
    # Otherwise a missing header makes the summary prints and the
    # required-column check raise AttributeError instead of reporting it.
    self.index_row_header_sourcing = None
    self.index_row_header_product = None
    for attribute in (*sourcing_header_attributes.values(), *product_header_attributes.values()):
        setattr(self, attribute, None)

    print("Loading workbook...")
    self.workbook = load_workbook(self.WORKBOOK_NAME)

    if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
        print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found")
        return
    if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
        print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found")
        return

    self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
    self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]

    # A second scan of the first rows for 'Source Name' was dropped: it
    # checked a subset of what this loop already checks.
    for row in range(1, self.sourcing_sheet.max_row + 1):
        if (
            self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing'
            or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value)
        ):
            self.index_row_header_sourcing = row
            break
    if self.index_row_header_sourcing is None:
        print("Error: Could not find table 'tbl_Sourcing'")
        return

    for row in range(1, self.product_sheet.max_row + 1):
        first_cell = self.product_sheet.cell(row, 1).value
        if first_cell == 'tbl_Product' or 'Product Id' in str(first_cell):
            self.index_row_header_product = row
            break
    if self.index_row_header_product is None:
        print("Error: Could not find table 'tbl_Product'")
        return

    # Headers are matched exactly after stripping. If a header occurs twice,
    # the right-most column wins.
    for index_column in range(1, self.sourcing_sheet.max_column + 1):
        header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip()
        attribute = sourcing_header_attributes.get(header)
        if attribute is not None:
            setattr(self, attribute, index_column)

    for index_column in range(1, self.product_sheet.max_column + 1):
        header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip()
        attribute = product_header_attributes.get(header)
        if attribute is not None:
            setattr(self, attribute, index_column)

    print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
    print(f"Sourcing header row: {self.index_row_header_sourcing}")
    print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
    print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
    print(f"Product max row: {self.product_sheet.max_row}")
    print(f"Product header row: {self.index_row_header_product}")
    # This line reports the product sheet; it was previously labelled "Sourcing".
    print(f"Product header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
    print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")

    if not all([
        self.index_column_name_sourcing
        , self.index_column_link_sourcing
        , self.index_column_unit_cost_sourcing
        , self.index_column_unit_price_sourcing
        , self.index_column_product_id_sourcing
        , self.index_column_active_sourcing
        , self.index_column_product_id_product
        , self.index_column_is_booster_product
        , self.index_column_is_booster_box_product
        , self.index_column_is_precon_product
    ]):
        print("Error: Could not find required columns")
        return
def update_row_sourcing_sheet(self, index_row, unit_cost = None, unit_price = None, active = None):
    """Write the supplied values into one row of the sourcing sheet.

    Only arguments that are not ``None`` are written; the other cells are not
    touched. ``active`` is stored as the text "TRUE" or "FALSE" according to
    its truthiness.

    Args:
        index_row: Row of the sourcing sheet to update.
        unit_cost: Value for the unit-cost column, or None to skip.
        unit_price: Value for the unit-price column, or None to skip.
        active: Flag for the active column, or None to skip.
    """
    active_text = None
    if active is not None:
        active_text = "TRUE" if active else "FALSE"

    # Column attributes are looked up by name, and only for values that are
    # actually written.
    pending_writes = (
        ('index_column_unit_cost_sourcing', unit_cost)
        , ('index_column_unit_price_sourcing', unit_price)
        , ('index_column_active_sourcing', active_text)
    )
    for column_attribute, value in pending_writes:
        if value is None:
            continue
        self.sourcing_sheet.cell(index_row, getattr(self, column_attribute)).value = value
def fetch_all(self):
    """Scrape every sourcing row and write the results back to the workbook.

    For each row returned by ``workbook_container.get_sourcing_entries()``:

    * rows whose source name is not a known domain are skipped;
    * the row's cost/active cells are cleared;
    * the request is delayed until a randomised minimum gap since the last
      access to the same domain has passed;
    * the domain's scraper is called, and for Card Market booster products
      the offer list is also scraped to pick a sale price;
    * the row is updated when a cost or a price was found and the active
      state is known.

    The workbook is saved once after all rows were handled. Any exception is
    reported on stdout instead of being raised.
    """
    columns = TCG_Sole_Trader_Workbook_Container
    # (low, high) bounds, in seconds, of the randomised minimum gap between
    # two requests to the same domain.
    request_gap_bounds = {
        self.NAME_DOMAIN_CARD_MARKET: (30, 40),
        self.NAME_DOMAIN_CHAOS_CARDS: (30, 40),
        self.NAME_DOMAIN_GAMES_LORE: (10, 20),
        self.NAME_DOMAIN_MAGIC_MADHOUSE: (10, 20),
    }

    def wait_for_domain(domain_details, gap_bounds):
        # The threshold is re-drawn on every check, as the inline loops did.
        while time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(*gap_bounds):
            time.sleep(random.uniform(3, 5))

    def select_price(price_quantity_pairs):
        # Prefer the first offer with at least 8 units. Otherwise fall back
        # to the first offer overall when nobody stocks more than 2, or to
        # the offer with the largest quantity.
        max_quantity = 0
        for price_quantity_pair in price_quantity_pairs:
            gbp_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            print(f" Found price: £{gbp_price}")
            print(f" Found quantity: {quantity}")
            max_quantity = max(max_quantity, quantity)
            if quantity >= 8:
                if gbp_price:
                    return gbp_price
                print(f" Error: Could not parse price")
        print("Offer with quantity >= 8 not found")
        for price_quantity_pair in price_quantity_pairs:
            gbp_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            print(f" Found price: £{gbp_price}")
            print(f" Found quantity: {quantity}")
            if max_quantity <= 2 or quantity == max_quantity:
                if gbp_price:
                    return gbp_price
                print(f" Error: Could not parse price")
        return None

    try:
        processed_count = 0
        updated_count = 0
        self.product_sources = self.workbook_container.get_sourcing_entries()
        for index_product_source in range(len(self.product_sources)):
            product_source = self.product_sources.loc[index_product_source]
            print(f'Product source: {product_source}')
            index_row = product_source[columns.NAME_COLUMN_INDEX_ROW]
            source_name = product_source[columns.NAME_COLUMN_SOURCE_NAME]
            source_link = product_source[columns.NAME_COLUMN_LINK]
            try:
                index_domain = self.get_index_domain_from_name(source_name)
            except ValueError:
                # Unknown source: nothing to scrape for this row. Catching
                # only ValueError keeps Ctrl-C and SystemExit working.
                continue
            domain_details = self.domain_details[source_name]
            product_scraper = self.product_scrapers[index_domain]
            self.workbook_container.clear_row_sourcing_sheet(index_row = index_row)
            processed_count += 1
            Cost_Fetcher.log_processing_new_row(
                index_row = index_row
                , source_link = source_link
            )

            gap_bounds = request_gap_bounds.get(source_name)
            if gap_bounds is not None:
                wait_for_domain(domain_details, gap_bounds)

            cost = None
            price = None
            active = None
            if source_name == self.NAME_DOMAIN_CARD_MARKET:
                if product_source[columns.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
                    price_quantity_pairs = product_scraper.scrape_prices_and_quantities_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
                    if price_quantity_pairs:
                        # The scraper already multiplied each price by the
                        # exchange rate, so it must not be converted again.
                        price = select_price(price_quantity_pairs)
                cost, active = product_scraper.scrape_cost_and_active_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
            elif source_name == self.NAME_DOMAIN_CHAOS_CARDS:
                cost, active = product_scraper.scrape_cost_and_active_playwright_chaoscards(url = source_link)
            elif source_name == self.NAME_DOMAIN_GAMES_LORE:
                cost, active = product_scraper.scrape_cost_and_active_playwright_gameslore(url = source_link)
            elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
                cost, active = product_scraper.scrape_cost_and_active_playwright_magicmadhouse(url = source_link)

            domain_details[self.ACCESSED_LAST_ON_FLAG] = time.time()

            if ((cost is not None or price is not None) and active is not None):
                print(f" Found cost: {cost}, price: {price}, active: {active}")
                self.workbook_container.update_row_sourcing_sheet(index_row = index_row, unit_cost = cost, unit_price = price, active = active)
                # Counted once per written row; a booster price used to add
                # a second increment for the same row.
                updated_count += 1
            else:
                print(f" Error: Could not find cost on page")

        self.workbook_container.save_workbook()
        print(f"\nComplete!")
        print(f"Processed: {processed_count} entries")
        print(f"Updated: {updated_count} costs")
    except Exception as e:
        print(f"Error: {e}")
def main():
    """Build a Cost_Fetcher and run one full fetch over the workbook."""
    Cost_Fetcher().fetch_all()