1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package pubsublite 15 16import ( 17 "context" 18 "time" 19 20 vkit "cloud.google.com/go/pubsublite/apiv1" 21 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 22 tspb "google.golang.org/protobuf/types/known/timestamppb" 23) 24 25// SeekTarget is the target location to seek a subscription to. Implemented by 26// BacklogLocation, PublishTime, EventTime. 27type SeekTarget interface { 28 setRequest(req *pb.SeekSubscriptionRequest) 29} 30 31// BacklogLocation refers to a location with respect to the message backlog. 32// It implements the SeekTarget interface. 33type BacklogLocation int 34 35const ( 36 // End refers to the location past all currently published messages. End 37 // skips the entire message backlog. 38 End BacklogLocation = iota + 1 39 40 // Beginning refers to the location of the oldest retained message. 41 Beginning 42) 43 44func (b BacklogLocation) setRequest(req *pb.SeekSubscriptionRequest) { 45 target := pb.SeekSubscriptionRequest_TAIL 46 if b == End { 47 target = pb.SeekSubscriptionRequest_HEAD 48 } 49 req.Target = &pb.SeekSubscriptionRequest_NamedTarget_{ 50 NamedTarget: target, 51 } 52} 53 54// PublishTime is a message publish timestamp. It implements the SeekTarget 55// interface. 56type PublishTime time.Time 57 58func (p PublishTime) setRequest(req *pb.SeekSubscriptionRequest) { 59 req.Target = &pb.SeekSubscriptionRequest_TimeTarget{ 60 TimeTarget: &pb.TimeTarget{ 61 Time: &pb.TimeTarget_PublishTime{tspb.New(time.Time(p))}, 62 }, 63 } 64} 65 66// EventTime is a message event timestamp. It implements the SeekTarget 67// interface. 68type EventTime time.Time 69 70func (e EventTime) setRequest(req *pb.SeekSubscriptionRequest) { 71 req.Target = &pb.SeekSubscriptionRequest_TimeTarget{ 72 TimeTarget: &pb.TimeTarget{ 73 Time: &pb.TimeTarget_EventTime{tspb.New(time.Time(e))}, 74 }, 75 } 76} 77 78// SeekSubscriptionOption is reserved for future options. 79type SeekSubscriptionOption interface{} 80 81// SeekSubscriptionResult is the result of a seek subscription operation. 82// Currently empty. 83type SeekSubscriptionResult struct{} 84 85// OperationMetadata stores metadata for long-running operations. 86type OperationMetadata struct { 87 // The target of the operation. For example, targets of seeks are 88 // subscriptions, structured like: 89 // "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID" 90 Target string 91 92 // The verb describing the kind of operation. 93 Verb string 94 95 // The time the operation was created. 96 CreateTime time.Time 97 98 // The time the operation finished running. Is zero if the operation has not 99 // completed. 100 EndTime time.Time 101} 102 103func protoToOperationMetadata(o *pb.OperationMetadata) (*OperationMetadata, error) { 104 if err := o.GetCreateTime().CheckValid(); err != nil { 105 return nil, err 106 } 107 metadata := &OperationMetadata{ 108 Target: o.Target, 109 Verb: o.Verb, 110 CreateTime: o.GetCreateTime().AsTime(), 111 } 112 if o.GetEndTime() != nil { 113 if err := o.GetEndTime().CheckValid(); err != nil { 114 return nil, err 115 } 116 metadata.EndTime = o.GetEndTime().AsTime() 117 } 118 return metadata, nil 119} 120 121// SeekSubscriptionOperation manages a long-running seek operation from 122// AdminClient.SeekSubscription. 123type SeekSubscriptionOperation struct { 124 op *vkit.SeekSubscriptionOperation 125} 126 127// Name returns the path of the seek operation, in the format: 128// "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID". 129func (s *SeekSubscriptionOperation) Name() string { 130 return s.op.Name() 131} 132 133// Done returns whether the seek operation has completed. 134func (s *SeekSubscriptionOperation) Done() bool { 135 return s.op.Done() 136} 137 138// Metadata returns metadata associated with the seek operation. To get the 139// latest metadata, call this method after a successful call to Wait. 140func (s *SeekSubscriptionOperation) Metadata() (*OperationMetadata, error) { 141 m, err := s.op.Metadata() 142 if err != nil { 143 return nil, err 144 } 145 return protoToOperationMetadata(m) 146} 147 148// Wait polls until the seek operation is complete and returns one of the 149// following: 150// - A SeekSubscriptionResult and nil error if the operation is complete and 151// succeeded. 152// - Error containing failure reason if the operation is complete and failed. 153// - Error if polling the operation status failed due to a non-retryable error. 154func (s *SeekSubscriptionOperation) Wait(ctx context.Context) (*SeekSubscriptionResult, error) { 155 if _, err := s.op.Wait(ctx); err != nil { 156 return nil, err 157 } 158 return &SeekSubscriptionResult{}, nil 159} 160