76d7281eaebaa19c7619df5d8b1b55f772ecb4cc
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import logging
8 import logging.handlers
9 import getopt
10 import functools
11 import sys
12 from osm_common import dbmemory
13 from osm_common import dbmongo
14 from osm_common import fslocal
15 from osm_common import msglocal
16 from osm_common import msgkafka
17 from osm_common.dbbase import DbException
18 from osm_common.fsbase import FsException
19 from osm_common.msgbase import MsgException
20 from os import environ, path
21 from n2vc.vnf import N2VC
22 from n2vc import version as N2VC_version
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
29 __author__ = "Alfonso Tierno"
30
31
32 class LcmException(Exception):
33 pass
34
35
36 class Lcm:
37
38 def __init__(self, config_file):
39 """
40 Init, Connect to database, filesystem storage, and messaging
41 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
42 :return: None
43 """
44
45 self.db = None
46 self.msg = None
47 self.fs = None
48 self.pings_not_received = 1
49
50 # contains created tasks/futures to be able to cancel
51 self.lcm_ns_tasks = {}
52 self.lcm_vim_tasks = {}
53 self.lcm_sdn_tasks = {}
54 # logging
55 self.logger = logging.getLogger('lcm')
56 # load configuration
57 config = self.read_config_file(config_file)
58 self.config = config
59 self.ro_config = {
60 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
61 "tenant": config.get("tenant", "osm"),
62 "logger_name": "lcm.ROclient",
63 "loglevel": "ERROR",
64 }
65
66 self.vca = config["VCA"] # TODO VCA
67 self.loop = None
68
69 # logging
70 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
71 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
72 config["database"]["logger_name"] = "lcm.db"
73 config["storage"]["logger_name"] = "lcm.fs"
74 config["message"]["logger_name"] = "lcm.msg"
75 if "logfile" in config["global"]:
76 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
77 maxBytes=100e6, backupCount=9, delay=0)
78 file_handler.setFormatter(log_formatter_simple)
79 self.logger.addHandler(file_handler)
80 else:
81 str_handler = logging.StreamHandler()
82 str_handler.setFormatter(log_formatter_simple)
83 self.logger.addHandler(str_handler)
84
85 if config["global"].get("loglevel"):
86 self.logger.setLevel(config["global"]["loglevel"])
87
88 # logging other modules
89 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
90 config[k1]["logger_name"] = logname
91 logger_module = logging.getLogger(logname)
92 if "logfile" in config[k1]:
93 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
94 maxBytes=100e6, backupCount=9, delay=0)
95 file_handler.setFormatter(log_formatter_simple)
96 logger_module.addHandler(file_handler)
97 if "loglevel" in config[k1]:
98 logger_module.setLevel(config[k1]["loglevel"])
99
100 self.n2vc = N2VC(
101 log=self.logger,
102 server=config['VCA']['host'],
103 port=config['VCA']['port'],
104 user=config['VCA']['user'],
105 secret=config['VCA']['secret'],
106 # TODO: This should point to the base folder where charms are stored,
107 # if there is a common one (like object storage). Otherwise, leave
108 # it unset and pass it via DeployCharms
109 # artifacts=config['VCA'][''],
110 artifacts=None,
111 )
112 # check version of N2VC
113 # TODO enhance with int conversion or from distutils.version import LooseVersion
114 # or with list(map(int, version.split(".")))
115 if N2VC_version < "0.0.2":
116 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
117 try:
118 if config["database"]["driver"] == "mongo":
119 self.db = dbmongo.DbMongo()
120 self.db.db_connect(config["database"])
121 elif config["database"]["driver"] == "memory":
122 self.db = dbmemory.DbMemory()
123 self.db.db_connect(config["database"])
124 else:
125 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
126 config["database"]["driver"]))
127
128 if config["storage"]["driver"] == "local":
129 self.fs = fslocal.FsLocal()
130 self.fs.fs_connect(config["storage"])
131 else:
132 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
133 config["storage"]["driver"]))
134
135 if config["message"]["driver"] == "local":
136 self.msg = msglocal.MsgLocal()
137 self.msg.connect(config["message"])
138 elif config["message"]["driver"] == "kafka":
139 self.msg = msgkafka.MsgKafka()
140 self.msg.connect(config["message"])
141 else:
142 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
143 config["storage"]["driver"]))
144 except (DbException, FsException, MsgException) as e:
145 self.logger.critical(str(e), exc_info=True)
146 raise LcmException(str(e))
147
148 def update_db(self, item, _id, _desc):
149 try:
150 self.db.replace(item, _id, _desc)
151 except DbException as e:
152 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
153
154 def update_db_2(self, item, _id, _desc):
155 try:
156 self.db.set_one(item, {"_id": _id}, _desc)
157 except DbException as e:
158 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
159
160 async def vim_create(self, vim_content, order_id):
161 vim_id = vim_content["_id"]
162 logging_text = "Task vim_create={} ".format(vim_id)
163 self.logger.debug(logging_text + "Enter")
164 db_vim = None
165 exc = None
166 RO_sdn_id = None
167 RO_sdn_port_mapping = None
168 try:
169 step = "Getting vim-id='{}' from db".format(vim_id)
170 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
171 if "_admin" not in db_vim:
172 db_vim["_admin"] = {}
173 if "deployed" not in db_vim["_admin"]:
174 db_vim["_admin"]["deployed"] = {}
175 db_vim["_admin"]["deployed"]["RO"] = None
176 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
177 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
178 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
179 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
180 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
181 else:
182 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
183 vim_content["config"]["sdn-controller"]))
184
185 step = "Creating vim at RO"
186 RO = ROclient.ROClient(self.loop, **self.ro_config)
187 vim_RO = deepcopy(vim_content)
188 vim_RO.pop("_id", None)
189 vim_RO.pop("_admin", None)
190 vim_RO.pop("schema_version", None)
191 vim_RO.pop("schema_type", None)
192 vim_RO.pop("vim_tenant_name", None)
193 vim_RO["type"] = vim_RO.pop("vim_type")
194 vim_RO.pop("vim_user", None)
195 vim_RO.pop("vim_password", None)
196 if RO_sdn_id:
197 vim_RO["config"]["sdn-controller"] = RO_sdn_id
198 desc = await RO.create("vim", descriptor=vim_RO)
199 RO_vim_id = desc["uuid"]
200 db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
201 self.update_db("vim_accounts", vim_id, db_vim)
202
203 step = "Creating vim_account at RO"
204 vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
205 "vim_username": vim_content["vim_user"],
206 "vim_password": vim_content["vim_password"]
207 }
208 if vim_RO.get("config"):
209 vim_account_RO["config"] = vim_RO["config"]
210 if "sdn-controller" in vim_account_RO["config"]:
211 del vim_account_RO["config"]["sdn-controller"]
212 if "sdn-port-mapping" in vim_account_RO["config"]:
213 del vim_account_RO["config"]["sdn-port-mapping"]
214 await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
215 db_vim["_admin"]["operationalState"] = "ENABLED"
216 self.update_db("vim_accounts", vim_id, db_vim)
217
218 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
219 return RO_vim_id
220
221 except (ROclient.ROClientException, DbException) as e:
222 self.logger.error(logging_text + "Exit Exception {}".format(e))
223 exc = e
224 except Exception as e:
225 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
226 exc = e
227 finally:
228 if exc and db_vim:
229 db_vim["_admin"]["operationalState"] = "ERROR"
230 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
231 self.update_db("vim_accounts", vim_id, db_vim)
232
233 async def vim_edit(self, vim_content, order_id):
234 vim_id = vim_content["_id"]
235 logging_text = "Task vim_edit={} ".format(vim_id)
236 self.logger.debug(logging_text + "Enter")
237 db_vim = None
238 exc = None
239 RO_sdn_id = None
240 step = "Getting vim-id='{}' from db".format(vim_id)
241 try:
242 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
243 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
244 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
245 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
246 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
247 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
248 "RO"):
249 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
250 else:
251 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
252 vim_content["config"]["sdn-controller"]))
253
254 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
255 step = "Editing vim at RO"
256 RO = ROclient.ROClient(self.loop, **self.ro_config)
257 vim_RO = deepcopy(vim_content)
258 vim_RO.pop("_id", None)
259 vim_RO.pop("_admin", None)
260 vim_RO.pop("schema_version", None)
261 vim_RO.pop("schema_type", None)
262 vim_RO.pop("vim_tenant_name", None)
263 if "vim_type" in vim_RO:
264 vim_RO["type"] = vim_RO.pop("vim_type")
265 vim_RO.pop("vim_user", None)
266 vim_RO.pop("vim_password", None)
267 if RO_sdn_id:
268 vim_RO["config"]["sdn-controller"] = RO_sdn_id
269 # TODO make a deep update of sdn-port-mapping
270 if vim_RO:
271 await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
272
273 step = "Editing vim-account at RO tenant"
274 vim_account_RO = {}
275 if "config" in vim_content:
276 if "sdn-controller" in vim_content["config"]:
277 del vim_content["config"]["sdn-controller"]
278 if "sdn-port-mapping" in vim_content["config"]:
279 del vim_content["config"]["sdn-port-mapping"]
280 if not vim_content["config"]:
281 del vim_content["config"]
282 for k in ("vim_tenant_name", "vim_password", "config"):
283 if k in vim_content:
284 vim_account_RO[k] = vim_content[k]
285 if "vim_user" in vim_content:
286 vim_content["vim_username"] = vim_content["vim_user"]
287 if vim_account_RO:
288 await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
289 db_vim["_admin"]["operationalState"] = "ENABLED"
290 self.update_db("vim_accounts", vim_id, db_vim)
291
292 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
293 return RO_vim_id
294
295 except (ROclient.ROClientException, DbException) as e:
296 self.logger.error(logging_text + "Exit Exception {}".format(e))
297 exc = e
298 except Exception as e:
299 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
300 exc = e
301 finally:
302 if exc and db_vim:
303 db_vim["_admin"]["operationalState"] = "ERROR"
304 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
305 self.update_db("vim_accounts", vim_id, db_vim)
306
307 async def vim_delete(self, vim_id, order_id):
308 logging_text = "Task vim_delete={} ".format(vim_id)
309 self.logger.debug(logging_text + "Enter")
310 db_vim = None
311 exc = None
312 step = "Getting vim from db"
313 try:
314 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
315 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
316 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
317 RO = ROclient.ROClient(self.loop, **self.ro_config)
318 step = "Detaching vim from RO tenant"
319 try:
320 await RO.detach_datacenter(RO_vim_id)
321 except ROclient.ROClientException as e:
322 if e.http_code == 404: # not found
323 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
324 else:
325 raise
326
327 step = "Deleting vim from RO"
328 try:
329 await RO.delete("vim", RO_vim_id)
330 except ROclient.ROClientException as e:
331 if e.http_code == 404: # not found
332 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
333 else:
334 raise
335 else:
336 # nothing to delete
337 self.logger.error(logging_text + "Skipping. There is not RO information at database")
338 self.db.del_one("vim_accounts", {"_id": vim_id})
339 self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
340 return None
341
342 except (ROclient.ROClientException, DbException) as e:
343 self.logger.error(logging_text + "Exit Exception {}".format(e))
344 exc = e
345 except Exception as e:
346 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
347 exc = e
348 finally:
349 if exc and db_vim:
350 db_vim["_admin"]["operationalState"] = "ERROR"
351 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
352 self.update_db("vim_accounts", vim_id, db_vim)
353
354 async def sdn_create(self, sdn_content, order_id):
355 sdn_id = sdn_content["_id"]
356 logging_text = "Task sdn_create={} ".format(sdn_id)
357 self.logger.debug(logging_text + "Enter")
358 db_sdn = None
359 exc = None
360 try:
361 step = "Getting sdn from db"
362 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
363 if "_admin" not in db_sdn:
364 db_sdn["_admin"] = {}
365 if "deployed" not in db_sdn["_admin"]:
366 db_sdn["_admin"]["deployed"] = {}
367 db_sdn["_admin"]["deployed"]["RO"] = None
368
369 step = "Creating sdn at RO"
370 RO = ROclient.ROClient(self.loop, **self.ro_config)
371 sdn_RO = deepcopy(sdn_content)
372 sdn_RO.pop("_id", None)
373 sdn_RO.pop("_admin", None)
374 sdn_RO.pop("schema_version", None)
375 sdn_RO.pop("schema_type", None)
376 sdn_RO.pop("description", None)
377 desc = await RO.create("sdn", descriptor=sdn_RO)
378 RO_sdn_id = desc["uuid"]
379 db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
380 db_sdn["_admin"]["operationalState"] = "ENABLED"
381 self.update_db("sdns", sdn_id, db_sdn)
382 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
383 return RO_sdn_id
384
385 except (ROclient.ROClientException, DbException) as e:
386 self.logger.error(logging_text + "Exit Exception {}".format(e))
387 exc = e
388 except Exception as e:
389 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
390 exc = e
391 finally:
392 if exc and db_sdn:
393 db_sdn["_admin"]["operationalState"] = "ERROR"
394 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
395 self.update_db("sdns", sdn_id, db_sdn)
396
397 async def sdn_edit(self, sdn_content, order_id):
398 sdn_id = sdn_content["_id"]
399 logging_text = "Task sdn_edit={} ".format(sdn_id)
400 self.logger.debug(logging_text + "Enter")
401 db_sdn = None
402 exc = None
403 step = "Getting sdn from db"
404 try:
405 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
406 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
407 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
408 RO = ROclient.ROClient(self.loop, **self.ro_config)
409 step = "Editing sdn at RO"
410 sdn_RO = deepcopy(sdn_content)
411 sdn_RO.pop("_id", None)
412 sdn_RO.pop("_admin", None)
413 sdn_RO.pop("schema_version", None)
414 sdn_RO.pop("schema_type", None)
415 sdn_RO.pop("description", None)
416 if sdn_RO:
417 await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
418 db_sdn["_admin"]["operationalState"] = "ENABLED"
419 self.update_db("sdns", sdn_id, db_sdn)
420
421 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
422 return RO_sdn_id
423
424 except (ROclient.ROClientException, DbException) as e:
425 self.logger.error(logging_text + "Exit Exception {}".format(e))
426 exc = e
427 except Exception as e:
428 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
429 exc = e
430 finally:
431 if exc and db_sdn:
432 db_sdn["_admin"]["operationalState"] = "ERROR"
433 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
434 self.update_db("sdns", sdn_id, db_sdn)
435
436 async def sdn_delete(self, sdn_id, order_id):
437 logging_text = "Task sdn_delete={} ".format(sdn_id)
438 self.logger.debug(logging_text + "Enter")
439 db_sdn = None
440 exc = None
441 step = "Getting sdn from db"
442 try:
443 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
444 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
445 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
446 RO = ROclient.ROClient(self.loop, **self.ro_config)
447 step = "Deleting sdn from RO"
448 try:
449 await RO.delete("sdn", RO_sdn_id)
450 except ROclient.ROClientException as e:
451 if e.http_code == 404: # not found
452 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
453 else:
454 raise
455 else:
456 # nothing to delete
457 self.logger.error(logging_text + "Skipping. There is not RO information at database")
458 self.db.del_one("sdns", {"_id": sdn_id})
459 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
460 return None
461
462 except (ROclient.ROClientException, DbException) as e:
463 self.logger.error(logging_text + "Exit Exception {}".format(e))
464 exc = e
465 except Exception as e:
466 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
467 exc = e
468 finally:
469 if exc and db_sdn:
470 db_sdn["_admin"]["operationalState"] = "ERROR"
471 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
472 self.update_db("sdns", sdn_id, db_sdn)
473
474 def vnfd2RO(self, vnfd, new_id=None):
475 """
476 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
477 :param vnfd: input vnfd
478 :param new_id: overrides vnf id if provided
479 :return: copy of vnfd
480 """
481 ci_file = None
482 try:
483 vnfd_RO = deepcopy(vnfd)
484 vnfd_RO.pop("_id", None)
485 vnfd_RO.pop("_admin", None)
486 if new_id:
487 vnfd_RO["id"] = new_id
488 for vdu in vnfd_RO["vdu"]:
489 if "cloud-init-file" in vdu:
490 base_folder = vnfd["_admin"]["storage"]
491 clout_init_file = "{}/{}/cloud_init/{}".format(
492 base_folder["folder"],
493 base_folder["pkg-dir"],
494 vdu["cloud-init-file"]
495 )
496 ci_file = self.fs.file_open(clout_init_file, "r")
497 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
498 # convert to base 64 or similar
499 clout_init_content = ci_file.read()
500 ci_file.close()
501 ci_file = None
502 vdu.pop("cloud-init-file", None)
503 vdu["cloud-init"] = clout_init_content
504 return vnfd_RO
505 except FsException as e:
506 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
507 finally:
508 if ci_file:
509 ci_file.close()
510
511 def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, member_vnf_index,
512 task=None):
513 """
514 Callback both for charm status change and task completion
515 :param model_name: Charm model name
516 :param application_name: Charm application name
517 :param status: Can be
518 - blocked: The unit needs manual intervention
519 - maintenance: The unit is actively deploying/configuring
520 - waiting: The unit is waiting for another charm to be ready
521 - active: The unit is deployed, configured, and ready
522 - error: The charm has failed and needs attention.
523 - terminated: The charm has been destroyed
524 - removing,
525 - removed
526 :param message: detailed message error
527 :param db_nsr: nsr database content
528 :param db_nslcmop: nslcmop database content
529 :param member_vnf_index: NSD member-vnf-index
530 :param task: None for charm status change, or task for completion task callback
531 :return:
532 """
533 nsr_id = None
534 nslcmop_id = None
535 update_nsr = update_nslcmop = False
536 try:
537 nsr_id = db_nsr["_id"]
538 nslcmop_id = db_nslcmop["_id"]
539 nsr_lcm = db_nsr["_admin"]["deployed"]
540 ns_action = db_nslcmop["lcmOperationType"]
541 logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
542 member_vnf_index)
543
544 if task:
545 if task.cancelled():
546 self.logger.debug(logging_text + " task Cancelled")
547 # TODO update db_nslcmop
548 return
549
550 if task.done():
551 exc = task.exception()
552 if exc:
553 self.logger.error(logging_text + " task Exception={}".format(exc))
554 if ns_action in ("instantiate", "terminate"):
555 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error"
556 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc)
557 elif ns_action == "action":
558 db_nslcmop["operationState"] = "FAILED"
559 db_nslcmop["detailed-status"] = str(exc)
560 db_nslcmop["statusEnteredTime"] = time()
561 update_nslcmop = True
562 return
563
564 else:
565 self.logger.debug(logging_text + " task Done")
566 # TODO revise with Adam if action is finished and ok when task is done
567 if ns_action == "action":
568 db_nslcmop["operationState"] = "COMPLETED"
569 db_nslcmop["detailed-status"] = "Done"
570 db_nslcmop["statusEnteredTime"] = time()
571 update_nslcmop = True
572 # task is Done, but callback is still ongoing. So ignore
573 return
574 elif status:
575 self.logger.debug(logging_text + " Enter status={}".format(status))
576 if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status:
577 return # same status, ignore
578 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status
579 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message)
580 else:
581 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
582 return
583
584 all_active = True
585 status_map = {}
586 n2vc_error_text = [] # contain text error list. If empty no one is in error status
587 for vnf_index, vca_info in nsr_lcm["VCA"].items():
588 vca_status = vca_info["operational-status"]
589 if vca_status not in status_map:
590 # Initialize it
591 status_map[vca_status] = 0
592 status_map[vca_status] += 1
593
594 if vca_status != "active":
595 all_active = False
596 elif vca_status in ("error", "blocked"):
597 n2vc_error_text.append("member_vnf_index={} {}: {}".format(member_vnf_index, vca_status,
598 vca_info["detailed-status"]))
599
600 if all_active:
601 self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id,
602 member_vnf_index))
603 db_nsr["config-status"] = "configured"
604 db_nsr["detailed-status"] = "done"
605 db_nslcmop["operationState"] = "COMPLETED"
606 db_nslcmop["detailed-status"] = "Done"
607 db_nslcmop["statusEnteredTime"] = time()
608 elif n2vc_error_text:
609 db_nsr["config-status"] = "failed"
610 error_text = "fail configuring " + ";".join(n2vc_error_text)
611 db_nsr["detailed-status"] = error_text
612 db_nslcmop["operationState"] = "FAILED_TEMP"
613 db_nslcmop["detailed-status"] = error_text
614 db_nslcmop["statusEnteredTime"] = time()
615 else:
616 cs = "configuring: "
617 separator = ""
618 for status, num in status_map.items():
619 cs += separator + "{}: {}".format(status, num)
620 separator = ", "
621 db_nsr["config-status"] = cs
622 db_nsr["detailed-status"] = cs
623 db_nslcmop["detailed-status"] = cs
624 update_nsr = update_nslcmop = True
625
626 except Exception as e:
627 self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True)
628 finally:
629 try:
630 if update_nslcmop:
631 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
632 if update_nsr:
633 self.update_db("nsrs", nsr_id, db_nsr)
634 except Exception as e:
635 self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
636 member_vnf_index, e), exc_info=True)
637
638 def ns_params_2_RO(self, ns_params):
639 """
640 Creates a RO ns descriptor from OSM ns_instantite params
641 :param ns_params: OSM instantiate params
642 :return: The RO ns descriptor
643 """
644 vim_2_RO = {}
645
646 def vim_account_2_RO(vim_account):
647 if vim_account in vim_2_RO:
648 return vim_2_RO[vim_account]
649 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
650 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
651 # #TODO check if VIM is creating and wait
652 if db_vim["_admin"]["operationalState"] != "ENABLED":
653 raise LcmException("VIM={} is not available. operationalState={}".format(
654 vim_account, db_vim["_admin"]["operationalState"]))
655 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
656 vim_2_RO[vim_account] = RO_vim_id
657 return RO_vim_id
658
659 if not ns_params:
660 return None
661 RO_ns_params = {
662 # "name": ns_params["nsName"],
663 # "description": ns_params.get("nsDescription"),
664 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
665 # "scenario": ns_params["nsdId"],
666 "vnfs": {},
667 "networks": {},
668 }
669 if ns_params.get("ssh-authorized-key"):
670 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}
671 if ns_params.get("vnf"):
672 for vnf in ns_params["vnf"]:
673 RO_vnf = {}
674 if "vimAccountId" in vnf:
675 RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
676 if RO_vnf:
677 RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf
678 if ns_params.get("vld"):
679 for vld in ns_params["vld"]:
680 RO_vld = {}
681 if "ip-profile" in vld:
682 RO_vld["ip-profile"] = vld["ip-profile"]
683 if "vim-network-name" in vld:
684 RO_vld["sites"] = []
685 if isinstance(vld["vim-network-name"], dict):
686 for vim_account, vim_net in vld["vim-network-name"].items():
687 RO_vld["sites"].append({
688 "netmap-use": vim_net,
689 "datacenter": vim_account_2_RO(vim_account)
690 })
691 else: # isinstance str
692 RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
693 if RO_vld:
694 RO_ns_params["networks"][vld["name"]] = RO_vld
695 return RO_ns_params
696
697 async def ns_instantiate(self, nsr_id, nslcmop_id):
698 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
699 self.logger.debug(logging_text + "Enter")
700 # get all needed from database
701 db_nsr = None
702 db_nslcmop = None
703 db_vnfr = {}
704 exc = None
705 step = "Getting nsr, nslcmop, RO_vims from db"
706 try:
707 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 nsd = db_nsr["nsd"]
710 nsr_name = db_nsr["name"] # TODO short-name??
711 needed_vnfd = {}
712 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
713 for c_vnf in nsd["constituent-vnfd"]:
714 vnfd_id = c_vnf["vnfd-id-ref"]
715 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
716 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
717 if vnfd_id not in needed_vnfd:
718 step = "Getting vnfd={} from db".format(vnfd_id)
719 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
720
721 nsr_lcm = db_nsr["_admin"].get("deployed")
722 if not nsr_lcm:
723 nsr_lcm = db_nsr["_admin"]["deployed"] = {
724 "id": nsr_id,
725 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
726 "nsr_ip": {},
727 "VCA": {},
728 }
729 db_nsr["detailed-status"] = "creating"
730 db_nsr["operational-status"] = "init"
731
732 RO = ROclient.ROClient(self.loop, **self.ro_config)
733
734 # get vnfds, instantiate at RO
735 for vnfd_id, vnfd in needed_vnfd.items():
736 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
737 self.logger.debug(logging_text + step)
738 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
739
740 # look if present
741 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
742 if vnfd_list:
743 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
744 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
745 vnfd_id, vnfd_list[0]["uuid"]))
746 else:
747 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
748 desc = await RO.create("vnfd", descriptor=vnfd_RO)
749 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
750 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
751 self.update_db("nsrs", nsr_id, db_nsr)
752
753 # create nsd at RO
754 nsd_id = nsd["id"]
755 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
756 self.logger.debug(logging_text + step)
757
758 nsd_id_RO = nsd_id + "." + nsd_id[:200]
759 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
760 if nsd_list:
761 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
762 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
763 nsd_id, nsd_list[0]["uuid"]))
764 else:
765 nsd_RO = deepcopy(nsd)
766 nsd_RO["id"] = nsd_id_RO
767 nsd_RO.pop("_id", None)
768 nsd_RO.pop("_admin", None)
769 for c_vnf in nsd_RO["constituent-vnfd"]:
770 vnfd_id = c_vnf["vnfd-id-ref"]
771 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
772 desc = await RO.create("nsd", descriptor=nsd_RO)
773 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
774 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
775 self.update_db("nsrs", nsr_id, db_nsr)
776
777 # Crate ns at RO
778 # if present use it unless in error status
779 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
780 if RO_nsr_id:
781 try:
782 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
783 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
784 desc = await RO.show("ns", RO_nsr_id)
785 except ROclient.ROClientException as e:
786 if e.http_code != HTTPStatus.NOT_FOUND:
787 raise
788 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
789 if RO_nsr_id:
790 ns_status, ns_status_info = RO.check_ns_status(desc)
791 nsr_lcm["RO"]["nsr_status"] = ns_status
792 if ns_status == "ERROR":
793 step = db_nsr["detailed-status"] = "Deleting ns at RO"
794 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
795 await RO.delete("ns", RO_nsr_id)
796 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
797 if not RO_nsr_id:
798 step = db_nsr["detailed-status"] = "Creating ns at RO"
799 self.logger.debug(logging_text + step)
800 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
801 desc = await RO.create("ns", descriptor=RO_ns_params,
802 name=db_nsr["name"],
803 scenario=nsr_lcm["RO"]["nsd_id"])
804 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
805 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
806 nsr_lcm["RO"]["nsr_status"] = "BUILD"
807
808 self.update_db("nsrs", nsr_id, db_nsr)
809 # update VNFR vimAccount
810 step = "Updating VNFR vimAcccount"
811 for vnf_index, vnfr in db_vnfr.items():
812 if vnfr.get("vim-account-id"):
813 continue
814 if db_nsr["instantiate_params"].get("vnf") and db_nsr["instantiate_params"]["vnf"].get(vnf_index) \
815 and db_nsr["instantiate_params"]["vnf"][vnf_index].get("vimAccountId"):
816 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vnf"][vnf_index]["vimAccountId"]
817 else:
818 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
819 self.update_db("vnfrs", vnfr["_id"], vnfr)
820
821 # wait until NS is ready
822 step = ns_status_detailed = "Waiting ns ready at RO"
823 db_nsr["detailed-status"] = ns_status_detailed
824 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
825 deployment_timeout = 2 * 3600 # Two hours
826 while deployment_timeout > 0:
827 desc = await RO.show("ns", RO_nsr_id)
828 ns_status, ns_status_info = RO.check_ns_status(desc)
829 nsr_lcm["RO"]["nsr_status"] = ns_status
830 if ns_status == "ERROR":
831 raise ROclient.ROClientException(ns_status_info)
832 elif ns_status == "BUILD":
833 db_nsr_detailed_status_old = db_nsr["detailed-status"]
834 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
835 if db_nsr_detailed_status_old != db_nsr["detailed-status"]:
836 self.update_db("nsrs", nsr_id, db_nsr)
837 elif ns_status == "ACTIVE":
838 step = "Waiting for management IP address from VIM"
839 try:
840 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
841 break
842 except ROclient.ROClientException as e:
843 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
844 raise e
845 else:
846 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
847 await asyncio.sleep(5, loop=self.loop)
848 deployment_timeout -= 5
849 if deployment_timeout <= 0:
850 raise ROclient.ROClientException("Timeout waiting ns to be ready")
851 step = "Updating VNFRs"
852 for vnf_index, vnfr_deployed in ns_RO_info.items():
853 vnfr = db_vnfr[vnf_index]
854 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
855 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
856 for vdur in vnfr["vdur"]:
857 if vdur["vdu-id-ref"] == vdu_id:
858 vdur["vim-id"] = vdu_deployed.get("vim_id")
859 vdur["ip-address"] = vdu_deployed.get("ip_address")
860 break
861 self.update_db("vnfrs", vnfr["_id"], vnfr)
862
863 db_nsr["detailed-status"] = "Configuring vnfr"
864 self.update_db("nsrs", nsr_id, db_nsr)
865
866 # The parameters we'll need to deploy a charm
867 number_to_configure = 0
868
869 def deploy():
870 """An inner function to deploy the charm from either vnf or vdu
871 """
872
873 # Login to the VCA.
874 # if number_to_configure == 0:
875 # self.logger.debug("Logging into N2VC...")
876 # task = asyncio.ensure_future(self.n2vc.login())
877 # yield from asyncio.wait_for(task, 30.0)
878 # self.logger.debug("Logged into N2VC!")
879
880 # # await self.n2vc.login()
881
882 # Note: The charm needs to exist on disk at the location
883 # specified by charm_path.
884 base_folder = vnfd["_admin"]["storage"]
885 storage_params = self.fs.get_params()
886 charm_path = "{}{}/{}/charms/{}".format(
887 storage_params["path"],
888 base_folder["folder"],
889 base_folder["pkg-dir"],
890 proxy_charm
891 )
892
893 # Setup the runtime parameters for this VNF
894 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
895
896 # ns_name will be ignored in the current version of N2VC
897 # but will be implemented for the next point release.
898 model_name = 'default'
899 application_name = self.n2vc.FormatApplicationName(
900 nsr_name,
901 vnf_index,
902 vnfd['name'],
903 )
904
905 nsr_lcm["VCA"][vnf_index] = {
906 "model": model_name,
907 "application": application_name,
908 "operational-status": "init",
909 "detailed-status": "",
910 "vnfd_id": vnfd_id,
911 }
912
913 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
914 proxy_charm))
915 task = asyncio.ensure_future(
916 self.n2vc.DeployCharms(
917 model_name, # The network service name
918 application_name, # The application name
919 vnfd, # The vnf descriptor
920 charm_path, # Path to charm
921 params, # Runtime params, like mgmt ip
922 {}, # for native charms only
923 self.n2vc_callback, # Callback for status changes
924 db_nsr, # Callback parameter
925 db_nslcmop,
926 vnf_index, # Callback parameter
927 None, # Callback parameter (task)
928 )
929 )
930 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
931 db_nsr, db_nslcmop, vnf_index))
932 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
933
934 # TODO: Make this call inside deploy()
935 # Login to the VCA. If there are multiple calls to login(),
936 # subsequent calls will be a nop and return immediately.
937 await self.n2vc.login()
938
939 step = "Looking for needed vnfd to configure"
940 self.logger.debug(logging_text + step)
941 for c_vnf in nsd["constituent-vnfd"]:
942 vnfd_id = c_vnf["vnfd-id-ref"]
943 vnf_index = str(c_vnf["member-vnf-index"])
944 vnfd = needed_vnfd[vnfd_id]
945
946 # Check if this VNF has a charm configuration
947 vnf_config = vnfd.get("vnf-configuration")
948
949 if vnf_config and vnf_config.get("juju"):
950 proxy_charm = vnf_config["juju"]["charm"]
951 params = {}
952
953 if proxy_charm:
954 if 'initial-config-primitive' in vnf_config:
955 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
956
957 deploy()
958 number_to_configure += 1
959
960 # Deploy charms for each VDU that supports one.
961 for vdu in vnfd['vdu']:
962 vdu_config = vdu.get('vdu-configuration')
963 proxy_charm = None
964 params = {}
965
966 if vdu_config and vdu_config.get("juju"):
967 proxy_charm = vdu_config["juju"]["charm"]
968
969 if 'initial-config-primitive' in vdu_config:
970 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
971
972 if proxy_charm:
973 deploy()
974 number_to_configure += 1
975
976 if number_to_configure:
977 db_nsr["config-status"] = "configuring"
978 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
979 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
980 else:
981 db_nslcmop["operationState"] = "COMPLETED"
982 db_nslcmop["detailed-status"] = "done"
983 db_nsr["config-status"] = "configured"
984 db_nsr["detailed-status"] = "done"
985 db_nsr["operational-status"] = "running"
986 self.update_db("nsrs", nsr_id, db_nsr)
987 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
988 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
989 return nsr_lcm
990
991 except (ROclient.ROClientException, DbException, LcmException) as e:
992 self.logger.error(logging_text + "Exit Exception {}".format(e))
993 exc = e
994 except Exception as e:
995 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
996 exc = e
997 finally:
998 if exc:
999 if db_nsr:
1000 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
1001 db_nsr["operational-status"] = "failed"
1002 self.update_db("nsrs", nsr_id, db_nsr)
1003 if db_nslcmop:
1004 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
1005 db_nslcmop["operationState"] = "FAILED"
1006 db_nslcmop["statusEnteredTime"] = time()
1007 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
1008
1009 async def ns_terminate(self, nsr_id, nslcmop_id):
1010 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1011 self.logger.debug(logging_text + "Enter")
1012 db_nsr = None
1013 db_nslcmop = None
1014 exc = None
1015 step = "Getting nsr, nslcmop from db"
1016 failed_detail = [] # annotates all failed error messages
1017 vca_task_list = []
1018 vca_task_dict = {}
1019 try:
1020 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1021 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1022 # nsd = db_nsr["nsd"]
1023 nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
1024 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1025 return
1026 # TODO ALF remove
1027 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1028 # #TODO check if VIM is creating and wait
1029 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1030
1031 db_nsr_update = {
1032 "operational-status": "terminating",
1033 "config-status": "terminating",
1034 "detailed-status": "Deleting charms",
1035 }
1036 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1037
1038 try:
1039 self.logger.debug(logging_text + step)
1040 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
1041 if deploy_info and deploy_info.get("application"):
1042 task = asyncio.ensure_future(
1043 self.n2vc.RemoveCharms(
1044 deploy_info['model'],
1045 deploy_info['application'],
1046 # self.n2vc_callback,
1047 # db_nsr,
1048 # db_nslcmop,
1049 # vnf_index,
1050 )
1051 )
1052 vca_task_list.append(task)
1053 vca_task_dict[vnf_index] = task
1054 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1055 # deploy_info['application'], None, db_nsr,
1056 # db_nslcmop, vnf_index))
1057 self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
1058 except Exception as e:
1059 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1060 # remove from RO
1061
1062 RO = ROclient.ROClient(self.loop, **self.ro_config)
1063 # Delete ns
1064 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1065 if RO_nsr_id:
1066 try:
1067 step = db_nsr["detailed-status"] = "Deleting ns at RO"
1068 self.logger.debug(logging_text + step)
1069 await RO.delete("ns", RO_nsr_id)
1070 nsr_lcm["RO"]["nsr_id"] = None
1071 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1072 except ROclient.ROClientException as e:
1073 if e.http_code == 404: # not found
1074 nsr_lcm["RO"]["nsr_id"] = None
1075 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1076 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1077 elif e.http_code == 409: # conflict
1078 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1079 self.logger.debug(logging_text + failed_detail[-1])
1080 else:
1081 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1082 self.logger.error(logging_text + failed_detail[-1])
1083
1084 # Delete nsd
1085 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1086 if RO_nsd_id:
1087 try:
1088 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
1089 await RO.delete("nsd", RO_nsd_id)
1090 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1091 nsr_lcm["RO"]["nsd_id"] = None
1092 except ROclient.ROClientException as e:
1093 if e.http_code == 404: # not found
1094 nsr_lcm["RO"]["nsd_id"] = None
1095 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1096 elif e.http_code == 409: # conflict
1097 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1098 self.logger.debug(logging_text + failed_detail[-1])
1099 else:
1100 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1101 self.logger.error(logging_text + failed_detail[-1])
1102
1103 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1104 if not RO_vnfd_id:
1105 continue
1106 try:
1107 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
1108 await RO.delete("vnfd", RO_vnfd_id)
1109 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1110 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1111 except ROclient.ROClientException as e:
1112 if e.http_code == 404: # not found
1113 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1114 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1115 elif e.http_code == 409: # conflict
1116 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1117 self.logger.debug(logging_text + failed_detail[-1])
1118 else:
1119 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1120 self.logger.error(logging_text + failed_detail[-1])
1121
1122 if vca_task_list:
1123 await asyncio.wait(vca_task_list, timeout=300)
1124 for vnf_index, task in vca_task_dict.items():
1125 if task.cancelled():
1126 failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
1127 elif task.done():
1128 exc = task.exception()
1129 if exc:
1130 failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
1131 else:
1132 nsr_lcm["VCA"][vnf_index] = None
1133 else: # timeout
1134 # TODO Should it be cancelled?!!
1135 task.cancel()
1136 failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))
1137
1138 if failed_detail:
1139 self.logger.error(logging_text + " ;".join(failed_detail))
1140 db_nsr_update = {
1141 "operational-status": "failed",
1142 "detailed-status": "Deletion errors " + "; ".join(failed_detail),
1143 "_admin": {"deployed": nsr_lcm, }
1144 }
1145 db_nslcmop_update = {
1146 "detailed-status": "; ".join(failed_detail),
1147 "operationState": "FAILED",
1148 "statusEnteredTime": time()
1149 }
1150 elif db_nslcmop["operationParams"].get("autoremove"):
1151 self.db.del_one("nsrs", {"_id": nsr_id})
1152 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1153 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1154 else:
1155 db_nsr_update = {
1156 "operational-status": "terminated",
1157 "detailed-status": "Done",
1158 "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
1159 }
1160 db_nslcmop_update = {
1161 "detailed-status": "Done",
1162 "operationState": "COMPLETED",
1163 "statusEnteredTime": time()
1164 }
1165 self.logger.debug(logging_text + "Exit")
1166
1167 except (ROclient.ROClientException, DbException) as e:
1168 self.logger.error(logging_text + "Exit Exception {}".format(e))
1169 exc = e
1170 except Exception as e:
1171 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1172 exc = e
1173 finally:
1174 if exc and db_nslcmop:
1175 db_nslcmop_update = {
1176 "detailed-status": "FAILED {}: {}".format(step, exc),
1177 "operationState": "FAILED",
1178 "statusEnteredTime": time(),
1179 }
1180 if db_nslcmop_update:
1181 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1182 if db_nsr_update:
1183 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1184
1185 async def ns_action(self, nsr_id, nslcmop_id):
1186 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1187 self.logger.debug(logging_text + "Enter")
1188 # get all needed from database
1189 db_nsr = None
1190 db_nslcmop = None
1191 db_nslcmop_update = None
1192 exc = None
1193 try:
1194 step = "Getting information from database"
1195 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1196 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1197 nsr_lcm = db_nsr["_admin"].get("deployed")
1198 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1199
1200 # TODO check if ns is in a proper status
1201 vca_deployed = nsr_lcm["VCA"].get(vnf_index)
1202 if not vca_deployed:
1203 raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index))
1204 model_name = vca_deployed.get("model")
1205 application_name = vca_deployed.get("application")
1206 if not model_name or not application_name:
1207 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index))
1208 if vca_deployed["operational-status"] != "active":
1209 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1210 vnf_index, vca_deployed["operational-status"]))
1211 primitive = db_nslcmop["operationParams"]["primitive"]
1212 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1213 callback = None # self.n2vc_callback
1214 callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
1215 await self.n2vc.login()
1216 task = asyncio.ensure_future(
1217 self.n2vc.ExecutePrimitive(
1218 model_name,
1219 application_name,
1220 primitive, callback,
1221 *callback_args,
1222 **primitive_params
1223 )
1224 )
1225 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1226 # db_nsr, db_nslcmop, vnf_index))
1227 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1228 # wait until completed with timeout
1229 await asyncio.wait((task,), timeout=300)
1230
1231 result = "FAILED" # by default
1232 result_detail = ""
1233 if task.cancelled():
1234 db_nslcmop["detailed-status"] = "Task has been cancelled"
1235 elif task.done():
1236 exc = task.exception()
1237 if exc:
1238 result_detail = str(exc)
1239 else:
1240 self.logger.debug(logging_text + " task Done")
1241 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1242 result = "COMPLETED"
1243 result_detail = "Done"
1244 else: # timeout
1245 # TODO Should it be cancelled?!!
1246 task.cancel()
1247 result_detail = "timeout"
1248
1249 db_nslcmop_update = {
1250 "detailed-status": result_detail,
1251 "operationState": result,
1252 "statusEnteredTime": time()
1253 }
1254 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1255 return # database update is called inside finally
1256
1257 except (DbException, LcmException) as e:
1258 self.logger.error(logging_text + "Exit Exception {}".format(e))
1259 exc = e
1260 except Exception as e:
1261 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1262 exc = e
1263 finally:
1264 if exc and db_nslcmop:
1265 db_nslcmop_update = {
1266 "detailed-status": "FAILED {}: {}".format(step, exc),
1267 "operationState": "FAILED",
1268 "statusEnteredTime": time(),
1269 }
1270 if db_nslcmop_update:
1271 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1272
1273 async def test(self, param=None):
1274 self.logger.debug("Starting/Ending test task: {}".format(param))
1275
1276 def cancel_tasks(self, topic, _id):
1277 """
1278 Cancel all active tasks of a concrete nsr or vim identified for _id
1279 :param topic: can be ns or vim_account
1280 :param _id: nsr or vim identity
1281 :return: None, or raises an exception if not possible
1282 """
1283 if topic == "ns":
1284 lcm_tasks = self.lcm_ns_tasks
1285 elif topic == "vim_account":
1286 lcm_tasks = self.lcm_vim_tasks
1287 elif topic == "sdn":
1288 lcm_tasks = self.lcm_sdn_tasks
1289
1290 if not lcm_tasks.get(_id):
1291 return
1292 for order_id, tasks_set in lcm_tasks[_id].items():
1293 for task_name, task in tasks_set.items():
1294 result = task.cancel()
1295 if result:
1296 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
1297 lcm_tasks[_id] = {}
1298
1299 async def kafka_ping(self):
1300 self.logger.debug("Task kafka_ping Enter")
1301 consecutive_errors = 0
1302 first_start = True
1303 kafka_has_received = False
1304 self.pings_not_received = 1
1305 while True:
1306 try:
1307 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
1308 # time between pings are low when it is not received and at starting
1309 wait_time = 5 if not kafka_has_received else 120
1310 if not self.pings_not_received:
1311 kafka_has_received = True
1312 self.pings_not_received += 1
1313 await asyncio.sleep(wait_time, loop=self.loop)
1314 if self.pings_not_received > 10:
1315 raise LcmException("It is not receiving pings from Kafka bus")
1316 consecutive_errors = 0
1317 first_start = False
1318 except LcmException:
1319 raise
1320 except Exception as e:
1321 # if not first_start is the first time after starting. So leave more time and wait
1322 # to allow kafka starts
1323 if consecutive_errors == 8 if not first_start else 30:
1324 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1325 raise
1326 consecutive_errors += 1
1327 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1328 wait_time = 1 if not first_start else 5
1329 await asyncio.sleep(wait_time, loop=self.loop)
1330
1331 async def kafka_read(self):
1332 self.logger.debug("Task kafka_read Enter")
1333 order_id = 1
1334 # future = asyncio.Future()
1335 consecutive_errors = 0
1336 first_start = True
1337 while consecutive_errors < 10:
1338 try:
1339 topics = ("admin", "ns", "vim_account", "sdn")
1340 topic, command, params = await self.msg.aioread(topics, self.loop)
1341 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
1342 consecutive_errors = 0
1343 first_start = False
1344 order_id += 1
1345 if command == "exit":
1346 print("Bye!")
1347 break
1348 elif command.startswith("#"):
1349 continue
1350 elif command == "echo":
1351 # just for test
1352 print(params)
1353 sys.stdout.flush()
1354 continue
1355 elif command == "test":
1356 asyncio.Task(self.test(params), loop=self.loop)
1357 continue
1358
1359 if topic == "admin":
1360 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
1361 self.pings_not_received = 0
1362 continue
1363 elif topic == "ns":
1364 if command == "instantiate":
1365 # self.logger.debug("Deploying NS {}".format(nsr_id))
1366 nslcmop = params
1367 nslcmop_id = nslcmop["_id"]
1368 nsr_id = nslcmop["nsInstanceId"]
1369 task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
1370 if nsr_id not in self.lcm_ns_tasks:
1371 self.lcm_ns_tasks[nsr_id] = {}
1372 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task}
1373 continue
1374 elif command == "terminate":
1375 # self.logger.debug("Deleting NS {}".format(nsr_id))
1376 nslcmop = params
1377 nslcmop_id = nslcmop["_id"]
1378 nsr_id = nslcmop["nsInstanceId"]
1379 self.cancel_tasks(topic, nsr_id)
1380 task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
1381 if nsr_id not in self.lcm_ns_tasks:
1382 self.lcm_ns_tasks[nsr_id] = {}
1383 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task}
1384 continue
1385 elif command == "action":
1386 # self.logger.debug("Update NS {}".format(nsr_id))
1387 nslcmop = params
1388 nslcmop_id = nslcmop["_id"]
1389 nsr_id = nslcmop["nsInstanceId"]
1390 task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
1391 if nsr_id not in self.lcm_ns_tasks:
1392 self.lcm_ns_tasks[nsr_id] = {}
1393 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
1394 continue
1395 elif command == "show":
1396 try:
1397 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1398 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
1399 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
1400 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
1401 db_nsr["detailed-status"],
1402 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
1403 except Exception as e:
1404 print("nsr {} not found: {}".format(nsr_id, e))
1405 sys.stdout.flush()
1406 continue
1407 elif command == "deleted":
1408 continue # TODO cleaning of task just in case should be done
1409 elif topic == "vim_account":
1410 vim_id = params["_id"]
1411 if command == "create":
1412 task = asyncio.ensure_future(self.vim_create(params, order_id))
1413 if vim_id not in self.lcm_vim_tasks:
1414 self.lcm_vim_tasks[vim_id] = {}
1415 self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task}
1416 continue
1417 elif command == "delete":
1418 self.cancel_tasks(topic, vim_id)
1419 task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
1420 if vim_id not in self.lcm_vim_tasks:
1421 self.lcm_vim_tasks[vim_id] = {}
1422 self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task}
1423 continue
1424 elif command == "show":
1425 print("not implemented show with vim_account")
1426 sys.stdout.flush()
1427 continue
1428 elif command == "edit":
1429 task = asyncio.ensure_future(self.vim_edit(params, order_id))
1430 if vim_id not in self.lcm_vim_tasks:
1431 self.lcm_vim_tasks[vim_id] = {}
1432 self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task}
1433 continue
1434 elif topic == "sdn":
1435 _sdn_id = params["_id"]
1436 if command == "create":
1437 task = asyncio.ensure_future(self.sdn_create(params, order_id))
1438 if _sdn_id not in self.lcm_sdn_tasks:
1439 self.lcm_sdn_tasks[_sdn_id] = {}
1440 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task}
1441 continue
1442 elif command == "delete":
1443 self.cancel_tasks(topic, _sdn_id)
1444 task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
1445 if _sdn_id not in self.lcm_sdn_tasks:
1446 self.lcm_sdn_tasks[_sdn_id] = {}
1447 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task}
1448 continue
1449 elif command == "edit":
1450 task = asyncio.ensure_future(self.sdn_edit(params, order_id))
1451 if _sdn_id not in self.lcm_sdn_tasks:
1452 self.lcm_sdn_tasks[_sdn_id] = {}
1453 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task}
1454 continue
1455 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1456 except Exception as e:
1457 # if not first_start is the first time after starting. So leave more time and wait
1458 # to allow kafka starts
1459 if consecutive_errors == 8 if not first_start else 30:
1460 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1461 raise
1462 consecutive_errors += 1
1463 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1464 wait_time = 2 if not first_start else 5
1465 await asyncio.sleep(wait_time, loop=self.loop)
1466
1467 # self.logger.debug("Task kafka_read terminating")
1468 self.logger.debug("Task kafka_read exit")
1469
1470 def start(self):
1471 self.loop = asyncio.get_event_loop()
1472 self.loop.run_until_complete(asyncio.gather(
1473 self.kafka_read(),
1474 self.kafka_ping()
1475 ))
1476 # TODO
1477 # self.logger.debug("Terminating cancelling creation tasks")
1478 # self.cancel_tasks("ALL", "create")
1479 # timeout = 200
1480 # while self.is_pending_tasks():
1481 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1482 # await asyncio.sleep(2, loop=self.loop)
1483 # timeout -= 2
1484 # if not timeout:
1485 # self.cancel_tasks("ALL", "ALL")
1486 self.loop.close()
1487 self.loop = None
1488 if self.db:
1489 self.db.db_disconnect()
1490 if self.msg:
1491 self.msg.disconnect()
1492 if self.fs:
1493 self.fs.fs_disconnect()
1494
1495 def read_config_file(self, config_file):
1496 # TODO make a [ini] + yaml inside parser
1497 # the configparser library is not suitable, because it does not admit comments at the end of line,
1498 # and not parse integer or boolean
1499 try:
1500 with open(config_file) as f:
1501 conf = yaml.load(f)
1502 for k, v in environ.items():
1503 if not k.startswith("OSMLCM_"):
1504 continue
1505 k_items = k.lower().split("_")
1506 c = conf
1507 try:
1508 for k_item in k_items[1:-1]:
1509 if k_item in ("ro", "vca"):
1510 # put in capital letter
1511 k_item = k_item.upper()
1512 c = c[k_item]
1513 if k_items[-1] == "port":
1514 c[k_items[-1]] = int(v)
1515 else:
1516 c[k_items[-1]] = v
1517 except Exception as e:
1518 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1519
1520 return conf
1521 except Exception as e:
1522 self.logger.critical("At config file '{}': {}".format(config_file, e))
1523 exit(1)
1524
1525
1526 def usage():
1527 print("""Usage: {} [options]
1528 -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
1529 -h|--help: shows this help
1530 """.format(sys.argv[0]))
1531 # --log-socket-host HOST: send logs to this host")
1532 # --log-socket-port PORT: send logs using this port (default: 9022)")
1533
1534
1535 if __name__ == '__main__':
1536 try:
1537 # load parameters and configuration
1538 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
1539 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
1540 config_file = None
1541 for o, a in opts:
1542 if o in ("-h", "--help"):
1543 usage()
1544 sys.exit()
1545 elif o in ("-c", "--config"):
1546 config_file = a
1547 # elif o == "--log-socket-port":
1548 # log_socket_port = a
1549 # elif o == "--log-socket-host":
1550 # log_socket_host = a
1551 # elif o == "--log-file":
1552 # log_file = a
1553 else:
1554 assert False, "Unhandled option"
1555 if config_file:
1556 if not path.isfile(config_file):
1557 print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
1558 exit(1)
1559 else:
1560 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
1561 if path.isfile(config_file):
1562 break
1563 else:
1564 print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
1565 exit(1)
1566 lcm = Lcm(config_file)
1567 lcm.start()
1568 except getopt.GetoptError as e:
1569 print(str(e), file=sys.stderr)
1570 # usage()
1571 exit(1)