// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wire

import (
	"errors"
	"sync"
)

// serviceStatus specifies the current status of the service. The order of the
// values reflects the lifecycle of services. Note that some statuses may be
// skipped.
//
// Statuses are compared numerically (with < and >=) elsewhere in this file, so
// the relative order of the constants below is significant.
type serviceStatus int

const (
	// Service has not been started.
	serviceUninitialized serviceStatus = 0
	// Service is starting up.
	serviceStarting serviceStatus = 1
	// Service is active and accepting new data. Note that the underlying stream
	// may be reconnecting due to retryable errors.
	serviceActive serviceStatus = 2
	// Service is gracefully shutting down by flushing all pending data. No new
	// data is accepted.
	serviceTerminating serviceStatus = 3
	// Service has terminated. No new data is accepted.
	serviceTerminated serviceStatus = 4
)

// serviceHandle is used to compare pointers to service instances.
type serviceHandle interface{}

// serviceStatusChangeFunc notifies the parent of service status changes. Its
// arguments are the handle of the service whose status changed, the new status
// and the associated error.
// `serviceTerminating` and `serviceTerminated` have an associated error. This
// error may be nil if the user called Stop().
type serviceStatusChangeFunc func(serviceHandle, serviceStatus, error)

// service is the interface that must be implemented by services (essentially
// gRPC client stream wrappers, e.g. subscriber, publisher) that can be
// dependencies of a compositeService.
52type service interface { 53 Start() 54 Stop() 55 56 // Methods below are implemented by abstractService. 57 AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc) 58 RemoveStatusChangeReceiver(serviceHandle) 59 Handle() serviceHandle 60 Status() serviceStatus 61 Error() error 62} 63 64// abstractService can be embedded into other structs to provide common 65// functionality for managing service status and status change receivers. 66type abstractService struct { 67 mu sync.Mutex 68 statusChangeReceivers []*statusChangeReceiver 69 status serviceStatus 70 // The error that cause the service to terminate. 71 err error 72} 73 74type statusChangeReceiver struct { 75 handle serviceHandle // For removing the receiver. 76 onStatusChange serviceStatusChangeFunc 77} 78 79func (as *abstractService) AddStatusChangeReceiver(handle serviceHandle, onStatusChange serviceStatusChangeFunc) { 80 as.mu.Lock() 81 defer as.mu.Unlock() 82 as.statusChangeReceivers = append( 83 as.statusChangeReceivers, 84 &statusChangeReceiver{handle, onStatusChange}) 85} 86 87func (as *abstractService) RemoveStatusChangeReceiver(handle serviceHandle) { 88 as.mu.Lock() 89 defer as.mu.Unlock() 90 91 for i := len(as.statusChangeReceivers) - 1; i >= 0; i-- { 92 r := as.statusChangeReceivers[i] 93 if r.handle == handle { 94 // Swap with last element, erase last element and truncate the slice. 95 lastIdx := len(as.statusChangeReceivers) - 1 96 if i != lastIdx { 97 as.statusChangeReceivers[i] = as.statusChangeReceivers[lastIdx] 98 } 99 as.statusChangeReceivers[lastIdx] = nil 100 as.statusChangeReceivers = as.statusChangeReceivers[:lastIdx] 101 } 102 } 103} 104 105// Handle identifies this service instance, even when there are multiple layers 106// of embedding. 
func (as *abstractService) Handle() serviceHandle {
	// The receiver pointer itself serves as the handle.
	return as
}

// Error returns the error stored for this service, reading it under the
// mutex. The stored error is the first non-nil error passed to
// unsafeUpdateStatus on a successful status change.
func (as *abstractService) Error() error {
	as.mu.Lock()
	defer as.mu.Unlock()
	return as.err
}

// Status returns the current status of the service, reading it under the
// mutex.
func (as *abstractService) Status() serviceStatus {
	as.mu.Lock()
	defer as.mu.Unlock()
	return as.status
}

// unsafeCheckServiceStatus maps the current status to an error: a distinct
// error for uninitialized, starting, and terminating-or-later statuses, and
// nil otherwise (for the declared statuses, that means serviceActive).
//
// It reads as.status without locking; presumably the caller must hold as.mu,
// in line with the other `unsafe` methods in this file.
func (as *abstractService) unsafeCheckServiceStatus() error {
	switch {
	case as.status == serviceUninitialized:
		return ErrServiceUninitialized
	case as.status == serviceStarting:
		return ErrServiceStarting
	case as.status >= serviceTerminating:
		return ErrServiceStopped
	default:
		return nil
	}
}

// unsafeUpdateStatus assumes the service is already holding a mutex when
// called, as it often needs to be atomic with other operations.
//
// It advances the status to targetStatus and returns true, or returns false
// without changing anything if the service is already at targetStatus or a
// later status. On a successful change, err is stored only if no error has
// been stored before, and every registered receiver is notified.
func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err error) bool {
	if as.status >= targetStatus {
		// Already at the same or later stage of the service lifecycle.
		return false
	}

	as.status = targetStatus
	if as.err == nil {
		// Prevent clobbering original error.
		as.err = err
	}

	for _, receiver := range as.statusChangeReceivers {
		// Notify in a goroutine to prevent deadlocks if the receiver is holding a
		// locked mutex.
		// The callback and its arguments are evaluated here, when the go
		// statement executes, so each notification carries the status and error
		// as of this update. Delivery order across goroutines is not guaranteed.
		go receiver.onStatusChange(as.Handle(), as.status, as.err)
	}
	return true
}

