1from __future__ import with_statement
2from datetime import datetime, timedelta
3
4import pytest
5
6from whoosh import fields, qparser, query
7from whoosh.compat import long_type, u, b, xrange
8from whoosh.filedb.filestore import RamStorage
9from whoosh.util import times
10from whoosh.util.testing import TempIndex
11
12
def test_schema_eq():
    """Schemas compare by their fields: equal content is equal, else not."""
    # Two independently created empty schemas are equal
    assert fields.Schema() == fields.Schema()

    # A copy has an equal "id" field and is equal as a whole
    id_schema = fields.Schema(id=fields.ID)
    duplicate = id_schema.copy()
    assert id_schema["id"] == duplicate["id"]
    assert id_schema == duplicate

    # Same field name but a different field type must compare unequal
    text_schema = fields.Schema(id=fields.TEXT)
    assert id_schema != text_schema
25
26
def test_creation1():
    """Build a schema one ``add()`` call at a time and inspect the result."""
    to_add = (
        ("content", fields.TEXT(phrase=True)),
        ("title", fields.TEXT(stored=True)),
        ("path", fields.ID(stored=True)),
        ("tags", fields.KEYWORD(stored=True)),
        # The last two are passed as classes rather than instances
        ("quick", fields.NGRAM),
        ("note", fields.STORED),
    )

    schema = fields.Schema()
    for fieldname, fieldtype in to_add:
        schema.add(fieldname, fieldtype)

    expected_names = ["content", "note", "path", "quick", "tags", "title"]
    assert schema.names() == expected_names
    assert "content" in schema
    assert "buzz" not in schema
    assert isinstance(schema["tags"], fields.KEYWORD)
40
41
def test_creation2():
    """Build a schema from keyword arguments given to the constructor."""
    schema = fields.Schema(
        a=fields.ID(stored=True),
        b=fields.ID,
        c=fields.KEYWORD(scorable=True),
    )

    assert schema.names() == ["a", "b", "c"]
    for fieldname in ("a", "b", "c"):
        assert fieldname in schema
51
52
def test_declarative():
    """An index accepts a ``SchemaClass`` subclass or an instance of one."""
    class MySchema(fields.SchemaClass):
        content = fields.TEXT
        title = fields.TEXT
        path = fields.ID
        date = fields.DATETIME

    expected_names = ["content", "date", "path", "title"]

    def indexed_names(schema_arg):
        # Each call uses a fresh in-memory storage
        return RamStorage().create_index(schema_arg).schema.names()

    # The class itself and an instance give the same field names
    assert indexed_names(MySchema) == expected_names
    assert indexed_names(MySchema()) == expected_names

    # An arbitrary object is rejected
    with pytest.raises(fields.FieldConfigurationError):
        RamStorage().create_index(object())
68
69
def test_declarative_inherit():
    """Fields declared on ancestor schema classes reach the subclass."""
    class Parent(fields.SchemaClass):
        path = fields.ID
        date = fields.DATETIME

    class Child(Parent):
        content = fields.TEXT

    class Grandchild(Child):
        title = fields.TEXT

    # All three levels contribute to the instantiated schema
    inherited_names = Grandchild().names()
    assert inherited_names == ["content", "date", "path", "title"]
83
84
def test_badnames():
    """Adding a field with an invalid name raises FieldConfigurationError."""
    schema = fields.Schema()
    # One name with a leading underscore, one containing a space
    for bad_name in ("_test", "a f"):
        with pytest.raises(fields.FieldConfigurationError):
            schema.add(bad_name, fields.ID)
