Snapshot-Sleuth Code Examples
This document contains anonymized code examples demonstrating key patterns from the Snapshot-Sleuth implementation.
Example 1: CDK Construct Pattern (TypeScript)
Reusable CDK construct for an SQS queue with an automatic dead-letter queue:
import { Construct } from 'constructs';
import { Queue, QueueEncryption } from 'aws-cdk-lib/aws-sqs';
interface QueueProps {
queueName: string;
visibilityTimeout?: number;
maxReceiveCount?: number;
}
/** SQS queue pair: an encrypted main queue wired to an encrypted dead-letter queue. */
export class ForensicQueue extends Construct {
  public readonly queue: Queue;
  public readonly dlq: Queue;

  constructor(scope: Construct, id: string, props: QueueProps) {
    super(scope, id);

    const dlqName = `${props.queueName}-DLQ`;

    // Failed messages land here after maxReceiveCount delivery attempts.
    this.dlq = new Queue(this, dlqName, {
      queueName: dlqName,
      encryption: QueueEncryption.KMS_MANAGED,
      retentionPeriod: Duration.days(14),
    });

    // Primary queue, wired to the DLQ created above.
    this.queue = new Queue(this, props.queueName, {
      queueName: props.queueName,
      encryption: QueueEncryption.KMS_MANAGED,
      visibilityTimeout: Duration.seconds(props.visibilityTimeout ?? 300),
      deadLetterQueue: {
        maxReceiveCount: props.maxReceiveCount ?? 3,
        queue: this.dlq,
      },
    });
  }
}
Example 2: Step Functions State Machine with Task Token Callback (TypeScript)
Orchestrating EC2-based forensic analysis with the task token callback pattern:
import { Chain, StateMachine, TaskInput, JsonPath } from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke, CallAwsService } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { IntegrationPattern } from 'aws-cdk-lib/aws-stepfunctions';
// Validation step
const validateSnapshot = new LambdaInvoke(this, 'ValidateSnapshot', {
lambdaFunction: validationLambda,
outputPath: '$.Payload',
});
// Copy snapshot (AWS API call)
const copySnapshot = new CallAwsService(this, 'CopySnapshot', {
service: 'ec2',
action: 'copySnapshot',
parameters: {
SourceSnapshotId: JsonPath.stringAt('$.snapshotId'),
SourceRegion: JsonPath.stringAt('$.sourceRegion'),
Encrypted: true,
},
iamResources: ['*'],
});
// Launch forensic EC2 with task token callback
const launchForensicInstance = new LambdaInvoke(this, 'LaunchForensicEC2', {
lambdaFunction: instanceLauncher,
integrationPattern: IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: TaskInput.fromObject({
taskToken: JsonPath.taskToken,
snapshotId: JsonPath.stringAt('$.copiedSnapshotId'),
caseId: JsonPath.stringAt('$.caseId'),
}),
timeout: Duration.hours(8),
});
// Define workflow chain
const definition = validateSnapshot
.next(new Parallel(this, 'ParallelOps')
.branch(copySnapshot)
.branch(launchForensicInstance))
.next(notifyCompletion);
new StateMachine(this, 'SnapshotAnalysisMachine', {
definition,
tracingEnabled: true,
logs: { destination: logGroup, level: LogLevel.ALL },
});
Example 3: Lambda Handler with Powertools (TypeScript)
Lambda function with structured logging, metrics, and tracing:
import { Logger } from '@aws-lambda-powertools/logger';
import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics';
import { Tracer } from '@aws-lambda-powertools/tracer';
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
// Module-scope Powertools handles: created once per Lambda execution
// environment and reused across invocations.
const logger = new Logger({ serviceName: 'snapshot-sleuth-validator' });
const metrics = new Metrics({
  namespace: 'Sleuth/Workflow',
  serviceName: 'Validator'
});
const tracer = new Tracer({ serviceName: 'snapshot-sleuth-validator' });
// DynamoDB client instrumented so its calls appear as X-Ray subsegments.
const dynamodb = tracer.captureAWSv3Client(new DynamoDBClient({}));

