Merge from OSM SO master
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconmantasklet.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
'''
This file - ConfigManagerTasklet()
|
+
|
ConfigManagerProject()
|
+--|--> ConfigurationManager()
        |
        +--> rwconman_config.py - ConfigManagerConfig()
        |    |
        |    +--> ConfigManagerNSR()
        |
        +--> rwconman_events.py - ConfigManagerEvents()
             |
             +--> ConfigManagerROif()

'''
36
37 import asyncio
38 import logging
39 import os
40
41 import gi
42 gi.require_version('RwDts', '1.0')
43 gi.require_version('RwConmanYang', '1.0')
44
45 from gi.repository import (
46 RwDts as rwdts,
47 RwConmanYang as conmanY,
48 )
49
50 import rift.tasklets
51 from rift.mano.utils.project import (
52 ManoProject,
53 ProjectHandler,
54 )
55
56 from . import rwconman_config as Config
57 from . import rwconman_events as Event
58
def log_this_vnf(vnf_cfg):
    """Build a compact identifier string for a VNF, for use in log lines.

    The keys 'nsr_name', 'vnfr_name' and 'member_vnf_index' are rendered
    as "<value>/" and 'mgmt_ip_address' as "(<value>)", in that fixed
    order. Keys that are absent from ``vnf_cfg`` are skipped.

    Arguments:
        vnf_cfg - mapping holding the VNF configuration fields

    Returns:
        The concatenated identifier, or "" when none of the keys exist.
    """
    pieces = []
    for key in ('nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address'):
        if key not in vnf_cfg:
            continue
        # Only the management address is wrapped in parentheses.
        template = "({})" if key == 'mgmt_ip_address' else "{}/"
        pieces.append(template.format(vnf_cfg[key]))
    return "".join(pieces)
