-
Notifications
You must be signed in to change notification settings - Fork 1
/
bangs
executable file
·164 lines (128 loc) · 4.98 KB
/
bangs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python3
import argparse
import json
import pathlib
import re
import sys
import time
import traceback
import urllib.parse
import webbrowser
import bs4
import peewee
import requests
import yaml
# Filesystem layout: this script's directory, its parent (the project root),
# and a ``data`` directory under the root that holds the cache and blacklist.
HERE = pathlib.Path(__file__).resolve().parent
ROOT = HERE.parent
DATADIR = ROOT.joinpath('data')
# The data directory is created at import time if it does not exist yet.
DATADIR.mkdir(exist_ok=True)
DATABASE_FILE = DATADIR.joinpath('bangs.db')
BLACKLIST_FILE = DATADIR.joinpath('blacklist.txt')

# SQLite database backing the Bang model.
db = peewee.SqliteDatabase(str(DATABASE_FILE))
class Bang(peewee.Model):
    """A cached bang record stored in the SQLite database ``db``."""

    # Human-readable title of the bang.
    title = peewee.TextField()
    # The bang itself, e.g. '!gt'.
    bang = peewee.TextField()
    # URL stored for the bang.
    url = peewee.TextField()

    class Meta:
        # Bind the model to the module-level database.
        database = db
def list_bangs():
    """Fetch the list of bangs published on DuckDuckGo's ``bang_lite.html``.

    Every line of the page of the form ``<title> (!<bang>)<br>`` yields one
    entry; all other lines are ignored.

    Returns:
        A list of ``(title, bang)`` tuples in page order. ``title`` is
        stripped of surrounding whitespace and ``bang`` keeps its leading
        ``!``.

    Raises:
        requests.RequestException: if the request fails or times out.
        RuntimeError: if the server responds with a status other than 200.
    """
    # A timeout keeps the command from hanging forever on a stalled connection.
    r = requests.get('https://duckduckgo.com/bang_lite.html', timeout=30)
    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if r.status_code != 200:
        raise RuntimeError('Unexpected HTTP status %d fetching bang list' % r.status_code)
    pattern = re.compile(r'^(.*) \((![^ ]+)\)<br>$')
    matches = (pattern.match(line) for line in r.text.split('\n'))
    return [(m[1].strip(), m[2]) for m in matches if m]
def resolve_bang(bang):
    """Resolve a bang to the URL template it redirects to.

    Queries DuckDuckGo for ``<bang> randomgibberish``, reads the target of the
    ``<meta http-equiv="refresh">`` tag in the response, and replaces the
    placeholder search term in that target with ``{{{s}}}``.

    Args:
        bang: the bang to resolve, with or without the leading ``!``.

    Returns:
        The redirect URL with the search term replaced by ``{{{s}}}``.

    Raises:
        requests.RequestException: if the request fails or times out.
        ValueError: if the response contains no usable meta refresh redirect,
            or the resulting URL does not start with ``http``.
    """
    if not bang.startswith('!'):
        bang = '!%s' % bang
    gibberish = 'randomgibberish'
    url = 'https://duckduckgo.com/?q=%s+%s' % (urllib.parse.quote(bang), gibberish)
    # A timeout keeps a stalled connection from blocking the whole update run.
    r = requests.get(url, timeout=30)
    soup = bs4.BeautifulSoup(r.text, 'html.parser')
    meta = soup.select_one('meta[http-equiv=refresh]')
    content = meta.get('content') if meta is not None else None
    if not content:
        raise ValueError('No meta refresh redirect found for %s' % bang)
    # e.g., !gt => '0; url=http://translate.google.com/#auto/en/randomgibberish'
    # Parse the '<delay>; url=' prefix instead of slicing a fixed 7 characters.
    m = re.match(r'\s*\d+\s*;\s*url=(.*)', content, re.IGNORECASE | re.DOTALL)
    if not m:
        raise ValueError('Unexpected meta refresh content %r for %s' % (content, bang))
    redirect_url = m[1]
    if redirect_url.startswith('/'):
        # Relative redirect: the real target is carried in the 'uddg' query
        # parameter.
        query = urllib.parse.urlparse(redirect_url).query
        try:
            redirect_url = urllib.parse.parse_qs(query)['uddg'][0]
        except KeyError:
            raise ValueError('No uddg parameter in redirect %s' % redirect_url) from None
    bang_url = redirect_url.replace(gibberish, '{{{s}}}')
    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if not bang_url.startswith('http'):
        raise ValueError('%s is not an HTTP URL' % bang_url)
    return bang_url
def browser_open(bang, autoraise=True):
    """Open a sample DuckDuckGo query for a bang in the web browser.

    Args:
        bang: the bang to try, with or without the leading ``!``.
        autoraise: forwarded to ``webbrowser.open``.
    """
    prefixed = bang if bang.startswith('!') else '!%s' % bang
    target = 'https://duckduckgo.com/?q=%s+gibberish' % urllib.parse.quote(prefixed)
    webbrowser.open(target, autoraise=autoraise)
