Starting on azure monitor metrics integration with MSI auth

This commit is contained in:
Mark Simms 2018-03-27 19:15:52 -07:00 committed by Gunnar Aasen
parent 4f61d2a09c
commit 02d86b1b6e
7 changed files with 799 additions and 0 deletions

3
.gitignore vendored
View File

@ -2,3 +2,6 @@
/telegraf
/telegraf.exe
/telegraf.gz
*~
*#
*.tar.gz

13
build-azmon.sh Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash
TELEGRAF_VERSION=1.5.3-azmon
CONTAINER_URI=https://masdiagstore.blob.core.windows.net/share/
make
tar -cvfz telegraf-${TELEGRAF_VERSION}-l_amd64.tar.gz ./telegraf
azcopy \
--source ./telegraf-${TELEGRAF_VERSION}-l_amd64.tar.gz \
--destination $CONTAINER_URL/telegraf-${TELEGRAF_VERSION}-l_amd64.tar.gz \
--dest-key $AZURE_STORAGE_KEY

View File

@ -0,0 +1,213 @@
package azuremonitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/prometheus/common/log"
)
// AzureInstanceMetadata is the proxy for accessing the instance metadata service on an Azure VM
type AzureInstanceMetadata struct {
}
// VirtualMachineMetadata contains information about a VM from the metadata service
type VirtualMachineMetadata struct {
Raw string
AzureResourceID string
Compute struct {
Location string `json:"location"`
Name string `json:"name"`
Offer string `json:"offer"`
OsType string `json:"osType"`
PlacementGroupID string `json:"placementGroupId"`
PlatformFaultDomain string `json:"platformFaultDomain"`
PlatformUpdateDomain string `json:"platformUpdateDomain"`
Publisher string `json:"publisher"`
ResourceGroupName string `json:"resourceGroupName"`
Sku string `json:"sku"`
SubscriptionID string `json:"subscriptionId"`
Tags string `json:"tags"`
Version string `json:"version"`
VMID string `json:"vmId"`
VMScaleSetName string `json:"vmScaleSetName"`
VMSize string `json:"vmSize"`
Zone string `json:"zone"`
} `json:"compute"`
Network struct {
Interface []struct {
Ipv4 struct {
IPAddress []struct {
PrivateIPAddress string `json:"privateIpAddress"`
PublicIPAddress string `json:"publicIpAddress"`
} `json:"ipAddress"`
Subnet []struct {
Address string `json:"address"`
Prefix string `json:"prefix"`
} `json:"subnet"`
} `json:"ipv4"`
Ipv6 struct {
IPAddress []interface{} `json:"ipAddress"`
} `json:"ipv6"`
MacAddress string `json:"macAddress"`
} `json:"interface"`
} `json:"network"`
}
// MsiToken is the managed service identity token
type MsiToken struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn string `json:"expires_in"`
ExpiresOn string `json:"expires_on"`
NotBefore string `json:"not_before"`
Resource string `json:"resource"`
TokenType string `json:"token_type"`
expiresAt time.Time
notBefore time.Time
raw string
}
func (m *MsiToken) parseTimes() {
val, err := strconv.ParseInt(m.ExpiresOn, 10, 64)
if err == nil {
m.expiresAt = time.Unix(val, 0)
}
val, err = strconv.ParseInt(m.NotBefore, 10, 64)
if err == nil {
m.notBefore = time.Unix(val, 0)
}
}
// ExpiresAt is the time at which the token expires
func (m *MsiToken) ExpiresAt() time.Time {
return m.expiresAt
}
// ExpiresInDuration returns the duration until the token expires
func (m *MsiToken) ExpiresInDuration() time.Duration {
expiresDuration := m.expiresAt.Sub(time.Now().UTC())
return expiresDuration
}
// NotBeforeTime returns the time at which the token becomes valid
func (m *MsiToken) NotBeforeTime() time.Time {
return m.notBefore
}
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM
func (s *AzureInstanceMetadata) GetMsiToken(clientID string, resourceID string) (*MsiToken, error) {
// Acquire an MSI token. Documented at:
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
//
//GET http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=712eac09-e943-418c-9be6-9fd5c91078bl HTTP/1.1 Metadata: true
// Create HTTP request for MSI token to access Azure Resource Manager
var msiEndpoint *url.URL
msiEndpoint, err := url.Parse(msiInstanceMetadataURL)
if err != nil {
return nil, err
}
// Resource ID defaults to https://management.azure.com
if resourceID == "" {
resourceID = "https://management.azure.com"
}
msiParameters := url.Values{}
msiParameters.Add("resource", resourceID)
msiParameters.Add("api-version", "2018-02-01")
// Client id is optional
if clientID != "" {
msiParameters.Add("client_id", clientID)
}
msiEndpoint.RawQuery = msiParameters.Encode()
req, err := http.NewRequest("GET", msiEndpoint.String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Metadata", "true")
// Create the HTTP client and call the token service
client := http.Client{
Timeout: 15 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
// Complete reading the body
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s, content: %s",
resp.StatusCode, resp.Status, reply)
}
var token MsiToken
if err := json.Unmarshal(reply, &token); err != nil {
return nil, err
}
token.parseTimes()
token.raw = string(reply)
return &token, nil
}
const (
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
msiInstanceMetadataURL = "http://169.254.169.254/metadata/identity/oauth2/token"
)
// GetInstanceMetadata retrieves metadata about the current Azure VM
func (s *AzureInstanceMetadata) GetInstanceMetadata() (*VirtualMachineMetadata, error) {
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
if err != nil {
log.Errorf("Error creating HTTP request")
return nil, err
}
req.Header.Set("Metadata", "true")
client := http.Client{
Timeout: 15 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
var metadata VirtualMachineMetadata
if err := json.Unmarshal(reply, &metadata); err != nil {
return nil, err
}
metadata.AzureResourceID = fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s",
metadata.Compute.SubscriptionID, metadata.Compute.ResourceGroupName, metadata.Compute.Name)
return &metadata, nil
}

