Awesome
Kedro Wings
As Seen on YouTube DataEngineerOne:
- Kedro Wings: It's almost too easy to write pipelines this way.
- Easy Stateful Pipelines with Chronocoding and Kedro Wings
Give your next kedro project Wings! The perfect plugin for brand new pipelines, and new kedro users. This plugin enables easy and fast creation of datasets so that you can get straight into coding your pipelines.
Quick Start Usage Example: Iris Example
The following example is a recreation of the iris example pipeline.
Kedro Wings enables super fast creation of pipelines by taking care of all the catalog work for you. Catalog entries are automatically created by parsing the values for your nodes' inputs and outputs.
This pipeline automatically creates a dataset that reads from the iris.csv
and then it creates 12 more datasets, corresponding to the outputs and inputs of the other datasets.
wing_example = Pipeline([
node(
split_data,
inputs=['01_raw/iris.csv', 'params:example_test_data_ratio'],
outputs=dict(
train_x="02_intermediate/example_train_x.csv",
train_y="02_intermediate/example_train_y.csv",
test_x="02_intermediate/example_test_x.csv",
test_y="02_intermediate/example_test_y.csv")
),
node(
train_model,
["02_intermediate/example_train_x.csv", "02_intermediate/example_train_y.csv", "parameters"],
outputs="06_models/example_model.pkl",
),
node(
predict,
inputs=dict(
model="06_models/example_model.pkl",
test_x="02_intermediate/example_test_x.csv"
),
outputs="07_model_output/example_predictions.pkl",
),
node(
report_accuracy,
inputs=["07_model_output/example_predictions.pkl", "02_intermediate/example_test_y.csv"],
None
),
])
Quick Start Example: Chronocoding
Watch the video on Chronocoding here: Easy Stateful Pipelines with Chronocoding and Kedro Wings
Sometimes, there arises a need to rewrite data to the same path. This makes it easier to save state between kedro runs. Using KedroWings, you can automatically generate chronocoded datasets which temporally separates a read and write to a dataset.
By adding an !
at the end of a dataset, we signal to kedro that we wish to overwrite the data in that same filepath. Thus, we get around kedro's DAG requirement for datasets.
In Depth Breakdown on Chronocoding here: [KED-1667] Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs
def state_modifier(state: str) -> str:
current_value = int(state)
new_value = current_value + 1
return str(new_value)
def create_pipelines(**kwargs):
return Pipeline([
node(
state_modifier,
inputs="01_raw/state.txt",
outputs="01_raw/state.txt!"
),
])
Installation
Kedro Wings is available on pypi, and is installed with kedro hooks.
pip install kedro-wings
Setup with Kedro Pipeline
Simply add a KedroWings
instance to the ProjectContext
hooks
tuple.
from kedro_wings import KedroWings
class ProjectContext(KedroContext):
hooks = (
KedroWings(),
)
Setup with Jupyter Notebook
Simply pass the kedro context into KedroWings
, and it will automatically add all catalog entries from all available pipelines.
# Load the context if not using a kedro jupyter notebook
from kedro.framework.context import load_context
context = load_context('./')
# Pass the context into KedroWings
from kedro_wings import KedroWings
KedroWings(context=context)
# context catalog now has all wings datasets available.
context.catalog.list()
Usage
Catalog Creation
Catalog entries are created using dataset input and output strings. The API is simple:
inputs="[PATH]/[NAME].[EXT]"
The PATH
portion determines the directory where a file will be saved.
The NAME
portion determines the final output name of the file to be saved.
The EXT
portion determines the dataset used to save and load that particular data.
Ex: Creating an iris.csv reader
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')
This will create a pandas.CSVDataSet
pointing at the 01_raw/iris.csv
file.
Ex: Overwrite a Kedro Wing dataset using catalog.yml
# pipeline.py
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')
# catalog.yml
01_raw/iris.csv':
type: pandas.CSVDataSet
filepath: data/01_raw/iris.csv
If a catalog entry already exists inside of catalog.yml
, with a name that matches the wing catalog name,
KedroWings will NOT create that catalog, and will instead defer to the catalog.yml
entry.
Default Datasets
The following are the datasets available by default.
default_dataset_configs={
".csv": {"type": "pandas.CSVDataSet"},
".yml": {"type": "yaml.YAMLDataSet"},
".yaml": {"type": "yaml.YAMLDataSet"},
".xls": {"type": "pandas.ExcelDataSet"},
".txt": {"type": "text.TextDataSet"},
".png": {"type": "pillow.ImageDataSet"},
".jpg": {"type": "pillow.ImageDataSet"},
".jpeg": {"type": "pillow.ImageDataSet"},
".img": {"type": "pillow.ImageDataSet"},
".pkl": {"type": "pickle.PickleDataSet"},
".parquet": {"type": "pandas.ParquetDataSet"},
".json": {"type": "json.JSONDataSet"}, # Only available in kedro 0.16.3
}
Configuration
Kedro Wings supports configuration on instantiation of the hook.
KedroWings(dataset_configs, paths, root, namespaces, enabled, context)
dataset_configs
:param dataset_configs: A mapping of file name extensions to the type of dataset to be created.
This allows the default dataset configurations to be overridden. This also allows the default extension to dataset mapping to be overridden or extended for other datasets.
Longer extensions are prioritized over shorter extensions, meaning multiple encoding methods can be applied to a single filetype.
Ex: Make default csv files use pipes as separators
KedroWings(dataset_configs={
'.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})
Ex: Use dataset types directly
from kedro.extras.dataset import pandas
KedroWings(dataset_configs={
'.csv': pandas.CSVDataSet,
})
Ex: Save CSVs with pipes or commas
from kedro.extras.dataset import pandas
KedroWings(dataset_configs={
'.comma.csv': pandas.CSVDataSet,
'.pipe.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})
paths
This allows specified paths to be remapped
:param paths: A mapping of old path names to new path names.
Ex: Moving data from 06_models to a new_models folder
KedroWings(paths={
'06_models': 'new_models',
})
root
This setting is prepended to any paths parsed. This is useful if the dataset supports fsspec
.
:param root: The root directory to save files to. Default: data
Ex: Saving data to s3 instead of the local directory.
KedroWings(root='s3a://my-bucket/kedro-data')
Ex: Allow individual datasets to choose their root
KedroWings(root=None)
namespaces
Namespaces from modular pipelines are supported. This parameter should be a list of the namespaces that KedroWings should account for. If a namespace is encountered, the output filepath will include the namespace in the extension.
Ex: Namespace
The determined file paths would be iris.example1.csv
and iris2.example2.csv
.
KedroWings(namespaces=['example1'])
pipeline(Pipeline([node(lambda x: x, inputs='iris.csv', outputs='iris2.csv')]), namespace="example1")
enabled
This setting allows easy enabling and disabling of the plugin.
:param enabled: Convenience flag to enable or disable this plugin.
Ex: Use an environment variable to enable or disable wings
KedroWings(enabled=os.getenv('ENABLE_WINGS'))