2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
30 import rift
.mano
.utils
.juju_api
as juju
33 class ConfigAgentAccountNotFound(Exception):
36 class JujuClient(object):
37 def __init__(self
, log
, ip
, port
, user
, passwd
):
44 self
._api
= juju
.JujuApi(log
=log
,
46 user
=user
, secret
=passwd
)
49 def validate_account_creds(self
):
50 status
= RwcalYang
.CloudConnectionStatus()
52 env
= self
._api
._get
_env
()
53 except juju
.JujuEnvError
as e
:
54 msg
= "JujuClient: Invalid account credentials: %s", str(e
)
57 except ConnectionRefusedError
as e
:
58 msg
= "JujuClient: Wrong IP or Port: %s", str(e
)
61 except Exception as e
:
62 msg
= "JujuClient: Connection Failed: %s", str(e
)
66 status
.status
= "success"
67 status
.details
= "Connection was successful"
68 self
._log
.info("JujuClient: Connection Successful")
73 class ConfigAgentAccount(object):
74 def __init__(self
, log
, account_msg
):
76 self
._account
_msg
= account_msg
.deep_copy()
78 if account_msg
.account_type
== "juju":
79 self
._cfg
_agent
_client
_plugin
= JujuClient(
81 account_msg
.juju
.ip_address
,
82 account_msg
.juju
.port
,
83 account_msg
.juju
.user
,
84 account_msg
.juju
.secret
)
86 self
._cfg
_agent
_client
_plugin
= None
88 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
90 details
="Connection status lookup not started"
93 self
._validate
_task
= None
97 return self
._account
_msg
.name
def account_msg(self):
    """Return the account message stored on this object.

    Returns:
        The value of ``self._account_msg``, unchanged.
    """
    stored_msg = self._account_msg
    return stored_msg
def account_type(self):
    """Return the ``account_type`` field of the stored account message.

    Returns:
        ``self._account_msg.account_type``.
    """
    stored_msg = self._account_msg
    return stored_msg.account_type
108 def connection_status(self
):
def update_from_cfg(self, cfg):
    """Reject an in-place update of this config agent account.

    The requested configuration is logged at debug level and the call then
    always fails, because updating an account is not implemented.

    Args:
        cfg: The new account configuration (only used for logging).

    Raises:
        NotImplementedError: Always.
    """
    logger = self._log
    logger.debug("Updating parent ConfigAgentAccount to %s", cfg)
    raise NotImplementedError("Update config agent account not yet supported")
