HiveMQ Influxdb Sparkplug extension

I am using the HiveMQ extension to ingest data into InfluxDB. I have verified the Sparkplug B DDATA messages using Chariot MQTT Client and everything looks fine.

And they can be serialized from the Sparkplug B proto, so everything looks fine. However, in the HiveMQ log I get the following exception:


I have found the place in the Java code where this exception ("Could not parse MQTT payload to protobuf") is logged. It correctly gets the metrics and generates a list. It also correctly determines that this is a double metric. However, when trying to cast to type SettableDoubleGauge, the code throws:

return (SettableDoubleGauge) getMetricRegistry().getMetrics().get(metricName);

The data serialized to Sparkplug B DDATA is:

DDATA: VersionBDataPayload: {"Timestamp":1743772194044,"Metrics":[{"Name":"Sine1/value","Alias":1,"Timestamp":1743772194042,"IsHistorical":false,"IsTransient":false,"IsNull":false,"Metadata":null,"Properties":null,"BytesValue":"","DataSetValue":{"NumOfColumns":0,"Columns":[],"Types":[],"Rows":[],"Details":[],"$type":"DataSet"},"TemplateValue":null,"ExtensionValue":null,"Value":551.60117038362728,"IntValue":0,"LongValue":0,"ValueCase":10,"DataType":"Double","$type":"Metric"},{"Name":"bdSeq","Alias":0,"Timestamp":1743772194044,"IsHistorical":false,"IsTransient":false,"IsNull":false,"Metadata":null,"Properties":null,"BytesValue":"","DataSetValue":{"NumOfColumns":0,"Columns":[],"Types":[],"Rows":[],"Details":[],"$type":"DataSet"},"TemplateValue":null,"ExtensionValue":null,"Value":27234,"IntValue":0,"LongValue":27234,"ValueCase":4,"DataType":"Int64","$type":"Metric"}],"Seq":0,"Uuid":"","Body":null,"Details":[],"$type":"Payload"}

I assume that, according to the HiveMQ extension, there is something wrong with the Sparkplug B payload. Has anyone experienced a similar issue?

Best regards
Henrik

Hi @hewa

Thanks for sharing the details! Since the HiveMQ extension is throwing a ClassCastException when parsing the payload, it seems likely that the protobuf schema used by the sender may not align with what the extension expects.

Could you clarify which version of the sparkplug_b.proto schema you are using? Would it be possible to share how you are serializing the Sparkplug B payload?

Best regards,
Dasha from The HiveMQ Team

Hi Daria

Thanks for your fast response.

Yes, I agree that is the most likely cause. I have set up the Chariot MQTT Client, and it parses the same MQTT messages (received from HiveMQ) as Sparkplug B:

I also tried without using Alias.

The version is SparkplugB 3.0.0.

Here is the class used for serialization:

//
// This file was generated by a tool; you should avoid making direct changes.
// Consider using 'partial classes' to extend these types
// Input: my.proto
//

#region Designer generated code
#pragma warning disable CS0612, CS0618, CS1591, CS3021, IDE0079, IDE1006, RCS1036, RCS1057, RCS1085, RCS1192
#nullable enable
namespace SparkplugNet.VersionB.ProtoBuf
{
[global::ProtoBuf.ProtoContract()]
internal partial class ProtoBufPayload : global::ProtoBuf.IExtensible
{
private global::ProtoBuf.IExtension? __pbn__extensionData;
global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
=> global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);

    [global::ProtoBuf.ProtoMember(1, Name = @"timestamp")]
    public ulong Timestamp { get; set; }

    [global::ProtoBuf.ProtoMember(2, Name = @"metrics")]
    public global::System.Collections.Generic.List<Metric> Metrics { get; set; } = new global::System.Collections.Generic.List<Metric>();

    // Begin FLS: 14072023/1. IsRequired = true added for compliance with Ignition
    [global::ProtoBuf.ProtoMember(3, Name = @"seq", IsRequired = true)]
    // End FLS: 14072023/1.
    public ulong Seq { get; set; }

    [global::ProtoBuf.ProtoMember(4, Name = @"uuid")]
    [global::System.ComponentModel.DefaultValue("")]
    public string Uuid { get; set; } = string.Empty;

    [global::ProtoBuf.ProtoMember(5, Name = @"body")]
    public byte[]? Body { get; set; } = Array.Empty<byte>();

    [global::ProtoBuf.ProtoMember(6, Name = @"details")]
    public global::System.Collections.Generic.List<byte> Details { get; set; } = new global::System.Collections.Generic.List<byte>();

    [global::ProtoBuf.ProtoContract()]
    internal partial class Template : global::ProtoBuf.IExtensible
    {
        private global::ProtoBuf.IExtension? __pbn__extensionData;
        global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
            => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);

        [global::ProtoBuf.ProtoMember(1, Name = @"version")]
        [global::System.ComponentModel.DefaultValue("")]
        public string Version { get; set; } = string.Empty;

        [global::ProtoBuf.ProtoMember(2, Name = @"metrics")]
        public global::System.Collections.Generic.List<Metric> Metrics { get; set; } = new global::System.Collections.Generic.List<Metric>();

        [global::ProtoBuf.ProtoMember(3, Name = @"parameters")]
        public global::System.Collections.Generic.List<Parameter> Parameters { get; set; } = new global::System.Collections.Generic.List<Parameter>();

        [global::ProtoBuf.ProtoMember(4, Name = @"template_ref")]
        [global::System.ComponentModel.DefaultValue("")]
        public string TemplateRef { get; set; } = string.Empty;

        [global::ProtoBuf.ProtoMember(5, Name = @"is_definition")]
        public bool IsDefinition { get; set; }

        [global::ProtoBuf.ProtoMember(6, Name = @"details")]
        public global::System.Collections.Generic.List<byte> Details { get; set; } = new global::System.Collections.Generic.List<byte>();

        [global::ProtoBuf.ProtoContract()]
        internal partial class Parameter : global::ProtoBuf.IExtensible
        {
            private global::ProtoBuf.IExtension? __pbn__extensionData;
            global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
                => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);

            [global::ProtoBuf.ProtoMember(1, Name = @"name")]
            [global::System.ComponentModel.DefaultValue("")]
            public string Name { get; set; } = string.Empty;

            [global::ProtoBuf.ProtoMember(2, Name = @"type")]
            public uint Type { get; set; }

            [global::ProtoBuf.ProtoMember(3, Name = @"int_value")]
            public uint IntValue
            {
                get => __pbn__value.Is(3) ? __pbn__value.UInt32 : default;
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(3, value);
            }
            public bool ShouldSerializeIntValue() => __pbn__value.Is(3);
            public void ResetIntValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 3);

            private global::ProtoBuf.DiscriminatedUnion64Object __pbn__value;

            [global::ProtoBuf.ProtoMember(4, Name = @"long_value")]
            public ulong LongValue
            {
                get => __pbn__value.Is(4) ? __pbn__value.UInt64 : default;
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(4, value);
            }
            public bool ShouldSerializeLongValue() => __pbn__value.Is(4);
            public void ResetLongValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 4);

            [global::ProtoBuf.ProtoMember(5, Name = @"float_value")]
            public float FloatValue
            {
                get => __pbn__value.Is(5) ? __pbn__value.Single : default;
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(5, value);
            }
            public bool ShouldSerializeFloatValue() => __pbn__value.Is(5);
            public void ResetFloatValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 5);

            [global::ProtoBuf.ProtoMember(6, Name = @"double_value")]
            public double DoubleValue
            {
                get => __pbn__value.Is(6) ? __pbn__value.Double : default;
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(6, value);
            }
            public bool ShouldSerializeDoubleValue() => __pbn__value.Is(6);
            public void ResetDoubleValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 6);

            [global::ProtoBuf.ProtoMember(7, Name = @"boolean_value")]
            public bool BooleanValue
            {
                get => __pbn__value.Is(7) ? __pbn__value.Boolean : default;
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(7, value);
            }
            public bool ShouldSerializeBooleanValue() => __pbn__value.Is(7);
            public void ResetBooleanValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 7);

            [global::ProtoBuf.ProtoMember(8, Name = @"string_value")]
            [global::System.ComponentModel.DefaultValue("")]
            public string StringValue
            {
                get => __pbn__value.Is(8) ? ((string)__pbn__value.Object) : "";
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(8, value);
            }
            public bool ShouldSerializeStringValue() => __pbn__value.Is(8);
            public void ResetStringValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 8);

            [global::ProtoBuf.ProtoMember(9, Name = @"extension_value")]
            public ParameterValueExtension ExtensionValue
            {
                get => __pbn__value.Is(9) ? ((ParameterValueExtension)__pbn__value.Object) : new();
                set => __pbn__value = new global::ProtoBuf.DiscriminatedUnion64Object(9, value);
            }
            public bool ShouldSerializeExtensionValue() => __pbn__value.Is(9);
            public void ResetExtensionValue() => global::ProtoBuf.DiscriminatedUnion64Object.Reset(ref __pbn__value, 9);

            public ValueOneofCase ValueCase => (ValueOneofCase)__pbn__value.Discriminator;

            public enum ValueOneofCase
            {
                None = 0,
                IntValue = 3,
                LongValue = 4,
                FloatValue = 5,
                DoubleValue = 6,
                BooleanValue = 7,
                StringValue = 8,
                ExtensionValue = 9,
            }

            [global::ProtoBuf.ProtoContract()]
            internal partial class ParameterValueExtension : global::ProtoBuf.IExtensible
            {
                private global::ProtoBuf.IExtension? __pbn__extensionData;
                global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
                    => global::ProtoBuf.Extensible.GetExtensionObject(ref __pbn__extensionData, createIfMissing);

                [global::ProtoBuf.ProtoMember(1, Name = @"extensions")]
                public global::System.Collections.Generic.List<byte> Extensions { get; set; } = new global::System.Collections.Generic.List<byte>();

            }

        }

    }

We are using SparkplugNet but had to compile our own version to be compatible with Ignition. We made the following correction:

    // Begin FLS: 14072023/1. IsRequired = true added for compliance with Ignition
    [global::ProtoBuf.ProtoMember(3, Name = @"seq", IsRequired = true)]
    // End FLS: 14072023/1.
    public ulong Seq { get; set; }
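For context, a minimal Java sketch of the proto3 behavior this change works around, assuming classes generated from the Tahu sparkplug_b.proto below are on the classpath: a scalar field left at its default value is not written to the wire, so seq = 0 would simply be absent from the serialized payload unless the producer forces it out, which is what IsRequired = true achieves on the protobuf-net side.

import org.eclipse.tahu.protobuf.SparkplugBProto;

public class SeqDefaultDemo {
    public static void main(String[] args) {
        // In proto3, setting a scalar to its default (0) is indistinguishable
        // from never setting it: the field is omitted from the wire format.
        SparkplugBProto.Payload seqZero = SparkplugBProto.Payload.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSeq(0) // default value: no field-3 tag is serialized
                .build();
        SparkplugBProto.Payload seqOne = seqZero.toBuilder()
                .setSeq(1) // non-default value: field 3 is serialized
                .build();
        // seqOne is two bytes longer (one tag byte plus one varint byte for seq).
        System.out.println(seqZero.toByteArray().length);
        System.out.println(seqOne.toByteArray().length);
    }
}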

I have found the underlying proto file, but I can't attach it:
// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others
// *
// * This program and the accompanying materials are made available under the
// * terms of the Eclipse Public License 2.0 which is available at
// * https://www.eclipse.org/legal/epl-2.0/
// *
// * SPDX-License-Identifier: EPL-2.0
// *
// * Contributors:
// * Cirrus Link Solutions - initial implementation
//
// To compile:
// cd client_libraries/c_sharp
// protoc --proto_path=../../ --csharp_out=src --csharp_opt=base_namespace=Org.Eclipse.Tahu.Protobuf ../../sparkplug_b/sparkplug_b_c_sharp.proto
//

syntax = "proto3";

import "google/protobuf/any.proto";

package org.eclipse.tahu.protobuf;

option java_package = "org.eclipse.tahu.protobuf";
option java_outer_classname = "SparkplugBProto";

message Payload {
/*
// Indexes of Data Types
// Unknown placeholder for future expansion.
Unknown = 0;
// Basic Types
Int8 = 1;
Int16 = 2;
Int32 = 3;
Int64 = 4;
UInt8 = 5;
UInt16 = 6;
UInt32 = 7;
UInt64 = 8;
Float = 9;
Double = 10;
Boolean = 11;
String = 12;
DateTime = 13;
Text = 14;
// Additional Metric Types
UUID = 15;
DataSet = 16;
Bytes = 17;
File = 18;
Template = 19;
// Additional PropertyValue Types
PropertySet = 20;
PropertySetList = 21;
*/

message Template {

    message Parameter {
        string name        = 1;
        uint32 type        = 2;

        oneof value {
            uint32 int_value        = 3;
            uint64 long_value       = 4;
            float  float_value      = 5;
            double double_value     = 6;
            bool   boolean_value    = 7;
            string string_value     = 8;
            ParameterValueExtension extension_value = 9;
        }

        message ParameterValueExtension {
            repeated google.protobuf.Any extensions = 1;
        }
    }

    string version                       = 1;          // The version of the Template to prevent mismatches
    repeated Metric metrics              = 2;          // Each metric is the name of the metric and the datatype of the member but does not contain a value
    repeated Parameter parameters        = 3;
    string template_ref                  = 4;          // Reference to a template if this is extending a Template or an instance - must exist if an instance
    bool is_definition                   = 5;
    repeated google.protobuf.Any details = 6;
}

message DataSet {

    message DataSetValue {

        oneof value {
            uint32 int_value                        = 1;
            uint64 long_value                       = 2;
            float  float_value                      = 3;
            double double_value                     = 4;
            bool   boolean_value                    = 5;
            string string_value                     = 6;
            DataSetValueExtension extension_value   = 7;
        }

        message DataSetValueExtension {
            repeated google.protobuf.Any details = 1;
        }
    }

    message Row {
        repeated DataSetValue elements  = 1;
        repeated google.protobuf.Any details = 2;
    }

    uint64 num_of_columns                = 1;
    repeated string   columns            = 2;
    repeated uint32   types              = 3;
    repeated Row      rows               = 4;
    repeated google.protobuf.Any details = 5;
}

message PropertyValue {

    uint32     type                   = 1;
    bool       is_null                = 2;

    oneof value {
        uint32          int_value              = 3;
        uint64          long_value             = 4;
        float           float_value            = 5;
        double          double_value           = 6;
        bool            boolean_value          = 7;
        string          string_value           = 8;
        PropertySet     propertyset_value      = 9;
        PropertySetList propertysets_value     = 10;      // List of Property Values
        PropertyValueExtension extension_value = 11;
    }

    message PropertyValueExtension {
        repeated google.protobuf.Any details = 1;
    }
}

message PropertySet {
    repeated string        keys     = 1;         // Names of the properties
    repeated PropertyValue values   = 2;
    repeated google.protobuf.Any details = 3;
}

message PropertySetList {
    repeated PropertySet propertyset = 1;
    repeated google.protobuf.Any details = 2;
}

message MetaData {
    // Bytes specific metadata
    bool   is_multi_part   = 1;

    // General metadata
    string content_type    = 2;        // Content/Media type
    uint64 size            = 3;        // File size, String size, Multi-part size, etc
    uint64 seq             = 4;        // Sequence number for multi-part messages

    // File metadata
    string file_name       = 5;        // File name
    string file_type       = 6;        // File type (i.e. xml, json, txt, cpp, etc)
    string md5             = 7;        // md5 of data

    // Catchalls and future expansion
    string description     = 8;        // Could be anything such as json or xml of custom properties
    repeated google.protobuf.Any details = 9;
}

message Metric {

    string   name          = 1;        // Metric name - should only be included on birth
    uint64   alias         = 2;        // Metric alias - tied to name on birth and included in all later DATA messages
    uint64   timestamp     = 3;        // Timestamp associated with data acquisition time
    uint32   datatype      = 4;        // DataType of the metric/tag value
    bool     is_historical = 5;        // If this is historical data and should not update real time tag
    bool     is_transient  = 6;        // Tells consuming clients such as MQTT Engine to not store this as a tag
    bool     is_null       = 7;        // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
    MetaData metadata      = 8;        // Metadata for the payload
    PropertySet properties = 9;

    oneof value {
        uint32   int_value                      = 10;
        uint64   long_value                     = 11;
        float    float_value                    = 12;
        double   double_value                   = 13;
        bool     boolean_value                  = 14;
        string   string_value                   = 15;
        bytes    bytes_value                    = 16;       // Bytes, File
        DataSet  dataset_value                  = 17;
        Template template_value                 = 18;
        MetricValueExtension extension_value    = 19;
    }

    message MetricValueExtension {
        repeated google.protobuf.Any details = 1;
    }
}

uint64   timestamp      = 1;        // Timestamp at message sending time
repeated Metric metrics = 2;        // Repeated forever - no limit in Google Protobufs
uint64   seq            = 3;        // Sequence number
string   uuid           = 4;        // UUID to track message type in terms of schema definitions
bytes    body           = 5;        // To optionally bypass the whole definition above
repeated google.protobuf.Any details = 6;

}

We have also passed the SparkplugB 3.0.0 Node certification tests.

Best regards
Henrik

Something happened with the links. Hope it makes sense. Adding results from Chariot:

I did some further investigation:
if (payload.isPresent() && topicStructure.isValid(sparkplugVersion)) {
    // it is a sparkplug publish
    final ByteBuffer byteBuffer = payload.get();
    try {
        final SparkplugBProto.Payload spPayload = SparkplugBProto.Payload.parseFrom(byteBuffer);
        final List<SparkplugBProto.Payload.Metric> metricsList = spPayload.getMetricsList();
        for (final SparkplugBProto.Payload.Metric metric : metricsList) {
            aliasToMetric.put(metric.getAlias(), metric.getName());
            if (log.isTraceEnabled()) {
                log.trace("Add Metric Mapping (Alias={}, MetricName={})", metric.getAlias(), metric.getName());
            }
        }
        generateMetricsFromMessage(topicStructure, metricsList);
    } catch (final Exception e) {
        log.error("Could not parse MQTT payload to protobuf", e);
    }
} else {
    if (log.isTraceEnabled()) {
        log.trace("This might not be a sparkplug topic structure: {}", topicStructure);
    }
}

Since the exception I get is "Could not parse MQTT payload to protobuf", the following condition must be true:
if (payload.isPresent() && topicStructure.isValid(sparkplugVersion)) {

so "generateMetricsFromMessage" is executed.
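Note that the catch block in the snippet above is broader than its log message suggests: any exception thrown inside generateMetricsFromMessage, including a later ClassCastException, is reported as "Could not parse MQTT payload to protobuf", even when the protobuf parsing itself succeeded. A sketch of a narrower handler (not the extension's actual code) that would separate the two cases:

try {
    final SparkplugBProto.Payload spPayload = SparkplugBProto.Payload.parseFrom(byteBuffer);
    generateMetricsFromMessage(topicStructure, spPayload.getMetricsList());
} catch (final com.google.protobuf.InvalidProtocolBufferException e) {
    // Only genuine protobuf decoding failures end up here.
    log.error("Could not parse MQTT payload to protobuf", e);
} catch (final Exception e) {
    // Anything thrown after a successful parse, e.g. the ClassCastException below.
    log.error("Failed to generate metrics from Sparkplug payload", e);
}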

Based on the exception log it is also clear that the right metric type is determined:

2025-04-07 08:13:38,037 ERROR - Could not parse MQTT payload to protobuf
2025-04-07 10:13:38 java.lang.ClassCastException: class hmq.lX.m cannot be cast to class com.hivemq.extensions.sparkplug.influxdb.metrics.SettableDoubleGauge (hmq.lX.m is in unnamed module of loader 'app'; com.hivemq.extensions.sparkplug.influxdb.metrics.SettableDoubleGauge is in unnamed module of loader hmq.js.a @52df6d0f)
2025-04-07 10:13:38 at com.hivemq.extensions.sparkplug.influxdb.metrics.MetricsHolder.getSettableDoubleGauge(MetricsHolder.java:118)
2025-04-07 10:13:38 at com.hivemq.extensions.sparkplug.influxdb.metrics.MetricsHolder.getDeviceInformationMetricsDouble(MetricsHolder.java:65)
2025-04-07 10:13:38 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.generatMetricForEdgesAndDevices(SparkplugBInterceptor.java:152)
2025-04-07 10:13:38 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.generateMetricsFromMessage(SparkplugBInterceptor.java:109)
2025-04-07 10:13:38 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.onInboundPublish(SparkplugBInterceptor.java:86)

How can I enable trace logging so we can get more information?

I managed to enable TRACE level for HiveMQ:
2025-04-07 12:25:14 2025-04-07 10:25:14,094 TRACE - Client ECS specified a keepAlive value of 15s. Using keepAlive of 15s. The maximum timeout before disconnecting is 22.5s
2025-04-07 12:25:14 2025-04-07 10:25:14,125 TRACE - Received ClientSessionSharedSubscriptionGetRequest request from RFyqK for key ECS.
2025-04-07 12:25:14 2025-04-07 10:25:14,141 TRACE - Received InflightMessagesRequest request from RFyqK for key ECS.
2025-04-07 12:25:14 2025-04-07 10:25:14,143 TRACE - Restarting read operations for MQTT client with id ECS and IP 172.17.0.1
2025-04-07 12:25:14 2025-04-07 10:25:14,157 TRACE - Received NewMessagesRequest request from RFyqK for key ECS.
2025-04-07 12:25:14 2025-04-07 10:25:14,402 TRACE - Checking SUBSCRIBE message of client 'ECS' if topics are valid
2025-04-07 12:25:14 2025-04-07 10:25:14,422 TRACE - Adding subscriptions for client [ECS] and topic [spBv1.0/MyPlant/NCMD/ECS] with qos [AT_LEAST_ONCE]
2025-04-07 12:25:14 2025-04-07 10:25:14,422 TRACE - Applied all subscriptions for client [ECS]
2025-04-07 12:25:14 2025-04-07 10:25:14,431 TRACE - Received retained message GET request for topic spBv1.0/MyPlant/NCMD/ECS
2025-04-07 12:25:14 2025-04-07 10:25:14,432 TRACE - Checking SUBSCRIBE message of client 'ECS' if topics are valid
2025-04-07 12:25:14 2025-04-07 10:25:14,432 TRACE - Applied all subscriptions for client [ECS]
2025-04-07 12:25:14 2025-04-07 10:25:14,448 TRACE - Adding subscriptions for client [ECS] and topic [spBv1.0/MyPlant/DCMD/ECS/#] with qos [AT_LEAST_ONCE]
2025-04-07 12:25:14 2025-04-07 10:25:14,448 TRACE - Sending retained message with topic [spBv1.0/MyPlant/NCMD/ECS] for client [ECS]
2025-04-07 12:25:14 2025-04-07 10:25:14,449 TRACE - Received retained message GET request for topic spBv1.0/MyPlant/DCMD/ECS/#
2025-04-07 12:25:14 2025-04-07 10:25:14,466 TRACE - Incoming publish from spBv1.0/MyPlant/NBIRTH/ECS
2025-04-07 12:25:14 2025-04-07 10:25:14,581 TRACE - Add Metric Mapping (Alias=0, MetricName=bdSeq)
2025-04-07 12:25:14 2025-04-07 10:25:14,584 TRACE - Add Metric Mapping (Alias=0, MetricName=Node Control/Rebirth)
2025-04-07 12:25:14 2025-04-07 10:25:14,584 TRACE - Sparkplug Message type & structure TopicStructure{namespace='spBv1.0', groupId='MyPlant', messageType='NBIRTH', eonId='ECS', deviceId='null', scadaId='null'}
2025-04-07 12:25:14 2025-04-07 10:25:14,596 DEBUG - Register SettableDoubleGauge metric for: sparkplug.ECS.status
2025-04-07 12:25:14 2025-04-07 10:25:14,615 TRACE - Metrics gauge [sparkplug.ECS.status] added
2025-04-07 12:25:14 2025-04-07 10:25:14,616 TRACE - Metrics counter [sparkplug.eons.current.count] added
2025-04-07 12:25:14 2025-04-07 10:25:14,616 TRACE - Incoming publish from spBv1.0/MyPlant/DBIRTH/ECS/Core
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Add Metric Mapping (Alias=0, MetricName=CurrentCluster)
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Add Metric Mapping (Alias=0, MetricName=sine1/value)
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Add Metric Mapping (Alias=0, MetricName=Device Control/Rebirth)
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Add Metric Mapping (Alias=0, MetricName=bdSeq)
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Sparkplug Message type & structure TopicStructure{namespace='spBv1.0', groupId='MyPlant', messageType='DBIRTH', eonId='ECS', deviceId='Core', scadaId='null'}
2025-04-07 12:25:14 2025-04-07 10:25:14,617 DEBUG - Register SettableDoubleGauge metric for: sparkplug.ECS.Core.status
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Metrics gauge [sparkplug.ECS.Core.status] added
2025-04-07 12:25:14 2025-04-07 10:25:14,617 TRACE - Metrics counter [sparkplug.devices.current.count] added
2025-04-07 12:25:14 2025-04-07 10:25:14,637 TRACE - No matching normal/shared subscriber or consumer found for PUBLISH with topic 'spBv1.0/MyPlant/NBIRTH/ECS'
2025-04-07 12:25:14 2025-04-07 10:25:14,642 TRACE - No matching normal/shared subscriber or consumer found for PUBLISH with topic 'spBv1.0/MyPlant/DBIRTH/ECS/Core'
2025-04-07 12:25:16 2025-04-07 10:25:16,112 TRACE - Incoming publish from spBv1.0/MyPlant/DDATA/ECS/Core
2025-04-07 12:25:16 2025-04-07 10:25:16,115 TRACE - Add Metric Mapping (Alias=0, MetricName=sine1/value)
2025-04-07 12:25:16 2025-04-07 10:25:16,118 TRACE - Add Metric Mapping (Alias=0, MetricName=bdSeq)
2025-04-07 12:25:16 2025-04-07 10:25:16,120 TRACE - Sparkplug Message type & structure TopicStructure{namespace='spBv1.0', groupId='MyPlant', messageType='DDATA', eonId='ECS', deviceId='Core', scadaId='null'}
2025-04-07 12:25:16 2025-04-07 10:25:16,124 DEBUG - Register SettableDoubleGauge metric for: sparkplug.ECS.Core.bdSeq
2025-04-07 12:25:16 2025-04-07 10:25:16,124 TRACE - Metrics gauge [sparkplug.ECS.Core.bdSeq] added
2025-04-07 12:25:16 2025-04-07 10:25:16,132 ERROR - Could not parse MQTT payload to protobuf
2025-04-07 12:25:16 java.lang.ClassCastException: class hmq.lX.m cannot be cast to class com.hivemq.extensions.sparkplug.influxdb.metrics.SettableLongGauge (hmq.lX.m is in unnamed module of loader 'app'; com.hivemq.extensions.sparkplug.influxdb.metrics.SettableLongGauge is in unnamed module of loader hmq.js.a @46f597f1)
2025-04-07 12:25:16 at com.hivemq.extensions.sparkplug.influxdb.metrics.MetricsHolder.getDeviceInformationMetricsLong(MetricsHolder.java:84)
2025-04-07 12:25:16 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.generatMetricForEdgesAndDevices(SparkplugBInterceptor.java:150)
2025-04-07 12:25:16 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.generateMetricsFromMessage(SparkplugBInterceptor.java:109)
2025-04-07 12:25:16 at com.hivemq.extensions.sparkplug.influxdb.SparkplugBInterceptor.onInboundPublish(SparkplugBInterceptor.java:86)

I use the provided Dockerfile to build the extension image:

ARG HIVEMQ_VERSION=4.5.3

FROM hivemq/hivemq4@sha256:47d553afb99c674c3acee23fadbfd425957dcf5daf63f965d5c874b7db890bb2

# Add extensions

COPY hivemq-sparkplug-extension /opt/hivemq/extensions/hivemq-sparkplug-extension

docker build --tag hivemqspb .

And I run the image as follows:

docker run --ulimit nofile=500000:500000 --name hivemqspb --env HIVEMQ_LOG_LEVEL=TRACE -d -p 8080:8080 -p 8000:8000 -p 1883:1883 hivemqspb

The Sparkplug is valid. The first DDATA message gets correctly sent to InfluxDB:
2025-04-08 12:33:52 2025-04-08 10:33:52,067 TRACE - Incoming publish from spBv1.0/MyPlant/DDATA/ECS/Core
2025-04-08 12:33:52 2025-04-08 10:33:52,067 TRACE - Add Metric Mapping (Alias=3, MetricName=sine1/value)
2025-04-08 12:33:52 2025-04-08 10:33:52,067 TRACE - Sparkplug Message type & structure TopicStructure{namespace='spBv1.0', groupId='MyPlant', messageType='DDATA', eonId='ECS', deviceId='Core', scadaId='null'}
2025-04-08 12:33:52 2025-04-08 10:33:52,067 DEBUG - Register SettableDoubleGauge metric for: sparkplug.ECS.Core.sine1/value
2025-04-08 12:33:52 2025-04-08 10:33:52,067 TRACE - Metrics gauge [sparkplug.ECS.Core.sine1/value] added
2025-04-08 12:33:52 2025-04-08 10:33:52,070 TRACE - No matching normal/shared subscriber or consumer found for PUBLISH with topic 'spBv1.0/MyPlant/DDATA/ECS/Core'
It is the first time a SettableDoubleGauge is added to the metric registry, hence the log statement: Register SettableDoubleGauge metric for: sparkplug.ECS.Core.sine1/value

private @NotNull SettableDoubleGauge getSettableDoubleGauge(@NotNull String metricName) {
    if (getMetricRegistry().getMetrics().containsKey(metricName)) {
        return (SettableDoubleGauge) getMetricRegistry().getMetrics().get(metricName);
    }
    log.debug("Register SettableDoubleGauge metric for: {} ", metricName);
    return getMetricRegistry().register(metricName, new SettableDoubleGauge());
}

The second time, the SettableDoubleGauge has already been registered for sparkplug.ECS.Core.sine1/value, so it is retrieved from the registry. However, the cast fails. There is only one metric, and its name and type are correct. I suspect some Java classloader mismatch (see the sketch after the log excerpt below), but I am not a Java expert and could really use some help.

2025-04-08 12:33:52 2025-04-08 10:33:52,132 TRACE - Incoming publish from spBv1.0/MyPlant/DDATA/ECS/Core
2025-04-08 12:33:52 2025-04-08 10:33:52,133 TRACE - Add Metric Mapping (Alias=3, MetricName=sine1/value)
2025-04-08 12:33:52 2025-04-08 10:33:52,133 TRACE - Sparkplug Message type & structure TopicStructure{namespace='spBv1.0', groupId='MyPlant', messageType='DDATA', eonId='ECS', deviceId='Core', scadaId='null'}
2025-04-08 12:33:52 2025-04-08 10:33:52,134 ERROR - Could not parse MQTT payload to protobuf
2025-04-08 12:33:52 java.lang.ClassCastException: class hmq.lX.m cannot be cast to class com.hivemq.extensions.sparkplug.metrics.SettableDoubleGauge (hmq.lX.m is in unnamed module of loader 'app'; com.hivemq.extensions.sparkplug.metrics.SettableDoubleGauge is in unnamed module of loader hmq.js.a @f5a680)
2025-04-08 12:33:52 at com.hivemq.extensions.sparkplug.metrics.MetricsHolder.getSettableDoubleGauge(MetricsHolder.java:106)
2025-04-08 12:33:52 at com.hivemq.extensions.sparkplug.metrics.MetricsHolder.getDeviceDataMetrics(MetricsHolder.java:101)
2025-04-08 12:33:52 at com.hivemq.extensions.sparkplug.SparkplugBInterceptor.generatMetricForEdgesAndDevices(SparkplugBInterceptor.java:145)
2025-04-08 12:33:52 at com.hivemq.extensions.sparkplug.SparkplugBInterceptor.generateMetricsFromMessage(SparkplugBInterceptor.java:101)
2025-04-08 12:33:52 at com.hivemq.extensions.sparkplug.SparkplugBInterceptor.onInboundPublish(SparkplugBInterceptor.java:81)
2025-04-08 12:33:52 at hmq.jE.f$b.a(Source.java:257)
2025-04-08 12:33:52 at hmq.jE.f$b.apply(Source.java:223)
2025-04-08 12:33:52 at hmq.jD.q$b.a(Source.java:363)
2025-04-08 12:33:52 at hmq.jD.q$b.c(Source.java:331)
2025-04-08 12:33:52 at hmq.jD.q$b.b(Source.java:280)
2025-04-08 12:33:52 at hmq.jD.q$b.run(Source.java:193)
2025-04-08 12:33:52 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
2025-04-08 12:33:52 at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
2025-04-08 12:33:52 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
2025-04-08 12:33:52 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
2025-04-08 12:33:52 at java.base/java.lang.Thread.run(Unknown Source)
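To illustrate the suspected classloader mismatch, here is a minimal self-contained Java sketch (the directory and class name are hypothetical, not taken from HiveMQ): the same class file loaded by two isolated classloaders yields two distinct runtime classes, so casting an instance from one to the other throws the same kind of ClassCastException as in the log above.

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderCastDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical directory containing a compiled metrics/SettableDoubleGauge.class
        URL[] path = {new URL("file:./extension-classes/")};
        // Two isolated loaders, analogous to the broker ('app') and extension loaders in the log
        ClassLoader loaderA = new URLClassLoader(path, null);
        ClassLoader loaderB = new URLClassLoader(path, null);

        Class<?> a = loaderA.loadClass("metrics.SettableDoubleGauge");
        Class<?> b = loaderB.loadClass("metrics.SettableDoubleGauge");

        System.out.println(a.getName().equals(b.getName())); // true: identical names
        System.out.println(a == b);                          // false: distinct runtime classes

        Object gauge = a.getDeclaredConstructor().newInstance();
        b.cast(gauge); // throws ClassCastException, just like the extension
    }
}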

Hi @hewa,
If you are unable to attach the .proto file directly, please feel free to share a link to it instead. Alternatively, you may upload the file to a file-sharing service and provide the link here.

Thank you, and looking forward to your update.
Best
Dasha from The HiveMQ Team

You will find the proto file above in the thread, but not as a linked file. Sharing a link is not possible for me. Anyway, if you go through the correspondence I have provided, you will find that this has nothing to do with the proto. From the call stack it is evident that the Sparkplug proto gets parsed correctly. It is the caching of Java classes that fails (SettableDoubleGauge). I think it is a problem with this image of hivemq4, which I have pulled from Docker Hub according to your documentation. I have the same Sparkplug B node publisher working with Chariot MQTT, Datahub and Rocworks/AutomationGateway. Both Datahub and AutomationGateway are ingesting the same Sparkplug B data as tested here into InfluxDB V2 without any issues.
Best regards
Henrik

Hey @hewa

The admin fixed your ability to insert links. Please feel free to insert again. Here is what they wrote:

I’ve updated the site settings to allow link posting. Since they’re a level 0 (new) user, some restrictions still apply. They can try wrapping the link in backticks like this: `https://…/sparkplug_b_c_sharp.proto`

Looking forward to your .proto file and a payload JSON.
Best
Dasha