// errChildServiceStarted is returned by unsafeAddServices when a service being
// added reports a status later than serviceUninitialized.
var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started")

// compositeService can be embedded into other structs to manage child services.
// It implements the service interface and can itself be a dependency of another
// compositeService.
//
// If one child service terminates due to a permanent failure, all other child
// services are stopped. Child services can be added and removed dynamically.
type compositeService struct {
	// Used to block until all dependencies have started or terminated.
	// Both channels are created by init() and are closed by
	// unsafeUpdateStatus.
	waitStarted    chan struct{}
	waitTerminated chan struct{}

	// Child services currently managed by this compositeService.
	dependencies []service
	// Services moved out of `dependencies` by unsafeRemoveService. An entry is
	// dropped once a serviceTerminated notification for it is received.
	removed []service

	abstractService
}

// init must be called after creation of the derived struct.
// It allocates the channels that WaitStarted and WaitStopped block on.
func (cs *compositeService) init() {
	cs.waitStarted = make(chan struct{})
	cs.waitTerminated = make(chan struct{})
}

// Start up dependencies.
// If the status cannot be advanced to serviceStarting (the service was already
// started or stopped), nothing is started.
func (cs *compositeService) Start() {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	// This calls the embedded abstractService implementation directly. The
	// compositeService override only adds channel handling for serviceActive
	// and later statuses, which does not apply to serviceStarting.
	if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) {
		for _, s := range cs.dependencies {
			s.Start()
		}
	}
}

// WaitStarted waits for all dependencies to start.
// It blocks until waitStarted is closed, which happens when the status first
// reaches serviceActive or any later status, then returns the stored error.
func (cs *compositeService) WaitStarted() error {
	<-cs.waitStarted
	return cs.Error()
}

// Stop all dependencies.
// The shutdown is initiated with a nil error.
func (cs *compositeService) Stop() {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	cs.unsafeInitiateShutdown(serviceTerminating, nil)
}

// WaitStopped waits for all dependencies to stop.
// It blocks until waitTerminated is closed, which happens when the status
// becomes serviceTerminated, then returns the stored error.
func (cs *compositeService) WaitStopped() error {
	<-cs.waitTerminated
	return cs.Error()
}

// unsafeAddServices registers the given services as dependencies. It does not
// lock cs.mu itself.
//
// It returns ErrServiceStopped if this compositeService is terminating or
// terminated, and errChildServiceStarted if a service reports a status later
// than serviceUninitialized. In the latter case, services that precede the
// offending one in the argument list have already been added.
func (cs *compositeService) unsafeAddServices(services ...service) error {
	if cs.status >= serviceTerminating {
		return ErrServiceStopped
	}

	for _, s := range services {
		// Adding dependent services which have already started not currently
		// supported. Requires updating logic to handle the compositeService state.
		if s.Status() > serviceUninitialized {
			return errChildServiceStarted
		}

		s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
		cs.dependencies = append(cs.dependencies, s)
		// If this compositeService has already been started, start the new
		// dependency right away.
		if cs.status > serviceUninitialized {
			s.Start()
		}
	}
	return nil
}

