2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
22 gi
.require_version('RwNsrYang', '1.0')
24 from gi
.repository
import (
31 gi
.require_version('RwKeyspec', '1.0')
32 from gi
.repository
.RwKeyspec
import quoted_key
35 import rift
.mano
.utils
.juju_api
as juju
38 class ConfigAgentAccountNotFound(Exception):
42 class JujuClient(object):
43 def __init__(self
, log
, ip
, port
, user
, passwd
):
50 self
._api
= juju
.JujuApi(log
=log
,
52 user
=user
, secret
=passwd
)
54 def validate_account_creds(self
):
55 """Validate the account credentials.
57 Verifies if the account credentials can connect and login to a Juju
58 controller at the provided IP address.
61 details
= "Connection status not known."
63 loop
= asyncio
.new_event_loop()
64 asyncio
.set_event_loop(loop
)
66 loop
.run_until_complete(asyncio
.gather(
71 except Exception as e
:
72 msg
= "JujuClient: Connection Failed: %s", str(e
)
76 self
._log
.error("Success reached.")
78 details
= "Connection was successful"
79 self
._log
.info("JujuClient: Connection Successful")
83 return RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
85 details
="Connection status lookup not started"
89 class ConfigAgentAccount(object):
90 def __init__(self
, log
, account_msg
):
92 self
._account
_msg
= account_msg
.deep_copy()
94 if account_msg
.account_type
== "juju":
95 self
._cfg
_agent
_client
_plugin
= JujuClient(
97 account_msg
.juju
.ip_address
,
98 account_msg
.juju
.port
,
99 account_msg
.juju
.user
,
100 account_msg
.juju
.secret
)
102 self
._cfg
_agent
_client
_plugin
= None
104 self
._status
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
106 details
="Connection status lookup not started"
109 self
._validate
_task
= None
113 return self
._account
_msg
.name
116 def account_msg(self
):
117 return self
._account
_msg
120 def account_type(self
):
121 return self
._account
_msg
.account_type
124 def connection_status(self
):
127 def update_from_cfg(self
, cfg
):
128 self
._log
.debug("Updating parent ConfigAgentAccount to %s", cfg
)
129 raise NotImplementedError("Update config agent account not yet supported")
132 def validate_cfg_agent_account_credentials(self
, loop
):
133 self
._log
.debug("Validating Config Agent Account %s, credential status %s", self
._account
_msg
, self
._status
)
135 self
._status
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
137 details
="Config Agent account connection validation in progress"
140 if self
._cfg
_agent
_client
_plugin
is None:
141 self
._status
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
143 details
="Config Agent account does not support validation of account creds"
147 status
= yield from loop
.run_in_executor(
149 self
._cfg
_agent
_client
_plugin
.validate_account_creds
,
151 self
._status
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus
.from_dict(status
.as_dict())
152 except Exception as e
:
153 self
._status
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
155 details
="Error - " + str(e
)
158 self
._log
.info("Got config agent account validation response: %s", self
._status
)
160 def start_validate_credentials(self
, loop
):
161 if self
._validate
_task
is not None:
162 self
._validate
_task
.cancel()
163 self
._validate
_task
= None
165 self
._validate
_task
= asyncio
.ensure_future(
166 self
.validate_cfg_agent_account_credentials(loop
),
170 class CfgAgentDtsOperdataHandler(object):
171 def __init__(self
, dts
, log
, loop
, project
):
175 self
._project
= project
177 self
.cfg_agent_accounts
= {}
178 self
._show
_reg
= None
181 def add_cfg_agent_account(self
, account_msg
):
182 account
= ConfigAgentAccount(self
._log
, account_msg
)
183 self
.cfg_agent_accounts
[account
.name
] = account
184 self
._log
.info("ConfigAgent Operdata Handler added. Starting account validation")
186 account
.start_validate_credentials(self
._loop
)
188 def delete_cfg_agent_account(self
, account_name
):
189 del self
.cfg_agent_accounts
[account_name
]
190 self
._log
.info("ConfigAgent Operdata Handler deleted.")
192 def get_saved_cfg_agent_accounts(self
, cfg_agent_account_name
):
193 ''' Get Config Agent Account corresponding to passed name, or all saved accounts if name is None'''
194 saved_cfg_agent_accounts
= []
196 if cfg_agent_account_name
is None or cfg_agent_account_name
== "":
197 cfg_agent_accounts
= list(self
.cfg_agent_accounts
.values())
198 saved_cfg_agent_accounts
.extend(cfg_agent_accounts
)
199 elif cfg_agent_account_name
in self
.cfg_agent_accounts
:
200 account
= self
.cfg_agent_accounts
[cfg_agent_account_name
]
201 saved_cfg_agent_accounts
.append(account
)
203 errstr
= "Config Agent account {} does not exist".format(cfg_agent_account_name
)
204 raise KeyError(errstr
)
206 return saved_cfg_agent_accounts
209 def _register_show_status(self
):
210 def get_xpath(cfg_agent_name
=None):
211 return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
212 "[name=%s]" % quoted_key(cfg_agent_name
) if cfg_agent_name
is not None else ''
216 def on_prepare(xact_info
, action
, ks_path
, msg
):
217 path_entry
= RwConfigAgentYang
.YangData_RwProject_Project_ConfigAgent_Account
.schema().keyspec_to_entry(ks_path
)
218 cfg_agent_account_name
= path_entry
.key00
.name
219 self
._log
.debug("Got show cfg_agent connection status request: %s", ks_path
.create_string())
222 saved_accounts
= self
.get_saved_cfg_agent_accounts(cfg_agent_account_name
)
223 for account
in saved_accounts
:
224 connection_status
= account
.connection_status
225 self
._log
.debug("Responding to config agent connection status request: %s", connection_status
)
226 xpath
= self
._project
.add_project(get_xpath(account
.name
))
227 xact_info
.respond_xpath(
228 rwdts
.XactRspCode
.MORE
,
230 msg
=account
.connection_status
,
232 except KeyError as e
:
233 self
._log
.warning(str(e
))
234 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
237 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
239 xpath
= self
._project
.add_project(get_xpath())
240 self
._show
_reg
= yield from self
._dts
.register(
242 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
243 on_prepare
=on_prepare
),
244 flags
=rwdts
.Flag
.PUBLISHER
,
247 def _register_validate_rpc(self
):
249 return "/rw-config-agent:update-cfg-agent-status"
252 def on_prepare(xact_info
, action
, ks_path
, msg
):
253 if not msg
.has_field("cfg_agent_account"):
254 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
256 cfg_agent_account_name
= msg
.cfg_agent_account
258 if not self
._project
.rpc_check(msg
, xact_info
=xact_info
):
262 account
= self
.cfg_agent_accounts
[cfg_agent_account_name
]
264 raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name
)
266 account
.start_validate_credentials(self
._loop
)
268 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
270 self
._rpc
_reg
= yield from self
._dts
.register(
272 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
273 on_prepare
=on_prepare
275 flags
=rwdts
.Flag
.PUBLISHER
,
280 yield from self
._register
_show
_status
()
281 yield from self
._register
_validate
_rpc
()
283 def deregister(self
):
284 self
._show
_reg
.deregister()
285 self
._rpc
_reg
.deregister()
288 class ConfigAgentJob(object):
289 """A wrapper over the config agent job object, providing some
290 convenience functions.
292 YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
299 # The normalizes the state terms from Juju to our yang models
301 STATUS_MAP
= {"completed": "success",
302 "pending" : "pending",
303 "running" : "pending",
304 "failed" : "failure"}
306 def __init__(self
, nsr_id
, job
, project
, tasks
=None):
309 nsr_id (uuid): ID of NSR record
310 job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
311 tasks: List of asyncio.tasks. If provided the job monitor will
312 use it to monitor the tasks instead of the execution IDs
317 self
._project
= project
324 return self
._job
.job_id
329 return self
._job
.job_name
332 def job_status(self
):
333 """Status of the job (success|pending|failure)"""
334 return self
._job
.job_status
337 def job_status(self
, value
):
338 """Setter for job status"""
339 self
._job
.job_status
= value
348 """Xpath of the job"""
349 return self
._project
.add_project(("D,/nsr:ns-instance-opdata" +
350 "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
351 "/nsr:config-agent-job[nsr:job-id={}]"
352 ).format(quoted_key(self
.nsr_id
), quoted_key(str(self
.id))))
356 """Registration handle for the job"""
361 """Setter for registration handle"""
365 def convert_rpc_input_to_job(nsr_id
, rpc_output
, tasks
, project
):
366 """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
367 to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
370 nsr_id (uuid): NSR ID
371 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
372 tasks (list): A list of asyncio.Tasks
377 # Shortcuts to prevent the HUUGE names.
378 CfgAgentJob
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
379 CfgAgentVnfr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
380 CfgAgentPrimitive
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
381 CfgAgentPrimitiveParam
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
383 job
= CfgAgentJob
.from_dict({
384 "job_id": rpc_output
.job_id
,
385 "job_name" : rpc_output
.name
,
386 "job_status": "pending",
387 "triggered_by": rpc_output
.triggered_by
,
388 "create_time": rpc_output
.create_time
,
389 "job_status_details": rpc_output
.job_status_details
if rpc_output
.job_status_details
is not None else None,
390 "parameter": [param
.as_dict() for param
in rpc_output
.parameter
],
391 "parameter_group": [pg
.as_dict() for pg
in rpc_output
.parameter_group
]
394 for vnfr
in rpc_output
.vnf_out_list
:
395 vnfr_job
= CfgAgentVnfr
.from_dict({
396 "id": vnfr
.vnfr_id_ref
,
397 "vnf_job_status": "pending",
400 for primitive
in vnfr
.vnf_out_primitive
:
401 vnf_primitive
= CfgAgentPrimitive
.from_dict({
402 "name": primitive
.name
,
403 "execution_status": ConfigAgentJob
.STATUS_MAP
[primitive
.execution_status
],
404 "execution_id": primitive
.execution_id
,
405 "execution_error_details": primitive
.execution_error_details
,
408 # Copy over the input param
409 for param
in primitive
.parameter
:
410 vnf_primitive
.parameter
.append(
411 CfgAgentPrimitiveParam
.from_dict({
416 vnfr_job
.primitive
.append(vnf_primitive
)
418 job
.vnfr
.append(vnfr_job
)
420 return ConfigAgentJob(nsr_id
, job
, project
, tasks
)
423 class ConfigAgentJobMonitor(object):
424 """Job monitor: Polls the Juju controller and get the status.
426 If all Primitive are success, then vnf & nsr status will be "success"
427 If any one Primitive reaches a failed state then both vnf and nsr will fail.
431 def __init__(self
, dts
, log
, job
, executor
, loop
, config_plugin
):
436 job (ConfigAgentJob): ConfigAgentJob instance
437 executor (concurrent.futures): Executor for juju status api calls
438 loop (eventloop): Current event loop instance
439 config_plugin : Config plugin to be used.
444 self
.executor
= executor
445 self
.polling_period
= ConfigAgentJobMonitor
.POLLING_PERIOD
446 self
.config_plugin
= config_plugin
450 def _monitor_processes(self
, registration_handle
):
453 for process
in self
.job
.tasks
:
454 if isinstance(process
, asyncio
.subprocess
.Process
):
455 rc
= yield from process
.wait()
456 err
= yield from process
.stderr
.read()
460 rc
= yield from process
463 self
.log
.debug("Process {} returned rc: {}, err: {}".
464 format(process
, rc
, err
))
468 errs
+= "<success>{}</success>".format(err
)
470 errs
+= "<error>{}</error>".format(err
)
474 self
.job
.job_status
= "success"
476 self
.job
.job_status
= "failure"
479 self
.job
.job
.job_status_details
= errs
481 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
483 def get_execution_details(self
):
484 '''Get the error details from failed primitives'''
486 for vnfr
in self
.job
.job
.vnfr
:
487 for primitive
in vnfr
.primitive
:
488 if primitive
.execution_status
== "failure":
490 if primitive
.execution_error_details
:
491 errs
+= primitive
.execution_error_details
493 errs
+= '{}: Unknown error'.format(primitive
.name
)
496 if primitive
.execution_error_details
:
497 errs
+= '<{status}>{details}</{status}>'.format(
498 status
=primitive
.execution_status
,
499 details
=primitive
.execution_error_details
)
503 def publish_action_status(self
):
505 Starts publishing the status for jobs/primitives
507 registration_handle
= yield from self
.dts
.register(
508 xpath
=self
.job
.xpath
,
509 handler
=rift
.tasklets
.DTS
.RegistrationHandler(),
510 flags
=(rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
),
513 self
.log
.debug('preparing to publish job status for {}'.format(self
.job
.xpath
))
514 self
.job
.regh
= registration_handle
517 registration_handle
.create_element(self
.job
.xpath
, self
.job
.job
)
519 # If the config is done via a user defined script
520 if self
.job
.tasks
is not None:
521 yield from self
._monitor
_processes
(registration_handle
)
525 # Run until pending moves to either failure/success
526 while self
.job
.job_status
== "pending":
529 if curr
- prev
< self
.polling_period
:
530 pause
= self
.polling_period
- (curr
- prev
)
531 yield from asyncio
.sleep(pause
, loop
=self
.loop
)
536 for vnfr
in self
.job
.job
.vnfr
:
537 task
= self
.loop
.create_task(self
.get_vnfr_status(vnfr
))
540 # Exit, if no tasks are found
544 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
546 job_status
= [task
.result() for task
in tasks
]
548 if "failure" in job_status
:
549 self
.job
.job_status
= "failure"
550 elif "pending" in job_status
:
551 self
.job
.job_status
= "pending"
553 self
.job
.job_status
= "success"
555 errs
= self
.get_execution_details()
557 self
.job
.job
.job_status_details
= errs
559 # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
560 # self.job.job_status,
564 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
566 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
568 except Exception as e
:
569 self
.log
.exception(e
)
574 def get_vnfr_status(self
, vnfr
):
575 """Schedules tasks for all containing primitives and updates it's own
579 vnfr : Vnfr job record containing primitives.
582 (str): "success|failure|pending"
587 for primitive
in vnfr
.primitive
:
588 if primitive
.execution_status
!= 'pending':
589 if primitive
.execution_id
== "":
590 # We may not have processed the status for these yet
591 job_status
.append(primitive
.execution_status
)
594 if primitive
.execution_id
== "":
595 # Actions which failed to queue can have empty id
596 job_status
.append(primitive
.execution_status
)
599 if primitive
.execution_id
== "config":
600 # Config job. Check if service is active
601 task
= self
.loop
.create_task(self
.get_service_status(vnfr
.id, primitive
))
604 task
= self
.loop
.create_task(self
.get_primitive_status(primitive
))
609 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
611 job_status
.extend([task
.result() for task
in tasks
])
612 if "failure" in job_status
:
613 vnfr
.vnf_job_status
= "failure"
616 elif "pending" in job_status
:
617 vnfr
.vnf_job_status
= "pending"
621 vnfr
.vnf_job_status
= "success"
625 def get_service_status(self
, vnfr_id
, primitive
):
627 status
= yield from self
.loop
.run_in_executor(
629 self
.config_plugin
.get_service_status
,
633 self
.log
.debug("Service status: {}".format(status
))
634 if status
in ['error', 'blocked']:
635 self
.log
.warning("Execution of config {} failed: {}".
636 format(primitive
.execution_id
, status
))
637 primitive
.execution_error_details
= 'Config failed'
639 elif status
in ['active']:
646 except Exception as e
:
647 self
.log
.exception(e
)
650 primitive
.execution_status
= status
651 return primitive
.execution_status
654 def get_primitive_status(self
, primitive
):
656 Queries the juju api and gets the status of the execution id.
659 primitive : Primitive containing the execution ID.
663 resp
= yield from self
.loop
.run_in_executor(
665 self
.config_plugin
.get_action_status
,
666 primitive
.execution_id
669 self
.log
.debug("Action status: {}".format(resp
))
670 status
= resp
['status']
671 if status
== 'failed':
672 self
.log
.warning("Execution of action {} failed: {}".
673 format(primitive
.execution_id
, resp
))
674 primitive
.execution_error_details
= resp
['message']
676 except Exception as e
:
677 self
.log
.exception(e
)
680 # Handle case status is None
682 primitive
.execution_status
= ConfigAgentJob
.STATUS_MAP
[status
]
684 primitive
.execution_status
= "failure"
686 return primitive
.execution_status
689 class CfgAgentJobDtsHandler(object):
690 """Dts Handler for CfgAgent"""
691 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"
693 def __init__(self
, dts
, log
, loop
, nsm
, cfgm
):
700 cfgm : ConfigManager.
709 self
._project
= cfgm
.project
713 """ Return registration handle """
718 """ Return the NSManager manager instance """
723 """ Return the ConfigManager manager instance """
726 def cfg_job_xpath(self
, nsr_id
, job_id
):
727 return self
._project
.add_project(("D,/nsr:ns-instance-opdata" +
728 "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
729 "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id
), quoted_key(str(job_id
))))
733 """ Register for NS monitoring read from dts """
736 def on_prepare(xact_info
, action
, ks_path
, msg
):
737 """ prepare callback from dts """
738 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
739 if action
== rwdts
.QueryAction
.READ
:
740 schema
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
.schema()
741 path_entry
= schema
.keyspec_to_entry(ks_path
)
743 nsr_id
= path_entry
.key00
.ns_instance_config_ref
745 #print("###>>> self.nsm.nsrs:", self.nsm.nsrs)
747 if nsr_id
is None or nsr_id
== "":
748 nsrs
= list(self
.nsm
.nsrs
.values())
749 nsr_ids
= [nsr
.id for nsr
in nsrs
if nsr
is not None]
753 for nsr_id
in nsr_ids
:
754 jobs
= self
.cfgm
.get_job(nsr_id
)
757 xact_info
.respond_xpath(
758 rwdts
.XactRspCode
.MORE
,
759 self
.cfg_job_xpath(nsr_id
, job
.id),
762 except Exception as e
:
763 self
._log
.exception("Caught exception:%s", str(e
))
764 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
767 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
769 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
770 with self
._dts
.group_create() as group
:
771 self
._regh
= group
.register(xpath
=self
._project
.add_project(
772 CfgAgentJobDtsHandler
.XPATH
),
774 flags
=rwdts
.Flag
.PUBLISHER
,
777 def _terminate_nsr(self
, nsr_id
):
778 self
._log
.debug("NSR {} being terminated".format(nsr_id
))
779 jobs
= self
.cfgm
.get_job(nsr_id
)
781 path
= self
.cfg_job_xpath(nsr_id
, job
.id)
782 with self
._dts
.transaction() as xact
:
783 self
._log
.debug("Deleting job: {}".format(path
))
784 job
.regh
.delete_element(path
)
785 self
._log
.debug("Deleted job: {}".format(path
))
787 # Remove the NSR id in manager
788 self
.cfgm
.del_nsr(nsr_id
)
792 return self
._project
.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
794 def deregister(self
):
795 self
._log
.debug("De-register config agent job for project".
796 format(self
._project
.name
))
798 self
._regh
.deregister()
802 class ConfigAgentJobManager(object):
803 """A central class that manager all the Config Agent related data,
804 Including updating the status
806 TODO: Needs to support multiple config agents.
808 def __init__(self
, dts
, log
, loop
, project
, nsm
):
814 nsm : NsmTasklet instance
821 self
.project
= project
822 self
.handler
= CfgAgentJobDtsHandler(dts
, log
, loop
, nsm
, self
)
823 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
825 def add_job(self
, rpc_output
, tasks
=None):
826 """Once an RPC is triggered, add a new job
829 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
830 rpc_input (YangInput_Nsr_ExecNsConfigPrimitive): Rpc input
831 tasks(list) A list of asyncio.Tasks
834 nsr_id
= rpc_output
.nsr_id_ref
836 job
= ConfigAgentJob
.convert_rpc_input_to_job(nsr_id
, rpc_output
,
839 self
.log
.debug("Creating a job monitor for Job id: {}".format(
842 if nsr_id
not in self
.jobs
:
843 self
.jobs
[nsr_id
] = [job
]
845 self
.jobs
[nsr_id
].append(job
)
847 # If the tasks are none, assume juju actions
848 # TBD: This logic need to be revisited
849 ca
= self
.nsm
.config_agent_plugins
[0]
851 for agent
in self
.nsm
.config_agent_plugins
:
852 if agent
.agent_type
== 'juju':
856 def done_callback(fut
):
859 self
.log
.error("Exception on monitor job {}: {}".
860 format(rpc_output
.job_id
, e
))
862 self
.log
.debug("Monitor job done for {}".format(rpc_output
.job_id
))
864 # For every Job we will schedule a new monitoring process.
865 job_monitor
= ConfigAgentJobMonitor(
873 task
= self
.loop
.create_task(job_monitor
.publish_action_status())
874 task
.add_done_callback(done_callback
)
876 def get_job(self
, nsr_id
):
877 """Get the job associated with the NSR Id, if present."""
879 return self
.jobs
[nsr_id
]
883 def del_nsr(self
, nsr_id
):
884 """Delete a NSR id from the jobs list"""
885 if nsr_id
in self
.jobs
:
886 self
.jobs
.pop(nsr_id
)
890 yield from self
.handler
.register()
891 # yield from self.handler.register_for_nsr()
893 def deregister(self
):
894 self
.handler
.deregister()