1#/**
2# Client proxy for apartment threaded objects.
3# <p>
4# Licensed under the Academic Free License version 2.1, as specified in the
5# License.txt file included in this software package, or at
6# <a href="http://www.opensource.org/licenses/afl-2.1.php">OpenSource.org</a>.
7#
8# @author D. Arnold
9# @since 2005-12-01
10# @self	$self
11#*/
12package Thread::Apartment::Client;
13
14use Carp;
15use threads;
16use threads::shared;
17use Thread::Queue::Queueable;
18use Thread::Queue::TQDContainer;
19use Thread::Apartment;
20use Thread::Apartment::Common;
21
22use Thread::Apartment::Common qw(:ta_method_flags);
23
24use base qw(Thread::Queue::Queueable Thread::Queue::TQDContainer Thread::Apartment::Common);
25
26our $AUTOLOAD;
27
28use strict;
29use warnings;
30
31our $VERSION = '0.51';
32
33use constant TAC_CLASS_LEN => 27;
34use constant TAC_REENT_LEN => 13;
35use constant TAC_ASYNC_LEN => 9;
36
37our $async_method;	# set by T::A::start()
38
39sub CLONE { $async_method = undef; }
40
41#/**
42# Constructor. Creates a threads::shared hash to contain the proxy
43# information, so it can be readily passed between threads.
44#
45# @param $proxied_class		the class of the object to be proxied
46# @param $tqd				TQD communications channel to proxied object
47# @param $id				unique object ID for proxied object
48# @param $isa				arrayref of object's class hierarchy
49# @param $methods			hashref mapping exported method names to behavior flags
50# @param $timeout			TQD timeout seconds
51# @param $tid				thread ID of the apartment thread for the proxied object
52#
53# @return		Thread::Apartment::Client object
54#*/
55sub new {
56	my ($class, $proxied_class, $tqd, $id, $isa, $methods, $timeout, $tid)= @_;
57#
58#	create isa, can as shared so we can curse/redeem them easily
59#
60	my @isa : shared = ( @$isa, 'ta_invoke_closure' );
61	my %can : shared = ( %$methods );
62
63	my %self : shared = (
64		_class	=> $proxied_class,	# class we're proxying
65		_id 	=> $id,				# object unique ID (for object hierarchies)
66		_tqd	=> $tqd,			# our comm. channel
67		_isa	=> \@isa,			# classes in hierarchy of proxied object
68		_can	=> \%can,			# exported methods of proxied object
69		_timeout => $timeout,		# TQD timeout
70		_server_tid => $tid,		# tid of apartment thread
71	);
72	bless \%self, $class;
73#
74#	if we have the $method, then we should proceed to
75#	install all the exported methods into our object
76#
77	return \%self;
78}
79#/**
80# Overload UNIVERSAL::isa() to test the class hierarchy of the proxied object.
81#
82# @param $class		class to check if implemented by the proxied object
83#
84# @return		1 if the proxied object implements $class; undef otherwise
85#*/
86sub isa {
87	my ($self, $class) = @_;
88#
89#	catch stuff we need to expose for queueing purposes
90#
91	return 1
92		if (($class eq 'Thread::Queue::Queueable') ||
93			($class eq 'Thread::Queue::TQDContainer') ||
94			($class eq 'Thread::Apartment::Client'));
95	foreach (@{$self->{_isa}}) {
96		return 1 if ($_ eq $class);
97	}
98	return undef;
99}
100
101#/**
102# Overload UNIVERSAL::can() to test the available methods of the proxied object.
103#
104# @param $method	method to check if implemented by the proxied object
105#
106# @return		if the proxied object exports $method (or exports AUTOLOAD),
107#				a closure forcing an AUTOLOAD of the specified $method; undef otherwise
108#*/
109sub can {
110	my ($self, $method) = @_;
111#
112#	we may need to trap the methods for TQQ here...
113#	NOTE!!! Need to return a coderef here!!!
114#
115	return ((exists $self->{_can}{$method}) ||
116		(exists $self->{_can}{'*'}) ||
117		(exists $self->{_can}{AUTOLOAD})) ?
118		sub { $AUTOLOAD = $method; return $self->AUTOLOAD(@_); } :
119		undef;
120}
121#/**
122# Test if the specified method is exported as simplex
123#
124# @param $method method to test for simplex behavior
125#
126# @return		1 if $method is exported and is simplex; undef otherwise
127#*/
128sub ta_is_simplex {
129	return (exists $_[0]->{_can}{$_[1]} ?
130		($_[0]->{_can}{$_[1]} & TA_SIMPLEX) : undef);
131}
132
133#/**
134# Test if the specified method is exported as urgent
135#
136# @param $method method to test for urgent behavior
137#
138# @return		1 if $method is exported and is urgent; undef otherwise
139#*/
140sub ta_is_urgent {
141	return (exists $_[0]->{_can}{$_[1]} ?
142		($_[0]->{_can}{$_[1]} & TA_URGENT) : undef);
143}
144
145#/**
146# Set debug level. When set to a "true" value, causes the TAC to emit
147# diagnostic information.
148#
149# @param $level	debug level. zero or undef turns off debugging; all other values enable debugging
150#
151# @return		the new level
152#*/
153sub tac_debug { $_[0]->{_tac_debug} = $_[1]; }
154
155sub AUTOLOAD {
156#
157#	called in client stub
158#	passes method name
159#	if starts w/ ta_async_, then return immediately
160#	if starts w/ ta_reentrant_, or local thread's T::A::is_reentrant
161#		is true, interleave local thread inbound calls
162#		while waiting for method results
163#	NOTE: use explicit substr() instead of regex for performance
164#
165	my $self = shift;
166
167	my $method = $AUTOLOAD;
168
169	print STDERR "TAC::AUTOLOAD: Method is $method\n"
170		if (substr($method, -9) ne '::DESTROY') && $self->{_tac_debug};
171
172	$async_method = undef,
173	return 1
174		if (substr($method, -9) eq '::DESTROY');
175#
176#	get rid of leading stuff
177#
178#warn "requested method $method\n";
179	$method = substr($method, TAC_CLASS_LEN)
180		if (substr($method, 0, TAC_CLASS_LEN) eq 'Thread::Apartment::Client::');
181
182	my $tid = threads->self()->tid();
183
184	my $async;
185	my $reentrant = Thread::Apartment::get_reentrancy();
186	if (substr($method, 0, TAC_ASYNC_LEN) eq 'ta_async_') {
187		$method = substr($method, TAC_ASYNC_LEN);
188		$@ = "No response closure supplied for async method $method.",
189		$async_method = undef,
190		return undef
191			unless $_[0] && (ref $_[0]) && (ref $_[0] eq 'CODE');
192
193		$async = 1;
194#		$method = defined($1) ? "$1$2" : $2;
195	}
196	elsif (substr($method, 0, TAC_REENT_LEN) eq 'ta_reentrant_') {
197		$reentrant = 1;
198		$method = substr($method, TAC_REENT_LEN);
199#		print STDERR "Got re-entrant call to $method\n";
200	}
201
202	unless (($method eq 'ta_invoke_closure') ||
203		(exists $self->{_can}{$method}) ||
204		(exists $self->{_can}{'AUTOLOAD'})) {
205		$@ = "Can't locate method \"$method\" via package \"$self->{_class}\"";
206		print STDERR $@, "\n"
207			if $self->{_tac_debug};
208		$async_method = undef;
209		return undef;
210	}
211#	print STDERR "Client objid is $self->{_id}\n"
212#		if exists $self->{_can}{'AUTOLOAD'};
213#
214#	support simplex/urgent specification
215#
216	my $flag = $self->{_can}{$method} || 0;
217#
218#	including for closures
219#	check for default closure call behaviors;
220#	note that these are cumulative
221#
222#print "Simplex is ", TA_SIMPLEX, " urgent is ", TA_URGENT, "\n";
223	$_[1] |= Thread::Apartment::get_closure_behavior(),
224	$flag = ($_[1] & (TA_SIMPLEX | TA_URGENT))
225		if ($method eq 'ta_invoke_closure');
226
227	my @params = ($async && (!$async_method)) ?
228		('ta_async', $self->{_id}, wantarray, $method) :
229		($method, $self->{_id}, wantarray);
230#
231#	marshal params
232#	(assume the TAS implementation has a complementary unmarshal)
233#
234#	print join(', ', @_), "\n"
235#		if (scalar @_) && ($method eq 'ta_invoke_closure');
236	push @params, $self->marshal(@_)
237		if scalar @_;
238
239	my $tqd = $self->{_tqd};			# perf opt.
240	my $timeout = $self->{_timeout};	# perf opt.
241#
242#	don't support start()/rendezvous() for simplex
243#
244	$async_method = undef,
245	return (($flag & TA_URGENT) ?
246		$tqd->enqueue_simplex_urgent(@params) :
247		$tqd->enqueue_simplex(@params))
248		if ($flag & TA_SIMPLEX);
249
250#print STDERR "calling getCase in $tid\n" if ($method eq 'getCase');
251
252#print STDERR "calling $method with ", join(', ', @params), "\n"
253#	if $async;
254	my $id = ($flag & TA_URGENT) ?
255		$tqd->enqueue_urgent(@params) :
256		$tqd->enqueue(@params);
257#
258#	NOTE: we don't support ta_async_ w/ start()/rendezvous()
259#
260	$async_method = undef,
261#	print STDERR "Called async method $method\n" and
262	return $id
263		if $async;
264
265	Thread::Apartment->map_async_request_id($async_method, $self, $id),
266	$async_method = undef,
267	return $id
268		if $async_method;
269
270#print STDERR "called getCase in $tid\n" if ($method eq 'getCase');
271#
272#	if reentrant, attempt to service inbound calls to the caller
273#	while we wait for the response...
274#	note that the return value doesn't matter, since the subsequent
275#	wait()'s will retrieve any pending response if the caller is
276#	a TAS, or will just do the usual wait() thing if the caller
277#	isn't TAS (i.e., run_wait returns undef)
278#
279	if ($reentrant) {
280#		print STDERR "Calling T::A::run_wait in $tid for id $id at ", time(), "\n";
281
282#		print STDERR "Returned from T::A::run_wait in $tid for timed out\n" and
283		return undef
284			unless Thread::Apartment::run_wait($tqd, $id, $timeout);
285#		print STDERR "Returned from T::A::run_wait in $tid for id $id at ", time(), "\n";
286	}
287
288#print STDERR "waiting for getCase in $tid\n" if ($method eq 'getCase');
289
290	my $resp = $timeout ?
291		$tqd->wait_until($id, $timeout) :
292		$tqd->wait($id);
293
294#print STDERR "getCase returned in $tid\n" if ($method eq 'getCase');
295
296#	warn "\nwait failed: $@\n" and
297	return undef
298		unless $resp;
299
300	unless (defined($resp->[0])) {
301		$@ = $resp->[1];
302		print STDERR $@, "\n"
303			if $self->{_tac_debug};
304		return undef;
305	}
306
307#warn "got response: $$results[0]\n";
308#	shift @$results;
309#
310#	unmarshal results
311#
312	my $results = $self->unmarshal($resp->[0]);
313
314	return wantarray ? @$results : defined(wantarray) ? $results->[0] : 1;
315}
316
317#/**
318# Return current TQD timeout
319#
320# @return		TQD timeout in seconds
321#*/
322sub get_timeout {
323	return $_[0]->{_timeout};
324}
325
326#/**
327# Return proxied class
328#
329# @return		proxied class name string
330#*/
331sub get_proxied_class {
332	return $_[0]->{_class};
333}
334
335#/**
336# Set TQD timeout
337#
338# @param $timeout	max. number of seconds to wait for TQD responses.
339#
340# @return		previous timeout value
341#*/
342sub set_timeout {
343	my $to = $_[0]->{_timeout};
344	$_[0]->{_timeout} = $_[1];
345	return $to;
346}
347#/**
348# Wait for the proxied object's apartment thread to exit.
349#
350# @return		1
351#*/
352sub join {
353#
354#	Don't know why, but unless we use the scalar TID
355#	instead of deref'ing, object() just won't work ???
356#
357	my $tid = $_[0]->{_server_tid};
358
359#print STDERR "Joining $tid\n";
360	my $thread = threads->object($tid);
361#print STDERR "Thread $tid not found\n" and
362	return 1
363		unless $thread;
364	$thread->join();
365#print STDERR "Joined...$tid\n";
366	return 1;
367}
368
369#/**
370# Stop the proxied object's apartment thread.
371#*/
372sub stop {
373	$_[0]->{_tqd}->enqueue_simplex('STOP');
374}
375#/**
376# TQQ redeem() override. Checks if the TAC has been passed into
377# the thread in which is was created, in which case it looks up
378# and returns the proxied object in the T::A object map. Otherwise, just
379# blesses the object back into a TAC.
380#
381# @param $class	our TAC class
382# @param $obj   the object to be redeem()ed
383#
384# @return		if in the originating thread, the proxied object; else a
385#				reblessed TAC.
386#*/
387sub redeem {
388	my ($class, $obj) = @_;
389
390	bless $obj, $class;
391	return ($obj->{_server_tid} == threads->self()->tid()) ?
392		Thread::Apartment::get_object_by_id($obj->{_id}) : $obj;
393}
394#/**
395# Return results of a pending method/closure request.
396# Looks up the pending request ID in the current thread's T::A,
397# then waits for the completion of the request, unmarshals and returns
398# the results.
399#
400# @return		results of the currently pending request (if any)
401#*/
402sub get_pending_results {
403	my $self = shift;
404	my $id = Thread::Apartment->get_pending_request($self);
405	return undef unless $id;
406	my @results = $self->{_tqd}->wait($id);
407	return @results;
408}
409#/**
410# Set async method for next call in current thread.
411#
412# @param $async	boolean value to set $async_method flag
413#
414# @return		none
415#*/
416sub set_async { $async_method = $_[0]; }
417
4181;
419