Change tag based on job status
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import time
20
21 from gi.repository import (
22 NsrYang,
23 RwTypes,
24 RwcalYang,
25 RwNsrYang,
26 RwConfigAgentYang,
27 RwDts as rwdts)
28
29 import rift.tasklets
30
31 import rift.mano.utils.juju_api as juju
32
33
class ConfigAgentAccountNotFound(Exception):
    """Raised when a config agent account name is missing or unknown."""
36
class JujuClient(object):
    """Client used to check that a Juju config agent account is usable."""

    def __init__(self, log, ip, port, user, passwd):
        """Remember the connection settings and create the Juju API handle.

        Args:
            log: Logger used for status and error messages.
            ip: Server address handed to the Juju API.
            port: Server port handed to the Juju API.
            user: User name handed to the Juju API.
            passwd: Secret handed to the Juju API.
        """
        self._log = log
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd

        self._api = juju.JujuApi(log=log,
                                 server=ip, port=port,
                                 user=user, secret=passwd)

    def validate_account_creds(self):
        """Check the account by asking the Juju API for its environment.

        Returns:
            RwcalYang.CloudConnectionStatus: Status set to "success" when the
            environment lookup completes without raising.

        Raises:
            Exception: If the lookup fails. The message is a single formatted
                string naming the failure category and the underlying error.
        """
        status = RwcalYang.CloudConnectionStatus()
        try:
            # The returned environment is not needed; only whether the call
            # raises matters here.
            self._api._get_env()
        except juju.JujuEnvError as e:
            # Use "%" so the message is a real string. A trailing ", str(e)"
            # would build a tuple and leak its repr into logs and callers.
            msg = "JujuClient: Invalid account credentials: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        except ConnectionRefusedError as e:
            msg = "JujuClient: Wrong IP or Port: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        except Exception as e:
            msg = "JujuClient: Connection Failed: %s" % str(e)
            self._log.error(msg)
            raise Exception(msg) from e
        else:
            status.status = "success"
            status.details = "Connection was successful"
            self._log.info("JujuClient: Connection Successful")

        return status
72
73
class ConfigAgentAccount(object):
    """One configured config agent account plus its connection status."""

    def __init__(self, log, account_msg):
        """Copy the account message and set up the validation client.

        Args:
            log: Logger used by this account.
            account_msg: Account message; a deep copy of it is kept.
        """
        self._log = log
        self._account_msg = account_msg.deep_copy()

        # Only "juju" accounts get a client that can validate credentials.
        plugin = None
        if account_msg.account_type == "juju":
            juju_cfg = account_msg.juju
            plugin = JujuClient(
                log,
                juju_cfg.ip_address,
                juju_cfg.port,
                juju_cfg.user,
                juju_cfg.secret,
            )
        self._cfg_agent_client_plugin = plugin

        self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
            status="unknown",
            details="Connection status lookup not started",
        )

        self._validate_task = None

    @property
    def name(self):
        """Name taken from the stored account message."""
        return self._account_msg.name

    @property
    def account_msg(self):
        """The stored copy of the account message."""
        return self._account_msg

    @property
    def account_type(self):
        """Account type taken from the stored account message."""
        return self._account_msg.account_type

    @property
    def connection_status(self):
        """Most recently recorded connection status."""
        return self._status

    def update_from_cfg(self, cfg):
        """Updating an existing account is not supported.

        Raises:
            NotImplementedError: Always.
        """
        self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
        raise NotImplementedError("Update config agent account not yet supported")

    @asyncio.coroutine
    def validate_cfg_agent_account_credentials(self, loop):
        """Validate the account and record the outcome as connection status.

        The client's blocking check runs in the loop's default executor.

        Args:
            loop: Event loop whose executor runs the blocking check.
        """
        self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)

        make_status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus

        # Visible to readers while the check is still running.
        self._status = make_status(
            status="validating",
            details="Config Agent account connection validation in progress",
        )

        if self._cfg_agent_client_plugin is not None:
            try:
                outcome = yield from loop.run_in_executor(
                    None,
                    self._cfg_agent_client_plugin.validate_account_creds,
                )
                self._status = make_status.from_dict(outcome.as_dict())
            except Exception as exc:
                self._status = make_status(
                    status="failure",
                    details="Error - " + str(exc),
                )
        else:
            self._status = make_status(
                status="unknown",
                details="Config Agent account does not support validation of account creds",
            )

        self._log.info("Got config agent account validation response: %s", self._status)

    def start_validate_credentials(self, loop):
        """Schedule a validation run, cancelling a previously stored one.

        Args:
            loop: Event loop on which the validation task is scheduled.
        """
        running = self._validate_task
        if running is not None:
            running.cancel()
            self._validate_task = None

        validation = self.validate_cfg_agent_account_credentials(loop)
        self._validate_task = asyncio.ensure_future(validation, loop=loop)
