Skip to content

Commit

Permalink
feat: speed improvements for IPv4 address validation (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheMythologist authored Jan 24, 2023
1 parent c79996d commit e71d857
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 22 deletions.
41 changes: 41 additions & 0 deletions speed_tests/validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import ipaddress
import re
import socket
import timeit

ipv4 = re.compile(r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")


def old_validate(ip: str) -> bool:
try:
if ipv4.match(ip):
ipaddress.IPv4Address(ip)
return True
except (ipaddress.AddressValueError, socket.gaierror):
pass
return False


def new_validate(ip: str) -> bool:
try:
if ipv4.match(ip):
socket.inet_aton(ip)
return True
except socket.error:
pass
return False


def test_code(fun):
assert fun("52.102.136.0") is True
# assert not fun("52.102.136.257")


if __name__ == "__main__":
old_speed = timeit.timeit(lambda: test_code(old_validate), number=100000)
print(f"Old speed: {old_speed}")
new_speed = timeit.timeit(lambda: test_code(new_validate), number=100000)
print(f"New speed: {new_speed}")
# More than thrice the speed in positive cases!
# Speed is comparable for negative cases
# It's already a win, pluswe are more likely to get positive cases
1 change: 0 additions & 1 deletion util/DynamicBlacklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def construct_all_cidr_masks():

# To generate all CIDR blocks containing a certain IP, we must zero the right-most bit, append /32, then zero the next
# right-most bit (move one bit left), append /31, and so on.
# Probably best manipulated using ipaddress.packed attribute?


def parse_azure_ip_ranges_from_url(url_to_json_file):
Expand Down
33 changes: 12 additions & 21 deletions util/validator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import ipaddress
import re
import socket

Expand All @@ -16,7 +15,6 @@ class NameInCustom(Validator):
def validate(self, document):
global custom_ips
if custom_ips.has(document.text):
# Move cursor to end
raise ValidationError(
message="Name already in list", cursor_position=len(document.text)
)
Expand All @@ -26,61 +24,57 @@ class NameInBlacklist(Validator):
def validate(self, document):
global blacklist
if blacklist.has(document.text):
# Move cursor to end
raise ValidationError(
message="Name already in list", cursor_position=len(document.text)
)


class IPValidator(Validator):
def validate(self, document):
# Move cursor to end
error = ValidationError(
message="Not a valid IP or URL", cursor_position=len(document.text)
)
try:
ip = document.text
if ipv4.match(ip):
ipaddress.IPv4Address(ip)
socket.inet_aton(ip)
elif not domain.match(ip):
raise error
except (ipaddress.AddressValueError, socket.gaierror):
except socket.error:
raise error

# TODO: Add an extra validator to check if an IP could be used by R* services (i.e. it's part of Microsoft Azure)

@staticmethod
def validate_get(text):
# Move cursor to end
error = ValidationError(
message="Not a valid IP or URL", cursor_position=len(text)
)
try:
ip = text
if ipv4.match(ip):
ipaddress.IPv4Address(ip)
socket.inet_aton(ip)
elif domain.match(ip):
ip = socket.gethostbyname(text)
ipaddress.IPv4Address(ip)
try:
ip = socket.gethostbyname(text)
except socket.gaierror:
raise ValidationError(
message=f"URL {text} can't be resolved to IP",
cursor_position=len(text),
)
socket.inet_aton(ip)
else:
raise error
return ip
except ipaddress.AddressValueError:
except socket.error:
raise error
except socket.gaierror:
# Move cursor to end
raise ValidationError(
message=f"URL {text} can't be resolved to IP",
cursor_position=len(text),
)


class IPInCustom(IPValidator):
def validate(self, document):
super().validate(document)
global custom_ips
if document.text in custom_ips or custom_ips.has(document.text, "value"):
# Move cursor to end
raise ValidationError(
message="IP already in list", cursor_position=len(document.text)
)
Expand All @@ -91,7 +85,6 @@ def validate(self, document):
super().validate(document)
global blacklist
if document.text in blacklist or blacklist.has(document.text, "value"):
# Move cursor to end
raise ValidationError(
message="IP already in list", cursor_position=len(document.text)
)
Expand All @@ -101,14 +94,12 @@ class ValidateToken(Validator):
def validate(self, document):
conn = networkmanager.Cloud(document.text)
if not conn.check_connection():
# Move cursor to end
raise ValidationError(
message="DigitalArc is unavailable, unable to check token",
cursor_position=len(document.text),
)

if not conn.check_token():
# Move cursor to end
raise ValidationError(
message="Token invalid", cursor_position=len(document.text)
)

0 comments on commit e71d857

Please sign in to comment.