/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Collections.Generic;
using Lucene.Net.Support;
using Directory = Lucene.Net.Store.Directory;

namespace Lucene.Net.Index
{

    /// <summary>A <see cref="MergeScheduler" /> that runs each merge using a
    /// separate thread, up until a maximum number of threads
    /// (<see cref="MaxThreadCount" />) at which when a merge is
    /// needed, the thread(s) that are updating the index will
    /// pause until one or more merges completes. This is a
    /// simple way to use concurrency in the indexing process
    /// without having to create and manage application level
    /// threads.
    /// </summary>
    public class ConcurrentMergeScheduler : MergeScheduler
    {
        // -1 means "not initialized yet"; see InitMergeThreadPriority.
        private int mergeThreadPriority = -1;

        /// <summary>Merge threads created by <see cref="Merge" />. Each thread
        /// removes itself from this list when its <c>Run</c> method exits.
        /// Access is guarded by <c>lock (this)</c>.
        /// </summary>
        protected internal IList<MergeThread> mergeThreads = new List<MergeThread>();

        // Max number of threads allowed to be merging at once
        private int _maxThreadCount = 1;

        /// <summary>Directory of the writer most recently passed to <see cref="Merge" />.</summary>
        protected internal Directory dir;

        // Set by Dispose; only read by the test bookkeeping in AddMyself.
        private bool closed;

        /// <summary>Writer most recently passed to <see cref="Merge" />.</summary>
        protected internal IndexWriter writer;

        /// <summary>Running counter used to number merge thread names.</summary>
        protected internal int mergeThreadCount;

        /// <summary>Creates a scheduler. When test mode is active (see
        /// <see cref="SetTestMode" />) the new instance registers itself so that
        /// <see cref="AnyUnhandledExceptions" /> can wait for its threads.
        /// </summary>
        public ConcurrentMergeScheduler()
        {
            if (allInstances != null)
            {
                // Only for testing
                AddMyself();
            }
        }

        /// <summary>Gets or sets the max # simultaneous threads that may be
        /// running. If a merge is necessary yet we already have
        /// this many threads running, the incoming thread (that
        /// is calling add/updateDocument) will block until
        /// a merge thread has completed.
        /// </summary>
        /// <exception cref="System.ArgumentException">The value being set is less than 1.</exception>
        public virtual int MaxThreadCount
        {
            get { return _maxThreadCount; }
            set
            {
                if (value < 1)
                {
                    throw new System.ArgumentException("count should be at least 1");
                }
                _maxThreadCount = value;
            }
        }

        /// <summary>Return the priority that merge threads run at. If no priority
        /// has been set yet, it is initialized to 1 plus the priority of the
        /// calling thread, capped at <see cref="System.Threading.ThreadPriority.Highest" />.
        /// </summary>
        /// <returns>The merge thread priority as an integer <see cref="System.Threading.ThreadPriority" /> value.</returns>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate")]
        public virtual int GetMergeThreadPriority()
        {
            lock (this)
            {
                InitMergeThreadPriority();
                return mergeThreadPriority;
            }
        }

        /// <summary>Set the priority that merge threads run at. The new priority
        /// is also applied to every merge thread that is currently alive.
        /// </summary>
        /// <param name="pri">Integer value of a <see cref="System.Threading.ThreadPriority" />.</param>
        /// <exception cref="System.ArgumentException"><paramref name="pri" /> is outside the
        /// Lowest..Highest range.</exception>
        public virtual void SetMergeThreadPriority(int pri)
        {
            lock (this)
            {
                if (pri > (int) System.Threading.ThreadPriority.Highest || pri < (int) System.Threading.ThreadPriority.Lowest)
                {
                    throw new System.ArgumentException("priority must be in range " + (int) System.Threading.ThreadPriority.Lowest + " .. " + (int) System.Threading.ThreadPriority.Highest + " inclusive");
                }
                mergeThreadPriority = pri;

                // Walk the whole list instead of using the alive-thread count as
                // an index bound: the list may contain entries that are not alive,
                // and bounding by the count would then skip alive threads stored
                // at higher indices.
                foreach (MergeThread thread in mergeThreads)
                {
                    if (thread.IsAlive)
                    {
                        thread.SetThreadPriority(pri);
                    }
                }
            }
        }

        /// <summary>True when a writer has been seen and it has verbose output enabled.</summary>
        private bool Verbose()
        {
            return writer != null && writer.Verbose;
        }

