Version: 2.8

RmQ Publisher Configuration

The publisher sends data from HumanOS® NodeSpace to the RabbitMQ broker. Each publisher has its own context and can publish to a different RabbitMQ instance or exchange. For the general publisher options, see the Common DataLogger Configuration manual.

The logger-specific connection string must be set in the property "Connection". It contains the host and the user credentials:

Server=<Enter IP>; Uid=<Username>; Pwd=<Password>; Vhost=<Virtual Host>
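As a minimal sketch, the connection string is written as a single JSON string in the publisher entry. The host, user name, and virtual host below are placeholders chosen for illustration, not defaults, and the password is left as a placeholder on purpose:

```json
{
  "Name": "PublishLogger",
  "Connection": "Server=192.0.2.10;Uid=loguser;Pwd=<Password>;Vhost=test;"
}
```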

The following table contains the specific properties of the RabbitMqClient Publisher configuration. It extends the general DataLogger publisher configuration documented in Common DataLogger Configuration.

| Parameter | Description | Data Type |
| --- | --- | --- |
| RoutingKey | Default routing key | System.String |
| Exchange | The exchange and its name. If no name is provided, the default exchange (direct) is used. | Exchange, see Exchange Configuration |
| Queues | Array of queues | See Queue Configuration |

Keep in mind that when a sampler is used and no data is available, a routing key and queue specification carried in the messages has no effect, so no data is sent. Specify the routing key and the queues in the client configuration if you want to receive continuous (even empty) packages.

The following graphic shows how the different configuration types work:

[Figure: Different settings of RabbitMQ data publishing]

Exchange Configuration

| Parameter | Description | Data Type |
| --- | --- | --- |
| AutoDelete | The exchange is deleted once no more queues are bound and no more messages are passed. Default: false | System.Boolean |
| Declare | Declares the exchange. A new exchange is created if it doesn't exist; if it exists with the same configuration, the declaration is ignored; a different configuration leads to an error. Default: false | System.Boolean |
| Durable | The exchange is not deleted if the client restarts. Default: true | System.Boolean |
| RoutingKey | Routing key that defines which queues receive which data (matched against the queue's binding key, see RabbitMQ documentation) | System.String |
| Type | The type of the exchange: topic, direct, fanout, or header-based (see RabbitMQ documentation). Default: direct | System.String |
| Binding | The queue's binding key, which is matched against a routing key (see RabbitMQ documentation) | System.String |
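A minimal sketch of an "Exchange" block inside a publisher entry, assuming the parameters in the table map one-to-one to JSON keys. The "Name" key and the publisher-level "RoutingKey" follow the example at the end of this page; "Durable" and "AutoDelete" are written out with their documented defaults, and all values are illustrative:

```json
{
  "Exchange": {
    "Name": "Exchange_XY",
    "Type": "topic",
    "Declare": true,
    "Durable": true,
    "AutoDelete": false
  },
  "RoutingKey": "Routing1"
}
```

With "Declare" set to true, the exchange is created if it does not exist yet. If an exchange of that name already exists with a different type or durability, the table above indicates that the declaration fails with an error.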

Queue Configuration

| Parameter | Description | Data Type |
| --- | --- | --- |
| Name | Name of the queue to declare. At least one queue must be provided. | System.String |
| AutoDelete | Deletes the queue if there are no more consumers or the connection is closed. Default: false | System.Boolean |
| Declare | Declares the queue. A new queue is created if it doesn't exist; if it exists with the same configuration, the declaration is ignored; a different configuration leads to an error. Default: false | System.Boolean |
| Durable | The queue is not deleted if the client restarts. Default: true | System.Boolean |
| Exclusive | The queue can only be deleted by its owner (connection close or command). Default: false | System.Boolean |
| BindingKey | The queue's binding key, which is matched against a routing key (see RabbitMQ documentation) | System.String |
| Acknowledge | Each message in this queue is confirmed with the server and requeued if it is not acknowledged in time or if an error occurs. Default: false | System.String |
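A minimal sketch of a "Queues" array with one fully specified queue, again assuming the table parameters map one-to-one to JSON keys. The values are illustrative; "Durable", "AutoDelete", and "Exclusive" carry their documented defaults, and "Acknowledge" is written as a JSON boolean, as in the example at the end of this page:

```json
{
  "Queues": [
    {
      "Name": "Queue1",
      "Declare": true,
      "Durable": true,
      "AutoDelete": false,
      "Exclusive": false,
      "BindingKey": "Routing1",
      "Acknowledge": true
    }
  ]
}
```

The queue receives a message when its "BindingKey" matches the routing key the message is published with, so the binding key here is chosen to equal the publisher's routing key.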

Payload Scripting

When scripting the payload, System classes can be used. However, not all functions or string operations work: special characters such as "$" cause a script compile error (see CodeDom restrictions). The data to work with is in the list passed to the method as a parameter. Its structure looks like this:

  • DataSet (List<TDataSet> lstDatasets)
    • Name
    • Fields (List<TLogField> lstFields)
      • Config
        • Name
        • DataType
        • Query
        • Type
      • Value

The following context objects are always available:

  • Logger (used to print errors or warnings to log, use with Context.Logger)
  • Configuration (the whole client configuration, use with Context.getValue<TAbstractDataLoggerConfiguration>("Configuration"))

Here is an example with a simple payload:

using CyberTech.Diagnostics;
using HumanOS.Kernel;
using HumanOS.Kernel.PeSeL.DataLogger;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using HumanOS.Kernel.DataModel;
using HumanOS.Kernel.Utils;
using CyberTech;
using HumanOS.Kernel.PeSeL.Script;
using Newtonsoft.Json.Linq;

/// <summary>
/// Script defining the payload
/// </summary>
public class TLoggerPayload : TAbstractDataLoggerScriptObject<byte[]>
{

    ///<see cref="TAbstractDataLoggerScriptObject{T}"/>
    public override void initialize(IKernelAccess Kernel, TPayloadProcessingContext Context)
    {
        // Script state; postProcess reads both entries back under the same keys
        Context.setValue("dicAlarms", new Dictionary<string, TDataTimeSeries<TDataSet>>());
        Context.setValue("dicClearingTimeStamps", new Dictionary<string, DateTime>());
    }

    ///<see cref="TAbstractDataLoggerScriptObject{T}"/>
    public override byte[][] processPayload(IKernelAccess Kernel,
        TPayloadProcessingContext Context,
        List<TDataSet> lstDatasets)
    {
        List<byte[]> lstContents = new List<byte[]>();

        // Client-bound routing
        string strRoutingKey = Context.getValue<string>("RoutingKey");

        // Check for a TimeStamp to place in the header - none found means we're sending non-buffered
        DateTime Timestamp = lstDatasets.Count > 0 ?
            lstDatasets[0].getFieldValue<DateTime>("TimeStamp", TDateTime.UtcNowHighRes) :
            TDateTime.UtcNowHighRes;

        string strTimeStampGenerated = Timestamp.ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff");
        // Round to the nearest 30 s (1 tick = 100 ns, so 300000000 ticks = 30 s)
        string strTimeStampRounded = new DateTime((Timestamp.Ticks + 150000000) /
            300000000 * 300000000).ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff");

        //*************************************Datasets**********************************************//

        if (Context.getValue<TAbstractDataLoggerConfiguration>("Configuration").SamplerType ==
            EDataSamplerType.OnlyLastValue)
        {
            JObject PayloadBodyDef = new JObject();
            if (lstDatasets.Count > 0)
            {
                foreach (TDataSet DataSet in lstDatasets.Where(n => n.Type == EDataSetType.DataNode))
                {
                    try
                    {
                        // One JSON property per dataset: "KKS" field as name, "Value" field as value
                        PayloadBodyDef.Add(new JProperty(DataSet.getFieldValue<string>("KKS", ""),
                            DataSet.Fields.First(n => n.Name == "Value").Value));
                    }
                    catch (ArgumentException Exc)
                    {
                        Context.Logger.writeError(String.Format(
                            "Client '{0}': Failed to add dataset '{1}' from SourceNode '{2}'. {3}.",
                            Context.getValue<TAbstractDataLoggerConfiguration>("Configuration").Name,
                            DataSet.Name,
                            DataSet.SourceNode.Name,
                            Exc.Message));
                    }
                }
            }

            // Wrap the collected values in an envelope and encode it as UTF-8 JSON
            lstContents.Add(Encoding.UTF8.GetBytes(
                new JObject(new JProperty("timestamp_generated", strTimeStampGenerated),
                    new JProperty("timestamp_rounded", strTimeStampRounded),
                    new JProperty("kks_Plant_ident", strRoutingKey),
                    new JProperty("data", PayloadBodyDef)).ToString()));
        }

        return lstContents.ToArray();
    }

    ///<see cref="TAbstractDataLoggerScriptObject{T}"/>
    public override void postProcess(IKernelAccess Kernel, TPayloadProcessingContext Context)
    {
        if (Context.getValue<TAbstractDataLoggerConfiguration>("Configuration").SamplerType == EDataSamplerType.AllData)
        {
            foreach (KeyValuePair<string, TDataTimeSeries<TDataSet>> TsKvp in Context.getValue<Dictionary<string, TDataTimeSeries<TDataSet>>>("dicAlarms"))
            {
                if (Context.getValue<Dictionary<string, DateTime>>("dicClearingTimeStamps").ContainsKey(TsKvp.Key))
                {
                    // Drop entries older than the clearing timestamp stored for this key
                    Context.getValue<Dictionary<string, TDataTimeSeries<TDataSet>>>("dicAlarms")[TsKvp.Key].clearOlder(Context.getValue<Dictionary<string, DateTime>>("dicClearingTimeStamps")[TsKvp.Key]);
                }
            }
        }
    }
}

Example

  • 1 Publisher
  • Sampler On
  • 1 Exchange, topic based, declared
  • 1 Queue, declared, bound, acknowledge on
  • Message routing by messages
{
  "Disable": false,
  "Publishers": [
    {
      "Id": "6eadabf5-f86e-4c80-9114-ce8ab117e998",
      "Name": "PublishLogger",
      "PayloadScriptFile": "TLoggerPayload.cs",
      "Connection": "Server=10.196.24.12;Uid=loguser;Pwd=<PW>;Vhost=test;",
      "SamplingRate": "2",
      "Queues": [
        {
          "Name": "Queue1",
          "Acknowledge": true,
          "Declare": true,
          "BindingKey": "Routing1"
        }
      ],
      "Exchange": {
        "Name": "Exchange_XY",
        "Type": "topic",
        "Declare": true
      },
      "RoutingKey": "Routing1",
      "DataSets": [
        {
          "Name": "OpMode",
          "Type": "DataNode",
          "NodeFilter": "node.GlobalId == Guid.Parse(\"{e7c89c04-ff87-4951-95fa-6be81a29442d}\")",
          "Fields": [
            {
              "Name": "Id",
              "DataType": "System.Guid",
              "Query": "item.GlobalId"
            },
            {
              "Name": "TimeStamp",
              "DataType": "System.DateTime",
              "Query": "item.TimeStamp"
            },
            {
              "Name": "Value",
              "DataType": "System.Double",
              "Query": "item.Value"
            },
            {
              "Name": "State",
              "DataType": "System.Int32",
              "Query": "item.DataState"
            }
          ]
        }
      ],
      "ServiceRule": {
        "BindingRuleId": "{8A80927A-B1F8-4505-B4BA-FB2DFED765E4}",
        "UnbindingRuleId": "{1058566D-0787-48C3-95A5-235E9A55CCF3}",
        "Type": "All"
      }
    }
  ]
}