diff --git a/bcpandas/main.py b/bcpandas/main.py index 64a37de..67baabe 100644 --- a/bcpandas/main.py +++ b/bcpandas/main.py @@ -39,8 +39,9 @@ class SqlCreds: engine that uses `pyodbc` as the DBAPI, and store it in the `self.engine` attribute. If `username` and `password` are not provided, `with_krb_auth` will be `True`. + If `entra_id_token` are provided uses Microsoft Entra ID Authentication. - Only supports SQL based logins, not Active Directory or Azure AD. + Only supports SQL based logins and Microsoft Entra ID, not Active Directory. Parameters ---------- @@ -54,6 +55,8 @@ class SqlCreds: odbc_kwargs : dict of {str, str or int}, optional additional keyword arguments, to pass into ODBC connection string, such as Encrypted='yes' + entra_id_token: str, optional + Microsoft Entra ID Authentication token Returns ------- @@ -69,6 +72,7 @@ def __init__( driver_version: Optional[int] = None, port: int = 1433, odbc_kwargs: Optional[Dict[str, Union[str, int]]] = None, + entra_id_token: Optional[str] = None, ): self.server = server self.database = database @@ -107,6 +111,8 @@ def __init__( self.with_krb_auth = True db_url += "Trusted_Connection=yes;" + self.entra_id_token = entra_id_token + self_msg = sub(r"password=\'.*\'", "password=[REDACTED]", str(self)) logger.info(f"Created creds:\t{self_msg}") diff --git a/bcpandas/utils.py b/bcpandas/utils.py index 9111b52..5b3250f 100644 --- a/bcpandas/utils.py +++ b/bcpandas/utils.py @@ -71,6 +71,8 @@ def bcp( # auth if creds.with_krb_auth: auth = ["-T"] + elif creds.entra_id_token: + auth = ["-G", "-P", quote_this(creds.entra_id_token)] else: auth = ["-U", quote_this(creds.username), "-P", quote_this(creds.password)] if creds.odbc_kwargs: diff --git a/tests/test_utils.py b/tests/test_utils.py index 4e1558f..1ef63a6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -19,7 +19,9 @@ def fixture_run_cmd_capture(monkeypatch): def test_bcpandas_creates_command_without_port_if_default(run_cmd): - Creds = namedtuple("Creds", "server port database with_krb_auth username password odbc_kwargs") + Creds = namedtuple( + "Creds", "server port database with_krb_auth username password odbc_kwargs entra_id_token" + ) creds = Creds( server="localhost", port=1433, @@ -28,6 +30,7 @@ def test_bcpandas_creates_command_without_port_if_default(run_cmd): username="me", password="secret", odbc_kwargs=None, + entra_id_token=None, ) utils.bcp("table", "in", "", creds, True) assert run_cmd.call_args == mock.call( @@ -51,7 +54,9 @@ def test_bcpandas_creates_command_without_port_if_default(run_cmd): def test_bcpandas_creates_command_with_port_if_not_default(run_cmd): - Creds = namedtuple("Creds", "server port database with_krb_auth username password odbc_kwargs") + Creds = namedtuple( + "Creds", "server port database with_krb_auth username password odbc_kwargs entra_id_token" + ) creds = Creds( server="localhost", port=1234, @@ -60,6 +65,7 @@ def test_bcpandas_creates_command_with_port_if_not_default(run_cmd): username="me", password="secret", odbc_kwargs=None, + entra_id_token=None, ) utils.bcp("table", "in", "", creds, True) assert run_cmd.call_args == mock.call( @@ -83,7 +89,9 @@ def test_bcpandas_creates_command_with_port_if_not_default(run_cmd): def test_bcpandas_creates_command_with_encrypt_no(run_cmd): - Creds = namedtuple("Creds", "server port database with_krb_auth username password odbc_kwargs") + Creds = namedtuple( + "Creds", "server port database with_krb_auth username password odbc_kwargs entra_id_token" + ) creds = Creds( server="localhost", port=1433, @@ -92,6 +100,7 @@ def test_bcpandas_creates_command_with_encrypt_no(run_cmd): username="me", password="secret", odbc_kwargs=dict(encrypt="no"), + entra_id_token=None, ) utils.bcp("table", "in", "", creds, True) assert run_cmd.call_args == mock.call( @@ -116,7 +125,9 @@ def test_bcpandas_creates_command_with_encrypt_no(run_cmd): def test_bcpandas_creates_command_with_encrypt_yes(run_cmd): - Creds = namedtuple("Creds", "server port database with_krb_auth username password odbc_kwargs") + Creds = namedtuple( + "Creds", "server port database with_krb_auth username password odbc_kwargs entra_id_token" + ) creds = Creds( server="localhost", port=1433, @@ -125,6 +136,7 @@ def test_bcpandas_creates_command_with_encrypt_yes(run_cmd): username="me", password="secret", odbc_kwargs=dict(Encrypt="1"), + entra_id_token=None, ) utils.bcp("table", "in", "", creds, True) assert run_cmd.call_args == mock.call( @@ -148,6 +160,41 @@ def test_bcpandas_creates_command_with_encrypt_yes(run_cmd): ) +def test_bcpandas_creates_command_with_entra_id_token(run_cmd): + Creds = namedtuple( + "Creds", "server port database with_krb_auth username password odbc_kwargs entra_id_token" + ) + creds = Creds( + server="localhost", + port=1433, + database="DB", + with_krb_auth=False, + username=None, + password=None, + odbc_kwargs=dict(Encrypt="1"), + entra_id_token="secret_token", + ) + utils.bcp("table", "in", "", creds, True) + assert run_cmd.call_args == mock.call( + [ + "bcp", + "dbo.table", + "in", + "", + "-S", + "localhost", + "-d", + "DB", + "-q", + "-G", + "-P", + "secret_token", + ] + + (["-Ym"] if sys.platform != "win32" else []), + print_output=True, + ) + + @pytest.mark.usefixtures("database") def test_bcp_login_failure(sql_creds: SqlCreds): wrong_sql_creds = SqlCreds(