Merge from OSM SO master
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import time
20
21 from gi.repository import (
22 NsrYang,
23 RwTypes,
24 RwcalYang,
25 RwNsrYang,
26 RwConfigAgentYang,
27 RwDts as rwdts)
28
29 import rift.tasklets
30 import rift.mano.utils.juju_api as juju
31
32
class ConfigAgentAccountNotFound(Exception):
    """Raised when a config agent account name is missing or unknown."""
35
class JujuClient(object):
    """Holds Juju connection parameters and validates them against the API."""

    def __init__(self, log, ip, port, user, passwd):
        """Remember the connection parameters and build the Juju API handle.

        Args:
            log: Logger used to report validation results.
            ip: Address of the Juju server.
            port: Port of the Juju server.
            user: Juju account user name.
            passwd: Juju account secret.
        """
        self._log = log
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd

        self._api = juju.JujuApi(log=log,
                                 server=ip, port=port,
                                 user=user, secret=passwd)

    def _raise_failure(self, fmt, exc):
        """Log and raise a formatted validation failure chained to *exc*."""
        # Format eagerly: the raised exception must carry a readable string,
        # not a (format, argument) tuple.
        msg = fmt % str(exc)
        self._log.error(msg)
        raise Exception(msg) from exc

    def validate_account_creds(self):
        """Check that the Juju environment can be fetched with these creds.

        Returns:
            RwcalYang.CloudConnectionStatus with status "success" when the
            environment lookup succeeds.

        Raises:
            Exception: with a formatted message when the credentials are
                rejected, the connection is refused, or any other error
                occurs. The original error is chained as the cause.
        """
        status = RwcalYang.CloudConnectionStatus()
        try:
            self._api._get_env()
        except juju.JujuEnvError as e:
            self._raise_failure("JujuClient: Invalid account credentials: %s", e)
        except ConnectionRefusedError as e:
            self._raise_failure("JujuClient: Wrong IP or Port: %s", e)
        except Exception as e:
            self._raise_failure("JujuClient: Connection Failed: %s", e)
        else:
            status.status = "success"
            status.details = "Connection was successful"
            self._log.info("JujuClient: Connection Successful")

        return status
71
72
class ConfigAgentAccount(object):
    """A configured config-agent account together with its connection status."""

    def __init__(self, log, account_msg):
        """Copy the account message and pick a client plugin for its type.

        Args:
            log: Logger handle.
            account_msg: Account message; only "juju" accounts get a client
                plugin, every other account type gets none.
        """
        self._log = log
        self._account_msg = account_msg.deep_copy()

        plugin = None
        if account_msg.account_type == "juju":
            juju_cfg = account_msg.juju
            plugin = JujuClient(
                log,
                juju_cfg.ip_address,
                juju_cfg.port,
                juju_cfg.user,
                juju_cfg.secret)
        self._cfg_agent_client_plugin = plugin

        self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
            status="unknown",
            details="Connection status lookup not started"
        )

        # Handle of the most recently scheduled validation, if any.
        self._validate_task = None

    @property
    def name(self):
        """Name taken from the stored account message."""
        return self._account_msg.name

    @property
    def account_msg(self):
        """The stored (deep-copied) account message."""
        return self._account_msg

    @property
    def account_type(self):
        """Account type taken from the stored account message."""
        return self._account_msg.account_type

    @property
    def connection_status(self):
        """Latest connection status record for this account."""
        return self._status

    def update_from_cfg(self, cfg):
        """Account updates are not supported; always raises NotImplementedError."""
        self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
        raise NotImplementedError("Update config agent account not yet supported")

    @asyncio.coroutine
    def validate_cfg_agent_account_credentials(self, loop):
        """Validate the account credentials and record the outcome.

        The plugin's blocking validation call is run through the loop's
        default executor. Any exception it raises is recorded as a
        "failure" status instead of being propagated.

        Args:
            loop: Event loop whose executor runs the validation call.
        """
        self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)

        status_cls = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus
        self._status = status_cls(
            status="validating",
            details="Config Agent account connection validation in progress"
        )

        plugin = self._cfg_agent_client_plugin
        if plugin is None:
            self._status = status_cls(
                status="unknown",
                details="Config Agent account does not support validation of account creds"
            )
        else:
            try:
                result = yield from loop.run_in_executor(
                    None,
                    plugin.validate_account_creds
                )
                self._status = status_cls.from_dict(result.as_dict())
            except Exception as exc:
                self._status = status_cls(
                    status="failure",
                    details="Error - " + str(exc)
                )

        self._log.info("Got config agent account validation response: %s", self._status)

    def start_validate_credentials(self, loop):
        """Schedule a fresh validation, cancelling any previously scheduled one.

        Args:
            loop: Event loop on which the validation is scheduled.
        """
        previous = self._validate_task
        if previous is not None:
            previous.cancel()
            self._validate_task = None

        validation = self.validate_cfg_agent_account_credentials(loop)
        self._validate_task = asyncio.ensure_future(validation, loop=loop)
