paas-data-ingestion

Ingest and prepare data with AWS lambdas, Snowflake and dbt in a scalable, fully replayable manner.

Overview

This repository contains a fully PaaS-based infrastructure for data ingestion and transformation at scale. This repository has a companion blog post, to which we refer the reader for an in-depth analysis of the proposed patterns and the motivations behind the project. While you can enjoy this repository in isolation, please note that our pipeline is part of a bigger architecture, detailed in our MLOps without Much Ops series.

The ingestion pipeline mimics a typical data flow for data-driven applications: clients send events, an endpoint collects them and dumps them into a stream, and finally a data warehouse stores them for further processing:

<img src="https://github.com/jacopotagliabue/paas-data-ingestion/blob/main/docs/data_pattern_viz.jpg" width="640">

We make use of three main technologies: AWS PaaS services (Lambda, API Gateway, Firehose) for collection, Snowflake for storage, and dbt for transformation.

Finally, it is worth mentioning that the repository is e-commerce related as a result of mainly three factors:

Our goal was to keep the code realistic enough for the target use cases, but simple enough to make it easy for anybody to port this stack to a different industry.

Prerequisites

Please note that by running this project you may incur some cloud costs: make sure to keep costs and infrastructure monitored, and to understand to what extent your experiments are covered by the free tiers.

Structure

The project is divided into three main components and a simulation script.

Infrastructure

Following the infrastructure-as-code paradigm, the folder contains the Pulumi code necessary to properly set up the AWS components (Lambda function, API Gateway, CloudWatch, Firehose, etc.) and the Snowflake components needed for the ingestion platform to work. For instructions on how to run it, see below.

infrastructure/main.py checks which resources have to be created, updated or removed. Among other things, the script defines the AWS resources listed above (Lambda function, API Gateway, CloudWatch, Firehose) and the Snowflake objects (database, warehouse, schemas) used by the rest of the pipeline.
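
To give a flavor of the infrastructure-as-code approach, here is a heavily simplified Pulumi sketch (not the repository's actual main.py): it declares an illustrative Lambda function and a Firehose delivery stream landing events on S3, with IAM policies, the API Gateway and the Snowflake resources omitted for brevity. All names and paths are made up.

    # Heavily simplified and illustrative: one Lambda function plus one Firehose
    # delivery stream landing events on S3. IAM policies, the API Gateway and all
    # Snowflake resources are omitted; names and paths are made up.
    import json

    import pulumi
    import pulumi_aws as aws

    # S3 bucket where Firehose lands the raw events
    landing_bucket = aws.s3.Bucket("paas-ingestion-landing")

    # Minimal IAM roles (attach proper policies in a real setup)
    lambda_role = aws.iam.Role(
        "collect-lambda-role",
        assume_role_policy=json.dumps({
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Action": "sts:AssumeRole",
                           "Principal": {"Service": "lambda.amazonaws.com"}}],
        }),
    )
    firehose_role = aws.iam.Role(
        "collect-firehose-role",
        assume_role_policy=json.dumps({
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Action": "sts:AssumeRole",
                           "Principal": {"Service": "firehose.amazonaws.com"}}],
        }),
    )

    # Lambda function backing the /collect endpoint (handler path is illustrative)
    collect_fn = aws.lambda_.Function(
        "collect-pixel",
        runtime="python3.8",
        handler="app.handler",
        role=lambda_role.arn,
        code=pulumi.FileArchive("../serverless/app"),
    )

    # Firehose delivery stream the Lambda writes to
    firehose = aws.kinesis.FirehoseDeliveryStream(
        "collect-stream",
        destination="extended_s3",
        extended_s3_configuration=aws.kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs(
            role_arn=firehose_role.arn,
            bucket_arn=landing_bucket.arn,
        ),
    )

    pulumi.export("firehose_name", firehose.name)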

dbt

The folder contains a typical dbt project, whose goal is to collect, organize, and version all the transformations needed to go from the raw data ingested by the pixel endpoint to the pre-calculated features and aggregations that can be consumed by downstream processes (BI, ML, etc.). For instructions on how to run the dbt project, see below.

We used dbt to process the RAW logs and to normalize the data into three different schemas.

We define the queries in macros because the tables stored in the EVENTS and EVENTS_LIVE schemas are the same but contain data from different analysis periods. For example, the logs table, containing all the sessionized and enriched logs, is used to power the other tables.

Both the EVENTS.LOGS and the EVENTS_LIVE.LOGS_STAGED tables use the same macro, dbt/macros/models/events/select_logs.sql, but with different filters.

Finally, for pedagogical purposes, we have integrated the UAParser.js library to parse user agents directly in Snowflake.

Please see the Bonus section below for a more detailed functional explanation of the overall pattern. As a visual overview, this is the DAG generated by dbt when describing the data transformations we prepared:

<img src="https://github.com/jacopotagliabue/paas-data-ingestion/blob/main/docs/dbt_graph.png" width="400">

These screenshots should give you a sense of how the shape of the data changes, from the original log table storing the raw JSON events sent by the endpoint to the final aggregations.

AWS lambda

A simple AWS lambda function implementing a data collection pixel. Once deployed, the resulting /collect endpoint will accept POST requests from clients sending e-commerce data: the function is kept simple for pedagogical reasons - after accepting the body, it prepares a simple but structured event, and uses another AWS PaaS service, Firehose, to dump it into a stream for downstream storage and further processing.
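
A minimal sketch of what such a handler can look like, assuming an API Gateway proxy integration in front of the Lambda; the stream name, environment variable and event fields below are illustrative, not the repository's actual ones:

    # Illustrative only: the stream name, environment variable and event envelope
    # are assumptions, not the repository's actual implementation.
    import json
    import os
    import time
    import uuid

    import boto3

    firehose = boto3.client("firehose")
    STREAM_NAME = os.environ.get("FIREHOSE_STREAM", "collect-stream")

    def handler(event, context):
        # With an API Gateway proxy integration, the POST body arrives as a string
        body = json.loads(event.get("body") or "{}")

        # Wrap the client payload in a simple but structured event
        record = {
            "event_id": str(uuid.uuid4()),
            "server_timestamp": int(time.time() * 1000),
            "payload": body,
        }

        # Dump the event into the stream; Firehose handles batching and storage
        firehose.put_record(
            DeliveryStreamName=STREAM_NAME,
            Record={"Data": (json.dumps(record) + "\n").encode("utf-8")},
        )

        return {"statusCode": 200, "body": json.dumps({"status": "ok"})}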

Data pumper

To simulate a constant stream of events reaching the collect endpoint, we provide a script that can be run at will to upload e-commerce events in the Google Analytics format.
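
Conceptually, each call made by the pumper boils down to a POST like the following sketch (the endpoint URL and payload fields are illustrative placeholders, not the actual script):

    # Illustrative only: replace the URL with your deployed endpoint; the payload
    # fields loosely mimic the Google Analytics format used by the real script.
    import time
    import uuid

    import requests

    COLLECT_URL = "https://<your-endpoint>/collect"

    event = {
        "t": "pageview",                              # hit type
        "cid": str(uuid.uuid4()),                     # client id
        "dl": "https://shop.example.com/product/123", # document location
        "tm": int(time.time() * 1000),                # client-side timestamp
    }

    response = requests.post(COLLECT_URL, json=event, timeout=5)
    response.raise_for_status()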

The events are based on a real-world clickstream dataset open-sourced in 2021, the Coveo Data Challenge dataset. By using real-world, anonymized events we provide practitioners with a realistic, non-toy scenario to get acquainted with the design patterns we propose. Please cite our work and share/star the repo if you find the dataset useful!

How to run it

Running the stack involves three steps: setting up the infrastructure, sending data with the pumper, and running the dbt transformations.

Setting up the infrastructure

  1. Install Pulumi on your computer and configure your Pulumi account:
    pulumi login
    
  2. Jump into the project folder & set up the Python venv
    cd infrastructure
    make install
    
  3. Create a new Pulumi Stack:
    pulumi stack init dev
    
  4. Configure the new stack with all the required credentials:
    # AWS
    pulumi config set aws:region <value>
    pulumi config set aws:accessKey <value>
    pulumi config set aws:secretKey <value> --secret
    # Snowflake
    pulumi config set snowflake:region <value>
    pulumi config set snowflake:account <value>
    pulumi config set snowflake:password <value> --secret
    pulumi config set snowflake:username <value>
    
    All the configuration values will be stored in the Pulumi.dev.yaml file.
  5. Deploy the stack:
    make up
    

Notes:

Send data with the pumper

At every run, pumper.py will send events as if they were happening at that very moment: running the code twice will not produce duplicate events, but rather events with similar categorical features and different IDs, timestamps, etc.

Please note that if you want to jump-start the log table by bulk-loading the dataset (or a portion of it) into Snowflake, you can avoid some idle time waiting for events to be sent by using Snowflake's COPY INTO command over the raw CSV.
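
As a hedged sketch of that shortcut, the snippet below stages a local CSV and bulk-loads it with COPY INTO through the Python connector; the stage, table and file names are hypothetical and must be adapted to the objects created by the infrastructure step:

    # Hypothetical object names: adapt the stage, table and file path to your setup.
    import snowflake.connector

    conn = snowflake.connector.connect(
        account="<value>",
        user="<value>",
        password="<value>",
        database="PAAS_DATA_INGESTION_DEV",
        schema="RAW",
        warehouse="PAAS_DATA_INGESTION_DEV_WH",
    )
    cur = conn.cursor()

    # Upload the local CSV to an internal stage, then bulk load it into the raw table
    cur.execute("CREATE STAGE IF NOT EXISTS BULK_LOAD_STAGE")
    cur.execute("PUT file:///tmp/browsing_train.csv @BULK_LOAD_STAGE")
    cur.execute("""
        COPY INTO RAW_LOGS
        FROM @BULK_LOAD_STAGE
        FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
    """)

    cur.close()
    conn.close()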

Run dbt transformations

  1. Jump into the project folder & set up the Python venv
    cd dbt
    make install
    
  2. Configure your dbt profile following the official guide:
    paas-data-ingestion:
        target: dev
        outputs:
            dev:
                type: snowflake
                account: <value>
                user: <value>
                password: <value>
                role: ACCOUNTADMIN
                database: "PAAS_DATA_INGESTION_DEV"
                warehouse: "PAAS_DATA_INGESTION_DEV_WH"
                schema: RAW
                threads: 5
                client_session_keep_alive: False
    
  3. Launch dbt:
    make dbt-run
    make dbt-test
    make dbt-docs
    

Bonus: mix and match materialization options

A basic computation in almost any analytics platform is the "sessionization" of the clickstream data ingested from the app, that is, the post hoc grouping (inside the data warehouse) of the uninterrupted stream of events into "sessions", conventionally defined with a 30-minute inactivity threshold (i.e. given a user, two events 29 minutes apart are in the same session; if 31 minutes have passed, we now have two sessions):

<img src="https://github.com/jacopotagliabue/paas-data-ingestion/blob/main/docs/sessionization.jpg" width="640">
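
To make the 30-minute rule concrete, here is a tiny, warehouse-free Python sketch of the grouping logic (in the repository the actual sessionization is performed with SQL window functions inside dbt):

    # Illustrative only: group one user's event timestamps into sessions using the
    # conventional 30-minute inactivity threshold.
    from datetime import datetime, timedelta

    THRESHOLD = timedelta(minutes=30)

    def sessionize(timestamps):
        """Split a list of datetimes into sessions (lists of datetimes)."""
        sessions = []
        for ts in sorted(timestamps):
            if sessions and ts - sessions[-1][-1] <= THRESHOLD:
                sessions[-1].append(ts)  # within 30 minutes: same session
            else:
                sessions.append([ts])    # longer gap: start a new session
        return sessions

    events = [
        datetime(2021, 1, 1, 10, 0),
        datetime(2021, 1, 1, 10, 29),  # 29 minutes later -> same session
        datetime(2021, 1, 1, 11, 5),   # 36 minutes later -> new session
    ]
    print(len(sessionize(events)))  # 2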

Sessionization requires some heavy lifting with window functions, so we would like to run that computation in batch, without tight latency constraints, every X hours; on the other hand, it would be nice if recent events (e.g. session counts) could still be actionable in the stack. The stylized flow below shows how the tables in the repo can be used to solve the problem within a single stack:

<img src="https://github.com/jacopotagliabue/paas-data-ingestion/blob/main/docs/union.jpg" width="640">
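
As a rough sketch of the consumer side of this pattern, a downstream process could query the union of the batch and live tables (the column name below is hypothetical; the actual columns are defined by the dbt models):

    # Illustrative only: SESSION_ID is a hypothetical column; the actual columns
    # are defined by the dbt models.
    import snowflake.connector

    conn = snowflake.connector.connect(
        account="<value>",
        user="<value>",
        password="<value>",
        database="PAAS_DATA_INGESTION_DEV",
        warehouse="PAAS_DATA_INGESTION_DEV_WH",
    )

    query = """
        SELECT COUNT(DISTINCT SESSION_ID) AS TOTAL_SESSIONS
        FROM (
            SELECT SESSION_ID FROM EVENTS.LOGS             -- batch, sessionized every X hours
            UNION ALL
            SELECT SESSION_ID FROM EVENTS_LIVE.LOGS_STAGED -- recent, lightly processed events
        ) AS ALL_LOGS
    """
    total_sessions = conn.cursor().execute(query).fetchone()[0]
    print(total_sessions)
    conn.close()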

Please refer to our blog post for more details.

Contributors

This project has been brought to you with love by:

A lot of people helped with the blog post: please refer to our article for more details.

License

The code is provided "as is" and released under the MIT License.