-
Notifications
You must be signed in to change notification settings - Fork 2
/
botlnurl.py
101 lines (91 loc) · 3.53 KB
/
botlnurl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
import json
import requests
import urllib.parse
# Shared logger used by the functions in this module. It starts as None, so it
# is presumably assigned by the importing application before any function that
# logs is called (TODO confirm against the caller).
logger = None
# Shared configuration mapping; the functions below read the optional keys
# "connectTimeout", "readTimeout" and "denyProviders" from it. It also starts
# as None and is presumably injected the same way (TODO confirm).
config = None
def gettimeouts():
    """Return the ``(connect, read)`` timeout pair used for HTTP requests.

    Each value is taken from the module-level ``config`` when the matching key
    ("connectTimeout" / "readTimeout") is present; otherwise the built-in
    fallback of 5 (connect) or 30 (read) is used.
    """
    connect = config["connectTimeout"] if "connectTimeout" in config else 5
    read = config["readTimeout"] if "readTimeout" in config else 30
    return (connect, read)
def gettorproxies():
    """Return the proxy mapping that points both http and https at the local Tor SOCKS port."""
    # with tor service installed, default port is 9050
    # to find the port to use, can run the following
    # cat /etc/tor/torrc | grep SOCKSPort | grep -v "#" | awk '{print $2}'
    torAddress = 'socks5h://127.0.0.1:9050'
    return {scheme: torAddress for scheme in ('http', 'https')}
def geturl(useTor=True, url=None, defaultResponse="{}", headers=None, description=None):
    """Fetch ``url`` with an HTTP GET and return the response body parsed as JSON.

    Parameters:
        useTor: when true, the request is sent through the proxies returned by
            ``gettorproxies()``; otherwise no proxies are used.
        url: the address to request.
        defaultResponse: JSON text that is parsed and returned when the request
            or the parsing of its body fails.
        headers: optional mapping of extra request headers; ``None`` (the
            default) means no extra headers.
        description: optional text logged at info level when a failure occurs.

    Returns:
        The parsed JSON body on success, otherwise the parsed
        ``defaultResponse``.

    Any exception raised while requesting or parsing is caught and logged as a
    warning rather than propagated. An invalid ``defaultResponse`` still raises
    from the fallback ``json.loads`` call.
    """
    # A None sentinel replaces the former shared ``{}`` default so that no
    # mutable object is reused between calls.
    if headers is None:
        headers = {}
    try:
        proxies = gettorproxies() if useTor else {}
        timeout = gettimeouts()
        resp = requests.get(
            url,
            timeout=timeout,
            allow_redirects=True,
            proxies=proxies,
            headers=headers,
            verify=True,
        )
        return json.loads(resp.text)
    except Exception as e:
        # Deliberate best-effort: callers always get a JSON value back.
        if description is not None:
            logger.info(description)
        logger.warning(f"Error getting data from LN URL Provider from url ({url}): {str(e)}")
        return json.loads(defaultResponse)
def getLNURLPayInfo(identity):
    """Fetch the LNURL pay information for a ``user@domain`` style identity.

    The identity is split on "@"; anything that does not yield exactly two
    parts gives ``(None, None)``. Otherwise the well-known lnurlp URL for the
    user on that domain is requested through ``geturl`` and the result is
    returned together with the URL as ``(response, url)``. Domains ending in
    ".onion" are requested over http with Tor enabled, all others over https
    without Tor.
    """
    pieces = identity.split("@")
    if len(pieces) != 2:
        return None, None
    username, domainname = pieces
    isOnion = domainname.endswith(".onion")
    protocol = "http" if isOnion else "https"
    url = f"{protocol}://{domainname}/.well-known/lnurlp/{username}"
    return geturl(isOnion, url, "{}", {}, "Get LNURL Pay Info"), url
