#!/usr/bin/env python3
#
#
#
##############################################################################
# Start off by implementing a general purpose event loop for anyone's use
##############################################################################

import atexit
import os
import libvirt
import select
import errno
import time
import threading
from argparse import ArgumentParser
from typing import Any, Callable, Dict, List, Optional, TypeVar  # noqa F401
_T = TypeVar("_T")
_EventCallback = Callable[[int, int, int, _T], None]
_TimerCallback = Callable[[int, _T], None]


# This example can use three different event loop impls. It defaults
# to a portable pure-python impl based on poll that is implemented
# in this file.
#
# When Python >= 3.4, it can optionally use an impl based on the
# new asyncio module.
#
# Finally, it can also use the libvirt native event loop impl
#
# This setting thus allows 'poll', 'native' or 'asyncio' as valid
# choices
#
event_impl = "poll"

do_debug = False


def debug(msg: str) -> None:
    """Print *msg* to stdout, but only while the module-level ``do_debug`` flag is set."""
    if not do_debug:
        return
    print(msg)


#
# This general purpose event loop will support waiting for file handle
# I/O and errors events, as well as scheduling repeatable timers with
# a fixed interval.
#
# It is a pure python implementation based around the poll() API
#
class virEventLoopPoll:
    """Event loop built on select.poll() with file handle and timer support.

    File handles and periodic timers are registered with callbacks; each
    call to run_once() sleeps in poll() until a handle has events or the
    next timer is due, then dispatches the matching callbacks.
    """

    # This class contains the data we need to track for a
    # single file handle
    class virEventLoopPollHandle:
        def __init__(self, handle: int, fd: int, events: int, cb: _EventCallback, opaque: _T):
            self.handle = handle
            self.fd = fd
            self.events = events
            self.cb = cb
            self.opaque = opaque

        def get_id(self) -> int:
            return self.handle

        def get_fd(self) -> int:
            return self.fd

        def get_events(self) -> int:
            return self.events

        def set_events(self, events: int):
            self.events = events

        def dispatch(self, events: int):
            """Invoke the callback with (handle id, fd, events, opaque)."""
            self.cb(self.handle,
                    self.fd,
                    events,
                    self.opaque)

    # This class contains the data we need to track for a
    # single periodic timer
    class virEventLoopPollTimer:
        def __init__(self, timer: int, interval: int, cb: _TimerCallback, opaque: _T):
            self.timer = timer
            self.interval = interval
            self.cb = cb
            self.opaque = opaque
            # Millisecond timestamp of the last dispatch; 0 until first fired
            self.lastfired = 0

        def get_id(self) -> int:
            return self.timer

        def get_interval(self) -> int:
            return self.interval

        def set_interval(self, interval: int):
            self.interval = interval

        def get_last_fired(self) -> int:
            return self.lastfired

        def set_last_fired(self, now: int):
            self.lastfired = now

        def dispatch(self) -> None:
            """Invoke the callback with (timer id, opaque)."""
            self.cb(self.timer,
                    self.opaque)

    def __init__(self):
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.pendingWakeup = False
        self.runningPoll = False
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []  # type: List[virEventLoopPollHandle]
        self.timers = []  # type: List[virEventLoopPollTimer]
        self.cleanup = []
        self.quit = False

        # The event loop can be used from multiple threads at once.
        # Specifically while the main thread is sleeping in poll()
        # waiting for events to occur, another thread may come along
        # and add/update/remove a file handle, or timer. When this
        # happens we need to interrupt the poll() sleep in the other
        # thread, so that it'll see the file handle / timer changes.
        #
        # Using OS level signals for this is very unreliable and
        # hard to implement correctly. Thus we use the real classic
        # "self pipe" trick. An anonymous pipe, with one end registered
        # with the event loop for input events. When we need to force
        # the main thread out of a poll() sleep, we simply write a
        # single byte of data to the other end of the pipe.
        debug("Self pipe watch %d write %d" % (self.pipetrick[0], self.pipetrick[1]))
        self.poll.register(self.pipetrick[0], select.POLLIN)

    # Calculate when the next timeout is due to occur, returning
    # the absolute timestamp for the next timeout, or 0 if there is
    # no timeout due
    def next_timeout(self) -> int:
        due = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            # A negative interval means the timer is registered but disabled
            if interval < 0:
                continue
            if due == 0 or (last + interval) < due:
                due = last + interval

        return due

    # Lookup a virEventLoopPollHandle object based on file descriptor
    def get_handle_by_fd(self, fd: int) -> Optional[virEventLoopPollHandle]:
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    # Lookup a virEventLoopPollHandle object based on its event loop ID
    def get_handle_by_id(self, handleID: int) -> Optional[virEventLoopPollHandle]:
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None

    # Translate an absolute due timestamp into a poll() timeout.
    # poll() takes its timeout in milliseconds, the same unit used for
    # the timer timestamps, so no scaling must be applied.
    @staticmethod
    def _poll_timeout(due: int, now: int) -> int:
        """Return the poll() timeout in ms for a timer due at 'due'.

        'due' and 'now' are absolute millisecond timestamps. Returns -1
        (block until an event arrives) when 'due' is not positive, 0 when
        the timer is already overdue, else the remaining milliseconds.
        """
        if due <= 0:
            return -1
        return max(due - now, 0)

    # This is the heart of the event loop, performing one single
    # iteration. It asks when the next timeout is due, and then
    # calculates the maximum amount of time it is able to sleep
    # for in poll() pending file handle events.
    #
    # It then goes into the poll() sleep.
    #
    # When poll() returns, there will be zero or more file handle
    # events which need to be dispatched to registered callbacks.
    # It may also be time to fire some periodic timers.
    #
    # Due to the coarse granularity of scheduler timeslices, if
    # we ask for a sleep of 500ms in order to satisfy a timer, we
    # may return up to 1 scheduler timeslice early. So even though
    # our sleep timeout was reached, the registered timer may not
    # technically be at its expiry point. This leads to us going
    # back around the loop with a crazy 5ms sleep. So when checking
    # if timeouts are due, we allow a margin of 20ms, to avoid
    # these pointless repeated tiny sleeps.
    def run_once(self) -> None:
        self.runningPoll = True

        # Release the opaque data of handles/timers removed since the
        # previous iteration
        for opaque in self.cleanup:
            libvirt.virEventInvokeFreeCallback(opaque)
        self.cleanup = []

        try:
            due = self.next_timeout()
            debug("Next timeout due at %d" % due)
            sleep = self._poll_timeout(due, int(time.time() * 1000))

            debug("Poll with a sleep of %d" % sleep)
            events = self.poll.poll(sleep)

            # Dispatch any file handle events that occurred
            for (fd, revents) in events:
                # See if the event was from the self-pipe
                # telling us to wake up. If so, then discard
                # the data and just continue
                if fd == self.pipetrick[0]:
                    self.pendingWakeup = False
                    os.read(fd, 1)
                    continue

                h = self.get_handle_by_fd(fd)
                if h:
                    debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
                    h.dispatch(self.events_from_poll(revents))

            now = int(time.time() * 1000)
            for t in self.timers:
                interval = t.get_interval()
                if interval < 0:
                    continue

                want = t.get_last_fired() + interval
                # Deduct 20ms, since scheduler timeslice
                # means we could be ever so slightly early
                if now >= want - 20:
                    debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
                    t.set_last_fired(now)
                    t.dispatch()

        except (os.error, select.error) as e:
            # An interrupted system call is not an error; anything else is
            if e.args[0] != errno.EINTR:
                raise
        finally:
            self.runningPoll = False

    # Actually run the event loop forever
    def run_loop(self) -> None:
        self.quit = False
        while not self.quit:
            self.run_once()

    # Wake up a run_once() call that is in progress by writing one byte
    # to the self-pipe, unless a wakeup is already pending
    def interrupt(self) -> None:
        if self.runningPoll and not self.pendingWakeup:
            self.pendingWakeup = True
            os.write(self.pipetrick[1], b'c')

    # Registers a new file handle 'fd', monitoring for 'events' (libvirt
    # event constants), firing the callback cb() when an event occurs.
    # Returns a unique integer identifier for this handle, that should be
    # used to later update/remove it
    def add_handle(self, fd: int, events: int, cb: _EventCallback, opaque: _T) -> int:
        handleID = self.nextHandleID + 1
        self.nextHandleID = self.nextHandleID + 1

        h = self.virEventLoopPollHandle(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        debug("Add handle %d fd %d events %d" % (handleID, fd, events))

        return handleID

    # Registers a new timer with periodic expiry at 'interval' ms,
    # firing cb() each time the timer expires. If 'interval' is -1,
    # then the timer is registered, but not enabled
    # Returns a unique integer identifier for this timer, that should be
    # used to later update/remove it
    def add_timer(self, interval: int, cb: _TimerCallback, opaque: _T) -> int:
        timerID = self.nextTimerID + 1
        self.nextTimerID = self.nextTimerID + 1

        h = self.virEventLoopPollTimer(timerID, interval, cb, opaque)
        self.timers.append(h)
        self.interrupt()

        debug("Add timer %d interval %d" % (timerID, interval))

        return timerID

    # Change the set of events to be monitored on the file handle
    def update_handle(self, handleID: int, events: int) -> None:
        h = self.get_handle_by_id(handleID)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))

    # Change the periodic frequency of the timer
    def update_timer(self, timerID: int, interval: int) -> None:
        for h in self.timers:
            if h.get_id() == timerID:
                h.set_interval(interval)
                self.interrupt()

                debug("Update timer %d interval %d" % (timerID, interval))
                break

    # Stop monitoring for events on the file handle
    def remove_handle(self, handleID: int) -> int:
        handles = []
        for h in self.handles:
            if h.get_id() == handleID:
                debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
                self.poll.unregister(h.get_fd())
                # The opaque data is released at the start of the next run_once()
                self.cleanup.append(h.opaque)
            else:
                handles.append(h)
        self.handles = handles
        self.interrupt()
        return 0

    # Stop firing the periodic timer
    def remove_timer(self, timerID: int) -> int:
        timers = []
        for h in self.timers:
            if h.get_id() != timerID:
                timers.append(h)
            else:
                debug("Remove timer %d" % timerID)
                # The opaque data is released at the start of the next run_once()
                self.cleanup.append(h.opaque)
        self.timers = timers
        self.interrupt()
        return 0

    # Convert from libvirt event constants, to poll() events constants
    def events_to_poll(self, events: int) -> int:
        ret = 0
        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP
        return ret

    # Convert from poll() event constants, to libvirt events constants
    def events_from_poll(self, events: int) -> int:
        ret = 0
        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
        # Both an invalid descriptor and an error condition map to ERROR
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP
        return ret


