From 9389099820a163b49da899f6e8dcf2fd20933300 Mon Sep 17 00:00:00 2001 From: Antonio Garcia Date: Wed, 29 Jan 2020 04:18:58 -0600 Subject: [PATCH] Add modbus input plugin (#6154) --- go.mod | 3 + go.sum | 6 + plugins/inputs/all/all.go | 1 + plugins/inputs/modbus/.README.md.swp | Bin 0 -> 12288 bytes plugins/inputs/modbus/README.md | 84 ++++ plugins/inputs/modbus/modbus.go | 589 +++++++++++++++++++++++++++ plugins/inputs/modbus/modbus_test.go | 376 +++++++++++++++++ 7 files changed, 1059 insertions(+) create mode 100644 plugins/inputs/modbus/.README.md.swp create mode 100644 plugins/inputs/modbus/README.md create mode 100644 plugins/inputs/modbus/modbus.go create mode 100644 plugins/inputs/modbus/modbus_test.go diff --git a/go.mod b/go.mod index 36819f522..fe4c52d0f 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,8 @@ require ( github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-redis/redis v6.12.0+incompatible github.com/go-sql-driver/mysql v1.4.1 + github.com/goburrow/modbus v0.1.0 + github.com/goburrow/serial v0.1.0 // indirect github.com/gobwas/glob v0.2.3 github.com/gofrs/uuid v2.1.0+incompatible github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d @@ -108,6 +110,7 @@ require ( github.com/soniah/gosnmp v1.22.0 github.com/streadway/amqp v0.0.0-20180528204448-e5adc2ada8b8 github.com/stretchr/testify v1.4.0 + github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00 // indirect github.com/tidwall/gjson v1.3.0 github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e // indirect diff --git a/go.sum b/go.sum index ed221dee3..c04272c07 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,10 @@ github.com/go-redis/redis v6.12.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8w github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goburrow/modbus v0.1.0 h1:DejRZY73nEM6+bt5JSP6IsFolJ9dVcqxsYbpLbeW/ro= +github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg= +github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= +github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gofrs/uuid v2.1.0+incompatible h1:8oEj3gioPmmDAOLQUZdnW+h4FZu9aSE/SQIas1E9pzA= @@ -406,6 +410,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 h1:Oj2e7Sae4XrOsk3ij21QjjEgAcVSeo9nkp0dI//cD2o= +github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62/go.mod h1:qUzPVlSj2UgxJkVbH0ZwuuiR46U8RBMDT5KLY78Ifpw= github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00 h1:mujcChM89zOHwgZBBNr5WZ77mBXP1yR+gLThGCYZgAg= github.com/tedsuo/ifrit v0.0.0-20191009134036-9a97d0632f00/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= github.com/tidwall/gjson v1.3.0 h1:kfpsw1W3trbg4Xm6doUtqSl9+LhLB6qJ9PkltVAQZYs= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 6a42b9451..dec04b397 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -90,6 +90,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mesos" _ "github.com/influxdata/telegraf/plugins/inputs/minecraft" + _ "github.com/influxdata/telegraf/plugins/inputs/modbus" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/monit" _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" diff --git a/plugins/inputs/modbus/.README.md.swp b/plugins/inputs/modbus/.README.md.swp new file mode 100644 index 0000000000000000000000000000000000000000..6a1629b1a3978092196c48639609dfc763bac751 GIT binary patch literal 12288 zcmeHN%WoS+7@t0Pw51e*XirEVF-3`F$B)E`lVTyiLes`gWIMT3Azkl|omJPnW_LDi z3KVfba6>{I$_)V$LI{Zi2mS(XaN`VI3J8f_Ir5v?bz*xPrK-39yV9@go%v?I`F`JH zrpmN48_RVnOlKILrx^S6?w_0Ay!RY?AD@n;nVxJvuQ~J-}1Yr?V+#l3oqSsjJEHk8?{ok zUYqV22P%m>Vg_Ocq73+=lPa8EpUGuaswXC%r)QpCkIKa@F#|CJF#|CJF#|CJF#|CJ zF#|CJ|5*k^{|I{v)*THkY=rM4WAE@Ye#H#L48#n?48#n?48#n?48#n?48#n?48#n? z4Ezfj(D0Y^v&R^_bsUSw-~R{y|Nr?6V?O~p&<0k4E5H&k1)K+deUh;|z=yyGz^lM{ z;2iKQ@YiX^egp0T-vOTh9|P|KZD1968OQ(x90!g947m3MV}Ag@1K$F7fL-7Q&<3)= zG2q8jjC~G#27Clu2a3QM;1qBI`2HkgUjT0d*MOIRbHKe5jJ*R~0w#eMfiu9Dk2CfP z@XKT11vm?Q{U~E!0e65I;D<*T+XgtG0d-&nxCopCZi7$BsVRAlpO}IFJp%Zu6E%=AJAPd1;+7v>8Kl+EX7GxH01T!t7t>;;67a81eXP)c1!N_9*N>!e`uloH$%48}2*NF=CsQ|t9DPOH9vQH!dBHW?AU zPPW$LRHTVDzt_*EntjfVi6pgm1>bU915=`}RIXIfC@({8iQT@EU%t|s$)S041 z)lVYLFkJ3=72{?}IXv@P`g6K{TM+Cld5j8>f3>f<-)Z~+M5iMcf9bk-V9tpg|2@67EB!#5wgG|SS z$62!^gBeqaJ zmfBG4?$W5DDW!?tNTpJhYN;G76zb}xkyYy!hER(G(EpiT>?uL5e|YFUe-xfM!Za=L-=uynL;hSCk+ zj!Fm7t~;isYI~&Y)eucxx%CZJHV90)5k#`Ffj|?X62;w+bkvH$lmW~5cSt&FMN??* zL32whFrJZNdb-Q8$ElJYnxuR+nEQZi8o2vDzE;DlChia;y@q^U4Rr-h_mrb2B+4EThrkLhoa=<=`M7&q<5+Y(|Fw7;Ra4U(9)DM(Q*~tp}#r&LiN>ZdA&(s!PGf9wknsn;yvUj zX#{4HtXQVaVO+20+orCHCYko2$W6jiQv@Sit{JK(PnMdM<>e_tk~p%Q2j-$a!nZ!E zoaguZjtiHDe8KG=35V8JDqg^yT3uR_>bP>WnxPV~3u(enoDY3CS{87e{)E%Bf)n6K z*CJyr=nZ^9!KHR+MKI`4KNhY@Op923z>+oOWC|8KPy;^FiUBN$TH%fQunw-Nd-aQ< GDEk}yv%(Gl literal 0 HcmV?d00001 diff --git a/plugins/inputs/modbus/README.md b/plugins/inputs/modbus/README.md new file mode 100644 index 000000000..1e042deba --- /dev/null +++ b/plugins/inputs/modbus/README.md @@ -0,0 +1,84 @@ +# Telegraf Input Plugin: Modbus + +The Modbus plugin collects Discrete Inputs, Coils, Input Registers and Holding Registers via Modbus TCP or Modbus RTU/ASCII + +### Configuration: + +```toml + ## Connection Configuration + ## + ## The module supports connections to PLCs via MODBUS/TCP or + ## via serial line communication in binary (RTU) or readable (ASCII) encoding + ## + ## Device name + name = "Device" + + ## Slave ID - addresses a MODBUS device on the bus + ## Range: 0 - 255 [0 = broadcast; 248 - 255 = reserved] + slave_id = 1 + + ## Timeout for each request + timeout = "1s" + + # TCP - connect via Modbus/TCP + controller = "tcp://localhost:502" + + # Serial (RS485; RS232) + #controller = "file:///dev/ttyUSB0" + #baud_rate = 9600 + #data_bits = 8 + #parity = "N" + #stop_bits = 1 + #transmission_mode = "RTU" + + + ## Measurements + ## + + ## Digital Variables, Discrete Inputs and Coils + ## name - the variable name + ## address - variable address + + discrete_inputs = [ + { name = "Start", address = [0]}, + { name = "Stop", address = [1]}, + { name = "Reset", address = [2]}, + { name = "EmergencyStop", address = [3]}, + ] + coils = [ + { name = "Motor1-Run", address = [0]}, + { name = "Motor1-Jog", address = [1]}, + { name = "Motor1-Stop", address = [2]}, + ] + + ## Analog Variables, Input Registers and Holding Registers + ## name - the variable name + ## byte_order - the ordering of bytes + ## |---AB, ABCD - Big Endian + ## |---BA, DCBA - Little Endian + ## |---BADC - Mid-Big Endian + ## |---CDAB - Mid-Little Endian + ## data_type - UINT16, INT16, INT32, UINT32, FLOAT32, FLOAT32-IEEE (the IEEE 754 binary representation) + ## scale - the final numeric variable representation + ## address - variable address + + holding_registers = [ + { name = "PowerFactor", byte_order = "AB", data_type = "FLOAT32", scale=0.01, address = [8]}, + { name = "Voltage", byte_order = "AB", data_type = "FLOAT32", scale=0.1, address = [0]}, + { name = "Energy", byte_order = "ABCD", data_type = "FLOAT32", scale=0.001, address = [5,6]}, + { name = "Current", byte_order = "ABCD", data_type = "FLOAT32", scale=0.001, address = [1,2]}, + { name = "Frequency", byte_order = "AB", data_type = "FLOAT32", scale=0.1, address = [7]}, + { name = "Power", byte_order = "ABCD", data_type = "FLOAT32", scale=0.1, address = [3,4]}, + ] + input_registers = [ + { name = "TankLevel", byte_order = "AB", data_type = "INT16", scale=1.0, address = [0]}, + { name = "TankPH", byte_order = "AB", data_type = "INT16", scale=1.0, address = [1]}, + { name = "Pump1-Speed", byte_order = "ABCD", data_type = "INT32", scale=1.0, address = [3,4]}, + ] +``` +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter modbus -test +modbus.InputRegisters,host=orangepizero Current=0,Energy=0,Frecuency=60,Power=0,PowerFactor=0,Voltage=123.9000015258789 1554079521000000000 +``` diff --git a/plugins/inputs/modbus/modbus.go b/plugins/inputs/modbus/modbus.go new file mode 100644 index 000000000..d845ef8fe --- /dev/null +++ b/plugins/inputs/modbus/modbus.go @@ -0,0 +1,589 @@ +package modbus + +import ( + "encoding/binary" + "fmt" + "math" + "net" + "net/url" + "sort" + + mb "github.com/goburrow/modbus" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Modbus holds all data relevant to the plugin +type Modbus struct { + Name string `toml:"name"` + Controller string `toml:"controller"` + TransmissionMode string `toml:"transmission_mode"` + BaudRate int `toml:"baud_rate"` + DataBits int `toml:"data_bits"` + Parity string `toml:"parity"` + StopBits int `toml:"stop_bits"` + SlaveID int `toml:"slave_id"` + Timeout internal.Duration `toml:"timeout"` + DiscreteInputs []fieldContainer `toml:"discrete_inputs"` + Coils []fieldContainer `toml:"coils"` + HoldingRegisters []fieldContainer `toml:"holding_registers"` + InputRegisters []fieldContainer `toml:"input_registers"` + registers []register + isConnected bool + tcpHandler *mb.TCPClientHandler + rtuHandler *mb.RTUClientHandler + asciiHandler *mb.ASCIIClientHandler + client mb.Client +} + +type register struct { + Type string + RegistersRange []registerRange + ReadValue func(uint16, uint16) ([]byte, error) + Fields []fieldContainer +} + +type fieldContainer struct { + Name string `toml:"name"` + ByteOrder string `toml:"byte_order"` + DataType string `toml:"data_type"` + Scale float32 `toml:"scale"` + Address []uint16 `toml:"address"` + value interface{} +} + +type registerRange struct { + address uint16 + length uint16 +} + +const ( + cDiscreteInputs = "discrete_input" + cCoils = "coil" + cHoldingRegisters = "holding_register" + cInputRegisters = "input_register" +) + +const description = `Retrieve data from MODBUS slave devices` +const sampleConfig = ` + ## Connection Configuration + ## + ## The plugin supports connections to PLCs via MODBUS/TCP or + ## via serial line communication in binary (RTU) or readable (ASCII) encoding + ## + ## Device name + name = "Device" + + ## Slave ID - addresses a MODBUS device on the bus + ## Range: 0 - 255 [0 = broadcast; 248 - 255 = reserved] + slave_id = 1 + + ## Timeout for each request + timeout = "1s" + + # TCP - connect via Modbus/TCP + controller = "tcp://localhost:502" + + # Serial (RS485; RS232) + #controller = "file:///dev/ttyUSB0" + #baud_rate = 9600 + #data_bits = 8 + #parity = "N" + #stop_bits = 1 + #transmission_mode = "RTU" + + + ## Measurements + ## + + ## Digital Variables, Discrete Inputs and Coils + ## name - the variable name + ## address - variable address + + discrete_inputs = [ + { name = "start", address = [0]}, + { name = "stop", address = [1]}, + { name = "reset", address = [2]}, + { name = "emergency_stop", address = [3]}, + ] + coils = [ + { name = "motor1_run", address = [0]}, + { name = "motor1_jog", address = [1]}, + { name = "motor1_stop", address = [2]}, + ] + + ## Analog Variables, Input Registers and Holding Registers + ## name - the variable name + ## byte_order - the ordering of bytes + ## |---AB, ABCD - Big Endian + ## |---BA, DCBA - Little Endian + ## |---BADC - Mid-Big Endian + ## |---CDAB - Mid-Little Endian + ## data_type - UINT16, INT16, INT32, UINT32, FLOAT32, FLOAT32-IEEE (the IEEE 754 binary representation) + ## scale - the final numeric variable representation + ## address - variable address + + holding_registers = [ + { name = "power_factor", byte_order = "AB", data_type = "FLOAT32", scale=0.01, address = [8]}, + { name = "voltage", byte_order = "AB", data_type = "FLOAT32", scale=0.1, address = [0]}, + { name = "energy", byte_order = "ABCD", data_type = "FLOAT32", scale=0.001, address = [5,6]}, + { name = "current", byte_order = "ABCD", data_type = "FLOAT32", scale=0.001, address = [1,2]}, + { name = "frequency", byte_order = "AB", data_type = "FLOAT32", scale=0.1, address = [7]}, + { name = "power", byte_order = "ABCD", data_type = "FLOAT32", scale=0.1, address = [3,4]}, + ] + input_registers = [ + { name = "tank_level", byte_order = "AB", data_type = "INT16", scale=1.0, address = [0]}, + { name = "tank_ph", byte_order = "AB", data_type = "INT16", scale=1.0, address = [1]}, + { name = "pump1_speed", byte_order = "ABCD", data_type = "INT32", scale=1.0, address = [3,4]}, + ] +` + +// SampleConfig returns a basic configuration for the plugin +func (m *Modbus) SampleConfig() string { + return sampleConfig +} + +// Description returns a short description of what the plugin does +func (m *Modbus) Description() string { + return description +} + +func (m *Modbus) Init() error { + //check device name + if m.Name == "" { + return fmt.Errorf("device name is empty") + } + + err := connect(m) + if err != nil { + m.isConnected = false + return err + } + + err = m.InitRegister(m.DiscreteInputs, cDiscreteInputs) + if err != nil { + return err + } + + err = m.InitRegister(m.Coils, cCoils) + if err != nil { + return err + } + + err = m.InitRegister(m.HoldingRegisters, cHoldingRegisters) + if err != nil { + return err + } + + err = m.InitRegister(m.InputRegisters, cInputRegisters) + if err != nil { + return err + } + + return nil +} + +func (m *Modbus) InitRegister(fields []fieldContainer, name string) error { + if len(fields) == 0 { + return nil + } + + err := validateFieldContainers(fields, name) + if err != nil { + return err + } + + addrs := []uint16{} + for _, field := range fields { + for _, a := range field.Address { + addrs = append(addrs, a) + } + } + + addrs = removeDuplicates(addrs) + sort.Slice(addrs, func(i, j int) bool { return addrs[i] < addrs[j] }) + + ii := 0 + var registersRange []registerRange + + // Get range of consecutive integers + // [1, 2, 3, 5, 6, 10, 11, 12, 14] + // (1, 3) , (5, 2) , (10, 3), (14 , 1) + for range addrs { + if ii < len(addrs) { + start := addrs[ii] + end := start + + for ii < len(addrs)-1 && addrs[ii+1]-addrs[ii] == 1 { + end = addrs[ii+1] + ii++ + } + ii++ + registersRange = append(registersRange, registerRange{start, end - start + 1}) + } + } + + var fn func(uint16, uint16) ([]byte, error) + + if name == cDiscreteInputs { + fn = m.client.ReadDiscreteInputs + } else if name == cCoils { + fn = m.client.ReadCoils + } else if name == cInputRegisters { + fn = m.client.ReadInputRegisters + } else if name == cHoldingRegisters { + fn = m.client.ReadHoldingRegisters + } else { + return fmt.Errorf("not Valid function") + } + + m.registers = append(m.registers, register{name, registersRange, fn, fields}) + + return nil +} + +// Connect to a MODBUS Slave device via Modbus/[TCP|RTU|ASCII] +func connect(m *Modbus) error { + u, err := url.Parse(m.Controller) + if err != nil { + return err + } + + switch u.Scheme { + case "tcp": + var host, port string + host, port, err = net.SplitHostPort(u.Host) + if err != nil { + return err + } + m.tcpHandler = mb.NewTCPClientHandler(host + ":" + port) + m.tcpHandler.Timeout = m.Timeout.Duration + m.tcpHandler.SlaveId = byte(m.SlaveID) + m.client = mb.NewClient(m.tcpHandler) + err := m.tcpHandler.Connect() + if err != nil { + return err + } + m.isConnected = true + return nil + case "file": + if m.TransmissionMode == "RTU" { + m.rtuHandler = mb.NewRTUClientHandler(u.Path) + m.rtuHandler.Timeout = m.Timeout.Duration + m.rtuHandler.SlaveId = byte(m.SlaveID) + m.rtuHandler.BaudRate = m.BaudRate + m.rtuHandler.DataBits = m.DataBits + m.rtuHandler.Parity = m.Parity + m.rtuHandler.StopBits = m.StopBits + m.client = mb.NewClient(m.rtuHandler) + err := m.rtuHandler.Connect() + if err != nil { + return err + } + m.isConnected = true + return nil + } else if m.TransmissionMode == "ASCII" { + m.asciiHandler = mb.NewASCIIClientHandler(u.Path) + m.asciiHandler.Timeout = m.Timeout.Duration + m.asciiHandler.SlaveId = byte(m.SlaveID) + m.asciiHandler.BaudRate = m.BaudRate + m.asciiHandler.DataBits = m.DataBits + m.asciiHandler.Parity = m.Parity + m.asciiHandler.StopBits = m.StopBits + m.client = mb.NewClient(m.asciiHandler) + err := m.asciiHandler.Connect() + if err != nil { + return err + } + m.isConnected = true + return nil + } else { + return fmt.Errorf("invalid protocol '%s' - '%s' ", u.Scheme, m.TransmissionMode) + } + default: + return fmt.Errorf("invalid controller") + } +} + +func validateFieldContainers(t []fieldContainer, n string) error { + nameEncountered := map[string]bool{} + for _, item := range t { + //check empty name + if item.Name == "" { + return fmt.Errorf("empty name in '%s'", n) + } + + //search name duplicate + if nameEncountered[item.Name] { + return fmt.Errorf("name '%s' is duplicated in '%s' - '%s'", item.Name, n, item.Name) + } else { + nameEncountered[item.Name] = true + } + + if n == cInputRegisters || n == cHoldingRegisters { + // search byte order + switch item.ByteOrder { + case "AB", "BA", "ABCD", "CDAB", "BADC", "DCBA": + break + default: + return fmt.Errorf("invalid byte order '%s' in '%s' - '%s'", item.ByteOrder, n, item.Name) + } + + // search data type + switch item.DataType { + case "UINT16", "INT16", "UINT32", "INT32", "FLOAT32-IEEE", "FLOAT32": + break + default: + return fmt.Errorf("invalid data type '%s' in '%s' - '%s'", item.DataType, n, item.Name) + } + + // check scale + if item.Scale == 0.0 { + return fmt.Errorf("invalid scale '%f' in '%s' - '%s'", item.Scale, n, item.Name) + } + } + + // check address + if len(item.Address) == 0 || len(item.Address) > 2 { + return fmt.Errorf("invalid address '%v' length '%v' in '%s' - '%s'", item.Address, len(item.Address), n, item.Name) + } else if n == cInputRegisters || n == cHoldingRegisters { + if (len(item.Address) == 1 && len(item.ByteOrder) != 2) || (len(item.Address) == 2 && len(item.ByteOrder) != 4) { + return fmt.Errorf("invalid byte order '%s' and address '%v' in '%s' - '%s'", item.ByteOrder, item.Address, n, item.Name) + } + + // search duplicated + if len(item.Address) > len(removeDuplicates(item.Address)) { + return fmt.Errorf("duplicate address '%v' in '%s' - '%s'", item.Address, n, item.Name) + } + + } else if len(item.Address) > 1 || (n == cInputRegisters || n == cHoldingRegisters) { + return fmt.Errorf("invalid address'%v' length'%v' in '%s' - '%s'", item.Address, len(item.Address), n, item.Name) + } + } + return nil +} + +func removeDuplicates(elements []uint16) []uint16 { + encountered := map[uint16]bool{} + result := []uint16{} + + for v := range elements { + if encountered[elements[v]] { + } else { + encountered[elements[v]] = true + result = append(result, elements[v]) + } + } + + return result +} + +func (m *Modbus) getFields() error { + for _, register := range m.registers { + rawValues := make(map[uint16][]byte) + bitRawValues := make(map[uint16]uint16) + for _, rr := range register.RegistersRange { + address := rr.address + readValues, err := register.ReadValue(uint16(rr.address), uint16(rr.length)) + if err != nil { + return err + } + + // Raw Values + if register.Type == cDiscreteInputs || register.Type == cCoils { + for _, readValue := range readValues { + for bitPosition := 0; bitPosition < 8; bitPosition++ { + bitRawValues[address] = getBitValue(readValue, bitPosition) + address = address + 1 + if address+1 > rr.length { + break + } + } + } + } + + // Raw Values + if register.Type == cInputRegisters || register.Type == cHoldingRegisters { + batchSize := 2 + for batchSize < len(readValues) { + rawValues[address] = readValues[0:batchSize:batchSize] + address = address + 1 + readValues = readValues[batchSize:] + } + + rawValues[address] = readValues[0:batchSize:batchSize] + } + } + + if register.Type == cDiscreteInputs || register.Type == cCoils { + for i := 0; i < len(register.Fields); i++ { + register.Fields[i].value = bitRawValues[register.Fields[i].Address[0]] + } + } + + if register.Type == cInputRegisters || register.Type == cHoldingRegisters { + for i := 0; i < len(register.Fields); i++ { + var values_t []byte + + for j := 0; j < len(register.Fields[i].Address); j++ { + tempArray := rawValues[register.Fields[i].Address[j]] + for x := 0; x < len(tempArray); x++ { + values_t = append(values_t, tempArray[x]) + } + } + + register.Fields[i].value = convertDataType(register.Fields[i], values_t) + } + + } + } + + return nil +} + +func getBitValue(n byte, pos int) uint16 { + return uint16(n >> uint(pos) & 0x01) +} + +func convertDataType(t fieldContainer, bytes []byte) interface{} { + switch t.DataType { + case "UINT16": + e16 := convertEndianness16(t.ByteOrder, bytes) + f16 := format16(t.DataType, e16).(uint16) + return scaleUint16(t.Scale, f16) + case "INT16": + e16 := convertEndianness16(t.ByteOrder, bytes) + f16 := format16(t.DataType, e16).(int16) + return scaleInt16(t.Scale, f16) + case "UINT32": + e32 := convertEndianness32(t.ByteOrder, bytes) + f32 := format32(t.DataType, e32).(uint32) + return scaleUint32(t.Scale, f32) + case "INT32": + e32 := convertEndianness32(t.ByteOrder, bytes) + return format32(t.DataType, e32) + case "FLOAT32-IEEE": + e32 := convertEndianness32(t.ByteOrder, bytes) + return format32(t.DataType, e32) + case "FLOAT32": + if len(bytes) == 2 { + e16 := convertEndianness16(t.ByteOrder, bytes) + f16 := format16(t.DataType, e16).(uint16) + return scale16toFloat32(t.Scale, f16) + } else { + e32 := convertEndianness32(t.ByteOrder, bytes) + return scale32toFloat32(t.Scale, e32) + } + default: + return 0 + } +} + +func convertEndianness16(o string, b []byte) uint16 { + switch o { + case "AB": + return binary.BigEndian.Uint16(b) + case "BA": + return binary.LittleEndian.Uint16(b) + default: + return 0 + } +} + +func convertEndianness32(o string, b []byte) uint32 { + switch o { + case "ABCD": + return binary.BigEndian.Uint32(b) + case "DCBA": + return binary.LittleEndian.Uint32(b) + case "BADC": + return uint32(binary.LittleEndian.Uint16(b[0:]))<<16 | uint32(binary.LittleEndian.Uint16(b[2:])) + case "CDAB": + return uint32(binary.BigEndian.Uint16(b[2:]))<<16 | uint32(binary.BigEndian.Uint16(b[0:])) + default: + return 0 + } +} + +func format16(f string, r uint16) interface{} { + switch f { + case "UINT16": + return r + case "INT16": + return int16(r) + default: + return r + } +} + +func format32(f string, r uint32) interface{} { + switch f { + case "UINT32": + return r + case "INT32": + return int32(r) + case "FLOAT32-IEEE": + return math.Float32frombits(r) + default: + return r + } +} + +func scale16toFloat32(s float32, v uint16) float32 { + return float32(v) * s +} + +func scale32toFloat32(s float32, v uint32) float32 { + return float32(v) * s +} + +func scaleInt16(s float32, v int16) int16 { + return int16(float32(v) * s) +} + +func scaleUint16(s float32, v uint16) uint16 { + return uint16(float32(v) * s) +} + +func scaleUint32(s float32, v uint32) uint32 { + return uint32(float64(v) * float64(s)) +} + +// Gather implements the telegraf plugin interface method for data accumulation +func (m *Modbus) Gather(acc telegraf.Accumulator) error { + if !m.isConnected { + err := connect(m) + if err != nil { + m.isConnected = false + return err + } + } + + err := m.getFields() + if err != nil { + m.isConnected = false + return err + } + + for _, reg := range m.registers { + fields := make(map[string]interface{}) + tags := map[string]string{ + "name": m.Name, + "type": reg.Type, + } + + for _, field := range reg.Fields { + fields[field.Name] = field.value + } + + acc.AddFields("modbus", fields, tags) + } + + return nil +} + +// Add this plugin to telegraf +func init() { + inputs.Add("modbus", func() telegraf.Input { return &Modbus{} }) +} diff --git a/plugins/inputs/modbus/modbus_test.go b/plugins/inputs/modbus/modbus_test.go new file mode 100644 index 000000000..3d54c68c5 --- /dev/null +++ b/plugins/inputs/modbus/modbus_test.go @@ -0,0 +1,376 @@ +package modbus + +import ( + "testing" + + m "github.com/goburrow/modbus" + "github.com/stretchr/testify/assert" + "github.com/tbrandon/mbserver" + + "github.com/influxdata/telegraf/testutil" +) + +func TestCoils(t *testing.T) { + var coilTests = []struct { + name string + address uint16 + quantity uint16 + write []byte + read uint16 + }{ + { + name: "coil0_turn_off", + address: 0, + quantity: 1, + write: []byte{0x00}, + read: 0, + }, + { + name: "coil0_turn_on", + address: 0, + quantity: 1, + write: []byte{0x01}, + read: 1, + }, + { + name: "coil1_turn_on", + address: 1, + quantity: 1, + write: []byte{0x01}, + read: 1, + }, + { + name: "coil2_turn_on", + address: 2, + quantity: 1, + write: []byte{0x01}, + read: 1, + }, + { + name: "coil3_turn_on", + address: 3, + quantity: 1, + write: []byte{0x01}, + read: 1, + }, + { + name: "coil1_turn_off", + address: 1, + quantity: 1, + write: []byte{0x00}, + read: 0, + }, + { + name: "coil2_turn_off", + address: 2, + quantity: 1, + write: []byte{0x00}, + read: 0, + }, + { + name: "coil3_turn_off", + address: 3, + quantity: 1, + write: []byte{0x00}, + read: 0, + }, + } + + serv := mbserver.NewServer() + err := serv.ListenTCP("localhost:1502") + defer serv.Close() + assert.NoError(t, err) + + handler := m.NewTCPClientHandler("localhost:1502") + err = handler.Connect() + assert.NoError(t, err) + defer handler.Close() + client := m.NewClient(handler) + + for _, ct := range coilTests { + t.Run(ct.name, func(t *testing.T) { + _, err = client.WriteMultipleCoils(ct.address, ct.quantity, ct.write) + assert.NoError(t, err) + + modbus := Modbus{ + Name: "TestCoils", + Controller: "tcp://localhost:1502", + SlaveID: 1, + Coils: []fieldContainer{ + { + Name: ct.name, + Address: []uint16{ct.address}, + }, + }, + } + + err = modbus.Init() + assert.NoError(t, err) + var acc testutil.Accumulator + err = modbus.Gather(&acc) + assert.NoError(t, err) + assert.NotEmpty(t, modbus.registers) + + for _, coil := range modbus.registers { + assert.Equal(t, ct.read, coil.Fields[0].value) + } + }) + } +} + +func TestHoldingRegisters(t *testing.T) { + var holdingRegisterTests = []struct { + name string + address []uint16 + quantity uint16 + byteOrder string + dataType string + scale float32 + write []byte + read interface{} + }{ + { + name: "register0_ab_float32", + address: []uint16{0}, + quantity: 1, + byteOrder: "AB", + dataType: "FLOAT32", + scale: 0.1, + write: []byte{0x08, 0x98}, + read: float32(220), + }, + { + name: "register0_register1_ab_float32", + address: []uint16{0, 1}, + quantity: 2, + byteOrder: "ABCD", + dataType: "FLOAT32", + scale: 0.001, + write: []byte{0x00, 0x00, 0x03, 0xE8}, + read: float32(1), + }, + { + name: "register1_register2_abcd_float32", + address: []uint16{1, 2}, + quantity: 2, + byteOrder: "ABCD", + dataType: "FLOAT32", + scale: 0.1, + write: []byte{0x00, 0x00, 0x08, 0x98}, + read: float32(220), + }, + { + name: "register3_register4_abcd_float32", + address: []uint16{3, 4}, + quantity: 2, + byteOrder: "ABCD", + dataType: "FLOAT32", + scale: 0.1, + write: []byte{0x00, 0x00, 0x08, 0x98}, + read: float32(220), + }, + { + name: "register7_ab_float32", + address: []uint16{7}, + quantity: 1, + byteOrder: "AB", + dataType: "FLOAT32", + scale: 0.1, + write: []byte{0x01, 0xF4}, + read: float32(50), + }, + { + name: "register10_ab_uint16", + address: []uint16{10}, + quantity: 1, + byteOrder: "AB", + dataType: "UINT16", + scale: 1, + write: []byte{0xAB, 0xCD}, + read: uint16(43981), + }, + { + name: "register10_ab_uint16-scale_.1", + address: []uint16{10}, + quantity: 1, + byteOrder: "AB", + dataType: "UINT16", + scale: .1, + write: []byte{0xAB, 0xCD}, + read: uint16(4398), + }, + { + name: "register10_ab_uint16_scale_10", + address: []uint16{10}, + quantity: 1, + byteOrder: "AB", + dataType: "UINT16", + scale: 10, + write: []byte{0x00, 0x2A}, + read: uint16(420), + }, + { + name: "register20_ba_uint16", + address: []uint16{20}, + quantity: 1, + byteOrder: "BA", + dataType: "UINT16", + scale: 1, + write: []byte{0xAB, 0xCD}, + read: uint16(52651), + }, + { + name: "register30_ab_int16", + address: []uint16{20}, + quantity: 1, + byteOrder: "AB", + dataType: "INT16", + scale: 1, + write: []byte{0xAB, 0xCD}, + read: int16(-21555), + }, + { + name: "register40_ba_int16", + address: []uint16{40}, + quantity: 1, + byteOrder: "BA", + dataType: "INT16", + scale: 1, + write: []byte{0xAB, 0xCD}, + read: int16(-12885), + }, + { + name: "register50_register51_abcd_int32", + address: []uint16{50, 51}, + quantity: 2, + byteOrder: "ABCD", + dataType: "INT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: int32(-1430532899), + }, + { + name: "register60_register61_dcba_int32", + address: []uint16{60, 61}, + quantity: 2, + byteOrder: "DCBA", + dataType: "INT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: int32(-573785174), + }, + { + name: "register70_register71_badc_int32", + address: []uint16{70, 71}, + quantity: 2, + byteOrder: "BADC", + dataType: "INT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: int32(-1146430004), + }, + { + name: "register80_register81_cdab_int32", + address: []uint16{80, 81}, + quantity: 2, + byteOrder: "CDAB", + dataType: "INT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: int32(-857888069), + }, + { + name: "register90_register91_abcd_uint32", + address: []uint16{90, 91}, + quantity: 2, + byteOrder: "ABCD", + dataType: "UINT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: uint32(2864434397), + }, + { + name: "register100_register101_dcba_uint32", + address: []uint16{100, 101}, + quantity: 2, + byteOrder: "DCBA", + dataType: "UINT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: uint32(3721182122), + }, + { + name: "register110_register111_badc_uint32", + address: []uint16{110, 111}, + quantity: 2, + byteOrder: "BADC", + dataType: "UINT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: uint32(3148537292), + }, + { + name: "register120_register121_cdab_uint32", + address: []uint16{120, 121}, + quantity: 2, + byteOrder: "CDAB", + dataType: "UINT32", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: uint32(3437079227), + }, + { + name: "register130_register131_abcd_float32_ieee", + address: []uint16{130, 131}, + quantity: 2, + byteOrder: "ABCD", + dataType: "FLOAT32-IEEE", + scale: 1, + write: []byte{0xAA, 0xBB, 0xCC, 0xDD}, + read: float32(-3.3360025e-13), + }, + } + + serv := mbserver.NewServer() + err := serv.ListenTCP("localhost:1502") + defer serv.Close() + assert.NoError(t, err) + + handler := m.NewTCPClientHandler("localhost:1502") + err = handler.Connect() + assert.NoError(t, err) + defer handler.Close() + client := m.NewClient(handler) + + for _, hrt := range holdingRegisterTests { + t.Run(hrt.name, func(t *testing.T) { + _, err = client.WriteMultipleRegisters(hrt.address[0], hrt.quantity, hrt.write) + assert.NoError(t, err) + + modbus := Modbus{ + Name: "TestHoldingRegisters", + Controller: "tcp://localhost:1502", + SlaveID: 1, + HoldingRegisters: []fieldContainer{ + { + Name: hrt.name, + ByteOrder: hrt.byteOrder, + DataType: hrt.dataType, + Scale: hrt.scale, + Address: hrt.address, + }, + }, + } + + err = modbus.Init() + assert.NoError(t, err) + var acc testutil.Accumulator + modbus.Gather(&acc) + assert.NotEmpty(t, modbus.registers) + + for _, coil := range modbus.registers { + assert.Equal(t, hrt.read, coil.Fields[0].value) + } + }) + } +}