        /// <summary>Forwards <paramref name="message" /> to the writer, prefixed with
        /// "CMS: ", when verbose output is enabled.
        /// </summary>
        private void Message(string message)
        {
            if (Verbose())
            {
                writer.Message("CMS: " + message);
            }
        }

        /// <summary>Lazily picks the default merge thread priority the first time
        /// it is needed.
        /// </summary>
        private void InitMergeThreadPriority()
        {
            lock (this)
            {
                if (mergeThreadPriority == -1)
                {
                    // Default to slightly higher priority than our
                    // calling thread
                    mergeThreadPriority = 1 + (int) ThreadClass.Current().Priority;
                    if (mergeThreadPriority > (int) System.Threading.ThreadPriority.Highest)
                    {
                        mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest;
                    }
                }
            }
        }

        /// <summary>Marks this scheduler as closed. The flag is set regardless of
        /// the value of <paramref name="disposing" />.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            closed = true;
        }

        /// <summary>Blocks the caller until no merge thread of this scheduler is alive.</summary>
        public virtual void Sync()
        {
            lock (this)
            {
                while (MergeThreadCount() > 0)
                {
                    if (Verbose())
                    {
                        Message("now wait for threads; currently " + mergeThreads.Count + " still running");
                        int count = mergeThreads.Count;
                        for (int i = 0; i < count; i++)
                        {
                            Message(" " + i + ": " + mergeThreads[i]);
                        }
                    }

                    // Woken by the PulseAll a merge thread issues as it exits.
                    System.Threading.Monitor.Wait(this);
                }
            }
        }

        /// <summary>Counts the entries of <see cref="mergeThreads" /> that are alive.</summary>
        private int MergeThreadCount()
        {
            lock (this)
            {
                int count = 0;
                foreach (MergeThread thread in mergeThreads)
                {
                    if (thread.IsAlive)
                    {
                        count++;
                    }
                }
                return count;
            }
        }

        /// <summary>Pulls pending merges from <paramref name="writer" /> until none
        /// remain, starting one merge thread per merge. When
        /// <see cref="MaxThreadCount" /> threads are already alive, the calling
        /// thread waits until one of them finishes.
        /// </summary>
        /// <param name="writer">The writer whose pending merges should be run.</param>
        public override void Merge(IndexWriter writer)
        {
            // TODO: .NET doesn't support this
            // assert !Thread.holdsLock(writer);

            this.writer = writer;

            InitMergeThreadPriority();

            dir = writer.Directory;

            // First, quickly run through the newly proposed merges
            // and add any orthogonal merges (ie a merge not
            // involving segments already pending to be merged) to
            // the queue. If we are way behind on merging, many of
            // these newly proposed merges will likely already be
            // registered.

            if (Verbose())
            {
                Message("now merge");
                Message(" index: " + writer.SegString());
            }

            // Iterate, pulling from the IndexWriter's queue of
            // pending merges, until it's empty:
            while (true)
            {
                // TODO: we could be careful about which merges to do in
                // the BG (eg maybe the "biggest" ones) vs FG, which
                // merges to do first (the easiest ones?), etc.

                MergePolicy.OneMerge merge = writer.GetNextMerge();
                if (merge == null)
                {
                    if (Verbose())
                    {
                        Message(" no more merges pending; now return");
                    }
                    return;
                }

                // We do this w/ the primary thread to keep
                // deterministic assignment of segment names
                writer.MergeInit(merge);

                bool success = false;
                try
                {
                    lock (this)
                    {
                        while (MergeThreadCount() >= _maxThreadCount)
                        {
                            if (Verbose())
                            {
                                Message(" too many merge threads running; stalling...");
                            }

                            System.Threading.Monitor.Wait(this);
                        }

                        if (Verbose())
                        {
                            Message(" consider merge " + merge.SegString(dir));
                        }

                        System.Diagnostics.Debug.Assert(MergeThreadCount() < _maxThreadCount);

                        // OK to spawn a new merge thread to handle this
                        // merge:
                        MergeThread merger = GetMergeThread(writer, merge);
                        mergeThreads.Add(merger);
                        if (Verbose())
                        {
                            Message(" launch new thread [" + merger.Name + "]");
                        }

                        merger.Start();
                        success = true;
                    }
                }
                finally
                {
                    // The merge was initialized above; if no thread took it over,
                    // hand it back to the writer.
                    if (!success)
                    {
                        writer.MergeFinish(merge);
                    }
                }
            }
        }

