c679add498e4e042349da939a5fd1e0a074e0d7d
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import logging
8 import logging.handlers
9 import getopt
10 import functools
11 import sys
12 from osm_common import dbmemory
13 from osm_common import dbmongo
14 from osm_common import fslocal
15 from osm_common import msglocal
16 from osm_common import msgkafka
17 from osm_common.dbbase import DbException
18 from osm_common.fsbase import FsException
19 from osm_common.msgbase import MsgException
20 from os import environ, path
21 from n2vc.vnf import N2VC
22 from n2vc import version as N2VC_version
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
29 __author__ = "Alfonso Tierno"
30
31
32 class LcmException(Exception):
33 pass
34
35
36 class Lcm:
37
38 def __init__(self, config_file):
39 """
40 Init, Connect to database, filesystem storage, and messaging
41 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
42 :return: None
43 """
44
45 self.db = None
46 self.msg = None
47 self.fs = None
48 self.pings_not_received = 1
49
50 # contains created tasks/futures to be able to cancel
51 self.lcm_ns_tasks = {}
52 self.lcm_vim_tasks = {}
53 self.lcm_sdn_tasks = {}
54 # logging
55 self.logger = logging.getLogger('lcm')
56 # load configuration
57 config = self.read_config_file(config_file)
58 self.config = config
59 self.ro_config = {
60 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
61 "tenant": config.get("tenant", "osm"),
62 "logger_name": "lcm.ROclient",
63 "loglevel": "ERROR",
64 }
65
66 self.vca = config["VCA"] # TODO VCA
67 self.loop = None
68
69 # logging
70 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
71 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
72 config["database"]["logger_name"] = "lcm.db"
73 config["storage"]["logger_name"] = "lcm.fs"
74 config["message"]["logger_name"] = "lcm.msg"
75 if "logfile" in config["global"]:
76 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
77 maxBytes=100e6, backupCount=9, delay=0)
78 file_handler.setFormatter(log_formatter_simple)
79 self.logger.addHandler(file_handler)
80 else:
81 str_handler = logging.StreamHandler()
82 str_handler.setFormatter(log_formatter_simple)
83 self.logger.addHandler(str_handler)
84
85 if config["global"].get("loglevel"):
86 self.logger.setLevel(config["global"]["loglevel"])
87
88 # logging other modules
89 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
90 config[k1]["logger_name"] = logname
91 logger_module = logging.getLogger(logname)
92 if "logfile" in config[k1]:
93 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
94 maxBytes=100e6, backupCount=9, delay=0)
95 file_handler.setFormatter(log_formatter_simple)
96 logger_module.addHandler(file_handler)
97 if "loglevel" in config[k1]:
98 logger_module.setLevel(config[k1]["loglevel"])
99
100 self.n2vc = N2VC(
101 log=self.logger,
102 server=config['VCA']['host'],
103 port=config['VCA']['port'],
104 user=config['VCA']['user'],
105 secret=config['VCA']['secret'],
106 # TODO: This should point to the base folder where charms are stored,
107 # if there is a common one (like object storage). Otherwise, leave
108 # it unset and pass it via DeployCharms
109 # artifacts=config['VCA'][''],
110 artifacts=None,
111 )
112 # check version of N2VC
113 # TODO enhance with int conversion or from distutils.version import LooseVersion
114 # or with list(map(int, version.split(".")))
115 if N2VC_version < "0.0.2":
116 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
117 try:
118 if config["database"]["driver"] == "mongo":
119 self.db = dbmongo.DbMongo()
120 self.db.db_connect(config["database"])
121 elif config["database"]["driver"] == "memory":
122 self.db = dbmemory.DbMemory()
123 self.db.db_connect(config["database"])
124 else:
125 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
126 config["database"]["driver"]))
127
128 if config["storage"]["driver"] == "local":
129 self.fs = fslocal.FsLocal()
130 self.fs.fs_connect(config["storage"])
131 else:
132 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
133 config["storage"]["driver"]))
134
135 if config["message"]["driver"] == "local":
136 self.msg = msglocal.MsgLocal()
137 self.msg.connect(config["message"])
138 elif config["message"]["driver"] == "kafka":
139 self.msg = msgkafka.MsgKafka()
140 self.msg.connect(config["message"])
141 else:
142 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
143 config["storage"]["driver"]))
144 except (DbException, FsException, MsgException) as e:
145 self.logger.critical(str(e), exc_info=True)
146 raise LcmException(str(e))
147
148 def update_db(self, item, _id, _desc):
149 try:
150 self.db.replace(item, _id, _desc)
151 except DbException as e:
152 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
153
154 def update_db_2(self, item, _id, _desc):
155 try:
156 self.db.set_one(item, {"_id": _id}, _desc)
157 except DbException as e:
158 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
159
160 async def vim_create(self, vim_content, order_id):
161 vim_id = vim_content["_id"]
162 logging_text = "Task vim_create={} ".format(vim_id)
163 self.logger.debug(logging_text + "Enter")
164 db_vim = None
165 exc = None
166 RO_sdn_id = None
167 try:
168 step = "Getting vim-id='{}' from db".format(vim_id)
169 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
170 if "_admin" not in db_vim:
171 db_vim["_admin"] = {}
172 if "deployed" not in db_vim["_admin"]:
173 db_vim["_admin"]["deployed"] = {}
174 db_vim["_admin"]["deployed"]["RO"] = None
175 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
176 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
177 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
178 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
179 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
180 else:
181 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
182 vim_content["config"]["sdn-controller"]))
183
184 step = "Creating vim at RO"
185 RO = ROclient.ROClient(self.loop, **self.ro_config)
186 vim_RO = deepcopy(vim_content)
187 vim_RO.pop("_id", None)
188 vim_RO.pop("_admin", None)
189 vim_RO.pop("schema_version", None)
190 vim_RO.pop("schema_type", None)
191 vim_RO.pop("vim_tenant_name", None)
192 vim_RO["type"] = vim_RO.pop("vim_type")
193 vim_RO.pop("vim_user", None)
194 vim_RO.pop("vim_password", None)
195 if RO_sdn_id:
196 vim_RO["config"]["sdn-controller"] = RO_sdn_id
197 desc = await RO.create("vim", descriptor=vim_RO)
198 RO_vim_id = desc["uuid"]
199 db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
200 self.update_db("vim_accounts", vim_id, db_vim)
201
202 step = "Creating vim_account at RO"
203 vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
204 "vim_username": vim_content["vim_user"],
205 "vim_password": vim_content["vim_password"]
206 }
207 if vim_RO.get("config"):
208 vim_account_RO["config"] = vim_RO["config"]
209 if "sdn-controller" in vim_account_RO["config"]:
210 del vim_account_RO["config"]["sdn-controller"]
211 if "sdn-port-mapping" in vim_account_RO["config"]:
212 del vim_account_RO["config"]["sdn-port-mapping"]
213 await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
214 db_vim["_admin"]["operationalState"] = "ENABLED"
215 self.update_db("vim_accounts", vim_id, db_vim)
216
217 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
218 return RO_vim_id
219
220 except (ROclient.ROClientException, DbException) as e:
221 self.logger.error(logging_text + "Exit Exception {}".format(e))
222 exc = e
223 except Exception as e:
224 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
225 exc = e
226 finally:
227 if exc and db_vim:
228 db_vim["_admin"]["operationalState"] = "ERROR"
229 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
230 self.update_db("vim_accounts", vim_id, db_vim)
231
232 async def vim_edit(self, vim_content, order_id):
233 vim_id = vim_content["_id"]
234 logging_text = "Task vim_edit={} ".format(vim_id)
235 self.logger.debug(logging_text + "Enter")
236 db_vim = None
237 exc = None
238 RO_sdn_id = None
239 step = "Getting vim-id='{}' from db".format(vim_id)
240 try:
241 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
242 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
243 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
244 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
245 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
246 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
247 "RO"):
248 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
249 else:
250 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
251 vim_content["config"]["sdn-controller"]))
252
253 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
254 step = "Editing vim at RO"
255 RO = ROclient.ROClient(self.loop, **self.ro_config)
256 vim_RO = deepcopy(vim_content)
257 vim_RO.pop("_id", None)
258 vim_RO.pop("_admin", None)
259 vim_RO.pop("schema_version", None)
260 vim_RO.pop("schema_type", None)
261 vim_RO.pop("vim_tenant_name", None)
262 if "vim_type" in vim_RO:
263 vim_RO["type"] = vim_RO.pop("vim_type")
264 vim_RO.pop("vim_user", None)
265 vim_RO.pop("vim_password", None)
266 if RO_sdn_id:
267 vim_RO["config"]["sdn-controller"] = RO_sdn_id
268 # TODO make a deep update of sdn-port-mapping
269 if vim_RO:
270 await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
271
272 step = "Editing vim-account at RO tenant"
273 vim_account_RO = {}
274 if "config" in vim_content:
275 if "sdn-controller" in vim_content["config"]:
276 del vim_content["config"]["sdn-controller"]
277 if "sdn-port-mapping" in vim_content["config"]:
278 del vim_content["config"]["sdn-port-mapping"]
279 if not vim_content["config"]:
280 del vim_content["config"]
281 for k in ("vim_tenant_name", "vim_password", "config"):
282 if k in vim_content:
283 vim_account_RO[k] = vim_content[k]
284 if "vim_user" in vim_content:
285 vim_content["vim_username"] = vim_content["vim_user"]
286 if vim_account_RO:
287 await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
288 db_vim["_admin"]["operationalState"] = "ENABLED"
289 self.update_db("vim_accounts", vim_id, db_vim)
290
291 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
292 return RO_vim_id
293
294 except (ROclient.ROClientException, DbException) as e:
295 self.logger.error(logging_text + "Exit Exception {}".format(e))
296 exc = e
297 except Exception as e:
298 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
299 exc = e
300 finally:
301 if exc and db_vim:
302 db_vim["_admin"]["operationalState"] = "ERROR"
303 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
304 self.update_db("vim_accounts", vim_id, db_vim)
305
306 async def vim_delete(self, vim_id, order_id):
307 logging_text = "Task vim_delete={} ".format(vim_id)
308 self.logger.debug(logging_text + "Enter")
309 db_vim = None
310 exc = None
311 step = "Getting vim from db"
312 try:
313 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
314 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
315 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
316 RO = ROclient.ROClient(self.loop, **self.ro_config)
317 step = "Detaching vim from RO tenant"
318 try:
319 await RO.detach_datacenter(RO_vim_id)
320 except ROclient.ROClientException as e:
321 if e.http_code == 404: # not found
322 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
323 else:
324 raise
325
326 step = "Deleting vim from RO"
327 try:
328 await RO.delete("vim", RO_vim_id)
329 except ROclient.ROClientException as e:
330 if e.http_code == 404: # not found
331 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
332 else:
333 raise
334 else:
335 # nothing to delete
336 self.logger.error(logging_text + "Skipping. There is not RO information at database")
337 self.db.del_one("vim_accounts", {"_id": vim_id})
338 self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
339 return None
340
341 except (ROclient.ROClientException, DbException) as e:
342 self.logger.error(logging_text + "Exit Exception {}".format(e))
343 exc = e
344 except Exception as e:
345 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
346 exc = e
347 finally:
348 if exc and db_vim:
349 db_vim["_admin"]["operationalState"] = "ERROR"
350 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
351 self.update_db("vim_accounts", vim_id, db_vim)
352
353 async def sdn_create(self, sdn_content, order_id):
354 sdn_id = sdn_content["_id"]
355 logging_text = "Task sdn_create={} ".format(sdn_id)
356 self.logger.debug(logging_text + "Enter")
357 db_sdn = None
358 exc = None
359 try:
360 step = "Getting sdn from db"
361 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
362 if "_admin" not in db_sdn:
363 db_sdn["_admin"] = {}
364 if "deployed" not in db_sdn["_admin"]:
365 db_sdn["_admin"]["deployed"] = {}
366 db_sdn["_admin"]["deployed"]["RO"] = None
367
368 step = "Creating sdn at RO"
369 RO = ROclient.ROClient(self.loop, **self.ro_config)
370 sdn_RO = deepcopy(sdn_content)
371 sdn_RO.pop("_id", None)
372 sdn_RO.pop("_admin", None)
373 sdn_RO.pop("schema_version", None)
374 sdn_RO.pop("schema_type", None)
375 sdn_RO.pop("description", None)
376 desc = await RO.create("sdn", descriptor=sdn_RO)
377 RO_sdn_id = desc["uuid"]
378 db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
379 db_sdn["_admin"]["operationalState"] = "ENABLED"
380 self.update_db("sdns", sdn_id, db_sdn)
381 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
382 return RO_sdn_id
383
384 except (ROclient.ROClientException, DbException) as e:
385 self.logger.error(logging_text + "Exit Exception {}".format(e))
386 exc = e
387 except Exception as e:
388 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
389 exc = e
390 finally:
391 if exc and db_sdn:
392 db_sdn["_admin"]["operationalState"] = "ERROR"
393 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
394 self.update_db("sdns", sdn_id, db_sdn)
395
396 async def sdn_edit(self, sdn_content, order_id):
397 sdn_id = sdn_content["_id"]
398 logging_text = "Task sdn_edit={} ".format(sdn_id)
399 self.logger.debug(logging_text + "Enter")
400 db_sdn = None
401 exc = None
402 step = "Getting sdn from db"
403 try:
404 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
405 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
406 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
407 RO = ROclient.ROClient(self.loop, **self.ro_config)
408 step = "Editing sdn at RO"
409 sdn_RO = deepcopy(sdn_content)
410 sdn_RO.pop("_id", None)
411 sdn_RO.pop("_admin", None)
412 sdn_RO.pop("schema_version", None)
413 sdn_RO.pop("schema_type", None)
414 sdn_RO.pop("description", None)
415 if sdn_RO:
416 await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
417 db_sdn["_admin"]["operationalState"] = "ENABLED"
418 self.update_db("sdns", sdn_id, db_sdn)
419
420 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
421 return RO_sdn_id
422
423 except (ROclient.ROClientException, DbException) as e:
424 self.logger.error(logging_text + "Exit Exception {}".format(e))
425 exc = e
426 except Exception as e:
427 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
428 exc = e
429 finally:
430 if exc and db_sdn:
431 db_sdn["_admin"]["operationalState"] = "ERROR"
432 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
433 self.update_db("sdns", sdn_id, db_sdn)
434
435 async def sdn_delete(self, sdn_id, order_id):
436 logging_text = "Task sdn_delete={} ".format(sdn_id)
437 self.logger.debug(logging_text + "Enter")
438 db_sdn = None
439 exc = None
440 step = "Getting sdn from db"
441 try:
442 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
443 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
444 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
445 RO = ROclient.ROClient(self.loop, **self.ro_config)
446 step = "Deleting sdn from RO"
447 try:
448 await RO.delete("sdn", RO_sdn_id)
449 except ROclient.ROClientException as e:
450 if e.http_code == 404: # not found
451 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
452 else:
453 raise
454 else:
455 # nothing to delete
456 self.logger.error(logging_text + "Skipping. There is not RO information at database")
457 self.db.del_one("sdns", {"_id": sdn_id})
458 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
459 return None
460
461 except (ROclient.ROClientException, DbException) as e:
462 self.logger.error(logging_text + "Exit Exception {}".format(e))
463 exc = e
464 except Exception as e:
465 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
466 exc = e
467 finally:
468 if exc and db_sdn:
469 db_sdn["_admin"]["operationalState"] = "ERROR"
470 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
471 self.update_db("sdns", sdn_id, db_sdn)
472
473 def vnfd2RO(self, vnfd, new_id=None):
474 """
475 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
476 :param vnfd: input vnfd
477 :param new_id: overrides vnf id if provided
478 :return: copy of vnfd
479 """
480 ci_file = None
481 try:
482 vnfd_RO = deepcopy(vnfd)
483 vnfd_RO.pop("_id", None)
484 vnfd_RO.pop("_admin", None)
485 if new_id:
486 vnfd_RO["id"] = new_id
487 for vdu in vnfd_RO["vdu"]:
488 if "cloud-init-file" in vdu:
489 base_folder = vnfd["_admin"]["storage"]
490 clout_init_file = "{}/{}/cloud_init/{}".format(
491 base_folder["folder"],
492 base_folder["pkg-dir"],
493 vdu["cloud-init-file"]
494 )
495 ci_file = self.fs.file_open(clout_init_file, "r")
496 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
497 # convert to base 64 or similar
498 clout_init_content = ci_file.read()
499 ci_file.close()
500 ci_file = None
501 vdu.pop("cloud-init-file", None)
502 vdu["cloud-init"] = clout_init_content
503 return vnfd_RO
504 except FsException as e:
505 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
506 finally:
507 if ci_file:
508 ci_file.close()
509
510 def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, member_vnf_index,
511 task=None):
512 """
513 Callback both for charm status change and task completion
514 :param model_name: Charm model name
515 :param application_name: Charm application name
516 :param status: Can be
517 - blocked: The unit needs manual intervention
518 - maintenance: The unit is actively deploying/configuring
519 - waiting: The unit is waiting for another charm to be ready
520 - active: The unit is deployed, configured, and ready
521 - error: The charm has failed and needs attention.
522 - terminated: The charm has been destroyed
523 - removing,
524 - removed
525 :param message: detailed message error
526 :param db_nsr: nsr database content
527 :param db_nslcmop: nslcmop database content
528 :param member_vnf_index: NSD member-vnf-index
529 :param task: None for charm status change, or task for completion task callback
530 :return:
531 """
532 nsr_id = None
533 nslcmop_id = None
534 update_nsr = update_nslcmop = False
535 try:
536 nsr_id = db_nsr["_id"]
537 nslcmop_id = db_nslcmop["_id"]
538 nsr_lcm = db_nsr["_admin"]["deployed"]
539 ns_action = db_nslcmop["lcmOperationType"]
540 logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
541 member_vnf_index)
542
543 if task:
544 if task.cancelled():
545 self.logger.debug(logging_text + " task Cancelled")
546 # TODO update db_nslcmop
547 return
548
549 if task.done():
550 exc = task.exception()
551 if exc:
552 self.logger.error(logging_text + " task Exception={}".format(exc))
553 if ns_action in ("instantiate", "terminate"):
554 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error"
555 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc)
556 elif ns_action == "action":
557 db_nslcmop["operationState"] = "FAILED"
558 db_nslcmop["detailed-status"] = str(exc)
559 db_nslcmop["statusEnteredTime"] = time()
560 update_nslcmop = True
561 return
562
563 else:
564 self.logger.debug(logging_text + " task Done")
565 # TODO revise with Adam if action is finished and ok when task is done
566 if ns_action == "action":
567 db_nslcmop["operationState"] = "COMPLETED"
568 db_nslcmop["detailed-status"] = "Done"
569 db_nslcmop["statusEnteredTime"] = time()
570 update_nslcmop = True
571 # task is Done, but callback is still ongoing. So ignore
572 return
573 elif status:
574 self.logger.debug(logging_text + " Enter status={}".format(status))
575 if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status:
576 return # same status, ignore
577 nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status
578 nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message)
579 else:
580 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
581 return
582
583 all_active = True
584 status_map = {}
585 n2vc_error_text = [] # contain text error list. If empty no one is in error status
586 for vnf_index, vca_info in nsr_lcm["VCA"].items():
587 vca_status = vca_info["operational-status"]
588 if vca_status not in status_map:
589 # Initialize it
590 status_map[vca_status] = 0
591 status_map[vca_status] += 1
592
593 if vca_status != "active":
594 all_active = False
595 elif vca_status in ("error", "blocked"):
596 n2vc_error_text.append("member_vnf_index={} {}: {}".format(member_vnf_index, vca_status,
597 vca_info["detailed-status"]))
598
599 if all_active:
600 self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id,
601 member_vnf_index))
602 db_nsr["config-status"] = "configured"
603 db_nsr["detailed-status"] = "done"
604 db_nslcmop["operationState"] = "COMPLETED"
605 db_nslcmop["detailed-status"] = "Done"
606 db_nslcmop["statusEnteredTime"] = time()
607 elif n2vc_error_text:
608 db_nsr["config-status"] = "failed"
609 error_text = "fail configuring " + ";".join(n2vc_error_text)
610 db_nsr["detailed-status"] = error_text
611 db_nslcmop["operationState"] = "FAILED_TEMP"
612 db_nslcmop["detailed-status"] = error_text
613 db_nslcmop["statusEnteredTime"] = time()
614 else:
615 cs = "configuring: "
616 separator = ""
617 for status, num in status_map.items():
618 cs += separator + "{}: {}".format(status, num)
619 separator = ", "
620 db_nsr["config-status"] = cs
621 db_nsr["detailed-status"] = cs
622 db_nslcmop["detailed-status"] = cs
623 update_nsr = update_nslcmop = True
624
625 except Exception as e:
626 self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True)
627 finally:
628 try:
629 if update_nslcmop:
630 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
631 if update_nsr:
632 self.update_db("nsrs", nsr_id, db_nsr)
633 except Exception as e:
634 self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
635 member_vnf_index, e), exc_info=True)
636
637 def ns_params_2_RO(self, ns_params):
638 """
639 Creates a RO ns descriptor from OSM ns_instantite params
640 :param ns_params: OSM instantiate params
641 :return: The RO ns descriptor
642 """
643 vim_2_RO = {}
644
645 def vim_account_2_RO(vim_account):
646 if vim_account in vim_2_RO:
647 return vim_2_RO[vim_account]
648 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
649 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
650 # #TODO check if VIM is creating and wait
651 if db_vim["_admin"]["operationalState"] != "ENABLED":
652 raise LcmException("VIM={} is not available. operationalState={}".format(
653 vim_account, db_vim["_admin"]["operationalState"]))
654 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
655 vim_2_RO[vim_account] = RO_vim_id
656 return RO_vim_id
657
658 if not ns_params:
659 return None
660 RO_ns_params = {
661 # "name": ns_params["nsName"],
662 # "description": ns_params.get("nsDescription"),
663 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
664 # "scenario": ns_params["nsdId"],
665 "vnfs": {},
666 "networks": {},
667 }
668 if ns_params.get("ssh-authorized-key"):
669 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}
670 if ns_params.get("vnf"):
671 for vnf in ns_params["vnf"]:
672 RO_vnf = {}
673 if "vimAccountId" in vnf:
674 RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
675 if RO_vnf:
676 RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf
677 if ns_params.get("vld"):
678 for vld in ns_params["vld"]:
679 RO_vld = {}
680 if "ip-profile" in vld:
681 RO_vld["ip-profile"] = vld["ip-profile"]
682 if "vim-network-name" in vld:
683 RO_vld["sites"] = []
684 if isinstance(vld["vim-network-name"], dict):
685 for vim_account, vim_net in vld["vim-network-name"].items():
686 RO_vld["sites"].append({
687 "netmap-use": vim_net,
688 "datacenter": vim_account_2_RO(vim_account)
689 })
690 else: # isinstance str
691 RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
692 if RO_vld:
693 RO_ns_params["networks"][vld["name"]] = RO_vld
694 return RO_ns_params
695
696 async def ns_instantiate(self, nsr_id, nslcmop_id):
697 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
698 self.logger.debug(logging_text + "Enter")
699 # get all needed from database
700 db_nsr = None
701 db_nslcmop = None
702 db_vnfr = {}
703 exc = None
704 try:
705 step = "Getting nslcmop={} from db".format(nslcmop_id)
706 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
707 step = "Getting nsr={} from db".format(nsr_id)
708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
709 nsd = db_nsr["nsd"]
710 nsr_name = db_nsr["name"] # TODO short-name??
711 needed_vnfd = {}
712 vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
713 for c_vnf in nsd["constituent-vnfd"]:
714 vnfd_id = c_vnf["vnfd-id-ref"]
715 vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
716 db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
717 if vnfd_id not in needed_vnfd:
718 step = "Getting vnfd={} from db".format(vnfd_id)
719 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
720
721 nsr_lcm = db_nsr["_admin"].get("deployed")
722 if not nsr_lcm:
723 nsr_lcm = db_nsr["_admin"]["deployed"] = {
724 "id": nsr_id,
725 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
726 "nsr_ip": {},
727 "VCA": {},
728 }
729 db_nsr["detailed-status"] = "creating"
730 db_nsr["operational-status"] = "init"
731
732 RO = ROclient.ROClient(self.loop, **self.ro_config)
733
734 # get vnfds, instantiate at RO
735 for vnfd_id, vnfd in needed_vnfd.items():
736 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
737 # self.logger.debug(logging_text + step)
738 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
739
740 # look if present
741 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
742 if vnfd_list:
743 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
744 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
745 vnfd_id, vnfd_list[0]["uuid"]))
746 else:
747 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
748 desc = await RO.create("vnfd", descriptor=vnfd_RO)
749 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
750 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
751 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
752 vnfd_id, desc["uuid"]))
753 self.update_db("nsrs", nsr_id, db_nsr)
754
755 # create nsd at RO
756 nsd_id = nsd["id"]
757 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
758 # self.logger.debug(logging_text + step)
759
760 nsd_id_RO = nsr_id + "." + nsd_id[:200]
761 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
762 if nsd_list:
763 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
764 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
765 nsd_id, nsd_list[0]["uuid"]))
766 else:
767 nsd_RO = deepcopy(nsd)
768 nsd_RO["id"] = nsd_id_RO
769 nsd_RO.pop("_id", None)
770 nsd_RO.pop("_admin", None)
771 for c_vnf in nsd_RO["constituent-vnfd"]:
772 vnfd_id = c_vnf["vnfd-id-ref"]
773 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
774 desc = await RO.create("nsd", descriptor=nsd_RO)
775 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
776 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
777 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, desc["uuid"]))
778 self.update_db("nsrs", nsr_id, db_nsr)
779
780 # Crate ns at RO
781 # if present use it unless in error status
782 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
783 if RO_nsr_id:
784 try:
785 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
786 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
787 desc = await RO.show("ns", RO_nsr_id)
788 except ROclient.ROClientException as e:
789 if e.http_code != HTTPStatus.NOT_FOUND:
790 raise
791 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
792 if RO_nsr_id:
793 ns_status, ns_status_info = RO.check_ns_status(desc)
794 nsr_lcm["RO"]["nsr_status"] = ns_status
795 if ns_status == "ERROR":
796 step = db_nsr["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
797 self.logger.debug(logging_text + step)
798 await RO.delete("ns", RO_nsr_id)
799 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
800 if not RO_nsr_id:
801 step = db_nsr["detailed-status"] = "Creating ns at RO"
802 # self.logger.debug(logging_text + step)
803 RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
804 desc = await RO.create("ns", descriptor=RO_ns_params,
805 name=db_nsr["name"],
806 scenario=nsr_lcm["RO"]["nsd_id"])
807 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
808 db_nsr["_admin"]["nsState"] = "INSTANTIATED"
809 nsr_lcm["RO"]["nsr_status"] = "BUILD"
810 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
811 self.update_db("nsrs", nsr_id, db_nsr)
812
813 # update VNFR vimAccount
814 step = "Updating VNFR vimAcccount"
815 for vnf_index, vnfr in db_vnfr.items():
816 if vnfr.get("vim-account-id"):
817 continue
818 vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
819 if db_nsr["instantiate_params"].get("vnf"):
820 for vnf_params in db_nsr["instantiate_params"]["vnf"]:
821 if vnf_params.get("member-vnf-index") == vnf_index:
822 if vnf_params.get("vimAccountId"):
823 vnfr["vim-account-id"] = vnf_params.get("vimAccountId")
824 break
825 self.update_db("vnfrs", vnfr["_id"], vnfr)
826
827 # wait until NS is ready
828 step = ns_status_detailed = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
829 db_nsr["detailed-status"] = ns_status_detailed
830 self.logger.debug(logging_text + step)
831 deployment_timeout = 2 * 3600 # Two hours
832 while deployment_timeout > 0:
833 desc = await RO.show("ns", RO_nsr_id)
834 ns_status, ns_status_info = RO.check_ns_status(desc)
835 nsr_lcm["RO"]["nsr_status"] = ns_status
836 if ns_status == "ERROR":
837 raise ROclient.ROClientException(ns_status_info)
838 elif ns_status == "BUILD":
839 db_nsr_detailed_status_old = db_nsr["detailed-status"]
840 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
841 if db_nsr_detailed_status_old != db_nsr["detailed-status"]:
842 self.update_db("nsrs", nsr_id, db_nsr)
843 elif ns_status == "ACTIVE":
844 step = "Waiting for management IP address from VIM"
845 try:
846 ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
847 break
848 except ROclient.ROClientException as e:
849 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
850 raise e
851 else:
852 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
853 await asyncio.sleep(5, loop=self.loop)
854 deployment_timeout -= 5
855 if deployment_timeout <= 0:
856 raise ROclient.ROClientException("Timeout waiting ns to be ready")
857
858 step = "Updating VNFRs"
859 for vnf_index, vnfr_deployed in ns_RO_info.items():
860 vnfr = db_vnfr[vnf_index]
861 vnfr["ip-address"] = vnfr_deployed.get("ip_address")
862 for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
863 for vdur in vnfr["vdur"]:
864 if vdur["vdu-id-ref"] == vdu_id:
865 vdur["vim-id"] = vdu_deployed.get("vim_id")
866 vdur["ip-address"] = vdu_deployed.get("ip_address")
867 break
868 self.update_db("vnfrs", vnfr["_id"], vnfr)
869
870 db_nsr["detailed-status"] = "Configuring vnfr"
871 self.update_db("nsrs", nsr_id, db_nsr)
872
873 # The parameters we'll need to deploy a charm
874 number_to_configure = 0
875
876 def deploy():
877 """An inner function to deploy the charm from either vnf or vdu
878 """
879
880 # Login to the VCA.
881 # if number_to_configure == 0:
882 # self.logger.debug("Logging into N2VC...")
883 # task = asyncio.ensure_future(self.n2vc.login())
884 # yield from asyncio.wait_for(task, 30.0)
885 # self.logger.debug("Logged into N2VC!")
886
887 # # await self.n2vc.login()
888
889 # Note: The charm needs to exist on disk at the location
890 # specified by charm_path.
891 base_folder = vnfd["_admin"]["storage"]
892 storage_params = self.fs.get_params()
893 charm_path = "{}{}/{}/charms/{}".format(
894 storage_params["path"],
895 base_folder["folder"],
896 base_folder["pkg-dir"],
897 proxy_charm
898 )
899
900 # Setup the runtime parameters for this VNF
901 params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
902
903 # ns_name will be ignored in the current version of N2VC
904 # but will be implemented for the next point release.
905 model_name = 'default'
906 application_name = self.n2vc.FormatApplicationName(
907 nsr_name,
908 vnf_index,
909 vnfd['name'],
910 )
911
912 nsr_lcm["VCA"][vnf_index] = {
913 "model": model_name,
914 "application": application_name,
915 "operational-status": "init",
916 "detailed-status": "",
917 "vnfd_id": vnfd_id,
918 }
919
920 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
921 proxy_charm))
922 task = asyncio.ensure_future(
923 self.n2vc.DeployCharms(
924 model_name, # The network service name
925 application_name, # The application name
926 vnfd, # The vnf descriptor
927 charm_path, # Path to charm
928 params, # Runtime params, like mgmt ip
929 {}, # for native charms only
930 self.n2vc_callback, # Callback for status changes
931 db_nsr, # Callback parameter
932 db_nslcmop,
933 vnf_index, # Callback parameter
934 None, # Callback parameter (task)
935 )
936 )
937 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
938 db_nsr, db_nslcmop, vnf_index))
939 self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
940
941 # TODO: Make this call inside deploy()
942 # Login to the VCA. If there are multiple calls to login(),
943 # subsequent calls will be a nop and return immediately.
944 await self.n2vc.login()
945
946 step = "Looking for needed vnfd to configure"
947 self.logger.debug(logging_text + step)
948 for c_vnf in nsd["constituent-vnfd"]:
949 vnfd_id = c_vnf["vnfd-id-ref"]
950 vnf_index = str(c_vnf["member-vnf-index"])
951 vnfd = needed_vnfd[vnfd_id]
952
953 # Check if this VNF has a charm configuration
954 vnf_config = vnfd.get("vnf-configuration")
955
956 if vnf_config and vnf_config.get("juju"):
957 proxy_charm = vnf_config["juju"]["charm"]
958 params = {}
959
960 if proxy_charm:
961 if 'initial-config-primitive' in vnf_config:
962 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
963
964 deploy()
965 number_to_configure += 1
966
967 # Deploy charms for each VDU that supports one.
968 for vdu in vnfd['vdu']:
969 vdu_config = vdu.get('vdu-configuration')
970 proxy_charm = None
971 params = {}
972
973 if vdu_config and vdu_config.get("juju"):
974 proxy_charm = vdu_config["juju"]["charm"]
975
976 if 'initial-config-primitive' in vdu_config:
977 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
978
979 if proxy_charm:
980 deploy()
981 number_to_configure += 1
982
983 if number_to_configure:
984 db_nsr["config-status"] = "configuring"
985 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
986 db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
987 else:
988 db_nslcmop["operationState"] = "COMPLETED"
989 db_nslcmop["statusEnteredTime"] = time()
990 db_nslcmop["detailed-status"] = "done"
991 db_nsr["config-status"] = "configured"
992 db_nsr["detailed-status"] = "done"
993 db_nsr["operational-status"] = "running"
994 self.update_db("nsrs", nsr_id, db_nsr)
995 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
996 self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
997 return nsr_lcm
998
999 except (ROclient.ROClientException, DbException, LcmException) as e:
1000 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1001 exc = e
1002 except Exception as e:
1003 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1004 exc_info=True)
1005 exc = e
1006 finally:
1007 if exc:
1008 if db_nsr:
1009 db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
1010 db_nsr["operational-status"] = "failed"
1011 self.update_db("nsrs", nsr_id, db_nsr)
1012 if db_nslcmop:
1013 db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
1014 db_nslcmop["operationState"] = "FAILED"
1015 db_nslcmop["statusEnteredTime"] = time()
1016 self.update_db("nslcmops", nslcmop_id, db_nslcmop)
1017
1018 async def ns_terminate(self, nsr_id, nslcmop_id):
1019 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1020 self.logger.debug(logging_text + "Enter")
1021 db_nsr = None
1022 db_nslcmop = None
1023 exc = None
1024 step = "Getting nsr, nslcmop from db"
1025 failed_detail = [] # annotates all failed error messages
1026 vca_task_list = []
1027 vca_task_dict = {}
1028 try:
1029 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1030 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1031 # nsd = db_nsr["nsd"]
1032 nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
1033 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1034 return
1035 # TODO ALF remove
1036 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1037 # #TODO check if VIM is creating and wait
1038 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1039
1040 db_nsr_update = {
1041 "operational-status": "terminating",
1042 "config-status": "terminating",
1043 "detailed-status": "Deleting charms",
1044 }
1045 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1046
1047 try:
1048 self.logger.debug(logging_text + step)
1049 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
1050 if deploy_info and deploy_info.get("application"):
1051 task = asyncio.ensure_future(
1052 self.n2vc.RemoveCharms(
1053 deploy_info['model'],
1054 deploy_info['application'],
1055 # self.n2vc_callback,
1056 # db_nsr,
1057 # db_nslcmop,
1058 # vnf_index,
1059 )
1060 )
1061 vca_task_list.append(task)
1062 vca_task_dict[vnf_index] = task
1063 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1064 # deploy_info['application'], None, db_nsr,
1065 # db_nslcmop, vnf_index))
1066 self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
1067 except Exception as e:
1068 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1069 # remove from RO
1070
1071 RO = ROclient.ROClient(self.loop, **self.ro_config)
1072 # Delete ns
1073 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1074 if RO_nsr_id:
1075 try:
1076 step = db_nsr["detailed-status"] = "Deleting ns at RO"
1077 self.logger.debug(logging_text + step)
1078 await RO.delete("ns", RO_nsr_id)
1079 nsr_lcm["RO"]["nsr_id"] = None
1080 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1081 except ROclient.ROClientException as e:
1082 if e.http_code == 404: # not found
1083 nsr_lcm["RO"]["nsr_id"] = None
1084 nsr_lcm["RO"]["nsr_status"] = "DELETED"
1085 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1086 elif e.http_code == 409: # conflict
1087 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1088 self.logger.debug(logging_text + failed_detail[-1])
1089 else:
1090 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1091 self.logger.error(logging_text + failed_detail[-1])
1092
1093 # Delete nsd
1094 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1095 if RO_nsd_id:
1096 try:
1097 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
1098 await RO.delete("nsd", RO_nsd_id)
1099 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1100 nsr_lcm["RO"]["nsd_id"] = None
1101 except ROclient.ROClientException as e:
1102 if e.http_code == 404: # not found
1103 nsr_lcm["RO"]["nsd_id"] = None
1104 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1105 elif e.http_code == 409: # conflict
1106 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1107 self.logger.debug(logging_text + failed_detail[-1])
1108 else:
1109 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1110 self.logger.error(logging_text + failed_detail[-1])
1111
1112 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1113 if not RO_vnfd_id:
1114 continue
1115 try:
1116 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
1117 await RO.delete("vnfd", RO_vnfd_id)
1118 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1119 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1120 except ROclient.ROClientException as e:
1121 if e.http_code == 404: # not found
1122 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
1123 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1124 elif e.http_code == 409: # conflict
1125 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1126 self.logger.debug(logging_text + failed_detail[-1])
1127 else:
1128 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1129 self.logger.error(logging_text + failed_detail[-1])
1130
1131 if vca_task_list:
1132 await asyncio.wait(vca_task_list, timeout=300)
1133 for vnf_index, task in vca_task_dict.items():
1134 if task.cancelled():
1135 failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
1136 elif task.done():
1137 exc = task.exception()
1138 if exc:
1139 failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
1140 else:
1141 nsr_lcm["VCA"][vnf_index] = None
1142 else: # timeout
1143 # TODO Should it be cancelled?!!
1144 task.cancel()
1145 failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))
1146
1147 if failed_detail:
1148 self.logger.error(logging_text + " ;".join(failed_detail))
1149 db_nsr_update = {
1150 "operational-status": "failed",
1151 "detailed-status": "Deletion errors " + "; ".join(failed_detail),
1152 "_admin.deployed": nsr_lcm
1153 }
1154 db_nslcmop_update = {
1155 "detailed-status": "; ".join(failed_detail),
1156 "operationState": "FAILED",
1157 "statusEnteredTime": time()
1158 }
1159 elif db_nslcmop["operationParams"].get("autoremove"):
1160 self.db.del_one("nsrs", {"_id": nsr_id})
1161 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1162 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1163 self.logger.debug(logging_text + "Delete from database")
1164 else:
1165 db_nsr_update = {
1166 "operational-status": "terminated",
1167 "detailed-status": "Done",
1168 "_admin.deployed": nsr_lcm,
1169 "_admin.nsState": "NOT_INSTANTIATED"
1170 }
1171 db_nslcmop_update = {
1172 "detailed-status": "Done",
1173 "operationState": "COMPLETED",
1174 "statusEnteredTime": time()
1175 }
1176 self.logger.debug(logging_text + "Exit")
1177
1178 except (ROclient.ROClientException, DbException) as e:
1179 self.logger.error(logging_text + "Exit Exception {}".format(e))
1180 exc = e
1181 except Exception as e:
1182 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1183 exc = e
1184 finally:
1185 if exc and db_nslcmop:
1186 db_nslcmop_update = {
1187 "detailed-status": "FAILED {}: {}".format(step, exc),
1188 "operationState": "FAILED",
1189 "statusEnteredTime": time(),
1190 }
1191 if db_nslcmop_update:
1192 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1193 if db_nsr_update:
1194 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1195
1196 async def ns_action(self, nsr_id, nslcmop_id):
1197 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1198 self.logger.debug(logging_text + "Enter")
1199 # get all needed from database
1200 db_nsr = None
1201 db_nslcmop = None
1202 db_nslcmop_update = None
1203 exc = None
1204 try:
1205 step = "Getting information from database"
1206 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1207 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1208 nsr_lcm = db_nsr["_admin"].get("deployed")
1209 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1210
1211 # TODO check if ns is in a proper status
1212 vca_deployed = nsr_lcm["VCA"].get(vnf_index)
1213 if not vca_deployed:
1214 raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index))
1215 model_name = vca_deployed.get("model")
1216 application_name = vca_deployed.get("application")
1217 if not model_name or not application_name:
1218 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index))
1219 if vca_deployed["operational-status"] != "active":
1220 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1221 vnf_index, vca_deployed["operational-status"]))
1222 primitive = db_nslcmop["operationParams"]["primitive"]
1223 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1224 callback = None # self.n2vc_callback
1225 callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
1226 await self.n2vc.login()
1227 task = asyncio.ensure_future(
1228 self.n2vc.ExecutePrimitive(
1229 model_name,
1230 application_name,
1231 primitive, callback,
1232 *callback_args,
1233 **primitive_params
1234 )
1235 )
1236 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1237 # db_nsr, db_nslcmop, vnf_index))
1238 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1239 # wait until completed with timeout
1240 await asyncio.wait((task,), timeout=300)
1241
1242 result = "FAILED" # by default
1243 result_detail = ""
1244 if task.cancelled():
1245 db_nslcmop["detailed-status"] = "Task has been cancelled"
1246 elif task.done():
1247 exc = task.exception()
1248 if exc:
1249 result_detail = str(exc)
1250 else:
1251 self.logger.debug(logging_text + " task Done")
1252 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1253 result = "COMPLETED"
1254 result_detail = "Done"
1255 else: # timeout
1256 # TODO Should it be cancelled?!!
1257 task.cancel()
1258 result_detail = "timeout"
1259
1260 db_nslcmop_update = {
1261 "detailed-status": result_detail,
1262 "operationState": result,
1263 "statusEnteredTime": time()
1264 }
1265 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1266 return # database update is called inside finally
1267
1268 except (DbException, LcmException) as e:
1269 self.logger.error(logging_text + "Exit Exception {}".format(e))
1270 exc = e
1271 except Exception as e:
1272 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1273 exc = e
1274 finally:
1275 if exc and db_nslcmop:
1276 db_nslcmop_update = {
1277 "detailed-status": "FAILED {}: {}".format(step, exc),
1278 "operationState": "FAILED",
1279 "statusEnteredTime": time(),
1280 }
1281 if db_nslcmop_update:
1282 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1283
1284 async def test(self, param=None):
1285 self.logger.debug("Starting/Ending test task: {}".format(param))
1286
1287 def cancel_tasks(self, topic, _id):
1288 """
1289 Cancel all active tasks of a concrete nsr or vim identified for _id
1290 :param topic: can be ns or vim_account
1291 :param _id: nsr or vim identity
1292 :return: None, or raises an exception if not possible
1293 """
1294 if topic == "ns":
1295 lcm_tasks = self.lcm_ns_tasks
1296 elif topic == "vim_account":
1297 lcm_tasks = self.lcm_vim_tasks
1298 elif topic == "sdn":
1299 lcm_tasks = self.lcm_sdn_tasks
1300
1301 if not lcm_tasks.get(_id):
1302 return
1303 for order_id, tasks_set in lcm_tasks[_id].items():
1304 for task_name, task in tasks_set.items():
1305 result = task.cancel()
1306 if result:
1307 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
1308 lcm_tasks[_id] = {}
1309
1310 async def kafka_ping(self):
1311 self.logger.debug("Task kafka_ping Enter")
1312 consecutive_errors = 0
1313 first_start = True
1314 kafka_has_received = False
1315 self.pings_not_received = 1
1316 while True:
1317 try:
1318 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
1319 # time between pings are low when it is not received and at starting
1320 wait_time = 5 if not kafka_has_received else 120
1321 if not self.pings_not_received:
1322 kafka_has_received = True
1323 self.pings_not_received += 1
1324 await asyncio.sleep(wait_time, loop=self.loop)
1325 if self.pings_not_received > 10:
1326 raise LcmException("It is not receiving pings from Kafka bus")
1327 consecutive_errors = 0
1328 first_start = False
1329 except LcmException:
1330 raise
1331 except Exception as e:
1332 # if not first_start is the first time after starting. So leave more time and wait
1333 # to allow kafka starts
1334 if consecutive_errors == 8 if not first_start else 30:
1335 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1336 raise
1337 consecutive_errors += 1
1338 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1339 wait_time = 1 if not first_start else 5
1340 await asyncio.sleep(wait_time, loop=self.loop)
1341
1342 async def kafka_read(self):
1343 self.logger.debug("Task kafka_read Enter")
1344 order_id = 1
1345 # future = asyncio.Future()
1346 consecutive_errors = 0
1347 first_start = True
1348 while consecutive_errors < 10:
1349 try:
1350 topics = ("admin", "ns", "vim_account", "sdn")
1351 topic, command, params = await self.msg.aioread(topics, self.loop)
1352 if topic != "admin" and command != "ping":
1353 self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
1354 consecutive_errors = 0
1355 first_start = False
1356 order_id += 1
1357 if command == "exit":
1358 print("Bye!")
1359 break
1360 elif command.startswith("#"):
1361 continue
1362 elif command == "echo":
1363 # just for test
1364 print(params)
1365 sys.stdout.flush()
1366 continue
1367 elif command == "test":
1368 asyncio.Task(self.test(params), loop=self.loop)
1369 continue
1370
1371 if topic == "admin":
1372 if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
1373 self.pings_not_received = 0
1374 continue
1375 elif topic == "ns":
1376 if command == "instantiate":
1377 # self.logger.debug("Deploying NS {}".format(nsr_id))
1378 nslcmop = params
1379 nslcmop_id = nslcmop["_id"]
1380 nsr_id = nslcmop["nsInstanceId"]
1381 task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
1382 if nsr_id not in self.lcm_ns_tasks:
1383 self.lcm_ns_tasks[nsr_id] = {}
1384 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task}
1385 continue
1386 elif command == "terminate":
1387 # self.logger.debug("Deleting NS {}".format(nsr_id))
1388 nslcmop = params
1389 nslcmop_id = nslcmop["_id"]
1390 nsr_id = nslcmop["nsInstanceId"]
1391 self.cancel_tasks(topic, nsr_id)
1392 task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
1393 if nsr_id not in self.lcm_ns_tasks:
1394 self.lcm_ns_tasks[nsr_id] = {}
1395 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task}
1396 continue
1397 elif command == "action":
1398 # self.logger.debug("Update NS {}".format(nsr_id))
1399 nslcmop = params
1400 nslcmop_id = nslcmop["_id"]
1401 nsr_id = nslcmop["nsInstanceId"]
1402 task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
1403 if nsr_id not in self.lcm_ns_tasks:
1404 self.lcm_ns_tasks[nsr_id] = {}
1405 self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
1406 continue
1407 elif command == "show":
1408 try:
1409 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1410 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
1411 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
1412 "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
1413 db_nsr["detailed-status"],
1414 db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
1415 except Exception as e:
1416 print("nsr {} not found: {}".format(nsr_id, e))
1417 sys.stdout.flush()
1418 continue
1419 elif command == "deleted":
1420 continue # TODO cleaning of task just in case should be done
1421 elif topic == "vim_account":
1422 vim_id = params["_id"]
1423 if command == "create":
1424 task = asyncio.ensure_future(self.vim_create(params, order_id))
1425 if vim_id not in self.lcm_vim_tasks:
1426 self.lcm_vim_tasks[vim_id] = {}
1427 self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task}
1428 continue
1429 elif command == "delete":
1430 self.cancel_tasks(topic, vim_id)
1431 task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
1432 if vim_id not in self.lcm_vim_tasks:
1433 self.lcm_vim_tasks[vim_id] = {}
1434 self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task}
1435 continue
1436 elif command == "show":
1437 print("not implemented show with vim_account")
1438 sys.stdout.flush()
1439 continue
1440 elif command == "edit":
1441 task = asyncio.ensure_future(self.vim_edit(params, order_id))
1442 if vim_id not in self.lcm_vim_tasks:
1443 self.lcm_vim_tasks[vim_id] = {}
1444 self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task}
1445 continue
1446 elif topic == "sdn":
1447 _sdn_id = params["_id"]
1448 if command == "create":
1449 task = asyncio.ensure_future(self.sdn_create(params, order_id))
1450 if _sdn_id not in self.lcm_sdn_tasks:
1451 self.lcm_sdn_tasks[_sdn_id] = {}
1452 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task}
1453 continue
1454 elif command == "delete":
1455 self.cancel_tasks(topic, _sdn_id)
1456 task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
1457 if _sdn_id not in self.lcm_sdn_tasks:
1458 self.lcm_sdn_tasks[_sdn_id] = {}
1459 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task}
1460 continue
1461 elif command == "edit":
1462 task = asyncio.ensure_future(self.sdn_edit(params, order_id))
1463 if _sdn_id not in self.lcm_sdn_tasks:
1464 self.lcm_sdn_tasks[_sdn_id] = {}
1465 self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task}
1466 continue
1467 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1468 except Exception as e:
1469 # if not first_start is the first time after starting. So leave more time and wait
1470 # to allow kafka starts
1471 if consecutive_errors == 8 if not first_start else 30:
1472 self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
1473 raise
1474 consecutive_errors += 1
1475 self.logger.error("Task kafka_read retrying after Exception {}".format(e))
1476 wait_time = 2 if not first_start else 5
1477 await asyncio.sleep(wait_time, loop=self.loop)
1478
1479 # self.logger.debug("Task kafka_read terminating")
1480 self.logger.debug("Task kafka_read exit")
1481
1482 def start(self):
1483 self.loop = asyncio.get_event_loop()
1484 self.loop.run_until_complete(asyncio.gather(
1485 self.kafka_read(),
1486 self.kafka_ping()
1487 ))
1488 # TODO
1489 # self.logger.debug("Terminating cancelling creation tasks")
1490 # self.cancel_tasks("ALL", "create")
1491 # timeout = 200
1492 # while self.is_pending_tasks():
1493 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1494 # await asyncio.sleep(2, loop=self.loop)
1495 # timeout -= 2
1496 # if not timeout:
1497 # self.cancel_tasks("ALL", "ALL")
1498 self.loop.close()
1499 self.loop = None
1500 if self.db:
1501 self.db.db_disconnect()
1502 if self.msg:
1503 self.msg.disconnect()
1504 if self.fs:
1505 self.fs.fs_disconnect()
1506
1507 def read_config_file(self, config_file):
1508 # TODO make a [ini] + yaml inside parser
1509 # the configparser library is not suitable, because it does not admit comments at the end of line,
1510 # and not parse integer or boolean
1511 try:
1512 with open(config_file) as f:
1513 conf = yaml.load(f)
1514 for k, v in environ.items():
1515 if not k.startswith("OSMLCM_"):
1516 continue
1517 k_items = k.lower().split("_")
1518 c = conf
1519 try:
1520 for k_item in k_items[1:-1]:
1521 if k_item in ("ro", "vca"):
1522 # put in capital letter
1523 k_item = k_item.upper()
1524 c = c[k_item]
1525 if k_items[-1] == "port":
1526 c[k_items[-1]] = int(v)
1527 else:
1528 c[k_items[-1]] = v
1529 except Exception as e:
1530 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1531
1532 return conf
1533 except Exception as e:
1534 self.logger.critical("At config file '{}': {}".format(config_file, e))
1535 exit(1)
1536
1537
1538 def usage():
1539 print("""Usage: {} [options]
1540 -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
1541 -h|--help: shows this help
1542 """.format(sys.argv[0]))
1543 # --log-socket-host HOST: send logs to this host")
1544 # --log-socket-port PORT: send logs using this port (default: 9022)")
1545
1546
1547 if __name__ == '__main__':
1548 try:
1549 # load parameters and configuration
1550 opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
1551 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
1552 config_file = None
1553 for o, a in opts:
1554 if o in ("-h", "--help"):
1555 usage()
1556 sys.exit()
1557 elif o in ("-c", "--config"):
1558 config_file = a
1559 # elif o == "--log-socket-port":
1560 # log_socket_port = a
1561 # elif o == "--log-socket-host":
1562 # log_socket_host = a
1563 # elif o == "--log-file":
1564 # log_file = a
1565 else:
1566 assert False, "Unhandled option"
1567 if config_file:
1568 if not path.isfile(config_file):
1569 print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
1570 exit(1)
1571 else:
1572 for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
1573 if path.isfile(config_file):
1574 break
1575 else:
1576 print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
1577 exit(1)
1578 lcm = Lcm(config_file)
1579 lcm.start()
1580 except getopt.GetoptError as e:
1581 print(str(e), file=sys.stderr)
1582 # usage()
1583 exit(1)