1# frozen_string_literal: false
2require 'monitor'
3require 'drb/drb'
4require_relative 'rinda'
5require 'forwardable'
6
7module Rinda
8
9  ##
10  # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
11  # together with expiry and cancellation data.
12
13  class TupleEntry
14
15    include DRbUndumped
16
17    attr_accessor :expires
18
19    ##
20    # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
21    # time +sec+.
22    #
23    # A renewer must implement the +renew+ method which returns a Numeric,
24    # nil, or true to indicate when the tuple has expired.
25
26    def initialize(ary, sec=nil)
27      @cancel = false
28      @expires = nil
29      @tuple = make_tuple(ary)
30      @renewer = nil
31      renew(sec)
32    end
33
34    ##
35    # Marks this TupleEntry as canceled.
36
37    def cancel
38      @cancel = true
39    end
40
41    ##
42    # A TupleEntry is dead when it is canceled or expired.
43
44    def alive?
45      !canceled? && !expired?
46    end
47
48    ##
49    # Return the object which makes up the tuple itself: the Array
50    # or Hash.
51
52    def value; @tuple.value; end
53
54    ##
55    # Returns the canceled status.
56
57    def canceled?; @cancel; end
58
59    ##
60    # Has this tuple expired? (true/false).
61    #
62    # A tuple has expired when its expiry timer based on the +sec+ argument to
63    # #initialize runs out.
64
65    def expired?
66      return true unless @expires
67      return false if @expires > Time.now
68      return true if @renewer.nil?
69      renew(@renewer)
70      return true unless @expires
71      return @expires < Time.now
72    end
73
74    ##
75    # Reset the expiry time according to +sec_or_renewer+.
76    #
77    # +nil+::    it is set to expire in the far future.
78    # +true+::   it has expired.
79    # Numeric::  it will expire in that many seconds.
80    #
81    # Otherwise the argument refers to some kind of renewer object
82    # which will reset its expiry time.
83
84    def renew(sec_or_renewer)
85      sec, @renewer = get_renewer(sec_or_renewer)
86      @expires = make_expires(sec)
87    end
88
89    ##
90    # Returns an expiry Time based on +sec+ which can be one of:
91    # Numeric:: +sec+ seconds into the future
92    # +true+::  the expiry time is the start of 1970 (i.e. expired)
93    # +nil+::   it is  Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
94    #           UNIX clocks will die)
95
96    def make_expires(sec=nil)
97      case sec
98      when Numeric
99        Time.now + sec
100      when true
101        Time.at(1)
102      when nil
103        Time.at(2**31-1)
104      end
105    end
106
107    ##
108    # Retrieves +key+ from the tuple.
109
110    def [](key)
111      @tuple[key]
112    end
113
114    ##
115    # Fetches +key+ from the tuple.
116
117    def fetch(key)
118      @tuple.fetch(key)
119    end
120
121    ##
122    # The size of the tuple.
123
124    def size
125      @tuple.size
126    end
127
128    ##
129    # Creates a Rinda::Tuple for +ary+.
130
131    def make_tuple(ary)
132      Rinda::Tuple.new(ary)
133    end
134
135    private
136
137    ##
138    # Returns a valid argument to make_expires and the renewer or nil.
139    #
140    # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
141    # renewer).  Otherwise it returns an expiry value from calling +it.renew+
142    # and the renewer.
143
144    def get_renewer(it)
145      case it
146      when Numeric, true, nil
147        return it, nil
148      else
149        begin
150          return it.renew, it
151        rescue Exception
152          return it, nil
153        end
154      end
155    end
156
157  end
158
159  ##
160  # A TemplateEntry is a Template together with expiry and cancellation data.
161
162  class TemplateEntry < TupleEntry
163    ##
164    # Matches this TemplateEntry against +tuple+.  See Template#match for
165    # details on how a Template matches a Tuple.
166
167    def match(tuple)
168      @tuple.match(tuple)
169    end
170
171    alias === match
172
173    def make_tuple(ary) # :nodoc:
174      Rinda::Template.new(ary)
175    end
176
177  end
178
179  ##
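  # A WaitTemplateEntry is a TemplateEntry that a caller can block on.  It
  # owns a condition variable created from +place+ and is signaled when a
  # tuple is handed to it via #read or when it is canceled.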
181
182  class WaitTemplateEntry < TemplateEntry
183
184    attr_reader :found
185
186    def initialize(place, ary, expires=nil)
187      super(ary, expires)
188      @place = place
189      @cond = place.new_cond
190      @found = nil
191    end
192
193    def cancel
194      super
195      signal
196    end
197
198    def wait
199      @cond.wait
200    end
201
202    def read(tuple)
203      @found = tuple
204      signal
205    end
206
207    def signal
208      @place.synchronize do
209        @cond.signal
210      end
211    end
212
213  end
214
215  ##
216  # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
217  # TupleSpace changes.  You may receive either your subscribed event or the
218  # 'close' event when iterating over notifications.
219  #
220  # See TupleSpace#notify_event for valid notification types.
221  #
222  # == Example
223  #
224  #   ts = Rinda::TupleSpace.new
225  #   observer = ts.notify 'write', [nil]
226  #
227  #   Thread.start do
228  #     observer.each { |t| p t }
229  #   end
230  #
231  #   3.times { |i| ts.write [i] }
232  #
233  # Outputs:
234  #
235  #   ['write', [0]]
236  #   ['write', [1]]
237  #   ['write', [2]]
238
239  class NotifyTemplateEntry < TemplateEntry
240
241    ##
242    # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
243    # match +tuple+.
244
245    def initialize(place, event, tuple, expires=nil)
246      ary = [event, Rinda::Template.new(tuple)]
247      super(ary, expires)
248      @queue = Thread::Queue.new
249      @done = false
250    end
251
252    ##
253    # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
254
255    def notify(ev)
256      @queue.push(ev)
257    end
258
259    ##
260    # Retrieves a notification.  Raises RequestExpiredError when this
261    # NotifyTemplateEntry expires.
262
263    def pop
264      raise RequestExpiredError if @done
265      it = @queue.pop
266      @done = true if it[0] == 'close'
267      return it
268    end
269
270    ##
271    # Yields event/tuple pairs until this NotifyTemplateEntry expires.
272
273    def each # :yields: event, tuple
274      while !@done
275        it = pop
276        yield(it)
277      end
278    rescue
279    ensure
280      cancel
281    end
282
283  end
284
285  ##
286  # TupleBag is an unordered collection of tuples. It is the basis
287  # of Tuplespace.
288
289  class TupleBag
290    class TupleBin
291      extend Forwardable
292      def_delegators '@bin', :find_all, :delete_if, :each, :empty?
293
294      def initialize
295        @bin = []
296      end
297
298      def add(tuple)
299        @bin.push(tuple)
300      end
301
302      def delete(tuple)
303        idx = @bin.rindex(tuple)
304        @bin.delete_at(idx) if idx
305      end
306
307      def find
308        @bin.reverse_each do |x|
309          return x if yield(x)
310        end
311        nil
312      end
313    end
314
315    def initialize # :nodoc:
316      @hash = {}
317      @enum = enum_for(:each_entry)
318    end
319
320    ##
321    # +true+ if the TupleBag to see if it has any expired entries.
322
323    def has_expires?
324      @enum.find do |tuple|
325        tuple.expires
326      end
327    end
328
329    ##
330    # Add +tuple+ to the TupleBag.
331
332    def push(tuple)
333      key = bin_key(tuple)
334      @hash[key] ||= TupleBin.new
335      @hash[key].add(tuple)
336    end
337
338    ##
339    # Removes +tuple+ from the TupleBag.
340
341    def delete(tuple)
342      key = bin_key(tuple)
343      bin = @hash[key]
344      return nil unless bin
345      bin.delete(tuple)
346      @hash.delete(key) if bin.empty?
347      tuple
348    end
349
350    ##
351    # Finds all live tuples that match +template+.
352    def find_all(template)
353      bin_for_find(template).find_all do |tuple|
354        tuple.alive? && template.match(tuple)
355      end
356    end
357
358    ##
359    # Finds a live tuple that matches +template+.
360
361    def find(template)
362      bin_for_find(template).find do |tuple|
363        tuple.alive? && template.match(tuple)
364      end
365    end
366
367    ##
368    # Finds all tuples in the TupleBag which when treated as templates, match
369    # +tuple+ and are alive.
370
371    def find_all_template(tuple)
372      @enum.find_all do |template|
373        template.alive? && template.match(tuple)
374      end
375    end
376
377    ##
378    # Delete tuples which dead tuples from the TupleBag, returning the deleted
379    # tuples.
380
381    def delete_unless_alive
382      deleted = []
383      @hash.each do |key, bin|
384        bin.delete_if do |tuple|
385          if tuple.alive?
386            false
387          else
388            deleted.push(tuple)
389            true
390          end
391        end
392      end
393      deleted
394    end
395
396    private
397    def each_entry(&blk)
398      @hash.each do |k, v|
399        v.each(&blk)
400      end
401    end
402
403    def bin_key(tuple)
404      head = tuple[0]
405      if head.class == Symbol
406        return head
407      else
408        false
409      end
410    end
411
412    def bin_for_find(template)
413      key = bin_key(template)
414      key ? @hash.fetch(key, []) : @enum
415    end
416  end
417
418  ##
419  # The Tuplespace manages access to the tuples it contains,
420  # ensuring mutual exclusion requirements are met.
421  #
422  # The +sec+ option for the write, take, move, read and notify methods may
423  # either be a number of seconds or a Renewer object.
424
425  class TupleSpace
426
427    include DRbUndumped
428    include MonitorMixin
429
430    ##
431    # Creates a new TupleSpace.  +period+ is used to control how often to look
432    # for dead tuples after modifications to the TupleSpace.
433    #
434    # If no dead tuples are found +period+ seconds after the last
435    # modification, the TupleSpace will stop looking for dead tuples.
436
437    def initialize(period=60)
438      super()
439      @bag = TupleBag.new
440      @read_waiter = TupleBag.new
441      @take_waiter = TupleBag.new
442      @notify_waiter = TupleBag.new
443      @period = period
444      @keeper = nil
445    end
446
447    ##
448    # Adds +tuple+
449
450    def write(tuple, sec=nil)
451      entry = create_entry(tuple, sec)
452      synchronize do
453        if entry.expired?
454          @read_waiter.find_all_template(entry).each do |template|
455            template.read(tuple)
456          end
457          notify_event('write', entry.value)
458          notify_event('delete', entry.value)
459        else
460          @bag.push(entry)
461          start_keeper if entry.expires
462          @read_waiter.find_all_template(entry).each do |template|
463            template.read(tuple)
464          end
465          @take_waiter.find_all_template(entry).each do |template|
466            template.signal
467          end
468          notify_event('write', entry.value)
469        end
470      end
471      entry
472    end
473
474    ##
475    # Removes +tuple+
476
477    def take(tuple, sec=nil, &block)
478      move(nil, tuple, sec, &block)
479    end
480
481    ##
482    # Moves +tuple+ to +port+.
483
484    def move(port, tuple, sec=nil)
485      template = WaitTemplateEntry.new(self, tuple, sec)
486      yield(template) if block_given?
487      synchronize do
488        entry = @bag.find(template)
489        if entry
490          port.push(entry.value) if port
491          @bag.delete(entry)
492          notify_event('take', entry.value)
493          return port ? nil : entry.value
494        end
495        raise RequestExpiredError if template.expired?
496
497        begin
498          @take_waiter.push(template)
499          start_keeper if template.expires
500          while true
501            raise RequestCanceledError if template.canceled?
502            raise RequestExpiredError if template.expired?
503            entry = @bag.find(template)
504            if entry
505              port.push(entry.value) if port
506              @bag.delete(entry)
507              notify_event('take', entry.value)
508              return port ? nil : entry.value
509            end
510            template.wait
511          end
512        ensure
513          @take_waiter.delete(template)
514        end
515      end
516    end
517
518    ##
519    # Reads +tuple+, but does not remove it.
520
521    def read(tuple, sec=nil)
522      template = WaitTemplateEntry.new(self, tuple, sec)
523      yield(template) if block_given?
524      synchronize do
525        entry = @bag.find(template)
526        return entry.value if entry
527        raise RequestExpiredError if template.expired?
528
529        begin
530          @read_waiter.push(template)
531          start_keeper if template.expires
532          template.wait
533          raise RequestCanceledError if template.canceled?
534          raise RequestExpiredError if template.expired?
535          return template.found
536        ensure
537          @read_waiter.delete(template)
538        end
539      end
540    end
541
542    ##
543    # Returns all tuples matching +tuple+.  Does not remove the found tuples.
544
545    def read_all(tuple)
546      template = WaitTemplateEntry.new(self, tuple, nil)
547      synchronize do
548        entry = @bag.find_all(template)
549        entry.collect do |e|
550          e.value
551        end
552      end
553    end
554
555    ##
556    # Registers for notifications of +event+.  Returns a NotifyTemplateEntry.
557    # See NotifyTemplateEntry for examples of how to listen for notifications.
558    #
559    # +event+ can be:
560    # 'write'::  A tuple was added
561    # 'take'::   A tuple was taken or moved
562    # 'delete':: A tuple was lost after being overwritten or expiring
563    #
564    # The TupleSpace will also notify you of the 'close' event when the
565    # NotifyTemplateEntry has expired.
566
567    def notify(event, tuple, sec=nil)
568      template = NotifyTemplateEntry.new(self, event, tuple, sec)
569      synchronize do
570        @notify_waiter.push(template)
571      end
572      template
573    end
574
575    private
576
577    def create_entry(tuple, sec)
578      TupleEntry.new(tuple, sec)
579    end
580
581    ##
582    # Removes dead tuples.
583
584    def keep_clean
585      synchronize do
586        @read_waiter.delete_unless_alive.each do |e|
587          e.signal
588        end
589        @take_waiter.delete_unless_alive.each do |e|
590          e.signal
591        end
592        @notify_waiter.delete_unless_alive.each do |e|
593          e.notify(['close'])
594        end
595        @bag.delete_unless_alive.each do |e|
596          notify_event('delete', e.value)
597        end
598      end
599    end
600
601    ##
602    # Notifies all registered listeners for +event+ of a status change of
603    # +tuple+.
604
605    def notify_event(event, tuple)
606      ev = [event, tuple]
607      @notify_waiter.find_all_template(ev).each do |template|
608        template.notify(ev)
609      end
610    end
611
612    ##
613    # Creates a thread that scans the tuplespace for expired tuples.
614
615    def start_keeper
616      return if @keeper && @keeper.alive?
617      @keeper = Thread.new do
618        while true
619          sleep(@period)
620          synchronize do
621            break unless need_keeper?
622            keep_clean
623          end
624        end
625      end
626    end
627
628    ##
629    # Checks the tuplespace to see if it needs cleaning.
630
631    def need_keeper?
632      return true if @bag.has_expires?
633      return true if @read_waiter.has_expires?
634      return true if @take_waiter.has_expires?
635      return true if @notify_waiter.has_expires?
636    end
637
638  end
639
640end
641
642