from __future__ import with_statement
from datetime import datetime, timedelta

import pytest

from whoosh import fields, qparser, query
from whoosh.compat import long_type, u, b, xrange
from whoosh.filedb.filestore import RamStorage
from whoosh.util import times
from whoosh.util.testing import TempIndex


def test_schema_eq():
    a = fields.Schema()
    b = fields.Schema()
    assert a == b

    a = fields.Schema(id=fields.ID)
    b = a.copy()
    assert a["id"] == b["id"]
    assert a == b

    c = fields.Schema(id=fields.TEXT)
    assert a != c


def test_creation1():
    s = fields.Schema()
    s.add("content", fields.TEXT(phrase=True))
    s.add("title", fields.TEXT(stored=True))
    s.add("path", fields.ID(stored=True))
    s.add("tags", fields.KEYWORD(stored=True))
    s.add("quick", fields.NGRAM)
    s.add("note", fields.STORED)

    assert s.names() == ["content", "note", "path", "quick", "tags", "title"]
    assert "content" in s
    assert "buzz" not in s
    assert isinstance(s["tags"], fields.KEYWORD)


def test_creation2():
    s = fields.Schema(a=fields.ID(stored=True),
                      b=fields.ID,
                      c=fields.KEYWORD(scorable=True))

    assert s.names() == ["a", "b", "c"]
    assert "a" in s
    assert "b" in s
    assert "c" in s


def test_declarative():
    class MySchema(fields.SchemaClass):
        content = fields.TEXT
        title = fields.TEXT
        path = fields.ID
        date = fields.DATETIME

    ix = RamStorage().create_index(MySchema)
    assert ix.schema.names() == ["content", "date", "path", "title"]

    ix = RamStorage().create_index(MySchema())
    assert ix.schema.names() == ["content", "date", "path", "title"]

    with pytest.raises(fields.FieldConfigurationError):
        RamStorage().create_index(object())


def test_declarative_inherit():
    class Parent(fields.SchemaClass):
        path = fields.ID
        date = fields.DATETIME

    class Child(Parent):
        content = fields.TEXT

    class Grandchild(Child):
        title = fields.TEXT

    s = Grandchild()
    assert s.names() == ["content", "date", "path", "title"]


def test_badnames():
    s = fields.Schema()
    with pytest.raises(fields.FieldConfigurationError):
        s.add("_test", fields.ID)
    with pytest.raises(fields.FieldConfigurationError):
        s.add("a f", fields.ID)


#def test_numeric_support():
#    intf = fields.NUMERIC(int, shift_step=0)
#    longf = fields.NUMERIC(int, bits=64, shift_step=0)
#    floatf = fields.NUMERIC(float, shift_step=0)
#
#    def roundtrip(obj, num):
#        assert obj.from_bytes(obj.to_bytes(num)) == num
#
#    roundtrip(intf, 0)
#    roundtrip(intf, 12345)
#    roundtrip(intf, -12345)
#    roundtrip(longf, 0)
#    roundtrip(longf, 85020450482)
#    roundtrip(longf, -85020450482)
#    roundtrip(floatf, 0)
#    roundtrip(floatf, 582.592)
#    roundtrip(floatf, -582.592)
#    roundtrip(floatf, -99.42)
#
#    from random import shuffle
#
#    def roundtrip_sort(obj, start, end, step):
#        count = start
#        rng = []
#        while count < end:
#            rng.append(count)
#            count += step
#
#        scrambled = list(rng)
#        shuffle(scrambled)
#        round = [obj.from_text(t) for t
#                 in sorted([obj.to_text(n) for n in scrambled])]
#        assert round == rng
#
#    roundtrip_sort(intf, -100, 100, 1)
#    roundtrip_sort(longf, -58902, 58249, 43)
#    roundtrip_sort(floatf, -99.42, 99.83, 2.38)


def test_index_numeric():
    schema = fields.Schema(a=fields.NUMERIC(int, 32, signed=False),
                           b=fields.NUMERIC(int, 32, signed=True))
    ix = RamStorage().create_index(schema)
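    # The assertions below inspect the raw terms NUMERIC generates: each value
    # is indexed once per precision shift (the first byte of each term is the
    # shift amount: 0x00, 0x04, 0x08, ...), and the signed field biases the
    # value (note the 0x80 bytes) so that negative numbers sort correctly.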
    with ix.writer() as w:
        w.add_document(a=1, b=1)
    with ix.searcher() as s:
        assert list(s.lexicon("a")) == \
            [b('\x00\x00\x00\x00\x01'), b('\x04\x00\x00\x00\x00'),
             b('\x08\x00\x00\x00\x00'), b('\x0c\x00\x00\x00\x00'),
             b('\x10\x00\x00\x00\x00'), b('\x14\x00\x00\x00\x00'),
             b('\x18\x00\x00\x00\x00'), b('\x1c\x00\x00\x00\x00')]
        assert list(s.lexicon("b")) == \
            [b('\x00\x80\x00\x00\x01'), b('\x04\x08\x00\x00\x00'),
             b('\x08\x00\x80\x00\x00'), b('\x0c\x00\x08\x00\x00'),
             b('\x10\x00\x00\x80\x00'), b('\x14\x00\x00\x08\x00'),
             b('\x18\x00\x00\x00\x80'), b('\x1c\x00\x00\x00\x08')]


def test_numeric():
    schema = fields.Schema(id=fields.ID(stored=True),
                           integer=fields.NUMERIC(int),
                           floating=fields.NUMERIC(float))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("a"), integer=5820, floating=1.2)
    w.add_document(id=u("b"), integer=22, floating=2.3)
    w.add_document(id=u("c"), integer=78, floating=3.4)
    w.add_document(id=u("d"), integer=13, floating=4.5)
    w.add_document(id=u("e"), integer=9, floating=5.6)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("integer", schema)

        q = qp.parse(u("5820"))
        r = s.search(q)
        assert len(r) == 1
        assert r[0]["id"] == "a"

    with ix.searcher() as s:
        r = s.search(qp.parse("floating:4.5"))
        assert len(r) == 1
        assert r[0]["id"] == "d"

    q = qp.parse("integer:*")
    assert q.__class__ == query.Every
    assert q.field() == "integer"

    q = qp.parse("integer:5?6")
    assert q == query.NullQuery


def test_decimal_numeric():
    from decimal import Decimal

    f = fields.NUMERIC(int, decimal_places=4)
    schema = fields.Schema(id=fields.ID(stored=True), deci=f)
    ix = RamStorage().create_index(schema)

    # assert f.from_text(f.to_text(Decimal("123.56"))) == Decimal("123.56")

    w = ix.writer()
    w.add_document(id=u("a"), deci=Decimal("123.56"))
    w.add_document(id=u("b"), deci=Decimal("0.536255"))
    w.add_document(id=u("c"), deci=Decimal("2.5255"))
    w.add_document(id=u("d"), deci=Decimal("58"))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("deci", schema)
        q = qp.parse(u("123.56"))
        r = s.search(q)
        assert len(r) == 1
        assert r[0]["id"] == "a"

        r = s.search(qp.parse(u("0.536255")))
        assert len(r) == 1
        assert r[0]["id"] == "b"


def test_numeric_parsing():
    schema = fields.Schema(id=fields.ID(stored=True), number=fields.NUMERIC)

    qp = qparser.QueryParser("number", schema)
    q = qp.parse(u("[10 to *]"))
    assert q == query.NullQuery

    q = qp.parse(u("[to 400]"))
    assert q.__class__ is query.NumericRange
    assert q.start is None
    assert q.end == 400

    q = qp.parse(u("[10 to]"))
    assert q.__class__ is query.NumericRange
    assert q.start == 10
    assert q.end is None

    q = qp.parse(u("[10 to 400]"))
    assert q.__class__ is query.NumericRange
    assert q.start == 10
    assert q.end == 400


def test_numeric_ranges():
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC)
    ix = RamStorage().create_index(schema)
    w = ix.writer()

    for i in xrange(400):
        w.add_document(id=i, num=i)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("num", schema)

        def check(qs, target):
            q = qp.parse(qs)
            result = [s.stored_fields(d)["id"] for d in q.docs(s)]
            assert result == target
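
        # In the range syntax, a square bracket includes its endpoint and a
        # curly brace excludes it, as the expected lists below show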
        # Note that range() is always inclusive-exclusive
        check("[10 to 390]", list(range(10, 390 + 1)))
        check("[100 to]", list(range(100, 400)))
        check("[to 350]", list(range(0, 350 + 1)))
        check("[16 to 255]", list(range(16, 255 + 1)))
        check("{10 to 390]", list(range(11, 390 + 1)))
        check("[10 to 390}", list(range(10, 390)))
        check("{10 to 390}", list(range(11, 390)))
        check("{16 to 255}", list(range(17, 255)))


def test_numeric_ranges_unsigned():
    values = [1, 10, 100, 1000, 2, 20, 200, 2000, 9, 90, 900, 9000]
    schema = fields.Schema(num2=fields.NUMERIC(stored=True, signed=False))

    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for v in values:
            w.add_document(num2=v)

    with ix.searcher() as s:
        q = query.NumericRange("num2", 55, None, True, False)
        r = s.search(q, limit=None)
        for hit in r:
            assert int(hit["num2"]) >= 55


def test_decimal_ranges():
    from decimal import Decimal

    schema = fields.Schema(id=fields.STORED,
                           num=fields.NUMERIC(int, decimal_places=2))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    count = Decimal("0.0")
    inc = Decimal("0.2")
    for _ in xrange(500):
        w.add_document(id=str(count), num=count)
        count += inc
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("num", schema)

        def check(qs, start, end):
            q = qp.parse(qs)
            result = [s.stored_fields(d)["id"] for d in q.docs(s)]

            target = []
            count = Decimal(start)
            limit = Decimal(end)
            while count <= limit:
                target.append(str(count))
                count += inc

            assert result == target

        check("[10.2 to 80.8]", "10.2", "80.8")
        check("{10.2 to 80.8]", "10.4", "80.8")
        check("[10.2 to 80.8}", "10.2", "80.6")
        check("{10.2 to 80.8}", "10.4", "80.6")


def test_numeric_errors():
    f = fields.NUMERIC(int, bits=16, signed=True)
    schema = fields.Schema(f=f)

    with pytest.raises(ValueError):
        list(f.index(-32769))
    with pytest.raises(ValueError):
        list(f.index(32768))


def test_nontext_document():
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC,
                           date=fields.DATETIME, even=fields.BOOLEAN)
    ix = RamStorage().create_index(schema)

    dt = datetime.now()
    w = ix.writer()
    for i in xrange(50):
        w.add_document(id=i, num=i, date=dt + timedelta(days=i),
                       even=not(i % 2))
    w.commit()

    with ix.searcher() as s:
        def check(kwargs, target):
            result = [d['id'] for d in s.documents(**kwargs)]
            assert result == target

        check({"num": 49}, [49])
        check({"date": dt + timedelta(days=30)}, [30])
        check({"even": True}, list(range(0, 50, 2)))


def test_nontext_update():
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC(unique=True),
                           date=fields.DATETIME(unique=True))
    ix = RamStorage().create_index(schema)

    dt = datetime.now()
    w = ix.writer()
    for i in xrange(10):
        w.add_document(id=i, num=i, date=dt + timedelta(days=i))
    w.commit()

    w = ix.writer()
    w.update_document(num=8, id="a")
    w.update_document(num=2, id="b")
    w.update_document(num=4, id="c")
    w.update_document(date=dt + timedelta(days=5), id="d")
    w.update_document(date=dt + timedelta(days=1), id="e")
    w.update_document(date=dt + timedelta(days=7), id="f")
    w.commit()


def test_datetime():
    dtf = fields.DATETIME(stored=True)
    schema = fields.Schema(id=fields.ID(stored=True),
                           date=dtf)
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    for month in xrange(1, 12):
        for day in xrange(1, 28):
            w.add_document(id=u("%s-%s") % (month, day),
                           date=datetime(2010, month, day, 14, 0, 0))
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        r = s.search(qp.parse("date:20100523"))
        assert len(r) == 1
        assert r[0]["id"] == "5-23"
        assert r[0]["date"].__class__ is datetime
        assert r[0]["date"].month == 5
        assert r[0]["date"].day == 23

        r = s.search(qp.parse("date:'2010 02'"))
        assert len(r) == 27

        q = qp.parse(u("date:[2010-05 to 2010-08]"))
        startdt = datetime(2010, 5, 1, 0, 0, 0, 0)
        enddt = datetime(2010, 8, 31, 23, 59, 59, 999999)
        assert q.__class__ is query.NumericRange
        assert q.start == times.datetime_to_long(startdt)
        assert q.end == times.datetime_to_long(enddt)


def test_boolean():
    schema = fields.Schema(id=fields.ID(stored=True),
                           done=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("a"), done=True)
    w.add_document(id=u("b"), done=False)
    w.add_document(id=u("c"), done=True)
    w.add_document(id=u("d"), done=False)
    w.add_document(id=u("e"), done=True)
    w.commit()

    with ix.searcher() as s:
        qp = qparser.QueryParser("id", schema)

        r = s.search(qp.parse("done:true"))
        assert sorted([d["id"] for d in r]) == ["a", "c", "e"]
        assert all(d["done"] for d in r)

        r = s.search(qp.parse("done:yes"))
        assert sorted([d["id"] for d in r]) == ["a", "c", "e"]
        assert all(d["done"] for d in r)

        q = qp.parse("done:false")
        assert q.__class__ == query.Term
        assert q.text is False
        assert schema["done"].to_bytes(False) == b("f")
        r = s.search(q)
        assert sorted([d["id"] for d in r]) == ["b", "d"]
        assert not any(d["done"] for d in r)

        r = s.search(qp.parse("done:no"))
        assert sorted([d["id"] for d in r]) == ["b", "d"]
        assert not any(d["done"] for d in r)


def test_boolean2():
    schema = fields.Schema(t=fields.TEXT(stored=True),
                           b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)
    writer = ix.writer()
    writer.add_document(t=u('some kind of text'), b=False)
    writer.add_document(t=u('some other kind of text'), b=False)
    writer.add_document(t=u('some more text'), b=False)
    writer.add_document(t=u('some again'), b=True)
    writer.commit()

    with ix.searcher() as s:
        qf = qparser.QueryParser('b', None).parse(u('f'))
        qt = qparser.QueryParser('b', None).parse(u('t'))
        r = s.search(qf)
        assert len(r) == 3

        assert [d["b"] for d in s.search(qt)] == [True]
        assert [d["b"] for d in s.search(qf)] == [False] * 3


def test_boolean3():
    schema = fields.Schema(t=fields.TEXT(stored=True, field_boost=5),
                           b=fields.BOOLEAN(stored=True),
                           c=fields.TEXT)
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(t=u("with hardcopy"), b=True, c=u("alfa"))
        w.add_document(t=u("no hardcopy"), b=False, c=u("bravo"))

    with ix.searcher() as s:
        q = query.Term("b", schema["b"].to_bytes(True))
        ts = [hit["t"] for hit in s.search(q)]
        assert ts == ["with hardcopy"]


def test_boolean_strings():
    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
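        # BOOLEAN accepts native and unicode strings in either capitalization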
        w.add_document(i=0, b="true")
        w.add_document(i=1, b="True")
        w.add_document(i=2, b="false")
        w.add_document(i=3, b="False")
        w.add_document(i=4, b=u("true"))
        w.add_document(i=5, b=u("True"))
        w.add_document(i=6, b=u("false"))
        w.add_document(i=7, b=u("False"))

    with ix.searcher() as s:
        qp = qparser.QueryParser("b", ix.schema)

        def check(qs, nums):
            q = qp.parse(qs)
            r = s.search(q, limit=None)
            assert [hit["i"] for hit in r] == nums

        trues = [0, 1, 4, 5]
        falses = [2, 3, 6, 7]
        check("true", trues)
        check("True", trues)
        check("false", falses)
        check("False", falses)
        check("t", trues)
        check("f", falses)


def test_boolean_find_deleted():
    # "Random" string of ones and zeros representing deleted and undeleted
    domain = "1110001010001110010101000101001011101010001011111101000101010101"

    schema = fields.Schema(i=fields.STORED, b=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)
    count = 0
    # Create multiple segments just in case
    for _ in xrange(5):
        w = ix.writer()
        for c in domain:
            w.add_document(i=count, b=(c == "1"))
            count += 1
        w.commit(merge=False)

    # Delete documents where "b" is True
    with ix.writer() as w:
        w.delete_by_term("b", "t")

    with ix.searcher() as s:
        # Double check that documents with b=True are all deleted
        reader = s.reader()
        for docnum in xrange(s.doc_count_all()):
            b = s.stored_fields(docnum)["b"]
            assert b == reader.is_deleted(docnum)

        # Try doing a search for documents where b=True
        qp = qparser.QueryParser("b", ix.schema)
        q = qp.parse("b:t")
        r = s.search(q, limit=None)
        assert len(r) == 0

        # Make sure Every query doesn't match deleted docs
        r = s.search(qp.parse("*"), limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        r = s.search(qp.parse("*:*"), limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        # Make sure Not query doesn't match deleted docs
        q = qp.parse("NOT b:t")
        r = s.search(q, limit=None)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)

        r = s.search(q, limit=5)
        assert not any(hit["b"] for hit in r)
        assert not any(reader.is_deleted(hit.docnum) for hit in r)


def test_boolean_multifield():
    schema = fields.Schema(name=fields.TEXT(stored=True),
                           bit=fields.BOOLEAN(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(name=u('audi'), bit=True)
        w.add_document(name=u('vw'), bit=False)
        w.add_document(name=u('porsche'), bit=False)
        w.add_document(name=u('ferrari'), bit=True)
        w.add_document(name=u('citroen'), bit=False)

    with ix.searcher() as s:
        qp = qparser.MultifieldParser(["name", "bit"], schema)
        q = qp.parse(u("boop"))

        r = s.search(q)
        assert sorted(hit["name"] for hit in r) == ["audi", "ferrari"]
        assert len(r) == 2


def test_idlist():
    schema = fields.Schema(paths=fields.IDLIST(stored=True))
    ix = RamStorage().create_index(schema)

    with ix.writer() as w:
        w.add_document(paths=u('here there everywhere'))
        w.add_document(paths=u('here'))
        w.add_document(paths=u('there'))

    with ix.searcher() as s:
        qp = qparser.QueryParser('paths', schema)
        q = qp.parse(u('here'))

        r = s.search(q)
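        # Each ID in the IDLIST was indexed as its own term, so 'here'
        # matches both documents; the stored value is the original string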
        assert sorted(hit['paths'] for hit in r) == \
            ['here', 'here there everywhere']


def test_missing_field():
    schema = fields.Schema()
    ix = RamStorage().create_index(schema)

    with ix.searcher() as s:
        with pytest.raises(KeyError):
            s.document_numbers(id=u("test"))


def test_token_boost():
    from whoosh.analysis import RegexTokenizer, DoubleMetaphoneFilter
    ana = RegexTokenizer() | DoubleMetaphoneFilter()
    field = fields.TEXT(analyzer=ana, phrase=False)
    results = sorted(field.index(u("spruce view")))
    assert results == [(b('F'), 1, 1.0, b('\x00\x00\x00\x01')),
                       (b('FF'), 1, 0.5, b('\x00\x00\x00\x01')),
                       (b('SPRS'), 1, 1.0, b('\x00\x00\x00\x01')),
                       ]


def test_pickle_idlist():
    schema = fields.Schema(
        pk=fields.ID(stored=True, unique=True),
        text=fields.TEXT(),
        tags=fields.IDLIST(stored=True),
    )
    with TempIndex(schema) as ix:
        assert ix


def test_pickle_schema():
    from whoosh import analysis
    from whoosh.support.charset import accent_map
    from whoosh.compat import dumps

    freetext_analyzer = (
        analysis.StemmingAnalyzer() |
        analysis.CharsetFilter(accent_map)
    )

    schema = fields.Schema(
        path=fields.ID(stored=True, unique=True),
        file_mtime=fields.DATETIME(stored=True),
        name=fields.TEXT(stored=False, field_boost=2.0),
        description=fields.TEXT(stored=False, field_boost=1.5,
                                analyzer=freetext_analyzer),
        content=fields.TEXT(analyzer=freetext_analyzer)
    )

    # Try to make some sentences that will require stemming
    docs = [
        u"The rain in spain falls mainly in the plain",
        u"Plainly sitting on the plain",
        u"Imagine a greatly improved sentence here"
    ]

    with TempIndex(schema) as ix:
        with ix.writer() as w:
            for doc in docs:
                w.add_document(description=doc, content=doc)

        assert dumps(schema, 2)

        with ix.reader() as r:
            assert dumps(r.schema, 2)
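

# Illustrative sketch, not part of the original suite: a minimal example of
# how the bracket/brace range syntax exercised in test_numeric_ranges maps
# onto query.NumericRange's startexcl/endexcl flags. The helper name
# _numeric_range_excl_sketch is made up here, and the leading underscore
# keeps pytest from collecting it as a test.
def _numeric_range_excl_sketch():
    schema = fields.Schema(id=fields.STORED, num=fields.NUMERIC)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        for i in xrange(5):
            w.add_document(id=i, num=i)

    with ix.searcher() as s:
        # "{1 to 3}" excludes both endpoints, so only num=2 should match
        q = query.NumericRange("num", 1, 3, startexcl=True, endexcl=True)
        assert [s.stored_fields(d)["id"] for d in q.docs(s)] == [2]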