Package zephir :: Package monitor :: Package agentmanager :: Module zephirservice
[frames] | no frames]

Source Code for Module zephir.monitor.agentmanager.zephirservice

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  ########################################################################### 
  8   
  9  """ 
 10  Services Twisted de collection et de publication de données. 
 11  """ 
 12   
 13  import locale, gettext, os, pwd, shutil, random 
 14  from glob import glob 
 15  from ConfigParser import ConfigParser 
 16   
 17  # install locales early 
 18  from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR 
 19  APP = 'zephir-agents' 
 20  DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n') 
 21  gettext.install(APP, DIR, unicode=False) 
 22   
 23   
 24  from twisted.application import internet, service 
 25  from twisted.internet import utils, reactor 
 26  from twisted.web import resource, server, static, util, xmlrpc 
 27  from twisted.python import syslog 
 28   
 29  from zephir.monitor.agentmanager import config as cfg 
 30  from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files, log 
 31  from zephir.monitor.agentmanager.web_resources import ZephirServerResource 
 32  from zephir.monitor.agentmanager.clientmanager import ClientManager 
 33   
 34  try: 
 35      import zephir.zephir_conf.zephir_conf as conf_zeph 
 36      from zephir.lib_zephir import zephir_proxy, convert, zephir_dir, update_sudoers 
 37      from zephir.lib_zephir import log as zeph_log 
 38      registered = 1 
 39  except: 
 40      # serveur non enregistré sur zephir 
 41      registered = 0 
 42   
43 -class ZephirService(service.MultiService):
44 """Main Twisted service for Zephir apps""" 45
def __init__(self, config, root_resource=None, serve_static=False):
    """Build the main service and, when needed, its own web server.

    config: mapping of settings; it is applied on top of a copy of
        cfg.DEFAULT_CONFIG, so keys it does not define keep their
        default value.
    root_resource: web resource to publish under. When None, a new
        resource is created and served on config['webserver_port'].
    serve_static: when true, config['static_web_dir'] is published as
        the 'static' child of the root resource.
    """
    service.MultiService.__init__(self)
    merged = cfg.DEFAULT_CONFIG.copy()
    merged.update(config)
    self.config = merged
    self.updater = None
    self.publisher = None
    # update the client scripts declared in sudoers
    if registered:
        update_sudoers()
    if root_resource is not None:
        # publish under the resource supplied by the caller
        self.root_resource = root_resource
    else:
        # no parent web server was given: run our own
        self.root_resource = resource.Resource()
        port = self.config['webserver_port']
        site = server.Site(self.root_resource)
        listener = internet.TCPServer(port, site)
        listener.setServiceParent(service.IServiceCollection(self))
    # serve global static files
    if serve_static:
        static_files = static.File(self.config['static_web_dir'])
        self.root_resource.putChild('static', static_files)
67 68 69 # subservices factory methods 70
def with_updater(self):
    """Attach an UpdaterService to this service.

    Must only be called while no updater is attached.
    Returns self so that calls can be chained.
    """
    assert self.updater is None
    updater = UpdaterService(self.config, self, self.root_resource)
    self.updater = updater
    return self
75
def with_publisher(self):
    """Attach a PublisherService to this service.

    Must only be called while no publisher is attached.
    Returns self so that calls can be chained.
    """
    assert self.publisher is None
    publisher = PublisherService(self.config, self, self.root_resource)
    self.publisher = publisher
    return self
