Improve N2VC integration
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26
27
28 class LcmException(Exception):
29 pass
30
31
32 class Lcm:
33
34 def __init__(self, config_file):
35 """
36 Init, Connect to database, filesystem storage, and messaging
37 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
38 :return: None
39 """
40
41 self.db = None
42 self.msg = None
43 self.fs = None
44 # contains created tasks/futures to be able to cancel
45
46 self.lcm_ns_tasks = {}
47 self.lcm_vim_tasks = {}
48 self.lcm_sdn_tasks = {}
49 # logging
50 self.logger = logging.getLogger('lcm')
51 # load configuration
52 config = self.read_config_file(config_file)
53 self.config = config
54 self.ro_config={
55 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
56 "tenant": config.get("tenant", "osm"),
57 "logger_name": "lcm.ROclient",
58 "loglevel": "ERROR",
59 }
60
61 self.vca = config["VCA"] # TODO VCA
62 self.loop = None
63
64 # logging
65 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
66 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
67 config["database"]["logger_name"] = "lcm.db"
68 config["storage"]["logger_name"] = "lcm.fs"
69 config["message"]["logger_name"] = "lcm.msg"
70 if "logfile" in config["global"]:
71 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
72 maxBytes=100e6, backupCount=9, delay=0)
73 file_handler.setFormatter(log_formatter_simple)
74 self.logger.addHandler(file_handler)
75 else:
76 str_handler = logging.StreamHandler()
77 str_handler.setFormatter(log_formatter_simple)
78 self.logger.addHandler(str_handler)
79
80 if config["global"].get("loglevel"):
81 self.logger.setLevel(config["global"]["loglevel"])
82
83 # logging other modules
84 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
85 config[k1]["logger_name"] = logname
86 logger_module = logging.getLogger(logname)
87 if "logfile" in config[k1]:
88 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
89 maxBytes=100e6, backupCount=9, delay=0)
90 file_handler.setFormatter(log_formatter_simple)
91 logger_module.addHandler(file_handler)
92 if "loglevel" in config[k1]:
93 logger_module.setLevel(config[k1]["loglevel"])
94
95 self.n2vc = N2VC(
96 log=self.logger,
97 server=config['VCA']['host'],
98 port=config['VCA']['port'],
99 user=config['VCA']['user'],
100 secret=config['VCA']['secret'],
101 # TODO: This should point to the base folder where charms are stored,
102 # if there is a common one (like object storage). Otherwise, leave
103 # it unset and pass it via DeployCharms
104 # artifacts=config['VCA'][''],
105 artifacts=None,
106 )
107
108 try:
109 if config["database"]["driver"] == "mongo":
110 self.db = dbmongo.DbMongo()
111 self.db.db_connect(config["database"])
112 elif config["database"]["driver"] == "memory":
113 self.db = dbmemory.DbMemory()
114 self.db.db_connect(config["database"])
115 else:
116 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
117 config["database"]["driver"]))
118
119 if config["storage"]["driver"] == "local":
120 self.fs = fslocal.FsLocal()
121 self.fs.fs_connect(config["storage"])
122 else:
123 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
124 config["storage"]["driver"]))
125
126 if config["message"]["driver"] == "local":
127 self.msg = msglocal.MsgLocal()
128 self.msg.connect(config["message"])
129 elif config["message"]["driver"] == "kafka":
130 self.msg = msgkafka.MsgKafka()
131 self.msg.connect(config["message"])
132 else:
133 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
134 config["storage"]["driver"]))
135 except (DbException, FsException, MsgException) as e:
136 self.logger.critical(str(e), exc_info=True)
137 raise LcmException(str(e))
138
139 def update_db(self, item, _id, _desc):
140 try:
141 self.db.replace(item, _id, _desc)
142 except DbException as e:
143 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
144
145 async def create_vim(self, vim_content, order_id):
146 vim_id = vim_content["_id"]
147 logging_text = "Task create_vim={} ".format(vim_id)
148 self.logger.debug(logging_text + "Enter")
149 db_vim = None
150 exc = None
151 try:
152 step = "Getting vim from db"
153 db_vim = self.db.get_one("vims", {"_id": vim_id})
154 if "_admin" not in db_vim:
155 db_vim["_admin"] = {}
156 if "deploy" not in db_vim["_admin"]:
157 db_vim["_admin"]["deploy"] = {}
158 db_vim["_admin"]["deploy"]["RO"] = None
159
160 step = "Creating vim at RO"
161 RO = ROclient.ROClient(self.loop, **self.ro_config)
162 vim_RO = deepcopy(vim_content)
163 vim_RO.pop("_id", None)
164 vim_RO.pop("_admin", None)
165 vim_RO.pop("schema_version", None)
166 vim_RO.pop("schema_type", None)
167 vim_RO.pop("vim_tenant_name", None)
168 vim_RO["type"] = vim_RO.pop("vim_type")
169 vim_RO.pop("vim_user", None)
170 vim_RO.pop("vim_password", None)
171 desc = await RO.create("vim", descriptor=vim_RO)
172 RO_vim_id = desc["uuid"]
173 db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
174 self.update_db("vims", vim_id, db_vim)
175
176 step = "Attach vim to RO tenant"
177 vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
178 "vim_username": vim_content["vim_user"],
179 "vim_password": vim_content["vim_password"],
180 "config": vim_content["config"]
181 }
182 desc = await RO.attach_datacenter(RO_vim_id , descriptor=vim_RO)
183 db_vim["_admin"]["operationalState"] = "ENABLED"
184 self.update_db("vims", vim_id, db_vim)
185
186 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
187 return RO_vim_id
188
189 except (ROclient.ROClientException, DbException) as e:
190 self.logger.error(logging_text + "Exit Exception {}".format(e))
191 exc = e
192 except Exception as e:
193 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
194 exc = e
195 finally:
196 if exc and db_vim:
197 db_vim["_admin"]["operationalState"] = "ERROR"
198 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
199 self.update_db("vims", vim_id, db_vim)
200
201 async def edit_vim(self, vim_content, order_id):
202 vim_id = vim_content["_id"]
203 logging_text = "Task edit_vim={} ".format(vim_id)
204 self.logger.debug(logging_text + "Enter")
205 db_vim = None
206 exc = None
207 step = "Getting vim from db"
208 try:
209 db_vim = self.db.get_one("vims", {"_id": vim_id})
210 if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
211 RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
212 step = "Editing vim at RO"
213 RO = ROclient.ROClient(self.loop, **self.ro_config)
214 vim_RO = deepcopy(vim_content)
215 vim_RO.pop("_id", None)
216 vim_RO.pop("_admin", None)
217 vim_RO.pop("schema_version", None)
218 vim_RO.pop("schema_type", None)
219 vim_RO.pop("vim_tenant_name", None)
220 vim_RO["type"] = vim_RO.pop("vim_type")
221 vim_RO.pop("vim_user", None)
222 vim_RO.pop("vim_password", None)
223 if vim_RO:
224 desc = await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
225
226 step = "Editing vim-account at RO tenant"
227 vim_RO = {}
228 for k in ("vim_tenant_name", "vim_password", "config"):
229 if k in vim_content:
230 vim_RO[k] = vim_content[k]
231 if "vim_user" in vim_content:
232 vim_content["vim_username"] = vim_content["vim_user"]
233 if vim_RO:
234 desc = await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
235 db_vim["_admin"]["operationalState"] = "ENABLED"
236 self.update_db("vims", vim_id, db_vim)
237
238 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
239 return RO_vim_id
240
241 except (ROclient.ROClientException, DbException) as e:
242 self.logger.error(logging_text + "Exit Exception {}".format(e))
243 exc = e
244 except Exception as e:
245 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
246 exc = e
247 finally:
248 if exc and db_vim:
249 db_vim["_admin"]["operationalState"] = "ERROR"
250 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
251 self.update_db("vims", vim_id, db_vim)
252
253 async def delete_vim(self, vim_id, order_id):
254 logging_text = "Task delete_vim={} ".format(vim_id)
255 self.logger.debug(logging_text + "Enter")
256 db_vim = None
257 exc = None
258 step = "Getting vim from db"
259 try:
260 db_vim = self.db.get_one("vims", {"_id": vim_id})
261 if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
262 RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
263 RO = ROclient.ROClient(self.loop, **self.ro_config)
264 step = "Detaching vim from RO tenant"
265 try:
266 await RO.detach_datacenter(RO_vim_id)
267 except ROclient.ROClientException as e:
268 if e.http_code == 404: # not found
269 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
270 else:
271 raise
272
273 step = "Deleting vim from RO"
274 try:
275 await RO.delete("vim", RO_vim_id)
276 except ROclient.ROClientException as e:
277 if e.http_code == 404: # not found
278 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
279 else:
280 raise
281 else:
282 # nothing to delete
283 self.logger.error(logging_text + "Skipping. There is not RO information at database")
284 self.db.del_one("vims", {"_id": vim_id})
285 self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
286 return None
287
288 except (ROclient.ROClientException, DbException) as e:
289 self.logger.error(logging_text + "Exit Exception {}".format(e))
290 exc = e
291 except Exception as e:
292 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
293 exc = e
294 finally:
295 if exc and db_vim:
296 db_vim["_admin"]["operationalState"] = "ERROR"
297 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
298 self.update_db("vims", vim_id, db_vim)
299
300 async def create_sdn(self, sdn_content, order_id):
301 sdn_id = sdn_content["_id"]
302 logging_text = "Task create_sdn={} ".format(sdn_id)
303 self.logger.debug(logging_text + "Enter")
304 db_sdn = None
305 exc = None
306 try:
307 step = "Getting sdn from db"
308 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
309 if "_admin" not in db_sdn:
310 db_sdn["_admin"] = {}
311 if "deploy" not in db_sdn["_admin"]:
312 db_sdn["_admin"]["deploy"] = {}
313 db_sdn["_admin"]["deploy"]["RO"] = None
314
315 step = "Creating sdn at RO"
316 RO = ROclient.ROClient(self.loop, **self.ro_config)
317 sdn_RO = deepcopy(sdn_content)
318 sdn_RO.pop("_id", None)
319 sdn_RO.pop("_admin", None)
320 sdn_RO.pop("schema_version", None)
321 sdn_RO.pop("schema_type", None)
322 desc = await RO.create("sdn", descriptor=sdn_RO)
323 RO_sdn_id = desc["uuid"]
324 db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
325 db_sdn["_admin"]["operationalState"] = "ENABLED"
326 self.update_db("sdns", sdn_id, db_sdn)
327 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
328 return RO_sdn_id
329
330 except (ROclient.ROClientException, DbException) as e:
331 self.logger.error(logging_text + "Exit Exception {}".format(e))
332 exc = e
333 except Exception as e:
334 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
335 exc = e
336 finally:
337 if exc and db_sdn:
338 db_sdn["_admin"]["operationalState"] = "ERROR"
339 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
340 self.update_db("sdns", sdn_id, db_sdn)
341
342 async def edit_sdn(self, sdn_content, order_id):
343 sdn_id = sdn_content["_id"]
344 logging_text = "Task edit_sdn={} ".format(sdn_id)
345 self.logger.debug(logging_text + "Enter")
346 db_sdn = None
347 exc = None
348 step = "Getting sdn from db"
349 try:
350 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
351 if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
352 RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
353 RO = ROclient.ROClient(self.loop, **self.ro_config)
354 step = "Editing sdn at RO"
355 sdn_RO = deepcopy(sdn_content)
356 sdn_RO.pop("_id", None)
357 sdn_RO.pop("_admin", None)
358 sdn_RO.pop("schema_version", None)
359 sdn_RO.pop("schema_type", None)
360 if sdn_RO:
361 desc = await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
362 db_sdn["_admin"]["operationalState"] = "ENABLED"
363 self.update_db("sdns", sdn_id, db_sdn)
364
365 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
366 return RO_sdn_id
367
368 except (ROclient.ROClientException, DbException) as e:
369 self.logger.error(logging_text + "Exit Exception {}".format(e))
370 exc = e
371 except Exception as e:
372 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
373 exc = e
374 finally:
375 if exc and db_sdn:
376 db_sdn["_admin"]["operationalState"] = "ERROR"
377 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
378 self.update_db("sdns", sdn_id, db_sdn)
379
380 async def delete_sdn(self, sdn_id, order_id):
381 logging_text = "Task delete_sdn={} ".format(sdn_id)
382 self.logger.debug(logging_text + "Enter")
383 db_sdn = None
384 exc = None
385 step = "Getting sdn from db"
386 try:
387 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
388 if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
389 RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
390 RO = ROclient.ROClient(self.loop, **self.ro_config)
391 step = "Deleting sdn from RO"
392 try:
393 await RO.delete("sdn", RO_sdn_id)
394 except ROclient.ROClientException as e:
395 if e.http_code == 404: # not found
396 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
397 else:
398 raise
399 else:
400 # nothing to delete
401 self.logger.error(logging_text + "Skipping. There is not RO information at database")
402 self.db.del_one("sdns", {"_id": sdn_id})
403 self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
404 return None
405
406 except (ROclient.ROClientException, DbException) as e:
407 self.logger.error(logging_text + "Exit Exception {}".format(e))
408 exc = e
409 except Exception as e:
410 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
411 exc = e
412 finally:
413 if exc and db_sdn:
414 db_sdn["_admin"]["operationalState"] = "ERROR"
415 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
416 self.update_db("sdns", sdn_id, db_sdn)
417
418 def vnfd2RO(self, vnfd, new_id=None):
419 """
420 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
421 :param vnfd: input vnfd
422 :param new_id: overrides vnf id if provided
423 :return: copy of vnfd
424 """
425 ci_file = None
426 try:
427 vnfd_RO = deepcopy(vnfd)
428 vnfd_RO.pop("_id", None)
429 vnfd_RO.pop("_admin", None)
430 if new_id:
431 vnfd_RO["id"] = new_id
432 for vdu in vnfd_RO["vdu"]:
433 if "cloud-init-file" in vdu:
434 base_folder = vnfd["_admin"]["storage"]
435 clout_init_file = "{}/{}/cloud_init/{}".format(
436 base_folder["folder"],
437 base_folder["pkg-dir"],
438 vdu["cloud-init-file"]
439 )
440 ci_file = self.fs.file_open(clout_init_file, "r")
441 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
442 clout_init_content = ci_file.read()
443 ci_file.close()
444 ci_file = None
445 vdu.pop("cloud-init-file", None)
446 vdu["cloud-init"] = clout_init_content
447 return vnfd_RO
448 except FsException as e:
449 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
450 finally:
451 if ci_file:
452 ci_file.close()
453
454 def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
455 """Update the lcm database with the status of the charm.
456
457 Updates the VNF's operational status with the state of the charm:
458 - blocked: The unit needs manual intervention
459 - maintenance: The unit is actively deploying/configuring
460 - waiting: The unit is waiting for another charm to be ready
461 - active: The unit is deployed, configured, and ready
462 - error: The charm has failed and needs attention.
463 - terminated: The charm has been destroyed
464 - removing,
465 - removed
466
467 Updates the network service's config-status to reflect the state of all
468 charms.
469 """
470 nsr_id = None
471 try:
472 nsr_id = db_nsr["_id"]
473 nsr_lcm = db_nsr["_admin"]["deploy"]
474 if task:
475 if task.cancelled():
476 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id, vnf_member_index))
477 return
478
479 if task.done():
480 exc = task.exception()
481 if exc:
482 self.logger.error(
483 "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id, vnf_member_index, exc))
484 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
485 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
486 else:
487 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id, vnf_member_index))
488 # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines
489 # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
490 # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
491 elif workload_status:
492 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id, vnf_member_index, workload_status))
493 if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
494 return # same status, ignore
495 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
496 # TODO N2VC some error message in case of error should be obtained from N2VC
497 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
498 else:
499 self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id, vnf_member_index), exc_info=True)
500 return
501
502 some_failed = False
503 all_active = True
504 status_map = {}
505 for vnf_index, vca_info in nsr_lcm["VCA"].items():
506 vca_status = vca_info["operational-status"]
507 if vca_status not in status_map:
508 # Initialize it
509 status_map[vca_status] = 0
510 status_map[vca_status] += 1
511
512 if vca_status != "active":
513 all_active = False
514 if vca_status == "error":
515 some_failed = True
516 db_nsr["config-status"] = "failed"
517 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index,
518 vca_info["detailed-status"])
519 break
520
521 if all_active:
522 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id, vnf_member_index))
523 db_nsr["config-status"] = "configured"
524 db_nsr["detailed-status"] = "done"
525 elif some_failed:
526 pass
527 else:
528 cs = "configuring: "
529 separator = ""
530 for status, num in status_map.items():
531 cs += separator + "{}: {}".format(status, num)
532 separator = ", "
533 db_nsr["config-status"] = cs
534 self.update_db("nsrs", nsr_id, db_nsr)
535
536 except Exception as e:
537 self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id, vnf_member_index, e), exc_info=True)
538
539 async def create_ns(self, nsr_id, order_id):
540 logging_text = "Task create_ns={} ".format(nsr_id)
541 self.logger.debug(logging_text + "Enter")
542 # get all needed from database
543 db_nsr = None
544 exc = None
545 step = "Getting nsr from db"
546 try:
547 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
548 nsd = db_nsr["nsd"]
549 nsr_name = db_nsr["name"] # TODO short-name??
550 needed_vnfd = {}
551 for c_vnf in nsd["constituent-vnfd"]:
552 vnfd_id = c_vnf["vnfd-id-ref"]
553 if vnfd_id not in needed_vnfd:
554 step = "Getting vnfd={} from db".format(vnfd_id)
555 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
556
557 nsr_lcm = {
558 "id": nsr_id,
559 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
560 "nsr_ip": {},
561 "VCA": {},
562 }
563 db_nsr["_admin"]["deploy"] = nsr_lcm
564 db_nsr["detailed-status"] = "creating"
565 db_nsr["operational-status"] = "init"
566
567 deloyment_timeout = 120
568
569 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
570
571 # get vnfds, instantiate at RO
572 for vnfd_id, vnfd in needed_vnfd.items():
573 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
574 self.logger.debug(logging_text + step)
575 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
576
577 # look if present
578 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
579 if vnfd_list:
580 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
581 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
582 vnfd_id, vnfd_list[0]["uuid"]))
583 else:
584 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
585 desc = await RO.create("vnfd", descriptor=vnfd_RO)
586 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
587 self.update_db("nsrs", nsr_id, db_nsr)
588
589 # create nsd at RO
590 nsd_id = nsd["id"]
591 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
592 self.logger.debug(logging_text + step)
593
594 nsd_id_RO = nsd_id + "." + nsd_id[:200]
595 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
596 if nsd_list:
597 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
598 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
599 nsd_id, nsd_list[0]["uuid"]))
600 else:
601 nsd_RO = deepcopy(nsd)
602 nsd_RO["id"] = nsd_id_RO
603 nsd_RO.pop("_id", None)
604 nsd_RO.pop("_admin", None)
605 for c_vnf in nsd_RO["constituent-vnfd"]:
606 vnfd_id = c_vnf["vnfd-id-ref"]
607 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
608 desc = await RO.create("nsd", descriptor=nsd_RO)
609 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
610 self.update_db("nsrs", nsr_id, db_nsr)
611
612 # Crate ns at RO
613 # if present use it unless in error status
614 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
615 if RO_nsr_id:
616 try:
617 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
618 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
619 desc = await RO.show("ns", RO_nsr_id)
620 except ROclient.ROClientException as e:
621 if e.http_code != HTTPStatus.NOT_FOUND:
622 raise
623 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
624 if RO_nsr_id:
625 ns_status, ns_status_info = RO.check_ns_status(desc)
626 nsr_lcm["RO"]["nsr_status"] = ns_status
627 if ns_status == "ERROR":
628 step = db_nsr["detailed-status"] = "Deleting ns at RO"
629 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
630 await RO.delete("ns", RO_nsr_id)
631 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
632
633 if not RO_nsr_id:
634 step = db_nsr["detailed-status"] = "Creating ns at RO"
635 self.logger.debug(logging_text + step)
636
637 desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
638 scenario=nsr_lcm["RO"]["nsd_id"])
639 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
640 nsr_lcm["RO"]["nsr_status"] = "BUILD"
641 self.update_db("nsrs", nsr_id, db_nsr)
642
643 # wait until NS is ready
644 step = ns_status_detailed = "Waiting ns ready at RO"
645 db_nsr["detailed-status"] = ns_status_detailed
646 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
647 deloyment_timeout = 600
648 while deloyment_timeout > 0:
649 desc = await RO.show("ns", RO_nsr_id)
650 ns_status, ns_status_info = RO.check_ns_status(desc)
651 nsr_lcm["RO"]["nsr_status"] = ns_status
652 if ns_status == "ERROR":
653 raise ROclient.ROClientException(ns_status_info)
654 elif ns_status == "BUILD":
655 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
656 self.update_db("nsrs", nsr_id, db_nsr)
657 elif ns_status == "ACTIVE":
658 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
659 break
660 else:
661 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
662
663 await asyncio.sleep(5, loop=self.loop)
664 deloyment_timeout -= 5
665 if deloyment_timeout <= 0:
666 raise ROclient.ROClientException("Timeout waiting ns to be ready")
667 db_nsr["detailed-status"] = "Configuring vnfr"
668 self.update_db("nsrs", nsr_id, db_nsr)
669
670 # The parameters we'll need to deploy a charm
671 number_to_configure = 0
672
673 def deploy():
674 """An inner function to deploy the charm from either vnf or vdu
675 """
676
677 # Login to the VCA.
678 # if number_to_configure == 0:
679 # self.logger.debug("Logging into N2VC...")
680 # task = asyncio.ensure_future(self.n2vc.login())
681 # yield from asyncio.wait_for(task, 30.0)
682 # self.logger.debug("Logged into N2VC!")
683
684 ## await self.n2vc.login()
685
686 # Note: The charm needs to exist on disk at the location
687 # specified by charm_path.
688 base_folder = vnfd["_admin"]["storage"]
689 storage_params = self.fs.get_params()
690 charm_path = "{}{}/{}/charms/{}".format(
691 storage_params["path"],
692 base_folder["folder"],
693 base_folder["pkg-dir"],
694 proxy_charm
695 )
696
697 # Setup the runtime parameters for this VNF
698 params['rw_mgmt_ip'] = nsr_lcm['nsr_ip'][vnf_index]
699
700 # ns_name will be ignored in the current version of N2VC
701 # but will be implemented for the next point release.
702 model_name = 'default'
703 application_name = self.n2vc.FormatApplicationName(
704 nsr_name,
705 vnf_index,
706 vnfd['name'],
707 )
708
709 nsr_lcm["VCA"][vnf_index] = {
710 "model": model_name,
711 "application": application_name,
712 "operational-status": "init",
713 "detailed-status": "",
714 "vnfd_id": vnfd_id,
715 }
716
717 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
718 task = asyncio.ensure_future(
719 self.n2vc.DeployCharms(
720 model_name, # The network service name
721 application_name, # The application name
722 vnfd, # The vnf descriptor
723 charm_path, # Path to charm
724 params, # Runtime params, like mgmt ip
725 {}, # for native charms only
726 self.n2vc_callback, # Callback for status changes
727 db_nsr, # Callback parameter
728 vnf_index, # Callback parameter
729 None, # Callback parameter (task)
730 )
731 )
732 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, db_nsr, vnf_index))
733 self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
734
735 # TODO: Make this call inside deploy()
736 # Login to the VCA. If there are multiple calls to login(),
737 # subsequent calls will be a nop and return immediately.
738 await self.n2vc.login()
739
740 step = "Looking for needed vnfd to configure"
741 self.logger.debug(logging_text + step)
742 for c_vnf in nsd["constituent-vnfd"]:
743 vnfd_id = c_vnf["vnfd-id-ref"]
744 vnf_index = str(c_vnf["member-vnf-index"])
745 vnfd = needed_vnfd[vnfd_id]
746
747 # Deploy charms for each VDU that supports one.
748 for vdu in vnfd['vdu']:
749 vdu_config = vdu.get('vdu-configuration')
750 proxy_charm = None
751 params = {}
752
753 if vdu_config and vdu_config.get("juju"):
754 proxy_charm = vdu_config["juju"]["charm"]
755
756 if 'initial-config-primitive' in vdu_config:
757 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
758
759 else:
760 # If a VDU doesn't declare it's own charm, check
761 # if the VNF does and deploy that instead.
762
763 # Check if this VNF has a charm configuration
764 vnf_config = vnfd.get("vnf-configuration")
765
766 if vnf_config and vnf_config.get("juju"):
767 proxy_charm = vnf_config["juju"]["charm"]
768
769 if 'initial-config-primitive' in vnf_config:
770 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
771
772 if proxy_charm:
773 deploy()
774 number_to_configure += 1
775
776 db_nsr["config-status"] = "configuring" if number_to_configure else "configured"
777 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure) if number_to_configure else "done"
778 db_nsr["operational-status"] = "running"
779 self.update_db("nsrs", nsr_id, db_nsr)
780
781 self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id))
782 return nsr_lcm
783
784 except (ROclient.ROClientException, DbException, LcmException) as e:
785 self.logger.error(logging_text + "Exit Exception {}".format(e))
786 exc = e
787 except Exception as e:
788 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
789 exc = e
790 finally:
791 if exc and db_nsr:
792 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
793 db_nsr["operational-status"] = "failed"
794 self.update_db("nsrs", nsr_id, db_nsr)
795
796 async def delete_ns(self, nsr_id, order_id):
797 logging_text = "Task delete_ns={} ".format(nsr_id)
798 self.logger.debug(logging_text + "Enter")
799 db_nsr = None
800 exc = None
801 step = "Getting nsr from db"
802 try:
803 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
804 nsd = db_nsr["nsd"]
805 nsr_lcm = db_nsr["_admin"]["deploy"]
806
807 db_nsr["operational-status"] = "terminating"
808 db_nsr["config-status"] = "terminating"
809 db_nsr["detailed-status"] = "Deleting charms"
810 self.update_db("nsrs", nsr_id, db_nsr)
811
812 try:
813 self.logger.debug(logging_text + step)
814 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
815 if deploy_info and deploy_info.get("application"):
816 # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
817
818 # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name)
819 task = asyncio.ensure_future(
820 self.n2vc.RemoveCharms(
821 deploy_info['model'],
822 deploy_info['application'],
823 self.n2vc_callback,
824 db_nsr,
825 vnf_index,
826 )
827 )
828 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
829 # deploy_info['application'],None, db_nsr, vnf_index))
830 self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
831 except Exception as e:
832 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
833 # remove from RO
834
835 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
836 # Delete ns
837 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
838 if RO_nsr_id:
839 try:
840 step = db_nsr["detailed-status"] = "Deleting ns at RO"
841 self.logger.debug(logging_text + step)
842 desc = await RO.delete("ns", RO_nsr_id)
843 nsr_lcm["RO"]["nsr_id"] = None
844 nsr_lcm["RO"]["nsr_status"] = "DELETED"
845 except ROclient.ROClientException as e:
846 if e.http_code == 404: # not found
847 nsr_lcm["RO"]["nsr_id"] = None
848 nsr_lcm["RO"]["nsr_status"] = "DELETED"
849 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
850 elif e.http_code == 409: #conflict
851 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
852 else:
853 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
854 self.update_db("nsrs", nsr_id, db_nsr)
855
856 # Delete nsd
857 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
858 if RO_nsd_id:
859 try:
860 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
861 desc = await RO.delete("nsd", RO_nsd_id)
862 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
863 nsr_lcm["RO"]["nsd_id"] = None
864 except ROclient.ROClientException as e:
865 if e.http_code == 404: # not found
866 nsr_lcm["RO"]["nsd_id"] = None
867 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
868 elif e.http_code == 409: #conflict
869 self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
870 else:
871 self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
872 self.update_db("nsrs", nsr_id, db_nsr)
873
874 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
875 if not RO_vnfd_id:
876 continue
877 try:
878 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
879 desc = await RO.delete("vnfd", RO_vnfd_id)
880 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
881 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
882 except ROclient.ROClientException as e:
883 if e.http_code == 404: # not found
884 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
885 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
886 elif e.http_code == 409: #conflict
887 self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
888 else:
889 self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
890 self.update_db("nsrs", nsr_id, db_nsr)
891
892 # TODO delete from database or mark as deleted???
893 db_nsr["operational-status"] = "terminated"
894 self.db.del_one("nsrs", {"_id": nsr_id})
895 self.logger.debug(logging_text + "Exit")
896
897 except (ROclient.ROClientException, DbException) as e:
898 self.logger.error(logging_text + "Exit Exception {}".format(e))
899 exc = e
900 except Exception as e:
901 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
902 exc = e
903 finally:
904 if exc and db_nsr:
905 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
906 db_nsr["operational-status"] = "failed"
907 self.update_db("nsrs", nsr_id, db_nsr)
908
909 async def test(self, param=None):
910 self.logger.debug("Starting/Ending test task: {}".format(param))
911
912 def cancel_tasks(self, topic, _id):
913 """
914 Cancel all active tasks of a concrete nsr or vim identified for _id
915 :param topic: can be ns or vim_account
916 :param _id: nsr or vim identity
917 :return: None, or raises an exception if not possible
918 """
919 if topic == "ns":
920 lcm_tasks = self.lcm_ns_tasks
921 elif topic== "vim_account":
922 lcm_tasks = self.lcm_vim_tasks
923 elif topic== "sdn":
924 lcm_tasks = self.lcm_sdn_tasks
925
926 if not lcm_tasks.get(_id):
927 return
928 for order_id, tasks_set in lcm_tasks[_id].items():
929 for task_name, task in tasks_set.items():
930 result = task.cancel()
931 if result:
932 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
933 lcm_tasks[_id] = {}
934
935 async def read_kafka(self):
936 self.logger.debug("Task Kafka Enter")
937 order_id = 1
938 # future = asyncio.Future()
939 consecutive_errors = 0
940 while consecutive_errors < 10:
941 try:
942 topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
943 self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
944 consecutive_errors = 0
945 order_id += 1
946 if command == "exit":
947 print("Bye!")
948 break
949 elif command.startswith("#"):
950 continue
951 elif command == "echo":
952 # just for test
953 print(params)
954 sys.stdout.flush()
955 continue
956 elif command == "test":
957 asyncio.Task(self.test(params), loop=self.loop)
958 continue
959
960 if topic == "ns":
961 nsr_id = params.strip()
962 if command == "create":
963 # self.logger.debug("Deploying NS {}".format(nsr_id))
964 task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
965 if nsr_id not in self.lcm_ns_tasks:
966 self.lcm_ns_tasks[nsr_id] = {}
967 self.lcm_ns_tasks[nsr_id][order_id] = {"create_ns": task}
968 continue
969 elif command == "delete":
970 # self.logger.debug("Deleting NS {}".format(nsr_id))
971 self.cancel_tasks(topic, nsr_id)
972 task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
973 if nsr_id not in self.lcm_ns_tasks:
974 self.lcm_ns_tasks[nsr_id] = {}
975 self.lcm_ns_tasks[nsr_id][order_id] = {"delete_ns": task}
976 continue
977 elif command == "show":
978 try:
979 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
980 print(
981 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
982 "{}\n deploy: {}\n tasks: {}".format(
983 nsr_id, db_nsr["operational-status"],
984 db_nsr["config-status"], db_nsr["detailed-status"],
985 db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)))
986 except Exception as e:
987 print("nsr {} not found: {}".format(nsr_id, e))
988 sys.stdout.flush()
989 continue
990 elif topic == "vim_account":
991 vim_id = params["_id"]
992 if command == "create":
993 task = asyncio.ensure_future(self.create_vim(params, order_id))
994 if vim_id not in self.lcm_vim_tasks:
995 self.lcm_vim_tasks[vim_id] = {}
996 self.lcm_vim_tasks[vim_id][order_id] = {"create_vim": task}
997 continue
998 elif command == "delete":
999 self.cancel_tasks(topic, vim_id)
1000 task = asyncio.ensure_future(self.delete_vim(vim_id, order_id))
1001 if vim_id not in self.lcm_vim_tasks:
1002 self.lcm_vim_tasks[vim_id] = {}
1003 self.lcm_vim_tasks[vim_id][order_id] = {"delete_vim": task}
1004 continue
1005 elif command == "show":
1006 print("not implemented show with vim_account")
1007 sys.stdout.flush()
1008 continue
1009 elif command == "edit":
1010 task = asyncio.ensure_future(self.edit_vim(vim_id, order_id))
1011 if vim_id not in self.lcm_vim_tasks:
1012 self.lcm_vim_tasks[vim_id] = {}
1013 self.lcm_vim_tasks[vim_id][order_id] = {"edit_vim": task}
1014 continue
1015 elif topic == "sdn":
1016 _sdn_id = params["_id"]
1017 if command == "create":
1018 task = asyncio.ensure_future(self.create_sdn(params, order_id))
1019 if _sdn_id not in self.lcm_sdn_tasks:
1020 self.lcm_sdn_tasks[_sdn_id] = {}
1021 self.lcm_sdn_tasks[_sdn_id][order_id] = {"create_sdn": task}
1022 continue
1023 elif command == "delete":
1024 self.cancel_tasks(topic, _sdn_id)
1025 task = asyncio.ensure_future(self.delete_sdn(_sdn_id, order_id))
1026 if _sdn_id not in self.lcm_sdn_tasks:
1027 self.lcm_sdn_tasks[_sdn_id] = {}
1028 self.lcm_sdn_tasks[_sdn_id][order_id] = {"delete_sdn": task}
1029 continue
1030 elif command == "edit":
1031 task = asyncio.ensure_future(self.edit_sdn(_sdn_id, order_id))
1032 if _sdn_id not in self.lcm_sdn_tasks:
1033 self.lcm_sdn_tasks[_sdn_id] = {}
1034 self.lcm_sdn_tasks[_sdn_id][order_id] = {"edit_sdn": task}
1035 continue
1036 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1037 except Exception as e:
1038 if consecutive_errors == 5:
1039 self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
1040 break
1041 else:
1042 consecutive_errors += 1
1043 self.logger.error("Task Kafka Exception {}".format(e))
1044 await asyncio.sleep(1, loop=self.loop)
1045 self.logger.debug("Task Kafka terminating")
1046 # TODO
1047 # self.cancel_tasks("ALL", "create")
1048 # timeout = 200
1049 # while self.is_pending_tasks():
1050 # self.logger.debug("Task Kafka terminating. Waiting for tasks termination")
1051 # await asyncio.sleep(2, loop=self.loop)
1052 # timeout -= 2
1053 # if not timeout:
1054 # self.cancel_tasks("ALL", "ALL")
1055 self.logger.debug("Task Kafka exit")
1056
1057 def start(self):
1058 self.loop = asyncio.get_event_loop()
1059 self.loop.run_until_complete(self.read_kafka())
1060 self.loop.close()
1061 self.loop = None
1062 if self.db:
1063 self.db.db_disconnect()
1064 if self.msg:
1065 self.msg.disconnect()
1066 if self.fs:
1067 self.fs.fs_disconnect()
1068
1069
1070 def read_config_file(self, config_file):
1071 # TODO make a [ini] + yaml inside parser
1072 # the configparser library is not suitable, because it does not admit comments at the end of line,
1073 # and not parse integer or boolean
1074 try:
1075 with open(config_file) as f:
1076 conf = yaml.load(f)
1077 for k, v in environ.items():
1078 if not k.startswith("OSMLCM_"):
1079 continue
1080 k_items = k.lower().split("_")
1081 c = conf
1082 try:
1083 for k_item in k_items[1:-1]:
1084 if k_item in ("ro", "vca"):
1085 # put in capital letter
1086 k_item = k_item.upper()
1087 c = c[k_item]
1088 if k_items[-1] == "port":
1089 c[k_items[-1]] = int(v)
1090 else:
1091 c[k_items[-1]] = v
1092 except Exception as e:
1093 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1094
1095 return conf
1096 except Exception as e:
1097 self.logger.critical("At config file '{}': {}".format(config_file, e))
1098 exit(1)
1099
1100
1101 if __name__ == '__main__':
1102
1103 config_file = "lcm.cfg"
1104 lcm = Lcm(config_file)
1105
1106 lcm.start()