blob: aa24caa1ecec27d77ef36c20525be0728b50f895 [file] [log] [blame]
tierno0aef0db2018-02-01 19:13:07 +01001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
4import asyncio
5import yaml
6import ROclient
7import dbmemory
8import dbmongo
9import fslocal
10import msglocal
tiernof3c4dbc2018-02-05 14:53:28 +010011import msgkafka
tiernoae501922018-02-06 23:17:16 +010012import logging
13import functools
tiernof3a54432018-03-21 11:34:00 +010014import sys
tierno0aef0db2018-02-01 19:13:07 +010015from dbbase import DbException
16from fsbase import FsException
17from msgbase import MsgException
tiernof3c4dbc2018-02-05 14:53:28 +010018from os import environ
Adam Israel354ead92018-03-18 14:46:23 -040019# from vca import DeployApplication, RemoveApplication
20from n2vc.vnf import N2VC
tiernof3a54432018-03-21 11:34:00 +010021# import os.path
22# import time
Adam Israel354ead92018-03-18 14:46:23 -040023
tiernoae501922018-02-06 23:17:16 +010024from copy import deepcopy
25from http import HTTPStatus
tierno0aef0db2018-02-01 19:13:07 +010026
tiernof3a54432018-03-21 11:34:00 +010027
class LcmException(Exception):
    """Error raised by the LCM code of this file; it adds nothing to the base Exception."""
