10 Appendix: Code re-cap
Our final code will now look like this:
10.0.1 my-container-infra.py
import os
import aws_cdk as cdk
from aws_cdk import (
aws_cloudwatch as cw,
aws_ec2 as ec2,
aws_sns as sns,
aws_sns_subscriptions as snssubs,
)
import cdk_monitoring_constructs as cdkmon
import containers
import monitoring
# CDK app entry point: one stack containing the VPC, an ECS cluster, a
# load-balanced Fargate service with autoscaling, and monitoring with an alarm.
app = cdk.App()
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"),
    region=os.getenv("CDK_DEFAULT_REGION"),
)
stack = cdk.Stack(app, "my-container-infra", env=env)

# Look up an existing VPC when the "vpcname" context value is set; otherwise
# create a new VPC inside this stack.
vpcname = app.node.try_get_context("vpcname")
if vpcname:
    vpc = ec2.Vpc.from_lookup(stack, "vpc", vpc_name=vpcname)
else:
    vpc = ec2.Vpc(stack, "vpc", vpc_name="my-vpc", nat_gateways=1, max_azs=2)

config = containers.ClusterConfig(vpc=vpc)
cluster = containers.add_cluster(stack, "my-test-cluster", config)

taskconfig: containers.TaskConfig = {
    "cpu": 512,
    "memory_limit_mib": 1024,
    "family": "webapp",
}
containerconfig: containers.ContainerConfig = {
    "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
    "tcp_ports": [8000],
}
taskdef = containers.add_task_definition_with_container(
    stack, f"taskdef-{taskconfig['family']}", taskconfig, containerconfig
)

# Positional arguments: port 8000, desired count 2, public endpoint enabled.
service = containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 2, True
)

# Scale between 1 and 4 tasks, tracking 50% CPU and 70% memory utilization.
containers.set_service_scaling(
    service=service.service,
    config=containers.ServiceScalingConfig(
        min_count=1,
        max_count=4,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=70),
    ),
)

# Alarms raised through the monitoring facade are sent to this SNS topic.
alarm_topic = sns.Topic(stack, "alarm-topic", display_name="Alarm topic")
monitoring_config = monitoring.MonitoringConfig(
    dashboard_name="monitoring", default_alarm_topic=alarm_topic
)
mon = monitoring.init_monitoring(stack, monitoring_config)

mon["handler"].add_medium_header("Test App monitoring")
# The service is registered exactly once, with its alarm attached. Registering
# it a second time without the alarm would only repeat the same service on the
# dashboard.
mon["handler"].monitor_fargate_service(
    fargate_service=service,
    human_readable_name="My test service",
    add_running_task_count_alarm={
        # Threshold 2 with LESS_THAN_THRESHOLD, evaluated over two 5-minute
        # periods, both of which must breach.
        "alarm1": cdkmon.RunningTaskCountThreshold(
            max_running_tasks=2,
            comparison_operator_override=cw.ComparisonOperator.LESS_THAN_THRESHOLD,
            evaluation_periods=2,
            datapoints_to_alarm=2,
            period=cdk.Duration.minutes(5),
        )
    },
)

alarm_email = "hello@example.com"
alarm_topic.add_subscription(snssubs.EmailSubscription(alarm_email))

app.synth()
# 10.0.2 containers.py
from typing import Literal, TypedDict, List, NotRequired # noqa
import constructs as cons
from aws_cdk import (
aws_ec2 as ec2,
aws_ecs as ecs,
aws_ecs_patterns as ecspat,
aws_logs as logs,
)
class TaskConfig(TypedDict):
    """Sizing and naming settings for a Fargate task definition."""

    # CPU setting; only the listed literal values are accepted by the type.
    cpu: Literal[256, 512, 1024, 2048, 4096]
    # Memory limit in MiB.
    memory_limit_mib: int
    # Task definition family name.
    family: str
class ContainerConfig(TypedDict):
    """Settings for the container placed in a task definition."""

    # Image reference passed to the container registry lookup.
    image: str
    # Container ports to expose over TCP.
    tcp_ports: List[int]
