Process Events with ApisEventBus

Overview

The EventBus bee is used for event processing. It uses item types to define four basic processing components:

  • Sources connects to something that can produce events, e.g. Chronical, Kafka.
  • Sinks connects to something that can consume events, e.g. Chronical, RDBMS, Kafka.
  • Channels are places where Sources can publish events, and Sinks can subscribe to events
  • Routers move events between Channels with optional filtering and transformations

Channel item type

This item type defines a channel/queue with the same name as the item. By default, all events published to a channel will immediately be forwarded to all subscribers. If the attribute BatchTime is set, events will be grouped together for this number of seconds, into a single event. If the attribute BatchSize is also specified, this it will override BatchTime when the specified number of events have been batched.

Router item type

This item type has attributes for Input channel, Output channel and Script. All events published to the input channel is filtered and/or transformed by the script, and the result is published on the output channel.

Source.Chronical item type

This item type creates a subscription for chronical events in the local hive instance. It has attributes used to specify an Event source, an Event type, and an Output channel.

The Source.Chronical item produces events like this:

<event>
	<Timestamp value="132313410105734253" filetime="2020-04-14T12:30:10.5734253Z"/>
	<Generation value="1"/>
	<Sequence value="19708435"/>
	<Source value="2166" name="Worker.Variable1">
		<Area name="AlarmArea"/>
	</Source>
	<Type value="20" name="LevelAlarm">
		<Attr>
			<UA_NODEID value="0:0:9482"/>
			<UA_SUBSTATES value="Low,LowLow,High,HighHigh"/>
			<AE_CONDITION value="Level alarm"/>
		</Attr>
	</Type>
	<State value="16777227" Enabled="1" Active="1" Acked="1" Low="1"/>
	<Severity value="780"/>
	<Message value="The value is lower than 5, last was 0"/>
	<Received value="132313410105734253" filetime="2020-04-14T12:30:10.5734253Z"/>
	<SourceName value="Worker.Variable1"/>
	<UserName value="PREDIKTOR\larsh"/>
	<Category value="2" name="Level"/>
	<ActiveTime value="132313408772290610" filetime="2020-04-14T12:27:57.2290610Z"/>
	<CurrentValue value="0"/>
	<CurrentQuality value="192"/>
	<CurrentTimestamp value="132313408772290610" filetime="2020-04-14T12:27:57.2290610Z"/>
	<LastState value="16777223" Enabled="1" Active="1" AckRequired="1" Low="1"/>
	<Comment value="Acked from AMS"/>
</event>

Sink.Db item type

This item type connects to an ADO database specified by the attribute Connectingstring, and executes stored procedures when the events received on the Input channel has the following layout:

<db>
	<exec name='stored-proc-name'>
		<arg name='param-name-1' value='param-value'/>
		<arg name='param-name-2' value='param-value'/>
	</exec>
</db>

Sink.Smtp item type

This item type is used to send emails. The attributes Server (and optionally Username and Password) defines the SMTP connection.

The attributes From, To, Cc, Bcc and Importance defines message properties.

The Sink.Smtp item expects incoming events to have the following layout:

<mail subject="New alarms from ApisHive">
  <part type='text/html'>
	<html>
	  <head>.....</head>
	  <body>....</body>
	</html>
  </part>
<mail>

Sink.Tracelog item type

This item type saves each received event to a tracelog. It is used for debugging/inspecting the events on a channel specified by the attribute Input channel. The attribute TraceFile specifies the path to the tracefile, and the attribute Enabled is used to enable/disable tracing.

Example: exporting events from Chronical to PDS DB

  • Create two Channel items: "ChronicalChannel" and "PdsChannel"
  • Create one Source.Chronical item, specify the Eventtype and Eventsource which should be exported, and set the Output channel to "ChronicalChannel".
  • Create one Sink.Db item, specify the ADO Connectionstring for the PDS database and set the Input channel to "PdsChannel"
  • Create one Router item, set Input channel to "ChronicalChannel", Output channel to "PdsChannel"

Now events can be transformed stored procedure invocations by the following xslt:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">

<xsl:template match="/event">
	<db>
		<exec name='ac_new_event'>
			<arg name='Timestamp' value='{Timestamp/@filetime}'/>
			<arg name='SourceName' value='{Source/@name}'/>
			<arg name='AlarmAreaName' value='{Source/Area/@name}'/>
			<arg name='TypeName' value='{Type/@name}'/>
			<xsl:apply-templates select='Type/Attr/AE_CONDITION'/>
			<xsl:apply-templates select='State/@Active'/>
			<xsl:apply-templates select='State/@Enabled'/>
			<xsl:apply-templates select='State/@Acked'/>
			<xsl:apply-templates select='State/@Low'/>
			<xsl:apply-templates select='State/@LowLow'/>
			<xsl:apply-templates select='State/@High'/>
			<xsl:apply-templates select='State/@HighHigh'/>
			<arg name='Severity' value='{Severity/@value}'/>
			<arg name='Message' value='{Message/@value}'/>
			<arg name='CategoryName' value='{Category/@name}'/>
			<arg name='ActiveTime' value='{ActiveTime/@filetime}'/>
			<arg name='Quality' value='{CurrentQuality/@value}'/>
			<xsl:apply-templates select='Comment'/>
			<xsl:apply-templates select='UserName'/>
		</exec>
	</db>
