Compare commits

..

17 Commits

Author SHA1 Message Date
Kelvin Wang
54ac4d70c9 add kube state metrics 2018-07-06 11:54:44 -07:00
kelwang
90a38bd125 Merge pull request #2 from kelwang/revert-1-jenkins-gollar-changes
Revert "Jenkins gollar changes"
2018-06-25 17:14:14 -07:00
kelwang
d017718033 Revert "Jenkins gollar changes" 2018-06-25 17:14:03 -07:00
kelwang
8ff50e4327 Merge pull request #1 from kelwang/jenkins-gollar-changes
Jenkins gollar changes
2018-06-25 17:12:08 -07:00
Kelvin Wang
4ec7999186 use semaphore model 2018-06-25 17:10:15 -07:00
Kelvin Wang
3457c98eb1 remove err struct 2018-06-25 15:43:10 -07:00
Kelvin Wang
e7ff7d506b fix conflicts 2018-06-22 10:48:18 -07:00
Kelvin Wang
cdc15205d8 fix go 1.8 compatibility 2018-06-22 10:35:43 -07:00
Kelvin Wang
73eaa057d1 add requested changes 2018-06-22 10:35:43 -07:00
Kelvin Wang
9c85c05fcb add jenkins lib 2018-06-22 10:35:43 -07:00
Daniel Nelson
23523ffd10 Document path tag in tail input 2018-06-21 18:02:34 -07:00
Daniel Nelson
523d761f34 Update changelog 2018-06-21 17:59:31 -07:00
JongHyok Lee
3f28add025 Added path tag to tail input plugin (#4292) 2018-06-21 17:55:54 -07:00
Daniel Nelson
ee6e4b0afd Run windows tests with -short 2018-06-21 17:46:58 -07:00
Patrick Hemmer
16454e25ba Fix postfix input handling of multi-level queues (#4333) 2018-06-21 16:01:38 -07:00
Daniel Nelson
2a1feb6db9 Update changelog 2018-06-21 14:20:35 -07:00
Ayrdrie
61e197d254 Add support for comma in logparser timestamp format (#4311) 2018-06-21 14:19:15 -07:00
28 changed files with 872 additions and 985 deletions

View File

@@ -20,6 +20,8 @@
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input. - [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin. - [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input. - [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
## v1.7.1 [unreleased] ## v1.7.1 [unreleased]
@@ -27,6 +29,7 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
## v1.7 [2018-06-12] ## v1.7 [2018-06-12]

59
Gopkg.lock generated
View File

@@ -317,10 +317,19 @@
[[projects]] [[projects]]
name = "github.com/gogo/protobuf" name = "github.com/gogo/protobuf"
packages = ["proto"] packages = [
"proto",
"sortkeys"
]
revision = "1adfc126b41513cc696b209667c8656ea7aac67c" revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0" version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/golang/glog"
packages = ["."]
revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
[[projects]] [[projects]]
name = "github.com/golang/protobuf" name = "github.com/golang/protobuf"
packages = [ packages = [
@@ -350,6 +359,12 @@
revision = "3af367b6b30c263d47e8895973edcca9a49cf029" revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
version = "v0.2.0" version = "v0.2.0"
[[projects]]
branch = "master"
name = "github.com/google/gofuzz"
packages = ["."]
revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1"
[[projects]] [[projects]]
name = "github.com/gorilla/context" name = "github.com/gorilla/context"
packages = ["."] packages = ["."]
@@ -925,6 +940,12 @@
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b" revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
version = "v3.0.5" version = "v3.0.5"
[[projects]]
name = "gopkg.in/inf.v0"
packages = ["."]
revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
version = "v0.9.1"
[[projects]] [[projects]]
name = "gopkg.in/ldap.v2" name = "gopkg.in/ldap.v2"
packages = ["."] packages = ["."]
@@ -965,9 +986,43 @@
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
version = "v2.2.1" version = "v2.2.1"
[[projects]]
name = "k8s.io/api"
packages = ["core/v1"]
revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a"
version = "kubernetes-1.9.0"
[[projects]]
branch = "release-1.11"
name = "k8s.io/apimachinery"
packages = [
"pkg/api/resource",
"pkg/apis/meta/v1",
"pkg/conversion",
"pkg/conversion/queryparams",
"pkg/fields",
"pkg/labels",
"pkg/runtime",
"pkg/runtime/schema",
"pkg/selection",
"pkg/types",
"pkg/util/errors",
"pkg/util/intstr",
"pkg/util/json",
"pkg/util/net",
"pkg/util/runtime",
"pkg/util/sets",
"pkg/util/validation",
"pkg/util/validation/field",
"pkg/util/wait",
"pkg/watch",
"third_party/forked/golang/reflect"
]
revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae"
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370" inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@@ -241,3 +241,15 @@
[[override]] [[override]]
source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
name = "gopkg.in/fsnotify.v1" name = "gopkg.in/fsnotify.v1"
[[constraint]]
name = "k8s.io/api"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/kubernetes"
version = "v1.11.0"

View File

@@ -54,11 +54,11 @@ fmtcheck:
@echo '[INFO] done.' @echo '[INFO] done.'
test-windows: test-windows:
go test ./plugins/inputs/ping/... go test -short ./plugins/inputs/ping/...
go test ./plugins/inputs/win_perf_counters/... go test -short ./plugins/inputs/win_perf_counters/...
go test ./plugins/inputs/win_services/... go test -short ./plugins/inputs/win_services/...
go test ./plugins/inputs/procstat/... go test -short ./plugins/inputs/procstat/...
go test ./plugins/inputs/ntpq/... go test -short ./plugins/inputs/ntpq/...
# vet runs the Go source code static analysis tool `vet` to find # vet runs the Go source code static analysis tool `vet` to find
# any common errors. # any common errors.

View File

@@ -48,6 +48,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_state"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/logparser" _ "github.com/influxdata/telegraf/plugins/inputs/logparser"

View File

@@ -0,0 +1,77 @@
### Line Protocol
### PODs
#### kube_pod
namespace =
name =
host_ip =
pod_ip =
node =
created_by_kind =
created_by_name =
owner_kind =
owner_name =
owner_is_controller = "true"
label_1 = ""
label_2 = ""
created = ""
start_time =
completion_time =
owner =
label_* =
created =
status_scheduled_time
#### kube_pod_status_scheduled_time
#### kube_pod_status_phase
#### kube_pod_status_ready
#### kube_pod_status_scheduled
#### kube_pod_container_info
namespace=
pod_name=
container_name=
#### kube_pod_container_status_waiting
#### kube_pod_container_status_waiting_reason
#### kube_pod_container_status_running
#### kube_pod_container_status_terminated
#### kube_pod_container_status_terminated_reason
#### kube_pod_container_status_ready
#### kube_pod_container_status_restarts_total
#### kube_pod_container_resource_requests
#### kube_pod_container_resource_limits
#### kube_pod_container_resource_requests_cpu_cores
#### kube_pod_container_resource_requests_memory_bytes
#### kube_pod_container_resource_limits_cpu_cores
#### kube_pod_container_resource_limits_memory_bytes
#### kube_pod_spec_volumes_persistentvolumeclaims_info
#### kube_pod_spec_volumes_persistentvolumeclaims_readonly

View File

@@ -0,0 +1,144 @@
package kube_state
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type client struct {
baseURL string
httpClient *http.Client
bearerToken string
semaphore chan struct{}
}
func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client {
return &client{
baseURL: baseURL,
httpClient: &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxConns,
TLSClientConfig: tlsConfig,
},
Timeout: timeout,
},
bearerToken: bearerToken,
semaphore: make(chan struct{}, maxConns),
}
}
func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) {
rList = new(metav1.APIResourceList)
if err = c.doGet(ctx, "", rList); err != nil {
return nil, err
}
if rList.GroupVersion == "" {
return nil, &APIError{
URL: c.baseURL,
StatusCode: http.StatusOK,
Title: "empty group version",
}
}
return rList, nil
}
func (c *client) getNodes(ctx context.Context) (list *v1.NodeList, err error) {
list = new(v1.NodeList)
if err = c.doGet(ctx, "/nodes/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getPods(ctx context.Context) (list *v1.PodList, err error) {
list = new(v1.PodList)
if err = c.doGet(ctx, "/pods/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getConfigMaps(ctx context.Context) (list *v1.ConfigMapList, err error) {
list = new(v1.ConfigMapList)
if err = c.doGet(ctx, "/configmaps/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
req, err := createGetRequest(c.baseURL+url, c.bearerToken)
if err != nil {
return err
}
select {
case c.semaphore <- struct{}{}:
break
case <-ctx.Done():
return ctx.Err()
}
resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
<-c.semaphore
return err
}
defer func() {
resp.Body.Close()
<-c.semaphore
}()
// Clear invalid token if unauthorized
if resp.StatusCode == http.StatusUnauthorized {
c.bearerToken = ""
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &APIError{
URL: url,
StatusCode: resp.StatusCode,
Title: resp.Status,
}
}
if resp.StatusCode == http.StatusNoContent {
return nil
}
return json.NewDecoder(resp.Body).Decode(v)
}
func createGetRequest(url string, token string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
req.Header.Add("Accept", "application/json")
return req, nil
}
type APIError struct {
URL string
StatusCode int
Title string
Description string
}
func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
}
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
}

View File

@@ -0,0 +1,42 @@
package kube_state
import (
"context"
"time"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var configMapMeasurement = "kube_configmap"
func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getConfigMaps(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, s := range list.Items {
if err = ks.gatherConfigMap(s, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error {
var creationTime time.Time
if !s.CreationTimestamp.IsZero() {
creationTime = s.CreationTimestamp.Time
}
fields := map[string]interface{}{
"gauge": 1,
}
tags := map[string]string{
"namespace": s.Namespace,
"configmap": s.Name,
"resource_version": s.ResourceVersion,
}
acc.AddFields(configMapMeasurement, fields, tags, creationTime)
return nil
}

View File

@@ -0,0 +1,194 @@
package kube_state
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KubenetesState represents the config object for the plugin.
type KubenetesState struct {
URL string
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
// MaxConnections for worker pool tcp connections
MaxConnections int `toml:"max_connections"`
// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
client *client
rListHash string
filter filter.Filter
lastFilterBuilt int64
ResourceListCheckInterval *internal.Duration `toml:"resouce_list_check_interval"`
ResourceExclude []string `toml:"resource_exclude"`
DisablePodNonGenericResourceMetrics bool `json:"disable_pod_non_generic_resource_metrics"`
DisableNodeNonGenericResourceMetrics bool `json:"disable_node_non_generic_resource_metrics"`
}
var sampleConfig = `
## URL for the kubelet
url = "http://1.1.1.1:10255"
## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Woker pool for kube_state_metric plugin only
# empty this field will use default value 30
# max_connections = 30
`
//SampleConfig returns a sample config
func (k *KubenetesState) SampleConfig() string {
return sampleConfig
}
//Description returns the description of this plugin
func (k *KubenetesState) Description() string {
return "Read metrics from the kubernetes kubelet api"
}
//Gather collects kubernetes metrics from a given URL
func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
var rList *metav1.APIResourceList
if k.client == nil {
if k.client, rList, err = k.initClient(); err != nil {
return err
}
goto buildFilter
}
if k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()) {
println("! skip to gather")
goto doGather
}
rList, err = k.client.getAPIResourceList(context.Background())
if err != nil {
return err
}
buildFilter:
k.lastFilterBuilt = time.Now().Unix()
if err = k.buildFilter(rList); err != nil {
return err
}
doGather:
for n, f := range availableCollectors {
ctx := context.Background()
if k.filter.Match(n) {
println("!", n)
go f(ctx, acc, k)
}
}
return nil
}
func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
hash, err := genHash(rList)
if err != nil {
return err
}
if k.rListHash == hash {
return nil
}
k.rListHash = hash
include := make([]string, len(rList.APIResources))
for k, v := range rList.APIResources {
include[k] = v.Name
}
k.filter, err = filter.NewIncludeExcludeFilter(include, k.ResourceExclude)
return err
}
func genHash(rList *metav1.APIResourceList) (string, error) {
buf := new(bytes.Buffer)
for _, v := range rList.APIResources {
if _, err := buf.WriteString(v.Name + "|"); err != nil {
return "", err
}
}
sum := md5.Sum(buf.Bytes())
return string(sum[:]), nil
}
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
// "cronjobs": RegisterCronJobCollector,
// "daemonsets": RegisterDaemonSetCollector,
// "deployments": RegisterDeploymentCollector,
// "jobs": RegisterJobCollector,
// "limitranges": RegisterLimitRangeCollector,
"nodes": registerNodeCollector,
"pods": registerPodCollector,
// "replicasets": RegisterReplicaSetCollector,
// "replicationcontrollers": RegisterReplicationControllerCollector,
// "resourcequotas": RegisterResourceQuotaCollector,
// "services": RegisterServiceCollector,
// "statefulsets": RegisterStatefulSetCollector,
// "persistentvolumes": RegisterPersistentVolumeCollector,
// "persistentvolumeclaims": RegisterPersistentVolumeClaimCollector,
// "namespaces": RegisterNamespaceCollector,
// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
// "endpoints": RegisterEndpointCollector,
// "secrets": RegisterSecretCollector,
"configmaps": registerConfigMapCollector,
}
func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return nil, nil, fmt.Errorf("error parse kube state metrics config[%s]: %v", k.URL, err)
}
// default 30 concurrent TCP connections
if k.MaxConnections == 0 {
k.MaxConnections = 30
}
// default check resourceList every hour
if k.ResourceListCheckInterval == nil {
k.ResourceListCheckInterval = &internal.Duration{
Duration: time.Hour,
}
}
c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
rList, err := c.getAPIResourceList(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("error connect to kubenetes api endpoint[%s]: %v", k.URL, err)
}
log.Printf("I! Kubenetes API group version is %s", rList.GroupVersion)
return c, rList, nil
}
func init() {
inputs.Add("kubernetes_state", func() telegraf.Input {
return &KubenetesState{}
})
}

View File

@@ -0,0 +1,73 @@
package kube_state
import (
"context"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var (
nodeMeasurement = "kube_node"
nodeTaintMeasurement = "kube_node_spec_taint"
)
func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getNodes(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, n := range list.Items {
if err = ks.gatherNode(n, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
fields := map[string]interface{}{}
tags := map[string]string{
"node": n.Name,
"kernel_version": n.Status.NodeInfo.KernelVersion,
"os_image": n.Status.NodeInfo.OSImage,
"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
"kubelet_version": n.Status.NodeInfo.KubeletVersion,
"kubeproxy_version": n.Status.NodeInfo.KubeProxyVersion,
"provider_id": n.Spec.ProviderID,
"spec_unschedulable": strconv.FormatBool(n.Spec.Unschedulable)
}
if !n.CreationTimestamp.IsZero() {
fields["created"] = n.CreationTimestamp.Unix()
}
for k, v := range n.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
// Collect node taints
for _, taint := range n.Spec.Taints {
go gatherNodeTaint(n, taint, acc)
}
acc.AddFields(nodeMeasurement, fields, tags)
return nil
}
func gatherNodeTaint(n v1.Node, taint v1.Taint,acc telegraf.Accumulator){
fields := map[string]interface{}{
"gauge":1,
}
tags := map[string]string{
"node": n.Name,
"key": taint.Key,
"value": taint.Value,
"effect":string(taint.Effect),
}
acc.AddFields(nodeTaintMeasurement, fields, tags)
}

View File

@@ -0,0 +1,202 @@
package kube_state
import (
"context"
"regexp"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/util/node"
)
var (
podMeasurement = "kube_pod"
podContainerMeasurement = "kube_pod_container"
podVolumeMeasurement = "kube_pod_spec_volumes"
)
func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getPods(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, p := range list.Items {
if err = ks.gatherPod(p, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
nodeName := p.Spec.NodeName
fields := make(map[string]interface{})
tags := make(map[string]string)
createdBy := metav1.GetControllerOf(&p)
createdByKind := ""
createdByName := ""
if createdBy != nil {
if createdBy.Kind != "" {
createdByKind = createdBy.Kind
}
if createdBy.Name != "" {
createdByName = createdBy.Name
}
}
if p.Status.StartTime != nil {
fields["start_time"] = p.Status.StartTime.UnixNano()
}
tags["namesapce"] = p.Namespace
tags["name"] = p.Name
tags["host_ip"] = p.Status.HostIP
tags["pod_ip"] = p.Status.PodIP
tags["node"] = nodeName
tags["created_by_kind"] = createdByKind
tags["created_by_name"] = createdByName
tags["status_scheduled"] = "false"
tags["status_ready"] = "false"
owners := p.GetOwnerReferences()
if len(owners) == 0 {
tags["owner_kind"] = ""
tags["owner_name"] = ""
tags["owner_is_controller"] = ""
} else {
tags["owner_kind"] = owners[0].Kind
tags["owner_name"] = owners[0].Name
if owners[0].Controller != nil {
tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller)
} else {
tags["owner_is_controller"] = "false"
}
}
for k, v := range p.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
if phase := p.Status.Phase; phase != "" {
tags["status_phase"] = string(phase)
// This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601
// For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410
if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason {
tags["status_phase"] = string(v1.PodUnknown)
}
}
if !p.CreationTimestamp.IsZero() {
fields["created"] = p.CreationTimestamp.Unix()
}
for _, c := range p.Status.Conditions {
switch c.Type {
case v1.PodReady:
tags["status_ready"] = "true"
case v1.PodScheduled:
tags["status_scheduled"] = "true"
fields["status_scheduled_time"] = c.LastTransitionTime.Unix()
}
}
var lastFinishTime int64
for i, cs := range p.Status.ContainerStatuses {
c := p.Spec.Containers[i]
gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc)
}
if lastFinishTime > 0 {
fields["completion_time"] = lastFinishTime
}
for _, v := range p.Spec.Volumes {
if v.PersistentVolumeClaim != nil {
gatherPodVolume(v, p, acc)
}
}
acc.AddFields(podMeasurement, fields, tags)
return nil
}
func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"read_only": 0.0,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod": p.Name,
"volume": v.Name,
"persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName,
}
if v.PersistentVolumeClaim.ReadOnly {
fields["read_only"] = 1.0
}
acc.AddFields(podVolumeMeasurement, fields, tags)
}
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"status_restarts_total": cs.RestartCount,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod_name": p.Name,
"node_name": nodeName,
"container": c.Name,
"image": cs.Image,
"image_id": cs.ImageID,
"container_id": cs.ContainerID,
"status_waiting": strconv.FormatBool(cs.State.Waiting != nil),
"status_waiting_reason": "",
"status_running": strconv.FormatBool(cs.State.Terminated != nil),
"status_terminated": strconv.FormatBool(cs.State.Running != nil),
"status_terminated_reason": "",
"container_status_ready": strconv.FormatBool(cs.Ready),
}
if cs.State.Waiting != nil {
tags["status_waiting_reason"] = cs.State.Waiting.Reason
}
if cs.State.Terminated != nil {
tags["status_terminated_reason"] = cs.State.Terminated.Reason
if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() {
*lastFinishTime = cs.State.Terminated.FinishedAt.Unix()
}
}
req := c.Resources.Requests
lim := c.Resources.Limits
for resourceName, val := range req {
switch resourceName {
case v1.ResourceCPU:
fields["resource_requests_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
for resourceName, val := range lim {
switch resourceName {
case v1.ResourceCPU:
fields["resource_limits_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
acc.AddFields(podContainerMeasurement, fields, tags)
}
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
func sanitizeLabelName(s string) string {
return invalidLabelCharRE.ReplaceAllString(s, "_")
}

View File

@@ -108,7 +108,9 @@ You must capture at least one field per line.
- ts-"CUSTOM" - ts-"CUSTOM"
CUSTOM time layouts must be within quotes and be the representation of the CUSTOM time layouts must be within quotes and be the representation of the
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006` "reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
To match a comma decimal point you can use a period in the pattern string.
See https://golang.org/pkg/time/#Parse for more details. See https://golang.org/pkg/time/#Parse for more details.
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns), Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),

View File

@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc) ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts

View File

@@ -982,3 +982,21 @@ func TestSyslogTimestampParser(t *testing.T) {
require.NotNil(t, m) require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year()) require.Equal(t, 2018, m.Time().Year())
} }
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path" "path/filepath"
"strings" "strings"
"time" "time"
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil return strings.TrimSpace(string(qd)), nil
} }
func qScan(path string) (int64, int64, int64, error) { func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
var length, size int64 var length, size int64
var oldest time.Time var oldest time.Time
for _, finfo := range finfos { err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++ length++
size += finfo.Size() size += finfo.Size()
ctime := statCTime(finfo.Sys()) ctime := statCTime(finfo.Sys())
if ctime.IsZero() { if ctime.IsZero() {
continue return nil
} }
if oldest.IsZero() || ctime.Before(oldest) { if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime oldest = ctime
} }
return nil
})
if err != nil {
return 0, 0, 0, err
} }
var age int64 var age int64
if !oldest.IsZero() { if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second) age = int64(time.Now().Sub(oldest) / time.Second)
} else if len(finfos) != 0 { } else if length != 0 {
// system doesn't support ctime // system doesn't support ctime
age = -1 age = -1
} }
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
} }
} }
for _, q := range []string{"active", "hold", "incoming", "maildrop"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, q)) length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue continue
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
} }
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil return nil
} }

View File

@@ -3,7 +3,7 @@ package postfix
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path/filepath"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(td) defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
require.NoError(t, os.Mkdir(path.Join(td, q), 0755)) require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
} }
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
p := Postfix{ p := Postfix{
QueueDirectory: td, QueueDirectory: td,

View File

@@ -1,8 +1,6 @@
# Procstat Input Plugin # Procstat Input Plugin
The procstat plugin can be used to monitor the system resource usage of one or more processes. The procstat plugin can be used to monitor the system resource usage of one or more processes.
The procstat_lookup metric displays the query information,
specifically the number of PIDs returned on a search
Processes can be selected for monitoring using one of several methods: Processes can be selected for monitoring using one of several methods:
- pidfile - pidfile

View File

@@ -1,117 +0,0 @@
package reader
import (
"io/ioutil"
"log"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Reader struct {
Filepaths []string `toml:"files"`
FromBeginning bool
DataFormat string `toml:"data_format"`
ParserConfig parsers.Config
Parser parsers.Parser
Tags []string
Filenames []string
//for grok parser
Patterns []string
namedPatterns []string
CustomPatterns string
CustomPatternFiles []string
}
const sampleConfig = `## Files to parse.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""`
// SampleConfig returns the default configuration of the Input
func (r *Reader) SampleConfig() string {
return sampleConfig
}
func (r *Reader) Description() string {
return "reload and gather from file[s] on telegraf's interval"
}
func (r *Reader) Gather(acc telegraf.Accumulator) error {
r.refreshFilePaths()
for _, k := range r.Filenames {
metrics, err := r.readMetric(k)
if err != nil {
return err
}
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags())
}
}
return nil
}
func (r *Reader) compileParser() {
if r.DataFormat == "" {
log.Printf("E! No data_format specified")
return
}
r.ParserConfig = parsers.Config{
DataFormat: r.DataFormat,
TagKeys: r.Tags,
//grok settings
Patterns: r.Patterns,
NamedPatterns: r.namedPatterns,
CustomPatterns: r.CustomPatterns,
CustomPatternFiles: r.CustomPatternFiles,
}
nParser, err := parsers.NewParser(&r.ParserConfig)
if err != nil {
log.Printf("E! Error building parser: %v", err)
}
r.Parser = nParser
}
func (r *Reader) refreshFilePaths() {
var allFiles []string
for _, filepath := range r.Filepaths {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
for k := range files {
allFiles = append(allFiles, k)
}
}
r.Filenames = allFiles
}
//requires that Parser has been compiled
func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) {
fileContents, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("E! File could not be opened: %v", filename)
}
return r.Parser.Parse(fileContents)
}

View File

@@ -1,58 +0,0 @@
package reader
import (
"log"
"runtime"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestRefreshFilePaths(t *testing.T) {
testDir := getPluginDir()
r := Reader{
Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"},
}
r.refreshFilePaths()
//log.Printf("filenames: %v", filenames)
assert.Equal(t, len(r.Filenames), 2)
}
func TestJSONParserCompile(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"},
DataFormat: "json",
Tags: []string{"parent_ignored_child"},
}
r.compileParser()
r.Gather(&acc)
log.Printf("acc: %v", acc.Metrics[0].Tags)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"},
DataFormat: "grok",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
r.compileParser()
err := r.Gather(&acc)
log.Printf("err: %v", err)
log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields)
log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields)
assert.Equal(t, 2, len(acc.Metrics))
}
func getPluginDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "/reader/reader_test.go", "", 1)
}

