1#
2# $Id: sphinxapi.py 3436 2012-10-08 09:17:18Z kevg $
3#
4# Python version of Sphinx searchd client (Python API)
5#
6# Copyright (c) 2006, Mike Osadnik
7# Copyright (c) 2006-2012, Andrew Aksyonoff
8# Copyright (c) 2008-2012, Sphinx Technologies Inc
9# All rights reserved
10#
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License. You should have
13# received a copy of the GPL license along with this program; if you
14# did not, you can find it at http://www.gnu.org/
15#
16
17import sys
18import select
19import socket
20import re
21from struct import *
22
23
24# known searchd commands
25SEARCHD_COMMAND_SEARCH		= 0
26SEARCHD_COMMAND_EXCERPT		= 1
27SEARCHD_COMMAND_UPDATE		= 2
28SEARCHD_COMMAND_KEYWORDS	= 3
29SEARCHD_COMMAND_PERSIST		= 4
30SEARCHD_COMMAND_STATUS		= 5
31SEARCHD_COMMAND_FLUSHATTRS	= 7
32
33# current client-side command implementation versions
34VER_COMMAND_SEARCH		= 0x119
35VER_COMMAND_EXCERPT		= 0x104
36VER_COMMAND_UPDATE		= 0x102
37VER_COMMAND_KEYWORDS	= 0x100
38VER_COMMAND_STATUS		= 0x100
39VER_COMMAND_FLUSHATTRS	= 0x100
40
41# known searchd status codes
42SEARCHD_OK				= 0
43SEARCHD_ERROR			= 1
44SEARCHD_RETRY			= 2
45SEARCHD_WARNING			= 3
46
47# known match modes
48SPH_MATCH_ALL			= 0
49SPH_MATCH_ANY			= 1
50SPH_MATCH_PHRASE		= 2
51SPH_MATCH_BOOLEAN		= 3
52SPH_MATCH_EXTENDED		= 4
53SPH_MATCH_FULLSCAN		= 5
54SPH_MATCH_EXTENDED2		= 6
55
56# known ranking modes (extended2 mode only)
57SPH_RANK_PROXIMITY_BM25	= 0 # default mode, phrase proximity major factor and BM25 minor one
58SPH_RANK_BM25			= 1 # statistical mode, BM25 ranking only (faster but worse quality)
59SPH_RANK_NONE			= 2 # no ranking, all matches get a weight of 1
60SPH_RANK_WORDCOUNT		= 3 # simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
61SPH_RANK_PROXIMITY		= 4
62SPH_RANK_MATCHANY		= 5
63SPH_RANK_FIELDMASK		= 6
64SPH_RANK_SPH04			= 7
65SPH_RANK_EXPR			= 8
66SPH_RANK_TOTAL			= 9
67
68# known sort modes
69SPH_SORT_RELEVANCE		= 0
70SPH_SORT_ATTR_DESC		= 1
71SPH_SORT_ATTR_ASC		= 2
72SPH_SORT_TIME_SEGMENTS	= 3
73SPH_SORT_EXTENDED		= 4
74SPH_SORT_EXPR			= 5
75
76# known filter types
77SPH_FILTER_VALUES		= 0
78SPH_FILTER_RANGE		= 1
79SPH_FILTER_FLOATRANGE	= 2
80
81# known attribute types
82SPH_ATTR_NONE			= 0
83SPH_ATTR_INTEGER		= 1
84SPH_ATTR_TIMESTAMP		= 2
85SPH_ATTR_ORDINAL		= 3
86SPH_ATTR_BOOL			= 4
87SPH_ATTR_FLOAT			= 5
88SPH_ATTR_BIGINT			= 6
89SPH_ATTR_STRING			= 7
90SPH_ATTR_MULTI			= 0X40000001L
91SPH_ATTR_MULTI64		= 0X40000002L
92
93SPH_ATTR_TYPES = (SPH_ATTR_NONE,
94				  SPH_ATTR_INTEGER,
95				  SPH_ATTR_TIMESTAMP,
96				  SPH_ATTR_ORDINAL,
97				  SPH_ATTR_BOOL,
98				  SPH_ATTR_FLOAT,
99				  SPH_ATTR_BIGINT,
100				  SPH_ATTR_STRING,
101				  SPH_ATTR_MULTI,
102				  SPH_ATTR_MULTI64)
103
104# known grouping functions
105SPH_GROUPBY_DAY	 		= 0
106SPH_GROUPBY_WEEK		= 1
107SPH_GROUPBY_MONTH		= 2
108SPH_GROUPBY_YEAR		= 3
109SPH_GROUPBY_ATTR		= 4
110SPH_GROUPBY_ATTRPAIR	= 5
111
112
113class SphinxClient:
114	def __init__ (self):
115		"""
116		Create a new client object, and fill defaults.
117		"""
118		self._host			= 'localhost'					# searchd host (default is "localhost")
119		self._port			= 9312							# searchd port (default is 9312)
120		self._path			= None							# searchd unix-domain socket path
121		self._socket		= None
122		self._offset		= 0								# how much records to seek from result-set start (default is 0)
123		self._limit			= 20							# how much records to return from result-set starting at offset (default is 20)
124		self._mode			= SPH_MATCH_ALL					# query matching mode (default is SPH_MATCH_ALL)
125		self._weights		= []							# per-field weights (default is 1 for all fields)
126		self._sort			= SPH_SORT_RELEVANCE			# match sorting mode (default is SPH_SORT_RELEVANCE)
127		self._sortby		= ''							# attribute to sort by (defualt is "")
128		self._min_id		= 0								# min ID to match (default is 0)
129		self._max_id		= 0								# max ID to match (default is UINT_MAX)
130		self._filters		= []							# search filters
131		self._groupby		= ''							# group-by attribute name
132		self._groupfunc		= SPH_GROUPBY_DAY				# group-by function (to pre-process group-by attribute value with)
133		self._groupsort		= '@group desc'					# group-by sorting clause (to sort groups in result set with)
134		self._groupdistinct	= ''							# group-by count-distinct attribute
135		self._maxmatches	= 1000							# max matches to retrieve
136		self._cutoff		= 0								# cutoff to stop searching at
137		self._retrycount	= 0								# distributed retry count
138		self._retrydelay	= 0								# distributed retry delay
139		self._anchor		= {}							# geographical anchor point
140		self._indexweights	= {}							# per-index weights
141		self._ranker		= SPH_RANK_PROXIMITY_BM25		# ranking mode
142		self._rankexpr		= ''							# ranking expression for SPH_RANK_EXPR
143		self._maxquerytime	= 0								# max query time, milliseconds (default is 0, do not limit)
144		self._timeout = 1.0										# connection timeout
145		self._fieldweights	= {}							# per-field-name weights
146		self._overrides		= {}							# per-query attribute values overrides
147		self._select		= '*'							# select-list (attributes or expressions, with optional aliases)
148
149		self._error			= ''							# last error message
150		self._warning		= ''							# last warning message
151		self._reqs			= []							# requests array for multi-query
152
153	def __del__ (self):
154		if self._socket:
155			self._socket.close()
156
157
158	def GetLastError (self):
159		"""
160		Get last error message (string).
161		"""
162		return self._error
163
164
165	def GetLastWarning (self):
166		"""
167		Get last warning message (string).
168		"""
169		return self._warning
170
171
172	def SetServer (self, host, port = None):
173		"""
174		Set searchd server host and port.
175		"""
176		assert(isinstance(host, str))
177		if host.startswith('/'):
178			self._path = host
179			return
180		elif host.startswith('unix://'):
181			self._path = host[7:]
182			return
183		self._host = host
184		if isinstance(port, int):
185			assert(port>0 and port<65536)
186			self._port = port
187		self._path = None
188
189	def SetConnectTimeout ( self, timeout ):
190		"""
191		Set connection timeout ( float second )
192		"""
193		assert (isinstance(timeout, float))
194		# set timeout to 0 make connaection non-blocking that is wrong so timeout got clipped to reasonable minimum
195		self._timeout = max ( 0.001, timeout )
196
197	def _Connect (self):
198		"""
199		INTERNAL METHOD, DO NOT CALL. Connects to searchd server.
200		"""
201		if self._socket:
202			# we have a socket, but is it still alive?
203			sr, sw, _ = select.select ( [self._socket], [self._socket], [], 0 )
204
205			# this is how alive socket should look
206			if len(sr)==0 and len(sw)==1:
207				return self._socket
208
209			# oops, looks like it was closed, lets reopen
210			self._socket.close()
211			self._socket = None
212
213		try:
214			if self._path:
215				af = socket.AF_UNIX
216				addr = self._path
217				desc = self._path
218			else:
219				af = socket.AF_INET
220				addr = ( self._host, self._port )
221				desc = '%s;%s' % addr
222			sock = socket.socket ( af, socket.SOCK_STREAM )
223			sock.settimeout ( self._timeout )
224			sock.connect ( addr )
225		except socket.error, msg:
226			if sock:
227				sock.close()
228			self._error = 'connection to %s failed (%s)' % ( desc, msg )
229			return
230
231		v = unpack('>L', sock.recv(4))
232		if v<1:
233			sock.close()
234			self._error = 'expected searchd protocol version, got %s' % v
235			return
236
237		# all ok, send my version
238		sock.send(pack('>L', 1))
239		return sock
240
241
	def _GetResponse (self, sock, client_ver):
		"""
		INTERNAL METHOD, DO NOT CALL. Gets and checks response packet from searchd server.

		Reads the 8-byte reply header (status, version, body length) and
		then the body; returns the raw body string on success, or None on
		failure with self._error / self._warning filled in.
		"""
		# header: two 16-bit words (status, version) plus 32-bit body length
		(status, ver, length) = unpack('>2HL', sock.recv(8))
		response = ''
		left = length
		while left>0:
			chunk = sock.recv(left)
			if chunk:
				response += chunk
				left -= len(chunk)
			else:
				break	# peer closed mid-body; the length check below reports it

		# close the socket unless it is the persistent one (see Open())
		if not self._socket:
			sock.close()

		# check response
		read = len(response)
		if not response or read!=length:
			if length:
				self._error = 'failed to read searchd response (status=%s, ver=%s, len=%s, read=%s)' \
					% (status, ver, length, read)
			else:
				self._error = 'received zero-sized searchd response'
			return None

		# check status
		if status==SEARCHD_WARNING:
			# warning replies prefix the body with a length-prefixed
			# warning message; the payload follows it
			wend = 4 + unpack ( '>L', response[0:4] )[0]
			self._warning = response[4:wend]
			return response[wend:]

		if status==SEARCHD_ERROR:
			self._error = 'searchd error: '+response[4:]
			return None

		if status==SEARCHD_RETRY:
			self._error = 'temporary searchd error: '+response[4:]
			return None

		if status!=SEARCHD_OK:
			self._error = 'unknown status code %d' % status
			return None

		# check version; an older server still answers, so warn only
		if ver<client_ver:
			self._warning = 'searchd command v.%d.%d older than client\'s v.%d.%d, some options might not work' \
				% (ver>>8, ver&0xff, client_ver>>8, client_ver&0xff)

		return response
