"""Scraper for company profiles listed on zaubacorp.com.

Crawls the companies-list pages with Selenium, follows company-profile
links, parses each profile page with BeautifulSoup, and exports the
collected records to CSV and JSON in an output directory.
"""

import csv
import json
import os
import re
import time
from urllib.parse import urljoin, urlparse

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Indian Corporate Identification Number, e.g. U12345MH2010PTC123456.
# Compiled once at module level instead of re-importing/re-compiling
# inside the per-company parsing loop.
CIN_PATTERN = re.compile(r'[A-Z]\d{5}[A-Z]{2}\d{4}[A-Z]{3}\d{6}')


class ZaubaCorpScraper:
    """Scrape company data from zaubacorp.com.

    Attributes:
        base_url: Site root, used to resolve relative links.
        companies_list_url: Entry page listing companies.
        output_dir: Directory where CSV/JSON exports are written.
        driver: Selenium WebDriver instance (None when not running).
        scraped_companies: Accumulated company-detail dicts.
        visited_urls: Profile URLs already scraped (dedup guard).
    """

    def __init__(self, headless=True, output_dir="zaubacorp_data"):
        """Configure Chrome options and prepare the output directory.

        Args:
            headless: Run Chrome without a visible window.
            output_dir: Directory for exported data files (created if absent).
        """
        self.base_url = "https://www.zaubacorp.com"
        self.companies_list_url = "https://www.zaubacorp.com/companies-list"
        self.output_dir = output_dir
        self.driver = None
        self.scraped_companies = []
        self.visited_urls = set()

        # Create output directory
        os.makedirs(self.output_dir, exist_ok=True)

        # Setup Chrome options
        self.chrome_options = Options()
        if headless:
            self.chrome_options.add_argument("--headless")
        self.chrome_options.add_argument("--no-sandbox")
        self.chrome_options.add_argument("--disable-dev-shm-usage")
        self.chrome_options.add_argument("--disable-gpu")
        self.chrome_options.add_argument("--window-size=1920,1080")
        self.chrome_options.add_argument(
            "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )

        # For macOS with Brave Browser (uncomment if using Brave)
        # self.chrome_options.binary_location = "/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"

    def initialize_driver(self):
        """Initialize the WebDriver.

        Raises:
            Exception: Re-raised if the driver cannot be created, since
                nothing else can proceed without it.
        """
        try:
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=self.chrome_options)
            self.driver.implicitly_wait(10)
            print("WebDriver initialized successfully")
        except Exception as e:
            print(f"Error initializing WebDriver: {e}")
            raise

    def close_driver(self):
        """Close the WebDriver if it is running.

        Resets ``self.driver`` to None so a second call (e.g. the
        KeyboardInterrupt path in main() after scrape_companies' finally
        block already closed it) is a safe no-op instead of calling
        quit() on a dead driver.
        """
        if self.driver:
            self.driver.quit()
            self.driver = None
            print("WebDriver closed")

    def get_companies_list_pages(self):
        """Get all available pages from the companies list.

        Returns:
            List of absolute pagination URLs; falls back to the main
            companies-list URL when no pagination is detected or on error.
        """
        try:
            print(f"Loading companies list page: {self.companies_list_url}")
            self.driver.get(self.companies_list_url)
            time.sleep(3)  # Wait for page to load

            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            # Find pagination links
            pagination_links = []

            # Look for pagination elements (common patterns)
            pagination_selectors = [
                '.pagination a',
                '.pager a',
                '.page-link',
                'a[href*="page="]',
                'a[href*="companies-list"]'
            ]

            for selector in pagination_selectors:
                links = soup.select(selector)
                if links:
                    for link in links:
                        href = link.get('href')
                        if href and ('page=' in href or 'companies-list' in href):
                            full_url = urljoin(self.base_url, href)
                            if full_url not in pagination_links:
                                pagination_links.append(full_url)
                    # First selector that matched wins; don't mix strategies.
                    break

            # If no pagination found, just return the main page
            if not pagination_links:
                pagination_links = [self.companies_list_url]

            print(f"Found {len(pagination_links)} pages to scrape")
            return pagination_links

        except Exception as e:
            print(f"Error getting companies list pages: {e}")
            return [self.companies_list_url]

    def extract_company_links(self, page_url):
        """Extract company profile links from a companies list page.

        Args:
            page_url: Absolute URL of one companies-list page.

        Returns:
            List of absolute profile URLs (possibly empty on error).
        """
        try:
            print(f"Extracting company links from: {page_url}")
            self.driver.get(page_url)
            time.sleep(2)

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            company_links = []

            # Common selectors for company links
            link_selectors = [
                'a[href*="/company/"]',
                'a[href*="company-detail"]',
                '.company-name a',
                '.company-link',
                'a[title*="company"]'
            ]

            for selector in link_selectors:
                links = soup.select(selector)
                if links:
                    for link in links:
                        href = link.get('href')
                        if href:
                            full_url = urljoin(self.base_url, href)
                            if full_url not in company_links:
                                company_links.append(full_url)
                    break

            # Fallback: look for any links that might be company profiles
            if not company_links:
                all_links = soup.find_all('a', href=True)
                for link in all_links:
                    href = link.get('href')
                    if href and ('company' in href.lower() or 'profile' in href.lower()):
                        full_url = urljoin(self.base_url, href)
                        if full_url not in company_links:
                            company_links.append(full_url)

            print(f"Found {len(company_links)} company links on this page")
            return company_links

        except Exception as e:
            print(f"Error extracting company links from {page_url}: {e}")
            return []

    def scrape_company_details(self, company_url):
        """Scrape detailed information from a company profile page.

        Args:
            company_url: Absolute URL of a company profile page.

        Returns:
            Dict of company fields (empty strings / empty list when a
            field is not found), or None on error.
        """
        try:
            print(f"Scraping company details: {company_url}")
            self.driver.get(company_url)
            time.sleep(2)

            soup = BeautifulSoup(self.driver.page_source, 'html.parser')

            company_data = {
                'url': company_url,
                'company_name': '',
                'cin': '',
                'registration_number': '',
                'company_category': '',
                'company_sub_category': '',
                'class_of_company': '',
                'roc': '',
                'registration_date': '',
                'company_status': '',
                'authorized_capital': '',
                'paid_up_capital': '',
                'activity_code': '',
                'email': '',
                'address': '',
                'state': '',
                'pincode': '',
                'country': '',
                'directors': [],
                'last_updated': ''
            }

            # Extract company name: first non-empty match wins.
            name_selectors = ['h1', '.company-name', '.main-heading', 'title']
            for selector in name_selectors:
                element = soup.select_one(selector)
                if element and element.text.strip():
                    company_data['company_name'] = element.text.strip()
                    break

            # Extract CIN from text near any of the common labels.
            # NOTE: bs4's `text=` keyword is deprecated; `string=` is the
            # current name for the same filter.
            cin_patterns = ['CIN', 'Corporate Identification Number', 'Company ID']
            for pattern in cin_patterns:
                element = soup.find(string=lambda x: x and pattern in x)
                if element:
                    parent = element.parent
                    if parent:
                        cin_text = parent.get_text()
                        # Extract CIN pattern (usually alphanumeric)
                        cin_match = CIN_PATTERN.search(cin_text)
                        if cin_match:
                            company_data['cin'] = cin_match.group()
                    break

            # Extract other details using table data or key-value pairs
            tables = soup.find_all('table')
            for table in tables:
                rows = table.find_all('tr')
                for row in rows:
                    cells = row.find_all(['td', 'th'])
                    if len(cells) >= 2:
                        key = cells[0].get_text().strip().lower()
                        value = cells[1].get_text().strip()

                        if 'registration' in key and 'number' in key:
                            company_data['registration_number'] = value
                        elif 'category' in key:
                            company_data['company_category'] = value
                        elif 'class' in key:
                            company_data['class_of_company'] = value
                        elif 'roc' in key:
                            company_data['roc'] = value
                        elif 'registration date' in key or 'incorporation' in key:
                            company_data['registration_date'] = value
                        elif 'status' in key:
                            company_data['company_status'] = value
                        elif 'authorized capital' in key:
                            company_data['authorized_capital'] = value
                        elif 'paid' in key and 'capital' in key:
                            company_data['paid_up_capital'] = value
                        elif 'activity' in key or 'business' in key:
                            company_data['activity_code'] = value
                        elif 'email' in key:
                            company_data['email'] = value
                        elif 'address' in key:
                            company_data['address'] = value
                        elif 'state' in key:
                            company_data['state'] = value
                        elif 'pincode' in key or 'pin' in key:
                            company_data['pincode'] = value
                        elif 'country' in key:
                            company_data['country'] = value

            # Extract directors information
            directors_section = soup.find(string=lambda x: x and 'director' in x.lower())
            if directors_section:
                # Look for director names in the surrounding area
                directors_container = directors_section.parent
                if directors_container:
                    director_links = directors_container.find_all('a')
                    for link in director_links:
                        director_name = link.get_text().strip()
                        # Skip trivially short anchor texts (icons, "Go", etc.)
                        if director_name and len(director_name) > 2:
                            company_data['directors'].append(director_name)

            return company_data

        except Exception as e:
            print(f"Error scraping company details from {company_url}: {e}")
            return None

    def _export_rows(self):
        """Return scraped records with the directors list flattened to a string."""
        companies_for_export = []
        for company in self.scraped_companies:
            company_copy = company.copy()
            # Convert directors list to string
            company_copy['directors'] = '; '.join(company['directors']) if company['directors'] else ''
            companies_for_export.append(company_copy)
        return companies_for_export

    def _save_csv(self):
        """Write the scraped data to a CSV file in the output directory."""
        csv_file = os.path.join(self.output_dir, 'zaubacorp_companies.csv')
        df = pd.DataFrame(self._export_rows())
        df.to_csv(csv_file, index=False, encoding='utf-8')
        print(f"Data saved to {csv_file}")

    def _save_json(self):
        """Write the scraped data to a JSON file in the output directory."""
        json_file = os.path.join(self.output_dir, 'zaubacorp_companies.json')
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(self.scraped_companies, f, indent=2, ensure_ascii=False)
        print(f"Data saved to {json_file}")

    def save_data(self, format='csv'):
        """Save scraped data to file.

        Both CSV and JSON are always written (as in the original
        "always save both formats" logic); the ``format`` parameter is
        kept for backward compatibility with existing callers.

        Args:
            format: Accepted for compatibility; both formats are saved
                regardless of its value.
        """
        if not self.scraped_companies:
            print("No data to save")
            return
        self._save_csv()
        self._save_json()

    def scrape_companies(self, max_companies=None, max_pages=None):
        """Main method to scrape companies data.

        Args:
            max_companies: Stop after this many companies (None = unlimited).
            max_pages: Limit the number of list pages visited (None = all).
        """
        try:
            self.initialize_driver()

            # Get all companies list pages
            list_pages = self.get_companies_list_pages()
            if max_pages:
                list_pages = list_pages[:max_pages]

            total_scraped = 0

            for i, page_url in enumerate(list_pages, 1):
                if max_companies and total_scraped >= max_companies:
                    break

                print(f"\n--- Processing page {i}/{len(list_pages)} ---")

                # Extract company links from this page
                company_links = self.extract_company_links(page_url)

                for j, company_url in enumerate(company_links, 1):
                    if max_companies and total_scraped >= max_companies:
                        break

                    if company_url in self.visited_urls:
                        continue
                    self.visited_urls.add(company_url)

                    print(f"Processing company {j}/{len(company_links)} on page {i}")

                    # Scrape company details
                    company_data = self.scrape_company_details(company_url)
                    if company_data:
                        self.scraped_companies.append(company_data)
                        total_scraped += 1
                        print(f"Successfully scraped: {company_data.get('company_name', 'Unknown')}")

                    # Add delay between requests
                    time.sleep(1)

                    # Save data periodically
                    if total_scraped > 0 and total_scraped % 50 == 0:
                        self.save_data()
                        print(f"Saved {total_scraped} companies so far...")

            print(f"\n--- Scraping completed! ---")
            print(f"Total companies scraped: {len(self.scraped_companies)}")

            # Save final data
            self.save_data()

        except Exception as e:
            print(f"Error during scraping: {e}")
        finally:
            self.close_driver()


def main():
    """Main function to run the scraper."""
    print("ZaubaCorp Companies Scraper")
    print("=" * 40)

    # Configuration
    HEADLESS = True        # Set to False to see browser window
    MAX_COMPANIES = 100    # Set to None for unlimited
    MAX_PAGES = 5          # Set to None for all pages
    OUTPUT_DIR = "zaubacorp_data"

    # Initialize and run scraper
    scraper = ZaubaCorpScraper(headless=HEADLESS, output_dir=OUTPUT_DIR)

    try:
        scraper.scrape_companies(max_companies=MAX_COMPANIES, max_pages=MAX_PAGES)
    except KeyboardInterrupt:
        print("\nScraping interrupted by user")
        scraper.save_data()
        # Safe even though scrape_companies' finally already closed the
        # driver: close_driver() is a no-op once self.driver is None.
        scraper.close_driver()
    except Exception as e:
        print(f"Unexpected error: {e}")
        scraper.close_driver()


if __name__ == "__main__":
    main()