update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconmantasklet.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 '''
19 This file - ConfigManagerTasklet()
20 |
21 +
22 |
23 ConfigManagerProject()
24 |
25 +--|--> ConfigurationManager()
26 |
27 +--> rwconman_config.py - ConfigManagerConfig()
28 |
29 +--> ConfigManagerNSR()
30
31 '''
32
33 import asyncio
34 import logging
35 import os
36
37 import gi
38 gi.require_version('RwDts', '1.0')
39 gi.require_version('RwConmanYang', '1.0')
40
41 from gi.repository import (
42 RwDts as rwdts,
43 RwConmanYang as conmanY,
44 )
45
46 import rift.tasklets
47 from rift.mano.utils.project import (
48 ManoProject,
49 ProjectHandler,
50 )
51
52 from . import rwconman_config as Config
53
54 def log_this_vnf(vnf_cfg):
55 log_vnf = ""
56 used_item_list = ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
57 for item in used_item_list:
58 if item in vnf_cfg:
59 if item == 'mgmt_ip_address':
60 log_vnf += "({})".format(vnf_cfg[item])
61 else:
62 log_vnf += "{}/".format(vnf_cfg[item])
63 return log_vnf
64
65 class ConfigurationManager(object):
66 def __init__(self, log, loop, dts, project):
67 self._log = log
68 self._loop = loop
69 self._dts = dts
70 self._project = project
71
72 self.cfg_sleep = True
73 self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self)
74 self.pending_cfg = []
75 self.pending_tasks = {}
76 self._nsr_objs = {}
77 self._task = None # The configuration_handler task
78
79 self._handlers = [
80 self._config
81 ]
82
83
84 @asyncio.coroutine
85 def update_vnf_state(self, vnf_cfg, state):
86 nsr_obj = vnf_cfg['nsr_obj']
87 self._log.info("Updating cm-state for VNF(%s/%s) to:%s", nsr_obj.nsr_name, vnf_cfg['vnfr_name'], state)
88 yield from nsr_obj.update_vnf_cm_state(vnf_cfg['vnfr'], state)
89
90 @asyncio.coroutine
91 def update_ns_state(self, nsr_obj, state):
92 self._log.info("Updating cm-state for NS(%s) to:%s", nsr_obj.nsr_name, state)
93 yield from nsr_obj.update_ns_cm_state(state)
94
95 def add_to_pending(self, nsr_obj, cfg_vnfr_list):
96
97 if (nsr_obj not in self.pending_cfg and
98 nsr_obj.cm_nsr['state'] == nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)):
99
100 self._log.info("Adding NS={} to pending config list"
101 .format(nsr_obj.nsr_name))
102
103 for cfg_vnfr in cfg_vnfr_list:
104 self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(cfg_vnfr['vnf_cfg']))
105 nsr_obj.vnf_cfg_list.append(cfg_vnfr['vnf_cfg'])
106 self.pending_cfg.append(nsr_obj)
107
108 def add_nsr_obj(self, nsr_obj):
109 self._log.debug("Adding nsr_obj (%s) to Configuration Manager", nsr_obj)
110 self._nsr_objs[nsr_obj.nsr_id] = nsr_obj
111
112 def remove_nsr_obj(self, nsr_id):
113 self._log.debug("Removing nsr_obj (%s) from Configuration Manager", nsr_id)
114 del self._nsr_objs[nsr_id]
115
116 def get_nsr_obj(self, nsr_id):
117 if nsr_id not in self._nsr_objs:
118 self._log.info("NSR %s not found", nsr_id)
119 return None
120 self._log.debug("Returning nsr_obj (%s) from Configuration Manager", self._nsr_objs[nsr_id])
121 return self._nsr_objs.get(nsr_id)
122
123 @asyncio.coroutine
124 def configuration_handler(self):
125 @asyncio.coroutine
126 def process_vnf_cfg(agent_vnfr, nsr_obj):
127 vnf_cfg = agent_vnfr.vnf_cfg
128 done = False
129
130 if vnf_cfg['cfg_retries']:
131 # This failed previously, lets give it some time
132 yield from asyncio.sleep(5, loop=self._loop)
133
134 vnf_cfg['cfg_retries'] += 1
135
136 # Check to see if this vnfr is managed
137 done = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
138 'apply_initial_config',
139 nsr_obj.agent_nsr,
140 agent_vnfr)
141 self._log.debug("Apply configuration for VNF={} on attempt {} " \
142 "returned {}".format(log_this_vnf(vnf_cfg),
143 vnf_cfg['cfg_retries'],
144 done))
145
146 if done:
147 self._log.debug("Apply initial config on VNFR {}".
148 format(log_this_vnf(vnf_cfg)))
149 try:
150 yield from nsr_obj.parent.process_vnf_initial_config(
151 nsr_obj,
152 agent_vnfr.vnfr_msg,
153 self._project.name)
154 yield from self.update_vnf_state(vnf_cfg,
155 conmanY.RecordState.READY)
156
157 except Exception as e:
158 nsr_obj.vnf_failed = True
159 self._log.exception(e)
160 yield from self.update_vnf_state(vnf_cfg,
161 conmanY.RecordState.CFG_FAILED)
162
163 else:
164 self._log.debug("Getting config status {}".format(log_this_vnf(vnf_cfg)))
165 # Check to see if the VNF configure failed
166 status = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
167 'get_config_status',
168 nsr_obj.agent_nsr,
169 agent_vnfr)
170
171 if status and status == 'error':
172 # Failed configuration
173 nsr_obj.vnf_failed = True
174 done = True
175 yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.CFG_FAILED)
176 self._log.error("Failed to apply configuration for VNF = {}"
177 .format(log_this_vnf(vnf_cfg)))
178
179
180 return done
181
182 @asyncio.coroutine
183 def process_nsr_obj(nsr_obj):
184 # Return status, this will be set to False is if we fail to configure any VNF
185 ret_status = True
186
187 # Reset VNF failed flag
188 nsr_obj.vnf_failed = False
189 vnf_cfg_list = nsr_obj.vnf_cfg_list
190 while vnf_cfg_list:
191 # Check to make sure the NSR is still valid
192 if nsr_obj.parent.is_nsr_valid(nsr_obj.nsr_id) is False:
193 self._log.info("NSR {} not found, could be terminated".
194 format(nsr_obj.nsr_id))
195 return
196
197 # Need while loop here, since we will be removing list item
198 vnf_cfg = vnf_cfg_list.pop(0)
199 self._log.info("Applying Pending Configuration for VNF = %s / %s",
200 log_this_vnf(vnf_cfg), vnf_cfg['agent_vnfr'])
201 vnf_done = yield from process_vnf_cfg(vnf_cfg['agent_vnfr'], nsr_obj)
202 self._log.debug("Applied Pending Configuration for VNF = {}, status={}"
203 .format(log_this_vnf(vnf_cfg), vnf_done))
204
205 if not vnf_done:
206 # We will retry, but we will give other VNF chance first since this one failed.
207 vnf_cfg_list.append(vnf_cfg)
208
209 if nsr_obj.vnf_failed:
210 # Atleast one VNF config failed
211 ret_status = False
212
213 if ret_status:
214 # Apply NS initial config if present
215 nsr_obj.nsr_failed = False
216 self._log.debug("Apply initial config on NSR {}".format(nsr_obj.nsr_name))
217 try:
218 yield from nsr_obj.parent.process_ns_initial_config(nsr_obj, self._project.name)
219 except Exception as e:
220 nsr_obj.nsr_failed = True
221 self._log.exception(e)
222 ret_status = False
223
224 # Set the config status for the NSR
225 if ret_status:
226 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY)
227 elif nsr_obj.vnf_failed or nsr_obj.nsr_failed:
228 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED)
229 return ret_status
230
231 try:
232 # Basically, this loop will never end.
233 while True:
234 # Check the pending tasks are complete
235 # Store a list of tasks that are completed and
236 # remove from the pending_tasks list outside loop
237 ids = []
238 for nsr_id, task in self.pending_tasks.items():
239 if task.done():
240 ids.append(nsr_id)
241 e = task.exception()
242 if e:
243 self._log.error("Exception in configuring nsr {}: {}".
244 format(nsr_id, e))
245 nsr_obj = self.get_nsr_obj(nsr_id)
246 if nsr_obj:
247 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e))
248
249 else:
250 rc = task.result()
251 self._log.debug("NSR {} configured: {}".format(nsr_id, rc))
252 else:
253 self._log.debug("NSR {} still configuring".format(nsr_id))
254
255 # Remove the completed tasks
256 for nsr_id in ids:
257 self.pending_tasks.pop(nsr_id)
258
259 # TODO (pjoseph): Fix this
260 # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
261 # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
262 # wrongfully 10 seconds in advance?
263 yield from asyncio.sleep(10, loop=self._loop)
264
265 if self.pending_cfg:
266 # get first NS, pending_cfg is nsr_obj list
267 nsr_obj = self.pending_cfg[0]
268 nsr_done = False
269 if nsr_obj.being_deleted is False:
270 # Process this NS, returns back same obj is successfull or exceeded retries
271 try:
272 self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name))
273
274 # Check if we already have a task running for this NSR
275 # Case where we are still configuring and terminate is called
276 if nsr_obj.nsr_id in self.pending_tasks:
277 self._log.error("NSR {} in state {} has a configure task running.".
278 format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state()))
279 # Terminate the task for this NSR
280 self.pending_tasks[nsr_obj.nsr_id].cancel()
281
282 yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS)
283
284 # Call in a separate thread
285 self.pending_tasks[nsr_obj.nsr_id] = \
286 self._loop.create_task(
287 process_nsr_obj(nsr_obj)
288 )
289
290 # Remove this nsr_obj
291 self.pending_cfg.remove(nsr_obj)
292
293 except Exception as e:
294 self._log.error("Failed to process NSR as %s", str(e))
295 self._log.exception(e)
296
297 except asyncio.CancelledError as e:
298 self._log.debug("Stopped configuration handler for project {}".format(self._project))
299
300 @asyncio.coroutine
301 def register(self):
302 # Perform register() for all handlers
303 for reg in self._handlers:
304 yield from reg.register()
305
306 self._task = asyncio.ensure_future(self.configuration_handler(), loop=self._loop)
307
308 def deregister(self):
309 self._log.debug("De-register conman for project {}".format(self._project.name))
310 self._task.cancel()
311
312 for reg in self._handlers:
313 reg.deregister()
314
315
316 class ConfigManagerProject(ManoProject):
317
318 def __init__(self, name, tasklet, **kw):
319 super(ConfigManagerProject, self).__init__(tasklet.log, name)
320 self.update(tasklet)
321
322 self._con_man = None
323
324 @asyncio.coroutine
325 def register (self):
326 self._log.info("Initializing the Configuration-Manager tasklet")
327 self._con_man = ConfigurationManager(self.log,
328 self.loop,
329 self._dts,
330 self,)
331 yield from self._con_man.register()
332
333 def deregister(self):
334 self._log.debug("De-register project {}".format(self.name))
335 self._con_man.deregister()
336
337
338 class ConfigManagerTasklet(rift.tasklets.Tasklet):
339 def __init__(self, *args, **kwargs):
340 super(ConfigManagerTasklet, self).__init__(*args, **kwargs)
341 self.rwlog.set_category("rw-conman-log")
342
343 self._dts = None
344
345 self.project_handler = None
346 self.projects = {}
347
348 @property
349 def dts(self):
350 return self._dts
351
352 def start(self):
353 super(ConfigManagerTasklet, self).start()
354
355 self.log.debug("Registering with dts")
356
357 self._dts = rift.tasklets.DTS(self.tasklet_info,
358 conmanY.get_schema(),
359 self.loop,
360 self.on_dts_state_change)
361
362 self.log.debug("Created DTS Api GI Object: %s", self._dts)
363
364 def on_instance_started(self):
365 self.log.debug("Got instance started callback")
366
367 @asyncio.coroutine
368 def init(self):
369 self.log.debug("creating project handler")
370 self.project_handler = ProjectHandler(self, ConfigManagerProject)
371 self.project_handler.register()
372
373 @asyncio.coroutine
374 def run(self):
375 pass
376
377 @asyncio.coroutine
378 def on_dts_state_change(self, state):
379 """Take action according to current dts state to transition
380 application into the corresponding application state
381
382 Arguments
383 state - current dts state
384 """
385 switch = {
386 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
387 rwdts.State.CONFIG: rwdts.State.RUN,
388 }
389
390 handlers = {
391 rwdts.State.INIT: self.init,
392 rwdts.State.RUN: self.run,
393 }
394
395 # Transition application to next state
396 handler = handlers.get(state, None)
397 if handler is not None:
398 yield from handler()
399
400 # Transition dts to next state
401 next_state = switch.get(state, None)
402 if next_state is not None:
403 self._dts.handle.set_state(next_state)