1 //
2 // $Id$
3 //
4 
5 //
6 // Copyright (c) 2001-2016, Andrew Aksyonoff
7 // Copyright (c) 2008-2016, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15 
16 #ifdef USE_PRAGMA_IMPLEMENTATION
17 #pragma implementation // gcc: Class implementation
18 #endif
19 
20 #if _MSC_VER>=1400
21 #define _CRT_SECURE_NO_DEPRECATE 1
22 #define _CRT_NONSTDC_NO_DEPRECATE 1
23 #endif
24 
25 #include <mysql_version.h>
26 
27 #if MYSQL_VERSION_ID>=50515
28 #include "sql_class.h"
29 #include "sql_array.h"
30 #elif MYSQL_VERSION_ID>50100
31 #include "mysql_priv.h"
32 #include <mysql/plugin.h>
33 #else
34 #include "../mysql_priv.h"
35 #endif
36 
37 #include <mysys_err.h>
38 #include <my_sys.h>
39 #include <mysql.h> // include client for INSERT table (sort of redoing federated..)
40 
41 #ifndef __WIN__
42 	// UNIX-specific
43 	#include <my_net.h>
44 	#include <netdb.h>
45 	#include <sys/un.h>
46 
47 	#define	RECV_FLAGS	MSG_WAITALL
48 
49 	#define sphSockClose(_sock)	::close(_sock)
50 #else
51 	// Windows-specific
52 	#include <io.h>
53 	#define strcasecmp	stricmp
54 	#define snprintf	_snprintf
55 
56 	#define	RECV_FLAGS	0
57 
58 	#define sphSockClose(_sock)	::closesocket(_sock)
59 #endif
60 
61 #include <ctype.h>
62 #include "ha_sphinx.h"
63 
64 #ifndef MSG_WAITALL
65 #define MSG_WAITALL 0
66 #endif
67 
68 #if _MSC_VER>=1400
69 #pragma warning(push,4)
70 #endif
71 
72 /////////////////////////////////////////////////////////////////////////////
73 
/// there might be issues with min() on different platforms (eg. Gentoo, they say)
/// NOTE: plain macro; each argument may be evaluated twice, so do not pass expressions with side effects
#define Min(a,b) ((a)<(b)?(a):(b))
76 
/// unaligned RAM accesses are forbidden on SPARC
#if defined(sparc) || defined(__sparc__)
#define UNALIGNED_RAM_ACCESS 0
#else
#define UNALIGNED_RAM_ACCESS 1
#endif


#if UNALIGNED_RAM_ACCESS

/// pass-through wrapper (platform tolerates unaligned loads)
template < typename T > inline T sphUnalignedRead ( const T & tRef )
{
	return tRef;
}

/// store tVal at a possibly unaligned, untyped address
/// memcpy() instead of *(T*)pPtr = tVal: the cast-and-store violates strict aliasing and lets
/// the optimizer assume T alignment (e.g. emit aligned vector stores); a fixed-size memcpy
/// compiles down to a single plain store on platforms that allow unaligned access
template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	memcpy ( pPtr, &tVal, sizeof(T) );
}

#else

/// unaligned read wrapper for some architectures (eg. SPARC)
template < typename T >
inline T sphUnalignedRead ( const T & tRef )
{
	T uTmp;
	byte * pSrc = (byte *) &tRef;
	byte * pDst = (byte *) &uTmp;
	for ( int i=0; i<(int)sizeof(T); i++ )
		*pDst++ = *pSrc++;
	return uTmp;
}

/// unaligned write wrapper for some architectures (eg. SPARC)
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	byte * pDst = (byte *) pPtr;
	byte * pSrc = (byte *) &tVal;
	for ( int i=0; i<(int)sizeof(T); i++ )
		*pDst++ = *pSrc++;
}

#endif
124 
// Uniform names for the server's HASH API:
// MYSQL_VERSION_ID>=50515 uses the my_hash_* spelling, older versions use hash_*.
#if MYSQL_VERSION_ID>=50515

#define sphinx_hash_init my_hash_init
#define sphinx_hash_free my_hash_free
#define sphinx_hash_search my_hash_search
#define sphinx_hash_delete my_hash_delete

#else

#define sphinx_hash_init hash_init
#define sphinx_hash_free hash_free
#define sphinx_hash_search hash_search
#define sphinx_hash_delete hash_delete

#endif
140 
141 /////////////////////////////////////////////////////////////////////////////
142 
// FIXME! make this all dynamic
// fixed capacity of the filter, index-weight and field-weight arrays in CSphSEQuery
#define SPHINXSE_MAX_FILTERS		32

// defaults used by ParseUrl() when the connection string omits host/port/index
#define SPHINXAPI_DEFAULT_HOST		"127.0.0.1"
#define SPHINXAPI_DEFAULT_PORT		9312
#define SPHINXAPI_DEFAULT_INDEX		"*"

// default port for sphinxql:// connection strings
#define SPHINXQL_DEFAULT_PORT		9306

// NOTE(review): presumably the number of leading system columns in a SphinxSE table; confirm
#define SPHINXSE_SYSTEM_COLUMNS		3

// NOTE(review): presumably caps on a single allocation and on the number of keyword stats; confirm
#define SPHINXSE_MAX_ALLOC			(16*1024*1024)
#define SPHINXSE_MAX_KEYWORDSTATS	4096

// engine version; appended to the engine comment string (sphinx_hton_comment)
#define SPHINXSE_VERSION			"2.2.11-release"
158 
// FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
// cut-n-paste is somewhat simpler than adding dependencies, however..
161 
/// searchd wire-protocol constants (mirrored from searchd.cpp; keep in sync)
enum
{
	SPHINX_SEARCHD_PROTO	= 1,		///< NOTE(review): presumably the client protocol version; confirm against searchd.cpp
	SEARCHD_COMMAND_SEARCH	= 0,		///< command id for a search request (per its name)
	VER_COMMAND_SEARCH		= 0x119,	///< version of the search command format (per its name)
};
168 
/// search query sorting orders
/// values must stay in sync with sphinx.h
enum ESphSortOrder
{
	SPH_SORT_RELEVANCE		= 0,	///< sort by document relevance desc, then by date
	SPH_SORT_ATTR_DESC		= 1,	///< sort by document date desc, then by relevance desc
	SPH_SORT_ATTR_ASC		= 2,	///< sort by document date asc, then by relevance desc
	SPH_SORT_TIME_SEGMENTS	= 3,	///< sort by time segments (hour/day/week/etc) desc, then by relevance desc
	SPH_SORT_EXTENDED		= 4,	///< sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
	SPH_SORT_EXPR			= 5,	///< sort by expression

	SPH_SORT_TOTAL					///< count of the sort orders above (6); not a valid order itself
};
181 
/// search query matching mode
/// values are spelled out explicitly because they must stay in sync with sphinx.h
enum ESphMatchMode
{
	SPH_MATCH_ALL			= 0,	///< match all query words
	SPH_MATCH_ANY			= 1,	///< match any query word
	SPH_MATCH_PHRASE		= 2,	///< match this exact phrase
	SPH_MATCH_BOOLEAN		= 3,	///< match this boolean query
	SPH_MATCH_EXTENDED		= 4,	///< match this extended query
	SPH_MATCH_FULLSCAN		= 5,	///< match all document IDs w/o fulltext query, apply filters
	SPH_MATCH_EXTENDED2		= 6,	///< extended engine V2

	SPH_MATCH_TOTAL			= 7		///< count of the modes above; not a valid mode itself
};
195 
/// search query relevance ranking mode
/// values must stay in sync with sphinx.h
enum ESphRankMode
{
	SPH_RANK_PROXIMITY_BM25		= 0,	///< default mode, phrase proximity major factor and BM25 minor one
	SPH_RANK_BM25				= 1,	///< statistical mode, BM25 ranking only (faster but worse quality)
	SPH_RANK_NONE				= 2,	///< no ranking, all matches get a weight of 1
	SPH_RANK_WORDCOUNT			= 3,	///< simple word-count weighting, rank is a weighted sum of per-field keyword occurrence counts
	SPH_RANK_PROXIMITY			= 4,	///< phrase proximity
	SPH_RANK_MATCHANY			= 5,	///< emulate old match-any weighting
	SPH_RANK_FIELDMASK			= 6,	///< sets bits where there were matches
	SPH_RANK_SPH04				= 7,	///< codename SPH04, phrase proximity + bm25 + head/exact boost
	SPH_RANK_EXPR				= 8,	///< expression based ranker

	SPH_RANK_TOTAL,						///< count of the rankers above (9); not a valid ranker itself
	SPH_RANK_DEFAULT			= SPH_RANK_PROXIMITY_BM25	///< alias for the default ranker
};
212 
/// search query grouping mode
/// values must stay in sync with sphinx.h; DAY..YEAR bucket a timestamp, ATTR uses the raw value
enum ESphGroupBy
{
	SPH_GROUPBY_DAY		= 0,	///< group by day
	SPH_GROUPBY_WEEK	= 1,	///< group by week
	SPH_GROUPBY_MONTH	= 2,	///< group by month
	SPH_GROUPBY_YEAR	= 3,	///< group by year
	SPH_GROUPBY_ATTR	= 4		///< group by attribute value
};
222 
/// known attribute types
/// values must stay in sync with sphinx.h; the *SET types carry the 0x40000000 flag bit
enum
{
	SPH_ATTR_NONE		= 0,			///< not an attribute at all
	SPH_ATTR_INTEGER	= 1,			///< this attr is just an integer
	SPH_ATTR_TIMESTAMP	= 2,			///< this attr is a timestamp
	SPH_ATTR_ORDINAL	= 3,			///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
	SPH_ATTR_BOOL		= 4,			///< this attr is a boolean bit field
	SPH_ATTR_FLOAT		= 5,			///< this attr is a floating-point value
	SPH_ATTR_BIGINT		= 6,			///< this attr is a 64-bit integer
	SPH_ATTR_STRING		= 7,			///< string (binary; in-memory)

	SPH_ATTR_UINT32SET		= 0x40000001UL,	///< this attr is multiple int32 values (0 or more)
	SPH_ATTR_UINT64SET		= 0x40000002UL	///< this attr is multiple int64 values (0 or more)
};
238 
/// known answers
/// status codes in a searchd reply header (mirrored from searchd.cpp)
enum
{
	SEARCHD_OK		= 0,	///< general success, command-specific reply follows
	SEARCHD_ERROR	= 1,	///< general failure, error message follows
	SEARCHD_RETRY	= 2,	///< temporary failure, error message follows, client should retry later
	SEARCHD_WARNING	= 3		///< general success, warning message and command-specific reply follow
};
247 
248 //////////////////////////////////////////////////////////////////////////////
249 
// compile-time debug switches (set to 1 to enable):
// SPHINX_DEBUG_OUTPUT makes SPH_DEBUG() print to stderr,
// SPHINX_DEBUG_CALLS makes SPH_ENTER_*/SPH_*RET log function entry and exit via SPH_DEBUG()
#define SPHINX_DEBUG_OUTPUT		0
#define SPHINX_DEBUG_CALLS		0
252 
253 #include <stdarg.h>
254 
#if SPHINX_DEBUG_OUTPUT
/// printf-style debug trace: writes "SphinxSE: <message>\n" to stderr
inline void SPH_DEBUG ( const char * format, ... )
{
	va_list tArgs;
	fputs ( "SphinxSE: ", stderr );
	va_start ( tArgs, format );
	vfprintf ( stderr, format, tArgs );
	va_end ( tArgs );
	fputs ( "\n", stderr );
}
#else
/// debug output compiled out: accepts and ignores any arguments
inline void SPH_DEBUG ( const char *, ... )
{
}
#endif
268 
#if SPHINX_DEBUG_CALLS

// call tracing: log entry/exit of the enclosing function via SPH_DEBUG()
// NOTE: "this" is printed with %p; passing a pointer to %08x is a format mismatch
// (undefined behavior, and truncated output on 64-bit builds)
#define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
#define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%p)", __FUNCTION__, (const void *) this ); }
#define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
#define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }

#else

// tracing disabled: entry macros vanish, return macros are plain returns
#define SPH_ENTER_FUNC()
#define SPH_ENTER_METHOD()
#define SPH_RET(_arg) { return(_arg); }
#define SPH_VOID_RET() { return; }

#endif
284 
285 
// destroy a heap object (or array) and reset the pointer to NULL;
// delete/delete[] on NULL is a no-op, so no separate NULL test is needed
#define SafeDelete(_arg)		{ delete ( _arg );		(_arg) = NULL; }
#define SafeDeleteArray(_arg)	{ delete [] ( _arg );	(_arg) = NULL; }
288 
289 //////////////////////////////////////////////////////////////////////////////
290 
291 /// per-table structure that will be shared among all open Sphinx SE handlers
292 struct CSphSEShare
293 {
294 	pthread_mutex_t	m_tMutex;
295 	THR_LOCK		m_tLock;
296 
297 	char *			m_sTable;
298 	char *			m_sScheme;		///< our connection string
299 	char *			m_sHost;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
300 	char *			m_sSocket;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
301 	char *			m_sIndex;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
302 	ushort			m_iPort;
303 	bool			m_bSphinxQL;	///< is this read-only SphinxAPI table, or write-only SphinxQL table?
304 	uint			m_iTableNameLen;
305 	uint			m_iUseCount;
306 #if MYSQL_VERSION_ID<50610
307 	CHARSET_INFO *	m_pTableQueryCharset;
308 #else
309 	const CHARSET_INFO *	m_pTableQueryCharset;
310 #endif
311 
312 	int					m_iTableFields;
313 	char **				m_sTableField;
314 	enum_field_types *	m_eTableFieldType;
315 
CSphSEShareCSphSEShare316 	CSphSEShare ()
317 		: m_sTable ( NULL )
318 		, m_sScheme ( NULL )
319 		, m_sHost ( NULL )
320 		, m_sSocket ( NULL )
321 		, m_sIndex ( NULL )
322 		, m_iPort ( 0 )
323 		, m_bSphinxQL ( false )
324 		, m_iTableNameLen ( 0 )
325 		, m_iUseCount ( 1 )
326 		, m_pTableQueryCharset ( NULL )
327 
328 		, m_iTableFields ( 0 )
329 		, m_sTableField ( NULL )
330 		, m_eTableFieldType ( NULL )
331 	{
332 		thr_lock_init ( &m_tLock );
333 		pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
334 	}
335 
~CSphSEShareCSphSEShare336 	~CSphSEShare ()
337 	{
338 		pthread_mutex_destroy ( &m_tMutex );
339 		thr_lock_delete ( &m_tLock );
340 
341 		SafeDeleteArray ( m_sTable );
342 		SafeDeleteArray ( m_sScheme );
343 		ResetTable ();
344 	}
345 
ResetTableCSphSEShare346 	void ResetTable ()
347 	{
348 		for ( int i=0; i<m_iTableFields; i++ )
349 			SafeDeleteArray ( m_sTableField[i] );
350 		SafeDeleteArray ( m_sTableField );
351 		SafeDeleteArray ( m_eTableFieldType );
352 	}
353 };
354 
355 /// schema attribute
356 struct CSphSEAttr
357 {
358 	char *			m_sName;		///< attribute name (received from Sphinx)
359 	uint32			m_uType;		///< attribute type (received from Sphinx)
360 	int				m_iField;		///< field index in current table (-1 if none)
361 
CSphSEAttrCSphSEAttr362 	CSphSEAttr()
363 		: m_sName ( NULL )
364 		, m_uType ( SPH_ATTR_NONE )
365 		, m_iField ( -1 )
366 	{}
367 
~CSphSEAttrCSphSEAttr368 	~CSphSEAttr ()
369 	{
370 		SafeDeleteArray ( m_sName );
371 	}
372 };
373 
/// word stats
/// per-keyword document and hit counts; m_sWord is owned and freed in the destructor
struct CSphSEWordStats
{
	char *			m_sWord;
	int				m_iDocs;
	int				m_iHits;

	CSphSEWordStats ()
		: m_sWord ( NULL ), m_iDocs ( 0 ), m_iHits ( 0 )
	{
	}