def add_task_definition_with_container(
    scope: cons.Construct,
    id: str,
    task_config: TaskConfig,
    container_config: ContainerConfig,
) -> ecs.FargateTaskDefinition:
    """Create a Fargate task definition and add one container to it.

    Args:
        scope: Construct the task definition is created in.
        id: Construct id of the task definition.
        task_config: CPU, memory limit and family for the task definition.
        container_config: Image reference and TCP ports for the container.

    Returns:
        The task definition, with the container and its port mappings added.
    """
    task_definition = ecs.FargateTaskDefinition(
        scope,
        id,
        family=task_config["family"],
        cpu=task_config["cpu"],
        memory_limit_mib=task_config["memory_limit_mib"],
    )

    # Logs use the task family as stream prefix and are retained for one day.
    log_driver = ecs.LogDrivers.aws_logs(
        stream_prefix=task_definition.family,
        log_retention=logs.RetentionDays.ONE_DAY,
    )

    image_ref = container_config["image"]
    container = task_definition.add_container(
        f"container-{_extract_image_name(image_ref)}",
        image=ecs.ContainerImage.from_registry(image_ref),
        logging=log_driver,
    )

    # One TCP port mapping per configured port.
    port_mappings = [
        ecs.PortMapping(container_port=tcp_port, protocol=ecs.Protocol.TCP)
        for tcp_port in container_config["tcp_ports"]
    ]
    for mapping in port_mappings:
        container.add_port_mappings(mapping)

    return task_definition
def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    use_public_endpoint: bool = True,
    service_name: str | None = None,
) -> ecspat.ApplicationLoadBalancedFargateService:
    """Create a Fargate service behind an application load balancer.

    Args:
        scope: Construct the service is created in.
        id: Construct id of the service.
        cluster: ECS cluster that runs the service.
        taskdef: Task definition the service runs.
        port: Listener port of the load balancer.
        desired_count: Desired number of tasks.
        use_public_endpoint: Whether the load balancer is public.
        service_name: Optional explicit service name.

    Returns:
        The load-balanced Fargate service construct.
    """
    # Deployment circuit breaker with rollback enabled.
    rollback_breaker = ecs.DeploymentCircuitBreaker(rollback=True)

    return ecspat.ApplicationLoadBalancedFargateService(
        scope,
        id,
        cluster=cluster,
        task_definition=taskdef,
        service_name=service_name,
        desired_count=desired_count,
        listener_port=port,
        public_load_balancer=use_public_endpoint,
        circuit_breaker=rollback_breaker,
    )
class ClusterConfig(TypedDict):
    """Settings for creating an ECS cluster."""

    # VPC the cluster is placed in.
    vpc: ec2.IVpc
    # Optional container insights switch; may be left out entirely.
    enable_container_insights: NotRequired[bool]
def add_cluster(scope: cons.Construct, id: str, config: ClusterConfig) -> ecs.Cluster:
    """Create an ECS cluster in the configured VPC.

    Args:
        scope: Construct the cluster is created in.
        id: Construct id of the cluster.
        config: Cluster settings; ``enable_container_insights`` is optional.

    Returns:
        The new cluster.
    """
    cluster_vpc = config["vpc"]
    # A missing key is passed on as None.
    insights = config.get("enable_container_insights")
    return ecs.Cluster(scope, id, vpc=cluster_vpc, container_insights=insights)
def _extract_image_name(image_ref):
name_with_tag = image_ref.split("/")[-1]
name = name_with_tag.split(":")[0]
return name
class ScalingThreshold(TypedDict):
    """Utilization target used for service scaling."""

    # Target utilization, in percent.
    percent: float
class ServiceScalingConfig(TypedDict):
    """Task-count limits and utilization targets for service autoscaling."""

    # Lower and upper bound for the number of tasks.
    min_count: int
    max_count: int
    # Utilization targets for CPU-based and memory-based scaling.
    scale_cpu_target: ScalingThreshold
    scale_memory_target: ScalingThreshold
