Skip to main content
Version: 2.8

UHAL Streaming Payload Scripts

The UHAL streaming communication scripts are used to implement a native communication protocol. The following plugins support streaming:

Class TAbstractStreamScriptObject

The script inherits from the class TAbstractStreamScriptObject.

Fields

| Name | Description | Data Type |
| --- | --- | --- |
| Logger | Logger instance | CyberTech.Diagnostics.ILogger |

Methods

It provides the following methods:

| Name | Description |
| --- | --- |
| connectToDevice | Method is called after a successful connection to a server or device. |
| disconnectFromDevice | Method is called after the connection is lost or closed. |
| handleStream | Handles the data stream. Method is called periodically at the pace of the task processor. This method must be implemented in the script file. |
| write&lt;T&gt; | Writes data of type T to the server or device. Method is called by the write command of data nodes. |
| read&lt;T&gt; | Reads data of type T from the server or device. Method is called by the read command of data nodes. |
| callCommand | Calls the command on the server or device. |

The script object must be derived from TAbstractStreamScriptObject. The following methods can be overridden:

Callbacks

| Name | Description |
| --- | --- |
| onDataReceived | Callback is called if data arrives asynchronously from a stream. Only supported by HumanOS.UHAL.SerialDeviceDriver. |

Interface IDataStream

The interface IDataStream is used to access the data stream.

Methods

| Name | Description |
| --- | --- |
| read | Reads a byte stream. |
| write | Writes a byte stream. |
| flush | Flushes the stream after writing. |
NOTE

Use method flush() to force sending the data to the server or device.

Reading from stream

The method read returns a number of bytes from the stream. Note that, depending on the protocol, not all data is returned at once. Rather, only the data that is currently available is returned; for larger amounts, the read method must be called multiple times.

There are two methods to handle this:

  1. The read method is called in a loop until no more data is available. This is not a safe method: especially on Ethernet, some packets arrive with a larger delay, so the loop may stop before the whole message has been received.

  2. The protocol provides a data size field somewhere in the data header. In this case, we loop until the announced number of bytes has been read from the stream.

Example

The following example shows how to read a fixed-size data block from the stream:

  string strData = "";

//Read header first: 4 leading bytes are block size.
//A single read call may deliver only part of the header, so once the
//header has started to arrive we keep reading until all 4 bytes are there.
byte[] aui8Size = new byte[4];
int iHeaderRead = DataStream.read(aui8Size, 0, aui8Size.Length);
if (iHeaderRead > 0)
{
    while (iHeaderRead < aui8Size.Length)
    {
        iHeaderRead += DataStream.read(aui8Size, iHeaderRead, aui8Size.Length - iHeaderRead);
    }

    int iStringSize = BitConverter.ToInt32(aui8Size, 0);

    //Check if the string size is valid (rejects empty and implausibly large blocks)
    if (iStringSize > 0 && iStringSize < 4000000)
    {
        //Loop until the announced number of bytes has been read;
        //each read call returns only the data currently available.
        byte[] aui8Buffer = new byte[iStringSize];
        int iDataRead = 0;
        while (iDataRead < iStringSize)
        {
            iDataRead += DataStream.read(aui8Buffer, iDataRead, iStringSize - iDataRead);
        }
        strData = System.Text.Encoding.ASCII.GetString(aui8Buffer);
    }
}

Example: Reading Data from Native TCP

using CyberTech;
using HumanOS.Kernel;
using HumanOS.Kernel.Communication;
using HumanOS.Kernel.DataModel;
using HumanOS.Kernel.DataModel.Space;
using HumanOS.Kernel.Processing;
using HumanOS.Kernel.UHAL.InfoModel;
using HumanOS.Kernel.UHAL.Script;
using HumanOS.Kernel.Utils;
using System;
using System.Text;

