# comp/zaubacorp_scraper_enhanced.py
# Snapshot: 2025-08-18 23:16:46 +05:30 — 510 lines, 20 KiB, Python
import time
import os
import csv
import json
import logging
from datetime import datetime
from urllib.parse import urljoin, urlparse
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import pandas as pd
import re
from config import *
class ZaubaCorpScraperEnhanced:
    """Scrapes company listings and profile pages from zaubacorp.com.

    Configuration defaults come from the ``config`` module; any sections
    supplied via the constructor are merged *over* those defaults, so a
    caller may override individual keys without repeating a full section.
    """

    def __init__(self, config=None):
        """Initialize scraper state, output directory, logging and browser options.

        Args:
            config: Optional dict with any of the section keys 'browser',
                'scraping', 'output', 'urls', 'selectors', 'retry'.
                Each section is merged over the corresponding defaults.
                (Bug fix: previously a partial section such as
                ``{'browser': {'headless': True}}`` replaced the entire
                default section and dropped required keys like
                'window_size', crashing later in setup_chrome_options.)
        """
        self.config = config or {}

        # Merge user overrides over the module defaults so partial
        # sections keep the remaining default keys intact.
        self.browser_config = {**BROWSER_CONFIG, **self.config.get('browser', {})}
        self.scraping_limits = {**SCRAPING_LIMITS, **self.config.get('scraping', {})}
        self.output_config = {**OUTPUT_CONFIG, **self.config.get('output', {})}
        self.urls = {**URLS, **self.config.get('urls', {})}
        self.selectors = {**SELECTORS, **self.config.get('selectors', {})}
        self.retry_config = {**RETRY_CONFIG, **self.config.get('retry', {})}

        # Runtime state.
        self.driver = None             # Selenium WebDriver, created lazily
        self.scraped_companies = []    # list of per-company dicts
        self.visited_urls = set()      # profile URLs already processed
        self.failed_urls = set()       # URLs that failed after all retries
        self.stats = {
            'pages_processed': 0,
            'companies_found': 0,
            'companies_scraped': 0,
            'errors': 0,
            'start_time': None,
            'end_time': None,
        }

        # Ensure the output directory exists before logging/saving into it.
        os.makedirs(self.output_config['output_dir'], exist_ok=True)
        self.setup_logging()
        self.setup_chrome_options()
