#
# $Id: sphinxapi.py 3436 2012-10-08 09:17:18Z kevg $
#
# Python version of Sphinx searchd client (Python API)
#
# Copyright (c) 2006, Mike Osadnik
# Copyright (c) 2006-2012, Andrew Aksyonoff
# Copyright (c) 2008-2012, Sphinx Technologies Inc
# All rights reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License. You should have
# received a copy of the GPL license along with this program; if you
# did not, you can find it at http://www.gnu.org/
#

import sys
import select
import socket
import re
# explicit import instead of `from struct import *`: pack/unpack are the
# only struct names this module uses, and a wildcard import hides that
from struct import pack, unpack


# known searchd commands
SEARCHD_COMMAND_SEARCH = 0
SEARCHD_COMMAND_EXCERPT = 1
SEARCHD_COMMAND_UPDATE = 2
SEARCHD_COMMAND_KEYWORDS = 3
SEARCHD_COMMAND_PERSIST = 4
SEARCHD_COMMAND_STATUS = 5
SEARCHD_COMMAND_FLUSHATTRS = 7

# current client-side command implementation versions
VER_COMMAND_SEARCH = 0x119
VER_COMMAND_EXCERPT = 0x104
VER_COMMAND_UPDATE = 0x102
VER_COMMAND_KEYWORDS = 0x100
VER_COMMAND_STATUS = 0x100
VER_COMMAND_FLUSHATTRS = 0x100

# known searchd status codes
SEARCHD_OK = 0
SEARCHD_ERROR = 1
SEARCHD_RETRY = 2
SEARCHD_WARNING = 3

# known match modes
SPH_MATCH_ALL = 0
SPH_MATCH_ANY = 1
SPH_MATCH_PHRASE = 2
SPH_MATCH_BOOLEAN = 3
SPH_MATCH_EXTENDED = 4
SPH_MATCH_FULLSCAN = 5
SPH_MATCH_EXTENDED2 = 6

# known ranking modes (extended2 mode only)
SPH_RANK_PROXIMITY_BM25 = 0 # default mode, phrase proximity major factor and BM25 minor one
SPH_RANK_BM25 = 1 # statistical mode, BM25 ranking only (faster but worse quality)
SPH_RANK_NONE = 2 # no ranking, all matches get a weight of 1
SPH_RANK_WORDCOUNT = 3 # simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
SPH_RANK_PROXIMITY = 4
SPH_RANK_MATCHANY = 5
SPH_RANK_FIELDMASK = 6
SPH_RANK_SPH04 = 7
SPH_RANK_EXPR = 8
SPH_RANK_TOTAL = 9

# known sort modes
SPH_SORT_RELEVANCE = 0
SPH_SORT_ATTR_DESC = 1
SPH_SORT_ATTR_ASC = 2
SPH_SORT_TIME_SEGMENTS = 3
SPH_SORT_EXTENDED = 4
SPH_SORT_EXPR = 5

# known filter types
SPH_FILTER_VALUES = 0
SPH_FILTER_RANGE = 1
SPH_FILTER_FLOATRANGE = 2

# known attribute types
SPH_ATTR_NONE = 0
SPH_ATTR_INTEGER = 1
SPH_ATTR_TIMESTAMP = 2
SPH_ATTR_ORDINAL = 3
SPH_ATTR_BOOL = 4
SPH_ATTR_FLOAT = 5
SPH_ATTR_BIGINT = 6
SPH_ATTR_STRING = 7
SPH_ATTR_MULTI = 0X40000001L
SPH_ATTR_MULTI64 = 0X40000002L

SPH_ATTR_TYPES = (SPH_ATTR_NONE,
	SPH_ATTR_INTEGER,
	SPH_ATTR_TIMESTAMP,
	SPH_ATTR_ORDINAL,
	SPH_ATTR_BOOL,
	SPH_ATTR_FLOAT,
	SPH_ATTR_BIGINT,
	SPH_ATTR_STRING,
	SPH_ATTR_MULTI,
	SPH_ATTR_MULTI64)

# known grouping functions
SPH_GROUPBY_DAY = 0
SPH_GROUPBY_WEEK = 1
SPH_GROUPBY_MONTH = 2
SPH_GROUPBY_YEAR = 3
SPH_GROUPBY_ATTR = 4
SPH_GROUPBY_ATTRPAIR = 5