154
class CfgAgentDtsOperdataHandler(object):
    """Tracks config agent accounts and serves their status over DTS."""

    def __init__(self, dts, log, loop):
        """
        Args:
            dts : DTS handle
            log : Log handle
            loop : Event loop
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        # Account name -> ConfigAgentAccount
        self.cfg_agent_accounts = {}

    def add_cfg_agent_account(self, account_msg):
        """Store a new account and start validating its credentials."""
        new_account = ConfigAgentAccount(self._log, account_msg)
        self.cfg_agent_accounts[new_account.name] = new_account
        self._log.info("ConfigAgent Operdata Handler added. Starting account validation")

        new_account.start_validate_credentials(self._loop)

    def delete_cfg_agent_account(self, account_name):
        """Forget the named account; raises KeyError if it is not stored."""
        del self.cfg_agent_accounts[account_name]
        self._log.info("ConfigAgent Operdata Handler deleted.")

    def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
        """Return the named account, or every account for None or "".

        Returns:
            list: Matching ConfigAgentAccount objects.

        Raises:
            KeyError: If a name is given that is not stored.
        """
        known = self.cfg_agent_accounts

        if cfg_agent_account_name is None or cfg_agent_account_name == "":
            return list(known.values())

        if cfg_agent_account_name not in known:
            raise KeyError(
                "Config Agent account {} does not exist".format(cfg_agent_account_name)
            )

        return [known[cfg_agent_account_name]]

    def _register_show_status(self):
        """Register as publisher of the accounts' connection status."""

        def get_xpath(cfg_agent_name=None):
            # Without a name the path addresses every account.
            key = '' if cfg_agent_name is None else "[name='%s']" % cfg_agent_name
            return "D,/rw-config-agent:config-agent/account{}/connection-status".format(key)

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path)
            requested_name = entry.key00.name
            self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())

            try:
                for account in self.get_saved_cfg_agent_accounts(requested_name):
                    current = account.connection_status
                    self._log.debug("Responding to config agent connection status request: %s", current)
                    xact_info.respond_xpath(
                        rwdts.XactRspCode.MORE,
                        xpath=get_xpath(account.name),
                        msg=current,
                    )
            except KeyError as err:
                # Unknown account name: answer NA instead of failing.
                self._log.warning(str(err))
                xact_info.respond_xpath(rwdts.XactRspCode.NA)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        status_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        yield from self._dts.register(
            xpath=get_xpath(),
            handler=status_handler,
            flags=rwdts.Flag.PUBLISHER,
        )

    def _register_validate_rpc(self):
        """Register the RPC that re-runs credential validation."""

        def get_xpath():
            return "/rw-config-agent:update-cfg-agent-status"

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            if not msg.has_field("cfg_agent_account"):
                raise ConfigAgentAccountNotFound("Config Agent account name not provided")

            wanted = msg.cfg_agent_account
            try:
                target = self.cfg_agent_accounts[wanted]
            except KeyError:
                raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % wanted)

            target.start_validate_credentials(self._loop)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        rpc_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        yield from self._dts.register(
            xpath=get_xpath(),
            handler=rpc_handler,
            flags=rwdts.Flag.PUBLISHER,
        )

    @asyncio.coroutine
    def register(self):
        """Run both DTS registrations, status publisher first."""
        yield from self._register_show_status()
        yield from self._register_validate_rpc()
