<!---
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied.  See the License for the
  specific language governing permissions and limitations
  under the License.
-->

# DataFusion on Ray
This began as a research project, donated from ray-sql, to evaluate performing distributed SQL queries from Python using Ray and Apache DataFusion.
DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation of Apache Arrow, Apache DataFusion, and Ray.
## Comparison to other DataFusion projects

### Comparison to DataFusion Ballista

- Unlike DataFusion Ballista, DataFusion Ray does not provide its own distributed scheduler and instead relies on Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
- DataFusion Ray is Python-first, whereas DataFusion Ballista is Rust-first.
### Comparison to DataFusion Python

- DataFusion Python provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends DataFusion Python to provide scalability across multiple nodes (see the sketch below for a contrast).
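
For comparison, this is roughly what the same kind of query looks like with in-process DataFusion Python. This is a sketch only: it assumes the `datafusion` PyPI package and a local `tips.parquet` file, and in older releases the context class was named `ExecutionContext` rather than `SessionContext`.

```python
from datafusion import SessionContext

# Purely in-process: no Ray cluster involved
ctx = SessionContext()
ctx.register_parquet("tips", "tips.parquet")
batches = ctx.sql(
    "SELECT sex, smoker, AVG(tip) AS avg_tip FROM tips GROUP BY sex, smoker"
).collect()
for batch in batches:
    print(batch.to_pandas())
```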
## Example
Run the following example live in your browser using a Google Colab notebook.
```python
import os

import ray

from datafusion_ray import DatafusionRayContext

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Start a local cluster
ray.init(resources={"worker": 1})

# Create a context and register a table
ctx = DatafusionRayContext(2)

# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")

result_set = ctx.sql(
    "select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
)

for record_batch in result_set:
    print(record_batch.to_pandas())
```
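
If you want a single pandas DataFrame rather than one per batch, the batches can be combined first. This is a sketch, assuming the result set is a list of PyArrow record batches and `pyarrow` is installed:

```python
import pyarrow as pa

# Combine all result batches into one Arrow table, then convert once
df = pa.Table.from_batches(result_set).to_pandas()
print(df)
```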
## Status

- DataFusion Ray can run all queries in the TPC-H benchmark.
## Features

- Mature SQL support (CTEs, joins, subqueries, etc.) thanks to DataFusion (see the sketch after this list).
- Support for CSV and Parquet files.
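
As a rough illustration of that SQL surface, a query like the following combines a CTE, a join, and a subquery in one statement. This is a sketch only, reusing the `ctx` context and `tips` table from the example above:

```python
result_set = ctx.sql("""
    WITH group_stats AS (
        SELECT sex, smoker, AVG(tip) AS avg_tip
        FROM tips
        GROUP BY sex, smoker
    )
    SELECT t.sex, t.smoker, t.tip, g.avg_tip
    FROM tips t
    JOIN group_stats g ON t.sex = g.sex AND t.smoker = g.smoker
    WHERE t.tip > (SELECT AVG(tip) FROM tips)
""")
```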
## Building

```bash
# prepare development environment (used to build wheel / install in development)
python3 -m venv venv

# activate the venv
source venv/bin/activate

# update pip itself if necessary
python -m pip install -U pip

# install dependencies (for Python 3.8+)
python -m pip install -r requirements-in.txt
```
Whenever the Rust code changes (your changes or via `git pull`):

```bash
# make sure you activate the venv using "source venv/bin/activate" first
maturin develop; python -m pytest
```
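
For reference, `maturin develop` compiles the Rust crate and installs the resulting module straight into the active virtualenv, which is why the `pytest` run that follows picks up the fresh build.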
## Testing

Running the local Rust tests requires generating the TPC-H test data first. This can be done by running the following commands:

```bash
export TPCH_TEST_PARTITIONS=1
export TPCH_SCALING_FACTOR=1
./scripts/gen-test-data.sh
```

This will generate data into a top-level `data` directory.

Tests can be run with:

```bash
export TPCH_DATA_PATH=`pwd`/data
cargo test
```
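
Cargo's standard test filtering also works here if you only want a subset of the suite (the `tpch` filter below is just an illustrative substring, not a fixed test name):

```bash
export TPCH_DATA_PATH=`pwd`/data
# Run only tests whose names contain the given substring
cargo test tpch
```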
## Benchmarking

Create a release build when running benchmarks, then use pip to install the wheel:

```bash
maturin develop --release
```
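
If you would rather install an explicit wheel with pip, something like the following should work. This is a sketch: `maturin build` writes wheels to `target/wheels/` by default, and the exact filename depends on the version and platform.

```bash
# Build a release wheel, then install it into the active environment
maturin build --release
python -m pip install target/wheels/datafusion_ray-*.whl
```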
## How to update dependencies

To change test dependencies, edit `requirements.in` and run:

```bash
# install pip-tools (this can be done only once), also consider running in venv
python -m pip install pip-tools
python -m piptools compile --generate-hashes -o requirements-310.txt
```

To update dependencies, run with `-U`:

```bash
python -m piptools compile -U --generate-hashes -o requirements-310.txt
```
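
pip-tools can also bump a single pinned package instead of everything via `--upgrade-package` (the `ray` package below is just an example target):

```bash
# Upgrade only the pin for one package, leaving the rest untouched
python -m piptools compile --upgrade-package ray --generate-hashes -o requirements-310.txt
```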
More details can be found in the pip-tools documentation.