1module Groonga
2  module Sharding
3    class LogicalEnumerator
4      include Enumerable
5
6      attr_reader :target_range
7      attr_reader :logical_table
8      attr_reader :shard_key_name
# Creates an enumerator over the shards of one logical table.
#
# command_name - name of the running command; it prefixes the error
#                messages raised while the parameters are validated.
# input        - command arguments, read with [] (:logical_table and
#                :shard_key are looked up by initialize_parameters).
# options      - optional settings; defaults to an empty Hash.
#
# Argument validation is delegated to initialize_parameters.
def initialize(command_name, input, options={})
  @command_name, @input, @options = command_name, input, options
  initialize_parameters
end
15
# Iterates over the shards in ascending order.
#
# Yields whatever each_internal yields for :ascending order, i.e. a
# (shard, shard_range) pair per shard.
#
# Returns an Enumerator when no block is given, so that the usual
# Enumerable conventions (e.g. `each.with_index`) work instead of
# failing on the first yield.
def each(&block)
  return to_enum(:each) unless block_given?

  each_internal(:ascending, &block)
end
19
# Iterates over the shards in descending order.
#
# Yields whatever each_internal yields for :descending order, i.e. a
# (shard, shard_range) pair per shard.
#
# Returns an Enumerator when no block is given, instead of failing on
# the first yield.
def reverse_each(&block)
  return to_enum(:reverse_each) unless block_given?

  each_internal(:descending, &block)
end
23
24      private
# Walks the shards in the requested order and yields each one together
# with the range it covers.
#
# order - :ascending or anything else (treated as descending); it is
#         forwarded unchanged to each_shard_with_around.
#
# Yields (shard, shard_range). shard_range is a DayShardRange when the
# shard's range data has a day, and a MonthShardRange otherwise.
def each_internal(order)
  each_shard_with_around(order) do |prev_shard, current_shard, next_shard|
    range_data = current_shard.range_data

    if range_data.day.nil?
      # The shard that bounds a month shard is the one enumerated after
      # it when ascending and the one enumerated before it when
      # descending; either may be missing at the ends of the list.
      bounding_shard = order == :ascending ? next_shard : prev_shard
      bounding_range_data = bounding_shard ? bounding_shard.range_data : nil
      max_day = compute_month_shard_max_day(range_data.year,
                                            range_data.month,
                                            bounding_range_data)
      shard_range = MonthShardRange.new(range_data.year,
                                        range_data.month,
                                        max_day)
    else
      shard_range = DayShardRange.new(range_data.year,
                                      range_data.month,
                                      range_data.day)
    end

    yield(current_shard, shard_range)
  end
end
60
# Enumerates the shard tables of @logical_table together with their
# neighbours in enumeration order.
#
# order - forwarded to the database's each_name as :order.
#
# Only names of the form "<logical_table>_YYYYMM" or
# "<logical_table>_YYYYMMDD" are treated as shards; every other name
# with the prefix is skipped.
#
# Yields (previous_shard, shard, following_shard); previous_shard is nil
# for the first shard and following_shard is nil for the last one.
def each_shard_with_around(order)
  database = Context.instance.database
  shard_prefix = "#{@logical_table}_"

  previous = nil
  current = nil
  database.each_name(:prefix => shard_prefix,
                     :order_by => :key,
                     :order => order) do |table_name|
    suffix = table_name[shard_prefix.size..-1]
    # Six digits name a month shard, eight digits a day shard.
    matched = /\A(\d{4})(\d{2})(\d{2})?\z/.match(suffix)
    next if matched.nil?

    day = matched[3]
    range_data = ShardRangeData.new(matched[1].to_i,
                                    matched[2].to_i,
                                    day && day.to_i)
    upcoming = Shard.new(table_name, @shard_key_name, range_data)

    # A shard can only be reported once its successor is known.
    yield(previous, current, upcoming) if current
    previous = current
    current = upcoming
  end

  # The last shard has no successor.
  yield(previous, current, nil) if current
