83912dbcf67aaef88fc19b82ac48cc78f97a1f1b
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import gi
20 import time
21 import gi
22
23 gi.require_version('RwNsrYang', '1.0')
24
25 from gi.repository import (
26 NsrYang,
27 RwTypes,
28 RwcalYang,
29 RwNsrYang,
30 RwConfigAgentYang,
31 RwDts as rwdts)
32 gi.require_version('RwKeyspec', '1.0')
33 from gi.repository.RwKeyspec import quoted_key
34
35 import rift.tasklets
36 import rift.mano.utils.juju_api as juju
37
38
class ConfigAgentAccountNotFound(Exception):
    """Raised when a requested config agent account is missing or unnamed."""
41
class JujuClient(object):
    """Wrapper around the Juju API used to validate account credentials."""

    def __init__(self, log, ip, port, user, passwd):
        """Remember the connection parameters and build the Juju API handle.

        Args:
            log: Logger used for status and error messages.
            ip: Address of the Juju server.
            port: Port of the Juju server.
            user: User name of the account.
            passwd: Secret of the account.
        """
        self._log = log
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd

        self._api = juju.JujuApi(log=log,
                                 server=ip, port=port,
                                 user=user, secret=passwd)

    def validate_account_creds(self):
        """Check the account by fetching the Juju environment.

        Returns:
            A RwcalYang connection status object with status "success".

        Raises:
            Exception: If the environment lookup fails; the message says
                whether the credentials, the address or something else
                was at fault.
        """
        status = RwcalYang.YangData_Rwcal_ConnectionStatus()
        try:
            self._api._get_env()
        except juju.JujuEnvError as e:
            # The message must be formatted here: "fmt", arg would build a
            # tuple, and the tuple would be what gets logged and raised.
            msg = "JujuClient: Invalid account credentials: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        except ConnectionRefusedError as e:
            msg = "JujuClient: Wrong IP or Port: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        except Exception as e:
            msg = "JujuClient: Connection Failed: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        else:
            status.status = "success"
            status.details = "Connection was successful"
            self._log.info("JujuClient: Connection Successful")

        return status
77
78
class ConfigAgentAccount(object):
    """A configured config agent account together with its connection status."""

    def __init__(self, log, account_msg):
        """Copy the account message and pick a validation client.

        Args:
            log: Logger handle.
            account_msg: Account message; a deep copy of it is kept.
        """
        self._log = log
        self._account_msg = account_msg.deep_copy()

        # Only juju accounts get a client able to validate credentials.
        self._cfg_agent_client_plugin = None
        if account_msg.account_type == "juju":
            juju_cfg = account_msg.juju
            self._cfg_agent_client_plugin = JujuClient(
                log,
                juju_cfg.ip_address,
                juju_cfg.port,
                juju_cfg.user,
                juju_cfg.secret)

        status_cls = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus
        self._status = status_cls(
            status="unknown",
            details="Connection status lookup not started"
        )

        self._validate_task = None

    @property
    def name(self):
        """Name taken from the stored account message."""
        return self._account_msg.name

    @property
    def account_msg(self):
        """The stored copy of the account message."""
        return self._account_msg

    @property
    def account_type(self):
        """Account type taken from the stored account message."""
        return self._account_msg.account_type

    @property
    def connection_status(self):
        """Most recently recorded connection status object."""
        return self._status

    def update_from_cfg(self, cfg):
        """Updating an account is not supported; always raises.

        Raises:
            NotImplementedError: Always.
        """
        self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
        raise NotImplementedError("Update config agent account not yet supported")

    @asyncio.coroutine
    def validate_cfg_agent_account_credentials(self, loop):
        """Validate the credentials and record the outcome in the status.

        The client call is run through ``loop.run_in_executor`` with the
        default executor.

        Args:
            loop: Event loop whose executor runs the validation call.
        """
        status_cls = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus

        self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)

        # Visible to readers while the validation call is in flight.
        self._status = status_cls(
            status="validating",
            details="Config Agent account connection validation in progress"
        )

        plugin = self._cfg_agent_client_plugin
        if plugin is not None:
            try:
                result = yield from loop.run_in_executor(
                    None,
                    plugin.validate_account_creds
                )
                self._status = status_cls.from_dict(result.as_dict())
            except Exception as e:
                self._status = status_cls(
                    status="failure",
                    details="Error - " + str(e)
                )
        else:
            self._status = status_cls(
                status="unknown",
                details="Config Agent account does not support validation of account creds"
            )

        self._log.info("Got config agent account validation response: %s", self._status)

    def start_validate_credentials(self, loop):
        """Schedule a credential validation, cancelling any earlier one.

        Args:
            loop: Event loop on which the validation is scheduled.
        """
        running = self._validate_task
        if running is not None:
            running.cancel()
            self._validate_task = None

        self._validate_task = asyncio.ensure_future(
            self.validate_cfg_agent_account_credentials(loop),
            loop=loop
        )
