Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db model #49

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ httpx = "*"
kaggle = ">=1.5.16"
prefect-slack = "*"
tabulate = "==0.9.0"
sqlmodel = ">=0.0.14"
prefect-sqlalchemy = "*"
mysql-connector-python = "*"
pydantic-settings = "*"

[dev-packages]
pipenv = "*"
Expand Down
1,081 changes: 575 additions & 506 deletions Pipfile.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions infrastructure/borderlands-aurora/CREATE_IAM_USER.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ GRANT
-- permissions for more complex routines
CREATE TEMPORARY TABLES,
EXECUTE,
TRIGGER
ON Borderlands.* TO 'BorderlandsExecutor';
TRIGGER,
LOAD FROM S3
ON borderlands.* TO 'BorderlandsExecutor';

-- Grants the RDS user the necessary permissions to read and write to the Borderlands database
GRANT 'BorderlandsExecutor' TO 'Prefect';
57 changes: 56 additions & 1 deletion infrastructure/terraform/aurora.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,65 @@ resource "aws_iam_role" "rds" {
}
}

// Policy document that allows reading (s3:GetObject) any object stored in the
// core bucket. It is rendered to JSON and consumed by aws_iam_policy.rds_s3_read.
// NOTE(review): only object-level GetObject is granted here; confirm against the
// Aurora "LOAD DATA FROM S3" documentation whether s3:GetObjectVersion and the
// bucket-level s3:ListBucket are also needed for the intended load statements.
data "aws_iam_policy_document" "s3_read" {
statement {
effect = "Allow"
actions = [
"s3:GetObject"
]
resources = [
// Object-level ARN pattern: every key inside the core bucket.
"${aws_s3_bucket.core_bucket.arn}/*"
]
}
}

// Managed IAM policy built from the s3_read policy document so that it can be
// attached to a role.
resource "aws_iam_policy" "rds_s3_read" {
name = "rds-s3-read"
description = "Allows RDS to read from S3"
policy = data.aws_iam_policy_document.s3_read.json
}

// Attaches the rds-s3-read policy to the RDS IAM role (aws_iam_role.rds).
resource "aws_iam_role_policy_attachment" "rds_s3_read" {
policy_arn = aws_iam_policy.rds_s3_read.arn
role = aws_iam_role.rds.name
}

// ------------------------------
// RDS Cluster

// Custom cluster parameter group for the Borderlands dev Aurora MySQL 8.0
// cluster. It sets two parameters: the default IAM role used for S3 access and
// automatic role activation on login.
resource "aws_rds_cluster_parameter_group" "dev" {
name = "rds-cluster-pg-borderlands-dev"
family = "aurora-mysql8.0"
description = "Custom parameter group for the Borderlands dev RDS cluster"

parameter {
// Points the cluster's default S3 role at the RDS IAM role defined in this file.
// NOTE(review): the role presumably also has to be associated with the cluster
// itself (not visible in this hunk) — confirm.
name = "aws_default_s3_role"
value = aws_iam_role.rds.arn
}

parameter {
// Allows Prefect DB user to automatically assume the BorderlandsExecutor role
name = "activate_all_roles_on_login"
value = "TRUE"
}

tags = {
project = "borderlands"
}
}

