From 081bd009f0b410055826c6584e5807be842951fc Mon Sep 17 00:00:00 2001
From: Andrei
Date: Fri, 29 Dec 2023 16:26:11 +0200
Subject: [PATCH] refactor: :rocket: refactoring of the multithreading
 implementation for downloading sources or articles

news_pool was removed. Any use of news_pool must be replaced with fetch_news().
---
 newspaper/__init__.py   |   4 -
 newspaper/article.py    |   1 +
 newspaper/mthreading.py | 168 +++++++++++-----------------------------
 newspaper/network.py    | 104 ++++++++++---------------
 newspaper/source.py     | 150 ++++++++++++++----------------------
 tests/test_misc.py      |  79 +++++++++++++++++--
 tests/test_source.py    |  16 ++++
 7 files changed, 235 insertions(+), 287 deletions(-)

diff --git a/newspaper/__init__.py b/newspaper/__init__.py
index b0b13c9..527b370 100644
--- a/newspaper/__init__.py
+++ b/newspaper/__init__.py
@@ -24,14 +24,11 @@
     Configuration as Config,
 )
 from .article import Article, ArticleException
-from .mthreading import NewsPool
 from .source import Source
 from .version import __version__
 
 import logging
 from logging import NullHandler
 
-news_pool = NewsPool()
-
 # Set default logging handler to avoid "No handler found" warnings.
 logging.getLogger(__name__).addHandler(NullHandler())
 
@@ -78,5 +75,4 @@ def article(url: str, language: Optional[str] = "en", **kwargs) -> Article:
     "ArticleException",
     "Source",
     "__version__",
-    "news_pool",
 ]
diff --git a/newspaper/article.py b/newspaper/article.py
index 7fab49b..79cca82 100644
--- a/newspaper/article.py
+++ b/newspaper/article.py
@@ -464,6 +464,7 @@ def parse(self) -> "Article":
 
         if self.doc is None:
             # `parse` call failed, return nothing
+            self.is_parsed = True
             return self
 
         # TODO: Fix this, sync in our fix_url() method
diff --git a/newspaper/mthreading.py b/newspaper/mthreading.py
index 665a9a2..0305895 100644
--- a/newspaper/mthreading.py
+++ b/newspaper/mthreading.py
@@ -1,135 +1,55 @@
-# -*- coding: utf-8 -*-
-# Much of the code here was forked from https://github.com/codelucas/newspaper
-# Copyright (c) Lucas Ou-Yang (codelucas)
-
-"""
-Anything that has to do with threading in this library
-must be abstracted in this file. If we decide to do gevent
-also, it will deserve its own gevent file.
 """
+Helper functions for multithreading news fetching.
+"""
 
-import logging
-import queue
-import traceback
-
-
-from threading import Thread
-
-from .configuration import Configuration
-
-log = logging.getLogger(__name__)
-
-
-class ConcurrencyException(Exception):
-    pass
+from concurrent.futures import ThreadPoolExecutor
+from typing import List, Union
 
+import newspaper
+from newspaper.article import Article
+from newspaper.source import Source
 
-class Worker(Thread):
-    """
-    Thread executing tasks from a given tasks queue.
+
+def fetch_news(
+    news_list: List[Union[str, Article, Source]], threads: int = 5
+) -> List[Union[Article, Source]]:
     """
+    Fetch news from a list of sources, articles, or both. Threads are
+    allocated to download and parse the sources or articles. If a url is
+    passed in the list, a new `Article` object is created for it and
+    downloaded and parsed.
+    No NLP is performed on the articles.
+    If the language of a url cannot be detected reliably, instantiate the
+    `Article` object yourself with the language parameter and pass it in.
+
+    Arguments:
+        news_list {List[Union[str, Article, Source]]} -- List of sources,
+            articles, urls or a mix of them.
+
+        threads {int} -- Number of threads to use for fetching. This affects
+            how many items from the news_list are fetched at once.
In order to control + how many threads are used in a `Source` object, use the + `Configuration`.`number_threads` setting. This could result in + a high number of threads. Maximum number of threads would be + `threads` * `Configuration`.`number_threads`. - def __init__(self, tasks, timeout_seconds): - Thread.__init__(self) - self.tasks = tasks - self.timeout = timeout_seconds - self.daemon = True - self.start() - - def run(self): - while True: - try: - func, args, kargs = self.tasks.get(timeout=self.timeout) - except queue.Empty: - # Extra thread allocated, no job, exit gracefully - break - try: - func(*args, **kargs) - except Exception: - traceback.print_exc() - - self.tasks.task_done() - - -class ThreadPool: - def __init__(self, num_threads, timeout_seconds): - self.tasks = queue.Queue(num_threads) - for _ in range(num_threads): - Worker(self.tasks, timeout_seconds) - - def add_task(self, func, *args, **kargs): - self.tasks.put((func, args, kargs)) - - def wait_completion(self): - self.tasks.join() - - -class NewsPool: - def __init__(self, config=None): - """ - Abstraction of a threadpool. A newspool can accept any number of - source OR article objects together in a list. It allocates one - thread to every source and then joins. - - We allocate one thread per source to avoid rate limiting. - 5 sources = 5 threads, one per source. - - >>> import newspaper - >>> from newspaper import news_pool - - >>> cnn_paper = newspaper.build('http://cnn.com') - >>> tc_paper = newspaper.build('http://techcrunch.com') - >>> espn_paper = newspaper.build('http://espn.com') - - >>> papers = [cnn_paper, tc_paper, espn_paper] - >>> news_pool.set(papers) - >>> news_pool.join() - - # All of your papers should have their articles html all populated now. - >>> cnn_paper.articles[50].html - u'blahblah ... ' - """ - self.pool = None - self.config = config or Configuration() - - def join(self): - """ - Runs the mtheading and returns when all threads have joined - resets the task. - """ - if self.pool is None: - raise ConcurrencyException( - "Call set(..) with a list of source objects before calling .join(..)" - ) - self.pool.wait_completion() - self.pool = None - - def set(self, news_list, threads_per_source=1, override_threads=None): - """ - news_list can be a list of `Article`, `Source`, or both. - - If caller wants to decide how many threads to use, they can use - `override_threads` which takes precedence over all. Otherwise, - this api infers that if the input is all `Source` objects, to - allocate one thread per `Source` to not spam the host. - - If both of the above conditions are not true, default to 1 thread. 
- """ - from .source import Source + """ - if override_threads is not None: - num_threads = override_threads - elif all([isinstance(n, Source) for n in news_list]): - num_threads = threads_per_source * len(news_list) + def get_item(item: Union[str, Article, Source]) -> Union[Article, Source]: + if isinstance(item, Article): + item.download() + item.parse() + elif isinstance(item, Source): + item.download_articles() + item.parse_articles() + elif isinstance(item, str): + item = newspaper.article(url=item) else: - num_threads = 1 + raise TypeError(f"Invalid type {type(item)} for item {item}") + + return item - timeout = self.config.thread_timeout_seconds - self.pool = ThreadPool(num_threads, timeout) + with ThreadPoolExecutor(max_workers=threads) as tpe: + results = tpe.map(get_item, news_list) - for news_object in news_list: - if isinstance(news_object, Source): - self.pool.add_task(news_object.download_articles) - else: - self.pool.add_task(news_object.download) + return list(results) diff --git a/newspaper/network.py b/newspaper/network.py index 5c5d673..349a5c7 100644 --- a/newspaper/network.py +++ b/newspaper/network.py @@ -6,6 +6,7 @@ All code involving requests and responses over the http network must be abstracted in this file. """ +from concurrent.futures import ThreadPoolExecutor import logging from typing import Callable import requests @@ -13,7 +14,6 @@ import tldextract from .configuration import Configuration -from .mthreading import ThreadPool from newspaper import parsers log = logging.getLogger(__name__) @@ -137,6 +137,19 @@ def is_binary_url(url: str) -> bool: return False +def do_request(url, config): + if not config.allow_binary_content and has_get_ranges(url): + if is_binary_url(url): + raise ArticleBinaryDataException(f"Article is binary data: {url}") + + response = requests.get( + url=url, + **config.requests_params, + ) + + return response + + def get_html(url, config=None, response=None): """HTTP response code agnostic""" html = "" @@ -162,16 +175,8 @@ def get_html_2XX_only(url, config=None, response=None): if response is not None: return _get_html_from_response(response, config), response.status_code - if not config.allow_binary_content and has_get_ranges(url): - if is_binary_url(url): - raise ArticleBinaryDataException(f"Article is binary data: {url}") + response = do_request(url, config) - response = requests.get( - url=url, - **config.requests_params, - ) - - # TODO: log warning with response codes<>200 if response.status_code != 200: log.warning( "get_html_2XX_only(): bad status code %s on URL: %s, html: %s", @@ -205,62 +210,39 @@ def _get_html_from_response(response, config): return html or "" -class MRequest: - """Wrapper for request object for multithreading. If the domain we are - crawling is under heavy load, the self.resp will be left as None. - If this is the case, we still want to report the url which has failed - so (perhaps) we can try again later. 
- """ - - def __init__(self, url, config=None): - self.url = url - self.config = config or Configuration() - self.resp = None - - def send(self): - """Send the request, set self.resp to the response object.""" - self.resp = None - if not self.config.allow_binary_content and has_get_ranges(self.url): - if is_binary_url(self.url): - log.warning("MRequest.send() binary data: %s", self.url) - return - - try: - self.resp = requests.get( - self.url, - **self.config.requests_params, - ) - if self.config.http_success_only: - if self.resp.status_code >= 400: - log.warning( - "MRequest.send(): bad status code %s on URL: %s, html: %s", - self.resp.status_code, - self.url, - self.resp.text[:200], - ) - self.resp = None - except RequestException as e: - log.error( - "MRequest.send(): [REQUEST FAILED] %s, on URL: %s", str(e), self.url - ) - - def multithread_request(urls, config=None): """Request multiple urls via mthreading, order of urls & requests is stable returns same requests but with response variables filled. """ config = config or Configuration() - num_threads = config.number_threads - timeout = config.thread_timeout_seconds - pool = ThreadPool(num_threads, timeout) - - m_requests = [] - for url in urls: - m_requests.append(MRequest(url, config)) + timeout = config.thread_timeout_seconds + requests_timeout = config.requests_params.get("timeout", 7) - for req in m_requests: - pool.add_task(req.send) + if timeout < requests_timeout: + log.warning( + "multithread_request(): Thread timeout %s < Requests timeout %s, could" + " cause threads to be stopped before getting the chance to finish. Consider" + " increasing the thread timeout.", + timeout, + requests_timeout, + ) + results = [] + with ThreadPoolExecutor(max_workers=config.number_threads) as tpe: + result_futures = [ + tpe.submit(do_request, url=url, config=config) for url in urls + ] + for idx, future in enumerate(result_futures): + url = urls[idx] + try: + results.append(future.result()) + except TimeoutError: + results.append(None) + log.error("multithread_request(): Thread timeout for URL: %s", url) + except RequestException as e: + results.append(None) + log.warning( + "multithread_request(): Http download error %s on URL: %s", e, url + ) - pool.wait_completion() - return m_requests + return results diff --git a/newspaper/source.py b/newspaper/source.py index fa80042..f9b582c 100644 --- a/newspaper/source.py +++ b/newspaper/source.py @@ -7,6 +7,7 @@ www.cnn.com would be its own source. """ +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import logging import re @@ -87,8 +88,8 @@ class Source: def __init__( self, url: str, - read_more_link: str = None, - config: Configuration = None, + read_more_link: Optional[str] = None, + config: Optional[Configuration] = None, **kwargs ): """The config object for this source will be passed into all of this @@ -159,7 +160,7 @@ def build(self): self.generate_articles() - def purge_articles(self, reason, articles): + def purge_articles(self, reason: str, articles: List[Article]) -> List[Article]: """Delete rejected articles, if there is an articles param, purge from there, otherwise purge from source instance. 
@@ -168,9 +169,9 @@ def purge_articles(self, reason, articles): http://stackoverflow.com/questions/1207406/remove-items-from-a-list-while-iterating-in-python """ if reason == "url": - articles[:] = [a for a in articles if a.is_valid_url()] + articles = [a for a in articles if a.is_valid_url()] elif reason == "body": - articles[:] = [a for a in articles if a.is_valid_body()] + articles = [a for a in articles if a.is_valid_body()] return articles @utils.cache_disk(seconds=(86400 * 1), cache_folder=ANCHOR_DIRECTORY) @@ -203,22 +204,13 @@ def set_feeds(self): common_feed_urls_as_categories = [Category(url=url) for url in common_feed_urls] category_urls = [c.url for c in common_feed_urls_as_categories] - requests = network.multithread_request(category_urls, self.config) - - for index, _ in enumerate(common_feed_urls_as_categories): - response = requests[index].resp - if response and response.ok: - try: - common_feed_urls_as_categories[index].html = network.get_html( - response.url, response=response - ) - except network.ArticleBinaryDataException: - log.warning( - "Deleting feed %s from source %s due to binary data", - common_feed_urls_as_categories[index].url, - self.url, - ) + responses = network.multithread_request(category_urls, self.config) + for response, feed in zip(responses, common_feed_urls_as_categories): + if response and response.status_code < 400: + feed.html = network.get_html(feed.url, response=response) + + # Remove empty or erroneous feeds common_feed_urls_as_categories = [ c for c in common_feed_urls_as_categories if c.html ] @@ -250,39 +242,27 @@ def download(self): def download_categories(self): """Download all category html, can use mthreading""" - category_urls = [c.url for c in self.categories] - requests = network.multithread_request(category_urls, self.config) - - for index, _ in enumerate(self.categories): - req = requests[index] - if req.resp is not None: - self.categories[index].html = network.get_html( - req.url, response=req.resp - ) - else: - log.warning( - "Deleting category %s from source %s due to download error", - self.categories[index].url, - self.url, - ) + category_urls = self.category_urls() + responses = network.multithread_request(category_urls, self.config) + + for response, category in zip(responses, self.categories): + if response and response.status_code < 400: + category.html = network.get_html(category.url, response=response) + self.categories = [c for c in self.categories if c.html] + return self.categories + def download_feeds(self): """Download all feed html, can use mthreading""" - feed_urls = [f.url for f in self.feeds] - requests = network.multithread_request(feed_urls, self.config) - - for index, _ in enumerate(self.feeds): - req = requests[index] - if req.resp is not None: - self.feeds[index].rss = network.get_html(req.url, response=req.resp) - else: - log.warning( - "Deleting feed %s from source %s due to download error", - self.categories[index].url, - self.url, - ) + feed_urls = self.feed_urls() + responses = network.multithread_request(feed_urls, self.config) + + for response, feed in zip(responses, self.feeds): + if response and response.status_code < 400: + feed.rss = network.get_html(feed.url, response=response) self.feeds = [f for f in self.feeds if f.rss] + return self.feeds def parse(self): """Sets the lxml root, also sets lxml roots of all @@ -321,7 +301,7 @@ def parse_feeds(self): log.debug("We are parsing %d feeds", self.feeds) self.feeds = [self._map_title_to_feed(f) for f in self.feeds] - def feeds_to_articles(self): + def 
feeds_to_articles(self) -> List[Article]: """Returns a list of :any:`Article` objects based on articles found in the Source's RSS feeds""" articles = [] @@ -364,7 +344,7 @@ def get_urls(feed): ) return articles - def categories_to_articles(self): + def categories_to_articles(self) -> List[Article]: """Takes the categories, splays them into a big list of urls and churns the articles out of each url with the url_to_article method """ @@ -426,63 +406,47 @@ def generate_articles(self, limit=5000): self.articles = articles[:limit] log.debug("%d articles generated and cutoff at %d", len(articles), limit) - def download_articles(self, threads=1): + def download_articles(self) -> List[Article]: """Starts the ``download()`` for all :any:`Article` objects from the ``articles`` property. It can run single threaded or multi-threaded. - Arguments: - threads(int): The number of threads to use for downloading - articles. Default is 1. + Returns: + List[:any:`Article`]: A list of downloaded articles. """ - # TODO fix how the article's is_downloaded is not set! - urls = [a.url for a in self.articles] + urls = self.article_urls() failed_articles = [] - def get_all_articles(): - for index, _ in enumerate(self.articles): - url = urls[index] - try: - html = network.get_html(url, config=self.config) - except network.ArticleBinaryDataException: - log.warning( - "Deleting article %s from source %s due to binary data", - url, - self.url, - ) + threads = self.config.number_threads + + if threads > NUM_THREADS_PER_SOURCE_WARN_LIMIT: + log.warning( + "Using %s+ threads on a single source may result in rate limiting!", + NUM_THREADS_PER_SOURCE_WARN_LIMIT, + ) + responses = network.multithread_request(urls, self.config) + # Note that the responses are returned in original order + with ThreadPoolExecutor(max_workers=threads) as tpe: + futures = [] + for response, article in zip(responses, self.articles): + if response and response.status_code < 400: + html = network.get_html(article.url, response=response) + else: html = "" + failed_articles.append(article.url) - self.articles[index].html = html - if not html: - failed_articles.append(self.articles[index]) - return [a for a in self.articles if a.html] - - def get_multithreaded_articles(): - filled_requests = network.multithread_request(urls, self.config) - # Note that the responses are returned in original order - for index, req in enumerate(filled_requests): - html = network.get_html(req.url, response=req.resp) - self.articles[index].html = html - if not req.resp: - failed_articles.append(self.articles[index]) - return [a for a in self.articles if a.html] - - if threads == 1: - self.articles = get_all_articles() - else: - if threads > NUM_THREADS_PER_SOURCE_WARN_LIMIT: - log.warning( - "Using %s+ threads on a single source may result in rate limiting!", - NUM_THREADS_PER_SOURCE_WARN_LIMIT, - ) + futures.append(tpe.submit(article.download, input_html=html)) - self.articles = get_multithreaded_articles() + self.articles = [future.result() for future in futures] self.is_downloaded = True + if len(failed_articles) > 0: log.warning( - "The following article urls failed the download: %s", - ", ".join([a.url for a in failed_articles]), + "There were %d articles that failed to download: %s", + len(failed_articles), + ", ".join(failed_articles), ) + return self.articles def parse_articles(self): """Parse all articles, delete if too small""" diff --git a/tests/test_misc.py b/tests/test_misc.py index a914f9b..93fea32 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ 
-1,7 +1,48 @@
 import os
 import pytest
 import newspaper
+from newspaper.article import Article
 from newspaper.configuration import Configuration
+from newspaper.mthreading import fetch_news
+from newspaper.network import multithread_request
+
+
+@pytest.fixture
+def download_urls():
+    return [
+        {
+            "url": "http://echo.jsontest.com/key/value/one/two",
+            "json": {"one": "two", "key": "value"},
+        },
+        {"url": "http://ipv4.download.thinkbroadband.com/5MB.zip", "size": 5000000},
+        {"url": "https://httpbin.org/delay/5", "size": 100},
+        {"url": "https://httpbin.org/image/jpeg", "size": 35000},
+        {
+            "url": "https://freetestdata.com/wp-content/uploads/2023/11/7.7-KB.txt",
+            "size": 7700,
+        },
+        {
+            "url": "https://freetestdata.com/wp-content/uploads/2023/11/23KB.txt",
+            "size": 23000,
+        },
+        {
+            "url": "https://freetestdata.com/wp-content/uploads/2023/11/40-KB.txt",
+            "size": 40000,
+        },
+        {
+            "url": "https://freetestdata.com/wp-content/uploads/2023/11/160-KB.txt",
+            "size": 160000,
+        },
+    ]
+
+
+@pytest.fixture
+def article_urls():
+    return [
+        "https://edition.cnn.com/travel/air-algeria-airplane-stowaway-critical-condition/index.html",
+        "https://edition.cnn.com/videos/us/2023/12/28/man-pulls-gun-on-woman-road-rage-pkg-vpx.knxv",
+        "https://www.foxnews.com/us/antisemitism-exposed-hate-soars-on-campus-tennis-legend-weighs-in-on-viral-video",
+    ]
 
 
 def test_hot_trending():
@@ -20,19 +61,47 @@ def test_languages():
 
 # Skip if GITHUB_ACTIONS
 @pytest.mark.skipif("GITHUB_ACTIONS" in os.environ, reason="Skip if GITHUB_ACTIONS")
-def test_multithread_download():
+def test_multithread_requests(download_urls):
+    config = Configuration()
+    config.number_threads = 3
+    config.thread_timeout_seconds = 15
+    config.http_success_only = True
+    config.allow_binary_content = True
+    results = multithread_request([x["url"] for x in download_urls], config=config)
+
+    assert len(results) == len(download_urls)
+    for result, expected in zip(results, download_urls):
+        assert result is not None
+        assert result.status_code == 200
+        if "json" in expected:
+            assert result.json() == expected["json"]
+        elif "size" in expected:
+            assert len(result.content) > expected["size"]
+
+
+# Skip if GITHUB_ACTIONS. It takes a lot of time.
+@pytest.mark.skipif("GITHUB_ACTIONS" in os.environ, reason="Skip if GITHUB_ACTIONS")
+def test_multithread_sources(article_urls):
     config = Configuration()
     config.memorize_articles = False
     slate_paper = newspaper.build("http://slate.com", config=config)
     tc_paper = newspaper.build("http://techcrunch.com", config=config)
     espn_paper = newspaper.build("http://time.com", config=config)
 
+    articles = [Article(url=u) for u in article_urls]
+
+    urls = [
+        "https://www.foxnews.com/media/homeowner-new-florida-bill-close-squatting-loophole-return-some-fairness",
+        "https://edition.cnn.com/2023/12/27/middleeast/dutch-diplomat-humanitarian-aid-gaza-sigrid-kaag-intl/index.html",
+    ]
+
     papers = [slate_paper, tc_paper, espn_paper]
-    for paper in papers:
-        paper.articles = paper.articles[:20]
-    newspaper.news_pool.set(papers, threads_per_source=2)
+    papers.extend(articles)
+    papers.extend(urls)
+
+    results = fetch_news(papers, threads=4)
 
-    newspaper.news_pool.join()
+    assert len(results) == len(papers)
 
     assert len(slate_paper.articles[-1].html) > 0
     assert len(espn_paper.articles[-1].html) > 0
diff --git a/tests/test_source.py b/tests/test_source.py
index d63be0f..fc5dc65 100644
--- a/tests/test_source.py
+++ b/tests/test_source.py
@@ -1,5 +1,6 @@
 import pytest
 from newspaper import Source
+from newspaper.article import ArticleDownloadState
 from newspaper.settings import MEMO_DIR
 from newspaper.utils import domain_to_filename
 import tests.conftest as conftest
@@ -175,3 +176,18 @@ def test_get_feeds(self, feed_sources):
             source.build()
             # source.set_feeds()
             assert feed_source["feeds"] <= len(source.feeds)
+
+    def test_download_all_articles(self, cnn_source):
+        source = Source(cnn_source["url"], verbose=False, memorize_articles=False)
+        source.clean_memo_cache()
+
+        source.html = cnn_source["html_content"]
+        source.parse()
+        source.set_feeds()
+        source.download_feeds()  # mthread
+
+        source.generate_articles(limit=30)
+        articles = source.download_articles()
+
+        assert len(articles) == 30
+        assert all([a.download_state == ArticleDownloadState.SUCCESS for a in articles])
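
Usage note (illustrative sketch, not part of the diff): the snippet below shows
how code that relied on the removed news_pool API maps onto fetch_news(),
following the usage in the updated tests. The site URL is only an example.

    import newspaper
    from newspaper.mthreading import fetch_news

    # Before this change (news_pool, now removed):
    #   papers = [newspaper.build("https://edition.cnn.com")]
    #   newspaper.news_pool.set(papers, threads_per_source=2)
    #   newspaper.news_pool.join()

    # After this change: pass Source objects, Article objects or plain urls;
    # each item in the list is downloaded and parsed by the thread pool.
    papers = [newspaper.build("https://edition.cnn.com")]
    results = fetch_news(papers, threads=4)

    # The Source objects in the list now hold downloaded, parsed articles.
    print(len(papers[0].articles))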