Feat: Product Scraper - update to async - broken.

This commit is contained in:
2026-01-11 23:16:01 +00:00
parent 86d4a981fd
commit 72ee0bb104
2 changed files with 226 additions and 327 deletions

View File

@@ -15,34 +15,23 @@ import re
import time import time
import random import random
from playwright.sync_api import sync_playwright, Browser, Page from playwright.sync_api import sync_playwright, Browser, Page
from playwright.async_api import async_playwright
# import playwright # import playwright
# import undetected_chromedriver as uc # import undetected_chromedriver as uc
# from undetected_chromedriver import Chrome # from undetected_chromedriver import Chrome
import asyncio import asyncio
from aioconsole import ainput
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
class Product_Scraper: class Product_Scraper:
browser: Browser # sync_playwright.chromium
domain: str domain: str
# driver: Chrome # webdriver.Chrome
page: Page page: Page
# wait: WebDriverWait
def __init__(self, domain): def __init__(self, domain):
print("Setting up browser automation") print("Setting up browser automation")
self.domain = domain self.domain = domain
"""
self.setup_browser()
if not self.browser:
return
"""
def stop_browser(self, min_delay = 0):
if (min_delay > 0):
time.sleep(random.uniform(min_delay, min_delay * 1.5))
self.browser.close()
@staticmethod @staticmethod
def parse_cost(cost_text): def parse_cost(cost_text):
@@ -83,119 +72,56 @@ class Product_Scraper:
def parse_cost_magicmadhouse(cls, cost_text): def parse_cost_magicmadhouse(cls, cost_text):
return cls.parse_cost(cost_text = cost_text) return cls.parse_cost(cost_text = cost_text)
""" async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay = 0):
def setup_driver(self):
print("Starting driver")
" ""
chrome_options = Options()
# Remove headless mode to see the browser
# chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
chrome_options.add_argument('--window-size=1920,1080')
" ""
try:
self.driver = Chrome(version_main=133) # webdriver.Chrome(options=chrome_options)
# return driver
except Exception as e:
print(f"Error setting up Chrome driver: {e}")
print("Make sure Chrome and chromedriver are installed")
# return None
self.wait = WebDriverWait(self.driver, 15)
" ""
def setup_browser(self):
print("Starting browser")
with sync_playwright() as p:
self.browser = p.chromium.launch()
self.page = self.browser.new_page()
"""
def scrape_cost_and_active_playwright(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay = 0):
print(f" Loading page...") print(f" Loading page...")
# time.sleep(random.uniform(6, 10)) self.page = await browser.new_page()
""" await self.page.goto(url = url)
cost = None
active = None
try: try:
self.driver.get(url) # Automatically waits up to 30s by default
element = self.wait.until( element = self.page.locator(selector = page_load_element_selector)
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) page_title = await self.page.title()
) print(f" Page title: {page_title}")
element = self.wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector)) element = self.page.locator(selector = cost_selector)
) text = await element.text_content()
print(f" Text: '{text}'")
cost = text
active = None
if active_selector is None:
active = (cost is not None)
else:
try:
elements = await self.page.query_selector_all(selector = cost_selector)
if len(elements) == 0:
active = True
else:
text = await elements[0].text_content()
print(f" Text: '{text}'")
active = (invalid_active_statuses is None or text not in invalid_active_statuses)
except Exception as e:
print(f" Selector failed: {e}")
if cost is None or active is None:
print(f" ✗ No cost found")
print(f"Cost: {cost}, Active: {active}")
await ainput("Press Enter to continue to next URL...")
except Exception as e: except Exception as e:
self.driver.get(url) print(f" Error: {e}")
element = self.wait.until( await ainput("Press Enter to continue to next URL...")
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector)) return None, None
) finally:
element = self.wait.until( return cost, active
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector))
)
max_attempts = 10 async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
for attempt in range(max_attempts):
try:
element = None
element = self.driver.find_element(By.CSS_SELECTOR, page_load_element_selector)
text = element.text
print(f"✓ Element loaded successfully on attempt {attempt + 1}")
# return True
break
except StaleElementReferenceException:
print(f"Stale element on attempt {attempt + 1}, retrying...")
if attempt < max_attempts - 1:
time.sleep(1)
else:
raise ValueError("StaleElementReferenceException")
"""
with sync_playwright() as p:
self.browser = p.chromium.launch(headless=False)
self.page = self.browser.new_page()
self.page.goto(url = url)
try:
# Automatically waits up to 30s by default
element = self.page.locator(selector = page_load_element_selector)
print(f" Page title: {self.page.title()}")
element = self.page.locator(selector = cost_selector)
text = element.text_content()
print(f" Text: '{text}'")
cost = text
active = None
if active_selector is None: # or invalid_active_statuses is None or invalid_active_statuses == []:
active = (cost is not None)
else:
try:
elements = self.page.query_selector_all(selector = cost_selector)
if len(elements) == 0:
active = True
else:
text = elements[0].text_content()
print(f" Text: '{text}'")
active = (invalid_active_statuses is None or text not in invalid_active_statuses)
except Exception as e:
print(f" Selector failed: {e}")
if cost is None or active is None:
print(f" ✗ No cost found")
print(f"Cost: {cost}, Active: {active}")
input("Press Enter to continue to next URL...")
except Exception as e:
print(f" Error: {e}")
input("Press Enter to continue to next URL...")
self.stop_browser(min_delay = min_delay)
return None, None
finally:
self.stop_browser(min_delay = min_delay)
return cost, active
def scrape_cost_and_active_playwright_cardmarket(self, url, eur_to_gbp_rate):
page_load_element_selector = "body > main.container > div.page-title-container" page_load_element_selector = "body > main.container > div.page-title-container"
cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)' cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
cost_text, active = self.scrape_cost_and_active_playwright( cost_text, active = await self.scrape_cost_and_active_playwright(
url = url browser = browser
, url = url
, page_load_element_selector = page_load_element_selector , page_load_element_selector = page_load_element_selector
, cost_selector = cost_selector , cost_selector = cost_selector
, active_selector = None , active_selector = None
@@ -215,12 +141,13 @@ class Product_Scraper:
active = (cost is not None) active = (cost is not None)
return cost, active return cost, active
def scrape_cost_and_active_playwright_chaoscards(self, url): async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
# page_load_element_selector = '#prod_title' # page_load_element_selector = '#prod_title'
cost_selector = '.price_inc > span:nth-child(2)' cost_selector = '.price_inc > span:nth-child(2)'
active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)' active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)'
cost_text, active = self.scrape_cost_and_active_playwright( cost_text, active = await self.scrape_cost_and_active_playwright(
url = url browser = browser
, url = url
, page_load_element_selector = cost_selector # page_load_element_selector , page_load_element_selector = cost_selector # page_load_element_selector
, cost_selector = cost_selector , cost_selector = cost_selector
, active_selector = active_selector , active_selector = active_selector
@@ -230,12 +157,13 @@ class Product_Scraper:
cost = Product_Scraper.parse_cost_chaoscards(cost_text) cost = Product_Scraper.parse_cost_chaoscards(cost_text)
return cost, active return cost, active
def scrape_cost_and_active_playwright_gameslore(self, url): async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
# page_load_element_selector = '.page-title' # page_load_element_selector = '.page-title'
cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price' cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
active_selector = '.stock > span:nth-child(1)' active_selector = '.stock > span:nth-child(1)'
cost_text, active = self.scrape_cost_and_active_playwright( cost_text, active = await self.scrape_cost_and_active_playwright(
url = url browser = browser
, url = url
, page_load_element_selector = cost_selector # page_load_element_selector , page_load_element_selector = cost_selector # page_load_element_selector
, cost_selector = cost_selector , cost_selector = cost_selector
, active_selector = active_selector , active_selector = active_selector
@@ -244,12 +172,13 @@ class Product_Scraper:
cost = Product_Scraper.parse_cost_gameslore(cost_text) cost = Product_Scraper.parse_cost_gameslore(cost_text)
return cost, active return cost, active
def scrape_cost_and_active_playwright_magicmadhouse(self, url): async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
page_load_element_selector = '.productView-title' page_load_element_selector = '.productView-title'
cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price' cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
active_selector = '.alertBox.alertBox--error' active_selector = '.alertBox.alertBox--error'
cost_text, active = self.scrape_cost_and_active_playwright( cost_text, active = await self.scrape_cost_and_active_playwright(
url = url browser = browser
, url = url
, page_load_element_selector = page_load_element_selector , page_load_element_selector = page_load_element_selector
, cost_selector = cost_selector , cost_selector = cost_selector
, active_selector = active_selector , active_selector = active_selector
@@ -258,50 +187,47 @@ class Product_Scraper:
cost = Product_Scraper.parse_cost_magicmadhouse(cost_text) cost = Product_Scraper.parse_cost_magicmadhouse(cost_text)
return cost, active return cost, active
def scrape_prices_and_quantities_playwright_cardmarket(self, url, eur_to_gbp_rate): async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate):
offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer' offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)' price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
quantity_selector = 'div.amount-container > span:nth-child(1)' quantity_selector = 'div.amount-container > span:nth-child(1)'
print(f" Loading page...") print(f" Loading page...")
with sync_playwright() as p: self.page = await browser.new_page()
self.browser = p.chromium.launch(headless=False) await self.page.goto(url = url)
self.page = self.browser.new_page()
self.page.goto(url = url)
try:
# Automatically waits up to 30s by default
page_title = await self.page.title()
print(f" Page title: {page_title}")
price_quantity_pairs = []
try: try:
# Automatically waits up to 30s by default offer_containers = await self.page.query_selector_all(offer_container_selector)
print(f" Page title: {self.page.title()}") print(f" Offer container selector: Found {len(offer_containers)} elements")
for offer_container in offer_containers:
price_element = await offer_container.query_selector(price_selector)
price_text = await price_element.text_content()
if '' in price_text and re.search(r'\d', price_text):
print(f" ✓ Found price: {price_text}")
else:
price_text = None
price_quantity_pairs = [] quantity_element = await offer_container.query_selector(quantity_selector)
try: quantity_text = await quantity_element.text_content()
offer_containers = self.page.query_selector_all(offer_container_selector)
print(f" Offer container selector: Found {len(offer_containers)} elements")
for offer_container in offer_containers:
price_element = offer_container.query_selector(price_selector)
price_text = price_element.text_content()
if '' in price_text and re.search(r'\d', price_text):
print(f" ✓ Found price: {price_text}")
else:
price_text = None
quantity_element = offer_container.query_selector(quantity_selector) if price_text is None or quantity_text is None:
quantity_text = quantity_element.text_content() continue
price_quantity_pairs.append({
if price_text is None or quantity_text is None: 'price': Product_Scraper.parse_cost_cardmarket(price_text = price_text) * eur_to_gbp_rate
continue , 'quantity': Product_Scraper.parse_cost_cardmarket(quantity_text = quantity_text)
price_quantity_pairs.append({ })
'price': Product_Scraper.parse_cost_cardmarket(price_text = price_text) * eur_to_gbp_rate except Exception as e:
, 'quantity': Product_Scraper.parse_cost_cardmarket(quantity_text = quantity_text) print(f" Price selector failed: {e}")
}) await ainput("Press enter to continue to next URL...")
except Exception as e: return []
print(f" Price selector failed: {e}") finally:
input("Press enter to continue to next URL...") return price_quantity_pairs
self.stop_browser(min_delay = 15)
return []
finally:
self.stop_browser(min_delay = 15)
return price_quantity_pairs
class TCG_Sole_Trader_Workbook_Container: class TCG_Sole_Trader_Workbook_Container:
@@ -449,11 +375,9 @@ class TCG_Sole_Trader_Workbook_Container:
source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
if not source_name or not source_link: if not source_name or not source_link:
continue continue
print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
product_is_booster = False product_is_booster = False
for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1): for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
@@ -501,13 +425,22 @@ class TCG_Sole_Trader_Workbook_Container:
class Cost_Fetcher: class Cost_Fetcher:
ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On' ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
ACTIVE_FLAG: str = 'Active'
COST_FLAG: str = 'Cost'
DATA_FLAG: str = 'Data'
ERROR_FLAG: str = 'Error'
INDEX_DOMAIN_FLAG: str = 'Index Domain' INDEX_DOMAIN_FLAG: str = 'Index Domain'
INDEX_ROW_FLAG: str = 'Index Row'
NAME_DOMAIN_CARD_MARKET: str = 'Card Market' NAME_DOMAIN_CARD_MARKET: str = 'Card Market'
NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards' NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'
NAME_DOMAIN_GAMES_LORE: str = 'Games Lore' NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'
NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse' NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'
NAME_FLAG: str = 'Name' NAME_FLAG: str = 'Name'
PRICE_FLAG: str = 'Price'
SUCCESS_FLAG: str = 'Success'
URL_FLAG: str = 'Url'
active_row_indices = list[int]
domain_names: list[str] domain_names: list[str]
eur_to_gbp_rate: float eur_to_gbp_rate: float
product_scrapers: list[Product_Scraper] product_scrapers: list[Product_Scraper]
@@ -544,11 +477,14 @@ class Cost_Fetcher:
} }
} }
product_scrapers = [] product_scrapers = []
active_row_indices = []
for index_domain in range(len(self.domain_names)): for index_domain in range(len(self.domain_names)):
domain = self.domain_names[index_domain] domain = self.domain_names[index_domain]
product_scraper = Product_Scraper(domain) product_scraper = Product_Scraper(domain)
product_scrapers.append(product_scraper) product_scrapers.append(product_scraper)
active_row_indices.append(None)
self.product_scrapers = product_scrapers self.product_scrapers = product_scrapers
self.active_row_indices = active_row_indices
self.workbook_container = TCG_Sole_Trader_Workbook_Container() self.workbook_container = TCG_Sole_Trader_Workbook_Container()
self.get_eur_to_gbp_rate() self.get_eur_to_gbp_rate()
@@ -570,113 +506,53 @@ class Cost_Fetcher:
print("Using fallback rate: 0.85") print("Using fallback rate: 0.85")
self.eur_to_gbp_rate = 0.85 self.eur_to_gbp_rate = 0.85
def fetch_all(self): async def fetch_all(self):
try: try:
processed_count = 0 processed_count = 0
updated_count = 0 updated_count = 0
self.product_sources = self.workbook_container.get_sourcing_entries() self.product_sources = self.workbook_container.get_sourcing_entries()
for index_product_source in range(len(self.product_sources)): sourced_products = await self.scrape_with_browser_pool()
product_source = self.product_sources.loc[index_product_source] for sourced_product in sourced_products:
print(f'Product source: {product_source}') # self.workbook_container.workbook.cell(index_row, self.workbook_container.IND)
index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] index_row = sourced_product[self.workbook_container.NAME_COLUMN_INDEX_ROW]
source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME] unit_cost = sourced_product[self.workbook_container.NAME_COLUMN_UNIT_COST]
source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK] unit_price = sourced_product[self.workbook_container.index_column_unit_price_sourcing]
index_domain = None active = sourced_product[self.workbook_container.index_column_active_sourcing]
try:
index_domain = self.get_index_domain_from_name(source_name)
except:
continue
domain_details = self.domain_details[source_name]
self.workbook_container.clear_row_sourcing_sheet(index_row = index_row)
processed_count += 1 processed_count += 1
Cost_Fetcher.log_processing_new_row( if not active:
continue
updated_count += 1
self.workbook_container.update_row_sourcing_sheet(
index_row = index_row index_row = index_row
, source_link = source_link , unit_cost = unit_cost
, unit_price = unit_price
, active = active
) )
cost = None
price = None
active = None
if source_name == self.NAME_DOMAIN_CARD_MARKET:
while (time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
time.sleep(random.uniform(3, 5))
if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
price_quantity_pairs = self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
if price_quantity_pairs:
active = True
max_quantity = 0
updated_row_price = False
for price_quantity_pair in price_quantity_pairs:
eur_price = price_quantity_pair['price']
quantity = price_quantity_pair['quantity']
print(f" Found price: €{eur_price}")
print(f" Found quantity: {quantity}")
max_quantity = max(max_quantity, quantity)
if quantity >= 8:
if eur_price:
price = eur_price * self.eur_to_gbp_rate
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_price_sourcing).value = gbp_price
updated_count += 1
updated_row_price = True
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
break
else:
print(f" Error: Could not parse price")
if not updated_row_price:
print("Offer with quantity >= 8 not found")
for price_quantity_pair in price_quantity_pairs:
eur_price = price_quantity_pair['price']
quantity = price_quantity_pair['quantity']
print(f" Found price: €{eur_price}")
print(f" Found quantity: {quantity}")
if max_quantity <= 2 or quantity == max_quantity:
if eur_price:
price = eur_price * self.eur_to_gbp_rate
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value = gbp_price
updated_count += 1
updated_row_price = True
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
break
else:
print(f" Error: Could not parse price")
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
elif source_name == self.NAME_DOMAIN_CHAOS_CARDS:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
time.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(url = source_link)
elif source_name == self.NAME_DOMAIN_GAMES_LORE:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
time.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(url = source_link)
elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
time.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(url = source_link)
self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] = time.time()
if ((cost is not None or price is not None) and active is not None):
print(f" Found cost: {cost}, price: {price}, active: {active}")
self.workbook_container.update_row_sourcing_sheet(index_row = index_row, unit_cost = cost, unit_price = price, active = active)
updated_count += 1
else:
print(f" Error: Could not find cost on page")
self.workbook_container.save_workbook() self.workbook_container.save_workbook()
"""
for index_domain in range(len(self.domain_names)):
self.product_scrapers[index_domain].stop_browser()
"""
print(f"\nComplete!") print(f"\nComplete!")
print(f"Processed: {processed_count} entries") print(f"Processed: {processed_count} entries")
print(f"Updated: {updated_count} costs") print(f"Updated: {updated_count} costs")
except Exception as e: except Exception as e:
print(f"Error: {e}") print(f"Error: {e}")
async def fetch_single(self): async def scrape_with_browser_pool(self):
product_source = self.product_sources.loc[index_product_source] count_domains = len(self.domain_names)
async with async_playwright() as p:
browsers = [await p.chromium.launch(headless = False) for _ in range(count_domains)]
try:
tasks = []
# for i, url in enumerate(urls):
for index_product_source in range(len(self.product_sources)):
product_source = self.product_sources.loc[index_product_source]
browser = browsers[index_product_source % count_domains]
tasks.append(self.fetch_single(browser, product_source))
return await asyncio.gather(*tasks)
finally:
for browser in browsers:
await browser.close()
async def fetch_single(self, browser, product_source):
print(f'Product source: {product_source}') print(f'Product source: {product_source}')
index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW] index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME] source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
@@ -685,12 +561,11 @@ class Cost_Fetcher:
try: try:
index_domain = self.get_index_domain_from_name(source_name) index_domain = self.get_index_domain_from_name(source_name)
except: except:
continue return self.make_result_data_json(index_row = index_row)
domain_details = self.domain_details[source_name] domain_details = self.domain_details[source_name]
self.workbook_container.clear_row_sourcing_sheet(index_row = index_row) self.workbook_container.clear_row_sourcing_sheet(index_row = index_row)
processed_count += 1
Cost_Fetcher.log_processing_new_row( Cost_Fetcher.log_processing_new_row(
index_row = index_row index_row = index_row
, source_link = source_link , source_link = source_link
) )
@@ -698,81 +573,104 @@ class Cost_Fetcher:
price = None price = None
active = None active = None
if source_name == self.NAME_DOMAIN_CARD_MARKET: if source_name == self.NAME_DOMAIN_CARD_MARKET:
while (time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)): while (self.active_row_indices[index_domain] is None or time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
time.sleep(random.uniform(3, 5)) await asyncio.sleep(random.uniform(3, 5))
self.active_row_indices[index_domain] = index_row
if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]: if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
price_quantity_pairs = self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate) price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(browser = browser, url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
if price_quantity_pairs: price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs)
active = True cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(browser = browser, url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
max_quantity = 0
updated_row_price = False
for price_quantity_pair in price_quantity_pairs:
eur_price = price_quantity_pair['price']
quantity = price_quantity_pair['quantity']
print(f" Found price: €{eur_price}")
print(f" Found quantity: {quantity}")
max_quantity = max(max_quantity, quantity)
if quantity >= 8:
if eur_price:
price = eur_price * self.eur_to_gbp_rate
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_price_sourcing).value = gbp_price
updated_count += 1
updated_row_price = True
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
break
else:
print(f" Error: Could not parse price")
if not updated_row_price:
print("Offer with quantity >= 8 not found")
for price_quantity_pair in price_quantity_pairs:
eur_price = price_quantity_pair['price']
quantity = price_quantity_pair['quantity']
print(f" Found price: €{eur_price}")
print(f" Found quantity: {quantity}")
if max_quantity <= 2 or quantity == max_quantity:
if eur_price:
price = eur_price * self.eur_to_gbp_rate
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value = gbp_price
updated_count += 1
updated_row_price = True
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
break
else:
print(f" Error: Could not parse price")
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
elif source_name == self.NAME_DOMAIN_CHAOS_CARDS: elif source_name == self.NAME_DOMAIN_CHAOS_CARDS:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)): while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
time.sleep(random.uniform(3, 5)) await asyncio.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(url = source_link) self.active_row_indices[index_domain] = index_row
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(browser = browser, url = source_link)
elif source_name == self.NAME_DOMAIN_GAMES_LORE: elif source_name == self.NAME_DOMAIN_GAMES_LORE:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)): while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
time.sleep(random.uniform(3, 5)) await asyncio.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(url = source_link) self.active_row_indices[index_domain] = index_row
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(browser = browser, url = source_link)
elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE: elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)): while (self.active_row_indices[index_domain] is None or time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
time.sleep(random.uniform(3, 5)) await asyncio.sleep(random.uniform(3, 5))
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(url = source_link) self.active_row_indices[index_domain] = index_row
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(browser = browser, url = source_link)
self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] = time.time() self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] = time.time()
self.active_row_indices[index_domain] = None
if ((cost is not None or price is not None) and active is not None): if ((cost is None and price is None) or active is None):
print(f" Found cost: {cost}, price: {price}, active: {active}")
self.workbook_container.update_row_sourcing_sheet(index_row = index_row, unit_cost = cost, unit_price = price, active = active)
updated_count += 1
else:
print(f" Error: Could not find cost on page") print(f" Error: Could not find cost on page")
return self.make_result_data_json(
index_row = index_row
, cost = cost
, price = price
, active = active
)
@classmethod
def make_result(cls, url, success, data, error):
    """Bundle the outcome of one request into a dict.

    The keys are the class-level URL_FLAG, SUCCESS_FLAG, DATA_FLAG and
    ERROR_FLAG constants, inserted in that order; the values are the
    arguments passed in, stored unchanged.
    """
    result = {}
    result[cls.URL_FLAG] = url
    result[cls.SUCCESS_FLAG] = success
    result[cls.DATA_FLAG] = data
    result[cls.ERROR_FLAG] = error
    return result
