From 7b365180d07bdde84c1002efeb12696bc19e9377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Sesqu=C3=A8s?= Date: Wed, 31 Jan 2018 21:25:27 +0100 Subject: [PATCH] Add Ipset input plugin (#3346) --- plugins/inputs/all/all.go | 1 + plugins/inputs/ipset/README.md | 62 +++++++++++++ plugins/inputs/ipset/ipset.go | 126 +++++++++++++++++++++++++++ plugins/inputs/ipset/ipset_test.go | 135 +++++++++++++++++++++++++++++ 4 files changed, 324 insertions(+) create mode 100644 plugins/inputs/ipset/README.md create mode 100644 plugins/inputs/ipset/ipset.go create mode 100644 plugins/inputs/ipset/ipset_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index fa78b3ff0..a13bd6cd3 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -36,6 +36,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/internal" _ "github.com/influxdata/telegraf/plugins/inputs/interrupts" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" + _ "github.com/influxdata/telegraf/plugins/inputs/ipset" _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia2" diff --git a/plugins/inputs/ipset/README.md b/plugins/inputs/ipset/README.md new file mode 100644 index 000000000..2209de911 --- /dev/null +++ b/plugins/inputs/ipset/README.md @@ -0,0 +1,62 @@ +# Ipset Plugin + +The ipset plugin gathers packets and bytes counters from Linux ipset. +It uses the output of the command "ipset save". +Ipsets created without the "counters" option are ignored. + +Results are tagged with: +- ipset name +- ipset entry + +There are 3 ways to grant telegraf the right to run ipset: +* Run as root (strongly discouraged) +* Use sudo +* Configure systemd to run telegraf with CAP_NET_ADMIN and CAP_NET_RAW capabilities. + +### Using systemd capabilities + +You may run `systemctl edit telegraf.service` and add the following: + +``` +[Service] +CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN +AmbientCapabilities=CAP_NET_RAW CAP_NET_ADMIN +``` + +### Using sudo + +You may edit your sudo configuration with the following: + +```sudo +telegraf ALL=(root) NOPASSWD: /sbin/ipset save +``` + +### Configuration + +```toml + [[inputs.ipset]] + ## By default, we only show sets which have already matched at least 1 packet. + ## set include_unmatched_sets = true to gather them all. + include_unmatched_sets = false + ## Adjust your sudo settings appropriately if using this option ("sudo ipset save") + ## You can avoid using sudo or root, by setting appropriate privileges for + ## the telegraf.service systemd service. + use_sudo = false + ## The default timeout of 1s for ipset execution can be overridden here: + # timeout = "1s" + +``` + +### Example Output + +``` +$ sudo ipset save +create myset hash:net family inet hashsize 1024 maxelem 65536 counters comment +add myset 10.69.152.1 packets 8 bytes 672 comment "machine A" +``` + +``` +$ telegraf --config telegraf.conf --input-filter ipset --test --debug +* Plugin: inputs.ipset, Collection 1 +> ipset,rule=10.69.152.1,host=trashme,set=myset bytes_total=8i,packets_total=672i 1507615028000000000 +``` diff --git a/plugins/inputs/ipset/ipset.go b/plugins/inputs/ipset/ipset.go new file mode 100644 index 000000000..c459ebf4c --- /dev/null +++ b/plugins/inputs/ipset/ipset.go @@ -0,0 +1,126 @@ +package ipset + +import ( + "bufio" + "bytes" + "fmt" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Ipsets is a telegraf plugin to gather packets and bytes counters from ipset +type Ipset struct { + IncludeUnmatchedSets bool + UseSudo bool + Timeout internal.Duration + lister setLister +} + +type setLister func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) + +const measurement = "ipset" + +var defaultTimeout = internal.Duration{Duration: time.Second} + +// Description returns a short description of the plugin +func (ipset *Ipset) Description() string { + return "Gather packets and bytes counters from Linux ipsets" +} + +// SampleConfig returns sample configuration options. +func (ipset *Ipset) SampleConfig() string { + return ` + ## By default, we only show sets which have already matched at least 1 packet. + ## set include_unmatched_sets = true to gather them all. + include_unmatched_sets = false + ## Adjust your sudo settings appropriately if using this option ("sudo ipset save") + use_sudo = false + ## The default timeout of 1s for ipset execution can be overridden here: + # timeout = "1s" +` +} + +func (ips *Ipset) Gather(acc telegraf.Accumulator) error { + out, e := ips.lister(ips.Timeout, ips.UseSudo) + if e != nil { + acc.AddError(e) + } + + scanner := bufio.NewScanner(out) + for scanner.Scan() { + line := scanner.Text() + // Ignore sets created without the "counters" option + nocomment := strings.Split(line, "\"")[0] + if !(strings.Contains(nocomment, "packets") && + strings.Contains(nocomment, "bytes")) { + continue + } + + data := strings.Fields(line) + if len(data) < 7 { + acc.AddError(fmt.Errorf("Error parsing line (expected at least 7 fields): %s", line)) + continue + } + if data[0] == "add" && (data[4] != "0" || ips.IncludeUnmatchedSets) { + tags := map[string]string{ + "set": data[1], + "rule": data[2], + } + packets_total, err := strconv.ParseUint(data[4], 10, 64) + if err != nil { + acc.AddError(err) + } + bytes_total, err := strconv.ParseUint(data[6], 10, 64) + if err != nil { + acc.AddError(err) + } + fields := map[string]interface{}{ + "packets_total": packets_total, + "bytes_total": bytes_total, + } + acc.AddCounter(measurement, fields, tags) + } + } + return nil +} + +func setList(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) { + // Is ipset installed ? + ipsetPath, err := exec.LookPath("ipset") + if err != nil { + return nil, err + } + var args []string + cmdName := ipsetPath + if UseSudo { + cmdName = "sudo" + args = append(args, ipsetPath) + } + args = append(args, "save") + + cmd := exec.Command(cmdName, args...) + + var out bytes.Buffer + cmd.Stdout = &out + err = internal.RunTimeout(cmd, Timeout.Duration) + if err != nil { + return &out, fmt.Errorf("error running ipset save: %s", err) + } + + return &out, nil +} + +func init() { + inputs.Add("ipset", func() telegraf.Input { + return &Ipset{ + lister: setList, + Timeout: defaultTimeout, + } + }) +} diff --git a/plugins/inputs/ipset/ipset_test.go b/plugins/inputs/ipset/ipset_test.go new file mode 100644 index 000000000..9438c806d --- /dev/null +++ b/plugins/inputs/ipset/ipset_test.go @@ -0,0 +1,135 @@ +package ipset + +import ( + "bytes" + "errors" + "fmt" + "reflect" + "testing" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" +) + +func TestIpset(t *testing.T) { + tests := []struct { + name string + value string + tags []map[string]string + fields [][]map[string]interface{} + err error + }{ + { + name: "0 sets, no results", + value: "", + }, + { + name: "Empty sets, no values", + value: `create myset hash:net family inet hashsize 1024 maxelem 65536 + create myset2 hash:net,port family inet hashsize 16384 maxelem 524288 counters comment + `, + }, + { + name: "Non-empty sets, but no counters, no results", + value: `create myset hash:net family inet hashsize 1024 maxelem 65536 + add myset 1.2.3.4 + `, + }, + { + name: "Line with data but not enough fields", + value: `create hash:net family inet hashsize 1024 maxelem 65536 counters + add myset 4.5.6.7 packets 123 bytes + `, + err: fmt.Errorf("Error parsing line (expected at least 7 fields): \t\t\t\tadd myset 4.5.6.7 packets 123 bytes"), + }, + { + name: "Non-empty sets, counters, no comment", + value: `create myset hash:net family inet hashsize 1024 maxelem 65536 counters + add myset 1.2.3.4 packets 1328 bytes 79680 + add myset 2.3.4.5 packets 0 bytes 0 + add myset 3.4.5.6 packets 3 bytes 222 + `, + tags: []map[string]string{ + map[string]string{"set": "myset", "rule": "1.2.3.4"}, + map[string]string{"set": "myset", "rule": "3.4.5.6"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"packets_total": uint64(1328), "bytes_total": uint64(79680)}}, + {map[string]interface{}{"packets_total": uint64(3), "bytes_total": uint64(222)}}, + }, + }, + { + name: "Sets with counters and comment", + value: `create myset hash:net family inet hashsize 1024 maxelem 65536 counters comment + add myset 1.2.3.4 packets 1328 bytes 79680 comment "first IP" + add myset 2.3.4.5 packets 0 bytes 0 comment "2nd IP" + add myset 3.4.5.6 packets 3 bytes 222 "3rd IP" + `, + tags: []map[string]string{ + map[string]string{"set": "myset", "rule": "1.2.3.4"}, + map[string]string{"set": "myset", "rule": "3.4.5.6"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"packets_total": uint64(1328), "bytes_total": uint64(79680)}}, + {map[string]interface{}{"packets_total": uint64(3), "bytes_total": uint64(222)}}, + }, + }, + } + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + i++ + ips := &Ipset{ + lister: func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) { + return bytes.NewBufferString(tt.value), nil + }, + } + acc := new(testutil.Accumulator) + err := acc.GatherError(ips.Gather) + if !reflect.DeepEqual(tt.err, err) { + t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err) + } + if len(tt.tags) == 0 { + n := acc.NFields() + if n != 0 { + t.Errorf("%d: expected 0 values got %d", i, n) + } + return + } + n := 0 + for j, tags := range tt.tags { + for k, fields := range tt.fields[j] { + if len(acc.Metrics) < n+1 { + t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics)) + break + } + m := acc.Metrics[n] + if !reflect.DeepEqual(m.Measurement, measurement) { + t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement) + } + if !reflect.DeepEqual(m.Tags, tags) { + t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags) + } + if !reflect.DeepEqual(m.Fields, fields) { + t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields) + } + n++ + } + } + }) + } +} + +func TestIpset_Gather_listerError(t *testing.T) { + errFoo := errors.New("error foobar") + ips := &Ipset{ + lister: func(Timeout internal.Duration, UseSudo bool) (*bytes.Buffer, error) { + return new(bytes.Buffer), errFoo + }, + } + acc := new(testutil.Accumulator) + err := acc.GatherError(ips.Gather) + if !reflect.DeepEqual(err, errFoo) { + t.Errorf("Expected error %#v got\n%#v\n", errFoo, err) + } +}