Use alias name in output metric buffer stats
This commit is contained in:
@@ -32,7 +32,7 @@ type Buffer struct {
|
||||
}
|
||||
|
||||
// NewBuffer returns a new empty Buffer with the given capacity.
|
||||
func NewBuffer(name string, capacity int) *Buffer {
|
||||
func NewBuffer(name string, alias string, capacity int) *Buffer {
|
||||
b := &Buffer{
|
||||
buf: make([]telegraf.Metric, capacity),
|
||||
first: 0,
|
||||
@@ -43,27 +43,27 @@ func NewBuffer(name string, capacity int) *Buffer {
|
||||
MetricsAdded: selfstat.Register(
|
||||
"write",
|
||||
"metrics_added",
|
||||
map[string]string{"output": name},
|
||||
map[string]string{"output": name, "alias": alias},
|
||||
),
|
||||
MetricsWritten: selfstat.Register(
|
||||
"write",
|
||||
"metrics_written",
|
||||
map[string]string{"output": name},
|
||||
map[string]string{"output": name, "alias": alias},
|
||||
),
|
||||
MetricsDropped: selfstat.Register(
|
||||
"write",
|
||||
"metrics_dropped",
|
||||
map[string]string{"output": name},
|
||||
map[string]string{"output": name, "alias": alias},
|
||||
),
|
||||
BufferSize: selfstat.Register(
|
||||
"write",
|
||||
"buffer_size",
|
||||
map[string]string{"output": name},
|
||||
map[string]string{"output": name, "alias": alias},
|
||||
),
|
||||
BufferLimit: selfstat.Register(
|
||||
"write",
|
||||
"buffer_limit",
|
||||
map[string]string{"output": name},
|
||||
map[string]string{"output": name, "alias": alias},
|
||||
),
|
||||
}
|
||||
b.BufferSize.Set(int64(0))
|
||||
|
||||
@@ -49,7 +49,7 @@ func MetricTime(sec int64) telegraf.Metric {
|
||||
}
|
||||
|
||||
func BenchmarkAddMetrics(b *testing.B) {
|
||||
buf := NewBuffer("test", 10000)
|
||||
buf := NewBuffer("test", "", 10000)
|
||||
m := Metric()
|
||||
for n := 0; n < b.N; n++ {
|
||||
buf.Add(m)
|
||||
@@ -64,14 +64,14 @@ func setup(b *Buffer) *Buffer {
|
||||
}
|
||||
|
||||
func TestBuffer_LenEmpty(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
require.Equal(t, 0, b.Len())
|
||||
}
|
||||
|
||||
func TestBuffer_LenOne(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m)
|
||||
|
||||
require.Equal(t, 1, b.Len())
|
||||
@@ -79,7 +79,7 @@ func TestBuffer_LenOne(t *testing.T) {
|
||||
|
||||
func TestBuffer_LenFull(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m, m, m)
|
||||
|
||||
require.Equal(t, 5, b.Len())
|
||||
@@ -87,7 +87,7 @@ func TestBuffer_LenFull(t *testing.T) {
|
||||
|
||||
func TestBuffer_LenOverfill(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
setup(b)
|
||||
b.Add(m, m, m, m, m, m)
|
||||
|
||||
@@ -95,14 +95,14 @@ func TestBuffer_LenOverfill(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_BatchLenZero(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
batch := b.Batch(0)
|
||||
|
||||
require.Len(t, batch, 0)
|
||||
}
|
||||
|
||||
func TestBuffer_BatchLenBufferEmpty(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
batch := b.Batch(2)
|
||||
|
||||
require.Len(t, batch, 0)
|
||||
@@ -110,7 +110,7 @@ func TestBuffer_BatchLenBufferEmpty(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchLenUnderfill(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m)
|
||||
batch := b.Batch(2)
|
||||
|
||||
@@ -119,7 +119,7 @@ func TestBuffer_BatchLenUnderfill(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchLenFill(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m)
|
||||
batch := b.Batch(2)
|
||||
require.Len(t, batch, 2)
|
||||
@@ -127,7 +127,7 @@ func TestBuffer_BatchLenFill(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchLenExact(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m)
|
||||
batch := b.Batch(2)
|
||||
require.Len(t, batch, 2)
|
||||
@@ -135,7 +135,7 @@ func TestBuffer_BatchLenExact(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(6)
|
||||
require.Len(t, batch, 5)
|
||||
@@ -143,7 +143,7 @@ func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchWrap(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(2)
|
||||
b.Accept(batch)
|
||||
@@ -153,7 +153,7 @@ func TestBuffer_BatchWrap(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_BatchLatest(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 4))
|
||||
b := setup(NewBuffer("test", "", 4))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -167,7 +167,7 @@ func TestBuffer_BatchLatest(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_BatchLatestWrap(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 4))
|
||||
b := setup(NewBuffer("test", "", 4))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -183,7 +183,7 @@ func TestBuffer_BatchLatestWrap(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_MultipleBatch(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 10))
|
||||
b := setup(NewBuffer("test", "", 10))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -209,7 +209,7 @@ func TestBuffer_MultipleBatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectWithRoom(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -232,7 +232,7 @@ func TestBuffer_RejectWithRoom(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectNothingNewFull(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -255,7 +255,7 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectNoRoom(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
|
||||
b.Add(MetricTime(2))
|
||||
@@ -284,7 +284,7 @@ func TestBuffer_RejectNoRoom(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectRoomExact(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
batch := b.Batch(2)
|
||||
@@ -308,7 +308,7 @@ func TestBuffer_RejectRoomExact(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -333,7 +333,7 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectPartialRoom(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
|
||||
b.Add(MetricTime(2))
|
||||
@@ -360,7 +360,7 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -403,7 +403,7 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectWrapped(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -434,7 +434,7 @@ func TestBuffer_RejectWrapped(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectAdjustFirst(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 10))
|
||||
b := setup(NewBuffer("test", "", 10))
|
||||
b.Add(MetricTime(1))
|
||||
b.Add(MetricTime(2))
|
||||
b.Add(MetricTime(3))
|
||||
@@ -482,7 +482,7 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) {
|
||||
|
||||
func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
b.Add(m, m, m, m, m)
|
||||
@@ -493,7 +493,7 @@ func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
|
||||
|
||||
func TestBuffer_AcceptRemovesBatch(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m)
|
||||
batch := b.Batch(2)
|
||||
b.Accept(batch)
|
||||
@@ -502,7 +502,7 @@ func TestBuffer_AcceptRemovesBatch(t *testing.T) {
|
||||
|
||||
func TestBuffer_RejectLeavesBatch(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m)
|
||||
batch := b.Batch(2)
|
||||
b.Reject(batch)
|
||||
@@ -511,7 +511,7 @@ func TestBuffer_RejectLeavesBatch(t *testing.T) {
|
||||
|
||||
func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(5)
|
||||
@@ -524,7 +524,7 @@ func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(5)
|
||||
@@ -537,7 +537,7 @@ func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) {
|
||||
|
||||
func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(3)
|
||||
@@ -549,7 +549,7 @@ func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) {
|
||||
|
||||
func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(3)
|
||||
@@ -561,7 +561,7 @@ func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) {
|
||||
|
||||
func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(3)
|
||||
@@ -573,7 +573,7 @@ func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) {
|
||||
|
||||
func TestBuffer_WrapWithBatch(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
|
||||
b.Add(m, m, m)
|
||||
b.Batch(3)
|
||||
@@ -584,7 +584,7 @@ func TestBuffer_WrapWithBatch(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchNotRemoved(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m, m, m)
|
||||
b.Batch(2)
|
||||
require.Equal(t, 5, b.Len())
|
||||
@@ -592,7 +592,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) {
|
||||
|
||||
func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
|
||||
m := Metric()
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(m, m, m, m, m)
|
||||
batch := b.Batch(2)
|
||||
b.Reject(batch)
|
||||
@@ -608,7 +608,7 @@ func TestBuffer_AcceptCallsMetricAccept(t *testing.T) {
|
||||
accept++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(mm, mm, mm)
|
||||
batch := b.Batch(2)
|
||||
b.Accept(batch)
|
||||
@@ -623,7 +623,7 @@ func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) {
|
||||
reject++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
setup(b)
|
||||
b.Add(mm, mm, mm, mm, mm)
|
||||
b.Add(mm, mm)
|
||||
@@ -638,7 +638,7 @@ func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) {
|
||||
reject++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
setup(b)
|
||||
b.Add(mm, mm, mm, mm, mm)
|
||||
batch := b.Batch(2)
|
||||
@@ -656,7 +656,7 @@ func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) {
|
||||
reject++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(mm, mm, mm, mm, mm)
|
||||
batch := b.Batch(5)
|
||||
b.Add(mm, mm)
|
||||
@@ -673,7 +673,7 @@ func TestBuffer_AddOverwriteAndReject(t *testing.T) {
|
||||
reject++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(mm, mm, mm, mm, mm)
|
||||
batch := b.Batch(5)
|
||||
b.Add(mm, mm, mm, mm, mm)
|
||||
@@ -697,7 +697,7 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
|
||||
accept++
|
||||
},
|
||||
}
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
b.Add(mm, mm, mm)
|
||||
b.Add(mm, mm, mm, mm)
|
||||
require.Equal(t, 2, reject)
|
||||
@@ -716,7 +716,7 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuffer_RejectEmptyBatch(t *testing.T) {
|
||||
b := setup(NewBuffer("test", 5))
|
||||
b := setup(NewBuffer("test", "", 5))
|
||||
batch := b.Batch(2)
|
||||
b.Add(MetricTime(1))
|
||||
b.Reject(batch)
|
||||
|
||||
@@ -79,7 +79,7 @@ func NewRunningOutput(
|
||||
}
|
||||
|
||||
ro := &RunningOutput{
|
||||
buffer: NewBuffer(config.LogName(), bufferLimit),
|
||||
buffer: NewBuffer(config.Name, config.Alias, bufferLimit),
|
||||
BatchReady: make(chan time.Time, 1),
|
||||
Output: output,
|
||||
Config: config,
|
||||
@@ -101,13 +101,6 @@ func NewRunningOutput(
|
||||
return ro
|
||||
}
|
||||
|
||||
func (c *OutputConfig) LogName() string {
|
||||
if c.Alias == "" {
|
||||
return c.Name
|
||||
}
|
||||
return c.Name + "::" + c.Alias
|
||||
}
|
||||
|
||||
func (r *RunningOutput) LogName() string {
|
||||
return logName("outputs", r.Config.Name, r.Config.Alias)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user