1 static int NextID = 0;
2 static pe_ring AllWatchers;
3 static struct pe_watcher_vtbl pe_watcher_base_vtbl;
4
/*
 * Initialise the common part of a watcher.
 *
 * ev     - watcher to set up; ev->vtbl must already be set and that vtbl
 *          must have a stash (croaks otherwise).
 * stash  - optional stash, passed on to wrap_watcher().
 * temple - optional SV, passed on to wrap_watcher().
 *
 * The first time a given vtbl is used, "Event/<Name>.pm" is required,
 * where <Name> is the vtbl's package name with a leading "Event::"
 * removed; croaks if loading leaves an error in ERRSV.
 *
 * When either stash or temple is supplied the watcher is wrapped right
 * away and the result stored in ev->mysv; otherwise ev->mysv is 0.
 */
static void pe_watcher_init(pe_watcher *ev, HV *stash, SV *temple) {
    STRLEN n_a;   /* scratch length for SvPV() */
    assert(ev);
    assert(ev->vtbl);
    if (!ev->vtbl->stash)
        croak("sub-class VTBL must have a stash (doesn't!)");
    if (!ev->vtbl->did_require) {
        SV *tmp;
        char *name = HvNAME(ev->vtbl->stash);
        dTHX;
        /* strnEQ stops at the terminating NUL, so a package name that is
           shorter than the prefix is never read past its end (memEQ
           would compare all 7 bytes unconditionally). */
        if (strnEQ(name, "Event::", 7))
            name += 7;
        tmp = sv_2mortal(newSVpvf("Event/%s.pm", name));
        perl_require_pv(SvPV(tmp, n_a));
        if (sv_true(ERRSV))
            croak("Event: could not load perl support code for Event::%s: %s",
                  name, SvPV(ERRSV,n_a));
        ++ev->vtbl->did_require;
    }
    /* if we have a non-default stash then we need to save it! */
    ev->mysv = (stash || temple) ? wrap_watcher(ev, stash, temple) : 0;
    PE_RING_INIT(&ev->all, ev);
    PE_RING_INIT(&ev->events, 0);

    /* no exceptions after this point */

    PE_RING_UNSHIFT(&ev->all, &AllWatchers);
    WaFLAGS(ev) = 0;
    WaINVOKE1_on(ev);
    WaREENTRANT_on(ev);
    ev->FALLBACK = 0;
    NextID = (NextID+1) & 0x7fff; /* make it look like the kernel :-, */
    ev->refcnt = 0;
    ev->desc = newSVpvn("??",2);
    ev->running = 0;
    ev->max_cb_tm = 1;  /* make default configurable? */
    ev->cbtime = 0;
    ev->prio = PE_QUEUES;
    ev->callback = 0;
    ev->ext_data = 0;
    ev->stats = 0;
}
47
/* Empty the watcher's `events' ring: working from the tail of the ring,
   each event is handed to dequeEvent() and then to pe_event_release()
   until the ring reports empty. */
static void pe_watcher_cancel_events(pe_watcher *wa) {
    for (;;) {
        pe_ring *tail;
        pe_event *pending;

        if (PE_RING_EMPTY(&wa->events))
            break;
        tail = wa->events.prev;
        pending = (pe_event*) tail->self;
        dequeEvent(pending);
        pe_event_release(pending);
    }
}
57
/*
 * Release the resources held by the common part of a watcher.
 *
 * A second call on the same watcher only emits a warning.  Otherwise the
 * watcher is flagged as destroyed and its references on the Perl
 * callback, FALLBACK and desc are dropped; stats, if present, go through
 * Estat.dtor.  The watcher's event ring is expected to be empty already.
 *
 * The watcher structure itself is NOT freed here; that is left to the
 * caller.
 */
