1// Copyright 2016 Google Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "sync" 19 "time" 20 21 "golang.org/x/net/context" 22) 23 24// keepAlive keeps track of which Messages need to have their deadline extended, and 25// periodically extends them. 26// Messages are tracked by Ack ID. 27type keepAlive struct { 28 s service 29 Ctx context.Context // The context to use when extending deadlines. 30 Sub string // The full name of the subscription. 31 ExtensionTick <-chan time.Time // ExtenstionTick supplies the frequency with which to make extension requests. 32 Deadline time.Duration // How long to extend messages for each time they are extended. Should be greater than ExtensionTick frequency. 33 MaxExtension time.Duration // How long to keep extending each message's ack deadline before automatically removing it. 34 35 mu sync.Mutex 36 // key: ackID; value: time at which ack deadline extension should cease. 37 items map[string]time.Time 38 dr drain 39 40 wg sync.WaitGroup 41} 42 43// Start initiates the deadline extension loop. Stop must be called once keepAlive is no longer needed. 44func (ka *keepAlive) Start() { 45 ka.items = make(map[string]time.Time) 46 ka.dr = drain{Drained: make(chan struct{})} 47 ka.wg.Add(1) 48 go func() { 49 defer ka.wg.Done() 50 for { 51 select { 52 case <-ka.Ctx.Done(): 53 // Don't bother waiting for items to be removed: we can't extend them any more. 54 return 55 case <-ka.dr.Drained: 56 return 57 case <-ka.ExtensionTick: 58 live, expired := ka.getAckIDs() 59 ka.wg.Add(1) 60 go func() { 61 defer ka.wg.Done() 62 ka.extendDeadlines(live) 63 }() 64 65 for _, id := range expired { 66 ka.Remove(id) 67 } 68 } 69 } 70 }() 71} 72 73// Add adds an ack id to be kept alive. 74// It should not be called after Stop. 75func (ka *keepAlive) Add(ackID string) { 76 ka.mu.Lock() 77 defer ka.mu.Unlock() 78 79 ka.items[ackID] = time.Now().Add(ka.MaxExtension) 80 ka.dr.SetPending(true) 81} 82 83// Remove removes ackID from the list to be kept alive. 84func (ka *keepAlive) Remove(ackID string) { 85 ka.mu.Lock() 86 defer ka.mu.Unlock() 87 88 // Note: If users NACKs a message after it has been removed due to 89 // expiring, Remove will be called twice with same ack id. This is OK. 90 delete(ka.items, ackID) 91 ka.dr.SetPending(len(ka.items) != 0) 92} 93 94// Stop waits until all added ackIDs have been removed, and cleans up resources. 95// Stop may only be called once. 96func (ka *keepAlive) Stop() { 97 ka.mu.Lock() 98 ka.dr.Drain() 99 ka.mu.Unlock() 100 101 ka.wg.Wait() 102} 103 104// getAckIDs returns the set of ackIDs that are being kept alive. 105// The set is divided into two lists: one with IDs that should continue to be kept alive, 106// and the other with IDs that should be dropped. 107func (ka *keepAlive) getAckIDs() (live, expired []string) { 108 ka.mu.Lock() 109 defer ka.mu.Unlock() 110 111 now := time.Now() 112 for id, expiry := range ka.items { 113 if expiry.Before(now) { 114 expired = append(expired, id) 115 } else { 116 live = append(live, id) 117 } 118 } 119 return live, expired 120} 121 122const maxExtensionAttempts = 2 123 124func (ka *keepAlive) extendDeadlines(ackIDs []string) { 125 head, tail := ka.s.splitAckIDs(ackIDs) 126 for len(head) > 0 { 127 for i := 0; i < maxExtensionAttempts; i++ { 128 if ka.s.modifyAckDeadline(ka.Ctx, ka.Sub, ka.Deadline, head) == nil { 129 break 130 } 131 } 132 // NOTE: Messages whose deadlines we fail to extend will 133 // eventually be redelivered and this is a documented behaviour 134 // of the API. 135 // 136 // NOTE: If we fail to extend deadlines here, this 137 // implementation will continue to attempt extending the 138 // deadlines for those ack IDs the next time the extension 139 // ticker ticks. By then the deadline will have expired. 140 // Re-extending them is harmless, however. 141 // 142 // TODO: call Remove for ids which fail to be extended. 143 144 head, tail = ka.s.splitAckIDs(tail) 145 } 146} 147 148// A drain (once started) indicates via a channel when there is no work pending. 149type drain struct { 150 started bool 151 pending bool 152 153 // Drained is closed once there are no items outstanding if Drain has been called. 154 Drained chan struct{} 155} 156 157// Drain starts the drain process. This cannot be undone. 158func (d *drain) Drain() { 159 d.started = true 160 d.closeIfDrained() 161} 162 163// SetPending sets whether there is work pending or not. It may be called multiple times before or after Drain. 164func (d *drain) SetPending(pending bool) { 165 d.pending = pending 166 d.closeIfDrained() 167} 168 169func (d *drain) closeIfDrained() { 170 if !d.pending && d.started { 171 // Check to see if d.Drained is closed before closing it. 172 // This allows SetPending(false) to be safely called multiple times. 173 select { 174 case <-d.Drained: 175 default: 176 close(d.Drained) 177 } 178 } 179} 180