91
92
93#def test_numeric_support():
94#    intf = fields.NUMERIC(int, shift_step=0)
95#    longf = fields.NUMERIC(int, bits=64, shift_step=0)
96#    floatf = fields.NUMERIC(float, shift_step=0)
97#
98#    def roundtrip(obj, num):
#        assert obj.from_bytes(obj.to_bytes(num)) == num
100#
101#    roundtrip(intf, 0)
102#    roundtrip(intf, 12345)
103#    roundtrip(intf, -12345)
104#    roundtrip(longf, 0)
105#    roundtrip(longf, 85020450482)
106#    roundtrip(longf, -85020450482)
107#    roundtrip(floatf, 0)
108#    roundtrip(floatf, 582.592)
109#    roundtrip(floatf, -582.592)
110#    roundtrip(floatf, -99.42)
111#
112#    from random import shuffle
113#
114#    def roundtrip_sort(obj, start, end, step):
115#        count = start
116#        rng = []
117#        while count < end:
118#            rng.append(count)
119#            count += step
120#
#        scrambled = list(rng)
#        shuffle(scrambled)
#        round = [obj.from_text(t) for t
#                 in sorted([obj.to_text(n) for n in scrambled])]
#        assert round == rng
126#
127#    roundtrip_sort(intf, -100, 100, 1)
128#    roundtrip_sort(longf, -58902, 58249, 43)
129#    roundtrip_sort(floatf, -99.42, 99.83, 2.38)
130
131
def test_index_numeric():
    """Compare the lexicon of 32-bit NUMERIC fields against exact bytes.

    The value 1 is indexed into an unsigned field ("a") and a signed field
    ("b"); each field's lexicon must match the listed terms exactly.
    """
    # NOTE(review): the first byte of each term rises in steps of 4 --
    # presumably a shift prefix; confirm against the NUMERIC implementation.
    unsigned_terms = [
        b('\x00\x00\x00\x00\x01'), b('\x04\x00\x00\x00\x00'),
        b('\x08\x00\x00\x00\x00'), b('\x0c\x00\x00\x00\x00'),
        b('\x10\x00\x00\x00\x00'), b('\x14\x00\x00\x00\x00'),
        b('\x18\x00\x00\x00\x00'), b('\x1c\x00\x00\x00\x00'),
    ]
    signed_terms = [
        b('\x00\x80\x00\x00\x01'), b('\x04\x08\x00\x00\x00'),
        b('\x08\x00\x80\x00\x00'), b('\x0c\x00\x08\x00\x00'),
        b('\x10\x00\x00\x80\x00'), b('\x14\x00\x00\x08\x00'),
        b('\x18\x00\x00\x00\x80'), b('\x1c\x00\x00\x00\x08'),
    ]

    schema = fields.Schema(a=fields.NUMERIC(int, 32, signed=False),
                           b=fields.NUMERIC(int, 32, signed=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(a=1, b=1)

    with ix.searcher() as s:
        assert list(s.lexicon("a")) == unsigned_terms
        assert list(s.lexicon("b")) == signed_terms
149
150
def test_numeric():
    """Search int and float NUMERIC fields through the query parser."""
    schema = fields.Schema(id=fields.ID(stored=True),
                           integer=fields.NUMERIC(int),
                           floating=fields.NUMERIC(float))
    ix = RamStorage().create_index(schema)

    rows = (
        ("a", 5820, 1.2),
        ("b", 22, 2.3),
        ("c", 78, 3.4),
        ("d", 13, 4.5),
        ("e", 9, 5.6),
    )
    w = ix.writer()
    for doc_id, int_value, float_value in rows:
        w.add_document(id=u(doc_id), integer=int_value, floating=float_value)
    w.commit()

    # "integer" is the parser's default field
    qp = qparser.QueryParser("integer", schema)

    with ix.searcher() as s:
        hits = s.search(qp.parse(u("5820")))
        assert len(hits) == 1
        assert hits[0]["id"] == "a"

    with ix.searcher() as s:
        hits = s.search(qp.parse("floating:4.5"))
        assert len(hits) == 1
        assert hits[0]["id"] == "d"

    # A bare star on a numeric field parses to an Every query on that field
    every_q = qp.parse("integer:*")
    assert every_q.__class__ == query.Every
    assert every_q.field() == "integer"

    # A wildcard inside a number parses to the null query
    wildcard_q = qp.parse("integer:5?6")
    assert wildcard_q == query.NullQuery
184
185
def test_decimal_numeric():
    """Index Decimal values in a NUMERIC field with four decimal places."""
    from decimal import Decimal

    deci_field = fields.NUMERIC(int, decimal_places=4)
    schema = fields.Schema(id=fields.ID(stored=True), deci=deci_field)
    ix = RamStorage().create_index(schema)

    # Disabled round-trip check, kept for reference:
    # assert deci_field.from_text(deci_field.to_text(Decimal("123.56"))) \
    #     == Decimal("123.56")

    w = ix.writer()
    for doc_id, literal in (("a", "123.56"), ("b", "0.536255"),
                            ("c", "2.5255"), ("d", "58")):
        w.add_document(id=u(doc_id), deci=Decimal(literal))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("deci", schema)
        # Each query string must find exactly the document holding that value
        for text, expected_id in (("123.56", "a"), ("0.536255", "b")):
            hits = s.search(qp.parse(u(text)))
            assert len(hits) == 1
            assert hits[0]["id"] == expected_id
212
213
def test_numeric_parsing():
    """Check how range syntax on a NUMERIC field is parsed."""
    schema = fields.Schema(id=fields.ID(stored=True), number=fields.NUMERIC)
    qp = qparser.QueryParser("number", schema)

    # A literal "*" as the upper bound parses to the null query
    assert qp.parse(u("[10 to *]")) == query.NullQuery

    def parse_range(text):
        # Parse and require that the result is exactly a NumericRange
        parsed = qp.parse(u(text))
        assert parsed.__class__ is query.NumericRange
        return parsed

    # Open start
    open_start = parse_range("[to 400]")
    assert open_start.start is None
    assert open_start.end == 400

    # Open end
    open_end = parse_range("[10 to]")
    assert open_end.start == 10
    assert open_end.end is None

    # Both bounds given
    closed = parse_range("[10 to 400]")
    assert closed.start == 10
    assert closed.end == 400
235
236
def test_numeric_ranges():
    """Run inclusive and exclusive range queries over the numbers 0..399."""
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    for number in xrange(400):
        w.add_document(id=number, num=number)
    w.commit()

    # range() excludes its upper bound, hence "+ 1" wherever the query's
    # end is inclusive
    cases = [
        ("[10 to 390]", list(range(10, 390 + 1))),
        ("[100 to]", list(range(100, 400))),
        ("[to 350]", list(range(0, 350 + 1))),
        ("[16 to 255]", list(range(16, 255 + 1))),
        ("{10 to 390]", list(range(11, 390 + 1))),
        ("[10 to 390}", list(range(10, 390))),
        ("{10 to 390}", list(range(11, 390))),
        ("{16 to 255}", list(range(17, 255))),
    ]

    qp = qparser.QueryParser("num", schema)
    with ix.searcher() as s:
        for qs, expected in cases:
            docnums = qp.parse(qs).docs(s)
            found = [s.stored_fields(docnum)["id"] for docnum in docnums]
            assert found == expected
263
264
def test_numeric_ranges_unsigned():
    """An open-ended range on an unsigned NUMERIC field finds the right docs.

    The query asks for values above 55 with no upper bound. The test checks
    the exact set of stored values returned, so an empty result or a missing
    document fails instead of passing vacuously.
    """
    values = [1, 10, 100, 1000, 2, 20, 200, 2000, 9, 90, 900, 9000]
    schema = fields.Schema(num2=fields.NUMERIC(stored=True, signed=False))

    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for v in values:
            w.add_document(num2=v)

    with ix.searcher() as s:
        # Positional flags are startexcl=True, endexcl=False
        q = query.NumericRange("num2", 55, None, True, False)
        r = s.search(q, limit=None)
        found = sorted(int(hit["num2"]) for hit in r)

    # Nothing at or below the bound may be returned...
    assert all(v >= 55 for v in found)
    # ...and every value above it must be present
    assert found == sorted(v for v in values if v > 55)
279
280
def test_decimal_ranges():
    """Range queries on a two-decimal-place NUMERIC field of Decimals.

    500 documents are indexed at 0.0, 0.2, 0.4, ...; each document's stored
    id is the string form of its value.
    """
    from decimal import Decimal

    schema = fields.Schema(id=fields.STORED,
                           num=fields.NUMERIC(int, decimal_places=2))
    ix = RamStorage().create_index(schema)

    step = Decimal("0.2")
    value = Decimal("0.0")
    w = ix.writer()
    for _ in xrange(500):
        w.add_document(id=str(value), num=value)
        value += step
    w.commit()

    # (query string, first expected value, last expected value)
    cases = (
        ("[10.2 to 80.8]", "10.2", "80.8"),
        ("{10.2 to 80.8]", "10.4", "80.8"),
        ("[10.2 to 80.8}", "10.2", "80.6"),
        ("{10.2 to 80.8}", "10.4", "80.6"),
    )

    with ix.searcher() as s:
        qp = qparser.QueryParser("num", schema)

        for qs, first, last in cases:
            parsed = qp.parse(qs)
            found = [s.stored_fields(docnum)["id"]
                     for docnum in parsed.docs(s)]

            # Rebuild the expected ids by stepping from first to last
            expected = []
            current = Decimal(first)
            stop = Decimal(last)
            while current <= stop:
                expected.append(str(current))
                current += step

            assert found == expected
315
316
def test_numeric_errors():
    """Values outside a signed 16-bit NUMERIC field's range raise ValueError."""
    numeric_field = fields.NUMERIC(int, bits=16, signed=True)
    schema = fields.Schema(f=numeric_field)

    # One below the signed 16-bit minimum and one above the maximum
    for out_of_range in (-32769, 32768):
        with pytest.raises(ValueError):
            list(numeric_field.index(out_of_range))
325
326
def test_nontext_document():
    """Look up documents by NUMERIC, DATETIME and BOOLEAN keyword values."""
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC,
                           date=fields.DATETIME, even=fields.BOOLEAN)
    ix = RamStorage().create_index(schema)

    base = datetime.now()
    w = ix.writer()
    for n in xrange(50):
        w.add_document(id=n, num=n, date=base + timedelta(days=n),
                       even=(n % 2 == 0))
    w.commit()

    # (keyword arguments for documents(), expected stored ids)
    lookups = (
        ({"num": 49}, [49]),
        ({"date": base + timedelta(days=30)}, [30]),
        ({"even": True}, list(range(0, 50, 2))),
    )

    with ix.searcher() as s:
        for kwargs, expected_ids in lookups:
            found_ids = [doc['id'] for doc in s.documents(**kwargs)]
            assert found_ids == expected_ids
347
348
def test_nontext_update():
    """``update_document`` accepts unique NUMERIC and DATETIME fields.

    The test has no assertions; it passes if the updates and the commit
    complete without raising.
    """
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC(unique=True),
                           date=fields.DATETIME(unique=True))
    ix = RamStorage().create_index(schema)

    base = datetime.now()
    w = ix.writer()
    for n in xrange(10):
        w.add_document(id=n, num=n, date=base + timedelta(days=n))
    w.commit()

    w = ix.writer()
    # Updates keyed on the unique numeric field
    for number, new_id in ((8, "a"), (2, "b"), (4, "c")):
        w.update_document(num=number, id=new_id)
    # Updates keyed on the unique datetime field
    for day_offset, new_id in ((5, "d"), (1, "e"), (7, "f")):
        w.update_document(date=base + timedelta(days=day_offset), id=new_id)
    w.commit()
