Compare commits

...

5 Commits

Author SHA1 Message Date
Gunnar Aasen 40c37aadc6 Refactor to use AggregatingOutput 2018-05-07 08:21:00 -07:00
Gunnar Aasen 93a579d7e4 Output: Azure Monitor: Cleanup and add README 2018-05-07 08:20:49 -07:00
Gunnar Aasen 5c4c3a1ca9 Output: Azure Monitor: Initial aggregated metric implementation 2018-05-02 09:29:58 -07:00
Mark Simms 02d86b1b6e Starting on azure monitor metrics integration with MSI auth 2018-05-02 09:29:58 -07:00
Daniel Nelson 4f61d2a09c
Add idea for an output that aggregates before adding to metric buffer 2018-05-01 16:47:16 -07:00
9 changed files with 877 additions and 0 deletions

4
Godeps
View File

@ -1,4 +1,7 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/Azure/go-autorest 9ad9326b278af8fa5cc67c30c0ce9a58cc0862b2
github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07
@ -18,6 +21,7 @@ github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a
github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438

View File

@ -113,6 +113,11 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
m, _ = metric.New(name, tags, fields, t) m, _ = metric.New(name, tags, fields, t)
} }
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
output.Add(m)
return
}
ro.metrics.Add(m) ro.metrics.Add(m)
if ro.metrics.Len() == ro.MetricBatchSize { if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize) batch := ro.metrics.Batch(ro.MetricBatchSize)
@ -125,6 +130,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
// Write writes all cached points to this output. // Write writes all cached points to this output.
func (ro *RunningOutput) Write() error { func (ro *RunningOutput) Write() error {
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
metrics := output.Push()
ro.metrics.Add(metrics...)
output.Reset()
}
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
ro.BufferSize.Set(int64(nFails + nMetrics)) ro.BufferSize.Set(int64(nFails + nMetrics))
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ", log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",

View File

@ -13,6 +13,12 @@ type Output interface {
Write(metrics []Metric) error Write(metrics []Metric) error
} }
type AggregatingOutput interface {
Add(in Metric)
Push() []Metric
Reset()
}
type ServiceOutput interface { type ServiceOutput interface {
// Connect to the Output // Connect to the Output
Connect() error Connect() error

View File

@ -3,6 +3,7 @@ package all
import ( import (
_ "github.com/influxdata/telegraf/plugins/outputs/amon" _ "github.com/influxdata/telegraf/plugins/outputs/amon"
_ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/azuremonitor"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/datadog"

View File

@ -0,0 +1,74 @@
## Azure Monitor Custom Metrics Output for Telegraf
This plugin will send custom metrics to Azure Monitor.
All metrics are written as summarized values: min, max, sum, count. The Telegraf field name is appended to the metric name. All Telegraf tags are set as the metric dimensions.
## Azure Authentication
This plugin can use one of several different types of credentials to authenticate
with the Azure Monitor Custom Metrics ingestion API endpoint. In the following
order the plugin will attempt to authenticate.
1. Managed Service Identity (MSI) token
- This is the prefered authentication method.
- Note: MSI is only available to ARM-based resources.
2. AAD Application Tokens (Service Principals)
- Primarily useful if Telegraf is writing metrics for other resources. [More information](https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects).
- A Service Principal or User Principal needs to be assigned the `Monitoring Contributor` roles.
3. AAD User Tokens (User Principals)
- Allows Telegraf to authenticate like a user. It is best to use this method for development.
## Config
For this output plugin to function correctly the following variables
must be configured.
* resourceId
* region
### region
The region is the Azure region that you wish to connect to.
Examples include but are not limited to:
* useast
* centralus
* westcentralus
* westeurope
* southeastasia
### resourceId
The resourceId used for Azure Monitor metrics.
### Configuration:
```
# Configuration for sending aggregate metrics to Azure Monitor
[[outputs.azuremonitor]]
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resourceId = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "useast"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#useManagedServiceIdentity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azureSubscription = ""
## Tenant ID
#azureTenant = ""
## Client ID
#azureClientId = ""
## Client secrete
#azureClientSecret = ""
```

View File

@ -0,0 +1,196 @@
package azuremonitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
)
// VirtualMachineMetadata contains information about a VM from the metadata service
type VirtualMachineMetadata struct {
Raw string
AzureResourceID string
Compute struct {
Location string `json:"location"`
Name string `json:"name"`
Offer string `json:"offer"`
OsType string `json:"osType"`
PlacementGroupID string `json:"placementGroupId"`
PlatformFaultDomain string `json:"platformFaultDomain"`
PlatformUpdateDomain string `json:"platformUpdateDomain"`
Publisher string `json:"publisher"`
ResourceGroupName string `json:"resourceGroupName"`
Sku string `json:"sku"`
SubscriptionID string `json:"subscriptionId"`
Tags string `json:"tags"`
Version string `json:"version"`
VMID string `json:"vmId"`
VMScaleSetName string `json:"vmScaleSetName"`
VMSize string `json:"vmSize"`
Zone string `json:"zone"`
} `json:"compute"`
Network struct {
Interface []struct {
Ipv4 struct {
IPAddress []struct {
PrivateIPAddress string `json:"privateIpAddress"`
PublicIPAddress string `json:"publicIpAddress"`
} `json:"ipAddress"`
Subnet []struct {
Address string `json:"address"`
Prefix string `json:"prefix"`
} `json:"subnet"`
} `json:"ipv4"`
Ipv6 struct {
IPAddress []interface{} `json:"ipAddress"`
} `json:"ipv6"`
MacAddress string `json:"macAddress"`
} `json:"interface"`
} `json:"network"`
}
// msiToken is the managed service identity token
type msiToken struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn string `json:"expires_in"`
ExpiresOn string `json:"expires_on"`
NotBefore string `json:"not_before"`
Resource string `json:"resource"`
TokenType string `json:"token_type"`
expiresAt time.Time
notBefore time.Time
raw string
}
func (m *msiToken) parseTimes() {
val, err := strconv.ParseInt(m.ExpiresOn, 10, 64)
if err == nil {
m.expiresAt = time.Unix(val, 0)
}
val, err = strconv.ParseInt(m.NotBefore, 10, 64)
if err == nil {
m.notBefore = time.Unix(val, 0)
}
}
// ExpiresAt is the time at which the token expires
func (m *msiToken) ExpiresAt() time.Time {
return m.expiresAt
}
// ExpiresInDuration returns the duration until the token expires
func (m *msiToken) ExpiresInDuration() time.Duration {
expiresDuration := m.expiresAt.Sub(time.Now().UTC())
return expiresDuration
}
// NotBeforeTime returns the time at which the token becomes valid
func (m *msiToken) NotBeforeTime() time.Time {
return m.notBefore
}
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM
func (a *AzureMonitor) getMsiToken(clientID string, resourceID string) (*msiToken, error) {
// Acquire an MSI token. Documented at:
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
//
//GET http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=712eac09-e943-418c-9be6-9fd5c91078bl HTTP/1.1 Metadata: true
// Create HTTP request for MSI token to access Azure Resource Manager
var msiEndpoint *url.URL
msiEndpoint, err := url.Parse(msiInstanceMetadataURL)
if err != nil {
return nil, err
}
// Resource ID defaults to https://management.azure.com
if resourceID == "" {
resourceID = "https://management.azure.com"
}
msiParameters := url.Values{}
msiParameters.Add("resource", resourceID)
msiParameters.Add("api-version", "2018-02-01")
// Client id is optional
if clientID != "" {
msiParameters.Add("client_id", clientID)
}
msiEndpoint.RawQuery = msiParameters.Encode()
req, err := http.NewRequest("GET", msiEndpoint.String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Metadata", "true")
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s, content: %s",
resp.StatusCode, resp.Status, reply)
}
var token msiToken
if err := json.Unmarshal(reply, &token); err != nil {
return nil, err
}
token.parseTimes()
token.raw = string(reply)
return &token, nil
}
const (
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
msiInstanceMetadataURL = "http://169.254.169.254/metadata/identity/oauth2/token"
)
// GetInstanceMetadata retrieves metadata about the current Azure VM
func (a *AzureMonitor) GetInstanceMetadata() (*VirtualMachineMetadata, error) {
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
if err != nil {
return nil, fmt.Errorf("Error creating HTTP request")
}
req.Header.Set("Metadata", "true")
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
var metadata VirtualMachineMetadata
if err := json.Unmarshal(reply, &metadata); err != nil {
return nil, err
}
metadata.AzureResourceID = fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s",
metadata.Compute.SubscriptionID, metadata.Compute.ResourceGroupName, metadata.Compute.Name)
return &metadata, nil
}

