1 static int NextID = 0;
2 static pe_ring AllWatchers;
3 static struct pe_watcher_vtbl pe_watcher_base_vtbl;
4 
/*
 * Initialize the generic part of a watcher.
 *
 * ev     - watcher to set up; ev->vtbl must already be assigned
 * stash  - optional stash passed through to wrap_watcher()
 * temple - optional SV passed through to wrap_watcher()
 *
 * The first time a given vtbl is seen, the matching Event/<Name>.pm support
 * module is loaded.  Croaks if the vtbl has no stash or if loading the
 * support module fails.
 */
static void pe_watcher_init(pe_watcher *ev, HV *stash, SV *temple) {
    STRLEN n_a;
    assert(ev);
    assert(ev->vtbl);
    if (!ev->vtbl->stash)
        croak("sub-class VTBL must have a stash (doesn't!)");
    if (!ev->vtbl->did_require) {
        SV *tmp;
        char *name = HvNAME(ev->vtbl->stash);
        dTHX;
        /* strnEQ stops at the terminating NUL, so a package name shorter
           than the 7-byte prefix is never read past its end (memEQ would
           compare 7 bytes unconditionally). */
        if (strnEQ(name, "Event::", 7))
            name += 7;
        tmp = sv_2mortal(newSVpvf("Event/%s.pm", name));
        perl_require_pv(SvPV(tmp, n_a));
        if (sv_true(ERRSV))
            croak("Event: could not load perl support code for Event::%s: %s",
                  name, SvPV(ERRSV,n_a));
        /* only mark the vtbl as loaded once the require succeeded */
        ++ev->vtbl->did_require;
    }
    /* if we have a non-default stash then we need to save it! */
    ev->mysv = (stash || temple) ? wrap_watcher(ev, stash, temple) : 0;
    PE_RING_INIT(&ev->all, ev);
    PE_RING_INIT(&ev->events, 0);

    /* no exceptions after this point */

    PE_RING_UNSHIFT(&ev->all, &AllWatchers);
    WaFLAGS(ev) = 0;
    WaINVOKE1_on(ev);
    WaREENTRANT_on(ev);
    ev->FALLBACK = 0;
    NextID = (NextID+1) & 0x7fff; /* make it look like the kernel :-, */
    ev->refcnt = 0;
    ev->desc = newSVpvn("??",2);
    ev->running = 0;
    ev->max_cb_tm = 1;  /* make default configurable? */
    ev->cbtime = 0;
    ev->prio = PE_QUEUES;
    ev->callback = 0;
    ev->ext_data = 0;
    ev->stats = 0;
}
47 
/*
 * Drain the watcher's event ring: repeatedly take the entry at the tail,
 * dequeue it and release it, until the ring reports empty.
 */
static void pe_watcher_cancel_events(pe_watcher *wa) {
    for (;;) {
        pe_ring *tail;
        pe_event *pending;
        if (PE_RING_EMPTY(&wa->events))
            break;
        tail = wa->events.prev;
        pending = (pe_event*) tail->self;
        dequeEvent(pending);
        pe_event_release(pending);
    }
}
57 
/*
 * Release the resources held by the generic part of a watcher.
 *
 * Warns and returns if the watcher was already marked destroyed.  Otherwise
 * marks it destroyed and drops the references to the Perl callback, the
 * FALLBACK value, the description and the stats object (when present).
 * The watcher structure itself is NOT freed here; the caller must do that.
 */
static void pe_watcher_dtor(pe_watcher *wa) {
    STRLEN n_a;
    assert(WaCANDESTROY(wa));
    if (WaDESTROYED(wa)) {
        /* %p with a void* argument: %x would mismatch the pointer argument
           and truncate the address where pointers are wider than int */
        warn("Attempt to destroy watcher 0x%p again (ignored)", (void*) wa);
        return;
    }
    WaDESTROYED_on(wa);
    if (WaDEBUGx(wa) >= 3)
        warn("Watcher '%s' destroyed", SvPV(wa->desc, n_a));
    assert(PE_RING_EMPTY(&wa->events));
    /* the callback slot only holds an SV when the PERLCB flag is set */
    if (WaPERLCB(wa))
        SvREFCNT_dec(wa->callback);
    if (wa->FALLBACK)
        SvREFCNT_dec(wa->FALLBACK);
    if (wa->desc)
        SvREFCNT_dec(wa->desc);
    if (wa->stats)
        Estat.dtor(wa->stats);
    /* safefree(wa); do it yourself */
}
79 
80 /********************************** *******************************/
81 
WKEYMETH(_watcher_callback)82 WKEYMETH(_watcher_callback) {
83     if (nval) {
84 	AV *av;
85 	SV *sv;
86 	SV *old=0;
87 	if (WaPERLCB(ev))
88 	    old = (SV*) ev->callback;
89 	if (!SvOK(nval)) {
90 	    WaPERLCB_off(ev);
91 	    ev->callback = 0;
92 	    ev->ext_data = 0;
93 	    pe_watcher_stop(ev, 0);
94 	} else if (SvROK(nval) && (SvTYPE(sv=SvRV(nval)) == SVt_PVCV)) {
95 	    WaPERLCB_on(ev);
96 	    ev->callback = SvREFCNT_inc(nval);
97 	} else if (SvROK(nval) &&
98 		   (SvTYPE(av=(AV*)SvRV(nval)) == SVt_PVAV) &&
99 		   av_len(av) == 1) {
100 	    /* method lookup code adapted from universal.c */
101 	    STRLEN n_a;
102 	    SV *pkgsv = *av_fetch(av, 0, 0);
103 	    HV *pkg = NULL;
104 	    SV *namesv = *av_fetch(av, 1, 0);
105 	    char *name = SvPV(namesv, n_a);
106 	    int ok=0;
107 	    if(SvROK(pkgsv)) {
108 		pkgsv = (SV*)SvRV(pkgsv);
109 		if(SvOBJECT(pkgsv))
110 		    pkg = SvSTASH(pkgsv);
111 	    }
112 	    else {
113 		pkg = gv_stashsv(pkgsv, FALSE);
114 	    }
115 	    if (pkg) {
116 		GV *gv = gv_fetchmethod_autoload(pkg, name, FALSE);
117 		if (gv && isGV(gv))
118 		    ok=1;
119 	    }
120 	    else {
121 		warn("Event: package '%s' doesn't exist (creating)",
122 		     SvPV(pkgsv, n_a));
123 		pkg = gv_stashsv(pkgsv, 1);
124 	    }
125 	    if (!ok) {
126 		warn("Event: callback method %s->%s doesn't exist",
127 		     HvNAME(pkg), name);
128 	    }
129 	    WaPERLCB_on(ev);
130 	    ev->callback = SvREFCNT_inc(nval);
131 	} else {
132 	    if (SvIV(DebugLevel) >= 2)
133 		sv_dump(sv);
134 	    croak("Callback must be a code ref or [$object, $method_name]");
135 	}
136 	if (old)
137 	    SvREFCNT_dec(old);
138     }
139     {
140 	SV *ret = (WaPERLCB(ev)?
141 		   (SV*) ev->callback :
142 		   (ev->callback?
143 		    sv_2mortal(newSVpvf("<FPTR=0x%p EXT=0x%p>",
144 					ev->callback, ev->ext_data)) :
145 		    &PL_sv_undef));
146 	dSP;
147 	XPUSHs(ret);
148 	PUTBACK;
149     }
150 }
151 
WKEYMETH(_watcher_cbtime)152 WKEYMETH(_watcher_cbtime) {
153     if (!nval) {
154 	dSP;
155 	XPUSHs(sv_2mortal(newSVnv(ev->cbtime)));
156 	PUTBACK;
157     } else
158 	croak("'e_cbtime' is read-only");
159 }
160 
WKEYMETH(_watcher_desc)161 WKEYMETH(_watcher_desc) {
162     if (nval) {
163 	sv_setsv(ev->desc, nval);
164     }
165     {
166 	dSP;
167 	XPUSHs(ev->desc);
168 	PUTBACK;
169     }
170 }
171 
WKEYMETH(_watcher_debug)172 WKEYMETH(_watcher_debug) {
173     if (nval) {
174 	if (sv_true(nval)) WaDEBUG_on(ev); else WaDEBUG_off(ev);
175     }
176     {
177 	dSP;
178 	XPUSHs(boolSV(WaDEBUG(ev)));
179 	PUTBACK;
180     }
181 }
182 
WKEYMETH(_watcher_priority)183 WKEYMETH(_watcher_priority) {
184     if (nval) {
185 	ev->prio = SvIV(nval);
186     }
187     {
188 	dSP;
189 	XPUSHs(sv_2mortal(newSViv(ev->prio)));
190 	PUTBACK;
191     }
192 }
193 
WKEYMETH(_watcher_reentrant)194 WKEYMETH(_watcher_reentrant) {
195     if (nval) {
196 	if (sv_true(nval))
197 	    WaREENTRANT_on(ev);
198 	else {
199 	    if (ev->running > 1)
200 		croak("'reentrant' cannot be turned off while nested %d times",
201 		      ev->running);
202 	    WaREENTRANT_off(ev);
203 	}
204     }
205     {
206 	dSP;
207 	XPUSHs(boolSV(WaREENTRANT(ev)));
208 	PUTBACK;
209     }
210 }
211 
WKEYMETH(_watcher_repeat)212 WKEYMETH(_watcher_repeat) {
213     if (nval) {
214 	if (sv_true(nval)) WaREPEAT_on(ev); else WaREPEAT_off(ev);
215     }
216     {
217 	dSP;
218 	XPUSHs(boolSV(WaREPEAT(ev)));
219 	PUTBACK;
220     }
221 }
222 
WKEYMETH(_watcher_suspend)223 WKEYMETH(_watcher_suspend) {
224     if (nval) {
225 	if (sv_true(nval))
226 	    pe_watcher_suspend(ev);
227 	else
228 	    pe_watcher_resume(ev);
229     }
230     {
231 	dSP;
232 	XPUSHs(boolSV(WaSUSPEND(ev)));
233 	PUTBACK;
234     }
235 }
236 
WKEYMETH(_watcher_max_cb_tm)237 WKEYMETH(_watcher_max_cb_tm) {
238     if (nval) {
239 	int tm = SvIOK(nval)? SvIV(nval) : 0;
240 	if (tm < 0) {
241 	    warn("e_max_cb_tm must be non-negative");
242 	    tm=0;
243 	}
244 	ev->max_cb_tm = tm;
245     }
246     {
247 	dSP;
248 	XPUSHs(sv_2mortal(newSViv(ev->max_cb_tm)));
249 	PUTBACK;
250     }
251 }
252 
253 /********************************** *******************************/
254 
/* Croak with "<package>::<meth> is missing", using the package name of the watcher's vtbl stash. */
static void pe_watcher_nomethod(pe_watcher *ev, char *meth) {
    HV *pkg = ev->vtbl->stash;
    assert(pkg);
    croak("%s::%s is missing", HvNAME(pkg), meth);
}
260 
/* Placeholder "start" slot: reports the missing method via pe_watcher_nomethod(). */
static char *pe_watcher_nostart(pe_watcher *ev, int repeat)
{
    pe_watcher_nomethod(ev, "start");
    return 0;
}
/* Placeholder "stop" slot: reports the missing method via pe_watcher_nomethod(). */
static void pe_watcher_nostop(pe_watcher *ev)
{
    pe_watcher_nomethod(ev, "stop");
}
/* Placeholder "alarm" slot: reports the missing method via pe_watcher_nomethod(). */
static void pe_watcher_alarm(pe_watcher *ev, pe_timeable *tm)
{
    pe_watcher_nomethod(ev, "alarm");
}
267 
boot_pe_watcher()268 static void boot_pe_watcher() {
269     HV *stash = gv_stashpv("Event::Watcher", 1);
270     struct pe_watcher_vtbl *vt;
271     PE_RING_INIT(&AllWatchers, 0);
272     vt = &pe_watcher_base_vtbl;
273     vt->stash = 0;
274     vt->did_require = 0;
275     vt->dtor = 0;
276     vt->start = pe_watcher_nostart;
277     vt->stop = pe_watcher_nostop;
278     vt->alarm = pe_watcher_alarm;
279     newCONSTSUB(stash, "ACTIVE", newSViv(PE_ACTIVE));
280     newCONSTSUB(stash, "SUSPEND", newSViv(PE_SUSPEND));
281     newCONSTSUB(stash, "R", newSViv(PE_R));
282     newCONSTSUB(stash, "W", newSViv(PE_W));
283     newCONSTSUB(stash, "E", newSViv(PE_E));
284     newCONSTSUB(stash, "T", newSViv(PE_T));
285 }
286 
/*
 * Bind a watcher vtbl to its stash and to an event vtbl; the event vtbl's
 * new_event constructor is copied into the watcher vtbl as well.
 */