end
90
91      private
# Reads and validates the command arguments stored in @input.
#
# Sets @logical_table, @shard_key_name and @target_range.
#
# Raises InvalidArgument when :logical_table is missing, or when
# :shard_key is missing and the :require_shard_key option is not
# explicitly false.
def initialize_parameters
  @logical_table = @input[:logical_table]
  raise InvalidArgument, "[#{@command_name}] logical_table is missing" if @logical_table.nil?

  @shard_key_name = @input[:shard_key]
  if @shard_key_name.nil?
    # An absent (nil) :require_shard_key option means "required".
    required = @options[:require_shard_key]
    raise InvalidArgument, "[#{@command_name}] shard_key is missing" if required.nil? || required
  end

  @target_range = TargetRange.new(@command_name, @input)
end
109
# Computes the day that bounds a month shard from above.
#
# year, month      - the year and month of the month shard.
# next_shard_range - range data (responding to year, month and day) of
#                    the neighbouring shard, or nil when there is none.
#
# Returns the neighbour's day when the neighbour lies in the same month
# of the same year, and nil otherwise (no neighbour, or a neighbour in
# another month or year).
def compute_month_shard_max_day(year, month, next_shard_range)
  return nil if next_shard_range.nil?

  # Comparing the month alone would let e.g. a 2016-01-15 day shard cap
  # a 2015-01 month shard, so the year has to match as well.
  return nil if year != next_shard_range.year
  return nil if month != next_shard_range.month

  next_shard_range.day
end
117
# One shard table of the logical table.
#
# Holds the table name, the shard key (column) name and the range data
# parsed from the table name, and resolves the table and key objects
# through the context on first use.
class Shard
  attr_reader :table_name
  attr_reader :key_name
  attr_reader :range_data

  def initialize(table_name, key_name, range_data)
    @table_name, @key_name, @range_data = table_name, key_name, range_data
  end

  # Returns the object registered in the context under the table name;
  # a truthy result is cached for later calls.
  def table
    return @table if @table

    @table = Context.instance[@table_name]
  end

  # Returns "<table_name>.<key_name>".
  def full_key_name
    "#{table_name}.#{key_name}"
  end

  # Returns the object registered in the context under full_key_name;
  # a truthy result is cached for later calls.
  def key
    return @key if @key

    @key = Context.instance[full_key_name]
  end
end
138
# Year, month and optional day parsed from a shard table name.
#
# day is nil for a month shard.
class ShardRangeData
  attr_reader :year
  attr_reader :month
  attr_reader :day

  def initialize(year, month, day)
    @year = year
    @month = month
    @day = day
  end

  # Returns the table name suffix for this range: "_YYYYMM" when there
  # is no day and "_YYYYMMDD" otherwise, zero padded.
  def to_suffix
    return format("_%04d%02d", @year, @month) if @day.nil?

    format("_%04d%02d%02d", @year, @month, @day)
  end
end
155
# The time range covered by a shard that holds a single day.
class DayShardRange
  attr_reader :year
  attr_reader :month
  attr_reader :day

  def initialize(year, month, day)
    @year = year
    @month = month
    @day = day
  end

  # Returns the first local time that is after this day.
  def least_over_time
    candidate = Time.local(@year, @month, @day) + (24 * 60 * 60)
    # 24 hours later can still fall on the same day (e.g. because of a
    # leap second), so advance second by second until the day changes.
    candidate += 1 while candidate.day == @day
    candidate
  end

  # Returns the first local time of this day.
  def min_time
    Time.local(@year, @month, @day)
  end

  # Returns true when time falls on this year, month and day.
  def include?(time)
    return false unless @year == time.year
    return false unless @month == time.month

    @day == time.day
  end
