Kafka Connector

The Kafka connector sends events to an Apache Kafka event streaming platform.

You must create a Kafka event streaming environment. For information about configuring the Apache Kafka event streaming cluster, see the Apache Kafka documentation.

Kafka Connector Configuration Properties lists the Kafka connector configuration properties:
Table 1. Kafka Connector Configuration Properties
Property Description
connector This property identifies the Kafka connector, that is, kafka-connector_filepath.
topic This property specifies the Kafka topic to which the Event Streaming Framework sends events. By default, the connector uses this setting to send events to one or more topics. Each local topic can override this global property to send events by specifying a topic in the filter property.
key (Optional) The string identifying the Kafka topic key to which the Event Streaming Framework sends events.
Note: If a valid attribute cannot be found for a specified key name, the default key of EventId, which is the current key for all events, is used.
partition (Optional) The positive integer (for example, 1) identifying the Kafka topic partition to which the Event Streaming Framework sends events.
Note: The partition must be a positive integer. When it is replaced with an event attribute, the attribute value must also be a positive integer. If no partition is specified, but a key is specified, the partition is determined by using a hash of the key. If no key nor partition is specified, the partition is assigned in round-robin fashion.
format This property specifies the data type to stream to Kafka. The options are:
  • raw — (Default) Compact MDC format.
  • json — JavaScript Object Notation format. This option can be used with the filter option, which enables you to exclude specific fields.
  • openapi-json — JavaScript Object Notation format.

If this field is not specified, the default format of the compact MDC data format is used. For details about the difference between JSON and OpenAPI JSON format, see the discussion about Event Streaming Framework connectors.

enableCallback The default value is false. If this property is set to false, you do not receive delivery status callback messages. A value of true enables the callback feature, which logs an informational message for each successfully published event and an error message for each unsuccessful try to publish an event.
filter This property is supported for JSON output format and enables you to create lists of allowed or blocked object fields to include or exclude from the information sent to Kafka.
Note: If you use the include option to filter, it must define key and value attributes.
settings This property accepts the Kafka producer configuration settings defined by Apache Kafka. For details about these settings and their options, see the discussion about producer configurations in the Apache Kafka documentation. You must specify the settings in the following list unless they are marked as optional. MATRIXX Support recommends that you also use the optional settings listed here, and you can use any of the other Kafka producer configuration settings that your environment requires:
  • bootstrap.servers — The IP address and port for the Kafka cluster.
  • acks — (Optional) An indicator to Kafka of how events are acknowledged. MATRIXX tests with the default value (all).
  • retries — (Optional) An indicator of how many times the connector retries sending events. MATRIXX tests with the default value (0).
  • value.serializer — A setting for specifying encoding (serializing) and decoding (de-serializing) for events to and from Kafka. Set this to com.matrixx.kafka.eventStore.DataContainerSerdes for raw MDC data or org.apache.kafka.common.serialization.StringSerializer for JSON format.
  • key.serializer — A setting for encoding (serializing) and decoding (de-serializing) for events to and from Kafka. Set the value to:
    org.apache.kafka.common.serialization.StringSerializer
failed_delivery_rate:percentage

(Optional) If not defined, the Event Streaming Framework does not capture failed events, so they are not available for re-streaming.

If this setting is defined, the Event Streaming Framework calculates the percentage of failed events in a batch.

If the percentage of failed events exceeds this percentage, the Event Streaming Framework tries to process that batch of events again. The Event Streaming Framework continues reprocessing the failed batch until it succeeds and then goes to the next batch.

If the percentage of failed events does not exceed the percentage you set, the Event Streaming Framework logs any failed events in either the default directory or one you set with the failed_events_dir property and then processes the next batch. You can then reprocess the files manually. For information about reprocessing events, see the discussion about reprocessing failed events.

For example:
failed_delivery_rate: 95%

MATRIXX Support recommends that you set the failed_delivery_rate relatively high, such as 95%.

To change the default delay between processing a batch of events that failed, use the reprocessBatchInterval sub-domain property.

failed_events_dir:directory_path

(Optional) Specifies a directory for storage of failed events. The default is /var/mtx/events. Confirm that the directory you specify exists. For example: failed_events_dir:/home/text/logs. For more information, see the discussion about reprocessing failed events.

Kafka YAML Configuration

The following example code defines a simple Kafka stream configuration:
# Simple Kafka Example
[StreamName]:
  filter: |
    [filter definition]
  config: |
    connector: kafka-connector
    topic: allevents_mdc
    format: raw
    settings:
      bootstrap.servers: 10.10.96.51:29092
      acks: all
      retries: 0
      value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes
      key.serializer: org.apache.kafka.common.serialization.StringSerializer

Other Kafka Connector Examples

