3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 This file - ConfigManagerTasklet()
23 ConfigManagerProject()
25 +--|--> ConfigurationManager()
27 +--> rwconman_config.py - ConfigManagerConfig()
29 +--> ConfigManagerNSR()
38 gi
.require_version('RwDts', '1.0')
39 gi
.require_version('RwConmanYang', '1.0')
41 from gi
.repository
import (
43 RwConmanYang
as conmanY
,
47 from rift
.mano
.utils
.project
import (
52 from . import rwconman_config
as Config
54 def log_this_vnf(vnf_cfg
):
56 used_item_list
= ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
57 for item
in used_item_list
:
59 if item
== 'mgmt_ip_address':
60 log_vnf
+= "({})".format(vnf_cfg
[item
])
62 log_vnf
+= "{}/".format(vnf_cfg
[item
])
65 class ConfigurationManager(object):
66 def __init__(self
, log
, loop
, dts
, project
):
70 self
._project
= project
73 self
._config
= Config
.ConfigManagerConfig(self
._dts
, self
._log
, self
._loop
, self
)
75 self
.pending_tasks
= {}
77 self
._task
= None # The configuration_handler task
85 def update_vnf_state(self
, vnf_cfg
, state
):
86 nsr_obj
= vnf_cfg
['nsr_obj']
87 self
._log
.info("Updating cm-state for VNF(%s/%s) to:%s", nsr_obj
.nsr_name
, vnf_cfg
['vnfr_name'], state
)
88 yield from nsr_obj
.update_vnf_cm_state(vnf_cfg
['vnfr'], state
)
91 def update_ns_state(self
, nsr_obj
, state
):
92 self
._log
.info("Updating cm-state for NS(%s) to:%s", nsr_obj
.nsr_name
, state
)
93 yield from nsr_obj
.update_ns_cm_state(state
)
95 def add_to_pending(self
, nsr_obj
, cfg_vnfr_list
):
97 if (nsr_obj
not in self
.pending_cfg
and
98 nsr_obj
.cm_nsr
['state'] == nsr_obj
.state_to_string(conmanY
.RecordState
.RECEIVED
)):
100 self
._log
.info("Adding NS={} to pending config list"
101 .format(nsr_obj
.nsr_name
))
103 for cfg_vnfr
in cfg_vnfr_list
:
104 self
._log
.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(cfg_vnfr
['vnf_cfg']))
105 nsr_obj
.vnf_cfg_list
.append(cfg_vnfr
['vnf_cfg'])
106 self
.pending_cfg
.append(nsr_obj
)
108 def add_nsr_obj(self
, nsr_obj
):
109 self
._log
.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj
)
110 self
._nsr
_objs
[nsr_obj
.nsr_id
] = nsr_obj
112 def remove_nsr_obj(self
, nsr_id
):
113 self
._log
.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id
)
114 del self
._nsr
_objs
[nsr_id
]
116 def get_nsr_obj(self
, nsr_id
):
117 if nsr_id
not in self
._nsr
_objs
:
118 self
._log
.info("NSR %s not found", nsr_id
)
120 self
._log
.debug("Returning nsr_obj (%s) from Configuration Manager", self
._nsr
_objs
[nsr_id
])
121 return self
._nsr
_objs
.get(nsr_id
)
124 def configuration_handler(self
):
126 def process_vnf_cfg(agent_vnfr
, nsr_obj
):
127 vnf_cfg
= agent_vnfr
.vnf_cfg
130 if vnf_cfg
['cfg_retries']:
131 # This failed previously, lets give it some time
132 yield from asyncio
.sleep(5, loop
=self
._loop
)
134 vnf_cfg
['cfg_retries'] += 1
136 # Check to see if this vnfr is managed
137 done
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
138 'apply_initial_config',
141 self
._log
.debug("Apply configuration for VNF={} on attempt {} " \
142 "returned {}".format(log_this_vnf(vnf_cfg
),
143 vnf_cfg
['cfg_retries'],
147 self
._log
.debug("Apply initial config on VNFR {}".
148 format(log_this_vnf(vnf_cfg
)))
150 yield from nsr_obj
.parent
.process_vnf_initial_config(
154 yield from self
.update_vnf_state(vnf_cfg
,
155 conmanY
.RecordState
.READY
)
157 except Exception as e
:
158 nsr_obj
.vnf_failed
= True
159 self
._log
.exception(e
)
160 yield from self
.update_vnf_state(vnf_cfg
,
161 conmanY
.RecordState
.CFG_FAILED
)
164 self
._log
.debug("Getting config status {}".format(log_this_vnf(vnf_cfg
)))
165 # Check to see if the VNF configure failed
166 status
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
171 if status
and status
== 'error':
172 # Failed configuration
173 nsr_obj
.vnf_failed
= True
175 yield from self
.update_vnf_state(vnf_cfg
, conmanY
.RecordState
.CFG_FAILED
)
176 self
._log
.error("Failed to apply configuration for VNF = {}"
177 .format(log_this_vnf(vnf_cfg
)))
183 def process_nsr_obj(nsr_obj
):
184 # Return status, this will be set to False is if we fail to configure any VNF
187 # Reset VNF failed flag
188 nsr_obj
.vnf_failed
= False
189 vnf_cfg_list
= nsr_obj
.vnf_cfg_list
191 # Check to make sure the NSR is still valid
192 if nsr_obj
.parent
.is_nsr_valid(nsr_obj
.nsr_id
) is False:
193 self
._log
.info("NSR {} not found, could be terminated".
194 format(nsr_obj
.nsr_id
))
197 # Need while loop here, since we will be removing list item
198 vnf_cfg
= vnf_cfg_list
.pop(0)
199 self
._log
.info("Applying Pending Configuration for VNF = %s / %s",
200 log_this_vnf(vnf_cfg
), vnf_cfg
['agent_vnfr'])
201 vnf_done
= yield from process_vnf_cfg(vnf_cfg
['agent_vnfr'], nsr_obj
)
202 self
._log
.debug("Applied Pending Configuration for VNF = {}, status={}"
203 .format(log_this_vnf(vnf_cfg
), vnf_done
))
206 # We will retry, but we will give other VNF chance first since this one failed.
207 vnf_cfg_list
.append(vnf_cfg
)
209 if nsr_obj
.vnf_failed
:
210 # Atleast one VNF config failed
214 # Apply NS initial config if present
215 nsr_obj
.nsr_failed
= False
216 self
._log
.debug("Apply initial config on NSR {}".format(nsr_obj
.nsr_name
))
218 yield from nsr_obj
.parent
.process_ns_initial_config(nsr_obj
, self
._project
.name
)
219 except Exception as e
:
220 nsr_obj
.nsr_failed
= True
221 self
._log
.exception(e
)
224 # Set the config status for the NSR
226 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.READY
)
227 elif nsr_obj
.vnf_failed
or nsr_obj
.nsr_failed
:
228 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
)
232 # Basically, this loop will never end.
234 # Check the pending tasks are complete
235 # Store a list of tasks that are completed and
236 # remove from the pending_tasks list outside loop
238 for nsr_id
, task
in self
.pending_tasks
.items():
243 self
._log
.error("Exception in configuring nsr {}: {}".
245 nsr_obj
= self
.get_nsr_obj(nsr_id
)
247 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
, str(e
))
251 self
._log
.debug("NSR {} configured: {}".format(nsr_id
, rc
))
253 self
._log
.debug("NSR {} still configuring".format(nsr_id
))
255 # Remove the completed tasks
257 self
.pending_tasks
.pop(nsr_id
)
259 # TODO (pjoseph): Fix this
260 # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
261 # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
262 # wrongfully 10 seconds in advance?
263 yield from asyncio
.sleep(10, loop
=self
._loop
)
266 # get first NS, pending_cfg is nsr_obj list
267 nsr_obj
= self
.pending_cfg
[0]
269 if nsr_obj
.being_deleted
is False:
270 # Process this NS, returns back same obj is successfull or exceeded retries
272 self
._log
.info("Processing NSR:{}".format(nsr_obj
.nsr_name
))
274 # Check if we already have a task running for this NSR
275 # Case where we are still configuring and terminate is called
276 if nsr_obj
.nsr_id
in self
.pending_tasks
:
277 self
._log
.error("NSR {} in state {} has a configure task running.".
278 format(nsr_obj
.nsr_name
, nsr_obj
.get_ns_cm_state()))
279 # Terminate the task for this NSR
280 self
.pending_tasks
[nsr_obj
.nsr_id
].cancel()
282 yield from self
.update_ns_state(nsr_obj
, conmanY
.RecordState
.CFG_PROCESS
)
284 # Call in a separate thread
285 self
.pending_tasks
[nsr_obj
.nsr_id
] = \
286 self
._loop
.create_task(
287 process_nsr_obj(nsr_obj
)
290 # Remove this nsr_obj
291 self
.pending_cfg
.remove(nsr_obj
)
293 except Exception as e
:
294 self
._log
.error("Failed to process NSR as %s", str(e
))
295 self
._log
.exception(e
)
297 except asyncio
.CancelledError
as e
:
298 self
._log
.debug("Stopped configuration handler for project {}".format(self
._project
))
302 # Perform register() for all handlers
303 for reg
in self
._handlers
:
304 yield from reg
.register()
306 self
._task
= asyncio
.ensure_future(self
.configuration_handler(), loop
=self
._loop
)
308 def deregister(self
):
309 self
._log
.debug("De-register conman for project {}".format(self
._project
.name
))
312 for reg
in self
._handlers
:
316 class ConfigManagerProject(ManoProject
):
318 def __init__(self
, name
, tasklet
, **kw
):
319 super(ConfigManagerProject
, self
).__init
__(tasklet
.log
, name
)
326 self
._log
.info("Initializing the Configuration-Manager tasklet")
327 self
._con
_man
= ConfigurationManager(self
.log
,
331 yield from self
._con
_man
.register()
333 def deregister(self
):
334 self
._log
.debug("De-register project {}".format(self
.name
))
335 self
._con
_man
.deregister()
338 class ConfigManagerTasklet(rift
.tasklets
.Tasklet
):
339 def __init__(self
, *args
, **kwargs
):
340 super(ConfigManagerTasklet
, self
).__init
__(*args
, **kwargs
)
341 self
.rwlog
.set_category("rw-conman-log")
345 self
.project_handler
= None
353 super(ConfigManagerTasklet
, self
).start()
355 self
.log
.debug("Registering with dts")
357 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
358 conmanY
.get_schema(),
360 self
.on_dts_state_change
)
362 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
364 def on_instance_started(self
):
365 self
.log
.debug("Got instance started callback")
369 self
.log
.debug("creating project handler")
370 self
.project_handler
= ProjectHandler(self
, ConfigManagerProject
)
371 self
.project_handler
.register()
378 def on_dts_state_change(self
, state
):
379 """Take action according to current dts state to transition
380 application into the corresponding application state
383 state - current dts state
386 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
387 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
391 rwdts
.State
.INIT
: self
.init
,
392 rwdts
.State
.RUN
: self
.run
,
395 # Transition application to next state
396 handler
= handlers
.get(state
, None)
397 if handler
is not None:
400 # Transition dts to next state
401 next_state
= switch
.get(state
, None)
402 if next_state
is not None:
403 self
._dts
.handle
.set_state(next_state
)