Fix the status returned by validate_account_creds
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import time
20 import gi
21
22 gi.require_version('RwNsrYang', '1.0')
23
24 from gi.repository import (
25 NsrYang,
26 RwTypes,
27 RwcalYang,
28 RwNsrYang,
29 RwConfigAgentYang,
30 RwDts as rwdts)
31 gi.require_version('RwKeyspec', '1.0')
32 from gi.repository.RwKeyspec import quoted_key
33
34 import rift.tasklets
35 import rift.mano.utils.juju_api as juju
36
37
class ConfigAgentAccountNotFound(Exception):
    """Raised when an RPC names no config agent account or an unknown one."""
40
41
class JujuClient(object):
    """Wrapper around ``juju.JujuApi`` used to check controller credentials."""

    def __init__(self, log, ip, port, user, passwd):
        """Remember the connection parameters and build the Juju API handle.

        Args:
            log: Logger used for diagnostics.
            ip: Address of the Juju controller.
            port: Port of the Juju controller.
            user: User name handed to the Juju API.
            passwd: Secret handed to the Juju API for ``user``.
        """
        self._log = log
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd

        self._api = juju.JujuApi(log=log,
                                 server=ip, port=port,
                                 user=user, secret=passwd)

    def validate_account_creds(self):
        """Validate the account credentials.

        Verifies if the account credentials can connect and login to a Juju
        controller at the provided IP address.  The check runs on a private
        event loop that is created, installed as the current loop of the
        calling thread, and closed again before returning.

        Returns:
            A ``ConnectionStatus`` message with status "success".

        Raises:
            Exception: If the logout/login round trip fails.  The message is
                "JujuClient: Connection Failed: <original error>" and the
                original exception is attached as ``__cause__``.
        """
        status = "unknown"
        details = "Connection status not known."

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Bind both calls to the private loop explicitly.  gather() no
            # longer accepts a ``loop`` argument on Python 3.10+, while
            # ensure_future(..., loop=...) works on old and new versions.
            pending = [
                asyncio.ensure_future(self._api.logout(), loop=loop),
                asyncio.ensure_future(self._api.login(), loop=loop),
            ]
            loop.run_until_complete(asyncio.gather(*pending))
        except Exception as e:
            # Format the message here: the original built a tuple
            # ("...%s", str(e)), so callers saw a tuple repr in the status.
            msg = "JujuClient: Connection Failed: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        else:
            status = "success"
            details = "Connection was successful"
            self._log.info("JujuClient: Connection Successful")
        finally:
            loop.close()

        return RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
            status=status,
            details=details,
        )
87
88
class ConfigAgentAccount(object):
    """One configured config agent account and its connection status."""

    def __init__(self, log, account_msg):
        """Copy the account message and choose a client plugin for its type.

        Only accounts whose ``account_type`` is "juju" get a client plugin;
        every other type is left without one.
        """
        self._log = log
        self._account_msg = account_msg.deep_copy()

        plugin = None
        if account_msg.account_type == "juju":
            plugin = JujuClient(
                log,
                account_msg.juju.ip_address,
                account_msg.juju.port,
                account_msg.juju.user,
                account_msg.juju.secret,
            )
        self._cfg_agent_client_plugin = plugin

        status_cls = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus
        self._status = status_cls(
            status="unknown",
            details="Connection status lookup not started",
        )

        self._validate_task = None

    @property
    def name(self):
        """Name taken from the stored account message."""
        return self._account_msg.name

    @property
    def account_msg(self):
        """The private copy of the account message."""
        return self._account_msg

    @property
    def account_type(self):
        """Account type taken from the stored account message."""
        return self._account_msg.account_type

    @property
    def connection_status(self):
        """Most recently recorded connection status message."""
        return self._status

    def update_from_cfg(self, cfg):
        """Updating an existing account is not supported; always raises."""
        self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
        raise NotImplementedError("Update config agent account not yet supported")

    @asyncio.coroutine
    def validate_cfg_agent_account_credentials(self, loop):
        """Run the plugin's credential check and record the outcome.

        The status is first set to "validating".  Without a plugin it becomes
        "unknown"; otherwise the plugin's blocking check runs in the loop's
        default executor and its result (or a "failure" built from the raised
        exception) is stored.
        """
        status_cls = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus

        self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)

        self._status = status_cls(
            status="validating",
            details="Config Agent account connection validation in progress",
        )

        if self._cfg_agent_client_plugin is not None:
            try:
                outcome = yield from loop.run_in_executor(
                    None,
                    self._cfg_agent_client_plugin.validate_account_creds,
                )
                self._status = status_cls.from_dict(outcome.as_dict())
            except Exception as e:
                self._status = status_cls(
                    status="failure",
                    details="Error - " + str(e),
                )
        else:
            self._status = status_cls(
                status="unknown",
                details="Config Agent account does not support validation of account creds",
            )

        self._log.info("Got config agent account validation response: %s", self._status)

    def start_validate_credentials(self, loop):
        """Cancel any validation already scheduled and schedule a new one."""
        previous = self._validate_task
        if previous is not None:
            previous.cancel()
            self._validate_task = None

        self._validate_task = asyncio.ensure_future(
            self.validate_cfg_agent_account_credentials(loop),
            loop=loop,
        )