View File

@ -0,0 +1,44 @@
package azuremonitor
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestGetMetadata(t *testing.T) {
azureMetadata := &AzureInstanceMetadata{}
metadata, err := azureMetadata.GetInstanceMetadata()
require.NoError(t, err)
require.NotNil(t, metadata)
require.NotEmpty(t, metadata.AzureResourceID)
require.NotEmpty(t, metadata.Compute.Location)
// if err != nil {
// t.Logf("could not get metadata: %v\n", err)
// } else {
// t.Logf("resource id \n%s", metadata.AzureResourceID)
// t.Logf("metadata is \n%v", metadata)
// }
//fmt.Printf("metadata is \n%v", metadata)
}
func TestGetTOKEN(t *testing.T) {
azureMetadata := &AzureInstanceMetadata{}
resourceID := "https://ingestion.monitor.azure.com/"
token, err := azureMetadata.GetMsiToken("", resourceID)
require.NoError(t, err)
require.NotEmpty(t, token.AccessToken)
require.EqualValues(t, token.Resource, resourceID)
t.Logf("token is %+v\n", token)
t.Logf("expiry time is %s\n", token.ExpiresAt().Format(time.RFC3339))
t.Logf("expiry duration is %s\n", token.ExpiresInDuration().String())
t.Logf("resource is %s\n", token.Resource)
}

View File

