From 863af9d1d4375868d4cd2cd4cab7d53760167f7f Mon Sep 17 00:00:00 2001 From: Karol Zadora-Przylecki Date: Tue, 15 May 2018 16:05:59 -0700 Subject: [PATCH] Add Microsoft Application Insights output plugin (#4010) --- Godeps | 2 + plugins/outputs/all/all.go | 1 + .../outputs/application_insights/README.md | 32 ++ .../application_insights.go | 357 +++++++++++++ .../application_insights_test.go | 477 ++++++++++++++++++ .../diagnostic_message_subscriber.go | 12 + .../mocks/diagnostics_message_listener.go | 12 + .../mocks/diagnostics_message_subscriber.go | 27 + .../application_insights/mocks/transmitter.go | 32 ++ .../application_insights/transmitter.go | 19 + 10 files changed, 971 insertions(+) create mode 100644 plugins/outputs/application_insights/README.md create mode 100644 plugins/outputs/application_insights/application_insights.go create mode 100644 plugins/outputs/application_insights/application_insights_test.go create mode 100644 plugins/outputs/application_insights/diagnostic_message_subscriber.go create mode 100644 plugins/outputs/application_insights/mocks/diagnostics_message_listener.go create mode 100644 plugins/outputs/application_insights/mocks/diagnostics_message_subscriber.go create mode 100644 plugins/outputs/application_insights/mocks/transmitter.go create mode 100644 plugins/outputs/application_insights/transmitter.go diff --git a/Godeps b/Godeps index 12cf9cb92..ff845abaa 100644 --- a/Godeps +++ b/Godeps @@ -1,3 +1,4 @@ +code.cloudfoundry.org/clock e9dc86bbf0e5bbe6bf7ff5a6f71e048959b61f71 collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 github.com/aerospike/aerospike-client-go 9701404f4c60a6ea256595d24bf318f721a7e8b8 github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -41,6 +42,7 @@ github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c +github.com/Microsoft/ApplicationInsights-Go 3612f58550c1de70f1a110c78c830e55f29aa65d github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 github.com/mitchellh/mapstructure d0303fe809921458f417bcf828397a65db30a7e4 diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 94f5149b4..037807c22 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -3,6 +3,7 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/outputs/amon" _ "github.com/influxdata/telegraf/plugins/outputs/amqp" + _ "github.com/influxdata/telegraf/plugins/outputs/application_insights" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" diff --git a/plugins/outputs/application_insights/README.md b/plugins/outputs/application_insights/README.md new file mode 100644 index 000000000..c5b925c2a --- /dev/null +++ b/plugins/outputs/application_insights/README.md @@ -0,0 +1,32 @@ +# Application Insights Output Plugin + +This plugin writes telegraf metrics to Azure Application Insights + +## Configuration +``` +[[outputs.application_insights]] + ## Instrumentation key of the Application Insights resource. + instrumentationKey = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx" + + ## Timeout on close. If not provided, will default to 5s. 0s means no timeout (not recommended). + # timeout = "5s" + + ## Determines whether diagnostic logging (for Application Insights endpoint traffic) is enabled. Default is false. + # enable_diagnosic_logging = "true" + + ## ContextTagSources dictionary instructs the Application Insights plugin to set Application Insights context tags using metric properties. + ## In this dictionary keys are Application Insights context tags to set, and values are names of metric properties to use as source of data. + ## For example: + # [outputs.application_insights.context_tag_sources] + # "ai.cloud.role" = "kubernetes_container_name" + # "ai.cloud.roleInstance" = "kubernetes_pod_name" + ## will set the ai.cloud.role context tag to the value of kubernetes_container_name property (if present), + ## and the ai.cloud.roleInstance context tag to the value of kubernetes_pod_name property. + ## For list of all context tag keys see https://github.com/Microsoft/ApplicationInsights-Go/blob/master/appinsights/contracts/contexttagkeys.go +``` + +## Implementation notes +- Every field in a metric will result in a separate metric telemetry. For example, the metric `foo,host=a first=42,second=43 1525293034000000000` +will result in two metric telemetry records sent to Application Insights: first named `foo_first` and value of 42, and the secod named `foo_second` and a value of 43 (both having property `host` set to "a". \ +\ +The exception is a single-field metric with a value named `value`, in that case the single metric telemetry created will use just the whole metric name without the "value" suffix. For example, `bar,host=a value=23 1525293034000000000` will result in a telemetry named `bar` and value 23. \ No newline at end of file diff --git a/plugins/outputs/application_insights/application_insights.go b/plugins/outputs/application_insights/application_insights.go new file mode 100644 index 000000000..895b8530b --- /dev/null +++ b/plugins/outputs/application_insights/application_insights.go @@ -0,0 +1,357 @@ +package application_insights + +import ( + "fmt" + "log" + "math" + "time" + "unsafe" + + "github.com/Microsoft/ApplicationInsights-Go/appinsights" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type TelemetryTransmitter interface { + Track(appinsights.Telemetry) + Close() <-chan struct{} +} + +type DiagnosticsMessageSubscriber interface { + Subscribe(appinsights.DiagnosticsMessageHandler) appinsights.DiagnosticsMessageListener +} + +type ApplicationInsights struct { + InstrumentationKey string + Timeout internal.Duration + EnableDiagnosticLogging bool + ContextTagSources map[string]string + diagMsgSubscriber DiagnosticsMessageSubscriber + transmitter TelemetryTransmitter + diagMsgListener appinsights.DiagnosticsMessageListener +} + +const ( + Error = "E! " + Warning = "W! " + Info = "I! " + Debug = "D! " +) + +var ( + sampleConfig = ` +## Instrumentation key of the Application Insights resource. + instrumentation_key = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx" + +## Timeout on close. If not provided, will default to 5s. 0s means no timeout (not recommended). +# timeout = "5s" + +## Determines whether diagnostic logging (for Application Insights endpoint traffic) is enabled. Default is false. +# enable_diagnosic_logging = "true" + +## ContextTagSources dictionary instructs the Application Insights plugin to set Application Insights context tags using metric properties. +## In this dictionary keys are Application Insights context tags to set, and values are names of metric properties to use as source of data. +## For example: +# [outputs.application_insights.context_tag_sources] +# "ai.cloud.role" = "kubernetes_container_name" +# "ai.cloud.roleInstance" = "kubernetes_pod_name" +## will set the ai.cloud.role context tag to the value of kubernetes_container_name property (if present), +## and the ai.cloud.roleInstance context tag to the value of kubernetes_pod_name property. +## For list of all context tag keys see https://github.com/Microsoft/ApplicationInsights-Go/blob/master/appinsights/contracts/contexttagkeys.go +` + is32Bit bool + is32BitChecked bool +) + +func (a *ApplicationInsights) SampleConfig() string { + return sampleConfig +} + +func (a *ApplicationInsights) Description() string { + return "Send telegraf metrics to Azure Application Insights" +} + +func (a *ApplicationInsights) Connect() error { + if a.InstrumentationKey == "" { + return fmt.Errorf("Instrumentation key is required") + } + + if a.transmitter == nil { + a.transmitter = NewTransmitter(a.InstrumentationKey) + } + + if a.EnableDiagnosticLogging && a.diagMsgSubscriber != nil { + a.diagMsgListener = a.diagMsgSubscriber.Subscribe(func(msg string) error { + logOutputMsg(Info, "%s", msg) + return nil + }) + } + + return nil +} + +func (a *ApplicationInsights) Write(metrics []telegraf.Metric) error { + for _, metric := range metrics { + allMetricTelemetry := a.createTelemetry(metric) + for _, telemetry := range allMetricTelemetry { + a.transmitter.Track(telemetry) + } + } + + return nil +} + +func (a *ApplicationInsights) Close() error { + if a.diagMsgListener != nil { + // We want to listen to diagnostic messages during closing + // That is why we stop listening only after Close() ends (or a timeout occurs) + defer a.diagMsgListener.Remove() + } + + if a.transmitter == nil { + return nil + } + + select { + case <-a.transmitter.Close(): + logOutputMsg(Info, "Closed") + case <-time.After(a.Timeout.Duration): + logOutputMsg(Warning, "Close operation timed out after %v", a.Timeout.Duration) + } + + return nil +} + +func (a *ApplicationInsights) createTelemetry(metric telegraf.Metric) []appinsights.Telemetry { + aggregateTelemetry, usedFields := a.createAggregateMetricTelemetry(metric) + if aggregateTelemetry != nil { + telemetry := a.createTelemetryForUnusedFields(metric, usedFields) + telemetry = append(telemetry, aggregateTelemetry) + return telemetry + } + + fields := metric.Fields() + if len(fields) == 1 && metric.FieldList()[0].Key == "value" { + // Just use metric name as the telemetry name + telemetry := a.createSimpleMetricTelemetry(metric, "value", false) + if telemetry != nil { + return []appinsights.Telemetry{telemetry} + } else { + return nil + } + } else { + // AppInsights does not support multi-dimensional metrics at the moment, so we need to disambiguate resulting telemetry + // by adding field name as the telemetry name suffix + retval := a.createTelemetryForUnusedFields(metric, nil) + return retval + } +} + +func (a *ApplicationInsights) createSimpleMetricTelemetry(metric telegraf.Metric, fieldName string, useFieldNameInTelemetryName bool) *appinsights.MetricTelemetry { + telemetryValue, err := getFloat64TelemetryPropertyValue([]string{fieldName}, metric, nil) + if err != nil { + return nil + } + + var telemetryName string + if useFieldNameInTelemetryName { + telemetryName = metric.Name() + "_" + fieldName + } else { + telemetryName = metric.Name() + } + telemetry := appinsights.NewMetricTelemetry(telemetryName, telemetryValue) + telemetry.Properties = metric.Tags() + a.addContextTags(metric, telemetry) + telemetry.Timestamp = metric.Time() + return telemetry +} + +func (a *ApplicationInsights) createAggregateMetricTelemetry(metric telegraf.Metric) (*appinsights.AggregateMetricTelemetry, []string) { + usedFields := make([]string, 0, 6) // We will use up to 6 fields + + // Get the sum of all individual measurements(mandatory property) + telemetryValue, err := getFloat64TelemetryPropertyValue([]string{"sum", "value"}, metric, &usedFields) + if err != nil { + return nil, nil + } + + // Get the count of measurements (mandatory property) + telemetryCount, err := getIntTelemetryPropertyValue([]string{"count", "samples"}, metric, &usedFields) + if err != nil { + return nil, nil + } + + telemetry := appinsights.NewAggregateMetricTelemetry(metric.Name()) + telemetry.Value = telemetryValue + telemetry.Count = telemetryCount + telemetry.Properties = metric.Tags() + a.addContextTags(metric, telemetry) + telemetry.Timestamp = metric.Time() + + // We attempt to set min, max, variance and stddev fields but do not really care if they are not present-- + // they are not essential for aggregate metric. + // By convention AppInsights prefers stddev over variance, so to be consistent, we test for stddev after testing for variance. + telemetry.Min, _ = getFloat64TelemetryPropertyValue([]string{"min"}, metric, &usedFields) + telemetry.Max, _ = getFloat64TelemetryPropertyValue([]string{"max"}, metric, &usedFields) + telemetry.Variance, _ = getFloat64TelemetryPropertyValue([]string{"variance"}, metric, &usedFields) + telemetry.StdDev, _ = getFloat64TelemetryPropertyValue([]string{"stddev"}, metric, &usedFields) + + return telemetry, usedFields +} + +func (a *ApplicationInsights) createTelemetryForUnusedFields(metric telegraf.Metric, usedFields []string) []appinsights.Telemetry { + fields := metric.Fields() + retval := make([]appinsights.Telemetry, 0, len(fields)) + + for fieldName := range fields { + if contains(usedFields, fieldName) { + continue + } + + telemetry := a.createSimpleMetricTelemetry(metric, fieldName, true) + if telemetry != nil { + retval = append(retval, telemetry) + } + } + + return retval +} + +func (a *ApplicationInsights) addContextTags(metric telegraf.Metric, telemetry appinsights.Telemetry) { + for contextTagName, tagSourceName := range a.ContextTagSources { + if contextTagValue, found := metric.GetTag(tagSourceName); found { + telemetry.ContextTags()[contextTagName] = contextTagValue + } + } +} + +func getFloat64TelemetryPropertyValue( + candidateFields []string, + metric telegraf.Metric, + usedFields *[]string) (float64, error) { + + for _, fieldName := range candidateFields { + fieldValue, found := metric.GetField(fieldName) + if !found { + continue + } + + metricValue, err := toFloat64(fieldValue) + if err != nil { + continue + } + + if usedFields != nil { + *usedFields = append(*usedFields, fieldName) + } + + return metricValue, nil + } + + return 0.0, fmt.Errorf("No field from the candidate list was found in the metric") +} + +func getIntTelemetryPropertyValue( + candidateFields []string, + metric telegraf.Metric, + usedFields *[]string) (int, error) { + + for _, fieldName := range candidateFields { + fieldValue, found := metric.GetField(fieldName) + if !found { + continue + } + + metricValue, err := toInt(fieldValue) + if err != nil { + continue + } + + if usedFields != nil { + *usedFields = append(*usedFields, fieldName) + } + + return metricValue, nil + } + + return 0, fmt.Errorf("No field from the candidate list was found in the metric") +} + +func contains(set []string, val string) bool { + for _, elem := range set { + if elem == val { + return true + } + } + + return false +} + +func toFloat64(value interface{}) (float64, error) { + // Out of all Golang numerical types Telegraf only uses int64, unit64 and float64 for fields + switch v := value.(type) { + case int64: + return float64(v), nil + case uint64: + return float64(v), nil + case float64: + return v, nil + } + + return 0.0, fmt.Errorf("[%s] cannot be converted to a float64 value", value) +} + +func toInt(value interface{}) (int, error) { + if !is32BitChecked { + is32BitChecked = true + var i int + if unsafe.Sizeof(i) == 4 { + is32Bit = true + } else { + is32Bit = false + } + } + + // Out of all Golang numerical types Telegraf only uses int64, unit64 and float64 for fields + switch v := value.(type) { + case uint64: + if is32Bit { + if v > math.MaxInt32 { + return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v) + } + } else { + if v > math.MaxInt64 { + return 0, fmt.Errorf("Value [%d] out of range of 64-bit integers", v) + } + } + + return int(v), nil + + case int64: + if is32Bit { + if v > math.MaxInt32 || v < math.MinInt32 { + return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v) + } + } + + return int(v), nil + } + + return 0.0, fmt.Errorf("[%s] cannot be converted to an int value", value) +} + +func logOutputMsg(level string, format string, v ...interface{}) { + log.Printf(level+"[outputs.application_insights] "+format, v...) +} + +func init() { + outputs.Add("application_insights", func() telegraf.Output { + return &ApplicationInsights{ + Timeout: internal.Duration{Duration: time.Second * 5}, + diagMsgSubscriber: diagnosticsMessageSubscriber{}, + // It is very common to set Cloud.RoleName and Cloud.RoleInstance context properties, hence initial capacity of two + ContextTagSources: make(map[string]string, 2), + } + }) +} diff --git a/plugins/outputs/application_insights/application_insights_test.go b/plugins/outputs/application_insights/application_insights_test.go new file mode 100644 index 000000000..561e6c9f9 --- /dev/null +++ b/plugins/outputs/application_insights/application_insights_test.go @@ -0,0 +1,477 @@ +package application_insights + +import ( + "math" + "testing" + "time" + + "github.com/Microsoft/ApplicationInsights-Go/appinsights" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/outputs/application_insights/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestConnectFailsIfNoIkey(t *testing.T) { + assert := assert.New(t) + + transmitter := new(mocks.Transmitter) + transmitter.On("Close").Return(closed) + + ai := ApplicationInsights{ + transmitter: transmitter, + // Very long timeout to ensure we do not rely on timeouts for closing the transmitter + Timeout: internal.Duration{Duration: time.Hour}, + } + + err := ai.Connect() + assert.Error(err) +} + +func TestOutputCloseTimesOut(t *testing.T) { + assert := assert.New(t) + + transmitter := new(mocks.Transmitter) + transmitter.On("Close").Return(unfinished) + + ai := ApplicationInsights{ + transmitter: transmitter, + Timeout: internal.Duration{Duration: time.Millisecond * 50}, + } + + err := ai.Close() + assert.NoError(err) + transmitter.AssertCalled(t, "Close") +} + +func TestCloseRemovesDiagMsgListener(t *testing.T) { + assert := assert.New(t) + + transmitter := new(mocks.Transmitter) + transmitter.On("Close").Return(closed) + + diagMsgListener := new(mocks.DiagnosticsMessageListener) + diagMsgListener.On("Remove") + + diagMsgSubscriber := new(mocks.DiagnosticsMessageSubscriber) + diagMsgSubscriber. + On("Subscribe", mock.AnythingOfType("appinsights.DiagnosticsMessageHandler")). + Return(diagMsgListener) + + ai := ApplicationInsights{ + transmitter: transmitter, + Timeout: internal.Duration{Duration: time.Hour}, + EnableDiagnosticLogging: true, + diagMsgSubscriber: diagMsgSubscriber, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + } + + err := ai.Connect() + assert.NoError(err) + diagMsgSubscriber.AssertCalled(t, "Subscribe", mock.AnythingOfType("appinsights.DiagnosticsMessageHandler")) + + err = ai.Close() + assert.NoError(err) + transmitter.AssertCalled(t, "Close") + diagMsgListener.AssertCalled(t, "Remove") +} + +func TestAggregateMetricCreated(t *testing.T) { + tests := []struct { + name string + fields map[string]interface{} + valueField string + countField string + additionalMetricValueFields []string + }{ + {"value and count", map[string]interface{}{"value": 16.5, "count": 23}, "value", "count", nil}, + {"value and samples", map[string]interface{}{"value": 16.5, "samples": 23}, "value", "samples", nil}, + {"sum and count", map[string]interface{}{"sum": 16.5, "count": 23}, "sum", "count", nil}, + {"sum and samples", map[string]interface{}{"samples": 23, "sum": 16.5}, "sum", "samples", nil}, + {"value and count, sum is wrong type", map[string]interface{}{"sum": "J23", "value": 16.5, "count": 23}, "value", "count", nil}, + { + "with aggregates", + map[string]interface{}{ + "value": 16.5, + "count": 23, + "min": -2.1, + "max": 34, + "stddev": 3.4, + }, + "value", + "count", + nil, + }, + { + "some aggregates with invalid values", + map[string]interface{}{ + "value": 16.5, + "count": 23, + "min": "min", + "max": []float64{3.4, 5.6}, + "stddev": struct { + name string + value float64 + }{"delta", 7.0}, + }, + "value", + "count", + nil, + }, + { + "aggregate with additional fields", + map[string]interface{}{"value": 16.5, "samples": 23, "alpha": -34e12, "bravo": -3, "charlie": "charlie"}, + "value", + "samples", + []string{"alpha", "bravo"}, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + assert := assert.New(t) + now := time.Now().UTC() + + transmitter := new(mocks.Transmitter) + transmitter.On("Track", mock.Anything) + metricName := "ShouldBeAggregateMetric" + + m, err := metric.New( + metricName, + nil, // tags + tt.fields, + now, + ) + assert.NoError(err) + + ai := ApplicationInsights{ + transmitter: transmitter, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + } + + err = ai.Connect() + assert.NoError(err) + + mSet := []telegraf.Metric{m} + ai.Write(mSet) + transmitter.AssertNumberOfCalls(t, "Track", 1+len(tt.additionalMetricValueFields)) + var pAggregateTelemetry *appinsights.AggregateMetricTelemetry + assert.IsType(pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry") + aggregateTelemetry := transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry) + verifyAggregateTelemetry(assert, m, tt.valueField, tt.countField, aggregateTelemetry) + + verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName) + } + + t.Run(tt.name, tf) + } +} + +func TestSimpleMetricCreated(t *testing.T) { + tests := []struct { + name string + fields map[string]interface{} + primaryMetricValueField string + additionalMetricValueFields []string + }{ + {"just a single value field", map[string]interface{}{"value": 16.5}, "value", nil}, + {"single field not named value", map[string]interface{}{"first": 32.9}, "first", nil}, + {"value but no count", map[string]interface{}{"value": 16.5, "other": "bulba"}, "", []string{"value"}}, + {"count but no value", map[string]interface{}{"v1": "v1Val", "count": 23}, "", []string{"count"}}, + {"neither value nor count", map[string]interface{}{"v1": "alpha", "v2": 45.8}, "", []string{"v2"}}, + {"value is of wrong type", map[string]interface{}{"value": "alpha", "count": 15}, "", []string{"count"}}, + {"count is of wrong type", map[string]interface{}{"value": 23.77, "count": 7.5}, "", []string{"count", "value"}}, + {"count is out of range", map[string]interface{}{"value": -98.45E4, "count": math.MaxUint64 - uint64(20)}, "", []string{"value", "count"}}, + {"several additional fields", map[string]interface{}{"alpha": 10, "bravo": "bravo", "charlie": 30, "delta": 40.7}, "", []string{"alpha", "charlie", "delta"}}, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + assert := assert.New(t) + now := time.Now().UTC() + + transmitter := new(mocks.Transmitter) + transmitter.On("Track", mock.Anything) + metricName := "ShouldBeSimpleMetric" + + m, err := metric.New( + metricName, + nil, // tags + tt.fields, + now, + ) + assert.NoError(err) + + ai := ApplicationInsights{ + transmitter: transmitter, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + } + + err = ai.Connect() + assert.NoError(err) + + mSet := []telegraf.Metric{m} + ai.Write(mSet) + + expectedNumberOfCalls := len(tt.additionalMetricValueFields) + if tt.primaryMetricValueField != "" { + expectedNumberOfCalls++ + } + + transmitter.AssertNumberOfCalls(t, "Track", expectedNumberOfCalls) + if tt.primaryMetricValueField != "" { + var pMetricTelemetry *appinsights.MetricTelemetry + assert.IsType(pMetricTelemetry, transmitter.Calls[0].Arguments.Get(0), "First created telemetry should be simple MetricTelemetry") + metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry) + + var expectedTelemetryName string + if tt.primaryMetricValueField == "value" { + expectedTelemetryName = m.Name() + } else { + expectedTelemetryName = m.Name() + "_" + tt.primaryMetricValueField + } + verifySimpleTelemetry(assert, m, tt.primaryMetricValueField, expectedTelemetryName, metricTelemetry) + } + + verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName) + } + + t.Run(tt.name, tf) + } +} + +func TestTagsAppliedToTelemetry(t *testing.T) { + tests := []struct { + name string + fields map[string]interface{} + tags map[string]string + metricValueFields []string + }{ + { + "value but no count", + map[string]interface{}{"value": 16.5, "alpha": 3.5, "bravo": 17}, + map[string]string{"alpha": "a tag is not a field", "charlie": "charlie"}, + []string{"value", "alpha", "bravo"}, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + assert := assert.New(t) + now := time.Now().UTC() + + transmitter := new(mocks.Transmitter) + transmitter.On("Track", mock.Anything) + metricName := "ShouldBeSimpleMetric" + + m, err := metric.New( + metricName, + tt.tags, + tt.fields, + now, + ) + assert.NoError(err) + + ai := ApplicationInsights{ + transmitter: transmitter, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + } + + err = ai.Connect() + assert.NoError(err) + + mSet := []telegraf.Metric{m} + ai.Write(mSet) + transmitter.AssertNumberOfCalls(t, "Track", len(tt.metricValueFields)) + transmitter.AssertCalled(t, "Track", mock.AnythingOfType("*appinsights.MetricTelemetry")) + + // Will verify that all original tags are present in telemetry.Properies map + verifyAdditionalTelemetry(assert, m, transmitter, tt.metricValueFields, metricName) + } + + t.Run(tt.name, tf) + } +} + +func TestContextTagsSetOnSimpleTelemetry(t *testing.T) { + assert := assert.New(t) + now := time.Now().UTC() + + transmitter := new(mocks.Transmitter) + transmitter.On("Track", mock.Anything) + + m, err := metric.New( + "SimpleMetric", + map[string]string{"kubernetes_container_name": "atcsvc", "kubernetes_pod_name": "bunkie17554"}, + map[string]interface{}{"value": 23.0}, + now, + ) + assert.NoError(err) + + ai := ApplicationInsights{ + transmitter: transmitter, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + ContextTagSources: map[string]string{ + "ai.cloud.role": "kubernetes_container_name", + "ai.cloud.roleInstance": "kubernetes_pod_name", + "ai.user.id": "nonexistent", + }, + } + + err = ai.Connect() + assert.NoError(err) + + mSet := []telegraf.Metric{m} + ai.Write(mSet) + transmitter.AssertNumberOfCalls(t, "Track", 1) + metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry) + cloudTags := metricTelemetry.Tags.Cloud() + assert.Equal("atcsvc", cloudTags.GetRole()) + assert.Equal("bunkie17554", cloudTags.GetRoleInstance()) +} + +func TestContextTagsSetOnAggregateTelemetry(t *testing.T) { + assert := assert.New(t) + now := time.Now().UTC() + + transmitter := new(mocks.Transmitter) + transmitter.On("Track", mock.Anything) + + m, err := metric.New( + "AggregateMetric", + map[string]string{"kubernetes_container_name": "atcsvc", "kubernetes_pod_name": "bunkie17554"}, + map[string]interface{}{"value": 23.0, "count": 5}, + now, + ) + assert.NoError(err) + + ai := ApplicationInsights{ + transmitter: transmitter, + InstrumentationKey: "1234", // Fake, but necessary to enable tracking + ContextTagSources: map[string]string{ + "ai.cloud.role": "kubernetes_container_name", + "ai.cloud.roleInstance": "kubernetes_pod_name", + "ai.user.id": "nonexistent", + }, + } + + err = ai.Connect() + assert.NoError(err) + + mSet := []telegraf.Metric{m} + ai.Write(mSet) + transmitter.AssertNumberOfCalls(t, "Track", 1) + metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry) + cloudTags := metricTelemetry.Tags.Cloud() + assert.Equal("atcsvc", cloudTags.GetRole()) + assert.Equal("bunkie17554", cloudTags.GetRoleInstance()) +} + +func closed() <-chan struct{} { + closed := make(chan struct{}) + close(closed) + return closed +} + +func unfinished() <-chan struct{} { + unfinished := make(chan struct{}) + return unfinished +} + +func verifyAggregateTelemetry( + assert *assert.Assertions, + metric telegraf.Metric, + valueField string, + countField string, + telemetry *appinsights.AggregateMetricTelemetry, +) { + + verifyAggregateField := func(fieldName string, telemetryValue float64) { + metricRawFieldValue, found := metric.Fields()[fieldName] + if !found { + return + } + + if _, err := toFloat64(metricRawFieldValue); err == nil { + assert.EqualValues(metricRawFieldValue, telemetryValue, "Telemetry property %s does not match the metric field", fieldName) + } + } + assert.Equal(metric.Name(), telemetry.Name, "Telemetry name should be the same as metric name") + assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") + assert.EqualValues(metric.Fields()[countField], telemetry.Count, "Telemetry sample count does not mach metric sample count field") + verifyAggregateField("min", telemetry.Min) + verifyAggregateField("max", telemetry.Max) + verifyAggregateField("stdev", telemetry.StdDev) + verifyAggregateField("variance", telemetry.Variance) + assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") + assertMapContains(assert, metric.Tags(), telemetry.Properties) +} + +func verifySimpleTelemetry( + assert *assert.Assertions, + metric telegraf.Metric, + valueField string, + expectedTelemetryName string, + telemetry *appinsights.MetricTelemetry, +) { + + assert.Equal(expectedTelemetryName, telemetry.Name, "Telemetry name is not what was expected") + assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") + assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") + assertMapContains(assert, metric.Tags(), telemetry.Properties) +} + +func verifyAdditionalTelemetry( + assert *assert.Assertions, + metric telegraf.Metric, + transmitter *mocks.Transmitter, + additionalMetricValueFields []string, + telemetryNamePrefix string, +) { + for _, fieldName := range additionalMetricValueFields { + expectedTelemetryName := telemetryNamePrefix + "_" + fieldName + telemetry := findTransmittedTelemetry(transmitter, expectedTelemetryName) + assert.NotNil(telemetry, "Expected telemetry named %s to be created, but could not find it", expectedTelemetryName) + if telemetry != nil { + verifySimpleTelemetry(assert, metric, fieldName, expectedTelemetryName, telemetry) + } + } +} + +func findTransmittedTelemetry(transmitter *mocks.Transmitter, telemetryName string) *appinsights.MetricTelemetry { + for _, call := range transmitter.Calls { + telemetry, isMetricTelemetry := call.Arguments.Get(0).(*appinsights.MetricTelemetry) + if isMetricTelemetry && telemetry.Name == telemetryName { + return telemetry + } + } + + return nil +} + +func keys(m map[string]string) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + + return keys +} + +func assertMapContains(assert *assert.Assertions, expected, actual map[string]string) { + if expected == nil && actual == nil { + return + } + + assert.NotNil(expected, "Maps not equal: expected is nil but actual is not") + assert.NotNil(actual, "Maps not equal: actual is nil but expected is not") + + for k, v := range expected { + av, ok := actual[k] + assert.True(ok, "Actual map does not contain a value for key '%s'", k) + assert.Equal(v, av, "The expected value for key '%s' is '%s' but the actual value is '%s", k, v, av) + } +} diff --git a/plugins/outputs/application_insights/diagnostic_message_subscriber.go b/plugins/outputs/application_insights/diagnostic_message_subscriber.go new file mode 100644 index 000000000..789931910 --- /dev/null +++ b/plugins/outputs/application_insights/diagnostic_message_subscriber.go @@ -0,0 +1,12 @@ +package application_insights + +import ( + "github.com/Microsoft/ApplicationInsights-Go/appinsights" +) + +type diagnosticsMessageSubscriber struct { +} + +func (ms diagnosticsMessageSubscriber) Subscribe(handler appinsights.DiagnosticsMessageHandler) appinsights.DiagnosticsMessageListener { + return appinsights.NewDiagnosticsMessageListener(handler) +} diff --git a/plugins/outputs/application_insights/mocks/diagnostics_message_listener.go b/plugins/outputs/application_insights/mocks/diagnostics_message_listener.go new file mode 100644 index 000000000..65747c10c --- /dev/null +++ b/plugins/outputs/application_insights/mocks/diagnostics_message_listener.go @@ -0,0 +1,12 @@ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// DiagnosticsMessageSubscriber is an autogenerated mock type for the DiagnosticsMessageSubscriber type +type DiagnosticsMessageListener struct { + mock.Mock +} + +func (_m *DiagnosticsMessageListener) Remove() { + _m.Called() +} diff --git a/plugins/outputs/application_insights/mocks/diagnostics_message_subscriber.go b/plugins/outputs/application_insights/mocks/diagnostics_message_subscriber.go new file mode 100644 index 000000000..ba7007d40 --- /dev/null +++ b/plugins/outputs/application_insights/mocks/diagnostics_message_subscriber.go @@ -0,0 +1,27 @@ +// Code generated by mockery v1.0.0 +package mocks + +import appinsights "github.com/Microsoft/ApplicationInsights-Go/appinsights" + +import mock "github.com/stretchr/testify/mock" + +// DiagnosticsMessageSubscriber is an autogenerated mock type for the DiagnosticsMessageSubscriber type +type DiagnosticsMessageSubscriber struct { + mock.Mock +} + +// Subscribe provides a mock function with given fields: _a0 +func (_m *DiagnosticsMessageSubscriber) Subscribe(_a0 appinsights.DiagnosticsMessageHandler) appinsights.DiagnosticsMessageListener { + ret := _m.Called(_a0) + + var r0 appinsights.DiagnosticsMessageListener + if rf, ok := ret.Get(0).(func(appinsights.DiagnosticsMessageHandler) appinsights.DiagnosticsMessageListener); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(appinsights.DiagnosticsMessageListener) + } + } + + return r0 +} diff --git a/plugins/outputs/application_insights/mocks/transmitter.go b/plugins/outputs/application_insights/mocks/transmitter.go new file mode 100644 index 000000000..5cc56fbb1 --- /dev/null +++ b/plugins/outputs/application_insights/mocks/transmitter.go @@ -0,0 +1,32 @@ +// Code generated by mockery v1.0.0 +package mocks + +import appinsights "github.com/Microsoft/ApplicationInsights-Go/appinsights" + +import mock "github.com/stretchr/testify/mock" + +// Transmitter is an autogenerated mock type for the Transmitter type +type Transmitter struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Transmitter) Close() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// Track provides a mock function with given fields: _a0 +func (_m *Transmitter) Track(_a0 appinsights.Telemetry) { + _m.Called(_a0) +} diff --git a/plugins/outputs/application_insights/transmitter.go b/plugins/outputs/application_insights/transmitter.go new file mode 100644 index 000000000..44bc1b806 --- /dev/null +++ b/plugins/outputs/application_insights/transmitter.go @@ -0,0 +1,19 @@ +package application_insights + +import "github.com/Microsoft/ApplicationInsights-Go/appinsights" + +type Transmitter struct { + client appinsights.TelemetryClient +} + +func NewTransmitter(ikey string) *Transmitter { + return &Transmitter{client: appinsights.NewTelemetryClient(ikey)} +} + +func (t *Transmitter) Track(telemetry appinsights.Telemetry) { + t.client.Track(telemetry) +} + +func (t *Transmitter) Close() <-chan struct{} { + return t.client.Channel().Close(0) +}