static void pe_watcher_dtor(pe_watcher *wa) {
    STRLEN n_a;   /* scratch length for SvPV() */
    assert(WaCANDESTROY(wa));
    if (WaDESTROYED(wa)) {
        /* %p with a void* argument: the former %x conversion was handed
           a pointer, which is wrong wherever pointers and ints differ
           in size. */
        warn("Attempt to destroy watcher 0x%p again (ignored)", (void*) wa);
        return;
    }
    WaDESTROYED_on(wa);
    if (WaDEBUGx(wa) >= 3)
        warn("Watcher '%s' destroyed", SvPV(wa->desc, n_a));
    assert(PE_RING_EMPTY(&wa->events));
    /* callback is only a reference-counted SV when it is a Perl callback */
    if (WaPERLCB(wa))
        SvREFCNT_dec(wa->callback);
    if (wa->FALLBACK)
        SvREFCNT_dec(wa->FALLBACK);
    if (wa->desc)
        SvREFCNT_dec(wa->desc);
    if (wa->stats)
        Estat.dtor(wa->stats);
    /* safefree(wa); do it yourself */
}
79
80 /********************************** *******************************/
81
WKEYMETH(_watcher_callback)82 WKEYMETH(_watcher_callback) {
83 if (nval) {
84 AV *av;
85 SV *sv;
86 SV *old=0;
87 if (WaPERLCB(ev))
88 old = (SV*) ev->callback;
89 if (!SvOK(nval)) {
90 WaPERLCB_off(ev);
91 ev->callback = 0;
92 ev->ext_data = 0;
93 pe_watcher_stop(ev, 0);
94 } else if (SvROK(nval) && (SvTYPE(sv=SvRV(nval)) == SVt_PVCV)) {
95 WaPERLCB_on(ev);
96 ev->callback = SvREFCNT_inc(nval);
97 } else if (SvROK(nval) &&
98 (SvTYPE(av=(AV*)SvRV(nval)) == SVt_PVAV) &&
99 av_len(av) == 1) {
100 /* method lookup code adapted from universal.c */
101 STRLEN n_a;
102 SV *pkgsv = *av_fetch(av, 0, 0);
103 HV *pkg = NULL;
104 SV *namesv = *av_fetch(av, 1, 0);
105 char *name = SvPV(namesv, n_a);
106 int ok=0;
107 if(SvROK(pkgsv)) {
108 pkgsv = (SV*)SvRV(pkgsv);
109 if(SvOBJECT(pkgsv))
110 pkg = SvSTASH(pkgsv);
111 }
112 else {
113 pkg = gv_stashsv(pkgsv, FALSE);
114 }
115 if (pkg) {
116 GV *gv = gv_fetchmethod_autoload(pkg, name, FALSE);
117 if (gv && isGV(gv))
118 ok=1;
119 }
120 else {
121 warn("Event: package '%s' doesn't exist (creating)",
122 SvPV(pkgsv, n_a));
123 pkg = gv_stashsv(pkgsv, 1);
124 }
125 if (!ok) {
126 warn("Event: callback method %s->%s doesn't exist",
127 HvNAME(pkg), name);
128 }
129 WaPERLCB_on(ev);
130 ev->callback = SvREFCNT_inc(nval);
131 } else {
132 if (SvIV(DebugLevel) >= 2)
133 sv_dump(sv);
134 croak("Callback must be a code ref or [$object, $method_name]");
135 }
136 if (old)
137 SvREFCNT_dec(old);
138 }
139 {
140 SV *ret = (WaPERLCB(ev)?
141 (SV*) ev->callback :
142 (ev->callback?
143 sv_2mortal(newSVpvf("<FPTR=0x%p EXT=0x%p>",
144 ev->callback, ev->ext_data)) :
145 &PL_sv_undef));
146 dSP;
147 XPUSHs(ret);
148 PUTBACK;
149 }
150 }
151
WKEYMETH(_watcher_cbtime)152 WKEYMETH(_watcher_cbtime) {
153 if (!nval) {
154 dSP;
155 XPUSHs(sv_2mortal(newSVnv(ev->cbtime)));
156 PUTBACK;
157 } else
158 croak("'e_cbtime' is read-only");
159 }
160
WKEYMETH(_watcher_desc)161 WKEYMETH(_watcher_desc) {
162 if (nval) {
163 sv_setsv(ev->desc, nval);
164 }
165 {
166 dSP;
167 XPUSHs(ev->desc);
168 PUTBACK;
169 }
170 }
171
WKEYMETH(_watcher_debug)172 WKEYMETH(_watcher_debug) {
173 if (nval) {
174 if (sv_true(nval)) WaDEBUG_on(ev); else WaDEBUG_off(ev);
175 }
176 {
177 dSP;
178 XPUSHs(boolSV(WaDEBUG(ev)));
179 PUTBACK;
180 }
181 }
182
WKEYMETH(_watcher_priority)183 WKEYMETH(_watcher_priority) {
184 if (nval) {
185 ev->prio = SvIV(nval);
186 }
187 {
188 dSP;
189 XPUSHs(sv_2mortal(newSViv(ev->prio)));
190 PUTBACK;
191 }
192 }
193
WKEYMETH(_watcher_reentrant)194 WKEYMETH(_watcher_reentrant) {
195 if (nval) {
196 if (sv_true(nval))
197 WaREENTRANT_on(ev);
198 else {
199 if (ev->running > 1)
200 croak("'reentrant' cannot be turned off while nested %d times",
201 ev->running);
202 WaREENTRANT_off(ev);
203 }
204 }
205 {
206 dSP;
207 XPUSHs(boolSV(WaREENTRANT(ev)));
208 PUTBACK;
209 }
210 }
211
WKEYMETH(_watcher_repeat)212 WKEYMETH(_watcher_repeat) {
213 if (nval) {
214 if (sv_true(nval)) WaREPEAT_on(ev); else WaREPEAT_off(ev);
215 }
216 {
217 dSP;
218 XPUSHs(boolSV(WaREPEAT(ev)));
219 PUTBACK;
220 }
221 }
222
WKEYMETH(_watcher_suspend)223 WKEYMETH(_watcher_suspend) {
224 if (nval) {
225 if (sv_true(nval))
226 pe_watcher_suspend(ev);
227 else
228 pe_watcher_resume(ev);
229 }
230 {
231 dSP;
232 XPUSHs(boolSV(WaSUSPEND(ev)));
233 PUTBACK;
234 }
235 }
236
WKEYMETH(_watcher_max_cb_tm)237 WKEYMETH(_watcher_max_cb_tm) {
238 if (nval) {
239 int tm = SvIOK(nval)? SvIV(nval) : 0;
240 if (tm < 0) {
241 warn("e_max_cb_tm must be non-negative");
242 tm=0;
243 }
244 ev->max_cb_tm = tm;
245 }
246 {
247 dSP;
248 XPUSHs(sv_2mortal(newSViv(ev->max_cb_tm)));
249 PUTBACK;
250 }
251 }
252
253 /********************************** *******************************/
254
/* Croak with "<package>::<meth> is missing", where <package> is the
   name of the stash recorded in the watcher's vtbl.  Never returns
   normally. */