class SphinxClient:
	"""
	Python 2.x client for the Sphinx searchd daemon.

	Speaks the searchd binary network protocol over TCP or a unix-domain
	socket; all wire values are packed big-endian via struct pack/unpack.
	Query options are accumulated on the instance via Set*() calls, then
	sent with Query() / RunQueries().
	"""

	def __init__ (self):
		"""
		Create a new client object, and fill defaults.
		"""
		self._host = 'localhost'					# searchd host (default is "localhost")
		self._port = 9312							# searchd port (default is 9312)
		self._path = None							# searchd unix-domain socket path
		self._socket = None							# persistent connection socket (set by Open())
		self._offset = 0							# how much records to seek from result-set start (default is 0)
		self._limit = 20							# how much records to return from result-set starting at offset (default is 20)
		self._mode = SPH_MATCH_ALL					# query matching mode (default is SPH_MATCH_ALL)
		self._weights = []							# per-field weights (default is 1 for all fields)
		self._sort = SPH_SORT_RELEVANCE				# match sorting mode (default is SPH_SORT_RELEVANCE)
		self._sortby = ''							# attribute to sort by (default is "")
		self._min_id = 0							# min ID to match (default is 0)
		self._max_id = 0							# max ID to match (default is UINT_MAX)
		self._filters = []							# search filters
		self._groupby = ''							# group-by attribute name
		self._groupfunc = SPH_GROUPBY_DAY			# group-by function (to pre-process group-by attribute value with)
		self._groupsort = '@group desc'				# group-by sorting clause (to sort groups in result set with)
		self._groupdistinct = ''					# group-by count-distinct attribute
		self._maxmatches = 1000						# max matches to retrieve
		self._cutoff = 0							# cutoff to stop searching at
		self._retrycount = 0						# distributed retry count
		self._retrydelay = 0						# distributed retry delay
		self._anchor = {}							# geographical anchor point
		self._indexweights = {}						# per-index weights
		self._ranker = SPH_RANK_PROXIMITY_BM25		# ranking mode
		self._rankexpr = ''							# ranking expression for SPH_RANK_EXPR
		self._maxquerytime = 0						# max query time, milliseconds (default is 0, do not limit)
		self._timeout = 1.0							# connection timeout
		self._fieldweights = {}						# per-field-name weights
		self._overrides = {}						# per-query attribute values overrides
		self._select = '*'							# select-list (attributes or expressions, with optional aliases)

		self._error = ''							# last error message
		self._warning = ''							# last warning message
		self._reqs = []								# requests array for multi-query

	def __del__ (self):
		# best-effort close of a persistent connection on garbage collection
		if self._socket:
			self._socket.close()


	def GetLastError (self):
		"""
		Get last error message (string).
		"""
		return self._error


	def GetLastWarning (self):
		"""
		Get last warning message (string).
		"""
		return self._warning


	def SetServer (self, host, port = None):
		"""
		Set searchd server host and port.

		A host starting with '/' or 'unix://' selects a unix-domain socket
		path instead of TCP; in that case 'port' is ignored.
		"""
		assert(isinstance(host, str))
		if host.startswith('/'):
			self._path = host
			return
		elif host.startswith('unix://'):
			self._path = host[7:]
			return
		self._host = host
		if isinstance(port, int):
			assert(port>0 and port<65536)
			self._port = port
		self._path = None

	def SetConnectTimeout ( self, timeout ):
		"""
		Set connection timeout ( float second )
		"""
		assert (isinstance(timeout, float))
		# a timeout of 0 would make the socket non-blocking, which is not what
		# we want here, so clip to a reasonable minimum
		self._timeout = max ( 0.001, timeout )

	def _Connect (self):
		"""
		INTERNAL METHOD, DO NOT CALL. Connects to searchd server.
		"""
		if self._socket:
			# we have a socket, but is it still alive?
			sr, sw, _ = select.select ( [self._socket], [self._socket], [], 0 )

			# this is how alive socket should look: not readable (no pending
			# EOF/data) but writable
			if len(sr)==0 and len(sw)==1:
				return self._socket

			# oops, looks like it was closed, lets reopen
			self._socket.close()
			self._socket = None

		try:
			if self._path:
				af = socket.AF_UNIX
				addr = self._path
				desc = self._path
			else:
				af = socket.AF_INET
				addr = ( self._host, self._port )
				desc = '%s;%s' % addr
			sock = socket.socket ( af, socket.SOCK_STREAM )
			sock.settimeout ( self._timeout )
			sock.connect ( addr )
		except socket.error, msg:
			if sock:
				sock.close()
			self._error = 'connection to %s failed (%s)' % ( desc, msg )
			return

		# server sends its protocol version first
		# NOTE(review): unpack() returns a 1-tuple here, so 'v<1' compares a
		# tuple against an int (always False in py2) — looks like it should be
		# unpack('>L', ...)[0]; confirm before relying on this check
		v = unpack('>L', sock.recv(4))
		if v<1:
			sock.close()
			self._error = 'expected searchd protocol version, got %s' % v
			return

		# all ok, send my version
		sock.send(pack('>L', 1))
		return sock


	def _GetResponse (self, sock, client_ver):
		"""
		INTERNAL METHOD, DO NOT CALL. Gets and checks response packet from searchd server.
		"""
		# 8-byte header: status (u16), version (u16), body length (u32)
		(status, ver, length) = unpack('>2HL', sock.recv(8))
		response = ''
		left = length
		while left>0:
			chunk = sock.recv(left)
			if chunk:
				response += chunk
				left -= len(chunk)
			else:
				break

		# close per-query sockets; keep a persistent one (see Open())
		if not self._socket:
			sock.close()

		# check response
		read = len(response)
		if not response or read!=length:
			if length:
				self._error = 'failed to read searchd response (status=%s, ver=%s, len=%s, read=%s)' \
					% (status, ver, length, read)
			else:
				self._error = 'received zero-sized searchd response'
			return None

		# check status
		if status==SEARCHD_WARNING:
			# warning message is length-prefixed and precedes the real body
			wend = 4 + unpack ( '>L', response[0:4] )[0]
			self._warning = response[4:wend]
			return response[wend:]

		if status==SEARCHD_ERROR:
			self._error = 'searchd error: '+response[4:]
			return None

		if status==SEARCHD_RETRY:
			self._error = 'temporary searchd error: '+response[4:]
			return None

		if status!=SEARCHD_OK:
			self._error = 'unknown status code %d' % status
			return None

		# check version
		if ver<client_ver:
			self._warning = 'searchd command v.%d.%d older than client\'s v.%d.%d, some options might not work' \
				% (ver>>8, ver&0xff, client_ver>>8, client_ver&0xff)

		return response


	def _Send ( self, sock, req ):
		"""
		INTERNAL METHOD, DO NOT CALL. send request to searchd server.

		Loops until the whole buffer is sent (send() may write partially);
		returns the number of bytes actually sent.
		"""
		total = 0
		while True:
			sent = sock.send ( req[total:] )
			if sent<=0:
				break

			total = total + sent

		return total


	def SetLimits (self, offset, limit, maxmatches=0, cutoff=0):
		"""
		Set offset and count into result set, and optionally set max-matches and cutoff limits.
		"""
		# 16777216 == 2^24, searchd's hard limit for offset/limit
		assert ( type(offset) in [int,long] and 0<=offset<16777216 )
		assert ( type(limit) in [int,long] and 0<limit<16777216 )
		assert(maxmatches>=0)
		self._offset = offset
		self._limit = limit
		if maxmatches>0:
			self._maxmatches = maxmatches
		if cutoff>=0:
			self._cutoff = cutoff


	def SetMaxQueryTime (self, maxquerytime):
		"""
		Set maximum query time, in milliseconds, per-index. 0 means 'do not limit'.
		"""
		assert(isinstance(maxquerytime,int) and maxquerytime>0)
		self._maxquerytime = maxquerytime


	def SetMatchMode (self, mode):
		"""
		Set matching mode.
		"""
		assert(mode in [SPH_MATCH_ALL, SPH_MATCH_ANY, SPH_MATCH_PHRASE, SPH_MATCH_BOOLEAN, SPH_MATCH_EXTENDED, SPH_MATCH_FULLSCAN, SPH_MATCH_EXTENDED2])
		self._mode = mode


	def SetRankingMode ( self, ranker, rankexpr='' ):
		"""
		Set ranking mode.

		'rankexpr' is only used when ranker is SPH_RANK_EXPR.
		"""
		assert(ranker>=0 and ranker<SPH_RANK_TOTAL)
		self._ranker = ranker
		self._rankexpr = rankexpr


	def SetSortMode ( self, mode, clause='' ):
		"""
		Set sorting mode.
		"""
		assert ( mode in [SPH_SORT_RELEVANCE, SPH_SORT_ATTR_DESC, SPH_SORT_ATTR_ASC, SPH_SORT_TIME_SEGMENTS, SPH_SORT_EXTENDED, SPH_SORT_EXPR] )
		assert ( isinstance ( clause, str ) )
		self._sort = mode
		self._sortby = clause


	def SetWeights (self, weights):
		"""
		Set per-field weights.
		WARNING, DEPRECATED; do not use it! use SetFieldWeights() instead
		"""
		assert(isinstance(weights, list))
		for w in weights:
			AssertUInt32 ( w )
		self._weights = weights


	def SetFieldWeights (self, weights):
		"""
		Bind per-field weights by name; expects (name,field_weight) dictionary as argument.
		"""
		assert(isinstance(weights,dict))
		for key,val in weights.items():
			assert(isinstance(key,str))
			AssertUInt32 ( val )
		self._fieldweights = weights


	def SetIndexWeights (self, weights):
		"""
		Bind per-index weights by name; expects (name,index_weight) dictionary as argument.
		"""
		assert(isinstance(weights,dict))
		for key,val in weights.items():
			assert(isinstance(key,str))
			AssertUInt32(val)
		self._indexweights = weights


	def SetIDRange (self, minid, maxid):
		"""
		Set IDs range to match.
		Only match records if document ID is beetwen $min and $max (inclusive).
		"""
		assert(isinstance(minid, (int, long)))
		assert(isinstance(maxid, (int, long)))
		assert(minid<=maxid)
		self._min_id = minid
		self._max_id = maxid


	def SetFilter ( self, attribute, values, exclude=0 ):
		"""
		Set values set filter.
		Only match records where 'attribute' value is in given 'values' set.
		"""
		assert(isinstance(attribute, str))
		assert iter(values)

		for value in values:
			AssertInt32 ( value )

		self._filters.append ( { 'type':SPH_FILTER_VALUES, 'attr':attribute, 'exclude':exclude, 'values':values } )


	def SetFilterRange (self, attribute, min_, max_, exclude=0 ):
		"""
		Set range filter.
		Only match records if 'attribute' value is beetwen 'min_' and 'max_' (inclusive).
		"""
		assert(isinstance(attribute, str))
		AssertInt32(min_)
		AssertInt32(max_)
		assert(min_<=max_)

		self._filters.append ( { 'type':SPH_FILTER_RANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_ } )


	def SetFilterFloatRange (self, attribute, min_, max_, exclude=0 ):
		"""
		Set float range filter.
		Only match records if 'attribute' value is between 'min_' and 'max_' (inclusive).
		"""
		assert(isinstance(attribute,str))
		assert(isinstance(min_,float))
		assert(isinstance(max_,float))
		assert(min_ <= max_)
		self._filters.append ( {'type':SPH_FILTER_FLOATRANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_} )


	def SetGeoAnchor (self, attrlat, attrlong, latitude, longitude):
		"""
		Set the geographical anchor point.

		'attrlat'/'attrlong' are attribute names holding per-document
		coordinates; 'latitude'/'longitude' is the anchor point itself.
		"""
		assert(isinstance(attrlat,str))
		assert(isinstance(attrlong,str))
		assert(isinstance(latitude,float))
		assert(isinstance(longitude,float))
		self._anchor['attrlat'] = attrlat
		self._anchor['attrlong'] = attrlong
		self._anchor['lat'] = latitude
		self._anchor['long'] = longitude


	def SetGroupBy ( self, attribute, func, groupsort='@group desc' ):
		"""
		Set grouping attribute and function.
		"""
		assert(isinstance(attribute, str))
		assert(func in [SPH_GROUPBY_DAY, SPH_GROUPBY_WEEK, SPH_GROUPBY_MONTH, SPH_GROUPBY_YEAR, SPH_GROUPBY_ATTR, SPH_GROUPBY_ATTRPAIR] )
		assert(isinstance(groupsort, str))

		self._groupby = attribute
		self._groupfunc = func
		self._groupsort = groupsort


	def SetGroupDistinct (self, attribute):
		"""
		Set the attribute used for count-distinct queries (group-by only).
		"""
		assert(isinstance(attribute,str))
		self._groupdistinct = attribute


	def SetRetries (self, count, delay=0):
		"""
		Set distributed retry count and delay.
		"""
		assert(isinstance(count,int) and count>=0)
		assert(isinstance(delay,int) and delay>=0)
		self._retrycount = count
		self._retrydelay = delay


	def SetOverride (self, name, type, values):
		"""
		Set a per-query, per-document attribute value override.

		'values' maps document IDs to the overriding attribute values.
		"""
		assert(isinstance(name, str))
		assert(type in SPH_ATTR_TYPES)
		assert(isinstance(values, dict))

		self._overrides[name] = {'name': name, 'type': type, 'values': values}

	def SetSelect (self, select):
		"""
		Set the select-list (attributes or expressions), SQL-like syntax.
		"""
		assert(isinstance(select, str))
		self._select = select


	def ResetOverrides (self):
		"""
		Clear all attribute value overrides (for multi-queries).
		"""
		self._overrides = {}


	def ResetFilters (self):
		"""
		Clear all filters (for multi-queries).
		"""
		self._filters = []
		self._anchor = {}


	def ResetGroupBy (self):
		"""
		Clear groupby settings (for multi-queries).
		"""
		self._groupby = ''
		self._groupfunc = SPH_GROUPBY_DAY
		self._groupsort = '@group desc'
		self._groupdistinct = ''


	def Query (self, query, index='*', comment=''):
		"""
		Connect to searchd server and run given search query.
		Returns None on failure; result set hash on success (see documentation for details).
		"""
		assert(len(self._reqs)==0)
		self.AddQuery(query,index,comment)
		results = self.RunQueries()
		self._reqs = [] # we won't re-run erroneous batch

		if not results or len(results)==0:
			return None
		self._error = results[0]['error']
		self._warning = results[0]['warning']
		if results[0]['status'] == SEARCHD_ERROR:
			return None
		return results[0]


	def AddQuery (self, query, index='*', comment=''):
		"""
		Add query to batch.

		Serializes the current client settings plus this query into one
		request blob and appends it to self._reqs; nothing is sent until
		RunQueries() is called.
		"""
		# build request
		req = []
		req.append(pack('>4L', self._offset, self._limit, self._mode, self._ranker))
		if self._ranker==SPH_RANK_EXPR:
			req.append(pack('>L', len(self._rankexpr)))
			req.append(self._rankexpr)
		req.append(pack('>L', self._sort))
		req.append(pack('>L', len(self._sortby)))
		req.append(self._sortby)

		if isinstance(query,unicode):
			query = query.encode('utf-8')
		assert(isinstance(query,str))

		req.append(pack('>L', len(query)))
		req.append(query)

		req.append(pack('>L', len(self._weights)))
		for w in self._weights:
			req.append(pack('>L', w))
		assert(isinstance(index,str))
		req.append(pack('>L', len(index)))
		req.append(index)
		req.append(pack('>L',1)) # id64 range marker
		req.append(pack('>Q', self._min_id))
		req.append(pack('>Q', self._max_id))

		# filters
		req.append ( pack ( '>L', len(self._filters) ) )
		for f in self._filters:
			req.append ( pack ( '>L', len(f['attr'])) + f['attr'])
			filtertype = f['type']
			req.append ( pack ( '>L', filtertype))
			if filtertype == SPH_FILTER_VALUES:
				req.append ( pack ('>L', len(f['values'])))
				for val in f['values']:
					req.append ( pack ('>q', val))
			elif filtertype == SPH_FILTER_RANGE:
				req.append ( pack ('>2q', f['min'], f['max']))
			elif filtertype == SPH_FILTER_FLOATRANGE:
				req.append ( pack ('>2f', f['min'], f['max']))
			req.append ( pack ( '>L', f['exclude'] ) )

		# group-by, max-matches, group-sort
		req.append ( pack ( '>2L', self._groupfunc, len(self._groupby) ) )
		req.append ( self._groupby )
		req.append ( pack ( '>2L', self._maxmatches, len(self._groupsort) ) )
		req.append ( self._groupsort )
		req.append ( pack ( '>LLL', self._cutoff, self._retrycount, self._retrydelay))
		req.append ( pack ( '>L', len(self._groupdistinct)))
		req.append ( self._groupdistinct)

		# anchor point
		if len(self._anchor) == 0:
			req.append ( pack ('>L', 0))
		else:
			attrlat, attrlong = self._anchor['attrlat'], self._anchor['attrlong']
			latitude, longitude = self._anchor['lat'], self._anchor['long']
			req.append ( pack ('>L', 1))
			req.append ( pack ('>L', len(attrlat)) + attrlat)
			req.append ( pack ('>L', len(attrlong)) + attrlong)
			req.append ( pack ('>f', latitude) + pack ('>f', longitude))

		# per-index weights
		req.append ( pack ('>L',len(self._indexweights)))
		for indx,weight in self._indexweights.items():
			req.append ( pack ('>L',len(indx)) + indx + pack ('>L',weight))

		# max query time
		req.append ( pack ('>L', self._maxquerytime) )

		# per-field weights
		req.append ( pack ('>L',len(self._fieldweights) ) )
		for field,weight in self._fieldweights.items():
			req.append ( pack ('>L',len(field)) + field + pack ('>L',weight) )

		# comment
		comment = str(comment)
		req.append ( pack('>L',len(comment)) + comment )

		# attribute overrides
		req.append ( pack('>L', len(self._overrides)) )
		for v in self._overrides.values():
			req.extend ( ( pack('>L', len(v['name'])), v['name'] ) )
			req.append ( pack('>LL', v['type'], len(v['values'])) )
			for id, value in v['values'].iteritems():
				req.append ( pack('>Q', id) )
				if v['type'] == SPH_ATTR_FLOAT:
					req.append ( pack('>f', value) )
				elif v['type'] == SPH_ATTR_BIGINT:
					req.append ( pack('>q', value) )
				else:
					req.append ( pack('>l', value) )

		# select-list
		req.append ( pack('>L', len(self._select)) )
		req.append ( self._select )

		# send query, get response
		req = ''.join(req)

		self._reqs.append(req)
		return


	def RunQueries (self):
		"""
		Run queries batch.
		Returns None on network IO failure; or an array of result set hashes on success.
		"""
		if len(self._reqs)==0:
			self._error = 'no queries defined, issue AddQuery() first'
			return None

		sock = self._Connect()
		if not sock:
			return None

		# header: command, version, total body length, dummy, query count
		req = ''.join(self._reqs)
		length = len(req)+8
		req = pack('>HHLLL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH, length, 0, len(self._reqs))+req
		self._Send ( sock, req )

		response = self._GetResponse(sock, VER_COMMAND_SEARCH)
		if not response:
			return None

		nreqs = len(self._reqs)

		# parse response; 'p' is the read cursor into the raw byte string
		max_ = len(response)
		p = 0

		results = []
		for i in range(0,nreqs,1):
			result = {}
			results.append(result)

			result['error'] = ''
			result['warning'] = ''
			status = unpack('>L', response[p:p+4])[0]
			p += 4
			result['status'] = status
			if status != SEARCHD_OK:
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				message = response[p:p+length]
				p += length

				# warnings still carry a full result set; errors do not
				if status == SEARCHD_WARNING:
					result['warning'] = message
				else:
					result['error'] = message
					continue

			# read schema
			fields = []
			attrs = []

			nfields = unpack('>L', response[p:p+4])[0]
			p += 4
			while nfields>0 and p<max_:
				nfields -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				fields.append(response[p:p+length])
				p += length

			result['fields'] = fields

			nattrs = unpack('>L', response[p:p+4])[0]
			p += 4
			while nattrs>0 and p<max_:
				nattrs -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				attr = response[p:p+length]
				p += length
				type_ = unpack('>L', response[p:p+4])[0]
				p += 4
				attrs.append([attr,type_])

			result['attrs'] = attrs

			# read match count
			count = unpack('>L', response[p:p+4])[0]
			p += 4
			id64 = unpack('>L', response[p:p+4])[0]
			p += 4

			# read matches
			result['matches'] = []
			while count>0 and p<max_:
				count -= 1
				if id64:
					doc, weight = unpack('>QL', response[p:p+12])
					p += 12
				else:
					doc, weight = unpack('>2L', response[p:p+8])
					p += 8

				match = { 'id':doc, 'weight':weight, 'attrs':{} }
				# NOTE(review): loop variable 'i' shadows the outer per-query 'i';
				# harmless here since the outer value is not used afterwards
				# per-attribute offsets below account for the shared 'p += 4'
				# after the if/elif chain
				for i in range(len(attrs)):
					if attrs[i][1] == SPH_ATTR_FLOAT:
						match['attrs'][attrs[i][0]] = unpack('>f', response[p:p+4])[0]
					elif attrs[i][1] == SPH_ATTR_BIGINT:
						match['attrs'][attrs[i][0]] = unpack('>q', response[p:p+8])[0]
						p += 4
					elif attrs[i][1] == SPH_ATTR_STRING:
						slen = unpack('>L', response[p:p+4])[0]
						p += 4
						match['attrs'][attrs[i][0]] = ''
						if slen>0:
							match['attrs'][attrs[i][0]] = response[p:p+slen]
						p += slen-4
					elif attrs[i][1] == SPH_ATTR_MULTI:
						match['attrs'][attrs[i][0]] = []
						nvals = unpack('>L', response[p:p+4])[0]
						p += 4
						for n in range(0,nvals,1):
							match['attrs'][attrs[i][0]].append(unpack('>L', response[p:p+4])[0])
							p += 4
						p -= 4
					elif attrs[i][1] == SPH_ATTR_MULTI64:
						match['attrs'][attrs[i][0]] = []
						nvals = unpack('>L', response[p:p+4])[0]
						nvals = nvals/2
						p += 4
						for n in range(0,nvals,1):
							match['attrs'][attrs[i][0]].append(unpack('>q', response[p:p+8])[0])
							p += 8
						p -= 4
					else:
						match['attrs'][attrs[i][0]] = unpack('>L', response[p:p+4])[0]
					p += 4

				result['matches'].append ( match )

			result['total'], result['total_found'], result['time'], words = unpack('>4L', response[p:p+16])

			result['time'] = '%.3f' % (result['time']/1000.0)
			p += 16

			result['words'] = []
			while words>0:
				words -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				word = response[p:p+length]
				p += length
				docs, hits = unpack('>2L', response[p:p+8])
				p += 8

				result['words'].append({'word':word, 'docs':docs, 'hits':hits})

		self._reqs = []
		return results


	def BuildExcerpts (self, docs, index, words, opts=None):
		"""
		Connect to searchd server and generate excerpts from given documents.

		Returns a list of excerpt strings (one per doc) on success, or an
		empty list on failure.
		"""
		if not opts:
			opts = {}
		if isinstance(words,unicode):
			words = words.encode('utf-8')

		assert(isinstance(docs, list))
		assert(isinstance(index, str))
		assert(isinstance(words, str))
		assert(isinstance(opts, dict))

		sock = self._Connect()

		if not sock:
			return None

		# fixup options
		opts.setdefault('before_match', '<b>')
		opts.setdefault('after_match', '</b>')
		opts.setdefault('chunk_separator', ' ... ')
		opts.setdefault('html_strip_mode', 'index')
		opts.setdefault('limit', 256)
		opts.setdefault('limit_passages', 0)
		opts.setdefault('limit_words', 0)
		opts.setdefault('around', 5)
		opts.setdefault('start_passage_id', 1)
		opts.setdefault('passage_boundary', 'none')

		# build request
		# v.1.0 req

		flags = 1 # (remove spaces)
		if opts.get('exact_phrase'):	flags |= 2
		if opts.get('single_passage'):	flags |= 4
		if opts.get('use_boundaries'):	flags |= 8
		if opts.get('weight_order'):	flags |= 16
		if opts.get('query_mode'):		flags |= 32
		if opts.get('force_all_words'):	flags |= 64
		if opts.get('load_files'):		flags |= 128
		if opts.get('allow_empty'):		flags |= 256
		if opts.get('emit_zones'):		flags |= 512
		if opts.get('load_files_scattered'):	flags |= 1024

		# mode=0, flags
		req = [pack('>2L', 0, flags)]

		# req index
		req.append(pack('>L', len(index)))
		req.append(index)

		# req words
		req.append(pack('>L', len(words)))
		req.append(words)

		# options
		req.append(pack('>L', len(opts['before_match'])))
		req.append(opts['before_match'])

		req.append(pack('>L', len(opts['after_match'])))
		req.append(opts['after_match'])

		req.append(pack('>L', len(opts['chunk_separator'])))
		req.append(opts['chunk_separator'])

		req.append(pack('>L', int(opts['limit'])))
		req.append(pack('>L', int(opts['around'])))

		req.append(pack('>L', int(opts['limit_passages'])))
		req.append(pack('>L', int(opts['limit_words'])))
		req.append(pack('>L', int(opts['start_passage_id'])))
		req.append(pack('>L', len(opts['html_strip_mode'])))
		req.append((opts['html_strip_mode']))
		req.append(pack('>L', len(opts['passage_boundary'])))
		req.append((opts['passage_boundary']))

		# documents
		req.append(pack('>L', len(docs)))
		for doc in docs:
			if isinstance(doc,unicode):
				doc = doc.encode('utf-8')
			assert(isinstance(doc, str))
			req.append(pack('>L', len(doc)))
			req.append(doc)

		req = ''.join(req)

		# send query, get response
		length = len(req)

		# add header
		req = pack('>2HL', SEARCHD_COMMAND_EXCERPT, VER_COMMAND_EXCERPT, length)+req
		self._Send ( sock, req )

		response = self._GetResponse(sock, VER_COMMAND_EXCERPT )
		if not response:
			return []

		# parse response: one length-prefixed excerpt string per input doc
		pos = 0
		res = []
		rlen = len(response)

		for i in range(len(docs)):
			length = unpack('>L', response[pos:pos+4])[0]
			pos += 4

			if pos+length > rlen:
				self._error = 'incomplete reply'
				return []

			res.append(response[pos:pos+length])
			pos += length

		return res


	def UpdateAttributes ( self, index, attrs, values, mva=False ):
		"""
		Update given attribute values on given documents in given indexes.
		Returns amount of updated documents (0 or more) on success, or -1 on failure.

		'attrs' must be a list of strings.
		'values' must be a dict with int key (document ID) and list of int values (new attribute values).
		optional boolean parameter 'mva' points that there is update of MVA attributes.
		In this case the 'values' must be a dict with int key (document ID) and list of lists of int values
		(new MVA attribute values).

		Example:
			res = cl.UpdateAttributes ( 'test1', [ 'group_id', 'date_added' ], { 2:[123,1000000000], 4:[456,1234567890] } )
		"""
		assert ( isinstance ( index, str ) )
		assert ( isinstance ( attrs, list ) )
		assert ( isinstance ( values, dict ) )
		for attr in attrs:
			assert ( isinstance ( attr, str ) )
		for docid, entry in values.items():
			AssertUInt32(docid)
			assert ( isinstance ( entry, list ) )
			assert ( len(attrs)==len(entry) )
			for val in entry:
				if mva:
					assert ( isinstance ( val, list ) )
					for vals in val:
						AssertInt32(vals)
				else:
					AssertInt32(val)

		# build request
		req = [ pack('>L',len(index)), index ]

		req.append ( pack('>L',len(attrs)) )
		mva_attr = 0
		if mva: mva_attr = 1
		for attr in attrs:
			req.append ( pack('>L',len(attr)) + attr )
		req.append ( pack('>L', mva_attr ) )

		req.append ( pack('>L',len(values)) )
		for docid, entry in values.items():
			req.append ( pack('>Q',docid) )
			for val in entry:
				# for plain attrs the value itself is sent; for MVA the
				# element count is sent first, then the elements
				val_len = val
				if mva: val_len = len ( val )
				req.append ( pack('>L',val_len ) )
				if mva:
					for vals in val:
						req.append ( pack ('>L',vals) )

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None

		req = ''.join(req)
		length = len(req)
		req = pack ( '>2HL', SEARCHD_COMMAND_UPDATE, VER_COMMAND_UPDATE, length ) + req
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_UPDATE )
		if not response:
			return -1

		# parse response
		updated = unpack ( '>L', response[0:4] )[0]
		return updated


	def BuildKeywords ( self, query, index, hits ):
		"""
		Connect to searchd server, and generate keywords list for a given query.
		Returns None on failure, or a list of keywords on success.
		"""
		assert ( isinstance ( query, str ) )
		assert ( isinstance ( index, str ) )
		assert ( isinstance ( hits, int ) )

		# build request
		req = [ pack ( '>L', len(query) ) + query ]
		req.append ( pack ( '>L', len(index) ) + index )
		req.append ( pack ( '>L', hits ) )

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None

		req = ''.join(req)
		length = len(req)
		req = pack ( '>2HL', SEARCHD_COMMAND_KEYWORDS, VER_COMMAND_KEYWORDS, length ) + req
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_KEYWORDS )
		if not response:
			return None

		# parse response
		res = []

		nwords = unpack ( '>L', response[0:4] )[0]
		p = 4
		max_ = len(response)

		while nwords>0 and p<max_:
			nwords -= 1

			length = unpack ( '>L', response[p:p+4] )[0]
			p += 4
			tokenized = response[p:p+length]
			p += length

			length = unpack ( '>L', response[p:p+4] )[0]
			p += 4
			normalized = response[p:p+length]
			p += length

			entry = { 'tokenized':tokenized, 'normalized':normalized }
			if hits:
				entry['docs'], entry['hits'] = unpack ( '>2L', response[p:p+8] )
				p += 8

			res.append ( entry )

		if nwords>0 or p>max_:
			self._error = 'incomplete reply'
			return None

		return res

	def Status ( self ):
		"""
		Get the status

		Returns a list of [key, value] string pairs, or None on failure.
		"""

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None

		req = pack ( '>2HLL', SEARCHD_COMMAND_STATUS, VER_COMMAND_STATUS, 4, 1 )
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_STATUS )
		if not response:
			return None

		# parse response
		res = []

		# NOTE(review): the first 8 bytes are skipped — presumably the
		# rows/columns counters of the status table; confirm against protocol
		p = 8
		max_ = len(response)

		while p<max_:
			length = unpack ( '>L', response[p:p+4] )[0]
			k = response[p+4:p+length+4]
			p += 4+length
			length = unpack ( '>L', response[p:p+4] )[0]
			v = response[p+4:p+length+4]
			p += 4+length
			res += [[k, v]]

		return res

	### persistent connections

	def Open(self):
		"""
		Open a persistent connection to searchd (reused by later commands).
		Returns True on success, None on failure.
		"""
		if self._socket:
			self._error = 'already connected'
			return None

		server = self._Connect()
		if not server:
			return None

		# command, command version = 0, body length = 4, body = 1
		request = pack ( '>hhII', SEARCHD_COMMAND_PERSIST, 0, 4, 1 )
		self._Send ( server, request )

		self._socket = server
		return True

	def Close(self):
		"""
		Close the persistent connection opened by Open().
		"""
		if not self._socket:
			self._error = 'not connected'
			return
		self._socket.close()
		self._socket = None

	def EscapeString(self, string):
		"""
		Escape characters that are special in extended query syntax.
		"""
		return re.sub(r"([=\(\)|\-!@~\"&/\\\^\$\=])", r"\\\1", string)


	def FlushAttributes(self):
		"""
		Flush in-memory attribute updates to disk on the server side.
		Returns the server's flush tag on success, or -1 on failure.
		"""
		sock = self._Connect()
		if not sock:
			return -1

		request = pack ( '>hhI', SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, 0 ) # cmd, ver, bodylen
		self._Send ( sock, request )

		response = self._GetResponse ( sock, VER_COMMAND_FLUSHATTRS )
		if not response or len(response)!=4:
			self._error = 'unexpected response length'
			return -1

		tag = unpack ( '>L', response[0:4] )[0]
		return tag

def AssertInt32 ( value ):
	# validate a value destined for a 32-bit wire field
	# NOTE(review): the lower bound -2**32-1 is wider than a true signed
	# int32 (-2**31); looks intentional-but-odd upstream — confirm
	assert(isinstance(value, (int, long)))
	assert(value>=-2**32-1 and value<=2**32-1)

def AssertUInt32 ( value ):
	# validate a value destined for an unsigned 32-bit wire field
	assert(isinstance(value, (int, long)))
	assert(value>=0 and value<=2**32-1)

#
# $Id: sphinxapi.py 3436 2012-10-08 09:17:18Z kevg $
#