###########################################################################
# Now glue an instance of the general event loop into libvirt's event loop
###########################################################################

# This single global instance of the event loop will be used for
# monitoring libvirt events
eventLoop = virEventLoopPoll()

# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
eventLoopThread = None


# These next set of 6 methods are the glue between the official
# libvirt events API, and our particular impl of the event loop
#
# There is no reason why the 'virEventLoopPoll' has to be used.
# An application could easily make these 6 glue methods hook into
# another event loop such as GLib's, or something like the python
# Twisted event framework.

def virEventAddHandleImpl(fd: int, events: int, cb: _EventCallback, opaque: _T) -> int:
    """Forward a handle registration to the global event loop; returns its ID."""
    handle_id = eventLoop.add_handle(fd, events, cb, opaque)
    return handle_id


def virEventUpdateHandleImpl(handleID: int, events: int) -> None:
    """Forward a handle event-mask update to the global event loop."""
    eventLoop.update_handle(handleID, events)


def virEventRemoveHandleImpl(handleID: int) -> int:
    """Forward a handle removal to the global event loop; returns its result."""
    result = eventLoop.remove_handle(handleID)
    return result


def virEventAddTimerImpl(interval: int, cb: _TimerCallback, opaque: _T) -> int:
    """Forward a timer registration to the global event loop; returns its ID."""
    timer_id = eventLoop.add_timer(interval, cb, opaque)
    return timer_id


