2b62c1f4fd5bf4ef0016b756b900a40ff7837a0e
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 import sys
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from os import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc.vnf import N2VC
21 # import os.path
22 # import time
23
24 from copy import deepcopy
25 from http import HTTPStatus
26
27
28 class LcmException(Exception):
29 pass
30
31
32 class Lcm:
33
34 def __init__(self, config_file):
35 """
36 Init, Connect to database, filesystem storage, and messaging
37 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
38 :return: None
39 """
40
41 self.db = None
42 self.msg = None
43 self.fs = None
44 # contains created tasks/futures to be able to cancel
45
46 self.lcm_ns_tasks = {}
47 self.lcm_vim_tasks = {}
48 self.lcm_sdn_tasks = {}
49 # logging
50 self.logger = logging.getLogger('lcm')
51 # load configuration
52 config = self.read_config_file(config_file)
53 self.config = config
54 self.ro_config={
55 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
56 "tenant": config.get("tenant", "osm"),
57 "logger_name": "lcm.ROclient",
58 "loglevel": "ERROR",
59 }
60
61 self.vca = config["VCA"] # TODO VCA
62 self.loop = None
63
64 # logging
65 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
66 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
67 config["database"]["logger_name"] = "lcm.db"
68 config["storage"]["logger_name"] = "lcm.fs"
69 config["message"]["logger_name"] = "lcm.msg"
70 if "logfile" in config["global"]:
71 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
72 maxBytes=100e6, backupCount=9, delay=0)
73 file_handler.setFormatter(log_formatter_simple)
74 self.logger.addHandler(file_handler)
75 else:
76 str_handler = logging.StreamHandler()
77 str_handler.setFormatter(log_formatter_simple)
78 self.logger.addHandler(str_handler)
79
80 if config["global"].get("loglevel"):
81 self.logger.setLevel(config["global"]["loglevel"])
82
83 # logging other modules
84 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
85 config[k1]["logger_name"] = logname
86 logger_module = logging.getLogger(logname)
87 if "logfile" in config[k1]:
88 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
89 maxBytes=100e6, backupCount=9, delay=0)
90 file_handler.setFormatter(log_formatter_simple)
91 logger_module.addHandler(file_handler)
92 if "loglevel" in config[k1]:
93 logger_module.setLevel(config[k1]["loglevel"])
94
95 self.n2vc = N2VC(
96 log=self.logger,
97 server=config['VCA']['host'],
98 port=config['VCA']['port'],
99 user=config['VCA']['user'],
100 secret=config['VCA']['secret'],
101 # TODO: This should point to the base folder where charms are stored,
102 # if there is a common one (like object storage). Otherwise, leave
103 # it unset and pass it via DeployCharms
104 # artifacts=config['VCA'][''],
105 artifacts=None,
106 )
107
108 try:
109 if config["database"]["driver"] == "mongo":
110 self.db = dbmongo.DbMongo()
111 self.db.db_connect(config["database"])
112 elif config["database"]["driver"] == "memory":
113 self.db = dbmemory.DbMemory()
114 self.db.db_connect(config["database"])
115 else:
116 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
117 config["database"]["driver"]))
118
119 if config["storage"]["driver"] == "local":
120 self.fs = fslocal.FsLocal()
121 self.fs.fs_connect(config["storage"])
122 else:
123 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
124 config["storage"]["driver"]))
125
126 if config["message"]["driver"] == "local":
127 self.msg = msglocal.MsgLocal()
128 self.msg.connect(config["message"])
129 elif config["message"]["driver"] == "kafka":
130 self.msg = msgkafka.MsgKafka()
131 self.msg.connect(config["message"])
132 else:
133 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
134 config["storage"]["driver"]))
135 except (DbException, FsException, MsgException) as e:
136 self.logger.critical(str(e), exc_info=True)
137 raise LcmException(str(e))
138
139 def update_db(self, item, _id, _desc):
140 try:
141 self.db.replace(item, _id, _desc)
142 except DbException as e:
143 self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
144
145 async def create_vim(self, vim_content, order_id):
146 vim_id = vim_content["_id"]
147 logging_text = "Task create_vim={} ".format(vim_id)
148 self.logger.debug(logging_text + "Enter")
149 db_vim = None
150 exc = None
151 try:
152 step = "Getting vim from db"
153 db_vim = self.db.get_one("vims", {"_id": vim_id})
154 if "_admin" not in db_vim:
155 db_vim["_admin"] = {}
156 if "deploy" not in db_vim["_admin"]:
157 db_vim["_admin"]["deploy"] = {}
158 db_vim["_admin"]["deploy"]["RO"] = None
159
160 step = "Creating vim at RO"
161 RO = ROclient.ROClient(self.loop, **self.ro_config)
162 vim_RO = deepcopy(vim_content)
163 vim_RO.pop("_id", None)
164 vim_RO.pop("_admin", None)
165 vim_RO.pop("schema_version", None)
166 vim_RO.pop("schema_type", None)
167 vim_RO.pop("vim_tenant_name", None)
168 vim_RO["type"] = vim_RO.pop("vim_type")
169 vim_RO.pop("vim_user", None)
170 vim_RO.pop("vim_password", None)
171 desc = await RO.create("vim", descriptor=vim_RO)
172 RO_vim_id = desc["uuid"]
173 db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
174 self.update_db("vims", vim_id, db_vim)
175
176 step = "Attach vim to RO tenant"
177 vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
178 "vim_username": vim_content["vim_user"],
179 "vim_password": vim_content["vim_password"],
180 "config": vim_content["config"]
181 }
182 desc = await RO.attach_datacenter(RO_vim_id , descriptor=vim_RO)
183 db_vim["_admin"]["operationalState"] = "ENABLED"
184 self.update_db("vims", vim_id, db_vim)
185
186 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
187 return RO_vim_id
188
189 except (ROclient.ROClientException, DbException) as e:
190 self.logger.error(logging_text + "Exit Exception {}".format(e))
191 exc = e
192 except Exception as e:
193 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
194 exc = e
195 finally:
196 if exc and db_vim:
197 db_vim["_admin"]["operationalState"] = "ERROR"
198 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
199 self.update_db("vims", vim_id, db_vim)
200
201 async def edit_vim(self, vim_content, order_id):
202 vim_id = vim_content["_id"]
203 logging_text = "Task edit_vim={} ".format(vim_id)
204 self.logger.debug(logging_text + "Enter")
205 db_vim = None
206 exc = None
207 step = "Getting vim from db"
208 try:
209 db_vim = self.db.get_one("vims", {"_id": vim_id})
210 if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
211 RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
212 step = "Editing vim at RO"
213 RO = ROclient.ROClient(self.loop, **self.ro_config)
214 vim_RO = deepcopy(vim_content)
215 vim_RO.pop("_id", None)
216 vim_RO.pop("_admin", None)
217 vim_RO.pop("schema_version", None)
218 vim_RO.pop("schema_type", None)
219 vim_RO.pop("vim_tenant_name", None)
220 vim_RO["type"] = vim_RO.pop("vim_type")
221 vim_RO.pop("vim_user", None)
222 vim_RO.pop("vim_password", None)
223 if vim_RO:
224 desc = await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
225
226 step = "Editing vim-account at RO tenant"
227 vim_RO = {}
228 for k in ("vim_tenant_name", "vim_password", "config"):
229 if k in vim_content:
230 vim_RO[k] = vim_content[k]
231 if "vim_user" in vim_content:
232 vim_content["vim_username"] = vim_content["vim_user"]
233 if vim_RO:
234 desc = await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
235 db_vim["_admin"]["operationalState"] = "ENABLED"
236 self.update_db("vims", vim_id, db_vim)
237
238 self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
239 return RO_vim_id
240
241 except (ROclient.ROClientException, DbException) as e:
242 self.logger.error(logging_text + "Exit Exception {}".format(e))
243 exc = e
244 except Exception as e:
245 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
246 exc = e
247 finally:
248 if exc and db_vim:
249 db_vim["_admin"]["operationalState"] = "ERROR"
250 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
251 self.update_db("vims", vim_id, db_vim)
252
253 async def delete_vim(self, vim_id, order_id):
254 logging_text = "Task delete_vim={} ".format(vim_id)
255 self.logger.debug(logging_text + "Enter")
256 db_vim = None
257 exc = None
258 step = "Getting vim from db"
259 try:
260 db_vim = self.db.get_one("vims", {"_id": vim_id})
261 if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
262 RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
263 RO = ROclient.ROClient(self.loop, **self.ro_config)
264 step = "Detaching vim from RO tenant"
265 try:
266 await RO.detach_datacenter(RO_vim_id)
267 except ROclient.ROClientException as e:
268 if e.http_code == 404: # not found
269 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
270 else:
271 raise
272
273 step = "Deleting vim from RO"
274 try:
275 await RO.delete("vim", RO_vim_id)
276 except ROclient.ROClientException as e:
277 if e.http_code == 404: # not found
278 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
279 else:
280 raise
281 else:
282 # nothing to delete
283 self.logger.error(logging_text + "Skipping. There is not RO information at database")
284 self.db.del_one("vims", {"_id": vim_id})
285 self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
286 return None
287
288 except (ROclient.ROClientException, DbException) as e:
289 self.logger.error(logging_text + "Exit Exception {}".format(e))
290 exc = e
291 except Exception as e:
292 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
293 exc = e
294 finally:
295 if exc and db_vim:
296 db_vim["_admin"]["operationalState"] = "ERROR"
297 db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
298 self.update_db("vims", vim_id, db_vim)
299
300 async def create_sdn(self, sdn_content, order_id):
301 sdn_id = sdn_content["_id"]
302 logging_text = "Task create_sdn={} ".format(sdn_id)
303 self.logger.debug(logging_text + "Enter")
304 db_sdn = None
305 exc = None
306 try:
307 step = "Getting sdn from db"
308 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
309 if "_admin" not in db_sdn:
310 db_sdn["_admin"] = {}
311 if "deploy" not in db_sdn["_admin"]:
312 db_sdn["_admin"]["deploy"] = {}
313 db_sdn["_admin"]["deploy"]["RO"] = None
314
315 step = "Creating sdn at RO"
316 RO = ROclient.ROClient(self.loop, **self.ro_config)
317 sdn_RO = deepcopy(sdn_content)
318 sdn_RO.pop("_id", None)
319 sdn_RO.pop("_admin", None)
320 sdn_RO.pop("schema_version", None)
321 sdn_RO.pop("schema_type", None)
322 desc = await RO.create("sdn", descriptor=sdn_RO)
323 RO_sdn_id = desc["uuid"]
324 db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
325 db_sdn["_admin"]["operationalState"] = "ENABLED"
326 self.update_db("sdns", sdn_id, db_sdn)
327 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
328 return RO_sdn_id
329
330 except (ROclient.ROClientException, DbException) as e:
331 self.logger.error(logging_text + "Exit Exception {}".format(e))
332 exc = e
333 except Exception as e:
334 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
335 exc = e
336 finally:
337 if exc and db_sdn:
338 db_sdn["_admin"]["operationalState"] = "ERROR"
339 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
340 self.update_db("sdns", sdn_id, db_sdn)
341
342 async def edit_sdn(self, sdn_content, order_id):
343 sdn_id = sdn_content["_id"]
344 logging_text = "Task edit_sdn={} ".format(sdn_id)
345 self.logger.debug(logging_text + "Enter")
346 db_sdn = None
347 exc = None
348 step = "Getting sdn from db"
349 try:
350 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
351 if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
352 RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
353 RO = ROclient.ROClient(self.loop, **self.ro_config)
354 step = "Editing sdn at RO"
355 sdn_RO = deepcopy(sdn_content)
356 sdn_RO.pop("_id", None)
357 sdn_RO.pop("_admin", None)
358 sdn_RO.pop("schema_version", None)
359 sdn_RO.pop("schema_type", None)
360 if sdn_RO:
361 desc = await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
362 db_sdn["_admin"]["operationalState"] = "ENABLED"
363 self.update_db("sdns", sdn_id, db_sdn)
364
365 self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
366 return RO_sdn_id
367
368 except (ROclient.ROClientException, DbException) as e:
369 self.logger.error(logging_text + "Exit Exception {}".format(e))
370 exc = e
371 except Exception as e:
372 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
373 exc = e
374 finally:
375 if exc and db_sdn:
376 db_sdn["_admin"]["operationalState"] = "ERROR"
377 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
378 self.update_db("sdns", sdn_id, db_sdn)
379
380 async def delete_sdn(self, sdn_id, order_id):
381 logging_text = "Task delete_sdn={} ".format(sdn_id)
382 self.logger.debug(logging_text + "Enter")
383 db_sdn = None
384 exc = None
385 step = "Getting sdn from db"
386 try:
387 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
388 if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
389 RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
390 RO = ROclient.ROClient(self.loop, **self.ro_config)
391 step = "Deleting sdn from RO"
392 try:
393 await RO.delete("sdn", RO_sdn_id)
394 except ROclient.ROClientException as e:
395 if e.http_code == 404: # not found
396 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
397 else:
398 raise
399 else:
400 # nothing to delete
401 self.logger.error(logging_text + "Skipping. There is not RO information at database")
402 self.db.del_one("sdns", {"_id": sdn_id})
403 self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
404 return None
405
406 except (ROclient.ROClientException, DbException) as e:
407 self.logger.error(logging_text + "Exit Exception {}".format(e))
408 exc = e
409 except Exception as e:
410 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
411 exc = e
412 finally:
413 if exc and db_sdn:
414 db_sdn["_admin"]["operationalState"] = "ERROR"
415 db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
416 self.update_db("sdns", sdn_id, db_sdn)
417
418 def vnfd2RO(self, vnfd, new_id=None):
419 """
420 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
421 :param vnfd: input vnfd
422 :param new_id: overrides vnf id if provided
423 :return: copy of vnfd
424 """
425 ci_file = None
426 try:
427 vnfd_RO = deepcopy(vnfd)
428 vnfd_RO.pop("_id", None)
429 vnfd_RO.pop("_admin", None)
430 if new_id:
431 vnfd_RO["id"] = new_id
432 for vdu in vnfd_RO["vdu"]:
433 if "cloud-init-file" in vdu:
434 base_folder = vnfd["_admin"]["storage"]
435 clout_init_file = "{}/{}/cloud_init/{}".format(
436 base_folder["folder"],
437 base_folder["pkg-dir"],
438 vdu["cloud-init-file"]
439 )
440 ci_file = self.fs.file_open(clout_init_file, "r")
441 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
442 clout_init_content = ci_file.read()
443 ci_file.close()
444 ci_file = None
445 vdu.pop("cloud-init-file", None)
446 vdu["cloud-init"] = clout_init_content
447 return vnfd_RO
448 except FsException as e:
449 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
450 finally:
451 if ci_file:
452 ci_file.close()
453
454 def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
455 """Update the lcm database with the status of the charm.
456
457 Updates the VNF's operational status with the state of the charm:
458 - blocked: The unit needs manual intervention
459 - maintenance: The unit is actively deploying/configuring
460 - waiting: The unit is waiting for another charm to be ready
461 - active: The unit is deployed, configured, and ready
462 - error: The charm has failed and needs attention.
463 - terminated: The charm has been destroyed
464 - removing,
465 - removed
466
467 Updates the network service's config-status to reflect the state of all
468 charms.
469 """
470 nsr_id = None
471 try:
472 nsr_id = db_nsr["_id"]
473 nsr_lcm = db_nsr["_admin"]["deploy"]
474 if task:
475 if task.cancelled():
476 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id, vnf_member_index))
477 return
478
479 if task.done():
480 exc = task.exception()
481 if exc:
482 self.logger.error(
483 "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id, vnf_member_index, exc))
484 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
485 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
486 else:
487 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id, vnf_member_index))
488 # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines
489 # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
490 # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
491 elif workload_status:
492 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id, vnf_member_index, workload_status))
493 if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
494 return # same status, ignore
495 nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
496 # TODO N2VC some error message in case of error should be obtained from N2VC
497 nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
498 else:
499 self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id, vnf_member_index), exc_info=True)
500 return
501
502 some_failed = False
503 all_active = True
504 status_map = {}
505 for vnf_index, vca_info in nsr_lcm["VCA"].items():
506 vca_status = vca_info["operational-status"]
507 if vca_status not in status_map:
508 # Initialize it
509 status_map[vca_status] = 0
510 status_map[vca_status] += 1
511
512 if vca_status != "active":
513 all_active = False
514 if vca_status == "error":
515 some_failed = True
516 db_nsr["config-status"] = "failed"
517 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index,
518 vca_info["detailed-status"])
519 break
520
521 if all_active:
522 self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id, vnf_member_index))
523 db_nsr["config-status"] = "configured"
524 db_nsr["detailed-status"] = "done"
525 elif some_failed:
526 pass
527 else:
528 cs = "configuring: "
529 separator = ""
530 for status, num in status_map.items():
531 cs += separator + "{}: {}".format(status, num)
532 separator = ", "
533 db_nsr["config-status"] = cs
534 self.update_db("nsrs", nsr_id, db_nsr)
535
536 except Exception as e:
537 self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id, vnf_member_index, e), exc_info=True)
538
539 async def create_ns(self, nsr_id, order_id):
540 logging_text = "Task create_ns={} ".format(nsr_id)
541 self.logger.debug(logging_text + "Enter")
542 # get all needed from database
543 db_nsr = None
544 exc = None
545 step = "Getting nsr from db"
546 try:
547 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
548 nsd = db_nsr["nsd"]
549 nsr_name = db_nsr["name"] # TODO short-name??
550 needed_vnfd = {}
551 for c_vnf in nsd["constituent-vnfd"]:
552 vnfd_id = c_vnf["vnfd-id-ref"]
553 if vnfd_id not in needed_vnfd:
554 step = "Getting vnfd={} from db".format(vnfd_id)
555 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
556
557 nsr_lcm = {
558 "id": nsr_id,
559 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
560 "nsr_ip": {},
561 "VCA": {},
562 }
563 db_nsr["_admin"]["deploy"] = nsr_lcm
564 db_nsr["detailed-status"] = "creating"
565 db_nsr["operational-status"] = "init"
566
567 deloyment_timeout = 120
568
569 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
570
571 # get vnfds, instantiate at RO
572 for vnfd_id, vnfd in needed_vnfd.items():
573 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
574 self.logger.debug(logging_text + step)
575 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
576
577 # look if present
578 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
579 if vnfd_list:
580 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
581 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
582 vnfd_id, vnfd_list[0]["uuid"]))
583 else:
584 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
585 desc = await RO.create("vnfd", descriptor=vnfd_RO)
586 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
587 self.update_db("nsrs", nsr_id, db_nsr)
588
589 # create nsd at RO
590 nsd_id = nsd["id"]
591 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
592 self.logger.debug(logging_text + step)
593
594 nsd_id_RO = nsd_id + "." + nsd_id[:200]
595 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
596 if nsd_list:
597 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
598 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
599 nsd_id, nsd_list[0]["uuid"]))
600 else:
601 nsd_RO = deepcopy(nsd)
602 nsd_RO["id"] = nsd_id_RO
603 nsd_RO.pop("_id", None)
604 nsd_RO.pop("_admin", None)
605 for c_vnf in nsd_RO["constituent-vnfd"]:
606 vnfd_id = c_vnf["vnfd-id-ref"]
607 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
608 desc = await RO.create("nsd", descriptor=nsd_RO)
609 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
610 self.update_db("nsrs", nsr_id, db_nsr)
611
612 # Crate ns at RO
613 # if present use it unless in error status
614 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
615 if RO_nsr_id:
616 try:
617 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
618 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
619 desc = await RO.show("ns", RO_nsr_id)
620 except ROclient.ROClientException as e:
621 if e.http_code != HTTPStatus.NOT_FOUND:
622 raise
623 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
624 if RO_nsr_id:
625 ns_status, ns_status_info = RO.check_ns_status(desc)
626 nsr_lcm["RO"]["nsr_status"] = ns_status
627 if ns_status == "ERROR":
628 step = db_nsr["detailed-status"] = "Deleting ns at RO"
629 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
630 await RO.delete("ns", RO_nsr_id)
631 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
632
633 if not RO_nsr_id:
634 step = db_nsr["detailed-status"] = "Creating ns at RO"
635 self.logger.debug(logging_text + step)
636
637 desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
638 scenario=nsr_lcm["RO"]["nsd_id"])
639 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
640 nsr_lcm["RO"]["nsr_status"] = "BUILD"
641 self.update_db("nsrs", nsr_id, db_nsr)
642
643 # wait until NS is ready
644 step = ns_status_detailed = "Waiting ns ready at RO"
645 db_nsr["detailed-status"] = ns_status_detailed
646 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
647 deloyment_timeout = 600
648 while deloyment_timeout > 0:
649 desc = await RO.show("ns", RO_nsr_id)
650 ns_status, ns_status_info = RO.check_ns_status(desc)
651 nsr_lcm["RO"]["nsr_status"] = ns_status
652 if ns_status == "ERROR":
653 raise ROclient.ROClientException(ns_status_info)
654 elif ns_status == "BUILD":
655 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
656 self.update_db("nsrs", nsr_id, db_nsr)
657 elif ns_status == "ACTIVE":
658 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
659 break
660 else:
661 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
662
663 await asyncio.sleep(5, loop=self.loop)
664 deloyment_timeout -= 5
665 if deloyment_timeout <= 0:
666 raise ROclient.ROClientException("Timeout waiting ns to be ready")
667 db_nsr["detailed-status"] = "Configuring vnfr"
668 self.update_db("nsrs", nsr_id, db_nsr)
669
670 # The parameters we'll need to deploy a charm
671 number_to_configure = 0
672
673 def deploy():
674 """An inner function to deploy the charm from either vnf or vdu
675 """
676
677 # Login to the VCA.
678 # if number_to_configure == 0:
679 # self.logger.debug("Logging into N2VC...")
680 # task = asyncio.ensure_future(self.n2vc.login())
681 # yield from asyncio.wait_for(task, 30.0)
682 # self.logger.debug("Logged into N2VC!")
683
684 ## await self.n2vc.login()
685
686 # Note: The charm needs to exist on disk at the location
687 # specified by charm_path.
688 base_folder = vnfd["_admin"]["storage"]
689 storage_params = self.fs.get_params()
690 charm_path = "{}{}/{}/charms/{}".format(
691 storage_params["path"],
692 base_folder["folder"],
693 base_folder["pkg-dir"],
694 proxy_charm
695 )
696
697 # Setup the runtime parameters for this VNF
698 params['rw_mgmt_ip'] = nsr_lcm['nsr_ip'][vnf_index]
699
700 # ns_name will be ignored in the current version of N2VC
701 # but will be implemented for the next point release.
702 model_name = 'default'
703 application_name = self.n2vc.FormatApplicationName(
704 nsr_name,
705 vnf_index,
706 vnfd['name'],
707 )
708
709 nsr_lcm["VCA"][vnf_index] = {
710 "model": model_name,
711 "application": application_name,
712 "operational-status": "init",
713 "detailed-status": "",
714 "vnfd_id": vnfd_id,
715 }
716
717 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
718 task = asyncio.ensure_future(
719 self.n2vc.DeployCharms(
720 model_name, # The network service name
721 application_name, # The application name
722 vnfd, # The vnf descriptor
723 charm_path, # Path to charm
724 params, # Runtime params, like mgmt ip
725 {}, # for native charms only
726 self.n2vc_callback, # Callback for status changes
727 db_nsr, # Callback parameter
728 vnf_index, # Callback parameter
729 None, # Callback parameter (task)
730 )
731 )
732 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, db_nsr, vnf_index))
733 self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
734
735 # TODO: Make this call inside deploy()
736 # Login to the VCA. If there are multiple calls to login(),
737 # subsequent calls will be a nop and return immediately.
738 await self.n2vc.login()
739
740 step = "Looking for needed vnfd to configure"
741 self.logger.debug(logging_text + step)
742 for c_vnf in nsd["constituent-vnfd"]:
743 vnfd_id = c_vnf["vnfd-id-ref"]
744 vnf_index = str(c_vnf["member-vnf-index"])
745 vnfd = needed_vnfd[vnfd_id]
746
747 # Check if this VNF has a charm configuration
748 vnf_config = vnfd.get("vnf-configuration")
749
750 if vnf_config and vnf_config.get("juju"):
751 proxy_charm = vnf_config["juju"]["charm"]
752 params = {}
753
754 if proxy_charm:
755 if 'initial-config-primitive' in vnf_config:
756 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
757
758 deploy()
759 number_to_configure += 1
760
761 # Deploy charms for each VDU that supports one.
762 for vdu in vnfd['vdu']:
763 vdu_config = vdu.get('vdu-configuration')
764 proxy_charm = None
765 params = {}
766
767 if vdu_config and vdu_config.get("juju"):
768 proxy_charm = vdu_config["juju"]["charm"]
769
770 if 'initial-config-primitive' in vdu_config:
771 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
772
773 if proxy_charm:
774 deploy()
775 number_to_configure += 1
776
777 db_nsr["config-status"] = "configuring" if number_to_configure else "configured"
778 db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure) if number_to_configure else "done"
779 db_nsr["operational-status"] = "running"
780 self.update_db("nsrs", nsr_id, db_nsr)
781
782 self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id))
783 return nsr_lcm
784
785 except (ROclient.ROClientException, DbException, LcmException) as e:
786 self.logger.error(logging_text + "Exit Exception {}".format(e))
787 exc = e
788 except Exception as e:
789 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
790 exc = e
791 finally:
792 if exc and db_nsr:
793 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
794 db_nsr["operational-status"] = "failed"
795 self.update_db("nsrs", nsr_id, db_nsr)
796
797 async def delete_ns(self, nsr_id, order_id):
798 logging_text = "Task delete_ns={} ".format(nsr_id)
799 self.logger.debug(logging_text + "Enter")
800 db_nsr = None
801 exc = None
802 step = "Getting nsr from db"
803 try:
804 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
805 nsd = db_nsr["nsd"]
806 nsr_lcm = db_nsr["_admin"]["deploy"]
807
808 db_nsr["operational-status"] = "terminating"
809 db_nsr["config-status"] = "terminating"
810 db_nsr["detailed-status"] = "Deleting charms"
811 self.update_db("nsrs", nsr_id, db_nsr)
812
813 try:
814 self.logger.debug(logging_text + step)
815 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
816 if deploy_info and deploy_info.get("application"):
817 # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
818
819 # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name)
820 task = asyncio.ensure_future(
821 self.n2vc.RemoveCharms(
822 deploy_info['model'],
823 deploy_info['application'],
824 self.n2vc_callback,
825 db_nsr,
826 vnf_index,
827 )
828 )
829 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
830 # deploy_info['application'],None, db_nsr, vnf_index))
831 self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
832 except Exception as e:
833 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
834 # remove from RO
835
836 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
837 # Delete ns
838 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
839 if RO_nsr_id:
840 try:
841 step = db_nsr["detailed-status"] = "Deleting ns at RO"
842 self.logger.debug(logging_text + step)
843 desc = await RO.delete("ns", RO_nsr_id)
844 nsr_lcm["RO"]["nsr_id"] = None
845 nsr_lcm["RO"]["nsr_status"] = "DELETED"
846 except ROclient.ROClientException as e:
847 if e.http_code == 404: # not found
848 nsr_lcm["RO"]["nsr_id"] = None
849 nsr_lcm["RO"]["nsr_status"] = "DELETED"
850 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
851 elif e.http_code == 409: #conflict
852 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
853 else:
854 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
855 self.update_db("nsrs", nsr_id, db_nsr)
856
857 # Delete nsd
858 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
859 if RO_nsd_id:
860 try:
861 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
862 desc = await RO.delete("nsd", RO_nsd_id)
863 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
864 nsr_lcm["RO"]["nsd_id"] = None
865 except ROclient.ROClientException as e:
866 if e.http_code == 404: # not found
867 nsr_lcm["RO"]["nsd_id"] = None
868 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
869 elif e.http_code == 409: #conflict
870 self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
871 else:
872 self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
873 self.update_db("nsrs", nsr_id, db_nsr)
874
875 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
876 if not RO_vnfd_id:
877 continue
878 try:
879 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
880 desc = await RO.delete("vnfd", RO_vnfd_id)
881 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
882 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
883 except ROclient.ROClientException as e:
884 if e.http_code == 404: # not found
885 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
886 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
887 elif e.http_code == 409: #conflict
888 self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
889 else:
890 self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
891 self.update_db("nsrs", nsr_id, db_nsr)
892
893 # TODO delete from database or mark as deleted???
894 db_nsr["operational-status"] = "terminated"
895 self.db.del_one("nsrs", {"_id": nsr_id})
896 self.logger.debug(logging_text + "Exit")
897
898 except (ROclient.ROClientException, DbException) as e:
899 self.logger.error(logging_text + "Exit Exception {}".format(e))
900 exc = e
901 except Exception as e:
902 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
903 exc = e
904 finally:
905 if exc and db_nsr:
906 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
907 db_nsr["operational-status"] = "failed"
908 self.update_db("nsrs", nsr_id, db_nsr)
909
910 async def test(self, param=None):
911 self.logger.debug("Starting/Ending test task: {}".format(param))
912
913 def cancel_tasks(self, topic, _id):
914 """
915 Cancel all active tasks of a concrete nsr or vim identified for _id
916 :param topic: can be ns or vim_account
917 :param _id: nsr or vim identity
918 :return: None, or raises an exception if not possible
919 """
920 if topic == "ns":
921 lcm_tasks = self.lcm_ns_tasks
922 elif topic== "vim_account":
923 lcm_tasks = self.lcm_vim_tasks
924 elif topic== "sdn":
925 lcm_tasks = self.lcm_sdn_tasks
926
927 if not lcm_tasks.get(_id):
928 return
929 for order_id, tasks_set in lcm_tasks[_id].items():
930 for task_name, task in tasks_set.items():
931 result = task.cancel()
932 if result:
933 self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
934 lcm_tasks[_id] = {}
935
936 async def read_kafka(self):
937 self.logger.debug("Task Kafka Enter")
938 order_id = 1
939 # future = asyncio.Future()
940 consecutive_errors = 0
941 while consecutive_errors < 10:
942 try:
943 topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
944 self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
945 consecutive_errors = 0
946 order_id += 1
947 if command == "exit":
948 print("Bye!")
949 break
950 elif command.startswith("#"):
951 continue
952 elif command == "echo":
953 # just for test
954 print(params)
955 sys.stdout.flush()
956 continue
957 elif command == "test":
958 asyncio.Task(self.test(params), loop=self.loop)
959 continue
960
961 if topic == "ns":
962 nsr_id = params.strip()
963 if command == "create":
964 # self.logger.debug("Deploying NS {}".format(nsr_id))
965 task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
966 if nsr_id not in self.lcm_ns_tasks:
967 self.lcm_ns_tasks[nsr_id] = {}
968 self.lcm_ns_tasks[nsr_id][order_id] = {"create_ns": task}
969 continue
970 elif command == "delete":
971 # self.logger.debug("Deleting NS {}".format(nsr_id))
972 self.cancel_tasks(topic, nsr_id)
973 task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
974 if nsr_id not in self.lcm_ns_tasks:
975 self.lcm_ns_tasks[nsr_id] = {}
976 self.lcm_ns_tasks[nsr_id][order_id] = {"delete_ns": task}
977 continue
978 elif command == "show":
979 try:
980 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
981 print(
982 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
983 "{}\n deploy: {}\n tasks: {}".format(
984 nsr_id, db_nsr["operational-status"],
985 db_nsr["config-status"], db_nsr["detailed-status"],
986 db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)))
987 except Exception as e:
988 print("nsr {} not found: {}".format(nsr_id, e))
989 sys.stdout.flush()
990 continue
991 elif topic == "vim_account":
992 vim_id = params["_id"]
993 if command == "create":
994 task = asyncio.ensure_future(self.create_vim(params, order_id))
995 if vim_id not in self.lcm_vim_tasks:
996 self.lcm_vim_tasks[vim_id] = {}
997 self.lcm_vim_tasks[vim_id][order_id] = {"create_vim": task}
998 continue
999 elif command == "delete":
1000 self.cancel_tasks(topic, vim_id)
1001 task = asyncio.ensure_future(self.delete_vim(vim_id, order_id))
1002 if vim_id not in self.lcm_vim_tasks:
1003 self.lcm_vim_tasks[vim_id] = {}
1004 self.lcm_vim_tasks[vim_id][order_id] = {"delete_vim": task}
1005 continue
1006 elif command == "show":
1007 print("not implemented show with vim_account")
1008 sys.stdout.flush()
1009 continue
1010 elif command == "edit":
1011 task = asyncio.ensure_future(self.edit_vim(vim_id, order_id))
1012 if vim_id not in self.lcm_vim_tasks:
1013 self.lcm_vim_tasks[vim_id] = {}
1014 self.lcm_vim_tasks[vim_id][order_id] = {"edit_vim": task}
1015 continue
1016 elif topic == "sdn":
1017 _sdn_id = params["_id"]
1018 if command == "create":
1019 task = asyncio.ensure_future(self.create_sdn(params, order_id))
1020 if _sdn_id not in self.lcm_sdn_tasks:
1021 self.lcm_sdn_tasks[_sdn_id] = {}
1022 self.lcm_sdn_tasks[_sdn_id][order_id] = {"create_sdn": task}
1023 continue
1024 elif command == "delete":
1025 self.cancel_tasks(topic, _sdn_id)
1026 task = asyncio.ensure_future(self.delete_sdn(_sdn_id, order_id))
1027 if _sdn_id not in self.lcm_sdn_tasks:
1028 self.lcm_sdn_tasks[_sdn_id] = {}
1029 self.lcm_sdn_tasks[_sdn_id][order_id] = {"delete_sdn": task}
1030 continue
1031 elif command == "edit":
1032 task = asyncio.ensure_future(self.edit_sdn(_sdn_id, order_id))
1033 if _sdn_id not in self.lcm_sdn_tasks:
1034 self.lcm_sdn_tasks[_sdn_id] = {}
1035 self.lcm_sdn_tasks[_sdn_id][order_id] = {"edit_sdn": task}
1036 continue
1037 self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
1038 except Exception as e:
1039 if consecutive_errors == 5:
1040 self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
1041 break
1042 else:
1043 consecutive_errors += 1
1044 self.logger.error("Task Kafka Exception {}".format(e))
1045 await asyncio.sleep(1, loop=self.loop)
1046 self.logger.debug("Task Kafka terminating")
1047 # TODO
1048 # self.cancel_tasks("ALL", "create")
1049 # timeout = 200
1050 # while self.is_pending_tasks():
1051 # self.logger.debug("Task Kafka terminating. Waiting for tasks termination")
1052 # await asyncio.sleep(2, loop=self.loop)
1053 # timeout -= 2
1054 # if not timeout:
1055 # self.cancel_tasks("ALL", "ALL")
1056 self.logger.debug("Task Kafka exit")
1057
1058 def start(self):
1059 self.loop = asyncio.get_event_loop()
1060 self.loop.run_until_complete(self.read_kafka())
1061 self.loop.close()
1062 self.loop = None
1063 if self.db:
1064 self.db.db_disconnect()
1065 if self.msg:
1066 self.msg.disconnect()
1067 if self.fs:
1068 self.fs.fs_disconnect()
1069
1070
1071 def read_config_file(self, config_file):
1072 # TODO make a [ini] + yaml inside parser
1073 # the configparser library is not suitable, because it does not admit comments at the end of line,
1074 # and not parse integer or boolean
1075 try:
1076 with open(config_file) as f:
1077 conf = yaml.load(f)
1078 for k, v in environ.items():
1079 if not k.startswith("OSMLCM_"):
1080 continue
1081 k_items = k.lower().split("_")
1082 c = conf
1083 try:
1084 for k_item in k_items[1:-1]:
1085 if k_item in ("ro", "vca"):
1086 # put in capital letter
1087 k_item = k_item.upper()
1088 c = c[k_item]
1089 if k_items[-1] == "port":
1090 c[k_items[-1]] = int(v)
1091 else:
1092 c[k_items[-1]] = v
1093 except Exception as e:
1094 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
1095
1096 return conf
1097 except Exception as e:
1098 self.logger.critical("At config file '{}': {}".format(config_file, e))
1099 exit(1)
1100
1101
1102 if __name__ == '__main__':
1103
1104 config_file = "lcm.cfg"
1105 lcm = Lcm(config_file)
1106
1107 lcm.start()