153
class CfgAgentDtsOperdataHandler(object):
    """Publishes config-agent account connection status and serves the
    update-cfg-agent-status RPC over DTS.
    """

    def __init__(self, dts, log, loop, project):
        """
        Args:
            dts : DTS handle.
            log : Log handle.
            loop : Event loop.
            project : Project object used to prefix xpaths and check RPCs.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Accounts keyed by account name.
        self.cfg_agent_accounts = {}
        self._show_reg = None
        self._rpc_reg = None

    def add_cfg_agent_account(self, account_msg):
        """Track a new account and start validating its credentials."""
        account = ConfigAgentAccount(self._log, account_msg)
        self.cfg_agent_accounts[account.name] = account
        self._log.info("ConfigAgent Operdata Handler added. Starting account validation")

        account.start_validate_credentials(self._loop)

    def delete_cfg_agent_account(self, account_name):
        """Stop tracking an account. Raises KeyError if it is not tracked."""
        del self.cfg_agent_accounts[account_name]
        self._log.info("ConfigAgent Operdata Handler deleted.")

    def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
        """Return the account matching the name, or all saved accounts.

        Args:
            cfg_agent_account_name: Account name; None or "" selects all.

        Returns:
            list of ConfigAgentAccount.

        Raises:
            KeyError: if a name is given but no such account is saved.
        """
        if cfg_agent_account_name is None or cfg_agent_account_name == "":
            return list(self.cfg_agent_accounts.values())

        if cfg_agent_account_name in self.cfg_agent_accounts:
            return [self.cfg_agent_accounts[cfg_agent_account_name]]

        errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
        raise KeyError(errstr)

    def _register_show_status(self):
        """Register the connection-status publisher (driven via `yield from`)."""
        def get_xpath(cfg_agent_name=None):
            return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
                "[name='%s']" % cfg_agent_name if cfg_agent_name is not None else ''
            )

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            path_entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path)
            cfg_agent_account_name = path_entry.key00.name
            self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())

            try:
                saved_accounts = self.get_saved_cfg_agent_accounts(cfg_agent_account_name)
                for account in saved_accounts:
                    connection_status = account.connection_status
                    self._log.debug("Responding to config agent connection status request: %s", connection_status)
                    xpath = self._project.add_project(get_xpath(account.name))
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=xpath,
                        msg=connection_status,
                    )
            except KeyError as e:
                # Unknown account name: answer NA rather than failing the query.
                self._log.warning(str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(get_xpath())
        self._show_reg = yield from self._dts.register(
            xpath=xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare),
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the update-cfg-agent-status RPC (driven via `yield from`)."""
        def get_xpath():
            return "/rw-config-agent:update-cfg-agent-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not msg.has_field("cfg_agent_account"):
                raise ConfigAgentAccountNotFound("Config Agent account name not provided")

            cfg_agent_account_name = msg.cfg_agent_account

            if not self._project.rpc_check(msg, xact_info=xact_info):
                return

            try:
                account = self.cfg_agent_accounts[cfg_agent_account_name]
            except KeyError:
                raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name)

            account.start_validate_credentials(self._loop)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._rpc_reg = yield from self._dts.register(
            xpath=get_xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Register both the status publisher and the validation RPC."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()

    def deregister(self):
        """Drop both DTS registrations.

        Safe to call when registration never happened (or only partly
        succeeded) and safe to call more than once: each handle is only
        deregistered if present, then cleared.
        """
        if self._show_reg is not None:
            self._show_reg.deregister()
            self._show_reg = None

        if self._rpc_reg is not None:
            self._rpc_reg.deregister()
            self._rpc_reg = None
270
271
class ConfigAgentJob(object):
    """A wrapper over the config agent job object, providing some
    convenience functions.

    YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
    ||
     ==> VNFRS
          ||
           ==> Primitives

    """
    # This normalizes the state terms from Juju to our yang models
    # Juju : Yang model
    STATUS_MAP = {"completed": "success",
                  "pending": "pending",
                  "running": "pending",
                  "failed": "failure"}

    def __init__(self, nsr_id, job, project, tasks=None):
        """
        Args:
            nsr_id (uuid): ID of NSR record
            job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
            project: Project object; its add_project() is used to build the
                job xpath
            tasks: List of asyncio.tasks. If provided the job monitor will
                use it to monitor the tasks instead of the execution IDs
        """
        self._job = job
        self.nsr_id = nsr_id
        self.tasks = tasks
        self._project = project

        self._regh = None

    @property
    def id(self):
        """Job id"""
        return self._job.job_id

    @property
    def name(self):
        """Job name"""
        return self._job.job_name

    @property
    def job_status(self):
        """Status of the job (success|pending|failure)"""
        return self._job.job_status

    @job_status.setter
    def job_status(self, value):
        """Setter for job status"""
        self._job.job_status = value

    @property
    def job(self):
        """Gi object"""
        return self._job

    @property
    def xpath(self):
        """Xpath of the job"""
        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                                          "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
                                          "/nsr:config-agent-job[nsr:job-id='{}']"
                                          ).format(self.nsr_id, self.id))

    @property
    def regh(self):
        """Registration handle for the job"""
        return self._regh

    @regh.setter
    def regh(self, hdl):
        """Setter for registration handle"""
        self._regh = hdl

    @staticmethod
    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project=None):
        """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
        to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)

        Args:
            nsr_id (uuid): NSR ID
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
            tasks (list): A list of asyncio.Tasks
            project: Project object handed to the new ConfigAgentJob. It is
                needed by the job's xpath property, so callers should
                always supply it.

        Returns:
            ConfigAgentJob
        """
        # Shortcuts to prevent the HUUGE names.
        CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
        CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
        CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
        CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter

        job = CfgAgentJob.from_dict({
            "job_id": rpc_output.job_id,
            "job_name": rpc_output.name,
            "job_status": "pending",
            "triggered_by": rpc_output.triggered_by,
            "create_time": rpc_output.create_time,
            "job_status_details": rpc_output.job_status_details,
            "parameter": [param.as_dict() for param in rpc_output.parameter],
            "parameter_group": [pg.as_dict() for pg in rpc_output.parameter_group]
        })

        for vnfr in rpc_output.vnf_out_list:
            vnfr_job = CfgAgentVnfr.from_dict({
                "id": vnfr.vnfr_id_ref,
                "vnf_job_status": "pending",
            })

            for primitive in vnfr.vnf_out_primitive:
                vnf_primitive = CfgAgentPrimitive.from_dict({
                    "name": primitive.name,
                    "execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status],
                    "execution_id": primitive.execution_id
                })

                # Copy over the input param
                for param in primitive.parameter:
                    vnf_primitive.parameter.append(
                        CfgAgentPrimitiveParam.from_dict({
                            "name": param.name,
                            "value": param.value
                        }))

                vnfr_job.primitive.append(vnf_primitive)

            job.vnfr.append(vnfr_job)

        return ConfigAgentJob(nsr_id, job, project, tasks)
