1 //
2 // $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
3 //
4 
5 //
6 // Copyright (c) 2001-2014, Andrew Aksyonoff
7 // Copyright (c) 2008-2014, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15 
16 #ifdef USE_PRAGMA_IMPLEMENTATION
17 #pragma implementation // gcc: Class implementation
18 #endif
19 
20 #if defined(_MSC_VER) && _MSC_VER>=1400
21 #define _CRT_SECURE_NO_DEPRECATE 1
22 #define _CRT_NONSTDC_NO_DEPRECATE 1
23 #endif
24 
25 #include <my_global.h>
26 #include <mysql_version.h>
27 
28 #if MYSQL_VERSION_ID>=50515
29 #include "sql_class.h"
30 #include "sql_array.h"
31 #elif MYSQL_VERSION_ID>50100
32 #include "mysql_priv.h"
33 #include <mysql/plugin.h>
34 #else
35 #include "../mysql_priv.h"
36 #endif
37 
38 #include <mysys_err.h>
39 #include <my_sys.h>
40 #include <mysql.h> // include client for INSERT table (sort of redoing federated..)
41 
42 #ifndef __WIN__
43 	// UNIX-specific
44 	#include <my_net.h>
45 	#include <netdb.h>
46 	#include <sys/un.h>
47 
48 	#define	RECV_FLAGS	MSG_WAITALL
49 
50 	#define sphSockClose(_sock)	::close(_sock)
51 #else
52 	// Windows-specific
53 	#include <io.h>
54 	#define snprintf	_snprintf
55 
56 	#define	RECV_FLAGS	0
57 
58 	#define sphSockClose(_sock)	::closesocket(_sock)
59 #endif
60 
61 #include <ctype.h>
62 #include "ha_sphinx.h"
63 
64 #ifndef MSG_WAITALL
65 #define MSG_WAITALL 0
66 #endif
67 
68 #if defined(_MSC_VER) && _MSC_VER>=1400
69 #pragma warning(push,4)
70 #endif
71 
72 /////////////////////////////////////////////////////////////////////////////
73 
/// portable minimum; min() reportedly has issues on some platforms (eg. Gentoo)
/// NOTE: a macro, so each argument may be evaluated more than once
#define Min(a,b) ( (a)<(b) ? (a) : (b) )

/// unaligned RAM accesses are forbidden on SPARC, so byte-wise helpers are used there
#if !defined(sparc) && !defined(__sparc__)
#define UNALIGNED_RAM_ACCESS 1
#else
#define UNALIGNED_RAM_ACCESS 0
#endif
83 
84 
85 #if UNALIGNED_RAM_ACCESS
86 
/// plain read: this platform allows unaligned loads, so just hand back a copy
template < typename T >
inline T sphUnalignedRead ( const T & tSrc )
{
	T tResult ( tSrc );
	return tResult;
}
92 
/// plain write: this platform allows unaligned stores, so store through a typed pointer
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	T * pTarget = static_cast<T *> ( pPtr );
	*pTarget = tVal;
}
98 
99 #else
100 
101 /// unaligned read wrapper for some architectures (eg. SPARC)
102 template < typename T >
sphUnalignedRead(const T & tRef)103 inline T sphUnalignedRead ( const T & tRef )
104 {
105 	T uTmp;
106 	byte * pSrc = (byte *) &tRef;
107 	byte * pDst = (byte *) &uTmp;
108 	for ( int i=0; i<(int)sizeof(T); i++ )
109 		*pDst++ = *pSrc++;
110 	return uTmp;
111 }
112 
113 /// unaligned write wrapper for some architectures (eg. SPARC)
114 template < typename T >
sphUnalignedWrite(void * pPtr,const T & tVal)115 void sphUnalignedWrite ( void * pPtr, const T & tVal )
116 {
117 	byte * pDst = (byte *) pPtr;
118 	byte * pSrc = (byte *) &tVal;
119 	for ( int i=0; i<(int)sizeof(T); i++ )
120 		*pDst++ = *pSrc++;
121 }
122 
123 #endif
124 
// servers with MYSQL_VERSION_ID>=50515 expose the HASH API as my_hash_*,
// older ones as hash_*; alias whichever this server provides
#if MYSQL_VERSION_ID<50515

#define sphinx_hash_init	hash_init
#define sphinx_hash_free	hash_free
#define sphinx_hash_search	hash_search
#define sphinx_hash_delete	hash_delete

#else

#define sphinx_hash_init	my_hash_init
#define sphinx_hash_free	my_hash_free
#define sphinx_hash_search	my_hash_search
#define sphinx_hash_delete	my_hash_delete

#endif
140 
141 /////////////////////////////////////////////////////////////////////////////
142 
// FIXME! make this all dynamic
#define SPHINXSE_MAX_FILTERS		32			///< capacity of the fixed per-query filter/weight arrays

// default SphinxAPI endpoint
#define SPHINXAPI_DEFAULT_HOST		"127.0.0.1"
#define SPHINXAPI_DEFAULT_PORT		9312
#define SPHINXAPI_DEFAULT_INDEX		"*"

// default SphinxQL port
#define SPHINXQL_DEFAULT_PORT		9306

#define SPHINXSE_SYSTEM_COLUMNS		3

// allocation and keyword-stats limits
#define SPHINXSE_MAX_ALLOC			( 16*1024*1024 )
#define SPHINXSE_MAX_KEYWORDSTATS	4096

// reported as part of the engine comment string
#define SPHINXSE_VERSION			"2.2.6-release"
158 
// FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp;
// cut-n-paste is somewhat simpler than adding dependencies, however

/// protocol and command identifiers shared with searchd
enum
{
	SPHINX_SEARCHD_PROTO	= 1,		///< protocol version
	SEARCHD_COMMAND_SEARCH	= 0,		///< search command id
	VER_COMMAND_SEARCH		= 0x119		///< search command version
};
168 
/// search query sorting orders
enum ESphSortOrder
{
	/// sort by document relevance desc, then by date
	SPH_SORT_RELEVANCE		= 0,
	/// sort by document date desc, then by relevance desc
	SPH_SORT_ATTR_DESC		= 1,
	/// sort by document date asc, then by relevance desc
	SPH_SORT_ATTR_ASC		= 2,
	/// sort by time segments (hour/day/week/etc) desc, then by relevance desc
	SPH_SORT_TIME_SEGMENTS	= 3,
	/// sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
	SPH_SORT_EXTENDED		= 4,
	/// sort by expression
	SPH_SORT_EXPR			= 5,

	/// one past the last sort order
	SPH_SORT_TOTAL
};
181 
/// search query matching mode
enum ESphMatchMode
{
	SPH_MATCH_ALL			= 0,	///< match all query words
	SPH_MATCH_ANY			= 1,	///< match any query word
	SPH_MATCH_PHRASE		= 2,	///< match this exact phrase
	SPH_MATCH_BOOLEAN		= 3,	///< match this boolean query
	SPH_MATCH_EXTENDED		= 4,	///< match this extended query
	SPH_MATCH_FULLSCAN		= 5,	///< match all document IDs w/o fulltext query, apply filters
	SPH_MATCH_EXTENDED2		= 6,	///< extended engine V2

	SPH_MATCH_TOTAL					///< one past the last match mode
};
195 
/// search query relevance ranking mode
enum ESphRankMode
{
	/// default mode, phrase proximity major factor and BM25 minor one
	SPH_RANK_PROXIMITY_BM25		= 0,
	/// statistical mode, BM25 ranking only (faster but worse quality)
	SPH_RANK_BM25				= 1,
	/// no ranking, all matches get a weight of 1
	SPH_RANK_NONE				= 2,
	/// simple word-count weighting, rank is a weighted sum of per-field keyword occurrence counts
	SPH_RANK_WORDCOUNT			= 3,
	/// phrase proximity
	SPH_RANK_PROXIMITY			= 4,
	/// emulate old match-any weighting
	SPH_RANK_MATCHANY			= 5,
	/// sets bits where there were matches
	SPH_RANK_FIELDMASK			= 6,
	/// codename SPH04, phrase proximity + bm25 + head/exact boost
	SPH_RANK_SPH04				= 7,
	/// expression based ranker
	SPH_RANK_EXPR				= 8,

	/// one past the last ranker
	SPH_RANK_TOTAL,
	/// ranker used when none is specified
	SPH_RANK_DEFAULT			= SPH_RANK_PROXIMITY_BM25
};
212 
/// search query grouping mode
enum ESphGroupBy
{
	SPH_GROUPBY_DAY			= 0,	///< group by day
	SPH_GROUPBY_WEEK		= 1,	///< group by week
	SPH_GROUPBY_MONTH		= 2,	///< group by month
	SPH_GROUPBY_YEAR		= 3,	///< group by year
	SPH_GROUPBY_ATTR		= 4,	///< group by attribute value
	SPH_GROUPBY_ATTRPAIR	= 5,	///< group by sequential attrs pair (rendered redundant by 64bit attrs support; removed)
	SPH_GROUPBY_MULTIPLE	= 6		///< group by on multiple attribute values
};
224 
/// known attribute types
enum
{
	// scalar types
	SPH_ATTR_NONE			= 0,			///< not an attribute at all
	SPH_ATTR_INTEGER		= 1,			///< this attr is just an integer
	SPH_ATTR_TIMESTAMP		= 2,			///< this attr is a timestamp
	SPH_ATTR_ORDINAL		= 3,			///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
	SPH_ATTR_BOOL			= 4,			///< this attr is a boolean bit field
	SPH_ATTR_FLOAT			= 5,			///< float value
	SPH_ATTR_BIGINT			= 6,			///< 64-bit integer value
	SPH_ATTR_STRING			= 7,			///< string (binary; in-memory)

	// multi-value types
	SPH_ATTR_UINT32SET		= 0x40000001UL,	///< this attr is multiple int32 values (0 or more)
	SPH_ATTR_UINT64SET		= 0x40000002UL	///< this attr is multiple int64 values (0 or more)
};
240 
/// known searchd reply status codes
enum
{
	SEARCHD_OK		= 0,	///< general success, command-specific reply follows
	SEARCHD_ERROR	= 1,	///< general failure, error message follows
	SEARCHD_RETRY	= 2,	///< temporary failure, error message follows, client should retry later
	SEARCHD_WARNING	= 3		///< general success, warning message and command-specific reply follow
};
249 
250 //////////////////////////////////////////////////////////////////////////////
251 
// compile-time debug switches (0 = off)
#define SPHINX_DEBUG_OUTPUT		0
#define SPHINX_DEBUG_CALLS		0

#include <stdarg.h>

#if !SPHINX_DEBUG_OUTPUT
/// debug output compiled out: calls are no-ops
inline void SPH_DEBUG ( const char *, ... ) {}
#else
/// print one printf-style line to stderr, prefixed with "SphinxSE: "
inline void SPH_DEBUG ( const char * format, ... )
{
	fprintf ( stderr, "SphinxSE: " );
	va_list tArgs;
	va_start ( tArgs, format );
	vfprintf ( stderr, format, tArgs );
	va_end ( tArgs );
	fprintf ( stderr, "\n" );
}
#endif
270 
// function entry/exit tracing, active only when SPHINX_DEBUG_CALLS is nonzero.
// SPH_RET/SPH_VOID_RET are brace blocks that contain the actual return.
#if SPHINX_DEBUG_CALLS

#define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
// %p with an explicit void* cast: passing a pointer for %08x is a format/argument
// type mismatch (undefined behavior, and it truncates pointers on 64-bit)
#define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%p)", __FUNCTION__, (const void *) this ); }
#define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
#define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }

#else

#define SPH_ENTER_FUNC()
#define SPH_ENTER_METHOD()
#define SPH_RET(_arg) { return(_arg); }
#define SPH_VOID_RET() { return; }