// Input event for the validator step of the workflow.
interface SnapshotEvent {
  snapshotId: string;
  sourceAccount: string;
  sourceRegion: string;
  caseId: string;
}
/**
 * Validates that the incoming snapshot is accessible, records the workflow
 * start in DynamoDB, and passes the event through with `validated: true`.
 * Throws when the snapshot is not accessible or DynamoDB write fails.
 */
export const handler = async (event: SnapshotEvent) => {
  logger.info('Processing snapshot', {
    snapshotId: event.snapshotId,
    caseId: event.caseId,
  });

  const segment = tracer.getSegment();
  const subsegment = segment?.addNewSubsegment('ValidateSnapshot');

  try {
    // Confirm the snapshot exists and is shared before doing any work.
    const accessible = await validateSnapshotAccess(event.snapshotId);
    if (!accessible) {
      metrics.addMetric('ValidationFailed', MetricUnits.Count, 1);
      throw new Error(`Snapshot ${event.snapshotId} not accessible`);
    }

    // Persist the workflow-start record keyed by case + snapshot.
    const startRecord = new PutItemCommand({
      TableName: process.env.TABLE_NAME,
      Item: {
        CaseID: { S: event.caseId },
        SnapshotID: { S: event.snapshotId },
        Status: { S: 'VALIDATING' },
        StartTime: { S: new Date().toISOString() },
      },
    });
    await dynamodb.send(startRecord);

    metrics.addMetric('ValidationSucceeded', MetricUnits.Count, 1);
    subsegment?.addAnnotation('validated', true);

    return { ...event, validated: true, timestamp: Date.now() };
  } catch (error) {
    subsegment?.addError(error as Error);
    logger.error('Validation failed', { error });
    throw error;
  } finally {
    // Always close the subsegment and flush buffered metrics, even on error.
    subsegment?.close();
    metrics.publishStoredMetrics();
  }
};
Example 4: Python Forensic Scanner with Timeout Protection
Scanner factory pattern with timeout protection for forensic tools:
import signal
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
from aws_lambda_powertools import Logger, Metrics
from aws_xray_sdk.core import xray_recorder
# Module-level Powertools handles shared by all scanners in this module.
logger = Logger(service="snapshot-sleuth-scanner")
metrics = Metrics(namespace="Sleuth/Tools")


class TimeoutError(Exception):
    """Raised when scanner exceeds time limit.

    NOTE(review): this shadows the built-in TimeoutError within this module.
    The except clauses below still work, but consider a distinct name
    (e.g. ScannerTimeoutError) to avoid confusion.
    """
    pass


def timeout_handler(signum, frame):
    # SIGALRM handler installed by ForensicScanner.execute_with_timeout.
    raise TimeoutError("Scanner execution timed out")
@dataclass
class ScanResult:
    """Normalized result returned by every ForensicScanner implementation."""
    tool_name: str                        # e.g. "YARA"
    success: bool                         # False when the scan failed or timed out
    detections: List[Dict]                # tool-specific detection records
    files_scanned: int                    # number of files successfully examined
    execution_time_seconds: float         # wall-clock duration of the scan
    error_message: Optional[str] = None   # populated only on failure
class ForensicScanner(ABC):
    """Abstract base class for forensic scanning tools.

    Subclasses implement scan(); callers should go through
    execute_with_timeout(), which enforces a wall-clock limit via SIGALRM
    (POSIX only, main thread only).
    """

    def __init__(self, timeout_seconds: int = 3600):
        self.timeout_seconds = timeout_seconds

    @abstractmethod
    def scan(self, target_path: str) -> ScanResult:
        """Execute scan on target filesystem."""
        pass

    def execute_with_timeout(self, target_path: str) -> ScanResult:
        """Execute scan with timeout protection.

        Returns:
            The subclass's ScanResult, or a failure ScanResult when the scan
            exceeds ``self.timeout_seconds``.
        """
        # BUG FIX: remember the previous SIGALRM handler so we can restore it;
        # the original left our handler permanently installed.
        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(self.timeout_seconds)
        try:
            return self.scan(target_path)
        except TimeoutError:
            logger.error(f"{self.__class__.__name__} timed out")
            return ScanResult(
                tool_name=self.__class__.__name__,
                success=False,
                detections=[],
                files_scanned=0,
                execution_time_seconds=self.timeout_seconds,
                error_message="Execution timed out",
            )
        finally:
            # Cancel any pending alarm and restore the prior handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, previous_handler)