404
405
class ConfigAgentJobMonitor(object):
    """Job monitor: Polls the Juju controller and get the status.
    Rules:
        If all Primitive are success, then vnf & nsr status will be "success"
        If any one Primitive reaches a failed state then both vnf and nsr will fail.
    """
    POLLING_PERIOD = 2

    def __init__(self, dts, log, job, executor, loop, config_plugin):
        """
        Args:
            dts : DTS handle
            log : log handle
            job (ConfigAgentJob): ConfigAgentJob instance
            executor (concurrent.futures): Executor for juju status api calls
            loop (eventloop): Current event loop instance
            config_plugin : Config plugin to be used.
        """
        self.job = job
        self.log = log
        self.loop = loop
        self.executor = executor
        self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
        self.config_plugin = config_plugin
        self.dts = dts

    @asyncio.coroutine
    def _monitor_processes(self, registration_handle):
        """Wait for every task/process of the job and publish the final status.

        The job succeeds only if the OR of all return codes is 0. Any error
        output collected from subprocesses is stored in the job's status
        details, tagged as <success> or <error> by that process's return code.

        Args:
            registration_handle: DTS registration used to publish the update.
        """
        result = 0
        errs = ""
        for process in self.job.tasks:
            if isinstance(process, asyncio.subprocess.Process):
                rc = yield from process.wait()
                err = yield from process.stderr.read()

            else:
                # Task instance
                rc = yield from process
                err = ''

            self.log.debug("Process {} returned rc: {}, err: {}".
                           format(process, rc, err))

            if len(err):
                if rc == 0:
                    errs += "<success>{}</success>".format(err)
                else:
                    errs += "<error>{}</error>".format(err)
            result |= rc

        if result == 0:
            self.job.job_status = "success"
        else:
            self.job.job_status = "failure"

        if len(errs):
            self.job.job.job_status_details = errs

        registration_handle.update_element(self.job.xpath, self.job.job)

    def get_error_details(self):
        '''Get the error details from failed primitives'''
        errs = ''
        for vnfr in self.job.job.vnfr:
            if vnfr.vnf_job_status != "failure":
                continue

            for primitive in vnfr.primitive:
                if primitive.execution_status == "failure":
                    errs += '<error>'
                    if primitive.execution_error_details:
                        errs += primitive.execution_error_details
                    else:
                        errs += '{}: Unknown error'.format(primitive.name)
                    errs += "</error>"

        return errs

    @asyncio.coroutine
    def publish_action_status(self):
        """
        Starts publishing the status for jobs/primitives

        Registers the job xpath as a publisher, creates the job element and
        then either waits on the job's tasks (when provided) or polls the
        VNFR statuses every polling period until the job leaves "pending".
        Exceptions are logged and re-raised.
        """
        registration_handle = yield from self.dts.register(
            xpath=self.job.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(),
            flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
        )

        self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
        self.job.regh = registration_handle

        try:
            registration_handle.create_element(self.job.xpath, self.job.job)

            # If the config is done via a user defined script
            if self.job.tasks is not None:
                yield from self._monitor_processes(registration_handle)
                return

            prev = time.time()
            # Run until pending moves to either failure/success
            while self.job.job_status == "pending":
                curr = time.time()

                # Sleep only for whatever is left of the polling period.
                if curr - prev < self.polling_period:
                    pause = self.polling_period - (curr - prev)
                    yield from asyncio.sleep(pause, loop=self.loop)

                prev = time.time()

                tasks = []
                for vnfr in self.job.job.vnfr:
                    task = self.loop.create_task(self.get_vnfr_status(vnfr))
                    tasks.append(task)

                # Exit, if no tasks are found
                if not tasks:
                    break

                yield from asyncio.wait(tasks, loop=self.loop)

                job_status = [task.result() for task in tasks]

                if "failure" in job_status:
                    self.job.job_status = "failure"
                    errs = self.get_error_details()
                    if len(errs):
                        self.job.job.job_status_details = errs
                elif "pending" in job_status:
                    self.job.job_status = "pending"
                else:
                    self.job.job_status = "success"

                registration_handle.update_element(self.job.xpath, self.job.job)

        except Exception as e:
            self.log.exception(e)
            raise

    @asyncio.coroutine
    def get_vnfr_status(self, vnfr):
        """Schedules tasks for all containing primitives and updates it's own
        status.

        Only primitives still in "pending" state are examined.

        Args:
            vnfr : Vnfr job record containing primitives.

        Returns:
            (str): "success|failure|pending"
        """
        tasks = []
        job_status = []

        for primitive in vnfr.primitive:
            if primitive.execution_status != 'pending':
                continue

            if primitive.execution_id == "":
                # Actions which failed to queue can have empty id
                job_status.append(primitive.execution_status)
                continue

            elif primitive.execution_id == "config":
                # Config job. Check if service is active
                task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))

            else:
                task = self.loop.create_task(self.get_primitive_status(primitive))

            tasks.append(task)

        if tasks:
            yield from asyncio.wait(tasks, loop=self.loop)

        job_status.extend([task.result() for task in tasks])
        if "failure" in job_status:
            vnfr.vnf_job_status = "failure"
            return "failure"

        elif "pending" in job_status:
            vnfr.vnf_job_status = "pending"
            return "pending"

        else:
            vnfr.vnf_job_status = "success"
            return "success"

    @asyncio.coroutine
    def get_service_status(self, vnfr_id, primitive):
        """Query the config plugin for the service status of a VNFR.

        Maps the plugin answer onto the job vocabulary: 'error'/'blocked'
        and None become "failure", 'active' becomes "success", anything
        else stays "pending". An exception during the query is logged and
        reported as "failure".

        Args:
            vnfr_id : Id of the VNFR whose service is checked.
            primitive : Primitive record that receives the resulting status.

        Returns:
            (str): "success|failure|pending"
        """
        try:
            status = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_service_status,
                vnfr_id
            )

            self.log.debug("Service status: {}".format(status))
            if status in ['error', 'blocked']:
                self.log.warning("Execution of config {} failed: {}".
                                 format(primitive.execution_id, status))
                primitive.execution_error_details = 'Config failed'
                status = 'failure'
            elif status in ['active']:
                status = 'success'
            elif status is None:
                status = 'failure'
            else:
                status = 'pending'

        except Exception as e:
            self.log.exception(e)
            # Must be "failure" (not Juju's "failed"): callers only recognise
            # success|pending|failure and would otherwise treat the VNFR as
            # successful.
            primitive.execution_error_details = str(e)
            status = "failure"

        primitive.execution_status = status
        return primitive.execution_status

    @asyncio.coroutine
    def get_primitive_status(self, primitive):
        """
        Queries the juju api and gets the status of the execution id.

        The Juju status is translated through ConfigAgentJob.STATUS_MAP; a
        query error or an empty status is reported as "failure".

        Args:
            primitive : Primitive containing the execution ID.

        Returns:
            (str): the primitive's updated execution status
        """

        try:
            resp = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_action_status,
                primitive.execution_id
            )

            self.log.debug("Action status: {}".format(resp))
            status = resp['status']
            if status == 'failed':
                self.log.warning("Execution of action {} failed: {}".
                                 format(primitive.execution_id, resp))
                primitive.execution_error_details = resp['message']

        except Exception as e:
            self.log.exception(e)
            # Juju vocabulary on purpose: it is mapped to "failure" below.
            status = "failed"

        # Handle case status is None
        if status:
            primitive.execution_status = ConfigAgentJob.STATUS_MAP[status]
        else:
            primitive.execution_status = "failure"

        return primitive.execution_status
