589 lines
12 KiB
Go
589 lines
12 KiB
Go
package metric
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"sort"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
// TODO remove
|
|
"github.com/influxdata/influxdb/client/v2"
|
|
)
|
|
|
|
const MaxInt = int(^uint(0) >> 1)
|
|
|
|
func New(
|
|
name string,
|
|
tags map[string]string,
|
|
fields map[string]interface{},
|
|
t time.Time,
|
|
mType ...telegraf.ValueType,
|
|
) (telegraf.Metric, error) {
|
|
if len(fields) == 0 {
|
|
return nil, fmt.Errorf("Metric cannot be made without any fields")
|
|
}
|
|
if len(name) == 0 {
|
|
return nil, fmt.Errorf("Metric cannot be made with an empty name")
|
|
}
|
|
|
|
var thisType telegraf.ValueType
|
|
if len(mType) > 0 {
|
|
thisType = mType[0]
|
|
} else {
|
|
thisType = telegraf.Untyped
|
|
}
|
|
|
|
m := &metric{
|
|
name: []byte(escape(name, "name")),
|
|
t: []byte(fmt.Sprint(t.UnixNano())),
|
|
nsec: t.UnixNano(),
|
|
mType: thisType,
|
|
}
|
|
|
|
// pre-allocate exact size of the tags slice
|
|
taglen := 0
|
|
for k, v := range tags {
|
|
// TODO check that length of tag key & value are > 0
|
|
taglen += 2 + len(escape(k, "tagkey")) + len(escape(v, "tagval"))
|
|
}
|
|
m.tags = make([]byte, taglen)
|
|
|
|
i := 0
|
|
for k, v := range tags {
|
|
m.tags[i] = ','
|
|
i++
|
|
i += copy(m.tags[i:], escape(k, "tagkey"))
|
|
m.tags[i] = '='
|
|
i++
|
|
i += copy(m.tags[i:], escape(v, "tagval"))
|
|
}
|
|
|
|
// pre-allocate capacity of the fields slice
|
|
fieldlen := 0
|
|
for k, _ := range fields {
|
|
// 10 bytes is completely arbitrary, but will at least prevent some
|
|
// amount of allocations. There's a small possibility this will create
|
|
// slightly more allocations for a metric that has many short fields.
|
|
fieldlen += len(k) + 10
|
|
}
|
|
m.fields = make([]byte, 0, fieldlen)
|
|
|
|
i = 0
|
|
for k, v := range fields {
|
|
if i != 0 {
|
|
m.fields = append(m.fields, ',')
|
|
}
|
|
m.fields = appendField(m.fields, k, v)
|
|
i++
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// indexUnescapedByte finds the index of the first byte equal to b in buf that
|
|
// is not escaped. Returns -1 if not found.
|
|
func indexUnescapedByte(buf []byte, b byte) int {
|
|
var keyi int
|
|
for {
|
|
i := bytes.IndexByte(buf[keyi:], b)
|
|
if i == -1 {
|
|
return -1
|
|
} else if i == 0 {
|
|
break
|
|
}
|
|
keyi += i
|
|
if countBackslashes(buf, keyi-1)%2 == 0 {
|
|
break
|
|
} else {
|
|
keyi++
|
|
}
|
|
}
|
|
return keyi
|
|
}
|
|
|
|
// countBackslashes counts the number of preceding backslashes starting at
|
|
// the 'start' index.
|
|
func countBackslashes(buf []byte, index int) int {
|
|
var count int
|
|
for {
|
|
if index < 0 {
|
|
return count
|
|
}
|
|
if buf[index] == '\\' {
|
|
count++
|
|
index--
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
type metric struct {
|
|
name []byte
|
|
tags []byte
|
|
fields []byte
|
|
t []byte
|
|
|
|
mType telegraf.ValueType
|
|
aggregate bool
|
|
|
|
// cached values for reuse in "get" functions
|
|
hashID uint64
|
|
nsec int64
|
|
}
|
|
|
|
func (m *metric) Point() *client.Point {
|
|
c, _ := client.NewPoint(m.Name(), m.Tags(), m.Fields(), m.Time())
|
|
return c
|
|
}
|
|
|
|
func (m *metric) String() string {
|
|
return string(m.name) + string(m.tags) + " " + string(m.fields) + " " + string(m.t) + "\n"
|
|
}
|
|
|
|
func (m *metric) SetAggregate(b bool) {
|
|
m.aggregate = b
|
|
}
|
|
|
|
func (m *metric) IsAggregate() bool {
|
|
return m.aggregate
|
|
}
|
|
|
|
func (m *metric) Type() telegraf.ValueType {
|
|
return m.mType
|
|
}
|
|
|
|
func (m *metric) Len() int {
|
|
// 3 is for 2 spaces surrounding the fields array + newline at the end.
|
|
return len(m.name) + len(m.tags) + len(m.fields) + len(m.t) + 3
|
|
}
|
|
|
|
func (m *metric) Serialize() []byte {
|
|
tmp := make([]byte, m.Len())
|
|
i := 0
|
|
i += copy(tmp[i:], m.name)
|
|
i += copy(tmp[i:], m.tags)
|
|
tmp[i] = ' '
|
|
i++
|
|
i += copy(tmp[i:], m.fields)
|
|
tmp[i] = ' '
|
|
i++
|
|
i += copy(tmp[i:], m.t)
|
|
tmp[i] = '\n'
|
|
return tmp
|
|
}
|
|
|
|
func (m *metric) SerializeTo(dst []byte) int {
|
|
i := 0
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
i += copy(dst[i:], m.name)
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
i += copy(dst[i:], m.tags)
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
dst[i] = ' '
|
|
i++
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
i += copy(dst[i:], m.fields)
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
dst[i] = ' '
|
|
i++
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
|
|
i += copy(dst[i:], m.t)
|
|
if i >= len(dst) {
|
|
return i
|
|
}
|
|
dst[i] = '\n'
|
|
|
|
return i + 1
|
|
}
|
|
|
|
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
|
if m.Len() < maxSize {
|
|
return []telegraf.Metric{m}
|
|
}
|
|
var out []telegraf.Metric
|
|
|
|
// constant number of bytes for each metric (in addition to field bytes)
|
|
constant := len(m.name) + len(m.tags) + len(m.t) + 3
|
|
// currently selected fields
|
|
fields := make([]byte, 0, maxSize)
|
|
|
|
i := 0
|
|
for {
|
|
if i >= len(m.fields) {
|
|
// hit the end of the field byte slice
|
|
if len(fields) > 0 {
|
|
out = append(out, copyWith(m.name, m.tags, fields, m.t))
|
|
}
|
|
break
|
|
}
|
|
|
|
// find the end of the next field
|
|
j := indexUnescapedByte(m.fields[i:], ',')
|
|
if j == -1 {
|
|
j = len(m.fields)
|
|
} else {
|
|
j += i
|
|
}
|
|
|
|
// if true, then we need to create a metric _not_ including the currently
|
|
// selected field
|
|
if len(m.fields[i:j])+len(fields)+constant > maxSize {
|
|
// if false, then we'll create a metric including the currently
|
|
// selected field anyways. This means that the given maxSize is too
|
|
// small for a single field to fit.
|
|
if len(fields) > 0 {
|
|
out = append(out, copyWith(m.name, m.tags, fields, m.t))
|
|
}
|
|
|
|
fields = make([]byte, 0, maxSize)
|
|
}
|
|
if len(fields) > 0 {
|
|
fields = append(fields, ',')
|
|
}
|
|
fields = append(fields, m.fields[i:j]...)
|
|
|
|
i = j + 1
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (m *metric) Fields() map[string]interface{} {
|
|
fieldMap := map[string]interface{}{}
|
|
i := 0
|
|
for {
|
|
if i >= len(m.fields) {
|
|
break
|
|
}
|
|
// end index of field key
|
|
i1 := indexUnescapedByte(m.fields[i:], '=')
|
|
if i1 == -1 {
|
|
break
|
|
}
|
|
// start index of field value
|
|
i2 := i1 + 1
|
|
|
|
// end index of field value
|
|
var i3 int
|
|
if m.fields[i:][i2] == '"' {
|
|
i3 = indexUnescapedByte(m.fields[i:][i2+1:], '"')
|
|
if i3 == -1 {
|
|
i3 = len(m.fields[i:])
|
|
}
|
|
i3 += i2 + 2 // increment index to the comma
|
|
} else {
|
|
i3 = indexUnescapedByte(m.fields[i:], ',')
|
|
if i3 == -1 {
|
|
i3 = len(m.fields[i:])
|
|
}
|
|
}
|
|
|
|
switch m.fields[i:][i2] {
|
|
case '"':
|
|
// string field
|
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
|
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
|
// number field
|
|
switch m.fields[i:][i3-1] {
|
|
case 'i':
|
|
// integer field
|
|
n, err := parseIntBytes(m.fields[i:][i2:i3-1], 10, 64)
|
|
if err == nil {
|
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = n
|
|
} else {
|
|
// TODO handle error or just ignore field silently?
|
|
}
|
|
default:
|
|
// float field
|
|
n, err := parseFloatBytes(m.fields[i:][i2:i3], 64)
|
|
if err == nil {
|
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = n
|
|
} else {
|
|
// TODO handle error or just ignore field silently?
|
|
}
|
|
}
|
|
case 'T', 't':
|
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = true
|
|
case 'F', 'f':
|
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = false
|
|
default:
|
|
// TODO handle unsupported field type
|
|
}
|
|
|
|
i += i3 + 1
|
|
}
|
|
|
|
return fieldMap
|
|
}
|
|
|
|
func (m *metric) Tags() map[string]string {
|
|
tagMap := map[string]string{}
|
|
if len(m.tags) == 0 {
|
|
return tagMap
|
|
}
|
|
|
|
i := 0
|
|
for {
|
|
// start index of tag key
|
|
i0 := indexUnescapedByte(m.tags[i:], ',') + 1
|
|
if i0 == 0 {
|
|
// didn't find a tag start
|
|
break
|
|
}
|
|
// end index of tag key
|
|
i1 := indexUnescapedByte(m.tags[i:], '=')
|
|
// start index of tag value
|
|
i2 := i1 + 1
|
|
// end index of tag value (starting from i2)
|
|
i3 := indexUnescapedByte(m.tags[i+i2:], ',')
|
|
if i3 == -1 {
|
|
tagMap[unescape(string(m.tags[i:][i0:i1]), "tagkey")] = unescape(string(m.tags[i:][i2:]), "tagval")
|
|
break
|
|
}
|
|
tagMap[unescape(string(m.tags[i:][i0:i1]), "tagkey")] = unescape(string(m.tags[i:][i2:i2+i3]), "tagval")
|
|
// increment start index for the next tag
|
|
i += i2 + i3
|
|
}
|
|
|
|
return tagMap
|
|
}
|
|
|
|
func (m *metric) Name() string {
|
|
return unescape(string(m.name), "name")
|
|
}
|
|
|
|
func (m *metric) Time() time.Time {
|
|
// assume metric has been verified already and ignore error:
|
|
if m.nsec == 0 {
|
|
m.nsec, _ = parseIntBytes(m.t, 10, 64)
|
|
}
|
|
return time.Unix(0, m.nsec)
|
|
}
|
|
|
|
func (m *metric) UnixNano() int64 {
|
|
// assume metric has been verified already and ignore error:
|
|
if m.nsec == 0 {
|
|
m.nsec, _ = parseIntBytes(m.t, 10, 64)
|
|
}
|
|
return m.nsec
|
|
}
|
|
|
|
func (m *metric) SetName(name string) {
|
|
m.hashID = 0
|
|
m.name = []byte(nameEscaper.Replace(name))
|
|
}
|
|
|
|
func (m *metric) SetPrefix(prefix string) {
|
|
m.hashID = 0
|
|
m.name = append([]byte(nameEscaper.Replace(prefix)), m.name...)
|
|
}
|
|
|
|
func (m *metric) SetSuffix(suffix string) {
|
|
m.hashID = 0
|
|
m.name = append(m.name, []byte(nameEscaper.Replace(suffix))...)
|
|
}
|
|
|
|
func (m *metric) AddTag(key, value string) {
|
|
m.RemoveTag(key)
|
|
m.tags = append(m.tags, []byte(","+escape(key, "tagkey")+"="+escape(value, "tagval"))...)
|
|
}
|
|
|
|
func (m *metric) HasTag(key string) bool {
|
|
i := bytes.Index(m.tags, []byte(escape(key, "tagkey")+"="))
|
|
if i == -1 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *metric) RemoveTag(key string) {
|
|
m.hashID = 0
|
|
|
|
i := bytes.Index(m.tags, []byte(escape(key, "tagkey")+"="))
|
|
if i == -1 {
|
|
return
|
|
}
|
|
|
|
tmp := m.tags[0 : i-1]
|
|
j := indexUnescapedByte(m.tags[i:], ',')
|
|
if j != -1 {
|
|
tmp = append(tmp, m.tags[i+j:]...)
|
|
}
|
|
m.tags = tmp
|
|
return
|
|
}
|
|
|
|
func (m *metric) AddField(key string, value interface{}) {
|
|
m.fields = append(m.fields, ',')
|
|
m.fields = appendField(m.fields, key, value)
|
|
}
|
|
|
|
func (m *metric) HasField(key string) bool {
|
|
i := bytes.Index(m.fields, []byte(escape(key, "tagkey")+"="))
|
|
if i == -1 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (m *metric) RemoveField(key string) error {
|
|
i := bytes.Index(m.fields, []byte(escape(key, "tagkey")+"="))
|
|
if i == -1 {
|
|
return nil
|
|
}
|
|
|
|
var tmp []byte
|
|
if i != 0 {
|
|
tmp = m.fields[0 : i-1]
|
|
}
|
|
j := indexUnescapedByte(m.fields[i:], ',')
|
|
if j != -1 {
|
|
tmp = append(tmp, m.fields[i+j:]...)
|
|
}
|
|
|
|
if len(tmp) == 0 {
|
|
return fmt.Errorf("Metric cannot remove final field: %s", m.fields)
|
|
}
|
|
|
|
m.fields = tmp
|
|
return nil
|
|
}
|
|
|
|
func (m *metric) Copy() telegraf.Metric {
|
|
return copyWith(m.name, m.tags, m.fields, m.t)
|
|
}
|
|
|
|
func copyWith(name, tags, fields, t []byte) telegraf.Metric {
|
|
out := metric{
|
|
name: make([]byte, len(name)),
|
|
tags: make([]byte, len(tags)),
|
|
fields: make([]byte, len(fields)),
|
|
t: make([]byte, len(t)),
|
|
}
|
|
copy(out.name, name)
|
|
copy(out.tags, tags)
|
|
copy(out.fields, fields)
|
|
copy(out.t, t)
|
|
return &out
|
|
}
|
|
|
|
func (m *metric) HashID() uint64 {
|
|
if m.hashID == 0 {
|
|
h := fnv.New64a()
|
|
h.Write(m.name)
|
|
|
|
tags := m.Tags()
|
|
tmp := make([]string, len(tags))
|
|
i := 0
|
|
for k, v := range tags {
|
|
tmp[i] = k + v
|
|
i++
|
|
}
|
|
sort.Strings(tmp)
|
|
|
|
for _, s := range tmp {
|
|
h.Write([]byte(s))
|
|
}
|
|
|
|
m.hashID = h.Sum64()
|
|
}
|
|
return m.hashID
|
|
}
|
|
|
|
func appendField(b []byte, k string, v interface{}) []byte {
|
|
if v == nil {
|
|
return b
|
|
}
|
|
b = append(b, []byte(escape(k, "tagkey")+"=")...)
|
|
|
|
// check popular types first
|
|
switch v := v.(type) {
|
|
case float64:
|
|
b = strconv.AppendFloat(b, v, 'f', -1, 64)
|
|
case int64:
|
|
b = strconv.AppendInt(b, v, 10)
|
|
b = append(b, 'i')
|
|
case string:
|
|
b = append(b, '"')
|
|
b = append(b, []byte(escape(v, "fieldval"))...)
|
|
b = append(b, '"')
|
|
case bool:
|
|
b = strconv.AppendBool(b, v)
|
|
case int32:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case int16:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case int8:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case int:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case uint64:
|
|
// Cap uints above the maximum int value
|
|
var intv int64
|
|
if v <= uint64(MaxInt) {
|
|
intv = int64(v)
|
|
} else {
|
|
intv = int64(MaxInt)
|
|
}
|
|
b = strconv.AppendInt(b, intv, 10)
|
|
b = append(b, 'i')
|
|
case uint32:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case uint16:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case uint8:
|
|
b = strconv.AppendInt(b, int64(v), 10)
|
|
b = append(b, 'i')
|
|
case uint:
|
|
// Cap uints above the maximum int value
|
|
var intv int64
|
|
if v <= uint(MaxInt) {
|
|
intv = int64(v)
|
|
} else {
|
|
intv = int64(MaxInt)
|
|
}
|
|
b = strconv.AppendInt(b, intv, 10)
|
|
b = append(b, 'i')
|
|
case float32:
|
|
b = strconv.AppendFloat(b, float64(v), 'f', -1, 32)
|
|
case []byte:
|
|
b = append(b, v...)
|
|
default:
|
|
// Can't determine the type, so convert to string
|
|
b = append(b, '"')
|
|
b = append(b, []byte(escape(fmt.Sprintf("%v", v), "fieldval"))...)
|
|
b = append(b, '"')
|
|
}
|
|
|
|
return b
|
|
}
|