80
82 assert self.updater is None 83 assert self.publisher is None 84 self.updater = UpdaterService(self.config, self, self.root_resource) 85 self.publisher = PublisherService(self.config, self, self.root_resource, 86 show_clients_page = False, 87 live_agents={self.config['host_ref']: self.updater.agents}) 88 return self
89 90 91 92
93 -class UpdaterService(service.MultiService, xmlrpc.XMLRPC):
94 """Schedules measures, data serialisation and upload.""" 95
def __init__(self, config, parent, root_resource):
    """Load the agents and plug the updater into its parent.

    config: complete configuration mapping (it is stored as is, no
        default value is applied here).
    parent: service this updater is attached to.
    root_resource: web resource under which this object is published
        as the 'xmlrpc' child.
    """
    service.MultiService.__init__(self)
    xmlrpc.XMLRPC.__init__(self)
    self.old_obs = None
    self.config = config
    # refresh the copy of the site.cfg file
    self.update_static_data()
    # report the locale in use, then start the subservices
    language, encoding = locale.getdefaultlocale()
    log.msg(_('default locale: %s encoding: %s') % (language, encoding))
    if encoding == 'utf':
        log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % encoding)
    self.agents = self.load_agents()
    # attach to parent service
    collection = service.IServiceCollection(parent)
    self.setServiceParent(collection)
    root_resource.putChild('xmlrpc', self)
113
def startService(self):
    """Start the subservices, switch logging to syslog, begin uploads.

    - schedule_all is called 2 seconds after start-up;
    - the first log observer is replaced by a syslog observer whose
      prefix depends on whether zephir.backend can be imported; the
      replaced observer is remembered in self.old_obs for stopService;
    - on a registered server, archive creation/upload is set up and a
      pending reboot lock file is acknowledged and removed.
    """
    service.MultiService.startService(self)
    reactor.callLater(2, self.schedule_all)
    # Change the log prefix (twisted by default).
    # The reference to the previous observer is kept to avoid problems
    # when the service is shut down.
    self.old_obs = log.theLogPublisher.observers[0]
    try:
        # only used to detect whether the zephir backend is installed
        from zephir.backend import config as conf_zeph
        log_prefix = 'zephir_backend'
    except Exception:
        log_prefix = 'zephiragents'
    new_obs = syslog.SyslogObserver(log_prefix,
                                    options=syslog.DEFAULT_OPTIONS,
                                    facility=syslog.DEFAULT_FACILITY)
    log.addObserver(new_obs.emit)
    log.removeObserver(self.old_obs)
    if registered != 0:
        # registered on zephir => start creating and sending archives
        self.setup_uucp()
        # If a reboot was requested, report that the restart went fine.
        # This is only meaningful on a registered server: zephir_dir and
        # zeph_log are imported together with the registration settings.
        lock_file = os.path.join(zephir_dir, 'reboot.lck')
        if os.path.isfile(lock_file):
            try:
                zeph_log('REBOOT', 0, 'redémarrage du serveur terminé')
                os.unlink(lock_file)
            except Exception:
                # best effort: the lock file stays in place, so the
                # acknowledgement is attempted again at next start
                log.msg(_("Could not acknowledge reboot (%s)") % lock_file)
141
def stopService(self):
    """Stop the subservices and restore the original log observer.

    Returns whatever MultiService.stopService returns, so that a caller
    waiting for the child services to finish stopping can do so.
    """
    if self.old_obs:
        # drop the first registered observer (the syslog observer when
        # startService installed it) and put the saved one back
        log.removeObserver(log.theLogPublisher.observers[0])
        log.addObserver(self.old_obs)
    return service.MultiService.stopService(self)
148
def load_agents(self):
    """Load every agent declared in the configuration directory.

    Each '*.agent' file of config['config_dir'] is executed and must
    leave a sequence of agents in the name AGENTS. Every agent gets its
    data directory initialised, is bound to this manager and archived
    once.

    Returns a dictionary mapping agent names to agents.
    """
    log.msg(_("Loading agents from %s...") % self.config['config_dir'])
    agents = {}
    pattern = os.path.join(self.config['config_dir'], "*.agent")
    for agent_file in glob(pattern):
        log.msg(_(" from %s:") % os.path.basename(agent_file))
        namespace = {'AGENTS': None}
        execfile(agent_file, globals(), namespace)
        assert 'AGENTS' in namespace
        for agent in namespace['AGENTS']:
            # two agents must not share a name: the later one would
            # overwrite the earlier one in the dictionary
            assert agent.name not in agents
            # init agent data and do a first archive
            data_dir = os.path.join(self.config['state_dir'],
                                    self.config['host_ref'],
                                    agent.name)
            agent.init_data(data_dir)
            agent.manager = self
            agent.archive()
            agents[agent.name] = agent
            log.msg(_(" %s, period %d") % (agent.name, agent.period))
    log.msg(_("Loaded."))
    return agents
