forked from Coldcard/ckbunker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
webapp.py
executable file
·885 lines (682 loc) · 28.1 KB
/
webapp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
#!/usr/bin/env python
#
# A web server.
#
import sys, os, asyncio, logging, aiohttp_jinja2, jinja2, time, weakref, re
from aiohttp import web
from yarl import URL
from conn import Connection, MissingColdcard
from ckcc.protocol import CCProtocolPacker
from utils import pformat_json, json_loads, json_dumps, cleanup_psbt
from objstruct import ObjectStruct
from aiohttp.web_exceptions import HTTPMovedPermanently, HTTPNotFound, HTTPBadRequest, HTTPFound
from decimal import Decimal
import aiohttp_session
from aiohttp_session import get_session, new_session
from base64 import b32encode, b64decode, b64encode
from binascii import b2a_hex, a2b_hex
from status import STATUS
from persist import settings, BP
from hashlib import sha256
from chain import broadcast_txn
from version import VERSION
try:
from jinja2 import Markup, escape
except ImportError:
from markupsafe import Markup, escape
import policy
from ckcc.constants import USER_AUTH_TOTP, USER_AUTH_HMAC, USER_AUTH_SHOW_QR, MAX_USERNAME_LEN
from ckcc.constants import STXN_VISUALIZE, STXN_SIGNED, AF_P2WPKH, AF_CLASSIC
from ckcc.protocol import CCUserRefused
logging.getLogger(__name__).addHandler(logging.NullHandler())
routes = web.RouteTableDef()
web_sockets = weakref.WeakSet()
class HTMLErrorMsg(ValueError):
def __init__(self, html):
super(HTMLErrorMsg, self).__init__(Markup(html))
APPROVE_CTA = '''\
Please consult the Coldcard screen and review the HSM policy shown there. If you \
are satisfied it does what you need, approve the policy and the Coldcard will enter HSM mode.
'''
def default_context():
#
# Put values you want in every template here. They cannot vary per-request.
#
rv = ObjectStruct(VERSION=VERSION)
# this defines the nav menu in top bar
rv.PAGES = [ ('/', 'Sign Transaction'),
('/tools', 'Tools'),
('/setup', 'Coldcard Setup'),
('/bunker', 'Bunker Setup'),
#('/help', 'Help')
]
rv['zip'] = zip
return rv
async def add_shared_ctx(request, **rv):
# add ctx vars needed to support fancy Vue.js stuff on logged-in pages
ses = await get_session(request)
rv.update(dict(
ws_url = '/websocket/' + ses.get('ws_token'),
STATUS=STATUS.as_dict(),
))
rv['CUR_PAGE'] = '/' + request.path.split('/')[-1]
return rv
@routes.get('/')
@aiohttp_jinja2.template('txn/index.html')
async def homepage(request):
# may take some details from the policy, and cook up into more useful forms
ss = BP.get('summary', None) if STATUS.hsm.get('active') else None
# coldcard tells us when we'll need a local code, by providing seed value
nl = bool('next_local_code' in STATUS.hsm)
return await add_shared_ctx(request, policy_summary=ss, needs_local=nl)
@routes.get('/setup')
@aiohttp_jinja2.template('setup/index.html')
async def setup_page(request):
# HSM policy setup
# get latest status
dev = Connection()
await dev.hsm_status()
return await add_shared_ctx(request)
@routes.get('/bunker')
@aiohttp_jinja2.template('bunker/index.html')
async def bunker_page(request):
# Bunker config and setup
# get latest status
dev = Connection()
await dev.hsm_status()
from torsion import TOR
return await add_shared_ctx(request, BP=BP, force_local_mode=STATUS.force_local_mode)
@routes.get('/tools')
@aiohttp_jinja2.template('tools/index.html')
async def tools_page(request):
# various random things I've been talked into
# - message signing useul tho
if BP.get('policy'):
paths = BP['policy'].get('msg_paths') or []
paths = set(i.replace('*', '999') for i in paths)
else:
# priv_over_ux: we don't know, but some useful ones
paths = ['m', "m/0/0", "m/44'/0'/0'/0/0", "m/49'/0'/0'/0/0",
"m/84'/0'/0/0" ]
paths = list(sorted(paths, key=lambda x: (len(x.split('/')), x.split('/'))))
return await add_shared_ctx(request, msg_paths=paths)
@routes.get('/help')
@aiohttp_jinja2.template('help.html')
async def help_page(request):
return await add_shared_ctx(request)
_static_html_cache = dict()
def send_static_html(fname, disable_cache=True):
# just return the contents of an HTML file, and cache it along the way
# - can't use web.StaticFileResponse because want set-cookies to happen
global _static_html_cache
if not disable_cache and (fname in _static_html_cache):
page_html = _static_html_cache[fname]
else:
_static_html_cache[fname] = page_html = open(fname, 'rt').read()
return web.Response(text=page_html, content_type='text/html')
@routes.get('/logout')
async def logout_page(request):
# do a logout
ses = await get_session(request)
if ses and not ses.new:
logging.warn("Logout of user.")
# This clears cookie, but if they come back to any page on our site,
# they will get a new one, so we can't redirect them to any other page
# here
ses.invalidate()
return send_static_html('static/html/logout.html')
@routes.get('/login')
async def login_page(request):
# Completely static and plain.
# thanks to <https://codepen.io/rizwanahmed19/pen/KMMoEN>
# Create a session, if they don't have one. Need this
# to establish timing/authencity of login data when its posted
ses = await get_session(request)
if not ses:
ses = await new_session(request)
else:
if ses.get('login_ok'):
# they are already logged in, so send them to homepage
return HTTPFound('/')
# - cannot leave session empty, so put anything in there.
ses['j'] = True
ses.changed()
return send_static_html('static/html/login.html')
def accept_user_login(ses):
# setup session for good user
ses['login_ok'] = True
ses['active'] = time.time()
ses['ws_token'] = str(b32encode(os.urandom(15)), 'ascii')
ses.pop('captcha', None)
@routes.post('/login')
async def login_post(request):
# they must have a current session already
# - hope this is enough against CSRF
# - TODO: some rate limiting here, without DoS attacks
ses = await get_session(request)
form = await request.post()
ok = False
if ses.new:
logging.warn("Bad login attempt (no cookie)")
elif (time.time() - ses.created) > settings.MAX_LOGIN_WAIT_TIME:
ses.invalidate()
logging.warn("Stale login attempt (cookie too old)")
elif ses.get('kiddie', 0):
logging.warn("Keeping the kiddies at bay")
else:
captcha = form.get('captcha', '').lower()
pw = form.get('password', None)
if not captcha or not pw:
# keep same captcha; they just pressed enter
ok = False
else:
expect = BP.get('master_pw', settings.MASTER_PW) # XXX scrypt(pw)
expect_code = ses.pop('captcha', None)
ok = (pw == expect) and (captcha == expect_code)
if not ok:
# fail; do nothing visible (but they will get new captcha)
dest = URL(request.headers.get('referer', '/login'))
return HTTPFound(dest)
# SUCCESS
accept_user_login(ses)
# try to put them back where they were before
try:
dest = URL(request.headers.get('referer', '')).query.get('u', '/')
except:
dest = '/'
logging.warn(f"Good login from user, sending to: {dest}")
return HTTPFound(dest)
@routes.get('/captcha')
async def captcha_image(request):
# make a captcha image, but always the same one per session
ses = await get_session(request)
if ses.new:
return HTTPNotFound()
from make_captcha import RansomCaptcha, MegaGifCaptcha, TOKEN_CHARS
import random
if 'captcha' in ses:
# dont let them retry?
code = ses['captcha']
else:
code = ''.join(random.sample(TOKEN_CHARS, 8))
ses['captcha'] = code
easy = BP.get('easy_captcha', settings.EASY_CAPTCHA)
if easy:
itype, data = RansomCaptcha(seed=code).draw(code, foreground='#444')
else:
itype, data = MegaGifCaptcha(seed=code).draw(code, foreground='#444')
return web.Response(body=data, content_type='image/'+itype,
headers = {'Cache-Control': 'no-cache'})
async def rx_handler(ses, ws, orig_request):
# Block on receive, handle each message as it comes in.
# see pp/aiohttp/client_ws.py
async def tx_resp(_ws=ws, **resp):
logging.debug(f"Send resp: {resp}")
await _ws.send_str(json_dumps(resp))
async for msg in ws:
if msg.type != web.WSMsgType.TEXT:
raise TypeError('expected text')
try:
assert len(msg.data) < 20000
req = json_loads(msg.data)
if '_ping' in req:
# connection keep alive, simple
await tx_resp(_pong=1)
continue
# Caution: lots of sensitive values here XXX
#logging.info("WS api data: %r" % req)
except Exception as e:
logging.critical("Junk data on WS", exc_info=1)
break # the connection
# do something with the request
failed = True
try:
await ws_api_handler(ses, tx_resp, req, orig_request)
failed = False
except SystemExit:
raise
except KeyboardInterrupt:
break
except HTMLErrorMsg as exc:
# pre-formated text for display
msg = exc.args[0]
except RuntimeError as exc:
# covers CCProtoError and similar
msg = str(exc) or str(type(exc).__name__)
except BaseException as exc:
logging.exception("API fail: req=%r" % req)
msg = str(exc) or str(type(exc).__name__)
if failed:
# standard error response
await tx_resp(show_modal=True, html=escape(msg), selector='.js-api-fail')
async def push_status_updates_handler(ws):
# block for a bit, and then send display updates (and all other system status changes)
# - there is no need for immediate update because when we rendered the HTML on page
# load, we put in current values.
await asyncio.sleep(0.250)
last = None
while 1:
# get latest state
now = STATUS.as_dict()
if last != now:
# it has changed, so send it.
await ws.send_str(json_dumps(dict(vue_app_cb=dict(update_status=now))))
last = now
# wait until next update, or X seconds max (for keep alive/just in case)
try:
await asyncio.wait_for(STATUS._update_event.wait(), 120)
except asyncio.TimeoutError:
# force an update
last = None
async def ws_api_handler(ses, send_json, req, orig_request): # handle_api
#
# Handle incoming requests over websocket; send back results.
# req = already json parsed request coming in
# send_json() = means to send the response back
#
action = req.action
args = getattr(req, 'args', None)
#logging.warn("API action=%s (%r)" % (action, args)) # MAJOR info leak XXX
logging.debug(f"API action={action}")
if action == '_connected':
logging.info("Websocket connected: %r" % args)
# can send special state update at this point, depending on the page
elif action == 'start_hsm_btn':
await Connection().hsm_start()
await send_json(show_flash_msg=APPROVE_CTA)
elif action == 'delete_user':
name, = args
assert 1 <= len(name) <= MAX_USERNAME_LEN, "bad username length"
await Connection().delete_user(name.encode('utf8'))
# assume it worked, so UX updates right away
try:
STATUS.hsm.users.remove(name)
except ValueError:
pass
STATUS.notify_watchers()
elif action == 'create_user':
name, authmode, new_pw = args
assert 1 <= len(name) <= MAX_USERNAME_LEN, "bad username length"
assert ',' not in name, "no commas in names"
if authmode == 'totp':
mode = USER_AUTH_TOTP | USER_AUTH_SHOW_QR
new_pw = ''
elif authmode == 'rand_pw':
mode = USER_AUTH_HMAC | USER_AUTH_SHOW_QR
new_pw = ''
elif authmode == 'give_pw':
mode = USER_AUTH_HMAC
else:
raise ValueError(authmode)
await Connection().create_user(name.encode('utf8'), mode, new_pw)
# assume it worked, so UX updates right away
try:
STATUS.hsm.users = list(set(STATUS.hsm.users + [name]))
except ValueError:
pass
STATUS.notify_watchers()
elif action == 'submit_policy':
# get some JSON w/ everything the user entered.
p, save_copy = args
proposed = policy.web_cleanup(json_loads(p))
policy.update_sl(proposed)
await Connection().hsm_start(proposed)
STATUS.notify_watchers()
await send_json(show_flash_msg=APPROVE_CTA)
if save_copy:
d = policy.desensitize(proposed)
await send_json(local_download=dict(data=json_dumps(d, indent=2),
filename=f'hsm-policy-{STATUS.xfp}.json.txt'))
elif action == 'download_policy':
proposed = policy.web_cleanup(json_loads(args[0]))
await send_json(local_download=dict(data=json_dumps(proposed, indent=2),
filename=f'hsm-policy-{STATUS.xfp}.json.txt'))
elif action == 'import_policy':
# they are uploading a JSON capture, but need values we can load in Vue
proposed = args[0]
cooked = policy.web_cookup(proposed)
await send_json(vue_app_cb=dict(update_policy=cooked),
show_flash_msg="Policy file imported.")
elif action == 'pick_onion_addr':
from torsion import TOR
addr, pk = await TOR.pick_onion_addr()
await send_json(vue_app_cb=dict(new_onion_addr=[addr, pk]))
elif action == 'pick_master_pw':
pw = b64encode(os.urandom(12)).decode('ascii')
pw = pw.replace('/', 'S').replace('+', 'p')
assert '=' not in pw
await send_json(vue_app_cb=dict(new_master_pw=pw))
elif action == 'new_bunker_config':
from torsion import TOR
# save and apply config values
nv = json_loads(args[0])
assert 4 <= len(nv.master_pw) < 200, "Master password must be at least 4 chars long"
# copy in simple stuff
for fn in [ 'tor_enabled', 'master_pw', 'easy_captcha', 'allow_reboots']:
if fn in nv:
BP[fn] = nv[fn]
# update onion stuff only if PK is known (ie. they changed it)
if nv.get('onion_pk', False) or False:
for fn in [ 'onion_addr', 'onion_pk']:
if fn in nv:
BP[fn] = nv[fn]
BP.save()
await send_json(show_flash_msg="Bunker settings encrypted and saved to disk.")
STATUS.tor_enabled = BP['tor_enabled']
STATUS.notify_watchers()
if not BP['tor_enabled']:
await TOR.stop_tunnel()
elif BP.get('onion_pk') and not (STATUS.force_local_mode or STATUS.setup_mode) \
and TOR.get_current_addr() != BP.get('onion_addr'):
# disconnect/reconnect
await TOR.start_tunnel()
elif action == 'sign_message':
# sign a short text message
# - lots more checking could be done here, but CC does it anyway
msg_text, path, addr_fmt = args
addr_fmt = AF_P2WPKH if addr_fmt != 'classic' else AF_CLASSIC
try:
sig, addr = await Connection().sign_text_msg(msg_text, path, addr_fmt)
except:
# get the spinner to stop: error msg will be "refused by policy" typically
await send_json(vue_app_cb=dict(msg_signing_result='(failed)'))
raise
sig = b64encode(sig).decode('ascii').replace('\n', '')
await send_json(vue_app_cb=dict(msg_signing_result=f'{sig}\n{addr}'))
elif action == 'upload_psbt':
# receiving a PSBT for signing
size, digest, contents = args
psbt = b64decode(contents)
assert len(psbt) == size, "truncated/padded in transit"
assert sha256(psbt).hexdigest() == digest, "corrupted in transit"
STATUS.import_psbt(psbt)
STATUS.notify_watchers()
elif action == 'clear_psbt':
STATUS.clear_psbt()
STATUS.notify_watchers()
elif action == 'preview_psbt':
STATUS.psbt_preview = 'Wait...'
STATUS.notify_watchers()
try:
txt = await Connection().sign_psbt(STATUS._pending_psbt, flags=STXN_VISUALIZE)
txt = txt.decode('ascii')
# force some line splits, especially for bech32, 32-byte values (p2wsh)
probs = re.findall(r'([a-zA-Z0-9]{36,})', txt)
for p in probs:
txt = txt.replace(p, p[0:30] + '\u22ef\n\u22ef' + p[30:])
STATUS.psbt_preview = txt
except:
# like if CC doesn't like the keys, whatever ..
STATUS.psbt_preview = None
raise
finally:
STATUS.notify_watchers()
elif action == 'auth_set_name':
idx, name = args
assert 0 <= len(name) <= MAX_USERNAME_LEN
assert 0 <= idx < len(STATUS.pending_auth)
STATUS.pending_auth[idx].name = name
STATUS.notify_watchers()
elif action == 'auth_offer_guess':
idx, ts, guess = args
assert 0 <= idx < len(STATUS.pending_auth)
STATUS.pending_auth[idx].totp = ts
STATUS.pending_auth[idx].has_guess = 'x'*len(guess)
STATUS._auth_guess[idx] = guess
STATUS.notify_watchers()
elif action == 'submit_psbt':
# they want to sign it now
expect_hash, send_immediately, finalize, wants_dl = args
assert expect_hash == STATUS.psbt_hash, "hash mismatch"
if send_immediately: assert finalize, "must finalize b4 send"
logging.info("Starting to sign...")
STATUS.busy_signing = True
STATUS.notify_watchers()
try:
dev = Connection()
# do auth steps first (no feedback given)
for pa, guess in zip(STATUS.pending_auth, STATUS._auth_guess):
if pa.name and guess:
await dev.user_auth(pa.name, guess, int(pa.totp), a2b_hex(STATUS.psbt_hash))
STATUS.reset_pending_auth()
try:
result = await dev.sign_psbt(STATUS._pending_psbt, finalize=finalize)
logging.info("Done signing")
msg = "Transaction signed."
if send_immediately:
msg += '<br><br>' + broadcast_txn(result)
await send_json(show_modal=True, html=Markup(msg), selector='.js-api-success')
result = (b2a_hex(result) if finalize else b64encode(result)).decode('ascii')
fname = 'transaction.txt' if finalize else ('signed-%s.psbt'%STATUS.psbt_hash[-6:])
if wants_dl:
await send_json(local_download=dict(data=result, filename=fname,
is_b64=(not finalize)))
await dev.hsm_status()
except CCUserRefused:
logging.error("Coldcard refused to sign txn")
await dev.hsm_status()
r = STATUS.hsm.get('last_refusal', None)
if not r:
raise HTMLErroMsg('Refused by local user.')
else:
raise HTMLErrorMsg(f"Rejected by Coldcard.<br><br>{r}")
finally:
STATUS.busy_signing = False
STATUS.notify_watchers()
elif action == 'shutdown_bunker':
await send_json(show_flash_msg="Bunker is shutdown.")
await asyncio.sleep(0.25)
logging.warn("User-initiated shutdown")
asyncio.get_running_loop().stop()
sys.exit(0)
elif action == 'leave_setup_mode':
# During setup process, they want to go Tor mode; which I consider leaving
# setup mode ... in particular, logins are required.
# - button label is "Start Tor" tho ... so user doesn't see it that way
assert STATUS.setup_mode, 'not in setup mode?'
assert BP['tor_enabled'], 'Tor not enabled (need to save?)'
addr = BP['onion_addr']
assert addr and '.onion' in addr, "Bad address?"
STATUS.setup_mode = False
await send_json(show_flash_msg="Tor hidden service has been enabled. "
"It may take a few minutes for the website to become available")
STATUS.notify_watchers()
from torsion import TOR
logging.info(f"Starting hidden service: %s" % addr)
asyncio.create_task(TOR.start_tunnel())
elif action == 'logout_everyone':
# useful for running battles...
# - changes crypto key for cookies, so they are all invalid immediately.
from aiohttp_session.nacl_storage import NaClCookieStorage
import nacl
logging.warning("Logout of everyone!")
# reset all session cookies
storage = orig_request.get('aiohttp_session_storage')
assert isinstance(storage, NaClCookieStorage)
storage._secretbox = nacl.secret.SecretBox(os.urandom(32))
# kick everyone off (bonus step)
for w in web_sockets:
try:
await send_json(redirect='/logout', _ws=w)
await w.close()
except:
pass
else:
raise NotImplementedError(action)
@routes.get('/websocket/{token}')
async def api_websocket(request):
'''
Stream display activity as HTML fragments
- accept config changes
- and more?
'''
try:
ses = await get_session(request)
assert not ses.new
assert ses['login_ok']
token = request.match_info['token']
assert token == ses['ws_token'], ses.get('ws_token')
except:
logging.exception("Bad websocket link")
raise web.HTTPForbidden
# begin a streaming response
ws = web.WebSocketResponse()
await ws.prepare(request)
web_sockets.add(ws)
try:
api_rx = asyncio.create_task(rx_handler(ses, ws, request))
dis_tx = asyncio.create_task(push_status_updates_handler(ws))
await asyncio.gather(api_rx, dis_tx)
finally:
api_rx.cancel()
dis_tx.cancel()
await ws.close()
return ws
@routes.get('/{fname}.php')
@routes.get('/{path}/{fname}.php')
@routes.get('/admin')
async def kiddie_traps(request):
# I hate skill-less people running burpsuite! Track them
ses = await get_session(request)
ses['kiddie'] = ses.get('kiddie', 0) + 1
return HTTPFound("/login")
@routes.get('/favicon.ico')
@routes.get('/robots.txt')
@routes.get('/humans.txt')
async def expected_404s(request):
# we are expecting these URL's to 404
# - don't redirect to login
return HTTPNotFound()
@routes.get('/static/ext/themes/default/assets/fonts/{fname}')
async def remap_fonts(request):
fn = request.match_info['fname']
# remap some links so we don't need to edit Semantic files.
return HTTPMovedPermanently('/static/ext/semantic-fonts/' + fn)
def extra_filters(app):
def my_time(dt):
if dt is None: return None
try:
dt = pendulum.instance(dt)
except ValueError:
return repr(dt)
return dt.in_tz('local').strftime('%l:%M%p').lower()
def my_date(dt):
if dt is None: return None
try:
dt = pendulum.instance(dt)
except ValueError:
return repr(dt)
return dt.in_tz('local').strftime('%b %e')
def duration(i):
if i < 90:
return '%d seconds' % i
else:
return '%d:%02d minutes' % (i//60, i%60)
def extlink(url, label=None):
# in the templates, use like this:
# {{ "http://sdfsfd/sdf/sdf/" | extlink( "Hello") }}
if not label:
from urllib.parse import urlparse
label = urlparse(url).netloc.lower()
if label[0:4] == 'www.':
label = label[4:]
rv = f'<a href="{url}" rel="nofollow" class="extlink" '\
'target="external" title="External site: Opens in new tab">'\
f'{label} <i class="external alternate icon"></i></a>'
return Markup(rv)
def link_txn(txn_hash):
from chain import link_to_txn
url = link_to_txn(txn_hash)
return extlink(url, label=txn_hash)
def link_to_explorer(unused):
url = settings.EXPLORA
label = 'Bitcoin Explorer'
if STATUS.is_testnet:
label = 'Testnet Explorer'
url += '/testnet'
return extlink(url, label=label)
def slugify(t):
return t.lower().strip().replace('&', 'and').replace(' ', '-')
rv = locals().copy()
return rv
@web.middleware
async def auth_everything(request, handler):
# during setup, no login needed
if STATUS.force_local_mode or STATUS.setup_mode:
# bypass security completely
ses = await get_session(request)
if not ses:
ses = await new_session(request)
accept_user_login(ses)
return await handler(request)
# whitelist of pages that do not need login
# all other requests will need auth
if handler in { login_page, login_post, captcha_image, kiddie_traps, expected_404s }:
return await handler(request)
# verify auth
ses = await get_session(request)
if ses:
active = ses.get('active', 0)
idle = time.time() - active
if idle > settings.MAX_IDLE_TIME:
# stale / idle timeout
logging.warn("Idle timeout")
ses.invalidate()
ses = None
if not ses:
# no cookie/stale cookie, so send them to logout/in page
u = URL("/login")
target = request.path[1:]
if target and target != 'logout':
u = u.update_query(u=target)
return HTTPFound(u)
# normal path: continue to page
ses['active'] = time.time()
resp = await handler(request)
# clearly an HTML request, so force no caching
if '/static/' not in request.path:
resp.headers['Cache-Control'] = 'no-cache'
return resp
def startup(setup_mode):
# Entry point. Return an awaitable to be run.
# encrypted cookies for session logic
from aiohttp_session.nacl_storage import NaClCookieStorage
from aiohttp_session import session_middleware
todays_key = os.urandom(32)
sml = session_middleware(NaClCookieStorage(todays_key, cookie_name='X'))
# create app: order of middlewares matters
app = web.Application(middlewares=[sml, auth_everything])
app.add_routes(routes)
app.router.add_static('/static', './static')
# hack to obfuscate our identity a little
# - from <https://w3techs.com/technologies/details/ws-nginx/1> 23% market share
import aiohttp.web_response as ht
ht.SERVER_SOFTWARE = 'nginx/1.14.1'
j_env = aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'),
filters=extra_filters(app))
j_env.globals.update(default_context())
j_env.policies['json.dumps_function'] = json_dumps
my_url = f"http://localhost:{settings.PORT_NUMBER}" + ('/setup' if setup_mode else '')
logging.info(f"Web server at: {my_url}")
# meh; kinda annoying.
if 0:
def popup():
try:
# see <https://github.com/jupyter/notebook/issues/3746#issuecomment-444957821>
# if this doesn't happen on MacOS
from webbrowser import open as open_browser
open_browser(my_url)
except:
logging.error("Unable to pop browser open", exc_info=1)
asyncio.get_running_loop().call_later(3, popup)
from aiohttp.abc import AbstractAccessLogger
class AccessLogger(AbstractAccessLogger):
def log(self, request, response, time):
self.logger.info(f'{response.status} <= {request.method} {request.path}')
return web._run_app(app, port=settings.PORT_NUMBER, print=None, access_log_class=AccessLogger)
if __name__ == "__main__":
from utils import setup_logging
setup_logging()
dev = Connection(None) # won't do anything tho, because async dev.run not called
asyncio.run(startup())
# EOF