1## 2# Copyright (C) 2014-2017 Pietro Cerutti <gahr@gahr.ch> 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions 6# are met: 7# 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 11# 2. Redistributions in binary form must reproduce the above copyright 12# notice, this list of conditions and the following disclaimer in the 13# documentation and/or other materials provided with the distribution. 14# 15# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE 19# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25# SUCH DAMAGE. 26# 27 28package require Tcl 8.6 29package require TclOO 30 31package provide retcl 0.4.0 32 33catch {retcl destroy} 34 35namespace eval ::retcl { 36 37 ## 38 # Default Redis endpoint address 39 variable defaultHost 127.0.0.1 40 41 ## 42 # Default Redis endpoint port 43 variable defaultPort 6379 44 45 ## 46 # Mapping of RESP data types to symbolic names. 47 # See http://redis.io/topics/protocol. 48 variable typeNames { 49 + SimpleString 50 - Error 51 : Integer 52 $ BulkString 53 * Array 54 } 55 56 ## 57 # Commands (outgoing) related to pub-sub 58 variable pubSubCommands [list psubscribe punsubscribe subscribe unsubscribe] 59 60 ## 61 # Messages (incoming) related to pub-sub 62 variable pubSubMessages [concat message pmessage $pubSubCommands] 63} 64 65oo::class create retcl { 66 67 ## 68 # Keep a cache of the commands sent and results received. Each command sent 69 # to the server is assigned a unique identifier, which can be then used by 70 # the user to retrieve the result (see [result] method). The resultsCache 71 # variable is a dictionary where unique identifiers are the keys, with 72 # further keys for a possible callback, status, type, and response. A 73 # callback is defined by the -cb cmdPrefix argument to any command and is 74 # invoked whenever a result is made available. Status is either 0 (not 75 # replied) or 1 (replied). Type is one of the values of the typeNames list. 76 # New commands are appended at the tail of the list; responses are inserted 77 # at the first command with a status 0. 78 # 79 # rds:1 { 80 # status (0|1) 81 # callback (cmdPrefix) 82 # type (SimpleString|...) 83 # response (RESPONSE) 84 # } 85 variable resultsCache 86 87 ## 88 # Read buffer. This is appended to incrementally to handle partial reads 89 # from the server. 90 variable readBuf 91 92 ## 93 # Boolean to indicate whether a result should be kept in the results cache 94 # indefinitely or automatically removed as soon as it's retrieved by the 95 # client(0) (see +keepCache and -keepCache methods) 96 variable keepCache 97 98 ## 99 # Boolean to indicate whether commands are to be sent out in an 100 # asynchronous way (see +async and -async methods). 101 variable async 102 103 ## 104 # An incremental integer to track requests / responses. 105 variable cmdIdNumber 106 107 ## 108 # The Redis server host and port, as used in the constructor or in the 109 # connect method. 110 variable host 111 variable port 112 113 ## 114 # The socket used to connect to the server 115 variable sock 116 117 ## 118 # A dictionary with channels / patterns subscribed to and callbacks. 119 # Callbacks are scripts that will be evaluated at the global level. 120 variable callbacks 121 122 ## 123 # Command prefix to be invoked whenever an error occurs. Defaults to 124 # [error]. 125 variable errorCallback 126 127 ## 128 # A list of commands inside a pipeline. 129 variable pipeline 130 variable isPipelined 131 132 ## 133 # After Id for the period event that checks whether the connection is still 134 # valid. 135 variable checkEventId 136 137 ## 138 # After Id for the automatic reconnection. 139 variable reconnectEventId 140 141 ## 142 # Simple sentinel that's set whenever there's activity 143 variable activity 144 145 ## 146 # Constructor -- connect to a Retcl server. 147 constructor {args} { 148 set resultsCache [dict create] 149 set keepCache 1 150 set async 1 151 set cmdIdNumber 0 152 set host $::retcl::defaultHost 153 set port $::retcl::defaultPort 154 set sock {} 155 set callbacks [dict create] 156 set errorCallback error 157 set pipeline {} 158 set isPipelined 0 159 set checkEventId {} 160 set reconnectEventId {} 161 set activity 0 162 163 switch [llength $args] { 164 0 { 165 # connect to default host and port 166 my connect 167 } 168 1 { 169 if {[lindex $args 0] eq {-noconnect}} { 170 # disconnected mode - nothing to do 171 } else { 172 my Error "bad option \"[lindex $args 0]\": must be -noconnect" 173 } 174 } 175 2 { 176 lassign $args host port 177 my connect 178 } 179 default { 180 my Error "wrong # args: must be \"?host port?\" or \"-noconnect\"" 181 } 182 } 183 } 184 185 ## 186 # Destructor -- disconnect. 187 destructor { 188 my disconnect 189 } 190 191 ## 192 # Connect to a Redis server. 193 method connect {{a_host {}} {a_port {}}} { 194 if {$sock ne {}} { 195 my Error "Already connected" 196 } 197 198 if {$a_host ne {}} { 199 set host $a_host 200 } 201 if {$a_port ne {}} { 202 set port $a_port 203 } 204 205 if {[catch {socket $host $port} res]} { 206 my Error "Cannot connect: $res" 207 } 208 209 set sock $res 210 chan configure $sock -blocking 0 -translation binary 211 chan event $sock readable [list [self object] readEvent] 212 set checkEventId [after 500 [list [self object] checkConnection]] 213 return {} 214 } 215 216 ## 217 # Reconnect to the Redis server. This tries to reconnect waiting up to 10 218 # seconds in total. 219 method reconnect {{i 0}} { 220 my disconnect 221 set maxAttempts 20 222 set waitMillis 500 223 if {$i == $maxAttempts} { 224 my Error {Could not reconnect to Redis server} 225 } 226 set saveErrorCallback $errorCallback 227 my errorHandler {} 228 set err [catch {my connect $host $port} msg] 229 my errorHandler $saveErrorCallback 230 if {$err} { 231 set reconnectEventId \ 232 [after $waitMillis [list [self object] reconnect [incr i]]] 233 } 234 return {} 235 } 236 237 ## 238 # Periodically check whether a connection has been interrupted. 239 method checkConnection {} { 240 if {$sock ne {} && ![catch {chan eof $sock} err] && !$err} { 241 set checkEventId [after 500 [list [self object] checkConnection]] 242 } else { 243 my disconnect 244 set activity 1 245 } 246 } 247 248 ## 249 # Check whether we're currently connected to a Retcl server. 250 method connected {} { 251 expr {$sock ne {}} 252 } 253 254 ## 255 # Disconnect from the Redis server. 256 method disconnect {} { 257 catch {close $sock} 258 set sock {} 259 after cancel $checkEventId 260 after cancel $reconnectEventId 261 } 262 263 ## 264 # Turn on asynchronous operation 265 method +async {} { 266 set async 1 267 } 268 export +async 269 270 ## 271 # Turn off asynchronous operation. 272 method -async {} { 273 set async 0 274 } 275 export -async 276 277 ## 278 # Query the currenct asynchronous operation mode. 279 method ?async {} { 280 set async 281 } 282 export ?async 283 284 ## 285 # Turn on keeping results in the cache. 286 method +keepCache {} { 287 set keepCache 1 288 } 289 export +keepCache 290 291 ## 292 # Turn off keeping results in the cache. 293 method -keepCache {} { 294 set keepCache 0 295 } 296 export -keepCache 297 298 ## 299 # Query the current cache keeping mode. 300 method ?keepCache {} { 301 set keepCache 302 } 303 export ?keepCache 304 305 ## 306 # Setup and error callback or restore the default one ([error]). The 307 # cmdPrefix is passed an additional argument containing the error message. 308 method errorHandler {{cmdPrefix {}}} { 309 if {$cmdPrefix eq {}} { 310 set errorCallback error 311 } else { 312 set errorCallback $cmdPrefix 313 } 314 } 315 316 ## 317 # Get the result of a previously issued command. If the response has not 318 # yet arrived, the command waits until it's available, or returns the empty 319 # string if -async is given. 320 method result {args} { 321 322 switch [llength $args] { 323 1 { 324 set asyncArg 0 325 set cmdId $args 326 } 327 2 { 328 if {[lindex $args 0] ne {-async}} { 329 my Error {wrong # args: should be "result ?-async? cmdId"} 330 } 331 set asyncArg 1 332 set cmdId [lindex $args 1] 333 } 334 default { 335 my Error {wrong # args: should be "result ?-async? cmdId"} 336 } 337 } 338 339 if {![dict exists $resultsCache $cmdId]} { 340 my Error "Invalid command id: $cmdId" 341 } 342 343 while {1} { 344 if {[dict get $resultsCache $cmdId status] == 1} { 345 set res [dict get $resultsCache $cmdId response] 346 if {!$keepCache} { 347 dict unset resultsCache $cmdId 348 } 349 return $res 350 } 351 352 if {$asyncArg} { 353 return {} 354 } 355 356 vwait [self namespace]::activity 357 358 if {![my connected]} { 359 my Error {Disconnected} 360 } 361 } 362 } 363 364 ## 365 # Check whether a result is ready for retrieval. 366 method resultReady {cmdId} { 367 if {![dict exists $resultsCache $cmdId]} { 368 my Error "Invalid command id: $cmdId" 369 } 370 dict get $resultsCache $cmdId status 371 } 372 373 ## 374 # Retrieve the type of a result, or the empty string if the result is not 375 # ready. 376 method resultType {cmdId} { 377 if {[catch {dict get $resultsCache $cmdId type} res]} { 378 set res {} 379 } 380 set res 381 } 382 383 ## 384 # Return a dictionary of the reuslts in form of cmdId => result. 385 method allResults {} { 386 387 set res [dict create] 388 389 dict for {cmdId state} $resultsCache { 390 dict with state { 391 if {$status == 1} { 392 dict set res $cmdId $response 393 dict set res $cmdId:type $type 394 } 395 } 396 } 397 398 return $res 399 } 400 401 ## 402 # Clear results from the cache. 403 method clearResult {{clearCmdId {}}} { 404 405 if {$clearCmdId eq {}} { 406 set resultsCache [dict filter $resultsCache script {cmdId _} { 407 expr {![dict get $resultsCache $cmdId status]} 408 }] 409 } else { 410 set resultsCache [dict filter $resultsCache script {cmdId _} { 411 expr {$clearCmdId ne $cmdId || 412 ![dict get $resultsCache $cmdId status]} 413 }] 414 } 415 return {} 416 } 417 418 ## 419 # Execute all Redis commands inside a script within a single pipeline. 420 method pipeline {script} { 421 422 my LockPipeline 423 try { 424 uplevel [list eval $script] 425 } on error msg { 426 my Error $msg 427 } finally { 428 my ReleasePipeline 429 } 430 } 431 432 ## 433 # Set a callback to be called when a message is pushed from the server 434 # because of a PUBLISH command issued by some other client. Item can be a 435 # channel (see SUBSCRIBE) or a pattern (see PSUBSCRIBE). An empty callback 436 # removes the callback previously set on the same item, if any. This 437 # method returns the previously set callback, if any. This method does 438 # not automatically send a (P)SUBSCRIBE message to the Redis server. 439 # 440 # See http://redis.io/topics/pubsub. 441 method callback {item {callback {}}} { 442 try { 443 dict get $callbacks $item 444 } on error {} { 445 set prev {} 446 } on ok prev {} 447 448 if {$callback eq {}} { 449 dict unset callbacks $item 450 } else { 451 dict set callbacks $item $callback 452 } 453 454 return $prev 455 } 456 457 ## 458 # The unknown handler handles unknown methods as Redis commands 459 method unknown {args} { 460 461 if {![llength $args]} { 462 return 463 } 464 465 set sendAsync $async 466 set callback {} 467 468 switch [lindex $args 0] { 469 {-sync} { 470 # Send synchronously and return the result, when available 471 set sendAsync 0 472 set args [lrange $args 1 end] 473 } 474 {-cb} { 475 # Be notified via a callback 476 set callback [lindex $args 1] 477 set args [lrange $args 2 end] 478 } 479 } 480 481 if {![llength $args]} { 482 return 483 } 484 485 if {[string tolower [lindex $args 0]] in $::retcl::pubSubCommands} { 486 # These messages are part of the Pub/Sub protocol; we don't expect 487 # a response. 488 set cmdId {} 489 } else { 490 set cmdId "rds:[incr cmdIdNumber]" 491 dict set resultsCache $cmdId status 0 492 dict set resultsCache $cmdId callback $callback 493 } 494 495 my Send $args 496 497 if {$sendAsync || $callback ne {} || $cmdId eq {}} { 498 # Asynchronous send, return the command identifier 499 return $cmdId 500 } else { 501 # Synchronous send, wait for the result and return it 502 set res [my result $cmdId] 503 my clearResult $cmdId 504 return $res 505 } 506 } 507 508 ########################################################################## 509 # The following methods are private to the retcl library and not intended 510 # to be used by consumers. 511 ########################################################################## 512 513 ## 514 # Handle a read event from the socket. 515 # 516 # Must be public (starts with a lower case letter) because it's used in the 517 # event loop. 518 method readEvent {} { 519 set activity 1 520 if {[chan eof $sock]} { 521 my disconnect 522 return 523 } 524 525 append readBuf [read $sock] 526 527 set idx 0 528 while {$idx < [string length $readBuf]} { 529 set result [my ParseBuf $readBuf $idx] 530 if {$result eq {}} { 531 break 532 } 533 534 lassign $result idx type data 535 my HandleResult $type $data 536 } 537 538 if {$idx != 0} { 539 set readBuf [string range $readBuf $idx end] 540 } 541 } 542 543 ## 544 # Parse the read buffer. starting at index startIdx. Returns a list 545 # consisting of: 546 # 547 # idx : index up to which the buffer has been parsed 548 # type : type of the object found 549 # value : value of the object 550 # 551 # or the empty string if no complete object could be parsed. 552 method ParseBuf {buffer startIdx} { 553 554 if {![string length $buffer]} { 555 return 556 } 557 558 set respCode [string index $buffer $startIdx] 559 set respType [my TypeName $respCode] 560 561 switch -- $respCode { 562 563 "+" - 564 "-" - 565 ":" { 566 # Simple Strings, Errors, and Integers are handled 567 # straight forward 568 lassign [my ParseLine $buffer $startIdx+1] eol line 569 if {$eol == -1} { 570 return 571 } 572 return [list [expr {$eol+2}] $respType $line] 573 } 574 575 "$" { 576 # Bulk Strings, the number of characters is specified in the 577 # first line. We handle Null values and empty strings right 578 # away. 579 lassign [my ParseLine $buffer $startIdx+1] eol bulkLen 580 if {$eol == -1} { 581 return 582 } 583 584 # Null Bulk String 585 if {$bulkLen eq {-1}} { 586 return [list [expr {$eol+2}] $respType (nil)] 587 } 588 589 # Empty Bulk String 590 if {$bulkLen eq {0}} { 591 return [list [expr {$eol+4}] $respType {}] 592 } 593 594 # Non-empty Bulk String 595 incr eol 2 596 set endIdx [expr {$eol+$bulkLen-1}] 597 if {[string length $buffer] < [expr {$endIdx+2}]} { 598 # Need to wait for more input 599 return 600 } 601 return [list [expr {$endIdx+3}] $respType [string range $buffer $eol $endIdx]] 602 } 603 604 "*" { 605 # Arrays, the number of elements is specified in the first 606 # line. 607 lassign [my ParseLine $buffer $startIdx+1] eol arrLen 608 if {$eol == -1} { 609 return 610 } 611 612 # Null Array 613 if {$arrLen eq {-1}} { 614 return [list [expr {$eol+2}] $respType (nil)] 615 } 616 617 # Empty array 618 if {$arrLen eq {0}} { 619 return [list [expr {$eol+2}] $respType {}] 620 } 621 622 # Non-empty Array 623 set idx [expr {$eol+2}] 624 set elems [list] 625 while {$arrLen} { 626 set elem [my ParseBuf $buffer $idx] 627 if {$elem eq {}} { 628 return {} 629 } 630 631 lappend elems [lindex $elem 2] 632 set idx [lindex $elem 0] 633 incr arrLen -1 634 } 635 636 return [list $idx $respType $elems] 637 } 638 639 default { 640 puts "Unhandled type: $buffer" 641 } 642 } 643 } 644 645 method ParseLine {buffer startIdx} { 646 set eol [string first "\r" $buffer $startIdx] 647 if {$eol == -1} { 648 return -1 649 } 650 set line [string range $buffer $startIdx $eol-1] 651 return [list $eol $line] 652 } 653 654 ## 655 # Handle a complete result read from the server. 656 method HandleResult {type body} { 657 # We have to handle two distinct cases: 658 # - a pushed message (can be message, subscribe, or ubsubscribe) 659 # - a command response 660 # 661 # The first case is handled by forwarding the message contents to a 662 # registered callback, if any exists. For message types we're done. For 663 # subscribe / unsubscribe types we also have to locate the 664 # corresponding request and clear it. 665 # 666 # The second case is handled by locating the corresponding request and 667 # filling in the result. 668 669 # If the response is a pushed message 670 if {$type eq {Array} && [lindex $body 0] in $::retcl::pubSubMessages} { 671 if {[lindex $body 0] eq {pmessage}} { 672 lassign $body type pattern item data 673 } else { 674 lassign $body type item data 675 set pattern $item 676 } 677 try { 678 dict get $callbacks $pattern 679 } on ok callback { 680 namespace eval :: $callback $type $pattern $item $data 681 } finally { 682 # It's a subscribe / unsubscribe. Clear the corresponding 683 # request, if any. 684 return 685 } 686 } 687 688 # 689 # If we get here, the response wasn't a pushed message 690 # 691 692 # Look for the first command without a result 693 set cmdIds [my FindPendingRequest] 694 if {$cmdIds eq {}} { 695 # All requests already have a response, something went bad 696 my Error "No request found for response $body" 697 } 698 set cmdId [lindex $cmdIds 0] 699 set cb [dict get $resultsCache $cmdId callback] 700 if {$cb ne {}} { 701 {*}$cb $cmdId $type $body 702 dict unset resultsCache $cmdId 703 } else { 704 dict set resultsCache $cmdId type $type 705 dict set resultsCache $cmdId response $body 706 dict set resultsCache $cmdId status 1 707 } 708 } 709 710 ## 711 # Get a return type string by its byte. 712 method TypeName {byte} { 713 if {[catch {dict get $::retcl::typeNames $byte} name]} { 714 my Error "Invalid type byte: $byte" 715 } 716 set name 717 } 718 719 ## 720 # Build the RESP representation of a command. 721 method BuildResp {args} { 722 set msg "*[llength $args]\r\n" 723 foreach word $args { 724 append msg "\$[string length $word]\r\n$word\r\n" 725 } 726 set msg 727 } 728 729 ## 730 # Send command(s) over to the Redis server. Each 731 # argument is a list of words composing the command. 732 method Send {args} { 733 if {[llength $args] > 1} { 734 my Error "Too many args: $args" 735 } 736 foreach cmd $args { 737 append pipeline "[my BuildResp {*}$cmd]\r\n" 738 } 739 740 if {!$isPipelined} { 741 my Flush 742 } 743 } 744 745 ## 746 # Return a list of responses from the server. A maximum number 747 # of results might be specified. This is mostly used internally 748 # to recursively call [my Recv] to receive Array elements. 749 method Recv {{includeTypes 1} {maxResults -1}} { 750 if {$maxResults == 0} { 751 return 752 } 753 754 set result [list] 755 756 while {[gets $sock line] > 0} { 757 set respCode [string index $line 0] 758 set respName [my TypeName $respCode] 759 set respData [string range $line 1 end] 760 761 switch $respCode { 762 + - 763 - - 764 : { 765 # Simple Strings, Errors, and Integers are handled 766 # straight forward 767 if {$includeTypes} { 768 lappend result [list $respName $respData] 769 } else { 770 lappend result $respData 771 } 772 } 773 $ { 774 # Bulk Strings, read the number of char specified in the 775 # first line. 776 # If it's -1, it's a (nil). 777 if {$respData eq {-1}} { 778 if {$includeTypes} { 779 lappend result [list BulkString (nil)] 780 } else { 781 lappend result (nil) 782 } 783 } else { 784 set bulk [read $sock $respData] 785 if {$includeTypes} { 786 lappend result [list $respName $bulk] 787 } else { 788 lappend result $bulk 789 } 790 gets $sock ;# consume the final end of line 791 } 792 } 793 * { 794 # Arrays, call [my Recv] recursively to get the number of 795 # elements 796 if {$includeTypes} { 797 lappend result [list Array [my Recv 0 $respData]] 798 } else { 799 lappend result [my Recv 0 $respData] 800 } 801 } 802 } 803 804 if {$maxResults != -1 && [incr maxResults -1] == 0} { 805 break 806 } 807 } 808 809 set result 810 } 811 812 ## 813 # Return a list of pending command ids. 814 method FindPendingRequest {} { 815 set allPending [list] 816 817 foreach cmdId [dict keys $resultsCache] { 818 if {[dict get $resultsCache $cmdId status] == 0} { 819 lappend allPending $cmdId 820 } 821 } 822 823 set allPending 824 } 825 826 ## 827 # Lock the pipeline. Redis commands are buffered and only sent to the 828 # server when ReleasePipeline is called. 829 method LockPipeline {} { 830 if {$isPipelined} { 831 my Error "Cannot nest pipelines" 832 } 833 set isPipelined 1 834 } 835 836 ## 837 # Release a pipeline and flush all buffered commands. 838 method ReleasePipeline {} { 839 if {!$isPipelined} { 840 my Error "No pipeline to release" 841 } 842 my Flush 843 } 844 845 ## 846 # Flush the output buffer 847 method Flush {} { 848 if {![my connected]} { 849 my Error {Disconnected} 850 } 851 if {[catch {puts -nonewline $sock $pipeline} err]} { 852 my Error $err 853 } 854 chan flush $sock 855 856 set isPipelined 0 857 set pipeline [list] 858 } 859 860 ## 861 # Error handler. 862 method Error {msg} { 863 {*}$errorCallback $msg 864 return -level 2 865 } 866} 867 868# vim: set ft=tcl ts=4 expandtab: 869