NSQ Output plugin

NSQ output plugin, following the NSQ methodology output is a producer
to one instance of NSQD. The go library does not accept array values be
default for a Producer. Additionally service discovery is generally
done as a consumer.

Follows same methodology as Kafka Output without the tag reference.
This commit is contained in:
Jonathan Cross 2015-10-27 12:59:05 -04:00
parent f7eae86cdb
commit 6430047828
37 changed files with 5469 additions and 0 deletions

10
Godeps/Godeps.json generated
View File

@ -260,6 +260,16 @@
{
"ImportPath": "gopkg.in/yaml.v2",
"Rev": "7ad95dd0798a40da1ccdff6dff35fd177b5edf40"
},
{
"ImportPath": "github.com/mreiferson/go-snappystream",
"Comment": "v0.2.3",
"Rev": "028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504"
},
{
"ImportPath": "github.com/nsqio/go-nsq",
"Comment": "v1.0.5-6-g2118015",
"Rev": "2118015c120962edc5d03325c680daf3163a8b5f"
}
]
}

View File

@ -0,0 +1,13 @@
language: go
go:
- 1.2.2
- 1.3.1
env:
- GOARCH=amd64
- GOARCH=386
install:
- go get code.google.com/p/snappy-go/snappy
script:
- go test -v
notifications:
email: false

View File

