import time
import os
import csv
import json
import logging
import re
from datetime import datetime
from urllib.parse import urljoin, urlparse

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager

from config import *


class ZaubaCorpScraperEnhanced:
    """Scrape company listings and profile pages from ZaubaCorp.

    Drives a Selenium-managed Chrome instance, walks the paginated
    companies list, visits each company profile page, and extracts
    structured data (name, CIN, registration details, directors).
    Results are periodically persisted as CSV/JSON under the configured
    output directory, along with run statistics.

    Configuration comes from ``config.py`` (via ``from config import *``)
    and may be partially overridden through the ``config`` dict passed to
    the constructor.
    """

    # Maps lowercase table-row labels (substring match) to our field names.
    # Hoisted to class level so it is built once, not per table row.
    # NOTE: insertion order matters — the first pattern that is a substring
    # of the row label wins (e.g. 'email' also matches 'email id').
    _TABLE_KEY_MAPPINGS = {
        'registration number': 'registration_number',
        'company category': 'company_category',
        'company sub category': 'company_sub_category',
        'class of company': 'class_of_company',
        'roc': 'roc',
        'registration date': 'registration_date',
        'incorporation date': 'registration_date',
        'company status': 'company_status',
        'authorized capital': 'authorized_capital',
        'paid up capital': 'paid_up_capital',
        'paid-up capital': 'paid_up_capital',
        'activity code': 'activity_code',
        'business activity': 'activity_code',
        'email': 'email',
        'email id': 'email',
        'address': 'address',
        'registered address': 'address',
        'state': 'state',
        'pincode': 'pincode',
        'pin code': 'pincode',
        'country': 'country',
    }

    def __init__(self, config=None):
        """Initialize scraper state, output directory, logging, and browser options.

        Args:
            config: Optional dict overriding sections of the module-level
                defaults (keys: 'browser', 'scraping', 'output', 'urls',
                'selectors', 'retry'). Missing sections fall back to the
                constants imported from config.py.
        """
        # Load configuration, falling back to config.py defaults per section.
        self.config = config or {}
        self.browser_config = self.config.get('browser', BROWSER_CONFIG)
        self.scraping_limits = self.config.get('scraping', SCRAPING_LIMITS)
        self.output_config = self.config.get('output', OUTPUT_CONFIG)
        self.urls = self.config.get('urls', URLS)
        self.selectors = self.config.get('selectors', SELECTORS)
        self.retry_config = self.config.get('retry', RETRY_CONFIG)

        # Runtime state.
        self.driver = None
        self.scraped_companies = []
        self.visited_urls = set()   # deduplicates company profile URLs
        self.failed_urls = set()    # URLs that exhausted all retries
        self.stats = {
            'pages_processed': 0,
            'companies_found': 0,
            'companies_scraped': 0,
            'errors': 0,
            'start_time': None,
            'end_time': None
        }

        # Create output directory before logging so the log file can live there.
        os.makedirs(self.output_config['output_dir'], exist_ok=True)

        self.setup_logging()
        self.setup_chrome_options()

    def setup_logging(self):
        """Setup logging configuration (file handler plus optional console)."""
        log_config = LOGGING_CONFIG
        log_file = os.path.join(self.output_config['output_dir'], log_config['log_file'])
        logging.basicConfig(
            level=getattr(logging, log_config['log_level']),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                # NullHandler silently drops records when console output is off.
                logging.StreamHandler() if log_config['console_output'] else logging.NullHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_chrome_options(self):
        """Setup Chrome browser options, including basic anti-bot-detection flags."""
        self.chrome_options = Options()
        if self.browser_config['headless']:
            self.chrome_options.add_argument("--headless")
        self.chrome_options.add_argument("--no-sandbox")
        self.chrome_options.add_argument("--disable-dev-shm-usage")
        self.chrome_options.add_argument("--disable-gpu")
        self.chrome_options.add_argument(f"--window-size={self.browser_config['window_size']}")
        self.chrome_options.add_argument(f"--user-agent={self.browser_config['user_agent']}")
        # Hide the usual Selenium automation fingerprints from the site.
        self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        self.chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.chrome_options.add_experimental_option('useAutomationExtension', False)
        # For macOS with Brave Browser (uncomment if using Brave)
        # self.chrome_options.binary_location = "/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"

    def initialize_driver(self):
        """Initialize the WebDriver with retry logic.

        Returns:
            True on success.

        Raises:
            Exception: if all configured attempts fail.
        """
        for attempt in range(self.retry_config['max_retries']):
            try:
                service = Service(ChromeDriverManager().install())
                self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
                self.driver.implicitly_wait(self.browser_config['implicit_wait'])
                self.driver.set_page_load_timeout(self.browser_config['page_load_timeout'])
                # Mask navigator.webdriver so simple bot checks don't trip.
                self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
                self.logger.info("WebDriver initialized successfully")
                return True
            except Exception as e:
                self.logger.error(f"Attempt {attempt + 1} failed to initialize WebDriver: {e}")
                if attempt < self.retry_config['max_retries'] - 1:
                    time.sleep(self.retry_config['retry_delay'])
                else:
                    raise Exception(f"Failed to initialize WebDriver after {self.retry_config['max_retries']} attempts")

    def close_driver(self):
        """Close the WebDriver safely (errors are logged, never raised)."""
        if self.driver:
            try:
                self.driver.quit()
                self.logger.info("WebDriver closed successfully")
            except Exception as e:
                self.logger.error(f"Error closing WebDriver: {e}")

    def safe_get(self, url, max_retries=None):
        """Safely navigate to a URL with retry logic.

        Args:
            url: Absolute URL to load.
            max_retries: Optional override for the configured retry count.

        Returns:
            True if the page loaded; False after all retries fail (the URL
            is then recorded in ``self.failed_urls`` and the error counter
            is incremented).
        """
        max_retries = max_retries or self.retry_config['max_retries']
        for attempt in range(max_retries):
            try:
                self.driver.get(url)
                time.sleep(2)  # Wait for page to stabilize
                return True
            except TimeoutException:
                self.logger.warning(f"Timeout loading {url}, attempt {attempt + 1}")
                if attempt < max_retries - 1:
                    time.sleep(self.retry_config['retry_delay'])
            except Exception as e:
                self.logger.error(f"Error loading {url}: {e}")
                if attempt < max_retries - 1:
                    time.sleep(self.retry_config['retry_delay'])
        self.failed_urls.add(url)
        self.stats['errors'] += 1
        return False

    def get_companies_list_pages(self):
        """Get all available pages from the companies list.

        Returns:
            Sorted list of pagination URLs; falls back to just the main
            companies-list URL when pagination cannot be discovered.
        """
        try:
            self.logger.info(f"Loading companies list page: {self.urls['companies_list_url']}")
            if not self.safe_get(self.urls['companies_list_url']):
                return [self.urls['companies_list_url']]

            # Wait for page to load completely; proceed anyway on timeout
            # since partial content may still contain the pagination links.
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
            except TimeoutException:
                self.logger.warning("Page load timeout, proceeding anyway")

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            pagination_links = set()

            # Try different pagination selectors until one yields links.
            for selector in self.selectors['pagination']:
                links = soup.select(selector)
                if links:
                    for link in links:
                        href = link.get('href')
                        if href and ('page=' in href or 'companies-list' in href):
                            full_url = urljoin(self.urls['base_url'], href)
                            pagination_links.add(full_url)

            pagination_links = sorted(list(pagination_links))

            # If no pagination found, just return the main page
            if not pagination_links:
                pagination_links = [self.urls['companies_list_url']]

            self.logger.info(f"Found {len(pagination_links)} pages to scrape")
            return pagination_links
        except Exception as e:
            self.logger.error(f"Error getting companies list pages: {e}")
            return [self.urls['companies_list_url']]

    def extract_company_links(self, page_url):
        """Extract company profile links from a companies list page.

        Args:
            page_url: URL of one companies-list page.

        Returns:
            List of absolute company profile URLs (empty on failure).
        """
        try:
            self.logger.info(f"Extracting company links from: {page_url}")
            if not self.safe_get(page_url):
                return []

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            company_links = set()

            # Try different selectors for company links
            for selector in self.selectors['company_links']:
                links = soup.select(selector)
                if links:
                    for link in links:
                        href = link.get('href')
                        if href:
                            full_url = urljoin(self.urls['base_url'], href)
                            company_links.add(full_url)

            # Fallback: look for any links that might be company profiles
            if not company_links:
                all_links = soup.find_all('a', href=True)
                for link in all_links:
                    href = link.get('href')
                    if href and ('company' in href.lower() or 'profile' in href.lower()):
                        full_url = urljoin(self.urls['base_url'], href)
                        company_links.add(full_url)

            company_links = list(company_links)
            self.logger.info(f"Found {len(company_links)} company links on this page")
            self.stats['companies_found'] += len(company_links)
            return company_links
        except Exception as e:
            self.logger.error(f"Error extracting company links from {page_url}: {e}")
            self.stats['errors'] += 1
            return []

    def extract_text_from_element(self, soup, selectors, fallback=''):
        """Extract text from the first matching, non-empty element.

        Args:
            soup: BeautifulSoup document.
            selectors: Iterable of CSS selectors tried in order.
            fallback: Value returned when no selector matches.

        Returns:
            Stripped text of the first match, or ``fallback``.
        """
        for selector in selectors:
            element = soup.select_one(selector)
            if element and element.text.strip():
                return element.text.strip()
        return fallback

    def extract_cin_from_text(self, text):
        """Extract CIN (Corporate Identification Number) from text.

        Returns the first match, or '' if none found.
        """
        # CIN pattern: starts with letter, followed by 5 digits, 2 letters,
        # 4 digits, 3 letters, 6 digits
        cin_pattern = r'[A-Z]\d{5}[A-Z]{2}\d{4}[A-Z]{3}\d{6}'
        match = re.search(cin_pattern, text)
        return match.group() if match else ''

    def extract_table_data(self, soup):
        """Extract key/value data from two-column tables on the page.

        Row labels are matched (case-insensitively, by substring) against
        ``_TABLE_KEY_MAPPINGS``; later rows with the same mapped field
        overwrite earlier ones.

        Returns:
            Dict of mapped field name -> cell text.
        """
        data = {}
        tables = soup.find_all('table')
        for table in tables:
            rows = table.find_all('tr')
            for row in rows:
                cells = row.find_all(['td', 'th'])
                if len(cells) >= 2:
                    key = cells[0].get_text().strip().lower()
                    value = cells[1].get_text().strip()
                    # First pattern that appears in the label wins.
                    for pattern, field in self._TABLE_KEY_MAPPINGS.items():
                        if pattern in key:
                            data[field] = value
                            break
        return data

    def extract_directors(self, soup):
        """Extract directors information.

        Heuristic: find text nodes mentioning director-related keywords,
        then collect link texts near them as candidate names.

        Returns:
            De-duplicated list of director name strings, in discovery order.
        """
        directors = []
        # Look for sections containing director information
        director_keywords = ['director', 'management', 'officer']
        for keyword in director_keywords:
            # NOTE(review): find_all(text=...) is the legacy bs4 spelling;
            # newer bs4 prefers string=... — kept for compatibility.
            sections = soup.find_all(text=lambda x: x and keyword in x.lower())
            for section in sections:
                parent = section.parent
                if parent:
                    # Look for links or names near director sections
                    links = parent.find_all('a')
                    for link in links:
                        director_name = link.get_text().strip()
                        # len > 2 filters out icons/initials/empty anchors.
                        if director_name and len(director_name) > 2 and director_name not in directors:
                            directors.append(director_name)
        return directors

    def scrape_company_details(self, company_url):
        """Scrape detailed information from a company profile page.

        Args:
            company_url: Absolute URL of the company profile.

        Returns:
            Dict of company fields (based on COMPANY_FIELDS), or None when
            the page could not be loaded or parsed.
        """
        try:
            self.logger.info(f"Scraping company details: {company_url}")
            if not self.safe_get(company_url):
                return None

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            # Initialize company data with default values
            company_data = COMPANY_FIELDS.copy()
            company_data['url'] = company_url
            company_data['last_updated'] = datetime.now().isoformat()

            # Extract company name
            company_data['company_name'] = self.extract_text_from_element(
                soup, self.selectors['company_name']
            )

            # Extract CIN from page text
            page_text = soup.get_text()
            company_data['cin'] = self.extract_cin_from_text(page_text)

            # Extract data from tables
            table_data = self.extract_table_data(soup)
            company_data.update(table_data)

            # Extract directors
            company_data['directors'] = self.extract_directors(soup)

            # Clean up the data
            for key, value in company_data.items():
                if isinstance(value, str):
                    company_data[key] = value.strip()

            self.logger.info(f"Successfully scraped: {company_data.get('company_name', 'Unknown')}")
            self.stats['companies_scraped'] += 1
            return company_data
        except Exception as e:
            self.logger.error(f"Error scraping company details from {company_url}: {e}")
            self.stats['errors'] += 1
            return None

    def save_data(self, force_save=False):
        """Save scraped data to files (CSV and/or JSON per config) plus stats.

        Args:
            force_save: When True, write output files even if no companies
                have been scraped yet (used on shutdown/interrupt).
        """
        if not self.scraped_companies and not force_save:
            self.logger.info("No data to save")
            return
        try:
            # Prepare data for export: flatten the directors list for CSV.
            companies_for_export = []
            for company in self.scraped_companies:
                company_copy = company.copy()
                company_copy['directors'] = '; '.join(company['directors']) if company['directors'] else ''
                companies_for_export.append(company_copy)

            # Save as CSV
            if 'csv' in self.output_config['save_formats']:
                csv_file = os.path.join(self.output_config['output_dir'], self.output_config['csv_filename'])
                df = pd.DataFrame(companies_for_export)
                df.to_csv(csv_file, index=False, encoding='utf-8')
                self.logger.info(f"Data saved to {csv_file}")

            # Save as JSON (keeps directors as a real list)
            if 'json' in self.output_config['save_formats']:
                json_file = os.path.join(self.output_config['output_dir'], self.output_config['json_filename'])
                with open(json_file, 'w', encoding='utf-8') as f:
                    json.dump(self.scraped_companies, f, indent=2, ensure_ascii=False)
                self.logger.info(f"Data saved to {json_file}")

            # Save statistics
            stats_file = os.path.join(self.output_config['output_dir'], 'scraping_stats.json')
            with open(stats_file, 'w', encoding='utf-8') as f:
                json.dump(self.stats, f, indent=2)
        except Exception as e:
            self.logger.error(f"Error saving data: {e}")

    def print_stats(self):
        """Print scraping statistics to stdout."""
        if self.stats['start_time'] and self.stats['end_time']:
            duration = self.stats['end_time'] - self.stats['start_time']
            duration_str = str(duration).split('.')[0]  # Remove microseconds
        else:
            duration_str = "Unknown"
        print("\n" + "="*50)
        print("SCRAPING STATISTICS")
        print("="*50)
        print(f"Pages processed: {self.stats['pages_processed']}")
        print(f"Companies found: {self.stats['companies_found']}")
        print(f"Companies scraped: {self.stats['companies_scraped']}")
        print(f"Errors encountered: {self.stats['errors']}")
        print(f"Failed URLs: {len(self.failed_urls)}")
        print(f"Duration: {duration_str}")
        print("="*50)

    def scrape_companies(self):
        """Main method to scrape companies data.

        Walks the paginated list (bounded by 'max_pages'), scrapes each
        company profile (bounded by 'max_companies'), saves periodically
        every 'save_interval' companies, and always writes a final dump and
        closes the driver — including on KeyboardInterrupt or error.
        """
        self.stats['start_time'] = datetime.now()
        try:
            self.logger.info("Starting ZaubaCorp scraping...")
            self.initialize_driver()

            # Get all companies list pages
            list_pages = self.get_companies_list_pages()
            max_pages = self.scraping_limits.get('max_pages')
            if max_pages:
                list_pages = list_pages[:max_pages]

            max_companies = self.scraping_limits.get('max_companies')
            total_scraped = 0

            for i, page_url in enumerate(list_pages, 1):
                if max_companies and total_scraped >= max_companies:
                    break
                self.logger.info(f"Processing page {i}/{len(list_pages)}")
                self.stats['pages_processed'] += 1

                # Extract company links from this page
                company_links = self.extract_company_links(page_url)

                for j, company_url in enumerate(company_links, 1):
                    if max_companies and total_scraped >= max_companies:
                        break
                    if company_url in self.visited_urls:
                        continue
                    self.visited_urls.add(company_url)

                    self.logger.info(f"Processing company {j}/{len(company_links)} on page {i}")

                    # Scrape company details
                    company_data = self.scrape_company_details(company_url)
                    if company_data:
                        self.scraped_companies.append(company_data)
                        total_scraped += 1

                    # Add delay between requests (politeness / rate limiting)
                    time.sleep(self.scraping_limits.get('delay_between_requests', 1))

                    # Save data periodically so a crash loses at most one interval
                    save_interval = self.scraping_limits.get('save_interval', 50)
                    if total_scraped > 0 and total_scraped % save_interval == 0:
                        self.save_data()
                        self.logger.info(f"Saved {total_scraped} companies so far...")

            self.stats['end_time'] = datetime.now()
            self.logger.info("Scraping completed!")

            # Save final data
            self.save_data(force_save=True)
            self.print_stats()
        except KeyboardInterrupt:
            self.logger.info("Scraping interrupted by user")
            self.stats['end_time'] = datetime.now()
            self.save_data(force_save=True)
        except Exception as e:
            self.logger.error(f"Error during scraping: {e}")
            self.stats['end_time'] = datetime.now()
            self.save_data(force_save=True)
        finally:
            self.close_driver()


def main():
    """Main function to run the scraper."""
    print("ZaubaCorp Companies Scraper Enhanced")
    print("=" * 50)

    # You can customize configuration here
    custom_config = {
        'scraping': {
            'max_companies': 100,   # Limit for testing
            'max_pages': 3,         # Limit for testing
            'delay_between_requests': 2,
            'save_interval': 25
        },
        'browser': {
            'headless': True,   # Set to False to see browser
        }
    }

    # Initialize and run scraper
    scraper = ZaubaCorpScraperEnhanced(config=custom_config)
    try:
        scraper.scrape_companies()
    except Exception as e:
        print(f"Unexpected error: {e}")
        logging.error(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()