def virEventUpdateTimerImpl(timerID: int, interval: int) -> None:
    """Forward a timer interval update to the global event loop."""
    eventLoop.update_timer(timerID, interval)


def virEventRemoveTimerImpl(timerID: int) -> int:
    """Forward a timer removal to the global event loop; returns its result."""
    result = eventLoop.remove_timer(timerID)
    return result


# This tells libvirt what event loop implementation it
# should use
def virEventLoopPollRegister() -> None:
    """Register the six glue functions above with libvirt."""
    glue = (
        virEventAddHandleImpl,
        virEventUpdateHandleImpl,
        virEventRemoveHandleImpl,
        virEventAddTimerImpl,
        virEventUpdateTimerImpl,
        virEventRemoveTimerImpl,
    )
    libvirt.virEventRegisterImpl(*glue)


# Directly run the event loop in the current thread
def virEventLoopPollRun() -> None:
    """Run the global poll-based event loop until its quit flag is set."""
    eventLoop.run_loop()


def virEventLoopAIORun(loop) -> None:
    """Install 'loop' as the current asyncio event loop and run it forever."""
    import asyncio
    asyncio.set_event_loop(loop)
    loop.run_forever()


def virEventLoopNativeRun() -> None:
    """Run iterations of libvirt's default event loop impl, never returning."""
    while True:
        libvirt.virEventRunDefaultImpl()