@classmethod
def make_result_data_json(cls, index_row, cost = None, price = None, active = None):
    """Build the per-row result record for one scraped sourcing row.

    Returns a dict keyed by the class-level INDEX_ROW_FLAG, COST_FLAG,
    PRICE_FLAG and ACTIVE_FLAG constants (in that order). Any of cost,
    price and active that the caller omits is stored as None.
    """
    fields = (
        (cls.INDEX_ROW_FLAG, index_row),
        (cls.COST_FLAG, cost),
        (cls.PRICE_FLAG, price),
        (cls.ACTIVE_FLAG, active),
    )
    return dict(fields)
def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs):
    """Pick a sale price from a list of scraped offers.

    Selection rule, in order:
      1. The first offer with quantity >= 8 and a truthy price.
      2. Otherwise, the first offer with a truthy price whose quantity
         equals the largest quantity seen (or any offer at all when that
         largest quantity is <= 2).

    Args:
        price_quantity_pairs: sequence of dicts with 'price' and
            'quantity' keys. May be empty or None.

    Returns:
        A ``(price, found_offers)`` tuple. ``found_offers`` is False only
        when no pairs were supplied. ``price`` is the chosen offer price
        multiplied by ``self.eur_to_gbp_rate``, or None when no offer had
        a usable price. Callers must unpack the tuple.

    NOTE(review): the cardmarket scraper visible in this file already
    multiplies the scraped price by the EUR->GBP rate before building the
    pairs, so the multiplication here may convert twice - confirm.
    """
    if not price_quantity_pairs:
        return None, False

    max_quantity = 0
    for price_quantity_pair in price_quantity_pairs:
        eur_price = price_quantity_pair['price']
        quantity = price_quantity_pair['quantity']
        print(f" Found price: €{eur_price}")
        print(f" Found quantity: {quantity}")
        if quantity is None:
            # An unparsed quantity cannot be compared; ignore the offer
            # instead of crashing inside max().
            continue
        max_quantity = max(max_quantity, quantity)
        if quantity < 8:
            continue
        if eur_price:
            price = eur_price * self.eur_to_gbp_rate
            print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
            return price, True
        print(f" Error: Could not parse price")

    print("Offer with quantity >= 8 not found")
    for price_quantity_pair in price_quantity_pairs:
        eur_price = price_quantity_pair['price']
        quantity = price_quantity_pair['quantity']
        print(f" Found price: €{eur_price}")
        print(f" Found quantity: {quantity}")
        if quantity is None:
            continue
        if max_quantity <= 2 or quantity == max_quantity:
            if eur_price:
                price = eur_price * self.eur_to_gbp_rate
                print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
                return price, True
            print(f" Error: Could not parse price")

    # Offers existed but none carried a usable price. The original fell
    # through to `return price, True` with `price` never assigned, which
    # raised UnboundLocalError.
    return None, True
@staticmethod @staticmethod
def log_processing_new_row(index_row, source_link): def log_processing_new_row(index_row, source_link):
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f"Processing row {index_row}: {source_link}") print(f"Processing row {index_row}: {source_link}")
print(f"{'='*60}") print(f"{'='*60}")
def main(): async def main():
cost_fetcher = Cost_Fetcher() cost_fetcher = Cost_Fetcher()
cost_fetcher.fetch_all() await cost_fetcher.fetch_all()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

View File

@@ -13,3 +13,4 @@ xlsxwriter
# selenium # selenium
# undetected_chromedriver # undetected_chromedriver
playwright playwright
aioconsole