diff --git a/nsqd/http.go b/nsqd/http.go index c67424187..e2a918b29 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -248,6 +248,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred + msg.deadline = time.Now().UnixNano() + int64(deferred) err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} diff --git a/nsqd/message.go b/nsqd/message.go index 460cc3722..eda2fcb53 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -1,6 +1,7 @@ package nsqd import ( + "bytes" "encoding/binary" "fmt" "io" @@ -12,6 +13,8 @@ const ( minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts ) +var deferMsgMagicFlag = []byte("#DEFER_MSG#") + type MessageID [MsgIDLength]byte type Message struct { @@ -26,6 +29,7 @@ type Message struct { pri int64 index int deferred time.Duration + deadline int64 } func NewMessage(id MessageID, body []byte) *Message { @@ -61,16 +65,33 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { return total, err } + if m.deadline > time.Now().UnixNano() { + n, err = w.Write(deferMsgMagicFlag) + total += int64(n) + if err != nil { + return total, err + } + + var deferBuf [8]byte + binary.BigEndian.PutUint64(deferBuf[:8], uint64(m.deadline)) + + n, err := w.Write(deferBuf[:]) + total += int64(n) + if err != nil { + return total, err + } + } + return total, nil } // decodeMessage deserializes data (as []byte) and creates a new Message // -// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... -// | (int64) || || (hex string encoded in ASCII) || (binary) -// | 8-byte || || 16-byte || N-byte -// ------------------------------------------------------------------------------------------... -// nanosecond timestamp ^^ message ID message body +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... [x][x][x][x][x][x][x][x] +// | (int64) || || (hex string encoded in ASCII) || (binary) || (int64) +// | 8-byte || || 16-byte || N-byte || 8-byte +// ------------------------------------------------------------------------------------------... ------------------------ +// nanosecond timestamp ^^ message ID message body nanosecond deadline // (uint16) // 2-byte // attempts @@ -84,7 +105,16 @@ func decodeMessage(b []byte) (*Message, error) { msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) copy(msg.ID[:], b[10:10+MsgIDLength]) - msg.Body = b[10+MsgIDLength:] + + if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { + msg.deadline = int64(binary.BigEndian.Uint64(b[len(b)-8:])) + if deferred := msg.deadline - time.Now().UnixNano(); deferred > 0 { + msg.deferred = time.Duration(deferred) + } + msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)] + } else { + msg.Body = b[10+MsgIDLength:] + } return &msg, nil } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8ec422430..384bfdf46 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -310,6 +310,11 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + if msg.deferred != 0 { + subChannel.StartDeferredTimeout(msg, msg.deferred) + continue + } + msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) @@ -919,6 +924,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration + msg.deadline = time.Now().UnixNano() + int64(timeoutDuration) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) diff --git a/nsqd/topic.go b/nsqd/topic.go index 4834c0102..ec26bc2c0 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -315,6 +315,7 @@ func (t *Topic) messagePump() { goto exit } + deferred := time.Duration(msg.deadline - time.Now().UnixNano()) for i, channel := range chans { chanMsg := msg // copy the message because each channel @@ -324,9 +325,10 @@ func (t *Topic) messagePump() { if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp - chanMsg.deferred = msg.deferred + chanMsg.deadline = msg.deadline } - if chanMsg.deferred != 0 { + if deferred > 0 { + chanMsg.deferred = deferred channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue }