# Scrapy Spider for NHTSA Violation Data
# import re
# import os
# import string
# import scrapy
# import pandas as pd
# from datetime import datetime
# from [Link] import execute
#
#
# # Spider class for scraping violation data from Good Jobs First (OFAC-related
entries)
# class Act115Spider([Link]):
# name = "NHTSA"
#
# # Preset cookies required for session management and access
# cookies = {
# '_fbp': 'fb.1.1747201269861.315460754820040010',
# '_gid': 'GA1.2.38545643.1747749932',
# '_ga_Q4G4E8KT5J': 'GS2.1.s1747827357$o2$g1$t1747827638$j0$l0$h0',
# 'PHPSESSID': '454b72dc00e908524384cdb37fddb976',
# '_gat_UA-21812781-2': '1',
# '_ga_9VW1HCFL7C': 'GS2.1.s1747997430$o42$g1$t1747999957$j0$l0$h0',
# '_ga': 'GA1.1.1470230669.1747201268',
# }
#
# headers = {
# 'accept':
'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/
webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
# 'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,hi;q=0.7',
# 'cache-control': 'no-cache',
# 'pragma': 'no-cache',
# 'priority': 'u=0, i',
# 'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136",
"Not.A/Brand";v="99"',
# 'sec-ch-ua-mobile': '?0',
# 'sec-ch-ua-platform': '"Windows"',
# 'sec-fetch-dest': 'document',
# 'sec-fetch-mode': 'navigate',
# 'sec-fetch-site': 'none',
# 'sec-fetch-user': '?1',
# 'upgrade-insecure-requests': '1',
# 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/[Link] Safari/537.36',
# # 'cookie': '_fbp=fb.1.1747201269861.315460754820040010;
_gid=GA1.2.38545643.1747749932;
_ga_Q4G4E8KT5J=GS2.1.s1747827357$o2$g1$t1747827638$j0$l0$h0;
PHPSESSID=454b72dc00e908524384cdb37fddb976; _gat_UA-21812781-2=1;
_ga_9VW1HCFL7C=GS2.1.s1747997430$o42$g1$t1747999957$j0$l0$h0;
_ga=GA1.1.1470230669.1747201268',
# }
# custom_settings = {
# "DOWNLOAD_HANDLERS": {
# "http": "scrapy_impersonate.ImpersonateDownloadHandler",
# "https": "scrapy_impersonate.ImpersonateDownloadHandler",
# },
# "TWISTED_REACTOR":
"[Link]",
# }
#
# def __init__(self):
# # Store all parsed data in a list for final export
# [Link] = []
# self.raw_data=[]
# self.data_cleaned = []
# self.data_uncleaned = []
# [Link] = [Link]([Link](), "exports") # or specify your own
path
#
# # Create the directory if it doesn't exist
# [Link]([Link], exist_ok=True)
# [Link] = "[Link]
company_op=starts&company=&offense_group=&agency_code=NHTSA"
# def start_requests(self):
# browsers = [
# "chrome110",
# "edge99",
# "safari15_5"
# ]
# meta = {}
# meta['impersonate'] = [Link](browsers)
# meta['source_url'] = [Link]
#
# # Send initial request with custom headers and cookies
# yield [Link](url=[Link],meta=meta, headers=[Link],
cookies=[Link], callback=self.parse_listing)
#
# def parse_listing(self, response):
# company_links =
[Link]('//table[2]/tbody/tr/td[1]/a/@href').getall()
# source_url = [Link]('source_url')
#
# for link in company_links:
# browsers = [
# "chrome110",
# "edge99",
# "safari15_5"
# ]
# meta = {}
# meta['impersonate'] = [Link](browsers)
# meta['source_url']=source_url
#
# yield [Link](link,meta=meta, headers=[Link],
cookies=[Link], callback=self.parse_details)
# # break # Use during testing to limit pagination
#
# # Handle pagination by identifying total pages and iterating through all
# next_page = [Link]('//a[contains(text(),">>")]/@href').get()
# if next_page:
# total_page = int(next_page.split('page=')[-1])
# for page in range(0, total_page+1):
# paginated_url = f"{[Link]}&page={page}"
# browsers = [
# "chrome110",
# "edge99",
# "safari15_5"
# ]
# meta = {}
# meta['impersonate'] = [Link](browsers)
# meta['source_url']=paginated_url
#
# yield [Link](url=paginated_url,meta=meta,
headers=[Link], cookies=[Link], callback=self.parse_listing)
#
# def parse_details(self, response):
# # Known countries list
# self.known_countries = ['india ', 'usa','us', 'united states', 'uk',
'canada', 'australia', 'bahamas',
# 'singapore', 'germany', 'france', 'hong kong',
'north america', 'america']
# source_url = [Link]('source_url')
#
# # Extract all key-value details from the company profile page
# item = dict()
# raw_item = dict() # For storing completely raw data
#
# item["PDP URL"] = [Link]
# raw_item["PDP URL"] = [Link]
#
# for path in [Link]('//div[@id="contentResult"]//b'):
# item['source_url']=source_url
#
# data_skip = False
# key = [Link]('./text()').get(default='NA').strip()
# raw_value = \
# ''.join([Link]('./following-sibling::text() | ./following-
sibling::a//text()').getall()).split(
# '\n:')[0].replace(':\xa0', '').strip()
# value = raw_value # Initialize cleaned value with raw value
#
# # Store raw value first
# raw_item[key] = raw_value
#
# # Format date fields into YYYY-MM-DD format (only for cleaned data)
# if 'Date' in key:
# try:
# date_obj = [Link](value, "%B %d, %Y")
# value = date_obj.strftime("%Y-%m-%d")
# except Exception:
# pass
# # Remove "Note:" prefix and clean whitespace if key is "Note"
# if key == 'Notes':
# value = [Link](r'^Note:\s*', '', value).strip()
#
# # Special handling for Company field
# if key == 'Company':
# # First handle dba/d/b/a cases - extract alias and remove from
main name
# dba_pattern = r'(.*?)\s+(dba|d/b/a|doing\s+business\s+as)\s+(.+)'
# dba_match = [Link](dba_pattern, value, flags=[Link])
#
# aliases = []
# if dba_match:
# # Extract main name and alias
# main_name = dba_match.group(1).strip()
# dba_alias = dba_match.group(3).strip()
#
# # Simple cleaning - just remove trailing punctuation
# dba_alias = [Link](r'[.,;]+$', '', dba_alias).strip()
#
# # Add to aliases list and update value
# if dba_alias:
# [Link](dba_alias)
# value = main_name # Use just the main name part
#
# # Check if we should split on "and"
# split_keywords = [
# 'inc', 'inc ', 'incorporated', 'corp', 'corp ', 'Inc.',
'corporation',
# 'llc', 'ltd', 'limited', 'lp', 'llp', 'plc', 'communities',
'center',
# 'co', 'company', 'group'
# ]
#
# parts = [Link](r'\s+(and)\s+', value, flags=[Link])
# should_split = False
#
# if len(parts) > 1:
# left_part = parts[0].strip()
# right_part = parts[-1].strip()
#
# # Check if both parts end with company suffixes
# left_has_suffix = any([Link](rf'\b{kw}\.?$', left_part,
[Link]) for kw in split_keywords)
# right_has_suffix = any(
# [Link](rf'\b{kw}\.?$', right_part, [Link]) for
kw in split_keywords)
#
# should_split = left_has_suffix and right_has_suffix
#
# if should_split:
# # Handle the split case
# company_names = [parts[0].strip(), parts[2].strip()]
#
# # Create new items for each company
# items = []
# for name in company_names:
# new_item = [Link]()
# new_item['Company'] = self.clean_text_punct(name)
#
# # Only keep aliases for the first item if they exist
# if name == company_names[0] and aliases:
# for idx, alias_val in enumerate(aliases[:2],
start=1):
# new_item[f'Alias {idx}'] = alias_val
#
# [Link](new_item)
#
# return items
#
# # If we shouldn't split, proceed with normal processing
# # Now process parentheses content
# parens = [Link](r'\(([^()]+)\)', value)
# country = None
#
# # Check inside () for country or special cases
# for val in parens:
# cleaned_val = [Link]().lower()
# # Remove phrases like "now known as", "formerly", etc.
# cleaned_val = [Link](r'^(now known as|formerly known as|also
known as)\s+', '', cleaned_val,
# flags=[Link])
# if 'us' in cleaned_val:
# print("found")
#
# if cleaned_val in self.known_countries:
# # Preserve country in parentheses
# value = [Link](r'\(' + [Link](val) + r'\)', f'
({val})', value)
# country = cleaned_val
# else:
# if f'({val})' in value:
# [Link](cleaned_val)
#
# # Remove () content only if not country
# for val in parens:
# if [Link]() not in self.known_countries:
# value = [Link](r'\(' + [Link](val) + r'\)', ' ',
value)
#
# # Clean up company name
# company_name = self.clean_text_punct([Link]())
# item[key] = company_name
#
# # Assign aliases (simple assignment without splitting)
# for idx, alias_val in enumerate(aliases[:2], start=1):
# item[f'Alias {idx}'] = alias_val
#
# continue
# # Handle 'and' in aliases
# # split_aliases = [Link](r'\s+and\s+', alias_val,
flags=[Link])
# # for i, part in enumerate(split_aliases, start=idx):
# # part = [Link]()
# # if part:
# # part = [Link](r'^and\s*', '', part,
flags=[Link])
# # item[f'Alias {i}'] = part
# # idx += len(split_aliases) - 1
#
# # continue
#
# # Handle Current Parent Company (similar to Company but without
alias/country extraction)
# if key == 'Current Parent Company':
# # Clean up punctuation (keeping - ' & /)
# text_no_punct = [Link]([Link]('', '',
[Link]('-', '')
# .replace("'", '')
# .replace('&', '')
# .replace('/', '')))
# value = [Link](r'\s+', ' ', text_no_punct).strip()
# item[key + " name"] = value
# item[key + " url"] =
[Link]('./following-sibling::a/@href').get(default='')
# raw_item[key + " name"] = raw_value
# raw_item[key + " url"] =
[Link]('./following-sibling::a/@href').get(default='')
# continue
#
# # Handle special keys (Mega-Scandal, Source of Data)
# if 'Mega-Scandal' in key or 'Source of Data' in key:
# if 'Mega-Scandal' in key:
# item[key + " name"] = value
# item[key + " url"] =
[Link]('./following-sibling::a/@href').get(default='')
# raw_item[key + " name"] = raw_value
# raw_item[key + " url"] =
[Link]('./following-sibling::a/@href').get(default='')
# else:
# value =
[Link]('./following-sibling::a/@href').get(default='')
# raw_item[key] = value
# item[key] = value
# data_skip = True
#
# # Handle link-based values (e.g., "click here")
# if not data_skip:
# if "(click here)" in [Link]():
# value =
[Link]('./following-sibling::a/@href').get(default='')
# raw_item[key] = value
# item[key] = value # All other fields keep original punctuation
#
# # Additional handling for specific URL columns
# url_columns = ['Current Parent Company url', 'Archived Source', 'Mega-
Scandal url']
# for url_col in url_columns:
# if url_col in item and isinstance(item[url_col], str) and "(click
here)" in item[url_col].lower():
# url_value = [Link](
# f'//b[contains(text(), "{url_col.replace(" url",
"")}")]/following-sibling::a/@href').get(
# default='')
# item[url_col] = url_value
# raw_item[url_col] = url_value
#
# split_items = self.split_company_names(item)
# for new_item in split_items:
# [Link](new_item)
# self.raw_data.append(raw_item)
#
# def split_company_names(self, item):
# split_keywords = [
# # Common US suffixes
# 'inc', 'inc ', 'incorporated', 'corp', 'corp ', 'Inc.',
'corporation',
# 'llc', 'ltd', 'limited', 'lp', 'llp', 'plc', 'communities', 'center',
# 'co', 'company', 'group'
# ]
# company_name = item['Company']
#
# # First handle dba/d/b/a cases - this needs to happen BEFORE any other
processing
# dba_pattern = r'^(.*?)\s+(dba|d/b/a|doing\s+business\s+as)\s+(.+)$'
# dba_match = [Link](dba_pattern, company_name, flags=[Link])
#
# if dba_match:
# # If we find dba pattern, process it immediately
# main_name = dba_match.group(1).strip()
# dba_alias = dba_match.group(3).strip()
#
# # Clean the alias
# dba_alias = [Link](r'[.,;]+$', '', dba_alias)
# dba_alias = [Link](r'\([^)]*\)$', '', dba_alias).strip()
#
# # Create new item with main name and alias
# new_item = [Link]()
# new_item['Company'] = main_name
# new_item['Alias 1'] = dba_alias.lower()
#
# # Remove any other aliases since we're creating a fresh copy
# for k in list(new_item.keys()):
# if [Link]('Alias ') and k != 'Alias 1':
# del new_item[k]
#
# return [new_item]
#
# # Rest of your existing splitting logic for "and" cases
# # Split on 'and' or '&' with surrounding whitespace
# parts = [Link](r'\s+(and)\s+', company_name, flags=[Link])
#
# # If no splitting occurred, return original item in a list
# if len(parts) == 1:
# return [item]
#
# result_items = []
# current = parts[0].strip()
# i = 1
#
# while i < len(parts):
# sep = parts[i]
# next_part = parts[i + 1].strip()
#
# # Check if current ends with a keyword
# keyword_match = False
# for kw in split_keywords:
# if [Link](rf'\b{kw}\.?$', current, [Link]):
# keyword_match = True
# break
#
# if keyword_match:
# # Create a new item with the current company name
# new_item = [Link]()
#
# # Remove any aliases from the item copy since we'll reprocess
them
# for k in list(new_item.keys()):
# if [Link]('Alias '):
# del new_item[k]
#
# new_item['Company'] = current
# result_items.append(new_item)
# current = next_part
# else:
# current += f" {sep} {next_part}"
#
# i += 2
#
# # Add the last remaining part
# new_item = [Link]()
#
# # Remove any aliases from the item copy since we'll reprocess them
# for k in list(new_item.keys()):
# if [Link]('Alias '):
# del new_item[k]
#
# new_item['Company'] = current
# result_items.append(new_item)
#
# # Now reprocess each item to extract aliases specific to that company
# final_items = []
# for split_item in result_items:
# # Make a copy to avoid modifying the original
# processed_item = split_item.copy()
# company_name = processed_item['Company']
#
# # Extract and process aliases just for this company
# parens = [Link](r'\(([^()]+)\)', company_name)
# aliases = []
# country = None
#
# # Check inside () for country or aliases
# for val in parens:
# cleaned_val = [Link]().lower()
# if cleaned_val.upper() != "N.A.":
# # Remove phrases like "now known as", "formerly", etc.
# cleaned_val = [Link](r'^(now known as|formerly known as|also
known as)\s+', '', cleaned_val,
# flags=[Link])
#
# # Check if it's a known country
# if cleaned_val in self.known_countries:
# country = cleaned_val
# else:
# [Link](cleaned_val)
#
# # Handle dba or d/b/a aliases in main text - improved pattern
# dba_pattern = r'^(.*?)\s+(dba|d/b/a|doing\s+business\s+as)\s+(.+)$'
# dba_match = [Link](dba_pattern, company_name, flags=[Link])
# if dba_match:
# main_name = dba_match.group(1).strip()
# dba_alias = dba_match.group(3).strip()
# # Clean the alias
# dba_alias = [Link](r'[.,;]+$', '', dba_alias)
# dba_alias = [Link](r'\([^)]*\)$', '', dba_alias).strip()
# if dba_alias:
# [Link](0, dba_alias.lower()) # Add at beginning as
primary alias
# processed_item['Company'] = main_name
#
# # Remove () content from company name
# processed_item['Company'] = [Link](r'\([^()]+\)', '',
processed_item['Company']).strip()
#
# # Assign aliases if any (up to 2 aliases)
# for idx, alias_val in enumerate(aliases[:2], start=1):
# processed_item[f'Alias {idx}'] = alias_val
#
# final_items.append(processed_item)
#
# return final_items
# # def split_company_names(self, item):
# # split_keywords = [
# # # Common US suffixes
# # 'inc', 'inc ', 'incorporated', 'corp', 'corp ', 'corporation',
# # 'llc', 'ltd', 'limited', 'lp', 'llp', 'plc', 'communities',
'center',
# # 'co', 'company', 'group'
# # ]
# # company_name = item['Company']
# #
# # # First extract all parentheses content and their positions
# # paren_matches = list([Link](r'\(([^()]+)\)', company_name))
# # paren_contents = [[Link](1) for match in paren_matches]
# #
# # # Split on 'and' or '&' with surrounding whitespace
# # parts = [Link](r'\s+(and)\s+', company_name, flags=[Link])
# #
# # # If no splitting occurred, return original item in a list
# # if len(parts) == 1:
# # return [item]
# #
# # result_items = []
# # current = parts[0].strip()
# # i = 1
# #
# # while i < len(parts):
# # sep = parts[i]
# # next_part = parts[i + 1].strip()
# #
# # # Check if current or next_part ends with a keyword
# # keyword_match = False
# # for kw in split_keywords:
# # if [Link](rf'\b{kw}\.?$', current, [Link]):
# # keyword_match = True
# # break
# #
# # if keyword_match:
# # # Create a new item with the current company name
# # new_item = [Link]()
# #
# # # Find any parentheses that were in this part of the string
# # current_with_parens = current
# # for match in paren_matches:
# # if [Link](0) in current:
# # current_with_parens = [Link]([Link](0),
f'({[Link](1)})')
# #
# # new_item['Company'] = current_with_parens
# # result_items.append(new_item)
# # current = next_part
# # else:
# # current += f" {sep} {next_part}"
# #
# # i += 2
# #
# # # Add the last remaining part
# # new_item = [Link]()
# #
# # # Find any parentheses that were in this part of the string
# # current_with_parens = current
# # for match in paren_matches:
# # if [Link](0) in current:
# # current_with_parens = [Link]([Link](0),
f'({[Link](1)})')
# #
# # new_item['Company'] = current_with_parens
# # result_items.append(new_item)
# #
# # return result_items
#
# # def clean_text_punct(text):
# # """Replace punctuation with space (except - ' & /), then normalize
whitespace"""
# # if not isinstance(text, str) or [Link]() == '':
# # return text
# # keep_chars = {"&", "-", "/", "'"}
# # cleaned = ''.join(char if char not in [Link] or char in
keep_chars else ' ' for char in text)
# # return [Link](r'\s+', ' ', cleaned).strip()
#
# def extract_name_alias(self, entry):
# """
# Extract country from a given company name string.
# Returns a tuple: (name, country)
# """
# try:
# if not isinstance(entry, str):
# return entry, None
#
# entry = [Link]()
# country = None
#
# # Extract and remove country from parentheses
# parens = [Link](r'\(([^()]+)\)', entry)
# for val in parens:
# if [Link]() in self.known_countries:
# country = [Link]()
# entry = [Link](r'\(' + [Link](val) + r'\)', '',
entry).strip()
#
# return entry, country
#
# except Exception:
# return entry, None
# # 2. Remove punctuation except &-/'
# def clean_text_punct(self,text):
# """Clean punctuation and whitespace for Company-related fields"""
# if not isinstance(text, str) or [Link]() == '':
# return text
# keep_chars = {"&", "-", "/", "'"}
# cleaned = []
# text = [Link](".", " ")
#
# # Process the text and preserve country names inside parentheses
# parens = [Link](r'\(([^()]+)\)', text) # Extract all content inside
parentheses
# if "HK" not in text:
# text = [Link]("(", "abcdef")
# text = [Link](")", "fedcba")
#
#
# for char in text:
# if char in [Link] and char not in keep_chars:
# [Link](' ')
# else:
# [Link](char)
# cleaned = ''.join(cleaned)
# cleaned = [Link]("abcdef", "(")
# cleaned = [Link]("fedcba", ")")
#
# return [Link](r'\s+', ' ', ''.join(cleaned)).strip()
#
# def normalize_whitespace(self,text):
# """Normalize whitespace (used for non-Company fields)"""
# if not isinstance(text, str) or [Link]() == '':
# return text
# return [Link](r'\s+', ' ', text).strip()
#
#
# def close(self, reason):
# # Create DataFrames
# df_cleaned = [Link]([Link])
# df_uncleaned = [Link](self.raw_data) # Use completely raw data
#
# # Add common columns to both DataFrames
# df_cleaned.insert(0, 'ID', range(1, 1 + len(df_cleaned)))
#
# # Ensure source_url is second column
# if 'source_url' in df_cleaned.columns:
# cols = df_cleaned.[Link]()
# [Link](1, [Link]([Link]('source_url')))
# df_cleaned = df_cleaned[cols]
#
# # Only add ID to uncleaned data (without Source URL)
# df_uncleaned.insert(0, 'ID', range(1, 1 + len(df_uncleaned)))
# # df_uncleaned = df_uncleaned.drop(columns=['Current Parent Company
url','Source of Data'])
#
# # Save uncleaned file (completely raw data)
# timestamp = [Link]().strftime('%Y-%m-%d_%H%M%S')
# filename_uncleaned =
f"raw_data_violationtracker_goodjobsfirst_org_{timestamp}.xlsx"
# filepath_uncleaned = [Link]([Link], filename_uncleaned)
#
# with [Link](filepath_uncleaned, engine='xlsxwriter',
# engine_kwargs={'options': {'strings_to_numbers':
True}}) as writer:
# df_uncleaned.fillna("").to_excel(writer, index=False)
#
# # Process cleaned data
# protected_columns = ['ID', 'Source URL', 'PDP URL', 'Penalty', 'Date',
# 'Current Parent Company url', 'Archived Source',
'Mega-Scandal url']
# columns_to_clean = ['Company', 'Current Parent Company name']
#
# # 1. Replace N/A with blank except in protected columns
# for col in df_cleaned.columns:
# if col not in protected_columns:
# df_cleaned[col] = df_cleaned[col].replace(['N/A', 'NA'], '')
# columns_to_clean = ['Company', 'Current Parent Company']
#
# # Apply per-column logic
# for col in df_cleaned.columns:
# if col in protected_columns:
# continue
# if df_cleaned[col].dtype == object:
# if col in columns_to_clean:
# df_cleaned[col] =
df_cleaned[col].apply(self.clean_text_punct)
# else:
# df_cleaned[col] =
df_cleaned[col].apply(self.normalize_whitespace)
#
# # Save cleaned file
# filename_cleaned =
f"violationtracker_goodjobsfirst_org_{timestamp}.xlsx"
# filepath_cleaned = [Link]([Link], filename_cleaned)
#
# with [Link](filepath_cleaned, engine='xlsxwriter',
# engine_kwargs={'options': {'strings_to_numbers':
True}}) as writer:
# df_cleaned.to_excel(writer, index=False)
# # Columns that should have punctuation removed
#
#
# if __name__ == '__main__':
# execute("scrapy crawl NHTSA".split())
import os
import random
import re
import string
from datetime import datetime

