Add possibility to specify measurement per register (#7231)

This commit is contained in:
Sven Rebhan 2020-03-30 22:30:42 +02:00 committed by GitHub
parent 9a1c26d6cc
commit 3650d74de2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 30 deletions

View File

@ -52,6 +52,7 @@ The Modbus plugin collects Discrete Inputs, Coils, Input Registers and Holding R
] ]
## Analog Variables, Input Registers and Holding Registers ## Analog Variables, Input Registers and Holding Registers
## measurement - the (optional) measurement name, defaults to "modbus"
## name - the variable name ## name - the variable name
## byte_order - the ordering of bytes ## byte_order - the ordering of bytes
## |---AB, ABCD - Big Endian ## |---AB, ABCD - Big Endian

View File

@ -7,10 +7,12 @@ import (
"net" "net"
"net/url" "net/url"
"sort" "sort"
"time"
mb "github.com/goburrow/modbus" mb "github.com/goburrow/modbus"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -44,12 +46,13 @@ type register struct {
} }
type fieldContainer struct { type fieldContainer struct {
Name string `toml:"name"` Measurement string `toml:"measurement"`
ByteOrder string `toml:"byte_order"` Name string `toml:"name"`
DataType string `toml:"data_type"` ByteOrder string `toml:"byte_order"`
Scale float64 `toml:"scale"` DataType string `toml:"data_type"`
Address []uint16 `toml:"address"` Scale float64 `toml:"scale"`
value interface{} Address []uint16 `toml:"address"`
value interface{}
} }
type registerRange struct { type registerRange struct {
@ -97,14 +100,15 @@ const sampleConfig = `
## ##
## Digital Variables, Discrete Inputs and Coils ## Digital Variables, Discrete Inputs and Coils
## name - the variable name ## measurement - the (optional) measurement name, defaults to "modbus"
## address - variable address ## name - the variable name
## address - variable address
discrete_inputs = [ discrete_inputs = [
{ name = "start", address = [0]}, { name = "start", address = [0]},
{ name = "stop", address = [1]}, { name = "stop", address = [1]},
{ name = "reset", address = [2]}, { name = "reset", address = [2]},
{ name = "emergency_stop", address = [3]}, { name = "emergency_stop", address = [3]},
] ]
coils = [ coils = [
{ name = "motor1_run", address = [0]}, { name = "motor1_run", address = [0]},
@ -113,8 +117,9 @@ const sampleConfig = `
] ]
## Analog Variables, Input Registers and Holding Registers ## Analog Variables, Input Registers and Holding Registers
## name - the variable name ## measurement - the (optional) measurement name, defaults to "modbus"
## byte_order - the ordering of bytes ## name - the variable name
## byte_order - the ordering of bytes
## |---AB, ABCD - Big Endian ## |---AB, ABCD - Big Endian
## |---BA, DCBA - Little Endian ## |---BA, DCBA - Little Endian
## |---BADC - Mid-Big Endian ## |---BADC - Mid-Big Endian
@ -134,7 +139,7 @@ const sampleConfig = `
input_registers = [ input_registers = [
{ name = "tank_level", byte_order = "AB", data_type = "INT16", scale=1.0, address = [0]}, { name = "tank_level", byte_order = "AB", data_type = "INT16", scale=1.0, address = [0]},
{ name = "tank_ph", byte_order = "AB", data_type = "INT16", scale=1.0, address = [1]}, { name = "tank_ph", byte_order = "AB", data_type = "INT16", scale=1.0, address = [1]},
{ name = "pump1_speed", byte_order = "ABCD", data_type = "INT32", scale=1.0, address = [3,4]}, { name = "pump1_speed", byte_order = "ABCD", data_type = "INT32", scale=1.0, address = [3,4]},
] ]
` `
@ -319,10 +324,11 @@ func validateFieldContainers(t []fieldContainer, n string) error {
} }
//search name duplicate //search name duplicate
if nameEncountered[item.Name] { canonical_name := item.Measurement + "." + item.Name
return fmt.Errorf("name '%s' is duplicated in '%s' - '%s'", item.Name, n, item.Name) if nameEncountered[canonical_name] {
return fmt.Errorf("name '%s' is duplicated in measurement '%s' '%s' - '%s'", item.Name, item.Measurement, n, item.Name)
} else { } else {
nameEncountered[item.Name] = true nameEncountered[canonical_name] = true
} }
if n == cInputRegisters || n == cHoldingRegisters { if n == cInputRegisters || n == cHoldingRegisters {
@ -635,6 +641,7 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
} }
} }
timestamp := time.Now()
err := m.getFields() err := m.getFields()
if err != nil { if err != nil {
disconnect(m) disconnect(m)
@ -642,18 +649,28 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
return err return err
} }
grouper := metric.NewSeriesGrouper()
for _, reg := range m.registers { for _, reg := range m.registers {
fields := make(map[string]interface{})
tags := map[string]string{ tags := map[string]string{
"name": m.Name, "name": m.Name,
"type": reg.Type, "type": reg.Type,
} }
for _, field := range reg.Fields { for _, field := range reg.Fields {
fields[field.Name] = field.value // In case no measurement was specified we use "modbus" as default
measurement := "modbus"
if field.Measurement != "" {
measurement = field.Measurement
}
// Group the data by series
grouper.Add(measurement, tags, timestamp, field.Name, field.value)
} }
acc.AddFields("modbus", fields, tags) // Add the metrics grouped by series to the accumulator
for _, metric := range grouper.Metrics() {
acc.AddMetric(metric)
}
} }
return nil return nil