forked from misskecupbung/lambda-enrich-data
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
124 lines (99 loc) · 3.67 KB
/
lambda_function.py
File metadata and controls
124 lines (99 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""AWS Lambda function to enrich data with user preferences from DynamoDB."""
import json
import logging
import os
from typing import Any
import boto3
from botocore.exceptions import ClientError
# Configure logging: getLogger() without a name returns the root logger, which
# is set here to emit records at INFO level and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize DynamoDB resource outside handler for connection reuse.
# The table name comes from the TABLE_NAME environment variable and falls back
# to an empty string when the variable is unset.
# NOTE(review): an empty table name is presumably rejected by DynamoDB on the
# first request - confirm that every deployment sets TABLE_NAME.
TABLE_NAME = os.environ.get("TABLE_NAME", "")
dynamodb_resource = boto3.resource("dynamodb")
users_table = dynamodb_resource.Table(TABLE_NAME)
# Default user preferences, merged into the payload by enrich_payload() when
# the request carries no user id or no stored item is found for that id.
DEFAULT_PREFERENCES = {
"language": "en",
"notifications_enabled": False,
}
def get_user_preferences(user_id: str) -> dict[str, Any] | None:
    """Look up the stored preferences item for a user in the users table.

    Args:
        user_id: Value of the ``id`` key attribute to look up.

    Returns:
        The ``Item`` from the ``get_item`` response, or None when the
        response contains no item.

    Raises:
        ClientError: Re-raised after logging when the DynamoDB call fails.
    """
    lookup_key = {"id": user_id}
    try:
        item = users_table.get_item(Key=lookup_key).get("Item")
    except ClientError as err:
        # Log only the service-provided message, then let the caller decide
        # how to surface the failure.
        reason = err.response["Error"]["Message"]
        logger.error("Failed to fetch user %s: %s", user_id, reason)
        raise
    return item
def enrich_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the payload merged with user preferences.

    The stored preferences are used when the payload has a truthy ``id`` and
    a non-empty item exists for it; otherwise ``DEFAULT_PREFERENCES`` is used.
    Keys from the preferences override same-named keys in the payload.

    Args:
        payload: The request payload; its ``id`` entry identifies the user.

    Returns:
        A new dict containing the payload plus the selected preferences.
    """
    user_id = payload.get("id")
    if user_id:
        stored = get_user_preferences(user_id)
        if stored:
            logger.info("Found preferences for user %s", user_id)
            extras = stored
        else:
            logger.info("No preferences found for user %s, using defaults", user_id)
            extras = DEFAULT_PREFERENCES
    else:
        logger.warning("No user id provided, using default preferences")
        extras = DEFAULT_PREFERENCES
    # Single merge point: later entries win, so preferences override payload.
    return {**payload, **extras}
def _json_default(value: Any) -> Any:
    """Convert values returned by the DynamoDB resource into JSON-friendly ones.

    Args:
        value: An object that ``json.dumps`` could not serialize natively.

    Returns:
        An int or float for ``Decimal`` values, a list for sets.

    Raises:
        TypeError: If the value is of any other unsupported type.
    """
    # Function-scope import keeps this block self-contained.
    from decimal import Decimal

    if isinstance(value, Decimal):
        # Whole numbers stay ints; fractional values become floats (this may
        # lose precision for very long decimals).
        if value.is_finite() and value == value.to_integral_value():
            return int(value)
        return float(value)
    if isinstance(value, (set, frozenset)):
        return list(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _build_response(status_code: int, body: dict[str, Any]) -> dict[str, Any]:
    """Build the response dict with a JSON-encoded body and JSON content type."""
    return {
        "statusCode": status_code,
        "body": json.dumps(body, default=_json_default),
        "headers": {"Content-Type": "application/json"},
    }


def _extract_body(event: Any) -> Any:
    """Return the ``body`` of the event, or ``"{}"`` when none is available."""
    # A non-empty list event is represented by its first record.
    record = event[0] if isinstance(event, list) and event else event
    if not isinstance(record, dict):
        return "{}"
    body = record.get("body", "{}")
    # An explicit null body is treated the same as a missing one.
    return "{}" if body is None else body


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """AWS Lambda handler to enrich incoming data with user preferences.

    The request payload is taken from the ``body`` entry of the event (or of
    the first record when the event is a non-empty list). A string body is
    parsed as JSON; a missing or null body is treated as an empty object.

    Args:
        event: Lambda event containing request data.
        context: Lambda context object (unused).

    Returns:
        Response dict with ``statusCode``, JSON ``body`` and ``headers``:
        200 with the enriched payload, 400 when the body is not valid JSON
        or is not a JSON object, 500 on DynamoDB or unexpected errors.
    """
    # default=str keeps this log line from failing on non-JSON-native values.
    logger.info("Processing event: %s", json.dumps(event, default=str))
    try:
        body = _extract_body(event)
        # Parse JSON body if it's a string; otherwise use it as-is.
        payload = json.loads(body) if isinstance(body, str) else body
        if not isinstance(payload, dict):
            # enrich_payload() needs a mapping; reject lists, numbers, etc.
            logger.error(
                "Request body is not a JSON object: %s", type(payload).__name__
            )
            return _build_response(
                400, {"error": "Request body must be a JSON object"}
            )
        return _build_response(200, enrich_payload(payload))
    except json.JSONDecodeError as e:
        logger.error("Invalid JSON in request body: %s", e)
        return _build_response(400, {"error": "Invalid JSON in request body"})
    except ClientError as e:
        logger.error("DynamoDB error: %s", e)
        return _build_response(500, {"error": "Internal server error"})
    except Exception as e:
        # Top-level boundary: never let an exception escape the handler, but
        # keep the traceback in the logs for diagnosis.
        logger.exception("Unexpected error: %s", e)
        return _build_response(500, {"error": "Internal server error"})