1// Copyright 2015 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package dbus
16
17import (
18	"errors"
19	"time"
20
21	"github.com/godbus/dbus"
22)
23
24const (
25	cleanIgnoreInterval = int64(10 * time.Second)
26	ignoreInterval      = int64(30 * time.Millisecond)
27)
28
29// Subscribe sets up this connection to subscribe to all systemd dbus events.
30// This is required before calling SubscribeUnits. When the connection closes
31// systemd will automatically stop sending signals so there is no need to
32// explicitly call Unsubscribe().
33func (c *Conn) Subscribe() error {
34	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
35		"type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
36	c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
37		"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
38
39	err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
40	if err != nil {
41		return err
42	}
43
44	return nil
45}
46
47// Unsubscribe this connection from systemd dbus events.
48func (c *Conn) Unsubscribe() error {
49	err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
50	if err != nil {
51		return err
52	}
53
54	return nil
55}
56
57func (c *Conn) dispatch() {
58	ch := make(chan *dbus.Signal, signalBuffer)
59
60	c.sigconn.Signal(ch)
61
62	go func() {
63		for {
64			signal, ok := <-ch
65			if !ok {
66				return
67			}
68
69			if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
70				c.jobComplete(signal)
71			}
72
73			if c.subscriber.updateCh == nil {
74				continue
75			}
76
77			var unitPath dbus.ObjectPath
78			switch signal.Name {
79			case "org.freedesktop.systemd1.Manager.JobRemoved":
80				unitName := signal.Body[2].(string)
81				c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
82			case "org.freedesktop.systemd1.Manager.UnitNew":
83				unitPath = signal.Body[1].(dbus.ObjectPath)
84			case "org.freedesktop.DBus.Properties.PropertiesChanged":
85				if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
86					unitPath = signal.Path
87				}
88			}
89
90			if unitPath == dbus.ObjectPath("") {
91				continue
92			}
93
94			c.sendSubStateUpdate(unitPath)
95		}
96	}()
97}
98
99// Returns two unbuffered channels which will receive all changed units every
100// interval.  Deleted units are sent as nil.
101func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
102	return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
103}
104
105// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
106// size of the channels, the comparison function for detecting changes and a filter
107// function for cutting down on the noise that your channel receives.
108func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
109	old := make(map[string]*UnitStatus)
110	statusChan := make(chan map[string]*UnitStatus, buffer)
111	errChan := make(chan error, buffer)
112
113	go func() {
114		for {
115			timerChan := time.After(interval)
116
117			units, err := c.ListUnits()
118			if err == nil {
119				cur := make(map[string]*UnitStatus)
120				for i := range units {
121					if filterUnit != nil && filterUnit(units[i].Name) {
122						continue
123					}
124					cur[units[i].Name] = &units[i]
125				}
126
127				// add all new or changed units
128				changed := make(map[string]*UnitStatus)
129				for n, u := range cur {
130					if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
131						changed[n] = u
132					}
133					delete(old, n)
134				}
135
136				// add all deleted units
137				for oldN := range old {
138					changed[oldN] = nil
139				}
140
141				old = cur
142
143				if len(changed) != 0 {
144					statusChan <- changed
145				}
146			} else {
147				errChan <- err
148			}
149
150			<-timerChan
151		}
152	}()
153
154	return statusChan, errChan
155}
156
157type SubStateUpdate struct {
158	UnitName string
159	SubState string
160}
161
162// SetSubStateSubscriber writes to updateCh when any unit's substate changes.
163// Although this writes to updateCh on every state change, the reported state
164// may be more recent than the change that generated it (due to an unavoidable
165// race in the systemd dbus interface).  That is, this method provides a good
166// way to keep a current view of all units' states, but is not guaranteed to
167// show every state transition they go through.  Furthermore, state changes
168// will only be written to the channel with non-blocking writes.  If updateCh
169// is full, it attempts to write an error to errCh; if errCh is full, the error
170// passes silently.
171func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
172	c.subscriber.Lock()
173	defer c.subscriber.Unlock()
174	c.subscriber.updateCh = updateCh
175	c.subscriber.errCh = errCh
176}
177
178func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
179	c.subscriber.Lock()
180	defer c.subscriber.Unlock()
181
182	if c.shouldIgnore(path) {
183		return
184	}
185
186	info, err := c.GetUnitProperties(string(path))
187	if err != nil {
188		select {
189		case c.subscriber.errCh <- err:
190		default:
191		}
192	}
193
194	name := info["Id"].(string)
195	substate := info["SubState"].(string)
196
197	update := &SubStateUpdate{name, substate}
198	select {
199	case c.subscriber.updateCh <- update:
200	default:
201		select {
202		case c.subscriber.errCh <- errors.New("update channel full!"):
203		default:
204		}
205	}
206
207	c.updateIgnore(path, info)
208}
209
210// The ignore functions work around a wart in the systemd dbus interface.
211// Requesting the properties of an unloaded unit will cause systemd to send a
212// pair of UnitNew/UnitRemoved signals.  Because we need to get a unit's
213// properties on UnitNew (as that's the only indication of a new unit coming up
214// for the first time), we would enter an infinite loop if we did not attempt
215// to detect and ignore these spurious signals.  The signal themselves are
216// indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
217// unloaded unit's signals for a short time after requesting its properties.
218// This means that we will miss e.g. a transient unit being restarted
219// *immediately* upon failure and also a transient unit being started
220// immediately after requesting its status (with systemctl status, for example,
221// because this causes a UnitNew signal to be sent which then causes us to fetch
222// the properties).
223
224func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
225	t, ok := c.subscriber.ignore[path]
226	return ok && t >= time.Now().UnixNano()
227}
228
229func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
230	c.cleanIgnore()
231
232	// unit is unloaded - it will trigger bad systemd dbus behavior
233	if info["LoadState"].(string) == "not-found" {
234		c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
235	}
236}
237
238// without this, ignore would grow unboundedly over time
239func (c *Conn) cleanIgnore() {
240	now := time.Now().UnixNano()
241	if c.subscriber.cleanIgnore < now {
242		c.subscriber.cleanIgnore = now + cleanIgnoreInterval
243
244		for p, t := range c.subscriber.ignore {
245			if t < now {
246				delete(c.subscriber.ignore, p)
247			}
248		}
249	}
250}
251