</xsl:template>

<xsl:template match='@Active'>
	<arg name="StateActive" value="1"/>
</xsl:template>

<xsl:template match='@Enabled'>
	<arg name="StateEnabled" value="1"/>
</xsl:template>

<xsl:template match='@Acked'>
	<arg name="StateAck" value="1"/>
</xsl:template>

<xsl:template match='@Low'>
	<arg name="SubconditionName" value="Lo"/>
</xsl:template>

<xsl:template match='@LowLow'>
	<arg name="SubconditionName" value="LoLo"/>
</xsl:template>

<xsl:template match='@High'>
	<arg name="SubconditionName" value="Hi"/>
</xsl:template>

<xsl:template match='@HighHigh'>
	<arg name="SubconditionName" value="HiHi"/>
</xsl:template>

<xsl:template match='Comment'>
	<arg name="Comment" value="{@value}"/>
</xsl:template>

<xsl:template match='UserName'>
	<arg name="UserName" value="{@value}"/>
</xsl:template>

<xsl:template match='AE_CONDITION'>
	<arg name="ConditionName" value="{@value}"/>
</xsl:template>

</xsl:stylesheet>

Example: SMTP formatting of batched events

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">

<xsl:output method="xml" indent='yes'/>

<xsl:template match="/batch">
  <mail subject="You've got {count(event)} new alarms!">
	<part type='text/html'>
	  <html>
		<head>
		  <style>
			body {font-family: Helvetica, sans-serif;}
			th, td {text-align: left;}
			th {background: #ccc; padding: 0.2em; padding-right: 2em;}
			td {padding: 0.1em 0.2em; padding-right: 2em;}
		  </style>
		</head>
		<body>
		  <h1>Alarm list</h1>
		  <p>There was <xsl:value-of select="count(event)"/> alarm-related events since the previous report.</p>
		  <table>
			<tr>
			  <th>Date</th>
			  <th>Time</th>
			  <th>Severity</th>
			  <th>Active</th>
			  <th>Acked</th>
			  <th>Condition</th>
			  <th>Source</th>
			  <th>Message</th>
			  <th>Operator</th>
			  <th>Comment</th>
			</tr>
			<xsl:apply-templates select='event'/>
		  </table>
		</body>
	  </html>
	</part>
  </mail>
</xsl:template>

<xsl:template match='event'>
  <tr>
	<xsl:if test='position() mod 2 = 0'>
	  <xsl:attribute name='style'>background-color: #f0f0f5;</xsl:attribute>
	</xsl:if>
	<td><xsl:value-of select='substring(Timestamp/@filetime, 0, 11)'/></td>
	<td><xsl:value-of select='substring(Timestamp/@filetime, 12, 8)'/></td>
	<td><xsl:value-of select='Severity/@value'/></td>
	<td style='color:red;'><xsl:if test='State/@Active'>&#x2757;</xsl:if></td>
	<td style='color:green;'><xsl:if test='State/@Acked'>&#x2714;</xsl:if></td>
	<td><xsl:value-of select='Type/Attr/AE_CONDITION/@value'/></td>
	<td><xsl:value-of select='Source/@name'/></td>
	<td><xsl:value-of select='Message/@value'/></td>
	<td><xsl:value-of select='UserName/@value'/></td>
	<td><xsl:value-of select='Comment/@value'/></td>
  </tr>
</xsl:template>

</xsl:stylesheet>

Example: filtering of chronical events

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">

<xsl:output method="xml" omit-xml-declaration="yes"/>

<!-- 
Template matching the top element of the input document.

If this element matches the filtering criteria (State/@Active=1),
the element is applied to the 'copy-template' which recursively
copies an element and all its attributes and child elements.

If the top element does not match the filtering, nothing will be
copied to the output document.
-->
<xsl:template match="/event">
  <xsl:if test="State[@Active=1]">
	<xsl:apply-templates mode="copy" select="."/>
  </xsl:if>
</xsl:template>

<!--
Template used to copy nodes and attributes, recursively
-->  
<xsl:template mode="copy" match='@*|node()'>
  <xsl:copy>
	<xsl:apply-templates mode="copy" select="@*|node()"/>
  </xsl:copy>
</xsl:template>

</xsl:stylesheet>