171 172 173 # scheduling measures 174
def schedule(self, agent_name):
    """Schedule the periodic measures of one loaded agent.

    A timer service named after the agent is attached to this service;
    nothing is scheduled when the agent's period is not positive.
    """
    assert agent_name in self.agents
    agent = self.agents[agent_name]
    if not (agent.period > 0):
        return
    timer = internet.TimerService(agent.period,
                                  self.wakeup_for_measure, agent_name)
    timer.setName(agent_name)
    timer.setServiceParent(service.IServiceCollection(self))
183 184
def wakeup_for_measure(self, agent_name):
    """Callback for scheduled measures: run one on the named agent."""
    assert agent_name in self.agents
    agent = self.agents[agent_name]
    agent.scheduled_measure()
190 191
def schedule_all(self):
    """Schedule every loaded agent.

    Before an agent is scheduled, its available actions are loaded:
    the standard ones (sub-directory 'eole' of config['action_dir'])
    first, then the local ones, so that a local action replaces the
    standard action of the same name. Only names starting with
    'action_' are attached to the agent.
    """
    for agent_name, agent in self.agents.items():
        search_path = (os.path.join(self.config['action_dir'], 'eole'),
                       self.config['action_dir'])
        for action_dir in search_path:
            actions_file = os.path.join(action_dir,
                                        "%s.actions" % agent_name)
            if not os.path.isfile(actions_file):
                continue
            namespace = {}
            execfile(actions_file, globals(), namespace)
            for attr, value in namespace.items():
                if attr.startswith('action_'):
                    setattr(agent, attr, value)
        # no explicit first measure is triggered here (that call is
        # disabled); the agent is only handed to schedule()
        self.schedule(agent_name)
210 211
def timer_for_agent_named(self, agent_name):
    """Return the subservice registered under the name of a loaded agent."""
    assert agent_name in self.agents
    timer = self.getServiceNamed(agent_name)
    return timer
215 216 217 # data upload to zephir server 218
def setup_uucp(self):
    """Prepare the upload area and schedule the first upload.

    The upload period is asked to zephir; when it cannot be obtained or
    is below 30 seconds, config['upload_period'] is used instead. The
    first upload is delayed by a random amount so that many services
    restarted at the same time (e.g. by crontab) do not all connect at
    once.
    """
    ensure_dirs(self.config['uucp_dir'])
    self.update_static_data()
    # fetch the connection delay from zephir
    try:
        reload(conf_zeph)
        # Remove statistics directories that do not belong to this
        # server (left e.g. by a 'manual' zephir unregistration).
        # On zephir itself nothing is removed: 0 is always kept to avoid
        # conflicts with the registered servers.
        if not os.path.isfile('/etc/init.d/zephir'):
            state_dir = self.config['state_dir']
            own_dir = str(conf_zeph.id_serveur)
            for st_dir in os.listdir(state_dir):
                st_path = os.path.join(state_dir, st_dir)
                # plain files (such as the ignore_list read by
                # wakeup_for_upload) are not statistics directories and
                # rmtree would fail on them
                if st_dir == own_dir or not os.path.isdir(st_path):
                    continue
                try:
                    shutil.rmtree(st_path)
                except OSError:
                    # a directory that cannot be removed must not
                    # prevent the delay lookup below
                    log.msg(_("Could not remove %s") % st_path)
        # ask zephir for the connection delay
        period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
    except Exception:
        period = 0

    if period < 30:
        period = self.config['upload_period']
        log.msg(_('Using default period : %s seconds') % period)
    # random offset (between 30 seconds and period) for the first run
    if period > 30:
        delay = random.randrange(30, period)
    else:
        # randrange(30, period) would raise ValueError (empty range)
        delay = period
    reactor.callLater(delay, self.wakeup_for_upload)
