1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"context"
18	"errors"
19	"fmt"
20	"reflect"
21
22	"github.com/google/uuid"
23	"google.golang.org/grpc"
24
25	vkit "cloud.google.com/go/pubsublite/apiv1"
26	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
27)
28
// partitionSet is a set of partition numbers. Members are stored as map keys
// with zero-width struct{} values; a nil or empty map is the empty set.
type partitionSet map[int]struct{}
31
32func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet {
33	var void struct{}
34	partitions := make(map[int]struct{})
35	for _, p := range assignmentpb.GetPartitions() {
36		partitions[int(p)] = void
37	}
38	return partitionSet(partitions)
39}
40
41func (ps partitionSet) Ints() (partitions []int) {
42	for p := range ps {
43		partitions = append(partitions, p)
44	}
45	return
46}
47
48func (ps partitionSet) Contains(partition int) bool {
49	_, exists := ps[partition]
50	return exists
51}
52
// generateUUIDFunc is a function that generates a 16-byte UUID, or returns an
// error if one could not be generated. newAssigner calls it once to obtain the
// client ID placed in the initial partition assignment request.
type generateUUIDFunc func() (uuid.UUID, error)
55
// partitionAssignmentReceiver must enact the received partition assignment from
// the server, or otherwise return an error, which will break the stream. The
// receiver must not call the assigner, as this would result in a deadlock: it
// is invoked from onResponse while the assigner's mutex is held.
type partitionAssignmentReceiver func(partitionSet) error
60
// assigner wraps the partition assignment stream and notifies a receiver when
// the server sends a new set of partition assignments for a subscriber.
//
// Each assignment delivered to onResponse is handed to receiveAssignment and
// then acknowledged on the stream; a receiver error shuts the assigner down.
type assigner struct {
	// Immutable after creation.
	//   - assignmentClient: opens the assignment stream (see newStream).
	//   - subscription: subscription path; sent in the initial request and
	//     passed to wrapError when shutting down.
	//   - initialReq: request returned by initialRequest.
	//   - receiveAssignment: callback invoked with each received assignment.
	//   - metadata: added to the context used to open the stream.
	assignmentClient  *vkit.PartitionAssignmentClient
	subscription      string
	initialReq        *pb.PartitionAssignmentRequest
	receiveAssignment partitionAssignmentReceiver
	metadata          pubsubMetadata

	// Fields below must be guarded with mu.
	stream *retryableStream

	// abstractService is embedded; presumably it supplies the mu, status and
	// unsafeUpdateStatus members used by the methods of assigner, since
	// assigner does not declare them itself.
	abstractService
}
76
77func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) {
78	clientID, err := genUUID()
79	if err != nil {
80		return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err)
81	}
82
83	a := &assigner{
84		assignmentClient: assignmentClient,
85		subscription:     subscriptionPath,
86		initialReq: &pb.PartitionAssignmentRequest{
87			Request: &pb.PartitionAssignmentRequest_Initial{
88				Initial: &pb.InitialPartitionAssignmentRequest{
89					Subscription: subscriptionPath,
90					ClientId:     clientID[:],
91				},
92			},
93		},
94		receiveAssignment: receiver,
95		metadata:          newPubsubMetadata(),
96	}
97	a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
98	a.metadata.AddClientInfo(settings.Framework)
99	return a, nil
100}
101
102func (a *assigner) Start() {
103	a.mu.Lock()
104	defer a.mu.Unlock()
105	if a.unsafeUpdateStatus(serviceStarting, nil) {
106		a.stream.Start()
107	}
108}
109
// Stop initiates shutdown of the assigner by requesting the serviceTerminating
// status with no error. The assigner's mutex is held for the duration of the
// call.
func (a *assigner) Stop() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.unsafeInitiateShutdown(serviceTerminating, nil)
}
115
116func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
117	return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx))
118}
119
120func (a *assigner) initialRequest() (interface{}, initialResponseRequired) {
121	return a.initialReq, initialResponseRequired(false)
122}
123
124func (a *assigner) validateInitialResponse(_ interface{}) error {
125	// Should not be called as initialResponseRequired=false above.
126	return errors.New("pubsublite: unexpected initial response")
127}
128
129func (a *assigner) onStreamStatusChange(status streamStatus) {
130	a.mu.Lock()
131	defer a.mu.Unlock()
132
133	switch status {
134	case streamConnected:
135		a.unsafeUpdateStatus(serviceActive, nil)
136	case streamTerminated:
137		a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error())
138	}
139}
140
141func (a *assigner) onResponse(response interface{}) {
142	a.mu.Lock()
143	defer a.mu.Unlock()
144
145	if a.status >= serviceTerminating {
146		return
147	}
148
149	assignment, _ := response.(*pb.PartitionAssignment)
150	if err := a.handleAssignment(assignment); err != nil {
151		a.unsafeInitiateShutdown(serviceTerminated, err)
152	}
153}
154
155func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
156	if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
157		return err
158	}
159
160	a.stream.Send(&pb.PartitionAssignmentRequest{
161		Request: &pb.PartitionAssignmentRequest_Ack{
162			Ack: &pb.PartitionAssignmentAck{},
163		},
164	})
165	return nil
166}
167
168func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
169	if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) {
170		return
171	}
172	// No data to send. Immediately terminate the stream.
173	a.stream.Stop()
174}
175