69
class ConfigurationManager(object):
    """Per-project configuration manager.

    Owns the config and event handlers and, once registered, runs the
    long-lived ``configuration_handler`` task. That task takes network
    services queued in ``pending_cfg`` and starts one configuration task
    per NSR, tracked in ``pending_tasks`` keyed by NSR id.
    """

    def __init__(self, log, loop, dts, project):
        """Store the collaborators and create the config/event handlers.

        Arguments:
            log     - logger used for all messages
            loop    - asyncio event loop used for tasks and sleeps
            dts     - DTS handle passed on to the handlers
            project - owning project (its ``name`` is used in logs)

        Raises:
            KeyError - if the RIFT_INSTALL environment variable is unset
        """
        self._log = log
        self._loop = loop
        self._dts = dts
        self._project = project

        self.cfg_sleep = True
        self.cfg_dir = os.path.join(os.environ["RIFT_INSTALL"], "etc/conman")
        self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self)
        self._event = Event.ConfigManagerEvents(self._dts, self._log, self._loop, self)
        self.pending_cfg = []      # nsr_obj list waiting to be configured
        self.pending_tasks = {}    # nsr_id -> running configuration task
        self._nsr_objs = {}        # nsr_id -> nsr_obj
        self._task = None          # The configuration_handler task

        self._handlers = [
            self._config,
            self._event,
        ]

    @asyncio.coroutine
    def update_vnf_state(self, vnf_cfg, state):
        """Log and forward a cm-state change for one VNF to its NSR object."""
        nsr_obj = vnf_cfg['nsr_obj']
        self._log.info("Updating cm-state for VNF(%s/%s) to:%s",
                       nsr_obj.nsr_name, vnf_cfg['vnfr_name'], state)
        yield from nsr_obj.update_vnf_cm_state(vnf_cfg['vnfr'], state)

    @asyncio.coroutine
    def update_ns_state(self, nsr_obj, state):
        """Log and forward a cm-state change for a whole NS."""
        self._log.info("Updating cm-state for NS(%s) to:%s", nsr_obj.nsr_name, state)
        yield from nsr_obj.update_ns_cm_state(state)

    def add_to_pending(self, nsr_obj):
        """Queue an NS for configuration if it is in the RECEIVED state.

        Rebuilds ``nsr_obj.vnf_cfg_list`` so that VNFs are ordered by the
        keys of ``nsr_obj.nsr_cfg_config_attributes_dict`` (ascending).
        Does nothing when the NS is already queued or in another state.
        """
        received = nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)
        if nsr_obj in self.pending_cfg or nsr_obj.cm_nsr['state'] != received:
            return

        self._log.info("Adding NS={} to pending config list"
                       .format(nsr_obj.nsr_name))

        # Build the list
        nsr_obj.vnf_cfg_list = []
        attributes_by_priority = nsr_obj.nsr_cfg_config_attributes_dict
        # Walk the keys in sorted order directly. Round-tripping through
        # dict(sorted(...)) loses the ordering on interpreters where plain
        # dicts are unordered (before Python 3.7).
        for priority in sorted(attributes_by_priority):
            # Iterate through each vnfr at this priority level
            for config_attributes in attributes_by_priority[priority]:
                vnfr = nsr_obj._vnfr_dict[config_attributes['id']]
                self._log.debug("Adding VNF:(%s) to pending cfg list",
                                log_this_vnf(vnfr['vnf_cfg']))
                nsr_obj.vnf_cfg_list.append(vnfr['vnf_cfg'])
        self.pending_cfg.append(nsr_obj)

    def add_nsr_obj(self, nsr_obj):
        """Remember ``nsr_obj`` under its ``nsr_id``."""
        self._log.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj)
        self._nsr_objs[nsr_obj.nsr_id] = nsr_obj

    def remove_nsr_obj(self, nsr_id):
        """Forget the NSR object for ``nsr_id``.

        Raises:
            KeyError - if ``nsr_id`` is not known
        """
        self._log.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id)
        del self._nsr_objs[nsr_id]

    def get_nsr_obj(self, nsr_id):
        """Return the NSR object for ``nsr_id``, or None when unknown."""
        # Look up once with .get(): indexing for the log message used to
        # raise KeyError for unknown ids even though callers expect None.
        nsr_obj = self._nsr_objs.get(nsr_id)
        self._log.debug("Returning nsr_obj (%s) from Configuration Manager", nsr_obj)
        return nsr_obj

    @asyncio.coroutine
    def configuration_handler(self):
        """Background loop that drives NS/VNF initial configuration.

        Each iteration reaps finished per-NSR tasks, sleeps 10 seconds and
        then, if ``pending_cfg`` is non-empty, starts a configuration task
        for the NSR at the head of the queue. Runs until cancelled.
        """

        @asyncio.coroutine
        def process_vnf_cfg(agent_vnfr, nsr_obj):
            """Try to apply the initial config of one VNF.

            Returns True when the VNF needs no further attempts (configured
            or definitively failed), False when it should be retried.
            """
            vnf_cfg = agent_vnfr.vnf_cfg

            if vnf_cfg['cfg_retries']:
                # This failed previously, lets give it some time
                yield from asyncio.sleep(5, loop=self._loop)

            vnf_cfg['cfg_retries'] += 1

            # Check to see if this vnfr is managed
            done = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
                'apply_initial_config',
                nsr_obj.agent_nsr,
                agent_vnfr)
            self._log.debug("Apply configuration for VNF={} on attempt {} "
                            "returned {}".format(log_this_vnf(vnf_cfg),
                                                 vnf_cfg['cfg_retries'],
                                                 done))

            if done:
                self._log.warning("Apply initial config on VNFR {}".
                                  format(log_this_vnf(vnf_cfg)))
                try:
                    yield from nsr_obj.parent.process_vnf_initial_config(
                        nsr_obj,
                        agent_vnfr.vnfr_msg)
                    yield from self.update_vnf_state(vnf_cfg,
                                                     conmanY.RecordState.READY)

                except asyncio.CancelledError:
                    # Cancellation is not a configuration failure; on
                    # Python < 3.8 it would otherwise be swallowed below.
                    raise

                except Exception as e:
                    nsr_obj.vnf_failed = True
                    self._log.exception(e)
                    yield from self.update_vnf_state(vnf_cfg,
                                                     conmanY.RecordState.CFG_FAILED)

            else:
                # Check to see if the VNF configure failed
                status = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
                    'get_config_status',
                    nsr_obj.agent_nsr,
                    agent_vnfr)

                if status and status == 'error':
                    # Failed configuration
                    nsr_obj.vnf_failed = True
                    done = True
                    yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.CFG_FAILED)
                    self._log.error("Failed to apply configuration for VNF = {}"
                                    .format(log_this_vnf(vnf_cfg)))

            return done

        @asyncio.coroutine
        def process_nsr_obj(nsr_obj):
            """Configure every pending VNF of an NS, then the NS itself.

            Returns True when everything was configured, False when a VNF
            or the NS initial config failed or the NSR is no longer valid.
            """
            # Return status, this will be set to False if we fail to configure any VNF
            ret_status = True

            # Reset VNF failed flag
            nsr_obj.vnf_failed = False
            vnf_cfg_list = nsr_obj.vnf_cfg_list
            # Need while loop here, since we will be removing list items
            while vnf_cfg_list:
                # Check to make sure the NSR is still valid
                if nsr_obj.parent.is_nsr_valid(nsr_obj.nsr_id) is False:
                    self._log.info("NSR {} not found, could be terminated".
                                   format(nsr_obj.nsr_id))
                    return False

                vnf_cfg = vnf_cfg_list.pop(0)
                self._log.info("Applying Pending Configuration for VNF = %s / %s",
                               log_this_vnf(vnf_cfg), vnf_cfg['agent_vnfr'])
                vnf_done = yield from process_vnf_cfg(vnf_cfg['agent_vnfr'], nsr_obj)
                self._log.debug("Applied Pending Configuration for VNF = {}, status={}"
                                .format(log_this_vnf(vnf_cfg), vnf_done))

                if not vnf_done:
                    # We will retry, but we will give other VNF chance first since this one failed.
                    vnf_cfg_list.append(vnf_cfg)

            if nsr_obj.vnf_failed:
                # At least one VNF config failed
                ret_status = False

            if ret_status:
                # Apply NS initial config if present
                nsr_obj.nsr_failed = False
                self._log.debug("Apply initial config on NSR {}".format(nsr_obj.nsr_name))
                try:
                    yield from nsr_obj.parent.process_ns_initial_config(nsr_obj)
                except asyncio.CancelledError:
                    # Let cancellation propagate instead of recording a failure.
                    raise
                except Exception as e:
                    nsr_obj.nsr_failed = True
                    self._log.exception(e)
                    ret_status = False

            # Set the config status for the NSR
            if ret_status:
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY)
            elif nsr_obj.vnf_failed or nsr_obj.nsr_failed:
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED)
            return ret_status

        try:
            # Basically, this loop will never end.
            while True:
                # Check the pending tasks are complete.
                # Store a list of tasks that are completed and
                # remove from the pending_tasks list outside loop.
                finished_ids = []
                # Iterate over a snapshot: the loop body yields control.
                for nsr_id, task in list(self.pending_tasks.items()):
                    if not task.done():
                        self._log.debug("NSR {} still configuring".format(nsr_id))
                        continue

                    finished_ids.append(nsr_id)

                    if task.cancelled():
                        # task.exception() raises CancelledError for a
                        # cancelled task, which would end this handler loop.
                        self._log.debug("NSR {} configure task was cancelled".format(nsr_id))
                        continue

                    e = task.exception()
                    if e is not None:
                        self._log.error("Exception in configuring nsr {}: {}".
                                        format(nsr_id, e))
                        nsr_obj = self.get_nsr_obj(nsr_id)
                        if nsr_obj:
                            yield from nsr_obj.update_ns_cm_state(
                                conmanY.RecordState.CFG_FAILED, str(e))
                    else:
                        rc = task.result()
                        self._log.debug("NSR {} configured: {}".format(nsr_id, rc))

                # Remove the completed tasks
                for nsr_id in finished_ids:
                    self.pending_tasks.pop(nsr_id)

                # TODO (pjoseph): Fix this
                # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
                # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
                # wrongfully 10 seconds in advance?
                yield from asyncio.sleep(10, loop=self._loop)

                if not self.pending_cfg:
                    continue

                # get first NS, pending_cfg is nsr_obj list
                nsr_obj = self.pending_cfg[0]
                # NOTE(review): an NSR flagged being_deleted is never removed
                # from pending_cfg here, so it stays at the head of the queue
                # unless other code removes it - confirm against callers.
                if nsr_obj.being_deleted is not False:
                    continue

                try:
                    self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name))

                    # Check if we already have a task running for this NSR
                    # Case where we are still configuring and terminate is called
                    if nsr_obj.nsr_id in self.pending_tasks:
                        self._log.error("NSR {} in state {} has a configure task running.".
                                        format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state()))
                        # Terminate the task for this NSR
                        self.pending_tasks[nsr_obj.nsr_id].cancel()

                    yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS)

                    # Run as a separate task on the event loop
                    self.pending_tasks[nsr_obj.nsr_id] = \
                        self._loop.create_task(
                            process_nsr_obj(nsr_obj)
                        )

                    # Remove this nsr_obj
                    self.pending_cfg.remove(nsr_obj)

                except asyncio.CancelledError:
                    # Must reach the outer handler so the loop really stops
                    # (CancelledError is an Exception before Python 3.8).
                    raise

                except Exception as e:
                    self._log.error("Failed to process NSR as %s", str(e))
                    self._log.exception(e)

        except asyncio.CancelledError:
            self._log.debug("Stopped configuration handler for project {}".format(self._project))

    @asyncio.coroutine
    def register(self):
        """Register all handlers, then start the configuration_handler task."""
        # Perform register() for all handlers
        for reg in self._handlers:
            yield from reg.register()

        self._task = asyncio.ensure_future(self.configuration_handler(), loop=self._loop)

    def deregister(self):
        """Cancel the configuration_handler task and deregister all handlers."""
        self._log.debug("De-register conman for project {}".format(self._project.name))
        # register() may never have completed, in which case no task exists.
        if self._task is not None:
            self._task.cancel()
            self._task = None

        for reg in self._handlers:
            reg.deregister()
