Race condition fix: copy BatchPoints into goroutine

Fixes #250
This commit is contained in:
Cameron Sparr 2015-10-08 11:33:56 -06:00
parent d9f1a60a64
commit 7293376973
3 changed files with 51 additions and 1 deletions

View File

@ -9,6 +9,11 @@ build: prepare
"-X main.Version=$(VERSION)" \ "-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go ./cmd/telegraf/telegraf.go
dev: prepare
$(GOBIN)/godep go build -race -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
build-linux-bins: prepare build-linux-bins: prepare
GOARCH=amd64 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_amd64 \ GOARCH=amd64 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_amd64 \
-ldflags "-X main.Version=$(VERSION)" \ -ldflags "-X main.Version=$(VERSION)" \

View File

@ -24,6 +24,47 @@ type BatchPoints struct {
Config *ConfiguredPlugin Config *ConfiguredPlugin
} }
// deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.mu.Lock()
defer bp.mu.Unlock()
var bpc BatchPoints
bpc.Time = bp.Time
bpc.Precision = bp.Precision
bpc.Tags = make(map[string]string)
for k, v := range bp.Tags {
bpc.Tags[k] = v
}
var pts []client.Point
for _, pt := range bp.Points {
var ptc client.Point
ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
ptc.Precision = pt.Precision
ptc.Raw = pt.Raw
ptc.Tags = make(map[string]string)
ptc.Fields = make(map[string]interface{})
for k, v := range pt.Tags {
ptc.Tags[k] = v
}
for k, v := range pt.Fields {
ptc.Fields[k] = v
}
pts = append(pts, ptc)
}
bpc.Points = pts
return &bpc
}
// Add adds a measurement // Add adds a measurement
func (bp *BatchPoints) Add( func (bp *BatchPoints) Add(
measurement string, measurement string,

View File

@ -303,10 +303,14 @@ func (a *Agent) flush(bp *BatchPoints) error {
for _, o := range a.outputs { for _, o := range a.outputs {
wg.Add(1) wg.Add(1)
// Copy BatchPoints
bpc := bp.deepcopy()
go func(ro *runningOutput) { go func(ro *runningOutput) {
defer wg.Done() defer wg.Done()
// Log all output errors: // Log all output errors:
if err := ro.output.Write(bp.BatchPoints); err != nil { if err := ro.output.Write(bpc.BatchPoints); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err) log.Printf("Error in output [%s]: %s", ro.name, err)
outerr = errors.New("Error encountered flushing outputs") outerr = errors.New("Error encountered flushing outputs")
} }