4e92b6c1fe9201a35cbe4ed7d3e74e2b8fb85014
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconmantasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
'''
This file - ConfigManagerTasklet()
|
+--|--> ConfigurationManager()
        |
        +--> rwconman_config.py - ConfigManagerConfig()
        |    |
        |    +--> ConfigManagerNSR()
        |
        +--> rwconman_events.py - ConfigManagerEvents()
             |
             +--> ConfigManagerROif()

'''
32
33 import asyncio
34 import logging
35 import os
36
37 import gi
38 gi.require_version('RwDts', '1.0')
39 gi.require_version('RwConmanYang', '1.0')
40
41 from gi.repository import (
42 RwDts as rwdts,
43 RwConmanYang as conmanY,
44 )
45
46 import rift.tasklets
47
48 from . import rwconman_config as Config
49 from . import rwconman_events as Event
50
def log_this_vnf(vnf_cfg):
    """Return a compact identifier string for a VNF config, for log messages.

    The keys 'nsr_name', 'vnfr_name' and 'member_vnf_index' are rendered as
    "<value>/" and 'mgmt_ip_address' as "(<value>)", in that fixed order.
    Keys missing from vnf_cfg are skipped, so an empty mapping gives "".
    """
    pieces = []
    for key in ('nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address'):
        if key not in vnf_cfg:
            continue
        # The management address is the only field wrapped in parentheses.
        template = "({})" if key == 'mgmt_ip_address' else "{}/"
        pieces.append(template.format(vnf_cfg[key]))
    return "".join(pieces)
