From 1c4eccca835556512021a78d94479510d7a0460e Mon Sep 17 00:00:00 2001 From: Lowell Alleman Date: Mon, 9 Jul 2018 17:15:29 -0400 Subject: [PATCH] Release to 0.3.3 - Add CLI Add argparse CLI with several key options to (1) add a random delay, (2) select which interface(s) to test, and (3) decide how many interfaces to test (options include 'default' (gateway), 'random', and 'all'). Create an internal "InterfaceInfo" container. Also created an interface device blacklist filter which currently only blocks 'tun' (VPN) devices. --- kintyre_speedtest.py | 191 ++++++++++++++++++++++++++++++++++++------- setup.py | 2 +- 2 files changed, 164 insertions(+), 29 deletions(-) diff --git a/kintyre_speedtest.py b/kintyre_speedtest.py index e07ed35..22ea79e 100755 --- a/kintyre_speedtest.py +++ b/kintyre_speedtest.py @@ -7,7 +7,18 @@ import platform import time import locale +import random +import time from subprocess import Popen, PIPE, list2cmdline +from collections import namedtuple + +''' +try: + from configparser import ConfigParser +except ImportError: + from ConfigParser import ConfigParser +''' + import ifcfg import speedtest @@ -17,7 +28,7 @@ # To enable loads of noise! # speedtest.DEBUG = True -JSON_FORMAT_VER = "0.3.2" +JSON_FORMAT_VER = "0.3.3" def cli_parser(cmd, breaker, regexes, group_by="id"): cregexes = [] @@ -231,20 +242,42 @@ def non_empty(x): plat[k] = v -def main(output=output_to_hec): - if_for_testing = {} + +InterfaceInfo = namedtuple("InterfaceInfo", ("ip", "dev", "meta")) + + +def _filter_interface_attrs(ifcfg_if, **extra): + """ Keep only some of the ifcfg attributes (prevent info leakage by being explicit.) """ + # Todo: See if there are any other interesting goodies provided by Windows + # Todo: Capture the "Description" field from ipconfig; extend Windows class in ifcfg + d = {} + for k in ("device", "ether", "status", "mtu", "txbytes", "rxbytes"): + if k in ifcfg_if: + d[k] = ifcfg_if[k] + d["v"] = JSON_FORMAT_VER + d.update(**extra) + return InterfaceInfo(ifcfg_if['inet'], ifcfg_if['device'], d) + + +def find_interfaces(whitelist=None): try: interfaces = ifcfg.interfaces() except Exception as e: sys.stderr.write("Unable to get interface info. Falling back to simple output. " "Error: {0}\n".format(e)) - results = run_speedtest(None) - results["v"] = JSON_FORMAT_VER - results["_error"] = "ifcfg failed" - output(json.dumps(results)) + yield InterfaceInfo(None, None, dict(v=JSON_FORMAT_VER, _error="ifcfg failed: {}".format(e))) + return + + if not interfaces: + yield InterfaceInfo(None, None, dict(v=JSON_FORMAT_VER, + _error="no output from ifcfg.interface()")) return - for name, interface in list(interfaces.items()): + for name, interface in interfaces.items(): + if whitelist: + if name not in whitelist: + continue + # Skip loopback adapter if name.startswith("lo"): continue @@ -252,15 +285,48 @@ def main(output=output_to_hec): continue if interface["inet"] is None: continue - d = {} - # Todo: See if there are any other interesting goodies provided by Windows - # Todo: Capture the "Description" field from ipconfig; extend Windows class in ifcfg - for k in ("device", "ether", "status", "mtu", "txbytes", "rxbytes"): - if k in interface: - d[k] = interface[k] - if_for_testing[(interface['inet'], interface['device'])] = d + yield _filter_interface_attrs(interface) - sys.stderr.write("DEBUG: iterfaces for testing: {0!r}\n".format(if_for_testing)) + +def find_matching_interfaces(selected, whitelist=None, blacklist_pattern=None): + if selected == "default": + dflt = ifcfg.default_interface() + if dflt: + return [ _filter_interface_attrs(dflt) ] + else: + sys.stderr.write("No default interface found. Randomly picking one.\n") + selected = "random" + + interfaces = list(find_interfaces(whitelist)) + + # Loop over interfaces. eliminate blacklist matches. + # default: + # "(u|)tun\d+" add match for PPP adapter for windows too + + if blacklist_pattern: + blacklist_pattern = re.compile(blacklist_pattern) + + interfaces2 = [ i for i in interfaces if not blacklist_pattern.match(i.dev) ] + # ToDo: Debug log: show which interfaces were blacklisted... + # ToDo: Check to see if ALL interfaces have been eliminated by this filter. (recover by passing in NO ip?) + if len(interfaces) != len(interfaces2): + sys.stderr.write("Blacklist filter eliminated {} interface devices\n".format( + len(interfaces)-len(interfaces2))) + interfaces = interfaces2 + + if not interfaces: + return InterfaceInfo(None, None, dict(_error="No non-blacklisted interfaces found.")) + if selected == "all": + return interfaces + elif selected == "random": + return [ random.choice(interfaces) ] + else: + raise RuntimeError("Unknown selection type of {!r}".format(selected)) + + + +def main(interfaces, output=output_to_hec): + sys.stderr.write("DEBUG: iterfaces for testing: {0!r}\n".format(interfaces)) net_info = get_macosx_network_hw() sys.stderr.write("DEBUG: get_macosx_hardware() returns: {0!r}\n".format(net_info)) @@ -274,11 +340,12 @@ def main(output=output_to_hec): lshw_info = get_linux_lshw() sys.stderr.write("DEBUG: get_linux_lshw() returns: {0!r}\n".format(lshw_info)) - for ((ip,dev), info) in list(if_for_testing.items()): + for if_ in interfaces: + info = if_.meta try: mac = None - sys.stderr.write("Speed testing on interface {0} (ip={1})\n".format(dev, ip)) - results = run_speedtest(ip) + sys.stderr.write("Speed testing on interface {0} (ip={1})\n".format(if_.dev, if_.ip)) + results = run_speedtest(if_.ip) if "device" in info: results["dev"] = info.pop("device") if "ether" in info: @@ -287,8 +354,8 @@ def main(output=output_to_hec): results["meta"] = info # Add MacOSX hardware info, if available. (Indicate LAN vs Wireless) - if net_info and dev in net_info: - hw_port = net_info[dev].get("hardware_port") + if net_info and if_.dev in net_info: + hw_port = net_info[if_.dev].get("hardware_port") if hw_port: results["osx_hw_port"] = hw_port if hw_port.lower() == "wi-fi": @@ -298,11 +365,11 @@ def main(output=output_to_hec): results["wlan"] = win_info[mac] # Add wireless info for Linux systems - if iwconfig_info and dev in iwconfig_info: - results["wlan"] = iwconfig_info[dev] + if iwconfig_info and if_.dev in iwconfig_info: + results["wlan"] = iwconfig_info[if_.dev] - if lshw_info and dev in lshw_info: - results["hardware"] = lshw_info[dev] + if lshw_info and if_.dev in lshw_info: + results["hardware"] = lshw_info[if_.dev] try: add_platform_info(results) @@ -319,8 +386,76 @@ def main(output=output_to_hec): sys.stderr.write("DEBUG: Payload: {0}\n".format(o)) output(o) except Exception as e: - sys.stderr.write("Failure for ip {0}: {1}\n".format(ip, e)) + sys.stderr.write("Failure for ip {0}: {1}\n".format(if_.ip, e)) + +''' +def load_config(path): + cp = ConfigParser() + cp.read(path) + uuid = cp.get("default", "uuid") +''' + if __name__ == '__main__': - #main(output_to_scriptedinput) - main(output_to_hec) + from argparse import ArgumentParser + + parser = ArgumentParser(description="Kintyre speedtest agent") + parser.add_argument("--version", "-V", action="version", version=JSON_FORMAT_VER) + + parser.add_argument("--interface", "-i", + nargs="+", + help="Name of interface(s) to speedtest. No other interfaces will be " + "considered. When used with --random then one of the provided " + "interfaces will be selected randomly.") + + parser.add_argument("--randomize", + type=int, + metavar="SECS", + help="Add a random delay before running the speedtest. " + "This can avoid kicking off multiple test at the same moment.") + + ifslct = parser.add_mutually_exclusive_group() + ifslct.add_argument("--random", + dest="interface_select", + action="store_const", + const="random", + help="Randomly pick and test a single interface to test on") + ifslct.add_argument("--all", + dest="interface_select", + action="store_const", + const="all", + help="Test against all usable interfaces.") + ifslct.add_argument("--default", + dest="interface_select", + action="store_const", + const="default", + help="Run speedtest on the interface with a default gateway. (This is the " + "default behavior, unless the --interface option is provided)") + + parser.add_argument("--fake-it", + action="store_true", + help="Disable speedtest functionality and return a bogus payload instead." + "ONLY useful for testing.") + + + args = parser.parse_args() + + + if args.fake_it: + def run_speedtest(ip=None): + return { "FAKE_SPEEDTEST" : True, ip: ip } + + print("interface: {!r}".format(args.interface)) + if not args.interface_select: + if args.interface: + args.interface_select = "random" + else: + args.interface_select = "default" + print("mode={} interface: {!r}".format(args.interface_select, args.interface)) + + if args.randomize: + delay = random.randint(0, args.randomize) + print("Sleeping for {} seconds to randomize clocks".format(delay)) + time.sleep(delay) + interfaces = find_matching_interfaces(args.interface_select, args.interface, r"^(u|v|)tun$") + main(interfaces, output_to_hec) diff --git a/setup.py b/setup.py index ee3e1c0..8266a7d 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ def get_ver(): ], install_requires=[ "speedtest-cli", - "ifcfg", + "ifcfg>=0.17.0", "requests", ], entry_points={