294
295
296	def _Send ( self, sock, req ):
297		"""
298		INTERNAL METHOD, DO NOT CALL. send request to searchd server.
299		"""
300		total = 0
301		while True:
302			sent = sock.send ( req[total:] )
303			if sent<=0:
304				break
305
306			total = total + sent
307
308		return total
309
310
	def SetLimits (self, offset, limit, maxmatches=0, cutoff=0):
		"""
		Set offset and count into result set, and optionally set max-matches and cutoff limits.

		'offset' and 'limit' define the result window; 'maxmatches'
		overrides the server-side match buffer when positive, and
		'cutoff' stops searching after that many matches.
		"""
		# 16777216 == 2^24, the protocol's practical window limit
		assert ( type(offset) in [int,long] and 0<=offset<16777216 )
		assert ( type(limit) in [int,long] and 0<limit<16777216 )
		assert(maxmatches>=0)
		self._offset = offset
		self._limit = limit
		# maxmatches==0 means 'keep the current setting'
		if maxmatches>0:
			self._maxmatches = maxmatches
		if cutoff>=0:
			self._cutoff = cutoff
324
325
326	def SetMaxQueryTime (self, maxquerytime):
327		"""
328		Set maximum query time, in milliseconds, per-index. 0 means 'do not limit'.
329		"""
330		assert(isinstance(maxquerytime,int) and maxquerytime>0)
331		self._maxquerytime = maxquerytime
332
333
334	def SetMatchMode (self, mode):
335		"""
336		Set matching mode.
337		"""
338		assert(mode in [SPH_MATCH_ALL, SPH_MATCH_ANY, SPH_MATCH_PHRASE, SPH_MATCH_BOOLEAN, SPH_MATCH_EXTENDED, SPH_MATCH_FULLSCAN, SPH_MATCH_EXTENDED2])
339		self._mode = mode
340
341
342	def SetRankingMode ( self, ranker, rankexpr='' ):
343		"""
344		Set ranking mode.
345		"""
346		assert(ranker>=0 and ranker<SPH_RANK_TOTAL)
347		self._ranker = ranker
348		self._rankexpr = rankexpr
349
350
351	def SetSortMode ( self, mode, clause='' ):
352		"""
353		Set sorting mode.
354		"""
355		assert ( mode in [SPH_SORT_RELEVANCE, SPH_SORT_ATTR_DESC, SPH_SORT_ATTR_ASC, SPH_SORT_TIME_SEGMENTS, SPH_SORT_EXTENDED, SPH_SORT_EXPR] )
356		assert ( isinstance ( clause, str ) )
357		self._sort = mode
358		self._sortby = clause
359
360
361	def SetWeights (self, weights):
362		"""
363		Set per-field weights.
364		WARNING, DEPRECATED; do not use it! use SetFieldWeights() instead
365		"""
366		assert(isinstance(weights, list))
367		for w in weights:
368			AssertUInt32 ( w )
369		self._weights = weights
370
371
372	def SetFieldWeights (self, weights):
373		"""
374		Bind per-field weights by name; expects (name,field_weight) dictionary as argument.
375		"""
376		assert(isinstance(weights,dict))
377		for key,val in weights.items():
378			assert(isinstance(key,str))
379			AssertUInt32 ( val )
380		self._fieldweights = weights
381
382
383	def SetIndexWeights (self, weights):
384		"""
385		Bind per-index weights by name; expects (name,index_weight) dictionary as argument.
386		"""
387		assert(isinstance(weights,dict))
388		for key,val in weights.items():
389			assert(isinstance(key,str))
390			AssertUInt32(val)
391		self._indexweights = weights
392
393
394	def SetIDRange (self, minid, maxid):
395		"""
396		Set IDs range to match.
397		Only match records if document ID is beetwen $min and $max (inclusive).
398		"""
399		assert(isinstance(minid, (int, long)))
400		assert(isinstance(maxid, (int, long)))
401		assert(minid<=maxid)
402		self._min_id = minid
403		self._max_id = maxid
404
405
406	def SetFilter ( self, attribute, values, exclude=0 ):
407		"""
408		Set values set filter.
409		Only match records where 'attribute' value is in given 'values' set.
410		"""
411		assert(isinstance(attribute, str))
412		assert iter(values)
413
414		for value in values:
415			AssertInt32 ( value )
416
417		self._filters.append ( { 'type':SPH_FILTER_VALUES, 'attr':attribute, 'exclude':exclude, 'values':values } )
418
419
420	def SetFilterRange (self, attribute, min_, max_, exclude=0 ):
421		"""
422		Set range filter.
423		Only match records if 'attribute' value is beetwen 'min_' and 'max_' (inclusive).
424		"""
425		assert(isinstance(attribute, str))
426		AssertInt32(min_)
427		AssertInt32(max_)
428		assert(min_<=max_)
429
430		self._filters.append ( { 'type':SPH_FILTER_RANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_ } )
431
432
433	def SetFilterFloatRange (self, attribute, min_, max_, exclude=0 ):
434		assert(isinstance(attribute,str))
435		assert(isinstance(min_,float))
436		assert(isinstance(max_,float))
437		assert(min_ <= max_)
438		self._filters.append ( {'type':SPH_FILTER_FLOATRANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_} )
439
440
441	def SetGeoAnchor (self, attrlat, attrlong, latitude, longitude):
442		assert(isinstance(attrlat,str))
443		assert(isinstance(attrlong,str))
444		assert(isinstance(latitude,float))
445		assert(isinstance(longitude,float))
446		self._anchor['attrlat'] = attrlat
447		self._anchor['attrlong'] = attrlong
448		self._anchor['lat'] = latitude
449		self._anchor['long'] = longitude
450
451
452	def SetGroupBy ( self, attribute, func, groupsort='@group desc' ):
453		"""
454		Set grouping attribute and function.
455		"""
456		assert(isinstance(attribute, str))
457		assert(func in [SPH_GROUPBY_DAY, SPH_GROUPBY_WEEK, SPH_GROUPBY_MONTH, SPH_GROUPBY_YEAR, SPH_GROUPBY_ATTR, SPH_GROUPBY_ATTRPAIR] )
458		assert(isinstance(groupsort, str))
459
460		self._groupby = attribute
461		self._groupfunc = func
462		self._groupsort = groupsort
463
464
465	def SetGroupDistinct (self, attribute):
466		assert(isinstance(attribute,str))
467		self._groupdistinct = attribute
468
469
470	def SetRetries (self, count, delay=0):
471		assert(isinstance(count,int) and count>=0)
472		assert(isinstance(delay,int) and delay>=0)
473		self._retrycount = count
474		self._retrydelay = delay
475
476
477	def SetOverride (self, name, type, values):
478		assert(isinstance(name, str))
479		assert(type in SPH_ATTR_TYPES)
480		assert(isinstance(values, dict))
481
482		self._overrides[name] = {'name': name, 'type': type, 'values': values}
483
484	def SetSelect (self, select):
485		assert(isinstance(select, str))
486		self._select = select
487
488
489	def ResetOverrides (self):
490		self._overrides = {}
491
492
493	def ResetFilters (self):
494		"""
495		Clear all filters (for multi-queries).
496		"""
497		self._filters = []
498		self._anchor = {}
499
500
501	def ResetGroupBy (self):
502		"""
503		Clear groupby settings (for multi-queries).
504		"""
505		self._groupby = ''
506		self._groupfunc = SPH_GROUPBY_DAY
507		self._groupsort = '@group desc'
508		self._groupdistinct = ''
509
510
511	def Query (self, query, index='*', comment=''):
512		"""
513		Connect to searchd server and run given search query.
514		Returns None on failure; result set hash on success (see documentation for details).
515		"""
516		assert(len(self._reqs)==0)
517		self.AddQuery(query,index,comment)
518		results = self.RunQueries()
519		self._reqs = [] # we won't re-run erroneous batch
520
521		if not results or len(results)==0:
522			return None
523		self._error = results[0]['error']
524		self._warning = results[0]['warning']
525		if results[0]['status'] == SEARCHD_ERROR:
526			return None
527		return results[0]
528
529
	def AddQuery (self, query, index='*', comment=''):
		"""
		Add query to batch.

		Serializes the query plus all current client settings into the
		binary SEARCHD_COMMAND_SEARCH request format and appends it to
		self._reqs; RunQueries() sends the whole batch. Returns nothing.
		"""
		# build request
		req = []
		req.append(pack('>4L', self._offset, self._limit, self._mode, self._ranker))
		if self._ranker==SPH_RANK_EXPR:
			# the expression ranker carries its expression as a length-prefixed string
			req.append(pack('>L', len(self._rankexpr)))
			req.append(self._rankexpr)
		req.append(pack('>L', self._sort))
		req.append(pack('>L', len(self._sortby)))
		req.append(self._sortby)

		# query text always goes over the wire as UTF-8 bytes
		if isinstance(query,unicode):
			query = query.encode('utf-8')
		assert(isinstance(query,str))

		req.append(pack('>L', len(query)))
		req.append(query)

		# legacy positional per-field weights
		req.append(pack('>L', len(self._weights)))
		for w in self._weights:
			req.append(pack('>L', w))
		assert(isinstance(index,str))
		req.append(pack('>L', len(index)))
		req.append(index)
		req.append(pack('>L',1)) # id64 range marker
		req.append(pack('>Q', self._min_id))
		req.append(pack('>Q', self._max_id))

		# filters
		req.append ( pack ( '>L', len(self._filters) ) )
		for f in self._filters:
			req.append ( pack ( '>L', len(f['attr'])) + f['attr'])
			filtertype = f['type']
			req.append ( pack ( '>L', filtertype))
			if filtertype == SPH_FILTER_VALUES:
				req.append ( pack ('>L', len(f['values'])))
				for val in f['values']:
					req.append ( pack ('>q', val))	# filter values travel as 64-bit ints
			elif filtertype == SPH_FILTER_RANGE:
				req.append ( pack ('>2q', f['min'], f['max']))
			elif filtertype == SPH_FILTER_FLOATRANGE:
				req.append ( pack ('>2f', f['min'], f['max']))
			req.append ( pack ( '>L', f['exclude'] ) )

		# group-by, max-matches, group-sort
		req.append ( pack ( '>2L', self._groupfunc, len(self._groupby) ) )
		req.append ( self._groupby )
		req.append ( pack ( '>2L', self._maxmatches, len(self._groupsort) ) )
		req.append ( self._groupsort )
		req.append ( pack ( '>LLL', self._cutoff, self._retrycount, self._retrydelay))
		req.append ( pack ( '>L', len(self._groupdistinct)))
		req.append ( self._groupdistinct)

		# anchor point: a 0/1 presence flag, then attribute names and coordinates
		if len(self._anchor) == 0:
			req.append ( pack ('>L', 0))
		else:
			attrlat, attrlong = self._anchor['attrlat'], self._anchor['attrlong']
			latitude, longitude = self._anchor['lat'], self._anchor['long']
			req.append ( pack ('>L', 1))
			req.append ( pack ('>L', len(attrlat)) + attrlat)
			req.append ( pack ('>L', len(attrlong)) + attrlong)
			req.append ( pack ('>f', latitude) + pack ('>f', longitude))

		# per-index weights
		req.append ( pack ('>L',len(self._indexweights)))
		for indx,weight in self._indexweights.items():
			req.append ( pack ('>L',len(indx)) + indx + pack ('>L',weight))

		# max query time
		req.append ( pack ('>L', self._maxquerytime) )

		# per-field weights
		req.append ( pack ('>L',len(self._fieldweights) ) )
		for field,weight in self._fieldweights.items():
			req.append ( pack ('>L',len(field)) + field + pack ('>L',weight) )

		# comment
		comment = str(comment)
		req.append ( pack('>L',len(comment)) + comment )

		# attribute overrides
		req.append ( pack('>L', len(self._overrides)) )
		for v in self._overrides.values():
			req.extend ( ( pack('>L', len(v['name'])), v['name'] ) )
			req.append ( pack('>LL', v['type'], len(v['values'])) )
			for id, value in v['values'].iteritems():
				req.append ( pack('>Q', id) )
				# per-value encoding depends on the override's attribute type
				if v['type'] == SPH_ATTR_FLOAT:
					req.append ( pack('>f', value) )
				elif v['type'] == SPH_ATTR_BIGINT:
					req.append ( pack('>q', value) )
				else:
					req.append ( pack('>l', value) )

		# select-list
		req.append ( pack('>L', len(self._select)) )
		req.append ( self._select )

		# all done; queue the serialized request for RunQueries()
		req = ''.join(req)

		self._reqs.append(req)
		return
