// Copyright 2016 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub import ( "sync" "time" "golang.org/x/net/context" ) // ackBuffer stores the pending ack IDs and notifies the Dirty channel when it becomes non-empty. type ackBuffer struct { Dirty chan struct{} // Close done when ackBuffer is no longer needed. Done chan struct{} mu sync.Mutex pending []string send bool } // Add adds ackID to the buffer. func (buf *ackBuffer) Add(ackID string) { buf.mu.Lock() defer buf.mu.Unlock() buf.pending = append(buf.pending, ackID) // If we are transitioning into a non-empty notification state. if buf.send && len(buf.pending) == 1 { buf.notify() } } // RemoveAll removes all ackIDs from the buffer and returns them. func (buf *ackBuffer) RemoveAll() []string { buf.mu.Lock() defer buf.mu.Unlock() ret := buf.pending buf.pending = nil return ret } // SendNotifications enables sending dirty notification on empty -> non-empty transitions. // If the buffer is already non-empty, a notification will be sent immediately. func (buf *ackBuffer) SendNotifications() { buf.mu.Lock() defer buf.mu.Unlock() buf.send = true // If we are transitioning into a non-empty notification state. if len(buf.pending) > 0 { buf.notify() } } func (buf *ackBuffer) notify() { go func() { select { case buf.Dirty <- struct{}{}: case <-buf.Done: } }() } // acker acks messages in batches. type acker struct { s service Ctx context.Context // The context to use when acknowledging messages. Sub string // The full name of the subscription. AckTick <-chan time.Time // AckTick supplies the frequency with which to make ack requests. // Notify is called with an ack ID after the message with that ack ID // has been processed. An ackID is considered to have been processed // if at least one attempt has been made to acknowledge it. Notify func(string) ackBuffer wg sync.WaitGroup done chan struct{} } // Start intiates processing of ackIDs which are added via Add. // Notify is called with each ackID once it has been processed. func (a *acker) Start() { a.done = make(chan struct{}) a.ackBuffer.Dirty = make(chan struct{}) a.ackBuffer.Done = a.done a.wg.Add(1) go func() { defer a.wg.Done() for { select { case <-a.ackBuffer.Dirty: a.ack(a.ackBuffer.RemoveAll()) case <-a.AckTick: a.ack(a.ackBuffer.RemoveAll()) case <-a.done: return } } }() } // Ack adds an ack id to be acked in the next batch. func (a *acker) Ack(ackID string) { a.ackBuffer.Add(ackID) } // FastMode switches acker into a mode which acks messages as they arrive, rather than waiting // for a.AckTick. func (a *acker) FastMode() { a.ackBuffer.SendNotifications() } // Stop drops all pending messages, and releases resources before returning. func (a *acker) Stop() { close(a.done) a.wg.Wait() } const maxAckAttempts = 2 // ack acknowledges the supplied ackIDs. // After the acknowledgement request has completed (regardless of its success // or failure), ids will be passed to a.Notify. func (a *acker) ack(ids []string) { head, tail := a.s.splitAckIDs(ids) for len(head) > 0 { for i := 0; i < maxAckAttempts; i++ { if a.s.acknowledge(a.Ctx, a.Sub, head) == nil { break } } // NOTE: if retry gives up and returns an error, we simply drop // those ack IDs. The messages will be redelivered and this is // a documented behaviour of the API. head, tail = a.s.splitAckIDs(tail) } for _, id := range ids { a.Notify(id) } }