Updated to use Playwright instead of Selenium
This commit is contained in:
@@ -1 +0,0 @@
|
||||
,teddy,lord-T-1024,09.01.2026 15:38,file:///home/teddy/.config/libreoffice/4;
|
||||
Binary file not shown.
@@ -1,593 +0,0 @@
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
import requests
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import StaleElementReferenceException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
import re
|
||||
import time
|
||||
import random
|
||||
|
||||
# import undetected_chromedriver as uc
|
||||
from undetected_chromedriver import Chrome
|
||||
|
||||
|
||||
class Cost_Fetcher_Base:
|
||||
PRODUCT_WORKSHEET_NAME = 'Product'
|
||||
SOURCING_WORKSHEET_NAME = 'Sourcing'
|
||||
WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'
|
||||
|
||||
driver: Chrome # webdriver.Chrome
|
||||
eur_to_gbp_rate: float
|
||||
index_column_active_sourcing: int
|
||||
index_column_is_booster_product: int
|
||||
index_column_is_booster_box_product: int
|
||||
index_column_is_precon_product: int
|
||||
index_column_link_sourcing: int
|
||||
index_column_name_sourcing: int
|
||||
index_column_product_id_product: int
|
||||
index_column_product_id_sourcing: int
|
||||
index_column_unit_cost_sourcing: int
|
||||
index_row_header_product: int
|
||||
index_row_header_sourcing: int
|
||||
product_sheet: Worksheet
|
||||
sourcing_sheet: Worksheet
|
||||
wait: WebDriverWait
|
||||
workbook: Workbook
|
||||
|
||||
@staticmethod
|
||||
def parse_cost(cost_text):
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
try:
|
||||
return float(cost_clean) / 100
|
||||
except ValueError:
|
||||
return None
|
||||
@classmethod
|
||||
def parse_cost_from_pennies(cls, cost_text):
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = cls.parse_cost(cost_text = cost_text)
|
||||
if cost_clean is not None:
|
||||
cost_clean = cost_clean / 100
|
||||
return cost_clean
|
||||
@classmethod
|
||||
def parse_cost_chaoscards(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_cardmarket(cls, cost_text):
|
||||
# return cls.parse_cost(cost_text = cost_text)
|
||||
"""Convert '141,30 €' format to float in EUR"""
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
cost_clean = cost_clean.replace(',', '.')
|
||||
try:
|
||||
return float(cost_clean)
|
||||
except ValueError:
|
||||
return None
|
||||
@classmethod
|
||||
def parse_cost_gameslore(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_magicmadhouse(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
|
||||
def get_eur_to_gbp_rate(self):
|
||||
try:
|
||||
response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
|
||||
data = response.json()
|
||||
self.eur_to_gbp_rate = data['rates']['GBP']
|
||||
except Exception as e:
|
||||
print(f"Error fetching exchange rate: {e}")
|
||||
print("Using fallback rate: 0.85")
|
||||
self.eur_to_gbp_rate = 0.85
|
||||
|
||||
def setup_driver(self):
|
||||
print("Starting driver")
|
||||
"""
|
||||
chrome_options = Options()
|
||||
# Remove headless mode to see the browser
|
||||
# chrome_options.add_argument('--headless')
|
||||
chrome_options.add_argument('--no-sandbox')
|
||||
chrome_options.add_argument('--disable-dev-shm-usage')
|
||||
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
|
||||
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
|
||||
chrome_options.add_argument('--window-size=1920,1080')
|
||||
"""
|
||||
try:
|
||||
self.driver = Chrome(version_main=133) # webdriver.Chrome(options=chrome_options)
|
||||
# return driver
|
||||
except Exception as e:
|
||||
print(f"Error setting up Chrome driver: {e}")
|
||||
print("Make sure Chrome and chromedriver are installed")
|
||||
# return None
|
||||
self.wait = WebDriverWait(self.driver, 15)
|
||||
|
||||
def scrape_cost_and_active_selenium(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses):
|
||||
try:
|
||||
print(f" Loading page...")
|
||||
# time.sleep(random.uniform(6, 10))
|
||||
try:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
except Exception as e:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
|
||||
max_attempts = 10
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
element = None
|
||||
element = self.driver.find_element(By.CSS_SELECTOR, page_load_element_selector)
|
||||
text = element.text
|
||||
print(f"✓ Element loaded successfully on attempt {attempt + 1}")
|
||||
# return True
|
||||
break
|
||||
except StaleElementReferenceException:
|
||||
print(f"Stale element on attempt {attempt + 1}, retrying...")
|
||||
if attempt < max_attempts - 1:
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ValueError("StaleElementReferenceException")
|
||||
|
||||
print(f" Page title: {self.driver.title}")
|
||||
|
||||
cost = None
|
||||
element = None
|
||||
counter = 0
|
||||
while cost is None:
|
||||
counter += 1
|
||||
try:
|
||||
element = self.driver.find_element(By.CSS_SELECTOR, cost_selector)
|
||||
text = element.text
|
||||
print(f" Text: '{text}'")
|
||||
cost = text
|
||||
except Exception as e:
|
||||
print(f" Selector failed: {e}")
|
||||
cost = None
|
||||
time.sleep(random.uniform(2, 4))
|
||||
if counter > 10:
|
||||
print("10 cost selector fails")
|
||||
break
|
||||
|
||||
active = None
|
||||
if active_selector is None: # or invalid_active_statuses is None or invalid_active_statuses == []:
|
||||
active = (cost is not None)
|
||||
else:
|
||||
try:
|
||||
elements = None
|
||||
elements = self.driver.find_elements(By.CSS_SELECTOR, active_selector)
|
||||
if len(elements) == 0:
|
||||
active = True
|
||||
else:
|
||||
text = elements[0].text
|
||||
print(f" Text: '{text}'")
|
||||
active = (invalid_active_statuses is None or text not in invalid_active_statuses)
|
||||
except Exception as e:
|
||||
print(f" Selector failed: {e}")
|
||||
|
||||
if cost is None or active is None:
|
||||
print(f" ✗ No cost found")
|
||||
print(f"Cost: {cost}, Active: {active}")
|
||||
input("Press Enter to continue to next URL...")
|
||||
return cost, active
|
||||
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
input("Press Enter to continue to next URL...")
|
||||
return None, None
|
||||
|
||||
def scrape_cost_and_active_selenium_cardmarket(self, url):
|
||||
page_load_element_selector = "body > main.container > div.page-title-container"
|
||||
cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
cost_text, active = self.scrape_cost_and_active_selenium(
|
||||
url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = None
|
||||
, invalid_active_statuses = []
|
||||
)
|
||||
cost = Cost_Fetcher_Base.parse_cost_cardmarket(cost_text)
|
||||
if cost is not None:
|
||||
item_shipping_cost_in = 0
|
||||
if cost < 10:
|
||||
item_shipping_cost_in = 2
|
||||
elif cost < 100:
|
||||
item_shipping_cost_in = 8
|
||||
else:
|
||||
item_shipping_cost_in = 20
|
||||
cost = cost * self.eur_to_gbp_rate + item_shipping_cost_in
|
||||
active = (cost is not None)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_selenium_chaoscards(self, url):
|
||||
# page_load_element_selector = '#prod_title'
|
||||
cost_selector = '.price_inc > span:nth-child(2)'
|
||||
active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)'
|
||||
cost_text, active = self.scrape_cost_and_active_selenium(
|
||||
url = url
|
||||
, page_load_element_selector = cost_selector # page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["Out of stock", "Coming soon"]
|
||||
)
|
||||
cost = Cost_Fetcher_Base.parse_cost_chaoscards(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_selenium_gameslore(self, url):
|
||||
# page_load_element_selector = '.page-title'
|
||||
cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
|
||||
active_selector = '.stock > span:nth-child(1)'
|
||||
cost_text, active = self.scrape_cost_and_active_selenium(
|
||||
url = url
|
||||
, page_load_element_selector = cost_selector # page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["OUT OF STOCK"]
|
||||
)
|
||||
cost = Cost_Fetcher_Base.parse_cost_gameslore(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_selenium_magicmadhouse(self, url):
|
||||
page_load_element_selector = '.productView-title'
|
||||
cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
|
||||
active_selector = '.alertBox.alertBox--error'
|
||||
cost_text, active = self.scrape_cost_and_active_selenium(
|
||||
url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = []
|
||||
)
|
||||
cost = Cost_Fetcher_Base.parse_cost_magicmadhouse(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_prices_and_quantities_selenium_cardmarket(self, url):
|
||||
offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
|
||||
price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
quantity_selector = 'div.amount-container > span:nth-child(1)'
|
||||
|
||||
try:
|
||||
print(f" Loading page...")
|
||||
# time.sleep(random.uniform(6, 10))
|
||||
try:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, offer_container_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, offer_container_selector))
|
||||
)
|
||||
except Exception as e:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, offer_container_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, offer_container_selector))
|
||||
)
|
||||
|
||||
max_attempts = 10
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
element = None
|
||||
element = self.driver.find_element(By.CSS_SELECTOR, offer_container_selector)
|
||||
text = element.text
|
||||
print(f"✓ Element loaded successfully on attempt {attempt + 1}")
|
||||
# return True
|
||||
break
|
||||
except StaleElementReferenceException:
|
||||
print(f"Stale element on attempt {attempt + 1}, retrying...")
|
||||
if attempt < max_attempts - 1:
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ValueError("StaleElementReferenceException")
|
||||
|
||||
print(f" Page title: {self.driver.title}")
|
||||
|
||||
price_quantity_pairs = []
|
||||
try:
|
||||
offer_containers = self.driver.find_elements(By.CSS_SELECTOR, offer_container_selector)
|
||||
print(f" Offer container selector: Found {len(offer_containers)} elements")
|
||||
for offer_container in offer_containers:
|
||||
price_element = offer_container.find_element(By.CSS_SELECTOR, price_selector)
|
||||
price_text = price_element.text
|
||||
if '€' in price_text and re.search(r'\d', price_text):
|
||||
print(f" ✓ Found price: {price_text}")
|
||||
else:
|
||||
price_text = None
|
||||
|
||||
quantity_element = offer_container.find_element(By.CSS_SELECTOR, quantity_selector)
|
||||
quantity_text = quantity_element.text
|
||||
|
||||
if price_text is None or quantity_text is None:
|
||||
continue
|
||||
price_quantity_pairs.append({
|
||||
'price': Cost_Fetcher_Base.parse_cost_cardmarket(price_text = price_text)
|
||||
, 'quantity': Cost_Fetcher_Base.parse_cost_cardmarket(quantity_text = quantity_text)
|
||||
})
|
||||
except Exception as e:
|
||||
print(f" Price selector failed: {e}")
|
||||
return []
|
||||
finally:
|
||||
return price_quantity_pairs
|
||||
|
||||
def load_tcg_sole_trader_workbook(self):
|
||||
print("Loading workbook...")
|
||||
self.workbook = load_workbook(Cost_Fetcher_Base.WORKBOOK_NAME)
|
||||
|
||||
if Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
|
||||
print(f"Error: Sheet '{Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME}' not found")
|
||||
return
|
||||
if Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
|
||||
print(f"Error: Sheet '{Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME}' not found")
|
||||
return
|
||||
|
||||
self.sourcing_sheet = self.workbook[Cost_Fetcher_Base.SOURCING_WORKSHEET_NAME]
|
||||
self.product_sheet = self.workbook[Cost_Fetcher_Base.PRODUCT_WORKSHEET_NAME]
|
||||
|
||||
sourcing_table_found = False
|
||||
for row in range(1, self.sourcing_sheet.max_row + 1):
|
||||
if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
|
||||
self.index_row_header_sourcing = row
|
||||
sourcing_table_found = True
|
||||
break
|
||||
|
||||
if not sourcing_table_found or not self.index_row_header_sourcing:
|
||||
for row in range(1, min(20, self.sourcing_sheet.max_row + 1)):
|
||||
if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
|
||||
self.index_row_header_sourcing = row
|
||||
sourcing_table_found = True
|
||||
break
|
||||
|
||||
if not sourcing_table_found:
|
||||
print("Error: Could not find table 'tbl_Sourcing'")
|
||||
return
|
||||
|
||||
product_table_found = False
|
||||
for row in range(1, self.product_sheet.max_row + 1):
|
||||
if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value):
|
||||
self.index_row_header_product = row
|
||||
product_table_found = True
|
||||
break
|
||||
|
||||
if not product_table_found:
|
||||
print("Error: Could not find table 'tbl_Product'")
|
||||
return
|
||||
|
||||
for index_column in range(1, self.sourcing_sheet.max_column + 1):
|
||||
header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip()
|
||||
if 'Source Name' == header:
|
||||
self.index_column_name_sourcing = index_column
|
||||
elif 'Source Link' == header:
|
||||
self.index_column_link_sourcing = index_column
|
||||
elif 'Source Unit Cost' == header:
|
||||
self.index_column_unit_cost_sourcing = index_column
|
||||
elif 'Active' == header:
|
||||
self.index_column_active_sourcing = index_column
|
||||
elif 'Product Id' == header:
|
||||
self.index_column_product_id_sourcing = index_column
|
||||
|
||||
for index_column in range(1, self.product_sheet.max_column + 1):
|
||||
header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip()
|
||||
if 'Is Booster Box' == header:
|
||||
self.index_column_is_booster_box_product = index_column
|
||||
elif 'Is Booster' == header:
|
||||
self.index_column_is_booster_product = index_column
|
||||
elif 'Is Precon' == header:
|
||||
self.index_column_is_precon_product = index_column
|
||||
elif 'Product Id' == header:
|
||||
self.index_column_product_id_product = index_column
|
||||
|
||||
print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
|
||||
print(f"Sourcing header row: {self.index_row_header_sourcing}")
|
||||
print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
|
||||
print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
|
||||
print(f"Product max row: {self.product_sheet.max_row}")
|
||||
print(f"Product header row: {self.index_row_header_product}")
|
||||
print(f"Sourcing header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
|
||||
print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")
|
||||
|
||||
if not all([
|
||||
self.index_column_name_sourcing
|
||||
, self.index_column_link_sourcing
|
||||
, self.index_column_unit_cost_sourcing
|
||||
, self.index_column_product_id_sourcing
|
||||
, self.index_column_active_sourcing
|
||||
, self.index_column_product_id_product
|
||||
, self.index_column_is_booster_product
|
||||
, self.index_column_is_booster_box_product
|
||||
, self.index_column_is_precon_product
|
||||
]):
|
||||
print("Error: Could not find required columns")
|
||||
return
|
||||
|
||||
def scrape_all_costs(self):
|
||||
try:
|
||||
processed_count = 0
|
||||
updated_count = 0
|
||||
cardmarket_accessed_last_on = 0
|
||||
chaoscards_accessed_last_on = 0
|
||||
gameslore_accessed_last_on = 0
|
||||
magicmadhouse_accessed_last_on = 0
|
||||
did_restart_since_last_chaos_cards_visit = True
|
||||
did_restart_since_last_games_lore_visit = True
|
||||
for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
|
||||
# print(f"index_row: {index_row}")
|
||||
# print(f"{self.sourcing_sheet.cell(index_row, 1).value}, {self.sourcing_sheet.cell(index_row, 2).value}, {self.sourcing_sheet.cell(index_row, 3).value}, {self.sourcing_sheet.cell(index_row, 4).value}, {self.sourcing_sheet.cell(index_row, 5).value}, {self.sourcing_sheet.cell(index_row, 6).value}, {self.sourcing_sheet.cell(index_row, 7).value}, {self.sourcing_sheet.cell(index_row, 8).value}, {self.sourcing_sheet.cell(index_row, 9).value}, {self.sourcing_sheet.cell(index_row, 10).value}, {self.sourcing_sheet.cell(index_row, 11).value}, {self.sourcing_sheet.cell(index_row, 12).value}, {self.sourcing_sheet.cell(index_row, 13).value}, {self.sourcing_sheet.cell(index_row, 14).value}, {self.sourcing_sheet.cell(index_row, 15).value}, {self.sourcing_sheet.cell(index_row, 16).value}, {self.sourcing_sheet.cell(index_row, 17).value}, {self.sourcing_sheet.cell(index_row, 18).value}, {self.sourcing_sheet.cell(index_row, 19).value}")
|
||||
source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
|
||||
source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
|
||||
source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
|
||||
|
||||
if not source_name or not source_link: # or not str(source_link).strip():
|
||||
continue
|
||||
|
||||
print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
|
||||
|
||||
product_is_booster = False
|
||||
for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
|
||||
product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
|
||||
# print(f"found product: id {product_id}")
|
||||
if product_id == source_product_id:
|
||||
product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper()
|
||||
# print(f"product is booster: {product_is_booster_text}, type: {str(type(product_is_booster_text))}")
|
||||
product_is_booster = (product_is_booster_text == "TRUE")
|
||||
break
|
||||
print(f"product is booster: {product_is_booster}")
|
||||
|
||||
if (
|
||||
(
|
||||
source_name == "Chaos Cards"
|
||||
and not did_restart_since_last_chaos_cards_visit
|
||||
)
|
||||
or (
|
||||
source_name == "Games Lore"
|
||||
and not did_restart_since_last_games_lore_visit
|
||||
)
|
||||
):
|
||||
self.stop_driver()
|
||||
self.setup_driver()
|
||||
if not self.driver:
|
||||
return
|
||||
did_restart_since_last_chaos_cards_visit = True
|
||||
did_restart_since_last_games_lore_visit = True
|
||||
|
||||
if source_name in ["Card Market", "Chaos Cards", "Games Lore", "Magic Madhouse"]:
|
||||
self.clear_row_sourcing_sheet(index_row = index_row)
|
||||
processed_count += 1
|
||||
Cost_Fetcher_Base.log_processing_new_row(
|
||||
index_row = index_row
|
||||
, source_link = source_link
|
||||
)
|
||||
|
||||
cost = None
|
||||
active = None
|
||||
if source_name == "Card Market":
|
||||
while (time.time() - cardmarket_accessed_last_on < random.uniform(10, 20)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
if product_is_booster:
|
||||
price_quantity_pairs = self.scrape_prices_and_quantities_selenium_cardmarket(url = source_link)
|
||||
if price_quantity_pairs:
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE"
|
||||
max_quantity = 0
|
||||
updated_row_price = False
|
||||
for price_quantity_pair in price_quantity_pairs:
|
||||
eur_price = price_quantity_pair['price']
|
||||
quantity = price_quantity_pair['quantity']
|
||||
print(f" Found price: €{eur_price}")
|
||||
print(f" Found quantity: {quantity}")
|
||||
max_quantity = max(max_quantity, quantity)
|
||||
if quantity >= 8:
|
||||
if eur_price:
|
||||
gbp_price = eur_price * self.eur_to_gbp_rate
|
||||
print(f" Converted: €{eur_price:.2f} → £{gbp_price:.2f}")
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = gbp_price
|
||||
updated_count += 1
|
||||
updated_row_price = True
|
||||
print(f"output row: {index_row}, value: {self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value}")
|
||||
break
|
||||
else:
|
||||
print(f" Error: Could not parse price")
|
||||
if not updated_row_price:
|
||||
print("Offer with quantity >= 8 not found")
|
||||
for price_quantity_pair in price_quantity_pairs:
|
||||
eur_price = price_quantity_pair['price']
|
||||
quantity = price_quantity_pair['quantity']
|
||||
print(f" Found price: €{eur_price}")
|
||||
print(f" Found quantity: {quantity}")
|
||||
if max_quantity <= 2 or quantity == max_quantity:
|
||||
if eur_price:
|
||||
gbp_price = eur_price * self.eur_to_gbp_rate
|
||||
print(f" Converted: €{eur_price:.2f} → £{gbp_price:.2f}")
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = gbp_price
|
||||
updated_count += 1
|
||||
updated_row_price = True
|
||||
print(f"output row: {index_row}, value: {self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value}")
|
||||
break
|
||||
else:
|
||||
print(f" Error: Could not parse price")
|
||||
else:
|
||||
cost, active = self.scrape_cost_and_active_selenium_cardmarket(url = source_link)
|
||||
cardmarket_accessed_last_on = time.time()
|
||||
elif source_name == "Chaos Cards":
|
||||
while (time.time() - chaoscards_accessed_last_on < random.uniform(20, 30)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.scrape_cost_and_active_selenium_chaoscards(url = source_link)
|
||||
chaoscards_accessed_last_on = time.time()
|
||||
did_restart_since_last_chaos_cards_visit = False
|
||||
elif source_name == "Games Lore":
|
||||
while (time.time() - gameslore_accessed_last_on < random.uniform(10, 20)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.scrape_cost_and_active_selenium_gameslore(url = source_link)
|
||||
gameslore_accessed_last_on = time.time()
|
||||
did_restart_since_last_games_lore_visit = False
|
||||
elif source_name == "Magic Madhouse":
|
||||
while (time.time() - magicmadhouse_accessed_last_on < random.uniform(10, 20)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.scrape_cost_and_active_selenium_magicmadhouse(url = source_link)
|
||||
magicmadhouse_accessed_last_on = time.time()
|
||||
|
||||
if (cost is not None and active is not None):
|
||||
print(f" Found cost: {cost}, active: {active}")
|
||||
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = cost
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"
|
||||
updated_count += 1
|
||||
else:
|
||||
print(f" Error: Could not find cost on page")
|
||||
# Save workbook
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Saving workbook...")
|
||||
self.workbook.save(Cost_Fetcher_Base.WORKBOOK_NAME)
|
||||
|
||||
print(f"\nComplete!")
|
||||
print(f"Processed: {processed_count} entries")
|
||||
print(f"Updated: {updated_count} costs")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
def clear_row_sourcing_sheet(self, index_row):
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"
|
||||
|
||||
@staticmethod
|
||||
def log_processing_new_row(index_row, source_link):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Processing row {index_row}: {source_link}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
def __init__(self):
|
||||
print("Setting up browser automation (browser will not be visible)...")
|
||||
self.setup_driver()
|
||||
if not self.driver:
|
||||
return
|
||||
self.load_tcg_sole_trader_workbook()
|
||||
self.get_eur_to_gbp_rate()
|
||||
|
||||
def stop_driver(self):
|
||||
self.driver.quit()
|
||||
|
||||
def main():
|
||||
cost_fetcher = Cost_Fetcher_Base()
|
||||
cost_fetcher.scrape_all_costs()
|
||||
cost_fetcher.stop_driver()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
689
product_scraping/product_scraper.py
Normal file
689
product_scraping/product_scraper.py
Normal file
@@ -0,0 +1,689 @@
|
||||
import pandas as pd
|
||||
from openpyxl import load_workbook, Workbook
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
import requests
|
||||
"""
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import StaleElementReferenceException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import random
|
||||
from playwright.sync_api import sync_playwright, Browser, Page
|
||||
# import playwright
|
||||
# import undetected_chromedriver as uc
|
||||
# from undetected_chromedriver import Chrome
|
||||
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
class Product_Scraper:
|
||||
browser: Browser # sync_playwright.chromium
|
||||
domain: str
|
||||
# driver: Chrome # webdriver.Chrome
|
||||
page: Page
|
||||
# wait: WebDriverWait
|
||||
|
||||
def __init__(self, domain):
|
||||
print("Setting up browser automation")
|
||||
self.domain = domain
|
||||
"""
|
||||
self.setup_browser()
|
||||
if not self.browser:
|
||||
return
|
||||
"""
|
||||
|
||||
def stop_browser(self, min_delay = 0):
|
||||
if (min_delay > 0):
|
||||
time.sleep(random.uniform(min_delay, min_delay * 1.5))
|
||||
self.browser.close()
|
||||
|
||||
@staticmethod
|
||||
def parse_cost(cost_text):
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
try:
|
||||
return float(cost_clean) / 100
|
||||
except ValueError:
|
||||
return None
|
||||
@classmethod
|
||||
def parse_cost_from_pennies(cls, cost_text):
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = cls.parse_cost(cost_text = cost_text)
|
||||
if cost_clean is not None:
|
||||
cost_clean = cost_clean / 100
|
||||
return cost_clean
|
||||
@classmethod
|
||||
def parse_cost_chaoscards(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_cardmarket(cls, cost_text):
|
||||
# return cls.parse_cost(cost_text = cost_text)
|
||||
"""Convert '141,30 €' format to float in EUR"""
|
||||
if not cost_text:
|
||||
return None
|
||||
cost_clean = re.sub(r'[^\d,]', '', cost_text)
|
||||
cost_clean = cost_clean.replace(',', '.')
|
||||
try:
|
||||
return float(cost_clean)
|
||||
except ValueError:
|
||||
return None
|
||||
@classmethod
|
||||
def parse_cost_gameslore(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
@classmethod
|
||||
def parse_cost_magicmadhouse(cls, cost_text):
|
||||
return cls.parse_cost(cost_text = cost_text)
|
||||
|
||||
"""
|
||||
def setup_driver(self):
|
||||
print("Starting driver")
|
||||
" ""
|
||||
chrome_options = Options()
|
||||
# Remove headless mode to see the browser
|
||||
# chrome_options.add_argument('--headless')
|
||||
chrome_options.add_argument('--no-sandbox')
|
||||
chrome_options.add_argument('--disable-dev-shm-usage')
|
||||
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
|
||||
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
|
||||
chrome_options.add_argument('--window-size=1920,1080')
|
||||
" ""
|
||||
try:
|
||||
self.driver = Chrome(version_main=133) # webdriver.Chrome(options=chrome_options)
|
||||
# return driver
|
||||
except Exception as e:
|
||||
print(f"Error setting up Chrome driver: {e}")
|
||||
print("Make sure Chrome and chromedriver are installed")
|
||||
# return None
|
||||
self.wait = WebDriverWait(self.driver, 15)
|
||||
" ""
|
||||
def setup_browser(self):
|
||||
print("Starting browser")
|
||||
with sync_playwright() as p:
|
||||
self.browser = p.chromium.launch()
|
||||
self.page = self.browser.new_page()
|
||||
"""
|
||||
def scrape_cost_and_active_playwright(self, url, page_load_element_selector, cost_selector, active_selector, invalid_active_statuses, min_delay = 0):
|
||||
print(f" Loading page...")
|
||||
# time.sleep(random.uniform(6, 10))
|
||||
"""
|
||||
try:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
except Exception as e:
|
||||
self.driver.get(url)
|
||||
element = self.wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
element = self.wait.until(
|
||||
EC.element_to_be_clickable((By.CSS_SELECTOR, page_load_element_selector))
|
||||
)
|
||||
|
||||
max_attempts = 10
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
element = None
|
||||
element = self.driver.find_element(By.CSS_SELECTOR, page_load_element_selector)
|
||||
text = element.text
|
||||
print(f"✓ Element loaded successfully on attempt {attempt + 1}")
|
||||
# return True
|
||||
break
|
||||
except StaleElementReferenceException:
|
||||
print(f"Stale element on attempt {attempt + 1}, retrying...")
|
||||
if attempt < max_attempts - 1:
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise ValueError("StaleElementReferenceException")
|
||||
"""
|
||||
with sync_playwright() as p:
|
||||
self.browser = p.chromium.launch(headless=False)
|
||||
self.page = self.browser.new_page()
|
||||
self.page.goto(url = url)
|
||||
try:
|
||||
# Automatically waits up to 30s by default
|
||||
element = self.page.locator(selector = page_load_element_selector)
|
||||
print(f" Page title: {self.page.title()}")
|
||||
|
||||
element = self.page.locator(selector = cost_selector)
|
||||
text = element.text_content()
|
||||
print(f" Text: '{text}'")
|
||||
cost = text
|
||||
|
||||
active = None
|
||||
if active_selector is None: # or invalid_active_statuses is None or invalid_active_statuses == []:
|
||||
active = (cost is not None)
|
||||
else:
|
||||
try:
|
||||
elements = self.page.query_selector_all(selector = cost_selector)
|
||||
if len(elements) == 0:
|
||||
active = True
|
||||
else:
|
||||
text = elements[0].text_content()
|
||||
print(f" Text: '{text}'")
|
||||
active = (invalid_active_statuses is None or text not in invalid_active_statuses)
|
||||
except Exception as e:
|
||||
print(f" Selector failed: {e}")
|
||||
|
||||
if cost is None or active is None:
|
||||
print(f" ✗ No cost found")
|
||||
print(f"Cost: {cost}, Active: {active}")
|
||||
input("Press Enter to continue to next URL...")
|
||||
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
input("Press Enter to continue to next URL...")
|
||||
self.stop_browser(min_delay = min_delay)
|
||||
return None, None
|
||||
finally:
|
||||
self.stop_browser(min_delay = min_delay)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_playwright_cardmarket(self, url, eur_to_gbp_rate):
|
||||
page_load_element_selector = "body > main.container > div.page-title-container"
|
||||
cost_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer > div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
cost_text, active = self.scrape_cost_and_active_playwright(
|
||||
url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = None
|
||||
, invalid_active_statuses = []
|
||||
, min_delay = 15
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_cardmarket(cost_text)
|
||||
if cost is not None:
|
||||
item_shipping_cost_in = 0
|
||||
if cost < 10:
|
||||
item_shipping_cost_in = 2
|
||||
elif cost < 100:
|
||||
item_shipping_cost_in = 8
|
||||
else:
|
||||
item_shipping_cost_in = 20
|
||||
cost = cost * eur_to_gbp_rate + item_shipping_cost_in
|
||||
active = (cost is not None)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_playwright_chaoscards(self, url):
|
||||
# page_load_element_selector = '#prod_title'
|
||||
cost_selector = '.price_inc > span:nth-child(2)'
|
||||
active_selector = '.product__right > form > ul.prod_det_fields.left.product-section.product-section--stock > li > div:nth-child(1) > div:nth-child(2)'
|
||||
cost_text, active = self.scrape_cost_and_active_playwright(
|
||||
url = url
|
||||
, page_load_element_selector = cost_selector # page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["Out of stock", "Coming soon"]
|
||||
, min_delay = 15
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_chaoscards(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_playwright_gameslore(self, url):
|
||||
# page_load_element_selector = '.page-title'
|
||||
cost_selector = 'div.columns > div.column.main > div.product-info-main > div.product-info-price > div.price-box > span.special-price > span.price-container > span.price-wrapper > span.price'
|
||||
active_selector = '.stock > span:nth-child(1)'
|
||||
cost_text, active = self.scrape_cost_and_active_playwright(
|
||||
url = url
|
||||
, page_load_element_selector = cost_selector # page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = ["OUT OF STOCK"]
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_gameslore(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_cost_and_active_playwright_magicmadhouse(self, url):
|
||||
page_load_element_selector = '.productView-title'
|
||||
cost_selector = 'div.body > div.container > div > div.productView > section.productView-details > div.productView-options > form > div.productView-options-selections > div.productView-product > div.productView-info > div.price-rating > div.productView-price > div.price-section.actual-price > span.price'
|
||||
active_selector = '.alertBox.alertBox--error'
|
||||
cost_text, active = self.scrape_cost_and_active_playwright(
|
||||
url = url
|
||||
, page_load_element_selector = page_load_element_selector
|
||||
, cost_selector = cost_selector
|
||||
, active_selector = active_selector
|
||||
, invalid_active_statuses = []
|
||||
)
|
||||
cost = Product_Scraper.parse_cost_magicmadhouse(cost_text)
|
||||
return cost, active
|
||||
|
||||
def scrape_prices_and_quantities_playwright_cardmarket(self, url, eur_to_gbp_rate):
|
||||
offer_container_selector = '#table > div:nth-child(1) > div.table-body > .row.article-row.g-0:nth-child(1) > div.col-offer'
|
||||
price_selector = 'div.price-container > div > div:nth-child(1) > span:nth-child(1)'
|
||||
quantity_selector = 'div.amount-container > span:nth-child(1)'
|
||||
|
||||
print(f" Loading page...")
|
||||
with sync_playwright() as p:
|
||||
self.browser = p.chromium.launch(headless=False)
|
||||
self.page = self.browser.new_page()
|
||||
self.page.goto(url = url)
|
||||
|
||||
try:
|
||||
# Automatically waits up to 30s by default
|
||||
print(f" Page title: {self.page.title()}")
|
||||
|
||||
price_quantity_pairs = []
|
||||
try:
|
||||
offer_containers = self.page.query_selector_all(offer_container_selector)
|
||||
print(f" Offer container selector: Found {len(offer_containers)} elements")
|
||||
for offer_container in offer_containers:
|
||||
price_element = offer_container.query_selector(price_selector)
|
||||
price_text = price_element.text_content()
|
||||
if '€' in price_text and re.search(r'\d', price_text):
|
||||
print(f" ✓ Found price: {price_text}")
|
||||
else:
|
||||
price_text = None
|
||||
|
||||
quantity_element = offer_container.query_selector(quantity_selector)
|
||||
quantity_text = quantity_element.text_content()
|
||||
|
||||
if price_text is None or quantity_text is None:
|
||||
continue
|
||||
price_quantity_pairs.append({
|
||||
'price': Product_Scraper.parse_cost_cardmarket(price_text = price_text) * eur_to_gbp_rate
|
||||
, 'quantity': Product_Scraper.parse_cost_cardmarket(quantity_text = quantity_text)
|
||||
})
|
||||
except Exception as e:
|
||||
print(f" Price selector failed: {e}")
|
||||
input("Press enter to continue to next URL...")
|
||||
self.stop_browser(min_delay = 15)
|
||||
return []
|
||||
finally:
|
||||
self.stop_browser(min_delay = 15)
|
||||
return price_quantity_pairs
|
||||
|
||||
|
||||
class TCG_Sole_Trader_Workbook_Container:
|
||||
NAME_COLUMN_ACTIVE: str = 'Active'
|
||||
NAME_COLUMN_INDEX_ROW: str = 'Index Row'
|
||||
NAME_COLUMN_LINK: str = 'Link'
|
||||
NAME_COLUMN_PRODUCT_ID: str = 'Product Id'
|
||||
NAME_COLUMN_PRODUCT_IS_BOOSTER: str = 'Product Is Booster'
|
||||
NAME_COLUMN_PRODUCT_IS_BOOSTER_BOX: str = 'Product Is Booster Box'
|
||||
NAME_COLUMN_PRODUCT_IS_PRECON: str = 'Product Is Precon'
|
||||
NAME_COLUMN_SOURCE_NAME: str = 'Source Name'
|
||||
NAME_COLUMN_UNIT_COST: str = 'Cost'
|
||||
NAME_COLUMN_UNIT_PRICE: str = 'Price'
|
||||
PRODUCT_WORKSHEET_NAME = 'Product'
|
||||
SOURCING_WORKSHEET_NAME = 'Sourcing'
|
||||
WORKBOOK_NAME = 'TCG Sole Trader Copy.xlsx'
|
||||
|
||||
index_column_active_sourcing: int
|
||||
index_column_is_booster_product: int
|
||||
index_column_is_booster_box_product: int
|
||||
index_column_is_precon_product: int
|
||||
index_column_link_sourcing: int
|
||||
index_column_name_sourcing: int
|
||||
index_column_product_id_product: int
|
||||
index_column_product_id_sourcing: int
|
||||
index_column_unit_cost_sourcing: int
|
||||
index_column_unit_price_sourcing: int
|
||||
index_row_header_product: int
|
||||
index_row_header_sourcing: int
|
||||
product_sheet: Worksheet
|
||||
sourcing_sheet: Worksheet
|
||||
workbook: Workbook
|
||||
|
||||
def __init__(self):
|
||||
print("Loading workbook...")
|
||||
self.workbook = load_workbook(self.WORKBOOK_NAME)
|
||||
|
||||
if self.SOURCING_WORKSHEET_NAME not in self.workbook.sheetnames:
|
||||
print(f"Error: Sheet '{self.SOURCING_WORKSHEET_NAME}' not found")
|
||||
return
|
||||
if self.PRODUCT_WORKSHEET_NAME not in self.workbook.sheetnames:
|
||||
print(f"Error: Sheet '{self.PRODUCT_WORKSHEET_NAME}' not found")
|
||||
return
|
||||
|
||||
self.sourcing_sheet = self.workbook[self.SOURCING_WORKSHEET_NAME]
|
||||
self.product_sheet = self.workbook[self.PRODUCT_WORKSHEET_NAME]
|
||||
|
||||
sourcing_table_found = False
|
||||
for row in range(1, self.sourcing_sheet.max_row + 1):
|
||||
if self.sourcing_sheet.cell(row, 1).value == 'tbl_Sourcing' or 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
|
||||
self.index_row_header_sourcing = row
|
||||
sourcing_table_found = True
|
||||
break
|
||||
|
||||
if not sourcing_table_found or not self.index_row_header_sourcing:
|
||||
for row in range(1, min(20, self.sourcing_sheet.max_row + 1)):
|
||||
if 'Source Name' in str(self.sourcing_sheet.cell(row, 3).value):
|
||||
self.index_row_header_sourcing = row
|
||||
sourcing_table_found = True
|
||||
break
|
||||
|
||||
if not sourcing_table_found:
|
||||
print("Error: Could not find table 'tbl_Sourcing'")
|
||||
return
|
||||
|
||||
product_table_found = False
|
||||
for row in range(1, self.product_sheet.max_row + 1):
|
||||
if self.product_sheet.cell(row, 1).value == 'tbl_Product' or 'Product Id' in str(self.product_sheet.cell(row, 1).value):
|
||||
self.index_row_header_product = row
|
||||
product_table_found = True
|
||||
break
|
||||
|
||||
if not product_table_found:
|
||||
print("Error: Could not find table 'tbl_Product'")
|
||||
return
|
||||
|
||||
for index_column in range(1, self.sourcing_sheet.max_column + 1):
|
||||
header = str(self.sourcing_sheet.cell(self.index_row_header_sourcing, index_column).value).strip()
|
||||
if 'Source Name' == header:
|
||||
self.index_column_name_sourcing = index_column
|
||||
elif 'Source Link' == header:
|
||||
self.index_column_link_sourcing = index_column
|
||||
elif 'Source Unit Cost' == header:
|
||||
self.index_column_unit_cost_sourcing = index_column
|
||||
elif 'Sale Price' == header:
|
||||
self.index_column_unit_price_sourcing = index_column
|
||||
elif 'Active' == header:
|
||||
self.index_column_active_sourcing = index_column
|
||||
elif 'Product Id' == header:
|
||||
self.index_column_product_id_sourcing = index_column
|
||||
|
||||
for index_column in range(1, self.product_sheet.max_column + 1):
|
||||
header = str(self.product_sheet.cell(self.index_row_header_product, index_column).value).strip()
|
||||
if 'Is Booster Box' == header:
|
||||
self.index_column_is_booster_box_product = index_column
|
||||
elif 'Is Booster' == header:
|
||||
self.index_column_is_booster_product = index_column
|
||||
elif 'Is Precon' == header:
|
||||
self.index_column_is_precon_product = index_column
|
||||
elif 'Product Id' == header:
|
||||
self.index_column_product_id_product = index_column
|
||||
|
||||
print(f"Sourcing max row: {self.sourcing_sheet.max_row}")
|
||||
print(f"Sourcing header row: {self.index_row_header_sourcing}")
|
||||
print(f"Sourcing header 1: {self.sourcing_sheet.cell(self.index_row_header_sourcing, 1).value}")
|
||||
print(f"Sourcing Columns - Name: {self.index_column_name_sourcing}, Link: {self.index_column_link_sourcing}, Unit Cost: {self.index_column_unit_cost_sourcing}, Sale price: {self.index_column_unit_price_sourcing}, Active: {self.index_column_active_sourcing}, Product Id: {self.index_column_product_id_sourcing}")
|
||||
print(f"Product max row: {self.product_sheet.max_row}")
|
||||
print(f"Product header row: {self.index_row_header_product}")
|
||||
print(f"Sourcing header 1: {self.product_sheet.cell(self.index_row_header_product, 1).value}")
|
||||
print(f"Product Columns - Id: {self.index_column_product_id_product}, Is Booster: {self.index_column_is_booster_product}, Is Booster Box: {self.index_column_is_booster_box_product}, Is Precon: {self.index_column_is_precon_product}")
|
||||
|
||||
if not all([
|
||||
self.index_column_name_sourcing
|
||||
, self.index_column_link_sourcing
|
||||
, self.index_column_unit_cost_sourcing
|
||||
, self.index_column_unit_price_sourcing
|
||||
, self.index_column_product_id_sourcing
|
||||
, self.index_column_active_sourcing
|
||||
, self.index_column_product_id_product
|
||||
, self.index_column_is_booster_product
|
||||
, self.index_column_is_booster_box_product
|
||||
, self.index_column_is_precon_product
|
||||
]):
|
||||
print("Error: Could not find required columns")
|
||||
return
|
||||
|
||||
@classmethod
|
||||
def create_product_source_df(cls):
|
||||
return pd.DataFrame(columns = [
|
||||
cls.NAME_COLUMN_INDEX_ROW
|
||||
, cls.NAME_COLUMN_PRODUCT_ID
|
||||
, cls.NAME_COLUMN_SOURCE_NAME
|
||||
, cls.NAME_COLUMN_LINK
|
||||
, cls.NAME_COLUMN_PRODUCT_IS_BOOSTER
|
||||
, cls.NAME_COLUMN_UNIT_COST
|
||||
, cls.NAME_COLUMN_UNIT_PRICE
|
||||
, cls.NAME_COLUMN_ACTIVE
|
||||
])
|
||||
|
||||
def get_sourcing_entries(self):
|
||||
product_sources = self.create_product_source_df()
|
||||
try:
|
||||
# products = []
|
||||
for index_row in range(self.index_row_header_sourcing + 1, self.sourcing_sheet.max_row + 1):
|
||||
source_name = self.sourcing_sheet.cell(index_row, self.index_column_name_sourcing).value
|
||||
source_link = self.sourcing_sheet.cell(index_row, self.index_column_link_sourcing).value
|
||||
source_product_id = self.sourcing_sheet.cell(index_row, self.index_column_product_id_sourcing).value
|
||||
print(f"found source: {source_name} - product: {source_product_id} - link: {source_link}")
|
||||
|
||||
if not source_name or not source_link:
|
||||
continue
|
||||
|
||||
product_is_booster = False
|
||||
for product_row in range(self.index_row_header_product + 1, self.product_sheet.max_row + 1):
|
||||
product_id = self.product_sheet.cell(product_row, self.index_column_product_id_product).value
|
||||
# print(f"found product: id {product_id}")
|
||||
if product_id == source_product_id:
|
||||
product_is_booster_text = str(self.product_sheet.cell(product_row, self.index_column_is_booster_product).value).upper()
|
||||
# print(f"product is booster: {product_is_booster_text}, type: {str(type(product_is_booster_text))}")
|
||||
product_is_booster = (product_is_booster_text == "TRUE")
|
||||
break
|
||||
print(f"product is booster: {product_is_booster}")
|
||||
|
||||
# products.append((index_row, source_product_id, source_name, source_link, product_is_booster))
|
||||
product_sources.loc[len(product_sources)] = [
|
||||
index_row
|
||||
, source_product_id
|
||||
, source_name
|
||||
, source_link
|
||||
, product_is_booster
|
||||
, None # cost
|
||||
, None # price
|
||||
, None # active
|
||||
]
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
product_sources.sort_values(self.NAME_COLUMN_SOURCE_NAME)
|
||||
return product_sources
|
||||
|
||||
def clear_row_sourcing_sheet(self, index_row):
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = None
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "FALSE"
|
||||
|
||||
def update_row_sourcing_sheet(self, index_row, unit_cost = None, unit_price = None, active = None):
|
||||
if unit_cost is not None:
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_cost_sourcing).value = unit_cost
|
||||
if unit_price is not None:
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_unit_price_sourcing).value = unit_price
|
||||
if active is not None:
|
||||
self.sourcing_sheet.cell(index_row, self.index_column_active_sourcing).value = "TRUE" if active else "FALSE"
|
||||
|
||||
def save_workbook(self):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Saving workbook...")
|
||||
self.workbook.save(self.WORKBOOK_NAME)
|
||||
|
||||
|
||||
class Cost_Fetcher:
|
||||
ACCESSED_LAST_ON_FLAG: str = 'Accessed Last On'
|
||||
INDEX_DOMAIN_FLAG: str = 'Index Domain'
|
||||
NAME_DOMAIN_CARD_MARKET: str = 'Card Market'
|
||||
NAME_DOMAIN_CHAOS_CARDS: str = 'Chaos Cards'
|
||||
NAME_DOMAIN_GAMES_LORE: str = 'Games Lore'
|
||||
NAME_DOMAIN_MAGIC_MADHOUSE: str = 'Magic Madhouse'
|
||||
NAME_FLAG: str = 'Name'
|
||||
|
||||
domain_names: list[str]
|
||||
eur_to_gbp_rate: float
|
||||
product_scrapers: list[Product_Scraper]
|
||||
product_sources: pd.DataFrame
|
||||
workbook_container: TCG_Sole_Trader_Workbook_Container
|
||||
|
||||
def __init__(self):
|
||||
self.domain_names = [
|
||||
self.NAME_DOMAIN_CARD_MARKET
|
||||
, self.NAME_DOMAIN_CHAOS_CARDS
|
||||
, self.NAME_DOMAIN_GAMES_LORE
|
||||
, self.NAME_DOMAIN_MAGIC_MADHOUSE
|
||||
]
|
||||
self.domain_details = {
|
||||
self.NAME_DOMAIN_CARD_MARKET: {
|
||||
self.NAME_FLAG: self.NAME_DOMAIN_CARD_MARKET
|
||||
, self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CARD_MARKET)
|
||||
, self.ACCESSED_LAST_ON_FLAG: 0
|
||||
}
|
||||
, self.NAME_DOMAIN_CHAOS_CARDS: {
|
||||
self.NAME_FLAG: self.NAME_DOMAIN_CHAOS_CARDS
|
||||
, self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_CHAOS_CARDS)
|
||||
, self.ACCESSED_LAST_ON_FLAG: 0
|
||||
}
|
||||
, self.NAME_DOMAIN_GAMES_LORE: {
|
||||
self.NAME_FLAG: self.NAME_DOMAIN_GAMES_LORE
|
||||
, self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_GAMES_LORE)
|
||||
, self.ACCESSED_LAST_ON_FLAG: 0
|
||||
}
|
||||
, self.NAME_DOMAIN_MAGIC_MADHOUSE: {
|
||||
self.NAME_FLAG: self.NAME_DOMAIN_MAGIC_MADHOUSE
|
||||
, self.INDEX_DOMAIN_FLAG: self.get_index_domain_from_name(self.NAME_DOMAIN_MAGIC_MADHOUSE)
|
||||
, self.ACCESSED_LAST_ON_FLAG: 0
|
||||
}
|
||||
}
|
||||
product_scrapers = []
|
||||
for index_domain in range(len(self.domain_names)):
|
||||
domain = self.domain_names[index_domain]
|
||||
product_scraper = Product_Scraper(domain)
|
||||
product_scrapers.append(product_scraper)
|
||||
self.product_scrapers = product_scrapers
|
||||
self.workbook_container = TCG_Sole_Trader_Workbook_Container()
|
||||
self.get_eur_to_gbp_rate()
|
||||
|
||||
self.domain_locks = defaultdict(asyncio.Lock)
|
||||
|
||||
def get_index_domain_from_name(self, domain_name):
|
||||
for index_domain in range(len(self.domain_names)):
|
||||
if (self.domain_names[index_domain] == domain_name):
|
||||
return index_domain
|
||||
raise ValueError(f'Domain does not exist: {domain_name}')
|
||||
|
||||
def get_eur_to_gbp_rate(self):
|
||||
try:
|
||||
response = requests.get('https://api.exchangerate-api.com/v4/latest/EUR', timeout=10)
|
||||
data = response.json()
|
||||
self.eur_to_gbp_rate = data['rates']['GBP']
|
||||
except Exception as e:
|
||||
print(f"Error fetching exchange rate: {e}")
|
||||
print("Using fallback rate: 0.85")
|
||||
self.eur_to_gbp_rate = 0.85
|
||||
|
||||
def fetch_all(self):
|
||||
try:
|
||||
processed_count = 0
|
||||
updated_count = 0
|
||||
self.product_sources = self.workbook_container.get_sourcing_entries()
|
||||
for index_product_source in range(len(self.product_sources)):
|
||||
product_source = self.product_sources.loc[index_product_source]
|
||||
print(f'Product source: {product_source}')
|
||||
index_row = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_INDEX_ROW]
|
||||
source_name = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_SOURCE_NAME]
|
||||
source_link = product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_LINK]
|
||||
index_domain = None
|
||||
try:
|
||||
index_domain = self.get_index_domain_from_name(source_name)
|
||||
except:
|
||||
continue
|
||||
domain_details = self.domain_details[source_name]
|
||||
self.workbook_container.clear_row_sourcing_sheet(index_row = index_row)
|
||||
processed_count += 1
|
||||
Cost_Fetcher.log_processing_new_row(
|
||||
index_row = index_row
|
||||
, source_link = source_link
|
||||
)
|
||||
|
||||
cost = None
|
||||
price = None
|
||||
active = None
|
||||
if source_name == self.NAME_DOMAIN_CARD_MARKET:
|
||||
while (time.time() - domain_details[self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
if product_source[TCG_Sole_Trader_Workbook_Container.NAME_COLUMN_PRODUCT_IS_BOOSTER]:
|
||||
price_quantity_pairs = self.product_scrapers[index_domain].scrape_prices_and_quantities_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
|
||||
if price_quantity_pairs:
|
||||
active = True
|
||||
max_quantity = 0
|
||||
updated_row_price = False
|
||||
for price_quantity_pair in price_quantity_pairs:
|
||||
eur_price = price_quantity_pair['price']
|
||||
quantity = price_quantity_pair['quantity']
|
||||
print(f" Found price: €{eur_price}")
|
||||
print(f" Found quantity: {quantity}")
|
||||
max_quantity = max(max_quantity, quantity)
|
||||
if quantity >= 8:
|
||||
if eur_price:
|
||||
price = eur_price * self.eur_to_gbp_rate
|
||||
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
|
||||
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_price_sourcing).value = gbp_price
|
||||
updated_count += 1
|
||||
updated_row_price = True
|
||||
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
|
||||
break
|
||||
else:
|
||||
print(f" Error: Could not parse price")
|
||||
if not updated_row_price:
|
||||
print("Offer with quantity >= 8 not found")
|
||||
for price_quantity_pair in price_quantity_pairs:
|
||||
eur_price = price_quantity_pair['price']
|
||||
quantity = price_quantity_pair['quantity']
|
||||
print(f" Found price: €{eur_price}")
|
||||
print(f" Found quantity: {quantity}")
|
||||
if max_quantity <= 2 or quantity == max_quantity:
|
||||
if eur_price:
|
||||
price = eur_price * self.eur_to_gbp_rate
|
||||
print(f" Converted: €{eur_price:.2f} → £{price:.2f}")
|
||||
# self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value = gbp_price
|
||||
updated_count += 1
|
||||
updated_row_price = True
|
||||
# print(f"output row: {index_row}, value: {self.workbook_container.sourcing_sheet.cell(index_row, self.workbook_container.index_column_unit_cost_sourcing).value}")
|
||||
break
|
||||
else:
|
||||
print(f" Error: Could not parse price")
|
||||
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_cardmarket(url = source_link, eur_to_gbp_rate = self.eur_to_gbp_rate)
|
||||
elif source_name == self.NAME_DOMAIN_CHAOS_CARDS:
|
||||
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(30, 40)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_chaoscards(url = source_link)
|
||||
elif source_name == self.NAME_DOMAIN_GAMES_LORE:
|
||||
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_gameslore(url = source_link)
|
||||
elif source_name == self.NAME_DOMAIN_MAGIC_MADHOUSE:
|
||||
while (time.time() - self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] < random.uniform(10, 20)):
|
||||
time.sleep(random.uniform(3, 5))
|
||||
cost, active = self.product_scrapers[index_domain].scrape_cost_and_active_playwright_magicmadhouse(url = source_link)
|
||||
|
||||
self.domain_details[source_name][self.ACCESSED_LAST_ON_FLAG] = time.time()
|
||||
|
||||
if ((cost is not None or price is not None) and active is not None):
|
||||
print(f" Found cost: {cost}, price: {price}, active: {active}")
|
||||
self.workbook_container.update_row_sourcing_sheet(index_row = index_row, unit_cost = cost, unit_price = price, active = active)
|
||||
updated_count += 1
|
||||
else:
|
||||
print(f" Error: Could not find cost on page")
|
||||
|
||||
self.workbook_container.save_workbook()
|
||||
"""
|
||||
for index_domain in range(len(self.domain_names)):
|
||||
self.product_scrapers[index_domain].stop_browser()
|
||||
"""
|
||||
print(f"\nComplete!")
|
||||
print(f"Processed: {processed_count} entries")
|
||||
print(f"Updated: {updated_count} costs")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
@staticmethod
|
||||
def log_processing_new_row(index_row, source_link):
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Processing row {index_row}: {source_link}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
def main():
|
||||
cost_fetcher = Cost_Fetcher()
|
||||
cost_fetcher.fetch_all()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user