30
31
32class Lcm:
33
def __init__(self, config_file):
    """
    Init: load the configuration, set up logging and connect to N2VC, database, filesystem storage
    and messaging
    :param config_file: handed to self.read_config_file(), whose result is used as a two level dictionary.
        The top level keys read here are 'global', 'RO', 'VCA', 'database', 'storage' and 'message'
    :return: None
    :raises LcmException: if a 'driver' entry holds an unsupported value, or if connecting to the
        database, storage or messaging backend fails
    """
    # "import logging" alone does not load the "handlers" submodule used below for RotatingFileHandler
    import logging.handlers

    self.db = None
    self.msg = None
    self.fs = None

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    max_log_bytes = 100000000  # size at which a log file is rotated
    if "logfile" in config["global"]:
        file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                            maxBytes=max_log_bytes, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules. The logger name is also stored in each section so that it reaches the
    # corresponding driver through the configuration passed to its connect method
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                maxBytes=max_log_bytes, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )

    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending value of the 'message' section, not the one of 'storage'
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e))
138
def update_db(self, item, _id, _desc):
    """
    Replace an entry through self.db.replace. A DbException is only logged, it is not raised
    :param item: first argument for self.db.replace; callers in this file pass "vims", "sdns" or "nsrs"
    :param _id: second argument for self.db.replace, the identifier of the entry
    :param _desc: third argument for self.db.replace, the new content
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
144
async def create_vim(self, vim_content, order_id):
    """
    Create a vim at RO and attach it to the RO tenant. The result is stored at the "vims" database entry:
    the RO uuid at _admin.deploy.RO and "ENABLED" or "ERROR" at _admin.operationalState
    :param vim_content: dictionary with the vim. Keys read here: "_id", "vim_type", "vim_tenant_name",
        "vim_user", "vim_password" and "config"
    :param order_id: not used by this method
    :return: the uuid that RO assigns to the vim. None if it fails; the error is logged and stored at database
    """
    vim_id = vim_content["_id"]
    logging_text = "Task create_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deploy" not in db_vim["_admin"]:
            db_vim["_admin"]["deploy"] = {}
        db_vim["_admin"]["deploy"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # the descriptor for RO is the input content without the database fields and without the
        # credentials, which are sent afterwards when attaching the vim to the tenant
        vim_RO = deepcopy(vim_content)
        for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name",
                    "vim_user", "vim_password"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
        self.update_db("vims", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failure, and of the step where it happened, at the database entry
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
200
async def edit_vim(self, vim_content, order_id):
    """
    Edit at RO a vim and its vim-account, provided that the "vims" database entry contains the RO
    identifier at _admin.deploy.RO. If it does not, nothing is sent to RO
    :param vim_content: dictionary with "_id" and the fields to modify. Only the keys that are present
        are sent to RO
    :param order_id: not used by this method
    :return: the RO uuid of the vim; None if the database entry has no RO identifier or if it fails
        (the error is logged and stored at database)
    """
    vim_id = vim_content["_id"]
    logging_text = "Task edit_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when there is no RO information at database
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name",
                        "vim_user", "vim_password"):
                vim_RO.pop(key, None)
            # an edition may come without "vim_type"; rename it only if provided
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            # the user goes into the descriptor sent to RO, under the key "vim_username"
            if "vim_user" in vim_content:
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            if vim_account_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failure, and of the step where it happened, at the database entry
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
252
async def delete_vim(self, vim_id, order_id):
    """
    Detach a vim from the RO tenant, delete it from RO and remove its entry from the "vims" database.
    A 404 answer from RO in any of the two RO operations is taken as already done. If the database entry
    has no RO identifier at _admin.deploy.RO, only the database entry is removed
    :param vim_id: "_id" of the entry at "vims" database
    :param order_id: not used by this method
    :return: None. On failure the error is logged and stored at the database entry
    """
    logging_text = "Task delete_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vims", {"_id": vim_id})
        self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # the entry may lack "_admin" (the "no RO information" case); create it so that writing
            # the error does not raise a KeyError that would hide the original failure
            if not db_vim.get("_admin"):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
299
async def create_sdn(self, sdn_content, order_id):
    """
    Create a sdn at RO. The result is stored at the "sdns" database entry: the RO uuid at
    _admin.deploy.RO and "ENABLED" or "ERROR" at _admin.operationalState
    :param sdn_content: dictionary with the sdn. It is sent to RO without the keys "_id", "_admin",
        "schema_version" and "schema_type"
    :param order_id: not used by this method
    :return: the uuid that RO assigns to the sdn. None if it fails; the error is logged and stored at database
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task create_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deploy" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deploy"] = {}
        db_sdn["_admin"]["deploy"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failure, and of the step where it happened, at the database entry
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
341
async def edit_sdn(self, sdn_content, order_id):
    """
    Edit a sdn at RO, provided that the "sdns" database entry contains the RO identifier at
    _admin.deploy.RO. If it does not, nothing is sent to RO
    :param sdn_content: dictionary with "_id" and the fields to modify. It is sent to RO without the keys
        "_id", "_admin", "schema_version" and "schema_type"
    :param order_id: not used by this method
    :return: the RO uuid of the sdn; None if the database entry has no RO identifier or if it fails
        (the error is logged and stored at database)
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task edit_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when there is no RO information at database
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failure, and of the step where it happened, at the database entry
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
379
async def delete_sdn(self, sdn_id, order_id):
    """
    Delete a sdn from RO and remove its entry from the "sdns" database. A 404 answer from RO is taken
    as already deleted. If the database entry has no RO identifier at _admin.deploy.RO, only the
    database entry is removed
    :param sdn_id: "_id" of the entry at "sdns" database
    :param order_id: not used by this method
    :return: None. On failure the error is logged and stored at the database entry
    """
    logging_text = "Task delete_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # the entry may lack "_admin" (the "no RO information" case); create it so that writing
            # the error does not raise a KeyError that would hide the original failure
            if not db_sdn.get("_admin"):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
417
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd.
    The copy has no "_id" nor "_admin" and, for every vdu with "cloud-init-file", that key is replaced
    by "cloud-init" with the content of the referenced file, read through self.fs
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if self.fs raises a FsException when opening the cloud-init file
    """
    ci_file = None
    try:
        vnfd_RO = deepcopy(vnfd)
        vnfd_RO.pop("_id", None)
        vnfd_RO.pop("_admin", None)
        if new_id:
            vnfd_RO["id"] = new_id
        # a descriptor without "vdu" has nothing else to convert
        for vdu in vnfd_RO.get("vdu") or ():
            if "cloud-init-file" not in vdu:
                continue
            # storage information is taken from the input vnfd, because "_admin" was removed from the copy
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"]
            )
            ci_file = self.fs.file_open(cloud_init_file, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
            # If fails convert to base 64 or similar
            cloud_init_content = ci_file.read()
            ci_file.close()
            ci_file = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd.get("_id"), e)) from e
    finally:
        # only reached with an open file if reading it failed
        if ci_file is not None:
            ci_file.close()
