1 //
2 // $Id: testrt.cpp 4113 2013-08-26 07:43:28Z deogar $
3 //
4 
5 //
6 // Copyright (c) 2001-2013, Andrew Aksyonoff
7 // Copyright (c) 2008-2013, Sphinx Technologies Inc
8 // All rights reserved
9 //
10 // This program is free software; you can redistribute it and/or modify
11 // it under the terms of the GNU General Public License. You should have
12 // received a copy of the GPL license along with this program; if you
13 // did not, you can find it at http://www.gnu.org/
14 //
15 
16 #include "sphinx.h"
17 #include "sphinxrt.h"
18 #include "sphinxutils.h"
19 
20 #if USE_WINDOWS
21 #include "psapi.h"
22 #pragma comment(linker, "/defaultlib:psapi.lib")
23 #pragma message("Automatically linking with psapi.lib")
24 #endif
25 
/// number of documents between two consecutive Commit() calls in DoIndexing();
/// also echoed in the statistics lines that DoIndexing() prints
const int COMMIT_STEP = 1;

/// running sum of the per-source megabyte counts: DoIndexing() adds its own
/// total to it, and main() reads it for the final throughput line
float g_fTotalMB = 0.0f;
28 
SetupIndexing(CSphSource_MySQL * pSrc,const CSphSourceParams_MySQL & tParams)29 void SetupIndexing ( CSphSource_MySQL * pSrc, const CSphSourceParams_MySQL & tParams )
30 {
31 	CSphString sError;
32 	if ( !pSrc->Setup ( tParams ) )
33 		sphDie ( "setup failed" );
34 	if ( !pSrc->Connect ( sError ) )
35 		sphDie ( "connect failed: %s", sError.cstr() );
36 	if ( !pSrc->IterateStart ( sError ) )
37 		sphDie ( "iterate-start failed: %s", sError.cstr() );
38 }
39 
40 
DoSearch(CSphIndex * pIndex)41 void DoSearch ( CSphIndex * pIndex )
42 {
43 	printf ( "---\nsearching... " );
44 
45 	CSphQuery tQuery;
46 	CSphQueryResult tResult;
47 	tQuery.m_sQuery = "@title cat";
48 
49 	ISphMatchSorter * pSorter = sphCreateQueue ( &tQuery, pIndex->GetMatchSchema(), tResult.m_sError, false );
50 	if ( !pSorter )
51 	{
52 		printf ( "failed to create sorter; error=%s", tResult.m_sError.cstr() );
53 
54 	} else if ( !pIndex->MultiQuery ( &tQuery, &tResult, 1, &pSorter, NULL ) )
55 	{
56 		printf ( "query failed; error=%s", pIndex->GetLastError().cstr() );
57 
58 	} else
59 	{
60 		sphFlattenQueue ( pSorter, &tResult, 0 );
61 		printf ( "%d results found in %d.%03d sec!\n", tResult.m_dMatches.GetLength(), tResult.m_iQueryTime/1000, tResult.m_iQueryTime%1000 );
62 		ARRAY_FOREACH ( i, tResult.m_dMatches )
63 			printf ( "%d. id=" DOCID_FMT ", weight=%d\n", 1+i, tResult.m_dMatches[i].m_iDocID, tResult.m_dMatches[i].m_iWeight );
64 	}
65 
66 	SafeDelete ( pSorter );
67 	printf ( "---\n" );
68 }
69 
70 
DoIndexing(CSphSource * pSrc,ISphRtIndex * pIndex)71 void DoIndexing ( CSphSource * pSrc, ISphRtIndex * pIndex )
72 {
73 	CSphString sError;
74 	CSphVector<DWORD> dMvas;
75 
76 	int64_t tmStart = sphMicroTimer ();
77 	int64_t tmAvgCommit = 0;
78 	int64_t tmMaxCommit = 0;
79 	int iCommits = 0;
80 	for ( ;; )
81 	{
82 		if ( !pSrc->IterateDocument ( sError ) )
83 			sphDie ( "iterate-document failed: %s", sError.cstr() );
84 		ISphHits * pHitsNext = pSrc->IterateHits ( sError );
85 		if ( !sError.IsEmpty() )
86 			sphDie ( "iterate-hits failed: %s", sError.cstr() );
87 
88 		if ( pSrc->m_tDocInfo.m_iDocID )
89 			pIndex->AddDocument ( pHitsNext, pSrc->m_tDocInfo, NULL, dMvas, sError );
90 
91 		if ( ( pSrc->GetStats().m_iTotalDocuments % COMMIT_STEP )==0 || !pSrc->m_tDocInfo.m_iDocID )
92 		{
93 			int64_t tmCommit = sphMicroTimer();
94 			pIndex->Commit ();
95 			tmCommit = sphMicroTimer()-tmCommit;
96 
97 			iCommits++;
98 			tmAvgCommit += tmCommit;
99 			tmMaxCommit = Max ( tmMaxCommit, tmCommit );
100 
101 			if ( !pSrc->m_tDocInfo.m_iDocID )
102 			{
103 				tmAvgCommit /= iCommits;
104 				break;
105 			}
106 		}
107 
108 		if (!( pSrc->GetStats().m_iTotalDocuments % 100 ))
109 			printf ( "%d docs\r", (int)pSrc->GetStats().m_iTotalDocuments );
110 
111 		static bool bOnce = true;
112 		if ( iCommits*COMMIT_STEP>=5000 && bOnce )
113 		{
114 			printf ( "\n" );
115 			DoSearch ( pIndex );
116 			bOnce = false;
117 		}
118 	}
119 
120 	pSrc->Disconnect();
121 
122 	int64_t tmEnd = sphMicroTimer ();
123 	float fTotalMB = (float)pSrc->GetStats().m_iTotalBytes/1000000.0f;
124 	printf ( "commit-step %d, %d docs, %d bytes, %d.%03d sec, %.2f MB/sec\n",
125 		COMMIT_STEP,
126 		(int)pSrc->GetStats().m_iTotalDocuments,
127 		(int)pSrc->GetStats().m_iTotalBytes,
128 		(int)((tmEnd-tmStart)/1000000), (int)(((tmEnd-tmStart)%1000000)/1000),
129 		fTotalMB*1000000.0f/(tmEnd-tmStart) );
130 	printf ( "commit-docs %d, avg %d.%03d msec, max %d.%03d msec\n", COMMIT_STEP,
131 		(int)(tmAvgCommit/1000), (int)(tmAvgCommit%1000),
132 		(int)(tmMaxCommit/1000), (int)(tmMaxCommit%1000) );
133 	g_fTotalMB += fTotalMB;
134 }
135 
136 
SpawnSource(const char * sQuery,ISphTokenizer * pTok,CSphDict * pDict)137 CSphSource * SpawnSource ( const char * sQuery, ISphTokenizer * pTok, CSphDict * pDict )
138 {
139 	CSphSource_MySQL * pSrc = new CSphSource_MySQL ( "test" );
140 	pSrc->SetTokenizer ( pTok );
141 	pSrc->SetDict ( pDict );
142 
143 	CSphSourceParams_MySQL tParams;
144 	tParams.m_sHost = "localhost";
145 	tParams.m_sUser = "root";
146 	tParams.m_sDB = "lj";
147 	tParams.m_dQueryPre.Add ( "SET NAMES utf8" );
148 	tParams.m_sQuery = sQuery;
149 
150 	CSphColumnInfo tCol;
151 	tCol.m_eAttrType = SPH_ATTR_INTEGER;
152 	tCol.m_sName = "channel_id";
153 	tParams.m_dAttrs.Add ( tCol );
154 	tCol.m_eAttrType = SPH_ATTR_TIMESTAMP;
155 	tCol.m_sName = "published";
156 	tParams.m_dAttrs.Add ( tCol );
157 
158 	SetupIndexing ( pSrc, tParams );
159 	return pSrc;
160 }
161 
162 
163 static ISphRtIndex * g_pIndex = NULL;
164 
165 
IndexingThread(void * pArg)166 void IndexingThread ( void * pArg )
167 {
168 	CSphSource * pSrc = (CSphSource *) pArg;
169 	DoIndexing ( pSrc, g_pIndex );
170 }
171 
172 
main()173 int main ()
174 {
175 	// threads should be initialized before memory allocations
176 	char cTopOfMainStack;
177 	sphThreadInit();
178 	MemorizeStack ( &cTopOfMainStack );
179 
180 	CSphString sError;
181 	CSphDictSettings tDictSettings;
182 
183 	ISphTokenizer * pTok = sphCreateUTF8Tokenizer();
184 	CSphDict * pDict = sphCreateDictionaryCRC ( tDictSettings, pTok, sError, "rt1" );
185 	CSphSource * pSrc = SpawnSource ( "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM posting WHERE id<=10000 AND id%2=0", pTok, pDict );
186 
187 	ISphTokenizer * pTok2 = sphCreateUTF8Tokenizer();
188 	CSphDict * pDict2 = sphCreateDictionaryCRC ( tDictSettings, pTok, sError, "rt2" );
189 	CSphSource * pSrc2 = SpawnSource ( "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM posting WHERE id<=10000 AND id%2=1", pTok2, pDict2 );
190 
191 	CSphSchema tSrcSchema;
192 	if ( !pSrc->UpdateSchema ( &tSrcSchema, sError ) )
193 		sphDie ( "update-schema failed: %s", sError.cstr() );
194 
195 	CSphSchema tSchema; // source schema must be all dynamic attrs; but index ones must be static
196 	tSchema.m_dFields = tSrcSchema.m_dFields;
197 	for ( int i=0; i<tSrcSchema.GetAttrsCount(); i++ )
198 		tSchema.AddAttr ( tSrcSchema.GetAttr(i), false );
199 
200 	CSphConfigSection tRTConfig;
201 	sphRTInit ( tRTConfig, true );
202 	sphRTConfigure ( tRTConfig, true );
203 	SmallStringHash_T< CSphIndex * > dTemp;
204 	sphReplayBinlog ( dTemp, 0 );
205 	ISphRtIndex * pIndex = sphCreateIndexRT ( tSchema, "testrt", 32*1024*1024, "data/dump", false );
206 	pIndex->SetTokenizer ( pTok ); // index will own this pair from now on
207 	pIndex->SetDictionary ( pDict );
208 	if ( !pIndex->Prealloc ( false, false, sError ) )
209 		sphDie ( "prealloc failed: %s", pIndex->GetLastError().cstr() );
210 	g_pIndex = pIndex;
211 
212 	// initial indexing
213 	int64_t tmStart = sphMicroTimer();
214 
215 	SphThread_t t1, t2;
216 	sphThreadCreate ( &t1, IndexingThread, pSrc );
217 	sphThreadCreate ( &t2, IndexingThread, pSrc2 );
218 	sphThreadJoin ( &t1 );
219 	sphThreadJoin ( &t2 );
220 
221 #if 0
222 	// update
223 	tParams.m_sQuery = "SELECT id, channel_id, UNIX_TIMESTAMP(published) published, title, UNCOMPRESS(content) content FROM rt2 WHERE id<=10000";
224 	SetupIndexing ( pSrc, tParams );
225 	DoIndexing ( pSrc, pIndex );
226 #endif
227 
228 	// search
229 	DoSearch ( pIndex );
230 
231 	// shutdown index (should cause dump)
232 	int64_t tmShutdown = sphMicroTimer();
233 
234 #if SPH_ALLOCS_PROFILER
235 	printf ( "pre-shutdown allocs=%d, bytes="INT64_FMT"\n", sphAllocsCount(), sphAllocBytes() );
236 #endif
237 	SafeDelete ( pIndex );
238 #if SPH_ALLOCS_PROFILER
239 	printf ( "post-shutdown allocs=%d, bytes="INT64_FMT"\n", sphAllocsCount(), sphAllocBytes() );
240 #endif
241 
242 	int64_t tmEnd = sphMicroTimer();
243 	printf ( "shutdown done in %d.%03d sec\n", (int)((tmEnd-tmShutdown)/1000000), (int)(((tmEnd-tmShutdown)%1000000)/1000) );
244 	printf ( "total with shutdown %d.%03d sec, %.2f MB/sec\n",
245 		(int)((tmEnd-tmStart)/1000000), (int)(((tmEnd-tmStart)%1000000)/1000),
246 		g_fTotalMB*1000000.0f/(tmEnd-tmStart) );
247 
248 #if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
249 	sphAllocsStats();
250 #endif
251 #if USE_WINDOWS
252 	PROCESS_MEMORY_COUNTERS pmc;
253 	HANDLE hProcess = OpenProcess ( PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, FALSE, GetCurrentProcessId() );
254 	if ( hProcess && GetProcessMemoryInfo ( hProcess, &pmc, sizeof(pmc)) )
255 	{
256 		printf ( "--- peak-wss=%d, peak-pagefile=%d\n", (int)pmc.PeakWorkingSetSize, (int)pmc.PeakPagefileUsage );
257 	}
258 #endif
259 
260 	SafeDelete ( pIndex );
261 	sphRTDone ();
262 }
263 
264 //
265 // $Id: testrt.cpp 4113 2013-08-26 07:43:28Z deogar $
266 //
267