-
Notifications
You must be signed in to change notification settings - Fork 1
/
vanity_npub.py
163 lines (128 loc) · 5.76 KB
/
vanity_npub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Nostr vanity pubkey generator
https://github.com/kdmukai/nostr_vanity_npub
"""
import datetime
from threading import Event, Lock, Thread
from typing import List
from nostr.key import PrivateKey
from mnemonic import Mnemonic
class ThreadsafeCounter:
    """
    Counter that several threads can update (copied from SeedSigner).

    Every write is performed while holding an internal `Lock`; the current
    value is read through `cur_count` without taking the lock.
    """

    def __init__(self, initial_value: int = 0):
        """Create the lock and start the counter at `initial_value`."""
        self._lock = Lock()
        self.count = initial_value

    def set_value(self, value: int):
        """Replace the stored count with `value` while holding the lock."""
        with self._lock:
            self.count = value

    def increment(self, step: int = 1):
        """Add `step` to the stored count while holding the lock."""
        with self._lock:
            self.count += step

    @property
    def cur_count(self):
        """Return the stored count; the lock is not acquired for reads."""
        return self.count
class BruteForceThread(Thread):
    """
    Worker thread that generates random Nostr keypairs until an npub matches a target.

    Each loop iteration creates a fresh `PrivateKey`, strips the "npub1" prefix
    from its bech32 public key and compares the remainder against the targets:

    * a `bonus_targets` hit is printed and the search continues;
    * a `targets` hit prints the key material, sets the shared `event` so that
      every worker stops, and ends this thread.

    Args:
        targets: Strings that end the search when found at the start of the
            npub (or also at its end when `include_end` is true).
        bonus_targets: Strings that are reported when found but do not end
            the search.
        threadsafe_counter: Shared attempt counter; must provide `increment()`
            and `cur_count`.
        event: Shared stop flag, checked once per iteration and set on a match.
        include_end: Also compare targets against the end of the npub.
    """

    # Attempts are counted locally and pushed to the shared counter in batches
    # of this size, so the lock is not taken on every iteration.
    COUNT_BATCH_SIZE = 10_000

    # A progress line is printed when the shared total is a multiple of this.
    PROGRESS_INTERVAL = 1_000_000

    def __init__(self, targets: List[str], bonus_targets: List[str], threadsafe_counter: "ThreadsafeCounter", event: Event, include_end: bool = False):
        super().__init__(daemon=True)
        self.targets = targets
        self.bonus_targets = bonus_targets
        self.threadsafe_counter = threadsafe_counter
        self.event = event
        self.include_end = include_end

    def _matching_targets(self, npub: str, candidates: List[str]) -> List[str]:
        """Return every entry of `candidates` that `npub` starts with (or, when
        `include_end` is set, ends with), in the order given."""
        return [
            target
            for target in candidates
            if npub.startswith(target) or (self.include_end and npub.endswith(target))
        ]

    def _report_match(self, pk, npub: str, target: str, elapsed: float) -> None:
        """Print the summary line and the private key (bech32, hex, mnemonic)
        for the key `pk` whose trimmed public key `npub` matched `target`."""
        total = int(self.threadsafe_counter.cur_count)
        print(f"\n{total:,} | {elapsed:0.1f}s | {target} | npub1{npub}")

        banner = "*" * 76
        print(f"\n\t{banner}\n\tPrivate key: {pk.bech32()}\n\t{banner}\n", flush=True)

        banner = "*" * 81
        print(f"\n\t{banner}\n\tPrivate key hex: {pk.hex()}\n\t{banner}\n", flush=True)

        # Encode the private key bytes as an English mnemonic phrase
        key_bytes = bytes.fromhex(pk.hex())
        mnemonic_phrase = Mnemonic("english").to_mnemonic(key_bytes)
        print("Mnemonic Phrase:", mnemonic_phrase, flush=True)

    def run(self):
        """Generate keys until a main target matches or `event` is set elsewhere."""
        # Imported here because the module itself only imports `time` inside its
        # `__main__` section; this keeps the class usable when imported.
        import time

        # Elapsed time is measured per thread instead of relying on a global
        # set by the launching script.
        started = time.monotonic()
        counter = self.threadsafe_counter
        pending = 0  # attempts made since the last push to the shared counter

        while not self.event.is_set():
            pending += 1
            pk = PrivateKey()
            npub = pk.public_key.bech32()[5:]  # Trim the "npub1" prefix

            # First check the bonus targets; these never stop the search
            for target in self._matching_targets(npub, self.bonus_targets):
                print(f"BONUS TARGET: {target}:\n\t{pk.public_key.bech32()}\n\t{pk.bech32()}", flush=True)

            # Now check our main targets; the first one that matches wins
            hits = self._matching_targets(npub, self.targets)
            if hits:
                # Push the not-yet-counted attempts so the printed total
                # includes this thread's work up to the match.
                counter.increment(pending)
                self._report_match(pk, npub, hits[0], time.monotonic() - started)

                # Set the shared Event to signal to the other threads to exit
                self.event.set()
                return

            # Nothing matched
            if pending >= self.COUNT_BATCH_SIZE:
                # Accumulate every COUNT_BATCH_SIZE attempts...
                counter.increment(pending)
                pending = 0
                if counter.cur_count % self.PROGRESS_INTERVAL == 0:
                    # ...but update to stdout every PROGRESS_INTERVAL
                    print(f"{str(datetime.datetime.now())}: Tried {int(counter.cur_count):,} npubs so far", flush=True)
if __name__ == "__main__":
    # Command-line entry point: parse the targets, validate them against the
    # bech32 charset, start the worker threads and wait for them to finish.
    import argparse
    import sys
    import time
    from threading import Event
    from nostr import bech32

    parser = argparse.ArgumentParser(
        description="********************** Nostr vanity pubkey generator **********************\n\n" + \
                    "Search for `target` in an npub such that:\n\n" + \
                    "\tnpub1[target]acd023...",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # Required positional arguments
    parser.add_argument('targets', type=str,
                        help="The string(s) you're looking for (comma-separated, no spaces)")

    # Optional arguments
    parser.add_argument('-b', '--bonus_targets',
                        default="",
                        dest="bonus_targets",
                        help="Additional targets to search for, but does not end execution when found (comma-separated, no spaces)")

    parser.add_argument('-e', '--include-end',
                        action="store_true",
                        default=False,
                        dest="include_end",
                        help="Also search the end of the npub")

    parser.add_argument('-j', '--jobs',
                        default=2,
                        type=int,
                        dest="num_jobs",
                        help="Number of threads (default: 2)")

    args = parser.parse_args()

    # Drop empty entries (e.g. from a trailing comma): an empty target is a
    # prefix of every npub and would "match" on the very first key.
    targets = [t for t in args.targets.lower().split(",") if t]
    bonus_targets = [t for t in args.bonus_targets.lower().split(",") if t]
    include_end = args.include_end
    num_jobs = args.num_jobs

    if not targets:
        print("ERROR: at least one non-empty target is required")
        sys.exit(1)

    if num_jobs < 1:
        # With no worker there would be nothing to wait on below
        print("ERROR: the number of jobs must be at least 1")
        sys.exit(1)

    print(targets)
    print(bonus_targets)

    # Every requested character must be able to appear in a bech32 string,
    # otherwise the search could never succeed.
    for target in targets + bonus_targets:
        for char in target:
            if char not in bech32.CHARSET:
                charset = "".join(sorted(bech32.CHARSET))
                print(f"""ERROR: "{char}" is not a valid character (not in the bech32 charset)""")
                print(f"""\tbech32 chars: {charset}""")
                # Non-zero status so callers can tell the run was rejected
                sys.exit(1)

    if max(len(t) for t in targets) >= 6:
        print(
            "This will probably take a LONG time!\n\n" + \
            "\tTip: CTRL-C to abort.\n\n"
        )

    # Wall-clock start of the search; kept as a module-level name so other
    # code in this file can report elapsed time against it.
    start = time.time()

    threadsafe_counter = ThreadsafeCounter()
    event = Event()
    threads = []
    for _ in range(num_jobs):
        brute_force_thread = BruteForceThread(targets, bonus_targets, threadsafe_counter=threadsafe_counter, event=event, include_end=include_end)
        brute_force_thread.start()
        threads.append(brute_force_thread)

    print(f"Initialized {num_jobs} threads")
    print(f"{str(datetime.datetime.now())}: Starting")

    try:
        # After one thread finds a match it sets the Event and all threads
        # exit. Wait for every one of them (not just the first) so that no
        # daemon thread is cut off while it is still printing a result.
        for worker in threads:
            worker.join()
    except KeyboardInterrupt:
        # CTRL-C is the advertised way to abort: stop the workers and exit
        # without a traceback. 130 is the conventional status for SIGINT.
        event.set()
        print("\nAborted.")
        sys.exit(130)