Files
tcg_web_scraper/pkm_card_fetcher.py
2026-01-09 20:53:52 +00:00

277 lines
9.4 KiB
Python

import json
import os
import pandas as pd
from datetime import datetime
import glob
def flatten_list(items):
    """Render *items* as one string suitable for a single spreadsheet cell.

    * ``None`` becomes the empty string.
    * A ``list`` becomes its elements, each converted with ``str`` and
      joined by ``", "``.
    * Anything else is returned as ``str(items)``.
    """
    if isinstance(items, list):
        return ", ".join(map(str, items))
    return "" if items is None else str(items)
def extract_ability_info(abilities):
    """Collapse a card's abilities into three comma-separated strings.

    Returns a ``(names, texts, types)`` tuple. Each element lists the
    ``'name'``, ``'text'`` or ``'type'`` entry of every ability, in order,
    using ``''`` for a key that is absent. A falsy *abilities* value
    (``None`` or empty) produces three empty strings.
    """
    if not abilities:
        return "", "", ""

    # One pass over the abilities, then transpose the rows into per-field columns.
    rows = [
        (entry.get('name', ''), entry.get('text', ''), entry.get('type', ''))
        for entry in abilities
    ]
    names, texts, types = (flatten_list(list(column)) for column in zip(*rows))
    return names, texts, types
def extract_attack_info(attacks):
    """Collapse a card's attacks into five delimited strings.

    Returns ``(names, costs, damages, texts, converted_costs)``. Names,
    damages, texts and converted costs are comma-separated across attacks.
    Costs are different: each attack's own cost list is comma-separated, and
    the per-attack groups are then joined with ``" | "`` so they stay
    distinguishable. A falsy *attacks* value yields five empty strings.
    """
    if not attacks:
        return "", "", "", "", ""

    # Build one tuple per attack, then transpose into one column per field.
    rows = [
        (
            attack.get('name', ''),
            flatten_list(attack.get('cost', [])),
            attack.get('damage', ''),
            attack.get('text', ''),
            str(attack.get('convertedEnergyCost', '')),
        )
        for attack in attacks
    ]
    names, costs, damages, texts, converted_costs = (
        list(column) for column in zip(*rows)
    )

    return (
        flatten_list(names),
        " | ".join(costs),  # "|" separates the cost groups of different attacks
        flatten_list(damages),
        flatten_list(texts),
        flatten_list(converted_costs),
    )
def extract_weakness_resistance(items):
    """Collapse weakness or resistance entries into two strings.

    Returns ``(types, values)`` where each element is the comma-separated
    list of the ``'type'`` / ``'value'`` entry of every item (``''`` when the
    key is absent). A falsy *items* value yields two empty strings.
    """
    if not items:
        return "", ""

    pairs = [(entry.get('type', ''), entry.get('value', '')) for entry in items]
    return (
        flatten_list([kind for kind, _ in pairs]),
        flatten_list([amount for _, amount in pairs]),
    )
def extract_prices(price_dict, prefix):
    """Flatten a price mapping into ``{column_name: value}`` pairs.

    For every ``price_type`` in *price_dict*:

    * if its value is itself a dict, each ``metric`` inside it becomes a
      column named ``"{prefix}_{price_type}_{metric}"``;
    * otherwise the value is stored directly under
      ``"{prefix}_{price_type}"``.

    A falsy *price_dict* (``None`` or empty) returns an empty dict.
    """
    flattened = {}
    if not price_dict:
        return flattened

    for price_type, prices in price_dict.items():
        column_base = f"{prefix}_{price_type}"
        if not isinstance(prices, dict):
            # Direct (non-nested) price value.
            flattened[column_base] = prices
            continue
        flattened.update(
            (f"{column_base}_{metric}", value) for metric, value in prices.items()
        )
    return flattened