def set_service_scaling(service: ecs.FargateService, config: ServiceScalingConfig):
    """Enable task-count autoscaling on a Fargate service.

    Args:
        service: Service whose task count is scaled.
        config: Minimum/maximum task count plus the CPU and memory
            utilization targets.
    """
    task_count = service.auto_scale_task_count(
        min_capacity=config["min_count"],
        max_capacity=config["max_count"],
    )

    cpu_percent = config["scale_cpu_target"]["percent"]
    task_count.scale_on_cpu_utilization(
        "CpuScaling", target_utilization_percent=cpu_percent
    )

    memory_percent = config["scale_memory_target"]["percent"]
    task_count.scale_on_memory_utilization(
        "MemoryScaling", target_utilization_percent=memory_percent
    )
# 10.0.3 monitoring.py
from typing import NotRequired, TypedDict
from constructs import Construct
import aws_cdk as cdk
from aws_cdk import (
aws_sns as sns,
)
import cdk_monitoring_constructs as cdkmon
class MonitoringConfig(TypedDict):
    """Input settings for setting up monitoring."""

    # Name of the dashboard; also used as the monitoring facade's construct id.
    dashboard_name: str
    # Optional SNS topic that alarms are sent to.
    default_alarm_topic: NotRequired[sns.ITopic]
    # Optional prefix for alarm names.
    default_alarm_name_prefix: NotRequired[str]
class MonitoringContext(TypedDict):
    """Result of setting up monitoring: the facade plus the defaults in use."""

    # Facade through which resources are registered for monitoring.
    handler: cdkmon.MonitoringFacade
    # Alarm topic and alarm name prefix the facade was configured with.
    default_alarm_topic: NotRequired[sns.ITopic]
    default_alarm_name_prefix: NotRequired[str]
def init_monitoring(scope: Construct, config: MonitoringConfig) -> MonitoringContext:
    """Create a monitoring facade and return it with its alarm defaults.

    Args:
        scope: Construct the facade is created in.
        config: Dashboard name plus optional alarm topic and alarm name prefix.

    Returns:
        A context holding the facade, the alarm topic (None when not
        configured) and the alarm name prefix that was applied.
    """
    # Alarms notify the configured topic; without a topic they take no action.
    alarm_topic = config.get("default_alarm_topic")
    if alarm_topic:
        alarm_action = cdkmon.SnsAlarmActionStrategy(on_alarm_topic=alarm_topic)
    else:
        alarm_action = cdkmon.NoopAlarmActionStrategy()

    # The dashboard name is the fallback alarm name prefix.
    name_prefix = config.get("default_alarm_name_prefix")
    if name_prefix is None:
        name_prefix = config["dashboard_name"]

    facade = cdkmon.MonitoringFacade(
        scope,
        config["dashboard_name"],
        alarm_factory_defaults=cdkmon.AlarmFactoryDefaults(
            actions_enabled=True,
            action=alarm_action,
            alarm_name_prefix=name_prefix,
        ),
    )
    return MonitoringContext(
        handler=facade,
        default_alarm_topic=alarm_topic,
        default_alarm_name_prefix=name_prefix,
    )
# 10.0.4 containers_test.py
import pytest
import aws_cdk as cdk
from aws_cdk import (
aws_ec2 as ec2,
aws_ecs as ecs,
assertions,
)
import containers
def test_ecs_cluster_defined_with_existing_vpc():
    """Exactly one cluster is synthesized and it keeps the VPC it was given."""
    stack = cdk.Stack()
    existing_vpc = ec2.Vpc(stack, "vpc")

    cluster = containers.add_cluster(
        stack, "my-test-cluster", containers.ClusterConfig(vpc=existing_vpc)
    )

    synthesized = assertions.Template.from_stack(stack)
    synthesized.resource_count_is("AWS::ECS::Cluster", 1)
    assert cluster.vpc is existing_vpc
