1import datetime
2
3import pytest
4
5from pypuppetdb.QueryBuilder import (AndOperator, EqualsOperator, ExtractOperator, FromOperator,
6                                     FunctionOperator, GreaterEqualOperator, GreaterOperator,
7                                     InOperator, LessEqualOperator, LessOperator, NotOperator,
8                                     NullOperator, OrOperator, RegexArrayOperator, RegexOperator,
9                                     SubqueryOperator)
10from pypuppetdb.errors import APIError
11
12
13class TestBinaryOperator(object):
14    """
15    Test the BinaryOperator object and all sub-classes.
16    """
17    def test_equal_operator(self):
18        op = EqualsOperator("certname", "test01")
19        assert str(op) == '["=", "certname", "test01"]'
20        assert repr(op) == 'Query: ["=", "certname", "test01"]'
21        assert str(op) == '["=", "certname", "test01"]'
22        assert str(EqualsOperator("clientversion", 91))\
23            == '["=", "clientversion", 91]'
24        assert str(EqualsOperator("start_time", "2016-05-11T23:22:48.709Z"))\
25            == '["=", "start_time", "2016-05-11T23:22:48.709Z"]'
26        assert str(EqualsOperator("is_virtual", True))\
27            == '["=", "is_virtual", true]'
28        assert str(EqualsOperator("bios_version", ["6.00", 5.00]))\
29            == '["=", "bios_version", ["6.00", 5.0]]'
30        assert str(EqualsOperator(['parameter', 'ensure'], "present"))\
31            == '["=", ["parameter", "ensure"], "present"]'
32        assert str(EqualsOperator(u"latest_report?", True))\
33            == '["=", "latest_report?", true]'
34        assert str(EqualsOperator("report_timestamp",
35                                  datetime.datetime(2016, 6, 11)))\
36            == '["=", "report_timestamp", "2016-06-11 00:00:00"]'
37
38    def test_greater_operator(self):
39        assert str(GreaterOperator("uptime", 150))\
40            == '[">", "uptime", 150]'
41        assert str(GreaterOperator("end_time", '2016-05-11T23:22:48.709Z'))\
42            == '[">", "end_time", "2016-05-11T23:22:48.709Z"]'
43        assert str(GreaterOperator(['parameter', 'version'], 4.0))\
44            == '[">", ["parameter", "version"], 4.0]'
45        assert str(GreaterOperator("report_timestamp",
46                                   datetime.datetime(2016, 6, 11)))\
47            == '[">", "report_timestamp", "2016-06-11 00:00:00"]'
48
49    def test_less_operator(self):
50        assert str(LessOperator("uptime_seconds", 300))\
51            == '["<", "uptime_seconds", 300]'
52        assert str(LessOperator(
53            "producer_timestamp",
54            "2016-05-11T23:53:29.962Z"))\
55            == '["<", "producer_timestamp", "2016-05-11T23:53:29.962Z"]'
56        assert str(LessOperator(['parameter', 'version'], 4.0))\
57            == '["<", ["parameter", "version"], 4.0]'
58        assert str(LessOperator("report_timestamp",
59                                datetime.datetime(2016, 6, 11)))\
60            == '["<", "report_timestamp", "2016-06-11 00:00:00"]'
61
62    def test_greater_equal_operator(self):
63        assert str(GreaterEqualOperator("uptime_days", 3))\
64            == '[">=", "uptime_days", 3]'
65        assert str(GreaterEqualOperator(
66            "start_time",
67            "2016-05-11T23:53:29.962Z"))\
68            == '[">=", "start_time", "2016-05-11T23:53:29.962Z"]'
69        assert str(GreaterEqualOperator(['parameter', 'version'], 4.0))\
70            == '[">=", ["parameter", "version"], 4.0]'
71        assert str(GreaterEqualOperator("report_timestamp",
72                                        datetime.datetime(2016, 6, 11)))\
73            == '[">=", "report_timestamp", "2016-06-11 00:00:00"]'
74
75    def test_less_equal_operator(self):
76        assert str(LessEqualOperator("kernelmajversion", 4))\
77            == '["<=", "kernelmajversion", 4]'
78        assert str(LessEqualOperator("end_time", "2016-05-11T23:53:29.962Z"))\
79            == '["<=", "end_time", "2016-05-11T23:53:29.962Z"]'
80        assert str(LessEqualOperator(['parameter', 'version'], 4.0))\
81            == '["<=", ["parameter", "version"], 4.0]'
82        assert str(LessEqualOperator("report_timestamp",
83                                     datetime.datetime(2016, 6, 11)))\
84            == '["<=", "report_timestamp", "2016-06-11 00:00:00"]'
85
86    def test_regex_operator(self):
87        assert str(RegexOperator("certname", "www\\d+\\.example\\.com"))\
88            == '["~", "certname", "www\\\\d+\\\\.example\\\\.com"]'
89        assert str(RegexOperator(['parameter', 'version'], "4\\.\\d+"))\
90            == '["~", ["parameter", "version"], "4\\\\.\\\\d+"]'
91
92    def test_regex_array_operator(self):
93        assert str(RegexArrayOperator(
94            "networking",
95            ["interfaces", "eno.*", "netmask"]))\
96            == '["~>", "networking", ["interfaces", "eno.*", "netmask"]]'
97
98    def test_null_operator(self):
99        assert str(NullOperator("expired", True))\
100            == '["null?", "expired", true]'
101        assert str(NullOperator("report_environment", False))\
102            == '["null?", "report_environment", false]'
103        with pytest.raises(APIError):
104            NullOperator("environment", "test")
105
106
107class TestBooleanOperator(object):
108    """
109    Test the BooleanOperator object and all sub-classes.
110    """
111    def test_and_operator(self):
112        op = AndOperator()
113        op.add(EqualsOperator("operatingsystem", "CentOS"))
114        op.add([EqualsOperator("architecture", "x86_64"),
115                GreaterOperator("operatingsystemmajrelease", 6)])
116
117        assert str(op) == '["and", ["=", "operatingsystem", "CentOS"], '\
118            '["=", "architecture", "x86_64"], '\
119            '[">", "operatingsystemmajrelease", 6]]'
120        assert repr(op) == 'Query: ["and", '\
121            '["=", "operatingsystem", "CentOS"], '\
122            '["=", "architecture", "x86_64"], '\
123            '[">", "operatingsystemmajrelease", 6]]'
124        assert str(op) == '["and", ["=", "operatingsystem", "CentOS"], ' \
125                          '["=", "architecture", "x86_64"], '\
126            '[">", "operatingsystemmajrelease", 6]]'
127
128        with pytest.raises(APIError):
129            op.add({"query1": '["=", "catalog_environment", "production"]'})
130
131    def test_or_operator(self):
132        op = OrOperator()
133        op.add(EqualsOperator("operatingsystem", "CentOS"))
134        op.add([EqualsOperator("architecture", "x86_64"),
135                GreaterOperator("operatingsystemmajrelease", 6)])
136
137        assert str(op) == '["or", ["=", "operatingsystem", "CentOS"], '\
138            '["=", "architecture", "x86_64"], '\
139            '[">", "operatingsystemmajrelease", 6]]'
140        assert repr(op) == 'Query: ["or", '\
141            '["=", "operatingsystem", "CentOS"], '\
142            '["=", "architecture", "x86_64"], '\
143            '[">", "operatingsystemmajrelease", 6]]'
144        assert str(op) == '["or", ["=", "operatingsystem", "CentOS"], ' \
145                          '["=", "architecture", "x86_64"], '\
146            '[">", "operatingsystemmajrelease", 6]]'
147
148        with pytest.raises(APIError):
149            op.add({"query1": '["=", "catalog_environment", "production"]'})
150
151    def test_not_operator(self):
152        op = NotOperator()
153        op.add(EqualsOperator("operatingsystem", "CentOS"))
154
155        assert str(op) == '["not", ["=", "operatingsystem", "CentOS"]]'
156        assert repr(op) == 'Query: ["not", ["=", "operatingsystem", "CentOS"]]'
157        assert str(op) == '["not", ["=", "operatingsystem", "CentOS"]]'
158
159        with pytest.raises(APIError):
160            op.add(GreaterOperator("operatingsystemmajrelease", 6))
161        with pytest.raises(APIError):
162            op.add([EqualsOperator("architecture", "x86_64"),
163                    GreaterOperator("operatingsystemmajrelease", 6)])
164
165    def test_and_with_no_operations(self):
166        op = AndOperator()
167
168        with pytest.raises(APIError):
169            repr(op)
170        with pytest.raises(APIError):
171            str(op)
172        with pytest.raises(APIError):
173            str(op)
174
175    def test_or_with_no_operations(self):
176        op = OrOperator()
177
178        with pytest.raises(APIError):
179            repr(op)
180        with pytest.raises(APIError):
181            str(op)
182        with pytest.raises(APIError):
183            str(op)
184
185    def test_not_with_no_operations(self):
186        op = NotOperator()
187
188        with pytest.raises(APIError):
189            repr(op)
190        with pytest.raises(APIError):
191            str(op)
192        with pytest.raises(APIError):
193            str(op)
194
195    def test_not_with_list(self):
196        op = NotOperator()
197
198        with pytest.raises(APIError):
199            str(op.add([EqualsOperator('clientversion', '4.5.1'),
200                        EqualsOperator('facterversion', '3.2.0')]))
201
202
203class TestExtractOperator(object):
204    """
205    Test the ExtractOperator object and all sub-classes.
206    """
207    def test_with_add_field(self):
208        op = ExtractOperator()
209
210        with pytest.raises(APIError):
211            repr(op)
212        with pytest.raises(APIError):
213            str(op)
214        with pytest.raises(APIError):
215            str(op)
216
217        op.add_field("certname")
218        op.add_field(['fact_environment', 'catalog_environment'])
219
220        assert repr(op) == 'Query: ["extract", '\
221            '["certname", "fact_environment", "catalog_environment"]]'
222        assert str(op) == '["extract", '\
223            '["certname", "fact_environment", "catalog_environment"]]'
224        assert str(op) == '["extract", ' \
225                          '["certname", "fact_environment", "catalog_environment"]]'
226
227        with pytest.raises(APIError):
228            op.add_field({'equal': 'operatingsystemrelease'})
229
230    def test_with_add_query(self):
231        op = ExtractOperator()
232
233        op.add_field(['certname', 'fact_environment', 'catalog_environment'])
234
235        with pytest.raises(APIError):
236            op.add_query({'less': 42, 'greater': 50})
237
238        op.add_query(EqualsOperator('domain', 'example.com'))
239
240        assert repr(op) == 'Query: ["extract", '\
241            '["certname", "fact_environment", "catalog_environment"], '\
242            '["=", "domain", "example.com"]]'
243        assert str(op) == '["extract", '\
244            '["certname", "fact_environment", "catalog_environment"], '\
245            '["=", "domain", "example.com"]]'
246        assert str(op) == '["extract", ' \
247                          '["certname", "fact_environment", "catalog_environment"], '\
248            '["=", "domain", "example.com"]]'
249
250        with pytest.raises(APIError):
251            op.add_query(GreaterOperator("processorcount", 1))
252
253    def test_with_add_group_by(self):
254        op = ExtractOperator()
255
256        op.add_field(['certname', 'fact_environment', 'catalog_environment'])
257        op.add_query(EqualsOperator('domain', 'example.com'))
258        op.add_group_by(["fact_environment", "catalog_environment"])
259
260        with pytest.raises(APIError):
261            op.add_group_by({"deactivated": False})
262
263        assert repr(op) == 'Query: ["extract", '\
264            '["certname", "fact_environment", "catalog_environment"], '\
265            '["=", "domain", "example.com"], '\
266            '["group_by", "fact_environment", "catalog_environment"]]'
267        assert str(op) == '["extract", '\
268            '["certname", "fact_environment", "catalog_environment"], '\
269            '["=", "domain", "example.com"], '\
270            '["group_by", "fact_environment", "catalog_environment"]]'
271        assert str(op) == '["extract", ' \
272                          '["certname", "fact_environment", "catalog_environment"], '\
273            '["=", "domain", "example.com"], '\
274            '["group_by", "fact_environment", "catalog_environment"]]'
275
276    def test_with_add_function_operator(self):
277        op = ExtractOperator()
278
279        op.add_field(FunctionOperator('to_string',
280                                      'producer_timestamp',
281                                      'FMDAY'))
282        op.add_field(FunctionOperator('count'))
283        op.add_group_by(FunctionOperator('to_string',
284                                         'producer_timestamp',
285                                         'FMDAY'))
286
287        assert str(op) == '["extract", '\
288            '[["function", "to_string", "producer_timestamp", "FMDAY"], '\
289            '["function", "count"]], '\
290            '["group_by", '\
291            '["function", "to_string", "producer_timestamp", "FMDAY"]]]'
292        assert repr(op) == 'Query: ["extract", '\
293            '[["function", "to_string", "producer_timestamp", "FMDAY"], '\
294            '["function", "count"]], '\
295            '["group_by", '\
296            '["function", "to_string", "producer_timestamp", "FMDAY"]]]'
297        assert str(op) == '["extract", ' \
298                          '[["function", "to_string", "producer_timestamp", "FMDAY"], '\
299            '["function", "count"]], '\
300            '["group_by", '\
301            '["function", "to_string", "producer_timestamp", "FMDAY"]]]'
302
303
304class TestFunctionOperator(object):
305    """
306    Test the FunctionOperator object and all sub-classes.
307    """
308    def test_count_function(self):
309        assert str(FunctionOperator('count')) == \
310            '["function", "count"]'
311        assert repr(FunctionOperator('count')) == \
312            'Query: ["function", "count"]'
313        assert str(FunctionOperator('count')) == \
314            '["function", "count"]'
315        assert str(FunctionOperator('count', 'domain')) == \
316            '["function", "count", "domain"]'
317        assert repr(FunctionOperator('count', 'domain')) == \
318            'Query: ["function", "count", "domain"]'
319        assert str(FunctionOperator('count', 'domain')) == \
320            '["function", "count", "domain"]'
321
322    def test_avg_function(self):
323        assert str(FunctionOperator('avg', 'uptime')) == \
324            '["function", "avg", "uptime"]'
325        assert repr(FunctionOperator('avg', 'uptime')) == \
326            'Query: ["function", "avg", "uptime"]'
327        assert str(FunctionOperator('avg', 'uptime')) == \
328            '["function", "avg", "uptime"]'
329
330        with pytest.raises(APIError):
331            FunctionOperator("avg")
332
333    def test_sum_function(self):
334        assert str(FunctionOperator('sum', 'memoryfree_mb')) == \
335            '["function", "sum", "memoryfree_mb"]'
336        assert repr(FunctionOperator('sum', 'memoryfree_mb')) == \
337            'Query: ["function", "sum", "memoryfree_mb"]'
338        assert str(FunctionOperator('sum', 'memoryfree_mb')) == \
339            '["function", "sum", "memoryfree_mb"]'
340
341        with pytest.raises(APIError):
342            FunctionOperator("sum")
343
344    def test_min_function(self):
345        assert str(FunctionOperator('min', 'kernelversion')) == \
346            '["function", "min", "kernelversion"]'
347        assert repr(FunctionOperator('min', 'kernelversion')) == \
348            'Query: ["function", "min", "kernelversion"]'
349        assert str(FunctionOperator('min', 'kernelversion')) == \
350            '["function", "min", "kernelversion"]'
351
352        with pytest.raises(APIError):
353            FunctionOperator("min")
354
355    def test_max_function(self):
356        assert str(FunctionOperator('max', 'facterversion')) == \
357            '["function", "max", "facterversion"]'
358        assert repr(FunctionOperator('max', 'facterversion')) == \
359            'Query: ["function", "max", "facterversion"]'
360        assert str(FunctionOperator('max', 'facterversion')) == \
361            '["function", "max", "facterversion"]'
362
363        with pytest.raises(APIError):
364            FunctionOperator("max")
365
366    def test_to_string_function(self):
367        assert str(FunctionOperator("to_string",
368                                    'producer_timestamp',
369                                    'FMDAY')) == \
370            '["function", "to_string", "producer_timestamp", "FMDAY"]'
371        assert repr(FunctionOperator("to_string",
372                                     'producer_timestamp',
373                                     'FMDAY')) == \
374            'Query: ["function", "to_string", "producer_timestamp", "FMDAY"]'
375        assert str(FunctionOperator("to_string", 'producer_timestamp', 'FMDAY')) == \
376            '["function", "to_string", "producer_timestamp", "FMDAY"]'
377
378        with pytest.raises(APIError):
379            FunctionOperator("to_string")
380        with pytest.raises(APIError):
381            FunctionOperator("to_string", 'receive_time')
382
383    def test_unknown_function(self):
384        with pytest.raises(APIError):
385            FunctionOperator("std_dev")
386        with pytest.raises(APIError):
387            FunctionOperator("last")
388
389
390class TestSubqueryOperator(object):
391    """
392    Test the SubqueryOperator object
393    """
394    def test_events_endpoint(self):
395        assert str(SubqueryOperator('events')) == \
396            '["select_events"]'
397
398        op = SubqueryOperator('events')
399        op.add_query(EqualsOperator('status', 'noop'))
400
401        assert repr(op) == 'Query: ["select_events", '\
402            '["=", "status", "noop"]]'
403
404    def test_multiple_add_query(self):
405        with pytest.raises(APIError):
406            op = SubqueryOperator('events')
407            op.add_query(EqualsOperator('status', 'noop'))
408            op.add_query(EqualsOperator('status', 'changed'))
409
410    def test_unknown_endpoint(self):
411        with pytest.raises(APIError):
412            SubqueryOperator('cats')
413
414
415class TestInOperator(object):
416    """
417    Test the InOperator object
418    """
419    def test_events_endpoint(self):
420        assert str(InOperator('certname')) == \
421            '["in", "certname"]'
422
423        op = InOperator('certname')
424        ex = ExtractOperator()
425        ex.add_field("certname")
426        op.add_query(ex)
427
428        assert repr(op) == 'Query: ["in", "certname", ' \
429            '["extract", ["certname"]]]'
430
431    def test_multiple_add_query(self):
432        with pytest.raises(APIError):
433            op = InOperator('certname')
434            op.add_query(ExtractOperator())
435            op.add_query(ExtractOperator())
436
437    def test_add_array(self):
438        arr = [1, "2", 3]
439        op = InOperator('certname')
440        op.add_array(arr)
441
442        assert repr(op) == 'Query: ["in", "certname", ' \
443            '["array", [1, "2", 3]]]'
444
445    def test_invalid_add_array(self):
446        arr = [1, 2, 3]
447        inv1 = [1, [2, 3]]
448        inv2 = []
449
450        with pytest.raises(APIError):
451            op = InOperator('certname')
452            op.add_array(inv1)
453
454        with pytest.raises(APIError):
455            op = InOperator('certname')
456            op.add_array(inv2)
457
458        with pytest.raises(APIError):
459            op = InOperator('certname')
460            op.add_array(arr)
461            op.add_array(arr)
462
463        with pytest.raises(APIError):
464            op = InOperator('certname')
465
466            op.add_array(arr)
467            ex = ExtractOperator()
468            ex.add_field("certname")
469            op.add_query(ex)
470
471    def test_fromoperator(self):
472        op = InOperator('certname')
473        ex = ExtractOperator()
474        ex.add_field(["certname", "facts"])
475        fr = FromOperator("facts")
476        fr.add_query(ex)
477        fr.add_offset(10)
478        op.add_query(fr)
479
480        assert repr(op) == 'Query: ["in", "certname", ' \
481            '["from", "facts", ["extract", ' \
482            '["certname", "facts"]], ["offset", 10]]]'
483
484        # last example on page
485        # https://puppet.com/docs/puppetdb/5.1/api/query/v4/ast.html
486
487        op = InOperator('certname')
488        ex = ExtractOperator()
489        ex.add_field('certname')
490        fr = FromOperator('fact_contents')
491        nd = AndOperator()
492        nd.add(EqualsOperator("path",
493                              ["networking", "eth0", "macaddresses", 0]))
494        nd.add(EqualsOperator("value", "aa:bb:cc:dd:ee:00"))
495        ex.add_query(nd)
496        fr.add_query(ex)
497        op.add_query(fr)
498
499        assert str(op) == '["in", "certname", ' \
500                          '["from", "fact_contents", ' \
501            '["extract", ["certname"], ["and", ["=", "path", ' \
502            '["networking", "eth0", "macaddresses", 0]], ' \
503            '["=", "value", "aa:bb:cc:dd:ee:00"]]]]]'
504
505
506class TestFromOperator(object):
507    """
508    Test the FromOperator object
509    """
510
511    def test_init_from(self):
512        fr = FromOperator("facts")
513
514        with pytest.raises(APIError):
515            str(fr) == "unimportant_no_query"
516
517        with pytest.raises(APIError):
518            FromOperator('invalid_entity')
519
520    def test_add_query(self):
521        fr = FromOperator("facts")
522        op = EqualsOperator("certname", "test01")
523        fr.add_query(op)
524
525        assert str(fr) == '["from", "facts", ["=", "certname", "test01"]]'
526
527        fr2 = FromOperator("facts")
528        op2 = "test, test, test"
529        with pytest.raises(APIError):
530            fr2.add_query(op2)
531        fr2.add_query(op)
532        with pytest.raises(APIError):
533            fr2.add_query(op)
534
535        fr3 = FromOperator("facts")
536        op3 = ExtractOperator()
537        op3.add_field(['certname', 'fact_environment', 'catalog_environment'])
538        fr3.add_query(op3)
539
540        assert str(fr3) == \
541            '["from", "facts", ["extract", '\
542            '["certname", "fact_environment", "catalog_environment"]]]'
543
544    def test_limit_offset(self):
545        fr = FromOperator("facts")
546        op = EqualsOperator("certname", "test01")
547        fr.add_query(op)
548
549        fr.add_offset(10)
550        assert str(fr) == \
551            '["from", "facts", ["=", "certname", "test01"], ["offset", 10]]'
552
553        fr.add_limit(5)
554        assert str(fr) == \
555            '["from", "facts", ["=", "certname",' \
556            ' "test01"], ["limit", 5], ["offset", 10]]'
557
558        fr.add_limit(15)
559        assert str(fr) == \
560            '["from", "facts", ["=", "certname",' \
561            ' "test01"], ["limit", 15], ["offset", 10]]'
562
563        assert repr(fr) == \
564            'Query: ["from", "facts", ["=", "certname",' \
565            ' "test01"], ["limit", 15], ["offset", 10]]'
566
567        with pytest.raises(APIError):
568            fr.add_offset("invalid")
569
570        with pytest.raises(APIError):
571            fr.add_limit(["invalid"])
572
573    def test_order_by(self):
574        fr = FromOperator("facts")
575        op = EqualsOperator("certname", "test01")
576        fr.add_query(op)
577
578        o1 = ["certname"]
579        o2 = ["certname", ["timestamp", "desc"], "facts"]
580        o3inv = ['certname', ['timestamp', 'desc', ['oops']]]
581
582        fr.add_order_by(o1)
583        assert str(fr) == \
584            '["from", "facts", ["=", "certname", ' \
585            '"test01"], ["order_by", ["certname"]]]'
586
587        fr.add_order_by(o2)
588        assert repr(fr) == \
589            'Query: ["from", "facts", ' \
590            '["=", "certname", "test01"], ' \
591            '["order_by", ["certname", ' \
592            '["timestamp", "desc"], "facts"]]]'
593
594        assert str(fr) == \
595            '["from", "facts", ' \
596            '["=", "certname", "test01"], ' \
597            '["order_by", ["certname", ' \
598            '["timestamp", "desc"], "facts"]]]'
599
600        assert str(fr) == \
601            '["from", "facts", ' \
602            '["=", "certname", "test01"], ' \
603            '["order_by", ["certname", ' \
604            '["timestamp", "desc"], "facts"]]]'
605
606        with pytest.raises(APIError):
607            fr.add_order_by(o3inv)
608