// Copyright 2014 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub // import "cloud.google.com/go/pubsub" import ( "fmt" "os" "runtime" "time" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "golang.org/x/net/context" ) const ( // ScopePubSub grants permissions to view and manage Pub/Sub // topics and subscriptions. ScopePubSub = "https://www.googleapis.com/auth/pubsub" // ScopeCloudPlatform grants permissions to view and manage your data // across Google Cloud Platform services. ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" ) const prodAddr = "https://pubsub.googleapis.com/" // Client is a Google Pub/Sub client scoped to a single project. // // Clients should be reused rather than being created as needed. // A Client may be shared by multiple goroutines. type Client struct { projectID string s service } // NewClient creates a new PubSub client. func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) { var o []option.ClientOption // Environment variables for gcloud emulator: // https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/ if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" { conn, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { return nil, fmt.Errorf("grpc.Dial: %v", err) } o = []option.ClientOption{option.WithGRPCConn(conn)} } else { o = []option.ClientOption{ // Create multiple connections to increase throughput. option.WithGRPCConnectionPool(runtime.GOMAXPROCS(0)), // TODO(grpc/grpc-go#1388) using connection pool without WithBlock // can cause RPCs to fail randomly. We can delete this after the issue is fixed. option.WithGRPCDialOption(grpc.WithBlock()), option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 5 * time.Minute, })), } } o = append(o, opts...) s, err := newPubSubService(ctx, o) if err != nil { return nil, fmt.Errorf("constructing pubsub client: %v", err) } c := &Client{ projectID: projectID, s: s, } return c, nil } // Close closes any resources held by the client. // // Close need not be called at program exit. func (c *Client) Close() error { return c.s.close() } func (c *Client) fullyQualifiedProjectName() string { return fmt.Sprintf("projects/%s", c.projectID) } // pageToken stores the next page token for a server response which is split over multiple pages. type pageToken struct { tok string explicit bool } func (pt *pageToken) set(tok string) { pt.tok = tok pt.explicit = true } func (pt *pageToken) get() string { return pt.tok } // more returns whether further pages should be fetched from the server. func (pt *pageToken) more() bool { return pt.tok != "" || !pt.explicit } // stringsIterator provides an iterator API for a sequence of API page fetches that return lists of strings. type stringsIterator struct { ctx context.Context strings []string token pageToken fetch func(ctx context.Context, tok string) (*stringsPage, error) } // Next returns the next string. If there are no more strings, iterator.Done will be returned. func (si *stringsIterator) Next() (string, error) { for len(si.strings) == 0 && si.token.more() { page, err := si.fetch(si.ctx, si.token.get()) if err != nil { return "", err } si.token.set(page.tok) si.strings = page.strings } if len(si.strings) == 0 { return "", iterator.Done } s := si.strings[0] si.strings = si.strings[1:] return s, nil }