X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Foperdata.py;h=49f5b03b5f0e0e6e7b567110507cf9f04401fb45;hb=8dad2958cab56d275f7f24025d9f397be7da4e77;hp=551aea2da6ef6b99c625d6a2654432a95c40b765;hpb=d03647dd8a262a6d944ae6f287f483bb120b7429;p=osm%2FSO.git diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py index 551aea2d..49f5b03b 100644 --- a/common/python/rift/mano/config_agent/operdata.py +++ b/common/python/rift/mano/config_agent/operdata.py @@ -1,4 +1,4 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,6 +17,9 @@ import asyncio import concurrent.futures import time +import gi + +gi.require_version('RwNsrYang', '1.0') from gi.repository import ( NsrYang, @@ -25,15 +28,17 @@ from gi.repository import ( RwNsrYang, RwConfigAgentYang, RwDts as rwdts) +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key import rift.tasklets - import rift.mano.utils.juju_api as juju class ConfigAgentAccountNotFound(Exception): pass + class JujuClient(object): def __init__(self, log, ip, port, user, passwd): self._log = log @@ -46,29 +51,39 @@ class JujuClient(object): server=ip, port=port, user=user, secret=passwd) - def validate_account_creds(self): - status = RwcalYang.CloudConnectionStatus() + """Validate the account credentials. + + Verifies if the account credentials can connect and login to a Juju + controller at the provided IP address. + """ + status = "unknown" + details = "Connection status not known." + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) try: - env = self._api._get_env() - except juju.JujuEnvError as e: - msg = "JujuClient: Invalid account credentials: %s", str(e) - self._log.error(msg) - raise Exception(msg) - except ConnectionRefusedError as e: - msg = "JujuClient: Wrong IP or Port: %s", str(e) - self._log.error(msg) - raise Exception(msg) + loop.run_until_complete(asyncio.gather( + self._api.logout(), + self._api.login(), + loop=loop, + )) except Exception as e: msg = "JujuClient: Connection Failed: %s", str(e) self._log.error(msg) raise Exception(msg) else: - status.status = "success" - status.details = "Connection was successful" + self._log.error("Success reached.") + status = "success" + details = "Connection was successful" self._log.info("JujuClient: Connection Successful") + finally: + loop.close() - return status + return RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus( + status=status, + details=details, + ) class ConfigAgentAccount(object): @@ -86,7 +101,7 @@ class ConfigAgentAccount(object): else: self._cfg_agent_client_plugin = None - self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus( + self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus( status="unknown", details="Connection status lookup not started" ) @@ -117,13 +132,13 @@ class ConfigAgentAccount(object): def validate_cfg_agent_account_credentials(self, loop): self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status) - self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus( + self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus( status="validating", details="Config Agent account connection validation in progress" ) if self._cfg_agent_client_plugin is None: - self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus( + self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus( status="unknown", details="Config Agent account does not support validation of account creds" ) @@ -131,11 +146,11 @@ class ConfigAgentAccount(object): try: status = yield from loop.run_in_executor( None, - self._cfg_agent_client_plugin.validate_account_creds + self._cfg_agent_client_plugin.validate_account_creds, ) - self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus.from_dict(status.as_dict()) + self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus.from_dict(status.as_dict()) except Exception as e: - self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus( + self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus( status="failure", details="Error - " + str(e) ) @@ -153,12 +168,15 @@ class ConfigAgentAccount(object): ) class CfgAgentDtsOperdataHandler(object): - def __init__(self, dts, log, loop): + def __init__(self, dts, log, loop, project): self._dts = dts self._log = log self._loop = loop + self._project = project self.cfg_agent_accounts = {} + self._show_reg = None + self._rpc_reg = None def add_cfg_agent_account(self, account_msg): account = ConfigAgentAccount(self._log, account_msg) @@ -191,12 +209,12 @@ class CfgAgentDtsOperdataHandler(object): def _register_show_status(self): def get_xpath(cfg_agent_name=None): return "D,/rw-config-agent:config-agent/account{}/connection-status".format( - "[name='%s']" % cfg_agent_name if cfg_agent_name is not None else '' + "[name=%s]" % quoted_key(cfg_agent_name) if cfg_agent_name is not None else '' ) @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): - path_entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path) + path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path) cfg_agent_account_name = path_entry.key00.name self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string()) @@ -205,9 +223,10 @@ class CfgAgentDtsOperdataHandler(object): for account in saved_accounts: connection_status = account.connection_status self._log.debug("Responding to config agent connection status request: %s", connection_status) + xpath = self._project.add_project(get_xpath(account.name)) xact_info.respond_xpath( rwdts.XactRspCode.MORE, - xpath=get_xpath(account.name), + xpath=xpath, msg=account.connection_status, ) except KeyError as e: @@ -217,12 +236,13 @@ class CfgAgentDtsOperdataHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) - yield from self._dts.register( - xpath=get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare), - flags=rwdts.Flag.PUBLISHER, - ) + xpath = self._project.add_project(get_xpath()) + self._show_reg = yield from self._dts.register( + xpath=xpath, + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare), + flags=rwdts.Flag.PUBLISHER, + ) def _register_validate_rpc(self): def get_xpath(): @@ -234,6 +254,10 @@ class CfgAgentDtsOperdataHandler(object): raise ConfigAgentAccountNotFound("Config Agent account name not provided") cfg_agent_account_name = msg.cfg_agent_account + + if not self._project.rpc_check(msg, xact_info=xact_info): + return + try: account = self.cfg_agent_accounts[cfg_agent_account_name] except KeyError: @@ -243,24 +267,29 @@ class CfgAgentDtsOperdataHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) - yield from self._dts.register( - xpath=get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._rpc_reg = yield from self._dts.register( + xpath=get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare + ), + flags=rwdts.Flag.PUBLISHER, + ) @asyncio.coroutine def register(self): yield from self._register_show_status() yield from self._register_validate_rpc() + def deregister(self): + self._show_reg.deregister() + self._rpc_reg.deregister() + + class ConfigAgentJob(object): """A wrapper over the config agent job object, providing some convenience functions. - YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains + YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains || ==> VNFRS || @@ -274,17 +303,20 @@ class ConfigAgentJob(object): "running" : "pending", "failed" : "failure"} - def __init__(self, nsr_id, job, tasks=None): + def __init__(self, nsr_id, job, project, tasks=None): """ Args: nsr_id (uuid): ID of NSR record - job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object + job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object tasks: List of asyncio.tasks. If provided the job monitor will use it to monitor the tasks instead of the execution IDs """ self._job = job self.nsr_id = nsr_id self.tasks = tasks + self._project = project + + self._regh = None @property def id(self): @@ -314,15 +346,25 @@ class ConfigAgentJob(object): @property def xpath(self): """Xpath of the job""" - return ("D,/nsr:ns-instance-opdata" + - "/nsr:nsr[nsr:ns-instance-config-ref='{}']" + - "/nsr:config-agent-job[nsr:job-id='{}']" - ).format(self.nsr_id, self.id) + return self._project.add_project(("D,/nsr:ns-instance-opdata" + + "/nsr:nsr[nsr:ns-instance-config-ref={}]" + + "/nsr:config-agent-job[nsr:job-id={}]" + ).format(quoted_key(self.nsr_id), quoted_key(str(self.id)))) + + @property + def regh(self): + """Registration handle for the job""" + return self._regh + + @regh.setter + def regh(self, hdl): + """Setter for registration handle""" + self._regh = hdl @staticmethod - def convert_rpc_input_to_job(nsr_id, rpc_output, tasks): + def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project): """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive - to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang) + to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang) Args: nsr_id (uuid): NSR ID @@ -333,10 +375,10 @@ class ConfigAgentJob(object): ConfigAgentJob """ # Shortcuts to prevent the HUUGE names. - CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob - CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr - CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive - CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter + CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob + CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr + CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive + CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter job = CfgAgentJob.from_dict({ "job_id": rpc_output.job_id, @@ -359,7 +401,8 @@ class ConfigAgentJob(object): vnf_primitive = CfgAgentPrimitive.from_dict({ "name": primitive.name, "execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status], - "execution_id": primitive.execution_id + "execution_id": primitive.execution_id, + "execution_error_details": primitive.execution_error_details, }) # Copy over the input param @@ -374,7 +417,7 @@ class ConfigAgentJob(object): job.vnfr.append(vnfr_job) - return ConfigAgentJob(nsr_id, job, tasks) + return ConfigAgentJob(nsr_id, job, project, tasks) class ConfigAgentJobMonitor(object): @@ -406,9 +449,25 @@ class ConfigAgentJobMonitor(object): @asyncio.coroutine def _monitor_processes(self, registration_handle): result = 0 + errs = "" for process in self.job.tasks: - rc = yield from process - self.log.debug("Process {} returned rc: {}".format(process, rc)) + if isinstance(process, asyncio.subprocess.Process): + rc = yield from process.wait() + err = yield from process.stderr.read() + + else: + # Task instance + rc = yield from process + err = '' + + self.log.debug("Process {} returned rc: {}, err: {}". + format(process, rc, err)) + + if len(err): + if rc == 0: + errs += "{}".format(err) + else: + errs += "{}".format(err) result |= rc if result == 0: @@ -416,15 +475,15 @@ class ConfigAgentJobMonitor(object): else: self.job.job_status = "failure" + if len(errs): + self.job.job.job_status_details = errs + registration_handle.update_element(self.job.xpath, self.job.job) - def get_error_details(self): + def get_execution_details(self): '''Get the error details from failed primitives''' errs = '' for vnfr in self.job.job.vnfr: - if vnfr.vnf_job_status != "failure": - continue - for primitive in vnfr.primitive: if primitive.execution_status == "failure": errs += '' @@ -433,7 +492,11 @@ class ConfigAgentJobMonitor(object): else: errs += '{}: Unknown error'.format(primitive.name) errs += "" - + else: + if primitive.execution_error_details: + errs += '<{status}>{details}'.format( + status=primitive.execution_status, + details=primitive.execution_error_details) return errs @asyncio.coroutine @@ -448,6 +511,7 @@ class ConfigAgentJobMonitor(object): ) self.log.debug('preparing to publish job status for {}'.format(self.job.xpath)) + self.job.regh = registration_handle try: registration_handle.create_element(self.job.xpath, self.job.job) @@ -483,14 +547,15 @@ class ConfigAgentJobMonitor(object): if "failure" in job_status: self.job.job_status = "failure" - errs = self.get_error_details() - if len(errs): - self.job.job.job_status_details = errs elif "pending" in job_status: self.job.job_status = "pending" else: self.job.job_status = "success" + errs = self.get_execution_details() + if len(errs): + self.job.job.job_status_details = errs + # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format( # self.job.job_status, # self.job.xpath, @@ -498,6 +563,7 @@ class ConfigAgentJobMonitor(object): registration_handle.update_element(self.job.xpath, self.job.job) + registration_handle.update_element(self.job.xpath, self.job.job) except Exception as e: self.log.exception(e) @@ -520,6 +586,9 @@ class ConfigAgentJobMonitor(object): for primitive in vnfr.primitive: if primitive.execution_status != 'pending': + if primitive.execution_id == "": + # We may not have processed the status for these yet + job_status.append(primitive.execution_status) continue if primitive.execution_id == "": @@ -527,7 +596,7 @@ class ConfigAgentJobMonitor(object): job_status.append(primitive.execution_status) continue - elif primitive.execution_id == "config": + if primitive.execution_id == "config": # Config job. Check if service is active task = self.loop.create_task(self.get_service_status(vnfr.id, primitive)) @@ -637,6 +706,7 @@ class CfgAgentJobDtsHandler(object): self._nsm = nsm self._regh = None + self._project = cfgm.project @property def regh(self): @@ -653,11 +723,10 @@ class CfgAgentJobDtsHandler(object): """ Return the ConfigManager manager instance """ return self._cfgm - @staticmethod - def cfg_job_xpath(nsr_id, job_id): - return ("D,/nsr:ns-instance-opdata" + - "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" + - "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id) + def cfg_job_xpath(self, nsr_id, job_id): + return self._project.add_project(("D,/nsr:ns-instance-opdata" + + "/nsr:nsr[nsr:ns-instance-config-ref={}]" + + "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id), quoted_key(str(job_id)))) @asyncio.coroutine def register(self): @@ -668,7 +737,7 @@ class CfgAgentJobDtsHandler(object): """ prepare callback from dts """ xpath = ks_path.to_xpath(RwNsrYang.get_schema()) if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema() + schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema() path_entry = schema.keyspec_to_entry(ks_path) try: nsr_id = path_entry.key00.ns_instance_config_ref @@ -682,16 +751,13 @@ class CfgAgentJobDtsHandler(object): nsr_ids = [nsr_id] for nsr_id in nsr_ids: - job = self.cfgm.get_job(nsr_id) + jobs = self.cfgm.get_job(nsr_id) - # If no jobs are queued for the NSR - if job is None: - continue - - xact_info.respond_xpath( - rwdts.XactRspCode.MORE, - CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id), - job) + for job in jobs: + xact_info.respond_xpath( + rwdts.XactRspCode.MORE, + self.cfg_job_xpath(nsr_id, job.id), + job.job) except Exception as e: self._log.exception("Caught exception:%s", str(e)) @@ -702,11 +768,36 @@ class CfgAgentJobDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH, + self._regh = group.register(xpath=self._project.add_project( + CfgAgentJobDtsHandler.XPATH), handler=hdl, flags=rwdts.Flag.PUBLISHER, ) + def _terminate_nsr(self, nsr_id): + self._log.debug("NSR {} being terminated".format(nsr_id)) + jobs = self.cfgm.get_job(nsr_id) + for job in jobs: + path = self.cfg_job_xpath(nsr_id, job.id) + with self._dts.transaction() as xact: + self._log.debug("Deleting job: {}".format(path)) + job.regh.delete_element(path) + self._log.debug("Deleted job: {}".format(path)) + + # Remove the NSR id in manager + self.cfgm.del_nsr(nsr_id) + + @property + def nsr_xpath(self): + return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr") + + def deregister(self): + self._log.debug("De-register config agent job for project". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class ConfigAgentJobManager(object): """A central class that manager all the Config Agent related data, @@ -714,7 +805,7 @@ class ConfigAgentJobManager(object): TODO: Needs to support multiple config agents. """ - def __init__(self, dts, log, loop, nsm): + def __init__(self, dts, log, loop, project, nsm): """ Args: dts : Dts handle @@ -727,11 +818,12 @@ class ConfigAgentJobManager(object): self.log = log self.loop = loop self.nsm = nsm + self.project = project self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self) self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) def add_job(self, rpc_output, tasks=None): - """Once an RPC is trigger add a now job + """Once an RPC is triggered, add a new job Args: rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output @@ -741,11 +833,17 @@ class ConfigAgentJobManager(object): """ nsr_id = rpc_output.nsr_id_ref - self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks) + job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, + tasks, self.project) self.log.debug("Creating a job monitor for Job id: {}".format( rpc_output.job_id)) + if nsr_id not in self.jobs: + self.jobs[nsr_id] = [job] + else: + self.jobs[nsr_id].append(job) + # If the tasks are none, assume juju actions # TBD: This logic need to be revisited ca = self.nsm.config_agent_plugins[0] @@ -755,24 +853,43 @@ class ConfigAgentJobManager(object): ca = agent break + def done_callback(fut): + e = fut.exception() + if e: + self.log.error("Exception on monitor job {}: {}". + format(rpc_output.job_id, e)) + fut.print_stack() + self.log.debug("Monitor job done for {}".format(rpc_output.job_id)) + # For every Job we will schedule a new monitoring process. job_monitor = ConfigAgentJobMonitor( self.dts, self.log, - self.jobs[nsr_id], + job, self.executor, self.loop, ca ) task = self.loop.create_task(job_monitor.publish_action_status()) + task.add_done_callback(done_callback) def get_job(self, nsr_id): """Get the job associated with the NSR Id, if present.""" try: - return self.jobs[nsr_id].job + return self.jobs[nsr_id] except KeyError: - return None + return [] + + def del_nsr(self, nsr_id): + """Delete a NSR id from the jobs list""" + if nsr_id in self.jobs: + self.jobs.pop(nsr_id) @asyncio.coroutine def register(self): yield from self.handler.register() + # yield from self.handler.register_for_nsr() + + def deregister(self): + self.handler.deregister() + self.handler = None