Files
tcg_web_scraper/product_scraping/single run/product_scraper.py

237 lines
10 KiB
Python

"""
Project: Shuffle & Skirmish Market Scraper
Author: Edward Middleton-Smith
Shuffle & Skirmish
Technology: Business Objects
Feature: Product Scraper Class
"""
import pandas as pd
from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
import requests
import re
import time
import random
from playwright.sync_api import sync_playwright, Browser, Page
from playwright.async_api import async_playwright
import asyncio
from aioconsole import ainput
from collections import defaultdict
from datetime import datetime, timedelta
class Product_Scraper:
    """Scrape product cost and availability from retailer product pages.

    Each supported retailer has a ``parse_cost_<retailer>`` classmethod that
    turns the scraped price text into a float, and a
    ``scrape_cost_and_active_playwright_<retailer>`` coroutine that supplies
    the retailer's CSS selectors to the shared
    ``scrape_cost_and_active_playwright`` routine.
    """

    domain: str
    # Most recently opened Playwright page; set by the scrape_* coroutines.
    page: "Page"

    # (low, high) bounds in seconds of the random pause taken after every
    # navigation before the page is read.
    PAGE_LOAD_DELAY_SECONDS = (20, 25)

    # A decimal number, optionally with comma thousands separators.
    _COST_PATTERN = re.compile(
        r'\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?'
    )

    def __init__(self, domain):
        """Store the domain this scraper instance is used for."""
        print("Setting up browser automation")
        self.domain = domain

    @staticmethod
    def parse_cost(cost_text):
        """Convert price text such as '£1,234.56' into a float.

        The first number found in ``cost_text`` is used. Commas are treated
        as thousands separators and a dot as the decimal point, so
        '£12.34' -> 12.34, '£1,234.56' -> 1234.56 and '£12' -> 12.0.

        Returns:
            The parsed amount, or None when ``cost_text`` is empty/None or
            contains no number.
        """
        if not cost_text:
            return None
        found = Product_Scraper._COST_PATTERN.search(cost_text)
        if found is None:
            return None
        return float(found.group(0).replace(',', ''))

    @classmethod
    def parse_cost_chaoscards(cls, cost_text):
        """Parse a Chaos Cards price string; see ``parse_cost``."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_cardmarket(cls, cost_text):
        """Convert '141,30 €' format to float in EUR.

        Everything except digits and commas is removed (so a dot used as a
        thousands separator is dropped), then the comma is read as the
        decimal point. Returns None for empty/None or unparsable text.
        """
        if not cost_text:
            return None
        digits_and_commas = re.sub(r'[^\d,]', '', cost_text)
        try:
            return float(digits_and_commas.replace(',', '.'))
        except ValueError:
            return None

    @classmethod
    def parse_cost_gameslore(cls, cost_text):
        """Parse a Games Lore price string; see ``parse_cost``."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_magicmadhouse(cls, cost_text):
        """Parse a Magic Madhouse price string; see ``parse_cost``."""
        return cls.parse_cost(cost_text=cost_text)

    @classmethod
    def parse_cost_newrealitiesgaming(cls, cost_text):
        """Parse a New Realities Gaming price string; see ``parse_cost``."""
        return cls.parse_cost(cost_text=cost_text)

    async def _read_active_status(self, active_selector, invalid_active_statuses):
        """Work out whether the product on ``self.page`` is active.

        Returns True when nothing matches ``active_selector``; otherwise
        True unless the stripped text of the first match is listed in
        ``invalid_active_statuses`` (a None list accepts any text). Returns
        None when the lookup itself raises.
        """
        try:
            matches = await self.page.query_selector_all(selector=active_selector)
            print(f'# active elements: {len(matches)}')
            if len(matches) == 0:
                return True
            status_text = await matches[0].text_content()
            status_text = status_text.strip()
            print(f" Text: '{status_text}'")
            return (
                invalid_active_statuses is None
                or status_text not in invalid_active_statuses
            )
        except Exception as e:
            print(f" Selector failed: {e}")
            return None

    async def scrape_cost_and_active_playwright(self, browser: "Browser", url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
        """Open ``url`` in a new page and read its raw cost text and status.

        Args:
            browser: Playwright browser used to open the page.
            url: Product page to load.
            page_load_element_selector: Accepted for interface
                compatibility; it is not currently waited on or read.
            cost_selector: Selector of the element holding the price text.
            active_selector: Selector of the stock-status element, or None
                to treat the product as active whenever cost text exists.
            invalid_active_statuses: Status texts that mark the product as
                inactive.

        Returns:
            ``(cost_text, active)``. Both are None when reading the page
            raises. Errors from opening or navigating the page propagate to
            the caller; the page is closed in every case.
        """
        print(" Loading page...")
        self.page = await browser.new_page()
        try:
            # Navigation sits inside try/finally so that a failed goto()
            # cannot leave the page open.
            await self.page.goto(url=url)
            await asyncio.sleep(random.uniform(*self.PAGE_LOAD_DELAY_SECONDS))
            try:
                page_title = await self.page.title()
                print(f" Page title: {page_title}")
                cost = await self.page.locator(selector=cost_selector).text_content()
                print(f" Text: '{cost}'")
                if active_selector is None:
                    active = cost is not None
                else:
                    active = await self._read_active_status(
                        active_selector, invalid_active_statuses
                    )
                if cost is None or active is None:
                    print(" ✗ No cost found")
                print(f"Cost: {cost}, Active: {active}")
            except Exception as e:
                print(f" Error: {e}")
                return None, None
        finally:
            await self.page.close()
        return cost, active

    @staticmethod
    def _cardmarket_shipping_cost(cost_eur):
        """Return the flat shipping amount for a Cardmarket item cost.

        Tiers: below 10 -> 2, below 100 -> 8, otherwise 20. The comparison
        is made against the EUR cost before currency conversion.
        """
        if cost_eur < 10:
            return 2
        if cost_eur < 100:
            return 8
        return 20

    async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
        """Scrape the first Cardmarket offer and return its landed cost.

        The EUR price is multiplied by ``eur_to_gbp_rate`` and the tiered
        shipping amount is added to the result. ``active`` is True exactly
        when a cost could be parsed.

        Returns:
            ``(cost, active)``; ``cost`` is None when nothing was parsed.
        """
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        cost_text, _ = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=None,
            invalid_active_statuses=[],
        )
        cost = self.parse_cost_cardmarket(cost_text=cost_text)
        if cost is not None:
            cost = cost * eur_to_gbp_rate + self._cardmarket_shipping_cost(cost)
        return cost, cost is not None

    async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
        """Scrape cost and stock status from a Chaos Cards product page.

        'Out of stock' and 'Coming soon' mark the product as inactive.
        """
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["Out of stock", "Coming soon"],
        )
        return self.parse_cost_chaoscards(cost_text=cost_text), active

    async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
        """Scrape cost and stock status from a Games Lore product page.

        'OUT OF STOCK' marks the product as inactive.
        """
        cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
        active_selector = '.stock > span:nth-child(1)'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=cost_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=["OUT OF STOCK"],
        )
        return self.parse_cost_gameslore(cost_text=cost_text), active

    async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
        """Scrape cost and stock status from a Magic Madhouse product page."""
        page_load_element_selector = '.productView-title'
        cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
        active_selector = '.alertBox.alertBox--error'
        # NOTE(review): with an empty invalid-status list, a matched error
        # alert box never marks the product inactive -- active is True
        # whenever the selector lookup succeeds. Confirm this is intended.
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=page_load_element_selector,
            cost_selector=cost_selector,
            active_selector=active_selector,
            invalid_active_statuses=[],
        )
        return self.parse_cost_magicmadhouse(cost_text=cost_text), active

    async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url):
        """Scrape cost and stock status from a New Realities Gaming page.

        Both values are read from spans inside the desktop add-to-cart
        button; 'Out of stock' marks the product as inactive.
        """
        button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button'
        cost_text, active = await self.scrape_cost_and_active_playwright(
            browser=browser,
            url=url,
            page_load_element_selector=button_selector,
            cost_selector=f'{button_selector} span:nth-child(2)',
            active_selector=f'{button_selector} span:nth-child(1)',
            invalid_active_statuses=['Out of stock'],
        )
        # Use this retailer's own parser (previously the Magic Madhouse one
        # was called here by mistake; both currently delegate to parse_cost).
        return self.parse_cost_newrealitiesgaming(cost_text=cost_text), active

    @staticmethod
    async def _child_text(container, selector):
        """Return the text of the first ``selector`` match inside
        ``container``, or None when nothing matches."""
        element = await container.query_selector(selector)
        if element is None:
            return None
        return await element.text_content()

    async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: "Browser", url, eur_to_gbp_rate):
        """Collect price/quantity pairs from Cardmarket offer rows.

        Offers whose price or quantity element is missing, or whose price
        text cannot be parsed, are skipped individually.

        Returns:
            A list of ``{'price': <EUR price * eur_to_gbp_rate>,
            'quantity': <parsed quantity or None>}`` dicts, or an empty
            list when querying the offers raises. Errors from opening,
            navigating or reading the page title propagate to the caller;
            the page is closed in every case.
        """
        # NOTE(review): the row part of this selector carries
        # ':nth-child(1)', so only a row that is the first child of its
        # parent can match -- confirm whether all rows were intended.
        offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'
        print(" Loading page...")
        self.page = await browser.new_page()
        try:
            # Navigation sits inside try/finally so that a failed goto()
            # cannot leave the page open.
            await self.page.goto(url=url)
            await asyncio.sleep(random.uniform(*self.PAGE_LOAD_DELAY_SECONDS))
            page_title = await self.page.title()
            print(f" Page title: {page_title}")
            price_quantity_pairs = []
            try:
                offer_containers = await self.page.query_selector_all(offer_container_selector)
                print(f" Offer container selector: Found {len(offer_containers)} elements")
                for offer_container in offer_containers:
                    price_text = await self._child_text(offer_container, price_selector)
                    if not price_text or not re.search(r'\d', price_text):
                        continue
                    print(f" ✓ Found price: {price_text}")
                    quantity_text = await self._child_text(offer_container, quantity_selector)
                    if quantity_text is None:
                        continue
                    price_eur = self.parse_cost_cardmarket(cost_text=price_text)
                    if price_eur is None:
                        continue
                    price_quantity_pairs.append({
                        'price': price_eur * eur_to_gbp_rate,
                        'quantity': self.parse_cost_cardmarket(cost_text=quantity_text),
                    })
            except Exception as e:
                print(f" Price selector failed: {e}")
                return []
        finally:
            await self.page.close()
        return price_quantity_pairs