Web scraper for LinkedIn
# ==============================================================================
# Necessary Imports
# ==============================================================================
import asyncio
import hashlib
import os
import time as tm # For scraper delays
import json # To load config if needed in the future
from typing import List, Optional, Dict, Any, Set, Tuple
from urllib.parse import quote, urlencode
import pandas as pd
import requests
from bs4 import BeautifulSoup
from langdetect import detect, LangDetectException # Optional
from dotenv import load_dotenv
from playwright.async_api import async_playwright, Playwright, Browser, Page, Locator, Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError
# YOU CAN CHANGE THIS MAPPING TO YOUR TARGET LANGUAGE
JOBS_COLUMNS_MAPPER_ENG_PT = {
'company_name':'Empresa',
'job_title': 'Titulo da Vaga',
'job_url': 'Link da Vaga',
'source': 'Fonte',
'location': 'Localizacao',
'date_posted': 'Data Postagem',
'job_id': 'ID Unico'
}
JOBS_COLUMNS_MAPPER_PT_ENG = {value: key for key, value in JOBS_COLUMNS_MAPPER_ENG_PT.items()}
def generate_job_hash(identifier: str, company: Optional[str] = None, title: Optional[str] = None) -> str:
"""Generates a SHA256 hash to uniquely identify a job (based on the job URL)."""
# Uses identifier (job_url) as base, adds others to reduce collisions (rare)
raw_id = f"{identifier}-{company}-{title}".lower()
return hashlib.sha256(raw_id.encode()).hexdigest()
# ==============================================================================
# LinkedInScraper Class
# ==============================================================================
class LinkedInScraper:
"""
Class to search, filter, and save job listings from LinkedIn
using an unofficial guest API.
"""
# Default values that can be overridden in __init__
DEFAULT_HEADERS = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
'Accept-Language': 'en-US,en;q=0.9,pt-BR;q=0.8,pt;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
}
DEFAULT_TIMESPAN = "r604800" # Past Week
DEFAULT_PAGES_TO_SCRAPE = 3
DEFAULT_REQUEST_RETRIES = 3
DEFAULT_REQUEST_DELAY = 4 # Base delay (seconds) between requests
def __init__(
self,
search_queries: List[Dict[str, str]],
output_filename: str = "linkedin_scraper_jobs.xlsx",
headers: Optional[Dict[str, str]] = None,
proxies: Optional[Dict[str, str]] = None,
title_exclude: Optional[List[str]] = None,
title_include: Optional[List[str]] = None,
company_exclude: Optional[List[str]] = None,
timespan: str = DEFAULT_TIMESPAN,
pages_to_scrape: int = DEFAULT_PAGES_TO_SCRAPE,
request_retries: int = DEFAULT_REQUEST_RETRIES,
request_delay: int = DEFAULT_REQUEST_DELAY
):
"""
Initializes the scraper with configurations.
Args:
search_queries: List of dictionaries, each with 'keywords' and 'location' (and optionally 'f_WT').
output_filename: Name of the Excel file to save/read data.
headers: HTTP Headers for the requests. Uses DEFAULT_HEADERS if None.
proxies: Proxies for the requests.
title_exclude: List of keywords to exclude jobs by title.
title_include: List of keywords that MUST be in the title.
company_exclude: List of companies to exclude.
timespan: Time filter (e.g., "r604800" for Past Week).
pages_to_scrape: How many pages of results to fetch per query.
request_retries: Number of attempts for each HTTP request.
request_delay: Base delay (in seconds) between requests/attempts.
"""
self.search_queries = search_queries
self.output_filename = output_filename
self.headers = headers if headers is not None else self.DEFAULT_HEADERS
self.proxies = proxies if proxies is not None else {}
self.title_exclude = title_exclude if title_exclude is not None else []
self.title_include = title_include if title_include is not None else []
self.company_exclude = company_exclude if company_exclude is not None else []
self.timespan = timespan
self.pages_to_scrape = pages_to_scrape
self.request_retries = request_retries
self.request_delay = request_delay
print(f"INFO: LinkedInScraper initialized to save in '{self.output_filename}'.")
print(f"INFO: Queries: {len(self.search_queries)}, Pages/Query: {self.pages_to_scrape}, Timespan: {self.timespan}")
# --- Private Methods (Internal Logic) ---
def _make_request(self, url: str) -> Optional[BeautifulSoup]:
"""Performs the GET request with retries and returns the BeautifulSoup object."""
print(f"SCRAPER: Requesting URL: {url}")
current_delay = self.request_delay
for i in range(self.request_retries):
try:
tm.sleep(current_delay + (i * 0.5)) # Delay before attempt
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=20) # Longer timeout
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
return soup
except requests.exceptions.Timeout:
print(f"SCRAPER WARN: Timeout for URL: {url}, retry {i+1}/{self.request_retries}...")
except requests.exceptions.RequestException as e:
status_code = e.response.status_code if e.response is not None else 'N/A'
print(f"SCRAPER ERROR: Request failed: Status {status_code}, URL: {url}, Error: {e}, retry {i+1}/{self.request_retries}...")
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 429:
print("SCRAPER WARN: Rate limited (429). Increasing delay.")
current_delay *= 1.5 # Increase delay if rate limited
except Exception as e:
print(f"SCRAPER ERROR: Unexpected error during request: {e}")
return None # Don't retry
print(f"SCRAPER ERROR: Failed to retrieve URL after {self.request_retries} retries: {url}")
return None
def _parse_job_card(self, card: Any) -> Optional[Dict[str, Any]]:
"""Extracts data from a single job card (HTML element)."""
# (Logic of _parse_job_card remains the same as transform_api_job_card,
# but now it's a class method. It could access self if needed, but doesn't here)
try:
job_div = card.find('div', class_='base-search-card')
if not job_div:
job_div = card.find('div', class_='job-search-card')
if not job_div: return None
title_tag = job_div.find(['h3', 'span'], class_='base-search-card__title') # Try h3 or span
company_tag = job_div.find(['h4', 'a'], class_='base-search-card__subtitle') # Try h4 or a
location_tag = job_div.find('span', class_='job-search-card__location')
date_tag = job_div.find('time', class_='job-search-card__listdate') or job_div.find('time', class_='job-search-card__listdate--new')
link_tag = job_div.find('a', class_='base-card__full-link') # Main card link
job_url = None
job_posting_id = None # Initialize
if link_tag and 'href' in link_tag.attrs:
url_raw = link_tag['href']
if '/jobs/view/' in url_raw:
job_url = url_raw.split('?')[0] # Get base job URL
try: job_posting_id = job_url.split('/jobs/view/')[1].split('/')[0]
except: pass
if not job_url: # Try getting URL via data-entity-urn if link failed
entity_urn = job_div.get('data-entity-urn') or card.get('data-entity-urn') # Try on div or li
if entity_urn and 'jobPosting' in entity_urn:
try:
job_posting_id = entity_urn.split(':')[-1]
job_url = f'https://www.linkedin.com/jobs/view/{job_posting_id}/'
except: pass
if not title_tag or not job_url: return None # Title and Job URL are required
# Text cleanup
title = title_tag.text.strip()
company_raw = company_tag.find('a') if company_tag and company_tag.find('a') else company_tag
company = company_raw.text.strip().replace('\n', ' ') if company_raw else None
location = location_tag.text.strip() if location_tag else None
date = date_tag['datetime'] if date_tag and 'datetime' in date_tag.attrs else None
# Return dictionary with consistent key names
return {
'job_title': title,
'company_name': company,
'location': location,
'date_posted': date,
'job_url': job_url, # JOB URL
'source': 'scraper'
}
except Exception as e:
# print(f"SCRAPER WARN: Error parsing job card: {e}") # Can be too verbose
return None
def _filter_jobs(self, joblist: List[Dict]) -> List[Dict]:
"""Filters the list of jobs based on instance criteria."""
if not joblist: return []
original_count = len(joblist)
print(f"SCRAPER: Filtering {original_count} raw jobs...")
# Jobs should already have the correct keys ('job_title', 'company_name') from _parse_job_card
if self.title_exclude:
joblist = [job for job in joblist if not any(word.lower() in (job.get('job_title') or '').lower() for word in self.title_exclude)]
if self.title_include:
joblist = [job for job in joblist if any(word.lower() in (job.get('job_title') or '').lower() for word in self.title_include)]
if self.company_exclude:
joblist = [job for job in joblist if not any(word.lower() in (job.get('company_name') or '').lower() for word in self.company_exclude)]
print(f"SCRAPER: Filtering complete. {len(joblist)} jobs remaining from {original_count}.")
return joblist
# --- Public Methods (Interface for User/Notebook) ---
def fetch_raw_jobs(self) -> List[Dict[str, Any]]:
"""Fetches jobs from all configured queries and pages."""
print("\nINFO: --- Starting Job Fetching ---")
all_raw_jobs = []
processed_urls_this_run = set()
for query in self.search_queries:
kw = query['keywords']
loc = query['location']
f_wt = query.get('f_WT', '') # Default to any type
print(f"\nSCRAPER: Processing Query: Keywords='{kw}', Location='{loc}', Type='{f_wt}'")
kw_encoded = quote(kw)
loc_encoded = quote(loc)
for i in range(self.pages_to_scrape):
start_index = 25 * i
url = (
f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search"
f"?keywords={kw_encoded}"
f"&location={loc_encoded}"
f"&f_WT={f_wt}"
f"&f_TPR={self.timespan}"
f"&start={start_index}"
)
soup = self._make_request(url) # Use internal request method
if soup:
# Extract cards based on the returned HTML
# The API returns HTML, so we use find_all
job_cards = soup.find_all('li') # Try 'li' first
if not job_cards:
job_cards = soup.find_all('div', class_='job-search-card') # Fallback
if not job_cards:
print(f"SCRAPER WARN: No job card elements (li or div.job-search-card) found on page {i} for query '{kw}'. Content might have changed.")
# Not necessarily stopping the query, could be an empty page at the end
continue # Try next page
found_on_page = 0
for card in job_cards:
job_data = self._parse_job_card(card) # Use internal parse method
if job_data and job_data.get('job_url') and job_data['job_url'] not in processed_urls_this_run:
all_raw_jobs.append(job_data)
processed_urls_this_run.add(job_data['job_url'])
found_on_page += 1
print(f"SCRAPER: Found {found_on_page} new job(s) on page {i}.")
else:
print(f"SCRAPER WARN: Failed to fetch or parse page {i} for query '{kw}'. Stopping this query.")
break # Stop current query if a page fails completely
print(f"\nSCRAPER: Fetching complete. Found {len(all_raw_jobs)} raw job listings total.")
return all_raw_jobs
def load_existing_data(self) -> Tuple[pd.DataFrame, Set[str]]:
"""Loads data from the existing Excel file and returns DataFrame and set of IDs."""
existing_ids = set()
existing_df = pd.DataFrame()
if os.path.exists(self.output_filename):
print(f"INFO: Loading existing data from '{self.output_filename}'...")
try:
existing_df = pd.read_excel(self.output_filename)
# Rename PT columns to EN internally for consistent processing
existing_df = existing_df.rename(columns=JOBS_COLUMNS_MAPPER_PT_ENG)
if 'job_id' in existing_df.columns:
# Handle potential NaNs before converting to string
existing_ids = set(existing_df['job_id'].dropna().astype(str).unique())
print(f"INFO: Found {len(existing_ids)} existing job IDs.")
else:
print(f"WARN: 'job_id' column not found in '{self.output_filename}'.")
except Exception as e:
print(f"WARN: Could not load/parse '{self.output_filename}'. Error: {e}")
existing_df = pd.DataFrame() # Reset on error
else:
print(f"INFO: No existing file '{self.output_filename}' found.")
return existing_df, existing_ids
def deduplicate_and_prepare(self, fetched_jobs: List[Dict], existing_ids: Set[str]) -> List[Dict]:
"""Filters jobs, generates hash, and removes duplicates against existing IDs."""
filtered_jobs = self._filter_jobs(fetched_jobs) # Use internal filter method
new_unique_jobs = []
print(f"SCRAPER: Deduplicating {len(filtered_jobs)} filtered jobs against {len(existing_ids)} existing IDs...")
for job in filtered_jobs:
# Uses the module-level generate_job_hash helper defined above the class
job_hash = generate_job_hash(identifier=job['job_url'], company=job.get('company_name'), title=job.get('job_title'))
if job_hash not in existing_ids:
job['job_id'] = job_hash # Add hash to the dictionary
new_unique_jobs.append(job)
existing_ids.add(job_hash) # Also add to the set to avoid internal duplicates
# else: # Optional: Log found duplicate
# print(f" SCRAPER: Skipping duplicate job: {job.get('job_title','N/A')}")
print(f"SCRAPER: Found {len(new_unique_jobs)} new unique jobs after deduplication.")
return new_unique_jobs
def save_results(self, jobs_to_save: List[Dict], existing_df: pd.DataFrame):
"""Combines new jobs with existing ones and saves to the Excel file."""
if not jobs_to_save:
print("\nINFO: No new jobs to save.")
# If the file exists, do nothing. If not, create empty? Or just log.
if not os.path.exists(self.output_filename):
print(f"INFO: No existing file '{self.output_filename}' either. Nothing saved.")
return
print(f"\nINFO: Preparing to save {len(jobs_to_save)} new jobs to '{self.output_filename}'...")
try:
new_df = pd.DataFrame.from_records(jobs_to_save)
# Combine with existing data
if existing_df is not None and not existing_df.empty:
print(f"INFO: Combining with {len(existing_df)} existing jobs.")
# Ensure columns and types
if 'job_id' not in existing_df.columns: existing_df['job_id'] = None
if 'job_id' not in new_df.columns: new_df['job_id'] = None # Should have been added
existing_df['job_id'] = existing_df['job_id'].astype(str)
new_df['job_id'] = new_df['job_id'].astype(str)
# Align columns
all_cols = list(existing_df.columns) + [col for col in new_df.columns if col not in existing_df.columns]
existing_df = existing_df.reindex(columns=all_cols)
new_df = new_df.reindex(columns=all_cols)
combined_df = pd.concat([existing_df, new_df], ignore_index=True)
# Final deduplication based on job_id
combined_df = combined_df.drop_duplicates(subset=['job_id'], keep='last')
print(f"INFO: Final DataFrame size after combining: {len(combined_df)} rows.")
else:
print("INFO: Saving only newly found jobs.")
combined_df = new_df
# Define desired final column order (using internal EN names)
final_cols_order = [
'job_id', 'job_title', 'company_name', 'location',
'date_posted', 'job_url', 'source'
]
# Filter/Reorder columns present in the final DataFrame
final_df_cols = [col for col in final_cols_order if col in combined_df.columns]
combined_df = combined_df[final_df_cols]
# Rename columns to Portuguese (optional) BEFORE saving
combined_df = combined_df.rename(columns=JOBS_COLUMNS_MAPPER_ENG_PT)
combined_df.to_excel(self.output_filename, index=False, engine='openpyxl')
print(f"INFO: Data successfully saved/updated in '{self.output_filename}'.")
except ImportError:
print("\nERROR: `pandas` or `openpyxl` library not found. Install: pip install pandas openpyxl")
except Exception as e:
print(f"\nERROR: Failed to save data to Excel: {e}")
import traceback
traceback.print_exc()
# ==============================================================================
# LinkedInPostScraperPlaywright Class
# ==============================================================================
class LinkedInPostScraperPlaywright:
"""
Class to search for job posts on LinkedIn using Playwright,
navigating to a filtered CONTENT search URL and scrolling the page.
Detailed post data extraction is delegated.
"""
DEFAULT_TIMESPAN_POSTS = "past-week" # Values for datePosted: "past-week", "past-24h", "past-month"
DEFAULT_SCROLLS_PER_QUERY = 5 # How many scrolls to attempt per search
DEFAULT_ACTION_DELAY_S = 2 # Delay between actions like scrolling/waiting
DEFAULT_CONNECT_TIMEOUT = 30000 # ms
DEFAULT_NAVIGATION_TIMEOUT = 60000 # ms
def __init__(
self,
search_queries: List[Dict[str, str]], # Dictionaries with 'keywords', optionally 'location'
output_filename: str = "linkedin_post_jobs.xlsx",
timespan: str = DEFAULT_TIMESPAN_POSTS,
scrolls_per_query: int = DEFAULT_SCROLLS_PER_QUERY,
action_delay_s: int = DEFAULT_ACTION_DELAY_S,
debug_port: int = 9222
):
"""Initializes the post scraper."""
self.search_queries = search_queries
self.output_filename = output_filename
# Filters are applied in the URL, no additional internal filters by default
self.timespan = timespan
self.scrolls_per_query = scrolls_per_query
self.action_delay_s = action_delay_s
self.debug_port = debug_port
self.playwright: Optional[Playwright] = None
self.browser: Optional[Browser] = None
self.page: Optional[Page] = None
print(f"INFO: LinkedInPostScraperPlaywright initialized to save in '{self.output_filename}'.")
print(f"INFO: Will connect on port {self.debug_port}. Queries: {len(self.search_queries)}, Scrolls/Query: {self.scrolls_per_query}, Timespan: {self.timespan}")
async def connect(self):
"""
Connects to the existing browser via CDP. Tries to reuse an existing
LinkedIn tab or creates a new tab if none is found.
"""
# --- Case 1: Already connected and with a valid page ---
if self.browser and self.browser.is_connected():
# Check if the current page is still valid and is LinkedIn
if self.page and not self.page.is_closed() and 'linkedin.com' in self.page.url:
print(f"INFO: Playwright already connected and reusing existing LinkedIn page: {self.page.url}")
return True
# If current page is invalid or not LinkedIn, try to find/create a new one
else:
print("INFO: Existing Playwright connection, but current page invalid or not LinkedIn. Searching/creating a new one...")
# Logic to find/create page (see below)
# --- Case 2: Trying to connect for the first time or reconnect ---
print(f"INFO: Connecting to browser on port {self.debug_port} via Playwright...")
try:
# Start Playwright if not already started
if not self.playwright:
self.playwright = await async_playwright().start()
# Connect to Browser via CDP
endpoint_url = f"http://localhost:{self.debug_port}"
self.browser = await self.playwright.chromium.connect_over_cdp(endpoint_url, timeout=self.DEFAULT_CONNECT_TIMEOUT)
print("INFO: Connection with browser established.")
# --- Logic to find or create the page ---
found_linkedin_page = None
# Check existing contexts
if self.browser.contexts:
# Iterate over all contexts and all pages
for context in self.browser.contexts:
for page in context.pages:
# Check if URL contains 'linkedin.com' (safer than just 'linkedin')
# and if the page is not closed
if not page.is_closed() and 'linkedin.com' in page.url:
print(f"INFO: Found existing LinkedIn tab: {page.url}")
found_linkedin_page = page
break # Use the first LinkedIn page found
if found_linkedin_page:
break # Stop searching other contexts if found
else:
print("WARN: No context found in browser. Creating a new one.")
new_context = await self.browser.new_context()
# Doesn't create page yet, lets logic below create if necessary
# Assign the found page or create a new one
if found_linkedin_page:
self.page = found_linkedin_page
else:
print("INFO: No existing LinkedIn tab found. Creating a new tab...")
# Ensure we have a context to create the page in
if not self.browser.contexts:
active_context = await self.browser.new_context()
else:
# Use the first context by default
active_context = self.browser.contexts[0]
self.page = await active_context.new_page()
print(f"INFO: New tab created. Initial URL: {self.page.url}")
# Optional: Navigate to LinkedIn if the new tab isn't useful
# await self.page.goto("https://www.linkedin.com/feed/", wait_until='domcontentloaded')
print(f"INFO: Playwright connected and ready to use page: {self.page.url}")
self.page.set_default_timeout(self.DEFAULT_NAVIGATION_TIMEOUT)
return True
# --- Error Handling ---
except PlaywrightTimeoutError:
print(f"ERROR: Timeout trying to connect to browser on port {self.debug_port}. "
f"Verify Chrome is running with --remote-debugging-port={self.debug_port}")
await self.close() # Try to clean up Playwright
return False
except Exception as e:
# Catch other connection or Playwright errors
print(f"ERROR: Failed to connect or configure page via Playwright: {e}")
import traceback
traceback.print_exc() # Print stacktrace for debug
await self.close() # Try to clean up Playwright
return False
async def close(self):
"""Closes the Playwright connection."""
# (Logic of close() remains the same as previous class)
if self.browser and self.browser.is_connected():
print("INFO: Closing Playwright connection...")
try: await self.browser.close() # Try to close context/connection
except Exception as e: print(f"WARN: Error closing browser/context: {e}")
self.browser = None
if self.playwright:
try: await self.playwright.stop()
except Exception as e: print(f"WARN: Error stopping playwright: {e}")
self.playwright = None
print("INFO: Playwright connection closed.")
# --- METHOD FOR YOU TO IMPLEMENT EXTRACTION ---
async def _extract_post_data(self, post_locator: Locator) -> Optional[Dict[str, Any]]:
"""
**IMPLEMENT HERE:** Extracts relevant data from a single post Locator.
Args:
post_locator: The Playwright Locator object pointing to the main
element of a post (e.g., a 'div' or 'article').
Returns:
A dictionary with the extracted data (at least 'post_url')
or None if the post is irrelevant or extraction fails.
Example return:
{
'post_url': 'https://www.linkedin.com/feed/update/urn:li:activity:...',
'poster_name': 'Person Name',
'poster_title': 'Person Title',
'date_posted_relative': '2d',
'post_text': 'Full text of the post...',
'direct_link': 'https://external.link/job' # If found
# Add fields you manage to extract here:
'job_title_extracted': 'Developer .NET Pleno',
'company_extracted': 'Fictional Company Name',
'location_extracted': 'Remote',
'skills_extracted': ['C#', '.NET', 'SQL'] # Example
}
"""
print("DEBUG: Calling _extract_post_data for a post...") # Debug log
extracted_data = {'source': 'playwright-post'} # Default source
try:
# 1. Get the permanent Post URL (ESSENTIAL)
# This usually involves finding a specific link within the post
# or extracting an attribute like 'data-urn'. Inspect the HTML!
# Example (VERY PROVISIONAL - NEEDS ADJUSTMENT):
urn_attr = await post_locator.get_attribute('data-urn')
if urn_attr and 'activity' in urn_attr:
extracted_data['post_url'] = f"https://www.linkedin.com/feed/update/{urn_attr}"
else:
# Try another way? E.g., timestamp link?
timestamp_link = post_locator.locator("a.feed-shared-timestamp") # Example selector
if await timestamp_link.count() > 0:
href = await timestamp_link.first.get_attribute('href')
if href and 'feed/update/urn:li:activity' in href:
extracted_data['post_url'] = f"https://www.linkedin.com{href.split('?')[0]}" # Clean params
if not extracted_data.get('post_url'):
print("WARN: Could not extract post_url. Ignoring post.")
return None # Post URL is mandatory
# 2. Extract other data (Examples - ADJUST SELECTORS!)
poster_name_loc = post_locator.locator("span.update-components-actor__title > span:first-child > span > span:first-of-type ").first # Example
extracted_data['poster_name'] = (await poster_name_loc.inner_text()).strip() if await poster_name_loc.count() else ''
poster_title_loc = post_locator.locator("span.update-components-actor__description > span:first-of-type").first # Example
extracted_data['poster_title'] = (await poster_title_loc.inner_text()).strip() if await poster_title_loc.count() else ''
date_relative_loc = post_locator.locator("span.update-components-actor__sub-description > span:first-of-type").first # Example
extracted_data['date_posted_relative'] = (await date_relative_loc.inner_text()).split(' • ')[0].strip() if await date_relative_loc.count() else ''
# Expand text (Click "...more" or "see more")
see_more_button = post_locator.locator('button.feed-shared-inline-show-more-text__see-more-less-toggle') # Example selector
if await see_more_button.is_visible():
print("DEBUG: Clicking 'see more'...")
await see_more_button.click()
await asyncio.sleep(0.5) # Short pause to expand
post_text_loc = post_locator.locator(".feed-shared-update-v2__description") # Example wrapper selector
extracted_data['post_text'] = (await post_text_loc.inner_text()).strip() if await post_text_loc.count() else ''
# --- YOUR DETAILED EXTRACTION LOGIC GOES HERE ---
# Analyze 'post_text' or other elements to find:
# - Job Title ('job_title_extracted')
# - Company Name ('company_extracted')
# - Location ('location_extracted')
# - Direct Link ('direct_link')
# - Skills, etc.
# Fill extracted_data['...'] with what you find.
# Simple example (for illustration only):
# if extracted_data.get('post_text'):
# if ".net pleno" in extracted_data['post_text'].lower():
# extracted_data['job_title_extracted'] = ".NET Pleno (Detected)"
# if "empresa x" in extracted_data['post_text'].lower():
# extracted_data['company_extracted'] = "Company X"
print(f"DEBUG: Extracted data (partial): { {k:v for k,v in extracted_data.items() if k != 'post_text'} }") # Log without large text
return extracted_data
except PlaywrightTimeoutError:
print(f"PW WARN: Timeout while extracting data from a post.")
return None
except PlaywrightError as e:
print(f"PW WARN: Playwright error while extracting data from a post: {e}")
return None
except Exception as e:
print(f"PW ERROR: Unexpected error parsing post with Playwright: {e}")
return None
# --- Public Async Methods ---
async def fetch_raw_posts(self) -> List[Dict[str, Any]]:
"""Fetches posts from all queries, navigates, and scrolls the page."""
if not self.page or self.page.is_closed():
print("ERROR: Playwright page not available. Cannot fetch posts.")
return []
print("\nINFO: --- Starting Post Fetching (Playwright) ---")
all_extracted_posts = []
processed_post_urls_this_run = set()
for query_idx, query in enumerate(self.search_queries):
kw = query['keywords']
found_posts_total_query = 0
# Location is not a standard filter in Content/Posts search via URL
# Can be included in keywords: e.g., ".net AND remote AND Brazil"
print(f"\nPW INFO: Processing Query {query_idx+1}/{len(self.search_queries)}: Keywords='{kw}'")
# Build CONTENT (posts) search URL
params = {
'keywords': kw,
'origin': 'GLOBAL_SEARCH_HEADER', # Common origin
# Apply filters directly in URL
'datePosted': f'"{self.timespan}"' # Format with quotes inside
# Other possible filters: network=["F"] (1st connections), etc.
}
base_url = "https://www.linkedin.com/search/results/content/"
search_url = f"{base_url}?{urlencode(params, quote_via=quote)}"
try:
print(f"PW INFO: Navigating to post search URL: {search_url}")
await self.page.goto(search_url, wait_until='domcontentloaded')
await asyncio.sleep(self.action_delay_s + 3) # Longer initial wait
# --- Scroll and Extract Loop ---
consecutive_scrolls_without_new = 0
for scroll_attempt in range(self.scrolls_per_query):
print(f"PW INFO: Scroll attempt {scroll_attempt + 1}/{self.scrolls_per_query} for query '{kw}'...")
# Locator for posts in the results feed
# !!! CRITICAL SELECTOR - NEEDS UPDATE !!!
# Inspect the post search results page when logged in
post_card_selector = "div.feed-shared-update-v2" # Common example, could be article, etc.
# post_card_selector = "div[data-urn*='urn:li:activity:']" # Attribute-based alternative
card_locators = self.page.locator(post_card_selector)
count = await card_locators.count()
print(f"PW INFO: Found {count} potential post elements on screen.")
if count == 0 and scroll_attempt > 0: # If nothing found after scrolling
print("PW WARN: No post elements found after scroll.")
consecutive_scrolls_without_new +=1
if consecutive_scrolls_without_new >= 2: break # Stop if 2 scrolls find NOTHING
# Continue to next scroll anyway? Or break? Test.
found_this_scroll = 0
current_posts_on_screen = await card_locators.all() # Get all current locators
for card_locator in current_posts_on_screen:
# Call the extraction method YOU will implement
post_data = await self._extract_post_data(card_locator)
# Validate and check duplicates BEFORE adding
if post_data and post_data.get('post_url'):
if post_data['post_url'] not in processed_post_urls_this_run:
all_extracted_posts.append(post_data)
processed_post_urls_this_run.add(post_data['post_url'])
found_this_scroll += 1
# else: print(f"DEBUG: post_url {post_data['post_url']} already processed in this run.") # Verbose
# else: print("DEBUG: _extract_post_data returned None or missing post_url.") # Verbose
print(f"PW INFO: Extracted {found_this_scroll} new, valid posts in this scroll.")
if found_this_scroll == 0 and count > 0: # Scrolled, had posts, but none were new/valid
consecutive_scrolls_without_new += 1
else:
consecutive_scrolls_without_new = 0 # Reset counter if new found
found_posts_total_query += found_this_scroll
# Stop if not finding new posts for a few scrolls
if consecutive_scrolls_without_new >= 3: # Stop after 3 scrolls finding NOTHING new
print("PW INFO: Stopping scroll for this query as no new posts were found in 3 attempts.")
break
# Scroll down the page
print("PW INFO: Scrolling down...")
await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(self.action_delay_s + 2) # Wait for load
except PlaywrightTimeoutError: print(f"PW ERROR: Timeout during navigation or interaction for query '{kw}'.")
except PlaywrightError as e: print(f"PW ERROR: Playwright error for query '{kw}': {e}")
except Exception as e: print(f"PW ERROR: Unexpected error during query '{kw}': {e}")
print(f"PW INFO: Finished query '{kw}'. Found {found_posts_total_query} total new valid posts.")
await asyncio.sleep(self.action_delay_s) # Small delay between queries
print(f"\nPW INFO: Post fetching complete. Extracted {len(all_extracted_posts)} total posts.")
return all_extracted_posts
# Methods load_existing_data, deduplicate_and_prepare, and save_results
# need to be adapted to use 'post_id' generated from extracted data
# and the desired final column names.
def load_existing_data(self) -> Tuple[pd.DataFrame, Set[str]]:
"""Loads data from the existing Excel file (focused on post_id)."""
# (Logic similar to previous, but looks for 'ID Unico' or 'post_id')
existing_ids = set()
existing_df = pd.DataFrame()
if os.path.exists(self.output_filename):
print(f"INFO: Loading existing data from '{self.output_filename}'...")
try:
existing_df = pd.read_excel(self.output_filename)
# Try reading with PT column name first, then EN as fallback
id_col_name_pt = JOBS_COLUMNS_MAPPER_ENG_PT.get('post_id','ID Unico') # PT name
id_col_name_en = 'post_id'
id_col_to_use = None
if id_col_name_pt in existing_df.columns: id_col_to_use = id_col_name_pt
elif id_col_name_en in existing_df.columns: id_col_to_use = id_col_name_en
if id_col_to_use:
existing_ids = set(existing_df[id_col_to_use].dropna().astype(str).unique())
print(f"INFO: Found {len(existing_ids)} existing post IDs.")
# Rename columns to EN internally if they were PT
if id_col_to_use == id_col_name_pt:
existing_df = existing_df.rename(columns=JOBS_COLUMNS_MAPPER_PT_ENG)
else:
print(f"WARN: ID column ('{id_col_name_pt}' or '{id_col_name_en}') not found.")
except Exception as e: print(f"WARN: Could not load/parse '{self.output_filename}'. Error: {e}"); existing_df = pd.DataFrame()
else: print(f"INFO: No existing file '{self.output_filename}' found.")
return existing_df, existing_ids
def deduplicate_and_prepare(self, fetched_posts: List[Dict], existing_ids: Set[str]) -> List[Dict]:
"""Generates hash (post_id) and removes duplicates against existing IDs."""
# Internal filters (title_exclude, etc.) DON'T apply directly here,
# as we extracted general posts. Filtering is done in the initial query.
# We could add post text filters here if desired.
new_unique_posts = []
print(f"PW INFO: Deduplicating {len(fetched_posts)} extracted posts against {len(existing_ids)} existing IDs...")
for post in fetched_posts:
# Generate hash based on POST URL
# Ensure post_url exists (should have been validated in _extract_post_data)
if not post.get('post_url'): continue
# Use post_url and maybe poster_name for the hash
post_hash = generate_job_hash(identifier=post['post_url'], company=post.get('poster_name'), title=post.get('poster_title'))
if post_hash not in existing_ids:
post['post_id'] = post_hash # Add the generated ID
new_unique_posts.append(post)
existing_ids.add(post_hash)
# else: print(f"DEBUG: Skipping duplicate post: {post.get('post_url')}")
print(f"PW INFO: Found {len(new_unique_posts)} new unique posts after deduplication.")
return new_unique_posts
def save_results(self, posts_to_save: List[Dict], existing_df: pd.DataFrame):
"""Combines new posts with existing ones and saves to Excel."""
if not posts_to_save:
print("\nINFO: No new posts to save.")
if not os.path.exists(self.output_filename): print(f"INFO: No existing file '{self.output_filename}'.")
return
print(f"\nINFO: Preparing to save {len(posts_to_save)} new posts to '{self.output_filename}'...")
try:
new_df = pd.DataFrame.from_records(posts_to_save)
# Combine with existing
if existing_df is not None and not existing_df.empty:
print(f"INFO: Combining with {len(existing_df)} existing posts.")
# Ensure 'post_id' column and string type
id_col_en = 'post_id'
if id_col_en not in existing_df.columns: existing_df[id_col_en] = None
if id_col_en not in new_df.columns: new_df[id_col_en] = None # Should have been added
existing_df[id_col_en] = existing_df[id_col_en].astype(str)
new_df[id_col_en] = new_df[id_col_en].astype(str)
# Align columns (using internal EN names)
all_cols_en = list(existing_df.columns) + [col for col in new_df.columns if col not in existing_df.columns]
existing_df = existing_df.reindex(columns=all_cols_en)
new_df = new_df.reindex(columns=all_cols_en)
combined_df = pd.concat([existing_df, new_df], ignore_index=True)
# Deduplicate by 'post_id'
combined_df = combined_df.drop_duplicates(subset=[id_col_en], keep='last')
print(f"INFO: Final DataFrame size after combining: {len(combined_df)} rows.")
else:
print("INFO: Saving only newly found posts.")
combined_df = new_df
# Define final desired column order (internal EN names)
final_cols_order_en = [
'post_id', 'post_url', 'poster_name', 'poster_title',
'date_posted_relative', 'post_text', 'direct_link',
'job_title_extracted', 'company_extracted', 'location_extracted',
'skills_extracted', # Add other extracted fields
'source'
]
# Filter and reorder based on columns actually present
final_df_cols_en = [col for col in final_cols_order_en if col in combined_df.columns]
combined_df_en = combined_df[final_df_cols_en]
# Rename columns to PT BEFORE saving
combined_df_pt = combined_df_en.rename(columns=JOBS_COLUMNS_MAPPER_ENG_PT) # Use global mapper
combined_df_pt.to_excel(self.output_filename, index=False, engine='openpyxl')
print(f"INFO: Data successfully saved/updated in '{self.output_filename}'.")
except ImportError: print("\nERROR: `pandas` or `openpyxl` library not found.")
except Exception as e: print(f"\nERROR: Failed to save to Excel: {e}"); import traceback; traceback.print_exc()
# ==============================================================================
# Async Example Usage for the Post Scraper Class
# ==============================================================================
async def run_playwright_post_scraper():
"""Async function to demonstrate usage of the Playwright Post Scraper class."""
print("Executing example with LinkedInPostScraperPlaywright...")
# Dependencies check
try:
import pandas
import openpyxl
from playwright.async_api import async_playwright
except ImportError as e:
print(f"FATAL ERROR: Missing required library: {e.name}.")
print("Please install dependencies: pip install pandas openpyxl python-dotenv playwright")
print("And install browser binaries: playwright install chromium")
exit()
print("INFO: Preparing to run the POST scraper with Playwright...")
print(f"INFO: Make sure Chrome is running with: --remote-debugging-port={9222}") # Hardcoded port for example
print("INFO: And that you are LOGGED IN to LinkedIn in that Chrome instance.")
input("Press Enter when Chrome is ready...")
load_dotenv() # Load .env if present
# 1. Define Configuration
config_posts = {
"search_queries": [
{"keywords": ".net AND pleno AND (remoto OR \"home office\")"},
# Add your POST search queries here
],
"output_filename": "linkedin_job_posts_output.xlsx", # Output file name
"timespan": "past-week", # URL date filter
"scrolls_per_query": 4, # Try 4 scrolls
"action_delay_s": 5, # Longer delay between scrolls/waits
"debug_port": 9222
}
# 2. Instantiate and Connect
scraper_posts = LinkedInPostScraperPlaywright(**config_posts)
connected = await scraper_posts.connect()
if not connected:
print("FATAL ERROR: Could not connect to browser via Playwright. Exiting.")
return
# 3. Execute Steps
try:
existing_df, existing_ids = scraper_posts.load_existing_data()
# fetch_raw_posts is async
raw_posts = await scraper_posts.fetch_raw_posts()
# deduplicate and save are sync
new_unique_posts = scraper_posts.deduplicate_and_prepare(raw_posts, existing_ids)
scraper_posts.save_results(new_unique_posts, existing_df)
except Exception as e:
print(f"ERROR: An error occurred during the post scraper execution: {e}")
import traceback
traceback.print_exc()
finally:
# Ensure connection is closed
await scraper_posts.close()
print("\n--- Playwright Posts Example Finished ---")
# ==============================================================================
# Main Entry Point (if executed as script) - Now runs Playwright example
# ==============================================================================
# Keep the old scraper class if you want to switch between them
def run_api_scraper():
TIMESPAN_PASTWEEK= "r604800"
TIMESPAN_PAST24HOURS= "r86400"
TIMESPAN_PASTMONTH= "r2592000"
config = {
"search_queries": [
{"keywords": ".net AND pleno", "location": "Brazil", "f_WT": ""}, # Remoto
],
"output_filename": "linkedin_scraper_jobs.xlsx",
"title_exclude": ["frontend", "front end", "manager", "lead"],
"company_exclude": ["recruiter", "consulting xyz"],
"timespan": TIMESPAN_PASTMONTH, # Past Week
"pages_to_scrape": 10, # Search for 10 pages per query
"request_delay": 5
}
scraper = LinkedInScraper(**config)
existing_df, existing_ids = scraper.load_existing_data()
raw_jobs = scraper.fetch_raw_jobs()
new_unique_jobs = scraper.deduplicate_and_prepare(raw_jobs, existing_ids)
scraper.save_results(new_unique_jobs, existing_df)
print("\n--- Script Example Finished ---")
if __name__ == '__main__':
print("Executing the main script...")
# Both scrapers run sequentially by default; comment out either call to run only one
run_api_scraper() # Requests-based job-listing scraper
asyncio.run(run_playwright_post_scraper()) # Playwright-based post scraper
LinkedIn Job Scraper (API & Posts via Playwright)

This Python script provides two methods for scraping job listings from LinkedIn:

  1. LinkedInScraper (API-based): Uses requests and BeautifulSoup to fetch job listings from an unofficial LinkedIn guest API endpoint (/jobs-guest/jobs/api/seeMoreJobPostings/search). This method does not require a LinkedIn login but relies on an unofficial API that might change or become unavailable. It primarily fetches structured job listing data.
  2. LinkedInPostScraperPlaywright (Post-based): Uses playwright to connect to an existing, logged-in Chrome browser instance running with remote debugging enabled. It navigates to the LinkedIn post search results page (content feed), scrolls down to load more posts, and provides a framework (_extract_post_data method) for you to implement the logic to extract job details directly from user posts. This method is necessary for finding jobs advertised within posts rather than formal listings and requires an active LinkedIn session.

Both methods include filtering, deduplication against previously saved results in an Excel file, and saving the new unique jobs found.
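As a rough sketch of how the Playwright-based method attaches to your browser, the snippet below connects to a Chrome instance started with --remote-debugging-port=9222 and lists the URLs of its open tabs. It mirrors what LinkedInPostScraperPlaywright.connect() does internally; the function name and the port are only illustrative defaults.

import asyncio
from playwright.async_api import async_playwright

async def peek_at_running_chrome(debug_port: int = 9222):
    # Attach to an already-running Chrome started with --remote-debugging-port=<debug_port>
    pw = await async_playwright().start()
    browser = await pw.chromium.connect_over_cdp(f"http://localhost:{debug_port}")
    try:
        for context in browser.contexts:
            for page in context.pages:
                print("Open tab:", page.url)  # look for a linkedin.com tab here
    finally:
        await browser.close()
        await pw.stop()

# asyncio.run(peek_at_running_chrome())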

Features

  • Dual Scraping Methods: Choose between scraping formal job listings (no login needed, potentially less reliable API) or job posts (requires login via Playwright, better for hidden opportunities).
  • Configuration: Easily configure search queries, location, job type filters, time spans, exclusion keywords (title, company), and output file names.
  • Deduplication: Avoids adding the same job multiple times by checking against previously saved results in the output Excel file (using a generated hash job_id or post_id); a short example follows this list.
  • Playwright Integration: Connects to an existing Chrome instance for scraping posts, leveraging your logged-in session.
  • Extensible Post Extraction: The LinkedInPostScraperPlaywright class has a clearly marked method (_extract_post_data) for you to customize how data is extracted from individual posts.
  • Excel Output: Saves results to a .xlsx file using pandas.
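For the deduplication step, both classes key each record with the module-level generate_job_hash() helper defined near the top of the script and skip anything whose hash is already in the ID set loaded from the Excel file. A minimal illustration with made-up values:

existing_ids = set()  # normally loaded from the spreadsheet by load_existing_data()
job = {
    "job_url": "https://www.linkedin.com/jobs/view/123456/",  # illustrative values
    "company_name": "Acme",
    "job_title": ".NET Developer",
}
job_id = generate_job_hash(identifier=job["job_url"],
                           company=job.get("company_name"),
                           title=job.get("job_title"))
if job_id not in existing_ids:
    job["job_id"] = job_id
    existing_ids.add(job_id)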

Requirements

  • Python 3.8+
  • Google Chrome (or another Chromium-based browser if you adapt the Playwright connection)
  • Required Python packages (install via pip):
    • pandas
    • openpyxl (for Excel read/write)
    • requests (for LinkedInScraper)
    • beautifulsoup4 (for LinkedInScraper)
    • python-dotenv (optional, for loading environment variables)
    • playwright (for LinkedInPostScraperPlaywright)
    • langdetect (optional, used by original LinkedInScraper code but not active in current version)

Installation

  1. Clone or Download: Get the main.py script.
  2. Install Python Packages:
    pip install pandas openpyxl requests beautifulsoup4 python-dotenv playwright langdetect
  3. Install Playwright Browsers: Download the necessary browser binaries for Playwright (Chromium is used by default for connecting via CDP):
    playwright install chromium
    # Or install all: playwright install

Configuration

Modify the configuration variables directly within the main.py script, primarily in these sections:

  1. Output Filenames:

    • Each scraper's example config (in the if __name__ == '__main__': block) sets its own output_filename, so the two scrapers write to separate Excel files; change these names as desired.
  2. LinkedInScraper (API-based) Configuration: Inside the run_api_scraper() function (within if __name__ == '__main__':):

    • config["search_queries"]: A list of dictionaries. Each dict needs 'keywords' and 'location'. 'f_WT' is optional (Job Type: ""=Any, "1"=On-site, "2"=Remote, "3"=Hybrid).
    • config["output_filename"]: Specific output file for this scraper.
    • config["title_exclude"], config["company_exclude"], config["title_include"]: Lists of strings for filtering.
    • config["timespan"]: Time filter for the API (e.g., "r604800" for Past Week, "r86400" for Past 24h, "r2592000" for Past Month).
    • config["pages_to_scrape"]: How many pages (of ~25 results) to fetch per query.
    • config["request_delay"]: Base delay (seconds) between requests.
  3. LinkedInPostScraperPlaywright (Post-based) Configuration: Inside the run_playwright_post_scraper() function (within if __name__ == '__main__':):

    • config_posts["search_queries"]: List of dictionaries, primarily using 'keywords'. Location filtering is less effective here and often included in the keywords (e.g., "python developer remote brazil").
    • config_posts["output_filename"]: Specific output file for this scraper.
    • config_posts["timespan"]: Time filter applied to the URL ("past-week", "past-24h", "past-month").
    • config_posts["scrolls_per_query"]: How many times to scroll down the results page per query.
    • config_posts["action_delay_s"]: Delay (seconds) between Playwright actions (scrolling, waiting).
    • config_posts["debug_port"]: Must match the port Chrome is running with for remote debugging (default is 9222).
  4. Headers & Proxies (Optional):

    • Modify DEFAULT_HEADERS inside the LinkedInScraper class or pass custom headers during instantiation if needed.
    • Add proxy details to the proxies dictionary if required.
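Putting the options above together, a configuration for each scraper might look like the following; the values are illustrative, but the keys match the __init__ parameters of the two classes in this gist:

# LinkedInScraper (API-based), illustrative values
config = {
    "search_queries": [{"keywords": "python developer", "location": "Brazil", "f_WT": "2"}],  # "2" = Remote
    "output_filename": "linkedin_scraper_jobs.xlsx",
    "title_exclude": ["manager", "lead"],
    "timespan": "r604800",       # 604800 seconds = Past Week
    "pages_to_scrape": 5,        # ~25 results per page
    "request_delay": 5,
}

# LinkedInPostScraperPlaywright (Post-based), illustrative values
config_posts = {
    "search_queries": [{"keywords": "python developer remote brazil"}],
    "output_filename": "linkedin_job_posts_output.xlsx",
    "timespan": "past-week",
    "scrolls_per_query": 4,
    "action_delay_s": 5,
    "debug_port": 9222,          # must match --remote-debugging-port
}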

Usage

Running the Scrapers

The script currently runs both scrapers sequentially when executed directly:

python main.py

It will first run run_api_scraper() and then run_playwright_post_scraper(). You can comment out one of the calls in the if __name__ == '__main__': block if you only want to run one of them.
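For example, to run only the Playwright post scraper, the if __name__ == '__main__': block at the end of the script can be trimmed to something like:

if __name__ == '__main__':
    # run_api_scraper()  # requests-based listing scraper disabled for this run
    asyncio.run(run_playwright_post_scraper())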

Running the Playwright Post Scraper (LinkedInPostScraperPlaywright) - IMPORTANT STEPS:

  1. Start Chrome with Remote Debugging: Before running the script, you must start Chrome (or your target Chromium browser) with the remote debugging port enabled. Open your terminal and run:
    • Linux: google-chrome --remote-debugging-port=9222 (Adjust path/command if needed)
    • macOS: /Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222
    • Windows: "C:\Program Files\Google\Chrome\Application\chrome.exe" --remote-debugging-port=9222 (Adjust path if needed)
    • Leave this terminal window open while the script runs. The port (9222) must match the debug_port configuration in the script; a quick way to verify that the port is reachable is sketched after this list.
  2. Log In to LinkedIn: In the Chrome window that just opened, manually log in to your LinkedIn account. The scraper needs an active session to access post search results correctly.
  3. Run the Python Script: Execute python main.py.
  4. Press Enter: The script will prompt you to press Enter once Chrome is running and you are logged in.
  5. Observe: Playwright will connect to the running Chrome instance, navigate to the post search URLs based on your queries, scroll down the page, and attempt to extract data using the _extract_post_data method. Results will be deduplicated and saved to the configured Excel file.
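If the connection in step 5 fails, a quick sanity check (assuming the default port 9222) is to query Chrome's DevTools HTTP endpoint before running the scraper; it should return a small JSON payload with the browser version when remote debugging is active:

import requests

# Prints Chrome's version info if remote debugging is listening on port 9222
resp = requests.get("http://localhost:9222/json/version", timeout=5)
print(resp.json())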

Customizing Post Extraction (_extract_post_data)

The core of the LinkedInPostScraperPlaywright requires your customization. You need to edit the _extract_post_data method within the class:

  1. Inspect LinkedIn: Open LinkedIn in your normal browser, perform a post search similar to your queries, and use your browser's Developer Tools (F12) to inspect the HTML structure of the posts in the results feed.
  2. Identify Selectors: Find reliable CSS Selectors or XPath expressions for:
    • The main container element of a single post (post_card_selector in fetch_raw_posts needs checking too).
    • The element containing the permanent URL of the post (often linked from the timestamp or hidden in a data-urn attribute). This is mandatory.
    • Elements for the poster's name, title, relative date posted.
    • The main text content area of the post.
    • The "see more" button if posts are truncated.
    • Any elements where job-specific details (title, company, location, direct links) might appear within the post text or structure.
  3. Update _extract_post_data: Replace the example selectors and extraction logic within the _extract_post_data method with the correct Playwright locator() calls and data extraction methods (.inner_text(), .get_attribute(), etc.) based on the selectors you found. Extract the desired fields into the extracted_data dictionary; a small illustrative sketch of this step follows the list.
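As one possible starting point for step 3, the sketch below pulls a rough job title and a direct link out of the already-extracted post_text with plain regular expressions. The patterns and field names (job_title_extracted, direct_link) are only examples; real posts vary widely, so treat it as a placeholder for your own logic:

import re

def enrich_from_post_text(extracted_data: dict) -> dict:
    """Illustrative post-text parsing; adjust the patterns to the posts you actually see."""
    text = extracted_data.get('post_text') or ''
    # Naive title guess: the first line mentioning "developer" (case-insensitive)
    title_match = re.search(r'^.*\bdeveloper\b.*$', text, re.IGNORECASE | re.MULTILINE)
    if title_match:
        extracted_data['job_title_extracted'] = title_match.group(0).strip()
    # First external link in the post body, if any
    link_match = re.search(r'https?://\S+', text)
    if link_match:
        extracted_data['direct_link'] = link_match.group(0).rstrip('.,)')
    return extracted_data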

Notes

  • LinkedIn UI Changes: LinkedIn frequently updates its website structure. The CSS selectors used for scraping (especially in the Playwright scraper) will likely break over time and require updates.
  • API Reliability: The /jobs-guest/ API endpoint used by LinkedInScraper is unofficial and may stop working without notice.
  • Rate Limiting/Blocking: Scraping LinkedIn, especially frequently or aggressively, can lead to IP blocks or account restrictions. Use reasonable delays (request_delay, action_delay_s) and consider using proxies if scraping heavily. Running the Playwright scraper requires a logged-in session, increasing the risk if automation is detected. Use responsibly and at your own risk.
  • Error Handling: Basic error handling is included, but robust scraping often requires more sophisticated handling of different HTTP errors, CAPTCHAs (unlikely with CDP connection but possible), and unexpected page structures.