245
def update_static_data(self):
    """Copy site.cfg to this host's client data directory when needed.

    Nothing happens when config['config_dir'] holds no site.cfg file.
    Otherwise the file is copied when the destination copy is missing
    (or its modification time cannot be read) or is older than the
    source.
    """
    source = os.path.join(self.config['config_dir'], 'site.cfg')
    if not os.path.isfile(source):
        return
    target_dir = cfg.client_data_dir(self.config, self.config['host_ref'])
    ensure_dirs(target_dir)
    try:
        source_mtime = os.path.getmtime(source)
        target_mtime = os.path.getmtime(os.path.join(target_dir, 'site.cfg'))
    except OSError:
        outdated = True
    else:
        outdated = source_mtime > target_mtime
    if outdated:
        shutil.copy(source, target_dir)
259
def wakeup_for_upload(self, recall=True):
    """Archive the agents' data and start the copy/archive/upload chain.

    recall: when true, this method schedules itself again after the
        upload period (asked to zephir, with config['upload_period'] as
        fallback when it is unavailable or below 30 seconds).

    The data of the instantiated agents is copied with /bin/cp into a
    temporary client directory; on success _make_archive is called, on
    failure a message is logged.
    """
    # re-read the connection delay from zephir
    try:
        reload(conf_zeph)
        timeout = zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)
        period = convert(timeout[1])
    except:
        period = 0
    if period < 30:
        period = self.config['upload_period']
        log.msg(_('Using default period : %s seconds') % period)
    # call this function again after the requested delay
    if recall:
        reactor.callLater(period, self.wakeup_for_upload)

    for agent in self.agents.values():
        agent.archive()
    self.update_static_data()

    # temporary directory named after the server id, or after host_ref
    # when no usable id is available
    try:
        assert conf_zeph.id_serveur != 0
        client_dir = os.path.join(self.config['tmp_data_dir'],
                                  str(conf_zeph.id_serveur))
    except:
        client_dir = os.path.join(self.config['tmp_data_dir'],
                                  self.config['host_ref'])
    # purge the temporary directory
    try:
        if os.path.isdir(client_dir):
            shutil.rmtree(client_dir)
        os.makedirs(client_dir)
    except:
        pass

    host_data_dir = cfg.client_data_dir(self.config, self.config['host_ref'])
    cp_args = ['-Rf', os.path.abspath(os.path.join(host_data_dir, 'site.cfg'))]
    ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],
                                               'ignore_list'))
    if os.path.exists(ignore_file):
        cp_args.append(ignore_file)
    # Only the data of instantiated agents is copied: this avoids
    # sending e.g. rvp statistics when that service has been disabled.
    for agent_name in self.agents.keys():
        agent_dir = cfg.agent_data_dir(self.config, self.config['host_ref'],
                                       agent_name)
        cp_args.append(os.path.abspath(agent_dir))
    cp_args.append(os.path.abspath(client_dir))

    def copy_failed(failure):
        return log.msg(_("/!\ copy failed (%s)\n"
                         "data: %s")
                       % (failure, self.config['state_dir']))

    copying = utils.getProcessOutput('/bin/cp', args=cp_args)
    copying.addCallbacks(self._make_archive, copy_failed)
