Allow ns-creation params: vim-network, multisite deployment
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26 from time import time
27
28
class LcmException(Exception):
    """Error raised by the LCM code itself (e.g. invalid configuration, unreadable vnfd file, unavailable VIM)."""
31
32
33 class Lcm:
34
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration source handed to self.read_config_file(). The loaded configuration is used as
        a two level dictionary with the sections 'global', 'database', 'storage', 'message', 'RO' and 'VCA'
    :return: None
    :raises LcmException: if a driver name is not supported or a database/storage/message connection fails
    """
    # Imported explicitly: 'import logging' alone does not guarantee that the 'logging.handlers' submodule
    # is loaded, which made the RotatingFileHandler references below fail with AttributeError.
    from logging.handlers import RotatingFileHandler

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging: the 'lcm' logger writes to a rotating file if 'logfile' is configured, otherwise to a stream
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"],
                                           maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules: each driver section gets its logger name, and optionally its own file and level
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"],
                                               maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )

    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending *message* driver (the storage driver was printed here by mistake)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e))
140
def update_db(self, item, _id, _desc):
    """
    Replaces a complete database entry. A database error is logged, not raised
    :param item: first argument given to self.db.replace (the collection/table name, e.g. "nsrs")
    :param _id: identifier of the entry to replace
    :param _desc: new content of the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_message = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_message)
146
def update_db_2(self, item, _id, _desc):
    """
    Updates one database entry through self.db.set_one. A database error is logged, not raised
    :param item: first argument given to self.db.set_one (the collection/table name)
    :param _id: identifier of the entry, used to build the {"_id": _id} filter
    :param _desc: content handed to self.db.set_one as update
    :return: None
    """
    entry_filter = {"_id": _id}
    try:
        self.db.set_one(item, entry_filter, _desc)
    except DbException as db_error:
        error_message = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_message)