// unsafeRemoveService removes the service with the same handle as `remove`
// from the dependencies, stopping it if it is not already terminating or
// terminated. It does not lock cs.mu itself. If no dependency matches, nothing
// changes.
func (cs *compositeService) unsafeRemoveService(remove service) {
	removeIdx := -1
	for i, s := range cs.dependencies {
		if s.Handle() == remove.Handle() {
			// Move from the `dependencies` to the `removed` list.
			cs.removed = append(cs.removed, s)
			removeIdx = i
			if s.Status() < serviceTerminating {
				s.Stop()
			}
			break
		}
	}
	// removeFromSlice returns the slice unchanged when removeIdx is still -1.
	cs.dependencies = removeFromSlice(cs.dependencies, removeIdx)
}

// unsafeInitiateShutdown advances the status to targetStatus with the given
// error and, only if that status change took effect, stops every dependency
// that is not already terminating or terminated. It does not lock cs.mu
// itself.
func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
	if cs.unsafeUpdateStatus(targetStatus, err) {
		for _, s := range cs.dependencies {
			if s.Status() < serviceTerminating {
				s.Stop()
			}
		}
	}
}

// unsafeUpdateStatus wraps abstractService.unsafeUpdateStatus and additionally
// closes the wait channels when the corresponding transition takes effect. It
// returns whether the status was changed. It does not lock cs.mu itself.
func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) {
	previousStatus := cs.status
	if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret {
		// Note: the waitStarted channel must be closed when the service fails to
		// start.
		// The condition holds only for the first transition from a status before
		// serviceActive to serviceActive or later, so the channel is closed once.
		if previousStatus < serviceActive && targetStatus >= serviceActive {
			close(cs.waitStarted)
		}
		// A status change to serviceTerminated can succeed at most once, since
		// no declared status is later than it.
		if targetStatus == serviceTerminated {
			close(cs.waitTerminated)
		}
	}
	return
}

// onServiceStatusChange is the receiver registered with each dependency in
// unsafeAddServices. It takes the handle of the service whose status changed,
// the new status and the associated error, and updates the status of this
// compositeService from the statuses of its dependencies.
func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status serviceStatus, err error) {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	// If the notification is for a removed service that has now terminated,
	// unregister from it and drop it from the `removed` list.
	removeIdx := -1
	for i, s := range cs.removed {
		if s.Handle() == handle {
			if status == serviceTerminated {
				s.RemoveStatusChangeReceiver(cs.Handle())
				removeIdx = i
			}
			break
		}
	}
	cs.removed = removeFromSlice(cs.removed, removeIdx)

	// Note: we cannot rely on the service not being in the removed list above to
	// determine whether it is an active dependency. The notification may be for a
	// service that is no longer in cs.removed or cs.dependencies, because status
	// changes are notified asynchronously and may be received out of order.
	isDependency := false
	for _, s := range cs.dependencies {
		if s.Handle() == handle {
			isDependency = true
			break
		}
	}

	// If a single service terminates, stop them all, but allow the others to
	// flush pending data. Ignore removed services that are stopping.
	shouldTerminate := status >= serviceTerminating && isDependency
	numStarted := 0
	numTerminated := 0

	for _, s := range cs.dependencies {
		if shouldTerminate && s.Status() < serviceTerminating {
			s.Stop()
		}
		// Status is read again after the Stop call, so the counts below use
		// whatever status the service reports at that point.
		if s.Status() >= serviceActive {
			numStarted++
		}
		if s.Status() == serviceTerminated {
			numTerminated++
		}
	}

	// The cases are checked in order and only the first match applies.
	switch {
	// Every dependency has terminated and no removed service is still pending.
	// NOTE(review): this also matches when `dependencies` is empty - confirm
	// that is intended.
	case numTerminated == len(cs.dependencies) && len(cs.removed) == 0:
		cs.unsafeUpdateStatus(serviceTerminated, err)
	// A dependency is terminating or has terminated, but others are not done.
	case shouldTerminate:
		cs.unsafeUpdateStatus(serviceTerminating, err)
	// Every dependency is active or at a later status.
	case numStarted == len(cs.dependencies):
		cs.unsafeUpdateStatus(serviceActive, err)
	}
}

// removeFromSlice removes the element at removeIdx from services and returns
// the shortened slice. The input is returned unchanged if removeIdx is out of
// range (including a negative index or an empty slice). The removal modifies
// the backing array of services in place and does not preserve element order.
func removeFromSlice(services []service, removeIdx int) []service {
	lastIdx := len(services) - 1
	if removeIdx < 0 || removeIdx > lastIdx {
		return services
	}

	// Swap with last element, erase last element and truncate the slice.
	if removeIdx != lastIdx {
		services[removeIdx] = services[lastIdx]
	}
	services[lastIdx] = nil
	return services[:lastIdx]
}