159
class CfgAgentDtsOperdataHandler(object):
    """Publishes config agent connection status and serves the validate RPC."""

    def __init__(self, dts, log, loop, project):
        """
        Args:
            dts : DTS handle
            log : Log handle
            loop : Event loop
            project : Project object used to scope xpaths and check RPCs
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Account name -> ConfigAgentAccount
        self.cfg_agent_accounts = {}
        self._show_reg = None
        self._rpc_reg = None

    def add_cfg_agent_account(self, account_msg):
        """Store an account built from account_msg and start validating it."""
        account = ConfigAgentAccount(self._log, account_msg)
        self.cfg_agent_accounts[account.name] = account
        self._log.info("ConfigAgent Operdata Handler added. Starting account validation")

        account.start_validate_credentials(self._loop)

    def delete_cfg_agent_account(self, account_name):
        """Forget the named account.

        Raises:
            KeyError: If no account with that name is stored.
        """
        del self.cfg_agent_accounts[account_name]
        self._log.info("ConfigAgent Operdata Handler deleted.")

    def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
        """Return the named account, or every saved account.

        Args:
            cfg_agent_account_name: Account name; None or "" selects all.

        Returns:
            list: The matching ConfigAgentAccount objects.

        Raises:
            KeyError: If a name is given and no such account is stored.
        """
        if cfg_agent_account_name is None or cfg_agent_account_name == "":
            return list(self.cfg_agent_accounts.values())

        if cfg_agent_account_name in self.cfg_agent_accounts:
            return [self.cfg_agent_accounts[cfg_agent_account_name]]

        errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
        raise KeyError(errstr)

    def _register_show_status(self):
        """Register as publisher of the account connection-status xpath."""
        def get_xpath(cfg_agent_name=None):
            # Without a name the xpath addresses every account.
            if cfg_agent_name is not None:
                key = "[name=%s]" % quoted_key(cfg_agent_name)
            else:
                key = ''
            return "D,/rw-config-agent:config-agent/account{}/connection-status".format(key)

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path)
            cfg_agent_account_name = path_entry.key00.name
            self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())

            try:
                saved_accounts = self.get_saved_cfg_agent_accounts(cfg_agent_account_name)
                for account in saved_accounts:
                    connection_status = account.connection_status
                    self._log.debug("Responding to config agent connection status request: %s", connection_status)
                    xpath = self._project.add_project(get_xpath(account.name))
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=xpath,
                        msg=connection_status,
                    )
            except KeyError as e:
                self._log.warning(str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(get_xpath())
        self._show_reg = yield from self._dts.register(
            xpath=xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare),
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the RPC that restarts validation of one account."""
        def get_xpath():
            return "/rw-config-agent:update-cfg-agent-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not msg.has_field("cfg_agent_account"):
                raise ConfigAgentAccountNotFound("Config Agent account name not provided")

            cfg_agent_account_name = msg.cfg_agent_account

            if not self._project.rpc_check(msg, xact_info=xact_info):
                return

            try:
                account = self.cfg_agent_accounts[cfg_agent_account_name]
            except KeyError:
                raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name)

            account.start_validate_credentials(self._loop)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._rpc_reg = yield from self._dts.register(
            xpath=get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Register the status publisher and then the validate RPC."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()

    def deregister(self):
        """Drop both registrations, skipping any that is not set.

        The handles start out as None, so this is safe to call before
        register() and more than once.
        """
        if self._show_reg is not None:
            self._show_reg.deregister()
            self._show_reg = None

        if self._rpc_reg is not None:
            self._rpc_reg.deregister()
            self._rpc_reg = None
276
277
class ConfigAgentJob(object):
    """A wrapper over the config agent job object, providing some
    convenience functions.

    YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
    ||
     ==> VNFRS
          ||
           ==> Primitives

    """
    # Normalizes the state terms from Juju to our yang models
    # Juju : Yang model
    STATUS_MAP = {
        "completed": "success",
        "pending": "pending",
        "running": "pending",
        "failed": "failure",
    }

    def __init__(self, nsr_id, job, project, tasks=None):
        """
        Args:
            nsr_id (uuid): ID of NSR record
            job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
            project: Project object used to scope the job xpath
            tasks: List of asyncio.tasks. If provided the job monitor will
                use it to monitor the tasks instead of the execution IDs
        """
        self.nsr_id = nsr_id
        self.tasks = tasks
        self._job = job
        self._project = project

        self._regh = None

    @property
    def id(self):
        """Job id"""
        return self._job.job_id

    @property
    def name(self):
        """Job name"""
        return self._job.job_name

    @property
    def job_status(self):
        """Status of the job (success|pending|failure)"""
        return self._job.job_status

    @job_status.setter
    def job_status(self, value):
        """Setter for job status"""
        self._job.job_status = value

    @property
    def job(self):
        """Gi object"""
        return self._job

    @property
    def xpath(self):
        """Xpath of the job"""
        template = ("D,/nsr:ns-instance-opdata"
                    "/nsr:nsr[nsr:ns-instance-config-ref={}]"
                    "/nsr:config-agent-job[nsr:job-id={}]")
        nsr_key = quoted_key(self.nsr_id)
        job_key = quoted_key(str(self.id))
        return self._project.add_project(template.format(nsr_key, job_key))

    @property
    def regh(self):
        """Registration handle for the job"""
        return self._regh

    @regh.setter
    def regh(self, hdl):
        """Setter for registration handle"""
        self._regh = hdl

    @staticmethod
    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project):
        """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
        to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)

        Args:
            nsr_id (uuid): NSR ID
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
            tasks (list): A list of asyncio.Tasks
            project: Project object handed to the new ConfigAgentJob

        Returns:
            ConfigAgentJob
        """
        # Short aliases for the very long generated type names.
        job_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
        vnfr_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
        primitive_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
        param_cls = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter

        # Every new job starts out as pending.
        job = job_cls.from_dict({
            "job_id": rpc_output.job_id,
            "job_name": rpc_output.name,
            "job_status": "pending",
            "triggered_by": rpc_output.triggered_by,
            "create_time": rpc_output.create_time,
            "job_status_details": rpc_output.job_status_details,
            "parameter": [param.as_dict() for param in rpc_output.parameter],
            "parameter_group": [pg.as_dict() for pg in rpc_output.parameter_group]
        })

        for vnf_out in rpc_output.vnf_out_list:
            vnfr_job = vnfr_cls.from_dict({
                "id": vnf_out.vnfr_id_ref,
                "vnf_job_status": "pending",
            })

            for prim_out in vnf_out.vnf_out_primitive:
                vnf_primitive = primitive_cls.from_dict({
                    "name": prim_out.name,
                    "execution_status": ConfigAgentJob.STATUS_MAP[prim_out.execution_status],
                    "execution_id": prim_out.execution_id,
                    "execution_error_details": prim_out.execution_error_details,
                })

                # Carry the input parameters over to the job record
                for in_param in prim_out.parameter:
                    copied = param_cls.from_dict({
                        "name": in_param.name,
                        "value": in_param.value
                    })
                    vnf_primitive.parameter.append(copied)

                vnfr_job.primitive.append(vnf_primitive)

            job.vnfr.append(vnfr_job)

        return ConfigAgentJob(nsr_id, job, project, tasks)
