# Files
# comp/zaubacorp_parallel_scraper.py
# 2025-08-18 23:16:46 +05:30
#
# 443 lines
# 18 KiB
# Python
import asyncio
import aiohttp
import aiofiles
import pandas as pd
import json
import time
import logging
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import List, Dict, Optional
import random
class ZaubaCorpParallelScraper:
    """Asynchronously scrape company listings (and optionally per-company
    detail pages) from zaubacorp.com.

    Pages are processed in batches of concurrent requests; each batch is
    persisted as JSON + CSV, and a consolidated dump plus run statistics
    are written at the end.
    """

    def __init__(self, max_workers=10, output_dir="zaubacorp_parallel_data"):
        """Initialize the scraper.

        Args:
            max_workers: Upper bound on concurrently processed batches and
                per-host connections.
            output_dir: Directory for batch files, final dumps and the log.
        """
        self.base_url = "https://www.zaubacorp.com"
        self.companies_list_base = "https://www.zaubacorp.com/companies-list"
        self.max_workers = max_workers
        self.output_dir = output_dir
        self.scraped_companies = []
        self.failed_pages = []
        # Guards the shared stats/collections. No await happens while the
        # lock is held, so a threading.Lock is safe from coroutine code here.
        self.lock = threading.Lock()
        # Create output directory
        os.makedirs(self.output_dir, exist_ok=True)
        # Setup logging
        self.setup_logging()
        # Statistics
        self.stats = {
            'total_pages': 90769,  # Based on your observation
            'pages_processed': 0,
            'companies_found': 0,
            'companies_detailed': 0,
            'failed_pages': 0,
            'start_time': None,
            'end_time': None
        }
        # User agents for rotation
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0'
        ]

    def setup_logging(self):
        """Setup logging configuration (file inside output_dir + console)."""
        log_file = os.path.join(self.output_dir, 'parallel_scraper.log')
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def get_random_headers(self):
        """Get random headers to avoid detection.

        Returns:
            dict: Browser-like request headers with a randomly chosen
            User-Agent from ``self.user_agents``.
        """
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    async def fetch_page(self, session: aiohttp.ClientSession, url: str, retries: int = 3) -> Optional[str]:
        """Fetch a single page with retry logic.

        Retries with exponential backoff on HTTP 429 and with a short
        random delay after exceptions. Returns the body text on HTTP 200,
        otherwise None after all attempts are exhausted.
        """
        for attempt in range(retries):
            try:
                headers = self.get_random_headers()
                async with session.get(url, headers=headers,
                                       timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:  # Rate limited
                        wait_time = (2 ** attempt) + random.uniform(0, 1)
                        await asyncio.sleep(wait_time)
                        continue
                    else:
                        self.logger.warning(f"HTTP {response.status} for {url}")
            except Exception as e:
                self.logger.error(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(random.uniform(1, 3))
        return None

    def parse_companies_list_page(self, html: str, page_num: int) -> List[Dict]:
        """Parse companies from a list page.

        Scans every table row; rows with at least five cells are assumed to
        be (CIN, Name, Status, Paid-up capital, Address). Rows missing a
        name or CIN are skipped. Malformed rows are logged and ignored.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            companies = []
            # Find all table rows with company data
            rows = soup.find_all('tr')
            for row in rows:
                cells = row.find_all('td')
                if len(cells) >= 5:  # Should have CIN, Name, Status, Capital, Address
                    try:
                        # Extract CIN and company URL
                        cin_cell = cells[0]
                        cin_link = cin_cell.find('a')
                        cin = cin_link.text.strip() if cin_link else cin_cell.text.strip()
                        company_url = cin_link.get('href') if cin_link else ''
                        # Extract company name
                        name_cell = cells[1]
                        name_link = name_cell.find('a')
                        company_name = name_link.text.strip() if name_link else name_cell.text.strip()
                        # Extract status
                        status = cells[2].text.strip()
                        # Extract paid up capital
                        paid_up_capital = cells[3].text.strip()
                        # Extract address
                        address = cells[4].text.strip()
                        if company_name and cin:  # Only add if we have essential data
                            company_data = {
                                'cin': cin,
                                'company_name': company_name,
                                'status': status,
                                'paid_up_capital': paid_up_capital,
                                'address': address,
                                'company_url': urljoin(self.base_url, company_url) if company_url else '',
                                'page_number': page_num,
                                'scraped_at': datetime.now().isoformat()
                            }
                            companies.append(company_data)
                    except Exception as e:
                        self.logger.warning(f"Error parsing row on page {page_num}: {e}")
                        continue
            return companies
        except Exception as e:
            self.logger.error(f"Error parsing page {page_num}: {e}")
            return []

    async def scrape_companies_list_page(self, session: aiohttp.ClientSession, page_num: int) -> List[Dict]:
        """Scrape a single companies list page.

        Updates shared statistics under the lock; on failure records the
        page number in ``self.failed_pages`` and returns an empty list.
        """
        url = f"{self.companies_list_base}/p-{page_num}-company.html"
        html = await self.fetch_page(session, url)
        if html:
            companies = self.parse_companies_list_page(html, page_num)
            with self.lock:
                self.stats['pages_processed'] += 1
                self.stats['companies_found'] += len(companies)
            self.logger.info(f"Page {page_num}: Found {len(companies)} companies")
            return companies
        else:
            with self.lock:
                self.failed_pages.append(page_num)
                self.stats['failed_pages'] += 1
            self.logger.error(f"Failed to fetch page {page_num}")
            return []

    async def scrape_company_details(self, session: aiohttp.ClientSession, company: Dict) -> Dict:
        """Scrape detailed information from a company profile page.

        Mutates and returns ``company``, adding any of the known key/value
        fields found in the profile tables. Returns the dict unchanged if
        there is no URL or the fetch fails.
        """
        if not company.get('company_url'):
            return company
        try:
            html = await self.fetch_page(session, company['company_url'])
            if html:
                soup = BeautifulSoup(html, 'html.parser')
                # Extract additional details from company page
                # This is a basic extraction - can be enhanced based on page structure
                # Look for tables with company details
                tables = soup.find_all('table')
                for table in tables:
                    rows = table.find_all('tr')
                    for row in rows:
                        cells = row.find_all(['td', 'th'])
                        if len(cells) >= 2:
                            key = cells[0].get_text().strip().lower()
                            value = cells[1].get_text().strip()
                            # Map common fields
                            if 'registration number' in key:
                                company['registration_number'] = value
                            elif 'authorized capital' in key:
                                company['authorized_capital'] = value
                            elif 'company category' in key:
                                company['company_category'] = value
                            elif 'class of company' in key:
                                company['class_of_company'] = value
                            elif 'roc' in key:
                                company['roc'] = value
                            elif 'registration date' in key or 'incorporation' in key:
                                company['registration_date'] = value
                            elif 'email' in key:
                                company['email'] = value
                            elif 'phone' in key or 'mobile' in key:
                                company['phone'] = value
                with self.lock:
                    self.stats['companies_detailed'] += 1
            return company
        except Exception as e:
            self.logger.error(f"Error scraping company details for {company.get('company_name', 'Unknown')}: {e}")
            return company

    async def save_batch_data(self, companies: List[Dict], batch_num: int):
        """Save a batch of companies data as both JSON and CSV."""
        if not companies:
            return
        try:
            # Save as JSON
            json_file = os.path.join(self.output_dir, f'companies_batch_{batch_num}.json')
            async with aiofiles.open(json_file, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(companies, indent=2, ensure_ascii=False))
            # Save as CSV
            csv_file = os.path.join(self.output_dir, f'companies_batch_{batch_num}.csv')
            df = pd.DataFrame(companies)
            df.to_csv(csv_file, index=False, encoding='utf-8')
            self.logger.info(f"Saved batch {batch_num} with {len(companies)} companies")
        except Exception as e:
            self.logger.error(f"Error saving batch {batch_num}: {e}")

    async def scrape_pages_batch(self, session: aiohttp.ClientSession, page_numbers: List[int], batch_num: int, scrape_details: bool = False):
        """Scrape a batch of pages, persist the batch, and return its companies."""
        all_companies = []
        # Create semaphore to limit concurrent requests per batch
        semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests per batch

        async def scrape_single_page(page_num):
            # One list page, optionally followed by its companies' detail pages.
            async with semaphore:
                companies = await self.scrape_companies_list_page(session, page_num)
                if scrape_details and companies:
                    # Scrape detailed information for each company
                    detailed_companies = []
                    for company in companies:
                        detailed_company = await self.scrape_company_details(session, company)
                        detailed_companies.append(detailed_company)
                        # Small delay between detail requests
                        await asyncio.sleep(random.uniform(0.1, 0.3))
                    return detailed_companies
                return companies

        # Process all pages in this batch
        tasks = [scrape_single_page(page_num) for page_num in page_numbers]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results
        for result in results:
            if isinstance(result, list):
                all_companies.extend(result)
            elif isinstance(result, Exception):
                self.logger.error(f"Batch {batch_num} task failed: {result}")
        # Save batch data
        if all_companies:
            await self.save_batch_data(all_companies, batch_num)
            # Also add to main collection
            with self.lock:
                self.scraped_companies.extend(all_companies)
        return all_companies

    async def scrape_all_companies(self, start_page: int = 1, end_page: int = None, batch_size: int = 100, scrape_details: bool = False):
        """Main method to scrape all companies using parallel processing.

        Splits [start_page, end_page] into batches of ``batch_size`` pages,
        runs up to ``max_workers`` batches concurrently, then writes the
        consolidated results and prints statistics.
        """
        self.stats['start_time'] = datetime.now()
        end_page = end_page or self.stats['total_pages']
        self.logger.info(f"Starting parallel scraping from page {start_page} to {end_page}")
        self.logger.info(f"Batch size: {batch_size}, Max workers: {self.max_workers}")
        self.logger.info(f"Scrape details: {scrape_details}")
        # Create page batches
        page_ranges = []
        for i in range(start_page, end_page + 1, batch_size):
            batch_end = min(i + batch_size - 1, end_page)
            page_ranges.append(list(range(i, batch_end + 1)))
        self.logger.info(f"Created {len(page_ranges)} batches")
        # Configure connection limits
        connector = aiohttp.TCPConnector(
            limit=self.max_workers * 2,
            limit_per_host=self.max_workers,
            ttl_dns_cache=300,
            use_dns_cache=True,
        )
        timeout = aiohttp.ClientTimeout(total=60, connect=30)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            # Process batches with limited concurrency
            semaphore = asyncio.Semaphore(self.max_workers)

            async def process_batch(batch_pages, batch_num):
                async with semaphore:
                    return await self.scrape_pages_batch(session, batch_pages, batch_num, scrape_details)

            # Create tasks for all batches
            tasks = [
                process_batch(batch_pages, i)
                for i, batch_pages in enumerate(page_ranges, 1)
            ]
            # Process batches
            self.logger.info("Starting batch processing...")
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Log results
            successful_batches = sum(1 for r in results if isinstance(r, list))
            failed_batches = len(results) - successful_batches
            self.logger.info(f"Completed: {successful_batches} successful, {failed_batches} failed batches")
        self.stats['end_time'] = datetime.now()
        await self.save_final_results()
        self.print_final_stats()

    async def save_final_results(self):
        """Save final consolidated results: full JSON/CSV dumps, run
        statistics, and the list of failed pages (if any).

        Statistics are written even when no companies were scraped, so an
        interrupted or failed run still leaves a record.
        """
        try:
            if self.scraped_companies:
                # Save consolidated JSON
                json_file = os.path.join(self.output_dir, 'all_companies.json')
                async with aiofiles.open(json_file, 'w', encoding='utf-8') as f:
                    await f.write(json.dumps(self.scraped_companies, indent=2, ensure_ascii=False))
                # Save consolidated CSV
                csv_file = os.path.join(self.output_dir, 'all_companies.csv')
                df = pd.DataFrame(self.scraped_companies)
                df.to_csv(csv_file, index=False, encoding='utf-8')
                self.logger.info(f"Saved {len(self.scraped_companies)} companies to final files")
            # Save statistics (always — datetimes serialized as ISO strings)
            stats_file = os.path.join(self.output_dir, 'scraping_statistics.json')
            async with aiofiles.open(stats_file, 'w', encoding='utf-8') as f:
                stats_copy = self.stats.copy()
                if stats_copy['start_time']:
                    stats_copy['start_time'] = stats_copy['start_time'].isoformat()
                if stats_copy['end_time']:
                    stats_copy['end_time'] = stats_copy['end_time'].isoformat()
                await f.write(json.dumps(stats_copy, indent=2))
            # Save failed pages
            if self.failed_pages:
                failed_file = os.path.join(self.output_dir, 'failed_pages.json')
                async with aiofiles.open(failed_file, 'w', encoding='utf-8') as f:
                    await f.write(json.dumps(self.failed_pages, indent=2))
        except Exception as e:
            self.logger.error(f"Error saving final results: {e}")

    def print_final_stats(self):
        """Print final statistics.

        Rates are guarded against zero denominators so an empty or
        instantaneous run does not raise ZeroDivisionError.
        """
        duration = self.stats['end_time'] - self.stats['start_time']
        seconds = duration.total_seconds()
        attempted = self.stats['pages_processed'] + self.stats['failed_pages']
        success_rate = (self.stats['pages_processed'] / attempted * 100) if attempted else 0.0
        pages_per_second = self.stats['pages_processed'] / seconds if seconds else 0.0
        companies_per_minute = self.stats['companies_found'] / (seconds / 60) if seconds else 0.0
        print("\n" + "=" * 80)
        print("ZAUBACORP PARALLEL SCRAPING COMPLETED")
        print("=" * 80)
        print(f"Total pages processed: {self.stats['pages_processed']:,}")
        print(f"Total companies found: {self.stats['companies_found']:,}")
        print(f"Companies with details: {self.stats['companies_detailed']:,}")
        print(f"Failed pages: {self.stats['failed_pages']:,}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Duration: {duration}")
        print(f"Average speed: {pages_per_second:.2f} pages/second")
        print(f"Companies per minute: {companies_per_minute:.0f}")
        print(f"Output directory: {self.output_dir}")
        print("=" * 80)
def main():
    """Main function to run the parallel scraper.

    Configures the scraper from the module-level constants below, runs it,
    and on Ctrl-C attempts to persist whatever was collected so far.
    """
    print("ZaubaCorp Parallel Scraper")
    print("=" * 50)
    # Configuration
    MAX_WORKERS = 20  # Adjust based on your system and network
    BATCH_SIZE = 200  # Pages per batch
    START_PAGE = 1
    END_PAGE = 1000  # Set to None for all pages (90,769)
    SCRAPE_DETAILS = False  # Set to True to scrape company detail pages (much slower)
    OUTPUT_DIR = "zaubacorp_parallel_data"
    print("Configuration:")
    print(f"  Max workers: {MAX_WORKERS}")
    print(f"  Batch size: {BATCH_SIZE}")
    print(f"  Page range: {START_PAGE} to {END_PAGE or 90769}")
    print(f"  Scrape details: {SCRAPE_DETAILS}")
    print(f"  Output directory: {OUTPUT_DIR}")
    # Create scraper
    scraper = ZaubaCorpParallelScraper(
        max_workers=MAX_WORKERS,
        output_dir=OUTPUT_DIR
    )
    # Run scraper
    try:
        asyncio.run(scraper.scrape_all_companies(
            start_page=START_PAGE,
            end_page=END_PAGE,
            batch_size=BATCH_SIZE,
            scrape_details=SCRAPE_DETAILS
        ))
    except KeyboardInterrupt:
        # Best effort: flush partial results before exiting.
        print("\nScraping interrupted by user")
        asyncio.run(scraper.save_final_results())
    except Exception as e:
        print(f"Unexpected error: {e}")
        logging.error(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()