Event Streaming Framework Properties and Examples

Configure the Event Streaming Framework by using the mtx_event_streamer.yaml file. These properties require that a stream processing framework (such as Kafka) is already configured to accept events from the Event Streaming Framework. The mtx_event_streamer.yaml file also includes examples to use as templates for your own event streaming implementation.

Configuring the Event Streaming Properties

You configure the event streaming global configuration properties by moving the /opt/mtx/data/mtx_event_streamer.yaml example configuration file to ${MTX_CONF_DIR}/mtx_event_streamer.yaml and editing its parameters. The default settings for the optional parameters are used unless you specify different settings. The definition for each property follows a colon (:), for example: processingThreads: 16.
Note: Pre-5210 releases used a Java .properties file. Starting in release 5210, the default configuration file is mtx_event_streamer.yaml, although external JSON and .properties files are still supported. MATRIXX Support recommends that you transition to YAML.

You can also define sets of YAML parameters as separate files. If the YAML instructions are brief, add them directly to the mtx_event_streamer.yaml file instead.

Many of the default examples referenced in mtx_event_streamer.yaml are contained in these YAML files in the /opt/mtx/conf directory.

Configuring the Global Configuration Properties

Event Streaming Framework Global Configuration Properties describes the Event Streaming Framework global configuration properties.
Table 1. Event Streaming Framework Global Configuration Properties
Property Description
processingThreads (Optional) MATRIXX Support recommends that you do not change this property. It defines the number of concurrent threads dedicated to processing event records and sending them to the stream processing framework. The value must not be greater than the number of streams you configure; if it is, an error occurs. If the value is set to 0, the Event Streaming Framework counts the unique sub-domain streams configured and uses that number as the size of the thread pool. Setting the value lower than the number of defined streams caps the maximum number of threads used to process those streams, which prevents all processors from being consumed.
taskLoopSleepInterval (Optional) This property sets how often the main task loop executes in the Event Streaming Framework. The default is 500 ms, which is appropriate for most MATRIXX environments. If your Event Streaming Framework pod consistently shows excessive CPU usage, increasing this value can help.
rsgateway This property identifies an RS Gateway to connect to. This property includes the following sub-properties:
  • url — The URL (IP address and port) of the RS Gateway.
  • login — The name of the RS Gateway user to log into RS Gateway with. The default value is streamer.
  • password — The password for the user specified by rsgateway.login. The default value is str34m3r.
  • required — An indicator of whether the Event Streaming Framework continues to process events if RS Gateway stops responding during the Event Streaming Framework start-up time. When this property is set to:
    • true — When the Event Streaming Framework starts, it tries to get the cursor from RS Gateway and keeps trying until it receives a valid response. After initialization, the Event Streaming Framework communicates with the Event Stream Server and, after each response is processed, saves the cursor through RS Gateway. If RS Gateway is unreachable, the Event Streaming Framework retries the number of times specified by the rsgateway.retry property (the default value is 8). If RS Gateway is still unreachable after the specified number of retries, the Event Streaming Framework logs an error and stops.
      Note: The initial, out-of-the-box configuration value is true.
    • false — The Event Streaming Framework continues working even if RS Gateway is not reachable. The Event Streaming Framework cannot save any processed cursor through RS Gateway in this case, which could cause many duplicate events to be reprocessed.
      Note: The value for this property is treated as false if the property is not in your configuration file.
  • retry — The number of times the Event Streaming Framework tries reaching RS Gateway to save the cursor before stopping. The default value is 8.
    Note: This number applies only to attempts to save a cursor, not to get, create, or delete a cursor.
Example:
# rsgateway connection details
rsgateway:
  url: http://localhost:8080/rsgateway
  login: streamer
  password: str34m3r
  required: true
schemaVersion This property is only used to upgrade to a new release of MATRIXX. For details, see MATRIXX Installation and Upgrade.
extensionVersion This property is used only to upgrade to a new release of MATRIXX. For details, see MATRIXX Installation and Upgrade.
checkSchemaVersion (Optional) If this property is set to true, the Event Streaming Framework tries to contact RS Gateway to update its schema version information when it detects an event with a later schema version than the one the Event Streaming Framework currently knows about.

This property must be set in the GLOBAL Configuration Settings section of the mtx_event_streamer.yaml file, not per sub-domain.

forceSchemaPinning If this property is set to true, the Event Streaming Framework uses the schemaVersion and extensionVersion values from the mtx_event_streamer.yaml configuration file. If the value is false (the default), the Event Streaming Framework uses auto-pinning: it makes an RS Gateway call to get the schema version and extension version values from MATRIXX Engine instead.
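
As a minimal sketch, pinning the schema explicitly might look like the following in mtx_event_streamer.yaml (the version values shown are illustrative, not defaults; see MATRIXX Installation and Upgrade for the values to use):

```yaml
# Use the pinned versions below instead of auto-pinning through RS Gateway
forceSchemaPinning: true
schemaVersion: 5210        # illustrative value
extensionVersion: 1        # illustrative value
```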

Configuring Streams

Event Streaming Framework Stream Configuration Properties describes the Event Streaming Framework stream configuration properties, including the connectors they use. You must define at least one stream. Not all properties are available to all connectors. For information about configuring the connectors, see the discussion about the Event Streaming Framework connectors.

Tip: The multi option for the config: property enables you to stream the same events to multiple locations. For details, see the discussion about multi-connectors.
To define properties in a separate external document, reference them within a YAML file using the '@filename' syntax. For example, config: '@config_stream_allevents.yaml' references a separate file called config_stream_allevents.yaml containing the config properties.
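A sketch of this reference syntax follows; the referenced file contains the same key: value pairs that would otherwise appear inline under config: (the file name and location are illustrative):

```yaml
# In mtx_event_streamer.yaml: load the config block from an external file
all_events:
  config: '@config_stream_allevents.yaml'

# /opt/mtx/conf/config_stream_allevents.yaml then holds the key: value
# pairs for the stream, exactly as they would appear inline.
```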
Table 2. Event Streaming Framework Stream Configuration Properties
Property Description
streams This property is a list of streams, each one separated by a space. For example, a stream that includes all events can be named all, and a stream that includes only purchase events can be named purchases. Use this syntax:
streams: stream_name1 stream_name2 stream_namen
stream_name1:
  config: |
    key: value
    ...
  filter: |
    key: value
    ...
stream_name2:
  ...
stream_namen:
  ...
filter Each stream uses at least one associated filter to identify the events (MDC objects) to include in the stream. Each filter is identified by a filter property.
This example filter for the all_events stream includes all events and their descendants:
all_events:
 filter: |
  $: MtxEventStreamFilter
  IncludeEventArray:
  - $: MtxEventFilterSpec
    EventTypeName: MtxEvent
    IncludeDescendants: "true"

You can specify multiple streams for each MDC. Some connectors enable you to filter by object field.
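
For example, a filter can list more than one MtxEventFilterSpec entry in IncludeEventArray. This sketch adds a second event type to the stream; MtxRechargeEvent is an illustrative container name, not taken from the examples above:

```yaml
purchase_and_recharge:
  filter: |
    $: MtxEventStreamFilter
    IncludeEventArray:
    - $: MtxEventFilterSpec
      EventTypeName: MtxPurchaseEvent
      IncludeDescendants: "true"
    - $: MtxEventFilterSpec
      EventTypeName: MtxRechargeEvent   # illustrative event type
      IncludeDescendants: "true"
```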

config Each stream has an associated configuration defined by config. The configuration property must include the name of the connector that receives the event stream. Supported connectors include:
  • kafka — Events are sent to the Kafka stream processing framework.
  • activemq — Events are sent to an Apache ActiveMQ message broker.
  • googlepubsub — Events are sent to a Google Pub/Sub topic.
  • console — Events are sent to standard output.
  • multi — Events are sent to multiple endpoints.
  • disk — Events are sent to a file or directory as specified in the connector.

For more information about the individual connectors, see the discussion about Event Streaming Framework connectors.

The following parameters can also be configured for each stream for reprocessing failed events:
  • failed_events_dir — The name of the directory that holds files containing failed requests for later restreaming.
  • failed_delivery_rate — A failure threshold, as a percentage. If the percentage of failed events does not exceed this value, the Event Streaming Framework logs any failed events in either the default directory or the one you set with the failed_events_dir property and then processes the next batch. You can then reprocess the logged files manually.
For more information, see the discussion about reprocessing failed events.
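
A hedged sketch of a stream config that uses these parameters follows; the directory path and threshold value are illustrative, and the connector settings are abbreviated:

```yaml
kafkaAllMdcStream:
  config: |
    connector: kafka-connector
    topic: allevents_mdc
    format: raw
    failed_events_dir: /var/mtx/failed_events   # illustrative path
    failed_delivery_rate: 5    # if at most 5% of a batch fails, log the failures and continue
```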

Configuring Sub-domains (Producers)

Event Streaming Framework Sub-domain Configuration Properties lists the Event Streaming Framework sub-domain configuration properties. Sub-domains are the event streaming producers, and at least one sub-domain definition is required. The default definition works for MATRIXX environments with a single engine chain. If your environment uses sub-domain routing, you must define each of your sub-domains.

Each sub-domain has a streaming server, and each sub-domain can have different streams or the same streams. For example, you can separate streams of events from sub-domain A for prepaid subscribers and sub-domain B for postpaid subscribers. You can also balance subscribers with multiple sub-domains using the same streams.

Attention: A single stream has two threads. Each stream reads events from files independently, and configuring more streams can affect the performance of the publishing pod. MATRIXX Support recommends that you limit the number of streams to improve performance. Base the number of streams you configure on the message load of the processing and publishing pods. The default maximum number of streams is 10. Adjust this number based on the message load. Monitor stream performance using the EventTcpServer logs in mtx_debug.log. Track the progress by looking at the GTC cursor for the streamed events and comparing that with the maximum GTC that the publishing pod has generated.
Table 3. Event Streaming Framework Sub-domain Configuration Properties
Property Description
subdomains This property specifies a sub-domain: a grouping of event filters and configuration settings to use for an event stream. Other properties under subdomains: configure information about the sub-domain. Each sub-domain to which the Event Streaming Framework connects must be configured, and each stream type has a single connection defined. For example, for two streams, all and purchases, two sockets are opened in the sub-domain. This example specifies a sub-domain called mtxtesting that uses a separate mtxtestingSubdomainConfig.yaml configuration file on localhost using port 4100 with a timeout limit of 60 seconds:
subdomains: mtxtesting
mtxtesting:
  config: '@mtxtestingSubdomainConfig.yaml'
  host: localhost
  port: 4100
  timeout: 60000
  types: testingConsoleJsonStream
host This property specifies the name of the Event Streaming VIP in the tra_config_network_topology.xml file. In this configuration file fragment, the name is vipExtern:
<vip-addresses>
    <vip name="vipExtern" ifName="eth4" address="192.0.2.24" netmask="32"/>
Note: In test environments, you can use the IP address of the MATRIXX Engine publishing pod.
port This property specifies the port used by the TRA-RT-(SI/DR) virtual server (that uses the evs protocol) in the tra_config_network_topology.xml file. The default value is 4100 as shown in this file fragment:
<virtual-servers>
    <vs name="vsExternStream" vip="vipExtern" port="4100" protocol="evs" pool="poolAllSubdomainsStream"/>
timeout This property sets the time limit that the Event Streaming Framework waits for the Event Stream Server to return information from streams provided by the sub-domain. You can override this value on a per-stream basis by adding the timeout: parameter to config:. This property has no default value. The value is in milliseconds.
types This property specifies which streams are opened in the sub-domain. The names entered in types: must match the names of streams defined in streams. All streams to be accessed in the sub-domain must be listed, separated by spaces or other punctuation characters. Example:
types: all testingstream1
config Each sub-domain must have a JSON or YAML configuration definition that defines parameters needed to communicate with the sub-domain.
Parameters include:
  • route — Sets the route to the sub-domain (such as RTID).
  • PageSize — (Optional) Sets the page size (the number of events to retrieve in each call to the Event Stream Server). The default value is 1000.
  • event_stream_name — Sets the name of the event stream, where event_stream_name is the name of the stream. The stream has these parameters:
    • delay — (Optional) Sets how long to sleep each time events are received from the Event Stream Server. The default value is 100 ms.
    • sleep — (Optional) Sets how long to sleep when the response from the Event Stream Server is empty or the event data count is 0. The default value is 5000 ms.
    • processEventsSleep — (Optional) Sets how long to wait if no event data to process is found in internal data storage. The default value is the value for delay.
    • concurrentRequests — (Optional) Sets the number of concurrent requests that can be made under a sub-domain for parallel processing and streaming. If no number is specified, then events are processed sequentially instead of in parallel. Specifying a number higher than 1 enables parallel processing.
      Note: High-speed parallel processing is not guaranteed when processing ordered events. If you need ordered events to be delivered, set concurrentRequests to 1 or leave it unset.
    • cursorQueueSize — (Optional) This parameter can only be used when Event Streaming Framework is in high-speed parallel processing mode (that is, when concurrentRequests is used). It sets the queue size for the priority queue in the cursor hash map. The default value is 5 multiplied by concurrentRequests.
    • maxResponseProcessingTime — (Optional) Sets the maximum allowed processing or waiting time for a response in the queue. The beginning timestamp of a response is the time it is received from the Event Stream Server and added to the priority queue. When a response finishes processing, it is removed from the queue. If the waiting or processing time in the queue exceeds this limit:
      • The response is removed.
      • An error message is logged.
      • The response is logged in a failed event log file in the directory specified by failedEventsFileDir.
      The default value is cursorQueueSize multiplied by esfTimeout.
    • failedEventsFileDir — (Optional) The name of the directory that holds files containing failed requests for later restreaming. For more information, see the discussion about reprocessing failed events.
    • esfTimeout — (Optional) Sets how long to wait before determining the connection from the Event Streaming Framework to the Event Stream Server is down. The default value is 6000 ms.
    • ssTimeout — (Optional) Sets the maximum time that the Event Stream Server is used to collect event data. The default value is 1000 ms.
    • timeout — Sets how long to wait for the leader Event Streaming Framework to respond before determining it is down and in an error state. This setting overrides the subdomain.timeout parameter. The default value is 6000 ms.
      Note: This property is deprecated for releases 5240 and later. For releases 5240 and later, use esfTimeout instead.
    • reconnect — (Optional) Sets how long to wait before trying to open a new connection to the sub-domain after a connection is closed due to an error. The default value is 60000 ms.
  • buffer — (Optional) This property sets the maximum buffer size for a data block that the Event Stream Server sends to the connector. The default value is 512 * 1024 bytes.
  • nonLeaderRetryInterval — (Optional) This property is valid in Event Streaming Framework HA groups and specifies an alternate sleep interval to use with non-leader Event Streaming Framework pods. The default value is 15000 milliseconds.
  • considerConnDeadInterval — (Optional) This property is valid in Event Streaming Framework HA groups and specifies how long to wait before considering the Event Streaming Framework HA group leader inoperative and to start communicating with one of the non-leader Event Streaming Framework pods. The default value is 30000 milliseconds.
  • traUnreachableNodeRetry — (Optional) This property specifies a retry interval in case of a Requested node 'id' is unreachable error. For information about this error, see the discussion about troubleshooting event streaming log errors in MATRIXX Monitoring and Logging. The default value is 1000 milliseconds.
  • traConnectionLimitRetry — (Optional) This property specifies a retry interval in case of an Event Streaming proxy connection limit error message. For information about this error, see the discussion about troubleshooting event streaming log errors. The default value is 2000 milliseconds.
  • reprocessBatchInterval — (Optional) This property specifies a wait time, in milliseconds, before reprocessing a batch of failed events. This property is useful if the optional Google Pub/Sub connector failed_delivery_rate property directs the Event Streaming Framework to reprocess a batch of events because too many events failed. This interval delays the reprocessing in the hope that the events do not fail the next time. The default value is 60000 ms.
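
Drawing the parameters above together, a sub-domain config block might be sketched as follows; all parameter names come from the descriptions above, but the values are illustrative, not recommendations:

```yaml
config: |
  route: RTID1            # route to the sub-domain
  PageSize: 1000          # events per Event Stream Server call
  buffer: 524288          # max data block buffer, in bytes (512 * 1024)
  kafkaAllMdcStream:      # per-stream overrides for this stream
    delay: 100            # sleep after events are received (ms)
    sleep: 5000           # sleep when a response is empty (ms)
    concurrentRequests: 1 # keep at 1 (or unset) when event ordering matters
    esfTimeout: 6000      # connection-down threshold (ms)
    reconnect: 60000      # wait before reopening a failed connection (ms)
```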

Example mtx_event_streamer.yaml File

The following code is an example Event Streaming Framework mtx_event_streamer.yaml configuration file. The upgrade properties are omitted:
#
#                      GLOBAL Configuration Settings
#
---
# The number of processing threads dedicated to processing event records
# and sending them to kafka.  If this value is set to 0 or not set, then
# MATRIXX computes the number of unique subdomain streams you have configured
# and uses that for the size of this thread pool
processingThreads: 16

# The sleep interval used in event streaming main program. Changing this
# will control the frequency of main task loops and CPU usage. The unit of
# interval is ms. Default value is 500ms
taskLoopSleepInterval: 500

#
# rsgateway connection, uncomment for persistent session cursors
rsgateway:
  url: http://localhost:8080/rsgateway
  login: streamer
  password: str34m3r


##############################################################################
#                          Configure known subdomains
# This section is used to define all of the known subdomains that will
# participate in event streaming.


#
# "subdomains" is a space separated list of subdomain names and each listed
# subdomain will have a corresponding section below:
#
subdomains: RTID1 RTID2


#
# RTID1 : this subdomain will stream the events defined in "kafkaAllMdcStream"
#
RTID1:                               # name this subdomain
  # address of tra (or engine if no TRA)
  host: 10.105.57.187

  # port to open
  port: 4100

  # timeout before considering stream dead
  timeout: 60000                     

  # space separated list of stream names.  Each of these names must be
  # defined in the "Configure All Streams" section
  types: kafkaAllMdcStream           

  # configuration block for this subdomain
  config: |                          
    #   route to send to the TRA
    route: RTID1                     
  
    #
    # specific overrides for the kafkaAllMdcStream stream can be
    # listed here.  These values apply to the connection between ESF and
    # the stream server that is being used to stream events from this
    # subdomain (RTID1) for this stream (kafkaAllMdcStream).  Each such stream
    # (if there were more than one) can be individually controlled so that the
    # priority of each stream is independent of all other streams
    #
    kafkaAllMdcStream:

      # sleep delay after we have rcvd events
      delay: 0                       

      # sleep when we got back 0 events from SS
      sleep: 5000                    

      # timeout for requests to the SS
      timeout: 5000                  

      # time between reconnect attempts
      reconnect: 60000               

#
# RTID2 : this subdomain will stream the events defined in 
# "kafkaPurchaseJsonStream"
#
RTID2:
  # address of tra (or engine if no TRA)
  host: 10.105.57.187 

  # port to open
  port: 4100

  # timeout before considering stream dead
  timeout: 60000

  # space separated list of stream names.  Each of these names must be
  # defined in the "Configure All Streams" section
  types: kafkaPurchaseJsonStream
  config: |

    #   route to send to the TRA
    route: RTID2

    #
    # specific overrides for the kafkaPurchaseJsonStream stream can be
    # listed here.  These values apply to the connection between ESF and
    # the stream server that is being used to stream events from this
    # subdomain (RTID2) for this stream (kafkaPurchaseJsonStream).  Each such stream
    # (if there were more than one) can be individually controlled so that the
    # priority of each stream is independent of all other streams
    #
    kafkaPurchaseJsonStream:

      # sleep delay after we have rcvd events
      delay: 0                       

      # sleep when we got back 0 events from SS
      sleep: 5000                    

      # timeout for requests to the SS
      timeout: 5000                  

      # time between reconnect attempts
      reconnect: 60000               

##############################################################################
#                          Configure All Streams
#
# This section is used to configure all the known types of streams.  All 
# streams listed in the "types" entry for a subdomain (above) must be listed
# here as well.
#

#
# A space separated list of all named stream definitions.  Each entry here
# must have a corresponding section with the name of the stream below
#
streams: kafkaAllMdcStream kafkaPurchaseJsonStream


# Define "kafkaAllMdcStream"
kafkaAllMdcStream:

  #
  # this section defines what sort of events should be included in the stream.
  # Its syntax is that of a Matrixx MDC in RAW format:
  #
  filter: |
    # required, the name of the MDC: MtxEventStreamFilter
    $: MtxEventStreamFilter

    # required, this array field, IncludeEventArray should have an MDC
    # entry for each type of event.  An event type is based on the MDC container
    # name
    IncludeEventArray:

    # Each array element should be of type "MtxEventFilterSpec"
    - $: MtxEventFilterSpec

      # we want MtxEvent objects
      EventTypeName: MtxEvent

      # setting IncludeDescendants to "true" indicates that we want
      # any and all events whose MDC container is derived from the name
      # listed in "EventTypeName"
      IncludeDescendants: "true"

  #
  # this section is used to tell ESF where these events should be delivered
  #
  config: |
    # the connector to use for these events
    connector: kafka-connector

    # the topic (specific to kafka-connector) where events should be delivered
    topic: allevents_mdc

    # the format these events should be delivered in
    format: raw

    # settings (specific to kafka-connector)
    settings:

      # where the kafka-connector finds the kafka brokers
      bootstrap.servers: 10.10.96.51:29092

      # how the kafka-connector should deal with acknowledgements
      acks: all

      # number of retries for the connector
      retries: 0

      # value serializer to use (must be appropriate for the "format")
      value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes

      # key serializer (must be appropriate for key, usually StringSerializer)
      key.serializer: org.apache.kafka.common.serialization.StringSerializer

# Define "kafkaPurchaseJsonStream"
kafkaPurchaseJsonStream:
  #
  # this section defines what sort of events should be included in the stream.
  # Its syntax is that of a Matrixx MDC in JSON format:
  #
  filter: |
    # required, the name of the MDC: MtxEventStreamFilter
    $: MtxEventStreamFilter

    # required, this array field, IncludeEventArray should have an MDC
    # entry for each type of event.  An event type is based on the MDC container
    # name
    IncludeEventArray:

    # Each array element should be of type "MtxEventFilterSpec"
    - $: MtxEventFilterSpec

      # we want MtxPurchaseEvent objects
      EventTypeName: MtxPurchaseEvent

      # setting IncludeDescendants to "true" indicates that we want
      # any and all events whose MDC container is derived from the name
      # listed in "EventTypeName"
      IncludeDescendants: "true"

  #
  # this section is used to tell ESF where these events should be delivered
  #
  config: |
    # the connector to use for these events
    connector: kafka-connector

    # the topic (specific to kafka-connector) where events should be delivered
    topic: purchaseevents_json

    # the format these events should be delivered in
    format: json

    # settings (specific to kafka-connector)
    settings:

      # where the kafka-connector finds the kafka brokers
      bootstrap.servers: 10.10.96.51:29092

      # how the kafka-connector should deal with acknowledgements
      acks: all

      # number of retries for the connector
      retries: 0

      # value serializer to use (must be appropriate for the "format")
      value.serializer: org.apache.kafka.common.serialization.StringSerializer

      # key serializer (must be appropriate for key, usually StringSerializer)
      key.serializer: org.apache.kafka.common.serialization.StringSerializer