1import gevent
2import time
3from gevent.event import AsyncResult
4
5
class Noparallel(object):
    """Decorator that keeps identical calls of a function from running in parallel.

    Calls are grouped by a key built from the function and, depending on the
    options, its arguments. While a run for a key is in progress, further
    calls with the same key do not start the function a second time.

    Args:
        blocking: If true, the wrapper behaves like a normal call and returns
            the function's result. Callers that arrive while a run is in
            progress wait for it and receive its result (or its exception).
            If false, the wrapper returns the gevent greenlet executing the
            function; callers arriving during a run get that same greenlet.
        ignore_args: If true, the key is ``(func, args[0])``, so every call on
            the same first positional argument (typically ``self``) shares one
            run. Requires at least one positional argument.
        ignore_class: If true, the key is the function alone. Takes precedence
            over ``ignore_args``.
        queue: Blocking mode only. A caller that arrived during a run waits
            for it to end and then triggers one more run instead of reusing
            the finished run's result.
    """

    def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False):
        # key -> AsyncResult (blocking mode) or greenlet (non-blocking mode) of the run in progress
        self.threads = {}
        self.blocking = blocking
        self.queue = queue
        # Set when a call arrives during a run while queue=True.
        # NOTE: one flag per decorator instance, i.e. shared by every key.
        self.queued = False
        self.ignore_args = ignore_args
        self.ignore_class = ignore_class

    def __call__(self, func):
        """Wrap ``func`` and return the guarded callable."""
        import functools  # Local import so no new module-level import is required

        @functools.wraps(func)  # Preserve __name__, __doc__, ... of the wrapped function
        def wrapper(*args, **kwargs):
            if self.ignore_class:
                key = func  # Unique key only by function
            elif self.ignore_args:
                key = (func, args[0])  # Unique key only by function and class object
            else:
                key = (func, tuple(args), str(kwargs))  # Unique key for function including parameters

            if key in self.threads:  # A run for this key is already registered
                if self.queue:
                    self.queued = True
                thread = self.threads[key]
                if self.blocking:
                    if self.queued:
                        thread.get()  # Wait for the current run; re-raises its exception
                        if key in self.threads:
                            # Another waiter already started the queued re-run: share it
                            return self.threads[key].get()
                        self.queued = False
                        return wrapper(*args, **kwargs)  # Run again after the end
                    return thread.get()  # Share the result of the run in progress
                # Non-blocking mode
                if thread.ready():
                    # Finished but not unregistered yet: start a fresh run
                    return self._spawn(key, func, args, kwargs)
                return thread  # Still running

            # No run registered for this key
            if self.blocking:
                asyncres = AsyncResult()
                self.threads[key] = asyncres
                try:
                    res = func(*args, **kwargs)
                except BaseException as err:
                    # BaseException (not just Exception): a killed greenlet raises
                    # gevent.GreenletExit, and without releasing the key here every
                    # later caller would wait on this AsyncResult forever.
                    asyncres.set_exception(err)
                    self.cleanup(key, asyncres)
                    raise
                asyncres.set(res)
                self.cleanup(key, asyncres)
                return res
            return self._spawn(key, func, args, kwargs)

        return wrapper

    def _spawn(self, key, func, args, kwargs):
        """Start ``func`` in a new greenlet, register it under ``key`` and return it."""
        thread = gevent.spawn(func, *args, **kwargs)
        # Unregister automatically once the greenlet has finished
        thread.link(lambda finished: self.cleanup(key, finished))
        self.threads[key] = thread
        return thread

    # Cleanup finished threads
    def cleanup(self, key, thread):
        """Unregister ``thread`` from ``key`` if it is still the registered run.

        The identity check keeps a late callback of an old, finished greenlet
        from removing a newer run that has replaced it under the same key.
        """
        if self.threads.get(key) is thread:
            del self.threads[key]
71
72
73if __name__ == "__main__":
74
75
76    class Test():
77
78        @Noparallel()
79        def count(self, num=5):
80            for i in range(num):
81                print(self, i)
82                time.sleep(1)
83            return "%s return:%s" % (self, i)
84
85    class TestNoblock():
86
87        @Noparallel(blocking=False)
88        def count(self, num=5):
89            for i in range(num):
90                print(self, i)
91                time.sleep(1)
92            return "%s return:%s" % (self, i)
93
94    def testBlocking():
95        test = Test()
96        test2 = Test()
97        print("Counting...")
98        print("Creating class1/thread1")
99        thread1 = gevent.spawn(test.count)
100        print("Creating class1/thread2 (ignored)")
101        thread2 = gevent.spawn(test.count)
102        print("Creating class2/thread3")
103        thread3 = gevent.spawn(test2.count)
104
105        print("Joining class1/thread1")
106        thread1.join()
107        print("Joining class1/thread2")
108        thread2.join()
109        print("Joining class2/thread3")
110        thread3.join()
111
112        print("Creating class1/thread4 (its finished, allowed again)")
113        thread4 = gevent.spawn(test.count)
114        print("Joining thread4")
115        thread4.join()
116
117        print(thread1.value, thread2.value, thread3.value, thread4.value)
118        print("Done.")
119
120    def testNoblocking():
121        test = TestNoblock()
122        test2 = TestNoblock()
123        print("Creating class1/thread1")
124        thread1 = test.count()
125        print("Creating class1/thread2 (ignored)")
126        thread2 = test.count()
127        print("Creating class2/thread3")
128        thread3 = test2.count()
129        print("Joining class1/thread1")
130        thread1.join()
131        print("Joining class1/thread2")
132        thread2.join()
133        print("Joining class2/thread3")
134        thread3.join()
135
136        print("Creating class1/thread4 (its finished, allowed again)")
137        thread4 = test.count()
138        print("Joining thread4")
139        thread4.join()
140
141        print(thread1.value, thread2.value, thread3.value, thread4.value)
142        print("Done.")
143
144    def testBenchmark():
145        import time
146
147        def printThreadNum():
148            import gc
149            from greenlet import greenlet
150            objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
151            print("Greenlets: %s" % len(objs))
152
153        printThreadNum()
154        test = TestNoblock()
155        s = time.time()
156        for i in range(3):
157            gevent.spawn(test.count, i + 1)
158        print("Created in %.3fs" % (time.time() - s))
159        printThreadNum()
160        time.sleep(5)
161
162    def testException():
163        import time
164        @Noparallel(blocking=True, queue=True)
165        def count(self, num=5):
166            s = time.time()
167            # raise Exception("err")
168            for i in range(num):
169                print(self, i)
170                time.sleep(1)
171            return "%s return:%s" % (s, i)
172        def caller():
173            try:
174                print("Ret:", count(5))
175            except Exception as err:
176                print("Raised:", repr(err))
177
178        gevent.joinall([
179            gevent.spawn(caller),
180            gevent.spawn(caller),
181            gevent.spawn(caller),
182            gevent.spawn(caller)
183        ])
184
185
    # Apply gevent's monkey patching before running the demo; presumably this is
    # what lets the time.sleep() calls inside the demo greenlets yield to each other.
    from gevent import monkey
    monkey.patch_all()

    # Only the exception/queue demo runs by default.
    testException()

    # The remaining demos are disabled by keeping them inside a bare string
    # literal; move the calls out of the string to run them.
    """
    testBenchmark()
    print("Testing blocking mode...")
    testBlocking()
    print("Testing noblocking mode...")
    testNoblocking()
    """
198