Compare commits
5 Commits
master
...
ga-azure-m
Author | SHA1 | Date |
---|---|---|
Gunnar Aasen | 40c37aadc6 | |
Gunnar Aasen | 93a579d7e4 | |
Gunnar Aasen | 5c4c3a1ca9 | |
Mark Simms | 02d86b1b6e | |
Daniel Nelson | 4f61d2a09c |
4
Godeps
4
Godeps
|
@ -1,4 +1,7 @@
|
||||||
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
|
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
|
||||||
|
github.com/Azure/go-autorest 9ad9326b278af8fa5cc67c30c0ce9a58cc0862b2
|
||||||
|
github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65
|
||||||
|
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
|
||||||
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
|
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
|
||||||
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
|
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
|
||||||
github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07
|
github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07
|
||||||
|
@ -18,6 +21,7 @@ github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
|
||||||
github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
|
github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
|
||||||
github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
|
github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
|
||||||
github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
|
github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
|
||||||
|
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
|
||||||
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
|
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
|
||||||
github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a
|
github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a
|
||||||
github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438
|
github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438
|
||||||
|
|
|
@ -113,6 +113,11 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
m, _ = metric.New(name, tags, fields, t)
|
m, _ = metric.New(name, tags, fields, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
output.Add(m)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ro.metrics.Add(m)
|
ro.metrics.Add(m)
|
||||||
if ro.metrics.Len() == ro.MetricBatchSize {
|
if ro.metrics.Len() == ro.MetricBatchSize {
|
||||||
batch := ro.metrics.Batch(ro.MetricBatchSize)
|
batch := ro.metrics.Batch(ro.MetricBatchSize)
|
||||||
|
@ -125,6 +130,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
|
|
||||||
// Write writes all cached points to this output.
|
// Write writes all cached points to this output.
|
||||||
func (ro *RunningOutput) Write() error {
|
func (ro *RunningOutput) Write() error {
|
||||||
|
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
|
||||||
|
metrics := output.Push()
|
||||||
|
ro.metrics.Add(metrics...)
|
||||||
|
output.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
|
||||||
ro.BufferSize.Set(int64(nFails + nMetrics))
|
ro.BufferSize.Set(int64(nFails + nMetrics))
|
||||||
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",
|
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",
|
||||||
|
|
|
@ -13,6 +13,12 @@ type Output interface {
|
||||||
Write(metrics []Metric) error
|
Write(metrics []Metric) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AggregatingOutput interface {
|
||||||
|
Add(in Metric)
|
||||||
|
Push() []Metric
|
||||||
|
Reset()
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceOutput interface {
|
type ServiceOutput interface {
|
||||||
// Connect to the Output
|
// Connect to the Output
|
||||||
Connect() error
|
Connect() error
|
||||||
|
|
|
@ -3,6 +3,7 @@ package all
|
||||||
import (
|
import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/outputs/azuremonitor"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
|
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
## Azure Monitor Custom Metrics Output for Telegraf
|
||||||
|
|
||||||
|
This plugin will send custom metrics to Azure Monitor.
|
||||||
|
|
||||||
|
All metrics are written as summarized values: min, max, sum, count. The Telegraf field name is appended to the metric name. All Telegraf tags are set as the metric dimensions.
|
||||||
|
|
||||||
|
## Azure Authentication
|
||||||
|
|
||||||
|
This plugin can use one of several different types of credentials to authenticate
|
||||||
|
with the Azure Monitor Custom Metrics ingestion API endpoint. In the following
|
||||||
|
order the plugin will attempt to authenticate.
|
||||||
|
1. Managed Service Identity (MSI) token
|
||||||
|
- This is the prefered authentication method.
|
||||||
|
- Note: MSI is only available to ARM-based resources.
|
||||||
|
2. AAD Application Tokens (Service Principals)
|
||||||
|
- Primarily useful if Telegraf is writing metrics for other resources. [More information](https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects).
|
||||||
|
- A Service Principal or User Principal needs to be assigned the `Monitoring Contributor` roles.
|
||||||
|
3. AAD User Tokens (User Principals)
|
||||||
|
- Allows Telegraf to authenticate like a user. It is best to use this method for development.
|
||||||
|
|
||||||
|
## Config
|
||||||
|
|
||||||
|
For this output plugin to function correctly the following variables
|
||||||
|
must be configured.
|
||||||
|
|
||||||
|
* resourceId
|
||||||
|
* region
|
||||||
|
|
||||||
|
### region
|
||||||
|
|
||||||
|
The region is the Azure region that you wish to connect to.
|
||||||
|
Examples include but are not limited to:
|
||||||
|
* useast
|
||||||
|
* centralus
|
||||||
|
* westcentralus
|
||||||
|
* westeurope
|
||||||
|
* southeastasia
|
||||||
|
|
||||||
|
### resourceId
|
||||||
|
|
||||||
|
The resourceId used for Azure Monitor metrics.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```
|
||||||
|
# Configuration for sending aggregate metrics to Azure Monitor
|
||||||
|
[[outputs.azuremonitor]]
|
||||||
|
## The resource ID against which metric will be logged. If not
|
||||||
|
## specified, the plugin will attempt to retrieve the resource ID
|
||||||
|
## of the VM via the instance metadata service (optional if running
|
||||||
|
## on an Azure VM with MSI)
|
||||||
|
#resourceId = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
|
||||||
|
## Azure region to publish metrics against. Defaults to eastus.
|
||||||
|
## Leave blank to automatically query the region via MSI.
|
||||||
|
#region = "useast"
|
||||||
|
|
||||||
|
## Write HTTP timeout, formatted as a string. If not provided, will default
|
||||||
|
## to 5s. 0s means no timeout (not recommended).
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Whether or not to use managed service identity.
|
||||||
|
#useManagedServiceIdentity = true
|
||||||
|
|
||||||
|
## Fill in the following values if using Active Directory Service
|
||||||
|
## Principal or User Principal for authentication.
|
||||||
|
## Subscription ID
|
||||||
|
#azureSubscription = ""
|
||||||
|
## Tenant ID
|
||||||
|
#azureTenant = ""
|
||||||
|
## Client ID
|
||||||
|
#azureClientId = ""
|
||||||
|
## Client secrete
|
||||||
|
#azureClientSecret = ""
|
||||||
|
```
|
|
@ -0,0 +1,196 @@
|
||||||
|
package azuremonitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VirtualMachineMetadata contains information about a VM from the metadata service
|
||||||
|
type VirtualMachineMetadata struct {
|
||||||
|
Raw string
|
||||||
|
AzureResourceID string
|
||||||
|
Compute struct {
|
||||||
|
Location string `json:"location"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Offer string `json:"offer"`
|
||||||
|
OsType string `json:"osType"`
|
||||||
|
PlacementGroupID string `json:"placementGroupId"`
|
||||||
|
PlatformFaultDomain string `json:"platformFaultDomain"`
|
||||||
|
PlatformUpdateDomain string `json:"platformUpdateDomain"`
|
||||||
|
Publisher string `json:"publisher"`
|
||||||
|
ResourceGroupName string `json:"resourceGroupName"`
|
||||||
|
Sku string `json:"sku"`
|
||||||
|
SubscriptionID string `json:"subscriptionId"`
|
||||||
|
Tags string `json:"tags"`
|
||||||
|
Version string `json:"version"`
|
||||||
|
VMID string `json:"vmId"`
|
||||||
|
VMScaleSetName string `json:"vmScaleSetName"`
|
||||||
|
VMSize string `json:"vmSize"`
|
||||||
|
Zone string `json:"zone"`
|
||||||
|
} `json:"compute"`
|
||||||
|
Network struct {
|
||||||
|
Interface []struct {
|
||||||
|
Ipv4 struct {
|
||||||
|
IPAddress []struct {
|
||||||
|
PrivateIPAddress string `json:"privateIpAddress"`
|
||||||
|
PublicIPAddress string `json:"publicIpAddress"`
|
||||||
|
} `json:"ipAddress"`
|
||||||
|
Subnet []struct {
|
||||||
|
Address string `json:"address"`
|
||||||
|
Prefix string `json:"prefix"`
|
||||||
|
} `json:"subnet"`
|
||||||
|
} `json:"ipv4"`
|
||||||
|
Ipv6 struct {
|
||||||
|
IPAddress []interface{} `json:"ipAddress"`
|
||||||
|
} `json:"ipv6"`
|
||||||
|
MacAddress string `json:"macAddress"`
|
||||||
|
} `json:"interface"`
|
||||||
|
} `json:"network"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// msiToken is the managed service identity token
|
||||||
|
type msiToken struct {
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
RefreshToken string `json:"refresh_token"`
|
||||||
|
ExpiresIn string `json:"expires_in"`
|
||||||
|
ExpiresOn string `json:"expires_on"`
|
||||||
|
NotBefore string `json:"not_before"`
|
||||||
|
Resource string `json:"resource"`
|
||||||
|
TokenType string `json:"token_type"`
|
||||||
|
|
||||||
|
expiresAt time.Time
|
||||||
|
notBefore time.Time
|
||||||
|
raw string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msiToken) parseTimes() {
|
||||||
|
val, err := strconv.ParseInt(m.ExpiresOn, 10, 64)
|
||||||
|
if err == nil {
|
||||||
|
m.expiresAt = time.Unix(val, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err = strconv.ParseInt(m.NotBefore, 10, 64)
|
||||||
|
if err == nil {
|
||||||
|
m.notBefore = time.Unix(val, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExpiresAt is the time at which the token expires
|
||||||
|
func (m *msiToken) ExpiresAt() time.Time {
|
||||||
|
return m.expiresAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExpiresInDuration returns the duration until the token expires
|
||||||
|
func (m *msiToken) ExpiresInDuration() time.Duration {
|
||||||
|
expiresDuration := m.expiresAt.Sub(time.Now().UTC())
|
||||||
|
return expiresDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotBeforeTime returns the time at which the token becomes valid
|
||||||
|
func (m *msiToken) NotBeforeTime() time.Time {
|
||||||
|
return m.notBefore
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM
|
||||||
|
func (a *AzureMonitor) getMsiToken(clientID string, resourceID string) (*msiToken, error) {
|
||||||
|
// Acquire an MSI token. Documented at:
|
||||||
|
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
|
||||||
|
//
|
||||||
|
//GET http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=712eac09-e943-418c-9be6-9fd5c91078bl HTTP/1.1 Metadata: true
|
||||||
|
|
||||||
|
// Create HTTP request for MSI token to access Azure Resource Manager
|
||||||
|
var msiEndpoint *url.URL
|
||||||
|
msiEndpoint, err := url.Parse(msiInstanceMetadataURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resource ID defaults to https://management.azure.com
|
||||||
|
if resourceID == "" {
|
||||||
|
resourceID = "https://management.azure.com"
|
||||||
|
}
|
||||||
|
|
||||||
|
msiParameters := url.Values{}
|
||||||
|
msiParameters.Add("resource", resourceID)
|
||||||
|
msiParameters.Add("api-version", "2018-02-01")
|
||||||
|
|
||||||
|
// Client id is optional
|
||||||
|
if clientID != "" {
|
||||||
|
msiParameters.Add("client_id", clientID)
|
||||||
|
}
|
||||||
|
|
||||||
|
msiEndpoint.RawQuery = msiParameters.Encode()
|
||||||
|
req, err := http.NewRequest("GET", msiEndpoint.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Metadata", "true")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
reply, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
|
||||||
|
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s, content: %s",
|
||||||
|
resp.StatusCode, resp.Status, reply)
|
||||||
|
}
|
||||||
|
|
||||||
|
var token msiToken
|
||||||
|
if err := json.Unmarshal(reply, &token); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
token.parseTimes()
|
||||||
|
token.raw = string(reply)
|
||||||
|
return &token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
|
||||||
|
msiInstanceMetadataURL = "http://169.254.169.254/metadata/identity/oauth2/token"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetInstanceMetadata retrieves metadata about the current Azure VM
|
||||||
|
func (a *AzureMonitor) GetInstanceMetadata() (*VirtualMachineMetadata, error) {
|
||||||
|
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error creating HTTP request")
|
||||||
|
}
|
||||||
|
req.Header.Set("Metadata", "true")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
reply, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
|
||||||
|
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
|
||||||
|
resp.StatusCode, resp.Status, reply)
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata VirtualMachineMetadata
|
||||||
|
if err := json.Unmarshal(reply, &metadata); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
metadata.AzureResourceID = fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s",
|
||||||
|
metadata.Compute.SubscriptionID, metadata.Compute.ResourceGroupName, metadata.Compute.Name)
|
||||||
|
|
||||||
|
return &metadata, nil
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
package azuremonitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetMetadata(t *testing.T) {
|
||||||
|
azureMetadata := &AzureMonitor{}
|
||||||
|
metadata, err := azureMetadata.GetInstanceMetadata()
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, metadata)
|
||||||
|
require.NotEmpty(t, metadata.AzureResourceID)
|
||||||
|
require.NotEmpty(t, metadata.Compute.Location)
|
||||||
|
|
||||||
|
// if err != nil {
|
||||||
|
// t.Logf("could not get metadata: %v\n", err)
|
||||||
|
// } else {
|
||||||
|
// t.Logf("resource id \n%s", metadata.AzureResourceID)
|
||||||
|
// t.Logf("metadata is \n%v", metadata)
|
||||||
|
// }
|
||||||
|
|
||||||
|
//fmt.Printf("metadata is \n%v", metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetTOKEN(t *testing.T) {
|
||||||
|
azureMetadata := &AzureMonitor{}
|
||||||
|
|
||||||
|
resourceID := "https://ingestion.monitor.azure.com/"
|
||||||
|
token, err := azureMetadata.getMsiToken("", resourceID)
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, token.AccessToken)
|
||||||
|
require.EqualValues(t, token.Resource, resourceID)
|
||||||
|
|
||||||
|
t.Logf("token is %+v\n", token)
|
||||||
|
t.Logf("expiry time is %s\n", token.ExpiresAt().Format(time.RFC3339))
|
||||||
|
t.Logf("expiry duration is %s\n", token.ExpiresInDuration().String())
|
||||||
|
t.Logf("resource is %s\n", token.Resource)
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,452 @@
|
||||||
|
package azuremonitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Azure/go-autorest/autorest/adal"
|
||||||
|
"github.com/Azure/go-autorest/autorest/azure"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ telegraf.AggregatingOutput = (*AzureMonitor)(nil)
|
||||||
|
var _ telegraf.Output = (*AzureMonitor)(nil)
|
||||||
|
|
||||||
|
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
|
||||||
|
type AzureMonitor struct {
|
||||||
|
useMsi bool `toml:"use_managed_service_identity"`
|
||||||
|
ResourceID string `toml:"resource_id"`
|
||||||
|
Region string `toml:"region"`
|
||||||
|
Timeout internal.Duration `toml:"Timeout"`
|
||||||
|
AzureSubscriptionID string `toml:"azure_subscription"`
|
||||||
|
AzureTenantID string `toml:"azure_tenant"`
|
||||||
|
AzureClientID string `toml:"azure_client_id"`
|
||||||
|
AzureClientSecret string `toml:"azure_client_secret"`
|
||||||
|
StringAsDimension bool `toml:"string_as_dimension"`
|
||||||
|
|
||||||
|
url string
|
||||||
|
msiToken *msiToken
|
||||||
|
oauthConfig *adal.OAuthConfig
|
||||||
|
adalToken adal.OAuthTokenProvider
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
|
||||||
|
cache map[time.Time]map[uint64]*aggregate
|
||||||
|
}
|
||||||
|
|
||||||
|
type aggregate struct {
|
||||||
|
telegraf.Metric
|
||||||
|
updated bool
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultRegion string = "eastus"
|
||||||
|
|
||||||
|
defaultMSIResource string = "https://monitoring.azure.com/"
|
||||||
|
|
||||||
|
urlTemplate string = "https://%s.monitoring.azure.com%s/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## The resource ID against which metric will be logged. If not
|
||||||
|
## specified, the plugin will attempt to retrieve the resource ID
|
||||||
|
## of the VM via the instance metadata service (optional if running
|
||||||
|
## on an Azure VM with MSI)
|
||||||
|
#resource_id = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
|
||||||
|
## Azure region to publish metrics against. Defaults to eastus.
|
||||||
|
## Leave blank to automatically query the region via MSI.
|
||||||
|
#region = "useast"
|
||||||
|
|
||||||
|
## Write HTTP timeout, formatted as a string. If not provided, will default
|
||||||
|
## to 5s. 0s means no timeout (not recommended).
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Whether or not to use managed service identity.
|
||||||
|
#use_managed_service_identity = true
|
||||||
|
|
||||||
|
## Fill in the following values if using Active Directory Service
|
||||||
|
## Principal or User Principal for authentication.
|
||||||
|
## Subscription ID
|
||||||
|
#azure_subscription = ""
|
||||||
|
## Tenant ID
|
||||||
|
#azure_tenant = ""
|
||||||
|
## Client ID
|
||||||
|
#azure_client_id = ""
|
||||||
|
## Client secrete
|
||||||
|
#azure_client_secret = ""
|
||||||
|
`
|
||||||
|
|
||||||
|
// Description provides a description of the plugin
|
||||||
|
func (a *AzureMonitor) Description() string {
|
||||||
|
return "Configuration for sending aggregate metrics to Azure Monitor"
|
||||||
|
}
|
||||||
|
|
||||||
|
// SampleConfig provides a sample configuration for the plugin
|
||||||
|
func (a *AzureMonitor) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect initializes the plugin and validates connectivity
|
||||||
|
func (a *AzureMonitor) Connect() error {
|
||||||
|
a.client = &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
Timeout: a.Timeout.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no direct AD values provided, fall back to MSI
|
||||||
|
if a.AzureSubscriptionID == "" && a.AzureTenantID == "" && a.AzureClientID == "" && a.AzureClientSecret == "" {
|
||||||
|
a.useMsi = true
|
||||||
|
} else if a.AzureSubscriptionID == "" || a.AzureTenantID == "" || a.AzureClientID == "" || a.AzureClientSecret == "" {
|
||||||
|
return fmt.Errorf("Must provide values for azureSubscription, azureTenant, azureClient and azureClientSecret, or leave all blank to default to MSI")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !a.useMsi {
|
||||||
|
// If using direct AD authentication create the AD access client
|
||||||
|
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, a.AzureTenantID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not initialize AD client: %s", err)
|
||||||
|
}
|
||||||
|
a.oauthConfig = oauthConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the resource identifier
|
||||||
|
metadata, err := a.GetInstanceMetadata()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
|
||||||
|
}
|
||||||
|
a.ResourceID = metadata.AzureResourceID
|
||||||
|
|
||||||
|
if a.Region == "" {
|
||||||
|
a.Region = metadata.Compute.Location
|
||||||
|
}
|
||||||
|
|
||||||
|
a.url := fmt.Sprintf(urlTemplate, a.Region, a.ResourceID)
|
||||||
|
|
||||||
|
// Validate credentials
|
||||||
|
err = a.validateCredentials()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Reset()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AzureMonitor) validateCredentials() error {
|
||||||
|
if a.useMsi {
|
||||||
|
// Check expiry on the token
|
||||||
|
if a.msiToken == nil || a.msiToken.ExpiresInDuration() < time.Minute {
|
||||||
|
msiToken, err := a.getMsiToken(a.AzureClientID, defaultMSIResource)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.msiToken = msiToken
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
adToken, err := adal.NewServicePrincipalToken(
|
||||||
|
*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
|
||||||
|
azure.PublicCloud.ActiveDirectoryEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not acquire ADAL token: %s", err)
|
||||||
|
}
|
||||||
|
a.adalToken = adToken
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down an any active connections
|
||||||
|
func (a *AzureMonitor) Close() error {
|
||||||
|
a.client = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorMetric struct {
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Data *azureMonitorData `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorData struct {
|
||||||
|
BaseData *azureMonitorBaseData `json:"baseData"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorBaseData struct {
|
||||||
|
Metric string `json:"metric"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
DimensionNames []string `json:"dimNames"`
|
||||||
|
Series []*azureMonitorSeries `json:"series"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorSeries struct {
|
||||||
|
DimensionValues []string `json:"dimValues"`
|
||||||
|
Min float64 `json:"min"`
|
||||||
|
Max float64 `json:"max"`
|
||||||
|
Sum float64 `json:"sum"`
|
||||||
|
Count int64 `json:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes metrics to the remote endpoint
|
||||||
|
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
|
||||||
|
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
|
||||||
|
for _, m := range metrics {
|
||||||
|
id := hashIDWithTagKeysOnly(m)
|
||||||
|
if azm, ok := azmetrics[id]; !ok {
|
||||||
|
azmetrics[id] = translate(m)
|
||||||
|
} else {
|
||||||
|
azmetrics[id].Data.BaseData.Series = append(
|
||||||
|
azm.Data.BaseData.Series,
|
||||||
|
translate(m).Data.BaseData.Series...,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var body []byte
|
||||||
|
for _, m := range azmetrics {
|
||||||
|
// Azure Monitor accepts new batches of points in new-line delimited
|
||||||
|
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
|
||||||
|
jsonBytes, err := json.Marshal(&m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body = append(body, jsonBytes...)
|
||||||
|
body = append(body, '\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.validateCredentials(); err != nil {
|
||||||
|
return fmt.Errorf("Error authenticating: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", a.url, bytes.NewBuffer(body))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Authorization", "Bearer "+a.msiToken.AccessToken)
|
||||||
|
req.Header.Set("Content-Type", "application/x-ndjson")
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
|
||||||
|
reply, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
reply = nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
|
||||||
|
resp.StatusCode, resp.Status, reply)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
|
||||||
|
h := fnv.New64a()
|
||||||
|
h.Write([]byte(m.Name()))
|
||||||
|
h.Write([]byte("\n"))
|
||||||
|
for _, tag := range m.TagList() {
|
||||||
|
h.Write([]byte(tag.Key))
|
||||||
|
h.Write([]byte("\n"))
|
||||||
|
}
|
||||||
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
|
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
|
||||||
|
h.Write(b[:n])
|
||||||
|
h.Write([]byte("\n"))
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
|
func translate(m telegraf.Metric) *azureMonitorMetric {
|
||||||
|
var dimensionNames []string
|
||||||
|
var dimensionValues []string
|
||||||
|
for i, tag := range m.TagList() {
|
||||||
|
// Azure custom metrics service supports up to 10 dimensions
|
||||||
|
if i > 10 {
|
||||||
|
log.Printf("W! [outputs.azuremonitor] metric [%s] exceeds 10 dimensions", m.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dimensionNames = append(dimensionNames, tag.Key)
|
||||||
|
dimensionValues = append(dimensionValues, tag.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
min, _ := m.GetField("min")
|
||||||
|
max, _ := m.GetField("max")
|
||||||
|
sum, _ := m.GetField("sum")
|
||||||
|
count, _ := m.GetField("count")
|
||||||
|
return &azureMonitorMetric{
|
||||||
|
Time: m.Time(),
|
||||||
|
Data: &azureMonitorData{
|
||||||
|
BaseData: &azureMonitorBaseData{
|
||||||
|
Metric: m.Name(),
|
||||||
|
Namespace: "default",
|
||||||
|
DimensionNames: dimensionNames,
|
||||||
|
Series: []*azureMonitorSeries{
|
||||||
|
&azureMonitorSeries{
|
||||||
|
DimensionValues: dimensionValues,
|
||||||
|
Min: min.(float64),
|
||||||
|
Max: max.(float64),
|
||||||
|
Sum: sum.(float64),
|
||||||
|
Count: count.(int64),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add will append a metric to the output aggregate
|
||||||
|
func (a *AzureMonitor) Add(m telegraf.Metric) {
|
||||||
|
// Azure Monitor only supports aggregates 30 minutes into the past
|
||||||
|
// and 4 minutes into the future. Future metrics are dropped when pushed.
|
||||||
|
t := m.Time()
|
||||||
|
tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
|
||||||
|
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
|
||||||
|
log.Printf("W! attempted to aggregate metric over 30 minutes old: %v, %v", t, tbucket)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Azure Monitor doesn't have a string value type, so convert string
|
||||||
|
// fields to dimensions (a.k.a. tags) if enabled.
|
||||||
|
if a.StringAsDimension {
|
||||||
|
for fk, fv := range m.Fields() {
|
||||||
|
if v, ok := fv.(string); ok {
|
||||||
|
m.AddTag(fk, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range m.FieldList() {
|
||||||
|
fv, ok := convert(f.Value)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Azure Monitor does not support fields so the field
|
||||||
|
// name is appended to the metric name.
|
||||||
|
name := m.Name() + "_" + sanitize(f.Key)
|
||||||
|
id := hashIDWithField(m.HashID(), f.Key)
|
||||||
|
|
||||||
|
_, ok = a.cache[tbucket]
|
||||||
|
if !ok {
|
||||||
|
// Time bucket does not exist and needs to be created.
|
||||||
|
a.cache[tbucket] = make(map[uint64]*aggregate)
|
||||||
|
}
|
||||||
|
|
||||||
|
nf := make(map[string]interface{}, 4)
|
||||||
|
nf["min"] = fv
|
||||||
|
nf["max"] = fv
|
||||||
|
nf["sum"] = fv
|
||||||
|
nf["count"] = 1
|
||||||
|
// Fetch existing aggregate
|
||||||
|
agg, ok := a.cache[tbucket][id]
|
||||||
|
if ok {
|
||||||
|
aggfields := agg.Fields()
|
||||||
|
if fv > aggfields["min"].(float64) {
|
||||||
|
nf["min"] = aggfields["min"]
|
||||||
|
}
|
||||||
|
if fv < aggfields["max"].(float64) {
|
||||||
|
nf["max"] = aggfields["max"]
|
||||||
|
}
|
||||||
|
nf["sum"] = fv + aggfields["sum"].(float64)
|
||||||
|
nf["count"] = aggfields["count"].(int64) + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
na, _ := metric.New(name, m.Tags(), nf, tbucket)
|
||||||
|
a.cache[tbucket][id] = &aggregate{na, true}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func convert(in interface{}) (float64, bool) {
|
||||||
|
switch v := in.(type) {
|
||||||
|
case int64:
|
||||||
|
return float64(v), true
|
||||||
|
case uint64:
|
||||||
|
return float64(v), true
|
||||||
|
case float64:
|
||||||
|
return v, true
|
||||||
|
case bool:
|
||||||
|
if v {
|
||||||
|
return 1, true
|
||||||
|
}
|
||||||
|
return 0, true
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||||
|
|
||||||
|
func sanitize(value string) string {
|
||||||
|
return invalidNameCharRE.ReplaceAllString(value, "_")
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashIDWithField(id uint64, fk string) uint64 {
|
||||||
|
h := fnv.New64a()
|
||||||
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
|
n := binary.PutUvarint(b, id)
|
||||||
|
h.Write(b[:n])
|
||||||
|
h.Write([]byte("\n"))
|
||||||
|
h.Write([]byte(fk))
|
||||||
|
h.Write([]byte("\n"))
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push sends metrics to the output metric buffer
|
||||||
|
func (a *AzureMonitor) Push() []telegraf.Metric {
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
for tbucket, aggs := range a.cache {
|
||||||
|
// Do not send metrics early
|
||||||
|
if tbucket.After(time.Now().Add(-time.Minute)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, agg := range aggs {
|
||||||
|
// Only send aggregates that have had an update since
|
||||||
|
// the last push.
|
||||||
|
if !agg.updated {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
metrics = append(metrics, agg.Metric)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset clears the cache of aggregate metrics
|
||||||
|
func (a *AzureMonitor) Reset() {
|
||||||
|
for tbucket := range a.cache {
|
||||||
|
// Remove aggregates older than 30 minutes
|
||||||
|
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
|
||||||
|
delete(a.cache, tbucket)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for id := range a.cache[tbucket] {
|
||||||
|
a.cache[tbucket][id].updated = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("azuremonitor", func() telegraf.Output {
|
||||||
|
return &AzureMonitor{
|
||||||
|
StringAsDimension: false,
|
||||||
|
Timeout: internal.Duration{Duration: time.Second * 5},
|
||||||
|
Region: defaultRegion,
|
||||||
|
cache: make(map[time.Time]map[uint64]*aggregate, 36),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
package azuremonitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
)
|
||||||
|
|
||||||
|
// func TestDefaultConnectAndWrite(t *testing.T) {
|
||||||
|
// if testing.Short() {
|
||||||
|
// t.Skip("Skipping integration test in short mode")
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Test with all defaults (MSI+IMS)
|
||||||
|
// azmon := &AzureMonitor{}
|
||||||
|
|
||||||
|
// // Verify that we can connect to Log Analytics
|
||||||
|
// err := azmon.Connect()
|
||||||
|
// require.NoError(t, err)
|
||||||
|
|
||||||
|
// // Verify that we can write a metric to Log Analytics
|
||||||
|
// err = azmon.Write(testutil.MockMetrics())
|
||||||
|
// require.NoError(t, err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// MockMetrics returns a mock []telegraf.Metric object for using in unit tests
|
||||||
|
// of telegraf output sinks.
|
||||||
|
func getMockMetrics() []telegraf.Metric {
|
||||||
|
metrics := make([]telegraf.Metric, 0)
|
||||||
|
// Create a new point batch
|
||||||
|
metrics = append(metrics, getTestMetric(1.0))
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMetric Returns a simple test point:
|
||||||
|
// measurement -> "test1" or name
|
||||||
|
// tags -> "tag1":"value1"
|
||||||
|
// value -> value
|
||||||
|
// time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
|
||||||
|
func getTestMetric(value interface{}, name ...string) telegraf.Metric {
|
||||||
|
if value == nil {
|
||||||
|
panic("Cannot use a nil value")
|
||||||
|
}
|
||||||
|
measurement := "test1"
|
||||||
|
if len(name) > 0 {
|
||||||
|
measurement = name[0]
|
||||||
|
}
|
||||||
|
tags := map[string]string{"tag1": "value1"}
|
||||||
|
pt, _ := metric.New(
|
||||||
|
measurement,
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{"value": value},
|
||||||
|
time.Now().UTC(),
|
||||||
|
)
|
||||||
|
return pt
|
||||||
|
}
|
||||||
|
|
||||||
|
// func TestPostData(t *testing.T) {
|
||||||
|
// azmon := &AzureMonitor{
|
||||||
|
// Region: "eastus",
|
||||||
|
// }
|
||||||
|
// err := azmon.Connect()
|
||||||
|
|
||||||
|
// metrics := getMockMetrics()
|
||||||
|
// t.Logf("mock metrics are %+v\n", metrics)
|
||||||
|
// // metricsList, err := azmon.add(&metrics[0])
|
||||||
|
// for _, m := range metrics {
|
||||||
|
// azmon.Add(m)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// jsonBytes, err := json.Marshal(azmon.cache)
|
||||||
|
// t.Logf("json content is:\n----------\n%s\n----------\n", string(jsonBytes))
|
||||||
|
|
||||||
|
// req, err := azmon.postData(&jsonBytes)
|
||||||
|
// if err != nil {
|
||||||
|
// // t.Logf("Error publishing metrics %s", err)
|
||||||
|
// t.Logf("url is %+v\n", req.URL)
|
||||||
|
// // t.Logf("failed request is %+v\n", req)
|
||||||
|
|
||||||
|
// // raw, err := httputil.DumpRequestOut(req, true)
|
||||||
|
// // if err != nil {
|
||||||
|
// // t.Logf("Request detail is \n%s\n", string(raw))
|
||||||
|
// // } else {
|
||||||
|
// // t.Logf("could not dump request: %s\n", err)
|
||||||
|
// // }
|
||||||
|
// }
|
||||||
|
// require.NoError(t, err)
|
||||||
|
// }
|
Loading…
Reference in New Issue