tierno0aef0db2018-02-01 19:13:07 +0100453
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
    """Update the lcm database with the status of the charm.

    Updates the VNF's operational status with the state of the charm:
    - blocked: The unit needs manual intervention
    - maintenance: The unit is actively deploying/configuring
    - waiting: The unit is waiting for another charm to be ready
    - active: The unit is deployed, configured, and ready
    - error: The charm has failed and needs attention.
    - terminated: The charm has been destroyed
    - removing,
    - removed

    Updates the network service's config-status to reflect the state of all
    charms.

    It is called in two ways: with 'workload_status' to report a status change, or with 'task' (and no
    'workload_status') as the done callback of the charm deployment task. Any exception is logged and
    not propagated.

    :param model_name: not used by this method
    :param application_name: not used by this method
    :param workload_status: new status of the charm, or None when called with 'task'
    :param db_nsr: content of the nsr database entry; it is modified and stored with self.update_db
    :param vnf_member_index: key of the vnf at db_nsr["_admin"]["deploy"]["VCA"]
    :param task: finished task of the charm deployment, or None
    :return: None
    """
    nsr_id = None
    try:
        nsr_id = db_nsr["_id"]
        nsr_lcm = db_nsr["_admin"]["deploy"]
        log_prefix = "[n2vc_callback] create_ns={} vnf_index={} ".format(nsr_id, vnf_member_index)
        if task:
            if task.cancelled():
                self.logger.debug(log_prefix + "task Cancelled")
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(log_prefix + "task Exception={}".format(exc))
                    nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                    nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                else:
                    self.logger.debug(log_prefix + "task Done")
                    # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines
                    # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
                    # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        elif workload_status:
            self.logger.debug(log_prefix + "Enter workload_status={}".format(workload_status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
            # TODO N2VC some error message in case of error should be obtained from N2VC
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        else:
            # there is no exception being handled here, so no exc_info is logged
            self.logger.critical(log_prefix + "Enter with bad parameters")
            return

        # compute the status of the whole ns from the status of every charm
        some_failed = False
        all_active = True
        status_map = {}  # number of charms at each status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            if vca_status == "error":
                some_failed = True
                db_nsr["config-status"] = "failed"
                # report the vnf that is in error, which may not be the one of this callback
                db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(
                    vnf_index, vca_info["detailed-status"])
                break

        if all_active:
            self.logger.debug(log_prefix + "All active")
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
        elif not some_failed:
            db_nsr["config-status"] = "configuring: " + ", ".join(
                "{}: {}".format(status, num) for status, num in status_map.items())
        self.update_db("nsrs", nsr_id, db_nsr)

    except Exception as e:
        self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(
            nsr_id, vnf_member_index, e), exc_info=True)
tiernoae501922018-02-06 23:17:16 +0100538
async def create_ns(self, nsr_id, order_id):
    """
    Instantiate a network service: create its vnfds, its nsd and the ns at RO, wait until RO reports
    the ns as ACTIVE and launch the charm deployment of every vnf that has a juju "vnf-configuration".
    Progress and errors are stored at the "nsrs" database entry with self.update_db
    :param nsr_id: "_id" of the entry at "nsrs" database
    :param order_id: key under self.lcm_ns_tasks[nsr_id] where the charm deployment tasks are stored
    :return: the dictionary stored at db_nsr["_admin"]["deploy"]. None if it fails; the error is logged and
        stored at database
    """
    logging_text = "Task create_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        nsr_name = db_nsr["name"]   # TODO short-name??
        needed_vnfd = {}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            "VCA": {},
        }
        db_nsr["_admin"]["deploy"] = nsr_lcm
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            self.logger.debug(logging_text + step)
            # identifier used at RO: the vnfd id prefixed by the nsr id
            vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
            self.update_db("nsrs", nsr_id, db_nsr)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
        self.logger.debug(logging_text + step)

        # same naming as the vnfds: the descriptor id prefixed by the nsr id
        nsd_id_RO = nsr_id + "." + nsd_id[:200]
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
        if nsd_list:
            nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                nsd_id, nsd_list[0]["uuid"]))
        else:
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = nsd_id_RO
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            # the nsd must reference the vnfds by the identifier they were created with at RO
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
        self.update_db("nsrs", nsr_id, db_nsr)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

        if not RO_nsr_id:
            step = db_nsr["detailed-status"] = "Creating ns at RO"
            self.logger.debug(logging_text + step)

            desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
        self.update_db("nsrs", nsr_id, db_nsr)

        # wait until NS is ready
        step = ns_status_detailed = "Waiting ns ready at RO"
        db_nsr["detailed-status"] = ns_status_detailed
        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
        deployment_timeout = 600  # seconds
        polling_interval = 5  # seconds
        while deployment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            nsr_lcm["RO"]["nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                self.update_db("nsrs", nsr_id, db_nsr)
            elif ns_status == "ACTIVE":
                nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                break
            else:
                # an exception instead of an assert, so that the check is kept when running with -O
                raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

            # the 'loop' argument of asyncio.sleep was removed in python 3.10, so it is not passed
            await asyncio.sleep(polling_interval)
            deployment_timeout -= polling_interval
        if deployment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")
        db_nsr["detailed-status"] = "Configuring vnfr"
        self.update_db("nsrs", nsr_id, db_nsr)

        vnfd_to_config = 0
        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]
            if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
                vnfd_to_config += 1
                proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]

                # Note: The charm needs to exist on disk at the location
                # specified by charm_path.
                base_folder = vnfd["_admin"]["storage"]
                storage_params = self.fs.get_params()
                charm_path = "{}{}/{}/charms/{}".format(
                    storage_params["path"],
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    proxy_charm
                )

                # Setup the runtime parameters for this VNF
                params = {
                    'rw_mgmt_ip': nsr_lcm['nsr_ip'][vnf_index],
                }

                # model_name will be ignored in the current version of N2VC
                # but will be implemented for the next point release.
                model_name = 'default'
                application_name = self.n2vc.FormatApplicationName(
                    nsr_name,  # 'default',
                    vnf_index,
                    vnfd['name'],
                )
                # TODO N2VC implement this inside n2vc.FormatApplicationName
                application_name = application_name[:50]

                nsr_lcm["VCA"][vnf_index] = {
                    "model": model_name,
                    "application": application_name,
                    "operational-status": "init",
                    "detailed-status": "",
                    "vnfd_id": vnfd_id,
                }

                self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(
                    nsr_id, charm_path, proxy_charm))
                task = asyncio.ensure_future(
                    self.n2vc.DeployCharms(
                        model_name,          # The network service name
                        application_name,    # The application name
                        vnfd,                # The vnf descriptor
                        charm_path,          # Path to charm
                        params,              # Runtime params, like mgmt ip
                        {},                  # for native charms only
                        self.n2vc_callback,  # Callback for status changes
                        db_nsr,              # Callback parameter
                        vnf_index,           # Callback parameter
                        None,                # Callback parameter (task)
                    )
                )
                # when the task ends, n2vc_callback gets it as its last argument, with no workload_status
                task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name,
                                                         None, db_nsr, vnf_index))

                self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
        db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
        db_nsr["detailed-status"] = "configuring: init: {}".format(vnfd_to_config) if vnfd_to_config else "done"
        db_nsr["operational-status"] = "running"
        self.update_db("nsrs", nsr_id, db_nsr)

        self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id))
        return nsr_lcm

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failure, and of the step where it happened, at the database entry
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_db("nsrs", nsr_id, db_nsr)
Adam Israelc887cc22018-02-05 13:52:15 +0100754
async def delete_ns(self, nsr_id, order_id):
    """
    Terminate a network service: launch the removal of its charms, delete the ns, nsd and vnfds at RO and
    finally remove the nsr entry from the database.

    Charm removal tasks are only launched, not awaited; they are stored at
    self.lcm_ns_tasks[nsr_id][order_id]. A failure while launching them is logged and ignored.
    At RO, a "not found" (404) answer is treated as already deleted; a conflict (409) or any other RO error
    is logged and the termination goes on.
    :param nsr_id: database _id of the nsr to terminate
    :param order_id: key of self.lcm_ns_tasks[nsr_id] where the charm removal tasks are stored
    :return: None. Exceptions raised by the steps are caught and logged; if the nsr was already read, the
        error is also written to its "detailed-status" and its "operational-status" is set to "failed"
    """
    logging_text = "Task delete_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"]["deploy"]

        db_nsr["operational-status"] = "terminating"
        db_nsr["config-status"] = "terminating"
        # keep 'step' aligned with the stored status so logs and error reports name the current step
        step = db_nsr["detailed-status"] = "Deleting charms"
        self.update_db("nsrs", nsr_id, db_nsr)

        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    # n2vc_callback is handed over together with the db_nsr and vnf_index arguments
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                            self.n2vc_callback,
                            db_nsr,
                            vnf_index,
                        )
                    )
                    self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: a charm problem must not block the deletion at RO
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)

        # Delete ns
        RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                else:
                    self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                else:
                    self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                else:
                    self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # TODO delete from database or mark as deleted???
        db_nsr["operational-status"] = "terminated"
        self.db.del_one("nsrs", {"_id": nsr_id})
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_db("nsrs", nsr_id, db_nsr)
tiernof3c4dbc2018-02-05 14:53:28 +0100867
async def test(self, param=None):
    """
    Coroutine that only writes a debug log entry containing param
    :param param: value included in the log message
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
870
def cancel_tasks(self, topic, _id):
    """
    Cancel all the tasks stored for a concrete nsr, vim account or sdn identified by _id and forget them
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim account or sdn identity
    :return: None
    :raises ValueError: if topic is not one of the supported values
    """
    if topic == "ns":
        lcm_tasks = self.lcm_ns_tasks
    elif topic == "vim_account":
        lcm_tasks = self.lcm_vim_tasks
    elif topic == "sdn":
        lcm_tasks = self.lcm_sdn_tasks
    else:
        # fail with a clear message instead of an UnboundLocalError further down
        raise ValueError("cancel_tasks: unknown topic '{}'".format(topic))

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() answers False when the task was already finished; only real cancellations are logged
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
tierno0aef0db2018-02-01 19:13:07 +0100893
async def read_kafka(self, retry_delay=1, max_consecutive_errors=5):
    """
    Read commands from the message bus for the topics "ns", "vim_account" and "sdn" and launch the
    matching task for each of them, until an "exit" command arrives or too many consecutive errors happen.

    Every launched task is stored at self.lcm_<ns|vim|sdn>_tasks[<item id>][order_id], where order_id is
    a counter incremented with every message received. A "delete" command first cancels the stored tasks
    of the same item. Commands starting with "#" are ignored; "echo" prints its params and "test" launches
    self.test.
    :param retry_delay: seconds to wait after a failure before reading again
    :param max_consecutive_errors: number of consecutive failures tolerated; the next one ends the loop
    :return: None
    """

    def register(tasks, item_id, task_order, task_name, task):
        # keep the task reachable at tasks[item_id][task_order], where cancel_tasks looks for it
        tasks.setdefault(item_id, {})[task_order] = {task_name: task}

    self.logger.debug("Task Kafka Enter")
    order_id = 1
    consecutive_errors = 0
    while True:
        try:
            topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
            self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.ensure_future(self.test(params))
                continue

            if topic == "ns":
                nsr_id = params.strip()
                if command == "create":
                    register(self.lcm_ns_tasks, nsr_id, order_id, "create_ns",
                             asyncio.ensure_future(self.create_ns(nsr_id, order_id)))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, nsr_id)
                    register(self.lcm_ns_tasks, nsr_id, order_id, "delete_ns",
                             asyncio.ensure_future(self.delete_ns(nsr_id, order_id)))
                    continue
                elif command == "show":
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
                            "{}\n deploy: {}\n tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    register(self.lcm_vim_tasks, vim_id, order_id, "create_vim",
                             asyncio.ensure_future(self.create_vim(params, order_id)))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    register(self.lcm_vim_tasks, vim_id, order_id, "delete_vim",
                             asyncio.ensure_future(self.delete_vim(vim_id, order_id)))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    register(self.lcm_vim_tasks, vim_id, order_id, "edit_vim",
                             asyncio.ensure_future(self.edit_vim(vim_id, order_id)))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    register(self.lcm_sdn_tasks, _sdn_id, order_id, "create_sdn",
                             asyncio.ensure_future(self.create_sdn(params, order_id)))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    register(self.lcm_sdn_tasks, _sdn_id, order_id, "delete_sdn",
                             asyncio.ensure_future(self.delete_sdn(_sdn_id, order_id)))
                    continue
                elif command == "edit":
                    register(self.lcm_sdn_tasks, _sdn_id, order_id, "edit_sdn",
                             asyncio.ensure_future(self.edit_sdn(_sdn_id, order_id)))
                    continue
            # reached only when no branch above handled the message
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            if consecutive_errors >= max_consecutive_errors:
                self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
                break
            consecutive_errors += 1
            self.logger.error("Task Kafka Exception {}".format(e))
            # no 'loop' argument: asyncio.sleep() does not accept it since python 3.10 and this coroutine
            # is already running inside the loop
            await asyncio.sleep(retry_delay)
    self.logger.debug("Task Kafka terminating")
    # TODO cancel the pending "create" tasks, wait (with a timeout) for the rest to finish and cancel
    # whatever is still pending after that
    self.logger.debug("Task Kafka exit")
tierno0aef0db2018-02-01 19:13:07 +01001015
def start(self):
    """
    Run read_kafka in the event loop until it finishes, then close the loop and disconnect from the
    database, messaging and storage that are connected. The cleanup is done even if the loop ends with an
    exception, which is propagated afterwards.
    :return: None
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(self.read_kafka())
    finally:
        # release everything also when read_kafka fails or the process is interrupted
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
tierno0aef0db2018-02-01 19:13:07 +01001027
1028
def read_config_file(self, config_file):
    """
    Load the YAML configuration file and overwrite its values with the OSMLCM_* environment variables.

    The name of each variable is lower-cased and split by "_". The items between the "osmlcm" prefix and
    the last one select the nested section ("ro" and "vca" are looked up in capital letters) and the last
    item is the key to set. A key named "port" is stored as an integer, any other one as the string found
    in the environment. A variable that cannot be applied is skipped with a warning.
    :param config_file: path of the configuration file
    :return: the loaded configuration (a nested dictionary is expected). If the file cannot be read or
        parsed the error is logged and the process exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load builds only plain YAML types, which is all a configuration needs;
            # yaml.load() without an explicit Loader is rejected by PyYAML >= 6
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the section that holds the key, skipping the "osmlcm" prefix
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # sys.exit instead of the 'exit' helper, which is only installed by the site module
        sys.exit(1)
tierno0aef0db2018-02-01 19:13:07 +01001058
1059
if __name__ == '__main__':
    # Script entry point: build the LCM from the configuration file "lcm.cfg", given as a relative path,
    # and run it until its message loop ends
    config_file = "lcm.cfg"
    lcm = Lcm(config_file=config_file)
    lcm.start()