1 //
2 // $Id: testrt.cpp 4113 2013-08-26 07:43:28Z deogar $
3 //
4
5 //
6 // Copyright (c) 2001-2013, Andrew Aksyonoff
7 // Copyright (c) 2008-2013, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15
16 #include "sphinx.h"
17 #include "sphinxrt.h"
18 #include "sphinxutils.h"
19
20 #if USE_WINDOWS
21 #include "psapi.h"
22 #pragma comment(linker, "/defaultlib:psapi.lib")
23 #pragma message("Automatically linking with psapi.lib")
24 #endif
25
// DoIndexing() commits whenever the source's total document count is a multiple of this value;
// the value is also echoed in its statistics output
26 const int COMMIT_STEP = 1;
// running sum of the per-call data volume (bytes/1000000) added by DoIndexing();
// main() divides it by the elapsed time to print the overall MB/sec figure
27 float g_fTotalMB = 0.0f;
28
SetupIndexing(CSphSource_MySQL * pSrc,const CSphSourceParams_MySQL & tParams)29 void SetupIndexing ( CSphSource_MySQL * pSrc, const CSphSourceParams_MySQL & tParams )
30 {
31 CSphString sError;
32 if ( !pSrc->Setup ( tParams ) )
33 sphDie ( "setup failed" );
34 if ( !pSrc->Connect ( sError ) )
35 sphDie ( "connect failed: %s", sError.cstr() );
36 if ( !pSrc->IterateStart ( sError ) )
37 sphDie ( "iterate-start failed: %s", sError.cstr() );
38 }
39
40
DoSearch(CSphIndex * pIndex)41 void DoSearch ( CSphIndex * pIndex )
42 {
43 printf ( "---\nsearching... " );
44
45 CSphQuery tQuery;
46 CSphQueryResult tResult;
47 tQuery.m_sQuery = "@title cat";
48
49 ISphMatchSorter * pSorter = sphCreateQueue ( &tQuery, pIndex->GetMatchSchema(), tResult.m_sError, false );
50 if ( !pSorter )
51 {
52 printf ( "failed to create sorter; error=%s", tResult.m_sError.cstr() );
53
54 } else if ( !pIndex->MultiQuery ( &tQuery, &tResult, 1, &pSorter, NULL ) )
55 {
56 printf ( "query failed; error=%s", pIndex->GetLastError().cstr() );
57
58 } else
59 {
60 sphFlattenQueue ( pSorter, &tResult, 0 );
61 printf ( "%d results found in %d.%03d sec!\n", tResult.m_dMatches.GetLength(), tResult.m_iQueryTime/1000, tResult.m_iQueryTime%1000 );
62 ARRAY_FOREACH ( i, tResult.m_dMatches )
63 printf ( "%d. id=" DOCID_FMT ", weight=%d\n", 1+i, tResult.m_dMatches[i].m_iDocID, tResult.m_dMatches[i].m_iWeight );
64 }
65
66 SafeDelete ( pSorter );
67 printf ( "---\n" );
68 }
69
70
DoIndexing(CSphSource * pSrc,ISphRtIndex * pIndex)71 void DoIndexing ( CSphSource * pSrc, ISphRtIndex * pIndex )
72 {
73 CSphString sError;
74 CSphVector<DWORD> dMvas;
75
76 int64_t tmStart = sphMicroTimer ();
77 int64_t tmAvgCommit = 0;
78 int64_t tmMaxCommit = 0;
79 int iCommits = 0;
80 for ( ;; )
81 {
82 if ( !pSrc->IterateDocument ( sError ) )
83 sphDie ( "iterate-document failed: %s", sError.cstr() );
84 ISphHits * pHitsNext = pSrc->IterateHits ( sError );
85 if ( !sError.IsEmpty() )
86 sphDie ( "iterate-hits failed: %s", sError.cstr() );
87
88 if ( pSrc->m_tDocInfo.m_iDocID )
89 pIndex->AddDocument ( pHitsNext, pSrc->m_tDocInfo, NULL, dMvas, sError );
90
91 if ( ( pSrc->GetStats().m_iTotalDocuments % COMMIT_STEP )==0 || !pSrc->m_tDocInfo.m_iDocID )
92 {
93 int64_t tmCommit = sphMicroTimer();
94 pIndex->Commit ();
95 tmCommit = sphMicroTimer()-tmCommit;
96
97 iCommits++;
98 tmAvgCommit += tmCommit;
99 tmMaxCommit = Max ( tmMaxCommit, tmCommit );
100
101 if ( !pSrc->m_tDocInfo.m_iDocID )
102 {
103 tmAvgCommit /= iCommits;
104 break;
105 }
106 }
107
108 if (!( pSrc->GetStats().m_iTotalDocuments % 100 ))
109 printf ( "%d docs\r", (int)pSrc->GetStats().m_iTotalDocuments );
110
111 static bool bOnce = true;
112 if ( iCommits*COMMIT_STEP>=5000 && bOnce )
113 {
114 printf ( "\n" );
115 DoSearch ( pIndex );
116 bOnce = false;
117 }
118 }
119
120 pSrc->Disconnect();
121
122 int64_t tmEnd = sphMicroTimer ();
123 float fTotalMB = (float)pSrc->GetStats().m_iTotalBytes/1000000.0f;
124 printf ( "commit-step %d, %d docs, %d bytes, %d.%03d sec, %.2f MB/sec\n",
125 COMMIT_STEP,
126 (int)pSrc->GetStats().m_iTotalDocuments,
127 (int)pSrc->GetStats().m_iTotalBytes,
128 (int)((tmEnd-tmStart)/1000000), (int)(((tmEnd-tmStart)%1000000)/1000),
129 fTotalMB*1000000.0f/(tmEnd-tmStart) );
130 printf ( "commit-docs %d, avg %d.%03d msec, max %d.%03d msec\n", COMMIT_STEP,
131 (int)(tmAvgCommit/1000), (int)(tmAvgCommit%1000),
132 (int)(tmMaxCommit/1000), (int)(tmMaxCommit%1000) );
133 g_fTotalMB += fTotalMB;
134 }
135
136
SpawnSource(const char * sQuery,ISphTokenizer * pTok,CSphDict * pDict)137 CSphSource * SpawnSource ( const char * sQuery, ISphTokenizer * pTok, CSphDict * pDict )
138 {
139 CSphSource_MySQL * pSrc = new CSphSource_MySQL ( "test" );
140 pSrc->SetTokenizer ( pTok );
141 pSrc->SetDict ( pDict );
142
143 CSphSourceParams_MySQL tParams;
144 tParams.m_sHost = "localhost";
145 tParams.m_sUser = "root";
146 tParams.m_sDB = "lj";
147 tParams.m_dQueryPre.Add ( "SET NAMES utf8" );
148 tParams.m_sQuery = sQuery;
149
150 CSphColumnInfo tCol;
151 tCol.m_eAttrType = SPH_ATTR_INTEGER;
152 tCol.m_sName = "channel_id";
153 tParams.m_dAttrs.Add ( tCol );
154 tCol.m_eAttrType = SPH_ATTR_TIMESTAMP;
155 tCol.m_sName = "published";
156 tParams.m_dAttrs.Add ( tCol );
157
158 SetupIndexing ( pSrc, tParams );
159 return pSrc;
160 }
161
162
// index shared with the indexing threads: main() stores the RT index here
// before starting them, and IndexingThread() passes it on to DoIndexing()
163 static ISphRtIndex * g_pIndex = NULL;
164
165
IndexingThread(void * pArg)166 void IndexingThread ( void * pArg )
167 {
168 CSphSource * pSrc = (CSphSource *) pArg;
169 DoIndexing ( pSrc, g_pIndex );
170 }
171
172
main()173 int main ()
174 {
175 // threads should be initialized before memory allocations
176 char cTopOfMainStack;
177 sphThreadInit();
178 MemorizeStack ( &cTopOfMainStack );
179
180 CSphString sError;
181 CSphDictSettings tDictSettings;
182
183 ISphTokenizer * pTok = sphCreateUTF8Tokenizer();
184 CSphDict * pDict = sphCreateDictionaryCRC ( tDictSettings, pTok, sError, "rt1" );
185 CSphSource * pSrc = SpawnSource ( "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM posting WHERE id<=10000 AND id%2=0", pTok, pDict );
186
187 ISphTokenizer * pTok2 = sphCreateUTF8Tokenizer();
188 CSphDict * pDict2 = sphCreateDictionaryCRC ( tDictSettings, pTok, sError, "rt2" );
189 CSphSource * pSrc2 = SpawnSource ( "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM posting WHERE id<=10000 AND id%2=1", pTok2, pDict2 );
190
191 CSphSchema tSrcSchema;
192 if ( !pSrc->UpdateSchema ( &tSrcSchema, sError ) )
193 sphDie ( "update-schema failed: %s", sError.cstr() );
194
195 CSphSchema tSchema; // source schema must be all dynamic attrs; but index ones must be static
196 tSchema.m_dFields = tSrcSchema.m_dFields;
197 for ( int i=0; i<tSrcSchema.GetAttrsCount(); i++ )
198 tSchema.AddAttr ( tSrcSchema.GetAttr(i), false );
199
200 CSphConfigSection tRTConfig;
201 sphRTInit ( tRTConfig, true );
202 sphRTConfigure ( tRTConfig, true );
203 SmallStringHash_T< CSphIndex * > dTemp;
204 sphReplayBinlog ( dTemp, 0 );
205 ISphRtIndex * pIndex = sphCreateIndexRT ( tSchema, "testrt", 32*1024*1024, "data/dump", false );
206 pIndex->SetTokenizer ( pTok ); // index will own this pair from now on
207 pIndex->SetDictionary ( pDict );
208 if ( !pIndex->Prealloc ( false, false, sError ) )
209 sphDie ( "prealloc failed: %s", pIndex->GetLastError().cstr() );
210 g_pIndex = pIndex;
211
212 // initial indexing
213 int64_t tmStart = sphMicroTimer();
214
215 SphThread_t t1, t2;
216 sphThreadCreate ( &t1, IndexingThread, pSrc );
217 sphThreadCreate ( &t2, IndexingThread, pSrc2 );
218 sphThreadJoin ( &t1 );
219 sphThreadJoin ( &t2 );
220
221 #if 0
222 // update
223 tParams.m_sQuery = "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM rt2 WHERE id<=10000";
224 SetupIndexing ( pSrc, tParams );
225 DoIndexing ( pSrc, pIndex );
226 #endif
227
228 // search
229 DoSearch ( pIndex );
230
231 // shutdown index (should cause dump)
232 int64_t tmShutdown = sphMicroTimer();
233
234 #if SPH_ALLOCS_PROFILER
235 printf ( "pre-shutdown allocs=%d, bytes="INT64_FMT"\n", sphAllocsCount(), sphAllocBytes() );
236 #endif
237 SafeDelete ( pIndex );
238 #if SPH_ALLOCS_PROFILER
239 printf ( "post-shutdown allocs=%d, bytes="INT64_FMT"\n", sphAllocsCount(), sphAllocBytes() );
240 #endif
241
242 int64_t tmEnd = sphMicroTimer();
243 printf ( "shutdown done in %d.%03d sec\n", (int)((tmEnd-tmShutdown)/1000000), (int)(((tmEnd-tmShutdown)%1000000)/1000) );
244 printf ( "total with shutdown %d.%03d sec, %.2f MB/sec\n",
245 (int)((tmEnd-tmStart)/1000000), (int)(((tmEnd-tmStart)%1000000)/1000),
246 g_fTotalMB*1000000.0f/(tmEnd-tmStart) );
247
248 #if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
249 sphAllocsStats();
250 #endif
251 #if USE_WINDOWS
252 PROCESS_MEMORY_COUNTERS pmc;
253 HANDLE hProcess = OpenProcess ( PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, FALSE, GetCurrentProcessId() );
254 if ( hProcess && GetProcessMemoryInfo ( hProcess, &pmc, sizeof(pmc)) )
255 {
256 printf ( "--- peak-wss=%d, peak-pagefile=%d\n", (int)pmc.PeakWorkingSetSize, (int)pmc.PeakPagefileUsage );
257 }
258 #endif
259
260 SafeDelete ( pIndex );
261 sphRTDone ();
262 }
263
264 //
265 // $Id: testrt.cpp 4113 2013-08-26 07:43:28Z deogar $
266 //
267