1// Copyright 2015 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package dbus 16 17import ( 18 "errors" 19 "log" 20 "time" 21 22 "github.com/godbus/dbus" 23) 24 25const ( 26 cleanIgnoreInterval = int64(10 * time.Second) 27 ignoreInterval = int64(30 * time.Millisecond) 28) 29 30// Subscribe sets up this connection to subscribe to all systemd dbus events. 31// This is required before calling SubscribeUnits. When the connection closes 32// systemd will automatically stop sending signals so there is no need to 33// explicitly call Unsubscribe(). 34func (c *Conn) Subscribe() error { 35 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, 36 "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'") 37 c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, 38 "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'") 39 40 return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() 41} 42 43// Unsubscribe this connection from systemd dbus events. 44func (c *Conn) Unsubscribe() error { 45 return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() 46} 47 48func (c *Conn) dispatch() { 49 ch := make(chan *dbus.Signal, signalBuffer) 50 51 c.sigconn.Signal(ch) 52 53 go func() { 54 for { 55 signal, ok := <-ch 56 if !ok { 57 return 58 } 59 60 if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" { 61 c.jobComplete(signal) 62 } 63 64 if c.subStateSubscriber.updateCh == nil && 65 c.propertiesSubscriber.updateCh == nil { 66 continue 67 } 68 69 var unitPath dbus.ObjectPath 70 switch signal.Name { 71 case "org.freedesktop.systemd1.Manager.JobRemoved": 72 unitName := signal.Body[2].(string) 73 c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath) 74 case "org.freedesktop.systemd1.Manager.UnitNew": 75 unitPath = signal.Body[1].(dbus.ObjectPath) 76 case "org.freedesktop.DBus.Properties.PropertiesChanged": 77 if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { 78 unitPath = signal.Path 79 80 if len(signal.Body) >= 2 { 81 if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok { 82 c.sendPropertiesUpdate(unitPath, changed) 83 } 84 } 85 } 86 } 87 88 if unitPath == dbus.ObjectPath("") { 89 continue 90 } 91 92 c.sendSubStateUpdate(unitPath) 93 } 94 }() 95} 96 97// Returns two unbuffered channels which will receive all changed units every 98// interval. Deleted units are sent as nil. 99func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { 100 return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) 101} 102 103// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer 104// size of the channels, the comparison function for detecting changes and a filter 105// function for cutting down on the noise that your channel receives. 106func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { 107 old := make(map[string]*UnitStatus) 108 statusChan := make(chan map[string]*UnitStatus, buffer) 109 errChan := make(chan error, buffer) 110 111 go func() { 112 for { 113 timerChan := time.After(interval) 114 115 units, err := c.ListUnits() 116 if err == nil { 117 cur := make(map[string]*UnitStatus) 118 for i := range units { 119 if filterUnit != nil && filterUnit(units[i].Name) { 120 continue 121 } 122 cur[units[i].Name] = &units[i] 123 } 124 125 // add all new or changed units 126 changed := make(map[string]*UnitStatus) 127 for n, u := range cur { 128 if oldU, ok := old[n]; !ok || isChanged(oldU, u) { 129 changed[n] = u 130 } 131 delete(old, n) 132 } 133 134 // add all deleted units 135 for oldN := range old { 136 changed[oldN] = nil 137 } 138 139 old = cur 140 141 if len(changed) != 0 { 142 statusChan <- changed 143 } 144 } else { 145 errChan <- err 146 } 147 148 <-timerChan 149 } 150 }() 151 152 return statusChan, errChan 153} 154 155type SubStateUpdate struct { 156 UnitName string 157 SubState string 158} 159 160// SetSubStateSubscriber writes to updateCh when any unit's substate changes. 161// Although this writes to updateCh on every state change, the reported state 162// may be more recent than the change that generated it (due to an unavoidable 163// race in the systemd dbus interface). That is, this method provides a good 164// way to keep a current view of all units' states, but is not guaranteed to 165// show every state transition they go through. Furthermore, state changes 166// will only be written to the channel with non-blocking writes. If updateCh 167// is full, it attempts to write an error to errCh; if errCh is full, the error 168// passes silently. 169func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { 170 if c == nil { 171 msg := "nil receiver" 172 select { 173 case errCh <- errors.New(msg): 174 default: 175 log.Printf("full error channel while reporting: %s\n", msg) 176 } 177 return 178 } 179 180 c.subStateSubscriber.Lock() 181 defer c.subStateSubscriber.Unlock() 182 c.subStateSubscriber.updateCh = updateCh 183 c.subStateSubscriber.errCh = errCh 184} 185 186func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { 187 c.subStateSubscriber.Lock() 188 defer c.subStateSubscriber.Unlock() 189 190 if c.subStateSubscriber.updateCh == nil { 191 return 192 } 193 194 isIgnored := c.shouldIgnore(unitPath) 195 defer c.cleanIgnore() 196 if isIgnored { 197 return 198 } 199 200 info, err := c.GetUnitPathProperties(unitPath) 201 if err != nil { 202 select { 203 case c.subStateSubscriber.errCh <- err: 204 default: 205 log.Printf("full error channel while reporting: %s\n", err) 206 } 207 return 208 } 209 defer c.updateIgnore(unitPath, info) 210 211 name, ok := info["Id"].(string) 212 if !ok { 213 msg := "failed to cast info.Id" 214 select { 215 case c.subStateSubscriber.errCh <- errors.New(msg): 216 default: 217 log.Printf("full error channel while reporting: %s\n", err) 218 } 219 return 220 } 221 substate, ok := info["SubState"].(string) 222 if !ok { 223 msg := "failed to cast info.SubState" 224 select { 225 case c.subStateSubscriber.errCh <- errors.New(msg): 226 default: 227 log.Printf("full error channel while reporting: %s\n", msg) 228 } 229 return 230 } 231 232 update := &SubStateUpdate{name, substate} 233 select { 234 case c.subStateSubscriber.updateCh <- update: 235 default: 236 msg := "update channel is full" 237 select { 238 case c.subStateSubscriber.errCh <- errors.New(msg): 239 default: 240 log.Printf("full error channel while reporting: %s\n", msg) 241 } 242 return 243 } 244} 245 246// The ignore functions work around a wart in the systemd dbus interface. 247// Requesting the properties of an unloaded unit will cause systemd to send a 248// pair of UnitNew/UnitRemoved signals. Because we need to get a unit's 249// properties on UnitNew (as that's the only indication of a new unit coming up 250// for the first time), we would enter an infinite loop if we did not attempt 251// to detect and ignore these spurious signals. The signal themselves are 252// indistinguishable from relevant ones, so we (somewhat hackishly) ignore an 253// unloaded unit's signals for a short time after requesting its properties. 254// This means that we will miss e.g. a transient unit being restarted 255// *immediately* upon failure and also a transient unit being started 256// immediately after requesting its status (with systemctl status, for example, 257// because this causes a UnitNew signal to be sent which then causes us to fetch 258// the properties). 259 260func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { 261 t, ok := c.subStateSubscriber.ignore[path] 262 return ok && t >= time.Now().UnixNano() 263} 264 265func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { 266 loadState, ok := info["LoadState"].(string) 267 if !ok { 268 return 269 } 270 271 // unit is unloaded - it will trigger bad systemd dbus behavior 272 if loadState == "not-found" { 273 c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval 274 } 275} 276 277// without this, ignore would grow unboundedly over time 278func (c *Conn) cleanIgnore() { 279 now := time.Now().UnixNano() 280 if c.subStateSubscriber.cleanIgnore < now { 281 c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval 282 283 for p, t := range c.subStateSubscriber.ignore { 284 if t < now { 285 delete(c.subStateSubscriber.ignore, p) 286 } 287 } 288 } 289} 290 291// PropertiesUpdate holds a map of a unit's changed properties 292type PropertiesUpdate struct { 293 UnitName string 294 Changed map[string]dbus.Variant 295} 296 297// SetPropertiesSubscriber writes to updateCh when any unit's properties 298// change. Every property change reported by systemd will be sent; that is, no 299// transitions will be "missed" (as they might be with SetSubStateSubscriber). 300// However, state changes will only be written to the channel with non-blocking 301// writes. If updateCh is full, it attempts to write an error to errCh; if 302// errCh is full, the error passes silently. 303func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) { 304 c.propertiesSubscriber.Lock() 305 defer c.propertiesSubscriber.Unlock() 306 c.propertiesSubscriber.updateCh = updateCh 307 c.propertiesSubscriber.errCh = errCh 308} 309 310// we don't need to worry about shouldIgnore() here because 311// sendPropertiesUpdate doesn't call GetProperties() 312func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) { 313 c.propertiesSubscriber.Lock() 314 defer c.propertiesSubscriber.Unlock() 315 316 if c.propertiesSubscriber.updateCh == nil { 317 return 318 } 319 320 update := &PropertiesUpdate{unitName(unitPath), changedProps} 321 322 select { 323 case c.propertiesSubscriber.updateCh <- update: 324 default: 325 msg := "update channel is full" 326 select { 327 case c.propertiesSubscriber.errCh <- errors.New(msg): 328 default: 329 log.Printf("full error channel while reporting: %s\n", msg) 330 } 331 return 332 } 333} 334