def __init__(self, **kwargs):
if kwargs.get("default_enter") is None:
kwargs["default_enter"] = "\r"
+ # Aruba has an auto-complete on space behavior that is problematic
+ if kwargs.get("global_cmd_verify") is None:
+ kwargs["global_cmd_verify"] = False
return super().__init__(**kwargs)
def session_preparation(self):
@@ -57,7 +60,13 @@
Module netmiko.aruba.aruba_ssh
"""
if not pattern:
pattern = re.escape(self.base_prompt[:16])
- return super().check_config_mode(check_string=check_string, pattern=pattern)
+ return super().check_config_mode(check_string=check_string, pattern=pattern)
+
+ def config_mode(self, config_command="configure term", pattern=""):
+ """
+ Aruba auto completes on space so 'configure' needs fully spelled-out.
+ """
+ return super().config_mode(config_command=config_command, pattern=pattern)
@@ -209,6 +218,9 @@
Classes
def __init__(self, **kwargs):
if kwargs.get("default_enter") is None:
kwargs["default_enter"] = "\r"
+ # Aruba has an auto-complete on space behavior that is problematic
+ if kwargs.get("global_cmd_verify") is None:
+ kwargs["global_cmd_verify"] = False
return super().__init__(**kwargs)
def session_preparation(self):
@@ -231,7 +243,13 @@
Classes
"""
if not pattern:
pattern = re.escape(self.base_prompt[:16])
- return super().check_config_mode(check_string=check_string, pattern=pattern)
+ return super().check_config_mode(check_string=check_string, pattern=pattern)
+
+ def config_mode(self, config_command="configure term", pattern=""):
+ """
+ Aruba auto completes on space so 'configure' needs fully spelled-out.
+ """
+ return super().config_mode(config_command=config_command, pattern=pattern)
def __init__(self, **kwargs):
if kwargs.get("default_enter") is None:
kwargs["default_enter"] = "\r"
+ # Aruba has an auto-complete on space behavior that is problematic
+ if kwargs.get("global_cmd_verify") is None:
+ kwargs["global_cmd_verify"] = False
return super().__init__(**kwargs)
def session_preparation(self):
@@ -205,7 +208,13 @@
Classes
"""
if not pattern:
pattern = re.escape(self.base_prompt[:16])
- return super().check_config_mode(check_string=check_string, pattern=pattern)
+ return super().check_config_mode(check_string=check_string, pattern=pattern)
+
+ def config_mode(self, config_command="configure term", pattern=""):
+ """
+ Aruba auto completes on space so 'configure' needs fully spelled-out.
+ """
+ return super().config_mode(config_command=config_command, pattern=pattern)
output = self.strip_prompt(output)
return output
- def establish_connection(self, width=None, height=None):
+ def establish_connection(self, width=511, height=1000):
"""Establish SSH connection to the network device
Timeout will generate a NetmikoTimeoutException
Authentication failure will generate a NetmikoAuthenticationException
- width and height are needed for Fortinet paging setting.
-
- :param width: Specified width of the VT100 terminal window
+ :param width: Specified width of the VT100 terminal window (default: 511)
:type width: int
- :param height: Specified height of the VT100 terminal window
+ :param height: Specified height of the VT100 terminal window (default: 1000)
:type height: int
"""
if self.protocol == "telnet":
@@ -945,12 +943,9 @@
Module netmiko.base_connection
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
- if width and height:
- self.remote_conn = self.remote_conn_pre.invoke_shell(
- term="vt100", width=width, height=height
- )
- else:
- self.remote_conn = self.remote_conn_pre.invoke_shell()
+ self.remote_conn = self.remote_conn_pre.invoke_shell(
+ term="vt100", width=width, height=height
+ )
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
@@ -1056,7 +1051,10 @@
Module netmiko.base_connection
log.debug(f"Command: {command}")
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
log.debug(f"{output}")
log.debug("Exiting disable_paging")
return output
@@ -1079,7 +1077,10 @@
Module netmiko.base_connection
command = self.normalize_cmd(command)
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
return output
def set_base_prompt(
@@ -1646,7 +1647,10 @@
Module netmiko.base_connection
if not self.check_config_mode():
self.write_channel(self.normalize_cmd(config_command))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(config_command.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(config_command.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if not self.check_config_mode():
@@ -1666,7 +1670,10 @@
Module netmiko.base_connection
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(exit_config.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(exit_config.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if self.check_config_mode():
@@ -2924,18 +2931,16 @@
Classes
output = self.strip_prompt(output)
return output
- def establish_connection(self, width=None, height=None):
+ def establish_connection(self, width=511, height=1000):
"""Establish SSH connection to the network device
Timeout will generate a NetmikoTimeoutException
Authentication failure will generate a NetmikoAuthenticationException
- width and height are needed for Fortinet paging setting.
-
- :param width: Specified width of the VT100 terminal window
+ :param width: Specified width of the VT100 terminal window (default: 511)
:type width: int
- :param height: Specified height of the VT100 terminal window
+ :param height: Specified height of the VT100 terminal window (default: 1000)
:type height: int
"""
if self.protocol == "telnet":
@@ -2971,12 +2976,9 @@
Classes
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
- if width and height:
- self.remote_conn = self.remote_conn_pre.invoke_shell(
- term="vt100", width=width, height=height
- )
- else:
- self.remote_conn = self.remote_conn_pre.invoke_shell()
+ self.remote_conn = self.remote_conn_pre.invoke_shell(
+ term="vt100", width=width, height=height
+ )
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
@@ -3082,7 +3084,10 @@
Classes
log.debug(f"Command: {command}")
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
log.debug(f"{output}")
log.debug("Exiting disable_paging")
return output
@@ -3105,7 +3110,10 @@
Classes
command = self.normalize_cmd(command)
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
return output
def set_base_prompt(
@@ -3672,7 +3680,10 @@
Classes
if not self.check_config_mode():
self.write_channel(self.normalize_cmd(config_command))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(config_command.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(config_command.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if not self.check_config_mode():
@@ -3692,7 +3703,10 @@
Classes
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(exit_config.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(exit_config.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if self.check_config_mode():
@@ -4143,7 +4157,10 @@
Methods
if not self.check_config_mode():
self.write_channel(self.normalize_cmd(config_command))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(config_command.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(config_command.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if not self.check_config_mode():
@@ -4179,7 +4196,10 @@
Methods
log.debug(f"Command: {command}")
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
log.debug(f"{output}")
log.debug("Exiting disable_paging")
return output
@@ -4257,31 +4277,28 @@
def establish_connection(self, width=511, height=1000):
"""Establish SSH connection to the network device
Timeout will generate a NetmikoTimeoutException
Authentication failure will generate a NetmikoAuthenticationException
- width and height are needed for Fortinet paging setting.
-
- :param width: Specified width of the VT100 terminal window
+ :param width: Specified width of the VT100 terminal window (default: 511)
:type width: int
- :param height: Specified height of the VT100 terminal window
+ :param height: Specified height of the VT100 terminal window (default: 1000)
:type height: int
"""
if self.protocol == "telnet":
@@ -4317,12 +4334,9 @@
Methods
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
- if width and height:
- self.remote_conn = self.remote_conn_pre.invoke_shell(
- term="vt100", width=width, height=height
- )
- else:
- self.remote_conn = self.remote_conn_pre.invoke_shell()
+ self.remote_conn = self.remote_conn_pre.invoke_shell(
+ term="vt100", width=width, height=height
+ )
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
@@ -4357,7 +4371,10 @@
Methods
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(exit_config.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(exit_config.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if self.check_config_mode():
@@ -5356,7 +5373,10 @@
Methods
command = self.normalize_cmd(command)
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
return output
remote_cmd = f"dir {self.file_system}/{remote_file}"
remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
+ if re.search("no such file or directory", remote_out, flags=re.I):
+ raise IOError("Unable to find file on remote system")
# Match line containing file name
escape_file_name = re.escape(remote_file)
pattern = r".*({}).*".format(escape_file_name)
@@ -120,12 +122,10 @@
Module netmiko.cisco.cisco_nxos_ssh
if match:
file_size = match.group(0)
file_size = file_size.split()[0]
-
- if "No such file or directory" in remote_out:
- raise IOError("Unable to find file on remote system")
- else:
return int(file_size)
+ raise IOError("Unable to find file on remote system")
+
@staticmethod
def process_md5(md5_output, pattern=r"= (.*)"):
"""Not needed on NX-OS."""
@@ -225,6 +225,8 @@
Classes
remote_cmd = f"dir {self.file_system}/{remote_file}"
remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
+ if re.search("no such file or directory", remote_out, flags=re.I):
+ raise IOError("Unable to find file on remote system")
# Match line containing file name
escape_file_name = re.escape(remote_file)
pattern = r".*({}).*".format(escape_file_name)
@@ -232,12 +234,10 @@
Classes
if match:
file_size = match.group(0)
file_size = file_size.split()[0]
-
- if "No such file or directory" in remote_out:
- raise IOError("Unable to find file on remote system")
- else:
return int(file_size)
+ raise IOError("Unable to find file on remote system")
+
@staticmethod
def process_md5(md5_output, pattern=r"= (.*)"):
"""Not needed on NX-OS."""
diff --git a/docs/netmiko/cisco/cisco_xr.html b/docs/netmiko/cisco/cisco_xr.html
index e9cbdee74..8f39184e0 100644
--- a/docs/netmiko/cisco/cisco_xr.html
+++ b/docs/netmiko/cisco/cisco_xr.html
@@ -771,7 +771,7 @@
remote_cmd = f"dir {self.file_system}/{remote_file}"
remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
+ if re.search("no such file or directory", remote_out, flags=re.I):
+ raise IOError("Unable to find file on remote system")
# Match line containing file name
escape_file_name = re.escape(remote_file)
pattern = r".*({}).*".format(escape_file_name)
@@ -1565,12 +1573,10 @@
Inherited members
if match:
file_size = match.group(0)
file_size = file_size.split()[0]
-
- if "No such file or directory" in remote_out:
- raise IOError("Unable to find file on remote system")
- else:
return int(file_size)
+ raise IOError("Unable to find file on remote system")
+
@staticmethod
def process_md5(md5_output, pattern=r"= (.*)"):
"""Not needed on NX-OS."""
@@ -3129,7 +3135,7 @@
Initialize attributes for establishing connection to target device.
+
+ :param ip: IP address of target device. Not required if `host` is
+ provided.
+ :type ip: str
+
+ :param host: Hostname of target device. Not required if `ip` is
+ provided.
+ :type host: str
+
+ :param username: Username to authenticate against target device if
+ required.
+ :type username: str
+
+ :param password: Password to authenticate against target device if
+ required.
+ :type password: str
+
+ :param secret: The enable password if target device requires one.
+ :type secret: str
+
+ :param port: The destination port used to connect to the target
+ device.
+ :type port: int or None
+
+ :param device_type: Class selection based on device type.
+ :type device_type: str
+
+ :param verbose: Enable additional messages to standard output.
+ :type verbose: bool
+
+ :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
+ :type global_delay_factor: int
+
+ :param use_keys: Connect to target device using SSH keys.
+ :type use_keys: bool
+
+ :param key_file: Filename path of the SSH key file to use.
+ :type key_file: str
+
+ :param pkey: SSH key object to use.
+ :type pkey: paramiko.PKey
+
+ :param passphrase: Passphrase to use for encrypted key; password will be used for key
+ decryption if not specified.
+ :type passphrase: str
+
+ :param allow_agent: Enable use of SSH key-agent.
+ :type allow_agent: bool
+
+ :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
+ means unknown SSH host keys will be accepted).
+ :type ssh_strict: bool
+
+ :param system_host_keys: Load host keys from the users known_hosts file.
+ :type system_host_keys: bool
+ :param alt_host_keys: If `True` host keys will be loaded from the file specified in
+ alt_key_file.
+ :type alt_host_keys: bool
+
+ :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
+ :type alt_key_file: str
+
+ :param ssh_config_file: File name of OpenSSH configuration file.
+ :type ssh_config_file: str
+
+ :param timeout: Connection timeout.
+ :type timeout: float
+
+ :param session_timeout: Set a timeout for parallel requests.
+ :type session_timeout: float
+
+ :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
+ :type auth_timeout: float
+
+ :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
+ :type banner_timeout: float
+
+ :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
+ Currently defaults to 0, for backwards compatibility (it will not attempt
+ to keep the connection alive).
+ :type keepalive: int
+
+ :param default_enter: Character(s) to send to correspond to enter key (default:
+
+
).
+:type default_enter: str
+
:param response_return: Character(s) to use in normalized return data to represent
+ enter key (default:
+
+
)
+:type response_return: str
+
:param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
+ to select smallest of global and specific. Sets default global_delay_factor to .1
+ (default: False)
+ :type fast_cli: boolean
+
+ :param session_log: File path or BufferedIOBase subclass object to write the session log to.
+ :type session_log: str
+
+ :param session_log_record_writes: The session log generally only records channel reads due
+ to eliminate command duplication due to command echo. You can enable this if you
+ want to record both channel reads and channel writes in the log (default: False).
+ :type session_log_record_writes: boolean
+
+ :param session_log_file_mode: "write" or "append" for session_log file mode
+ (default: "write")
+ :type session_log_file_mode: str
+
+ :param allow_auto_change: Allow automatic configuration changes for terminal settings.
+ (default: False)
+ :type allow_auto_change: bool
+
+ :param encoding: Encoding to be used when writing bytes to the output channel.
+ (default: ascii)
+ :type encoding: str
+
+ :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
+ communication to the target host (default: None).
+ :type sock: socket
+
+ :param global_cmd_verify: Control whether command echo verification is enabled or disabled
+ (default: None). Global attribute takes precedence over function `cmd_verify`
+ argument. Value of `None` indicates to use function `cmd_verify` argument.
+ :type global_cmd_verify: bool|None
+
+
+Source code
+
class HuaweiSmartAXSSH(CiscoBaseConnection):
+ """Supports Huawei SmartAX and OLT."""
+
+ def session_preparation(self):
+ """Prepare the session after the connection has been established."""
+ self.ansi_escape_codes = True
+ self._test_channel_read()
+ self.set_base_prompt()
+ self._disable_smart_interaction()
+ self.disable_paging()
+ # Clear the read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
+
+ def strip_ansi_escape_codes(self, string_buffer):
+ """
+ Huawei does a strange thing where they add a space and then add ESC[1D
+ to move the cursor to the left one.
+ The extra space is problematic.
+ """
+ code_cursor_left = chr(27) + r"\[\d+D"
+ output = string_buffer
+ pattern = rf" {code_cursor_left}"
+ output = re.sub(pattern, "", output)
+
+ log.debug("Stripping ANSI escape codes")
+ log.debug(f"new_output = {output}")
+ log.debug(f"repr = {repr(output)}")
+ return super().strip_ansi_escape_codes(output)
+
+ def _disable_smart_interaction(self, command="undo smart", delay_factor=1):
+ """Disables the { <cr> } prompt to avoid having to sent a 2nd return after each command"""
+ delay_factor = self.select_delay_factor(delay_factor)
+ time.sleep(delay_factor * 0.1)
+ self.clear_buffer()
+ command = self.normalize_cmd(command)
+ log.debug("In disable_smart_interaction")
+ log.debug(f"Command: {command}")
+ self.write_channel(command)
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
+ log.debug(f"{output}")
+ log.debug("Exiting disable_smart_interaction")
+
+ def disable_paging(self, command="scroll"):
+ return super().disable_paging(command=command)
+
+ def config_mode(self, config_command="config", pattern=""):
+ """Enter configuration mode."""
+ return super().config_mode(config_command=config_command, pattern=pattern)
+
+ def check_config_mode(self, check_string=")#"):
+ return super().check_config_mode(check_string=check_string)
+
+ def exit_config_mode(self, exit_config="return"):
+ return super().exit_config_mode(exit_config=exit_config)
+
+ def check_enable_mode(self, check_string="#"):
+ return super().check_enable_mode(check_string=check_string)
+
+ def enable(self, cmd="enable", pattern="", re_flags=re.IGNORECASE):
+ return super().enable(cmd=cmd, pattern=pattern, re_flags=re_flags)
+
+ def set_base_prompt(self, pri_prompt_terminator=">", alt_prompt_terminator="#"):
+ return super().set_base_prompt(
+ pri_prompt_terminator=pri_prompt_terminator,
+ alt_prompt_terminator=alt_prompt_terminator,
+ )
+
+ def save_config(self, cmd="save", confirm=False, confirm_response=""):
+ """ Save Config for HuaweiSSH"""
+ return super().save_config(
+ cmd=cmd, confirm=confirm, confirm_response=confirm_response
+ )
Huawei does a strange thing where they add a space and then add ESC[1D
+to move the cursor to the left one.
+The extra space is problematic.
+
+Source code
+
def strip_ansi_escape_codes(self, string_buffer):
+ """
+ Huawei does a strange thing where they add a space and then add ESC[1D
+ to move the cursor to the left one.
+ The extra space is problematic.
+ """
+ code_cursor_left = chr(27) + r"\[\d+D"
+ output = string_buffer
+ pattern = rf" {code_cursor_left}"
+ output = re.sub(pattern, "", output)
+
+ log.debug("Stripping ANSI escape codes")
+ log.debug(f"new_output = {output}")
+ log.debug(f"repr = {repr(output)}")
+ return super().strip_ansi_escape_codes(output)
Initialize attributes for establishing connection to target device.
+
+ :param ip: IP address of target device. Not required if `host` is
+ provided.
+ :type ip: str
+
+ :param host: Hostname of target device. Not required if `ip` is
+ provided.
+ :type host: str
+
+ :param username: Username to authenticate against target device if
+ required.
+ :type username: str
+
+ :param password: Password to authenticate against target device if
+ required.
+ :type password: str
+
+ :param secret: The enable password if target device requires one.
+ :type secret: str
+
+ :param port: The destination port used to connect to the target
+ device.
+ :type port: int or None
+
+ :param device_type: Class selection based on device type.
+ :type device_type: str
+
+ :param verbose: Enable additional messages to standard output.
+ :type verbose: bool
+
+ :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
+ :type global_delay_factor: int
+
+ :param use_keys: Connect to target device using SSH keys.
+ :type use_keys: bool
+
+ :param key_file: Filename path of the SSH key file to use.
+ :type key_file: str
+
+ :param pkey: SSH key object to use.
+ :type pkey: paramiko.PKey
+
+ :param passphrase: Passphrase to use for encrypted key; password will be used for key
+ decryption if not specified.
+ :type passphrase: str
+
+ :param allow_agent: Enable use of SSH key-agent.
+ :type allow_agent: bool
+
+ :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
+ means unknown SSH host keys will be accepted).
+ :type ssh_strict: bool
+
+ :param system_host_keys: Load host keys from the users known_hosts file.
+ :type system_host_keys: bool
+ :param alt_host_keys: If `True` host keys will be loaded from the file specified in
+ alt_key_file.
+ :type alt_host_keys: bool
+
+ :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
+ :type alt_key_file: str
+
+ :param ssh_config_file: File name of OpenSSH configuration file.
+ :type ssh_config_file: str
+
+ :param timeout: Connection timeout.
+ :type timeout: float
+
+ :param session_timeout: Set a timeout for parallel requests.
+ :type session_timeout: float
+
+ :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
+ :type auth_timeout: float
+
+ :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
+ :type banner_timeout: float
+
+ :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
+ Currently defaults to 0, for backwards compatibility (it will not attempt
+ to keep the connection alive).
+ :type keepalive: int
+
+ :param default_enter: Character(s) to send to correspond to enter key (default:
+
+
).
+:type default_enter: str
+
:param response_return: Character(s) to use in normalized return data to represent
+ enter key (default:
+
+
)
+:type response_return: str
+
:param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
+ to select smallest of global and specific. Sets default global_delay_factor to .1
+ (default: False)
+ :type fast_cli: boolean
+
+ :param session_log: File path or BufferedIOBase subclass object to write the session log to.
+ :type session_log: str
+
+ :param session_log_record_writes: The session log generally only records channel reads due
+ to eliminate command duplication due to command echo. You can enable this if you
+ want to record both channel reads and channel writes in the log (default: False).
+ :type session_log_record_writes: boolean
+
+ :param session_log_file_mode: "write" or "append" for session_log file mode
+ (default: "write")
+ :type session_log_file_mode: str
+
+ :param allow_auto_change: Allow automatic configuration changes for terminal settings.
+ (default: False)
+ :type allow_auto_change: bool
+
+ :param encoding: Encoding to be used when writing bytes to the output channel.
+ (default: ascii)
+ :type encoding: str
+
+ :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
+ communication to the target host (default: None).
+ :type sock: socket
+
+ :param global_cmd_verify: Control whether command echo verification is enabled or disabled
+ (default: None). Global attribute takes precedence over function `cmd_verify`
+ argument. Value of `None` indicates to use function `cmd_verify` argument.
+ :type global_cmd_verify: bool|None
+
+
+Source code
+
class HuaweiSmartAXSSH(CiscoBaseConnection):
+ """Supports Huawei SmartAX and OLT."""
+
+ def session_preparation(self):
+ """Prepare the session after the connection has been established."""
+ self.ansi_escape_codes = True
+ self._test_channel_read()
+ self.set_base_prompt()
+ self._disable_smart_interaction()
+ self.disable_paging()
+ # Clear the read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
+
+ def strip_ansi_escape_codes(self, string_buffer):
+ """
+ Huawei does a strange thing where they add a space and then add ESC[1D
+ to move the cursor to the left one.
+ The extra space is problematic.
+ """
+ code_cursor_left = chr(27) + r"\[\d+D"
+ output = string_buffer
+ pattern = rf" {code_cursor_left}"
+ output = re.sub(pattern, "", output)
+
+ log.debug("Stripping ANSI escape codes")
+ log.debug(f"new_output = {output}")
+ log.debug(f"repr = {repr(output)}")
+ return super().strip_ansi_escape_codes(output)
+
+ def _disable_smart_interaction(self, command="undo smart", delay_factor=1):
+ """Disables the { <cr> } prompt to avoid having to sent a 2nd return after each command"""
+ delay_factor = self.select_delay_factor(delay_factor)
+ time.sleep(delay_factor * 0.1)
+ self.clear_buffer()
+ command = self.normalize_cmd(command)
+ log.debug("In disable_smart_interaction")
+ log.debug(f"Command: {command}")
+ self.write_channel(command)
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
+ log.debug(f"{output}")
+ log.debug("Exiting disable_smart_interaction")
+
+ def disable_paging(self, command="scroll"):
+ return super().disable_paging(command=command)
+
+ def config_mode(self, config_command="config", pattern=""):
+ """Enter configuration mode."""
+ return super().config_mode(config_command=config_command, pattern=pattern)
+
+ def check_config_mode(self, check_string=")#"):
+ return super().check_config_mode(check_string=check_string)
+
+ def exit_config_mode(self, exit_config="return"):
+ return super().exit_config_mode(exit_config=exit_config)
+
+ def check_enable_mode(self, check_string="#"):
+ return super().check_enable_mode(check_string=check_string)
+
+ def enable(self, cmd="enable", pattern="", re_flags=re.IGNORECASE):
+ return super().enable(cmd=cmd, pattern=pattern, re_flags=re_flags)
+
+ def set_base_prompt(self, pri_prompt_terminator=">", alt_prompt_terminator="#"):
+ return super().set_base_prompt(
+ pri_prompt_terminator=pri_prompt_terminator,
+ alt_prompt_terminator=alt_prompt_terminator,
+ )
+
+ def save_config(self, cmd="save", confirm=False, confirm_response=""):
+ """ Save Config for HuaweiSSH"""
+ return super().save_config(
+ cmd=cmd, confirm=confirm, confirm_response=confirm_response
+ )
Huawei does a strange thing where they add a space and then add ESC[1D
+to move the cursor to the left one.
+The extra space is problematic.
+
+Source code
+
def strip_ansi_escape_codes(self, string_buffer):
+ """
+ Huawei does a strange thing where they add a space and then add ESC[1D
+ to move the cursor to the left one.
+ The extra space is problematic.
+ """
+ code_cursor_left = chr(27) + r"\[\d+D"
+ output = string_buffer
+ pattern = rf" {code_cursor_left}"
+ output = re.sub(pattern, "", output)
+
+ log.debug("Stripping ANSI escape codes")
+ log.debug(f"new_output = {output}")
+ log.debug(f"repr = {repr(output)}")
+ return super().strip_ansi_escape_codes(output)
Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.
@@ -383,6 +383,7 @@
Functions
inline_transfer=False,
overwrite_file=False,
socket_timeout=10.0,
+ verify_file=None,
):
"""Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.
@@ -417,6 +418,10 @@
Functions
if not cisco_ios and inline_transfer:
raise ValueError("Inline Transfer only supported for Cisco IOS/Cisco IOS-XE")
+ # Replace disable_md5 argument with verify_file argument across time
+ if verify_file is None:
+ verify_file = not disable_md5
+
scp_args = {
"ssh_conn": ssh_conn,
"source_file": source_file,
@@ -432,13 +437,13 @@
Functions
with TransferClass(**scp_args) as scp_transfer:
if scp_transfer.check_file_exists():
if overwrite_file:
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
else:
# File exists, you can overwrite it, MD5 is wrong (transfer file)
verifyspace_and_transferfile(scp_transfer)
- if scp_transfer.compare_md5():
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError(
@@ -450,16 +455,16 @@
Functions
return transferred_and_notverified
else:
# File exists, but you can't overwrite it.
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
msg = "File already exists and overwrite_file is disabled"
raise ValueError(msg)
else:
verifyspace_and_transferfile(scp_transfer)
# File doesn't exist
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError("MD5 failure between source and destination files")
@@ -1477,18 +1482,16 @@
Classes
output = self.strip_prompt(output)
return output
- def establish_connection(self, width=None, height=None):
+ def establish_connection(self, width=511, height=1000):
"""Establish SSH connection to the network device
Timeout will generate a NetmikoTimeoutException
Authentication failure will generate a NetmikoAuthenticationException
- width and height are needed for Fortinet paging setting.
-
- :param width: Specified width of the VT100 terminal window
+ :param width: Specified width of the VT100 terminal window (default: 511)
:type width: int
- :param height: Specified height of the VT100 terminal window
+ :param height: Specified height of the VT100 terminal window (default: 1000)
:type height: int
"""
if self.protocol == "telnet":
@@ -1524,12 +1527,9 @@
Classes
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
- if width and height:
- self.remote_conn = self.remote_conn_pre.invoke_shell(
- term="vt100", width=width, height=height
- )
- else:
- self.remote_conn = self.remote_conn_pre.invoke_shell()
+ self.remote_conn = self.remote_conn_pre.invoke_shell(
+ term="vt100", width=width, height=height
+ )
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
@@ -1635,7 +1635,10 @@
Classes
log.debug(f"Command: {command}")
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
log.debug(f"{output}")
log.debug("Exiting disable_paging")
return output
@@ -1658,7 +1661,10 @@
Classes
command = self.normalize_cmd(command)
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
return output
def set_base_prompt(
@@ -2225,7 +2231,10 @@
Classes
if not self.check_config_mode():
self.write_channel(self.normalize_cmd(config_command))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(config_command.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(config_command.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if not self.check_config_mode():
@@ -2245,7 +2254,10 @@
Classes
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(exit_config.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(exit_config.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if self.check_config_mode():
@@ -2696,7 +2708,10 @@
Methods
if not self.check_config_mode():
self.write_channel(self.normalize_cmd(config_command))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(config_command.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(config_command.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if not self.check_config_mode():
@@ -2732,7 +2747,10 @@
Methods
log.debug(f"Command: {command}")
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
log.debug(f"{output}")
log.debug("Exiting disable_paging")
return output
def establish_connection(self, width=511, height=1000):
"""Establish SSH connection to the network device
Timeout will generate a NetmikoTimeoutException
Authentication failure will generate a NetmikoAuthenticationException
- width and height are needed for Fortinet paging setting.
-
- :param width: Specified width of the VT100 terminal window
+ :param width: Specified width of the VT100 terminal window (default: 511)
:type width: int
- :param height: Specified height of the VT100 terminal window
+ :param height: Specified height of the VT100 terminal window (default: 1000)
:type height: int
"""
if self.protocol == "telnet":
@@ -2870,12 +2885,9 @@
Methods
print(f"SSH connection established to {self.host}:{self.port}")
# Use invoke_shell to establish an 'interactive session'
- if width and height:
- self.remote_conn = self.remote_conn_pre.invoke_shell(
- term="vt100", width=width, height=height
- )
- else:
- self.remote_conn = self.remote_conn_pre.invoke_shell()
+ self.remote_conn = self.remote_conn_pre.invoke_shell(
+ term="vt100", width=width, height=height
+ )
self.remote_conn.settimeout(self.blocking_timeout)
if self.keepalive:
@@ -2910,7 +2922,10 @@
Methods
if self.check_config_mode():
self.write_channel(self.normalize_cmd(exit_config))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output += self.read_until_pattern(pattern=re.escape(exit_config.strip()))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(
+ pattern=re.escape(exit_config.strip())
+ )
if not re.search(pattern, output, flags=re.M):
output += self.read_until_pattern(pattern=pattern)
if self.check_config_mode():
@@ -3909,7 +3924,10 @@
Methods
command = self.normalize_cmd(command)
self.write_channel(command)
# Make sure you read until you detect the command echo (avoid getting out of sync)
- output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ if self.global_cmd_verify is not False:
+ output = self.read_until_pattern(pattern=re.escape(command.strip()))
+ else:
+ output = self.read_until_prompt()
return output
@@ -4834,7 +4852,45 @@
Methods
else:
return cached_results
- def _autodetect_std(self, cmd="", search_patterns=None, re_flags=re.I, priority=99):
+ def _autodetect_remote_version(
+ self, search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
+ """
+ Method to try auto-detect the device type, by matching a regular expression on the reported
+ remote version of the SSH server.
+
+ Parameters
+ ----------
+ search_patterns : list
+ A list of regular expression to look for in the reported remote SSH version
+ (default: None).
+ re_flags: re.flags, optional
+ Any flags from the python re module to modify the regular expression (default: re.I).
+ priority: int, optional
+ The confidence the match is right between 0 and 99 (default: 99).
+ """
+ invalid_responses = [r"^$"]
+
+ if not search_patterns:
+ return 0
+
+ try:
+ remote_version = self.connection.remote_conn.transport.remote_version
+ for pattern in invalid_responses:
+ match = re.search(pattern, remote_version, flags=re.I)
+ if match:
+ return 0
+ for pattern in search_patterns:
+ match = re.search(pattern, remote_version, flags=re_flags)
+ if match:
+ return priority
+ except Exception:
+ return 0
+ return 0
+
+ def _autodetect_std(
+ self, cmd="", search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
"""
Standard method to try to auto-detect the device type. This method will be called for each
device_type present in SSH_MAPPER_BASE dict ('dispatch' key). It will attempt to send a
diff --git a/docs/netmiko/nokia/index.html b/docs/netmiko/nokia/index.html
index 2043abd0d..fc69b2878 100644
--- a/docs/netmiko/nokia/index.html
+++ b/docs/netmiko/nokia/index.html
@@ -22,9 +22,9 @@
Module netmiko.nokia
Source code
-
from netmiko.nokia.nokia_sros_ssh import NokiaSrosSSH
+
from netmiko.nokia.nokia_sros_ssh import NokiaSrosSSH, NokiaSrosFileTransfer
-__all__ = ["NokiaSrosSSH"]
# "@" indicates model-driven CLI (vs Classical CLI)
if "@" in self.base_prompt:
self.disable_paging(command="environment more false")
+ # To perform file operations we need to disable paging in classical-CLI also
+ self.disable_paging(command="//environment no more")
self.set_terminal_width(command="environment console width 512")
else:
self.disable_paging(command="environment no more")
@@ -266,7 +451,10 @@
Classes
output += self._discard()
cmd = "quit-config"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(cmd))
+ else:
+ output += self.read_until_prompt()
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
return output
@@ -301,16 +489,25 @@
Classes
log.info("Apply uncommitted changes!")
cmd = "commit"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
- output += self.read_until_pattern(r"@")
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
+ if "@" not in new_output:
+ new_output += self.read_until_pattern(r"@")
+ output += new_output
return output
def _exit_all(self):
"""Return to the 'root' context."""
+ output = ""
exit_cmd = "exit all"
self.write_channel(self.normalize_cmd(exit_cmd))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- return self.read_until_pattern(pattern=re.escape(exit_cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(exit_cmd))
+ else:
+ output += self.read_until_prompt()
+ return output
def _discard(self):
"""Discard changes from private candidate for Nokia SR OS"""
@@ -318,7 +515,9 @@
Classes
if "@" in self.base_prompt:
cmd = "discard"
self.write_channel(self.normalize_cmd(cmd))
- new_output = self.read_until_pattern(pattern=re.escape(cmd))
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
if "@" not in new_output:
new_output += self.read_until_prompt()
output += new_output
@@ -415,8 +614,12 @@
Methods
log.info("Apply uncommitted changes!")
cmd = "commit"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
- output += self.read_until_pattern(r"@")
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
+ if "@" not in new_output:
+ new_output += self.read_until_pattern(r"@")
+ output += new_output
return output
@@ -468,7 +671,10 @@
Methods
output += self._discard()
cmd = "quit-config"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(cmd))
+ else:
+ output += self.read_until_prompt()
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
return output
@@ -610,6 +816,13 @@
# https://github.com/ktbyers/netmiko/blob/develop/LICENSE
import re
+import os
import time
from netmiko import log
from netmiko.base_connection import BaseConnection
+from netmiko.scp_handler import BaseFileTransfer
class NokiaSrosSSH(BaseConnection):
@@ -63,6 +65,8 @@
Module netmiko.nokia.nokia_sros_ssh
# "@" indicates model-driven CLI (vs Classical CLI)
if "@" in self.base_prompt:
self.disable_paging(command="environment more false")
+ # To perform file operations we need to disable paging in classical-CLI also
+ self.disable_paging(command="//environment no more")
self.set_terminal_width(command="environment console width 512")
else:
self.disable_paging(command="environment no more")
@@ -113,7 +117,10 @@
Module netmiko.nokia.nokia_sros_ssh
output += self._discard()
cmd = "quit-config"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(cmd))
+ else:
+ output += self.read_until_prompt()
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
return output
@@ -148,16 +155,25 @@
Module netmiko.nokia.nokia_sros_ssh
log.info("Apply uncommitted changes!")
cmd = "commit"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
- output += self.read_until_pattern(r"@")
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
+ if "@" not in new_output:
+ new_output += self.read_until_pattern(r"@")
+ output += new_output
return output
def _exit_all(self):
"""Return to the 'root' context."""
+ output = ""
exit_cmd = "exit all"
self.write_channel(self.normalize_cmd(exit_cmd))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- return self.read_until_pattern(pattern=re.escape(exit_cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(exit_cmd))
+ else:
+ output += self.read_until_prompt()
+ return output
def _discard(self):
"""Discard changes from private candidate for Nokia SR OS"""
@@ -165,7 +181,9 @@
Module netmiko.nokia.nokia_sros_ssh
if "@" in self.base_prompt:
cmd = "discard"
self.write_channel(self.normalize_cmd(cmd))
- new_output = self.read_until_pattern(pattern=re.escape(cmd))
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
if "@" not in new_output:
new_output += self.read_until_prompt()
output += new_output
@@ -191,7 +209,106 @@
Module netmiko.nokia.nokia_sros_ssh
pass
# Always try to send final 'logout'.
self._session_log_fin = True
- self.write_channel(command + self.RETURN)
+ self.write_channel(command + self.RETURN)
+
+
+class NokiaSrosFileTransfer(BaseFileTransfer):
+ def __init__(
+ self, ssh_conn, source_file, dest_file, hash_supported=False, **kwargs
+ ):
+ super().__init__(
+ ssh_conn, source_file, dest_file, hash_supported=hash_supported, **kwargs
+ )
+
+ def _file_cmd_prefix(self):
+ """
+ Allow MD-CLI to execute file operations by using classical CLI.
+
+ Returns "//" if the current prompt is MD-CLI (empty string otherwise).
+ """
+ return "//" if "@" in self.ssh_ctl_chan.base_prompt else ""
+
+ def remote_space_available(self, search_pattern=r"(\d+)\s+\w+\s+free"):
+ """Return space available on remote device."""
+
+ # Sample text for search_pattern.
+ # " 3 Dir(s) 961531904 bytes free."
+ remote_cmd = self._file_cmd_prefix() + "file dir {}".format(self.file_system)
+ remote_output = self.ssh_ctl_chan.send_command(remote_cmd)
+ match = re.search(search_pattern, remote_output)
+ return int(match.group(1))
+
+ def check_file_exists(self, remote_cmd=""):
+ """Check if destination file exists (returns boolean)."""
+
+ if self.direction == "put":
+ if not remote_cmd:
+ remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
+ self.file_system, self.dest_file
+ )
+ dest_file_name = self.dest_file.replace("\\", "/").split("/")[-1]
+ remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
+ if "File Not Found" in remote_out:
+ return False
+ elif dest_file_name in remote_out:
+ return True
+ else:
+ raise ValueError("Unexpected output from check_file_exists")
+ elif self.direction == "get":
+ return os.path.exists(self.dest_file)
+
+ def remote_file_size(self, remote_cmd=None, remote_file=None):
+ """Get the file size of the remote file."""
+
+ if remote_file is None:
+ if self.direction == "put":
+ remote_file = self.dest_file
+ elif self.direction == "get":
+ remote_file = self.source_file
+ if not remote_cmd:
+ remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
+ self.file_system, remote_file
+ )
+ remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
+
+ if "File Not Found" in remote_out:
+ raise IOError("Unable to find file on remote system")
+
+ # Parse dir output for filename. Output format is:
+ # "10/16/2019 10:00p 6738 {filename}"
+
+ pattern = r"\S+\s+\S+\s+(\d+)\s+{}".format(re.escape(remote_file))
+ match = re.search(pattern, remote_out)
+
+ if not match:
+ raise ValueError("Filename entry not found in dir output")
+
+ file_size = int(match.group(1))
+ return file_size
+
+ def verify_file(self):
+ """Verify the file has been transferred correctly based on filesize."""
+ if self.direction == "put":
+ return os.stat(self.source_file).st_size == self.remote_file_size(
+ remote_file=self.dest_file
+ )
+ elif self.direction == "get":
+ return (
+ self.remote_file_size(remote_file=self.source_file)
+ == os.stat(self.dest_file).st_size
+ )
+
+ def file_md5(self, **kwargs):
+ raise AttributeError("SR-OS does not support an MD5-hash operation.")
+
+ def process_md5(self, **kwargs):
+ raise AttributeError("SR-OS does not support an MD5-hash operation.")
+
+ def compare_md5(self, **kwargs):
+ raise AttributeError("SR-OS does not support an MD5-hash operation.")
+
+ def remote_md5(self, **kwargs):
+ raise AttributeError("SR-OS does not support an MD5-hash operation.")
# "@" indicates model-driven CLI (vs Classical CLI)
if "@" in self.base_prompt:
self.disable_paging(command="environment more false")
+ # To perform file operations we need to disable paging in classical-CLI also
+ self.disable_paging(command="//environment no more")
self.set_terminal_width(command="environment console width 512")
else:
self.disable_paging(command="environment no more")
@@ -426,7 +728,10 @@
Classes
output += self._discard()
cmd = "quit-config"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(cmd))
+ else:
+ output += self.read_until_prompt()
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
return output
@@ -461,16 +766,25 @@
Classes
log.info("Apply uncommitted changes!")
cmd = "commit"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
- output += self.read_until_pattern(r"@")
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
+ if "@" not in new_output:
+ new_output += self.read_until_pattern(r"@")
+ output += new_output
return output
def _exit_all(self):
"""Return to the 'root' context."""
+ output = ""
exit_cmd = "exit all"
self.write_channel(self.normalize_cmd(exit_cmd))
# Make sure you read until you detect the command echo (avoid getting out of sync)
- return self.read_until_pattern(pattern=re.escape(exit_cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(exit_cmd))
+ else:
+ output += self.read_until_prompt()
+ return output
def _discard(self):
"""Discard changes from private candidate for Nokia SR OS"""
@@ -478,7 +792,9 @@
Classes
if "@" in self.base_prompt:
cmd = "discard"
self.write_channel(self.normalize_cmd(cmd))
- new_output = self.read_until_pattern(pattern=re.escape(cmd))
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
if "@" not in new_output:
new_output += self.read_until_prompt()
output += new_output
@@ -575,8 +891,12 @@
Methods
log.info("Apply uncommitted changes!")
cmd = "commit"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
- output += self.read_until_pattern(r"@")
+ new_output = ""
+ if self.global_cmd_verify is not False:
+ new_output += self.read_until_pattern(pattern=re.escape(cmd))
+ if "@" not in new_output:
+ new_output += self.read_until_pattern(r"@")
+ output += new_output
return output
@@ -628,7 +948,10 @@
Methods
output += self._discard()
cmd = "quit-config"
self.write_channel(self.normalize_cmd(cmd))
- output += self.read_until_pattern(pattern=re.escape(cmd))
+ if self.global_cmd_verify is not False:
+ output += self.read_until_pattern(pattern=re.escape(cmd))
+ else:
+ output += self.read_until_prompt()
if self.check_config_mode():
raise ValueError("Failed to exit configuration mode")
return output
@@ -765,6 +1088,13 @@
inline_transfer=False,
overwrite_file=False,
socket_timeout=10.0,
+ verify_file=None,
):
"""Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.
@@ -89,6 +90,10 @@
Module netmiko.scp_functions
if not cisco_ios and inline_transfer:
raise ValueError("Inline Transfer only supported for Cisco IOS/Cisco IOS-XE")
+ # Replace disable_md5 argument with verify_file argument across time
+ if verify_file is None:
+ verify_file = not disable_md5
+
scp_args = {
"ssh_conn": ssh_conn,
"source_file": source_file,
@@ -104,13 +109,13 @@
Module netmiko.scp_functions
with TransferClass(**scp_args) as scp_transfer:
if scp_transfer.check_file_exists():
if overwrite_file:
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
else:
# File exists, you can overwrite it, MD5 is wrong (transfer file)
verifyspace_and_transferfile(scp_transfer)
- if scp_transfer.compare_md5():
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError(
@@ -122,16 +127,16 @@
Module netmiko.scp_functions
return transferred_and_notverified
else:
# File exists, but you can't overwrite it.
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
msg = "File already exists and overwrite_file is disabled"
raise ValueError(msg)
else:
verifyspace_and_transferfile(scp_transfer)
# File doesn't exist
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError("MD5 failure between source and destination files")
@@ -147,7 +152,7 @@
Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.
@@ -169,6 +174,7 @@
Functions
inline_transfer=False,
overwrite_file=False,
socket_timeout=10.0,
+ verify_file=None,
):
"""Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.
@@ -203,6 +209,10 @@
Functions
if not cisco_ios and inline_transfer:
raise ValueError("Inline Transfer only supported for Cisco IOS/Cisco IOS-XE")
+ # Replace disable_md5 argument with verify_file argument across time
+ if verify_file is None:
+ verify_file = not disable_md5
+
scp_args = {
"ssh_conn": ssh_conn,
"source_file": source_file,
@@ -218,13 +228,13 @@
Functions
with TransferClass(**scp_args) as scp_transfer:
if scp_transfer.check_file_exists():
if overwrite_file:
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
else:
# File exists, you can overwrite it, MD5 is wrong (transfer file)
verifyspace_and_transferfile(scp_transfer)
- if scp_transfer.compare_md5():
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError(
@@ -236,16 +246,16 @@
Functions
return transferred_and_notverified
else:
# File exists, but you can't overwrite it.
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return nottransferred_but_verified
msg = "File already exists and overwrite_file is disabled"
raise ValueError(msg)
else:
verifyspace_and_transferfile(scp_transfer)
# File doesn't exist
- if not disable_md5:
- if scp_transfer.compare_md5():
+ if verify_file:
+ if scp_transfer.verify_file():
return transferred_and_verified
else:
raise ValueError("MD5 failure between source and destination files")
diff --git a/docs/netmiko/scp_handler.html b/docs/netmiko/scp_handler.html
index 20604c85b..bb2d1d044 100644
--- a/docs/netmiko/scp_handler.html
+++ b/docs/netmiko/scp_handler.html
@@ -92,6 +92,7 @@
Netmiko connection creation section
else:
return cached_results
- def _autodetect_std(self, cmd="", search_patterns=None, re_flags=re.I, priority=99):
+ def _autodetect_remote_version(
+ self, search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
+ """
+ Method to try auto-detect the device type, by matching a regular expression on the reported
+ remote version of the SSH server.
+
+ Parameters
+ ----------
+ search_patterns : list
+ A list of regular expression to look for in the reported remote SSH version
+ (default: None).
+ re_flags: re.flags, optional
+ Any flags from the python re module to modify the regular expression (default: re.I).
+ priority: int, optional
+ The confidence the match is right between 0 and 99 (default: 99).
+ """
+ invalid_responses = [r"^$"]
+
+ if not search_patterns:
+ return 0
+
+ try:
+ remote_version = self.connection.remote_conn.transport.remote_version
+ for pattern in invalid_responses:
+ match = re.search(pattern, remote_version, flags=re.I)
+ if match:
+ return 0
+ for pattern in search_patterns:
+ match = re.search(pattern, remote_version, flags=re_flags)
+ if match:
+ return priority
+ except Exception:
+ return 0
+ return 0
+
+ def _autodetect_std(
+ self, cmd="", search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
"""
Standard method to try to auto-detect the device type. This method will be called for each
device_type present in SSH_MAPPER_BASE dict ('dispatch' key). It will attempt to send a
@@ -550,7 +614,45 @@
Methods
else:
return cached_results
- def _autodetect_std(self, cmd="", search_patterns=None, re_flags=re.I, priority=99):
+ def _autodetect_remote_version(
+ self, search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
+ """
+ Method to try auto-detect the device type, by matching a regular expression on the reported
+ remote version of the SSH server.
+
+ Parameters
+ ----------
+ search_patterns : list
+ A list of regular expression to look for in the reported remote SSH version
+ (default: None).
+ re_flags: re.flags, optional
+ Any flags from the python re module to modify the regular expression (default: re.I).
+ priority: int, optional
+ The confidence the match is right between 0 and 99 (default: 99).
+ """
+ invalid_responses = [r"^$"]
+
+ if not search_patterns:
+ return 0
+
+ try:
+ remote_version = self.connection.remote_conn.transport.remote_version
+ for pattern in invalid_responses:
+ match = re.search(pattern, remote_version, flags=re.I)
+ if match:
+ return 0
+ for pattern in search_patterns:
+ match = re.search(pattern, remote_version, flags=re_flags)
+ if match:
+ return priority
+ except Exception:
+ return 0
+ return 0
+
+ def _autodetect_std(
+ self, cmd="", search_patterns=None, re_flags=re.IGNORECASE, priority=99
+ ):
"""
Standard method to try to auto-detect the device type. This method will be called for each
device_type present in SSH_MAPPER_BASE dict ('dispatch' key). It will attempt to send a
diff --git a/docs/netmiko/ubiquiti/edge_ssh.html b/docs/netmiko/ubiquiti/edge_ssh.html
index a36cd4019..66c4ae32d 100644
--- a/docs/netmiko/ubiquiti/edge_ssh.html
+++ b/docs/netmiko/ubiquiti/edge_ssh.html
@@ -264,6 +264,10 @@
Implements support for Ubiquity EdgeSwitch devices.
+
Mostly conforms to Cisco IOS style syntax with a few minor changes.
+
This is NOT for EdgeRouter devices.
+
Initialize attributes for establishing connection to target device.
+
+ :param ip: IP address of target device. Not required if `host` is
+ provided.
+ :type ip: str
+
+ :param host: Hostname of target device. Not required if `ip` is
+ provided.
+ :type host: str
+
+ :param username: Username to authenticate against target device if
+ required.
+ :type username: str
+
+ :param password: Password to authenticate against target device if
+ required.
+ :type password: str
+
+ :param secret: The enable password if target device requires one.
+ :type secret: str
+
+ :param port: The destination port used to connect to the target
+ device.
+ :type port: int or None
+
+ :param device_type: Class selection based on device type.
+ :type device_type: str
+
+ :param verbose: Enable additional messages to standard output.
+ :type verbose: bool
+
+ :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
+ :type global_delay_factor: int
+
+ :param use_keys: Connect to target device using SSH keys.
+ :type use_keys: bool
+
+ :param key_file: Filename path of the SSH key file to use.
+ :type key_file: str
+
+ :param pkey: SSH key object to use.
+ :type pkey: paramiko.PKey
+
+ :param passphrase: Passphrase to use for encrypted key; password will be used for key
+ decryption if not specified.
+ :type passphrase: str
+
+ :param allow_agent: Enable use of SSH key-agent.
+ :type allow_agent: bool
+
+ :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
+ means unknown SSH host keys will be accepted).
+ :type ssh_strict: bool
+
+ :param system_host_keys: Load host keys from the users known_hosts file.
+ :type system_host_keys: bool
+ :param alt_host_keys: If `True` host keys will be loaded from the file specified in
+ alt_key_file.
+ :type alt_host_keys: bool
+
+ :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
+ :type alt_key_file: str
+
+ :param ssh_config_file: File name of OpenSSH configuration file.
+ :type ssh_config_file: str
+
+ :param timeout: Connection timeout.
+ :type timeout: float
+
+ :param session_timeout: Set a timeout for parallel requests.
+ :type session_timeout: float
+
+ :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
+ :type auth_timeout: float
+
+ :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
+ :type banner_timeout: float
+
+ :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
+ Currently defaults to 0, for backwards compatibility (it will not attempt
+ to keep the connection alive).
+ :type keepalive: int
+
+ :param default_enter: Character(s) to send to correspond to enter key (default:
+
+
).
+:type default_enter: str
+
:param response_return: Character(s) to use in normalized return data to represent
+ enter key (default:
+
+
)
+:type response_return: str
+
:param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
+ to select smallest of global and specific. Sets default global_delay_factor to .1
+ (default: False)
+ :type fast_cli: boolean
+
+ :param session_log: File path or BufferedIOBase subclass object to write the session log to.
+ :type session_log: str
+
+ :param session_log_record_writes: The session log generally only records channel reads due
+ to eliminate command duplication due to command echo. You can enable this if you
+ want to record both channel reads and channel writes in the log (default: False).
+ :type session_log_record_writes: boolean
+
+ :param session_log_file_mode: "write" or "append" for session_log file mode
+ (default: "write")
+ :type session_log_file_mode: str
+
+ :param allow_auto_change: Allow automatic configuration changes for terminal settings.
+ (default: False)
+ :type allow_auto_change: bool
+
+ :param encoding: Encoding to be used when writing bytes to the output channel.
+ (default: ascii)
+ :type encoding: str
+
+ :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
+ communication to the target host (default: None).
+ :type sock: socket
+
+ :param global_cmd_verify: Control whether command echo verification is enabled or disabled
+ (default: None). Global attribute takes precedence over function `cmd_verify`
+ argument. Value of `None` indicates to use function `cmd_verify` argument.
+ :type global_cmd_verify: bool|None
+
+
+Source code
+
class UbiquitiUnifiSwitchSSH(UbiquitiEdgeSSH):
+ def session_preparation(self):
+ """
+ Prepare the session after the connection has been established.
+ When SSHing to a UniFi switch, the session initially starts at a Linux
+ shell. Nothing interesting can be done in this environment, however,
+ running `telnet localhost` drops the session to a more familiar
+ environment.
+ """
+
+ self._test_channel_read()
+ self.set_base_prompt()
+ self.send_command(
+ command_string="telnet localhost", expect_string=r"\(UBNT\) >"
+ )
+ self.set_base_prompt()
+ self.enable()
+ self.disable_paging()
+
+ # Clear read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
+
+ def cleanup(self, command="exit"):
+ """Gracefully exit the SSH session."""
+ try:
+ # The pattern="" forces use of send_command_timing
+ if self.check_config_mode(pattern=""):
+ self.exit_config_mode()
+
+ # Exit from the first 'telnet localhost'
+ self.write_channel(command + self.RETURN)
+ except Exception:
+ pass
+
+ super().cleanup()
Prepare the session after the connection has been established.
+When SSHing to a UniFi switch, the session initially starts at a Linux
+shell. Nothing interesting can be done in this environment, however,
+running telnet localhost drops the session to a more familiar
+environment.
+
+Source code
+
def session_preparation(self):
+ """
+ Prepare the session after the connection has been established.
+ When SSHing to a UniFi switch, the session initially starts at a Linux
+ shell. Nothing interesting can be done in this environment, however,
+ running `telnet localhost` drops the session to a more familiar
+ environment.
+ """
+
+ self._test_channel_read()
+ self.set_base_prompt()
+ self.send_command(
+ command_string="telnet localhost", expect_string=r"\(UBNT\) >"
+ )
+ self.set_base_prompt()
+ self.enable()
+ self.disable_paging()
+
+ # Clear read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
import time
+from netmiko.ubiquiti.edge_ssh import UbiquitiEdgeSSH
+
+
+class UbiquitiUnifiSwitchSSH(UbiquitiEdgeSSH):
+ def session_preparation(self):
+ """
+ Prepare the session after the connection has been established.
+ When SSHing to a UniFi switch, the session initially starts at a Linux
+ shell. Nothing interesting can be done in this environment, however,
+ running `telnet localhost` drops the session to a more familiar
+ environment.
+ """
+
+ self._test_channel_read()
+ self.set_base_prompt()
+ self.send_command(
+ command_string="telnet localhost", expect_string=r"\(UBNT\) >"
+ )
+ self.set_base_prompt()
+ self.enable()
+ self.disable_paging()
+
+ # Clear read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
+
+ def cleanup(self, command="exit"):
+ """Gracefully exit the SSH session."""
+ try:
+ # The pattern="" forces use of send_command_timing
+ if self.check_config_mode(pattern=""):
+ self.exit_config_mode()
+
+ # Exit from the first 'telnet localhost'
+ self.write_channel(command + self.RETURN)
+ except Exception:
+ pass
+
+ super().cleanup()
Implements support for Ubiquity EdgeSwitch devices.
+
Mostly conforms to Cisco IOS style syntax with a few minor changes.
+
This is NOT for EdgeRouter devices.
+
Initialize attributes for establishing connection to target device.
+
+ :param ip: IP address of target device. Not required if `host` is
+ provided.
+ :type ip: str
+
+ :param host: Hostname of target device. Not required if `ip` is
+ provided.
+ :type host: str
+
+ :param username: Username to authenticate against target device if
+ required.
+ :type username: str
+
+ :param password: Password to authenticate against target device if
+ required.
+ :type password: str
+
+ :param secret: The enable password if target device requires one.
+ :type secret: str
+
+ :param port: The destination port used to connect to the target
+ device.
+ :type port: int or None
+
+ :param device_type: Class selection based on device type.
+ :type device_type: str
+
+ :param verbose: Enable additional messages to standard output.
+ :type verbose: bool
+
+ :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
+ :type global_delay_factor: int
+
+ :param use_keys: Connect to target device using SSH keys.
+ :type use_keys: bool
+
+ :param key_file: Filename path of the SSH key file to use.
+ :type key_file: str
+
+ :param pkey: SSH key object to use.
+ :type pkey: paramiko.PKey
+
+ :param passphrase: Passphrase to use for encrypted key; password will be used for key
+ decryption if not specified.
+ :type passphrase: str
+
+ :param allow_agent: Enable use of SSH key-agent.
+ :type allow_agent: bool
+
+ :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
+ means unknown SSH host keys will be accepted).
+ :type ssh_strict: bool
+
+ :param system_host_keys: Load host keys from the users known_hosts file.
+ :type system_host_keys: bool
+ :param alt_host_keys: If `True` host keys will be loaded from the file specified in
+ alt_key_file.
+ :type alt_host_keys: bool
+
+ :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
+ :type alt_key_file: str
+
+ :param ssh_config_file: File name of OpenSSH configuration file.
+ :type ssh_config_file: str
+
+ :param timeout: Connection timeout.
+ :type timeout: float
+
+ :param session_timeout: Set a timeout for parallel requests.
+ :type session_timeout: float
+
+ :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
+ :type auth_timeout: float
+
+ :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
+ :type banner_timeout: float
+
+ :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
+ Currently defaults to 0, for backwards compatibility (it will not attempt
+ to keep the connection alive).
+ :type keepalive: int
+
+ :param default_enter: Character(s) to send to correspond to enter key (default:
+
+
).
+:type default_enter: str
+
:param response_return: Character(s) to use in normalized return data to represent
+ enter key (default:
+
+
)
+:type response_return: str
+
:param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
+ to select smallest of global and specific. Sets default global_delay_factor to .1
+ (default: False)
+ :type fast_cli: boolean
+
+ :param session_log: File path or BufferedIOBase subclass object to write the session log to.
+ :type session_log: str
+
+ :param session_log_record_writes: The session log generally only records channel reads due
+ to eliminate command duplication due to command echo. You can enable this if you
+ want to record both channel reads and channel writes in the log (default: False).
+ :type session_log_record_writes: boolean
+
+ :param session_log_file_mode: "write" or "append" for session_log file mode
+ (default: "write")
+ :type session_log_file_mode: str
+
+ :param allow_auto_change: Allow automatic configuration changes for terminal settings.
+ (default: False)
+ :type allow_auto_change: bool
+
+ :param encoding: Encoding to be used when writing bytes to the output channel.
+ (default: ascii)
+ :type encoding: str
+
+ :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
+ communication to the target host (default: None).
+ :type sock: socket
+
+ :param global_cmd_verify: Control whether command echo verification is enabled or disabled
+ (default: None). Global attribute takes precedence over function `cmd_verify`
+ argument. Value of `None` indicates to use function `cmd_verify` argument.
+ :type global_cmd_verify: bool|None
+
+
+Source code
+
class UbiquitiUnifiSwitchSSH(UbiquitiEdgeSSH):
+ def session_preparation(self):
+ """
+ Prepare the session after the connection has been established.
+ When SSHing to a UniFi switch, the session initially starts at a Linux
+ shell. Nothing interesting can be done in this environment, however,
+ running `telnet localhost` drops the session to a more familiar
+ environment.
+ """
+
+ self._test_channel_read()
+ self.set_base_prompt()
+ self.send_command(
+ command_string="telnet localhost", expect_string=r"\(UBNT\) >"
+ )
+ self.set_base_prompt()
+ self.enable()
+ self.disable_paging()
+
+ # Clear read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()
+
+ def cleanup(self, command="exit"):
+ """Gracefully exit the SSH session."""
+ try:
+ # The pattern="" forces use of send_command_timing
+ if self.check_config_mode(pattern=""):
+ self.exit_config_mode()
+
+ # Exit from the first 'telnet localhost'
+ self.write_channel(command + self.RETURN)
+ except Exception:
+ pass
+
+ super().cleanup()
Prepare the session after the connection has been established.
+When SSHing to a UniFi switch, the session initially starts at a Linux
+shell. Nothing interesting can be done in this environment, however,
+running telnet localhost drops the session to a more familiar
+environment.
+
+Source code
+
def session_preparation(self):
+ """
+ Prepare the session after the connection has been established.
+ When SSHing to a UniFi switch, the session initially starts at a Linux
+ shell. Nothing interesting can be done in this environment, however,
+ running `telnet localhost` drops the session to a more familiar
+ environment.
+ """
+
+ self._test_channel_read()
+ self.set_base_prompt()
+ self.send_command(
+ command_string="telnet localhost", expect_string=r"\(UBNT\) >"
+ )
+ self.set_base_prompt()
+ self.enable()
+ self.disable_paging()
+
+ # Clear read buffer
+ time.sleep(0.3 * self.global_delay_factor)
+ self.clear_buffer()