1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"errors"
18	"sync"
19)
20
// serviceStatus describes where a service currently is in its lifecycle. The
// values are declared in lifecycle order, so two statuses can be compared with
// the ordering operators. Note that a service may skip some statuses.
type serviceStatus int

const (
	// serviceUninitialized means the service has not been started.
	serviceUninitialized serviceStatus = iota
	// serviceStarting means the service is starting up.
	serviceStarting
	// serviceActive means the service is active and accepting new data. Note
	// that the underlying stream may be reconnecting due to retryable errors.
	serviceActive
	// serviceTerminating means the service is gracefully shutting down by
	// flushing all pending data. No new data is accepted.
	serviceTerminating
	// serviceTerminated means the service has terminated. No new data is
	// accepted.
	serviceTerminated
)
40
// serviceHandle is used to compare pointers to service instances. It is an
// empty interface, so any value can serve as a handle; handles are matched
// with the == operator.
type serviceHandle interface{}
43
// serviceStatusChangeFunc notifies the parent of service status changes. It
// receives the handle of the service whose status changed, the new status and
// the error recorded for the service.
// `serviceTerminating` and `serviceTerminated` have an associated error. This
// error may be nil if the user called Stop().
type serviceStatusChangeFunc func(serviceHandle, serviceStatus, error)
48
// service is the interface that must be implemented by services (essentially
// gRPC client stream wrappers, e.g. subscriber, publisher) that can be
// dependencies of a compositeService.
type service interface {
	// Start and Stop are not provided by abstractService; each concrete service
	// supplies its own implementation.
	Start()
	Stop()

	// Methods below are implemented by abstractService.

	// AddStatusChangeReceiver registers a callback under the given handle.
	AddStatusChangeReceiver(serviceHandle, serviceStatusChangeFunc)
	// RemoveStatusChangeReceiver unregisters the callbacks added under the
	// given handle.
	RemoveStatusChangeReceiver(serviceHandle)
	// Handle returns the value that identifies this service instance.
	Handle() serviceHandle
	// Status returns the current lifecycle status of the service.
	Status() serviceStatus
	// Error returns the error recorded for the service, if any.
	Error() error
}
63
// abstractService can be embedded into other structs to provide common
// functionality for managing service status and status change receivers.
type abstractService struct {
	// mu is locked by the exported accessors before they touch the fields
	// below. The unsafe* methods do not lock it themselves.
	mu sync.Mutex
	// Callbacks that are notified whenever the status advances.
	statusChangeReceivers []*statusChangeReceiver
	// Current lifecycle status; the zero value is serviceUninitialized.
	status serviceStatus
	// The error that caused the service to terminate.
	err error
}
73
// statusChangeReceiver pairs a status change callback with the handle under
// which it was registered.
type statusChangeReceiver struct {
	handle         serviceHandle // For removing the receiver.
	onStatusChange serviceStatusChangeFunc
}
78
79func (as *abstractService) AddStatusChangeReceiver(handle serviceHandle, onStatusChange serviceStatusChangeFunc) {
80	as.mu.Lock()
81	defer as.mu.Unlock()
82	as.statusChangeReceivers = append(
83		as.statusChangeReceivers,
84		&statusChangeReceiver{handle, onStatusChange})
85}
86
87func (as *abstractService) RemoveStatusChangeReceiver(handle serviceHandle) {
88	as.mu.Lock()
89	defer as.mu.Unlock()
90
91	for i := len(as.statusChangeReceivers) - 1; i >= 0; i-- {
92		r := as.statusChangeReceivers[i]
93		if r.handle == handle {
94			// Swap with last element, erase last element and truncate the slice.
95			lastIdx := len(as.statusChangeReceivers) - 1
96			if i != lastIdx {
97				as.statusChangeReceivers[i] = as.statusChangeReceivers[lastIdx]
98			}
99			as.statusChangeReceivers[lastIdx] = nil
100			as.statusChangeReceivers = as.statusChangeReceivers[:lastIdx]
101		}
102	}
103}
104
105// Handle identifies this service instance, even when there are multiple layers
106// of embedding.
107func (as *abstractService) Handle() serviceHandle {
108	return as
109}
110
111func (as *abstractService) Error() error {
112	as.mu.Lock()
113	defer as.mu.Unlock()
114	return as.err
115}
116
117func (as *abstractService) Status() serviceStatus {
118	as.mu.Lock()
119	defer as.mu.Unlock()
120	return as.status
121}
122
123func (as *abstractService) unsafeCheckServiceStatus() error {
124	switch {
125	case as.status == serviceUninitialized:
126		return ErrServiceUninitialized
127	case as.status == serviceStarting:
128		return ErrServiceStarting
129	case as.status >= serviceTerminating:
130		return ErrServiceStopped
131	default:
132		return nil
133	}
134}
135
136// unsafeUpdateStatus assumes the service is already holding a mutex when
137// called, as it often needs to be atomic with other operations.
138func (as *abstractService) unsafeUpdateStatus(targetStatus serviceStatus, err error) bool {
139	if as.status >= targetStatus {
140		// Already at the same or later stage of the service lifecycle.
141		return false
142	}
143
144	as.status = targetStatus
145	if as.err == nil {
146		// Prevent clobbering original error.
147		as.err = err
148	}
149
150	for _, receiver := range as.statusChangeReceivers {
151		// Notify in a goroutine to prevent deadlocks if the receiver is holding a
152		// locked mutex.
153		go receiver.onStatusChange(as.Handle(), as.status, as.err)
154	}
155	return true
156}
157
// errChildServiceStarted is returned by unsafeAddServices when a service that
// is no longer in the serviceUninitialized status is added as a dependency.
var errChildServiceStarted = errors.New("pubsublite: dependent service must not be started")
159
// compositeService can be embedded into other structs to manage child services.
// It implements the service interface and can itself be a dependency of another
// compositeService.
//
// If one child service terminates due to a permanent failure, all other child
// services are stopped. Child services can be added and removed dynamically.
type compositeService struct {
	// Used to block until all dependencies have started or terminated. Both
	// channels are created by init() and are closed to signal the event.
	waitStarted    chan struct{}
	waitTerminated chan struct{}

	// Child services that are currently managed.
	dependencies []service
	// Child services that were taken out of `dependencies` and are kept here
	// until their terminated notification has been received.
	removed []service

	abstractService
}
176
177// init must be called after creation of the derived struct.
178func (cs *compositeService) init() {
179	cs.waitStarted = make(chan struct{})
180	cs.waitTerminated = make(chan struct{})
181}
182
183// Start up dependencies.
184func (cs *compositeService) Start() {
185	cs.mu.Lock()
186	defer cs.mu.Unlock()
187
188	if cs.abstractService.unsafeUpdateStatus(serviceStarting, nil) {
189		for _, s := range cs.dependencies {
190			s.Start()
191		}
192	}
193}
194
195// WaitStarted waits for all dependencies to start.
196func (cs *compositeService) WaitStarted() error {
197	<-cs.waitStarted
198	return cs.Error()
199}
200
201// Stop all dependencies.
202func (cs *compositeService) Stop() {
203	cs.mu.Lock()
204	defer cs.mu.Unlock()
205	cs.unsafeInitiateShutdown(serviceTerminating, nil)
206}
207
208// WaitStopped waits for all dependencies to stop.
209func (cs *compositeService) WaitStopped() error {
210	<-cs.waitTerminated
211	return cs.Error()
212}
213
214func (cs *compositeService) unsafeAddServices(services ...service) error {
215	if cs.status >= serviceTerminating {
216		return ErrServiceStopped
217	}
218
219	for _, s := range services {
220		// Adding dependent services which have already started not currently
221		// supported. Requires updating logic to handle the compositeService state.
222		if s.Status() > serviceUninitialized {
223			return errChildServiceStarted
224		}
225
226		s.AddStatusChangeReceiver(cs.Handle(), cs.onServiceStatusChange)
227		cs.dependencies = append(cs.dependencies, s)
228		if cs.status > serviceUninitialized {
229			s.Start()
230		}
231	}
232	return nil
233}
234
235func (cs *compositeService) unsafeRemoveService(remove service) {
236	removeIdx := -1
237	for i, s := range cs.dependencies {
238		if s.Handle() == remove.Handle() {
239			// Move from the `dependencies` to the `removed` list.
240			cs.removed = append(cs.removed, s)
241			removeIdx = i
242			if s.Status() < serviceTerminating {
243				s.Stop()
244			}
245			break
246		}
247	}
248	cs.dependencies = removeFromSlice(cs.dependencies, removeIdx)
249}
250
251func (cs *compositeService) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
252	if cs.unsafeUpdateStatus(targetStatus, err) {
253		for _, s := range cs.dependencies {
254			if s.Status() < serviceTerminating {
255				s.Stop()
256			}
257		}
258	}
259}
260
261func (cs *compositeService) unsafeUpdateStatus(targetStatus serviceStatus, err error) (ret bool) {
262	previousStatus := cs.status
263	if ret = cs.abstractService.unsafeUpdateStatus(targetStatus, err); ret {
264		// Note: the waitStarted channel must be closed when the service fails to
265		// start.
266		if previousStatus < serviceActive && targetStatus >= serviceActive {
267			close(cs.waitStarted)
268		}
269		if targetStatus == serviceTerminated {
270			close(cs.waitTerminated)
271		}
272	}
273	return
274}
275
// onServiceStatusChange is the status change receiver that unsafeAddServices
// registers with every child service. It locks the service mutex, finishes the
// cleanup of removed services that have terminated, propagates a shutdown to
// the remaining dependencies when one of them terminates, and derives the new
// status of the composite service from the statuses of its dependencies.
//
// handle identifies the child service that changed, status is its new status
// and err is the error reported with the notification.
func (cs *compositeService) onServiceStatusChange(handle serviceHandle, status serviceStatus, err error) {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	// If the notification is for a removed service that has now terminated,
	// unregister from it and drop it from the `removed` list.
	removeIdx := -1
	for i, s := range cs.removed {
		if s.Handle() == handle {
			if status == serviceTerminated {
				s.RemoveStatusChangeReceiver(cs.Handle())
				removeIdx = i
			}
			break
		}
	}
	// No-op when removeIdx is still -1.
	cs.removed = removeFromSlice(cs.removed, removeIdx)

	// Note: we cannot rely on the service not being in the removed list above to
	// determine whether it is an active dependency. The notification may be for a
	// service that is no longer in cs.removed or cs.dependencies, because status
	// changes are notified asynchronously and may be received out of order.
	isDependency := false
	for _, s := range cs.dependencies {
		if s.Handle() == handle {
			isDependency = true
			break
		}
	}

	// If a single service terminates, stop them all, but allow the others to
	// flush pending data. Ignore removed services that are stopping.
	shouldTerminate := status >= serviceTerminating && isDependency
	numStarted := 0
	numTerminated := 0

	for _, s := range cs.dependencies {
		if shouldTerminate && s.Status() < serviceTerminating {
			s.Stop()
		}
		// The status is read again after the Stop() call above, so the counts
		// reflect the status of the dependency at this point.
		if s.Status() >= serviceActive {
			numStarted++
		}
		if s.Status() == serviceTerminated {
			numTerminated++
		}
	}

	// The cases are evaluated in order, so reaching the terminated state takes
	// priority over terminating, which takes priority over becoming active.
	switch {
	case numTerminated == len(cs.dependencies) && len(cs.removed) == 0:
		// Every dependency has terminated and no removed service is pending.
		cs.unsafeUpdateStatus(serviceTerminated, err)
	case shouldTerminate:
		cs.unsafeUpdateStatus(serviceTerminating, err)
	case numStarted == len(cs.dependencies):
		// Every dependency is active or at a later status.
		cs.unsafeUpdateStatus(serviceActive, err)
	}
}
331
332func removeFromSlice(services []service, removeIdx int) []service {
333	lastIdx := len(services) - 1
334	if removeIdx < 0 || removeIdx > lastIdx {
335		return services
336	}
337
338	// Swap with last element, erase last element and truncate the slice.
339	if removeIdx != lastIdx {
340		services[removeIdx] = services[lastIdx]
341	}
342	services[lastIdx] = nil
343	return services[:lastIdx]
344}
345