Commit

WIP
jochenchrist committed Nov 24, 2024
1 parent e0d72cf commit c77751d
Showing 7 changed files with 407 additions and 95 deletions.
110 changes: 94 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -15,24 +15,102 @@ Start the agent using Docker. You must pass the API keys as environment variables.
```
docker run \
-e DATAMESHMANAGER_CLIENT_APIKEY='insert-api-key-here' \
-e DATAMESHMANAGER_CLIENT_DATABRICKS_HOST='https://dbc-xxxxxx.cloud.databricks.com/' \
-e DATAMESHMANAGER_CLIENT_DATABRICKS_TOKEN='your-access-token' \
-e DATAMESHMANAGER_CLIENT_DATABRICKS_WORKSPACE_HOST='https://dbc-xxxxxx.cloud.databricks.com/' \
-e DATAMESHMANAGER_CLIENT_DATABRICKS_WORKSPACE_TOKEN='your-access-token' \
datameshmanager/datamesh-manager-agent-databricks:latest
```

## Configuration

| Environment Variable | Default Value | Description |
|------------------------------------------------------------------------------|------------------------------------|----------------------------------------------------------------------------------------|
| `DATAMESHMANAGER_CLIENT_HOST` | `https://api.datamesh-manager.com` | Base URL of the Data Mesh Manager API. |
| `DATAMESHMANAGER_CLIENT_APIKEY` | | API key for authenticating requests to the Data Mesh Manager. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_HOST` | | Databricks workspace host URL in the form of `https://dbc-xxxxxx.cloud.databricks.com/`. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_TOKEN` | | Personal access token for authenticating with Databricks. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_AGENTID` | `databricks-access-management` | Identifier for the Databricks access management agent. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_ENABLED` | `true` | Indicates whether Databricks access management is enabled. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_MAPPING_DATAPRODUCT_CUSTOMFIELD` | `databricksServicePrincipal` | Custom field mapping for Databricks service principals in data products. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_MAPPING_TEAM_CUSTOMFIELD` | `databricksServicePrincipal` | Custom field mapping for Databricks service principals in teams. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_AGENTID` | `databricks-assets` | Identifier for the Databricks assets agent. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_ENABLED` | `true` | Indicates whether Databricks asset tracking is enabled. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_POLLINTERVAL` | `PT5S` | Polling interval for Databricks asset updates, in ISO 8601 duration format. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_TABLES_ALLOWLIST` | `*` | List of allowed tables for Databricks asset tracking (wildcard `*` allows all tables). |
| Environment Variable | Default Value | Description |
|--------------------------------------------------------------------------------------|------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
| `DATAMESHMANAGER_CLIENT_HOST` | `https://api.datamesh-manager.com` | Base URL of the Data Mesh Manager API. |
| `DATAMESHMANAGER_CLIENT_APIKEY` | | API key for authenticating requests to the Data Mesh Manager. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_WORKSPACE_HOST` | | Databricks workspace host URL in the form of `https://dbc-xxxxxx.cloud.databricks.com` (for AWS). |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_WORKSPACE_CLIENTID`                               |                                    | Client ID of a workspace service principal with USE CATALOG, USE SCHEMA, and MODIFY permissions, used to grant permissions on schemas. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_WORKSPACE_CLIENTSECRET`                           |                                    | Client secret of a workspace service principal with USE CATALOG, USE SCHEMA, and MODIFY permissions, used to grant permissions on schemas. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCOUNT_HOST`                                     |                                    | Databricks account login URL, e.g. `https://accounts.cloud.databricks.com` (for AWS).                                                |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCOUNT_ACCOUNTID`                                |                                    | The Databricks account ID.                                                                                                           |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCOUNT_CLIENTID`                                 |                                    | Client ID of an account service principal with the account admin role.                                                               |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCOUNT_CLIENTSECRET`                             |                                    | Client secret of an account service principal with the account admin role.                                                           |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_AGENTID` | `databricks-access-management` | Identifier for the Databricks access management agent. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_ENABLED` | `true` | Indicates whether Databricks access management is enabled. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_MAPPING_DATAPRODUCT_CUSTOMFIELD` | `databricksServicePrincipal` | Custom field mapping for Databricks service principals in data products. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ACCESSMANAGEMENT_MAPPING_TEAM_CUSTOMFIELD` | `databricksServicePrincipal` | Custom field mapping for Databricks service principals in teams. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_AGENTID` | `databricks-assets` | Identifier for the Databricks assets agent. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_ENABLED` | `true` | Indicates whether Databricks asset tracking is enabled. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_POLLINTERVAL` | `PT5S` | Polling interval for Databricks asset updates, in ISO 8601 duration format. |
| `DATAMESHMANAGER_CLIENT_DATABRICKS_ASSETS_TABLES_INCLUDE` | `*` | List of included tables for Databricks asset tracking (wildcard `*` includes all tables). |
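
The `POLLINTERVAL` value uses the ISO 8601 duration format, which Java parses natively via `java.time.Duration`. A quick sketch of what the default `PT5S` (and a longer custom value) means:

```java
import java.time.Duration;

public class PollIntervalExample {
    public static void main(String[] args) {
        // The default PT5S is a 5-second polling interval.
        Duration interval = Duration.parse("PT5S");
        System.out.println(interval.toMillis()); // 5000

        // Longer intervals follow the same format, e.g. every 15 minutes:
        System.out.println(Duration.parse("PT15M").toSeconds()); // 900
    }
}
```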


## Access Management Flow

When an Access Request has been approved by the data product owner, and the start date is reached, Data Mesh Manager will publish an `AccessActivatedEvent`. When an end date is defined and reached, Data Mesh Manager will publish an `AccessDeactivatedEvent`. The agent listens for these events and grants access to the data consumer in Databricks.
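
The event handling described above can be sketched roughly as follows. The event and handler types here are simplified stand-ins for illustration, not the actual Data Mesh Manager SDK classes:

```java
// Simplified sketch of the agent's event dispatch: activation grants access,
// deactivation revokes it. These types are illustrative stand-ins only.
public class AccessEventDispatch {

    public sealed interface AccessEvent permits AccessActivatedEvent, AccessDeactivatedEvent {}
    public record AccessActivatedEvent(String accessId) implements AccessEvent {}
    public record AccessDeactivatedEvent(String accessId) implements AccessEvent {}

    public static String handle(AccessEvent event) {
        // The real agent calls the Databricks APIs here to create groups
        // and grant or revoke schema permissions.
        return switch (event) {
            case AccessActivatedEvent e -> "grant access " + e.accessId();
            case AccessDeactivatedEvent e -> "revoke access " + e.accessId();
        };
    }

    public static void main(String[] args) {
        System.out.println(handle(new AccessActivatedEvent("a-100")));   // grant access a-100
        System.out.println(handle(new AccessDeactivatedEvent("a-100"))); // revoke access a-100
    }
}
```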

### Consumer Type: Data Product

Example:

- Provider is a data product with ID `p-200` and selected output port `p-200-op-210`.
- The output port defines the schema `my_catalog.schema_220` in the server section.
- Consumer is a data product with ID `c-300`.
- Access ID is `a-100`.

Agent Actions on `AccessActivatedEvent`:

- Create a new service principal `dataproduct-c-300`, if it does not exist. If a custom field `databricksServicePrincipal` is defined in the data product, its value is used as the service principal name instead of the ID.
- Create a new group `access-a-100` for this access.
- Add the service principal `dataproduct-c-300` to the group `access-a-100`.
- Create a new group `team-t-300`, if it does not exist. If a custom field `databricksGroupName` is defined in the team, its value is used as the group name instead of the ID.
- Add all members of the team `t-300` to the group `team-t-300`.
- Add the group `team-t-300` to the group `access-a-100`.
- Grant permissions `USE SCHEMA` and `SELECT` on the schema `my_catalog.schema_220` to the group `access-a-100`.

Agent Actions on `AccessDeactivatedEvent`:

- Delete the group `access-a-100`
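
The naming conventions used in the steps above can be sketched as plain helper functions. This is an illustrative sketch of the rules as documented here, not the agent's actual code:

```java
// Naming rules from the access management flow: each access gets its own group,
// and consumers map to a service principal (data product) or a group (team),
// unless a custom field in Data Mesh Manager overrides the derived name.
public class AccessNaming {

    /** Group that bundles all grantees of one access request, e.g. "access-a-100". */
    public static String accessGroupName(String accessId) {
        return "access-" + accessId;
    }

    /** Team group name, unless the custom field databricksGroupName overrides it. */
    public static String teamGroupName(String teamId, String databricksGroupName) {
        if (databricksGroupName != null && !databricksGroupName.isBlank()) {
            return databricksGroupName;
        }
        return "team-" + teamId;
    }

    /** Service principal for a consuming data product, unless databricksServicePrincipal overrides it. */
    public static String servicePrincipalName(String dataProductId, String databricksServicePrincipal) {
        if (databricksServicePrincipal != null && !databricksServicePrincipal.isBlank()) {
            return databricksServicePrincipal;
        }
        return "dataproduct-" + dataProductId;
    }

    public static void main(String[] args) {
        System.out.println(accessGroupName("a-100"));            // access-a-100
        System.out.println(servicePrincipalName("c-300", null)); // dataproduct-c-300
        System.out.println(teamGroupName("t-300", "analytics")); // analytics
    }
}
```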


### Consumer Type: Team

Example:

- Provider is a data product with ID `p-200` and selected output port `p-200-op-210`.
- The output port defines the schema `my_catalog.schema_220` in the server section.
- Consumer is a team with ID `t-400`.
- Access ID is `a-101`.

Agent Actions on `AccessActivatedEvent`:

- Create a new group `team-t-400`, if it does not exist. If a custom field `databricksGroupName` is defined in the team, its value is used as the group name instead of the ID.
- Add all members of the team `t-400` to the group `team-t-400`.
- Create a new group `access-a-101` for this access.
- Add the group `team-t-400` to the group `access-a-101`.
- Grant permissions `USE SCHEMA` and `SELECT` on the schema `my_catalog.schema_220` to the group `access-a-101`.

Agent Actions on `AccessDeactivatedEvent`:

- Delete the group `access-a-101`


### Consumer Type: User

Example:

- Provider is a data product with ID `p-200` and selected output port `p-200-op-210`.
- The output port defines the schema `my_catalog.schema_220` in the server section.
- Consumer is an individual user with username `alice@example.com`.
- Access ID is `a-102`.

Agent Actions on `AccessActivatedEvent`:

- Create a new group `access-a-102` for this access.
- Add the user `alice@example.com` to the group `access-a-102` (the agent currently assumes that the usernames in Data Mesh Manager and Databricks are identical).
- Grant permissions `USE SCHEMA` and `SELECT` on the schema `my_catalog.schema_220` to the group `access-a-102`.

Agent Actions on `AccessDeactivatedEvent`:

- Delete the group `access-a-102`
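
In all three flows, the final grant is equivalent to the following Unity Catalog SQL. This is a sketch for reference; the agent itself performs the grant through the Databricks APIs rather than by issuing SQL:

```java
// Builds the Unity Catalog GRANT statement equivalent to the agent's final step.
public class GrantSql {

    public static String grantSelect(String catalog, String schema, String groupName) {
        // SELECT granted at schema level applies to all tables in the schema.
        return "GRANT USE SCHEMA, SELECT ON SCHEMA " + catalog + "." + schema
            + " TO `" + groupName + "`";
    }

    public static void main(String[] args) {
        System.out.println(grantSelect("my_catalog", "schema_220", "access-a-102"));
        // GRANT USE SCHEMA, SELECT ON SCHEMA my_catalog.schema_220 TO `access-a-102`
    }
}
```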



2 changes: 1 addition & 1 deletion pom.xml
@@ -27,7 +27,7 @@
<dependency>
<groupId>com.datamesh-manager</groupId>
<artifactId>datamesh-manager-sdk</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
52 changes: 34 additions & 18 deletions src/main/java/datameshmanager/databricks/Application.java
@@ -1,21 +1,22 @@
package datameshmanager.databricks;

import com.databricks.sdk.AccountClient;
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.core.DatabricksConfig;
import datameshmanager.sdk.DataMeshManagerAssetsSynchronizer;
import datameshmanager.sdk.DataMeshManagerClient;
import datameshmanager.sdk.DataMeshManagerEventListener;
import datameshmanager.sdk.DataMeshManagerStateRepository;
import datameshmanager.sdk.DataMeshManagerStateRepositoryRemote;
import java.util.Objects;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootApplication(scanBasePackages = "datameshmanager")
@ConfigurationPropertiesScan("datameshmanager")
@@ -28,11 +29,22 @@ public static void main(String[] args) {

@Bean
public WorkspaceClient workspaceClient(DatabricksProperties properties) {
var host = properties.host();
var token = properties.token();
var databricksConfig = new DatabricksConfig().setHost(host).setToken(token);
var databricksConfig = new DatabricksConfig()
.setHost(properties.workspace().host())
.setClientId(properties.workspace().clientId())
.setClientSecret(properties.workspace().clientSecret());
// TODO support GCP and Azure
return new WorkspaceClient(databricksConfig);
}
@Bean
public AccountClient accountClient(DatabricksProperties properties) {
var databricksConfig = new DatabricksConfig()
.setHost(Objects.requireNonNullElse(properties.account().host(), "https://accounts.cloud.databricks.com"))
.setAccountId(Objects.requireNonNull(properties.account().accountId(), "Databricks account ID is required"))
.setClientId(properties.account().clientId())
.setClientSecret(properties.account().clientSecret());
return new AccountClient(databricksConfig);
}

@Bean
public DataMeshManagerClient dataMeshManagerClient(
@@ -43,10 +55,13 @@ public DataMeshManagerClient dataMeshManagerClient(

@Bean(destroyMethod = "stop")
@ConditionalOnProperty(value = "datameshmanager.client.databricks.accessmanagement.enabled", havingValue = "true")
public DataMeshManagerEventListener dataMeshManagerEventListener(DataMeshManagerClient client, DatabricksProperties databricksProperties,
WorkspaceClient workspaceClient, TaskExecutor taskExecutor) {
public DataMeshManagerEventListener dataMeshManagerEventListener(
DataMeshManagerClient client, DatabricksProperties databricksProperties,
WorkspaceClient workspaceClient,
AccountClient accountClient,
TaskExecutor taskExecutor) {
var agentid = databricksProperties.accessmanagement().agentid();
var eventHandler = new DatabricksAccessManagementHandler(client, databricksProperties, workspaceClient);
var eventHandler = new DatabricksAccessManagementHandler(client, workspaceClient, accountClient);
var stateRepository = new DataMeshManagerStateRepositoryRemote(agentid, client);
var dataMeshManagerEventListener = new DataMeshManagerEventListener(agentid, client, eventHandler, stateRepository);
taskExecutor.execute(dataMeshManagerEventListener::start);
@@ -55,25 +70,26 @@ public DataMeshManagerEventListener dataMeshManagerEventListener(DataMeshManager

@Bean(destroyMethod = "stop")
@ConditionalOnProperty(value = "datameshmanager.client.databricks.assets.enabled", havingValue = "true")
public DataMeshManagerAssetsSynchronizer dataMeshManagerAssetsSynchronizer(DatabricksProperties databricksProperties,
DataMeshManagerClient client, WorkspaceClient workspaceClient, TaskExecutor taskExecutor) {
public DataMeshManagerAssetsSynchronizer dataMeshManagerAssetsSynchronizer(
DatabricksProperties databricksProperties,
DataMeshManagerClient client,
WorkspaceClient workspaceClient,
TaskExecutor taskExecutor) {
var agentid = databricksProperties.assets().agentid();
var stateRepository = new DataMeshManagerStateRepositoryRemote(agentid, client);
var assetsSupplier = new DatabricksAssetsSupplier(workspaceClient, stateRepository, databricksProperties);
var dataMeshManagerAssetsSynchronizer = new DataMeshManagerAssetsSynchronizer(agentid, client, assetsSupplier);
if (databricksProperties.assets().pollinterval() != null) {
dataMeshManagerAssetsSynchronizer.setDelay(databricksProperties.assets().pollinterval());
}

taskExecutor.execute(dataMeshManagerAssetsSynchronizer::start);
return dataMeshManagerAssetsSynchronizer;
}

@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("datameshmanager-agent-");
executor.initialize();
return executor;
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}

}
