Web scraper for LinkedIn
| # ============================================================================== | |
| # Necessary Imports | |
| # ============================================================================== | |
| import asyncio | |
| import hashlib | |
| import os | |
| import time as tm # For scraper delays | |
| import json # To load config if needed in the future | |
| from typing import List, Optional, Dict, Any, Set, Tuple | |
| from urllib.parse import quote, urlencode | |
| import pandas as pd | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from langdetect import detect, LangDetectException # Optional | |
| from dotenv import load_dotenv | |
| from playwright.async_api import async_playwright, Playwright, Browser, Page, Locator, Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError | |
| # YOU CAN CHANGE THIS MAPPER TO YOUR TARGET LANGUAGE | |
| JOBS_COLUMNS_MAPPER_ENG_PT = { | |
| 'company_name':'Empresa', | |
| 'job_title': 'Titulo da Vaga', | |
| 'job_url': 'Link da Vaga', | |
| 'source': 'Fonte', | |
| 'location': 'Localizacao', | |
| 'date_posted': 'Data Postagem', | |
| 'job_id': 'ID Unico' | |
| } | |
| JOBS_COLUMNS_MAPPER_PT_ENG = {JOBS_COLUMNS_MAPPER_ENG_PT[key]: key for key in JOBS_COLUMNS_MAPPER_ENG_PT.keys()} | |
| def generate_job_hash(identifier: str, company: Optional[str] = None, title: Optional[str] = None) -> str: | |
| """Generates a SHA256 hash to uniquely identify a job (based on the job URL).""" | |
| # Uses identifier (job_url) as base, adds others to reduce collisions (rare) | |
| raw_id = f"{identifier}-{company}-{title}".lower() | |
| return hashlib.sha256(raw_id.encode()).hexdigest() | |
| # ============================================================================== | |
| # LinkedInScraper Class | |
| # ============================================================================== | |
| class LinkedInScraper: | |
| """ | |
| Class to search, filter, and save job listings from LinkedIn | |
| using an unofficial guest API. | |
| """ | |
| # Default values that can be overridden in __init__ | |
| DEFAULT_HEADERS = { | |
| 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36', | |
| 'Accept-Language': 'en-US,en;q=0.9,pt-BR;q=0.8,pt;q=0.7', | |
| 'Accept-Encoding': 'gzip, deflate, br', | |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8', | |
| } | |
| DEFAULT_TIMESPAN = "r604800" # Past Week | |
| DEFAULT_PAGES_TO_SCRAPE = 3 | |
| DEFAULT_REQUEST_RETRIES = 3 | |
| DEFAULT_REQUEST_DELAY = 4 # Increase the default delay slightly | |
| def __init__( | |
| self, | |
| search_queries: List[Dict[str, str]], | |
| output_filename: str = "linkedin_scraper_jobs.xlsx", | |
| headers: Optional[Dict[str, str]] = None, | |
| proxies: Optional[Dict[str, str]] = None, | |
| title_exclude: Optional[List[str]] = None, | |
| title_include: Optional[List[str]] = None, | |
| company_exclude: Optional[List[str]] = None, | |
| timespan: str = DEFAULT_TIMESPAN, | |
| pages_to_scrape: int = DEFAULT_PAGES_TO_SCRAPE, | |
| request_retries: int = DEFAULT_REQUEST_RETRIES, | |
| request_delay: int = DEFAULT_REQUEST_DELAY | |
| ): | |
| """ | |
| Initializes the scraper with configurations. | |
| Args: | |
| search_queries: List of dictionaries, each with 'keywords' and 'location' (and optionally 'f_WT'). | |
| output_filename: Name of the Excel file to save/read data. | |
| headers: HTTP Headers for the requests. Uses DEFAULT_HEADERS if None. | |
| proxies: Proxies for the requests. | |
| title_exclude: List of keywords to exclude jobs by title. | |
| title_include: List of keywords that MUST be in the title. | |
| company_exclude: List of companies to exclude. | |
| timespan: Time filter (e.g., "r604800" for Past Week). | |
| pages_to_scrape: How many pages of results to fetch per query. | |
| request_retries: Number of attempts for each HTTP request. | |
| request_delay: Base delay (in seconds) between requests/attempts. | |
| """ | |
| self.search_queries = search_queries | |
| self.output_filename = output_filename | |
| self.headers = headers if headers is not None else self.DEFAULT_HEADERS | |
| self.proxies = proxies if proxies is not None else {} | |
| self.title_exclude = title_exclude if title_exclude is not None else [] | |
| self.title_include = title_include if title_include is not None else [] | |
| self.company_exclude = company_exclude if company_exclude is not None else [] | |
| self.timespan = timespan | |
| self.pages_to_scrape = pages_to_scrape | |
| self.request_retries = request_retries | |
| self.request_delay = request_delay | |
| print(f"INFO: LinkedInScraper initialized to save in '{self.output_filename}'.") | |
| print(f"INFO: Queries: {len(self.search_queries)}, Pages/Query: {self.pages_to_scrape}, Timespan: {self.timespan}") | |
| # --- Private Methods (Internal Logic) --- | |
| def _make_request(self, url: str) -> Optional[BeautifulSoup]: | |
| """Performs the GET request with retries and returns the BeautifulSoup object.""" | |
| print(f"SCRAPER: Requesting URL: {url}") | |
| current_delay = self.request_delay | |
| for i in range(self.request_retries): | |
| try: | |
| tm.sleep(current_delay + (i * 0.5)) # Delay before attempt | |
| response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=20) # Longer timeout | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| return soup | |
| except requests.exceptions.Timeout: | |
| print(f"SCRAPER WARN: Timeout for URL: {url}, retry {i+1}/{self.request_retries}...") | |
| except requests.exceptions.RequestException as e: | |
| status_code = e.response.status_code if e.response is not None else 'N/A' | |
| print(f"SCRAPER ERROR: Request failed: Status {status_code}, URL: {url}, Error: {e}, retry {i+1}/{self.request_retries}...") | |
| if hasattr(e, 'response') and e.response is not None and e.response.status_code == 429: | |
| print("SCRAPER WARN: Rate limited (429). Increasing delay.") | |
| current_delay *= 1.5 # Increase delay if rate limited | |
| except Exception as e: | |
| print(f"SCRAPER ERROR: Unexpected error during request: {e}") | |
| return None # Don't retry | |
| print(f"SCRAPER ERROR: Failed to retrieve URL after {self.request_retries} retries: {url}") | |
| return None | |
| def _parse_job_card(self, card: Any) -> Optional[Dict[str, Any]]: | |
| """Extracts data from a single job card (HTML element).""" | |
| # (Logic of _parse_job_card remains the same as transform_api_job_card, | |
| # but now it's a class method. It could access self if needed, but doesn't here) | |
| try: | |
| job_div = card.find('div', class_='base-search-card') | |
| if not job_div: | |
| job_div = card.find('div', class_='job-search-card') | |
| if not job_div: return None | |
| title_tag = job_div.find(['h3', 'span'], class_='base-search-card__title') # Try h3 or span | |
| company_tag = job_div.find(['h4', 'a'], class_='base-search-card__subtitle') # Try h4 or a | |
| location_tag = job_div.find('span', class_='job-search-card__location') | |
| date_tag = job_div.find('time', class_='job-search-card__listdate') or job_div.find('time', class_='job-search-card__listdate--new') | |
| link_tag = job_div.find('a', class_='base-card__full-link') # Main card link | |
| job_url = None | |
| job_posting_id = None # Initialize | |
| if link_tag and 'href' in link_tag.attrs: | |
| url_raw = link_tag['href'] | |
| if '/jobs/view/' in url_raw: | |
| job_url = url_raw.split('?')[0] # Get base job URL | |
| try: job_posting_id = job_url.split('/jobs/view/')[1].split('/')[0] | |
| except: pass | |
| if not job_url: # Try getting URL via data-entity-urn if link failed | |
| entity_urn = job_div.get('data-entity-urn') or card.get('data-entity-urn') # Try on div or li | |
| if entity_urn and 'jobPosting' in entity_urn: | |
| try: | |
| job_posting_id = entity_urn.split(':')[-1] | |
| job_url = f'https://www.linkedin.com/jobs/view/{job_posting_id}/' | |
| except: pass | |
| if not title_tag or not job_url: return None # Title and Job URL are required | |
| # Text cleanup | |
| title = title_tag.text.strip() | |
| company_raw = company_tag.find('a') if company_tag and company_tag.find('a') else company_tag | |
| company = company_raw.text.strip().replace('\n', ' ') if company_raw else None | |
| location = location_tag.text.strip() if location_tag else None | |
| date = date_tag['datetime'] if date_tag and 'datetime' in date_tag.attrs else None | |
| # Return dictionary with consistent key names | |
| return { | |
| 'job_title': title, | |
| 'company_name': company, | |
| 'location': location, | |
| 'date_posted': date, | |
| 'job_url': job_url, # JOB URL | |
| 'source': 'scraper' | |
| } | |
| except Exception as e: | |
| # print(f"SCRAPER WARN: Error parsing job card: {e}") # Can be too verbose | |
| return None | |
| def _filter_jobs(self, joblist: List[Dict]) -> List[Dict]: | |
| """Filters the list of jobs based on instance criteria.""" | |
| if not joblist: return [] | |
| original_count = len(joblist) | |
| print(f"SCRAPER: Filtering {original_count} raw jobs...") | |
| # Jobs should already have the correct keys ('job_title', 'company_name') from _parse_job_card | |
| if self.title_exclude: | |
| joblist = [job for job in joblist if not any(word.lower() in (job.get('job_title') or '').lower() for word in self.title_exclude)] | |
| if self.title_include: | |
| joblist = [job for job in joblist if any(word.lower() in (job.get('job_title') or '').lower() for word in self.title_include)] | |
| if self.company_exclude: | |
| joblist = [job for job in joblist if not any(word.lower() in (job.get('company_name') or '').lower() for word in self.company_exclude)] | |
| print(f"SCRAPER: Filtering complete. {len(joblist)} jobs remaining from {original_count}.") | |
| return joblist | |
| # --- Public Methods (Interface for User/Notebook) --- | |
| def fetch_raw_jobs(self) -> List[Dict[str, Any]]: | |
| """Fetches jobs from all configured queries and pages.""" | |
| print("\nINFO: --- Starting Job Fetching ---") | |
| all_raw_jobs = [] | |
| processed_urls_this_run = set() | |
| for query in self.search_queries: | |
| kw = query['keywords'] | |
| loc = query['location'] | |
| f_wt = query.get('f_WT', '') # Default to any type | |
| print(f"\nSCRAPER: Processing Query: Keywords='{kw}', Location='{loc}', Type='{f_wt}'") | |
| kw_encoded = quote(kw) | |
| loc_encoded = quote(loc) | |
| for i in range(self.pages_to_scrape): | |
| start_index = 25 * i | |
| url = ( | |
| f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search" | |
| f"?keywords={kw_encoded}" | |
| f"&location={loc_encoded}" | |
| f"&f_WT={f_wt}" | |
| f"&f_TPR={self.timespan}" | |
| f"&start={start_index}" | |
| ) | |
| soup = self._make_request(url) # Use internal request method | |
| if soup: | |
| # Extract cards based on the returned HTML | |
| # The API returns HTML, so we use find_all | |
| job_cards = soup.find_all('li') # Try 'li' first | |
| if not job_cards: | |
| job_cards = soup.find_all('div', class_='job-search-card') # Fallback | |
| if not job_cards: | |
| print(f"SCRAPER WARN: No job card elements (li or div.job-search-card) found on page {i} for query '{kw}'. Content might have changed.") | |
| # Not necessarily stopping the query, could be an empty page at the end | |
| continue # Try next page | |
| found_on_page = 0 | |
| for card in job_cards: | |
| job_data = self._parse_job_card(card) # Use internal parse method | |
| if job_data and job_data.get('job_url') and job_data['job_url'] not in processed_urls_this_run: | |
| all_raw_jobs.append(job_data) | |
| processed_urls_this_run.add(job_data['job_url']) | |
| found_on_page += 1 | |
| print(f"SCRAPER: Found {found_on_page} new job(s) on page {i}.") | |
| else: | |
| print(f"SCRAPER WARN: Failed to fetch or parse page {i} for query '{kw}'. Stopping this query.") | |
| break # Stop current query if a page fails completely | |
| print(f"\nSCRAPER: Fetching complete. Found {len(all_raw_jobs)} raw job listings total.") | |
| return all_raw_jobs | |
| def load_existing_data(self) -> Tuple[pd.DataFrame, Set[str]]: | |
| """Loads data from the existing Excel file and returns DataFrame and set of IDs.""" | |
| existing_ids = set() | |
| existing_df = pd.DataFrame() | |
| if os.path.exists(self.output_filename): | |
| print(f"INFO: Loading existing data from '{self.output_filename}'...") | |
| try: | |
| existing_df = pd.read_excel(self.output_filename) | |
| # Rename PT columns to EN internally for consistent processing | |
| existing_df = existing_df.rename(columns=JOBS_COLUMNS_MAPPER_PT_ENG) | |
| if 'job_id' in existing_df.columns: | |
| # Handle potential NaNs before converting to string | |
| existing_ids = set(existing_df['job_id'].dropna().astype(str).unique()) | |
| print(f"INFO: Found {len(existing_ids)} existing job IDs.") | |
| else: | |
| print(f"WARN: 'job_id' column not found in '{self.output_filename}'.") | |
| except Exception as e: | |
| print(f"WARN: Could not load/parse '{self.output_filename}'. Error: {e}") | |
| existing_df = pd.DataFrame() # Reset on error | |
| else: | |
| print(f"INFO: No existing file '{self.output_filename}' found.") | |
| return existing_df, existing_ids | |
| def deduplicate_and_prepare(self, fetched_jobs: List[Dict], existing_ids: Set[str]) -> List[Dict]: | |
| """Filters jobs, generates hash, and removes duplicates against existing IDs.""" | |
| filtered_jobs = self._filter_jobs(fetched_jobs) # Use internal filter method | |
| new_unique_jobs = [] | |
| print(f"SCRAPER: Deduplicating {len(filtered_jobs)} filtered jobs against {len(existing_ids)} existing IDs...") | |
| for job in filtered_jobs: | |
| # Uses the module-level generate_job_hash() defined before the class | |
| job_hash = generate_job_hash(identifier=job['job_url'], company=job.get('company_name'), title=job.get('job_title')) | |
| if job_hash not in existing_ids: | |
| job['job_id'] = job_hash # Add hash to the dictionary | |
| new_unique_jobs.append(job) | |
| existing_ids.add(job_hash) # Also add to the set to avoid internal duplicates | |
| # else: # Optional: Log found duplicate | |
| # print(f" SCRAPER: Skipping duplicate job: {job.get('job_title','N/A')}") | |
| print(f"SCRAPER: Found {len(new_unique_jobs)} new unique jobs after deduplication.") | |
| return new_unique_jobs | |
| def save_results(self, jobs_to_save: List[Dict], existing_df: pd.DataFrame): | |
| """Combines new jobs with existing ones and saves to the Excel file.""" | |
| if not jobs_to_save: | |
| print("\nINFO: No new jobs to save.") | |
| # If the file exists, do nothing. If not, create empty? Or just log. | |
| if not os.path.exists(self.output_filename): | |
| print(f"INFO: No existing file '{self.output_filename}' either. Nothing saved.") | |
| return | |
| print(f"\nINFO: Preparing to save {len(jobs_to_save)} new jobs to '{self.output_filename}'...") | |
| try: | |
| new_df = pd.DataFrame.from_records(jobs_to_save) | |
| # Combine with existing data | |
| if existing_df is not None and not existing_df.empty: | |
| print(f"INFO: Combining with {len(existing_df)} existing jobs.") | |
| # Ensure columns and types | |
| if 'job_id' not in existing_df.columns: existing_df['job_id'] = None | |
| if 'job_id' not in new_df.columns: new_df['job_id'] = None # Should have been added | |
| existing_df['job_id'] = existing_df['job_id'].astype(str) | |
| new_df['job_id'] = new_df['job_id'].astype(str) | |
| # Align columns | |
| all_cols = list(existing_df.columns) + [col for col in new_df.columns if col not in existing_df.columns] | |
| existing_df = existing_df.reindex(columns=all_cols) | |
| new_df = new_df.reindex(columns=all_cols) | |
| combined_df = pd.concat([existing_df, new_df], ignore_index=True) | |
| # Final deduplication based on job_id | |
| combined_df = combined_df.drop_duplicates(subset=['job_id'], keep='last') | |
| print(f"INFO: Final DataFrame size after combining: {len(combined_df)} rows.") | |
| else: | |
| print("INFO: Saving only newly found jobs.") | |
| combined_df = new_df | |
| # Define desired final column order (using internal EN names) | |
| final_cols_order = [ | |
| 'job_id', 'job_title', 'company_name', 'location', | |
| 'date_posted', 'job_url', 'source' | |
| ] | |
| # Filter/Reorder columns present in the final DataFrame | |
| final_df_cols = [col for col in final_cols_order if col in combined_df.columns] | |
| combined_df = combined_df[final_df_cols] | |
| # Rename columns to Portuguese (optional) BEFORE saving | |
| combined_df = combined_df.rename(columns=JOBS_COLUMNS_MAPPER_ENG_PT) | |
| combined_df.to_excel(self.output_filename, index=False, engine='openpyxl') | |
| print(f"INFO: Data successfully saved/updated in '{self.output_filename}'.") | |
| except ImportError: | |
| print("\nERROR: `pandas` or `openpyxl` library not found. Install: pip install pandas openpyxl") | |
| except Exception as e: | |
| print(f"\nERROR: Failed to save data to Excel: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| # ============================================================================== | |
| # LinkedInPostScraperPlaywright Class (Only comments/prints translated) | |
| # ============================================================================== | |
| class LinkedInPostScraperPlaywright: | |
| """ | |
| Class to search for job posts on LinkedIn using Playwright, | |
| navigating to a filtered CONTENT search URL and scrolling the page. | |
| Detailed post data extraction is delegated. | |
| """ | |
| DEFAULT_TIMESPAN_POSTS = "past-week" # Values for datePosted: "past-week", "past-24h", "past-month" | |
| DEFAULT_SCROLLS_PER_QUERY = 5 # How many scrolls to attempt per search | |
| DEFAULT_ACTION_DELAY_S = 2 # Delay between actions like scrolling/waiting | |
| DEFAULT_CONNECT_TIMEOUT = 30000 # ms | |
| DEFAULT_NAVIGATION_TIMEOUT = 60000 # ms | |
| def __init__( | |
| self, | |
| search_queries: List[Dict[str, str]], # Dictionaries with 'keywords', optionally 'location' | |
| output_filename: str = "linkedin_post_jobs.xlsx", | |
| timespan: str = DEFAULT_TIMESPAN_POSTS, | |
| scrolls_per_query: int = DEFAULT_SCROLLS_PER_QUERY, | |
| action_delay_s: int = DEFAULT_ACTION_DELAY_S, | |
| debug_port: int = 9222 | |
| ): | |
| """Initializes the post scraper.""" | |
| self.search_queries = search_queries | |
| self.output_filename = output_filename | |
| # Filters are applied in the URL, no additional internal filters by default | |
| self.timespan = timespan | |
| self.scrolls_per_query = scrolls_per_query | |
| self.action_delay_s = action_delay_s | |
| self.debug_port = debug_port | |
| self.playwright: Optional[Playwright] = None | |
| self.browser: Optional[Browser] = None | |
| self.page: Optional[Page] = None | |
| print(f"INFO: LinkedInPostScraperPlaywright initialized to save in '{self.output_filename}'.") | |
| print(f"INFO: Will connect on port {self.debug_port}. Queries: {len(self.search_queries)}, Scrolls/Query: {self.scrolls_per_query}, Timespan: {self.timespan}") | |
| async def connect(self): | |
| """ | |
| Connects to the existing browser via CDP. Tries to reuse an existing | |
| LinkedIn tab or creates a new tab if none is found. | |
| """ | |
| # --- Case 1: Already connected and with a valid page --- | |
| if self.browser and self.browser.is_connected(): | |
| # Check if the current page is still valid and is LinkedIn | |
| if self.page and not self.page.is_closed() and 'linkedin.com' in self.page.url: | |
| print(f"INFO: Playwright already connected and reusing existing LinkedIn page: {self.page.url}") | |
| return True | |
| # If current page is invalid or not LinkedIn, try to find/create a new one | |
| else: | |
| print("INFO: Existing Playwright connection, but current page invalid or not LinkedIn. Searching/creating a new one...") | |
| # Logic to find/create page (see below) | |
| # --- Case 2: Trying to connect for the first time or reconnect --- | |
| print(f"INFO: Connecting to browser on port {self.debug_port} via Playwright...") | |
| try: | |
| # Start Playwright if not already started | |
| if not self.playwright: | |
| self.playwright = await async_playwright().start() | |
| # Connect to Browser via CDP | |
| endpoint_url = f"http://localhost:{self.debug_port}" | |
| self.browser = await self.playwright.chromium.connect_over_cdp(endpoint_url, timeout=self.DEFAULT_CONNECT_TIMEOUT) | |
| print("INFO: Connection with browser established.") | |
| # --- Logic to find or create the page --- | |
| found_linkedin_page = None | |
| # Check existing contexts | |
| if self.browser.contexts: | |
| # Iterate over all contexts and all pages | |
| for context in self.browser.contexts: | |
| for page in context.pages: | |
| # Check if URL contains 'linkedin.com' (safer than just 'linkedin') | |
| # and if the page is not closed | |
| if not page.is_closed() and 'linkedin.com' in page.url: | |
| print(f"INFO: Found existing LinkedIn tab: {page.url}") | |
| found_linkedin_page = page | |
| break # Use the first LinkedIn page found | |
| if found_linkedin_page: | |
| break # Stop searching other contexts if found | |
| else: | |
| print("WARN: No context found in browser. Creating a new one.") | |
| new_context = await self.browser.new_context() | |
| # Doesn't create page yet, lets logic below create if necessary | |
| # Assign the found page or create a new one | |
| if found_linkedin_page: | |
| self.page = found_linkedin_page | |
| else: | |
| print("INFO: No existing LinkedIn tab found. Creating a new tab...") | |
| # Ensure we have a context to create the page in | |
| if not self.browser.contexts: | |
| active_context = await self.browser.new_context() | |
| else: | |
| # Use the first context by default | |
| active_context = self.browser.contexts[0] | |
| self.page = await active_context.new_page() | |
| print(f"INFO: New tab created. Initial URL: {self.page.url}") | |
| # Optional: Navigate to LinkedIn if the new tab isn't useful | |
| # await self.page.goto("https://www.linkedin.com/feed/", wait_until='domcontentloaded') | |
| print(f"INFO: Playwright connected and ready to use page: {self.page.url}") | |
| self.page.set_default_timeout(self.DEFAULT_NAVIGATION_TIMEOUT) | |
| return True | |
| # --- Error Handling --- | |
| except PlaywrightTimeoutError: | |
| print(f"ERROR: Timeout trying to connect to browser on port {self.debug_port}. " | |
| f"Verify Chrome is running with --remote-debugging-port={self.debug_port}") | |
| await self.close() # Try to clean up Playwright | |
| return False | |
| except Exception as e: | |
| # Catch other connection or Playwright errors | |
| print(f"ERROR: Failed to connect or configure page via Playwright: {e}") | |
| import traceback | |
| traceback.print_exc() # Print stacktrace for debug | |
| await self.close() # Try to clean up Playwright | |
| return False | |
| async def close(self): | |
| """Closes the Playwright connection.""" | |
| # (Logic of close() remains the same as previous class) | |
| if self.browser and self.browser.is_connected(): | |
| print("INFO: Closing Playwright connection...") | |
| try: await self.browser.close() # Try to close context/connection | |
| except Exception as e: print(f"WARN: Error closing browser/context: {e}") | |
| self.browser = None | |
| if self.playwright: | |
| try: await self.playwright.stop() | |
| except Exception as e: print(f"WARN: Error stopping playwright: {e}") | |
| self.playwright = None | |
| print("INFO: Playwright connection closed.") | |
| # --- METHOD FOR YOU TO IMPLEMENT EXTRACTION --- | |
| async def _extract_post_data(self, post_locator: Locator) -> Optional[Dict[str, Any]]: | |
| """ | |
| **IMPLEMENT HERE:** Extracts relevant data from a single post Locator. | |
| Args: | |
| post_locator: The Playwright Locator object pointing to the main | |
| element of a post (e.g., a 'div' or 'article'). | |
| Returns: | |
| A dictionary with the extracted data (at least 'post_url') | |
| or None if the post is irrelevant or extraction fails. | |
| Example return: | |
| { | |
| 'post_url': 'https://www.linkedin.com/feed/update/urn:li:activity:...', | |
| 'poster_name': 'Person Name', | |
| 'poster_title': 'Person Title', | |
| 'date_posted_relative': '2d', | |
| 'post_text': 'Full text of the post...', | |
| 'direct_link': 'https://external.link/job' # If found | |
| # Add fields you manage to extract here: | |
| 'job_title_extracted': 'Developer .NET Pleno', | |
| 'company_extracted': 'Fictional Company Name', | |
| 'location_extracted': 'Remote', | |
| 'skills_extracted': ['C#', '.NET', 'SQL'] # Example | |
| } | |
| """ | |
| print("DEBUG: Calling _extract_post_data for a post...") # Debug log | |
| extracted_data = {'source': 'playwright-post'} # Default source | |
| try: | |
| # 1. Get the permanent Post URL (ESSENTIAL) | |
| # This usually involves finding a specific link within the post | |
| # or extracting an attribute like 'data-urn'. Inspect the HTML! | |
| # Example (VERY PROVISIONAL - NEEDS ADJUSTMENT): | |
| urn_attr = await post_locator.get_attribute('data-urn') | |
| if urn_attr and 'activity' in urn_attr: | |
| extracted_data['post_url'] = f"https://www.linkedin.com/feed/update/{urn_attr}" | |
| else: | |
| # Try another way? E.g., timestamp link? | |
| timestamp_link = post_locator.locator("a.feed-shared-timestamp") # Example selector | |
| if await timestamp_link.count() > 0: | |
| href = await timestamp_link.first.get_attribute('href') | |
| if href and 'feed/update/urn:li:activity' in href: | |
| extracted_data['post_url'] = f"https://www.linkedin.com{href.split('?')[0]}" # Clean params | |
| if not extracted_data.get('post_url'): | |
| print("WARN: Could not extract post_url. Ignoring post.") | |
| return None # Post URL is mandatory | |
| # 2. Extract other data (Examples - ADJUST SELECTORS!) | |
| poster_name_loc = post_locator.locator("span.update-components-actor__title > span:first-child > span > span:first-of-type ").first # Example | |
| extracted_data['poster_name'] = (await poster_name_loc.inner_text()).strip() if await poster_name_loc.count() else '' | |
| poster_title_loc = post_locator.locator("span.update-components-actor__description > span:first-of-type").first # Example | |
| extracted_data['poster_title'] = (await poster_title_loc.inner_text()).strip() if await poster_title_loc.count() else '' | |
| date_relative_loc = post_locator.locator("span.update-components-actor__sub-description > span:first-of-type").first # Example | |
| extracted_data['date_posted_relative'] = (await date_relative_loc.inner_text()).split(' • ')[0].strip() if await date_relative_loc.count() else '' | |
| # Expand text (Click "...more" or "see more") | |
| see_more_button = post_locator.locator('button.feed-shared-inline-show-more-text__see-more-less-toggle') # Example selector | |
| if await see_more_button.is_visible(): | |
| print("DEBUG: Clicking 'see more'...") | |
| await see_more_button.click() | |
| await asyncio.sleep(0.5) # Short pause to expand | |
| post_text_loc = post_locator.locator(".feed-shared-update-v2__description") # Example wrapper selector | |
| extracted_data['post_text'] = (await post_text_loc.inner_text()).strip() if await post_text_loc.count() else '' | |
| # --- YOUR DETAILED EXTRACTION LOGIC GOES HERE --- | |
| # Analyze 'post_text' or other elements to find: | |
| # - Job Title ('job_title_extracted') | |
| # - Company Name ('company_extracted') | |
| # - Location ('location_extracted') | |
| # - Direct Link ('direct_link') | |
| # - Skills, etc. | |
| # Fill extracted_data['...'] with what you find. | |
| # Simple example (for illustration only): | |
| # if extracted_data.get('post_text'): | |
| # if ".net pleno" in extracted_data['post_text'].lower(): | |
| # extracted_data['job_title_extracted'] = ".NET Pleno (Detected)" | |
| # if "empresa x" in extracted_data['post_text'].lower(): | |
| # extracted_data['company_extracted'] = "Company X" | |
| print(f"DEBUG: Extracted data (partial): { {k:v for k,v in extracted_data.items() if k != 'post_text'} }") # Log without large text | |
| return extracted_data | |
| except PlaywrightTimeoutError: | |
| print(f"PW WARN: Timeout while extracting data from a post.") | |
| return None | |
| except PlaywrightError as e: | |
| print(f"PW WARN: Playwright error while extracting data from a post: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"PW ERROR: Unexpected error parsing post with Playwright: {e}") | |
| return None | |
| # --- Public Async Methods --- | |
| async def fetch_raw_posts(self) -> List[Dict[str, Any]]: | |
| """Fetches posts from all queries, navigates, and scrolls the page.""" | |
| if not self.page or self.page.is_closed(): | |
| print("ERROR: Playwright page not available. Cannot fetch posts.") | |
| return [] | |
| print("\nINFO: --- Starting Post Fetching (Playwright) ---") | |
| all_extracted_posts = [] | |
| processed_post_urls_this_run = set() | |
| for query_idx, query in enumerate(self.search_queries): | |
| kw = query['keywords'] | |
| found_posts_total_query = 0 | |
| # Location is not a standard filter in Content/Posts search via URL | |
| # Can be included in keywords: e.g., ".net AND remote AND Brazil" | |
| print(f"\nPW INFO: Processing Query {query_idx+1}/{len(self.search_queries)}: Keywords='{kw}'") | |
| # Build CONTENT (posts) search URL | |
| params = { | |
| 'keywords': kw, | |
| 'origin': 'GLOBAL_SEARCH_HEADER', # Common origin | |
| # Apply filters directly in URL | |
| 'datePosted': f'"{self.timespan}"' # Format with quotes inside | |
| # Other possible filters: network=["F"] (1st connections), etc. | |
| } | |
| base_url = "https://www.linkedin.com/search/results/content/" | |
| search_url = f"{base_url}?{urlencode(params, quote_via=quote)}" | |
| try: | |
| print(f"PW INFO: Navigating to post search URL: {search_url}") | |
| await self.page.goto(search_url, wait_until='domcontentloaded') | |
| await asyncio.sleep(self.action_delay_s + 3) # Longer initial wait | |
| # --- Scroll and Extract Loop --- | |
| consecutive_scrolls_without_new = 0 | |
| for scroll_attempt in range(self.scrolls_per_query): | |
| print(f"PW INFO: Scroll attempt {scroll_attempt + 1}/{self.scrolls_per_query} for query '{kw}'...") | |
| # Locator for posts in the results feed | |
| # !!! CRITICAL SELECTOR - NEEDS UPDATE !!! | |
| # Inspect the post search results page when logged in | |
| post_card_selector = "div.feed-shared-update-v2" # Common example, could be article, etc. | |
| # post_card_selector = "div[data-urn*='urn:li:activity:']" # Attribute-based alternative | |
| card_locators = self.page.locator(post_card_selector) | |
| count = await card_locators.count() | |
| print(f"PW INFO: Found {count} potential post elements on screen.") | |
| if count == 0 and scroll_attempt > 0: # If nothing found after scrolling | |
| print("PW WARN: No post elements found after scroll.") | |
| consecutive_scrolls_without_new +=1 | |
| if consecutive_scrolls_without_new >= 2: break # Stop if 2 scrolls find NOTHING | |
| # Continue to next scroll anyway? Or break? Test. | |
| found_this_scroll = 0 | |
| current_posts_on_screen = await card_locators.all() # Get all current locators | |
| for card_locator in current_posts_on_screen: | |
| # Call the extraction method YOU will implement | |
| post_data = await self._extract_post_data(card_locator) | |
| # Validate and check duplicates BEFORE adding | |
| if post_data and post_data.get('post_url'): | |
| if post_data['post_url'] not in processed_post_urls_this_run: | |
| all_extracted_posts.append(post_data) | |
| processed_post_urls_this_run.add(post_data['post_url']) | |
| found_this_scroll += 1 | |
| # else: print(f"DEBUG: post_url {post_data['post_url']} already processed in this run.") # Verbose | |
| # else: print("DEBUG: _extract_post_data returned None or missing post_url.") # Verbose | |
| print(f"PW INFO: Extracted {found_this_scroll} new, valid posts in this scroll.") | |
| if found_this_scroll == 0 and count > 0: # Scrolled, had posts, but none were new/valid | |
| consecutive_scrolls_without_new += 1 | |
| else: | |
| consecutive_scrolls_without_new = 0 # Reset counter if new found | |
| found_posts_total_query += found_this_scroll | |
| # Stop if not finding new posts for a few scrolls | |
| if consecutive_scrolls_without_new >= 3: # Stop after 3 scrolls finding NOTHING new | |
| print("PW INFO: Stopping scroll for this query as no new posts were found in 3 attempts.") | |
| break | |
| # Scroll down the page | |
| print("PW INFO: Scrolling down...") | |
| await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)") | |
| await asyncio.sleep(self.action_delay_s + 2) # Wait for load | |
| except PlaywrightTimeoutError: print(f"PW ERROR: Timeout during navigation or interaction for query '{kw}'.") | |
| except PlaywrightError as e: print(f"PW ERROR: Playwright error for query '{kw}': {e}") | |
| except Exception as e: print(f"PW ERROR: Unexpected error during query '{kw}': {e}") | |
| print(f"PW INFO: Finished query '{kw}'. Found {found_posts_total_query} total new valid posts.") | |
| await asyncio.sleep(self.action_delay_s) # Small delay between queries | |
| print(f"\nPW INFO: Post fetching complete. Extracted {len(all_extracted_posts)} total posts.") | |
| return all_extracted_posts | |
| # Methods load_existing_data, deduplicate_and_prepare, and save_results | |
| # need to be adapted to use 'post_id' generated from extracted data | |
| # and the desired final column names. | |
| def load_existing_data(self) -> Tuple[pd.DataFrame, Set[str]]: | |
| """Loads data from the existing Excel file (focused on post_id).""" | |
| # (Logic similar to previous, but looks for 'ID Unico' or 'post_id') | |
| existing_ids = set() | |
| existing_df = pd.DataFrame() | |
| if os.path.exists(self.output_filename): | |
| print(f"INFO: Loading existing data from '{self.output_filename}'...") | |
| try: | |
| existing_df = pd.read_excel(self.output_filename) | |
| # Try reading with PT column name first, then EN as fallback | |
| id_col_name_pt = JOBS_COLUMNS_MAPPER_ENG_PT.get('post_id','ID Unico') # PT name | |
| id_col_name_en = 'post_id' | |
| id_col_to_use = None | |
| if id_col_name_pt in existing_df.columns: id_col_to_use = id_col_name_pt | |
| elif id_col_name_en in existing_df.columns: id_col_to_use = id_col_name_en | |
| if id_col_to_use: | |
| existing_ids = set(existing_df[id_col_to_use].dropna().astype(str).unique()) | |
| print(f"INFO: Found {len(existing_ids)} existing post IDs.") | |
| # Rename columns to EN internally if they were PT | |
| if id_col_to_use == id_col_name_pt: | |
| existing_df = existing_df.rename(columns=JOBS_COLUMNS_MAPPER_PT_ENG) | |
| else: | |
| print(f"WARN: ID column ('{id_col_name_pt}' or '{id_col_name_en}') not found.") | |
| except Exception as e: print(f"WARN: Could not load/parse '{self.output_filename}'. Error: {e}"); existing_df = pd.DataFrame() | |
| else: print(f"INFO: No existing file '{self.output_filename}' found.") | |
| return existing_df, existing_ids | |
| def deduplicate_and_prepare(self, fetched_posts: List[Dict], existing_ids: Set[str]) -> List[Dict]: | |
| """Generates hash (post_id) and removes duplicates against existing IDs.""" | |
| # Internal filters (title_exclude, etc.) DON'T apply directly here, | |
| # as we extracted general posts. Filtering is done in the initial query. | |
| # We could add post text filters here if desired. | |
| new_unique_posts = [] | |
| print(f"PW INFO: Deduplicating {len(fetched_posts)} extracted posts against {len(existing_ids)} existing IDs...") | |
| for post in fetched_posts: | |
| # Generate hash based on POST URL | |
| # Ensure post_url exists (should have been validated in _extract_post_data) | |
| if not post.get('post_url'): continue | |
| # Use post_url and maybe poster_name for the hash | |
| post_hash = generate_job_hash(identifier=post['post_url'], company=post.get('poster_name'), title=post.get('poster_title')) | |
| if post_hash not in existing_ids: | |
| post['post_id'] = post_hash # Add the generated ID | |
| new_unique_posts.append(post) | |
| existing_ids.add(post_hash) | |
| # else: print(f"DEBUG: Skipping duplicate post: {post.get('post_url')}") | |
| print(f"PW INFO: Found {len(new_unique_posts)} new unique posts after deduplication.") | |
| return new_unique_posts | |
| def save_results(self, posts_to_save: List[Dict], existing_df: pd.DataFrame): | |
| """Combines new posts with existing ones and saves to Excel.""" | |
| if not posts_to_save: | |
| print("\nINFO: No new posts to save.") | |
| if not os.path.exists(self.output_filename): print(f"INFO: No existing file '{self.output_filename}'.") | |
| return | |
| print(f"\nINFO: Preparing to save {len(posts_to_save)} new posts to '{self.output_filename}'...") | |
| try: | |
| new_df = pd.DataFrame.from_records(posts_to_save) | |
| # Combine with existing | |
| if existing_df is not None and not existing_df.empty: | |
| print(f"INFO: Combining with {len(existing_df)} existing posts.") | |
| # Ensure 'post_id' column and string type | |
| id_col_en = 'post_id' | |
| if id_col_en not in existing_df.columns: existing_df[id_col_en] = None | |
| if id_col_en not in new_df.columns: new_df[id_col_en] = None # Should have been added | |
| existing_df[id_col_en] = existing_df[id_col_en].astype(str) | |
| new_df[id_col_en] = new_df[id_col_en].astype(str) | |
| # Align columns (using internal EN names) | |
| all_cols_en = list(existing_df.columns) + [col for col in new_df.columns if col not in existing_df.columns] | |
| existing_df = existing_df.reindex(columns=all_cols_en) | |
| new_df = new_df.reindex(columns=all_cols_en) | |
| combined_df = pd.concat([existing_df, new_df], ignore_index=True) | |
| # Deduplicate by 'post_id' | |
| combined_df = combined_df.drop_duplicates(subset=[id_col_en], keep='last') | |
| print(f"INFO: Final DataFrame size after combining: {len(combined_df)} rows.") | |
| else: | |
| print("INFO: Saving only newly found posts.") | |
| combined_df = new_df | |
| # Define final desired column order (internal EN names) | |
| final_cols_order_en = [ | |
| 'post_id', 'post_url', 'poster_name', 'poster_title', | |
| 'date_posted_relative', 'post_text', 'direct_link', | |
| 'job_title_extracted', 'company_extracted', 'location_extracted', | |
| 'skills_extracted', # Add other extracted fields | |
| 'source' | |
| ] | |
| # Filter and reorder based on columns actually present | |
| final_df_cols_en = [col for col in final_cols_order_en if col in combined_df.columns] | |
| combined_df_en = combined_df[final_df_cols_en] | |
| # Rename columns to PT BEFORE saving | |
| combined_df_pt = combined_df_en.rename(columns=JOBS_COLUMNS_MAPPER_ENG_PT) # Use global mapper | |
| combined_df_pt.to_excel(self.output_filename, index=False, engine='openpyxl') | |
| print(f"INFO: Data successfully saved/updated in '{self.output_filename}'.") | |
| except ImportError: print("\nERROR: `pandas` or `openpyxl` library not found.") | |
| except Exception as e: print(f"\nERROR: Failed to save to Excel: {e}"); import traceback; traceback.print_exc() | |
| # ============================================================================== | |
| # Async Example Usage for the Post Scraper Class | |
| # ============================================================================== | |
| async def run_playwright_post_scraper(): | |
| """Async function to demonstrate usage of the Playwright Post Scraper class.""" | |
| print("Executing example with LinkedInPostScraperPlaywright...") | |
| # Dependencies check | |
| try: | |
| import pandas | |
| import openpyxl | |
| from playwright.async_api import async_playwright | |
| except ImportError as e: | |
| print(f"FATAL ERROR: Missing required library: {e.name}.") | |
| print("Please install dependencies: pip install pandas openpyxl python-dotenv playwright") | |
| print("And install browser binaries: playwright install chromium") | |
| exit() | |
| print("INFO: Preparing to run the POST scraper with Playwright...") | |
| print(f"INFO: Make sure Chrome is running with: --remote-debugging-port={9222}") # Hardcoded port for example | |
| print("INFO: And that you are LOGGED IN to LinkedIn in that Chrome instance.") | |
| input("Press Enter when Chrome is ready...") | |
| load_dotenv() # Load .env if present | |
| # 1. Define Configuration | |
| config_posts = { | |
| "search_queries": [ | |
| {"keywords": ".net AND pleno AND (remoto OR \"home office\")"}, | |
| # Add your POST search queries here | |
| ], | |
| "output_filename": "linkedin_job_posts_output.xlsx", # Output file name | |
| "timespan": "past-week", # URL date filter | |
| "scrolls_per_query": 4, # Try 4 scrolls | |
| "action_delay_s": 5, # Longer delay between scrolls/waits | |
| "debug_port": 9222 | |
| } | |
| # 2. Instantiate and Connect | |
| scraper_posts = LinkedInPostScraperPlaywright(**config_posts) | |
| connected = await scraper_posts.connect() | |
| if not connected: | |
| print("FATAL ERROR: Could not connect to browser via Playwright. Exiting.") | |
| return | |
| # 3. Execute Steps | |
| try: | |
| existing_df, existing_ids = scraper_posts.load_existing_data() | |
| # fetch_raw_posts is async | |
| raw_posts = await scraper_posts.fetch_raw_posts() | |
| # deduplicate and save are sync | |
| new_unique_posts = scraper_posts.deduplicate_and_prepare(raw_posts, existing_ids) | |
| scraper_posts.save_results(new_unique_posts, existing_df) | |
| except Exception as e: | |
| print(f"ERROR: An error occurred during the post scraper execution: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| # Ensure connection is closed | |
| await scraper_posts.close() | |
| print("\n--- Playwright Posts Example Finished ---") | |
| # ============================================================================== | |
| # Main Entry Point (if executed as script) - Now runs Playwright example | |
| # ============================================================================== | |
| # Keep the old scraper class if you want to switch between them | |
| def run_api_scraper(): | |
| TIMESPAN_PASTWEEK= "r604800" | |
| TIMESPAN_PAST24HOURS= "r86400" | |
| TIMESPAN_PASTMONTH= "r2592000" | |
| config = { | |
| "search_queries": [ | |
| {"keywords": ".net AND pleno", "location": "Brazil", "f_WT": ""}, # Remoto | |
| ], | |
| "output_filename": "linkedin_scraper_jobs.xlsx", | |
| "title_exclude": ["frontend", "front end", "manager", "lead"], | |
| "company_exclude": ["recruiter", "consulting xyz"], | |
| "timespan": TIMESPAN_PASTMONTH, # Past Week | |
| "pages_to_scrape": 10, # Search for 10 pages per query | |
| "request_delay": 5 | |
| } | |
| scraper = LinkedInScraper(**config) | |
| existing_df, existing_ids = scraper.load_existing_data() | |
| raw_jobs = scraper.fetch_raw_jobs() | |
| new_unique_jobs = scraper.deduplicate_and_prepare(raw_jobs, existing_ids) | |
| scraper.save_results(new_unique_jobs, existing_df) | |
| print("\n--- Script Example Finished ---") | |
| if __name__ == '__main__': | |
| print("Executing the main script...") | |
| # Both scrapers run sequentially by default; comment out either call to run only one | |
| run_api_scraper() # Run the requests-based guest API scraper | |
| # Then run the Playwright-based post scraper | |
| asyncio.run(run_playwright_post_scraper()) |
LinkedIn Job Scraper (API & Posts via Playwright)
This Python script provides two methods for scraping job listings from LinkedIn:
1. `LinkedInScraper` (API-based): Uses `requests` and `BeautifulSoup` to fetch job listings from an unofficial LinkedIn guest API endpoint (`/jobs-guest/jobs/api/seeMoreJobPostings/search`). This method does not require a LinkedIn login but relies on an unofficial API that might change or become unavailable. It primarily fetches structured job listing data.
2. `LinkedInPostScraperPlaywright` (Post-based): Uses `playwright` to connect to an existing, logged-in Chrome browser instance running with remote debugging enabled. It navigates to the LinkedIn post search results page (content feed), scrolls down to load more posts, and provides a framework (the `_extract_post_data` method) for you to implement the logic that extracts job details directly from user posts. This method is necessary for finding jobs advertised within posts rather than formal listings, and it requires an active LinkedIn session.

Both methods include filtering, deduplication against previously saved results in an Excel file, and saving of the new unique jobs found.
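For orientation, here is a minimal sketch of driving the API-based scraper end to end, mirroring the `run_api_scraper()` example at the bottom of the script. It assumes the gist is saved as `main.py`; the query values are placeholders only.

```python
# Minimal sketch of the API-based flow, mirroring run_api_scraper() in this file.
from main import LinkedInScraper  # assumes the gist was saved as main.py

scraper = LinkedInScraper(
    search_queries=[{"keywords": "python developer", "location": "Brazil", "f_WT": "2"}],  # "2" = Remote
    output_filename="linkedin_scraper_jobs.xlsx",
    timespan="r604800",      # past week
    pages_to_scrape=3,
)

existing_df, existing_ids = scraper.load_existing_data()    # read prior Excel results, if any
raw_jobs = scraper.fetch_raw_jobs()                         # hit the guest API page by page
new_jobs = scraper.deduplicate_and_prepare(raw_jobs, existing_ids)
scraper.save_results(new_jobs, existing_df)                 # append new unique rows to the Excel file
```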
Features
- Deduplication of results against previously saved entries, keyed by a generated `job_id` or `post_id` (see the hash example below).
- The `LinkedInPostScraperPlaywright` class has a clearly marked method (`_extract_post_data`) for you to customize how data is extracted from individual posts.
- Results are saved to an `.xlsx` file using pandas.
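The deduplication key is just a SHA-256 digest over the job URL plus company and title, produced by the module-level `generate_job_hash()` helper. A quick illustration with made-up values:

```python
import hashlib

# Same scheme as generate_job_hash() in the script: SHA-256 over "url-company-title", lowercased.
def job_hash(url, company=None, title=None):
    return hashlib.sha256(f"{url}-{company}-{title}".lower().encode()).hexdigest()

h1 = job_hash("https://www.linkedin.com/jobs/view/123456/", "Acme", "Backend Developer")
h2 = job_hash("https://www.linkedin.com/jobs/view/123456/", "Acme", "Backend Developer")
assert h1 == h2  # stable ID, so a re-run skips jobs already present in the Excel file
```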
Requirements

- `pandas`
- `openpyxl` (for Excel read/write)
- `requests` (for `LinkedInScraper`)
- `beautifulsoup4` (for `LinkedInScraper`)
- `python-dotenv` (optional, for loading environment variables)
- `playwright` (for `LinkedInPostScraperPlaywright`)
- `langdetect` (optional, imported by the original `LinkedInScraper` code but not active in the current version)
Installation

1. Save the code as a `main.py` script.
2. Install the dependencies listed above: `pip install pandas openpyxl requests beautifulsoup4 python-dotenv playwright`
3. Install the Playwright browser binaries: `playwright install chromium` (or install all browsers with `playwright install`)
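If you want to confirm the environment before a long run, the script's own dependency check inside `run_playwright_post_scraper()` boils down to something like this:

```python
# Quick sanity check that the required libraries are importable
# (the script performs the same check inside run_playwright_post_scraper()).
try:
    import pandas
    import openpyxl
    from playwright.async_api import async_playwright
except ImportError as e:
    raise SystemExit(
        f"Missing dependency: {e.name}. "
        "Run: pip install pandas openpyxl python-dotenv playwright && playwright install chromium"
    )
```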
Configuration

Modify the configuration variables directly within the `main.py` script, primarily in these sections:

Global Configuration
- `OUTPUT_FILENAME`: Change the base name if desired (separate files are used in the example `if __name__ == '__main__'` block).

LinkedInScraper (API-based) Configuration

Inside the `run_api_scraper()` function (within `if __name__ == '__main__':`):

- `config["search_queries"]`: A list of dictionaries. Each dict needs `'keywords'` and `'location'`. `'f_WT'` is optional (Job Type: `""` = Any, `"1"` = On-site, `"2"` = Remote, `"3"` = Hybrid).
- `config["output_filename"]`: Specific output file for this scraper.
- `config["title_exclude"]`, `config["company_exclude"]`, `config["title_include"]`: Lists of strings for filtering.
- `config["timespan"]`: Time filter for the API (e.g., `"r604800"` for Past Week, `"r86400"` for Past 24h, `"r2592000"` for Past Month).
- `config["pages_to_scrape"]`: How many pages (of ~25 results) to fetch per query.
- `config["request_delay"]`: Base delay (seconds) between requests.

LinkedInPostScraperPlaywright (Post-based) Configuration

Inside the `run_playwright_post_scraper()` function (within `if __name__ == '__main__':`):

- `config_posts["search_queries"]`: List of dictionaries, primarily using `'keywords'`. Location filtering is less effective here and is often included in the keywords (e.g., `"python developer remote brazil"`).
- `config_posts["output_filename"]`: Specific output file for this scraper.
- `config_posts["timespan"]`: Time filter applied to the URL (`"past-week"`, `"past-24h"`, `"past-month"`).
- `config_posts["scrolls_per_query"]`: How many times to scroll down the results page per query.
- `config_posts["action_delay_s"]`: Delay (seconds) between Playwright actions (scrolling, waiting).
- `config_posts["debug_port"]`: Must match the port Chrome is running with for remote debugging (default is 9222).
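Putting the options above together, a sketch of the two configuration dictionaries, based on the `run_api_scraper()` and `run_playwright_post_scraper()` examples in the script; all values are illustrative, not recommendations:

```python
# Illustrative settings only; adjust queries, filters, and filenames to your own search.
config_api = {
    "search_queries": [
        {"keywords": "python developer", "location": "Brazil", "f_WT": "2"},  # "2" = Remote
    ],
    "output_filename": "linkedin_scraper_jobs.xlsx",
    "title_exclude": ["manager", "lead"],
    "timespan": "r604800",        # past week
    "pages_to_scrape": 3,
    "request_delay": 5,
}

config_posts = {
    "search_queries": [{"keywords": 'python AND (remote OR "home office")'}],
    "output_filename": "linkedin_post_jobs.xlsx",
    "timespan": "past-week",
    "scrolls_per_query": 4,
    "action_delay_s": 5,
    "debug_port": 9222,           # must match Chrome's --remote-debugging-port
}
```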
Headers & Proxies (Optional)

- Edit `DEFAULT_HEADERS` inside the `LinkedInScraper` class, or pass custom `headers` during instantiation if needed.
- Pass a `proxies` dictionary if required.
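For example, custom headers and a proxy can be passed straight into the constructor; the proxy address below is a placeholder:

```python
# Passing custom headers and a placeholder proxy to the API-based scraper.
custom_headers = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}  # placeholder proxy

scraper = LinkedInScraper(
    search_queries=[{"keywords": "data engineer", "location": "Brazil"}],
    headers=custom_headers,
    proxies=proxies,
)
```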
Usage

Running the Scrapers
The script currently runs both scrapers sequentially when it is executed directly with `python main.py`.
It will first run `run_api_scraper()` and then `run_playwright_post_scraper()`. You can comment out one of the calls in the `if __name__ == '__main__':` block if you only want to run one of them, as shown in the sketch below.
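The relevant part of the `if __name__ == '__main__':` block looks roughly like this; comment out whichever call you don't need:

```python
if __name__ == '__main__':
    # Comment out either call to run only one of the scrapers.
    run_api_scraper()                              # requests-based guest API scraper
    asyncio.run(run_playwright_post_scraper())     # Playwright-based post scraper
```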
Running the Playwright Post Scraper (`LinkedInPostScraperPlaywright`) - IMPORTANT STEPS

1. Start Chrome with remote debugging enabled:
   - Linux: `google-chrome --remote-debugging-port=9222` (adjust path/command if needed)
   - macOS: `/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222`
   - Windows: `"C:\Program Files\Google\Chrome\Application\chrome.exe" --remote-debugging-port=9222` (adjust path if needed)
   The port must match the `debug_port` configuration in the script.
2. Log in to LinkedIn in that Chrome instance.
3. Run the script with `python main.py` and press Enter when prompted.
4. The scraper connects to the open browser, navigates to the post search, scrolls, and extracts data with your `_extract_post_data` method. Results will be deduplicated and saved to the configured Excel file.
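If the scraper fails at the connection step, you can verify that the debug port is reachable with a few lines of standalone Playwright code, using the same `connect_over_cdp` call the class relies on; the port value is an assumption matching the default configuration:

```python
import asyncio
from playwright.async_api import async_playwright

async def check_cdp(port: int = 9222) -> None:
    """Standalone check that Chrome's remote-debugging port accepts a CDP connection."""
    async with async_playwright() as p:
        browser = await p.chromium.connect_over_cdp(f"http://localhost:{port}")
        pages = [page.url for ctx in browser.contexts for page in ctx.pages]
        print("Connected. Open tabs:", pages or "(none)")
        await browser.close()

asyncio.run(check_cdp())
```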
Customizing Post Extraction (`_extract_post_data`)

The core of `LinkedInPostScraperPlaywright` requires your customization. You need to edit the `_extract_post_data` method within the class:

1. Inspect the HTML of the post cards on the logged-in post search results page (the `post_card_selector` in `fetch_raw_posts` needs checking too).
2. Find a reliable way to obtain the permanent post URL (e.g., from the `data-urn` attribute). This is mandatory.
3. Edit `_extract_post_data`: Replace the example selectors and extraction logic within the `_extract_post_data` method with the correct Playwright `locator()` calls and data extraction methods (`.inner_text()`, `.get_attribute()`, etc.) based on the selectors you found. Extract the desired fields into the `extracted_data` dictionary (see the sketch below for a starting point).
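As a starting point, here is a hedged sketch of the kind of logic that goes inside `_extract_post_data` once you have real selectors. It mirrors the structure already present in the class (URN lookup, description selector), but the selectors and the regex are placeholders you will need to replace after inspecting the live page:

```python
import re
from typing import Any, Dict, Optional
from playwright.async_api import Locator

# Placeholder selectors/regex for illustration; inspect the live page and adjust.
async def extract_post_fields(post_locator: Locator) -> Optional[Dict[str, Any]]:
    data: Dict[str, Any] = {"source": "playwright-post"}

    urn = await post_locator.get_attribute("data-urn")          # e.g. "urn:li:activity:123..."
    if not urn or "activity" not in urn:
        return None                                             # the post URL is mandatory
    data["post_url"] = f"https://www.linkedin.com/feed/update/{urn}"

    text_loc = post_locator.locator(".feed-shared-update-v2__description")  # placeholder selector
    data["post_text"] = (await text_loc.inner_text()).strip() if await text_loc.count() else ""

    # Very rough keyword-based extraction from the post body; replace with your own rules.
    match = re.search(r"(?i)\b(vaga|hiring|job)\b[:\-]?\s*(.+)", data["post_text"])
    if match:
        data["job_title_extracted"] = match.group(2).split("\n")[0][:120]
    return data
```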
Notes

- The `/jobs-guest/` API endpoint used by `LinkedInScraper` is unofficial and may stop working without notice.
- Heavy scraping can trigger rate limiting or account restrictions. Keep the delays generous (`request_delay`, `action_delay_s`) and consider using proxies if scraping heavily. Running the Playwright scraper requires a logged-in session, increasing the risk if automation is detected. Use responsibly and at your own risk.