Refactor to use AggregatingOutput

This commit is contained in:
Gunnar Aasen 2018-05-03 02:11:31 -07:00
parent 9490a22aeb
commit 79b6edadd2
4 changed files with 285 additions and 346 deletions

View File

@ -8,14 +8,8 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
"github.com/prometheus/common/log"
) )
// AzureInstanceMetadata is the proxy for accessing the instance metadata service on an Azure VM
type AzureInstanceMetadata struct {
}
// VirtualMachineMetadata contains information about a VM from the metadata service // VirtualMachineMetadata contains information about a VM from the metadata service
type VirtualMachineMetadata struct { type VirtualMachineMetadata struct {
Raw string Raw string
@ -103,7 +97,7 @@ func (m *msiToken) NotBeforeTime() time.Time {
} }
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM // GetMsiToken retrieves a managed service identity token from the specified port on the local VM
func (s *AzureInstanceMetadata) getMsiToken(clientID string, resourceID string) (*msiToken, error) { func (a *AzureMonitor) getMsiToken(clientID string, resourceID string) (*msiToken, error) {
// Acquire an MSI token. Documented at: // Acquire an MSI token. Documented at:
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token // https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
// //
@ -137,16 +131,10 @@ func (s *AzureInstanceMetadata) getMsiToken(clientID string, resourceID string)
} }
req.Header.Add("Metadata", "true") req.Header.Add("Metadata", "true")
// Create the HTTP client and call the token service resp, err := a.client.Do(req)
client := http.Client{
Timeout: 15 * time.Second,
}
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Complete reading the body
defer resp.Body.Close() defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body) reply, err := ioutil.ReadAll(resp.Body)
@ -174,22 +162,17 @@ const (
) )
// GetInstanceMetadata retrieves metadata about the current Azure VM // GetInstanceMetadata retrieves metadata about the current Azure VM
func (s *AzureInstanceMetadata) GetInstanceMetadata() (*VirtualMachineMetadata, error) { func (a *AzureMonitor) GetInstanceMetadata() (*VirtualMachineMetadata, error) {
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil) req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
if err != nil { if err != nil {
log.Errorf("Error creating HTTP request") return nil, fmt.Errorf("Error creating HTTP request")
return nil, err
} }
req.Header.Set("Metadata", "true") req.Header.Set("Metadata", "true")
client := http.Client{
Timeout: 15 * time.Second,
}
resp, err := client.Do(req) resp, err := a.client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body) reply, err := ioutil.ReadAll(resp.Body)

View File

