Feat: Add new source websites (not yet fully integrated).
This commit is contained in:
BIN
product_scraping/TCG Sole Trader Copy (DEAD).xlsx
Normal file
BIN
product_scraping/TCG Sole Trader Copy (DEAD).xlsx
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,237 +0,0 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Product Scraper Class
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
import requests
|
||||
import re
|
||||
import time
|
||||
import random
|
||||
from playwright.sync_api import sync_playwright, Browser, Page
|
||||
from playwright.async_api import async_playwright
|
||||
import asyncio
|
||||
from aioconsole import ainput
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
class Product_Scraper:
|
||||
domain: str
|
||||
page: Page
|
||||
|
||||
def __init__(self, domain):
|
||||
print("Setting up browser automation")
|
||||
self.domain = domain
|
||||
|
||||
@staticmethod
|
||||
def parse_cost(cost_text):
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
try:
|
||||
return float(cost_clean) / 100
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def parse_cost_chaoscards(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_cardmarket(cls, cost_text):
|
||||
"""Convert '141,30 €' format to float in EUR"""
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
cost_clean = cost_clean.replace(',', '.')
|
||||
try:
|
||||
return float(cost_clean)
|
||||
except ValueError:
|
||||
return None
|
||||
@classmethod
|
||||
def parse_cost_gameslore(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_magicmadhouse(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_newrealitiesgaming(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
|
||||
async def scrape_cost_and_active_playwright(self, browser: Browser, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
|
||||
print(f" Loading page...")
|
||||
self.page = await browser.new_page()
|
||||
await self.page.goto(url = url)
|
||||
await asyncio.sleep(random.uniform(20, 25))
|
||||
cost = None
|
||||
active = None
|
||||
try:
|
||||
element = self.page.locator(selector = page_load_element_selector)
|
||||
page_title = await self.page.title()
|
||||
print(f" Page title: {page_title}")
|
||||
|
||||
element = self.page.locator(selector = cost_selector)
|
||||
text = await element.text_content()
|
||||
print(f" Text: '{text}'")
|
||||
cost = text
|
||||
|
||||
active = None
|
||||
if active_selector is None:
|
||||
active = (cost is not None)
|
||||
else:
|
||||
try:
|
||||
elements = await self.page.query_selector_all(selector = active_selector)
|
||||
print(f'# active elements: {len(elements)}')
|
||||
if len(elements) == 0:
|
||||
active = True
|
||||
else:
|
||||
text = await elements[0].text_content()
|
||||
text = text.strip()
|
||||
print(f" Text: '{text}'")
|
||||
active = (invalid_active_statuses is None or text not in invalid_active_statuses)
|
||||
except Exception as e:
|
||||
print(f" Selector failed: {e}")
|
||||
|
||||
if cost is None or active is None:
|
||||
print(f" ✗ No cost found")
|
||||
# await ainput("Press Enter to continue to next URL...")
|
||||
print(f"Cost: {cost}, Active: {active}")
|
||||
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
# await ainput("Press Enter to continue to next URL...")
|
||||
return None, None
|
||||
finally:
|
||||
await self.page.close()
|
||||
return cost, active
|
||||
|
||||
async def scrape_cost_and_active_playwright_cardmarket(self, browser, url, eur_to_gbp_rate):
|
||||
page_load_element_selector = "body > main.container > div.page-title-container"
|
||||
cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
cost_text, active = await self.scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = None
|
||||
, invalid_active_statuses = []
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_cardmarket(cost_text = cost_text)
|
||||
if cost is not None:
|
||||
item_shipping_cost_in = 0
|
||||
if cost < 10:
|
||||
item_shipping_cost_in = 2
|
||||
elif cost < 100:
|
||||
item_shipping_cost_in = 8
|
||||
else:
|
||||
item_shipping_cost_in = 20
|
||||
cost = cost * eur_to_gbp_rate + item_shipping_cost_in
|
||||
active = (cost is not None)
|
||||
return cost, active
|
||||
|
||||
async def scrape_cost_and_active_playwright_chaoscards(self, browser, url):
|
||||
cost_selector = '.price_inc > span:nth-child(2)'
|
||||
active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)'
|
||||
cost_text, active = await self.scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = cost_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["Out of stock", "Coming soon"]
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_chaoscards(cost_text = cost_text)
|
||||
return cost, active
|
||||
|
||||
async def scrape_cost_and_active_playwright_gameslore(self, browser, url):
|
||||
cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
|
||||
active_selector = '.stock > span:nth-child(1)'
|
||||
cost_text, active = await self.scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = cost_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["OUT OF STOCK"]
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_gameslore(cost_text = cost_text)
|
||||
return cost, active
|
||||
|
||||
async def scrape_cost_and_active_playwright_magicmadhouse(self, browser, url):
|
||||
page_load_element_selector = '.productView-title'
|
||||
cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
|
||||
active_selector = '.alertBox.alertBox--error'
|
||||
cost_text, active = await self.scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = []
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text)
|
||||
return cost, active
|
||||
|
||||
async def scrape_cost_and_active_playwright_newrealitiesgaming(self, browser, url):
|
||||
button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button'
|
||||
page_load_element_selector = button_selector
|
||||
cost_selector = f'{button_selector} span:nth-child(2)'
|
||||
active_selector = f'{button_selector} span:nth-child(1)'
|
||||
cost_text, active = await self.scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ['Out of stock']
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_magicmadhouse(cost_text = cost_text)
|
||||
return cost, active
|
||||
|
||||
async def scrape_prices_and_quantities_playwright_cardmarket(self, browser: Browser, url, eur_to_gbp_rate):
|
||||
offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
|
||||
price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
quantity_selector = 'div.amount-container > span:nth-child(1)'
|
||||
|
||||
print(f" Loading page...")
|
||||
self.page = await browser.new_page()
|
||||
await self.page.goto(url = url)
|
||||
await asyncio.sleep(random.uniform(20, 25))
|
||||
|
||||
try:
|
||||
page_title = await self.page.title()
|
||||
print(f" Page title: {page_title}")
|
||||
|
||||
price_quantity_pairs = []
|
||||
try:
|
||||
offer_containers = await self.page.query_selector_all(offer_container_selector)
|
||||
print(f" Offer container selector: Found {len(offer_containers)} elements")
|
||||
for offer_container in offer_containers:
|
||||
price_element = await offer_container.query_selector(price_selector)
|
||||
price_text = await price_element.text_content()
|
||||
if '€' in price_text and re.search(r'\d', price_text):
|
||||
print(f" ✓ Found price: {price_text}")
|
||||
else:
|
||||
price_text = None
|
||||
|
||||
quantity_element = await offer_container.query_selector(quantity_selector)
|
||||
quantity_text = await quantity_element.text_content()
|
||||
|
||||
if price_text is None or quantity_text is None:
|
||||
continue
|
||||
price_quantity_pairs.append({
|
||||
'price': Product_Scraper.parse_cost_cardmarket(cost_text = price_text) * eur_to_gbp_rate
|
||||
, 'quantity': Product_Scraper.parse_cost_cardmarket(cost_text = quantity_text)
|
||||
})
|
||||
except Exception as e:
|
||||
print(f" Price selector failed: {e}")
|
||||
# await ainput("Press enter to continue to next URL...")
|
||||
return []
|
||||
finally:
|
||||
await self.page.close()
|
||||
return price_quantity_pairs
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
103
product_scraping/single_run/card_market_product_scraper.py
Normal file
103
product_scraping/single_run/card_market_product_scraper.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Card Market Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
import re
|
||||
import random
|
||||
from playwright.sync_api import Browser
|
||||
import asyncio
|
||||
|
||||
class Card_Market_Product_Scraper(Product_Scraper):
    """Scraper for cardmarket.com product pages (prices are listed in EUR)."""

    NAME_DOMAIN_CARD_MARKET: str = 'Card Market'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_CARD_MARKET)

    @classmethod
    def parse_cost(cls, cost_text):
        """Convert '141,30 €' format to float in EUR.

        Returns None for empty input or unparseable text.
        """
        if not cost_text:
            return None
        # Keep digits and the decimal comma, then switch to a '.' decimal point.
        cost_clean = re.sub(r'[^\d,]', '', cost_text).replace(',', '.')
        try:
            return float(cost_clean)
        except ValueError:
            return None

    async def scrape_cost_and_active_playwright(self, browser, url, eur_to_gbp_rate):
        """Scrape the cheapest offer's price, convert EUR -> GBP and add a
        shipping estimate tiered by price. Returns (cost, active)."""
        page_load_element_selector = "body > main.container > div.page-title-container"
        cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser
            , url = url
            , page_load_element_selector = page_load_element_selector
            , cost_selector = cost_selector
            , active_selector = None
            , invalid_active_statuses = []
        )
        cost = self.parse_cost(cost_text = cost_text)
        if cost is not None:
            # Tiered shipping estimate (GBP) added on top of the converted price.
            if cost < 10:
                item_shipping_cost_in = 2
            elif cost < 100:
                item_shipping_cost_in = 8
            else:
                item_shipping_cost_in = 20
            cost = cost * eur_to_gbp_rate + item_shipping_cost_in
        active = (cost is not None)
        return cost, active

    async def scrape_prices_and_quantities_playwright(self, browser: "Browser", url, eur_to_gbp_rate):
        """Collect {'price': GBP float, 'quantity': float} pairs from the offer table.

        Raises on scrape failure (after logging); always closes the page.
        """
        offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
        price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
        quantity_selector = 'div.amount-container > span:nth-child(1)'

        self.page = await browser.new_page()
        await self.page.goto(url = url)
        # Randomised delay so request timing looks less bot-like.
        await asyncio.sleep(random.uniform(20, 25))

        price_quantity_pairs = []
        # BUG FIX: pre-bind the names used in the error message below so the
        # except handler cannot itself raise NameError when the failure happens
        # before/inside the loop.
        page_title = None
        price_text = None
        quantity_text = None
        try:
            page_title = await self.page.title()

            offer_containers = await self.page.query_selector_all(offer_container_selector)
            for offer_container in offer_containers:
                price_element = await offer_container.query_selector(price_selector)
                price_text = await price_element.text_content()
                # Only accept texts that look like a EUR amount.
                if not ('€' in price_text and re.search(r'\d', price_text)):
                    price_text = None

                quantity_element = await offer_container.query_selector(quantity_selector)
                quantity_text = await quantity_element.text_content()

                if price_text is None or quantity_text is None:
                    continue
                price_quantity_pairs.append({
                    'price': self.parse_cost(cost_text = price_text) * eur_to_gbp_rate
                    , 'quantity': self.parse_cost(cost_text = quantity_text)
                })
        except Exception as e:
            print(f"\n\nError getting cardmarket prices: {e}\npage: {page_title}\nprices: {price_text}\nquantity: {quantity_text}")
            # BUG FIX: re-raise directly; the original's `return []` after
            # `raise e` was unreachable dead code.
            raise
        finally:
            await self.page.close()
        return price_quantity_pairs
|
||||
36
product_scraping/single_run/chaos_cards_product_scraper.py
Normal file
36
product_scraping/single_run/chaos_cards_product_scraper.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Chaos Cards Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
class Chaos_Cards_Product_Scraper(Product_Scraper):
    """Scraper for the Chaos Cards storefront."""

    NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_CHAOS_CARDS)

    @classmethod
    def parse_cost(cls, cost_text):
        # Delegates to the shared pence-based parser on the base class.
        return super().parse_cost(cost_text = cost_text)

    async def scrape_cost_and_active_playwright(self, browser, url):
        """Scrape price and stock status for one product page.

        Returns (cost, active); cost is None when no price could be read.
        """
        cost_selector = '.price_inc > span:nth-child(2)'
        active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li.prod_det_stock > div:nth-child(1) > div:nth-child(2)'
        # The price element doubles as the page-ready marker.
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser,
            url = url,
            page_load_element_selector = cost_selector,
            cost_selector = cost_selector,
            active_selector = active_selector,
            invalid_active_statuses = ["Out of stock", "Coming soon"],
        )
        return self.parse_cost(cost_text = cost_text), active
