Skip to content

Commit

Permalink
Add support for adding banner to TelnetServiceMock. (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
fwkz authored Dec 16, 2017
1 parent 148fcf0 commit 95bf6b4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
19 changes: 19 additions & 0 deletions tests/service_mocks/test_telnet_service_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,22 @@ def test_telnet_service_mock_add_credentials():

assert match_object, foo.decode()
tn.close()


def test_telnet_service_mock_add_banner():
with TelnetServiceMock("127.0.0.1", 8023,
scenario=TelnetScenario.GENERIC) as target:
banner = b"Scoobeedoobeedoo where are you?"
target.add_banner(banner)

assert target.host == "127.0.0.1"
assert target.port == 8023

tn = Telnet(target.host, target.port, timeout=1.0)

_, match_object, _ = tn.expect([banner], 1.0)
assert match_object

_, match_object, foo = tn.expect([b"Login: ", b"login: "], 1.0)
assert match_object, foo.decode()
tn.close()
5 changes: 5 additions & 0 deletions threat9_test_bed/service_mocks/telnet_service_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ def get_command_mock(self, command: str):
return command_mock

def add_credentials(self, login: str, password: str):
""" Add custom credentials pair. """
self.protocol.add_credentials(login, password)

def add_banner(self, banner: bytes):
""" Add welcoming banner after connection. """
self.protocol.add_banner(banner)
6 changes: 6 additions & 0 deletions threat9_test_bed/telnet_service/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, scenario: TelnetScenario):
self.password = None
self.authorized = False

self.banner = b""
self._command_mocks = {}
self._creds = [
("admin", "admin"),
Expand All @@ -73,6 +74,8 @@ def connection_made(self, transport: asyncio.Transport):
self.remote_address = transport.get_extra_info("peername")
logger.debug(f"Connection from {self.remote_address}")
self.transport = transport
if self.banner:
self.transport.write(self.banner + b"\r\n")
self.transport.write(b"Login: ")

@authorized
Expand All @@ -91,3 +94,6 @@ def add_command_handler(self, command: str, handler: typing.Callable):

def add_credentials(self, login: str, password: str):
self._creds.append((login, password))

def add_banner(self, banner: bytes):
self.banner = banner

0 comments on commit 95bf6b4

Please sign in to comment.