Fix juju/tasklet integration
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import time
20 import gi
21
22 gi.require_version('RwNsrYang', '1.0')
23
24 from gi.repository import (
25 NsrYang,
26 RwTypes,
27 RwcalYang,
28 RwNsrYang,
29 RwConfigAgentYang,
30 RwDts as rwdts)
31 gi.require_version('RwKeyspec', '1.0')
32 from gi.repository.RwKeyspec import quoted_key
33
34 import rift.tasklets
35 import rift.mano.utils.juju_api as juju
36
37
38 class ConfigAgentAccountNotFound(Exception):
39 pass
40
41
42 class JujuClient(object):
43 def __init__(self, log, ip, port, user, passwd):
44 self._log = log
45 self._ip = ip
46 self._port = port
47 self._user = user
48 self._passwd = passwd
49
50 self._api = juju.JujuApi(log=log,
51 server=ip, port=port,
52 user=user, secret=passwd)
53
54 def validate_account_creds(self):
55 """Validate the account credentials.
56
57 Verifies if the account credentials can connect and login to a Juju
58 controller at the provided IP address.
59 """
60 status = "unknown"
61 details = "Connection status not known."
62
63 loop = asyncio.new_event_loop()
64 asyncio.set_event_loop(loop)
65 try:
66 loop.run_until_complete(asyncio.gather(
67 self._api.logout(),
68 self._api.login(),
69 loop=loop,
70 ))
71 except Exception as e:
72 loop.close()
73
74 msg = "JujuClient: Connection Failed: %s", str(e)
75 self._log.error(msg)
76 status = "failure"
77 details = msg
78 raise Exception(msg)
79 else:
80 self._log.error("Success reached.")
81 status = "success"
82 details = "Connection was successful"
83 self._log.info("JujuClient: Connection Successful")
84 finally:
85 loop.close()
86
87 return RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
88 status=status,
89 details=details,
90 )
91
92
93 class ConfigAgentAccount(object):
94 def __init__(self, log, account_msg):
95 self._log = log
96 self._account_msg = account_msg.deep_copy()
97
98 if account_msg.account_type == "juju":
99 self._cfg_agent_client_plugin = JujuClient(
100 log,
101 account_msg.juju.ip_address,
102 account_msg.juju.port,
103 account_msg.juju.user,
104 account_msg.juju.secret)
105 else:
106 self._cfg_agent_client_plugin = None
107
108 self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
109 status="unknown",
110 details="Connection status lookup not started"
111 )
112
113 self._validate_task = None
114
115 @property
116 def name(self):
117 return self._account_msg.name
118
119 @property
120 def account_msg(self):
121 return self._account_msg
122
123 @property
124 def account_type(self):
125 return self._account_msg.account_type
126
127 @property
128 def connection_status(self):
129 return self._status
130
131 def update_from_cfg(self, cfg):
132 self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
133 raise NotImplementedError("Update config agent account not yet supported")
134
135 @asyncio.coroutine
136 def validate_cfg_agent_account_credentials(self, loop):
137 self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)
138
139 self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
140 status="validating",
141 details="Config Agent account connection validation in progress"
142 )
143
144 if self._cfg_agent_client_plugin is None:
145 self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
146 status="unknown",
147 details="Config Agent account does not support validation of account creds"
148 )
149 else:
150 try:
151 status = yield from loop.run_in_executor(
152 None,
153 self._cfg_agent_client_plugin.validate_account_creds,
154 )
155 self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus.from_dict(status.as_dict())
156 except Exception as e:
157 self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
158 status="failure",
159 details="Error - " + str(e)
160 )
161
162 self._log.info("Got config agent account validation response: %s", self._status)
163
164 def start_validate_credentials(self, loop):
165 if self._validate_task is not None:
166 self._validate_task.cancel()
167 self._validate_task = None
168
169 self._validate_task = asyncio.ensure_future(
170 self.validate_cfg_agent_account_credentials(loop),
171 loop=loop
172 )
173
174 class CfgAgentDtsOperdataHandler(object):
175 def __init__(self, dts, log, loop, project):
176 self._dts = dts
177 self._log = log
178 self._loop = loop
179 self._project = project
180
181 self.cfg_agent_accounts = {}
182 self._show_reg = None
183 self._rpc_reg = None
184
185 def add_cfg_agent_account(self, account_msg):
186 account = ConfigAgentAccount(self._log, account_msg)
187 self.cfg_agent_accounts[account.name] = account
188 self._log.info("ConfigAgent Operdata Handler added. Starting account validation")
189
190 account.start_validate_credentials(self._loop)
191
192 def delete_cfg_agent_account(self, account_name):
193 del self.cfg_agent_accounts[account_name]
194 self._log.info("ConfigAgent Operdata Handler deleted.")
195
196 def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
197 ''' Get Config Agent Account corresponding to passed name, or all saved accounts if name is None'''
198 saved_cfg_agent_accounts = []
199
200 if cfg_agent_account_name is None or cfg_agent_account_name == "":
201 cfg_agent_accounts = list(self.cfg_agent_accounts.values())
202 saved_cfg_agent_accounts.extend(cfg_agent_accounts)
203 elif cfg_agent_account_name in self.cfg_agent_accounts:
204 account = self.cfg_agent_accounts[cfg_agent_account_name]
205 saved_cfg_agent_accounts.append(account)
206 else:
207 errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
208 raise KeyError(errstr)
209
210 return saved_cfg_agent_accounts
211
212
213 def _register_show_status(self):
214 def get_xpath(cfg_agent_name=None):
215 return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
216 "[name=%s]" % quoted_key(cfg_agent_name) if cfg_agent_name is not None else ''
217 )
218
219 @asyncio.coroutine
220 def on_prepare(xact_info, action, ks_path, msg):
221 path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path)
222 cfg_agent_account_name = path_entry.key00.name
223 self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())
224
225 try:
226 saved_accounts = self.get_saved_cfg_agent_accounts(cfg_agent_account_name)
227 for account in saved_accounts:
228 connection_status = account.connection_status
229 self._log.debug("Responding to config agent connection status request: %s", connection_status)
230 xpath = self._project.add_project(get_xpath(account.name))
231 xact_info.respond_xpath(
232 rwdts.XactRspCode.MORE,
233 xpath=xpath,
234 msg=account.connection_status,
235 )
236 except KeyError as e:
237 self._log.warning(str(e))
238 xact_info.respond_xpath(rwdts.XactRspCode.NA)
239 return
240
241 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
242
243 xpath = self._project.add_project(get_xpath())
244 self._show_reg = yield from self._dts.register(
245 xpath=xpath,
246 handler=rift.tasklets.DTS.RegistrationHandler(
247 on_prepare=on_prepare),
248 flags=rwdts.Flag.PUBLISHER,
249 )
250
251 def _register_validate_rpc(self):
252 def get_xpath():
253 return "/rw-config-agent:update-cfg-agent-status"
254
255 @asyncio.coroutine
256 def on_prepare(xact_info, action, ks_path, msg):
257 if not msg.has_field("cfg_agent_account"):
258 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
259
260 cfg_agent_account_name = msg.cfg_agent_account
261
262 if not self._project.rpc_check(msg, xact_info=xact_info):
263 return
264
265 try:
266 account = self.cfg_agent_accounts[cfg_agent_account_name]
267 except KeyError:
268 raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name)
269
270 account.start_validate_credentials(self._loop)
271
272 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
273
274 self._rpc_reg = yield from self._dts.register(
275 xpath=get_xpath(),
276 handler=rift.tasklets.DTS.RegistrationHandler(
277 on_prepare=on_prepare
278 ),
279 flags=rwdts.Flag.PUBLISHER,
280 )
281
282 @asyncio.coroutine
283 def register(self):
284 yield from self._register_show_status()
285 yield from self._register_validate_rpc()
286
287 def deregister(self):
288 self._show_reg.deregister()
289 self._rpc_reg.deregister()
290
291
292 class ConfigAgentJob(object):
293 """A wrapper over the config agent job object, providing some
294 convenience functions.
295
296 YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
297 ||
298 ==> VNFRS
299 ||
300 ==> Primitives
301
302 """
303 # The normalizes the state terms from Juju to our yang models
304 # Juju : Yang model
305 STATUS_MAP = {"completed": "success",
306 "pending" : "pending",
307 "running" : "pending",
308 "failed" : "failure"}
309
310 def __init__(self, nsr_id, job, project, tasks=None):
311 """
312 Args:
313 nsr_id (uuid): ID of NSR record
314 job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
315 tasks: List of asyncio.tasks. If provided the job monitor will
316 use it to monitor the tasks instead of the execution IDs
317 """
318 self._job = job
319 self.nsr_id = nsr_id
320 self.tasks = tasks
321 self._project = project
322
323 self._regh = None
324
325 @property
326 def id(self):
327 """Job id"""
328 return self._job.job_id
329
330 @property
331 def name(self):
332 """Job name"""
333 return self._job.job_name
334
335 @property
336 def job_status(self):
337 """Status of the job (success|pending|failure)"""
338 return self._job.job_status
339
340 @job_status.setter
341 def job_status(self, value):
342 """Setter for job status"""
343 self._job.job_status = value
344
345 @property
346 def job(self):
347 """Gi object"""
348 return self._job
349
350 @property
351 def xpath(self):
352 """Xpath of the job"""
353 return self._project.add_project(("D,/nsr:ns-instance-opdata" +
354 "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
355 "/nsr:config-agent-job[nsr:job-id={}]"
356 ).format(quoted_key(self.nsr_id), quoted_key(str(self.id))))
357
358 @property
359 def regh(self):
360 """Registration handle for the job"""
361 return self._regh
362
363 @regh.setter
364 def regh(self, hdl):
365 """Setter for registration handle"""
366 self._regh = hdl
367
368 @staticmethod
369 def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project):
370 """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
371 to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
372
373 Args:
374 nsr_id (uuid): NSR ID
375 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
376 tasks (list): A list of asyncio.Tasks
377
378 Returns:
379 ConfigAgentJob
380 """
381 # Shortcuts to prevent the HUUGE names.
382 CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
383 CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
384 CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
385 CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
386
387 job = CfgAgentJob.from_dict({
388 "job_id": rpc_output.job_id,
389 "job_name" : rpc_output.name,
390 "job_status": "pending",
391 "triggered_by": rpc_output.triggered_by,
392 "create_time": rpc_output.create_time,
393 "job_status_details": rpc_output.job_status_details if rpc_output.job_status_details is not None else None,
394 "parameter": [param.as_dict() for param in rpc_output.parameter],
395 "parameter_group": [pg.as_dict() for pg in rpc_output.parameter_group]
396 })
397
398 for vnfr in rpc_output.vnf_out_list:
399 vnfr_job = CfgAgentVnfr.from_dict({
400 "id": vnfr.vnfr_id_ref,
401 "vnf_job_status": "pending",
402 })
403
404 for primitive in vnfr.vnf_out_primitive:
405 vnf_primitive = CfgAgentPrimitive.from_dict({
406 "name": primitive.name,
407 "execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status],
408 "execution_id": primitive.execution_id,
409 "execution_error_details": primitive.execution_error_details,
410 })
411
412 # Copy over the input param
413 for param in primitive.parameter:
414 vnf_primitive.parameter.append(
415 CfgAgentPrimitiveParam.from_dict({
416 "name": param.name,
417 "value": param.value
418 }))
419
420 vnfr_job.primitive.append(vnf_primitive)
421
422 job.vnfr.append(vnfr_job)
423
424 return ConfigAgentJob(nsr_id, job, project, tasks)
425
426
427 class ConfigAgentJobMonitor(object):
428 """Job monitor: Polls the Juju controller and get the status.
429 Rules:
430 If all Primitive are success, then vnf & nsr status will be "success"
431 If any one Primitive reaches a failed state then both vnf and nsr will fail.
432 """
433 POLLING_PERIOD = 2
434
435 def __init__(self, dts, log, job, executor, loop, config_plugin):
436 """
437 Args:
438 dts : DTS handle
439 log : log handle
440 job (ConfigAgentJob): ConfigAgentJob instance
441 executor (concurrent.futures): Executor for juju status api calls
442 loop (eventloop): Current event loop instance
443 config_plugin : Config plugin to be used.
444 """
445 self.job = job
446 self.log = log
447 self.loop = loop
448 self.executor = executor
449 self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
450 self.config_plugin = config_plugin
451 self.dts = dts
452
453 @asyncio.coroutine
454 def _monitor_processes(self, registration_handle):
455 result = 0
456 errs = ""
457 for process in self.job.tasks:
458 if isinstance(process, asyncio.subprocess.Process):
459 rc = yield from process.wait()
460 err = yield from process.stderr.read()
461
462 else:
463 # Task instance
464 rc = yield from process
465 err = ''
466
467 self.log.debug("Process {} returned rc: {}, err: {}".
468 format(process, rc, err))
469
470 if len(err):
471 if rc == 0:
472 errs += "<success>{}</success>".format(err)
473 else:
474 errs += "<error>{}</error>".format(err)
475 result |= rc
476
477 if result == 0:
478 self.job.job_status = "success"
479 else:
480 self.job.job_status = "failure"
481
482 if len(errs):
483 self.job.job.job_status_details = errs
484
485 registration_handle.update_element(self.job.xpath, self.job.job)
486
487 def get_execution_details(self):
488 '''Get the error details from failed primitives'''
489 errs = ''
490 for vnfr in self.job.job.vnfr:
491 for primitive in vnfr.primitive:
492 if primitive.execution_status == "failure":
493 errs += '<error>'
494 if primitive.execution_error_details:
495 errs += primitive.execution_error_details
496 else:
497 errs += '{}: Unknown error'.format(primitive.name)
498 errs += "</error>"
499 else:
500 if primitive.execution_error_details:
501 errs += '<{status}>{details}</{status}>'.format(
502 status=primitive.execution_status,
503 details=primitive.execution_error_details)
504 return errs
505
506 @asyncio.coroutine
507 def publish_action_status(self):
508 """
509 Starts publishing the status for jobs/primitives
510 """
511 registration_handle = yield from self.dts.register(
512 xpath=self.job.xpath,
513 handler=rift.tasklets.DTS.RegistrationHandler(),
514 flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
515 )
516
517 self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
518 self.job.regh = registration_handle
519
520 try:
521 registration_handle.create_element(self.job.xpath, self.job.job)
522
523 # If the config is done via a user defined script
524 if self.job.tasks is not None:
525 yield from self._monitor_processes(registration_handle)
526 return
527
528 prev = time.time()
529 # Run until pending moves to either failure/success
530 while self.job.job_status == "pending":
531 curr = time.time()
532
533 if curr - prev < self.polling_period:
534 pause = self.polling_period - (curr - prev)
535 yield from asyncio.sleep(pause, loop=self.loop)
536
537 prev = time.time()
538
539 tasks = []
540 for vnfr in self.job.job.vnfr:
541 task = self.loop.create_task(self.get_vnfr_status(vnfr))
542 tasks.append(task)
543
544 # Exit, if no tasks are found
545 if not tasks:
546 break
547
548 yield from asyncio.wait(tasks, loop=self.loop)
549
550 job_status = [task.result() for task in tasks]
551
552 if "failure" in job_status:
553 self.job.job_status = "failure"
554 elif "pending" in job_status:
555 self.job.job_status = "pending"
556 else:
557 self.job.job_status = "success"
558
559 errs = self.get_execution_details()
560 if len(errs):
561 self.job.job.job_status_details = errs
562
563 # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
564 # self.job.job_status,
565 # self.job.xpath,
566 # self.job.nsr_id))
567
568 registration_handle.update_element(self.job.xpath, self.job.job)
569
570 registration_handle.update_element(self.job.xpath, self.job.job)
571
572 except Exception as e:
573 self.log.exception(e)
574 raise
575
576
577 @asyncio.coroutine
578 def get_vnfr_status(self, vnfr):
579 """Schedules tasks for all containing primitives and updates it's own
580 status.
581
582 Args:
583 vnfr : Vnfr job record containing primitives.
584
585 Returns:
586 (str): "success|failure|pending"
587 """
588 tasks = []
589 job_status = []
590
591 for primitive in vnfr.primitive:
592 if primitive.execution_status != 'pending':
593 if primitive.execution_id == "":
594 # We may not have processed the status for these yet
595 job_status.append(primitive.execution_status)
596 continue
597
598 if primitive.execution_id == "":
599 # Actions which failed to queue can have empty id
600 job_status.append(primitive.execution_status)
601 continue
602
603 if primitive.execution_id == "config":
604 # Config job. Check if service is active
605 task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))
606
607 else:
608 task = self.loop.create_task(self.get_primitive_status(primitive))
609
610 tasks.append(task)
611
612 if tasks:
613 yield from asyncio.wait(tasks, loop=self.loop)
614
615 job_status.extend([task.result() for task in tasks])
616 if "failure" in job_status:
617 vnfr.vnf_job_status = "failure"
618 return "failure"
619
620 elif "pending" in job_status:
621 vnfr.vnf_job_status = "pending"
622 return "pending"
623
624 else:
625 vnfr.vnf_job_status = "success"
626 return "success"
627
628 @asyncio.coroutine
629 def get_service_status(self, vnfr_id, primitive):
630 try:
631 status = yield from self.loop.run_in_executor(
632 self.executor,
633 self.config_plugin.get_service_status,
634 vnfr_id
635 )
636
637 self.log.debug("Service status: {}".format(status))
638 if status in ['error', 'blocked']:
639 self.log.warning("Execution of config {} failed: {}".
640 format(primitive.execution_id, status))
641 primitive.execution_error_details = 'Config failed'
642 status = 'failure'
643 elif status in ['active']:
644 status = 'success'
645 elif status is None:
646 status = 'failure'
647 else:
648 status = 'pending'
649
650 except Exception as e:
651 self.log.exception(e)
652 status = "failed"
653
654 primitive.execution_status = status
655 return primitive.execution_status
656
657 @asyncio.coroutine
658 def get_primitive_status(self, primitive):
659 """
660 Queries the juju api and gets the status of the execution id.
661
662 Args:
663 primitive : Primitive containing the execution ID.
664 """
665
666 try:
667 resp = yield from self.loop.run_in_executor(
668 self.executor,
669 self.config_plugin.get_action_status,
670 primitive.execution_id
671 )
672
673 self.log.debug("Action status: {}".format(resp))
674 status = resp['status']
675 if status == 'failed':
676 self.log.warning("Execution of action {} failed: {}".
677 format(primitive.execution_id, resp))
678 primitive.execution_error_details = resp['message']
679
680 except Exception as e:
681 self.log.exception(e)
682 status = "failed"
683
684 # Handle case status is None
685 if status:
686 primitive.execution_status = ConfigAgentJob.STATUS_MAP[status]
687 else:
688 primitive.execution_status = "failure"
689
690 return primitive.execution_status
691
692
693 class CfgAgentJobDtsHandler(object):
694 """Dts Handler for CfgAgent"""
695 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"
696
697 def __init__(self, dts, log, loop, nsm, cfgm):
698 """
699 Args:
700 dts : Dts Handle.
701 log : Log handle.
702 loop : Event loop.
703 nsm : NsmManager.
704 cfgm : ConfigManager.
705 """
706 self._dts = dts
707 self._log = log
708 self._loop = loop
709 self._cfgm = cfgm
710 self._nsm = nsm
711
712 self._regh = None
713 self._project = cfgm.project
714
715 @property
716 def regh(self):
717 """ Return registration handle """
718 return self._regh
719
720 @property
721 def nsm(self):
722 """ Return the NSManager manager instance """
723 return self._nsm
724
725 @property
726 def cfgm(self):
727 """ Return the ConfigManager manager instance """
728 return self._cfgm
729
730 def cfg_job_xpath(self, nsr_id, job_id):
731 return self._project.add_project(("D,/nsr:ns-instance-opdata" +
732 "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
733 "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id), quoted_key(str(job_id))))
734
735 @asyncio.coroutine
736 def register(self):
737 """ Register for NS monitoring read from dts """
738
739 @asyncio.coroutine
740 def on_prepare(xact_info, action, ks_path, msg):
741 """ prepare callback from dts """
742 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
743 if action == rwdts.QueryAction.READ:
744 schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
745 path_entry = schema.keyspec_to_entry(ks_path)
746 try:
747 nsr_id = path_entry.key00.ns_instance_config_ref
748
749 #print("###>>> self.nsm.nsrs:", self.nsm.nsrs)
750 nsr_ids = []
751 if nsr_id is None or nsr_id == "":
752 nsrs = list(self.nsm.nsrs.values())
753 nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
754 else:
755 nsr_ids = [nsr_id]
756
757 for nsr_id in nsr_ids:
758 jobs = self.cfgm.get_job(nsr_id)
759
760 for job in jobs:
761 xact_info.respond_xpath(
762 rwdts.XactRspCode.MORE,
763 self.cfg_job_xpath(nsr_id, job.id),
764 job.job)
765
766 except Exception as e:
767 self._log.exception("Caught exception:%s", str(e))
768 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
769
770 else:
771 xact_info.respond_xpath(rwdts.XactRspCode.NA)
772
773 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
774 with self._dts.group_create() as group:
775 self._regh = group.register(xpath=self._project.add_project(
776 CfgAgentJobDtsHandler.XPATH),
777 handler=hdl,
778 flags=rwdts.Flag.PUBLISHER,
779 )
780
781 def _terminate_nsr(self, nsr_id):
782 self._log.debug("NSR {} being terminated".format(nsr_id))
783 jobs = self.cfgm.get_job(nsr_id)
784 for job in jobs:
785 path = self.cfg_job_xpath(nsr_id, job.id)
786 with self._dts.transaction() as xact:
787 self._log.debug("Deleting job: {}".format(path))
788 job.regh.delete_element(path)
789 self._log.debug("Deleted job: {}".format(path))
790
791 # Remove the NSR id in manager
792 self.cfgm.del_nsr(nsr_id)
793
794 @property
795 def nsr_xpath(self):
796 return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
797
798 def deregister(self):
799 self._log.debug("De-register config agent job for project".
800 format(self._project.name))
801 if self._regh:
802 self._regh.deregister()
803 self._regh = None
804
805
806 class ConfigAgentJobManager(object):
807 """A central class that manager all the Config Agent related data,
808 Including updating the status
809
810 TODO: Needs to support multiple config agents.
811 """
812 def __init__(self, dts, log, loop, project, nsm):
813 """
814 Args:
815 dts : Dts handle
816 log : Log handler
817 loop : Event loop
818 nsm : NsmTasklet instance
819 """
820 self.jobs = {}
821 self.dts = dts
822 self.log = log
823 self.loop = loop
824 self.nsm = nsm
825 self.project = project
826 self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
827 self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
828
829 def add_job(self, rpc_output, tasks=None):
830 """Once an RPC is triggered, add a new job
831
832 Args:
833 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
834 rpc_input (YangInput_Nsr_ExecNsConfigPrimitive): Rpc input
835 tasks(list) A list of asyncio.Tasks
836
837 """
838 nsr_id = rpc_output.nsr_id_ref
839
840 job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
841 tasks, self.project)
842
843 self.log.debug("Creating a job monitor for Job id: {}".format(
844 rpc_output.job_id))
845
846 if nsr_id not in self.jobs:
847 self.jobs[nsr_id] = [job]
848 else:
849 self.jobs[nsr_id].append(job)
850
851 # If the tasks are none, assume juju actions
852 # TBD: This logic need to be revisited
853 ca = self.nsm.config_agent_plugins[0]
854 if tasks is None:
855 for agent in self.nsm.config_agent_plugins:
856 if agent.agent_type == 'juju':
857 ca = agent
858 break
859
860 def done_callback(fut):
861 e = fut.exception()
862 if e:
863 self.log.error("Exception on monitor job {}: {}".
864 format(rpc_output.job_id, e))
865 fut.print_stack()
866 self.log.debug("Monitor job done for {}".format(rpc_output.job_id))
867
868 # For every Job we will schedule a new monitoring process.
869 job_monitor = ConfigAgentJobMonitor(
870 self.dts,
871 self.log,
872 job,
873 self.executor,
874 self.loop,
875 ca
876 )
877 task = self.loop.create_task(job_monitor.publish_action_status())
878 task.add_done_callback(done_callback)
879
880 def get_job(self, nsr_id):
881 """Get the job associated with the NSR Id, if present."""
882 try:
883 return self.jobs[nsr_id]
884 except KeyError:
885 return []
886
887 def del_nsr(self, nsr_id):
888 """Delete a NSR id from the jobs list"""
889 if nsr_id in self.jobs:
890 self.jobs.pop(nsr_id)
891
892 @asyncio.coroutine
893 def register(self):
894 yield from self.handler.register()
895 # yield from self.handler.register_for_nsr()
896
897 def deregister(self):
898 self.handler.deregister()
899 self.handler = None