|
||||
37
product_scraping/single_run/distro_gg_product_scraper.py
Normal file
37
product_scraping/single_run/distro_gg_product_scraper.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Distro GG Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
class Distro_GG_Product_Scraper(Product_Scraper):
    """Scraper for the Distro GG storefront."""

    NAME_DOMAIN_DISTRO_GG: str = 'Distro GG'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_DISTRO_GG)

    @classmethod
    def parse_cost(cls, cost_text):
        # Delegates to the shared pence-based parser on the base class.
        return super().parse_cost(cost_text = cost_text)

    async def scrape_cost_and_active_playwright(self, browser, url):
        """Scrape price and availability for one product page.

        Returns (cost, active); cost is None when no price could be read.
        """
        page_load_element_selector = '#main-content'
        cost_selector = '.drawer .drawer-content .grid .items-baseline h2.items-center'
        active_selector = '.drawer .drawer-content .grid div div div button span'
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser,
            url = url,
            page_load_element_selector = page_load_element_selector,
            cost_selector = cost_selector,
            active_selector = active_selector,
            invalid_active_statuses = ['Pro Subscription required to Request Allocation', None],
        )
        return self.parse_cost(cost_text = cost_text), active
|
||||
36
product_scraping/single_run/games_lore_product_scraper.py
Normal file
36
product_scraping/single_run/games_lore_product_scraper.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Games Lore Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
class Games_Lore_Product_Scraper(Product_Scraper):
    """Scraper for the Games Lore storefront."""

    NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_GAMES_LORE)

    @classmethod
    def parse_cost(cls, cost_text):
        # Delegates to the shared pence-based parser on the base class.
        return super().parse_cost(cost_text = cost_text)

    async def scrape_cost_and_active_playwright(self, browser, url):
        """Scrape price and stock status for one product page.

        Returns (cost, active); cost is None when no price could be read.
        """
        cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
        active_selector = '.stock > span:nth-child(1)'
        # The price element doubles as the page-ready marker.
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser,
            url = url,
            page_load_element_selector = cost_selector,
            cost_selector = cost_selector,
            active_selector = active_selector,
            invalid_active_statuses = ["OUT OF STOCK"],
        )
        return self.parse_cost(cost_text = cost_text), active