end
182
# The time range covered by a shard that holds a month, or the leading
# part of a month.
#
# max_day is nil when the shard covers the whole month. Otherwise it is
# the first day that is NOT covered: least_over_time returns the start
# of that day, and include? rejects times on or after it.
class MonthShardRange
  attr_reader :year, :month, :max_day

  def initialize(year, month, max_day)
    @year = year
    @month = month
    @max_day = max_day
  end

  # Returns the first local time that is after this range: the start of
  # max_day when it is set, otherwise the start of the next month.
  def least_over_time
    return Time.local(@year, @month, @max_day) unless @max_day.nil?

    if @month == 12
      Time.local(@year + 1, 1, 1)
    else
      Time.local(@year, @month + 1, 1)
    end
  end

  # Returns the first local time of the month.
  def min_time
    Time.local(@year, @month, 1)
  end

  # Returns true when time falls inside this range.
  def include?(time)
    return false unless @year == time.year
    return false unless @month == time.month
    return true if @max_day.nil?

    # max_day itself is outside the range (see least_over_time), so the
    # comparison must be strict.
    time.day < @max_day
  end
end
218
# The optional min/max bounds requested by a command, with their
# border modes (:include or :exclude).
#
# min and max are nil when the corresponding argument is absent; both
# borders default to :include.
class TargetRange
  attr_reader :min, :min_border
  attr_reader :max, :max_border

  # command_name - used as the prefix of error messages.
  # input        - command arguments, read with [] for :min,
  #                :min_border, :max and :max_border.
  #
  # Raises InvalidArgument when a border is neither "include" nor
  # "exclude".
  def initialize(command_name, input)
    @command_name = command_name
    @input = input
    @min = parse_value(:min)
    @min_border = parse_border(:min_border)
    @max = parse_value(:max)
    @max_border = parse_border(:max_border)
  end

  # Classifies how this target range overlaps shard_range (an object
  # responding to least_over_time, min_time and include?).
  #
  # Returns one of :all, :none, :partial_min, :partial_max or
  # :partial_min_and_max.
  def cover_type(shard_range)
    return :all if @min.nil? && @max.nil?

    # A shard entirely outside either given bound is not covered at all.
    return :none if @min && !in_min?(shard_range)
    return :none if @max && !in_max?(shard_range)

    cut_by_min = @min ? in_min_partial?(shard_range) : false
    cut_by_max = @max ? in_max_partial?(shard_range) : false

    if cut_by_min && cut_by_max
      :partial_min_and_max
    elsif cut_by_min
      :partial_min
    elsif cut_by_max
      :partial_max
    else
      :all
    end
  end

  private

  # Returns the named argument converted to Time, or nil when absent.
  def parse_value(name)
    raw = @input[name]
    raw.nil? ? nil : Converter.convert(raw, Time)
  end

  # Returns :include or :exclude for the named border argument; an
  # absent border means :include.
  def parse_border(name)
    border = @input[name]
    case border
    when nil, "include"
      :include
    when "exclude"
      :exclude
    else
      raise InvalidArgument,
            "[#{@command_name}] #{name} must be \"include\" or \"exclude\": <#{border}>"
    end
  end

  # True when the shard does not end before min.
  def in_min?(shard_range)
    shard_range.least_over_time > @min
  end

  # True when min falls inside the shard and cuts some of it off. With
  # an inclusive border, min at the very start of the shard cuts nothing.
  def in_min_partial?(shard_range)
    return false unless shard_range.include?(@min)

    @min_border == :exclude || shard_range.min_time != @min
  end

  # True when the shard does not start after max; with an exclusive
  # border a shard starting exactly at max is out as well.
  def in_max?(shard_range)
    shard_start = shard_range.min_time
    @max_border == :include ? @max >= shard_start : @max > shard_start
  end

  # True when max falls inside the shard.
  def in_max_partial?(shard_range)
    shard_range.include?(@max)
  end
end
315    end
316  end
317end
318