Overview

Command-line tool that fetches Dataflow job metrics, estimates the job cost, and stores the results in an output store (BigQuery or a file system).

Commands

The tool supports the following command types:

| Command Type | Description |
| --- | --- |
| COLLECT_METRICS | For complete details, refer to the COLLECT_METRICS section. |
| LAUNCH_AND_COLLECT | For complete details, refer to the LAUNCH_AND_COLLECT section. |

Output Stores

Choose the output store with the command-line options output_type and output_location:

| Output Type | Output Location |
| --- | --- |
| BIGQUERY | project-id.dataset.table |
| FILE | /path/to/location <br/>(Supports GCS and local file systems) |

To store the metrics in BigQuery, create the table using the schema specified in scripts/metrics_bigquery_schema.json.

Sample commands for creating the dataset and table are provided in scripts/bigquery.sh.

Getting Started

Requirements

$ export GOOGLE_APPLICATION_CREDENTIALS=</path/to/application_default_credentials.json>
$ mvn spotless:apply
$ mvn clean package

BIGQUERY OUTPUT STORE

BigQuery can be used to store the metrics collected by the tool. To store the metrics, create the table in BigQuery using the schema specified in scripts/metrics_bigquery_schema.json.

<a name="CollectMetrics">COLLECT_METRICS</a>

Collects the metrics for a given job id and stores the results in the output store.

Creating the Configuration File

The configuration file provides the project, region, and job ID of the job whose metrics are to be collected, along with optional pricing information.

Example Configuration File

Below is an example configuration file. The pricing section is optional; if it is omitted, the estimated cost of the job is not calculated.

{
  "project": "test-project",
  "region": "us-central1",
  "jobId": "2023-09-05_12_40_15-5693029202890139182",
  "pricing": {
    "vcpuPerHour": 0.056,
    "memoryGbPerHour": 0.003557,
    "pdGbPerHour": 0.000054,
    "ssdGbPerHour": 10,
    "shuffleDataPerGb": 0.011,
    "streamingDataPerGb": 0.005
  }
}
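
When metrics are collected for many jobs, the configuration can be generated from a script instead of being written by hand. The following is a minimal sketch, assuming only the key names shown in the example above; the helper name `build_collect_config` is hypothetical and is not part of the tool.

```python
import json


def build_collect_config(project, region, job_id, pricing=None):
    """Return a COLLECT_METRICS configuration as a dict.

    Key names mirror the example configuration; `pricing` is optional.
    """
    config = {"project": project, "region": region, "jobId": job_id}
    if pricing is not None:
        # Per the README, the cost estimate is skipped without this section.
        config["pricing"] = dict(pricing)
    return config


config = build_collect_config(
    "test-project",
    "us-central1",
    "2023-09-05_12_40_15-5693029202890139182",
    pricing={
        "vcpuPerHour": 0.056,
        "memoryGbPerHour": 0.003557,
        "pdGbPerHour": 0.000054,
        "ssdGbPerHour": 10,
        "shuffleDataPerGb": 0.011,
        "streamingDataPerGb": 0.005,
    },
)
print(json.dumps(config, indent=2))
```

Writing the result of `json.dumps` to a file gives a document that can be passed to the tool through `--conf`.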

Execute the command

# To store results in BigQuery
$ java -jar /path/to/dataflow-metrics-exporter-${version}.jar --command COLLECT_METRICS --conf /path/to/config.json \
--output_type BIGQUERY --output_location projectid:datasetid.tableid

# To store results in GCS / local file system
$ java -jar /path/to/dataflow-metrics-exporter-${version}.jar --command COLLECT_METRICS --conf /path/to/config.json \
--output_type FILE --output_location /path/to/output/location
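
For scripted runs, the same command line can be assembled in Python. This is a sketch under stated assumptions: it uses only the four options shown above, the helper name `build_command` is hypothetical, and the jar path is a placeholder.

```python
import shlex


def build_command(jar_path, command, conf_path, output_type, output_location):
    """Assemble the argument list for one run of the exporter."""
    if command not in ("COLLECT_METRICS", "LAUNCH_AND_COLLECT"):
        raise ValueError(f"unknown command type: {command}")
    if output_type not in ("BIGQUERY", "FILE"):
        raise ValueError(f"unknown output type: {output_type}")
    return [
        "java", "-jar", jar_path,
        "--command", command,
        "--conf", conf_path,
        "--output_type", output_type,
        "--output_location", output_location,
    ]


args = build_command(
    "/path/to/dataflow-metrics-exporter.jar",  # placeholder path
    "COLLECT_METRICS",
    "/path/to/config.json",
    "FILE",
    "/path/to/output/location",
)
# Print a shell-quoted form of the command for inspection.
print(shlex.join(args))
```

Passing `args` to `subprocess.run(args, check=True)` would execute the command; building a list rather than a single string avoids shell-quoting problems in paths.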

Sample Output

{
   "run_timestamp": "2023-09-18T22:50:43.419455Z",
   "pipeline_name": "word-count-benchmark-20230908224412968",
   "job_create_timestamp": "2023-09-08T22:44:14.475764Z",
   "job_type": "JOB_TYPE_BATCH",
   "sdk_version": "2.49.0",
   "sdk": "Apache Beam SDK for Java",
   "metrics": {
      "TotalDpuUsage": 0.050666635590718774,
      "TotalSeCuUsage": 0.0,
      "TotalStreamingDataProcessed": 0.0,
      "TotalDcuUsage": 50666.63559071875,
      "BillableShuffleDataProcessed": 1.603737473487854E-5,
      "TotalPdUsage": 2279.0,
      "TotalGpuTime": 0.0,
      "EstimatedJobCost": 0.007,
      "TotalVcpuTime": 182.0,
      "TotalShuffleDataProcessed": 6.414949893951416E-5,
      "TotalMemoryUsage": 747068.0,
      "TotalSsdUsage": 0.0
   }
}
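
A result like the one above can be inspected with a few lines of Python. The sketch below assumes the FILE output store yields a JSON document shaped like this sample; how the output files are named and laid out is not covered here, so the example parses an inline, trimmed copy. The helper name `nonzero_metrics` is hypothetical.

```python
import json

# Trimmed copy of the sample output above.
sample = """
{
  "pipeline_name": "word-count-benchmark-20230908224412968",
  "job_type": "JOB_TYPE_BATCH",
  "metrics": {
    "TotalVcpuTime": 182.0,
    "TotalPdUsage": 2279.0,
    "TotalGpuTime": 0.0,
    "TotalSsdUsage": 0.0,
    "EstimatedJobCost": 0.007
  }
}
"""


def nonzero_metrics(result):
    """Return the metrics whose value is not zero, sorted by name."""
    return {k: v for k, v in sorted(result["metrics"].items()) if v != 0}


result = json.loads(sample)
for name, value in nonzero_metrics(result).items():
    print(f"{name}: {value}")
```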

<a name="LaunchCollect">LAUNCH_AND_COLLECT</a>

Launches either a Dataflow classic or flex template, waits for the job to finish or time out (in the case of streaming), collects the metrics, and stores the results in the output store.

Creating the Configuration File

The configuration file provides the project, template name, template type, template spec, and pipeline options.

Example Configuration File

Below is an example configuration file with minimal runtime environment options.

{
  "project": "test-project",
  "region": "us-central1",
  "templateName": "SampleWordCount",
  "templateVersion": "1.0",
  "templateType": "classic",
  "templateSpec": "gs://dataflow-templates/latest/Word_Count",
  "jobPrefix": "WordCountBenchmark",
  "timeoutInMinutes" : 30,
  "pipelineOptions": {
    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
    "output": "gs://bucket-name/output/wordcount/"
  },
  "environmentOptions": {
    "tempLocation": "gs://bucket-name/temp/"
  }
}

Below is another example configuration file with additional runtime environment options and pricing details.

{
  "project": "test-project",
  "region": "us-central1",
  "templateName": "SampleWordCount",
  "templateVersion": "1.0",
  "templateType": "classic",
  "templateSpec": "gs://dataflow-templates/latest/Word_Count",
  "jobPrefix": "WordCountBenchmark",
  "timeoutInMinutes" : 30,
  "pipelineOptions": {
    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
    "output": "gs://bucket-name/output/wordcount/"
  },
  "environmentOptions": {
    "numWorkers" : 2,
    "maxWorkers" : 10,
    "service_account_email" : "my-svc-acct@project-id.iam.gserviceaccount.com",
    "zone" : "us-central1-b",
    "autoscalingAlgorithm" : "AUTOSCALING_ALGORITHM_BASIC",
    "subnetwork": "https://www.googleapis.com/compute/v1/projects/test-project/regions/us-central1/subnetworks/default",
    "tempLocation": "gs://bucket-name/temp/",
    "ipConfiguration": "WORKER_IP_PRIVATE",
    "additionalExperiments": ["enable_prime"],
    "additionalUserLabels": {
      "topology":"pointtopoint",
      "dataformats":"text",
      "datasizecategory":"medium",
      "datasizeingb":"100"
    }
  },
  "pricing": {
    "vcpuPerHour": 0.056,
    "memoryGbPerHour": 0.003557,
    "pdGbPerHour": 0.000054,
    "ssdGbPerHour": 10,
    "shuffleDataPerGb": 0.011,
    "streamingDataPerGb": 0.005
  }
}

For the complete list of runtime environment options for each template type, check the help output of the commands below:

# For classic templates
$ gcloud dataflow jobs run --help

# For flex templates
$ gcloud dataflow flex-template run --help
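
The second example configuration is the minimal one with extra environment options and a pricing section layered on top. The following sketch shows one way to derive such variants in Python, assuming the key names from the examples above; the helper name `with_overrides` is hypothetical and not part of the tool.

```python
import copy
import json

# The minimal configuration from the first example.
minimal = {
    "project": "test-project",
    "region": "us-central1",
    "templateName": "SampleWordCount",
    "templateVersion": "1.0",
    "templateType": "classic",
    "templateSpec": "gs://dataflow-templates/latest/Word_Count",
    "jobPrefix": "WordCountBenchmark",
    "timeoutInMinutes": 30,
    "pipelineOptions": {
        "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
        "output": "gs://bucket-name/output/wordcount/",
    },
    "environmentOptions": {"tempLocation": "gs://bucket-name/temp/"},
}


def with_overrides(base, environment=None, pricing=None):
    """Return a copy of `base` with extra environment options and pricing."""
    config = copy.deepcopy(base)  # leave the base configuration untouched
    config["environmentOptions"].update(environment or {})
    if pricing is not None:
        config["pricing"] = dict(pricing)
    return config


extended = with_overrides(minimal, environment={"numWorkers": 2, "maxWorkers": 10})
print(json.dumps(extended, indent=2))
```

The deep copy keeps the base configuration reusable, so several variants (for example with different worker counts) can be generated from the same starting point.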

Execute the command

# To store results in BigQuery
$ java -jar /path/to/dataflow-metrics-exporter-${version}.jar --command LAUNCH_AND_COLLECT --conf /path/to/config.json \
--output_type BIGQUERY --output_location projectid:datasetid.tableid

# To store results in GCS / local file system
$ java -jar /path/to/dataflow-metrics-exporter-${version}.jar --command LAUNCH_AND_COLLECT --conf /path/to/config.json \
--output_type FILE --output_location /path/to/output/location

Sample Output

{
  "run_timestamp": "2023-09-18T22:50:43.419455Z",
  "pipeline_name": "word-count-benchmark-20230908224412968",
  "job_create_timestamp": "2023-09-08T22:44:14.475764Z",
  "job_type": "JOB_TYPE_BATCH",
  "template_name": "SampleWordCount",
  "template_version": "1.0",
  "sdk_version": "2.49.0",
  "template_type": "classic",
  "sdk": "Apache Beam SDK for Java",
  "metrics": {
    "TotalDpuUsage": 0.050666635590718774,
    "TotalSeCuUsage": 0.0,
    "TotalStreamingDataProcessed": 0.0,
    "TotalDcuUsage": 50666.63559071875,
    "BillableShuffleDataProcessed": 1.603737473487854E-5,
    "TotalPdUsage": 2279.0,
    "TotalGpuTime": 0.0,
    "EstimatedJobCost": 0.045,
    "TotalVcpuTime": 182.0,
    "TotalShuffleDataProcessed": 6.414949893951416E-5,
    "TotalMemoryUsage": 747068.0,
    "TotalSsdUsage": 0.0
  },
  "parameters": {
    "output": "gs://bucket-name/output/wordcount/",
    "tempLocation": "gs://bucket-name/temp/",
    "additionalExperiments": "[enable_prime, workerMachineType=n1-standard-2, minNumWorkers=2]",
    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
    "maxWorkers": "10",
    "additionalUserLabels": "{sources=gcs, sinks=bigtable, topology=pointtopoint, dataformats=text, datasizecategory=medium, datasizeingb=100}",
    "subnetwork": "https://www.googleapis.com/compute/v1/projects/test-project/regions/us-central1/subnetworks/default",
    "numWorkers": "2"
  }
}
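
In the sample above, structured parameters such as additionalUserLabels appear as strings of the form `{key=value, ...}`. The sketch below turns such a string back into a dictionary. It assumes that values contain no commas, which holds for this sample but is not guaranteed in general; the helper name `parse_braced_pairs` is hypothetical.

```python
def parse_braced_pairs(text):
    """Parse a string such as "{a=1, b=2}" into a dict of strings."""
    inner = text.strip()
    if not (inner.startswith("{") and inner.endswith("}")):
        raise ValueError(f"expected a braced list of pairs: {text!r}")
    pairs = {}
    for item in inner[1:-1].split(","):
        item = item.strip()
        if not item:
            continue  # tolerate "{}" and trailing commas
        # Split on the first "=" only, so values may contain "=".
        key, _, value = item.partition("=")
        pairs[key.strip()] = value.strip()
    return pairs


labels = parse_braced_pairs(
    "{sources=gcs, sinks=bigtable, topology=pointtopoint, "
    "dataformats=text, datasizecategory=medium, datasizeingb=100}"
)
print(labels["topology"])
```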

Output Metrics

| Metric Name | Metric Description |
| --- | --- |
| TotalVcpuTime | The total vCPU seconds used by the Dataflow job |
| TotalGpuTime | The total proportion of time in which the GPU was used by the Dataflow job |
| TotalMemoryUsage | The total GB seconds of memory allocated to the Dataflow job |
| TotalPdUsage | The total GB seconds for all persistent disk used by all workers associated with the Dataflow job |
| TotalSsdUsage | The total GB seconds for all SSD used by all workers associated with the Dataflow job |
| TotalShuffleDataProcessed | The total bytes of shuffle data processed by the Dataflow job |
| TotalStreamingDataProcessed | The total bytes of streaming data processed by the Dataflow job |
| BillableShuffleDataProcessed | The billable bytes of shuffle data processed by the Dataflow job |
| TotalDcuUsage | The total amount of DCUs (Data Compute Units) used by the Dataflow job since it was launched |
| TotalElapsedTimeSec | The total time, in seconds, that the job was in RUNNING_STATE |
| EstimatedJobCost | The estimated cost of the Dataflow job (if pricing info is provided). Not applicable for Prime-enabled jobs |

For the complete list of metrics, refer to the Dataflow Monitoring Metrics documentation.

Cost Estimation

Job cost can be estimated by providing resource pricing information in the config file, as shown below:

{
   "project": "test-project",
   "region": "us-central1",
   "templateName": "SampleWordCount",
   "pipelineOptions": {},
   "environmentOptions": {},
   "pricing": {
      "vcpuPerHour": 0.056,
      "memoryGbPerHour": 0.003557,
      "pdGbPerHour": 0.000054,
      "ssdGbPerHour": 10,
      "shuffleDataPerGb": 0.011,
      "streamingDataPerGb": 0.005
   }
}

Pricing for each resource by region can be obtained from the Dataflow Pricing docs.
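
The formula the tool uses for EstimatedJobCost is not spelled out in this README. The sketch below shows one plausible way to combine usage with the pricing keys above: hourly rates applied to resource-seconds, plus per-GB rates applied to processed data. All of this is an assumption, including the hypothetical `estimate_cost` helper and the usage key names, and it may not reproduce the figure reported by the tool.

```python
SECONDS_PER_HOUR = 3600.0


def estimate_cost(usage, pricing):
    """Estimate a job cost from normalized usage and a pricing section.

    `usage` keys (hypothetical names, not produced by the tool):
    vcpu_seconds, memory_gb_seconds, pd_gb_seconds, ssd_gb_seconds,
    shuffle_gb, streaming_gb. Missing keys count as zero.
    """
    # Resources billed per hour: convert resource-seconds to hours.
    hourly = (
        usage.get("vcpu_seconds", 0.0) * pricing["vcpuPerHour"]
        + usage.get("memory_gb_seconds", 0.0) * pricing["memoryGbPerHour"]
        + usage.get("pd_gb_seconds", 0.0) * pricing["pdGbPerHour"]
        + usage.get("ssd_gb_seconds", 0.0) * pricing["ssdGbPerHour"]
    ) / SECONDS_PER_HOUR
    # Data billed per GB processed.
    data = (
        usage.get("shuffle_gb", 0.0) * pricing["shuffleDataPerGb"]
        + usage.get("streaming_gb", 0.0) * pricing["streamingDataPerGb"]
    )
    return hourly + data


pricing = {
    "vcpuPerHour": 0.056,
    "memoryGbPerHour": 0.003557,
    "pdGbPerHour": 0.000054,
    "ssdGbPerHour": 10,
    "shuffleDataPerGb": 0.011,
    "streamingDataPerGb": 0.005,
}
# Illustrative usage: 2 vCPU hours, 1 GB hour of memory, 2 GB of shuffle data.
usage = {"vcpu_seconds": 7200, "memory_gb_seconds": 3600, "shuffle_gb": 2}
print(round(estimate_cost(usage, pricing), 6))
```

The usage values are taken in already-normalized units on purpose: converting the raw metrics into GB seconds and GB is left to the reader, who should check the units of each metric before relying on the result.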

Note:

Contributing

Check CONTRIBUTING.md for details.

License

Apache 2.0; see LICENSE for details.

Disclaimer

This project is not an official Google project. It is not supported by Google and disclaims all warranties as to its quality, merchantability, or fitness for a particular purpose.