Package zephir :: Package monitor :: Package agentmanager :: Module agent
[frames] | no frames]

Source Code for Module zephir.monitor.agentmanager.agent

  1  # -*- coding: UTF-8 -*- 
  2  ########################################################################### 
  3  # Eole NG - 2007 
  4  # Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon) 
  5  # Licence CeCill  cf /root/LicenceEole.txt 
  6  # eole@ac-dijon.fr 
  7  ########################################################################### 
  8   
  9  try: _ # localized string fetch function 
 10  except NameError: _ = str 
 11   
 12  import os, traceback 
 13  from datetime import timedelta 
 14  from ConfigParser import ConfigParser 
 15  from twisted.internet import defer 
 16  from twisted.web.microdom import * 
 17  # from twisted.persisted import marmalade 
 18  from twisted.persisted import aot 
 19   
 20  from zephir.monitor.agentmanager import config as cfg, rrd, status, util 
 21  from zephir.monitor.agentmanager.data import * 
 22  from zephir.monitor.agentmanager.util import log 
 23  from zephir.lib_zephir import log as zephir_log, is_locked 
 24   
 25  action_map = {0:'action_unknown',1:'action_ok',2:'action_warn',3:'action_error',4:'action_dependant'} 
 26   
27 -class AgentData:
28 """Persistance et accès aux données d'un agent. 29 30 Cette classe contient la « charge utile » d'un agent ; avant envoi 31 au serveur Zephir tous les agents sont convertis en une instance 32 de C{L{AgentData}}. 33 34 Attributs : 35 36 - C{name} : nom de l'agent 37 38 - C{period} : intervalle de mesures en secondes ; C{period=0} 39 désactive les mesures automatiques. 40 41 - C{description} : description de cette instance d'agent, 42 apparaissant sur la page de l'agent. 43 44 - C{max_status} : L{status<zephir.monitor.agentmanager.status>} le plus grave ; réinitialisé à chaque 45 envoi au serveur Zephir ou sur demande via XML-RPC. 46 47 - C{last_measure_date} : date de la dernière mesure. 48 49 - C{data} : liste d'objets représentant les données associées à 50 l'agent (cf. C{L{zephir.monitor.agentmanager.data}}). 51 """ 52
53 - def __init__(self, name, period, description, section, 54 max_status, max_status_date, last_status, last_measure_date, data, measure_data={}):
55 self.name = name 56 self.period = period 57 self.description = description 58 self.section = section 59 self.max_status = max_status 60 self.last_status = last_status 61 self.max_status_date = max_status_date 62 self.last_measure_date = last_measure_date 63 self.data = data 64 self.measure_data = measure_data
65 66
67 - def from_agent(self, agent):
68 """I{Factory Method} 69 70 @param agent: un agent concret 71 72 @return: une copie de C{agent} ne contenant plus que les 73 données utiles au serveur Zephir (instance de C{AgentData} 74 """ 75 result = self(agent.name, agent.period, agent.description, agent.section, 76 agent.max_status, agent.max_status_date, agent.last_status, agent.last_measure_date, agent.data, agent.measure_data) 77 return result
78 from_agent = classmethod(from_agent) 79 80
81 - def from_archive(self, archive_dir):
82 """I{Factory Method} 83 84 @param archive_dir: le chemin d'un répertoire contenant les 85 données d'un agent 86 87 @return: une instance de C{AgentData} chargée depuis le 88 système de fichiers 89 """ 90 xml_filename = os.path.join(archive_dir, 'agent.xml') 91 xml_file = file(xml_filename, 'r') 92 try: 93 # passage de persisted.marmalade à persisted.aot 94 try: 95 result = aot.unjellyFromSource(xml_file) 96 except: 97 xml_file.seek(0) 98 from twisted.persisted import marmalade 99 result = marmalade.unjellyFromXML(xml_file) 100 except Exception, e: 101 error_agent = LoadErrorAgent( 102 os.path.basename(archive_dir), 103 title = _("Loading archive %s failed") % archive_dir, 104 message = _("Loading agent from archive %s failed:\n%s") % (archive_dir, e)) 105 result = AgentData.from_agent(error_agent) 106 xml_file.close() 107 return result
108 from_archive = classmethod(from_archive) 109 110
111 - def archive(self, archive_dir):
112 """Sérialise l'agent sur disque, cf. L{from_archive}""" 113 xml_filename = os.path.join(archive_dir, 'agent.xml') 114 xml_file = file(xml_filename, 'w') 115 # écriture avec aot (marmalade deprecated) 116 aot.jellyToSource(self, xml_file) 117 # marmalade.jellyToXML(self, xml_file) 118 xml_file.close()
119
120 - def ensure_data_uptodate(self):
121 """Met à jour les données de l'agent sur disque""" 122 pass
123 124 125 126 127 STATUS_GRAPH_OPTIONS = [ 128 "-send-7days", "-l0", "-u1", "-g", 129 "-w112", "-h10", "-xHOUR:6:DAY:1:DAY:1:0:%d", 130 "CDEF:unknown=status,0,EQ", 131 "CDEF:ok=status,1,EQ", 132 "CDEF:warn=status,2,EQ", 133 "CDEF:error=status,3,EQ", 134 "AREA:unknown#666666", 135 "AREA:ok#33BB33", 136 "AREA:warn#CC6600", 137 "AREA:error#BB3333", 138 ] 139 140 STATUS_GRAPH_OPTIONS_MONTHLY = [ 141 "-send-1month", "-l0", "-u1", "-g", 142 "-w300", "-h10", "-xDAY:1:WEEK:1:DAY:7:0:%d", 143 "CDEF:unknown=status,0,EQ", 144 "CDEF:ok=status,1,EQ", 145 "CDEF:warn=status,2,EQ", 146 "CDEF:error=status,3,EQ", 147 "AREA:unknown#666666", 148 "AREA:ok#33BB33", 149 "AREA:warn#CC6600", 150 "AREA:error#BB3333", 151 ] 152
153 -def no_action(agent, old_status, new_status):
154 log.msg('%s : status changed to %s' % (agent.name, new_status))
155
156 -class Agent(AgentData):
157 """Classe abstraite des agents. 158 159 Un agent concret est une sous-classe d'C{L{Agent}} implémentant 160 (en particulier) la méthode C{L{measure()}}. 161 """ 162
163 - def __init__(self, name, 164 period = 60, 165 fields = ['value'], 166 description = None, 167 section = None, 168 modules = None, 169 requires = [], 170 **params):
171 if description is None: 172 description = self.__class__.__doc__ 173 AgentData.__init__(self, name, period, description, section, 174 status.Unknown(), "", status.Unknown(), "", [], {}) 175 self.fields = fields 176 self.data_needs_update = False 177 # those will be set by loader, eg UpdaterService 178 self.archive_dir = None 179 self.status_rrd = None 180 self.manager = None 181 # dépendances fonctionnelles 182 self.requires = requires 183 self.modules = modules 184 self.last_measure = None
185
186 - def init_data(self, archive_dir):
187 """Mémorise et initialise le répertoire d'archivage 188 189 Cette méthode sera appelée par le framework après chargement 190 de l'agent, afin de terminer les initialisations pour 191 lesquelles l'agent a besoin de connaître l'emplacement de ses 192 données sur disque. 193 """ 194 self.archive_dir = archive_dir 195 self.ensure_datadirs() 196 if self.period != 0: # period = 0 => no agent measures scheduled 197 status_period, xff = self.period, 0.75 198 else: 199 status_period, xff = 60, 1 200 201 # on récupère le dernier problème et sa date dans l'archive si elle est présente 202 xml_file = os.path.join(self.archive_dir, 'agent.xml') 203 if os.path.exists(xml_file): 204 try: 205 a = AgentData("temp", 60, "agent temporaire",status.Unknown(), "", status.Unknown(), "", [], {}) 206 a = a.from_archive(self.archive_dir) 207 self.max_status = a.max_status 208 self.max_status_date = a.max_status_date 209 del a 210 except: 211 pass 212 213 statusname = os.path.join(self.archive_dir, 'status') 214 self.status_rrd = rrd.Database(statusname + '.rrd', 215 step = status_period) 216 self.status_rrd.new_datasource( # status history 217 name = "status", 218 min_bound = 0, max_bound = len(status.STATUS_ORDER)) 219 if status_period > 3600: 220 # 1 record per week on 1 year 221 self.status_rrd.new_archive( 222 rows = 52, steps = (3600*24*7) / status_period, 223 consolidation = 'MAX', xfiles_factor = 0.75) 224 else: 225 # 1 record per 12 hours on 1 year 226 self.status_rrd.new_archive( 227 rows = 730, steps = (3600 * 12) / status_period, 228 consolidation = 'MAX', xfiles_factor = 0.75) 229 # 1 record per 2hour/pixel on 30 days 230 self.status_rrd.new_archive( 231 rows = 12*30, steps = 7200 / status_period, 232 consolidation = 'MAX', xfiles_factor = 0.75) 233 # 1 record per hour/pixel on 1 week 234 self.status_rrd.new_archive( 235 rows = 24*7, steps = 3600 / status_period, 236 consolidation = 'MAX', xfiles_factor = 0.75) 237 # 1 record per period on 1 day 238 self.status_rrd.new_archive( 239 rows = 24*3600/status_period, steps = 1, 240 consolidation = 'MAX', xfiles_factor = 0.75) 241 self.status_rrd.new_graph(pngname = statusname + '.png', 242 vnamedefs = { "status": ("status", 'MAX') }, 243 options = STATUS_GRAPH_OPTIONS) 244 self.status_rrd.new_graph(pngname = statusname + '_long.png', 245 vnamedefs = { "status": ("status", 'MAX') }, 246 options = STATUS_GRAPH_OPTIONS_MONTHLY) 247 # vérification ajoutée pour recréer les anciennes archives (1 semaine de données seulement) 248 try: 249 rrd_info = self.status_rrd.info() 250 if rrd_info: 251 if rrd_info.has_key('rra[1].cf') and not rrd_info.has_key('rra[2].cf'): 252 # suppression du fichier si 2 sources 253 os.unlink(statusname + '.rrd') 254 except: 255 # fichier rrd illisible, on le supprime 256 os.unlink(statusname + '.rrd') 257 self.status_rrd.create()
258 259 # Méthodes à spécialiser dans les agents concrets 260
261 - def measure(self):
262 """Prend concrètement une mesure. 263 264 Pour implémenter un agent, il faut implémenter au moins cette 265 méthode. 266 267 @return: Résultat de la mesure, un dictionnaire C{{champ: 268 valeur}} ou un objet C{L{twisted.internet.defer.Deferred}} 269 renvoyant ce dictionnaire. 270 """ 271 raise NotImplementedError
272
273 - def check_status(self):
274 """Renvoie le diagnostic de fonctionnement de l'agent. 275 276 L'implémentation par défaut dans C{L{Agent}} renvoie un statut 277 neutre. Les agents concrets doivent donc redéfinir cette 278 méthode pour annoncer un diagnostic utile. 279 """ 280 log.msg(_("Warning: agent class %s does not redefine method check_status()") 281 % self.__class__.__name__) 282 return status.Unknown()
283
284 - def update_status(self):
285 new_status = None 286 # si possible, on vérifie le dernier état des agents dont on dépend 287 if self.manager is not None: 288 for agent in self.requires: 289 try: 290 if self.manager.agents[agent].last_status == status.Error(): 291 new_status = status.Dependant() 292 except: 293 # agent non chargé ou état inconnu 294 pass 295 if new_status is None: 296 new_status = self.check_status() 297 # mise à jour de l'état 298 self.set_status(new_status)
299
300 - def set_status(self, s, reset = False):
301 """Mémorise le statut et met à jour C{statut_max} 302 303 @param s: statut actuel 304 @param reset: réinitialise C{max_status} à C{s} si C{reset==True} 305 """ 306 # Si on détecte un nouveau problème, on met à jour max_status et max_status_date 307 if s > self.last_status and s not in [status.OK(),status.Dependant()]: 308 self.max_status = s 309 self.max_status_date = self.last_measure_date 310 if self.last_status != s: 311 # l'état a changé, on execute éventuellement des actions 312 self.take_action(self.last_status, s) 313 self.last_status = s
314
315 - def reset_max_status(self):
316 """Réinitialise C{max_status} à la valeur courante du status 317 """ 318 self.set_status(self.check_status(), reset = True)
319 320 321 # Gestion des mesures
322 - def scheduled_measure(self):
323 """Déclenche une mesure programmée. 324 325 Prend une mesure et mémorise le résultat et l'heure. 326 """ 327 # Si un lock indique qu'un reconfigure est en cours, on ne prend pas de mesure 328 if not is_locked('actions'): 329 now = util.utcnow() 330 try: 331 m = self.measure() 332 except Exception, e: 333 print "erreur mesure (%s)" % self.name 334 traceback.print_exc() 335 self.handle_measure_exception(e) 336 else: 337 if isinstance(m, defer.Deferred): 338 m.addCallbacks( 339 lambda m: self.save_measure(Measure(now, m)), 340 lambda f: self.handle_measure_exception(f.trap(Exception))) 341 else: 342 self.save_measure(Measure(now, m))
343
344 - def save_measure(self, measure):
345 """Mémorise une mesure donnée. 346 347 Méthode à redéfinir dans les sous-classes concrètes de C{L{Agent}}. 348 (callback de succès pour C{L{scheduled_measure()}}) 349 """ 350 self.last_measure_date = measure.get_strdate() 351 self.data_needs_update = True 352 self.last_measure = measure 353 self.update_status() 354 self.status_rrd.update({'status': self.last_status.num_level()}, 355 util.utcnow())
356
357 - def check_action(self, action_name):
358 """retourne la liste des actions autorisées/interdites pour cet agent 359 """ 360 authorized = False 361 if self.manager: 362 for cfg_level in ["_eole", "_acad", ""]: 363 cfg_file = os.path.join(self.manager.config['action_dir'], 'actions%s.cfg' % cfg_level) 364 if os.path.isfile(cfg_file): 365 action_mngr = ConfigParser() 366 action_mngr.read(cfg_file) 367 try: 368 authorized = eval(action_mngr.get(self.name, action_name)) 369 except: 370 pass 371 return authorized
372
373 - def take_action(self, old_status, new_status):
374 """exécute des actions en cas de changement de status 375 """ 376 # si un fichier /var/run/actions existe, on n'exécute aurcune action 377 if not is_locked('actions'): 378 # recherche des actions prévues pour l'agent dans cet état 379 action_name = action_map.get(new_status.num_level(), None) 380 if action_name: 381 if self.check_action(action_name): 382 action_func = getattr(self, action_name, no_action) 383 # exécution des actions si besoin 384 msg = action_func(self, old_status, new_status) 385 # si un message est renvoyé, on le loggue sur zephir 386 if msg: 387 # action,etat,msg 388 log.msg(msg) 389 zephir_log('SURVEILLANCE',0,msg) 390 else: 391 log.msg('status not defined in action_map')
392
393 - def handle_measure_exception(self, exc):
394 """Callback d'erreur pour C{L{scheduled_measure()}} 395 """ 396 log.msg(_("/!\ Agent %s, exception during measure: %s") 397 % (self.name, str(exc))) 398 self.set_status(status.Error(str(exc)))
399 400
401 - def archive(self):
402 """Crée l'archive de l'agent sur disque 403 """ 404 self.ensure_data_uptodate() 405 AgentData.from_agent(self).archive(self.archive_dir)
406 407
408 - def ensure_data_uptodate(self):
409 self.ensure_datadirs() 410 if self.data_needs_update: 411 # would need locking here if multithreaded (archive must have 412 # up-to-date datas) 413 # for d in self.data: 414 # d.ensure_uptodate() 415 self.write_data() 416 self.update_status() 417 self.data_needs_update = False;
418
419 - def write_data(self):
420 """Écrit les données générées par l'agent sur disque 421 422 Méthode à redéfinir si nécessaire dans les sous-classes. 423 """ 424 additional_args = [] 425 this_morning = util.utcnow().replace(hour = 0, minute = 0, second = 0, microsecond = 0) 426 for i in range(7): 427 t = this_morning - i*timedelta(days = 1) 428 additional_args += "VRULE:%s#000000" % rrd.rrd_date_from_datetime(t) 429 self.status_rrd.graph_all()
430 431 # convenience method
432 - def ensure_datadirs(self):
433 """Méthode de convenance, cf C{L{zephir.monitor.agentmanager.util.ensure_dir}} 434 """ 435 assert self.archive_dir is not None 436 util.ensure_dirs(self.archive_dir)
437 438
439 -class TableAgent(Agent):
440 """Agent concret mémorisant ses mesures dans une table. 441 442 Les valeurs mesurées peuvent être non-numériques. 443 """ 444
445 - def __init__(self, name, 446 max_measures=100, 447 **params):
448 """ 449 @param max_measures: nombre maximal de mesures mémorisées 450 """ 451 Agent.__init__(self, name, **params) 452 assert max_measures >= 1 453 self.max_measures = max_measures 454 self.measures = [] 455 self.data = [MeasuresData(self.measures, self.fields)]
456 457
458 - def save_measure(self, measure):
459 """Maintient la table de mesures triée et en dessous de la taille 460 maximale (cf. C{Agent.save_measure}). 461 """ 462 Agent.save_measure(self, measure) 463 # drop old measures to keep list size under max_measures 464 # can't slice because the list must be modified in place 465 for x in range(max(0, len(self.measures) - self.max_measures +1)): 466 self.measures.pop(0) 467 self.measures.append(measure) 468 self.measures.sort()
469 470 471 472
473 -class MultiRRDAgent(Agent):
474 """Classe abstraite pour les agents utilisant RRDtool. 475 476 Les valeurs mesurées étant visualisées sous forme de graphes, 477 elles doivent être numériques. 478 479 Les agents de cette classe maintiennent plusieurs bases de données RRD et 480 génèrent des graphes au format PNG de leurs données. 481 """ 482
483 - def __init__(self, name, 484 datasources, archives, graphs, 485 **params):
486 """ 487 Les paramètres C{datasources}, C{archives} et C{graphs} sont 488 des listes de paramètres pour la configuration d'une L{base 489 RRD<agentmanager.rrd.Database>}. 490 """ 491 Agent.__init__(self, name, **params) 492 self.datasources = datasources 493 self.archives = archives 494 self.graphs = graphs 495 self.data = []
496 497
498 - def init_data(self, archive_dir):
499 """Crée et initialise la base RRD dans C{archive_dir}. 500 """ 501 Agent.init_data(self, archive_dir) 502 self.rrd = {} 503 arch_names = self.archives.keys() 504 arch_names.sort() 505 for name in arch_names: 506 rrdname = name + '.rrd' 507 self.rrd[name] = rrd.Database(os.path.join(self.archive_dir, rrdname), 508 step = self.period) 509 self.data.append(RRDFileData(rrdname)) 510 for ds in self.datasources[name]: self.rrd[name].new_datasource(**ds) 511 for rra in self.archives[name]: self.rrd[name].new_archive(**rra) 512 for g in self.graphs[name]: 513 self.data.append(ImgFileData(g['pngname'])) 514 self.data.append(HTMLData('<br>')) 515 g['pngname'] = os.path.join(self.archive_dir, g['pngname']) 516 self.rrd[name].new_graph(**g) 517 self.rrd[name].create()
518
519 - def save_measure(self, measure):
520 Agent.save_measure(self, measure) 521 # on ne prend que les mesures déclarées comme datasources 522 for name in self.archives.keys(): 523 rrd_values = {} 524 for datasource in self.datasources[name]: 525 if measure.value.has_key(datasource['name']): 526 rrd_values[datasource['name']] = measure.value[datasource['name']] 527 # update des bases rrd 528 self.rrd[name].update(rrd_values, measure.get_date())
529
530 - def write_data(self):
531 Agent.write_data(self) 532 for name in self.archives.keys(): 533 self.rrd[name].graph_all()
534 535
536 -class RRDAgent(Agent):
537 """Classe abstraite pour les agents utilisant RRDtool. 538 539 Les valeurs mesurées étant visualisées sous forme de graphes, 540 elles doivent être numériques. 541 542 Les agents de cette classe maintiennent une base de données RRD et 543 génèrent des graphes au format PNG de leurs données. 544 """ 545
546 - def __init__(self, name, 547 datasources, archives, graphs, 548 **params):
549 """ 550 Les paramètres C{datasources}, C{archives} et C{graphs} sont 551 des listes de paramètres pour la configuration d'une L{base 552 RRD<agentmanager.rrd.Database>}. 553 """ 554 Agent.__init__(self, name, **params) 555 self.datasources = datasources 556 self.archives = archives 557 self.graphs = graphs 558 self.data = []
559 560
561 - def init_data(self, archive_dir):
562 """Crée et initialise la base RRD dans C{archive_dir}. 563 """ 564 Agent.init_data(self, archive_dir) 565 rrdname = self.name + '.rrd' 566 self.rrd = rrd.Database(os.path.join(self.archive_dir, rrdname), 567 step = self.period) 568 self.data.append(RRDFileData(rrdname)) 569 for ds in self.datasources: self.rrd.new_datasource(**ds) 570 for rra in self.archives: self.rrd.new_archive(**rra) 571 for g in self.graphs: 572 self.data.append(ImgFileData(g['pngname'])) 573 self.data.append(HTMLData('<br>')) 574 g['pngname'] = os.path.join(self.archive_dir, g['pngname']) 575 self.rrd.new_graph(**g) 576 self.rrd.create()
577 578
579 - def save_measure(self, measure):
580 Agent.save_measure(self, measure) 581 # on ne prend que les mesures déclarées comme datasources 582 rrd_values = {} 583 for datasource in self.datasources: 584 rrd_values[datasource['name']] = measure.value[datasource['name']] 585 # update des bases rrd 586 self.rrd.update(rrd_values, measure.get_date())
587
588 - def write_data(self):
589 Agent.write_data(self) 590 self.rrd.graph_all()
591 592
593 -class LoadErrorAgent(Agent):
594 """Pseudo-agent représentant une erreur de chargement d'un fichier 595 de configuration. 596 """ 597 598 HTML = """<p class="error">%s</p>""" 599
600 - def __init__(self, name, 601 title=_("Error while loading agent"), 602 message="", 603 **params):
604 Agent.__init__(self, name, **params) 605 self.max_status = status.Error(title) 606 m = '<br />'.join(message.splitlines()) 607 self.html = HTMLData(LoadErrorAgent.HTML % m) 608 self.data = [self.html]
609
610 - def check_status(self):
611 return self.max_status()
612 613 614 615 # def test_main(): 616 # test_support.run_unittest(UserStringTest) 617 618 # if __name__ == "__main__": 619 # test_main() 620