@ -0,0 +1,17 @@
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,11 @@
## go-snappystream
a Go package for framed snappy streams.
[![Build Status](https://secure.travis-ci.org/mreiferson/go-snappystream.png?branch=master)](http://travis-ci.org/mreiferson/go-snappystream) [![GoDoc](https://godoc.org/github.com/mreiferson/go-snappystream?status.svg)](https://godoc.org/github.com/mreiferson/go-snappystream)
This package wraps [snappy-go][1] and supplies a `Reader` and `Writer`
for the snappy [framed stream format][2].
[1]: https://code.google.com/p/snappy-go/
[2]: https://snappy.googlecode.com/svn/trunk/framing_format.txt

View File

@ -0,0 +1,321 @@
package snappystream
import (
"bytes"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"github.com/mreiferson/go-snappystream/snappy-go"
)
// errMssingStreamID is returned from a reader when the source stream does not
// begin with a stream identifier block (4.1 Stream identifier). Its occurance
// signifies that the source byte stream is not snappy framed.
var errMissingStreamID = fmt.Errorf("missing stream identifier")
type reader struct {
reader io.Reader
err error
seenStreamID bool
verifyChecksum bool
buf bytes.Buffer
hdr []byte
src []byte
dst []byte
}
// NewReader returns an io.Reader interface to the snappy framed stream format.
//
// It transparently handles reading the stream identifier (but does not proxy this
// to the caller), decompresses blocks, and (optionally) validates checksums.
//
// Internally, three buffers are maintained. The first two are for reading
// off the wrapped io.Reader and for holding the decompressed block (both are grown
// automatically and re-used and will never exceed the largest block size, 65536). The
// last buffer contains the *unread* decompressed bytes (and can grow indefinitely).
//
// The second param determines whether or not the reader will verify block
// checksums and can be enabled/disabled with the constants VerifyChecksum and SkipVerifyChecksum
//
// For each Read, the returned length will be up to the lesser of len(b) or 65536
// decompressed bytes, regardless of the length of *compressed* bytes read
// from the wrapped io.Reader.
func NewReader(r io.Reader, verifyChecksum bool) io.Reader {
return &reader{
reader: r,
verifyChecksum: verifyChecksum,
hdr: make([]byte, 4),
src: make([]byte, 4096),
dst: make([]byte, 4096),
}
}
// WriteTo implements the io.WriterTo interface used by io.Copy. It writes
// decoded data from the underlying reader to w. WriteTo returns the number of
// bytes written along with any error encountered.
func (r *reader) WriteTo(w io.Writer) (int64, error) {
if r.err != nil {
return 0, r.err
}
n, err := r.buf.WriteTo(w)
if err != nil {
// r.err doesn't need to be set because a write error occurred and the
// stream hasn't been corrupted.
return n, err
}
// pass a bufferFallbackWriter to nextFrame so that write errors may be
// recovered from, allowing the unwritten stream to be read successfully.
wfallback := &bufferFallbackWriter{
w: w,
buf: &r.buf,
}
for {
var m int
m, err = r.nextFrame(wfallback)
if wfallback.writerErr != nil && err == nil {
// a partial write was made before an error occurred and not all m
// bytes were writen to w. but decoded bytes were successfully
// buffered and reading can resume later.
n += wfallback.n
return n, wfallback.writerErr
}
n += int64(m)
if err == io.EOF {
return n, nil
}
if err != nil {
r.err = err
return n, err
}
}
panic("unreachable")
}
// bufferFallbackWriter writes to an underlying io.Writer until an error
// occurs. If a error occurs in the underlying io.Writer the value is saved
// for later inspection while the bufferFallbackWriter silently starts
// buffering all data written to it. From the caller's perspective
// bufferFallbackWriter has the same Write behavior has a bytes.Buffer.
//
// bufferFallbackWriter is useful for the reader.WriteTo method because it
// allows internal decoding routines to avoid interruption (and subsequent
// stream corruption) due to writing errors.
type bufferFallbackWriter struct {
w io.Writer
buf *bytes.Buffer
n int64 // number of bytes successfully written to w
writerErr error // any error that ocurred writing to w
}
// Write attempts to write b to the underlying io.Writer. If the underlying
// writer fails or has failed previously unwritten bytes are buffered
// internally. Write never returns an error but may panic with
// bytes.ErrTooLarge if the buffer grows too large.
func (w *bufferFallbackWriter) Write(b []byte) (int, error) {
if w.writerErr != nil {
return w.buf.Write(b)
}
n, err := w.w.Write(b)
w.n += int64(n)
if err != nil {
// begin buffering input. bytes.Buffer does not return errors and so we
// do not need complex error handling here.
w.writerErr = err
w.Write(b[n:])
return len(b), nil
}
return n, nil
}
func (r *reader) read(b []byte) (int, error) {
n, err := r.buf.Read(b)
r.err = err
return n, err
}
func (r *reader) Read(b []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
if r.buf.Len() < len(b) {
_, r.err = r.nextFrame(&r.buf)
if r.err == io.EOF {
// fill b with any remaining bytes in the buffer.
return r.read(b)
}
if r.err != nil {
return 0, r.err
}
}
return r.read(b)
}
func (r *reader) nextFrame(w io.Writer) (int, error) {
for {
// read the 4-byte snappy frame header
_, err := io.ReadFull(r.reader, r.hdr)
if err != nil {
return 0, err
}
// a stream identifier may appear anywhere and contains no information.
// it must appear at the beginning of the stream. when found, validate
// it and continue to the next block.
if r.hdr[0] == blockStreamIdentifier {
err := r.readStreamID()
if err != nil {
return 0, err
}
r.seenStreamID = true
continue
}
if !r.seenStreamID {
return 0, errMissingStreamID
}
switch typ := r.hdr[0]; {
case typ == blockCompressed || typ == blockUncompressed:
return r.decodeBlock(w)
case typ == blockPadding || (0x80 <= typ && typ <= 0xfd):
// skip blocks whose data must not be inspected (4.4 Padding, and 4.6
// Reserved skippable chunks).
err := r.discardBlock()
if err != nil {
return 0, err
}
continue
default:
// typ must be unskippable range 0x02-0x7f. Read the block in full
// and return an error (4.5 Reserved unskippable chunks).
err = r.discardBlock()
if err != nil {
return 0, err
}
return 0, fmt.Errorf("unrecognized unskippable frame %#x", r.hdr[0])
}
}
panic("unreachable")
}
// decodeDataBlock assumes r.hdr[0] to be either blockCompressed or
// blockUncompressed.
func (r *reader) decodeBlock(w io.Writer) (int, error) {
// read compressed block data and determine if uncompressed data is too
// large.
buf, err := r.readBlock()
if err != nil {
return 0, err
}
declen := len(buf[4:])
if r.hdr[0] == blockCompressed {
declen, err = snappy.DecodedLen(buf[4:])
if err != nil {
return 0, err
}
}
if declen > MaxBlockSize {
return 0, fmt.Errorf("decoded block data too large %d > %d", declen, MaxBlockSize)
}
// decode data and verify its integrity using the little-endian crc32
// preceding encoded data
crc32le, blockdata := buf[:4], buf[4:]
if r.hdr[0] == blockCompressed {
r.dst, err = snappy.Decode(r.dst, blockdata)
if err != nil {
return 0, err
}
blockdata = r.dst
}
if r.verifyChecksum {
checksum := unmaskChecksum(uint32(crc32le[0]) | uint32(crc32le[1])<<8 | uint32(crc32le[2])<<16 | uint32(crc32le[3])<<24)
actualChecksum := crc32.Checksum(blockdata, crcTable)
if checksum != actualChecksum {
return 0, fmt.Errorf("checksum does not match %x != %x", checksum, actualChecksum)
}
}
return w.Write(blockdata)
}
func (r *reader) readStreamID() error {
// the length of the block is fixed so don't decode it from the header.
if !bytes.Equal(r.hdr, streamID[:4]) {
return fmt.Errorf("invalid stream identifier length")
}
// read the identifier block data "sNaPpY"
block := r.src[:6]
_, err := noeof(io.ReadFull(r.reader, block))
if err != nil {
return err
}
if !bytes.Equal(block, streamID[4:]) {
return fmt.Errorf("invalid stream identifier block")
}
return nil
}
func (r *reader) discardBlock() error {
length := uint64(decodeLength(r.hdr[1:]))
_, err := noeof64(io.CopyN(ioutil.Discard, r.reader, int64(length)))
return err
}
func (r *reader) readBlock() ([]byte, error) {
// check bounds on encoded length (+4 for checksum)
length := decodeLength(r.hdr[1:])
if length > (maxEncodedBlockSize + 4) {
return nil, fmt.Errorf("encoded block data too large %d > %d", length, (maxEncodedBlockSize + 4))
}
if int(length) > len(r.src) {
r.src = make([]byte, length)
}
buf := r.src[:length]
_, err := noeof(io.ReadFull(r.reader, buf))
if err != nil {
return nil, err
}
return buf, nil
}
// decodeLength decodes a 24-bit (3-byte) little-endian length from b.
func decodeLength(b []byte) uint32 {
return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16
}
func unmaskChecksum(c uint32) uint32 {
x := c - 0xa282ead8
return ((x >> 17) | (x << 15))
}
// noeof is used after reads in situations where EOF signifies invalid
// formatting or corruption.
func noeof(n int, err error) (int, error) {
if err == io.EOF {
return n, io.ErrUnexpectedEOF
}
return n, err
}
// noeof64 is used after long reads (e.g. io.Copy) in situations where io.EOF
// signifies invalid formatting or corruption.
func noeof64(n int64, err error) (int64, error) {
if err == io.EOF {
return n, io.ErrUnexpectedEOF
}
return n, err
}

View File

@ -0,0 +1,124 @@
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package snappy
import (
"encoding/binary"
"errors"
)
// ErrCorrupt reports that the input is invalid.
var ErrCorrupt = errors.New("snappy: corrupt input")
// DecodedLen returns the length of the decoded block.
func DecodedLen(src []byte) (int, error) {
v, _, err := decodedLen(src)
return v, err
}
// decodedLen returns the length of the decoded block and the number of bytes
// that the length header occupied.
func decodedLen(src []byte) (blockLen, headerLen int, err error) {
v, n := binary.Uvarint(src)
if n == 0 {
return 0, 0, ErrCorrupt
}
if uint64(int(v)) != v {
return 0, 0, errors.New("snappy: decoded block is too large")
}
return int(v), n, nil
}
// Decode returns the decoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire decoded block.
// Otherwise, a newly allocated slice will be returned.
// It is valid to pass a nil dst.
func Decode(dst, src []byte) ([]byte, error) {
dLen, s, err := decodedLen(src)
if err != nil {
return nil, err
}
if len(dst) < dLen {
dst = make([]byte, dLen)
}
var d, offset, length int
for s < len(src) {
switch src[s] & 0x03 {
case tagLiteral:
x := uint(src[s] >> 2)
switch {
case x < 60:
s += 1
case x == 60:
s += 2
if s > len(src) {
return nil, ErrCorrupt
}
x = uint(src[s-1])
case x == 61:
s += 3
if s > len(src) {
return nil, ErrCorrupt
}
x = uint(src[s-2]) | uint(src[s-1])<<8
case x == 62:
s += 4
if s > len(src) {
return nil, ErrCorrupt
}
x = uint(src[s-3]) | uint(src[s-2])<<8 | uint(src[s-1])<<16
case x == 63:
s += 5
if s > len(src) {
return nil, ErrCorrupt
}
x = uint(src[s-4]) | uint(src[s-3])<<8 | uint(src[s-2])<<16 | uint(src[s-1])<<24
}
length = int(x + 1)
if length <= 0 {
return nil, errors.New("snappy: unsupported literal length")
}
if length > len(dst)-d || length > len(src)-s {
return nil, ErrCorrupt
}
copy(dst[d:], src[s:s+length])
d += length
s += length
continue
case tagCopy1:
s += 2
if s > len(src) {
return nil, ErrCorrupt
}
length = 4 + int(src[s-2])>>2&0x7
offset = int(src[s-2])&0xe0<<3 | int(src[s-1])
case tagCopy2:
s += 3
if s > len(src) {
return nil, ErrCorrupt
}
length = 1 + int(src[s-3])>>2
offset = int(src[s-2]) | int(src[s-1])<<8
case tagCopy4:
return nil, errors.New("snappy: unsupported COPY_4 tag")
}
end := d + length
if offset > d || end > len(dst) {
return nil, ErrCorrupt
}
for ; d < end; d++ {
dst[d] = dst[d-offset]
}
}
if d != dLen {
return nil, ErrCorrupt
}
return dst[:d], nil
}

View File

@ -0,0 +1,174 @@
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package snappy
import (
"encoding/binary"
)
// We limit how far copy back-references can go, the same as the C++ code.
const maxOffset = 1 << 15
// emitLiteral writes a literal chunk and returns the number of bytes written.
func emitLiteral(dst, lit []byte) int {
i, n := 0, uint(len(lit)-1)
switch {
case n < 60:
dst[0] = uint8(n)<<2 | tagLiteral
i = 1
case n < 1<<8:
dst[0] = 60<<2 | tagLiteral
dst[1] = uint8(n)
i = 2
case n < 1<<16:
dst[0] = 61<<2 | tagLiteral
dst[1] = uint8(n)
dst[2] = uint8(n >> 8)
i = 3
case n < 1<<24:
dst[0] = 62<<2 | tagLiteral
dst[1] = uint8(n)
dst[2] = uint8(n >> 8)
dst[3] = uint8(n >> 16)
i = 4
case int64(n) < 1<<32:
dst[0] = 63<<2 | tagLiteral
dst[1] = uint8(n)
dst[2] = uint8(n >> 8)
dst[3] = uint8(n >> 16)
dst[4] = uint8(n >> 24)
i = 5
default:
panic("snappy: source buffer is too long")
}
if copy(dst[i:], lit) != len(lit) {
panic("snappy: destination buffer is too short")
}
return i + len(lit)
}
// emitCopy writes a copy chunk and returns the number of bytes written.
func emitCopy(dst []byte, offset, length int) int {
i := 0
for length > 0 {
x := length - 4
if 0 <= x && x < 1<<3 && offset < 1<<11 {
dst[i+0] = uint8(offset>>8)&0x07<<5 | uint8(x)<<2 | tagCopy1
dst[i+1] = uint8(offset)
i += 2
break
}
x = length
if x > 1<<6 {
x = 1 << 6
}
dst[i+0] = uint8(x-1)<<2 | tagCopy2
dst[i+1] = uint8(offset)
dst[i+2] = uint8(offset >> 8)
i += 3
length -= x
}
return i
}
// Encode returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
// It is valid to pass a nil dst.
func Encode(dst, src []byte) ([]byte, error) {
if n := MaxEncodedLen(len(src)); len(dst) < n {
dst = make([]byte, n)
}
// The block starts with the varint-encoded length of the decompressed bytes.
d := binary.PutUvarint(dst, uint64(len(src)))
// Return early if src is short.
if len(src) <= 4 {
if len(src) != 0 {
d += emitLiteral(dst[d:], src)
}
return dst[:d], nil
}
// Initialize the hash table. Its size ranges from 1<<8 to 1<<14 inclusive.
const maxTableSize = 1 << 14
shift, tableSize := uint(32-8), 1<<8
for tableSize < maxTableSize && tableSize < len(src) {
shift--
tableSize *= 2
}
var table [maxTableSize]int
// Iterate over the source bytes.
var (
s int // The iterator position.
t int // The last position with the same hash as s.
lit int // The start position of any pending literal bytes.
)
for s+3 < len(src) {
// Update the hash table.
b0, b1, b2, b3 := src[s], src[s+1], src[s+2], src[s+3]
h := uint32(b0) | uint32(b1)<<8 | uint32(b2)<<16 | uint32(b3)<<24
p := &table[(h*0x1e35a7bd)>>shift]
// We need to to store values in [-1, inf) in table. To save
// some initialization time, (re)use the table's zero value
// and shift the values against this zero: add 1 on writes,
// subtract 1 on reads.
t, *p = *p-1, s+1
// If t is invalid or src[s:s+4] differs from src[t:t+4], accumulate a literal byte.
if t < 0 || s-t >= maxOffset || b0 != src[t] || b1 != src[t+1] || b2 != src[t+2] || b3 != src[t+3] {
s++
continue
}
// Otherwise, we have a match. First, emit any pending literal bytes.
if lit != s {
d += emitLiteral(dst[d:], src[lit:s])
}
// Extend the match to be as long as possible.
s0 := s
s, t = s+4, t+4
for s < len(src) && src[s] == src[t] {
s++
t++
}
// Emit the copied bytes.
d += emitCopy(dst[d:], s-t, s-s0)
lit = s
}
// Emit any final pending literal bytes and return.
if lit != len(src) {
d += emitLiteral(dst[d:], src[lit:])
}
return dst[:d], nil
}
// MaxEncodedLen returns the maximum length of a snappy block, given its
// uncompressed length.
func MaxEncodedLen(srcLen int) int {
// Compressed data can be defined as:
// compressed := item* literal*
// item := literal* copy
//
// The trailing literal sequence has a space blowup of at most 62/60
// since a literal of length 60 needs one tag byte + one extra byte
// for length information.
//
// Item blowup is trickier to measure. Suppose the "copy" op copies
// 4 bytes of data. Because of a special check in the encoding code,
// we produce a 4-byte copy only if the offset is < 65536. Therefore
// the copy op takes 3 bytes to encode, and this type of item leads
// to at most the 62/60 blowup for representing literals.
//
// Suppose the "copy" op copies 5 bytes of data. If the offset is big
// enough, it will take 5 bytes to encode the copy op. Therefore the
// worst case here is a one-byte literal followed by a five-byte copy.
// That is, 6 bytes of input turn into 7 bytes of "compressed" data.
//
// This last factor dominates the blowup, so the final estimate is:
return 32 + srcLen + srcLen/6
}

View File

@ -0,0 +1,38 @@
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package snappy implements the snappy block-based compression format.
// It aims for very high speeds and reasonable compression.
//
// The C++ snappy implementation is at http://code.google.com/p/snappy/
package snappy
/*
Each encoded block begins with the varint-encoded length of the decoded data,
followed by a sequence of chunks. Chunks begin and end on byte boundaries. The
first byte of each chunk is broken into its 2 least and 6 most significant bits
called l and m: l ranges in [0, 4) and m ranges in [0, 64). l is the chunk tag.
Zero means a literal tag. All other values mean a copy tag.
For literal tags:
- If m < 60, the next 1 + m bytes are literal bytes.
- Otherwise, let n be the little-endian unsigned integer denoted by the next
m - 59 bytes. The next 1 + n bytes after that are literal bytes.
For copy tags, length bytes are copied from offset bytes ago, in the style of
Lempel-Ziv compression algorithms. In particular:
- For l == 1, the offset ranges in [0, 1<<11) and the length in [4, 12).
The length is 4 + the low 3 bits of m. The high 3 bits of m form bits 8-10
of the offset. The next byte is bits 0-7 of the offset.
- For l == 2, the offset ranges in [0, 1<<16) and the length in [1, 65).
The length is 1 + m. The offset is the little-endian unsigned integer
denoted by the next 2 bytes.
- For l == 3, this tag is a legacy format that is no longer supported.
*/
const (
tagLiteral = 0x00
tagCopy1 = 0x01
tagCopy2 = 0x02
tagCopy4 = 0x03
)

View File

@ -0,0 +1,54 @@
// snappystream wraps snappy-go and supplies a Reader and Writer
// for the snappy framed stream format:
// https://snappy.googlecode.com/svn/trunk/framing_format.txt
package snappystream
import (
"hash/crc32"
"github.com/mreiferson/go-snappystream/snappy-go"
)
// Ext is the file extension for files whose content is a snappy framed stream.
const Ext = ".sz"
// MediaType is the MIME type used to represent snappy framed content.
const MediaType = "application/x-snappy-framed"
// ContentEncoding is the appropriate HTTP Content-Encoding header value for
// requests containing a snappy framed entity body.
const ContentEncoding = "x-snappy-framed"
// MaxBlockSize is the maximum number of decoded bytes allowed to be
// represented in a snappy framed block (sections 4.2 and 4.3).
const MaxBlockSize = 65536
// maxEncodedBlockSize is the maximum number of encoded bytes in a framed
// block.
var maxEncodedBlockSize = uint32(snappy.MaxEncodedLen(MaxBlockSize))
const VerifyChecksum = true
const SkipVerifyChecksum = false
// Block types defined by the snappy framed format specification.
const (
blockCompressed = 0x00
blockUncompressed = 0x01
blockPadding = 0xfe
blockStreamIdentifier = 0xff
)
// streamID is the stream identifier block that begins a valid snappy framed
// stream.
var streamID = []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}
// maskChecksum implements the checksum masking algorithm described by the spec.
func maskChecksum(c uint32) uint32 {
return ((c >> 15) | (c << 17)) + 0xa282ead8
}
var crcTable *crc32.Table
func init() {
crcTable = crc32.MakeTable(crc32.Castagnoli)
}

View File

@ -0,0 +1,229 @@
package snappystream
import (
"bufio"
"errors"
"fmt"
"hash/crc32"
"io"
"github.com/mreiferson/go-snappystream/snappy-go"
)
var errClosed = fmt.Errorf("closed")
// BufferedWriter is an io.WriteCloser with behavior similar to writers
// returned by NewWriter but it buffers written data, maximizing block size (to
// improve the output compression ratio) at the cost of speed. Benefits over
// NewWriter are most noticible when individual writes are small and when
// streams are long.
//
// Failure to call a BufferedWriter's Close or Flush methods after it is done
// being written to will likely result in missing data frames which will be
// undetectable in the decoding process.
//
// NOTE: BufferedWriter cannot be instantiated via struct literal and must
// use NewBufferedWriter (i.e. its zero value is not usable).
type BufferedWriter struct {
err error
w *writer
bw *bufio.Writer
}
// NewBufferedWriter allocates and returns a BufferedWriter with an internal
// buffer of MaxBlockSize bytes. If an error occurs writing a block to w, all
// future writes will fail with the same error. After all data has been
// written, the client should call the Flush method to guarantee all data has
// been forwarded to the underlying io.Writer.
func NewBufferedWriter(w io.Writer) *BufferedWriter {
_w := NewWriter(w).(*writer)
return &BufferedWriter{
w: _w,
bw: bufio.NewWriterSize(_w, MaxBlockSize),
}
}
// ReadFrom implements the io.ReaderFrom interface used by io.Copy. It encodes
// data read from r as a snappy framed stream that is written to the underlying
// writer. ReadFrom returns the number number of bytes read, along with any
// error encountered (other than io.EOF).
func (w *BufferedWriter) ReadFrom(r io.Reader) (int64, error) {
if w.err != nil {
return 0, w.err
}
var n int64
n, w.err = w.bw.ReadFrom(r)
return n, w.err
}
// Write buffers p internally, encoding and writing a block to the underlying
// buffer if the buffer grows beyond MaxBlockSize bytes. The returned int
// will be 0 if there was an error and len(p) otherwise.
func (w *BufferedWriter) Write(p []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
_, w.err = w.bw.Write(p)
if w.err != nil {
return 0, w.err
}
return len(p), nil
}
// Flush encodes and writes a block with the contents of w's internal buffer to
// the underlying writer even if the buffer does not contain a full block of
// data (MaxBlockSize bytes).
func (w *BufferedWriter) Flush() error {
if w.err == nil {
w.err = w.bw.Flush()
}
return w.err
}
// Close flushes w's internal buffer and tears down internal data structures.
// After a successful call to Close method calls on w return an error. Close
// makes no attempt to close the underlying writer.
func (w *BufferedWriter) Close() error {
if w.err != nil {
return w.err
}
w.err = w.bw.Flush()
w.w = nil
w.bw = nil
if w.err != nil {
return w.err
}
w.err = errClosed
return nil
}
type writer struct {
writer io.Writer
err error
hdr []byte
dst []byte
sentStreamID bool
}
// NewWriter returns an io.Writer that writes its input to an underlying
// io.Writer encoded as a snappy framed stream. A stream identifier block is
// written to w preceding the first data block. The returned writer will never
// emit a block with length in bytes greater than MaxBlockSize+4 nor one
// containing more than MaxBlockSize bytes of (uncompressed) data.
//
// For each Write, the returned length will only ever be len(p) or 0,
// regardless of the length of *compressed* bytes written to the wrapped
// io.Writer. If the returned length is 0 then error will be non-nil. If
// len(p) exceeds 65536, the slice will be automatically chunked into smaller
// blocks which are all emitted before the call returns.
func NewWriter(w io.Writer) io.Writer {
return &writer{
writer: w,
hdr: make([]byte, 8),
dst: make([]byte, 4096),
}
}
func (w *writer) Write(p []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
total := 0
sz := MaxBlockSize
var n int
for i := 0; i < len(p); i += n {
if i+sz > len(p) {
sz = len(p) - i
}
n, w.err = w.write(p[i : i+sz])
if w.err != nil {
return 0, w.err
}
total += n
}
return total, nil
}
// write attempts to encode p as a block and write it to the underlying writer.
// The returned int may not equal p's length if compression below
// MaxBlockSize-4 could not be achieved.
func (w *writer) write(p []byte) (int, error) {
var err error
if len(p) > MaxBlockSize {
return 0, errors.New(fmt.Sprintf("block too large %d > %d", len(p), MaxBlockSize))
}
w.dst = w.dst[:cap(w.dst)] // Encode does dumb resize w/o context. reslice avoids alloc.
w.dst, err = snappy.Encode(w.dst, p)
if err != nil {
return 0, err
}
block := w.dst
n := len(p)
compressed := true
// check for data which is better left uncompressed. this is determined if
// the encoded content is longer than the source.
if len(w.dst) >= len(p) {
compressed = false
block = p[:n]
}
if !w.sentStreamID {
_, err := w.writer.Write(streamID)
if err != nil {
return 0, err
}
w.sentStreamID = true
}
// set the block type
if compressed {
writeHeader(w.hdr, blockCompressed, block, p[:n])
} else {
writeHeader(w.hdr, blockUncompressed, block, p[:n])
}
_, err = w.writer.Write(w.hdr)
if err != nil {
return 0, err
}
_, err = w.writer.Write(block)
if err != nil {
return 0, err
}
return n, nil
}
// writeHeader panics if len(hdr) is less than 8.
func writeHeader(hdr []byte, btype byte, enc, dec []byte) {
hdr[0] = btype
// 3 byte little endian length of encoded content
length := uint32(len(enc)) + 4 // +4 for checksum
hdr[1] = byte(length)
hdr[2] = byte(length >> 8)
hdr[3] = byte(length >> 16)
// 4 byte little endian CRC32 checksum of decoded content
checksum := maskChecksum(crc32.Checksum(dec, crcTable))
hdr[4] = byte(checksum)
hdr[5] = byte(checksum >> 8)
hdr[6] = byte(checksum >> 16)
hdr[7] = byte(checksum >> 24)
}

View File

@ -0,0 +1,25 @@
language: go
go:
- 1.4.2
- 1.5.1
env:
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.5.linux-amd64.go1.4.2 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.5.linux-amd64.go1.4.2 GOARCH=386
install:
- go get github.com/mreiferson/go-snappystream
script:
- wget http://bitly-downloads.s3.amazonaws.com/nsq/$NSQ_DOWNLOAD.tar.gz
- tar zxvf $NSQ_DOWNLOAD.tar.gz
- export PATH=$NSQ_DOWNLOAD/bin:$PATH
- pushd $TRAVIS_BUILD_DIR
- ./test.sh
- popd
notifications:
email: false
sudo: false

View File

@ -0,0 +1,239 @@
## go-nsq Change Log
### 1.0.5 - 2015-09-19
**Upgrading from 1.0.4**: There are no backward incompatible changes.
* #156 - consumer: prevent data race on RNG
* #155 - config: support `flag.Value` interface
* #147/#150 - consumer: fix application of `max_backoff_duration` (thanks @judwhite)
* #138 - fix lint, vet, fmt issues
* #137 - remove `go-simplejson` dependency
### 1.0.4 - 2015-04-07
**Upgrading from 1.0.3**: There are no backward incompatible changes.
* #133 - fix `ErrNotConnected` race during `Producer` connection (thanks @jeddenlea)
* #132 - fix `RDY` redistribution after backoff with no connections
* #128 - fix backoff stall when using `RequeueWithoutBackoff`
* #127 - fix handling of connection closing when resuming after backoff (thanks @jnewmano)
* #126 - allow `BackoffStrategy` to be set via flag (thanks @twmb)
* #125 - add pluggable consumer `BackoffStrategy`; add full-jitter strategy (thanks @hden)
* #124 - add `DialTimeout` and `LocalAddr` config (thanks @yashkin)
* #119 - add `Producer.Ping()` method (thanks @zulily)
* #122 - refactor log level string handling
* #120 - fix `Message` data races on `responded`
* #114 - fix lookupd jitter having no effect (thanks @judwhite)
### 1.0.3 - 2015-02-07
**Upgrading from 1.0.2**: There are no backward incompatible changes.
* #104 - fix reconnect address bug (thanks @ryanslade)
* #106 - fix backoff reconnect deadlock (thanks @ryanslade)
* #107 - fix out-of-bounds error when removing nsqlookupd addresses (thanks @andreas)
* #108 - fix potential logger race conditions (thanks @judwhite)
* #111 - fix resolved address error in reconnect loop (thanks @twmb)
### 1.0.2 - 2015-01-21
**Upgrading from 1.0.1**: There are no backward incompatible changes.
* #102 - TLS min/max config defaults (thanks @twmb)
* #99 - fix `Consumer.Stop()` race and `Producer.Stop()` deadlock (thanks @tylertreat)
* #92 - expose `Message.NSQDAddress`
* #95 - cleanup panic during `Consumer.Stop()` if handlers are deadlocked
* #98 - add `tls-min-version` option (thanks @twmb)
* #93 - expose a way to get `Consumer` runtime stats (thanks @dcarney)
* #94 - allow `#ephemeral` topic names (thanks @jamesgroat)
### 1.0.1 - 2014-11-09
**Upgrading from 1.0.0**: There are no backward incompatible changes functionally, however this
release no longer compiles with Go `1.0.x`.
* #89 - don't spam connection teardown cleanup messages
* #91 - add consumer `DisconnectFrom*`
* #87 - allow `heartbeat_interval` and `output_buffer_timeout` to be disabled
* #86 - pluggable `nsqlookupd` behaviors
* #83 - send `RDY` before `FIN`/`REQ` (forwards compatibility with nsqio/nsq#404)
* #82 - fix panic when conn isn't assigned
* #75/#76 - minor config related bug fixes
* #75/#77/#78 - add `tls-cert` and `tls-key` config options
### 1.0.0 - 2014-08-11
**Upgrading from 0.3.7**: The public API was significantly refactored and is not backwards
compatible, please read [UPGRADING](UPGRADING.md).
* #58 - support `IDENTIFY` `msg_timeout`
* #54 - per-connection TLS config and set `ServerName`
* #49 - add common connect helpers
* #43/#63 - more flexible `nsqlookupd` URL specification
* #35 - `AUTH` support
* #41/#62 - use package private RNG
* #36 - support 64 character topic/channel names
* #30/#38/#39/#42/#45/#46/#48/#51/#52/#65/#70 - refactor public API (see [UPGRADING](UPGRADING.md))
### 0.3.7 - 2014-05-25
**Upgrading from 0.3.6**: There are no backward incompatible changes. **THIS IS THE LAST STABLE
RELEASE PROVIDING THIS API**. Future releases will be based on the api in #30 and **will not be
backwards compatible!**
This is a bug fix release relating to the refactoring done in `0.3.6`.
* #32 - fix potential panic for race condition when # conns == 0
* #33/#34 - more granular connection locking
### 0.3.6 - 2014-04-29
**Upgrading from 0.3.5**: There are no backward incompatible changes.
This release includes a significant internal refactoring, designed
to better encapsulate responsibility, see #19.
Specifically:
* make `Conn` public
* move transport responsibilities into `Conn` from `Reader`/`Writer`
* supply callbacks for hooking into `Conn` events
As part of the refactoring, a few additional clean exit related
issues were resolved:
* wait group now includes all exit related goroutines
* ensure that readLoop exits before exiting cleanup
* always check messagesInFlight at readLoop exit
* close underlying connection last
### 0.3.5 - 2014-04-05
**Upgrading from 0.3.4**: There are no backward incompatible changes.
This release includes a few new features such as support for channel
sampling and sending along a user agent string (which is now displayed
in `nsqadmin`).
Also, a critical bug fix for potential deadlocks (thanks @kjk
for reporting and help testing).
New Features/Improvements:
* #27 - reader logs disambiguate topic/channel
* #22 - channel sampling
* #23 - user agent
Bug Fixes:
* #24 - fix racey reader IDENTIFY buffering
* #29 - fix recursive RLock deadlocks
### 0.3.4 - 2013-11-19
**Upgrading from 0.3.3**: There are no backward incompatible changes.
This is a bug fix release, notably potential deadlocks in `Message.Requeue()` and `Message.Touch()`
as well as a potential busy loop cleaning up closed connections with in-flight messages.
New Features/Improvements:
* #14 - add `Reader.Configure()`
* #18 - return an exported error when an `nsqlookupd` address is already configured
Bug Fixes:
* #15 - dont let `handleError()` loop if already connected
* #17 - resolve potential deadlocks on `Message` responders
* #16 - eliminate busy loop when draining `finishedMessages`
### 0.3.3 - 2013-10-21
**Upgrading from 0.3.2**: This release requires NSQ binary version `0.2.23+` for compression
support.
This release contains significant `Reader` refactoring of the RDY handling code paths. The
motivation is documented in #1 however the commits in #8 identify individual changes. Additionally,
we eliminated deadlocks during connection cleanup in `Writer`.
As a result, both user-facing APIs should now be considerably more robust and stable. Additionally,
`Reader` should behave better when backing off.
New Features/Improvements:
* #9 - ability to ignore publish responses in `Writer`
* #12 - `Requeue()` method on `Message`
* #6 - `Touch()` method on `Message`
* #4 - snappy/deflate feature negotiation
Bug Fixes:
* #8 - `Reader` RDY handling refactoring (race conditions, deadlocks, consolidation)
* #13 - fix `Writer` deadlocks
* #10 - stop accessing simplejson internals
* #5 - fix `max-in-flight` race condition
### 0.3.2 - 2013-08-26
**Upgrading from 0.3.1**: This release requires NSQ binary version `0.2.22+` for TLS support.
New Features/Improvements:
* #227 - TLS feature negotiation
* #164/#202/#255 - add `Writer`
* #186 - `MaxBackoffDuration` of `0` disables backoff
* #175 - support for `nsqd` config option `--max-rdy-count`
* #169 - auto-reconnect to hard-coded `nsqd`
Bug Fixes:
* #254/#256/#257 - new connection RDY starvation
* #250 - `nsqlookupd` polling improvements
* #243 - limit `IsStarved()` to connections w/ in-flight messages
* #169 - use last RDY count for `IsStarved()`; redistribute RDY state
* #204 - fix early termination blocking
* #177 - support `broadcast_address`
* #161 - connection pool goroutine safety
### 0.3.1 - 2013-02-07
**Upgrading from 0.3.0**: This release requires NSQ binary version `0.2.17+` for `TOUCH` support.
* #119 - add TOUCH command
* #133 - improved handling of errors/magic
* #127 - send IDENTIFY (missed in #90)
* #16 - add backoff to Reader
### 0.3.0 - 2013-01-07
**Upgrading from 0.2.4**: There are no backward incompatible changes to applications
written against the public `nsq.Reader` API.
However, there *are* a few backward incompatible changes to the API for applications that
directly use other public methods, or properties of a few NSQ data types:
`nsq.Message` IDs are now a type `nsq.MessageID` (a `[16]byte` array). The signatures of
`nsq.Finish()` and `nsq.Requeue()` reflect this change.
`nsq.SendCommand()` and `nsq.Frame()` were removed in favor of `nsq.SendFramedResponse()`.
`nsq.Subscribe()` no longer accepts `shortId` and `longId`. If upgrading your consumers
before upgrading your `nsqd` binaries to `0.2.16-rc.1` they will not be able to send the
optional custom identifiers.
* #90 performance optimizations
* #81 reader performance improvements / MPUB support
### 0.2.4 - 2012-10-15
* #69 added IsStarved() to reader API
### 0.2.3 - 2012-10-11
* #64 timeouts on reader queries to lookupd
* #54 fix crash issue with reader cleaning up from unexpectedly closed nsqd connections
### 0.2.2 - 2012-10-09
* Initial public release

17
Godeps/_workspace/src/github.com/nsqio/go-nsq/LICENSE generated vendored Normal file
View File

@ -0,0 +1,17 @@
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,19 @@
## go-nsq
[![Build Status](https://secure.travis-ci.org/nsqio/go-nsq.svg?branch=master)][travis] [![GoDoc](https://godoc.org/github.com/nsqio/go-nsq?status.svg)](https://godoc.org/github.com/nsqio/go-nsq) [![GitHub release](https://img.shields.io/github/release/nsqio/go-nsq.svg)](https://github.com/nsqio/go-nsq/releases/latest)
The official Go package for [NSQ][nsq].
### Docs
See [godoc][nsq_gopkgdoc] and the [main repo apps][apps] directory for examples of clients built
using this package.
### Tests
Tests are run via `./test.sh` (which requires `nsqd` and `nsqlookupd` to be installed).
[nsq]: https://github.com/nsqio/nsq
[nsq_gopkgdoc]: http://godoc.org/github.com/nsqio/go-nsq
[apps]: https://github.com/nsqio/nsq/tree/master/apps
[travis]: http://travis-ci.org/nsqio/go-nsq

View File

@ -0,0 +1,180 @@
This outlines the backwards incompatible changes that were made to the public API after the
`v0.3.7` stable release, and and how to migrate existing legacy codebases.
#### Background
The original `go-nsq` codebase is some of our earliest Go code, and one of our first attempts at a
public Go library.
We've learned a lot over the last 2 years and we wanted `go-nsq` to reflect the experiences we've
had working with the library as well as the general Go conventions and best practices we picked up
along the way.
The diff can be seen via: https://github.com/nsqio/go-nsq/compare/v0.3.7...HEAD
The bulk of the refactoring came via: https://github.com/nsqio/go-nsq/pull/30
#### Naming
Previously, the high-level types we exposed were named `nsq.Reader` and `nsq.Writer`. These
reflected internal naming conventions we had used at bitly for some time but conflated semantics
with what a typical Go developer would expect (they obviously did not implement `io.Reader` and
`io.Writer`).
We renamed these types to `nsq.Consumer` and `nsq.Producer`, which more effectively communicate
their purpose and is consistent with the NSQ documentation.
#### Configuration
In the previous API there were inconsistent and confusing ways to configure your clients.
Now, configuration is performed *before* creating an `nsq.Consumer` or `nsq.Producer` by creating
an `nsq.Config` struct. The only valid way to do this is via `nsq.NewConfig` (i.e. using a struct
literal will panic due to invalid internal state).
The `nsq.Config` struct has exported variables that can be set directly in a type-safe manner. You
can also call `cfg.Validate()` to check that the values are correct and within range.
`nsq.Config` also exposes a convenient helper method `Set(k string, v interface{})` that can set
options by *coercing* the supplied `interface{}` value.
This is incredibly convenient if you're reading options from a config file or in a serialized
format that does not exactly match the native types.
It is both flexible and forgiving.
#### Improving the nsq.Handler interface
`go-nsq` attempts to make writing the common use case consumer incredibly easy.
You specify a type that implements the `nsq.Handler` interface, the interface method is called per
message, and the return value of said method indicates to the library what the response to `nsqd`
should be (`FIN` or `REQ`), all the while managing flow control and backoff.
However, more advanced use cases require the ability to respond to a message *later*
("asynchronously", if you will). Our original API provided a *second* message handler interface
called `nsq.AsyncHandler`.
Unfortunately, it was never obvious from the name alone (or even the documentation) how to properly
use this form. The API was needlessly complex, involving the garbage creation of wrapping structs
to track state and respond to messages.
We originally had the same problem in `pynsq`, our Python client library, and we were able to
resolve the tension and expose an API that was robust and supported all use cases.
The new `go-nsq` message handler interface exposes only `nsq.Handler`, and its `HandleMessage`
method remains identical (specifically, `nsq.AsyncHandler` has been removed).
Additionally, the API to configure handlers has been improved to provide better first-class support
for common operations. We've added `AddConcurrentHandlers` (for quickly spawning multiple handler
goroutines).
For the most common use case, where you want `go-nsq` to respond to messages on your behalf, there
are no changes required! In fact, we've made it even easier to implement the `nsq.Handler`
interface for simple functions by providing the `nsq.HandlerFunc` type (in the spirit of the Go
standard library's `http.HandlerFunc`):
```go
r, err := nsq.NewConsumer("test_topic", "test_channel", nsq.NewConfig())
if err != nil {
log.Fatalf(err.Error())
}
r.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
return doSomeWork(m)
})
err := r.ConnectToNSQD(nsqdAddr)
if err != nil {
log.Fatalf(err.Error())
}
<-r.StopChan
```
In the new API, we've made the `nsq.Message` struct more robust, giving it the ability to proxy
responses. If you want to usurp control of the message from `go-nsq`, you simply call
`msg.DisableAutoResponse()`.
This is effectively the same as if you had used `nsq.AsyncHandler`, only you don't need to manage
`nsq.FinishedMessage` structs or implement a separate interface. Instead you just keep/pass
references to the `nsq.Message` itself, and when you're ready to respond you call `msg.Finish()`,
`msg.Requeue(<duration>)` or `msg.Touch(<duration>)`. Additionally, this means you can make this
decision on a *per-message* basis rather than for the lifetime of the handler.
Here is an example:
```go
type myHandler struct {}
func (h *myHandler) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
workerChan <- m
return nil
}
go func() {
for m := range workerChan {
err := doSomeWork(m)
if err != nil {
m.Requeue(-1)
continue
}
m.Finish()
}
}()
cfg := nsq.NewConfig()
cfg.MaxInFlight = 1000
r, err := nsq.NewConsumer("test_topic", "test_channel", cfg)
if err != nil {
log.Fatalf(err.Error())
}
r.AddConcurrentHandlers(&myHandler{}, 20)
err := r.ConnectToNSQD(nsqdAddr)
if err != nil {
log.Fatalf(err.Error())
}
<-r.StopChan
```
#### Requeue without backoff
As a side effect of the message handler restructuring above, it is now trivial to respond to a
message without triggering a backoff state in `nsq.Consumer` (which was not possible in the
previous API).
The `nsq.Message` type now has a `msg.RequeueWithoutBackoff()` method for this purpose.
#### Producer Error Handling
Previously, `Writer` (now `Producer`) returned a triplicate of `frameType`, `responseBody`, and
`error` from calls to `*Publish`.
This required the caller to check both `error` and `frameType` to confirm success. `Producer`
publish methods now return only `error`.
#### Logging
One of the challenges library implementors face is how to provide feedback via logging, while
exposing an interface that follows the standard library and still provides a means to control and
configure the output.
In the new API, we've provided a method on `Consumer` and `Producer` called `SetLogger` that takes
an interface compatible with the Go standard library `log.Logger` (which can be instantiated via
`log.NewLogger`) and a traditional log level integer `nsq.LogLevel{Debug,Info,Warning,Error}`:
Output(maxdepth int, s string) error
This gives the user the flexibility to control the format, destination, and verbosity while still
conforming to standard library logging conventions.
#### Misc.
Un-exported `NewDeadlineTransport` and `ApiRequest`, which never should have been exported in the
first place.
`nsq.Message` serialization switched away from `binary.{Read,Write}` for performance and
`nsq.Message` now implements the `io.WriterTo` interface.

View File

@ -0,0 +1,90 @@
package nsq
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"time"
)
type deadlinedConn struct {
Timeout time.Duration
net.Conn
}
func (c *deadlinedConn) Read(b []byte) (n int, err error) {
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
return c.Conn.Read(b)
}
func (c *deadlinedConn) Write(b []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
return c.Conn.Write(b)
}
func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
return nil, err
}
return &deadlinedConn{timeout, c}, nil
},
}
return transport
}
type wrappedResp struct {
Status string `json:"status_txt"`
StatusCode int `json:"status_code"`
Data interface{} `json:"data"`
}
// stores the result in the value pointed to by ret(must be a pointer)
func apiRequestNegotiateV1(method string, endpoint string, body io.Reader, ret interface{}) error {
httpclient := &http.Client{Transport: newDeadlineTransport(2 * time.Second)}
req, err := http.NewRequest(method, endpoint, body)
if err != nil {
return err
}
req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
resp, err := httpclient.Do(req)
if err != nil {
return err
}
respBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("got response %s %q", resp.Status, respBody)
}
if len(respBody) == 0 {
respBody = []byte("{}")
}
if resp.Header.Get("X-NSQ-Content-Type") == "nsq; version=1.0" {
return json.Unmarshal(respBody, ret)
}
wResp := &wrappedResp{
Data: ret,
}
if err = json.Unmarshal(respBody, wResp); err != nil {
return err
}
// wResp.StatusCode here is equal to resp.StatusCode, so ignore it
return nil
}

