8c10554f7ac5a6f80c89e73362e408b2fb1f2c55
[osm/LCM.git] / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import logging
8 import logging.handlers
9 import getopt
10 import functools
11 import sys
12 import traceback
13 from osm_common import dbmemory
14 from osm_common import dbmongo
15 from osm_common import fslocal
16 from osm_common import msglocal
17 from osm_common import msgkafka
18 from osm_common.dbbase import DbException
19 from osm_common.fsbase import FsException
20 from osm_common.msgbase import MsgException
21 from os import environ, path
22 from n2vc.vnf import N2VC
23 from n2vc import version as N2VC_version
24
25 from copy import deepcopy
26 from http import HTTPStatus
27 from time import time
28
29
30 __author__ = "Alfonso Tierno"
31 min_RO_version = [0, 5, 69]
32
33
class LcmException(Exception):
    """Error raised by LCM, e.g. for an invalid configuration, a not compatible version or a failed connection"""
36
37
38 class Lcm:
39
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: configuration file, loaded with self.read_config_file into a two level dictionary. Top level
        must contain 'global', 'database', 'storage', 'message', 'RO' and 'VCA'
    :return: None
    :raises LcmException: if a configured 'driver' is not supported, the N2VC version is lower than needed, or the
        connection to database, storage or messaging fails
    """

    self.db = None
    self.msg = None
    self.fs = None
    self.pings_not_received = 1

    # contains created tasks/futures to be able to cancel
    self.lcm_ns_tasks = {}
    self.lcm_vim_tasks = {}
    self.lcm_sdn_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    config["database"]["logger_name"] = "lcm.db"
    config["storage"]["logger_name"] = "lcm.fs"
    config["message"]["logger_name"] = "lcm.msg"
    if "logfile" in config["global"]:
        file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
                                                            maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
                                                                maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )
    # check version of N2VC
    # TODO enhance with int conversion or from distutils.version import LooseVersion
    # or with list(map(int, version.split(".")))
    # NOTE: this is a plain string comparison, so it is lexicographic and not numeric
    if N2VC_version < "0.0.2":
        raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))

    try:
        # TODO check database version
        db_driver = config["database"]["driver"]
        if db_driver == "mongo":
            self.db = dbmongo.DbMongo()
        elif db_driver == "memory":
            self.db = dbmemory.DbMemory()
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
        self.db.db_connect(config["database"])

        fs_driver = config["storage"]["driver"]
        if fs_driver == "local":
            self.fs = fslocal.FsLocal()
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
        self.fs.fs_connect(config["storage"])

        msg_driver = config["message"]["driver"]
        if msg_driver == "local":
            self.msg = msglocal.MsgLocal()
        elif msg_driver == "kafka":
            self.msg = msgkafka.MsgKafka()
        else:
            # report the offending message driver (it used to print the storage driver by mistake)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
        self.msg.connect(config["message"])
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e))
151
async def check_RO_version(self):
    """
    Asks osm/RO for its version and checks that it is not lower than min_RO_version
    :return: None
    :raises LcmException: if the RO version is lower than min_RO_version, or if ROclient raises a ROClientException
        (in this case it is also logged as critical)
    """
    try:
        ro_client = ROclient.ROClient(self.loop, **self.ro_config)
        reported_version = await ro_client.get_version()
        if not reported_version < min_RO_version:
            return
        version_numbers = [*reported_version, *min_RO_version]
        raise LcmException(
            "Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format(*version_numbers))
    except ROclient.ROClientException as ro_error:
        self.logger.critical("Error while conneting to osm/RO " + str(ro_error), exc_info=True)
        raise LcmException(str(ro_error))
163
def update_db(self, item, _id, _desc):
    """
    Replaces a database entry with the provided content. A DbException is logged as error and not propagated
    :param item: database table/collection, e.g. "vim_accounts", "sdns"
    :param _id: identifier of the entry to replace
    :param _desc: new content of the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as db_error:
        error_text = "Updating {} _id={}: {}".format(item, _id, db_error)
        self.logger.error(error_text)
169
def update_db_2(self, item, _id, _desc):
    """
    Applies the changes of _desc to one database entry using self.db.set_one. Nothing is done if _desc is empty.
    Upon success _desc is cleared; upon DbException the error is logged, not propagated, and _desc is kept
    :param item: database table/collection, e.g. "nsrs", "nslcmops", "vnfrs"
    :param _id: identifier of the entry to update
    :param _desc: dictionary with the changes to apply. It is emptied when the update succeeds
    :return: None
    """
    if _desc:
        entry_filter = {"_id": _id}
        try:
            self.db.set_one(item, entry_filter, _desc)
            _desc.clear()
        except DbException as db_error:
            self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, db_error))
185
async def vim_create(self, vim_content, order_id):
    """
    Creates a vim at RO and attaches it to the RO tenant as a vim_account. The RO identifier is stored at the
    'vim_accounts' database entry under _admin.deployed.RO and _admin.operationalState is set to ENABLED.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param vim_content: vim account content. Must contain '_id', 'vim_type', 'vim_tenant_name', 'vim_user' and
        'vim_password'. If 'config'.'sdn-controller' is present, it is the _id of an entry of database 'sdns'
    :param order_id: not used by this method
    :return: the RO vim identifier, or None if the operation fails
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_sdn_id = None
    step = "Getting vim-id='{}' from db".format(vim_id)
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deployed" not in db_vim["_admin"]:
            db_vim["_admin"]["deployed"] = {}
        db_vim["_admin"]["deployed"]["RO"] = None
        if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
            step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
            db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
            if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            else:
                raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                    vim_content["config"]["sdn-controller"]))

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # work over a copy, removing what is internal to OSM or belongs to the vim_account
        vim_RO = deepcopy(vim_content)
        vim_RO.pop("_id", None)
        vim_RO.pop("_admin", None)
        vim_RO.pop("schema_version", None)
        vim_RO.pop("schema_type", None)
        vim_RO.pop("vim_tenant_name", None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        vim_RO.pop("vim_user", None)
        vim_RO.pop("vim_password", None)
        if RO_sdn_id:
            # RO must receive its own sdn identifier instead of the OSM one
            vim_RO["config"]["sdn-controller"] = RO_sdn_id
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deployed"]["RO"] = RO_vim_id
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Creating vim_account at RO"
        vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                          "vim_username": vim_content["vim_user"],
                          "vim_password": vim_content["vim_password"]
                          }
        if vim_RO.get("config"):
            vim_account_RO["config"] = vim_RO["config"]
            if "sdn-controller" in vim_account_RO["config"]:
                del vim_account_RO["config"]["sdn-controller"]
            if "sdn-port-mapping" in vim_account_RO["config"]:
                del vim_account_RO["config"]["sdn-port-mapping"]
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        # the placeholder was missing, so the identifier was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
257
async def vim_edit(self, vim_content, order_id):
    """
    Edits at RO the vim, and the vim_account attached to the RO tenant, with the changes of vim_content. Nothing is
    sent to RO if the 'vim_accounts' database entry does not contain _admin.deployed.RO.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param vim_content: changes to apply. Must contain '_id'. If 'config'.'sdn-controller' is present, it is the
        _id of an entry of database 'sdns'. It is not modified
    :param order_id: not used by this method
    :return: the RO vim identifier; None if the vim is not deployed at RO or the operation fails
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_sdn_id = None
    RO_vim_id = None  # defined from the beginning, as it is logged and returned even if there is nothing at RO
    step = "Getting vim-id='{}' from db".format(vim_id)
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
                if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
                        "RO"):
                    RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                else:
                    raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                        vim_content["config"]["sdn-controller"]))

            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # work over a copy, removing what is internal to OSM or belongs to the vim_account
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                # RO must receive its own sdn identifier instead of the OSM one
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            # TODO make a deep update of sdn-port-mapping
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_account_RO = {}
            for k in ("vim_tenant_name", "vim_password"):
                if k in vim_content:
                    vim_account_RO[k] = vim_content[k]
            # 'sdn-controller' and 'sdn-port-mapping' are not sent in the vim_account config. A filtered copy is
            # used in order not to modify the caller's vim_content; an empty config is not sent
            account_config = {k: v for k, v in (vim_content.get("config") or {}).items()
                              if k not in ("sdn-controller", "sdn-port-mapping")}
            if account_config:
                vim_account_RO["config"] = account_config
            if "vim_user" in vim_content:
                # must go into the descriptor sent to RO (it was wrongly written back into vim_content)
                vim_account_RO["vim_username"] = vim_content["vim_user"]
            # vim_account must be edited always even if empty in order to ensure changes are translated to RO
            # vim_thread. RO will remove and relaunch a new thread for this vim_account
            await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        # the placeholder was missing, so the identifier was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "_admin" can be missing at the entry; create it so that the error can always be stored
            if not db_vim.get("_admin"):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
