2 # -*- coding: utf-8 -*-
8 import logging
.handlers
13 from osm_common
import dbmemory
14 from osm_common
import dbmongo
15 from osm_common
import fslocal
16 from osm_common
import msglocal
17 from osm_common
import msgkafka
18 from osm_common
.dbbase
import DbException
19 from osm_common
.fsbase
import FsException
20 from osm_common
.msgbase
import MsgException
21 from os
import environ
, path
22 from n2vc
.vnf
import N2VC
23 from n2vc
import version
as N2VC_version
25 from copy
import deepcopy
26 from http
import HTTPStatus
30 __author__
= "Alfonso Tierno"
31 min_RO_version
= [0, 5, 69]
34 class LcmException(Exception):
40 def __init__(self
, config_file
):
42 Init, Connect to database, filesystem storage, and messaging
43 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
50 self
.pings_not_received
= 1
52 # contains created tasks/futures to be able to cancel
53 self
.lcm_ns_tasks
= {}
54 self
.lcm_vim_tasks
= {}
55 self
.lcm_sdn_tasks
= {}
57 self
.logger
= logging
.getLogger('lcm')
59 config
= self
.read_config_file(config_file
)
62 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
63 "tenant": config
.get("tenant", "osm"),
64 "logger_name": "lcm.ROclient",
68 self
.vca
= config
["VCA"] # TODO VCA
72 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
73 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
74 config
["database"]["logger_name"] = "lcm.db"
75 config
["storage"]["logger_name"] = "lcm.fs"
76 config
["message"]["logger_name"] = "lcm.msg"
77 if "logfile" in config
["global"]:
78 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
79 maxBytes
=100e6
, backupCount
=9, delay
=0)
80 file_handler
.setFormatter(log_formatter_simple
)
81 self
.logger
.addHandler(file_handler
)
83 str_handler
= logging
.StreamHandler()
84 str_handler
.setFormatter(log_formatter_simple
)
85 self
.logger
.addHandler(str_handler
)
87 if config
["global"].get("loglevel"):
88 self
.logger
.setLevel(config
["global"]["loglevel"])
90 # logging other modules
91 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
92 config
[k1
]["logger_name"] = logname
93 logger_module
= logging
.getLogger(logname
)
94 if "logfile" in config
[k1
]:
95 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
96 maxBytes
=100e6
, backupCount
=9, delay
=0)
97 file_handler
.setFormatter(log_formatter_simple
)
98 logger_module
.addHandler(file_handler
)
99 if "loglevel" in config
[k1
]:
100 logger_module
.setLevel(config
[k1
]["loglevel"])
104 server
=config
['VCA']['host'],
105 port
=config
['VCA']['port'],
106 user
=config
['VCA']['user'],
107 secret
=config
['VCA']['secret'],
108 # TODO: This should point to the base folder where charms are stored,
109 # if there is a common one (like object storage). Otherwise, leave
110 # it unset and pass it via DeployCharms
111 # artifacts=config['VCA'][''],
114 # check version of N2VC
115 # TODO enhance with int conversion or from distutils.version import LooseVersion
116 # or with list(map(int, version.split(".")))
117 if N2VC_version
< "0.0.2":
118 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version
))
121 # TODO check database version
122 if config
["database"]["driver"] == "mongo":
123 self
.db
= dbmongo
.DbMongo()
124 self
.db
.db_connect(config
["database"])
125 elif config
["database"]["driver"] == "memory":
126 self
.db
= dbmemory
.DbMemory()
127 self
.db
.db_connect(config
["database"])
129 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
130 config
["database"]["driver"]))
132 if config
["storage"]["driver"] == "local":
133 self
.fs
= fslocal
.FsLocal()
134 self
.fs
.fs_connect(config
["storage"])
136 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
137 config
["storage"]["driver"]))
139 if config
["message"]["driver"] == "local":
140 self
.msg
= msglocal
.MsgLocal()
141 self
.msg
.connect(config
["message"])
142 elif config
["message"]["driver"] == "kafka":
143 self
.msg
= msgkafka
.MsgKafka()
144 self
.msg
.connect(config
["message"])
146 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
147 config
["storage"]["driver"]))
148 except (DbException
, FsException
, MsgException
) as e
:
149 self
.logger
.critical(str(e
), exc_info
=True)
150 raise LcmException(str(e
))
152 async def check_RO_version(self
):
154 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
155 RO_version
= await RO
.get_version()
156 if RO_version
< min_RO_version
:
157 raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format(
158 *RO_version
, *min_RO_version
160 except ROclient
.ROClientException
as e
:
161 self
.logger
.critical("Error while conneting to osm/RO " + str(e
), exc_info
=True)
162 raise LcmException(str(e
))
164 def update_db(self
, item
, _id
, _desc
):
166 self
.db
.replace(item
, _id
, _desc
)
167 except DbException
as e
:
168 self
.logger
.error("Updating {} _id={}: {}".format(item
, _id
, e
))
170 def update_db_2(self
, item
, _id
, _desc
):
172 Updates database with _desc information. Upon success _desc is cleared
181 self
.db
.set_one(item
, {"_id": _id
}, _desc
)
183 except DbException
as e
:
184 self
.logger
.error("Updating {} _id={} with '{}'. Error: {}".format(item
, _id
, _desc
, e
))
186 async def vim_create(self
, vim_content
, order_id
):
187 vim_id
= vim_content
["_id"]
188 logging_text
= "Task vim_create={} ".format(vim_id
)
189 self
.logger
.debug(logging_text
+ "Enter")
194 step
= "Getting vim-id='{}' from db".format(vim_id
)
195 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
196 if "_admin" not in db_vim
:
197 db_vim
["_admin"] = {}
198 if "deployed" not in db_vim
["_admin"]:
199 db_vim
["_admin"]["deployed"] = {}
200 db_vim
["_admin"]["deployed"]["RO"] = None
201 if vim_content
.get("config") and vim_content
["config"].get("sdn-controller"):
202 step
= "Getting sdn-controller-id='{}' from db".format(vim_content
["config"]["sdn-controller"])
203 db_sdn
= self
.db
.get_one("sdns", {"_id": vim_content
["config"]["sdn-controller"]})
204 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
205 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
207 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
208 vim_content
["config"]["sdn-controller"]))
210 step
= "Creating vim at RO"
211 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
212 vim_RO
= deepcopy(vim_content
)
213 vim_RO
.pop("_id", None)
214 vim_RO
.pop("_admin", None)
215 vim_RO
.pop("schema_version", None)
216 vim_RO
.pop("schema_type", None)
217 vim_RO
.pop("vim_tenant_name", None)
218 vim_RO
["type"] = vim_RO
.pop("vim_type")
219 vim_RO
.pop("vim_user", None)
220 vim_RO
.pop("vim_password", None)
222 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
223 desc
= await RO
.create("vim", descriptor
=vim_RO
)
224 RO_vim_id
= desc
["uuid"]
225 db_vim
["_admin"]["deployed"]["RO"] = RO_vim_id
226 self
.update_db("vim_accounts", vim_id
, db_vim
)
228 step
= "Creating vim_account at RO"
229 vim_account_RO
= {"vim_tenant_name": vim_content
["vim_tenant_name"],
230 "vim_username": vim_content
["vim_user"],
231 "vim_password": vim_content
["vim_password"]
233 if vim_RO
.get("config"):
234 vim_account_RO
["config"] = vim_RO
["config"]
235 if "sdn-controller" in vim_account_RO
["config"]:
236 del vim_account_RO
["config"]["sdn-controller"]
237 if "sdn-port-mapping" in vim_account_RO
["config"]:
238 del vim_account_RO
["config"]["sdn-port-mapping"]
239 await RO
.attach_datacenter(RO_vim_id
, descriptor
=vim_account_RO
)
240 db_vim
["_admin"]["operationalState"] = "ENABLED"
241 self
.update_db("vim_accounts", vim_id
, db_vim
)
243 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
246 except (ROclient
.ROClientException
, DbException
) as e
:
247 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
249 except Exception as e
:
250 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
254 db_vim
["_admin"]["operationalState"] = "ERROR"
255 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
256 self
.update_db("vim_accounts", vim_id
, db_vim
)
258 async def vim_edit(self
, vim_content
, order_id
):
259 vim_id
= vim_content
["_id"]
260 logging_text
= "Task vim_edit={} ".format(vim_id
)
261 self
.logger
.debug(logging_text
+ "Enter")
265 step
= "Getting vim-id='{}' from db".format(vim_id
)
267 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
268 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
269 if vim_content
.get("config") and vim_content
["config"].get("sdn-controller"):
270 step
= "Getting sdn-controller-id='{}' from db".format(vim_content
["config"]["sdn-controller"])
271 db_sdn
= self
.db
.get_one("sdns", {"_id": vim_content
["config"]["sdn-controller"]})
272 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get(
274 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
276 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
277 vim_content
["config"]["sdn-controller"]))
279 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
280 step
= "Editing vim at RO"
281 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
282 vim_RO
= deepcopy(vim_content
)
283 vim_RO
.pop("_id", None)
284 vim_RO
.pop("_admin", None)
285 vim_RO
.pop("schema_version", None)
286 vim_RO
.pop("schema_type", None)
287 vim_RO
.pop("vim_tenant_name", None)
288 if "vim_type" in vim_RO
:
289 vim_RO
["type"] = vim_RO
.pop("vim_type")
290 vim_RO
.pop("vim_user", None)
291 vim_RO
.pop("vim_password", None)
293 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
294 # TODO make a deep update of sdn-port-mapping
296 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
298 step
= "Editing vim-account at RO tenant"
300 if "config" in vim_content
:
301 if "sdn-controller" in vim_content
["config"]:
302 del vim_content
["config"]["sdn-controller"]
303 if "sdn-port-mapping" in vim_content
["config"]:
304 del vim_content
["config"]["sdn-port-mapping"]
305 if not vim_content
["config"]:
306 del vim_content
["config"]
307 for k
in ("vim_tenant_name", "vim_password", "config"):
309 vim_account_RO
[k
] = vim_content
[k
]
310 if "vim_user" in vim_content
:
311 vim_content
["vim_username"] = vim_content
["vim_user"]
312 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
313 # vim_thread. RO will remove and relaunch a new thread for this vim_account
314 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
315 db_vim
["_admin"]["operationalState"] = "ENABLED"
316 self
.update_db("vim_accounts", vim_id
, db_vim
)
318 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
321 except (ROclient
.ROClientException
, DbException
) as e
:
322 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
324 except Exception as e
:
325 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
329 db_vim
["_admin"]["operationalState"] = "ERROR"
330 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
331 self
.update_db("vim_accounts", vim_id
, db_vim
)
333 async def vim_delete(self
, vim_id
, order_id
):
334 logging_text
= "Task vim_delete={} ".format(vim_id
)
335 self
.logger
.debug(logging_text
+ "Enter")
338 step
= "Getting vim from db"
340 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
341 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
342 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
343 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
344 step
= "Detaching vim from RO tenant"
346 await RO
.detach_datacenter(RO_vim_id
)
347 except ROclient
.ROClientException
as e
:
348 if e
.http_code
== 404: # not found
349 self
.logger
.debug(logging_text
+ "RO_vim_id={} already detached".format(RO_vim_id
))
353 step
= "Deleting vim from RO"
355 await RO
.delete("vim", RO_vim_id
)
356 except ROclient
.ROClientException
as e
:
357 if e
.http_code
== 404: # not found
358 self
.logger
.debug(logging_text
+ "RO_vim_id={} already deleted".format(RO_vim_id
))
363 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
364 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
365 self
.logger
.debug("vim_delete task vim_id={} Exit Ok".format(vim_id
))
368 except (ROclient
.ROClientException
, DbException
) as e
:
369 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
371 except Exception as e
:
372 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
376 db_vim
["_admin"]["operationalState"] = "ERROR"
377 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
378 self
.update_db("vim_accounts", vim_id
, db_vim
)
380 async def sdn_create(self
, sdn_content
, order_id
):
381 sdn_id
= sdn_content
["_id"]
382 logging_text
= "Task sdn_create={} ".format(sdn_id
)
383 self
.logger
.debug(logging_text
+ "Enter")
387 step
= "Getting sdn from db"
388 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
389 if "_admin" not in db_sdn
:
390 db_sdn
["_admin"] = {}
391 if "deployed" not in db_sdn
["_admin"]:
392 db_sdn
["_admin"]["deployed"] = {}
393 db_sdn
["_admin"]["deployed"]["RO"] = None
395 step
= "Creating sdn at RO"
396 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
397 sdn_RO
= deepcopy(sdn_content
)
398 sdn_RO
.pop("_id", None)
399 sdn_RO
.pop("_admin", None)
400 sdn_RO
.pop("schema_version", None)
401 sdn_RO
.pop("schema_type", None)
402 sdn_RO
.pop("description", None)
403 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
404 RO_sdn_id
= desc
["uuid"]
405 db_sdn
["_admin"]["deployed"]["RO"] = RO_sdn_id
406 db_sdn
["_admin"]["operationalState"] = "ENABLED"
407 self
.update_db("sdns", sdn_id
, db_sdn
)
408 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
411 except (ROclient
.ROClientException
, DbException
) as e
:
412 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
414 except Exception as e
:
415 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
419 db_sdn
["_admin"]["operationalState"] = "ERROR"
420 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
421 self
.update_db("sdns", sdn_id
, db_sdn
)
423 async def sdn_edit(self
, sdn_content
, order_id
):
424 sdn_id
= sdn_content
["_id"]
425 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
426 self
.logger
.debug(logging_text
+ "Enter")
429 step
= "Getting sdn from db"
431 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
432 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
433 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
434 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
435 step
= "Editing sdn at RO"
436 sdn_RO
= deepcopy(sdn_content
)
437 sdn_RO
.pop("_id", None)
438 sdn_RO
.pop("_admin", None)
439 sdn_RO
.pop("schema_version", None)
440 sdn_RO
.pop("schema_type", None)
441 sdn_RO
.pop("description", None)
443 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
444 db_sdn
["_admin"]["operationalState"] = "ENABLED"
445 self
.update_db("sdns", sdn_id
, db_sdn
)
447 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
450 except (ROclient
.ROClientException
, DbException
) as e
:
451 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
453 except Exception as e
:
454 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
458 db_sdn
["_admin"]["operationalState"] = "ERROR"
459 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
460 self
.update_db("sdns", sdn_id
, db_sdn
)
462 async def sdn_delete(self
, sdn_id
, order_id
):
463 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
464 self
.logger
.debug(logging_text
+ "Enter")
467 step
= "Getting sdn from db"
469 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
470 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
471 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
472 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
473 step
= "Deleting sdn from RO"
475 await RO
.delete("sdn", RO_sdn_id
)
476 except ROclient
.ROClientException
as e
:
477 if e
.http_code
== 404: # not found
478 self
.logger
.debug(logging_text
+ "RO_sdn_id={} already deleted".format(RO_sdn_id
))
483 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
484 self
.db
.del_one("sdns", {"_id": sdn_id
})
485 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
488 except (ROclient
.ROClientException
, DbException
) as e
:
489 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
491 except Exception as e
:
492 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
496 db_sdn
["_admin"]["operationalState"] = "ERROR"
497 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
498 self
.update_db("sdns", sdn_id
, db_sdn
)
500 def vnfd2RO(self
, vnfd
, new_id
=None):
502 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
503 :param vnfd: input vnfd
504 :param new_id: overrides vnf id if provided
505 :return: copy of vnfd
509 vnfd_RO
= deepcopy(vnfd
)
510 vnfd_RO
.pop("_id", None)
511 vnfd_RO
.pop("_admin", None)
513 vnfd_RO
["id"] = new_id
514 for vdu
in vnfd_RO
["vdu"]:
515 if "cloud-init-file" in vdu
:
516 base_folder
= vnfd
["_admin"]["storage"]
517 clout_init_file
= "{}/{}/cloud_init/{}".format(
518 base_folder
["folder"],
519 base_folder
["pkg-dir"],
520 vdu
["cloud-init-file"]
522 ci_file
= self
.fs
.file_open(clout_init_file
, "r")
523 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
524 # convert to base 64 or similar
525 clout_init_content
= ci_file
.read()
528 vdu
.pop("cloud-init-file", None)
529 vdu
["cloud-init"] = clout_init_content
530 # remnove unused by RO configuration, monitoring, scaling
531 vnfd_RO
.pop("vnf-configuration", None)
532 vnfd_RO
.pop("monitoring-param", None)
533 vnfd_RO
.pop("scaling-group-descriptor", None)
535 except FsException
as e
:
536 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd
["_id"], e
))
541 def n2vc_callback(self
, model_name
, application_name
, status
, message
, db_nsr
, db_nslcmop
, member_vnf_index
,
544 Callback both for charm status change and task completion
545 :param model_name: Charm model name
546 :param application_name: Charm application name
547 :param status: Can be
548 - blocked: The unit needs manual intervention
549 - maintenance: The unit is actively deploying/configuring
550 - waiting: The unit is waiting for another charm to be ready
551 - active: The unit is deployed, configured, and ready
552 - error: The charm has failed and needs attention.
553 - terminated: The charm has been destroyed
556 :param message: detailed message error
557 :param db_nsr: nsr database content
558 :param db_nslcmop: nslcmop database content
559 :param member_vnf_index: NSD member-vnf-index
560 :param task: None for charm status change, or task for completion task callback
566 db_nslcmop_update
= {}
568 nsr_id
= db_nsr
["_id"]
569 nslcmop_id
= db_nslcmop
["_id"]
570 nsr_lcm
= db_nsr
["_admin"]["deployed"]
571 ns_operation
= db_nslcmop
["lcmOperationType"]
572 logging_text
= "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id
, ns_operation
, nslcmop_id
,
577 self
.logger
.debug(logging_text
+ " task Cancelled")
578 # TODO update db_nslcmop
582 exc
= task
.exception()
584 self
.logger
.error(logging_text
+ " task Exception={}".format(exc
))
585 if ns_operation
in ("instantiate", "terminate"):
586 nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] = "error"
587 db_nsr_update
["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index
)] = \
589 nsr_lcm
["VCA"][member_vnf_index
]['detailed-status'] = str(exc
)
591 "_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index
)] = str(exc
)
592 elif ns_operation
== "action":
593 db_nslcmop_update
["operationState"] = "FAILED"
594 db_nslcmop_update
["detailed-status"] = str(exc
)
595 db_nslcmop_update
["statusEnteredTime"] = time()
599 self
.logger
.debug(logging_text
+ " task Done")
600 # TODO revise with Adam if action is finished and ok when task is done
601 if ns_operation
== "action":
602 db_nslcmop_update
["operationState"] = "COMPLETED"
603 db_nslcmop_update
["detailed-status"] = "Done"
604 db_nslcmop_update
["statusEnteredTime"] = time()
605 # task is Done, but callback is still ongoing. So ignore
608 self
.logger
.debug(logging_text
+ " Enter status={}".format(status
))
609 if nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] == status
:
610 return # same status, ignore
611 nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] = status
612 db_nsr_update
["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index
)] = status
613 nsr_lcm
["VCA"][member_vnf_index
]['detailed-status'] = str(message
)
614 db_nsr_update
["_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index
)] = str(message
)
616 self
.logger
.critical(logging_text
+ " Enter with bad parameters", exc_info
=True)
621 n2vc_error_text
= [] # contain text error list. If empty no one is in error status
622 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
623 vca_status
= vca_info
["operational-status"]
624 if vca_status
not in status_map
:
626 status_map
[vca_status
] = 0
627 status_map
[vca_status
] += 1
629 if vca_status
!= "active":
631 elif vca_status
in ("error", "blocked"):
632 n2vc_error_text
.append("member_vnf_index={} {}: {}".format(member_vnf_index
, vca_status
,
633 vca_info
["detailed-status"]))
636 self
.logger
.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id
,
638 db_nsr_update
["config-status"] = "configured"
639 db_nsr_update
["detailed-status"] = "done"
640 db_nslcmop_update
["operationState"] = "COMPLETED"
641 db_nslcmop_update
["detailed-status"] = "Done"
642 db_nslcmop_update
["statusEnteredTime"] = time()
643 elif n2vc_error_text
:
644 db_nsr_update
["config-status"] = "failed"
645 error_text
= "fail configuring " + ";".join(n2vc_error_text
)
646 db_nsr_update
["detailed-status"] = error_text
647 db_nslcmop_update
["operationState"] = "FAILED_TEMP"
648 db_nslcmop_update
["detailed-status"] = error_text
649 db_nslcmop_update
["statusEnteredTime"] = time()
653 for status
, num
in status_map
.items():
654 cs
+= separator
+ "{}: {}".format(status
, num
)
656 db_nsr_update
["config-status"] = cs
657 db_nsr_update
["detailed-status"] = cs
658 db_nslcmop_update
["detailed-status"] = cs
660 except Exception as e
:
661 self
.logger
.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index
, e
), exc_info
=True)
664 if db_nslcmop_update
:
665 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
667 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
668 except Exception as e
:
669 self
.logger
.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
670 member_vnf_index
, e
), exc_info
=True)
672 def ns_params_2_RO(self
, ns_params
):
674 Creates a RO ns descriptor from OSM ns_instantite params
675 :param ns_params: OSM instantiate params
676 :return: The RO ns descriptor
680 def vim_account_2_RO(vim_account
):
681 if vim_account
in vim_2_RO
:
682 return vim_2_RO
[vim_account
]
683 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
684 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
685 # #TODO check if VIM is creating and wait
686 if db_vim
["_admin"]["operationalState"] != "ENABLED":
687 raise LcmException("VIM={} is not available. operationalState={}".format(
688 vim_account
, db_vim
["_admin"]["operationalState"]))
689 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
690 vim_2_RO
[vim_account
] = RO_vim_id
696 # "name": ns_params["nsName"],
697 # "description": ns_params.get("nsDescription"),
698 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
699 # "scenario": ns_params["nsdId"],
703 if ns_params
.get("ssh-authorized-key"):
704 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh-authorized-key"]}
705 if ns_params
.get("vnf"):
706 for vnf
in ns_params
["vnf"]:
708 if "vimAccountId" in vnf
:
709 RO_vnf
["datacenter"] = vim_account_2_RO(vnf
["vimAccountId"])
711 RO_ns_params
["vnfs"][vnf
["member-vnf-index"]] = RO_vnf
712 if ns_params
.get("vld"):
713 for vld
in ns_params
["vld"]:
715 if "ip-profile" in vld
:
716 RO_vld
["ip-profile"] = vld
["ip-profile"]
717 if "vim-network-name" in vld
:
719 if isinstance(vld
["vim-network-name"], dict):
720 for vim_account
, vim_net
in vld
["vim-network-name"].items():
721 RO_vld
["sites"].append({
722 "netmap-use": vim_net
,
723 "datacenter": vim_account_2_RO(vim_account
)
725 else: # isinstance str
726 RO_vld
["sites"].append({"netmap-use": vld
["vim-network-name"]})
728 RO_ns_params
["networks"][vld
["name"]] = RO_vld
731 def ns_update_vnfr(self
, db_vnfrs
, ns_RO_info
):
733 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
738 for vnf_index
, db_vnfr
in db_vnfrs
.items():
739 vnfr_deployed
= ns_RO_info
.get(vnf_index
)
740 if not vnfr_deployed
:
743 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnfr_deployed
.get("ip_address")
744 for index
, vdur
in enumerate(db_vnfr
["vdur"]):
745 vdu_deployed
= vnfr_deployed
["vdur"].get(vdur
["vdu-id-ref"])
748 vnfr_update
["vdur.{}.vim-id".format(index
)] = vdu_deployed
.get("vim_id")
749 db_vnfr
["vdur"][index
]["vim-id"] = vnfr_update
["vdur.{}.vim-id".format(index
)]
750 vnfr_update
["vdur.{}.ip-address".format(index
)] = vdu_deployed
.get("ip_address")
751 db_vnfr
["vdur"][index
]["ip-address"] = vnfr_update
["vdur.{}.ip-address".format(index
)]
752 for index2
, interface
in enumerate(vdur
["interfaces"]):
753 iface_deployed
= vdu_deployed
["interfaces"].get(interface
["name"])
754 if not iface_deployed
:
756 db_vnfr
["vdur"][index
]["interfaces"][index2
]["vim-id"] =\
757 vnfr_update
["vdur.{}.interfaces.{}.vim-id".format(index
, index2
)] = iface_deployed
.get("vim_id")
758 db_vnfr
["vdur"][index
]["interfaces"][index2
]["ip-address"] =\
759 vnfr_update
["vdur.{}.interfaces.{}.ip-address".format(index
, index2
)] = iface_deployed
.get(
761 db_vnfr
["vdur"][index
]["interfaces"][index2
]["mac-address"] =\
762 vnfr_update
["vdur.{}.interfaces.{}.mac-address".format(index
, index2
)] = iface_deployed
.get(
764 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
766 def ns_update_vnfr_2(self
, db_vnfrs
, nsr_desc_RO
):
768 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
773 for vnf_index
, db_vnfr
in db_vnfrs
.items():
774 for vnf_RO
in nsr_desc_RO
["vnfs"]:
775 if vnf_RO
["member_vnf_index"] == vnf_index
:
777 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
.get("ip_address")
779 for vdur_RO
in vnf_RO
.get("vms", ()):
781 "vim-id": vdur_RO
.get("vim_vm_id"),
782 "ip-address": vdur_RO
.get("ip_address"),
783 "vdu-id-ref": vdur_RO
.get("vdu_osm_id"),
784 "name": vdur_RO
.get("vim_name"),
785 "status": vdur_RO
.get("status"),
786 "status-detailed": vdur_RO
.get("error_msg"),
790 for interface_RO
in vdur_RO
.get("interfaces", ()):
791 vdur
["interfaces"].append({
792 "ip-address": interface_RO
.get("ip_address"),
793 "mac-address": interface_RO
.get("mac_address"),
794 "name": interface_RO
.get("external_name"),
796 vdur_list
.append(vdur
)
797 db_vnfr
["vdur"] = vnfr_update
["vdur"] = vdur_list
798 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
802 raise LcmException("ns_update_vnfr_2: Not found member_vnf_index={} at RO info".format(vnf_index
))
804 async def create_monitoring(self
, nsr_id
, vnf_member_index
, vnfd_desc
):
805 if not vnfd_desc
.get("scaling-group-descriptor"):
807 for scaling_group
in vnfd_desc
["scaling-group-descriptor"]:
808 scaling_policy_desc
= {}
811 "scaling_group_descriptor": {
812 "name": scaling_group
["name"],
813 "scaling_policy": scaling_policy_desc
816 for scaling_policy
in scaling_group
.get("scaling-policy"):
817 scaling_policy_desc
["scale_in_operation_type"] = scaling_policy_desc
["scale_out_operation_type"] = \
818 scaling_policy
["scaling-type"]
819 scaling_policy_desc
["threshold_time"] = scaling_policy
["threshold-time"]
820 scaling_policy_desc
["cooldown_time"] = scaling_policy
["cooldown-time"]
821 scaling_policy_desc
["scaling_criteria"] = []
822 for scaling_criteria
in scaling_policy
.get("scaling-criteria"):
823 scaling_criteria_desc
= {"scale_in_threshold": scaling_criteria
.get("scale-in-threshold"),
824 "scale_out_threshold": scaling_criteria
.get("scale-out-threshold"),
826 if not scaling_criteria
.get("vnf-monitoring-param-ref"):
828 for monitoring_param
in vnfd_desc
.get("monitoring-param", ()):
829 if monitoring_param
["id"] == scaling_criteria
["vnf-monitoring-param-ref"]:
830 scaling_criteria_desc
["monitoring_param"] = {
831 "id": monitoring_param
["id"],
832 "name": monitoring_param
["name"],
833 "aggregation_type": monitoring_param
.get("aggregation-type"),
834 "vdu_name": monitoring_param
.get("vdu-ref"),
835 "vnf_member_index": vnf_member_index
,
838 scaling_policy_desc
["scaling_criteria"].append(scaling_criteria_desc
)
842 "Task ns={} member_vnf_index={} Invalid vnfd vnf-monitoring-param-ref={} not in "
843 "monitoring-param list".format(nsr_id
, vnf_member_index
,
844 scaling_criteria
["vnf-monitoring-param-ref"]))
846 await self
.msg
.aiowrite("lcm_pm", "configure_scaling", scaling_desc
, self
.loop
)
848 async def ns_instantiate(self
, nsr_id
, nslcmop_id
):
849 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
850 self
.logger
.debug(logging_text
+ "Enter")
851 # get all needed from database
855 db_nslcmop_update
= {}
857 RO_descriptor_number
= 0 # number of descriptors created at RO
858 descriptor_id_2_RO
= {} # map between vnfd/nsd id to the id used at RO
861 step
= "Getting nslcmop={} from db".format(nslcmop_id
)
862 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
863 step
= "Getting nsr={} from db".format(nsr_id
)
864 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
866 nsr_name
= db_nsr
["name"] # TODO short-name??
868 vnfr_filter
= {"nsr-id-ref": nsr_id
, "member-vnf-index-ref": None}
869 for c_vnf
in nsd
["constituent-vnfd"]:
870 vnfd_id
= c_vnf
["vnfd-id-ref"]
871 vnfr_filter
["member-vnf-index-ref"] = c_vnf
["member-vnf-index"]
872 step
= "Getting vnfr={} of nsr={} from db".format(c_vnf
["member-vnf-index"], nsr_id
)
873 db_vnfrs
[c_vnf
["member-vnf-index"]] = self
.db
.get_one("vnfrs", vnfr_filter
)
874 if vnfd_id
not in needed_vnfd
:
875 step
= "Getting vnfd={} from db".format(vnfd_id
)
876 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
878 nsr_lcm
= db_nsr
["_admin"].get("deployed")
880 nsr_lcm
= db_nsr
["_admin"]["deployed"] = {
882 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
886 db_nsr_update
["detailed-status"] = "creating"
887 db_nsr_update
["operational-status"] = "init"
889 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
891 # get vnfds, instantiate at RO
892 for vnfd_id
, vnfd
in needed_vnfd
.items():
893 step
= db_nsr_update
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
894 # self.logger.debug(logging_text + step)
895 vnfd_id_RO
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, vnfd_id
[:23])
896 descriptor_id_2_RO
[vnfd_id
] = vnfd_id_RO
897 RO_descriptor_number
+= 1
900 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
902 db_nsr_update
["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id
)] = vnfd_list
[0]["uuid"]
903 self
.logger
.debug(logging_text
+ "vnfd={} exists at RO. Using RO_id={}".format(
904 vnfd_id
, vnfd_list
[0]["uuid"]))
906 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
907 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
908 db_nsr_update
["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id
)] = desc
["uuid"]
909 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
910 self
.logger
.debug(logging_text
+ "vnfd={} created at RO. RO_id={}".format(
911 vnfd_id
, desc
["uuid"]))
912 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
916 step
= db_nsr_update
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
917 # self.logger.debug(logging_text + step)
919 RO_osm_nsd_id
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, nsd_id
[:23])
920 descriptor_id_2_RO
[nsd_id
] = RO_osm_nsd_id
921 RO_descriptor_number
+= 1
922 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": RO_osm_nsd_id
})
924 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= nsd_list
[0]["uuid"]
925 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
926 nsd_id
, RO_nsd_uuid
))
928 nsd_RO
= deepcopy(nsd
)
929 nsd_RO
["id"] = RO_osm_nsd_id
930 nsd_RO
.pop("_id", None)
931 nsd_RO
.pop("_admin", None)
932 for c_vnf
in nsd_RO
["constituent-vnfd"]:
933 vnfd_id
= c_vnf
["vnfd-id-ref"]
934 c_vnf
["vnfd-id-ref"] = descriptor_id_2_RO
[vnfd_id
]
935 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
936 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
937 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= desc
["uuid"]
938 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_id
, RO_nsd_uuid
))
939 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
942 # if present use it unless in error status
943 RO_nsr_id
= db_nsr
["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
946 step
= db_nsr_update
["detailed-status"] = "Looking for existing ns at RO"
947 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
948 desc
= await RO
.show("ns", RO_nsr_id
)
949 except ROclient
.ROClientException
as e
:
950 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
952 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
954 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
955 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
956 if ns_status
== "ERROR":
957 step
= db_nsr_update
["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
958 self
.logger
.debug(logging_text
+ step
)
959 await RO
.delete("ns", RO_nsr_id
)
960 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
962 step
= db_nsr_update
["detailed-status"] = "Creating ns at RO"
963 # self.logger.debug(logging_text + step)
964 RO_ns_params
= self
.ns_params_2_RO(db_nsr
.get("instantiate_params"))
965 desc
= await RO
.create("ns", descriptor
=RO_ns_params
,
967 scenario
=RO_nsd_uuid
)
968 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = desc
["uuid"]
969 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
970 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "BUILD"
971 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
972 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
974 # update VNFR vimAccount
975 step
= "Updating VNFR vimAcccount"
976 for vnf_index
, vnfr
in db_vnfrs
.items():
977 if vnfr
.get("vim-account-id"):
979 vnfr_update
= {"vim-account-id": db_nsr
["instantiate_params"]["vimAccountId"]}
980 if db_nsr
["instantiate_params"].get("vnf"):
981 for vnf_params
in db_nsr
["instantiate_params"]["vnf"]:
982 if vnf_params
.get("member-vnf-index") == vnf_index
:
983 if vnf_params
.get("vimAccountId"):
984 vnfr_update
["vim-account-id"] = vnf_params
.get("vimAccountId")
986 self
.update_db_2("vnfrs", vnfr
["_id"], vnfr_update
)
988 # wait until NS is ready
989 step
= ns_status_detailed
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
990 detailed_status_old
= None
991 self
.logger
.debug(logging_text
+ step
)
993 deployment_timeout
= 2 * 3600 # Two hours
994 while deployment_timeout
> 0:
995 desc
= await RO
.show("ns", RO_nsr_id
)
996 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
997 db_nsr_update
["admin.deployed.RO.nsr_status"] = ns_status
998 if ns_status
== "ERROR":
999 raise ROclient
.ROClientException(ns_status_info
)
1000 elif ns_status
== "BUILD":
1001 detailed_status
= ns_status_detailed
+ "; {}".format(ns_status_info
)
1002 elif ns_status
== "ACTIVE":
1003 step
= detailed_status
= "Waiting for management IP address reported by the VIM"
1005 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_info(desc
)
1007 except ROclient
.ROClientException
as e
:
1008 if e
.http_code
!= 409: # IP address is not ready return code is 409 CONFLICT
1011 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1012 if detailed_status
!= detailed_status_old
:
1013 detailed_status_old
= db_nsr_update
["detailed-status"] = detailed_status
1014 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1015 await asyncio
.sleep(5, loop
=self
.loop
)
1016 deployment_timeout
-= 5
1017 if deployment_timeout
<= 0:
1018 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1020 step
= "Updating VNFRs"
1021 # self.ns_update_vnfr(db_vnfrs, ns_RO_info)
1022 self
.ns_update_vnfr_2(db_vnfrs
, desc
)
1024 db_nsr
["detailed-status"] = "Configuring vnfr"
1025 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1027 # The parameters we'll need to deploy a charm
1028 number_to_configure
= 0
1031 """An inner function to deploy the charm from either vnf or vdu
1035 # if number_to_configure == 0:
1036 # self.logger.debug("Logging into N2VC...")
1037 # task = asyncio.ensure_future(self.n2vc.login())
1038 # yield from asyncio.wait_for(task, 30.0)
1039 # self.logger.debug("Logged into N2VC!")
1041 # # await self.n2vc.login()
1043 # Note: The charm needs to exist on disk at the location
1044 # specified by charm_path.
1045 base_folder
= vnfd
["_admin"]["storage"]
1046 storage_params
= self
.fs
.get_params()
1047 charm_path
= "{}{}/{}/charms/{}".format(
1048 storage_params
["path"],
1049 base_folder
["folder"],
1050 base_folder
["pkg-dir"],
1054 # Setup the runtime parameters for this VNF
1055 params
['rw_mgmt_ip'] = db_vnfrs
[vnf_index
]["ip-address"]
1057 # ns_name will be ignored in the current version of N2VC
1058 # but will be implemented for the next point release.
1059 model_name
= 'default'
1060 application_name
= self
.n2vc
.FormatApplicationName(
1065 if not nsr_lcm
.get("VCA"):
1067 nsr_lcm
["VCA"][vnf_index
] = db_nsr_update
["_admin.deployed.VCA.{}".format(vnf_index
)] = {
1068 "model": model_name
,
1069 "application": application_name
,
1070 "operational-status": "init",
1071 "detailed-status": "",
1074 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1076 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
,
1078 task
= asyncio
.ensure_future(
1079 self
.n2vc
.DeployCharms(
1080 model_name
, # The network service name
1081 application_name
, # The application name
1082 vnfd
, # The vnf descriptor
1083 charm_path
, # Path to charm
1084 params
, # Runtime params, like mgmt ip
1085 {}, # for native charms only
1086 self
.n2vc_callback
, # Callback for status changes
1087 db_nsr
, # Callback parameter
1089 vnf_index
, # Callback parameter
1090 None, # Callback parameter (task)
1093 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
, None, None,
1094 db_nsr
, db_nslcmop
, vnf_index
))
1095 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["create_charm:" + vnf_index
] = task
1097 step
= "Looking for needed vnfd to configure"
1098 self
.logger
.debug(logging_text
+ step
)
1100 for c_vnf
in nsd
["constituent-vnfd"]:
1101 vnfd_id
= c_vnf
["vnfd-id-ref"]
1102 vnf_index
= str(c_vnf
["member-vnf-index"])
1103 vnfd
= needed_vnfd
[vnfd_id
]
1105 # Check if this VNF has a charm configuration
1106 vnf_config
= vnfd
.get("vnf-configuration")
1108 if vnf_config
and vnf_config
.get("juju"):
1109 proxy_charm
= vnf_config
["juju"]["charm"]
1113 if 'initial-config-primitive' in vnf_config
:
1114 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
1116 # Login to the VCA. If there are multiple calls to login(),
1117 # subsequent calls will be a nop and return immediately.
1118 step
= "connecting to N2VC to configure vnf {}".format(vnf_index
)
1119 await self
.n2vc
.login()
1121 number_to_configure
+= 1
1123 # Deploy charms for each VDU that supports one.
1124 for vdu
in vnfd
['vdu']:
1125 vdu_config
= vdu
.get('vdu-configuration')
1129 if vdu_config
and vdu_config
.get("juju"):
1130 proxy_charm
= vdu_config
["juju"]["charm"]
1132 if 'initial-config-primitive' in vdu_config
:
1133 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
1136 step
= "connecting to N2VC to configure vdu {} from vnf {}".format(vdu
["id"], vnf_index
)
1137 await self
.n2vc
.login()
1139 number_to_configure
+= 1
1141 if number_to_configure
:
1142 db_nsr_update
["config-status"] = "configuring"
1143 db_nsr_update
["operational-status"] = "running"
1144 db_nsr_update
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
1145 db_nslcmop_update
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
1147 db_nslcmop_update
["operationState"] = "COMPLETED"
1148 db_nslcmop_update
["statusEnteredTime"] = time()
1149 db_nslcmop_update
["detailed-status"] = "done"
1150 db_nsr_update
["config-status"] = "configured"
1151 db_nsr_update
["detailed-status"] = "done"
1152 db_nsr_update
["operational-status"] = "running"
1153 step
= "Sending monitoring parameters to PM"
1154 # for c_vnf in nsd["constituent-vnfd"]:
1155 # await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]])
1157 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
})
1158 except Exception as e
:
1159 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
1161 self
.logger
.debug(logging_text
+ "Exit")
1164 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
1165 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(step
, e
))
1167 except asyncio
.CancelledError
:
1168 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
1169 exc
= "Operation was cancelled"
1170 except Exception as e
:
1171 exc
= traceback
.format_exc()
1172 self
.logger
.critical(logging_text
+ "Exit Exception {} while '{}': {}".format(type(e
).__name
__, step
, e
),
1177 db_nsr_update
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1178 db_nsr_update
["operational-status"] = "failed"
1180 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
1181 db_nslcmop_update
["operationState"] = "FAILED"
1182 db_nslcmop_update
["statusEnteredTime"] = time()
1184 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1185 if db_nslcmop_update
:
1186 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1188 async def ns_terminate(self
, nsr_id
, nslcmop_id
):
1189 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
1190 self
.logger
.debug(logging_text
+ "Enter")
1194 failed_detail
= [] # annotates all failed error messages
1198 db_nslcmop_update
= {}
1200 step
= "Getting nslcmop={} from db".format(nslcmop_id
)
1201 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1202 step
= "Getting nsr={} from db".format(nsr_id
)
1203 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1204 # nsd = db_nsr["nsd"]
1205 nsr_lcm
= deepcopy(db_nsr
["_admin"].get("deployed"))
1206 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
1209 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1210 # #TODO check if VIM is creating and wait
1211 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1213 db_nsr_update
["operational-status"] = "terminating"
1214 db_nsr_update
["config-status"] = "terminating"
1216 if nsr_lcm
and nsr_lcm
.get("VCA"):
1218 step
= "Scheduling configuration charms removing"
1219 db_nsr_update
["detailed-status"] = "Deleting charms"
1220 self
.logger
.debug(logging_text
+ step
)
1221 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1222 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
1223 if deploy_info
and deploy_info
.get("application"):
1224 task
= asyncio
.ensure_future(
1225 self
.n2vc
.RemoveCharms(
1226 deploy_info
['model'],
1227 deploy_info
['application'],
1228 # self.n2vc_callback,
1234 vca_task_list
.append(task
)
1235 vca_task_dict
[vnf_index
] = task
1236 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1237 # deploy_info['application'], None, db_nsr,
1238 # db_nslcmop, vnf_index))
1239 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["delete_charm:" + vnf_index
] = task
1240 except Exception as e
:
1241 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
1245 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1247 if nsr_lcm
and nsr_lcm
.get("RO") and nsr_lcm
["RO"].get("nsr_id"):
1248 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
1250 step
= db_nsr_update
["detailed-status"] = db_nslcmop_update
["detailed-status"] = "Deleting ns at RO"
1251 self
.logger
.debug(logging_text
+ step
)
1252 await RO
.delete("ns", RO_nsr_id
)
1253 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1254 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1255 except ROclient
.ROClientException
as e
:
1256 if e
.http_code
== 404: # not found
1257 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1258 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1259 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
1260 elif e
.http_code
== 409: # conflict
1261 failed_detail
.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
1262 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1265 failed_detail
.append("RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
1266 self
.logger
.error(logging_text
+ failed_detail
[-1])
1270 if not RO_fail
and nsr_lcm
and nsr_lcm
.get("RO") and nsr_lcm
["RO"].get("nsd_id"):
1271 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
1273 step
= db_nsr_update
["detailed-status"] = db_nslcmop_update
["detailed-status"] =\
1274 "Deleting nsd at RO"
1275 await RO
.delete("nsd", RO_nsd_id
)
1276 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
1277 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
1278 except ROclient
.ROClientException
as e
:
1279 if e
.http_code
== 404: # not found
1280 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
1281 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
1282 elif e
.http_code
== 409: # conflict
1283 failed_detail
.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
1284 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1287 failed_detail
.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
1288 self
.logger
.error(logging_text
+ failed_detail
[-1])
1291 if not RO_fail
and nsr_lcm
and nsr_lcm
.get("RO") and nsr_lcm
["RO"].get("vnfd_id"):
1292 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
1296 step
= db_nsr_update
["detailed-status"] = db_nslcmop_update
["detailed-status"] =\
1297 "Deleting vnfd={} at RO".format(vnf_id
)
1298 await RO
.delete("vnfd", RO_vnfd_id
)
1299 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
1300 db_nsr_update
["_admin.deployed.RO.vnfd_id.{}".format(vnf_id
)] = None
1301 except ROclient
.ROClientException
as e
:
1302 if e
.http_code
== 404: # not found
1303 db_nsr_update
["_admin.deployed.RO.vnfd_id.{}".format(vnf_id
)] = None
1304 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
1305 elif e
.http_code
== 409: # conflict
1306 failed_detail
.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
1307 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1309 failed_detail
.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
1310 self
.logger
.error(logging_text
+ failed_detail
[-1])
1313 db_nsr_update
["detailed-status"] = db_nslcmop_update
["detailed-status"] =\
1314 "Waiting for deletion of configuration charms"
1315 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1316 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1317 await asyncio
.wait(vca_task_list
, timeout
=300)
1318 for vnf_index
, task
in vca_task_dict
.items():
1319 if task
.cancelled():
1320 failed_detail
.append("VCA[{}] Deletion has been cancelled".format(vnf_index
))
1322 exc
= task
.exception()
1324 failed_detail
.append("VCA[{}] Deletion exception: {}".format(vnf_index
, exc
))
1326 db_nsr_update
["_admin.deployed.VCA.{}".format(vnf_index
)] = None
1328 # TODO Should it be cancelled?!!
1330 failed_detail
.append("VCA[{}] Deletion timeout".format(vnf_index
))
1333 self
.logger
.error(logging_text
+ " ;".join(failed_detail
))
1334 db_nsr_update
["operational-status"] = "failed"
1335 db_nsr_update
["detailed-status"] = "Deletion errors " + "; ".join(failed_detail
)
1336 db_nslcmop_update
["detailed-status"] = "; ".join(failed_detail
)
1337 db_nslcmop_update
["operationState"] = "FAILED"
1338 db_nslcmop_update
["statusEnteredTime"] = time()
1339 elif db_nslcmop
["operationParams"].get("autoremove"):
1340 self
.db
.del_one("nsrs", {"_id": nsr_id
})
1341 db_nsr_update
.clear()
1342 self
.db
.del_list("nslcmops", {"nsInstanceId": nsr_id
})
1343 db_nslcmop_update
.clear()
1344 self
.db
.del_list("vnfrs", {"nsr-id-ref": nsr_id
})
1345 self
.logger
.debug(logging_text
+ "Delete from database")
1347 db_nsr_update
["operational-status"] = "terminated"
1348 db_nsr_update
["detailed-status"] = "Done"
1349 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
1350 db_nslcmop_update
["detailed-status"] = "Done"
1351 db_nslcmop_update
["operationState"] = "COMPLETED"
1352 db_nslcmop_update
["statusEnteredTime"] = time()
1354 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
})
1355 except Exception as e
:
1356 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
1357 self
.logger
.debug(logging_text
+ "Exit")
1359 except (ROclient
.ROClientException
, DbException
) as e
:
1360 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1362 except asyncio
.CancelledError
:
1363 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
1364 exc
= "Operation was cancelled"
1365 except Exception as e
:
1366 exc
= traceback
.format_exc()
1367 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
1369 if exc
and db_nslcmop
:
1370 db_nslcmop_update
= {
1371 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1372 "operationState": "FAILED",
1373 "statusEnteredTime": time(),
1375 if db_nslcmop_update
:
1376 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1378 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1380 async def _ns_execute_primitive(self
, db_deployed
, member_vnf_index
, primitive
, primitive_params
):
1381 vca_deployed
= db_deployed
["VCA"].get(member_vnf_index
)
1382 if not vca_deployed
:
1383 raise LcmException("charm for member_vnf_index={} is not deployed".format(member_vnf_index
))
1384 model_name
= vca_deployed
.get("model")
1385 application_name
= vca_deployed
.get("application")
1386 if not model_name
or not application_name
:
1387 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index
))
1388 if vca_deployed
["operational-status"] != "active":
1389 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1390 member_vnf_index
, vca_deployed
["operational-status"]))
1391 callback
= None # self.n2vc_callback
1392 callback_args
= () # [db_nsr, db_nslcmop, member_vnf_index, None]
1393 await self
.n2vc
.login()
1394 task
= asyncio
.ensure_future(
1395 self
.n2vc
.ExecutePrimitive(
1398 primitive
, callback
,
1403 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1404 # db_nsr, db_nslcmop, member_vnf_index))
1405 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1406 # wait until completed with timeout
1407 await asyncio
.wait((task
,), timeout
=600)
1409 result
= "FAILED" # by default
1411 if task
.cancelled():
1412 result_detail
= "Task has been cancelled"
1414 exc
= task
.exception()
1416 result_detail
= str(exc
)
1418 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1419 result
= "COMPLETED"
1420 result_detail
= "Done"
1422 # TODO Should it be cancelled?!!
1424 result_detail
= "timeout"
1425 return result
, result_detail
1427 async def ns_action(self
, nsr_id
, nslcmop_id
):
1428 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
1429 self
.logger
.debug(logging_text
+ "Enter")
1430 # get all needed from database
1433 db_nslcmop_update
= None
1436 step
= "Getting information from database"
1437 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1438 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1439 nsr_lcm
= db_nsr
["_admin"].get("deployed")
1440 vnf_index
= db_nslcmop
["operationParams"]["member_vnf_index"]
1442 # TODO check if ns is in a proper status
1443 primitive
= db_nslcmop
["operationParams"]["primitive"]
1444 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
1445 result
, result_detail
= await self
._ns
_execute
_primitive
(nsr_lcm
, vnf_index
, primitive
, primitive_params
)
1446 db_nslcmop_update
= {
1447 "detailed-status": result_detail
,
1448 "operationState": result
,
1449 "statusEnteredTime": time()
1451 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(result
, result_detail
))
1452 return # database update is called inside finally
1454 except (DbException
, LcmException
) as e
:
1455 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1457 except asyncio
.CancelledError
:
1458 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
1459 exc
= "Operation was cancelled"
1460 except Exception as e
:
1461 exc
= traceback
.format_exc()
1462 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
1464 if exc
and db_nslcmop
:
1465 db_nslcmop_update
= {
1466 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1467 "operationState": "FAILED",
1468 "statusEnteredTime": time(),
1470 if db_nslcmop_update
:
1471 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1473 async def ns_scale(self
, nsr_id
, nslcmop_id
):
1474 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
1475 self
.logger
.debug(logging_text
+ "Enter")
1476 # get all needed from database
1479 db_nslcmop_update
= {}
1483 step
= "Getting nslcmop from database"
1484 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1485 step
= "Getting nsr from database"
1486 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1487 step
= "Parsing scaling parameters"
1488 nsr_lcm
= db_nsr
["_admin"].get("deployed")
1489 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
1490 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1491 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1492 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
1493 scaling_policy
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1495 step
= "Getting vnfr from database"
1496 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
1497 step
= "Getting vnfd from database"
1498 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
1499 step
= "Getting scaling-group-descriptor"
1500 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
1501 if scaling_descriptor
["name"] == scaling_group
:
1504 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1505 "at vnfd:scaling-group-descriptor".format(scaling_group
))
1507 for scaling_policy_descriptor
in scaling_descriptor
.get("scaling-policy", ()):
1508 cooldown_time
= scaling_policy_descriptor
.get("cooldown-time", 0)
1509 if scaling_policy
and scaling_policy
== scaling_policy_descriptor
.get("name"):
1512 # TODO check if ns is in a proper status
1513 step
= "Sending scale order to RO"
1515 if not db_nsr
["_admin"].get("scaling-group"):
1516 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
1517 admin_scale_index
= 0
1519 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
1520 if admin_scale_info
["name"] == scaling_group
:
1521 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
1523 RO_scaling_info
= []
1524 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
1525 if scaling_type
== "SCALE_OUT":
1526 # count if max-instance-count is reached
1527 if "max-instance-count" in scaling_descriptor
and scaling_descriptor
["max-instance-count"] is not None:
1528 max_instance_count
= int(scaling_descriptor
["max-instance-count"])
1529 if nb_scale_op
>= max_instance_count
:
1530 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1531 " scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
1532 nb_scale_op
= nb_scale_op
+ 1
1533 vdu_scaling_info
["scaling_direction"] = "OUT"
1534 vdu_scaling_info
["vdu-create"] = {}
1535 for vdu_scale_info
in scaling_descriptor
["vdu"]:
1536 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
1537 "type": "create", "count": vdu_scale_info
.get("count", 1)})
1538 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
1539 elif scaling_type
== "SCALE_IN":
1540 # count if min-instance-count is reached
1541 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
1542 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
1543 if nb_scale_op
<= min_instance_count
:
1544 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1545 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
1546 nb_scale_op
= nb_scale_op
- 1
1547 vdu_scaling_info
["scaling_direction"] = "IN"
1548 vdu_scaling_info
["vdu-delete"] = {}
1549 for vdu_scale_info
in scaling_descriptor
["vdu"]:
1550 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
1551 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
1552 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
1554 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1555 if vdu_scaling_info
["scaling_direction"] == "IN":
1556 for vdur
in reversed(db_vnfr
["vdur"]):
1557 if vdu_scaling_info
["vdu-delete"].get(vdur
["vdu-id-ref"]):
1558 vdu_scaling_info
["vdu-delete"][vdur
["vdu-id-ref"]] -= 1
1559 vdu_scaling_info
["vdu"].append({
1560 "name": vdur
["name"],
1561 "vdu_id": vdur
["vdu-id-ref"],
1564 for interface
in vdur
["interfaces"]:
1565 vdu_scaling_info
["vdu"][-1]["interface"].append({
1566 "name": interface
["name"],
1567 "ip_address": interface
["ip-address"],
1568 "mac_address": interface
.get("mac-address"),
1570 del vdu_scaling_info
["vdu-delete"]
1572 # execute primitive service PRE-SCALING
1573 step
= "Executing pre-scale vnf-config-primitive"
1574 if scaling_descriptor
.get("scaling-config-action"):
1575 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
1576 if scaling_config_action
.get("trigger") and scaling_config_action
["trigger"] == "pre-scale-in" \
1577 and scaling_type
== "SCALE_IN":
1578 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
1579 step
= db_nslcmop_update
["detailed-status"] = \
1580 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
1581 # look for primitive
1582 primitive_params
= {}
1583 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
1584 if config_primitive
["name"] == vnf_config_primitive
:
1585 for parameter
in config_primitive
.get("parameter", ()):
1586 if 'default-value' in parameter
and \
1587 parameter
['default-value'] == "<VDU_SCALE_INFO>":
1588 primitive_params
[parameter
["name"]] = yaml
.safe_dump(vdu_scaling_info
,
1589 default_flow_style
=True,
1594 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1595 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1596 "primitive".format(scaling_group
, config_primitive
))
1597 result
, result_detail
= await self
._ns
_execute
_primitive
(nsr_lcm
, vnf_index
,
1598 vnf_config_primitive
, primitive_params
)
1599 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
1600 vnf_config_primitive
, result
, result_detail
))
1601 if result
== "FAILED":
1602 raise LcmException(result_detail
)
1605 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1606 RO_desc
= await RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
1607 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
1608 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
1609 # TODO mark db_nsr_update as scaling
1611 RO_nslcmop_id
= RO_desc
["instance_action_id"]
1612 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
1614 RO_task_done
= False
1615 step
= detailed_status
= "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id
)
1616 detailed_status_old
= None
1617 self
.logger
.debug(logging_text
+ step
)
1619 deployment_timeout
= 1 * 3600 # One hours
1620 while deployment_timeout
> 0:
1621 if not RO_task_done
:
1622 desc
= await RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
1623 extra_item_id
=RO_nslcmop_id
)
1624 ns_status
, ns_status_info
= RO
.check_action_status(desc
)
1625 if ns_status
== "ERROR":
1626 raise ROclient
.ROClientException(ns_status_info
)
1627 elif ns_status
== "BUILD":
1628 detailed_status
= step
+ "; {}".format(ns_status_info
)
1629 elif ns_status
== "ACTIVE":
1631 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
1632 self
.logger
.debug(logging_text
+ step
)
1634 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
1636 desc
= await RO
.show("ns", RO_nsr_id
)
1637 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
1638 if ns_status
== "ERROR":
1639 raise ROclient
.ROClientException(ns_status_info
)
1640 elif ns_status
== "BUILD":
1641 detailed_status
= step
+ "; {}".format(ns_status_info
)
1642 elif ns_status
== "ACTIVE":
1643 step
= detailed_status
= "Waiting for management IP address reported by the VIM"
1645 desc
= await RO
.show("ns", RO_nsr_id
)
1646 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_info(desc
)
1648 except ROclient
.ROClientException
as e
:
1649 if e
.http_code
!= 409: # IP address is not ready return code is 409 CONFLICT
1652 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1653 if detailed_status
!= detailed_status_old
:
1654 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
1655 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1657 await asyncio
.sleep(5, loop
=self
.loop
)
1658 deployment_timeout
-= 5
1659 if deployment_timeout
<= 0:
1660 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1662 step
= "Updating VNFRs"
1663 # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info)
1664 self
.ns_update_vnfr_2({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
1666 # update VDU_SCALING_INFO with the obtained ip_addresses
1667 if vdu_scaling_info
["scaling_direction"] == "OUT":
1668 for vdur
in reversed(db_vnfr
["vdur"]):
1669 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
1670 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
1671 vdu_scaling_info
["vdu"].append({
1672 "name": vdur
["name"],
1673 "vdu_id": vdur
["vdu-id-ref"],
1676 for interface
in vdur
["interfaces"]:
1677 vdu_scaling_info
["vdu"][-1]["interface"].append({
1678 "name": interface
["name"],
1679 "ip_address": interface
["ip-address"],
1680 "mac_address": interface
.get("mac-address"),
1682 del vdu_scaling_info
["vdu-create"]
1685 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1687 # execute primitive service POST-SCALING
1688 step
= "Executing post-scale vnf-config-primitive"
1689 if scaling_descriptor
.get("scaling-config-action"):
1690 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
1691 if scaling_config_action
.get("trigger") and scaling_config_action
["trigger"] == "post-scale-out" \
1692 and scaling_type
== "SCALE_OUT":
1693 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
1694 step
= db_nslcmop_update
["detailed-status"] = \
1695 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
1696 # look for primitive
1697 primitive_params
= {}
1698 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
1699 if config_primitive
["name"] == vnf_config_primitive
:
1700 for parameter
in config_primitive
.get("parameter", ()):
1701 if 'default-value' in parameter
and \
1702 parameter
['default-value'] == "<VDU_SCALE_INFO>":
1703 primitive_params
[parameter
["name"]] = yaml
.safe_dump(vdu_scaling_info
,
1704 default_flow_style
=True,
1708 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1709 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1710 "match any vnf-cnfiguration:config-primitive".format(scaling_group
,
1712 result
, result_detail
= await self
._ns
_execute
_primitive
(nsr_lcm
, vnf_index
,
1713 vnf_config_primitive
, primitive_params
)
1714 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
1715 vnf_config_primitive
, result
, result_detail
))
1716 if result
== "FAILED":
1717 raise LcmException(result_detail
)
1719 db_nslcmop_update
["operationState"] = "COMPLETED"
1720 db_nslcmop_update
["statusEnteredTime"] = time()
1721 db_nslcmop_update
["detailed-status"] = "done"
1722 db_nsr_update
["detailed-status"] = "done"
1724 await self
.msg
.aiowrite("ns", "scaled", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
})
1726 await asyncio
.sleep(cooldown_time
)
1727 await self
.msg
.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
})
1728 except Exception as e
:
1729 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
1730 self
.logger
.debug(logging_text
+ "Exit Ok")
1732 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
1733 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1735 except asyncio
.CancelledError
:
1736 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
1737 exc
= "Operation was cancelled"
1738 except Exception as e
:
1739 exc
= traceback
.format_exc()
1740 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
1743 db_nsr_update
= None
1745 db_nslcmop_update
= {
1746 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1747 "operationState": "FAILED",
1748 "statusEnteredTime": time(),
1750 if db_nslcmop_update
:
1751 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1753 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1755 async def test(self
, param
=None):
1756 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
1758 def cancel_tasks(self
, topic
, _id
):
1760 Cancel all active tasks of a concrete nsr or vim identified for _id
1761 :param topic: can be ns or vim_account
1762 :param _id: nsr or vim identity
1763 :return: None, or raises an exception if not possible
1766 lcm_tasks
= self
.lcm_ns_tasks
1767 elif topic
== "vim_account":
1768 lcm_tasks
= self
.lcm_vim_tasks
1769 elif topic
== "sdn":
1770 lcm_tasks
= self
.lcm_sdn_tasks
1772 if not lcm_tasks
.get(_id
):
1774 for order_id
, tasks_set
in lcm_tasks
[_id
].items():
1775 for task_name
, task
in tasks_set
.items():
1776 result
= task
.cancel()
1778 self
.logger
.debug("{} _id={} order_id={} task={} cancelled".format(topic
, _id
, order_id
, task_name
))
1781 async def kafka_ping(self
):
1782 self
.logger
.debug("Task kafka_ping Enter")
1783 consecutive_errors
= 0
1785 kafka_has_received
= False
1786 self
.pings_not_received
= 1
1789 await self
.msg
.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self
.loop
)
1790 # time between pings are low when it is not received and at starting
1791 wait_time
= 5 if not kafka_has_received
else 120
1792 if not self
.pings_not_received
:
1793 kafka_has_received
= True
1794 self
.pings_not_received
+= 1
1795 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1796 if self
.pings_not_received
> 10:
1797 raise LcmException("It is not receiving pings from Kafka bus")
1798 consecutive_errors
= 0
1800 except LcmException
:
1802 except Exception as e
:
1803 # if not first_start is the first time after starting. So leave more time and wait
1804 # to allow kafka starts
1805 if consecutive_errors
== 8 if not first_start
else 30:
1806 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1808 consecutive_errors
+= 1
1809 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1810 wait_time
= 1 if not first_start
else 5
1811 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1813 async def kafka_read(self
):
1814 self
.logger
.debug("Task kafka_read Enter")
1816 # future = asyncio.Future()
1817 consecutive_errors
= 0
1819 while consecutive_errors
< 10:
1821 topics
= ("admin", "ns", "vim_account", "sdn")
1822 topic
, command
, params
= await self
.msg
.aioread(topics
, self
.loop
)
1823 if topic
!= "admin" and command
!= "ping":
1824 self
.logger
.debug("Task kafka_read receives {} {}: {}".format(topic
, command
, params
))
1825 consecutive_errors
= 0
1828 if command
== "exit":
1831 elif command
.startswith("#"):
1833 elif command
== "echo":
1838 elif command
== "test":
1839 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
1842 if topic
== "admin":
1843 if command
== "ping" and params
["to"] == "lcm" and params
["from"] == "lcm":
1844 self
.pings_not_received
= 0
1847 if command
== "instantiate":
1848 # self.logger.debug("Deploying NS {}".format(nsr_id))
1850 nslcmop_id
= nslcmop
["_id"]
1851 nsr_id
= nslcmop
["nsInstanceId"]
1852 task
= asyncio
.ensure_future(self
.ns_instantiate(nsr_id
, nslcmop_id
))
1853 if nsr_id
not in self
.lcm_ns_tasks
:
1854 self
.lcm_ns_tasks
[nsr_id
] = {}
1855 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_instantiate": task
}
1857 elif command
== "terminate":
1858 # self.logger.debug("Deleting NS {}".format(nsr_id))
1860 nslcmop_id
= nslcmop
["_id"]
1861 nsr_id
= nslcmop
["nsInstanceId"]
1862 self
.cancel_tasks(topic
, nsr_id
)
1863 task
= asyncio
.ensure_future(self
.ns_terminate(nsr_id
, nslcmop_id
))
1864 if nsr_id
not in self
.lcm_ns_tasks
:
1865 self
.lcm_ns_tasks
[nsr_id
] = {}
1866 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_terminate": task
}
1868 elif command
== "action":
1869 # self.logger.debug("Update NS {}".format(nsr_id))
1871 nslcmop_id
= nslcmop
["_id"]
1872 nsr_id
= nslcmop
["nsInstanceId"]
1873 task
= asyncio
.ensure_future(self
.ns_action(nsr_id
, nslcmop_id
))
1874 if nsr_id
not in self
.lcm_ns_tasks
:
1875 self
.lcm_ns_tasks
[nsr_id
] = {}
1876 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_action": task
}
1878 elif command
== "scale":
1879 # self.logger.debug("Update NS {}".format(nsr_id))
1881 nslcmop_id
= nslcmop
["_id"]
1882 nsr_id
= nslcmop
["nsInstanceId"]
1883 task
= asyncio
.ensure_future(self
.ns_scale(nsr_id
, nslcmop_id
))
1884 if nsr_id
not in self
.lcm_ns_tasks
:
1885 self
.lcm_ns_tasks
[nsr_id
] = {}
1886 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_scale": task
}
1888 elif command
== "show":
1890 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1891 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
1892 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
1893 "".format(nsr_id
, db_nsr
["operational-status"], db_nsr
["config-status"],
1894 db_nsr
["detailed-status"],
1895 db_nsr
["_admin"]["deployed"], self
.lcm_ns_tasks
.get(nsr_id
)))
1896 except Exception as e
:
1897 print("nsr {} not found: {}".format(nsr_id
, e
))
1900 elif command
== "deleted":
1901 continue # TODO cleaning of task just in case should be done
1902 elif topic
== "vim_account":
1903 vim_id
= params
["_id"]
1904 if command
== "create":
1905 task
= asyncio
.ensure_future(self
.vim_create(params
, order_id
))
1906 if vim_id
not in self
.lcm_vim_tasks
:
1907 self
.lcm_vim_tasks
[vim_id
] = {}
1908 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_create": task
}
1910 elif command
== "delete":
1911 self
.cancel_tasks(topic
, vim_id
)
1912 task
= asyncio
.ensure_future(self
.vim_delete(vim_id
, order_id
))
1913 if vim_id
not in self
.lcm_vim_tasks
:
1914 self
.lcm_vim_tasks
[vim_id
] = {}
1915 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_delete": task
}
1917 elif command
== "show":
1918 print("not implemented show with vim_account")
1921 elif command
== "edit":
1922 task
= asyncio
.ensure_future(self
.vim_edit(params
, order_id
))
1923 if vim_id
not in self
.lcm_vim_tasks
:
1924 self
.lcm_vim_tasks
[vim_id
] = {}
1925 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_edit": task
}
1927 elif topic
== "sdn":
1928 _sdn_id
= params
["_id"]
1929 if command
== "create":
1930 task
= asyncio
.ensure_future(self
.sdn_create(params
, order_id
))
1931 if _sdn_id
not in self
.lcm_sdn_tasks
:
1932 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1933 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_create": task
}
1935 elif command
== "delete":
1936 self
.cancel_tasks(topic
, _sdn_id
)
1937 task
= asyncio
.ensure_future(self
.sdn_delete(_sdn_id
, order_id
))
1938 if _sdn_id
not in self
.lcm_sdn_tasks
:
1939 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1940 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_delete": task
}
1942 elif command
== "edit":
1943 task
= asyncio
.ensure_future(self
.sdn_edit(params
, order_id
))
1944 if _sdn_id
not in self
.lcm_sdn_tasks
:
1945 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1946 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_edit": task
}
1948 self
.logger
.critical("unknown topic {} and command '{}'".format(topic
, command
))
1949 except Exception as e
:
1950 # if not first_start is the first time after starting. So leave more time and wait
1951 # to allow kafka starts
1952 if consecutive_errors
== 8 if not first_start
else 30:
1953 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1955 consecutive_errors
+= 1
1956 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1957 wait_time
= 2 if not first_start
else 5
1958 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1960 # self.logger.debug("Task kafka_read terminating")
1961 self
.logger
.debug("Task kafka_read exit")
1964 self
.loop
= asyncio
.get_event_loop()
1967 self
.loop
.run_until_complete(self
.check_RO_version())
1969 self
.loop
.run_until_complete(asyncio
.gather(
1974 # self.logger.debug("Terminating cancelling creation tasks")
1975 # self.cancel_tasks("ALL", "create")
1977 # while self.is_pending_tasks():
1978 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1979 # await asyncio.sleep(2, loop=self.loop)
1982 # self.cancel_tasks("ALL", "ALL")
1986 self
.db
.db_disconnect()
1988 self
.msg
.disconnect()
1990 self
.fs
.fs_disconnect()
1992 def read_config_file(self
, config_file
):
1993 # TODO make a [ini] + yaml inside parser
1994 # the configparser library is not suitable, because it does not admit comments at the end of line,
1995 # and not parse integer or boolean
1997 with
open(config_file
) as f
:
1999 for k
, v
in environ
.items():
2000 if not k
.startswith("OSMLCM_"):
2002 k_items
= k
.lower().split("_")
2005 for k_item
in k_items
[1:-1]:
2006 if k_item
in ("ro", "vca"):
2007 # put in capital letter
2008 k_item
= k_item
.upper()
2010 if k_items
[-1] == "port":
2011 c
[k_items
[-1]] = int(v
)
2014 except Exception as e
:
2015 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
2018 except Exception as e
:
2019 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
2024 print("""Usage: {} [options]
2025 -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
2026 -h|--help: shows this help
2027 """.format(sys
.argv
[0]))
2028 # --log-socket-host HOST: send logs to this host")
2029 # --log-socket-port PORT: send logs using this port (default: 9022)")
2032 if __name__
== '__main__':
2034 # load parameters and configuration
2035 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hc:", ["config=", "help"])
2036 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
2039 if o
in ("-h", "--help"):
2042 elif o
in ("-c", "--config"):
2044 # elif o == "--log-socket-port":
2045 # log_socket_port = a
2046 # elif o == "--log-socket-host":
2047 # log_socket_host = a
2048 # elif o == "--log-file":
2051 assert False, "Unhandled option"
2053 if not path
.isfile(config_file
):
2054 print("configuration file '{}' that not exist".format(config_file
), file=sys
.stderr
)
2057 for config_file
in (__file__
[:__file__
.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
2058 if path
.isfile(config_file
):
2061 print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys
.stderr
)
2063 lcm
= Lcm(config_file
)
2065 except (LcmException
, getopt
.GetoptError
) as e
:
2066 print(str(e
), file=sys
.stderr
)