// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wire

import (
	"errors"
	"sync"
)

// serviceStatus specifies the current status of the service. The order of the
// values reflects the lifecycle of services. Code in this package compares
// statuses with relational operators (e.g. `status >= serviceTerminating`), so
// the declaration order of the constants below is significant and must not be
// changed. Note that some statuses may be skipped.
type serviceStatus int

const (
	// Service has not been started.
	serviceUninitialized serviceStatus = iota
	// Service is starting up.
	serviceStarting
	// Service is active and accepting new data. Note that the underlying stream
	// may be reconnecting due to retryable errors.
	serviceActive
	// Service is gracefully shutting down by flushing all pending data. No new
	// data is accepted.
	serviceTerminating
	// Service has terminated. No new data is accepted.
	serviceTerminated
)

// serviceHandle is used to compare pointers to service instances. Handles are
// compared with == and used as map keys, so they must hold comparable values
// (in practice, pointers).
type serviceHandle interface{}

// serviceStatusChangeFunc notifies the parent of service status changes.
// `serviceTerminating` and `serviceTerminated` have an associated error. This
// error may be nil if the user called Stop(). Receivers are invoked in a new
// goroutine by abstractService.unsafeUpdateStatus, so notifications may be
// observed out of order.
type serviceStatusChangeFunc func(serviceHandle, serviceStatus, error)

