Skip to content
This repository has been archived by the owner on Aug 11, 2022. It is now read-only.

Commit

Permalink
Release to 0.3.3 - Add CLI
Browse files Browse the repository at this point in the history
Add argparse CLI with several key options to (1) add a random delay, (2) select
which interface(s) to test, and (3) decide how many interfaces to test (options
include 'default' (gateway), 'random', and 'all').

Create an internal "InterfaceInfo" container.  Also created an interface device
blacklist filter which currently only blocks 'tun' (VPN) devices.
  • Loading branch information
lowell80 committed Jul 9, 2018
1 parent aaae5d4 commit 1c4eccc
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 29 deletions.
191 changes: 163 additions & 28 deletions kintyre_speedtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@
import platform
import time
import locale
import random
import time
from subprocess import Popen, PIPE, list2cmdline
from collections import namedtuple

'''
try:
from configparser import ConfigParser
except ImportError:
from ConfigParser import ConfigParser
'''


import ifcfg
import speedtest
Expand All @@ -17,7 +28,7 @@
# To enable loads of noise!
# speedtest.DEBUG = True

JSON_FORMAT_VER = "0.3.2"
JSON_FORMAT_VER = "0.3.3"

def cli_parser(cmd, breaker, regexes, group_by="id"):
cregexes = []
Expand Down Expand Up @@ -231,36 +242,91 @@ def non_empty(x):
plat[k] = v


def main(output=output_to_hec):
if_for_testing = {}

InterfaceInfo = namedtuple("InterfaceInfo", ("ip", "dev", "meta"))


def _filter_interface_attrs(ifcfg_if, **extra):
""" Keep only some of the ifcfg attributes (prevent info leakage by being explicit.) """
# Todo: See if there are any other interesting goodies provided by Windows
# Todo: Capture the "Description" field from ipconfig; extend Windows class in ifcfg
d = {}
for k in ("device", "ether", "status", "mtu", "txbytes", "rxbytes"):
if k in ifcfg_if:
d[k] = ifcfg_if[k]
d["v"] = JSON_FORMAT_VER
d.update(**extra)
return InterfaceInfo(ifcfg_if['inet'], ifcfg_if['device'], d)


def find_interfaces(whitelist=None):
try:
interfaces = ifcfg.interfaces()
except Exception as e:
sys.stderr.write("Unable to get interface info. Falling back to simple output. "
"Error: {0}\n".format(e))
results = run_speedtest(None)
results["v"] = JSON_FORMAT_VER
results["_error"] = "ifcfg failed"
output(json.dumps(results))
yield InterfaceInfo(None, None, dict(v=JSON_FORMAT_VER, _error="ifcfg failed: {}".format(e)))
return

if not interfaces:
yield InterfaceInfo(None, None, dict(v=JSON_FORMAT_VER,
_error="no output from ifcfg.interface()"))
return

for name, interface in list(interfaces.items()):
for name, interface in interfaces.items():
if whitelist:
if name not in whitelist:
continue

# Skip loopback adapter
if name.startswith("lo"):
continue
if interface.get("status", None) == "inactive":
continue
if interface["inet"] is None:
continue
d = {}
# Todo: See if there are any other interesting goodies provided by Windows
# Todo: Capture the "Description" field from ipconfig; extend Windows class in ifcfg
for k in ("device", "ether", "status", "mtu", "txbytes", "rxbytes"):
if k in interface:
d[k] = interface[k]
if_for_testing[(interface['inet'], interface['device'])] = d
yield _filter_interface_attrs(interface)

sys.stderr.write("DEBUG: iterfaces for testing: {0!r}\n".format(if_for_testing))

def find_matching_interfaces(selected, whitelist=None, blacklist_pattern=None):
if selected == "default":
dflt = ifcfg.default_interface()
if dflt:
return [ _filter_interface_attrs(dflt) ]
else:
sys.stderr.write("No default interface found. Randomly picking one.\n")
selected = "random"

interfaces = list(find_interfaces(whitelist))

# Loop over interfaces. eliminate blacklist matches.
# default:
# "(u|)tun\d+" add match for PPP adapter for windows too

if blacklist_pattern:
blacklist_pattern = re.compile(blacklist_pattern)

interfaces2 = [ i for i in interfaces if not blacklist_pattern.match(i.dev) ]
# ToDo: Debug log: show which interfaces were blacklisted...
# ToDo: Check to see if ALL interfaces have been eliminated by this filter. (recover by passing in NO ip?)
if len(interfaces) != len(interfaces2):
sys.stderr.write("Blacklist filter eliminated {} interface devices\n".format(
len(interfaces)-len(interfaces2)))
interfaces = interfaces2

if not interfaces:
return InterfaceInfo(None, None, dict(_error="No non-blacklisted interfaces found."))
if selected == "all":
return interfaces
elif selected == "random":
return [ random.choice(interfaces) ]
else:
raise RuntimeError("Unknown selection type of {!r}".format(selected))



