# ZaubaCorp parallel scraper (~443 lines, ~18 KiB, Python)
import asyncio
|
|
import aiohttp
|
|
import aiofiles
|
|
import pandas as pd
|
|
import json
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urljoin
|
|
import os
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import threading
|
|
from typing import List, Dict, Optional
|
|
import random
|
|
|
|
class ZaubaCorpParallelScraper:
|
|
def __init__(self, max_workers=10, output_dir="zaubacorp_parallel_data"):
|
|
self.base_url = "https://www.zaubacorp.com"
|
|
self.companies_list_base = "https://www.zaubacorp.com/companies-list"
|
|
self.max_workers = max_workers
|
|
self.output_dir = output_dir
|
|
self.scraped_companies = []
|
|
self.failed_pages = []
|
|
self.lock = threading.Lock()
|
|
|
|
# Create output directory
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
|
|
# Setup logging
|
|
self.setup_logging()
|
|
|
|
# Statistics
|
|
self.stats = {
|
|
'total_pages': 90769, # Based on your observation
|
|
'pages_processed': 0,
|
|
'companies_found': 0,
|
|
'companies_detailed': 0,
|
|
'failed_pages': 0,
|
|
'start_time': None,
|
|
'end_time': None
|
|
}
|
|
|
|
# User agents for rotation
|
|
self.user_agents = [
|
|
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
|
|
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0'
|
|
]
|
|
|
|
def setup_logging(self):
|
|
"""Setup logging configuration"""
|
|
log_file = os.path.join(self.output_dir, 'parallel_scraper.log')
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(log_file),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
def get_random_headers(self):
|
|
"""Get random headers to avoid detection"""
|
|
return {
|
|
'User-Agent': random.choice(self.user_agents),
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
|
'Accept-Language': 'en-US,en;q=0.5',
|
|
'Accept-Encoding': 'gzip, deflate',
|
|
'Connection': 'keep-alive',
|
|
'Upgrade-Insecure-Requests': '1',
|
|
}
|
|
|
|
async def fetch_page(self, session: aiohttp.ClientSession, url: str, retries: int = 3) -> Optional[str]:
    """Fetch a single page with retry logic.

    Returns the response body text on HTTP 200, or None once all
    attempts are exhausted.  HTTP 429 triggers exponential backoff with
    jitter before the next attempt; exceptions (timeouts, connection
    errors) sleep 1-3 s between attempts.
    """
    for attempt in range(retries):
        try:
            headers = self.get_random_headers()
            # Per-request 30 s cap, independent of the session-level timeout.
            async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status == 429:  # Rate limited
                    # Exponential backoff: 1-2 s, 2-3 s, 4-5 s, ...
                    wait_time = (2 ** attempt) + random.uniform(0, 1)
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    # NOTE(review): other statuses (e.g. 404) are retried
                    # immediately with no delay — consider bailing out on 4xx.
                    self.logger.warning(f"HTTP {response.status} for {url}")

        except Exception as e:
            self.logger.error(f"Attempt {attempt + 1} failed for {url}: {e}")
            if attempt < retries - 1:
                await asyncio.sleep(random.uniform(1, 3))

    # All attempts failed (or ended on a non-200 status).
    return None
