Fix could not mark message delivered error in kafka_consumer (#6363)
This commit is contained in:
@@ -360,8 +360,8 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||
}
|
||||
}
|
||||
|
||||
id := h.acc.AddTrackingMetricGroup(metrics)
|
||||
h.mu.Lock()
|
||||
id := h.acc.AddTrackingMetricGroup(metrics)
|
||||
h.undelivered[id] = Message{session: session, message: msg}
|
||||
h.mu.Unlock()
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user