152
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO, attaches it to the RO tenant and stores the result at database "vim_accounts"
    :param vim_content: vim account content. "_id", "vim_type", "vim_tenant_name", "vim_user" and "vim_password"
        are read; "config" is sent to RO only when present
    :param order_id: not used in this method
    :return: the RO identifier of the created vim, or None if the operation fails. On failure the database entry
        gets operationalState "ERROR" and a "detailed-status" with the failing step
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    # 'step' always names the ongoing action, it is reported by the 'finally' clause on error
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # the vim descriptor for RO excludes internal fields and the tenant credentials (sent at attach)
        vim_RO = deepcopy(vim_content)
        vim_RO.pop("_id", None)
        vim_RO.pop("_admin", None)
        vim_RO.pop("schema_version", None)
        vim_RO.pop("schema_type", None)
        vim_RO.pop("vim_tenant_name", None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_account_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
        }
        # "config" is only forwarded when provided, instead of failing with KeyError
        if "config" in vim_content:
            vim_account_RO["config"] = vim_content["config"]
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
208
async def vim_edit(self, vim_content, order_id):
    """
    Edits at RO a vim and its tenant attachment, and marks the database "vim_accounts" entry as ENABLED.
    Nothing is sent to RO if the database entry has no RO identifier at _admin.deployed.RO
    :param vim_content: edited vim account content. "_id" is mandatory, the rest is forwarded when present
    :param order_id: not used in this method
    :return: the RO identifier of the vim (None if the database holds no RO information or on failure). On
        failure the database entry gets operationalState "ERROR" and a "detailed-status" with the failing step
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    # bound up-front so that the final log/return cannot fail when there is no RO information
    RO_vim_id = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            # an edition may not include the vim type; only rename the key when it is provided
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            if "vim_user" in vim_content:
                # the user goes into the RO descriptor (it was written back into vim_content by mistake)
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            if vim_account_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
260
async def vim_delete(self, vim_id, order_id):
    """
    Detaches and deletes a vim at RO (when the database holds its RO identifier) and removes the entry from
    database "vim_accounts". A 404 answer from RO is treated as already detached/deleted
    :param vim_id: database identifier of the vim account
    :param order_id: not used in this method
    :return: None. On failure the database entry gets operationalState "ERROR" and a "detailed-status"
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        admin_info = db_vim.get("_admin")
        deployed_info = admin_info.get("deployed") if admin_info else None
        RO_vim_id = deployed_info.get("RO") if deployed_info else None
        if not RO_vim_id:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        else:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as ro_error:
                if ro_error.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as ro_error:
                if ro_error.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as handled_error:
        self.logger.error(logging_text + "Exit Exception {}".format(handled_error))
        exc = handled_error
    except Exception as unexpected_error:
        self.logger.critical(logging_text + "Exit Exception {}".format(unexpected_error), exc_info=True)
        exc = unexpected_error
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
307
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn controller at RO and stores its RO identifier at database "sdns"
    :param sdn_content: sdn content. "_id" is mandatory; internal fields are removed before sending it to RO
    :param order_id: not used in this method
    :return: the RO identifier of the created sdn, or None if the operation fails. On failure the database entry
        gets operationalState "ERROR" and a "detailed-status" with the failing step
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    # 'step' always names the ongoing action, it is reported by the 'finally' clause on error
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
349
async def sdn_edit(self, sdn_content, order_id):
    """
    Edits a sdn controller at RO and marks the database "sdns" entry as ENABLED.
    Nothing is sent to RO if the database entry has no RO identifier at _admin.deployed.RO
    :param sdn_content: edited sdn content. "_id" is mandatory; internal fields are removed before sending to RO
    :param order_id: not used in this method
    :return: the RO identifier of the sdn (None if the database holds no RO information or on failure). On
        failure the database entry gets operationalState "ERROR" and a "detailed-status" with the failing step
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    # bound up-front so that the final log/return cannot fail when there is no RO information
    RO_sdn_id = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
387
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes a sdn controller at RO (when the database holds its RO identifier) and removes the entry from
    database "sdns". A 404 answer from RO is treated as already deleted
    :param sdn_id: database identifier of the sdn
    :param order_id: not used in this method
    :return: None. On failure the database entry gets operationalState "ERROR" and a "detailed-status"
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        admin_info = db_sdn.get("_admin")
        deployed_info = admin_info.get("deployed") if admin_info else None
        RO_sdn_id = deployed_info.get("RO") if deployed_info else None
        if not RO_sdn_id:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        else:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as ro_error:
                if ro_error.http_code != 404:
                    raise
                # not found
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as handled_error:
        self.logger.error(logging_text + "Exit Exception {}".format(handled_error))
        exc = handled_error
    except Exception as unexpected_error:
        self.logger.critical(logging_text + "Exit Exception {}".format(unexpected_error), exc_info=True)
        exc = unexpected_error
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
425
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor for RO from an OSM IM vnfd, without modifying the input
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: deep copy of vnfd without "_id" and "_admin", where each vdu "cloud-init-file" entry is replaced by
        a "cloud-init" entry with the content of that file (read through self.fs)
    :raises LcmException: if the storage raises FsException
    """
    opened_file = None
    try:
        ro_descriptor = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_descriptor.pop(internal_key, None)
        if new_id:
            ro_descriptor["id"] = new_id
        for vdu in ro_descriptor["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # the storage location is taken from the input vnfd, "_admin" is already removed from the copy
            storage = vnfd["_admin"]["storage"]
            file_path = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                     vdu["cloud-init-file"])
            opened_file = self.fs.file_open(file_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
            #  If fails convert to base 64 or similar
            file_content = opened_file.read()
            opened_file.close()
            opened_file = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = file_content
        return ro_descriptor
    except FsException as fs_error:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], fs_error))
    finally:
        # only set when an error happened between opening and closing the file
        if opened_file:
            opened_file.close()
