1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package pubsublite
15
16import (
17	"context"
18	"time"
19
20	vkit "cloud.google.com/go/pubsublite/apiv1"
21	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
22	tspb "google.golang.org/protobuf/types/known/timestamppb"
23)
24
25// SeekTarget is the target location to seek a subscription to. Implemented by
26// BacklogLocation, PublishTime, EventTime.
27type SeekTarget interface {
28	setRequest(req *pb.SeekSubscriptionRequest)
29}
30
// BacklogLocation names a position relative to the message backlog. It is one
// of the SeekTarget implementations.
type BacklogLocation int

const (
	// The zero value is skipped, so it matches neither End nor Beginning.
	_ BacklogLocation = iota

	// End is the position after every message published so far. Seeking to
	// End skips the whole message backlog.
	End

	// Beginning is the position of the oldest message that is still retained.
	Beginning
)
43
44func (b BacklogLocation) setRequest(req *pb.SeekSubscriptionRequest) {
45	target := pb.SeekSubscriptionRequest_TAIL
46	if b == End {
47		target = pb.SeekSubscriptionRequest_HEAD
48	}
49	req.Target = &pb.SeekSubscriptionRequest_NamedTarget_{
50		NamedTarget: target,
51	}
52}
53
// PublishTime is the publish timestamp of a message. It is one of the
// SeekTarget implementations.
type PublishTime time.Time
57
58func (p PublishTime) setRequest(req *pb.SeekSubscriptionRequest) {
59	req.Target = &pb.SeekSubscriptionRequest_TimeTarget{
60		TimeTarget: &pb.TimeTarget{
61			Time: &pb.TimeTarget_PublishTime{tspb.New(time.Time(p))},
62		},
63	}
64}
65
// EventTime is the event timestamp of a message. It is one of the SeekTarget
// implementations.
type EventTime time.Time
69
70func (e EventTime) setRequest(req *pb.SeekSubscriptionRequest) {
71	req.Target = &pb.SeekSubscriptionRequest_TimeTarget{
72		TimeTarget: &pb.TimeTarget{
73			Time: &pb.TimeTarget_EventTime{tspb.New(time.Time(e))},
74		},
75	}
76}
77
// SeekSubscriptionOption is a placeholder for options that may be added in the
// future. It is an empty interface, so it does not constrain values today.
type SeekSubscriptionOption interface{}
80
// SeekSubscriptionResult is what a completed seek subscription operation
// yields. It has no fields at present.
type SeekSubscriptionResult struct{}
84
// OperationMetadata holds the metadata reported for a long-running operation.
type OperationMetadata struct {
	// Target is the resource the operation acts on. A seek, for example,
	// targets a subscription whose path looks like:
	// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID"
	Target string

	// Verb describes what kind of operation this is.
	Verb string

	// CreateTime is when the operation was created.
	CreateTime time.Time

	// EndTime is when the operation finished running. It is the zero time
	// while the operation has not completed.
	EndTime time.Time
}
102
103func protoToOperationMetadata(o *pb.OperationMetadata) (*OperationMetadata, error) {
104	if err := o.GetCreateTime().CheckValid(); err != nil {
105		return nil, err
106	}
107	metadata := &OperationMetadata{
108		Target:     o.Target,
109		Verb:       o.Verb,
110		CreateTime: o.GetCreateTime().AsTime(),
111	}
112	if o.GetEndTime() != nil {
113		if err := o.GetEndTime().CheckValid(); err != nil {
114			return nil, err
115		}
116		metadata.EndTime = o.GetEndTime().AsTime()
117	}
118	return metadata, nil
119}
120
121// SeekSubscriptionOperation manages a long-running seek operation from
122// AdminClient.SeekSubscription.
123type SeekSubscriptionOperation struct {
124	op *vkit.SeekSubscriptionOperation
125}
126
127// Name returns the path of the seek operation, in the format:
128// "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID".
129func (s *SeekSubscriptionOperation) Name() string {
130	return s.op.Name()
131}
132
133// Done returns whether the seek operation has completed.
134func (s *SeekSubscriptionOperation) Done() bool {
135	return s.op.Done()
136}
137
138// Metadata returns metadata associated with the seek operation. To get the
139// latest metadata, call this method after a successful call to Wait.
140func (s *SeekSubscriptionOperation) Metadata() (*OperationMetadata, error) {
141	m, err := s.op.Metadata()
142	if err != nil {
143		return nil, err
144	}
145	return protoToOperationMetadata(m)
146}
147
148// Wait polls until the seek operation is complete and returns one of the
149// following:
150//  - A SeekSubscriptionResult and nil error if the operation is complete and
151//    succeeded.
152//  - Error containing failure reason if the operation is complete and failed.
153//  - Error if polling the operation status failed due to a non-retryable error.
154func (s *SeekSubscriptionOperation) Wait(ctx context.Context) (*SeekSubscriptionResult, error) {
155	if _, err := s.op.Wait(ctx); err != nil {
156		return nil, err
157	}
158	return &SeekSubscriptionResult{}, nil
159}
160