        /// <summary>Does the actual merge, by calling <see cref="IndexWriter.Merge" /> </summary>
        /// <param name="merge">The merge to run.</param>
        protected internal virtual void DoMerge(MergePolicy.OneMerge merge)
        {
            writer.Merge(merge);
        }

        /// <summary>Create and return a new MergeThread </summary>
        /// <param name="writer">Writer the thread pulls follow-up merges from.</param>
        /// <param name="merge">First merge the thread will run.</param>
        /// <returns>A configured background thread that has not been started.</returns>
        protected internal virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)
        {
            lock (this)
            {
                var thread = new MergeThread(this, writer, merge);
                thread.SetThreadPriority(mergeThreadPriority);
                thread.IsBackground = true;
                thread.Name = "Lucene Merge Thread #" + mergeThreadCount++;
                return thread;
            }
        }

        /// <summary>Thread that runs one merge and then keeps running further
        /// merges for as long as the writer has more pending.
        /// </summary>
        public /*protected internal*/ class MergeThread : ThreadClass
        {
            private ConcurrentMergeScheduler enclosingInstance;

            /// <summary>The scheduler that created this thread.</summary>
            public ConcurrentMergeScheduler Enclosing_Instance
            {
                get { return enclosingInstance; }
            }

            internal IndexWriter writer;
            internal MergePolicy.OneMerge startMerge;
            internal MergePolicy.OneMerge runningMerge;

            /// <summary>Creates a merge thread bound to a scheduler.</summary>
            /// <param name="enclosingInstance">The owning scheduler.</param>
            /// <param name="writer">Writer to pull follow-up merges from.</param>
            /// <param name="startMerge">First merge to run.</param>
            public MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)
            {
                this.enclosingInstance = enclosingInstance;
                this.writer = writer;
                this.startMerge = startMerge;
            }

            /// <summary>Records the merge this thread is currently working on.</summary>
            public virtual void SetRunningMerge(MergePolicy.OneMerge merge)
            {
                lock (this)
                {
                    runningMerge = merge;
                }
            }

            /// <summary>The merge most recently recorded via <see cref="SetRunningMerge" />,
            /// or null if none has been recorded.
            /// </summary>
            public virtual MergePolicy.OneMerge RunningMerge
            {
                get
                {
                    lock (this)
                    {
                        return runningMerge;
                    }
                }
            }

            /// <summary>Sets this thread's priority on a best-effort basis.</summary>
            /// <param name="pri">Integer value of a <see cref="System.Threading.ThreadPriority" />.</param>
            public virtual void SetThreadPriority(int pri)
            {
                try
                {
                    Priority = (System.Threading.ThreadPriority) pri;
                }
                catch (System.NullReferenceException)
                {
                    // Strangely, Sun's JDK 1.5 on Linux sometimes
                    // throws NPE out of here...
                }
                catch (System.Security.SecurityException)
                {
                    // Ignore this because we will still run fine with
                    // normal thread priority
                }
            }

            /// <summary>Runs the start merge, then any further merges the writer
            /// hands out. On exit the thread wakes all threads waiting on the
            /// scheduler and removes itself from the scheduler's thread list.
            /// </summary>
            public override void Run()
            {
                // First time through the while loop we do the merge
                // that we were started with:
                MergePolicy.OneMerge merge = this.startMerge;

                try
                {
                    if (Enclosing_Instance.Verbose())
                    {
                        Enclosing_Instance.Message(" merge thread: start");
                    }

                    while (true)
                    {
                        SetRunningMerge(merge);
                        Enclosing_Instance.DoMerge(merge);

                        // Subsequent times through the loop we do any new
                        // merge that writer says is necessary:
                        merge = writer.GetNextMerge();
                        if (merge == null)
                        {
                            break;
                        }

                        writer.MergeInit(merge);
                        if (Enclosing_Instance.Verbose())
                        {
                            Enclosing_Instance.Message(" merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir));
                        }
                    }

                    if (Enclosing_Instance.Verbose())
                    {
                        Enclosing_Instance.Message(" merge thread: done");
                    }
                }
                catch (System.Exception exc)
                {
                    // Ignore the exception if it was due to abort:
                    if (!(exc is MergePolicy.MergeAbortedException))
                    {
                        if (!Enclosing_Instance.suppressExceptions)
                        {
                            // suppressExceptions is normally only set during
                            // testing.
                            ConcurrentMergeScheduler.anyExceptions = true;
                            Enclosing_Instance.HandleMergeException(exc);
                        }
                    }
                }
                finally
                {
                    lock (Enclosing_Instance)
                    {
                        // Waiters cannot re-acquire the lock until this block is
                        // left, so they observe the list after the removal.
                        System.Threading.Monitor.PulseAll(Enclosing_Instance);
                        Enclosing_Instance.mergeThreads.Remove(this);
                        bool removed = !Enclosing_Instance.mergeThreads.Contains(this);
                        System.Diagnostics.Debug.Assert(removed);
                    }
                }
            }

            /// <summary>Describes the running merge, or the start merge when no
            /// running merge has been recorded yet.
            /// </summary>
            public override string ToString()
            {
                MergePolicy.OneMerge merge = RunningMerge ?? startMerge;
                return "merge thread: " + merge.SegString(Enclosing_Instance.dir);
            }
        }

        /// <summary>Called when an exception is hit in a background merge
        /// thread
        /// </summary>
        /// <param name="exc">The exception raised by the merge.</param>
        /// <exception cref="MergePolicy.MergeException">Always thrown, wrapping <paramref name="exc" />.</exception>
        protected internal virtual void HandleMergeException(System.Exception exc)
        {
            // When an exception is hit during merge, IndexWriter
            // removes any partial files and then allows another
            // merge to run. If whatever caused the error is not
            // transient then the exception will keep happening,
            // so, we sleep here to avoid saturating CPU in such
            // cases:
            System.Threading.Thread.Sleep(System.TimeSpan.FromMilliseconds(250));

            throw new MergePolicy.MergeException(exc, dir);
        }

        internal static bool anyExceptions = false;

        /// <summary>Used for testing. Waits for the merge threads of all registered
        /// schedulers, then returns and resets the "an exception occurred" flag.
        /// </summary>
        /// <returns>True if a merge thread reported an exception since the flag was last cleared.</returns>
        /// <exception cref="System.InvalidOperationException"><see cref="SetTestMode" /> was not called.</exception>
        public static bool AnyUnhandledExceptions()
        {
            if (allInstances == null)
            {
                // InvalidOperationException derives from SystemException, so
                // callers catching the previously thrown type still match.
                throw new System.InvalidOperationException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
            }
            lock (allInstances)
            {
                int count = allInstances.Count;
                // Make sure all outstanding threads are done so we see
                // any exceptions they may produce:
                for (int i = 0; i < count; i++)
                {
                    allInstances[i].Sync();
                }
                bool v = anyExceptions;
                anyExceptions = false;
                return v;
            }
        }

        /// <summary>Used for testing. Resets the "an exception occurred" flag.
        /// Safe to call even when <see cref="SetTestMode" /> has not been called.
        /// </summary>
        public static void ClearUnhandledExceptions()
        {
            List<ConcurrentMergeScheduler> instances = allInstances;
            if (instances == null)
            {
                // Test mode is off, so there is no instance list to lock on;
                // locking a null reference would throw.
                anyExceptions = false;
                return;
            }
            lock (instances)
            {
                anyExceptions = false;
            }
        }

        /// <summary>Used for testing. Drops registered schedulers that are closed
        /// and have no alive threads, then registers this instance.
        /// </summary>
        private void AddMyself()
        {
            lock (allInstances)
            {
                // Keep every scheduler that still has threads or may spawn new ones.
                allInstances.RemoveAll(other => other.closed && 0 == other.MergeThreadCount());
                allInstances.Add(this);
            }
        }

        private bool suppressExceptions;

        /// <summary>Used for testing </summary>
        public /*internal*/ virtual void SetSuppressExceptions()
        {
            suppressExceptions = true;
        }

        /// <summary>Used for testing </summary>
        public /*internal*/ virtual void ClearSuppressExceptions()
        {
            suppressExceptions = false;
        }

        /// <summary>Used for testing </summary>
        private static List<ConcurrentMergeScheduler> allInstances;

        /// <summary>Used for testing. Enables instance tracking by creating a fresh,
        /// empty list of registered schedulers.
        /// </summary>
        public static void SetTestMode()
        {
            allInstances = new List<ConcurrentMergeScheduler>();
        }
    }
}