1""" 2 tests.support.events 3 ~~~~~~~~~~~~~~~~~~~~ 4""" 5 6 7import multiprocessing 8import os 9import time 10from contextlib import contextmanager 11 12import salt.utils.event 13from salt.utils.process import clean_proc 14 15 16@contextmanager 17def eventpublisher_process(sock_dir): 18 proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir}) 19 proc.start() 20 try: 21 if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None: 22 # Travis is slow 23 time.sleep(10) 24 else: 25 time.sleep(8) 26 yield 27 finally: 28 clean_proc(proc) 29 30 31class EventSender(multiprocessing.Process): 32 def __init__(self, data, tag, wait, sock_dir): 33 super().__init__() 34 self.data = data 35 self.tag = tag 36 self.wait = wait 37 self.sock_dir = sock_dir 38 39 def run(self): 40 with salt.utils.event.MasterEvent(self.sock_dir, listen=False) as me: 41 time.sleep(self.wait) 42 me.fire_event(self.data, self.tag) 43 # Wait a few seconds before tearing down the zmq context 44 if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None: 45 # Travis is slow 46 time.sleep(10) 47 else: 48 time.sleep(2) 49 50 51@contextmanager 52def eventsender_process(data, tag, sock_dir, wait=0): 53 proc = EventSender(data, tag, wait, sock_dir=sock_dir) 54 proc.start() 55 try: 56 yield 57 finally: 58 clean_proc(proc) 59