@ -8,7 +8,7 @@ import (
) )
func TestGetMetadata(t *testing.T) { func TestGetMetadata(t *testing.T) {
azureMetadata := &AzureInstanceMetadata{} azureMetadata := &AzureMonitor{}
metadata, err := azureMetadata.GetInstanceMetadata() metadata, err := azureMetadata.GetInstanceMetadata()
require.NoError(t, err) require.NoError(t, err)
@ -27,7 +27,7 @@ func TestGetMetadata(t *testing.T) {
} }
func TestGetTOKEN(t *testing.T) { func TestGetTOKEN(t *testing.T) {
azureMetadata := &AzureInstanceMetadata{} azureMetadata := &AzureMonitor{}
resourceID := "https://ingestion.monitor.azure.com/" resourceID := "https://ingestion.monitor.azure.com/"
token, err := azureMetadata.getMsiToken("", resourceID) token, err := azureMetadata.getMsiToken("", resourceID)

View File

@ -2,24 +2,30 @@ package azuremonitor
import ( import (
"bytes" "bytes"
"crypto/tls" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"hash/fnv"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"strconv" "regexp"
"time" "time"
"github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/azure"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
var _ telegraf.AggregatingOutput = (*AzureMonitor)(nil)
var _ telegraf.Output = (*AzureMonitor)(nil)
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service // AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
type AzureMonitor struct { type AzureMonitor struct {
useMsi bool `toml:"use_managed_service_identity"`
ResourceID string `toml:"resource_id"` ResourceID string `toml:"resource_id"`
Region string `toml:"region"` Region string `toml:"region"`
Timeout internal.Duration `toml:"Timeout"` Timeout internal.Duration `toml:"Timeout"`
@ -29,52 +35,28 @@ type AzureMonitor struct {
AzureClientSecret string `toml:"azure_client_secret"` AzureClientSecret string `toml:"azure_client_secret"`
StringAsDimension bool `toml:"string_as_dimension"` StringAsDimension bool `toml:"string_as_dimension"`
useMsi bool `toml:"use_managed_service_identity"` url string
metadataService *AzureInstanceMetadata
instanceMetadata *VirtualMachineMetadata
msiToken *msiToken msiToken *msiToken
msiResource string
bearerToken string
expiryWatermark time.Duration
oauthConfig *adal.OAuthConfig oauthConfig *adal.OAuthConfig
adalToken adal.OAuthTokenProvider adalToken adal.OAuthTokenProvider
client *http.Client client *http.Client
cache map[string]*azureMonitorMetric cache map[time.Time]map[uint64]*aggregate
period time.Duration
delay time.Duration
periodStart time.Time
periodEnd time.Time
metrics chan telegraf.Metric
shutdown chan struct{}
} }
type azureMonitorMetric struct { type aggregate struct {
Time time.Time `json:"time"` telegraf.Metric
Data *azureMonitorData `json:"data"` updated bool
} }
type azureMonitorData struct { const (
BaseData *azureMonitorBaseData `json:"baseData"` defaultRegion string = "eastus"
}
type azureMonitorBaseData struct { defaultMSIResource string = "https://monitoring.azure.com/"
Metric string `json:"metric"`
Namespace string `json:"namespace"`
DimensionNames []string `json:"dimNames"`
Series []*azureMonitorSeries `json:"series"`
}
type azureMonitorSeries struct { urlTemplate string = "https://%s.monitoring.azure.com%s/metrics"
DimensionValues []string `json:"dimValues"` )
Min float64 `json:"min"`
Max float64 `json:"max"`
Sum float64 `json:"sum"`
Count float64 `json:"count"`
}
var sampleConfig = ` var sampleConfig = `
## The resource ID against which metric will be logged. If not ## The resource ID against which metric will be logged. If not
@ -105,15 +87,24 @@ var sampleConfig = `
#azure_client_secret = "" #azure_client_secret = ""
` `
const ( // Description provides a description of the plugin
defaultRegion = "eastus" func (a *AzureMonitor) Description() string {
return "Configuration for sending aggregate metrics to Azure Monitor"
}
defaultMSIResource = "https://monitoring.azure.com/" // SampleConfig provides a sample configuration for the plugin
) func (a *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Connect initializes the plugin and validates connectivity // Connect initializes the plugin and validates connectivity
func (a *AzureMonitor) Connect() error { func (a *AzureMonitor) Connect() error {
// Set defaults a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}
// If no direct AD values provided, fall back to MSI // If no direct AD values provided, fall back to MSI
if a.AzureSubscriptionID == "" && a.AzureTenantID == "" && a.AzureClientID == "" && a.AzureClientSecret == "" { if a.AzureSubscriptionID == "" && a.AzureTenantID == "" && a.AzureClientID == "" && a.AzureClientSecret == "" {
@ -131,13 +122,8 @@ func (a *AzureMonitor) Connect() error {
a.oauthConfig = oauthConfig a.oauthConfig = oauthConfig
} }
a.metadataService = &AzureInstanceMetadata{}
// For the metrics API the MSI resource has to be https://ingestion.monitor.azure.com
a.msiResource = "https://monitoring.azure.com/"
// Validate the resource identifier // Validate the resource identifier
metadata, err := a.metadataService.GetInstanceMetadata() metadata, err := a.GetInstanceMetadata()
if err != nil { if err != nil {
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId") return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
} }
@ -147,45 +133,32 @@ func (a *AzureMonitor) Connect() error {
a.Region = metadata.Compute.Location a.Region = metadata.Compute.Location
} }
a.url := fmt.Sprintf(urlTemplate, a.Region, a.ResourceID)
// Validate credentials // Validate credentials
err = a.validateCredentials() err = a.validateCredentials()
if err != nil { if err != nil {
return err return err
} }
a.reset() a.Reset()
go a.run()
return nil return nil
} }
func (a *AzureMonitor) validateCredentials() error { func (a *AzureMonitor) validateCredentials() error {
// Use managed service identity
if a.useMsi { if a.useMsi {
// Check expiry on the token // Check expiry on the token
if a.msiToken != nil { if a.msiToken == nil || a.msiToken.ExpiresInDuration() < time.Minute {
expiryDuration := a.msiToken.ExpiresInDuration() msiToken, err := a.getMsiToken(a.AzureClientID, defaultMSIResource)
if expiryDuration > a.expiryWatermark {
return nil
}
// Token is about to expire
log.Printf("Bearer token expiring in %s; acquiring new token\n", expiryDuration.String())
a.msiToken = nil
}
// No token, acquire an MSI token
if a.msiToken == nil {
msiToken, err := a.metadataService.getMsiToken(a.AzureClientID, a.msiResource)
if err != nil { if err != nil {
return err return err
} }
log.Printf("Bearer token acquired; expiring in %s\n", msiToken.ExpiresInDuration().String())
a.msiToken = msiToken a.msiToken = msiToken
a.bearerToken = msiToken.AccessToken
} }
// Otherwise directory acquire a token return nil
} else { }
adToken, err := adal.NewServicePrincipalToken( adToken, err := adal.NewServicePrincipalToken(
*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret, *(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
azure.PublicCloud.ActiveDirectoryEndpoint) azure.PublicCloud.ActiveDirectoryEndpoint)
@ -193,204 +166,211 @@ func (a *AzureMonitor) validateCredentials() error {
return fmt.Errorf("Could not acquire ADAL token: %s", err) return fmt.Errorf("Could not acquire ADAL token: %s", err)
} }
a.adalToken = adToken a.adalToken = adToken
}
return nil return nil
} }
// Description provides a description of the plugin
func (a *AzureMonitor) Description() string {
return "Configuration for sending aggregate metrics to Azure Monitor"
}
// SampleConfig provides a sample configuration for the plugin
func (a *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Close shuts down an any active connections // Close shuts down an any active connections
func (a *AzureMonitor) Close() error { func (a *AzureMonitor) Close() error {
// Close connection to the URL here a.client = nil
close(a.shutdown)
return nil return nil
} }
type azureMonitorMetric struct {
Time time.Time `json:"time"`
Data *azureMonitorData `json:"data"`
}
type azureMonitorData struct {
BaseData *azureMonitorBaseData `json:"baseData"`
}
type azureMonitorBaseData struct {
Metric string `json:"metric"`
Namespace string `json:"namespace"`
DimensionNames []string `json:"dimNames"`
Series []*azureMonitorSeries `json:"series"`
}
type azureMonitorSeries struct {
DimensionValues []string `json:"dimValues"`
Min float64 `json:"min"`
Max float64 `json:"max"`
Sum float64 `json:"sum"`
Count int64 `json:"count"`
}
// Write writes metrics to the remote endpoint // Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error { func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
// Assemble basic stats on incoming metrics azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for _, metric := range metrics { for _, m := range metrics {
select { id := hashIDWithTagKeysOnly(m)
case a.metrics <- metric: if azm, ok := azmetrics[id]; !ok {
default: azmetrics[id] = translate(m)
log.Printf("metrics buffer is full") } else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
translate(m).Data.BaseData.Series...,
)
} }
} }
var body []byte
for _, m := range azmetrics {
// Azure Monitor accepts new batches of points in new-line delimited
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
jsonBytes, err := json.Marshal(&m)
if err != nil {
return err
}
body = append(body, jsonBytes...)
body = append(body, '\n')
}
if err := a.validateCredentials(); err != nil {
return fmt.Errorf("Error authenticating: %v", err)
}
req, err := http.NewRequest("POST", a.url, bytes.NewBuffer(body))
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+a.msiToken.AccessToken)
req.Header.Set("Content-Type", "application/x-ndjson")
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
return nil return nil
} }
func (a *AzureMonitor) run() { func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
// The start of the period is truncated to the nearest minute. h := fnv.New64a()
// h.Write([]byte(m.Name()))
// Every metric then gets it's timestamp checked and is dropped if it h.Write([]byte("\n"))
// is not within: for _, tag := range m.TagList() {
// h.Write([]byte(tag.Key))
// start < t < end + truncation + delay h.Write([]byte("\n"))
//
// So if we start at now = 00:00.2 with a 10s period and 0.3s delay:
// now = 00:00.2
// start = 00:00
// truncation = 00:00.2
// end = 00:10
// 1st interval: 00:00 - 00:10.5
// 2nd interval: 00:10 - 00:20.5
// etc.
//
now := time.Now()
a.periodStart = now.Truncate(time.Minute)
truncation := now.Sub(a.periodStart)
a.periodEnd = a.periodStart.Add(a.period)
time.Sleep(a.delay)
periodT := time.NewTicker(a.period)
defer periodT.Stop()
for {
select {
case <-a.shutdown:
if len(a.metrics) > 0 {
// wait until metrics are flushed before exiting
continue
}
return
case m := <-a.metrics:
if m.Time().Before(a.periodStart) ||
m.Time().After(a.periodEnd.Add(truncation).Add(a.delay)) {
// the metric is outside the current aggregation period, so
// skip it.
continue
}
a.add(m)
case <-periodT.C:
a.periodStart = a.periodEnd
a.periodEnd = a.periodStart.Add(a.period)
a.push()
a.reset()
}
} }
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n])
h.Write([]byte("\n"))
return h.Sum64()
} }
func (a *AzureMonitor) reset() { func translate(m telegraf.Metric) *azureMonitorMetric {
a.cache = make(map[string]*azureMonitorMetric)
}
func (a *AzureMonitor) add(metric telegraf.Metric) {
var dimensionNames []string var dimensionNames []string
var dimensionValues []string var dimensionValues []string
for i, tag := range metric.TagList() { for i, tag := range m.TagList() {
// Azure custom metrics service supports up to 10 dimensions // Azure custom metrics service supports up to 10 dimensions
if i > 10 { if i > 10 {
log.Printf("W! [outputs.azuremonitor] metric [%s] exceeds 10 dimensions", m.Name())
continue continue
} }
dimensionNames = append(dimensionNames, tag.Key) dimensionNames = append(dimensionNames, tag.Key)
dimensionValues = append(dimensionValues, tag.Value) dimensionValues = append(dimensionValues, tag.Value)
} }
// Azure Monitoe does not support string value types, so convert string min, _ := m.GetField("min")
// fields to dimensions if enabled. max, _ := m.GetField("max")
if a.StringAsDimension { sum, _ := m.GetField("sum")
for _, f := range metric.FieldList() { count, _ := m.GetField("count")
switch fv := f.Value.(type) { return &azureMonitorMetric{
case string: Time: m.Time(),
dimensionNames = append(dimensionNames, f.Key)
dimensionValues = append(dimensionValues, fv)
metric.RemoveField(f.Key)
}
}
}
for _, f := range metric.FieldList() {
name := metric.Name() + "_" + f.Key
fv, ok := convert(f.Value)
if !ok {
log.Printf("unable to convert field %s (type %T) to float type: %v", f.Key, fv, fv)
continue
}
if azm, ok := a.cache[name]; !ok {
// hit an uncached metric, create it for first time
a.cache[name] = &azureMonitorMetric{
Time: metric.Time(),
Data: &azureMonitorData{ Data: &azureMonitorData{
BaseData: &azureMonitorBaseData{ BaseData: &azureMonitorBaseData{
Metric: name, Metric: m.Name(),
Namespace: "default", Namespace: "default",
DimensionNames: dimensionNames, DimensionNames: dimensionNames,
Series: []*azureMonitorSeries{ Series: []*azureMonitorSeries{
newAzureMonitorSeries(dimensionValues, fv), &azureMonitorSeries{
DimensionValues: dimensionValues,
Min: min.(float64),
Max: max.(float64),
Sum: sum.(float64),
Count: count.(int64),
},
}, },
}, },
}, },
} }
} else { }
tmp, i, ok := azm.findSeries(dimensionValues)
// Add will append a metric to the output aggregate
func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only supports aggregates 30 minutes into the past
// and 4 minutes into the future. Future metrics are dropped when pushed.
t := m.Time()
tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
log.Printf("W! attempted to aggregate metric over 30 minutes old: %v, %v", t, tbucket)
return
}
// Azure Monitor doesn't have a string value type, so convert string
// fields to dimensions (a.k.a. tags) if enabled.
if a.StringAsDimension {
for fk, fv := range m.Fields() {
if v, ok := fv.(string); ok {
m.AddTag(fk, v)
}
}
}
for _, f := range m.FieldList() {
fv, ok := convert(f.Value)
if !ok { if !ok {
// add series new series (should be rare)
n := append(azm.Data.BaseData.Series, newAzureMonitorSeries(dimensionValues, fv))
a.cache[name].Data.BaseData.Series = n
continue continue
} }
//counter compute // Azure Monitor does not support fields so the field
n := tmp.Count + 1 // name is appended to the metric name.
tmp.Count = n name := m.Name() + "_" + sanitize(f.Key)
//max/min compute id := hashIDWithField(m.HashID(), f.Key)
if fv < tmp.Min {
tmp.Min = fv _, ok = a.cache[tbucket]
} else if fv > tmp.Max { if !ok {
tmp.Max = fv // Time bucket does not exist and needs to be created.
} a.cache[tbucket] = make(map[uint64]*aggregate)
//sum compute
tmp.Sum += fv
//store final data
a.cache[name].Data.BaseData.Series[i] = tmp
}
}
} }
func (m *azureMonitorMetric) findSeries(dv []string) (*azureMonitorSeries, int, bool) { nf := make(map[string]interface{}, 4)
if len(m.Data.BaseData.DimensionNames) != len(dv) { nf["min"] = fv
return nil, 0, false nf["max"] = fv
nf["sum"] = fv
nf["count"] = 1
// Fetch existing aggregate
agg, ok := a.cache[tbucket][id]
if ok {
aggfields := agg.Fields()
if fv > aggfields["min"].(float64) {
nf["min"] = aggfields["min"]
} }
for i := range m.Data.BaseData.Series { if fv < aggfields["max"].(float64) {
if m.Data.BaseData.Series[i].equal(dv) { nf["max"] = aggfields["max"]
return m.Data.BaseData.Series[i], i, true
} }
} nf["sum"] = fv + aggfields["sum"].(float64)
return nil, 0, false nf["count"] = aggfields["count"].(int64) + 1
} }
func newAzureMonitorSeries(dv []string, fv float64) *azureMonitorSeries { na, _ := metric.New(name, m.Tags(), nf, tbucket)
return &azureMonitorSeries{ a.cache[tbucket][id] = &aggregate{na, true}
DimensionValues: append([]string{}, dv...),
Min: fv,
Max: fv,
Sum: fv,
Count: 1,
} }
} }
func (s *azureMonitorSeries) equal(dv []string) bool {
if len(s.DimensionValues) != len(dv) {
return false
}
for i := range dv {
if dv[i] != s.DimensionValues[i] {
return false
}
}
return true
}
func convert(in interface{}) (float64, bool) { func convert(in interface{}) (float64, bool) {
switch v := in.(type) { switch v := in.(type) {
case int64: case int64:
@ -403,90 +383,70 @@ func convert(in interface{}) (float64, bool) {
if v { if v {
return 1, true return 1, true
} }
return 1, true return 0, true
case string:
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, false
}
return f, true
default: default:
return 0, false return 0, false
} }
} }
func (a *AzureMonitor) push() { var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
var body []byte
for _, metric := range a.cache { func sanitize(value string) string {
jsonBytes, err := json.Marshal(&metric) return invalidNameCharRE.ReplaceAllString(value, "_")
if err != nil {
log.Printf("Error marshalling metrics %s", err)
return
}
body = append(body, jsonBytes...)
body = append(body, '\n')
} }
_, err := a.postData(&body) func hashIDWithField(id uint64, fk string) uint64 {
if err != nil { h := fnv.New64a()
log.Printf("Error publishing aggregate metrics %s", err) b := make([]byte, binary.MaxVarintLen64)
} n := binary.PutUvarint(b, id)
return h.Write(b[:n])
h.Write([]byte("\n"))
h.Write([]byte(fk))
h.Write([]byte("\n"))
return h.Sum64()
} }
func (a *AzureMonitor) postData(msg *[]byte) (*http.Request, error) { // Push sends metrics to the output metric buffer
if err := a.validateCredentials(); err != nil { func (a *AzureMonitor) Push() []telegraf.Metric {
return nil, fmt.Errorf("Error authenticating: %v", err) var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(time.Now().Add(-time.Minute)) {
continue
}
for _, agg := range aggs {
// Only send aggregates that have had an update since
// the last push.
if !agg.updated {
continue
}
metrics = append(metrics, agg.Metric)
}
}
return metrics
} }
metricsEndpoint := fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics", // Reset clears the cache of aggregate metrics
a.Region, a.ResourceID) func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
req, err := http.NewRequest("POST", metricsEndpoint, bytes.NewBuffer(*msg)) // Remove aggregates older than 30 minutes
if err != nil { if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
log.Printf("Error creating HTTP request") delete(a.cache, tbucket)
return nil, err continue
} }
for id := range a.cache[tbucket] {
req.Header.Set("Authorization", "Bearer "+a.bearerToken) a.cache[tbucket][id].updated = false
req.Header.Set("Content-Type", "application/x-ndjson")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
} }
client := http.Client{
Transport: tr,
Timeout: a.Timeout.Duration,
} }
resp, err := client.Do(req)
if err != nil {
return req, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
var reply []byte
reply, err = ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return req, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
return req, nil
} }
func init() { func init() {
outputs.Add("azuremonitor", func() telegraf.Output { outputs.Add("azuremonitor", func() telegraf.Output {
return &AzureMonitor{ return &AzureMonitor{
StringAsDimension: true, StringAsDimension: false,
Timeout: internal.Duration{Duration: time.Second * 5}, Timeout: internal.Duration{Duration: time.Second * 5},
Region: defaultRegion, Region: defaultRegion,
period: time.Minute, cache: make(map[time.Time]map[uint64]*aggregate, 36),
delay: time.Second * 5,
metrics: make(chan telegraf.Metric, 100),
shutdown: make(chan struct{}),
} }
}) })
} }

View File

@ -1,14 +1,10 @@
package azuremonitor package azuremonitor
import ( import (
"encoding/json"
"testing"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
) )
// func TestDefaultConnectAndWrite(t *testing.T) { // func TestDefaultConnectAndWrite(t *testing.T) {
@ -60,34 +56,34 @@ func getTestMetric(value interface{}, name ...string) telegraf.Metric {
return pt return pt
} }
func TestPostData(t *testing.T) { // func TestPostData(t *testing.T) {
azmon := &AzureMonitor{ // azmon := &AzureMonitor{
Region: "eastus", // Region: "eastus",
} // }
err := azmon.Connect() // err := azmon.Connect()
metrics := getMockMetrics() // metrics := getMockMetrics()
t.Logf("mock metrics are %+v\n", metrics) // t.Logf("mock metrics are %+v\n", metrics)
// metricsList, err := azmon.add(&metrics[0]) // // metricsList, err := azmon.add(&metrics[0])
for _, m := range metrics { // for _, m := range metrics {
azmon.add(m) // azmon.Add(m)
} // }
jsonBytes, err := json.Marshal(azmon.cache) // jsonBytes, err := json.Marshal(azmon.cache)
t.Logf("json content is:\n----------\n%s\n----------\n", string(jsonBytes)) // t.Logf("json content is:\n----------\n%s\n----------\n", string(jsonBytes))
req, err := azmon.postData(&jsonBytes) // req, err := azmon.postData(&jsonBytes)
if err != nil { // if err != nil {
// t.Logf("Error publishing metrics %s", err) // // t.Logf("Error publishing metrics %s", err)
t.Logf("url is %+v\n", req.URL) // t.Logf("url is %+v\n", req.URL)
// t.Logf("failed request is %+v\n", req) // // t.Logf("failed request is %+v\n", req)
// raw, err := httputil.DumpRequestOut(req, true) // // raw, err := httputil.DumpRequestOut(req, true)
// if err != nil { // // if err != nil {
// t.Logf("Request detail is \n%s\n", string(raw)) // // t.Logf("Request detail is \n%s\n", string(raw))
// } else { // // } else {
// t.Logf("could not dump request: %s\n", err) // // t.Logf("could not dump request: %s\n", err)
// // }
// }
// require.NoError(t, err)
// } // }
}
require.NoError(t, err)
}