3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 This file - ConfigManagerTasklet()
23 ConfigManagerProject()
25 +--|--> ConfigurationManager()
27 +--> rwconman_config.py - ConfigManagerConfig()
29 | +--> ConfigManagerNSR()
31 +--> rwconman_events.py - ConfigManagerEvents()
33 +--> ConfigManagerROif()
42 gi
.require_version('RwDts', '1.0')
43 gi
.require_version('RwConmanYang', '1.0')
45 from gi
.repository
import (
47 RwConmanYang
as conmanY
,
51 from rift
.mano
.utils
.project
import (
56 from . import rwconman_config
as Config
57 from . import rwconman_events
as Event
59 def log_this_vnf(vnf_cfg
):
61 used_item_list
= ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
62 for item
in used_item_list
:
64 if item
== 'mgmt_ip_address':
65 log_vnf
+= "({})".format(vnf_cfg
[item
])
67 log_vnf
+= "{}/".format(vnf_cfg
[item
])
70 class ConfigurationManager(object):
71 def __init__(self
, log
, loop
, dts
, project
):
75 self
._project
= project
78 self
.cfg_dir
= os
.path
.join(os
.environ
["RIFT_INSTALL"], "etc/conman")
79 self
._config
= Config
.ConfigManagerConfig(self
._dts
, self
._log
, self
._loop
, self
)
80 self
._event
= Event
.ConfigManagerEvents(self
._dts
, self
._log
, self
._loop
, self
)
82 self
.pending_tasks
= {}
84 self
._task
= None # The configuration_handler task
93 def update_vnf_state(self
, vnf_cfg
, state
):
94 nsr_obj
= vnf_cfg
['nsr_obj']
95 self
._log
.info("Updating cm-state for VNF(%s/%s) to:%s", nsr_obj
.nsr_name
, vnf_cfg
['vnfr_name'], state
)
96 yield from nsr_obj
.update_vnf_cm_state(vnf_cfg
['vnfr'], state
)
99 def update_ns_state(self
, nsr_obj
, state
):
100 self
._log
.info("Updating cm-state for NS(%s) to:%s", nsr_obj
.nsr_name
, state
)
101 yield from nsr_obj
.update_ns_cm_state(state
)
103 def add_to_pending(self
, nsr_obj
):
105 if (nsr_obj
not in self
.pending_cfg
and
106 nsr_obj
.cm_nsr
['state'] == nsr_obj
.state_to_string(conmanY
.RecordState
.RECEIVED
)):
108 self
._log
.info("Adding NS={} to pending config list"
109 .format(nsr_obj
.nsr_name
))
112 nsr_obj
.vnf_cfg_list
= []
113 # Sort all the VNF by their configuration attribute priority
114 sorted_dict
= dict(sorted(nsr_obj
.nsr_cfg_config_attributes_dict
.items()))
115 for config_attributes_dict
in sorted_dict
.values():
116 # Iterate through each priority level
117 for config_priority
in config_attributes_dict
:
118 # Iterate through each vnfr at this priority level
119 vnfr
= nsr_obj
._vnfr
_dict
[config_priority
['id']]
120 self
._log
.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(vnfr
['vnf_cfg']))
121 nsr_obj
.vnf_cfg_list
.append(vnfr
['vnf_cfg'])
122 self
.pending_cfg
.append(nsr_obj
)
124 def add_nsr_obj(self
, nsr_obj
):
125 self
._log
.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj
)
126 self
._nsr
_objs
[nsr_obj
.nsr_id
] = nsr_obj
128 def remove_nsr_obj(self
, nsr_id
):
129 self
._log
.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id
)
130 del self
._nsr
_objs
[nsr_id
]
132 def get_nsr_obj(self
, nsr_id
):
133 self
._log
.debug("Returning nsr_obj (%s) from Configuration Manager", self
._nsr
_objs
[nsr_id
])
134 return self
._nsr
_objs
.get(nsr_id
)
137 def configuration_handler(self
):
139 def process_vnf_cfg(agent_vnfr
, nsr_obj
):
140 vnf_cfg
= agent_vnfr
.vnf_cfg
143 if vnf_cfg
['cfg_retries']:
144 # This failed previously, lets give it some time
145 yield from asyncio
.sleep(5, loop
=self
._loop
)
147 vnf_cfg
['cfg_retries'] += 1
149 # Check to see if this vnfr is managed
150 done
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
151 'apply_initial_config',
154 self
._log
.debug("Apply configuration for VNF={} on attempt {} " \
155 "returned {}".format(log_this_vnf(vnf_cfg
),
156 vnf_cfg
['cfg_retries'],
160 self
._log
.warn("Apply initial config on VNFR {}".
161 format(log_this_vnf(vnf_cfg
)))
163 yield from nsr_obj
.parent
.process_vnf_initial_config(
166 yield from self
.update_vnf_state(vnf_cfg
,
167 conmanY
.RecordState
.READY
)
169 except Exception as e
:
170 nsr_obj
.vnf_failed
= True
171 self
._log
.exception(e
)
172 yield from self
.update_vnf_state(vnf_cfg
,
173 conmanY
.RecordState
.CFG_FAILED
)
176 # Check to see if the VNF configure failed
177 status
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
182 if status
and status
== 'error':
183 # Failed configuration
184 nsr_obj
.vnf_failed
= True
186 yield from self
.update_vnf_state(vnf_cfg
, conmanY
.RecordState
.CFG_FAILED
)
187 self
._log
.error("Failed to apply configuration for VNF = {}"
188 .format(log_this_vnf(vnf_cfg
)))
194 def process_nsr_obj(nsr_obj
):
195 # Return status, this will be set to False is if we fail to configure any VNF
198 # Reset VNF failed flag
199 nsr_obj
.vnf_failed
= False
200 vnf_cfg_list
= nsr_obj
.vnf_cfg_list
202 # Check to make sure the NSR is still valid
203 if nsr_obj
.parent
.is_nsr_valid(nsr_obj
.nsr_id
) is False:
204 self
._log
.info("NSR {} not found, could be terminated".
205 format(nsr_obj
.nsr_id
))
208 # Need while loop here, since we will be removing list item
209 vnf_cfg
= vnf_cfg_list
.pop(0)
210 self
._log
.info("Applying Pending Configuration for VNF = %s / %s",
211 log_this_vnf(vnf_cfg
), vnf_cfg
['agent_vnfr'])
212 vnf_done
= yield from process_vnf_cfg(vnf_cfg
['agent_vnfr'], nsr_obj
)
213 self
._log
.debug("Applied Pending Configuration for VNF = {}, status={}"
214 .format(log_this_vnf(vnf_cfg
), vnf_done
))
217 # We will retry, but we will give other VNF chance first since this one failed.
218 vnf_cfg_list
.append(vnf_cfg
)
220 if nsr_obj
.vnf_failed
:
221 # Atleast one VNF config failed
225 # Apply NS initial config if present
226 nsr_obj
.nsr_failed
= False
227 self
._log
.debug("Apply initial config on NSR {}".format(nsr_obj
.nsr_name
))
229 yield from nsr_obj
.parent
.process_ns_initial_config(nsr_obj
)
230 except Exception as e
:
231 nsr_obj
.nsr_failed
= True
232 self
._log
.exception(e
)
235 # Set the config status for the NSR
237 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.READY
)
238 elif nsr_obj
.vnf_failed
or nsr_obj
.nsr_failed
:
239 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
)
243 # Basically, this loop will never end.
245 # Check the pending tasks are complete
246 # Store a list of tasks that are completed and
247 # remove from the pending_tasks list outside loop
249 for nsr_id
, task
in self
.pending_tasks
.items():
254 self
._log
.error("Exception in configuring nsr {}: {}".
256 nsr_obj
= self
.get_nsr_obj(nsr_id
)
258 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
, str(e
))
262 self
._log
.debug("NSR {} configured: {}".format(nsr_id
, rc
))
264 self
._log
.debug("NSR {} still configuring".format(nsr_id
))
266 # Remove the completed tasks
268 self
.pending_tasks
.pop(nsr_id
)
270 # TODO (pjoseph): Fix this
271 # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
272 # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
273 # wrongfully 10 seconds in advance?
274 yield from asyncio
.sleep(10, loop
=self
._loop
)
277 # get first NS, pending_cfg is nsr_obj list
278 nsr_obj
= self
.pending_cfg
[0]
280 if nsr_obj
.being_deleted
is False:
281 # Process this NS, returns back same obj is successfull or exceeded retries
283 self
._log
.info("Processing NSR:{}".format(nsr_obj
.nsr_name
))
285 # Check if we already have a task running for this NSR
286 # Case where we are still configuring and terminate is called
287 if nsr_obj
.nsr_id
in self
.pending_tasks
:
288 self
._log
.error("NSR {} in state {} has a configure task running.".
289 format(nsr_obj
.nsr_name
, nsr_obj
.get_ns_cm_state()))
290 # Terminate the task for this NSR
291 self
.pending_tasks
[nsr_obj
.nsr_id
].cancel()
293 yield from self
.update_ns_state(nsr_obj
, conmanY
.RecordState
.CFG_PROCESS
)
295 # Call in a separate thread
296 self
.pending_tasks
[nsr_obj
.nsr_id
] = \
297 self
._loop
.create_task(
298 process_nsr_obj(nsr_obj
)
301 # Remove this nsr_obj
302 self
.pending_cfg
.remove(nsr_obj
)
304 except Exception as e
:
305 self
._log
.error("Failed to process NSR as %s", str(e
))
306 self
._log
.exception(e
)
308 except asyncio
.CancelledError
as e
:
309 self
._log
.debug("Stopped configuration handler for project {}".format(self
._project
))
313 # Perform register() for all handlers
314 for reg
in self
._handlers
:
315 yield from reg
.register()
317 self
._task
= asyncio
.ensure_future(self
.configuration_handler(), loop
=self
._loop
)
319 def deregister(self
):
320 self
._log
.debug("De-register conman for project {}".format(self
._project
.name
))
323 for reg
in self
._handlers
:
327 class ConfigManagerProject(ManoProject
):
329 def __init__(self
, name
, tasklet
, **kw
):
330 super(ConfigManagerProject
, self
).__init
__(tasklet
.log
, name
)
337 self
._log
.info("Initializing the Configuration-Manager tasklet")
338 self
._con
_man
= ConfigurationManager(self
.log
,
342 yield from self
._con
_man
.register()
344 def deregister(self
):
345 self
._log
.debug("De-register project {}".format(self
.name
))
346 self
._con
_man
.deregister()
349 class ConfigManagerTasklet(rift
.tasklets
.Tasklet
):
350 def __init__(self
, *args
, **kwargs
):
351 super(ConfigManagerTasklet
, self
).__init
__(*args
, **kwargs
)
352 self
.rwlog
.set_category("rw-conman-log")
356 self
.project_handler
= None
364 super(ConfigManagerTasklet
, self
).start()
366 self
.log
.debug("Registering with dts")
368 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
369 conmanY
.get_schema(),
371 self
.on_dts_state_change
)
373 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
375 def on_instance_started(self
):
376 self
.log
.debug("Got instance started callback")
380 self
.log
.debug("creating project handler")
381 self
.project_handler
= ProjectHandler(self
, ConfigManagerProject
)
382 self
.project_handler
.register()
389 def on_dts_state_change(self
, state
):
390 """Take action according to current dts state to transition
391 application into the corresponding application state
394 state - current dts state
397 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
398 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
402 rwdts
.State
.INIT
: self
.init
,
403 rwdts
.State
.RUN
: self
.run
,
406 # Transition application to next state
407 handler
= handlers
.get(state
, None)
408 if handler
is not None:
411 # Transition dts to next state
412 next_state
= switch
.get(state
, None)
413 if next_state
is not None:
414 self
._dts
.handle
.set_state(next_state
)