461
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, db_nslcmop, vnf_member_index, task=None):
    """Update the lcm database with the status of the charm.

    Updates the VNF's operational status with the state of the charm:
    - blocked: The unit needs manual intervention
    - maintenance: The unit is actively deploying/configuring
    - waiting: The unit is waiting for another charm to be ready
    - active: The unit is deployed, configured, and ready
    - error: The charm has failed and needs attention.
    - terminated: The charm has been destroyed
    - removing,
    - removed

    Updates the network service's config-status to reflect the state of all
    charms.

    :param model_name: not used in this method
    :param application_name: not used in this method
    :param workload_status: new charm status; None when invoked as a task done-callback
    :param db_nsr: nsr database content, modified in place and written to "nsrs"
    :param db_nslcmop: nslcmop database content, modified in place and written to "nslcmops"
    :param vnf_member_index: key at db_nsr["_admin"]["deployed"]["VCA"] of the charm this callback refers to
    :param task: finished task when invoked as a done-callback, None otherwise
    :return: None. Exceptions are logged, never raised
    """
    nsr_id = None
    nslcmop_id = None
    update_nsr = update_nslcmop = False
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_action = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
                                                                             vnf_member_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_action in ("instantiate", "terminate"):
                        # continue below, so that the global status is recomputed with this error
                        nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                        nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                    elif ns_action == "action":
                        db_nslcmop["operationState"] = "FAILED"
                        db_nslcmop["detailedStatus"] = str(exc)
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_action == "action":
                        db_nslcmop["operationState"] = "COMPLETED"
                        db_nslcmop["detailedStatus"] = "Done"
                        db_nslcmop["statusEnteredTime"] = time()
                        update_nslcmop = True
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif workload_status:
            self.logger.debug(logging_text + " Enter workload_status={}".format(workload_status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
            # TODO N2VC some error message in case of error should be obtained from N2VC
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        # compute the global configuration status from the status of every charm
        some_failed = False
        all_active = True
        status_map = {}  # number of charms at each operational-status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            if vca_status == "error":
                some_failed = True
                db_nsr["config-status"] = "failed"
                # report the index of the charm that is in error, which may differ from the charm that
                # triggered this callback
                error_text = "fail configuring vnf_index={} {}".format(vnf_index, vca_info["detailed-status"])
                db_nsr["detailed-status"] = error_text
                db_nslcmop["operationState"] = "FAILED_TEMP"
                db_nslcmop["detailedStatus"] = error_text
                db_nslcmop["statusEnteredTime"] = time()
                break

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(
                nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailedStatus"] = "Done"
            db_nslcmop["statusEnteredTime"] = time()
        elif some_failed:
            pass  # status already filled inside the loop
        else:
            cs = "configuring: " + ", ".join("{}: {}".format(status, num) for status, num in status_map.items())
            db_nsr["config-status"] = cs
            db_nslcmop["detailedStatus"] = cs
        update_nsr = update_nslcmop = True

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True)
    finally:
        # also reached by the early 'return' statements above, so flagged changes are always stored
        try:
            if update_nslcmop:
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
            if update_nsr:
                self.update_db("nsrs", nsr_id, db_nsr)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                vnf_member_index, e), exc_info=True)