169
class CfgAgentDtsOperdataHandler(object):
    """Publishes config agent connection status and serves the re-validate RPC."""

    def __init__(self, dts, log, loop, project):
        """Store the handles; registrations are created later by register()."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Account name -> ConfigAgentAccount
        self.cfg_agent_accounts = {}
        self._show_reg = None
        self._rpc_reg = None

    def add_cfg_agent_account(self, account_msg):
        """Track a new account and immediately start validating its credentials."""
        account = ConfigAgentAccount(self._log, account_msg)
        self.cfg_agent_accounts[account.name] = account
        self._log.info("ConfigAgent Operdata Handler added. Starting account validation")

        account.start_validate_credentials(self._loop)

    def delete_cfg_agent_account(self, account_name):
        """Stop tracking an account.

        Raises:
            KeyError: If ``account_name`` is not tracked.
        """
        del self.cfg_agent_accounts[account_name]
        self._log.info("ConfigAgent Operdata Handler deleted.")

    def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
        ''' Get Config Agent Account corresponding to passed name, or all saved accounts if name is None

        An empty string is treated like None.

        Returns:
            A new list of ConfigAgentAccount objects.

        Raises:
            KeyError: If a non-empty name is given that is not tracked.
        '''
        if cfg_agent_account_name is None or cfg_agent_account_name == "":
            return list(self.cfg_agent_accounts.values())

        if cfg_agent_account_name in self.cfg_agent_accounts:
            return [self.cfg_agent_accounts[cfg_agent_account_name]]

        errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
        raise KeyError(errstr)

    def _register_show_status(self):
        """Register the publisher that answers connection-status reads."""
        def get_xpath(cfg_agent_name=None):
            return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
                "[name=%s]" % quoted_key(cfg_agent_name) if cfg_agent_name is not None else ''
            )

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path)
            cfg_agent_account_name = path_entry.key00.name
            self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())

            try:
                saved_accounts = self.get_saved_cfg_agent_accounts(cfg_agent_account_name)
                for account in saved_accounts:
                    connection_status = account.connection_status
                    self._log.debug("Responding to config agent connection status request: %s", connection_status)
                    xpath = self._project.add_project(get_xpath(account.name))
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=xpath,
                        msg=account.connection_status,
                    )
            except KeyError as e:
                # Unknown account name: answer NA instead of failing the query.
                self._log.warning(str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(get_xpath())
        self._show_reg = yield from self._dts.register(
            xpath=xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare),
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the RPC that restarts credential validation for an account."""
        def get_xpath():
            return "/rw-config-agent:update-cfg-agent-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not msg.has_field("cfg_agent_account"):
                raise ConfigAgentAccountNotFound("Config Agent account name not provided")

            cfg_agent_account_name = msg.cfg_agent_account

            if not self._project.rpc_check(msg, xact_info=xact_info):
                return

            try:
                account = self.cfg_agent_accounts[cfg_agent_account_name]
            except KeyError:
                raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name)

            account.start_validate_credentials(self._loop)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._rpc_reg = yield from self._dts.register(
            xpath=get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Create both DTS registrations (status publisher, then RPC)."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()

    def deregister(self):
        """Drop whichever registrations exist.

        Safe to call before register() has completed and safe to call more
        than once: a handle that is still None is skipped, and each handle is
        cleared after it has been deregistered.
        """
        if self._show_reg is not None:
            self._show_reg.deregister()
            self._show_reg = None

        if self._rpc_reg is not None:
            self._rpc_reg.deregister()
            self._rpc_reg = None
286
287
class ConfigAgentJob(object):
    """Convenience wrapper around a config agent job Gi object.

    YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
        ||
         ==> VNFRS
              ||
               ==> Primitives

    """
    # Normalizes the state terms used by Juju to the ones in our yang models.
    # Juju : Yang model
    STATUS_MAP = {"completed": "success",
                  "pending" : "pending",
                  "running" : "pending",
                  "failed" : "failure"}

    def __init__(self, nsr_id, job, project, tasks=None):
        """
        Args:
            nsr_id (uuid): ID of NSR record
            job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
            project: Object whose add_project() is used to build xpaths
            tasks: List of asyncio.tasks. If provided the job monitor will
                use it to monitor the tasks instead of the execution IDs
        """
        self.nsr_id = nsr_id
        self.tasks = tasks
        self._job = job
        self._project = project
        self._regh = None

    @property
    def id(self):
        """Job id"""
        return self._job.job_id

    @property
    def name(self):
        """Job name"""
        return self._job.job_name

    @property
    def job_status(self):
        """Status of the job (success|pending|failure)"""
        return self._job.job_status

    @job_status.setter
    def job_status(self, value):
        """Write the status through to the wrapped Gi object"""
        self._job.job_status = value

    @property
    def job(self):
        """The wrapped Gi object"""
        return self._job

    @property
    def xpath(self):
        """Project-qualified xpath of this job's opdata entry"""
        template = ("D,/nsr:ns-instance-opdata"
                    "/nsr:nsr[nsr:ns-instance-config-ref={}]"
                    "/nsr:config-agent-job[nsr:job-id={}]")
        nsr_key = quoted_key(self.nsr_id)
        job_key = quoted_key(str(self.id))
        return self._project.add_project(template.format(nsr_key, job_key))

    @property
    def regh(self):
        """Registration handle for the job"""
        return self._regh

    @regh.setter
    def regh(self, hdl):
        """Store the registration handle"""
        self._regh = hdl

    @staticmethod
    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project):
        """Build a ConfigAgentJob from a YangOutput_Nsr_ExecNsConfigPrimitive.

        The job and every VNFR entry start out as "pending"; each primitive's
        execution status is translated through STATUS_MAP.

        Args:
            nsr_id (uuid): NSR ID
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
            tasks (list): A list of asyncio.Tasks
            project: Passed through to the ConfigAgentJob constructor

        Returns:
            ConfigAgentJob
        """
        # Short aliases for the very long generated class names.
        job_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
        vnfr_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
        primitive_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
        param_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter

        job_fields = {
            "job_id": rpc_output.job_id,
            "job_name" : rpc_output.name,
            "job_status": "pending",
            "triggered_by": rpc_output.triggered_by,
            "create_time": rpc_output.create_time,
            "job_status_details": rpc_output.job_status_details,
            "parameter": [entry.as_dict() for entry in rpc_output.parameter],
            "parameter_group": [group.as_dict() for group in rpc_output.parameter_group],
        }
        job = job_cls.from_dict(job_fields)

        for vnf_out in rpc_output.vnf_out_list:
            vnfr_job = vnfr_cls.from_dict({
                "id": vnf_out.vnfr_id_ref,
                "vnf_job_status": "pending",
            })

            for out_primitive in vnf_out.vnf_out_primitive:
                vnf_primitive = primitive_cls.from_dict({
                    "name": out_primitive.name,
                    "execution_status": ConfigAgentJob.STATUS_MAP[out_primitive.execution_status],
                    "execution_id": out_primitive.execution_id,
                    "execution_error_details": out_primitive.execution_error_details,
                })

                # Carry the input parameters over before attaching the primitive.
                for in_param in out_primitive.parameter:
                    copied = param_cls.from_dict({
                        "name": in_param.name,
                        "value": in_param.value,
                    })
                    vnf_primitive.parameter.append(copied)

                vnfr_job.primitive.append(vnf_primitive)

            job.vnfr.append(vnfr_job)

        return ConfigAgentJob(nsr_id, job, project, tasks)