def test_check_that_container_insights_become_enabled():
    """Setting enable_container_insights yields the matching cluster setting."""
    stack = cdk.Stack()
    cluster_config = containers.ClusterConfig(
        vpc=ec2.Vpc(stack, "vpc"), enable_container_insights=True
    )
    containers.add_cluster(stack, "test-cluster", cluster_config)

    synthesized = assertions.Template.from_stack(stack)

    # The settings list must contain exactly this name/value entry.
    insights_setting = assertions.Match.object_equals(
        pattern={"Name": "containerInsights", "Value": "enabled"}
    )
    synthesized.has_resource_properties(
        "AWS::ECS::Cluster",
        {"ClusterSettings": assertions.Match.array_with(pattern=[insights_setting])},
    )
def test_ecs_fargate_task_definition_defined():
    """The task definition is Fargate-compatible and carries the configured sizing."""
    stack = cdk.Stack()
    task_settings: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    container_settings: containers.ContainerConfig = {
        "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, f"taskdef-{task_settings['family']}", task_settings, container_settings
    )

    assert taskdef.is_fargate_compatible
    assert taskdef in stack.node.children

    # Cpu and Memory are compared as strings against the template.
    expected_properties = {
        "RequiresCompatibilities": ["FARGATE"],
        "Cpu": str(task_settings["cpu"]),
        "Memory": str(task_settings["memory_limit_mib"]),
        "Family": task_settings["family"],
    }
    synthesized = assertions.Template.from_stack(stack)
    synthesized.resource_count_is("AWS::ECS::TaskDefinition", 1)
    synthesized.has_resource_properties("AWS::ECS::TaskDefinition", expected_properties)
def test_container_definition_added_to_task_definition():
    """The task definition gets a default container that uses the configured image."""
    stack = cdk.Stack()
    image_ref = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    task_settings: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    container_settings: containers.ContainerConfig = {
        "image": image_ref,
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", task_settings, container_settings
    )
    synthesized = assertions.Template.from_stack(stack)

    # Check the construct itself first ...
    container: ecs.ContainerDefinition = taskdef.default_container  # type: ignore
    assert container is not None
    assert container.image_name == image_ref

    # ... then the synthesized template.
    container_with_image = assertions.Match.object_like({"Image": image_ref})
    synthesized.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {"ContainerDefinitions": assertions.Match.array_with([container_with_image])},
    )
@pytest.fixture
def service_test_input_data():
    """Provide a stack with a cluster and a task definition for service tests.

    Returns a dict with the keys "stack", "cluster" and "task_definition".
    """
    stack = cdk.Stack()
    cluster = containers.add_cluster(
        stack, "test-cluster", containers.ClusterConfig(vpc=ec2.Vpc(stack, "vpc"))
    )

    task_settings: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    container_settings: containers.ContainerConfig = {
        "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
        "tcp_ports": [8000],
    }
    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", task_settings, container_settings
    )

    return dict(stack=stack, cluster=cluster, task_definition=taskdef)
def test_fargate_service_created_with_only_mandatory_properties(
    service_test_input_data,
):
    """With defaults the service is internet-facing while tasks get no public IP.

    Also checks that a security group admits TCP traffic on the listener port
    from 0.0.0.0/0.
    """
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]
    port, desired_count = 80, 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    synthesized = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef

    # ECS service: Fargate launch type, tasks without public IP.
    vpc_configuration = assertions.Match.object_like(
        {
            "AssignPublicIp": "DISABLED",
            "SecurityGroups": assertions.Match.array_with([sg_capture]),
        }
    )
    synthesized.resource_count_is("AWS::ECS::Service", 1)
    synthesized.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {"AwsvpcConfiguration": vpc_configuration}
            ),
        },
    )

    # Load balancer: one internet-facing application load balancer.
    synthesized.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)
    synthesized.has_resource_properties(
        "AWS::ElasticLoadBalancingV2::LoadBalancer",
        {"Type": "application", "Scheme": "internet-facing"},
    )

    # Security group: ingress from anywhere on the listener port.
    open_ingress = assertions.Match.object_like(
        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
    )
    synthesized.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {"SecurityGroupIngress": assertions.Match.array_with([open_ingress])},
    )
