Enable the Event Streaming Framework
You can define an event streamer configuration source to enable the Event Streaming Framework.
Procedure
- Copy the /opt/mtx/conf/mdc_config_custom.xml file for the MATRIXX Engine to the config directory of the event streaming configuration source.
- Update the mtx_event_streamer.yaml file, for example:
```yaml
#
# GLOBAL Configuration Settings
#
---
# The number of processing threads dedicated to processing event records
# and sending them to kafka. If this value is set to 0 or not set, then
# MATRIXX computes the number of unique subdomain streams you have configured
# and uses that for the size of this thread pool
processingThreads: 16

# The sleep interval used in the event streaming main program. Changing this
# controls the frequency of main task loops and CPU usage. The unit of the
# interval is ms. Default value is 500ms
taskLoopSleepInterval: 500

#
# rsgateway connection, uncomment for persistent session cursors
rsgateway:
  url: http://localhost:8080/rsgateway
  login: streamer
  password: str34m3r

##############################################################################
# Configure known subdomains
#
# This section is used to define all of the known subdomains that will
# participate in event streaming.
#
# "subdomains" is a space-separated list of subdomain names, and each listed
# subdomain has a corresponding section below:
#
subdomains: RTID1 RTID2

#
# RTID1 : this subdomain streams the events defined in "kafkaAllMdcStream"
#
RTID1:                      # name this subdomain
  # address of the TRA (or engine if no TRA)
  host: 10.105.57.187
  # port to open
  port: 4100
  # timeout before considering the stream dead
  timeout: 60000
  # space-separated list of stream names. Each of these names must be
  # defined in the "Configure All Streams" section.
  types: kafkaAllMdcStream
  # configuration block for this subdomain
  config: |
    # route to send to the TRA
    route: RTID1
    #
    # Specific overrides for the kafkaAllMdcStream stream can be
    # listed here. These values apply to the connection between the ESF and
    # the stream server that is being used to stream events from this
    # subdomain (RTID1) for this stream (kafkaAllMdcStream). Each such stream
    # (if there were more than one) can be individually controlled so that the
    # priority of each stream is independent of all other streams.
    #
    kafkaAllMdcStream:
      # sleep delay after receiving events
      delay: 0
      # sleep time when 0 events are received back from the Stream Server (SS)
      sleep: 5000
      # timeout for requests to the SS
      timeout: 5000
      # time between reconnection attempts
      reconnect: 60000

#
# RTID2 : this subdomain streams the events defined in
# "kafkaPurchaseJsonStream"
#
RTID2:
  # address of the TRA (or engine if no TRA)
  host: 10.105.57.187
  # port to open
  port: 4100
  # timeout before considering the stream dead
  timeout: 60000
  # space-separated list of stream names. Each of these names must be
  # defined in the "Configure All Streams" section.
  types: kafkaPurchaseJsonStream
  config: |
    # route to send to the TRA
    route: RTID2
    #
    # Specific overrides for the kafkaPurchaseJsonStream stream can be
    # listed here. These values apply to the connection between the ESF and
    # the stream server that is being used to stream events from this
    # subdomain (RTID2) for this stream (kafkaPurchaseJsonStream). Each such
    # stream (if there were more than one) can be individually controlled so
    # that the priority of each stream is independent of all other streams.
    #
    kafkaPurchaseJsonStream:
      # sleep delay after receiving events
      delay: 0
      # sleep when 0 events are received from the SS
      sleep: 5000
      # timeout for requests to the SS
      timeout: 5000
      # time between reconnect attempts
      reconnect: 60000

##############################################################################
# Configure All Streams
#
# This section is used to configure all the known types of streams. All
# streams listed in the "types" entry for a subdomain (above) must be listed
# here as well.
#
# A space-separated list of all named stream definitions. Each entry here
# must have a corresponding section with the name of the stream below.
#
streams: kafkaAllMdcStream kafkaPurchaseJsonStream

# Define "kafkaAllMdcStream"
kafkaAllMdcStream:
  #
  # This section defines what sort of events should be included in the stream.
  # Its syntax is that of a Matrixx MDC in RAW format:
  #
  filter: |
    # required, the name of the MDC: MtxEventStreamFilter
    $: MtxEventStreamFilter
    # required; this array field, IncludeEventArray, should have an MDC
    # entry for each type of event. An event type is based on the MDC
    # container name
    IncludeEventArray:
      # Each array element should be of type "MtxEventFilterSpec"
      - $: MtxEventFilterSpec
        # we want MtxEvent objects
        EventTypeName: MtxEvent
        # setting IncludeDescendants to "true" indicates to include
        # any and all events whose MDC container is derived from the name
        # listed in "EventTypeName"
        IncludeDescendants: "true"
  #
  # This section is used to tell the ESF where these events should be delivered
  #
  config: |
    # the connector to use for these events
    connector: kafka-connector
    # the topic (specific to kafka-connector) where events should be delivered
    topic: allevents_mdc
    # the format these events should be delivered in
    format: raw
    # settings (specific to kafka-connector)
    settings:
      # where to find the kafka cluster
      bootstrap.servers: 10.10.96.51:29092
      # how the kafka-connector should deal with acknowledgements
      acks: all
      # number of retries for the connector
      retries: 0
      # value serializer to use (must be appropriate for the "format")
      value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes
      # key serializer (must be appropriate for the key, usually StringSerializer)
      key.serializer: org.apache.kafka.common.serialization.StringSerializer

# Define "kafkaPurchaseJsonStream"
kafkaPurchaseJsonStream:
  #
  # This section defines what sort of events should be included in the stream.
  # Its syntax is that of a Matrixx MDC in JSON format:
  #
  filter: |
    # required, the name of the MDC: MtxEventStreamFilter
    $: MtxEventStreamFilter
    # required; this array field, IncludeEventArray, should have an MDC
    # entry for each type of event. An event type is based on the MDC
    # container name
    IncludeEventArray:
      # Each array element should be of type "MtxEventFilterSpec"
      - $: MtxEventFilterSpec
        # we want MtxPurchaseEvent objects
        EventTypeName: MtxPurchaseEvent
        # setting IncludeDescendants to "true" indicates that we want
        # any and all events whose MDC container is derived from the name
        # listed in "EventTypeName"
        IncludeDescendants: "true"
  #
  # This section is used to tell the ESF where these events should be delivered
  #
  config: |
    # the connector to use for these events
    connector: kafka-connector
    # the topic (specific to kafka-connector) where events should be delivered
    topic: purchaseevents_json
    # the format these events should be delivered in
    format: json
    # settings (specific to kafka-connector)
    settings:
      # where to find the kafka cluster
      bootstrap.servers: 10.10.96.51:29092
      # how the kafka-connector should deal with acknowledgements
      acks: all
      # number of retries for the connector
      retries: 0
      # value serializer to use (must be appropriate for the "format")
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      # key serializer (must be appropriate for the key, usually StringSerializer)
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
```
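The subdomain and stream sections must stay in sync: every name in a subdomain's `types` entry must also appear in the top-level `streams` list and have its own definition section. A quick sanity check of that cross-reference rule can be sketched in Python. This is an illustrative sketch, not an ESF tool: the `config` dict below is a hand-built stand-in for the parsed YAML (in practice you would load the file with a YAML parser such as PyYAML), and `check_streams` is a hypothetical helper name.

```python
# Minimal sketch: verify that every stream named in a subdomain's "types"
# entry is declared in the top-level "streams" list and has its own section.
# The dict hand-mirrors the mtx_event_streamer.yaml example above; it is an
# assumed stand-in for yaml.safe_load() output, not ESF code.

config = {
    "subdomains": "RTID1 RTID2",
    "RTID1": {"host": "10.105.57.187", "port": 4100, "types": "kafkaAllMdcStream"},
    "RTID2": {"host": "10.105.57.187", "port": 4100, "types": "kafkaPurchaseJsonStream"},
    "streams": "kafkaAllMdcStream kafkaPurchaseJsonStream",
    "kafkaAllMdcStream": {},
    "kafkaPurchaseJsonStream": {},
}

def check_streams(cfg):
    """Return a list of problems; an empty list means the references line up."""
    problems = []
    declared = set(cfg.get("streams", "").split())
    for sub in cfg.get("subdomains", "").split():
        section = cfg.get(sub)
        if section is None:
            problems.append(f"subdomain {sub} has no section")
            continue
        for stream in section.get("types", "").split():
            if stream not in declared:
                problems.append(f"{sub}: stream {stream} missing from 'streams'")
            elif stream not in cfg:
                problems.append(f"{sub}: stream {stream} has no definition section")
    return problems

print(check_streams(config))  # prints [] for the consistent example above
```

Dropping `kafkaPurchaseJsonStream` from the `streams` line, for instance, would make the check report it as missing for RTID2.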