664
665
class CfgAgentJobDtsHandler(object):
    """Dts Handler for CfgAgent"""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"

    def __init__(self, dts, log, loop, nsm, cfgm):
        """
        Args:
            dts  : Dts Handle.
            log  : Log handle.
            loop : Event loop.
            nsm  : NsmManager.
            cfgm : ConfigManager.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cfgm = cfgm
        self._nsm = nsm

        self._regh = None
        self._nsr_regh = None
        self._project = cfgm.project

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NSManager manager instance """
        return self._nsm

    @property
    def cfgm(self):
        """ Return the ConfigManager manager instance """
        return self._cfgm

    def cfg_job_xpath(self, nsr_id, job_id):
        """Return the project-qualified xpath of one config agent job.

        This is an instance method because the xpath is prefixed through
        this handler's project.
        """
        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                                          "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
                                          "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id))

    @asyncio.coroutine
    def register(self):
        """ Register for NS monitoring read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            if action == rwdts.QueryAction.READ:
                schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                try:
                    nsr_id = path_entry.key00.ns_instance_config_ref

                    # No NSR key in the query means "all known NSRs".
                    if nsr_id is None or nsr_id == "":
                        nsrs = list(self.nsm.nsrs.values())
                        nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
                    else:
                        nsr_ids = [nsr_id]

                    for nsr_id in nsr_ids:
                        jobs = self.cfgm.get_job(nsr_id)

                        for job in jobs:
                            xact_info.respond_xpath(
                                rwdts.XactRspCode.MORE,
                                self.cfg_job_xpath(nsr_id, job.id),
                                job.job)

                except Exception as e:
                    self._log.exception("Caught exception:%s", str(e))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

            else:
                xact_info.respond_xpath(rwdts.XactRspCode.NA)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self._project.add_project(
                                            CfgAgentJobDtsHandler.XPATH),
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )

    @asyncio.coroutine
    def _terminate_nsr(self, nsr_id):
        """Delete the published jobs of an NSR and forget the NSR."""
        self._log.debug("NSR {} being terminated".format(nsr_id))
        jobs = self.cfgm.get_job(nsr_id)
        for job in jobs:
            path = self.cfg_job_xpath(nsr_id, job.id)
            if job.regh is None:
                # The job has no registration handle yet, so there is
                # nothing to delete through it.
                self._log.debug("Job {} has no registration; skipping delete".format(path))
                continue

            with self._dts.transaction():
                self._log.debug("Deleting job: {}".format(path))
                job.regh.delete_element(path)
                self._log.debug("Deleted job: {}".format(path))

        # Remove the NSR id in manager
        self.cfgm.del_nsr(nsr_id)

    @property
    def nsr_xpath(self):
        """Project-qualified xpath of the NSR opdata records."""
        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")

    @asyncio.coroutine
    def register_for_nsr(self):
        """ Register for NSR changes """

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """ This NSR is created """
            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if (query_action == rwdts.QueryAction.UPDATE or
                    query_action == rwdts.QueryAction.CREATE):
                pass
            elif query_action == rwdts.QueryAction.DELETE:
                nsr_id = msg.ns_instance_config_ref
                asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
            else:
                raise NotImplementedError(
                    "%s action on cm-state not supported" % query_action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
                                                           flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                                                           handler=handler)
        except Exception as e:
            self._log.error("Failed to register for NSR changes as %s", str(e))

    def deregister(self):
        """Drop both DTS registrations, if present, and clear the handles."""
        self._log.debug("De-register config agent job for project {}".
                        format(self._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None

        if self._nsr_regh:
            self._nsr_regh.deregister()
            self._nsr_regh = None
817
818
class ConfigAgentJobManager(object):
    """A central class that manager all the Config Agent related data,
    Including updating the status

    TODO: Needs to support multiple config agents.
    """
    def __init__(self, dts, log, loop, project, nsm):
        """
        Args:
            dts  : Dts handle
            log  : Log handler
            loop : Event loop
            project : Project object passed on to jobs and the DTS handler
            nsm  : NsmTasklet instance
        """
        # Jobs keyed by NSR id; each value is a list of ConfigAgentJob.
        self.jobs = {}
        self.dts = dts
        self.log = log
        self.loop = loop
        self.nsm = nsm
        self.project = project
        self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def add_job(self, rpc_output, tasks=None):
        """Once an RPC is triggered, add a new job

        Records the job under its NSR id and schedules a monitor that
        publishes the job status.

        Args:
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
            tasks(list) A list of asyncio.Tasks

        """
        nsr_id = rpc_output.nsr_id_ref

        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
                                                      tasks, self.project)

        self.log.debug("Creating a job monitor for Job id: {}".format(
            rpc_output.job_id))

        self.jobs.setdefault(nsr_id, []).append(job)

        # If the tasks are none, assume juju actions
        # TBD: This logic need to be revisited
        ca = self.nsm.config_agent_plugins[0]
        if tasks is None:
            for agent in self.nsm.config_agent_plugins:
                if agent.agent_type == 'juju':
                    ca = agent
                    break

        # For every Job we will schedule a new monitoring process.
        job_monitor = ConfigAgentJobMonitor(
            self.dts,
            self.log,
            job,
            self.executor,
            self.loop,
            ca
        )
        self.loop.create_task(job_monitor.publish_action_status())

    def get_job(self, nsr_id):
        """Get the job associated with the NSR Id, if present."""
        try:
            return self.jobs[nsr_id]
        except KeyError:
            return []

    def del_nsr(self, nsr_id):
        """Delete a NSR id from the jobs list"""
        self.jobs.pop(nsr_id, None)

    @asyncio.coroutine
    def register(self):
        """Register the DTS handler for job reads and for NSR changes."""
        yield from self.handler.register()
        yield from self.handler.register_for_nsr()

    def deregister(self):
        """Deregister the DTS handler.

        The handler's deregister() is synchronous, so it is called directly.
        """
        self.handler.deregister()