Cloud Pub/Sub to Cloud Storage template
Use the Managed Service for Apache Spark Cloud Pub/Sub to Cloud Storage template to extract data from a Pub/Sub subscription and write it to Cloud Storage.
Use the template
Run the template using the gcloud CLI or the Managed Service for Apache Spark API.
gcloud
Before using any of the command data below, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected. Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output is stored. Note: Output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json. Note: If avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field. Example (the file:// prefix references a Managed Service for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar,[ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before the stream terminates. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams that read from the Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key. Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Execute the following command:
Linux, macOS, or Cloud Shell
gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
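For illustration, the following is a hypothetical Linux or macOS invocation with the placeholders filled in. The project, subscription, and bucket names are made-up examples, and the optional flags are omitted; because the output format is avro, the spark-avro jar is added to --jars as described above.

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="my-project" \
    --region="us-central1" \
    --jars="gs://dataproc-templates-binaries/latest/java/dataproc-templates.jar,file:///usr/lib/spark/connector/spark-avro.jar" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="INFO" \
    --templateProperty pubsubtogcs.input.project.id="my-project" \
    --templateProperty pubsubtogcs.input.subscription="my-subscription" \
    --templateProperty pubsubtogcs.gcs.bucket.name="my-output-bucket" \
    --templateProperty pubsubtogcs.gcs.output.data.format="avro" \
    --templateProperty pubsubtogcs.timeout.ms="60000" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="15" \
    --templateProperty pubsubtogcs.total.receivers="5" \
    --templateProperty pubsubtogcs.batch.size="1000"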
REST
Before using any of the request data, make the following replacements:
- PROJECT_ID: Required. Your Google Cloud project ID listed in the IAM Settings.
- REGION: Required. Compute Engine region.
- SUBNET: Optional. If a subnet is not specified, the subnet in the specified REGION in the default network is selected. Example: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Required. Specify latest for the latest template version, or the date of a specific version, for example, 2023-03-17_v0.1.0-beta (visit gs://dataproc-templates-binaries or run gcloud storage ls gs://dataproc-templates-binaries to list available template versions).
- PUBSUB_SUBSCRIPTION_PROJECT_ID: Required. The Google Cloud project ID listed in the IAM Settings that contains the input Pub/Sub subscription to be read.
- SUBSCRIPTION: Required. Pub/Sub subscription name.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Required. Name of the Cloud Storage bucket where output is stored. Note: Output files are stored in the output/ folder inside the bucket.
- FORMAT: Required. Output data format. Options: avro or json. Note: If avro, you must add "file:///usr/lib/spark/connector/spark-avro.jar" to the jars gcloud CLI flag or API field. Example (the file:// prefix references a Managed Service for Apache Spark jar file): --jars=file:///usr/lib/spark/connector/spark-avro.jar,[ ... other jars]
- TIMEOUT: Optional. Time in milliseconds before the stream terminates. Defaults to 60000.
- DURATION: Optional. Frequency in seconds of writes to Cloud Storage. Defaults to 15 seconds.
- NUM_RECEIVERS: Optional. Number of streams that read from the Pub/Sub subscription in parallel. Defaults to 5.
- BATCHSIZE: Optional. Number of records to insert in one round trip into Cloud Storage. Defaults to 1000.
- SERVICE_ACCOUNT: Optional. If not provided, the default Compute Engine service account is used.
- PROPERTY and PROPERTY_VALUE: Optional. Comma-separated list of Spark property=value pairs.
- LABEL and LABEL_VALUE: Optional. Comma-separated list of label=value pairs.
- LOG_LEVEL: Optional. Level of logging. Can be one of ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, or WARN. Default: INFO.
- KMS_KEY: Optional. The Cloud Key Management Service key to use for encryption. If a key is not specified, data is encrypted at rest using a Google-owned and Google-managed encryption key. Example: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP method and URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
Request JSON body:
{
  "environmentConfig": {
    "executionConfig": {
      "subnetworkUri": "SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.2",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch": {
    "mainClass": "com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args": [
      "--template", "PUBSUBTOGCS",
      "--templateProperty", "log.level=LOG_LEVEL",
      "--templateProperty", "pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty", "pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty", "pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty", "pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty", "pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty", "pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty", "pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty", "pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris": [
      "file:///usr/lib/spark/connector/spark-avro.jar",
      "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}
To send your request, use curl or another HTTP client that can attach an OAuth 2.0 access token.
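For example, assuming the request body above is saved in a local file named request.json, one way to send the request is:

# POST the request body, authenticating with a gcloud-generated access token.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json; charset=utf-8" \
    -d @request.json \
    "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"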
You should receive a JSON response similar to the following:
{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}
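The batch value in the response identifies the batch workload that was created. As a usage sketch, you can check its status with the gcloud CLI, substituting the BATCH_ID, PROJECT_ID, and REGION values from the response:

# Describe the batch workload created by the template submission.
gcloud dataproc batches describe BATCH_ID \
    --project=PROJECT_ID \
    --region=REGION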