1package Continuity::Adapt::PSGI;
2
3=head1 NAME
4
5Continuity::Adapt::PSGI - PSGI backend for Continuity
6
7=head1 SYNOPSIS
8
9  # Run with on of these:
10  #   corona demo.pl
11  #   twiggy demo.pl
12  #   ./myapp.pl # Will try to fall back to HttpDaemon ;)
13
14  # "Twiggy is a lightweight and fast HTTP server"
15  # "Corona is a Coro based Plack web server. It uses Net::Server::Coro under the hood"
16
17  use Continuity;
18
19  my $server = Continuity->new;
20
21  sub main {
22    my $request = shift;
23    my $i = 0;
24    while(++$i) {
25      $request->print("Hello number $i!");
26      $request->next;
27    }
28  }
29
30  # This is actually returning a subref to PSI/Plack
31  # So put it at the end
32  $server->loop;
33
34=cut
35
36use strict;
37use warnings;
38
39use Continuity::Request;
40use base 'Continuity::Request';
41
42use Coro;
43use Coro::Channel;
44use Plack;
45use Plack::App::File; # use this now; no surprises for later
46
47warn "tested against Plack 0.9938; you have $Plack::VERSION" if $Plack::VERSION < 0.9938;
48
49sub debug_level { exists $_[1] ? $_[0]->{debug_level} = $_[1] : $_[0]->{debug_level} }
50
51sub debug_callback { exists $_[1] ? $_[0]->{debug_callback} = $_[1] : $_[0]->{debug_callback} }
52
53sub docroot { exists $_[1] ? $_[0]->{docroot} = $_[1] : $_[0]->{docroot} }
54
55sub new {
56  my $class = shift;
57  bless {
58    first_request => 1,
59    debug_level => 1,
60    debug_callback => sub { print STDERR "@_\n" },
61    request_queue => Coro::Channel->new(),
62    @_
63  }, $class;
64}
65
66sub get_request {
67  # called from Continuity's main loop (new calls start_request_loop; start_request_loop gets requests from here or wherever and passes them to the mapper)
68  my ($self) = @_;
69  my $request = $self->{request_queue}->get or die;
70  return $request;
71}
72
73sub loop_hook {
74
75  my $self = shift;
76
77  # $server->loop calls this; plackup run .psgi files except a coderef as the
78  # last value and this lets that coderef fall out of the call to
79  # $server->loop.
80
81  # unique to the PSGI adapter -- a coderef that gets invoked when a request
82  # comes in
83
84  my $app = sub {
85    my $env = shift;
86
87    unless ($env->{'psgi.streaming'}) {
88      die 'This application needs psgi.streaming support!';
89    }
90
91    # stuff $env onto a queue that get_request above pulls from; get_request is
92    # called from Continuity's main execution context/loop. Continuity's main
93    # execution loop invokes the Mapper to send the request across a queue to
94    # the per session execution context (creating a new one as needed).
95
96    return sub {
97      my $response = shift;
98
99      async {
100        local $Coro::current->{desc} = 'PSGI Response Maker';
101
102        # make it now and send it through the queue fully formed
103        my $request = Continuity::Adapt::PSGI::Request->new( $env, $response );
104        $self->{request_queue}->put($request);
105
106        # Now... we wait!
107        $request->{response_done_watcher}->wait;
108      };
109    };
110  };
111
112  # Is this needed?
113  Coro::cede();
114
115  return $app;
116}
117
118=head2 C<< $adapter->map_path($path) >>
119
120Decodes URL-encoding in the path and attempts to guard against malice.
121Returns the processed filesystem path.
122
123=cut
124
125sub map_path {
126  my $self = shift;
127  my $path = shift() || '';
128  my $docroot = $self->docroot || '';
129  # my $docroot = Cwd::getcwd();
130  # $docroot .= '/' if $docroot and $docroot ne '.' and $docroot !~ m{/$};
131  # some massaging, also makes it more secure
132  $path =~ s/%([0-9a-fA-F][0-9a-fA-F])/chr hex $1/ge;
133  $path =~ s%//+%/%g unless $docroot;
134  $path =~ s%/\.(?=/|$)%%g;
135  $path =~ s%/[^/]+/\.\.(?=/|$)%%g;
136
137  # if($path =~ m%^/?\.\.(?=/|$)%) then bad
138
139$self->Continuity::debug(2,"path: $docroot$path\n");
140
141  return "$docroot$path";
142}
143
144
145sub send_static {
146  my ($self, $r) = @_;
147
148  # this is called from Continuity.pm to give a request back to us to deal with that it got from our get_request.
149  # rather than sending it to the mapper to get sent to the per-user execution context, it gets returned straight back here.
150  # $r is an instance of Continuity::Adapt::PSGI::Request
151
152  my $url_path = $r->url_path;
153
154  $url_path =~ s{\?.*}{};
155  my $path = $self->map_path($url_path) or do {
156       $self->Continuity::debug(1, "can't map path: " . $url_path);
157       # die; # XXX don't die except in debugging
158      ( $r->{response_code}, $r->{response_headers}, $r->{response_content} ) = ( 404, [], [ "Static file not found" ] );
159      $r->{response_done_watcher}->send;
160      return;
161  };
162
163  my $stuff = Plack::App::File->serve_path({},$path);
164
165  ( $r->{response_code}, $r->{response_headers}, $r->{response_content} ) = @$stuff;
166  $r->response->(
167    [ $r->response_code, $r->response_headers, $r->response_content ]
168  );
169  $r->{response_done_watcher}->send;
170
171}
172
173#
174#
175#
176
177package Continuity::Adapt::PSGI::Request;
178
179use Coro::Signal;
180use Coro::AnyEvent;
181
182# List of cookies to send
183sub cookies { exists $_[1] ? $_[0]->{cookies} = $_[1] : $_[0]->{cookies} }
184
185# Flag, never send type
186sub no_content_type { exists $_[1] ? $_[0]->{no_content_type} = $_[1] : $_[0]->{no_content_type} }
187
188# CGI query params
189sub cached_params { exists $_[1] ? $_[0]->{cached_params} = $_[1] : $_[0]->{cached_params} }
190
191# The writer is kinda like our connection
192sub writer { exists $_[1] ? $_[0]->{writer} = $_[1] : $_[0]->{writer} }
193sub response { exists $_[1] ? $_[0]->{response} = $_[1] : $_[0]->{response} }
194
195sub response_code { exists $_[1] ? $_[0]->{response_code} = $_[1] : $_[0]->{response_code} }
196sub response_headers { exists $_[1] ? $_[0]->{response_headers} = $_[1] : $_[0]->{response_headers} }
197sub response_content { exists $_[1] ? $_[0]->{response_content} = $_[1] : $_[0]->{response_content} }
198
199sub debug_level { exists $_[1] ? $_[0]->{debug_level} = $_[1] : $_[0]->{debug_level} }
200
201sub debug_callback { exists $_[1] ? $_[0]->{debug_callback} = $_[1] : $_[0]->{debug_callback} }
202
203sub new {
204  my ($class, $env, $response) = @_;
205  my $self = {
206    response_code => 200,
207    response_headers => [],
208    response_content => [],
209    response_done_watcher => Coro::Signal->new,
210    response => $response,
211    debug_level => 3,
212    debug_callback => sub { print STDERR "@_\n" },
213    %$env
214  };
215  bless $self, $class;
216  return $self;
217}
218
219sub param {
220    my $self = shift;
221    my $env = { %$self };
222    unless($self->cached_params) {
223      use Plack::Request;
224      my $req = Plack::Request->new($env);
225      $self->cached_params( [ %{$req->parameters} ] );
226    };
227    my @params = @{ $self->cached_params };
228    if(@_) {
229        my @values;
230        while(@_) {
231          my $param = shift;
232          for(my $i = 0; $i < @params; $i += 2) {
233              push @values, $params[$i+1] if $params[$i] eq $param;
234          }
235        }
236        return unless @values;
237        return wantarray ? @values : $values[0];
238    } else {
239        return @{$self->cached_params};
240    }
241}
242
243sub params {
244    my $self = shift;
245    $self->param;
246    return @{$self->cached_params};
247}
248
249sub method {
250  my ($self) = @_;
251  return $self->{REQUEST_METHOD};
252}
253
254sub url {
255  my ($self) = @_;
256  return $self->{'psgi.url_scheme'} . '://' . $self->{HTTP_HOST} . $self->{PATH_INFO};
257}
258
259sub url_path {
260  my ($self) = @_;
261  return $self->{PATH_INFO};
262}
263
264sub uri {
265  my $self = shift;
266  return $self->url(@_);
267}
268
269sub set_cookie {
270    my $self = shift;
271    my $cookie = shift;
272    # record cookies and then send them the next time send_basic_header() is called and a header is sent.
273    #$self->{Cookie} = $self->{Cookie} . "Set-Cookie: $cookie";
274    push @{ $self->{response_headers} }, "Set-Cookie" => "$cookie";
275}
276
277sub get_cookie {
278    my $self = shift;
279    my $cookie_name = shift;
280    my ($cookie) =  map $_->[1],
281      grep $_->[0] eq $cookie_name,
282      map [ m/(.*?)=(.*)/ ],
283      split /; */,
284      $self->{HTTP_COOKIE} || '';
285    return $cookie;
286}
287
288sub immediate { }
289
290sub send_basic_header {
291    my $self = shift;
292    my $cookies = $self->cookies;
293    $self->cookies('');
294
295    unless($self->no_content_type) {
296      push @{ $self->{response_headers} },
297           "Cache-Control" => "private, no-store, no-cache",
298           "Pragma" => "no-cache",
299           "Expires" => "0",
300           "Content-type" => "text/html",
301      ;
302    }
303
304    my $writer = $self->response->(
305      [ $self->response_code, $self->response_headers ]
306    );
307
308    $self->writer( $writer );
309}
310
311sub print {
312  my $self = shift;
313
314  eval {
315    $self->writer->write( @_ );
316  };
317
318  # This is a good time to let other stuff run
319  Coro::AnyEvent::idle();
320
321  return $self;
322}
323
324sub end_request {
325  my $self = shift;
326
327  # Tell our writer that we're done
328  $self->writer->close if $self->writer;
329
330  # Signal that we are done building our response
331  $self->{response_done_watcher}->send;
332}
333
3341;
335