Example integration implementation
About the example
note
This is just an example of how to create an ISO19848 adapter for your timeseries data and alarms. The actual implementation may vary depending on the use case.
This example assumes that the DataChannelList have already been sent, which is required in order for the Vista Gateway to know which data to expect.
Schema references:
Example data assumptions:
- SparkplugB protocol
- Timeseries resolution: every 15 seconds
- Batch size: 4 messages
- Batching interval: 1 minute
- Alarms: 2 alarms were active
Example payload from a made up system
Made up SparkplugB topic structure:
spBv1.0/ShipFleet/VESSEL_DATA/IMO1234567/MainEngine
[
{
"timestamp": 1708190400000,
"metrics": [
{
"name": "RPM_MainEngine",
"alias": 1,
"timestamp": 1708190400000,
"dataType": "Float",
"value": 1500.0
},
{
"name": "FuelRate",
"alias": 2,
"timestamp": 1708190400000,
"dataType": "Float",
"value": 250.5
},
{
"name": "OilPressure",
"alias": 3,
"timestamp": 1708190400000,
"dataType": "Float",
"value": 5.6
},
{
"name": "Alarm_HighTemp",
"alias": 4,
"timestamp": 1708190400000,
"dataType": "String",
"value": "ACTIVE"
}
],
"seq": 0
},
{
"timestamp": 1708190415000,
"metrics": [
{
"name": "RPM_MainEngine",
"alias": 1,
"timestamp": 1708190415000,
"dataType": "Float",
"value": 1498.5
},
{
"name": "FuelRate",
"alias": 2,
"timestamp": 1708190415000,
"dataType": "Float",
"value": 249.8
},
{
"name": "OilPressure",
"alias": 3,
"timestamp": 1708190415000,
"dataType": "Float",
"value": 5.7
},
{
"name": "Alarm_HighTemp",
"alias": 4,
"timestamp": 1708190415000,
"dataType": "String",
"value": "INACTIVE"
}
],
"seq": 1
},
{
"timestamp": 1708190430000,
"metrics": [
{
"name": "RPM_MainEngine",
"alias": 1,
"timestamp": 1708190430000,
"dataType": "Float",
"value": 1502.3
},
{
"name": "FuelRate",
"alias": 2,
"timestamp": 1708190430000,
"dataType": "Float",
"value": 248.9
},
{
"name": "OilPressure",
"alias": 3,
"timestamp": 1708190430000,
"dataType": "Float",
"value": 5.6
},
{
"name": "Alarm_HighTemp",
"alias": 4,
"timestamp": 1708190430000,
"dataType": "String",
"value": "ACTIVE"
}
],
"seq": 2
},
{
"timestamp": 1708190445000,
"metrics": [
{
"name": "RPM_MainEngine",
"alias": 1,
"timestamp": 1708190445000,
"dataType": "Float",
"value": 1501.2
},
{
"name": "FuelRate",
"alias": 2,
"timestamp": 1708190445000,
"dataType": "Float",
"value": 251.0
},
{
"name": "OilPressure",
"alias": 3,
"timestamp": 1708190445000,
"dataType": "Float",
"value": 5.5
},
{
"name": "Alarm_HighTemp",
"alias": 4,
"timestamp": 1708190445000,
"dataType": "String",
"value": "INACTIVE"
}
],
"seq": 3
}
]
1. Load the DataChannelList
For the example, assume this is just loaded in memory from a file
.NET
using Vista.SDK.Transport.Json;
using Vista.SDK.Transport.Json.DataChannel;
using Domain = Vista.SDK.Transport.DataChannel;
namespace Vista.Gateway.IngestExample;
public sealed class Iso19848DataChannelList
{
// Singleton instance, if using caching etc
public static readonly Iso19848DataChannelList Instance = new();
public Domain.DataChannelListPackage GetDataChannelListPackage(string vesselId) => LoadJsonFile(vesselId);
private static Domain.DataChannelListPackage LoadJsonFile(string vesselId)
{
using var reader = new FileStream(GetFilePath(vesselId), FileMode.Open, FileAccess.Read, FileShare.Read);
var package = Serializer.DeserializeDataChannelList(reader);
if (package is null)
throw new Exception("Failed to deserialize DataChannelList");
return package.ToDomainModel();
}
// Example path
private static string GetFilePath(string vesselId) =>
$"../resources/{vesselId}/Engine_Sensors_DataChannelList.json";
}
Python
import json
from pathlib import Path
from vista_gateway_client.gateway_package import ISO19848, DataChannel as dc
class Iso19848DataChannelList:
_file_path = Path("../resources/Engine_Sensors_DataChannelList.json") # Example file
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(Iso19848DataChannelList, cls).__new__(cls)
return cls._instance
@classmethod
def get_data_channel_list_package(cls, vesselId: str) -> dc.DataChannelListPackage:
return cls._load_json_file(vesselId)
@classmethod
def _load_json_file(cls, vesselId: str) -> dc.DataChannelListPackage:
if not cls._file_path.exists():
raise FileNotFoundError(f"File not found: {cls._file_path}")
with cls._file_path.open("r", encoding="utf-8") as file:
data = json.load(file)
if data is None:
raise ValueError("Failed to deserialize DataChannelList")
return cls
# Singleton instance, if caching etc
instance = Iso19848DataChannelList()
2. Load the data
.NET
using Microsoft.EntityFrameworkCore;
namespace Vista.Gateway.IngestExample;
// Typically not defined here, but for the sake of the example
public sealed record SparkplugBMetric
{
public required string Name { get; set; }
public required int Alias { get; set; }
public required long Timestamp { get; set; }
public required string DataType { get; set; }
public required string Value { get; set; }
}
// Typically not defined here, but for the sake of the example
public sealed record SparkplugBMessage
{
public required long Timestamp { get; set; }
public required List<SparkplugBMetric> Metrics { get; set; }
public required int Seq { get; set; }
}
// EF Core example
public sealed class MessageRepository : DbContext
{
public DbSet<SparkplugBMessage> Messages { get; set; }
public DbSet<SparkplugBMetric> Metrics { get; set; }
/// <summary>
/// This is a really simple example to show how the data can be filtered using the DataChannelList.
/// </summary>
/// <returns>List of messages</returns>
public async ValueTask<List<SparkplugBMessage>> GetDataChannelListMessages(
string vesselId,
CancellationToken cancellationToken
)
{
var dataChanneListPackge = Iso19848DataChannelList.Instance.GetDataChannelListPackage(vesselId);
var mySystemIds = dataChanneListPackge.DataChannelList.Select(dc => dc.DataChannelId.ShortId).ToList();
return await Messages
.AsNoTracking()
.Include(m => m.Metrics.Where(m => mySystemIds.Contains(m.Name)))
.Where(m => m.Timestamp > DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds())
.ToListAsync(cancellationToken);
}
}
Python
from sqlalchemy import create_engine, Column, Integer, String, BigInteger, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from iso19848_data_channel import instance # Import singleton instance for DataChannelList
from datetime import datetime, timedelta
import asyncio
# SQLAlchemy Base
Base = declarative_base()
# Database setup (SQLite for simplicity, change as needed)
DATABASE_URL = "sqlite:///example.db" # Replace with your actual DB connection
engine = create_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)
# SparkplugBMetric Model
class SparkplugBMetric(Base):
__tablename__ = "metrics"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
alias = Column(Integer, nullable=False)
timestamp = Column(BigInteger, nullable=False)
data_type = Column(String, nullable=False)
value = Column(String, nullable=False)
message_id = Column(Integer, ForeignKey("messages.id")) # ForeignKey to Message
# SparkplugBMessage Model
class SparkplugBMessage(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
timestamp = Column(BigInteger, nullable=False)
seq = Column(Integer, nullable=False)
metrics = relationship("SparkplugBMetric", backref="message", lazy="joined")
# Message Repository using sqlalchemy
class MessageRepository:
def __init__(self):
self.db = SessionLocal()
async def get_data_channel_list_messages(self, vessel_id):
""" Fetch messages from DB while filtering out only allowed metrics. """
data_channel_list_package = instance.get_data_channel_list_package(vessel_id)
# Extract mySystemIds from the DataChannelList
my_system_ids = {dc["DataChannelId"]["ShortId"] for dc in data_channel_list_package["DataChannelList"]}
# Calculate time filter (last minute)
time_cutoff = int((datetime.utcnow() - timedelta(minutes=1)).timestamp() * 1000)
# Query messages and filter metrics
messages = (
self.db.query(SparkplugBMessage)
.filter(SparkplugBMessage.timestamp > time_cutoff)
.all()
)
# Manually filter metrics (since SQLAlchemy does not support filtering on relationships in the same query)
for message in messages:
message.metrics = [metric for metric in message.metrics if metric.name in my_system_ids]
return messages
# Initialize DB (Run only once)
def initialize_db():
Base.metadata.create_all(bind=engine)
3. Transform the data to ISO19848 TimeSeriesData
.NET
using Vista.SDK.Transport;
using Domain = Vista.SDK.Transport.TimeSeries;
namespace Vista.Gateway.IngestExample;
// Typically not defined here, but for the sake of the example
public static class Iso19848Transformer
{
private const string Author = "Some automated System";
/// <summary>
/// Utilizes the Domain implementation of the TimeSeriesDataPackage, which are implemented according to the ISO 19848 standard
/// </summary>
/// <returns>Return the Domain implementation of the package</returns>
public static Domain.TimeSeriesDataPackage TransformToIso19848(
string vesselId,
// Already filtered messages
List<SparkplugBMessage> sparkplugBMessages
)
{
var dataChannelListPackage = Iso19848DataChannelList.Instance.GetDataChannelListPackage(vesselId);
var shipId = ShipId.Parse(vesselId);
var dataChannelIdMap = new Dictionary<DataChannelId, int>();
// Sensor data is reported as TabularData
var tabularData = new Domain.TabularData { DataChannelIds = [], DataSets = [] };
// Alarms are reported as EventData
var eventData = new Domain.EventData { DataSet = [], };
int index = 0;
// Prepopulate the data channel IDs
foreach (var msg in sparkplugBMessages)
{
foreach (var metric in msg.Metrics)
{
// Add DataChannelIDs datapoints reported as TabularData
// EventData data channels are added to the eventData directly
if (metric.DataType == "Float" || metric.DataType == "Integer")
{
var dataChannelId = DataChannelId.Parse(metric.Name);
if (dataChannelIdMap.TryAdd(dataChannelId, index))
{
tabularData.DataChannelIds.Add(dataChannelId);
index++;
}
}
}
}
// Process each message
foreach (var msg in sparkplugBMessages)
{
var timestamp = ConvertTimestamp(msg.Timestamp);
var dataset = new Domain.TabularDataSet(dataChannelIdMap.Count)
{
TimeStamp = timestamp,
Value = new(dataChannelIdMap.Count),
Quality = null // Can be set to null if quality is not available
};
var alarms = new List<Domain.EventDataSet>();
foreach (var metric in msg.Metrics)
{
var dataChannelId = DataChannelId.Parse(metric.Name);
if (metric.DataType == "Float" || metric.DataType == "Integer")
{
var i = dataChannelIdMap[dataChannelId];
dataset.Value[i] = metric.Value;
}
else if (metric.DataType == "String" && metric.Name.Contains("Alarm"))
{
alarms.Add(
new Domain.EventDataSet
{
TimeStamp = timestamp,
DataChannelId = dataChannelId,
Value = metric.Value,
Quality = null // Can be set to null if quality is not available
}
);
}
}
tabularData.DataSets.Add(dataset);
}
var package = new Domain.TimeSeriesDataPackage
{
Package = new Domain.Package
{
Header = new Domain.Header
{
ShipId = shipId,
TimeSpan = new Domain.TimeSpan
{
Start = ConvertTimestamp(sparkplugBMessages[0].Timestamp),
End = ConvertTimestamp(sparkplugBMessages[^1].Timestamp)
},
DateCreated = DateTime.UtcNow,
DateModified = DateTime.UtcNow,
Author = Author
},
TimeSeriesData =
[
new Domain.TimeSeriesData
{
DataConfiguration = new Domain.ConfigurationReference
{
// Reference to the DataChannelList
Id = dataChannelListPackage.Package.Header.DataChannelListId.Id,
TimeStamp = dataChannelListPackage.Package.Header.DataChannelListId.TimeStamp
},
TabularData = [tabularData],
EventData = eventData
}
]
}
};
return package;
}
private static DateTimeOffset ConvertTimestamp(long timestamp)
{
return DateTimeOffset.FromUnixTimeMilliseconds(timestamp);
}
}
Python
from datetime import datetime
from collections import OrderedDict
from iso19848_data_channel import instance # Import singleton instance for DataChannelList
from vista_gateway_client.gateway_package import ISO19848, TimeSeries as ts
# Function to convert SparkplugB timestamp to ISO8601 format
def convert_timestamp(ms_timestamp):
return datetime.utcfromtimestamp(ms_timestamp / 1000).isoformat() + "Z"
# Function to transform SparkplugB to ISO19848 format
def transform_to_iso19848(ship_id, sparkplugb_messages):
author = "Automated System"
data_channel_list_package = iso19848_data_channel.get_data_channel_list_package(ship_id)
# Maintain order of DataChannelIDs dynamically
data_channel_id_map = OrderedDict()
tabular_data = ts.TabularData(
NumberOfDataSet= 0, # Will be updated later
DataChannelID=[],
DataSet=[]
)
event_data = ts.EventData(
NumberOfDataSet= 0, # Will be updated later
DataSet=[]
)
# Populate DataChannelIDs dynamically
index = 0
for msg in sparkplugb_messages:
for metric in msg["metrics"]:
if metric["dataType"] in ["Float", "Integer"] and metric["name"] not in data_channel_id_map:
data_channel_id_map[metric["name"]] = index
index += 1
tabular_data["DataChannelID"] = list(data_channel_id_map.keys())
# Process each message
for msg in sparkplugb_messages:
timestamp_iso = convert_timestamp(msg["timestamp"])
data_values = {key: "" for key in data_channel_id_map.keys()}
alarms = []
for metric in msg["metrics"]:
if metric["dataType"] in ["Float", "Integer"]:
data_values[metric["name"]] = str(metric["value"])
elif metric["dataType"] == "String" and "Alarm" in metric["name"]:
alarms.append({
"TimeStamp": convert_timestamp(metric["timestamp"]),
"DataChannelID": metric["name"],
"Value": metric["value"]
})
# Append tabular data with consistent order
tabular_data["DataSet"].append({
"TimeStamp": timestamp_iso,
"Value": [data_values[key] for key in data_channel_id_map.keys()]
})
# Append alarm events
event_data["DataSet"].extend(alarms)
event_data["NumberOfDataSet"] = len(event_data["DataSet"])
tabular_data["NumberOfDataSet"] = len(tabular_data["DataSet"])
# Create ISO19848 package
iso19848_data = ts.Package(
Header=ts.Header(
ShipID=ship_id,
TimeSpan=ts.TimeSpan(
Start=convert_timestamp(sparkplugb_messages[0]["timestamp"]),
End=convert_timestamp(sparkplugb_messages[-1]["timestamp"])
),
DateCreated=convert_timestamp(datetime.utcnow().timestamp() * 1000),
DateModified=convert_timestamp(datetime.utcnow().timestamp() * 1000),
Author=author,
SystemConfiguration=[]
),
TimeSeriesData=[
ts.TimeSeriesData(
DataConfiguration=ts.ConfigurationReference(
ID=data_channel_list_package.Header.DataChannelListId.Id,
TimeStamp=data_channel_list_timestamp.Header.DataChannelListId.TimeStamp
),
TabularData=[tabular_data],
EventData=event_data
)
],
)
return iso19848_data
4. Send the ISO19848 TimeSeriesData to the Vista Gateway
.NET
using Vista.Gateway.Client;
namespace Vista.Gateway.IngestExample;
// Make sure this and the IVistaGatewayClient is dependency injected in Program.cs
public class Iso19848Sender(IVistaGatewayClient client)
{
public async Task SendIso19848DataAsync(
string vesselId,
List<SparkplugBMessage> sparkplugBMessages,
string? externalId,
string? correlationId
)
{
var payload = Iso19848Transformer.TransformToIso19848(vesselId, sparkplugBMessages);
try
{
await client.Send(new GatewayPackage.ISO19848.TimeSeriesData(payload, externalId, correlationId));
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
}
Python
# from step 3
from vista_gateway_transformer import transform_to_iso19848
from vista_gateway_client.client import Client
client = Client(
api_key = "<Your API Key>",
protocol = "Http", # "Mqtt" not yet supported in Python client
)
async def main():
repo = MessageRepository()
messages = await repo.get_data_channel_list_messages("IMO1234567")
package = transform_to_iso19848("IMO1234567", messages)
# Create client package
client_package = ISO19848.TimeSeriesData(
package=package,
external_id="<Your External ID>",
correlation_id="<Your Correlation ID>",
user_agent="<Your User Agent>"
)
try:
# Send the package
client.send(client_package)
except VistaGatewayException as e:
# Handle VistaGatewayException
# Run Example
if __name__ == "__main__":
initialize_db()
asyncio.run(main())
Result ISO19848 TimeSeriesData
{
"Package": {
"Header": {
"ShipID": "IMO1234567",
"TimeSpan": {
"Start": "2024-02-17T12:00:00Z",
"End": "2024-02-17T12:01:00Z"
},
"DateCreated": "2024-02-17T12:01:10Z",
"DateModified": "2024-02-17T12:01:10Z",
"Author": "Automated System",
"SystemConfiguration": []
},
"TimeSeriesData": [
{
"DataConfiguration": {
"ID": "Engine_Sensors",
"TimeStamp": "2024-02-17T12:00:00Z"
},
"TabularData": [
{
"NumberOfDataSet": 4,
"NumberOfDataChannel": 3,
"DataChannelID": ["RPM_MainEngine", "FuelRate", "OilPressure"],
"DataSet": [
{
"TimeStamp": "2024-02-17T12:00:00Z",
"Value": ["1500.0", "250.5", "5.6"]
},
{
"TimeStamp": "2024-02-17T12:00:15Z",
"Value": ["1498.5", "249.8", "5.7"]
},
{
"TimeStamp": "2024-02-17T12:00:30Z",
"Value": ["1502.3", "248.9", "5.6"]
},
{
"TimeStamp": "2024-02-17T12:00:45Z",
"Value": ["1501.2", "251.0", "5.5"]
}
]
}
],
"EventData": {
"NumberOfDataSet": 4,
"DataSet": [
{
"TimeStamp": "2024-02-17T12:00:00Z",
"DataChannelID": "Alarm_HighTemp",
"Value": "ACTIVE"
},
{
"TimeStamp": "2024-02-17T12:00:15Z",
"DataChannelID": "Alarm_HighTemp",
"Value": "INACTIVE"
},
{
"TimeStamp": "2024-02-17T12:00:30Z",
"DataChannelID": "Alarm_HighTemp",
"Value": "ACTIVE"
},
{
"TimeStamp": "2024-02-17T12:00:45Z",
"DataChannelID": "Alarm_HighTemp",
"Value": "INACTIVE"
}
]
}
}
]
}
}