|
|
|
|
def parse_companies_list_page(self, html: str, page_num: int) -> List[Dict]:
    """Extract company records from one companies-list page.

    Every table row with at least five cells (CIN, name, status,
    paid-up capital, address) yields one dict; rows missing the CIN or
    name are skipped.  Returns an empty list if the page cannot be
    parsed at all.
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')
        results: List[Dict] = []

        for row in soup.find_all('tr'):
            cells = row.find_all('td')
            # Expect at least: CIN, Name, Status, Capital, Address.
            if len(cells) < 5:
                continue
            try:
                # CIN cell may carry a link to the company profile page.
                cin_anchor = cells[0].find('a')
                cin = (cin_anchor.text if cin_anchor else cells[0].text).strip()
                href = cin_anchor.get('href') if cin_anchor else ''

                name_anchor = cells[1].find('a')
                name = (name_anchor.text if name_anchor else cells[1].text).strip()

                # Only keep rows that have both essential identifiers.
                if not (name and cin):
                    continue

                results.append({
                    'cin': cin,
                    'company_name': name,
                    'status': cells[2].text.strip(),
                    'paid_up_capital': cells[3].text.strip(),
                    'address': cells[4].text.strip(),
                    'company_url': urljoin(self.base_url, href) if href else '',
                    'page_number': page_num,
                    'scraped_at': datetime.now().isoformat(),
                })

            except Exception as e:
                self.logger.warning(f"Error parsing row on page {page_num}: {e}")
                continue

        return results

    except Exception as e:
        self.logger.error(f"Error parsing page {page_num}: {e}")
        return []
|
|
|
|
async def scrape_companies_list_page(self, session: aiohttp.ClientSession, page_num: int) -> List[Dict]:
    """Fetch and parse one paginated companies-list page, updating shared stats."""
    url = f"{self.companies_list_base}/p-{page_num}-company.html"

    html = await self.fetch_page(session, url)
    if not html:
        # Record the failure so it can be reported/retried later.
        with self.lock:
            self.failed_pages.append(page_num)
            self.stats['failed_pages'] += 1
        self.logger.error(f"Failed to fetch page {page_num}")
        return []

    companies = self.parse_companies_list_page(html, page_num)

    with self.lock:
        self.stats['pages_processed'] += 1
        self.stats['companies_found'] += len(companies)

    self.logger.info(f"Page {page_num}: Found {len(companies)} companies")
    return companies
|
|
|
|
async def scrape_company_details(self, session: aiohttp.ClientSession, company: Dict) -> Dict:
    """Scrape detailed information from a company profile page.

    Enriches ``company`` in place with fields parsed from key/value
    tables on its profile page and returns it.  On missing URL or any
    error the dict is returned as-is (possibly partially enriched).
    """
    if not company.get('company_url'):
        # Nothing to fetch without a profile URL.
        return company

    try:
        html = await self.fetch_page(session, company['company_url'])
        if html:
            soup = BeautifulSoup(html, 'html.parser')

            # Extract additional details from company page
            # This is a basic extraction - can be enhanced based on page structure

            # Look for tables with company details
            tables = soup.find_all('table')
            for table in tables:
                rows = table.find_all('tr')
                for row in rows:
                    cells = row.find_all(['td', 'th'])
                    if len(cells) >= 2:
                        # First cell is treated as the label, second as the value.
                        key = cells[0].get_text().strip().lower()
                        value = cells[1].get_text().strip()

                        # Map common fields by substring match on the label.
                        # NOTE(review): matching is loose — any label merely
                        # containing 'roc' hits that branch; order of the
                        # elif chain matters ('registration number' must be
                        # tested before 'registration date').
                        if 'registration number' in key:
                            company['registration_number'] = value
                        elif 'authorized capital' in key:
                            company['authorized_capital'] = value
                        elif 'company category' in key:
                            company['company_category'] = value
                        elif 'class of company' in key:
                            company['class_of_company'] = value
                        elif 'roc' in key:
                            company['roc'] = value
                        elif 'registration date' in key or 'incorporation' in key:
                            company['registration_date'] = value
                        elif 'email' in key:
                            company['email'] = value
                        elif 'phone' in key or 'mobile' in key:
                            company['phone'] = value

            # Count only companies whose profile page was actually fetched.
            with self.lock:
                self.stats['companies_detailed'] += 1

        return company

    except Exception as e:
        self.logger.error(f"Error scraping company details for {company.get('company_name', 'Unknown')}: {e}")
        return company
|
|
|
|
async def save_batch_data(self, companies: List[Dict], batch_num: int):
    """Persist one batch of company dicts as both a JSON and a CSV file."""
    if not companies:
        return

    try:
        # JSON copy of the batch.
        json_path = os.path.join(self.output_dir, f'companies_batch_{batch_num}.json')
        payload = json.dumps(companies, indent=2, ensure_ascii=False)
        async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
            await f.write(payload)

        # CSV copy of the same batch.
        csv_path = os.path.join(self.output_dir, f'companies_batch_{batch_num}.csv')
        pd.DataFrame(companies).to_csv(csv_path, index=False, encoding='utf-8')

        self.logger.info(f"Saved batch {batch_num} with {len(companies)} companies")

    except Exception as e:
        self.logger.error(f"Error saving batch {batch_num}: {e}")
|
|
|
|
async def scrape_pages_batch(self, session: aiohttp.ClientSession, page_numbers: List[int], batch_num: int, scrape_details: bool = False):
    """Scrape a batch of pages.

    Fetches every page in ``page_numbers`` (at most 5 in flight at a
    time within the batch), optionally enriches each company with
    profile details, saves the batch to disk and appends the results to
    the shared collection.  Returns the list of companies found.
    """
    all_companies = []

    # Create semaphore to limit concurrent requests per batch
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests per batch

    async def scrape_single_page(page_num):
        # One list page under the per-batch concurrency cap.
        async with semaphore:
            companies = await self.scrape_companies_list_page(session, page_num)

            if scrape_details and companies:
                # Scrape detailed information for each company
                detailed_companies = []
                for company in companies:
                    detailed_company = await self.scrape_company_details(session, company)
                    detailed_companies.append(detailed_company)
                    # Small delay between detail requests
                    await asyncio.sleep(random.uniform(0.1, 0.3))
                return detailed_companies

            return companies

    # Process all pages in this batch
    tasks = [scrape_single_page(page_num) for page_num in page_numbers]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Collect results; exceptions are surfaced here instead of raising.
    for result in results:
        if isinstance(result, list):
            all_companies.extend(result)
        elif isinstance(result, Exception):
            self.logger.error(f"Batch {batch_num} task failed: {result}")

    # Save batch data
    if all_companies:
        await self.save_batch_data(all_companies, batch_num)

        # Also add to main collection.
        # NOTE(review): threading.Lock inside a coroutine would block the
        # event loop if contended; here all coroutines run on one loop
        # thread, so acquisitions are effectively uncontended.
        with self.lock:
            self.scraped_companies.extend(all_companies)

    return all_companies
|
|
|
|
async def scrape_all_companies(self, start_page: int = 1, end_page: int = None, batch_size: int = 100, scrape_details: bool = False):
    """Main method to scrape all companies using parallel processing.

    Splits [start_page, end_page] into batches of ``batch_size`` pages,
    runs up to ``max_workers`` batches concurrently over one shared
    aiohttp session, then writes consolidated results and prints stats.

    Args:
        start_page: First list page to fetch (1-based, inclusive).
        end_page: Last page inclusive; None means all known pages.
        batch_size: Number of pages grouped into one saved batch file.
        scrape_details: Also fetch each company's profile page (much slower).
    """
    self.stats['start_time'] = datetime.now()
    end_page = end_page or self.stats['total_pages']

    self.logger.info(f"Starting parallel scraping from page {start_page} to {end_page}")
    self.logger.info(f"Batch size: {batch_size}, Max workers: {self.max_workers}")
    self.logger.info(f"Scrape details: {scrape_details}")

    # Create page batches: consecutive ranges of batch_size page numbers.
    page_ranges = []
    for i in range(start_page, end_page + 1, batch_size):
        batch_end = min(i + batch_size - 1, end_page)
        page_ranges.append(list(range(i, batch_end + 1)))

    self.logger.info(f"Created {len(page_ranges)} batches")

    # Configure connection limits for the shared session.
    connector = aiohttp.TCPConnector(
        limit=self.max_workers * 2,
        limit_per_host=self.max_workers,
        ttl_dns_cache=300,
        use_dns_cache=True,
    )

    timeout = aiohttp.ClientTimeout(total=60, connect=30)

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        # Process batches with limited concurrency
        semaphore = asyncio.Semaphore(self.max_workers)

        async def process_batch(batch_pages, batch_num):
            # One unit of work: an entire batch, gated by the semaphore.
            async with semaphore:
                return await self.scrape_pages_batch(session, batch_pages, batch_num, scrape_details)

        # Create tasks for all batches (batch numbers are 1-based).
        tasks = [
            process_batch(batch_pages, i)
            for i, batch_pages in enumerate(page_ranges, 1)
        ]

        # Process batches; return_exceptions keeps one failure from
        # cancelling the rest.
        self.logger.info("Starting batch processing...")
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Log results
        successful_batches = sum(1 for r in results if isinstance(r, list))
        failed_batches = len(results) - successful_batches

        self.logger.info(f"Completed: {successful_batches} successful, {failed_batches} failed batches")

    self.stats['end_time'] = datetime.now()
    await self.save_final_results()
    self.print_final_stats()
|
|
|
|
async def save_final_results(self):
    """Save final consolidated results.

    Writes the consolidated companies JSON/CSV (only when any were
    scraped), the run statistics, and the list of failed pages.
    Errors are logged, never raised.
    """
    try:
        if self.scraped_companies:
            # Save consolidated JSON
            json_file = os.path.join(self.output_dir, 'all_companies.json')
            async with aiofiles.open(json_file, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(self.scraped_companies, indent=2, ensure_ascii=False))

            # Save consolidated CSV
            csv_file = os.path.join(self.output_dir, 'all_companies.csv')
            df = pd.DataFrame(self.scraped_companies)
            df.to_csv(csv_file, index=False, encoding='utf-8')

            self.logger.info(f"Saved {len(self.scraped_companies)} companies to final files")

        # Save statistics; datetimes are serialized as ISO-8601 strings.
        # NOTE(review): written even when no companies were scraped —
        # presumably intentional so interrupted runs still leave stats.
        stats_file = os.path.join(self.output_dir, 'scraping_statistics.json')
        async with aiofiles.open(stats_file, 'w', encoding='utf-8') as f:
            stats_copy = self.stats.copy()
            if stats_copy['start_time']:
                stats_copy['start_time'] = stats_copy['start_time'].isoformat()
            if stats_copy['end_time']:
                stats_copy['end_time'] = stats_copy['end_time'].isoformat()
            await f.write(json.dumps(stats_copy, indent=2))

        # Save failed pages
        if self.failed_pages:
            failed_file = os.path.join(self.output_dir, 'failed_pages.json')
            async with aiofiles.open(failed_file, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(self.failed_pages, indent=2))

    except Exception as e:
        self.logger.error(f"Error saving final results: {e}")
|
|
|
|
def print_final_stats(self):
|
|
"""Print final statistics"""
|
|
duration = self.stats['end_time'] - self.stats['start_time']
|
|
|
|
print("\n" + "="*80)
|
|
print("ZAUBACORP PARALLEL SCRAPING COMPLETED")
|
|
print("="*80)
|
|
print(f"Total pages processed: {self.stats['pages_processed']:,}")
|
|
print(f"Total companies found: {self.stats['companies_found']:,}")
|
|
print(f"Companies with details: {self.stats['companies_detailed']:,}")
|
|
print(f"Failed pages: {self.stats['failed_pages']:,}")
|
|
print(f"Success rate: {(self.stats['pages_processed']/(self.stats['pages_processed']+self.stats['failed_pages'])*100):.1f}%")
|
|
print(f"Duration: {duration}")
|
|
print(f"Average speed: {self.stats['pages_processed']/duration.total_seconds():.2f} pages/second")
|
|
print(f"Companies per minute: {self.stats['companies_found']/(duration.total_seconds()/60):.0f}")
|
|
print(f"Output directory: {self.output_dir}")
|
|
print("="*80)
|
|
|
|
def main():
    """Entry point: configure, construct and launch the parallel scraper."""
    print("ZaubaCorp Parallel Scraper")
    print("=" * 50)

    # Tuning knobs — adjust to your machine and connection.
    MAX_WORKERS = 20  # Adjust based on your system and network
    BATCH_SIZE = 200  # Pages per batch
    START_PAGE = 1
    END_PAGE = 1000  # Set to None for all pages (90,769)
    SCRAPE_DETAILS = False  # Set to True to scrape company detail pages (much slower)
    OUTPUT_DIR = "zaubacorp_parallel_data"

    print(f"Configuration:")
    print(f"  Max workers: {MAX_WORKERS}")
    print(f"  Batch size: {BATCH_SIZE}")
    print(f"  Page range: {START_PAGE} to {END_PAGE or 90769}")
    print(f"  Scrape details: {SCRAPE_DETAILS}")
    print(f"  Output directory: {OUTPUT_DIR}")

    scraper = ZaubaCorpParallelScraper(max_workers=MAX_WORKERS, output_dir=OUTPUT_DIR)

    run_kwargs = dict(
        start_page=START_PAGE,
        end_page=END_PAGE,
        batch_size=BATCH_SIZE,
        scrape_details=SCRAPE_DETAILS,
    )
    try:
        asyncio.run(scraper.scrape_all_companies(**run_kwargs))
    except KeyboardInterrupt:
        print("\nScraping interrupted by user")
        # Flush whatever was collected before the interrupt.
        asyncio.run(scraper.save_final_results())
    except Exception as e:
        print(f"Unexpected error: {e}")
        logging.error(f"Unexpected error: {e}")
|
|
|
|
# Run the scraper only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|