1import gevent 2import time 3from gevent.event import AsyncResult 4 5 6class Noparallel(object): # Only allow function running once in same time 7 8 def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False): 9 self.threads = {} 10 self.blocking = blocking # Blocking: Acts like normal function else thread returned 11 self.queue = queue 12 self.queued = False 13 self.ignore_args = ignore_args 14 self.ignore_class = ignore_class 15 16 def __call__(self, func): 17 def wrapper(*args, **kwargs): 18 if self.ignore_class: 19 key = func # Unique key only by function and class object 20 elif self.ignore_args: 21 key = (func, args[0]) # Unique key only by function and class object 22 else: 23 key = (func, tuple(args), str(kwargs)) # Unique key for function including parameters 24 if key in self.threads: # Thread already running (if using blocking mode) 25 if self.queue: 26 self.queued = True 27 thread = self.threads[key] 28 if self.blocking: 29 if self.queued: 30 res = thread.get() # Blocking until its finished 31 if key in self.threads: 32 return self.threads[key].get() # Queue finished since started running 33 self.queued = False 34 return wrapper(*args, **kwargs) # Run again after the end 35 else: 36 return thread.get() # Return the value 37 38 else: # No blocking 39 if thread.ready(): # Its finished, create a new 40 thread = gevent.spawn(func, *args, **kwargs) 41 self.threads[key] = thread 42 return thread 43 else: # Still running 44 return thread 45 else: # Thread not running 46 if self.blocking: # Wait for finish 47 asyncres = AsyncResult() 48 self.threads[key] = asyncres 49 try: 50 res = func(*args, **kwargs) 51 asyncres.set(res) 52 self.cleanup(key, asyncres) 53 return res 54 except Exception as err: 55 asyncres.set_exception(err) 56 self.cleanup(key, asyncres) 57 raise(err) 58 else: # No blocking just return the thread 59 thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread 60 thread.link(lambda thread: self.cleanup(key, thread)) 61 self.threads[key] = thread 62 return thread 63 wrapper.__name__ = func.__name__ 64 65 return wrapper 66 67 # Cleanup finished threads 68 def cleanup(self, key, thread): 69 if key in self.threads: 70 del(self.threads[key]) 71 72 73if __name__ == "__main__": 74 75 76 class Test(): 77 78 @Noparallel() 79 def count(self, num=5): 80 for i in range(num): 81 print(self, i) 82 time.sleep(1) 83 return "%s return:%s" % (self, i) 84 85 class TestNoblock(): 86 87 @Noparallel(blocking=False) 88 def count(self, num=5): 89 for i in range(num): 90 print(self, i) 91 time.sleep(1) 92 return "%s return:%s" % (self, i) 93 94 def testBlocking(): 95 test = Test() 96 test2 = Test() 97 print("Counting...") 98 print("Creating class1/thread1") 99 thread1 = gevent.spawn(test.count) 100 print("Creating class1/thread2 (ignored)") 101 thread2 = gevent.spawn(test.count) 102 print("Creating class2/thread3") 103 thread3 = gevent.spawn(test2.count) 104 105 print("Joining class1/thread1") 106 thread1.join() 107 print("Joining class1/thread2") 108 thread2.join() 109 print("Joining class2/thread3") 110 thread3.join() 111 112 print("Creating class1/thread4 (its finished, allowed again)") 113 thread4 = gevent.spawn(test.count) 114 print("Joining thread4") 115 thread4.join() 116 117 print(thread1.value, thread2.value, thread3.value, thread4.value) 118 print("Done.") 119 120 def testNoblocking(): 121 test = TestNoblock() 122 test2 = TestNoblock() 123 print("Creating class1/thread1") 124 thread1 = test.count() 125 print("Creating class1/thread2 (ignored)") 126 thread2 = test.count() 127 print("Creating class2/thread3") 128 thread3 = test2.count() 129 print("Joining class1/thread1") 130 thread1.join() 131 print("Joining class1/thread2") 132 thread2.join() 133 print("Joining class2/thread3") 134 thread3.join() 135 136 print("Creating class1/thread4 (its finished, allowed again)") 137 thread4 = test.count() 138 print("Joining thread4") 139 thread4.join() 140 141 print(thread1.value, thread2.value, thread3.value, thread4.value) 142 print("Done.") 143 144 def testBenchmark(): 145 import time 146 147 def printThreadNum(): 148 import gc 149 from greenlet import greenlet 150 objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] 151 print("Greenlets: %s" % len(objs)) 152 153 printThreadNum() 154 test = TestNoblock() 155 s = time.time() 156 for i in range(3): 157 gevent.spawn(test.count, i + 1) 158 print("Created in %.3fs" % (time.time() - s)) 159 printThreadNum() 160 time.sleep(5) 161 162 def testException(): 163 import time 164 @Noparallel(blocking=True, queue=True) 165 def count(self, num=5): 166 s = time.time() 167 # raise Exception("err") 168 for i in range(num): 169 print(self, i) 170 time.sleep(1) 171 return "%s return:%s" % (s, i) 172 def caller(): 173 try: 174 print("Ret:", count(5)) 175 except Exception as err: 176 print("Raised:", repr(err)) 177 178 gevent.joinall([ 179 gevent.spawn(caller), 180 gevent.spawn(caller), 181 gevent.spawn(caller), 182 gevent.spawn(caller) 183 ]) 184 185 186 from gevent import monkey 187 monkey.patch_all() 188 189 testException() 190 191 """ 192 testBenchmark() 193 print("Testing blocking mode...") 194 testBlocking() 195 print("Testing noblocking mode...") 196 testNoblocking() 197 """ 198