Event Streaming Framework Properties and Examples
Configure the Event Streaming Framework by using the mtx_event_streamer.yaml file. These properties require that a stream processing framework (such as Kafka) is already configured to accept events from the Event Streaming Framework. The mtx_event_streamer.yaml file also includes examples to use as templates for your own event streaming implementation.
Configuring the Event Streaming Properties
In earlier releases, the Event Streaming Framework was configured with an external JSON or .properties file. Starting in release 5210, the default configuration file is mtx_event_streamer.yaml, although external JSON and .properties files are still supported. MATRIXX Support recommends that you transition to YAML.
You can also define sets of YAML parameters as separate files. If the YAML instructions are brief, add them to the mtx_event_streamer.yaml file itself. Many of the default examples referenced in mtx_event_streamer.yaml are contained in YAML files in the /opt/mtx/conf directory.
Configuring the Global Configuration Properties
Property | Description |
---|---|
processingThreads | (Optional) MATRIXX Support recommends that you do not change this property. This property defines the number of concurrent processing threads dedicated to processing event records and sending them to the stream processing framework. The value must not be greater than the number of streams you configure; if it is, an error occurs. If the value is set to 0, the Event Streaming Framework counts the unique sub-domain streams configured and uses that count as the thread pool size. Setting the value lower than the number of defined streams caps the number of threads used to process those streams and ensures that not all processors are used. |
taskLoopSleepInterval | (Optional) This property sets how often the main task loop executes in the Event Streaming Framework. The default is 500 ms, which is appropriate for most MATRIXX environments. If your Event Streaming Framework pod consistently shows excessive CPU usage, increasing this value can help. |
rsgateway | This property identifies an RS Gateway to connect to. It includes the url, login, and password sub-properties, as shown in the example configuration file later in this topic. |
schemaVersion | This property is used only to upgrade to a new release of MATRIXX. For details, see MATRIXX Installation and Upgrade. |
extensionVersion | This property is used only to upgrade to a new release of MATRIXX. For details, see MATRIXX Installation and Upgrade. |
checkSchemaVersion | (Optional) If this property is set to true, the Event Streaming Framework tries to contact RS Gateway to update its schema version information when it detects an event with a later schema version than the one it knows about. This property must be set in the GLOBAL Configuration Settings section of the mtx_event_streamer.yaml file, not per sub-domain. |
forceSchemaPinning | If this property is set to true, the Event Streaming Framework uses the schemaVersion and extensionVersion values from the mtx_event_streamer.yaml configuration file. If the value is false (the default), the Event Streaming Framework uses auto-pinning: it makes an RS Gateway call to get the schema version and extension version values from MATRIXX Engine instead. |
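Putting the global properties above together, a minimal GLOBAL Configuration Settings block might look like the following sketch. The RS Gateway URL and credentials are placeholders taken from the example file later in this topic; substitute your own values.

```yaml
# GLOBAL Configuration Settings (illustrative values)
---
# 0 = size the thread pool from the number of configured sub-domain streams
processingThreads: 0
# main task loop interval in ms (default 500)
taskLoopSleepInterval: 500
# contact RS Gateway when an event with a later schema version is detected
checkSchemaVersion: true
# false (default) = auto-pin schema/extension versions via RS Gateway
forceSchemaPinning: false
rsgateway:
  url: http://localhost:8080/rsgateway
  login: streamer
  password: str34m3r
```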
Configuring Streams
Event Streaming Framework Stream Configuration Properties describes the Event Streaming Framework stream configuration properties, including the connectors they use. You must define at least one stream. Not all properties are available to all connectors. For information about configuring the connectors, see the discussion about the Event Streaming Framework connectors.
The multi option for the config: property enables you to stream the same events to multiple locations. For details, see the discussion about multi-connectors.
Property | Description |
---|---|
streams | This property is a space-separated list of stream names. For example, a stream that includes all events can be named all, and a stream that includes only purchase events can be named purchases, giving streams: all purchases. |
filter | Each stream uses at least one associated filter, identified by a filter property, to specify the events (MDC objects) to include in the stream. For example, a filter for an all_events stream can include the MtxEvent MDC and all of its descendants. You can specify multiple MDCs for each stream, and some connectors enable you to filter by object field. |
config | Each stream has an associated configuration defined by config. The configuration must include the name of the connector that receives the event stream; supported connectors include the kafka-connector shown in the example file later in this topic. For more information about the individual connectors, see the discussion about Event Streaming Framework connectors. Parameters for reprocessing failed events can also be configured for each stream. |
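Combining the streams, filter, and config properties above, a single stream definition could be sketched as follows. The stream name purchases, topic name, and broker address are illustrative; the filter and connector settings follow the example file later in this topic.

```yaml
# declare the stream, then define it by name
streams: purchases
purchases:
  # filter: which events (MDCs) to include in this stream
  filter: |
    $: MtxEventStreamFilter
    IncludeEventArray:
      - $: MtxEventFilterSpec
        EventTypeName: MtxPurchaseEvent
        IncludeDescendants: "true"
  # config: the connector that receives the stream, and its settings
  config: |
    connector: kafka-connector
    topic: purchase_events
    format: json
    settings:
      bootstrap.servers: broker1:9092
      acks: all
```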
Configuring Sub-domains (Producers)
Event Streaming Framework Sub-domain Configuration Properties lists the Event Streaming Framework sub-domain configuration properties. Sub-domains are the event streaming producers, and at least one sub-domain definition is required. The default definition works for MATRIXX environments with a single engine chain. If your environment uses sub-domain routing, you must define each of your sub-domains.
Each sub-domain has a streaming server, and each sub-domain can have different streams or the same streams. For example, you can separate streams of events from sub-domain A for prepaid subscribers and sub-domain B for postpaid subscribers. You can also balance subscribers across multiple sub-domains that use the same streams.
Property | Description |
---|---|
subdomains | This property specifies a sub-domain, which is a grouping of event filters and configuration settings to use for an event stream. Other properties use subdomains: to configure information about the sub-domain. Each sub-domain to which the Event Streaming Framework connects must be configured, and each stream type has a single connection defined. For example, for two streams, all and purchases, two sockets are opened in the sub-domain. An example is a sub-domain called mtxtesting that uses a separate mtxtestingSubdomainConfig.yaml configuration file on localhost, using port 4100 with a timeout limit of 6 seconds. |
host | This property specifies the name of the Event Streaming VIP in the tra_config_network_topology.xml file, for example, vipExtern. Note: In test environments, you can use the IP address of the MATRIXX Engine publishing pod. |
port | This property specifies the port used by the TRA-RT-(SI/DR) virtual server (which uses the evs protocol) in the tra_config_network_topology.xml file. The default value is 4100. |
timeout | This property sets how long, in milliseconds, the Event Streaming Framework waits for the Event Stream Server to return information from streams provided by the sub-domain. There is no default value. You can override this value on a per-stream basis by adding the timeout: parameter to config:. |
types | This property specifies which streams are opened in the sub-domain. The names entered in types: must match the names of streams defined in streams. All streams to be accessed in the sub-domain must be listed, with each stream name separated by a space, for example, types: all purchases. |
config | Each sub-domain must have a JSON or YAML configuration definition that defines the parameters needed to communicate with the sub-domain. Parameters include route:, which identifies the route to send to the TRA, and per-stream overrides such as delay:, sleep:, timeout:, and reconnect:. |
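The sub-domain properties above can be combined into a definition like the following sketch. The host, port, and stream name are illustrative and mirror the full example file later in this topic.

```yaml
# declare the sub-domain, then define it by name
subdomains: RTID1
RTID1:
  host: 10.105.57.187   # Event Streaming VIP (or engine IP in test environments)
  port: 4100            # TRA-RT virtual server port (evs protocol)
  timeout: 60000        # ms to wait for the Event Stream Server
  types: purchases      # must match a stream name defined in streams
  config: |
    route: RTID1        # route to send to the TRA
```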
Example mtx_event_streamer.yaml File
The following code is an Event Streaming Framework mtx_event_streamer.yaml configuration file. The upgrade properties are omitted:
#
# GLOBAL Configuration Settings
#
---
# The number of processing threads dedicated to processing event records
# and sending them to kafka. If this value is set to 0 or not set, then
# MATRIXX computes the number of unique subdomain streams you have configured
# and uses that number as the size of this thread pool
processingThreads: 16
# The sleep interval used in event streaming main program. Changing this
# will control the frequency of main task loops and CPU usage. The unit of
# interval is ms. Default value is 500ms
taskLoopSleepInterval: 500
#
# rsgateway connection, uncomment for persistent session cursors
rsgateway:
url: http://localhost:8080/rsgateway
login: streamer
password: str34m3r
##############################################################################
# Configure known subdomains
# This section is used to define all of the known subdomains that will
# participate in event streaming.
#
# "subdomains" is a space seperated list of subdomain names and each listed
# subdomain will have a corresponding section below:
#
subdomains: RTID1 RTID2
#
# RTID1 : this subdomain will stream the events defined in "kafkaAllMdcStream"
#
RTID1: # name this subdomain
# address of tra (or engine if no TRA)
host: 10.105.57.187
# port to open
port: 4100
# timeout before considering stream dead
timeout: 60000
# space separated list of stream names. Each of these names must be
# defined in the "Configure All Streams" section
types: kafkaAllMdcStream
# configuration block for this subdomain
config: |
# route to send to the TRA
route: RTID1
#
# specific overrides for the kafkaAllMdcStream stream can be
# listed here. These values apply to the connection between ESF and
# the stream server that is being used to stream events from this
# subdomain (RTID1) for this stream (kafkaAllMdcStream). Each such stream
# (if there were more than one) can be individually controlled so that the
# priority of each stream is independent of all other streams
#
kafkaAllMdcStream:
# sleep delay after we have received events
delay: 0
# sleep when we got back 0 events from SS
sleep: 5000
# timeout for requests to the SS
timeout: 5000
# time between reconnect attempts
reconnect: 60000
#
# RTID2 : this subdomain will stream the events defined in
# "kafkaPurchaseJsonStream"
#
RTID2:
# address of tra (or engine if no TRA)
host: 10.105.57.187
# port to open
port: 4100
# timeout before considering stream dead
timeout: 60000
# space separated list of stream names. Each of these names must be
# defined in the "Configure All Streams" section
types: kafkaPurchaseJsonStream
config: |
# route to send to the TRA
route: RTID2
#
# specific overrides for the kafkaPurchaseJsonStream stream can be
# listed here. These values apply to the connection between ESF and
# the stream server that is being used to stream events from this
# subdomain (RTID2) for this stream (kafkaPurchaseJsonStream). Each such stream
# (if there were more than one) can be individually controlled so that the
# priority of each stream is independent of all other streams
#
kafkaPurchaseJsonStream:
# sleep delay after we have received events
delay: 0
# sleep when we got back 0 events from SS
sleep: 5000
# timeout for requests to the SS
timeout: 5000
# time between reconnect attempts
reconnect: 60000
##############################################################################
# Configure All Streams
#
# This section is used to configure all the known types of streams. All
# streams listed in the "types" entry for a subdomain (above) must be listed
# here as well.
#
#
# A space separated list of all named stream definitions. Each entry here
# must have a corresponding section with the name of the stream below
#
streams: kafkaAllMdcStream kafkaPurchaseJsonStream
# Define "kafkaAllMdcStream"
kafkaAllMdcStream:
#
# this section defines what sort of events should be included in the stream.
# Its syntax is that of a MATRIXX MDC in raw format:
#
filter: |
# required, the name of the MDC: MtxEventStreamFilter
$: MtxEventStreamFilter
# required, this array field, IncludeEventArray should have an MDC
# entry for each type of event. An event type is based on the MDC container
# name
IncludeEventArray:
# Each array element should be of type "MtxEventFilterSpec"
- $: MtxEventFilterSpec
# we want MtxEvent objects
EventTypeName: MtxEvent
# setting IncludeDescendants to "true" indicates that we want
# any and all events whose MDC container is derived from the name
# listed in "EventTypeName"
IncludeDescendants: "true"
#
# this section is used to tell ESF where these events should be delivered
#
config: |
# the connector to use for these events
connector: kafka-connector
# the topic (specific to kafka-connector) where events should be delivered
topic: allevents_mdc
# the format these events should be delivered in
format: raw
# settings (specific to kafka-connector)
settings:
# where to find the kafka cluster
bootstrap.servers: 10.10.96.51:29092
# how the kafka-connector should deal with acknowledgements
acks: all
# number of retries for the connector
retries: 0
# value serializer to use (must be appropriate for the "format")
value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes
# key serializer (must be appropriate for key, usually StringSerializer)
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# Define "kafkaPurchaseJsonStream"
kafkaPurchaseJsonStream:
#
# this section defines what sort of events should be included in the stream.
# Its syntax is that of a MATRIXX MDC in JSON format:
#
filter: |
# required, the name of the MDC: MtxEventStreamFilter
$: MtxEventStreamFilter
# required, this array field, IncludeEventArray should have an MDC
# entry for each type of event. An event type is based on the MDC container
# name
IncludeEventArray:
# Each array element should be of type "MtxEventFilterSpec"
- $: MtxEventFilterSpec
# we want MtxPurchaseEvent objects
EventTypeName: MtxPurchaseEvent
# setting IncludeDescendants to "true" indicates that we want
# any and all events whose MDC container is derived from the name
# listed in "EventTypeName"
IncludeDescendants: "true"
#
# this section is used to tell ESF where these events should be delivered
#
config: |
# the connector to use for these events
connector: kafka-connector
# the topic (specific to kafka-connector) where events should be delivered
topic: purchaseevents_json
# the format these events should be delivered in
format: json
# settings (specific to kafka-connector)
settings:
# where to find the kafka cluster
bootstrap.servers: 10.10.96.51:29092
# how the kafka-connector should deal with acknowledgements
acks: all
# number of retries for the connector
retries: 0
# value serializer to use (must be appropriate for the "format")
value.serializer: org.apache.kafka.common.serialization.StringSerializer
# key serializer (must be appropriate for key, usually StringSerializer)
key.serializer: org.apache.kafka.common.serialization.StringSerializer