resource "aws_rds_cluster" "borderlands_dev" {
// Cluster config
cluster_identifier = "borderlands-dev"
engine = "aurora-mysql"
engine_mode = "provisioned"
engine_version = "8.0.mysql_aurora.3.04.1"
db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.dev.name

serverlessv2_scaling_configuration {
max_capacity = 1.0
min_capacity = 0.5
}

// DB instance
database_name = "borderlands"
Expand Down Expand Up @@ -66,7 +117,7 @@ resource "aws_rds_cluster" "borderlands_dev" {
resource "aws_rds_cluster_instance" "instance_1" {
cluster_identifier = aws_rds_cluster.borderlands_dev.cluster_identifier
identifier = "borderlands-dev-instance-1"
instance_class = "db.t3.medium"
instance_class = "db.serverless"
engine = "aurora-mysql"
engine_version = "8.0.mysql_aurora.3.04.1"

Expand Down Expand Up @@ -103,6 +154,10 @@ resource "aws_iam_user_policy" "prefect_user_dev_rds_access" {
policy = data.aws_iam_policy_document.dev_rds_access.json
}


// ------------------------------
// RDS Cluster Outputs

output "rds_cluster_endpoint" {
description = "The endpoint of the RDS cluster."
value = aws_rds_cluster.borderlands_dev.endpoint
Expand Down
150 changes: 85 additions & 65 deletions src/borderlands/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,103 +3,123 @@
"""

import asyncio
from typing import Awaitable

from prefect.blocks.core import Block
from prefect.utilities.asyncutils import sync_compatible
from prefect_aws import S3Bucket
from prefect_slack import SlackWebhook
from prefecto.filesystems import create_child
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from .utilities.blocks import RdsCredentials

class Blocks:

class Blocks(BaseSettings):
"""Class for lazy loading Prefect Blocks."""

_core_bucket: S3Bucket | None = None
_persistence_bucket: S3Bucket | None = None
_oryx_bucket: S3Bucket | None = None
_assets_bucket: S3Bucket | None = None
_media_bucket: S3Bucket | None = None
_webhook: SlackWebhook | None = None
model_config = SettingsConfigDict(
env_prefix="blocks_",
case_sensitive=False,
)

core_bucket_name: str = Field(
"s3-bucket-borderlands-core",
description="The name of the S3 bucket for result data.",
)

persistence_bucket_name: str = Field(
"s3-bucket-borderlands-persistence",
description="The name of the S3 bucket for Prefect persistence data.",
)

webhook_name: str = Field(
"slack-webhook-borderlands",
description="The name of the Slack webhook for notifications.",
)

rds_credentials_name: str = Field(
"rds-credentials-borderlands",
description="The name of the RDS credentials for the program.",
)

def __init__(self, **kwargs):
    """Initialise the settings, then start with every block cache empty.

    Args:
        **kwargs: Forwarded unchanged to the parent initialiser.
    """
    super().__init__(**kwargs)

    # Lazily populated caches: each stays ``None`` until the matching
    # property (or ``load``) fills it in.
    self._core_bucket: S3Bucket | None = None
    self._persistence_bucket: S3Bucket | None = None
    self._webhook: SlackWebhook | None = None
    self._rds_credentials: RdsCredentials | None = None

def copy(self) -> "Blocks":
    """Build a new ``Blocks`` that shares this instance's names and caches.

    A fresh ``Blocks()`` is created first, then each cached block and its
    configured block name is carried over. The cached block objects are
    shared by reference, not duplicated.

    Returns:
        The newly created ``Blocks`` instance.
    """
    duplicate = Blocks()

    # Same attributes, in the same order, as a field-by-field assignment.
    carried_over = (
        "_core_bucket",
        "core_bucket_name",
        "_persistence_bucket",
        "persistence_bucket_name",
        "_webhook",
        "webhook_name",
        "_rds_credentials",
        "rds_credentials_name",
    )
    for attr in carried_over:
        setattr(duplicate, attr, getattr(self, attr))

    return duplicate

def reset(self):
    """Drop every cached block so the next access loads it again."""
    for cache_attr in (
        "_core_bucket",
        "_persistence_bucket",
        "_webhook",
        "_rds_credentials",
    ):
        setattr(self, cache_attr, None)

@property
def core_bucket(self) -> S3Bucket:
"""Returns the bucket for the program. Loads if it isn't already."""
if not self._core_bucket:
self._core_bucket = S3Bucket.load("s3-bucket-borderlands-core")
self._core_bucket = S3Bucket.load(self.core_bucket_name)
return self._core_bucket

@property
def persistence_bucket(self) -> S3Bucket:
"""Returns the bucket for the program. Loads if it isn't already."""
if not self._persistence_bucket:
self._persistence_bucket = S3Bucket.load(
"s3-bucket-borderlands-persistence"
)
self._persistence_bucket = S3Bucket.load(self.persistence_bucket_name)
return self._persistence_bucket

@property
def webhook(self) -> SlackWebhook:
"""Returns the webhook for the program. Loads if it isn't already."""
if not self._webhook:
self._webhook = SlackWebhook.load("slack-webhook-borderlands")
self._webhook = SlackWebhook.load(self.webhook_name)
return self._webhook

@property
def oryx_bucket(self) -> S3Bucket:
"""Returns the bucket for the program. Loads if it isn't already."""
if not self._oryx_bucket:
self._oryx_bucket = create_child(self.core_bucket, "oryx", "-oryx")
return self._oryx_bucket

@property
def assets_bucket(self) -> S3Bucket:
"""Returns the bucket for the program. Loads if it isn't already."""
if not self._assets_bucket:
self._assets_bucket = create_child(self.core_bucket, "assets", "-assets")
return self._assets_bucket

@property
def media_bucket(self) -> S3Bucket:
"""Returns the bucket for the program. Loads if it isn't already."""
if not self._media_bucket:
self._media_bucket = create_child(self.core_bucket, "media", "-media")
return self._media_bucket
def rds_credentials(self) -> RdsCredentials:
"""Returns the RDS credentials for the program. Loads if it isn't already."""
if not self._rds_credentials:
self._rds_credentials = RdsCredentials.load(self.rds_credentials_name)
return self._rds_credentials

@sync_compatible
async def load(self):
    """Eagerly load every block and cache the resolved instances.

    Reading one of the block properties from inside this coroutine may hand
    back a coroutine rather than a loaded block, so each value is passed
    through ``_return_or_await_and_return`` to resolve it. That helper is
    itself an ``async def``; its result must be awaited here, otherwise the
    caches would end up holding un-awaited coroutine objects instead of
    blocks.
    """
    self._core_bucket = await _return_or_await_and_return(self.core_bucket)

    self._persistence_bucket = await _return_or_await_and_return(
        self.persistence_bucket
    )

    self._webhook = await _return_or_await_and_return(self.webhook)

    self._rds_credentials = await _return_or_await_and_return(self.rds_credentials)


@sync_compatible
async def _return_or_await_and_return(block: Block | Awaitable[Block]) -> Block:
    """Resolve *block* to a loaded block.

    Args:
        block: Either an already loaded block or an awaitable that produces
            one.

    Returns:
        ``block`` itself when it is not awaitable, otherwise the result of
        awaiting it.
    """
    # Local import so this block does not depend on a file-level import change.
    import inspect

    # The signature accepts any awaitable, so test for awaitability in
    # general: a coroutine-only check would pass tasks and futures through
    # without awaiting them.
    if inspect.isawaitable(block):
        return await block
    return block


blocks = Blocks()
2 changes: 2 additions & 0 deletions src/borderlands/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from borderlands.cli.blocks import blocks
from borderlands.cli.docs import docs
from borderlands.cli.rds import rds

borderlands.add_command(blocks)
borderlands.add_command(docs)
borderlands.add_command(rds)
80 changes: 80 additions & 0 deletions src/borderlands/cli/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,86 @@ def kaggle_credentials(
save(kaggle_username, downstream=[partial(save, kaggle_key)])


def _resolve_env_reference(value: str, option: str) -> str:
    """Return *value*, or the environment variable it names via an ``env:`` prefix.

    Args:
        value: The raw option value, optionally of the form ``env:NAME``.
        option: The option's display name, used only in the error message.

    Returns:
        ``value`` unchanged when it has no ``env:`` prefix, otherwise the
        content of the named environment variable.

    Raises:
        click.UsageError: If the named environment variable is not set.
    """
    prefix = "env:"
    if not value.startswith(prefix):
        return value

    env_var = value[len(prefix):]
    try:
        return os.environ[env_var]
    except KeyError:
        # Report a readable CLI error instead of a bare KeyError traceback.
        raise click.UsageError(
            f"{option} refers to environment variable {env_var!r}, which is not set."
        ) from None


@blocks.command()
@click.option(
    "-n",
    "--database-name",
    type=str,
    default="borderlands",
    help="The name of the database.",
)
@click.option(
    "-u",
    "--username",
    type=str,
    default="Prefect",
    help="The username for the database.",
)
@click.option(
    "-h",
    "--host",
    type=str,
    default="env:DATABASE_HOST",
    help="The host for the database. Prefix with 'env:' to use an environment variable.",
)
@click.option(
    "-p",
    "--port",
    type=int,
    default=3306,
    help="The port for the database.",
)
@click.option(
    "-c",
    "--credentials",
    type=str,
    default="aws-credentials-prefect",
    help="The name of the AWS credentials block. Prefix with 'env:' to use an environment variable.",
)
@click.option(
    "-b",
    "--block-name",
    type=str,
    default="rds-credentials-dev",
    help="The name of the block.",
)
def rds_credentials(
    database_name: str,
    username: str,
    host: str,
    port: int,
    credentials: str,
    block_name: str,
):
    """Create the RDS credentials block. The database and IAM user ought to be configured for AWS IAM authentication."""
    # The docstring above is the command's help text, so it is kept verbatim.

    # Both options accept an "env:NAME" indirection; resolve them the same way.
    host = _resolve_env_reference(host, "--host")
    credentials = _resolve_env_reference(credentials, "--credentials")

    # Imported lazily, as in the rest of the command, so the CLI module can be
    # imported without loading these packages up front.
    from prefect_aws import AwsCredentials
    from prefect_sqlalchemy import SyncDriver

    from borderlands.utilities.blocks import RdsCredentials

    iam_credentials = AwsCredentials.load(credentials)

    # NOTE(review): the default block name here ("rds-credentials-dev") differs
    # from the default name the Blocks settings look up
    # ("rds-credentials-borderlands") — confirm this is intentional.
    db_credentials = RdsCredentials(
        _block_document_name=block_name,
        driver=SyncDriver.MYSQL_MYSQLCONNECTOR,
        database=database_name,
        username=username,
        host=host,
        port=port,
        iam_credentials=iam_credentials,
    )
    save(db_credentials)


@blocks.command()
@click.option(
"-u",
Expand Down
Loading
Loading