class YaraScanner(ForensicScanner):
    """YARA pattern matching scanner."""

    def __init__(self, rules_path: str, timeout_seconds: int = 3600):
        super().__init__(timeout_seconds)
        self.rules_path = rules_path

    @xray_recorder.capture("yara_scan")
    def scan(self, target_path: str) -> ScanResult:
        import yara
        import time
        import os

        started = time.time()
        findings = []
        scanned = 0

        # Compile the ruleset once, then match every file under target_path.
        compiled_rules = yara.compile(filepath=self.rules_path)

        for directory, _subdirs, names in os.walk(target_path):
            for name in names:
                candidate = os.path.join(directory, name)
                try:
                    hits = compiled_rules.match(candidate, timeout=60)
                except Exception as exc:
                    # Unreadable/special files are logged and skipped.
                    logger.warning(f"Error scanning {candidate}: {exc}")
                    continue
                scanned += 1
                if hits:
                    findings.append({
                        "file": candidate,
                        "rules": [hit.rule for hit in hits],
                        "tags": [t for hit in hits for t in hit.tags],
                    })

        elapsed = time.time() - started

        # Emit per-run observability metrics.
        metrics.add_metric(name="ToolExecutionTime", unit="Seconds", value=elapsed)
        metrics.add_metric(name="DetectionCount", unit="Count", value=len(findings))

        return ScanResult(
            tool_name="YARA",
            success=True,
            detections=findings,
            files_scanned=scanned,
            execution_time_seconds=elapsed,
        )
class ScannerFactory:
    """Factory for creating forensic scanners.

    Maps a short scanner-type key to a callable that produces a
    ForensicScanner instance.
    """

    # Lambdas defer construction (and name resolution, for scanner classes
    # defined later in the module). BUG FIX: the original lambdas accepted no
    # arguments, so e.g. create("clamav", timeout_seconds=60) raised
    # TypeError; they now forward **kwargs to the underlying constructor.
    _scanners = {
        "yara": YaraScanner,
        "clamav": lambda **kwargs: ClamAVScanner(**kwargs),
        "artifacts": lambda **kwargs: ArtifactCollector(**kwargs),
        "timeline": lambda **kwargs: TimelineGenerator(**kwargs),
    }

    @classmethod
    def create(cls, scanner_type: str, **kwargs) -> ForensicScanner:
        """Instantiate the scanner registered under ``scanner_type``.

        Raises:
            ValueError: if ``scanner_type`` is not registered.
        """
        if scanner_type not in cls._scanners:
            raise ValueError(f"Unknown scanner type: {scanner_type}")
        return cls._scanners[scanner_type](**kwargs)
Example 5: EventBridge Metrics Processor (Python)
Processing Step Functions state changes for CloudWatch metrics:
import json
import boto3
from datetime import datetime
from aws_lambda_powertools import Logger, Metrics
from aws_lambda_powertools.utilities.typing import LambdaContext
# Shared Powertools and boto3 handles, created once per Lambda execution
# environment and reused across invocations.
logger = Logger(service="snapshot-sleuth-metrics")
metrics = Metrics(namespace="Sleuth/Workflow")
sfn_client = boto3.client("stepfunctions")
def handler(event: dict, context: LambdaContext) -> dict:
    """Process Step Functions EventBridge events and emit CloudWatch metrics."""
    detail = event.get("detail", {})
    status = detail.get("status")
    execution_arn = detail.get("executionArn")
    state_machine_arn = detail.get("stateMachineArn")

    logger.info("Processing execution event", extra={
        "status": status,
        "execution_arn": execution_arn,
    })

    # State machine name is the final ARN component; region is the fourth
    # component of this function's own ARN (arn:aws:lambda:<region>:...).
    sm_name = state_machine_arn.split(":")[-1] if state_machine_arn else "unknown"
    metrics.add_dimension(name="StateMachine", value=sm_name)
    metrics.add_dimension(name="Region", value=context.invoked_function_arn.split(":")[3])

    if status == "RUNNING":
        metrics.add_metric(name="ExecutionStarted", unit="Count", value=1)
    elif status == "SUCCEEDED":
        metrics.add_metric(name="ExecutionSucceeded", unit="Count", value=1)
        # Successful runs also report end-to-end duration.
        if execution_arn:
            metrics.add_metric(
                name="ExecutionDuration",
                unit="Milliseconds",
                value=calculate_execution_duration(execution_arn),
            )
    elif status == "FAILED":
        metrics.add_metric(name="ExecutionFailed", unit="Count", value=1)
        # Surface the failure cause in the logs for triage.
        if execution_arn:
            logger.error("Execution failed", extra=get_execution_error(execution_arn))
    elif status == "TIMED_OUT":
        metrics.add_metric(name="ExecutionTimedOut", unit="Count", value=1)

    metrics.flush_metrics()
    return {"processed": True, "status": status}