def setup_logging(self):
"""Setup logging configuration"""
log_config = LOGGING_CONFIG
log_file = os.path.join(self.output_config['output_dir'], log_config['log_file'])
logging.basicConfig(
level=getattr(logging, log_config['log_level']),
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler() if log_config['console_output'] else logging.NullHandler()
]
)
self.logger = logging.getLogger(__name__)
def setup_chrome_options(self):
"""Setup Chrome browser options"""
self.chrome_options = Options()
if self.browser_config['headless']:
self.chrome_options.add_argument("--headless")
self.chrome_options.add_argument("--no-sandbox")
self.chrome_options.add_argument("--disable-dev-shm-usage")
self.chrome_options.add_argument("--disable-gpu")
self.chrome_options.add_argument(f"--window-size={self.browser_config['window_size']}")
self.chrome_options.add_argument(f"--user-agent={self.browser_config['user_agent']}")
self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
self.chrome_options.add_experimental_option('useAutomationExtension', False)
# For macOS with Brave Browser (uncomment if using Brave)
# self.chrome_options.binary_location = "/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"
def initialize_driver(self):
"""Initialize the WebDriver with retry logic"""
for attempt in range(self.retry_config['max_retries']):
try:
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
self.driver.implicitly_wait(self.browser_config['implicit_wait'])
self.driver.set_page_load_timeout(self.browser_config['page_load_timeout'])
# Execute script to avoid detection
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
self.logger.info("WebDriver initialized successfully")
return True
except Exception as e:
self.logger.error(f"Attempt {attempt + 1} failed to initialize WebDriver: {e}")
if attempt < self.retry_config['max_retries'] - 1:
time.sleep(self.retry_config['retry_delay'])
else:
raise Exception(f"Failed to initialize WebDriver after {self.retry_config['max_retries']} attempts")
def close_driver(self):
"""Close the WebDriver safely"""
if self.driver:
try:
self.driver.quit()
self.logger.info("WebDriver closed successfully")
except Exception as e:
self.logger.error(f"Error closing WebDriver: {e}")
def safe_get(self, url, max_retries=None):
"""Safely navigate to a URL with retry logic"""
max_retries = max_retries or self.retry_config['max_retries']
for attempt in range(max_retries):
try:
self.driver.get(url)
time.sleep(2) # Wait for page to stabilize
return True
except TimeoutException:
self.logger.warning(f"Timeout loading {url}, attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(self.retry_config['retry_delay'])
except Exception as e:
self.logger.error(f"Error loading {url}: {e}")
if attempt < max_retries - 1:
time.sleep(self.retry_config['retry_delay'])
self.failed_urls.add(url)
self.stats['errors'] += 1
return False
def get_companies_list_pages(self):
"""Get all available pages from the companies list"""
try:
self.logger.info(f"Loading companies list page: {self.urls['companies_list_url']}")
if not self.safe_get(self.urls['companies_list_url']):
return [self.urls['companies_list_url']]
# Wait for page to load completely
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
except TimeoutException:
self.logger.warning("Page load timeout, proceeding anyway")
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
pagination_links = set()
# Try different pagination selectors
for selector in self.selectors['pagination']:
links = soup.select(selector)
if links:
for link in links:
href = link.get('href')
if href and ('page=' in href or 'companies-list' in href):
full_url = urljoin(self.urls['base_url'], href)
pagination_links.add(full_url)
# Convert to sorted list
pagination_links = sorted(list(pagination_links))
# If no pagination found, just return the main page
if not pagination_links:
pagination_links = [self.urls['companies_list_url']]
self.logger.info(f"Found {len(pagination_links)} pages to scrape")
return pagination_links
except Exception as e:
self.logger.error(f"Error getting companies list pages: {e}")
return [self.urls['companies_list_url']]
def extract_company_links(self, page_url):
"""Extract company profile links from a companies list page"""
try:
self.logger.info(f"Extracting company links from: {page_url}")
if not self.safe_get(page_url):
return []
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
company_links = set()
# Try different selectors for company links
for selector in self.selectors['company_links']:
links = soup.select(selector)
if links:
for link in links:
href = link.get('href')
if href:
full_url = urljoin(self.urls['base_url'], href)
company_links.add(full_url)
# Fallback: look for any links that might be company profiles
if not company_links:
all_links = soup.find_all('a', href=True)
for link in all_links:
href = link.get('href')
if href and ('company' in href.lower() or 'profile' in href.lower()):
full_url = urljoin(self.urls['base_url'], href)
company_links.add(full_url)
company_links = list(company_links)
self.logger.info(f"Found {len(company_links)} company links on this page")
self.stats['companies_found'] += len(company_links)
return company_links
except Exception as e:
self.logger.error(f"Error extracting company links from {page_url}: {e}")
self.stats['errors'] += 1
return []
def extract_text_from_element(self, soup, selectors, fallback=''):
"""Extract text from element using multiple selectors"""
for selector in selectors:
element = soup.select_one(selector)
if element and element.text.strip():
return element.text.strip()
return fallback
def extract_cin_from_text(self, text):
"""Extract CIN (Corporate Identification Number) from text"""
# CIN pattern: starts with letter, followed by 5 digits, 2 letters, 4 digits, 3 letters, 6 digits
cin_pattern = r'[A-Z]\d{5}[A-Z]{2}\d{4}[A-Z]{3}\d{6}'
match = re.search(cin_pattern, text)
return match.group() if match else ''
def extract_table_data(self, soup):
"""Extract data from tables on the page"""
data = {}
tables = soup.find_all('table')
for table in tables:
rows = table.find_all('tr')
for row in rows:
cells = row.find_all(['td', 'th'])
if len(cells) >= 2:
key = cells[0].get_text().strip().lower()
value = cells[1].get_text().strip()
# Map keys to our data structure
key_mappings = {
'registration number': 'registration_number',
'company category': 'company_category',
'company sub category': 'company_sub_category',
'class of company': 'class_of_company',
'roc': 'roc',
'registration date': 'registration_date',
'incorporation date': 'registration_date',
'company status': 'company_status',
'authorized capital': 'authorized_capital',
'paid up capital': 'paid_up_capital',
'paid-up capital': 'paid_up_capital',
'activity code': 'activity_code',
'business activity': 'activity_code',
'email': 'email',
'email id': 'email',
'address': 'address',
'registered address': 'address',
'state': 'state',
'pincode': 'pincode',
'pin code': 'pincode',
'country': 'country'
}
for pattern, field in key_mappings.items():
if pattern in key:
data[field] = value
break
return data
def extract_directors(self, soup):
"""Extract directors information"""
directors = []
# Look for sections containing director information
director_keywords = ['director', 'management', 'officer']
for keyword in director_keywords:
sections = soup.find_all(text=lambda x: x and keyword in x.lower())
for section in sections:
parent = section.parent
if parent:
# Look for links or names near director sections
links = parent.find_all('a')
for link in links:
director_name = link.get_text().strip()
if director_name and len(director_name) > 2 and director_name not in directors:
directors.append(director_name)
return directors
def scrape_company_details(self, company_url):
"""Scrape detailed information from a company profile page"""
try:
self.logger.info(f"Scraping company details: {company_url}")
if not self.safe_get(company_url):
return None
soup = BeautifulSoup(self.driver.page_source, 'html.parser')
# Initialize company data with default values
company_data = COMPANY_FIELDS.copy()
company_data['url'] = company_url
company_data['last_updated'] = datetime.now().isoformat()
# Extract company name
company_data['company_name'] = self.extract_text_from_element(
soup, self.selectors['company_name']
)
# Extract CIN from page text
page_text = soup.get_text()
company_data['cin'] = self.extract_cin_from_text(page_text)
# Extract data from tables
table_data = self.extract_table_data(soup)
company_data.update(table_data)
# Extract directors
company_data['directors'] = self.extract_directors(soup)
# Clean up the data
for key, value in company_data.items():
if isinstance(value, str):
company_data[key] = value.strip()
self.logger.info(f"Successfully scraped: {company_data.get('company_name', 'Unknown')}")
self.stats['companies_scraped'] += 1
return company_data
except Exception as e:
self.logger.error(f"Error scraping company details from {company_url}: {e}")
self.stats['errors'] += 1
return None
def save_data(self, force_save=False):
"""Save scraped data to files"""
if not self.scraped_companies and not force_save:
self.logger.info("No data to save")
return
try:
# Prepare data for export
companies_for_export = []
for company in self.scraped_companies:
company_copy = company.copy()
# Convert directors list to string for CSV
company_copy['directors'] = '; '.join(company['directors']) if company['directors'] else ''
companies_for_export.append(company_copy)
# Save as CSV
if 'csv' in self.output_config['save_formats']:
csv_file = os.path.join(self.output_config['output_dir'], self.output_config['csv_filename'])
df = pd.DataFrame(companies_for_export)
df.to_csv(csv_file, index=False, encoding='utf-8')
self.logger.info(f"Data saved to {csv_file}")
# Save as JSON
if 'json' in self.output_config['save_formats']:
json_file = os.path.join(self.output_config['output_dir'], self.output_config['json_filename'])
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(self.scraped_companies, f, indent=2, ensure_ascii=False)
self.logger.info(f"Data saved to {json_file}")
# Save statistics
stats_file = os.path.join(self.output_config['output_dir'], 'scraping_stats.json')
with open(stats_file, 'w', encoding='utf-8') as f:
json.dump(self.stats, f, indent=2)
except Exception as e:
self.logger.error(f"Error saving data: {e}")
def print_stats(self):
"""Print scraping statistics"""
if self.stats['start_time'] and self.stats['end_time']:
duration = self.stats['end_time'] - self.stats['start_time']
duration_str = str(duration).split('.')[0] # Remove microseconds
else:
duration_str = "Unknown"
print("\n" + "="*50)
print("SCRAPING STATISTICS")
print("="*50)
print(f"Pages processed: {self.stats['pages_processed']}")
print(f"Companies found: {self.stats['companies_found']}")
print(f"Companies scraped: {self.stats['companies_scraped']}")
print(f"Errors encountered: {self.stats['errors']}")
print(f"Failed URLs: {len(self.failed_urls)}")
print(f"Duration: {duration_str}")
print("="*50)
def scrape_companies(self):
"""Main method to scrape companies data"""
self.stats['start_time'] = datetime.now()
try:
self.logger.info("Starting ZaubaCorp scraping...")
self.initialize_driver()
# Get all companies list pages
list_pages = self.get_companies_list_pages()
max_pages = self.scraping_limits.get('max_pages')
if max_pages:
list_pages = list_pages[:max_pages]
max_companies = self.scraping_limits.get('max_companies')
total_scraped = 0
for i, page_url in enumerate(list_pages, 1):
if max_companies and total_scraped >= max_companies:
break
self.logger.info(f"Processing page {i}/{len(list_pages)}")
self.stats['pages_processed'] += 1
# Extract company links from this page
company_links = self.extract_company_links(page_url)
for j, company_url in enumerate(company_links, 1):
if max_companies and total_scraped >= max_companies:
break
if company_url in self.visited_urls:
continue
self.visited_urls.add(company_url)
self.logger.info(f"Processing company {j}/{len(company_links)} on page {i}")
# Scrape company details
company_data = self.scrape_company_details(company_url)
if company_data:
self.scraped_companies.append(company_data)
total_scraped += 1
# Add delay between requests
time.sleep(self.scraping_limits.get('delay_between_requests', 1))
# Save data periodically
save_interval = self.scraping_limits.get('save_interval', 50)
if total_scraped > 0 and total_scraped % save_interval == 0:
self.save_data()
self.logger.info(f"Saved {total_scraped} companies so far...")
self.stats['end_time'] = datetime.now()
self.logger.info("Scraping completed!")
# Save final data
self.save_data(force_save=True)
self.print_stats()
except KeyboardInterrupt:
self.logger.info("Scraping interrupted by user")
self.stats['end_time'] = datetime.now()
self.save_data(force_save=True)
except Exception as e:
self.logger.error(f"Error during scraping: {e}")
self.stats['end_time'] = datetime.now()
self.save_data(force_save=True)
finally:
self.close_driver()
def main():
    """Entry point: run the scraper with conservative test limits."""
    print("ZaubaCorp Companies Scraper Enhanced")
    print("=" * 50)

    # You can customize configuration here
    custom_config = {
        'scraping': {
            'max_companies': 100,  # Limit for testing
            'max_pages': 3,  # Limit for testing
            'delay_between_requests': 2,
            'save_interval': 25,
        },
        'browser': {
            'headless': True,  # Set to False to see browser
        },
    }

    scraper = ZaubaCorpScraperEnhanced(config=custom_config)
    try:
        scraper.scrape_companies()
    except Exception as e:
        print(f"Unexpected error: {e}")
        logging.error(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()