The following table lists the Kafka Connect connector types that Managed Service for Apache Kafka supports. You can use these connectors to integrate Apache Kafka with your applications and with other Google Cloud services.
| Connector | Description | Use cases |
|---|---|---|
| MirrorMaker 2.0 | Replicates topics and data from one Kafka cluster to another Kafka cluster. | Data replication, disaster recovery, data migration |
| BigQuery Sink | Streams data from Kafka topics to a BigQuery table. | Data warehousing, analytics |
| Cloud Storage Sink | Streams data from Kafka topics to a Cloud Storage bucket. | Data lake ingestion, data archiving |
| Pub/Sub Sink | Streams data from Kafka topics to a Pub/Sub topic. | Service integration, real-time notifications |
| Pub/Sub Source | Streams messages from a Pub/Sub subscription to a Kafka topic. | Real-time data ingestion, event-driven architectures |
Converters
Converters are responsible for serialization and deserialization of Kafka record data. They translate between the raw byte format found on Kafka topics and the internal, structured data representation used by Kafka Connect.
For sink connectors, converters deserialize data from the topic's wire format to the Kafka Connect internal data format, which the connector uses to write to the target system.
For source connectors, converters serialize the data from the Kafka Connect internal data format to the specified wire format for the Kafka topic.
Converters ensure that the connector reads or writes the Kafka records in a format that is compatible with the external system.
When you configure a connector, set the following properties:
Key converter (key.converter): The converter to use when serializing and deserializing Kafka record keys.
Value converter (value.converter): The converter to use when serializing and deserializing Kafka record values.
If you don't specify a converter, the default converter type is
org.apache.kafka.connect.converters.ByteArrayConverter, which passes
the data through in its raw byte format.
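As an illustration, the following connector configuration fragment sets both converters explicitly. It assumes a hypothetical workload whose record keys are plain strings and whose values should pass through unchanged. Adjust the classes to match your own data.

```properties
# Illustrative fragment: treat record keys as strings.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Pass record values through as raw bytes (same behavior as the default).
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```

Because key.converter and value.converter are set independently, the key and the value of the same record can use different formats.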
Supported converters
Managed Service for Apache Kafka supports the following built-in converters:
| Converter | Format |
|---|---|
| io.confluent.connect.avro.AvroConverter | Apache Avro |
| org.apache.kafka.connect.converters.BooleanConverter | Boolean |
| org.apache.kafka.connect.converters.ByteArrayConverter | Byte array. The default converter type. Preserves the exact content of messages across two systems. |
| org.apache.kafka.connect.converters.DoubleConverter | Double |
| org.apache.kafka.connect.converters.FloatConverter | Float |
| org.apache.kafka.connect.converters.IntegerConverter | Integer |
| org.apache.kafka.connect.json.JsonConverter | JSON. For JSON data without a schema, also set |
| org.apache.kafka.connect.converters.LongConverter | Long |
| org.apache.kafka.connect.converters.ShortConverter | Short |
| org.apache.kafka.connect.storage.StringConverter | String |
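For example, here is a sketch of a value converter setup for JSON records that carry no embedded schema. It relies on the upstream Apache Kafka JsonConverter option schemas.enable, passed through with the value.converter. prefix. Confirm the exact setting against the documentation for your connector.

```properties
# Serialize and deserialize record values as JSON.
value.converter=org.apache.kafka.connect.json.JsonConverter
# By default, JsonConverter expects each value wrapped in a
# {"schema": ..., "payload": ...} envelope. Setting this to false
# treats each value as plain JSON with no schema.
value.converter.schemas.enable=false
```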
The choice of converter depends on the connector type and the data that you are storing in Kafka. For more information, see the documentation for the specific connector.
Tasks
A connector transfers data by creating one or more tasks that operate in
parallel. To set an upper limit on how many tasks a connector creates, set
the connector's tasks.max configuration property. The connector might create
fewer tasks than this value.
Increasing the value of tasks.max can improve throughput, but it also increases
resource consumption (CPU and memory). The optimal value depends on the workload
and the resources allocated to your Connect cluster workers. For sink
connectors, the number of partitions in the consumed topics also caps parallelism.
Each partition is read by at most one task, so tasks beyond the total partition
count receive no data.
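The following minimal sketch shows this limit in practice. It assumes a sink connector that consumes a hypothetical topic named orders, which has 3 partitions.

```properties
# Hypothetical sink connector that consumes the "orders" topic.
topics=orders
# Allow up to 6 tasks. If "orders" has only 3 partitions,
# at most 3 tasks are assigned partitions; any others stay idle.
tasks.max=6
```

In this setup, raising tasks.max above the partition count adds resource usage without adding parallelism. Increasing the topic's partition count is what raises the ceiling.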
Task restart policy
You can set a connector's task restart policy, which determines the behavior when a failure occurs. Connectors support the following policies:
Never restart. The connector does not restart failed tasks. This policy is the default behavior. It's useful for debugging, or in situations where manual intervention is required after an error.
Restart with exponential backoff. The connector restarts a failed task after a delay (called the backoff period). The delay increases exponentially with each subsequent failure. This policy is recommended for most production workloads.
If you use the exponential backoff policy, also set values for the minimum and maximum backoff. The minimum backoff should be greater than 60 seconds, and the maximum backoff should be less than 7200 seconds.
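One way to picture how the minimum and maximum bound the delay is the formula below. Here d_n is the delay before the n-th consecutive restart and b > 1 is the growth factor. This page does not state the growth factor, so b, and the choice of b = 2 in the worked example, are assumptions for illustration only.

$$
d_n = \min\left(d_{\max},\; d_{\min} \cdot b^{\,n-1}\right), \qquad b > 1
$$

With the assumed b = 2, d_min = 120 s, and d_max = 3600 s, the first six delays are 120, 240, 480, 960, 1920, and 3600 seconds. The sixth value, 3840, exceeds d_max and is capped.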
Transformations and predicates
Managed Service for Apache Kafka supports the default Kafka Connect transformations and predicates.
Transformations let you modify individual messages before they are sent to Managed Service for Apache Kafka (for source connectors) or to the external system (for sink connectors). You might use a transformation to mask sensitive data, add timestamps, or rename fields.
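As a sketch, the following fragment chains three of the default Kafka Connect transformations, one for each use case mentioned above. The field names ssn, ingested_at, userId, and user_id are hypothetical.

```properties
# Transformations run in the order listed here.
transforms=maskSsn,addTimestamp,renameId
# Replace the "ssn" field with an empty value of its type (for example, "" or 0).
transforms.maskSsn.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskSsn.fields=ssn
# Copy the Kafka record timestamp into a new "ingested_at" field.
transforms.addTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addTimestamp.timestamp.field=ingested_at
# Rename the "userId" field to "user_id".
transforms.renameId.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameId.renames=userId:user_id
```

These field-level transformations operate on structured values (a Struct or a Map). Pair them with a converter such as JsonConverter rather than the default ByteArrayConverter, whose raw byte values they cannot modify.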
Predicates let you apply a transformation conditionally. A predicate evaluates properties of each message, such as its topic name or headers, and the transformation applies only to messages that match.
For example, to configure a sink connector to disregard messages that contain a
DoNotProcess header key, add the following configuration:
```properties
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
```
This configuration does the following:
Configures a predicate named hasKey of type org.apache.kafka.connect.transforms.predicates.HasHeaderKey. This predicate matches all messages that contain a header with the key DoNotProcess.
Configures a transformation named dropMessage of type org.apache.kafka.connect.transforms.Filter. This transformation drops all messages that match the configured predicate.
Links the transformation to the predicate hasKey. This ensures that only messages with the DoNotProcess header key present are dropped by the transformation.
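Predicates can also limit a transformation to particular topics. The following sketch uses the default TopicNameMatches predicate to mask a field only in topics whose names start with customers. The topic pattern, the field name ssn, and the names maskSsn and isCustomerTopic are hypothetical.

```properties
transforms=maskSsn
transforms.maskSsn.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskSsn.fields=ssn
# Apply maskSsn only when the predicate below matches.
transforms.maskSsn.predicate=isCustomerTopic
predicates=isCustomerTopic
predicates.isCustomerTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Java regular expression matched against the full topic name.
predicates.isCustomerTopic.pattern=customers.*
```

To invert a predicate, set negate on the transformation. For example, adding transforms.maskSsn.negate=true would mask the field in every topic except those that match the pattern.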