def calculate_execution_duration(execution_arn: str) -> float:
    """Get execution duration in milliseconds."""
    description = sfn_client.describe_execution(executionArn=execution_arn)
    started = description["startDate"]
    # Still-running executions have no stopDate; measure up to "now"
    # in the same timezone as the start timestamp.
    stopped = description.get("stopDate", datetime.now(started.tzinfo))
    return (stopped - started).total_seconds() * 1000
def get_execution_error(execution_arn: str) -> dict:
    """Extract error details from failed execution."""
    description = sfn_client.describe_execution(executionArn=execution_arn)
    # Fall back to placeholder strings when the API omits these fields.
    return {
        "error": description.get("error", "Unknown"),
        "cause": description.get("cause", "No cause provided"),
        "execution_arn": execution_arn,
    }
Example 6: React Query Hook for Dashboard (TypeScript/React)
Data fetching pattern with React Query for real-time dashboard updates:
import { useQuery, useMutation, useQueryClient } from 'react-query';
// Engagement record as returned by the /api/engagements endpoints.
interface Engagement {
  caseId: string;
  snapshotId: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  startTime: string;
  completionTime?: string; // present once the engagement has finished
  evidenceLocation?: string; // presumably a storage URI — confirm with the API
}

// Optional filters forwarded as query parameters to the list endpoint.
interface EngagementFilters {
  status?: string;
  dateRange?: { start: string; end: string };
}
// Fetch engagements with automatic refresh
// Fetch engagements with automatic refresh.
export const useEngagements = (filters?: EngagementFilters) => {
  const fetchEngagements = async (): Promise<Engagement[]> => {
    const params = new URLSearchParams();
    if (filters?.status) {
      params.append('status', filters.status);
    }
    if (filters?.dateRange) {
      params.append('startDate', filters.dateRange.start);
      params.append('endDate', filters.dateRange.end);
    }
    const res = await fetch(`/api/engagements?${params}`, {
      credentials: 'include', // SSO authentication
    });
    if (!res.ok) {
      throw new Error('Failed to fetch engagements');
    }
    return res.json();
  };

  // Filters participate in the cache key so each filter set caches separately.
  return useQuery<Engagement[]>(['engagements', filters], fetchEngagements, {
    refetchInterval: 30000, // refresh every 30 seconds
    staleTime: 10000, // consider data stale after 10 seconds
    retry: 3,
  });
};
// Fetch single engagement details
// Fetch single engagement details.
export const useEngagementDetails = (caseId: string) => {
  const fetchDetails = async (): Promise<Engagement> => {
    const res = await fetch(`/api/engagements/${caseId}`, {
      credentials: 'include',
    });
    if (!res.ok) {
      throw new Error('Failed to fetch engagement details');
    }
    return res.json();
  };

  return useQuery<Engagement>(['engagement', caseId], fetchDetails, {
    enabled: !!caseId, // skip the request until a caseId exists
    refetchInterval: 5000, // detail view refreshes more frequently
  });
};
// Submit new snapshot for analysis
// Submit new snapshot for analysis.
export const useSubmitSnapshot = () => {
  const queryClient = useQueryClient();

  const postSnapshot = async (data: {
    snapshotId: string;
    caseId: string;
    sourceRegion: string;
  }) => {
    const res = await fetch('/api/engagements', {
      method: 'POST',
      credentials: 'include',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data),
    });
    if (!res.ok) {
      throw new Error('Failed to submit snapshot');
    }
    return res.json();
  };

  return useMutation(postSnapshot, {
    onSuccess: () => {
      // Force the list view to pick up the newly created engagement.
      queryClient.invalidateQueries('engagements');
    },
  });
};
Example 7: X-Ray Tracing Decorator (Python)
Decorator pattern for automatic X-Ray tracing with graceful degradation:
from functools import wraps
from typing import Callable, Optional
import os
# Check if X-Ray is available: the daemon address env var is only set when
# tracing is enabled (e.g. in Lambda), so it doubles as a feature flag.
XRAY_ENABLED = os.environ.get("AWS_XRAY_DAEMON_ADDRESS") is not None

