Awesome
serverless-aws-rds-logs-s3
A Serverless Framework example demonstrating a periodic scheduled task to ship logs from RDS to S3.
Quick start:
npm install -g serverless
npm install
- edit
serverless.yml
with the name of your RDS instance sls deploy --stage live
Introduction
For this application, we'd like to build a system to preserve log files from AWS RDS into an S3 bucket. Logs are available in RDS itself for a limited period of time; although RDS offers streaming to CloudWatch for more durable storage, not all RDS backend versions support it. S3 offers highly reliable, configurable storage and a convenient interface if we wish to perform our own analysis of the logs.
This example assumes an existing RDS instance, to which we'll add
- an S3 bucket to store our log files
- a Python service, executed on a schedule, to sync the logs from RDS to the S3 bucket
- an IAM Role granting permission to the service to read the RDS logs and read and write to the bucket
We'll also use the serverless-python-requirements plugin to manage the external Python libraries our service requires.
A Digression
The RDS SDKs (e.g. boto3
for Python) advertise a function
download_db_log_file_portion
for retrieving RDS log files. This function, unfortunately, has a somewhat
awkward interface: the paging interval specified by the API is expressed
as some number of lines, but the service also imposes an absolute size
limit on the payload. If the requested number of lines causes the data
maximum to be breached, the service truncates the last line returned and
inserts a warning marker into the data stream.
One could write code like this to circumvent this problem
TRUNCATE_MARKER='[Your log message was truncated]'
def _get_chunk(self, log_file, marker='0', n_lines=3000, retry=True):
record = rds.download_db_log_file_portion(
DBInstanceIdentifier=self.db_name,
LogFileName=log_file,
Marker=marker,
NumberOfLines=n_lines,
)
# If RDS hits its max log download size in the middle of a line,
# it truncates the line to fit (!!). If we detect a truncated line,
# retry with fewer lines, which should allow the fetch to succeed
# without truncating.
if retry and TRUNCATE_MARKER in record['LogFileData'][-100:]:
return self._get_chunk(log_file, marker=marker,
n_lines=n_lines // 2, retry=False)
else:
return record
but on balance, one would rather not!
A more serious concern involves multibyte character sets: log files that contain non-ASCII characters cannot be pulled reliably by the AWS SDKs.
RDS quietly advertises a REST interface for downloading an entire RDS log file in one step; indeed, this is the same interface that the AWS management console invokes if you download an entire log file via the web interface! Our service will utilize this REST endpoint for fetching logs from RDS; see the end of this document for additional discussion.
serverless
Configuration
The S3 bucket
First we define a new S3 bucket resource.
resources:
Resources:
RDSLogBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Join
- "-"
- - !Ref AWS::AccountId
- "${self:custom.bucket.${self:provider.stage}}"
- !Ref AWS::Region
custom:
bucket:
live: prod-rds-logs
stage: stage-rds-logs
As a convenience, we also define a custom bucket
attribute to hold a
configurable portion of our S3 bucket name; during the deploy, CloudFormation
will concatenate this with the AWS account ID and region to produce the
final bucket name in S3; adjust this to suit local preference or policy.
The function configuration
Our function configuration specifies the handler and other Lambda parameters as usual, but also
-
collects the RDS DB name and the target S3 bucket, which will be passed to the function in two environment variables at runtime
-
specifies a schedule event to trigger the function at intervals of 5 minutes (think
cron
)
functions:
sync_s3:
handler: handler.sync_s3
timeout: 600
memorySize: 256
environment:
DBNAME: ${self:custom.db.${self:provider.stage}}
TARGET_BUCKET: !Ref RDSLogBucket
events:
- schedule: rate(5 minutes)
custom:
db:
live: prod-rds
stage: stage-rds
Additional IAM permissions
We also need to give our function permission to read the RDS logs and read and write the S3 bucket, so we'll add those policies to the execution role created by Serverless when we deploy:
provider:
name: aws
runtime: python3.8
stage: ${opt:stage, "stage"}
iamRoleStatements:
- Effect: 'Allow'
Action:
- 's3:ListBucket'
Resource: !GetAtt RDSLogBucket.Arn
- Effect: 'Allow'
Action:
- 's3:GetObject'
- 's3:PutObject'
Resource: !Join ["", [!GetAtt RDSLogBucket.Arn, "/*"]]
- Effect: Allow
Action:
- rds:DownloadCompleteDBLogFile
Resource: '*'
- Effect: Allow
Action:
- rds:DescribeDBLogFiles
Resource: !Join
- ":"
- - "arn:aws:rds"
- !Ref AWS::Region
- !Ref AWS::AccountId
- "db"
- ${self:custom.db.${self:provider.stage}}
rds:DescribeDBLogFiles
allows the role to query RDS for the list of log
files and their sizes; we'll use that later to only sync logs that have
changed.
rds:DownloadCompleteDBLogFile
grants access to the REST endpoint for
fetching an entire log file. Note the Resource
specification of *
: this
is required by IAM, so if you need to restrict the role's access to only
a subset of your RDS instances, you will need a condition statement or some
other mechanism to do so.
The sync_s3
function
Now that we have our Serverless configuration established, let's have a
look at the sync_s3
function itself. In outline, the function
-
retrieves the configured RDS database and S3 bucket from the execution environment
-
retrieves the list of available log files from RDS
-
compares the reported size of each log with the corresponding file in S3
-
if they differ, streams the contents from RDS to S3
Here is the function in full:
def sync_s3(event, context):
"Sync RDS logs to S3."
# passed in from serverless
db = os.environ['DBNAME']
bucket = os.environ['TARGET_BUCKET']
s3 = boto3.resource('s3')
streamer = RDSLogStreamer(db)
for log_file in streamer.log_files:
obj = s3.Object(bucket, log_file.target_path)
try:
if obj.content_length == log_file.rds_size:
print(f'Skipping existing {log_file}')
continue
except ClientError:
pass # object does not exist
print(f'Sync {log_file}')
f = streamer.stream(log_file)
obj.upload_fileobj(f)
A couple of helper classes handle bookkeeping chores for us like collecting the available logs and their sizes and organizing the destination bucket into date-based directories.
To fetch a log file, we create a signed URL for the REST endpoint and have
requests
open the URL for streaming:
@attr.s
class RDSLogStreamer:
def stream(self, log_file):
signed_url = get_rds_logfile_url(self.db_name, log_file)
response = requests.get(signed_url, stream=True)
response.raise_for_status()
# https://github.com/psf/requests/issues/2155
response.raw.decode_content = True
return response.raw
Additional context: signing the REST request
Invoking the REST call to retrieve an RDS log file requires that the
request be signed with credentials for a user or role with the
rds:DownloadCompleteDBLogFile
permission. Our goal is to use the
execution role we created as part of our deployment — and to which we
carefully added the required permission — to sign the request.
When generating a signed request for a role, we are required to pass a session token with our request. Fortunately, the Lambda execution environment includes the required keys and session token as environment variables
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_SESSION_TOKEN
To generate the signed request, our service packages a lightly customized version of the SigV4 signing example, tailored for the RDS endpoint we need and assuming role-based authentication with a session token.