# Spawn a background thread to run the event loop
def virEventLoopPollStart() -> None:
    """Register the poll-based impl and run it in a daemon thread."""
    global eventLoopThread
    virEventLoopPollRegister()
    # daemon=True replaces the deprecated Thread.setDaemon(True)
    eventLoopThread = threading.Thread(target=virEventLoopPollRun, name="libvirtEventLoop", daemon=True)
    eventLoopThread.start()


def virEventLoopAIOStart() -> None:
    """Register the asyncio-based impl and run its loop in a daemon thread."""
    global eventLoopThread
    import libvirtaio
    import asyncio
    loop = asyncio.new_event_loop()
    libvirtaio.virEventRegisterAsyncIOImpl(loop=loop)
    eventLoopThread = threading.Thread(target=virEventLoopAIORun, args=(loop,), name="libvirtEventLoop", daemon=True)
    eventLoopThread.start()


def virEventLoopNativeStart() -> None:
    """Register libvirt's default impl and run it in a daemon thread."""
    global eventLoopThread
    libvirt.virEventRegisterDefaultImpl()
    eventLoopThread = threading.Thread(target=virEventLoopNativeRun, name="libvirtEventLoop", daemon=True)
    eventLoopThread.start()


##########################################################################
# Everything that now follows is a simple demo of domain lifecycle events
##########################################################################
class Description(object):
    """Nested table mapping integer codes to human readable descriptions.

    Each positional argument is either a plain string, or a
    (description, sub-entries) pair whose sub-entries describe a second
    level of codes. Indexing returns another Description, so that
    ``str(TABLE[event])`` and ``str(TABLE[event][detail])`` both work.
    """
    __slots__ = ('desc', 'args')

    def __init__(self, *args, **kwargs) -> None:
        self.desc = kwargs.get('desc')
        self.args = args

    def __str__(self) -> str:
        return self.desc or ''

    def __getitem__(self, item: int) -> 'Description':
        """Return the Description for code 'item'.

        An out-of-range code yields a Description whose text is the code
        itself. Raises TypeError when the stored entry is neither a
        string nor a list/tuple.
        """
        try:
            data = self.args[item]
        except IndexError:
            return self.__class__(desc=str(item))

        if isinstance(data, str):
            return self.__class__(desc=data)
        elif isinstance(data, (list, tuple)):
            desc, args = data
            return self.__class__(*args, desc=desc)

        # Report the offending entry ('args' is not bound on this path)
        raise TypeError(data)


DOM_EVENTS = Description(
    ("Defined", ("Added", "Updated", "Renamed", "Snapshot")),
    ("Undefined", ("Removed", "Renamed")),
    ("Started", ("Booted", "Migrated", "Restored", "Snapshot", "Wakeup")),
    ("Suspended", ("Paused", "Migrated", "IOError", "Watchdog", "Restored", "Snapshot", "API error", "Postcopy", "Postcopy failed")),
    ("Resumed", ("Unpaused", "Migrated", "Snapshot", "Postcopy")),
    ("Stopped", ("Shutdown", "Destroyed", "Crashed", "Migrated", "Saved", "Failed", "Snapshot", "Daemon")),
    ("Shutdown", ("Finished", "On guest request", "On host request")),
    ("PMSuspended", ("Memory", "Disk")),
    ("Crashed", ("Panicked",)),
)
BLOCK_JOB_TYPES = Description("unknown", "Pull", "Copy", "Commit", "ActiveCommit")
BLOCK_JOB_STATUS = Description("Completed", "Failed", "Canceled", "Ready")
WATCHDOG_ACTIONS = Description("none", "Pause", "Reset", "Poweroff", "Shutdown", "Debug", "Inject NMI")
ERROR_EVENTS = Description("None", "Pause", "Report")
AGENT_STATES = Description("unknown", "connected", "disconnected")
AGENT_REASONS = Description("unknown", "domain started", "channel event")
GRAPHICS_PHASES = Description("Connect", "Initialize", "Disconnect")
DISK_EVENTS = Description("Change missing on start", "Drop missing on start")
TRAY_EVENTS = Description("Opened", "Closed")


def myDomainEventCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, event: int, detail: int, opaque: _T) -> None:
    """Print a domain lifecycle event, tagged with 'opaque'."""
    print("myDomainEventCallback%s EVENT: Domain %s(%s) %s %s" % (
        opaque, dom.name(), dom.ID(), DOM_EVENTS[event], DOM_EVENTS[event][detail]))


def myDomainEventRebootCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, opaque: _T) -> None:
    """Print a domain reboot event."""
    print("myDomainEventRebootCallback: Domain %s(%s)" % (
        dom.name(), dom.ID()))


def myDomainEventRTCChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, utcoffset: int, opaque: _T) -> None:
    """Print a domain RTC change event with its UTC offset."""
    print("myDomainEventRTCChangeCallback: Domain %s(%s) %d" % (
        dom.name(), dom.ID(), utcoffset))


def myDomainEventWatchdogCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, action: int, opaque: _T) -> None:
    """Print a domain watchdog event with the described action."""
    print("myDomainEventWatchdogCallback: Domain %s(%s) %s" % (
        dom.name(), dom.ID(), WATCHDOG_ACTIONS[action]))


def myDomainEventIOErrorCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, srcpath: str, devalias: str, action: int, opaque: _T) -> None:
    """Print a domain I/O error event."""
    print("myDomainEventIOErrorCallback: Domain %s(%s) %s %s %s" % (
        dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action]))


def myDomainEventIOErrorReasonCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, srcpath: str, devalias: str, action: int, reason: int, opaque: _T) -> None:
    """Print a domain I/O error event including its reason."""
    print("myDomainEventIOErrorReasonCallback: Domain %s(%s) %s %s %s %s" % (
        dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action], reason))


def myDomainEventGraphicsCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, phase: int, localAddr: str, remoteAddr: str, authScheme: str, subject: str, opaque: _T) -> None:
    """Print a domain graphics event with its phase and auth scheme."""
    print("myDomainEventGraphicsCallback: Domain %s(%s) %s %s" % (
        dom.name(), dom.ID(), GRAPHICS_PHASES[phase], authScheme))


def myDomainEventControlErrorCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, opaque: _T) -> None:
    """Print a domain control error event."""
    print("myDomainEventControlErrorCallback: Domain %s(%s)" % (
        dom.name(), dom.ID()))


def myDomainEventBlockJobCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, disk, type: int, status: int, opaque: _T) -> None:
    """Print a domain block job event."""
    print("myDomainEventBlockJobCallback: Domain %s(%s) %s on disk %s %s" % (
        dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status]))


def myDomainEventDiskChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, oldSrcPath: str, newSrcPath: str, devAlias: str, reason: int, opaque: _T) -> None:
    """Print a domain disk change event."""
    print("myDomainEventDiskChangeCallback: Domain %s(%s) disk change oldSrcPath: %s newSrcPath: %s devAlias: %s reason: %s" % (
        dom.name(), dom.ID(), oldSrcPath, newSrcPath, devAlias, DISK_EVENTS[reason]))


def myDomainEventTrayChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, devAlias: str, reason: int, opaque: _T) -> None:
    """Print a domain tray change event."""
    print("myDomainEventTrayChangeCallback: Domain %s(%s) tray change devAlias: %s reason: %s" % (
        dom.name(), dom.ID(), devAlias, TRAY_EVENTS[reason]))


def myDomainEventPMWakeupCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
    """Print a domain PM wakeup event."""
    print("myDomainEventPMWakeupCallback: Domain %s(%s) system pmwakeup" % (
        dom.name(), dom.ID()))


def myDomainEventPMSuspendCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
    """Print a domain PM suspend event."""
    print("myDomainEventPMSuspendCallback: Domain %s(%s) system pmsuspend" % (
        dom.name(), dom.ID()))


def myDomainEventBalloonChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, actual: int, opaque: _T) -> None:
    """Print a domain balloon change event with the actual value."""
    print("myDomainEventBalloonChangeCallback: Domain %s(%s) %d" % (
        dom.name(), dom.ID(), actual))


def myDomainEventPMSuspendDiskCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
    """Print a domain PM suspend-to-disk event."""
    print("myDomainEventPMSuspendDiskCallback: Domain %s(%s) system pmsuspend_disk" % (
        dom.name(), dom.ID()))


def myDomainEventDeviceRemovedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
    """Print a domain device removed event."""
    print("myDomainEventDeviceRemovedCallback: Domain %s(%s) device removed: %s" % (
        dom.name(), dom.ID(), dev))


def myDomainEventBlockJob2Callback(conn: libvirt.virConnect, dom: libvirt.virDomain, disk: str, type: int, status: int, opaque: _T) -> None:
    """Print a domain block job (v2) event."""
    print("myDomainEventBlockJob2Callback: Domain %s(%s) %s on disk %s %s" % (
        dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status]))


def myDomainEventTunableCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, params: Dict[str, Any], opaque: _T) -> None:
    """Print a domain tunable event with its parameters."""
    print("myDomainEventTunableCallback: Domain %s(%s) %s" % (
        dom.name(), dom.ID(), params))


def myDomainEventAgentLifecycleCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, state: int, reason: int, opaque: _T) -> None:
    """Print a domain agent lifecycle event."""
    print("myDomainEventAgentLifecycleCallback: Domain %s(%s) %s %s" % (
        dom.name(), dom.ID(), AGENT_STATES[state], AGENT_REASONS[reason]))


def myDomainEventDeviceAddedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
    """Print a domain device added event."""
    print("myDomainEventDeviceAddedCallback: Domain %s(%s) device added: %s" % (
        dom.name(), dom.ID(), dev))


def myDomainEventMigrationIteration(conn: libvirt.virConnect, dom: libvirt.virDomain, iteration: int, opaque: _T) -> None:
    """Print a domain migration iteration event."""
    print("myDomainEventMigrationIteration: Domain %s(%s) started migration iteration %d" % (
        dom.name(), dom.ID(), iteration))


def myDomainEventJobCompletedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, params: Dict[str, Any], opaque: _T) -> None:
    """Print a domain job completed event with its parameters."""
    print("myDomainEventJobCompletedCallback: Domain %s(%s) %s" % (
        dom.name(), dom.ID(), params))


def myDomainEventDeviceRemovalFailedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
    """Print a domain device removal failed event."""
    print("myDomainEventDeviceRemovalFailedCallback: Domain %s(%s) failed to remove device: %s" % (
        dom.name(), dom.ID(), dev))


def myDomainEventMetadataChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, mtype: int, nsuri: str, opaque: _T) -> None:
    """Print a domain metadata change event."""
    print("myDomainEventMetadataChangeCallback: Domain %s(%s) changed metadata mtype=%d nsuri=%s" % (
        dom.name(), dom.ID(), mtype, nsuri))


def myDomainEventBlockThresholdCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, path: str, threshold: int, excess: int, opaque: _T) -> None:
    """Print a domain block threshold event."""
    print("myDomainEventBlockThresholdCallback: Domain %s(%s) block device %s(%s) threshold %d exceeded by %d" % (
        dom.name(), dom.ID(), dev, path, threshold, excess))


def myDomainEventMemoryFailureCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, recipient: int, action: int, flags: int, opaque: _T) -> None:
    """Print a domain memory failure event."""
    print("myDomainEventMemoryFailureCallback: Domain %s(%s) memory failure recipient %d action %d flags %d" % (
        dom.name(), dom.ID(), recipient, action, flags))


def myDomainEventMemoryDeviceSizeChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, alias: str, size: int, opaque: _T) -> None:
    """Print a domain memory device size change event.

    The event carries the device alias and its new size; the parameters
    are named accordingly so the message below can reference them.
    """
    print("myDomainEventMemoryDeviceSizeChangeCallback: Domain %s(%s) memory device size change alias %s size %d" % (
        dom.name(), dom.ID(), alias, size))


##########################################################################
# Network events
##########################################################################
NET_EVENTS = Description(
    ("Defined", ("Added",)),
    ("Undefined", ("Removed",)),
    ("Started", ("Started",)),
    ("Stopped", ("Stopped",)),
)


def myNetworkEventLifecycleCallback(conn: libvirt.virConnect, net: libvirt.virNetwork, event: int, detail: int, opaque: _T) -> None:
    """Print a network lifecycle event."""
    print("myNetworkEventLifecycleCallback: Network %s %s %s" % (
        net.name(), NET_EVENTS[event], NET_EVENTS[event][detail]))


