From d3cf7d669b1d223f73237c4a34a065f52c8ca735 Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Tue, 20 Aug 2019 18:10:25 -0600 Subject: [PATCH] Add apcupsd input plugin (#6226) --- Gopkg.lock | 8 + Gopkg.toml | 4 + README.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/apcupsd/README.md | 45 +++++ plugins/inputs/apcupsd/apcupsd.go | 108 ++++++++++++ plugins/inputs/apcupsd/apcupsd_test.go | 227 +++++++++++++++++++++++++ 7 files changed, 394 insertions(+) create mode 100644 plugins/inputs/apcupsd/README.md create mode 100644 plugins/inputs/apcupsd/apcupsd.go create mode 100644 plugins/inputs/apcupsd/apcupsd_test.go diff --git a/Gopkg.lock b/Gopkg.lock index b884eb9b9..248d55456 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -813,6 +813,13 @@ revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" version = "v1.0.1" +[[projects]] + digest = "1:d4a3035a03b4612c714b993891c071706a64890e55ef64bcc42bc2b461cb2756" + name = "github.com/mdlayher/apcupsd" + packages = ["."] + pruneopts = "" + revision = "2fe55d9e1d0704d3c6f03f69a1fd9ebe2aef9df1" + [[projects]] digest = "1:4c8d8358c45ba11ab7bb15df749d4df8664ff1582daead28bae58cf8cbe49890" name = "github.com/miekg/dns" @@ -1734,6 +1741,7 @@ "github.com/kballard/go-shellquote", "github.com/kubernetes/apimachinery/pkg/api/resource", "github.com/matttproud/golang_protobuf_extensions/pbutil", + "github.com/mdlayher/apcupsd", "github.com/miekg/dns", "github.com/multiplay/go-ts3", "github.com/nats-io/gnatsd/server", diff --git a/Gopkg.toml b/Gopkg.toml index 028af3487..2cc57dd71 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -220,6 +220,10 @@ source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" name = "gopkg.in/fsnotify.v1" +[[constraint]] + name = "github.com/mdlayher/apcupsd" + revision = "2fe55d9e1d0704d3c6f03f69a1fd9ebe2aef9df1" + [[constraint]] branch = "master" name = "google.golang.org/genproto" diff --git a/README.md b/README.md index 7e5f18954..7a9650e97 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,7 @@ For documentation on the latest development code see the [documentation index][d * [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq) * [apache](./plugins/inputs/apache) * [aurora](./plugins/inputs/aurora) +* [apcupsd](./plugins/inputs/apcupsd) * [aws cloudwatch](./plugins/inputs/cloudwatch) * [bcache](./plugins/inputs/bcache) * [beanstalkd](./plugins/inputs/beanstalkd) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index bd8393c0b..c3b134684 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/apache" + _ "github.com/influxdata/telegraf/plugins/inputs/apcupsd" _ "github.com/influxdata/telegraf/plugins/inputs/aurora" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd" diff --git a/plugins/inputs/apcupsd/README.md b/plugins/inputs/apcupsd/README.md new file mode 100644 index 000000000..52edeafe6 --- /dev/null +++ b/plugins/inputs/apcupsd/README.md @@ -0,0 +1,45 @@ +# apcupsd Input Plugin + +This plugin reads data from an apcupsd daemon over its NIS network protocol. + +### Requirements + +apcupsd should be installed and it's daemon should be running. + +### Configuration + +```toml +[[inputs.apcupsd]] + # A list of running apcupsd server to connect to. + # If not provided will default to tcp://127.0.0.1:3551 + servers = ["tcp://127.0.0.1:3551"] + + ## Timeout for dialing server. + timeout = "5s" +``` + +### Metrics + +- apcupsd + - tags: + - serial + - ups_name + - status + - fields: + - online + - input_voltage + - load_percent + - battery_charge_percent + - time_left_ns + - output_voltage + - internal_temp + - battery_voltage + - input_frequency + - time_on_battery_ns + + +### Example output + +``` +apcupsd,serial=AS1231515,ups_name=name1,host=server1 time_on_battery=0,load_percent=9.7,time_left_minutes=98,output_voltage=230.4,internal_temp=32.4,battery_voltage=27.4,input_frequency=50.2,online=true,input_voltage=230.4,battery_charge_percent=100 1490035922000000000 +``` diff --git a/plugins/inputs/apcupsd/apcupsd.go b/plugins/inputs/apcupsd/apcupsd.go new file mode 100644 index 000000000..9a73c454a --- /dev/null +++ b/plugins/inputs/apcupsd/apcupsd.go @@ -0,0 +1,108 @@ +package apcupsd + +import ( + "context" + "net/url" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/mdlayher/apcupsd" +) + +const defaultAddress = "tcp://127.0.0.1:3551" + +var defaultTimeout = internal.Duration{Duration: time.Duration(time.Second * 5)} + +type ApcUpsd struct { + Servers []string + Timeout internal.Duration +} + +func (*ApcUpsd) Description() string { + return "Monitor APC UPSes connected to apcupsd" +} + +var sampleConfig = ` + # A list of running apcupsd server to connect to. + # If not provided will default to tcp://127.0.0.1:3551 + servers = ["tcp://127.0.0.1:3551"] + + ## Timeout for dialing server. + timeout = "5s" +` + +func (*ApcUpsd) SampleConfig() string { + return sampleConfig +} + +func (h *ApcUpsd) Gather(acc telegraf.Accumulator) error { + ctx := context.Background() + + for _, addr := range h.Servers { + addrBits, err := url.Parse(addr) + if err != nil { + return err + } + if addrBits.Scheme == "" { + addrBits.Scheme = "tcp" + } + + ctx, cancel := context.WithTimeout(ctx, h.Timeout.Duration) + defer cancel() + + status, err := fetchStatus(ctx, addrBits) + if err != nil { + return err + } + + tags := map[string]string{ + "serial": status.SerialNumber, + "ups_name": status.UPSName, + "status": status.Status, + } + + flags, err := strconv.ParseUint(strings.Fields(status.StatusFlags)[0], 0, 64) + if err != nil { + return err + } + + fields := map[string]interface{}{ + "status_flags": flags, + "input_voltage": status.LineVoltage, + "load_percent": status.LoadPercent, + "battery_charge_percent": status.BatteryChargePercent, + "time_left_ns": status.TimeLeft.Nanoseconds(), + "output_voltage": status.OutputVoltage, + "internal_temp": status.InternalTemp, + "battery_voltage": status.BatteryVoltage, + "input_frequency": status.LineFrequency, + "time_on_battery_ns": status.TimeOnBattery.Nanoseconds(), + } + + acc.AddFields("apcupsd", fields, tags) + } + return nil +} + +func fetchStatus(ctx context.Context, addr *url.URL) (*apcupsd.Status, error) { + client, err := apcupsd.DialContext(ctx, addr.Scheme, addr.Host) + if err != nil { + return nil, err + } + defer client.Close() + + return client.Status() +} + +func init() { + inputs.Add("apcupsd", func() telegraf.Input { + return &ApcUpsd{ + Servers: []string{defaultAddress}, + Timeout: defaultTimeout, + } + }) +} diff --git a/plugins/inputs/apcupsd/apcupsd_test.go b/plugins/inputs/apcupsd/apcupsd_test.go new file mode 100644 index 000000000..2418faf85 --- /dev/null +++ b/plugins/inputs/apcupsd/apcupsd_test.go @@ -0,0 +1,227 @@ +// +build go1.11 + +package apcupsd + +import ( + "context" + "encoding/binary" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestApcupsdDocs(t *testing.T) { + apc := &ApcUpsd{} + apc.Description() + apc.SampleConfig() +} + +func TestApcupsdInit(t *testing.T) { + input, ok := inputs.Inputs["apcupsd"] + if !ok { + t.Fatal("Input not defined") + } + + _ = input().(*ApcUpsd) +} + +func listen(ctx context.Context, t *testing.T, out [][]byte) (string, error) { + lc := net.ListenConfig{} + ln, err := lc.Listen(ctx, "tcp4", "127.0.0.1:0") + if err != nil { + return "", err + } + + go func() { + for ctx.Err() == nil { + defer ln.Close() + + conn, err := ln.Accept() + if err != nil { + continue + } + defer conn.Close() + conn.SetReadDeadline(time.Now().Add(time.Second)) + + in := make([]byte, 128) + n, err := conn.Read(in) + require.NoError(t, err, "failed to read from connection") + + status := []byte{0, 6, 's', 't', 'a', 't', 'u', 's'} + want, got := status, in[:n] + require.Equal(t, want, got) + + // Run against test function and append EOF to end of output bytes + out = append(out, []byte{0, 0}) + + for _, o := range out { + _, err := conn.Write(o) + require.NoError(t, err, "failed to write to connection") + } + } + }() + + return ln.Addr().String(), nil +} + +func TestConfig(t *testing.T) { + apc := &ApcUpsd{Timeout: defaultTimeout} + + var ( + tests = []struct { + name string + servers []string + err bool + }{ + { + name: "test listen address no scheme", + servers: []string{"127.0.0.1:1234"}, + err: true, + }, + { + name: "test no port", + servers: []string{"127.0.0.3"}, + err: true, + }, + } + + acc testutil.Accumulator + ) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + apc.Servers = tt.servers + + err := apc.Gather(&acc) + if tt.err { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } + +} + +func TestApcupsdGather(t *testing.T) { + apc := &ApcUpsd{Timeout: defaultTimeout} + + var ( + tests = []struct { + name string + err bool + tags map[string]string + fields map[string]interface{} + out func() [][]byte + }{ + { + name: "test listening server with output", + err: false, + tags: map[string]string{ + "serial": "ABC123", + "status": "ONLINE", + "ups_name": "BERTHA", + }, + fields: map[string]interface{}{ + "status_flags": uint64(8), + "battery_charge_percent": float64(0), + "battery_voltage": float64(0), + "input_frequency": float64(0), + "input_voltage": float64(0), + "internal_temp": float64(0), + "load_percent": float64(13), + "output_voltage": float64(0), + "time_left_ns": int64(2790000000000), + "time_on_battery_ns": int64(0), + }, + out: genOutput, + }, + { + name: "test with bad output", + err: true, + out: genBadOutput, + }, + } + + acc testutil.Accumulator + ) + + for _, tt := range tests { + + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + lAddr, err := listen(ctx, t, tt.out()) + if err != nil { + t.Fatal(err) + } + + apc.Servers = []string{"tcp://" + lAddr} + + err = apc.Gather(&acc) + if tt.err { + require.Error(t, err) + } else { + require.NoError(t, err) + acc.AssertContainsTaggedFields(t, "apcupsd", tt.fields, tt.tags) + } + cancel() + }) + } +} + +// The following functionality is straight from apcupsd tests. + +// kvBytes is a helper to generate length and key/value byte buffers. +func kvBytes(kv string) ([]byte, []byte) { + lenb := make([]byte, 2) + binary.BigEndian.PutUint16(lenb, uint16(len(kv))) + + return lenb, []byte(kv) +} + +func genOutput() [][]byte { + kvs := []string{ + "SERIALNO : ABC123", + "STATUS : ONLINE", + "STATFLAG : 0x08 Status Flag", + "UPSNAME : BERTHA", + "DATE : 2016-09-06 22:13:28 -0400", + "HOSTNAME : example", + "LOADPCT : 13.0 Percent Load Capacity", + "BATTDATE : 2016-09-06", + "TIMELEFT : 46.5 Minutes", + "TONBATT : 0 seconds", + "NUMXFERS : 0", + "SELFTEST : NO", + "NOMPOWER : 865 Watts", + } + + var out [][]byte + for _, kv := range kvs { + lenb, kvb := kvBytes(kv) + out = append(out, lenb) + out = append(out, kvb) + } + + return out +} + +func genBadOutput() [][]byte { + kvs := []string{ + "STATFLAG : 0x08Status Flag", + } + + var out [][]byte + for _, kv := range kvs { + lenb, kvb := kvBytes(kv) + out = append(out, lenb) + out = append(out, kvb) + } + + return out +}