131 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			131 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
package buffer
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/influxdata/telegraf"
 | 
						|
	"github.com/influxdata/telegraf/selfstat"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	MetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{})
 | 
						|
	MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
 | 
						|
)
 | 
						|
 | 
						|
// Buffer is an object for storing metrics in a circular buffer.
 | 
						|
type Buffer struct {
 | 
						|
	sync.Mutex
 | 
						|
	buf   []telegraf.Metric
 | 
						|
	first int
 | 
						|
	last  int
 | 
						|
	size  int
 | 
						|
	empty bool
 | 
						|
}
 | 
						|
 | 
						|
// NewBuffer returns a Buffer
 | 
						|
//   size is the maximum number of metrics that Buffer will cache. If Add is
 | 
						|
//   called when the buffer is full, then the oldest metric(s) will be dropped.
 | 
						|
func NewBuffer(size int) *Buffer {
 | 
						|
	return &Buffer{
 | 
						|
		buf:   make([]telegraf.Metric, size),
 | 
						|
		first: 0,
 | 
						|
		last:  0,
 | 
						|
		size:  size,
 | 
						|
		empty: true,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// IsEmpty returns true if Buffer is empty.
 | 
						|
func (b *Buffer) IsEmpty() bool {
 | 
						|
	return b.empty
 | 
						|
}
 | 
						|
 | 
						|
// Len returns the current length of the buffer.
 | 
						|
func (b *Buffer) Len() int {
 | 
						|
	if b.empty {
 | 
						|
		return 0
 | 
						|
	} else if b.first <= b.last {
 | 
						|
		return b.last - b.first + 1
 | 
						|
	}
 | 
						|
	// Spans the end of array.
 | 
						|
	// size - gap in the middle
 | 
						|
	return b.size - (b.first - b.last - 1) // size - gap
 | 
						|
}
 | 
						|
 | 
						|
func (b *Buffer) push(m telegraf.Metric) {
 | 
						|
	// Empty
 | 
						|
	if b.empty {
 | 
						|
		b.last = b.first // Reset
 | 
						|
		b.buf[b.last] = m
 | 
						|
		b.empty = false
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	b.last++
 | 
						|
	b.last %= b.size
 | 
						|
 | 
						|
	// Full
 | 
						|
	if b.first == b.last {
 | 
						|
		MetricsDropped.Incr(1)
 | 
						|
		b.first = (b.first + 1) % b.size
 | 
						|
	}
 | 
						|
	b.buf[b.last] = m
 | 
						|
}
 | 
						|
 | 
						|
// Add adds metrics to the buffer.
 | 
						|
func (b *Buffer) Add(metrics ...telegraf.Metric) {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
	for i := range metrics {
 | 
						|
		MetricsWritten.Incr(1)
 | 
						|
		b.push(metrics[i])
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Batch returns a batch of metrics of size batchSize.
 | 
						|
// the batch will be of maximum length batchSize. It can be less than batchSize,
 | 
						|
// if the length of Buffer is less than batchSize.
 | 
						|
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
	outLen := min(b.Len(), batchSize)
 | 
						|
	out := make([]telegraf.Metric, outLen)
 | 
						|
	if outLen == 0 {
 | 
						|
		return out
 | 
						|
	}
 | 
						|
 | 
						|
	// We copy everything right of first up to last, count or end
 | 
						|
	// b.last >= rightInd || b.last < b.first
 | 
						|
	// therefore wont copy past b.last
 | 
						|
	rightInd := min(b.size, b.first+outLen) - 1
 | 
						|
 | 
						|
	copyCount := copy(out, b.buf[b.first:rightInd+1])
 | 
						|
 | 
						|
	// We've emptied the ring
 | 
						|
	if rightInd == b.last {
 | 
						|
		b.empty = true
 | 
						|
	}
 | 
						|
	b.first = rightInd + 1
 | 
						|
	b.first %= b.size
 | 
						|
 | 
						|
	// We circle back for the rest
 | 
						|
	if copyCount < outLen {
 | 
						|
		right := min(b.last, outLen-copyCount)
 | 
						|
		copy(out[copyCount:], b.buf[b.first:right+1])
 | 
						|
		// We've emptied the ring
 | 
						|
		if right == b.last {
 | 
						|
			b.empty = true
 | 
						|
		}
 | 
						|
		b.first = right + 1
 | 
						|
		b.first %= b.size
 | 
						|
	}
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func min(a, b int) int {
 | 
						|
	if b < a {
 | 
						|
		return b
 | 
						|
	}
 | 
						|
	return a
 | 
						|
}
 |