View File

@ -0,0 +1,221 @@
package nsq
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"strconv"
"time"
)
var byteSpace = []byte(" ")
var byteNewLine = []byte("\n")
// Command represents a command from a client to an NSQ daemon
type Command struct {
Name []byte
Params [][]byte
Body []byte
}
// String returns the name and parameters of the Command
func (c *Command) String() string {
if len(c.Params) > 0 {
return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, byteSpace)))
}
return string(c.Name)
}
// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {
var total int64
var buf [4]byte
n, err := w.Write(c.Name)
total += int64(n)
if err != nil {
return total, err
}
for _, param := range c.Params {
n, err := w.Write(byteSpace)
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(param)
total += int64(n)
if err != nil {
return total, err
}
}
n, err = w.Write(byteNewLine)
total += int64(n)
if err != nil {
return total, err
}
if c.Body != nil {
bufs := buf[:]
binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))
n, err := w.Write(bufs)
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(c.Body)
total += int64(n)
if err != nil {
return total, err
}
}
return total, nil
}
// Identify creates a new Command to provide information about the client. After connecting,
// it is generally the first message sent.
//
// The supplied map is marshaled into JSON to provide some flexibility
// for this command to evolve over time.
//
// See http://nsq.io/clients/tcp_protocol_spec.html#identify for information
// on the supported options
func Identify(js map[string]interface{}) (*Command, error) {
body, err := json.Marshal(js)
if err != nil {
return nil, err
}
return &Command{[]byte("IDENTIFY"), nil, body}, nil
}
// Auth sends credentials for authentication
//
// After `Identify`, this is usually the first message sent, if auth is used.
func Auth(secret string) (*Command, error) {
return &Command{[]byte("AUTH"), nil, []byte(secret)}, nil
}
// Register creates a new Command to add a topic/channel for the connected nsqd
func Register(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
params = append(params, []byte(channel))
}
return &Command{[]byte("REGISTER"), params, nil}
}
// UnRegister creates a new Command to remove a topic/channel for the connected nsqd
func UnRegister(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
params = append(params, []byte(channel))
}
return &Command{[]byte("UNREGISTER"), params, nil}
}
// Ping creates a new Command to keep-alive the state of all the
// announced topic/channels for a given client
func Ping() *Command {
return &Command{[]byte("PING"), nil, nil}
}
// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
var params = [][]byte{[]byte(topic)}
return &Command{[]byte("PUB"), params, body}
}
// DeferredPublish creates a new Command to write a message to a given topic
// where the message will queue at the channel level until the timeout expires
func DeferredPublish(topic string, delay time.Duration, body []byte) *Command {
var params = [][]byte{[]byte(topic), []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("DPUB"), params, body}
}
// MultiPublish creates a new Command to write more than one message to a given topic
// (useful for high-throughput situations to avoid roundtrips and saturate the pipe)
func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
var params = [][]byte{[]byte(topic)}
num := uint32(len(bodies))
bodySize := 4
for _, b := range bodies {
bodySize += len(b) + 4
}
body := make([]byte, 0, bodySize)
buf := bytes.NewBuffer(body)
err := binary.Write(buf, binary.BigEndian, &num)
if err != nil {
return nil, err
}
for _, b := range bodies {
err = binary.Write(buf, binary.BigEndian, int32(len(b)))
if err != nil {
return nil, err
}
_, err = buf.Write(b)
if err != nil {
return nil, err
}
}
return &Command{[]byte("MPUB"), params, buf.Bytes()}, nil
}
// Subscribe creates a new Command to subscribe to the given topic/channel
func Subscribe(topic string, channel string) *Command {
var params = [][]byte{[]byte(topic), []byte(channel)}
return &Command{[]byte("SUB"), params, nil}
}
// Ready creates a new Command to specify
// the number of messages a client is willing to receive
func Ready(count int) *Command {
var params = [][]byte{[]byte(strconv.Itoa(count))}
return &Command{[]byte("RDY"), params, nil}
}
// Finish creates a new Command to indiciate that
// a given message (by id) has been processed successfully
func Finish(id MessageID) *Command {
var params = [][]byte{id[:]}
return &Command{[]byte("FIN"), params, nil}
}
// Requeue creates a new Command to indicate that
// a given message (by id) should be requeued after the given delay
// NOTE: a delay of 0 indicates immediate requeue
func Requeue(id MessageID, delay time.Duration) *Command {
var params = [][]byte{id[:], []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("REQ"), params, nil}
}
// Touch creates a new Command to reset the timeout for
// a given message (by id)
func Touch(id MessageID) *Command {
var params = [][]byte{id[:]}
return &Command{[]byte("TOUCH"), params, nil}
}
// StartClose creates a new Command to indicate that the
// client would like to start a close cycle. nsqd will no longer
// send messages to a client in this state and the client is expected
// finish pending messages and close the connection
func StartClose() *Command {
return &Command{[]byte("CLS"), nil, nil}
}
// Nop creates a new Command that has no effect server side.
// Commonly used to respond to heartbeats
func Nop() *Command {
return &Command{[]byte("NOP"), nil, nil}
}