421
422
class ConfigAgentJobMonitor(object):
    """Job monitor: Polls the Juju controller and get the status.
    Rules:
        If all Primitive are success, then vnf & nsr status will be "success"
        If any one Primitive reaches a failed state then both vnf and nsr will fail.
    """
    POLLING_PERIOD = 2

    def __init__(self, dts, log, job, executor, loop, config_plugin):
        """
        Args:
            dts : DTS handle
            log : log handle
            job (ConfigAgentJob): ConfigAgentJob instance
            executor (concurrent.futures): Executor for juju status api calls
            loop (eventloop): Current event loop instance
            config_plugin : Config plugin to be used.
        """
        self.job = job
        self.log = log
        self.loop = loop
        self.executor = executor
        self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
        self.config_plugin = config_plugin
        self.dts = dts

    @asyncio.coroutine
    def _monitor_processes(self, registration_handle):
        """Wait for every entry in ``self.job.tasks`` and publish the result.

        Entries are either asyncio subprocesses (waited on, stderr collected)
        or awaitables whose result is used as the return code.  Return codes
        are OR-ed together; zero means "success", anything else "failure".
        """
        result = 0
        errs = ""
        for process in self.job.tasks:
            if isinstance(process, asyncio.subprocess.Process):
                rc = yield from process.wait()
                # NOTE(review): stderr.read() presumably yields bytes, which
                # would be formatted below as their repr - confirm intent.
                err = yield from process.stderr.read()

            else:
                # Task instance
                rc = yield from process
                err = ''

            self.log.debug("Process {} returned rc: {}, err: {}".
                           format(process, rc, err))

            if len(err):
                if rc == 0:
                    errs += "<success>{}</success>".format(err)
                else:
                    errs += "<error>{}</error>".format(err)
            result |= rc

        if result == 0:
            self.job.job_status = "success"
        else:
            self.job.job_status = "failure"

        if len(errs):
            self.job.job.job_status_details = errs

        registration_handle.update_element(self.job.xpath, self.job.job)

    def get_execution_details(self):
        '''Get the error details from failed primitives

        Failed primitives are wrapped in <error> tags (with a generic
        "<name>: Unknown error" text when no details were recorded); other
        primitives that carry details are wrapped in a tag named after their
        execution status.
        '''
        errs = ''
        for vnfr in self.job.job.vnfr:
            for primitive in vnfr.primitive:
                if primitive.execution_status == "failure":
                    errs += '<error>'
                    if primitive.execution_error_details:
                        errs += primitive.execution_error_details
                    else:
                        errs += '{}: Unknown error'.format(primitive.name)
                    errs += "</error>"
                else:
                    if primitive.execution_error_details:
                        errs += '<{status}>{details}</{status}>'.format(
                            status=primitive.execution_status,
                            details=primitive.execution_error_details)
        return errs

    @asyncio.coroutine
    def publish_action_status(self):
        """
        Starts publishing the status for jobs/primitives

        Registers as publisher for the job xpath, creates the element, and
        then either waits for the job's tasks (user defined script case) or
        polls the VNFR statuses every ``polling_period`` seconds until the
        job leaves the "pending" state.  Any exception is logged and re-raised.
        """
        registration_handle = yield from self.dts.register(
            xpath=self.job.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(),
            flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
        )

        self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
        self.job.regh = registration_handle

        try:
            registration_handle.create_element(self.job.xpath, self.job.job)

            # If the config is done via a user defined script
            if self.job.tasks is not None:
                yield from self._monitor_processes(registration_handle)
                return

            prev = time.time()
            # Run until pending moves to either failure/success
            while self.job.job_status == "pending":
                curr = time.time()

                # Sleep only for what is left of the polling period.
                if curr - prev < self.polling_period:
                    pause = self.polling_period - (curr - prev)
                    yield from asyncio.sleep(pause, loop=self.loop)

                prev = time.time()

                tasks = []
                for vnfr in self.job.job.vnfr:
                    task = self.loop.create_task(self.get_vnfr_status(vnfr))
                    tasks.append(task)

                # Exit, if no tasks are found
                if not tasks:
                    break

                yield from asyncio.wait(tasks, loop=self.loop)

                job_status = [task.result() for task in tasks]

                if "failure" in job_status:
                    self.job.job_status = "failure"
                elif "pending" in job_status:
                    self.job.job_status = "pending"
                else:
                    self.job.job_status = "success"

                errs = self.get_execution_details()
                if len(errs):
                    self.job.job.job_status_details = errs

                registration_handle.update_element(self.job.xpath, self.job.job)

            registration_handle.update_element(self.job.xpath, self.job.job)

        except Exception as e:
            self.log.exception(e)
            raise

    @asyncio.coroutine
    def get_vnfr_status(self, vnfr):
        """Schedules tasks for all containing primitives and updates it's own
        status.

        Args:
            vnfr : Vnfr job record containing primitives.

        Returns:
            (str): "success|failure|pending"
        """
        tasks = []
        job_status = []

        for primitive in vnfr.primitive:
            # NOTE(review): the nesting of the two checks below was
            # reconstructed from a flattened listing; as written the first
            # check is subsumed by the second - confirm against the repository.
            if primitive.execution_status != 'pending':
                if primitive.execution_id == "":
                    # We may not have processed the status for these yet
                    job_status.append(primitive.execution_status)
                    continue

            if primitive.execution_id == "":
                # Actions which failed to queue can have empty id
                job_status.append(primitive.execution_status)
                continue

            if primitive.execution_id == "config":
                # Config job. Check if service is active
                task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))

            else:
                task = self.loop.create_task(self.get_primitive_status(primitive))

            tasks.append(task)

        if tasks:
            yield from asyncio.wait(tasks, loop=self.loop)

        job_status.extend([task.result() for task in tasks])
        if "failure" in job_status:
            vnfr.vnf_job_status = "failure"
            return "failure"

        elif "pending" in job_status:
            vnfr.vnf_job_status = "pending"
            return "pending"

        else:
            vnfr.vnf_job_status = "success"
            return "success"

    @asyncio.coroutine
    def get_service_status(self, vnfr_id, primitive):
        """Query the plugin for the service status and map it to a job state.

        'error'/'blocked' and None map to "failure", 'active' to "success",
        everything else to "pending".  If the query itself raises, the
        exception is logged and the state is "failure".

        Args:
            vnfr_id : Id handed to the plugin's get_service_status.
            primitive : Primitive whose execution_status is updated.

        Returns:
            (str): "success|failure|pending"
        """
        try:
            status = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_service_status,
                vnfr_id
            )

            self.log.debug("Service status: {}".format(status))
            if status in ['error', 'blocked']:
                self.log.warning("Execution of config {} failed: {}".
                                 format(primitive.execution_id, status))
                primitive.execution_error_details = 'Config failed'
                status = 'failure'
            elif status in ['active']:
                status = 'success'
            elif status is None:
                status = 'failure'
            else:
                status = 'pending'

        except Exception as e:
            self.log.exception(e)
            # Must be "failure", not Juju's "failed": the aggregation in
            # get_vnfr_status/publish_action_status only recognises
            # "failure" and "pending", so "failed" would count as success.
            status = "failure"

        primitive.execution_status = status
        return primitive.execution_status

    @asyncio.coroutine
    def get_primitive_status(self, primitive):
        """
        Queries the juju api and gets the status of the execution id.

        Args:
            primitive : Primitive containing the execution ID.

        Returns:
            (str): The primitive's updated execution status.
        """

        try:
            resp = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_action_status,
                primitive.execution_id
            )

            self.log.debug("Action status: {}".format(resp))
            status = resp['status']
            if status == 'failed':
                self.log.warning("Execution of action {} failed: {}".
                                 format(primitive.execution_id, resp))
                primitive.execution_error_details = resp['message']

        except Exception as e:
            self.log.exception(e)
            # Juju vocabulary on purpose: translated through STATUS_MAP below.
            status = "failed"

        # Handle case status is None
        if status:
            # NOTE(review): a status missing from STATUS_MAP raises KeyError.
            primitive.execution_status = ConfigAgentJob.STATUS_MAP[status]
        else:
            primitive.execution_status = "failure"

        return primitive.execution_status
