# STAC Task (stac-task) <!-- omit from toc -->
This Python library consists of the Task class, which is used to create custom tasks based on a "STAC In, STAC Out" approach. The Task class acts as a wrapper around custom code and provides several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI.

This library is based on a branch of cirrus-lib, but aims to be more generic.
## Quickstart for Creating New Tasks
```python
from typing import Any

from stactask import Task, DownloadConfig


class MyTask(Task):
    name = "my-task"
    description = "this task does it all"

    def validate(self, payload: dict[str, Any]) -> bool:
        return len(self.items) == 1

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        item = self.items[0]

        # download a datafile
        item = self.download_item_assets(
            item,
            config=DownloadConfig(include=['data'])
        )

        # operate on the local file to create a new asset

        item = self.upload_item_assets_to_s3(item)

        # this task returns a single item
        return [item.to_dict(include_self_link=True, transform_hrefs=False)]
```
## Task Input
Task input is often referred to as a 'payload'.
| Field Name | Type                | Description                                  |
| ---------- | ------------------- | -------------------------------------------- |
| type       | string              | Must be `FeatureCollection`                  |
| features   | [Item]              | An array of STAC Items                       |
| process    | [ProcessDefinition] | An array of ProcessDefinition objects        |
|            | ProcessDefinition   | **DEPRECATED** A ProcessDefinition object    |
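For concreteness, a minimal payload consistent with the table above might look like the following (the Item is abbreviated; a real STAC Item also carries geometry, assets, and other required fields):

```json
{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "id": "LC08_example",
            "properties": {}
        }
    ],
    "process": [
        {
            "description": "My process configuration"
        }
    ]
}
```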
### ProcessDefinition Object
A Task can be provided additional configuration via the 'process' field in the input payload.
| Field Name       | Type          | Description                                                              |
| ---------------- | ------------- | ------------------------------------------------------------------------ |
| description      | string        | Description of the process configuration                                 |
| upload_options   | UploadOptions | An UploadOptions object                                                  |
| tasks            | Map<str, Map> | Dictionary of task configurations                                        |
|                  | [TaskConfig]  | **DEPRECATED** A list of TaskConfig objects                              |
| workflow_options | Map<str, Any> | Dictionary of configuration options applied to all tasks in the workflow |
#### UploadOptions Object
Options used when uploading Item assets to a remote server can be specified in an 'upload_options' field in the ProcessDefinition object.
| Field Name    | Type          | Description                                                                           |
| ------------- | ------------- | ------------------------------------------------------------------------------------- |
| path_template | string        | **REQUIRED** A string template for specifying the location of uploaded assets         |
| public_assets | [str]         | A list of asset keys that should be marked as public when uploaded                    |
| headers       | Map<str, str> | A set of key/value headers to send when uploading data to S3                          |
| collections   | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items)        |
| s3_urls       | bool          | Controls if the final published URLs should be s3 (`s3://bucket/key`) or https URLs   |
##### path_template
The 'path_template' string is a way to control the output location of uploaded assets from a STAC Item using metadata from the Item itself. The template can contain fixed strings along with variables used for substitution. See the PySTAC documentation for `LayoutTemplate` for a list of supported template variables and their meaning.
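To illustrate the substitution idea, the following sketch uses Python's `string.Template` with hand-supplied values; the library itself relies on PySTAC's `LayoutTemplate`, which resolves these variables from the Item's metadata:

```python
from string import Template

# Hypothetical values standing in for metadata that LayoutTemplate
# would pull from a STAC Item
template = Template("s3://my-bucket/${collection}/${year}/${id}")
path = template.substitute(collection="landsat-c2l2", year="2023", id="LC08_example")

print(path)  # s3://my-bucket/landsat-c2l2/2023/LC08_example
```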
##### collections
The 'collections' dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. At the end of processing, before the final STAC Items are returned, the Task class can be used to assign all of the Items to specific collection IDs. For each Item, the JSONPath patterns for all collections are compared; the first match sets the Item's collection ID to the corresponding value.
For example:
```json
"collections": {
    "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
}
```
In this example, the task will set any STAC Items that have an ID beginning with "LC08" to the `landsat-c2l2` collection.
See JSONPath Online Evaluator to experiment with JSONPath and regex101 to experiment with regex.
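The first-match semantics can be sketched in plain Python. This is a simplified, hypothetical stand-in that approximates the id-regex case with `re`; it is not the library's actual JSONPath evaluation:

```python
import re

def assign_collection(item: dict, collections: dict[str, str]) -> dict:
    """Set the Item's collection to the first collection whose pattern matches."""
    for collection_id, pattern in collections.items():
        # Crudely pull the regex out of the "=~ '...'" filter, for illustration only
        found = re.search(r"=~ '([^']*)'", pattern)
        if found and re.match(found.group(1), item.get("id", "")):
            item["collection"] = collection_id
            break  # first match wins
    return item

item = assign_collection(
    {"id": "LC08_L2SP_2023"},
    {"landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"},
)
print(item["collection"])  # landsat-c2l2
```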
#### tasks
The 'tasks' field is a dictionary with an optional key for each task. If present, it contains a dictionary that is converted to a set of keywords and passed to the Task's `process` function. The documentation for each Task will provide the list of available parameters.
```json
{
    "tasks": {
        "task-a": {
            "param1": "value1"
        },
        "task-c": {
            "param2": "value2"
        }
    }
}
```
In the example above, a task named `task-a` would have `param1=value1` passed as a keyword, while `task-c` would have `param2=value2` passed. If there were a `task-b` to be run, it would not be passed any keywords.
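The keyword passing described above can be sketched as follows (`run_task` is a hypothetical stand-in for invoking a Task's `process` function, not part of the library):

```python
from typing import Any

tasks_config = {
    "task-a": {"param1": "value1"},
    "task-c": {"param2": "value2"},
}

def run_task(name: str, **kwargs: Any) -> dict[str, Any]:
    # Stand-in: a real Task's process() would receive these as keywords
    return kwargs

a_kwargs = run_task("task-a", **tasks_config.get("task-a", {}))
b_kwargs = run_task("task-b", **tasks_config.get("task-b", {}))

print(a_kwargs)  # {'param1': 'value1'}
print(b_kwargs)  # {} -- task-b gets no keywords
```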
#### TaskConfig Object
**DEPRECATED** The 'tasks' field should be a dictionary of parameters, with task names as keys. See tasks for more information. `TaskConfig` objects are supported for backwards compatibility.
| Field Name | Type          | Description                                                                            |
| ---------- | ------------- | -------------------------------------------------------------------------------------- |
| name       | str           | **REQUIRED** Name of the task                                                           |
| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Task's `process` function   |
#### workflow_options
The 'workflow_options' field is a dictionary of options that apply to all tasks in the workflow. The 'workflow_options' dictionary is combined with each task's option dictionary. If a key in the 'workflow_options' dictionary conflicts with a key in a task's option dictionary, the task option value takes precedence.
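This merge behavior matches a plain dictionary merge in which the task's options are applied last (a sketch of the described semantics, not the library's internal code):

```python
workflow_options = {"resolution": 30, "debug": True}
task_options = {"resolution": 10}

# The later dict wins on conflicting keys, so the task option takes precedence
merged = {**workflow_options, **task_options}

print(merged)  # {'resolution': 10, 'debug': True}
```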
#### Full ProcessDefinition Example
```json
{
    "description": "My process configuration",
    "upload_options": {
        "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}",
        "collections": {
            "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
        }
    },
    "tasks": {
        "task-name": {
            "param": "value"
        }
    }
}
```
## Migration
### 0.4.x -> 0.5.x
In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with the stac-asset library. This has necessitated a change in the parameters that the download methods accept.
The primary change is that the Task methods `download_item_assets` and `download_items_assets` (items plural) now accept fewer explicit and implicit (kwargs) parameters.
Previously, the methods looked like:
```python
def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:
```
but now look like:
```python
def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:
```
Similarly, the `asset_io` package methods were previously:
```python
async def download_item_assets(
    item: Item,
    assets: Optional[list[str]] = None,
    save_item: bool = True,
    overwrite: bool = False,
    path_template: str = "${collection}/${id}",
    absolute_path: bool = False,
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:
```
and are now:
```python
async def download_item_assets(
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:
```
Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most common parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests.
Many of these parameters can be directly translated into configuration passed in a `DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object.

Migration of these various parameters to `DownloadConfig` is as follows:
- `assets`: set `include`
- `requester_pays`: set `s3_requester_pays=True`
- `keep_original_filenames`: set `file_name_strategy` to `FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False
- `overwrite`: set `overwrite`
- `save_item`: none, Item is always saved
- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`
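Putting the mapping together, a 0.4.x call and its 0.5.x equivalent might look like the following. Here `DownloadConfig` is defined as a minimal stand-in dataclass covering only the fields named above; the real class wraps `stac_asset.Config`:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class DownloadConfig:
    # Minimal stand-in for stactask.DownloadConfig; only the fields
    # named in this document are modeled here
    include: Optional[list[str]] = None
    s3_requester_pays: bool = False

# 0.4.x style (no longer supported):
#   task.download_item_assets(item, assets=["data"], requester_pays=True)
# 0.5.x equivalent:
config = DownloadConfig(include=["data"], s3_requester_pays=True)

print(config.include, config.s3_requester_pays)  # ['data'] True
```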
### 0.5.x -> 0.6.0
Previously, the `validate` method was a classmethod, validating the payload argument passed. It is now an instance method, which validates the `self._payload` copy of the payload from which the `Task` instance is constructed. This is behaviorally the same, in that construction will fail if validation fails, but allows implementers to utilize the instance method's convenience functions.
Previous implementations of `validate` would have been similar to this:
```python
@classmethod
def validate(cls, payload: dict[str, Any]) -> bool:
    # Check The Things™
    return isinstance(payload, dict)
```
And will now need to be updated to this form:
```python
def validate(self) -> bool:
    # Check The Things™
    return isinstance(self._payload, dict)
```
## Development
Clone, install in editable mode with development and test requirements, and install the pre-commit hooks:
```shell
git clone https://github.com/stac-utils/stac-task
cd stac-task
pip install -e '.[dev,test]'
pre-commit install
```
To run the tests:
```shell
pytest
```
To lint all the files:
```shell
pre-commit run --all-files
```
## Contributing
Use GitHub issues and pull requests.