aa24caa1ecec27d77ef36c20525be0728b50f895
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26
27
class LcmException(Exception):
    """Error raised by the LCM for configuration problems and failed lifecycle operations."""
30
31
32 class Lcm:
33
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file reference, loaded through self.read_config_file. The resulting two level
        dictionary is read at its 'global', 'database', 'storage', 'message', 'RO' and 'VCA' sections
    :return: None
    :raises LcmException: if a 'driver' entry is not supported, or a database/storage/message connection fails
    """
    # "import logging" alone does not load the handlers submodule needed for the rotating log files below
    import logging.handlers

    self.db = None
    self.msg = None
    self.fs = None

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging of the lcm itself: rotating file if 'logfile' is configured, otherwise a stream handler
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                            maxBytes=int(100e6), backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules; the logger name is also injected in each section so that the driver can use it
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                maxBytes=int(100e6), backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )

    try:
        db_driver = config["database"]["driver"]
        if db_driver == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif db_driver == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))

        fs_driver = config["storage"]["driver"]
        if fs_driver == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))

        msg_driver = config["message"]["driver"]
        if msg_driver == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif msg_driver == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending message driver (the storage driver was reported here by mistake)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e
138
def update_db(self, item, _id, _desc):
    """
    Replace an entry at database. A database error is logged, never raised
    :param item: database table/collection name, e.g. "nsrs", "vims", "sdns"
    :param _id: identity of the entry to replace
    :param _desc: new content for the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
