diff --git a/docs/stack_outputs.json b/docs/stack_outputs.json index 7401bcb..ccde5f9 100644 --- a/docs/stack_outputs.json +++ b/docs/stack_outputs.json @@ -39,8 +39,8 @@ "ato": { "id": "1234-ATO", "authorized": "2025-03-27T00:00:00Z", - "review": "2028-03-27T00:00:00Z", - "renew": "2026-03-27T00:00:00Z" + "eol": "2028-03-27T00:00:00Z", + "last_touch": "2026-03-27T00:00:00Z" } }, "nist": { diff --git a/modules/aws/config.py b/modules/aws/config.py index 6cef125..922cd0a 100644 --- a/modules/aws/config.py +++ b/modules/aws/config.py @@ -35,9 +35,22 @@ "aws:ec2/instance:Instance", "aws:iam/role:Role", "aws:rds/instance:Instance", - # Add other AWS resource types that support tagging ] +DEFAULT_MODULE_CONFIG = { + "enabled": True, + "version": "latest", + "config": {"region": "us-east-1"}, + "compliance": { + "fisma": { + "enabled": False, + "level": "low", + "mode": "strict", + "ato": {"id": None, "authorized": None, "eol": None}, + } + }, +} + def validate_config(raw_config: dict) -> AWSModuleConfig: try: @@ -58,9 +71,7 @@ def initialize_aws_provider(config: AWSConfig) -> Provider: """ aws_config = pulumi.Config("aws") aws_access_key = os.getenv("AWS_ACCESS_KEY_ID") or aws_config.get("access_key_id") - aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") or aws_config.get( - "secret_access_key" - ) + aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") or aws_config.get("secret_access_key") profile = os.getenv("AWS_PROFILE") or config.profile return Provider( @@ -103,9 +114,7 @@ def global_transform( pulumi.runtime.register_stack_transformation(global_transform) -def generate_tags( - config: AWSConfig, compliance_config: ComplianceConfig, git_info: Dict[str, str] -) -> Dict[str, str]: +def generate_tags(config: AWSConfig, compliance_config: ComplianceConfig, git_info: Dict[str, str]) -> Dict[str, str]: """ Generates tags for AWS resources, including compliance and Git metadata. @@ -181,16 +190,12 @@ def load_tenant_account_configs() -> Dict[str, TenantAccountConfig]: tenant_config = TenantAccountConfig(**tenant) tenant_accounts[tenant_config.name] = tenant_config except Exception as e: - log.warn( - f"Invalid tenant account configuration for '{tenant.get('name', 'unknown')}': {e}" - ) + log.warn(f"Invalid tenant account configuration for '{tenant.get('name', 'unknown')}': {e}") return tenant_accounts -def merge_configurations( - base_config: Dict[str, Any], override_config: Dict[str, Any] -) -> Dict[str, Any]: +def merge_configurations(base_config: Dict[str, Any], override_config: Dict[str, Any]) -> Dict[str, Any]: """ Merges two configuration dictionaries with override taking precedence. 
@@ -306,9 +311,7 @@ def generate_compliance_labels(compliance_config: ComplianceConfig) -> Dict[str,
     if compliance_config.nist.enabled:
         labels["compliance.nist.enabled"] = "true"
         if compliance_config.nist.controls:
-            labels["compliance.nist.controls"] = ",".join(
-                compliance_config.nist.controls
-            )
+            labels["compliance.nist.controls"] = ",".join(compliance_config.nist.controls)
 
     return labels
 
diff --git a/modules/aws/deployment.py b/modules/aws/deployment.py
index 403a255..11e1238 100644
--- a/modules/aws/deployment.py
+++ b/modules/aws/deployment.py
@@ -10,6 +10,8 @@
 from .provider import AWSProvider
 from .types import AWSConfig
 from modules.core.stack_outputs import collect_global_metadata, collect_module_metadata
+from modules.core.compliance_types import ComplianceConfig
+from .eks import EksManager
 
 
 class AwsModule(ModuleInterface):
@@ -64,7 +66,24 @@ def deploy(self, config: Dict[str, Any]) -> ModuleDeploymentResult:
             log.info(f"Successfully authenticated as: {caller_identity.arn}")
             log.info(f"AWS Account ID: {caller_identity.account_id}")
 
+            # Deploy EKS if enabled
+            if aws_config.eks and aws_config.eks.enabled:
+                log.info(f"Deploying EKS cluster: {aws_config.eks.name}")
+                eks_manager = EksManager(provider)
+                eks_resources = eks_manager.deploy_cluster(
+                    name=aws_config.eks.name,
+                    version=aws_config.eks.version,
+                    instance_types=aws_config.eks.node_groups[0].instance_types if aws_config.eks.node_groups else None,
+                    scaling_config=aws_config.eks.node_groups[0].scaling_config if aws_config.eks.node_groups else None,
+                )
+
+                # Export EKS outputs
+                pulumi.export("eks_cluster_name", eks_resources["cluster"].name)
+                pulumi.export("eks_cluster_endpoint", eks_resources["cluster"].endpoint)
+                pulumi.export("eks_cluster_vpc_id", eks_resources["vpc"].id)
+
             # Get Git info as dictionary
+            # Required for initializing Git info; do not remove.
             git_info = init_config.git_info.model_dump()
 
             # Collect metadata for resource tagging
@@ -122,16 +141,29 @@ def deploy(self, config: Dict[str, Any]) -> ModuleDeploymentResult:
             provider_urn = str(provider.provider.urn)
             bucket_name = str(s3_bucket.id)
 
+            # Update metadata to include EKS info if deployed
+            if aws_config.eks and aws_config.eks.enabled:
+                aws_metadata["eks_cluster_name"] = aws_config.eks.name
+
+            # Parse compliance config
+            compliance_config = ComplianceConfig.model_validate(config.get("compliance", {}))
+
-            # Return deployment result without version
+            # Return deployment result
             return ModuleDeploymentResult(
                 success=True,
-                version="",  # Empty string since AWS module doesn't use versions
+                version="0.0.1",
                 resources=[provider_urn, bucket_name],
-                metadata=aws_metadata,
+                metadata={
+                    "compliance": compliance_config.model_dump(),
+                    "aws_account_id": caller_identity.account_id,
+                    "aws_user_id": caller_identity.user_id,
+                    "aws_arn": caller_identity.arn,
+                    **aws_metadata,
+                },
             )
 
         except Exception as e:
-            log.error(f"AWS deployment failed: {str(e)}")
             return ModuleDeploymentResult(
-                success=False, version="", errors=[str(e)]  # Empty string since AWS module doesn't use versions
+                success=False, version="", errors=[str(e)]
             )
diff --git a/modules/aws/eks.py b/modules/aws/eks.py
new file mode 100644
index 0000000..213c66c
--- /dev/null
+++ b/modules/aws/eks.py
@@ -0,0 +1,337 @@
+"""AWS EKS Management Module"""
+
+from typing import Dict, List, Optional, Any, TYPE_CHECKING
+import pulumi_aws as aws
+from pulumi import ResourceOptions, log
+
+if TYPE_CHECKING:
+    from .provider import AWSProvider
+
+
+class EksManager:
+    """Manages AWS EKS cluster and related resources."""
+
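+    # Illustrative usage (values are hypothetical; the real wiring lives in
+    # modules/aws/deployment.py and runs when `aws_config.eks.enabled` is true):
+    #
+    #   manager = EksManager(provider)  # provider: AWSProvider
+    #   resources = manager.deploy_cluster(
+    #       name="demo",
+    #       version="1.27",
+    #       instance_types=["t3.medium"],
+    #       scaling_config={"desired_size": 2, "max_size": 4, "min_size": 1},
+    #   )
+    #   pulumi.export("eks_cluster_endpoint", resources["cluster"].endpoint)
+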
+    def __init__(self, provider: "AWSProvider"):
+        """Initialize EKS manager."""
+        self.provider = provider
+
+    def create_vpc(self, name: str) -> Dict[str, Any]:
+        """Create VPC and related networking resources for EKS."""
+        try:
+            # Create VPC
+            vpc = aws.ec2.Vpc(
+                f"eks-vpc-{name}",
+                cidr_block="10.0.0.0/16",
+                enable_dns_hostnames=True,
+                enable_dns_support=True,
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-vpc-{name}",
+                    f"kubernetes.io/cluster/{name}": "shared",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            # Create Internet Gateway
+            igw = aws.ec2.InternetGateway(
+                f"eks-igw-{name}",
+                vpc_id=vpc.id,
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-igw-{name}",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            # Create public and private subnets across 2 AZs
+            public_subnets = []
+            private_subnets = []
+            azs = ["us-east-1a", "us-east-1b"]  # Hardcoded for first iteration
+
+            for i, az in enumerate(azs):
+                # Public subnet
+                public_subnet = aws.ec2.Subnet(
+                    f"eks-public-subnet-{name}-{i}",
+                    vpc_id=vpc.id,
+                    cidr_block=f"10.0.{i*2}.0/24",
+                    availability_zone=az,
+                    map_public_ip_on_launch=True,
+                    tags={
+                        **self.provider.get_tags(),
+                        "Name": f"eks-public-{name}-{i}",
+                        f"kubernetes.io/cluster/{name}": "shared",
+                        "kubernetes.io/role/elb": "1",
+                    },
+                    opts=ResourceOptions(provider=self.provider.provider),
+                )
+                public_subnets.append(public_subnet)
+
+                # Private subnet
+                private_subnet = aws.ec2.Subnet(
+                    f"eks-private-subnet-{name}-{i}",
+                    vpc_id=vpc.id,
+                    cidr_block=f"10.0.{i*2+1}.0/24",
+                    availability_zone=az,
+                    tags={
+                        **self.provider.get_tags(),
+                        "Name": f"eks-private-{name}-{i}",
+                        f"kubernetes.io/cluster/{name}": "shared",
+                        "kubernetes.io/role/internal-elb": "1",
+                    },
+                    opts=ResourceOptions(provider=self.provider.provider),
+                )
+                private_subnets.append(private_subnet)
+
+            # Create route table for public subnets
+            public_rt = aws.ec2.RouteTable(
+                f"eks-public-rt-{name}",
+                vpc_id=vpc.id,
+                routes=[
+                    {
+                        "cidr_block": "0.0.0.0/0",
+                        "gateway_id": igw.id,
+                    }
+                ],
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-public-rt-{name}",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            # Associate public subnets with public route table
+            for i, subnet in enumerate(public_subnets):
+                aws.ec2.RouteTableAssociation(
+                    f"eks-public-rta-{name}-{i}",
+                    subnet_id=subnet.id,
+                    route_table_id=public_rt.id,
+                    opts=ResourceOptions(provider=self.provider.provider),
+                )
+
+            # Create NAT Gateway for private subnets
+            eip = aws.ec2.Eip(
+                f"eks-eip-{name}",
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-eip-{name}",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            nat_gateway = aws.ec2.NatGateway(
+                f"eks-nat-{name}",
+                allocation_id=eip.id,
+                subnet_id=public_subnets[0].id,
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-nat-{name}",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            # Create route table for private subnets
+            private_rt = aws.ec2.RouteTable(
+                f"eks-private-rt-{name}",
+                vpc_id=vpc.id,
+                routes=[
+                    {
+                        "cidr_block": "0.0.0.0/0",
+                        "nat_gateway_id": nat_gateway.id,
+                    }
+                ],
+                tags={
+                    **self.provider.get_tags(),
+                    "Name": f"eks-private-rt-{name}",
+                },
+                opts=ResourceOptions(provider=self.provider.provider),
+            )
+
+            # Associate private subnets with private route table
+            for i, subnet in enumerate(private_subnets):
+                aws.ec2.RouteTableAssociation(
+                    f"eks-private-rta-{name}-{i}",
+                    subnet_id=subnet.id,
+                    route_table_id=private_rt.id,
opts=ResourceOptions(provider=self.provider.provider), + ) + + return { + "vpc": vpc, + "public_subnets": public_subnets, + "private_subnets": private_subnets, + } + + except Exception as e: + log.error(f"Failed to create VPC infrastructure: {str(e)}") + raise + + def create_cluster_role(self, name: str) -> aws.iam.Role: + """Create IAM role for EKS cluster.""" + assume_role_policy = { + "Version": "2012-10-17", + "Statement": [ + {"Effect": "Allow", "Principal": {"Service": "eks.amazonaws.com"}, "Action": "sts:AssumeRole"} + ], + } + + role = aws.iam.Role( + f"eks-cluster-role-{name}", + assume_role_policy=assume_role_policy, + tags=self.provider.get_tags(), + opts=ResourceOptions(provider=self.provider.provider), + ) + + # Attach required policies + aws.iam.RolePolicyAttachment( + f"eks-cluster-policy-{name}", + policy_arn="arn:aws:iam::aws:policy/AmazonEKSClusterPolicy", + role=role.name, + opts=ResourceOptions(provider=self.provider.provider), + ) + + return role + + def create_node_role(self, name: str) -> aws.iam.Role: + """Create IAM role for EKS node group.""" + assume_role_policy = { + "Version": "2012-10-17", + "Statement": [ + {"Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole"} + ], + } + + role = aws.iam.Role( + f"eks-node-role-{name}", + assume_role_policy=assume_role_policy, + tags=self.provider.get_tags(), + opts=ResourceOptions(provider=self.provider.provider), + ) + + # Attach required policies + required_policies = ["AmazonEKSWorkerNodePolicy", "AmazonEKS_CNI_Policy", "AmazonEC2ContainerRegistryReadOnly"] + + for policy in required_policies: + aws.iam.RolePolicyAttachment( + f"eks-node-policy-{policy}-{name}", + policy_arn=f"arn:aws:iam::aws:policy/{policy}", + role=role.name, + opts=ResourceOptions(provider=self.provider.provider), + ) + + return role + + def create_cluster( + self, + name: str, + subnet_ids: List[str], + cluster_role: aws.iam.Role, + version: Optional[str] = "1.27", + tags: Optional[Dict[str, str]] = None, + ) -> aws.eks.Cluster: + """Create EKS cluster.""" + if tags is None: + tags = {} + + # Merge with provider tags + merged_tags = {**self.provider.get_tags(), **tags} + + cluster = aws.eks.Cluster( + f"eks-{name}", + name=name, + role_arn=cluster_role.arn, + version=version, + vpc_config={ + "subnetIds": subnet_ids, + "endpointPrivateAccess": True, + "endpointPublicAccess": True, + }, + tags=merged_tags, + opts=ResourceOptions(provider=self.provider.provider, protect=True), + ) + + return cluster + + def create_node_group( + self, + name: str, + cluster: aws.eks.Cluster, + node_role: aws.iam.Role, + subnet_ids: List[str], + instance_types: Optional[List[str]] = None, + scaling_config: Optional[Dict[str, int]] = None, + tags: Optional[Dict[str, str]] = None, + ) -> aws.eks.NodeGroup: + """Create EKS node group.""" + if instance_types is None: + instance_types = ["t3.medium"] + + if scaling_config is None: + scaling_config = {"desired_size": 2, "max_size": 4, "min_size": 1} + + if tags is None: + tags = {} + + # Merge with provider tags + merged_tags = {**self.provider.get_tags(), **tags} + + node_group = aws.eks.NodeGroup( + f"eks-nodegroup-{name}", + cluster_name=cluster.name, + node_role_arn=node_role.arn, + subnet_ids=subnet_ids, + instance_types=instance_types, + scaling_config=scaling_config, + tags=merged_tags, + opts=ResourceOptions(provider=self.provider.provider, depends_on=[cluster]), + ) + + return node_group + + def deploy_cluster( + self, + name: str, + version: Optional[str] = None, + 
instance_types: Optional[List[str]] = None, + scaling_config: Optional[Dict[str, int]] = None, + ) -> Dict[str, Any]: + """ + Deploy a complete EKS cluster with VPC and node group. + Returns cluster information and resources. + """ + try: + # Create VPC infrastructure + vpc_resources = self.create_vpc(name) + + # Use private subnets for the EKS cluster + subnet_ids = [subnet.id for subnet in vpc_resources["private_subnets"]] + + # Create IAM roles + cluster_role = self.create_cluster_role(name) + node_role = self.create_node_role(name) + + # Create EKS cluster + cluster = self.create_cluster(name=name, subnet_ids=subnet_ids, cluster_role=cluster_role, version=version) + + # Create node group + node_group = self.create_node_group( + name=name, + cluster=cluster, + node_role=node_role, + subnet_ids=subnet_ids, + instance_types=instance_types, + scaling_config=scaling_config, + ) + + return { + "vpc": vpc_resources["vpc"], + "public_subnets": vpc_resources["public_subnets"], + "private_subnets": vpc_resources["private_subnets"], + "cluster": cluster, + "node_group": node_group, + "cluster_role": cluster_role, + "node_role": node_role, + } + + except Exception as e: + log.error(f"Failed to deploy EKS cluster: {str(e)}") + raise diff --git a/modules/aws/provider.py b/modules/aws/provider.py index be8282d..ab8ea47 100644 --- a/modules/aws/provider.py +++ b/modules/aws/provider.py @@ -6,6 +6,7 @@ import os from .types import AWSConfig +from modules.core.metadata import MetadataSingleton class AWSProvider: @@ -162,6 +163,44 @@ def get_tags(self) -> Dict[str, str]: # Sanitize tags before returning return self.sanitize_tags(self._tags) + def update_metadata(self) -> None: + """Update global metadata singleton with AWS-specific metadata.""" + try: + caller_identity = self.get_caller_identity() + + aws_metadata = { + "account_id": caller_identity.account_id, + "user_id": caller_identity.user_id, + "arn": caller_identity.arn, + "region": self.region, + } + + # Update the global metadata singleton with AWS metadata + MetadataSingleton().set_module_metadata("aws", aws_metadata) + + log.info("Successfully updated AWS metadata") + + except Exception as e: + log.error(f"Failed to update AWS metadata: {str(e)}") + raise + + def export_aws_metadata(self) -> None: + """Export AWS metadata to stack outputs.""" + try: + # Get metadata from singleton + aws_metadata = MetadataSingleton().get_module_metadata("aws") + + # Create AWS-specific stack outputs + stack_outputs = {"config": {"aws": {"sts_caller_identity": aws_metadata}}} + + # Export AWS-specific outputs + pulumi.export("aws_metadata", stack_outputs) + log.info("Successfully exported AWS metadata") + + except Exception as e: + log.error(f"Failed to export AWS metadata: {str(e)}") + raise + def collect_module_metadata(global_metadata: Dict[str, Any], provider: AWSProvider) -> Dict[str, Any]: """Collect AWS-specific metadata.""" @@ -177,8 +216,6 @@ def collect_module_metadata(global_metadata: Dict[str, Any], provider: AWSProvid } # Store in global metadata singleton - from modules.core.metadata import MetadataSingleton - MetadataSingleton().set_aws_metadata(aws_metadata["aws"]) return aws_metadata diff --git a/modules/aws/types.py b/modules/aws/types.py index 66f0ad7..61d63b3 100644 --- a/modules/aws/types.py +++ b/modules/aws/types.py @@ -80,6 +80,7 @@ class AWSConfig(BaseModel): security: SecurityConfig = Field(default_factory=SecurityConfig) compliance: ComplianceConfig = Field(default_factory=ComplianceConfig) tags: Dict[str, str] = 
Field(default_factory=dict) + eks: Optional[EksClusterConfig] = Field(default=None) @validator("region") def validate_region(cls, v): @@ -134,3 +135,48 @@ class TenantAccountConfig(BaseModel): email: str = Field(..., description="Email address for the account root user") organizational_unit: Optional[str] = Field(None, description="OU to place the account in") tags: Dict[str, str] = Field(default_factory=dict) + + +class AWSProviderMetadata(BaseModel): + """AWS provider metadata.""" + + account_id: str + user_id: str + arn: str + region: str + + +class EksNodeGroupConfig(BaseModel): + """Configuration for EKS node groups.""" + + name: str = Field(..., description="Name of the node group") + instance_types: List[str] = Field(default_factory=lambda: ["t3.medium"]) + scaling_config: Dict[str, int] = Field( + default_factory=lambda: { + "desired_size": 2, + "max_size": 4, + "min_size": 1, + } + ) + subnet_ids: Optional[List[str]] = Field(default=None) + tags: Dict[str, str] = Field(default_factory=dict) + + +class EksClusterConfig(BaseModel): + """Configuration for EKS cluster.""" + + enabled: bool = Field(default=True) + name: str = Field(..., description="Name of the EKS cluster") + version: str = Field(default="1.27", description="Kubernetes version") + subnet_ids: Optional[List[str]] = Field(default=None) + endpoint_private_access: bool = Field(default=True) + endpoint_public_access: bool = Field(default=True) + node_groups: List[EksNodeGroupConfig] = Field(default_factory=list) + tags: Dict[str, str] = Field(default_factory=dict) + + @validator("version") + def validate_version(cls, v): + valid_versions = ["1.27", "1.26", "1.25"] # Add supported versions + if v not in valid_versions: + raise ValueError(f"Invalid EKS version: {v}. Must be one of {valid_versions}") + return v diff --git a/modules/core/__init__.py b/modules/core/__init__.py index 3f15e2b..060979f 100644 --- a/modules/core/__init__.py +++ b/modules/core/__init__.py @@ -95,8 +95,7 @@ # Metadata management from .metadata import ( setup_global_metadata, - set_global_labels, - set_global_annotations, + MetadataSingleton, ) # Git utilities @@ -171,8 +170,7 @@ "initialize_pulumi", # Metadata "setup_global_metadata", - "set_global_labels", - "set_global_annotations", + "MetadataSingleton", # Git utilities "get_latest_semver_tag", "get_remote_url", diff --git a/modules/core/compliance_types.py b/modules/core/compliance_types.py index c273d75..ed6748f 100644 --- a/modules/core/compliance_types.py +++ b/modules/core/compliance_types.py @@ -2,99 +2,168 @@ """ Compliance configuration types """ -from pulumi import log -from dataclasses import dataclass +from pulumi import log, Config +from dataclasses import dataclass, field from typing import List, Dict, Any, Optional -from datetime import datetime -from pydantic import BaseModel, Field +from datetime import datetime, timezone +from pydantic import BaseModel, Field, validator -@dataclass -class FismaAto: - authorized: datetime - renew: datetime - review: datetime +class FismaAto(BaseModel): + """FISMA ATO configuration""" + + id: Optional[str] = Field(None, description="ATO identifier") + authorized: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + eol: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + last_touch: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc)) + + @classmethod + def from_config(cls, ato_config: Dict[str, Any], program_start_time: datetime) -> "FismaAto": + """Create FismaAto from config 
dictionary""" + if not ato_config: + return cls(authorized=program_start_time, eol=program_start_time, last_touch=program_start_time) + + return cls( + id=ato_config.get("id"), + authorized=cls.parse_datetime(ato_config.get("authorized", program_start_time)), + eol=cls.parse_datetime(ato_config.get("eol", program_start_time)), + last_touch=program_start_time, + ) + + @validator("authorized", "eol", pre=True) + def parse_datetime(cls, v): + """Parse datetime strings into datetime objects""" + if isinstance(v, datetime): + return v + if isinstance(v, str): + try: + # First try parsing ISO format + try: + return datetime.fromisoformat(v.replace("Z", "+00:00")) + except ValueError: + pass + + # Then try explicit formats + formats = [ + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%d %H:%M:%S %z", + "%Y-%m-%d %H:%M:%S +0000 UTC", + "%Y-%m-%d", + ] + + for fmt in formats: + try: + dt = datetime.strptime(v, fmt) + return dt.replace(tzinfo=timezone.utc) + except ValueError: + continue + + raise ValueError(f"Could not parse datetime from {v}") + + except Exception as e: + log.error(f"Failed to parse datetime '{v}': {str(e)}") + raise ValueError(f"Invalid datetime format: {v}") + raise ValueError(f"Invalid datetime value: {v}") + + class Config: + json_encoders = {datetime: lambda v: v.isoformat()} @dataclass class Fisma: - ato: FismaAto + ato: FismaAto = field(default_factory=FismaAto) enabled: bool = False level: str = "low" + mode: str = "strict" @dataclass class Nist: - auxiliary: List[str] = Field(default_factory=list) - controls: List[str] = Field(default_factory=list) + auxiliary: List[str] = field(default_factory=list) + controls: List[str] = field(default_factory=list) enabled: bool = False - exceptions: List[str] = Field(default_factory=list) + exceptions: List[str] = field(default_factory=list) @dataclass class ScipOwnership: - contacts: List[str] = Field(default_factory=list) + contacts: List[str] = field(default_factory=list) name: str = "" @dataclass class ScipProvider: name: str = "" - regions: List[str] = Field(default_factory=list) + regions: List[str] = field(default_factory=list) @dataclass class Scip: environment: str = "dev" - ownership: Dict[str, ScipOwnership] = Field( + ownership: Dict[str, ScipOwnership] = field( default_factory=lambda: { "operator": ScipOwnership(), "provider": ScipOwnership(), } ) - provider: ScipProvider = Field(default_factory=ScipProvider) + provider: ScipProvider = field(default_factory=ScipProvider) class ComplianceConfig(BaseModel): """Compliance configuration with defaults""" - fisma: Fisma = Field( - default_factory=lambda: Fisma( - ato=FismaAto( - authorized=datetime.now(), renew=datetime.now(), review=datetime.now() - ) - ) - ) + fisma: Fisma = Field(default_factory=Fisma) nist: Nist = Field(default_factory=Nist) scip: Scip = Field(default_factory=Scip) - @staticmethod - def merge(user_config: Dict[str, Any]) -> "ComplianceConfig": - """Merge user configuration with defaults""" - default_config = ComplianceConfig() - + @classmethod + def from_pulumi_config(cls, config: Config, program_start_time: datetime) -> "ComplianceConfig": + """Create ComplianceConfig from Pulumi stack configuration""" try: - for key, value in user_config.items(): - if hasattr(default_config, key): - setattr(default_config, key, value) - else: - log.warn(f"Unknown compliance configuration key: {key}") - except Exception as e: - log.warn(f"Error merging compliance config: {str(e)}") + # Get compliance config from stack + compliance_config = config.get_object("compliance") or {} + + # Get 
FISMA config + fisma_config = compliance_config.get("fisma", {}) + ato_config = fisma_config.get("ato", {}) + + # Create ATO instance + ato = FismaAto( + id=ato_config.get("id"), + authorized=ato_config.get("authorized", program_start_time), + eol=ato_config.get("eol", program_start_time), + last_touch=program_start_time, + ) - return default_config + # Create and return complete config + return cls( + fisma=Fisma( + ato=ato, + enabled=fisma_config.get("enabled", False), + level=fisma_config.get("level", "low"), + mode=fisma_config.get("mode", "strict"), + ), + nist=Nist(**compliance_config.get("nist", {})), + scip=Scip(**compliance_config.get("scip", {})), + ) + except Exception as e: + log.error(f"Error creating compliance config: {str(e)}") + # Return default config instead of raising + return cls() def to_dict(self) -> Dict[str, Any]: """Convert compliance metadata to dictionary format""" return { "fisma": { "ato": { - "authorized": self.fisma.ato.authorized.isoformat(), - "renew": self.fisma.ato.renew.isoformat(), - "review": self.fisma.ato.review.isoformat(), + "id": self.fisma.ato.id, + "authorized": self.fisma.ato.authorized.isoformat() if self.fisma.ato.authorized else None, + "eol": self.fisma.ato.eol.isoformat() if self.fisma.ato.eol else None, + "last_touch": self.fisma.ato.last_touch.isoformat() if self.fisma.ato.last_touch else None, }, "enabled": self.fisma.enabled, "level": self.fisma.level, + "mode": self.fisma.mode, }, "nist": { "auxiliary": self.nist.auxiliary, @@ -105,8 +174,14 @@ def to_dict(self) -> Dict[str, Any]: "scip": { "environment": self.scip.environment, "ownership": { - "operator": vars(self.scip.ownership["operator"]), - "provider": vars(self.scip.ownership["provider"]), + "operator": { + "contacts": self.scip.ownership["operator"].contacts, + "name": self.scip.ownership["operator"].name, + }, + "provider": { + "contacts": self.scip.ownership["provider"].contacts, + "name": self.scip.ownership["provider"].name, + }, }, "provider": { "name": self.scip.provider.name, @@ -114,3 +189,6 @@ def to_dict(self) -> Dict[str, Any]: }, }, } + + class Config: + arbitrary_types_allowed = True diff --git a/modules/core/config.py b/modules/core/config.py index 0a53a10..5febd9c 100644 --- a/modules/core/config.py +++ b/modules/core/config.py @@ -35,16 +35,13 @@ # Configuration Constants -DEFAULT_VERSIONS_URL_TEMPLATE = ( - "https://raw.githubusercontent.com/ContainerCraft/Kargo/newfactor/modules/" -) +DEFAULT_VERSIONS_URL_TEMPLATE = "https://raw.githubusercontent.com/ContainerCraft/Kargo/newfactor/modules/" CACHE_DIR = Path("/tmp/konductor") VERSION_CACHE_FILE = CACHE_DIR / "default_versions.json" # Default module configuration DEFAULT_MODULE_CONFIG: Dict[str, ModuleDefaults] = { - "aws": {"enabled": True, "version": None, "config": {}}, "cert_manager": {"enabled": False, "version": None, "config": {}}, "kubevirt": {"enabled": False, "version": None, "config": {}}, "multus": {"enabled": False, "version": None, "config": {}}, @@ -100,17 +97,13 @@ def get_module_config( ValueError: If module configuration is invalid """ try: - module_defaults = DEFAULT_MODULE_CONFIG.get( - module_name, ModuleDefaults(enabled=False, version=None, config={}) - ) + module_defaults = DEFAULT_MODULE_CONFIG.get(module_name, ModuleDefaults(enabled=False, version=None, config={})) # Get module configuration from Pulumi config module_config: Dict[str, Any] = config.get_object(module_name) or {} # Determine if module is enabled - enabled = coerce_to_bool( - module_config.get("enabled", 
module_defaults["enabled"]) - ) + enabled = coerce_to_bool(module_config.get("enabled", module_defaults["enabled"])) # Get module version version = module_config.get("version", default_versions.get(module_name)) @@ -189,9 +182,7 @@ def load_versions_from_url(url: str) -> Dict[str, Any]: return {} -def load_default_versions( - config: pulumi.Config, force_refresh: bool = False -) -> Dict[str, Any]: +def load_default_versions(config: pulumi.Config, force_refresh: bool = False) -> Dict[str, Any]: """ Loads the default versions for modules based on configuration settings. @@ -236,9 +227,7 @@ def load_default_versions( # Try stack-specific versions if versions_stack_name: - stack_versions_path = ( - Path(__file__).parent.parent / "versions" / f"{stack_name}.json" - ) + stack_versions_path = Path(__file__).parent.parent / "versions" / f"{stack_name}.json" if versions := load_versions_from_file(stack_versions_path): _cache_versions(versions) return versions @@ -287,11 +276,7 @@ def export_results( """ try: # Convert compliance to dictionary if it's a Pydantic model - compliance_dict = ( - compliance.dict() - if isinstance(compliance, ComplianceConfig) - else compliance - ) + compliance_dict = compliance.dict() if isinstance(compliance, ComplianceConfig) else compliance # Export results pulumi.export("versions", versions) @@ -364,9 +349,7 @@ def validate_module_config( raise -def merge_configurations( - base_config: Dict[str, Any], override_config: Dict[str, Any] -) -> Dict[str, Any]: +def merge_configurations(base_config: Dict[str, Any], override_config: Dict[str, Any]) -> Dict[str, Any]: """ Merges two configurations with override taking precedence. @@ -408,9 +391,7 @@ def initialize_config(stack_config: Dict[str, Any]) -> InitializationConfig: required_fields = {"project_name", "stack_name"} missing_fields = required_fields - set(stack_config.keys()) if missing_fields: - raise ValueError( - f"Missing required stack configuration fields: {missing_fields}" - ) + raise ValueError(f"Missing required stack configuration fields: {missing_fields}") # Initialize with validated config config = InitializationConfig(**stack_config) @@ -498,8 +479,7 @@ def get_stack_outputs(init_config: InitializationConfig) -> StackOutputs: k8s_versions = { name: config.get("version", "unknown") for name, config in init_config.configurations.items() - if name - in ["cert_manager", "kubevirt", "multus", "pulumi_operator", "crossplane"] + if name in ["cert_manager", "kubevirt", "multus", "pulumi_operator", "crossplane"] } # Construct the StackOutputs TypedDict @@ -540,27 +520,50 @@ def get_enabled_modules(self) -> List[str]: """Get list of enabled modules from config.""" enabled_modules = [] - # Get AWS configuration and check if enabled - aws_config = self.pulumi_config.get_object("aws") - if aws_config and aws_config.get("enabled"): - log.info("AWS module is enabled in configuration") - enabled_modules.append("aws") - else: - log.debug("AWS module is not enabled in configuration") + # Look for modules directory + modules_dir = Path(__file__).parent.parent + + # Scan for module directories + for module_dir in modules_dir.iterdir(): + if not module_dir.is_dir() or module_dir.name == "core": + continue + + module_name = module_dir.name + try: + # Import module's config + module_config = self.get_module_config(module_name) + + # Check if module is enabled + is_enabled = module_config.get("enabled", False) + + if is_enabled: + log.info(f"{module_name} module is enabled in configuration") + enabled_modules.append(module_name) + 
else: + log.debug(f"{module_name} module is not enabled") + + except Exception as e: + log.warn(f"Error checking module {module_name}: {str(e)}") + continue return enabled_modules def get_module_config(self, module_name: str) -> Dict[str, Any]: """Get configuration for a specific module.""" try: - # First try to get module-specific config - module_config = self.pulumi_config.get_object(module_name) - if module_config is None: - # Fallback to legacy format - module_config = self.pulumi_config.get_object( - f"konductor:{module_name}" - ) - return module_config or {} + # Try to get module config from Pulumi stack config + stack_config = self.pulumi_config.get_object(module_name) or {} + + # Try to import module's default config + try: + module_config = __import__(f"modules.{module_name}.config", fromlist=["DEFAULT_MODULE_CONFIG"]) + default_config = getattr(module_config, "DEFAULT_MODULE_CONFIG", {}) + except (ImportError, AttributeError): + default_config = {} + + # Merge configs with stack config taking precedence + return {**default_config, **stack_config} + except Exception as e: log.warn(f"Error loading config for module {module_name}: {str(e)}") return {} diff --git a/modules/core/deployment.py b/modules/core/deployment.py index 9efc73c..40c392b 100644 --- a/modules/core/deployment.py +++ b/modules/core/deployment.py @@ -27,11 +27,14 @@ def deploy_modules(self, modules_to_deploy: List[str]) -> None: try: module_class = self.load_module(module_name) module_config = self.config_manager.get_module_config(module_name) + + # Add compliance config to module config + module_config["compliance"] = self.init_config.compliance_config.model_dump() + module_instance = module_class(init_config=self.init_config) result = module_instance.deploy(module_config) if result.success: - # Store metadata self.modules_metadata[module_name] = result.metadata else: raise ModuleDeploymentError(f"Module {module_name} deployment failed.") diff --git a/modules/core/initialization.py b/modules/core/initialization.py index da6b2db..94c9b80 100644 --- a/modules/core/initialization.py +++ b/modules/core/initialization.py @@ -1,5 +1,4 @@ -# ../konductor/modules/core/initialization.py - +# ./modules/core/initialization.py """ Pulumi Initialization Module @@ -8,11 +7,13 @@ and initializing the Pulumi runtime environment. 
""" -import pulumi from pulumi import Config, get_stack, get_project, log +from datetime import datetime, timezone -from modules.core.types import InitializationConfig, GitInfo +from modules.core.types import InitializationConfig from modules.core.git import collect_git_info +from modules.core.compliance_types import ComplianceConfig + def initialize_pulumi() -> InitializationConfig: """ @@ -31,6 +32,9 @@ def initialize_pulumi() -> InitializationConfig: Exception: If initialization fails """ try: + # Create program start timestamp in UTC + program_start_time = datetime.now(timezone.utc) + # Load Pulumi configuration pulumi_config = Config() @@ -39,11 +43,16 @@ def initialize_pulumi() -> InitializationConfig: project_name = get_project() log.info(f"Initializing Pulumi project: {project_name}, stack: {stack_name}") + # Get compliance config + compliance_config = ComplianceConfig.from_pulumi_config(pulumi_config, program_start_time) + + # Store compliance config in singleton + from modules.core.metadata import MetadataSingleton + + MetadataSingleton().set_module_metadata("compliance", compliance_config.model_dump()) + # Initialize default metadata structure - metadata = { - "labels": {}, - "annotations": {} - } + metadata = {"labels": {}, "annotations": {}, "timestamps": {"last_touch": program_start_time.isoformat()}} # Create the initialization config init_config = InitializationConfig( @@ -52,10 +61,12 @@ def initialize_pulumi() -> InitializationConfig: project_name=project_name, default_versions={}, git_info=collect_git_info(), - metadata=metadata + metadata=metadata, + deployment_date_time=program_start_time.isoformat(), + compliance_config=compliance_config, ) - log.info(f"Pulumi initialization completed successfully") + log.info(f"Pulumi initialization completed successfully at {program_start_time}") return init_config except Exception as e: diff --git a/modules/core/metadata.py b/modules/core/metadata.py index 68e7cb1..1be847c 100644 --- a/modules/core/metadata.py +++ b/modules/core/metadata.py @@ -9,49 +9,27 @@ """ from typing import Dict, Optional, ClassVar, Any, Protocol -from pulumi import log +from pydantic import ValidationError +from datetime import datetime, timezone from threading import Lock -from datetime import datetime - -from .compliance_types import ( - Fisma, - FismaAto, - Nist, - Scip, - ScipProvider, - ComplianceConfig, - ScipOwnership, -) + import pulumi +from pulumi import Config, log + +from .compliance_types import ComplianceConfig class MetadataSingleton: """ Thread-safe singleton class to manage global metadata. - - This class ensures consistent labels and annotations across all resources. - It uses threading.Lock for thread safety and provides atomic operations - for metadata updates. - - Attributes: - _instance: The singleton instance - _lock: Thread lock for synchronization - _global_labels: Dictionary of global labels - _global_annotations: Dictionary of global annotations - _aws_metadata: Dictionary of AWS metadata - _git_metadata: Dictionary of Git metadata + Provides a centralized store for all module metadata, with each module + storing its metadata under its own namespace. """ _instance: Optional["MetadataSingleton"] = None _lock: ClassVar[Lock] = Lock() def __new__(cls) -> "MetadataSingleton": - """ - Ensure only one instance is created. 
- - Returns: - MetadataSingleton: The singleton instance - """ if cls._instance is None: with cls._lock: if cls._instance is None: @@ -65,74 +43,64 @@ def __init__(self) -> None: if not hasattr(self, "_initialized"): self._global_labels: Dict[str, str] = {} self._global_annotations: Dict[str, str] = {} - self._aws_metadata: Dict[str, Any] = {} self._git_metadata: Dict[str, Any] = {} + self._modules_metadata: Dict[str, Dict[str, Any]] = {} self._initialized = True @property def global_labels(self) -> Dict[str, str]: - """ - Get global labels. - - Returns: - Dict[str, str]: Copy of global labels dictionary - """ + """Get global labels.""" with self._lock: return self._global_labels.copy() @property def global_annotations(self) -> Dict[str, str]: - """ - Get global annotations. - - Returns: - Dict[str, str]: Copy of global annotations dictionary - """ + """Get global annotations.""" with self._lock: return self._global_annotations.copy() - @property - def aws_metadata(self) -> Dict[str, Any]: - """Get AWS metadata.""" - with self._lock: - return self._aws_metadata.copy() - @property def git_metadata(self) -> Dict[str, Any]: """Get Git metadata.""" with self._lock: return self._git_metadata.copy() - def set_labels(self, labels: Dict[str, str]) -> None: - """ - Set global labels. + @property + def modules_metadata(self) -> Dict[str, Dict[str, Any]]: + """Get all modules metadata.""" + with self._lock: + return self._modules_metadata.copy() - Args: - labels: Dictionary of labels to set - """ + def set_labels(self, labels: Dict[str, str]) -> None: + """Set global labels.""" with self._lock: self._global_labels.update(labels) def set_annotations(self, annotations: Dict[str, str]) -> None: - """ - Set global annotations. - - Args: - annotations: Dictionary of annotations to set - """ + """Set global annotations.""" with self._lock: self._global_annotations.update(annotations) - def set_aws_metadata(self, metadata: Dict[str, Any]) -> None: - """Set AWS metadata.""" - with self._lock: - self._aws_metadata.update(metadata) - def set_git_metadata(self, metadata: Dict[str, Any]) -> None: """Set Git metadata.""" with self._lock: self._git_metadata.update(metadata) + def set_module_metadata(self, module_name: str, metadata: Dict[str, Any]) -> None: + """ + Set metadata for a specific module. + Each module's metadata is stored under its own namespace. + """ + with self._lock: + if module_name not in self._modules_metadata: + self._modules_metadata[module_name] = {} + self._modules_metadata[module_name].update(metadata) + + def get_module_metadata(self, module_name: str) -> Dict[str, Any]: + """Get metadata for a specific module.""" + with self._lock: + return self._modules_metadata.get(module_name, {}).copy() + class InitConfig(Protocol): project_name: str @@ -142,12 +110,7 @@ class InitConfig(Protocol): def setup_global_metadata(init_config: InitConfig) -> None: - """ - Initialize global metadata for resources. 
- - Args: - init_config: Initialization configuration object - """ + """Initialize global metadata for resources.""" try: metadata = MetadataSingleton() @@ -176,6 +139,7 @@ def setup_global_metadata(init_config: InitConfig) -> None: # Set global metadata metadata.set_labels(all_labels) metadata.set_annotations(init_config.metadata.get("annotations", {})) + metadata.set_git_metadata(git_info) log.info("Global metadata initialized successfully") @@ -184,116 +148,54 @@ def setup_global_metadata(init_config: InitConfig) -> None: raise -def set_global_labels(labels: Dict[str, str]) -> None: - """Sets global labels.""" - MetadataSingleton().set_labels(labels) - - -def set_global_annotations(annotations: Dict[str, str]) -> None: - """Sets global annotations.""" - MetadataSingleton().set_annotations(annotations) +def get_compliance_metadata() -> ComplianceConfig: + """ + Get compliance metadata from the global singleton. + """ + try: + metadata = MetadataSingleton() + if compliance_metadata := metadata.get_module_metadata("compliance"): + try: + return ComplianceConfig.model_validate(compliance_metadata) + except ValidationError as e: + log.error(f"Compliance metadata validation error: {str(e)}") + # Try parsing with more lenient validation + return ComplianceConfig.from_pulumi_config(Config(), datetime.now(timezone.utc)) + return ComplianceConfig() + except Exception as e: + log.error(f"Failed to get compliance metadata: {str(e)}") + return ComplianceConfig() -def get_compliance_metadata() -> ComplianceConfig: - """Get default compliance metadata""" - # TODO: HIGH PRIORITY: Replace with pulumi stack config derived metadata instead of hard coded values. Consider adding support for non-ato values when developing against pre-prod compliance metadata. - return ComplianceConfig( - fisma=Fisma( - ato=FismaAto( - authorized=datetime.strptime("2025-03-27T00:00:00", "%Y-%m-%dT%H:%M:%S"), - renew=datetime.strptime("2026-03-27T00:00:00", "%Y-%m-%dT%H:%M:%S"), - review=datetime.strptime("2028-03-27T00:00:00", "%Y-%m-%dT%H:%M:%S"), - ), - enabled=True, - level="moderate", - ), - nist=Nist( - auxiliary=["ac-6.1", "ac-2.13"], - controls=["ac-1"], - enabled=True, - exceptions=["ca-7.1", "ma-2.2", "si-12"], - ), - scip=Scip( - environment="prod", - ownership={ - "operator": ScipOwnership( - contacts=["seti2@nasa.gov", "alien51@nasa.gov"], - name="science-team-seti2-obs2819", - ), - "provider": ScipOwnership( - contacts=["scip@nasa.gov", "bobert@nasa.gov"], - name="scip-team-xyz", - ), - }, - provider=ScipProvider(name="Kubevirt", regions=["scip-west-1", "scip-east-1", "scip-lunar-2"]), - ), - ) - - -def export_compliance_metadata(): - """Export compliance metadata to Pulumi stack outputs.""" +def export_compliance_metadata() -> None: + """ + Export compliance metadata to Pulumi stack outputs. + Uses metadata from the global singleton. 
+ """ try: log.info("Exporting compliance metadata") - metadata = get_compliance_metadata() metadata_singleton = MetadataSingleton() - # Get Git metadata with fallbacks - git_info = metadata_singleton.git_metadata - git_metadata = { - "branch": git_info.get("branch_name", "unknown"), - "commit": git_info.get("commit_hash", "unknown"), - "remote": git_info.get("remote_url", "unknown"), - } + # Get compliance metadata + compliance_metadata = get_compliance_metadata() - # Get AWS metadata with fallbacks - aws_metadata = metadata_singleton.aws_metadata or { - "aws_user_account_id": "unknown", - "aws_user_id": "unknown", - "aws_user_arn": "unknown", + # Get Git metadata + git_metadata = metadata_singleton.git_metadata + git_info = { + "branch": git_metadata.get("branch_name", "unknown"), + "commit": git_metadata.get("commit_hash", "unknown"), + "remote": git_metadata.get("remote_url", "unknown"), } # Create the compliance export structure - # TODO: HIGH PRIORITY: Replace ato date valueswith pulumi stack config derived - # metadata instead of hard coded values. Consider adding support for - # non-ato values when developing against pre-prod compliance metadata. stack_outputs = { "config": { - "compliance": { - "fisma": { - "ato": { - "authorized": metadata.fisma.ato.authorized.strftime("%Y-%m-%dT%H:%M:%S"), - "renew": metadata.fisma.ato.renew.strftime("%Y-%m-%dT%H:%M:%S"), - "review": metadata.fisma.ato.review.strftime("%Y-%m-%dT%H:%M:%S"), - }, - "enabled": metadata.fisma.enabled, - "level": metadata.fisma.level, - }, - "nist": { - "auxiliary": metadata.nist.auxiliary, - "controls": metadata.nist.controls, - "enabled": metadata.nist.enabled, - "exceptions": metadata.nist.exceptions, - }, - "scip": { - "environment": metadata.scip.environment, - "ownership": { - "operator": vars(metadata.scip.ownership["operator"]), - "provider": vars(metadata.scip.ownership["provider"]), - }, - "provider": { - "name": metadata.scip.provider.name, - "regions": metadata.scip.provider.regions, - }, - }, - }, - "aws": { - "sts_caller_identity": aws_metadata, - }, - "source_repository": git_metadata, + "compliance": compliance_metadata.to_dict(), + "source_repository": git_info, } } - # Export the full stack outputs + # Export the stack outputs pulumi.export("stack_outputs", stack_outputs) log.info("Successfully exported compliance metadata") diff --git a/modules/core/stack_outputs.py b/modules/core/stack_outputs.py index 83d630b..f11f803 100644 --- a/modules/core/stack_outputs.py +++ b/modules/core/stack_outputs.py @@ -32,17 +32,14 @@ def get_stack_outputs( # Add compliance data if compliance_config := init_config.get("compliance_config"): - config["compliance"] = { - "scip": compliance_config.scip.dict(), - "fisma": compliance_config.fisma.dict(), - "nist": compliance_config.nist.dict(), - } + # Convert compliance config to dict using model's method + config["compliance"] = compliance_config.to_dict() # Add module-specific configs for module_name, metadata in modules_metadata.items(): config[module_name] = metadata - # Add kubernetes versions separately + # Add kubernetes versions k8s_versions = { name: {version: {} for version in versions} for name, versions in init_config.get("versions", {}).items() @@ -76,13 +73,6 @@ def collect_global_metadata() -> GlobalMetadata: def collect_module_metadata(global_metadata: Dict[str, Any], modules_metadata: Dict[str, Any] = None) -> Dict[str, Any]: """ Merge global metadata with module-specific metadata. 
- - Args: - global_metadata (Dict[str, Any]): The global metadata collected. - modules_metadata (Dict[str, Any], optional): Metadata from modules. - - Returns: - Dict[str, Any]: Combined module metadata dictionary. """ module_metadata = global_metadata.copy() if modules_metadata: diff --git a/modules/core/types.py b/modules/core/types.py index 54b4cd7..6457f35 100644 --- a/modules/core/types.py +++ b/modules/core/types.py @@ -7,6 +7,7 @@ It provides type-safe configuration structures using Pydantic models and TypedDict. """ +from datetime import datetime, timezone from typing import Dict, List, Optional, TypeVar, Union, TypedDict, Any, Protocol from pydantic import BaseModel, Field, ConfigDict import pulumi @@ -72,10 +73,8 @@ class InitializationConfig(BaseModel): kubernetes_provider: Optional[k8s.Provider] = None git_info: GitInfo = Field(default_factory=GitInfo) compliance_config: ComplianceConfig = Field(default_factory=ComplianceConfig) - metadata: Dict[str, Dict[str, str]] = Field( - default_factory=lambda: {"labels": {}, "annotations": {}} - ) - + metadata: Dict[str, Dict[str, str]] = Field(default_factory=lambda: {"labels": {}, "annotations": {}}) + deployment_date_time: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) model_config = ConfigDict(arbitrary_types_allowed=True)
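A minimal illustrative sketch of how the pieces above fit together: ComplianceConfig validating the renamed ATO fields, and MetadataSingleton's per-module namespaces carrying that data to the stack outputs. It assumes the konductor modules package and its dependencies are importable; the sample values echo docs/stack_outputs.json and everything else is hypothetical.

from modules.core.compliance_types import ComplianceConfig
from modules.core.metadata import MetadataSingleton

# Shape mirrors the "compliance" block handed to AwsModule.deploy() and the
# renamed ATO keys ("eol", "last_touch") in docs/stack_outputs.json.
raw = {
    "fisma": {
        "enabled": True,
        "level": "moderate",
        "mode": "strict",
        "ato": {
            "id": "1234-ATO",
            "authorized": "2025-03-27T00:00:00Z",
            "eol": "2028-03-27T00:00:00Z",
            "last_touch": "2026-03-27T00:00:00Z",
        },
    },
    "nist": {"enabled": True, "controls": ["ac-1"]},
}

compliance = ComplianceConfig.model_validate(raw)
assert compliance.fisma.ato.eol.year == 2028  # "Z"-suffixed timestamps parse as UTC

# Each module publishes metadata under its own namespace in the singleton,
# as initialize_pulumi() now does for "compliance"; consumers such as
# export_compliance_metadata() read it back through the same API.
MetadataSingleton().set_module_metadata("compliance", compliance.model_dump())
stored = MetadataSingleton().get_module_metadata("compliance")

print(stored["fisma"]["ato"]["eol"])         # datetime preserved by model_dump()
print(compliance.to_dict()["fisma"]["ato"])  # ISO strings, the shape exported under stack_outputs.config.compliance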