namespace HumanOS.UHAL.TcpClientControl
{
    /// <summary>
    /// Implements a TcpBridge communicator.
    /// Messages in both directions are ASCII text prefixed by a 4-byte length
    /// (see <see cref="writeData"/> and <see cref="readData"/>).
    /// </summary>
    public class TTcpBridgeCommunicator : TAbstractStreamScriptObject
    {
        ///<see cref="TAbstractStreamScriptObject"/>
        /// <summary>
        /// Sends an "ADD" message listing "GlobalId=Address" for every node that carries
        /// the property DeviceId of this device and has a non-empty Address, then reads
        /// the reply.
        /// </summary>
        /// <returns>Good on success, BadConnection if any exception occurred.</returns>
        public override EProcessingState connectToDevice(IKernelAccess Kernel, TDeviceSchemaInfo DeviceInfo, IDataStream DataStream)
        {
            EProcessingState eRetval = EProcessingState.Good;
            try
            {
                StringBuilder Builder = new StringBuilder();
                Builder.AppendLine("ADD");
                foreach (INode Node in Kernel.NodeSpace.queryNodes(n => n.hasProperty("DeviceId", DeviceInfo.Id)))
                {
                    string strAddress = Node.getProperty("Address", "");
                    Logger.writeInfo($"Node found: {Node.Name}");
                    if (strAddress != "")
                    {
                        Builder.AppendLine($"{Node.GlobalId}={strAddress}");
                    }
                    else
                    {
                        Logger.writeDebug($"Node '{Node.Name}' has no address. Ignored.");
                    }
                }
                writeData(DataStream, Builder.ToString());
                string strText = readData(DataStream);
                Logger.writeInfo($"Items added. Retval={strText}.");
            }
            catch (Exception Exc)
            {
                //Do not swallow the cause silently: report it before signalling the bad connection
                Logger.writeError($"Failed to add items to the device: {Exc.Message}");
                eRetval = EProcessingState.BadConnection;
            }
            return eRetval;
        }

        ///<see cref="TAbstractStreamScriptObject"/>
        /// <summary>
        /// Sends "GET", reads the reply and applies every "NodeId=Value" line to the
        /// matching data node. Lines that do not split into exactly two parts at '='
        /// are logged and ignored.
        /// </summary>
        public override void handleStream(IKernelAccess Kernel, TDeviceSchemaInfo DeviceInfo, IDataStream DataStream)
        {
            writeData(DataStream, "GET");
            string strText = readData(DataStream);

            string[] astrLines = strText.Split('\n', StringSplitOptions.RemoveEmptyEntries);
            foreach (string strLine in astrLines)
            {
                //A single bad line must not abort the remaining lines
                try
                {
                    string[] astrItem = strLine.trimWhiteSpaces().Split('=');

                    if (astrItem.Length == 2)
                    {
                        setValue(Kernel.NodeSpace, DeviceInfo, Guid.Parse(astrItem[0].trimWhiteSpaces()), astrItem[1].trimWhiteSpaces());
                    }
                    else
                    {
                        Logger.writeWarning($"Line ignored: {strLine}");
                    }
                }
                catch (Exception Exc)
                {
                    Logger.writeError($"Failed to parse line or set data value: {strLine}. {Exc.Message}");
                }
            }
        }

        /// <summary>
        /// writes data to the data stream: a 4-byte length (as produced by
        /// BitConverter.GetBytes) followed by the ASCII encoded text, then flushes.
        /// </summary>
        /// <param name="DataStream">stream to write to</param>
        /// <param name="strData">text to send</param>
        private void writeData(IDataStream DataStream, string strData)
        {
            byte[] aui8Buffer = Encoding.ASCII.GetBytes(strData);
            byte[] auiLength = BitConverter.GetBytes((int)aui8Buffer.Length);
            DataStream.write(auiLength, 0, auiLength.Length);
            DataStream.write(aui8Buffer, 0, aui8Buffer.Length);
            DataStream.flush();
        }