	~CSphSEWordStats ()
	{
		delete [] m_sWord;
		m_sWord = NULL;
	}
};
392 
393 /// request stats
394 struct CSphSEStats
395 {
396 public:
397 	int					m_iMatchesTotal;
398 	int					m_iMatchesFound;
399 	int					m_iQueryMsec;
400 	int					m_iWords;
401 	CSphSEWordStats *	m_dWords;
402 	bool				m_bLastError;
403 	char				m_sLastMessage[1024];
404 
CSphSEStatsCSphSEStats405 	CSphSEStats()
406 		: m_dWords ( NULL )
407 	{
408 		Reset ();
409 	}
410 
ResetCSphSEStats411 	void Reset ()
412 	{
413 		m_iMatchesTotal = 0;
414 		m_iMatchesFound = 0;
415 		m_iQueryMsec = 0;
416 		m_iWords = 0;
417 		SafeDeleteArray ( m_dWords );
418 		m_bLastError = false;
419 		m_sLastMessage[0] = '\0';
420 	}
421 
~CSphSEStatsCSphSEStats422 	~CSphSEStats()
423 	{
424 		Reset ();
425 	}
426 };
427 
/// thread local storage
/// per-handler state kept in a connection's CSphTLS; entries are chained via m_pTableNext
struct CSphSEThreadTable
{
	static const int	MAX_QUERY_LEN	= 262144; // 256k should be enough, right?

	bool				m_bStats;		///< m_tStats holds data to report (checked by sphinx_show_status())
	CSphSEStats			m_tStats;

	bool				m_bQuery;		///< NOTE(review): presumably "m_sQuery holds a query"; confirm against callers
	char				m_sQuery[MAX_QUERY_LEN];

#if MYSQL_VERSION_ID<50610
	CHARSET_INFO *		m_pQueryCharset;	///< if set, sphinx_show_status() converts keyword stats from it to system_charset_info
#else
	const CHARSET_INFO *		m_pQueryCharset;	///< if set, sphinx_show_status() converts keyword stats from it to system_charset_info
#endif

	bool				m_bReplace;		///< are we doing an INSERT or REPLACE

	bool				m_bCondId;		///< got a value from condition pushdown
	longlong			m_iCondId;		///< value acquired from id=value condition pushdown
	bool				m_bCondDone;	///< index_read() is now over

	const ha_sphinx *	m_pHandler;		///< handler that owns this entry
	CSphSEThreadTable *	m_pTableNext;	///< next entry in the list; the whole chain is deleted by ~CSphTLS()

	CSphSEThreadTable ( const ha_sphinx * pHandler )
		: m_bStats ( false )
		, m_bQuery ( false )
		, m_pQueryCharset ( NULL )
		, m_bReplace ( false )
		, m_bCondId ( false )
		, m_iCondId ( 0 )
		, m_bCondDone ( false )
		, m_pHandler ( pHandler )
		, m_pTableNext ( NULL )
	{}
};
466 
467 
468 struct CSphTLS
469 {
470 	CSphSEThreadTable *	m_pHeadTable;
471 
CSphTLSCSphTLS472 	explicit CSphTLS ( const ha_sphinx * pHandler )
473 	{
474 		m_pHeadTable = new CSphSEThreadTable ( pHandler );
475 	}
476 
~CSphTLSCSphTLS477 	~CSphTLS()
478 	{
479 		CSphSEThreadTable * pCur = m_pHeadTable;
480 		while ( pCur )
481 		{
482 			CSphSEThreadTable * pNext = pCur->m_pTableNext;
483 			SafeDelete ( pCur );
484 			pCur = pNext;
485 		}
486 	}
487 };
488 
489 
/// filter types
/// selects how a CSphSEFilter is interpreted (stored in CSphSEFilter::m_eType)
enum ESphFilter
{
	SPH_FILTER_VALUES		= 0,	///< filter by integer values set
	SPH_FILTER_RANGE		= 1,	///< filter by integer range
	SPH_FILTER_FLOATRANGE	= 2		///< filter by float range
};
497 
498 
/// search query filter
/// only the members relevant to m_eType are meaningful; m_pValues is owned and freed in the destructor
struct CSphSEFilter
{
public:
	ESphFilter		m_eType;		///< which of the value members below apply
	char *			m_sAttrName;	///< attribute to filter on (NOTE(review): ownership not visible here)
	longlong		m_uMinValue;	///< integer bounds, presumably for SPH_FILTER_RANGE
	longlong		m_uMaxValue;
	float			m_fMinValue;	///< float bounds, presumably for SPH_FILTER_FLOATRANGE
	float			m_fMaxValue;
	int				m_iValues;		///< number of entries in m_pValues
	longlong *		m_pValues;		///< value set, presumably for SPH_FILTER_VALUES
	int				m_bExclude;		///< NOTE(review): presumably non-zero inverts the filter; confirm

public:
	CSphSEFilter ()
		: m_eType ( SPH_FILTER_VALUES )
		, m_sAttrName ( NULL )
		, m_uMinValue ( 0 )
		, m_uMaxValue ( UINT_MAX )
		, m_fMinValue ( 0.0f )
		, m_fMaxValue ( 0.0f )
		, m_iValues ( 0 )
		, m_pValues ( NULL )
		, m_bExclude ( 0 )
	{
	}

	~CSphSEFilter ()
	{
		SafeDeleteArray ( m_pValues );
	}
};
532 
533 
534 /// float vs dword conversion
sphF2DW(float f)535 inline uint32 sphF2DW ( float f )	{ union { float f; uint32 d; } u; u.f = f; return u.d; }
536 
537 /// dword vs float conversion
sphDW2F(uint32 d)538 inline float sphDW2F ( uint32 d )	{ union { float f; uint32 d; } u; u.d = d; return u.f; }
539 
540 
541 /// client-side search query
542 struct CSphSEQuery
543 {
544 public:
545 	const char *	m_sHost;
546 	int				m_iPort;
547 
548 private:
549 	char *			m_sQueryBuffer;
550 
551 	const char *	m_sIndex;
552 	int				m_iOffset;
553 	int				m_iLimit;
554 
555 	bool			m_bQuery;
556 	char *			m_sQuery;
557 	uint32 *		m_pWeights;
558 	int				m_iWeights;
559 	ESphMatchMode	m_eMode;
560 	ESphRankMode	m_eRanker;
561 	char *			m_sRankExpr;
562 	ESphSortOrder	m_eSort;
563 	char *			m_sSortBy;
564 	int				m_iMaxMatches;
565 	int				m_iMaxQueryTime;
566 	uint32			m_iMinID;
567 	uint32			m_iMaxID;
568 
569 	int				m_iFilters;
570 	CSphSEFilter	m_dFilters[SPHINXSE_MAX_FILTERS];
571 
572 	ESphGroupBy		m_eGroupFunc;
573 	char *			m_sGroupBy;
574 	char *			m_sGroupSortBy;
575 	int				m_iCutoff;
576 	int				m_iRetryCount;
577 	int				m_iRetryDelay;
578 	char *			m_sGroupDistinct;							///< points to query buffer; do NOT delete
579 	int				m_iIndexWeights;
580 	char *			m_sIndexWeight[SPHINXSE_MAX_FILTERS];		///< points to query buffer; do NOT delete
581 	int				m_iIndexWeight[SPHINXSE_MAX_FILTERS];
582 	int				m_iFieldWeights;
583 	char *			m_sFieldWeight[SPHINXSE_MAX_FILTERS];		///< points to query buffer; do NOT delete
584 	int				m_iFieldWeight[SPHINXSE_MAX_FILTERS];
585 
586 	bool			m_bGeoAnchor;
587 	char *			m_sGeoLatAttr;
588 	char *			m_sGeoLongAttr;
589 	float			m_fGeoLatitude;
590 	float			m_fGeoLongitude;
591 
592 	char *			m_sComment;
593 	char *			m_sSelect;
594 
595 	struct Override_t
596 	{
597 		union Value_t
598 		{
599 			uint32		m_uValue;
600 			longlong	m_iValue64;
601 			float		m_fValue;
602 		};
603 		char *						m_sName; ///< points to query buffer
604 		int							m_iType;
605 		Dynamic_array<ulonglong>	m_dIds;
606 		Dynamic_array<Value_t>		m_dValues;
607 	};
608 	Dynamic_array<Override_t *> m_dOverrides;
609 
610 public:
611 	char			m_sParseError[256];
612 
613 public:
614 	CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
615 	~CSphSEQuery ();
616 
617 	bool			Parse ();
618 	int				BuildRequest ( char ** ppBuffer );
619 
620 protected:
621 	char *			m_pBuf;
622 	char *			m_pCur;
623 	int				m_iBufLeft;
624 	bool			m_bBufOverrun;
625 
626 	template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
627 	bool			ParseField ( char * sField );
628 
629 	void			SendBytes ( const void * pBytes, int iBytes );
SendWordCSphSEQuery630 	void			SendWord ( short int v )		{ v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
SendIntCSphSEQuery631 	void			SendInt ( int v )				{ v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
SendDwordCSphSEQuery632 	void			SendDword ( uint v )			{ v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
SendUint64CSphSEQuery633 	void			SendUint64 ( ulonglong v )		{ SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
SendStringCSphSEQuery634 	void			SendString ( const char * v )	{ int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
SendFloatCSphSEQuery635 	void			SendFloat ( float v )			{ SendDword ( sphF2DW(v) ); }
636 };
637 
// explicit instantiations of CSphSEQuery::ParseArray for uint32 and longlong element arrays
template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
640 
641 //////////////////////////////////////////////////////////////////////////////
642 
#if MYSQL_VERSION_ID>50100

#if MYSQL_VERSION_ID<50114
#error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
#endif

// 5.1+ plugin-style callbacks; sphinx_init_func() installs the others into the handlerton
static handler *	sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
static int			sphinx_init_func ( void * p );
static int			sphinx_close_connection ( handlerton * hton, THD * thd );
static int			sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
static bool			sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );

#else

// 5.0.x callbacks; init and close_connection are referenced from the static sphinx_hton below
static bool			sphinx_init_func_for_handlerton ();
static int			sphinx_close_connection ( THD * thd );
bool				sphinx_show_status ( THD * thd );	// NOTE(review): non-static, presumably called from server code; confirm

#endif // >50100
662 
663 //////////////////////////////////////////////////////////////////////////////
664 
// engine name and description (the latter embeds SPHINXSE_VERSION)
static const char	sphinx_hton_name[]		= "SPHINX";
static const char	sphinx_hton_comment[]	= "Sphinx storage engine " SPHINXSE_VERSION;

#if MYSQL_VERSION_ID<50100
// 5.0.x: statically initialized handlerton; positional initializers, so the order
// must match the server's handlerton declaration
handlerton sphinx_hton =
{
	#ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
	MYSQL_HANDLERTON_INTERFACE_VERSION,
	#endif
	sphinx_hton_name,
	SHOW_OPTION_YES,
	sphinx_hton_comment,
	DB_TYPE_SPHINX_DB,
	sphinx_init_func_for_handlerton,
	0,							// slot
	0,							// savepoint size
	sphinx_close_connection,	// close_connection
	NULL,	// savepoint
	NULL,	// rollback to savepoint
	NULL,	// release savepoint
	NULL,	// commit
	NULL,	// rollback
	NULL,	// prepare
	NULL,	// recover
	NULL,	// commit_by_xid
	NULL,	// rollback_by_xid
	NULL,	// create_cursor_read_view
	NULL,	// set_cursor_read_view
	NULL,	// close_cursor_read_view
	HTON_CAN_RECREATE
};
#else
// 5.1+: the handlerton is passed to sphinx_init_func(); NOTE(review): this pointer is
// not assigned in the code visible here — confirm where (or whether) it is set
static handlerton * sphinx_hton_ptr = NULL;
#endif
699 
700 //////////////////////////////////////////////////////////////////////////////
701 
// variables for Sphinx shared methods
// all three are set up in sphinx_init_func() and torn down in sphinx_done_func()
pthread_mutex_t		sphinx_mutex;		// mutex to init the hash (NOTE(review): not static; confirm whether other units use it)
static int			sphinx_init = 0;	// flag whether the hash was initialized
static HASH			sphinx_open_tables;	// hash used to track open tables (keyed by table name, see sphinx_get_key)
706 
707 //////////////////////////////////////////////////////////////////////////////
708 // INITIALIZATION AND SHUTDOWN
709 //////////////////////////////////////////////////////////////////////////////
710 
711 // hashing function
712 #if MYSQL_VERSION_ID>=50120
713 typedef size_t GetKeyLength_t;
714 #else
715 typedef uint GetKeyLength_t;
716 #endif
717 
sphinx_get_key(const byte * pSharePtr,GetKeyLength_t * pLength,my_bool)718 static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
719 {
720 	CSphSEShare * pShare = (CSphSEShare *) pSharePtr;
721 	*pLength = (size_t) pShare->m_iTableNameLen;
722 	return (byte*) pShare->m_sTable;
723 }
724 
725 #if MYSQL_VERSION_ID<50100
sphinx_init_func(void *)726 static int sphinx_init_func ( void * ) // to avoid unused arg warning
727 #else
728 static int sphinx_init_func ( void * p )
729 #endif
730 {
731 	SPH_ENTER_FUNC();
732 	if ( !sphinx_init )
733 	{
734 		sphinx_init = 1;
735 		void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
736 		sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
737 			sphinx_get_key, 0, 0 );
738 
739 		#if MYSQL_VERSION_ID > 50100
740 		handlerton * hton = (handlerton*) p;
741 		hton->state = SHOW_OPTION_YES;
742 		hton->db_type = DB_TYPE_FIRST_DYNAMIC;
743 		hton->create = sphinx_create_handler;
744 		hton->close_connection = sphinx_close_connection;
745 		hton->show_status = sphinx_show_status;
746 		hton->panic = sphinx_panic;
747 		hton->flags = HTON_CAN_RECREATE;
748 		#endif
749 	}
750 	SPH_RET(0);
751 }
752 
753 
754 #if MYSQL_VERSION_ID<50100
sphinx_init_func_for_handlerton()755 static bool sphinx_init_func_for_handlerton ()
756 {
757 	return sphinx_init_func ( &sphinx_hton );
758 }
759 #endif
760 
761 
762 #if MYSQL_VERSION_ID>50100
763 
sphinx_close_connection(handlerton * hton,THD * thd)764 static int sphinx_close_connection ( handlerton * hton, THD * thd )
765 {
766 	// deallocate common handler data
767 	SPH_ENTER_FUNC();
768 	void ** tmp = thd_ha_data ( thd, hton );
769 	CSphTLS * pTls = (CSphTLS *) (*tmp);
770 	SafeDelete ( pTls );
771 	*tmp = NULL;
772 	SPH_RET(0);
773 }
774 
775 
sphinx_done_func(void *)776 static int sphinx_done_func ( void * )
777 {
778 	SPH_ENTER_FUNC();
779 
780 	int error = 0;
781 	if ( sphinx_init )
782 	{
783 		sphinx_init = 0;
784 		if ( sphinx_open_tables.records )
785 			error = 1;
786 		sphinx_hash_free ( &sphinx_open_tables );
787 		pthread_mutex_destroy ( &sphinx_mutex );
788 	}
789 
790 	SPH_RET(0);
791 }
792 
793 
sphinx_panic(handlerton * hton,enum ha_panic_function)794 static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
795 {
796 	return sphinx_done_func ( hton );
797 }
798 
799 #else
800 
sphinx_close_connection(THD * thd)801 static int sphinx_close_connection ( THD * thd )
802 {
803 	// deallocate common handler data
804 	SPH_ENTER_FUNC();
805 	CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
806 	SafeDelete ( pTls );
807 	thd->ha_data[sphinx_hton.slot] = NULL;
808 	SPH_RET(0);
809 }
810 
811 #endif // >50100
812 
813 //////////////////////////////////////////////////////////////////////////////
814 // SHOW STATUS
815 //////////////////////////////////////////////////////////////////////////////
816 
817 #if MYSQL_VERSION_ID>50100
sphinx_show_status(handlerton * hton,THD * thd,stat_print_fn * stat_print,enum ha_stat_type)818 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
819 	enum ha_stat_type )
820 #else
821 bool sphinx_show_status ( THD * thd )
822 #endif
823 {
824 	SPH_ENTER_FUNC();
825 
826 #if MYSQL_VERSION_ID<50100
827 	Protocol * protocol = thd->protocol;
828 	List<Item> field_list;
829 #endif
830 
831 	char buf1[IO_SIZE];
832 	uint buf1len;
833 	char buf2[IO_SIZE];
834 	uint buf2len = 0;
835 	String words;
836 
837 	buf1[0] = '\0';
838 	buf2[0] = '\0';
839 
840 
841 #if MYSQL_VERSION_ID>50100
842 	// 5.1.x style stats
843 	CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
844 
845 #define LOC_STATS(_key,_keylen,_val,_vallen) \
846 	stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
847 
848 #else
849 	// 5.0.x style stats
850 	if ( have_sphinx_db!=SHOW_OPTION_YES )
851 	{
852 		my_message ( ER_NOT_SUPPORTED_YET,
853 			"failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
854 			MYF(0) );
855 		SPH_RET(TRUE);
856 	}
857 	CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
858 
859 	field_list.push_back ( new Item_empty_string ( "Type", 10 ) );
860 	field_list.push_back ( new Item_empty_string ( "Name", FN_REFLEN ) );
861 	field_list.push_back ( new Item_empty_string ( "Status", 10 ) );
862 	if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
863 		SPH_RET(TRUE);
864 
865 #define LOC_STATS(_key,_keylen,_val,_vallen) \
866 	protocol->prepare_for_resend (); \
867 	protocol->store ( "SPHINX", 6, system_charset_info ); \
868 	protocol->store ( _key, _keylen, system_charset_info ); \
869 	protocol->store ( _val, _vallen, system_charset_info ); \
870 	if ( protocol->write() ) \
871 		SPH_RET(TRUE);
872 
873 #endif
874 
875 
876 	// show query stats
877 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
878 	{
879 		const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
880 		buf1len = my_snprintf ( buf1, sizeof(buf1),
881 			"total: %d, total found: %d, time: %d, words: %d",
882 			pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
883 
884 		LOC_STATS ( "stats", 5, buf1, buf1len );
885 
886 		if ( pStats->m_iWords )
887 		{
888 			for ( int i=0; i<pStats->m_iWords; i++ )
889 			{
890 				CSphSEWordStats & tWord = pStats->m_dWords[i];
891 				buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
892 					buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
893 			}
894 
895 			// convert it if we can
896 			const char * sWord = buf2;
897 			int iWord = buf2len;
898 
899 			String sBuf3;
900 			if ( pTls->m_pHeadTable->m_pQueryCharset )
901 			{
902 				uint iErrors;
903 				sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
904 				sWord = sBuf3.c_ptr();
905 				iWord = sBuf3.length();
906 			}
907 
908 			LOC_STATS ( "words", 5, sWord, iWord );
909 		}
910 	}
911 
912 	// show last error or warning (either in addition to stats, or on their own)
913 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
914 	{
915 		const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
916 
917 		LOC_STATS (
918 			sMessageType, strlen ( sMessageType ),
919 			pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
920 
921 	} else
922 	{
923 		// well, nothing to show just yet
924 #if MYSQL_VERSION_ID < 50100
925 		LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
926 #endif
927 	}
928 
929 #if MYSQL_VERSION_ID < 50100
930 	send_eof(thd);
931 #endif
932 
933 	SPH_RET(FALSE);
934 }
935 
936 //////////////////////////////////////////////////////////////////////////////
937 // HELPERS
938 //////////////////////////////////////////////////////////////////////////////
939 
/// copy the first iLen bytes of sSrc (all of it when iLen<0) into a new[]-allocated,
/// NUL-terminated buffer; returns NULL for a NULL source; caller frees with delete[]
static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	if ( sSrc==NULL )
		return NULL;

	int iCopyLen = ( iLen<0 ) ? (int) strlen ( sSrc ) : iLen;

	char * sCopy = new char [ iCopyLen+1 ];
	memcpy ( sCopy, sSrc, iCopyLen );
	sCopy[iCopyLen] = '\0';
	return sCopy;
}
953 
954 
/// write "YYMMDD HH:MM:SS SphinxSE: internal error: <message>\n" to stderr
/// sFmt and the variadic arguments follow printf() conventions
static void sphLogError ( const char * sFmt, ... )
{
	// emit timestamp
#ifdef __WIN__
	SYSTEMTIME t;
	GetLocalTime ( &t );

	fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
		(int)t.wYear % 100, (int)t.wMonth, (int)t.wDay,
		(int)t.wHour, (int)t.wMinute, (int)t.wSecond );
#else
	// Unix version
	time_t tStamp;
	time ( &tStamp );

	struct tm * pParsed;
#ifdef HAVE_LOCALTIME_R
	struct tm tParsed;
	pParsed = localtime_r ( &tStamp, &tParsed );
#else
	pParsed = localtime ( &tStamp );
#endif // HAVE_LOCALTIME_R

	// localtime()/localtime_r() return NULL on failure; still emit the message, just without a timestamp
	if ( pParsed )
		fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
			pParsed->tm_year % 100, pParsed->tm_mon + 1, pParsed->tm_mday,
			pParsed->tm_hour, pParsed->tm_min, pParsed->tm_sec);
	else
		fprintf ( stderr, "SphinxSE: internal error: " );
#endif // __WIN__

	// emit message
	va_list ap;
	va_start ( ap, sFmt );
	vfprintf ( stderr, sFmt, ap );
	va_end ( ap );

	// emit newline
	fprintf ( stderr, "\n" );
}
993 
994 
995 
996 // the following scheme variants are recognized
997 //
998 // sphinx://host[:port]/index
999 // sphinxql://host[:port]/index
1000 // unix://unix/domain/socket[:index]
ParseUrl(CSphSEShare * share,TABLE * table,bool bCreate)1001 static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
1002 {
1003 	SPH_ENTER_FUNC();
1004 
1005 	if ( share )
1006 	{
1007 		// check incoming stuff
1008 		if ( !table )
1009 		{
1010 			sphLogError ( "table==NULL in ParseUrl()" );
1011 			return false;
1012 		}
1013 		if ( !table->s )
1014 		{
1015 			sphLogError ( "(table->s)==NULL in ParseUrl()" );
1016 			return false;
1017 		}
1018 
1019 		// free old stuff
1020 		share->ResetTable ();
1021 
1022 		// fill new stuff
1023 		share->m_iTableFields = table->s->fields;
1024 		if ( share->m_iTableFields )
1025 		{
1026 			share->m_sTableField = new char * [ share->m_iTableFields ];
1027 			share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
1028 
1029 			for ( int i=0; i<share->m_iTableFields; i++ )
1030 			{
1031 				share->m_sTableField[i] = sphDup ( table->field[i]->field_name );
1032 				share->m_eTableFieldType[i] = table->field[i]->type();
1033 			}
1034 		}
1035 	}
1036 
1037 	// defaults
1038 	bool bOk = true;
1039 	bool bQL = false;
1040 	char * sScheme = NULL;
1041 	char * sHost = SPHINXAPI_DEFAULT_HOST;
1042 	char * sIndex = SPHINXAPI_DEFAULT_INDEX;
1043 	int iPort = SPHINXAPI_DEFAULT_PORT;
1044 
1045 	// parse connection string, if any
1046 	while ( table->s->connect_string.length!=0 )
1047 	{
1048 		sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
1049 
1050 		sHost = strstr ( sScheme, "://" );
1051 		if ( !sHost )
1052 		{
1053 			bOk = false;
1054 			break;
1055 		}
1056 		sHost[0] = '\0';
1057 		sHost += 3;
1058 
1059 		/////////////////////////////
1060 		// sphinxapi via unix socket
1061 		/////////////////////////////
1062 
1063 		if ( !strcmp ( sScheme, "unix" ) )
1064 		{
1065 			sHost--; // reuse last slash
1066 			iPort = 0;
1067 			if (!( sIndex = strrchr ( sHost, ':' ) ))
1068 				sIndex = SPHINXAPI_DEFAULT_INDEX;
1069 			else
1070 			{
1071 				*sIndex++ = '\0';
1072 				if ( !*sIndex )
1073 					sIndex = SPHINXAPI_DEFAULT_INDEX;
1074 			}
1075 			bOk = true;
1076 			break;
1077 		}
1078 
1079 		/////////////////////
1080 		// sphinxapi via tcp
1081 		/////////////////////
1082 
1083 		if ( !strcmp ( sScheme, "sphinx" ) )
1084 		{
1085 			char * sPort = strchr ( sHost, ':' );
1086 			if ( sPort )
1087 			{
1088 				*sPort++ = '\0';
1089 				if ( *sPort )
1090 				{
1091 					sIndex = strchr ( sPort, '/' );
1092 					if ( sIndex )
1093 						*sIndex++ = '\0';
1094 					else
1095 						sIndex = SPHINXAPI_DEFAULT_INDEX;
1096 
1097 					iPort = atoi(sPort);
1098 					if ( !iPort )
1099 						iPort = SPHINXAPI_DEFAULT_PORT;
1100 				}
1101 			} else
1102 			{
1103 				sIndex = strchr ( sHost, '/' );
1104 				if ( sIndex )
1105 					*sIndex++ = '\0';
1106 				else
1107 					sIndex = SPHINXAPI_DEFAULT_INDEX;
1108 			}
1109 			bOk = true;
1110 			break;
1111 		}
1112 
1113 		////////////
1114 		// sphinxql
1115 		////////////
1116 
1117 		if ( !strcmp ( sScheme, "sphinxql" ) )
1118 		{
1119 			bQL = true;
1120 			iPort = SPHINXQL_DEFAULT_PORT;
1121 
1122 			// handle port
1123 			char * sPort = strchr ( sHost, ':' );
1124 			sIndex = sHost; // starting point for index name search
1125 
1126 			if ( sPort )
1127 			{
1128 				*sPort++ = '\0';
1129 				sIndex = sPort;
1130 
1131 				iPort = atoi(sPort);
1132 				if ( !iPort )
1133 				{
1134 					bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
1135 					break;
1136 				}
1137 			}
1138 
1139 			// find index
1140 			sIndex = strchr ( sIndex, '/' );
1141 			if ( sIndex )
1142 				*sIndex++ = '\0';
1143 
1144 			// final checks
1145 			// host and index names are required
1146 			bOk = ( sHost && *sHost && sIndex && *sIndex );
1147 			break;
1148 		}
1149 
1150 		// unknown case
1151 		bOk = false;
1152 		break;
1153 	}
1154 
1155 	if ( !bOk )
1156 	{
1157 		my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
1158 			MYF(0), table->s->connect_string );
1159 	} else
1160 	{
1161 		if ( share )
1162 		{
1163 			SafeDeleteArray ( share->m_sScheme );
1164 			share->m_sScheme = sScheme;
1165 			share->m_sHost = sHost;
1166 			share->m_sIndex = sIndex;
1167 			share->m_iPort = (ushort)iPort;
1168 			share->m_bSphinxQL = bQL;
1169 		}
1170 	}
1171 	if ( !bOk && !share )
1172 		SafeDeleteArray ( sScheme );
1173 
1174 	SPH_RET(bOk);
1175 }
1176 
1177 
1178 // Example of simple lock controls. The "share" it creates is structure we will
1179 // pass to each sphinx handler. Do you have to have one of these? Well, you have
1180 // pieces that are used for locking, and they are needed to function.
get_share(const char * table_name,TABLE * table)1181 static CSphSEShare * get_share ( const char * table_name, TABLE * table )
1182 {
1183 	SPH_ENTER_FUNC();
1184 	pthread_mutex_lock ( &sphinx_mutex );
1185 
1186 	CSphSEShare * pShare = NULL;
1187 	for ( ;; )
1188 	{
1189 		// check if we already have this share
1190 #if MYSQL_VERSION_ID>=50120
1191 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
1192 #else
1193 #ifdef __WIN__
1194 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
1195 #else
1196 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
1197 #endif // win
1198 #endif // pre-5.1.20
1199 
1200 		if ( pShare )
1201 		{
1202 			pShare->m_iUseCount++;
1203 			break;
1204 		}
1205 
1206 		// try to allocate new share
1207 		pShare = new CSphSEShare ();
1208 		if ( !pShare )
1209 			break;
1210 
1211 		// try to setup it
1212 		if ( !ParseUrl ( pShare, table, false ) )
1213 		{
1214 			SafeDelete ( pShare );
1215 			break;
1216 		}
1217 
1218 		if ( !pShare->m_bSphinxQL )
1219 			pShare->m_pTableQueryCharset = table->field[2]->charset();
1220 
1221 		// try to hash it
1222 		pShare->m_iTableNameLen = strlen(table_name);
1223 		pShare->m_sTable = sphDup ( table_name );
1224 		if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
1225 		{
1226 			SafeDelete ( pShare );
1227 			break;
1228 		}
1229 
1230 		// all seems fine
1231 		break;
1232 	}
1233 
1234 	pthread_mutex_unlock ( &sphinx_mutex );
1235 	SPH_RET(pShare);
1236 }
1237 
1238 
1239 // Free lock controls. We call this whenever we close a table. If the table had
1240 // the last reference to the share then we free memory associated with it.
free_share(CSphSEShare * pShare)1241 static int free_share ( CSphSEShare * pShare )
1242 {
1243 	SPH_ENTER_FUNC();
1244 	pthread_mutex_lock ( &sphinx_mutex );
1245 
1246 	if ( !--pShare->m_iUseCount )
1247 	{
1248 		sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
1249 		SafeDelete ( pShare );
1250 	}
1251 
1252 	pthread_mutex_unlock ( &sphinx_mutex );
1253 	SPH_RET(0);
1254 }
1255 
1256 
#if MYSQL_VERSION_ID>50100
// Handler factory for the 5.1+ plugin API.
// Remembers hton in sphinx_hton_ptr, then allocates a new ha_sphinx for the
// given table share on mem_root.
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
{
	sphinx_hton_ptr = hton;
	handler * pNewHandler = new ( mem_root ) ha_sphinx ( hton, table );
	return pNewHandler;
}
#endif
1264 
1265 //////////////////////////////////////////////////////////////////////////////
1266 // CLIENT-SIDE REQUEST STUFF
1267 //////////////////////////////////////////////////////////////////////////////
1268 
CSphSEQuery(const char * sQuery,int iLength,const char * sIndex)1269 CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
1270 	: m_sHost ( "" )
1271 	, m_iPort ( 0 )
1272 	, m_sIndex ( sIndex ? sIndex : "*" )
1273 	, m_iOffset ( 0 )
1274 	, m_iLimit ( 20 )
1275 	, m_bQuery ( false )
1276 	, m_sQuery ( "" )
1277 	, m_pWeights ( NULL )
1278 	, m_iWeights ( 0 )
1279 	, m_eMode ( SPH_MATCH_ALL )
1280 	, m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
1281 	, m_sRankExpr ( NULL )
1282 	, m_eSort ( SPH_SORT_RELEVANCE )
1283 	, m_sSortBy ( "" )
1284 	, m_iMaxMatches ( 1000 )
1285 	, m_iMaxQueryTime ( 0 )
1286 	, m_iMinID ( 0 )
1287 	, m_iMaxID ( 0 )
1288 	, m_iFilters ( 0 )
1289 	, m_eGroupFunc ( SPH_GROUPBY_DAY )
1290 	, m_sGroupBy ( "" )
1291 	, m_sGroupSortBy ( "@group desc" )
1292 	, m_iCutoff ( 0 )
1293 	, m_iRetryCount ( 0 )
1294 	, m_iRetryDelay ( 0 )
1295 	, m_sGroupDistinct ( "" )
1296 	, m_iIndexWeights ( 0 )
1297 	, m_iFieldWeights ( 0 )
1298 	, m_bGeoAnchor ( false )
1299 	, m_sGeoLatAttr ( "" )
1300 	, m_sGeoLongAttr ( "" )
1301 	, m_fGeoLatitude ( 0.0f )
1302 	, m_fGeoLongitude ( 0.0f )
1303 	, m_sComment ( "" )
1304 	, m_sSelect ( "*" )
1305 
1306 	, m_pBuf ( NULL )
1307 	, m_pCur ( NULL )
1308 	, m_iBufLeft ( 0 )
1309 	, m_bBufOverrun ( false )
1310 {
1311 	m_sQueryBuffer = new char [ iLength+2 ];
1312 	memcpy ( m_sQueryBuffer, sQuery, iLength );
1313 	m_sQueryBuffer[iLength] = ';';
1314 	m_sQueryBuffer[iLength+1] = '\0';
1315 }
1316 
1317 
~CSphSEQuery()1318 CSphSEQuery::~CSphSEQuery ()
1319 {
1320 	SPH_ENTER_METHOD();
1321 	SafeDeleteArray ( m_sQueryBuffer );
1322 	SafeDeleteArray ( m_pWeights );
1323 	SafeDeleteArray ( m_pBuf );
1324 	for ( int i=0; i<m_dOverrides.elements(); i++ )
1325 		SafeDelete ( m_dOverrides.at(i) );
1326 	SPH_VOID_RET();
1327 }
1328 
1329 
1330 template < typename T >
ParseArray(T ** ppValues,const char * sValue)1331 int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
1332 {
1333 	SPH_ENTER_METHOD();
1334 
1335 	assert ( ppValues );
1336 	assert ( !(*ppValues) );
1337 
1338 	const char * pValue;
1339 	bool bPrevDigit = false;
1340 	int iValues = 0;
1341 
1342 	// count the values
1343 	for ( pValue=sValue; *pValue; pValue++ )
1344 	{
1345 		bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1346 		if ( bDigit && !bPrevDigit )
1347 			iValues++;
1348 		bPrevDigit = bDigit;
1349 	}
1350 	if ( !iValues )
1351 		SPH_RET(0);
1352 
1353 	// extract the values
1354 	T * pValues = new T [ iValues ];
1355 	*ppValues = pValues;
1356 
1357 	int iIndex = 0, iSign = 1;
1358 	T uValue = 0;
1359 
1360 	bPrevDigit = false;
1361 	for ( pValue=sValue ;; pValue++ )
1362 	{
1363 		bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1364 
1365 		if ( bDigit )
1366 		{
1367 			if ( !bPrevDigit )
1368 				uValue = 0;
1369 			uValue = uValue*10 + ( (*pValue)-'0' );
1370 		} else if ( bPrevDigit )
1371 		{
1372 			assert ( iIndex<iValues );
1373 			pValues [ iIndex++ ] = uValue * iSign;
1374 			iSign = 1;
1375 		} else if ( *pValue=='-' )
1376 			iSign = -1;
1377 
1378 		bPrevDigit = bDigit;
1379 		if ( !*pValue )
1380 			break;
1381 	}
1382 
1383 	SPH_RET ( iValues );
1384 }
1385 
1386 
chop(char * s)1387 static char * chop ( char * s )
1388 {
1389 	while ( *s && isspace(*s) )
1390 		s++;
1391 
1392 	char * p = s + strlen(s);
1393 	while ( p>s && isspace ( p[-1] ) )
1394 		p--;
1395 	*p = '\0';
1396 
1397 	return s;
1398 }
1399 
1400 
myisattr(char c)1401 static bool myisattr ( char c )
1402 {
1403 	return
1404 		( c>='0' && c<='9' ) ||
1405 		( c>='a' && c<='z' ) ||
1406 		( c>='A' && c<='Z' ) ||
1407 		c=='_';
1408 }
1409 
myismagic(char c)1410 static bool myismagic ( char c )
1411 {
1412 	return c=='@';
1413 }
1414 
1415 
ParseField(char * sField)1416 bool CSphSEQuery::ParseField ( char * sField )
1417 {
1418 	SPH_ENTER_METHOD();
1419 
1420 	// look for option name/value separator
1421 	char * sValue = strchr ( sField, '=' );
1422 	if ( !sValue || sValue==sField || sValue[-1]=='\\' )
1423 	{
1424 		// by default let's assume it's just query
1425 		if ( sField[0] )
1426 		{
1427 			if ( m_bQuery )
1428 			{
1429 				snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
1430 				SPH_RET(false);
1431 			} else
1432 			{
1433 				m_sQuery = sField;
1434 				m_bQuery = true;
1435 
1436 				// unescape only 1st one
1437 				char *s = sField, *d = sField;
1438 				int iSlashes = 0;
1439 				while ( *s )
1440 				{
1441 					iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
1442 					if ( ( iSlashes%2 )==0 ) *d++ = *s;
1443 					s++;
1444 				}
1445 				*d = '\0';
1446 			}
1447 		}
1448 		SPH_RET(true);
1449 	}
1450 
1451 	// split
1452 	*sValue++ = '\0';
1453 	sValue = chop ( sValue );
1454 	int iValue = atoi ( sValue );
1455 
1456 	// handle options
1457 	char * sName = chop ( sField );
1458 
1459 	if ( !strcmp ( sName, "query" ) )			m_sQuery = sValue;
1460 	else if ( !strcmp ( sName, "host" ) )		m_sHost = sValue;
1461 	else if ( !strcmp ( sName, "port" ) )		m_iPort = iValue;
1462 	else if ( !strcmp ( sName, "index" ) )		m_sIndex = sValue;
1463 	else if ( !strcmp ( sName, "offset" ) )		m_iOffset = iValue;
1464 	else if ( !strcmp ( sName, "limit" ) )		m_iLimit = iValue;
1465 	else if ( !strcmp ( sName, "weights" ) )	m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
1466 	else if ( !strcmp ( sName, "minid" ) )		m_iMinID = iValue;
1467 	else if ( !strcmp ( sName, "maxid" ) )		m_iMaxID = iValue;
1468 	else if ( !strcmp ( sName, "maxmatches" ) )	m_iMaxMatches = iValue;
1469 	else if ( !strcmp ( sName, "maxquerytime" ) )	m_iMaxQueryTime = iValue;
1470 	else if ( !strcmp ( sName, "groupsort" ) )	m_sGroupSortBy = sValue;
1471 	else if ( !strcmp ( sName, "distinct" ) )	m_sGroupDistinct = sValue;
1472 	else if ( !strcmp ( sName, "cutoff" ) )		m_iCutoff = iValue;
1473 	else if ( !strcmp ( sName, "comment" ) )	m_sComment = sValue;
1474 	else if ( !strcmp ( sName, "select" ) )		m_sSelect = sValue;
1475 
1476 	else if ( !strcmp ( sName, "mode" ) )
1477 	{
1478 		m_eMode = SPH_MATCH_ALL;
1479 		if ( !strcmp ( sValue, "any" ) )			m_eMode = SPH_MATCH_ANY;
1480 		else if ( !strcmp ( sValue, "phrase" ) )	m_eMode = SPH_MATCH_PHRASE;
1481 		else if ( !strcmp ( sValue, "boolean" ) )	m_eMode = SPH_MATCH_BOOLEAN;
1482 		else if ( !strcmp ( sValue, "ext" ) )		m_eMode = SPH_MATCH_EXTENDED;
1483 		else if ( !strcmp ( sValue, "extended" ) )	m_eMode = SPH_MATCH_EXTENDED;
1484 		else if ( !strcmp ( sValue, "ext2" ) )		m_eMode = SPH_MATCH_EXTENDED2;
1485 		else if ( !strcmp ( sValue, "extended2" ) )	m_eMode = SPH_MATCH_EXTENDED2;
1486 		else if ( !strcmp ( sValue, "all" ) )		m_eMode = SPH_MATCH_ALL;
1487 		else if ( !strcmp ( sValue, "fullscan" ) )	m_eMode = SPH_MATCH_FULLSCAN;
1488 		else
1489 		{
1490 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
1491 			SPH_RET(false);
1492 		}
1493 	} else if ( !strcmp ( sName, "ranker" ) )
1494 	{
1495 		m_eRanker = SPH_RANK_PROXIMITY_BM25;
1496 		if ( !strcmp ( sValue, "proximity_bm25" ) )	m_eRanker = SPH_RANK_PROXIMITY_BM25;
1497 		else if ( !strcmp ( sValue, "bm25" ) )		m_eRanker = SPH_RANK_BM25;
1498 		else if ( !strcmp ( sValue, "none" ) )		m_eRanker = SPH_RANK_NONE;
1499 		else if ( !strcmp ( sValue, "wordcount" ) )	m_eRanker = SPH_RANK_WORDCOUNT;
1500 		else if ( !strcmp ( sValue, "proximity" ) )	m_eRanker = SPH_RANK_PROXIMITY;
1501 		else if ( !strcmp ( sValue, "matchany" ) )	m_eRanker = SPH_RANK_MATCHANY;
1502 		else if ( !strcmp ( sValue, "fieldmask" ) )	m_eRanker = SPH_RANK_FIELDMASK;
1503 		else if ( !strcmp ( sValue, "sph04" ) )		m_eRanker = SPH_RANK_SPH04;
1504 		else if ( !strncmp ( sValue, "expr:", 5 ) )
1505 		{
1506 			m_eRanker = SPH_RANK_EXPR;
1507 			m_sRankExpr = sValue+5;
1508 		} else
1509 		{
1510 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
1511 			SPH_RET(false);
1512 		}
1513 	} else if ( !strcmp ( sName, "sort" ) )
1514 	{
1515 		static const struct
1516 		{
1517 			const char *	m_sName;
1518 			ESphSortOrder	m_eSort;
1519 		} dSortModes[] =
1520 		{
1521 			{ "relevance",		SPH_SORT_RELEVANCE },
1522 			{ "attr_desc:",		SPH_SORT_ATTR_DESC },
1523 			{ "attr_asc:",		SPH_SORT_ATTR_ASC },
1524 			{ "time_segments:",	SPH_SORT_TIME_SEGMENTS },
1525 			{ "extended:",		SPH_SORT_EXTENDED },
1526 			{ "expr:",			SPH_SORT_EXPR }
1527 		};
1528 
1529 		int i;
1530 		const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
1531 		for ( i=0; i<nModes; i++ )
1532 			if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
1533 		{
1534 			m_eSort = dSortModes[i].m_eSort;
1535 			m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
1536 			break;
1537 		}
1538 		if ( i==nModes )
1539 		{
1540 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
1541 			SPH_RET(false);
1542 		}
1543 
1544 	} else if ( !strcmp ( sName, "groupby" ) )
1545 	{
1546 		static const struct
1547 		{
1548 			const char *	m_sName;
1549 			ESphGroupBy		m_eFunc;
1550 		} dGroupModes[] =
1551 		{
1552 			{ "day:",	SPH_GROUPBY_DAY },
1553 			{ "week:",	SPH_GROUPBY_WEEK },
1554 			{ "month:",	SPH_GROUPBY_MONTH },
1555 			{ "year:",	SPH_GROUPBY_YEAR },
1556 			{ "attr:",	SPH_GROUPBY_ATTR },
1557 		};
1558 
1559 		int i;
1560 		const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
1561 		for ( i=0; i<nModes; i++ )
1562 			if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
1563 		{
1564 			m_eGroupFunc = dGroupModes[i].m_eFunc;
1565 			m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
1566 			break;
1567 		}
1568 		if ( i==nModes )
1569 		{
1570 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
1571 			SPH_RET(false);
1572 		}
1573 
1574 	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1575 		( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
1576 	{
1577 		for ( ;; )
1578 		{
1579 			char * p = sName;
1580 			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1581 			tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
1582 			tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
1583 
1584 			if (!( p = strchr ( sValue, ',' ) ))
1585 				break;
1586 			*p++ = '\0';
1587 
1588 			tFilter.m_sAttrName = chop ( sValue );
1589 			sValue = p;
1590 
1591 			if (!( p = strchr ( sValue, ',' ) ))
1592 				break;
1593 			*p++ = '\0';
1594 
1595 			if ( tFilter.m_eType==SPH_FILTER_RANGE )
1596 			{
1597 				tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
1598 				tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
1599 			} else
1600 			{
1601 				tFilter.m_fMinValue = (float)atof(sValue);
1602 				tFilter.m_fMaxValue = (float)atof(p);
1603 			}
1604 
1605 			// all ok
1606 			m_iFilters++;
1607 			break;
1608 		}
1609 
1610 	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1611 		( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
1612 	{
1613 		for ( ;; )
1614 		{
1615 			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1616 			tFilter.m_eType = SPH_FILTER_VALUES;
1617 			tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
1618 
1619 			// get the attr name
1620 			while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
1621 				sValue++;
1622 			if ( !*sValue )
1623 				break;
1624 
1625 			tFilter.m_sAttrName = sValue;
1626 			while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) ) )
1627 				sValue++;
1628 			if ( !*sValue )
1629 				break;
1630 			*sValue++ = '\0';
1631 
1632 			// get the values
1633 			tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
1634 			if ( !tFilter.m_iValues )
1635 			{
1636 				assert ( !tFilter.m_pValues );
1637 				break;
1638 			}
1639 
1640 			// all ok
1641 			m_iFilters++;
1642 			break;
1643 		}
1644 
1645 	} else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
1646 	{
1647 		bool bIndex = !strcmp ( sName, "indexweights" );
1648 		int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
1649 		char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
1650 		int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
1651 
1652 		*pCount = 0;
1653 
1654 		char * p = sValue;
1655 		while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
1656 		{
1657 			// extract attr name
1658 			if ( !myisattr(*p) )
1659 			{
1660 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
1661 				SPH_RET(false);
1662 			}
1663 
1664 			pNames[*pCount] = p;
1665 			while ( myisattr(*p) ) p++;
1666 
1667 			if ( *p!=',' )
1668 			{
1669 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1670 				SPH_RET(false);
1671 			}
1672 			*p++ = '\0';
1673 
1674 			// extract attr value
1675 			char * sVal = p;
1676 			while ( isdigit(*p) ) p++;
1677 			if ( p==sVal )
1678 			{
1679 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
1680 				SPH_RET(false);
1681 			}
1682 			pWeights[*pCount] = atoi(sVal);
1683 			(*pCount)++;
1684 
1685 			if ( !*p )
1686 				break;
1687 			if ( *p!=',' )
1688 			{
1689 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1690 				SPH_RET(false);
1691 			}
1692 			p++;
1693 		}
1694 
1695 	} else if ( !strcmp ( sName, "geoanchor" ) )
1696 	{
1697 		m_bGeoAnchor = false;
1698 		for ( ;; )
1699 		{
1700 			char * sLat = sValue;
1701 			char * p = sValue;
1702 
1703 			if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1704 			char * sLong = p;
1705 
1706 			if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1707 			char * sLatVal = p;
1708 
1709 			if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1710 			char * sLongVal = p;
1711 
1712 			m_sGeoLatAttr = chop(sLat);
1713 			m_sGeoLongAttr = chop(sLong);
1714 			m_fGeoLatitude = (float)atof ( sLatVal );
1715 			m_fGeoLongitude = (float)atof ( sLongVal );
1716 			m_bGeoAnchor = true;
1717 			break;
1718 		}
1719 		if ( !m_bGeoAnchor )
1720 		{
1721 			snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
1722 			SPH_RET(false);
1723 		}
1724 	} else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
1725 	{
1726 		char * sName = NULL;
1727 		int iType = 0;
1728 		CSphSEQuery::Override_t * pOverride = NULL;
1729 
1730 		// get name and type
1731 		char * sRest = sValue;
1732 		for ( ;; )
1733 		{
1734 			sName = sRest;
1735 			if ( !*sName )
1736 				break;
1737 			if (!( sRest = strchr ( sRest, ',' ) ))
1738 				break;
1739 			*sRest++ = '\0';
1740 			char * sType = sRest;
1741 			if (!( sRest = strchr ( sRest, ',' ) ))
1742 				break;
1743 
1744 			static const struct
1745 			{
1746 				const char *	m_sName;
1747 				int				m_iType;
1748 			}
1749 			dAttrTypes[] =
1750 			{
1751 				{ "int",		SPH_ATTR_INTEGER },
1752 				{ "timestamp",	SPH_ATTR_TIMESTAMP },
1753 				{ "bool",		SPH_ATTR_BOOL },
1754 				{ "float",		SPH_ATTR_FLOAT },
1755 				{ "bigint",		SPH_ATTR_BIGINT }
1756 			};
1757 			for ( int i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
1758 				if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
1759 			{
1760 				iType = dAttrTypes[i].m_iType;
1761 				break;
1762 			}
1763 			break;
1764 		}
1765 
1766 		// fail
1767 		if ( !sName || !*sName || !iType )
1768 		{
1769 			snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
1770 			SPH_RET(false);
1771 		}
1772 
1773 		// grab id:value pairs
1774 		sRest++;
1775 		while ( sRest )
1776 		{
1777 			char * sId = sRest;
1778 			if (!( sRest = strchr ( sRest, ':' ) )) break; *sRest++ = '\0';
1779 			if (!( sRest - sId )) break;
1780 
1781 			char * sValue = sRest;
1782 			if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
1783 				*sRest++ = '\0';
1784 			if ( !*sValue )
1785 				break;
1786 
1787 			if ( !pOverride )
1788 			{
1789 				pOverride = new CSphSEQuery::Override_t;
1790 				pOverride->m_sName = chop(sName);
1791 				pOverride->m_iType = iType;
1792 				m_dOverrides.append ( pOverride );
1793 			}
1794 
1795 			ulonglong uId = strtoull ( sId, NULL, 10 );
1796 			CSphSEQuery::Override_t::Value_t tValue;
1797 			if ( iType==SPH_ATTR_FLOAT )
1798 				tValue.m_fValue = (float)atof(sValue);
1799 			else if ( iType==SPH_ATTR_BIGINT )
1800 				tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
1801 			else
1802 				tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
1803 
1804 			pOverride->m_dIds.append ( uId );
1805 			pOverride->m_dValues.append ( tValue );
1806 		}
1807 
1808 		if ( !pOverride )
1809 		{
1810 			snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
1811 			SPH_RET(false);
1812 		}
1813 		SPH_RET(true);
1814 	} else
1815 	{
1816 		snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
1817 		SPH_RET(false);
1818 	}
1819 
1820 	// !COMMIT handle syntax errors
1821 
1822 	SPH_RET(true);
1823 }
1824 
1825 
Parse()1826 bool CSphSEQuery::Parse ()
1827 {
1828 	SPH_ENTER_METHOD();
1829 	SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
1830 
1831 	m_bQuery = false;
1832 	char * pCur = m_sQueryBuffer;
1833 	char * pNext = pCur;
1834 
1835 	while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
1836 	{
1837 		// handle escaped semicolons
1838 		if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
1839 		{
1840 			pNext++;
1841 			continue;
1842 		}
1843 
1844 		// handle semicolon-separated clauses
1845 		*pNext++ = '\0';
1846 		if ( !ParseField ( pCur ) )
1847 			SPH_RET(false);
1848 		pCur = pNext;
1849 	}
1850 
1851 	SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
1852 
1853 	SPH_RET(true);
1854 }
1855 
1856 
SendBytes(const void * pBytes,int iBytes)1857 void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
1858 {
1859 	SPH_ENTER_METHOD();
1860 	if ( m_iBufLeft<iBytes )
1861 	{
1862 		m_bBufOverrun = true;
1863 		SPH_VOID_RET();
1864 	}
1865 
1866 	memcpy ( m_pCur, pBytes, iBytes );
1867 
1868 	m_pCur += iBytes;
1869 	m_iBufLeft -= iBytes;
1870 	SPH_VOID_RET();
1871 }
1872 
1873 
BuildRequest(char ** ppBuffer)1874 int CSphSEQuery::BuildRequest ( char ** ppBuffer )
1875 {
1876 	SPH_ENTER_METHOD();
1877 
1878 	// calc request length
1879 	int iReqSize = 128 + 4*m_iWeights
1880 		+ strlen ( m_sSortBy )
1881 		+ strlen ( m_sQuery )
1882 		+ strlen ( m_sIndex )
1883 		+ strlen ( m_sGroupBy )
1884 		+ strlen ( m_sGroupSortBy )
1885 		+ strlen ( m_sGroupDistinct )
1886 		+ strlen ( m_sComment )
1887 		+ strlen ( m_sSelect );
1888 	if ( m_eRanker==SPH_RANK_EXPR )
1889 		iReqSize += 4 + strlen(m_sRankExpr);
1890 	for ( int i=0; i<m_iFilters; i++ )
1891 	{
1892 		const CSphSEFilter & tFilter = m_dFilters[i];
1893 		iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
1894 		switch ( tFilter.m_eType )
1895 		{
1896 			case SPH_FILTER_VALUES:		iReqSize += 4 + 8*tFilter.m_iValues; break;
1897 			case SPH_FILTER_RANGE:		iReqSize += 16; break;
1898 			case SPH_FILTER_FLOATRANGE:	iReqSize += 8; break;
1899 		}
1900 	}
1901 	if ( m_bGeoAnchor ) // 1.14+
1902 		iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
1903 	for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
1904 		iReqSize += 8 + strlen(m_sIndexWeight[i] );
1905 	for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
1906 		iReqSize += 8 + strlen(m_sFieldWeight[i] );
1907 	// overrides
1908 	iReqSize += 4;
1909 	for ( int i=0; i<m_dOverrides.elements(); i++ )
1910 	{
1911 		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
1912 		const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
1913 		iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
1914 	}
1915 	// select
1916 	iReqSize += 4;
1917 
1918 	m_iBufLeft = 0;
1919 	SafeDeleteArray ( m_pBuf );
1920 
1921 	m_pBuf = new char [ iReqSize ];
1922 	if ( !m_pBuf )
1923 		SPH_RET(-1);
1924 
1925 	m_pCur = m_pBuf;
1926 	m_iBufLeft = iReqSize;
1927 	m_bBufOverrun = false;
1928 	(*ppBuffer) = m_pBuf;
1929 
1930 	// build request
1931 	SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
1932 	SendWord ( VER_COMMAND_SEARCH ); // command version
1933 	SendInt ( iReqSize-8 ); // packet body length
1934 	SendInt ( 0 ); // its a client
1935 
1936 	SendInt ( 1 ); // number of queries
1937 	SendInt ( m_iOffset );
1938 	SendInt ( m_iLimit );
1939 	SendInt ( m_eMode );
1940 	SendInt ( m_eRanker ); // 1.16+
1941 	if ( m_eRanker==SPH_RANK_EXPR )
1942 		SendString ( m_sRankExpr );
1943 	SendInt ( m_eSort );
1944 	SendString ( m_sSortBy ); // sort attr
1945 	SendString ( m_sQuery ); // query
1946 	SendInt ( m_iWeights );
1947 	for ( int j=0; j<m_iWeights; j++ )
1948 		SendInt ( m_pWeights[j] ); // weights
1949 	SendString ( m_sIndex ); // indexes
1950 	SendInt ( 1 ); // id64 range follows
1951 	SendUint64 ( m_iMinID ); // id/ts ranges
1952 	SendUint64 ( m_iMaxID );
1953 
1954 	SendInt ( m_iFilters );
1955 	for ( int j=0; j<m_iFilters; j++ )
1956 	{
1957 		const CSphSEFilter & tFilter = m_dFilters[j];
1958 		SendString ( tFilter.m_sAttrName );
1959 		SendInt ( tFilter.m_eType );
1960 
1961 		switch ( tFilter.m_eType )
1962 		{
1963 			case SPH_FILTER_VALUES:
1964 				SendInt ( tFilter.m_iValues );
1965 				for ( int k=0; k<tFilter.m_iValues; k++ )
1966 					SendUint64 ( tFilter.m_pValues[k] );
1967 				break;
1968 
1969 			case SPH_FILTER_RANGE:
1970 				SendUint64 ( tFilter.m_uMinValue );
1971 				SendUint64 ( tFilter.m_uMaxValue );
1972 				break;
1973 
1974 			case SPH_FILTER_FLOATRANGE:
1975 				SendFloat ( tFilter.m_fMinValue );
1976 				SendFloat ( tFilter.m_fMaxValue );
1977 				break;
1978 		}
1979 
1980 		SendInt ( tFilter.m_bExclude );
1981 	}
1982 
1983 	SendInt ( m_eGroupFunc );
1984 	SendString ( m_sGroupBy );
1985 	SendInt ( m_iMaxMatches );
1986 	SendString ( m_sGroupSortBy );
1987 	SendInt ( m_iCutoff ); // 1.9+
1988 	SendInt ( m_iRetryCount ); // 1.10+
1989 	SendInt ( m_iRetryDelay );
1990 	SendString ( m_sGroupDistinct ); // 1.11+
1991 	SendInt ( m_bGeoAnchor ); // 1.14+
1992 	if ( m_bGeoAnchor )
1993 	{
1994 		SendString ( m_sGeoLatAttr );
1995 		SendString ( m_sGeoLongAttr );
1996 		SendFloat ( m_fGeoLatitude );
1997 		SendFloat ( m_fGeoLongitude );
1998 	}
1999 	SendInt ( m_iIndexWeights ); // 1.15+
2000 	for ( int i=0; i<m_iIndexWeights; i++ )
2001 	{
2002 		SendString ( m_sIndexWeight[i] );
2003 		SendInt ( m_iIndexWeight[i] );
2004 	}
2005 	SendInt ( m_iMaxQueryTime ); // 1.17+
2006 	SendInt ( m_iFieldWeights ); // 1.18+
2007 	for ( int i=0; i<m_iFieldWeights; i++ )
2008 	{
2009 		SendString ( m_sFieldWeight[i] );
2010 		SendInt ( m_iFieldWeight[i] );
2011 	}
2012 	SendString ( m_sComment );
2013 
2014 	// overrides
2015 	SendInt ( m_dOverrides.elements() );
2016 	for ( int i=0; i<m_dOverrides.elements(); i++ )
2017 	{
2018 		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
2019 		SendString ( pOverride->m_sName );
2020 		SendDword ( pOverride->m_iType );
2021 		SendInt ( pOverride->m_dIds.elements() );
2022 		for ( int j=0; j<pOverride->m_dIds.elements(); j++ )
2023 		{
2024 			SendUint64 ( pOverride->m_dIds.at(j) );
2025 			if ( pOverride->m_iType==SPH_ATTR_FLOAT )
2026 				SendFloat ( pOverride->m_dValues.at(j).m_fValue );
2027 			else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
2028 				SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
2029 			else
2030 				SendDword ( pOverride->m_dValues.at(j).m_uValue );
2031 		}
2032 	}
2033 
2034 	// select
2035 	SendString ( m_sSelect );
2036 
2037 	// detect buffer overruns and underruns, and report internal error
2038 	if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
2039 		SPH_RET(-1);
2040 
2041 	// all fine
2042 	SPH_RET ( iReqSize );
2043 }
2044 
2045 //////////////////////////////////////////////////////////////////////////////
2046 // SPHINX HANDLER
2047 //////////////////////////////////////////////////////////////////////////////
2048 
// File extensions reported by ha_sphinx::bas_ext(). The list contains only the
// NullS terminator, i.e. no per-table file extensions are registered.
static const char * ha_sphinx_exts[] = { NullS };
2050 
2051 
2052 #if MYSQL_VERSION_ID<50100
ha_sphinx(TABLE_ARG * table)2053 ha_sphinx::ha_sphinx ( TABLE_ARG * table )
2054 	: handler ( &sphinx_hton, table )
2055 #else
2056 ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
2057 	: handler ( hton, table )
2058 #endif
2059 	, m_pShare ( NULL )
2060 	, m_iMatchesTotal ( 0 )
2061 	, m_iCurrentPos ( 0 )
2062 	, m_pCurrentKey ( NULL )
2063 	, m_iCurrentKeyLen ( 0 )
2064 	, m_pResponse ( NULL )
2065 	, m_pResponseEnd ( NULL )
2066 	, m_pCur ( NULL )
2067 	, m_bUnpackError ( false )
2068 	, m_iFields ( 0 )
2069 	, m_dFields ( NULL )
2070 	, m_iAttrs ( 0 )
2071 	, m_dAttrs ( NULL )
2072 	, m_bId64 ( 0 )
2073 	, m_dUnboundFields ( NULL )
2074 {
2075 	SPH_ENTER_METHOD();
2076 	if ( current_thd )
2077 		current_thd->variables.engine_condition_pushdown = true;
2078 	SPH_VOID_RET();
2079 }
2080 
2081 
2082 // If frm_error() is called then we will use this to to find out what file extentions
2083 // exist for the storage engine. This is also used by the default rename_table and
2084 // delete_table method in handler.cc.
bas_ext() const2085 const char ** ha_sphinx::bas_ext() const
2086 {
2087 	return ha_sphinx_exts;
2088 }
2089 
2090 
2091 // Used for opening tables. The name will be the name of the file.
2092 // A table is opened when it needs to be opened. For instance
2093 // when a request comes in for a select on the table (tables are not
2094 // open and closed for each request, they are cached).
2095 //
2096 // Called from handler.cc by handler::ha_open(). The server opens all tables by
2097 // calling ha_open() which then calls the handler specific open().
open(const char * name,int,uint)2098 int ha_sphinx::open ( const char * name, int, uint )
2099 {
2100 	SPH_ENTER_METHOD();
2101 	m_pShare = get_share ( name, table );
2102 	if ( !m_pShare )
2103 		SPH_RET(1);
2104 
2105 	thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
2106 
2107 	#if MYSQL_VERSION_ID>50100
2108 	void ** tmp = thd_ha_data ( table->in_use, ht );
2109 	if ( *tmp )
2110 	{
2111 		CSphTLS * pTls = (CSphTLS *)( *tmp );
2112 		SafeDelete ( pTls );
2113 		*tmp = NULL;
2114 	}
2115 	#else
2116 	if ( table->in_use->ha_data [ sphinx_hton.slot ] )
2117 	{
2118 		CSphTLS * pTls = (CSphTLS *)( table->in_use->ha_data [ sphinx_hton.slot ] );
2119 		SafeDelete ( pTls );
2120 		table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
2121 	}
2122 	#endif
2123 
2124 	SPH_RET(0);
2125 }
2126 
2127 
Connect(const char * sHost,ushort uPort)2128 int ha_sphinx::Connect ( const char * sHost, ushort uPort )
2129 {
2130 	struct sockaddr_in sin;
2131 #ifndef __WIN__
2132 	struct sockaddr_un saun;
2133 #endif
2134 
2135 	int iDomain = 0;
2136 	int iSockaddrSize = 0;
2137 	struct sockaddr * pSockaddr = NULL;
2138 
2139 	in_addr_t ip_addr;
2140 
2141 	if ( uPort )
2142 	{
2143 		iDomain = AF_INET;
2144 		iSockaddrSize = sizeof(sin);
2145 		pSockaddr = (struct sockaddr *) &sin;
2146 
2147 		memset ( &sin, 0, sizeof(sin) );
2148 		sin.sin_family = AF_INET;
2149 		sin.sin_port = htons(uPort);
2150 
2151 		// prepare host address
2152 		if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
2153 		{
2154 			memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
2155 		} else
2156 		{
2157 			int tmp_errno;
2158 			bool bError = false;
2159 
2160 #if MYSQL_VERSION_ID>=50515
2161 			struct addrinfo * hp = NULL;
2162 			tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
2163 			if ( tmp_errno!=0 || !hp || !hp->ai_addr )
2164 			{
2165 				bError = true;
2166 				if ( hp )
2167 					freeaddrinfo ( hp );
2168 			}
2169 #else
2170 			struct hostent tmp_hostent, *hp;
2171 			char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
2172 			hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
2173 			if ( !hp )
2174 			{
2175 				my_gethostbyname_r_free();
2176 				bError = true;
2177 			}
2178 #endif
2179 
2180 			if ( bError )
2181 			{
2182 				char sError[256];
2183 				my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
2184 
2185 				my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2186 				SPH_RET(-1);
2187 			}
2188 
2189 #if MYSQL_VERSION_ID>=50515
2190 			memcpy ( &sin.sin_addr, &( (struct sockaddr_in *)hp->ai_addr )->sin_addr, sizeof(sin.sin_addr) );
2191 			freeaddrinfo ( hp );
2192 #else
2193 			memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
2194 			my_gethostbyname_r_free();
2195 #endif
2196 		}
2197 	} else
2198 	{
2199 #ifndef __WIN__
2200 		iDomain = AF_UNIX;
2201 		iSockaddrSize = sizeof(saun);
2202 		pSockaddr = (struct sockaddr *) &saun;
2203 
2204 		memset ( &saun, 0, sizeof(saun) );
2205 		saun.sun_family = AF_UNIX;
2206 		strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
2207 #else
2208 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
2209 		SPH_RET(-1);
2210 #endif
2211 	}
2212 
2213 	char sError[512];
2214 	int iSocket = socket ( iDomain, SOCK_STREAM, 0 );
2215 
2216 	if ( iSocket<0 )
2217 	{
2218 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
2219 		SPH_RET(-1);
2220 	}
2221 
2222 	if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
2223 	{
2224 		sphSockClose ( iSocket );
2225 		my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
2226 			sHost, errno, (int)uPort );
2227 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2228 		SPH_RET(-1);
2229 	}
2230 
2231 	return iSocket;
2232 }
2233 
2234 
ConnectAPI(const char * sQueryHost,int iQueryPort)2235 int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
2236 {
2237 	SPH_ENTER_METHOD();
2238 
2239 	const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
2240 	ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
2241 
2242 	int iSocket = Connect ( sHost, uPort );
2243 	if ( iSocket<0 )
2244 		SPH_RET ( iSocket );
2245 
2246 	char sError[512];
2247 
2248 	int version;
2249 	if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
2250 	{
2251 		sphSockClose ( iSocket );
2252 		my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
2253 			sHost, (int)uPort );
2254 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2255 		SPH_RET(-1);
2256 	}
2257 
2258 	uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
2259 	if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
2260 	{
2261 		sphSockClose ( iSocket );
2262 		my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
2263 			sHost, (int)uPort );
2264 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2265 		SPH_RET(-1);
2266 	}
2267 
2268 	SPH_RET ( iSocket );
2269 }
2270 
2271 
2272 // Closes a table. We call the free_share() function to free any resources
2273 // that we have allocated in the "shared" structure.
2274 //
2275 // Called from sql_base.cc, sql_select.cc, and table.cc.
2276 // In sql_select.cc it is only used to close up temporary tables or during
2277 // the process where a temporary table is converted over to being a
2278 // myisam table.
2279 // For sql_base.cc look at close_data_tables().
close()2280 int ha_sphinx::close()
2281 {
2282 	SPH_ENTER_METHOD();
2283 	SPH_RET ( free_share ( m_pShare ) );
2284 }
2285 
2286 
HandleMysqlError(MYSQL * pConn,int iErrCode)2287 int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
2288 {
2289 	CSphSEThreadTable * pTable = GetTls ();
2290 	if ( pTable )
2291 	{
2292 		strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof ( pTable->m_tStats.m_sLastMessage ) );
2293 		pTable->m_tStats.m_bLastError = true;
2294 	}
2295 
2296 	mysql_close ( pConn );
2297 
2298 	my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
2299 	return -1;
2300 }
2301 
2302 
extra(enum ha_extra_function op)2303 int ha_sphinx::extra ( enum ha_extra_function op )
2304 {
2305 	CSphSEThreadTable * pTable = GetTls();
2306 	if ( pTable )
2307 	{
2308 		if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
2309 			pTable->m_bReplace = true;
2310 		else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
2311 			pTable->m_bReplace = false;
2312 	}
2313 	return 0;
2314 }
2315 
2316 
write_row(byte *)2317 int ha_sphinx::write_row ( byte * )
2318 {
2319 	SPH_ENTER_METHOD();
2320 	if ( !m_pShare || !m_pShare->m_bSphinxQL )
2321 		SPH_RET ( HA_ERR_WRONG_COMMAND );
2322 
2323 	// SphinxQL inserts only, pretty much similar to abandoned federated
2324 	char sQueryBuf[1024];
2325 	char sValueBuf[1024];
2326 
2327 	String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2328 	String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
2329 	sQuery.length ( 0 );
2330 	sValue.length ( 0 );
2331 
2332 	CSphSEThreadTable * pTable = GetTls ();
2333 	sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
2334 	sQuery.append ( m_pShare->m_sIndex );
2335 	sQuery.append ( " (" );
2336 
2337 	for ( Field ** ppField = table->field; *ppField; ppField++ )
2338 	{
2339 		sQuery.append ( (*ppField)->field_name );
2340 		if ( ppField[1] )
2341 			sQuery.append ( ", " );
2342 	}
2343 	sQuery.append ( ") VALUES (" );
2344 
2345 	for ( Field ** ppField = table->field; *ppField; ppField++ )
2346 	{
2347 		if ( (*ppField)->is_null() )
2348 		{
2349 			sQuery.append ( "''" );
2350 
2351 		} else
2352 		{
2353 			if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
2354 			{
2355 				Item_field * pWrap = new Item_field ( *ppField ); // autofreed by query arena, I assume
2356 				Item_func_unix_timestamp * pConv = new Item_func_unix_timestamp ( pWrap );
2357 				pConv->quick_fix_field();
2358 				unsigned int uTs = (unsigned int) pConv->val_int();
2359 
2360 				snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
2361 				sQuery.append ( sValueBuf );
2362 
2363 			} else
2364 			{
2365 				(*ppField)->val_str ( &sValue );
2366 
2367 				int iLen = sValue.length();
2368 				bool bMva = ( iLen>1 && sValue.ptr()[0]=='(' && sValue.ptr()[iLen-1]==')' );
2369 
2370 				if ( !bMva )
2371 					sQuery.append ( "'" );
2372 				sValue.print ( &sQuery );
2373 				if ( !bMva )
2374 					sQuery.append ( "'" );
2375 
2376 				sValue.length(0);
2377 			}
2378 		}
2379 
2380 		if ( ppField[1] )
2381 			sQuery.append ( ", " );
2382 	}
2383 	sQuery.append ( ")" );
2384 
2385 	// FIXME? pretty inefficient to reconnect every time under high load,
2386 	// but this was intentionally written for a low load scenario..
2387 	MYSQL * pConn = mysql_init ( NULL );
2388 	if ( !pConn )
2389 		SPH_RET ( ER_OUT_OF_RESOURCES );
2390 
2391 	unsigned int uTimeout = 1;
2392 	mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2393 
2394 	if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2395 		SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2396 
2397 	if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2398 		SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2399 
2400 	// all ok!
2401 	mysql_close ( pConn );
2402 	SPH_RET(0);
2403 }
2404 
2405 
IsIntegerFieldType(enum_field_types eType)2406 static inline bool IsIntegerFieldType ( enum_field_types eType )
2407 {
2408 	return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
2409 }
2410 
2411 
IsIDField(Field * pField)2412 static inline bool IsIDField ( Field * pField )
2413 {
2414 	enum_field_types eType = pField->type();
2415 
2416 	if ( eType==MYSQL_TYPE_LONGLONG )
2417 		return true;
2418 
2419 	if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
2420 		return true;
2421 
2422 	return false;
2423 }
2424 
2425 
delete_row(const byte *)2426 int ha_sphinx::delete_row ( const byte * )
2427 {
2428 	SPH_ENTER_METHOD();
2429 	if ( !m_pShare || !m_pShare->m_bSphinxQL )
2430 		SPH_RET ( HA_ERR_WRONG_COMMAND );
2431 
2432 	char sQueryBuf[1024];
2433 	String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2434 	sQuery.length ( 0 );
2435 
2436 	sQuery.append ( "DELETE FROM " );
2437 	sQuery.append ( m_pShare->m_sIndex );
2438 	sQuery.append ( " WHERE id=" );
2439 
2440 	char sValue[32];
2441 	snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
2442 	sQuery.append ( sValue );
2443 
2444 	// FIXME? pretty inefficient to reconnect every time under high load,
2445 	// but this was intentionally written for a low load scenario..
2446 	MYSQL * pConn = mysql_init ( NULL );
2447 	if ( !pConn )
2448 		SPH_RET ( ER_OUT_OF_RESOURCES );
2449 
2450 	unsigned int uTimeout = 1;
2451 	mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2452 
2453 	if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2454 		SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2455 
2456 	if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2457 		SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2458 
2459 	// all ok!
2460 	mysql_close ( pConn );
2461 	SPH_RET(0);
2462 }
2463 
2464 
// UPDATE is not supported by SphinxSE: always fails with HA_ERR_WRONG_COMMAND.
int ha_sphinx::update_row ( const byte *, byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
2470 
2471 
2472 // keynr is key (index) number
2473 // sorted is 1 if result MUST be sorted according to index
index_init(uint keynr,bool)2474 int ha_sphinx::index_init ( uint keynr, bool )
2475 {
2476 	SPH_ENTER_METHOD();
2477 	active_index = keynr;
2478 
2479 	CSphSEThreadTable * pTable = GetTls();
2480 	if ( pTable )
2481 		pTable->m_bCondDone = false;
2482 
2483 	SPH_RET(0);
2484 }
2485 
2486 
// Ends an index scan. Nothing is released here; always succeeds.
int ha_sphinx::index_end()
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
2492 
2493 
CheckResponcePtr(int iLen)2494 bool ha_sphinx::CheckResponcePtr ( int iLen )
2495 {
2496 	if ( m_pCur+iLen>m_pResponseEnd )
2497 	{
2498 		m_pCur = m_pResponseEnd;
2499 		m_bUnpackError = true;
2500 		return false;
2501 	}
2502 
2503 	return true;
2504 }
2505 
2506 
UnpackDword()2507 uint32 ha_sphinx::UnpackDword ()
2508 {
2509 	if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
2510 	{
2511 		return 0;
2512 	}
2513 
2514 	uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
2515 	m_pCur += sizeof(uint32); // NOLINT
2516 	return uRes;
2517 }
2518 
2519 
UnpackString()2520 char * ha_sphinx::UnpackString ()
2521 {
2522 	uint32 iLen = UnpackDword ();
2523 	if ( !iLen )
2524 		return NULL;
2525 
2526 	if ( !CheckResponcePtr ( iLen ) )
2527 	{
2528 		return NULL;
2529 	}
2530 
2531 	char * sRes = new char [ 1+iLen ];
2532 	memcpy ( sRes, m_pCur, iLen );
2533 	sRes[iLen] = '\0';
2534 	m_pCur += iLen;
2535 	return sRes;
2536 }
2537 
2538 
/// Printable stand-in for a possibly-NULL string: s itself, or "(null)".
static inline const char * FixNull ( const char * s )
{
	if ( !s )
		return "(null)";
	return s;
}
2543 
2544 
UnpackSchema()2545 bool ha_sphinx::UnpackSchema ()
2546 {
2547 	SPH_ENTER_METHOD();
2548 
2549 	// cleanup
2550 	if ( m_dFields )
2551 		for ( int i=0; i<(int)m_iFields; i++ )
2552 			SafeDeleteArray ( m_dFields[i] );
2553 	SafeDeleteArray ( m_dFields );
2554 
2555 	// unpack network packet
2556 	uint32 uStatus = UnpackDword ();
2557 	char * sMessage = NULL;
2558 
2559 	if ( uStatus!=SEARCHD_OK )
2560 	{
2561 		sMessage = UnpackString ();
2562 		CSphSEThreadTable * pTable = GetTls ();
2563 		if ( pTable )
2564 		{
2565 			strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
2566 			pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
2567 		}
2568 
2569 		if ( uStatus==SEARCHD_ERROR )
2570 		{
2571 			char sError[1024];
2572 			my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
2573 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2574 			SafeDeleteArray ( sMessage );
2575 			SPH_RET ( false );
2576 		}
2577 	}
2578 
2579 	m_iFields = UnpackDword ();
2580 	m_dFields = new char * [ m_iFields ];
2581 	if ( !m_dFields )
2582 	{
2583 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
2584 		SPH_RET(false);
2585 	}
2586 
2587 	for ( uint32 i=0; i<m_iFields; i++ )
2588 		m_dFields[i] = UnpackString ();
2589 
2590 	SafeDeleteArray ( m_dAttrs );
2591 	m_iAttrs = UnpackDword ();
2592 	m_dAttrs = new CSphSEAttr [ m_iAttrs ];
2593 	if ( !m_dAttrs )
2594 	{
2595 		for ( int i=0; i<(int)m_iFields; i++ )
2596 			SafeDeleteArray ( m_dFields[i] );
2597 		SafeDeleteArray ( m_dFields );
2598 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
2599 		SPH_RET(false);
2600 	}
2601 
2602 	for ( uint32 i=0; i<m_iAttrs; i++ )
2603 	{
2604 		m_dAttrs[i].m_sName = UnpackString ();
2605 		m_dAttrs[i].m_uType = UnpackDword ();
2606 		if ( m_bUnpackError ) // m_sName may be null
2607 			break;
2608 
2609 		m_dAttrs[i].m_iField = -1;
2610 		for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
2611 		{
2612 			const char * sTableField = m_pShare->m_sTableField[j];
2613 			const char * sAttrField = m_dAttrs[i].m_sName;
2614 			if ( m_dAttrs[i].m_sName[0]=='@' )
2615 			{
2616 				const char * sAtPrefix = "_sph_";
2617 				if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
2618 					continue;
2619 				sTableField += strlen(sAtPrefix);
2620 				sAttrField++;
2621 			}
2622 
2623 			if ( !strcasecmp ( sAttrField, sTableField ) )
2624 			{
2625 				// we're almost good, but
2626 				// let's enforce that timestamp columns can only receive timestamp attributes
2627 				if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
2628 					m_dAttrs[i].m_iField = j;
2629 				break;
2630 			}
2631 		}
2632 	}
2633 
2634 	m_iMatchesTotal = UnpackDword ();
2635 
2636 	m_bId64 = UnpackDword ();
2637 	if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
2638 	{
2639 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
2640 		SPH_RET(false);
2641 	}
2642 
2643 	// network packet unpacked; build unbound fields map
2644 	SafeDeleteArray ( m_dUnboundFields );
2645 	m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
2646 
2647 	for ( int i=0; i<m_pShare->m_iTableFields; i++ )
2648 	{
2649 		if ( i<SPHINXSE_SYSTEM_COLUMNS )
2650 			m_dUnboundFields[i] = SPH_ATTR_NONE;
2651 
2652 		else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
2653 			m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
2654 
2655 		else
2656 			m_dUnboundFields[i] = SPH_ATTR_INTEGER;
2657 	}
2658 
2659 	for ( uint32 i=0; i<m_iAttrs; i++ )
2660 		if ( m_dAttrs[i].m_iField>=0 )
2661 			m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
2662 
2663 	if ( m_bUnpackError )
2664 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
2665 
2666 	SPH_RET ( !m_bUnpackError );
2667 }
2668 
2669 
UnpackStats(CSphSEStats * pStats)2670 bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
2671 {
2672 	assert ( pStats );
2673 
2674 	char * pCurSave = m_pCur;
2675 	for ( uint i=0; i<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2676 	{
2677 		m_pCur += m_bId64 ? 12 : 8; // skip id+weight
2678 		for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2679 		{
2680 			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
2681 			{
2682 				// skip MVA list
2683 				uint32 uCount = UnpackDword ();
2684 				m_pCur += uCount*4;
2685 			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
2686 			{
2687 				uint32 iLen = UnpackDword();
2688 				m_pCur += iLen;
2689 			} else // skip normal value
2690 				m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
2691 		}
2692 	}
2693 
2694 	pStats->m_iMatchesTotal = UnpackDword ();
2695 	pStats->m_iMatchesFound = UnpackDword ();
2696 	pStats->m_iQueryMsec = UnpackDword ();
2697 	pStats->m_iWords = UnpackDword ();
2698 
2699 	if ( m_bUnpackError )
2700 		return false;
2701 
2702 	SafeDeleteArray ( pStats->m_dWords );
2703 	if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
2704 		return false;
2705 	pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
2706 	if ( !pStats->m_dWords )
2707 		return false;
2708 
2709 	for ( int i=0; i<pStats->m_iWords; i++ )
2710 	{
2711 		CSphSEWordStats & tWord = pStats->m_dWords[i];
2712 		tWord.m_sWord = UnpackString ();
2713 		tWord.m_iDocs = UnpackDword ();
2714 		tWord.m_iHits = UnpackDword ();
2715 	}
2716 
2717 	if ( m_bUnpackError )
2718 		return false;
2719 
2720 	m_pCur = pCurSave;
2721 	return true;
2722 }
2723 
2724 
2725 /// condition pushdown implementation, to properly intercept WHERE clauses on my columns
2726 #if MYSQL_VERSION_ID<50610
cond_push(const COND * cond)2727 const COND * ha_sphinx::cond_push ( const COND * cond )
2728 #else
2729 const Item * ha_sphinx::cond_push ( const Item *cond )
2730 #endif
2731 {
2732 	// catch the simplest case: query_column="some text"
2733 	for ( ;; )
2734 	{
2735 		if ( cond->type()!=Item::FUNC_ITEM )
2736 			break;
2737 
2738 		Item_func * condf = (Item_func *)cond;
2739 		if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
2740 			break;
2741 
2742 		// get my tls
2743 		CSphSEThreadTable * pTable = GetTls ();
2744 		if ( !pTable )
2745 			break;
2746 
2747 		Item ** args = condf->arguments();
2748 		if ( !m_pShare->m_bSphinxQL )
2749 		{
2750 			// on non-QL tables, intercept query=value condition for SELECT
2751 			if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::STRING_ITEM ))
2752 				break;
2753 
2754 			Item_field * pField = (Item_field *) args[0];
2755 			if ( pField->field->field_index!=2 ) // FIXME! magic key index
2756 				break;
2757 
2758 			// copy the query, and let know that we intercepted this condition
2759 			Item_string * pString = (Item_string *) args[1];
2760 			pTable->m_bQuery = true;
2761 			strncpy ( pTable->m_sQuery, pString->str_value.c_ptr(), sizeof(pTable->m_sQuery) );
2762 			pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
2763 			pTable->m_pQueryCharset = pString->str_value.charset();
2764 
2765 		} else
2766 		{
2767 			if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::INT_ITEM ))
2768 				break;
2769 
2770 			// on QL tables, intercept id=value condition for DELETE
2771 			Item_field * pField = (Item_field *) args[0];
2772 			if ( pField->field->field_index!=0 ) // FIXME! magic key index
2773 				break;
2774 
2775 			Item_int * pVal = (Item_int *) args[1];
2776 			pTable->m_iCondId = pVal->val_int();
2777 			pTable->m_bCondId = true;
2778 		}
2779 
2780 		// we intercepted this condition
2781 		return NULL;
2782 	}
2783 
2784 	// don't change anything
2785 	return cond;
2786 }
2787 
2788 
2789 /// condition popup
cond_pop()2790 void ha_sphinx::cond_pop ()
2791 {
2792 	CSphSEThreadTable * pTable = GetTls ();
2793 	if ( pTable )
2794 		pTable->m_bQuery = false;
2795 }
2796 
2797 
2798 /// get TLS (maybe allocate it, too)
GetTls()2799 CSphSEThreadTable * ha_sphinx::GetTls()
2800 {
2801 	SPH_ENTER_METHOD()
2802 	// where do we store that pointer in today's version?
2803 	CSphTLS ** ppTls;
2804 #if MYSQL_VERSION_ID>50100
2805 	ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
2806 #else
2807 	ppTls = (CSphTLS**) &current_thd->ha_data[sphinx_hton.slot];
2808 #endif // >50100
2809 
2810 	CSphSEThreadTable * pTable = NULL;
2811 	// allocate if needed
2812 	if ( !*ppTls )
2813 	{
2814 		*ppTls = new CSphTLS ( this );
2815 		pTable = (*ppTls)->m_pHeadTable;
2816 	} else
2817 	{
2818 		pTable = (*ppTls)->m_pHeadTable;
2819 	}
2820 
2821 	while ( pTable && pTable->m_pHandler!=this )
2822 		pTable = pTable->m_pTableNext;
2823 
2824 	if ( !pTable )
2825 	{
2826 		pTable = new CSphSEThreadTable ( this );
2827 		pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
2828 		(*ppTls)->m_pHeadTable = pTable;
2829 	}
2830 
2831 	// errors will be handled by caller
2832 	return pTable;
2833 }
2834 
2835 
2836 // Positions an index cursor to the index specified in the handle. Fetches the
2837 // row if available. If the key value is null, begin at the first key of the
2838 // index.
index_read(byte * buf,const byte * key,uint key_len,enum ha_rkey_function)2839 int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
2840 {
2841 	SPH_ENTER_METHOD();
2842 	char sError[256];
2843 
2844 	// set new data for thd->ha_data, it is used in show_status
2845 	CSphSEThreadTable * pTable = GetTls();
2846 	if ( !pTable )
2847 	{
2848 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
2849 		SPH_RET ( HA_ERR_END_OF_FILE );
2850 	}
2851 	pTable->m_tStats.Reset ();
2852 
2853 	// sphinxql table, just return the key once
2854 	if ( m_pShare->m_bSphinxQL )
2855 	{
2856 		// over and out
2857 		if ( pTable->m_bCondDone )
2858 			SPH_RET ( HA_ERR_END_OF_FILE );
2859 
2860 		// return a value from pushdown, if any
2861 		if ( pTable->m_bCondId )
2862 		{
2863 			table->field[0]->store ( pTable->m_iCondId, 1 );
2864 			pTable->m_bCondDone = true;
2865 			SPH_RET(0);
2866 		}
2867 
2868 		// return a value from key
2869 		longlong iRef = 0;
2870 		if ( key_len==4 )
2871 			iRef = uint4korr ( key );
2872 		else if ( key_len==8 )
2873 			iRef = uint8korr ( key );
2874 		else
2875 		{
2876 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
2877 			SPH_RET ( HA_ERR_END_OF_FILE );
2878 		}
2879 
2880 		table->field[0]->store ( iRef, 1 );
2881 		pTable->m_bCondDone = true;
2882 		SPH_RET(0);
2883 	}
2884 
2885 	// parse query
2886 	if ( pTable->m_bQuery )
2887 	{
2888 		// we have a query from condition pushdown
2889 		m_pCurrentKey = (const byte *) pTable->m_sQuery;
2890 		m_iCurrentKeyLen = strlen(pTable->m_sQuery);
2891 	} else
2892 	{
2893 		// just use the key (might be truncated)
2894 		m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
2895 		m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
2896 		pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
2897 	}
2898 
2899 	CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
2900 	if ( !q.Parse () )
2901 	{
2902 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
2903 		SPH_RET ( HA_ERR_END_OF_FILE );
2904 	}
2905 
2906 	// do connect
2907 	int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
2908 	if ( iSocket<0 )
2909 		SPH_RET ( HA_ERR_END_OF_FILE );
2910 
2911 	// my buffer
2912 	char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
2913 	int iReqLen = q.BuildRequest ( &pBuffer );
2914 
2915 	if ( iReqLen<=0 )
2916 	{
2917 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
2918 		SPH_RET ( HA_ERR_END_OF_FILE );
2919 	}
2920 
2921 	// send request
2922 	::send ( iSocket, pBuffer, iReqLen, 0 );
2923 
2924 	// receive reply
2925 	char sHeader[8];
2926 	int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
2927 	if ( iGot!=sizeof(sHeader) )
2928 	{
2929 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
2930 		SPH_RET ( HA_ERR_END_OF_FILE );
2931 	}
2932 
2933 	short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
2934 	short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
2935 	uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
2936 	SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
2937 		uRespStatus, uRespVersion, uRespLength );
2938 
2939 	SafeDeleteArray ( m_pResponse );
2940 	if ( uRespLength<=SPHINXSE_MAX_ALLOC )
2941 		m_pResponse = new char [ uRespLength+1 ];
2942 
2943 	if ( !m_pResponse )
2944 	{
2945 		my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
2946 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2947 		SPH_RET ( HA_ERR_END_OF_FILE );
2948 	}
2949 
2950 	int iRecvLength = 0;
2951 	while ( iRecvLength<(int)uRespLength )
2952 	{
2953 		int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
2954 		if ( iRecv<0 )
2955 			break;
2956 		iRecvLength += iRecv;
2957 	}
2958 
2959 	::closesocket ( iSocket );
2960 	iSocket = -1;
2961 
2962 	if ( iRecvLength!=(int)uRespLength )
2963 	{
2964 		my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
2965 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2966 		SPH_RET ( HA_ERR_END_OF_FILE );
2967 	}
2968 
2969 	// we'll have a message, at least
2970 	pTable->m_bStats = true;
2971 
2972 	// parse reply
2973 	m_iCurrentPos = 0;
2974 	m_pCur = m_pResponse;
2975 	m_pResponseEnd = m_pResponse + uRespLength;
2976 	m_bUnpackError = false;
2977 
2978 	if ( uRespStatus!=SEARCHD_OK )
2979 	{
2980 		char * sMessage = UnpackString ();
2981 		if ( !sMessage )
2982 		{
2983 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
2984 				uRespStatus, uRespLength );
2985 			SPH_RET ( HA_ERR_END_OF_FILE );
2986 		}
2987 
2988 		strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
2989 		SafeDeleteArray ( sMessage );
2990 
2991 		if ( uRespStatus!=SEARCHD_WARNING )
2992 		{
2993 			my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
2994 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2995 
2996 			pTable->m_tStats.m_bLastError = true;
2997 			SPH_RET ( HA_ERR_END_OF_FILE );
2998 		}
2999 	}
3000 
3001 	if ( !UnpackSchema () )
3002 		SPH_RET ( HA_ERR_END_OF_FILE );
3003 
3004 	if ( !UnpackStats ( &pTable->m_tStats ) )
3005 	{
3006 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
3007 		SPH_RET ( HA_ERR_END_OF_FILE );
3008 	}
3009 
3010 	SPH_RET ( get_rec ( buf, key, key_len ) );
3011 }
3012 
3013 
// Positions an index cursor to the index specified in key. Fetches the
// row if any. This is only used to read whole keys.
// Not supported by SphinxSE: always fails with HA_ERR_WRONG_COMMAND.
int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3021 
3022 
3023 // Used to read forward through the index.
index_next(byte * buf)3024 int ha_sphinx::index_next ( byte * buf )
3025 {
3026 	SPH_ENTER_METHOD();
3027 	SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
3028 }
3029 
3030 
index_next_same(byte * buf,const byte * key,uint keylen)3031 int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
3032 {
3033 	SPH_ENTER_METHOD();
3034 	SPH_RET ( get_rec ( buf, key, keylen ) );
3035 }
3036 
// printf-style conversion for signed 64-bit values (used below, e.g. for
// 64-bit MVA entries); falls back to "lld" when the platform headers do not
// define PRIi64
#ifndef PRIi64
#define PRIi64 "lld"
#endif