637
638
	def RunQueries (self):
		"""
		Run queries batch.
		Returns None on network IO failure; or an array of result set hashes on success.
		"""
		if len(self._reqs)==0:
			self._error = 'no queries defined, issue AddQuery() first'
			return None

		sock = self._Connect()
		if not sock:
			return None

		# frame the batch: command header, a 32-bit zero, then query count
		req = ''.join(self._reqs)
		length = len(req)+8
		req = pack('>HHLLL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH, length, 0, len(self._reqs))+req
		self._Send ( sock, req )

		response = self._GetResponse(sock, VER_COMMAND_SEARCH)
		if not response:
			return None

		nreqs = len(self._reqs)

		# parse response
		max_ = len(response)
		p = 0	# read cursor into the response buffer

		results = []
		for i in range(0,nreqs,1):
			result = {}
			results.append(result)

			result['error'] = ''
			result['warning'] = ''
			# per-query status word; non-OK statuses carry a message next
			status = unpack('>L', response[p:p+4])[0]
			p += 4
			result['status'] = status
			if status != SEARCHD_OK:
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				message = response[p:p+length]
				p += length

				if status == SEARCHD_WARNING:
					result['warning'] = message
				else:
					result['error'] = message
					continue

			# read schema: field names, then (name, type) attribute pairs
			fields = []
			attrs = []

			nfields = unpack('>L', response[p:p+4])[0]
			p += 4
			while nfields>0 and p<max_:
				nfields -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				fields.append(response[p:p+length])
				p += length

			result['fields'] = fields

			nattrs = unpack('>L', response[p:p+4])[0]
			p += 4
			while nattrs>0 and p<max_:
				nattrs -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				attr = response[p:p+length]
				p += length
				type_ = unpack('>L', response[p:p+4])[0]
				p += 4
				attrs.append([attr,type_])

			result['attrs'] = attrs

			# read match count
			count = unpack('>L', response[p:p+4])[0]
			p += 4
			id64 = unpack('>L', response[p:p+4])[0]	# flag: 64-bit document IDs in this set
			p += 4

			# read matches
			result['matches'] = []
			while count>0 and p<max_:
				count -= 1
				if id64:
					doc, weight = unpack('>QL', response[p:p+12])
					p += 12
				else:
					doc, weight = unpack('>2L', response[p:p+8])
					p += 8

				# decode attribute values; every branch is written so its
				# net advance works with the shared 'p += 4' at loop bottom
				match = { 'id':doc, 'weight':weight, 'attrs':{} }
				for i in range(len(attrs)):
					if attrs[i][1] == SPH_ATTR_FLOAT:
						match['attrs'][attrs[i][0]] = unpack('>f', response[p:p+4])[0]
					elif attrs[i][1] == SPH_ATTR_BIGINT:
						match['attrs'][attrs[i][0]] = unpack('>q', response[p:p+8])[0]
						p += 4
					elif attrs[i][1] == SPH_ATTR_STRING:
						slen = unpack('>L', response[p:p+4])[0]
						p += 4
						match['attrs'][attrs[i][0]] = ''
						if slen>0:
							match['attrs'][attrs[i][0]] = response[p:p+slen]
						p += slen-4
					elif attrs[i][1] == SPH_ATTR_MULTI:
						match['attrs'][attrs[i][0]] = []
						nvals = unpack('>L', response[p:p+4])[0]
						p += 4
						for n in range(0,nvals,1):
							match['attrs'][attrs[i][0]].append(unpack('>L', response[p:p+4])[0])
							p += 4
						p -= 4
					elif attrs[i][1] == SPH_ATTR_MULTI64:
						match['attrs'][attrs[i][0]] = []
						nvals = unpack('>L', response[p:p+4])[0]
						nvals = nvals/2	# count is sent in 32-bit words, two per 64-bit value
						p += 4
						for n in range(0,nvals,1):
							match['attrs'][attrs[i][0]].append(unpack('>q', response[p:p+8])[0])
							p += 8
						p -= 4
					else:
						match['attrs'][attrs[i][0]] = unpack('>L', response[p:p+4])[0]
					p += 4

				result['matches'].append ( match )

			# per-query totals (time arrives in milliseconds) and word stats
			result['total'], result['total_found'], result['time'], words = unpack('>4L', response[p:p+16])

			result['time'] = '%.3f' % (result['time']/1000.0)
			p += 16

			result['words'] = []
			while words>0:
				words -= 1
				length = unpack('>L', response[p:p+4])[0]
				p += 4
				word = response[p:p+length]
				p += length
				docs, hits = unpack('>2L', response[p:p+8])
				p += 8

				result['words'].append({'word':word, 'docs':docs, 'hits':hits})

		self._reqs = []
		return results
791
792
	def BuildExcerpts (self, docs, index, words, opts=None):
		"""
		Connect to searchd server and generate excerpts from given documents.

		'docs' is a list of document body strings; 'index' names the index
		whose text-processing settings to use; 'words' is the query string
		to highlight; 'opts' is an optional dict of excerpt options.
		Returns a list of excerpt strings, or [] on failure.
		"""
		if not opts:
			opts = {}
		if isinstance(words,unicode):
			words = words.encode('utf-8')

		assert(isinstance(docs, list))
		assert(isinstance(index, str))
		assert(isinstance(words, str))
		assert(isinstance(opts, dict))

		sock = self._Connect()

		if not sock:
			return None	# NOTE(review): other failure paths below return [] instead

		# fixup options
		opts.setdefault('before_match', '<b>')
		opts.setdefault('after_match', '</b>')
		opts.setdefault('chunk_separator', ' ... ')
		opts.setdefault('html_strip_mode', 'index')
		opts.setdefault('limit', 256)
		opts.setdefault('limit_passages', 0)
		opts.setdefault('limit_words', 0)
		opts.setdefault('around', 5)
		opts.setdefault('start_passage_id', 1)
		opts.setdefault('passage_boundary', 'none')

		# build request
		# v.1.0 req

		# boolean options travel as a single bitmask
		flags = 1 # (remove spaces)
		if opts.get('exact_phrase'):	flags |= 2
		if opts.get('single_passage'):	flags |= 4
		if opts.get('use_boundaries'):	flags |= 8
		if opts.get('weight_order'):	flags |= 16
		if opts.get('query_mode'):		flags |= 32
		if opts.get('force_all_words'):	flags |= 64
		if opts.get('load_files'):		flags |= 128
		if opts.get('allow_empty'):		flags |= 256
		if opts.get('emit_zones'):		flags |= 512
		if opts.get('load_files_scattered'):	flags |= 1024

		# mode=0, flags
		req = [pack('>2L', 0, flags)]

		# req index
		req.append(pack('>L', len(index)))
		req.append(index)

		# req words
		req.append(pack('>L', len(words)))
		req.append(words)

		# options: strings are length-prefixed, numbers are 32-bit
		req.append(pack('>L', len(opts['before_match'])))
		req.append(opts['before_match'])

		req.append(pack('>L', len(opts['after_match'])))
		req.append(opts['after_match'])

		req.append(pack('>L', len(opts['chunk_separator'])))
		req.append(opts['chunk_separator'])

		req.append(pack('>L', int(opts['limit'])))
		req.append(pack('>L', int(opts['around'])))

		req.append(pack('>L', int(opts['limit_passages'])))
		req.append(pack('>L', int(opts['limit_words'])))
		req.append(pack('>L', int(opts['start_passage_id'])))
		req.append(pack('>L', len(opts['html_strip_mode'])))
		req.append((opts['html_strip_mode']))
		req.append(pack('>L', len(opts['passage_boundary'])))
		req.append((opts['passage_boundary']))

		# documents (UTF-8 encoded, length-prefixed)
		req.append(pack('>L', len(docs)))
		for doc in docs:
			if isinstance(doc,unicode):
				doc = doc.encode('utf-8')
			assert(isinstance(doc, str))
			req.append(pack('>L', len(doc)))
			req.append(doc)

		req = ''.join(req)

		# send query, get response
		length = len(req)

		# add header
		req = pack('>2HL', SEARCHD_COMMAND_EXCERPT, VER_COMMAND_EXCERPT, length)+req
		self._Send ( sock, req )

		response = self._GetResponse(sock, VER_COMMAND_EXCERPT )
		if not response:
			return []

		# parse response: one length-prefixed excerpt per input document
		pos = 0
		res = []
		rlen = len(response)

		for i in range(len(docs)):
			length = unpack('>L', response[pos:pos+4])[0]
			pos += 4

			if pos+length > rlen:
				self._error = 'incomplete reply'
				return []

			res.append(response[pos:pos+length])
			pos += length

		return res
910
911
	def UpdateAttributes ( self, index, attrs, values, mva=False ):
		"""
		Update given attribute values on given documents in given indexes.
		Returns amount of updated documents (0 or more) on success, or -1 on failure.

		'attrs' must be a list of strings.
		'values' must be a dict with int key (document ID) and list of int values (new attribute values).
		optional boolean parameter 'mva' points that there is update of MVA attributes.
		In this case the 'values' must be a dict with int key (document ID) and list of lists of int values
		(new MVA attribute values).

		Example:
			res = cl.UpdateAttributes ( 'test1', [ 'group_id', 'date_added' ], { 2:[123,1000000000], 4:[456,1234567890] } )
		"""
		# validate all arguments before touching the network
		assert ( isinstance ( index, str ) )
		assert ( isinstance ( attrs, list ) )
		assert ( isinstance ( values, dict ) )
		for attr in attrs:
			assert ( isinstance ( attr, str ) )
		for docid, entry in values.items():
			AssertUInt32(docid)
			assert ( isinstance ( entry, list ) )
			assert ( len(attrs)==len(entry) )
			for val in entry:
				if mva:
					assert ( isinstance ( val, list ) )
					for vals in val:
						AssertInt32(vals)
				else:
					AssertInt32(val)

		# build request
		req = [ pack('>L',len(index)), index ]

		# attribute names, each tagged with an is-MVA flag
		req.append ( pack('>L',len(attrs)) )
		mva_attr = 0
		if mva: mva_attr = 1
		for attr in attrs:
			req.append ( pack('>L',len(attr)) + attr )
			req.append ( pack('>L', mva_attr ) )

		req.append ( pack('>L',len(values)) )
		for docid, entry in values.items():
			req.append ( pack('>Q',docid) )	# document IDs are 64-bit
			for val in entry:
				# plain updates send the value itself; MVA updates send a
				# count followed by that many values
				val_len = val
				if mva: val_len = len ( val )
				req.append ( pack('>L',val_len ) )
				if mva:
					for vals in val:
						req.append ( pack ('>L',vals) )

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None	# NOTE(review): returns None here, not the documented -1

		req = ''.join(req)
		length = len(req)
		req = pack ( '>2HL', SEARCHD_COMMAND_UPDATE, VER_COMMAND_UPDATE, length ) + req
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_UPDATE )
		if not response:
			return -1

		# parse response: a single 32-bit updated-documents counter
		updated = unpack ( '>L', response[0:4] )[0]
		return updated