258
class ConfigAgentJob(object):
    """Convenience wrapper around a config agent job Gi object.

    YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
        ||
         ==> VNFRS
              ||
               ==> Primitives

    """
    # Normalizes the state terms from Juju to our yang models
    # Juju : Yang model
    STATUS_MAP = {
        "completed": "success",
        "pending": "pending",
        "running": "pending",
        "failed": "failure",
    }

    def __init__(self, nsr_id, job, tasks=None):
        """
        Args:
            nsr_id (uuid): ID of NSR record
            job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
            tasks: List of asyncio.tasks. If provided the job monitor will
                use it to monitor the tasks instead of the execution IDs
        """
        self.nsr_id = nsr_id
        self.tasks = tasks
        self._job = job
        self._regh = None

    @property
    def id(self):
        """Job id read from the wrapped object."""
        return self._job.job_id

    @property
    def name(self):
        """Job name read from the wrapped object."""
        return self._job.job_name

    @property
    def job_status(self):
        """Job status (success|pending|failure) of the wrapped object."""
        return self._job.job_status

    @job_status.setter
    def job_status(self, value):
        """Write the job status through to the wrapped object."""
        self._job.job_status = value

    @property
    def job(self):
        """The wrapped Gi object."""
        return self._job

    @property
    def xpath(self):
        """Xpath of this job, keyed by NSR id and job id."""
        template = (
            "D,/nsr:ns-instance-opdata"
            "/nsr:nsr[nsr:ns-instance-config-ref='{}']"
            "/nsr:config-agent-job[nsr:job-id='{}']"
        )
        return template.format(self.nsr_id, self.id)

    @property
    def regh(self):
        """Registration handle stored for the job (None until set)."""
        return self._regh

    @regh.setter
    def regh(self, hdl):
        """Store the registration handle for the job."""
        self._regh = hdl

    @staticmethod
    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
        """Build a ConfigAgentJob from a YangOutput_Nsr_ExecNsConfigPrimitive.

        The RPC output is copied into a
        YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang). The job
        and every VNFR entry start as "pending"; each primitive's status is
        translated through STATUS_MAP.

        Args:
            nsr_id (uuid): NSR ID
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
            tasks (list): A list of asyncio.Tasks

        Returns:
            ConfigAgentJob
        """
        # Short aliases for the very long generated class names.
        job_cls = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
        vnfr_cls = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
        primitive_cls = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
        param_cls = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter

        job_fields = {
            "job_id": rpc_output.job_id,
            "job_name": rpc_output.name,
            "job_status": "pending",
            "triggered_by": rpc_output.triggered_by,
            "create_time": rpc_output.create_time,
            "job_status_details": rpc_output.job_status_details,
            "parameter": [entry.as_dict() for entry in rpc_output.parameter],
            "parameter_group": [group.as_dict() for group in rpc_output.parameter_group],
        }
        job = job_cls.from_dict(job_fields)

        for vnf_out in rpc_output.vnf_out_list:
            vnfr_entry = vnfr_cls.from_dict({
                "id": vnf_out.vnfr_id_ref,
                "vnf_job_status": "pending",
            })

            for out_primitive in vnf_out.vnf_out_primitive:
                primitive_entry = primitive_cls.from_dict({
                    "name": out_primitive.name,
                    "execution_status": ConfigAgentJob.STATUS_MAP[out_primitive.execution_status],
                    "execution_id": out_primitive.execution_id,
                })

                # Carry the input parameters over to the job record.
                for in_param in out_primitive.parameter:
                    copied = param_cls.from_dict({
                        "name": in_param.name,
                        "value": in_param.value,
                    })
                    primitive_entry.parameter.append(copied)

                vnfr_entry.primitive.append(primitive_entry)

            job.vnfr.append(vnfr_entry)

        return ConfigAgentJob(nsr_id, job, tasks)