144
async def create_vim(self, vim_content, order_id):
    """
    Task that creates a vim at RO and attaches it to the RO tenant. Progress and errors are stored at database
    :param vim_content: vim description. Must contain '_id', 'vim_type', 'vim_tenant_name', 'vim_user' and
        'vim_password'; 'config' is optional
    :param order_id: not used by this task
    :return: the RO vim uuid if ok. None on error, which is logged and written at the database entry
    """
    vim_id = vim_content["_id"]
    logging_text = "Task create_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deploy" not in db_vim["_admin"]:
            db_vim["_admin"]["deploy"] = {}
        db_vim["_admin"]["deploy"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # internal fields and the tenant credentials are not part of the RO vim descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name", "vim_user",
                    "vim_password"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
        self.update_db("vims", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_account_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
        }
        # 'config' is optional: do not fail the whole task because it was not provided
        if "config" in vim_content:
            vim_account_RO["config"] = vim_content["config"]
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failing step at database; nothing can be stored if the entry was never read
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
200
async def edit_vim(self, vim_content, order_id):
    """
    Task that propagates a vim edition to RO (vim and vim-account). Errors are stored at database
    :param vim_content: edited fields of the vim. Must contain '_id'; any other field is optional
    :param order_id: not used by this task
    :return: the RO vim uuid, or None if database holds no RO information for this vim or on error
    """
    vim_id = vim_content["_id"]
    logging_text = "Task edit_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # stays None when the vim was never deployed at RO
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name", "vim_user",
                        "vim_password"):
                vim_RO.pop(key, None)
            # an edition is partial: 'vim_type' is only translated when it is being changed
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            if "vim_user" in vim_content:
                # RO names this field 'vim_username'; it must go to the RO descriptor, not back into the input
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            if vim_account_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # '_admin' is optional in the stored entry, so create it if needed to record the error
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
252
async def delete_vim(self, vim_id, order_id):
    """
    Task that detaches and deletes a vim from RO, and then removes it from database
    :param vim_id: database identity of the vim
    :param order_id: not used by this task
    :return: None. Errors are logged and written at the database entry
    """
    logging_text = "Task delete_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        step = "Deleting vim from database"
        self.db.del_one("vims", {"_id": vim_id})
        self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # '_admin' is optional in the stored entry, so create it if needed to record the error
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
299
async def create_sdn(self, sdn_content, order_id):
    """
    Task that creates a sdn controller at RO. Progress and errors are stored at database
    :param sdn_content: sdn description. Must contain '_id'
    :param order_id: not used by this task
    :return: the RO sdn uuid if ok. None on error, which is logged and written at the database entry
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task create_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deploy" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deploy"] = {}
        db_sdn["_admin"]["deploy"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        # internal fields are not part of the RO descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failing step at database; nothing can be stored if the entry was never read
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
341
async def edit_sdn(self, sdn_content, order_id):
    """
    Task that propagates a sdn controller edition to RO. Errors are stored at database
    :param sdn_content: edited fields of the sdn. Must contain '_id'
    :param order_id: not used by this task
    :return: the RO sdn uuid, or None if database holds no RO information for this sdn or on error
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task edit_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # stays None when the sdn was never deployed at RO
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # '_admin' is optional in the stored entry, so create it if needed to record the error
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
379
async def delete_sdn(self, sdn_id, order_id):
    """
    Task that deletes a sdn controller from RO, and then removes it from database
    :param sdn_id: database identity of the sdn
    :param order_id: not used by this task
    :return: None. Errors are logged and written at the database entry
    """
    logging_text = "Task delete_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        step = "Deleting sdn from database"
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # '_admin' is optional in the stored entry, so create it if needed to record the error
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
417
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input is not modified.
    Internal '_id' and '_admin' are removed, and every vdu 'cloud-init-file' is replaced by a 'cloud-init' entry
    holding the content of that file, read from the package storage
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if a cloud-init file cannot be read from storage
    """
    ci_file = None
    try:
        vnfd_RO = deepcopy(vnfd)
        vnfd_RO.pop("_id", None)
        vnfd_RO.pop("_admin", None)
        if new_id:
            vnfd_RO["id"] = new_id
        # a descriptor without vdu list has simply nothing to translate
        for vdu in vnfd_RO.get("vdu") or ():
            if "cloud-init-file" not in vdu:
                continue
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"]
            )
            ci_file = self.fs.file_open(cloud_init_file, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            # convert to base 64 or similar
            cloud_init_content = ci_file.read()
            ci_file.close()
            ci_file = None  # already closed: nothing left for the 'finally' clause
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) from e
    finally:
        # only reached with an open file when reading it failed
        if ci_file:
            ci_file.close()
453
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
    """Update the lcm database with the status of the charm.

    Updates the VNF's operational status with the state of the charm:
    - blocked: The unit needs manual intervention
    - maintenance: The unit is actively deploying/configuring
    - waiting: The unit is waiting for another charm to be ready
    - active: The unit is deployed, configured, and ready
    - error: The charm has failed and needs attention.
    - terminated: The charm has been destroyed
    - removing,
    - removed

    Updates the network service's config-status to reflect the state of all
    charms.

    It is called either with a workload_status (status change of a charm) or with a task (as the done callback
    of the deploying task). Any exception is logged and never propagated.

    :param model_name: not used here
    :param application_name: not used here
    :param workload_status: new status of the charm; ignored when 'task' is provided
    :param db_nsr: nsr database content; it is modified in place and written to the "nsrs" table
    :param vnf_member_index: key of the affected vnf at db_nsr["_admin"]["deploy"]["VCA"]
    :param task: finished task/future, when used as a done callback
    :return: None
    """
    nsr_id = None
    try:
        nsr_id = db_nsr["_id"]
        nsr_lcm = db_nsr["_admin"]["deploy"]
        if task:
            if task.cancelled():
                self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(
                    nsr_id, vnf_member_index))
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error("[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(
                        nsr_id, vnf_member_index, exc))
                    nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
                    nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
                else:
                    self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(
                        nsr_id, vnf_member_index))
                    # TODO it seams that task Done, but callback is still ongoing. For the moment comment this
                    # two lines
                    # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
                    # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        elif workload_status:
            self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(
                nsr_id, vnf_member_index, workload_status))
            if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
                return  # same status, ignore
            nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
            # TODO N2VC some error message in case of error should be obtained from N2VC
            nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
        else:
            # no exception is being handled here, so there is no traceback to attach
            self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(
                nsr_id, vnf_member_index))
            return

        # summarize the status of all the charms of this ns
        some_failed = False
        all_active = True
        status_map = {}
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            if vca_status == "error":
                some_failed = True
                db_nsr["config-status"] = "failed"
                # report the vnf that is in error, which may differ from the one that triggered this callback
                db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(
                    vnf_index, vca_info["detailed-status"])
                break

        if all_active:
            self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(
                nsr_id, vnf_member_index))
            db_nsr["config-status"] = "configured"
            db_nsr["detailed-status"] = "done"
        elif not some_failed:
            db_nsr["config-status"] = "configuring: " + ", ".join(
                "{}: {}".format(status, num) for status, num in status_map.items())
        self.update_db("nsrs", nsr_id, db_nsr)

    except Exception as e:
        self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(
            nsr_id, vnf_member_index, e), exc_info=True)
538
async def create_ns(self, nsr_id, order_id):
    """
    Task that instantiates a network service: creates the needed vnfds, the nsd and the ns at RO, waits until
    the ns is active at RO, and launches the deployment of the configuration charms of its vnfs.
    Progress and errors are stored at the "nsrs" database entry
    :param nsr_id: database identity of the nsr
    :param order_id: key under self.lcm_ns_tasks[nsr_id] where the launched charm tasks are registered
    :return: the deploy information (stored at db_nsr["_admin"]["deploy"]) if ok. None on error, which is logged
        and written at the database entry
    """
    logging_text = "Task create_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        nsr_name = db_nsr["name"]  # TODO short-name??
        needed_vnfd = {}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            "VCA": {},
        }
        db_nsr["_admin"]["deploy"] = nsr_lcm
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            self.logger.debug(logging_text + step)
            # RO descriptors are private to this ns instance: their id is prefixed with the nsr id
            vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
            self.update_db("nsrs", nsr_id, db_nsr)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
        self.logger.debug(logging_text + step)

        # same prefix as the vnfds. It has to be the nsr id: this nsd references the vnfds private to this nsr,
        # so it cannot be shared with other instances of the same nsd
        nsd_id_RO = nsr_id + "." + nsd_id[:200]
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
        if nsd_list:
            nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                nsd_id, nsd_list[0]["uuid"]))
        else:
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = nsd_id_RO
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
        self.update_db("nsrs", nsr_id, db_nsr)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

        if not RO_nsr_id:
            step = db_nsr["detailed-status"] = "Creating ns at RO"
            self.logger.debug(logging_text + step)

            desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
        self.update_db("nsrs", nsr_id, db_nsr)

        # wait until NS is ready
        step = ns_status_detailed = "Waiting ns ready at RO"
        db_nsr["detailed-status"] = ns_status_detailed
        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
        deployment_timeout = 600  # seconds
        while deployment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            nsr_lcm["RO"]["nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                self.update_db("nsrs", nsr_id, db_nsr)
            elif ns_status == "ACTIVE":
                nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                break
            else:
                # an exception instead of an assert, because asserts are removed when running with -O
                raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

            # no 'loop' argument: it was removed in python 3.10 and the running loop is used anyway
            await asyncio.sleep(5)
            deployment_timeout -= 5
        if deployment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")
        db_nsr["detailed-status"] = "Configuring vnfr"
        self.update_db("nsrs", nsr_id, db_nsr)

        vnfd_to_config = 0
        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]
            if not (vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju")):
                continue  # this vnf has no charm to deploy
            vnfd_to_config += 1
            proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]

            # Note: The charm needs to exist on disk at the location
            # specified by charm_path.
            base_folder = vnfd["_admin"]["storage"]
            storage_params = self.fs.get_params()
            charm_path = "{}{}/{}/charms/{}".format(
                storage_params["path"],
                base_folder["folder"],
                base_folder["pkg-dir"],
                proxy_charm
            )

            # Setup the runtime parameters for this VNF
            params = {
                'rw_mgmt_ip': nsr_lcm['nsr_ip'][vnf_index],
            }

            # model_name will be ignored in the current version of N2VC
            # but will be implemented for the next point release.
            model_name = 'default'
            application_name = self.n2vc.FormatApplicationName(
                nsr_name,  # 'default',
                vnf_index,
                vnfd['name'],
            )
            # TODO N2VC implement this inside n2vc.FormatApplicationName
            application_name = application_name[:50]

            nsr_lcm["VCA"][vnf_index] = {
                "model": model_name,
                "application": application_name,
                "operational-status": "init",
                "detailed-status": "",
                "vnfd_id": vnfd_id,
            }

            self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(
                nsr_id, charm_path, proxy_charm))
            task = asyncio.ensure_future(
                self.n2vc.DeployCharms(
                    model_name,          # The network service name
                    application_name,    # The application name
                    vnfd,                # The vnf descriptor
                    charm_path,          # Path to charm
                    params,              # Runtime params, like mgmt ip
                    {},                  # for native charms only
                    self.n2vc_callback,  # Callback for status changes
                    db_nsr,              # Callback parameter
                    vnf_index,           # Callback parameter
                    None,                # Callback parameter (task)
                )
            )
            # the callback is also invoked, with the task as last argument, once the deployment task finishes
            task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name,
                                                     None, db_nsr, vnf_index))

            self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
        db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
        db_nsr["detailed-status"] = "configuring: init: {}".format(vnfd_to_config) if vnfd_to_config else "done"
        db_nsr["operational-status"] = "running"
        self.update_db("nsrs", nsr_id, db_nsr)

        self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id))
        return nsr_lcm

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failing step at database; nothing can be stored if the entry was never read
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_db("nsrs", nsr_id, db_nsr)
754
async def delete_ns(self, nsr_id, order_id):
    """
    Task that terminates a network service: launches the removal of its charms, deletes the ns, the nsd and the
    vnfds from RO, and finally removes the nsr from database.
    A failure deleting an item at RO is logged, but it does not stop the termination
    :param nsr_id: database identity of the nsr
    :param order_id: key under self.lcm_ns_tasks[nsr_id] where the launched charm tasks are registered
    :return: None. Unexpected errors are logged and written at the database entry
    """
    logging_text = "Task delete_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"]["deploy"]

        db_nsr["operational-status"] = "terminating"
        db_nsr["config-status"] = "terminating"
        step = db_nsr["detailed-status"] = "Deleting charms"
        self.update_db("nsrs", nsr_id, db_nsr)

        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    # the removal runs in background; it reports its progress through n2vc_callback
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                            self.n2vc_callback,
                            db_nsr,
                            vnf_index,
                        )
                    )
                    self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: a problem with the charms must not prevent the deletion at RO
            self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
        # Delete ns
        RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                else:
                    self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                else:
                    self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == 409:  # conflict
                    self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                else:
                    self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
            self.update_db("nsrs", nsr_id, db_nsr)

        # TODO delete from database or mark as deleted???
        db_nsr["operational-status"] = "terminated"
        self.db.del_one("nsrs", {"_id": nsr_id})
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # leave a trace of the failing step at database; nothing can be stored if the entry was never read
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_db("nsrs", nsr_id, db_nsr)
867
async def test(self, param=None):
    """
    Trivial task that only writes a debug line; it is the task launched by the "test" command of read_kafka
    :param param: any value; it is included in the logged text
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
870
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim or sdn identity
    :return: None
    :raises ValueError: if topic is not one of the supported values
    """
    registries = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    if topic not in registries:
        # fail with an explicit message instead of an UnboundLocalError further down
        raise ValueError("unknown topic '{}': expected one of ns, vim_account, sdn".format(topic))
    lcm_tasks = registries[topic]

    # nothing registered (or already emptied) for this _id
    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns a false value when the task could not be cancelled; only real
            # cancellations are logged
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    # forget the tasks of this _id, cancelled or not
    lcm_tasks[_id] = {}
893
async def read_kafka(self, max_consecutive_errors=5, retry_delay=1):
    """
    Read orders from the message bus and launch the matching asynchronous task, until an "exit" command is
    received or too many consecutive iterations fail.
    Commands handled for any topic: "exit" (stop reading), commands starting with "#" (ignored), "echo"
    (print params) and "test" (launch self.test). Per topic: "ns" accepts create, delete and show;
    "vim_account" accepts create, delete, show and edit; "sdn" accepts create, delete and edit. A delete
    first cancels the tasks registered for the same _id. Anything else is logged as critical.
    Launched tasks are stored at self.lcm_ns_tasks, self.lcm_vim_tasks or self.lcm_sdn_tasks, indexed by _id
    and then by order_id (a counter incremented on every message received).
    :param max_consecutive_errors: consecutive failed iterations tolerated; the next failure ends the loop
    :param retry_delay: seconds to wait after a failed iteration before reading again
    :return: None
    """
    self.logger.debug("Task Kafka Enter")

    def launch(registry, _id, order, task_name, coro):
        # schedule the coroutine and store the task where cancel_tasks looks for it
        task = asyncio.ensure_future(coro)
        registry.setdefault(_id, {})[order] = {task_name: task}

    order_id = 1
    consecutive_errors = 0
    # the loop only ends through the "break" statements below
    while True:
        try:
            topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
            self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                # ensure_future instead of instantiating asyncio.Task by hand; this task is not registered
                asyncio.ensure_future(self.test(params), loop=self.loop)
                continue

            if topic == "ns":
                nsr_id = params.strip()
                if command == "create":
                    launch(self.lcm_ns_tasks, nsr_id, order_id, "create_ns", self.create_ns(nsr_id, order_id))
                    continue
                elif command == "delete":
                    # stop whatever is still running for this ns before deleting it
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, order_id, "delete_ns", self.delete_ns(nsr_id, order_id))
                    continue
                elif command == "show":
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
                            "{}\n deploy: {}\n tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "create_vim", self.create_vim(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, order_id, "delete_vim", self.delete_vim(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "edit_vim", self.edit_vim(vim_id, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "create_sdn", self.create_sdn(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "delete_sdn",
                           self.delete_sdn(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "edit_sdn", self.edit_sdn(_sdn_id, order_id))
                    continue
            # reached only when no branch above handled the order
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            if consecutive_errors >= max_consecutive_errors:
                self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
                break
            consecutive_errors += 1
            self.logger.error("Task Kafka Exception {}".format(e))
            # no "loop" argument: asyncio.sleep does not accept it since python 3.10, and inside this
            # coroutine the running loop is used anyway
            await asyncio.sleep(retry_delay)
    self.logger.debug("Task Kafka terminating")
    # TODO cancel the pending "create" tasks, wait (with a timeout) for the remaining tasks to finish and
    # cancel whatever is still pending when the timeout expires
    self.logger.debug("Task Kafka exit")
1015
def start(self):
    """
    Run read_kafka on the event loop until it finishes, then release the resources: the loop is closed and
    the database, messaging and storage connections that were set are disconnected.
    The clean-up is done also when read_kafka ends with an exception (e.g. KeyboardInterrupt), which is
    propagated to the caller afterwards.
    :return: None
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(self.read_kafka())
    finally:
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1027
1028
def read_config_file(self, config_file):
    """
    Load the configuration from a yaml file and override its values with the environment variables whose
    name starts with "OSMLCM_". The rest of the variable name, in lower case and split by "_", is the path
    of nested keys to set ("ro" and "vca" are looked up in capital letters); e.g. OSMLCM_RO_HOST sets
    conf["RO"]["host"]. A final key "port" is converted to integer. Variables that cannot be applied are
    skipped with a warning.
    :param config_file: path of the yaml configuration file
    :return: the content of the file with the overrides applied. If the file cannot be read or parsed a
        critical message is logged and the process exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load builds only plain yaml types; yaml.load without an explicit Loader is unsafe and
            # it is rejected by recent PyYAML releases
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that holds the last key
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # sys.exit instead of the "exit" helper, which only exists when the site module is loaded
        sys.exit(1)
1058
1059
if __name__ == '__main__':

    # Stand-alone execution: build the Lcm from "lcm.cfg" (a relative path, so it is resolved against the
    # current working directory) and serve orders until start() returns
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)

    lcm.start()