670
Godeps/_workspace/src/github.com/nsqio/go-nsq/config.go generated vendored Normal file
View File

@ -0,0 +1,670 @@
package nsq
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)
// Define handlers for setting config defaults, and setting config values from command line arguments or config files
type configHandler interface {
HandlesOption(c *Config, option string) bool
Set(c *Config, option string, value interface{}) error
Validate(c *Config) error
}
type defaultsHandler interface {
SetDefaults(c *Config) error
}
// BackoffStrategy defines a strategy for calculating the duration of time
// a consumer should backoff for a given attempt
type BackoffStrategy interface {
Calculate(attempt int) time.Duration
}
// ExponentialStrategy implements an exponential backoff strategy (default)
type ExponentialStrategy struct {
cfg *Config
}
// Calculate returns a duration of time: 2 ^ attempt
func (s *ExponentialStrategy) Calculate(attempt int) time.Duration {
backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
return backoffDuration
}
func (s *ExponentialStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}
// FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html
type FullJitterStrategy struct {
cfg *Config
rngOnce sync.Once
rng *rand.Rand
}
// Calculate returns a random duration of time [0, 2 ^ attempt]
func (s *FullJitterStrategy) Calculate(attempt int) time.Duration {
// lazily initialize the RNG
s.rngOnce.Do(func() {
if s.rng != nil {
return
}
s.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
})
backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
return time.Duration(s.rng.Intn(int(backoffDuration)))
}
func (s *FullJitterStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}
// Config is a struct of NSQ options
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no
// longer mutable (they are copied).
//
// Use Set(option string, value interface{}) as an alternate way to set parameters
type Config struct {
initialized bool
// used to Initialize, Validate
configHandlers []configHandler
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
// LocalAddr is the local address to use when dialing an nsqd.
// If empty, a local address is automatically chosen.
LocalAddr net.Addr `opt:"local_addr"`
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`
// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
// To set TLS config, use the following options:
//
// tls_v1 - Bool enable TLS negotiation
// tls_root_ca_file - String path to file containing root CA
// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
// tls_cert - String path to file containing public key for certificate
// tls_key - String path to file containing private key for certificate
// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
//
TlsV1 bool `opt:"tls_v1"`
TlsConfig *tls.Config `opt:"tls_config"`
// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`
// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
}
// NewConfig returns a new default nsq configuration.
//
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func NewConfig() *Config {
c := &Config{
configHandlers: []configHandler{&structTagsConfig{}, &tlsConfig{}},
initialized: true,
}
if err := c.setDefaults(); err != nil {
panic(err.Error())
}
return c
}
// Set takes an option as a string and a value as an interface and
// attempts to set the appropriate configuration option.
//
// It attempts to coerce the value into the right format depending on the named
// option and the underlying type of the value passed in.
//
// Calls to Set() that take a time.Duration as an argument can be input as:
//
// "1000ms" (a string parsed by time.ParseDuration())
// 1000 (an integer interpreted as milliseconds)
// 1000*time.Millisecond (a literal time.Duration value)
//
// Calls to Set() that take bool can be input as:
//
// "true" (a string parsed by strconv.ParseBool())
// true (a boolean)
// 1 (an int where 1 == true and 0 == false)
//
// It returns an error for an invalid option or value.
func (c *Config) Set(option string, value interface{}) error {
c.assertInitialized()
option = strings.Replace(option, "-", "_", -1)
for _, h := range c.configHandlers {
if h.HandlesOption(c, option) {
return h.Set(c, option, value)
}
}
return fmt.Errorf("invalid option %s", option)
}
func (c *Config) assertInitialized() {
if !c.initialized {
panic("Config{} must be created with NewConfig()")
}
}
// Validate checks that all values are within specified min/max ranges
func (c *Config) Validate() error {
c.assertInitialized()
for _, h := range c.configHandlers {
if err := h.Validate(c); err != nil {
return err
}
}
return nil
}
func (c *Config) setDefaults() error {
for _, h := range c.configHandlers {
hh, ok := h.(defaultsHandler)
if ok {
if err := hh.SetDefaults(c); err != nil {
return err
}
}
}
return nil
}
type structTagsConfig struct{}
// Handle options that are listed in StructTags
func (h *structTagsConfig) HandlesOption(c *Config, option string) bool {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
if opt == option {
return true
}
}
return false
}
// Set values based on parameters in StructTags
func (h *structTagsConfig) Set(c *Config, option string, value interface{}) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
if option != opt {
continue
}
min := field.Tag.Get("min")
max := field.Tag.Get("max")
fieldVal := val.FieldByName(field.Name)
dest := unsafeValueOf(fieldVal)
coercedVal, err := coerce(value, field.Type)
if err != nil {
return fmt.Errorf("failed to coerce option %s (%v) - %s",
option, value, err)
}
if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(coercedVal, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
option, coercedVal.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(coercedVal, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
option, coercedVal.Interface(), coercedMaxVal.Interface())
}
}
if coercedVal.Type().String() == "nsq.BackoffStrategy" {
v := coercedVal.Interface().(BackoffStrategy)
if v, ok := v.(interface {
setConfig(*Config)
}); ok {
v.setConfig(c)
}
}
dest.Set(coercedVal)
return nil
}
return fmt.Errorf("unknown option %s", option)
}
func (h *structTagsConfig) SetDefaults(c *Config) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
defaultVal := field.Tag.Get("default")
if defaultVal == "" || opt == "" {
continue
}
if err := c.Set(opt, defaultVal); err != nil {
return err
}
}
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
c.ClientID = strings.Split(hostname, ".")[0]
c.Hostname = hostname
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)
return nil
}
func (h *structTagsConfig) Validate(c *Config) error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
min := field.Tag.Get("min")
max := field.Tag.Get("max")
if min == "" && max == "" {
continue
}
value := val.FieldByName(field.Name)
if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(value, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
field.Name, value.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(value, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
field.Name, value.Interface(), coercedMaxVal.Interface())
}
}
}
if c.HeartbeatInterval > c.ReadTimeout {
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
}
return nil
}
// Parsing for higher order TLS settings
type tlsConfig struct {
certFile string
keyFile string
}
func (t *tlsConfig) HandlesOption(c *Config, option string) bool {
switch option {
case "tls_root_ca_file", "tls_insecure_skip_verify", "tls_cert", "tls_key", "tls_min_version":
return true
}
return false
}
func (t *tlsConfig) Set(c *Config, option string, value interface{}) error {
if c.TlsConfig == nil {
c.TlsConfig = &tls.Config{
MinVersion: tls.VersionTLS10,
MaxVersion: tls.VersionTLS12, // enable TLS_FALLBACK_SCSV prior to Go 1.5: https://go-review.googlesource.com/#/c/1776/
}
}
val := reflect.ValueOf(c.TlsConfig).Elem()
switch option {
case "tls_cert", "tls_key":
if option == "tls_cert" {
t.certFile = value.(string)
} else {
t.keyFile = value.(string)
}
if t.certFile != "" && t.keyFile != "" && len(c.TlsConfig.Certificates) == 0 {
cert, err := tls.LoadX509KeyPair(t.certFile, t.keyFile)
if err != nil {
return err
}
c.TlsConfig.Certificates = []tls.Certificate{cert}
}
return nil
case "tls_root_ca_file":
filename, ok := value.(string)
if !ok {
return fmt.Errorf("ERROR: %v is not a string", value)
}
tlsCertPool := x509.NewCertPool()
caCertFile, err := ioutil.ReadFile(filename)
if err != nil {
return fmt.Errorf("ERROR: failed to read custom Certificate Authority file %s", err)
}
if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
return fmt.Errorf("ERROR: failed to append certificates from Certificate Authority file")
}
c.TlsConfig.RootCAs = tlsCertPool
return nil
case "tls_insecure_skip_verify":
fieldVal := val.FieldByName("InsecureSkipVerify")
dest := unsafeValueOf(fieldVal)
coercedVal, err := coerce(value, fieldVal.Type())
if err != nil {
return fmt.Errorf("failed to coerce option %s (%v) - %s",
option, value, err)
}
dest.Set(coercedVal)
return nil
case "tls_min_version":
version, ok := value.(string)
if !ok {
return fmt.Errorf("ERROR: %v is not a string", value)
}
switch version {
case "ssl3.0":
c.TlsConfig.MinVersion = tls.VersionSSL30
case "tls1.0":
c.TlsConfig.MinVersion = tls.VersionTLS10
case "tls1.1":
c.TlsConfig.MinVersion = tls.VersionTLS11
case "tls1.2":
c.TlsConfig.MinVersion = tls.VersionTLS12
default:
return fmt.Errorf("ERROR: %v is not a tls version", value)
}
return nil
}
return fmt.Errorf("unknown option %s", option)
}
func (t *tlsConfig) Validate(c *Config) error {
return nil
}
// because Config contains private structs we can't use reflect.Value
// directly, instead we need to "unsafely" address the variable
func unsafeValueOf(val reflect.Value) reflect.Value {
uptr := unsafe.Pointer(val.UnsafeAddr())
return reflect.NewAt(val.Type(), uptr).Elem()
}
func valueCompare(v1 reflect.Value, v2 reflect.Value) int {
switch v1.Type().String() {
case "int", "int16", "int32", "int64":
if v1.Int() > v2.Int() {
return 1
} else if v1.Int() < v2.Int() {
return -1
}
return 0
case "uint", "uint16", "uint32", "uint64":
if v1.Uint() > v2.Uint() {
return 1
} else if v1.Uint() < v2.Uint() {
return -1
}
return 0
case "float32", "float64":
if v1.Float() > v2.Float() {
return 1
} else if v1.Float() < v2.Float() {
return -1
}
return 0
case "time.Duration":
if v1.Interface().(time.Duration) > v2.Interface().(time.Duration) {
return 1
} else if v1.Interface().(time.Duration) < v2.Interface().(time.Duration) {
return -1
}
return 0
}
panic("impossible")
}
func coerce(v interface{}, typ reflect.Type) (reflect.Value, error) {
var err error
if typ.Kind() == reflect.Ptr {
return reflect.ValueOf(v), nil
}
switch typ.String() {
case "string":
v, err = coerceString(v)
case "int", "int16", "int32", "int64":
v, err = coerceInt64(v)
case "uint", "uint16", "uint32", "uint64":
v, err = coerceUint64(v)
case "float32", "float64":
v, err = coerceFloat64(v)
case "bool":
v, err = coerceBool(v)
case "time.Duration":
v, err = coerceDuration(v)
case "net.Addr":
v, err = coerceAddr(v)
case "nsq.BackoffStrategy":
v, err = coerceBackoffStrategy(v)
default:
v = nil
err = fmt.Errorf("invalid type %s", typ.String())
}
return valueTypeCoerce(v, typ), err
}
func valueTypeCoerce(v interface{}, typ reflect.Type) reflect.Value {
val := reflect.ValueOf(v)
if reflect.TypeOf(v) == typ {
return val
}
tval := reflect.New(typ).Elem()
switch typ.String() {
case "int", "int16", "int32", "int64":
tval.SetInt(val.Int())
case "uint", "uint16", "uint32", "uint64":
tval.SetUint(val.Uint())
case "float32", "float64":
tval.SetFloat(val.Float())
default:
tval.Set(val)
}
return tval
}
func coerceString(v interface{}) (string, error) {
switch v := v.(type) {
case string:
return v, nil
case int, int16, int32, int64, uint, uint16, uint32, uint64:
return fmt.Sprintf("%d", v), nil
case float32, float64:
return fmt.Sprintf("%f", v), nil
}
return fmt.Sprintf("%s", v), nil
}
func coerceDuration(v interface{}) (time.Duration, error) {
switch v := v.(type) {
case string:
return time.ParseDuration(v)
case int, int16, int32, int64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Int()) * time.Millisecond, nil
case uint, uint16, uint32, uint64:
// treat like ms
return time.Duration(reflect.ValueOf(v).Uint()) * time.Millisecond, nil
case time.Duration:
return v, nil
}
return 0, errors.New("invalid value type")
}
func coerceAddr(v interface{}) (net.Addr, error) {
switch v := v.(type) {
case string:
return net.ResolveTCPAddr("tcp", v)
case net.Addr:
return v, nil
}
return nil, errors.New("invalid value type")
}
func coerceBackoffStrategy(v interface{}) (BackoffStrategy, error) {
switch v := v.(type) {
case string:
switch v {
case "", "exponential":
return &ExponentialStrategy{}, nil
case "full_jitter":
return &FullJitterStrategy{}, nil
}
case BackoffStrategy:
return v, nil
}
return nil, errors.New("invalid value type")
}
func coerceBool(v interface{}) (bool, error) {
switch v := v.(type) {
case bool:
return v, nil
case string:
return strconv.ParseBool(v)
case int, int16, int32, int64:
return reflect.ValueOf(v).Int() != 0, nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint() != 0, nil
}
return false, errors.New("invalid value type")
}
func coerceFloat64(v interface{}) (float64, error) {
switch v := v.(type) {
case string:
return strconv.ParseFloat(v, 64)
case int, int16, int32, int64:
return float64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return float64(reflect.ValueOf(v).Uint()), nil
case float32:
return float64(v), nil
case float64:
return v, nil
}
return 0, errors.New("invalid value type")
}
func coerceInt64(v interface{}) (int64, error) {
switch v := v.(type) {
case string:
return strconv.ParseInt(v, 10, 64)
case int, int16, int32, int64:
return reflect.ValueOf(v).Int(), nil
case uint, uint16, uint32, uint64:
return int64(reflect.ValueOf(v).Uint()), nil
}
return 0, errors.New("invalid value type")
}
func coerceUint64(v interface{}) (uint64, error) {
switch v := v.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int, int16, int32, int64:
return uint64(reflect.ValueOf(v).Int()), nil
case uint, uint16, uint32, uint64:
return reflect.ValueOf(v).Uint(), nil
}
return 0, errors.New("invalid value type")
}