687
688
class CfgAgentJobDtsHandler(object):
    """Dts Handler for CfgAgent"""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"

    def __init__(self, dts, log, loop, nsm, cfgm):
        """
        Args:
            dts  : Dts Handle.
            log  : Log handle.
            loop : Event loop.
            nsm  : NsmManager.
            cfgm : ConfigManager.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cfgm = cfgm
        self._nsm = nsm

        self._regh = None
        self._project = cfgm.project

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NSManager manager instance """
        return self._nsm

    @property
    def cfgm(self):
        """ Return the ConfigManager manager instance """
        return self._cfgm

    def cfg_job_xpath(self, nsr_id, job_id):
        """ Return the project-qualified xpath of one job of one NSR """
        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
                "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id), quoted_key(str(job_id))))

    @asyncio.coroutine
    def register(self):
        """ Register for NS monitoring read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            READ queries are answered with one MORE response per job of the
            addressed NSR (or of every known NSR when the key is empty),
            followed by ACK; any other action is answered with NA.
            """
            # NOTE(review): xpath is not used below.
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            if action == rwdts.QueryAction.READ:
                schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                try:
                    nsr_id = path_entry.key00.ns_instance_config_ref

                    nsr_ids = []
                    if nsr_id is None or nsr_id == "":
                        nsrs = list(self.nsm.nsrs.values())
                        nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
                    else:
                        nsr_ids = [nsr_id]

                    for nsr_id in nsr_ids:
                        jobs = self.cfgm.get_job(nsr_id)

                        for job in jobs:
                            xact_info.respond_xpath(
                                rwdts.XactRspCode.MORE,
                                self.cfg_job_xpath(nsr_id, job.id),
                                job.job)

                except Exception as e:
                    # Log and fall through so the query is still acknowledged.
                    self._log.exception("Caught exception:%s", str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

            else:
                xact_info.respond_xpath(rwdts.XactRspCode.NA)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self._project.add_project(
                                            CfgAgentJobDtsHandler.XPATH),
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )

    def _terminate_nsr(self, nsr_id):
        """ Delete the published jobs of an NSR and forget the NSR """
        self._log.debug("NSR {} being terminated".format(nsr_id))
        jobs = self.cfgm.get_job(nsr_id)
        for job in jobs:
            path = self.cfg_job_xpath(nsr_id, job.id)
            with self._dts.transaction() as xact:
                self._log.debug("Deleting job: {}".format(path))
                job.regh.delete_element(path)
                self._log.debug("Deleted job: {}".format(path))

        # Remove the NSR id in manager
        self.cfgm.del_nsr(nsr_id)

    @property
    def nsr_xpath(self):
        """ Return the project-qualified xpath of the NSR opdata list """
        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")

    def deregister(self):
        """ Drop the DTS registration, if one exists """
        # The original format string had no placeholder, so the project name
        # passed to format() never appeared in the log line.
        self._log.debug("De-register config agent job for project {}".
                        format(self._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None
800
801
class ConfigAgentJobManager(object):
    """A central class that manager all the Config Agent related data,
    Including updating the status

    TODO: Needs to support multiple config agents.
    """
    def __init__(self, dts, log, loop, project, nsm):
        """
        Args:
            dts  : Dts handle
            log  : Log handler
            loop : Event loop
            project : Project the jobs belong to
            nsm  : NsmTasklet instance
        """
        # NSR id -> list of ConfigAgentJob
        self.jobs = {}
        self.dts = dts
        self.log = log
        self.loop = loop
        self.nsm = nsm
        self.project = project
        self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def add_job(self, rpc_output, tasks=None):
        """Once an RPC is triggered, add a new job

        The job is recorded under its NSR id and a monitor task publishing
        its status is scheduled on the event loop.

        Args:
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
            tasks(list) A list of asyncio.Tasks

        """
        nsr_id = rpc_output.nsr_id_ref

        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
                                                      tasks, self.project)

        self.log.debug("Creating a job monitor for Job id: {}".format(
            rpc_output.job_id))

        self.jobs.setdefault(nsr_id, []).append(job)

        # If the tasks are none, assume juju actions
        # TBD: This logic need to be revisited
        ca = self.nsm.config_agent_plugins[0]
        if tasks is None:
            for agent in self.nsm.config_agent_plugins:
                if agent.agent_type == 'juju':
                    ca = agent
                    break

        def done_callback(fut):
            # exception() raises CancelledError on a cancelled future, so a
            # cancelled monitor must be handled before asking for it.
            if fut.cancelled():
                self.log.debug("Monitor job cancelled for {}".format(rpc_output.job_id))
                return

            e = fut.exception()
            if e:
                self.log.error("Exception on monitor job {}: {}".
                               format(rpc_output.job_id, e))
                fut.print_stack()
            self.log.debug("Monitor job done for {}".format(rpc_output.job_id))

        # For every Job we will schedule a new monitoring process.
        job_monitor = ConfigAgentJobMonitor(
            self.dts,
            self.log,
            job,
            self.executor,
            self.loop,
            ca
        )
        task = self.loop.create_task(job_monitor.publish_action_status())
        task.add_done_callback(done_callback)

    def get_job(self, nsr_id):
        """Get the job associated with the NSR Id, if present.

        Returns the stored list itself, or a new empty list for unknown ids.
        """
        try:
            return self.jobs[nsr_id]
        except KeyError:
            return []

    def del_nsr(self, nsr_id):
        """Delete a NSR id from the jobs list; unknown ids are ignored"""
        self.jobs.pop(nsr_id, None)

    @asyncio.coroutine
    def register(self):
        """Register the DTS handler owned by this manager"""
        yield from self.handler.register()

    def deregister(self):
        """Deregister and drop the DTS handler; a repeated call is a no-op"""
        if self.handler is not None:
            self.handler.deregister()
            self.handler = None