##########################################################################
# Storage pool events
##########################################################################
STORAGE_EVENTS = Description(
    ("Defined", ()),
    ("Undefined", ()),
    ("Started", ()),
    ("Stopped", ()),
    ("Created", ()),
    ("Deleted", ()),
)


def myStoragePoolEventLifecycleCallback(conn: libvirt.virConnect, pool: libvirt.virStoragePool, event: int, detail: int, opaque: _T) -> None:
    """Print a storage pool lifecycle event."""
    print("myStoragePoolEventLifecycleCallback: Storage pool %s %s %s" % (
        pool.name(), STORAGE_EVENTS[event], STORAGE_EVENTS[event][detail]))


def myStoragePoolEventRefreshCallback(conn: libvirt.virConnect, pool: libvirt.virStoragePool, opaque: _T) -> None:
    """Print a storage pool refresh event."""
    print("myStoragePoolEventRefreshCallback: Storage pool %s" % pool.name())


##########################################################################
# Node device events
##########################################################################
DEVICE_EVENTS = Description(
    ("Created", ()),
    ("Deleted", ()),
)


def myNodeDeviceEventLifecycleCallback(conn: libvirt.virConnect, dev: libvirt.virNodeDevice, event: int, detail: int, opaque: _T) -> None:
    """Print a node device lifecycle event."""
    print("myNodeDeviceEventLifecycleCallback: Node device %s %s %s" % (
        dev.name(), DEVICE_EVENTS[event], DEVICE_EVENTS[event][detail]))


def myNodeDeviceEventUpdateCallback(conn: libvirt.virConnect, dev: libvirt.virNodeDevice, opaque: _T) -> None:
    """Print a node device update event."""
    print("myNodeDeviceEventUpdateCallback: Node device %s" % dev.name())


##########################################################################
# Secret events
##########################################################################
SECRET_EVENTS = Description(
    ("Defined", ()),
    ("Undefined", ()),
)


def mySecretEventLifecycleCallback(conn: libvirt.virConnect, secret: libvirt.virSecret, event: int, detail: int, opaque: _T) -> None:
    """Print a secret lifecycle event."""
    print("mySecretEventLifecycleCallback: Secret %s %s %s" % (
        secret.UUIDString(), SECRET_EVENTS[event], SECRET_EVENTS[event][detail]))


def mySecretEventValueChanged(conn: libvirt.virConnect, secret: libvirt.virSecret, opaque: _T) -> None:
    """Print a secret value changed event."""
    print("mySecretEventValueChanged: Secret %s" % secret.UUIDString())


##########################################################################
# Set up and run the program
##########################################################################

# Cleared by myConnectionCloseCallback() to stop the main loop
run = True
CONNECTION_EVENTS = Description("Error", "End-of-file", "Keepalive", "Client")


def myConnectionCloseCallback(conn: libvirt.virConnect, reason: int, opaque: _T) -> None:
    """Print why the connection closed and tell main() to stop."""
    print("myConnectionCloseCallback: %s: %s" % (
        conn.getURI(), CONNECTION_EVENTS[reason]))
    global run
    run = False


