1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.lib.service.scheduler;
20 
21 import org.apache.hadoop.classification.InterfaceAudience;
22 import org.apache.hadoop.lib.lang.RunnableCallable;
23 import org.apache.hadoop.lib.server.BaseService;
24 import org.apache.hadoop.lib.server.Server;
25 import org.apache.hadoop.lib.server.ServiceException;
26 import org.apache.hadoop.lib.service.Instrumentation;
27 import org.apache.hadoop.lib.service.Scheduler;
28 import org.apache.hadoop.lib.util.Check;
29 import org.apache.hadoop.util.Time;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 
33 import java.text.MessageFormat;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 
39 @InterfaceAudience.Private
40 public class SchedulerService extends BaseService implements Scheduler {
41   private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
42 
43   private static final String INST_GROUP = "scheduler";
44 
45   public static final String PREFIX = "scheduler";
46 
47   public static final String CONF_THREADS = "threads";
48 
49   private ScheduledExecutorService scheduler;
50 
SchedulerService()51   public SchedulerService() {
52     super(PREFIX);
53   }
54 
55   @Override
init()56   public void init() throws ServiceException {
57     int threads = getServiceConfig().getInt(CONF_THREADS, 5);
58     scheduler = new ScheduledThreadPoolExecutor(threads);
59     LOG.debug("Scheduler started");
60   }
61 
62   @Override
destroy()63   public void destroy() {
64     try {
65       long limit = Time.now() + 30 * 1000;
66       scheduler.shutdownNow();
67       while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
68         LOG.debug("Waiting for scheduler to shutdown");
69         if (Time.now() > limit) {
70           LOG.warn("Gave up waiting for scheduler to shutdown");
71           break;
72         }
73       }
74       if (scheduler.isTerminated()) {
75         LOG.debug("Scheduler shutdown");
76       }
77     } catch (InterruptedException ex) {
78       LOG.warn(ex.getMessage(), ex);
79     }
80   }
81 
82   @Override
getServiceDependencies()83   public Class[] getServiceDependencies() {
84     return new Class[]{Instrumentation.class};
85   }
86 
87   @Override
getInterface()88   public Class getInterface() {
89     return Scheduler.class;
90   }
91 
92   @Override
schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit)93   public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) {
94     Check.notNull(callable, "callable");
95     if (!scheduler.isShutdown()) {
96       LOG.debug("Scheduling callable [{}], interval [{}] seconds, delay [{}] in [{}]",
97                 new Object[]{callable, delay, interval, unit});
98       Runnable r = new Runnable() {
99         @Override
100         public void run() {
101           String instrName = callable.getClass().getSimpleName();
102           Instrumentation instr = getServer().get(Instrumentation.class);
103           if (getServer().getStatus() == Server.Status.HALTED) {
104             LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus());
105             instr.incr(INST_GROUP, instrName + ".skips", 1);
106           } else {
107             LOG.debug("Executing [{}]", callable);
108             instr.incr(INST_GROUP, instrName + ".execs", 1);
109             Instrumentation.Cron cron = instr.createCron().start();
110             try {
111               callable.call();
112             } catch (Exception ex) {
113               instr.incr(INST_GROUP, instrName + ".fails", 1);
114               LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex});
115             } finally {
116               instr.addCron(INST_GROUP, instrName, cron.stop());
117             }
118           }
119         }
120       };
121       scheduler.scheduleWithFixedDelay(r, delay, interval, unit);
122     } else {
123       throw new IllegalStateException(
124         MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{}]", callable));
125     }
126   }
127 
128   @Override
schedule(Runnable runnable, long delay, long interval, TimeUnit unit)129   public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) {
130     schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit);
131   }
132 
133 }
134