View File

@ -0,0 +1,44 @@
package azuremonitor
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestGetMetadata(t *testing.T) {
azureMetadata := &AzureMonitor{}
metadata, err := azureMetadata.GetInstanceMetadata()
require.NoError(t, err)
require.NotNil(t, metadata)
require.NotEmpty(t, metadata.AzureResourceID)
require.NotEmpty(t, metadata.Compute.Location)
// if err != nil {
// t.Logf("could not get metadata: %v\n", err)
// } else {
// t.Logf("resource id \n%s", metadata.AzureResourceID)
// t.Logf("metadata is \n%v", metadata)
// }
//fmt.Printf("metadata is \n%v", metadata)
}
func TestGetTOKEN(t *testing.T) {
azureMetadata := &AzureMonitor{}
resourceID := "https://ingestion.monitor.azure.com/"
token, err := azureMetadata.getMsiToken("", resourceID)
require.NoError(t, err)
require.NotEmpty(t, token.AccessToken)
require.EqualValues(t, token.Resource, resourceID)
t.Logf("token is %+v\n", token)
t.Logf("expiry time is %s\n", token.ExpiresAt().Format(time.RFC3339))
t.Logf("expiry duration is %s\n", token.ExpiresInDuration().String())
t.Logf("resource is %s\n", token.Resource)
}