def main() -> None:
    """Parse the command line, start an event loop and print events.

    Registers a callback for every event type demonstrated in this file,
    sleeps until the connection closes or the optional timeout expires,
    then deregisters everything and closes the connection.
    """
    parser = ArgumentParser()
    parser.add_argument("--debug", "-d", action="store_true", help="Print debug output")
    parser.add_argument("--loop", "-l", choices=("native", "poll", "asyncio"), default=event_impl, help="Choose event-loop-implementation")
    parser.add_argument("--timeout", type=int, default=None, help="Quit after SECS seconds running")
    parser.add_argument("uri", nargs="?", default="qemu:///system")
    args = parser.parse_args()

    if args.debug:
        global do_debug
        do_debug = True

    print("Using uri '%s' and event loop '%s'" % (args.uri, args.loop))

    # Run a background thread with the event loop
    if args.loop == "poll":
        virEventLoopPollStart()
    elif args.loop == "asyncio":
        virEventLoopAIOStart()
    else:
        virEventLoopNativeStart()

    vc = libvirt.openReadOnly(args.uri)

    # Close connection on exit (to test cleanup paths)
    def close_connection() -> None:
        print("Closing " + vc.getURI())
        if run:
            vc.close()

    atexit.register(close_connection)

    vc.registerCloseCallback(myConnectionCloseCallback, None)

    # Add 2 lifecycle callbacks to prove this works with more than just one
    vc.domainEventRegister(myDomainEventCallback, 1)
    domcallbacks = [
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, myDomainEventCallback, 2),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_REBOOT, myDomainEventRebootCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE, myDomainEventRTCChangeCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, myDomainEventWatchdogCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR, myDomainEventIOErrorCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, myDomainEventGraphicsCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON, myDomainEventIOErrorReasonCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR, myDomainEventControlErrorCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB, myDomainEventBlockJobCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE, myDomainEventDiskChangeCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_TRAY_CHANGE, myDomainEventTrayChangeCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMWAKEUP, myDomainEventPMWakeupCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMSUSPEND, myDomainEventPMSuspendCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_BALLOON_CHANGE, myDomainEventBalloonChangeCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMSUSPEND_DISK, myDomainEventPMSuspendDiskCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED, myDomainEventDeviceRemovedCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2, myDomainEventBlockJob2Callback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_TUNABLE, myDomainEventTunableCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_AGENT_LIFECYCLE, myDomainEventAgentLifecycleCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED, myDomainEventDeviceAddedCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_MIGRATION_ITERATION, myDomainEventMigrationIteration, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_JOB_COMPLETED, myDomainEventJobCompletedCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVAL_FAILED, myDomainEventDeviceRemovalFailedCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_METADATA_CHANGE, myDomainEventMetadataChangeCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_THRESHOLD, myDomainEventBlockThresholdCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_MEMORY_FAILURE, myDomainEventMemoryFailureCallback, None),
        vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_MEMORY_DEVICE_SIZE_CHANGE, myDomainEventMemoryDeviceSizeChangeCallback, None),
    ]

    netcallbacks = [
        vc.networkEventRegisterAny(None, libvirt.VIR_NETWORK_EVENT_ID_LIFECYCLE, myNetworkEventLifecycleCallback, None),
    ]

    poolcallbacks = [
        vc.storagePoolEventRegisterAny(None, libvirt.VIR_STORAGE_POOL_EVENT_ID_LIFECYCLE, myStoragePoolEventLifecycleCallback, None),
        vc.storagePoolEventRegisterAny(None, libvirt.VIR_STORAGE_POOL_EVENT_ID_REFRESH, myStoragePoolEventRefreshCallback, None),
    ]

    devcallbacks = [
        vc.nodeDeviceEventRegisterAny(None, libvirt.VIR_NODE_DEVICE_EVENT_ID_LIFECYCLE, myNodeDeviceEventLifecycleCallback, None),
        vc.nodeDeviceEventRegisterAny(None, libvirt.VIR_NODE_DEVICE_EVENT_ID_UPDATE, myNodeDeviceEventUpdateCallback, None),
    ]

    seccallbacks = [
        vc.secretEventRegisterAny(None, libvirt.VIR_SECRET_EVENT_ID_LIFECYCLE, mySecretEventLifecycleCallback, None),
        vc.secretEventRegisterAny(None, libvirt.VIR_SECRET_EVENT_ID_VALUE_CHANGED, mySecretEventValueChanged, None),
    ]

    vc.setKeepAlive(5, 3)

    # The rest of your app would go here normally, but for sake
    # of demo we'll just go to sleep. The other option is to
    # run the event loop in your main thread if your app is
    # totally event based.
    count = 0
    while run and (args.timeout is None or count < args.timeout):
        count = count + 1
        time.sleep(1)

    # If the connection was closed, we cannot unregister anything.
    # Just abort now.
    if not run:
        return

    vc.domainEventDeregister(myDomainEventCallback)

    for cb_id in seccallbacks:
        vc.secretEventDeregisterAny(cb_id)
    for cb_id in devcallbacks:
        vc.nodeDeviceEventDeregisterAny(cb_id)
    for cb_id in poolcallbacks:
        vc.storagePoolEventDeregisterAny(cb_id)
    for cb_id in netcallbacks:
        vc.networkEventDeregisterAny(cb_id)
    for cb_id in domcallbacks:
        vc.domainEventDeregisterAny(cb_id)

    vc.unregisterCloseCallback()
    vc.close()

    # Allow delayed event loop cleanup to run, just for sake of testing
    time.sleep(2)


if __name__ == "__main__":
    main()