
Commit

refactor: 🚀 refactoring of the multithreading implementation for downloading sources or articles

news_pool was removed. Any use of news_pool must be replaced with fetch_news().
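A rough migration sketch (the URLs are placeholders, not taken from this commit): the old pooled set/join calls map onto a single fetch_news() call.

    import newspaper
    from newspaper.mthreading import fetch_news

    # Old, removed API:
    #   from newspaper import news_pool
    #   news_pool.set(papers, threads_per_source=2)
    #   news_pool.join()

    # New API: download and parse all sources in parallel.
    papers = [
        newspaper.build("https://cnn.com"),
        newspaper.build("https://techcrunch.com"),
    ]
    papers = fetch_news(papers, threads=4)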
AndyTheFactory committed Dec 29, 2023
1 parent 6ccf8af commit 081bd00
Showing 7 changed files with 235 additions and 287 deletions.
4 changes: 0 additions & 4 deletions newspaper/__init__.py
@@ -24,14 +24,11 @@
     Configuration as Config,
 )
 from .article import Article, ArticleException
-from .mthreading import NewsPool
 from .source import Source
 from .version import __version__
 import logging
 from logging import NullHandler

-news_pool = NewsPool()
-

 # Set default logging handler to avoid "No handler found" warnings.
 logging.getLogger(__name__).addHandler(NullHandler())
@@ -78,5 +75,4 @@ def article(url: str, language: Optional[str] = "en", **kwargs) -> Article:
     "ArticleException",
     "Source",
     "__version__",
-    "news_pool",
 ]
1 change: 1 addition & 0 deletions newspaper/article.py
@@ -464,6 +464,7 @@ def parse(self) -> "Article":

         if self.doc is None:
             # `parse` call failed, return nothing
+            self.is_parsed = True
             return self

         # TODO: Fix this, sync in our fix_url() method
168 changes: 44 additions & 124 deletions newspaper/mthreading.py
@@ -1,135 +1,55 @@
 # -*- coding: utf-8 -*-
 # Much of the code here was forked from https://github.com/codelucas/newspaper
 # Copyright (c) Lucas Ou-Yang (codelucas)

 """
-Anything that has to do with threading in this library
-must be abstracted in this file. If we decide to do gevent
-also, it will deserve its own gevent file.
+Helper functions for multithreading news fetching.
 """

-import logging
-import queue
-import traceback
-
-
-from threading import Thread
-
-from .configuration import Configuration
-
-log = logging.getLogger(__name__)
-
-
-class ConcurrencyException(Exception):
-    pass
+from concurrent.futures import ThreadPoolExecutor
+from typing import List, Union
+import newspaper
+from newspaper.article import Article
+from newspaper.source import Source


-class Worker(Thread):
-    """
-    Thread executing tasks from a given tasks queue.
-    """
-
-    def __init__(self, tasks, timeout_seconds):
-        Thread.__init__(self)
-        self.tasks = tasks
-        self.timeout = timeout_seconds
-        self.daemon = True
-        self.start()
-
-    def run(self):
-        while True:
-            try:
-                func, args, kargs = self.tasks.get(timeout=self.timeout)
-            except queue.Empty:
-                # Extra thread allocated, no job, exit gracefully
-                break
-            try:
-                func(*args, **kargs)
-            except Exception:
-                traceback.print_exc()
-
-            self.tasks.task_done()
-
-
-class ThreadPool:
-    def __init__(self, num_threads, timeout_seconds):
-        self.tasks = queue.Queue(num_threads)
-        for _ in range(num_threads):
-            Worker(self.tasks, timeout_seconds)
-
-    def add_task(self, func, *args, **kargs):
-        self.tasks.put((func, args, kargs))
-
-    def wait_completion(self):
-        self.tasks.join()
-
-
-class NewsPool:
-    def __init__(self, config=None):
-        """
-        Abstraction of a threadpool. A newspool can accept any number of
-        source OR article objects together in a list. It allocates one
-        thread to every source and then joins.
-        We allocate one thread per source to avoid rate limiting.
-        5 sources = 5 threads, one per source.
-        >>> import newspaper
-        >>> from newspaper import news_pool
-        >>> cnn_paper = newspaper.build('http://cnn.com')
-        >>> tc_paper = newspaper.build('http://techcrunch.com')
-        >>> espn_paper = newspaper.build('http://espn.com')
-        >>> papers = [cnn_paper, tc_paper, espn_paper]
-        >>> news_pool.set(papers)
-        >>> news_pool.join()
-        # All of your papers should have their articles html all populated now.
-        >>> cnn_paper.articles[50].html
-        u'<html>blahblah ... '
-        """
-        self.pool = None
-        self.config = config or Configuration()
-
-    def join(self):
-        """
-        Runs the mtheading and returns when all threads have joined
-        resets the task.
-        """
-        if self.pool is None:
-            raise ConcurrencyException(
-                "Call set(..) with a list of source objects before calling .join(..)"
-            )
-        self.pool.wait_completion()
-        self.pool = None
-
-    def set(self, news_list, threads_per_source=1, override_threads=None):
-        """
-        news_list can be a list of `Article`, `Source`, or both.
-        If caller wants to decide how many threads to use, they can use
-        `override_threads` which takes precedence over all. Otherwise,
-        this api infers that if the input is all `Source` objects, to
-        allocate one thread per `Source` to not spam the host.
-        If both of the above conditions are not true, default to 1 thread.
-        """
-        from .source import Source
-
-        if override_threads is not None:
-            num_threads = override_threads
-        elif all([isinstance(n, Source) for n in news_list]):
-            num_threads = threads_per_source * len(news_list)
-        else:
-            num_threads = 1
-
-        timeout = self.config.thread_timeout_seconds
-        self.pool = ThreadPool(num_threads, timeout)
-
-        for news_object in news_list:
-            if isinstance(news_object, Source):
-                self.pool.add_task(news_object.download_articles)
-            else:
-                self.pool.add_task(news_object.download)
+def fetch_news(
+    news_list: List[Union[str, Article, Source]], threads: int = 5
+) -> List[Union[Article, Source]]:
+    """
+    Fetch news from a list of sources, articles, or both. Threads will be
+    allocated to download and parse the sources or articles. If urls are
+    passed into the list, then a new `Article` object will be created for
+    it and downloaded + parsed.
+    There will be no nlp done on the articles.
+    If there is a problem in detecting the language of the urls, then instantiate
+    the `Article` object yourself with the language parameter and pass it in.
+    Arguments:
+        news_list {List[Union[str, Article, Source]]} -- List of sources,
+            articles, urls or a mix of them.
+        threads {int} -- Number of threads to use for fetching. This affects
+            how many items from the news_list are fetched at once. In order to control
+            how many threads are used in a `Source` object, use the
+            `Configuration`.`number_threads` setting. This could result in
+            a high number of threads. Maximum number of threads would be
+            `threads` * `Configuration`.`number_threads`.
+    """
+
+    def get_item(item: Union[str, Article, Source]) -> Union[Article, Source]:
+        if isinstance(item, Article):
+            item.download()
+            item.parse()
+        elif isinstance(item, Source):
+            item.download_articles()
+            item.parse_articles()
+        elif isinstance(item, str):
+            item = newspaper.article(url=item)
+        else:
+            raise TypeError(f"Invalid type {type(item)} for item {item}")
+
+        return item
+
+    with ThreadPoolExecutor(max_workers=threads) as tpe:
+        results = tpe.map(get_item, news_list)
+
+    return list(results)
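As the fetch_news docstring above notes, plain URLs, Article objects, and Source objects can be mixed in one call; a small sketch (hypothetical URLs, no NLP is run):

    from newspaper import Article
    from newspaper.mthreading import fetch_news

    items = [
        "https://example.com/some-story.html",  # plain URL, wrapped in a new Article
        Article("https://example.com/other-story.html", language="en"),  # pre-built Article
    ]
    fetched = fetch_news(items, threads=2)
    for item in fetched:
        print(item.title)  # every returned item has been downloaded and parsed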
104 changes: 43 additions & 61 deletions newspaper/network.py
@@ -6,14 +6,14 @@
 All code involving requests and responses over the http network
 must be abstracted in this file.
 """
+from concurrent.futures import ThreadPoolExecutor
 import logging
 from typing import Callable
 import requests
 from requests import RequestException
 import tldextract

 from .configuration import Configuration
-from .mthreading import ThreadPool
 from newspaper import parsers

 log = logging.getLogger(__name__)
@@ -137,6 +137,19 @@ def is_binary_url(url: str) -> bool:
     return False


+def do_request(url, config):
+    if not config.allow_binary_content and has_get_ranges(url):
+        if is_binary_url(url):
+            raise ArticleBinaryDataException(f"Article is binary data: {url}")
+
+    response = requests.get(
+        url=url,
+        **config.requests_params,
+    )
+
+    return response
+
+
 def get_html(url, config=None, response=None):
     """HTTP response code agnostic"""
     html = ""
@@ -162,16 +175,8 @@ def get_html_2XX_only(url, config=None, response=None):
     if response is not None:
         return _get_html_from_response(response, config), response.status_code

-    if not config.allow_binary_content and has_get_ranges(url):
-        if is_binary_url(url):
-            raise ArticleBinaryDataException(f"Article is binary data: {url}")
+    response = do_request(url, config)

-    response = requests.get(
-        url=url,
-        **config.requests_params,
-    )
-
-    # TODO: log warning with response codes<>200
     if response.status_code != 200:
         log.warning(
             "get_html_2XX_only(): bad status code %s on URL: %s, html: %s",
@@ -205,62 +210,39 @@ def _get_html_from_response(response, config):
     return html or ""


-class MRequest:
-    """Wrapper for request object for multithreading. If the domain we are
-    crawling is under heavy load, the self.resp will be left as None.
-    If this is the case, we still want to report the url which has failed
-    so (perhaps) we can try again later.
-    """
-
-    def __init__(self, url, config=None):
-        self.url = url
-        self.config = config or Configuration()
-        self.resp = None
-
-    def send(self):
-        """Send the request, set self.resp to the response object."""
-        self.resp = None
-        if not self.config.allow_binary_content and has_get_ranges(self.url):
-            if is_binary_url(self.url):
-                log.warning("MRequest.send() binary data: %s", self.url)
-                return
-
-        try:
-            self.resp = requests.get(
-                self.url,
-                **self.config.requests_params,
-            )
-            if self.config.http_success_only:
-                if self.resp.status_code >= 400:
-                    log.warning(
-                        "MRequest.send(): bad status code %s on URL: %s, html: %s",
-                        self.resp.status_code,
-                        self.url,
-                        self.resp.text[:200],
-                    )
-                    self.resp = None
-        except RequestException as e:
-            log.error(
-                "MRequest.send(): [REQUEST FAILED] %s, on URL: %s", str(e), self.url
-            )
-
-
 def multithread_request(urls, config=None):
     """Request multiple urls via mthreading, order of urls & requests is stable
     returns same requests but with response variables filled.
     """
     config = config or Configuration()
-    num_threads = config.number_threads
-    timeout = config.thread_timeout_seconds
-
-    pool = ThreadPool(num_threads, timeout)
-
-    m_requests = []
-    for url in urls:
-        m_requests.append(MRequest(url, config))
+    timeout = config.thread_timeout_seconds
+    requests_timeout = config.requests_params.get("timeout", 7)

-    for req in m_requests:
-        pool.add_task(req.send)
+    if timeout < requests_timeout:
+        log.warning(
+            "multithread_request(): Thread timeout %s < Requests timeout %s, could"
+            " cause threads to be stopped before getting the chance to finish. Consider"
+            " increasing the thread timeout.",
+            timeout,
+            requests_timeout,
+        )
+    results = []
+    with ThreadPoolExecutor(max_workers=config.number_threads) as tpe:
+        result_futures = [
+            tpe.submit(do_request, url=url, config=config) for url in urls
+        ]
+        for idx, future in enumerate(result_futures):
+            url = urls[idx]
+            try:
+                results.append(future.result())
+            except TimeoutError:
+                results.append(None)
+                log.error("multithread_request(): Thread timeout for URL: %s", url)
+            except RequestException as e:
+                results.append(None)
+                log.warning(
+                    "multithread_request(): Http download error %s on URL: %s", e, url
+                )

-    pool.wait_completion()
-    return m_requests
+    return results
(Diffs for the remaining 3 changed files are not shown.)
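The new warning in multithread_request compares config.thread_timeout_seconds against the timeout inside config.requests_params. A minimal sketch of keeping the two consistent (this assumes requests_params is a plain dict, as the .get("timeout", 7) call in the diff suggests; the values are arbitrary):

    from newspaper.configuration import Configuration

    config = Configuration()
    # Per-request timeout handed to requests.get via **config.requests_params.
    config.requests_params["timeout"] = 10
    # Keep the per-thread timeout above the per-request timeout so worker
    # threads are not reported as timed out while a download is still running.
    config.thread_timeout_seconds = 30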