static void pe_register_vtbl(pe_watcher_vtbl *vt, HV *stash,
                             pe_event_vtbl *evt)
{
    vt->stash      = stash;
    vt->event_vtbl = evt;
    vt->new_event  = evt->new_event;
}
293 
/*
 * Generate and queue an event for the watcher right away.  Does nothing for
 * a suspended watcher; croaks when no callback is set.
 */
static void pe_watcher_now(pe_watcher *wa) {
    pe_event *fresh;

    if (WaSUSPEND(wa))
        return;
    if (!wa->callback) {
        STRLEN n_a;
        croak("Event: attempt to invoke now() method with callback unset on watcher '%s'", SvPV(wa->desc,n_a));
    }

    WaRUNNOW_on(wa); /* race condition XXX */
    fresh = wa->vtbl->new_event(wa);
    fresh->hits++;
    queueEvent(fresh);
}
307 
/*******************************************************************
  The following methods change the status flags.  This is the only
  code that should be changing these flags!
*/
312 
/*
 * Cancel a watcher (no-op if it is already cancelled): clear SUSPEND, stop
 * it and cancel its events, mark it CANCELLED and unlink it from the
 * all-watchers ring.  Then either drop the reference on its wrapper SV or,
 * when there is no wrapper and the watcher can be destroyed, call the
 * vtbl destructor directly.
 */