def do_resolve(args):
    """Handle the ``resolve`` subcommand: print the resolved URL of ``args.bang``.

    Returns:
        0, used as the process exit status.
    """
    resolved = resolve_bang(args.bang)
    print(resolved)
    return 0
def do_open(args):
    """Handle the ``open`` subcommand: open a sample query for ``args.bang``.

    Returns:
        0, used as the process exit status.
    """
    requested = args.bang
    browser_open(requested)
    return 0
def do_update(args):
    """Handle the ``update`` subcommand: resolve and cache bangs not yet stored.

    Creates the ``Bang`` table if needed, fetches the current bang list, and
    drops every bang that is already cached or listed in the blacklist file.
    Each remaining bang is resolved in turn; successes are stored and printed,
    failures are reported on stderr with a traceback.

    Args:
        args: parsed command-line arguments. When ``args.open_on_error`` is
            true, a bang that fails to resolve is opened in the browser.

    Returns:
        0 if every new bang resolved, 1 if at least one failed.
    """
    db.create_tables([Bang], safe=True)

    all_bangs = list_bangs()
    total = len(all_bangs)
    known = {row.bang for row in Bang.select()}

    # Blacklist entries are the lines of the blacklist file starting with '!'.
    skipped = set()
    if BLACKLIST_FILE.exists():
        with open(BLACKLIST_FILE) as blacklist_fp:
            skipped = {entry.strip() for entry in blacklist_fp if entry.startswith('!')}

    pending = [(title, bang) for title, bang in all_bangs
               if not (bang in known or bang in skipped)]
    print('%d bangs total, %d new:' % (total, len(pending)))

    status = 0
    for index, (title, bang) in enumerate(pending):
        try:
            bang_url = resolve_bang(bang)
        except Exception:
            # One failing bang must not abort the run; record the failure
            # in the exit status and move on.
            print('Error resolving %s' % bang, file=sys.stderr)
            traceback.print_exc()
            status = 1
            if args.open_on_error:
                browser_open(bang, autoraise=False)
                time.sleep(3)
        else:
            Bang.create(title=title, bang=bang, url=bang_url)
            print('%d\t%s (%s): %s' % (index, bang, title, bang_url))
        # Short pause before handling the next bang.
        time.sleep(0.2)
    return status
def do_dump(args):
    """Handle the ``dump`` subcommand: serialize all cached bangs.

    Records are emitted as a list of ``{title, bang, url}`` mappings sorted
    by bang.

    Args:
        args: parsed command-line arguments. ``args.format`` is ``'json'`` or
            ``'yaml'``; ``args.output`` is the destination path, or ``None``
            to write to stdout.

    Raises:
        NotImplementedError: if ``args.format`` is not a known format.
    """
    records = [dict(title=record.title, bang=record.bang, url=record.url)
               for record in Bang.select()]
    records.sort(key=lambda record: record['bang'])

    if args.format == 'json':
        output = json.dumps(records, ensure_ascii=False, indent=2, sort_keys=True)
    elif args.format == 'yaml':
        output = yaml.dump(records, allow_unicode=True, default_flow_style=False)
    else:
        raise NotImplementedError('Unknown output format %s' % args.format)

    if args.output is None:
        sys.stdout.write(output)
    else:
        # Both serializers keep non-ASCII characters as-is, so the file must
        # be written as UTF-8 rather than in the locale's default encoding.
        with open(args.output, 'w', encoding='utf-8') as fp:
            fp.write(output)
def main():
    """Parse the command line and dispatch to the selected subcommand handler.

    Subcommands are ``resolve``, ``open``, ``update`` and ``dump``. The
    handler's return value is passed to ``sys.exit`` as the exit status.
    """
    parser = argparse.ArgumentParser()
    # Subcommands are optional by default in Python 3; make one mandatory so
    # that running the script with no arguments produces a usage error rather
    # than an AttributeError on ``args.handler``.
    subparsers = parser.add_subparsers(dest='command')
    subparsers.required = True

    parser_resolve = subparsers.add_parser('resolve', help='resolve a bang')
    parser_resolve.add_argument('bang')
    parser_resolve.set_defaults(handler=do_resolve)

    parser_open = subparsers.add_parser('open', help='open sample bang query in browser')
    parser_open.add_argument('bang')
    parser_open.set_defaults(handler=do_open)

    parser_update = subparsers.add_parser('update', help='fetch and update bang cache')
    parser_update.add_argument('-o', '--open-on-error', action='store_true',
                               help='automatically open (in browser) bangs that fail to resolve')
    parser_update.set_defaults(handler=do_update)

    parser_dump = subparsers.add_parser('dump', help='dump cached bangs in a structured way')
    parser_dump.add_argument('-f', '--format', choices=('json', 'yaml'), default='json',
                             help='output format, json or yaml (default is json)')
    parser_dump.add_argument('output', nargs='?', help='output file (default is stdout)')
    parser_dump.set_defaults(handler=do_dump)

    args = parser.parse_args()
    sys.exit(args.handler(args))


if __name__ == '__main__':
    main()