1#/** 2# Client proxy for apartment threaded objects. 3# <p> 4# Licensed under the Academic Free License version 2.1, as specified in the 5# License.txt file included in this software package, or at 6# <a href="http://www.opensource.org/licenses/afl-2.1.php">OpenSource.org</a>. 7# 8# @author D. Arnold 9# @since 2005-12-01 10# @self $self 11#*/ 12package Thread::Apartment::Client; 13 14use Carp; 15use threads; 16use threads::shared; 17use Thread::Queue::Queueable; 18use Thread::Queue::TQDContainer; 19use Thread::Apartment; 20use Thread::Apartment::Common; 21 22use Thread::Apartment::Common qw(:ta_method_flags); 23 24use base qw(Thread::Queue::Queueable Thread::Queue::TQDContainer Thread::Apartment::Common); 25 26our $AUTOLOAD; 27 28use strict; 29use warnings; 30 31our $VERSION = '0.51'; 32 33use constant TAC_CLASS_LEN => 27; 34use constant TAC_REENT_LEN => 13; 35use constant TAC_ASYNC_LEN => 9; 36 37our $async_method; # set by T::A::start() 38 39sub CLONE { $async_method = undef; } 40 41#/** 42# Constructor. Creates a threads::shared hash to contain the proxy 43# information, so it can be readily passed between threads. 44# 45# @param $proxied_class the class of the object to be proxied 46# @param $tqd TQD communications channel to proxied object 47# @param $id unique object ID for proxied object 48# @param $isa arrayref of object's class hierarchy 49# @param $methods hashref mapping exported method names to behavior flags 50# @param $timeout TQD timeout seconds 51# @param $tid thread ID of the apartment thread for the proxied object 52# 53# @return Thread::Apartment::Client object 54#*/ 55sub new { 56 my ($class, $proxied_class, $tqd, $id, $isa, $methods, $timeout, $tid)= @_; 57# 58# create isa, can as shared so we can curse/redeem them easily 59# 60 my @isa : shared = ( @$isa, 'ta_invoke_closure' ); 61 my %can : shared = ( %$methods ); 62 63 my %self : shared = ( 64 _class => $proxied_class, # class we're proxying 65 _id => $id, # object unique ID (for object hierarchies) 66 _tqd => $tqd, # our comm. channel 67 _isa => \@isa, # classes in hierarchy of proxied object 68 _can => \%can, # exported methods of proxied object 69 _timeout => $timeout, # TQD timeout 70 _server_tid => $tid, # tid of apartment thread 71 ); 72 bless \%self, $class; 73# 74# if we have the $method, then we should proceed to 75# install all the exported methods into our object 76# 77 return \%self; 78} 79#/** 80# Overload UNIVERSAL::isa() to test the class hierarchy of the proxied object. 81# 82# @param $class class to check if implemented by the proxied object 83# 84# @return 1 if the proxied object implements $class; undef otherwise 85#*/ 86sub isa { 87 my ($self, $class) = @_; 88# 89# catch stuff we need to expose for queueing purposes 90# 91 return 1 92 if (($class eq 'Thread::Queue::Queueable') || 93 ($class eq 'Thread::Queue::TQDContainer') || 94 ($class eq 'Thread::Apartment::Client')); 95 foreach (@{$self->{_isa}}) { 96 return 1 if ($_ eq $class); 97 } 98 return undef; 99} 100 101#/** 102# Overload UNIVERSAL::can() to test the available methods of the proxied object. 103# 104# @param $method method to check if implemented by the proxied object 105# 106# @return if the proxied object exports $method (or exports AUTOLOAD), 107# a closure forcing an AUTOLOAD of the specified $method; undef otherwise 108#*/ 109sub can { 110 my ($self, $method) = @_; 111# 112# we may need to trap the methods for TQQ here... 113# NOTE!!! Need to return a coderef here!!! 114# 115 return ((exists $self->{_can}{$method}) || 116 (exists $self->{_can}{'*'}) || 117 (exists $self->{_can}{AUTOLOAD})) ? 118 sub { $AUTOLOAD = $method; return $self->AUTOLOAD(@_); } : 119 undef; 120} 121#/** 122# Test if the specified method is exported as simplex 123# 124# @param $method method to test for simplex behavior 125# 126# @return 1 if $method is exported and is simplex; undef otherwise 127#*/ 128sub ta_is_simplex { 129 return (exists $_[0]->{_can}{$_[1]} ? 130 ($_[0]->{_can}{$_[1]} & TA_SIMPLEX) : undef); 131} 132 133#/** 134# Test if the specified method is exported as urgent 135# 136# @param $method method to test for urgent behavior 137# 138# @return 1 if $method is exported and is urgent; undef otherwise 139#*/ 140sub ta_is_urgent { 141 return (exists $_[0]->{_can}{$_[1]} ? 142 ($_[0]->{_can}{$_[1]} & TA_URGENT) : undef); 143} 144 145#/** 146# Set debug level. When set to a "true" value, causes the TAC to emit 147# diagnostic information. 148# 149# @param $level debug level. zero or undef turns off debugging; all other values enable debugging 150# 151# @return the new level 152#*/ 153sub tac_debug { $_[0]->{_tac_debug} = $_[1]; } 154 155sub AUTOLOAD { 156# 157# called in client stub 158# passes method name 159# if starts w/ ta_async_, then return immediately 160# if starts w/ ta_reentrant_, or local thread's T::A::is_reentrant 161# is true, interleave local thread inbound calls 162# while waiting for method results 163# NOTE: use explicit substr() instead of regex for performance 164# 165 my $self = shift; 166 167 my $method = $AUTOLOAD; 168 169 print STDERR "TAC::AUTOLOAD: Method is $method\n" 170 if (substr($method, -9) ne '::DESTROY') && $self->{_tac_debug}; 171 172 $async_method = undef, 173 return 1 174 if (substr($method, -9) eq '::DESTROY'); 175# 176# get rid of leading stuff 177# 178#warn "requested method $method\n"; 179 $method = substr($method, TAC_CLASS_LEN) 180 if (substr($method, 0, TAC_CLASS_LEN) eq 'Thread::Apartment::Client::'); 181 182 my $tid = threads->self()->tid(); 183 184 my $async; 185 my $reentrant = Thread::Apartment::get_reentrancy(); 186 if (substr($method, 0, TAC_ASYNC_LEN) eq 'ta_async_') { 187 $method = substr($method, TAC_ASYNC_LEN); 188 $@ = "No response closure supplied for async method $method.", 189 $async_method = undef, 190 return undef 191 unless $_[0] && (ref $_[0]) && (ref $_[0] eq 'CODE'); 192 193 $async = 1; 194# $method = defined($1) ? "$1$2" : $2; 195 } 196 elsif (substr($method, 0, TAC_REENT_LEN) eq 'ta_reentrant_') { 197 $reentrant = 1; 198 $method = substr($method, TAC_REENT_LEN); 199# print STDERR "Got re-entrant call to $method\n"; 200 } 201 202 unless (($method eq 'ta_invoke_closure') || 203 (exists $self->{_can}{$method}) || 204 (exists $self->{_can}{'AUTOLOAD'})) { 205 $@ = "Can't locate method \"$method\" via package \"$self->{_class}\""; 206 print STDERR $@, "\n" 207 if $self->{_tac_debug}; 208 $async_method = undef; 209 return undef; 210 } 211# print STDERR "Client objid is $self->{_id}\n" 212# if exists $self->{_can}{'AUTOLOAD'}; 213# 214# support simplex/urgent specification 215# 216 my $flag = $self->{_can}{$method} || 0; 217# 218# including for closures 219# check for default closure call behaviors; 220# note that these are cumulative 221# 222#print "Simplex is ", TA_SIMPLEX, " urgent is ", TA_URGENT, "\n"; 223 $_[1] |= Thread::Apartment::get_closure_behavior(), 224 $flag = ($_[1] & (TA_SIMPLEX | TA_URGENT)) 225 if ($method eq 'ta_invoke_closure'); 226 227 my @params = ($async && (!$async_method)) ? 228 ('ta_async', $self->{_id}, wantarray, $method) : 229 ($method, $self->{_id}, wantarray); 230# 231# marshal params 232# (assume the TAS implementation has a complementary unmarshal) 233# 234# print join(', ', @_), "\n" 235# if (scalar @_) && ($method eq 'ta_invoke_closure'); 236 push @params, $self->marshal(@_) 237 if scalar @_; 238 239 my $tqd = $self->{_tqd}; # perf opt. 240 my $timeout = $self->{_timeout}; # perf opt. 241# 242# don't support start()/rendezvous() for simplex 243# 244 $async_method = undef, 245 return (($flag & TA_URGENT) ? 246 $tqd->enqueue_simplex_urgent(@params) : 247 $tqd->enqueue_simplex(@params)) 248 if ($flag & TA_SIMPLEX); 249 250#print STDERR "calling getCase in $tid\n" if ($method eq 'getCase'); 251 252#print STDERR "calling $method with ", join(', ', @params), "\n" 253# if $async; 254 my $id = ($flag & TA_URGENT) ? 255 $tqd->enqueue_urgent(@params) : 256 $tqd->enqueue(@params); 257# 258# NOTE: we don't support ta_async_ w/ start()/rendezvous() 259# 260 $async_method = undef, 261# print STDERR "Called async method $method\n" and 262 return $id 263 if $async; 264 265 Thread::Apartment->map_async_request_id($async_method, $self, $id), 266 $async_method = undef, 267 return $id 268 if $async_method; 269 270#print STDERR "called getCase in $tid\n" if ($method eq 'getCase'); 271# 272# if reentrant, attempt to service inbound calls to the caller 273# while we wait for the response... 274# note that the return value doesn't matter, since the subsequent 275# wait()'s will retrieve any pending response if the caller is 276# a TAS, or will just do the usual wait() thing if the caller 277# isn't TAS (i.e., run_wait returns undef) 278# 279 if ($reentrant) { 280# print STDERR "Calling T::A::run_wait in $tid for id $id at ", time(), "\n"; 281 282# print STDERR "Returned from T::A::run_wait in $tid for timed out\n" and 283 return undef 284 unless Thread::Apartment::run_wait($tqd, $id, $timeout); 285# print STDERR "Returned from T::A::run_wait in $tid for id $id at ", time(), "\n"; 286 } 287 288#print STDERR "waiting for getCase in $tid\n" if ($method eq 'getCase'); 289 290 my $resp = $timeout ? 291 $tqd->wait_until($id, $timeout) : 292 $tqd->wait($id); 293 294#print STDERR "getCase returned in $tid\n" if ($method eq 'getCase'); 295 296# warn "\nwait failed: $@\n" and 297 return undef 298 unless $resp; 299 300 unless (defined($resp->[0])) { 301 $@ = $resp->[1]; 302 print STDERR $@, "\n" 303 if $self->{_tac_debug}; 304 return undef; 305 } 306 307#warn "got response: $$results[0]\n"; 308# shift @$results; 309# 310# unmarshal results 311# 312 my $results = $self->unmarshal($resp->[0]); 313 314 return wantarray ? @$results : defined(wantarray) ? $results->[0] : 1; 315} 316 317#/** 318# Return current TQD timeout 319# 320# @return TQD timeout in seconds 321#*/ 322sub get_timeout { 323 return $_[0]->{_timeout}; 324} 325 326#/** 327# Return proxied class 328# 329# @return proxied class name string 330#*/ 331sub get_proxied_class { 332 return $_[0]->{_class}; 333} 334 335#/** 336# Set TQD timeout 337# 338# @param $timeout max. number of seconds to wait for TQD responses. 339# 340# @return previous timeout value 341#*/ 342sub set_timeout { 343 my $to = $_[0]->{_timeout}; 344 $_[0]->{_timeout} = $_[1]; 345 return $to; 346} 347#/** 348# Wait for the proxied object's apartment thread to exit. 349# 350# @return 1 351#*/ 352sub join { 353# 354# Don't know why, but unless we use the scalar TID 355# instead of deref'ing, object() just won't work ??? 356# 357 my $tid = $_[0]->{_server_tid}; 358 359#print STDERR "Joining $tid\n"; 360 my $thread = threads->object($tid); 361#print STDERR "Thread $tid not found\n" and 362 return 1 363 unless $thread; 364 $thread->join(); 365#print STDERR "Joined...$tid\n"; 366 return 1; 367} 368 369#/** 370# Stop the proxied object's apartment thread. 371#*/ 372sub stop { 373 $_[0]->{_tqd}->enqueue_simplex('STOP'); 374} 375#/** 376# TQQ redeem() override. Checks if the TAC has been passed into 377# the thread in which is was created, in which case it looks up 378# and returns the proxied object in the T::A object map. Otherwise, just 379# blesses the object back into a TAC. 380# 381# @param $class our TAC class 382# @param $obj the object to be redeem()ed 383# 384# @return if in the originating thread, the proxied object; else a 385# reblessed TAC. 386#*/ 387sub redeem { 388 my ($class, $obj) = @_; 389 390 bless $obj, $class; 391 return ($obj->{_server_tid} == threads->self()->tid()) ? 392 Thread::Apartment::get_object_by_id($obj->{_id}) : $obj; 393} 394#/** 395# Return results of a pending method/closure request. 396# Looks up the pending request ID in the current thread's T::A, 397# then waits for the completion of the request, unmarshals and returns 398# the results. 399# 400# @return results of the currently pending request (if any) 401#*/ 402sub get_pending_results { 403 my $self = shift; 404 my $id = Thread::Apartment->get_pending_request($self); 405 return undef unless $id; 406 my @results = $self->{_tqd}->wait($id); 407 return @results; 408} 409#/** 410# Set async method for next call in current thread. 411# 412# @param $async boolean value to set $async_method flag 413# 414# @return none 415#*/ 416sub set_async { $async_method = $_[0]; } 417 4181; 419