1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 using System.Collections.Generic;
19 using Lucene.Net.Support;
20 using Directory = Lucene.Net.Store.Directory;
21 
22 namespace Lucene.Net.Index
23 {
24 
	/// <summary>A <see cref="MergeScheduler" /> that runs each merge using a
	/// separate thread, up to a maximum number of threads
	/// (<see cref="MaxThreadCount" />).  Once that limit is reached and
	/// another merge is needed, the thread(s) that are updating the
	/// index will pause until one or more merges complete.  This is a
	/// simple way to use concurrency in the indexing process
	/// without having to create and manage application-level
	/// threads.
	/// </summary>
34 
35 	public class ConcurrentMergeScheduler:MergeScheduler
36 	{
37 
38 		private int mergeThreadPriority = - 1;
39 
40         protected internal IList<MergeThread> mergeThreads = new List<MergeThread>();
41 
42 		// Max number of threads allowed to be merging at once
43 		private int _maxThreadCount = 1;
44 
45 		protected internal Directory dir;
46 
47 		private bool closed;
48 		protected internal IndexWriter writer;
49 		protected internal int mergeThreadCount;
50 
ConcurrentMergeScheduler()51 		public ConcurrentMergeScheduler()
52 		{
53 			if (allInstances != null)
54 			{
55 				// Only for testing
56 				AddMyself();
57 			}
58 		}
59 
60 	    /// <summary>Gets or sets the max # simultaneous threads that may be
61 	    /// running.  If a merge is necessary yet we already have
62 	    /// this many threads running, the incoming thread (that
63 	    /// is calling add/updateDocument) will block until
64 	    /// a merge thread has completed.
65 	    /// </summary>
66 	    public virtual int MaxThreadCount
67 	    {
68 	        set
69 	        {
70 	            if (value < 1)
71 	                throw new System.ArgumentException("count should be at least 1");
72 	            _maxThreadCount = value;
73 	        }
74 	        get { return _maxThreadCount; }
75         }
76 
77 	    /// <summary>Return the priority that merge threads run at.  By
78 		/// default the priority is 1 plus the priority of (ie,
79 		/// slightly higher priority than) the first thread that
80 		/// calls merge.
81 		/// </summary>
82         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate")]
GetMergeThreadPriority()83         public virtual int GetMergeThreadPriority()
84 		{
85 			lock (this)
86 			{
87 				InitMergeThreadPriority();
88 				return mergeThreadPriority;
89 			}
90 		}
91 
92 		/// <summary>Set the priority that merge threads run at. </summary>
SetMergeThreadPriority(int pri)93 		public virtual void  SetMergeThreadPriority(int pri)
94 		{
95 			lock (this)
96 			{
97 				if (pri > (int) System.Threading.ThreadPriority.Highest || pri < (int) System.Threading.ThreadPriority.Lowest)
98 					throw new System.ArgumentException("priority must be in range " + (int) System.Threading.ThreadPriority.Lowest + " .. " + (int) System.Threading.ThreadPriority.Highest + " inclusive");
99 				mergeThreadPriority = pri;
100 
101 				int numThreads = MergeThreadCount();
102 				for (int i = 0; i < numThreads; i++)
103 				{
104 					MergeThread merge = mergeThreads[i];
105 					merge.SetThreadPriority(pri);
106 				}
107 			}
108 		}
109 
Verbose()110 		private bool Verbose()
111 		{
112 			return writer != null && writer.Verbose;
113 		}
114 
Message(System.String message)115 		private void  Message(System.String message)
116 		{
117 			if (Verbose())
118 				writer.Message("CMS: " + message);
119 		}
120 
InitMergeThreadPriority()121 		private void  InitMergeThreadPriority()
122 		{
123 			lock (this)
124 			{
125 				if (mergeThreadPriority == - 1)
126 				{
127 					// Default to slightly higher priority than our
128 					// calling thread
129 					mergeThreadPriority = 1 + (System.Int32) ThreadClass.Current().Priority;
130 					if (mergeThreadPriority > (int) System.Threading.ThreadPriority.Highest)
131 						mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest;
132 				}
133 			}
134 		}
135 
Dispose(bool disposing)136         protected override void Dispose(bool disposing)
137         {
138             //if (disposing)
139             //{
140                 closed = true;
141             //}
142         }
143 
Sync()144 		public virtual void  Sync()
145 		{
146 			lock (this)
147 			{
148 				while (MergeThreadCount() > 0)
149 				{
150 					if (Verbose())
151 						Message("now wait for threads; currently " + mergeThreads.Count + " still running");
152 					int count = mergeThreads.Count;
153 					if (Verbose())
154 					{
155 						for (int i = 0; i < count; i++)
156 							Message("    " + i + ": " + mergeThreads[i]);
157 					}
158 
159 					System.Threading.Monitor.Wait(this);
160 
161 				}
162 			}
163 		}
164 
MergeThreadCount()165 		private int MergeThreadCount()
166 		{
167             lock (this)
168             {
169                 int count = 0;
170                 int numThreads = mergeThreads.Count;
171                 for (int i = 0; i < numThreads; i++)
172                 {
173                     if (mergeThreads[i].IsAlive)
174                     {
175                             count++;
176                     }
177                 }
178                 return count;
179             }
180         }
181 
Merge(IndexWriter writer)182 		public override void  Merge(IndexWriter writer)
183 		{
184 			// TODO: .NET doesn't support this
185 			// assert !Thread.holdsLock(writer);
186 
187 			this.writer = writer;
188 
189 			InitMergeThreadPriority();
190 
191 			dir = writer.Directory;
192 
193 			// First, quickly run through the newly proposed merges
194 			// and add any orthogonal merges (ie a merge not
195 			// involving segments already pending to be merged) to
196 			// the queue.  If we are way behind on merging, many of
197 			// these newly proposed merges will likely already be
198 			// registered.
199 
200 			if (Verbose())
201 			{
202 				Message("now merge");
203 				Message("  index: " + writer.SegString());
204 			}
205 
206 			// Iterate, pulling from the IndexWriter's queue of
207 			// pending merges, until it's empty:
208 			while (true)
209 			{
210 				// TODO: we could be careful about which merges to do in
211 				// the BG (eg maybe the "biggest" ones) vs FG, which
212 				// merges to do first (the easiest ones?), etc.
213 
214 				MergePolicy.OneMerge merge = writer.GetNextMerge();
215 				if (merge == null)
216 				{
217 					if (Verbose())
218 						Message("  no more merges pending; now return");
219 					return ;
220 				}
221 
222 				// We do this w/ the primary thread to keep
223 				// deterministic assignment of segment names
224 				writer.MergeInit(merge);
225 
226 				bool success = false;
227 				try
228 				{
229 					lock (this)
230 					{
231 						while (MergeThreadCount() >= _maxThreadCount)
232 						{
233 							if (Verbose())
234 								Message("    too many merge threads running; stalling...");
235 
236                             System.Threading.Monitor.Wait(this);
237 
238 
239 						}
240 
241 						if (Verbose())
242 							Message("  consider merge " + merge.SegString(dir));
243 
244 					    System.Diagnostics.Debug.Assert(MergeThreadCount() < _maxThreadCount);
245 
246 						// OK to spawn a new merge thread to handle this
247 						// merge:
248 						MergeThread merger = GetMergeThread(writer, merge);
249 						mergeThreads.Add(merger);
250 						if (Verbose())
251 							Message("    launch new thread [" + merger.Name + "]");
252 
253 						merger.Start();
254 						success = true;
255 					}
256 				}
257 				finally
258 				{
259 					if (!success)
260 					{
261 						writer.MergeFinish(merge);
262 					}
263 				}
264 			}
265 		}
266 
267 		/// <summary>Does the actual merge, by calling <see cref="IndexWriter.Merge" /> </summary>
DoMerge(MergePolicy.OneMerge merge)268 		protected internal virtual void  DoMerge(MergePolicy.OneMerge merge)
269 		{
270 			writer.Merge(merge);
271 		}
272 
273 		/// <summary>Create and return a new MergeThread </summary>
GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)274 		protected internal virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)
275 		{
276 			lock (this)
277 			{
278 				var thread = new MergeThread(this, writer, merge);
279 				thread.SetThreadPriority(mergeThreadPriority);
280 				thread.IsBackground = true;
281 				thread.Name = "Lucene Merge Thread #" + mergeThreadCount++;
282 				return thread;
283 			}
284 		}
285 
286 		public /*protected internal*/ class MergeThread:ThreadClass
287 		{
InitBlock(ConcurrentMergeScheduler enclosingInstance)288 			private void  InitBlock(ConcurrentMergeScheduler enclosingInstance)
289 			{
290 				this.enclosingInstance = enclosingInstance;
291 			}
292 			private ConcurrentMergeScheduler enclosingInstance;
293 			public ConcurrentMergeScheduler Enclosing_Instance
294 			{
295 				get
296 				{
297 					return enclosingInstance;
298 				}
299 
300 			}
301 
302 			internal IndexWriter writer;
303 			internal MergePolicy.OneMerge startMerge;
304 			internal MergePolicy.OneMerge runningMerge;
305 
MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)306 			public MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)
307 			{
308 				InitBlock(enclosingInstance);
309 				this.writer = writer;
310 				this.startMerge = startMerge;
311 			}
312 
SetRunningMerge(MergePolicy.OneMerge merge)313 			public virtual void  SetRunningMerge(MergePolicy.OneMerge merge)
314 			{
315 				lock (this)
316 				{
317 					runningMerge = merge;
318 				}
319 			}
320 
321 		    public virtual MergePolicy.OneMerge RunningMerge
322 		    {
323 		        get
324 		        {
325 		            lock (this)
326 		            {
327 		                return runningMerge;
328 		            }
329 		        }
330 		    }
331 
SetThreadPriority(int pri)332 		    public virtual void  SetThreadPriority(int pri)
333 			{
334 				try
335 				{
336 					Priority = (System.Threading.ThreadPriority) pri;
337 				}
338 				catch (System.NullReferenceException)
339 				{
340 					// Strangely, Sun's JDK 1.5 on Linux sometimes
341 					// throws NPE out of here...
342 				}
343 				catch (System.Security.SecurityException)
344 				{
345 					// Ignore this because we will still run fine with
346 					// normal thread priority
347 				}
348 			}
349 
Run()350 			override public void  Run()
351 			{
352 
353 				// First time through the while loop we do the merge
354 				// that we were started with:
355 				MergePolicy.OneMerge merge = this.startMerge;
356 
357 				try
358 				{
359 
360 					if (Enclosing_Instance.Verbose())
361 						Enclosing_Instance.Message("  merge thread: start");
362 
363 					while (true)
364 					{
365 						SetRunningMerge(merge);
366 						Enclosing_Instance.DoMerge(merge);
367 
368 						// Subsequent times through the loop we do any new
369 						// merge that writer says is necessary:
370 						merge = writer.GetNextMerge();
371 						if (merge != null)
372 						{
373 							writer.MergeInit(merge);
374 							if (Enclosing_Instance.Verbose())
375 								Enclosing_Instance.Message("  merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir));
376 						}
377 						else
378 							break;
379 					}
380 
381 					if (Enclosing_Instance.Verbose())
382 						Enclosing_Instance.Message("  merge thread: done");
383 				}
384 				catch (System.Exception exc)
385 				{
386 					// Ignore the exception if it was due to abort:
387 					if (!(exc is MergePolicy.MergeAbortedException))
388 					{
389 						if (!Enclosing_Instance.suppressExceptions)
390 						{
391 							// suppressExceptions is normally only set during
392 							// testing.
393 							Lucene.Net.Index.ConcurrentMergeScheduler.anyExceptions = true;
394 							Enclosing_Instance.HandleMergeException(exc);
395 						}
396 					}
397 				}
398 				finally
399 				{
400 					lock (Enclosing_Instance)
401 					{
402 						System.Threading.Monitor.PulseAll(Enclosing_Instance);
403 						Enclosing_Instance.mergeThreads.Remove(this);
404                         bool removed = !Enclosing_Instance.mergeThreads.Contains(this);
405 						System.Diagnostics.Debug.Assert(removed);
406 					}
407 				}
408 			}
409 
ToString()410 			public override System.String ToString()
411 			{
412 				MergePolicy.OneMerge merge = RunningMerge ?? startMerge;
413 				return "merge thread: " + merge.SegString(Enclosing_Instance.dir);
414 			}
415 		}
416 
417 		/// <summary>Called when an exception is hit in a background merge
418 		/// thread
419 		/// </summary>
HandleMergeException(System.Exception exc)420 		protected internal virtual void  HandleMergeException(System.Exception exc)
421 		{
422 			// When an exception is hit during merge, IndexWriter
423 			// removes any partial files and then allows another
424 			// merge to run.  If whatever caused the error is not
425 			// transient then the exception will keep happening,
426 			// so, we sleep here to avoid saturating CPU in such
427 			// cases:
428 			System.Threading.Thread.Sleep(new System.TimeSpan((System.Int64) 10000 * 250));
429 
430             throw new MergePolicy.MergeException(exc, dir);
431 		}
432 
433 		internal static bool anyExceptions = false;
434 
435 		/// <summary>Used for testing </summary>
AnyUnhandledExceptions()436 		public static bool AnyUnhandledExceptions()
437 		{
438 			if (allInstances == null)
439 			{
440 				throw new System.SystemException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
441 			}
442 			lock (allInstances)
443 			{
444 				int count = allInstances.Count;
445 				// Make sure all outstanding threads are done so we see
446 				// any exceptions they may produce:
447 				for (int i = 0; i < count; i++)
448 				    allInstances[i].Sync();
449 				bool v = anyExceptions;
450 				anyExceptions = false;
451 				return v;
452 			}
453 		}
454 
ClearUnhandledExceptions()455 		public static void  ClearUnhandledExceptions()
456 		{
457 			lock (allInstances)
458 			{
459 				anyExceptions = false;
460 			}
461 		}
462 
463 		/// <summary>Used for testing </summary>
AddMyself()464 		private void  AddMyself()
465 		{
466 			lock (allInstances)
467 			{
468 				int size = allInstances.Count;
469 				int upto = 0;
470 				for (int i = 0; i < size; i++)
471 				{
472 					ConcurrentMergeScheduler other = allInstances[i];
473 					if (!(other.closed && 0 == other.MergeThreadCount()))
474 					// Keep this one for now: it still has threads or
475 					// may spawn new threads
476 						allInstances[upto++] = other;
477 				}
478 			    allInstances.RemoveRange(upto, allInstances.Count - upto);
479 				allInstances.Add(this);
480 			}
481 		}
482 
483 		private bool suppressExceptions;
484 
485 		/// <summary>Used for testing </summary>
SetSuppressExceptions()486 		public /*internal*/ virtual void  SetSuppressExceptions()
487 		{
488 			suppressExceptions = true;
489 		}
490 
491 		/// <summary>Used for testing </summary>
ClearSuppressExceptions()492 		public /*internal*/ virtual void  ClearSuppressExceptions()
493 		{
494 			suppressExceptions = false;
495 		}
496 
497 		/// <summary>Used for testing </summary>
498 		private static List<ConcurrentMergeScheduler> allInstances;
SetTestMode()499 		public static void  SetTestMode()
500 		{
501 			allInstances = new List<ConcurrentMergeScheduler>();
502 		}
503 	}
504 }