"""Scrape supplier prices for TCG products and write them to the sourcing workbook.

The module reads source links from the 'Sourcing' sheet of the workbook, visits
each link with Playwright (rate-limited per supplier domain), and writes the
scraped unit cost, sale price and stock status back to the same sheet.
"""

import asyncio
import random
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd
import requests
from aioconsole import ainput
from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright, Browser, Page


class Product_Scraper:
    """Scrapes cost / stock information for one supplier domain."""

    domain: str
    page: Page  # most recently opened page (closed again once a scrape finishes)

    def __init__(self, domain):
        print("Setting up browser automation")
        self.domain = domain

    @staticmethod
    def parse_cost(cost_text):
        """Parse a '£1,234.56'-style price into a float.

        Commas are treated as thousands separators and the dot as the decimal
        point. Returns None when the text is empty or contains no number.
        """
        if not cost_text:
            return None
        match = re.search(r'\d[\d,]*(?:\.\d+)?|\.\d+', cost_text)
        if match is None:
            return None
        try:
            return float(match.group().replace(',', ''))
        except ValueError:
            return None

    @classmethod
    def parse_cost_from_pennies(cls, cost_text):
        """Parse an amount expressed in pennies and return it in pounds."""
        if not cost_text:
            return None
        cost_clean = cls.parse_cost(cost_text=cost_text)
        if cost_clean is not None:
            cost_clean = cost_clean / 100
        return cost_clean

    @classmethod
    def parse_cost_chaoscards(cls, cost_text):
        """Parse a Chaos Cards price string."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_cardmarket(cls, cost_text):
        """Convert '141,30 €' format to float in EUR."""
        if not cost_text:
            return None
        # Dots (thousands separators) are dropped; the comma is the decimal mark.
        cost_clean = re.sub(r'[^\d,]', '', cost_text)
        cost_clean = cost_clean.replace(',', '.')
        try:
            return float(cost_clean)
        except ValueError:
            return None

    @classmethod
    def parse_cost_gameslore(cls, cost_text):
        """Parse a Games Lore price string."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_magicmadhouse(cls, cost_text):
        """Parse a Magic Madhouse price string."""
        return cls.parse_cost(cost_text=cost_text)

    @staticmethod
    def _parse_quantity(quantity_text):
        """Return the digits in *quantity_text* as an int, or None if there are none."""
        digits = re.sub(r'\D', '', quantity_text or '')
        return int(digits) if digits else None

    async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay=0):
        """Open *url* and read the raw cost text and stock status.

        Args:
            browser: Playwright browser used to open a new page.
            url: Product page to load.
            page_load_element_selector: Accepted for API compatibility; not
                awaited separately because reading the cost locator's text
                already waits for that element.
            cost_selector: CSS selector of the element holding the price text.
            active_selector: CSS selector of the stock-status element, or None
                to treat "a cost was found" as active.
            invalid_active_statuses: Status texts that mean "not purchasable".
            min_delay: Accepted for API compatibility; currently unused.

        Returns:
            (cost_text, active). Both are None when the page could not be read.
        """
        print(f" Loading page...")
        page = await browser.new_page()
        self.page = page
        cost = None
        active = None
        try:
            await page.goto(url=url)
            page_title = await page.title()
            print(f" Page title: {page_title}")
            # Locator.text_content() waits (30s by default) for the element.
            text = await page.locator(selector=cost_selector).text_content()
            print(f" Text: '{text}'")
            cost = text
            if active_selector is None:
                active = (cost is not None)
            else:
                try:
                    elements = await page.query_selector_all(selector=active_selector)
                    if not elements:
                        # No status element on the page: assume purchasable.
                        active = True
                    else:
                        status_text = await elements[0].text_content()
                        print(f" Text: '{status_text}'")
                        status = (status_text or '').strip()
                        active = (invalid_active_statuses is None or status not in invalid_active_statuses)
                except Exception as e:
                    print(f" Selector failed: {e}")
            if cost is None or active is None:
                print(f" ✗ No cost found")
                print(f"Cost: {cost}, Active: {active}")
                await ainput("Press Enter to continue to next URL...")
        except Exception as e:
            print(f" Error: {e}")
            await ainput("Press Enter to continue to next URL...")
            cost, active = None, None
        finally:
            await page.close()
        return cost, active

    async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
        """Scrape the first Card Market offer; return (GBP cost incl. shipping, active)."""
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=None,
            invalid_active_statuses=[],
            min_delay=15,
        )
        cost = Product_Scraper.parse_cost_cardmarket(cost_text)
        if cost is not None:
            # Shipping band is chosen from the EUR price, then added to the
            # converted amount.
            if cost < 10:
                item_shipping_cost_in = 2
            elif cost < 100:
                item_shipping_cost_in = 8
            else:
                item_shipping_cost_in = 20
            cost = cost * eur_to_gbp_rate + item_shipping_cost_in
        active = (cost is not None)
        return cost, active

    async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
        """Scrape a Chaos Cards product page; return (cost, active)."""
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["Out of stock", "Coming soon"],
            min_delay=15,
        )
        cost = Product_Scraper.parse_cost_chaoscards(cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
        """Scrape a Games Lore product page; return (cost, active)."""
        cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
        active_selector = '.stock > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["OUT OF STOCK"],
        )
        cost = Product_Scraper.parse_cost_gameslore(cost_text)
        return cost, active

    async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
        """Scrape a Magic Madhouse product page; return (cost, active)."""
        page_load_element_selector = '.productView-title'
        cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
        # NOTE(review): with an empty invalid-status list the error alert box
        # never marks the product inactive -- confirm whether that is intended.
        active_selector = '.alertBox.alertBox--error'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=[],
        )
        cost = Product_Scraper.parse_cost_magicmadhouse(cost_text)
        return cost, active

    async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate):
        """Collect Card Market offers as [{'price': GBP float, 'quantity': int}, ...].

        Offers whose price or quantity cannot be read are skipped. Returns an
        empty list when the page could not be scraped.
        """
        # NOTE(review): ':nth-child(1)' on the article row limits this to the
        # first offer row -- confirm whether all rows were meant to be matched.
        offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'
        price_quantity_pairs = []
        print(f" Loading page...")
        page = await browser.new_page()
        self.page = page
        try:
            await page.goto(url=url)
            page_title = await page.title()
            print(f" Page title: {page_title}")
            offer_containers = await page.query_selector_all(offer_container_selector)
            print(f" Offer container selector: Found {len(offer_containers)} elements")
            for offer_container in offer_containers:
                price_element = await offer_container.query_selector(price_selector)
                quantity_element = await offer_container.query_selector(quantity_selector)
                if price_element is None or quantity_element is None:
                    continue
                price_text = await price_element.text_content()
                quantity_text = await quantity_element.text_content()
                if not price_text or '€' not in price_text or not re.search(r'\d', price_text):
                    continue
                print(f" ✓ Found price: {price_text}")
                price = Product_Scraper.parse_cost_cardmarket(price_text)
                quantity = Product_Scraper._parse_quantity(quantity_text)
                if price is None or quantity is None:
                    continue
                price_quantity_pairs.append({
                    'price': price * eur_to_gbp_rate,
                    'quantity': quantity,
                })
        except Exception as e:
            print(f" Price selector failed: {e}")
            await ainput("Press enter to continue to next URL...")
            price_quantity_pairs = []
        finally:
            await page.close()
        return price_quantity_pairs


