1##
2# Copyright (C) 2014-2017 Pietro Cerutti <gahr@gahr.ch>
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions
6# are met:
7#
8# 1. Redistributions of source code must retain the above copyright
9#    notice, this list of conditions and the following disclaimer.
10#
11# 2. Redistributions in binary form must reproduce the above copyright
12#    notice, this list of conditions and the following disclaimer in the
13#    documentation and/or other materials provided with the distribution.
14#
15# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25# SUCH DAMAGE.
26#
27
28package require Tcl 8.6
29package require TclOO
30
31package provide retcl 0.4.0
32
33catch {retcl destroy}
34
35namespace eval ::retcl {
36
37    ##
38    # Default Redis endpoint address
39    variable defaultHost 127.0.0.1
40
41    ##
42    # Default Redis endpoint port
43    variable defaultPort 6379
44
45    ##
46    # Mapping of RESP data types to symbolic names.
47    # See http://redis.io/topics/protocol.
48    variable typeNames {
49        +   SimpleString
50        -   Error
51        :   Integer
52        $   BulkString
53        *   Array
54    }
55
56    ##
57    # Commands (outgoing) related to pub-sub
58    variable pubSubCommands [list psubscribe punsubscribe subscribe unsubscribe]
59
60    ##
61    # Messages (incoming) related to pub-sub
62    variable pubSubMessages [concat message pmessage $pubSubCommands]
63}
64
65oo::class create retcl {
66
67    ##
68    # Keep a cache of the commands sent and results received. Each command sent
69    # to the server is assigned a unique identifier, which can be then used by
70    # the user to retrieve the result (see [result] method).  The resultsCache
71    # variable is a dictionary where unique identifiers are the keys, with
72    # further keys for a possible callback, status, type, and response.  A
73    # callback is defined by the -cb cmdPrefix argument to any command and is
74    # invoked whenever a result is made available. Status is either 0 (not
75    # replied) or 1 (replied). Type is one of the values of the typeNames list.
76    # New commands are appended at the tail of the list; responses are inserted
77    # at the first command with a status 0.
78    #
79    # rds:1 {
80    #    status   (0|1)
81    #    callback (cmdPrefix)
82    #    type     (SimpleString|...)
83    #    response (RESPONSE)
84    # }
85    variable resultsCache
86
87    ##
88    # Read buffer. This is appended to incrementally to handle partial reads
89    # from the server.
90    variable readBuf
91
92    ##
93    # Boolean to indicate whether a result should be kept in the results cache
94    # indefinitely or automatically removed as soon as it's retrieved by the
95    # client(0) (see +keepCache and -keepCache methods)
96    variable keepCache
97
98    ##
99    # Boolean to indicate whether commands are to be sent out in an
100    # asynchronous way (see +async and -async methods).
101    variable async
102
103    ##
104    # An incremental integer to track requests / responses.
105    variable cmdIdNumber
106
107    ##
108    # The Redis server host and port, as used in the constructor or in the
109    # connect method.
110    variable host
111    variable port
112
113    ##
114    # The socket used to connect to the server
115    variable sock
116
117    ##
118    # A dictionary with channels / patterns subscribed to and callbacks.
119    # Callbacks are scripts that will be evaluated at the global level.
120    variable callbacks
121
122    ##
123    # Command prefix to be invoked whenever an error occurs. Defaults to
124    # [error].
125    variable errorCallback
126
127    ##
128    # A list of commands inside a pipeline.
129    variable pipeline
130    variable isPipelined
131
132    ##
133    # After Id for the period event that checks whether the connection is still
134    # valid.
135    variable checkEventId
136
137    ##
138    # After Id for the automatic reconnection.
139    variable reconnectEventId
140
141    ##
142    # Simple sentinel that's set whenever there's activity
143    variable activity
144
145    ##
146    # Constructor -- connect to a Retcl server.
147    constructor {args} {
148        set resultsCache [dict create]
149        set keepCache 1
150        set async 1
151        set cmdIdNumber 0
152        set host $::retcl::defaultHost
153        set port $::retcl::defaultPort
154        set sock {}
155        set callbacks [dict create]
156        set errorCallback error
157        set pipeline {}
158        set isPipelined 0
159        set checkEventId {}
160        set reconnectEventId {}
161        set activity 0
162
163        switch [llength $args] {
164            0 {
165                # connect to default host and port
166                my connect
167            }
168            1 {
169                if {[lindex $args 0] eq {-noconnect}} {
170                    # disconnected mode - nothing to do
171                } else {
172                    my Error "bad option \"[lindex $args 0]\": must be -noconnect"
173                }
174            }
175            2 {
176                lassign $args host port
177                my connect
178            }
179            default {
180                my Error "wrong # args: must be \"?host port?\" or \"-noconnect\""
181            }
182        }
183    }
184
185    ##
186    # Destructor -- disconnect.
187    destructor {
188        my disconnect
189    }
190
191    ##
192    # Connect to a Redis server.
193    method connect {{a_host {}} {a_port {}}} {
194        if {$sock ne {}} {
195            my Error "Already connected"
196        }
197
198        if {$a_host ne {}} {
199            set host $a_host
200        }
201        if {$a_port ne {}} {
202            set port $a_port
203        }
204
205        if {[catch {socket $host $port} res]} {
206            my Error "Cannot connect: $res"
207        }
208
209        set sock $res
210        chan configure $sock -blocking 0 -translation binary
211        chan event $sock readable [list [self object] readEvent]
212        set checkEventId [after 500 [list [self object] checkConnection]]
213        return {}
214    }
215
216    ##
217    # Reconnect to the Redis server. This tries to reconnect waiting up to 10
218    # seconds in total.
219    method reconnect {{i 0}} {
220        my disconnect
221        set maxAttempts 20
222        set waitMillis 500
223        if {$i == $maxAttempts} {
224            my Error {Could not reconnect to Redis server}
225        }
226        set saveErrorCallback $errorCallback
227        my errorHandler {}
228        set err [catch {my connect $host $port} msg]
229        my errorHandler $saveErrorCallback
230        if {$err} {
231            set reconnectEventId \
232                [after $waitMillis [list [self object] reconnect [incr i]]]
233        }
234        return {}
235    }
236
237    ##
238    # Periodically check whether a connection has been interrupted.
239    method checkConnection {} {
240        if {$sock ne {} && ![catch {chan eof $sock} err] && !$err} {
241            set checkEventId [after 500 [list [self object] checkConnection]]
242        } else {
243            my disconnect
244            set activity 1
245        }
246    }
247
248    ##
249    # Check whether we're currently connected to a Retcl server.
250    method connected {} {
251        expr {$sock ne {}}
252    }
253
254    ##
255    # Disconnect from the Redis server.
256    method disconnect {} {
257        catch {close $sock}
258        set sock {}
259        after cancel $checkEventId
260        after cancel $reconnectEventId
261    }
262
263    ##
264    # Turn on asynchronous operation
265    method +async {} {
266        set async 1
267    }
268    export +async
269
270    ##
271    # Turn off asynchronous operation.
272    method -async {} {
273        set async 0
274    }
275    export -async
276
277    ##
278    # Query the currenct asynchronous operation mode.
279    method ?async {} {
280        set async
281    }
282    export ?async
283
284    ##
285    # Turn on keeping results in the cache.
286    method +keepCache {} {
287        set keepCache 1
288    }
289    export +keepCache
290
291    ##
292    # Turn off keeping results in the cache.
293    method -keepCache {} {
294        set keepCache 0
295    }
296    export -keepCache
297
298    ##
299    # Query the current cache keeping mode.
300    method ?keepCache {} {
301        set keepCache
302    }
303    export ?keepCache
304
305    ##
306    # Setup and error callback or restore the default one ([error]). The
307    # cmdPrefix is passed an additional argument containing the error message.
308    method errorHandler {{cmdPrefix {}}} {
309        if {$cmdPrefix eq {}} {
310            set errorCallback error
311        } else {
312            set errorCallback $cmdPrefix
313        }
314    }
315
316    ##
317    # Get the result of a previously issued command. If the response has not
318    # yet arrived, the command waits until it's available, or returns the empty
319    # string if -async is given.
320    method result {args} {
321
322        switch [llength $args] {
323            1 {
324                set asyncArg 0
325                set cmdId $args
326            }
327            2 {
328                if {[lindex $args 0] ne {-async}} {
329                    my Error {wrong # args: should be "result ?-async? cmdId"}
330                }
331                set asyncArg 1
332                set cmdId [lindex $args 1]
333            }
334            default {
335                my Error {wrong # args: should be "result ?-async? cmdId"}
336            }
337        }
338
339        if {![dict exists $resultsCache $cmdId]} {
340            my Error "Invalid command id: $cmdId"
341        }
342
343        while {1} {
344            if {[dict get $resultsCache $cmdId status] == 1} {
345                set res [dict get $resultsCache $cmdId response]
346                if {!$keepCache} {
347                    dict unset resultsCache $cmdId
348                }
349                return $res
350            }
351
352            if {$asyncArg} {
353                return {}
354            }
355
356            vwait [self namespace]::activity
357
358            if {![my connected]} {
359                my Error {Disconnected}
360            }
361        }
362    }
363
364    ##
365    # Check whether a result is ready for retrieval.
366    method resultReady {cmdId} {
367        if {![dict exists $resultsCache $cmdId]} {
368            my Error "Invalid command id: $cmdId"
369        }
370        dict get $resultsCache $cmdId status
371    }
372
373    ##
374    # Retrieve the type of a result, or the empty string if the result is not
375    # ready.
376    method resultType {cmdId} {
377        if {[catch {dict get $resultsCache $cmdId type} res]} {
378            set res {}
379        }
380        set res
381    }
382
383    ##
384    # Return a dictionary of the reuslts in form of cmdId => result.
385    method allResults {} {
386
387        set res [dict create]
388
389        dict for {cmdId state} $resultsCache {
390            dict with state {
391                if {$status == 1} {
392                    dict set res $cmdId $response
393                    dict set res $cmdId:type $type
394                }
395            }
396        }
397
398        return $res
399    }
400
401    ##
402    # Clear results from the cache.
403    method clearResult {{clearCmdId {}}} {
404
405        if {$clearCmdId eq {}} {
406            set resultsCache [dict filter $resultsCache script {cmdId _} {
407                expr {![dict get $resultsCache $cmdId status]}
408            }]
409        } else {
410            set resultsCache [dict filter $resultsCache script {cmdId _} {
411                expr {$clearCmdId ne $cmdId ||
412                      ![dict get $resultsCache $cmdId status]}
413            }]
414        }
415        return {}
416    }
417
418    ##
419    # Execute all Redis commands inside a script within a single pipeline.
420    method pipeline {script} {
421
422        my LockPipeline
423        try {
424            uplevel [list eval $script]
425        } on error msg {
426            my Error $msg
427        } finally {
428            my ReleasePipeline
429        }
430    }
431
432    ##
433    # Set a callback to be called when a message is pushed from the server
434    # because of a PUBLISH command issued by some other client. Item can be a
435    # channel (see SUBSCRIBE) or a pattern (see PSUBSCRIBE). An empty callback
436    # removes the callback previously set on the same item, if any. This
437    # method returns the previously set callback, if any.  This method does
438    # not automatically send a (P)SUBSCRIBE message to the Redis server.
439    #
440    # See http://redis.io/topics/pubsub.
441    method callback {item {callback {}}} {
442        try {
443            dict get $callbacks $item
444        } on error {} {
445            set prev {}
446        } on ok prev {}
447
448        if {$callback eq {}} {
449            dict unset callbacks $item
450        } else {
451            dict set callbacks $item $callback
452        }
453
454        return $prev
455    }
456
457    ##
458    # The unknown handler handles unknown methods as Redis commands
459    method unknown {args} {
460
461        if {![llength $args]} {
462            return
463        }
464
465        set sendAsync $async
466        set callback {}
467
468        switch [lindex $args 0] {
469            {-sync} {
470                # Send synchronously and return the result, when available
471                set sendAsync 0
472                set args [lrange $args 1 end]
473            }
474            {-cb} {
475                # Be notified via a callback
476                set callback [lindex $args 1]
477                set args [lrange $args 2 end]
478            }
479        }
480
481        if {![llength $args]} {
482            return
483        }
484
485        if {[string tolower [lindex $args 0]] in $::retcl::pubSubCommands} {
486            # These messages are part of the Pub/Sub protocol; we don't expect
487            # a response.
488            set cmdId {}
489        } else {
490            set cmdId "rds:[incr cmdIdNumber]"
491            dict set resultsCache $cmdId status 0
492            dict set resultsCache $cmdId callback $callback
493        }
494
495        my Send $args
496
497        if {$sendAsync || $callback ne {} || $cmdId eq {}} {
498            # Asynchronous send, return the command identifier
499            return $cmdId
500        } else {
501            # Synchronous send, wait for the result and return it
502            set res [my result $cmdId]
503            my clearResult $cmdId
504            return $res
505        }
506    }
507
508    ##########################################################################
509    # The following methods are private to the retcl library and not intended
510    # to be used by consumers.
511    ##########################################################################
512
513    ##
514    # Handle a read event from the socket.
515    #
516    # Must be public (starts with a lower case letter) because it's used in the
517    # event loop.
518    method readEvent {} {
519        set activity 1
520        if {[chan eof $sock]} {
521            my disconnect
522            return
523        }
524
525        append readBuf [read $sock]
526
527        set idx 0
528        while {$idx < [string length $readBuf]} {
529            set result [my ParseBuf $readBuf $idx]
530            if {$result eq {}} {
531                break
532            }
533
534            lassign $result idx type data
535            my HandleResult $type $data
536        }
537
538        if {$idx != 0} {
539            set readBuf [string range $readBuf $idx end]
540        }
541    }
542
543    ##
544    # Parse the read buffer. starting at index startIdx. Returns a list
545    # consisting of:
546    #
547    # idx   : index up to which the buffer has been parsed
548    # type  : type of the object found
549    # value : value of the object
550    #
551    # or the empty string if no complete object could be parsed.
552    method ParseBuf {buffer startIdx} {
553
554        if {![string length $buffer]} {
555            return
556        }
557
558        set respCode [string index $buffer $startIdx]
559        set respType [my TypeName $respCode]
560
561        switch -- $respCode {
562
563            "+" -
564            "-" -
565            ":" {
566                # Simple Strings, Errors, and Integers are handled
567                # straight forward
568                lassign [my ParseLine $buffer $startIdx+1] eol line
569                if {$eol == -1} {
570                    return
571                }
572                return [list [expr {$eol+2}] $respType $line]
573            }
574
575            "$" {
576                # Bulk Strings, the number of characters is specified in the
577                # first line. We handle Null values and empty strings right
578                # away.
579                lassign [my ParseLine $buffer $startIdx+1] eol bulkLen
580                if {$eol == -1} {
581                    return
582                }
583
584                # Null Bulk String
585                if {$bulkLen eq {-1}} {
586                    return [list [expr {$eol+2}] $respType (nil)]
587                }
588
589                # Empty Bulk String
590                if {$bulkLen eq {0}} {
591                    return [list [expr {$eol+4}] $respType {}]
592                }
593
594                # Non-empty Bulk String
595                incr eol 2
596                set endIdx [expr {$eol+$bulkLen-1}]
597                if {[string length $buffer] < [expr {$endIdx+2}]} {
598                    # Need to wait for more input
599                    return
600                }
601                return [list [expr {$endIdx+3}] $respType [string range $buffer $eol $endIdx]]
602            }
603
604            "*" {
605                # Arrays, the number of elements is specified in the first
606                # line.
607                lassign [my ParseLine $buffer $startIdx+1] eol arrLen
608                if {$eol == -1} {
609                    return
610                }
611
612                # Null Array
613                if {$arrLen eq {-1}} {
614                    return [list [expr {$eol+2}] $respType (nil)]
615                }
616
617                # Empty array
618                if {$arrLen eq {0}} {
619                    return [list [expr {$eol+2}] $respType {}]
620                }
621
622                # Non-empty Array
623                set idx [expr {$eol+2}]
624                set elems [list]
625                while {$arrLen} {
626                    set elem [my ParseBuf $buffer $idx]
627                    if {$elem eq {}} {
628                        return {}
629                    }
630
631                    lappend elems [lindex $elem 2]
632                    set idx [lindex $elem 0]
633                    incr arrLen -1
634                }
635
636                return [list $idx $respType $elems]
637            }
638
639            default {
640                puts "Unhandled type: $buffer"
641            }
642        }
643    }
644
645    method ParseLine {buffer startIdx} {
646        set eol [string first "\r" $buffer $startIdx]
647        if {$eol == -1} {
648            return -1
649        }
650        set line [string range $buffer $startIdx $eol-1]
651        return [list $eol $line]
652    }
653
654    ##
655    # Handle a complete result read from the server.
656    method HandleResult {type body} {
657        # We have to handle two distinct cases:
658        # - a pushed message (can be message, subscribe, or ubsubscribe)
659        # - a command response
660        #
661        # The first case is handled by forwarding the message contents to a
662        # registered callback, if any exists. For message types we're done. For
663        # subscribe / unsubscribe types we also have to locate the
664        # corresponding request and clear it.
665        #
666        # The second case is handled by locating the corresponding request and
667        # filling in the result.
668
669        # If the response is a pushed message
670        if {$type eq {Array} && [lindex $body 0] in $::retcl::pubSubMessages} {
671            if {[lindex $body 0] eq {pmessage}} {
672                lassign $body type pattern item data
673            } else {
674                lassign $body type item data
675                set pattern $item
676            }
677            try {
678                dict get $callbacks $pattern
679            } on ok callback {
680                namespace eval :: $callback $type $pattern $item $data
681            } finally {
682                # It's a subscribe / unsubscribe. Clear the corresponding
683                # request, if any.
684                return
685            }
686        }
687
688        #
689        # If we get here, the response wasn't a pushed message
690        #
691
692        # Look for the first command without a result
693        set cmdIds [my FindPendingRequest]
694        if {$cmdIds eq {}} {
695            # All requests already have a response, something went bad
696            my Error "No request found for response $body"
697        }
698        set cmdId [lindex $cmdIds 0]
699        set cb [dict get $resultsCache $cmdId callback]
700        if {$cb ne {}} {
701            {*}$cb $cmdId $type $body
702            dict unset resultsCache $cmdId
703        } else {
704            dict set resultsCache $cmdId type $type
705            dict set resultsCache $cmdId response $body
706            dict set resultsCache $cmdId status 1
707        }
708    }
709
710    ##
711    # Get a return type string by its byte.
712    method TypeName {byte} {
713        if {[catch {dict get $::retcl::typeNames $byte} name]} {
714            my Error "Invalid type byte: $byte"
715        }
716        set name
717    }
718
719    ##
720    # Build the RESP representation of a command.
721    method BuildResp {args} {
722        set msg "*[llength $args]\r\n"
723        foreach word $args {
724            append msg "\$[string length $word]\r\n$word\r\n"
725        }
726        set msg
727    }
728
729    ##
730    # Send command(s) over to the Redis server. Each
731    # argument is a list of words composing the command.
732    method Send {args} {
733        if {[llength $args] > 1} {
734            my Error "Too many args: $args"
735        }
736        foreach cmd $args {
737            append pipeline "[my BuildResp {*}$cmd]\r\n"
738        }
739
740        if {!$isPipelined} {
741            my Flush
742        }
743    }
744
745    ##
746    # Return a list of responses from the server. A maximum number
747    # of results might be specified. This is mostly used internally
748    # to recursively call [my Recv] to receive Array elements.
749    method Recv {{includeTypes 1} {maxResults -1}} {
750        if {$maxResults == 0} {
751            return
752        }
753
754        set result [list]
755
756        while {[gets $sock line] > 0} {
757            set respCode [string index $line 0]
758            set respName [my TypeName $respCode]
759            set respData [string range $line 1 end]
760
761            switch $respCode {
762                + -
763                - -
764                : {
765                    # Simple Strings, Errors, and Integers are handled
766                    # straight forward
767                    if {$includeTypes} {
768                        lappend result [list $respName $respData]
769                    } else {
770                        lappend result $respData
771                    }
772                }
773                $ {
774                    # Bulk Strings, read the number of char specified in the
775                    # first line.
776                    # If it's -1, it's a (nil).
777                    if {$respData eq {-1}} {
778                        if {$includeTypes} {
779                            lappend result [list BulkString (nil)]
780                        } else {
781                            lappend result (nil)
782                        }
783                    } else {
784                        set bulk [read $sock $respData]
785                        if {$includeTypes} {
786                            lappend result [list $respName $bulk]
787                        } else {
788                            lappend result $bulk
789                        }
790                        gets $sock ;# consume the final end of line
791                    }
792                }
793                * {
794                    # Arrays, call [my Recv] recursively to get the number of
795                    # elements
796                    if {$includeTypes} {
797                        lappend result [list Array [my Recv 0 $respData]]
798                    } else {
799                        lappend result [my Recv 0 $respData]
800                    }
801                }
802            }
803
804            if {$maxResults != -1 && [incr maxResults -1] == 0} {
805                break
806            }
807        }
808
809        set result
810    }
811
812    ##
813    # Return a list of pending command ids.
814    method FindPendingRequest {} {
815        set allPending [list]
816
817        foreach cmdId [dict keys $resultsCache] {
818            if {[dict get $resultsCache $cmdId status] == 0} {
819                lappend allPending $cmdId
820            }
821        }
822
823        set allPending
824    }
825
826    ##
827    # Lock the pipeline. Redis commands are buffered and only sent to the
828    # server when ReleasePipeline is called.
829    method LockPipeline {} {
830        if {$isPipelined} {
831            my Error "Cannot nest pipelines"
832        }
833        set isPipelined 1
834    }
835
836    ##
837    # Release a pipeline and flush all buffered commands.
838    method ReleasePipeline {} {
839        if {!$isPipelined} {
840            my Error "No pipeline to release"
841        }
842        my Flush
843    }
844
845    ##
846    # Flush the output buffer
847    method Flush {} {
848        if {![my connected]} {
849            my Error {Disconnected}
850        }
851        if {[catch {puts -nonewline $sock $pipeline} err]} {
852            my Error $err
853        }
854        chan flush $sock
855
856        set isPipelined 0
857        set pipeline [list]
858    }
859
860    ##
861    # Error handler.
862    method Error {msg} {
863        {*}$errorCallback $msg
864        return -level 2
865    }
866}
867
868# vim: set ft=tcl ts=4 expandtab:
869