def main(interfaces, output=output_to_hec):
sys.stderr.write("DEBUG: iterfaces for testing: {0!r}\n".format(interfaces))

net_info = get_macosx_network_hw()
sys.stderr.write("DEBUG: get_macosx_hardware() returns: {0!r}\n".format(net_info))
Expand All @@ -274,11 +340,12 @@ def main(output=output_to_hec):
lshw_info = get_linux_lshw()
sys.stderr.write("DEBUG: get_linux_lshw() returns: {0!r}\n".format(lshw_info))

for ((ip,dev), info) in list(if_for_testing.items()):
for if_ in interfaces:
info = if_.meta
try:
mac = None
sys.stderr.write("Speed testing on interface {0} (ip={1})\n".format(dev, ip))
results = run_speedtest(ip)
sys.stderr.write("Speed testing on interface {0} (ip={1})\n".format(if_.dev, if_.ip))
results = run_speedtest(if_.ip)
if "device" in info:
results["dev"] = info.pop("device")
if "ether" in info:
Expand All @@ -287,8 +354,8 @@ def main(output=output_to_hec):
results["meta"] = info

# Add MacOSX hardware info, if available. (Indicate LAN vs Wireless)
if net_info and dev in net_info:
hw_port = net_info[dev].get("hardware_port")
if net_info and if_.dev in net_info:
hw_port = net_info[if_.dev].get("hardware_port")
if hw_port:
results["osx_hw_port"] = hw_port
if hw_port.lower() == "wi-fi":
Expand All @@ -298,11 +365,11 @@ def main(output=output_to_hec):
results["wlan"] = win_info[mac]

# Add wireless info for Linux systems
if iwconfig_info and dev in iwconfig_info:
results["wlan"] = iwconfig_info[dev]
if iwconfig_info and if_.dev in iwconfig_info:
results["wlan"] = iwconfig_info[if_.dev]

if lshw_info and dev in lshw_info:
results["hardware"] = lshw_info[dev]
if lshw_info and if_.dev in lshw_info:
results["hardware"] = lshw_info[if_.dev]

try:
add_platform_info(results)
Expand All @@ -319,8 +386,76 @@ def main(output=output_to_hec):
sys.stderr.write("DEBUG: Payload: {0}\n".format(o))
output(o)
except Exception as e:
sys.stderr.write("Failure for ip {0}: {1}\n".format(ip, e))
sys.stderr.write("Failure for ip {0}: {1}\n".format(if_.ip, e))

'''
def load_config(path):
cp = ConfigParser()
cp.read(path)
uuid = cp.get("default", "uuid")
'''


if __name__ == '__main__':
#main(output_to_scriptedinput)
main(output_to_hec)
from argparse import ArgumentParser

parser = ArgumentParser(description="Kintyre speedtest agent")
parser.add_argument("--version", "-V", action="version", version=JSON_FORMAT_VER)

parser.add_argument("--interface", "-i",
nargs="+",
help="Name of interface(s) to speedtest. No other interfaces will be "
"considered. When used with --random then one of the provided "
"interfaces will be selected randomly.")

parser.add_argument("--randomize",
type=int,
metavar="SECS",
help="Add a random delay before running the speedtest. "
"This can avoid kicking off multiple test at the same moment.")

ifslct = parser.add_mutually_exclusive_group()
ifslct.add_argument("--random",
dest="interface_select",
action="store_const",
const="random",
help="Randomly pick and test a single interface to test on")
ifslct.add_argument("--all",
dest="interface_select",
action="store_const",
const="all",
help="Test against all usable interfaces.")
ifslct.add_argument("--default",
dest="interface_select",
action="store_const",
const="default",
help="Run speedtest on the interface with a default gateway. (This is the "
"default behavior, unless the --interface option is provided)")

parser.add_argument("--fake-it",
action="store_true",
help="Disable speedtest functionality and return a bogus payload instead."
"ONLY useful for testing.")


args = parser.parse_args()


if args.fake_it:
def run_speedtest(ip=None):
return { "FAKE_SPEEDTEST" : True, ip: ip }

print("interface: {!r}".format(args.interface))
if not args.interface_select:
if args.interface:
args.interface_select = "random"
else:
args.interface_select = "default"
print("mode={} interface: {!r}".format(args.interface_select, args.interface))

if args.randomize:
delay = random.randint(0, args.randomize)
print("Sleeping for {} seconds to randomize clocks".format(delay))
time.sleep(delay)
interfaces = find_matching_interfaces(args.interface_select, args.interface, r"^(u|v|)tun$")
main(interfaces, output_to_hec)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def get_ver():
],
install_requires=[
"speedtest-cli",
"ifcfg",
"ifcfg>=0.17.0",
"requests",
],
entry_points={
Expand Down

0 comments on commit 1c4eccc

Please sign in to comment.