        /// <summary>
        /// reads data from the data stream: a 4-byte length followed by that many
        /// bytes of ASCII text.
        /// </summary>
        /// <param name="DataStream">stream to read from</param>
        /// <returns>
        /// the decoded text, or an empty string if no header could be read or the
        /// announced size is not within 1..3999999
        /// </returns>
        private string readData(IDataStream DataStream)
        {
            string strRetval = "";

            byte[] aui8Size = new byte[4];
            int iHeaderRead = DataStream.read(aui8Size, 0, aui8Size.Length);
            if (iHeaderRead <= 0)
            {
                //Nothing received at all
                return strRetval;
            }

            //A read call may return only part of the header; complete it before decoding,
            //otherwise a garbage size would be used.
            while (iHeaderRead < aui8Size.Length)
            {
                iHeaderRead += DataStream.read(aui8Size, iHeaderRead, aui8Size.Length - iHeaderRead);
            }
            int iStringSize = BitConverter.ToInt32(aui8Size, 0);

            if (iStringSize > 0 && iStringSize < 4000000)
            {
                //Loop until the announced number of bytes has arrived
                byte[] aui8Buffer = new byte[iStringSize];
                int iDataRead = 0;
                while (iDataRead < iStringSize)
                {
                    iDataRead += DataStream.read(aui8Buffer, iDataRead, iStringSize - iDataRead);
                }
                strRetval = System.Text.Encoding.ASCII.GetString(aui8Buffer);
            }
            return strRetval;
        }

        /// <summary>
        /// sets a value to a data node. Nothing happens if no node with the given id
        /// belongs to the device.
        /// </summary>
        /// <param name="NodeSpace">nodespace</param>
        /// <param name="DeviceInfo">device info model</param>
        /// <param name="NodeId">global id of the node to set</param>
        /// <param name="strValue">value as text; converted to the data type of the node</param>
        private void setValue(INodeSpace NodeSpace, TDeviceSchemaInfo DeviceInfo, Guid NodeId, string strValue)
        {
            INode nNode = NodeSpace.queryNode(n => n.hasProperty("DeviceId", DeviceInfo.Id) && n.GlobalId == NodeId);
            if (nNode == null)
            {
                return;
            }

            TObject.castAndExecute<IDataNode>(nNode, n =>
            {
                TSimpleVariant Value = new TSimpleVariant(TValueConverter.convertToObject(n.DataType, strValue));
                n.passValue(Value, false);
            });
        }
    }
}

Example: Reading Data over Serial COM

Advanced example: reading data from the device and setting the corresponding data nodes, and vice versa, getting data from the node space and writing it to the device.

using CyberTech.Threading;
using CyberTech.Diagnostics;
using HumanOS.Kernel;
using HumanOS.Kernel.Communication;
using HumanOS.Kernel.DataModel;
using HumanOS.Kernel.Processing;
using HumanOS.Kernel.Utils;
using HumanOS.Kernel.UHAL;
using HumanOS.Kernel.UHAL.Device;
using HumanOS.Kernel.UHAL.Script;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using HumanOS.Kernel.DataModel.Space;

namespace Example.Payload
{
    /// <summary>
    /// Implements a test payload processor.
    /// Incoming text is split into frames at '@' and each completed frame is parsed
    /// as JSON; values written by data nodes are collected and sent to the device as
    /// one JSON-like object per handleStream call.
    /// </summary>
    public class ProcessPayload : TAbstractStreamScriptObject
    {

        ///<see cref="TAbstractStreamScriptObject"/>
        public override EProcessingState connectToDevice(IKernelAccess Kernel, ILogger Logger, TDeviceInfo DeviceInfo, IDataStream DataStream)
        {
            return EProcessingState.Good;
        }

        ///<see cref="TAbstractStreamScriptObject"/>
        public override void disconnectFromDevice(IKernelAccess Kernel, ILogger Logger, TDeviceInfo DeviceInfo, IDataStream DataStream)
        {

        }