389
390
class ConfigAgentJobMonitor(object):
    """Job monitor: polls the config plugin and publishes the job status.

    Rules:
        If all Primitive are success, then vnf & nsr status will be "success"
        If any one Primitive reaches a failed state then both vnf and nsr will fail.
    """
    POLLING_PERIOD = 2

    def __init__(self, dts, log, job, executor, loop, config_plugin):
        """
        Args:
            dts : DTS handle
            log : log handle
            job (ConfigAgentJob): ConfigAgentJob instance
            executor (concurrent.futures): Executor for juju status api calls
            loop (eventloop): Current event loop instance
            config_plugin : Config plugin to be used.
        """
        self.job = job
        self.log = log
        self.loop = loop
        self.executor = executor
        self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
        self.config_plugin = config_plugin
        self.dts = dts

    @asyncio.coroutine
    def _monitor_processes(self, registration_handle):
        """Wait for every entry in the job's task list and publish the result.

        Entries are either asyncio subprocesses or awaitable tasks. The job
        is "success" only if all return codes OR together to zero.

        Args:
            registration_handle: Handle used to publish the updated job.
        """
        result = 0
        errs = ""
        for process in self.job.tasks:
            if isinstance(process, asyncio.subprocess.Process):
                rc = yield from process.wait()
                err = yield from process.stderr.read()

            else:
                # Task instance
                rc = yield from process
                err = ''

            self.log.debug("Process {} returned rc: {}, err: {}".
                           format(process, rc, err))

            if len(err):
                # Output from a successful process is kept too, tagged
                # differently from error output.
                if rc == 0:
                    errs += "<success>{}</success>".format(err)
                else:
                    errs += "<error>{}</error>".format(err)

            # Every return code counts, including those without any output.
            result |= rc

        if result == 0:
            self.job.job_status = "success"
        else:
            self.job.job_status = "failure"

        if len(errs):
            self.job.job.job_status_details = errs

        registration_handle.update_element(self.job.xpath, self.job.job)

    def get_error_details(self):
        """Collect error details from failed primitives of failed VNFRs.

        Returns:
            str: One "<error>...</error>" element per failed primitive,
            concatenated; empty if nothing failed.
        """
        errs = ''
        for vnfr in self.job.job.vnfr:
            if vnfr.vnf_job_status != "failure":
                continue

            for primitive in vnfr.primitive:
                if primitive.execution_status == "failure":
                    errs += '<error>'
                    if primitive.execution_error_details:
                        errs += primitive.execution_error_details
                    else:
                        errs += '{}: Unknown error'.format(primitive.name)
                    errs += "</error>"

        return errs

    @asyncio.coroutine
    def publish_action_status(self):
        """
        Starts publishing the status for jobs/primitives

        Registers as publisher for the job's xpath and creates the job
        element. If the job carries tasks, those are awaited; otherwise the
        VNFR statuses are polled until the job is no longer "pending".
        Exceptions are logged and re-raised.
        """
        registration_handle = yield from self.dts.register(
            xpath=self.job.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(),
            flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
        )

        self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
        self.job.regh = registration_handle

        try:
            registration_handle.create_element(self.job.xpath, self.job.job)

            # If the config is done via a user defined script
            if self.job.tasks is not None:
                yield from self._monitor_processes(registration_handle)
                return

            prev = time.time()
            # Run until pending moves to either failure/success
            while self.job.job_status == "pending":
                curr = time.time()

                # Sleep only for what is left of the polling period.
                if curr - prev < self.polling_period:
                    pause = self.polling_period - (curr - prev)
                    yield from asyncio.sleep(pause, loop=self.loop)

                prev = time.time()

                tasks = []
                for vnfr in self.job.job.vnfr:
                    task = self.loop.create_task(self.get_vnfr_status(vnfr))
                    tasks.append(task)

                # Exit, if no tasks are found
                if not tasks:
                    break

                yield from asyncio.wait(tasks, loop=self.loop)

                job_status = [task.result() for task in tasks]

                # Failure wins over pending, pending wins over success.
                if "failure" in job_status:
                    self.job.job_status = "failure"
                    errs = self.get_error_details()
                    if len(errs):
                        self.job.job.job_status_details = errs
                elif "pending" in job_status:
                    self.job.job_status = "pending"
                else:
                    self.job.job_status = "success"

                registration_handle.update_element(self.job.xpath, self.job.job)

        except Exception as e:
            self.log.exception(e)
            raise

    @asyncio.coroutine
    def get_vnfr_status(self, vnfr):
        """Schedules tasks for all containing primitives and updates it's own
        status.

        Only primitives that are still "pending" are examined.

        Args:
            vnfr : Vnfr job record containing primitives.

        Returns:
            (str): "success|failure|pending"
        """
        tasks = []
        job_status = []

        for primitive in vnfr.primitive:
            if primitive.execution_status != 'pending':
                continue

            if primitive.execution_id == "":
                # Actions which failed to queue can have empty id
                job_status.append(primitive.execution_status)
                continue

            elif primitive.execution_id == "config":
                # Config job. Check if service is active
                task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))

            else:
                task = self.loop.create_task(self.get_primitive_status(primitive))

            tasks.append(task)

        if tasks:
            yield from asyncio.wait(tasks, loop=self.loop)

        job_status.extend([task.result() for task in tasks])
        if "failure" in job_status:
            vnfr.vnf_job_status = "failure"
            return "failure"

        elif "pending" in job_status:
            vnfr.vnf_job_status = "pending"
            return "pending"

        else:
            vnfr.vnf_job_status = "success"
            return "success"

    @asyncio.coroutine
    def get_service_status(self, vnfr_id, primitive):
        """Query the plugin's service status and store it on the primitive.

        Args:
            vnfr_id : Id passed to the plugin's get_service_status.
            primitive : Primitive whose execution status is updated.

        Returns:
            (str): "success|failure|pending"
        """
        try:
            status = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_service_status,
                vnfr_id
            )

            self.log.debug("Service status: {}".format(status))
            if status in ['error', 'blocked']:
                self.log.warning("Execution of config {} failed: {}".
                                 format(primitive.execution_id, status))
                primitive.execution_error_details = 'Config failed'
                status = 'failure'
            elif status in ['active']:
                status = 'success'
            elif status is None:
                status = 'failure'
            else:
                status = 'pending'

        except Exception as e:
            self.log.exception(e)
            # Must be "failure", not "failed": the aggregation in this class
            # only recognises "failure" and "pending", so any other value
            # would be counted as a success.
            status = "failure"

        primitive.execution_status = status
        return primitive.execution_status

    @asyncio.coroutine
    def get_primitive_status(self, primitive):
        """
        Queries the juju api and gets the status of the execution id.

        Args:
            primitive : Primitive containing the execution ID.

        Returns:
            (str): Status translated through ConfigAgentJob.STATUS_MAP, or
            "failure" if the reported status is empty.
        """

        try:
            resp = yield from self.loop.run_in_executor(
                self.executor,
                self.config_plugin.get_action_status,
                primitive.execution_id
            )

            self.log.debug("Action status: {}".format(resp))
            status = resp['status']
            if status == 'failed':
                self.log.warning("Execution of action {} failed: {}".
                                 format(primitive.execution_id, resp))
                primitive.execution_error_details = resp['message']

        except Exception as e:
            self.log.exception(e)
            # Juju-side term; translated to "failure" by STATUS_MAP below.
            status = "failed"

        # Handle case status is None
        if status:
            primitive.execution_status = ConfigAgentJob.STATUS_MAP[status]
        else:
            primitive.execution_status = "failure"

        return primitive.execution_status
