diff --git a/rally_ovs/plugins/ovs/ovsclients_impl.py b/rally_ovs/plugins/ovs/ovsclients_impl.py index 16bab27..1532ead 100644 --- a/rally_ovs/plugins/ovs/ovsclients_impl.py +++ b/rally_ovs/plugins/ovs/ovsclients_impl.py @@ -17,15 +17,14 @@ import pipes from io import StringIO from rally_ovs.plugins.ovs.ovsclients import * -from rally_ovs.plugins.ovs.utils import get_ssh_from_credential +from rally_ovs.plugins.ovs.utils import get_client_connection @configure("ssh") class SshClient(OvsClient): def create_client(self): - print("********* call OvnNbctl.create_client") - return get_ssh_from_credential(self.credential) + return get_client_connection(self.credential) @configure("ovn-nbctl") @@ -34,7 +33,7 @@ class OvnNbctl(OvsClient): class _OvnNbctl(DdCtlMixin): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None self.batch_mode = False @@ -85,8 +84,9 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, cmd = itertools.chain(cmd_prefix, [ovn_cmd], opts, [cmd], args) self.cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(self.cmds), - stdout=stdout, stderr=stderr, raise_on_error=raise_on_error) + self.client.run("\n".join(self.cmds), + stdout=stdout, stderr=stderr, + raise_on_error=raise_on_error) self.cmds = None @@ -110,8 +110,8 @@ def flush(self): run_cmds.append(cmd_prefix + " ".join(self.cmds)) - self.ssh.run("\n".join(run_cmds), - stdout=sys.stdout, stderr=sys.stderr) + self.client.run("\n".join(run_cmds), + stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -294,7 +294,7 @@ class OvnSbctl(OvsClient): class _OvnSbctl(DdCtlMixin): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None self.batch_mode = False @@ -332,7 +332,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr): cmd = itertools.chain(cmd_prefix, ["ovn-sbctl"], opts, [cmd], args) self.cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(self.cmds), + self.client.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) self.cmds = None @@ -355,7 +355,7 @@ def flush(self): else: run_cmds.append("sudo ovn-sbctl" + " ".join(self.cmds)) - self.ssh.run("\n".join(run_cmds), + self.client.run("\n".join(run_cmds), stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -368,13 +368,13 @@ def db_set(self, table, record, *col_values): def count_igmp_flows(self, lswitch, network_prefix='239'): stdout = StringIO() - self.ssh.run( + self.client.run( "ovn-sbctl list datapath_binding | grep {sw} -B 1 | " "grep uuid | cut -f 2 -d ':'".format(sw=lswitch), stdout=stdout) uuid = stdout.getvalue().rstrip() stdout = StringIO() - self.ssh.run( + self.client.run( "ovn-sbctl list logical_flow | grep 'dst == {nw}' -B 1 | " "grep {uuid} -B 1 | wc -l".format( uuid=uuid, nw=network_prefix), @@ -405,6 +405,7 @@ def chassis_bound(self, chassis_name): self.batch_mode = batch_mode return len(stdout.getvalue().splitlines()) == 1 + def create_client(self): print("********* call OvnSbctl.create_client") @@ -417,7 +418,7 @@ class OvsSsh(OvsClient): class _OvsSsh(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.batch_mode = False self.cmds = None @@ -444,7 +445,7 @@ def run(self, cmd): self.flush() def run_immediate(self, cmd, stdout=sys.stdout, stderr=sys.stderr): - self.ssh.run(cmd, stdout) + self.client.run(cmd, stdout) def flush(self): if self.cmds == None: @@ -453,7 +454,8 @@ def flush(self): cmds = "\n".join(self.cmds) self.cmds = None - self.ssh.run(cmds, stdout=sys.stdout, stderr=sys.stderr) + self.client.run(cmds, stdout=sys.stdout, stderr=sys.stderr) + def create_client(self): print("********* call OvsSsh.create_client") @@ -467,7 +469,7 @@ class OvsVsctl(OvsClient): class _OvsVsctl(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.batch_mode = False self.sandbox = None @@ -509,7 +511,7 @@ def run(self, cmd, opts=[], args=[], extras=[], stdout=sys.stdout, stderr=sys.st if self.batch_mode: return - self.ssh.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) + self.client.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) self.cmds = None @@ -521,7 +523,7 @@ def flush(self): if self.install_method == "sandbox": self.cmds.insert(0, ". %s/sandbox.rc" % self.sandbox) - self.ssh.run("\n".join(self.cmds), + self.client.run("\n".join(self.cmds), stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -540,6 +542,7 @@ def db_set(self, table, record, *col_values): args += set_colval_args(*col_values) self.run("set", args=args) + def create_client(self): print("********* call OvsVsctl.create_client") client = self._OvsVsctl(self.credential) @@ -553,7 +556,7 @@ class OvsOfctl(OvsClient): class _OvsOfctl(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None @@ -577,7 +580,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr): cmd_prefix = ["ovs-ofctl"] cmd = itertools.chain(cmd_prefix, opts, [cmd], args) cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(cmds), + self.client.run("\n".join(cmds), stdout=stdout, stderr=stderr) def dump_flows(self, bridge): @@ -588,6 +591,7 @@ def dump_flows(self, bridge): oflow_data = oflow_data.split('\n') return len(oflow_data) + def create_client(self): print("********* call OvsOfctl.create_client") client = self._OvsOfctl(self.credential) diff --git a/rally_ovs/plugins/ovs/utils.py b/rally_ovs/plugins/ovs/utils.py index e17661e..8e3d9be 100644 --- a/rally_ovs/plugins/ovs/utils.py +++ b/rally_ovs/plugins/ovs/utils.py @@ -23,10 +23,12 @@ from rally.common import db +import socket +import selectors +import time cidr_incr = utils.RAMInt() - ''' Find credential resource from DB by deployment uuid, and return info as a dict. @@ -147,9 +149,87 @@ def get_sandboxes(deploy_uuid, farm="", tag=""): return sandboxes - - - - - - +class NCatError(Exception): + def __init__(self, details): + self.details = details + + +class NCatClient(object): + def __init__(self, server): + self.server = server + self.sock = socket.create_connection((server, 8000)) + self.sel = selectors.DefaultSelector() + self.sel.register(self.sock, selectors.EVENT_READ) + + def run(self, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600): + start = time.clock_gettime(time.CLOCK_MONOTONIC) + end = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout + to = end - start + # We have to doctor the command a bit for three reasons: + # 1. We need to add a newline to ensure that the command + # gets sent to the server and doesn't just get put in + # the socket's write buffer. + # 2. We need to pipe stderr to stdout so that stderr gets + # returned over the client connection. + # 3. We need to add some marker text so our client knows + # that it has received all output from the command. This + # marker text let's us know if the command completed + # successfully or not. + good = "SUCCESS" + bad = "FAIL" + result = f"&& echo -n {good} || echo -n {bad}" + self.sock.send(f"({cmd}) 2>&1 {result}\n".encode('utf-8')) + out = "" + stream = None + error = False + while True: + events = self.sel.select(to) + for key, mask in events: + buf = key.fileobj.recv(4096).decode('utf-8') + if buf.endswith(good): + out += buf[:-len(good)] + stream = stdout + break + elif buf.endswith(bad): + out += buf[:-len(bad)] + # We assume that if the command errored, then everything + # that was output was stderr. This isn't necessarily + # accurate but it hopefully won't ruffle too many feathers. + stream = stderr + error = True + break + else: + out += buf + to = end - time.clock_gettime(time.CLOCK_MONOTONIC) + + if stream is not None: + stream.write(out) + + if error and raise_on_error: + details = (f"Error running command {cmd}\n" + f"Last stderr output is {out}\n") + raise NCatError(details) + + def close(self): + # Test scenarios call close after every operation because with SSH, + # this is necessary to ensure that we do not open too many + # connections. Our ncat client cache is keyed on hostname rather than + # on the controller node. This means that we open far fewer connections + # than SSH does. Therefore, there is no reason to close connections + # as frequently as we do with SSH. We can afford to leave the + # connection open and reuse the clients instead. This is why we "pass" + # in this method. + pass + + +NCAT_CLIENT_CACHE = {} + + +def get_client_connection(cred): + try: + global NCAT_CLIENT_CACHE + server = cred["host"] + return NCAT_CLIENT_CACHE.setdefault(server, NCatClient(server)) + except socket.error: + return get_ssh_from_credential(cred)