@ -0,0 +1,343 @@
package azuremonitor
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"time"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/telegraf"
)
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
type AzureMonitor struct {
ResourceID string `toml:"resourceId"`
Region string `toml:"region"`
HTTPPostTimeout int `toml:"httpPostTimeout"`
AzureSubscriptionID string `toml:"azureSubscription"`
AzureTenantID string `toml:"azureTenant"`
AzureClientID string `toml:"azureClientId"`
AzureClientSecret string `toml:"azureClientSecret"`
useMsi bool
metadataService *AzureInstanceMetadata
instanceMetadata *VirtualMachineMetadata
msiToken *MsiToken
msiResource string
bearerToken string
expiryWatermark time.Duration
oauthConfig *adal.OAuthConfig
adalToken adal.OAuthTokenProvider
}
var sampleConfig = `
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
resourceId = "/subscriptions/3e9c2afc-52b3-4137-9bba-02b6eb204331/resourceGroups/someresourcegroup-rg/providers/Microsoft.Compute/virtualMachines/somevmname"
## Azure region to publish metrics against. Defaults to eastus
region = "useast"
## Maximum duration to wait for HTTP post (in seconds). Defaults to 15
httpPostTimeout = 15
## Whether or not to use managed service identity (defaults to true).
useManagedServiceIdentity = true
## TODO
azureSubscription = "TODO"
## TODO
azureTenant = "TODO"
## TODO
azureClientId = "TODO"
## TODO
azureClientSecret = "TODO"
`
const (
azureMonitorDefaultRegion = "eastus"
)
// Description provides a description of the plugin
func (s *AzureMonitor) Description() string {
return "Configuration for Azure Monitor to send metrics to"
}
// SampleConfig provides a sample configuration for the plugin
func (s *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Connect initializes the plugin and validates connectivity
func (s *AzureMonitor) Connect() error {
// Set defaults
// If no direct AD values provided, fall back to MSI
if s.AzureSubscriptionID == "" && s.AzureTenantID == "" && s.AzureClientID == "" && s.AzureClientSecret == "" {
s.useMsi = true
} else if s.AzureSubscriptionID == "" || s.AzureTenantID == "" || s.AzureClientID == "" || s.AzureClientSecret == "" {
return fmt.Errorf("Must provide values for azureSubscription, azureTenant, azureClient and azureClientSecret, or leave all blank to default to MSI")
}
if s.useMsi == false {
// If using direct AD authentication create the AD access client
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, s.AzureTenantID)
if err != nil {
return fmt.Errorf("Could not initialize AD client: %s", err)
}
s.oauthConfig = oauthConfig
}
if s.HTTPPostTimeout == 0 {
s.HTTPPostTimeout = 10
}
s.metadataService = &AzureInstanceMetadata{}
// For the metrics API the MSI resource has to be https://ingestion.monitor.azure.com
s.msiResource = "https://ingestion.monitor.azure.com/"
// Validate the resource identifier
if s.ResourceID == "" {
metadata, err := s.metadataService.GetInstanceMetadata()
if err != nil {
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
}
s.ResourceID = metadata.AzureResourceID
if s.Region == "" {
s.Region = metadata.Compute.Location
}
}
if s.Region == "" {
s.Region = azureMonitorDefaultRegion
}
// Validate credentials
err := s.validateCredentials()
if err != nil {
return err
}
return nil
}
// Close shuts down an any active connections
func (s *AzureMonitor) Close() error {
// Close connection to the URL here
return nil
}
// Write writes metrics to the remote endpoint
func (s *AzureMonitor) Write(metrics []telegraf.Metric) error {
// Flatten metrics into an Azure Monitor common schema compatible format
metricsList, err := s.flattenMetrics(metrics)
if err != nil {
log.Printf("Error translating metrics %s", err)
return err
}
for _, v := range metricsList {
jsonBytes, err := json.Marshal(&v)
_, err = s.postData(&jsonBytes)
if err != nil {
log.Printf("Error publishing metrics %s", err)
return err
}
}
return nil
}
func (s *AzureMonitor) validateCredentials() error {
// Use managed service identity
if s.useMsi {
// Check expiry on the token
if s.msiToken != nil {
expiryDuration := s.msiToken.ExpiresInDuration()
if expiryDuration > s.expiryWatermark {
return nil
}
// Token is about to expire
log.Printf("Bearer token expiring in %s; acquiring new token\n", expiryDuration.String())
s.msiToken = nil
}
// No token, acquire an MSI token
if s.msiToken == nil {
msiToken, err := s.metadataService.GetMsiToken(s.AzureClientID, s.msiResource)
if err != nil {
return err
}
log.Printf("Bearer token acquired; expiring in %s\n", msiToken.ExpiresInDuration().String())
s.msiToken = msiToken
s.bearerToken = msiToken.AccessToken
}
// Otherwise directory acquire a token
} else {
adToken, err := adal.NewServicePrincipalToken(
*(s.oauthConfig), s.AzureClientID, s.AzureClientSecret,
azure.PublicCloud.ActiveDirectoryEndpoint)
if err != nil {
return fmt.Errorf("Could not acquire ADAL token: %s", err)
}
s.adalToken = adToken
}
return nil
}
type azureMonitorMetric struct {
Time time.Time `json:"time"`
Data azureMonitorData `json:"data"`
}
type azureMonitorData struct {
BaseData azureMonitorBaseData `json:"baseData"`
}
type azureMonitorBaseData struct {
Metric string `json:"metric"`
Namespace string `json:"namespace"`
DimensionNames []string `json:"dimNames"`
Series []azureMonitorSeries `json:"series"`
}
type azureMonitorSeries struct {
DimensionValues []string `json:"dimValues"`
Min int64 `json:"min"`
Max int64 `json:"max"`
Sum int64 `json:"sum"`
Count int64 `json:"count"`
}
func (s *AzureMonitor) flattenMetrics(metrics []telegraf.Metric) ([]azureMonitorMetric, error) {
var azureMetrics []azureMonitorMetric
for _, metric := range metrics {
// Get the list of custom dimensions (elevated tags and fields)
var dimensionNames []string
var dimensionValues []string
for name, value := range metric.Fields() {
dimensionNames = append(dimensionNames, name)
dimensionValues = append(dimensionValues, s.formatField(value))
}
series := azureMonitorSeries{
DimensionValues: dimensionValues,
}
if v, ok := metric.Fields()["min"]; ok {
series.Min = s.formatInt(v)
}
if v, ok := metric.Fields()["max"]; ok {
series.Max = s.formatInt(v)
}
if v, ok := metric.Fields()["sum"]; ok {
series.Sum = s.formatInt(v)
}
if v, ok := metric.Fields()["count"]; ok {
series.Count = s.formatInt(v)
} else {
// Azure Monitor requires count >= 1
series.Count = 1
}
azureMetric := azureMonitorMetric{
Time: metric.Time(),
Data: azureMonitorData{
BaseData: azureMonitorBaseData{
Metric: metric.Name(),
Namespace: "default",
DimensionNames: dimensionNames,
Series: []azureMonitorSeries{series},
},
},
}
azureMetrics = append(azureMetrics, azureMetric)
}
return azureMetrics, nil
}
func (s *AzureMonitor) formatInt(value interface{}) int64 {
return 0
}
func (s *AzureMonitor) formatField(value interface{}) string {
var ret string
switch v := value.(type) {
case int:
ret = strconv.FormatInt(int64(value.(int)), 10)
case int8:
ret = strconv.FormatInt(int64(value.(int8)), 10)
case int16:
ret = strconv.FormatInt(int64(value.(int16)), 10)
case int32:
ret = strconv.FormatInt(int64(value.(int32)), 10)
case int64:
ret = strconv.FormatInt(value.(int64), 10)
case float32:
ret = strconv.FormatFloat(float64(value.(float32)), 'f', -1, 64)
case float64:
ret = strconv.FormatFloat(value.(float64), 'f', -1, 64)
default:
spew.Printf("field is of unsupported value type %v\n", v)
}
return ret
}
func (s *AzureMonitor) postData(msg *[]byte) (*http.Request, error) {
metricsEndpoint := fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics",
s.Region, s.ResourceID)
req, err := http.NewRequest("POST", metricsEndpoint, bytes.NewBuffer(*msg))
if err != nil {
log.Printf("Error creating HTTP request")
return nil, err
}
req.Header.Set("Authorization", "Bearer "+s.bearerToken)
req.Header.Set("Content-Type", "application/json")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := http.Client{
Transport: tr,
// TODO - fix this
//Timeout: time.Duration(s.HTTPPostTimeout * time.Second),
Timeout: time.Duration(10 * time.Second),
}
resp, err := client.Do(req)
if err != nil {
return req, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
var reply []byte
reply, err = ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return req, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
return req, nil
}

