diff --git a/src/scmrepo/git/backend/dulwich/asyncssh_vendor.py b/src/scmrepo/git/backend/dulwich/asyncssh_vendor.py index 93360441..b0a0c311 100644 --- a/src/scmrepo/git/backend/dulwich/asyncssh_vendor.py +++ b/src/scmrepo/git/backend/dulwich/asyncssh_vendor.py @@ -298,7 +298,7 @@ async def _run_command( host, port=port if port is not None else (), username=username if username is not None else (), - password=password if password is not None else (), + password=password, client_keys=[key_filename] if key_filename else (), ignore_encrypted=not key_filename, known_hosts=None, diff --git a/tests/test_dulwich.py b/tests/test_dulwich.py index 80195269..c9305eee 100644 --- a/tests/test_dulwich.py +++ b/tests/test_dulwich.py @@ -11,18 +11,74 @@ from pytest_mock import MockerFixture from pytest_test_utils.waiters import wait_until +from scmrepo.exceptions import AuthError from scmrepo.git.backend.dulwich.asyncssh_vendor import AsyncSSHVendor -from .vendor.test_paramiko_vendor import ( - CLIENT_KEY, - PASSWORD, - USER, - Server, -) - # pylint: disable=redefined-outer-name +USER = "testuser" +PASSWORD = "test" +CLIENT_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAxvREKSElPOm/0z/nPO+j5rk2tjdgGcGc7We1QZ6TRXYLu7nN +GeEFIL4p8N1i6dmB+Eydt7xqCU79MWD6Yy4prFe1+/K1wCDUxIbFMxqQcX5zjJzd +i8j8PbcaUlVhP/OkjtkSxrXaGDO1BzfdV4iEBtTV/2l3zmLKJlt3jnOHLczP24CB +DTQKp3rKshbRefzot9Y+wnaK692RsYgsyo9YEP0GyWKG9topCHk13r46J6vGLeuj +ryUKqmbLJkzbJbIcEqwTDo5iHaCVqaMr5Hrb8BdMucSseqZQJsXSd+9tdRcIblUQ +38kZjmFMm4SFbruJcpZCNM2wNSZPIRX+3eiwNwIDAQABAoIBAHSacOBSJsr+jIi5 +KUOTh9IPtzswVUiDKwARCjB9Sf8p4lKR4N1L/n9kNJyQhApeikgGT2GCMftmqgoo +tlculQoHFgemBlOmak0MV8NNzF5YKEy/GzF0CDH7gJfEpoyetVFrdA+2QS5yD6U9 +XqKQxiBi2VEqdScmyyeT8AwzNYTnPeH/DOEcnbdRjqiy/CD79F49CQ1lX1Fuqm0K +I7BivBH1xo/rVnUP4F+IzocDqoga+Pjdj0LTXIgJlHQDSbhsQqWujWQDDuKb+MAw +sNK4Zf8ErV3j1PyA7f/M5LLq6zgstkW4qikDHo4SpZX8kFOO8tjqb7kujj7XqeaB +CxqrOTECgYEA73uWkrohcmDJ4KqbuL3tbExSCOUiaIV+sT1eGPNi7GCmXD4eW5Z4 +75v2IHymW83lORSu/DrQ6sKr1nkuRpqr2iBzRmQpl/H+wahIhBXlnJ25uUjDsuPO +1Pq2LcmyD+jTxVnmbSe/q7O09gZQw3I6H4+BMHmpbf8tC97lqimzpJ0CgYEA1K0W +ZL70Xtn9quyHvbtae/BW07NZnxvUg4UaVIAL9Zu34JyplJzyzbIjrmlDbv6aRogH +/KtuG9tfbf55K/jjqNORiuRtzt1hUN1ye4dyW7tHx2/7lXdlqtyK40rQl8P0kqf8 +zaS6BqjnobgSdSpg32rWoL/pcBHPdJCJEgQ8zeMCgYEA0/PK8TOhNIzrP1dgGSKn +hkkJ9etuB5nW5mEM7gJDFDf6JPupfJ/xiwe6z0fjKK9S57EhqgUYMB55XYnE5iIw +ZQ6BV9SAZ4V7VsRs4dJLdNC3tn/rDGHJBgCaym2PlbsX6rvFT+h1IC8dwv0V79Ui +Ehq9WTzkMoE8yhvNokvkPZUCgYEAgBAFxv5xGdh79ftdtXLmhnDvZ6S8l6Fjcxqo +Ay/jg66Tp43OU226iv/0mmZKM8Dd1xC8dnon4GBVc19jSYYiWBulrRPlx0Xo/o+K +CzZBN1lrXH1i6dqufpc0jq8TMf/N+q1q/c1uMupsKCY1/xVYpc+ok71b7J7c49zQ +nOeuUW8CgYA9Infooy65FTgbzca0c9kbCUBmcAPQ2ItH3JcPKWPQTDuV62HcT00o +fZdIV47Nez1W5Clk191RMy8TXuqI54kocciUWpThc6j44hz49oUueb8U4bLcEHzA +WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w== +-----END RSA PRIVATE KEY-----""" + + +class Server(paramiko.ServerInterface): + """http://docs.paramiko.org/en/2.4/api/server.html.""" + + def __init__(self, commands, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.commands = commands + + def check_channel_exec_request(self, channel, command): + self.commands.append(command) + return True + + def check_auth_password(self, username, password): + if username == USER and password == PASSWORD: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_auth_publickey(self, username, key): + pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY)) + if username == USER and key == pubkey: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_channel_request(self, kind, chanid): + if kind == "session": + return paramiko.OPEN_SUCCEEDED + return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED + + def get_allowed_auths(self, username): + return "password,publickey" + + @pytest.fixture def ssh_conn(request: pytest.FixtureRequest) -> dict[str, Any]: server = Server([]) @@ -77,6 +133,18 @@ def test_run_command_password(server: Server, ssh_port: int): assert b"test_run_command_password" in server.commands +def test_run_command_no_password(server: Server, ssh_port: int): + vendor = AsyncSSHVendor() + with pytest.raises(AuthError): + vendor.run_command( + "127.0.0.1", + "test_run_command_password", + username=USER, + port=ssh_port, + password=None, + ) + + def test_run_command_with_privkey(server: Server, ssh_port: int): key = asyncssh.import_private_key(CLIENT_KEY)