        ///<see cref="TAbstractStreamScriptObject"/>
        /// <summary>
        /// Reads the received bytes, splits the text at '@' and appends the pieces to
        /// the list of frames to process.
        /// </summary>
        public override void onDataReceived(IKernelAccess Kernel,
            ILogger Logger,
            TDeviceInfo DeviceInfo,
            IDataStream DataStream,
            int iBytesToRead)
        {
            byte[] aui8Buffer = new byte[iBytesToRead];
            int iBytesRead = DataStream.read(aui8Buffer, 0, iBytesToRead);
            if (iBytesRead <= 0)
            {
                //Nothing was delivered
                return;
            }

            //Decode only the bytes actually read; the rest of the buffer is still zero
            //and would otherwise end up as NUL characters inside the frame.
            string strText = System.Text.Encoding.ASCII.GetString(aui8Buffer, 0, iBytesRead);
            string[] astrParts = strText.Split('@');

            /////////////////////////////////////////////////
            lock (m_DataLock)
            {
                //Text before the first '@' continues the frame received last.
                //If nothing is buffered, that leading text is dropped.
                int iLastIndex = m_lstContentToProcess.Count - 1;
                if (m_lstContentToProcess.Count > 0)
                {
                    m_lstContentToProcess[iLastIndex].Append(astrParts[0]);
                }

                //Every '@' starts a new frame
                for (int i = 1; i < astrParts.Length; i++)
                {
                    m_lstContentToProcess.Add(new StringBuilder(astrParts[i]));
                }

                //Keep only the two newest entries; older frames are discarded
                while (m_lstContentToProcess.Count > 2)
                {
                    m_lstContentToProcess.RemoveAt(0);
                }
            }
            /////////////////////////////////////////////////
        }

        ///<see cref="TAbstractStreamScriptObject"/>
        /// <summary>
        /// Processes one buffered frame (if available) and sets the data nodes
        /// Temperature, Switch1, Switch2 and Switch3; afterwards writes all pending
        /// values to the device.
        /// </summary>
        public override void handleStream(IKernelAccess Kernel, ILogger Logger, TDeviceInfo DeviceInfo, IDataStream DataStream)
        {
            string strData = "";
            /////////////////////////////////////////////////
            lock (m_DataLock)
            {
                //The last entry may still be extended by onDataReceived,
                //so a frame is only taken when another one follows it.
                if (m_lstContentToProcess.Count > 1)
                {
                    strData = m_lstContentToProcess[0].ToString();
                    m_lstContentToProcess.RemoveAt(0);
                }
            }
            /////////////////////////////////////////////////

            if (!strData.isEmpty())
            {
                try
                {
                    JObject Payload = JObject.Parse(strData);
                    string strTemperature = (string)Payload["Temperature"];
                    string strSwitch1 = (string)Payload["Switch1"];
                    string strSwitch2 = (string)Payload["Switch2"];
                    string strSwitch3 = (string)Payload["Switch3"];

                    setValue(Kernel.NodeSpace, Logger, DeviceInfo.DataNodes.First(n => n.Name == "Temperature"), strTemperature);
                    setValue(Kernel.NodeSpace, Logger, DeviceInfo.DataNodes.First(n => n.Name == "Switch1"), strSwitch1);
                    setValue(Kernel.NodeSpace, Logger, DeviceInfo.DataNodes.First(n => n.Name == "Switch2"), strSwitch2);
                    setValue(Kernel.NodeSpace, Logger, DeviceInfo.DataNodes.First(n => n.Name == "Switch3"), strSwitch3);
                }
                catch (JsonReaderException)
                {
                    Logger.writeDebug("JSON Error parsing the payload. Dismiss packet.");
                }
                catch (Exception Exc)
                {
                    Logger.writeError($"Error parsing the payload. {Exc.Message}\n{Exc.StackTrace}");
                }
            }

            StringBuilder DataToWrite = new StringBuilder();

            bool bDataToProcess;
            /////////////////////////////////////////////////
            lock (m_DataLock)
            {
                //Build the comma separated member list and empty the pending values
                bDataToProcess = !m_dicValuesToWrite.isEmpty();
                bool bFirst = true;
                foreach (KeyValuePair<string, string> Item in m_dicValuesToWrite)
                {
                    if (!bFirst)
                    {
                        DataToWrite.Append(",");
                    }
                    bFirst = false;
                    DataToWrite.Append($"\"{Item.Key}\" : {Item.Value}");
                }
                m_dicValuesToWrite.Clear();
            }
            /////////////////////////////////////////////////

            if (bDataToProcess)
            {
                //Write data to the device (member list wrapped in curly braces)
                string strDataToWrite = $"{{{DataToWrite}}}";
                byte[] aui8Data = Encoding.ASCII.GetBytes(strDataToWrite);
                DataStream.write(aui8Data, 0, aui8Data.Length);
                DataStream.flush();
            }
        }