static void pe_watcher_nomethod(pe_watcher *ev, char *meth) {
    HV *stash;

    stash = ev->vtbl->stash;
    assert(stash);
    croak("%s::%s is missing", HvNAME(stash), meth);
}
260
/* `start' entry installed in the base vtbl: reports the method as
   missing.  The trailing return only satisfies the signature. */
static char *pe_watcher_nostart(pe_watcher *ev, int repeat)
{
    pe_watcher_nomethod(ev,"start");
    return 0;
}
/* `stop' entry installed in the base vtbl: reports the method as
   missing. */
static void pe_watcher_nostop(pe_watcher *ev)
{
    pe_watcher_nomethod(ev,"stop");
}
/* `alarm' entry installed in the base vtbl: reports the method as
   missing. */
static void pe_watcher_alarm(pe_watcher *ev, pe_timeable *tm)
{
    pe_watcher_nomethod(ev,"alarm");
}
267
boot_pe_watcher()268 static void boot_pe_watcher() {
269 HV *stash = gv_stashpv("Event::Watcher", 1);
270 struct pe_watcher_vtbl *vt;
271 PE_RING_INIT(&AllWatchers, 0);
272 vt = &pe_watcher_base_vtbl;
273 vt->stash = 0;
274 vt->did_require = 0;
275 vt->dtor = 0;
276 vt->start = pe_watcher_nostart;
277 vt->stop = pe_watcher_nostop;
278 vt->alarm = pe_watcher_alarm;
279 newCONSTSUB(stash, "ACTIVE", newSViv(PE_ACTIVE));
280 newCONSTSUB(stash, "SUSPEND", newSViv(PE_SUSPEND));
281 newCONSTSUB(stash, "R", newSViv(PE_R));
282 newCONSTSUB(stash, "W", newSViv(PE_W));
283 newCONSTSUB(stash, "E", newSViv(PE_E));
284 newCONSTSUB(stash, "T", newSViv(PE_T));
285 }
286
/* Attach a stash and an event vtbl to a watcher vtbl, and copy the
   event vtbl's new_event pointer into the watcher vtbl. */