static void pe_watcher_cancel(pe_watcher *wa) {
    if (WaCANCELLED(wa))
        return;

    WaSUSPEND_off(wa);
    pe_watcher_stop(wa, 1); /* peer */
    WaCANCELLED_on(wa);
    PE_RING_DETACH(&wa->all);

    if (wa->mysv) {
        SvREFCNT_dec(wa->mysv);	/* might destroy */
        return;
    }
    if (WaCANDESTROY(wa))
        wa->vtbl->dtor(wa);
}
325 
/*
 * Suspend a watcher (no-op if already suspended): switch polling off,
 * cancel its pending events and finally raise the SUSPEND flag.
 */
static void pe_watcher_suspend(pe_watcher *ev) {
    STRLEN n_a;

    assert(ev);
    if (WaSUSPEND(ev))
        return;
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: suspend '%s'\n", SvPV(ev->desc,n_a));
    }
    /* the flag goes up last: pe_watcher_off() returns early once it is set */
    pe_watcher_off(ev);
    pe_watcher_cancel_events(ev);
    WaSUSPEND_on(ev); /* must happen nowhere else!! */
}
337 
/*
 * Resume a suspended watcher (no-op otherwise): clear SUSPEND and, if the
 * watcher is still ACTIVE, switch polling back on.
 */
static void pe_watcher_resume(pe_watcher *ev) {
    STRLEN n_a;

    assert(ev);
    if (!WaSUSPEND(ev))
        return;
    WaSUSPEND_off(ev);
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: resume '%s'%s\n", SvPV(ev->desc,n_a),
             WaACTIVE(ev)?" ACTIVE":"");
    }
    /* ACTIVE is tested again here rather than cached across the warn() */
    if (WaACTIVE(ev)) {
        pe_watcher_on(ev, 0);
    }
}
350 
/*
 * Switch polling on through the vtbl start method.
 *
 * Returns 0 when the watcher is already polling or is suspended, and 0 on a
 * successful start (POLLING is then set).  If start reports a problem, the
 * watcher is stopped and the excuse string is returned.  Croaks for a
 * cancelled watcher.
 */
static char *pe_watcher_on(pe_watcher *wa, int repeat) {
    STRLEN n_a;
    char *excuse;

    if (WaPOLLING(wa) || WaSUSPEND(wa))
        return 0;
    if (WaCANCELLED(wa)) {
        croak("Event: attempt to start cancelled watcher '%s'",
              SvPV(wa->desc,n_a));
    }

    excuse = wa->vtbl->start(wa, repeat);
    if (!excuse) {
        WaPOLLING_on(wa); /* must happen nowhere else!! */
        return excuse;
    }

    if (SvIV(DebugLevel))
        warn("Event: can't restart '%s' %s", SvPV(wa->desc, n_a), excuse);
    pe_watcher_stop(wa, 1); /* update flags! */
    return excuse;
}
368 
/* Switch polling off via the vtbl stop method; only acts on a polling, non-suspended watcher. */
static void pe_watcher_off(pe_watcher *wa) {
    if (WaPOLLING(wa) && !WaSUSPEND(wa)) {
        wa->vtbl->stop(wa);
        WaPOLLING_off(wa);
    }
}
374 
/*
 * Activate a watcher (no-op if already ACTIVE): switch polling on, croak if
 * that fails, otherwise set ACTIVE and bump the active-watcher count.
 */
static void pe_watcher_start(pe_watcher *ev, int repeat) {
    STRLEN n_a;
    char *excuse;

    if (WaACTIVE(ev))
        return;
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active ON '%s'\n", SvPV(ev->desc,n_a));
    }
    excuse = pe_watcher_on(ev, repeat);
    if (excuse) {
        croak("Event: can't start '%s' %s", SvPV(ev->desc,n_a), excuse);
    }
    WaACTIVE_on(ev); /* must happen nowhere else!! */
    ActiveWatchers++;
}
388 
/*
 * Deactivate a watcher (no-op unless ACTIVE): switch polling off, clear
 * ACTIVE, optionally cancel its pending events, and drop the active-watcher
 * count.
 */
static void pe_watcher_stop(pe_watcher *ev, int cancel_events) {
    STRLEN n_a;

    if (!WaACTIVE(ev))
        return;
    if (WaDEBUGx(ev) >= 4) {
        warn("Event: active OFF '%s'\n", SvPV(ev->desc,n_a));
    }
    pe_watcher_off(ev);
    WaACTIVE_off(ev); /* must happen nowhere else!! */
    if (cancel_events) {
        pe_watcher_cancel_events(ev);
    }
    ActiveWatchers--;
}
400