1 //
2 // $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
3 //
4
5 //
6 // Copyright (c) 2001-2014, Andrew Aksyonoff
7 // Copyright (c) 2008-2014, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15
16 #ifdef USE_PRAGMA_IMPLEMENTATION
17 #pragma implementation // gcc: Class implementation
18 #endif
19
20 #if defined(_MSC_VER) && _MSC_VER>=1400
21 #define _CRT_SECURE_NO_DEPRECATE 1
22 #define _CRT_NONSTDC_NO_DEPRECATE 1
23 #endif
24
25 #include <my_global.h>
26 #include <mysql_version.h>
27
28 #if MYSQL_VERSION_ID>=50515
29 #include "sql_class.h"
30 #include "sql_array.h"
31 #elif MYSQL_VERSION_ID>50100
32 #include "mysql_priv.h"
33 #include <mysql/plugin.h>
34 #else
35 #include "../mysql_priv.h"
36 #endif
37
38 #include <mysys_err.h>
39 #include <my_sys.h>
40 #include <mysql.h> // include client for INSERT table (sort of redoing federated..)
41
42 #ifndef __WIN__
43 // UNIX-specific
44 #include <my_net.h>
45 #include <netdb.h>
46 #include <sys/un.h>
47
48 #define RECV_FLAGS MSG_WAITALL
49
50 #define sphSockClose(_sock) ::close(_sock)
51 #else
52 // Windows-specific
53 #include <io.h>
54 #define snprintf _snprintf
55
56 #define RECV_FLAGS 0
57
58 #define sphSockClose(_sock) ::closesocket(_sock)
59 #endif
60
61 #include <ctype.h>
62 #include "ha_sphinx.h"
63
64 #ifndef MSG_WAITALL
65 #define MSG_WAITALL 0
66 #endif
67
68 #if defined(_MSC_VER) && _MSC_VER>=1400
69 #pragma warning(push,4)
70 #endif
71
72 /////////////////////////////////////////////////////////////////////////////
73
74 /// there might be issues with min() on different platforms (eg. Gentoo, they say)
75 #define Min(a,b) ((a)<(b)?(a):(b))
76
77 /// unaligned RAM accesses are forbidden on SPARC
78 #if defined(sparc) || defined(__sparc__)
79 #define UNALIGNED_RAM_ACCESS 0
80 #else
81 #define UNALIGNED_RAM_ACCESS 1
82 #endif
83
84
85 #if UNALIGNED_RAM_ACCESS
86
/// plain read; used on platforms where unaligned RAM accesses are allowed
template < typename T >
inline T sphUnalignedRead ( const T & tRef )
{
	const T tValue = tRef;
	return tValue;
}
92
/// plain store of tVal at pPtr; used on platforms where unaligned RAM accesses are allowed
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	T * pTyped = (T*) pPtr;
	*pTyped = tVal;
}
98
99 #else
100
101 /// unaligned read wrapper for some architectures (eg. SPARC)
102 template < typename T >
sphUnalignedRead(const T & tRef)103 inline T sphUnalignedRead ( const T & tRef )
104 {
105 T uTmp;
106 byte * pSrc = (byte *) &tRef;
107 byte * pDst = (byte *) &uTmp;
108 for ( int i=0; i<(int)sizeof(T); i++ )
109 *pDst++ = *pSrc++;
110 return uTmp;
111 }
112
113 /// unaligned write wrapper for some architectures (eg. SPARC)
114 template < typename T >
sphUnalignedWrite(void * pPtr,const T & tVal)115 void sphUnalignedWrite ( void * pPtr, const T & tVal )
116 {
117 byte * pDst = (byte *) pPtr;
118 byte * pSrc = (byte *) &tVal;
119 for ( int i=0; i<(int)sizeof(T); i++ )
120 *pDst++ = *pSrc++;
121 }
122
123 #endif
124
125 #if MYSQL_VERSION_ID>=50515
126
127 #define sphinx_hash_init my_hash_init
128 #define sphinx_hash_free my_hash_free
129 #define sphinx_hash_search my_hash_search
130 #define sphinx_hash_delete my_hash_delete
131
132 #else
133
134 #define sphinx_hash_init hash_init
135 #define sphinx_hash_free hash_free
136 #define sphinx_hash_search hash_search
137 #define sphinx_hash_delete hash_delete
138
139 #endif
140
141 /////////////////////////////////////////////////////////////////////////////
142
143 // FIXME! make this all dynamic
144 #define SPHINXSE_MAX_FILTERS 32
145
146 #define SPHINXAPI_DEFAULT_HOST "127.0.0.1"
147 #define SPHINXAPI_DEFAULT_PORT 9312
148 #define SPHINXAPI_DEFAULT_INDEX "*"
149
150 #define SPHINXQL_DEFAULT_PORT 9306
151
152 #define SPHINXSE_SYSTEM_COLUMNS 3
153
154 #define SPHINXSE_MAX_ALLOC (16*1024*1024)
155 #define SPHINXSE_MAX_KEYWORDSTATS 4096
156
157 #define SPHINXSE_VERSION "2.2.6-release"
158
159 // FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
// cut-n-paste is somewhat simpler than adding dependencies however..
161
/// searchd protocol constants used by this client
enum
{
	SPHINX_SEARCHD_PROTO = 1,
	SEARCHD_COMMAND_SEARCH = 0,
	VER_COMMAND_SEARCH = 0x119
};
168
/// sorting orders a search query may request
enum ESphSortOrder
{
	SPH_SORT_RELEVANCE = 0,	///< document relevance descending, then date
	SPH_SORT_ATTR_DESC,		///< document date descending, then relevance descending
	SPH_SORT_ATTR_ASC,		///< document date ascending, then relevance descending
	SPH_SORT_TIME_SEGMENTS,	///< time segments (hour/day/week/etc) descending, then relevance descending
	SPH_SORT_EXTENDED,		///< SQL-like clause (eg. "@relevance DESC, price ASC, @id DESC")
	SPH_SORT_EXPR,			///< arbitrary expression

	SPH_SORT_TOTAL			///< number of sorting orders
};
181
/// matching modes a search query may request
enum ESphMatchMode
{
	SPH_MATCH_ALL = 0,			///< every query word must match
	SPH_MATCH_ANY = 1,			///< at least one query word must match
	SPH_MATCH_PHRASE = 2,		///< the exact phrase must match
	SPH_MATCH_BOOLEAN = 3,		///< query is a boolean expression
	SPH_MATCH_EXTENDED = 4,		///< query uses the extended syntax
	SPH_MATCH_FULLSCAN = 5,		///< no fulltext query; match all document IDs and apply filters
	SPH_MATCH_EXTENDED2 = 6,	///< extended engine V2

	SPH_MATCH_TOTAL				///< number of matching modes
};
195
/// relevance ranking modes a search query may request
enum ESphRankMode
{
	SPH_RANK_PROXIMITY_BM25 = 0,	///< default; phrase proximity is the major factor, BM25 the minor one
	SPH_RANK_BM25,					///< statistical; BM25 only (faster but worse quality)
	SPH_RANK_NONE,					///< no ranking; every match gets a weight of 1
	SPH_RANK_WORDCOUNT,				///< weighted sum of per-field keyword occurrence counts
	SPH_RANK_PROXIMITY,				///< phrase proximity only
	SPH_RANK_MATCHANY,				///< emulates the old match-any weighting
	SPH_RANK_FIELDMASK,				///< sets bits for the fields that had matches
	SPH_RANK_SPH04,					///< codename SPH04; phrase proximity + bm25 + head/exact boost
	SPH_RANK_EXPR,					///< expression based ranker

	SPH_RANK_TOTAL,					///< number of ranking modes
	SPH_RANK_DEFAULT = SPH_RANK_PROXIMITY_BM25
};
212
/// grouping functions a search query may request
enum ESphGroupBy
{
	SPH_GROUPBY_DAY = 0,	///< by day
	SPH_GROUPBY_WEEK,		///< by week
	SPH_GROUPBY_MONTH,		///< by month
	SPH_GROUPBY_YEAR,		///< by year
	SPH_GROUPBY_ATTR,		///< by attribute value
	SPH_GROUPBY_ATTRPAIR,	///< by sequential attrs pair (rendered redundant by 64bit attrs support; removed)
	SPH_GROUPBY_MULTIPLE	///< by multiple attribute values
};
224
/// attribute type codes known to this client
enum
{
	SPH_ATTR_NONE = 0,		///< not an attribute at all
	SPH_ATTR_INTEGER,		///< plain integer
	SPH_ATTR_TIMESTAMP,		///< timestamp
	SPH_ATTR_ORDINAL,		///< ordinal string number (integer at search time, specially handled at indexing time)
	SPH_ATTR_BOOL,			///< boolean bit field
	SPH_ATTR_FLOAT,
	SPH_ATTR_BIGINT,
	SPH_ATTR_STRING,		///< string (binary; in-memory)

	SPH_ATTR_UINT32SET = 0x40000001UL,	///< multiple int32 values (0 or more)
	SPH_ATTR_UINT64SET = 0x40000002UL	///< multiple int64 values (0 or more)
};
240
/// status codes searchd may answer with
enum
{
	SEARCHD_OK = 0,		///< general success; command-specific reply follows
	SEARCHD_ERROR,		///< general failure; error message follows
	SEARCHD_RETRY,		///< temporary failure; error message follows, client should retry later
	SEARCHD_WARNING		///< general success; warning message and command-specific reply follow
};
249
250 //////////////////////////////////////////////////////////////////////////////
251
252 #define SPHINX_DEBUG_OUTPUT 0
253 #define SPHINX_DEBUG_CALLS 0
254
255 #include <stdarg.h>
256
#if SPHINX_DEBUG_OUTPUT
/// printf-style debug trace to stderr, prefixed with "SphinxSE: " and terminated with a newline
inline void SPH_DEBUG ( const char * format, ... )
{
	fputs ( "SphinxSE: ", stderr );

	va_list tArgs;
	va_start ( tArgs, format );
	vfprintf ( stderr, format, tArgs );
	va_end ( tArgs );

	fputc ( '\n', stderr );
}
#else
/// debug output is compiled out; arguments are accepted and ignored
inline void SPH_DEBUG ( const char *, ... )
{
}
#endif
270
271 #if SPHINX_DEBUG_CALLS
272
273 #define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
274 #define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%08x)", __FUNCTION__, this ); }
275 #define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
276 #define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }
277
278 #else
279
280 #define SPH_ENTER_FUNC()
281 #define SPH_ENTER_METHOD()
282 #define SPH_RET(_arg) { return(_arg); }
283 #define SPH_VOID_RET() { return; }
284
285 #endif
286
287
288 #define SafeDelete(_arg) { delete ( _arg ); (_arg) = NULL; }
289 #define SafeDeleteArray(_arg) { if ( _arg ) { delete [] ( _arg ); (_arg) = NULL; } }
290
291 //////////////////////////////////////////////////////////////////////////////
292
293 /// per-table structure that will be shared among all open Sphinx SE handlers
294 struct CSphSEShare
295 {
296 pthread_mutex_t m_tMutex;
297 THR_LOCK m_tLock;
298
299 char * m_sTable;
300 char * m_sScheme; ///< our connection string
301 char * m_sHost; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
302 char * m_sSocket; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
303 char * m_sIndex; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
304 ushort m_iPort;
305 bool m_bSphinxQL; ///< is this read-only SphinxAPI table, or write-only SphinxQL table?
306 uint m_iTableNameLen;
307 uint m_iUseCount;
308 #if MYSQL_VERSION_ID<50610
309 CHARSET_INFO * m_pTableQueryCharset;
310 #else
311 const CHARSET_INFO * m_pTableQueryCharset;
312 #endif
313
314 int m_iTableFields;
315 char ** m_sTableField;
316 enum_field_types * m_eTableFieldType;
317
CSphSEShareCSphSEShare318 CSphSEShare ()
319 : m_sTable ( NULL )
320 , m_sScheme ( NULL )
321 , m_sHost ( NULL )
322 , m_sSocket ( NULL )
323 , m_sIndex ( NULL )
324 , m_iPort ( 0 )
325 , m_bSphinxQL ( false )
326 , m_iTableNameLen ( 0 )
327 , m_iUseCount ( 1 )
328 , m_pTableQueryCharset ( NULL )
329
330 , m_iTableFields ( 0 )
331 , m_sTableField ( NULL )
332 , m_eTableFieldType ( NULL )
333 {
334 thr_lock_init ( &m_tLock );
335 pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
336 }
337
~CSphSEShareCSphSEShare338 ~CSphSEShare ()
339 {
340 pthread_mutex_destroy ( &m_tMutex );
341 thr_lock_delete ( &m_tLock );
342
343 SafeDeleteArray ( m_sTable );
344 SafeDeleteArray ( m_sScheme );
345 ResetTable ();
346 }
347
ResetTableCSphSEShare348 void ResetTable ()
349 {
350 for ( int i=0; i<m_iTableFields; i++ )
351 SafeDeleteArray ( m_sTableField[i] );
352 SafeDeleteArray ( m_sTableField );
353 SafeDeleteArray ( m_eTableFieldType );
354 }
355 };
356
357 /// schema attribute
358 struct CSphSEAttr
359 {
360 char * m_sName; ///< attribute name (received from Sphinx)
361 uint32 m_uType; ///< attribute type (received from Sphinx)
362 int m_iField; ///< field index in current table (-1 if none)
363
CSphSEAttrCSphSEAttr364 CSphSEAttr()
365 : m_sName ( NULL )
366 , m_uType ( SPH_ATTR_NONE )
367 , m_iField ( -1 )
368 {}
369
~CSphSEAttrCSphSEAttr370 ~CSphSEAttr ()
371 {
372 SafeDeleteArray ( m_sName );
373 }
374 };
375
376 /// word stats
377 struct CSphSEWordStats
378 {
379 char * m_sWord;
380 int m_iDocs;
381 int m_iHits;
382
CSphSEWordStatsCSphSEWordStats383 CSphSEWordStats ()
384 : m_sWord ( NULL )
385 , m_iDocs ( 0 )
386 , m_iHits ( 0 )
387 {}
388
~CSphSEWordStatsCSphSEWordStats389 ~CSphSEWordStats ()
390 {
391 SafeDeleteArray ( m_sWord );
392 }
393 };
394
395 /// request stats
396 struct CSphSEStats
397 {
398 public:
399 int m_iMatchesTotal;
400 int m_iMatchesFound;
401 int m_iQueryMsec;
402 int m_iWords;
403 CSphSEWordStats * m_dWords;
404 bool m_bLastError;
405 char m_sLastMessage[1024];
406
CSphSEStatsCSphSEStats407 CSphSEStats()
408 : m_dWords ( NULL )
409 {
410 Reset ();
411 }
412
ResetCSphSEStats413 void Reset ()
414 {
415 m_iMatchesTotal = 0;
416 m_iMatchesFound = 0;
417 m_iQueryMsec = 0;
418 m_iWords = 0;
419 m_bLastError = false;
420 m_sLastMessage[0] = '\0';
421 SafeDeleteArray ( m_dWords );
422 }
423
~CSphSEStatsCSphSEStats424 ~CSphSEStats()
425 {
426 SafeDeleteArray ( m_dWords );
427 }
428 };
429
430 /// thread local storage
431 struct CSphSEThreadTable
432 {
433 static const int MAX_QUERY_LEN = 262144; // 256k should be enough, right?
434
435 bool m_bStats;
436 CSphSEStats m_tStats;
437
438 bool m_bQuery;
439 char m_sQuery[MAX_QUERY_LEN];
440
441 #if MYSQL_VERSION_ID<50610
442 CHARSET_INFO * m_pQueryCharset;
443 #else
444 const CHARSET_INFO * m_pQueryCharset;
445 #endif
446
447 bool m_bReplace; ///< are we doing an INSERT or REPLACE
448
449 bool m_bCondId; ///< got a value from condition pushdown
450 longlong m_iCondId; ///< value acquired from id=value condition pushdown
451 bool m_bCondDone; ///< index_read() is now over
452
453 const ha_sphinx * m_pHandler;
454 CSphSEThreadTable * m_pTableNext;
455
CSphSEThreadTableCSphSEThreadTable456 CSphSEThreadTable ( const ha_sphinx * pHandler )
457 : m_bStats ( false )
458 , m_bQuery ( false )
459 , m_pQueryCharset ( NULL )
460 , m_bReplace ( false )
461 , m_bCondId ( false )
462 , m_iCondId ( 0 )
463 , m_bCondDone ( false )
464 , m_pHandler ( pHandler )
465 , m_pTableNext ( NULL )
466 {}
467 };
468
469
470 struct CSphTLS
471 {
472 CSphSEThreadTable * m_pHeadTable;
473
CSphTLSCSphTLS474 explicit CSphTLS ( const ha_sphinx * pHandler )
475 {
476 m_pHeadTable = new CSphSEThreadTable ( pHandler );
477 }
478
~CSphTLSCSphTLS479 ~CSphTLS()
480 {
481 CSphSEThreadTable * pCur = m_pHeadTable;
482 while ( pCur )
483 {
484 CSphSEThreadTable * pNext = pCur->m_pTableNext;
485 SafeDelete ( pCur );
486 pCur = pNext;
487 }
488 }
489 };
490
491
/// kinds of search query filters
enum ESphFilter
{
	SPH_FILTER_VALUES = 0,	///< integer values set
	SPH_FILTER_RANGE,		///< integer range
	SPH_FILTER_FLOATRANGE	///< float range
};
499
500
501 /// search query filter
502 struct CSphSEFilter
503 {
504 public:
505 ESphFilter m_eType;
506 char * m_sAttrName;
507 longlong m_uMinValue;
508 longlong m_uMaxValue;
509 float m_fMinValue;
510 float m_fMaxValue;
511 int m_iValues;
512 longlong * m_pValues;
513 int m_bExclude;
514
515 public:
CSphSEFilterCSphSEFilter516 CSphSEFilter ()
517 : m_eType ( SPH_FILTER_VALUES )
518 , m_sAttrName ( NULL )
519 , m_uMinValue ( 0 )
520 , m_uMaxValue ( UINT_MAX )
521 , m_fMinValue ( 0.0f )
522 , m_fMaxValue ( 0.0f )
523 , m_iValues ( 0 )
524 , m_pValues ( NULL )
525 , m_bExclude ( 0 )
526 {
527 }
528
~CSphSEFilterCSphSEFilter529 ~CSphSEFilter ()
530 {
531 SafeDeleteArray ( m_pValues );
532 }
533 };
534
535
536 /// float vs dword conversion
sphF2DW(float f)537 inline uint32 sphF2DW ( float f ) { union { float f; uint32 d; } u; u.f = f; return u.d; }
538
539 /// dword vs float conversion
sphDW2F(uint32 d)540 inline float sphDW2F ( uint32 d ) { union { float f; uint32 d; } u; u.d = d; return u.f; }
541
542
543 /// client-side search query
544 struct CSphSEQuery
545 {
546 public:
547 const char * m_sHost;
548 int m_iPort;
549
550 private:
551 char * m_sQueryBuffer;
552
553 const char * m_sIndex;
554 int m_iOffset;
555 int m_iLimit;
556
557 bool m_bQuery;
558 const char * m_sQuery;
559 uint32 * m_pWeights;
560 int m_iWeights;
561 ESphMatchMode m_eMode;
562 ESphRankMode m_eRanker;
563 char * m_sRankExpr;
564 ESphSortOrder m_eSort;
565 const char * m_sSortBy;
566 int m_iMaxMatches;
567 int m_iMaxQueryTime;
568 uint32 m_iMinID;
569 uint32 m_iMaxID;
570
571 int m_iFilters;
572 CSphSEFilter m_dFilters[SPHINXSE_MAX_FILTERS];
573
574 ESphGroupBy m_eGroupFunc;
575 const char * m_sGroupBy;
576 const char * m_sGroupSortBy;
577 int m_iCutoff;
578 int m_iRetryCount;
579 int m_iRetryDelay;
580 const char * m_sGroupDistinct; ///< points to query buffer; do NOT delete
581 int m_iIndexWeights;
582 char * m_sIndexWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
583 int m_iIndexWeight[SPHINXSE_MAX_FILTERS];
584 int m_iFieldWeights;
585 char * m_sFieldWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
586 int m_iFieldWeight[SPHINXSE_MAX_FILTERS];
587
588 bool m_bGeoAnchor;
589 const char * m_sGeoLatAttr;
590 const char * m_sGeoLongAttr;
591 float m_fGeoLatitude;
592 float m_fGeoLongitude;
593
594 char * m_sComment;
595 char * m_sSelect;
596
597 struct Override_t
598 {
599 union Value_t
600 {
601 uint32 m_uValue;
602 longlong m_iValue64;
603 float m_fValue;
604 };
605 char * m_sName; ///< points to query buffer
606 int m_iType;
607 Dynamic_array<ulonglong> m_dIds;
608 Dynamic_array<Value_t> m_dValues;
609 };
610 Dynamic_array<Override_t *> m_dOverrides;
611
612 public:
613 char m_sParseError[256];
614
615 public:
616 CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
617 ~CSphSEQuery ();
618
619 bool Parse ();
620 int BuildRequest ( char ** ppBuffer );
621
622 protected:
623 char * m_pBuf;
624 char * m_pCur;
625 int m_iBufLeft;
626 bool m_bBufOverrun;
627
628 template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
629 bool ParseField ( char * sField );
630
631 void SendBytes ( const void * pBytes, int iBytes );
SendWordCSphSEQuery632 void SendWord ( short int v ) { v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
SendIntCSphSEQuery633 void SendInt ( int v ) { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
SendDwordCSphSEQuery634 void SendDword ( uint v ) { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
SendUint64CSphSEQuery635 void SendUint64 ( ulonglong v ) { SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
SendStringCSphSEQuery636 void SendString ( const char * v ) { int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
SendFloatCSphSEQuery637 void SendFloat ( float v ) { SendDword ( sphF2DW(v) ); }
638 };
639
640 #ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
641 template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
642 template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
643 #endif
644
645 //////////////////////////////////////////////////////////////////////////////
646
647 #if MYSQL_VERSION_ID>50100
648
649 #if MYSQL_VERSION_ID<50114
650 #error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
651 #endif
652
653 static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
654 static int sphinx_init_func ( void * p );
655 static int sphinx_close_connection ( handlerton * hton, THD * thd );
656 static int sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
657 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
658
659 #else
660
661 static bool sphinx_init_func_for_handlerton ();
662 static int sphinx_close_connection ( THD * thd );
663 bool sphinx_show_status ( THD * thd );
664
665 #endif // >50100
666
667 //////////////////////////////////////////////////////////////////////////////
668
669 static const char sphinx_hton_name[] = "SPHINX";
670 static const char sphinx_hton_comment[] = "Sphinx storage engine " SPHINXSE_VERSION;
671
672 #if MYSQL_VERSION_ID<50100
673 handlerton sphinx_hton =
674 {
675 #ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
676 MYSQL_HANDLERTON_INTERFACE_VERSION,
677 #endif
678 sphinx_hton_name,
679 SHOW_OPTION_YES,
680 sphinx_hton_comment,
681 DB_TYPE_SPHINX_DB,
682 sphinx_init_func_for_handlerton,
683 0, // slot
684 0, // savepoint size
685 sphinx_close_connection, // close_connection
686 NULL, // savepoint
687 NULL, // rollback to savepoint
688 NULL, // release savepoint
689 NULL, // commit
690 NULL, // rollback
691 NULL, // prepare
692 NULL, // recover
693 NULL, // commit_by_xid
694 NULL, // rollback_by_xid
695 NULL, // create_cursor_read_view
696 NULL, // set_cursor_read_view
697 NULL, // close_cursor_read_view
698 HTON_CAN_RECREATE
699 };
700 #else
701 static handlerton * sphinx_hton_ptr = NULL;
702 #endif
703
704 //////////////////////////////////////////////////////////////////////////////
705
706 // variables for Sphinx shared methods
707 pthread_mutex_t sphinx_mutex; // mutex to init the hash
708 static int sphinx_init = 0; // flag whether the hash was initialized
709 static HASH sphinx_open_tables; // hash used to track open tables
710
711 //////////////////////////////////////////////////////////////////////////////
712 // INITIALIZATION AND SHUTDOWN
713 //////////////////////////////////////////////////////////////////////////////
714
715 // hashing function
716 #if MYSQL_VERSION_ID>=50120
717 typedef size_t GetKeyLength_t;
718 #else
719 typedef uint GetKeyLength_t;
720 #endif
721
/// key extraction callback for the open tables hash
/// treats pSharePtr as a CSphSEShare, stores its table name length in *pLength,
/// and returns its table name as the key
static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
{
	CSphSEShare * pEntry = (CSphSEShare *) pSharePtr;
	byte * pKey = (byte*) pEntry->m_sTable;
	*pLength = (size_t) pEntry->m_iTableNameLen;
	return pKey;
}
728
729 #if MYSQL_VERSION_ID<50100
sphinx_init_func(void *)730 static int sphinx_init_func ( void * ) // to avoid unused arg warning
731 #else
732 static int sphinx_init_func ( void * p )
733 #endif
734 {
735 SPH_ENTER_FUNC();
736 if ( !sphinx_init )
737 {
738 sphinx_init = 1;
739 void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
740 sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
741 sphinx_get_key, 0, 0 );
742
743 #if MYSQL_VERSION_ID > 50100
744 handlerton * hton = (handlerton*) p;
745 hton->state = SHOW_OPTION_YES;
746 hton->db_type = DB_TYPE_AUTOASSIGN;
747 hton->create = sphinx_create_handler;
748 hton->close_connection = sphinx_close_connection;
749 hton->show_status = sphinx_show_status;
750 hton->panic = sphinx_panic;
751 hton->flags = HTON_CAN_RECREATE;
752 #endif
753 }
754 SPH_RET(0);
755 }
756
757
758 #if MYSQL_VERSION_ID<50100
sphinx_init_func_for_handlerton()759 static bool sphinx_init_func_for_handlerton ()
760 {
761 return sphinx_init_func ( &sphinx_hton );
762 }
763 #endif
764
765
766 #if MYSQL_VERSION_ID>50100
767
sphinx_close_connection(handlerton * hton,THD * thd)768 static int sphinx_close_connection ( handlerton * hton, THD * thd )
769 {
770 // deallocate common handler data
771 SPH_ENTER_FUNC();
772 void ** tmp = thd_ha_data ( thd, hton );
773 CSphTLS * pTls = (CSphTLS *) (*tmp);
774 SafeDelete ( pTls );
775 *tmp = NULL;
776 SPH_RET(0);
777 }
778
779
sphinx_done_func(void *)780 static int sphinx_done_func ( void * )
781 {
782 SPH_ENTER_FUNC();
783
784 int error __attribute__ ((unused)) = 0;
785 if ( sphinx_init )
786 {
787 sphinx_init = 0;
788 if ( sphinx_open_tables.records )
789 error = 1;
790 sphinx_hash_free ( &sphinx_open_tables );
791 pthread_mutex_destroy ( &sphinx_mutex );
792 }
793
794 SPH_RET(0);
795 }
796
797
sphinx_panic(handlerton * hton,enum ha_panic_function)798 static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
799 {
800 return sphinx_done_func ( hton );
801 }
802
803 #else
804
sphinx_close_connection(THD * thd)805 static int sphinx_close_connection ( THD * thd )
806 {
807 // deallocate common handler data
808 SPH_ENTER_FUNC();
809 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
810 SafeDelete ( pTls );
811 thd->ha_data[sphinx_hton.slot] = NULL;
812 SPH_RET(0);
813 }
814
815 #endif // >50100
816
817 //////////////////////////////////////////////////////////////////////////////
818 // SHOW STATUS
819 //////////////////////////////////////////////////////////////////////////////
820
821 #if MYSQL_VERSION_ID>50100
sphinx_show_status(handlerton * hton,THD * thd,stat_print_fn * stat_print,enum ha_stat_type)822 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
823 enum ha_stat_type )
824 #else
825 bool sphinx_show_status ( THD * thd )
826 #endif
827 {
828 SPH_ENTER_FUNC();
829
830 #if MYSQL_VERSION_ID<50100
831 Protocol * protocol = thd->protocol;
832 List<Item> field_list;
833 #endif
834
835 char buf1[IO_SIZE];
836 uint buf1len;
837 char buf2[IO_SIZE];
838 uint buf2len = 0;
839 String words;
840
841 buf1[0] = '\0';
842 buf2[0] = '\0';
843
844
845 #if MYSQL_VERSION_ID>50100
846 // 5.1.x style stats
847 CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
848
849 #define LOC_STATS(_key,_keylen,_val,_vallen) \
850 stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
851
852 #else
853 // 5.0.x style stats
854 if ( have_sphinx_db!=SHOW_OPTION_YES )
855 {
856 my_message ( ER_NOT_SUPPORTED_YET,
857 "failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
858 MYF(0) );
859 SPH_RET(TRUE);
860 }
861 CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
862
863 field_list.push_back ( new Item_empty_string ( thd, "Type", 10 ) );
864 field_list.push_back ( new Item_empty_string ( thd, "Name", FN_REFLEN ) );
865 field_list.push_back ( new Item_empty_string ( thd, "Status", 10 ) );
866 if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
867 SPH_RET(TRUE);
868
869 #define LOC_STATS(_key,_keylen,_val,_vallen) \
870 protocol->prepare_for_resend (); \
871 protocol->store ( "SPHINX", 6, system_charset_info ); \
872 protocol->store ( _key, _keylen, system_charset_info ); \
873 protocol->store ( _val, _vallen, system_charset_info ); \
874 if ( protocol->write() ) \
875 SPH_RET(TRUE);
876
877 #endif
878
879
880 // show query stats
881 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
882 {
883 const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
884 buf1len = my_snprintf ( buf1, sizeof(buf1),
885 "total: %d, total found: %d, time: %d, words: %d",
886 pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
887
888 LOC_STATS ( "stats", 5, buf1, buf1len );
889
890 if ( pStats->m_iWords )
891 {
892 for ( int i=0; i<pStats->m_iWords; i++ )
893 {
894 CSphSEWordStats & tWord = pStats->m_dWords[i];
895 buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
896 buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
897 }
898
899 // convert it if we can
900 const char * sWord = buf2;
901 int iWord = buf2len;
902
903 String sBuf3;
904 if ( pTls->m_pHeadTable->m_pQueryCharset )
905 {
906 uint iErrors;
907 sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
908 sWord = sBuf3.c_ptr();
909 iWord = sBuf3.length();
910 }
911
912 LOC_STATS ( "words", 5, sWord, iWord );
913 }
914 }
915
916 // show last error or warning (either in addition to stats, or on their own)
917 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
918 {
919 const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
920
921 LOC_STATS (
922 sMessageType, strlen ( sMessageType ),
923 pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
924
925 } else
926 {
927 // well, nothing to show just yet
928 #if MYSQL_VERSION_ID < 50100
929 LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
930 #endif
931 }
932
933 #if MYSQL_VERSION_ID < 50100
934 send_eof(thd);
935 #endif
936
937 SPH_RET(FALSE);
938 }
939
940 //////////////////////////////////////////////////////////////////////////////
941 // HELPERS
942 //////////////////////////////////////////////////////////////////////////////
943
/// duplicate a string into a new[]-allocated, zero-terminated buffer
/// copies iLen bytes, or the whole string when iLen is negative
/// returns NULL for a NULL source; the caller releases the result with delete []
static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	if ( !sSrc )
		return NULL;

	const int iCopy = ( iLen<0 ) ? (int) strlen ( sSrc ) : iLen;

	char * sCopy = new char [ iCopy+1 ];
	memcpy ( sCopy, sSrc, iCopy );
	sCopy[iCopy] = '\0';
	return sCopy;
}
957
958
/// printf-style internal error logger
/// writes a local-time stamp, the "SphinxSE: internal error: " prefix, the
/// formatted message, and a newline to stderr
static void sphLogError ( const char * sFmt, ... )
{
	// gather the timestamp parts in a platform-specific way
	int iYear, iMonth, iDay, iHour, iMin, iSec;

#ifdef __WIN__
	SYSTEMTIME tNow;
	GetLocalTime ( &tNow );

	iYear = (int)tNow.wYear % 100;
	iMonth = (int)tNow.wMonth;
	iDay = (int)tNow.wDay;
	iHour = (int)tNow.wHour;
	iMin = (int)tNow.wMinute;
	iSec = (int)tNow.wSecond;
#else
	// Unix version
	time_t tNow;
	time ( &tNow );

#ifdef HAVE_LOCALTIME_R
	struct tm tBroken;
	localtime_r ( &tNow, &tBroken );
	struct tm * pBroken = &tBroken;
#else
	struct tm * pBroken = localtime ( &tNow );
#endif // HAVE_LOCALTIME_R

	iYear = pBroken->tm_year % 100;
	iMonth = pBroken->tm_mon + 1; // tm_mon is zero-based
	iDay = pBroken->tm_mday;
	iHour = pBroken->tm_hour;
	iMin = pBroken->tm_min;
	iSec = pBroken->tm_sec;
#endif // __WIN__

	// emit timestamp and prefix
	fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
		iYear, iMonth, iDay, iHour, iMin, iSec );

	// emit message
	va_list tArgs;
	va_start ( tArgs, sFmt );
	vfprintf ( stderr, sFmt, tArgs );
	va_end ( tArgs );

	// emit newline
	fputc ( '\n', stderr );
}
997
998
999
1000 // the following scheme variants are recognized
1001 //
1002 // sphinx://host[:port]/index
1003 // sphinxql://host[:port]/index
1004 // unix://unix/domain/socket[:index]
ParseUrl(CSphSEShare * share,TABLE * table,bool bCreate)1005 static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
1006 {
1007 SPH_ENTER_FUNC();
1008
1009 if ( share )
1010 {
1011 // check incoming stuff
1012 if ( !table )
1013 {
1014 sphLogError ( "table==NULL in ParseUrl()" );
1015 return false;
1016 }
1017 if ( !table->s )
1018 {
1019 sphLogError ( "(table->s)==NULL in ParseUrl()" );
1020 return false;
1021 }
1022
1023 // free old stuff
1024 share->ResetTable ();
1025
1026 // fill new stuff
1027 share->m_iTableFields = table->s->fields;
1028 if ( share->m_iTableFields )
1029 {
1030 share->m_sTableField = new char * [ share->m_iTableFields ];
1031 share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
1032
1033 for ( int i=0; i<share->m_iTableFields; i++ )
1034 {
1035 share->m_sTableField[i] = sphDup ( table->field[i]->field_name.str );
1036 share->m_eTableFieldType[i] = table->field[i]->type();
1037 }
1038 }
1039 }
1040
1041 // defaults
1042 bool bOk = true;
1043 bool bQL = false;
1044 char * sScheme = NULL;
1045 char * sHost = (char*) SPHINXAPI_DEFAULT_HOST;
1046 char * sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1047 int iPort = SPHINXAPI_DEFAULT_PORT;
1048
1049 // parse connection string, if any
1050 while ( table->s->connect_string.length!=0 )
1051 {
1052 sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
1053
1054 sHost = strstr ( sScheme, "://" );
1055 if ( !sHost )
1056 {
1057 bOk = false;
1058 break;
1059 }
1060 sHost[0] = '\0';
1061 sHost += 3;
1062
1063 /////////////////////////////
1064 // sphinxapi via unix socket
1065 /////////////////////////////
1066
1067 if ( !strcmp ( sScheme, "unix" ) )
1068 {
1069 sHost--; // reuse last slash
1070 iPort = 0;
1071 if (!( sIndex = strrchr ( sHost, ':' ) ))
1072 sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1073 else
1074 {
1075 *sIndex++ = '\0';
1076 if ( !*sIndex )
1077 sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1078 }
1079 bOk = true;
1080 break;
1081 }
1082
1083 /////////////////////
1084 // sphinxapi via tcp
1085 /////////////////////
1086
1087 if ( !strcmp ( sScheme, "sphinx" ) )
1088 {
1089 char * sPort = strchr ( sHost, ':' );
1090 if ( sPort )
1091 {
1092 *sPort++ = '\0';
1093 if ( *sPort )
1094 {
1095 sIndex = strchr ( sPort, '/' );
1096 if ( sIndex )
1097 *sIndex++ = '\0';
1098 else
1099 sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1100
1101 iPort = atoi(sPort);
1102 if ( !iPort )
1103 iPort = SPHINXAPI_DEFAULT_PORT;
1104 }
1105 } else
1106 {
1107 sIndex = strchr ( sHost, '/' );
1108 if ( sIndex )
1109 *sIndex++ = '\0';
1110 else
1111 sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
1112 }
1113 bOk = true;
1114 break;
1115 }
1116
1117 ////////////
1118 // sphinxql
1119 ////////////
1120
1121 if ( !strcmp ( sScheme, "sphinxql" ) )
1122 {
1123 bQL = true;
1124 iPort = SPHINXQL_DEFAULT_PORT;
1125
1126 // handle port
1127 char * sPort = strchr ( sHost, ':' );
1128 sIndex = sHost; // starting point for index name search
1129
1130 if ( sPort )
1131 {
1132 *sPort++ = '\0';
1133 sIndex = sPort;
1134
1135 iPort = atoi(sPort);
1136 if ( !iPort )
1137 {
1138 bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
1139 break;
1140 }
1141 }
1142
1143 // find index
1144 sIndex = strchr ( sIndex, '/' );
1145 if ( sIndex )
1146 *sIndex++ = '\0';
1147
1148 // final checks
1149 // host and index names are required
1150 bOk = ( sHost && *sHost && sIndex && *sIndex );
1151 break;
1152 }
1153
1154 // unknown case
1155 bOk = false;
1156 break;
1157 }
1158
1159 if ( !bOk )
1160 {
1161 my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
1162 MYF(0), table->s->connect_string.str);
1163 } else
1164 {
1165 if ( share )
1166 {
1167 SafeDeleteArray ( share->m_sScheme );
1168 share->m_sScheme = sScheme;
1169 share->m_sHost = sHost;
1170 share->m_sIndex = sIndex;
1171 share->m_iPort = (ushort)iPort;
1172 share->m_bSphinxQL = bQL;
1173 }
1174 }
1175 if ( !bOk && !share )
1176 SafeDeleteArray ( sScheme );
1177
1178 SPH_RET(bOk);
1179 }
1180
1181
// Example of simple lock controls. The "share" it creates is a structure we will
// pass to each sphinx handler. Do you have to have one of these? Well, you have
// pieces that are used for locking, and they are needed to function.
get_share(const char * table_name,TABLE * table)1185 static CSphSEShare * get_share ( const char * table_name, TABLE * table )
1186 {
1187 SPH_ENTER_FUNC();
1188 pthread_mutex_lock ( &sphinx_mutex );
1189
1190 CSphSEShare * pShare = NULL;
1191 for ( ;; )
1192 {
1193 // check if we already have this share
1194 #if MYSQL_VERSION_ID>=50120
1195 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
1196 #else
1197 #ifdef __WIN__
1198 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
1199 #else
1200 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
1201 #endif // win
1202 #endif // pre-5.1.20
1203
1204 if ( pShare )
1205 {
1206 pShare->m_iUseCount++;
1207 break;
1208 }
1209
1210 // try to allocate new share
1211 pShare = new CSphSEShare ();
1212 if ( !pShare )
1213 break;
1214
1215 // try to setup it
1216 if ( !ParseUrl ( pShare, table, false ) )
1217 {
1218 SafeDelete ( pShare );
1219 break;
1220 }
1221
1222 if ( !pShare->m_bSphinxQL )
1223 pShare->m_pTableQueryCharset = table->field[2]->charset();
1224
1225 // try to hash it
1226 pShare->m_iTableNameLen = strlen(table_name);
1227 pShare->m_sTable = sphDup ( table_name );
1228 if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
1229 {
1230 SafeDelete ( pShare );
1231 break;
1232 }
1233
1234 // all seems fine
1235 break;
1236 }
1237
1238 pthread_mutex_unlock ( &sphinx_mutex );
1239 SPH_RET(pShare);
1240 }
1241
1242
1243 // Free lock controls. We call this whenever we close a table. If the table had
1244 // the last reference to the share then we free memory associated with it.
free_share(CSphSEShare * pShare)1245 static int free_share ( CSphSEShare * pShare )
1246 {
1247 SPH_ENTER_FUNC();
1248 pthread_mutex_lock ( &sphinx_mutex );
1249
1250 if ( !--pShare->m_iUseCount )
1251 {
1252 sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
1253 SafeDelete ( pShare );
1254 }
1255
1256 pthread_mutex_unlock ( &sphinx_mutex );
1257 SPH_RET(0);
1258 }
1259
1260
1261 #if MYSQL_VERSION_ID>50100
sphinx_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)1262 static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
1263 {
1264 sphinx_hton_ptr = hton;
1265 return new ( mem_root ) ha_sphinx ( hton, table );
1266 }
1267 #endif
1268
1269 //////////////////////////////////////////////////////////////////////////////
1270 // CLIENT-SIDE REQUEST STUFF
1271 //////////////////////////////////////////////////////////////////////////////
1272
// Builds a query object from the raw query text.
//
// sQuery/iLength give the query text (copied, so the caller keeps ownership
// of its buffer); sIndex is the index name to search, with "*" substituted
// when it is NULL. Every other setting starts from the defaults below and can
// later be overridden by options found in the query text (see ParseField()).
CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
	// request defaults, used unless the query text overrides them
	: m_sHost ( "" )
	, m_iPort ( 0 )
	, m_sIndex ( sIndex ? sIndex : "*" )
	, m_iOffset ( 0 )
	, m_iLimit ( 20 )
	, m_bQuery ( false )
	, m_sQuery ( "" )
	, m_pWeights ( NULL )
	, m_iWeights ( 0 )
	, m_eMode ( SPH_MATCH_ALL )
	, m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
	, m_sRankExpr ( NULL )
	, m_eSort ( SPH_SORT_RELEVANCE )
	, m_sSortBy ( "" )
	, m_iMaxMatches ( 1000 )
	, m_iMaxQueryTime ( 0 )
	, m_iMinID ( 0 )
	, m_iMaxID ( 0 )
	, m_iFilters ( 0 )
	, m_eGroupFunc ( SPH_GROUPBY_DAY )
	, m_sGroupBy ( "" )
	, m_sGroupSortBy ( "@group desc" )
	, m_iCutoff ( 0 )
	, m_iRetryCount ( 0 )
	, m_iRetryDelay ( 0 )
	, m_sGroupDistinct ( "" )
	, m_iIndexWeights ( 0 )
	, m_iFieldWeights ( 0 )
	, m_bGeoAnchor ( false )
	, m_sGeoLatAttr ( "" )
	, m_sGeoLongAttr ( "" )
	, m_fGeoLatitude ( 0.0f )
	, m_fGeoLongitude ( 0.0f )
	, m_sComment ( (char*) "" )
	, m_sSelect ( (char*) "*" )

	// outgoing request buffer state; (re)initialized by BuildRequest()
	, m_pBuf ( NULL )
	, m_pCur ( NULL )
	, m_iBufLeft ( 0 )
	, m_bBufOverrun ( false )
{
	// keep a private, writable copy of the query text with a trailing ';'
	// and NUL appended, so that Parse() (which splits on ';') always finds
	// the last clause terminated
	m_sQueryBuffer = new char [ iLength+2 ];
	memcpy ( m_sQueryBuffer, sQuery, iLength );
	m_sQueryBuffer[iLength] = ';';
	m_sQueryBuffer[iLength+1] = '\0';
}
1320
1321
~CSphSEQuery()1322 CSphSEQuery::~CSphSEQuery ()
1323 {
1324 SPH_ENTER_METHOD();
1325 SafeDeleteArray ( m_sQueryBuffer );
1326 SafeDeleteArray ( m_pWeights );
1327 SafeDeleteArray ( m_pBuf );
1328 for ( size_t i=0; i<m_dOverrides.elements(); i++ )
1329 SafeDelete ( m_dOverrides.at(i) );
1330 SPH_VOID_RET();
1331 }
1332
1333
1334 template < typename T >
ParseArray(T ** ppValues,const char * sValue)1335 int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
1336 {
1337 SPH_ENTER_METHOD();
1338
1339 assert ( ppValues );
1340 assert ( !(*ppValues) );
1341
1342 const char * pValue;
1343 bool bPrevDigit = false;
1344 int iValues = 0;
1345
1346 // count the values
1347 for ( pValue=sValue; *pValue; pValue++ )
1348 {
1349 bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1350 if ( bDigit && !bPrevDigit )
1351 iValues++;
1352 bPrevDigit = bDigit;
1353 }
1354 if ( !iValues )
1355 SPH_RET(0);
1356
1357 // extract the values
1358 T * pValues = new T [ iValues ];
1359 *ppValues = pValues;
1360
1361 int iIndex = 0, iSign = 1;
1362 T uValue = 0;
1363
1364 bPrevDigit = false;
1365 for ( pValue=sValue ;; pValue++ )
1366 {
1367 bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1368
1369 if ( bDigit )
1370 {
1371 if ( !bPrevDigit )
1372 uValue = 0;
1373 uValue = uValue*10 + ( (*pValue)-'0' );
1374 } else if ( bPrevDigit )
1375 {
1376 assert ( iIndex<iValues );
1377 pValues [ iIndex++ ] = uValue * iSign;
1378 iSign = 1;
1379 } else if ( *pValue=='-' )
1380 iSign = -1;
1381
1382 bPrevDigit = bDigit;
1383 if ( !*pValue )
1384 break;
1385 }
1386
1387 SPH_RET ( iValues );
1388 }
1389
1390
// Trims whitespace from both ends of s in place.
// Leading whitespace is skipped by advancing the returned pointer; trailing
// whitespace is cut off by writing a NUL terminator into the buffer.
// Returns a pointer into the original buffer (never a new allocation).
static char * chop ( char * s )
{
	// <ctype.h> classifiers take an int that must be representable as
	// unsigned char (or EOF); passing a negative plain char is undefined
	// behaviour, hence the casts
	while ( *s && isspace ( (unsigned char)*s ) )
		s++;

	char * p = s + strlen(s);
	while ( p>s && isspace ( (unsigned char)p[-1] ) )
		p--;
	*p = '\0';

	return s;
}
1403
1404
// Tells whether c may appear in an attribute name:
// ASCII digits, ASCII letters, or an underscore.
static bool myisattr ( char c )
{
	if ( c=='_' )
		return true;
	if ( c>='a' && c<='z' )
		return true;
	if ( c>='A' && c<='Z' )
		return true;
	if ( c>='0' && c<='9' )
		return true;
	return false;
}
1413
// Tells whether c is the '@' character.
static bool myismagic ( char c )
{
	const char cMagic = '@';
	return ( cMagic==c );
}
1418
// Tells whether c is one of the three characters '.', '[' or ']'.
static bool myisjson ( char c )
{
	switch ( c )
	{
		case '.':
		case '[':
		case ']':
			return true;
		default:
			return false;
	}
}
1426
1427
ParseField(char * sField)1428 bool CSphSEQuery::ParseField ( char * sField )
1429 {
1430 SPH_ENTER_METHOD();
1431
1432 // look for option name/value separator
1433 char * sValue = strchr ( sField, '=' );
1434 if ( !sValue || sValue==sField || sValue[-1]=='\\' )
1435 {
1436 // by default let's assume it's just query
1437 if ( sField[0] )
1438 {
1439 if ( m_bQuery )
1440 {
1441 snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
1442 SPH_RET(false);
1443 } else
1444 {
1445 m_sQuery = sField;
1446 m_bQuery = true;
1447
1448 // unescape only 1st one
1449 char *s = sField, *d = sField;
1450 int iSlashes = 0;
1451 while ( *s )
1452 {
1453 iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
1454 if ( ( iSlashes%2 )==0 ) *d++ = *s;
1455 s++;
1456 }
1457 *d = '\0';
1458 }
1459 }
1460 SPH_RET(true);
1461 }
1462
1463 // split
1464 *sValue++ = '\0';
1465 sValue = chop ( sValue );
1466 int iValue = atoi ( sValue );
1467
1468 // handle options
1469 char * sName = chop ( sField );
1470
1471 if ( !strcmp ( sName, "query" ) ) m_sQuery = sValue;
1472 else if ( !strcmp ( sName, "host" ) ) m_sHost = sValue;
1473 else if ( !strcmp ( sName, "port" ) ) m_iPort = iValue;
1474 else if ( !strcmp ( sName, "index" ) ) m_sIndex = sValue;
1475 else if ( !strcmp ( sName, "offset" ) ) m_iOffset = iValue;
1476 else if ( !strcmp ( sName, "limit" ) ) m_iLimit = iValue;
1477 else if ( !strcmp ( sName, "weights" ) ) m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
1478 else if ( !strcmp ( sName, "minid" ) ) m_iMinID = iValue;
1479 else if ( !strcmp ( sName, "maxid" ) ) m_iMaxID = iValue;
1480 else if ( !strcmp ( sName, "maxmatches" ) ) m_iMaxMatches = iValue;
1481 else if ( !strcmp ( sName, "maxquerytime" ) ) m_iMaxQueryTime = iValue;
1482 else if ( !strcmp ( sName, "groupsort" ) ) m_sGroupSortBy = sValue;
1483 else if ( !strcmp ( sName, "distinct" ) ) m_sGroupDistinct = sValue;
1484 else if ( !strcmp ( sName, "cutoff" ) ) m_iCutoff = iValue;
1485 else if ( !strcmp ( sName, "comment" ) ) m_sComment = sValue;
1486 else if ( !strcmp ( sName, "select" ) ) m_sSelect = sValue;
1487
1488 else if ( !strcmp ( sName, "mode" ) )
1489 {
1490 m_eMode = SPH_MATCH_ALL;
1491 if ( !strcmp ( sValue, "any" ) ) m_eMode = SPH_MATCH_ANY;
1492 else if ( !strcmp ( sValue, "phrase" ) ) m_eMode = SPH_MATCH_PHRASE;
1493 else if ( !strcmp ( sValue, "boolean" ) ) m_eMode = SPH_MATCH_BOOLEAN;
1494 else if ( !strcmp ( sValue, "ext" ) ) m_eMode = SPH_MATCH_EXTENDED;
1495 else if ( !strcmp ( sValue, "extended" ) ) m_eMode = SPH_MATCH_EXTENDED;
1496 else if ( !strcmp ( sValue, "ext2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
1497 else if ( !strcmp ( sValue, "extended2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
1498 else if ( !strcmp ( sValue, "all" ) ) m_eMode = SPH_MATCH_ALL;
1499 else if ( !strcmp ( sValue, "fullscan" ) ) m_eMode = SPH_MATCH_FULLSCAN;
1500 else
1501 {
1502 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
1503 SPH_RET(false);
1504 }
1505 } else if ( !strcmp ( sName, "ranker" ) )
1506 {
1507 m_eRanker = SPH_RANK_PROXIMITY_BM25;
1508 if ( !strcmp ( sValue, "proximity_bm25" ) ) m_eRanker = SPH_RANK_PROXIMITY_BM25;
1509 else if ( !strcmp ( sValue, "bm25" ) ) m_eRanker = SPH_RANK_BM25;
1510 else if ( !strcmp ( sValue, "none" ) ) m_eRanker = SPH_RANK_NONE;
1511 else if ( !strcmp ( sValue, "wordcount" ) ) m_eRanker = SPH_RANK_WORDCOUNT;
1512 else if ( !strcmp ( sValue, "proximity" ) ) m_eRanker = SPH_RANK_PROXIMITY;
1513 else if ( !strcmp ( sValue, "matchany" ) ) m_eRanker = SPH_RANK_MATCHANY;
1514 else if ( !strcmp ( sValue, "fieldmask" ) ) m_eRanker = SPH_RANK_FIELDMASK;
1515 else if ( !strcmp ( sValue, "sph04" ) ) m_eRanker = SPH_RANK_SPH04;
1516 else if ( !strncmp ( sValue, "expr:", 5 ) )
1517 {
1518 m_eRanker = SPH_RANK_EXPR;
1519 m_sRankExpr = sValue+5;
1520 } else
1521 {
1522 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
1523 SPH_RET(false);
1524 }
1525 } else if ( !strcmp ( sName, "sort" ) )
1526 {
1527 static const struct
1528 {
1529 const char * m_sName;
1530 ESphSortOrder m_eSort;
1531 } dSortModes[] =
1532 {
1533 { "relevance", SPH_SORT_RELEVANCE },
1534 { "attr_desc:", SPH_SORT_ATTR_DESC },
1535 { "attr_asc:", SPH_SORT_ATTR_ASC },
1536 { "time_segments:", SPH_SORT_TIME_SEGMENTS },
1537 { "extended:", SPH_SORT_EXTENDED },
1538 { "expr:", SPH_SORT_EXPR }
1539 };
1540
1541 int i;
1542 const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
1543 for ( i=0; i<nModes; i++ )
1544 if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
1545 {
1546 m_eSort = dSortModes[i].m_eSort;
1547 m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
1548 break;
1549 }
1550 if ( i==nModes )
1551 {
1552 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
1553 SPH_RET(false);
1554 }
1555
1556 } else if ( !strcmp ( sName, "groupby" ) )
1557 {
1558 static const struct
1559 {
1560 const char * m_sName;
1561 ESphGroupBy m_eFunc;
1562 } dGroupModes[] =
1563 {
1564 { "day:", SPH_GROUPBY_DAY },
1565 { "week:", SPH_GROUPBY_WEEK },
1566 { "month:", SPH_GROUPBY_MONTH },
1567 { "year:", SPH_GROUPBY_YEAR },
1568 { "attr:", SPH_GROUPBY_ATTR },
1569 { "multi:", SPH_GROUPBY_MULTIPLE }
1570 };
1571
1572 int i;
1573 const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
1574 for ( i=0; i<nModes; i++ )
1575 if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
1576 {
1577 m_eGroupFunc = dGroupModes[i].m_eFunc;
1578 m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
1579 break;
1580 }
1581 if ( i==nModes )
1582 {
1583 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
1584 SPH_RET(false);
1585 }
1586
1587 } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1588 ( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
1589 {
1590 for ( ;; )
1591 {
1592 char * p = sName;
1593 CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1594 tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
1595 tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
1596
1597 if (!( p = strchr ( sValue, ',' ) ))
1598 break;
1599 *p++ = '\0';
1600
1601 tFilter.m_sAttrName = chop ( sValue );
1602 sValue = p;
1603
1604 if (!( p = strchr ( sValue, ',' ) ))
1605 break;
1606 *p++ = '\0';
1607
1608 if ( tFilter.m_eType==SPH_FILTER_RANGE )
1609 {
1610 tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
1611 tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
1612 } else
1613 {
1614 tFilter.m_fMinValue = (float)atof(sValue);
1615 tFilter.m_fMaxValue = (float)atof(p);
1616 }
1617
1618 // all ok
1619 m_iFilters++;
1620 break;
1621 }
1622
1623 } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1624 ( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
1625 {
1626 for ( ;; )
1627 {
1628 CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1629 tFilter.m_eType = SPH_FILTER_VALUES;
1630 tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
1631
1632 // get the attr name
1633 while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
1634 sValue++;
1635 if ( !*sValue )
1636 break;
1637
1638 tFilter.m_sAttrName = sValue;
1639 while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) || myisjson(*sValue) ) )
1640 sValue++;
1641 if ( !*sValue )
1642 break;
1643 *sValue++ = '\0';
1644
1645 // get the values
1646 tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
1647 if ( !tFilter.m_iValues )
1648 {
1649 assert ( !tFilter.m_pValues );
1650 break;
1651 }
1652
1653 // all ok
1654 m_iFilters++;
1655 break;
1656 }
1657
1658 } else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
1659 {
1660 bool bIndex = !strcmp ( sName, "indexweights" );
1661 int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
1662 char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
1663 int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
1664
1665 *pCount = 0;
1666
1667 char * p = sValue;
1668 while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
1669 {
1670 // extract attr name
1671 if ( !myisattr(*p) )
1672 {
1673 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
1674 SPH_RET(false);
1675 }
1676
1677 pNames[*pCount] = p;
1678 while ( myisattr(*p) ) p++;
1679
1680 if ( *p!=',' )
1681 {
1682 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1683 SPH_RET(false);
1684 }
1685 *p++ = '\0';
1686
1687 // extract attr value
1688 char * sVal = p;
1689 while ( isdigit(*p) ) p++;
1690 if ( p==sVal )
1691 {
1692 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
1693 SPH_RET(false);
1694 }
1695 pWeights[*pCount] = atoi(sVal);
1696 (*pCount)++;
1697
1698 if ( !*p )
1699 break;
1700 if ( *p!=',' )
1701 {
1702 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1703 SPH_RET(false);
1704 }
1705 p++;
1706 }
1707
1708 } else if ( !strcmp ( sName, "geoanchor" ) )
1709 {
1710 m_bGeoAnchor = false;
1711 for ( ;; )
1712 {
1713 char * sLat = sValue;
1714 char * p = sValue;
1715
1716 if (!( p = strchr ( p, ',' ) )) break;
1717 *p++ = '\0';
1718 char * sLong = p;
1719
1720 if (!( p = strchr ( p, ',' ) )) break;
1721 *p++ = '\0';
1722 char * sLatVal = p;
1723
1724 if (!( p = strchr ( p, ',' ) )) break;
1725 *p++ = '\0';
1726 char * sLongVal = p;
1727
1728 m_sGeoLatAttr = chop(sLat);
1729 m_sGeoLongAttr = chop(sLong);
1730 m_fGeoLatitude = (float)atof ( sLatVal );
1731 m_fGeoLongitude = (float)atof ( sLongVal );
1732 m_bGeoAnchor = true;
1733 break;
1734 }
1735 if ( !m_bGeoAnchor )
1736 {
1737 snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
1738 SPH_RET(false);
1739 }
1740 } else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
1741 {
1742 sName = NULL;
1743 int iType = 0;
1744 CSphSEQuery::Override_t * pOverride = NULL;
1745
1746 // get name and type
1747 char * sRest = sValue;
1748 for ( ;; )
1749 {
1750 sName = sRest;
1751 if ( !*sName )
1752 break;
1753 if (!( sRest = strchr ( sRest, ',' ) ))
1754 break;
1755 *sRest++ = '\0';
1756 char * sType = sRest;
1757 if (!( sRest = strchr ( sRest, ',' ) ))
1758 break;
1759
1760 static const struct
1761 {
1762 const char * m_sName;
1763 int m_iType;
1764 }
1765 dAttrTypes[] =
1766 {
1767 { "int", SPH_ATTR_INTEGER },
1768 { "timestamp", SPH_ATTR_TIMESTAMP },
1769 { "bool", SPH_ATTR_BOOL },
1770 { "float", SPH_ATTR_FLOAT },
1771 { "bigint", SPH_ATTR_BIGINT }
1772 };
1773 for ( uint i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
1774 if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
1775 {
1776 iType = dAttrTypes[i].m_iType;
1777 break;
1778 }
1779 break;
1780 }
1781
1782 // fail
1783 if ( !sName || !*sName || !iType )
1784 {
1785 snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
1786 SPH_RET(false);
1787 }
1788
1789 // grab id:value pairs
1790 sRest++;
1791 while ( sRest )
1792 {
1793 char * sId = sRest;
1794 if (!( sRest = strchr ( sRest, ':' ) )) break;
1795 *sRest++ = '\0';
1796 if (!( sRest - sId )) break;
1797
1798 sValue = sRest;
1799 if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
1800 *sRest++ = '\0';
1801 if ( !*sValue )
1802 break;
1803
1804 if ( !pOverride )
1805 {
1806 pOverride = new CSphSEQuery::Override_t;
1807 pOverride->m_sName = chop(sName);
1808 pOverride->m_iType = iType;
1809 m_dOverrides.append ( pOverride );
1810 }
1811
1812 ulonglong uId = strtoull ( sId, NULL, 10 );
1813 CSphSEQuery::Override_t::Value_t tValue;
1814 if ( iType==SPH_ATTR_FLOAT )
1815 tValue.m_fValue = (float)atof(sValue);
1816 else if ( iType==SPH_ATTR_BIGINT )
1817 tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
1818 else
1819 tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
1820
1821 pOverride->m_dIds.append ( uId );
1822 pOverride->m_dValues.append ( tValue );
1823 }
1824
1825 if ( !pOverride )
1826 {
1827 snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
1828 SPH_RET(false);
1829 }
1830 SPH_RET(true);
1831 } else
1832 {
1833 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
1834 SPH_RET(false);
1835 }
1836
1837 // !COMMIT handle syntax errors
1838
1839 SPH_RET(true);
1840 }
1841
1842
Parse()1843 bool CSphSEQuery::Parse ()
1844 {
1845 SPH_ENTER_METHOD();
1846 SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
1847
1848 m_bQuery = false;
1849 char * pCur = m_sQueryBuffer;
1850 char * pNext = pCur;
1851
1852 while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
1853 {
1854 // handle escaped semicolons
1855 if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
1856 {
1857 pNext++;
1858 continue;
1859 }
1860
1861 // handle semicolon-separated clauses
1862 *pNext++ = '\0';
1863 if ( !ParseField ( pCur ) )
1864 SPH_RET(false);
1865 pCur = pNext;
1866 }
1867
1868 SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
1869
1870 SPH_RET(true);
1871 }
1872
1873
SendBytes(const void * pBytes,int iBytes)1874 void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
1875 {
1876 SPH_ENTER_METHOD();
1877 if ( m_iBufLeft<iBytes )
1878 {
1879 m_bBufOverrun = true;
1880 SPH_VOID_RET();
1881 }
1882
1883 memcpy ( m_pCur, pBytes, iBytes );
1884
1885 m_pCur += iBytes;
1886 m_iBufLeft -= iBytes;
1887 SPH_VOID_RET();
1888 }
1889
1890
// Serializes the parsed query into a search request packet.
//
// The exact packet size is computed first, then a buffer of that size is
// allocated (replacing any previous m_pBuf) and filled through the Send*()
// helpers. *ppBuffer receives the buffer, which stays owned by this object.
//
// Returns the packet size in bytes, or -1 on allocation failure or when the
// number of bytes actually written does not match the computed size.
int CSphSEQuery::BuildRequest ( char ** ppBuffer )
{
	SPH_ENTER_METHOD();

	// calc request length
	// (128 covers the fixed-size fields and string length prefixes sent
	// below up to and including the comment string; variable parts are added)
	int iReqSize = 128 + 4*m_iWeights
		+ strlen ( m_sSortBy )
		+ strlen ( m_sQuery )
		+ strlen ( m_sIndex )
		+ strlen ( m_sGroupBy )
		+ strlen ( m_sGroupSortBy )
		+ strlen ( m_sGroupDistinct )
		+ strlen ( m_sComment )
		+ strlen ( m_sSelect );
	if ( m_eRanker==SPH_RANK_EXPR )
		iReqSize += 4 + strlen(m_sRankExpr);
	for ( int i=0; i<m_iFilters; i++ )
	{
		const CSphSEFilter & tFilter = m_dFilters[i];
		iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
		switch ( tFilter.m_eType )
		{
			case SPH_FILTER_VALUES:		iReqSize += 4 + 8*tFilter.m_iValues; break; // count + 64-bit values
			case SPH_FILTER_RANGE:		iReqSize += 16; break; // 64-bit min and max
			case SPH_FILTER_FLOATRANGE:	iReqSize += 8; break; // float min and max
		}
	}
	if ( m_bGeoAnchor ) // 1.14+
		iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
	for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
		iReqSize += 8 + strlen(m_sIndexWeight[i] );
	for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
		iReqSize += 8 + strlen(m_sFieldWeight[i] );
	// overrides
	iReqSize += 4;
	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
	{
		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
		const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
		iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
	}
	// select
	iReqSize += 4;

	// drop the buffer of any previous request before allocating a new one
	m_iBufLeft = 0;
	SafeDeleteArray ( m_pBuf );

	m_pBuf = new char [ iReqSize ];
	if ( !m_pBuf )
		SPH_RET(-1);

	m_pCur = m_pBuf;
	m_iBufLeft = iReqSize;
	m_bBufOverrun = false;
	(*ppBuffer) = m_pBuf;

	// build request
	// NOTE: the order of the Send*() calls below must stay in sync with the
	// size accounting above; any mismatch is caught by the final check
	SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
	SendWord ( VER_COMMAND_SEARCH ); // command version
	SendInt ( iReqSize-8 ); // packet body length
	SendInt ( 0 ); // its a client

	SendInt ( 1 ); // number of queries
	SendInt ( m_iOffset );
	SendInt ( m_iLimit );
	SendInt ( m_eMode );
	SendInt ( m_eRanker ); // 1.16+
	if ( m_eRanker==SPH_RANK_EXPR )
		SendString ( m_sRankExpr );
	SendInt ( m_eSort );
	SendString ( m_sSortBy ); // sort attr
	SendString ( m_sQuery ); // query
	SendInt ( m_iWeights );
	for ( int j=0; j<m_iWeights; j++ )
		SendInt ( m_pWeights[j] ); // weights
	SendString ( m_sIndex ); // indexes
	SendInt ( 1 ); // id64 range follows
	SendUint64 ( m_iMinID ); // id/ts ranges
	SendUint64 ( m_iMaxID );

	SendInt ( m_iFilters );
	for ( int j=0; j<m_iFilters; j++ )
	{
		const CSphSEFilter & tFilter = m_dFilters[j];
		SendString ( tFilter.m_sAttrName );
		SendInt ( tFilter.m_eType );

		switch ( tFilter.m_eType )
		{
			case SPH_FILTER_VALUES:
				SendInt ( tFilter.m_iValues );
				for ( int k=0; k<tFilter.m_iValues; k++ )
					SendUint64 ( tFilter.m_pValues[k] );
				break;

			case SPH_FILTER_RANGE:
				SendUint64 ( tFilter.m_uMinValue );
				SendUint64 ( tFilter.m_uMaxValue );
				break;

			case SPH_FILTER_FLOATRANGE:
				SendFloat ( tFilter.m_fMinValue );
				SendFloat ( tFilter.m_fMaxValue );
				break;
		}

		SendInt ( tFilter.m_bExclude );
	}

	SendInt ( m_eGroupFunc );
	SendString ( m_sGroupBy );
	SendInt ( m_iMaxMatches );
	SendString ( m_sGroupSortBy );
	SendInt ( m_iCutoff ); // 1.9+
	SendInt ( m_iRetryCount ); // 1.10+
	SendInt ( m_iRetryDelay );
	SendString ( m_sGroupDistinct ); // 1.11+
	SendInt ( m_bGeoAnchor ); // 1.14+
	if ( m_bGeoAnchor )
	{
		SendString ( m_sGeoLatAttr );
		SendString ( m_sGeoLongAttr );
		SendFloat ( m_fGeoLatitude );
		SendFloat ( m_fGeoLongitude );
	}
	SendInt ( m_iIndexWeights ); // 1.15+
	for ( int i=0; i<m_iIndexWeights; i++ )
	{
		SendString ( m_sIndexWeight[i] );
		SendInt ( m_iIndexWeight[i] );
	}
	SendInt ( m_iMaxQueryTime ); // 1.17+
	SendInt ( m_iFieldWeights ); // 1.18+
	for ( int i=0; i<m_iFieldWeights; i++ )
	{
		SendString ( m_sFieldWeight[i] );
		SendInt ( m_iFieldWeight[i] );
	}
	SendString ( m_sComment );

	// overrides
	SendInt ( m_dOverrides.elements() );
	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
	{
		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
		SendString ( pOverride->m_sName );
		SendDword ( pOverride->m_iType );
		SendInt ( pOverride->m_dIds.elements() );
		for ( size_t j=0; j<pOverride->m_dIds.elements(); j++ )
		{
			SendUint64 ( pOverride->m_dIds.at(j) );
			// the value width depends on the attribute type
			if ( pOverride->m_iType==SPH_ATTR_FLOAT )
				SendFloat ( pOverride->m_dValues.at(j).m_fValue );
			else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
				SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
			else
				SendDword ( pOverride->m_dValues.at(j).m_uValue );
		}
	}

	// select
	SendString ( m_sSelect );

	// detect buffer overruns and underruns, and report internal error
	if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
		SPH_RET(-1);

	// all fine
	SPH_RET ( iReqSize );
}
2061
2062 //////////////////////////////////////////////////////////////////////////////
2063 // SPHINX HANDLER
2064 //////////////////////////////////////////////////////////////////////////////
2065
// Handler constructor. The signature depends on the server version: before
// 5.1 the handler is bound to the global sphinx_hton, later versions receive
// the handlerton explicitly. Every member starts out empty (0/NULL/false);
// m_pShare is attached later, in open().
#if MYSQL_VERSION_ID<50100
ha_sphinx::ha_sphinx ( TABLE_ARG * table )
	: handler ( &sphinx_hton, table )
#else
ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
	: handler ( hton, table )
#endif
	, m_pShare ( NULL )
	, m_iMatchesTotal ( 0 )
	, m_iCurrentPos ( 0 )
	, m_pCurrentKey ( NULL )
	, m_iCurrentKeyLen ( 0 )
	, m_pResponse ( NULL )
	, m_pResponseEnd ( NULL )
	, m_pCur ( NULL )
	, m_bUnpackError ( false )
	, m_iFields ( 0 )
	, m_dFields ( NULL )
	, m_iAttrs ( 0 )
	, m_dAttrs ( NULL )
	, m_bId64 ( 0 )
	, m_dUnboundFields ( NULL )
{
	// nothing else to do; only the debug enter/leave hooks run here
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
2092
~ha_sphinx()2093 ha_sphinx::~ha_sphinx()
2094 {
2095 SafeDeleteArray ( m_dAttrs );
2096 SafeDeleteArray ( m_dUnboundFields );
2097 if ( m_dFields )
2098 {
2099 for (uint32 i=0; i< m_iFields; i++ )
2100 SafeDeleteArray ( m_dFields[i] );
2101 delete [] m_dFields;
2102 }
2103 }
2104
2105 // Used for opening tables. The name will be the name of the file.
2106 // A table is opened when it needs to be opened. For instance
2107 // when a request comes in for a select on the table (tables are not
2108 // open and closed for each request, they are cached).
2109 //
2110 // Called from handler.cc by handler::ha_open(). The server opens all tables by
2111 // calling ha_open() which then calls the handler specific open().
open(const char * name,int,uint)2112 int ha_sphinx::open ( const char * name, int, uint )
2113 {
2114 SPH_ENTER_METHOD();
2115 m_pShare = get_share ( name, table );
2116 if ( !m_pShare )
2117 SPH_RET(1);
2118
2119 thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
2120
2121 #if MYSQL_VERSION_ID>50100
2122 *thd_ha_data ( table->in_use, ht ) = NULL;
2123 #else
2124 table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
2125 #endif
2126
2127 SPH_RET(0);
2128 }
2129
2130
Connect(const char * sHost,ushort uPort)2131 int ha_sphinx::Connect ( const char * sHost, ushort uPort )
2132 {
2133 struct sockaddr_in sin;
2134 #ifndef __WIN__
2135 struct sockaddr_un saun;
2136 #endif
2137
2138 int iDomain = 0;
2139 int iSockaddrSize = 0;
2140 struct sockaddr * pSockaddr = NULL;
2141
2142 in_addr_t ip_addr;
2143
2144 if ( uPort )
2145 {
2146 iDomain = AF_INET;
2147 iSockaddrSize = sizeof(sin);
2148 pSockaddr = (struct sockaddr *) &sin;
2149
2150 memset ( &sin, 0, sizeof(sin) );
2151 sin.sin_family = AF_INET;
2152 sin.sin_port = htons(uPort);
2153
2154 // prepare host address
2155 if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
2156 {
2157 memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
2158 } else
2159 {
2160 int tmp_errno;
2161 bool bError = false;
2162
2163 #if MYSQL_VERSION_ID>=50515
2164 struct addrinfo *hp = NULL;
2165 tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
2166 if ( tmp_errno || !hp || !hp->ai_addr )
2167 {
2168 bError = true;
2169 if ( hp )
2170 freeaddrinfo ( hp );
2171 }
2172 #else
2173 struct hostent tmp_hostent, *hp;
2174 char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
2175 hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
2176 if ( !hp )
2177 {
2178 my_gethostbyname_r_free();
2179 bError = true;
2180 }
2181 #endif
2182
2183 if ( bError )
2184 {
2185 char sError[256];
2186 my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
2187
2188 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2189 SPH_RET(-1);
2190 }
2191
2192 #if MYSQL_VERSION_ID>=50515
2193 struct sockaddr_in *in = (sockaddr_in *)hp->ai_addr;
2194 memcpy ( &sin.sin_addr, &in->sin_addr, Min ( sizeof(sin.sin_addr), sizeof(in->sin_addr) ) );
2195 freeaddrinfo ( hp );
2196 #else
2197 memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
2198 my_gethostbyname_r_free();
2199 #endif
2200 }
2201 } else
2202 {
2203 #ifndef __WIN__
2204 iDomain = AF_UNIX;
2205 iSockaddrSize = sizeof(saun);
2206 pSockaddr = (struct sockaddr *) &saun;
2207
2208 memset ( &saun, 0, sizeof(saun) );
2209 saun.sun_family = AF_UNIX;
2210 strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
2211 #else
2212 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
2213 SPH_RET(-1);
2214 #endif
2215 }
2216
2217 char sError[512];
2218 int iSocket = (int) socket ( iDomain, SOCK_STREAM, 0 );
2219
2220 if ( iSocket<0 )
2221 {
2222 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
2223 SPH_RET(-1);
2224 }
2225
2226 if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
2227 {
2228 sphSockClose ( iSocket );
2229 my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
2230 sHost, errno, (int)uPort );
2231 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2232 SPH_RET(-1);
2233 }
2234
2235 return iSocket;
2236 }
2237
2238
ConnectAPI(const char * sQueryHost,int iQueryPort)2239 int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
2240 {
2241 SPH_ENTER_METHOD();
2242
2243 const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
2244 ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
2245
2246 int iSocket = Connect ( sHost, uPort );
2247 if ( iSocket<0 )
2248 SPH_RET ( iSocket );
2249
2250 char sError[512];
2251
2252 int version;
2253 if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
2254 {
2255 sphSockClose ( iSocket );
2256 my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
2257 sHost, (int)uPort );
2258 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2259 SPH_RET(-1);
2260 }
2261
2262 uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
2263 if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
2264 {
2265 sphSockClose ( iSocket );
2266 my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
2267 sHost, (int)uPort );
2268 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2269 SPH_RET(-1);
2270 }
2271
2272 SPH_RET ( iSocket );
2273 }
2274
2275
2276 // Closes a table. We call the free_share() function to free any resources
2277 // that we have allocated in the "shared" structure.
2278 //
2279 // Called from sql_base.cc, sql_select.cc, and table.cc.
2280 // In sql_select.cc it is only used to close up temporary tables or during
2281 // the process where a temporary table is converted over to being a
2282 // myisam table.
2283 // For sql_base.cc look at close_data_tables().
close()2284 int ha_sphinx::close()
2285 {
2286 SPH_ENTER_METHOD();
2287 SPH_RET ( free_share ( m_pShare ) );
2288 }
2289
2290
HandleMysqlError(MYSQL * pConn,int iErrCode)2291 int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
2292 {
2293 CSphSEThreadTable * pTable = GetTls ();
2294 if ( pTable )
2295 {
2296 strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof pTable->m_tStats.m_sLastMessage - 1 );
2297 pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2298 pTable->m_tStats.m_bLastError = true;
2299 }
2300
2301 mysql_close ( pConn );
2302
2303 my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
2304 return -1;
2305 }
2306
2307
extra(enum ha_extra_function op)2308 int ha_sphinx::extra ( enum ha_extra_function op )
2309 {
2310 CSphSEThreadTable * pTable = GetTls();
2311 if ( pTable )
2312 {
2313 if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
2314 pTable->m_bReplace = true;
2315 else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
2316 pTable->m_bReplace = false;
2317 }
2318 return 0;
2319 }
2320
2321
write_row(const byte *)2322 int ha_sphinx::write_row ( const byte * )
2323 {
2324 SPH_ENTER_METHOD();
2325 if ( !m_pShare || !m_pShare->m_bSphinxQL )
2326 SPH_RET ( HA_ERR_WRONG_COMMAND );
2327
2328 // SphinxQL inserts only, pretty much similar to abandoned federated
2329 char sQueryBuf[1024];
2330 char sValueBuf[1024];
2331
2332 String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2333 String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
2334 sQuery.length ( 0 );
2335 sValue.length ( 0 );
2336
2337 CSphSEThreadTable * pTable = GetTls ();
2338 sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
2339 sQuery.append ( m_pShare->m_sIndex );
2340 sQuery.append ( " (" );
2341
2342 for ( Field ** ppField = table->field; *ppField; ppField++ )
2343 {
2344 sQuery.append ( (*ppField)->field_name.str );
2345 if ( ppField[1] )
2346 sQuery.append ( ", " );
2347 }
2348 sQuery.append ( ") VALUES (" );
2349
2350 for ( Field ** ppField = table->field; *ppField; ppField++ )
2351 {
2352 if ( (*ppField)->is_null() )
2353 {
2354 sQuery.append ( "''" );
2355
2356 } else
2357 {
2358 THD *thd= ha_thd();
2359 if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
2360 {
2361 Item_field * pWrap = new (thd->mem_root) Item_field(thd, *ppField); // autofreed by query arena, I assume
2362 Item_func_unix_timestamp * pConv = new (thd->mem_root) Item_func_unix_timestamp(thd, pWrap);
2363 pConv->quick_fix_field();
2364 unsigned int uTs = (unsigned int) pConv->val_int();
2365
2366 snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
2367 sQuery.append ( sValueBuf );
2368
2369 } else
2370 {
2371 (*ppField)->val_str ( &sValue );
2372 sQuery.append ( "'" );
2373 sValue.print ( &sQuery );
2374 sQuery.append ( "'" );
2375 sValue.length(0);
2376 }
2377 }
2378
2379 if ( ppField[1] )
2380 sQuery.append ( ", " );
2381 }
2382 sQuery.append ( ")" );
2383
2384 // FIXME? pretty inefficient to reconnect every time under high load,
2385 // but this was intentionally written for a low load scenario..
2386 MYSQL * pConn = mysql_init ( NULL );
2387 if ( !pConn )
2388 SPH_RET ( ER_OUT_OF_RESOURCES );
2389
2390 unsigned int uTimeout = 1;
2391 mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2392
2393 my_bool my_true= 1;
2394 mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
2395
2396 if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2397 SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2398
2399 if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2400 SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2401
2402 // all ok!
2403 mysql_close ( pConn );
2404 SPH_RET(0);
2405 }
2406
2407
IsIntegerFieldType(enum_field_types eType)2408 static inline bool IsIntegerFieldType ( enum_field_types eType )
2409 {
2410 return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
2411 }
2412
2413
IsIDField(Field * pField)2414 static inline bool IsIDField ( Field * pField )
2415 {
2416 enum_field_types eType = pField->type();
2417
2418 if ( eType==MYSQL_TYPE_LONGLONG )
2419 return true;
2420
2421 if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
2422 return true;
2423
2424 return false;
2425 }
2426
2427
delete_row(const byte *)2428 int ha_sphinx::delete_row ( const byte * )
2429 {
2430 SPH_ENTER_METHOD();
2431 if ( !m_pShare || !m_pShare->m_bSphinxQL )
2432 SPH_RET ( HA_ERR_WRONG_COMMAND );
2433
2434 char sQueryBuf[1024];
2435 String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2436 sQuery.length ( 0 );
2437
2438 sQuery.append ( "DELETE FROM " );
2439 sQuery.append ( m_pShare->m_sIndex );
2440 sQuery.append ( " WHERE id=" );
2441
2442 char sValue[32];
2443 snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
2444 sQuery.append ( sValue );
2445
2446 // FIXME? pretty inefficient to reconnect every time under high load,
2447 // but this was intentionally written for a low load scenario..
2448 MYSQL * pConn = mysql_init ( NULL );
2449 if ( !pConn )
2450 SPH_RET ( ER_OUT_OF_RESOURCES );
2451
2452 unsigned int uTimeout = 1;
2453 mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2454
2455 my_bool my_true= 1;
2456 mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
2457
2458 if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2459 SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2460
2461 if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2462 SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2463
2464 // all ok!
2465 mysql_close ( pConn );
2466 SPH_RET(0);
2467 }
2468
2469
update_row(const byte *,const byte *)2470 int ha_sphinx::update_row ( const byte *, const byte * )
2471 {
2472 SPH_ENTER_METHOD();
2473 SPH_RET ( HA_ERR_WRONG_COMMAND );
2474 }
2475
2476
2477 // keynr is key (index) number
2478 // sorted is 1 if result MUST be sorted according to index
index_init(uint keynr,bool)2479 int ha_sphinx::index_init ( uint keynr, bool )
2480 {
2481 SPH_ENTER_METHOD();
2482 active_index = keynr;
2483
2484 CSphSEThreadTable * pTable = GetTls();
2485 if ( pTable )
2486 pTable->m_bCondDone = false;
2487
2488 SPH_RET(0);
2489 }
2490
2491
index_end()2492 int ha_sphinx::index_end()
2493 {
2494 SPH_ENTER_METHOD();
2495 SPH_RET(0);
2496 }
2497
2498
CheckResponcePtr(int iLen)2499 bool ha_sphinx::CheckResponcePtr ( int iLen )
2500 {
2501 if ( m_pCur+iLen>m_pResponseEnd )
2502 {
2503 m_pCur = m_pResponseEnd;
2504 m_bUnpackError = true;
2505 return false;
2506 }
2507
2508 return true;
2509 }
2510
2511
UnpackDword()2512 uint32 ha_sphinx::UnpackDword ()
2513 {
2514 if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
2515 {
2516 return 0;
2517 }
2518
2519 uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
2520 m_pCur += sizeof(uint32); // NOLINT
2521 return uRes;
2522 }
2523
2524
UnpackString()2525 char * ha_sphinx::UnpackString ()
2526 {
2527 uint32 iLen = UnpackDword ();
2528 if ( !iLen )
2529 return NULL;
2530
2531 if ( !CheckResponcePtr ( iLen ) )
2532 {
2533 return NULL;
2534 }
2535
2536 char * sRes = new char [ 1+iLen ];
2537 memcpy ( sRes, m_pCur, iLen );
2538 sRes[iLen] = '\0';
2539 m_pCur += iLen;
2540 return sRes;
2541 }
2542
2543
UnpackSchema()2544 bool ha_sphinx::UnpackSchema ()
2545 {
2546 SPH_ENTER_METHOD();
2547
2548 // cleanup
2549 if ( m_dFields )
2550 for ( int i=0; i<(int)m_iFields; i++ )
2551 SafeDeleteArray ( m_dFields[i] );
2552 SafeDeleteArray ( m_dFields );
2553
2554 // unpack network packet
2555 uint32 uStatus = UnpackDword ();
2556 char * sMessage = NULL;
2557
2558 if ( uStatus!=SEARCHD_OK )
2559 {
2560 sMessage = UnpackString ();
2561 CSphSEThreadTable * pTable = GetTls ();
2562 if ( pTable )
2563 {
2564 strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
2565 pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2566 pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
2567 }
2568
2569 if ( uStatus==SEARCHD_ERROR )
2570 {
2571 char sError[1024];
2572 my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
2573 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2574 SafeDeleteArray ( sMessage );
2575 SPH_RET ( false );
2576 }
2577 }
2578
2579 m_iFields = UnpackDword ();
2580 m_dFields = new char * [ m_iFields ];
2581 if ( !m_dFields )
2582 {
2583 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
2584 SPH_RET(false);
2585 }
2586
2587 for ( uint32 i=0; i<m_iFields; i++ )
2588 m_dFields[i] = UnpackString ();
2589
2590 SafeDeleteArray ( m_dAttrs );
2591 m_iAttrs = UnpackDword ();
2592 m_dAttrs = new CSphSEAttr [ m_iAttrs ];
2593 if ( !m_dAttrs )
2594 {
2595 for ( int i=0; i<(int)m_iFields; i++ )
2596 SafeDeleteArray ( m_dFields[i] );
2597 SafeDeleteArray ( m_dFields );
2598 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
2599 SPH_RET(false);
2600 }
2601
2602 for ( uint32 i=0; i<m_iAttrs; i++ )
2603 {
2604 m_dAttrs[i].m_sName = UnpackString ();
2605 m_dAttrs[i].m_uType = UnpackDword ();
2606 if ( m_bUnpackError ) // m_sName may be null
2607 break;
2608
2609 m_dAttrs[i].m_iField = -1;
2610 for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
2611 {
2612 const char * sTableField = m_pShare->m_sTableField[j];
2613 const char * sAttrField = m_dAttrs[i].m_sName;
2614 if ( m_dAttrs[i].m_sName[0]=='@' )
2615 {
2616 const char * sAtPrefix = "_sph_";
2617 if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
2618 continue;
2619 sTableField += strlen(sAtPrefix);
2620 sAttrField++;
2621 }
2622
2623 if ( !strcasecmp ( sAttrField, sTableField ) )
2624 {
2625 // we're almost good, but
2626 // let's enforce that timestamp columns can only receive timestamp attributes
2627 if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
2628 m_dAttrs[i].m_iField = j;
2629 break;
2630 }
2631 }
2632 }
2633
2634 m_iMatchesTotal = UnpackDword ();
2635
2636 m_bId64 = UnpackDword ();
2637 if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
2638 {
2639 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
2640 SPH_RET(false);
2641 }
2642
2643 // network packet unpacked; build unbound fields map
2644 SafeDeleteArray ( m_dUnboundFields );
2645 m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
2646
2647 for ( int i=0; i<m_pShare->m_iTableFields; i++ )
2648 {
2649 if ( i<SPHINXSE_SYSTEM_COLUMNS )
2650 m_dUnboundFields[i] = SPH_ATTR_NONE;
2651
2652 else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
2653 m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
2654
2655 else
2656 m_dUnboundFields[i] = SPH_ATTR_INTEGER;
2657 }
2658
2659 for ( uint32 i=0; i<m_iAttrs; i++ )
2660 if ( m_dAttrs[i].m_iField>=0 )
2661 m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
2662
2663 if ( m_bUnpackError )
2664 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
2665
2666 SPH_RET ( !m_bUnpackError );
2667 }
2668
2669
UnpackStats(CSphSEStats * pStats)2670 bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
2671 {
2672 assert ( pStats );
2673
2674 char * pCurSave = m_pCur;
2675 for ( uint m=0; m<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); m++ ) // NOLINT
2676 {
2677 m_pCur += m_bId64 ? 12 : 8; // skip id+weight
2678 for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2679 {
2680 if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
2681 {
2682 // skip MVA list
2683 uint32 uCount = UnpackDword ();
2684 m_pCur += uCount*4;
2685 } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
2686 {
2687 uint32 iLen = UnpackDword();
2688 m_pCur += iLen;
2689 } else // skip normal value
2690 m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
2691 }
2692 }
2693
2694 pStats->m_iMatchesTotal = UnpackDword ();
2695 pStats->m_iMatchesFound = UnpackDword ();
2696 pStats->m_iQueryMsec = UnpackDword ();
2697 pStats->m_iWords = UnpackDword ();
2698
2699 if ( m_bUnpackError )
2700 return false;
2701
2702 if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
2703 return false;
2704
2705 SafeDeleteArray ( pStats->m_dWords );
2706 pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
2707 if ( !pStats->m_dWords )
2708 return false;
2709
2710 for ( int i=0; i<pStats->m_iWords; i++ )
2711 {
2712 CSphSEWordStats & tWord = pStats->m_dWords[i];
2713 tWord.m_sWord = UnpackString ();
2714 tWord.m_iDocs = UnpackDword ();
2715 tWord.m_iHits = UnpackDword ();
2716 }
2717
2718 if ( m_bUnpackError )
2719 return false;
2720
2721 m_pCur = pCurSave;
2722 return true;
2723 }
2724
2725
/// condition pushdown implementation, to properly intercept WHERE clauses on my columns
///
/// Only one shape of condition is recognized: a two-argument equality whose
/// first argument is a column and whose second is a constant.
///  - on non-SphinxQL tables: column #2 = string constant; the string is
///    copied into the per-thread m_sQuery (truncated to the buffer size) and
///    m_bQuery is raised
///  - on SphinxQL tables: column #0 = integer constant; the value is stored
///    in the per-thread m_iCondId and m_bCondId is raised
///
/// Returns NULL when the condition was intercepted, and the condition itself,
/// unchanged, in every other case.
#if MYSQL_VERSION_ID<50610
const COND * ha_sphinx::cond_push ( const COND * cond )
#else
const Item * ha_sphinx::cond_push ( const Item *cond )
#endif
{
	// catch the simplest case: query_column="some text"
	// (the loop never iterates; it only exists so that "break" can bail out
	// to the "don't change anything" return below)
	for ( ;; )
	{
		if ( cond->type()!=Item::FUNC_ITEM )
			break;

		Item_func * condf = (Item_func *)cond;
		if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
			break;

		// get my tls
		CSphSEThreadTable * pTable = GetTls ();
		if ( !pTable )
			break;

		Item ** args = condf->arguments();
		if ( !m_pShare->m_bSphinxQL )
		{
			// on non-QL tables, intercept query=value condition for SELECT
			if (!( args[0]->type()==Item::FIELD_ITEM &&
				args[1]->is_of_type(Item::CONST_ITEM,
					STRING_RESULT)))
				break;

			Item_field * pField = (Item_field *) args[0];
			if ( pField->field->field_index!=2 ) // FIXME! magic key index
				break;

			// copy the query, and let know that we intercepted this condition
			// NOTE(review): the result of val_str() is used without a NULL check - confirm it cannot be NULL here
			String *pString= args[1]->val_str(NULL);
			pTable->m_bQuery = true;
			strncpy ( pTable->m_sQuery, pString->c_ptr(), sizeof(pTable->m_sQuery) );
			pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0'; // strncpy() does not terminate on truncation
			pTable->m_pQueryCharset = pString->charset();

		} else
		{
			if (!( args[0]->type()==Item::FIELD_ITEM &&
				args[1]->is_of_type(Item::CONST_ITEM,
					INT_RESULT)))
				break;

			// on QL tables, intercept id=value condition for DELETE
			Item_field * pField = (Item_field *) args[0];
			if ( pField->field->field_index!=0 ) // FIXME! magic key index
				break;

			Item_int * pVal = (Item_int *) args[1];
			pTable->m_iCondId = pVal->val_int();
			pTable->m_bCondId = true;
		}

		// we intercepted this condition
		return NULL;
	}

	// don't change anything
	return cond;
}
2792
2793
2794 /// condition popup
cond_pop()2795 void ha_sphinx::cond_pop ()
2796 {
2797 CSphSEThreadTable * pTable = GetTls ();
2798 if ( pTable )
2799 pTable->m_bQuery = false;
2800 }
2801
2802
2803 /// get TLS (maybe allocate it, too)
GetTls()2804 CSphSEThreadTable * ha_sphinx::GetTls()
2805 {
2806 SPH_ENTER_METHOD()
2807 // where do we store that pointer in today's version?
2808 CSphTLS ** ppTls;
2809 #if MYSQL_VERSION_ID>50100
2810 ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
2811 #else
2812 ppTls = (CSphTLS**) ¤t_thd->ha_data[sphinx_hton.slot];
2813 #endif // >50100
2814
2815 CSphSEThreadTable * pTable = NULL;
2816 // allocate if needed
2817 if ( !*ppTls )
2818 {
2819 *ppTls = new CSphTLS ( this );
2820 pTable = (*ppTls)->m_pHeadTable;
2821 } else
2822 {
2823 pTable = (*ppTls)->m_pHeadTable;
2824 }
2825
2826 while ( pTable && pTable->m_pHandler!=this )
2827 pTable = pTable->m_pTableNext;
2828
2829 if ( !pTable )
2830 {
2831 pTable = new CSphSEThreadTable ( this );
2832 pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
2833 (*ppTls)->m_pHeadTable = pTable;
2834 }
2835
2836 // errors will be handled by caller
2837 return pTable;
2838 }
2839
2840
2841 // Positions an index cursor to the index specified in the handle. Fetches the
2842 // row if available. If the key value is null, begin at the first key of the
2843 // index.
index_read(byte * buf,const byte * key,uint key_len,enum ha_rkey_function)2844 int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
2845 {
2846 SPH_ENTER_METHOD();
2847 char sError[256];
2848
2849 // set new data for thd->ha_data, it is used in show_status
2850 CSphSEThreadTable * pTable = GetTls();
2851 if ( !pTable )
2852 {
2853 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
2854 SPH_RET ( HA_ERR_END_OF_FILE );
2855 }
2856 pTable->m_tStats.Reset ();
2857
2858 // sphinxql table, just return the key once
2859 if ( m_pShare->m_bSphinxQL )
2860 {
2861 // over and out
2862 if ( pTable->m_bCondDone )
2863 SPH_RET ( HA_ERR_END_OF_FILE );
2864
2865 // return a value from pushdown, if any
2866 if ( pTable->m_bCondId )
2867 {
2868 table->field[0]->store ( pTable->m_iCondId, 1 );
2869 pTable->m_bCondDone = true;
2870 SPH_RET(0);
2871 }
2872
2873 // return a value from key
2874 longlong iRef = 0;
2875 if ( key_len==4 )
2876 iRef = uint4korr ( key );
2877 else if ( key_len==8 )
2878 iRef = uint8korr ( key );
2879 else
2880 {
2881 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
2882 SPH_RET ( HA_ERR_END_OF_FILE );
2883 }
2884
2885 table->field[0]->store ( iRef, 1 );
2886 pTable->m_bCondDone = true;
2887 SPH_RET(0);
2888 }
2889
2890 // parse query
2891 if ( pTable->m_bQuery )
2892 {
2893 // we have a query from condition pushdown
2894 m_pCurrentKey = (const byte *) pTable->m_sQuery;
2895 m_iCurrentKeyLen = strlen(pTable->m_sQuery);
2896 } else
2897 {
2898 // just use the key (might be truncated)
2899 m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
2900 m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
2901 pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
2902 }
2903
2904 CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
2905 if ( !q.Parse () )
2906 {
2907 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
2908 SPH_RET ( HA_ERR_END_OF_FILE );
2909 }
2910
2911 // do connect
2912 int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
2913 if ( iSocket<0 )
2914 SPH_RET ( HA_ERR_END_OF_FILE );
2915
2916 // my buffer
2917 char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
2918 int iReqLen = q.BuildRequest ( &pBuffer );
2919
2920 if ( iReqLen<=0 )
2921 {
2922 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
2923 SPH_RET ( HA_ERR_END_OF_FILE );
2924 }
2925
2926 // send request
2927 ::send ( iSocket, pBuffer, iReqLen, 0 );
2928
2929 // receive reply
2930 char sHeader[8];
2931 int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
2932 if ( iGot!=sizeof(sHeader) )
2933 {
2934 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
2935 SPH_RET ( HA_ERR_END_OF_FILE );
2936 }
2937
2938 short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
2939 short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
2940 uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
2941 SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
2942 uRespStatus, uRespVersion, uRespLength );
2943
2944 SafeDeleteArray ( m_pResponse );
2945 if ( uRespLength<=SPHINXSE_MAX_ALLOC )
2946 m_pResponse = new char [ uRespLength+1 ];
2947
2948 if ( !m_pResponse )
2949 {
2950 my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
2951 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2952 SPH_RET ( HA_ERR_END_OF_FILE );
2953 }
2954
2955 int iRecvLength = 0;
2956 while ( iRecvLength<(int)uRespLength )
2957 {
2958 int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
2959 if ( iRecv<0 )
2960 break;
2961 iRecvLength += iRecv;
2962 }
2963
2964 ::closesocket ( iSocket );
2965 iSocket = -1;
2966
2967 if ( iRecvLength!=(int)uRespLength )
2968 {
2969 my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
2970 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2971 SPH_RET ( HA_ERR_END_OF_FILE );
2972 }
2973
2974 // we'll have a message, at least
2975 pTable->m_bStats = true;
2976
2977 // parse reply
2978 m_iCurrentPos = 0;
2979 m_pCur = m_pResponse;
2980 m_pResponseEnd = m_pResponse + uRespLength;
2981 m_bUnpackError = false;
2982
2983 if ( uRespStatus!=SEARCHD_OK )
2984 {
2985 char * sMessage = UnpackString ();
2986 if ( !sMessage )
2987 {
2988 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
2989 uRespStatus, uRespLength );
2990 SPH_RET ( HA_ERR_END_OF_FILE );
2991 }
2992
2993 strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
2994 pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
2995 SafeDeleteArray ( sMessage );
2996
2997 if ( uRespStatus!=SEARCHD_WARNING )
2998 {
2999 my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
3000 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
3001
3002 pTable->m_tStats.m_bLastError = true;
3003 SPH_RET ( HA_ERR_END_OF_FILE );
3004 }
3005 }
3006
3007 if ( !UnpackSchema () )
3008 SPH_RET ( HA_ERR_END_OF_FILE );
3009
3010 if ( !UnpackStats ( &pTable->m_tStats ) )
3011 {
3012 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
3013 SPH_RET ( HA_ERR_END_OF_FILE );
3014 }
3015
3016 SPH_RET ( get_rec ( buf, key, key_len ) );
3017 }
3018
3019
3020 // Positions an index cursor to the index specified in key. Fetches the
3021 // row if any. This is only used to read whole keys.
index_read_idx(byte *,uint,const byte *,uint,enum ha_rkey_function)3022 int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
3023 {
3024 SPH_ENTER_METHOD();
3025 SPH_RET ( HA_ERR_WRONG_COMMAND );
3026 }
3027
3028
3029 // Used to read forward through the index.
index_next(byte * buf)3030 int ha_sphinx::index_next ( byte * buf )
3031 {
3032 SPH_ENTER_METHOD();
3033 SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
3034 }
3035
3036
index_next_same(byte * buf,const byte * key,uint keylen)3037 int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
3038 {
3039 SPH_ENTER_METHOD();
3040 SPH_RET ( get_rec ( buf, key, keylen ) );
3041 }
3042
3043
get_rec(byte * buf,const byte *,uint)3044 int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
3045 {
3046 SPH_ENTER_METHOD();
3047
3048 if ( m_iCurrentPos>=m_iMatchesTotal )
3049 {
3050 SafeDeleteArray ( m_pResponse );
3051 SPH_RET ( HA_ERR_END_OF_FILE );
3052 }
3053
3054 #if MYSQL_VERSION_ID>50100
3055 MY_BITMAP * org_bitmap = dbug_tmp_use_all_columns ( table, &table->write_set );
3056 #endif
3057 Field ** field = table->field;
3058
3059 // unpack and return the match
3060 longlong uMatchID = UnpackDword ();
3061 if ( m_bId64 )
3062 uMatchID = ( uMatchID<<32 ) + UnpackDword();
3063 uint32 uMatchWeight = UnpackDword ();
3064
3065 field[0]->store ( uMatchID, 1 );
3066 field[1]->store ( uMatchWeight, 1 );
3067 field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );
3068
3069 for ( uint32 i=0; i<m_iAttrs; i++ )
3070 {
3071 longlong iValue64 = 0;
3072 uint32 uValue = UnpackDword ();
3073 if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
3074 iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
3075 if ( m_dAttrs[i].m_iField<0 )
3076 {
3077 // skip MVA or String
3078 if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
3079 {
3080 for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3081 UnpackDword();
3082 } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
3083 {
3084 m_pCur += uValue;
3085 }
3086 continue;
3087 }
3088
3089 Field * af = field [ m_dAttrs[i].m_iField ];
3090 switch ( m_dAttrs[i].m_uType )
3091 {
3092 case SPH_ATTR_INTEGER:
3093 case SPH_ATTR_ORDINAL:
3094 case SPH_ATTR_BOOL:
3095 af->store ( uValue, 1 );
3096 break;
3097
3098 case SPH_ATTR_FLOAT:
3099 af->store ( sphDW2F(uValue) );
3100 break;
3101
3102 case SPH_ATTR_TIMESTAMP:
3103 if ( af->type()==MYSQL_TYPE_TIMESTAMP )
3104 longstore ( af->ptr, uValue ); // because store() does not accept timestamps
3105 else
3106 af->store ( uValue, 1 );
3107 break;
3108
3109 case SPH_ATTR_BIGINT:
3110 af->store ( iValue64, 0 );
3111 break;
3112
3113 case SPH_ATTR_STRING:
3114 if ( !uValue )
3115 af->store ( "", 0, &my_charset_bin );
3116 else if ( CheckResponcePtr ( uValue ) )
3117 {
3118 af->store ( m_pCur, uValue, &my_charset_bin );
3119 m_pCur += uValue;
3120 }
3121 break;
3122
3123 case SPH_ATTR_UINT64SET:
3124 case SPH_ATTR_UINT32SET :
3125 if ( uValue<=0 )
3126 {
3127 // shortcut, empty MVA set
3128 af->store ( "", 0, &my_charset_bin );
3129
3130 } else
3131 {
3132 // convert MVA set to comma-separated string
3133 char sBuf[1024]; // FIXME! magic size
3134 char * pCur = sBuf;
3135
3136 if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
3137 {
3138 for ( ; uValue>0 && !m_bUnpackError; uValue-- )
3139 {
3140 uint32 uEntry = UnpackDword ();
3141 if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
3142 {
3143 snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
3144 while ( *pCur ) pCur++;
3145 if ( uValue>1 )
3146 *pCur++ = ','; // non-trailing commas
3147 }
3148 }
3149 } else
3150 {
3151 for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
3152 {
3153 uint32 uEntryLo = UnpackDword ();
3154 uint32 uEntryHi = UnpackDword();
3155 if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
3156 {
3157 snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u%u", uEntryHi, uEntryLo );
3158 while ( *pCur ) pCur++;
3159 if ( uValue>2 )
3160 *pCur++ = ','; // non-trailing commas
3161 }
3162 }
3163 }
3164
3165 af->store ( sBuf, uint(pCur-sBuf), &my_charset_bin );
3166 }
3167 break;
3168
3169 default:
3170 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
3171 SafeDeleteArray ( m_pResponse );
3172 SPH_RET ( HA_ERR_END_OF_FILE );
3173 }
3174 }
3175
3176 if ( m_bUnpackError )
3177 {
3178 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
3179 SafeDeleteArray ( m_pResponse );
3180 SPH_RET ( HA_ERR_END_OF_FILE );
3181 }
3182
3183 // zero out unmapped fields
3184 for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
3185 if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
3186 switch ( m_dUnboundFields[i] )
3187 {
3188 case SPH_ATTR_INTEGER: table->field[i]->store ( 0, 1 ); break;
3189 case SPH_ATTR_TIMESTAMP: longstore ( table->field[i]->ptr, 0 ); break;
3190 default:
3191 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
3192 "INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
3193 SafeDeleteArray ( m_pResponse );
3194 SPH_RET ( HA_ERR_END_OF_FILE );
3195 }
3196
3197 memset ( buf, 0, table->s->null_bytes );
3198 m_iCurrentPos++;
3199
3200 #if MYSQL_VERSION_ID > 50100
3201 dbug_tmp_restore_column_map ( &table->write_set, org_bitmap );
3202 #endif
3203
3204 SPH_RET(0);
3205 }
3206
3207
3208 // Used to read backwards through the index.
index_prev(byte *)3209 int ha_sphinx::index_prev ( byte * )
3210 {
3211 SPH_ENTER_METHOD();
3212 SPH_RET ( HA_ERR_WRONG_COMMAND );
3213 }
3214
3215
3216 // index_first() asks for the first key in the index.
3217 //
3218 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3219 // and sql_select.cc.
index_first(byte *)3220 int ha_sphinx::index_first ( byte * )
3221 {
3222 SPH_ENTER_METHOD();
3223 SPH_RET ( HA_ERR_END_OF_FILE );
3224 }
3225
3226 // index_last() asks for the last key in the index.
3227 //
3228 // Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
3229 // and sql_select.cc.
index_last(byte *)3230 int ha_sphinx::index_last ( byte * )
3231 {
3232 SPH_ENTER_METHOD();
3233 SPH_RET ( HA_ERR_WRONG_COMMAND );
3234 }
3235
3236
rnd_init(bool)3237 int ha_sphinx::rnd_init ( bool )
3238 {
3239 SPH_ENTER_METHOD();
3240 SPH_RET(0);
3241 }
3242
3243
rnd_end()3244 int ha_sphinx::rnd_end()
3245 {
3246 SPH_ENTER_METHOD();
3247 SPH_RET(0);
3248 }
3249
3250
rnd_next(byte *)3251 int ha_sphinx::rnd_next ( byte * )
3252 {
3253 SPH_ENTER_METHOD();
3254 SPH_RET ( HA_ERR_END_OF_FILE );
3255 }
3256
3257
position(const byte *)3258 void ha_sphinx::position ( const byte * )
3259 {
3260 SPH_ENTER_METHOD();
3261 SPH_VOID_RET();
3262 }
3263
3264
3265 // This is like rnd_next, but you are given a position to use
3266 // to determine the row. The position will be of the type that you stored in
3267 // ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
3268 // or position you saved when position() was called.
3269 // Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
rnd_pos(byte *,byte *)3270 int ha_sphinx::rnd_pos ( byte *, byte * )
3271 {
3272 SPH_ENTER_METHOD();
3273 SPH_RET ( HA_ERR_WRONG_COMMAND );
3274 }
3275
3276
3277 #if MYSQL_VERSION_ID>=50030
info(uint)3278 int ha_sphinx::info ( uint )
3279 #else
3280 void ha_sphinx::info ( uint )
3281 #endif
3282 {
3283 SPH_ENTER_METHOD();
3284
3285 if ( table->s->keys>0 )
3286 table->key_info[0].rec_per_key[0] = 1;
3287
3288 #if MYSQL_VERSION_ID>50100
3289 stats.records = 20;
3290 #else
3291 records = 20;
3292 #endif
3293
3294 #if MYSQL_VERSION_ID>=50030
3295 SPH_RET(0);
3296 #else
3297 SPH_VOID_RET();
3298 #endif
3299 }
3300
3301
reset()3302 int ha_sphinx::reset ()
3303 {
3304 SPH_ENTER_METHOD();
3305 CSphSEThreadTable * pTable = GetTls ();
3306 if ( pTable )
3307 pTable->m_bQuery = false;
3308 SPH_RET(0);
3309 }
3310
3311
delete_all_rows()3312 int ha_sphinx::delete_all_rows()
3313 {
3314 SPH_ENTER_METHOD();
3315 SPH_RET ( HA_ERR_WRONG_COMMAND );
3316 }
3317
3318
3319 // First you should go read the section "locking functions for mysql" in
3320 // lock.cc to understand this.
3321 // This create a lock on the table. If you are implementing a storage engine
3322 // that can handle transacations look at ha_berkely.cc to see how you will
3323 // want to go about doing this. Otherwise you should consider calling flock()
3324 // here.
3325 //
3326 // Called from lock.cc by lock_external() and unlock_external(). Also called
3327 // from sql_table.cc by copy_data_between_tables().
external_lock(THD *,int)3328 int ha_sphinx::external_lock ( THD *, int )
3329 {
3330 SPH_ENTER_METHOD();
3331 SPH_RET(0);
3332 }
3333
3334
store_lock(THD *,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3335 THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
3336 enum thr_lock_type lock_type )
3337 {
3338 SPH_ENTER_METHOD();
3339
3340 if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
3341 m_tLock.type = lock_type;
3342
3343 *to++ = &m_tLock;
3344 SPH_RET(to);
3345 }
3346
3347
delete_table(const char *)3348 int ha_sphinx::delete_table ( const char * )
3349 {
3350 SPH_ENTER_METHOD();
3351 SPH_RET(0);
3352 }
3353
3354
3355 // Renames a table from one name to another from alter table call.
3356 //
3357 // If you do not implement this, the default rename_table() is called from
3358 // handler.cc and it will delete all files with the file extensions returned
3359 // by bas_ext().
3360 //
3361 // Called from sql_table.cc by mysql_rename_table().
rename_table(const char *,const char *)3362 int ha_sphinx::rename_table ( const char *, const char * )
3363 {
3364 SPH_ENTER_METHOD();
3365 SPH_RET(0);
3366 }
3367
3368
3369 // Given a starting key, and an ending key estimate the number of rows that
3370 // will exist between the two. end_key may be empty which in case determine
3371 // if start_key matches any rows.
3372 //
3373 // Called from opt_range.cc by check_quick_keys().
records_in_range(uint,key_range *,key_range *)3374 ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
3375 {
3376 SPH_ENTER_METHOD();
3377 SPH_RET(3); // low number to force index usage
3378 }
3379
3380 #if MYSQL_VERSION_ID < 50610
3381 #define user_defined_key_parts key_parts
3382 #endif
3383
3384 // create() is called to create a database. The variable name will have the name
3385 // of the table. When create() is called you do not need to worry about opening
3386 // the table. Also, the FRM file will have already been created so adjusting
3387 // create_info will not do you any good. You can overwrite the frm file at this
3388 // point if you wish to change the table definition, but there are no methods
3389 // currently provided for doing that.
3390 //
3391 // Called from handle.cc by ha_create_table().
create(const char * name,TABLE * table_arg,HA_CREATE_INFO *)3392 int ha_sphinx::create ( const char * name, TABLE * table_arg, HA_CREATE_INFO * )
3393 {
3394 SPH_ENTER_METHOD();
3395 char sError[256];
3396
3397 CSphSEShare tInfo;
3398 if ( !ParseUrl ( &tInfo, table_arg, true ) )
3399 SPH_RET(-1);
3400
3401 // check SphinxAPI table
3402 for ( ; !tInfo.m_bSphinxQL; )
3403 {
3404 // check system fields (count and types)
3405 if ( table_arg->s->fields<SPHINXSE_SYSTEM_COLUMNS )
3406 {
3407 my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
3408 name, SPHINXSE_SYSTEM_COLUMNS );
3409 break;
3410 }
3411
3412 if ( !IsIDField ( table_arg->field[0] ) )
3413 {
3414 my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
3415 break;
3416 }
3417
3418 if ( !IsIntegerFieldType ( table_arg->field[1]->type() ) )
3419 {
3420 my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
3421 break;
3422 }
3423
3424 enum_field_types f2 = table_arg->field[2]->type();
3425 if ( f2!=MYSQL_TYPE_VARCHAR
3426 && f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
3427 {
3428 my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
3429 break;
3430 }
3431
3432 // check attributes
3433 int i;
3434 for ( i=3; i<(int)table_arg->s->fields; i++ )
3435 {
3436 enum_field_types eType = table_arg->field[i]->type();
3437 if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3438 {
3439 my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
3440 name, i+1, table_arg->field[i]->field_name.str );
3441 break;
3442 }
3443 }
3444
3445 if ( i!=(int)table_arg->s->fields )
3446 break;
3447
3448 // check index
3449 if (
3450 table_arg->s->keys!=1 ||
3451 table_arg->key_info[0].user_defined_key_parts!=1 ||
3452 strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, table_arg->field[2]->field_name.str ) )
3453 {
3454 my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
3455 name, table_arg->field[2]->field_name.str );
3456 break;
3457 }
3458
3459 // all good
3460 sError[0] = '\0';
3461 break;
3462 }
3463
3464 // check SphinxQL table
3465 for ( ; tInfo.m_bSphinxQL; )
3466 {
3467 sError[0] = '\0';
3468
3469 // check that 1st column is id, is of int type, and has an index
3470 if ( strcmp ( table_arg->field[0]->field_name.str, "id" ) )
3471 {
3472 my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
3473 break;
3474 }
3475
3476 if ( !IsIDField ( table_arg->field[0] ) )
3477 {
3478 my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
3479 break;
3480 }
3481
3482 // check index
3483 if (
3484 table_arg->s->keys!=1 ||
3485 table_arg->key_info[0].user_defined_key_parts!=1 ||
3486 strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, "id" ) )
3487 {
3488 my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
3489 break;
3490 }
3491
3492 // check column types
3493 for ( int i=1; i<(int)table_arg->s->fields; i++ )
3494 {
3495 enum_field_types eType = table_arg->field[i]->type();
3496 if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3497 {
3498 my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
3499 name, i+1, table_arg->field[i]->field_name.str );
3500 break;
3501 }
3502 }
3503 if ( sError[0] )
3504 break;
3505
3506 // all good
3507 break;
3508 }
3509
3510 // report and bail
3511 if ( sError[0] )
3512 {
3513 my_printf_error(ER_CANT_CREATE_TABLE,
3514 "Can\'t create table %s.%s (Error: %s)",
3515 MYF(0),
3516 table_arg->s->db.str,
3517 table_arg->s->table_name.str, sError);
3518 SPH_RET(-1);
3519 }
3520
3521 SPH_RET(0);
3522 }
3523
// show functions

// fallback size for the status callbacks' text buffer on pre-5.1 servers
#if MYSQL_VERSION_ID<50100
#define SHOW_VAR_FUNC_BUFF_SIZE 1024
#endif
3529
sphinx_get_stats(THD * thd,SHOW_VAR * out)3530 CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
3531 {
3532 #if MYSQL_VERSION_ID>50100
3533 if ( sphinx_hton_ptr )
3534 {
3535 CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3536
3537 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3538 return &pTls->m_pHeadTable->m_tStats;
3539 }
3540 #else
3541 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3542 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3543 return &pTls->m_pHeadTable->m_tStats;
3544 #endif
3545
3546 out->type = SHOW_CHAR;
3547 out->value = (char*) "";
3548 return 0;
3549 }
3550
sphinx_showfunc_total(THD * thd,SHOW_VAR * out,char *)3551 int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
3552 {
3553 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3554 if ( pStats )
3555 {
3556 out->type = SHOW_INT;
3557 out->value = (char *) &pStats->m_iMatchesTotal;
3558 }
3559 return 0;
3560 }
3561
sphinx_showfunc_total_found(THD * thd,SHOW_VAR * out,char *)3562 int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
3563 {
3564 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3565 if ( pStats )
3566 {
3567 out->type = SHOW_INT;
3568 out->value = (char *) &pStats->m_iMatchesFound;
3569 }
3570 return 0;
3571 }
3572
sphinx_showfunc_time(THD * thd,SHOW_VAR * out,char *)3573 int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
3574 {
3575 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3576 if ( pStats )
3577 {
3578 out->type = SHOW_INT;
3579 out->value = (char *) &pStats->m_iQueryMsec;
3580 }
3581 return 0;
3582 }
3583
sphinx_showfunc_word_count(THD * thd,SHOW_VAR * out,char *)3584 int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
3585 {
3586 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3587 if ( pStats )
3588 {
3589 out->type = SHOW_INT;
3590 out->value = (char *) &pStats->m_iWords;
3591 }
3592 return 0;
3593 }
3594
sphinx_showfunc_words(THD * thd,SHOW_VAR * out,char * sBuffer)3595 int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
3596 {
3597 #if MYSQL_VERSION_ID>50100
3598 if ( sphinx_hton_ptr )
3599 {
3600 CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3601 #else
3602 {
3603 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3604 #endif
3605 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3606 {
3607 CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
3608 if ( pStats && pStats->m_iWords )
3609 {
3610 uint uBuffLen = 0;
3611
3612 out->type = SHOW_CHAR;
3613 out->value = sBuffer;
3614
3615 // the following is partially based on code in sphinx_show_status()
3616 sBuffer[0] = 0;
3617 for ( int i=0; i<pStats->m_iWords; i++ )
3618 {
3619 CSphSEWordStats & tWord = pStats->m_dWords[i];
3620 uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
3621 tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
3622 }
3623
3624 if ( uBuffLen > 0 )
3625 {
3626 // trim last space
3627 sBuffer [ --uBuffLen ] = 0;
3628
3629 if ( pTls->m_pHeadTable->m_pQueryCharset )
3630 {
3631 // String::c_ptr() will nul-terminate the buffer.
3632 //
3633 // NOTE: It's not entirely clear whether this conversion is necessary at all.
3634
3635 String sConvert;
3636 uint iErrors;
3637 sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
3638 memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
3639 }
3640 }
3641
3642 return 0;
3643 }
3644 }
3645 }
3646
3647 out->type = SHOW_CHAR;
3648 out->value = (char*) "";
3649 return 0;
3650 }
3651
3652 int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
3653 {
3654 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3655 out->type = SHOW_CHAR;
3656 if ( pStats && pStats->m_bLastError )
3657 {
3658 out->value = pStats->m_sLastMessage;
3659 }
3660 else
3661 out->value = (char*)"";
3662 return 0;
3663 }
3664
3665 #if MYSQL_VERSION_ID>50100
3666 struct st_mysql_storage_engine sphinx_storage_engine =
3667 {
3668 MYSQL_HANDLERTON_INTERFACE_VERSION
3669 };
3670
3671 struct st_mysql_show_var sphinx_status_vars[] =
3672 {
3673 {"Sphinx_total", (char *)sphinx_showfunc_total, SHOW_SIMPLE_FUNC},
3674 {"Sphinx_total_found", (char *)sphinx_showfunc_total_found, SHOW_SIMPLE_FUNC},
3675 {"Sphinx_time", (char *)sphinx_showfunc_time, SHOW_SIMPLE_FUNC},
3676 {"Sphinx_word_count", (char *)sphinx_showfunc_word_count, SHOW_SIMPLE_FUNC},
3677 {"Sphinx_words", (char *)sphinx_showfunc_words, SHOW_SIMPLE_FUNC},
3678 {"Sphinx_error", (char *)sphinx_showfunc_error, SHOW_SIMPLE_FUNC},
3679 {0, 0, (enum_mysql_show_type)0}
3680 };
3681
3682
3683 maria_declare_plugin(sphinx)
3684 {
3685 MYSQL_STORAGE_ENGINE_PLUGIN,
3686 &sphinx_storage_engine,
3687 sphinx_hton_name,
3688 "Sphinx developers",
3689 sphinx_hton_comment,
3690 PLUGIN_LICENSE_GPL,
3691 sphinx_init_func, // Plugin Init
3692 sphinx_done_func, // Plugin Deinit
3693 0x0202, // 2.2
3694 sphinx_status_vars,
3695 NULL,
3696 SPHINXSE_VERSION, // string version
3697 MariaDB_PLUGIN_MATURITY_GAMMA
3698 }
3699 maria_declare_plugin_end;
3700
3701 #endif // >50100
3702
3703 //
3704 // $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
3705 //
3706