"""
Starts scrapy scheduler. Takes job details from the crawl-sites.json file referenced below as CRAWL_SITES_FILE.
Schedule is fully contained in-memory but current cron expression is stored in the input file so that on each
deploy the schedule can pickup where it left off.
Use the env var SCRAPY_MAX_WORKERS to control how many jobs can run at once. If the max number of jobs are running
when other jobs are supposed to run, those jobs will queue until one or more of the running jobs finishes.
"""
import logging
import os
import subprocess
from pathlib import Path

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from pythonjsonlogger.json import JsonFormatter

from search_gov_crawler.search_gov_spiders.extensions.json_logging import LOG_FMT
from search_gov_crawler.search_gov_spiders.utility_files.crawl_sites import CrawlSites

logging.basicConfig(level=os.environ.get("SCRAPY_LOG_LEVEL", "INFO"))
logging.getLogger().handlers[0].setFormatter(JsonFormatter(fmt=LOG_FMT))

log = logging.getLogger("search_gov_crawler.scrapy_scheduler")

CRAWL_SITES_FILE = Path(__file__).parent / "search_gov_spiders" / "utility_files" / "crawl-sites.json"
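
# Illustrative sketch only: a crawl-sites.json entry is assumed to carry the fields consumed
# below (name, schedule, handle_javascript, allow_query_string, allowed_domains, starting_urls).
# The key names and values here are hypothetical; the authoritative schema lives in
# search_gov_spiders/utility_files/crawl_sites.py.
#
# {
#     "name": "Example Site",
#     "allow_query_string": false,
#     "allowed_domains": "example.gov",
#     "starting_urls": "https://example.gov/",
#     "handle_javascript": false,
#     "schedule": "0 6 * * 1"
# }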


def run_scrapy_crawl(spider: str, allow_query_string: bool, allowed_domains: str, start_urls: str) -> None:
    """Runs the `scrapy crawl` command as a subprocess with the given arguments."""

    scrapy_env = os.environ.copy()
    scrapy_env["PYTHONPATH"] = str(Path(__file__).parent.parent)

    subprocess.run(
        f"scrapy crawl {spider} -a allow_query_string={allow_query_string} -a allowed_domains={allowed_domains} -a start_urls={start_urls}",
        check=True,
        cwd=Path(__file__).parent,
        env=scrapy_env,
        executable="/bin/bash",
        shell=True,
    )

    msg = (
        "Successfully completed scrapy crawl with args "
        "spider=%s, allow_query_string=%s, allowed_domains=%s, start_urls=%s"
    )
    log.info(msg, spider, allow_query_string, allowed_domains, start_urls)
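
# For example (hypothetical arguments), run_scrapy_crawl("domain_spider", False, "example.gov",
# "https://example.gov/") shells out to:
#
#   scrapy crawl domain_spider -a allow_query_string=False -a allowed_domains=example.gov -a start_urls=https://example.gov/
#
# with the directory above this package on PYTHONPATH and this package directory as the cwd.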


def transform_crawl_sites(crawl_sites: CrawlSites) -> list[dict]:
    """
    Transforms crawl site records into a format that can be used to create apscheduler jobs.
    Only sites that have a value for the `schedule` field are scheduled.
    """

    transformed_crawl_sites = []

    for crawl_site in crawl_sites.scheduled():
        job_name = crawl_site.name
        transformed_crawl_sites.append(
            {
                "func": run_scrapy_crawl,
                "id": job_name.lower().replace(" ", "-").replace("---", "-"),
                "name": job_name,
                "trigger": CronTrigger.from_crontab(expr=crawl_site.schedule, timezone="UTC"),
                "args": [
                    "domain_spider" if not crawl_site.handle_javascript else "domain_spider_js",
                    crawl_site.allow_query_string,
                    crawl_site.allowed_domains,
                    crawl_site.starting_urls,
                ],
            },
        )

    return transformed_crawl_sites
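
# Sketch of the resulting job definition for the hypothetical "Example Site" entry above,
# scheduled as "0 6 * * 1" with handle_javascript disabled:
#
# {
#     "func": run_scrapy_crawl,
#     "id": "example-site",
#     "name": "Example Site",
#     "trigger": CronTrigger.from_crontab(expr="0 6 * * 1", timezone="UTC"),
#     "args": ["domain_spider", False, "example.gov", "https://example.gov/"],
# }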


def start_scrapy_scheduler(input_file: Path) -> None:
    """Initializes the schedule from the input file, schedules jobs, and runs the scheduler."""

    # Load and transform crawl sites
    crawl_sites = CrawlSites.from_file(file=input_file)
    apscheduler_jobs = transform_crawl_sites(crawl_sites)

    # Initialize scheduler - concurrent.futures defaults to 'min(32, (os.cpu_count() or 1) + 4)' workers,
    # but we default to 5 to keep the worker count consistent between environments and for the schedule.
    max_workers = int(os.environ.get("SCRAPY_MAX_WORKERS", "5"))
    log.info("Max workers for schedule set to %s", max_workers)

    scheduler = BlockingScheduler(
        jobstores={"memory": MemoryJobStore()},
        executors={"default": ThreadPoolExecutor(max_workers)},
        job_defaults={"coalesce": False, "max_instances": 1},
        timezone="UTC",
    )

    # Schedule Jobs
    for apscheduler_job in apscheduler_jobs:
        scheduler.add_job(**apscheduler_job, jobstore="memory")

    # Run Scheduler
    scheduler.start()


if __name__ == "__main__":
    start_scrapy_scheduler(input_file=CRAWL_SITES_FILE)
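
# Typical invocation (values are examples; both env vars are optional and default to INFO logging
# and 5 workers), assuming the search_gov_crawler package is importable, e.g. with the repo root
# on PYTHONPATH:
#
#   SCRAPY_LOG_LEVEL=DEBUG SCRAPY_MAX_WORKERS=10 python scrapy_scheduler.py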