View File

@ -0,0 +1,31 @@
package nsq
import (
"strings"
)
// ConfigFlag wraps a Config and implements the flag.Value interface
type ConfigFlag struct {
*Config
}
// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
parts := strings.SplitN(opt, ",", 2)
key := parts[0]
switch len(parts) {
case 1:
// default options specified without a value to boolean true
err = c.Config.Set(key, true)
case 2:
err = c.Config.Set(key, parts[1])
}
return
}
// String implements the flag.Value interface
func (c *ConfigFlag) String() string {
return ""
}

723
Godeps/_workspace/src/github.com/nsqio/go-nsq/conn.go generated vendored Normal file
View File

@ -0,0 +1,723 @@
package nsq
import (
"bufio"
"bytes"
"compress/flate"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/mreiferson/go-snappystream"
)
// IdentifyResponse represents the metadata
// returned from an IDENTIFY command to nsqd
type IdentifyResponse struct {
MaxRdyCount int64 `json:"max_rdy_count"`
TLSv1 bool `json:"tls_v1"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
AuthRequired bool `json:"auth_required"`
}
// AuthResponse represents the metadata
// returned from an AUTH command to nsqd
type AuthResponse struct {
Identity string `json:"identity"`
IdentityUrl string `json:"identity_url"`
PermissionCount int64 `json:"permission_count"`
}
type msgResponse struct {
msg *Message
cmd *Command
success bool
backoff bool
}
// Conn represents a connection to nsqd
//
// Conn exposes a set of callbacks for the
// various events that occur on a connection
type Conn struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messagesInFlight int64
maxRdyCount int64
rdyCount int64
lastRdyCount int64
lastMsgTimestamp int64
mtx sync.Mutex
config *Config
conn *net.TCPConn
tlsConn *tls.Conn
addr string
delegate ConnDelegate
logger logger
logLvl LogLevel
logFmt string
logGuard sync.RWMutex
r io.Reader
w io.Writer
cmdChan chan *Command
msgResponseChan chan *msgResponse
exitChan chan int
drainReady chan int
closeFlag int32
stopper sync.Once
wg sync.WaitGroup
readLoopRunning int32
}
// NewConn returns a new Conn instance
func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
if !config.initialized {
panic("Config must be created with NewConfig()")
}
return &Conn{
addr: addr,
config: config,
delegate: delegate,
maxRdyCount: 2500,
lastMsgTimestamp: time.Now().UnixNano(),
cmdChan: make(chan *Command),
msgResponseChan: make(chan *msgResponse),
exitChan: make(chan int),
drainReady: make(chan int),
}
}
// SetLogger assigns the logger to use as well as a level.
//
// The format parameter is expected to be a printf compatible string with
// a single %s argument. This is useful if you want to provide additional
// context to the log messages that the connection will print, the default
// is '(%s)'.
//
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()
c.logger = l
c.logLvl = lvl
c.logFmt = format
if c.logFmt == "" {
c.logFmt = "(%s)"
}
}
func (c *Conn) getLogger() (logger, LogLevel, string) {
c.logGuard.RLock()
defer c.logGuard.RUnlock()
return c.logger, c.logLvl, c.logFmt
}
// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn
_, err = c.Write(MagicV2)
if err != nil {
c.Close()
return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
}
resp, err := c.identify()
if err != nil {
return nil, err
}
if resp != nil && resp.AuthRequired {
if c.config.AuthSecret == "" {
c.log(LogLevelError, "Auth Required")
return nil, errors.New("Auth Required")
}
err := c.auth(c.config.AuthSecret)
if err != nil {
c.log(LogLevelError, "Auth Failed %s", err)
return nil, err
}
}
c.wg.Add(2)
atomic.StoreInt32(&c.readLoopRunning, 1)
go c.readLoop()
go c.writeLoop()
return resp, nil
}
// Close idempotently initiates connection close
func (c *Conn) Close() error {
atomic.StoreInt32(&c.closeFlag, 1)
if c.conn != nil && atomic.LoadInt64(&c.messagesInFlight) == 0 {
return c.conn.CloseRead()
}
return nil
}
// IsClosing indicates whether or not the
// connection is currently in the processing of
// gracefully closing
func (c *Conn) IsClosing() bool {
return atomic.LoadInt32(&c.closeFlag) == 1
}
// RDY returns the current RDY count
func (c *Conn) RDY() int64 {
return atomic.LoadInt64(&c.rdyCount)
}
// LastRDY returns the previously set RDY count
func (c *Conn) LastRDY() int64 {
return atomic.LoadInt64(&c.lastRdyCount)
}
// SetRDY stores the specified RDY count
func (c *Conn) SetRDY(rdy int64) {
atomic.StoreInt64(&c.rdyCount, rdy)
atomic.StoreInt64(&c.lastRdyCount, rdy)
}
// MaxRDY returns the nsqd negotiated maximum
// RDY count that it will accept for this connection
func (c *Conn) MaxRDY() int64 {
return c.maxRdyCount
}
// LastMessageTime returns a time.Time representing
// the time at which the last message was received
func (c *Conn) LastMessageTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastMsgTimestamp))
}
// RemoteAddr returns the configured destination nsqd address
func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// String returns the fully-qualified address
func (c *Conn) String() string {
return c.addr
}
// Read performs a deadlined read on the underlying TCP connection
func (c *Conn) Read(p []byte) (int, error) {
c.conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
return c.r.Read(p)
}
// Write performs a deadlined write on the underlying TCP connection
func (c *Conn) Write(p []byte) (int, error) {
c.conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout))
return c.w.Write(p)
}
// WriteCommand is a goroutine safe method to write a Command
// to this connection, and flush.
func (c *Conn) WriteCommand(cmd *Command) error {
c.mtx.Lock()
_, err := cmd.WriteTo(c)
if err != nil {
goto exit
}
err = c.Flush()
exit:
c.mtx.Unlock()
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
return err
}
type flusher interface {
Flush() error
}
// Flush writes all buffered data to the underlying TCP connection
func (c *Conn) Flush() error {
if f, ok := c.w.(flusher); ok {
return f.Flush()
}
return nil
}
func (c *Conn) identify() (*IdentifyResponse, error) {
ci := make(map[string]interface{})
ci["client_id"] = c.config.ClientID
ci["hostname"] = c.config.Hostname
ci["user_agent"] = c.config.UserAgent
ci["short_id"] = c.config.ClientID // deprecated
ci["long_id"] = c.config.Hostname // deprecated
ci["tls_v1"] = c.config.TlsV1
ci["deflate"] = c.config.Deflate
ci["deflate_level"] = c.config.DeflateLevel
ci["snappy"] = c.config.Snappy
ci["feature_negotiation"] = true
if c.config.HeartbeatInterval == -1 {
ci["heartbeat_interval"] = -1
} else {
ci["heartbeat_interval"] = int64(c.config.HeartbeatInterval / time.Millisecond)
}
ci["sample_rate"] = c.config.SampleRate
ci["output_buffer_size"] = c.config.OutputBufferSize
if c.config.OutputBufferTimeout == -1 {
ci["output_buffer_timeout"] = -1
} else {
ci["output_buffer_timeout"] = int64(c.config.OutputBufferTimeout / time.Millisecond)
}
ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
cmd, err := Identify(ci)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
err = c.WriteCommand(cmd)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
if frameType == FrameTypeError {
return nil, ErrIdentify{string(data)}
}
// check to see if the server was able to respond w/ capabilities
// i.e. it was a JSON response
if data[0] != '{' {
return nil, nil
}
resp := &IdentifyResponse{}
err = json.Unmarshal(data, resp)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
c.log(LogLevelDebug, "IDENTIFY response: %+v", resp)
c.maxRdyCount = resp.MaxRdyCount
if resp.TLSv1 {
c.log(LogLevelInfo, "upgrading to TLS")
err := c.upgradeTLS(c.config.TlsConfig)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
if resp.Deflate {
c.log(LogLevelInfo, "upgrading to Deflate")
err := c.upgradeDeflate(c.config.DeflateLevel)
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
if resp.Snappy {
c.log(LogLevelInfo, "upgrading to Snappy")
err := c.upgradeSnappy()
if err != nil {
return nil, ErrIdentify{err.Error()}
}
}
// now that connection is bootstrapped, enable read buffering
// (and write buffering if it's not already capable of Flush())
c.r = bufio.NewReader(c.r)
if _, ok := c.w.(flusher); !ok {
c.w = bufio.NewWriter(c.w)
}
return resp, nil
}
func (c *Conn) upgradeTLS(tlsConf *tls.Config) error {
// create a local copy of the config to set ServerName for this connection
var conf tls.Config
if tlsConf != nil {
conf = *tlsConf
}
host, _, err := net.SplitHostPort(c.addr)
if err != nil {
return err
}
conf.ServerName = host
c.tlsConn = tls.Client(c.conn, &conf)
err = c.tlsConn.Handshake()
if err != nil {
return err
}
c.r = c.tlsConn
c.w = c.tlsConn
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from TLS upgrade")
}
return nil
}
func (c *Conn) upgradeDeflate(level int) error {
conn := net.Conn(c.conn)
if c.tlsConn != nil {
conn = c.tlsConn
}
fw, _ := flate.NewWriter(conn, level)
c.r = flate.NewReader(conn)
c.w = fw
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from Deflate upgrade")
}
return nil
}
func (c *Conn) upgradeSnappy() error {
conn := net.Conn(c.conn)
if c.tlsConn != nil {
conn = c.tlsConn
}
c.r = snappystream.NewReader(conn, snappystream.SkipVerifyChecksum)
c.w = snappystream.NewWriter(conn)
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) {
return errors.New("invalid response from Snappy upgrade")
}
return nil
}
func (c *Conn) auth(secret string) error {
cmd, err := Auth(secret)
if err != nil {
return err
}
err = c.WriteCommand(cmd)
if err != nil {
return err
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
return err
}
if frameType == FrameTypeError {
return errors.New("Error authenticating " + string(data))
}
resp := &AuthResponse{}
err = json.Unmarshal(data, resp)
if err != nil {
return err
}
c.log(LogLevelInfo, "Auth accepted. Identity: %q %s Permissions: %d",
resp.Identity, resp.IdentityUrl, resp.PermissionCount)
return nil
}
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
if atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
}
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
goto exit
}
if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
c.log(LogLevelDebug, "heartbeat received")
c.delegate.OnHeartbeat(c)
err := c.WriteCommand(Nop())
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
continue
}
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg)
case FrameTypeError:
c.log(LogLevelError, "protocol error - %s", data)
c.delegate.OnError(c, data)
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
exit:
atomic.StoreInt32(&c.readLoopRunning, 0)
// start the connection close
messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
if messagesInFlight == 0 {
// if we exited readLoop with no messages in flight
// we need to explicitly trigger the close because
// writeLoop won't
c.close()
} else {
c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
}
c.wg.Done()
c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
for {
select {
case <-c.exitChan:
c.log(LogLevelInfo, "breaking out of writeLoop")
// Indicate drainReady because we will not pull any more off msgResponseChan
close(c.drainReady)
goto exit
case cmd := <-c.cmdChan:
err := c.WriteCommand(cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", cmd, err)
c.close()
continue
}
case resp := <-c.msgResponseChan:
// Decrement this here so it is correct even if we can't respond to nsqd
msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
c.delegate.OnResume(c)
} else {
c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
c.delegate.OnMessageRequeued(c, resp.msg)
if resp.backoff {
c.delegate.OnBackoff(c)
} else {
c.delegate.OnContinue(c)
}
}
err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}
if msgsInFlight == 0 &&
atomic.LoadInt32(&c.closeFlag) == 1 {
c.close()
continue
}
}
}
exit:
c.wg.Done()
c.log(LogLevelInfo, "writeLoop exiting")
}
func (c *Conn) close() {
// a "clean" connection close is orchestrated as follows:
//
// 1. CLOSE cmd sent to nsqd
// 2. CLOSE_WAIT response received from nsqd
// 3. set c.closeFlag
// 4. readLoop() exits
// a. if messages-in-flight > 0 delay close()
// i. writeLoop() continues receiving on c.msgResponseChan chan
// x. when messages-in-flight == 0 call close()
// b. else call close() immediately
// 5. c.exitChan close
// a. writeLoop() exits
// i. c.drainReady close
// 6a. launch cleanup() goroutine (we're racing with intraprocess
// routed messages, see comments below)
// a. wait on c.drainReady
// b. loop and receive on c.msgResponseChan chan
// until messages-in-flight == 0
// i. ensure that readLoop has exited
// 6b. launch waitForCleanup() goroutine
// b. wait on waitgroup (covers readLoop() and writeLoop()
// and cleanup goroutine)
// c. underlying TCP connection close
// d. trigger Delegate OnClose()
//
c.stopper.Do(func() {
c.log(LogLevelInfo, "beginning close")
close(c.exitChan)
c.conn.CloseRead()
c.wg.Add(1)
go c.cleanup()
go c.waitForCleanup()
})
}
func (c *Conn) cleanup() {
<-c.drainReady
ticker := time.NewTicker(100 * time.Millisecond)
lastWarning := time.Now()
// writeLoop has exited, drain any remaining in flight messages
for {
// we're racing with readLoop which potentially has a message
// for handling so infinitely loop until messagesInFlight == 0
// and readLoop has exited
var msgsInFlight int64
select {
case <-c.msgResponseChan:
msgsInFlight = atomic.AddInt64(&c.messagesInFlight, -1)
case <-ticker.C:
msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
}
if msgsInFlight > 0 {
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
lastWarning = time.Now()
}
continue
}
// until the readLoop has exited we cannot be sure that there
// still won't be a race
if atomic.LoadInt32(&c.readLoopRunning) == 1 {
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... readLoop still running")
lastWarning = time.Now()
}
continue
}
goto exit
}
exit:
ticker.Stop()
c.wg.Done()
c.log(LogLevelInfo, "finished draining, cleanup exiting")
}
func (c *Conn) waitForCleanup() {
// this blocks until readLoop and writeLoop
// (and cleanup goroutine above) have exited
c.wg.Wait()
c.conn.CloseWrite()
c.log(LogLevelInfo, "clean close complete")
c.delegate.OnClose(c)
}
func (c *Conn) onMessageFinish(m *Message) {
c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
}
func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
if delay == -1 {
// linear delay
delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts)
// bound the requeueDelay to configured max
if delay > c.config.MaxRequeueDelay {
delay = c.config.MaxRequeueDelay
}
}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff}
}
func (c *Conn) onMessageTouch(m *Message) {
select {
case c.cmdChan <- Touch(m.ID):
case <-c.exitChan:
}
}
func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl, logFmt := c.getLogger()
if logger == nil {
return
}
if logLvl > lvl {
return
}
logger.Output(2, fmt.Sprintf("%-4s %s %s", lvl,
fmt.Sprintf(logFmt, c.String()),
fmt.Sprintf(line, args...)))
}

1164
Godeps/_workspace/src/github.com/nsqio/go-nsq/consumer.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,138 @@
package nsq
import "time"
type logger interface {
Output(calldepth int, s string) error
}
// LogLevel specifies the severity of a given log message
type LogLevel int
// Log levels
const (
LogLevelDebug LogLevel = iota
LogLevelInfo
LogLevelWarning
LogLevelError
)
// String returns the string form for a given LogLevel
func (lvl LogLevel) String() string {
switch lvl {
case LogLevelInfo:
return "INF"
case LogLevelWarning:
return "WRN"
case LogLevelError:
return "ERR"
}
return "DBG"
}
// MessageDelegate is an interface of methods that are used as
// callbacks in Message
type MessageDelegate interface {
// OnFinish is called when the Finish() method
// is triggered on the Message
OnFinish(*Message)
// OnRequeue is called when the Requeue() method
// is triggered on the Message
OnRequeue(m *Message, delay time.Duration, backoff bool)
// OnTouch is called when the Touch() method
// is triggered on the Message
OnTouch(*Message)
}
type connMessageDelegate struct {
c *Conn
}
func (d *connMessageDelegate) OnFinish(m *Message) { d.c.onMessageFinish(m) }
func (d *connMessageDelegate) OnRequeue(m *Message, t time.Duration, b bool) {
d.c.onMessageRequeue(m, t, b)
}
func (d *connMessageDelegate) OnTouch(m *Message) { d.c.onMessageTouch(m) }
// ConnDelegate is an interface of methods that are used as
// callbacks in Conn
type ConnDelegate interface {
// OnResponse is called when the connection
// receives a FrameTypeResponse from nsqd
OnResponse(*Conn, []byte)
// OnError is called when the connection
// receives a FrameTypeError from nsqd
OnError(*Conn, []byte)
// OnMessage is called when the connection
// receives a FrameTypeMessage from nsqd
OnMessage(*Conn, *Message)
// OnMessageFinished is called when the connection
// handles a FIN command from a message handler
OnMessageFinished(*Conn, *Message)
// OnMessageRequeued is called when the connection
// handles a REQ command from a message handler
OnMessageRequeued(*Conn, *Message)
// OnBackoff is called when the connection triggers a backoff state
OnBackoff(*Conn)
// OnContinue is called when the connection finishes a message without adjusting backoff state
OnContinue(*Conn)
// OnResume is called when the connection triggers a resume state
OnResume(*Conn)
// OnIOError is called when the connection experiences
// a low-level TCP transport error
OnIOError(*Conn, error)
// OnHeartbeat is called when the connection
// receives a heartbeat from nsqd
OnHeartbeat(*Conn)
// OnClose is called when the connection
// closes, after all cleanup
OnClose(*Conn)
}
// keeps the exported Consumer struct clean of the exported methods
// required to implement the ConnDelegate interface
type consumerConnDelegate struct {
r *Consumer
}
func (d *consumerConnDelegate) OnResponse(c *Conn, data []byte) { d.r.onConnResponse(c, data) }
func (d *consumerConnDelegate) OnError(c *Conn, data []byte) { d.r.onConnError(c, data) }
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onConnMessage(c, m) }
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
func (d *consumerConnDelegate) OnClose(c *Conn) { d.r.onConnClose(c) }
// keeps the exported Producer struct clean of the exported methods
// required to implement the ConnDelegate interface
type producerConnDelegate struct {
w *Producer
}
func (d *producerConnDelegate) OnResponse(c *Conn, data []byte) { d.w.onConnResponse(c, data) }
func (d *producerConnDelegate) OnError(c *Conn, data []byte) { d.w.onConnError(c, data) }
func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnBackoff(c *Conn) {}
func (d *producerConnDelegate) OnContinue(c *Conn) {}
func (d *producerConnDelegate) OnResume(c *Conn) {}
func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) }
func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) }
func (d *producerConnDelegate) OnClose(c *Conn) { d.w.onConnClose(c) }

View File

@ -0,0 +1,44 @@
package nsq
import (
"errors"
"fmt"
)
// ErrNotConnected is returned when a publish command is made
// against a Producer that is not connected
var ErrNotConnected = errors.New("not connected")
// ErrStopped is returned when a publish command is
// made against a Producer that has been stopped
var ErrStopped = errors.New("stopped")
// ErrClosing is returned when a connection is closing
var ErrClosing = errors.New("closing")
// ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrAlreadyConnected = errors.New("already connected")
// ErrOverMaxInFlight is returned from Consumer if over max-in-flight
var ErrOverMaxInFlight = errors.New("over configure max-inflight")
// ErrIdentify is returned from Conn as part of the IDENTIFY handshake
type ErrIdentify struct {
Reason string
}
// Error returns a stringified error
func (e ErrIdentify) Error() string {
return fmt.Sprintf("failed to IDENTIFY - %s", e.Reason)
}
// ErrProtocol is returned from Producer when encountering
// an NSQ protocol level error
type ErrProtocol struct {
Reason string
}
// Error returns a stringified error
func (e ErrProtocol) Error() string {
return e.Reason
}

View File

@ -0,0 +1,162 @@
package nsq
import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"sync/atomic"
"time"
)
// The number of bytes for a Message.ID
const MsgIDLength = 16
// MessageID is the ASCII encoded hexadecimal message ID
type MessageID [MsgIDLength]byte
// Message is the fundamental data type containing
// the id, body, and metadata
type Message struct {
ID MessageID
Body []byte
Timestamp int64
Attempts uint16
NSQDAddress string
Delegate MessageDelegate
autoResponseDisabled int32
responded int32
}
// NewMessage creates a Message, initializes some metadata,
// and returns a pointer
func NewMessage(id MessageID, body []byte) *Message {
return &Message{
ID: id,
Body: body,
Timestamp: time.Now().UnixNano(),
}
}
// DisableAutoResponse disables the automatic response that
// would normally be sent when a handler.HandleMessage
// returns (FIN/REQ based on the error value returned).
//
// This is useful if you want to batch, buffer, or asynchronously
// respond to messages.
func (m *Message) DisableAutoResponse() {
atomic.StoreInt32(&m.autoResponseDisabled, 1)
}
// IsAutoResponseDisabled indicates whether or not this message
// will be responded to automatically
func (m *Message) IsAutoResponseDisabled() bool {
return atomic.LoadInt32(&m.autoResponseDisabled) == 1
}
// HasResponded indicates whether or not this message has been responded to
func (m *Message) HasResponded() bool {
return atomic.LoadInt32(&m.responded) == 1
}
// Finish sends a FIN command to the nsqd which
// sent this message
func (m *Message) Finish() {
if !atomic.CompareAndSwapInt32(&m.responded, 0, 1) {
return
}
m.Delegate.OnFinish(m)
}
// Touch sends a TOUCH command to the nsqd which
// sent this message
func (m *Message) Touch() {
if m.HasResponded() {
return
}
m.Delegate.OnTouch(m)
}
// Requeue sends a REQ command to the nsqd which
// sent this message, using the supplied delay.
//
// A delay of -1 will automatically calculate
// based on the number of attempts and the
// configured default_requeue_delay
func (m *Message) Requeue(delay time.Duration) {
m.doRequeue(delay, true)
}
// RequeueWithoutBackoff sends a REQ command to the nsqd which
// sent this message, using the supplied delay.
//
// Notably, using this method to respond does not trigger a backoff
// event on the configured Delegate.
func (m *Message) RequeueWithoutBackoff(delay time.Duration) {
m.doRequeue(delay, false)
}
func (m *Message) doRequeue(delay time.Duration, backoff bool) {
if !atomic.CompareAndSwapInt32(&m.responded, 0, 1) {
return
}
m.Delegate.OnRequeue(m, delay, backoff)
}
// WriteTo implements the WriterTo interface and serializes
// the message into the supplied producer.
//
// It is suggested that the target Writer is buffered to
// avoid performing many system calls.
func (m *Message) WriteTo(w io.Writer) (int64, error) {
var buf [10]byte
var total int64
binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
n, err := w.Write(buf[:])
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.ID[:])
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.Body)
total += int64(n)
if err != nil {
return total, err
}
return total, nil
}
// DecodeMessage deseralizes data (as []byte) and creates a new Message
func DecodeMessage(b []byte) (*Message, error) {
var msg Message
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
buf := bytes.NewBuffer(b[10:])
_, err := io.ReadFull(buf, msg.ID[:])
if err != nil {
return nil, err
}
msg.Body, err = ioutil.ReadAll(buf)
if err != nil {
return nil, err
}
return &msg, nil
}

View File

@ -0,0 +1,393 @@
package nsq
import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
)
type producerConn interface {
String() string
SetLogger(logger, LogLevel, string)
Connect() (*IdentifyResponse, error)
Close() error
WriteCommand(*Command) error
}
// Producer is a high-level type to publish to NSQ.
//
// A Producer instance is 1:1 with a destination `nsqd`
// and will lazily connect to that instance (and re-connect)
// when Publish commands are executed.
type Producer struct {
id int64
addr string
conn producerConn
config Config
logger logger
logLvl LogLevel
logGuard sync.RWMutex
responseChan chan []byte
errorChan chan []byte
closeChan chan int
transactionChan chan *ProducerTransaction
transactions []*ProducerTransaction
state int32
concurrentProducers int32
stopFlag int32
exitChan chan int
wg sync.WaitGroup
guard sync.Mutex
}
// ProducerTransaction is returned by the async publish methods
// to retrieve metadata about the command after the
// response is received.
type ProducerTransaction struct {
cmd *Command
doneChan chan *ProducerTransaction
Error error // the error (or nil) of the publish command
Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
}
func (t *ProducerTransaction) finish() {
if t.doneChan != nil {
t.doneChan <- t
}
}
// NewProducer returns an instance of Producer for the specified address
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
config.assertInitialized()
err := config.Validate()
if err != nil {
return nil, err
}
p := &Producer{
id: atomic.AddInt64(&instCount, 1),
addr: addr,
config: *config,
logger: log.New(os.Stderr, "", log.Flags()),
logLvl: LogLevelInfo,
transactionChan: make(chan *ProducerTransaction),
exitChan: make(chan int),
responseChan: make(chan []byte),
errorChan: make(chan []byte),
}
return p, nil
}
// Ping causes the Producer to connect to it's configured nsqd (if not already
// connected) and send a `Nop` command, returning any error that might occur.
//
// This method can be used to verify that a newly-created Producer instance is
// configured correctly, rather than relying on the lazy "connect on Publish"
// behavior of a Producer.
func (w *Producer) Ping() error {
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
return w.conn.WriteCommand(Nop())
}
// SetLogger assigns the logger to use as well as a level
//
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string)
//
func (w *Producer) SetLogger(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()
w.logger = l
w.logLvl = lvl
}
func (w *Producer) getLogger() (logger, LogLevel) {
w.logGuard.RLock()
defer w.logGuard.RUnlock()
return w.logger, w.logLvl
}
// String returns the address of the Producer
func (w *Producer) String() string {
return w.addr
}
// Stop initiates a graceful stop of the Producer (permanent)
//
// NOTE: this blocks until completion
func (w *Producer) Stop() {
w.guard.Lock()
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
w.guard.Unlock()
return
}
w.log(LogLevelInfo, "stopping")
close(w.exitChan)
w.close()
w.guard.Unlock()
w.wg.Wait()
}
// PublishAsync publishes a message body to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) PublishAsync(topic string, body []byte, doneChan chan *ProducerTransaction,
args ...interface{}) error {
return w.sendCommandAsync(Publish(topic, body), doneChan, args)
}
// MultiPublishAsync publishes a slice of message bodies to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *ProducerTransaction,
args ...interface{}) error {
cmd, err := MultiPublish(topic, body)
if err != nil {
return err
}
return w.sendCommandAsync(cmd, doneChan, args)
}
// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
return w.sendCommand(Publish(topic, body))
}
// MultiPublish synchronously publishes a slice of message bodies to the specified topic, returning
// an error if publish failed
func (w *Producer) MultiPublish(topic string, body [][]byte) error {
cmd, err := MultiPublish(topic, body)
if err != nil {
return err
}
return w.sendCommand(cmd)
}
// DeferredPublish synchronously publishes a message body to the specified topic
// where the message will queue at the channel level until the timeout expires, returning
// an error if publish failed
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
return w.sendCommand(DeferredPublish(topic, delay, body))
}
// DeferredPublishAsync publishes a message body to the specified topic
// where the message will queue at the channel level until the timeout expires
// but does not wait for the response from `nsqd`.
//
// When the Producer eventually receives the response from `nsqd`,
// the supplied `doneChan` (if specified)
// will receive a `ProducerTransaction` instance with the supplied variadic arguments
// and the response error if present
func (w *Producer) DeferredPublishAsync(topic string, delay time.Duration, body []byte,
doneChan chan *ProducerTransaction, args ...interface{}) error {
return w.sendCommandAsync(DeferredPublish(topic, delay, body), doneChan, args)
}
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
func (w *Producer) connect() error {
w.guard.Lock()
defer w.guard.Unlock()
if atomic.LoadInt32(&w.stopFlag) == 1 {
return ErrStopped
}
switch state := atomic.LoadInt32(&w.state); state {
case StateInit:
case StateConnected:
return nil
default:
return ErrNotConnected
}
w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)
logger, logLvl := w.getLogger()
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
_, err := w.conn.Connect()
if err != nil {
w.conn.Close()
w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
return err
}
atomic.StoreInt32(&w.state, StateConnected)
w.closeChan = make(chan int)
w.wg.Add(1)
go w.router()
return nil
}
func (w *Producer) close() {
if !atomic.CompareAndSwapInt32(&w.state, StateConnected, StateDisconnected) {
return
}
w.conn.Close()
go func() {
// we need to handle this in a goroutine so we don't
// block the caller from making progress
w.wg.Wait()
atomic.StoreInt32(&w.state, StateInit)
}()
}
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
}
func (w *Producer) popTransaction(frameType int32, data []byte) {
t := w.transactions[0]
w.transactions = w.transactions[1:]
if frameType == FrameTypeError {
t.Error = ErrProtocol{string(data)}
}
t.finish()
}
func (w *Producer) transactionCleanup() {
// clean up transactions we can easily account for
for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
}
w.transactions = w.transactions[:0]
// spin and free up any writes that might have raced
// with the cleanup process (blocked on writing
// to transactionChan)
for {
select {
case t := <-w.transactionChan:
t.Error = ErrNotConnected
t.finish()
default:
// keep spinning until there are 0 concurrent producers
if atomic.LoadInt32(&w.concurrentProducers) == 0 {
return
}
// give the runtime a chance to schedule other racing goroutines
time.Sleep(5 * time.Millisecond)
}
}
}
func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl := w.getLogger()
if logger == nil {
return
}
if logLvl > lvl {
return
}
logger.Output(2, fmt.Sprintf("%-4s %3d %s", lvl, w.id, fmt.Sprintf(line, args...)))
}
func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
func (w *Producer) onConnError(c *Conn, data []byte) { w.errorChan <- data }
func (w *Producer) onConnHeartbeat(c *Conn) {}
func (w *Producer) onConnIOError(c *Conn, err error) { w.close() }
func (w *Producer) onConnClose(c *Conn) {
w.guard.Lock()
defer w.guard.Unlock()
close(w.closeChan)
}

View File

@ -0,0 +1,96 @@
package nsq
import (
"encoding/binary"
"errors"
"io"
"regexp"
)
// MagicV1 is the initial identifier sent when connecting for V1 clients
var MagicV1 = []byte(" V1")
// MagicV2 is the initial identifier sent when connecting for V2 clients
var MagicV2 = []byte(" V2")
// frame types
const (
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
)
var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
// IsValidTopicName checks a topic name for correctness
func IsValidTopicName(name string) bool {
return isValidName(name)
}
// IsValidChannelName checks a channel name for correctness
func IsValidChannelName(name string) bool {
return isValidName(name)
}
func isValidName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validTopicChannelNameRegex.MatchString(name)
}
// ReadResponse is a client-side utility function to read from the supplied Reader
// according to the NSQ protocol spec:
//
// [x][x][x][x][x][x][x][x]...
// | (int32) || (binary)
// | 4-byte || N-byte
// ------------------------...
// size data
func ReadResponse(r io.Reader) ([]byte, error) {
var msgSize int32
// message size
err := binary.Read(r, binary.BigEndian, &msgSize)
if err != nil {
return nil, err
}
// message binary data
buf := make([]byte, msgSize)
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
}
return buf, nil
}
// UnpackResponse is a client-side utility function that unpacks serialized data
// according to NSQ protocol spec:
//
// [x][x][x][x][x][x][x][x]...
// | (int32) || (binary)
// | 4-byte || N-byte
// ------------------------...
// frame ID data
//
// Returns a triplicate of: frame type, data ([]byte), error
func UnpackResponse(response []byte) (int32, []byte, error) {
if len(response) < 4 {
return -1, nil, errors.New("length of response is too small")
}
return int32(binary.BigEndian.Uint32(response)), response[4:], nil
}
// ReadUnpackedResponse reads and parses data from the underlying
// TCP connection according to the NSQ TCP protocol spec and
// returns the frameType, data or error
func ReadUnpackedResponse(r io.Reader) (int32, []byte, error) {
resp, err := ReadResponse(r)
if err != nil {
return -1, nil, err
}
return UnpackResponse(resp)
}

View File

@ -0,0 +1,12 @@
package nsq
// states
const (
StateInit = iota
StateDisconnected
StateConnected
StateSubscribed
// StateClosing means CLOSE has started...
// (responses are ok, but no new messages will be sent)
StateClosing
)

39
Godeps/_workspace/src/github.com/nsqio/go-nsq/test.sh generated vendored Normal file
View File

@ -0,0 +1,39 @@
#!/bin/bash
set -e
# a helper script to run tests
if ! which nsqd >/dev/null; then
echo "missing nsqd binary" && exit 1
fi
if ! which nsqlookupd >/dev/null; then
echo "missing nsqlookupd binary" && exit 1
fi
# run nsqlookupd
LOOKUP_LOGFILE=$(mktemp -t nsqlookupd.XXXXXXX)
echo "starting nsqlookupd"
echo " logging to $LOOKUP_LOGFILE"
nsqlookupd >$LOOKUP_LOGFILE 2>&1 &
LOOKUPD_PID=$!
# run nsqd configured to use our lookupd above
rm -f *.dat
NSQD_LOGFILE=$(mktemp -t nsqlookupd.XXXXXXX)
echo "starting nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/server.pem --tls-key=./test/server.key --tls-root-ca-file=./test/ca.pem"
echo " logging to $NSQD_LOGFILE"
nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=./test/server.pem --tls-key=./test/server.key --tls-root-ca-file=./test/ca.pem >$NSQD_LOGFILE 2>&1 &
NSQD_PID=$!
sleep 0.3
cleanup() {
echo "killing nsqd PID $NSQD_PID"
kill -s TERM $NSQD_PID || cat $NSQD_LOGFILE
echo "killing nsqlookupd PID $LOOKUPD_PID"
kill -s TERM $LOOKUPD_PID || cat $LOOKUP_LOGFILE
}
trap cleanup INT TERM EXIT
go test -v -timeout 60s

View File

@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIID9zCCAt+gAwIBAgIJAPYpAVNDj2lgMA0GCSqGSIb3DQEBBQUAMIGRMQswCQYD
VQQGEwJERTEMMAoGA1UECAwDTlJXMQ4wDAYDVQQHDAVFYXJ0aDEXMBUGA1UECgwO
UmFuZG9tIENvbXBhbnkxCzAJBgNVBAsMAklUMRcwFQYDVQQDDA53d3cucmFuZG9t
LmNvbTElMCMGCSqGSIb3DQEJARYWS3J5cHRvS2luZ3NAcmFuZG9tLmNvbTAeFw0x
NDA0MDIyMTE0NTJaFw0xNTA0MDIyMTE0NTJaMIGRMQswCQYDVQQGEwJERTEMMAoG
A1UECAwDTlJXMQ4wDAYDVQQHDAVFYXJ0aDEXMBUGA1UECgwOUmFuZG9tIENvbXBh
bnkxCzAJBgNVBAsMAklUMRcwFQYDVQQDDA53d3cucmFuZG9tLmNvbTElMCMGCSqG
SIb3DQEJARYWS3J5cHRvS2luZ3NAcmFuZG9tLmNvbTCCASIwDQYJKoZIhvcNAQEB
BQADggEPADCCAQoCggEBAL/sJU6ODQCsdWAmq3Qyp6vCqVFkSIHwR3oH8vPuwwob
IOrx/pXz2LIRekQ4egT8LCH3QDxhEvFhDNXYM4h/mkQ+GpgzynoIqYrw+yF93pik
T9Tpel2IuntThlZrO/4APRbVw4Ihf3zp47AY71J+8usJjmfWsId4dhqa1lTYecXK
Zwxii8RTH/7LsuwIDOW1QJLGGKNzvVYA42Gh8Cw3uHlmqZ2tA/sp5qg1Z3QU5g7y
EzzRybotHaRb5XMUWHAlGbIl/TW4KlFqFZ0kCXJXL1uO3uq2nIS3bG7ryjbobRVn
dZ6sV34eenIeZWu6zlDxQP/EqxAezz5Ndyt9uYWb/JECAwEAAaNQME4wHQYDVR0O
BBYEFI9l/QHE30clqx+1oCR6IhUYEdqLMB8GA1UdIwQYMBaAFI9l/QHE30clqx+1
oCR6IhUYEdqLMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEBAES6GKxL
oeCLZa83TjJXLagcc9mmdQZgfF3/o61+ye7D9BLqBwN5lx4+kIE1LAUI/Id0mCdW
9uXmPhpCJ926krahNc4Ol+wQTmZ3j7Mn3DCkFufjr64cGPU/UzH4yjMg9wEf84qz
5oH+dBifwJM8yoRCxbnMqGBu3xY8WCjPlw8E8lizXFk8wUbLZ/EC5Rjm+KmdT5ud
KTEgM+K6RMNo9vLn5ZasrYyhVcHdEKIzo6qLm1ZVIgpi/1WX0m8hACMfEcqee6ot
76LEyM3kwfqRkWGZWHEF9D4emp3quU+0AmjM57LHrYjidpDJkVTUHDoMBFHl9Uiq
0O9+azN48F/bVgU=
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDFHWaPfRA5nU/F
E8AVoFj2TAgMRISLduWlbAgDnMtFLSGVwgjxU13Txsv0LgwJgo4A5xpd2WNV0dIQ
brerxvPVJruKO8KxKFS2U58BCFIG0xGrlQSg5wDGyqxEQY80XlrBtxs81v79GYHy
fBhizg7onlmbNZzxPy7idU0a7TpgzakeDrfJHQ7rI3llvR0U0TdOLno82CtPvosY
6TYZAIFYgH05yN7DWKuDUI8Fa2RFVkbHPUlJVKROw/0n1yWy7XcwTmQQyaodFYgg
KMCdyR0ElPxLv8dKYFjLvef2DTmuYwbalt5hiQqOpY1wm616Xf4ywz2uEU+ooLW4
/Q6DcRUBAgMBAAECggEBAKDUgVc4YuGvkmOp3sOGhq/Hj5V63m7wvyV/BPb1mwFB
drK7lBJbxCXEgaslWxrzVCw2ZFQIyL1AKtbPj7tv5ImZRdHfOtbTonL1vbnY8Ryy
YSuPtiwW75JD0dULbO38wq5mWaoFfh5DDr0sNbWAjbeNZG14jCpnNDkAHr6Gq2hJ
VzwEwy+W7LXn8s8lYJHi3MsxCJqAFN1u2FOkjBtrcVW06VgV87IX59SOFns80x4E
Nn0ZKH7RU2DuJ7Fm4HtaNH+yaDYxUeo2A/2/UoavyYYCgC5gThsNjlp9/R4gtm0h
VO+8cN5a3s7zL+aITIusY7H/rwWc6XpRmxQn+jwqF8ECgYEA5PpAz4fhXA9abFZx
0XqCgcwTFY5gTX/JDd1qv0b/PueAR7dY5d37bRbTXpzrHiNFVrq3x432V3+KY0b5
55PEB1YxwBUga5DvTSa5fLfUibvLpdZjganzdTOsG53wMvNwUT8iUzUQDLkyRfIi
mV0r4Sa34RrBZdWJ2Aou9by2SlkCgYEA3GCHTP7nAcuHXRTsXH3eK/HsfwxdwjhA
G5SG7L7KSoMpzCbe90DuYEr6J/O1nnP0QiSQ2uEeTOARzMfio4E16exWlDDtvPBQ
HqSuQKg4M7fMTN1tj95xmk1yGZMyPxgEfCScBeCbYQzOyZ0j93iFjqMnb2mlriq5
MoSPat3BeukCgYEAjSGaFNABnUZxS1k0qhLCodHw6VZqERp0B7Gze9X8uP7jWFCv
4G6j66cn/KbnXBoNQNmxMLRVY7TezTYQDiZLquH7pBLheqtIc9ssdKyxuXsgmES9
7EueHV0N9a+xPxZA4jLxqyuHivATBn2pybFdvFaq+3oMPgISBjCwpRH9oXECgYAN
+n16j8ydW4iZieM4Nq+p/+1tXZ5w3FqMpU4tpCh2s30qOuj3rAGyz+6wLBBAHcDH
lUQu7gqa+7eFUsR4dJCz5s7TFYtu6ZtbZjy7UzBFb4og8gaqEoUIMZNkNecBA4f9
S+EtqkKQ1Fwlg7ctUlK+anDs6zmcI4+dubTTJX/JSQKBgQCsu/gCgoOi2GFgebIh
URvEMrhaiHxcw5u30nMNjWUGpDQK3lVTK51+7wj4xmVfiomvUW6M/HaR2+5xF1U1
QV08cKeWCGfGUFetTxjdhsVhMIk84ygF2l9K6jiHqvtd5rIoQ9Lf8XXbYaQVicRg
qmB2iOzmbQQM/GOSofAeUfE7/A==
-----END PRIVATE KEY-----

View File

@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIJAMsErP97ZQmgMA0GCSqGSIb3DQEBBQUAMIGNMQswCQYD
VQQGEwJERTEMMAoGA1UECAwDTlJXMQ4wDAYDVQQHDAVFYXJ0aDEXMBUGA1UECgwO
UmFuZG9tIENvbXBhbnkxCzAJBgNVBAsMAklUMRcwFQYDVQQDDA53d3cucmFuZG9t
LmNvbTEhMB8GCSqGSIb3DQEJARYSZm9vYmFyQGV4YW1wbGUuY29tMB4XDTE0MDQw
MjIxMTQ1MloXDTI0MDMzMDIxMTQ1MlowgY0xCzAJBgNVBAYTAkRFMQwwCgYDVQQI
DANOUlcxDjAMBgNVBAcMBUVhcnRoMRcwFQYDVQQKDA5SYW5kb20gQ29tcGFueTEL
MAkGA1UECwwCSVQxFzAVBgNVBAMMDnd3dy5yYW5kb20uY29tMSEwHwYJKoZIhvcN
AQkBFhJmb29iYXJAZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDFHWaPfRA5nU/FE8AVoFj2TAgMRISLduWlbAgDnMtFLSGVwgjxU13T
xsv0LgwJgo4A5xpd2WNV0dIQbrerxvPVJruKO8KxKFS2U58BCFIG0xGrlQSg5wDG
yqxEQY80XlrBtxs81v79GYHyfBhizg7onlmbNZzxPy7idU0a7TpgzakeDrfJHQ7r
I3llvR0U0TdOLno82CtPvosY6TYZAIFYgH05yN7DWKuDUI8Fa2RFVkbHPUlJVKRO
w/0n1yWy7XcwTmQQyaodFYggKMCdyR0ElPxLv8dKYFjLvef2DTmuYwbalt5hiQqO
pY1wm616Xf4ywz2uEU+ooLW4/Q6DcRUBAgMBAAGjUDBOMB0GA1UdDgQWBBTxyT32
Exu5TuortZY8zkVotLDNDTAfBgNVHSMEGDAWgBTxyT32Exu5TuortZY8zkVotLDN
DTAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4IBAQAu+0B+caaV4HzIHyfX
Zc6BUPcRoTEZIWX/7JLeeOVUztOjl9ExjYTzjo0QEt+PVcOzfQL/hxE2SPG6fRF7
YRZU1h9t5Ti9rTg9myAbGGMo6MdWZULFcxIWjxhv6qnFPk/fF47PvGwjygFNnzv8
FYmrAI99kK0CYolvXZ5ue250dpE/TCIAyk09a3WeBbHU/hMR/mBUNsitphelDbNK
oohrY9D7QR5Mf/NZgx3a0eDH6zoMYDRPARY3M02EuHHiRKmlyfnPv4ns4/0wCarj
pKpds+G80+k2fyiMgQ5bPTw8sfNgq1z0IvIuWB36XSNenTgnnjArbWii+x95jjNw
XcQg
-----END CERTIFICATE-----

View File

@ -0,0 +1,8 @@
// Package nsq is the official Go package for NSQ (http://nsq.io/)
//
// It provides high-level Consumer and Producer types as well as low-level
// functions to communicate over the NSQ protocol
package nsq
// VERSION
const VERSION = "1.0.5"

View File

@ -7,5 +7,6 @@ import (
_ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/librato"
_ "github.com/influxdb/telegraf/outputs/mqtt"
_ "github.com/influxdb/telegraf/outputs/nsq"
_ "github.com/influxdb/telegraf/outputs/opentsdb"
)

4
outputs/nsq/README.md Normal file
View File

@ -0,0 +1,4 @@
# NSQ Output Plugin
This plugin writes to a specified NSQD instance, usually local to the producer. It requires
a `server` name and a `topic` name.

92
outputs/nsq/nsq.go Normal file
View File

@ -0,0 +1,92 @@
package nsq
import (
"fmt"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/nsqio/go-nsq"
"time"
)
type NSQ struct {
Server string
Topic string
producer *nsq.Producer
}
var sampleConfig = `
# Location of nsqd instance listening on TCP
server = "localhost:4150"
# NSQ topic for producer messages
topic = "telegraf"
`
func (n *NSQ) Connect() error {
config := nsq.NewConfig()
producer, err := nsq.NewProducer(n.Server, config)
if err != nil {
return err
}
n.producer = producer
return nil
}
func (n *NSQ) Close() error {
n.producer.Stop()
return nil
}
func (n *NSQ) SampleConfig() string {
return sampleConfig
}
func (n *NSQ) Description() string {
return "Send telegraf measurements to NSQD"
}
func (n *NSQ) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
var zeroTime time.Time
for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to NSQ
var value string
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if p.Time == zeroTime {
if bp.Time == zeroTime {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
value = p.MarshalString()
}
err := n.producer.Publish(n.Topic, []byte(value))
if err != nil {
return fmt.Errorf("FAILED to send NSQD message: %s", err)
}
}
return nil
}
func init() {
outputs.Add("nsq", func() outputs.Output {
return &NSQ{}
})
}

28
outputs/nsq/nsq_test.go Normal file
View File

@ -0,0 +1,28 @@
package nsq
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
server := []string{testutil.GetLocalHost() + ":4150"}
n := &NSQ{
Server: server,
Topic: "telegraf",
}
// Verify that we can connect to the NSQ daemon
err := n.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the NSQ daemon
err = n.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}

View File

@ -47,3 +47,9 @@ aerospike:
image: aerospike/aerospike-server
ports:
- "3000:3000"
nsq:
image: nsqio/nsq
ports:
- "4150:4150"
- "4151:4151"