Enable the Event Streaming Framework

You can define an event streamer configuration source to enable the Event Streaming Framework.

Procedure

  1. Copy the /opt/mtx/conf/mdc_config_custom.xml file for the MATRIXX Engine to the config directory of the event streaming configuration source.
  2. Update the mtx_event_streamer.yaml file, for example:
    #
    #                      GLOBAL Configuration Settings
    #
    ---
    # The number of processing threads dedicated to processing event records
    # and sending them to Kafka.  If this value is set to 0 or is not set,
    # MATRIXX computes the number of unique subdomain streams you have
    # configured and uses that count as the size of this thread pool.
    processingThreads: 16
    
    # The sleep interval used in the event streaming main program.  Changing
    # this value controls the frequency of the main task loop and therefore
    # CPU usage.  The unit is milliseconds; the default value is 500 ms.
    taskLoopSleepInterval: 500
    
    #
    # rsgateway connection; uncomment this block for persistent session cursors.
    # NOTE(review): the login and password below are example values stored in
    # plain text -- replace them with your own credentials.
    rsgateway:
      url: http://localhost:8080/rsgateway
      login: streamer
      password: str34m3r
    
    
    ##############################################################################
    #                          Configure known subdomains
    #
    # This section defines all of the known subdomains that participate in
    # event streaming.
    
    
    #
    # "subdomains" is a space-separated list of subdomain names.  Each listed
    # subdomain must have a corresponding section of the same name below.
    #
    subdomains: RTID1 RTID2
    
    
    #
    # RTID1: this subdomain streams the events defined in "kafkaAllMdcStream".
    #
    RTID1:                               # name this subdomain
      # address of the TRA (or of the engine if there is no TRA)
      host: 10.105.57.187

      # port to open
      port: 4100

      # timeout before considering the stream dead
      timeout: 60000

      # space-separated list of stream names.  Each of these names must be
      # defined in the "Configure All Streams" section.
      types: kafkaAllMdcStream

      # configuration block for this subdomain
      config: |
        # route to send to the TRA
        route: RTID1

        #
        # Specific overrides for the kafkaAllMdcStream stream can be listed
        # here.  These values apply to the connection between the ESF and the
        # stream server that is used to stream events from this subdomain
        # (RTID1) for this stream (kafkaAllMdcStream).  Each such stream (if
        # there were more than one) can be individually controlled so that the
        # priority of each stream is independent of all other streams.
        #
        kafkaAllMdcStream:

          # sleep delay after receiving events
          delay: 0

          # sleep time when 0 events are received back from the Stream
          # Server (SS)
          sleep: 5000

          # timeout for requests to the SS
          timeout: 5000

          # time between reconnection attempts
          reconnect: 60000
    
    #
    # RTID2: this subdomain streams the events defined in
    # "kafkaPurchaseJsonStream".
    #
    RTID2:
      # address of the TRA (or of the engine if there is no TRA)
      host: 10.105.57.187

      # port to open
      port: 4100

      # timeout before considering the stream dead
      timeout: 60000

      # space-separated list of stream names.  Each of these names must be
      # defined in the "Configure All Streams" section.
      types: kafkaPurchaseJsonStream

      # configuration block for this subdomain
      config: |

        # route to send to the TRA
        route: RTID2

        #
        # Specific overrides for the kafkaPurchaseJsonStream stream can be
        # listed here.  These values apply to the connection between the ESF
        # and the stream server that is used to stream events from this
        # subdomain (RTID2) for this stream (kafkaPurchaseJsonStream).  Each
        # such stream (if there were more than one) can be individually
        # controlled so that the priority of each stream is independent of all
        # other streams.
        #
        kafkaPurchaseJsonStream:

          # sleep delay after receiving events
          delay: 0

          # sleep time when 0 events are received back from the Stream
          # Server (SS)
          sleep: 5000

          # timeout for requests to the SS
          timeout: 5000

          # time between reconnection attempts
          reconnect: 60000
    
    ##############################################################################
    #                          Configure All Streams
    #
    # This section configures all the known types of streams.  Every stream
    # listed in the "types" entry of a subdomain (above) must be listed here
    # as well.
    #
    
    #
    # A space-separated list of all named stream definitions.  Each entry here
    # must have a corresponding section below with the name of the stream.
    #
    streams: kafkaAllMdcStream kafkaPurchaseJsonStream
    
    
    # Define "kafkaAllMdcStream"
    kafkaAllMdcStream:

      #
      # This section defines which events are included in the stream.
      # Its syntax is that of a MATRIXX MDC in RAW format:
      #
      filter: |
        # required, the name of the MDC: MtxEventStreamFilter
        $: MtxEventStreamFilter

        # required: the array field IncludeEventArray should have one MDC
        # entry for each type of event.  An event type is based on the MDC
        # container name.
        IncludeEventArray:

        # Each array element should be of type "MtxEventFilterSpec"
        - $: MtxEventFilterSpec

          # we want MtxEvent objects
          EventTypeName: MtxEvent

          # setting IncludeDescendants to "true" includes any and all events
          # whose MDC container is derived from the name listed in
          # "EventTypeName"
          IncludeDescendants: "true"

      #
      # This section tells the ESF where these events should be delivered.
      #
      config: |
        # the connector to use for these events
        connector: kafka-connector

        # the topic (specific to kafka-connector) where events should be delivered
        topic: allevents_mdc

        # the format these events should be delivered in
        format: raw

        # settings (specific to kafka-connector)
        settings:

          # where to find the Kafka brokers (host:port used for the initial
          # connection to the Kafka cluster)
          bootstrap.servers: 10.10.96.51:29092

          # how the kafka-connector should deal with acknowledgements
          acks: all

          # number of retries for the connector
          retries: 0

          # value serializer to use (must be appropriate for the "format")
          value.serializer: com.matrixx.kafka.eventStore.DataContainerSerdes

          # key serializer (must be appropriate for key, usually StringSerializer)
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
    
    # Define "kafkaPurchaseJsonStream"
    kafkaPurchaseJsonStream:
      #
      # This section defines which events are included in the stream.
      # Its syntax is that of a MATRIXX MDC in JSON format:
      #
      filter: |
        # required, the name of the MDC: MtxEventStreamFilter
        $: MtxEventStreamFilter

        # required: the array field IncludeEventArray should have one MDC
        # entry for each type of event.  An event type is based on the MDC
        # container name.
        IncludeEventArray:

        # Each array element should be of type "MtxEventFilterSpec"
        - $: MtxEventFilterSpec

          # we want MtxPurchaseEvent objects
          EventTypeName: MtxPurchaseEvent

          # setting IncludeDescendants to "true" includes any and all events
          # whose MDC container is derived from the name listed in
          # "EventTypeName"
          IncludeDescendants: "true"

      #
      # This section tells the ESF where these events should be delivered.
      #
      config: |
        # the connector to use for these events
        connector: kafka-connector

        # the topic (specific to kafka-connector) where events should be delivered
        topic: purchaseevents_json

        # the format these events should be delivered in
        format: json

        # settings (specific to kafka-connector)
        settings:

          # where to find the Kafka brokers (host:port used for the initial
          # connection to the Kafka cluster)
          bootstrap.servers: 10.10.96.51:29092

          # how the kafka-connector should deal with acknowledgements
          acks: all

          # number of retries for the connector
          retries: 0

          # value serializer to use (must be appropriate for the "format")
          value.serializer: org.apache.kafka.common.serialization.StringSerializer

          # key serializer (must be appropriate for key, usually StringSerializer)
          key.serializer: org.apache.kafka.common.serialization.StringSerializer