583
def ns_params_2_RO(self, ns_params):
    """
    Translates the OSM ns instantiate params into the RO ns descriptor
    :param ns_params: OSM instantiate params. "vimAccountId" is mandatory when not empty; "ssh-authorized-key",
        "vnf" and "vld" are optional
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a referenced vim account is not at operationalState "ENABLED"
    """
    ro_id_by_vim_account = {}  # cache, to query the database only once per vim account

    def vim_account_2_RO(vim_account):
        """Obtains the RO identifier of an OSM vim account, looking at database "vim_accounts" """
        try:
            return ro_id_by_vim_account[vim_account]
        except KeyError:
            pass
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        # if db_vim["_admin"]["operationalState"] == "PROCESSING":
        #     #TODO check if VIM is creating and wait
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalSstatus={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        ro_id = db_vim["_admin"]["deployed"]["RO"]
        ro_id_by_vim_account[vim_account] = ro_id
        return ro_id

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
        "vnfs": {},
        "networks": {},
    }
    ssh_keys = ns_params.get("ssh-authorized-key")
    if ssh_keys:
        RO_ns_params["cloud-config"] = {"key-pairs": ssh_keys}

    # per vnf deployment: only the vnfs that select their own vim account are included
    for vnf in ns_params.get("vnf") or ():
        if "vimAccountId" in vnf:
            RO_ns_params["vnfs"][vnf["member-vnf-index"]] = {"datacenter": vim_account_2_RO(vnf["vimAccountId"])}

    # per network params: only the vlds with an ip-profile and/or a vim network are included
    for vld in ns_params.get("vld") or ():
        RO_vld = {}
        if "ip-profile" in vld:
            RO_vld["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            vim_network = vld["vim-network-name"]
            if isinstance(vim_network, dict):
                # dictionary of vim account -> vim network name
                RO_vld["sites"] = [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                                   for vim_account, vim_net in vim_network.items()]
            else:  # isinstance str
                RO_vld["sites"] = [{"netmap-use": vim_network}]
        if RO_vld:
            RO_ns_params["networks"][vld["name"]] = RO_vld
    return RO_ns_params
641
async def ns_instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiates a network service: creates the vnfds, nsd and ns at RO, waits until RO reports the ns as
    ACTIVE and launches the deployment of the charms declared by the vnfds. Progress and result are stored
    at databases "nsrs" and "nslcmops"
    :param nsr_id: database identifier of the ns record
    :param nslcmop_id: database identifier of the ns lcm operation
    :return: the content of db_nsr["_admin"]["deployed"], or None if the operation fails. On failure the nsr
        gets operational-status "failed" and the nslcmop gets operationState "FAILED"
    """
    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    exc = None
    # 'step' always names the ongoing action, it is reported by the 'finally' clause on error
    step = "Getting nsr, nslcmop, RO_vims from db"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        nsr_name = db_nsr["name"]  # TODO short-name??

        needed_vnfd = {}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = db_nsr["_admin"].get("deployed")
        if not nsr_lcm:
            nsr_lcm = db_nsr["_admin"]["deployed"] = {
                "id": nsr_id,
                "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
                "nsr_ip": {},
                "VCA": {},
            }
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            self.logger.debug(logging_text + step)
            # descriptors at RO are specific of this ns instance, so the nsr id is part of their id
            vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
                db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            self.update_db("nsrs", nsr_id, db_nsr)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
        self.logger.debug(logging_text + step)

        # The nsd created below references vnfds whose ids are prefixed by this nsr_id, so its own id must be
        # prefixed by the nsr_id as well. Otherwise a second instance of the same nsd would find and reuse the
        # RO nsd of another instance.
        nsd_id_RO = nsr_id + "." + nsd_id[:200]
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
        if nsd_list:
            nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                nsd_id, nsd_list[0]["uuid"]))
        else:
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = nsd_id_RO
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
        self.update_db("nsrs", nsr_id, db_nsr)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

        if not RO_nsr_id:
            step = db_nsr["detailed-status"] = "Creating ns at RO"
            self.logger.debug(logging_text + step)
            RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
            desc = await RO.create("ns", descriptor=RO_ns_params,
                                   name=db_nsr["name"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
            db_nsr["_admin"]["nsState"] = "INSTANTIATED"
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
        self.update_db("nsrs", nsr_id, db_nsr)

        # wait until NS is ready
        step = ns_status_detailed = "Waiting ns ready at RO"
        db_nsr["detailed-status"] = ns_status_detailed
        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
        deloyment_timeout = 600  # seconds, polled every 5 seconds
        while deloyment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            nsr_lcm["RO"]["nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                self.update_db("nsrs", nsr_id, db_nsr)
            elif ns_status == "ACTIVE":
                nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                break
            else:
                # explicit exception instead of 'assert', which is removed when running with python -O
                raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

            # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10; kept here
            # because the interpreter version and the origin of self.loop are not visible in this method
            await asyncio.sleep(5, loop=self.loop)
            deloyment_timeout -= 5
        if deloyment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")
        db_nsr["detailed-status"] = "Configuring vnfr"
        self.update_db("nsrs", nsr_id, db_nsr)

        # The parameters we'll need to deploy a charm
        number_to_configure = 0

        def deploy():
            """An inner function to deploy the charm from either vnf or vdu

            Reads vnfd, vnfd_id, vnf_index, proxy_charm and params from the enclosing scope, with the values
            they hold at the moment of the call
            """
            # Note: The charm needs to exist on disk at the location
            # specified by charm_path.
            base_folder = vnfd["_admin"]["storage"]
            storage_params = self.fs.get_params()
            charm_path = "{}{}/{}/charms/{}".format(
                storage_params["path"],
                base_folder["folder"],
                base_folder["pkg-dir"],
                proxy_charm
            )

            # Setup the runtime parameters for this VNF
            params['rw_mgmt_ip'] = nsr_lcm['nsr_ip']["vnf"][vnf_index]

            # ns_name will be ignored in the current version of N2VC
            # but will be implemented for the next point release.
            model_name = 'default'
            application_name = self.n2vc.FormatApplicationName(
                nsr_name,
                vnf_index,
                vnfd['name'],
            )

            nsr_lcm["VCA"][vnf_index] = {
                "model": model_name,
                "application": application_name,
                "operational-status": "init",
                "detailed-status": "",
                "vnfd_id": vnfd_id,
            }

            self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(
                nsr_id, charm_path, proxy_charm))
            task = asyncio.ensure_future(
                self.n2vc.DeployCharms(
                    model_name,          # The network service name
                    application_name,    # The application name
                    vnfd,                # The vnf descriptor
                    charm_path,          # Path to charm
                    params,              # Runtime params, like mgmt ip
                    {},                  # for native charms only
                    self.n2vc_callback,  # Callback for status changes
                    db_nsr,              # Callback parameter
                    db_nslcmop,
                    vnf_index,           # Callback parameter
                    None,                # Callback parameter (task)
                )
            )
            # the same callback is invoked when the task finishes, receiving the task as last argument
            task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
                                                     db_nsr, db_nslcmop, vnf_index))
            self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task

        # TODO: Make this call inside deploy()
        # Login to the VCA. If there are multiple calls to login(),
        # subsequent calls will be a nop and return immediately.
        await self.n2vc.login()

        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]

            # Check if this VNF has a charm configuration
            vnf_config = vnfd.get("vnf-configuration")

            if vnf_config and vnf_config.get("juju"):
                proxy_charm = vnf_config["juju"]["charm"]
                params = {}

                if proxy_charm:
                    if 'initial-config-primitive' in vnf_config:
                        params['initial-config-primitive'] = vnf_config['initial-config-primitive']

                    deploy()
                    number_to_configure += 1

            # Deploy charms for each VDU that supports one.
            for vdu in vnfd['vdu']:
                vdu_config = vdu.get('vdu-configuration')
                proxy_charm = None
                params = {}

                if vdu_config and vdu_config.get("juju"):
                    proxy_charm = vdu_config["juju"]["charm"]

                    if 'initial-config-primitive' in vdu_config:
                        params['initial-config-primitive'] = vdu_config['initial-config-primitive']

                    if proxy_charm:
                        deploy()
                        number_to_configure += 1

        # NOTE(review): this method writes the nslcmop key "detailed-status" while n2vc_callback writes
        # "detailedStatus" - confirm which one the consumers of "nslcmops" expect
        if number_to_configure:
            db_nsr["config-status"] = "configuring"
            db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
            db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
        else:
            db_nslcmop["operationState"] = "COMPLETED"
            db_nslcmop["detailed-status"] = "done"
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
            db_nsr["operational-status"] = "running"
        self.update_db("nsrs", nsr_id, db_nsr)
        self.update_db("nslcmops", nslcmop_id, db_nslcmop)
        self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
        return nsr_lcm

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc:
            if db_nsr:
                db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr["operational-status"] = "failed"
                self.update_db("nsrs", nsr_id, db_nsr)
            if db_nslcmop:
                db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop["operationState"] = "FAILED"
                db_nslcmop["statusEnteredTime"] = time()
                self.update_db("nslcmops", nslcmop_id, db_nslcmop)
922
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: launch the removal of its charms through N2VC, delete the ns, nsd and
    vnfds at RO, wait for the charm removals and finally record the outcome in the database.
    When nothing failed and the operation requests "autoremove", the nsr and its nslcmops are deleted
    from the database instead of being updated.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record describing this operation
    :return: None. Exceptions raised while terminating are logged and stored at the nslcmop record
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    # Both are read at the 'finally' clause, so they must exist whatever the exit path is
    db_nsr_update = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop from db"
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO ALF remove
        # db_vim = self.db.get_one("vim_accounts", {"_id":  db_nsr["datacenter"]})
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update = {
            "operational-status": "terminating",
            "config-status": "terminating",
            "detailed-status": "Deleting charms",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Launch the removal of charms. It is best effort: a failure here must not prevent the RO deletion
        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                            # self.n2vc_callback,
                            # db_nsr,
                            # db_nslcmop,
                            # vnf_index,
                        )
                    )
                    vca_task_list.append(task)
                    vca_task_dict[vnf_index] = task
                    # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
                    #                                          deploy_info['application'], None, db_nsr,
                    #                                          db_nslcmop, vnf_index))
                    self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Wait for the charm removals launched at the beginning and collect their results
        if vca_task_list:
            await asyncio.wait(vca_task_list, timeout=300)
            for vnf_index, task in vca_task_dict.items():
                if task.cancelled():
                    failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
                elif task.done():
                    # use a dedicated name: 'exc' is reserved for the exception that aborts this method
                    task_exc = task.exception()
                    if task_exc:
                        failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                    else:
                        nsr_lcm["VCA"][vnf_index] = None
                else:  # timeout
                    # TODO Should it be cancelled?!!
                    task.cancel()
                    failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update = {
                "operational-status": "failed",
                "detailed-status": "Deletion errors " + "; ".join(failed_detail),
                "_admin": {"deployed": nsr_lcm, }
            }
            db_nslcmop_update = {
                "detailedStatus": "; ".join(failed_detail),
                "operationState": "FAILED",
                "statusEnteredTime": time()
            }
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            # the nsr record does not exist any more; do not write the pending "terminating" update
            db_nsr_update = None
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
        else:
            db_nsr_update = {
                "operational-status": "terminated",
                "detailed-status": "Done",
                "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"}
            }
            db_nslcmop_update = {
                "detailedStatus": "Done",
                "operationState": "COMPLETED",
                "statusEnteredTime": time()
            }
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1097
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one member vnf of a network service and store the
    result at the nslcmop database record.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record; its "operationParams" must contain
        "vnf_member_index", "primitive" and "primitive_params"
    :return: None. The result (COMPLETED or FAILED plus a detail text) is written to the nslcmop record
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        vca_deployed = nsr_lcm["VCA"].get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed["operational-status"]))
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
        #                                          db_nsr, db_nslcmop, vnf_index))
        # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            # keep the text at result_detail so that it reaches the database update below
            result_detail = "Task has been cancelled"
        elif task.done():
            # use a dedicated name: assigning 'exc' would make the 'finally' clause discard this result
            task_exc = task.exception()
            if task_exc:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:  # timeout
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailedStatus": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1185
async def test(self, param=None):
    """
    Write a debug trace containing the received parameter
    :param param: any value; it is only formatted into the log message
    :return: None
    """
    trace = "Starting/Ending test task: {}".format(param)
    self.logger.debug(trace)
1188
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified for _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id:  nsr, vim or sdn identity
    :return: None, or raises LcmException if topic is not one of the above
    """
    tasks_by_topic = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    if topic not in tasks_by_topic:
        # fail with a meaningful error instead of an UnboundLocalError
        raise LcmException("cancel_tasks: unknown topic '{}'".format(topic))
    lcm_tasks = tasks_by_topic[topic]

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns a false value when the task could not be cancelled; only successes are logged
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
1211
async def kafka_ping(self):
    """
    Periodically write a ping message addressed to this same module at the "admin" topic, and check
    that pings are being read back (kafka_read resets self.pings_not_received when it receives one).
    :return: never returns normally. Raises LcmException when more than 10 pings in a row have not been
        received, or the last exception when writing to the bus fails too many consecutive times
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # 'loop' is not passed: that argument was removed from asyncio.sleep in python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # while first_start no write has succeeded yet after starting. So leave more time and wait
            # to allow kafka starts
            max_consecutive_errors = 30 if first_start else 8
            if consecutive_errors == max_consecutive_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1243
async def kafka_read(self):
    """
    Main loop: read commands from the message bus topics "admin", "ns", "vim_account" and "sdn", and
    launch an asyncio task for each one. The launched tasks are registered at self.lcm_ns_tasks,
    self.lcm_vim_tasks or self.lcm_sdn_tasks so that they can be cancelled later.
    :return: None when the command "exit" is received or after 10 consecutive errors. Raises the last
        exception when 8 consecutive errors happen once a first message has been read
    """
    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    # future = asyncio.Future()
    consecutive_errors = 0
    first_start = True
    while consecutive_errors < 10:
        try:
            topics = ("admin", "ns", "vim_account", "sdn")
            topic, command, params = await self.msg.aioread(topics, self.loop)
            self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    # own ping received: the bus is working (see kafka_ping)
                    self.pings_not_received = 0
                continue
            elif topic == "ns":
                if command == "instantiate":
                    # self.logger.debug("Deploying NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_instantiate": task}
                    continue
                elif command == "terminate":
                    # self.logger.debug("Deleting NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    self.cancel_tasks(topic, nsr_id)
                    task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_terminate": task}
                    continue
                elif command == "action":
                    # self.logger.debug("Update NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id))
                    self.lcm_ns_tasks.setdefault(nsr_id, {})[nslcmop_id] = {"ns_action": task}
                    continue
                elif command == "show":
                    # NOTE(review): assumes the content of a "show" message is the nsr _id itself;
                    # confirm against the producer of this command
                    nsr_id = params
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}\n    detailed-status: "
                            "{}\n    deploy: {}\n    tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.vim_create(params, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    task = asyncio.ensure_future(self.vim_delete(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_delete": task}
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.vim_edit(vim_id, order_id))
                    self.lcm_vim_tasks.setdefault(vim_id, {})[order_id] = {"vim_edit": task}
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    task = asyncio.ensure_future(self.sdn_create(params, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_create": task}
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_delete": task}
                    continue
                elif command == "edit":
                    task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id))
                    self.lcm_sdn_tasks.setdefault(_sdn_id, {})[order_id] = {"sdn_edit": task}
                    continue
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # while first_start no message has been read yet after starting. So leave more time and wait
            # to allow kafka starts. Note that the loop condition ends the task after 10 consecutive
            # errors in any case
            max_consecutive_errors = 30 if first_start else 8
            if consecutive_errors == max_consecutive_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 2 if not first_start else 5
            # 'loop' is not passed: that argument was removed from asyncio.sleep in python 3.10
            await asyncio.sleep(wait_time)

    # self.logger.debug("Task kafka_read terminating")
    self.logger.debug("Task kafka_read exit")
1383
def start(self):
    """
    Run the kafka_read and kafka_ping tasks in the event loop until both finish, and then release the
    event loop and the database, message and storage connections.
    :return: None. An exception raised by any of the two tasks is propagated after the clean up
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO before closing: cancel the creation tasks, wait (with a timeout of about 200 seconds) for
        # the pending tasks to finish, and cancel all the remaining ones when the timeout expires
    finally:
        # clean up also when a task has raised, so that connections are not left open
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1408
1409
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and override it with the environment variables whose name
    starts with "OSMLCM_". The rest of the variable name, in lower case and split by "_", is the path
    of keys inside the configuration ("ro" and "vca" are converted to upper case); e.g.
    OSMLCM_DATABASE_HOST sets conf["database"]["host"]. A value whose last key is "port" is converted
    to integer.
    :param config_file: path of the yaml configuration file
    :return: the configuration dictionary. On any error reading the file the program exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load builds only plain yaml types and, unlike yaml.load, it does not need a Loader
            # argument (which is mandatory for yaml.load at recent PyYAML versions)
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that contains the final key
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        sys.exit(1)
1439
1440
if __name__ == "__main__":
    # configuration file name; it is a relative path, opened by Lcm.read_config_file
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)
    # runs until the kafka_read and kafka_ping tasks finish
    lcm.start()