View File

@@ -1,2 +0,0 @@
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45

View File

@@ -1,14 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,4 +1,4 @@
# tail Input Plugin # Tail Input Plugin
The tail plugin "tails" a logfile and parses each log message. The tail plugin "tails" a logfile and parses each log message.
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
data_format = "influx" data_format = "influx"
``` ```
### Metrics:
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
m, err = t.parser.ParseLine(text) m, err = t.parser.ParseLine(text)
if err == nil { if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else { } else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err)) tailer.Filename, line.Text, err))

View File

@@ -1,73 +0,0 @@
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}

View File

@@ -1,78 +0,0 @@
package grok
// DEFAULT_PATTERNS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns
const DEFAULT_PATTERNS = `
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPV6}|%{IPV4}|%{HOSTNAME}|%{HOSTPORT})
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
`

View File

@@ -1,527 +0,0 @@
package grok
import (
"bufio"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/vjeantet/grok"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
var timeLayouts = map[string]string{
"ts-ansic": "Mon Jan _2 15:04:05 2006",
"ts-unix": "Mon Jan _2 15:04:05 MST 2006",
"ts-ruby": "Mon Jan 02 15:04:05 -0700 2006",
"ts-rfc822": "02 Jan 06 15:04 MST",
"ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone
"ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST",
"ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST",
"ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone
"ts-rfc3339": "2006-01-02T15:04:05Z07:00",
"ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00",
"ts-httpd": "02/Jan/2006:15:04:05 -0700",
// These three are not exactly "layouts", but they are special cases that
// will get handled in the ParseLine function.
"ts-epoch": "EPOCH",
"ts-epochnano": "EPOCH_NANO",
"ts-syslog": "SYSLOG_TIMESTAMP",
"ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts.
}
const (
INT = "int"
TAG = "tag"
FLOAT = "float"
STRING = "string"
DURATION = "duration"
DROP = "drop"
EPOCH = "EPOCH"
EPOCH_NANO = "EPOCH_NANO"
SYSLOG_TIMESTAMP = "SYSLOG_TIMESTAMP"
GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP"
)
var (
// matches named captures that contain a modifier.
// ie,
// %{NUMBER:bytes:int}
// %{IPORHOST:clientip:tag}
// %{HTTPDATE:ts1:ts-http}
// %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"}
modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
// matches a plain pattern name. ie, %{NUMBER}
patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
)
// Parser is the primary struct to handle and grok-patterns defined in the config toml
type Parser struct {
Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
Measurement string
// Timezone is an optional component to help render log dates to
// your chosen zone.
// Default: "" which renders UTC
// Options are as follows:
// 1. Local -- interpret based on machine localtime
// 2. "America/Chicago" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
// 3. UTC -- or blank/unspecified, will return timestamp in UTC
Timezone string
loc *time.Location
// typeMap is a map of patterns -> capture name -> modifier,
// ie, {
// "%{TESTLOG}":
// {
// "bytes": "int",
// "clientip": "tag"
// }
// }
typeMap map[string]map[string]string
// tsMap is a map of patterns -> capture name -> timestamp layout.
// ie, {
// "%{TESTLOG}":
// {
// "httptime": "02/Jan/2006:15:04:05 -0700"
// }
// }
tsMap map[string]map[string]string
// patterns is a map of all of the parsed patterns from CustomPatterns
// and CustomPatternFiles.
// ie, {
// "DURATION": "%{NUMBER}[nuµm]?s"
// "RESPONSE_CODE": "%{NUMBER:rc:tag}"
// }
patterns map[string]string
// foundTsLayouts is a slice of timestamp patterns that have been found
// in the log lines. This slice gets updated if the user uses the generic
// 'ts' modifier for timestamps. This slice is checked first for matches,
// so that previously-matched layouts get priority over all other timestamp
// layouts.
foundTsLayouts []string
timeFunc func() time.Time
g *grok.Grok
tsModder *tsModder
}
// Compile is a bound method to Parser which will process the options for our parser
func (p *Parser) Compile() error {
p.typeMap = make(map[string]map[string]string)
p.tsMap = make(map[string]map[string]string)
p.patterns = make(map[string]string)
p.tsModder = &tsModder{}
var err error
p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true})
if err != nil {
return err
}
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.NamedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns {
pattern = strings.TrimSpace(pattern)
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}")
}
if len(p.NamedPatterns) == 0 {
return fmt.Errorf("pattern required")
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}
// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, fileErr := os.Open(filename)
if fileErr != nil {
return fileErr
}
scanner := bufio.NewScanner(bufio.NewReader(file))
p.addCustomPatterns(scanner)
}
if p.Measurement == "" {
p.Measurement = "logparser_grok"
}
p.loc, err = time.LoadLocation(p.Timezone)
if err != nil {
log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
p.loc, _ = time.LoadLocation("UTC")
}
if p.timeFunc == nil {
p.timeFunc = time.Now
}
return p.compileCustomPatterns()
}
// ParseLine is the primary function to process individual lines, returning the metrics
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var err error
// values are the parsed fields from the log line
var values map[string]string
// the matching pattern string
var patternName string
for _, pattern := range p.NamedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err
}
if len(values) != 0 {
patternName = pattern
break
}
}
if len(values) == 0 {
log.Printf("D! Grok no match found for: %q", line)
return nil, nil
}
fields := make(map[string]interface{})
tags := make(map[string]string)
timestamp := time.Now()
for k, v := range values {
if k == "" || v == "" {
continue
}
// t is the modifier of the field
var t string
// check if pattern has some modifiers
if types, ok := p.typeMap[patternName]; ok {
t = types[k]
}
// if we didn't find a modifier, check if we have a timestamp layout
if t == "" {
if ts, ok := p.tsMap[patternName]; ok {
// check if the modifier is a timestamp layout
if layout, ok := ts[k]; ok {
t = layout
}
}
}
// if we didn't find a type OR timestamp modifier, assume string
if t == "" {
t = STRING
}
switch t {
case INT:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
fields[k] = iv
}
case FLOAT:
fv, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("E! Error parsing %s to float: %s", v, err)
} else {
fields[k] = fv
}
case DURATION:
d, err := time.ParseDuration(v)
if err != nil {
log.Printf("E! Error parsing %s to duration: %s", v, err)
} else {
fields[k] = int64(d)
}
case TAG:
tags[k] = v
case STRING:
fields[k] = strings.Trim(v, `"`)
case EPOCH:
parts := strings.SplitN(v, ".", 2)
if len(parts) == 0 {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
sec, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts := time.Unix(sec, 0)
if len(parts) == 2 {
padded := fmt.Sprintf("%-9s", parts[1])
nsString := strings.Replace(padded[:9], " ", "0", -1)
nanosec, err := strconv.ParseInt(nsString, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts = ts.Add(time.Duration(nanosec) * time.Nanosecond)
}
timestamp = ts
case EPOCH_NANO:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
timestamp = time.Unix(0, iv)
}
case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)
}
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
case GENERIC_TIMESTAMP:
var foundTs bool
// first try timestamp layouts that we've already found
for _, layout := range p.foundTsLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
break
}
}
// if we haven't found a timestamp layout yet, try all timestamp
// layouts.
if !foundTs {
for _, layout := range timeLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
p.foundTsLayouts = append(p.foundTsLayouts, layout)
break
}
}
}
// if we still haven't found a timestamp layout, log it and we will
// just use time.Now()
if !foundTs {
log.Printf("E! Error parsing timestamp [%s], could not find any "+
"suitable time layouts.", v)
}
case DROP:
// goodbye!
default:
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
}
}
if len(fields) == 0 {
return nil, fmt.Errorf("logparser_grok: must have one or more fields")
}
return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
lines := strings.Split(string(buf), "\n")
var metrics []telegraf.Metric
for _, line := range lines {
m, err := p.ParseLine(line)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
//needs implementation
}
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) > 0 && line[0] != '#' {
names := strings.SplitN(line, " ", 2)
p.patterns[names[0]] = names[1]
}
}
}
func (p *Parser) compileCustomPatterns() error {
var err error
// check if the pattern contains a subpattern that is already defined
// replace it with the subpattern for modifier inheritance.
for i := 0; i < 2; i++ {
for name, pattern := range p.patterns {
subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1)
for _, subName := range subNames {
if subPattern, ok := p.patterns[subName[1]]; ok {
pattern = strings.Replace(pattern, subName[0], subPattern, 1)
}
}
p.patterns[name] = pattern
}
}
// check if pattern contains modifiers. Parse them out if it does.
for name, pattern := range p.patterns {
if modifierRe.MatchString(pattern) {
// this pattern has modifiers, so parse out the modifiers
pattern, err = p.parseTypedCaptures(name, pattern)
if err != nil {
return err
}
p.patterns[name] = pattern
}
}
return p.g.AddPatternsFromMap(p.patterns)
}
// parseTypedCaptures parses the capture modifiers, and then deletes the
// modifier from the line so that it is a valid "grok" pattern again.
// ie,
// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int)
// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag)
func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
matches := modifierRe.FindAllStringSubmatch(pattern, -1)
// grab the name of the capture pattern
patternName := "%{" + name + "}"
// create type map for this pattern
p.typeMap[patternName] = make(map[string]string)
p.tsMap[patternName] = make(map[string]string)
// boolean to verify that each pattern only has a single ts- data type.
hasTimestamp := false
for _, match := range matches {
// regex capture 1 is the name of the capture
// regex capture 2 is the modifier of the capture
if strings.HasPrefix(match[2], "ts") {
if hasTimestamp {
return pattern, fmt.Errorf("logparser pattern compile error: "+
"Each pattern is allowed only one named "+
"timestamp data type. pattern: %s", pattern)
}
if layout, ok := timeLayouts[match[2]]; ok {
// built-in time format
p.tsMap[patternName][match[1]] = layout
} else {
// custom time format
p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`)
}
hasTimestamp = true
} else {
p.typeMap[patternName][match[1]] = match[2]
}
// the modifier is not a valid part of a "grok" pattern, so remove it
// from the pattern.
pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1)
}
return pattern, nil
}
// tsModder is a struct for incrementing identical timestamps of log lines
// so that we don't push identical metrics that will get overwritten.
type tsModder struct {
dupe time.Time
last time.Time
incr time.Duration
incrn time.Duration
rollover time.Duration
}
// tsMod increments the given timestamp one unit more from the previous
// duplicate timestamp.
// the increment unit is determined as the next smallest time unit below the
// most significant time unit of ts.
// ie, if the input is at ms precision, it will increment it 1µs.
func (t *tsModder) tsMod(ts time.Time) time.Time {
defer func() { t.last = ts }()
// don't mod the time if we don't need to
if t.last.IsZero() || ts.IsZero() {
t.incrn = 0
t.rollover = 0
return ts
}
if !ts.Equal(t.last) && !ts.Equal(t.dupe) {
t.incr = 0
t.incrn = 0
t.rollover = 0
return ts
}
if ts.Equal(t.last) {
t.dupe = ts
}
if ts.Equal(t.dupe) && t.incr == time.Duration(0) {
tsNano := ts.UnixNano()
d := int64(10)
counter := 1
for {
a := tsNano % d
if a > 0 {
break
}
d = d * 10
counter++
}
switch {
case counter <= 6:
t.incr = time.Nanosecond
case counter <= 9:
t.incr = time.Microsecond
case counter > 9:
t.incr = time.Millisecond
}
}
t.incrn++
if t.incrn == 999 && t.incr > time.Nanosecond {
t.rollover = t.incr * t.incrn
t.incrn = 1
t.incr = t.incr / 1000
if t.incr < time.Nanosecond {
t.incr = time.Nanosecond
}
}
return ts.Add(t.incr*t.incrn + t.rollover)
}

View File

@@ -1,19 +0,0 @@
package grok
import (
"log"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGrokParse(t *testing.T) {
parser := Parser{
Measurement: "t_met",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
parser.Compile()
metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`))
log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields())
assert.NoError(t, err)
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -88,12 +87,6 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values // an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags // used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string DropwizardTagPathsMap map[string]string
//grok patterns
Patterns []string
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@@ -127,36 +120,12 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "grok":
parser, err = NewGrokParser(
config.MetricName,
config.Patterns,
config.NamedPatterns,
config.CustomPatterns,
config.CustomPatternFiles)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
return parser, err return parser, err
} }
func NewGrokParser(metricName string,
patterns []string,
nPatterns []string,
cPatterns string,
cPatternFiles []string) (Parser, error) {
parser := grok.Parser{
Measurement: metricName,
Patterns: patterns,
NamedPatterns: nPatterns,
CustomPatterns: cPatterns,
CustomPatternFiles: cPatternFiles,
}
parser.Compile()
return &parser, nil
}
func NewJSONParser( func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,