static void pe_register_vtbl(pe_watcher_vtbl *vt, HV *stash,
                             pe_event_vtbl *evt)
{
    vt->new_event  = evt->new_event;
    vt->event_vtbl = evt;
    vt->stash      = stash;
}
293
/* Create an event for the watcher through its vtbl, count one hit on it
   and queue it.  Does nothing for a suspended watcher; croaks when the
   watcher has no callback. */
static void pe_watcher_now(pe_watcher *wa) {
    pe_event *fresh;

    if (WaSUSPEND(wa)) {
        return;
    }
    if (!wa->callback) {
        STRLEN n_a;
        croak("Event: attempt to invoke now() method with callback unset on watcher '%s'", SvPV(wa->desc,n_a));
    }

    WaRUNNOW_on(wa); /* race condition XXX */
    fresh = wa->vtbl->new_event(wa);
    fresh->hits += 1;
    queueEvent(fresh);
}
307
308 /*******************************************************************
309 The following methods change the status flags. This is the only
310 code that should be changing these flags!
311 */
312
/* Cancel a watcher; a no-op when it is already cancelled.  The SUSPEND
   flag is cleared, the watcher is stopped with its events cancelled,
   flagged CANCELLED and unlinked from its `all' ring.  Finally either
   the reference on wa->mysv is dropped or, when there is no mysv and
   the watcher may be destroyed, the vtbl destructor is called. */
static void pe_watcher_cancel(pe_watcher *wa) {
    if (!WaCANCELLED(wa)) {
        WaSUSPEND_off(wa);
        pe_watcher_stop(wa, 1); /* peer */
        WaCANCELLED_on(wa);
        PE_RING_DETACH(&wa->all);
        if (wa->mysv) {
            SvREFCNT_dec(wa->mysv); /* might destroy */
        } else if (WaCANDESTROY(wa)) {
            wa->vtbl->dtor(wa);
        }
    }
}
325
/* Suspend a watcher; a no-op when it is already suspended.  The watcher
   is switched off and its pending events are cancelled before the
   SUSPEND flag is raised. */
static void pe_watcher_suspend(pe_watcher *ev) {
    STRLEN n_a;   /* scratch length for SvPV() */

    assert(ev);
    if (!WaSUSPEND(ev)) {
        if (WaDEBUGx(ev) >= 4) {
            warn("Event: suspend '%s'\n", SvPV(ev->desc,n_a));
        }
        pe_watcher_off(ev);
        pe_watcher_cancel_events(ev);
        WaSUSPEND_on(ev); /* must happen nowhere else!! */
    }
}
337
/* Resume a suspended watcher; a no-op when it is not suspended.  The
   SUSPEND flag is cleared first, and a watcher that is still flagged
   ACTIVE is switched back on (with repeat == 0). */