View File

@ -0,0 +1,452 @@
package azuremonitor
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
"io/ioutil"
"log"
"net/http"
"regexp"
"time"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
)
var _ telegraf.AggregatingOutput = (*AzureMonitor)(nil)
var _ telegraf.Output = (*AzureMonitor)(nil)
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
type AzureMonitor struct {
useMsi bool `toml:"use_managed_service_identity"`
ResourceID string `toml:"resource_id"`
Region string `toml:"region"`
Timeout internal.Duration `toml:"Timeout"`
AzureSubscriptionID string `toml:"azure_subscription"`
AzureTenantID string `toml:"azure_tenant"`
AzureClientID string `toml:"azure_client_id"`
AzureClientSecret string `toml:"azure_client_secret"`
StringAsDimension bool `toml:"string_as_dimension"`
url string
msiToken *msiToken
oauthConfig *adal.OAuthConfig
adalToken adal.OAuthTokenProvider
client *http.Client
cache map[time.Time]map[uint64]*aggregate
}
type aggregate struct {
telegraf.Metric
updated bool
}
const (
defaultRegion string = "eastus"
defaultMSIResource string = "https://monitoring.azure.com/"
urlTemplate string = "https://%s.monitoring.azure.com%s/metrics"
)
var sampleConfig = `
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resource_id = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "useast"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#use_managed_service_identity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azure_subscription = ""
## Tenant ID
#azure_tenant = ""
## Client ID
#azure_client_id = ""
## Client secrete
#azure_client_secret = ""
`
// Description provides a description of the plugin
func (a *AzureMonitor) Description() string {
return "Configuration for sending aggregate metrics to Azure Monitor"
}
// SampleConfig provides a sample configuration for the plugin
func (a *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Connect initializes the plugin and validates connectivity
func (a *AzureMonitor) Connect() error {
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}
// If no direct AD values provided, fall back to MSI
if a.AzureSubscriptionID == "" && a.AzureTenantID == "" && a.AzureClientID == "" && a.AzureClientSecret == "" {
a.useMsi = true
} else if a.AzureSubscriptionID == "" || a.AzureTenantID == "" || a.AzureClientID == "" || a.AzureClientSecret == "" {
return fmt.Errorf("Must provide values for azureSubscription, azureTenant, azureClient and azureClientSecret, or leave all blank to default to MSI")
}
if !a.useMsi {
// If using direct AD authentication create the AD access client
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, a.AzureTenantID)
if err != nil {
return fmt.Errorf("Could not initialize AD client: %s", err)
}
a.oauthConfig = oauthConfig
}
// Validate the resource identifier
metadata, err := a.GetInstanceMetadata()
if err != nil {
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
}
a.ResourceID = metadata.AzureResourceID
if a.Region == "" {
a.Region = metadata.Compute.Location
}
a.url := fmt.Sprintf(urlTemplate, a.Region, a.ResourceID)
// Validate credentials
err = a.validateCredentials()
if err != nil {
return err
}
a.Reset()
return nil
}
func (a *AzureMonitor) validateCredentials() error {
if a.useMsi {
// Check expiry on the token
if a.msiToken == nil || a.msiToken.ExpiresInDuration() < time.Minute {
msiToken, err := a.getMsiToken(a.AzureClientID, defaultMSIResource)
if err != nil {
return err
}
a.msiToken = msiToken
}
return nil
}
adToken, err := adal.NewServicePrincipalToken(
*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
azure.PublicCloud.ActiveDirectoryEndpoint)
if err != nil {
return fmt.Errorf("Could not acquire ADAL token: %s", err)
}
a.adalToken = adToken
return nil
}
// Close shuts down an any active connections
func (a *AzureMonitor) Close() error {
a.client = nil
return nil
}
type azureMonitorMetric struct {
Time time.Time `json:"time"`
Data *azureMonitorData `json:"data"`
}
type azureMonitorData struct {
BaseData *azureMonitorBaseData `json:"baseData"`
}
type azureMonitorBaseData struct {
Metric string `json:"metric"`
Namespace string `json:"namespace"`
DimensionNames []string `json:"dimNames"`
Series []*azureMonitorSeries `json:"series"`
}
type azureMonitorSeries struct {
DimensionValues []string `json:"dimValues"`
Min float64 `json:"min"`
Max float64 `json:"max"`
Sum float64 `json:"sum"`
Count int64 `json:"count"`
}
// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for _, m := range metrics {
id := hashIDWithTagKeysOnly(m)
if azm, ok := azmetrics[id]; !ok {
azmetrics[id] = translate(m)
} else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
translate(m).Data.BaseData.Series...,
)
}
}
var body []byte
for _, m := range azmetrics {
// Azure Monitor accepts new batches of points in new-line delimited
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
jsonBytes, err := json.Marshal(&m)
if err != nil {
return err
}
body = append(body, jsonBytes...)
body = append(body, '\n')
}
if err := a.validateCredentials(); err != nil {
return fmt.Errorf("Error authenticating: %v", err)
}
req, err := http.NewRequest("POST", a.url, bytes.NewBuffer(body))
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+a.msiToken.AccessToken)
req.Header.Set("Content-Type", "application/x-ndjson")
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
return nil
}
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
h := fnv.New64a()
h.Write([]byte(m.Name()))
h.Write([]byte("\n"))
for _, tag := range m.TagList() {
h.Write([]byte(tag.Key))
h.Write([]byte("\n"))
}
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n])
h.Write([]byte("\n"))
return h.Sum64()
}
func translate(m telegraf.Metric) *azureMonitorMetric {
var dimensionNames []string
var dimensionValues []string
for i, tag := range m.TagList() {
// Azure custom metrics service supports up to 10 dimensions
if i > 10 {
log.Printf("W! [outputs.azuremonitor] metric [%s] exceeds 10 dimensions", m.Name())
continue
}
dimensionNames = append(dimensionNames, tag.Key)
dimensionValues = append(dimensionValues, tag.Value)
}
min, _ := m.GetField("min")
max, _ := m.GetField("max")
sum, _ := m.GetField("sum")
count, _ := m.GetField("count")
return &azureMonitorMetric{
Time: m.Time(),
Data: &azureMonitorData{
BaseData: &azureMonitorBaseData{
Metric: m.Name(),
Namespace: "default",
DimensionNames: dimensionNames,
Series: []*azureMonitorSeries{
&azureMonitorSeries{
DimensionValues: dimensionValues,
Min: min.(float64),
Max: max.(float64),
Sum: sum.(float64),
Count: count.(int64),
},
},
},
},
}
}
// Add will append a metric to the output aggregate
func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only supports aggregates 30 minutes into the past
// and 4 minutes into the future. Future metrics are dropped when pushed.
t := m.Time()
tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
log.Printf("W! attempted to aggregate metric over 30 minutes old: %v, %v", t, tbucket)
return
}
// Azure Monitor doesn't have a string value type, so convert string
// fields to dimensions (a.k.a. tags) if enabled.
if a.StringAsDimension {
for fk, fv := range m.Fields() {
if v, ok := fv.(string); ok {
m.AddTag(fk, v)
}
}
}
for _, f := range m.FieldList() {
fv, ok := convert(f.Value)
if !ok {
continue
}
// Azure Monitor does not support fields so the field
// name is appended to the metric name.
name := m.Name() + "_" + sanitize(f.Key)
id := hashIDWithField(m.HashID(), f.Key)
_, ok = a.cache[tbucket]
if !ok {
// Time bucket does not exist and needs to be created.
a.cache[tbucket] = make(map[uint64]*aggregate)
}
nf := make(map[string]interface{}, 4)
nf["min"] = fv
nf["max"] = fv
nf["sum"] = fv
nf["count"] = 1
// Fetch existing aggregate
agg, ok := a.cache[tbucket][id]
if ok {
aggfields := agg.Fields()
if fv > aggfields["min"].(float64) {
nf["min"] = aggfields["min"]
}
if fv < aggfields["max"].(float64) {
nf["max"] = aggfields["max"]
}
nf["sum"] = fv + aggfields["sum"].(float64)
nf["count"] = aggfields["count"].(int64) + 1
}
na, _ := metric.New(name, m.Tags(), nf, tbucket)
a.cache[tbucket][id] = &aggregate{na, true}
}
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case int64:
return float64(v), true
case uint64:
return float64(v), true
case float64:
return v, true
case bool:
if v {
return 1, true
}
return 0, true
default:
return 0, false
}
}
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func hashIDWithField(id uint64, fk string) uint64 {
h := fnv.New64a()
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, id)
h.Write(b[:n])
h.Write([]byte("\n"))
h.Write([]byte(fk))
h.Write([]byte("\n"))
return h.Sum64()
}
// Push sends metrics to the output metric buffer
func (a *AzureMonitor) Push() []telegraf.Metric {
var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(time.Now().Add(-time.Minute)) {
continue
}
for _, agg := range aggs {
// Only send aggregates that have had an update since
// the last push.
if !agg.updated {
continue
}
metrics = append(metrics, agg.Metric)
}
}
return metrics
}
// Reset clears the cache of aggregate metrics
func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
// Remove aggregates older than 30 minutes
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
delete(a.cache, tbucket)
continue
}
for id := range a.cache[tbucket] {
a.cache[tbucket][id].updated = false
}
}
}
func init() {
outputs.Add("azuremonitor", func() telegraf.Output {
return &AzureMonitor{
StringAsDimension: false,
Timeout: internal.Duration{Duration: time.Second * 5},
Region: defaultRegion,
cache: make(map[time.Time]map[uint64]*aggregate, 36),
}
})
}