import pandas as pd
import scrapy
from scrapy.cmdline import execute
# Spider class for scraping NHTSA violation data from Good Jobs First (Violation Tracker entries)
class Act110Spider([Link]):
name = "NHTSA"
cookies = {
'_fbp': 'fb.1.1747201269861.315460754820040010',
'_gid': 'GA1.2.38545643.1747749932',
'_ga_Q4G4E8KT5J': 'GS2.1.s1747827357$o2$g1$t1747827638$j0$l0$h0',
'PHPSESSID': '454b72dc00e908524384cdb37fddb976',
'_gat_UA-21812781-2': '1',
'_ga_9VW1HCFL7C': 'GS2.1.s1747997430$o42$g1$t1747999957$j0$l0$h0',
'_ga': 'GA1.1.1470230669.1747201268',
}
headers = {
'accept':
'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/
webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,hi;q=0.7',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'priority': 'u=0, i',
'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136",
"Not.A/Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36
(KHTML, like Gecko) Chrome/[Link] Safari/537.36',
# 'cookie': '_fbp=fb.1.1747201269861.315460754820040010;
_gid=GA1.2.38545643.1747749932;
_ga_Q4G4E8KT5J=GS2.1.s1747827357$o2$g1$t1747827638$j0$l0$h0;
PHPSESSID=454b72dc00e908524384cdb37fddb976; _gat_UA-21812781-2=1;
_ga_9VW1HCFL7C=GS2.1.s1747997430$o42$g1$t1747999957$j0$l0$h0;
_ga=GA1.1.1470230669.1747201268',
}
custom_settings = {
"DOWNLOAD_HANDLERS": {
"http": "scrapy_impersonate.ImpersonateDownloadHandler",
"https": "scrapy_impersonate.ImpersonateDownloadHandler",
},
"TWISTED_REACTOR":
"[Link]",
}
def __init__(self):
# Store all parsed data in a list for final export
[Link] = []
self.raw_data=[]
self.data_cleaned = []
self.data_uncleaned = []
[Link] = [Link]([Link](), "exports") # or specify your own
path
yield [Link](url=paginated_url,meta=meta,
headers=[Link], cookies=[Link], callback=self.parse_listing)
data_skip = False
key = [Link]('./text()').get(default='NA').strip()
raw_value = \
''.join([Link]('./following-sibling::text() | ./following-
sibling::a//text()').getall()).split(
'\n:')[0].replace(':\xa0', '').strip()
value = raw_value # Initialize cleaned value with raw value
# Format date fields into YYYY-MM-DD format (only for cleaned data)
if 'Date' in key:
try:
date_obj = [Link](value, "%B %d, %Y")
value = date_obj.strftime("%Y-%m-%d")
except Exception:
pass
# Remove a leading "Note:" prefix and trim surrounding whitespace when the key is "Notes"
if key == 'Notes':
value = [Link](r'^Note:\s*', '', value).strip()
aliases = []
if dba_match:
# Extract all parts
main_name = dba_match.group(1).strip()
alias_part = dba_match.group(3).strip() # Part after d/b/a
and_part = dba_match.group(4).strip() if dba_match.group(4)
else None
if cleaned_val.lower() in self.known_countries:
value = [Link](r'\(' + [Link](val) + r'\)', f' ({val})',
value)
country = cleaned_val
else:
if f'({val})' in value:
[Link](cleaned_val)
if parts:
company_name = parts[0]
extracted_aliases = parts[1:]
cleaned_aliases = [
[Link](r'^[/\s]+', '', [Link]().strip())
for a in extracted_aliases
if len([Link]()) >= 3 and [Link]().strip() not in ["hk"]
]
[Link](cleaned_aliases)
else:
company_name = [Link]()
company_name = self.clean_text_punct(company_name)
# Assign aliases
for idx, alias_val in enumerate(aliases[:2], start=1):
alias_val = [Link](r'^[/\s]+', '', alias_val)
remove_words = ['fka', 'f/k/a', 'aka', 'a/k/a']
pattern = r'^(?:' + '|'.join([Link](word) for word in
remove_words) + r')\s+'
alias_val = [Link](pattern, '', alias_val, flags=[Link])
split_aliases = [Link](r'\s+and\s+', alias_val,
flags=[Link])
idx += len(split_aliases) - 1
continue
split_items = self.split_company_names(reordered_item)
for new_item in split_items:
[Link](new_item)
self.raw_data.append(raw_item)
result_items = []
current = parts[0].strip()
i = 1
i += 2
return result_items
entry = [Link]()
country = None
# Extract and remove country from parentheses
parens = [Link](r'\(([^()]+)\)', entry)
for val in parens:
if [Link]().lower() in self.known_countries:
country = [Link]()
entry = [Link](r'\(' + [Link](val) + r'\)', '',
entry).strip()
except Exception:
return entry, None
cleaned = ''.join(cleaned)
cleaned = [Link]("abcdef", "(")
cleaned = [Link]("fedcba", ")")
def normalize_whitespace(self, text):
    """Collapse whitespace runs to single spaces (used for non-Company fields).

    Args:
        text: The cell value to normalize.

    Returns:
        ``text`` unchanged when it is not a string or is empty/whitespace-only;
        otherwise the string with every run of whitespace replaced by a single
        space and leading/trailing whitespace removed.
    """
    # Non-strings (e.g. NaN, numbers) and blank strings pass through untouched.
    if not isinstance(text, str) or text.strip() == '':
        return text
    return re.sub(r'\s+', ' ', text).strip()
#
# def extract_additional_info(self, entry):
# """
# Extracts additional information like 'Division of', 'A unit of', 'A
business of', etc.
# Returns tuple: (cleaned_entry, additional_info)
# """
# if not isinstance(entry, str):
# return entry, None
#
# entry = [Link]()
# additional_info = None
#
# # List of patterns to search for
# patterns = [
# r'\b(a Division of\s+.+)',
# r'\b(A\s+unit of\s+.+)',
# r'\b(A\s+business of\s+.+)',
# r'\b(A\s+subsidiary of\s+.+)',
# r'\b(Segment of\s+.+)',
# r'\b(Sub-division of\s+.+)',
# ]
#
# for pattern in patterns:
# match = [Link](pattern, entry, flags=[Link])
# if match:
# additional_info = [Link](0).strip()
# entry = [Link]([Link](additional_info), '', entry,
flags=[Link]).strip()
# break # Only extract the first matching additional info
#
# # Clean extra spaces and dangling "by", "of", "a"
# entry = [Link](r'\b(of|by|a)\s*$', '', entry,
flags=[Link]).strip()
#
# return entry, additional_info
if __name__ == '__main__':
    # Script entry point: hand the argv list ['scrapy', 'crawl', 'NHTSA'] to
    # execute() so the spider named "NHTSA" runs when this file is executed.
    execute("scrapy crawl NHTSA".split())