981
982
	def BuildKeywords ( self, query, index, hits ):
		"""
		Connect to searchd server, and generate keywords list for a given query.
		Returns None on failure, or a list of keywords on success.

		Each entry is a dict with 'tokenized' and 'normalized' keyword
		forms, plus 'docs'/'hits' statistics when 'hits' is true.
		"""
		assert ( isinstance ( query, str ) )
		assert ( isinstance ( index, str ) )
		assert ( isinstance ( hits, int ) )

		# build request
		req = [ pack ( '>L', len(query) ) + query ]
		req.append ( pack ( '>L', len(index) ) + index )
		req.append ( pack ( '>L', hits ) )

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None

		req = ''.join(req)
		length = len(req)
		req = pack ( '>2HL', SEARCHD_COMMAND_KEYWORDS, VER_COMMAND_KEYWORDS, length ) + req
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_KEYWORDS )
		if not response:
			return None

		# parse response
		res = []

		nwords = unpack ( '>L', response[0:4] )[0]
		p = 4
		max_ = len(response)

		while nwords>0 and p<max_:
			nwords -= 1

			# tokenized form (length-prefixed string)
			length = unpack ( '>L', response[p:p+4] )[0]
			p += 4
			tokenized = response[p:p+length]
			p += length

			# normalized form (length-prefixed string)
			length = unpack ( '>L', response[p:p+4] )[0]
			p += 4
			normalized = response[p:p+length]
			p += length

			entry = { 'tokenized':tokenized, 'normalized':normalized }
			if hits:
				entry['docs'], entry['hits'] = unpack ( '>2L', response[p:p+8] )
				p += 8

			res.append ( entry )

		# the loop must have consumed exactly the advertised word count
		if nwords>0 or p>max_:
			self._error = 'incomplete reply'
			return None

		return res