325
326
class ConfigManagerProject(ManoProject):
    """Project wrapper that owns one ConfigurationManager per project."""

    def __init__(self, name, tasklet, **kw):
        """Initialise the project from the tasklet.

        Arguments:
            name    - project name, passed to the ManoProject constructor
            tasklet - owning tasklet; its ``log`` is used and it is handed
                      to ``self.update()``
            kw      - accepted but unused by this class
        """
        super(ConfigManagerProject, self).__init__(tasklet.log, name)
        self.update(tasklet)

        # Created in register(); stays None until then.
        self._con_man = None

    @asyncio.coroutine
    def register(self):
        """Create the ConfigurationManager for this project and register it."""
        self._log.info("Initializing the Configuration-Manager tasklet")
        self._con_man = ConfigurationManager(self.log,
                                             self.loop,
                                             self._dts,
                                             self,)
        yield from self._con_man.register()

    def deregister(self):
        """Deregister the ConfigurationManager, if one was created."""
        self._log.debug("De-register project {}".format(self.name))
        # register() may not have run (or may have failed before creating
        # the manager); there is nothing to tear down in that case.
        if self._con_man is None:
            return
        self._con_man.deregister()
347
348
class ConfigManagerTasklet(rift.tasklets.Tasklet):
    """Tasklet entry point for the configuration manager.

    Creates the DTS API object in ``start()`` and, when DTS reports the
    INIT state, registers a ProjectHandler built with
    ConfigManagerProject as the project class.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base Tasklet and set the log category."""
        super(ConfigManagerTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-conman-log")

        self.projects = {}
        self.project_handler = None
        self._dts = None

    @property
    def dts(self):
        """The DTS API object, or None before ``start()`` has run."""
        return self._dts

    def start(self):
        """Start the base tasklet and create the DTS API object."""
        super(ConfigManagerTasklet, self).start()

        self.log.debug("Registering with dts")

        # on_dts_state_change is handed to DTS as the state callback.
        self._dts = rift.tasklets.DTS(
            self.tasklet_info,
            conmanY.get_schema(),
            self.loop,
            self.on_dts_state_change,
        )

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def on_instance_started(self):
        """Log that the instance-started callback was received."""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """Create and register the project handler (run for DTS INIT)."""
        self.log.debug("creating project handler")
        self.project_handler = ProjectHandler(self, ConfigManagerProject)
        self.project_handler.register()

    @asyncio.coroutine
    def run(self):
        """Nothing to do for the DTS RUN state."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # Application-side coroutine to run for a given DTS state
        app_actions = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # DTS state to request once the application action has finished
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Transition application to next state
        action = app_actions.get(state)
        if action is not None:
            yield from action()

        # Transition dts to next state
        target_state = dts_transitions.get(state)
        if target_state is not None:
            self._dts.handle.set_state(target_state)