2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
31 import rift
.mano
.utils
.juju_api
as juju
34 class ConfigAgentAccountNotFound(Exception):
37 class JujuClient(object):
38 def __init__(self
, log
, ip
, port
, user
, passwd
):
45 self
._api
= juju
.JujuApi(log
=log
,
47 user
=user
, secret
=passwd
)
50 def validate_account_creds(self
):
51 status
= RwcalYang
.CloudConnectionStatus()
53 env
= self
._api
._get
_env
()
54 except juju
.JujuEnvError
as e
:
55 msg
= "JujuClient: Invalid account credentials: %s", str(e
)
58 except ConnectionRefusedError
as e
:
59 msg
= "JujuClient: Wrong IP or Port: %s", str(e
)
62 except Exception as e
:
63 msg
= "JujuClient: Connection Failed: %s", str(e
)
67 status
.status
= "success"
68 status
.details
= "Connection was successful"
69 self
._log
.info("JujuClient: Connection Successful")
74 class ConfigAgentAccount(object):
75 def __init__(self
, log
, account_msg
):
77 self
._account
_msg
= account_msg
.deep_copy()
79 if account_msg
.account_type
== "juju":
80 self
._cfg
_agent
_client
_plugin
= JujuClient(
82 account_msg
.juju
.ip_address
,
83 account_msg
.juju
.port
,
84 account_msg
.juju
.user
,
85 account_msg
.juju
.secret
)
87 self
._cfg
_agent
_client
_plugin
= None
89 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
91 details
="Connection status lookup not started"
94 self
._validate
_task
= None
98 return self
._account
_msg
.name
101 def account_msg(self
):
102 return self
._account
_msg
105 def account_type(self
):
106 return self
._account
_msg
.account_type
109 def connection_status(self
):
112 def update_from_cfg(self
, cfg
):
113 self
._log
.debug("Updating parent ConfigAgentAccount to %s", cfg
)
114 raise NotImplementedError("Update config agent account not yet supported")
117 def validate_cfg_agent_account_credentials(self
, loop
):
118 self
._log
.debug("Validating Config Agent Account %s, credential status %s", self
._account
_msg
, self
._status
)
120 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
122 details
="Config Agent account connection validation in progress"
125 if self
._cfg
_agent
_client
_plugin
is None:
126 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
128 details
="Config Agent account does not support validation of account creds"
132 status
= yield from loop
.run_in_executor(
134 self
._cfg
_agent
_client
_plugin
.validate_account_creds
136 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus
.from_dict(status
.as_dict())
137 except Exception as e
:
138 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
140 details
="Error - " + str(e
)
143 self
._log
.info("Got config agent account validation response: %s", self
._status
)
145 def start_validate_credentials(self
, loop
):
146 if self
._validate
_task
is not None:
147 self
._validate
_task
.cancel()
148 self
._validate
_task
= None
150 self
._validate
_task
= asyncio
.ensure_future(
151 self
.validate_cfg_agent_account_credentials(loop
),
155 class CfgAgentDtsOperdataHandler(object):
156 def __init__(self
, dts
, log
, loop
):
161 self
.cfg_agent_accounts
= {}
163 def add_cfg_agent_account(self
, account_msg
):
164 account
= ConfigAgentAccount(self
._log
, account_msg
)
165 self
.cfg_agent_accounts
[account
.name
] = account
166 self
._log
.info("ConfigAgent Operdata Handler added. Starting account validation")
168 account
.start_validate_credentials(self
._loop
)
170 def delete_cfg_agent_account(self
, account_name
):
171 del self
.cfg_agent_accounts
[account_name
]
172 self
._log
.info("ConfigAgent Operdata Handler deleted.")
174 def get_saved_cfg_agent_accounts(self
, cfg_agent_account_name
):
175 ''' Get Config Agent Account corresponding to passed name, or all saved accounts if name is None'''
176 saved_cfg_agent_accounts
= []
178 if cfg_agent_account_name
is None or cfg_agent_account_name
== "":
179 cfg_agent_accounts
= list(self
.cfg_agent_accounts
.values())
180 saved_cfg_agent_accounts
.extend(cfg_agent_accounts
)
181 elif cfg_agent_account_name
in self
.cfg_agent_accounts
:
182 account
= self
.cfg_agent_accounts
[cfg_agent_account_name
]
183 saved_cfg_agent_accounts
.append(account
)
185 errstr
= "Config Agent account {} does not exist".format(cfg_agent_account_name
)
186 raise KeyError(errstr
)
188 return saved_cfg_agent_accounts
191 def _register_show_status(self
):
192 def get_xpath(cfg_agent_name
=None):
193 return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
194 "[name='%s']" % cfg_agent_name
if cfg_agent_name
is not None else ''
198 def on_prepare(xact_info
, action
, ks_path
, msg
):
199 path_entry
= RwConfigAgentYang
.ConfigAgentAccount
.schema().keyspec_to_entry(ks_path
)
200 cfg_agent_account_name
= path_entry
.key00
.name
201 self
._log
.debug("Got show cfg_agent connection status request: %s", ks_path
.create_string())
204 saved_accounts
= self
.get_saved_cfg_agent_accounts(cfg_agent_account_name
)
205 for account
in saved_accounts
:
206 connection_status
= account
.connection_status
207 self
._log
.debug("Responding to config agent connection status request: %s", connection_status
)
208 xact_info
.respond_xpath(
209 rwdts
.XactRspCode
.MORE
,
210 xpath
=get_xpath(account
.name
),
211 msg
=account
.connection_status
,
213 except KeyError as e
:
214 self
._log
.warning(str(e
))
215 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
218 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
220 yield from self
._dts
.register(
222 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
223 on_prepare
=on_prepare
),
224 flags
=rwdts
.Flag
.PUBLISHER
,
227 def _register_validate_rpc(self
):
229 return "/rw-config-agent:update-cfg-agent-status"
232 def on_prepare(xact_info
, action
, ks_path
, msg
):
233 if not msg
.has_field("cfg_agent_account"):
234 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
236 cfg_agent_account_name
= msg
.cfg_agent_account
238 account
= self
.cfg_agent_accounts
[cfg_agent_account_name
]
240 raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name
)
242 account
.start_validate_credentials(self
._loop
)
244 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
246 yield from self
._dts
.register(
248 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
249 on_prepare
=on_prepare
251 flags
=rwdts
.Flag
.PUBLISHER
,
256 yield from self
._register
_show
_status
()
257 yield from self
._register
_validate
_rpc
()
259 class ConfigAgentJob(object):
260 """A wrapper over the config agent job object, providing some
261 convenience functions.
263 YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
270 # The normalizes the state terms from Juju to our yang models
272 STATUS_MAP
= {"completed": "success",
273 "pending" : "pending",
274 "running" : "pending",
275 "failed" : "failure"}
277 def __init__(self
, nsr_id
, job
, tasks
=None):
280 nsr_id (uuid): ID of NSR record
281 job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
282 tasks: List of asyncio.tasks. If provided the job monitor will
283 use it to monitor the tasks instead of the execution IDs
292 return self
._job
.job_id
297 return self
._job
.job_name
300 def job_status(self
):
301 """Status of the job (success|pending|failure)"""
302 return self
._job
.job_status
305 def job_status(self
, value
):
306 """Setter for job status"""
307 self
._job
.job_status
= value
316 """Xpath of the job"""
317 return ("D,/nsr:ns-instance-opdata" +
318 "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
319 "/nsr:config-agent-job[nsr:job-id='{}']"
320 ).format(self
.nsr_id
, self
.id)
323 def convert_rpc_input_to_job(nsr_id
, rpc_output
, tasks
):
324 """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
325 to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
328 nsr_id (uuid): NSR ID
329 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
330 tasks (list): A list of asyncio.Tasks
335 # Shortcuts to prevent the HUUGE names.
336 CfgAgentJob
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
337 CfgAgentVnfr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
338 CfgAgentPrimitive
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
339 CfgAgentPrimitiveParam
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
341 job
= CfgAgentJob
.from_dict({
342 "job_id": rpc_output
.job_id
,
343 "job_name" : rpc_output
.name
,
344 "job_status": "pending",
345 "triggered_by": rpc_output
.triggered_by
,
346 "create_time": rpc_output
.create_time
,
347 "job_status_details": rpc_output
.job_status_details
if rpc_output
.job_status_details
is not None else None,
348 "parameter": [param
.as_dict() for param
in rpc_output
.parameter
],
349 "parameter_group": [pg
.as_dict() for pg
in rpc_output
.parameter_group
]
352 for vnfr
in rpc_output
.vnf_out_list
:
353 vnfr_job
= CfgAgentVnfr
.from_dict({
354 "id": vnfr
.vnfr_id_ref
,
355 "vnf_job_status": "pending",
358 for primitive
in vnfr
.vnf_out_primitive
:
359 vnf_primitive
= CfgAgentPrimitive
.from_dict({
360 "name": primitive
.name
,
361 "execution_status": ConfigAgentJob
.STATUS_MAP
[primitive
.execution_status
],
362 "execution_id": primitive
.execution_id
365 # Copy over the input param
366 for param
in primitive
.parameter
:
367 vnf_primitive
.parameter
.append(
368 CfgAgentPrimitiveParam
.from_dict({
373 vnfr_job
.primitive
.append(vnf_primitive
)
375 job
.vnfr
.append(vnfr_job
)
377 return ConfigAgentJob(nsr_id
, job
, tasks
)
380 class ConfigAgentJobMonitor(object):
381 """Job monitor: Polls the Juju controller and get the status.
383 If all Primitive are success, then vnf & nsr status will be "success"
384 If any one Primitive reaches a failed state then both vnf and nsr will fail.
388 def __init__(self
, dts
, log
, job
, executor
, loop
, config_plugin
):
393 job (ConfigAgentJob): ConfigAgentJob instance
394 executor (concurrent.futures): Executor for juju status api calls
395 loop (eventloop): Current event loop instance
396 config_plugin : Config plugin to be used.
401 self
.executor
= executor
402 self
.polling_period
= ConfigAgentJobMonitor
.POLLING_PERIOD
403 self
.config_plugin
= config_plugin
407 def _monitor_processes(self
, registration_handle
):
409 for process
in self
.job
.tasks
:
410 rc
= yield from process
411 self
.log
.debug("Process {} returned rc: {}".format(process
, rc
))
415 self
.job
.job_status
= "success"
417 self
.job
.job_status
= "failure"
419 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
421 def get_error_details(self
):
422 '''Get the error details from failed primitives'''
424 for vnfr
in self
.job
.job
.vnfr
:
425 if vnfr
.vnf_job_status
!= "failure":
428 for primitive
in vnfr
.primitive
:
429 if primitive
.execution_status
== "failure":
431 errs
+= primitive
.execution_error_details
437 def publish_action_status(self
):
439 Starts publishing the status for jobs/primitives
441 registration_handle
= yield from self
.dts
.register(
442 xpath
=self
.job
.xpath
,
443 handler
=rift
.tasklets
.DTS
.RegistrationHandler(),
444 flags
=(rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
),
447 self
.log
.debug('preparing to publish job status for {}'.format(self
.job
.xpath
))
450 registration_handle
.create_element(self
.job
.xpath
, self
.job
.job
)
452 # If the config is done via a user defined script
453 if self
.job
.tasks
is not None:
454 yield from self
._monitor
_processes
(registration_handle
)
458 # Run until pending moves to either failure/success
459 while self
.job
.job_status
== "pending":
462 if curr
- prev
< self
.polling_period
:
463 pause
= self
.polling_period
- (curr
- prev
)
464 yield from asyncio
.sleep(pause
, loop
=self
.loop
)
469 for vnfr
in self
.job
.job
.vnfr
:
470 task
= self
.loop
.create_task(self
.get_vnfr_status(vnfr
))
473 # Exit, if no tasks are found
477 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
479 job_status
= [task
.result() for task
in tasks
]
481 if "failure" in job_status
:
482 self
.job
.job_status
= "failure"
483 errs
= self
.get_error_details()
485 self
.job
.job
.job_status_details
= errs
486 elif "pending" in job_status
:
487 self
.job
.job_status
= "pending"
489 self
.job
.job_status
= "success"
491 # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
492 # self.job.job_status,
496 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
499 except Exception as e
:
500 self
.log
.exception(e
)
505 def get_vnfr_status(self
, vnfr
):
506 """Schedules tasks for all containing primitives and updates it's own
510 vnfr : Vnfr job record containing primitives.
513 (str): "success|failure|pending"
518 for primitive
in vnfr
.primitive
:
519 if primitive
.execution_id
== "":
520 # TODO: For some config data, the id will be empty, check if
522 job_status
.append(primitive
.execution_status
)
525 task
= self
.loop
.create_task(self
.get_primitive_status(primitive
))
529 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
531 job_status
.extend([task
.result() for task
in tasks
])
532 if "failure" in job_status
:
533 vnfr
.vnf_job_status
= "failure"
536 elif "pending" in job_status
:
537 vnfr
.vnf_job_status
= "pending"
541 vnfr
.vnf_job_status
= "success"
545 def get_primitive_status(self
, primitive
):
547 Queries the juju api and gets the status of the execution id.
550 primitive : Primitive containing the execution ID.
554 resp
= yield from self
.loop
.run_in_executor(
556 self
.config_plugin
.get_action_status
,
557 primitive
.execution_id
560 status
= resp
['status']
561 if status
== 'failed':
562 self
.log
.warning("Execution of action {} failed: {}".
563 format(primitive
.execution_id
, resp
))
564 primitive
.execution_error_details
= resp
['message']
566 except Exception as e
:
567 self
.log
.exception(e
)
570 # Handle case status is None
572 primitive
.execution_status
= ConfigAgentJob
.STATUS_MAP
[status
]
574 primitive
.execution_status
= "failure"
576 return primitive
.execution_status
579 class CfgAgentJobDtsHandler(object):
580 """Dts Handler for CfgAgent"""
581 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"
583 def __init__(self
, dts
, log
, loop
, nsm
, cfgm
):
590 cfgm : ConfigManager.
602 """ Return registration handle """
607 """ Return the NSManager manager instance """
612 """ Return the ConfigManager manager instance """
616 def cfg_job_xpath(nsr_id
, job_id
):
617 return ("D,/nsr:ns-instance-opdata" +
618 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
619 "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id
, job_id
)
623 """ Register for NS monitoring read from dts """
626 def on_prepare(xact_info
, action
, ks_path
, msg
):
627 """ prepare callback from dts """
628 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
629 if action
== rwdts
.QueryAction
.READ
:
630 schema
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.schema()
631 path_entry
= schema
.keyspec_to_entry(ks_path
)
633 nsr_id
= path_entry
.key00
.ns_instance_config_ref
635 #print("###>>> self.nsm.nsrs:", self.nsm.nsrs)
637 if nsr_id
is None or nsr_id
== "":
638 nsrs
= list(self
.nsm
.nsrs
.values())
639 nsr_ids
= [nsr
.id for nsr
in nsrs
if nsr
is not None]
643 for nsr_id
in nsr_ids
:
644 job
= self
.cfgm
.get_job(nsr_id
)
646 # If no jobs are queued for the NSR
650 xact_info
.respond_xpath(
651 rwdts
.XactRspCode
.MORE
,
652 CfgAgentJobDtsHandler
.cfg_job_xpath(nsr_id
, job
.job_id
),
655 except Exception as e
:
656 self
._log
.exception("Caught exception:%s", str(e
))
657 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
660 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
662 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
663 with self
._dts
.group_create() as group
:
664 self
._regh
= group
.register(xpath
=CfgAgentJobDtsHandler
.XPATH
,
666 flags
=rwdts
.Flag
.PUBLISHER
,
670 class ConfigAgentJobManager(object):
671 """A central class that manager all the Config Agent related data,
672 Including updating the status
674 TODO: Needs to support multiple config agents.
676 def __init__(self
, dts
, log
, loop
, nsm
):
682 nsm : NsmTasklet instance
689 self
.handler
= CfgAgentJobDtsHandler(dts
, log
, loop
, nsm
, self
)
690 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
692 def add_job(self
, rpc_output
, tasks
=None):
693 """Once an RPC is trigger add a now job
696 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
697 rpc_input (YangInput_Nsr_ExecNsConfigPrimitive): Rpc input
698 tasks(list) A list of asyncio.Tasks
701 nsr_id
= rpc_output
.nsr_id_ref
703 self
.jobs
[nsr_id
] = ConfigAgentJob
.convert_rpc_input_to_job(nsr_id
, rpc_output
, tasks
)
705 self
.log
.debug("Creating a job monitor for Job id: {}".format(
708 # For every Job we will schedule a new monitoring process.
709 job_monitor
= ConfigAgentJobMonitor(
715 self
.nsm
.config_agent_plugins
[0] # Hack
717 task
= self
.loop
.create_task(job_monitor
.publish_action_status())
719 def get_job(self
, nsr_id
):
720 """Get the job associated with the NSR Id, if present."""
722 return self
.jobs
[nsr_id
].job
728 yield from self
.handler
.register()