View File

@ -0,0 +1,90 @@
package azuremonitor
import (
"encoding/json"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
)
// func TestDefaultConnectAndWrite(t *testing.T) {
// if testing.Short() {
// t.Skip("Skipping integration test in short mode")
// }
// // Test with all defaults (MSI+IMS)
// azmon := &AzureMonitor{}
// // Verify that we can connect to Log Analytics
// err := azmon.Connect()
// require.NoError(t, err)
// // Verify that we can write a metric to Log Analytics
// err = azmon.Write(testutil.MockMetrics())
// require.NoError(t, err)
// }
// MockMetrics returns a mock []telegraf.Metric object for using in unit tests
// of telegraf output sinks.
func getMockMetrics() []telegraf.Metric {
metrics := make([]telegraf.Metric, 0)
// Create a new point batch
metrics = append(metrics, getTestMetric(1.0))
return metrics
}
// TestMetric Returns a simple test point:
// measurement -> "test1" or name
// tags -> "tag1":"value1"
// value -> value
// time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
func getTestMetric(value interface{}, name ...string) telegraf.Metric {
if value == nil {
panic("Cannot use a nil value")
}
measurement := "test1"
if len(name) > 0 {
measurement = name[0]
}
tags := map[string]string{"tag1": "value1"}
pt, _ := metric.New(
measurement,
tags,
map[string]interface{}{"value": value},
time.Now().UTC(),
)
return pt
}
func TestPostData(t *testing.T) {
azmon := &AzureMonitor{
Region: "eastus",
}
err := azmon.Connect()
metrics := getMockMetrics()
t.Logf("mock metrics are %+v\n", metrics)
metricsList, err := azmon.flattenMetrics(metrics)
jsonBytes, err := json.Marshal(&metricsList[0])
t.Logf("json content is:\n----------\n%s\n----------\n", string(jsonBytes))
req, err := azmon.postData(&jsonBytes)
if err != nil {
// t.Logf("Error publishing metrics %s", err)
t.Logf("url is %+v\n", req.URL)
// t.Logf("failed request is %+v\n", req)
// raw, err := httputil.DumpRequestOut(req, true)
// if err != nil {
// t.Logf("Request detail is \n%s\n", string(raw))
// } else {
// t.Logf("could not dump request: %s\n", err)
// }
}
require.NoError(t, err)
}