This Kafka connector example specifies the JSON output format, enables delivery status callback messages, and excludes the DeleteCode and EventId fields:
[StreamName]:
  filter: |
    [filter definition]
  config: |
    connector: kafka-connector
    topic: allevents
    format: json
    filter:
      exclude:
      - DeleteCode
      - EventId
    enableCallback: "true"
    settings:
      bootstrap.servers: 10.10.96.51:29092
      acks: all
      retries: 0
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
The following example allows a batch of events streamed to Kafka to be zipped. Kafka can set the compression type at the producer (producer end) or topic (broker end). Because the Event Streaming Framework only controls the Kafka producer, this example shows you how to set the compression type on the producer by configuring changes in mtx_event_streamer.yaml. Use the compression.type property to specify a valid compression codec. You can select gzip, Snappy, lz4, or zstd, each of which have varying compression speeds. Refer to the Apache Kafka documentation for more details about the compression type.
connector: kafka-connector
topic: allevents_include
format: json
settings:
  compression.type: gzip
  batch.size: 32000
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer

If you enable compression on producers, you might notice reduced producer throughput and a lower compression rate on the broker sometimes. When receiving compressed messages, 0.10.0 brokers avoid compressing the messages again, which reduces the latency and improves the throughput. In certain cases, this can reduce the batching size on the producer, which can lead to worse throughput. If this happens, you can tune the Kafka linger.ms utility and the batch.size of the producer for better throughput. Also, the producer buffer for compressing messages with Snappy is smaller than the one used by the broker. This can have a negative impact on the compression ratio for the messages on the disk.

The following example allows a service provider ID (tenant ID) and event type to be used to determine the Kafka topic. This example specifies default values for topic (ABC Company), key (0), and partition (1). The attribute for the topic, key, or partition must be a top-level, simple attribute (for example, TenantId), and not a struct or array attribute. You can use ${event_attribute_name}, (for example, TenantId), to define a topic, key, or partition. Only the topic attribute is required, and partition and key are optional. The partition must be a positive integer, so when it is replaced with an event attribute, the attribute value must also be a positive integer.
connector: kafka-connector
topic: Tenant_${TenantId:ABC Company}_${EventType}
key: key_${attribute_x:0}
partition: {attribute_y:1}
format: json
settings:
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
Based on the attributes defined in the properties file, the Event Streaming Framework builds producer records. You can decide how to set the partition and key to ensure efficient distribution in the Kafka broker:
  • If you specify a valid partition number, the associated partition is used to build the producer record.
  • If you do not specify a partition, but you specify a key, a partition is chosen using a hash of the key.
CAUTION: If you specify a valid partition, make sure the created Kafka topic also has the correct partition setup. For more information, see the Apache Kafka documentation. The Kafka producer throughput drops with more partitions, so set the partition value with care. Refer to the Apache Kafka documentation for more details.

If an error occurs while processing a topic name, a default topic, DefaultTopic, is used as the topic. If an error occurs while processing a partition or key, the related value(s) are considered invalid and are not passed to build the Kafka producer record.

The following example allows a service provider ID (tenant ID) and event type, and, for the NotificationDone event, the notification type to be used to determine the Kafka record header using a MessageHeaderMap array. Each element in this array defines the Kafka record headers for each event type.
Note: If no event attribute is found, this header is not added to the Kafka message.

Also, when using attribute values as part of a Kafka topic, make sure they do not contain invalid characters. The only allowed characters are digits, letters (uppercase or lowercase), periods, hyphens, and underscores ("_" character). Refer to the Apache Kafka documentation for more information.

If no valid attribute is found for the topic, the entire string of the topic name defined in the properties is used as the default topic. For example, if the topic name is topic : Tenant_${TenantID}_${EventType}, but the Event Streaming Framework cannot find the tenant ID field from the events, then Tenant_${TenantID}_${EventType} is used as the topic name. Also, because a valid attribute cannot be found for the key name in this example, the default key of EventId, which is the current key for all events, is used.

The following example defines two types of events:
  • For any MtxNotificationDone event, two custom headers are added:
    • EventType with the value MtxNotificationDoneEvent.
    • NotificationType with the value of the $NotificationType attribute from the event data.
  • For any MtxPurchase event, two custom headers are added:
    • EventType with the value MtxPurchaseEvent.
    • Info with the value of the Info attribute from the event data.
connector: kafka-connector
topic: allevents_second
format: json
MessageHeaderMap:
  MtxNotificationDoneEvent:
    Headers:
    - Name: EventType
      Value: MtxNotificationDoneEvent
    - Name: NotificationType
      Value: $NotificationType
  MtxPurchaseEvent:
    Headers:
    - Name: EventType
      Value: MtxPurchaseEvent
    - Name: Info
      Value: $Info
settings:
  bootstrap.servers: localhost:9092
  acks: all
  retries: 0
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  key.serializer: org.apache.kafka.common.serialization.StringSerializer