|
||||
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Magic Madhouse Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
class Magic_Madhouse_Product_Scraper(Product_Scraper):
    """Scraper for the Magic Madhouse storefront."""

    NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_MAGIC_MADHOUSE)

    @classmethod
    def parse_cost(cls, cost_text):
        # Delegates to the shared pence-based parser on the base class.
        return super().parse_cost(cost_text = cost_text)

    async def scrape_cost_and_active_playwright(self, browser, url):
        """Scrape price and availability for one product page.

        Returns (cost, active); cost is None when no price could be read.
        """
        page_load_element_selector = '.productView-title'
        cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
        # An error alert box on the page marks the product as unavailable.
        active_selector = '.alertBox.alertBox--error'
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser,
            url = url,
            page_load_element_selector = page_load_element_selector,
            cost_selector = cost_selector,
            active_selector = active_selector,
            invalid_active_statuses = [],
        )
        return self.parse_cost(cost_text = cost_text), active
|
||||
@@ -0,0 +1,40 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: New Realities Gaming Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
""" This website is probably not legit
|
||||
class New_Realities_Gaming_Product_Scraper(Product_Scraper):
|
||||
NAME_DOMAIN_NEW_REALITIES_GAMING: str = 'New Realities Gaming'
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(domain = self.NAME_DOMAIN_NEW_REALITIES_GAMING)
|
||||
|
||||
@classmethod
|
||||
def parse_cost(cls, cost_text):
|
||||
return super().parse_cost(cost_text = cost_text)
|
||||
|
||||
async def scrape_cost_and_active_playwright(self, browser, url):
|
||||
button_selector = 'div.display-desktop.add-to-cart-button__wrapper div.w-wrapper form button'
|
||||
page_load_element_selector = button_selector
|
||||
cost_selector = f'{button_selector} span:nth-child(2)'
|
||||
active_selector = f'{button_selector} span:nth-child(1)'
|
||||
cost_text, active = await super().scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ['Out of stock']
|
||||
)
|
||||
cost = self.parse_cost(cost_text = cost_text)
|
||||
return cost, active
|
||||
"""
|
||||
83
product_scraping/single_run/product_scraper.py
Normal file
83
product_scraping/single_run/product_scraper.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Product Scraper Class
|
||||
"""
|
||||
|
||||
import re
|
||||
import random
|
||||
from playwright.sync_api import Browser, Page
|
||||
import asyncio
|
||||
|
||||
|
||||
class Product_Scraper:
    """Base scraper: drives a Playwright browser page to read a product's
    cost and availability status from a retailer's product page.

    Subclasses supply the domain-specific CSS selectors and delegate to
    scrape_cost_and_active_playwright().
    """

    domain: str       # human-readable name of the retailer this scraper targets
    page: "Page"      # Playwright page of the scrape in progress (forward ref)

    def __init__(self, domain):
        """Store the retailer name; pages are created per scrape call."""
        print("Setting up browser automation")
        self.domain = domain

    @staticmethod
    def parse_cost(cost_text):
        """Convert a GBP price string whose digits are pence (e.g. '£12.99')
        to a float in pounds.

        Returns None for empty input or text with no parseable digits.
        """
        if not cost_text:
            return None
        # BUG FIX: also strip commas (thousands separators). The original kept
        # them, so float('1,299') raised ValueError and '£1,299.99' silently
        # parsed as None instead of 1299.99.
        digits = re.sub(r'[^\d]', '', cost_text)
        try:
            return float(digits) / 100
        except ValueError:
            return None

    async def scrape_cost_and_active_playwright(self, browser: "Browser", url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
        """Open `url`, read the cost text via `cost_selector` and derive an
        availability flag via `active_selector`.

        When `active_selector` is None, availability is simply "a cost was
        found". When the selector matches nothing, the product is assumed
        available; otherwise the first match's text must not appear in
        `invalid_active_statuses`.

        Returns (cost_text, active). Raises (after logging) on any scrape
        failure; the page is always closed.
        """
        self.page = await browser.new_page()
        await self.page.goto(url = url)
        # Randomised delay so request timing looks less bot-like.
        await asyncio.sleep(random.uniform(20, 25))
        cost = None
        active = None
        # Pre-bind names used by the error log so the handler cannot NameError.
        page_title = None
        status_text = None
        try:
            # Touch the page-ready element (lazy locator; kept for parity with
            # the original flow).
            self.page.locator(selector = page_load_element_selector)
            page_title = await self.page.title()

            element = self.page.locator(selector = cost_selector)
            cost = await element.text_content()

            if active_selector is None:
                active = (cost is not None)
            else:
                elements = await self.page.query_selector_all(selector = active_selector)
                if len(elements) == 0:
                    # No status badge at all: treat the product as available.
                    active = True
                else:
                    status_text = (await elements[0].text_content()).strip()
                    active = (invalid_active_statuses is None or status_text not in invalid_active_statuses)

            if cost is None or active is None:
                # BUG FIX: the original raised with an undefined name `e` here,
                # which produced a NameError instead of the intended message.
                raise ValueError(f"Cost not found for url: {url}")

        except Exception as e:
            # BUG FIX: the original formatted `{active | text}`, which raises
            # TypeError for bool | str (and `text` could be unbound), masking
            # the real error. Log each value separately instead.
            print(f"\n\nError getting costs: {e}\npage: {page_title}\ncost: {cost}\nactive: {active}\nstatus text: {status_text}")
            # Re-raise; the original's `return None, None` after `raise e` was
            # unreachable dead code and has been removed.
            raise
        finally:
            await self.page.close()
        return cost, active
|
||||
305
product_scraping/single_run/product_scraper_controller.py
Normal file
305
product_scraping/single_run/product_scraper_controller.py
Normal file
@@ -0,0 +1,305 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: Cost Fetcher Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
from card_market_product_scraper import Card_Market_Product_Scraper
|
||||
from chaos_cards_product_scraper import Chaos_Cards_Product_Scraper
|
||||
from games_lore_product_scraper import Games_Lore_Product_Scraper
|
||||
from magic_madhouse_product_scraper import Magic_Madhouse_Product_Scraper
|
||||
from the_game_collection_product_scraper import The_Game_Collection_Product_Scraper
|
||||
# from new_realities_gaming_product_scraper import New_Realities_Gaming_Product_Scraper
|
||||
from tcg_sole_trader_workbook_container import TCG_Sole_Trader_Workbook_Container
|
||||
|
||||
# External
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
import requests
|
||||
import re
|
||||
import time
|
||||
import random
|
||||
from playwright.sync_api import sync_playwright, Browser, Page
|
||||
from playwright.async_api import async_playwright
|
||||
import asyncio
|
||||
from aioconsole import ainput
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
class Product_Scraper_Controller:
    """Coordinates the per-domain product scrapers and writes scraped costs,
    prices and availability back to the TCG sole-trader workbook."""

    # Keys used in domain-detail and scrape-result dictionaries.
    ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
    ACTIVE_FLAG: str = 'Active'
    COST_FLAG: str = 'Cost'
    DATA_FLAG: str = 'Data'
    ERROR_FLAG: str = 'Error'
    INDEX_DOMAIN_FLAG: str = 'Index Domain'
    INDEX_ROW_FLAG: str = 'Index Row'
    NAME_FLAG: str = 'Name'
    PRICE_FLAG: str = 'Price'
    SUCCESS_FLAG: str = 'Success'
    URL_FLAG: str = 'Url'

    domain_names: list[str]
    eur_to_gbp_rate: float
    product_scrapers: list[Product_Scraper]
    product_sources: pd.DataFrame
    workbook_container: TCG_Sole_Trader_Workbook_Container

    def __init__(self):
        # Map each supported domain name to its scraper class; adding a new
        # source is now a single entry here instead of extending an if/elif
        # chain. Insertion order defines domain indices, matching the original
        # self.domain_names list.
        scraper_classes = {
            Card_Market_Product_Scraper.NAME_DOMAIN_CARD_MARKET: Card_Market_Product_Scraper
            , Chaos_Cards_Product_Scraper.NAME_DOMAIN_CHAOS_CARDS: Chaos_Cards_Product_Scraper
            , Games_Lore_Product_Scraper.NAME_DOMAIN_GAMES_LORE: Games_Lore_Product_Scraper
            , Magic_Madhouse_Product_Scraper.NAME_DOMAIN_MAGIC_MADHOUSE: Magic_Madhouse_Product_Scraper
            , The_Game_Collection_Product_Scraper.NAME_DOMAIN_THE_GAME_COLLECTION: The_Game_Collection_Product_Scraper
        }
        self.domain_names = list(scraper_classes)
        domain_details = {}
        product_scrapers = []
        for index_domain, domain_name in enumerate(self.domain_names):
            domain_details[domain_name] = {
                self.NAME_FLAG: domain_name
                , self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(domain_name)
                , self.ACCESSED_LAST_ON_FLAG: 0
            }
            scraper_class = scraper_classes.get(domain_name)
            if scraper_class is None:
                raise ValueError(f'Domain not known: {index_domain}, {domain_name}')
            product_scrapers.append(scraper_class())
        self.domain_details = domain_details
        self.product_scrapers = product_scrapers
        self.workbook_container = TCG_Sole_Trader_Workbook_Container()
        # Fetches and caches the current EUR->GBP conversion rate.
        self.get_eur_to_gbp_rate()
|
||||
|
||||
async def fetch_single_with_browser(self, browser, domain_name, product_source):
|
||||
"""Fetch a single URL using the provided browser"""
|
||||
index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
|
||||
source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK]
|
||||
print(f"\n\nFetching\nrow: {index_row}\ndomain: {domain_name}\nlink: {source_link}")
|
||||
|
||||
self.workbook_container.clear_row_sourcing_sheet(index_row = index_row)
|
||||
self.log_processing_new_row(
|
||||
index_row = index_row
|
||||
, source_link = source_link
|
||||
)
|
||||
|
||||
index_domain = self.get_index_domain_from_name(domain_name)
|
||||
cost = None
|
||||
price = None
|
||||
active = None
|
||||
|
||||
try:
|
||||
if domain_name == Card_Market_Product_Scraper.NAME_DOMAIN_CARD_MARKET:
|
||||
if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
|
||||
price_quantity_pairs = await self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright(
|
||||
browser = browser
|
||||
, url = source_link
|
||||
, eur_to_gbp_rate = self.eur_to_gbp_rate
|
||||
)
|
||||
price = self.get_sale_price_from_price_quantity_pairs(price_quantity_pairs = price_quantity_pairs)
|
||||
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = source_link
|
||||
, eur_to_gbp_rate = self.eur_to_gbp_rate
|
||||
)
|
||||
elif domain_name in (
|
||||
Chaos_Cards_Product_Scraper.NAME_DOMAIN_CHAOS_CARDS
|
||||
, Games_Lore_Product_Scraper.NAME_DOMAIN_GAMES_LORE
|
||||
, Magic_Madhouse_Product_Scraper.NAME_DOMAIN_MAGIC_MADHOUSE
|
||||
, The_Game_Collection_Product_Scraper.NAME_DOMAIN_THE_GAME_COLLECTION
|
||||
):
|
||||
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = source_link
|
||||
)
|
||||
""" unverified
|
||||
elif domain_name == self.NAME_DOMAIN_NEW_REALITIES_GAMING:
|
||||
cost, active = await self.product_scrapers[index_domain].scrape_cost_and_active_playwright(
|
||||
browser = browser
|
||||
, url = source_link
|
||||
)
|
||||
"""
|
||||
if ((cost is None and price is None) or active is None):
|
||||
print(f"\n\nError: Could not find cost on page\nrow: {index_row}\ndomain: {domain_name}\nlink: {source_link}\nprice: {price}\ncost: {cost}\nactive: {active}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n\nError: Could not find cost on page\nrow: {index_row}\ndomain: {domain_name}\nlink: {source_link}\nprice: {price}\ncost: {cost}\nactive: {active}\nerror: {e}")
|
||||
|
||||
return self.make_result_data_json(
|
||||
index_row = index_row
|
||||
, cost = cost
|
||||
, price = price
|
||||
, active = active
|
||||
)
|
||||
|
||||
async def fetch_all(self):
    """Scrape every sourcing entry and write the results back to the workbook.

    Groups sourcing rows by domain, launches one headless browser per
    non-empty domain, scrapes each domain's URLs (domains run in parallel,
    URLs within a domain sequentially), then updates the sourcing sheet for
    every active result and saves the workbook. Errors are printed, matching
    the project's error-reporting style.
    """
    try:
        processed_count = 0
        updated_count = 0
        self.product_sources = self.workbook_container.get_sourcing_entries()

        # Bucket each sourcing row under its domain; unknown names are dropped.
        domain_groups = {name: [] for name in self.domain_names}
        for _, product_source in self.product_sources.iterrows():
            source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
            if source_name in domain_groups:
                domain_groups[source_name].append(product_source)

        # One browser per domain that actually has URLs; scrape domains in parallel.
        async with async_playwright() as p:
            domain_tasks = []
            for domain_name in self.domain_names:
                sources_for_domain = domain_groups[domain_name]
                if not sources_for_domain:
                    continue
                browser = await p.chromium.launch(headless=True)
                domain_tasks.append(
                    self.process_domain_urls(browser, domain_name, sources_for_domain)
                )
            all_sourced_products = await asyncio.gather(*domain_tasks)

        # Flatten the per-domain result lists into one list of result dicts.
        sourced_products = [
            sourced_product
            for domain_results in all_sourced_products
            for sourced_product in domain_results
        ]

        # Write active results back into the sourcing sheet.
        for sourced_product in sourced_products:
            processed_count += 1
            active = sourced_product[self.ACTIVE_FLAG]
            if not active:
                continue
            updated_count += 1
            self.workbook_container.update_row_sourcing_sheet(
                index_row = sourced_product[self.INDEX_ROW_FLAG]
                , unit_cost = sourced_product[self.COST_FLAG]
                , unit_price = sourced_product[self.PRICE_FLAG]
                , active = active
            )
        self.workbook_container.save_workbook()
        print("\nComplete!")
        print(f"Processed: {processed_count} entries")
        print(f"Updated: {updated_count} costs")
    except Exception as e:
        print(f"Error: {e}")
async def process_domain_urls(self, browser, domain_name, product_sources):
    """Process all URLs for a single domain sequentially with rate limiting.

    Args:
        browser: Playwright browser dedicated to this domain; always closed
            before returning, even if a fetch raises.
        domain_name: Key into ``self.domain_details`` used to seed the
            rate limiter from the domain's last recorded access time.
        product_sources: Iterable of sourcing rows to fetch, one request each.

    Returns:
        list: One result dict per source (as produced by
        ``fetch_single_with_browser``).
    """
    results = []
    domain_details = self.domain_details[domain_name]
    # Start the rate limiter from the last time this domain was accessed,
    # so restarts don't hammer the site immediately.
    last_access_time = domain_details[self.ACCESSED_LAST_ON_FLAG]
    try:
        for product_source in product_sources:
            # Rate limiting: keep at least 45s (plus 0-5s jitter) between
            # requests to the same domain.
            time_since_last = time.time() - last_access_time
            if time_since_last < 45:
                wait_time = 45 - time_since_last + random.uniform(0, 5) # 45-50s
                print(f" [{domain_name}] Waiting {wait_time:.1f}s before next request...")
                await asyncio.sleep(wait_time)

            # Process the URL
            result = await self.fetch_single_with_browser(browser, domain_name, product_source)
            results.append(result)
            last_access_time = time.time()

    finally:
        # Each domain owns exactly one browser; release it unconditionally.
        await browser.close()

    return results
def get_index_domain_from_name(self, domain_name):
    """Return the position of *domain_name* within ``self.domain_names``.

    Args:
        domain_name: Exact domain name to look up.

    Returns:
        int: Zero-based index of the domain.

    Raises:
        ValueError: If the domain is not configured.
    """
    try:
        # list.index performs the same linear scan the original
        # range(len(...)) loop hand-rolled.
        return self.domain_names.index(domain_name)
    except ValueError:
        # Re-raise with the project's message format; suppress the
        # uninformative stdlib context.
        raise ValueError(f'Domain does not exist: {domain_name}') from None
def get_eur_to_gbp_rate(self):
    """Fetch the live EUR→GBP rate and store it on ``self.eur_to_gbp_rate``.

    Falls back to a hard-coded 0.85 when the request fails, times out,
    returns a non-2xx status, or the payload lacks the expected keys.
    """
    try:
        response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
        # Surface HTTP errors (e.g. 429/500) explicitly instead of relying
        # on an incidental KeyError from an error-payload body.
        response.raise_for_status()
        data = response.json()
        self.eur_to_gbp_rate = data['rates']['GBP']
    except Exception as e:
        print(f"Error fetching exchange rate: {e}")
        print("Using fallback rate: 0.85")
        self.eur_to_gbp_rate = 0.85
@classmethod
def make_result_data_json(cls, index_row, cost = None, price = None, active = None):
    """Bundle one scrape outcome into the flag-keyed result dict consumed
    by fetch_all (keys are the class-level *_FLAG constants)."""
    keys = (cls.INDEX_ROW_FLAG, cls.COST_FLAG, cls.PRICE_FLAG, cls.ACTIVE_FLAG)
    values = (index_row, cost, price, active)
    return dict(zip(keys, values))
def get_sale_price_from_price_quantity_pairs(self, price_quantity_pairs):
    """Pick a representative sale price from (price, quantity) offers.

    Preference order:
      1. the first offer with quantity >= 8 and a truthy price;
      2. otherwise the first priced offer whose quantity equals the maximum
         seen (or any priced offer when that maximum is <= 2);
      3. otherwise the price of the last offer scanned (which may be falsy).

    Returns None when the offer list is empty or missing.
    """
    if not price_quantity_pairs:
        return None

    highest_quantity = 0
    candidate_price = None

    # Pass 1: prefer a well-stocked offer while tracking the largest
    # quantity for the fallback pass.
    for offer in price_quantity_pairs:
        candidate_price = offer['price']
        offer_quantity = offer['quantity']
        if offer_quantity > highest_quantity:
            highest_quantity = offer_quantity
        if offer_quantity >= 8 and candidate_price:
            return candidate_price

    # Pass 2: no well-stocked offer; fall back to the best-stocked priced
    # offer, or to the first priced offer when stock is uniformly thin.
    for offer in price_quantity_pairs:
        candidate_price = offer['price']
        if (highest_quantity <= 2 or offer['quantity'] == highest_quantity) and candidate_price:
            return candidate_price

    # Nothing matched: mirror the original's fall-through of the last price.
    return candidate_price
@staticmethod
|
||||
def log_processing_new_row(index_row, source_link):
|
||||
"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Processing row {index_row}: {source_link}")
|
||||
print(f"{'='*60}")
|
||||
"""
|
||||
|
||||
|
||||
|
||||
async def main():
    """Entry point: build the controller and scrape every sourcing entry."""
    scraper_controller = Product_Scraper_Controller()
    await scraper_controller.fetch_all()
