From 580ac61cf754b960499d83edbc1a73cdd143a097 Mon Sep 17 00:00:00 2001 From: hsinghkalsi <41585091+hsinghkalsi@users.noreply.github.com> Date: Wed, 27 May 2020 14:24:49 -0400 Subject: [PATCH] Add newrelic output plugin (#7019) --- README.md | 1 + go.mod | 1 + go.sum | 2 + plugins/outputs/all/all.go | 1 + plugins/outputs/newrelic/README.md | 22 +++ plugins/outputs/newrelic/newrelic.go | 159 +++++++++++++++++++ plugins/outputs/newrelic/newrelic_test.go | 180 ++++++++++++++++++++++ 7 files changed, 366 insertions(+) create mode 100644 plugins/outputs/newrelic/README.md create mode 100644 plugins/outputs/newrelic/newrelic.go create mode 100644 plugins/outputs/newrelic/newrelic_test.go diff --git a/README.md b/README.md index c749811de..32ed21edb 100644 --- a/README.md +++ b/README.md @@ -414,6 +414,7 @@ For documentation on the latest development code see the [documentation index][d * [librato](./plugins/outputs/librato) * [mqtt](./plugins/outputs/mqtt) * [nats](./plugins/outputs/nats) +* [newrelic](./plugins/outputs/newrelic) * [nsq](./plugins/outputs/nsq) * [opentsdb](./plugins/outputs/opentsdb) * [prometheus](./plugins/outputs/prometheus_client) diff --git a/go.mod b/go.mod index 29427b02f..ff764b4d9 100644 --- a/go.mod +++ b/go.mod @@ -95,6 +95,7 @@ require ( github.com/naoina/go-stringutil v0.1.0 // indirect github.com/nats-io/nats-server/v2 v2.1.4 github.com/nats-io/nats.go v1.9.1 + github.com/newrelic/newrelic-telemetry-sdk-go v0.2.0 github.com/nsqio/go-nsq v1.0.7 github.com/openconfig/gnmi v0.0.0-20180912164834-33a1865c3029 github.com/opencontainers/go-digest v1.0.0-rc1 // indirect diff --git a/go.sum b/go.sum index 76862fcfe..aca1b99a8 100644 --- a/go.sum +++ b/go.sum @@ -445,6 +445,8 @@ github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/newrelic/newrelic-telemetry-sdk-go v0.2.0 h1:W8+lNIfAldCScGiikToSprbf3DCaMXk0VIM9l73BIpY= +github.com/newrelic/newrelic-telemetry-sdk-go v0.2.0/go.mod h1:G9MqE/cHGv3Hx3qpYhfuyFUsGx2DpVcGi1iJIqTg+JQ= github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY= github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 35e0393de..7d37c2208 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -25,6 +25,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/librato" _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" _ "github.com/influxdata/telegraf/plugins/outputs/nats" + _ "github.com/influxdata/telegraf/plugins/outputs/newrelic" _ "github.com/influxdata/telegraf/plugins/outputs/nsq" _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" diff --git a/plugins/outputs/newrelic/README.md b/plugins/outputs/newrelic/README.md new file mode 100644 index 000000000..323595711 --- /dev/null +++ b/plugins/outputs/newrelic/README.md @@ -0,0 +1,22 @@ +#New Relic output plugin + +This plugins writes to New Relic insights. + +``` +[[outputs.newrelic]] +## New Relic Insights API key +insights_key = "insights api key" + +# metric_prefix if defined, prefix's metrics name for easy identification +# metric_prefix = "" + +# harvest timeout, default is 15 seconds +# timeout = "15s" +``` +####Parameters + +|Parameter Name|Type|Description| +|:-|:-|:-| +| insights_key | Required | Insights API Insert key | +| metric_prefix | Optional | If defined, prefix's metrics name for easy identification | +| timeout | Optional | If defined, changes harvest timeout | diff --git a/plugins/outputs/newrelic/newrelic.go b/plugins/outputs/newrelic/newrelic.go new file mode 100644 index 000000000..07e2569c3 --- /dev/null +++ b/plugins/outputs/newrelic/newrelic.go @@ -0,0 +1,159 @@ +package newrelic + +// newrelic.go +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" + "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" +) + +// NewRelic nr structure +type NewRelic struct { + harvestor *telemetry.Harvester + dc *cumulative.DeltaCalculator + InsightsKey string `toml:"insights_key"` + MetricPrefix string `toml:"metric_prefix"` + Timeout internal.Duration `toml:"timeout"` + savedErrors map[int]interface{} + errorCount int + Client http.Client +} + +// Description returns a one-sentence description on the Output +func (nr *NewRelic) Description() string { + return "Send metrics to New Relic metrics endpoint" +} + +// SampleConfig : return default configuration of the Output +func (nr *NewRelic) SampleConfig() string { + return ` + ## New Relic Insights API key (required) + insights_key = "insights api key" + + # metric_prefix if defined, prefix's metrics name for easy identification (optional) + # metric_prefix = "" + + # harvest timeout, default is 15 seconds + # timeout = "15s" +` +} + +// Connect to the Output +func (nr *NewRelic) Connect() error { + if nr.InsightsKey == "" { + return fmt.Errorf("InsightKey is a required for newrelic") + } + var err error + nr.harvestor, err = telemetry.NewHarvester(telemetry.ConfigAPIKey(nr.InsightsKey), + telemetry.ConfigHarvestPeriod(0), + func(cfg *telemetry.Config) { + cfg.Product = "NewRelic-Telegraf-Plugin" + cfg.ProductVersion = "1.0" + cfg.HarvestTimeout = nr.Timeout.Duration + cfg.Client = &nr.Client + cfg.ErrorLogger = func(e map[string]interface{}) { + var errorString string + for k, v := range e { + errorString += fmt.Sprintf("%s = %s ", k, v) + } + nr.errorCount++ + nr.savedErrors[nr.errorCount] = errorString + } + }) + if err != nil { + return fmt.Errorf("unable to connect to newrelic %v", err) + } + + nr.dc = cumulative.NewDeltaCalculator() + return nil +} + +// Close any connections to the Output +func (nr *NewRelic) Close() error { + + nr.errorCount = 0 + nr.Client.CloseIdleConnections() + return nil +} + +// Write takes in group of points to be written to the Output +func (nr *NewRelic) Write(metrics []telegraf.Metric) error { + + nr.errorCount = 0 + nr.savedErrors = make(map[int]interface{}) + + for _, metric := range metrics { + // create tag map + tags := make(map[string]interface{}) + for _, tag := range metric.TagList() { + tags[tag.Key] = tag.Value + } + for _, field := range metric.FieldList() { + var mvalue float64 + var mname string + if nr.MetricPrefix != "" { + mname = nr.MetricPrefix + "." + metric.Name() + "." + field.Key + } else { + mname = metric.Name() + "." + field.Key + } + switch n := field.Value.(type) { + case int64: + mvalue = float64(n) + case uint64: + mvalue = float64(n) + case float64: + mvalue = float64(n) + case bool: + mvalue = float64(0) + if n { + mvalue = float64(1) + } + case string: + // Do not log everytime we encounter string + // we just skip + continue + default: + return fmt.Errorf("Undefined field type: %T", field.Value) + } + + switch metric.Type() { + case telegraf.Counter: + if counter, ok := nr.dc.CountMetric(mname, tags, mvalue, metric.Time()); ok { + nr.harvestor.RecordMetric(counter) + } + default: + nr.harvestor.RecordMetric(telemetry.Gauge{ + Timestamp: metric.Time(), + Value: mvalue, + Name: mname, + Attributes: tags}) + } + } + } + // By default, the Harvester sends metrics and spans to the New Relic + // backend every 5 seconds. You can force data to be sent at any time + // using HarvestNow. + nr.harvestor.HarvestNow(context.Background()) + + //Check if we encountered errors + if nr.errorCount != 0 { + return fmt.Errorf("unable to harvest metrics %s ", nr.savedErrors[nr.errorCount]) + } + return nil +} + +func init() { + outputs.Add("newrelic", func() telegraf.Output { + return &NewRelic{ + Timeout: internal.Duration{Duration: time.Second * 15}, + Client: http.Client{}, + } + }) +} diff --git a/plugins/outputs/newrelic/newrelic_test.go b/plugins/outputs/newrelic/newrelic_test.go new file mode 100644 index 000000000..aa23950c7 --- /dev/null +++ b/plugins/outputs/newrelic/newrelic_test.go @@ -0,0 +1,180 @@ +package newrelic + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBasic(t *testing.T) { + nr := &NewRelic{ + MetricPrefix: "Test", + InsightsKey: "12345", + Timeout: internal.Duration{Duration: time.Second * 5}, + } + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + err := nr.Connect() + require.NoError(t, err) + + err = nr.Write(testutil.MockMetrics()) + assert.Contains(t, err.Error(), "unable to harvest metrics") +} + +func TestNewRelic_Write(t *testing.T) { + type args struct { + metrics []telegraf.Metric + } + tests := []struct { + name string + metrics []telegraf.Metric + auditMessage string + wantErr bool + }{ + { + name: "Test: Basic mock metric write", + metrics: testutil.MockMetrics(), + wantErr: false, + auditMessage: `"metrics":[{"name":"test1.value","type":"gauge","value":1,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test string ", + metrics: []telegraf.Metric{ + testutil.TestMetric("value1", "test_String"), + }, + wantErr: false, + auditMessage: "", + }, + { + name: "Test: Test int64 ", + metrics: []telegraf.Metric{ + testutil.TestMetric(int64(15), "test_int64"), + }, + wantErr: false, + auditMessage: `"metrics":[{"name":"test_int64.value","type":"gauge","value":15,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test uint64 ", + metrics: []telegraf.Metric{ + testutil.TestMetric(uint64(20), "test_uint64"), + }, + wantErr: false, + auditMessage: `"metrics":[{"name":"test_uint64.value","type":"gauge","value":20,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test bool true ", + metrics: []telegraf.Metric{ + testutil.TestMetric(bool(true), "test_bool_true"), + }, + wantErr: false, + auditMessage: `"metrics":[{"name":"test_bool_true.value","type":"gauge","value":1,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test bool false ", + metrics: []telegraf.Metric{ + testutil.TestMetric(bool(false), "test_bool_false"), + }, + wantErr: false, + auditMessage: `"metrics":[{"name":"test_bool_false.value","type":"gauge","value":0,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test max float64 ", + metrics: []telegraf.Metric{ + testutil.TestMetric(math.MaxFloat64, "test_maxfloat64"), + }, + wantErr: false, + auditMessage: `"metrics":[{"name":"test_maxfloat64.value","type":"gauge","value":1.7976931348623157e+308,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`, + }, + { + name: "Test: Test NAN ", + metrics: []telegraf.Metric{ + testutil.TestMetric(math.NaN, "test_NaN"), + }, + wantErr: false, + auditMessage: ``, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var auditLog map[string]interface{} + nr := &NewRelic{} + nr.harvestor, _ = telemetry.NewHarvester( + telemetry.ConfigHarvestPeriod(0), + func(cfg *telemetry.Config) { + cfg.APIKey = "dummyTestKey" + cfg.HarvestPeriod = 0 + cfg.HarvestTimeout = 0 + cfg.AuditLogger = func(e map[string]interface{}) { + auditLog = e + } + }) + err := nr.Write(tt.metrics) + assert.NoError(t, err) + if auditLog["data"] != nil { + assert.Contains(t, auditLog["data"], tt.auditMessage) + } else { + assert.Contains(t, "", tt.auditMessage) + } + + if (err != nil) != tt.wantErr { + t.Errorf("NewRelic.Write() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestNewRelic_Connect(t *testing.T) { + tests := []struct { + name string + newrelic *NewRelic + wantErr bool + }{ + { + name: "Test: No Insights key", + newrelic: &NewRelic{ + MetricPrefix: "prefix", + }, + wantErr: true, + }, + { + name: "Test: Insights key", + newrelic: &NewRelic{ + InsightsKey: "12312133", + MetricPrefix: "prefix", + }, + wantErr: false, + }, + { + name: "Test: Only Insights key", + newrelic: &NewRelic{ + InsightsKey: "12312133", + }, + wantErr: false, + }, + { + name: "Test: Insights key and Timeout", + newrelic: &NewRelic{ + InsightsKey: "12312133", + Timeout: internal.Duration{Duration: time.Second * 5}, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nr := tt.newrelic + if err := nr.Connect(); (err != nil) != tt.wantErr { + t.Errorf("NewRelic.Connect() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}