#define INT64_FMT "%" PRIi64
3042 
get_rec(byte * buf,const byte *,uint)3043 int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
3044 {
3045 	SPH_ENTER_METHOD();
3046 
3047 	if ( m_iCurrentPos>=m_iMatchesTotal )
3048 	{
3049 		SafeDeleteArray ( m_pResponse );
3050 		SPH_RET ( HA_ERR_END_OF_FILE );
3051 	}
3052 
3053 	#if MYSQL_VERSION_ID>50100
3054 	my_bitmap_map * org_bitmap = dbug_tmp_use_all_columns ( table, table->write_set );
3055 	#endif
3056 	Field ** field = table->field;
3057 
3058 	// unpack and return the match
3059 	longlong uMatchID = UnpackDword ();
3060 	if ( m_bId64 )
3061 		uMatchID = ( uMatchID<<32 ) + UnpackDword();
3062 	uint32 uMatchWeight = UnpackDword ();
3063 
3064 	field[0]->store ( uMatchID, 1 );
3065 	field[1]->store ( uMatchWeight, 1 );
3066 	field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );
3067 
3068 	for ( uint32 i=0; i<m_iAttrs; i++ )
3069 	{
3070 		longlong iValue64 = 0;
3071 		uint32 uValue = UnpackDword ();
3072 		if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
3073 			iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
3074 		if ( m_dAttrs[i].m_iField<0 )
3075 		{
3076 			// skip MVA or String
3077 			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
3078 			{
3079 				for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3080 					UnpackDword();
3081 			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
3082 			{
3083 				m_pCur += uValue;
3084 			}
3085 			continue;
3086 		}
3087 
3088 		Field * af = field [ m_dAttrs[i].m_iField ];
3089 		switch ( m_dAttrs[i].m_uType )
3090 		{
3091 			case SPH_ATTR_INTEGER:
3092 			case SPH_ATTR_ORDINAL:
3093 			case SPH_ATTR_BOOL:
3094 				af->store ( uValue, 1 );
3095 				break;
3096 
3097 			case SPH_ATTR_FLOAT:
3098 				af->store ( sphDW2F(uValue) );
3099 				break;
3100 
3101 			case SPH_ATTR_TIMESTAMP:
3102 				if ( af->type()==MYSQL_TYPE_TIMESTAMP )
3103 					longstore ( af->ptr, uValue ); // because store() does not accept timestamps
3104 				else
3105 					af->store ( uValue, 1 );
3106 				break;
3107 
3108 			case SPH_ATTR_BIGINT:
3109 				af->store ( iValue64, 0 );
3110 				break;
3111 
3112 			case SPH_ATTR_STRING:
3113 				if ( !uValue )
3114 					af->store ( "", 0, &my_charset_bin );
3115 				else if ( CheckResponcePtr ( uValue ) )
3116 				{
3117 					af->store ( m_pCur, uValue, &my_charset_bin );
3118 					m_pCur += uValue;
3119 				}
3120 				break;
3121 
3122 			case SPH_ATTR_UINT64SET:
3123 			case SPH_ATTR_UINT32SET :
3124 				if ( uValue<=0 )
3125 				{
3126 					// shortcut, empty MVA set
3127 					af->store ( "", 0, &my_charset_bin );
3128 
3129 				} else
3130 				{
3131 					// convert MVA set to comma-separated string
3132 					char sBuf[1024]; // FIXME! magic size
3133 					char * pCur = sBuf;
3134 
3135 					if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
3136 					{
3137 						for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3138 						{
3139 							uint32 uEntry = UnpackDword ();
3140 							if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
3141 							{
3142 								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
3143 								while ( *pCur ) pCur++;
3144 								if ( uValue>1 )
3145 									*pCur++ = ','; // non-trailing commas
3146 							}
3147 						}
3148 					} else
3149 					{
3150 						for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
3151 						{
3152 							longlong uEntry = UnpackDword ();
3153 							uEntry = uEntry<<32;
3154 							uEntry += UnpackDword();
3155 							if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
3156 							{
3157 								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, INT64_FMT, uEntry );
3158 								while ( *pCur ) pCur++;
3159 								if ( uValue>2 )
3160 									*pCur++ = ','; // non-trailing commas
3161 							}
3162 						}
3163 					}
3164 
3165 					af->store ( sBuf, pCur-sBuf, &my_charset_bin );
3166 				}
3167 				break;
3168 
3169 			default:
3170 				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
3171 				SafeDeleteArray ( m_pResponse );
3172 				SPH_RET ( HA_ERR_END_OF_FILE );
3173 		}
3174 	}
3175 
3176 	if ( m_bUnpackError )
3177 	{
3178 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
3179 		SafeDeleteArray ( m_pResponse );
3180 		SPH_RET ( HA_ERR_END_OF_FILE );
3181 	}
3182 
3183 	// zero out unmapped fields
3184 	for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
3185 		if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
3186 			switch ( m_dUnboundFields[i] )
3187 	{
3188 		case SPH_ATTR_INTEGER:		table->field[i]->store ( 0, 1 ); break;
3189 		case SPH_ATTR_TIMESTAMP:	longstore ( table->field[i]->ptr, 0 ); break;
3190 		default:
3191 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
3192 				"INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
3193 			SafeDeleteArray ( m_pResponse );
3194 			SPH_RET ( HA_ERR_END_OF_FILE );
3195 	}
3196 
3197 	memset ( buf, 0, table->s->null_bytes );
3198 	m_iCurrentPos++;
3199 
3200 	#if MYSQL_VERSION_ID > 50100
3201 	dbug_tmp_restore_column_map ( table->write_set, org_bitmap );
3202 	#endif
3203 
3204 	SPH_RET(0);
3205 }
3206 
3207 
3208 // Used to read backwards through the index.
index_prev(byte *)3209 int ha_sphinx::index_prev ( byte * )
3210 {
3211 	SPH_ENTER_METHOD();
3212 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3213 }
3214 
3215 
3216 // index_first() asks for the first key in the index.
3217 //
3218 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3219 // and sql_select.cc.
index_first(byte *)3220 int ha_sphinx::index_first ( byte * )
3221 {
3222 	SPH_ENTER_METHOD();
3223 	SPH_RET ( HA_ERR_END_OF_FILE );
3224 }
3225 
3226 // index_last() asks for the last key in the index.
3227 //
3228 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3229 // and sql_select.cc.
index_last(byte *)3230 int ha_sphinx::index_last ( byte * )
3231 {
3232 	SPH_ENTER_METHOD();
3233 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3234 }
3235 
3236 
rnd_init(bool)3237 int ha_sphinx::rnd_init ( bool )
3238 {
3239 	SPH_ENTER_METHOD();
3240 	SPH_RET(0);
3241 }
3242 
3243 
rnd_end()3244 int ha_sphinx::rnd_end()
3245 {
3246 	SPH_ENTER_METHOD();
3247 	SPH_RET(0);
3248 }
3249 
3250 
rnd_next(byte *)3251 int ha_sphinx::rnd_next ( byte * )
3252 {
3253 	SPH_ENTER_METHOD();
3254 	SPH_RET ( HA_ERR_END_OF_FILE );
3255 }
3256 
3257 
position(const byte *)3258 void ha_sphinx::position ( const byte * )
3259 {
3260 	SPH_ENTER_METHOD();
3261 	SPH_VOID_RET();
3262 }
3263 
3264 
3265 // This is like rnd_next, but you are given a position to use
3266 // to determine the row. The position will be of the type that you stored in
3267 // ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
3268 // or position you saved when position() was called.
3269 // Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
rnd_pos(byte *,byte *)3270 int ha_sphinx::rnd_pos ( byte *, byte * )
3271 {
3272 	SPH_ENTER_METHOD();
3273 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3274 }
3275 
3276 
3277 #if MYSQL_VERSION_ID>=50030
info(uint)3278 int ha_sphinx::info ( uint )
3279 #else
3280 void ha_sphinx::info ( uint )
3281 #endif
3282 {
3283 	SPH_ENTER_METHOD();
3284 
3285 	if ( table->s->keys>0 )
3286 		table->key_info[0].rec_per_key[0] = 1;
3287 
3288 	#if MYSQL_VERSION_ID>50100
3289 	stats.records = 20;
3290 	#else
3291 	records = 20;
3292 	#endif
3293 
3294 #if MYSQL_VERSION_ID>=50030
3295 	SPH_RET(0);
3296 #else
3297 	SPH_VOID_RET();
3298 #endif
3299 }
3300 
3301 
reset()3302 int ha_sphinx::reset ()
3303 {
3304 	SPH_ENTER_METHOD();
3305 	CSphSEThreadTable * pTable = GetTls ();
3306 	if ( pTable )
3307 		pTable->m_bQuery = false;
3308 	SPH_RET(0);
3309 }
3310 
3311 
delete_all_rows()3312 int ha_sphinx::delete_all_rows()
3313 {
3314 	SPH_ENTER_METHOD();
3315 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3316 }
3317 
3318 
// First you should go read the section "locking functions for mysql" in
// lock.cc to understand this.
// This creates a lock on the table. If you are implementing a storage engine
// that can handle transactions, look at ha_berkeley.cc to see how you will
// want to go about doing this. Otherwise you should consider calling flock()
// here.
//
// Called from lock.cc by lock_external() and unlock_external(). Also called
// from sql_table.cc by copy_data_between_tables().
external_lock(THD *,int)3328 int ha_sphinx::external_lock ( THD *, int )
3329 {
3330 	SPH_ENTER_METHOD();
3331 	SPH_RET(0);
3332 }
3333 
3334 
store_lock(THD *,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3335 THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
3336 	enum thr_lock_type lock_type )
3337 {
3338 	SPH_ENTER_METHOD();
3339 
3340 	if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
3341 		m_tLock.type = lock_type;
3342 
3343 	*to++ = &m_tLock;
3344 	SPH_RET(to);
3345 }
3346 
3347 
delete_table(const char *)3348 int ha_sphinx::delete_table ( const char * )
3349 {
3350 	SPH_ENTER_METHOD();
3351 	SPH_RET(0);
3352 }
3353 
3354 
// Renames a table from one name to another from an ALTER TABLE call.
//
// If you do not implement this, the default rename_table() is called from
// handler.cc and it will delete all files with the file extensions returned
// by bas_ext().
//
// Called from sql_table.cc by mysql_rename_table().
rename_table(const char *,const char *)3362 int ha_sphinx::rename_table ( const char *, const char * )
3363 {
3364 	SPH_ENTER_METHOD();
3365 	SPH_RET(0);
3366 }
3367 
3368 
// Given a starting key and an ending key, estimate the number of rows that
// will exist between the two. end_key may be empty, in which case determine
// whether start_key matches any rows.
//
// Called from opt_range.cc by check_quick_keys().
records_in_range(uint,key_range *,key_range *)3374 ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
3375 {
3376 	SPH_ENTER_METHOD();
3377 	SPH_RET(3); // low number to force index usage
3378 }
3379 
#if MYSQL_VERSION_ID < 50610
// Before 5.6.10 the KEY member is named `key_parts`; alias the newer name so the
// index checks in create() (which use `user_defined_key_parts`) compile on both.
#define user_defined_key_parts key_parts
#endif
3383 
3384 // create() is called to create a database. The variable name will have the name
3385 // of the table. When create() is called you do not need to worry about opening
3386 // the table. Also, the FRM file will have already been created so adjusting
3387 // create_info will not do you any good. You can overwrite the frm file at this
3388 // point if you wish to change the table definition, but there are no methods
3389 // currently provided for doing that.
3390 //
3391 // Called from handle.cc by ha_create_table().
create(const char * name,TABLE * table,HA_CREATE_INFO *)3392 int ha_sphinx::create ( const char * name, TABLE * table, HA_CREATE_INFO * )
3393 {
3394 	SPH_ENTER_METHOD();
3395 	char sError[256];
3396 
3397 	CSphSEShare tInfo;
3398 	if ( !ParseUrl ( &tInfo, table, true ) )
3399 		SPH_RET(-1);
3400 
3401 	// check SphinxAPI table
3402 	for ( ; !tInfo.m_bSphinxQL; )
3403 	{
3404 		// check system fields (count and types)
3405 		if ( table->s->fields<SPHINXSE_SYSTEM_COLUMNS )
3406 		{
3407 			my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
3408 				name, SPHINXSE_SYSTEM_COLUMNS );
3409 			break;
3410 		}
3411 
3412 		if ( !IsIDField ( table->field[0] ) )
3413 		{
3414 			my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
3415 			break;
3416 		}
3417 
3418 		if ( !IsIntegerFieldType ( table->field[1]->type() ) )
3419 		{
3420 			my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
3421 			break;
3422 		}
3423 
3424 		enum_field_types f2 = table->field[2]->type();
3425 		if ( f2!=MYSQL_TYPE_VARCHAR
3426 			&& f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
3427 		{
3428 			my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
3429 			break;
3430 		}
3431 
3432 		// check attributes
3433 		int i;
3434 		for ( i=3; i<(int)table->s->fields; i++ )
3435 		{
3436 			enum_field_types eType = table->field[i]->type();
3437 			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3438 			{
3439 				my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
3440 					name, i+1, table->field[i]->field_name );
3441 				break;
3442 			}
3443 		}
3444 
3445 		if ( i!=(int)table->s->fields )
3446 			break;
3447 
3448 		// check index
3449 		if (
3450 			table->s->keys!=1 ||
3451 			table->key_info[0].user_defined_key_parts!=1 ||
3452 			strcasecmp ( table->key_info[0].key_part[0].field->field_name, table->field[2]->field_name ) )
3453 		{
3454 			my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
3455 				name, table->field[2]->field_name );
3456 			break;
3457 		}
3458 
3459 		// all good
3460 		sError[0] = '\0';
3461 		break;
3462 	}
3463 
3464 	// check SphinxQL table
3465 	for ( ; tInfo.m_bSphinxQL; )
3466 	{
3467 		sError[0] = '\0';
3468 
3469 		// check that 1st column is id, is of int type, and has an index
3470 		if ( strcmp ( table->field[0]->field_name, "id" ) )
3471 		{
3472 			my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
3473 			break;
3474 		}
3475 
3476 		if ( !IsIDField ( table->field[0] ) )
3477 		{
3478 			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
3479 			break;
3480 		}
3481 
3482 		// check index
3483 		if (
3484 			table->s->keys!=1 ||
3485 			table->key_info[0].user_defined_key_parts!=1 ||
3486 			strcasecmp ( table->key_info[0].key_part[0].field->field_name, "id" ) )
3487 		{
3488 			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
3489 			break;
3490 		}
3491 
3492 		// check column types
3493 		for ( int i=1; i<(int)table->s->fields; i++ )
3494 		{
3495 			enum_field_types eType = table->field[i]->type();
3496 			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3497 			{
3498 				my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
3499 					name, i+1, table->field[i]->field_name );
3500 				break;
3501 			}
3502 		}
3503 		if ( sError[0] )
3504 			break;
3505 
3506 		// all good
3507 		break;
3508 	}
3509 
3510 	// report and bail
3511 	if ( sError[0] )
3512 	{
3513 		my_error ( ER_CANT_CREATE_TABLE, MYF(0), sError, -1 );
3514 		SPH_RET(-1);
3515 	}
3516 
3517 	SPH_RET(0);
3518 }
3519 
3520 // show functions
3521 
#if MYSQL_VERSION_ID<50100
// Fallback for pre-5.1 servers (presumably their headers lack this constant):
// the size assumed below for the scratch buffer passed to the SHOW STATUS callbacks.
#define SHOW_VAR_FUNC_BUFF_SIZE 1024
#endif
3525 
sphinx_get_stats(THD * thd,SHOW_VAR * out)3526 CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
3527 {
3528 #if MYSQL_VERSION_ID>50100
3529 	if ( sphinx_hton_ptr )
3530 	{
3531 		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3532 
3533 		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3534 			return &pTls->m_pHeadTable->m_tStats;
3535 	}
3536 #else
3537 	CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3538 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3539 		return &pTls->m_pHeadTable->m_tStats;
3540 #endif
3541 
3542 	out->type = SHOW_CHAR;
3543 	out->value = "";
3544 	return 0;
3545 }
3546 
sphinx_showfunc_total(THD * thd,SHOW_VAR * out,char *)3547 int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
3548 {
3549 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3550 	if ( pStats )
3551 	{
3552 		out->type = SHOW_INT;
3553 		out->value = (char *) &pStats->m_iMatchesTotal;
3554 	}
3555 	return 0;
3556 }
3557 
sphinx_showfunc_total_found(THD * thd,SHOW_VAR * out,char *)3558 int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
3559 {
3560 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3561 	if ( pStats )
3562 	{
3563 		out->type = SHOW_INT;
3564 		out->value = (char *) &pStats->m_iMatchesFound;
3565 	}
3566 	return 0;
3567 }
3568 
sphinx_showfunc_time(THD * thd,SHOW_VAR * out,char *)3569 int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
3570 {
3571 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3572 	if ( pStats )
3573 	{
3574 		out->type = SHOW_INT;
3575 		out->value = (char *) &pStats->m_iQueryMsec;
3576 	}
3577 	return 0;
3578 }
3579 
sphinx_showfunc_word_count(THD * thd,SHOW_VAR * out,char *)3580 int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
3581 {
3582 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3583 	if ( pStats )
3584 	{
3585 		out->type = SHOW_INT;
3586 		out->value = (char *) &pStats->m_iWords;
3587 	}
3588 	return 0;
3589 }
3590 
sphinx_showfunc_words(THD * thd,SHOW_VAR * out,char * sBuffer)3591 int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
3592 {
3593 #if MYSQL_VERSION_ID>50100
3594 	if ( sphinx_hton_ptr )
3595 	{
3596 		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3597 #else
3598 	{
3599 		CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3600 #endif
3601 		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3602 		{
3603 			CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
3604 			if ( pStats && pStats->m_iWords )
3605 			{
3606 				uint uBuffLen = 0;
3607 
3608 				out->type = SHOW_CHAR;
3609 				out->value = sBuffer;
3610 
3611 				// the following is partially based on code in sphinx_show_status()
3612 				sBuffer[0] = 0;
3613 				for ( int i=0; i<pStats->m_iWords; i++ )
3614 				{
3615 					CSphSEWordStats & tWord = pStats->m_dWords[i];
3616 					uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
3617 						tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
3618 				}
3619 
3620 				if ( uBuffLen > 0 )
3621 				{
3622 					// trim last space
3623 					sBuffer [ --uBuffLen ] = 0;
3624 
3625 					if ( pTls->m_pHeadTable->m_pQueryCharset )
3626 					{
3627 						// String::c_ptr() will nul-terminate the buffer.
3628 						//
3629 						// NOTE: It's not entirely clear whether this conversion is necessary at all.
3630 
3631 						String sConvert;
3632 						uint iErrors;
3633 						sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
3634 						memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
3635 					}
3636 				}
3637 
3638 				return 0;
3639 			}
3640 		}
3641 	}
3642 
3643 	out->type = SHOW_CHAR;
3644 	out->value = "";
3645 	return 0;
3646 }
3647 
3648 int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
3649 {
3650 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3651 	if ( pStats && pStats->m_bLastError )
3652 	{
3653 		out->type = SHOW_CHAR;
3654 		out->value = pStats->m_sLastMessage;
3655 	}
3656 	return 0;
3657 }
3658 
#if MYSQL_VERSION_ID>50100
// Storage engine descriptor for the plugin API (only compiled for servers newer than 5.1.0).
struct st_mysql_storage_engine sphinx_storage_engine =
{
	MYSQL_HANDLERTON_INTERFACE_VERSION
};

// SHOW STATUS variables. Each value is computed on demand by the matching
// sphinx_showfunc_*() callback (SHOW_FUNC). The list ends with an all-zero entry.
struct st_mysql_show_var sphinx_status_vars[] =
{
	{"sphinx_total",		(char *)sphinx_showfunc_total,			SHOW_FUNC},
	{"sphinx_total_found",	(char *)sphinx_showfunc_total_found,	SHOW_FUNC},
	{"sphinx_time",			(char *)sphinx_showfunc_time,			SHOW_FUNC},
	{"sphinx_word_count",	(char *)sphinx_showfunc_word_count,		SHOW_FUNC},
	{"sphinx_words",		(char *)sphinx_showfunc_words,			SHOW_FUNC},
	{"sphinx_error",		(char *)sphinx_showfunc_error,			SHOW_FUNC},
	{0, 0, (enum_mysql_show_type)0}
};


// Plugin library declaration: registers SphinxSE as a storage engine plugin, with
// sphinx_init_func/sphinx_done_func as its init/deinit hooks and sphinx_status_vars as its
// status variables. NOTE(review): the two trailing NULLs presumably fill the system-variables
// slot and a version-dependent reserved/flags slot; confirm against the target server's
// st_mysql_plugin.
mysql_declare_plugin(sphinx)
{
	MYSQL_STORAGE_ENGINE_PLUGIN,
	&sphinx_storage_engine,
	sphinx_hton_name,
	"Sphinx developers",
	sphinx_hton_comment,
	PLUGIN_LICENSE_GPL,
	sphinx_init_func, // Plugin Init
	sphinx_done_func, // Plugin Deinit
	0x0001, // 0.1
	sphinx_status_vars,
	NULL,
	NULL
}
mysql_declare_plugin_end;

#endif // >50100
3695 
3696 //
3697 // $Id$
3698 //
3699