Kafka Connector
The Kafka connector sends events to an Apache Kafka event streaming platform.
You must create a Kafka event streaming environment. For information about configuring the Apache Kafka event streaming cluster, see the Apache Kafka documentation.
Property | Description |
---|---|
connector | This property identifies the Kafka connector, that is, kafka-connector_filepath. |
topic | This property specifies the Kafka topic to which the Event Streaming Framework sends events. By default, the connector uses this setting to send events to one or more topics. Each local topic can override this global property by specifying a topic in the filter property. |
key | (Optional) The string identifying the Kafka topic key to which the Event Streaming Framework sends events. Note: If a valid attribute cannot be found for a specified key name, the default key of EventId, which is the current key for all events, is used. |
partition | (Optional) The positive integer (for example, 1) identifying the Kafka topic partition to which the Event Streaming Framework sends events. Note: The partition must be a positive integer. When it is replaced with an event attribute, the attribute value must also be a positive integer. If no partition is specified but a key is specified, the partition is determined by using a hash of the key. If neither a key nor a partition is specified, the partition is assigned in round-robin fashion. |
format | This property specifies the data type to stream to Kafka (for example, raw or json, as shown in the examples in this section). If this property is not specified, the default compact MDC data format is used. For details about the difference between JSON and OpenAPI JSON format, see the discussion about Event Streaming Framework connectors. |
enableCallback | The default value is false. If this property is set to false, you do not receive delivery status callback messages. A value of true enables the callback feature, which logs an informational message for each successfully published event and an error message for each unsuccessful attempt to publish an event. |
filter | This property is supported for the JSON output format and enables you to create lists of allowed or blocked object fields to include in or exclude from the information sent to Kafka. Note: If you use the include option with filter, it must define key and value attributes. |
settings | This property accepts the Kafka producer configuration settings defined by Apache Kafka. For details about these settings and their options, see the discussion about producer configurations in the Apache Kafka documentation. You must specify the required settings, and MATRIXX Support recommends that you also use the recommended optional settings; you can use any other Kafka producer configuration settings that your environment requires. The examples in this section show typical settings such as bootstrap.servers, acks, retries, key.serializer, and value.serializer. |
failed_delivery_rate:percentage | (Optional) If not defined, the Event Streaming Framework does not capture failed events, so they are not available for re-streaming. If this setting is defined, the Event Streaming Framework calculates the percentage of failed events in a batch. If the percentage of failed events exceeds this percentage, the Event Streaming Framework tries to process that batch of events again, and it continues reprocessing the failed batch until it succeeds before moving to the next batch. If the percentage of failed events does not exceed the percentage you set, the Event Streaming Framework logs any failed events in either the default directory or one you set with the failed_events_dir property and then processes the next batch; you can then reprocess the files manually. A separate setting controls the default delay between attempts to reprocess a failed batch of events. For information about reprocessing events, see the discussion about reprocessing failed events, and see the configuration sketch after this table. |
failed_events_dir:directory_path | (Optional) Specifies a directory for storing failed events. The default is /var/mtx/events. Confirm that the directory you specify exists. For example: failed_events_dir:/home/text/logs. For more information, see the discussion about reprocessing failed events. |
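The following sketch illustrates how the optional failed-event capture properties described above might appear alongside the other connector properties. The stream name, filter placeholder, percentage value, and directory path are assumptions made for this sketch, not values from the product documentation.

```yaml
# Sketch only: shows failed_delivery_rate and failed_events_dir in context.
# The percentage and directory below are illustrative assumptions.
[StreamName]:
  filter: |
    [filter definition]
  config: |
    connector: kafka-connector
    topic: allevents_mdc
    format: json
    failed_delivery_rate: 10            # assumed value: retry the batch if more than 10% of its events fail
    failed_events_dir: /var/mtx/failed_events   # assumed path where failed events are written for manual reprocessing
    settings:
      bootstrap.servers: localhost:9092
      acks: all
      retries: 0
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
```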
Kafka YAML Configuration
# Simple Kafka Example
[StreamName]:
  filter: |
    [filter definition]
  config: |
    connector: kafka-connector
    topic: allevents_mdc
    format: raw
    settings:
      bootstrap.servers: 10.10.96.51:29092
      acks: all
      retries: 0
      value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
Other Kafka Connector Examples
[StreamName]:
  filter: |
    [filter definition]
  config: |
    connector: kafka-connector
    topic: allevents
    format: json
    filter:
      exclude:
        - DeleteCode
        - EventId
    enableCallback: "true"
    settings:
      bootstrap.servers: 10.10.96.51:29092
      acks: all
      retries: 0
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
The following example uses the compression.type property to specify a valid compression codec. You can select gzip, Snappy, lz4, or zstd, each of which has a different compression speed. Refer to the Apache Kafka documentation for more details about the compression type.

connector: kafka-connector
topic: allevents_include
format: json
settings:
  compression.type: gzip
  batch.size: 32000
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
If you enable compression on producers, you might sometimes notice reduced producer throughput and a lower compression rate on the broker. When receiving compressed messages, 0.10.0 brokers avoid recompressing the messages, which reduces latency and improves throughput. In certain cases, however, this can reduce the batch size on the producer, which can lead to worse throughput. If this happens, you can tune the producer linger.ms and batch.size settings for better throughput. Also, the producer buffer used for compressing messages with Snappy is smaller than the one used by the broker, which can have a negative impact on the compression ratio of the messages on disk.
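As one possible illustration of that tuning advice, the following sketch adds linger.ms and a larger batch.size to the compression example above. The specific values are assumptions chosen for this sketch, not recommendations from the product documentation.

```yaml
# Sketch only: illustrative producer tuning for compressed batches.
settings:
  compression.type: gzip
  batch.size: 64000      # assumed value: larger batches give the codec more data to compress
  linger.ms: 20          # assumed value: wait up to 20 ms to fill a batch before sending
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
```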
The following example defines default values for the topic (ABC Company), key (0), and partition (1). The attribute used for the topic, key, or partition must be a top-level, simple attribute (for example, TenantId), not a struct or array attribute. You can use ${event_attribute_name} (for example, TenantId) to define a topic, key, or partition. Only the topic attribute is required; partition and key are optional. The partition must be a positive integer, so when it is replaced with an event attribute, the attribute value must also be a positive integer.

connector: kafka-connector
topic: Tenant_${TenantId:ABC Company}_${EventType}
key: key_${attribute_x:0}
partition: ${attribute_y:1}
format: json
settings:
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
- If you specify a valid partition number, the associated partition is used to build the producer record.
- If you do not specify a partition, but you specify a key, a partition is chosen using a hash of the key.
If an error occurs while processing a topic name, a default topic, DefaultTopic, is used. If an error occurs while processing a partition or key, the related values are considered invalid and are not passed when building the Kafka producer record.
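To make the substitution behavior concrete, here is a small worked illustration. The event attribute values are assumptions invented for this sketch, not data from the product documentation.

```yaml
# Hypothetical illustration: if an event carried TenantId=ABC_Company,
# EventType=MtxPurchaseEvent, attribute_x=7, and attribute_y=3 (assumed values),
# the configuration above would resolve to the following producer record fields.
topic: Tenant_ABC_Company_MtxPurchaseEvent
key: key_7
partition: 3
# If attribute_y were missing from the event, the default of 1 from ${attribute_y:1}
# would be used; if its value were not a positive integer, the partition would be
# treated as invalid and not passed when building the producer record.
```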
To add custom Kafka record headers, define a MessageHeaderMap array. Each element in this array defines the Kafka record headers for an event type. Also, when using attribute values as part of a Kafka topic, make sure they do not contain invalid characters; the only allowed characters are digits, uppercase and lowercase letters, periods, hyphens, and underscores (_). Refer to the Apache Kafka documentation for more information.

If no valid attribute is found for the topic, the entire topic name string defined in the properties is used as the default topic. For example, if the topic name is topic: Tenant_${TenantID}_${EventType}, but the Event Streaming Framework cannot find the tenant ID field in the events, then Tenant_${TenantID}_${EventType} is used as the topic name. Also, because a valid attribute cannot be found for
In the following example, custom headers are added for two event types:

- For any MtxNotificationDone event, two custom headers are added:
  - EventType with the value MtxNotificationDoneEvent.
  - NotificationType with the value of the $NotificationType attribute from the event data.
- For any MtxPurchase event, two custom headers are added:
  - EventType with the value MtxPurchaseEvent.
  - Info with the value of the Info attribute from the event data.
connector: kafka-connector
topic: allevents_second
format: json
MessageHeaderMap:
  MtxNotificationDoneEvent:
    Headers:
      - Name: EventType
        Value: MtxNotificationDoneEvent
      - Name: NotificationType
        Value: $NotificationType
  MtxPurchaseEvent:
    Headers:
      - Name: EventType
        Value: MtxPurchaseEvent
      - Name: Info
        Value: $Info
settings:
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer