1 //
2 // $Id$
3 //
4
5 //
6 // Copyright (c) 2001-2016, Andrew Aksyonoff
7 // Copyright (c) 2008-2016, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15
16 #ifdef USE_PRAGMA_IMPLEMENTATION
17 #pragma implementation // gcc: Class implementation
18 #endif
19
20 #if _MSC_VER>=1400
21 #define _CRT_SECURE_NO_DEPRECATE 1
22 #define _CRT_NONSTDC_NO_DEPRECATE 1
23 #endif
24
25 #include <mysql_version.h>
26
27 #if MYSQL_VERSION_ID>=50515
28 #include "sql_class.h"
29 #include "sql_array.h"
30 #elif MYSQL_VERSION_ID>50100
31 #include "mysql_priv.h"
32 #include <mysql/plugin.h>
33 #else
34 #include "../mysql_priv.h"
35 #endif
36
37 #include <mysys_err.h>
38 #include <my_sys.h>
39 #include <mysql.h> // include client for INSERT table (sort of redoing federated..)
40
41 #ifndef __WIN__
42 // UNIX-specific
43 #include <my_net.h>
44 #include <netdb.h>
45 #include <sys/un.h>
46
47 #define RECV_FLAGS MSG_WAITALL
48
49 #define sphSockClose(_sock) ::close(_sock)
50 #else
51 // Windows-specific
52 #include <io.h>
53 #define strcasecmp stricmp
54 #define snprintf _snprintf
55
56 #define RECV_FLAGS 0
57
58 #define sphSockClose(_sock) ::closesocket(_sock)
59 #endif
60
61 #include <ctype.h>
62 #include "ha_sphinx.h"
63
64 #ifndef MSG_WAITALL
65 #define MSG_WAITALL 0
66 #endif
67
68 #if _MSC_VER>=1400
69 #pragma warning(push,4)
70 #endif
71
72 /////////////////////////////////////////////////////////////////////////////
73
/// there might be issues with min() on different platforms (eg. Gentoo, they say)
/// NOTE: function-like macro; the smaller argument is evaluated twice,
/// so do not pass expressions with side effects
#define Min(a,b) ((a)<(b)?(a):(b))
76
/// unaligned RAM accesses are forbidden on SPARC
/// (this flag selects between the pass-through and the byte-wise
/// sphUnalignedRead()/sphUnalignedWrite() implementations below)
#if defined(sparc) || defined(__sparc__)
#define UNALIGNED_RAM_ACCESS 0
#else
#define UNALIGNED_RAM_ACCESS 1
#endif
83
84
85 #if UNALIGNED_RAM_ACCESS
86
/// pass-through read wrapper, used when UNALIGNED_RAM_ACCESS is enabled;
/// simply returns a copy of the referenced value
template < typename T >
inline T sphUnalignedRead ( const T & tRef )
{
	T tCopy = tRef;
	return tCopy;
}
92
/// pass-through write wrapper, used when UNALIGNED_RAM_ACCESS is enabled;
/// stores tVal at pPtr with a plain typed assignment
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	T * pTyped = (T *) pPtr;
	*pTyped = tVal;
}
98
99 #else
100
101 /// unaligned read wrapper for some architectures (eg. SPARC)
102 template < typename T >
sphUnalignedRead(const T & tRef)103 inline T sphUnalignedRead ( const T & tRef )
104 {
105 T uTmp;
106 byte * pSrc = (byte *) &tRef;
107 byte * pDst = (byte *) &uTmp;
108 for ( int i=0; i<(int)sizeof(T); i++ )
109 *pDst++ = *pSrc++;
110 return uTmp;
111 }
112
113 /// unaligned write wrapper for some architectures (eg. SPARC)
114 template < typename T >
sphUnalignedWrite(void * pPtr,const T & tVal)115 void sphUnalignedWrite ( void * pPtr, const T & tVal )
116 {
117 byte * pDst = (byte *) pPtr;
118 byte * pSrc = (byte *) &tVal;
119 for ( int i=0; i<(int)sizeof(T); i++ )
120 *pDst++ = *pSrc++;
121 }
122
123 #endif
124
// pick the HASH API names by server version:
// plain hash_*() before 50515, my_-prefixed my_hash_*() from 50515 on
#if MYSQL_VERSION_ID<50515

#define sphinx_hash_init hash_init
#define sphinx_hash_free hash_free
#define sphinx_hash_search hash_search
#define sphinx_hash_delete hash_delete

#else

#define sphinx_hash_init my_hash_init
#define sphinx_hash_free my_hash_free
#define sphinx_hash_search my_hash_search
#define sphinx_hash_delete my_hash_delete

#endif
140
141 /////////////////////////////////////////////////////////////////////////////
142
// FIXME! make this all dynamic
// fixed capacity of the filter, index weight, and field weight arrays in CSphSEQuery
#define SPHINXSE_MAX_FILTERS 32

// fallbacks used by ParseUrl() for the parts a connection string omits
#define SPHINXAPI_DEFAULT_HOST "127.0.0.1"
#define SPHINXAPI_DEFAULT_PORT 9312
#define SPHINXAPI_DEFAULT_INDEX "*"

// default port for sphinxql:// connection strings
#define SPHINXQL_DEFAULT_PORT 9306

#define SPHINXSE_SYSTEM_COLUMNS 3

#define SPHINXSE_MAX_ALLOC (16*1024*1024)
#define SPHINXSE_MAX_KEYWORDSTATS 4096

// reported as part of the engine comment string (sphinx_hton_comment)
#define SPHINXSE_VERSION "2.2.11-release"
158
// FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
// cut-n-paste is somewhat simpler than adding dependencies however..
161
// searchd protocol constants
// NOTE(review): judging by the names these are the protocol version, the search
// command id, and the search command version - confirm against searchd.cpp
enum
{
	SPHINX_SEARCHD_PROTO = 1,
	SEARCHD_COMMAND_SEARCH = 0,
	VER_COMMAND_SEARCH = 0x119,
};
168
/// search query sorting orders
/// (numeric values are fixed explicitly; keep them in sync with sphinx.h)
enum ESphSortOrder
{
	SPH_SORT_RELEVANCE = 0, ///< sort by document relevance desc, then by date
	SPH_SORT_ATTR_DESC = 1, ///< sort by document date desc, then by relevance desc
	SPH_SORT_ATTR_ASC = 2, ///< sort by document date asc, then by relevance desc
	SPH_SORT_TIME_SEGMENTS = 3, ///< sort by time segments (hour/day/week/etc) desc, then by relevance desc
	SPH_SORT_EXTENDED = 4, ///< sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
	SPH_SORT_EXPR = 5, ///< sort by expression

	SPH_SORT_TOTAL ///< number of sorting orders; not a valid order itself
};
181
/// search query matching mode
/// (values spelled out explicitly, like in the other enums in this file)
enum ESphMatchMode
{
	SPH_MATCH_ALL		= 0,	///< match all query words
	SPH_MATCH_ANY		= 1,	///< match any query word
	SPH_MATCH_PHRASE	= 2,	///< match this exact phrase
	SPH_MATCH_BOOLEAN	= 3,	///< match this boolean query
	SPH_MATCH_EXTENDED	= 4,	///< match this extended query
	SPH_MATCH_FULLSCAN	= 5,	///< match all document IDs w/o fulltext query, apply filters
	SPH_MATCH_EXTENDED2	= 6,	///< extended engine V2

	SPH_MATCH_TOTAL		= 7		///< number of matching modes; not a valid mode itself
};
195
/// search query relevance ranking mode
enum ESphRankMode
{
	SPH_RANK_PROXIMITY_BM25 = 0, ///< default mode, phrase proximity major factor and BM25 minor one
	SPH_RANK_BM25 = 1, ///< statistical mode, BM25 ranking only (faster but worse quality)
	SPH_RANK_NONE = 2, ///< no ranking, all matches get a weight of 1
	SPH_RANK_WORDCOUNT = 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurrence counts
	SPH_RANK_PROXIMITY = 4, ///< phrase proximity
	SPH_RANK_MATCHANY = 5, ///< emulate old match-any weighting
	SPH_RANK_FIELDMASK = 6, ///< sets bits where there were matches
	SPH_RANK_SPH04 = 7, ///< codename SPH04, phrase proximity + bm25 + head/exact boost
	SPH_RANK_EXPR = 8, ///< expression based ranker

	SPH_RANK_TOTAL, ///< number of rankers; not a valid ranker itself
	SPH_RANK_DEFAULT = SPH_RANK_PROXIMITY_BM25 ///< alias for the default ranker
};
212
/// search query grouping mode
/// (selects the function applied to the group-by attribute)
enum ESphGroupBy
{
	SPH_GROUPBY_DAY = 0, ///< group by day
	SPH_GROUPBY_WEEK = 1, ///< group by week
	SPH_GROUPBY_MONTH = 2, ///< group by month
	SPH_GROUPBY_YEAR = 3, ///< group by year
	SPH_GROUPBY_ATTR = 4 ///< group by attribute value
};
222
/// known attribute types
/// (stored in CSphSEAttr::m_uType as received from Sphinx)
enum
{
	SPH_ATTR_NONE = 0, ///< not an attribute at all
	SPH_ATTR_INTEGER = 1, ///< this attr is just an integer
	SPH_ATTR_TIMESTAMP = 2, ///< this attr is a timestamp
	SPH_ATTR_ORDINAL = 3, ///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
	SPH_ATTR_BOOL = 4, ///< this attr is a boolean bit field
	SPH_ATTR_FLOAT = 5, ///< presumably a float value, judging by the name
	SPH_ATTR_BIGINT = 6, ///< presumably a 64-bit integer value, judging by the name
	SPH_ATTR_STRING = 7, ///< string (binary; in-memory)

	SPH_ATTR_UINT32SET = 0x40000001UL, ///< this attr is multiple int32 values (0 or more)
	SPH_ATTR_UINT64SET = 0x40000002UL ///< this attr is multiple int64 values (0 or more)
};
238
/// known answers
/// (status codes that searchd puts into its reply)
enum
{
	SEARCHD_OK = 0, ///< general success, command-specific reply follows
	SEARCHD_ERROR = 1, ///< general failure, error message follows
	SEARCHD_RETRY = 2, ///< temporary failure, error message follows, client should retry later
	SEARCHD_WARNING = 3 ///< general success, warning message and command-specific reply follow
};
247
248 //////////////////////////////////////////////////////////////////////////////
249
// compile-time debug switches:
// SPHINX_DEBUG_OUTPUT makes SPH_DEBUG() print to stderr,
// SPHINX_DEBUG_CALLS makes the SPH_ENTER_*/SPH_RET macros trace function entry and exit
#define SPHINX_DEBUG_OUTPUT 0
#define SPHINX_DEBUG_CALLS 0
252
253 #include <stdarg.h>
254
#if SPHINX_DEBUG_OUTPUT
/// printf-style debug trace; emits "SphinxSE: ", the formatted message, and a newline to stderr
inline void SPH_DEBUG ( const char * format, ... )
{
	va_list tArgs;
	va_start ( tArgs, format );
	fputs ( "SphinxSE: ", stderr );
	vfprintf ( stderr, format, tArgs );
	fputc ( '\n', stderr );
	va_end ( tArgs );
}
#else
/// debug output is compiled out; the arguments are accepted and ignored
inline void SPH_DEBUG ( const char *, ... )
{
}
#endif
268
// function entry/exit helpers
// with SPHINX_DEBUG_CALLS enabled they trace through SPH_DEBUG();
// otherwise the ENTER macros vanish and the RET macros are plain returns
#if SPHINX_DEBUG_CALLS

#define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
// the pointer is printed with %p and passed as void*
// (the former %08x did not match a pointer argument)
#define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%p)", __FUNCTION__, (const void *) this ); }
#define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
#define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }

#else

#define SPH_ENTER_FUNC()
#define SPH_ENTER_METHOD()
#define SPH_RET(_arg) { return(_arg); }
#define SPH_VOID_RET() { return; }

#endif
284
285
// delete the object (or array) and reset the pointer to NULL
// NOTE: the argument is expanded more than once, so it must be free of side
// effects; the expansion is a braced block, so it does not combine with if/else
#define SafeDelete(_arg) { if ( _arg ) delete ( _arg ); (_arg) = NULL; }
#define SafeDeleteArray(_arg) { if ( _arg ) delete [] ( _arg ); (_arg) = NULL; }
288
289 //////////////////////////////////////////////////////////////////////////////
290
291 /// per-table structure that will be shared among all open Sphinx SE handlers
292 struct CSphSEShare
293 {
294 pthread_mutex_t m_tMutex;
295 THR_LOCK m_tLock;
296
297 char * m_sTable;
298 char * m_sScheme; ///< our connection string
299 char * m_sHost; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
300 char * m_sSocket; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
301 char * m_sIndex; ///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
302 ushort m_iPort;
303 bool m_bSphinxQL; ///< is this read-only SphinxAPI table, or write-only SphinxQL table?
304 uint m_iTableNameLen;
305 uint m_iUseCount;
306 #if MYSQL_VERSION_ID<50610
307 CHARSET_INFO * m_pTableQueryCharset;
308 #else
309 const CHARSET_INFO * m_pTableQueryCharset;
310 #endif
311
312 int m_iTableFields;
313 char ** m_sTableField;
314 enum_field_types * m_eTableFieldType;
315
CSphSEShareCSphSEShare316 CSphSEShare ()
317 : m_sTable ( NULL )
318 , m_sScheme ( NULL )
319 , m_sHost ( NULL )
320 , m_sSocket ( NULL )
321 , m_sIndex ( NULL )
322 , m_iPort ( 0 )
323 , m_bSphinxQL ( false )
324 , m_iTableNameLen ( 0 )
325 , m_iUseCount ( 1 )
326 , m_pTableQueryCharset ( NULL )
327
328 , m_iTableFields ( 0 )
329 , m_sTableField ( NULL )
330 , m_eTableFieldType ( NULL )
331 {
332 thr_lock_init ( &m_tLock );
333 pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
334 }
335
~CSphSEShareCSphSEShare336 ~CSphSEShare ()
337 {
338 pthread_mutex_destroy ( &m_tMutex );
339 thr_lock_delete ( &m_tLock );
340
341 SafeDeleteArray ( m_sTable );
342 SafeDeleteArray ( m_sScheme );
343 ResetTable ();
344 }
345
ResetTableCSphSEShare346 void ResetTable ()
347 {
348 for ( int i=0; i<m_iTableFields; i++ )
349 SafeDeleteArray ( m_sTableField[i] );
350 SafeDeleteArray ( m_sTableField );
351 SafeDeleteArray ( m_eTableFieldType );
352 }
353 };
354
355 /// schema attribute
356 struct CSphSEAttr
357 {
358 char * m_sName; ///< attribute name (received from Sphinx)
359 uint32 m_uType; ///< attribute type (received from Sphinx)
360 int m_iField; ///< field index in current table (-1 if none)
361
CSphSEAttrCSphSEAttr362 CSphSEAttr()
363 : m_sName ( NULL )
364 , m_uType ( SPH_ATTR_NONE )
365 , m_iField ( -1 )
366 {}
367
~CSphSEAttrCSphSEAttr368 ~CSphSEAttr ()
369 {
370 SafeDeleteArray ( m_sName );
371 }
372 };
373
/// word stats
/// (one keyword with its document and hit counters; owns the m_sWord string)
struct CSphSEWordStats
{
	char * m_sWord;
	int m_iDocs;
	int m_iHits;

	/// no word, zero counters
	CSphSEWordStats () :
		m_sWord ( NULL ),
		m_iDocs ( 0 ),
		m_iHits ( 0 )
	{
	}

	~CSphSEWordStats ()
	{
		if ( m_sWord )
			delete [] m_sWord;
		m_sWord = NULL;
	}
};
392
393 /// request stats
394 struct CSphSEStats
395 {
396 public:
397 int m_iMatchesTotal;
398 int m_iMatchesFound;
399 int m_iQueryMsec;
400 int m_iWords;
401 CSphSEWordStats * m_dWords;
402 bool m_bLastError;
403 char m_sLastMessage[1024];
404
CSphSEStatsCSphSEStats405 CSphSEStats()
406 : m_dWords ( NULL )
407 {
408 Reset ();
409 }
410
ResetCSphSEStats411 void Reset ()
412 {
413 m_iMatchesTotal = 0;
414 m_iMatchesFound = 0;
415 m_iQueryMsec = 0;
416 m_iWords = 0;
417 SafeDeleteArray ( m_dWords );
418 m_bLastError = false;
419 m_sLastMessage[0] = '\0';
420 }
421
~CSphSEStatsCSphSEStats422 ~CSphSEStats()
423 {
424 Reset ();
425 }
426 };
427
428 /// thread local storage
429 struct CSphSEThreadTable
430 {
431 static const int MAX_QUERY_LEN = 262144; // 256k should be enough, right?
432
433 bool m_bStats;
434 CSphSEStats m_tStats;
435
436 bool m_bQuery;
437 char m_sQuery[MAX_QUERY_LEN];
438
439 #if MYSQL_VERSION_ID<50610
440 CHARSET_INFO * m_pQueryCharset;
441 #else
442 const CHARSET_INFO * m_pQueryCharset;
443 #endif
444
445 bool m_bReplace; ///< are we doing an INSERT or REPLACE
446
447 bool m_bCondId; ///< got a value from condition pushdown
448 longlong m_iCondId; ///< value acquired from id=value condition pushdown
449 bool m_bCondDone; ///< index_read() is now over
450
451 const ha_sphinx * m_pHandler;
452 CSphSEThreadTable * m_pTableNext;
453
CSphSEThreadTableCSphSEThreadTable454 CSphSEThreadTable ( const ha_sphinx * pHandler )
455 : m_bStats ( false )
456 , m_bQuery ( false )
457 , m_pQueryCharset ( NULL )
458 , m_bReplace ( false )
459 , m_bCondId ( false )
460 , m_iCondId ( 0 )
461 , m_bCondDone ( false )
462 , m_pHandler ( pHandler )
463 , m_pTableNext ( NULL )
464 {}
465 };
466
467
468 struct CSphTLS
469 {
470 CSphSEThreadTable * m_pHeadTable;
471
CSphTLSCSphTLS472 explicit CSphTLS ( const ha_sphinx * pHandler )
473 {
474 m_pHeadTable = new CSphSEThreadTable ( pHandler );
475 }
476
~CSphTLSCSphTLS477 ~CSphTLS()
478 {
479 CSphSEThreadTable * pCur = m_pHeadTable;
480 while ( pCur )
481 {
482 CSphSEThreadTable * pNext = pCur->m_pTableNext;
483 SafeDelete ( pCur );
484 pCur = pNext;
485 }
486 }
487 };
488
489
/// filter types
/// (stored in CSphSEFilter::m_eType)
enum ESphFilter
{
	SPH_FILTER_VALUES = 0, ///< filter by integer values set
	SPH_FILTER_RANGE = 1, ///< filter by integer range
	SPH_FILTER_FLOATRANGE = 2 ///< filter by float range
};
497
498
499 /// search query filter
500 struct CSphSEFilter
501 {
502 public:
503 ESphFilter m_eType;
504 char * m_sAttrName;
505 longlong m_uMinValue;
506 longlong m_uMaxValue;
507 float m_fMinValue;
508 float m_fMaxValue;
509 int m_iValues;
510 longlong * m_pValues;
511 int m_bExclude;
512
513 public:
CSphSEFilterCSphSEFilter514 CSphSEFilter ()
515 : m_eType ( SPH_FILTER_VALUES )
516 , m_sAttrName ( NULL )
517 , m_uMinValue ( 0 )
518 , m_uMaxValue ( UINT_MAX )
519 , m_fMinValue ( 0.0f )
520 , m_fMaxValue ( 0.0f )
521 , m_iValues ( 0 )
522 , m_pValues ( NULL )
523 , m_bExclude ( 0 )
524 {
525 }
526
~CSphSEFilterCSphSEFilter527 ~CSphSEFilter ()
528 {
529 SafeDeleteArray ( m_pValues );
530 }
531 };
532
533
534 /// float vs dword conversion
sphF2DW(float f)535 inline uint32 sphF2DW ( float f ) { union { float f; uint32 d; } u; u.f = f; return u.d; }
536
537 /// dword vs float conversion
sphDW2F(uint32 d)538 inline float sphDW2F ( uint32 d ) { union { float f; uint32 d; } u; u.d = d; return u.f; }
539
540
541 /// client-side search query
542 struct CSphSEQuery
543 {
544 public:
545 const char * m_sHost;
546 int m_iPort;
547
548 private:
549 char * m_sQueryBuffer;
550
551 const char * m_sIndex;
552 int m_iOffset;
553 int m_iLimit;
554
555 bool m_bQuery;
556 char * m_sQuery;
557 uint32 * m_pWeights;
558 int m_iWeights;
559 ESphMatchMode m_eMode;
560 ESphRankMode m_eRanker;
561 char * m_sRankExpr;
562 ESphSortOrder m_eSort;
563 char * m_sSortBy;
564 int m_iMaxMatches;
565 int m_iMaxQueryTime;
566 uint32 m_iMinID;
567 uint32 m_iMaxID;
568
569 int m_iFilters;
570 CSphSEFilter m_dFilters[SPHINXSE_MAX_FILTERS];
571
572 ESphGroupBy m_eGroupFunc;
573 char * m_sGroupBy;
574 char * m_sGroupSortBy;
575 int m_iCutoff;
576 int m_iRetryCount;
577 int m_iRetryDelay;
578 char * m_sGroupDistinct; ///< points to query buffer; do NOT delete
579 int m_iIndexWeights;
580 char * m_sIndexWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
581 int m_iIndexWeight[SPHINXSE_MAX_FILTERS];
582 int m_iFieldWeights;
583 char * m_sFieldWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
584 int m_iFieldWeight[SPHINXSE_MAX_FILTERS];
585
586 bool m_bGeoAnchor;
587 char * m_sGeoLatAttr;
588 char * m_sGeoLongAttr;
589 float m_fGeoLatitude;
590 float m_fGeoLongitude;
591
592 char * m_sComment;
593 char * m_sSelect;
594
595 struct Override_t
596 {
597 union Value_t
598 {
599 uint32 m_uValue;
600 longlong m_iValue64;
601 float m_fValue;
602 };
603 char * m_sName; ///< points to query buffer
604 int m_iType;
605 Dynamic_array<ulonglong> m_dIds;
606 Dynamic_array<Value_t> m_dValues;
607 };
608 Dynamic_array<Override_t *> m_dOverrides;
609
610 public:
611 char m_sParseError[256];
612
613 public:
614 CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
615 ~CSphSEQuery ();
616
617 bool Parse ();
618 int BuildRequest ( char ** ppBuffer );
619
620 protected:
621 char * m_pBuf;
622 char * m_pCur;
623 int m_iBufLeft;
624 bool m_bBufOverrun;
625
626 template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
627 bool ParseField ( char * sField );
628
629 void SendBytes ( const void * pBytes, int iBytes );
SendWordCSphSEQuery630 void SendWord ( short int v ) { v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
SendIntCSphSEQuery631 void SendInt ( int v ) { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
SendDwordCSphSEQuery632 void SendDword ( uint v ) { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
SendUint64CSphSEQuery633 void SendUint64 ( ulonglong v ) { SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
SendStringCSphSEQuery634 void SendString ( const char * v ) { int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); }
SendFloatCSphSEQuery635 void SendFloat ( float v ) { SendDword ( sphF2DW(v) ); }
636 };
637
638 template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
639 template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
640
641 //////////////////////////////////////////////////////////////////////////////
642
643 #if MYSQL_VERSION_ID>50100
644
645 #if MYSQL_VERSION_ID<50114
646 #error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
647 #endif
648
649 static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
650 static int sphinx_init_func ( void * p );
651 static int sphinx_close_connection ( handlerton * hton, THD * thd );
652 static int sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
653 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
654
655 #else
656
657 static bool sphinx_init_func_for_handlerton ();
658 static int sphinx_close_connection ( THD * thd );
659 bool sphinx_show_status ( THD * thd );
660
661 #endif // >50100
662
663 //////////////////////////////////////////////////////////////////////////////
664
665 static const char sphinx_hton_name[] = "SPHINX";
666 static const char sphinx_hton_comment[] = "Sphinx storage engine " SPHINXSE_VERSION;
667
#if MYSQL_VERSION_ID<50100
// pre-5.1 servers: statically initialized handlerton
// the initializer is positional; the trailing comments name the slot each entry fills
handlerton sphinx_hton =
{
#ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
	MYSQL_HANDLERTON_INTERFACE_VERSION,
#endif
	sphinx_hton_name,
	SHOW_OPTION_YES,
	sphinx_hton_comment,
	DB_TYPE_SPHINX_DB,
	sphinx_init_func_for_handlerton,
	0, // slot
	0, // savepoint size
	sphinx_close_connection, // close_connection
	NULL, // savepoint
	NULL, // rollback to savepoint
	NULL, // release savepoint
	NULL, // commit
	NULL, // rollback
	NULL, // prepare
	NULL, // recover
	NULL, // commit_by_xid
	NULL, // rollback_by_xid
	NULL, // create_cursor_read_view
	NULL, // set_cursor_read_view
	NULL, // close_cursor_read_view
	HTON_CAN_RECREATE
};
#else
// 5.1+ servers: the handlerton is passed into sphinx_init_func() and filled in there
static handlerton * sphinx_hton_ptr = NULL;
#endif
699
700 //////////////////////////////////////////////////////////////////////////////
701
// variables for Sphinx shared methods
// (all three are set up in sphinx_init_func())
pthread_mutex_t sphinx_mutex; // mutex to init the hash; also locked by get_share()
static int sphinx_init = 0; // flag whether the hash was initialized
static HASH sphinx_open_tables; // hash used to track open tables; keys come from sphinx_get_key()
706
707 //////////////////////////////////////////////////////////////////////////////
708 // INITIALIZATION AND SHUTDOWN
709 //////////////////////////////////////////////////////////////////////////////
710
711 // hashing function
712 #if MYSQL_VERSION_ID>=50120
713 typedef size_t GetKeyLength_t;
714 #else
715 typedef uint GetKeyLength_t;
716 #endif
717
sphinx_get_key(const byte * pSharePtr,GetKeyLength_t * pLength,my_bool)718 static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
719 {
720 CSphSEShare * pShare = (CSphSEShare *) pSharePtr;
721 *pLength = (size_t) pShare->m_iTableNameLen;
722 return (byte*) pShare->m_sTable;
723 }
724
725 #if MYSQL_VERSION_ID<50100
sphinx_init_func(void *)726 static int sphinx_init_func ( void * ) // to avoid unused arg warning
727 #else
728 static int sphinx_init_func ( void * p )
729 #endif
730 {
731 SPH_ENTER_FUNC();
732 if ( !sphinx_init )
733 {
734 sphinx_init = 1;
735 void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
736 sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
737 sphinx_get_key, 0, 0 );
738
739 #if MYSQL_VERSION_ID > 50100
740 handlerton * hton = (handlerton*) p;
741 hton->state = SHOW_OPTION_YES;
742 hton->db_type = DB_TYPE_FIRST_DYNAMIC;
743 hton->create = sphinx_create_handler;
744 hton->close_connection = sphinx_close_connection;
745 hton->show_status = sphinx_show_status;
746 hton->panic = sphinx_panic;
747 hton->flags = HTON_CAN_RECREATE;
748 #endif
749 }
750 SPH_RET(0);
751 }
752
753
754 #if MYSQL_VERSION_ID<50100
sphinx_init_func_for_handlerton()755 static bool sphinx_init_func_for_handlerton ()
756 {
757 return sphinx_init_func ( &sphinx_hton );
758 }
759 #endif
760
761
762 #if MYSQL_VERSION_ID>50100
763
sphinx_close_connection(handlerton * hton,THD * thd)764 static int sphinx_close_connection ( handlerton * hton, THD * thd )
765 {
766 // deallocate common handler data
767 SPH_ENTER_FUNC();
768 void ** tmp = thd_ha_data ( thd, hton );
769 CSphTLS * pTls = (CSphTLS *) (*tmp);
770 SafeDelete ( pTls );
771 *tmp = NULL;
772 SPH_RET(0);
773 }
774
775
sphinx_done_func(void *)776 static int sphinx_done_func ( void * )
777 {
778 SPH_ENTER_FUNC();
779
780 int error = 0;
781 if ( sphinx_init )
782 {
783 sphinx_init = 0;
784 if ( sphinx_open_tables.records )
785 error = 1;
786 sphinx_hash_free ( &sphinx_open_tables );
787 pthread_mutex_destroy ( &sphinx_mutex );
788 }
789
790 SPH_RET(0);
791 }
792
793
sphinx_panic(handlerton * hton,enum ha_panic_function)794 static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
795 {
796 return sphinx_done_func ( hton );
797 }
798
799 #else
800
sphinx_close_connection(THD * thd)801 static int sphinx_close_connection ( THD * thd )
802 {
803 // deallocate common handler data
804 SPH_ENTER_FUNC();
805 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
806 SafeDelete ( pTls );
807 thd->ha_data[sphinx_hton.slot] = NULL;
808 SPH_RET(0);
809 }
810
811 #endif // >50100
812
813 //////////////////////////////////////////////////////////////////////////////
814 // SHOW STATUS
815 //////////////////////////////////////////////////////////////////////////////
816
817 #if MYSQL_VERSION_ID>50100
sphinx_show_status(handlerton * hton,THD * thd,stat_print_fn * stat_print,enum ha_stat_type)818 static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
819 enum ha_stat_type )
820 #else
821 bool sphinx_show_status ( THD * thd )
822 #endif
823 {
824 SPH_ENTER_FUNC();
825
826 #if MYSQL_VERSION_ID<50100
827 Protocol * protocol = thd->protocol;
828 List<Item> field_list;
829 #endif
830
831 char buf1[IO_SIZE];
832 uint buf1len;
833 char buf2[IO_SIZE];
834 uint buf2len = 0;
835 String words;
836
837 buf1[0] = '\0';
838 buf2[0] = '\0';
839
840
841 #if MYSQL_VERSION_ID>50100
842 // 5.1.x style stats
843 CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
844
845 #define LOC_STATS(_key,_keylen,_val,_vallen) \
846 stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
847
848 #else
849 // 5.0.x style stats
850 if ( have_sphinx_db!=SHOW_OPTION_YES )
851 {
852 my_message ( ER_NOT_SUPPORTED_YET,
853 "failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
854 MYF(0) );
855 SPH_RET(TRUE);
856 }
857 CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
858
859 field_list.push_back ( new Item_empty_string ( "Type", 10 ) );
860 field_list.push_back ( new Item_empty_string ( "Name", FN_REFLEN ) );
861 field_list.push_back ( new Item_empty_string ( "Status", 10 ) );
862 if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
863 SPH_RET(TRUE);
864
865 #define LOC_STATS(_key,_keylen,_val,_vallen) \
866 protocol->prepare_for_resend (); \
867 protocol->store ( "SPHINX", 6, system_charset_info ); \
868 protocol->store ( _key, _keylen, system_charset_info ); \
869 protocol->store ( _val, _vallen, system_charset_info ); \
870 if ( protocol->write() ) \
871 SPH_RET(TRUE);
872
873 #endif
874
875
876 // show query stats
877 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
878 {
879 const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
880 buf1len = my_snprintf ( buf1, sizeof(buf1),
881 "total: %d, total found: %d, time: %d, words: %d",
882 pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
883
884 LOC_STATS ( "stats", 5, buf1, buf1len );
885
886 if ( pStats->m_iWords )
887 {
888 for ( int i=0; i<pStats->m_iWords; i++ )
889 {
890 CSphSEWordStats & tWord = pStats->m_dWords[i];
891 buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
892 buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
893 }
894
895 // convert it if we can
896 const char * sWord = buf2;
897 int iWord = buf2len;
898
899 String sBuf3;
900 if ( pTls->m_pHeadTable->m_pQueryCharset )
901 {
902 uint iErrors;
903 sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
904 sWord = sBuf3.c_ptr();
905 iWord = sBuf3.length();
906 }
907
908 LOC_STATS ( "words", 5, sWord, iWord );
909 }
910 }
911
912 // show last error or warning (either in addition to stats, or on their own)
913 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
914 {
915 const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
916
917 LOC_STATS (
918 sMessageType, strlen ( sMessageType ),
919 pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
920
921 } else
922 {
923 // well, nothing to show just yet
924 #if MYSQL_VERSION_ID < 50100
925 LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
926 #endif
927 }
928
929 #if MYSQL_VERSION_ID < 50100
930 send_eof(thd);
931 #endif
932
933 SPH_RET(FALSE);
934 }
935
936 //////////////////////////////////////////////////////////////////////////////
937 // HELPERS
938 //////////////////////////////////////////////////////////////////////////////
939
/// duplicates a string into a new[]-allocated, zero-terminated buffer
/// copies iLen bytes of sSrc, or the whole string if iLen is negative;
/// returns NULL for NULL input; the caller frees the result with delete []
static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	if ( sSrc==NULL )
		return NULL;

	const int iBytes = ( iLen<0 ) ? (int) strlen ( sSrc ) : iLen;

	char * sCopy = new char [ iBytes+1 ];
	memcpy ( sCopy, sSrc, iBytes );
	sCopy[iBytes] = '\0';
	return sCopy;
}
953
954
/// printf-style internal error logger
/// writes a local "YYMMDD HH:MM:SS SphinxSE: internal error: " prefix,
/// the formatted message, and a newline to stderr
static void sphLogError ( const char * sFmt, ... )
{
	// gather the local time fields first
	int iYear, iMonth, iDay, iHour, iMinute, iSecond;

#ifdef __WIN__
	SYSTEMTIME tNow;
	GetLocalTime ( &tNow );

	iYear = (int)tNow.wYear % 100;
	iMonth = (int)tNow.wMonth;
	iDay = (int)tNow.wDay;
	iHour = (int)tNow.wHour;
	iMinute = (int)tNow.wMinute;
	iSecond = (int)tNow.wSecond;
#else
	// Unix version
	time_t tNow;
	time ( &tNow );

#ifdef HAVE_LOCALTIME_R
	struct tm tBroken;
	localtime_r ( &tNow, &tBroken );
	const struct tm * pBroken = &tBroken;
#else
	const struct tm * pBroken = localtime ( &tNow );
#endif // HAVE_LOCALTIME_R

	iYear = pBroken->tm_year % 100;
	iMonth = pBroken->tm_mon + 1; // tm_mon is zero-based
	iDay = pBroken->tm_mday;
	iHour = pBroken->tm_hour;
	iMinute = pBroken->tm_min;
	iSecond = pBroken->tm_sec;
#endif // __WIN__

	// emit timestamp
	fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
		iYear, iMonth, iDay, iHour, iMinute, iSecond );

	// emit message
	va_list tArgs;
	va_start ( tArgs, sFmt );
	vfprintf ( stderr, sFmt, tArgs );
	va_end ( tArgs );

	// emit newline
	fprintf ( stderr, "\n" );
}
993
994
995
996 // the following scheme variants are recognized
997 //
998 // sphinx://host[:port]/index
999 // sphinxql://host[:port]/index
1000 // unix://unix/domain/socket[:index]
ParseUrl(CSphSEShare * share,TABLE * table,bool bCreate)1001 static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
1002 {
1003 SPH_ENTER_FUNC();
1004
1005 if ( share )
1006 {
1007 // check incoming stuff
1008 if ( !table )
1009 {
1010 sphLogError ( "table==NULL in ParseUrl()" );
1011 return false;
1012 }
1013 if ( !table->s )
1014 {
1015 sphLogError ( "(table->s)==NULL in ParseUrl()" );
1016 return false;
1017 }
1018
1019 // free old stuff
1020 share->ResetTable ();
1021
1022 // fill new stuff
1023 share->m_iTableFields = table->s->fields;
1024 if ( share->m_iTableFields )
1025 {
1026 share->m_sTableField = new char * [ share->m_iTableFields ];
1027 share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
1028
1029 for ( int i=0; i<share->m_iTableFields; i++ )
1030 {
1031 share->m_sTableField[i] = sphDup ( table->field[i]->field_name );
1032 share->m_eTableFieldType[i] = table->field[i]->type();
1033 }
1034 }
1035 }
1036
1037 // defaults
1038 bool bOk = true;
1039 bool bQL = false;
1040 char * sScheme = NULL;
1041 char * sHost = SPHINXAPI_DEFAULT_HOST;
1042 char * sIndex = SPHINXAPI_DEFAULT_INDEX;
1043 int iPort = SPHINXAPI_DEFAULT_PORT;
1044
1045 // parse connection string, if any
1046 while ( table->s->connect_string.length!=0 )
1047 {
1048 sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
1049
1050 sHost = strstr ( sScheme, "://" );
1051 if ( !sHost )
1052 {
1053 bOk = false;
1054 break;
1055 }
1056 sHost[0] = '\0';
1057 sHost += 3;
1058
1059 /////////////////////////////
1060 // sphinxapi via unix socket
1061 /////////////////////////////
1062
1063 if ( !strcmp ( sScheme, "unix" ) )
1064 {
1065 sHost--; // reuse last slash
1066 iPort = 0;
1067 if (!( sIndex = strrchr ( sHost, ':' ) ))
1068 sIndex = SPHINXAPI_DEFAULT_INDEX;
1069 else
1070 {
1071 *sIndex++ = '\0';
1072 if ( !*sIndex )
1073 sIndex = SPHINXAPI_DEFAULT_INDEX;
1074 }
1075 bOk = true;
1076 break;
1077 }
1078
1079 /////////////////////
1080 // sphinxapi via tcp
1081 /////////////////////
1082
1083 if ( !strcmp ( sScheme, "sphinx" ) )
1084 {
1085 char * sPort = strchr ( sHost, ':' );
1086 if ( sPort )
1087 {
1088 *sPort++ = '\0';
1089 if ( *sPort )
1090 {
1091 sIndex = strchr ( sPort, '/' );
1092 if ( sIndex )
1093 *sIndex++ = '\0';
1094 else
1095 sIndex = SPHINXAPI_DEFAULT_INDEX;
1096
1097 iPort = atoi(sPort);
1098 if ( !iPort )
1099 iPort = SPHINXAPI_DEFAULT_PORT;
1100 }
1101 } else
1102 {
1103 sIndex = strchr ( sHost, '/' );
1104 if ( sIndex )
1105 *sIndex++ = '\0';
1106 else
1107 sIndex = SPHINXAPI_DEFAULT_INDEX;
1108 }
1109 bOk = true;
1110 break;
1111 }
1112
1113 ////////////
1114 // sphinxql
1115 ////////////
1116
1117 if ( !strcmp ( sScheme, "sphinxql" ) )
1118 {
1119 bQL = true;
1120 iPort = SPHINXQL_DEFAULT_PORT;
1121
1122 // handle port
1123 char * sPort = strchr ( sHost, ':' );
1124 sIndex = sHost; // starting point for index name search
1125
1126 if ( sPort )
1127 {
1128 *sPort++ = '\0';
1129 sIndex = sPort;
1130
1131 iPort = atoi(sPort);
1132 if ( !iPort )
1133 {
1134 bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
1135 break;
1136 }
1137 }
1138
1139 // find index
1140 sIndex = strchr ( sIndex, '/' );
1141 if ( sIndex )
1142 *sIndex++ = '\0';
1143
1144 // final checks
1145 // host and index names are required
1146 bOk = ( sHost && *sHost && sIndex && *sIndex );
1147 break;
1148 }
1149
1150 // unknown case
1151 bOk = false;
1152 break;
1153 }
1154
1155 if ( !bOk )
1156 {
1157 my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
1158 MYF(0), table->s->connect_string );
1159 } else
1160 {
1161 if ( share )
1162 {
1163 SafeDeleteArray ( share->m_sScheme );
1164 share->m_sScheme = sScheme;
1165 share->m_sHost = sHost;
1166 share->m_sIndex = sIndex;
1167 share->m_iPort = (ushort)iPort;
1168 share->m_bSphinxQL = bQL;
1169 }
1170 }
1171 if ( !bOk && !share )
1172 SafeDeleteArray ( sScheme );
1173
1174 SPH_RET(bOk);
1175 }
1176
1177
// Example of simple lock controls. The "share" it creates is a structure we will
// pass to each sphinx handler. Do you have to have one of these? Well, you have
// pieces that are used for locking, and they are needed to function.
get_share(const char * table_name,TABLE * table)1181 static CSphSEShare * get_share ( const char * table_name, TABLE * table )
1182 {
1183 SPH_ENTER_FUNC();
1184 pthread_mutex_lock ( &sphinx_mutex );
1185
1186 CSphSEShare * pShare = NULL;
1187 for ( ;; )
1188 {
1189 // check if we already have this share
1190 #if MYSQL_VERSION_ID>=50120
1191 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
1192 #else
1193 #ifdef __WIN__
1194 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
1195 #else
1196 pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
1197 #endif // win
1198 #endif // pre-5.1.20
1199
1200 if ( pShare )
1201 {
1202 pShare->m_iUseCount++;
1203 break;
1204 }
1205
1206 // try to allocate new share
1207 pShare = new CSphSEShare ();
1208 if ( !pShare )
1209 break;
1210
1211 // try to setup it
1212 if ( !ParseUrl ( pShare, table, false ) )
1213 {
1214 SafeDelete ( pShare );
1215 break;
1216 }
1217
1218 if ( !pShare->m_bSphinxQL )
1219 pShare->m_pTableQueryCharset = table->field[2]->charset();
1220
1221 // try to hash it
1222 pShare->m_iTableNameLen = strlen(table_name);
1223 pShare->m_sTable = sphDup ( table_name );
1224 if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
1225 {
1226 SafeDelete ( pShare );
1227 break;
1228 }
1229
1230 // all seems fine
1231 break;
1232 }
1233
1234 pthread_mutex_unlock ( &sphinx_mutex );
1235 SPH_RET(pShare);
1236 }
1237
1238
1239 // Free lock controls. We call this whenever we close a table. If the table had
1240 // the last reference to the share then we free memory associated with it.
free_share(CSphSEShare * pShare)1241 static int free_share ( CSphSEShare * pShare )
1242 {
1243 SPH_ENTER_FUNC();
1244 pthread_mutex_lock ( &sphinx_mutex );
1245
1246 if ( !--pShare->m_iUseCount )
1247 {
1248 sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
1249 SafeDelete ( pShare );
1250 }
1251
1252 pthread_mutex_unlock ( &sphinx_mutex );
1253 SPH_RET(0);
1254 }
1255
1256
#if MYSQL_VERSION_ID>50100
/// Handler factory: remembers the handlerton in sphinx_hton_ptr and
/// constructs a new ha_sphinx instance on the supplied memory root.
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
{
	sphinx_hton_ptr = hton;

	handler * pHandler = new ( mem_root ) ha_sphinx ( hton, table );
	return pHandler;
}
#endif
1264
1265 //////////////////////////////////////////////////////////////////////////////
1266 // CLIENT-SIDE REQUEST STUFF
1267 //////////////////////////////////////////////////////////////////////////////
1268
CSphSEQuery(const char * sQuery,int iLength,const char * sIndex)1269 CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
1270 : m_sHost ( "" )
1271 , m_iPort ( 0 )
1272 , m_sIndex ( sIndex ? sIndex : "*" )
1273 , m_iOffset ( 0 )
1274 , m_iLimit ( 20 )
1275 , m_bQuery ( false )
1276 , m_sQuery ( "" )
1277 , m_pWeights ( NULL )
1278 , m_iWeights ( 0 )
1279 , m_eMode ( SPH_MATCH_ALL )
1280 , m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
1281 , m_sRankExpr ( NULL )
1282 , m_eSort ( SPH_SORT_RELEVANCE )
1283 , m_sSortBy ( "" )
1284 , m_iMaxMatches ( 1000 )
1285 , m_iMaxQueryTime ( 0 )
1286 , m_iMinID ( 0 )
1287 , m_iMaxID ( 0 )
1288 , m_iFilters ( 0 )
1289 , m_eGroupFunc ( SPH_GROUPBY_DAY )
1290 , m_sGroupBy ( "" )
1291 , m_sGroupSortBy ( "@group desc" )
1292 , m_iCutoff ( 0 )
1293 , m_iRetryCount ( 0 )
1294 , m_iRetryDelay ( 0 )
1295 , m_sGroupDistinct ( "" )
1296 , m_iIndexWeights ( 0 )
1297 , m_iFieldWeights ( 0 )
1298 , m_bGeoAnchor ( false )
1299 , m_sGeoLatAttr ( "" )
1300 , m_sGeoLongAttr ( "" )
1301 , m_fGeoLatitude ( 0.0f )
1302 , m_fGeoLongitude ( 0.0f )
1303 , m_sComment ( "" )
1304 , m_sSelect ( "*" )
1305
1306 , m_pBuf ( NULL )
1307 , m_pCur ( NULL )
1308 , m_iBufLeft ( 0 )
1309 , m_bBufOverrun ( false )
1310 {
1311 m_sQueryBuffer = new char [ iLength+2 ];
1312 memcpy ( m_sQueryBuffer, sQuery, iLength );
1313 m_sQueryBuffer[iLength] = ';';
1314 m_sQueryBuffer[iLength+1] = '\0';
1315 }
1316
1317
~CSphSEQuery()1318 CSphSEQuery::~CSphSEQuery ()
1319 {
1320 SPH_ENTER_METHOD();
1321 SafeDeleteArray ( m_sQueryBuffer );
1322 SafeDeleteArray ( m_pWeights );
1323 SafeDeleteArray ( m_pBuf );
1324 for ( int i=0; i<m_dOverrides.elements(); i++ )
1325 SafeDelete ( m_dOverrides.at(i) );
1326 SPH_VOID_RET();
1327 }
1328
1329
1330 template < typename T >
ParseArray(T ** ppValues,const char * sValue)1331 int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
1332 {
1333 SPH_ENTER_METHOD();
1334
1335 assert ( ppValues );
1336 assert ( !(*ppValues) );
1337
1338 const char * pValue;
1339 bool bPrevDigit = false;
1340 int iValues = 0;
1341
1342 // count the values
1343 for ( pValue=sValue; *pValue; pValue++ )
1344 {
1345 bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1346 if ( bDigit && !bPrevDigit )
1347 iValues++;
1348 bPrevDigit = bDigit;
1349 }
1350 if ( !iValues )
1351 SPH_RET(0);
1352
1353 // extract the values
1354 T * pValues = new T [ iValues ];
1355 *ppValues = pValues;
1356
1357 int iIndex = 0, iSign = 1;
1358 T uValue = 0;
1359
1360 bPrevDigit = false;
1361 for ( pValue=sValue ;; pValue++ )
1362 {
1363 bool bDigit = (*pValue)>='0' && (*pValue)<='9';
1364
1365 if ( bDigit )
1366 {
1367 if ( !bPrevDigit )
1368 uValue = 0;
1369 uValue = uValue*10 + ( (*pValue)-'0' );
1370 } else if ( bPrevDigit )
1371 {
1372 assert ( iIndex<iValues );
1373 pValues [ iIndex++ ] = uValue * iSign;
1374 iSign = 1;
1375 } else if ( *pValue=='-' )
1376 iSign = -1;
1377
1378 bPrevDigit = bDigit;
1379 if ( !*pValue )
1380 break;
1381 }
1382
1383 SPH_RET ( iValues );
1384 }
1385
1386
/// Trim whitespace from both ends of a string, in place.
///
/// Leading whitespace is skipped by advancing the returned pointer; trailing
/// whitespace is cut off by writing a NUL into the buffer.
/// Returns a pointer into the original buffer at the first non-space character
/// (or at the terminator if the string is empty or all whitespace).
static char * chop ( char * s )
{
	// NOTE: the casts matter; passing a negative plain char (any byte >= 0x80
	// where char is signed) to isspace() is undefined behavior
	while ( *s && isspace ( (unsigned char)*s ) )
		s++;

	char * p = s + strlen(s);
	while ( p>s && isspace ( (unsigned char)p[-1] ) )
		p--;
	*p = '\0';

	return s;
}
1399
1400
/// True for characters accepted in an attribute name:
/// ASCII digits, ASCII letters, and the underscore.
static bool myisattr ( char c )
{
	if ( c=='_' )
		return true;
	if ( c>='0' && c<='9' )
		return true;
	if ( c>='a' && c<='z' )
		return true;
	return ( c>='A' && c<='Z' );
}
1409
/// True only for '@', the character that introduces a "magic" attribute name.
static bool myismagic ( char c )
{
	const char cMagic = '@';
	return ( c==cMagic );
}
1414
1415
ParseField(char * sField)1416 bool CSphSEQuery::ParseField ( char * sField )
1417 {
1418 SPH_ENTER_METHOD();
1419
1420 // look for option name/value separator
1421 char * sValue = strchr ( sField, '=' );
1422 if ( !sValue || sValue==sField || sValue[-1]=='\\' )
1423 {
1424 // by default let's assume it's just query
1425 if ( sField[0] )
1426 {
1427 if ( m_bQuery )
1428 {
1429 snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
1430 SPH_RET(false);
1431 } else
1432 {
1433 m_sQuery = sField;
1434 m_bQuery = true;
1435
1436 // unescape only 1st one
1437 char *s = sField, *d = sField;
1438 int iSlashes = 0;
1439 while ( *s )
1440 {
1441 iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
1442 if ( ( iSlashes%2 )==0 ) *d++ = *s;
1443 s++;
1444 }
1445 *d = '\0';
1446 }
1447 }
1448 SPH_RET(true);
1449 }
1450
1451 // split
1452 *sValue++ = '\0';
1453 sValue = chop ( sValue );
1454 int iValue = atoi ( sValue );
1455
1456 // handle options
1457 char * sName = chop ( sField );
1458
1459 if ( !strcmp ( sName, "query" ) ) m_sQuery = sValue;
1460 else if ( !strcmp ( sName, "host" ) ) m_sHost = sValue;
1461 else if ( !strcmp ( sName, "port" ) ) m_iPort = iValue;
1462 else if ( !strcmp ( sName, "index" ) ) m_sIndex = sValue;
1463 else if ( !strcmp ( sName, "offset" ) ) m_iOffset = iValue;
1464 else if ( !strcmp ( sName, "limit" ) ) m_iLimit = iValue;
1465 else if ( !strcmp ( sName, "weights" ) ) m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
1466 else if ( !strcmp ( sName, "minid" ) ) m_iMinID = iValue;
1467 else if ( !strcmp ( sName, "maxid" ) ) m_iMaxID = iValue;
1468 else if ( !strcmp ( sName, "maxmatches" ) ) m_iMaxMatches = iValue;
1469 else if ( !strcmp ( sName, "maxquerytime" ) ) m_iMaxQueryTime = iValue;
1470 else if ( !strcmp ( sName, "groupsort" ) ) m_sGroupSortBy = sValue;
1471 else if ( !strcmp ( sName, "distinct" ) ) m_sGroupDistinct = sValue;
1472 else if ( !strcmp ( sName, "cutoff" ) ) m_iCutoff = iValue;
1473 else if ( !strcmp ( sName, "comment" ) ) m_sComment = sValue;
1474 else if ( !strcmp ( sName, "select" ) ) m_sSelect = sValue;
1475
1476 else if ( !strcmp ( sName, "mode" ) )
1477 {
1478 m_eMode = SPH_MATCH_ALL;
1479 if ( !strcmp ( sValue, "any" ) ) m_eMode = SPH_MATCH_ANY;
1480 else if ( !strcmp ( sValue, "phrase" ) ) m_eMode = SPH_MATCH_PHRASE;
1481 else if ( !strcmp ( sValue, "boolean" ) ) m_eMode = SPH_MATCH_BOOLEAN;
1482 else if ( !strcmp ( sValue, "ext" ) ) m_eMode = SPH_MATCH_EXTENDED;
1483 else if ( !strcmp ( sValue, "extended" ) ) m_eMode = SPH_MATCH_EXTENDED;
1484 else if ( !strcmp ( sValue, "ext2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
1485 else if ( !strcmp ( sValue, "extended2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
1486 else if ( !strcmp ( sValue, "all" ) ) m_eMode = SPH_MATCH_ALL;
1487 else if ( !strcmp ( sValue, "fullscan" ) ) m_eMode = SPH_MATCH_FULLSCAN;
1488 else
1489 {
1490 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
1491 SPH_RET(false);
1492 }
1493 } else if ( !strcmp ( sName, "ranker" ) )
1494 {
1495 m_eRanker = SPH_RANK_PROXIMITY_BM25;
1496 if ( !strcmp ( sValue, "proximity_bm25" ) ) m_eRanker = SPH_RANK_PROXIMITY_BM25;
1497 else if ( !strcmp ( sValue, "bm25" ) ) m_eRanker = SPH_RANK_BM25;
1498 else if ( !strcmp ( sValue, "none" ) ) m_eRanker = SPH_RANK_NONE;
1499 else if ( !strcmp ( sValue, "wordcount" ) ) m_eRanker = SPH_RANK_WORDCOUNT;
1500 else if ( !strcmp ( sValue, "proximity" ) ) m_eRanker = SPH_RANK_PROXIMITY;
1501 else if ( !strcmp ( sValue, "matchany" ) ) m_eRanker = SPH_RANK_MATCHANY;
1502 else if ( !strcmp ( sValue, "fieldmask" ) ) m_eRanker = SPH_RANK_FIELDMASK;
1503 else if ( !strcmp ( sValue, "sph04" ) ) m_eRanker = SPH_RANK_SPH04;
1504 else if ( !strncmp ( sValue, "expr:", 5 ) )
1505 {
1506 m_eRanker = SPH_RANK_EXPR;
1507 m_sRankExpr = sValue+5;
1508 } else
1509 {
1510 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
1511 SPH_RET(false);
1512 }
1513 } else if ( !strcmp ( sName, "sort" ) )
1514 {
1515 static const struct
1516 {
1517 const char * m_sName;
1518 ESphSortOrder m_eSort;
1519 } dSortModes[] =
1520 {
1521 { "relevance", SPH_SORT_RELEVANCE },
1522 { "attr_desc:", SPH_SORT_ATTR_DESC },
1523 { "attr_asc:", SPH_SORT_ATTR_ASC },
1524 { "time_segments:", SPH_SORT_TIME_SEGMENTS },
1525 { "extended:", SPH_SORT_EXTENDED },
1526 { "expr:", SPH_SORT_EXPR }
1527 };
1528
1529 int i;
1530 const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
1531 for ( i=0; i<nModes; i++ )
1532 if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
1533 {
1534 m_eSort = dSortModes[i].m_eSort;
1535 m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
1536 break;
1537 }
1538 if ( i==nModes )
1539 {
1540 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
1541 SPH_RET(false);
1542 }
1543
1544 } else if ( !strcmp ( sName, "groupby" ) )
1545 {
1546 static const struct
1547 {
1548 const char * m_sName;
1549 ESphGroupBy m_eFunc;
1550 } dGroupModes[] =
1551 {
1552 { "day:", SPH_GROUPBY_DAY },
1553 { "week:", SPH_GROUPBY_WEEK },
1554 { "month:", SPH_GROUPBY_MONTH },
1555 { "year:", SPH_GROUPBY_YEAR },
1556 { "attr:", SPH_GROUPBY_ATTR },
1557 };
1558
1559 int i;
1560 const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
1561 for ( i=0; i<nModes; i++ )
1562 if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
1563 {
1564 m_eGroupFunc = dGroupModes[i].m_eFunc;
1565 m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
1566 break;
1567 }
1568 if ( i==nModes )
1569 {
1570 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
1571 SPH_RET(false);
1572 }
1573
1574 } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1575 ( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
1576 {
1577 for ( ;; )
1578 {
1579 char * p = sName;
1580 CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1581 tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
1582 tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;
1583
1584 if (!( p = strchr ( sValue, ',' ) ))
1585 break;
1586 *p++ = '\0';
1587
1588 tFilter.m_sAttrName = chop ( sValue );
1589 sValue = p;
1590
1591 if (!( p = strchr ( sValue, ',' ) ))
1592 break;
1593 *p++ = '\0';
1594
1595 if ( tFilter.m_eType==SPH_FILTER_RANGE )
1596 {
1597 tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
1598 tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
1599 } else
1600 {
1601 tFilter.m_fMinValue = (float)atof(sValue);
1602 tFilter.m_fMaxValue = (float)atof(p);
1603 }
1604
1605 // all ok
1606 m_iFilters++;
1607 break;
1608 }
1609
1610 } else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
1611 ( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
1612 {
1613 for ( ;; )
1614 {
1615 CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
1616 tFilter.m_eType = SPH_FILTER_VALUES;
1617 tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );
1618
1619 // get the attr name
1620 while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
1621 sValue++;
1622 if ( !*sValue )
1623 break;
1624
1625 tFilter.m_sAttrName = sValue;
1626 while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) ) )
1627 sValue++;
1628 if ( !*sValue )
1629 break;
1630 *sValue++ = '\0';
1631
1632 // get the values
1633 tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
1634 if ( !tFilter.m_iValues )
1635 {
1636 assert ( !tFilter.m_pValues );
1637 break;
1638 }
1639
1640 // all ok
1641 m_iFilters++;
1642 break;
1643 }
1644
1645 } else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
1646 {
1647 bool bIndex = !strcmp ( sName, "indexweights" );
1648 int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
1649 char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
1650 int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];
1651
1652 *pCount = 0;
1653
1654 char * p = sValue;
1655 while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
1656 {
1657 // extract attr name
1658 if ( !myisattr(*p) )
1659 {
1660 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
1661 SPH_RET(false);
1662 }
1663
1664 pNames[*pCount] = p;
1665 while ( myisattr(*p) ) p++;
1666
1667 if ( *p!=',' )
1668 {
1669 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1670 SPH_RET(false);
1671 }
1672 *p++ = '\0';
1673
1674 // extract attr value
1675 char * sVal = p;
1676 while ( isdigit(*p) ) p++;
1677 if ( p==sVal )
1678 {
1679 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
1680 SPH_RET(false);
1681 }
1682 pWeights[*pCount] = atoi(sVal);
1683 (*pCount)++;
1684
1685 if ( !*p )
1686 break;
1687 if ( *p!=',' )
1688 {
1689 snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
1690 SPH_RET(false);
1691 }
1692 p++;
1693 }
1694
1695 } else if ( !strcmp ( sName, "geoanchor" ) )
1696 {
1697 m_bGeoAnchor = false;
1698 for ( ;; )
1699 {
1700 char * sLat = sValue;
1701 char * p = sValue;
1702
1703 if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1704 char * sLong = p;
1705
1706 if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1707 char * sLatVal = p;
1708
1709 if (!( p = strchr ( p, ',' ) )) break; *p++ = '\0';
1710 char * sLongVal = p;
1711
1712 m_sGeoLatAttr = chop(sLat);
1713 m_sGeoLongAttr = chop(sLong);
1714 m_fGeoLatitude = (float)atof ( sLatVal );
1715 m_fGeoLongitude = (float)atof ( sLongVal );
1716 m_bGeoAnchor = true;
1717 break;
1718 }
1719 if ( !m_bGeoAnchor )
1720 {
1721 snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
1722 SPH_RET(false);
1723 }
1724 } else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
1725 {
1726 char * sName = NULL;
1727 int iType = 0;
1728 CSphSEQuery::Override_t * pOverride = NULL;
1729
1730 // get name and type
1731 char * sRest = sValue;
1732 for ( ;; )
1733 {
1734 sName = sRest;
1735 if ( !*sName )
1736 break;
1737 if (!( sRest = strchr ( sRest, ',' ) ))
1738 break;
1739 *sRest++ = '\0';
1740 char * sType = sRest;
1741 if (!( sRest = strchr ( sRest, ',' ) ))
1742 break;
1743
1744 static const struct
1745 {
1746 const char * m_sName;
1747 int m_iType;
1748 }
1749 dAttrTypes[] =
1750 {
1751 { "int", SPH_ATTR_INTEGER },
1752 { "timestamp", SPH_ATTR_TIMESTAMP },
1753 { "bool", SPH_ATTR_BOOL },
1754 { "float", SPH_ATTR_FLOAT },
1755 { "bigint", SPH_ATTR_BIGINT }
1756 };
1757 for ( int i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
1758 if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
1759 {
1760 iType = dAttrTypes[i].m_iType;
1761 break;
1762 }
1763 break;
1764 }
1765
1766 // fail
1767 if ( !sName || !*sName || !iType )
1768 {
1769 snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
1770 SPH_RET(false);
1771 }
1772
1773 // grab id:value pairs
1774 sRest++;
1775 while ( sRest )
1776 {
1777 char * sId = sRest;
1778 if (!( sRest = strchr ( sRest, ':' ) )) break; *sRest++ = '\0';
1779 if (!( sRest - sId )) break;
1780
1781 char * sValue = sRest;
1782 if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
1783 *sRest++ = '\0';
1784 if ( !*sValue )
1785 break;
1786
1787 if ( !pOverride )
1788 {
1789 pOverride = new CSphSEQuery::Override_t;
1790 pOverride->m_sName = chop(sName);
1791 pOverride->m_iType = iType;
1792 m_dOverrides.append ( pOverride );
1793 }
1794
1795 ulonglong uId = strtoull ( sId, NULL, 10 );
1796 CSphSEQuery::Override_t::Value_t tValue;
1797 if ( iType==SPH_ATTR_FLOAT )
1798 tValue.m_fValue = (float)atof(sValue);
1799 else if ( iType==SPH_ATTR_BIGINT )
1800 tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
1801 else
1802 tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );
1803
1804 pOverride->m_dIds.append ( uId );
1805 pOverride->m_dValues.append ( tValue );
1806 }
1807
1808 if ( !pOverride )
1809 {
1810 snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
1811 SPH_RET(false);
1812 }
1813 SPH_RET(true);
1814 } else
1815 {
1816 snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
1817 SPH_RET(false);
1818 }
1819
1820 // !COMMIT handle syntax errors
1821
1822 SPH_RET(true);
1823 }
1824
1825
Parse()1826 bool CSphSEQuery::Parse ()
1827 {
1828 SPH_ENTER_METHOD();
1829 SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );
1830
1831 m_bQuery = false;
1832 char * pCur = m_sQueryBuffer;
1833 char * pNext = pCur;
1834
1835 while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
1836 {
1837 // handle escaped semicolons
1838 if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
1839 {
1840 pNext++;
1841 continue;
1842 }
1843
1844 // handle semicolon-separated clauses
1845 *pNext++ = '\0';
1846 if ( !ParseField ( pCur ) )
1847 SPH_RET(false);
1848 pCur = pNext;
1849 }
1850
1851 SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
1852
1853 SPH_RET(true);
1854 }
1855
1856
SendBytes(const void * pBytes,int iBytes)1857 void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
1858 {
1859 SPH_ENTER_METHOD();
1860 if ( m_iBufLeft<iBytes )
1861 {
1862 m_bBufOverrun = true;
1863 SPH_VOID_RET();
1864 }
1865
1866 memcpy ( m_pCur, pBytes, iBytes );
1867
1868 m_pCur += iBytes;
1869 m_iBufLeft -= iBytes;
1870 SPH_VOID_RET();
1871 }
1872
1873
BuildRequest(char ** ppBuffer)1874 int CSphSEQuery::BuildRequest ( char ** ppBuffer )
1875 {
1876 SPH_ENTER_METHOD();
1877
1878 // calc request length
1879 int iReqSize = 128 + 4*m_iWeights
1880 + strlen ( m_sSortBy )
1881 + strlen ( m_sQuery )
1882 + strlen ( m_sIndex )
1883 + strlen ( m_sGroupBy )
1884 + strlen ( m_sGroupSortBy )
1885 + strlen ( m_sGroupDistinct )
1886 + strlen ( m_sComment )
1887 + strlen ( m_sSelect );
1888 if ( m_eRanker==SPH_RANK_EXPR )
1889 iReqSize += 4 + strlen(m_sRankExpr);
1890 for ( int i=0; i<m_iFilters; i++ )
1891 {
1892 const CSphSEFilter & tFilter = m_dFilters[i];
1893 iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
1894 switch ( tFilter.m_eType )
1895 {
1896 case SPH_FILTER_VALUES: iReqSize += 4 + 8*tFilter.m_iValues; break;
1897 case SPH_FILTER_RANGE: iReqSize += 16; break;
1898 case SPH_FILTER_FLOATRANGE: iReqSize += 8; break;
1899 }
1900 }
1901 if ( m_bGeoAnchor ) // 1.14+
1902 iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
1903 for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
1904 iReqSize += 8 + strlen(m_sIndexWeight[i] );
1905 for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
1906 iReqSize += 8 + strlen(m_sFieldWeight[i] );
1907 // overrides
1908 iReqSize += 4;
1909 for ( int i=0; i<m_dOverrides.elements(); i++ )
1910 {
1911 CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
1912 const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
1913 iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
1914 }
1915 // select
1916 iReqSize += 4;
1917
1918 m_iBufLeft = 0;
1919 SafeDeleteArray ( m_pBuf );
1920
1921 m_pBuf = new char [ iReqSize ];
1922 if ( !m_pBuf )
1923 SPH_RET(-1);
1924
1925 m_pCur = m_pBuf;
1926 m_iBufLeft = iReqSize;
1927 m_bBufOverrun = false;
1928 (*ppBuffer) = m_pBuf;
1929
1930 // build request
1931 SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
1932 SendWord ( VER_COMMAND_SEARCH ); // command version
1933 SendInt ( iReqSize-8 ); // packet body length
1934 SendInt ( 0 ); // its a client
1935
1936 SendInt ( 1 ); // number of queries
1937 SendInt ( m_iOffset );
1938 SendInt ( m_iLimit );
1939 SendInt ( m_eMode );
1940 SendInt ( m_eRanker ); // 1.16+
1941 if ( m_eRanker==SPH_RANK_EXPR )
1942 SendString ( m_sRankExpr );
1943 SendInt ( m_eSort );
1944 SendString ( m_sSortBy ); // sort attr
1945 SendString ( m_sQuery ); // query
1946 SendInt ( m_iWeights );
1947 for ( int j=0; j<m_iWeights; j++ )
1948 SendInt ( m_pWeights[j] ); // weights
1949 SendString ( m_sIndex ); // indexes
1950 SendInt ( 1 ); // id64 range follows
1951 SendUint64 ( m_iMinID ); // id/ts ranges
1952 SendUint64 ( m_iMaxID );
1953
1954 SendInt ( m_iFilters );
1955 for ( int j=0; j<m_iFilters; j++ )
1956 {
1957 const CSphSEFilter & tFilter = m_dFilters[j];
1958 SendString ( tFilter.m_sAttrName );
1959 SendInt ( tFilter.m_eType );
1960
1961 switch ( tFilter.m_eType )
1962 {
1963 case SPH_FILTER_VALUES:
1964 SendInt ( tFilter.m_iValues );
1965 for ( int k=0; k<tFilter.m_iValues; k++ )
1966 SendUint64 ( tFilter.m_pValues[k] );
1967 break;
1968
1969 case SPH_FILTER_RANGE:
1970 SendUint64 ( tFilter.m_uMinValue );
1971 SendUint64 ( tFilter.m_uMaxValue );
1972 break;
1973
1974 case SPH_FILTER_FLOATRANGE:
1975 SendFloat ( tFilter.m_fMinValue );
1976 SendFloat ( tFilter.m_fMaxValue );
1977 break;
1978 }
1979
1980 SendInt ( tFilter.m_bExclude );
1981 }
1982
1983 SendInt ( m_eGroupFunc );
1984 SendString ( m_sGroupBy );
1985 SendInt ( m_iMaxMatches );
1986 SendString ( m_sGroupSortBy );
1987 SendInt ( m_iCutoff ); // 1.9+
1988 SendInt ( m_iRetryCount ); // 1.10+
1989 SendInt ( m_iRetryDelay );
1990 SendString ( m_sGroupDistinct ); // 1.11+
1991 SendInt ( m_bGeoAnchor ); // 1.14+
1992 if ( m_bGeoAnchor )
1993 {
1994 SendString ( m_sGeoLatAttr );
1995 SendString ( m_sGeoLongAttr );
1996 SendFloat ( m_fGeoLatitude );
1997 SendFloat ( m_fGeoLongitude );
1998 }
1999 SendInt ( m_iIndexWeights ); // 1.15+
2000 for ( int i=0; i<m_iIndexWeights; i++ )
2001 {
2002 SendString ( m_sIndexWeight[i] );
2003 SendInt ( m_iIndexWeight[i] );
2004 }
2005 SendInt ( m_iMaxQueryTime ); // 1.17+
2006 SendInt ( m_iFieldWeights ); // 1.18+
2007 for ( int i=0; i<m_iFieldWeights; i++ )
2008 {
2009 SendString ( m_sFieldWeight[i] );
2010 SendInt ( m_iFieldWeight[i] );
2011 }
2012 SendString ( m_sComment );
2013
2014 // overrides
2015 SendInt ( m_dOverrides.elements() );
2016 for ( int i=0; i<m_dOverrides.elements(); i++ )
2017 {
2018 CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
2019 SendString ( pOverride->m_sName );
2020 SendDword ( pOverride->m_iType );
2021 SendInt ( pOverride->m_dIds.elements() );
2022 for ( int j=0; j<pOverride->m_dIds.elements(); j++ )
2023 {
2024 SendUint64 ( pOverride->m_dIds.at(j) );
2025 if ( pOverride->m_iType==SPH_ATTR_FLOAT )
2026 SendFloat ( pOverride->m_dValues.at(j).m_fValue );
2027 else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
2028 SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
2029 else
2030 SendDword ( pOverride->m_dValues.at(j).m_uValue );
2031 }
2032 }
2033
2034 // select
2035 SendString ( m_sSelect );
2036
2037 // detect buffer overruns and underruns, and report internal error
2038 if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
2039 SPH_RET(-1);
2040
2041 // all fine
2042 SPH_RET ( iReqSize );
2043 }
2044
2045 //////////////////////////////////////////////////////////////////////////////
2046 // SPHINX HANDLER
2047 //////////////////////////////////////////////////////////////////////////////
2048
2049 static const char * ha_sphinx_exts[] = { NullS };
2050
2051
2052 #if MYSQL_VERSION_ID<50100
ha_sphinx(TABLE_ARG * table)2053 ha_sphinx::ha_sphinx ( TABLE_ARG * table )
2054 : handler ( &sphinx_hton, table )
2055 #else
2056 ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
2057 : handler ( hton, table )
2058 #endif
2059 , m_pShare ( NULL )
2060 , m_iMatchesTotal ( 0 )
2061 , m_iCurrentPos ( 0 )
2062 , m_pCurrentKey ( NULL )
2063 , m_iCurrentKeyLen ( 0 )
2064 , m_pResponse ( NULL )
2065 , m_pResponseEnd ( NULL )
2066 , m_pCur ( NULL )
2067 , m_bUnpackError ( false )
2068 , m_iFields ( 0 )
2069 , m_dFields ( NULL )
2070 , m_iAttrs ( 0 )
2071 , m_dAttrs ( NULL )
2072 , m_bId64 ( 0 )
2073 , m_dUnboundFields ( NULL )
2074 {
2075 SPH_ENTER_METHOD();
2076 if ( current_thd )
2077 current_thd->variables.engine_condition_pushdown = true;
2078 SPH_VOID_RET();
2079 }
2080
2081
// If frm_error() is called then we will use this to find out what file extensions
// exist for the storage engine. This is also used by the default rename_table and
// delete_table method in handler.cc.
bas_ext() const2085 const char ** ha_sphinx::bas_ext() const
2086 {
2087 return ha_sphinx_exts;
2088 }
2089
2090
2091 // Used for opening tables. The name will be the name of the file.
2092 // A table is opened when it needs to be opened. For instance
2093 // when a request comes in for a select on the table (tables are not
2094 // open and closed for each request, they are cached).
2095 //
2096 // Called from handler.cc by handler::ha_open(). The server opens all tables by
2097 // calling ha_open() which then calls the handler specific open().
open(const char * name,int,uint)2098 int ha_sphinx::open ( const char * name, int, uint )
2099 {
2100 SPH_ENTER_METHOD();
2101 m_pShare = get_share ( name, table );
2102 if ( !m_pShare )
2103 SPH_RET(1);
2104
2105 thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );
2106
2107 #if MYSQL_VERSION_ID>50100
2108 void ** tmp = thd_ha_data ( table->in_use, ht );
2109 if ( *tmp )
2110 {
2111 CSphTLS * pTls = (CSphTLS *)( *tmp );
2112 SafeDelete ( pTls );
2113 *tmp = NULL;
2114 }
2115 #else
2116 if ( table->in_use->ha_data [ sphinx_hton.slot ] )
2117 {
2118 CSphTLS * pTls = (CSphTLS *)( table->in_use->ha_data [ sphinx_hton.slot ] );
2119 SafeDelete ( pTls );
2120 table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
2121 }
2122 #endif
2123
2124 SPH_RET(0);
2125 }
2126
2127
Connect(const char * sHost,ushort uPort)2128 int ha_sphinx::Connect ( const char * sHost, ushort uPort )
2129 {
2130 struct sockaddr_in sin;
2131 #ifndef __WIN__
2132 struct sockaddr_un saun;
2133 #endif
2134
2135 int iDomain = 0;
2136 int iSockaddrSize = 0;
2137 struct sockaddr * pSockaddr = NULL;
2138
2139 in_addr_t ip_addr;
2140
2141 if ( uPort )
2142 {
2143 iDomain = AF_INET;
2144 iSockaddrSize = sizeof(sin);
2145 pSockaddr = (struct sockaddr *) &sin;
2146
2147 memset ( &sin, 0, sizeof(sin) );
2148 sin.sin_family = AF_INET;
2149 sin.sin_port = htons(uPort);
2150
2151 // prepare host address
2152 if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
2153 {
2154 memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
2155 } else
2156 {
2157 int tmp_errno;
2158 bool bError = false;
2159
2160 #if MYSQL_VERSION_ID>=50515
2161 struct addrinfo * hp = NULL;
2162 tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
2163 if ( tmp_errno!=0 || !hp || !hp->ai_addr )
2164 {
2165 bError = true;
2166 if ( hp )
2167 freeaddrinfo ( hp );
2168 }
2169 #else
2170 struct hostent tmp_hostent, *hp;
2171 char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
2172 hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
2173 if ( !hp )
2174 {
2175 my_gethostbyname_r_free();
2176 bError = true;
2177 }
2178 #endif
2179
2180 if ( bError )
2181 {
2182 char sError[256];
2183 my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );
2184
2185 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2186 SPH_RET(-1);
2187 }
2188
2189 #if MYSQL_VERSION_ID>=50515
2190 memcpy ( &sin.sin_addr, &( (struct sockaddr_in *)hp->ai_addr )->sin_addr, sizeof(sin.sin_addr) );
2191 freeaddrinfo ( hp );
2192 #else
2193 memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
2194 my_gethostbyname_r_free();
2195 #endif
2196 }
2197 } else
2198 {
2199 #ifndef __WIN__
2200 iDomain = AF_UNIX;
2201 iSockaddrSize = sizeof(saun);
2202 pSockaddr = (struct sockaddr *) &saun;
2203
2204 memset ( &saun, 0, sizeof(saun) );
2205 saun.sun_family = AF_UNIX;
2206 strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
2207 #else
2208 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
2209 SPH_RET(-1);
2210 #endif
2211 }
2212
2213 char sError[512];
2214 int iSocket = socket ( iDomain, SOCK_STREAM, 0 );
2215
2216 if ( iSocket<0 )
2217 {
2218 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
2219 SPH_RET(-1);
2220 }
2221
2222 if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
2223 {
2224 sphSockClose ( iSocket );
2225 my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
2226 sHost, errno, (int)uPort );
2227 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2228 SPH_RET(-1);
2229 }
2230
2231 return iSocket;
2232 }
2233
2234
ConnectAPI(const char * sQueryHost,int iQueryPort)2235 int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
2236 {
2237 SPH_ENTER_METHOD();
2238
2239 const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
2240 ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;
2241
2242 int iSocket = Connect ( sHost, uPort );
2243 if ( iSocket<0 )
2244 SPH_RET ( iSocket );
2245
2246 char sError[512];
2247
2248 int version;
2249 if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
2250 {
2251 sphSockClose ( iSocket );
2252 my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
2253 sHost, (int)uPort );
2254 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2255 SPH_RET(-1);
2256 }
2257
2258 uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
2259 if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
2260 {
2261 sphSockClose ( iSocket );
2262 my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
2263 sHost, (int)uPort );
2264 my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
2265 SPH_RET(-1);
2266 }
2267
2268 SPH_RET ( iSocket );
2269 }
2270
2271
// Closes a table. We call the free_share() function to free any resources
// that we have allocated in the "shared" structure.
//
// Called from sql_base.cc, sql_select.cc, and table.cc.
// In sql_select.cc it is only used to close up temporary tables or during
// the process where a temporary table is converted over to being a
// myisam table.
// For sql_base.cc look at close_data_tables().
//
// Returns whatever free_share() returns for m_pShare.
int ha_sphinx::close()
{
	SPH_ENTER_METHOD();
	SPH_RET ( free_share ( m_pShare ) );
}
2285
2286
HandleMysqlError(MYSQL * pConn,int iErrCode)2287 int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
2288 {
2289 CSphSEThreadTable * pTable = GetTls ();
2290 if ( pTable )
2291 {
2292 strncpy ( pTable->m_tStats.m_sLastMessage, mysql_error ( pConn ), sizeof ( pTable->m_tStats.m_sLastMessage ) );
2293 pTable->m_tStats.m_bLastError = true;
2294 }
2295
2296 mysql_close ( pConn );
2297
2298 my_error ( iErrCode, MYF(0), pTable->m_tStats.m_sLastMessage );
2299 return -1;
2300 }
2301
2302
extra(enum ha_extra_function op)2303 int ha_sphinx::extra ( enum ha_extra_function op )
2304 {
2305 CSphSEThreadTable * pTable = GetTls();
2306 if ( pTable )
2307 {
2308 if ( op==HA_EXTRA_WRITE_CAN_REPLACE )
2309 pTable->m_bReplace = true;
2310 else if ( op==HA_EXTRA_WRITE_CANNOT_REPLACE )
2311 pTable->m_bReplace = false;
2312 }
2313 return 0;
2314 }
2315
2316
write_row(byte *)2317 int ha_sphinx::write_row ( byte * )
2318 {
2319 SPH_ENTER_METHOD();
2320 if ( !m_pShare || !m_pShare->m_bSphinxQL )
2321 SPH_RET ( HA_ERR_WRONG_COMMAND );
2322
2323 // SphinxQL inserts only, pretty much similar to abandoned federated
2324 char sQueryBuf[1024];
2325 char sValueBuf[1024];
2326
2327 String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2328 String sValue ( sValueBuf, sizeof(sQueryBuf), &my_charset_bin );
2329 sQuery.length ( 0 );
2330 sValue.length ( 0 );
2331
2332 CSphSEThreadTable * pTable = GetTls ();
2333 sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
2334 sQuery.append ( m_pShare->m_sIndex );
2335 sQuery.append ( " (" );
2336
2337 for ( Field ** ppField = table->field; *ppField; ppField++ )
2338 {
2339 sQuery.append ( (*ppField)->field_name );
2340 if ( ppField[1] )
2341 sQuery.append ( ", " );
2342 }
2343 sQuery.append ( ") VALUES (" );
2344
2345 for ( Field ** ppField = table->field; *ppField; ppField++ )
2346 {
2347 if ( (*ppField)->is_null() )
2348 {
2349 sQuery.append ( "''" );
2350
2351 } else
2352 {
2353 if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
2354 {
2355 Item_field * pWrap = new Item_field ( *ppField ); // autofreed by query arena, I assume
2356 Item_func_unix_timestamp * pConv = new Item_func_unix_timestamp ( pWrap );
2357 pConv->quick_fix_field();
2358 unsigned int uTs = (unsigned int) pConv->val_int();
2359
2360 snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
2361 sQuery.append ( sValueBuf );
2362
2363 } else
2364 {
2365 (*ppField)->val_str ( &sValue );
2366
2367 int iLen = sValue.length();
2368 bool bMva = ( iLen>1 && sValue.ptr()[0]=='(' && sValue.ptr()[iLen-1]==')' );
2369
2370 if ( !bMva )
2371 sQuery.append ( "'" );
2372 sValue.print ( &sQuery );
2373 if ( !bMva )
2374 sQuery.append ( "'" );
2375
2376 sValue.length(0);
2377 }
2378 }
2379
2380 if ( ppField[1] )
2381 sQuery.append ( ", " );
2382 }
2383 sQuery.append ( ")" );
2384
2385 // FIXME? pretty inefficient to reconnect every time under high load,
2386 // but this was intentionally written for a low load scenario..
2387 MYSQL * pConn = mysql_init ( NULL );
2388 if ( !pConn )
2389 SPH_RET ( ER_OUT_OF_RESOURCES );
2390
2391 unsigned int uTimeout = 1;
2392 mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2393
2394 if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2395 SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2396
2397 if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2398 SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2399
2400 // all ok!
2401 mysql_close ( pConn );
2402 SPH_RET(0);
2403 }
2404
2405
IsIntegerFieldType(enum_field_types eType)2406 static inline bool IsIntegerFieldType ( enum_field_types eType )
2407 {
2408 return eType==MYSQL_TYPE_LONG || eType==MYSQL_TYPE_LONGLONG;
2409 }
2410
2411
IsIDField(Field * pField)2412 static inline bool IsIDField ( Field * pField )
2413 {
2414 enum_field_types eType = pField->type();
2415
2416 if ( eType==MYSQL_TYPE_LONGLONG )
2417 return true;
2418
2419 if ( eType==MYSQL_TYPE_LONG && ((Field_num*)pField)->unsigned_flag )
2420 return true;
2421
2422 return false;
2423 }
2424
2425
delete_row(const byte *)2426 int ha_sphinx::delete_row ( const byte * )
2427 {
2428 SPH_ENTER_METHOD();
2429 if ( !m_pShare || !m_pShare->m_bSphinxQL )
2430 SPH_RET ( HA_ERR_WRONG_COMMAND );
2431
2432 char sQueryBuf[1024];
2433 String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
2434 sQuery.length ( 0 );
2435
2436 sQuery.append ( "DELETE FROM " );
2437 sQuery.append ( m_pShare->m_sIndex );
2438 sQuery.append ( " WHERE id=" );
2439
2440 char sValue[32];
2441 snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
2442 sQuery.append ( sValue );
2443
2444 // FIXME? pretty inefficient to reconnect every time under high load,
2445 // but this was intentionally written for a low load scenario..
2446 MYSQL * pConn = mysql_init ( NULL );
2447 if ( !pConn )
2448 SPH_RET ( ER_OUT_OF_RESOURCES );
2449
2450 unsigned int uTimeout = 1;
2451 mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
2452
2453 if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
2454 SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
2455
2456 if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
2457 SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
2458
2459 // all ok!
2460 mysql_close ( pConn );
2461 SPH_RET(0);
2462 }
2463
2464
/// Row updates are not implemented: both arguments are ignored and
/// HA_ERR_WRONG_COMMAND is always returned.
int ha_sphinx::update_row ( const byte *, byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
2470
2471
2472 // keynr is key (index) number
2473 // sorted is 1 if result MUST be sorted according to index
index_init(uint keynr,bool)2474 int ha_sphinx::index_init ( uint keynr, bool )
2475 {
2476 SPH_ENTER_METHOD();
2477 active_index = keynr;
2478
2479 CSphSEThreadTable * pTable = GetTls();
2480 if ( pTable )
2481 pTable->m_bCondDone = false;
2482
2483 SPH_RET(0);
2484 }
2485
2486
/// Ends an index scan. Nothing is done here; always returns 0.
int ha_sphinx::index_end()
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
2492
2493
CheckResponcePtr(int iLen)2494 bool ha_sphinx::CheckResponcePtr ( int iLen )
2495 {
2496 if ( m_pCur+iLen>m_pResponseEnd )
2497 {
2498 m_pCur = m_pResponseEnd;
2499 m_bUnpackError = true;
2500 return false;
2501 }
2502
2503 return true;
2504 }
2505
2506
UnpackDword()2507 uint32 ha_sphinx::UnpackDword ()
2508 {
2509 if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
2510 {
2511 return 0;
2512 }
2513
2514 uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
2515 m_pCur += sizeof(uint32); // NOLINT
2516 return uRes;
2517 }
2518
2519
UnpackString()2520 char * ha_sphinx::UnpackString ()
2521 {
2522 uint32 iLen = UnpackDword ();
2523 if ( !iLen )
2524 return NULL;
2525
2526 if ( !CheckResponcePtr ( iLen ) )
2527 {
2528 return NULL;
2529 }
2530
2531 char * sRes = new char [ 1+iLen ];
2532 memcpy ( sRes, m_pCur, iLen );
2533 sRes[iLen] = '\0';
2534 m_pCur += iLen;
2535 return sRes;
2536 }
2537
2538
/// Returns s unchanged, or the literal "(null)" when s is a null pointer.
static inline const char * FixNull ( const char * s )
{
	if ( !s )
		return "(null)";
	return s;
}
2543
2544
UnpackSchema()2545 bool ha_sphinx::UnpackSchema ()
2546 {
2547 SPH_ENTER_METHOD();
2548
2549 // cleanup
2550 if ( m_dFields )
2551 for ( int i=0; i<(int)m_iFields; i++ )
2552 SafeDeleteArray ( m_dFields[i] );
2553 SafeDeleteArray ( m_dFields );
2554
2555 // unpack network packet
2556 uint32 uStatus = UnpackDword ();
2557 char * sMessage = NULL;
2558
2559 if ( uStatus!=SEARCHD_OK )
2560 {
2561 sMessage = UnpackString ();
2562 CSphSEThreadTable * pTable = GetTls ();
2563 if ( pTable )
2564 {
2565 strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
2566 pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
2567 }
2568
2569 if ( uStatus==SEARCHD_ERROR )
2570 {
2571 char sError[1024];
2572 my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
2573 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2574 SafeDeleteArray ( sMessage );
2575 SPH_RET ( false );
2576 }
2577 }
2578
2579 m_iFields = UnpackDword ();
2580 m_dFields = new char * [ m_iFields ];
2581 if ( !m_dFields )
2582 {
2583 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
2584 SPH_RET(false);
2585 }
2586
2587 for ( uint32 i=0; i<m_iFields; i++ )
2588 m_dFields[i] = UnpackString ();
2589
2590 SafeDeleteArray ( m_dAttrs );
2591 m_iAttrs = UnpackDword ();
2592 m_dAttrs = new CSphSEAttr [ m_iAttrs ];
2593 if ( !m_dAttrs )
2594 {
2595 for ( int i=0; i<(int)m_iFields; i++ )
2596 SafeDeleteArray ( m_dFields[i] );
2597 SafeDeleteArray ( m_dFields );
2598 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
2599 SPH_RET(false);
2600 }
2601
2602 for ( uint32 i=0; i<m_iAttrs; i++ )
2603 {
2604 m_dAttrs[i].m_sName = UnpackString ();
2605 m_dAttrs[i].m_uType = UnpackDword ();
2606 if ( m_bUnpackError ) // m_sName may be null
2607 break;
2608
2609 m_dAttrs[i].m_iField = -1;
2610 for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
2611 {
2612 const char * sTableField = m_pShare->m_sTableField[j];
2613 const char * sAttrField = m_dAttrs[i].m_sName;
2614 if ( m_dAttrs[i].m_sName[0]=='@' )
2615 {
2616 const char * sAtPrefix = "_sph_";
2617 if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
2618 continue;
2619 sTableField += strlen(sAtPrefix);
2620 sAttrField++;
2621 }
2622
2623 if ( !strcasecmp ( sAttrField, sTableField ) )
2624 {
2625 // we're almost good, but
2626 // let's enforce that timestamp columns can only receive timestamp attributes
2627 if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
2628 m_dAttrs[i].m_iField = j;
2629 break;
2630 }
2631 }
2632 }
2633
2634 m_iMatchesTotal = UnpackDword ();
2635
2636 m_bId64 = UnpackDword ();
2637 if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
2638 {
2639 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
2640 SPH_RET(false);
2641 }
2642
2643 // network packet unpacked; build unbound fields map
2644 SafeDeleteArray ( m_dUnboundFields );
2645 m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
2646
2647 for ( int i=0; i<m_pShare->m_iTableFields; i++ )
2648 {
2649 if ( i<SPHINXSE_SYSTEM_COLUMNS )
2650 m_dUnboundFields[i] = SPH_ATTR_NONE;
2651
2652 else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
2653 m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
2654
2655 else
2656 m_dUnboundFields[i] = SPH_ATTR_INTEGER;
2657 }
2658
2659 for ( uint32 i=0; i<m_iAttrs; i++ )
2660 if ( m_dAttrs[i].m_iField>=0 )
2661 m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
2662
2663 if ( m_bUnpackError )
2664 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
2665
2666 SPH_RET ( !m_bUnpackError );
2667 }
2668
2669
UnpackStats(CSphSEStats * pStats)2670 bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
2671 {
2672 assert ( pStats );
2673
2674 char * pCurSave = m_pCur;
2675 for ( uint i=0; i<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2676 {
2677 m_pCur += m_bId64 ? 12 : 8; // skip id+weight
2678 for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
2679 {
2680 if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
2681 {
2682 // skip MVA list
2683 uint32 uCount = UnpackDword ();
2684 m_pCur += uCount*4;
2685 } else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
2686 {
2687 uint32 iLen = UnpackDword();
2688 m_pCur += iLen;
2689 } else // skip normal value
2690 m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
2691 }
2692 }
2693
2694 pStats->m_iMatchesTotal = UnpackDword ();
2695 pStats->m_iMatchesFound = UnpackDword ();
2696 pStats->m_iQueryMsec = UnpackDword ();
2697 pStats->m_iWords = UnpackDword ();
2698
2699 if ( m_bUnpackError )
2700 return false;
2701
2702 SafeDeleteArray ( pStats->m_dWords );
2703 if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
2704 return false;
2705 pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
2706 if ( !pStats->m_dWords )
2707 return false;
2708
2709 for ( int i=0; i<pStats->m_iWords; i++ )
2710 {
2711 CSphSEWordStats & tWord = pStats->m_dWords[i];
2712 tWord.m_sWord = UnpackString ();
2713 tWord.m_iDocs = UnpackDword ();
2714 tWord.m_iHits = UnpackDword ();
2715 }
2716
2717 if ( m_bUnpackError )
2718 return false;
2719
2720 m_pCur = pCurSave;
2721 return true;
2722 }
2723
2724
2725 /// condition pushdown implementation, to properly intercept WHERE clauses on my columns
2726 #if MYSQL_VERSION_ID<50610
cond_push(const COND * cond)2727 const COND * ha_sphinx::cond_push ( const COND * cond )
2728 #else
2729 const Item * ha_sphinx::cond_push ( const Item *cond )
2730 #endif
2731 {
2732 // catch the simplest case: query_column="some text"
2733 for ( ;; )
2734 {
2735 if ( cond->type()!=Item::FUNC_ITEM )
2736 break;
2737
2738 Item_func * condf = (Item_func *)cond;
2739 if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
2740 break;
2741
2742 // get my tls
2743 CSphSEThreadTable * pTable = GetTls ();
2744 if ( !pTable )
2745 break;
2746
2747 Item ** args = condf->arguments();
2748 if ( !m_pShare->m_bSphinxQL )
2749 {
2750 // on non-QL tables, intercept query=value condition for SELECT
2751 if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::STRING_ITEM ))
2752 break;
2753
2754 Item_field * pField = (Item_field *) args[0];
2755 if ( pField->field->field_index!=2 ) // FIXME! magic key index
2756 break;
2757
2758 // copy the query, and let know that we intercepted this condition
2759 Item_string * pString = (Item_string *) args[1];
2760 pTable->m_bQuery = true;
2761 strncpy ( pTable->m_sQuery, pString->str_value.c_ptr(), sizeof(pTable->m_sQuery) );
2762 pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
2763 pTable->m_pQueryCharset = pString->str_value.charset();
2764
2765 } else
2766 {
2767 if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::INT_ITEM ))
2768 break;
2769
2770 // on QL tables, intercept id=value condition for DELETE
2771 Item_field * pField = (Item_field *) args[0];
2772 if ( pField->field->field_index!=0 ) // FIXME! magic key index
2773 break;
2774
2775 Item_int * pVal = (Item_int *) args[1];
2776 pTable->m_iCondId = pVal->val_int();
2777 pTable->m_bCondId = true;
2778 }
2779
2780 // we intercepted this condition
2781 return NULL;
2782 }
2783
2784 // don't change anything
2785 return cond;
2786 }
2787
2788
2789 /// condition popup
cond_pop()2790 void ha_sphinx::cond_pop ()
2791 {
2792 CSphSEThreadTable * pTable = GetTls ();
2793 if ( pTable )
2794 pTable->m_bQuery = false;
2795 }
2796
2797
2798 /// get TLS (maybe allocate it, too)
GetTls()2799 CSphSEThreadTable * ha_sphinx::GetTls()
2800 {
2801 SPH_ENTER_METHOD()
2802 // where do we store that pointer in today's version?
2803 CSphTLS ** ppTls;
2804 #if MYSQL_VERSION_ID>50100
2805 ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
2806 #else
2807 ppTls = (CSphTLS**) ¤t_thd->ha_data[sphinx_hton.slot];
2808 #endif // >50100
2809
2810 CSphSEThreadTable * pTable = NULL;
2811 // allocate if needed
2812 if ( !*ppTls )
2813 {
2814 *ppTls = new CSphTLS ( this );
2815 pTable = (*ppTls)->m_pHeadTable;
2816 } else
2817 {
2818 pTable = (*ppTls)->m_pHeadTable;
2819 }
2820
2821 while ( pTable && pTable->m_pHandler!=this )
2822 pTable = pTable->m_pTableNext;
2823
2824 if ( !pTable )
2825 {
2826 pTable = new CSphSEThreadTable ( this );
2827 pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
2828 (*ppTls)->m_pHeadTable = pTable;
2829 }
2830
2831 // errors will be handled by caller
2832 return pTable;
2833 }
2834
2835
2836 // Positions an index cursor to the index specified in the handle. Fetches the
2837 // row if available. If the key value is null, begin at the first key of the
2838 // index.
index_read(byte * buf,const byte * key,uint key_len,enum ha_rkey_function)2839 int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
2840 {
2841 SPH_ENTER_METHOD();
2842 char sError[256];
2843
2844 // set new data for thd->ha_data, it is used in show_status
2845 CSphSEThreadTable * pTable = GetTls();
2846 if ( !pTable )
2847 {
2848 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
2849 SPH_RET ( HA_ERR_END_OF_FILE );
2850 }
2851 pTable->m_tStats.Reset ();
2852
2853 // sphinxql table, just return the key once
2854 if ( m_pShare->m_bSphinxQL )
2855 {
2856 // over and out
2857 if ( pTable->m_bCondDone )
2858 SPH_RET ( HA_ERR_END_OF_FILE );
2859
2860 // return a value from pushdown, if any
2861 if ( pTable->m_bCondId )
2862 {
2863 table->field[0]->store ( pTable->m_iCondId, 1 );
2864 pTable->m_bCondDone = true;
2865 SPH_RET(0);
2866 }
2867
2868 // return a value from key
2869 longlong iRef = 0;
2870 if ( key_len==4 )
2871 iRef = uint4korr ( key );
2872 else if ( key_len==8 )
2873 iRef = uint8korr ( key );
2874 else
2875 {
2876 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
2877 SPH_RET ( HA_ERR_END_OF_FILE );
2878 }
2879
2880 table->field[0]->store ( iRef, 1 );
2881 pTable->m_bCondDone = true;
2882 SPH_RET(0);
2883 }
2884
2885 // parse query
2886 if ( pTable->m_bQuery )
2887 {
2888 // we have a query from condition pushdown
2889 m_pCurrentKey = (const byte *) pTable->m_sQuery;
2890 m_iCurrentKeyLen = strlen(pTable->m_sQuery);
2891 } else
2892 {
2893 // just use the key (might be truncated)
2894 m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
2895 m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
2896 pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
2897 }
2898
2899 CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
2900 if ( !q.Parse () )
2901 {
2902 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
2903 SPH_RET ( HA_ERR_END_OF_FILE );
2904 }
2905
2906 // do connect
2907 int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
2908 if ( iSocket<0 )
2909 SPH_RET ( HA_ERR_END_OF_FILE );
2910
2911 // my buffer
2912 char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
2913 int iReqLen = q.BuildRequest ( &pBuffer );
2914
2915 if ( iReqLen<=0 )
2916 {
2917 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
2918 SPH_RET ( HA_ERR_END_OF_FILE );
2919 }
2920
2921 // send request
2922 ::send ( iSocket, pBuffer, iReqLen, 0 );
2923
2924 // receive reply
2925 char sHeader[8];
2926 int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
2927 if ( iGot!=sizeof(sHeader) )
2928 {
2929 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
2930 SPH_RET ( HA_ERR_END_OF_FILE );
2931 }
2932
2933 short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
2934 short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
2935 uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
2936 SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
2937 uRespStatus, uRespVersion, uRespLength );
2938
2939 SafeDeleteArray ( m_pResponse );
2940 if ( uRespLength<=SPHINXSE_MAX_ALLOC )
2941 m_pResponse = new char [ uRespLength+1 ];
2942
2943 if ( !m_pResponse )
2944 {
2945 my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
2946 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2947 SPH_RET ( HA_ERR_END_OF_FILE );
2948 }
2949
2950 int iRecvLength = 0;
2951 while ( iRecvLength<(int)uRespLength )
2952 {
2953 int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
2954 if ( iRecv<0 )
2955 break;
2956 iRecvLength += iRecv;
2957 }
2958
2959 ::closesocket ( iSocket );
2960 iSocket = -1;
2961
2962 if ( iRecvLength!=(int)uRespLength )
2963 {
2964 my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
2965 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2966 SPH_RET ( HA_ERR_END_OF_FILE );
2967 }
2968
2969 // we'll have a message, at least
2970 pTable->m_bStats = true;
2971
2972 // parse reply
2973 m_iCurrentPos = 0;
2974 m_pCur = m_pResponse;
2975 m_pResponseEnd = m_pResponse + uRespLength;
2976 m_bUnpackError = false;
2977
2978 if ( uRespStatus!=SEARCHD_OK )
2979 {
2980 char * sMessage = UnpackString ();
2981 if ( !sMessage )
2982 {
2983 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
2984 uRespStatus, uRespLength );
2985 SPH_RET ( HA_ERR_END_OF_FILE );
2986 }
2987
2988 strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof(pTable->m_tStats.m_sLastMessage) );
2989 SafeDeleteArray ( sMessage );
2990
2991 if ( uRespStatus!=SEARCHD_WARNING )
2992 {
2993 my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
2994 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
2995
2996 pTable->m_tStats.m_bLastError = true;
2997 SPH_RET ( HA_ERR_END_OF_FILE );
2998 }
2999 }
3000
3001 if ( !UnpackSchema () )
3002 SPH_RET ( HA_ERR_END_OF_FILE );
3003
3004 if ( !UnpackStats ( &pTable->m_tStats ) )
3005 {
3006 my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
3007 SPH_RET ( HA_ERR_END_OF_FILE );
3008 }
3009
3010 SPH_RET ( get_rec ( buf, key, key_len ) );
3011 }
3012
3013
// Positions an index cursor to the index specified in key. Fetches the
// row if any. This is only used to read whole keys.
//
// Not implemented here: all arguments are ignored and HA_ERR_WRONG_COMMAND
// is always returned.
int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3021
3022
// Used to read forward through the index.
// Returns the next match through get_rec(), passing along the key that
// index_read() remembered in m_pCurrentKey/m_iCurrentKeyLen.
int ha_sphinx::index_next ( byte * buf )
{
	SPH_ENTER_METHOD();
	SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
}
3029
3030
/// Returns the next match through get_rec(), forwarding buf, key and keylen as given.
int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
{
	SPH_ENTER_METHOD();
	SPH_RET ( get_rec ( buf, key, keylen ) );
}
3036
3037 #ifndef PRIi64
3038 #define PRIi64 "lld"
3039 #endif
3040
3041 #define INT64_FMT "%" PRIi64
3042
/// Unpacks the next match from the response at m_pCur into the table's fields.
///
/// The first three columns receive the match id, the match weight, and the
/// current query text. Every attribute bound to a column (m_iField>=0) is
/// converted and stored there; unbound attributes are skipped over. Columns
/// that no attribute was bound to are zeroed, and the row's null bytes in buf
/// are cleared.
///
/// The key arguments are unused.
///
/// Returns 0 when a row was stored. Returns HA_ERR_END_OF_FILE, releasing
/// m_pResponse, once all m_iMatchesTotal matches were returned, or on an
/// unpack error or an unhandled attribute type.
int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
{
	SPH_ENTER_METHOD();

	// no more matches left
	if ( m_iCurrentPos>=m_iMatchesTotal )
	{
		SafeDeleteArray ( m_pResponse );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// NOTE(review): the error returns below leave without the matching
	// dbug_tmp_restore_column_map() call - confirm whether that matters
	#if MYSQL_VERSION_ID>50100
	my_bitmap_map * org_bitmap = dbug_tmp_use_all_columns ( table, table->write_set );
	#endif
	Field ** field = table->field;

	// unpack and return the match
	// (the id is one dword, or two with the high one first when m_bId64 is set)
	longlong uMatchID = UnpackDword ();
	if ( m_bId64 )
		uMatchID = ( uMatchID<<32 ) + UnpackDword();
	uint32 uMatchWeight = UnpackDword ();

	field[0]->store ( uMatchID, 1 );
	field[1]->store ( uMatchWeight, 1 );
	field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );

	for ( uint32 i=0; i<m_iAttrs; i++ )
	{
		// the first dword is the value itself, the high half of a bigint,
		// the entry count of an MVA (in dwords), or the length of a string
		longlong iValue64 = 0;
		uint32 uValue = UnpackDword ();
		if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
			iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
		if ( m_dAttrs[i].m_iField<0 )
		{
			// skip MVA or String
			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
			{
				for ( ; uValue>0 && !m_bUnpackError; uValue-- )
					UnpackDword();
			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
			{
				m_pCur += uValue;
			}
			continue;
		}

		Field * af = field [ m_dAttrs[i].m_iField ];
		switch ( m_dAttrs[i].m_uType )
		{
			case SPH_ATTR_INTEGER:
			case SPH_ATTR_ORDINAL:
			case SPH_ATTR_BOOL:
				af->store ( uValue, 1 );
				break;

			case SPH_ATTR_FLOAT:
				af->store ( sphDW2F(uValue) );
				break;

			case SPH_ATTR_TIMESTAMP:
				if ( af->type()==MYSQL_TYPE_TIMESTAMP )
					longstore ( af->ptr, uValue ); // because store() does not accept timestamps
				else
					af->store ( uValue, 1 );
				break;

			case SPH_ATTR_BIGINT:
				af->store ( iValue64, 0 );
				break;

			case SPH_ATTR_STRING:
				if ( !uValue )
					af->store ( "", 0, &my_charset_bin );
				else if ( CheckResponcePtr ( uValue ) )
				{
					af->store ( m_pCur, uValue, &my_charset_bin );
					m_pCur += uValue;
				}
				break;

			case SPH_ATTR_UINT64SET:
			case SPH_ATTR_UINT32SET :
				if ( uValue<=0 )
				{
					// shortcut, empty MVA set
					af->store ( "", 0, &my_charset_bin );

				} else
				{
					// convert MVA set to comma-separated string
					// (entries that no longer fit into sBuf are still read, but not printed)
					char sBuf[1024]; // FIXME! magic size
					char * pCur = sBuf;

					if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
					{
						for ( ; uValue>0 && !m_bUnpackError; uValue-- )
						{
							uint32 uEntry = UnpackDword ();
							if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
							{
								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
								while ( *pCur ) pCur++;
								if ( uValue>1 )
									*pCur++ = ','; // non-trailing commas
							}
						}
					} else
					{
						// 64-bit entries take two dwords each, high dword first
						// NOTE(review): an odd uValue would wrap around on "-=2";
						// the loop then only ends through m_bUnpackError - confirm
						for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
						{
							longlong uEntry = UnpackDword ();
							uEntry = uEntry<<32;
							uEntry += UnpackDword();
							if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
							{
								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, INT64_FMT, uEntry );
								while ( *pCur ) pCur++;
								if ( uValue>2 )
									*pCur++ = ','; // non-trailing commas
							}
						}
					}

					af->store ( sBuf, pCur-sBuf, &my_charset_bin );
				}
				break;

			default:
				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
				SafeDeleteArray ( m_pResponse );
				SPH_RET ( HA_ERR_END_OF_FILE );
		}
	}

	if ( m_bUnpackError )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
		SafeDeleteArray ( m_pResponse );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// zero out unmapped fields
	for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
		if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
			switch ( m_dUnboundFields[i] )
		{
			case SPH_ATTR_INTEGER:		table->field[i]->store ( 0, 1 ); break;
			case SPH_ATTR_TIMESTAMP:	longstore ( table->field[i]->ptr, 0 ); break;
			default:
				// NOTE(review): other my_error() calls in this file pass a finished
				// string; confirm that the %d here actually gets formatted
				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
					"INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
				SafeDeleteArray ( m_pResponse );
				SPH_RET ( HA_ERR_END_OF_FILE );
		}

	// no column of the returned row is NULL
	memset ( buf, 0, table->s->null_bytes );
	m_iCurrentPos++;

	#if MYSQL_VERSION_ID > 50100
	dbug_tmp_restore_column_map ( table->write_set, org_bitmap );
	#endif

	SPH_RET(0);
}
3206
3207
// Used to read backwards through the index.
// Not implemented here: always returns HA_ERR_WRONG_COMMAND.
int ha_sphinx::index_prev ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3214
3215
// index_first() asks for the first key in the index.
//
// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
// and sql_select.cc.
//
// No row is ever produced here: always returns HA_ERR_END_OF_FILE.
int ha_sphinx::index_first ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_END_OF_FILE );
}
3225
// index_last() asks for the last key in the index.
//
// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
// and sql_select.cc.
//
// Not implemented here: always returns HA_ERR_WRONG_COMMAND.
int ha_sphinx::index_last ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3235
3236
/// Starts a table scan. Nothing is prepared here; the argument is ignored
/// and 0 is always returned.
int ha_sphinx::rnd_init ( bool )
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
3242
3243
/// Ends a table scan. Nothing is done here; always returns 0.
int ha_sphinx::rnd_end()
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
3249
3250
/// Fetches the next row of a table scan. No row is ever produced here:
/// always returns HA_ERR_END_OF_FILE.
int ha_sphinx::rnd_next ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_END_OF_FILE );
}
3256
3257
/// Row position callback. Nothing is recorded here; the argument is ignored.
void ha_sphinx::position ( const byte * )
{
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
3263
3264
// This is like rnd_next, but you are given a position to use
// to determine the row. The position will be of the type that you stored in
// ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
// or position you saved when position() was called.
// Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
//
// Not implemented here: always returns HA_ERR_WRONG_COMMAND.
int ha_sphinx::rnd_pos ( byte *, byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3275
3276
/// Reports table statistics. The numbers are fixed: rec_per_key[0] of the
/// first key is set to 1 (when the table has any keys), and the record count
/// is set to 20. The argument is ignored.
/// From MySQL 5.0.30 on the method returns int (always 0); before that it returns nothing.
#if MYSQL_VERSION_ID>=50030
int ha_sphinx::info ( uint )
#else
void ha_sphinx::info ( uint )
#endif
{
	SPH_ENTER_METHOD();

	if ( table->s->keys>0 )
		table->key_info[0].rec_per_key[0] = 1;

	// the record count lives in 'stats' after 5.1.0, and directly in the handler before
	#if MYSQL_VERSION_ID>50100
	stats.records = 20;
	#else
	records = 20;
	#endif

#if MYSQL_VERSION_ID>=50030
	SPH_RET(0);
#else
	SPH_VOID_RET();
#endif
}
3300
3301
reset()3302 int ha_sphinx::reset ()
3303 {
3304 SPH_ENTER_METHOD();
3305 CSphSEThreadTable * pTable = GetTls ();
3306 if ( pTable )
3307 pTable->m_bQuery = false;
3308 SPH_RET(0);
3309 }
3310
3311
/// Not implemented here: always returns HA_ERR_WRONG_COMMAND.
int ha_sphinx::delete_all_rows()
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
3317
3318
// First you should go read the section "locking functions for mysql" in
// lock.cc to understand this.
// This creates a lock on the table. If you are implementing a storage engine
// that can handle transactions, look at ha_berkeley.cc to see how you will
// want to go about doing this. Otherwise you should consider calling flock()
// here.
//
// Called from lock.cc by lock_external() and unlock_external(). Also called
// from sql_table.cc by copy_data_between_tables().
external_lock(THD *,int)3328 int ha_sphinx::external_lock ( THD *, int )
3329 {
3330 SPH_ENTER_METHOD();
3331 SPH_RET(0);
3332 }
3333
3334
store_lock(THD *,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)3335 THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
3336 enum thr_lock_type lock_type )
3337 {
3338 SPH_ENTER_METHOD();
3339
3340 if ( lock_type!=TL_IGNORE && m_tLock.type==TL_UNLOCK )
3341 m_tLock.type = lock_type;
3342
3343 *to++ = &m_tLock;
3344 SPH_RET(to);
3345 }
3346
3347
delete_table(const char *)3348 int ha_sphinx::delete_table ( const char * )
3349 {
3350 SPH_ENTER_METHOD();
3351 SPH_RET(0);
3352 }
3353
3354
// Renames a table from one name to another from an ALTER TABLE call.
//
// If you do not implement this, the default rename_table() is called from
// handler.cc and it will delete all files with the file extensions returned
// by bas_ext().
//
// Called from sql_table.cc by mysql_rename_table().
rename_table(const char *,const char *)3362 int ha_sphinx::rename_table ( const char *, const char * )
3363 {
3364 SPH_ENTER_METHOD();
3365 SPH_RET(0);
3366 }
3367
3368
// Given a starting key and an ending key, estimate the number of rows that
// will exist between the two. end_key may be empty, in which case determine
// if start_key matches any rows.
//
// Called from opt_range.cc by check_quick_keys().
records_in_range(uint,key_range *,key_range *)3374 ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
3375 {
3376 SPH_ENTER_METHOD();
3377 SPH_RET(3); // low number to force index usage
3378 }
3379
// on servers older than 5.6.10, map the user_defined_key_parts identifier
// used by the code below onto key_parts
// NOTE(review): presumably the key descriptor member was renamed in 5.6.10 - confirm
#if MYSQL_VERSION_ID < 50610
#define user_defined_key_parts key_parts
#endif
3383
// create() is called to create a database. The variable name will have the name
// of the table. When create() is called you do not need to worry about opening
// the table. Also, the FRM file will have already been created, so adjusting
// create_info will not do you any good. You can overwrite the FRM file at this
// point if you wish to change the table definition, but there are no methods
// currently provided for doing that.
//
// Called from handler.cc by ha_create_table().
create(const char * name,TABLE * table,HA_CREATE_INFO *)3392 int ha_sphinx::create ( const char * name, TABLE * table, HA_CREATE_INFO * )
3393 {
3394 SPH_ENTER_METHOD();
3395 char sError[256];
3396
3397 CSphSEShare tInfo;
3398 if ( !ParseUrl ( &tInfo, table, true ) )
3399 SPH_RET(-1);
3400
3401 // check SphinxAPI table
3402 for ( ; !tInfo.m_bSphinxQL; )
3403 {
3404 // check system fields (count and types)
3405 if ( table->s->fields<SPHINXSE_SYSTEM_COLUMNS )
3406 {
3407 my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
3408 name, SPHINXSE_SYSTEM_COLUMNS );
3409 break;
3410 }
3411
3412 if ( !IsIDField ( table->field[0] ) )
3413 {
3414 my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
3415 break;
3416 }
3417
3418 if ( !IsIntegerFieldType ( table->field[1]->type() ) )
3419 {
3420 my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
3421 break;
3422 }
3423
3424 enum_field_types f2 = table->field[2]->type();
3425 if ( f2!=MYSQL_TYPE_VARCHAR
3426 && f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
3427 {
3428 my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
3429 break;
3430 }
3431
3432 // check attributes
3433 int i;
3434 for ( i=3; i<(int)table->s->fields; i++ )
3435 {
3436 enum_field_types eType = table->field[i]->type();
3437 if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3438 {
3439 my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
3440 name, i+1, table->field[i]->field_name );
3441 break;
3442 }
3443 }
3444
3445 if ( i!=(int)table->s->fields )
3446 break;
3447
3448 // check index
3449 if (
3450 table->s->keys!=1 ||
3451 table->key_info[0].user_defined_key_parts!=1 ||
3452 strcasecmp ( table->key_info[0].key_part[0].field->field_name, table->field[2]->field_name ) )
3453 {
3454 my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
3455 name, table->field[2]->field_name );
3456 break;
3457 }
3458
3459 // all good
3460 sError[0] = '\0';
3461 break;
3462 }
3463
3464 // check SphinxQL table
3465 for ( ; tInfo.m_bSphinxQL; )
3466 {
3467 sError[0] = '\0';
3468
3469 // check that 1st column is id, is of int type, and has an index
3470 if ( strcmp ( table->field[0]->field_name, "id" ) )
3471 {
3472 my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
3473 break;
3474 }
3475
3476 if ( !IsIDField ( table->field[0] ) )
3477 {
3478 my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
3479 break;
3480 }
3481
3482 // check index
3483 if (
3484 table->s->keys!=1 ||
3485 table->key_info[0].user_defined_key_parts!=1 ||
3486 strcasecmp ( table->key_info[0].key_part[0].field->field_name, "id" ) )
3487 {
3488 my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
3489 break;
3490 }
3491
3492 // check column types
3493 for ( int i=1; i<(int)table->s->fields; i++ )
3494 {
3495 enum_field_types eType = table->field[i]->type();
3496 if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
3497 {
3498 my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
3499 name, i+1, table->field[i]->field_name );
3500 break;
3501 }
3502 }
3503 if ( sError[0] )
3504 break;
3505
3506 // all good
3507 break;
3508 }
3509
3510 // report and bail
3511 if ( sError[0] )
3512 {
3513 my_error ( ER_CANT_CREATE_TABLE, MYF(0), sError, -1 );
3514 SPH_RET(-1);
3515 }
3516
3517 SPH_RET(0);
3518 }
3519
3520 // show functions
3521
// size of the text buffer handed to the show functions below; only defined
// here for servers older than 5.1.0
// NOTE(review): presumably newer server headers already provide it - confirm
#if MYSQL_VERSION_ID<50100
#define SHOW_VAR_FUNC_BUFF_SIZE 1024
#endif
3525
sphinx_get_stats(THD * thd,SHOW_VAR * out)3526 CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
3527 {
3528 #if MYSQL_VERSION_ID>50100
3529 if ( sphinx_hton_ptr )
3530 {
3531 CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3532
3533 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3534 return &pTls->m_pHeadTable->m_tStats;
3535 }
3536 #else
3537 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3538 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3539 return &pTls->m_pHeadTable->m_tStats;
3540 #endif
3541
3542 out->type = SHOW_CHAR;
3543 out->value = "";
3544 return 0;
3545 }
3546
sphinx_showfunc_total(THD * thd,SHOW_VAR * out,char *)3547 int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
3548 {
3549 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3550 if ( pStats )
3551 {
3552 out->type = SHOW_INT;
3553 out->value = (char *) &pStats->m_iMatchesTotal;
3554 }
3555 return 0;
3556 }
3557
sphinx_showfunc_total_found(THD * thd,SHOW_VAR * out,char *)3558 int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
3559 {
3560 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3561 if ( pStats )
3562 {
3563 out->type = SHOW_INT;
3564 out->value = (char *) &pStats->m_iMatchesFound;
3565 }
3566 return 0;
3567 }
3568
sphinx_showfunc_time(THD * thd,SHOW_VAR * out,char *)3569 int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
3570 {
3571 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3572 if ( pStats )
3573 {
3574 out->type = SHOW_INT;
3575 out->value = (char *) &pStats->m_iQueryMsec;
3576 }
3577 return 0;
3578 }
3579
sphinx_showfunc_word_count(THD * thd,SHOW_VAR * out,char *)3580 int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
3581 {
3582 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3583 if ( pStats )
3584 {
3585 out->type = SHOW_INT;
3586 out->value = (char *) &pStats->m_iWords;
3587 }
3588 return 0;
3589 }
3590
sphinx_showfunc_words(THD * thd,SHOW_VAR * out,char * sBuffer)3591 int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
3592 {
3593 #if MYSQL_VERSION_ID>50100
3594 if ( sphinx_hton_ptr )
3595 {
3596 CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
3597 #else
3598 {
3599 CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
3600 #endif
3601 if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
3602 {
3603 CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
3604 if ( pStats && pStats->m_iWords )
3605 {
3606 uint uBuffLen = 0;
3607
3608 out->type = SHOW_CHAR;
3609 out->value = sBuffer;
3610
3611 // the following is partially based on code in sphinx_show_status()
3612 sBuffer[0] = 0;
3613 for ( int i=0; i<pStats->m_iWords; i++ )
3614 {
3615 CSphSEWordStats & tWord = pStats->m_dWords[i];
3616 uBuffLen = my_snprintf ( sBuffer, SHOW_VAR_FUNC_BUFF_SIZE, "%s%s:%d:%d ", sBuffer,
3617 tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
3618 }
3619
3620 if ( uBuffLen > 0 )
3621 {
3622 // trim last space
3623 sBuffer [ --uBuffLen ] = 0;
3624
3625 if ( pTls->m_pHeadTable->m_pQueryCharset )
3626 {
3627 // String::c_ptr() will nul-terminate the buffer.
3628 //
3629 // NOTE: It's not entirely clear whether this conversion is necessary at all.
3630
3631 String sConvert;
3632 uint iErrors;
3633 sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
3634 memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
3635 }
3636 }
3637
3638 return 0;
3639 }
3640 }
3641 }
3642
3643 out->type = SHOW_CHAR;
3644 out->value = "";
3645 return 0;
3646 }
3647
3648 int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
3649 {
3650 CSphSEStats * pStats = sphinx_get_stats ( thd, out );
3651 if ( pStats && pStats->m_bLastError )
3652 {
3653 out->type = SHOW_CHAR;
3654 out->value = pStats->m_sLastMessage;
3655 }
3656 return 0;
3657 }
3658
3659 #if MYSQL_VERSION_ID>50100
/// storage engine descriptor referenced by the plugin declaration below;
/// it is initialized with the handlerton interface version only
struct st_mysql_storage_engine sphinx_storage_engine =
{
	MYSQL_HANDLERTON_INTERFACE_VERSION
};
3664
/// status variables published by the plugin; every entry pairs a variable
/// name with one of the sphinx_showfunc_*() callbacks above, marked as
/// SHOW_FUNC, and the list ends with an all-zero entry
struct st_mysql_show_var sphinx_status_vars[] =
{
	{"sphinx_total",		(char *)sphinx_showfunc_total,			SHOW_FUNC},
	{"sphinx_total_found",	(char *)sphinx_showfunc_total_found,	SHOW_FUNC},
	{"sphinx_time",			(char *)sphinx_showfunc_time,			SHOW_FUNC},
	{"sphinx_word_count",	(char *)sphinx_showfunc_word_count,		SHOW_FUNC},
	{"sphinx_words",		(char *)sphinx_showfunc_words,			SHOW_FUNC},
	{"sphinx_error",		(char *)sphinx_showfunc_error,			SHOW_FUNC},
	{0, 0, (enum_mysql_show_type)0}	// terminator
};
3675
3676
/// plugin descriptor that registers the engine with the server
mysql_declare_plugin(sphinx)
{
	MYSQL_STORAGE_ENGINE_PLUGIN,	// plugin type
	&sphinx_storage_engine,			// storage engine descriptor defined above
	sphinx_hton_name,
	"Sphinx developers",
	sphinx_hton_comment,
	PLUGIN_LICENSE_GPL,
	sphinx_init_func, // Plugin Init
	sphinx_done_func, // Plugin Deinit
	0x0001, // 0.1
	sphinx_status_vars,				// status variables table defined above
	NULL,							// the two remaining descriptor slots are left unset
	NULL
}
mysql_declare_plugin_end;
3693
3694 #endif // >50100
3695
3696 //
3697 // $Id$
3698 //
3699