649
650
class CfgAgentJobDtsHandler(object):
    """Dts Handler for CfgAgent"""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"

    def __init__(self, dts, log, loop, nsm, cfgm):
        """
        Args:
            dts  : Dts Handle.
            log  : Log handle.
            loop : Event loop.
            nsm  : NsmManager.
            cfgm : ConfigManager.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cfgm = cfgm
        self._nsm = nsm

        self._regh = None
        self._nsr_regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NSManager manager instance """
        return self._nsm

    @property
    def cfgm(self):
        """ Return the ConfigManager manager instance """
        return self._cfgm

    @staticmethod
    def cfg_job_xpath(nsr_id, job_id):
        """Return the xpath of one config agent job under one NSR."""
        return ("D,/nsr:ns-instance-opdata" +
                "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
                "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)

    @asyncio.coroutine
    def register(self):
        """ Register for NS monitoring read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            if action == rwdts.QueryAction.READ:
                schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                try:
                    nsr_id = path_entry.key00.ns_instance_config_ref

                    # No NSR key in the request means: report every NSR.
                    if nsr_id is None or nsr_id == "":
                        nsrs = list(self.nsm.nsrs.values())
                        nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
                    else:
                        nsr_ids = [nsr_id]

                    for nsr_id in nsr_ids:
                        for job in self.cfgm.get_job(nsr_id):
                            xact_info.respond_xpath(
                                rwdts.XactRspCode.MORE,
                                CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
                                job.job)

                except Exception as e:
                    self._log.exception("Caught exception:%s", str(e))

                # The read is always acknowledged, even after a logged error.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

            else:
                xact_info.respond_xpath(rwdts.XactRspCode.NA)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )

    @asyncio.coroutine
    def _terminate_nsr(self, nsr_id):
        """Delete every published job of the NSR and forget the NSR.

        Args:
            nsr_id: Id of the NSR being terminated.
        """
        self._log.debug("NSR {} being terminated".format(nsr_id))
        for job in self.cfgm.get_job(nsr_id):
            path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)

            if job.regh is None:
                # A job without a registration handle has nothing that can
                # be deleted through it. Skipping keeps the loop alive so the
                # NSR is still removed from the manager below.
                self._log.debug("No registration handle for job: {}".format(path))
                continue

            with self._dts.transaction():
                self._log.debug("Deleting job: {}".format(path))
                job.regh.delete_element(path)
                self._log.debug("Deleted job: {}".format(path))

        # Remove the NSR id in manager
        self.cfgm.del_nsr(nsr_id)

    @property
    def nsr_xpath(self):
        """Xpath subscribed to for NSR changes."""
        return "D,/nsr:ns-instance-opdata/nsr:nsr"

    @asyncio.coroutine
    def register_for_nsr(self):
        """ Register for NSR changes """

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """ Handle an NSR change; only DELETE triggers any work. """
            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if (query_action == rwdts.QueryAction.UPDATE or
                    query_action == rwdts.QueryAction.CREATE):
                pass
            elif query_action == rwdts.QueryAction.DELETE:
                nsr_id = msg.ns_instance_config_ref
                # Clean up in the background so the ACK is not delayed.
                asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
            else:
                # Interpolate here: exceptions do not apply %-formatting to
                # their arguments the way logging calls do.
                raise NotImplementedError(
                    "%s action on cm-state not supported" % (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
                                                           flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                                                           handler=handler)
        except Exception as e:
            self._log.error("Failed to register for NSR changes as %s", str(e))
789
790
class ConfigAgentJobManager(object):
    """Central owner of all config agent job data, including status updates.

    TODO: Needs to support multiple config agents.
    """

    def __init__(self, dts, log, loop, nsm):
        """
        Args:
            dts  : Dts handle
            log  : Log handler
            loop : Event loop
            nsm  : NsmTasklet instance
        """
        # NSR id -> list of ConfigAgentJob
        self.jobs = {}
        self.dts = dts
        self.log = log
        self.loop = loop
        self.nsm = nsm
        self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def add_job(self, rpc_output, tasks=None):
        """Record a new job for a triggered RPC and start monitoring it.

        Args:
            rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
            tasks(list) A list of asyncio.Tasks

        """
        nsr_id = rpc_output.nsr_id_ref

        new_job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)

        self.log.debug("Creating a job monitor for Job id: {}".format(
            rpc_output.job_id))

        self.jobs.setdefault(nsr_id, []).append(new_job)

        # If the tasks are none, assume juju actions
        # TBD: This logic need to be revisited
        plugins = self.nsm.config_agent_plugins
        chosen = plugins[0]
        if tasks is None:
            # First juju plugin if there is one, else keep the first plugin.
            chosen = next(
                (plugin for plugin in plugins if plugin.agent_type == 'juju'),
                chosen,
            )

        # For every Job we will schedule a new monitoring process.
        monitor = ConfigAgentJobMonitor(
            self.dts,
            self.log,
            new_job,
            self.executor,
            self.loop,
            chosen,
        )
        self.loop.create_task(monitor.publish_action_status())

    def get_job(self, nsr_id):
        """Return the jobs stored for the NSR id, or an empty list."""
        return self.jobs.get(nsr_id, [])

    def del_nsr(self, nsr_id):
        """Drop the NSR id and its jobs, if the id is stored."""
        self.jobs.pop(nsr_id, None)

    @asyncio.coroutine
    def register(self):
        """Register the DTS handler, then its NSR subscription."""
        dts_handler = self.handler
        yield from dts_handler.register()
        yield from dts_handler.register_for_nsr()