        ///<see cref="TAbstractStreamScriptObject"/>
        /// <summary>
        /// Passes the written value to the data node and queues it (keyed by the node
        /// name) to be sent to the device by the next handleStream call.
        /// </summary>
        public override void write<T>(IKernelAccess Kernel, ILogger Logger, TDataAccessInfo DataInfo, IDataStream DataStream, T Value)
        {
            if (Kernel.NodeSpace.tryGetNode<IDataNode<T>>(DataInfo.DataNodeId, out IDataNode<T> DataNode))
            {
                DataNode.passState(EDataState.Good);
                DataNode.passValue(Value, false);

                /////////////////////////////////////////////////
                lock (m_DataLock)
                {
                    //A later write to the same node replaces the pending value
                    m_dicValuesToWrite[DataNode.Name] = TValueConverter.convertToString(typeof(T), Value);
                }
                /////////////////////////////////////////////////
            }
        }

        /// <summary>
        /// Sets the data node value and marks its state as good.
        /// Logs a warning if the node is not found in the node space.
        /// </summary>
        /// <typeparam name="T">value type of the data node</typeparam>
        /// <param name="NodeSpace">node space to look the node up in</param>
        /// <param name="Logger">logger used for the warning</param>
        /// <param name="NodeId">id of the data node</param>
        /// <param name="Value">value to pass to the node</param>
        protected void setValueToDataNode<T>(INodeSpace NodeSpace, ILogger Logger, Guid NodeId, T Value)
        {
            if (NodeSpace.tryGetNode<IDataNode<T>>(NodeId, out IDataNode<T> DataNode))
            {
                DataNode.passValue(Value, false);
                DataNode.passState(EDataState.Good);
            }
            else
            {
                Logger.writeWarning($"Node '{NodeId}' not configured.");
            }
        }

        /// <summary>
        /// sets a value to a data node. The text is converted to the system type of
        /// the node and setValueToDataNode is invoked via reflection with that type
        /// as generic argument.
        /// </summary>
        /// <param name="NodeSpace">node space containing the data node</param>
        /// <param name="Logger">logger passed on to setValueToDataNode</param>
        /// <param name="DataInfo">access info providing node id and system type</param>
        /// <param name="strValue">value as text</param>
        private void setValue(INodeSpace NodeSpace, ILogger Logger, TDataAccessInfo DataInfo, string strValue)
        {
            MethodInfo Method = this.GetType().GetMethod(nameof(setValueToDataNode), BindingFlags.NonPublic | BindingFlags.Instance).MakeGenericMethod(DataInfo.DataSystemType);
            Method.Invoke(this, new object[] { NodeSpace, Logger, DataInfo.DataNodeId, TValueConverter.convertToObject(DataInfo.DataSystemType, strValue) });
        }

        /// <summary>
        /// Data lock guarding m_lstContentToProcess and m_dicValuesToWrite
        /// </summary>
        private readonly object m_DataLock = new object();

        /// <summary>
        /// read content to process (one entry per frame)
        /// </summary>
        private readonly List<StringBuilder> m_lstContentToProcess = new List<StringBuilder>();

        /// <summary>
        /// Write content to process (node name to value text)
        /// </summary>
        private readonly Dictionary<string, string> m_dicValuesToWrite = new Dictionary<string, string>();

    }
}