# Script entry point: run the async controller on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
||||
@@ -0,0 +1,201 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: TCG Sole Trader Workbook Container Class
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
|
||||
class TCG_Sole_Trader_Workbook_Container:
    """Accessor for the 'TCG Sole Trader' Excel workbook.

    Locates the header rows of the Sourcing and Product tables, resolves the
    1-based column indices the scraper needs, exposes sourcing entries as a
    pandas DataFrame, and writes scraped cost/price/active values back to the
    sheet. Construction failures are reported via print() and leave the
    container partially initialised, matching the project's error style.
    """

    # Column names of the DataFrame produced by create_product_source_df.
    NAME_COLUMN_ACTIVE: str = 'Active'
    NAME_COLUMN_INDEX_ROW: str = 'Index Row'
    NAME_COLUMN_LINK: str = 'Link'
    NAME_COLUMN_PRODUCT_ID: str = 'Product Id'
    NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster'
    NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box'
    NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon'
    NAME_COLUMN_SOURCE_NAME: str = 'Source Name'
    NAME_COLUMN_UNIT_COST: str = 'Cost'
    NAME_COLUMN_UNIT_PRICE: str = 'Price'
    # Workbook layout.
    PRODUCT_WORKSHEET_NAME = 'Product'
    SOURCING_WORKSHEET_NAME = 'Sourcing'
    WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'

    # 1-based openpyxl row/column indices resolved during __init__.
    index_column_active_sourcing: int
    index_column_is_booster_product: int
    index_column_is_booster_box_product: int
    index_column_is_precon_product: int
    index_column_link_sourcing: int
    index_column_name_sourcing: int
    index_column_product_id_product: int
    index_column_product_id_sourcing: int
    index_column_unit_cost_sourcing: int
    index_column_unit_price_sourcing: int
    index_row_header_product: int
    index_row_header_sourcing: int
    product_sheet: Worksheet
    sourcing_sheet: Worksheet
    workbook: Workbook

    def __init__(self):
        """Load the workbook and resolve table header rows and column indices."""
        print("Loading workbook...")
        self.workbook = load_workbook(self.WORKBOOK_NAME)

        # Bug fix: pre-initialise every resolved index to None so the
        # diagnostics and the completeness check below can never hit an
        # unset attribute (previously an AttributeError if a header was
        # missing from the sheet).
        self.index_row_header_sourcing = None
        self.index_row_header_product = None
        self.index_column_name_sourcing = None
        self.index_column_link_sourcing = None
        self.index_column_unit_cost_sourcing = None
        self.index_column_unit_price_sourcing = None
        self.index_column_active_sourcing = None
        self.index_column_product_id_sourcing = None
        self.index_column_product_id_product = None
        self.index_column_is_booster_product = None
        self.index_column_is_booster_box_product = None
        self.index_column_is_precon_product = None

        if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found")
            return
        if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
            print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found")
            return

        self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
        self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]

        # Find the Sourcing header row: a 'tbl_Sourcing' marker in column 1
        # or a 'Source Name' header in column 3.
        sourcing_table_found = False
        for row in range(1, self.sourcing_sheet.max_row + 1):
            if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
                self.index_row_header_sourcing = row
                sourcing_table_found = True
                break

        # Fallback: scan only the first ~20 rows for the 'Source Name' header.
        if not sourcing_table_found or not self.index_row_header_sourcing:
            for row in range(1, min(20, self.sourcing_sheet.max_row + 1)):
                if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
                    self.index_row_header_sourcing = row
                    sourcing_table_found = True
                    break

        if not sourcing_table_found:
            print("Error: Could not find table 'tbl_Sourcing'")
            return

        # Find the Product header row analogously: 'tbl_Product' marker or a
        # 'Product Id' header, both in column 1.
        product_table_found = False
        for row in range(1, self.product_sheet.max_row + 1):
            if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value):
                self.index_row_header_product = row
                product_table_found = True
                break

        if not product_table_found:
            print("Error: Could not find table 'tbl_Product'")
            return

        # Map Sourcing header names (exact match after strip) to column indices.
        for index_column in range(1, self.sourcing_sheet.max_column + 1):
            header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip()
            if 'Source Name' == header:
                self.index_column_name_sourcing = index_column
            elif 'Source Link' == header:
                self.index_column_link_sourcing = index_column
            elif 'Source Unit Cost' == header:
                self.index_column_unit_cost_sourcing = index_column
            elif 'Sale Price' == header:
                self.index_column_unit_price_sourcing = index_column
            elif 'Active' == header:
                self.index_column_active_sourcing = index_column
            elif 'Product Id' == header:
                self.index_column_product_id_sourcing = index_column

        # Map Product header names to column indices.
        for index_column in range(1, self.product_sheet.max_column + 1):
            header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip()
            if 'Is Booster Box' == header:
                self.index_column_is_booster_box_product = index_column
            elif 'Is Booster' == header:
                self.index_column_is_booster_product = index_column
            elif 'Is Precon' == header:
                self.index_column_is_precon_product = index_column
            elif 'Product Id' == header:
                self.index_column_product_id_product = index_column

        # Diagnostics: show what was resolved (None marks a missing header).
        print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
        print(f"Sourcing header row: {self.index_row_header_sourcing}")
        print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
        print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
        print(f"Product max row: {self.product_sheet.max_row}")
        print(f"Product header row: {self.index_row_header_product}")
        print(f"Sourcing header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
        print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")

        if not all([
            self.index_column_name_sourcing
            , self.index_column_link_sourcing
            , self.index_column_unit_cost_sourcing
            , self.index_column_unit_price_sourcing
            , self.index_column_product_id_sourcing
            , self.index_column_active_sourcing
            , self.index_column_product_id_product
            , self.index_column_is_booster_product
            , self.index_column_is_booster_box_product
            , self.index_column_is_precon_product
        ]):
            print("Error: Could not find required columns")
            return

    @classmethod
    def create_product_source_df(cls):
        """Return an empty DataFrame with the sourcing-entry column layout."""
        return pd.DataFrame(columns = [
            cls.NAME_COLUMN_INDEX_ROW
            , cls.NAME_COLUMN_PRODUCT_ID
            , cls.NAME_COLUMN_SOURCE_NAME
            , cls.NAME_COLUMN_LINK
            , cls.NAME_COLUMN_PRODUCT_IS_BOOSTER
            , cls.NAME_COLUMN_UNIT_COST
            , cls.NAME_COLUMN_UNIT_PRICE
            , cls.NAME_COLUMN_ACTIVE
        ])

    def get_sourcing_entries(self):
        """Read every sourcing row into a DataFrame, sorted by source name.

        Rows lacking a source name or link are skipped. Each row is joined
        against the Product sheet (via product id) to flag booster products;
        cost/price/active start as None and are filled in after scraping.
        """
        product_sources = self.create_product_source_df()
        try:
            # Build the product-id -> is-booster lookup once instead of
            # rescanning the Product sheet for every sourcing row (was
            # O(rows x products)). First matching product row wins, as before.
            booster_by_product_id = {}
            for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
                product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
                if product_id not in booster_by_product_id:
                    product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper()
                    booster_by_product_id[product_id] = (product_is_booster_text == "TRUE")

            for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
                source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
                source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
                source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
                if not source_name or not source_link:
                    continue  # incomplete row; nothing to scrape
                product_is_booster = booster_by_product_id.get(source_product_id, False)
                product_sources.loc[len(product_sources)] = [
                    index_row
                    , source_product_id
                    , source_name
                    , source_link
                    , product_is_booster
                    , None # cost
                    , None # price
                    , None # active
                ]
        except Exception as e:
            print(f"Error: {e}")
        # Bug fix: sort_values returns a new DataFrame; the original call
        # discarded the result, so entries were never actually sorted.
        product_sources = product_sources.sort_values(self.NAME_COLUMN_SOURCE_NAME)
        return product_sources

    def clear_row_sourcing_sheet(self, index_row):
        """Blank the cost and mark the row inactive in the Sourcing sheet."""
        self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
        self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"

    def update_row_sourcing_sheet(self, index_row, unit_cost = None, unit_price = None, active = None):
        """Write scraped values into one Sourcing row; None leaves a cell untouched.

        `active` is stored as the Excel-style strings "TRUE"/"FALSE".
        """
        if unit_cost is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost
        if unit_price is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price
        if active is not None:
            self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"

    def save_workbook(self):
        """Persist the in-memory workbook back to WORKBOOK_NAME."""
        print(f"\n{'='*60}")
        print(f"Saving workbook...")
        self.workbook.save(self.WORKBOOK_NAME)
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Project: Shuffle & Skirmish Market Scraper
|
||||
Author: Edward Middleton-Smith
|
||||
Shuffle & Skirmish
|
||||
|
||||
Technology: Business Objects
|
||||
Feature: The Game Collection Product Scraper Class
|
||||
"""
|
||||
|
||||
# Internal
|
||||
from product_scraper import Product_Scraper
|
||||
# External
|
||||
|
||||
class The_Game_Collection_Product_Scraper(Product_Scraper):
    """Scraper for 'The Game Collection' storefront.

    Supplies the site-specific CSS selectors and delegates the actual page
    automation and cost parsing to the Product_Scraper base class.
    """

    NAME_DOMAIN_THE_GAME_COLLECTION: str = 'The Game Collection'

    def __init__(self):
        super().__init__(domain = self.NAME_DOMAIN_THE_GAME_COLLECTION)

    @classmethod
    def parse_cost(cls, cost_text):
        """Parse a displayed cost string via the shared base-class parser."""
        return super().parse_cost(cost_text = cost_text)

    async def scrape_cost_and_active_playwright(self, browser, url):
        """Scrape the unit cost and availability of the product page at *url*.

        Returns a (cost, active) pair: cost as a float (None if unparsable)
        and active as reported by the base scraper ('Sold out' means inactive).
        """
        # Site-specific CSS selectors for this Shopify theme.
        selectors = {
            'page_load_element_selector': '#main-content',
            'cost_selector': '#main-content .shopify-section .container .product .product-info .price strong',
            'active_selector': '#main-content .shopify-section .container .product .product-info product-form form.js-product-form-main button[type="submit"]',
        }
        cost_text, active = await super().scrape_cost_and_active_playwright(
            browser = browser,
            url = url,
            invalid_active_statuses = ['Sold out'],
            **selectors,
        )
        return self.parse_cost(cost_text = cost_text), active
Reference in New Issue
Block a user