static void pe_watcher_resume(pe_watcher *ev) {
    STRLEN n_a;   /* scratch length for SvPV() */

    assert(ev);
    if (WaSUSPEND(ev)) {
        WaSUSPEND_off(ev);
        if (WaDEBUGx(ev) >= 4) {
            warn("Event: resume '%s'%s\n", SvPV(ev->desc,n_a),
                 WaACTIVE(ev)?" ACTIVE":"");
        }
        if (WaACTIVE(ev)) {
            pe_watcher_on(ev, 0);
        }
    }
}
350
/* Switch a watcher on through its vtbl `start' method.
 *
 * Returns 0 without doing anything when the watcher is already polling
 * or is suspended; croaks for a cancelled watcher.  If `start' returns
 * an excuse string, the watcher is stopped (events cancelled), a
 * warning is emitted when DebugLevel is non-zero, and the excuse is
 * returned.  On success the POLLING flag is raised and 0 is returned. */
static char *pe_watcher_on(pe_watcher *wa, int repeat) {
    STRLEN n_a;   /* scratch length for SvPV() */
    char *why;

    if (WaPOLLING(wa) || WaSUSPEND(wa)) {
        return 0;
    }
    if (WaCANCELLED(wa)) {
        croak("Event: attempt to start cancelled watcher '%s'",
              SvPV(wa->desc,n_a));
    }

    why = wa->vtbl->start(wa, repeat);
    if (!why) {
        WaPOLLING_on(wa); /* must happen nowhere else!! */
        return why;
    }

    if (SvIV(DebugLevel)) {
        warn("Event: can't restart '%s' %s", SvPV(wa->desc, n_a), why);
    }
    pe_watcher_stop(wa, 1); /* update flags! */
    return why;
}
368
/* Switch a watcher off through its vtbl `stop' method and clear the
   POLLING flag.  Does nothing unless the watcher is polling and not
   suspended. */
static void pe_watcher_off(pe_watcher *wa) {
    if (WaPOLLING(wa) && !WaSUSPEND(wa)) {
        wa->vtbl->stop(wa);
        WaPOLLING_off(wa);
    }
}
374
/* Make a watcher active; a no-op when it already is.  Croaks with the
   excuse if the watcher cannot be switched on; otherwise raises the
   ACTIVE flag and increments ActiveWatchers. */
static void pe_watcher_start(pe_watcher *ev, int repeat) {
    STRLEN n_a;   /* scratch length for SvPV() */
    char *why;

    if (WaACTIVE(ev)) {
        return;
    }
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active ON '%s'\n", SvPV(ev->desc,n_a));
    }
    why = pe_watcher_on(ev, repeat);
    if (why) {
        croak("Event: can't start '%s' %s", SvPV(ev->desc,n_a), why);
    }
    WaACTIVE_on(ev); /* must happen nowhere else!! */
    ++ActiveWatchers;
}
388
/* Make a watcher inactive; a no-op when it is not active.  The watcher
   is switched off, the ACTIVE flag is cleared, pending events are
   cancelled when cancel_events is non-zero, and ActiveWatchers is
   decremented. */
static void pe_watcher_stop(pe_watcher *ev, int cancel_events) {
    STRLEN n_a;   /* scratch length for SvPV() */

    if (!WaACTIVE(ev)) {
        return;
    }
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active OFF '%s'\n", SvPV(ev->desc,n_a));
    }
    pe_watcher_off(ev);
    WaACTIVE_off(ev); /* must happen nowhere else!! */
    if (cancel_events) {
        pe_watcher_cancel_events(ev);
    }
    --ActiveWatchers;
}
400