3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 This file - ConfigManagerTasklet()
21 +--|--> ConfigurationManager()
23 +--> rwconman_config.py - ConfigManagerConfig()
25 | +--> ConfigManagerNSR()
27 +--> rwconman_events.py - ConfigManagerEvents()
29 +--> ConfigManagerROif()
38 gi
.require_version('RwDts', '1.0')
39 gi
.require_version('RwConmanYang', '1.0')
41 from gi
.repository
import (
43 RwConmanYang
as conmanY
,
48 from . import rwconman_config
as Config
49 from . import rwconman_events
as Event
def log_this_vnf(vnf_cfg):
    """Return a compact label identifying a VNF, for use in log messages.

    The label is assembled from the ``nsr_name``, ``vnfr_name``,
    ``member_vnf_index`` and ``mgmt_ip_address`` entries of *vnf_cfg*, in
    that order.  Every entry is followed by ``/`` except the management IP
    address, which is wrapped in parentheses, giving for example
    ``ns/vnf/1/(10.0.0.1)``.

    Entries that are absent from *vnf_cfg* are skipped, so a partially
    populated record can still be logged without raising ``KeyError``.

    Args:
        vnf_cfg: mapping holding the VNF configuration record.

    Returns:
        The label as a string; empty when none of the entries are present.
    """
    used_item_list = ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
    pieces = []
    for item in used_item_list:
        if item not in vnf_cfg:
            continue
        if item == 'mgmt_ip_address':
            pieces.append("({})".format(vnf_cfg[item]))
        else:
            pieces.append("{}/".format(vnf_cfg[item]))
    # Join once instead of growing a string with repeated +=.
    return "".join(pieces)
62 class ConfigurationManager(object):
# Constructor: receives a logger, an event loop and a DTS handle.
# NOTE(review): this listing has gaps.  The statements that populate
# self._log, self._loop and self._dts (all read below), and the initial values
# of self.pending_cfg, self._nsr_objs and self._handlers (used by the other
# methods of this class) are not visible here.
63 def __init__(self
, log
, loop
, dts
):
# Configuration directory is $RIFT_INSTALL/etc/conman; the environment lookup
# raises KeyError when RIFT_INSTALL is not set.
68 self
.cfg_dir
= os
.path
.join(os
.environ
["RIFT_INSTALL"], "etc/conman")
# Helper objects from the sibling rwconman_config / rwconman_events modules.
# Each is given the DTS handle, logger, loop and this manager itself.
69 self
._config
= Config
.ConfigManagerConfig(self
._dts
, self
._log
, self
._loop
, self
)
70 self
._event
= Event
.ConfigManagerEvents(self
._dts
, self
._log
, self
._loop
, self
)
# Maps nsr_id -> the task that configuration_handler() created for that NSR.
72 self
.pending_tasks
= {}
def update_vnf_state(self, vnf_cfg, state):
    """Log and forward a cm-state change for one VNF to its owning NSR object.

    Generator-based coroutine: it delegates with ``yield from`` to
    ``update_vnf_cm_state`` on the object stored under ``vnf_cfg['nsr_obj']``.

    Args:
        vnf_cfg: VNF configuration mapping; the entries ``nsr_obj``,
            ``vnfr_name`` and ``vnfr`` are read.
        state: the new cm-state, passed through unchanged.
    """
    owner = vnf_cfg['nsr_obj']
    self._log.info(
        "Updating cm-state for VNF(%s/%s) to:%s",
        owner.nsr_name,
        vnf_cfg['vnfr_name'],
        state,
    )
    yield from owner.update_vnf_cm_state(vnf_cfg['vnfr'], state)
def update_ns_state(self, nsr_obj, state):
    """Log and forward a cm-state change for a whole NS.

    Generator-based coroutine: it delegates with ``yield from`` to
    ``nsr_obj.update_ns_cm_state(state)``.

    Args:
        nsr_obj: NSR object; its ``nsr_name`` is logged.
        state: the new cm-state, passed through unchanged.
    """
    self._log.info(
        "Updating cm-state for NS(%s) to:%s",
        nsr_obj.nsr_name,
        state,
    )
    yield from nsr_obj.update_ns_cm_state(state)
def add_to_pending(self, nsr_obj):
    """Queue an NS for configuration and build its ordered VNF work list.

    Nothing happens when *nsr_obj* is already in ``self.pending_cfg`` or when
    its cm-state is not RECEIVED.  Otherwise ``nsr_obj.vnf_cfg_list`` is
    rebuilt from ``nsr_obj.nsr_cfg_config_attributes_dict``, visiting its
    keys in ascending order, and *nsr_obj* is appended to
    ``self.pending_cfg``.

    Args:
        nsr_obj: NSR object to queue.

    Raises:
        KeyError: if an entry's ``'id'`` is not present in
            ``nsr_obj._vnfr_dict``.
    """
    if (nsr_obj in self.pending_cfg or
            nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)):
        return

    self._log.info("Adding NS={} to pending config list"
                   .format(nsr_obj.nsr_name))

    nsr_obj.vnf_cfg_list = []
    attributes_by_priority = nsr_obj.nsr_cfg_config_attributes_dict
    # Sort all the VNF by their configuration attribute priority.  Iterate
    # the sorted keys directly: round-tripping through dict() only keeps the
    # sorted order on interpreters whose dicts preserve insertion order.
    for priority in sorted(attributes_by_priority):
        # Iterate through each vnfr at this priority level
        for config_priority in attributes_by_priority[priority]:
            vnfr = nsr_obj._vnfr_dict[config_priority['id']]
            self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(vnfr['vnf_cfg']))
            nsr_obj.vnf_cfg_list.append(vnfr['vnf_cfg'])
    self.pending_cfg.append(nsr_obj)
