# frozen_string_literal: false
require 'monitor'
require 'drb/drb'
require_relative 'rinda'
require 'forwardable'

module Rinda

  ##
  # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
  # together with expiry and cancellation data.

  class TupleEntry

    include DRbUndumped

    ##
    # The Time at which this entry expires.  +nil+ means no valid expiry
    # could be computed; #expired? treats that as already expired.

    attr_accessor :expires

    ##
    # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
    # time +sec+.
    #
    # A renewer must implement the +renew+ method which returns a Numeric,
    # nil, or true to indicate when the tuple has expired.

    def initialize(ary, sec=nil)
      @cancel = false
      @expires = nil
      @tuple = make_tuple(ary)
      @renewer = nil
      renew(sec)
    end

    ##
    # Marks this TupleEntry as canceled.

    def cancel
      @cancel = true
    end

    ##
    # A TupleEntry is alive as long as it is neither canceled nor expired.
    #
    # Note that this calls #expired?, which may ask the renewer for a new
    # expiry time.

    def alive?
      !canceled? && !expired?
    end

    ##
    # Return the object which makes up the tuple itself: the Array
    # or Hash.

    def value; @tuple.value; end

    ##
    # Returns the canceled status.

    def canceled?; @cancel; end

    ##
    # Has this tuple expired? (true/false).
    #
    # A tuple has expired when its expiry timer based on the +sec+ argument to
    # #initialize runs out.  When the timer has run out and a renewer is
    # present, the renewer is asked (via #renew) for a new expiry time before
    # the final answer is given.

    def expired?
      # No usable expiry time at all counts as expired.
      return true unless @expires
      return false if @expires > Time.now
      return true if @renewer.nil?
      # The timer ran out but a renewer exists: give it a chance to extend.
      renew(@renewer)
      return true unless @expires
      return @expires < Time.now
    end

    ##
    # Reset the expiry time according to +sec_or_renewer+.
    #
    # +nil+::    it is set to expire in the far future.
    # +true+::   it has expired.
    # Numeric::  it will expire in that many seconds.
    #
    # Otherwise the argument refers to some kind of renewer object
    # which will reset its expiry time.

    def renew(sec_or_renewer)
      sec, @renewer = get_renewer(sec_or_renewer)
      @expires = make_expires(sec)
    end

    ##
    # Returns an expiry Time based on +sec+ which can be one of:
    # Numeric:: +sec+ seconds into the future
    # +true+::  the expiry time is the start of 1970 (i.e. expired)
    # +nil+::   it is  Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
    #           UNIX clocks will die)
    #
    # Any other value yields +nil+, which #expired? treats as expired.

    def make_expires(sec=nil)
      case sec
      when Numeric
        Time.now + sec
      when true
        Time.at(1)
      when nil
        Time.at(2**31-1)
      end
    end

    ##
    # Retrieves +key+ from the tuple.

    def [](key)
      @tuple[key]
    end

    ##
    # Fetches +key+ from the tuple.

    def fetch(key)
      @tuple.fetch(key)
    end

    ##
    # The size of the tuple.

    def size
      @tuple.size
    end

    ##
    # Creates a Rinda::Tuple for +ary+.

    def make_tuple(ary)
      Rinda::Tuple.new(ary)
    end

    private

    ##
    # Returns a valid argument to make_expires and the renewer or nil.
    #
    # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
    # renewer).  Otherwise it returns an expiry value from calling +it.renew+
    # and the renewer.
    #
    # If +it.renew+ raises, +it+ itself is returned as the expiry value with
    # no renewer; make_expires maps such a value to +nil+, so the entry ends
    # up expired.

    def get_renewer(it)
      case it
      when Numeric, true, nil
        return it, nil
      else
        begin
          return it.renew, it
        rescue Exception
          # Deliberately broad best-effort: any failure while asking the
          # renewer is treated as "no renewer" rather than propagated.
          return it, nil
        end
      end
    end

  end

  ##
  # A TemplateEntry is a Template together with expiry and cancellation data.

  class TemplateEntry < TupleEntry
    ##
    # Matches this TemplateEntry against +tuple+.  See Template#match for
    # details on how a Template matches a Tuple.

    def match(tuple)
      @tuple.match(tuple)
    end

    alias === match

    def make_tuple(ary) # :nodoc:
      Rinda::Template.new(ary)
    end

  end

  ##
  # A WaitTemplateEntry is a TemplateEntry that a caller can block on.  It
  # is bound to a +place+ that must respond to +new_cond+ and +synchronize+
  # (in this file the place is always a TupleSpace).

  class WaitTemplateEntry < TemplateEntry

    ##
    # The tuple handed over by #read, or +nil+ if none has been delivered.

    attr_reader :found

    ##
    # Creates a WaitTemplateEntry for template +ary+ bound to +place+, with
    # an optional expiry time or renewer +expires+.

    def initialize(place, ary, expires=nil)
      super(ary, expires)
      @place = place
      @cond = place.new_cond
      @found = nil
    end

    ##
    # Marks this entry as canceled and wakes up a waiter.

    def cancel
      super
      signal
    end

    ##
    # Blocks on the condition variable obtained from the place.  Callers in
    # this file invoke it from inside the place's +synchronize+ block.

    def wait
      @cond.wait
    end

    ##
    # Records +tuple+ as the found tuple and wakes up a waiter.

    def read(tuple)
      @found = tuple
      signal
    end

    ##
    # Signals the condition variable while holding the place's monitor.

    def signal
      @place.synchronize do
        @cond.signal
      end
    end

  end

  ##
  # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
  # TupleSpace changes.  You may receive either your subscribed event or the
  # 'close' event when iterating over notifications.
  #
  # See TupleSpace#notify_event for valid notification types.
  #
  # == Example
  #
  #   ts = Rinda::TupleSpace.new
  #   observer = ts.notify 'write', [nil]
  #
  #   Thread.start do
  #     observer.each { |t| p t }
  #   end
  #
  #   3.times { |i| ts.write [i] }
  #
  # Outputs:
  #
  #   ['write', [0]]
  #   ['write', [1]]
  #   ['write', [2]]

  class NotifyTemplateEntry < TemplateEntry

    ##
    # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
    # match +tuple+.

    def initialize(place, event, tuple, expires=nil)
      ary = [event, Rinda::Template.new(tuple)]
      super(ary, expires)
      @queue = Thread::Queue.new
      @done = false
    end

    ##
    # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.

    def notify(ev)
      @queue.push(ev)
    end

    ##
    # Retrieves a notification, blocking until one is queued.  Raises
    # RequestExpiredError when this NotifyTemplateEntry has already delivered
    # its 'close' event.

    def pop
      raise RequestExpiredError if @done
      it = @queue.pop
      @done = true if it[0] == 'close'
      return it
    end

    ##
    # Yields event/tuple pairs until this NotifyTemplateEntry expires.
    #
    # Any StandardError raised while popping or inside the block silently
    # ends the iteration; the entry is canceled in every case.

    def each # :yields: event, tuple
      while !@done
        it = pop
        yield(it)
      end
    rescue
      # Best-effort: an error simply terminates the iteration.
    ensure
      cancel
    end

  end

  ##
  # TupleBag is an unordered collection of tuples. It is the basis
  # of Tuplespace.
  #
  # Entries are grouped into bins keyed by their first element when that
  # element is a Symbol; all other entries share the bin keyed by +false+.

  class TupleBag
    class TupleBin # :nodoc:
      extend Forwardable
      def_delegators '@bin', :find_all, :delete_if, :each, :empty?

      def initialize
        @bin = []
      end

      # Appends +tuple+ to this bin.
      def add(tuple)
        @bin.push(tuple)
      end

      # Removes the most recently added occurrence of +tuple+, if any.
      def delete(tuple)
        idx = @bin.rindex(tuple)
        @bin.delete_at(idx) if idx
      end

      # Returns the most recently added element for which the block is
      # true, or +nil+ when there is none.
      def find
        @bin.reverse_each do |x|
          return x if yield(x)
        end
        nil
      end
    end

    def initialize # :nodoc:
      @hash = {}
      @enum = enum_for(:each_entry)
    end

    ##
    # Returns a true value if the TupleBag holds any entry with an expiry
    # time set (a non-nil +expires+), +nil+ otherwise.

    def has_expires?
      @enum.find do |tuple|
        tuple.expires
      end
    end

    ##
    # Add +tuple+ to the TupleBag.

    def push(tuple)
      key = bin_key(tuple)
      @hash[key] ||= TupleBin.new
      @hash[key].add(tuple)
    end

    ##
    # Removes +tuple+ from the TupleBag.  Returns +tuple+, or +nil+ when the
    # bag has no bin for it.

    def delete(tuple)
      key = bin_key(tuple)
      bin = @hash[key]
      return nil unless bin
      bin.delete(tuple)
      @hash.delete(key) if bin.empty?
      tuple
    end

    ##
    # Finds all live tuples that match +template+.

    def find_all(template)
      bin_for_find(template).find_all do |tuple|
        tuple.alive? && template.match(tuple)
      end
    end

    ##
    # Finds a live tuple that matches +template+.

    def find(template)
      bin_for_find(template).find do |tuple|
        tuple.alive? && template.match(tuple)
      end
    end

    ##
    # Finds all tuples in the TupleBag which when treated as templates, match
    # +tuple+ and are alive.

    def find_all_template(tuple)
      @enum.find_all do |template|
        template.alive? && template.match(tuple)
      end
    end

    ##
    # Deletes dead tuples from the TupleBag, returning the deleted tuples.
    # Bins left empty by the sweep are dropped, as #delete does.

    def delete_unless_alive
      deleted = []
      @hash.each_value do |bin|
        bin.delete_if do |tuple|
          next false if tuple.alive?
          deleted.push(tuple)
          true
        end
      end
      # Prune after the sweep so the hash is not modified while iterating.
      @hash.delete_if { |_key, bin| bin.empty? }
      deleted
    end

    private

    # Yields every entry of every bin.
    def each_entry(&blk)
      @hash.each do |k, v|
        v.each(&blk)
      end
    end

    # Returns the bin key for +tuple+: its first element when that is a
    # Symbol, +false+ otherwise.
    def bin_key(tuple)
      head = tuple[0]
      if head.class == Symbol
        return head
      else
        false
      end
    end

    # Returns the collection to search for +template+: the single bin for a
    # Symbol-headed template (or an empty Array when no such bin exists),
    # otherwise an enumerator over every entry in the bag.
    def bin_for_find(template)
      key = bin_key(template)
      key ? @hash.fetch(key, []) : @enum
    end
  end

  ##
  # The Tuplespace manages access to the tuples it contains,
  # ensuring mutual exclusion requirements are met.
  #
  # The +sec+ option for the write, take, move, read and notify methods may
  # either be a number of seconds or a Renewer object.

  class TupleSpace

    include DRbUndumped
    include MonitorMixin

    ##
    # Creates a new TupleSpace.  +period+ is used to control how often to look
    # for dead tuples after modifications to the TupleSpace.
    #
    # If no dead tuples are found +period+ seconds after the last
    # modification, the TupleSpace will stop looking for dead tuples.

    def initialize(period=60)
      super()
      @bag = TupleBag.new
      @read_waiter = TupleBag.new
      @take_waiter = TupleBag.new
      @notify_waiter = TupleBag.new
      @period = period
      @keeper = nil
    end

    ##
    # Adds +tuple+ with optional expiry +sec+ and returns its TupleEntry.
    #
    # Waiting readers whose template matches receive +tuple+.  If the entry
    # is already expired it is not stored, and a 'write' event followed by a
    # 'delete' event is emitted.  Otherwise it is stored, matching takers are
    # woken up and a 'write' event is emitted.

    def write(tuple, sec=nil)
      entry = create_entry(tuple, sec)
      synchronize do
        if entry.expired?
          @read_waiter.find_all_template(entry).each do |template|
            template.read(tuple)
          end
          notify_event('write', entry.value)
          notify_event('delete', entry.value)
        else
          @bag.push(entry)
          start_keeper if entry.expires
          @read_waiter.find_all_template(entry).each do |template|
            template.read(tuple)
          end
          @take_waiter.find_all_template(entry).each do |template|
            template.signal
          end
          notify_event('write', entry.value)
        end
      end
      entry
    end

    ##
    # Removes a tuple matching +tuple+ and returns it.  See #move.

    def take(tuple, sec=nil, &block)
      move(nil, tuple, sec, &block)
    end

    ##
    # Moves a tuple matching +tuple+ to +port+, blocking until one is
    # available.
    #
    # When +port+ is given the tuple is pushed onto it and +nil+ is returned;
    # otherwise the tuple is returned.  If a block is given it is yielded the
    # WaitTemplateEntry before the search starts.
    #
    # Raises RequestExpiredError when the request expires and
    # RequestCanceledError when it is canceled.

    def move(port, tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        entry = @bag.find(template)
        if entry
          port.push(entry.value) if port
          @bag.delete(entry)
          notify_event('take', entry.value)
          return port ? nil : entry.value
        end
        raise RequestExpiredError if template.expired?

        begin
          @take_waiter.push(template)
          start_keeper if template.expires
          while true
            raise RequestCanceledError if template.canceled?
            raise RequestExpiredError if template.expired?
            entry = @bag.find(template)
            if entry
              port.push(entry.value) if port
              @bag.delete(entry)
              notify_event('take', entry.value)
              return port ? nil : entry.value
            end
            template.wait
          end
        ensure
          @take_waiter.delete(template)
        end
      end
    end

    ##
    # Reads a tuple matching +tuple+, but does not remove it.  Blocks until
    # a match is available.
    #
    # If a block is given it is yielded the WaitTemplateEntry before the
    # search starts.
    #
    # Raises RequestExpiredError when the request expires and
    # RequestCanceledError when it is canceled.

    def read(tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        entry = @bag.find(template)
        return entry.value if entry
        raise RequestExpiredError if template.expired?

        begin
          @read_waiter.push(template)
          start_keeper if template.expires
          template.wait
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          return template.found
        ensure
          @read_waiter.delete(template)
        end
      end
    end

    ##
    # Returns all tuples matching +tuple+.  Does not remove the found tuples.

    def read_all(tuple)
      template = WaitTemplateEntry.new(self, tuple, nil)
      synchronize do
        entry = @bag.find_all(template)
        entry.collect do |e|
          e.value
        end
      end
    end

    ##
    # Registers for notifications of +event+.  Returns a NotifyTemplateEntry.
    # See NotifyTemplateEntry for examples of how to listen for notifications.
    #
    # +event+ can be:
    # 'write'::  A tuple was added
    # 'take'::   A tuple was taken or moved
    # 'delete':: A tuple was lost after being overwritten or expiring
    #
    # The TupleSpace will also notify you of the 'close' event when the
    # NotifyTemplateEntry has expired.

    def notify(event, tuple, sec=nil)
      template = NotifyTemplateEntry.new(self, event, tuple, sec)
      synchronize do
        @notify_waiter.push(template)
        # The 'close' event is delivered by the keeper, so make sure one is
        # running, just as write, move and read do.
        start_keeper if template.expires
      end
      template
    end

    private

    # Wraps +tuple+ in a TupleEntry with expiry +sec+.
    def create_entry(tuple, sec)
      TupleEntry.new(tuple, sec)
    end

    ##
    # Removes dead tuples.
    #
    # Dead read and take waiters are signaled, dead notify entries are sent
    # the 'close' event, and a 'delete' event is emitted for each dead tuple.

    def keep_clean
      synchronize do
        @read_waiter.delete_unless_alive.each do |e|
          e.signal
        end
        @take_waiter.delete_unless_alive.each do |e|
          e.signal
        end
        @notify_waiter.delete_unless_alive.each do |e|
          e.notify(['close'])
        end
        @bag.delete_unless_alive.each do |e|
          notify_event('delete', e.value)
        end
      end
    end

    ##
    # Notifies all registered listeners for +event+ of a status change of
    # +tuple+.

    def notify_event(event, tuple)
      ev = [event, tuple]
      @notify_waiter.find_all_template(ev).each do |template|
        template.notify(ev)
      end
    end

    ##
    # Creates a thread that scans the tuplespace for expired tuples.
    #
    # The thread wakes up every +period+ seconds and terminates once
    # need_keeper? reports that nothing is left to watch.  Every caller in
    # this file invokes this method while holding the monitor.

    def start_keeper
      return if @keeper && @keeper.alive?
      @keeper = Thread.new do
        # An explicit flag is used because a +break+ inside the synchronize
        # block would only leave that block, not this loop.
        keep_running = true
        while keep_running
          sleep(@period)
          synchronize do
            if need_keeper?
              keep_clean
            else
              # Clear the reference under the monitor so that a concurrent
              # start_keeper cannot mistake this exiting thread for a
              # running keeper.
              @keeper = nil if @keeper.equal?(Thread.current)
              keep_running = false
            end
          end
        end
      end
    end

    ##
    # Checks the tuplespace to see if it needs cleaning.  Returns +true+ when
    # any of the bags holds an entry with an expiry time, +false+ otherwise.

    def need_keeper?
      [@bag, @read_waiter, @take_waiter, @notify_waiter].any? do |bag|
        bag.has_expires?
      end
    end

  end

end