1043
	def Status ( self ):
		"""
		Get the status

		Queries searchd for its status counters; returns a list of
		[name, value] string pairs, or None on failure.
		"""

		# connect, send query, get response
		sock = self._Connect()
		if not sock:
			return None

		req = pack ( '>2HLL', SEARCHD_COMMAND_STATUS, VER_COMMAND_STATUS, 4, 1 )
		self._Send ( sock, req )

		response = self._GetResponse ( sock, VER_COMMAND_STATUS )
		if not response:
			return None

		# parse response
		res = []

		# skip the leading 8 bytes -- presumably two 32-bit counters
		# (rows/cols) preceding the key/value data; verify against protocol
		p = 8
		max_ = len(response)

		while p<max_:
			# each entry is two length-prefixed strings: key then value
			length = unpack ( '>L', response[p:p+4] )[0]
			k = response[p+4:p+length+4]
			p += 4+length
			length = unpack ( '>L', response[p:p+4] )[0]
			v = response[p+4:p+length+4]
			p += 4+length
			res += [[k, v]]

		return res
1077
1078	### persistent connections
1079
1080	def Open(self):
1081		if self._socket:
1082			self._error = 'already connected'
1083			return None
1084
1085		server = self._Connect()
1086		if not server:
1087			return None
1088
1089		# command, command version = 0, body length = 4, body = 1
1090		request = pack ( '>hhII', SEARCHD_COMMAND_PERSIST, 0, 4, 1 )
1091		self._Send ( server, request )
1092
1093		self._socket = server
1094		return True
1095
1096	def Close(self):
1097		if not self._socket:
1098			self._error = 'not connected'
1099			return
1100		self._socket.close()
1101		self._socket = None
1102
1103	def EscapeString(self, string):
1104		return re.sub(r"([=\(\)|\-!@~\"&/\\\^\$\=])", r"\\\1", string)
1105
1106
	def FlushAttributes(self):
		"""
		Ask searchd to flush in-memory attribute updates to disk.
		Returns the server's flush tag on success, or -1 on failure.
		"""
		sock = self._Connect()
		if not sock:
			return -1

		request = pack ( '>hhI', SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, 0 ) # cmd, ver, bodylen
		self._Send ( sock, request )

		# reply body must be exactly one 32-bit tag
		response = self._GetResponse ( sock, VER_COMMAND_FLUSHATTRS )
		if not response or len(response)!=4:
			self._error = 'unexpected response length'
			return -1

		tag = unpack ( '>L', response[0:4] )[0]
		return tag
1122
def AssertInt32 ( value ):
	# Validate that 'value' is an integer within the accepted range.
	# NOTE(review): the bounds (-2**32-1 .. 2**32-1) are wider than a true
	# signed 32-bit range (-2**31 .. 2**31-1); callers pack these values
	# as 64-bit ('>q'), so wider values still serialize, but the name and
	# range disagree -- left unchanged for backward compatibility.
	assert(isinstance(value, (int, long)))
	assert(value>=-2**32-1 and value<=2**32-1)
1126
def AssertUInt32 ( value ):
	# validate that 'value' is an integer fitting an unsigned 32-bit range
	assert(isinstance(value, (int, long)))
	assert(0<=value<=2**32-1)
1130
1131#
1132# $Id: sphinxapi.py 3436 2012-10-08 09:17:18Z kevg $
1133#
1134