def add_nsr_obj(self, nsr_obj):
    """Record *nsr_obj* in ``self._nsr_objs`` under its ``nsr_id``.

    An existing entry with the same id is replaced.
    """
    self._log.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj)
    registry = self._nsr_objs
    registry[nsr_obj.nsr_id] = nsr_obj
def remove_nsr_obj(self, nsr_id):
    """Drop the entry for *nsr_id* from ``self._nsr_objs``.

    Raises:
        KeyError: if no object is recorded under *nsr_id*.
    """
    self._log.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id)
    registry = self._nsr_objs
    del registry[nsr_id]
def get_nsr_obj(self, nsr_id):
    """Return the NSR object recorded under *nsr_id*, or None if unknown.

    The table is consulted once with ``dict.get`` and the result is used for
    both the debug message and the return value, so asking for an id that
    is not registered logs ``None`` instead of raising ``KeyError`` from
    the logging statement.

    Args:
        nsr_id: key under which the object was stored by ``add_nsr_obj``.

    Returns:
        The stored object, or ``None`` when *nsr_id* is not present.
    """
    nsr_obj = self._nsr_objs.get(nsr_id)
    self._log.debug("Returning nsr_obj (%s) from Configuration Manager", nsr_obj)
    return nsr_obj
# configuration_handler: generator-based coroutine (it uses `yield from`) that
# works through self.pending_cfg and starts one configuration task per NSR.
# NOTE(review): this listing is token-split and has gaps -- loop headers,
# `try:` lines, some conditions and some call arguments are not visible -- so
# the notes added here describe only what the visible lines show.
126 def configuration_handler(self
):
# process_vnf_cfg: a single configuration attempt for one VNF.  Sleeps 5s
# first when vnf_cfg['cfg_retries'] is truthy, increments that counter,
# invokes the 'apply_initial_config' agent-plugin hook, and reports READY or
# CFG_FAILED through update_vnf_state().  The conditions guarding the READY
# update and the second plugin call are not visible here.
128 def process_vnf_cfg(agent_vnfr
, nsr_obj
):
129 vnf_cfg
= agent_vnfr
.vnf_cfg
132 if vnf_cfg
['cfg_retries']:
133 # This failed previously, lets give it some time
134 yield from asyncio
.sleep(5, loop
=self
._loop
)
136 vnf_cfg
['cfg_retries'] += 1
138 # Check to see if this vnfr is managed
139 done
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
140 'apply_initial_config',
143 self
._log
.debug("Apply configuration for VNF={} on attempt {} " \
144 "returned {}".format(log_this_vnf(vnf_cfg
),
145 vnf_cfg
['cfg_retries'],
149 yield from self
.update_vnf_state(vnf_cfg
, conmanY
.RecordState
.READY
)
152 # Check to see if the VNF configure failed
153 status
= yield from self
._config
._config
_agent
_mgr
.invoke_config_agent_plugins(
158 if status
and status
== 'error':
159 # Failed configuration
160 nsr_obj
.vnf_failed
= True
162 yield from self
.update_vnf_state(vnf_cfg
, conmanY
.RecordState
.CFG_FAILED
)
163 self
._log
.error("Failed to apply configuration for VNF = {}"
164 .format(log_this_vnf(vnf_cfg
)))
# process_nsr_obj: per-NSR worker.  Pops VNF configs off
# nsr_obj.vnf_cfg_list (re-appending one so it is retried later), then applies
# the NS initial config and finally sets the NS cm-state to READY or
# CFG_FAILED.  Exceptions from process_ns_initial_config() are logged and
# recorded in nsr_obj.nsr_failed.
169 def process_nsr_obj(nsr_obj
):
170 # Return status, this will be set to False is if we fail to configure any VNF
173 # Reset VNF failed flag
174 nsr_obj
.vnf_failed
= False
175 vnf_cfg_list
= nsr_obj
.vnf_cfg_list
177 # Check to make sure the NSR is still valid
178 if nsr_obj
.parent
.is_nsr_valid(nsr_obj
.nsr_id
) is False:
179 self
._log
.info("NSR {} not found, could be terminated".
180 format(nsr_obj
.nsr_id
))
183 # Need while loop here, since we will be removing list item
184 vnf_cfg
= vnf_cfg_list
.pop(0)
185 self
._log
.info("Applying Pending Configuration for VNF = %s / %s",
186 log_this_vnf(vnf_cfg
), vnf_cfg
['agent_vnfr'])
187 vnf_done
= yield from process_vnf_cfg(vnf_cfg
['agent_vnfr'], nsr_obj
)
188 self
._log
.debug("Applied Pending Configuration for VNF = {}, status={}"
189 .format(log_this_vnf(vnf_cfg
), vnf_done
))
192 # We will retry, but we will give other VNF chance first since this one failed.
193 vnf_cfg_list
.append(vnf_cfg
)
195 if nsr_obj
.vnf_failed
:
196 # Atleast one VNF config failed
200 # Apply NS initial config if present
201 nsr_obj
.nsr_failed
= False
202 self
._log
.debug("Apply initial config on NSR {}".format(nsr_obj
.nsr_name
))
204 yield from nsr_obj
.parent
.process_ns_initial_config(nsr_obj
)
205 except Exception as e
:
206 nsr_obj
.nsr_failed
= True
207 self
._log
.exception(e
)
210 # Set the config status for the NSR
212 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.READY
)
213 elif nsr_obj
.vnf_failed
or nsr_obj
.nsr_failed
:
214 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
)
# Main loop body: inspect the entries of self.pending_tasks and drop the
# finished ones, sleep 10s, then take the first NSR from self.pending_cfg and,
# unless it is being deleted, launch process_nsr_obj() for it as a task
# (cancelling any task already stored under the same nsr_id).
217 # Basically, this loop will never end.
219 # Check the pending tasks are complete
220 # Store a list of tasks that are completed and
221 # remove from the pending_tasks list outside loop
223 for nsr_id
, task
in self
.pending_tasks
.items():
228 self
._log
.error("Exception in configuring nsr {}: {}".
230 nsr_obj
= self
.get_nsr_obj(nsr_id
)
232 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_FAILED
, str(e
))
236 self
._log
.debug("NSR {} configured: {}".format(nsr_id
, rc
))
238 self
._log
.debug("NSR {} still configuring".format(nsr_id
))
240 # Remove the completed tasks
242 self
.pending_tasks
.pop(nsr_id
)
244 # TODO (pjoseph): Fix this
245 # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
246 # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
247 # wrongfully 10 seconds in advance?
248 yield from asyncio
.sleep(10, loop
=self
._loop
)
251 # get first NS, pending_cfg is nsr_obj list
252 nsr_obj
= self
.pending_cfg
[0]
254 if nsr_obj
.being_deleted
is False:
255 # Process this NS, returns back same obj is successfull or exceeded retries
257 self
._log
.info("Processing NSR:{}".format(nsr_obj
.nsr_name
))
259 # Check if we already have a task running for this NSR
260 # Case where we are still configuring and terminate is called
261 if nsr_obj
.nsr_id
in self
.pending_tasks
:
262 self
._log
.error("NSR {} in state {} has a configure task running.".
263 format(nsr_obj
.nsr_name
, nsr_obj
.get_ns_cm_state()))
264 # Terminate the task for this NSR
265 self
.pending_tasks
[nsr_obj
.nsr_id
].cancel()
267 yield from self
.update_ns_state(nsr_obj
, conmanY
.RecordState
.CFG_PROCESS
)
# NOTE(review): create_task() schedules the coroutine on self._loop; it does
# not start a separate OS thread, despite the wording of the comment below.
269 # Call in a separate thread
270 self
.pending_tasks
[nsr_obj
.nsr_id
] = \
271 self
._loop
.create_task(
272 process_nsr_obj(nsr_obj
)
275 # Remove this nsr_obj
276 self
.pending_cfg
.remove(nsr_obj
)
278 except Exception as e
:
279 self
._log
.error("Failed to process NSR as %s", str(e
))
280 self
._log
.exception(e
)
# NOTE(review): the enclosing `def` line is not visible in this listing; the
# comment below and the `self._con_man.register()` call made by the tasklet
# suggest this is the body of ConfigurationManager.register() -- confirm.
# Registers every entry of self._handlers in turn (each register() is driven
# with `yield from`), then schedules configuration_handler() on self._loop via
# asyncio.ensure_future() without waiting for it.
285 # Perform register() for all handlers
286 for reg
in self
._handlers
:
287 yield from reg
.register()
289 asyncio
.ensure_future(self
.configuration_handler(), loop
=self
._loop
)
# ConfigManagerTasklet: rift.tasklets.Tasklet subclass that creates the DTS
# API object and a ConfigurationManager, and reacts to DTS state changes.
# NOTE(review): this listing is token-split and has gaps.  The `def` lines for
# the DTS set-up code and for the code that builds ConfigurationManager are
# not visible, some call arguments are missing, and `self.run` (referenced in
# on_dts_state_change) is not defined in the visible text.
291 class ConfigManagerTasklet(rift
.tasklets
.Tasklet
):
# Forwards all arguments to the base class and sets the log category to
# "rw-conman-log".
292 def __init__(self
, *args
, **kwargs
):
293 super(ConfigManagerTasklet
, self
).__init
__(*args
, **kwargs
)
294 self
.rwlog
.set_category("rw-conman-log")
# Presumably the body of start() (it calls the base-class start()); the `def`
# line is not visible.  Creates the DTS API object from self.tasklet_info and
# the RwConmanYang schema, passing self.on_dts_state_change to it.
300 super(ConfigManagerTasklet
, self
).start()
302 self
.log
.debug("Registering with dts")
304 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
305 conmanY
.get_schema(),
307 self
.on_dts_state_change
)
309 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
# Only logs that the callback fired.
311 def on_instance_started(self
):
312 self
.log
.debug("Got instance started callback")
# Presumably the body of init() (referenced as self.init further down); the
# `def` line is not visible.  Builds the ConfigurationManager and drives its
# register() with `yield from`.
# NOTE(review): this uses self._log whereas the rest of the class uses
# self.log -- confirm both attributes exist on the tasklet.
316 self
._log
.info("Initializing the Configuration-Manager tasklet")
317 self
._con
_man
= ConfigurationManager(self
.log
,
320 yield from self
._con
_man
.register()
# on_dts_state_change consults two lookup tables for `state`: `switch` maps a
# DTS state to the next DTS state (INIT -> REGN_COMPLETE, CONFIG -> RUN) and
# `handlers` maps a DTS state to a local handler (INIT -> self.init,
# RUN -> self.run).  When a next state is found it is pushed with
# self._dts.handle.set_state().  What is done with a found handler is not
# visible in this listing.
327 def on_dts_state_change(self
, state
):
328 """Take action according to current dts state to transition
329 application into the corresponding application state
332 state - current dts state
335 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
336 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
340 rwdts
.State
.INIT
: self
.init
,
341 rwdts
.State
.RUN
: self
.run
,
344 # Transition application to next state
345 handler
= handlers
.get(state
, None)
346 if handler
is not None:
349 # Transition dts to next state
350 next_state
= switch
.get(state
, None)
351 if next_state
is not None:
352 self
._dts
.handle
.set_state(next_state
)