61
class ConfigurationManager(object):
    """Queue network services (NSRs) for configuration and configure them.

    NSR objects are queued with add_to_pending(). The long-running
    configuration_handler() coroutine, started by register(), takes them off
    the queue one at a time and runs the per-VNF and per-NS initial
    configuration in an asyncio task tracked in pending_tasks.
    """

    def __init__(self, log, loop, dts):
        """Create the config/event handlers and the empty work queues.

        Arguments
            log  - logger used for all messages
            loop - asyncio event loop used for sleeps and tasks
            dts  - DTS handle passed through to the config and event handlers

        Raises
            KeyError - if the RIFT_INSTALL environment variable is not set
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self.cfg_sleep = True
        self.cfg_dir = os.path.join(os.environ["RIFT_INSTALL"], "etc/conman")
        self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self)
        self._event = Event.ConfigManagerEvents(self._dts, self._log, self._loop, self)
        # nsr_obj list waiting to be picked up by configuration_handler()
        self.pending_cfg = []
        # nsr_id -> asyncio task currently configuring that NSR
        self.pending_tasks = {}
        # nsr_id -> nsr_obj
        self._nsr_objs = {}

        # Objects whose register() is driven from our own register()
        self._handlers = [
            self._config,
            self._event,
        ]

    @asyncio.coroutine
    def update_vnf_state(self, vnf_cfg, state):
        """Log and push a new cm-state for the VNF described by vnf_cfg.

        Arguments
            vnf_cfg - VNF config dict; 'nsr_obj', 'vnfr_name' and 'vnfr' are read
            state   - state value handed to nsr_obj.update_vnf_cm_state()
        """
        nsr_obj = vnf_cfg['nsr_obj']
        self._log.info("Updating cm-state for VNF(%s/%s) to:%s", nsr_obj.nsr_name, vnf_cfg['vnfr_name'], state)
        yield from nsr_obj.update_vnf_cm_state(vnf_cfg['vnfr'], state)

    @asyncio.coroutine
    def update_ns_state(self, nsr_obj, state):
        """Log and push a new cm-state for the whole network service."""
        self._log.info("Updating cm-state for NS(%s) to:%s", nsr_obj.nsr_name, state)
        yield from nsr_obj.update_ns_cm_state(state)

    def add_to_pending(self, nsr_obj):
        """Queue nsr_obj for configuration if it is new and in RECEIVED state.

        Rebuilds nsr_obj.vnf_cfg_list so the VNF configs appear in ascending
        order of their configuration priority. Does nothing when nsr_obj is
        already queued or its cm state is not RECEIVED.
        """
        if nsr_obj in self.pending_cfg:
            return
        received = nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)
        if nsr_obj.cm_nsr['state'] != received:
            return

        self._log.info("Adding NS={} to pending config list"
                       .format(nsr_obj.nsr_name))

        # Build the list
        nsr_obj.vnf_cfg_list = []
        attrs_by_priority = nsr_obj.nsr_cfg_config_attributes_dict
        # Walk the priority levels in sorted key order. Iterate the sorted
        # keys directly: rebuilding a dict from them only keeps the order on
        # interpreters that guarantee dict insertion order (Python 3.7+).
        for priority in sorted(attrs_by_priority):
            # Each entry at this priority level identifies one vnfr by 'id'
            for config_attributes in attrs_by_priority[priority]:
                vnfr = nsr_obj._vnfr_dict[config_attributes['id']]
                self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(vnfr['vnf_cfg']))
                nsr_obj.vnf_cfg_list.append(vnfr['vnf_cfg'])
        self.pending_cfg.append(nsr_obj)

    def add_nsr_obj(self, nsr_obj):
        """Register nsr_obj under its nsr_id."""
        self._log.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj)
        self._nsr_objs[nsr_obj.nsr_id] = nsr_obj

    def remove_nsr_obj(self, nsr_id):
        """Forget the nsr_obj registered under nsr_id.

        Raises
            KeyError - if nsr_id is not registered
        """
        self._log.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id)
        del self._nsr_objs[nsr_id]

    def get_nsr_obj(self, nsr_id):
        """Return the nsr_obj registered under nsr_id, or None if unknown."""
        # Look the object up once with .get(): indexing the dict just to build
        # the log message would raise KeyError for an unknown id, although
        # callers rely on getting None back in that case.
        nsr_obj = self._nsr_objs.get(nsr_id)
        self._log.debug("Returning nsr_obj (%s) from Configuration Manager", nsr_obj)
        return nsr_obj

    @asyncio.coroutine
    def configuration_handler(self):
        """Never-ending loop that configures the queued NSRs.

        Each iteration reaps finished tasks from pending_tasks, sleeps for
        10 seconds and then, if pending_cfg is not empty, starts a task that
        configures the NSR at the head of the queue.
        """

        @asyncio.coroutine
        def process_vnf_cfg(agent_vnfr, nsr_obj):
            """Try to apply the initial config of one VNF.

            Returns a true value when the VNF needs no further attempts (it
            was configured, or its configuration definitively failed) and a
            false value when it should be retried later.
            """
            vnf_cfg = agent_vnfr.vnf_cfg

            if vnf_cfg['cfg_retries']:
                # This failed previously, lets give it some time
                yield from asyncio.sleep(5, loop=self._loop)

            vnf_cfg['cfg_retries'] += 1

            # Check to see if this vnfr is managed
            done = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
                'apply_initial_config',
                nsr_obj.agent_nsr,
                agent_vnfr)
            self._log.debug("Apply configuration for VNF={} on attempt {} " \
                            "returned {}".format(log_this_vnf(vnf_cfg),
                                                 vnf_cfg['cfg_retries'],
                                                 done))

            if done:
                self._log.warn("Apply initial config on VNFR {}".
                               format(log_this_vnf(vnf_cfg)))
                try:
                    yield from nsr_obj.parent.process_vnf_initial_config(
                        nsr_obj,
                        agent_vnfr.vnfr_msg)
                    yield from self.update_vnf_state(vnf_cfg,
                                                     conmanY.RecordState.READY)

                except Exception as e:
                    nsr_obj.vnf_failed = True
                    self._log.exception(e)
                    yield from self.update_vnf_state(vnf_cfg,
                                                     conmanY.RecordState.CFG_FAILED)

            else:
                # Check to see if the VNF configure failed
                status = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
                    'get_config_status',
                    nsr_obj.agent_nsr,
                    agent_vnfr)

                if status == 'error':
                    # Failed configuration: mark it done so it is not retried
                    nsr_obj.vnf_failed = True
                    done = True
                    yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.CFG_FAILED)
                    self._log.error("Failed to apply configuration for VNF = {}"
                                    .format(log_this_vnf(vnf_cfg)))

            return done

        @asyncio.coroutine
        def process_nsr_obj(nsr_obj):
            """Configure every VNF of nsr_obj, then the NS itself.

            Returns True when everything was configured, and False when a VNF
            or the NS config failed or the NSR went away while being
            processed.
            """
            # Return status, this will be set to False if we fail to configure any VNF
            ret_status = True

            # Reset VNF failed flag
            nsr_obj.vnf_failed = False
            vnf_cfg_list = nsr_obj.vnf_cfg_list
            # Need while loop here, since we will be removing list items
            while vnf_cfg_list:
                # Check to make sure the NSR is still valid
                if nsr_obj.parent.is_nsr_valid(nsr_obj.nsr_id) is False:
                    self._log.info("NSR {} not found, could be terminated".
                                   format(nsr_obj.nsr_id))
                    # Explicit False (not a bare return) so the task result
                    # is always a bool.
                    return False

                vnf_cfg = vnf_cfg_list.pop(0)
                self._log.info("Applying Pending Configuration for VNF = %s / %s",
                               log_this_vnf(vnf_cfg), vnf_cfg['agent_vnfr'])
                vnf_done = yield from process_vnf_cfg(vnf_cfg['agent_vnfr'], nsr_obj)
                self._log.debug("Applied Pending Configuration for VNF = {}, status={}"
                                .format(log_this_vnf(vnf_cfg), vnf_done))

                if not vnf_done:
                    # We will retry, but we will give other VNF chance first since this one failed.
                    vnf_cfg_list.append(vnf_cfg)

            if nsr_obj.vnf_failed:
                # At least one VNF config failed
                ret_status = False

            if ret_status:
                # Apply NS initial config if present
                nsr_obj.nsr_failed = False
                self._log.debug("Apply initial config on NSR {}".format(nsr_obj.nsr_name))
                try:
                    yield from nsr_obj.parent.process_ns_initial_config(nsr_obj)
                except Exception as e:
                    nsr_obj.nsr_failed = True
                    self._log.exception(e)
                    ret_status = False

            # Set the config status for the NSR
            if ret_status:
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY)
            elif nsr_obj.vnf_failed or nsr_obj.nsr_failed:
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED)
            return ret_status

        # Basically, this loop will never end.
        while True:
            # Check the pending tasks are complete.
            # Store a list of tasks that are completed and
            # remove from the pending_tasks list outside loop.
            # Iterate over a snapshot because the loop body yields control.
            ids = []
            for nsr_id, task in list(self.pending_tasks.items()):
                if not task.done():
                    self._log.debug("NSR {} still configuring".format(nsr_id))
                    continue

                ids.append(nsr_id)
                if task.cancelled():
                    # exception()/result() raise CancelledError on a cancelled
                    # task, which would escape and end this handler for good.
                    self._log.debug("NSR {} configure task was cancelled".format(nsr_id))
                    continue

                e = task.exception()
                if e:
                    self._log.error("Exception in configuring nsr {}: {}".
                                    format(nsr_id, e))
                    nsr_obj = self.get_nsr_obj(nsr_id)
                    if nsr_obj:
                        yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e))
                else:
                    rc = task.result()
                    self._log.debug("NSR {} configured: {}".format(nsr_id, rc))

            # Remove the completed tasks
            for nsr_id in ids:
                self.pending_tasks.pop(nsr_id, None)

            # TODO (pjoseph): Fix this
            # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
            # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
            # wrongfully 10 seconds in advance?
            yield from asyncio.sleep(10, loop=self._loop)

            if not self.pending_cfg:
                continue

            # get first NS, pending_cfg is nsr_obj list
            nsr_obj = self.pending_cfg[0]
            if nsr_obj.being_deleted is not False:
                # NOTE(review): the entry is left at the head of pending_cfg;
                # presumably it is removed elsewhere on delete - confirm,
                # otherwise it blocks every NSR queued behind it.
                continue

            try:
                self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name))

                # Check if we already have a task running for this NSR
                # Case where we are still configuring and terminate is called
                if nsr_obj.nsr_id in self.pending_tasks:
                    self._log.error("NSR {} in state {} has a configure task running.".
                                    format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state()))
                    # Terminate the task for this NSR
                    self.pending_tasks[nsr_obj.nsr_id].cancel()

                yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS)

                # Run the configuration as its own task on the event loop
                self.pending_tasks[nsr_obj.nsr_id] = \
                    self._loop.create_task(
                        process_nsr_obj(nsr_obj)
                    )

                # Remove this nsr_obj
                self.pending_cfg.remove(nsr_obj)

            except Exception as e:
                self._log.error("Failed to process NSR as %s", str(e))
                self._log.exception(e)

    @asyncio.coroutine
    def register(self):
        """Register all handlers, then start configuration_handler()."""
        # Perform register() for all handlers
        for reg in self._handlers:
            yield from reg.register()

        asyncio.ensure_future(self.configuration_handler(), loop=self._loop)
304
class ConfigManagerTasklet(rift.tasklets.Tasklet):
    """Tasklet that creates the DTS handle and the ConfigurationManager."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base tasklet and set the log category."""
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-conman-log")

        # Both are populated later: _dts in start(), _con_man in init()
        self._con_man = None
        self._dts = None

    def start(self):
        """Start the base tasklet and create the DTS API object."""
        super().start()

        self.log.debug("Registering with dts")

        # on_dts_state_change is handed to DTS as the state-change callback
        self._dts = rift.tasklets.DTS(
            self.tasklet_info,
            conmanY.get_schema(),
            self.loop,
            self.on_dts_state_change,
        )

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def on_instance_started(self):
        """Log that the instance-started callback was received."""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """Create the ConfigurationManager and register its handlers."""
        self._log.info("Initializing the Configuration-Manager tasklet")
        manager = ConfigurationManager(self.log, self.loop, self._dts)
        self._con_man = manager
        yield from manager.register()

    @asyncio.coroutine
    def run(self):
        """Nothing to do when entering the RUN state."""
        return

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to request once the given state has been handled
        next_state_for = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application coroutine to run on entering the given state
        action_for = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        action = action_for.get(state)
        if action is not None:
            yield from action()

        # Transition dts to next state
        upcoming = next_state_for.get(state)
        if upcoming is not None:
            self._dts.handle.set_state(upcoming)