93
test.conf Normal file
View File

@ -0,0 +1,93 @@
# Telegraf Configuration
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_jitter = "0s"
precision = ""
debug = false
quiet = false
logfile = ""
hostname = ""
omit_hostname = false
# Configuration for Azure Log Analytics to send metrics
[[outputs.loganalytics]]
workspace = "$OMS_WORKSPACE"
sharedKey = "$OMS_KEY"
logname = "metrics"
includeTags = ["host", "dc"]
###############################################################################
# INPUT PLUGINS #
###############################################################################
# # Influx HTTP write listener
[[inputs.http_listener]]
## Address and port to host HTTP listener on
service_address = ":8186"
## timeouts
read_timeout = "10s"
write_timeout = "10s"
# Read metrics about cpu usage
[[inputs.cpu]]
## Whether to report per-cpu stats or not
percpu = false
## Whether to report total system cpu stats or not
totalcpu = true
## If true, collect raw CPU time metrics.
collect_cpu_time = false
## If true, compute and report the sum of all non-idle CPU states.
report_active = false
# # Read metrics exposed by fluentd in_monitor plugin
# [[inputs.fluentd]]
# ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
# ##
# ## Endpoint:
# ## - only one URI is allowed
# ## - https is not supported
# endpoint = "http://localhost:24220/api/plugins.json"
#
# ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
# exclude = [
# "monitor_agent",
# "dummy",
# ]
# # Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints
# [[inputs.influxdb]]
# ## Works with InfluxDB debug endpoints out of the box,
# ## but other services can use this format too.
# ## See the influxdb plugin's README for more details.
#
# ## Multiple URLs from which to read InfluxDB-formatted JSON
# ## Default is "http://localhost:8086/debug/vars".
# urls = [
# "http://localhost:8086/debug/vars"
# ]
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## http request & header timeout
# timeout = "5s"