#endif
286 
287 
/// delete a single heap object and reset the pointer to NULL
#define SafeDelete(_ptr)		{ delete ( _ptr ); (_ptr) = NULL; }
/// delete[] a heap array (when non-NULL) and reset the pointer to NULL
#define SafeDeleteArray(_ptr)	{ if ( _ptr ) { delete [] ( _ptr );	(_ptr) = NULL; } }
290 
291 //////////////////////////////////////////////////////////////////////////////
292 
293 /// per-table structure that will be shared among all open Sphinx SE handlers
294 struct CSphSEShare
295 {
296 	pthread_mutex_t	m_tMutex;
297 	THR_LOCK		m_tLock;
298 
299 	char *			m_sTable;
300 	char *			m_sScheme;		///< our connection string
301 	char *			m_sHost;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
302 	char *			m_sSocket;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
303 	char *			m_sIndex;		///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
304 	ushort			m_iPort;
305 	bool			m_bSphinxQL;	///< is this read-only SphinxAPI table, or write-only SphinxQL table?
306 	uint			m_iTableNameLen;
307 	uint			m_iUseCount;
308 #if MYSQL_VERSION_ID<50610
309 	CHARSET_INFO *	m_pTableQueryCharset;
310 #else
311 	const CHARSET_INFO *	m_pTableQueryCharset;
312 #endif
313 
314 	int					m_iTableFields;
315 	char **				m_sTableField;
316 	enum_field_types *	m_eTableFieldType;
317 
CSphSEShareCSphSEShare318 	CSphSEShare ()
319 		: m_sTable ( NULL )
320 		, m_sScheme ( NULL )
321 		, m_sHost ( NULL )
322 		, m_sSocket ( NULL )
323 		, m_sIndex ( NULL )
324 		, m_iPort ( 0 )
325 		, m_bSphinxQL ( false )
326 		, m_iTableNameLen ( 0 )
327 		, m_iUseCount ( 1 )
328 		, m_pTableQueryCharset ( NULL )
329 
330 		, m_iTableFields ( 0 )
331 		, m_sTableField ( NULL )
332 		, m_eTableFieldType ( NULL )
333 	{
334 		thr_lock_init ( &m_tLock );
335 		pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
336 	}
337 
~CSphSEShareCSphSEShare338 	~CSphSEShare ()
339 	{
340 		pthread_mutex_destroy ( &m_tMutex );
341 		thr_lock_delete ( &m_tLock );
342 
343 		SafeDeleteArray ( m_sTable );
344 		SafeDeleteArray ( m_sScheme );
345 		ResetTable ();
346 	}
347 
ResetTableCSphSEShare348 	void ResetTable ()
349 	{
350 		for ( int i=0; i<m_iTableFields; i++ )
351 			SafeDeleteArray ( m_sTableField[i] );
352 		SafeDeleteArray ( m_sTableField );
353 		SafeDeleteArray ( m_eTableFieldType );
354 	}
355 };
356 
357 /// schema attribute
358 struct CSphSEAttr
359 {
360 	char *			m_sName;		///< attribute name (received from Sphinx)
361 	uint32			m_uType;		///< attribute type (received from Sphinx)
362 	int				m_iField;		///< field index in current table (-1 if none)
363 
CSphSEAttrCSphSEAttr364 	CSphSEAttr()
365 		: m_sName ( NULL )
366 		, m_uType ( SPH_ATTR_NONE )
367 		, m_iField ( -1 )
368 	{}
369 
~CSphSEAttrCSphSEAttr370 	~CSphSEAttr ()
371 	{
372 		SafeDeleteArray ( m_sName );
373 	}
374 };
375 
376 /// word stats
377 struct CSphSEWordStats
378 {
379 	char *			m_sWord;
380 	int				m_iDocs;
381 	int				m_iHits;
382 
CSphSEWordStatsCSphSEWordStats383 	CSphSEWordStats ()
384 		: m_sWord ( NULL )
385 		, m_iDocs ( 0 )
386 		, m_iHits ( 0 )
387 	{}
388 
~CSphSEWordStatsCSphSEWordStats389 	~CSphSEWordStats ()
390 	{
391 		SafeDeleteArray ( m_sWord );
392 	}
393 };
394 
395 /// request stats
396 struct CSphSEStats
397 {
398 public:
399 	int					m_iMatchesTotal;
400 	int					m_iMatchesFound;
401 	int					m_iQueryMsec;
402 	int					m_iWords;
403 	CSphSEWordStats *	m_dWords;
404 	bool				m_bLastError;
405 	char				m_sLastMessage[1024];
406 
CSphSEStatsCSphSEStats407 	CSphSEStats()
408 		: m_dWords ( NULL )
409 	{
410 		Reset ();
411 	}
412 
ResetCSphSEStats413 	void Reset ()
414 	{
415 		m_iMatchesTotal = 0;
416 		m_iMatchesFound = 0;
417 		m_iQueryMsec = 0;
418 		m_iWords = 0;
419 		m_bLastError = false;
420 		m_sLastMessage[0] = '\0';
421                 SafeDeleteArray ( m_dWords );
422 	}
423 
~CSphSEStatsCSphSEStats424 	~CSphSEStats()
425 	{
426 		SafeDeleteArray ( m_dWords );
427 	}
428 };
429 
430 /// thread local storage
431 struct CSphSEThreadTable
432 {
433 	static const int	MAX_QUERY_LEN	= 262144; // 256k should be enough, right?
434 
435 	bool				m_bStats;
436 	CSphSEStats			m_tStats;
437 
438 	bool				m_bQuery;
439 	char				m_sQuery[MAX_QUERY_LEN];
440 
441 #if MYSQL_VERSION_ID<50610
442 	CHARSET_INFO *		m_pQueryCharset;
443 #else
444 	const CHARSET_INFO *		m_pQueryCharset;
445 #endif
446 
447 	bool				m_bReplace;		///< are we doing an INSERT or REPLACE
448 
449 	bool				m_bCondId;		///< got a value from condition pushdown
450 	longlong			m_iCondId;		///< value acquired from id=value condition pushdown
451 	bool				m_bCondDone;	///< index_read() is now over
452 
453 	const ha_sphinx *	m_pHandler;
454 	CSphSEThreadTable *	m_pTableNext;
455 
CSphSEThreadTableCSphSEThreadTable456 	CSphSEThreadTable ( const ha_sphinx * pHandler )
457 		: m_bStats ( false )
458 		, m_bQuery ( false )
459 		, m_pQueryCharset ( NULL )
460 		, m_bReplace ( false )
461 		, m_bCondId ( false )
462 		, m_iCondId ( 0 )
463 		, m_bCondDone ( false )
464 		, m_pHandler ( pHandler )
465 		, m_pTableNext ( NULL )
466 	{}
467 };
468 
469 
470 struct CSphTLS
471 {
472 	CSphSEThreadTable *	m_pHeadTable;
473 
CSphTLSCSphTLS474 	explicit CSphTLS ( const ha_sphinx * pHandler )
475 	{
476 		m_pHeadTable = new CSphSEThreadTable ( pHandler );
477 	}
478 
~CSphTLSCSphTLS479 	~CSphTLS()
480 	{
481 		CSphSEThreadTable * pCur = m_pHeadTable;
482 		while ( pCur )
483 		{
484 			CSphSEThreadTable * pNext = pCur->m_pTableNext;
485 			SafeDelete ( pCur );
486 			pCur = pNext;
487 		}
488 	}
489 };
490 
491 
/// filter types
enum ESphFilter
{
	/// filter by integer values set
	SPH_FILTER_VALUES		= 0,
	/// filter by integer range
	SPH_FILTER_RANGE		= 1,
	/// filter by float range
	SPH_FILTER_FLOATRANGE	= 2
};
499 
500 
501 /// search query filter
502 struct CSphSEFilter
503 {
504 public:
505 	ESphFilter		m_eType;
506 	char *			m_sAttrName;
507 	longlong		m_uMinValue;
508 	longlong		m_uMaxValue;
509 	float			m_fMinValue;
510 	float			m_fMaxValue;
511 	int				m_iValues;
512 	longlong *		m_pValues;
513 	int				m_bExclude;
514 
515 public:
CSphSEFilterCSphSEFilter516 	CSphSEFilter ()
517 		: m_eType ( SPH_FILTER_VALUES )
518 		, m_sAttrName ( NULL )
519 		, m_uMinValue ( 0 )
520 		, m_uMaxValue ( UINT_MAX )
521 		, m_fMinValue ( 0.0f )
522 		, m_fMaxValue ( 0.0f )
523 		, m_iValues ( 0 )
524 		, m_pValues ( NULL )
525 		, m_bExclude ( 0 )
526 	{
527 	}
528 
~CSphSEFilterCSphSEFilter529 	~CSphSEFilter ()
530 	{
531 		SafeDeleteArray ( m_pValues );
532 	}
533 };
534 
535 
536 /// float vs dword conversion
sphF2DW(float f)537 inline uint32 sphF2DW ( float f )	{ union { float f; uint32 d; } u; u.f = f; return u.d; }
538 
539 /// dword vs float conversion
sphDW2F(uint32 d)540 inline float sphDW2F ( uint32 d )	{ union { float f; uint32 d; } u; u.d = d; return u.f; }
541 
542 
543 /// client-side search query
544 struct CSphSEQuery
545 {
546 public:
547 	const char *	m_sHost;
548 	int				m_iPort;
549 
550 private:
551 	char *			m_sQueryBuffer;
552 
553 	const char *	m_sIndex;
554 	int				m_iOffset;
555 	int				m_iLimit;
556 
557 	bool			m_bQuery;
558         const char *		m_sQuery;
559 	uint32 *		m_pWeights;
560 	int				m_iWeights;
561 	ESphMatchMode	m_eMode;
562 	ESphRankMode	m_eRanker;
563 	char *			m_sRankExpr;
564 	ESphSortOrder	m_eSort;
565 	const char *			m_sSortBy;
566 	int				m_iMaxMatches;
567 	int				m_iMaxQueryTime;
568 	uint32			m_iMinID;
569 	uint32			m_iMaxID;
570 
571 	int				m_iFilters;
572 	CSphSEFilter	m_dFilters[SPHINXSE_MAX_FILTERS];
573 
574 	ESphGroupBy		m_eGroupFunc;
575 	const char *			m_sGroupBy;
576 	const char *		m_sGroupSortBy;
577 	int				m_iCutoff;
578 	int				m_iRetryCount;
579 	int				m_iRetryDelay;
580         const char *		m_sGroupDistinct;							///< points to query buffer; do NOT delete
581 	int				m_iIndexWeights;
582 	char *			m_sIndexWeight[SPHINXSE_MAX_FILTERS];		///< points to query buffer; do NOT delete
583 	int				m_iIndexWeight[SPHINXSE_MAX_FILTERS];
584 	int				m_iFieldWeights;
585 	char *			m_sFieldWeight[SPHINXSE_MAX_FILTERS];		///< points to query buffer; do NOT delete
586 	int				m_iFieldWeight[SPHINXSE_MAX_FILTERS];
587 
588 	bool			m_bGeoAnchor;
589 	const char *		m_sGeoLatAttr;
590 	const char *		m_sGeoLongAttr;
591 	float			m_fGeoLatitude;
592 	float			m_fGeoLongitude;
593 
594 	char *			m_sComment;
595 	char *			m_sSelect;
596 
597 	struct Override_t
598 	{
599 		union Value_t
600 		{
601 			uint32		m_uValue;
602 			longlong	m_iValue64;
603 			float		m_fValue;
604 		};
605 		char *						m_sName; ///< points to query buffer
606 		int							m_iType;
607 		Dynamic_array<ulonglong>	m_dIds;
608 		Dynamic_array<Value_t>		m_dValues;
609 	};
610 	Dynamic_array<Override_t *> m_dOverrides;
611 
612 public:
613 	char			m_sParseError[256];
614 
615 public:
616 	CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
617 	~CSphSEQuery ();
618 
619 	bool			Parse ();
620 	int				BuildRequest ( char ** ppBuffer );
621 
622 protected:
623 	char *			m_pBuf;
624 	char *			m_pCur;
625 	int				m_iBufLeft;
626 	bool			m_bBufOverrun;
627 
628 	template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
629 	bool			ParseField ( char * sField );
630 
631 	void			SendBytes ( const void * pBytes, int iBytes );
SendWordCSphSEQuery632 	void			SendWord ( short int v )		{ v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
SendIntCSphSEQuery633 	void			SendInt ( int v )				{ v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
SendDwordCSphSEQuery634 	void			SendDword ( uint v )			{ v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
SendUint64CSphSEQuery635 	void			SendUint64 ( ulonglong v )		{ SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
SendStringCSphSEQuery636 	void			SendString ( const char * v )	{ int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
SendFloatCSphSEQuery637 	void			SendFloat ( float v )			{ SendDword ( sphF2DW(v) ); }
638 };
639 
// explicit instantiations of CSphSEQuery::ParseArray<>, emitted only when the
// build defines HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
#endif
644 
645 //////////////////////////////////////////////////////////////////////////////
646 
647 #if MYSQL_VERSION_ID>50100
648 
649 #if MYSQL_VERSION_ID<50114
650 #error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
651 #endif
652 
653 static handler *	sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
654 static int			sphinx_init_func ( void * p );
655 static int			sphinx_close_connection ( handlerton * hton, THD * thd );
656 static int			sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
657 static bool			sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
658 
659 #else
660 
661 static bool			sphinx_init_func_for_handlerton ();
662 static int			sphinx_close_connection ( THD * thd );
663 bool				sphinx_show_status ( THD * thd );
664 
665 #endif // >50100
666 
667 //////////////////////////////////////////////////////////////////////////////
668 
669 static const char	sphinx_hton_name[]		= "SPHINX";
670 static const char	sphinx_hton_comment[]	= "Sphinx storage engine " SPHINXSE_VERSION;
671 
672 #if MYSQL_VERSION_ID<50100
673 handlerton sphinx_hton =
674 {
675 	#ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
676 	MYSQL_HANDLERTON_INTERFACE_VERSION,
677 	#endif
678 	sphinx_hton_name,
679 	SHOW_OPTION_YES,
680 	sphinx_hton_comment,
681 	DB_TYPE_SPHINX_DB,
682 	sphinx_init_func_for_handlerton,
683 	0,							// slot
684 	0,							// savepoint size
685 	sphinx_close_connection,	// close_connection
686 	NULL,	// savepoint
687 	NULL,	// rollback to savepoint
688 	NULL,	// release savepoint
689 	NULL,	// commit
690 	NULL,	// rollback
691 	NULL,	// prepare
692 	NULL,	// recover
693 	NULL,	// commit_by_xid
694 	NULL,	// rollback_by_xid
695 	NULL,	// create_cursor_read_view
696 	NULL,	// set_cursor_read_view
697 	NULL,	// close_cursor_read_view
698 	HTON_CAN_RECREATE
699 };
700 #else
701 static handlerton * sphinx_hton_ptr = NULL;
702 #endif
703 
704 //////////////////////////////////////////////////////////////////////////////
705 
706 // variables for Sphinx shared methods
707 pthread_mutex_t		sphinx_mutex;		// mutex to init the hash
708 static int			sphinx_init = 0;	// flag whether the hash was initialized
709 static HASH			sphinx_open_tables;	// hash used to track open tables
710 
711 //////////////////////////////////////////////////////////////////////////////
712 // INITIALIZATION AND SHUTDOWN
713 //////////////////////////////////////////////////////////////////////////////
714 
715 // hashing function
716 #if MYSQL_VERSION_ID>=50120
717 typedef size_t GetKeyLength_t;
718 #else
719 typedef uint GetKeyLength_t;
720 #endif
721 
sphinx_get_key(const byte * pSharePtr,GetKeyLength_t * pLength,my_bool)722 static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
723 {
724 	CSphSEShare * pShare = (CSphSEShare *) pSharePtr;
725 	*pLength = (size_t) pShare->m_iTableNameLen;
726 	return (byte*) pShare->m_sTable;
727 }
728 
729 #if MYSQL_VERSION_ID<50100
sphinx_init_func(void *)730 static int sphinx_init_func ( void * ) // to avoid unused arg warning
731 #else
732 static int sphinx_init_func ( void * p )
733 #endif
734 {
735 	SPH_ENTER_FUNC();
736 	if ( !sphinx_init )
737 	{
738 		sphinx_init = 1;
739 		void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
740 		sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
741 			sphinx_get_key, 0, 0 );
742 
743 		#if MYSQL_VERSION_ID > 50100
744 		handlerton * hton = (handlerton*) p;
745 		hton->state = SHOW_OPTION_YES;
746 		hton->db_type = DB_TYPE_AUTOASSIGN;
747 		hton->create = sphinx_create_handler;
748 		hton->close_connection = sphinx_close_connection;
749 		hton->show_status = sphinx_show_status;
750 		hton->panic = sphinx_panic;
751 		hton->flags = HTON_CAN_RECREATE;
752 		#endif
753 	}
754 	SPH_RET(0);
755 }
756 
757 
758 #if MYSQL_VERSION_ID<50100
sphinx_init_func_for_handlerton()759 static bool sphinx_init_func_for_handlerton ()
760 {
761 	return sphinx_init_func ( &sphinx_hton );
762 }
763 #endif
764 
765 
766 #if MYSQL_VERSION_ID>50100
767 
sphinx_close_connection(handlerton * hton,THD * thd)768 static int sphinx_close_connection ( handlerton * hton, THD * thd )
769 {
770 	// deallocate common handler data
771 	SPH_ENTER_FUNC();
772 	void ** tmp = thd_ha_data ( thd, hton );
773 	CSphTLS * pTls = (CSphTLS *) (*tmp);
774 	SafeDelete ( pTls );
775 	*tmp = NULL;
776 	SPH_RET(0);
777 }
778 
779 
sphinx_done_func(void *)780 static int sphinx_done_func ( void * )
781 {
782 	SPH_ENTER_FUNC();
783 
784 	int error __attribute__ ((unused)) = 0;
785 	if ( sphinx_init )
786 	{
787 		sphinx_init = 0;
788 		if ( sphinx_open_tables.records )
789 			error = 1;
790 		sphinx_hash_free ( &sphinx_open_tables );
791 		pthread_mutex_destroy ( &sphinx_mutex );
792 	}
793 
794 	SPH_RET(0);
795 }
796 
797 
sphinx_panic(handlerton * hton,enum ha_panic_function)798 static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
799 {
800 	return sphinx_done_func ( hton );
801 }
802 
803 #else
804 
sphinx_close_connection(THD * thd)805 static int sphinx_close_connection ( THD * thd )
806 {
807 	// deallocate common handler data
808 	SPH_ENTER_FUNC();
809 	CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
810 	SafeDelete ( pTls );
811 	thd->ha_data[sphinx_hton.slot] = NULL;
812 	SPH_RET(0);
813 }
814 
815 #endif // >50100
816 
817 //////////////////////////////////////////////////////////////////////////////
818 // SHOW STATUS
819 //////////////////////////////////////////////////////////////////////////////
820 
821 #if MYSQL_VERSION_ID>50100
sphinx_show_status(handlerton * hton,THD * thd,stat_print_fn * stat_print,enum ha_stat_type)822 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
823 	enum ha_stat_type )
824 #else
825 bool sphinx_show_status ( THD * thd )
826 #endif
827 {
828 	SPH_ENTER_FUNC();
829 
830 #if MYSQL_VERSION_ID<50100
831 	Protocol * protocol = thd->protocol;
832 	List<Item> field_list;
833 #endif
834 
835 	char buf1[IO_SIZE];
836 	uint buf1len;
837 	char buf2[IO_SIZE];
838 	uint buf2len = 0;
839 	String words;
840 
841 	buf1[0] = '\0';
842 	buf2[0] = '\0';
843 
844 
845 #if MYSQL_VERSION_ID>50100
846 	// 5.1.x style stats
847 	CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
848 
849 #define LOC_STATS(_key,_keylen,_val,_vallen) \
850 	stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
851 
852 #else
853 	// 5.0.x style stats
854 	if ( have_sphinx_db!=SHOW_OPTION_YES )
855 	{
856 		my_message ( ER_NOT_SUPPORTED_YET,
857 			"failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
858 			MYF(0) );
859 		SPH_RET(TRUE);
860 	}
861 	CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
862 
863 	field_list.push_back ( new Item_empty_string ( thd, "Type", 10 ) );
864 	field_list.push_back ( new Item_empty_string ( thd, "Name", FN_REFLEN ) );
865 	field_list.push_back ( new Item_empty_string ( thd, "Status", 10 ) );
866 	if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
867 		SPH_RET(TRUE);
868 
869 #define LOC_STATS(_key,_keylen,_val,_vallen) \
870 	protocol->prepare_for_resend (); \
871 	protocol->store ( "SPHINX", 6, system_charset_info ); \
872 	protocol->store ( _key, _keylen, system_charset_info ); \
873 	protocol->store ( _val, _vallen, system_charset_info ); \
874 	if ( protocol->write() ) \
875 		SPH_RET(TRUE);
876 
877 #endif
878 
879 
880 	// show query stats
881 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
882 	{
883 		const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
884 		buf1len = my_snprintf ( buf1, sizeof(buf1),
885 			"total: %d, total found: %d, time: %d, words: %d",
886 			pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
887 
888 		LOC_STATS ( "stats", 5, buf1, buf1len );
889 
890 		if ( pStats->m_iWords )
891 		{
892 			for ( int i=0; i<pStats->m_iWords; i++ )
893 			{
894 				CSphSEWordStats & tWord = pStats->m_dWords[i];
895 				buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
896 					buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
897 			}
898 
899 			// convert it if we can
900 			const char * sWord = buf2;
901 			int iWord = buf2len;
902 
903 			String sBuf3;
904 			if ( pTls->m_pHeadTable->m_pQueryCharset )
905 			{
906 				uint iErrors;
907 				sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
908 				sWord = sBuf3.c_ptr();
909 				iWord = sBuf3.length();
910 			}
911 
912 			LOC_STATS ( "words", 5, sWord, iWord );
913 		}
914 	}
915 
916 	// show last error or warning (either in addition to stats, or on their own)
917 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
918 	{
919 		const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
920 
921 		LOC_STATS (
922 			sMessageType, strlen ( sMessageType ),
923 			pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
924 
925 	} else
926 	{
927 		// well, nothing to show just yet
928 #if MYSQL_VERSION_ID < 50100
929 		LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
930 #endif
931 	}
932 
933 #if MYSQL_VERSION_ID < 50100
934 	send_eof(thd);
935 #endif
936 
937 	SPH_RET(FALSE);
938 }
939 
940 //////////////////////////////////////////////////////////////////////////////
941 // HELPERS
942 //////////////////////////////////////////////////////////////////////////////
943 
/// duplicate sSrc into a new[]'ed, NUL-terminated buffer (caller frees with delete[]).
/// Copies iLen bytes, or the whole C string when iLen is negative; returns NULL for NULL input.
static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	char * sCopy = NULL;
	if ( sSrc )
	{
		const int iBytes = ( iLen>=0 ) ? iLen : (int) strlen ( sSrc );
		sCopy = new char [ iBytes+1 ];
		memcpy ( sCopy, sSrc, iBytes );
		sCopy[iBytes] = '\0';
	}
	return sCopy;
}
957 
958 
/// print "YYMMDD HH:MM:SS SphinxSE: internal error: <message>\n" to stderr,
/// where <message> is printf-style formatted from sFmt and the varargs
static void sphLogError ( const char * sFmt, ... )
{
	// emit timestamp
#ifdef __WIN__
	SYSTEMTIME t;
	GetLocalTime ( &t );

	fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
		(int)t.wYear % 100, (int)t.wMonth, (int)t.wDay,
		(int)t.wHour, (int)t.wMinute, (int)t.wSecond );
#else
	// Unix version
	time_t tStamp;
	time ( &tStamp );

	const struct tm * pParsed;
#ifdef HAVE_LOCALTIME_R
	struct tm tParsed;
	pParsed = localtime_r ( &tStamp, &tParsed );
#else
	pParsed = localtime ( &tStamp );
#endif // HAVE_LOCALTIME_R

	if ( pParsed )
		fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
			pParsed->tm_year % 100, pParsed->tm_mon + 1, pParsed->tm_mday,
			pParsed->tm_hour, pParsed->tm_min, pParsed->tm_sec );
	else
		// time conversion failed (NULL result); skip the timestamp instead of dereferencing NULL
		fprintf ( stderr, "SphinxSE: internal error: " );
#endif // __WIN__

	// emit message
	va_list ap;
	va_start ( ap, sFmt );
	vfprintf ( stderr, sFmt, ap );
	va_end ( ap );

	// emit newline
	fprintf ( stderr, "\n" );
}
997 
998 
999 
1000 // the following scheme variants are recognized
1001 //
1002 // sphinx://host[:port]/index
1003 // sphinxql://host[:port]/index
1004 // unix://unix/domain/socket[:index]
ParseUrl(CSphSEShare * share,TABLE * table,bool bCreate)1005 static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
1006 {
1007 	SPH_ENTER_FUNC();
1008 
1009 	if ( share )
1010 	{
1011 		// check incoming stuff
1012 		if ( !table )
1013 		{
1014 			sphLogError ( "table==NULL in ParseUrl()" );
1015 			return false;
1016 		}
1017 		if ( !table->s )
1018 		{
1019 			sphLogError ( "(table->s)==NULL in ParseUrl()" );
1020 			return false;
1021 		}
1022 
1023 		// free old stuff
1024 		share->ResetTable ();
1025 
1026 		// fill new stuff
1027 		share->m_iTableFields = table->s->fields;
1028 		if ( share->m_iTableFields )
1029 		{
1030 			share->m_sTableField = new char * [ share->m_iTableFields ];
1031 			share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
1032 
1033 			for ( int i=0; i<share->m_iTableFields; i++ )
1034 			{
1035 				share->m_sTableField[i] = sphDup ( table->field[i]->field_name.str );
1036 				share->m_eTableFieldType[i] = table->field[i]->type();
1037 			}
1038 		}
1039 	}
1040 
1041 	// defaults
1042 	bool bOk = true;
1043 	bool bQL = false;
1044 	char * sScheme = NULL;
1045 	char * sHost = (char*) SPHINXAPI_DEFAULT_HOST;
1046 	char * sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1047 	int iPort = SPHINXAPI_DEFAULT_PORT;
1048 
1049 	// parse connection string, if any
1050 	while ( table->s->connect_string.length!=0 )
1051 	{
1052 		sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
1053 
1054 		sHost = strstr ( sScheme, "://" );
1055 		if ( !sHost )
1056 		{
1057 			bOk = false;
1058 			break;
1059 		}
1060 		sHost[0] = '\0';
1061 		sHost += 3;
1062 
1063 		/////////////////////////////
1064 		// sphinxapi via unix socket
1065 		/////////////////////////////
1066 
1067 		if ( !strcmp ( sScheme, "unix" ) )
1068 		{
1069 			sHost--; // reuse last slash
1070 			iPort = 0;
1071 			if (!( sIndex = strrchr ( sHost, ':' ) ))
1072                           sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1073 			else
1074 			{
1075 				*sIndex++ = '\0';
1076 				if ( !*sIndex )
1077                                   sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1078 			}
1079 			bOk = true;
1080 			break;
1081 		}
1082 
1083 		/////////////////////
1084 		// sphinxapi via tcp
1085 		/////////////////////
1086 
1087 		if ( !strcmp ( sScheme, "sphinx" ) )
1088 		{
1089 			char * sPort = strchr ( sHost, ':' );
1090 			if ( sPort )
1091 			{
1092 				*sPort++ = '\0';
1093 				if ( *sPort )
1094 				{
1095 					sIndex = strchr ( sPort, '/' );
1096 					if ( sIndex )
1097 						*sIndex++ = '\0';
1098 					else
1099                                           sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1100 
1101 					iPort = atoi(sPort);
1102 					if ( !iPort )
1103                                           iPort = SPHINXAPI_DEFAULT_PORT;
1104 				}
1105 			} else
1106 			{
1107 				sIndex = strchr ( sHost, '/' );
1108 				if ( sIndex )
1109 					*sIndex++ = '\0';
1110 				else
1111                                   sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1112 			}
1113 			bOk = true;
1114 			break;
1115 		}
1116 
1117 		////////////
1118 		// sphinxql
1119 		////////////
1120 
1121 		if ( !strcmp ( sScheme, "sphinxql" ) )
1122 		{
1123 			bQL = true;
1124 			iPort = SPHINXQL_DEFAULT_PORT;
1125 
1126 			// handle port
1127 			char * sPort = strchr ( sHost, ':' );
1128 			sIndex = sHost; // starting point for index name search
1129 
1130 			if ( sPort )
1131 			{
1132 				*sPort++ = '\0';
1133 				sIndex = sPort;
1134 
1135 				iPort = atoi(sPort);
1136 				if ( !iPort )
1137 				{
1138 					bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
1139 					break;
1140 				}
1141 			}
1142 
1143 			// find index
1144 			sIndex = strchr ( sIndex, '/' );
1145 			if ( sIndex )
1146 				*sIndex++ = '\0';
1147 
1148 			// final checks
1149 			// host and index names are required
1150 			bOk = ( sHost && *sHost && sIndex && *sIndex );
1151 			break;
1152 		}
1153 
1154 		// unknown case
1155 		bOk = false;
1156 		break;
1157 	}
1158 
1159 	if ( !bOk )
1160 	{
1161 		my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
1162 			MYF(0), table->s->connect_string.str);
1163 	} else
1164 	{
1165 		if ( share )
1166 		{
1167 			SafeDeleteArray ( share->m_sScheme );
1168 			share->m_sScheme = sScheme;
1169 			share->m_sHost = sHost;
1170 			share->m_sIndex = sIndex;
1171 			share->m_iPort = (ushort)iPort;
1172 			share->m_bSphinxQL = bQL;
1173 		}
1174 	}
1175 	if ( !bOk && !share )
1176 		SafeDeleteArray ( sScheme );
1177 
1178 	SPH_RET(bOk);
1179 }
1180 
1181 
// Simple lock controls. The "share" returned here is a structure passed to each
// sphinx handler opened on the same table; it holds the pieces used for locking,
// which the handlers need in order to function.
get_share(const char * table_name,TABLE * table)1185 static CSphSEShare * get_share ( const char * table_name, TABLE * table )
1186 {
1187 	SPH_ENTER_FUNC();
1188 	pthread_mutex_lock ( &sphinx_mutex );
1189 
1190 	CSphSEShare * pShare = NULL;
1191 	for ( ;; )
1192 	{
1193 		// check if we already have this share
1194 #if MYSQL_VERSION_ID>=50120
1195 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
1196 #else
1197 #ifdef __WIN__
1198 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
1199 #else
1200 		pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
1201 #endif // win
1202 #endif // pre-5.1.20
1203 
1204 		if ( pShare )
1205 		{
1206 			pShare->m_iUseCount++;
1207 			break;
1208 		}
1209 
1210 		// try to allocate new share
1211 		pShare = new CSphSEShare ();
1212 		if ( !pShare )
1213 			break;
1214 
1215 		// try to setup it
1216 		if ( !ParseUrl ( pShare, table, false ) )
1217 		{
1218 			SafeDelete ( pShare );
1219 			break;
1220 		}
1221 
1222 		if ( !pShare->m_bSphinxQL )
1223 			pShare->m_pTableQueryCharset = table->field[2]->charset();
1224 
1225 		// try to hash it
1226 		pShare->m_iTableNameLen = strlen(table_name);
1227 		pShare->m_sTable = sphDup ( table_name );
1228 		if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
1229 		{
1230 			SafeDelete ( pShare );
1231 			break;
1232 		}
1233 
1234 		// all seems fine
1235 		break;
1236 	}
1237 
1238 	pthread_mutex_unlock ( &sphinx_mutex );
1239 	SPH_RET(pShare);
1240 }
1241 
1242 
1243 // Free lock controls. We call this whenever we close a table. If the table had
1244 // the last reference to the share then we free memory associated with it.
free_share(CSphSEShare * pShare)1245 static int free_share ( CSphSEShare * pShare )
1246 {
1247 	SPH_ENTER_FUNC();
1248 	pthread_mutex_lock ( &sphinx_mutex );
1249 
1250 	if ( !--pShare->m_iUseCount )
1251 	{
1252 		sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
1253 		SafeDelete ( pShare );
1254 	}
1255 
1256 	pthread_mutex_unlock ( &sphinx_mutex );
1257 	SPH_RET(0);
1258 }
1259 
1260 
#if MYSQL_VERSION_ID>50100
// Handler factory for the 5.1+ API: remembers the handlerton in sphinx_hton_ptr
// and constructs a new ha_sphinx on the caller-supplied MEM_ROOT.
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
{
	sphinx_hton_ptr = hton;
	handler * pHandler = new ( mem_root ) ha_sphinx ( hton, table );
	return pHandler;
}
#endif
1268 
1269 //////////////////////////////////////////////////////////////////////////////
1270 // CLIENT-SIDE REQUEST STUFF
1271 //////////////////////////////////////////////////////////////////////////////
1272 
CSphSEQuery(const char * sQuery,int iLength,const char * sIndex)1273 CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
1274 	: m_sHost ( "" )
1275 	, m_iPort ( 0 )
1276 	, m_sIndex ( sIndex ? sIndex : "*" )
1277 	, m_iOffset ( 0 )
1278 	, m_iLimit ( 20 )
1279 	, m_bQuery ( false )
1280 	, m_sQuery ( "" )
1281 	, m_pWeights ( NULL )
1282 	, m_iWeights ( 0 )
1283 	, m_eMode ( SPH_MATCH_ALL )
1284 	, m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
1285 	, m_sRankExpr ( NULL )
1286 	, m_eSort ( SPH_SORT_RELEVANCE )
1287 	, m_sSortBy ( "" )
1288 	, m_iMaxMatches ( 1000 )
1289 	, m_iMaxQueryTime ( 0 )
1290 	, m_iMinID ( 0 )
1291 	, m_iMaxID ( 0 )
1292 	, m_iFilters ( 0 )
1293 	, m_eGroupFunc ( SPH_GROUPBY_DAY )
1294 	, m_sGroupBy ( "" )
1295 	, m_sGroupSortBy ( "@group desc" )
1296 	, m_iCutoff ( 0 )
1297 	, m_iRetryCount ( 0 )
1298 	, m_iRetryDelay ( 0 )
1299 	, m_sGroupDistinct ( "" )
1300 	, m_iIndexWeights ( 0 )
1301 	, m_iFieldWeights ( 0 )
1302 	, m_bGeoAnchor ( false )
1303 	, m_sGeoLatAttr ( "" )
1304 	, m_sGeoLongAttr ( "" )
1305 	, m_fGeoLatitude ( 0.0f )
1306 	, m_fGeoLongitude ( 0.0f )
1307 	, m_sComment ( (char*) "" )
1308 	, m_sSelect ( (char*) "*" )
1309 
1310 	, m_pBuf ( NULL )
1311 	, m_pCur ( NULL )
1312 	, m_iBufLeft ( 0 )
1313 	, m_bBufOverrun ( false )
1314 {
1315 	m_sQueryBuffer = new char [ iLength+2 ];
1316 	memcpy ( m_sQueryBuffer, sQuery, iLength );
1317 	m_sQueryBuffer[iLength] = ';';
1318 	m_sQueryBuffer[iLength+1] = '\0';
1319 }
1320 
1321 
~CSphSEQuery()1322 CSphSEQuery::~CSphSEQuery ()
1323 {
1324 	SPH_ENTER_METHOD();
1325 	SafeDeleteArray ( m_sQueryBuffer );
1326 	SafeDeleteArray ( m_pWeights );
1327 	SafeDeleteArray ( m_pBuf );
1328 	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
1329 		SafeDelete ( m_dOverrides.at(i) );
1330 	SPH_VOID_RET();
1331 }
1332 
1333 
1334 template < typename T >
ParseArray(T ** ppValues,const char * sValue)1335 int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
1336 {
1337 	SPH_ENTER_METHOD();
1338 
1339 	assert ( ppValues );
1340 	assert ( !(*ppValues) );
1341 
1342 	const char * pValue;
1343 	bool bPrevDigit = false;
1344 	int iValues = 0;
1345 
1346 	// count the values
1347 	for ( pValue=sValue; *pValue; pValue++ )
1348 	{
1349 		bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1350 		if ( bDigit && !bPrevDigit )
1351 			iValues++;
1352 		bPrevDigit = bDigit;
1353 	}
1354 	if ( !iValues )
1355 		SPH_RET(0);
1356 
1357 	// extract the values
1358 	T * pValues = new T [ iValues ];
1359 	*ppValues = pValues;
1360 
1361 	int iIndex = 0, iSign = 1;
1362 	T uValue = 0;
1363 
1364 	bPrevDigit = false;
1365 	for ( pValue=sValue ;; pValue++ )
1366 	{
1367 		bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1368 
1369 		if ( bDigit )
1370 		{
1371 			if ( !bPrevDigit )
1372 				uValue = 0;
1373 			uValue = uValue*10 + ( (*pValue)-'0' );
1374 		} else if ( bPrevDigit )
1375 		{
1376 			assert ( iIndex<iValues );
1377 			pValues [ iIndex++ ] = uValue * iSign;
1378 			iSign = 1;
1379 		} else if ( *pValue=='-' )
1380 			iSign = -1;
1381 
1382 		bPrevDigit = bDigit;
1383 		if ( !*pValue )
1384 			break;
1385 	}
1386 
1387 	SPH_RET ( iValues );
1388 }
1389 
1390 
chop(char * s)1391 static char * chop ( char * s )
1392 {
1393 	while ( *s && isspace(*s) )
1394 		s++;
1395 
1396 	char * p = s + strlen(s);
1397 	while ( p>s && isspace ( p[-1] ) )
1398 		p--;
1399 	*p = '\0';
1400 
1401 	return s;
1402 }
1403 
1404 
myisattr(char c)1405 static bool myisattr ( char c )
1406 {
1407 	return
1408 		( c>='0' && c<='9' ) ||
1409 		( c>='a' && c<='z' ) ||
1410 		( c>='A' && c<='Z' ) ||
1411 		c=='_';
1412 }
1413 
myismagic(char c)1414 static bool myismagic ( char c )
1415 {
1416 	return c=='@';
1417 }
1418 
myisjson(char c)1419 static bool myisjson ( char c )
1420 {
1421 	return
1422 		c=='.' ||
1423 		c=='[' ||
1424 		c==']';
1425 }
1426 
1427 
ParseField(char * sField)1428 bool CSphSEQuery::ParseField ( char * sField )
1429 {
1430 	SPH_ENTER_METHOD();
1431 
1432 	// look for option name/value separator
1433 	char * sValue = strchr ( sField, '=' );
1434 	if ( !sValue || sValue==sField || sValue[-1]=='\\' )
1435 	{
1436 		// by default let's assume it's just query
1437 		if ( sField[0] )
1438 		{
1439 			if ( m_bQuery )
1440 			{
1441 				snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
1442 				SPH_RET(false);
1443 			} else
1444 			{
1445 				m_sQuery = sField;
1446 				m_bQuery = true;
1447 
1448 				// unescape only 1st one
1449 				char *s = sField, *d = sField;
1450 				int iSlashes = 0;
1451 				while ( *s )
1452 				{
1453 					iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
1454 					if ( ( iSlashes%2 )==0 ) *d++ = *s;
1455 					s++;
1456 				}
1457 				*d = '\0';
1458 			}
1459 		}
1460 		SPH_RET(true);
1461 	}
1462 
1463 	// split
1464 	*sValue++ = '\0';
1465 	sValue = chop ( sValue );
1466 	int iValue = atoi ( sValue );
1467 
1468 	// handle options
1469 	char * sName = chop ( sField );
1470 
1471 	if ( !strcmp ( sName, "query" ) )			m_sQuery = sValue;
1472 	else if ( !strcmp ( sName, "host" ) )		m_sHost = sValue;
1473 	else if ( !strcmp ( sName, "port" ) )		m_iPort = iValue;
1474 	else if ( !strcmp ( sName, "index" ) )		m_sIndex = sValue;
1475 	else if ( !strcmp ( sName, "offset" ) )		m_iOffset = iValue;
1476 	else if ( !strcmp ( sName, "limit" ) )		m_iLimit = iValue;
1477 	else if ( !strcmp ( sName, "weights" ) )	m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
1478 	else if ( !strcmp ( sName, "minid" ) )		m_iMinID = iValue;
1479 	else if ( !strcmp ( sName, "maxid" ) )		m_iMaxID = iValue;
1480 	else if ( !strcmp ( sName, "maxmatches" ) )	m_iMaxMatches = iValue;
1481 	else if ( !strcmp ( sName, "maxquerytime" ) )	m_iMaxQueryTime = iValue;
1482 	else if ( !strcmp ( sName, "groupsort" ) )	m_sGroupSortBy = sValue;
1483 	else if ( !strcmp ( sName, "distinct" ) )	m_sGroupDistinct = sValue;
1484 	else if ( !strcmp ( sName, "cutoff" ) )		m_iCutoff = iValue;
1485 	else if ( !strcmp ( sName, "comment" ) )	m_sComment = sValue;
1486 	else if ( !strcmp ( sName, "select" ) )		m_sSelect = sValue;
1487 
1488 	else if ( !strcmp ( sName, "mode" ) )
1489 	{
1490 		m_eMode = SPH_MATCH_ALL;
1491 		if ( !strcmp ( sValue, "any" ) )			m_eMode = SPH_MATCH_ANY;
1492 		else if ( !strcmp ( sValue, "phrase" ) )	m_eMode = SPH_MATCH_PHRASE;
1493 		else if ( !strcmp ( sValue, "boolean" ) )	m_eMode = SPH_MATCH_BOOLEAN;
1494 		else if ( !strcmp ( sValue, "ext" ) )		m_eMode = SPH_MATCH_EXTENDED;
1495 		else if ( !strcmp ( sValue, "extended" ) )	m_eMode = SPH_MATCH_EXTENDED;
1496 		else if ( !strcmp ( sValue, "ext2" ) )		m_eMode = SPH_MATCH_EXTENDED2;
1497 		else if ( !strcmp ( sValue, "extended2" ) )	m_eMode = SPH_MATCH_EXTENDED2;
1498 		else if ( !strcmp ( sValue, "all" ) )		m_eMode = SPH_MATCH_ALL;
1499 		else if ( !strcmp ( sValue, "fullscan" ) )	m_eMode = SPH_MATCH_FULLSCAN;
1500 		else
1501 		{
1502 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
1503 			SPH_RET(false);
1504 		}
1505 	} else if ( !strcmp ( sName, "ranker" ) )
1506 	{
1507 		m_eRanker = SPH_RANK_PROXIMITY_BM25;
1508 		if ( !strcmp ( sValue, "proximity_bm25" ) )	m_eRanker = SPH_RANK_PROXIMITY_BM25;
1509 		else if ( !strcmp ( sValue, "bm25" ) )		m_eRanker = SPH_RANK_BM25;
1510 		else if ( !strcmp ( sValue, "none" ) )		m_eRanker = SPH_RANK_NONE;
1511 		else if ( !strcmp ( sValue, "wordcount" ) )	m_eRanker = SPH_RANK_WORDCOUNT;
1512 		else if ( !strcmp ( sValue, "proximity" ) )	m_eRanker = SPH_RANK_PROXIMITY;
1513 		else if ( !strcmp ( sValue, "matchany" ) )	m_eRanker = SPH_RANK_MATCHANY;
1514 		else if ( !strcmp ( sValue, "fieldmask" ) )	m_eRanker = SPH_RANK_FIELDMASK;
1515 		else if ( !strcmp ( sValue, "sph04" ) )		m_eRanker = SPH_RANK_SPH04;
1516 		else if ( !strncmp ( sValue, "expr:", 5 ) )
1517 		{
1518 			m_eRanker = SPH_RANK_EXPR;
1519 			m_sRankExpr = sValue+5;
1520 		} else
1521 		{
1522 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
1523 			SPH_RET(false);
1524 		}
1525 	} else if ( !strcmp ( sName, "sort" ) )
1526 	{
1527 		static const struct
1528 		{
1529 			const char *	m_sName;
1530 			ESphSortOrder	m_eSort;
1531 		} dSortModes[] =
1532 		{
1533 			{ "relevance",		SPH_SORT_RELEVANCE },
1534 			{ "attr_desc:",		SPH_SORT_ATTR_DESC },
1535 			{ "attr_asc:",		SPH_SORT_ATTR_ASC },
1536 			{ "time_segments:",	SPH_SORT_TIME_SEGMENTS },
1537 			{ "extended:",		SPH_SORT_EXTENDED },
1538 			{ "expr:",			SPH_SORT_EXPR }
1539 		};
1540 
1541 		int i;
1542 		const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
1543 		for ( i=0; i<nModes; i++ )
1544 			if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
1545 		{
1546 			m_eSort = dSortModes[i].m_eSort;
1547 			m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
1548 			break;
1549 		}
1550 		if ( i==nModes )
1551 		{
1552 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
1553 			SPH_RET(false);
1554 		}
1555 
1556 	} else if ( !strcmp ( sName, "groupby" ) )
1557 	{
1558 		static const struct
1559 		{
1560 			const char *	m_sName;
1561 			ESphGroupBy		m_eFunc;
1562 		} dGroupModes[] =
1563 		{
1564 			{ "day:",	SPH_GROUPBY_DAY },
1565 			{ "week:",	SPH_GROUPBY_WEEK },
1566 			{ "month:",	SPH_GROUPBY_MONTH },
1567 			{ "year:",	SPH_GROUPBY_YEAR },
1568 			{ "attr:",	SPH_GROUPBY_ATTR },
1569 			{ "multi:",     SPH_GROUPBY_MULTIPLE }
1570 		};
1571 
1572 		int i;
1573 		const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
1574 		for ( i=0; i<nModes; i++ )
1575 			if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
1576 		{
1577 			m_eGroupFunc = dGroupModes[i].m_eFunc;
1578 			m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
1579 			break;
1580 		}
1581 		if ( i==nModes )
1582 		{
1583 			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
1584 			SPH_RET(false);
1585 		}
1586 
1587 	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1588 		( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
1589 	{
1590 		for ( ;; )
1591 		{
1592 			char * p = sName;
1593 			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1594 			tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
1595 			tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
1596 
1597 			if (!( p = strchr ( sValue, ',' ) ))
1598 				break;
1599 			*p++ = '\0';
1600 
1601 			tFilter.m_sAttrName = chop ( sValue );
1602 			sValue = p;
1603 
1604 			if (!( p = strchr ( sValue, ',' ) ))
1605 				break;
1606 			*p++ = '\0';
1607 
1608 			if ( tFilter.m_eType==SPH_FILTER_RANGE )
1609 			{
1610 				tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
1611 				tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
1612 			} else
1613 			{
1614 				tFilter.m_fMinValue = (float)atof(sValue);
1615 				tFilter.m_fMaxValue = (float)atof(p);
1616 			}
1617 
1618 			// all ok
1619 			m_iFilters++;
1620 			break;
1621 		}
1622 
1623 	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1624 		( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
1625 	{
1626 		for ( ;; )
1627 		{
1628 			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1629 			tFilter.m_eType = SPH_FILTER_VALUES;
1630 			tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
1631 
1632 			// get the attr name
1633 			while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
1634 				sValue++;
1635 			if ( !*sValue )
1636 				break;
1637 
1638 			tFilter.m_sAttrName = sValue;
1639 			while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) || myisjson(*sValue) ) )
1640 				sValue++;
1641 			if ( !*sValue )
1642 				break;
1643 			*sValue++ = '\0';
1644 
1645 			// get the values
1646 			tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
1647 			if ( !tFilter.m_iValues )
1648 			{
1649 				assert ( !tFilter.m_pValues );
1650 				break;
1651 			}
1652 
1653 			// all ok
1654 			m_iFilters++;
1655 			break;
1656 		}
1657 
1658 	} else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
1659 	{
1660 		bool bIndex = !strcmp ( sName, "indexweights" );
1661 		int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
1662 		char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
1663 		int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
1664 
1665 		*pCount = 0;
1666 
1667 		char * p = sValue;
1668 		while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
1669 		{
1670 			// extract attr name
1671 			if ( !myisattr(*p) )
1672 			{
1673 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
1674 				SPH_RET(false);
1675 			}
1676 
1677 			pNames[*pCount] = p;
1678 			while ( myisattr(*p) ) p++;
1679 
1680 			if ( *p!=',' )
1681 			{
1682 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1683 				SPH_RET(false);
1684 			}
1685 			*p++ = '\0';
1686 
1687 			// extract attr value
1688 			char * sVal = p;
1689 			while ( isdigit(*p) ) p++;
1690 			if ( p==sVal )
1691 			{
1692 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
1693 				SPH_RET(false);
1694 			}
1695 			pWeights[*pCount] = atoi(sVal);
1696 			(*pCount)++;
1697 
1698 			if ( !*p )
1699 				break;
1700 			if ( *p!=',' )
1701 			{
1702 				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1703 				SPH_RET(false);
1704 			}
1705 			p++;
1706 		}
1707 
1708 	} else if ( !strcmp ( sName, "geoanchor" ) )
1709 	{
1710 		m_bGeoAnchor = false;
1711 		for ( ;; )
1712 		{
1713 			char * sLat = sValue;
1714 			char * p = sValue;
1715 
1716 			if (!( p = strchr ( p, ',' ) )) break;
1717                         *p++ = '\0';
1718 			char * sLong = p;
1719 
1720 			if (!( p = strchr ( p, ',' ) )) break;
1721                         *p++ = '\0';
1722 			char * sLatVal = p;
1723 
1724 			if (!( p = strchr ( p, ',' ) )) break;
1725                         *p++ = '\0';
1726 			char * sLongVal = p;
1727 
1728 			m_sGeoLatAttr = chop(sLat);
1729 			m_sGeoLongAttr = chop(sLong);
1730 			m_fGeoLatitude = (float)atof ( sLatVal );
1731 			m_fGeoLongitude = (float)atof ( sLongVal );
1732 			m_bGeoAnchor = true;
1733 			break;
1734 		}
1735 		if ( !m_bGeoAnchor )
1736 		{
1737 			snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
1738 			SPH_RET(false);
1739 		}
1740 	} else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
1741 	{
1742 		sName = NULL;
1743 		int iType = 0;
1744 		CSphSEQuery::Override_t * pOverride = NULL;
1745 
1746 		// get name and type
1747 		char * sRest = sValue;
1748 		for ( ;; )
1749 		{
1750 			sName = sRest;
1751 			if ( !*sName )
1752 				break;
1753 			if (!( sRest = strchr ( sRest, ',' ) ))
1754 				break;
1755 			*sRest++ = '\0';
1756 			char * sType = sRest;
1757 			if (!( sRest = strchr ( sRest, ',' ) ))
1758 				break;
1759 
1760 			static const struct
1761 			{
1762 				const char *	m_sName;
1763 				int				m_iType;
1764 			}
1765 			dAttrTypes[] =
1766 			{
1767 				{ "int",		SPH_ATTR_INTEGER },
1768 				{ "timestamp",	SPH_ATTR_TIMESTAMP },
1769 				{ "bool",		SPH_ATTR_BOOL },
1770 				{ "float",		SPH_ATTR_FLOAT },
1771 				{ "bigint",		SPH_ATTR_BIGINT }
1772 			};
1773 			for ( uint i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
1774 				if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
1775 			{
1776 				iType = dAttrTypes[i].m_iType;
1777 				break;
1778 			}
1779 			break;
1780 		}
1781 
1782 		// fail
1783 		if ( !sName || !*sName || !iType )
1784 		{
1785 			snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
1786 			SPH_RET(false);
1787 		}
1788 
1789 		// grab id:value pairs
1790 		sRest++;
1791 		while ( sRest )
1792 		{
1793 			char * sId = sRest;
1794 			if (!( sRest = strchr ( sRest, ':' ) )) break;
1795                         *sRest++ = '\0';
1796 			if (!( sRest - sId )) break;
1797 
1798 			sValue = sRest;
1799 			if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
1800 				*sRest++ = '\0';
1801 			if ( !*sValue )
1802 				break;
1803 
1804 			if ( !pOverride )
1805 			{
1806 				pOverride = new CSphSEQuery::Override_t;
1807 				pOverride->m_sName = chop(sName);
1808 				pOverride->m_iType = iType;
1809 				m_dOverrides.append ( pOverride );
1810 			}
1811 
1812 			ulonglong uId = strtoull ( sId, NULL, 10 );
1813 			CSphSEQuery::Override_t::Value_t tValue;
1814 			if ( iType==SPH_ATTR_FLOAT )
1815 				tValue.m_fValue = (float)atof(sValue);
1816 			else if ( iType==SPH_ATTR_BIGINT )
1817 				tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
1818 			else
1819 				tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
1820 
1821 			pOverride->m_dIds.append ( uId );
1822 			pOverride->m_dValues.append ( tValue );
1823 		}
1824 
1825 		if ( !pOverride )
1826 		{
1827 			snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
1828 			SPH_RET(false);
1829 		}
1830 		SPH_RET(true);
1831 	} else
1832 	{
1833 		snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
1834 		SPH_RET(false);
1835 	}
1836 
1837 	// !COMMIT handle syntax errors
1838 
1839 	SPH_RET(true);
1840 }
1841 
1842 
Parse()1843 bool CSphSEQuery::Parse ()
1844 {
1845 	SPH_ENTER_METHOD();
1846 	SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
1847 
1848 	m_bQuery = false;
1849 	char * pCur = m_sQueryBuffer;
1850 	char * pNext = pCur;
1851 
1852 	while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
1853 	{
1854 		// handle escaped semicolons
1855 		if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
1856 		{
1857 			pNext++;
1858 			continue;
1859 		}
1860 
1861 		// handle semicolon-separated clauses
1862 		*pNext++ = '\0';
1863 		if ( !ParseField ( pCur ) )
1864 			SPH_RET(false);
1865 		pCur = pNext;
1866 	}
1867 
1868 	SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
1869 
1870 	SPH_RET(true);
1871 }
1872 
1873 
SendBytes(const void * pBytes,int iBytes)1874 void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
1875 {
1876 	SPH_ENTER_METHOD();
1877 	if ( m_iBufLeft<iBytes )
1878 	{
1879 		m_bBufOverrun = true;
1880 		SPH_VOID_RET();
1881 	}
1882 
1883 	memcpy ( m_pCur, pBytes, iBytes );
1884 
1885 	m_pCur += iBytes;
1886 	m_iBufLeft -= iBytes;
1887 	SPH_VOID_RET();
1888 }
1889 
1890 
BuildRequest(char ** ppBuffer)1891 int CSphSEQuery::BuildRequest ( char ** ppBuffer )
1892 {
1893 	SPH_ENTER_METHOD();
1894 
1895 	// calc request length
1896 	int iReqSize = 128 + 4*m_iWeights
1897 		+ strlen ( m_sSortBy )
1898 		+ strlen ( m_sQuery )
1899 		+ strlen ( m_sIndex )
1900 		+ strlen ( m_sGroupBy )
1901 		+ strlen ( m_sGroupSortBy )
1902 		+ strlen ( m_sGroupDistinct )
1903 		+ strlen ( m_sComment )
1904 		+ strlen ( m_sSelect );
1905 	if ( m_eRanker==SPH_RANK_EXPR )
1906 		iReqSize += 4 + strlen(m_sRankExpr);
1907 	for ( int i=0; i<m_iFilters; i++ )
1908 	{
1909 		const CSphSEFilter & tFilter = m_dFilters[i];
1910 		iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
1911 		switch ( tFilter.m_eType )
1912 		{
1913 			case SPH_FILTER_VALUES:		iReqSize += 4 + 8*tFilter.m_iValues; break;
1914 			case SPH_FILTER_RANGE:		iReqSize += 16; break;
1915 			case SPH_FILTER_FLOATRANGE:	iReqSize += 8; break;
1916 		}
1917 	}
1918 	if ( m_bGeoAnchor ) // 1.14+
1919 		iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
1920 	for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
1921 		iReqSize += 8 + strlen(m_sIndexWeight[i] );
1922 	for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
1923 		iReqSize += 8 + strlen(m_sFieldWeight[i] );
1924 	// overrides
1925 	iReqSize += 4;
1926 	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
1927 	{
1928 		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
1929 		const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
1930 		iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
1931 	}
1932 	// select
1933 	iReqSize += 4;
1934 
1935 	m_iBufLeft = 0;
1936 	SafeDeleteArray ( m_pBuf );
1937 
1938 	m_pBuf = new char [ iReqSize ];
1939 	if ( !m_pBuf )
1940 		SPH_RET(-1);
1941 
1942 	m_pCur = m_pBuf;
1943 	m_iBufLeft = iReqSize;
1944 	m_bBufOverrun = false;
1945 	(*ppBuffer) = m_pBuf;
1946 
1947 	// build request
1948 	SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
1949 	SendWord ( VER_COMMAND_SEARCH ); // command version
1950 	SendInt ( iReqSize-8 ); // packet body length
1951 	SendInt ( 0 ); // its a client
1952 
1953 	SendInt ( 1 ); // number of queries
1954 	SendInt ( m_iOffset );
1955 	SendInt ( m_iLimit );
1956 	SendInt ( m_eMode );
1957 	SendInt ( m_eRanker ); // 1.16+
1958 	if ( m_eRanker==SPH_RANK_EXPR )
1959 		SendString ( m_sRankExpr );
1960 	SendInt ( m_eSort );
1961 	SendString ( m_sSortBy ); // sort attr
1962 	SendString ( m_sQuery ); // query
1963 	SendInt ( m_iWeights );
1964 	for ( int j=0; j<m_iWeights; j++ )
1965 		SendInt ( m_pWeights[j] ); // weights
1966 	SendString ( m_sIndex ); // indexes
1967 	SendInt ( 1 ); // id64 range follows
1968 	SendUint64 ( m_iMinID ); // id/ts ranges
1969 	SendUint64 ( m_iMaxID );
1970 
1971 	SendInt ( m_iFilters );
1972 	for ( int j=0; j<m_iFilters; j++ )
1973 	{
1974 		const CSphSEFilter & tFilter = m_dFilters[j];
1975 		SendString ( tFilter.m_sAttrName );
1976 		SendInt ( tFilter.m_eType );
1977 
1978 		switch ( tFilter.m_eType )
1979 		{
1980 			case SPH_FILTER_VALUES:
1981 				SendInt ( tFilter.m_iValues );
1982 				for ( int k=0; k<tFilter.m_iValues; k++ )
1983 					SendUint64 ( tFilter.m_pValues[k] );
1984 				break;
1985 
1986 			case SPH_FILTER_RANGE:
1987 				SendUint64 ( tFilter.m_uMinValue );
1988 				SendUint64 ( tFilter.m_uMaxValue );
1989 				break;
1990 
1991 			case SPH_FILTER_FLOATRANGE:
1992 				SendFloat ( tFilter.m_fMinValue );
1993 				SendFloat ( tFilter.m_fMaxValue );
1994 				break;
1995 		}
1996 
1997 		SendInt ( tFilter.m_bExclude );
1998 	}
1999 
2000 	SendInt ( m_eGroupFunc );
2001 	SendString ( m_sGroupBy );
2002 	SendInt ( m_iMaxMatches );
2003 	SendString ( m_sGroupSortBy );
2004 	SendInt ( m_iCutoff ); // 1.9+
2005 	SendInt ( m_iRetryCount ); // 1.10+
2006 	SendInt ( m_iRetryDelay );
2007 	SendString ( m_sGroupDistinct ); // 1.11+
2008 	SendInt ( m_bGeoAnchor ); // 1.14+
2009 	if ( m_bGeoAnchor )
2010 	{
2011 		SendString ( m_sGeoLatAttr );
2012 		SendString ( m_sGeoLongAttr );
2013 		SendFloat ( m_fGeoLatitude );
2014 		SendFloat ( m_fGeoLongitude );
2015 	}
2016 	SendInt ( m_iIndexWeights ); // 1.15+
2017 	for ( int i=0; i<m_iIndexWeights; i++ )
2018 	{
2019 		SendString ( m_sIndexWeight[i] );
2020 		SendInt ( m_iIndexWeight[i] );
2021 	}
2022 	SendInt ( m_iMaxQueryTime ); // 1.17+
2023 	SendInt ( m_iFieldWeights ); // 1.18+
2024 	for ( int i=0; i<m_iFieldWeights; i++ )
2025 	{
2026 		SendString ( m_sFieldWeight[i] );
2027 		SendInt ( m_iFieldWeight[i] );
2028 	}
2029 	SendString ( m_sComment );
2030 
2031 	// overrides
2032 	SendInt ( m_dOverrides.elements() );
2033 	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
2034 	{
2035 		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
2036 		SendString ( pOverride->m_sName );
2037 		SendDword ( pOverride->m_iType );
2038 		SendInt ( pOverride->m_dIds.elements() );
2039 		for ( size_t j=0; j<pOverride->m_dIds.elements(); j++ )
2040 		{
2041 			SendUint64 ( pOverride->m_dIds.at(j) );
2042 			if ( pOverride->m_iType==SPH_ATTR_FLOAT )
2043 				SendFloat ( pOverride->m_dValues.at(j).m_fValue );
2044 			else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
2045 				SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
2046 			else
2047 				SendDword ( pOverride->m_dValues.at(j).m_uValue );
2048 		}
2049 	}
2050 
2051 	// select
2052 	SendString ( m_sSelect );
2053 
2054 	// detect buffer overruns and underruns, and report internal error
2055 	if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
2056 		SPH_RET(-1);
2057 
2058 	// all fine
2059 	SPH_RET ( iReqSize );
2060 }
2061 
2062 //////////////////////////////////////////////////////////////////////////////
2063 // SPHINX HANDLER
2064 //////////////////////////////////////////////////////////////////////////////
2065 
// Handler constructor. Pre-5.1 servers bind to the global sphinx_hton, while
// newer servers pass the handlerton in. Every member starts empty or zero;
// the per-table share is attached later, in open().
#if MYSQL_VERSION_ID<50100
ha_sphinx::ha_sphinx ( TABLE_ARG * table )
	: handler ( &sphinx_hton, table )
#else
ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
	: handler ( hton, table )
#endif
	, m_pShare ( NULL )
	, m_iMatchesTotal ( 0 )
	, m_iCurrentPos ( 0 )
	, m_pCurrentKey ( NULL )
	, m_iCurrentKeyLen ( 0 )
	, m_pResponse ( NULL )
	, m_pResponseEnd ( NULL )
	, m_pCur ( NULL )
	, m_bUnpackError ( false )
	, m_iFields ( 0 )
	, m_dFields ( NULL )
	, m_iAttrs ( 0 )
	, m_dAttrs ( NULL )
	, m_bId64 ( 0 )
	, m_dUnboundFields ( NULL )
{
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
2092 
~ha_sphinx()2093 ha_sphinx::~ha_sphinx()
2094 {
2095   SafeDeleteArray ( m_dAttrs );
2096   SafeDeleteArray ( m_dUnboundFields );
2097   if ( m_dFields )
2098   {
2099     for (uint32 i=0; i< m_iFields; i++ )
2100       SafeDeleteArray ( m_dFields[i] );
2101     delete [] m_dFields;
2102   }
2103 }
2104 
2105 // Used for opening tables. The name will be the name of the file.
2106 // A table is opened when it needs to be opened. For instance
2107 // when a request comes in for a select on the table (tables are not
2108 // open and closed for each request, they are cached).
2109 //
2110 // Called from handler.cc by handler::ha_open(). The server opens all tables by
2111 // calling ha_open() which then calls the handler specific open().
open(const char * name,int,uint)2112 int ha_sphinx::open ( const char * name, int, uint )
2113 {
2114 	SPH_ENTER_METHOD();
2115 	m_pShare = get_share ( name, table );
2116 	if ( !m_pShare )
2117 		SPH_RET(1);
2118 
2119 	thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
2120 
2121 	#if MYSQL_VERSION_ID>50100
2122 	*thd_ha_data ( table->in_use, ht ) = NULL;
2123 	#else
2124 	table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
2125 	#endif
2126 
2127 	SPH_RET(0);
2128 }
2129 
2130 
Connect(const char * sHost,ushort uPort)2131 int ha_sphinx::Connect ( const char * sHost, ushort uPort )
2132 {
2133 	struct sockaddr_in sin;
2134 #ifndef __WIN__
2135 	struct sockaddr_un saun;
2136 #endif
2137 
2138 	int iDomain = 0;
2139 	int iSockaddrSize = 0;
2140 	struct sockaddr * pSockaddr = NULL;
2141 
2142 	in_addr_t ip_addr;
2143 
2144 	if ( uPort )
2145 	{
2146 		iDomain = AF_INET;
2147 		iSockaddrSize = sizeof(sin);
2148 		pSockaddr = (struct sockaddr *) &sin;
2149 
2150 		memset ( &sin, 0, sizeof(sin) );
2151 		sin.sin_family = AF_INET;
2152 		sin.sin_port = htons(uPort);
2153 
2154 		// prepare host address
2155 		if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
2156 		{
2157 			memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
2158 		} else
2159 		{
2160 			int tmp_errno;
2161 			bool bError = false;
2162 
2163 #if MYSQL_VERSION_ID>=50515
2164 			struct addrinfo *hp = NULL;
2165 			tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
2166 			if ( tmp_errno || !hp || !hp->ai_addr )
2167 			{
2168 				bError = true;
2169 				if ( hp )
2170 					freeaddrinfo ( hp );
2171 			}
2172 #else
2173 			struct hostent tmp_hostent, *hp;
2174 			char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
2175 			hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
2176 			if ( !hp )
2177 			{
2178 				my_gethostbyname_r_free();
2179 				bError = true;
2180 			}
2181 #endif
2182 
2183 			if ( bError )
2184 			{
2185 				char sError[256];
2186 				my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
2187 
2188 				my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2189 				SPH_RET(-1);
2190 			}
2191 
2192 #if MYSQL_VERSION_ID>=50515
2193 			struct sockaddr_in *in = (sockaddr_in *)hp->ai_addr;
2194 			memcpy ( &sin.sin_addr, &in->sin_addr, Min ( sizeof(sin.sin_addr), sizeof(in->sin_addr) ) );
2195  			freeaddrinfo ( hp );
2196 #else
2197 			memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
2198 			my_gethostbyname_r_free();
2199 #endif
2200 		}
2201 	} else
2202 	{
2203 #ifndef __WIN__
2204 		iDomain = AF_UNIX;
2205 		iSockaddrSize = sizeof(saun);
2206 		pSockaddr = (struct sockaddr *) &saun;
2207 
2208 		memset ( &saun, 0, sizeof(saun) );
2209 		saun.sun_family = AF_UNIX;
2210 		strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
2211 #else
2212 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
2213 		SPH_RET(-1);
2214 #endif
2215 	}
2216 
2217 	char sError[512];
2218 	int iSocket = (int) socket ( iDomain, SOCK_STREAM, 0 );
2219 
2220 	if ( iSocket<0 )
2221 	{
2222 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
2223 		SPH_RET(-1);
2224 	}
2225 
2226 	if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
2227 	{
2228 		sphSockClose ( iSocket );
2229 		my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
2230 			sHost, errno, (int)uPort );
2231 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2232 		SPH_RET(-1);
2233 	}
2234 
2235 	return iSocket;
2236 }
2237 
2238 
ConnectAPI(const char * sQueryHost,int iQueryPort)2239 int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
2240 {
2241 	SPH_ENTER_METHOD();
2242 
2243 	const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
2244 	ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
2245 
2246 	int iSocket = Connect ( sHost, uPort );
2247 	if ( iSocket<0 )
2248 		SPH_RET ( iSocket );
2249 
2250 	char sError[512];
2251 
2252 	int version;
2253 	if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
2254 	{
2255 		sphSockClose ( iSocket );
2256 		my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
2257 			sHost, (int)uPort );
2258 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2259 		SPH_RET(-1);
2260 	}
2261 
2262 	uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
2263 	if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
2264 	{
2265 		sphSockClose ( iSocket );
2266 		my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
2267 			sHost, (int)uPort );
2268 		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2269 		SPH_RET(-1);
2270 	}
2271 
2272 	SPH_RET ( iSocket );
2273 }
2274 
2275 
2276 // Closes a table. We call the free_share() function to free any resources
2277 // that we have allocated in the "shared" structure.
2278 //
2279 // Called from sql_base.cc, sql_select.cc, and table.cc.
2280 // In sql_select.cc it is only used to close up temporary tables or during
2281 // the process where a temporary table is converted over to being a
2282 // myisam table.
2283 // For sql_base.cc look at close_data_tables().
close()2284 int ha_sphinx::close()
2285 {
2286 	SPH_ENTER_METHOD();
2287 	SPH_RET ( free_share ( m_pShare ) );
2288 }
2289 
2290 
HandleMysqlError(MYSQL * pConn,int iErrCode)2291 int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
2292 {
2293 	CSphSEThreadTable * pTable = GetTls ();
2294 	if ( pTable )
2295 	{
2296 		strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof pTable->m_tStats.m_sLastMessage - 1 );
2297 		pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2298 		pTable->m_tStats.m_bLastError = true;
2299 	}
2300 
2301 	mysql_close ( pConn );
2302 
2303 	my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
2304 	return -1;
2305 }
2306 
2307 
extra(enum ha_extra_function op)2308 int ha_sphinx::extra ( enum ha_extra_function op )
2309 {
2310 	CSphSEThreadTable * pTable = GetTls();
2311 	if ( pTable )
2312 	{
2313 		if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
2314 			pTable->m_bReplace = true;
2315 		else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
2316 			pTable->m_bReplace = false;
2317 	}
2318 	return 0;
2319 }
2320 
2321 
write_row(const byte *)2322 int ha_sphinx::write_row ( const byte * )
2323 {
2324 	SPH_ENTER_METHOD();
2325 	if ( !m_pShare || !m_pShare->m_bSphinxQL )
2326 		SPH_RET ( HA_ERR_WRONG_COMMAND );
2327 
2328 	// SphinxQL inserts only, pretty much similar to abandoned federated
2329 	char sQueryBuf[1024];
2330 	char sValueBuf[1024];
2331 
2332 	String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2333 	String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
2334 	sQuery.length ( 0 );
2335 	sValue.length ( 0 );
2336 
2337 	CSphSEThreadTable * pTable = GetTls ();
2338 	sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
2339 	sQuery.append ( m_pShare->m_sIndex );
2340 	sQuery.append ( " (" );
2341 
2342 	for ( Field ** ppField = table->field; *ppField; ppField++ )
2343 	{
2344 		sQuery.append ( (*ppField)->field_name.str );
2345 		if ( ppField[1] )
2346 			sQuery.append ( ", " );
2347 	}
2348 	sQuery.append ( ") VALUES (" );
2349 
2350 	for ( Field ** ppField = table->field; *ppField; ppField++ )
2351 	{
2352 		if ( (*ppField)->is_null() )
2353 		{
2354 			sQuery.append ( "''" );
2355 
2356 		} else
2357 		{
2358                         THD *thd= ha_thd();
2359 			if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
2360 			{
2361                           Item_field * pWrap = new (thd->mem_root) Item_field(thd, *ppField); // autofreed by query arena, I assume
2362                           Item_func_unix_timestamp * pConv = new (thd->mem_root) Item_func_unix_timestamp(thd, pWrap);
2363 				pConv->quick_fix_field();
2364 				unsigned int uTs = (unsigned int) pConv->val_int();
2365 
2366 				snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
2367 				sQuery.append ( sValueBuf );
2368 
2369 			} else
2370 			{
2371 				(*ppField)->val_str ( &sValue );
2372 				sQuery.append ( "'" );
2373 				sValue.print ( &sQuery );
2374 				sQuery.append ( "'" );
2375 				sValue.length(0);
2376 			}
2377 		}
2378 
2379 		if ( ppField[1] )
2380 			sQuery.append ( ", " );
2381 	}
2382 	sQuery.append ( ")" );
2383 
2384 	// FIXME? pretty inefficient to reconnect every time under high load,
2385 	// but this was intentionally written for a low load scenario..
2386 	MYSQL * pConn = mysql_init ( NULL );
2387 	if ( !pConn )
2388 		SPH_RET ( ER_OUT_OF_RESOURCES );
2389 
2390 	unsigned int uTimeout = 1;
2391 	mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2392 
2393         my_bool my_true= 1;
2394         mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
2395 
2396 	if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2397 		SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2398 
2399 	if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2400 		SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2401 
2402 	// all ok!
2403 	mysql_close ( pConn );
2404 	SPH_RET(0);
2405 }
2406 
2407 
IsIntegerFieldType(enum_field_types eType)2408 static inline bool IsIntegerFieldType ( enum_field_types eType )
2409 {
2410 	return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
2411 }
2412 
2413 
IsIDField(Field * pField)2414 static inline bool IsIDField ( Field * pField )
2415 {
2416 	enum_field_types eType = pField->type();
2417 
2418 	if ( eType==MYSQL_TYPE_LONGLONG )
2419 		return true;
2420 
2421 	if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
2422 		return true;
2423 
2424 	return false;
2425 }
2426 
2427 
delete_row(const byte *)2428 int ha_sphinx::delete_row ( const byte * )
2429 {
2430 	SPH_ENTER_METHOD();
2431 	if ( !m_pShare || !m_pShare->m_bSphinxQL )
2432 		SPH_RET ( HA_ERR_WRONG_COMMAND );
2433 
2434 	char sQueryBuf[1024];
2435 	String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2436 	sQuery.length ( 0 );
2437 
2438 	sQuery.append ( "DELETE FROM " );
2439 	sQuery.append ( m_pShare->m_sIndex );
2440 	sQuery.append ( " WHERE id=" );
2441 
2442 	char sValue[32];
2443 	snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
2444 	sQuery.append ( sValue );
2445 
2446 	// FIXME? pretty inefficient to reconnect every time under high load,
2447 	// but this was intentionally written for a low load scenario..
2448 	MYSQL * pConn = mysql_init ( NULL );
2449 	if ( !pConn )
2450 		SPH_RET ( ER_OUT_OF_RESOURCES );
2451 
2452 	unsigned int uTimeout = 1;
2453 	mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2454 
2455         my_bool my_true= 1;
2456         mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
2457 
2458 	if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2459 		SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2460 
2461 	if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2462 		SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2463 
2464 	// all ok!
2465 	mysql_close ( pConn );
2466 	SPH_RET(0);
2467 }
2468 
2469 
update_row(const byte *,const byte *)2470 int ha_sphinx::update_row ( const byte *, const byte * )
2471 {
2472 	SPH_ENTER_METHOD();
2473 	SPH_RET ( HA_ERR_WRONG_COMMAND );
2474 }
2475 
2476 
2477 // keynr is key (index) number
2478 // sorted is 1 if result MUST be sorted according to index
index_init(uint keynr,bool)2479 int ha_sphinx::index_init ( uint keynr, bool )
2480 {
2481 	SPH_ENTER_METHOD();
2482 	active_index = keynr;
2483 
2484 	CSphSEThreadTable * pTable = GetTls();
2485 	if ( pTable )
2486 		pTable->m_bCondDone = false;
2487 
2488 	SPH_RET(0);
2489 }
2490 
2491 
index_end()2492 int ha_sphinx::index_end()
2493 {
2494 	SPH_ENTER_METHOD();
2495 	SPH_RET(0);
2496 }
2497 
2498 
CheckResponcePtr(int iLen)2499 bool ha_sphinx::CheckResponcePtr ( int iLen )
2500 {
2501 	if ( m_pCur+iLen>m_pResponseEnd )
2502 	{
2503 		m_pCur = m_pResponseEnd;
2504 		m_bUnpackError = true;
2505 		return false;
2506 	}
2507 
2508 	return true;
2509 }
2510 
2511 
UnpackDword()2512 uint32 ha_sphinx::UnpackDword ()
2513 {
2514 	if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
2515 	{
2516 		return 0;
2517 	}
2518 
2519 	uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
2520 	m_pCur += sizeof(uint32); // NOLINT
2521 	return uRes;
2522 }
2523 
2524 
UnpackString()2525 char * ha_sphinx::UnpackString ()
2526 {
2527 	uint32 iLen = UnpackDword ();
2528 	if ( !iLen )
2529 		return NULL;
2530 
2531 	if ( !CheckResponcePtr ( iLen ) )
2532 	{
2533 		return NULL;
2534 	}
2535 
2536 	char * sRes = new char [ 1+iLen ];
2537 	memcpy ( sRes, m_pCur, iLen );
2538 	sRes[iLen] = '\0';
2539 	m_pCur += iLen;
2540 	return sRes;
2541 }
2542 
2543 
UnpackSchema()2544 bool ha_sphinx::UnpackSchema ()
2545 {
2546 	SPH_ENTER_METHOD();
2547 
2548 	// cleanup
2549 	if ( m_dFields )
2550 		for ( int i=0; i<(int)m_iFields; i++ )
2551 			SafeDeleteArray ( m_dFields[i] );
2552 	SafeDeleteArray ( m_dFields );
2553 
2554 	// unpack network packet
2555 	uint32 uStatus = UnpackDword ();
2556 	char * sMessage = NULL;
2557 
2558 	if ( uStatus!=SEARCHD_OK )
2559 	{
2560 		sMessage = UnpackString ();
2561 		CSphSEThreadTable * pTable = GetTls ();
2562 		if ( pTable )
2563 		{
2564 			strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
2565 			pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2566 			pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
2567 		}
2568 
2569 		if ( uStatus==SEARCHD_ERROR )
2570 		{
2571 			char sError[1024];
2572 			my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
2573 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2574 			SafeDeleteArray ( sMessage );
2575 			SPH_RET ( false );
2576 		}
2577 	}
2578 
2579 	m_iFields = UnpackDword ();
2580 	m_dFields = new char * [ m_iFields ];
2581 	if ( !m_dFields )
2582 	{
2583 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
2584 		SPH_RET(false);
2585 	}
2586 
2587 	for ( uint32 i=0; i<m_iFields; i++ )
2588 		m_dFields[i] = UnpackString ();
2589 
2590 	SafeDeleteArray ( m_dAttrs );
2591 	m_iAttrs = UnpackDword ();
2592 	m_dAttrs = new CSphSEAttr [ m_iAttrs ];
2593 	if ( !m_dAttrs )
2594 	{
2595 		for ( int i=0; i<(int)m_iFields; i++ )
2596 			SafeDeleteArray ( m_dFields[i] );
2597 		SafeDeleteArray ( m_dFields );
2598 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
2599 		SPH_RET(false);
2600 	}
2601 
2602 	for ( uint32 i=0; i<m_iAttrs; i++ )
2603 	{
2604 		m_dAttrs[i].m_sName = UnpackString ();
2605 		m_dAttrs[i].m_uType = UnpackDword ();
2606 		if ( m_bUnpackError ) // m_sName may be null
2607 			break;
2608 
2609 		m_dAttrs[i].m_iField = -1;
2610 		for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
2611 		{
2612 			const char * sTableField = m_pShare->m_sTableField[j];
2613 			const char * sAttrField = m_dAttrs[i].m_sName;
2614 			if ( m_dAttrs[i].m_sName[0]=='@' )
2615 			{
2616 				const char * sAtPrefix = "_sph_";
2617 				if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
2618 					continue;
2619 				sTableField += strlen(sAtPrefix);
2620 				sAttrField++;
2621 			}
2622 
2623 			if ( !strcasecmp ( sAttrField, sTableField ) )
2624 			{
2625 				// we're almost good, but
2626 				// let's enforce that timestamp columns can only receive timestamp attributes
2627 				if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
2628 					m_dAttrs[i].m_iField = j;
2629 				break;
2630 			}
2631 		}
2632 	}
2633 
2634 	m_iMatchesTotal = UnpackDword ();
2635 
2636 	m_bId64 = UnpackDword ();
2637 	if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
2638 	{
2639 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
2640 		SPH_RET(false);
2641 	}
2642 
2643 	// network packet unpacked; build unbound fields map
2644 	SafeDeleteArray ( m_dUnboundFields );
2645 	m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
2646 
2647 	for ( int i=0; i<m_pShare->m_iTableFields; i++ )
2648 	{
2649 		if ( i<SPHINXSE_SYSTEM_COLUMNS )
2650 			m_dUnboundFields[i] = SPH_ATTR_NONE;
2651 
2652 		else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
2653 			m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
2654 
2655 		else
2656 			m_dUnboundFields[i] = SPH_ATTR_INTEGER;
2657 	}
2658 
2659 	for ( uint32 i=0; i<m_iAttrs; i++ )
2660 		if ( m_dAttrs[i].m_iField>=0 )
2661 			m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
2662 
2663 	if ( m_bUnpackError )
2664 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
2665 
2666 	SPH_RET ( !m_bUnpackError );
2667 }
2668 
2669 
UnpackStats(CSphSEStats * pStats)2670 bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
2671 {
2672 	assert ( pStats );
2673 
2674 	char * pCurSave = m_pCur;
2675 	for ( uint m=0; m<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); m++ ) // NOLINT
2676 	{
2677 		m_pCur += m_bId64 ? 12 : 8; // skip id+weight
2678 		for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2679 		{
2680 			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
2681 			{
2682 				// skip MVA list
2683 				uint32 uCount = UnpackDword ();
2684 				m_pCur += uCount*4;
2685 			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
2686 			{
2687 				uint32 iLen = UnpackDword();
2688 				m_pCur += iLen;
2689 			} else // skip normal value
2690 				m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
2691 		}
2692 	}
2693 
2694 	pStats->m_iMatchesTotal = UnpackDword ();
2695 	pStats->m_iMatchesFound = UnpackDword ();
2696 	pStats->m_iQueryMsec = UnpackDword ();
2697 	pStats->m_iWords = UnpackDword ();
2698 
2699 	if ( m_bUnpackError )
2700 		return false;
2701 
2702 	if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
2703 		return false;
2704 
2705 	SafeDeleteArray ( pStats->m_dWords );
2706 	pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
2707 	if ( !pStats->m_dWords )
2708 		return false;
2709 
2710 	for ( int i=0; i<pStats->m_iWords; i++ )
2711 	{
2712 		CSphSEWordStats & tWord = pStats->m_dWords[i];
2713 		tWord.m_sWord = UnpackString ();
2714 		tWord.m_iDocs = UnpackDword ();
2715 		tWord.m_iHits = UnpackDword ();
2716 	}
2717 
2718 	if ( m_bUnpackError )
2719 		return false;
2720 
2721 	m_pCur = pCurSave;
2722 	return true;
2723 }
2724 
2725 
2726 /// condition pushdown implementation, to properly intercept WHERE clauses on my columns
2727 #if MYSQL_VERSION_ID<50610
cond_push(const COND * cond)2728 const COND * ha_sphinx::cond_push ( const COND * cond )
2729 #else
2730 const Item * ha_sphinx::cond_push ( const Item *cond )
2731 #endif
2732 {
2733 	// catch the simplest case: query_column="some text"
2734 	for ( ;; )
2735 	{
2736 		if ( cond->type()!=Item::FUNC_ITEM )
2737 			break;
2738 
2739 		Item_func * condf = (Item_func *)cond;
2740 		if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
2741 			break;
2742 
2743 		// get my tls
2744 		CSphSEThreadTable * pTable = GetTls ();
2745 		if ( !pTable )
2746 			break;
2747 
2748 		Item ** args = condf->arguments();
2749 		if ( !m_pShare->m_bSphinxQL )
2750 		{
2751 			// on non-QL tables, intercept query=value condition for SELECT
2752 			if (!( args[0]->type()==Item::FIELD_ITEM &&
2753 			       args[1]->is_of_type(Item::CONST_ITEM,
2754 			                           STRING_RESULT)))
2755 				break;
2756 
2757 			Item_field * pField = (Item_field *) args[0];
2758 			if ( pField->field->field_index!=2 ) // FIXME! magic key index
2759 				break;
2760 
2761 			// copy the query, and let know that we intercepted this condition
2762 			String *pString= args[1]->val_str(NULL);
2763 			pTable->m_bQuery = true;
2764 			strncpy ( pTable->m_sQuery, pString->c_ptr(), sizeof(pTable->m_sQuery) );
2765 			pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
2766 			pTable->m_pQueryCharset = pString->charset();
2767 
2768 		} else
2769 		{
2770 			if (!( args[0]->type()==Item::FIELD_ITEM &&
2771 			       args[1]->is_of_type(Item::CONST_ITEM,
2772 			                           INT_RESULT)))
2773 				break;
2774 
2775 			// on QL tables, intercept id=value condition for DELETE
2776 			Item_field * pField = (Item_field *) args[0];
2777 			if ( pField->field->field_index!=0 ) // FIXME! magic key index
2778 				break;
2779 
2780 			Item_int * pVal = (Item_int *) args[1];
2781 			pTable->m_iCondId = pVal->val_int();
2782 			pTable->m_bCondId = true;
2783 		}
2784 
2785 		// we intercepted this condition
2786 		return NULL;
2787 	}
2788 
2789 	// don't change anything
2790 	return cond;
2791 }
2792 
2793 
2794 /// condition popup
cond_pop()2795 void ha_sphinx::cond_pop ()
2796 {
2797 	CSphSEThreadTable * pTable = GetTls ();
2798 	if ( pTable )
2799 		pTable->m_bQuery = false;
2800 }
2801 
2802 
2803 /// get TLS (maybe allocate it, too)
GetTls()2804 CSphSEThreadTable * ha_sphinx::GetTls()
2805 {
2806 	SPH_ENTER_METHOD()
2807 	// where do we store that pointer in today's version?
2808 	CSphTLS ** ppTls;
2809 #if MYSQL_VERSION_ID>50100
2810 	ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
2811 #else
2812 	ppTls = (CSphTLS**) &current_thd->ha_data[sphinx_hton.slot];
2813 #endif // >50100
2814 
2815 	CSphSEThreadTable * pTable = NULL;
2816 	// allocate if needed
2817 	if ( !*ppTls )
2818 	{
2819 		*ppTls = new CSphTLS ( this );
2820 		pTable = (*ppTls)->m_pHeadTable;
2821 	} else
2822 	{
2823 		pTable = (*ppTls)->m_pHeadTable;
2824 	}
2825 
2826 	while ( pTable && pTable->m_pHandler!=this )
2827 		pTable = pTable->m_pTableNext;
2828 
2829 	if ( !pTable )
2830 	{
2831 		pTable = new CSphSEThreadTable ( this );
2832 		pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
2833 		(*ppTls)->m_pHeadTable = pTable;
2834 	}
2835 
2836 	// errors will be handled by caller
2837 	return pTable;
2838 }
2839 
2840 
2841 // Positions an index cursor to the index specified in the handle. Fetches the
2842 // row if available. If the key value is null, begin at the first key of the
2843 // index.
index_read(byte * buf,const byte * key,uint key_len,enum ha_rkey_function)2844 int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
2845 {
2846 	SPH_ENTER_METHOD();
2847 	char sError[256];
2848 
2849 	// set new data for thd->ha_data, it is used in show_status
2850 	CSphSEThreadTable * pTable = GetTls();
2851 	if ( !pTable )
2852 	{
2853 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
2854 		SPH_RET ( HA_ERR_END_OF_FILE );
2855 	}
2856 	pTable->m_tStats.Reset ();
2857 
2858 	// sphinxql table, just return the key once
2859 	if ( m_pShare->m_bSphinxQL )
2860 	{
2861 		// over and out
2862 		if ( pTable->m_bCondDone )
2863 			SPH_RET ( HA_ERR_END_OF_FILE );
2864 
2865 		// return a value from pushdown, if any
2866 		if ( pTable->m_bCondId )
2867 		{
2868 			table->field[0]->store ( pTable->m_iCondId, 1 );
2869 			pTable->m_bCondDone = true;
2870 			SPH_RET(0);
2871 		}
2872 
2873 		// return a value from key
2874 		longlong iRef = 0;
2875 		if ( key_len==4 )
2876 			iRef = uint4korr ( key );
2877 		else if ( key_len==8 )
2878 			iRef = uint8korr ( key );
2879 		else
2880 		{
2881 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
2882 			SPH_RET ( HA_ERR_END_OF_FILE );
2883 		}
2884 
2885 		table->field[0]->store ( iRef, 1 );
2886 		pTable->m_bCondDone = true;
2887 		SPH_RET(0);
2888 	}
2889 
2890 	// parse query
2891 	if ( pTable->m_bQuery )
2892 	{
2893 		// we have a query from condition pushdown
2894 		m_pCurrentKey = (const byte *) pTable->m_sQuery;
2895 		m_iCurrentKeyLen = strlen(pTable->m_sQuery);
2896 	} else
2897 	{
2898 		// just use the key (might be truncated)
2899 		m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
2900 		m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
2901 		pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
2902 	}
2903 
2904 	CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
2905 	if ( !q.Parse () )
2906 	{
2907 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
2908 		SPH_RET ( HA_ERR_END_OF_FILE );
2909 	}
2910 
2911 	// do connect
2912 	int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
2913 	if ( iSocket<0 )
2914 		SPH_RET ( HA_ERR_END_OF_FILE );
2915 
2916 	// my buffer
2917 	char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
2918 	int iReqLen = q.BuildRequest ( &pBuffer );
2919 
2920 	if ( iReqLen<=0 )
2921 	{
2922 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
2923 		SPH_RET ( HA_ERR_END_OF_FILE );
2924 	}
2925 
2926 	// send request
2927 	::send ( iSocket, pBuffer, iReqLen, 0 );
2928 
2929 	// receive reply
2930 	char sHeader[8];
2931 	int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
2932 	if ( iGot!=sizeof(sHeader) )
2933 	{
2934 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
2935 		SPH_RET ( HA_ERR_END_OF_FILE );
2936 	}
2937 
2938 	short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
2939 	short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
2940 	uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
2941 	SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
2942 		uRespStatus, uRespVersion, uRespLength );
2943 
2944 	SafeDeleteArray ( m_pResponse );
2945 	if ( uRespLength<=SPHINXSE_MAX_ALLOC )
2946 		m_pResponse = new char [ uRespLength+1 ];
2947 
2948 	if ( !m_pResponse )
2949 	{
2950 		my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
2951 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2952 		SPH_RET ( HA_ERR_END_OF_FILE );
2953 	}
2954 
2955 	int iRecvLength = 0;
2956 	while ( iRecvLength<(int)uRespLength )
2957 	{
2958 		int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
2959 		if ( iRecv<0 )
2960 			break;
2961 		iRecvLength += iRecv;
2962 	}
2963 
2964 	::closesocket ( iSocket );
2965 	iSocket = -1;
2966 
2967 	if ( iRecvLength!=(int)uRespLength )
2968 	{
2969 		my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
2970 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2971 		SPH_RET ( HA_ERR_END_OF_FILE );
2972 	}
2973 
2974 	// we'll have a message, at least
2975 	pTable->m_bStats = true;
2976 
2977 	// parse reply
2978 	m_iCurrentPos = 0;
2979 	m_pCur = m_pResponse;
2980 	m_pResponseEnd = m_pResponse + uRespLength;
2981 	m_bUnpackError = false;
2982 
2983 	if ( uRespStatus!=SEARCHD_OK )
2984 	{
2985 		char * sMessage = UnpackString ();
2986 		if ( !sMessage )
2987 		{
2988 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
2989 				uRespStatus, uRespLength );
2990 			SPH_RET ( HA_ERR_END_OF_FILE );
2991 		}
2992 
2993 		strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
2994 		pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2995 		SafeDeleteArray ( sMessage );
2996 
2997 		if ( uRespStatus!=SEARCHD_WARNING )
2998 		{
2999 			my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
3000 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
3001 
3002 			pTable->m_tStats.m_bLastError = true;
3003 			SPH_RET ( HA_ERR_END_OF_FILE );
3004 		}
3005 	}
3006 
3007 	if ( !UnpackSchema () )
3008 		SPH_RET ( HA_ERR_END_OF_FILE );
3009 
3010 	if ( !UnpackStats ( &pTable->m_tStats ) )
3011 	{
3012 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
3013 		SPH_RET ( HA_ERR_END_OF_FILE );
3014 	}
3015 
3016 	SPH_RET ( get_rec ( buf, key, key_len ) );
3017 }
3018 
3019 
3020 // Positions an index cursor to the index specified in key. Fetches the
3021 // row if any. This is only used to read whole keys.
index_read_idx(byte *,uint,const byte *,uint,enum ha_rkey_function)3022 int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
3023 {
3024 	SPH_ENTER_METHOD();
3025 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3026 }
3027 
3028 
3029 // Used to read forward through the index.
index_next(byte * buf)3030 int ha_sphinx::index_next ( byte * buf )
3031 {
3032 	SPH_ENTER_METHOD();
3033 	SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
3034 }
3035 
3036 
index_next_same(byte * buf,const byte * key,uint keylen)3037 int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
3038 {
3039 	SPH_ENTER_METHOD();
3040 	SPH_RET ( get_rec ( buf, key, keylen ) );
3041 }
3042 
3043 
get_rec(byte * buf,const byte *,uint)3044 int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
3045 {
3046 	SPH_ENTER_METHOD();
3047 
3048 	if ( m_iCurrentPos>=m_iMatchesTotal )
3049 	{
3050 		SafeDeleteArray ( m_pResponse );
3051 		SPH_RET ( HA_ERR_END_OF_FILE );
3052 	}
3053 
3054 	#if MYSQL_VERSION_ID>50100
3055 	MY_BITMAP * org_bitmap = dbug_tmp_use_all_columns ( table, &table->write_set );
3056 	#endif
3057 	Field ** field = table->field;
3058 
3059 	// unpack and return the match
3060 	longlong uMatchID = UnpackDword ();
3061 	if ( m_bId64 )
3062 		uMatchID = ( uMatchID<<32 ) + UnpackDword();
3063 	uint32 uMatchWeight = UnpackDword ();
3064 
3065 	field[0]->store ( uMatchID, 1 );
3066 	field[1]->store ( uMatchWeight, 1 );
3067 	field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );
3068 
3069 	for ( uint32 i=0; i<m_iAttrs; i++ )
3070 	{
3071 		longlong iValue64 = 0;
3072 		uint32 uValue = UnpackDword ();
3073 		if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
3074 			iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
3075 		if ( m_dAttrs[i].m_iField<0 )
3076 		{
3077 			// skip MVA or String
3078 			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
3079 			{
3080 				for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3081 					UnpackDword();
3082 			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
3083 			{
3084 				m_pCur += uValue;
3085 			}
3086 			continue;
3087 		}
3088 
3089 		Field * af = field [ m_dAttrs[i].m_iField ];
3090 		switch ( m_dAttrs[i].m_uType )
3091 		{
3092 			case SPH_ATTR_INTEGER:
3093 			case SPH_ATTR_ORDINAL:
3094 			case SPH_ATTR_BOOL:
3095 				af->store ( uValue, 1 );
3096 				break;
3097 
3098 			case SPH_ATTR_FLOAT:
3099 				af->store ( sphDW2F(uValue) );
3100 				break;
3101 
3102 			case SPH_ATTR_TIMESTAMP:
3103 				if ( af->type()==MYSQL_TYPE_TIMESTAMP )
3104 					longstore ( af->ptr, uValue ); // because store() does not accept timestamps
3105 				else
3106 					af->store ( uValue, 1 );
3107 				break;
3108 
3109 			case SPH_ATTR_BIGINT:
3110 				af->store ( iValue64, 0 );
3111 				break;
3112 
3113 			case SPH_ATTR_STRING:
3114 				if ( !uValue )
3115 					af->store ( "", 0, &my_charset_bin );
3116 				else if ( CheckResponcePtr ( uValue ) )
3117 				{
3118 					af->store ( m_pCur, uValue, &my_charset_bin );
3119 					m_pCur += uValue;
3120 				}
3121 				break;
3122 
3123 			case SPH_ATTR_UINT64SET:
3124 			case SPH_ATTR_UINT32SET :
3125 				if ( uValue<=0 )
3126 				{
3127 					// shortcut, empty MVA set
3128 					af->store ( "", 0, &my_charset_bin );
3129 
3130 				} else
3131 				{
3132 					// convert MVA set to comma-separated string
3133 					char sBuf[1024]; // FIXME! magic size
3134 					char * pCur = sBuf;
3135 
3136 					if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
3137 					{
3138 						for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3139 						{
3140 							uint32 uEntry = UnpackDword ();
3141 							if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
3142 							{
3143 								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
3144 								while ( *pCur ) pCur++;
3145 								if ( uValue>1 )
3146 									*pCur++ = ','; // non-trailing commas
3147 							}
3148 						}
3149 					} else
3150 					{
3151 						for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
3152 						{
3153 							uint32 uEntryLo = UnpackDword ();
3154 							uint32 uEntryHi = UnpackDword();
3155 							if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
3156 							{
3157 								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u%u", uEntryHi, uEntryLo );
3158 								while ( *pCur ) pCur++;
3159 								if ( uValue>2 )
3160 									*pCur++ = ','; // non-trailing commas
3161 							}
3162 						}
3163 					}
3164 
3165 					af->store ( sBuf, uint(pCur-sBuf), &my_charset_bin );
3166 				}
3167 				break;
3168 
3169 			default:
3170 				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
3171 				SafeDeleteArray ( m_pResponse );
3172 				SPH_RET ( HA_ERR_END_OF_FILE );
3173 		}
3174 	}
3175 
3176 	if ( m_bUnpackError )
3177 	{
3178 		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
3179 		SafeDeleteArray ( m_pResponse );
3180 		SPH_RET ( HA_ERR_END_OF_FILE );
3181 	}
3182 
3183 	// zero out unmapped fields
3184 	for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
3185 		if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
3186 			switch ( m_dUnboundFields[i] )
3187 	{
3188 		case SPH_ATTR_INTEGER:		table->field[i]->store ( 0, 1 ); break;
3189 		case SPH_ATTR_TIMESTAMP:	longstore ( table->field[i]->ptr, 0 ); break;
3190 		default:
3191 			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
3192 				"INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
3193 			SafeDeleteArray ( m_pResponse );
3194 			SPH_RET ( HA_ERR_END_OF_FILE );
3195 	}
3196 
3197 	memset ( buf, 0, table->s->null_bytes );
3198 	m_iCurrentPos++;
3199 
3200 	#if MYSQL_VERSION_ID > 50100
3201 	dbug_tmp_restore_column_map ( &table->write_set, org_bitmap );
3202 	#endif
3203 
3204 	SPH_RET(0);
3205 }
3206 
3207 
3208 // Used to read backwards through the index.
index_prev(byte *)3209 int ha_sphinx::index_prev ( byte * )
3210 {
3211 	SPH_ENTER_METHOD();
3212 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3213 }
3214 
3215 
3216 // index_first() asks for the first key in the index.
3217 //
3218 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3219 // and sql_select.cc.
index_first(byte *)3220 int ha_sphinx::index_first ( byte * )
3221 {
3222 	SPH_ENTER_METHOD();
3223 	SPH_RET ( HA_ERR_END_OF_FILE );
3224 }
3225 
3226 // index_last() asks for the last key in the index.
3227 //
3228 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3229 // and sql_select.cc.
index_last(byte *)3230 int ha_sphinx::index_last ( byte * )
3231 {
3232 	SPH_ENTER_METHOD();
3233 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3234 }
3235 
3236 
rnd_init(bool)3237 int ha_sphinx::rnd_init ( bool )
3238 {
3239 	SPH_ENTER_METHOD();
3240 	SPH_RET(0);
3241 }
3242 
3243 
rnd_end()3244 int ha_sphinx::rnd_end()
3245 {
3246 	SPH_ENTER_METHOD();
3247 	SPH_RET(0);
3248 }
3249 
3250 
rnd_next(byte *)3251 int ha_sphinx::rnd_next ( byte * )
3252 {
3253 	SPH_ENTER_METHOD();
3254 	SPH_RET ( HA_ERR_END_OF_FILE );
3255 }
3256 
3257 
// Handler hook meant to remember the current row's position for a later
// rnd_pos() call. Nothing is recorded here; rnd_pos() in this engine
// always returns HA_ERR_WRONG_COMMAND, so a saved position would be unused.
void ha_sphinx::position ( const byte * )
{
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
3263 
3264 
3265 // This is like rnd_next, but you are given a position to use
3266 // to determine the row. The position will be of the type that you stored in
3267 // ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
3268 // or position you saved when position() was called.
3269 // Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
rnd_pos(byte *,byte *)3270 int ha_sphinx::rnd_pos ( byte *, byte * )
3271 {
3272 	SPH_ENTER_METHOD();
3273 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3274 }
3275 
3276 
3277 #if MYSQL_VERSION_ID>=50030
info(uint)3278 int ha_sphinx::info ( uint )
3279 #else
3280 void ha_sphinx::info ( uint )
3281 #endif
3282 {
3283 	SPH_ENTER_METHOD();
3284 
3285 	if ( table->s->keys>0 )
3286 		table->key_info[0].rec_per_key[0] = 1;
3287 
3288 	#if MYSQL_VERSION_ID>50100
3289 	stats.records = 20;
3290 	#else
3291 	records = 20;
3292 	#endif
3293 
3294 #if MYSQL_VERSION_ID>=50030
3295 	SPH_RET(0);
3296 #else
3297 	SPH_VOID_RET();
3298 #endif
3299 }
3300 
3301 
reset()3302 int ha_sphinx::reset ()
3303 {
3304 	SPH_ENTER_METHOD();
3305 	CSphSEThreadTable * pTable = GetTls ();
3306 	if ( pTable )
3307 		pTable->m_bQuery = false;
3308 	SPH_RET(0);
3309 }
3310 
3311 
delete_all_rows()3312 int ha_sphinx::delete_all_rows()
3313 {
3314 	SPH_ENTER_METHOD();
3315 	SPH_RET ( HA_ERR_WRONG_COMMAND );
3316 }
3317 
3318 
3319 // First you should go read the section "locking functions for mysql" in
3320 // lock.cc to understand this.
3321 // This create a lock on the table. If you are implementing a storage engine
3322 // that can handle transacations look at ha_berkely.cc to see how you will
3323 // want to go about doing this. Otherwise you should consider calling flock()
3324 // here.
3325 //
3326 // Called from lock.cc by lock_external() and unlock_external(). Also called
3327 // from sql_table.cc by copy_data_between_tables().
external_lock(THD *,int)3328 int ha_sphinx::external_lock ( THD *, int )
3329 {
3330 	SPH_ENTER_METHOD();
3331 	SPH_RET(0);
3332 }
3333 
3334 
store_lock(THD *,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3335 THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
3336 	enum thr_lock_type lock_type )
3337 {
3338 	SPH_ENTER_METHOD();
3339 
3340 	if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
3341 		m_tLock.type = lock_type;
3342 
3343 	*to++ = &m_tLock;
3344 	SPH_RET(to);
3345 }
3346 
3347 
delete_table(const char *)3348 int ha_sphinx::delete_table ( const char * )
3349 {
3350 	SPH_ENTER_METHOD();
3351 	SPH_RET(0);
3352 }
3353 
3354 
3355 // Renames a table from one name to another from alter table call.
3356 //
3357 // If you do not implement this, the default rename_table() is called from
3358 // handler.cc and it will delete all files with the file extensions returned
3359 // by bas_ext().
3360 //
3361 // Called from sql_table.cc by mysql_rename_table().
rename_table(const char *,const char *)3362 int ha_sphinx::rename_table ( const char *, const char * )
3363 {
3364 	SPH_ENTER_METHOD();
3365 	SPH_RET(0);
3366 }
3367 
3368 
3369 // Given a starting key, and an ending key estimate the number of rows that
3370 // will exist between the two. end_key may be empty which in case determine
3371 // if start_key matches any rows.
3372 //
3373 // Called from opt_range.cc by check_quick_keys().
records_in_range(uint,key_range *,key_range *)3374 ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
3375 {
3376 	SPH_ENTER_METHOD();
3377 	SPH_RET(3); // low number to force index usage
3378 }
3379 
// Servers before 50610 only have KEY::key_parts; alias the newer member name
// onto it so that the index checks below compile against either version.
#if MYSQL_VERSION_ID < 50610
#define user_defined_key_parts key_parts
#endif
3383 
3384 // create() is called to create a database. The variable name will have the name
3385 // of the table. When create() is called you do not need to worry about opening
3386 // the table. Also, the FRM file will have already been created so adjusting
3387 // create_info will not do you any good. You can overwrite the frm file at this
3388 // point if you wish to change the table definition, but there are no methods
3389 // currently provided for doing that.
3390 //
3391 // Called from handle.cc by ha_create_table().
create(const char * name,TABLE * table_arg,HA_CREATE_INFO *)3392 int ha_sphinx::create ( const char * name, TABLE * table_arg, HA_CREATE_INFO * )
3393 {
3394 	SPH_ENTER_METHOD();
3395 	char sError[256];
3396 
3397 	CSphSEShare tInfo;
3398 	if ( !ParseUrl ( &tInfo, table_arg, true ) )
3399 		SPH_RET(-1);
3400 
3401 	// check SphinxAPI table
3402 	for ( ; !tInfo.m_bSphinxQL; )
3403 	{
3404 		// check system fields (count and types)
3405 		if ( table_arg->s->fields<SPHINXSE_SYSTEM_COLUMNS )
3406 		{
3407 			my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
3408 				name, SPHINXSE_SYSTEM_COLUMNS );
3409 			break;
3410 		}
3411 
3412 		if ( !IsIDField ( table_arg->field[0] ) )
3413 		{
3414 			my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
3415 			break;
3416 		}
3417 
3418 		if ( !IsIntegerFieldType ( table_arg->field[1]->type() ) )
3419 		{
3420 			my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
3421 			break;
3422 		}
3423 
3424 		enum_field_types f2 = table_arg->field[2]->type();
3425 		if ( f2!=MYSQL_TYPE_VARCHAR
3426 			&& f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
3427 		{
3428 			my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
3429 			break;
3430 		}
3431 
3432 		// check attributes
3433 		int i;
3434 		for ( i=3; i<(int)table_arg->s->fields; i++ )
3435 		{
3436 			enum_field_types eType = table_arg->field[i]->type();
3437 			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3438 			{
3439 				my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
3440 					name, i+1, table_arg->field[i]->field_name.str );
3441 				break;
3442 			}
3443 		}
3444 
3445 		if ( i!=(int)table_arg->s->fields )
3446 			break;
3447 
3448 		// check index
3449 		if (
3450 			table_arg->s->keys!=1 ||
3451 			table_arg->key_info[0].user_defined_key_parts!=1 ||
3452 			strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, table_arg->field[2]->field_name.str ) )
3453 		{
3454 			my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
3455 				name, table_arg->field[2]->field_name.str );
3456 			break;
3457 		}
3458 
3459 		// all good
3460 		sError[0] = '\0';
3461 		break;
3462 	}
3463 
3464 	// check SphinxQL table
3465 	for ( ; tInfo.m_bSphinxQL; )
3466 	{
3467 		sError[0] = '\0';
3468 
3469 		// check that 1st column is id, is of int type, and has an index
3470 		if ( strcmp ( table_arg->field[0]->field_name.str, "id" ) )
3471 		{
3472 			my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
3473 			break;
3474 		}
3475 
3476 		if ( !IsIDField ( table_arg->field[0] ) )
3477 		{
3478 			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
3479 			break;
3480 		}
3481 
3482 		// check index
3483 		if (
3484 			table_arg->s->keys!=1 ||
3485 			table_arg->key_info[0].user_defined_key_parts!=1 ||
3486 			strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, "id" ) )
3487 		{
3488 			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
3489 			break;
3490 		}
3491 
3492 		// check column types
3493 		for ( int i=1; i<(int)table_arg->s->fields; i++ )
3494 		{
3495 			enum_field_types eType = table_arg->field[i]->type();
3496 			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3497 			{
3498 				my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
3499 					name, i+1, table_arg->field[i]->field_name.str );
3500 				break;
3501 			}
3502 		}
3503 		if ( sError[0] )
3504 			break;
3505 
3506 		// all good
3507 		break;
3508 	}
3509 
3510 	// report and bail
3511 	if ( sError[0] )
3512 	{
3513 		my_printf_error(ER_CANT_CREATE_TABLE,
3514                                 "Can\'t create table %s.%s (Error: %s)",
3515                                 MYF(0),
3516                                 table_arg->s->db.str,
3517                                 table_arg->s->table_name.str, sError);
3518 		SPH_RET(-1);
3519 	}
3520 
3521 	SPH_RET(0);
3522 }
3523 
3524 // show functions
3525 
// Pre-5.1 servers do not define SHOW_VAR_FUNC_BUFF_SIZE; provide the size that
// sphinx_showfunc_words() uses as its output buffer capacity.
// NOTE(review): 1024 is assumed to match the buffer such servers pass in — confirm.
#if MYSQL_VERSION_ID<50100
#define SHOW_VAR_FUNC_BUFF_SIZE 1024
#endif
3529 
sphinx_get_stats(THD * thd,SHOW_VAR * out)3530 CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
3531 {
3532 #if MYSQL_VERSION_ID>50100
3533 	if ( sphinx_hton_ptr )
3534 	{
3535 		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3536 
3537 		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3538 			return &pTls->m_pHeadTable->m_tStats;
3539 	}
3540 #else
3541 	CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3542 	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3543 		return &pTls->m_pHeadTable->m_tStats;
3544 #endif
3545 
3546 	out->type = SHOW_CHAR;
3547 	out->value = (char*) "";
3548 	return 0;
3549 }
3550 
sphinx_showfunc_total(THD * thd,SHOW_VAR * out,char *)3551 int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
3552 {
3553 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3554 	if ( pStats )
3555 	{
3556 		out->type = SHOW_INT;
3557 		out->value = (char *) &pStats->m_iMatchesTotal;
3558 	}
3559 	return 0;
3560 }
3561 
sphinx_showfunc_total_found(THD * thd,SHOW_VAR * out,char *)3562 int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
3563 {
3564 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3565 	if ( pStats )
3566 	{
3567 		out->type = SHOW_INT;
3568 		out->value = (char *) &pStats->m_iMatchesFound;
3569 	}
3570 	return 0;
3571 }
3572 
sphinx_showfunc_time(THD * thd,SHOW_VAR * out,char *)3573 int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
3574 {
3575 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3576 	if ( pStats )
3577 	{
3578 		out->type = SHOW_INT;
3579 		out->value = (char *) &pStats->m_iQueryMsec;
3580 	}
3581 	return 0;
3582 }
3583 
sphinx_showfunc_word_count(THD * thd,SHOW_VAR * out,char *)3584 int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
3585 {
3586 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3587 	if ( pStats )
3588 	{
3589 		out->type = SHOW_INT;
3590 		out->value = (char *) &pStats->m_iWords;
3591 	}
3592 	return 0;
3593 }
3594 
sphinx_showfunc_words(THD * thd,SHOW_VAR * out,char * sBuffer)3595 int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
3596 {
3597 #if MYSQL_VERSION_ID>50100
3598 	if ( sphinx_hton_ptr )
3599 	{
3600 		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3601 #else
3602 	{
3603 		CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3604 #endif
3605 		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3606 		{
3607 			CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
3608 			if ( pStats && pStats->m_iWords )
3609 			{
3610 				uint uBuffLen = 0;
3611 
3612 				out->type = SHOW_CHAR;
3613 				out->value = sBuffer;
3614 
3615 				// the following is partially based on code in sphinx_show_status()
3616 				sBuffer[0] = 0;
3617 				for ( int i=0; i<pStats->m_iWords; i++ )
3618 				{
3619 					CSphSEWordStats & tWord = pStats->m_dWords[i];
3620 					uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
3621 						tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
3622 				}
3623 
3624 				if ( uBuffLen > 0 )
3625 				{
3626 					// trim last space
3627 					sBuffer [ --uBuffLen ] = 0;
3628 
3629 					if ( pTls->m_pHeadTable->m_pQueryCharset )
3630 					{
3631 						// String::c_ptr() will nul-terminate the buffer.
3632 						//
3633 						// NOTE: It's not entirely clear whether this conversion is necessary at all.
3634 
3635 						String sConvert;
3636 						uint iErrors;
3637 						sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
3638 						memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
3639 					}
3640 				}
3641 
3642 				return 0;
3643 			}
3644 		}
3645 	}
3646 
3647 	out->type = SHOW_CHAR;
3648 	out->value = (char*) "";
3649 	return 0;
3650 }
3651 
3652 int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
3653 {
3654 	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3655 	out->type = SHOW_CHAR;
3656 	if ( pStats && pStats->m_bLastError )
3657 	{
3658 		out->value = pStats->m_sLastMessage;
3659 	}
3660         else
3661 		out->value = (char*)"";
3662 	return 0;
3663 }
3664 
#if MYSQL_VERSION_ID>50100
// Storage engine descriptor; only carries the handlerton interface version.
struct st_mysql_storage_engine sphinx_storage_engine =
{
	MYSQL_HANDLERTON_INTERFACE_VERSION
};

// SHOW STATUS variables exported by SphinxSE. Each value is produced on demand
// by the listed sphinx_showfunc_*() callback; the all-zero entry ends the list.
struct st_mysql_show_var sphinx_status_vars[] =
{
	{"Sphinx_total",		(char *)sphinx_showfunc_total,			SHOW_SIMPLE_FUNC},
	{"Sphinx_total_found",	(char *)sphinx_showfunc_total_found,	SHOW_SIMPLE_FUNC},
	{"Sphinx_time",			(char *)sphinx_showfunc_time,			SHOW_SIMPLE_FUNC},
	{"Sphinx_word_count",	(char *)sphinx_showfunc_word_count,		SHOW_SIMPLE_FUNC},
	{"Sphinx_words",		(char *)sphinx_showfunc_words,			SHOW_SIMPLE_FUNC},
	{"Sphinx_error",		(char *)sphinx_showfunc_error,			SHOW_SIMPLE_FUNC},
	{0, 0, (enum_mysql_show_type)0}
};


// MariaDB plugin registration for the SphinxSE storage engine.
maria_declare_plugin(sphinx)
{
	MYSQL_STORAGE_ENGINE_PLUGIN,
	&sphinx_storage_engine,
	sphinx_hton_name,
	"Sphinx developers",
	sphinx_hton_comment,
	PLUGIN_LICENSE_GPL,
	sphinx_init_func, // Plugin Init
	sphinx_done_func, // Plugin Deinit
	0x0202, // 2.2
	sphinx_status_vars,
	NULL, // presumably the system-variables slot (none) — confirm against the plugin API
	SPHINXSE_VERSION, // string version
MariaDB_PLUGIN_MATURITY_GAMMA
}
maria_declare_plugin_end;

#endif // >50100
3702 
3703 //
3704 // $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
3705 //
3706