#
# Copyright 2016, Martin Owens <doctormo@gmail.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
#
"""
The crontabs manager will list all available crontabs on the system.
"""

import os
import sys
import pwd
import itertools

from os import stat, access, X_OK
from pwd import getpwuid
from crontab import CronTab


class UserSpool(list):
    """List of the user crontabs found in a spool directory.

    Holds both owned tabs (file name matches the file owner's user name)
    and abandoned tabs (it does not).
    """
    def __init__(self, loc, tabs=None):
        """Fill the list from the spool directory `loc`.

        `tabs` is accepted so that every generator class can be called the
        same way by `CronTabs.add`; it is not used here.
        """
        for username in self.listdir(loc):
            tab = self.generate(loc, username)
            if tab:
                self.append(tab)
        # Nothing usable was found in the spool: fall back to the crontab
        # of the current user.
        if not self:
            tab = CronTab(user=True)
            if tab:
                self.append(tab)

    def listdir(self, loc):
        """Return the entries of `loc`, or an empty list if it can't be listed"""
        try:
            return os.listdir(loc)
        except OSError:
            return []

    def get_owner(self, path):
        """Return the user name owning the file at `path`.

        Returns None when the owning uid has no password database entry.
        """
        try:
            return getpwuid(stat(path).st_uid).pw_name
        except KeyError:
            return None

    def generate(self, loc, username):
        """Return the CronTab for the spool entry `username` inside `loc`"""
        path = os.path.join(loc, username)
        if username != self.get_owner(path):
            # Abandoned crontab pool entry!
            return CronTab(tabfile=path)
        return CronTab(user=username)


class SystemTab(list):
    """Generates all system tabs"""
    def __init__(self, loc, tabs=None):
        """Load `loc` as a system tab.

        A directory contributes one CronTab per non-hidden entry, a regular
        file contributes a single CronTab, anything else contributes nothing.
        `tabs` is accepted for call compatibility and is not used.
        """
        if os.path.isdir(loc):
            for item in os.listdir(loc):
                # Skip hidden entries
                if item[0] == '.':
                    continue
                path = os.path.join(loc, item)
                self.append(CronTab(user=False, tabfile=path))
        elif os.path.isfile(loc):
            self.append(CronTab(user=False, tabfile=loc))


class AnaCronTab(list):
    """Attempts to digest anacron entries (if possible)"""
    def __init__(self, loc, tabs=None):
        """Expand the job that runs the directory `loc` into one job per script.

        `tabs` is the collection of already loaded crontabs; its `all`
        view is searched for jobs whose command mentions `loc`. Without
        `tabs`, or when `loc` is not a directory, the list stays empty.
        """
        if not tabs or not os.path.isdir(loc):
            return
        # Container tab which receives the expanded jobs (see `add`).
        self.append(CronTab(user=False))
        jobs = list(tabs.all.find_command(loc))
        if not jobs:
            return
        for item in os.listdir(loc):
            self.add(loc, item, jobs[0])
        # The first matching job is removed once its scripts are expanded.
        jobs[0].delete()

    def add(self, loc, item, anajob):
        """Add a job for the script `item` in `loc`, scheduled like `anajob`.

        Returns the new job, or None when the entry is skipped: the
        '0anacron' entry, hidden entries and non-executable files.
        """
        path = os.path.join(loc, item)
        if item == '0anacron' or item[0] == '.' or not access(path, X_OK):
            return None
        job = self[0].new(command=path, user=anajob.user)
        # The comment is built from the text after the last '.' in `loc`
        job.set_comment('Anacron %s' % loc.split('.')[-1])
        job.setall(anajob)
        return job


# Each entry is (generator class, location); files are read directly and
# directories are listed by the generator class.
KNOWN_LOCATIONS = [
    # Known linux locations (Debian, RedHat, etc)
    (UserSpool, '/var/spool/cron/crontabs/'),
    (SystemTab, '/etc/crontab'),
    (SystemTab, '/etc/cron.d/'),
    # Anacron digestion (we want to know more)
    (AnaCronTab, '/etc/cron.hourly'),
    (AnaCronTab, '/etc/cron.daily'),
    (AnaCronTab, '/etc/cron.weekly'),
    (AnaCronTab, '/etc/cron.monthly'),
    # Known MacOSX locations
    # None
    # Other (windows, bsd)
    # None
]


class CronTabs(list):
    """Singleton list of all detectable crontabs"""
    _all = None
    _self = None

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on first use"""
        # Compare with None rather than testing truthiness: this class is a
        # list, so an existing but empty instance is falsy and would be
        # replaced by a fresh object on every call.
        if cls._self is None:
            cls._self = super(CronTabs, cls).__new__(cls, *args, **kw)
        return cls._self

    def __init__(self):
        """Scan KNOWN_LOCATIONS, unless the shared instance already has tabs"""
        # list.__init__ is deliberately not called: it would empty the
        # shared instance each time CronTabs() is invoked.
        if not self:
            for loc in KNOWN_LOCATIONS:
                self.add(*loc)

    def add(self, cls, *args):
        """Append every tab generated by `cls(*args, tabs=self)`"""
        for tab in cls(*args, tabs=self):
            self.append(tab)
            # Invalidate the cached combined view
            self._all = None

    @property
    def all(self):
        """Return a CronTab object with all jobs (read-only)"""
        if self._all is None:
            self._all = CronTab(user=False)
            for tab in self:
                for job in tab:
                    # Jobs without a user get the user of their tab
                    if job.user is None:
                        job.user = tab.user or 'unknown'
                    self._all.append(job)
        return self._all