Collect from newly discovered/launched pods (#5293)

This commit is contained in:
Greg 2019-01-16 16:49:24 -07:00 committed by Daniel Nelson
parent 2b8729e048
commit b620a56d21
3 changed files with 84 additions and 31 deletions

View File

@ -45,6 +45,7 @@ func (p *Prometheus) start(ctx context.Context) error {
if err != nil { if err != nil {
return fmt.Errorf("Failed to get current user - %v", err) return fmt.Errorf("Failed to get current user - %v", err)
} }
configLocation := filepath.Join(u.HomeDir, ".kube/config") configLocation := filepath.Join(u.HomeDir, ".kube/config")
if p.KubeConfig != "" { if p.KubeConfig != "" {
configLocation = p.KubeConfig configLocation = p.KubeConfig
@ -76,6 +77,10 @@ func (p *Prometheus) start(ctx context.Context) error {
return nil return nil
} }
// An edge case exists if a pod goes offline at the same time a new pod is created
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not
// directed to do so by K8s.
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
pod := &corev1.Pod{} pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, "", &corev1.Pod{}) watcher, err := client.Watch(ctx, "", &corev1.Pod{})
@ -96,18 +101,44 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
return err return err
} }
// If the pod is not "ready", there will be no ip associated with it.
if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.GetContainerStatuses()) {
continue
}
switch eventType { switch eventType {
case k8s.EventAdded: case k8s.EventAdded:
registerPod(pod, p) registerPod(pod, p)
case k8s.EventDeleted:
unregisterPod(pod, p)
case k8s.EventModified: case k8s.EventModified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.Metadata.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
}
} }
} }
} }
} }
func podReady(statuss []*corev1.ContainerStatus) bool {
if len(statuss) == 0 {
return false
}
for _, cs := range statuss {
if !cs.GetReady() {
return false
}
}
return true
}
func registerPod(pod *corev1.Pod, p *Prometheus) { func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{}
}
targetURL := getScrapeURL(pod) targetURL := getScrapeURL(pod)
if targetURL == nil { if targetURL == nil {
return return
@ -116,6 +147,9 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL) log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL)
// add annotation as metrics tags // add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations() tags := pod.GetMetadata().GetAnnotations()
if tags == nil {
tags = map[string]string{}
}
tags["pod_name"] = pod.GetMetadata().GetName() tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace() tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags // add labels as metrics tags
@ -129,20 +163,16 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
} }
podURL := p.AddressToURL(URL, URL.Hostname()) podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock() p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods, p.kubernetesPods[podURL.String()] = URLAndAddress{
URLAndAddress{
URL: podURL, URL: podURL,
Address: URL.Hostname(), Address: URL.Hostname(),
OriginalURL: URL, OriginalURL: URL,
Tags: tags}) Tags: tags,
}
p.lock.Unlock() p.lock.Unlock()
} }
func getScrapeURL(pod *corev1.Pod) *string { func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP() ip := pod.Status.GetPodIP()
if ip == "" { if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod // return as if scrape was disabled, we will be notified again once the pod
@ -181,18 +211,13 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
return return
} }
p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s", log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods { p.lock.Lock()
if v.URL.String() != *url { defer p.lock.Unlock()
result = append(result, v) if _, ok := p.kubernetesPods[*url]; ok {
} else { delete(p.kubernetesPods, *url)
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url) log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
} }
}
p.kubernetesPods = result
} }

View File

@ -15,6 +15,7 @@ func TestScrapeURLNoAnnotations(t *testing.T) {
url := getScrapeURL(p) url := getScrapeURL(p)
assert.Nil(t, url) assert.Nil(t, url)
} }
func TestScrapeURLAnnotationsNoScrape(t *testing.T) { func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod") p.Metadata.Name = str("myPod")
@ -22,18 +23,21 @@ func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
url := getScrapeURL(p) url := getScrapeURL(p)
assert.Nil(t, url) assert.Nil(t, url)
} }
func TestScrapeURLAnnotations(t *testing.T) { func TestScrapeURLAnnotations(t *testing.T) {
p := pod() p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p) url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
} }
func TestScrapeURLAnnotationsCustomPort(t *testing.T) { func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod() p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p) url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
} }
func TestScrapeURLAnnotationsCustomPath(t *testing.T) { func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod() p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
@ -50,12 +54,14 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
func TestAddPod(t *testing.T) { func TestAddPod(t *testing.T) {
prom := &Prometheus{} prom := &Prometheus{}
p := pod() p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom) registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods)) assert.Equal(t, 1, len(prom.kubernetesPods))
} }
func TestAddMultiplePods(t *testing.T) {
func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{} prom := &Prometheus{}
p := pod() p := pod()
@ -63,8 +69,21 @@ func TestAddMultiplePods(t *testing.T) {
registerPod(p, prom) registerPod(p, prom)
p.Metadata.Name = str("Pod2") p.Metadata.Name = str("Pod2")
registerPod(p, prom) registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
p.Status.PodIP = str("127.0.0.2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods)) assert.Equal(t, 2, len(prom.kubernetesPods))
} }
func TestDeletePods(t *testing.T) { func TestDeletePods(t *testing.T) {
prom := &Prometheus{} prom := &Prometheus{}

View File

@ -43,7 +43,7 @@ type Prometheus struct {
// Should we scrape Kubernetes services for prometheus annotations // Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"` MonitorPods bool `toml:"monitor_kubernetes_pods"`
lock sync.Mutex lock sync.Mutex
kubernetesPods []URLAndAddress kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -118,21 +118,23 @@ type URLAndAddress struct {
Tags map[string]string Tags map[string]string
} }
func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make([]URLAndAddress, 0) allURLs := make(map[string]URLAndAddress, 0)
for _, u := range p.URLs { for _, u := range p.URLs {
URL, err := url.Parse(u) URL, err := url.Parse(u)
if err != nil { if err != nil {
log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error()) log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error())
continue continue
} }
allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL}
allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL})
} }
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods // loop through all pods scraped via the prometheus annotation on the pods
allURLs = append(allURLs, p.kubernetesPods...) for k, v := range p.kubernetesPods {
allURLs[k] = v
}
for _, service := range p.KubernetesServices { for _, service := range p.KubernetesServices {
URL, err := url.Parse(service) URL, err := url.Parse(service)
@ -147,7 +149,11 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
} }
for _, resolved := range resolvedAddresses { for _, resolved := range resolvedAddresses {
serviceURL := p.AddressToURL(URL, resolved) serviceURL := p.AddressToURL(URL, resolved)
allURLs = append(allURLs, URLAndAddress{URL: serviceURL, Address: resolved, OriginalURL: URL}) allURLs[serviceURL.String()] = URLAndAddress{
URL: serviceURL,
Address: resolved,
OriginalURL: URL,
}
} }
} }
return allURLs, nil return allURLs, nil
@ -317,6 +323,9 @@ func (p *Prometheus) Stop() {
func init() { func init() {
inputs.Add("prometheus", func() telegraf.Input { inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} return &Prometheus{
ResponseTimeout: internal.Duration{Duration: time.Second * 3},
kubernetesPods: map[string]URLAndAddress{},
}
}) })
} }