What's new in SwiftMQ Streams 10.1.0

New Windowing

SwiftMQ Streams supports two types of windows, tumbling and sliding. Both can be limited by count or by time. Session windows are additionally supported through Memory Groups.

When a window reaches its limit, Messages retire and can be processed by an onRetire callback registered at the respective Memory.

Sliding Windows

A sliding window retires only those Messages that drop out of the window when the limit is reached. It does not create a new window; instead, the window slides along the Message stream.
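
The sliding behavior can be modeled outside of SwiftMQ in a few lines of plain JavaScript. The sketch below is only an illustration, not the Streams implementation: createSlidingWindow and its onRetire argument are hypothetical names, and plain strings stand in for Messages.

```javascript
// Hypothetical model of a count-based sliding window (not the SwiftMQ API).
function createSlidingWindow(limit, onRetire) {
  var messages = [];
  return {
    add: function (message) {
      messages.push(message);
      // Retire only the oldest entries that no longer fit into the window.
      while (messages.length > limit) {
        onRetire([messages.shift()]);
      }
    },
    contents: function () {
      return messages.slice();
    }
  };
}

var slidingRetired = [];
var slidingWindow = createSlidingWindow(2, function (retired) {
  slidingRetired = slidingRetired.concat(retired);
});
["m1", "m2", "m3", "m4"].forEach(function (m) { slidingWindow.add(m); });

console.log(slidingWindow.contents()); // [ 'm3', 'm4' ]
console.log(slidingRetired);           // [ 'm1', 'm2' ]
```

The window always holds the most recent entries, and each retirement hands over only the entry that dropped out.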

Tumbling Windows

A tumbling window retires all Messages of that window at once when the limit is reached. For a count-based window, this is when the Message count reaches the limit. For a time-based window, this is when the time of the oldest Message is less than or equal to the limit's time.
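
A count-based tumbling window can be modeled in plain JavaScript as follows. This is a sketch under the assumption that the window retires as soon as it holds exactly "limit" entries; createTumblingWindow is a hypothetical name and not part of the Streams API.

```javascript
// Hypothetical model of a count-based tumbling window (not the SwiftMQ API).
function createTumblingWindow(limit, onRetire) {
  var messages = [];
  return {
    add: function (message) {
      messages.push(message);
      // Assumption: once the limit is reached, the whole window retires
      // and a new, empty window starts.
      if (messages.length >= limit) {
        onRetire(messages);
        messages = [];
      }
    },
    contents: function () {
      return messages.slice();
    }
  };
}

var batches = [];
var tumblingWindow = createTumblingWindow(3, function (retired) {
  batches.push(retired);
});
["m1", "m2", "m3", "m4", "m5"].forEach(function (m) { tumblingWindow.add(m); });

console.log(batches);                   // [ [ 'm1', 'm2', 'm3' ] ]
console.log(tumblingWindow.contents()); // [ 'm4', 'm5' ]
```

In contrast to a sliding window, the callback receives the complete window in one batch.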

Session Windows

A session window groups Messages from the stream that belong to a specific session.

The Message stream may consist of activities of different users, customers, hosts, etc. A Property of the Messages in the stream contains the value by which these Messages are grouped. The result is one Memory per distinct value, e.g. one Memory per distinct user if the grouping is done over a user id.
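
The grouping idea can be illustrated with plain JavaScript objects. In this sketch, groupBySession and the "userid" field are hypothetical, and an array per key stands in for the Memory that a Memory Group would hold per distinct value.

```javascript
// Hypothetical model of session grouping (not the SwiftMQ MemoryGroup API):
// one array per distinct value of the grouping property.
function groupBySession(messages, propertyName) {
  var groups = {};
  messages.forEach(function (message) {
    var key = String(message[propertyName]);
    if (!Object.prototype.hasOwnProperty.call(groups, key)) {
      groups[key] = [];
    }
    groups[key].push(message);
  });
  return groups;
}

var activities = [
  { userid: "alice", action: "login" },
  { userid: "bob", action: "login" },
  { userid: "alice", action: "order" }
];
var sessions = groupBySession(activities, "userid");

console.log(Object.keys(sessions));  // [ 'alice', 'bob' ]
console.log(sessions.alice.length);  // 2
```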

Count Limit

A Count Limit limits a Memory by message count. It can be tumbling or sliding (default).

This example limits Memory "warning" to the last 2 Messages in a sliding window:

         stream.create().memory("warning").heap().limit().count(2).sliding();

This example limits Memory "mem" to the last 1000 Messages in a tumbling window:

         stream.create().memory("mem").heap().limit().count(1000).tumbling();

Time Limit

A Time Limit limits a Memory by time. It can be tumbling or sliding (default).

This example limits Memory "mem" to the last 90 seconds in a sliding window:

         stream.create().memory("mem").heap()
                                      .limit()
                                      .time()
                                      .sliding()
                                      .minutes(1)
                                      .seconds(30);
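
The time units add up, so minutes(1) plus seconds(30) yields a 90 second window. The following plain JavaScript sketch models how such a window could retire entries. retireExpired and the storeTime field are hypothetical, and the "less than or equal" comparison is an assumption.

```javascript
// Hypothetical model of a time-based sliding window (not the SwiftMQ API).
var WINDOW_MILLIS = (1 * 60 + 30) * 1000; // minutes(1) plus seconds(30) = 90 seconds

function retireExpired(messages, now) {
  var retired = [];
  // Entries are kept in store-time order, so expired ones are at the front.
  while (messages.length > 0 && messages[0].storeTime <= now - WINDOW_MILLIS) {
    retired.push(messages.shift());
  }
  return retired;
}

var timed = [
  { id: "a", storeTime: 0 },
  { id: "b", storeTime: 60000 },
  { id: "c", storeTime: 100000 }
];
var expired = retireExpired(timed, 120000); // check the limit at t = 120 s

console.log(expired.map(function (m) { return m.id; })); // [ 'a' ]
console.log(timed.map(function (m) { return m.id; }));   // [ 'b', 'c' ]
```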

This example limits Memory "itemstats" to the last 10 seconds in a tumbling window:

         stream.create().memory("itemstats").heap()
                                            .limit()
                                            .time()
                                            .tumbling()
                                            .seconds(10);

Chaining Limits

Limits can be chained and are checked in the order of definition. For example, a Time Limit can be combined with a hard Count Limit so that a high-volume Message stream cannot store too many Messages in the window:

         stream.create().memory("itemstats").heap()
                                            .limit().time().tumbling().seconds(10)
                                            .limit().count(10000).tumbling();
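
The effect of checking limits in order can be modeled in plain JavaScript. This is a simplified sketch, not the Streams implementation: timeTumbling, countTumbling and checkLimits are hypothetical helpers, and a count of 3 is used instead of 10000 to keep the example short.

```javascript
// Hypothetical model of chained limits (not the SwiftMQ API).
// Each limit inspects the window and returns the entries it retires.
function timeTumbling(millis) {
  return function (messages, now) {
    var due = messages.length > 0 && messages[0].storeTime <= now - millis;
    return due ? messages.splice(0, messages.length) : [];
  };
}

function countTumbling(count) {
  return function (messages) {
    return messages.length >= count ? messages.splice(0, messages.length) : [];
  };
}

// Limits are checked in the order in which they were defined.
function checkLimits(messages, limits, now) {
  var retired = [];
  limits.forEach(function (limit) {
    retired = retired.concat(limit(messages, now));
  });
  return retired;
}

var chain = [timeTumbling(10000), countTumbling(3)];
var burst = [
  { id: 1, storeTime: 1000 },
  { id: 2, storeTime: 1100 },
  { id: 3, storeTime: 1200 }
];
// Only 1 second has passed, so the time limit is not due, but the count limit is.
var retiredByChain = checkLimits(burst, chain, 2000);

console.log(retiredByChain.length); // 3
console.log(burst.length);          // 0
```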

onRetire Callback

When a Limit is reached, Messages will retire. To process these Messages, an onRetire callback needs to be registered at the Memory where the processing takes place:

        // Callback function for onRetire to print the statistics
        function printStatistic(retired){
            // Generate statistics in the stream's log file
            stream.log().info("Item Statistics:");

            // Get and log the statistic data (items with summarized quantity in descending order)
            retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function(message){
              stream.log().info(message.property("ITEMNO").value().toString() + " = " +
                                message.property("QTY").value().toString());
            });
        }
        // Create itemstats Memory
        stream.create().memory("itemstats")
                       .heap()
                       .limit()
                       .time()
                       .tumbling()
                       .seconds(10)
                       .onRetire(printStatistic);
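
To make the aggregation in the callback easier to follow, here is a plain JavaScript sketch of what grouping by ITEMNO, summing QTY and sorting in descending order computes. summarizeByItem is a hypothetical helper that works on plain objects instead of Messages, so it only approximates the group/sum/sort/reverse chain.

```javascript
// Hypothetical plain-JavaScript equivalent of the aggregation (not the SwiftMQ API).
function summarizeByItem(retired) {
  var totals = {};
  // Group by ITEMNO and sum up QTY per group.
  retired.forEach(function (message) {
    totals[message.ITEMNO] = (totals[message.ITEMNO] || 0) + message.QTY;
  });
  return Object.keys(totals)
    .map(function (itemNo) {
      return { ITEMNO: itemNo, QTY: totals[itemNo] };
    })
    .sort(function (a, b) {
      return b.QTY - a.QTY; // descending quantity
    });
}

var retiredWindow = [
  { ITEMNO: "A100", QTY: 2 },
  { ITEMNO: "B200", QTY: 7 },
  { ITEMNO: "A100", QTY: 4 }
];
var summary = summarizeByItem(retiredWindow);
summary.forEach(function (row) {
  console.log(row.ITEMNO + " = " + row.QTY);
});
// B200 = 7
// A100 = 6
```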

Event Time Processing

When a Message is added to a Memory, the Memory uses the current time as the store time and all time-based limits work on this time. This is called processing time.

However, if Messages should be processed based on the time at which the event occurred, this is called event time processing. In that case, the Message must contain a Property of type Long with the actual event time.

The event time Property name needs to be set at the Memory with:

        stream.create().memory("mem").orderBy("evttime");

The Memory then orders the Messages by the values of the "orderBy" Property. Messages that arrive out of order are sorted into the correct position as well, but only within the current window. The first Message in the Memory is the oldest and the last Message is the youngest, based on event time.
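
Ordering by event time amounts to a sorted insert. The following plain JavaScript sketch shows the idea with a hypothetical addOrdered helper and plain objects instead of Messages. It does not reflect how the Memory is implemented internally.

```javascript
// Hypothetical model of event-time ordering (not the SwiftMQ API).
function addOrdered(messages, message, orderBy) {
  var index = messages.length;
  // Walk back from the end while the existing entry is younger than the new one.
  while (index > 0 && messages[index - 1][orderBy] > message[orderBy]) {
    index--;
  }
  messages.splice(index, 0, message);
}

var ordered = [];
[
  { id: "a", evttime: 1000 },
  { id: "c", evttime: 3000 },
  { id: "b", evttime: 2000 } // arrives late
].forEach(function (m) {
  addOrdered(ordered, m, "evttime");
});

console.log(ordered.map(function (m) { return m.id; })); // [ 'a', 'b', 'c' ]
```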

New Base Functionality

Memory Join

Performs an inner join with the right Memory over the named join Property, which must exist in the Messages of both Memories. The result is a Memory that contains only Messages for which a Message on the left side (this Memory) matches a Message on the right side. Each result Message consists of the left Message enriched with all Properties of the matching right Message.

      // Join orderhead with the orderpos memory
      var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID");
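
The semantics of an inner join can be illustrated with plain JavaScript arrays. This sketch makes two assumptions that are not stated above: one result entry is produced per matching pair, and a Property of the right side overwrites a Property of the same name on the left side. innerJoin and the sample fields are hypothetical.

```javascript
// Hypothetical model of an inner join over one property (not the SwiftMQ API).
function innerJoin(left, right, joinProperty) {
  var result = [];
  left.forEach(function (l) {
    right.forEach(function (r) {
      if (l[joinProperty] === r[joinProperty]) {
        // Start with the left entry and enrich it with the right entry.
        var joined = {};
        Object.keys(l).forEach(function (k) { joined[k] = l[k]; });
        Object.keys(r).forEach(function (k) { joined[k] = r[k]; });
        result.push(joined);
      }
    });
  });
  return result;
}

var orderHeads = [
  { ORDERHEADID: 1, CUSTOMER: "acme" },
  { ORDERHEADID: 2, CUSTOMER: "globex" }
];
var orderPositions = [
  { ORDERHEADID: 1, ITEMNO: "A100" },
  { ORDERHEADID: 1, ITEMNO: "B200" },
  { ORDERHEADID: 3, ITEMNO: "C300" }
];
var joinedOrders = innerJoin(orderHeads, orderPositions, "ORDERHEADID");

console.log(joinedOrders.length);    // 2
console.log(joinedOrders[1].ITEMNO); // B200
```

Order head 2 has no positions and position 3 has no order head, so neither appears in the result.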

Timer Reconfiguration

To reconfigure a Timer with a new interval without recreating it, the Timer must be reset, the new value must be set, and then reconfigure must be called:

        // Create a Management Input that receives changes on my own statistic-interval-sec parameter
        // and reconfigures the Timer
        stream.create().input("sys$streams/streams/"+stream.name()+"/parameters/statistic-interval-sec").management().onChange(function (input){

          // Get the new value
          var secs = input.current().property("value").value().toInteger();

          // Reset and reconfigure the Timer with the new value
          stream.timer("stats").reset().seconds(secs).reconfigure();

          // Recreate the itemstats Memory
          stream.memory("itemstats").close();
          stream.create().memory("itemstats").heap().limit().time().sliding().seconds(secs);

          // Log it into the Logfile
          stream.log().info("Statistic interval reconfigured to "+secs+" seconds");

          // Avoid that this Management Message is passed to onMessage
          input.current().onMessageEnabled(false);

        });

CLI Interface

A Stream provides a CLI interface to the local Router's management tree to execute any CLI command, e.g. changing contexts, creating queues, or rebooting the Router.

        stream.cli().execute("cc /sys$queuemanager/queues")
                    .exceptionOff()
                    .execute("delete myqueue")
                    .exceptionOn()
                    .execute("new myqueue cache-size 1000")
                    .execute("save");

Create and Close Components dynamically

Components such as Inputs, Memories, Outputs, Timers can now be created and closed dynamically inside callbacks:

        stream.create().input("sys$queuemanager/usage/testqueue/messagecount").management().onChange(function(input){
            // Current number of messages in testqueue
            var messageCount = input.current().property("messagecount").value().toInteger();
            if (messageCount > 0) {
                // Messages are waiting: create and start the Queue Input unless it already exists
                if (stream.input("testqueue") == null)
                    stream.create().input("testqueue").queue().onInput(function(input){
                        print("onInput (testqueue): "+input.current().body());
                    }).start();
            } else if (stream.input("testqueue") != null) {
                // Queue is empty: close the Queue Input
                stream.input("testqueue").close();
            }
        });

Create Temporary Queues, Register JNDI, use as Queue Input

If a Stream acts as a service to other clients and receives request Messages on an Input, a regular queue is not always necessary if the Messages are non-persistent. In this case, a Stream can create a temporary queue, register it in JNDI, and create a Queue Input on it so that JMS clients can perform a JNDI lookup and send Messages to this queue.

        stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();

Disable onMessage Callback Processing

If a Message is handled in onInput and must not flow through the onMessage callback, it can be disabled during onInput:

        input.current().onMessageEnabled(false);

Management Input

A Management Input consumes events from the local Router's management tree and converts them into Messages. The name of the Input is the CLI context and can refer to an EntityList, an Entity or a Property.

        // Create a Management Input on sys$queuemanager/usage and receive adds and changes.
        // No need to set an onRemove callback as these Memories time out by the inactivity timeout.
        // The selector accepts only queues that are neither system queues nor our own queues (qs_),
        // and the message must contain the messagecount property (other property changes are not of interest).
        stream.create().input("sys$queuemanager/usage")
            .management()
            .selector("name not like '%$%' and name not like 'qs\_%' and messagecount is not null")
            .onAdd(function (input) {
                // A new queue has been created.
                // Add it to the memory group.
                stream.memoryGroup("queues").add(input.current());
            })
            .onChange(function (input) {
                // Property "messagecount" has changed.
                // Add it to the MemoryGroup
                stream.memoryGroup("queues").add(input.current());
            });

forEach Callback

All Messages of a Memory and all Properties of a Property Set can be accessed by the new forEach method:

        grouped.forEach(function (message) {
          stream.log().info(message.property("ITEMNO").value().toString() + " = " +
                            message.property("QTY").value().toString());
        });

Property Set

To get all Properties of a Message, use "properties()", which returns a PropertySet. A PropertySet has various methods to select Properties by their names, such as "startsWith()", "endsWith()" and "select(regex)". Each of these methods returns a new PropertySet. Once the desired set is selected, use forEach to iterate over the result:

        // Selects all Properties not starting with an underscore and prints them
        stream.current().properties().notStartsWith("_").forEach(function(property){
            print(property.name()+"="+property.value().toString());
        });
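
Because each selection method returns a new set, selections can be chained. The plain JavaScript sketch below models this chaining. propertySet is a hypothetical stand-in for the PropertySet class and works on simple name/value objects.

```javascript
// Hypothetical model of chained name filters (not the SwiftMQ PropertySet class).
function propertySet(properties) {
  // Every filter returns a new set and leaves the current one untouched.
  function filtered(predicate) {
    return propertySet(properties.filter(predicate));
  }
  return {
    startsWith: function (prefix) {
      return filtered(function (p) { return p.name.indexOf(prefix) === 0; });
    },
    notStartsWith: function (prefix) {
      return filtered(function (p) { return p.name.indexOf(prefix) !== 0; });
    },
    endsWith: function (suffix) {
      return filtered(function (p) {
        var start = p.name.length - suffix.length;
        return start >= 0 && p.name.indexOf(suffix, start) === start;
      });
    },
    forEach: function (callback) {
      properties.forEach(callback);
    }
  };
}

var selectedNames = [];
propertySet([
  { name: "_internal", value: 1 },
  { name: "ITEMNO", value: "A100" },
  { name: "QTY", value: 3 },
  { name: "ORDERNO", value: 17 }
]).notStartsWith("_").endsWith("NO").forEach(function (property) {
  selectedNames.push(property.name);
});

console.log(selectedNames); // [ 'ITEMNO', 'ORDERNO' ]
```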

Parameters

Stream parameters, passed to the Stream as the "parameters" variable, now have two convenience methods, "require" and "optional":

Require

This method defines a required (mandatory) parameter. It throws an exception that stops the script if the parameter is not defined:

          var inputQueue = parameters.require("input-queue");

Optional

This method defines an optional parameter and provides a default value which is returned if the parameter is not defined:

          var mailHost = parameters.optional("mail-host", "localhost");
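
The difference between the two methods can be modeled in plain JavaScript. createParameters is a hypothetical stand-in for the "parameters" variable. The real methods may return a different value type and a different exception.

```javascript
// Hypothetical model of require/optional (not the SwiftMQ parameters object).
function createParameters(values) {
  function isDefined(name) {
    return Object.prototype.hasOwnProperty.call(values, name);
  }
  return {
    require: function (name) {
      if (!isDefined(name)) {
        throw new Error("Required parameter missing: " + name);
      }
      return values[name];
    },
    optional: function (name, defaultValue) {
      return isDefined(name) ? values[name] : defaultValue;
    }
  };
}

var demoParameters = createParameters({ "input-queue": "orders" });

console.log(demoParameters.require("input-queue"));             // orders
console.log(demoParameters.optional("mail-host", "localhost")); // localhost

try {
  demoParameters.require("output-queue");
} catch (e) {
  console.log(e.message); // Required parameter missing: output-queue
}
```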

onException Callback

To get informed about exceptions that may occur during Stream processing, an onException callback can be registered at the Stream. It is a function with two String parameters. The first is the exception, the second is the formatted stack trace:

        stream.onException(function(lastException, lastStackTrace){
            print("Last Exception: "+lastException);
            print("Last Stacktrace: "+lastStackTrace);
        });

Stream Restart Policy

When an exception occurs anywhere during Stream processing, the onException callback is called (if registered), the current transaction is rolled back, the exception is logged to the Stream's log file, and the Stream is stopped.

Each Stream has two properties to configure an automatic restart, e.g. if a database connection was temporarily down. These are "Restart Delay" and "Max Restarts".
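
How a delay and a maximum number of restarts typically interact can be sketched in plain JavaScript. This is a generic retry loop and an assumption about the policy, not the Router's implementation: runWithRestarts, its arguments and the injected wait function are hypothetical.

```javascript
// Hypothetical model of a restart policy (not the SwiftMQ implementation).
function runWithRestarts(start, maxRestarts, restartDelay, wait) {
  var restarts = 0;
  while (true) {
    try {
      start();
      return restarts; // started successfully
    } catch (e) {
      if (restarts >= maxRestarts) {
        throw e; // give up, the Stream stays stopped
      }
      restarts++;
      wait(restartDelay); // pause before the next attempt
    }
  }
}

var attempts = 0;
var waits = [];
var usedRestarts = runWithRestarts(function () {
  attempts++;
  // Simulate a database that is down for the first two attempts.
  if (attempts < 3) {
    throw new Error("database connection down");
  }
}, 5, 1000, function (millis) {
  waits.push(millis);
});

console.log(usedRestarts); // 2
console.log(waits);        // [ 1000, 1000 ]
```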

Changes

Domains and Packages

To provide a clear logical separation between Streams, we have introduced a new structure that consists of domains, packages and, inside the packages, the Stream declarations.

Code Changes

onInput Callback

The onInput callback now has a single parameter, which is the Input at which the callback is registered:

          stream.create().input(orderHeadQueue).queue().onInput(function (input) {
            stream.memory("orderhead").add(input.current());
          });

onTimer Callback

The onTimer callback now has a single parameter, which is the Timer at which the callback is registered:

          stream.create().timer("monitor").interval().minutes(2).seconds(10).onTimer(function (timer) {
            stream.memory("all").checkLimit();
          });

Further Information

For more detailed information, have a look at the SwiftMQ Streams Docs.