diff --git a/Makefile b/Makefile index e7889e89d..2b2e9668e 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,7 @@ test-windows: go test -short ./plugins/inputs/win_services/... go test -short ./plugins/inputs/procstat/... go test -short ./plugins/inputs/ntpq/... + go test -short ./plugins/processors/port_name/... .PHONY: vet vet: diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index f917bf6a6..dbf8a12e5 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -11,6 +11,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/pivot" + _ "github.com/influxdata/telegraf/plugins/processors/port_name" _ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/rename" diff --git a/plugins/processors/port_name/README.md b/plugins/processors/port_name/README.md new file mode 100644 index 000000000..c078fe1c4 --- /dev/null +++ b/plugins/processors/port_name/README.md @@ -0,0 +1,26 @@ +# Port Name Lookup Processor Plugin + +Use the `port_name` processor to convert a tag containing a well-known port number to the registered service name. + +Tag can contain a number ("80") or number and protocol separated by slash ("443/tcp"). If protocol is not provided it defaults to tcp but can be changed with the default_protocol setting. + +### Configuration + +```toml +[[processors.port_name]] + ## Name of tag holding the port number + # tag = "port" + + ## Name of output tag where service name will be added + # dest = "service" + + ## Default tcp or udp + # default_protocol = "tcp" +``` + +### Example + +```diff +- measurement,port=80 field=123 1560540094000000000 ++ measurement,port=80,service=http field=123 1560540094000000000 +``` diff --git a/plugins/processors/port_name/port_name.go b/plugins/processors/port_name/port_name.go new file mode 100644 index 000000000..50c893e60 --- /dev/null +++ b/plugins/processors/port_name/port_name.go @@ -0,0 +1,174 @@ +package portname + +import ( + "bufio" + "io" + "os" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +var sampleConfig = ` +[[processors.port_name]] + ## Name of tag holding the port number + # tag = "port" + + ## Name of output tag where service name will be added + # dest = "service" + + ## Default tcp or udp + # default_protocol = "tcp" +` + +type sMap map[string]map[int]string // "https" == services["tcp"][443] + +var services sMap + +type PortName struct { + SourceTag string `toml:"tag"` + DestTag string `toml:"dest"` + DefaultProtocol string `toml:"default_protocol"` + + Log telegraf.Logger `toml:"-"` +} + +func (d *PortName) SampleConfig() string { + return sampleConfig +} + +func (d *PortName) Description() string { + return "Given a tag of a TCP or UDP port number, add a tag of the service name looked up in the system services file" +} + +func readServicesFile() { + file, err := os.Open(servicesPath()) + if err != nil { + return + } + defer file.Close() + + services = readServices(file) +} + +// Read the services file into a map. +// +// This function takes a similar approach to parsing as the go +// standard library (see src/net/port_unix.go in golang source) but +// maps protocol and port number to service name, not protocol and +// service to port number. +func readServices(r io.Reader) sMap { + services = make(sMap) + scanner := bufio.NewScanner(r) + for scanner.Scan() { + line := scanner.Text() + // "http 80/tcp www www-http # World Wide Web HTTP" + if i := strings.Index(line, "#"); i >= 0 { + line = line[:i] + } + f := strings.Fields(line) + if len(f) < 2 { + continue + } + service := f[0] // "http" + portProto := f[1] // "80/tcp" + portProtoSlice := strings.SplitN(portProto, "/", 2) + if len(portProtoSlice) < 2 { + continue + } + port, err := strconv.Atoi(portProtoSlice[0]) // "80" + if err != nil || port <= 0 { + continue + } + proto := portProtoSlice[1] // "tcp" + proto = strings.ToLower(proto) + + protoMap, ok := services[proto] + if !ok { + protoMap = make(map[int]string) + services[proto] = protoMap + } + protoMap[port] = service + } + return services +} + +func (d *PortName) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + for _, m := range metrics { + portProto, ok := m.GetTag(d.SourceTag) + if !ok { + // Nonexistent tag + continue + } + portProtoSlice := strings.SplitN(portProto, "/", 2) + l := len(portProtoSlice) + + if l == 0 { + // Empty tag + d.Log.Errorf("empty port tag: %v", d.SourceTag) + continue + } + + var port int + if l > 0 { + var err error + val := portProtoSlice[0] + port, err = strconv.Atoi(val) + if err != nil { + // Can't convert port to string + d.Log.Errorf("error converting port to integer: %v", val) + continue + } + } + + proto := d.DefaultProtocol + if l > 1 && len(portProtoSlice[1]) > 0 { + proto = portProtoSlice[1] + } + proto = strings.ToLower(proto) + + protoMap, ok := services[proto] + if !ok { + // Unknown protocol + // + // Protocol is normally tcp or udp. The services file + // normally has entries for both, so our map does too. If + // not, it's very likely the source tag or the services + // file doesn't make sense. + d.Log.Errorf("protocol not found in services map: %v", proto) + continue + } + + service, ok := protoMap[port] + if !ok { + // Unknown port + // + // Not all ports are named so this isn't an error, but + // it's helpful to know when debugging. + d.Log.Debugf("port not found in services map: %v", port) + continue + } + + m.AddTag(d.DestTag, service) + } + + return metrics +} + +func (h *PortName) Init() error { + services = make(sMap) + readServicesFile() + return nil +} + +func init() { + processors.Add("port_name", func() telegraf.Processor { + return &PortName{ + SourceTag: "port", + DestTag: "service", + DefaultProtocol: "tcp", + } + }) +} diff --git a/plugins/processors/port_name/port_name_test.go b/plugins/processors/port_name/port_name_test.go new file mode 100644 index 000000000..b58f95a9e --- /dev/null +++ b/plugins/processors/port_name/port_name_test.go @@ -0,0 +1,261 @@ +package portname + +import ( + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var fakeServices = ` +http 80/tcp www # WorldWideWeb HTTP +https 443/tcp # http protocol over TLS/SSL +tftp 69/udp` + +func TestReadServicesFile(t *testing.T) { + readServicesFile() + require.NotZero(t, len(services)) +} + +func TestFakeServices(t *testing.T) { + r := strings.NewReader(fakeServices) + m := readServices(r) + require.Equal(t, sMap{"tcp": {80: "http", 443: "https"}, "udp": {69: "tftp"}}, m) +} + +func TestTable(t *testing.T) { + var tests = []struct { + name string + tag string + dest string + prot string + input []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "ordinary tcp default", + tag: "port", + dest: "service", + prot: "tcp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "443", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "443", + "service": "https", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "force udp default", + tag: "port", + dest: "service", + prot: "udp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "69", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "69", + "service": "tftp", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "override default protocol", + tag: "port", + dest: "service", + prot: "foobar", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80/tcp", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80/tcp", + "service": "http", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "multiple metrics, multiple protocols", + tag: "port", + dest: "service", + prot: "tcp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + testutil.MustMetric( + "meas", + map[string]string{ + "port": "69/udp", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80", + "service": "http", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + testutil.MustMetric( + "meas", + map[string]string{ + "port": "69/udp", + "service": "tftp", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "rename source and destination tags", + tag: "foo", + dest: "bar", + prot: "tcp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "foo": "80", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "foo": "80", + "bar": "http", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "unknown port", + tag: "port", + dest: "service", + prot: "tcp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "9999", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "9999", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "don't mix up protocols", + tag: "port", + dest: "service", + prot: "udp", + input: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "meas", + map[string]string{ + "port": "80", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + } + + r := strings.NewReader(fakeServices) + services = readServices(r) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := PortName{ + SourceTag: tt.tag, + DestTag: tt.dest, + DefaultProtocol: tt.prot, + Log: testutil.Logger{}, + } + + actual := p.Apply(tt.input...) + + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} diff --git a/plugins/processors/port_name/services_path.go b/plugins/processors/port_name/services_path.go new file mode 100644 index 000000000..c8cf73d14 --- /dev/null +++ b/plugins/processors/port_name/services_path.go @@ -0,0 +1,12 @@ +// +build windows + +package portname + +import ( + "os" + "path/filepath" +) + +func servicesPath() string { + return filepath.Join(os.Getenv("WINDIR"), `system32\drivers\etc\services`) +} diff --git a/plugins/processors/port_name/services_path_notwindows.go b/plugins/processors/port_name/services_path_notwindows.go new file mode 100644 index 000000000..5097bfa9c --- /dev/null +++ b/plugins/processors/port_name/services_path_notwindows.go @@ -0,0 +1,7 @@ +// +build !windows + +package portname + +func servicesPath() string { + return "/etc/services" +}