// service is the interface that must be implemented by services (essentially
// gRPC client stream wrappers, e.g. subscriber, publisher) that can be
// dependencies of a compositeService. The status-management methods are
// normally supplied by embedding abstractService.
52type service interface { 53 Start() 54 Stop() 55 56 // Methods below are implemented by abstractService. 57 AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc) 58 RemoveStatusChangeReceiver(serviceHandle) 59 Handle() serviceHandle 60 Status() serviceStatus 61 Error() error 62} 63 64// abstractService can be embedded into other structs to provide common 65// functionality for managing service status and status change receivers. 66type abstractService struct { 67 mu sync.Mutex 68 statusChangeReceivers []*statusChangeReceiver 69 status serviceStatus 70 // The error that cause the service to terminate. 71 err error 72} 73 74type statusChangeReceiver struct { 75 handle serviceHandle // For removing the receiver. 76 onStatusChange serviceStatusChangeFunc 77} 78 79func (as *abstractService) AddStatusChangeReceiver(handle serviceHandle, onStatusChange serviceStatusChangeFunc) { 80 as.mu.Lock() 81 defer as.mu.Unlock() 82 as.statusChangeReceivers = append( 83 as.statusChangeReceivers, 84 &statusChangeReceiver{handle, onStatusChange}) 85} 86 87func (as *abstractService) RemoveStatusChangeReceiver(handle serviceHandle) { 88 as.mu.Lock() 89 defer as.mu.Unlock() 90 91 for i := len(as.statusChangeReceivers) - 1; i >= 0; i-- { 92 r := as.statusChangeReceivers[i] 93 if r.handle == handle { 94 // Swap with last element, erase last element and truncate the slice. 95 lastIdx := len(as.statusChangeReceivers) - 1 96 if i != lastIdx { 97 as.statusChangeReceivers[i] = as.statusChangeReceivers[lastIdx] 98 } 99 as.statusChangeReceivers[lastIdx] = nil 100 as.statusChangeReceivers = as.statusChangeReceivers[:lastIdx] 101 } 102 } 103} 104 105// Handle identifies this service instance, even when there are multiple layers 106// of embedding. 
107func (as *abstractService) Handle() serviceHandle { 108 return as 109} 110 111func (as *abstractService) Error() error { 112 as.mu.Lock() 113 defer as.mu.Unlock() 114 return as.err 115} 116 117func (as *abstractService) Status() serviceStatus { 118 as.mu.Lock() 119 defer as.mu.Unlock() 120 return as.status 121} 122 123func (as *abstractService) unsafeCheckServiceStatus() error { 124 switch { 125 case as.status == serviceUninitialized: 126 return ErrServiceUninitialized 127 case as.status == serviceStarting: 128 return ErrServiceStarting 129 case as.status >= serviceTerminating: 130 return ErrServiceStopped 131 default: 132 return nil 133 } 134} 135 136// unsafeUpdateStatus assumes the service is already holding a mutex when 137// called, as it often needs to be atomic with other operations. 138func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err error) bool { 139 if as.status >= targetStatus { 140 // Already at the same or later stage of the service lifecycle. 141 return false 142 } 143 144 as.status = targetStatus 145 if as.err == nil { 146 // Prevent clobbering original error. 147 as.err = err 148 } 149 150 for _, receiver := range as.statusChangeReceivers { 151 // Notify in a goroutine to prevent deadlocks if the receiver is holding a 152 // locked mutex. 153 go receiver.onStatusChange(as.Handle(), as.status, as.err) 154 } 155 return true 156} 157 158var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started") 159 160// compositeService can be embedded into other structs to manage child services. 161// It implements the service interface and can itself be a dependency of another 162// compositeService. 163// 164// If one child service terminates due to a permanent failure, all other child 165// services are stopped. Child services can be added and removed dynamically. 166type compositeService struct { 167 // Used to block until all dependencies have started or terminated. 
168 waitStarted chan struct{} 169 waitTerminated chan struct{} 170 171 // Current dependencies. 172 dependencies map[serviceHandle]service 173 // Removed dependencies that are in the process of terminating. 174 removed map[serviceHandle]service 175 176 abstractService 177} 178 179// init must be called after creation of the derived struct. 180func (cs *compositeService) init() { 181 cs.waitStarted = make(chan struct{}) 182 cs.waitTerminated = make(chan struct{}) 183 cs.dependencies = make(map[serviceHandle]service) 184 cs.removed = make(map[serviceHandle]service) 185} 186 187// Start up dependencies. 188func (cs *compositeService) Start() { 189 cs.mu.Lock() 190 defer cs.mu.Unlock() 191 192 if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) { 193 for _, s := range cs.dependencies { 194 s.Start() 195 } 196 } 197} 198 199// WaitStarted waits for all dependencies to start. 200func (cs *compositeService) WaitStarted() error { 201 <-cs.waitStarted 202 return cs.Error() 203} 204 205// Stop all dependencies. 206func (cs *compositeService) Stop() { 207 cs.mu.Lock() 208 defer cs.mu.Unlock() 209 cs.unsafeInitiateShutdown(serviceTerminating, nil) 210} 211 212// WaitStopped waits for all dependencies to stop. 213func (cs *compositeService) WaitStopped() error { 214 <-cs.waitTerminated 215 return cs.Error() 216} 217 218func (cs *compositeService) unsafeAddServices(services ...service) error { 219 if cs.status >= serviceTerminating { 220 return ErrServiceStopped 221 } 222 223 for _, s := range services { 224 // Adding dependent services which have already started not currently 225 // supported. Requires updating logic to handle the compositeService state. 
226 if s.Status() > serviceUninitialized { 227 return errChildServiceStarted 228 } 229 230 s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange) 231 cs.dependencies[s.Handle()] = s 232 if cs.status > serviceUninitialized { 233 s.Start() 234 } 235 } 236 return nil 237} 238 239func (cs *compositeService) unsafeRemoveService(remove service) { 240 if _, present := cs.dependencies[remove.Handle()]; !present { 241 return 242 } 243 delete(cs.dependencies, remove.Handle()) 244 // The service will be completely removed after it has terminated. 245 cs.removed[remove.Handle()] = remove 246 if remove.Status() < serviceTerminating { 247 remove.Stop() 248 } 249} 250 251func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { 252 if cs.unsafeUpdateStatus(targetStatus, err) { 253 for _, s := range cs.dependencies { 254 if s.Status() < serviceTerminating { 255 s.Stop() 256 } 257 } 258 } 259} 260 261func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) { 262 previousStatus := cs.status 263 if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret { 264 // Note: the waitStarted channel must be closed when the service fails to 265 // start. 266 if previousStatus < serviceActive && targetStatus >= serviceActive { 267 close(cs.waitStarted) 268 } 269 if targetStatus == serviceTerminated { 270 close(cs.waitTerminated) 271 } 272 } 273 return 274} 275 276func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status serviceStatus, err error) { 277 cs.mu.Lock() 278 defer cs.mu.Unlock() 279 280 if removedService, present := cs.removed[handle]; present { 281 if status == serviceTerminated { 282 removedService.RemoveStatusChangeReceiver(cs.Handle()) 283 delete(cs.removed, handle) 284 } 285 } 286 287 // Note: we cannot rely on the service not being in the `removed` map to 288 // determine whether it is an active dependency. 
The notification may be for a 289 // service that is no longer in cs.removed or cs.dependencies, because status 290 // changes are notified asynchronously and may be received out of order. 291 _, isDependency := cs.dependencies[handle] 292 293 // If a single service terminates, stop them all, but allow the others to 294 // flush pending data. Ignore removed services that are stopping. 295 shouldTerminate := status >= serviceTerminating && isDependency 296 numStarted := 0 297 numTerminated := 0 298 299 for _, s := range cs.dependencies { 300 if shouldTerminate && s.Status() < serviceTerminating { 301 s.Stop() 302 } 303 if s.Status() >= serviceActive { 304 numStarted++ 305 } 306 if s.Status() == serviceTerminated { 307 numTerminated++ 308 } 309 } 310 311 switch { 312 case numTerminated == len(cs.dependencies) && len(cs.removed) == 0: 313 cs.unsafeUpdateStatus(serviceTerminated, err) 314 case shouldTerminate: 315 cs.unsafeUpdateStatus(serviceTerminating, err) 316 case numStarted == len(cs.dependencies): 317 cs.unsafeUpdateStatus(serviceActive, err) 318 } 319} 320 321type apiClient interface { 322 Close() error 323} 324 325type apiClients []apiClient 326 327func (ac apiClients) Close() (retErr error) { 328 for _, c := range ac { 329 if err := c.Close(); retErr == nil { 330 retErr = err 331 } 332 } 333 return 334} 335 336// A compositeService that handles closing API clients on shutdown. 337type apiClientService struct { 338 clients apiClients 339 340 compositeService 341} 342 343func (acs *apiClientService) WaitStarted() error { 344 err := acs.compositeService.WaitStarted() 345 if err != nil { 346 acs.WaitStopped() 347 } 348 return err 349} 350 351func (acs *apiClientService) WaitStopped() error { 352 err := acs.compositeService.WaitStopped() 353 acs.clients.Close() 354 return err 355} 356