// Copyright 2017 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub import ( "golang.org/x/net/context" "golang.org/x/sync/semaphore" ) // flowController implements flow control for Subscription.Receive. type flowController struct { maxSize int // max total size of messages semCount, semSize *semaphore.Weighted // enforces max number and size of messages } // newFlowController creates a new flowController that ensures no more than // maxCount messages or maxSize bytes are outstanding at once. If maxCount or // maxSize is < 1, then an unlimited number of messages or bytes is permitted, // respectively. func newFlowController(maxCount, maxSize int) *flowController { fc := &flowController{ maxSize: maxSize, semCount: nil, semSize: nil, } if maxCount > 0 { fc.semCount = semaphore.NewWeighted(int64(maxCount)) } if maxSize > 0 { fc.semSize = semaphore.NewWeighted(int64(maxSize)) } return fc } // acquire blocks until one message of size bytes can proceed or ctx is done. // It returns nil in the first case, or ctx.Err() in the second. // // acquire allows large messages to proceed by treating a size greater than maxSize // as if it were equal to maxSize. func (f *flowController) acquire(ctx context.Context, size int) error { if f.semCount != nil { if err := f.semCount.Acquire(ctx, 1); err != nil { return err } } if f.semSize != nil { if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil { if f.semCount != nil { f.semCount.Release(1) } return err } } return nil } // tryAcquire returns false if acquire would block. Otherwise, it behaves like // acquire and returns true. // // tryAcquire allows large messages to proceed by treating a size greater than // maxSize as if it were equal to maxSize. func (f *flowController) tryAcquire(size int) bool { if f.semCount != nil { if !f.semCount.TryAcquire(1) { return false } } if f.semSize != nil { if !f.semSize.TryAcquire(f.bound(size)) { if f.semCount != nil { f.semCount.Release(1) } return false } } return true } // release notes that one message of size bytes is no longer outstanding. func (f *flowController) release(size int) { if f.semCount != nil { f.semCount.Release(1) } if f.semSize != nil { f.semSize.Release(f.bound(size)) } } func (f *flowController) bound(size int) int64 { if size > f.maxSize { return int64(f.maxSize) } return int64(size) }