368
369
def test_datetime():
    """Index datetimes and query them by day, by month and by range.

    Documents cover months 1-11 and days 1-27 of 2010, all at 14:00, with
    ids of the form "<month>-<day>".
    """
    schema = fields.Schema(id=fields.ID(stored=True),
                           date=fields.DATETIME(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    for month in xrange(1, 12):
        for day in xrange(1, 28):
            stamp = datetime(2010, month, day, 14, 0, 0)
            w.add_document(id=u("%s-%s") % (month, day), date=stamp)
    w.commit()

    qp = qparser.QueryParser("id", schema)

    with ix.searcher() as s:
        # A full date matches exactly one document; the stored value comes
        # back as a datetime
        day_hits = s.search(qp.parse("date:20100523"))
        assert len(day_hits) == 1
        top = day_hits[0]
        assert top["id"] == "5-23"
        assert top["date"].__class__ is datetime
        assert top["date"].month == 5
        assert top["date"].day == 23

        # A year plus month matches every indexed day of that month
        month_hits = s.search(qp.parse("date:'2010 02'"))
        assert len(month_hits) == 27

        # A month-to-month range runs from the first instant of the start
        # month to the last microsecond of the end month
        range_q = qp.parse(u("date:[2010-05 to 2010-08]"))
        first_instant = datetime(2010, 5, 1, 0, 0, 0, 0)
        last_instant = datetime(2010, 8, 31, 23, 59, 59, 999999)
        assert range_q.__class__ is query.NumericRange
        assert range_q.start == times.datetime_to_long(first_instant)
        assert range_q.end == times.datetime_to_long(last_instant)
402
403
def test_boolean():
    """Query a BOOLEAN field with the true/yes and false/no spellings."""
    schema = fields.Schema(id=fields.ID(stored=True),
                           done=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    for doc_id, flag in (("a", True), ("b", False), ("c", True),
                         ("d", False), ("e", True)):
        w.add_document(id=u(doc_id), done=flag)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        # Both truthy spellings find the documents indexed with True
        for truthy in ("done:true", "done:yes"):
            r = s.search(qp.parse(truthy))
            assert sorted([d["id"] for d in r]) == ["a", "c", "e"]
            assert all(d["done"] for d in r)

        # "false" parses to a Term whose text is the Python value False
        false_q = qp.parse("done:false")
        assert false_q.__class__ == query.Term
        assert false_q.text is False
        assert schema["done"].to_bytes(False) == b("f")
        r = s.search(false_q)
        assert sorted([d["id"] for d in r]) == ["b", "d"]
        assert not any(d["done"] for d in r)

        # "no" behaves like "false"
        r = s.search(qp.parse("done:no"))
        assert sorted([d["id"] for d in r]) == ["b", "d"]
        assert not any(d["done"] for d in r)
439
440
def test_boolean2():
    """Search a BOOLEAN field using queries parsed without a schema."""
    schema = fields.Schema(t=fields.TEXT(stored=True),
                           b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    rows = (
        ('some kind of text', False),
        ('some other kind of text', False),
        ('some more text', False),
        ('some again', True),
    )
    writer = ix.writer()
    for text, flag in rows:
        writer.add_document(t=u(text), b=flag)
    writer.commit()

    with ix.searcher() as s:
        # The parsers get None as their schema
        false_q = qparser.QueryParser('b', None).parse(u('f'))
        true_q = qparser.QueryParser('b', None).parse(u('t'))

        false_hits = s.search(false_q)
        assert len(false_hits) == 3

        assert [d["b"] for d in s.search(true_q)] == [True]
        assert [d["b"] for d in s.search(false_q)] == [False] * 3
460
461
def test_boolean3():
    """A Term built from ``to_bytes(True)`` finds the document with b=True."""
    schema = fields.Schema(t=fields.TEXT(stored=True, field_boost=5),
                           b=fields.BOOLEAN(stored=True),
                           c=fields.TEXT)
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(t=u("with hardcopy"), b=True, c=u("alfa"))
        w.add_document(t=u("no hardcopy"), b=False, c=u("bravo"))

    with ix.searcher() as s:
        true_bytes = schema["b"].to_bytes(True)
        hits = s.search(query.Term("b", true_bytes))
        titles = [hit["t"] for hit in hits]
        assert titles == ["with hardcopy"]
476
477
def test_boolean_strings():
    """BOOLEAN fields accept "true"/"false" strings in either letter case."""
    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    spellings = ("true", "True", "false", "False")
    with ix.writer() as w:
        # Documents 0-3 get the native string literals
        for num, text in enumerate(spellings):
            w.add_document(i=num, b=text)
        # Documents 4-7 get the same spellings passed through u()
        for num, text in enumerate(spellings, 4):
            w.add_document(i=num, b=u(text))

    trues = [0, 1, 4, 5]
    falses = [2, 3, 6, 7]
    expectations = (
        ("true", trues),
        ("True", trues),
        ("false", falses),
        ("False", falses),
        ("t", trues),
        ("f", falses),
    )

    with ix.searcher() as s:
        qp = qparser.QueryParser("b", ix.schema)

        for qs, nums in expectations:
            hits = s.search(qp.parse(qs), limit=None)
            assert [hit["i"] for hit in hits] == nums
507
508
def test_boolean_find_deleted():
    """Deleted BOOLEAN documents must not appear in any kind of search.

    Documents are written across five unmerged segments, every document
    with b=True is deleted by term, and then term, "every" and NOT queries
    are checked to return only live documents with b=False.
    """
    # "Random" string of ones and zeros representing deleted and undeleted
    domain = "1110001010001110010101000101001011101010001011111101000101010101"

    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)
    count = 0
    # Create multiple segments just in case
    for _ in xrange(5):
        w = ix.writer()
        for c in domain:
            w.add_document(i=count, b=(c == "1"))
            # Advance the counter so each document gets its own stored
            # number; it was never incremented before, so every "i" was 0
            count += 1
        w.commit(merge=False)

    # Delete documents where "b" is True
    with ix.writer() as w:
        w.delete_by_term("b", "t")

    with ix.searcher() as s:
        reader = s.reader()

        def assert_only_live_false(results):
            # No hit may have b=True, and no hit may be a deleted document
            assert not any(hit["b"] for hit in results)
            assert not any(reader.is_deleted(hit.docnum) for hit in results)

        # Double check that documents with b=True are all deleted.
        # The local is named "flag" so it does not shadow the imported b()
        for docnum in xrange(s.doc_count_all()):
            flag = s.stored_fields(docnum)["b"]
            assert flag == reader.is_deleted(docnum)

        # Try doing a search for documents where b=True
        qp = qparser.QueryParser("b", ix.schema)
        q = qp.parse("b:t")
        r = s.search(q, limit=None)
        assert len(r) == 0

        # Make sure Every query doesn't match deleted docs
        assert_only_live_false(s.search(qp.parse("*"), limit=None))
        assert_only_live_false(s.search(qp.parse("*:*"), limit=None))

        # Make sure Not query doesn't match deleted docs, with and without
        # a result limit
        q = qp.parse("NOT b:t")
        assert_only_live_false(s.search(q, limit=None))
        assert_only_live_false(s.search(q, limit=5))
558
559
def test_boolean_multifield():
    """Parse one word across a TEXT field and a BOOLEAN field.

    The word "boop" matches no name; the expected hits are the two
    documents indexed with bit=True.
    """
    schema = fields.Schema(name=fields.TEXT(stored=True),
                           bit=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    cars = (
        ('audi', True),
        ('vw', False),
        ('porsche', False),
        ('ferrari', True),
        ('citroen', False),
    )
    with ix.writer() as w:
        for car_name, flag in cars:
            w.add_document(name=u(car_name), bit=flag)

    with ix.searcher() as s:
        parser = qparser.MultifieldParser(["name", "bit"], schema)
        hits = s.search(parser.parse(u("boop")))

        assert sorted(hit["name"] for hit in hits) == ["audi", "ferrari"]
        assert len(hits) == 2
578
579
def test_idlist():
    """An IDLIST field matches a single id inside a space-separated value."""
    schema = fields.Schema(paths=fields.IDLIST(stored=True))
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        for stored_paths in ('here there everywhere', 'here', 'there'):
            w.add_document(paths=u(stored_paths))

    with ix.searcher() as s:
        parser = qparser.QueryParser('paths', schema)
        hits = s.search(parser.parse(u('here')))

        # Both documents that contain "here" are found; the "there"-only
        # document is not
        matched = sorted(hit['paths'] for hit in hits)
        assert matched == ['here', 'here there everywhere']
595
596
def test_missing_field():
    """Looking up a field that is not in the schema raises KeyError."""
    # The schema is empty, so "id" is not a known field
    ix = RamStorage().create_index(fields.Schema())

    with ix.searcher() as searcher:
        with pytest.raises(KeyError):
            searcher.document_numbers(id=u("test"))
604
605
def test_token_boost():
    """Check the exact postings from a double-metaphone TEXT analyzer.

    Indexing "spruce view" must give three terms, with a 0.5 weight on the
    'FF' term and 1.0 on the other two.
    """
    from whoosh.analysis import DoubleMetaphoneFilter, RegexTokenizer

    # Every expected posting carries the same value bytes
    value_bytes = b('\x00\x00\x00\x01')
    expected = [
        (b('F'), 1, 1.0, value_bytes),
        (b('FF'), 1, 0.5, value_bytes),
        (b('SPRS'), 1, 1.0, value_bytes),
    ]

    analyzer = RegexTokenizer() | DoubleMetaphoneFilter()
    text_field = fields.TEXT(analyzer=analyzer, phrase=False)

    assert sorted(text_field.index(u("spruce view"))) == expected
615
616
def test_pickle_idlist():
    """An index can be created from a schema that includes an IDLIST field."""
    schema = fields.Schema(pk=fields.ID(stored=True, unique=True),
                           text=fields.TEXT(),
                           tags=fields.IDLIST(stored=True))

    # Creating the temporary index is the test; the result must be truthy
    with TempIndex(schema) as ix:
        assert ix
625
626
def test_pickle_schema():
    """A schema with composed analyzers can be dumped with ``dumps``.

    Both the schema as built and the schema read back from the index reader
    must produce a non-empty protocol-2 dump.
    """
    from whoosh import analysis
    from whoosh.support.charset import accent_map
    from whoosh.compat import dumps

    freetext_analyzer = (
        analysis.StemmingAnalyzer() |
        analysis.CharsetFilter(accent_map)
    )

    schema = fields.Schema(
        path=fields.ID(stored=True, unique=True),
        file_mtime=fields.DATETIME(stored=True),
        name=fields.TEXT(stored=False, field_boost=2.0),
        description=fields.TEXT(stored=False, field_boost=1.5,
                                analyzer=freetext_analyzer),
        content=fields.TEXT(analyzer=freetext_analyzer)
    )

    # Try to make some sentences that will require stemming.
    # The u() shim is used instead of u"..." literals to match the rest of
    # this module; the literals are a syntax error on Python 3.0-3.2.
    docs = [
        u("The rain in spain falls mainly in the plain"),
        u("Plainly sitting on the plain"),
        u("Imagine a greatly improved sentence here")
    ]

    with TempIndex(schema) as ix:
        with ix.writer() as w:
            for doc in docs:
                w.add_document(description=doc, content=doc)

        assert dumps(schema, 2)

        with ix.reader() as r:
            assert dumps(r.schema, 2)
662
663
664