332
async def vim_delete(self, vim_id, order_id):
    """
    Detaches the vim from the RO tenant, deletes it from RO and finally removes the 'vim_accounts' database entry.
    A 404 (not found) answer from RO is considered as already detached/deleted. If the database entry does not
    contain _admin.deployed.RO, only the database entry is removed.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param vim_id: _id of the entry at database 'vim_accounts'
    :param order_id: not used by this method
    :return: None
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "_admin" can be missing (see the check above); create it so that the error can always be stored
            # instead of raising a KeyError that would hide the original exception
            if not db_vim.get("_admin"):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
379
async def sdn_create(self, sdn_content, order_id):
    """
    Creates a sdn controller at RO. The RO identifier is stored at the 'sdns' database entry under
    _admin.deployed.RO and _admin.operationalState is set to ENABLED.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param sdn_content: sdn controller content. Must contain '_id'
    :param order_id: not used by this method
    :return: the RO sdn identifier, or None if the operation fails
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deployed" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deployed"] = {}
        db_sdn["_admin"]["deployed"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # work over a copy, removing the fields that are not sent to RO
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        # the placeholder was missing, so the identifier was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
422
async def sdn_edit(self, sdn_content, order_id):
    """
    Edits at RO the sdn controller with the changes of sdn_content. Nothing is sent to RO if the 'sdns' database
    entry does not contain _admin.deployed.RO.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param sdn_content: changes to apply. Must contain '_id'
    :param order_id: not used by this method
    :return: the RO sdn identifier; None if the sdn is not deployed at RO or the operation fails
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # defined from the beginning, as it is logged and returned even if there is nothing at RO
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            # work over a copy, removing the fields that are not sent to RO
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        # the placeholder was missing, so the identifier was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "_admin" can be missing at the entry; create it so that the error can always be stored
            if not db_sdn.get("_admin"):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
461
async def sdn_delete(self, sdn_id, order_id):
    """
    Deletes the sdn controller from RO and removes the 'sdns' database entry. A 404 (not found) answer from RO is
    considered as already deleted. If the database entry does not contain _admin.deployed.RO, only the database
    entry is removed.
    Upon failure the exception is logged, not propagated, and the database entry is marked with operationalState
    ERROR and the failing step at _admin.detailed-status
    :param sdn_id: _id of the entry at database 'sdns'
    :param order_id: not used by this method
    :return: None
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "_admin" can be missing (see the check above); create it so that the error can always be stored
            # instead of raising a KeyError that would hide the original exception
            if not db_sdn.get("_admin"):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
499
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor for RO from an OSM IM vnfd. The input is not modified; a deep copy is returned
    without '_id', '_admin', 'vnf-configuration', 'monitoring-param' and 'scaling-group-descriptor'. Every vdu
    'cloud-init-file' is replaced by 'cloud-init' with the content of that file, read from self.fs at
    <storage folder>/<storage pkg-dir>/cloud_init/<cloud-init-file>
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: the new descriptor for RO
    :raises LcmException: if the cloud init file cannot be read (FsException)
    """
    ci_file = None
    try:
        vnfd_RO = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            vnfd_RO.pop(internal_key, None)
        if new_id:
            vnfd_RO["id"] = new_id
        for vdu in vnfd_RO["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # storage info is taken from the input vnfd, because "_admin" was removed from the copy
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                           vdu["cloud-init-file"])
            ci_file = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            # convert to base 64 or similar
            cloud_init_content = ci_file.read()
            ci_file.close()
            ci_file = None  # already closed, nothing left for the finally clause
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
        # configuration, monitoring and scaling are not used by RO
        for unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            vnfd_RO.pop(unused_key, None)
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        if ci_file:
            ci_file.close()
540
def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, member_vnf_index,
                  task=None):
    """
    Callback both for charm status change and task completion. It updates the VCA status of member_vnf_index and,
    unless it returns early, computes the global configuration status from all the VCA of the ns. Changes are
    written to databases 'nsrs' and 'nslcmops' with self.update_db_2. Exceptions are logged and not propagated
    :param model_name: Charm model name. Not used by this method
    :param application_name: Charm application name. Not used by this method
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param db_nsr: nsr database content. Its _admin.deployed.VCA content is updated
    :param db_nslcmop: nslcmop database content
    :param member_vnf_index: NSD member-vnf-index
    :param task: None for charm status change, or task for completion task callback
    :return: None
    """
    nsr_id = None
    nslcmop_id = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    try:
        nsr_id = db_nsr["_id"]
        nslcmop_id = db_nslcmop["_id"]
        nsr_lcm = db_nsr["_admin"]["deployed"]
        ns_operation = db_nslcmop["lcmOperationType"]
        logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                              member_vnf_index)

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                # TODO update db_nslcmop
                return

            if task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    if ns_operation in ("instantiate", "terminate"):
                        nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error"
                        db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = \
                            "error"
                        nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc)
                        db_nsr_update[
                            "_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(exc)
                        # NOTE(review): assumed to continue below to compute the global status with this
                        # error; the 'return' is assumed to belong to the "action" branch only - confirm
                    elif ns_operation == "action":
                        db_nslcmop_update["operationState"] = "FAILED"
                        db_nslcmop_update["detailed-status"] = str(exc)
                        db_nslcmop_update["statusEnteredTime"] = time()
                        return

                else:
                    self.logger.debug(logging_text + " task Done")
                    # TODO revise with Adam if action is finished and ok when task is done
                    if ns_operation == "action":
                        db_nslcmop_update["operationState"] = "COMPLETED"
                        db_nslcmop_update["detailed-status"] = "Done"
                        db_nslcmop_update["statusEnteredTime"] = time()
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={}".format(status))
            if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status:
                return  # same status, ignore
            nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = status
            nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return

        # compute the global status looking at every VCA of the ns
        all_active = True
        status_map = {}  # number of VCA at each operational-status
        n2vc_error_text = []  # contain text error list. If empty no one is in error status
        for vnf_index, vca_info in nsr_lcm["VCA"].items():
            vca_status = vca_info["operational-status"]
            status_map[vca_status] = status_map.get(vca_status, 0) + 1

            if vca_status != "active":
                all_active = False
            # independent 'if': as an 'elif' of the previous check it was never reached, so errors were never
            # reported. The text refers to the VCA being inspected, not to the one that triggered the callback
            if vca_status in ("error", "blocked"):
                n2vc_error_text.append("member_vnf_index={} {}: {}".format(vnf_index, vca_status,
                                                                           vca_info["detailed-status"]))

        if all_active:
            self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id,
                                                                                                 member_vnf_index))
            db_nsr_update["config-status"] = "configured"
            db_nsr_update["detailed-status"] = "done"
            db_nslcmop_update["operationState"] = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif n2vc_error_text:
            db_nsr_update["config-status"] = "failed"
            error_text = "fail configuring " + ";".join(n2vc_error_text)
            db_nsr_update["detailed-status"] = error_text
            db_nslcmop_update["operationState"] = "FAILED_TEMP"
            db_nslcmop_update["detailed-status"] = error_text
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            cs = "configuring: " + ", ".join("{}: {}".format(vca_status, num)
                                             for vca_status, num in status_map.items())
            db_nsr_update["config-status"] = cs
            db_nsr_update["detailed-status"] = cs
            db_nslcmop_update["detailed-status"] = cs

    except Exception as e:
        self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True)
    finally:
        # executed also for the early 'return' above, so what was collected until then is always stored
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except Exception as e:
            self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
                member_vnf_index, e), exc_info=True)
671
def ns_params_2_RO(self, ns_params):
    """
    Translates the OSM ns instantiation parameters into the ns descriptor used by RO. Every OSM vim account
    identifier is replaced by the RO one, stored at database 'vim_accounts' under _admin.deployed.RO
    :param ns_params: OSM instantiate params
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a referenced vim account is not at operationalState ENABLED
    """
    if not ns_params:
        return None

    RO_id_by_vim_account = {}  # avoids reading the same vim account from database more than once

    def vim_account_2_RO(vim_account):
        try:
            return RO_id_by_vim_account[vim_account]
        except KeyError:
            pass
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        # if db_vim["_admin"]["operationalState"] == "PROCESSING":
        #     #TODO check if VIM is creating and wait
        operational_state = db_vim["_admin"]["operationalState"]
        if operational_state != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_id_by_vim_account[vim_account] = db_vim["_admin"]["deployed"]["RO"]
        return RO_id_by_vim_account[vim_account]

    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
        "vnfs": {},
        "networks": {},
    }
    ssh_keys = ns_params.get("ssh-authorized-key")
    if ssh_keys:
        RO_ns_params["cloud-config"] = {"key-pairs": ssh_keys}

    # only vnfs deployed at a specific vim account need an entry
    for vnf in ns_params.get("vnf") or ():
        if "vimAccountId" in vnf:
            RO_ns_params["vnfs"][vnf["member-vnf-index"]] = {"datacenter": vim_account_2_RO(vnf["vimAccountId"])}

    for vld in ns_params.get("vld") or ():
        RO_vld = {}
        if "ip-profile" in vld:
            RO_vld["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            vim_network = vld["vim-network-name"]
            if isinstance(vim_network, dict):
                # one vim network per vim account
                RO_vld["sites"] = [{"netmap-use": vim_net, "datacenter": vim_account_2_RO(vim_account)}
                                   for vim_account, vim_net in vim_network.items()]
            else:  # isinstance str
                RO_vld["sites"] = [{"netmap-use": vim_network}]
        if RO_vld:
            RO_ns_params["networks"][vld["name"]] = RO_vld
    return RO_ns_params
730
def ns_update_vnfr(self, db_vnfrs, ns_RO_info):
    """
    Copies the deployment info obtained from RO (ip-address, vim-id and mac-address of vnf, vdus and interfaces)
    into the vnfrs. Both the content of db_vnfrs and the 'vnfrs' database entries are updated. A vnfr, vdu or
    interface without info at ns_RO_info is skipped
    :param db_vnfrs: dictionary with the vnfr database content, indexed by member vnf index
    :param ns_RO_info: dictionary with the RO info of every vnf, indexed by member vnf index
    :return: None
    """
    vdu_fields = (("vim-id", "vim_id"), ("ip-address", "ip_address"))
    iface_fields = vdu_fields + (("mac-address", "mac_address"),)

    for vnf_index, db_vnfr in db_vnfrs.items():
        vnfr_deployed = ns_RO_info.get(vnf_index)
        if not vnfr_deployed:
            continue
        vnf_ip_address = vnfr_deployed.get("ip_address")
        db_vnfr["ip-address"] = vnf_ip_address
        vnfr_update = {"ip-address": vnf_ip_address}  # changes to apply at database, with dot notation keys

        for vdu_pos, vdur in enumerate(db_vnfr["vdur"]):
            vdu_deployed = vnfr_deployed["vdur"].get(vdur["vdu-id-ref"])
            if not vdu_deployed:
                continue
            for osm_field, RO_field in vdu_fields:
                value = vdu_deployed.get(RO_field)
                vnfr_update["vdur.{}.{}".format(vdu_pos, osm_field)] = value
                vdur[osm_field] = value

            for iface_pos, interface in enumerate(vdur["interfaces"]):
                iface_deployed = vdu_deployed["interfaces"].get(interface["name"])
                if not iface_deployed:
                    continue
                for osm_field, RO_field in iface_fields:
                    value = iface_deployed.get(RO_field)
                    interface[osm_field] = value
                    vnfr_update["vdur.{}.interfaces.{}.{}".format(vdu_pos, iface_pos, osm_field)] = value
        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
765
def ns_update_vnfr_2(self, db_vnfrs, nsr_desc_RO):
    """
    Rebuilds the 'vdur' list and the 'ip-address' of every vnfr from the ns descriptor reported by RO. Both the
    content of db_vnfrs and the 'vnfrs' database entries are updated
    :param db_vnfrs: dictionary with the vnfr database content, indexed by member vnf index
    :param nsr_desc_RO: ns descriptor obtained from RO. Its 'vnfs' list is looked up by 'member_vnf_index'
    :return: None
    :raises LcmException: if a member vnf index of db_vnfrs is not found at nsr_desc_RO
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        vnf_RO = next((candidate for candidate in nsr_desc_RO["vnfs"]
                       if candidate["member_vnf_index"] == vnf_index), None)
        if vnf_RO is None:
            raise LcmException("ns_update_vnfr_2: Not found member_vnf_index={} at RO info".format(vnf_index))

        vnf_ip_address = vnf_RO.get("ip_address")
        db_vnfr["ip-address"] = vnf_ip_address
        vdur_list = [
            {
                "vim-id": vm_RO.get("vim_vm_id"),
                "ip-address": vm_RO.get("ip_address"),
                "vdu-id-ref": vm_RO.get("vdu_osm_id"),
                "name": vm_RO.get("vim_name"),
                "status": vm_RO.get("status"),
                "status-detailed": vm_RO.get("error_msg"),
                "interfaces": [
                    {
                        "ip-address": iface_RO.get("ip_address"),
                        "mac-address": iface_RO.get("mac_address"),
                        "name": iface_RO.get("external_name"),
                    }
                    for iface_RO in vm_RO.get("interfaces", ())
                ],
            }
            for vm_RO in vnf_RO.get("vms", ())
        ]
        db_vnfr["vdur"] = vdur_list
        vnfr_update = {"ip-address": vnf_ip_address, "vdur": vdur_list}
        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
803
async def create_monitoring(self, nsr_id, vnf_member_index, vnfd_desc):
    """
    Writes the scaling configuration of a vnfd to the message bus: topic "lcm_pm", key "configure_scaling".
    Nothing is done if the vnfd has no 'scaling-group-descriptor'. A scaling group without 'scaling-policy', or a
    policy without 'scaling-criteria', is tolerated. A criteria without 'vnf-monitoring-param-ref' is ignored, and
    one that references a monitoring param not present at the vnfd is logged as error and ignored
    :param nsr_id: ns instance identifier, sent as 'ns_id'
    :param vnf_member_index: member vnf index, sent inside every monitoring param
    :param vnfd_desc: vnfd content
    :return: None
    """
    if not vnfd_desc.get("scaling-group-descriptor"):
        return
    for scaling_group in vnfd_desc["scaling-group-descriptor"]:
        scaling_policy_desc = {}
        scaling_desc = {
            "ns_id": nsr_id,
            "scaling_group_descriptor": {
                "name": scaling_group["name"],
                "scaling_policy": scaling_policy_desc
            }
        }
        # "or ()" avoids iterating over None when the key is missing
        for scaling_policy in scaling_group.get("scaling-policy") or ():
            scaling_policy_desc["scale_in_operation_type"] = scaling_policy_desc["scale_out_operation_type"] = \
                scaling_policy["scaling-type"]
            scaling_policy_desc["threshold_time"] = scaling_policy["threshold-time"]
            scaling_policy_desc["cooldown_time"] = scaling_policy["cooldown-time"]
            scaling_policy_desc["scaling_criteria"] = []
            for scaling_criteria in scaling_policy.get("scaling-criteria") or ():
                scaling_criteria_desc = {"scale_in_threshold": scaling_criteria.get("scale-in-threshold"),
                                         "scale_out_threshold": scaling_criteria.get("scale-out-threshold"),
                                         }
                if not scaling_criteria.get("vnf-monitoring-param-ref"):
                    continue
                for monitoring_param in vnfd_desc.get("monitoring-param", ()):
                    if monitoring_param["id"] == scaling_criteria["vnf-monitoring-param-ref"]:
                        scaling_criteria_desc["monitoring_param"] = {
                            "id": monitoring_param["id"],
                            "name": monitoring_param["name"],
                            "aggregation_type": monitoring_param.get("aggregation-type"),
                            "vdu_name": monitoring_param.get("vdu-ref"),
                            "vnf_member_index": vnf_member_index,
                        }

                        scaling_policy_desc["scaling_criteria"].append(scaling_criteria_desc)
                        break
                else:
                    self.logger.error(
                        "Task ns={} member_vnf_index={} Invalid vnfd vnf-monitoring-param-ref={} not in "
                        "monitoring-param list".format(nsr_id, vnf_member_index,
                                                       scaling_criteria["vnf-monitoring-param-ref"]))

            # NOTE(review): the message is assumed to be sent once per scaling-policy, because
            # scaling_policy_desc is overwritten at every policy - confirm
            await self.msg.aiowrite("lcm_pm", "configure_scaling", scaling_desc, self.loop)
847
async def ns_instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiate a network service. Creates the needed vnfd, nsd and ns at RO, waits until RO reports the ns as
    ACTIVE with its management IP addresses, updates the vnfrs and schedules the deployment of the proxy charms
    declared at vnf and vdu level through N2VC.
    Progress and final result are annotated at database collections "nsrs" and "nslcmops".
    :param nsr_id: _id of the ns record at database
    :param nslcmop_id: _id of the ns lcm operation at database
    :return: None. Exceptions raised by the operation are caught, logged and annotated at database
    """
    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    db_vnfrs = {}
    RO_descriptor_number = 0   # number of descriptors created at RO
    descriptor_id_2_RO = {}    # map between vnfd/nsd id to the id used at RO
    exc = None
    try:
        # 'step' always describes the current phase; it is used at the error messages below
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        nsr_name = db_nsr["name"]   # TODO short-name??
        needed_vnfd = {}
        vnfr_filter = {"nsr-id-ref": nsr_id, "member-vnf-index-ref": None}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
            step = "Getting vnfr={} of nsr={} from db".format(c_vnf["member-vnf-index"], nsr_id)
            db_vnfrs[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = db_nsr["_admin"].get("deployed")
        if not nsr_lcm:
            nsr_lcm = db_nsr["_admin"]["deployed"] = {
                "id": nsr_id,
                "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
                "nsr_ip": {},
                "VCA": {},
            }
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            # the id used at RO is made unique per ns and per descriptor
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_id[:23])
            descriptor_id_2_RO[vnfd_id] = vnfd_id_RO
            RO_descriptor_number += 1

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
                db_nsr_update["_admin.nsState"] = "INSTANTIATED"
                self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
                    vnfd_id, desc["uuid"]))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)

        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_id[:23])
        descriptor_id_2_RO[nsd_id] = RO_osm_nsd_id
        RO_descriptor_number += 1
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_id, RO_nsd_uuid))
        else:
            # send a copy without database internals and with the vnfd references translated to RO ids
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                # annotated at database but it does not exist at RO any more: create it again
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + step)
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            step = db_nsr_update["detailed-status"] = "Creating ns at RO"
            RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
            desc = await RO.create("ns", descriptor=RO_ns_params,
                                   name=db_nsr["name"],
                                   scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # update VNFR vimAccount
        step = "Updating VNFR vimAcccount"
        for vnf_index, vnfr in db_vnfrs.items():
            if vnfr.get("vim-account-id"):
                continue
            # default is the ns vimAccountId, unless overridden for this concrete vnf
            vnfr_update = {"vim-account-id": db_nsr["instantiate_params"]["vimAccountId"]}
            if db_nsr["instantiate_params"].get("vnf"):
                for vnf_params in db_nsr["instantiate_params"]["vnf"]:
                    if vnf_params.get("member-vnf-index") == vnf_index:
                        if vnf_params.get("vimAccountId"):
                            vnfr_update["vim-account-id"] = vnf_params.get("vimAccountId")
                        break
            self.update_db_2("vnfrs", vnfr["_id"], vnfr_update)

        # wait until NS is ready
        step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
        detailed_status_old = None
        self.logger.debug(logging_text + step)

        deployment_timeout = 2 * 3600   # Two hours
        while deployment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            # same key as the one written when the ns is created; it was wrongly written without the leading '_'
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
            elif ns_status == "ACTIVE":
                step = detailed_status = "Waiting for management IP address reported by the VIM"
                try:
                    nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
                    break
                except ROclient.ROClientException as e:
                    if e.http_code != 409:  # IP address is not ready return code is 409 CONFLICT
                        raise e
            else:
                assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
            # write to database only when the status text changes
            if detailed_status != detailed_status_old:
                detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
            await asyncio.sleep(5, loop=self.loop)
            deployment_timeout -= 5
        if deployment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        step = "Updating VNFRs"
        self.ns_update_vnfr_2(db_vnfrs, desc)

        # annotate the status both at the in-memory record and at the pending update, so that the
        # following update_db_2 really writes it to database
        db_nsr["detailed-status"] = db_nsr_update["detailed-status"] = "Configuring vnfr"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # number of charms scheduled to be deployed
        number_to_configure = 0

        def deploy():
            """An inner function to deploy the charm from either vnf or vdu.
            It reads vnfd, vnfd_id, vnf_index, proxy_charm and params from the enclosing scope at call time
            """
            # Note: The charm needs to exist on disk at the location
            # specified by charm_path.
            base_folder = vnfd["_admin"]["storage"]
            storage_params = self.fs.get_params()
            charm_path = "{}{}/{}/charms/{}".format(
                storage_params["path"],
                base_folder["folder"],
                base_folder["pkg-dir"],
                proxy_charm
            )

            # Setup the runtime parameters for this VNF
            params['rw_mgmt_ip'] = db_vnfrs[vnf_index]["ip-address"]

            # ns_name will be ignored in the current version of N2VC
            # but will be implemented for the next point release.
            model_name = 'default'
            application_name = self.n2vc.FormatApplicationName(
                nsr_name,
                vnf_index,
                vnfd['name'],
            )
            if not nsr_lcm.get("VCA"):
                nsr_lcm["VCA"] = {}
            nsr_lcm["VCA"][vnf_index] = db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = {
                "model": model_name,
                "application": application_name,
                "operational-status": "init",
                "detailed-status": "",
                "vnfd_id": vnfd_id,
            }
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
                                                                                             proxy_charm))
            task = asyncio.ensure_future(
                self.n2vc.DeployCharms(
                    model_name,          # The network service name
                    application_name,    # The application name
                    vnfd,                # The vnf descriptor
                    charm_path,          # Path to charm
                    params,              # Runtime params, like mgmt ip
                    {},                  # for native charms only
                    self.n2vc_callback,  # Callback for status changes
                    db_nsr,              # Callback parameter
                    db_nslcmop,
                    vnf_index,           # Callback parameter
                    None,                # Callback parameter (task)
                )
            )
            task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
                                                     db_nsr, db_nslcmop, vnf_index))
            # registered so that the task can be located (e.g. to cancel it) by ns and operation
            self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task

        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)

        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]

            # Check if this VNF has a charm configuration
            vnf_config = vnfd.get("vnf-configuration")

            if vnf_config and vnf_config.get("juju"):
                proxy_charm = vnf_config["juju"]["charm"]
                params = {}

                if proxy_charm:
                    if 'initial-config-primitive' in vnf_config:
                        params['initial-config-primitive'] = vnf_config['initial-config-primitive']

                    # Login to the VCA. If there are multiple calls to login(),
                    # subsequent calls will be a nop and return immediately.
                    step = "connecting to N2VC to configure vnf {}".format(vnf_index)
                    await self.n2vc.login()
                    deploy()
                    number_to_configure += 1

            # Deploy charms for each VDU that supports one.
            for vdu in vnfd['vdu']:
                vdu_config = vdu.get('vdu-configuration')
                proxy_charm = None
                params = {}

                if vdu_config and vdu_config.get("juju"):
                    proxy_charm = vdu_config["juju"]["charm"]

                    if 'initial-config-primitive' in vdu_config:
                        params['initial-config-primitive'] = vdu_config['initial-config-primitive']

                    if proxy_charm:
                        step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
                        await self.n2vc.login()
                        deploy()
                        number_to_configure += 1

        if number_to_configure:
            db_nsr_update["config-status"] = "configuring"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["detailed-status"] = "configuring: init: {}".format(number_to_configure)
            db_nslcmop_update["detailed-status"] = "configuring: init: {}".format(number_to_configure)
        else:
            # nothing to configure: the operation finishes here
            db_nslcmop_update["operationState"] = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            db_nslcmop_update["detailed-status"] = "done"
            db_nsr_update["config-status"] = "configured"
            db_nsr_update["detailed-status"] = "done"
            db_nsr_update["operational-status"] = "running"
        step = "Sending monitoring parameters to PM"
        # for c_vnf in nsd["constituent-vnfd"]:
        #     await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]])
        try:
            await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
        except Exception as e:
            # a failure sending the notification must not turn the instantiation into failed
            self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        return

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                             exc_info=True)
    finally:
        # executed also after the 'return' above: writes the pending updates, adding the error if any
        if exc:
            if db_nsr:
                db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr_update["operational-status"] = "failed"
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1187
async def ns_terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service. Schedules the removal of the deployed charms through N2VC, deletes the ns,
    the nsd and the vnfds from RO and finally waits for the charm removal tasks.
    The result is annotated at database collections "nsrs" and "nslcmops"; when the operation parameter
    "autoremove" is set and there are no errors, the ns and its related records are deleted from database.
    :param nsr_id: _id of the ns record at database
    :param nslcmop_id: _id of the ns lcm operation at database
    :return: None. Exceptions raised by the operation are caught, logged and annotated at database
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None             # set only by the exception handlers of this method; read at 'finally'
    failed_detail = []     # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    db_nsr_update = {}
    db_nslcmop_update = {}
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO ALF remove
        # db_vim = self.db.get_one("vim_accounts", {"_id":  db_nsr["datacenter"]})
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_lcm and nsr_lcm.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                    if deploy_info and deploy_info.get("application"):
                        # the removal runs in background while RO elements are deleted; it is awaited below
                        task = asyncio.ensure_future(
                            self.n2vc.RemoveCharms(
                                deploy_info['model'],
                                deploy_info['application'],
                            )
                        )
                        vca_task_list.append(task)
                        vca_task_dict[vnf_index] = task
                        self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
            except Exception as e:
                # best effort: a failure scheduling the charm removal does not stop the RO deletion
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # Delete ns
        if nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsr_id"):
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete nsd; skipped when a previous RO deletion failed
        if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"):
            RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds; a failure deleting one of them does not prevent trying the rest
        if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if vca_task_list:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            await asyncio.wait(vca_task_list, timeout=300)
        for vnf_index, task in vca_task_dict.items():
            if task.cancelled():
                failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index))
            elif task.done():
                # use a dedicated name: assigning to 'exc' here would make the 'finally' clause treat the
                # whole operation as crashed, or not, depending on the last task inspected
                task_exc = task.exception()
                if task_exc:
                    failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, task_exc))
                else:
                    db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = None
            else:  # timeout
                # TODO Should it be cancelled?!!
                task.cancel()
                failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index))

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            # records are deleted, so the pending updates are discarded to avoid writing over them
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
        except Exception as e:
            # a failure sending the notification must not change the result of the operation
            self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        # executed also after the early 'return': writes the pending updates, replacing the operation
        # status by the failure when an exception was caught
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
1379
1380 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, primitive, primitive_params):
1381 vca_deployed = db_deployed["VCA"].get(member_vnf_index)
1382 if not vca_deployed:
1383 raise LcmException("charm for member_vnf_index={} is not deployed".format(member_vnf_index))
1384 model_name = vca_deployed.get("model")
1385 application_name = vca_deployed.get("application")
1386 if not model_name or not application_name:
1387 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index))
1388 if vca_deployed["operational-status"] != "active":
1389 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1390 member_vnf_index, vca_deployed["operational-status"]))
1391 callback = None # self.n2vc_callback
1392 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1393 await self.n2vc.login()
1394 task = asyncio.ensure_future(
1395 self.n2vc.ExecutePrimitive(
1396 model_name,
1397 application_name,
1398 primitive, callback,
1399 *callback_args,
1400 **primitive_params
1401 )
1402 )
1403 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1404 # db_nsr, db_nslcmop, member_vnf_index))
1405 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1406 # wait until completed with timeout
1407 await asyncio.wait((task,), timeout=600)
1408
1409 result = "FAILED" # by default
1410 result_detail = ""
1411 if task.cancelled():
1412 result_detail = "Task has been cancelled"
1413 elif task.done():
1414 exc = task.exception()
1415 if exc:
1416 result_detail = str(exc)
1417 else:
1418 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1419 result = "COMPLETED"
1420 result_detail = "Done"
1421 else: # timeout
1422 # TODO Should it be cancelled?!!
1423 task.cancel()
1424 result_detail = "timeout"
1425 return result, result_detail
1426
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by a ns lcm operation over the charm of a vnf, and annotate the result
    at database collection "nslcmops".
    The operation parameters "member_vnf_index", "primitive" and "primitive_params" are read from the nslcmop;
    a missing or empty "primitive_params" is treated as no parameters.
    :param nsr_id: _id of the ns record at database
    :param nslcmop_id: _id of the ns lcm operation at database
    :return: None. Exceptions raised by the operation are caught, logged and annotated at database
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        operation_params = db_nslcmop["operationParams"]
        vnf_index = operation_params["member_vnf_index"]

        # TODO check if ns is in a proper status
        primitive = operation_params["primitive"]
        primitive_params = operation_params.get("primitive_params") or {}
        # refresh 'step' so that a failure from here on is not reported as a database problem
        step = "Executing primitive '{}' at member_vnf_index={}".format(primitive, vnf_index)
        result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, primitive, primitive_params)
        db_nslcmop_update = {
            "detailed-status": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # executed also after the 'return' above; on error the operation is annotated as FAILED
        if exc and db_nslcmop:
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1472
1473 async def ns_scale(self, nsr_id, nslcmop_id):
1474 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1475 self.logger.debug(logging_text + "Enter")
1476 # get all needed from database
1477 db_nsr = None
1478 db_nslcmop = None
1479 db_nslcmop_update = {}
1480 db_nsr_update = {}
1481 exc = None
1482 try:
1483 step = "Getting nslcmop from database"
1484 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1485 step = "Getting nsr from database"
1486 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1487 step = "Parsing scaling parameters"
1488 nsr_lcm = db_nsr["_admin"].get("deployed")
1489 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
1490 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1491 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1492 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1493 scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1494
1495 step = "Getting vnfr from database"
1496 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1497 step = "Getting vnfd from database"
1498 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1499 step = "Getting scaling-group-descriptor"
1500 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1501 if scaling_descriptor["name"] == scaling_group:
1502 break
1503 else:
1504 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1505 "at vnfd:scaling-group-descriptor".format(scaling_group))
1506 cooldown_time = 0
1507 for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1508 cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1509 if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1510 break
1511
1512 # TODO check if ns is in a proper status
1513 step = "Sending scale order to RO"
1514 nb_scale_op = 0
1515 if not db_nsr["_admin"].get("scaling-group"):
1516 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1517 admin_scale_index = 0
1518 else:
1519 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1520 if admin_scale_info["name"] == scaling_group:
1521 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1522 break
1523 RO_scaling_info = []
1524 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1525 if scaling_type == "SCALE_OUT":
1526 # count if max-instance-count is reached
1527 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1528 max_instance_count = int(scaling_descriptor["max-instance-count"])
1529 if nb_scale_op >= max_instance_count:
1530 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1531 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1532 nb_scale_op = nb_scale_op + 1
1533 vdu_scaling_info["scaling_direction"] = "OUT"
1534 vdu_scaling_info["vdu-create"] = {}
1535 for vdu_scale_info in scaling_descriptor["vdu"]:
1536 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1537 "type": "create", "count": vdu_scale_info.get("count", 1)})
1538 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1539 elif scaling_type == "SCALE_IN":
1540 # count if min-instance-count is reached
1541 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1542 min_instance_count = int(scaling_descriptor["min-instance-count"])
1543 if nb_scale_op <= min_instance_count:
1544 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1545 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1546 nb_scale_op = nb_scale_op - 1
1547 vdu_scaling_info["scaling_direction"] = "IN"
1548 vdu_scaling_info["vdu-delete"] = {}
1549 for vdu_scale_info in scaling_descriptor["vdu"]:
1550 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1551 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1552 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1553
1554 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1555 if vdu_scaling_info["scaling_direction"] == "IN":
1556 for vdur in reversed(db_vnfr["vdur"]):
1557 if vdu_scaling_info["vdu-delete"].get(vdur["vdu-id-ref"]):
1558 vdu_scaling_info["vdu-delete"][vdur["vdu-id-ref"]] -= 1
1559 vdu_scaling_info["vdu"].append({
1560 "name": vdur["name"],
1561 "vdu_id": vdur["vdu-id-ref"],
1562 "interface": []
1563 })
1564 for interface in vdur["interfaces"]:
1565 vdu_scaling_info["vdu"][-1]["interface"].append({
1566 "name": interface["name"],
1567 "ip_address": interface["ip-address"],
1568 "mac_address": interface.get("mac-address"),
1569 })
1570 del vdu_scaling_info["vdu-delete"]
1571
1572 # execute primitive service PRE-SCALING
1573 step = "Executing pre-scale vnf-config-primitive"
1574 if scaling_descriptor.get("scaling-config-action"):
1575 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1576 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1577 and scaling_type == "SCALE_IN":
1578 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1579 step = db_nslcmop_update["detailed-status"] = \
1580 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1581 # look for primitive
1582 primitive_params = {}
1583 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1584 if config_primitive["name"] == vnf_config_primitive:
1585 for parameter in config_primitive.get("parameter", ()):
1586 if 'default-value' in parameter and \
1587 parameter['default-value'] == "<VDU_SCALE_INFO>":
1588 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1589 default_flow_style=True,
1590 width=256)
1591 break
1592 else:
1593 raise LcmException(
1594 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1595 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1596 "primitive".format(scaling_group, config_primitive))
1597 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1598 vnf_config_primitive, primitive_params)
1599 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1600 vnf_config_primitive, result, result_detail))
1601 if result == "FAILED":
1602 raise LcmException(result_detail)
1603
1604 if RO_scaling_info:
1605 RO = ROclient.ROClient(self.loop, **self.ro_config)
1606 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1607 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1608 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1609 # TODO mark db_nsr_update as scaling
1610 # wait until ready
1611 RO_nslcmop_id = RO_desc["instance_action_id"]
1612 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1613
1614 RO_task_done = False
1615 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1616 detailed_status_old = None
1617 self.logger.debug(logging_text + step)
1618
1619 deployment_timeout = 1 * 3600 # One hours
1620 while deployment_timeout > 0:
1621 if not RO_task_done:
1622 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1623 extra_item_id=RO_nslcmop_id)
1624 ns_status, ns_status_info = RO.check_action_status(desc)
1625 if ns_status == "ERROR":
1626 raise ROclient.ROClientException(ns_status_info)
1627 elif ns_status == "BUILD":
1628 detailed_status = step + "; {}".format(ns_status_info)
1629 elif ns_status == "ACTIVE":
1630 RO_task_done = True
1631 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1632 self.logger.debug(logging_text + step)
1633 else:
1634 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1635 else:
1636 desc = await RO.show("ns", RO_nsr_id)
1637 ns_status, ns_status_info = RO.check_ns_status(desc)
1638 if ns_status == "ERROR":
1639 raise ROclient.ROClientException(ns_status_info)
1640 elif ns_status == "BUILD":
1641 detailed_status = step + "; {}".format(ns_status_info)
1642 elif ns_status == "ACTIVE":
1643 step = detailed_status = "Waiting for management IP address reported by the VIM"
1644 try:
1645 desc = await RO.show("ns", RO_nsr_id)
1646 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
1647 break
1648 except ROclient.ROClientException as e:
1649 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
1650 raise e
1651 else:
1652 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1653 if detailed_status != detailed_status_old:
1654 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1655 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1656
1657 await asyncio.sleep(5, loop=self.loop)
1658 deployment_timeout -= 5
1659 if deployment_timeout <= 0:
1660 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1661
1662 step = "Updating VNFRs"
1663 # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info)
1664 self.ns_update_vnfr_2({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1665
1666 # update VDU_SCALING_INFO with the obtained ip_addresses
1667 if vdu_scaling_info["scaling_direction"] == "OUT":
1668 for vdur in reversed(db_vnfr["vdur"]):
1669 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1670 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1671 vdu_scaling_info["vdu"].append({
1672 "name": vdur["name"],
1673 "vdu_id": vdur["vdu-id-ref"],
1674 "interface": []
1675 })
1676 for interface in vdur["interfaces"]:
1677 vdu_scaling_info["vdu"][-1]["interface"].append({
1678 "name": interface["name"],
1679 "ip_address": interface["ip-address"],
1680 "mac_address": interface.get("mac-address"),
1681 })
1682 del vdu_scaling_info["vdu-create"]
1683
1684 if db_nsr_update:
1685 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1686
1687 # execute primitive service POST-SCALING
1688 step = "Executing post-scale vnf-config-primitive"
1689 if scaling_descriptor.get("scaling-config-action"):
1690 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1691 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1692 and scaling_type == "SCALE_OUT":
1693 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1694 step = db_nslcmop_update["detailed-status"] = \
1695 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1696 # look for primitive
1697 primitive_params = {}
1698 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1699 if config_primitive["name"] == vnf_config_primitive:
1700 for parameter in config_primitive.get("parameter", ()):
1701 if 'default-value' in parameter and \
1702 parameter['default-value'] == "<VDU_SCALE_INFO>":
1703 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1704 default_flow_style=True,
1705 width=256)
1706 break
1707 else:
1708 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1709 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1710 "match any vnf-cnfiguration:config-primitive".format(scaling_group,
1711 config_primitive))
1712 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1713 vnf_config_primitive, primitive_params)
1714 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1715 vnf_config_primitive, result, result_detail))
1716 if result == "FAILED":
1717 raise LcmException(result_detail)
1718
1719 db_nslcmop_update["operationState"] = "COMPLETED"
1720 db_nslcmop_update["statusEnteredTime"] = time()
1721 db_nslcmop_update["detailed-status"] = "done"
1722 db_nsr_update["detailed-status"] = "done"
1723 try:
1724 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1725 if cooldown_time:
1726 await asyncio.sleep(cooldown_time)
1727 await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1728 except Exception as e:
1729 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1730 self.logger.debug(logging_text + "Exit Ok")
1731 return
1732 except (ROclient.ROClientException, DbException, LcmException) as e:
1733 self.logger.error(logging_text + "Exit Exception {}".format(e))
1734 exc = e
1735 except asyncio.CancelledError:
1736 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1737 exc = "Operation was cancelled"
1738 except Exception as e:
1739 exc = traceback.format_exc()
1740 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1741 finally:
1742 if exc:
1743 db_nsr_update = None
1744 if db_nslcmop:
1745 db_nslcmop_update = {
1746 "detailed-status": "FAILED {}: {}".format(step, exc),
1747 "operationState": "FAILED",
1748 "statusEnteredTime": time(),
1749 }
1750 if db_nslcmop_update:
1751 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1752 if db_nsr_update:
1753 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1754
async def test(self, param=None):
    """
    Write a debug log entry containing the received parameter; nothing else is done
    :param param: any value, it is rendered with str.format into the log message
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
1757
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id
    :param topic: can be "ns", "vim_account" or "sdn"
    :param _id: nsr, vim or sdn identity
    :return: None
    :raises LcmException: if topic is not one of the supported values
    """
    if topic == "ns":
        lcm_tasks = self.lcm_ns_tasks
    elif topic == "vim_account":
        lcm_tasks = self.lcm_vim_tasks
    elif topic == "sdn":
        lcm_tasks = self.lcm_sdn_tasks
    else:
        # without this branch an unsupported topic ended in a confusing UnboundLocalError below
        raise LcmException("cancel_tasks: unknown topic '{}'".format(topic))

    pending = lcm_tasks.get(_id)
    if not pending:
        # nothing registered (or already emptied) for this item
        return
    for order_id, tasks_set in pending.items():
        for task_name, task in tasks_set.items():
            # only log the tasks whose cancel() call reports success
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    # forget every task registered for this item, cancelled or not
    lcm_tasks[_id] = {}
1780
async def kafka_ping(self):
    """
    Periodically write a ping message to the "admin" topic, addressed from lcm to lcm, and check that the
    pings come back. 'self.pings_not_received' is incremented after every ping written; kafka_read sets it
    to 0 when it reads one of these pings.
    :return: it does not return normally, the loop only ends by raising
    :raises LcmException: when 'self.pings_not_received' gets bigger than 10
    :raises Exception: the last writing error, after too many consecutive errors (30 before the first
        successful iteration, 8 afterwards)
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings is low while no ping has been received yet (e.g. at starting)
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # no 'loop' argument: it was removed in python 3.10 and this coroutine already runs in self.loop
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # while first_start is True no iteration has succeeded since starting, so more errors are
            # tolerated and the wait is longer, to allow kafka to start.
            # The limit is computed apart: "a == 8 if c else 30" is parsed as "(a == 8) if c else 30"
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 5 if first_start else 1
            await asyncio.sleep(wait_time)
1812
async def kafka_read(self):
    """
    Read messages of the topics "admin", "ns", "vim_account" and "sdn" and serve every command.
    Operations over ns, vim_account and sdn are launched as asyncio tasks and registered at
    self.lcm_ns_tasks, self.lcm_vim_tasks or self.lcm_sdn_tasks, indexed by the item id and by the
    operation id (nslcmop _id for ns, an internal counter 'order_id' for vim_account and sdn).
    A "ping" from lcm to lcm received at "admin" sets self.pings_not_received to 0.
    :return: None, when the command "exit" is received or after 10 consecutive errors
    :raises Exception: the last reading error, after 8 consecutive errors once some message has been read
    """
    self.logger.debug("Task kafka_read Enter")
    order_id = 1
    # future = asyncio.Future()
    consecutive_errors = 0
    first_start = True

    def launch(registry, item_id, operation_id, task_name, coro):
        # schedule 'coro' and register the task, so that cancel_tasks is able to find it
        task = asyncio.ensure_future(coro)
        registry.setdefault(item_id, {})[operation_id] = {task_name: task}

    # this guard ends the task, without raising, once 10 consecutive errors are reached
    while consecutive_errors < 10:
        try:
            topics = ("admin", "ns", "vim_account", "sdn")
            topic, command, params = await self.msg.aioread(topics, self.loop)
            # NOTE(review): this condition also hides any other "admin" message and any "ping" of other
            # topics; presumably only the admin pings were meant to be skipped - confirm
            if topic != "admin" and command != "ping":
                self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            first_start = False
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params)
                sys.stdout.flush()
                continue
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
                continue

            if topic == "admin":
                if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
                    self.pings_not_received = 0
                    continue
            elif topic == "ns":
                if command == "instantiate":
                    # self.logger.debug("Deploying NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_instantiate",
                           self.ns_instantiate(nsr_id, nslcmop_id))
                    continue
                elif command == "terminate":
                    # self.logger.debug("Deleting NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    # cancel the tasks registered for this ns before launching the termination
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_terminate",
                           self.ns_terminate(nsr_id, nslcmop_id))
                    continue
                elif command == "action":
                    # self.logger.debug("Update NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_action",
                           self.ns_action(nsr_id, nslcmop_id))
                    continue
                elif command == "scale":
                    # self.logger.debug("Update NS {}".format(nsr_id))
                    nslcmop = params
                    nslcmop_id = nslcmop["_id"]
                    nsr_id = nslcmop["nsInstanceId"]
                    launch(self.lcm_ns_tasks, nsr_id, nslcmop_id, "ns_scale",
                           self.ns_scale(nsr_id, nslcmop_id))
                    continue
                elif command == "show":
                    # NOTE(review): nsr_id is not obtained from params here, so this shows the nsr of the
                    # last ns operation served by this task, and fails when there was none - confirm
                    # which is the intended source of the id
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print("nsr:\n    _id={}\n    operational-status: {}\n    config-status: {}"
                              "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
                              "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"],
                                        db_nsr["detailed-status"],
                                        db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id)))
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e))
                    sys.stdout.flush()
                    continue
                elif command == "deleted":
                    continue  # TODO cleaning of task just in case should be done
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_create",
                           self.vim_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_delete",
                           self.vim_delete(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account")
                    sys.stdout.flush()
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "vim_edit",
                           self.vim_edit(params, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_create",
                           self.sdn_create(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_delete",
                           self.sdn_delete(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "sdn_edit",
                           self.sdn_edit(params, order_id))
                    continue
            # reached by any topic/command combination not served above
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            # while first_start is True nothing has been read since starting, so more errors are tolerated
            # and the wait is longer, to allow kafka to start.
            # The limit is computed apart: "a == 8 if c else 30" is parsed as "(a == 8) if c else 30",
            # which made the task exit at the very first error after starting
            max_errors = 30 if first_start else 8
            if consecutive_errors >= max_errors:
                self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_read retrying after Exception {}".format(e))
            wait_time = 5 if first_start else 2
            # no 'loop' argument: it was removed in python 3.10 and this coroutine already runs in self.loop
            await asyncio.sleep(wait_time)

    # self.logger.debug("Task kafka_read terminating")
    self.logger.debug("Task kafka_read exit")
1962
def start(self):
    """
    Run the service: check the RO version and then run kafka_read and kafka_ping in the event loop until
    both finish or one of them raises. At the end, also when an exception is raised, the loop is closed
    and the database, message bus and file storage are disconnected.
    :return: None
    :raises Exception: whatever check_RO_version, kafka_read or kafka_ping raise
    """
    self.loop = asyncio.get_event_loop()
    try:
        # check RO version
        self.loop.run_until_complete(self.check_RO_version())

        self.loop.run_until_complete(asyncio.gather(
            self.kafka_read(),
            self.kafka_ping()
        ))
        # TODO
        # self.logger.debug("Terminating cancelling creation tasks")
        # self.cancel_tasks("ALL", "create")
        # timeout = 200
        # while self.is_pending_tasks():
        #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
        #     await asyncio.sleep(2, loop=self.loop)
        #     timeout -= 2
        #     if not timeout:
        #         self.cancel_tasks("ALL", "ALL")
    finally:
        # release everything also on failure; formerly an exception skipped all this cleanup
        self.loop.close()
        self.loop = None
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
1991
def read_config_file(self, config_file):
    """
    Load the yaml configuration file and override its content with the environment variables that start
    with "OSMLCM_". The variable name is lowercased and split by "_": the items between the prefix and the
    last one are the path of nested keys ("ro" and "vca" are used in capital letters) and the last item is
    the key that receives the value, e.g. OSMLCM_RO_HOST sets conf["RO"]["host"]. A last item "port" is
    converted to integer. A variable that cannot be applied is skipped with a warning.
    :param config_file: path of the configuration file
    :return: the loaded configuration. On any error reading it, a critical log is written and the program
        exits with code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load instead of load: load without Loader is able to build arbitrary python objects and
            # it is rejected by PyYAML >= 6, where the Loader argument is mandatory
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that holds the final key
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        sys.exit(1)
2021
2022
def usage():
    """
    Print the command line help, using sys.argv[0] as program name
    :return: None
    """
    # the default shown was './nbi.cfg', but the configuration file looked for by this program is lcm.cfg
    print("""Usage: {} [options]
        -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg)
        -h|--help: shows this help
        """.format(sys.argv[0]))
    # --log-socket-host HOST: send logs to this host")
    # --log-socket-port PORT: send logs using this port (default: 9022)")
2030
2031
# Script entry point: parse the command line, locate the configuration file and run the Lcm service.
# Any LcmException or command line error is printed to stderr and the program exits with code 1.
if __name__ == '__main__':
    try:
        # load parameters and configuration
        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
        # TODO add  "log-socket-host=", "log-socket-port=", "log-file="
        config_file = None
        for o, a in opts:
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-c", "--config"):
                config_file = a
            # elif o == "--log-socket-port":
            #     log_socket_port = a
            # elif o == "--log-socket-host":
            #     log_socket_host = a
            # elif o == "--log-file":
            #     log_file = a
            else:
                assert False, "Unhandled option"
        if config_file:
            if not path.isfile(config_file):
                print("configuration file '{}' does not exist".format(config_file), file=sys.stderr)
                sys.exit(1)
        else:
            # without -c, try a .cfg file named as this module, then the local folder and then /etc/osm
            for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                if path.isfile(config_file):
                    break
            else:
                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                sys.exit(1)
        lcm = Lcm(config_file)
        lcm.start()
    except (LcmException, getopt.GetoptError) as e:
        print(str(e), file=sys.stderr)
        # usage()
        sys.exit(1)