1 //
2 // Copyright (c) 2013-2017 Carsten Sonne Larsen <cs@innolan.net>
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 using System;
23 using System.Collections;
24 using System.Collections.Generic;
25 using System.Globalization;
26 using System.Linq;
27 using System.Threading;
28 using Ntp.Common.Log;
29 
30 namespace Ntp.Common.Process
31 {
32     /// <summary>
33     /// A scheduler performs scheduling of jobs according to job schedule descriptions.
34     /// </summary>
35     public sealed class Scheduler : IScheduler, IDisposable
36     {
37         /// <summary>
38         /// Initializes a new instance of the <see cref="Scheduler" /> class.
39         /// </summary>
40         /// <param name="log">Log.</param>
Scheduler(LogBase log)41         public Scheduler(LogBase log)
42         {
43             StartTime = DateTime.Now;
44             firstRun = true;
45             Active = false;
46 
47             runningThreads = new List<Thread>();
48             WaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
49 
50             schedule = new List<ScheduledJob>();
51             jobs = new List<Job>();
52             ActivityLog = LogFactory.CreateActivityLog();
53 
54             LogGroup logGroup = LogFactory.CreateGroupLog();
55             logGroup.Add(log);
56             logGroup.Add(ActivityLog);
57 
58             Log = logGroup;
59         }
60 
61         private readonly List<Job> jobs;
62         private readonly List<Thread> runningThreads;
63         private readonly List<ScheduledJob> schedule;
64         private bool firstRun;
65 
66         /// <summary>
67         /// Gets the wait handle of this <see cref="Scheduler" />.
68         /// </summary>
69         /// <value>The wait handle.</value>
70         public EventWaitHandle WaitHandle { get; }
71 
72         /// <summary>
73         /// Gets the start up time of this <see cref="Scheduler" />.
74         /// </summary>
75         /// <value>The start up time.</value>
76         public DateTime StartTime { get; }
77 
78         /// <summary>
79         /// Gets the log used by this <see cref="Scheduler" />.
80         /// </summary>
81         /// <value>The log.</value>
82         public LogBase Log { get; }
83 
84         /// <summary>
85         /// Gets the enumerator.
86         /// </summary>
87         /// <returns>The enumerator.</returns>
GetEnumerator()88         public IEnumerator<Job> GetEnumerator()
89         {
90             return jobs.GetEnumerator();
91         }
92 
93         /// <summary>
94         /// Gets the enumerator.
95         /// </summary>
96         /// <returns>The enumerator.</returns>
IEnumerable.GetEnumerator()97         IEnumerator IEnumerable.GetEnumerator()
98         {
99             return jobs.GetEnumerator();
100         }
101 
102         /// <summary>
103         /// Gets a value indicating whether this <see cref="Scheduler" /> is active.
104         /// </summary>
105         /// <value><c>true</c> if active; otherwise, <c>false</c>.</value>
106         public bool Active { get; private set; }
107 
108         /// <summary>
109         /// Gets the activity log.
110         /// </summary>
111         /// <value>The activity log.</value>
112         public ActivityLog ActivityLog { get; }
113 
114         /// <summary>
115         /// Gets the schedule.
116         /// </summary>
117         /// <value>The schedule.</value>
118         public IEnumerable<ScheduledJob> Schedule => schedule;
119 
120         /// <summary>
121         /// Gets the next job to be executed.
122         /// </summary>
123         /// <value>The next job.</value>
124         public ScheduledJob NextJob { get; private set; }
125 
126         /// <summary>
127         /// Add the specified job to the scheduler queue.
128         /// </summary>
129         /// <param name="description">Description.</param>
Add(JobDescription description)130         public void Add(JobDescription description)
131         {
132             if (description.Configuration == null || description.Configuration.Frequency == -1)
133                 return;
134 
135             var scheduleDescription = new JobScheduleDescription(
136                 description.Configuration.InitialRun,
137                 description.Configuration.FixedRun,
138                 description.Configuration.Frequency);
139 
140             var job = new Job(description, scheduleDescription, Log);
141             jobs.Add(job);
142 
143             Log.SchedulerJobAdded(description, job);
144 
145             QueueJob(job, StartTime);
146         }
147 
148         /// <summary>
149         /// Run the scheduler "queue pump" method.
150         /// </summary>
RunOneCycle()151         public void RunOneCycle()
152         {
153             Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
154 
155             if (firstRun)
156             {
157                 firstRun = false;
158                 Active = true;
159 
160                 lock (schedule)
161                 {
162                     Log.SchedulerStart(schedule.Count);
163                 }
164             }
165 
166             // Take the next job of the queue
167             ScheduledJob next;
168             lock (schedule)
169             {
170                 next = schedule.OrderBy(j => j.Run).ThenBy(j => j.Job.Description.Priority).First();
171                 schedule.Remove(next);
172                 next.Job.Queued = false;
173                 NextJob = next;
174             }
175 
176             // Wait until job should be run
177             int wait = Convert.ToInt32(next.Run.Subtract(DateTime.Now).TotalMilliseconds);
178             if (wait < 0)
179             {
180                 if (wait < -5000)
181                     Log.SchedulerBehind();
182 
183                 wait = 0;
184             }
185 
186             bool signal = WaitHandle.WaitOne(wait);
187             if (signal)
188             {
189                 // Abort
190                 return;
191             }
192 
193             // Check if the job is already running and postpone if needed
194             lock (schedule)
195             {
196                 if (next.Job.Description.ThreadType == ThreadType.SingleThreaded &&
197                     schedule.Count(j => j.Job.Description.ThreadType == ThreadType.SingleThreaded && j.Job.Running) != 0)
198                 {
199                     PostponeJob(next.Job);
200                     return;
201                 }
202             }
203 
204             Log.SchedulerJobExecuting(next);
205             if (next.Job.Description.ThreadType != ThreadType.NoThread)
206             {
207                 // Queue the job on the threadpool
208                 ThreadPool.QueueUserWorkItem(JobThreadStart, next.Job);
209             }
210             else
211             {
212                 // Execute directly in scheduler thread
213                 ExecuteJob(next.Job);
214             }
215 
216             // Dont re-schedule "run-only-once" jobs.
217             if (next.Job.Schedule.Frequency == 0)
218                 return;
219 
220             QueueJob(next.Job, DateTime.Now);
221         }
222 
Stop()223         public void Stop()
224         {
225             int count;
226 
227             // Remove finished threads
228             lock (runningThreads)
229             {
230                 foreach (var finished in runningThreads.Where(t => !t.IsAlive).ToList())
231                     runningThreads.Remove(finished);
232 
233                 count = runningThreads.Count;
234             }
235 
236             if (count != 0)
237                 Log.SchedulerWaiting(count);
238 
239             lock (runningThreads)
240             {
241                 foreach (var thread in runningThreads.ToList())
242                 {
243                     // Wait maximum 15 seconds
244                     thread.Join(15000/count);
245                     if (!thread.IsAlive)
246                         continue;
247 
248                     Log.SchedulerAbort(thread);
249                     thread.Abort();
250                 }
251             }
252 
253             Log.SchedulerFinished();
254         }
255 
ExecuteJob(Job job)256         private void ExecuteJob(Job job)
257         {
258             try
259             {
260                 job.Execute();
261             }
262             catch (Exception e)
263             {
264                 Log.SchedulerError(job.Description.Name, e);
265             }
266         }
267 
JobThreadStart(object stateInfo)268         private void JobThreadStart(object stateInfo)
269         {
270             try
271             {
272                 lock (runningThreads)
273                     runningThreads.Add(Thread.CurrentThread);
274 
275                 var job = (Job) stateInfo;
276                 Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
277                 //Thread.CurrentThread.Name = job.Description.Name;
278                 job.Execute();
279             }
280             catch (Exception e)
281             {
282                 Log.SchedulerError(Thread.CurrentThread.Name, e);
283             }
284             finally
285             {
286                 lock (runningThreads)
287                 {
288                     if (runningThreads.Contains(Thread.CurrentThread))
289                     {
290                         runningThreads.Remove(Thread.CurrentThread);
291                     }
292                 }
293             }
294         }
295 
296         /// <summary>
297         /// Postpones the job when putting it to the queue.
298         /// </summary>
299         /// <param name="job">Job.</param>
PostponeJob(Job job)300         private void PostponeJob(Job job)
301         {
302             lock (schedule)
303             {
304                 ScheduledJob scheduledJob = job.Schedule.CreatePostponed(job);
305                 schedule.Add(scheduledJob);
306                 job.Queued = true;
307                 job.Postponed = true;
308                 Log.SchedulerJobStatus(scheduledJob);
309             }
310         }
311 
312         /// <summary>
313         /// Queue job for scheduled run.
314         /// </summary>
315         /// <param name="job">Job.</param>
316         /// <param name="run">Run.</param>
QueueJob(Job job, DateTime run)317         private void QueueJob(Job job, DateTime run)
318         {
319             DateTime next = job.Schedule.CalculateNextRun(run);
320             double offset = 0;
321 
322             // Adjust offset to avoid simultaneously job execution.
323             if (job.Schedule.CanMove)
324             {
325                 lock (schedule)
326                 {
327                     while (schedule.Count(j => Math.Abs(next.Subtract(j.Run).TotalMilliseconds) < 5000) != 0)
328                     {
329                         double move = job.Schedule.Frequency/40.0;
330                         next = next.AddMinutes(move);
331                         offset += move;
332                     }
333                 }
334             }
335 
336             lock (schedule)
337             {
338                 ScheduledJob scheduledJob = job.Schedule.CreateNew(job, run, offset);
339                 schedule.Add(scheduledJob);
340                 job.Queued = true;
341                 job.Postponed = false;
342                 Log.SchedulerJobStatus(scheduledJob);
343             }
344         }
345 
346         #region IDisposable Support
347 
348         private bool disposedValue;
349 
Dispose(bool disposing)350         private void Dispose(bool disposing)
351         {
352             if (disposedValue)
353                 return;
354 
355             if (disposing)
356             {
357                 WaitHandle.Dispose();
358             }
359 
360             disposedValue = true;
361         }
362 
~Scheduler()363         ~Scheduler()
364         {
365             Dispose(false);
366         }
367 
Dispose()368         public void Dispose()
369         {
370             Dispose(true);
371             GC.SuppressFinalize(this);
372         }
373 
374         #endregion
375     }
376 }