View File

@ -0,0 +1,89 @@
package azuremonitor
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// func TestDefaultConnectAndWrite(t *testing.T) {
// if testing.Short() {
// t.Skip("Skipping integration test in short mode")
// }
// // Test with all defaults (MSI+IMS)
// azmon := &AzureMonitor{}
// // Verify that we can connect to Log Analytics
// err := azmon.Connect()
// require.NoError(t, err)
// // Verify that we can write a metric to Log Analytics
// err = azmon.Write(testutil.MockMetrics())
// require.NoError(t, err)
// }
// MockMetrics returns a mock []telegraf.Metric object for using in unit tests
// of telegraf output sinks.
func getMockMetrics() []telegraf.Metric {
metrics := make([]telegraf.Metric, 0)
// Create a new point batch
metrics = append(metrics, getTestMetric(1.0))
return metrics
}
// TestMetric Returns a simple test point:
// measurement -> "test1" or name
// tags -> "tag1":"value1"
// value -> value
// time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
func getTestMetric(value interface{}, name ...string) telegraf.Metric {
if value == nil {
panic("Cannot use a nil value")
}
measurement := "test1"
if len(name) > 0 {
measurement = name[0]
}
tags := map[string]string{"tag1": "value1"}
pt, _ := metric.New(
measurement,
tags,
map[string]interface{}{"value": value},
time.Now().UTC(),
)
return pt
}
// func TestPostData(t *testing.T) {
// azmon := &AzureMonitor{
// Region: "eastus",
// }
// err := azmon.Connect()
// metrics := getMockMetrics()
// t.Logf("mock metrics are %+v\n", metrics)
// // metricsList, err := azmon.add(&metrics[0])
// for _, m := range metrics {
// azmon.Add(m)
// }
// jsonBytes, err := json.Marshal(azmon.cache)
// t.Logf("json content is:\n----------\n%s\n----------\n", string(jsonBytes))
// req, err := azmon.postData(&jsonBytes)
// if err != nil {
// // t.Logf("Error publishing metrics %s", err)
// t.Logf("url is %+v\n", req.URL)
// // t.Logf("failed request is %+v\n", req)
// // raw, err := httputil.DumpRequestOut(req, true)
// // if err != nil {
// // t.Logf("Request detail is \n%s\n", string(raw))
// // } else {
// // t.Logf("could not dump request: %s\n", err)
// // }
// }
// require.NoError(t, err)
// }