1// Copyright 2015 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package dbus
16
17import (
18	"errors"
19	"log"
20	"time"
21
22	"github.com/godbus/dbus"
23)
24
25const (
26	cleanIgnoreInterval = int64(10 * time.Second)
27	ignoreInterval      = int64(30 * time.Millisecond)
28)
29
30// Subscribe sets up this connection to subscribe to all systemd dbus events.
31// This is required before calling SubscribeUnits. When the connection closes
32// systemd will automatically stop sending signals so there is no need to
33// explicitly call Unsubscribe().
34func (c *Conn) Subscribe() error {
35	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
36		"type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
37	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
38		"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
39
40	return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
41}
42
43// Unsubscribe this connection from systemd dbus events.
44func (c *Conn) Unsubscribe() error {
45	return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
46}
47
48func (c *Conn) dispatch() {
49	ch := make(chan *dbus.Signal, signalBuffer)
50
51	c.sigconn.Signal(ch)
52
53	go func() {
54		for {
55			signal, ok := <-ch
56			if !ok {
57				return
58			}
59
60			if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
61				c.jobComplete(signal)
62			}
63
64			if c.subStateSubscriber.updateCh == nil &&
65				c.propertiesSubscriber.updateCh == nil {
66				continue
67			}
68
69			var unitPath dbus.ObjectPath
70			switch signal.Name {
71			case "org.freedesktop.systemd1.Manager.JobRemoved":
72				unitName := signal.Body[2].(string)
73				c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
74			case "org.freedesktop.systemd1.Manager.UnitNew":
75				unitPath = signal.Body[1].(dbus.ObjectPath)
76			case "org.freedesktop.DBus.Properties.PropertiesChanged":
77				if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
78					unitPath = signal.Path
79
80					if len(signal.Body) >= 2 {
81						if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
82							c.sendPropertiesUpdate(unitPath, changed)
83						}
84					}
85				}
86			}
87
88			if unitPath == dbus.ObjectPath("") {
89				continue
90			}
91
92			c.sendSubStateUpdate(unitPath)
93		}
94	}()
95}
96
97// SubscribeUnits returns two unbuffered channels which will receive all changed units every
98// interval.  Deleted units are sent as nil.
99func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
100	return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
101}
102
103// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
104// size of the channels, the comparison function for detecting changes and a filter
105// function for cutting down on the noise that your channel receives.
106func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
107	old := make(map[string]*UnitStatus)
108	statusChan := make(chan map[string]*UnitStatus, buffer)
109	errChan := make(chan error, buffer)
110
111	go func() {
112		for {
113			timerChan := time.After(interval)
114
115			units, err := c.ListUnits()
116			if err == nil {
117				cur := make(map[string]*UnitStatus)
118				for i := range units {
119					if filterUnit != nil && filterUnit(units[i].Name) {
120						continue
121					}
122					cur[units[i].Name] = &units[i]
123				}
124
125				// add all new or changed units
126				changed := make(map[string]*UnitStatus)
127				for n, u := range cur {
128					if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
129						changed[n] = u
130					}
131					delete(old, n)
132				}
133
134				// add all deleted units
135				for oldN := range old {
136					changed[oldN] = nil
137				}
138
139				old = cur
140
141				if len(changed) != 0 {
142					statusChan <- changed
143				}
144			} else {
145				errChan <- err
146			}
147
148			<-timerChan
149		}
150	}()
151
152	return statusChan, errChan
153}
154
155type SubStateUpdate struct {
156	UnitName string
157	SubState string
158}
159
160// SetSubStateSubscriber writes to updateCh when any unit's substate changes.
161// Although this writes to updateCh on every state change, the reported state
162// may be more recent than the change that generated it (due to an unavoidable
163// race in the systemd dbus interface).  That is, this method provides a good
164// way to keep a current view of all units' states, but is not guaranteed to
165// show every state transition they go through.  Furthermore, state changes
166// will only be written to the channel with non-blocking writes.  If updateCh
167// is full, it attempts to write an error to errCh; if errCh is full, the error
168// passes silently.
169func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
170	if c == nil {
171		msg := "nil receiver"
172		select {
173		case errCh <- errors.New(msg):
174		default:
175			log.Printf("full error channel while reporting: %s\n", msg)
176		}
177		return
178	}
179
180	c.subStateSubscriber.Lock()
181	defer c.subStateSubscriber.Unlock()
182	c.subStateSubscriber.updateCh = updateCh
183	c.subStateSubscriber.errCh = errCh
184}
185
186func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
187	c.subStateSubscriber.Lock()
188	defer c.subStateSubscriber.Unlock()
189
190	if c.subStateSubscriber.updateCh == nil {
191		return
192	}
193
194	isIgnored := c.shouldIgnore(unitPath)
195	defer c.cleanIgnore()
196	if isIgnored {
197		return
198	}
199
200	info, err := c.GetUnitPathProperties(unitPath)
201	if err != nil {
202		select {
203		case c.subStateSubscriber.errCh <- err:
204		default:
205			log.Printf("full error channel while reporting: %s\n", err)
206		}
207		return
208	}
209	defer c.updateIgnore(unitPath, info)
210
211	name, ok := info["Id"].(string)
212	if !ok {
213		msg := "failed to cast info.Id"
214		select {
215		case c.subStateSubscriber.errCh <- errors.New(msg):
216		default:
217			log.Printf("full error channel while reporting: %s\n", err)
218		}
219		return
220	}
221	substate, ok := info["SubState"].(string)
222	if !ok {
223		msg := "failed to cast info.SubState"
224		select {
225		case c.subStateSubscriber.errCh <- errors.New(msg):
226		default:
227			log.Printf("full error channel while reporting: %s\n", msg)
228		}
229		return
230	}
231
232	update := &SubStateUpdate{name, substate}
233	select {
234	case c.subStateSubscriber.updateCh <- update:
235	default:
236		msg := "update channel is full"
237		select {
238		case c.subStateSubscriber.errCh <- errors.New(msg):
239		default:
240			log.Printf("full error channel while reporting: %s\n", msg)
241		}
242		return
243	}
244}
245
246// The ignore functions work around a wart in the systemd dbus interface.
247// Requesting the properties of an unloaded unit will cause systemd to send a
248// pair of UnitNew/UnitRemoved signals.  Because we need to get a unit's
249// properties on UnitNew (as that's the only indication of a new unit coming up
250// for the first time), we would enter an infinite loop if we did not attempt
251// to detect and ignore these spurious signals.  The signal themselves are
252// indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
253// unloaded unit's signals for a short time after requesting its properties.
254// This means that we will miss e.g. a transient unit being restarted
255// *immediately* upon failure and also a transient unit being started
256// immediately after requesting its status (with systemctl status, for example,
257// because this causes a UnitNew signal to be sent which then causes us to fetch
258// the properties).
259
260func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
261	t, ok := c.subStateSubscriber.ignore[path]
262	return ok && t >= time.Now().UnixNano()
263}
264
265func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
266	loadState, ok := info["LoadState"].(string)
267	if !ok {
268		return
269	}
270
271	// unit is unloaded - it will trigger bad systemd dbus behavior
272	if loadState == "not-found" {
273		c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
274	}
275}
276
277// without this, ignore would grow unboundedly over time
278func (c *Conn) cleanIgnore() {
279	now := time.Now().UnixNano()
280	if c.subStateSubscriber.cleanIgnore < now {
281		c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval
282
283		for p, t := range c.subStateSubscriber.ignore {
284			if t < now {
285				delete(c.subStateSubscriber.ignore, p)
286			}
287		}
288	}
289}
290
291// PropertiesUpdate holds a map of a unit's changed properties
292type PropertiesUpdate struct {
293	UnitName string
294	Changed  map[string]dbus.Variant
295}
296
297// SetPropertiesSubscriber writes to updateCh when any unit's properties
298// change. Every property change reported by systemd will be sent; that is, no
299// transitions will be "missed" (as they might be with SetSubStateSubscriber).
300// However, state changes will only be written to the channel with non-blocking
301// writes.  If updateCh is full, it attempts to write an error to errCh; if
302// errCh is full, the error passes silently.
303func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
304	c.propertiesSubscriber.Lock()
305	defer c.propertiesSubscriber.Unlock()
306	c.propertiesSubscriber.updateCh = updateCh
307	c.propertiesSubscriber.errCh = errCh
308}
309
310// we don't need to worry about shouldIgnore() here because
311// sendPropertiesUpdate doesn't call GetProperties()
312func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
313	c.propertiesSubscriber.Lock()
314	defer c.propertiesSubscriber.Unlock()
315
316	if c.propertiesSubscriber.updateCh == nil {
317		return
318	}
319
320	update := &PropertiesUpdate{unitName(unitPath), changedProps}
321
322	select {
323	case c.propertiesSubscriber.updateCh <- update:
324	default:
325		msg := "update channel is full"
326		select {
327		case c.propertiesSubscriber.errCh <- errors.New(msg):
328		default:
329			log.Printf("full error channel while reporting: %s\n", msg)
330		}
331		return
332	}
333}
334