def test_fargate_service_created_without_public_access(service_test_input_data):
    """Disabling the public endpoint yields an internal application load balancer."""
    fixture = service_test_input_data
    port, desired_count = 80, 1

    containers.add_service(
        fixture["stack"],
        "test-service",
        fixture["cluster"],
        fixture["task_definition"],
        port,
        desired_count,
        False,
    )

    synthesized = assertions.Template.from_stack(fixture["stack"])
    synthesized.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)
    synthesized.has_resource_properties(
        "AWS::ElasticLoadBalancingV2::LoadBalancer",
        {"Type": "application", "Scheme": "internal"},
    )
def test_scaling_settings_for_service(service_test_input_data):
    """Scaling config produces one scalable target and CPU + memory policies."""
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]
    port, desired_count = 80, 2

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count, False
    )
    config = containers.ServiceScalingConfig(
        min_count=1,
        max_count=5,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=50),
    )
    containers.set_service_scaling(service=service.service, config=config)

    def tracking_policy(metric_type, target_value):
        # Expected properties of a target-tracking policy on a predefined metric.
        return {
            "PolicyType": "TargetTrackingScaling",
            "TargetTrackingScalingPolicyConfiguration": assertions.Match.object_like(
                {
                    "PredefinedMetricSpecification": assertions.Match.object_equals(
                        {"PredefinedMetricType": metric_type}
                    ),
                    "TargetValue": target_value,
                }
            ),
        }

    scale_resource = assertions.Capture()
    synthesized = assertions.Template.from_stack(stack)

    synthesized.resource_count_is("AWS::ApplicationAutoScaling::ScalableTarget", 1)
    synthesized.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalableTarget",
        {
            "MaxCapacity": config["max_count"],
            "MinCapacity": config["min_count"],
            "ResourceId": scale_resource,
            "ScalableDimension": "ecs:service:DesiredCount",
            "ServiceNamespace": "ecs",
        },
    )

    synthesized.resource_count_is("AWS::ApplicationAutoScaling::ScalingPolicy", 2)
    synthesized.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalingPolicy",
        tracking_policy(
            "ECSServiceAverageCPUUtilization", config["scale_cpu_target"]["percent"]
        ),
    )
    synthesized.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalingPolicy",
        tracking_policy(
            "ECSServiceAverageMemoryUtilization",
            config["scale_memory_target"]["percent"],
        ),
    )
# 10.0.5 monitoring_test.py
import pytest
import aws_cdk as cdk
from aws_cdk import (
assertions,
aws_ec2 as ec2,
aws_sns as sns,
)
import monitoring as mon
def test_init_monitoring_of_stack_with_defaults():
    """With only a dashboard name, exactly one dashboard of that name is created."""
    stack = cdk.Stack()
    config = mon.MonitoringConfig(dashboard_name="test-monitoring")

    mon.init_monitoring(stack, config)

    # The debug print of the template object was dropped; the assertions below
    # check the template themselves.
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::CloudWatch::Dashboard", 1)
    template.has_resource_properties(
        "AWS::CloudWatch::Dashboard", {"DashboardName": config["dashboard_name"]}
    )
def test_init_monitoring_of_stack_with_sns_alarm_topic():
    """The returned context carries the topic and falls back to the dashboard name as prefix."""
    stack = cdk.Stack()
    ec2.Vpc(stack, "vpc")
    topic = sns.Topic(stack, "alarm-topic")
    monitoring_config = mon.MonitoringConfig(
        dashboard_name="test-monitoring",
        default_alarm_topic=topic,  # type: ignore
    )

    context = mon.init_monitoring(stack, config=monitoring_config)

    expected_topic = monitoring_config.get("default_alarm_topic")
    expected_prefix = monitoring_config.get("dashboard_name")
    assert context.get("default_alarm_topic") == expected_topic
    assert context.get("default_alarm_name_prefix") == expected_prefix