411
412
class ConfigAgentJobMonitor(object):
    """Job monitor: Polls the Juju controller and get the status.
    Rules:
        If all Primitive are success, then vnf & nsr status will be "success"
        If any one Primitive reaches a failed state then both vnf and nsr will fail.
    """
    POLLING_PERIOD = 2

    def __init__(self, dts, log, job, executor, loop, config_plugin):
        """
        Args:
            dts : DTS handle
            log : log handle
            job (ConfigAgentJob): ConfigAgentJob instance
            executor (concurrent.futures): Executor for juju status api calls
            loop (eventloop): Current event loop instance
            config_plugin : Config plugin to be used.
        """
        self.job = job
        self.log = log
        self.loop = loop
        self.executor = executor
        self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
        self.config_plugin = config_plugin
        self.dts = dts

    @asyncio.coroutine
    def _monitor_processes(self, registration_handle):
        """Wait for every task of the job and publish the combined result.

        The job is a success only when every return code is 0. Anything
        written to stderr is collected into the job status details.

        Args:
            registration_handle: Handle used to publish the updated job.
        """
        result = 0
        errs = ""
        for process in self.job.tasks:
            if isinstance(process, asyncio.subprocess.Process):
                rc = yield from process.wait()
                err = yield from process.stderr.read()
                # The stream yields bytes; decode so the details carry the
                # text itself rather than its b'...' representation.
                if isinstance(err, bytes):
                    err = err.decode(errors="replace")

            else:
                # Task instance
                rc = yield from process
                err = ''

            self.log.debug("Process {} returned rc: {}, err: {}".
                           format(process, rc, err))

            if len(err):
                if rc == 0:
                    errs += "<success>{}</success>".format(err)
                else:
                    errs += "<error>{}</error>".format(err)
            result |= rc

        if result == 0:
            self.job.job_status = "success"
        else:
            self.job.job_status = "failure"

        if len(errs):
            self.job.job.job_status_details = errs

        registration_handle.update_element(self.job.xpath, self.job.job)

    def get_execution_details(self):
        '''Get the error details from failed primitives'''
        parts = []
        for vnfr in self.job.job.vnfr:
            for primitive in vnfr.primitive:
                details = primitive.execution_error_details
                if primitive.execution_status == "failure":
                    # A failure is always reported, with or without details.
                    if not details:
                        details = '{}: Unknown error'.format(primitive.name)
                    parts.append('<error>' + details + '</error>')
                elif details:
                    parts.append('<{status}>{details}</{status}>'.format(
                        status=primitive.execution_status,
                        details=details))
        return ''.join(parts)

    @asyncio.coroutine
    def publish_action_status(self):
        """
        Starts publishing the status for jobs/primitives
        """
        registration_handle = yield from self.dts.register(
            xpath=self.job.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(),
            flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
        )

        self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
        self.job.regh = registration_handle

        try:
            registration_handle.create_element(self.job.xpath, self.job.job)

            # If the config is done via a user defined script
            if self.job.tasks is not None:
                yield from self._monitor_processes(registration_handle)
                return

            prev = time.time()
            # Run until pending moves to either failure/success
            while self.job.job_status == "pending":
                curr = time.time()

                # Sleep off whatever is left of the polling period.
                if curr - prev < self.polling_period:
                    pause = self.polling_period - (curr - prev)
                    yield from asyncio.sleep(pause, loop=self.loop)

                prev = time.time()

                tasks = []
                for vnfr in self.job.job.vnfr:
                    task = self.loop.create_task(self.get_vnfr_status(vnfr))
                    tasks.append(task)

                # Exit, if no tasks are found
                if not tasks:
                    break

                yield from asyncio.wait(tasks, loop=self.loop)

                job_status = [task.result() for task in tasks]

                if "failure" in job_status:
                    self.job.job_status = "failure"
                elif "pending" in job_status:
                    self.job.job_status = "pending"
                else:
                    self.job.job_status = "success"

                errs = self.get_execution_details()
                if len(errs):
                    self.job.job.job_status_details = errs

                registration_handle.update_element(self.job.xpath, self.job.job)

            registration_handle.update_element(self.job.xpath, self.job.job)

        except Exception as e:
            self.log.exception(e)
            raise

    @asyncio.coroutine
    def get_vnfr_status(self, vnfr):
        """Schedules tasks for all containing primitives and updates it's own
        status.

        Args:
            vnfr : Vnfr job record containing primitives.

        Returns:
            (str): "success|failure|pending"
        """
        tasks = []
        job_status = []

        for primitive in vnfr.primitive:
            # NOTE(review): the nesting of the first `continue` could not be
            # recovered from the flattened source. This reading keeps it in
            # the inner branch, so no primitive status is ever dropped -
            # confirm against the upstream file.
            if primitive.execution_status != 'pending':
                if primitive.execution_id == "":
                    # We may not have processed the status for these yet
                    job_status.append(primitive.execution_status)
                    continue

            if primitive.execution_id == "":
                # Actions which failed to queue can have empty id
                job_status.append(primitive.execution_status)
                continue

            if primitive.execution_id == "config":
                # Config job. Check if service is active
                task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))

            else:
                task = self.loop.create_task(self.get_primitive_status(primitive))

            tasks.append(task)

        if tasks:
            yield from asyncio.wait(tasks, loop=self.loop)

        job_status.extend([task.result() for task in tasks])
        if "failure" in job_status:
            vnfr.vnf_job_status = "failure"
            return "failure"

        elif "pending" in job_status:
            vnfr.vnf_job_status = "pending"
            return "pending"

        else:
            vnfr.vnf_job_status = "success"
            return "success"

    @asyncio.coroutine
    def get_service_status(self, vnfr_id, primitive):
        """Ask the config plugin for the service status of a VNFR.

        Args:
            vnfr_id : ID passed to the plugin's get_service_status.
            primitive : Primitive whose execution status is updated.

        Returns:
            (str): "success|failure|pending"
        """
        try:
            status = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_service_status,
                vnfr_id
            )

            self.log.debug("Service status: {}".format(status))
            if status in ['error', 'blocked']:
                self.log.warning("Execution of config {} failed: {}".
                                 format(primitive.execution_id, status))
                primitive.execution_error_details = 'Config failed'
                status = 'failure'
            elif status in ['active']:
                status = 'success'
            elif status is None:
                status = 'failure'
            else:
                status = 'pending'

        except Exception as e:
            self.log.exception(e)
            # Must be "failure", not "failed": that is the value the status
            # checks in this class look for, so an error here is not
            # overlooked.
            status = "failure"

        primitive.execution_status = status
        return primitive.execution_status

    @asyncio.coroutine
    def get_primitive_status(self, primitive):
        """
        Queries the juju api and gets the status of the execution id.

        Args:
            primitive : Primitive containing the execution ID.

        Returns:
            (str): "success|failure|pending"
        """

        try:
            resp = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_action_status,
                primitive.execution_id
            )

            self.log.debug("Action status: {}".format(resp))
            status = resp['status']
            if status == 'failed':
                self.log.warning("Execution of action {} failed: {}".
                                 format(primitive.execution_id, resp))
                primitive.execution_error_details = resp['message']

        except Exception as e:
            self.log.exception(e)
            status = "failed"

        # Handle case status is None
        if status:
            mapped = ConfigAgentJob.STATUS_MAP.get(status)
            if mapped is None:
                # A status missing from the map is treated as a failure so
                # that it cannot abort the monitor with a KeyError.
                self.log.warning("Unknown action status {} for execution id {}".
                                 format(status, primitive.execution_id))
                mapped = "failure"
            primitive.execution_status = mapped
        else:
            primitive.execution_status = "failure"

        return primitive.execution_status
