1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "context" 18 "errors" 19 "fmt" 20 "reflect" 21 22 "github.com/google/uuid" 23 "google.golang.org/grpc" 24 25 vkit "cloud.google.com/go/pubsublite/apiv1" 26 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 27) 28 29// partitionSet is a set of partition numbers. 30type partitionSet map[int]struct{} 31 32func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet { 33 var void struct{} 34 partitions := make(map[int]struct{}) 35 for _, p := range assignmentpb.GetPartitions() { 36 partitions[int(p)] = void 37 } 38 return partitionSet(partitions) 39} 40 41func (ps partitionSet) Ints() (partitions []int) { 42 for p := range ps { 43 partitions = append(partitions, p) 44 } 45 return 46} 47 48func (ps partitionSet) Contains(partition int) bool { 49 _, exists := ps[partition] 50 return exists 51} 52 53// A function that generates a 16-byte UUID. 54type generateUUIDFunc func() (uuid.UUID, error) 55 56// partitionAssignmentReceiver must enact the received partition assignment from 57// the server, or otherwise return an error, which will break the stream. The 58// receiver must not call the assigner, as this would result in a deadlock. 59type partitionAssignmentReceiver func(partitionSet) error 60 61// assigner wraps the partition assignment stream and notifies a receiver when 62// the server sends a new set of partition assignments for a subscriber. 63type assigner struct { 64 // Immutable after creation. 65 assignmentClient *vkit.PartitionAssignmentClient 66 subscription string 67 initialReq *pb.PartitionAssignmentRequest 68 receiveAssignment partitionAssignmentReceiver 69 metadata pubsubMetadata 70 71 // Fields below must be guarded with mu. 72 stream *retryableStream 73 74 abstractService 75} 76 77func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) { 78 clientID, err := genUUID() 79 if err != nil { 80 return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err) 81 } 82 83 a := &assigner{ 84 assignmentClient: assignmentClient, 85 subscription: subscriptionPath, 86 initialReq: &pb.PartitionAssignmentRequest{ 87 Request: &pb.PartitionAssignmentRequest_Initial{ 88 Initial: &pb.InitialPartitionAssignmentRequest{ 89 Subscription: subscriptionPath, 90 ClientId: clientID[:], 91 }, 92 }, 93 }, 94 receiveAssignment: receiver, 95 metadata: newPubsubMetadata(), 96 } 97 a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) 98 a.metadata.AddClientInfo(settings.Framework) 99 return a, nil 100} 101 102func (a *assigner) Start() { 103 a.mu.Lock() 104 defer a.mu.Unlock() 105 if a.unsafeUpdateStatus(serviceStarting, nil) { 106 a.stream.Start() 107 } 108} 109 110func (a *assigner) Stop() { 111 a.mu.Lock() 112 defer a.mu.Unlock() 113 a.unsafeInitiateShutdown(serviceTerminating, nil) 114} 115 116func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { 117 return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx)) 118} 119 120func (a *assigner) initialRequest() (interface{}, initialResponseRequired) { 121 return a.initialReq, initialResponseRequired(false) 122} 123 124func (a *assigner) validateInitialResponse(_ interface{}) error { 125 // Should not be called as initialResponseRequired=false above. 126 return errors.New("pubsublite: unexpected initial response") 127} 128 129func (a *assigner) onStreamStatusChange(status streamStatus) { 130 a.mu.Lock() 131 defer a.mu.Unlock() 132 133 switch status { 134 case streamConnected: 135 a.unsafeUpdateStatus(serviceActive, nil) 136 case streamTerminated: 137 a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error()) 138 } 139} 140 141func (a *assigner) onResponse(response interface{}) { 142 a.mu.Lock() 143 defer a.mu.Unlock() 144 145 if a.status >= serviceTerminating { 146 return 147 } 148 149 assignment, _ := response.(*pb.PartitionAssignment) 150 if err := a.handleAssignment(assignment); err != nil { 151 a.unsafeInitiateShutdown(serviceTerminated, err) 152 } 153} 154 155func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error { 156 if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil { 157 return err 158 } 159 160 a.stream.Send(&pb.PartitionAssignmentRequest{ 161 Request: &pb.PartitionAssignmentRequest_Ack{ 162 Ack: &pb.PartitionAssignmentAck{}, 163 }, 164 }) 165 return nil 166} 167 168func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { 169 if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) { 170 return 171 } 172 // No data to send. Immediately terminate the stream. 173 a.stream.Stop() 174} 175