class TCG_Sole_Trader_Workbook_Container:
    """Wraps the sourcing workbook: locates tables/columns and reads/writes rows."""

    NAME_COLUMN_ACTIVE: str = 'Active'
    NAME_COLUMN_INDEX_ROW: str = 'Index Row'
    NAME_COLUMN_LINK: str = 'Link'
    NAME_COLUMN_PRODUCT_ID: str = 'Product Id'
    NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster'
    NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box'
    NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon'
    NAME_COLUMN_SOURCE_NAME: str = 'Source Name'
    NAME_COLUMN_UNIT_COST: str = 'Cost'
    NAME_COLUMN_UNIT_PRICE: str = 'Price'
    PRODUCT_WORKSHEET_NAME = 'Product'
    SOURCING_WORKSHEET_NAME = 'Sourcing'
    WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'

    # Worksheet header text -> attribute that stores its 1-based column index.
    _SOURCING_HEADER_ATTRIBUTES = {
        'Source Name': 'index_column_name_sourcing',
        'Source Link': 'index_column_link_sourcing',
        'Source Unit Cost': 'index_column_unit_cost_sourcing',
        'Sale Price': 'index_column_unit_price_sourcing',
        'Active': 'index_column_active_sourcing',
        'Product Id': 'index_column_product_id_sourcing',
    }
    _PRODUCT_HEADER_ATTRIBUTES = {
        'Is Booster Box': 'index_column_is_booster_box_product',
        'Is Booster': 'index_column_is_booster_product',
        'Is Precon': 'index_column_is_precon_product',
        'Product Id': 'index_column_product_id_product',
    }

    index_column_active_sourcing: int
    index_column_is_booster_product: int
    index_column_is_booster_box_product: int
    index_column_is_precon_product: int
    index_column_link_sourcing: int
    index_column_name_sourcing: int
    index_column_product_id_product: int
    index_column_product_id_sourcing: int
    index_column_unit_cost_sourcing: int
    index_column_unit_price_sourcing: int
    index_row_header_product: int
    index_row_header_sourcing: int
    product_sheet: Worksheet
    sourcing_sheet: Worksheet
    workbook: Workbook

    def __init__(self):
        """Load the workbook and locate the sourcing/product tables and columns.

        On a lookup failure an error is printed and construction stops early;
        every attribute not yet located is left as None.
        """
        print("Loading workbook...")
        # Give every lookup result a defined value so a partially initialised
        # container reports "missing" instead of raising AttributeError.
        self.sourcing_sheet = None
        self.product_sheet = None
        self.index_row_header_sourcing = None
        self.index_row_header_product = None
        for attribute in (*self._SOURCING_HEADER_ATTRIBUTES.values(), *self._PRODUCT_HEADER_ATTRIBUTES.values()):
            setattr(self, attribute, None)

        self.workbook = load_workbook(self.WORKBOOK_NAME)
        for sheet_name in (self.SOURCING_WORKSHEET_NAME, self.PRODUCT_WORKSHEET_NAME):
            if sheet_name not in self.workbook.sheetnames:
                print(f"Error: Sheet '{sheet_name}' not found")
                return
        self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
        self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]

        self.index_row_header_sourcing = self._find_header_row(
            self.sourcing_sheet, 'tbl_Sourcing', 'Source Name', 3)
        if self.index_row_header_sourcing is None:
            print("Error: Could not find table 'tbl_Sourcing'")
            return
        self.index_row_header_product = self._find_header_row(
            self.product_sheet, 'tbl_Product', 'Product Id', 1)
        if self.index_row_header_product is None:
            print("Error: Could not find table 'tbl_Product'")
            return

        self._map_columns(self.sourcing_sheet, self.index_row_header_sourcing, self._SOURCING_HEADER_ATTRIBUTES)
        self._map_columns(self.product_sheet, self.index_row_header_product, self._PRODUCT_HEADER_ATTRIBUTES)

        print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
        print(f"Sourcing header row: {self.index_row_header_sourcing}")
        print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
        print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
        print(f"Product max row: {self.product_sheet.max_row}")
        print(f"Product header row: {self.index_row_header_product}")
        print(f"Product header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
        print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")

        required_columns = [
            self.index_column_name_sourcing,
            self.index_column_link_sourcing,
            self.index_column_unit_cost_sourcing,
            self.index_column_unit_price_sourcing,
            self.index_column_product_id_sourcing,
            self.index_column_active_sourcing,
            self.index_column_product_id_product,
            self.index_column_is_booster_product,
            self.index_column_is_booster_box_product,
            self.index_column_is_precon_product,
        ]
        if not all(required_columns):
            print("Error: Could not find required columns")
            return

    @staticmethod
    def _find_header_row(sheet, table_marker, header_text, header_column):
        """Return the first row whose column 1 equals *table_marker* or whose
        *header_column* cell contains *header_text*; None if there is none."""
        for row in range(1, sheet.max_row + 1):
            if sheet.cell(row, 1).value == table_marker or header_text in str(sheet.cell(row, header_column).value):
                return row
        return None

    def _map_columns(self, sheet, header_row, header_attributes):
        """Store the column index of every recognised header of *header_row* on self."""
        for index_column in range(1, sheet.max_column + 1):
            header = str(sheet.cell(header_row, index_column).value).strip()
            attribute = header_attributes.get(header)
            if attribute is not None:
                setattr(self, attribute, index_column)

    @classmethod
    def create_product_source_df(cls):
        """Return an empty DataFrame with the product-source columns."""
        return pd.DataFrame(columns=[
            cls.NAME_COLUMN_INDEX_ROW,
            cls.NAME_COLUMN_PRODUCT_ID,
            cls.NAME_COLUMN_SOURCE_NAME,
            cls.NAME_COLUMN_LINK,
            cls.NAME_COLUMN_PRODUCT_IS_BOOSTER,
            cls.NAME_COLUMN_UNIT_COST,
            cls.NAME_COLUMN_UNIT_PRICE,
            cls.NAME_COLUMN_ACTIVE,
        ])

    def get_sourcing_entries(self):
        """Read every sourcing row that has a name and a link.

        Returns:
            DataFrame (columns of create_product_source_df) sorted by source
            name; cost, price and active are left as None. Rows read before an
            error are still returned.
        """
        rows = []
        try:
            # Product id -> "is booster", first matching product row wins.
            booster_by_product_id = {}
            for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
                product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
                if product_id in booster_by_product_id:
                    continue
                is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper()
                booster_by_product_id[product_id] = (is_booster_text == "TRUE")

            for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
                source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
                source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
                source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
                if not source_name or not source_link:
                    continue
                print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
                product_is_booster = booster_by_product_id.get(source_product_id, False)
                print(f"product is booster: {product_is_booster}")
                rows.append([
                    index_row,
                    source_product_id,
                    source_name,
                    source_link,
                    product_is_booster,
                    None,  # cost
                    None,  # price
                    None,  # active
                ])
        except Exception as e:
            print(f"Error: {e}")
        product_sources = self.create_product_source_df()
        if rows:
            product_sources = pd.DataFrame(rows, columns=product_sources.columns)
        # sort_values returns a new frame; a stable sort keeps sheet order
        # within each source.
        return product_sources.sort_values(self.NAME_COLUMN_SOURCE_NAME, kind="stable")

    def clear_row_sourcing_sheet(self, index_row):
        """Blank the unit cost of *index_row* and mark the row inactive."""
        self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
        self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"

    def update_row_sourcing_sheet(self, index_row, unit_cost=None, unit_price=None, active=None):
        """Write the given values to *index_row*; arguments left as None are skipped."""
        if unit_cost is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost
        if unit_price is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price
        if active is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"

    def save_workbook(self):
        """Save the workbook back to WORKBOOK_NAME."""
        print(f"\n{'='*60}")
        print(f"Saving workbook...")
        self.workbook.save(self.WORKBOOK_NAME)


class Cost_Fetcher:
    """Fetches costs for all sourcing rows, throttled per supplier domain."""

    ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
    ACTIVE_FLAG: str = 'Active'
    COST_FLAG: str = 'Cost'
    DATA_FLAG: str = 'Data'
    ERROR_FLAG: str = 'Error'
    INDEX_DOMAIN_FLAG: str = 'Index Domain'
    INDEX_ROW_FLAG: str = 'Index Row'
    NAME_DOMAIN_CARD_MARKET: str = 'Card Market'
    NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'
    NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'
    NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'
    NAME_FLAG: str = 'Name'
    PRICE_FLAG: str = 'Price'
    SUCCESS_FLAG: str = 'Success'
    URL_FLAG: str = 'Url'

    # (min, max) seconds between two page loads on the same domain.
    _DOMAIN_DELAY_SECONDS = {
        NAME_DOMAIN_CARD_MARKET: (30, 40),
        NAME_DOMAIN_CHAOS_CARDS: (30, 40),
        NAME_DOMAIN_GAMES_LORE: (10, 20),
        NAME_DOMAIN_MAGIC_MADHOUSE: (10, 20),
    }

    active_row_indices: list  # per domain: row currently being scraped, or None
    domain_names: list[str]
    eur_to_gbp_rate: float
    product_scrapers: list[Product_Scraper]
    product_sources: pd.DataFrame
    workbook_container: TCG_Sole_Trader_Workbook_Container

    def __init__(self):
        self.domain_names = [
            self.NAME_DOMAIN_CARD_MARKET,
            self.NAME_DOMAIN_CHAOS_CARDS,
            self.NAME_DOMAIN_GAMES_LORE,
            self.NAME_DOMAIN_MAGIC_MADHOUSE,
        ]
        self.domain_details = {
            domain_name: {
                self.NAME_FLAG: domain_name,
                self.INDEX_DOMAIN_FLAG: index_domain,
                self.ACCESSED_LAST_ON_FLAG: 0,
            }
            for index_domain, domain_name in enumerate(self.domain_names)
        }
        self.product_scrapers = [Product_Scraper(domain) for domain in self.domain_names]
        self.active_row_indices = [None] * len(self.domain_names)
        self.workbook_container = TCG_Sole_Trader_Workbook_Container()
        self.get_eur_to_gbp_rate()
        # One lock per domain name serialises access to that domain.
        self.domain_locks = defaultdict(asyncio.Lock)

    def get_index_domain_from_name(self, domain_name):
        """Return the position of *domain_name* in domain_names.

        Raises:
            ValueError: if the domain is not known.
        """
        try:
            return self.domain_names.index(domain_name)
        except ValueError:
            raise ValueError(f'Domain does not exist: {domain_name}') from None

    def get_eur_to_gbp_rate(self):
        """Fetch the EUR->GBP rate into eur_to_gbp_rate; fall back to 0.85 on failure."""
        try:
            response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
            response.raise_for_status()
            data = response.json()
            self.eur_to_gbp_rate = float(data['rates']['GBP'])
        except Exception as e:
            print(f"Error fetching exchange rate: {e}")
            print("Using fallback rate: 0.85")
            self.eur_to_gbp_rate = 0.85

    async def fetch_all(self):
        """Scrape every sourcing row concurrently, update the sheet and save it."""
        try:
            processed_count = 0
            updated_count = 0
            self.product_sources = self.workbook_container.get_sourcing_entries()
            tasks = [
                self.fetch_single(product_source=product_source)
                for _, product_source in self.product_sources.iterrows()
            ]
            # return_exceptions keeps one failing row from discarding the rest.
            sourced_products = await asyncio.gather(*tasks, return_exceptions=True)
            for sourced_product in sourced_products:
                processed_count += 1
                if isinstance(sourced_product, BaseException):
                    print(f"Error: {sourced_product}")
                    continue
                # Results are keyed by the *_FLAG names used in make_result_data_json.
                index_row = sourced_product[self.INDEX_ROW_FLAG]
                unit_cost = sourced_product[self.COST_FLAG]
                unit_price = sourced_product[self.PRICE_FLAG]
                active = sourced_product[self.ACTIVE_FLAG]
                if not active:
                    continue
                updated_count += 1
                self.workbook_container.update_row_sourcing_sheet(
                    index_row=index_row,
                    unit_cost=unit_cost,
                    unit_price=unit_price,
                    active=active,
                )
            self.workbook_container.save_workbook()
            print(f"\nComplete!")
            print(f"Processed: {processed_count} entries")
            print(f"Updated: {updated_count} costs")
        except Exception as e:
            print(f"Error: {e}")

    async def fetch_single(self, product_source):
        """Scrape one sourcing row and return its result dict.

        Rows from an unknown source are returned untouched (all values None).
        Otherwise the row is cleared in the sheet, the domain's lock and
        cooldown are awaited, and the page is scraped in a fresh browser.
        """
        print(f'Product source: {product_source}')
        index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
        source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
        source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK]
        try:
            index_domain = self.get_index_domain_from_name(source_name)
        except ValueError:
            return self.make_result_data_json(index_row=index_row)
        domain_details = self.domain_details[source_name]
        self.workbook_container.clear_row_sourcing_sheet(index_row=index_row)
        Cost_Fetcher.log_processing_new_row(
            index_row=index_row,
            source_link=source_link,
        )
        cost = None
        price = None
        active = None
        async with self.domain_locks[source_name]:
            await self._wait_for_domain_cooldown(source_name)
            self.active_row_indices[index_domain] = index_row
            try:
                # The browser is launched only once it is this row's turn, so
                # waiting rows do not each hold an open browser.
                async with async_playwright() as p:
                    browser = await p.chromium.launch(headless=False)
                    try:
                        cost, price, active = await self._scrape_source(
                            browser, source_name, index_domain, product_source, source_link)
                    finally:
                        await browser.close()
            finally:
                domain_details[self.ACCESSED_LAST_ON_FLAG] = time.time()
                self.active_row_indices[index_domain] = None
        if (cost is None and price is None) or active is None:
            print(f" Error: Could not find cost on page")
        return self.make_result_data_json(
            index_row=index_row,
            cost=cost,
            price=price,
            active=active,
        )

    async def _wait_for_domain_cooldown(self, source_name):
        """Sleep until a randomised gap has passed since the domain's last access."""
        low, high = self._DOMAIN_DELAY_SECONDS[source_name]
        required_gap = random.uniform(low, high)
        elapsed = time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG]
        if elapsed < required_gap:
            await asyncio.sleep(required_gap - elapsed)

    async def _scrape_source(self, browser, source_name, index_domain, product_source, source_link):
        """Run the scraper matching *source_name*; return (cost, price, active)."""
        scraper = self.product_scrapers[index_domain]
        cost = None
        price = None
        active = None
        if source_name == self.NAME_DOMAIN_CARD_MARKET:
            if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
                price_quantity_pairs = await scraper.scrape_prices_and_quantities_playwright_cardmarket(
                    browser=browser, url=source_link, eur_to_gbp_rate=self.eur_to_gbp_rate)
                price, _ = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs=price_quantity_pairs)
            # NOTE(review): the cost is scraped for every Card Market row, not
            # only boosters -- confirm against the intended flow.
            cost, active = await scraper.scrape_cost_and_active_playwright_cardmarket(
                browser=browser, url=source_link, eur_to_gbp_rate=self.eur_to_gbp_rate)
        elif source_name == self.NAME_DOMAIN_CHAOS_CARDS:
            cost, active = await scraper.scrape_cost_and_active_playwright_chaoscards(browser=browser, url=source_link)
        elif source_name == self.NAME_DOMAIN_GAMES_LORE:
            cost, active = await scraper.scrape_cost_and_active_playwright_gameslore(browser=browser, url=source_link)
        elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
            cost, active = await scraper.scrape_cost_and_active_playwright_magicmadhouse(browser=browser, url=source_link)
        return cost, price, active

    @classmethod
    def make_result(cls, url, success, data, error):
        """Build a generic result dict keyed by the URL/SUCCESS/DATA/ERROR flags."""
        return {
            cls.URL_FLAG: url,
            cls.SUCCESS_FLAG: success,
            cls.DATA_FLAG: data,
            cls.ERROR_FLAG: error,
        }

    @classmethod
    def make_result_data_json(cls, index_row, cost=None, price=None, active=None):
        """Build the per-row result dict consumed by fetch_all."""
        return {
            cls.INDEX_ROW_FLAG: index_row,
            cls.COST_FLAG: cost,
            cls.PRICE_FLAG: price,
            cls.ACTIVE_FLAG: active,
        }

    @staticmethod
    def _first_priced_offer(price_quantity_pairs, accepts_quantity):
        """Return the price of the first offer whose quantity is accepted and
        whose price is usable; None when no offer qualifies."""
        for price_quantity_pair in price_quantity_pairs:
            offer_price = price_quantity_pair['price']
            quantity = price_quantity_pair['quantity']
            print(f" Found price: £{offer_price}")
            print(f" Found quantity: {quantity}")
            if not accepts_quantity(quantity):
                continue
            if offer_price:
                return offer_price
            print(f" Error: Could not parse price")
        return None

    def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs):
        """Pick a sale price from scraped offers.

        The pairs come from scrape_prices_and_quantities_playwright_cardmarket,
        which has already converted the prices to GBP, so no further conversion
        is applied here. The first offer with quantity >= 8 is preferred;
        otherwise the first offer with the largest quantity is used (any offer
        when no quantity exceeds 2).

        Returns:
            (price, found): price is None and found is False when no usable
            offer exists.
        """
        if not price_quantity_pairs:
            return None, False
        price = self._first_priced_offer(price_quantity_pairs, lambda quantity: quantity >= 8)
        if price is None:
            print("Offer with quantity >= 8 not found")
            max_quantity = max(pair['quantity'] for pair in price_quantity_pairs)
            price = self._first_priced_offer(
                price_quantity_pairs,
                lambda quantity: max_quantity <= 2 or quantity == max_quantity,
            )
        return price, price is not None

    @staticmethod
    def log_processing_new_row(index_row, source_link):
        """Print a banner announcing the row about to be processed."""
        print(f"\n{'='*60}")
        print(f"Processing row {index_row}: {source_link}")
        print(f"{'='*60}")


async def main():
    """Entry point: build the fetcher and process every sourcing row."""
    cost_fetcher = Cost_Fetcher()
    await cost_fetcher.fetch_all()


if __name__ == "__main__":
    asyncio.run(main())