def __init__(self, scope: Construct, construct_id: str, **kwargs: Any) -> None:
"""Provision the WAF WebACL.
Args:
scope: The CDK construct scope.
construct_id: The unique identifier for this stack.
**kwargs: Additional keyword arguments passed to the parent Stack.
"""
super().__init__(scope, construct_id, **kwargs)
apply_compliance_aspects(self)
# KMS key for WAF log group encryption.
# CloudWatch Logs requires the key policy to grant the Logs service
# principal access so it can encrypt log data on write.
waf_encryption_key = kms.Key(
self,
"WafEncryptionKey",
description=f"KMS key for {self.stack_name} WAF log group encryption",
enable_key_rotation=True,
# See HelloWorldApp.encryption_key for the rationale — automated
# rotation, no dependent redeploys, 90-day compliance baseline.
rotation_period=Duration.days(90),
removal_policy=RemovalPolicy.DESTROY,
)
# Confused-deputy guard on the CMK's CloudWatch Logs service grant.
# See ``grant_logs_service_to_key`` in ``nag_utils.py``.
grant_logs_service_to_key(
waf_encryption_key,
region=self.region,
account=self.account,
partition=self.partition,
)
# WAF log group — name must start with "aws-waf-logs-" (AWS requirement).
# WAFv2 uses its service-linked role (AWSServiceRoleForWAFv2Logging) to
# write log events; no additional log group resource policy is needed.
waf_log_group = logs.LogGroup(
self,
"WafLogGroup",
log_group_name=f"aws-waf-logs-{self.stack_name}",
encryption_key=waf_encryption_key,
retention=logs.RetentionDays.ONE_WEEK,
removal_policy=RemovalPolicy.DESTROY,
)
web_acl = wafv2.CfnWebACL(
self,
"WebACL",
scope="CLOUDFRONT",
default_action=wafv2.CfnWebACL.DefaultActionProperty(allow={}),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}WebACL",
sampled_requests_enabled=True,
),
rules=[
# Blocks IPs with a poor reputation (scanners, botnets, TOR exits)
wafv2.CfnWebACL.RuleProperty(
name="AWSManagedRulesAmazonIpReputationList",
priority=0,
statement=wafv2.CfnWebACL.StatementProperty(
managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
vendor_name="AWS",
name="AWSManagedRulesAmazonIpReputationList",
)
),
override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}-IpReputationList",
sampled_requests_enabled=True,
),
),
# Core rule set — protects against OWASP Top 10 web exploits
wafv2.CfnWebACL.RuleProperty(
name="AWSManagedRulesCommonRuleSet",
priority=1,
statement=wafv2.CfnWebACL.StatementProperty(
managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
vendor_name="AWS",
name="AWSManagedRulesCommonRuleSet",
)
),
override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}-CommonRuleSet",
sampled_requests_enabled=True,
),
),
# Blocks requests containing known malicious inputs (SQLi, XSS patterns)
wafv2.CfnWebACL.RuleProperty(
name="AWSManagedRulesKnownBadInputsRuleSet",
priority=2,
statement=wafv2.CfnWebACL.StatementProperty(
managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
vendor_name="AWS",
name="AWSManagedRulesKnownBadInputsRuleSet",
)
),
override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}-KnownBadInputs",
sampled_requests_enabled=True,
),
),
# Blocks requests from anonymizing services (VPN, Tor exits, hosting providers)
wafv2.CfnWebACL.RuleProperty(
name="AWSManagedRulesAnonymousIpList",
priority=3,
statement=wafv2.CfnWebACL.StatementProperty(
managed_rule_group_statement=wafv2.CfnWebACL.ManagedRuleGroupStatementProperty(
vendor_name="AWS",
name="AWSManagedRulesAnonymousIpList",
)
),
override_action=wafv2.CfnWebACL.OverrideActionProperty(none={}),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}-AnonymousIpList",
sampled_requests_enabled=True,
),
),
# Rate limiting — blocks a single client exceeding 200 requests per 5 minutes.
# Aggregates by FORWARDED_IP (X-Forwarded-For) because all traffic enters via
# CloudFront, so the source IP at WAF is CloudFront's edge — not the caller's.
# fallback_behavior=MATCH means a missing/invalid header trips the rule, which
# is the safer default (a determined caller can't bypass by stripping XFF).
wafv2.CfnWebACL.RuleProperty(
name="RateLimitPerIP",
priority=4,
action=wafv2.CfnWebACL.RuleActionProperty(block={}),
statement=wafv2.CfnWebACL.StatementProperty(
rate_based_statement=wafv2.CfnWebACL.RateBasedStatementProperty(
limit=200,
aggregate_key_type="FORWARDED_IP",
forwarded_ip_config=wafv2.CfnWebACL.ForwardedIPConfigurationProperty(
header_name="X-Forwarded-For",
fallback_behavior="MATCH",
),
)
),
visibility_config=wafv2.CfnWebACL.VisibilityConfigProperty(
cloud_watch_metrics_enabled=True,
metric_name=f"{self.stack_name}-RateLimitPerIP",
sampled_requests_enabled=True,
),
),
],
)
# Enable WAF logging to the CloudWatch Logs log group.
wafv2.CfnLoggingConfiguration(
self,
"WAFLogging",
log_destination_configs=[waf_log_group.log_group_arn],
resource_arn=web_acl.attr_arn,
)
# Exposed for HelloWorldFrontendStack to attach to CloudFront.
# When the frontend stack is in a different region, CDK bridges this
# value automatically via SSM (cross_region_references=True on the consumer).
self.web_acl_arn = web_acl.attr_arn
# ── CloudWatch Logs Insights saved queries ────────────────────────────
logs.QueryDefinition(
self,
"WafBlockedRequests",
query_definition_name=f"{self.stack_name}/WAF/BlockedRequests",
query_string=logs.QueryString(
fields=[
"@timestamp",
"action",
"httpRequest.clientIp",
"httpRequest.uri",
"httpRequest.httpMethod",
"httpRequest.country",
],
filter_statements=["action = 'BLOCK'"],
sort="@timestamp desc",
limit=50,
),
log_groups=[waf_log_group],
)
logs.QueryDefinition(
self,
"WafTopBlockedRules",
query_definition_name=f"{self.stack_name}/WAF/TopBlockedRules",
query_string=logs.QueryString(
filter_statements=["action = 'BLOCK'"],
stats_statements=["count(*) as blockCount by terminatingRuleId"],
sort="blockCount desc",
limit=25,
),
log_groups=[waf_log_group],
)
logs.QueryDefinition(
self,
"WafRateLimited",
query_definition_name=f"{self.stack_name}/WAF/RateLimitedIPs",
query_string=logs.QueryString(
filter_statements=["terminatingRuleId = 'RateLimitPerIP'"],
stats_statements=["count(*) as blockCount by httpRequest.clientIp"],
sort="blockCount desc",
limit=25,
),
log_groups=[waf_log_group],
)
NagSuppressions.add_stack_suppressions(
self,
[
{
"id": "NIST.800.53.R5-IAMNoInlinePolicy",
"reason": "KMS key grants use inline statements — not directly replaceable with managed policies",
},
],
)
CfnOutput(
self,
"WebAclArn",
description="WAF WebACL ARN — attach to CloudFront distributions in any region",
value=web_acl.attr_arn,
)
CfnOutput(
self,
"WebAclId",
description="WAF WebACL logical ID",
value=web_acl.attr_id,
)
CfnOutput(
self,
"WafLogGroupName",
description="CloudWatch Logs log group receiving WAF access logs",
value=waf_log_group.log_group_name,
)