116 def validate_cfg_agent_account_credentials(self
, loop
):
117 self
._log
.debug("Validating Config Agent Account %s, credential status %s", self
._account
_msg
, self
._status
)
119 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
121 details
="Config Agent account connection validation in progress"
124 if self
._cfg
_agent
_client
_plugin
is None:
125 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
127 details
="Config Agent account does not support validation of account creds"
131 status
= yield from loop
.run_in_executor(
133 self
._cfg
_agent
_client
_plugin
.validate_account_creds
135 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus
.from_dict(status
.as_dict())
136 except Exception as e
:
137 self
._status
= RwConfigAgentYang
.ConfigAgentAccount_ConnectionStatus(
139 details
="Error - " + str(e
)
142 self
._log
.info("Got config agent account validation response: %s", self
._status
)
144 def start_validate_credentials(self
, loop
):
145 if self
._validate
_task
is not None:
146 self
._validate
_task
.cancel()
147 self
._validate
_task
= None
149 self
._validate
_task
= asyncio
.ensure_future(
150 self
.validate_cfg_agent_account_credentials(loop
),
154 class CfgAgentDtsOperdataHandler(object):
155 def __init__(self
, dts
, log
, loop
, project
):
159 self
._project
= project
161 self
.cfg_agent_accounts
= {}
162 self
._show
_reg
= None
def add_cfg_agent_account(self, account_msg):
    """Track a new config agent account and start validating it.

    A ``ConfigAgentAccount`` is built from ``account_msg``, stored in
    ``self.cfg_agent_accounts`` under its ``name``, and credential
    validation is then started on ``self._loop``.

    Args:
        account_msg: Account message handed to ``ConfigAgentAccount``.
    """
    new_account = ConfigAgentAccount(self._log, account_msg)
    registry = self.cfg_agent_accounts
    registry[new_account.name] = new_account
    self._log.info("ConfigAgent Operdata Handler added. Starting account validation")

    # Validation is kicked off only after the account has been registered.
    new_account.start_validate_credentials(self._loop)
def delete_cfg_agent_account(self, account_name):
    """Stop tracking the config agent account called ``account_name``.

    Args:
        account_name: Key of the entry to remove from
            ``self.cfg_agent_accounts``.

    Raises:
        KeyError: If no account is stored under ``account_name``; nothing
            is logged in that case.
    """
    registry = self.cfg_agent_accounts
    del registry[account_name]
    self._log.info("ConfigAgent Operdata Handler deleted.")
def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
    """Look up saved config agent accounts by name.

    Args:
        cfg_agent_account_name: Name of the wanted account. ``None`` or an
            empty string selects every saved account.

    Returns:
        A new list holding the matching account object(s).

    Raises:
        KeyError: If a non-empty name is given that is not present in
            ``self.cfg_agent_accounts``.
    """
    # No name given: hand back every account currently stored.
    if cfg_agent_account_name is None or cfg_agent_account_name == "":
        return list(self.cfg_agent_accounts.values())

    if cfg_agent_account_name in self.cfg_agent_accounts:
        return [self.cfg_agent_accounts[cfg_agent_account_name]]

    errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
    raise KeyError(errstr)
193 def _register_show_status(self
):
194 def get_xpath(cfg_agent_name
=None):
195 return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
196 "[name='%s']" % cfg_agent_name
if cfg_agent_name
is not None else ''
200 def on_prepare(xact_info
, action
, ks_path
, msg
):
201 path_entry
= RwConfigAgentYang
.ConfigAgentAccount
.schema().keyspec_to_entry(ks_path
)
202 cfg_agent_account_name
= path_entry
.key00
.name
203 self
._log
.debug("Got show cfg_agent connection status request: %s", ks_path
.create_string())
206 saved_accounts
= self
.get_saved_cfg_agent_accounts(cfg_agent_account_name
)
207 for account
in saved_accounts
:
208 connection_status
= account
.connection_status
209 self
._log
.debug("Responding to config agent connection status request: %s", connection_status
)
210 xpath
= self
._project
.add_project(get_xpath(account
.name
))
211 xact_info
.respond_xpath(
212 rwdts
.XactRspCode
.MORE
,
214 msg
=account
.connection_status
,
216 except KeyError as e
:
217 self
._log
.warning(str(e
))
218 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
221 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
223 xpath
= self
._project
.add_project(get_xpath())
224 self
._show
_reg
= yield from self
._dts
.register(
226 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
227 on_prepare
=on_prepare
),
228 flags
=rwdts
.Flag
.PUBLISHER
,
231 def _register_validate_rpc(self
):
233 return "/rw-config-agent:update-cfg-agent-status"
236 def on_prepare(xact_info
, action
, ks_path
, msg
):
237 if not msg
.has_field("cfg_agent_account"):
238 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
240 cfg_agent_account_name
= msg
.cfg_agent_account
242 if not self
._project
.rpc_check(msg
, xact_info
=xact_info
):
246 account
= self
.cfg_agent_accounts
[cfg_agent_account_name
]
248 raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name
)
250 account
.start_validate_credentials(self
._loop
)
252 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
254 self
._rpc
_reg
= yield from self
._dts
.register(
256 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
257 on_prepare
=on_prepare
259 flags
=rwdts
.Flag
.PUBLISHER
,
264 yield from self
._register
_show
_status
()
265 yield from self
._register
_validate
_rpc
()
def deregister(self):
    """Deregister both registrations held by this handler.

    ``self._show_reg`` is deregistered first, then ``self._rpc_reg``.
    """
    show_registration = self._show_reg
    show_registration.deregister()

    rpc_registration = self._rpc_reg
    rpc_registration.deregister()
272 class ConfigAgentJob(object):
273 """A wrapper over the config agent job object, providing some
274 convenience functions.
276 YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
283 # This normalizes the state terms from Juju to our yang models
285 STATUS_MAP
= {"completed": "success",
286 "pending" : "pending",
287 "running" : "pending",
288 "failed" : "failure"}
290 def __init__(self
, nsr_id
, job
, project
, tasks
=None):
293 nsr_id (uuid): ID of NSR record
294 job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
295 tasks: List of asyncio.tasks. If provided the job monitor will
296 use it to monitor the tasks instead of the execution IDs
301 self
._project
= project
308 return self
._job
.job_id
313 return self
._job
.job_name
def job_status(self):
    """Status of the job (success|pending|failure)"""
    wrapped_job = self._job
    return wrapped_job.job_status
def job_status(self, value):
    """Setter for job status: store ``value`` on the wrapped job object."""
    wrapped_job = self._job
    wrapped_job.job_status = value
332 """Xpath of the job"""
333 return self
._project
.add_project(("D,/nsr:ns-instance-opdata" +
334 "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
335 "/nsr:config-agent-job[nsr:job-id='{}']"
336 ).format(self
.nsr_id
, self
.id))
340 """Registration handle for the job"""
345 """Setter for registration handle"""
349 def convert_rpc_input_to_job(nsr_id
, rpc_output
, tasks
):
350 """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
351 to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
354 nsr_id (uuid): NSR ID
355 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
356 tasks (list): A list of asyncio.Tasks
361 # Shortcuts to prevent the HUUGE names.
362 CfgAgentJob
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
363 CfgAgentVnfr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
364 CfgAgentPrimitive
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
365 CfgAgentPrimitiveParam
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
367 job
= CfgAgentJob
.from_dict({
368 "job_id": rpc_output
.job_id
,
369 "job_name" : rpc_output
.name
,
370 "job_status": "pending",
371 "triggered_by": rpc_output
.triggered_by
,
372 "create_time": rpc_output
.create_time
,
373 "job_status_details": rpc_output
.job_status_details
if rpc_output
.job_status_details
is not None else None,
374 "parameter": [param
.as_dict() for param
in rpc_output
.parameter
],
375 "parameter_group": [pg
.as_dict() for pg
in rpc_output
.parameter_group
]
378 for vnfr
in rpc_output
.vnf_out_list
:
379 vnfr_job
= CfgAgentVnfr
.from_dict({
380 "id": vnfr
.vnfr_id_ref
,
381 "vnf_job_status": "pending",
384 for primitive
in vnfr
.vnf_out_primitive
:
385 vnf_primitive
= CfgAgentPrimitive
.from_dict({
386 "name": primitive
.name
,
387 "execution_status": ConfigAgentJob
.STATUS_MAP
[primitive
.execution_status
],
388 "execution_id": primitive
.execution_id
391 # Copy over the input param
392 for param
in primitive
.parameter
:
393 vnf_primitive
.parameter
.append(
394 CfgAgentPrimitiveParam
.from_dict({
399 vnfr_job
.primitive
.append(vnf_primitive
)
401 job
.vnfr
.append(vnfr_job
)
403 return ConfigAgentJob(nsr_id
, job
, project
, tasks
)
406 class ConfigAgentJobMonitor(object):
407 """Job monitor: Polls the Juju controller and get the status.
409 If all Primitive are success, then vnf & nsr status will be "success"
410 If any one Primitive reaches a failed state then both vnf and nsr will fail.
414 def __init__(self
, dts
, log
, job
, executor
, loop
, config_plugin
):
419 job (ConfigAgentJob): ConfigAgentJob instance
420 executor (concurrent.futures): Executor for juju status api calls
421 loop (eventloop): Current event loop instance
422 config_plugin : Config plugin to be used.
427 self
.executor
= executor
428 self
.polling_period
= ConfigAgentJobMonitor
.POLLING_PERIOD
429 self
.config_plugin
= config_plugin
433 def _monitor_processes(self
, registration_handle
):
436 for process
in self
.job
.tasks
:
437 if isinstance(process
, asyncio
.subprocess
.Process
):
438 rc
= yield from process
.wait()
439 err
= yield from process
.stderr
.read()
443 rc
= yield from process
446 self
.log
.debug("Process {} returned rc: {}, err: {}".
447 format(process
, rc
, err
))
451 errs
+= "<success>{}</success>".format(err
)
453 errs
+= "<error>{}</error>".format(err
)
457 self
.job
.job_status
= "success"
459 self
.job
.job_status
= "failure"
462 self
.job
.job
.job_status_details
= errs
464 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
466 def get_error_details(self
):
467 '''Get the error details from failed primitives'''
469 for vnfr
in self
.job
.job
.vnfr
:
470 if vnfr
.vnf_job_status
!= "failure":
473 for primitive
in vnfr
.primitive
:
474 if primitive
.execution_status
== "failure":
476 if primitive
.execution_error_details
:
477 errs
+= primitive
.execution_error_details
479 errs
+= '{}: Unknown error'.format(primitive
.name
)
485 def publish_action_status(self
):
487 Starts publishing the status for jobs/primitives
489 registration_handle
= yield from self
.dts
.register(
490 xpath
=self
.job
.xpath
,
491 handler
=rift
.tasklets
.DTS
.RegistrationHandler(),
492 flags
=(rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
),
495 self
.log
.debug('preparing to publish job status for {}'.format(self
.job
.xpath
))
496 self
.job
.regh
= registration_handle
499 registration_handle
.create_element(self
.job
.xpath
, self
.job
.job
)
501 # If the config is done via a user defined script
502 if self
.job
.tasks
is not None:
503 yield from self
._monitor
_processes
(registration_handle
)
507 # Run until pending moves to either failure/success
508 while self
.job
.job_status
== "pending":
511 if curr
- prev
< self
.polling_period
:
512 pause
= self
.polling_period
- (curr
- prev
)
513 yield from asyncio
.sleep(pause
, loop
=self
.loop
)
518 for vnfr
in self
.job
.job
.vnfr
:
519 task
= self
.loop
.create_task(self
.get_vnfr_status(vnfr
))
522 # Exit, if no tasks are found
526 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
528 job_status
= [task
.result() for task
in tasks
]
530 if "failure" in job_status
:
531 self
.job
.job_status
= "failure"
532 errs
= self
.get_error_details()
534 self
.job
.job
.job_status_details
= errs
535 elif "pending" in job_status
:
536 self
.job
.job_status
= "pending"
538 self
.job
.job_status
= "success"
540 # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
541 # self.job.job_status,
545 registration_handle
.update_element(self
.job
.xpath
, self
.job
.job
)
548 except Exception as e
:
549 self
.log
.exception(e
)
554 def get_vnfr_status(self
, vnfr
):
555 """Schedules tasks for all containing primitives and updates it's own
559 vnfr : Vnfr job record containing primitives.
562 (str): "success|failure|pending"
567 for primitive
in vnfr
.primitive
:
568 if primitive
.execution_status
!= 'pending':
571 if primitive
.execution_id
== "":
572 # Actions which failed to queue can have empty id
573 job_status
.append(primitive
.execution_status
)
576 elif primitive
.execution_id
== "config":
577 # Config job. Check if service is active
578 task
= self
.loop
.create_task(self
.get_service_status(vnfr
.id, primitive
))
581 task
= self
.loop
.create_task(self
.get_primitive_status(primitive
))
586 yield from asyncio
.wait(tasks
, loop
=self
.loop
)
588 job_status
.extend([task
.result() for task
in tasks
])
589 if "failure" in job_status
:
590 vnfr
.vnf_job_status
= "failure"
593 elif "pending" in job_status
:
594 vnfr
.vnf_job_status
= "pending"
598 vnfr
.vnf_job_status
= "success"
602 def get_service_status(self
, vnfr_id
, primitive
):
604 status
= yield from self
.loop
.run_in_executor(
606 self
.config_plugin
.get_service_status
,
610 self
.log
.debug("Service status: {}".format(status
))
611 if status
in ['error', 'blocked']:
612 self
.log
.warning("Execution of config {} failed: {}".
613 format(primitive
.execution_id
, status
))
614 primitive
.execution_error_details
= 'Config failed'
616 elif status
in ['active']:
623 except Exception as e
:
624 self
.log
.exception(e
)
627 primitive
.execution_status
= status
628 return primitive
.execution_status
631 def get_primitive_status(self
, primitive
):
633 Queries the juju api and gets the status of the execution id.
636 primitive : Primitive containing the execution ID.
640 resp
= yield from self
.loop
.run_in_executor(
642 self
.config_plugin
.get_action_status
,
643 primitive
.execution_id
646 self
.log
.debug("Action status: {}".format(resp
))
647 status
= resp
['status']
648 if status
== 'failed':
649 self
.log
.warning("Execution of action {} failed: {}".
650 format(primitive
.execution_id
, resp
))
651 primitive
.execution_error_details
= resp
['message']
653 except Exception as e
:
654 self
.log
.exception(e
)
657 # Handle case status is None
659 primitive
.execution_status
= ConfigAgentJob
.STATUS_MAP
[status
]
661 primitive
.execution_status
= "failure"
663 return primitive
.execution_status
666 class CfgAgentJobDtsHandler(object):
667 """Dts Handler for CfgAgent"""
668 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"
670 def __init__(self
, dts
, log
, loop
, nsm
, cfgm
):
677 cfgm : ConfigManager.
686 self
._nsr
_regh
= None
687 self
._project
= cfgm
.project
691 """ Return registration handle """
696 """ Return the NSManager manager instance """
701 """ Return the ConfigManager manager instance """
def cfg_job_xpath(nsr_id, job_id):
    """Build the xpath of one config agent job under an NSR.

    Args:
        nsr_id: Value substituted for ``ns-instance-config-ref``.
        job_id: Value substituted for ``job-id``.

    Returns:
        The result of passing the formatted xpath to
        ``self._project.add_project``.
    """
    # NOTE(review): ``self`` is not a parameter of this function, and the
    # callers visible in this file pass only (nsr_id, job_id). As written the
    # lookup of ``self`` looks like it would fail with NameError -- confirm
    # and decide how the project should be supplied.
    add_project = self._project.add_project
    template = ("D,/nsr:ns-instance-opdata" +
                "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
                "/nsr:config-agent-job[nsr:job-id='{}']")
    return add_project(template.format(nsr_id, job_id))
712 """ Register for NS monitoring read from dts """
715 def on_prepare(xact_info
, action
, ks_path
, msg
):
716 """ prepare callback from dts """
717 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
718 if action
== rwdts
.QueryAction
.READ
:
719 schema
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
.schema()
720 path_entry
= schema
.keyspec_to_entry(ks_path
)
722 nsr_id
= path_entry
.key00
.ns_instance_config_ref
724 #print("###>>> self.nsm.nsrs:", self.nsm.nsrs)
726 if nsr_id
is None or nsr_id
== "":
727 nsrs
= list(self
.nsm
.nsrs
.values())
728 nsr_ids
= [nsr
.id for nsr
in nsrs
if nsr
is not None]
732 for nsr_id
in nsr_ids
:
733 jobs
= self
.cfgm
.get_job(nsr_id
)
736 xact_info
.respond_xpath(
737 rwdts
.XactRspCode
.MORE
,
738 CfgAgentJobDtsHandler
.cfg_job_xpath(nsr_id
, job
.id),
741 except Exception as e
:
742 self
._log
.exception("Caught exception:%s", str(e
))
743 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
746 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
748 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
749 with self
._dts
.group_create() as group
:
750 self
._regh
= group
.register(xpath
=self
._project
.add_project(
751 CfgAgentJobDtsHandler
.XPATH
),
753 flags
=rwdts
.Flag
.PUBLISHER
,
757 def _terminate_nsr(self
, nsr_id
):
758 self
._log
.debug("NSR {} being terminated".format(nsr_id
))
759 jobs
= self
.cfgm
.get_job(nsr_id
)
761 path
= CfgAgentJobDtsHandler
.cfg_job_xpath(nsr_id
, job
.id)
762 with self
._dts
.transaction() as xact
:
763 self
._log
.debug("Deleting job: {}".format(path
))
764 job
.regh
.delete_element(path
)
765 self
._log
.debug("Deleted job: {}".format(path
))
767 # Remove the NSR id in manager
768 self
.cfgm
.del_nsr(nsr_id
)
772 return self
._project
.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
775 def register_for_nsr(self
):
776 """ Register for NSR changes """
779 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
780 """ This NSR is created """
781 self
._log
.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
786 if (query_action
== rwdts
.QueryAction
.UPDATE
or
787 query_action
== rwdts
.QueryAction
.CREATE
):
789 elif query_action
== rwdts
.QueryAction
.DELETE
:
790 nsr_id
= msg
.ns_instance_config_ref
791 asyncio
.ensure_future(self
._terminate
_nsr
(nsr_id
), loop
=self
._loop
)
793 raise NotImplementedError(
794 "%s action on cm-state not supported",
797 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
800 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
801 self
._nsr
_regh
= yield from self
._dts
.register(self
.nsr_xpath
,
802 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
804 except Exception as e
:
805 self
._log
.error("Failed to register for NSR changes as %s", str(e
))
807 def deregister(self
):
808 self
._log
.debug("De-register config agent job for project".
809 format(self
._project
.name
))
811 self
._regh
.deregister()
815 self
._nsr
_regh
.deregister()
816 self
._nsr
_regh
= None
819 class ConfigAgentJobManager(object):
820 """A central class that manager all the Config Agent related data,
821 Including updating the status
823 TODO: Needs to support multiple config agents.
825 def __init__(self
, dts
, log
, loop
, project
, nsm
):
831 nsm : NsmTasklet instance
838 self
.project
= project
839 self
.handler
= CfgAgentJobDtsHandler(dts
, log
, loop
, nsm
, self
)
840 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
842 def add_job(self
, rpc_output
, tasks
=None):
843 """Once an RPC is triggered, add a new job
846 rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
847 rpc_input (YangInput_Nsr_ExecNsConfigPrimitive): Rpc input
848 tasks(list) A list of asyncio.Tasks
851 nsr_id
= rpc_output
.nsr_id_ref
853 job
= ConfigAgentJob
.convert_rpc_input_to_job(nsr_id
, rpc_output
,
856 self
.log
.debug("Creating a job monitor for Job id: {}".format(
859 if nsr_id
not in self
.jobs
:
860 self
.jobs
[nsr_id
] = [job
]
862 self
.jobs
[nsr_id
].append(job
)
864 # If the tasks are none, assume juju actions
865 # TBD: This logic need to be revisited
866 ca
= self
.nsm
.config_agent_plugins
[0]
868 for agent
in self
.nsm
.config_agent_plugins
:
869 if agent
.agent_type
== 'juju':
873 # For every Job we will schedule a new monitoring process.
874 job_monitor
= ConfigAgentJobMonitor(
882 task
= self
.loop
.create_task(job_monitor
.publish_action_status())
884 def get_job(self
, nsr_id
):
885 """Get the job associated with the NSR Id, if present."""
887 return self
.jobs
[nsr_id
]
def del_nsr(self, nsr_id):
    """Drop the entry for ``nsr_id`` from ``self.jobs``, if there is one.

    Args:
        nsr_id: Key to remove. An id that is not present is ignored.
    """
    tracked = self.jobs
    if nsr_id not in tracked:
        return
    tracked.pop(nsr_id)
898 yield from self
.handler
.register()
899 yield from self
.handler
.register_for_nsr()
def deregister(self):
    """Delegate deregistration to ``self.handler``.

    This is a generator: it yields from whatever
    ``self.handler.deregister()`` returns.
    """
    dts_handler = self.handler
    yield from dts_handler.deregister()