serverless-aws-rds-logs-s3

A Serverless Framework example demonstrating a periodic scheduled task to ship logs from RDS to S3.

Quick start:

Introduction

For this application, we'd like to build a system to preserve log files from AWS RDS into an S3 bucket. Logs are available in RDS itself for a limited period of time; although RDS offers streaming to CloudWatch for more durable storage, not all RDS backend versions support it. S3 offers highly reliable, configurable storage and a convenient interface if we wish to perform our own analysis of the logs.

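This example assumes an existing RDS instance, to which we'll add an S3 bucket to hold the logs, a scheduled Lambda function to sync them, and the IAM permissions the function needs.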

We'll also use the serverless-python-requirements plugin to manage the external Python libraries our service requires.

A Digression

The RDS SDKs (e.g. boto3 for Python) advertise a function download_db_log_file_portion for retrieving RDS log files. This function, unfortunately, has a somewhat awkward interface: the paging interval specified by the API is expressed as some number of lines, but the service also imposes an absolute size limit on the payload. If the requested number of lines causes the data maximum to be breached, the service truncates the last line returned and inserts a warning marker into the data stream.

One could write code like this to work around the problem:

    TRUNCATE_MARKER = '[Your log message was truncated]'

    def _get_chunk(self, log_file, marker='0', n_lines=3000, retry=True):
        record = rds.download_db_log_file_portion(
            DBInstanceIdentifier=self.db_name,
            LogFileName=log_file,
            Marker=marker,
            NumberOfLines=n_lines,
        )

        # If RDS hits its max log download size in the middle of a line,
        # it truncates the line to fit (!!). If we detect a truncated line,
        # retry with fewer lines, which should allow the fetch to succeed
        # without truncating.
        if retry and TRUNCATE_MARKER in record['LogFileData'][-100:]:
            return self._get_chunk(log_file, marker=marker,
                                   n_lines=n_lines // 2, retry=False)
        else:
            return record

but on balance, one would rather not!

A more serious concern involves multibyte character sets: log files that contain non-ASCII characters cannot be pulled reliably by the AWS SDKs.

RDS quietly advertises a REST interface for downloading an entire RDS log file in one step; indeed, this is the same interface that the AWS management console invokes when you download a log file via the web interface! Our service will use this REST endpoint to fetch logs from RDS; see the end of this document for additional discussion.

Serverless Configuration

The S3 bucket

First we define a new S3 bucket resource.

    resources:
      Resources:
        RDSLogBucket:
          Type: AWS::S3::Bucket
          Properties:
            BucketName: !Join
              - "-"
              - - !Ref AWS::AccountId
                - "${self:custom.bucket.${self:provider.stage}}"
                - !Ref AWS::Region

    custom:
      bucket:
        live: prod-rds-logs
        stage: stage-rds-logs

As a convenience, we also define a custom bucket attribute that holds the configurable portion of the S3 bucket name. During the deploy, CloudFormation joins it with the AWS account ID and region to produce the final bucket name. Adjust the value to suit local preference or policy.
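To make the naming scheme concrete, here is a minimal Python sketch of the concatenation that the `!Join` above asks CloudFormation to perform. The function name `bucket_name_for` and the sample account ID and region are illustrative assumptions, not part of the service.

```python
# Per-stage name fragments, mirroring the custom.bucket mapping above.
BUCKET_FRAGMENTS = {
    "live": "prod-rds-logs",
    "stage": "stage-rds-logs",
}


def bucket_name_for(stage, account_id, region):
    """Return "<account id>-<stage fragment>-<region>" (hypothetical helper)."""
    return "-".join([account_id, BUCKET_FRAGMENTS[stage], region])


# Example with a made-up account ID and region.
print(bucket_name_for("stage", "123456789012", "us-east-1"))
```

Including the account ID and region in the name helps keep it unique, since S3 bucket names share a global namespace.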

The function configuration

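Our function configuration specifies the handler and other Lambda parameters as usual, but also passes the database name and target bucket to the function as environment variables and schedules it to run every five minutes: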

    functions:
      sync_s3:
        handler: handler.sync_s3
        timeout: 600
        memorySize: 256
        environment:
          DBNAME: ${self:custom.db.${self:provider.stage}}
          TARGET_BUCKET: !Ref RDSLogBucket
        events:
          - schedule: rate(5 minutes)

    custom:
      db:
        live: prod-rds
        stage: stage-rds

Additional IAM permissions

We also need to give our function permission to read the RDS logs and read and write the S3 bucket, so we'll add those policies to the execution role created by Serverless when we deploy:

    provider:
      name: aws
      runtime: python3.8
      stage: ${opt:stage, "stage"}
      iamRoleStatements:
        - Effect: 'Allow'
          Action:
            - 's3:ListBucket'
          Resource: !GetAtt RDSLogBucket.Arn
        - Effect: 'Allow'
          Action:
            - 's3:GetObject'
            - 's3:PutObject'
          Resource: !Join ["", [!GetAtt RDSLogBucket.Arn, "/*"]]
        - Effect: Allow
          Action:
            - rds:DownloadCompleteDBLogFile
          Resource: '*'
        - Effect: Allow
          Action:
            - rds:DescribeDBLogFiles
          Resource: !Join
            - ":"
            - - "arn:aws:rds"
              - !Ref AWS::Region
              - !Ref AWS::AccountId
              - "db"
              - ${self:custom.db.${self:provider.stage}}

rds:DescribeDBLogFiles allows the role to query RDS for the list of log files and their sizes; we'll use that later to only sync logs that have changed.

rds:DownloadCompleteDBLogFile grants access to the REST endpoint for fetching an entire log file. Note the Resource specification of *: this is required by IAM, so if you need to restrict the role's access to only a subset of your RDS instances, you will need a condition statement or some other mechanism to do so.

The sync_s3 function

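Now that we have our Serverless configuration established, let's have a look at the sync_s3 function itself. In outline, the function reads the database name and target bucket from its environment, lists the log files RDS currently holds, skips any file whose S3 copy already has the same size, and streams each remaining file from RDS into the bucket.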

Here is the function in full:

    import os

    import boto3
    from botocore.exceptions import ClientError


    def sync_s3(event, context):
        """Sync RDS logs to S3."""

        # passed in from serverless
        db = os.environ['DBNAME']
        bucket = os.environ['TARGET_BUCKET']

        s3 = boto3.resource('s3')
        streamer = RDSLogStreamer(db)

        for log_file in streamer.log_files:
            obj = s3.Object(bucket, log_file.target_path)

            try:
                if obj.content_length == log_file.rds_size:
                    print(f'Skipping existing {log_file}')
                    continue
            except ClientError as err:
                # A 404 means the object does not exist yet; re-raise anything else.
                if err.response['Error']['Code'] != '404':
                    raise

            print(f'Sync {log_file}')

            f = streamer.stream(log_file)

            obj.upload_fileobj(f)

A couple of helper classes handle bookkeeping chores for us like collecting the available logs and their sizes and organizing the destination bucket into date-based directories.
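As a rough illustration of that bookkeeping, the sketch below shows one way such a helper could look. Only the attribute names `rds_size` and `target_path` come from the function above; the `LogFile` class, the `log_files_from_response` function, and the `YYYY/MM/DD/` key layout are assumptions made for this example. The response fields (`LogFileName`, `Size`, `LastWritten` in milliseconds) are the ones returned by the RDS `DescribeDBLogFiles` call.

```python
import datetime
import posixpath
from dataclasses import dataclass


@dataclass(frozen=True)
class LogFile:
    """One RDS log file as reported by DescribeDBLogFiles (hypothetical helper)."""
    name: str             # e.g. "error/postgresql.log.2020-01-01-00"
    rds_size: int         # size in bytes reported by RDS
    last_written_ms: int  # POSIX timestamp in milliseconds

    @property
    def target_path(self):
        """S3 key of the form YYYY/MM/DD/<file name> (an assumed layout)."""
        written = datetime.datetime.fromtimestamp(
            self.last_written_ms / 1000, tz=datetime.timezone.utc)
        return f"{written:%Y/%m/%d}/{posixpath.basename(self.name)}"


def log_files_from_response(response):
    """Build LogFile objects from one describe_db_log_files response page."""
    return [
        LogFile(entry["LogFileName"], entry["Size"], entry["LastWritten"])
        for entry in response["DescribeDBLogFiles"]
    ]
```

In the service, the response would come from something like `boto3.client('rds').describe_db_log_files(DBInstanceIdentifier=db_name)`; the size recorded here is what sync_s3 compares against the existing S3 object.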

To fetch a log file, we create a signed URL for the REST endpoint and have requests open the URL for streaming:

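    import attr
    import requests


    @attr.s
    class RDSLogStreamer:
        db_name = attr.ib()  # RDS instance identifier, passed in by sync_s3

        # The log_files property and other helpers are omitted here.

        def stream(self, log_file):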
            signed_url = get_rds_logfile_url(self.db_name, log_file)

            response = requests.get(signed_url, stream=True)

            response.raise_for_status()

            # https://github.com/psf/requests/issues/2155
            response.raw.decode_content = True

            return response.raw

Additional context: signing the REST request

Invoking the REST call to retrieve an RDS log file requires that the request be signed with credentials for a user or role with the rds:DownloadCompleteDBLogFile permission. Our goal is to sign the request with the execution role we created as part of our deployment, to which we carefully added the required permission.

When generating a signed request for a role, we are required to pass a session token with our request. Fortunately, the Lambda execution environment includes the required keys and session token as environment variables:

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_SESSION_TOKEN

To generate the signed request, our service packages a lightly customized version of the SigV4 signing example, tailored for the RDS endpoint we need and assuming role-based authentication with a session token.
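For orientation, here is a minimal standard-library sketch of SigV4 signing for this endpoint. It is not the code the service ships: it signs with an Authorization header rather than producing a signed URL, the function names `signing_key` and `signed_log_request` are hypothetical, and it assumes the database and log file names contain no characters that need URI escaping. The `/v13/downloadCompleteLogFile/` path is the documented RDS REST path for this operation.

```python
import datetime
import hashlib
import hmac


def _hmac(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def signing_key(secret_key, date_stamp, region, service):
    """Derive the SigV4 signing key by chaining HMACs over the scope parts."""
    k_date = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_service = _hmac(_hmac(k_date, region), service)
    return _hmac(k_service, "aws4_request")


def signed_log_request(db_name, log_file_name, region,
                       access_key, secret_key, session_token, now=None):
    """Return (url, headers) for a signed GET of a complete RDS log file."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")
    host = f"rds.{region}.amazonaws.com"
    # Assumption: names need no URI escaping beyond the "/" separators.
    path = f"/v13/downloadCompleteLogFile/{db_name}/{log_file_name}"

    # The session token is signed as a header because we use role credentials.
    to_sign = {"host": host, "x-amz-date": amz_date,
               "x-amz-security-token": session_token}
    signed_headers = ";".join(sorted(to_sign))
    canonical_headers = "".join(f"{k}:{to_sign[k]}\n" for k in sorted(to_sign))
    payload_hash = hashlib.sha256(b"").hexdigest()  # GET has an empty body
    canonical_request = "\n".join(
        ["GET", path, "", canonical_headers, signed_headers, payload_hash])

    scope = f"{date_stamp}/{region}/rds/aws4_request"
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256", amz_date, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()])
    key = signing_key(secret_key, date_stamp, region, "rds")
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    headers = {
        "x-amz-date": amz_date,
        "x-amz-security-token": session_token,
        "Authorization": (f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
                          f"SignedHeaders={signed_headers}, "
                          f"Signature={signature}"),
    }
    return f"https://{host}{path}", headers
```

Inside Lambda, the three credential arguments would be read from the environment variables listed above, and the returned URL and headers could be passed to `requests.get(url, headers=headers, stream=True)`.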