308
def _check_md5(self):
    """Write the md5 sums of config.eol and of the patches.

    The files to check come from md5files[cfg.distrib_version], as
    (source, destination, pattern) triples relative to
    /usr/share/eole/creole. The sums are written to a 'config<id>.md5'
    file in config['tmp_data_dir'], <id> being the server id or, when
    no usable id is available, config['host_ref'].
    """
    rep_src = "/usr/share/eole/creole"
    rep_conf = "/etc/eole"
    data = []
    for src, dst, pattern in md5files[cfg.distrib_version]:
        if src == 'variables.eol':
            # special case: variables.eol is regenerated every time
            orig_eol = os.path.join(rep_conf, 'config.eol')
            if os.path.isfile(orig_eol):
                var_eol = os.path.join(rep_src, 'variables.eol')
                # write one variable:value line per variable, sorted by
                # variable name
                conf = ConfigParser()
                conf.read(orig_eol)
                var_names = conf.sections()
                var_names.sort()
                f_var = open(var_eol, 'w')
                try:
                    for var_name in var_names:
                        if var_name != 'mode_zephir':
                            # NOTE(review): eval() runs whatever
                            # expression config.eol contains; confirm
                            # that this file is always trusted
                            f_var.write("%s:%s\n" % (var_name, ''.join(eval(conf.get(var_name, 'val')))))
                finally:
                    # close the file even when a value cannot be read
                    f_var.close()
        if os.path.isdir(os.path.join(rep_src, src)):
            # directory: check every file it contains
            fics = [(os.path.join(src, fic), os.path.join(dst, fic))
                    for fic in os.listdir(os.path.join(rep_src, src))]
        else:
            fics = [(src, dst)]
        for fic, fic_dst in fics:
            if os.path.isfile(os.path.join(rep_src, fic)):
                if (pattern is None) or fic.endswith(pattern):
                    md5res = md5file(os.path.join(rep_src, fic))
                    data.append("%s %s\n" % (md5res, fic_dst))
    try:
        assert conf_zeph.id_serveur != 0
        outf = open(os.path.join(self.config['tmp_data_dir'], "config%s.md5" % str(conf_zeph.id_serveur)), "w")
    except:
        outf = open(os.path.join(self.config['tmp_data_dir'], "config%s.md5" % self.config['host_ref']), "w")
    try:
        outf.writelines(data)
    finally:
        outf.close()
347
def _get_packages(self, *args):
    """Write the list of installed packages.

    The output of 'dpkg-query -W' is redirected to a
    'packages<id>.list' file in config['tmp_data_dir'], <id> being the
    server id or, when no usable id is available, config['host_ref'].
    """
    try:
        assert conf_zeph.id_serveur != 0
        suffix = str(conf_zeph.id_serveur)
    except:
        suffix = self.config['host_ref']
    list_file = os.path.join(self.config['tmp_data_dir'],
                             "packages%s.list" % suffix)
    os.system("/usr/bin/dpkg-query -W >" + list_file)
357
def _make_archive(self, *args):
    """Build the archive of the data to send.

    The md5 sums and the package list are generated first, then
    config['tmp_data_dir'] is archived with /bin/tar (entries named
    'private' excluded) into 'site<id>.tar' in config['uucp_dir'].
    On success _try_chown is called with the archive path, on failure
    a message is logged.
    """
    self._check_md5()
    self._get_packages()
    # compress the data to send
    try:
        assert conf_zeph.id_serveur != 0
        suffix = str(conf_zeph.id_serveur)
    except:
        suffix = self.config['host_ref']
    tarball = os.path.join(self.config['uucp_dir'], 'site%s.tar' % suffix)
    tar_cwd, tar_dir = os.path.split(os.path.abspath(self.config['tmp_data_dir']))

    def archive_failed(failure):
        return log.msg(_("/!\ archiving failed (%s)\n"
                         "data: %s\narchive: %s")
                       % (failure, self.config['state_dir'], tarball))

    tar_args = ('czf', tarball,
                '--exclude', 'private',
                '-C', tar_cwd,
                tar_dir)
    archiving = utils.getProcessOutput('/bin/tar', args=tar_args)
    archiving.addCallbacks(self._try_chown, archive_failed,
                           callbackArgs=[tarball])