def isLNURLProviderAllowed(identity):
    """Report whether the provider domain of a ``user@domain`` identity may be used.

    Returns False when the identity does not split into exactly two parts on
    "@", or when its domain appears in ``config["denyProviders"]``. Returns
    True when no "denyProviders" key is configured or the domain is not listed.

    Domain names are case-insensitive, so in addition to the exact match the
    domain is also compared in lowercase against the lowercased deny entries;
    otherwise ``user@BAD.example`` would slip past an entry for
    ``bad.example``.
    """
    identityParts = identity.split("@")
    if len(identityParts) != 2:
        return False
    if "denyProviders" not in config:
        return True
    domainname = identityParts[1]
    denied = config["denyProviders"]
    # Keep the original exact-match check so everything denied before stays denied.
    if domainname in denied:
        return False
    loweredDenied = [entry.lower() for entry in denied if isinstance(entry, str)]
    return domainname.lower() not in loweredDenied
def isLNURLCallbackAllowed(callback):
    """Report whether a callback URL points at a provider that is not denied.

    Returns True when ``config`` has no "denyProviders" key. Otherwise the URL
    is parsed and rejected (False) when its network location matches a deny
    entry.

    Besides the exact ``netloc`` match, the lowercased ``netloc`` and the
    parsed ``hostname`` (which excludes userinfo and port) are compared against
    the lowercased deny entries. Without this, a callback such as
    ``https://bad.example:443/...`` or ``https://BAD.example/...`` would bypass
    an entry for ``bad.example``.
    """
    if "denyProviders" not in config:
        return True
    denied = config["denyProviders"]
    parseresult = urllib.parse.urlparse(callback)
    # Keep the original exact-match check so everything denied before stays denied.
    if parseresult.netloc in denied:
        return False
    loweredDenied = [entry.lower() for entry in denied if isinstance(entry, str)]
    candidates = [parseresult.netloc.lower()]
    if parseresult.hostname is not None:
        candidates.append(parseresult.hostname)
    return not any(candidate in loweredDenied for candidate in candidates)
def getInvoiceFromZapRequest(callback, satsToZap, zapRequest, bech32lnurl):
    """Request an invoice from an LNURL callback for the given zap request.

    The callback URL is extended with three query parameters: ``amount``
    (``satsToZap`` multiplied by 1000), ``nostr`` (the zap request as encoded
    by ``getEncodedZapRequest``) and ``lnurl`` (``bech32lnurl``). They are
    appended with "&" when the callback already contains "?", otherwise with
    "?". Tor is used when ".onion" occurs anywhere in the callback.

    Returns whatever ``geturl`` returns for the assembled URL.
    """
    logger.debug("Requesting invoice from LNURL service using zap request")
    nostrParam = getEncodedZapRequest(zapRequest)
    millisats = satsToZap * 1000
    viaTor = ".onion" in callback
    joiner = "&" if "?" in callback else "?"
    url = f"{callback}{joiner}amount={millisats}&nostr={nostrParam}&lnurl={bech32lnurl}"
    return geturl(viaTor, url, "{}", {}, "Get Invoice from LN Url Provider")
def getEncodedZapRequest(zapRequest):
    """Serialize a zap request to JSON and percent-encode it for use in a URL.

    The JSON object carries the keys id, pubkey, created_at, kind, tags,
    content and sig, read from the request's ``id``, ``public_key``,
    ``created_at``, ``kind``, ``tags``, ``content`` and ``signature``
    attributes respectively.
    """
    # (JSON key, attribute on zapRequest) in the order they are emitted.
    fieldMap = (
        ("id", "id"),
        ("pubkey", "public_key"),
        ("created_at", "created_at"),
        ("kind", "kind"),
        ("tags", "tags"),
        ("content", "content"),
        ("sig", "signature"),
    )
    payload = {key: getattr(zapRequest, attr) for key, attr in fieldMap}
    return urllib.parse.quote(json.dumps(payload))
def isValidInvoiceResponse(invoiceResponse):
    """Report whether an invoice response contains a usable payment request.

    Returns False when the response is not a JSON object (dict), when its
    "status" is "ERROR" (the "reason", or "unreported reason" if absent, is
    logged as a warning), or when it has no "pr" key. Returns True otherwise.

    The dict check matters because the response comes from parsing a remote
    JSON body, which may legally be ``null``, a number, a string or a list;
    key lookups on those would raise or give misleading results.
    """
    if not isinstance(invoiceResponse, dict):
        return False
    if invoiceResponse.get("status") == "ERROR":
        errReason = invoiceResponse.get("reason", "unreported reason")
        logger.warning(f"Invoice request error: {errReason}")
        return False
    return "pr" in invoiceResponse