Awesome
Ethereum ETL Airflow
Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset
Local Development Prerequisites
- direnv
- pyenv
We are using direnv to automatically set up and load the correct python version. We also create a venv in the root folder, that is automatically activated when entering the project folder.
Setting up Airflow DAGs using Google Cloud Composer
Create BigQuery Datasets
- Sign in to BigQuery https://bigquery.cloud.google.com/
- Create new datasets called
crypto_ethereum
,crypto_ethereum_raw
,crypto_ethereum_temp
Create Google Cloud Storage bucket
- Create a new Google Storage bucket to store exported files https://console.cloud.google.com/storage/browser
Create Google Cloud Composer (version 2) environment
Create a new Cloud Composer environment:
export ENVIRONMENT_NAME=ethereum-etl-0
AIRFLOW_CONFIGS_ARR=(
"celery-worker_concurrency=8"
"scheduler-dag_dir_list_interval=300"
"scheduler-min_file_process_interval=120"
)
export AIRFLOW_CONFIGS=$(IFS=, ; echo "${AIRFLOW_CONFIGS_ARR[*]}")
gcloud composer environments create \
$ENVIRONMENT_NAME \
--location=us-central1 \
--image-version=composer-2.1.14-airflow-2.5.1 \
--environment-size=medium \
--scheduler-cpu=2 \
--scheduler-memory=13 \
--scheduler-storage=1 \
--scheduler-count=1 \
--web-server-cpu=1 \
--web-server-memory=2 \
--web-server-storage=512MB \
--worker-cpu=2 \
--worker-memory=13 \
--worker-storage=10 \
--min-workers=1 \
--max-workers=8 \
--airflow-configs=$AIRFLOW_CONFIGS
gcloud composer environments update \
$ENVIRONMENT_NAME \
--location=us-central1 \
--update-pypi-packages-from-file=requirements_airflow.txt
Create variables in Airflow (Admin > Variables in the UI):
Variable | Description |
---|---|
ethereum_output_bucket | GCS bucket to store exported files |
ethereum_provider_uris | Comma separated URIs of Ethereum nodes |
ethereum_destination_dataset_project_id | Project ID of BigQuery datasets |
notification_emails | email for notifications |
Check other variables in dags/ethereumetl_airflow/variables.py
.
Updating package requirements
Suggested package requirements for Composer are stored in requirements_airflow.txt
.
You can update the Composer environment using the following script:
ENVIRONMENT_NAME="ethereum-etl-0"
LOCAL_REQUIREMENTS_PATH="$(mktemp)"
# grep pattern removes comments and whitespace:
cat "./requirements_airflow.txt" | grep -o '^[^#| ]*' > "$LOCAL_REQUIREMENTS_PATH"
gcloud composer environments update \
"$ENVIRONMENT_NAME" \
--location="us-central1" \
--update-pypi-packages-from-file="$LOCAL_REQUIREMENTS_PATH"
Note: Composer can be very pedantic about conflicts in additional packages. You may have to fix dependency conflicts where you had no issues testing locally (when updating dependencies, Composer does something "cleverer" than just pip install -r requirements.txt
). This is why eth-hash
is currently pinned in requirements_airflow.txt
. Typically we have found that pinning eth-hash
and/or eth-rlp
may make things work, though Your Mileage May Vary.
See this issue for further ideas on how to unblock problems you may encounter.
Upload DAGs
> ./upload_dags.sh <airflow_bucket>
Running Tests
pip install \
-r requirements_test.txt \
-r requirements_local.txt \
-r requirements_airflow.txt
pytest -vv -s
Running locally
A docker compose definition has been provided to easily spin up a local Airflow instance.
To build the required image:
docker compose build
To start Airflow:
docker compose up airflow
The instance requires the CLOUDSDK_CORE_PROJECT
environment variable to be set in most cases. Airflow Variables can be defined in variables.json.
Creating Table Definition Files for Parsing Events and Function Calls
Read this article: https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644
Debugging Table Defenition Files
A utility script for debugging and verifying contract parsing in Ethereum data processing pipelines is available. You can simply run
python3 generate_parse_sql.py <path_to_table_definition_file> <date>
This will output some example SQL that can be used to debug if the generated json files from the contract parser are correct.
NOTE: certain files may not have the contract_address
field specified as a valid address ERC20Pool_event_TransferLP but use a select statement on another table instead. For these you can simply pass the contract address yourself like below:
python3 generate_parse_sql.py <path_to_table_definition_file> <date> --contract_address <contract_address>
More Information
You can follow the instructions here for Polygon DAGs https://github.com/blockchain-etl/polygon-etl. The architecture
there is very similar to Ethereum so in most case substituting polygon
for ethereum
will work. Contributions
to this README file for porting documentation from Polygon to Ethereum are welcome.