379
380 - def _try_chown(self, tar_output, tarball):
381 try: 382 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4] 383 uid = os.getuid() 384 os.chown(tarball, uucp_uid, uucp_gid) # only change group id so that uucp can read while we can still write 385 except OSError, e: 386 log.msg("/!\ chown error, check authorizations (%s)" % e) 387 # upload uucp 388 # on fait également un chown sur le fichier deffered_logs au cas ou il serait en root 389 try: 390 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4] 391 os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid) 392 except: 393 log.msg("/!\ chown error on deffered_logs") 394 os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
395 396 397 # xmlrpc methods 398
def xmlrpc_list_agents(self):
    """@return: list of the names of the loaded agents"""
    agent_names = self.agents.keys()
    return agent_names
xmlrpc_list_agents.signature = [['array']]
404 - def xmlrpc_agents_menu(self):
405 """@return: Liste des agents chargés et structure d'affichage""" 406 try: 407 menu = {} 408 for name, agent in self.agents.items(): 409 if agent.section != None: 410 if not menu.has_key(agent.section): 411 menu[agent.section] = [] 412 menu[agent.section].append((name, agent.description)) 413 return menu 414 except Exception, e: 415 log.msg(e)
416 xmlrpc_agents_menu.signature = [['struct']] 417
def xmlrpc_status_for_agents(self, agent_name_list=None):
    """
    @param agent_name_list: names of the agents to query; when empty or
    omitted, every loaded agent is queried.
    @return: the statuses of the listed agents, as a dictionary
    C{{name:status}}. Each status is itself a dictionary (the result of
    the status' C{to_dict()}) with the keys C{'level'} and
    C{'message'}. Only the names of agents that are actually loaded
    appear among the keys of the dictionary.
    """
    if not agent_name_list:
        agent_name_list = self.agents.keys()
    result = {}
    for agent_name in agent_name_list:
        if agent_name in self.agents:
            result[agent_name] = self.agents[agent_name].check_status().to_dict()
    return result
# each signature lists the return type first, then the argument types:
# a struct is returned, the array of names is optional
xmlrpc_status_for_agents.signature = [['struct'], ['struct', 'array']]
def xmlrpc_reset_max_status_for_agents(self, agent_name_list=None):
    """Call reset_max_status() on the listed agents.

    @param agent_name_list: names of the agents to reset; when empty or
    omitted, every loaded agent is reset. Names that match no loaded
    agent are ignored.
    @return: the string C{"ok"}
    """
    if not agent_name_list:
        agent_name_list = self.agents.keys()
    for agent_name in agent_name_list:
        if agent_name in self.agents:
            self.agents[agent_name].reset_max_status()
    return "ok"
442
444 self.wakeup_for_upload(False) 445 return "ok"
446 447
class PublisherService(service.MultiService):
    """Serves the web interface for current agent data"""

    def __init__(self, config, parent, root_resource,
                 live_agents=None,
                 show_clients_page=True):
        """Attach the publisher to its parent and publish its pages.

        config: complete configuration mapping (stored as is).
        parent: service this publisher is attached to.
        root_resource: web resource receiving the 'agents' child and
            the redirection of the empty path.
        live_agents: passed to the ClientManager as is.
        show_clients_page: when false, the empty path redirects
            straight to the page of config['host_ref'] instead of the
            './agents/' page.
        """
        service.MultiService.__init__(self)
        self.config = config
        self.show_clients_page = show_clients_page
        self.manager = ClientManager(self.config, live_agents)
        # attach to parent service
        collection = service.IServiceCollection(parent)
        self.setServiceParent(collection)
        # publish the agents pages
        agents_resource = ZephirServerResource(self.config, self.manager)
        root_resource.putChild('agents', agents_resource)
        if self.show_clients_page:
            landing_page = './agents/'
        else:
            landing_page = './agents/' + self.config['host_ref'] + '/'
        root_resource.putChild('', util.Redirect(landing_page))
# TODO
# update resources: loading host structures, manager -> agent dict
# connect publisher and updater to zephir service (web server, config...)

# client manager: list of host_ref, {host_ref => agent_manager}
# agent manager: structure, {name => agent_data}