def process_card(card):
    """Flatten one card record into a single-level dict (one spreadsheet row).

    Args:
        card: dict describing a single card. Scalar fields are copied as-is,
            list fields are rendered as comma-separated strings, and nested
            objects (``ancientTrait``, ``set``, ``legalities``, ``images``,
            ``tcgplayer``, ``cardmarket``) are expanded into prefixed columns.

    Returns:
        dict mapping column name to value. Apart from the price columns,
        whose names are derived from whatever price data the card carries,
        every column is always present: a nested object that is missing,
        empty or explicitly ``null`` contributes empty-string columns.
        This keeps the row schema (and therefore the column order of a
        DataFrame built from the rows) independent of which card comes first.
    """
    row = {
        'id': card.get('id', ''),
        'name': card.get('name', ''),
        'supertype': card.get('supertype', ''),
        'subtypes': flatten_list(card.get('subtypes', [])),
        'level': card.get('level', ''),
        'hp': card.get('hp', ''),
        'types': flatten_list(card.get('types', [])),
        'evolvesFrom': card.get('evolvesFrom', ''),
        'evolvesTo': flatten_list(card.get('evolvesTo', [])),
        'rules': flatten_list(card.get('rules', [])),
        'number': card.get('number', ''),
        'artist': card.get('artist', ''),
        'rarity': card.get('rarity', ''),
        'flavorText': card.get('flavorText', ''),
        'nationalPokedexNumbers': flatten_list(card.get('nationalPokedexNumbers', [])),
        'regulationMark': card.get('regulationMark', ''),
        'retreatCost': flatten_list(card.get('retreatCost', [])),
        'convertedRetreatCost': card.get('convertedRetreatCost', ''),
    }

    # NOTE: nested objects are read with `card.get(key) or {}` rather than
    # `card.get(key, {})`: the default only applies when the key is absent,
    # so a key present with a null value would otherwise give None and
    # crash on the following `.get`.

    # Ancient Trait
    ancient_trait = card.get('ancientTrait') or {}
    row['ancientTrait_name'] = ancient_trait.get('name', '')
    row['ancientTrait_text'] = ancient_trait.get('text', '')

    # Abilities
    (row['ability_names'],
     row['ability_texts'],
     row['ability_types']) = extract_ability_info(card.get('abilities', []))

    # Attacks
    (row['attack_names'],
     row['attack_costs'],
     row['attack_damages'],
     row['attack_texts'],
     row['attack_convertedCosts']) = extract_attack_info(card.get('attacks', []))

    # Weaknesses
    row['weakness_types'], row['weakness_values'] = extract_weakness_resistance(
        card.get('weaknesses', []))

    # Resistances
    row['resistance_types'], row['resistance_values'] = extract_weakness_resistance(
        card.get('resistances', []))

    # Set information (columns are emitted even when the card has no set data)
    set_info = card.get('set') or {}
    for field in ('id', 'name', 'series', 'printedTotal', 'total',
                  'ptcgoCode', 'releaseDate'):
        row[f'set_{field}'] = set_info.get(field, '')

    # Legalities
    legalities = card.get('legalities') or {}
    row['legal_standard'] = legalities.get('standard', '')
    row['legal_expanded'] = legalities.get('expanded', '')
    row['legal_unlimited'] = legalities.get('unlimited', '')

    # Images
    images = card.get('images') or {}
    row['image_small'] = images.get('small', '')
    row['image_large'] = images.get('large', '')

    # Marketplace data: TCGPlayer first, then Cardmarket (column order matters).
    for market in ('tcgplayer', 'cardmarket'):
        market_info = card.get(market) or {}
        row[f'{market}_url'] = market_info.get('url', '')
        row[f'{market}_updatedAt'] = market_info.get('updatedAt', '')
        # extract_prices returns {} for a missing/empty price mapping.
        row.update(extract_prices(market_info.get('prices', {}), market))

    return row
def _iter_json_documents(text):
    """Yield every top-level JSON document contained in *text*.

    The text is first parsed as one JSON document, which covers both
    single-line and pretty-printed multi-line files. If that fails, the text
    is treated as newline-delimited JSON and each non-blank line is parsed
    on its own.

    Raises:
        ValueError: a line of newline-delimited input is not valid JSON; the
            message names the offending line number.
    """
    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        yield whole
        return

    # Split on "\n" only: str.splitlines() would also split on characters
    # such as U+2028 that may legally appear inside a JSON string.
    for line_number, raw_line in enumerate(text.split('\n'), start=1):
        line = raw_line.strip()
        if not line:
            continue
        try:
            document = json.loads(line)
        except json.JSONDecodeError as err:
            raise ValueError(f"invalid JSON on line {line_number}: {err}") from err
        yield document


def main():
    """Convert every card JSON file in the data directory into one Excel sheet.

    Reads all ``*.json`` files under ``./pkm_data/cards/en`` (in sorted path
    order), flattens each card with ``process_card`` and writes the rows to
    ``pkm_cards_<timestamp>.xlsx`` in the current directory with a formatted
    header row and capped column widths. A file that cannot be read or
    parsed is reported and skipped; cards collected from it before the
    failure are kept. Returns ``None``.
    """
    # Directory containing the JSON files
    data_dir = './pkm_data/cards/en'

    # Sorted so the row order does not depend on filesystem listing order.
    json_files = sorted(glob.glob(os.path.join(data_dir, '*.json')))
    if not json_files:
        print(f"No JSON files found in {data_dir}")
        return

    all_cards = []

    # Process each JSON file
    for json_file in json_files:
        print(f"Processing {os.path.basename(json_file)}...")
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                text = f.read()
            for document in _iter_json_documents(text):
                if isinstance(document, dict):
                    # A single card object
                    all_cards.append(process_card(document))
                elif isinstance(document, list):
                    # A list of cards
                    for card in document:
                        all_cards.append(process_card(card))
        except Exception as e:
            # Top-level boundary: report the problem and move on to the next file.
            print(f"Error processing {json_file}: {str(e)}")

    # Create DataFrame
    df = pd.DataFrame(all_cards)

    # NOTE: rows stay in processing order (files in sorted path order, cards
    # in file order). Sorting by set name and numeric card number is not
    # applied. TODO: reinstate once a reliable numeric key for 'number' exists.

    # Save to Excel
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_file = f'pkm_cards_{timestamp}.xlsx'

    # Create Excel writer with xlsxwriter engine
    with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Pokemon Cards', index=False)

        # Get the workbook and worksheet
        workbook = writer.book
        worksheet = writer.sheets['Pokemon Cards']

        # Add some formatting
        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'top',
            'fg_color': '#D7E4BD',
            'border': 1,
        })

        # Re-write the header row so it carries the custom format
        for col_num, value in enumerate(df.columns.values):
            worksheet.write(0, col_num, value, header_format)

        # Auto-adjust column widths
        for i, col in enumerate(df.columns):
            # Longest rendered value in the column, or the header if longer
            max_len = df[col].astype(str).str.len().max()
            max_len = max(max_len, len(col)) + 2
            # Cap column width at 50
            max_len = min(max_len, 50)
            worksheet.set_column(i, i, max_len)

    print(f"\nSuccessfully created {output_file}")
    print(f"Total cards processed: {len(df)}")
    print("\nColumns in the spreadsheet:")
    for col in df.columns:
        print(f" - {col}")
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()