1// Copyright 2015 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package dbus 16 17import ( 18 "errors" 19 "time" 20 21 "github.com/godbus/dbus" 22) 23 24const ( 25 cleanIgnoreInterval = int64(10 * time.Second) 26 ignoreInterval = int64(30 * time.Millisecond) 27) 28 29// Subscribe sets up this connection to subscribe to all systemd dbus events. 30// This is required before calling SubscribeUnits. When the connection closes 31// systemd will automatically stop sending signals so there is no need to 32// explicitly call Unsubscribe(). 33func (c *Conn) Subscribe() error { 34 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, 35 "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'") 36 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, 37 "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'") 38 39 err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() 40 if err != nil { 41 return err 42 } 43 44 return nil 45} 46 47// Unsubscribe this connection from systemd dbus events. 48func (c *Conn) Unsubscribe() error { 49 err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() 50 if err != nil { 51 return err 52 } 53 54 return nil 55} 56 57func (c *Conn) dispatch() { 58 ch := make(chan *dbus.Signal, signalBuffer) 59 60 c.sigconn.Signal(ch) 61 62 go func() { 63 for { 64 signal, ok := <-ch 65 if !ok { 66 return 67 } 68 69 if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" { 70 c.jobComplete(signal) 71 } 72 73 if c.subscriber.updateCh == nil { 74 continue 75 } 76 77 var unitPath dbus.ObjectPath 78 switch signal.Name { 79 case "org.freedesktop.systemd1.Manager.JobRemoved": 80 unitName := signal.Body[2].(string) 81 c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath) 82 case "org.freedesktop.systemd1.Manager.UnitNew": 83 unitPath = signal.Body[1].(dbus.ObjectPath) 84 case "org.freedesktop.DBus.Properties.PropertiesChanged": 85 if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { 86 unitPath = signal.Path 87 } 88 } 89 90 if unitPath == dbus.ObjectPath("") { 91 continue 92 } 93 94 c.sendSubStateUpdate(unitPath) 95 } 96 }() 97} 98 99// Returns two unbuffered channels which will receive all changed units every 100// interval. Deleted units are sent as nil. 101func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { 102 return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) 103} 104 105// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer 106// size of the channels, the comparison function for detecting changes and a filter 107// function for cutting down on the noise that your channel receives. 108func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { 109 old := make(map[string]*UnitStatus) 110 statusChan := make(chan map[string]*UnitStatus, buffer) 111 errChan := make(chan error, buffer) 112 113 go func() { 114 for { 115 timerChan := time.After(interval) 116 117 units, err := c.ListUnits() 118 if err == nil { 119 cur := make(map[string]*UnitStatus) 120 for i := range units { 121 if filterUnit != nil && filterUnit(units[i].Name) { 122 continue 123 } 124 cur[units[i].Name] = &units[i] 125 } 126 127 // add all new or changed units 128 changed := make(map[string]*UnitStatus) 129 for n, u := range cur { 130 if oldU, ok := old[n]; !ok || isChanged(oldU, u) { 131 changed[n] = u 132 } 133 delete(old, n) 134 } 135 136 // add all deleted units 137 for oldN := range old { 138 changed[oldN] = nil 139 } 140 141 old = cur 142 143 if len(changed) != 0 { 144 statusChan <- changed 145 } 146 } else { 147 errChan <- err 148 } 149 150 <-timerChan 151 } 152 }() 153 154 return statusChan, errChan 155} 156 157type SubStateUpdate struct { 158 UnitName string 159 SubState string 160} 161 162// SetSubStateSubscriber writes to updateCh when any unit's substate changes. 163// Although this writes to updateCh on every state change, the reported state 164// may be more recent than the change that generated it (due to an unavoidable 165// race in the systemd dbus interface). That is, this method provides a good 166// way to keep a current view of all units' states, but is not guaranteed to 167// show every state transition they go through. Furthermore, state changes 168// will only be written to the channel with non-blocking writes. If updateCh 169// is full, it attempts to write an error to errCh; if errCh is full, the error 170// passes silently. 171func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { 172 c.subscriber.Lock() 173 defer c.subscriber.Unlock() 174 c.subscriber.updateCh = updateCh 175 c.subscriber.errCh = errCh 176} 177 178func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) { 179 c.subscriber.Lock() 180 defer c.subscriber.Unlock() 181 182 if c.shouldIgnore(path) { 183 return 184 } 185 186 info, err := c.GetUnitProperties(string(path)) 187 if err != nil { 188 select { 189 case c.subscriber.errCh <- err: 190 default: 191 } 192 } 193 194 name := info["Id"].(string) 195 substate := info["SubState"].(string) 196 197 update := &SubStateUpdate{name, substate} 198 select { 199 case c.subscriber.updateCh <- update: 200 default: 201 select { 202 case c.subscriber.errCh <- errors.New("update channel full!"): 203 default: 204 } 205 } 206 207 c.updateIgnore(path, info) 208} 209 210// The ignore functions work around a wart in the systemd dbus interface. 211// Requesting the properties of an unloaded unit will cause systemd to send a 212// pair of UnitNew/UnitRemoved signals. Because we need to get a unit's 213// properties on UnitNew (as that's the only indication of a new unit coming up 214// for the first time), we would enter an infinite loop if we did not attempt 215// to detect and ignore these spurious signals. The signal themselves are 216// indistinguishable from relevant ones, so we (somewhat hackishly) ignore an 217// unloaded unit's signals for a short time after requesting its properties. 218// This means that we will miss e.g. a transient unit being restarted 219// *immediately* upon failure and also a transient unit being started 220// immediately after requesting its status (with systemctl status, for example, 221// because this causes a UnitNew signal to be sent which then causes us to fetch 222// the properties). 223 224func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { 225 t, ok := c.subscriber.ignore[path] 226 return ok && t >= time.Now().UnixNano() 227} 228 229func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { 230 c.cleanIgnore() 231 232 // unit is unloaded - it will trigger bad systemd dbus behavior 233 if info["LoadState"].(string) == "not-found" { 234 c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval 235 } 236} 237 238// without this, ignore would grow unboundedly over time 239func (c *Conn) cleanIgnore() { 240 now := time.Now().UnixNano() 241 if c.subscriber.cleanIgnore < now { 242 c.subscriber.cleanIgnore = now + cleanIgnoreInterval 243 244 for p, t := range c.subscriber.ignore { 245 if t < now { 246 delete(c.subscriber.ignore, p) 247 } 248 } 249 } 250} 251