""" Ulauncher extension that lets you search and open MediaWiki pages """
from __future__ import annotations

import os
import re
from typing import cast, Dict
# noinspection PyPep8Naming
from urllib.parse import ParseResult as URL, urlparse

import requests
import validators
from bs4 import BeautifulSoup
from cachetools import LRUCache, cachedmethod
from cachetools.keys import hashkey
from ulauncher.api.client.Extension import Extension
from ulauncher.api.shared.event import KeywordQueryEvent, PreferencesUpdateEvent, PreferencesEvent

from data import MEDIA_WIKI_DETECTION_REGEXES_META, MEDIA_WIKI_DETECTION_REGEXES_CONTENT, \
    COMMON_API_ENDPOINTS, KNOWN_API_ENDPOINTS, MEDIA_WIKI_USER_AGENT, TITLE_READABILITY_IMPROVEMENTS
from data.WikiPage import WikiPage
from events.KeywordQueryEventListener import KeywordQueryEventListener
from events.PreferencesEventListener import PreferencesEventListener
from events.PreferencesUpdateEventListener import PreferencesUpdateEventListener
# noinspection PyPep8Naming
from utils.API import API
from utils.SortedList import SortedList


class WikiSearchExtension(Extension):
    """ Main Extension Class """

    # Resolved MediaWiki API objects, keyed by hostname
    _apis: Dict[str, API] = {}
    # LRU cache for search results (see the cached `search` method below)
    _cache: LRUCache = LRUCache(maxsize=32)

    def __init__(self):
        """ Initializes the extension """
        super().__init__()

        self.subscribe(KeywordQueryEvent, KeywordQueryEventListener())
        self.subscribe(PreferencesEvent, PreferencesEventListener())
        self.subscribe(PreferencesUpdateEvent, PreferencesUpdateEventListener())

    @staticmethod
    def get_base_icon():
        """
        Returns the base (project) icon

        :return: Path to the base icon
        """
        path = os.path.join(os.path.dirname(__file__), "images", "icon.svg")
        if not os.path.isfile(path):
            raise FileNotFoundError("Can't find base icon")

        return path

    @staticmethod
    def _parse_url(raw_url: str) -> URL | None:
        """
        Validates and parses the provided URL

        :param raw_url: URL to parse
        :return: Parsed URL or None if invalid
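
        Example (illustrative, hosts are placeholders)::

            _parse_url("en.wikipedia.org")         # ParseResult with netloc="en.wikipedia.org"
            _parse_url("//wiki.example.org/wiki")  # accepted via the dummy "http" scheme
            _parse_url("not a url")                # None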
"""
# Add dummy "http" schema in front of the URL if it starts with "//" to support RFC 1808
if validators.url("http:" + raw_url if re.match(r"^//", raw_url) else raw_url):
return urlparse(raw_url)
if validators.domain(raw_url):
return urlparse("//" + raw_url)
return None

    @staticmethod
    def _url_to_api(url: URL) -> API:
        """
        Converts URL into MediaWiki API object

        :param url: URL to convert
        :return: MediaWiki API object
        """
        return API(
            host=url.netloc,
            scheme=url.scheme,
            path=url.path,
            clients_useragent=MEDIA_WIKI_USER_AGENT,
            force_login=False
        )

    @staticmethod
    def _request(url: str, params=None, **kwargs):
        r"""
        Wrapper for the `requests.get` method
        that automatically adds the proper user agent to the request

        :param url: URL for the new :class:`Request` object.
        :param params: (optional) Dictionary, list of tuples or bytes to send
            in the query string for the :class:`Request`.
        :param kwargs: \*\*kwargs: Optional arguments that ``request`` takes.
        :return: :class:`Response <Response>` object
        :rtype: requests.Response
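
        Example (illustrative)::

            res = WikiSearchExtension._request(
                "https://en.wikipedia.org/w/api.php",
                params={"action": "query", "meta": "siteinfo", "format": "json"}
            )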
"""
return requests.get(url, params=params, **kwargs,
headers={"user-agent": MEDIA_WIKI_USER_AGENT})

    # noinspection PyProtectedMember
    def _get_api(self, url: URL) -> API | None:
        """
        Resolves the MediaWiki API from the provided URL

        :param url: URL pointing to the MediaWiki site
        :return: MediaWiki API or None if not resolved
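
        Resolution happens in three steps: fetch the page, detect MediaWiki via the
        content/meta detection regexes, then locate the api.php endpoint (known hosts
        first, common endpoint paths as a fallback).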
"""
# If there is no scheme specified set it as "http" and relay on the https redirect
if not url.scheme:
url = url._replace(scheme="http")
res = self._request(url.geturl())
# Maybe the site doesn't support https redirects? Let's try doing that manually
if not res.ok:
url = url._replace(scheme="https")
res = self._request(url.geturl())
# Either the response was unsuccessful or the returned response type was not HTML
content_type = res.headers.get("content-type")
if not res.ok or "text/html" not in (content_type or ""):
return None
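
        # The request may have been redirected (e.g. http -> https), so keep the scheme
        # the server actually served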
        new_url = urlparse(res.url)
        url = url._replace(scheme=new_url.scheme)

        # Let's check if the site is running MediaWiki
        media_wiki: bool = any(
            regex.findall(res.text) for regex in MEDIA_WIKI_DETECTION_REGEXES_CONTENT)
        if not media_wiki:
            soup = BeautifulSoup(res.text, "html.parser")
            media_wiki = any(regex.match(
                (soup.find("meta", attrs={"name": key}) or {}).get("content") or ""
            ) for key, regex in MEDIA_WIKI_DETECTION_REGEXES_META.items())

        if not media_wiki:
            return None

        # Final part, let's resolve the actual API endpoint
        # First, check the list of known API endpoints
        known_api_endpoint = next((endpoint_data for endpoint_data in KNOWN_API_ENDPOINTS if
                                   endpoint_data.regex.match(cast(str, url.hostname))), None)
        if known_api_endpoint:
            url = url._replace(path=known_api_endpoint.path)
            return self._url_to_api(url)

        # Otherwise, check common API endpoints and try to determine the valid one
        for common_endpoint in COMMON_API_ENDPOINTS:
            url = url._replace(path=common_endpoint)
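            # Probe the candidate path with a cheap siteinfo query; a JSON response
            # without an "error" key means we found a working api.php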
            res = self._request(url.geturl() + "api.php", params={
                "format": "json",
                "action": "query",
                "meta": "siteinfo",
                "siprop": "general"
            })
            content_type = res.headers.get("content-type")
            if res.ok and "application/json" in (content_type or "") and \
                    not res.json().get("error"):
                return self._url_to_api(url)

        return None

    def parse_wiki_urls(self, raw_wiki_urls: str) -> None:
        """
        Parses the raw list of wiki URLs and resolves their API endpoints

        :param raw_wiki_urls: Raw list of wiki URLs
        """
        if not raw_wiki_urls:
            return

        self.logger.info("Parsing wiki urls...")
        matches = list(filter(None, [self._parse_url(raw_url.strip()) for raw_url in
                                     re.findall(r"(?: *\| *)?(\S+)", raw_wiki_urls)]))

        endpoints: list[API] = []
        for url in matches:
            self.logger.debug("Resolving API endpoint for %s", url.netloc)
            endpoint: API | None = None

            # Reuse a previously resolved endpoint for the same host if available
            if self._apis.get(url.netloc):
                endpoint = cast(API, self._apis.get(url.netloc))
                endpoints.append(endpoint)
                self.logger.debug("API endpoint found in cache: hostname=%s scheme=%s, path=%s",
                                  endpoint.host, endpoint.scheme, endpoint.path)
                continue

            endpoint = self._get_api(url)
            if endpoint:
                endpoints.append(endpoint)
                self.logger.debug("Resolved API endpoint: hostname=%s scheme=%s, path=%s",
                                  endpoint.host, endpoint.scheme, endpoint.path)
                continue

            self.logger.warning("Unable to resolve API endpoint for %s", url.netloc)

        self._apis = {}
        for endpoint in endpoints:
            self._apis[endpoint.host] = endpoint

        self._cache.clear()
        self.logger.info("Parsing completed, resolved %s/%s URLs", len(endpoints), len(matches))
    @cachedmethod(lambda self: self._cache, lambda self, query: hashkey(query.lower().strip()))
    def search(self, query: str) -> SortedList[WikiPage]:
        """
        Searches the wikis for the query and returns the combined results from all of them

        :param query: Text to search
        :return: Combined results
        """
        pages = SortedList[WikiPage](query, min_score=60, limit=8)

        for wiki in self._apis.values():
            # Only search namespaces that actually hold content pages
            namespaces = list(filter(
                lambda value: value.has_content,
                wiki.namespaces.values()
            ))
            result = wiki.get(
                # Basic Options
                action="query",
                prop=["categories", "langlinks", "info"],
                generator="search",
                # Categories Options
                cllimit=500,
                # Language Links Options
                lllang="en",
                lllimit=500,
                # Info Options
                inprop=["displaytitle", "url"],
                # Generator Options
                gsrsearch=query,
                gsrnamespace=list(map(lambda value: value.id, namespaces)),
                gsrlimit=10
            )

            # Skip wikis that returned no matching pages
            if "query" not in result or not result["query"] or "pages" not in result["query"] or \
                    not result["query"]["pages"]:
                continue

            raw_pages = cast(dict, result["query"]["pages"]).values()
            for raw_page in raw_pages:
                title = cast(str, raw_page["title"])
                display_title = cast(str, raw_page["displaytitle"])
                namespace = next((namespace.name for namespace in namespaces
                                  if namespace.id == raw_page["ns"]), None)
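
                # Optional quality filters: skip the wiki's main page, translated
                # variants, and formatting subpages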
                if self.preferences["improved_filters"]:
                    # Filter main page
                    if title == cast(str, wiki.site["mainpage"]):
                        continue

                    # Filter translation pages
                    if raw_page.get("langlinks"):
                        continue

                    # Filter formatting pages
                    formatting_category = next((
                        category for category in raw_page.get("categories", [])
                        if re.match(
                            r"(?:Category:)?Format(?:ting)?(?:\s+)?subpage(?:s)?",
                            category["title"]
                        )
                    ), None)
                    if formatting_category:
                        continue
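
                # Optional readability tweaks applied to the displayed title only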
                if self.preferences["improved_titles"]:
                    for improvement in TITLE_READABILITY_IMPROVEMENTS:
                        display_title = improvement["regex"].sub(
                            improvement["replacement"],
                            display_title
                        )

                    # Strip the namespace prefix from the title
                    if namespace:
                        display_title = re.sub(rf"{namespace}:\s+", "", display_title)

                pages.append(
                    WikiPage(
                        wiki=wiki,
                        id=cast(int, raw_page["pageid"]),
                        title=title,
                        display_title=display_title,
                        namespace=namespace or "Unknown",
                        url=cast(str, raw_page["fullurl"])
                    )
                )

        return pages


if __name__ == "__main__":
    extension = WikiSearchExtension()
    extension.run()