Changing AddValues to AddFields and temp disabling adding w time
Currently adding with time is broken, because InfluxDB does not support using precision for timestamp truncation both with and without timestamps. This will be re-enabled once we fix InfluxDB to use the precision argument for truncation in all cases, and a "unit" argument in the line-protocol for adding points with non-nanosecond stamps Fixes #175
This commit is contained in:
parent
46cd9ff9f5
commit
733ba07312
|
@ -39,7 +39,7 @@ type Plugin interface {
|
||||||
|
|
||||||
type Accumulator interface {
|
type Accumulator interface {
|
||||||
Add(measurement string, value interface{}, tags map[string]string)
|
Add(measurement string, value interface{}, tags map[string]string)
|
||||||
AddValuesWithTime(measurement string,
|
AddFieldsWithTime(measurement string,
|
||||||
values map[string]interface{},
|
values map[string]interface{},
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
timestamp time.Time)
|
timestamp time.Time)
|
||||||
|
@ -63,7 +63,7 @@ The `Add` function takes 3 arguments:
|
||||||
about the metric. For instance, the `net` plugin adds a tag named `"interface"`
|
about the metric. For instance, the `net` plugin adds a tag named `"interface"`
|
||||||
set to the name of the network interface, like `"eth0"`.
|
set to the name of the network interface, like `"eth0"`.
|
||||||
|
|
||||||
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
|
The `AddFieldsWithTime` allows multiple values for a point to be passed. The values
|
||||||
used are the same type profile as **value** above. The **timestamp** argument
|
used are the same type profile as **value** above. The **timestamp** argument
|
||||||
allows a point to be registered as having occurred at an arbitrary time.
|
allows a point to be registered as having occurred at an arbitrary time.
|
||||||
|
|
||||||
|
|
|
@ -62,12 +62,60 @@ func (bp *BatchPoints) Add(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddValuesWithTime adds a measurement with a provided timestamp
|
// AddFieldsWithTime adds a measurement with a provided timestamp
|
||||||
func (bp *BatchPoints) AddValuesWithTime(
|
func (bp *BatchPoints) AddFieldsWithTime(
|
||||||
measurement string,
|
measurement string,
|
||||||
values map[string]interface{},
|
fields map[string]interface{},
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
timestamp time.Time,
|
timestamp time.Time,
|
||||||
|
) {
|
||||||
|
// TODO this function should add the fields with the timestamp, but that will
|
||||||
|
// need to wait for the InfluxDB point precision/unit to be fixed
|
||||||
|
bp.AddFields(measurement, fields, tags)
|
||||||
|
// bp.mu.Lock()
|
||||||
|
// defer bp.mu.Unlock()
|
||||||
|
|
||||||
|
// measurement = bp.Prefix + measurement
|
||||||
|
|
||||||
|
// if bp.Config != nil {
|
||||||
|
// if !bp.Config.ShouldPass(measurement, tags) {
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if bp.Debug {
|
||||||
|
// var tg []string
|
||||||
|
|
||||||
|
// for k, v := range tags {
|
||||||
|
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var vals []string
|
||||||
|
|
||||||
|
// for k, v := range fields {
|
||||||
|
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
||||||
|
// }
|
||||||
|
|
||||||
|
// sort.Strings(tg)
|
||||||
|
// sort.Strings(vals)
|
||||||
|
|
||||||
|
// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
|
||||||
|
// }
|
||||||
|
|
||||||
|
// bp.Points = append(bp.Points, client.Point{
|
||||||
|
// Measurement: measurement,
|
||||||
|
// Tags: tags,
|
||||||
|
// Fields: fields,
|
||||||
|
// Time: timestamp,
|
||||||
|
// })
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddFields will eventually replace the Add function, once we move to having a
|
||||||
|
// single plugin as a single measurement with multiple fields
|
||||||
|
func (bp *BatchPoints) AddFields(
|
||||||
|
measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
) {
|
) {
|
||||||
bp.mu.Lock()
|
bp.mu.Lock()
|
||||||
defer bp.mu.Unlock()
|
defer bp.mu.Unlock()
|
||||||
|
@ -89,7 +137,7 @@ func (bp *BatchPoints) AddValuesWithTime(
|
||||||
|
|
||||||
var vals []string
|
var vals []string
|
||||||
|
|
||||||
for k, v := range values {
|
for k, v := range fields {
|
||||||
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +150,6 @@ func (bp *BatchPoints) AddValuesWithTime(
|
||||||
bp.Points = append(bp.Points, client.Point{
|
bp.Points = append(bp.Points, client.Point{
|
||||||
Measurement: measurement,
|
Measurement: measurement,
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
Fields: values,
|
Fields: fields,
|
||||||
Time: timestamp,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, point := range points {
|
for _, point := range points {
|
||||||
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
|
acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
|
||||||
}
|
}
|
||||||
case <-timeout:
|
case <-timeout:
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
|
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
|
||||||
acc.AddValuesWithTime(
|
acc.AddFieldsWithTime(
|
||||||
key,
|
key,
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"value": val,
|
"value": val,
|
||||||
|
|
|
@ -10,8 +10,8 @@ type Accumulator interface {
|
||||||
|
|
||||||
// Create a point with a set of values, decorating it with tags
|
// Create a point with a set of values, decorating it with tags
|
||||||
// NOTE: tags and values are expected to be owned by the caller, don't mutate
|
// NOTE: tags and values are expected to be owned by the caller, don't mutate
|
||||||
// them after passing to AddValuesWithTime.
|
// them after passing to AddFieldsWithTime.
|
||||||
AddValuesWithTime(
|
AddFieldsWithTime(
|
||||||
measurement string,
|
measurement string,
|
||||||
values map[string]interface{},
|
values map[string]interface{},
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
|
|
|
@ -34,8 +34,8 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddValuesWithTime adds a measurement point with a specified timestamp.
|
// AddFieldsWithTime adds a measurement point with a specified timestamp.
|
||||||
func (a *Accumulator) AddValuesWithTime(
|
func (a *Accumulator) AddFieldsWithTime(
|
||||||
measurement string,
|
measurement string,
|
||||||
values map[string]interface{},
|
values map[string]interface{},
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
|
|
Loading…
Reference in New Issue