677
678
class CfgAgentJobDtsHandler(object):
    """Dts Handler for CfgAgent"""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"

    def __init__(self, dts, log, loop, nsm, cfgm):
        """
        Args:
            dts : Dts Handle.
            log : Log handle.
            loop : Event loop.
            nsm : NsmManager.
            cfgm : ConfigManager.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cfgm = cfgm
        self._nsm = nsm

        self._regh = None
        self._project = cfgm.project

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NSManager manager instance """
        return self._nsm

    @property
    def cfgm(self):
        """ Return the ConfigManager manager instance """
        return self._cfgm

    def cfg_job_xpath(self, nsr_id, job_id):
        """ Return the project-scoped xpath of one config agent job """
        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                                          "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
                                          "/nsr:config-agent-job[nsr:job-id={}]"
                                          ).format(quoted_key(nsr_id), quoted_key(str(job_id))))

    @asyncio.coroutine
    def register(self):
        """ Register for NS monitoring read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            # NOTE(review): the result is never used; the call is kept only
            # because it could not be confirmed to be free of side effects.
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            if action == rwdts.QueryAction.READ:
                schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                try:
                    nsr_id = path_entry.key00.ns_instance_config_ref

                    # An empty key selects every NSR known to the manager.
                    if nsr_id is None or nsr_id == "":
                        nsrs = list(self.nsm.nsrs.values())
                        nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
                    else:
                        nsr_ids = [nsr_id]

                    for curr_nsr_id in nsr_ids:
                        for job in self.cfgm.get_job(curr_nsr_id):
                            xact_info.respond_xpath(
                                rwdts.XactRspCode.MORE,
                                self.cfg_job_xpath(curr_nsr_id, job.id),
                                job.job)

                except Exception as e:
                    self._log.exception("Caught exception:%s", str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

            else:
                xact_info.respond_xpath(rwdts.XactRspCode.NA)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self._project.add_project(
                                            CfgAgentJobDtsHandler.XPATH),
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )

    def _terminate_nsr(self, nsr_id):
        """ Delete every published job of the NSR and drop it from the manager """
        self._log.debug("NSR {} being terminated".format(nsr_id))
        jobs = self.cfgm.get_job(nsr_id)
        for job in jobs:
            path = self.cfg_job_xpath(nsr_id, job.id)
            if job.regh is None:
                # The handle starts as None; a job without one has nothing
                # to delete, and failing here would skip the cleanup below.
                self._log.debug("Job not published, nothing to delete: {}".format(path))
                continue

            with self._dts.transaction():
                self._log.debug("Deleting job: {}".format(path))
                job.regh.delete_element(path)
                self._log.debug("Deleted job: {}".format(path))

        # Remove the NSR id in manager
        self.cfgm.del_nsr(nsr_id)

    @property
    def nsr_xpath(self):
        """ Return the project-scoped xpath of the NSR opdata list """
        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")

    def deregister(self):
        """ Drop the DTS registration, if there is one """
        # The message needs a placeholder, otherwise format() silently
        # discards the project name.
        self._log.debug("De-register config agent job for project {}".
                        format(self._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None
790
791
class ConfigAgentJobManager(object):
    """A central class that manager all the Config Agent related data,
    Including updating the status

    TODO: Needs to support multiple config agents.
    """
    def __init__(self, dts, log, loop, project, nsm):
        """
        Args:
            dts : Dts handle
            log : Log handler
            loop : Event loop
            project : Project object handed to every job
            nsm : NsmTasklet instance
        """
        # NSR id -> list of ConfigAgentJob
        self.jobs = {}
        self.dts = dts
        self.log = log
        self.loop = loop
        self.nsm = nsm
        self.project = project
        self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def add_job(self, rpc_output, tasks=None):
        """Once an RPC is triggered, add a new job

        Args:
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
            tasks(list) A list of asyncio.Tasks

        """
        nsr_id = rpc_output.nsr_id_ref

        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
                                                      tasks, self.project)

        self.log.debug("Creating a job monitor for Job id: {}".format(
            rpc_output.job_id))

        self.jobs.setdefault(nsr_id, []).append(job)

        # If the tasks are none, assume juju actions
        # TBD: This logic need to be revisited
        ca = self.nsm.config_agent_plugins[0]
        if tasks is None:
            for agent in self.nsm.config_agent_plugins:
                if agent.agent_type == 'juju':
                    ca = agent
                    break

        def done_callback(fut):
            # exception() raises CancelledError on a cancelled future, so
            # cancellation has to be checked first.
            if fut.cancelled():
                self.log.debug("Monitor job cancelled for {}".format(rpc_output.job_id))
                return

            e = fut.exception()
            if e:
                self.log.error("Exception on monitor job {}: {}".
                               format(rpc_output.job_id, e))
                fut.print_stack()
            self.log.debug("Monitor job done for {}".format(rpc_output.job_id))

        # For every Job we will schedule a new monitoring process.
        job_monitor = ConfigAgentJobMonitor(
            self.dts,
            self.log,
            job,
            self.executor,
            self.loop,
            ca
        )
        task = self.loop.create_task(job_monitor.publish_action_status())
        task.add_done_callback(done_callback)

    def get_job(self, nsr_id):
        """Get the job associated with the NSR Id, if present."""
        try:
            return self.jobs[nsr_id]
        except KeyError:
            return []

    def del_nsr(self, nsr_id):
        """Delete a NSR id from the jobs list"""
        self.jobs.pop(nsr_id, None)

    @asyncio.coroutine
    def register(self):
        """Register the job DTS handler."""
        yield from self.handler.register()
        # yield from self.handler.register_for_nsr()

    def deregister(self):
        """Deregister the job DTS handler; a repeated call does nothing."""
        if self.handler is not None:
            self.handler.deregister()
            self.handler = None