194 lines
5.6 KiB
Go
194 lines
5.6 KiB
Go
package kube_inventory
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kubernetes/apimachinery/pkg/api/resource"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/filter"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/internal/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
const (
|
|
defaultServiceAccountPath = "/run/secrets/kubernetes.io/serviceaccount/token"
|
|
)
|
|
|
|
// KubernetesInventory represents the config object for the plugin.
|
|
type KubernetesInventory struct {
|
|
URL string `toml:"url"`
|
|
BearerToken string `toml:"bearer_token"`
|
|
BearerTokenString string `toml:"bearer_token_string"`
|
|
Namespace string `toml:"namespace"`
|
|
ResponseTimeout internal.Duration `toml:"response_timeout"` // Timeout specified as a string - 3s, 1m, 1h
|
|
ResourceExclude []string `toml:"resource_exclude"`
|
|
ResourceInclude []string `toml:"resource_include"`
|
|
MaxConfigMapAge internal.Duration `toml:"max_config_map_age"`
|
|
|
|
tls.ClientConfig
|
|
client *client
|
|
}
|
|
|
|
var sampleConfig = `
|
|
## URL for the Kubernetes API
|
|
url = "https://127.0.0.1"
|
|
|
|
## Namespace to use. Set to "" to use all namespaces.
|
|
# namespace = "default"
|
|
|
|
## Use bearer token for authorization. ('bearer_token' takes priority)
|
|
## If both of these are empty, we'll use the default serviceaccount:
|
|
## at: /run/secrets/kubernetes.io/serviceaccount/token
|
|
# bearer_token = "/path/to/bearer/token"
|
|
## OR
|
|
# bearer_token_string = "abc_123"
|
|
|
|
## Set response_timeout (default 5 seconds)
|
|
# response_timeout = "5s"
|
|
|
|
## Optional Resources to exclude from gathering
|
|
## Leave them with blank with try to gather everything available.
|
|
## Values can be - "daemonsets", deployments", "endpoints", "ingress", "nodes",
|
|
## "persistentvolumes", "persistentvolumeclaims", "pods", "services", "statefulsets"
|
|
# resource_exclude = [ "deployments", "nodes", "statefulsets" ]
|
|
|
|
## Optional Resources to include when gathering
|
|
## Overrides resource_exclude if both set.
|
|
# resource_include = [ "deployments", "nodes", "statefulsets" ]
|
|
|
|
## Optional TLS Config
|
|
# tls_ca = "/path/to/cafile"
|
|
# tls_cert = "/path/to/certfile"
|
|
# tls_key = "/path/to/keyfile"
|
|
## Use TLS but skip chain & host verification
|
|
# insecure_skip_verify = false
|
|
`
|
|
|
|
// SampleConfig returns a sample config
|
|
func (ki *KubernetesInventory) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description returns the description of this plugin
|
|
func (ki *KubernetesInventory) Description() string {
|
|
return "Read metrics from the Kubernetes api"
|
|
}
|
|
|
|
func (ki *KubernetesInventory) Init() error {
|
|
// If neither are provided, use the default service account.
|
|
if ki.BearerToken == "" && ki.BearerTokenString == "" {
|
|
ki.BearerToken = defaultServiceAccountPath
|
|
}
|
|
|
|
if ki.BearerToken != "" {
|
|
token, err := ioutil.ReadFile(ki.BearerToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ki.BearerTokenString = strings.TrimSpace(string(token))
|
|
}
|
|
|
|
var err error
|
|
ki.client, err = newClient(ki.URL, ki.Namespace, ki.BearerTokenString, ki.ResponseTimeout.Duration, ki.ClientConfig)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Gather collects kubernetes metrics from a given URL.
|
|
func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) {
|
|
resourceFilter, err := filter.NewIncludeExcludeFilter(ki.ResourceInclude, ki.ResourceExclude)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
ctx := context.Background()
|
|
|
|
for collector, f := range availableCollectors {
|
|
if resourceFilter.Match(collector) {
|
|
wg.Add(1)
|
|
go func(f func(ctx context.Context, acc telegraf.Accumulator, k *KubernetesInventory)) {
|
|
defer wg.Done()
|
|
f(ctx, acc, ki)
|
|
}(f)
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){
|
|
"daemonsets": collectDaemonSets,
|
|
"deployments": collectDeployments,
|
|
"endpoints": collectEndpoints,
|
|
"ingress": collectIngress,
|
|
"nodes": collectNodes,
|
|
"pods": collectPods,
|
|
"services": collectServices,
|
|
"statefulsets": collectStatefulSets,
|
|
"persistentvolumes": collectPersistentVolumes,
|
|
"persistentvolumeclaims": collectPersistentVolumeClaims,
|
|
}
|
|
|
|
func atoi(s string) int64 {
|
|
i, err := strconv.ParseInt(s, 10, 64)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return int64(i)
|
|
}
|
|
|
|
func convertQuantity(s string, m float64) int64 {
|
|
q, err := resource.ParseQuantity(s)
|
|
if err != nil {
|
|
log.Printf("D! [inputs.kube_inventory] failed to parse quantity: %s", err.Error())
|
|
return 0
|
|
}
|
|
f, err := strconv.ParseFloat(fmt.Sprint(q.AsDec()), 64)
|
|
if err != nil {
|
|
log.Printf("D! [inputs.kube_inventory] failed to parse float: %s", err.Error())
|
|
return 0
|
|
}
|
|
if m < 1 {
|
|
m = 1
|
|
}
|
|
return int64(f * m)
|
|
}
|
|
|
|
var (
|
|
daemonSetMeasurement = "kubernetes_daemonset"
|
|
deploymentMeasurement = "kubernetes_deployment"
|
|
endpointMeasurement = "kubernetes_endpoint"
|
|
ingressMeasurement = "kubernetes_ingress"
|
|
nodeMeasurement = "kubernetes_node"
|
|
persistentVolumeMeasurement = "kubernetes_persistentvolume"
|
|
persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim"
|
|
podContainerMeasurement = "kubernetes_pod_container"
|
|
serviceMeasurement = "kubernetes_service"
|
|
statefulSetMeasurement = "kubernetes_statefulset"
|
|
)
|
|
|
|
func init() {
|
|
inputs.Add("kube_inventory", func() telegraf.Input {
|
|
return &KubernetesInventory{
|
|
ResponseTimeout: internal.Duration{Duration: time.Second * 5},
|
|
Namespace: "default",
|
|
}
|
|
})
|
|
}
|