if XRAY_ENABLED:
    from aws_xray_sdk.core import xray_recorder
    from aws_xray_sdk.core import patch_all
    # Patch supported libraries so their calls show up as subsegments.
    patch_all()
def trace(
    name: Optional[str] = None,
    namespace: str = "Sleuth",
    capture_response: bool = False
) -> Callable:
    """
    Decorator for X-Ray tracing with graceful degradation.

    Args:
        name: Segment name (defaults to function name)
        namespace: X-Ray namespace for grouping
        capture_response: Whether to capture return value in metadata

    Usage:
        @trace(name="scan_filesystem", namespace="Sleuth/Tools")
        def perform_scan(path: str) -> dict:
            ...
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Without the X-Ray daemon, just run the function untraced.
            if not XRAY_ENABLED:
                return func(*args, **kwargs)

            label = name or func.__name__
            with xray_recorder.in_subsegment(label) as subsegment:
                subsegment.put_annotation("namespace", namespace)
                subsegment.put_annotation("function", func.__name__)
                try:
                    result = func(*args, **kwargs)
                except Exception as exc:
                    # Attach the traceback to the subsegment, then re-raise.
                    subsegment.add_exception(exc, stack=True)
                    raise
                if capture_response and result is not None:
                    try:
                        subsegment.put_metadata("response", result)
                    except Exception:
                        # Best effort: unserializable responses are skipped.
                        pass
                return result
        return wrapper
    return decorator
# Usage examples
@trace(name="mount_filesystem", namespace="Sleuth/Storage")
def mount_image(image_path: str, mount_point: str) -> bool:
    """Mount forensic image at specified location.

    Mounts read-only via a loop device and returns True when the mount
    command exits 0. NOTE(review): presumably requires elevated privileges —
    confirm the runtime user.
    """
    import subprocess
    result = subprocess.run(
        ["mount", "-o", "ro,loop", image_path, mount_point],
        capture_output=True
    )
    return result.returncode == 0
@trace(name="collect_artifacts", namespace="Sleuth/Forensics", capture_response=True)
def collect_system_artifacts(mount_point: str) -> dict:
    """Collect forensic artifacts from mounted filesystem."""
    # Initialize one empty bucket per artifact category.
    categories = ("user_files", "browser_history", "registry_keys", "event_logs")
    artifacts = {category: [] for category in categories}
    # Collection logic here...
    return artifacts
Example 8: Dashboard Widget Configuration (TypeScript CDK)
Modular dashboard construct for CloudWatch monitoring:
import { Construct } from 'constructs';
import { Duration } from 'aws-cdk-lib';
import {
  Dashboard,
  GraphWidget,
  MathExpression,
  Metric,
  Row,
  SingleValueWidget,
  TextWidget,
} from 'aws-cdk-lib/aws-cloudwatch';
// Inputs for the forensics workflow dashboard.
interface DashboardProps {
  stageName: string; // deployment stage, embedded in the dashboard name
  region: string; // embedded in the dashboard name and header
  stateMachineArn: string; // workflow state machine being monitored
}
export class ForensicsWorkflowDashboard extends Construct {
public readonly dashboard: Dashboard;
constructor(scope: Construct, id: string, props: DashboardProps) {
super(scope, id);
const dashboardName = `Sleuth-${props.stageName}-${props.region}-Workflow`;
this.dashboard = new Dashboard(this, 'Dashboard', {
dashboardName,
defaultInterval: Duration.hours(3),
});
// Header row
this.dashboard.addWidgets(
new TextWidget({
markdown: `# Forensic Workflow Dashboard\n**Stage:** ${props.stageName} | **Region:** ${props.region}`,
width: 24,
height: 1,
})
);
// Key metrics row
this.dashboard.addWidgets(
this.createExecutionCountWidget(),
this.createSuccessRateWidget(),
this.createAverageDurationWidget(),
this.createActiveExecutionsWidget(),
);
// Execution timeline
this.dashboard.addWidgets(
this.createExecutionTimelineWidget(props.stateMachineArn),
);
// Duration breakdown
this.dashboard.addWidgets(
this.createDurationBreakdownWidget(),
this.createToolPerformanceWidget(),
);
}
private createExecutionCountWidget(): SingleValueWidget {
return new SingleValueWidget({
title: 'Executions Today',
metrics: [
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionStarted',
statistic: 'Sum',
period: Duration.days(1),
}),
],
width: 6,
height: 4,
});
}
private createSuccessRateWidget(): SingleValueWidget {
return new SingleValueWidget({
title: 'Success Rate',
metrics: [
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionSucceeded',
statistic: 'Average',
period: Duration.hours(24),
}),
],
width: 6,
height: 4,
});
}
private createAverageDurationWidget(): SingleValueWidget {
return new SingleValueWidget({
title: 'Avg Duration (min)',
metrics: [
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionDuration',
statistic: 'Average',
period: Duration.hours(24),
}),
],
width: 6,
height: 4,
});
}
private createActiveExecutionsWidget(): SingleValueWidget {
return new SingleValueWidget({
title: 'Active Executions',
metrics: [
new Metric({
namespace: 'AWS/States',
metricName: 'ExecutionsRunning',
statistic: 'Maximum',
period: Duration.minutes(5),
}),
],
width: 6,
height: 4,
});
}
private createExecutionTimelineWidget(stateMachineArn: string): GraphWidget {
return new GraphWidget({
title: 'Execution Timeline',
left: [
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionStarted',
statistic: 'Sum',
period: Duration.hours(1),
color: '#2196f3',
}),
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionSucceeded',
statistic: 'Sum',
period: Duration.hours(1),
color: '#4caf50',
}),
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionFailed',
statistic: 'Sum',
period: Duration.hours(1),
color: '#f44336',
}),
],
width: 24,
height: 6,
});
}
private createDurationBreakdownWidget(): GraphWidget {
return new GraphWidget({
title: 'Execution Duration Distribution',
left: [
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionDuration',
statistic: 'p50',
period: Duration.hours(1),
label: 'p50',
}),
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionDuration',
statistic: 'p90',
period: Duration.hours(1),
label: 'p90',
}),
new Metric({
namespace: 'Sleuth/Workflow',
metricName: 'ExecutionDuration',
statistic: 'p99',
period: Duration.hours(1),
label: 'p99',
}),
],
width: 12,
height: 6,
});
}
private createToolPerformanceWidget(): GraphWidget {
return new GraphWidget({
title: 'Tool Execution Time',
left: [
new Metric({
namespace: 'Sleuth/Tools',
metricName: 'ToolExecutionTime',
dimensionsMap: { Tool: 'YARA' },
statistic: 'Average',
period: Duration.hours(1),
label: 'YARA',
}),
new Metric({
namespace: 'Sleuth/Tools',
metricName: 'ToolExecutionTime',
dimensionsMap: { Tool: 'ClamAV' },
statistic: 'Average',
period: Duration.hours(1),
label: 'ClamAV',
}),
new Metric({
namespace: 'Sleuth/Tools',
metricName: 'ToolExecutionTime',
dimensionsMap: { Tool: 'Artifacts' },
statistic: 'Average',
period: Duration.hours(1),
label: 'Artifacts',
}),
],
width: 12,
height: 6,
});
}
}
Code Example Summary
| Example | Language | Pattern | Key Concept |
|---|---|---|---|
| 1 | TypeScript CDK | Construct | Reusable infrastructure component |
| 2 | TypeScript CDK | Step Functions | Task token callback pattern |
| 3 | TypeScript | Lambda Powertools | Observability integration |
| 4 | Python | Factory + Timeout | Scanner abstraction with protection |
| 5 | Python | EventBridge Processor | Metrics emission pattern |
| 6 | TypeScript/React | React Query | Data fetching with auto-refresh |
| 7 | Python | Decorator | X-Ray tracing with graceful degradation |
| 8 | TypeScript CDK | Dashboard | CloudWatch widget configuration |