1 // 2 // Copyright (c) 2013-2017 Carsten Sonne Larsen <cs@innolan.net> 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 using System; 23 using System.Collections; 24 using System.Collections.Generic; 25 using System.Globalization; 26 using System.Linq; 27 using System.Threading; 28 using Ntp.Common.Log; 29 30 namespace Ntp.Common.Process 31 { 32 /// <summary> 33 /// A scheduler performs scheduling of jobs according to job schedule descriptions. 34 /// </summary> 35 public sealed class Scheduler : IScheduler, IDisposable 36 { 37 /// <summary> 38 /// Initializes a new instance of the <see cref="Scheduler" /> class. 39 /// </summary> 40 /// <param name="log">Log.</param> Scheduler(LogBase log)41 public Scheduler(LogBase log) 42 { 43 StartTime = DateTime.Now; 44 firstRun = true; 45 Active = false; 46 47 runningThreads = new List<Thread>(); 48 WaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset); 49 50 schedule = new List<ScheduledJob>(); 51 jobs = new List<Job>(); 52 ActivityLog = LogFactory.CreateActivityLog(); 53 54 LogGroup logGroup = LogFactory.CreateGroupLog(); 55 logGroup.Add(log); 56 logGroup.Add(ActivityLog); 57 58 Log = logGroup; 59 } 60 61 private readonly List<Job> jobs; 62 private readonly List<Thread> runningThreads; 63 private readonly List<ScheduledJob> schedule; 64 private bool firstRun; 65 66 /// <summary> 67 /// Gets the wait handle of this <see cref="Scheduler" />. 68 /// </summary> 69 /// <value>The wait handle.</value> 70 public EventWaitHandle WaitHandle { get; } 71 72 /// <summary> 73 /// Gets the start up time of this <see cref="Scheduler" />. 74 /// </summary> 75 /// <value>The start up time.</value> 76 public DateTime StartTime { get; } 77 78 /// <summary> 79 /// Gets the log used by this <see cref="Scheduler" />. 80 /// </summary> 81 /// <value>The log.</value> 82 public LogBase Log { get; } 83 84 /// <summary> 85 /// Gets the enumerator. 86 /// </summary> 87 /// <returns>The enumerator.</returns> GetEnumerator()88 public IEnumerator<Job> GetEnumerator() 89 { 90 return jobs.GetEnumerator(); 91 } 92 93 /// <summary> 94 /// Gets the enumerator. 95 /// </summary> 96 /// <returns>The enumerator.</returns> IEnumerable.GetEnumerator()97 IEnumerator IEnumerable.GetEnumerator() 98 { 99 return jobs.GetEnumerator(); 100 } 101 102 /// <summary> 103 /// Gets a value indicating whether this <see cref="Scheduler" /> is active. 104 /// </summary> 105 /// <value><c>true</c> if active; otherwise, <c>false</c>.</value> 106 public bool Active { get; private set; } 107 108 /// <summary> 109 /// Gets the activity log. 110 /// </summary> 111 /// <value>The activity log.</value> 112 public ActivityLog ActivityLog { get; } 113 114 /// <summary> 115 /// Gets the schedule. 116 /// </summary> 117 /// <value>The schedule.</value> 118 public IEnumerable<ScheduledJob> Schedule => schedule; 119 120 /// <summary> 121 /// Gets the next job to be executed. 122 /// </summary> 123 /// <value>The next job.</value> 124 public ScheduledJob NextJob { get; private set; } 125 126 /// <summary> 127 /// Add the specified job to the scheduler queue. 128 /// </summary> 129 /// <param name="description">Description.</param> Add(JobDescription description)130 public void Add(JobDescription description) 131 { 132 if (description.Configuration == null || description.Configuration.Frequency == -1) 133 return; 134 135 var scheduleDescription = new JobScheduleDescription( 136 description.Configuration.InitialRun, 137 description.Configuration.FixedRun, 138 description.Configuration.Frequency); 139 140 var job = new Job(description, scheduleDescription, Log); 141 jobs.Add(job); 142 143 Log.SchedulerJobAdded(description, job); 144 145 QueueJob(job, StartTime); 146 } 147 148 /// <summary> 149 /// Run the scheduler "queue pump" method. 150 /// </summary> RunOneCycle()151 public void RunOneCycle() 152 { 153 Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture; 154 155 if (firstRun) 156 { 157 firstRun = false; 158 Active = true; 159 160 lock (schedule) 161 { 162 Log.SchedulerStart(schedule.Count); 163 } 164 } 165 166 // Take the next job of the queue 167 ScheduledJob next; 168 lock (schedule) 169 { 170 next = schedule.OrderBy(j => j.Run).ThenBy(j => j.Job.Description.Priority).First(); 171 schedule.Remove(next); 172 next.Job.Queued = false; 173 NextJob = next; 174 } 175 176 // Wait until job should be run 177 int wait = Convert.ToInt32(next.Run.Subtract(DateTime.Now).TotalMilliseconds); 178 if (wait < 0) 179 { 180 if (wait < -5000) 181 Log.SchedulerBehind(); 182 183 wait = 0; 184 } 185 186 bool signal = WaitHandle.WaitOne(wait); 187 if (signal) 188 { 189 // Abort 190 return; 191 } 192 193 // Check if the job is already running and postpone if needed 194 lock (schedule) 195 { 196 if (next.Job.Description.ThreadType == ThreadType.SingleThreaded && 197 schedule.Count(j => j.Job.Description.ThreadType == ThreadType.SingleThreaded && j.Job.Running) != 0) 198 { 199 PostponeJob(next.Job); 200 return; 201 } 202 } 203 204 Log.SchedulerJobExecuting(next); 205 if (next.Job.Description.ThreadType != ThreadType.NoThread) 206 { 207 // Queue the job on the threadpool 208 ThreadPool.QueueUserWorkItem(JobThreadStart, next.Job); 209 } 210 else 211 { 212 // Execute directly in scheduler thread 213 ExecuteJob(next.Job); 214 } 215 216 // Dont re-schedule "run-only-once" jobs. 217 if (next.Job.Schedule.Frequency == 0) 218 return; 219 220 QueueJob(next.Job, DateTime.Now); 221 } 222 Stop()223 public void Stop() 224 { 225 int count; 226 227 // Remove finished threads 228 lock (runningThreads) 229 { 230 foreach (var finished in runningThreads.Where(t => !t.IsAlive).ToList()) 231 runningThreads.Remove(finished); 232 233 count = runningThreads.Count; 234 } 235 236 if (count != 0) 237 Log.SchedulerWaiting(count); 238 239 lock (runningThreads) 240 { 241 foreach (var thread in runningThreads.ToList()) 242 { 243 // Wait maximum 15 seconds 244 thread.Join(15000/count); 245 if (!thread.IsAlive) 246 continue; 247 248 Log.SchedulerAbort(thread); 249 thread.Abort(); 250 } 251 } 252 253 Log.SchedulerFinished(); 254 } 255 ExecuteJob(Job job)256 private void ExecuteJob(Job job) 257 { 258 try 259 { 260 job.Execute(); 261 } 262 catch (Exception e) 263 { 264 Log.SchedulerError(job.Description.Name, e); 265 } 266 } 267 JobThreadStart(object stateInfo)268 private void JobThreadStart(object stateInfo) 269 { 270 try 271 { 272 lock (runningThreads) 273 runningThreads.Add(Thread.CurrentThread); 274 275 var job = (Job) stateInfo; 276 Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture; 277 //Thread.CurrentThread.Name = job.Description.Name; 278 job.Execute(); 279 } 280 catch (Exception e) 281 { 282 Log.SchedulerError(Thread.CurrentThread.Name, e); 283 } 284 finally 285 { 286 lock (runningThreads) 287 { 288 if (runningThreads.Contains(Thread.CurrentThread)) 289 { 290 runningThreads.Remove(Thread.CurrentThread); 291 } 292 } 293 } 294 } 295 296 /// <summary> 297 /// Postpones the job when putting it to the queue. 298 /// </summary> 299 /// <param name="job">Job.</param> PostponeJob(Job job)300 private void PostponeJob(Job job) 301 { 302 lock (schedule) 303 { 304 ScheduledJob scheduledJob = job.Schedule.CreatePostponed(job); 305 schedule.Add(scheduledJob); 306 job.Queued = true; 307 job.Postponed = true; 308 Log.SchedulerJobStatus(scheduledJob); 309 } 310 } 311 312 /// <summary> 313 /// Queue job for scheduled run. 314 /// </summary> 315 /// <param name="job">Job.</param> 316 /// <param name="run">Run.</param> QueueJob(Job job, DateTime run)317 private void QueueJob(Job job, DateTime run) 318 { 319 DateTime next = job.Schedule.CalculateNextRun(run); 320 double offset = 0; 321 322 // Adjust offset to avoid simultaneously job execution. 323 if (job.Schedule.CanMove) 324 { 325 lock (schedule) 326 { 327 while (schedule.Count(j => Math.Abs(next.Subtract(j.Run).TotalMilliseconds) < 5000) != 0) 328 { 329 double move = job.Schedule.Frequency/40.0; 330 next = next.AddMinutes(move); 331 offset += move; 332 } 333 } 334 } 335 336 lock (schedule) 337 { 338 ScheduledJob scheduledJob = job.Schedule.CreateNew(job, run, offset); 339 schedule.Add(scheduledJob); 340 job.Queued = true; 341 job.Postponed = false; 342 Log.SchedulerJobStatus(scheduledJob); 343 } 344 } 345 346 #region IDisposable Support 347 348 private bool disposedValue; 349 Dispose(bool disposing)350 private void Dispose(bool disposing) 351 { 352 if (disposedValue) 353 return; 354 355 if (disposing) 356 { 357 WaitHandle.Dispose(); 358 } 359 360 disposedValue = true; 361 } 362 ~Scheduler()363 ~Scheduler() 364 { 365 Dispose(false); 366 } 367 Dispose()368 public void Dispose() 369 { 370 Dispose(true); 371 GC.SuppressFinalize(this); 372 } 373 374 #endregion 375 } 376 }