1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"errors"
18	"sync"
19)
20
// serviceStatus is the lifecycle stage a service is currently in. The
// constants below are declared in lifecycle order, so a later stage always
// compares greater than an earlier one. A service does not necessarily pass
// through every stage.
type serviceStatus int

// Lifecycle stages, earliest first.
const (
	serviceUninitialized serviceStatus = iota // Not started yet.
	serviceStarting                           // Starting up.
	serviceActive                             // Accepting new data; the underlying stream may be reconnecting after retryable errors.
	serviceTerminating                        // Gracefully shutting down by flushing pending data; no new data is accepted.
	serviceTerminated                         // Terminated; no new data is accepted.
)
40
// serviceHandle is used to compare pointers to service instances. It serves as
// the key of compositeService's dependency maps and identifies status change
// receivers; abstractService.Handle supplies the embedded *abstractService as
// the handle.
type serviceHandle interface{}
43
44// serviceStatusChangeFunc notifies the parent of service status changes.
45// `serviceTerminating` and `serviceTerminated` have an associated error. This
46// error may be nil if the user called Stop().
47type serviceStatusChangeFunc func(serviceHandle, serviceStatus, error)
48
49// service is the interface that must be implemented by services (essentially
50// gRPC client stream wrappers, e.g. subscriber, publisher) that can be
51// dependencies of a compositeService.
52type service interface {
53	Start()
54	Stop()
55
56	// Methods below are implemented by abstractService.
57	AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc)
58	RemoveStatusChangeReceiver(serviceHandle)
59	Handle() serviceHandle
60	Status() serviceStatus
61	Error() error
62}
63
// abstractService can be embedded into other structs to provide common
// functionality for managing service status and status change receivers.
//
// statusChangeReceivers, status and err are accessed under mu: the exported
// methods lock it themselves, while the unsafe* methods expect the caller to
// already hold it.
type abstractService struct {
	mu                    sync.Mutex
	statusChangeReceivers []*statusChangeReceiver
	status                serviceStatus
	// The error that caused the service to terminate.
	err error
}
73
// statusChangeReceiver is a registered status change callback, paired with the
// handle it was registered under so that it can be removed again.
// NOTE: AddStatusChangeReceiver builds this with a positional literal, so the
// field order matters.
type statusChangeReceiver struct {
	handle         serviceHandle // For removing the receiver.
	onStatusChange serviceStatusChangeFunc
}
78
79func (as *abstractService) AddStatusChangeReceiver(handle serviceHandle, onStatusChange serviceStatusChangeFunc) {
80	as.mu.Lock()
81	defer as.mu.Unlock()
82	as.statusChangeReceivers = append(
83		as.statusChangeReceivers,
84		&statusChangeReceiver{handle, onStatusChange})
85}
86
87func (as *abstractService) RemoveStatusChangeReceiver(handle serviceHandle) {
88	as.mu.Lock()
89	defer as.mu.Unlock()
90
91	for i := len(as.statusChangeReceivers) - 1; i >= 0; i-- {
92		r := as.statusChangeReceivers[i]
93		if r.handle == handle {
94			// Swap with last element, erase last element and truncate the slice.
95			lastIdx := len(as.statusChangeReceivers) - 1
96			if i != lastIdx {
97				as.statusChangeReceivers[i] = as.statusChangeReceivers[lastIdx]
98			}
99			as.statusChangeReceivers[lastIdx] = nil
100			as.statusChangeReceivers = as.statusChangeReceivers[:lastIdx]
101		}
102	}
103}
104
105// Handle identifies this service instance, even when there are multiple layers
106// of embedding.
107func (as *abstractService) Handle() serviceHandle {
108	return as
109}
110
111func (as *abstractService) Error() error {
112	as.mu.Lock()
113	defer as.mu.Unlock()
114	return as.err
115}
116
117func (as *abstractService) Status() serviceStatus {
118	as.mu.Lock()
119	defer as.mu.Unlock()
120	return as.status
121}
122
123func (as *abstractService) unsafeCheckServiceStatus() error {
124	switch {
125	case as.status == serviceUninitialized:
126		return ErrServiceUninitialized
127	case as.status == serviceStarting:
128		return ErrServiceStarting
129	case as.status >= serviceTerminating:
130		return ErrServiceStopped
131	default:
132		return nil
133	}
134}
135
136// unsafeUpdateStatus assumes the service is already holding a mutex when
137// called, as it often needs to be atomic with other operations.
138func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err error) bool {
139	if as.status >= targetStatus {
140		// Already at the same or later stage of the service lifecycle.
141		return false
142	}
143
144	as.status = targetStatus
145	if as.err == nil {
146		// Prevent clobbering original error.
147		as.err = err
148	}
149
150	for _, receiver := range as.statusChangeReceivers {
151		// Notify in a goroutine to prevent deadlocks if the receiver is holding a
152		// locked mutex.
153		go receiver.onStatusChange(as.Handle(), as.status, as.err)
154	}
155	return true
156}
157
158var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started")
159
// compositeService can be embedded into other structs to manage child services.
// It implements the service interface and can itself be a dependency of another
// compositeService.
//
// If one child service terminates due to a permanent failure, all other child
// services are stopped. Child services can be added and removed dynamically.
//
// The zero value is not usable: init must be called to allocate the channels
// and maps. The maps are accessed under the embedded abstractService's mutex.
type compositeService struct {
	// Used to block until all dependencies have started or terminated.
	// waitStarted is closed on the first transition to serviceActive or later;
	// waitTerminated is closed on the transition to serviceTerminated.
	waitStarted    chan struct{}
	waitTerminated chan struct{}

	// Current dependencies.
	dependencies map[serviceHandle]service
	// Removed dependencies that are in the process of terminating.
	removed map[serviceHandle]service

	abstractService
}
178
179// init must be called after creation of the derived struct.
180func (cs *compositeService) init() {
181	cs.waitStarted = make(chan struct{})
182	cs.waitTerminated = make(chan struct{})
183	cs.dependencies = make(map[serviceHandle]service)
184	cs.removed = make(map[serviceHandle]service)
185}
186
187// Start up dependencies.
188func (cs *compositeService) Start() {
189	cs.mu.Lock()
190	defer cs.mu.Unlock()
191
192	if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) {
193		for _, s := range cs.dependencies {
194			s.Start()
195		}
196	}
197}
198
199// WaitStarted waits for all dependencies to start.
200func (cs *compositeService) WaitStarted() error {
201	<-cs.waitStarted
202	return cs.Error()
203}
204
205// Stop all dependencies.
206func (cs *compositeService) Stop() {
207	cs.mu.Lock()
208	defer cs.mu.Unlock()
209	cs.unsafeInitiateShutdown(serviceTerminating, nil)
210}
211
212// WaitStopped waits for all dependencies to stop.
213func (cs *compositeService) WaitStopped() error {
214	<-cs.waitTerminated
215	return cs.Error()
216}
217
218func (cs *compositeService) unsafeAddServices(services ...service) error {
219	if cs.status >= serviceTerminating {
220		return ErrServiceStopped
221	}
222
223	for _, s := range services {
224		// Adding dependent services which have already started not currently
225		// supported. Requires updating logic to handle the compositeService state.
226		if s.Status() > serviceUninitialized {
227			return errChildServiceStarted
228		}
229
230		s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
231		cs.dependencies[s.Handle()] = s
232		if cs.status > serviceUninitialized {
233			s.Start()
234		}
235	}
236	return nil
237}
238
239func (cs *compositeService) unsafeRemoveService(remove service) {
240	if _, present := cs.dependencies[remove.Handle()]; !present {
241		return
242	}
243	delete(cs.dependencies, remove.Handle())
244	// The service will be completely removed after it has terminated.
245	cs.removed[remove.Handle()] = remove
246	if remove.Status() < serviceTerminating {
247		remove.Stop()
248	}
249}
250
251func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
252	if cs.unsafeUpdateStatus(targetStatus, err) {
253		for _, s := range cs.dependencies {
254			if s.Status() < serviceTerminating {
255				s.Stop()
256			}
257		}
258	}
259}
260
261func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) {
262	previousStatus := cs.status
263	if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret {
264		// Note: the waitStarted channel must be closed when the service fails to
265		// start.
266		if previousStatus < serviceActive && targetStatus >= serviceActive {
267			close(cs.waitStarted)
268		}
269		if targetStatus == serviceTerminated {
270			close(cs.waitTerminated)
271		}
272	}
273	return
274}
275
276func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status serviceStatus, err error) {
277	cs.mu.Lock()
278	defer cs.mu.Unlock()
279
280	if removedService, present := cs.removed[handle]; present {
281		if status == serviceTerminated {
282			removedService.RemoveStatusChangeReceiver(cs.Handle())
283			delete(cs.removed, handle)
284		}
285	}
286
287	// Note: we cannot rely on the service not being in the `removed` map to
288	// determine whether it is an active dependency. The notification may be for a
289	// service that is no longer in cs.removed or cs.dependencies, because status
290	// changes are notified asynchronously and may be received out of order.
291	_, isDependency := cs.dependencies[handle]
292
293	// If a single service terminates, stop them all, but allow the others to
294	// flush pending data. Ignore removed services that are stopping.
295	shouldTerminate := status >= serviceTerminating && isDependency
296	numStarted := 0
297	numTerminated := 0
298
299	for _, s := range cs.dependencies {
300		if shouldTerminate && s.Status() < serviceTerminating {
301			s.Stop()
302		}
303		if s.Status() >= serviceActive {
304			numStarted++
305		}
306		if s.Status() == serviceTerminated {
307			numTerminated++
308		}
309	}
310
311	switch {
312	case numTerminated == len(cs.dependencies) && len(cs.removed) == 0:
313		cs.unsafeUpdateStatus(serviceTerminated, err)
314	case shouldTerminate:
315		cs.unsafeUpdateStatus(serviceTerminating, err)
316	case numStarted == len(cs.dependencies):
317		cs.unsafeUpdateStatus(serviceActive, err)
318	}
319}
320
// apiClient is anything that holds resources released by Close, such as an
// API client.
type apiClient interface {
	Close() error
}

// apiClients is a group of clients that are closed together.
type apiClients []apiClient

// Close closes every client, including those after one that fails. It returns
// the first non-nil error encountered, or nil if every Close succeeded.
func (ac apiClients) Close() error {
	var firstErr error
	for _, client := range ac {
		err := client.Close()
		if firstErr == nil && err != nil {
			firstErr = err
		}
	}
	return firstErr
}
335
// apiClientService is a compositeService that handles closing API clients on
// shutdown. Its WaitStopped closes the clients once the composite has
// terminated.
type apiClientService struct {
	clients apiClients

	compositeService
}
342
343func (acs *apiClientService) WaitStarted() error {
344	err := acs.compositeService.WaitStarted()
345	if err != nil {
346		acs.WaitStopped()
347	}
348	return err
349}
350
351func (acs *apiClientService) WaitStopped() error {
352	err := acs.compositeService.WaitStopped()
353	acs.clients.Close()
354	return err
355}
356