else:
return "BUILD", "VMs: {}/{}, networks: {}/{}".format(vm_done, vm_total, net_done, net_total)
+ @staticmethod
+ def check_action_status(action_descriptor):
+ """
+ Inspect RO instance descriptor and indicates the status
+ :param action_descriptor: action instance descriptor obtained with self.show("ns", "action")
+ :return: status, message: status can be BUILD,ACTIVE,ERROR, message is a text message
+ """
+ net_total = 0
+ vm_total = 0
+ net_done = 0
+ vm_done = 0
+ other_total = 0
+ other_done = 0
+
+ for vim_action_set in action_descriptor["actions"]:
+ for vim_action in vim_action_set["vim_actions"]:
+ if vim_action["item"] == "instance_vms":
+ vm_total += 1
+ elif vim_action["item"] == "instance_nets":
+ net_total += 1
+ else:
+ other_total += 1
+ if vim_action["status"] == "FAILED":
+ return "ERROR", vim_action["error_msg"]
+ elif vim_action["status"] in ("DONE", "SUPERSEDED"):
+ if vim_action["item"] == "instance_vms":
+ vm_done += 1
+ elif vim_action["item"] == "instance_nets":
+ net_done += 1
+ else:
+ other_done += 1
+
+ if net_total == net_done and vm_total == vm_done and other_total == other_done:
+ return "ACTIVE", "VMs {}, networks: {}, other: {} ".format(vm_total, net_total, other_total)
+ else:
+ return "BUILD", "VMs: {}/{}, networks: {}/{}, other: {}/{}".format(vm_done, vm_total, net_done, net_total,
+ other_done, other_total)
+
@staticmethod
def get_ns_vnf_info(ns_descriptor):
"""
Get a dict with the VIM_id, ip_addresses, mac_addresses of every vnf and vdu
:param ns_descriptor: instance descriptor obtained with self.show("ns", )
- :return: dict with {<member_vnf_index>: {ip_address: XXXX, vdur:{ip_address: XXX, vim_id: XXXX}}}
+ :return: dict with:
+ <member_vnf_index>:
+ ip_address: XXXX,
+ vdur:
+ <vdu_osm_id>:
+ ip_address: XXX
+ vim_id: XXXX
+ interfaces:
+ <name>:
+ ip_address: XXX
+ mac_address: XXX
"""
ns_info = {}
for vnf in ns_descriptor["vnfs"]:
for vm in vnf["vms"]:
vdur = {
"vim_id": vm.get("vim_vm_id"),
- "ip_address": vm.get("ip_address")
+ "ip_address": vm.get("ip_address"),
+ "interfaces": {}
}
for iface in vm["interfaces"]:
if iface.get("type") == "mgmt" and not iface.get("ip_address"):
raise ROClientException("ns member_vnf_index '{}' vm '{}' management interface '{}' has no IP "
"address".format(vnf["member_vnf_index"], vm["vdu_osm_id"],
iface["external_name"]), http_code=409)
+ vdur["interfaces"][iface["internal_name"]] = {"ip_address": iface.get("ip_address"),
+ "mac_address": iface.get("mac_address"),
+ "vim_id": iface.get("vim_interface_id"),
+ }
vnfr_info["vdur"][vm["vdu_osm_id"]] = vdur
ns_info[str(vnf["member_vnf_index"])] = vnfr_info
return ns_info
raise ROClientException("No {} found with name '{}'".format(item[:-1], item_id_name), http_code=404)
return uuid
- async def _get_item(self, session, item, item_id_name, all_tenants=False):
+ async def _get_item(self, session, item, item_id_name, extra_item=None, extra_item_id=None, all_tenants=False):
if all_tenants:
tenant_text = "/any"
elif all_tenants is None:
uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants)
url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid)
+ if extra_item:
+ url += "/" + extra_item
+ if extra_item_id:
+ url += "/" + extra_item_id
self.logger.debug("GET %s", url)
with aiohttp.Timeout(self.timeout_short):
async with session.get(url, headers=self.headers_req) as response:
if not action:
action = ""
else:
- action = "/".format(action)
+ action = "/{}".format(action)
url = "{}{apiver}{tenant}/{item}{id}{action}".format(self.endpoint_url, apiver=api_version_text,
tenant=tenant_text, item=item, id=uuid, action=action)
raise ROClientException(response_text, http_code=response.status)
return self._parse_yaml(response_text, response=True)
+ async def get_version(self):
+ """
+ Obtain RO server version.
+ :return: a list with integers ["major", "minor", "release"]. Raises ROClientException on Error,
+ """
+ try:
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ url = "{}/version".format(self.endpoint_url)
+ self.logger.debug("RO GET %s", url)
+ with aiohttp.Timeout(self.timeout_short):
+ async with session.get(url, headers=self.headers_req) as response:
+ response_text = await response.read()
+ self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100]))
+ if response.status >= 300:
+ raise ROClientException(response_text, http_code=response.status)
+ for word in str(response_text).split(" "):
+ if "." in word:
+ version_text, _, _ = word.partition("-")
+ return list(map(int, version_text.split(".")))
+ raise ROClientException("Got invalid version text: '{}'".format(response_text), http_code=500)
+ except aiohttp.errors.ClientOSError as e:
+ raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
+ except Exception as e:
+ raise ROClientException("Got invalid version text: '{}'; causing exception {}".format(response_text, e),
+ http_code=500)
+
async def get_list(self, item, all_tenants=False, filter_by=None):
"""
Obtain a list of items filtering by the specigy filter_by.
return content
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
- async def show(self, item, item_id_name=None, all_tenants=False):
+ async def show(self, item, item_id_name=None, extra_item=None, extra_item_id=None, all_tenants=False):
"""
Obtain the information of an item from its id or name
:param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns'
:param item_id_name: RO id or name of the item. Raise and exception if more than one found
+ :param extra_item: if supplied, it is used to add to the URL.
+ Can be 'action' if item='ns'; 'networks' or'images' if item='vim'
+ :param extra_item_id: if supplied, it is used get details of a concrete extra_item.
:param all_tenants: True if not filtering by tenant. Only allowed for admin
:return: dictionary with the information or raises ROClientException on Error, NotFound, found several
"""
all_tenants = False
with aiohttp.ClientSession(loop=self.loop) as session:
- content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants)
+ content = await self._get_item(session, self.client_to_RO[item], item_id_name, extra_item=extra_item,
+ extra_item_id=extra_item_id, all_tenants=all_tenants)
return remove_envelop(item, content)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
async def delete(self, item, item_id_name=None, all_tenants=False):
"""
return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
async def edit(self, item, item_id_name, descriptor=None, descriptor_format=None, **kwargs):
""" Edit an item
:param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns', 'vim'
+ :param item_id_name: RO id or name of the item. Raise and exception if more than one found
:param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided
:param descriptor_format: Can be 'json' or 'yaml'
:param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type
return remove_envelop(item, outdata)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
async def create(self, item, descriptor=None, descriptor_format=None, **kwargs):
"""
return remove_envelop(item, outdata)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
+
+ async def create_action(self, item, item_id_name, descriptor=None, descriptor_format=None, **kwargs):
+ """
+ Performs an action over an item
+ :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn'
+ :param item_id_name: RO id or name of the item. Raise and exception if more than one found
+ :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided
+ :param descriptor_format: Can be 'json' or 'yaml'
+ :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type
+ keys can be a dot separated list to specify elements inside dict
+ :return: dictionary with the information or raises ROClientException on Error
+ """
+ try:
+ if isinstance(descriptor, str):
+ descriptor = self._parse(descriptor, descriptor_format)
+ elif descriptor:
+ pass
+ else:
+ descriptor = {}
+
+ if item not in self.client_to_RO:
+ raise ROClientException("Invalid item {}".format(item))
+ desc = remove_envelop(item, descriptor)
+
+ # Override descriptor with kwargs
+ if kwargs:
+ desc = self.update_descriptor(desc, kwargs)
+
+ all_tenants = False
+ if item in ('tenant', 'vim'):
+ all_tenants = None
+
+ action = None
+ if item == "vims":
+ action = "sdn_mapping"
+ elif item in ("vim_account", "ns"):
+ action = "action"
+
+ # create_desc = self._create_envelop(item, desc)
+ create_desc = desc
+
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ _all_tenants = all_tenants
+ if item == 'vim':
+ _all_tenants = True
+ # item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name,
+ # all_tenants=_all_tenants)
+ outdata = await self._create_item(session, self.client_to_RO[item], create_desc,
+ item_id_name=item_id_name, # item_id_name=item_id
+ action=action, all_tenants=_all_tenants)
+ return remove_envelop(item, outdata)
+ except aiohttp.errors.ClientOSError as e:
+ raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
async def attach_datacenter(self, datacenter=None, descriptor=None, descriptor_format=None, **kwargs):
- if isinstance(descriptor, str):
- descriptor = self._parse(descriptor, descriptor_format)
- elif descriptor:
- pass
- else:
- descriptor = {}
- desc = remove_envelop("vim", descriptor)
-
- # # check that exist
- # uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True)
- # tenant_text = "/" + self._get_tenant()
- if kwargs:
- desc = self.update_descriptor(desc, kwargs)
-
- if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"):
- raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided")
- create_desc = self._create_envelop("vim", desc)
- payload_req = yaml.safe_dump(create_desc)
- with aiohttp.ClientSession(loop=self.loop) as session:
- # check that exist
- item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True)
- await self._get_tenant(session)
+ try:
+ if isinstance(descriptor, str):
+ descriptor = self._parse(descriptor, descriptor_format)
+ elif descriptor:
+ pass
+ else:
+ descriptor = {}
+ desc = remove_envelop("vim", descriptor)
- url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant,
- datacenter=item_id)
- self.logger.debug("RO POST %s %s", url, payload_req)
- with aiohttp.Timeout(self.timeout_large):
- async with session.post(url, headers=self.headers_req, data=payload_req) as response:
- response_text = await response.read()
- self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100]))
- if response.status >= 300:
- raise ROClientException(response_text, http_code=response.status)
-
- response_desc = self._parse_yaml(response_text, response=True)
- desc = remove_envelop("vim", response_desc)
- return desc
+ # # check that exist
+ # uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True)
+ # tenant_text = "/" + self._get_tenant()
+ if kwargs:
+ desc = self.update_descriptor(desc, kwargs)
+
+ if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"):
+ raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided")
+ create_desc = self._create_envelop("vim", desc)
+ payload_req = yaml.safe_dump(create_desc)
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ # check that exist
+ item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True)
+ await self._get_tenant(session)
+
+ url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant,
+ datacenter=item_id)
+ self.logger.debug("RO POST %s %s", url, payload_req)
+ with aiohttp.Timeout(self.timeout_large):
+ async with session.post(url, headers=self.headers_req, data=payload_req) as response:
+ response_text = await response.read()
+ self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100]))
+ if response.status >= 300:
+ raise ROClientException(response_text, http_code=response.status)
+
+ response_desc = self._parse_yaml(response_text, response=True)
+ desc = remove_envelop("vim", response_desc)
+ return desc
+ except aiohttp.errors.ClientOSError as e:
+ raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
async def detach_datacenter(self, datacenter=None):
# TODO replace the code with delete_item(vim_account,...)
- with aiohttp.ClientSession(loop=self.loop) as session:
- # check that exist
- item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False)
- tenant = await self._get_tenant(session)
-
- url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant,
- datacenter=item_id)
- self.logger.debug("RO DELETE %s", url)
- with aiohttp.Timeout(self.timeout_large):
- async with session.delete(url, headers=self.headers_req) as response:
- response_text = await response.read()
- self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100]))
- if response.status >= 300:
- raise ROClientException(response_text, http_code=response.status)
-
- response_desc = self._parse_yaml(response_text, response=True)
- desc = remove_envelop("vim", response_desc)
- return desc
+ try:
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ # check that exist
+ item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False)
+ tenant = await self._get_tenant(session)
+
+ url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant,
+ datacenter=item_id)
+ self.logger.debug("RO DELETE %s", url)
+ with aiohttp.Timeout(self.timeout_large):
+ async with session.delete(url, headers=self.headers_req) as response:
+ response_text = await response.read()
+ self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100]))
+ if response.status >= 300:
+ raise ROClientException(response_text, http_code=response.status)
+
+ response_desc = self._parse_yaml(response_text, response=True)
+ desc = remove_envelop("vim", response_desc)
+ return desc
+ except aiohttp.errors.ClientOSError as e:
+ raise ROClientException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise ROClientException("Timeout", http_code=504)
# TODO convert to asyncio
# DATACENTERS
import getopt
import functools
import sys
+import traceback
from osm_common import dbmemory
from osm_common import dbmongo
from osm_common import fslocal
__author__ = "Alfonso Tierno"
+min_RO_version = [0, 5, 69]
class LcmException(Exception):
# or with list(map(int, version.split(".")))
if N2VC_version < "0.0.2":
raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version))
+
try:
+ # TODO check database version
if config["database"]["driver"] == "mongo":
self.db = dbmongo.DbMongo()
self.db.db_connect(config["database"])
self.logger.critical(str(e), exc_info=True)
raise LcmException(str(e))
+ async def check_RO_version(self):
+ try:
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO_version = await RO.get_version()
+ if RO_version < min_RO_version:
+ raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format(
+ *RO_version, *min_RO_version
+ ))
+ except ROclient.ROClientException as e:
+ self.logger.critical("Error while conneting to osm/RO " + str(e), exc_info=True)
+ raise LcmException(str(e))
+
def update_db(self, item, _id, _desc):
try:
self.db.replace(item, _id, _desc)
self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
def update_db_2(self, item, _id, _desc):
+ """
+ Updates database with _desc information. Upon success _desc is cleared
+ :param item:
+ :param _id:
+ :param _desc:
+ :return:
+ """
+ if not _desc:
+ return
try:
self.db.set_one(item, {"_id": _id}, _desc)
+ _desc.clear()
except DbException as e:
- self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
+ self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
async def vim_create(self, vim_content, order_id):
vim_id = vim_content["_id"]
ci_file = None
vdu.pop("cloud-init-file", None)
vdu["cloud-init"] = clout_init_content
+ # remnove unused by RO configuration, monitoring, scaling
+ vnfd_RO.pop("vnf-configuration", None)
+ vnfd_RO.pop("monitoring-param", None)
+ vnfd_RO.pop("scaling-group-descriptor", None)
return vnfd_RO
except FsException as e:
raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
"""
nsr_id = None
nslcmop_id = None
- update_nsr = update_nslcmop = False
+ db_nsr_update = {}
+ db_nslcmop_update = {}
try:
nsr_id = db_nsr["_id"]
nslcmop_id = db_nslcmop["_id"]
nsr_lcm = db_nsr["_admin"]["deployed"]
- ns_action = db_nslcmop["lcmOperationType"]
- logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id,
+ ns_operation = db_nslcmop["lcmOperationType"]
+ logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_operation, nslcmop_id,
member_vnf_index)
if task:
exc = task.exception()
if exc:
self.logger.error(logging_text + " task Exception={}".format(exc))
- if ns_action in ("instantiate", "terminate"):
+ if ns_operation in ("instantiate", "terminate"):
nsr_lcm["VCA"][member_vnf_index]['operational-status'] = "error"
+ db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = \
+ "error"
nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(exc)
- elif ns_action == "action":
- db_nslcmop["operationState"] = "FAILED"
- db_nslcmop["detailed-status"] = str(exc)
- db_nslcmop["statusEnteredTime"] = time()
- update_nslcmop = True
+ db_nsr_update[
+ "_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(exc)
+ elif ns_operation == "action":
+ db_nslcmop_update["operationState"] = "FAILED"
+ db_nslcmop_update["detailed-status"] = str(exc)
+ db_nslcmop_update["statusEnteredTime"] = time()
return
else:
self.logger.debug(logging_text + " task Done")
# TODO revise with Adam if action is finished and ok when task is done
- if ns_action == "action":
- db_nslcmop["operationState"] = "COMPLETED"
- db_nslcmop["detailed-status"] = "Done"
- db_nslcmop["statusEnteredTime"] = time()
- update_nslcmop = True
+ if ns_operation == "action":
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nslcmop_update["statusEnteredTime"] = time()
# task is Done, but callback is still ongoing. So ignore
return
elif status:
if nsr_lcm["VCA"][member_vnf_index]['operational-status'] == status:
return # same status, ignore
nsr_lcm["VCA"][member_vnf_index]['operational-status'] = status
+ db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(member_vnf_index)] = status
nsr_lcm["VCA"][member_vnf_index]['detailed-status'] = str(message)
+ db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(member_vnf_index)] = str(message)
else:
self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
return
if all_active:
self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id,
member_vnf_index))
- db_nsr["config-status"] = "configured"
- db_nsr["detailed-status"] = "done"
- db_nslcmop["operationState"] = "COMPLETED"
- db_nslcmop["detailed-status"] = "Done"
- db_nslcmop["statusEnteredTime"] = time()
+ db_nsr_update["config-status"] = "configured"
+ db_nsr_update["detailed-status"] = "done"
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nslcmop_update["statusEnteredTime"] = time()
elif n2vc_error_text:
- db_nsr["config-status"] = "failed"
+ db_nsr_update["config-status"] = "failed"
error_text = "fail configuring " + ";".join(n2vc_error_text)
- db_nsr["detailed-status"] = error_text
- db_nslcmop["operationState"] = "FAILED_TEMP"
- db_nslcmop["detailed-status"] = error_text
- db_nslcmop["statusEnteredTime"] = time()
+ db_nsr_update["detailed-status"] = error_text
+ db_nslcmop_update["operationState"] = "FAILED_TEMP"
+ db_nslcmop_update["detailed-status"] = error_text
+ db_nslcmop_update["statusEnteredTime"] = time()
else:
cs = "configuring: "
separator = ""
for status, num in status_map.items():
cs += separator + "{}: {}".format(status, num)
separator = ", "
- db_nsr["config-status"] = cs
- db_nsr["detailed-status"] = cs
- db_nslcmop["detailed-status"] = cs
- update_nsr = update_nslcmop = True
+ db_nsr_update["config-status"] = cs
+ db_nsr_update["detailed-status"] = cs
+ db_nslcmop_update["detailed-status"] = cs
except Exception as e:
self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index, e), exc_info=True)
finally:
try:
- if update_nslcmop:
- self.update_db("nslcmops", nslcmop_id, db_nslcmop)
- if update_nsr:
- self.update_db("nsrs", nsr_id, db_nsr)
+ if db_nslcmop_update:
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
except Exception as e:
self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
member_vnf_index, e), exc_info=True)
RO_ns_params["networks"][vld["name"]] = RO_vld
return RO_ns_params
+ def ns_update_vnfr(self, db_vnfrs, ns_RO_info):
+ """
+ Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
+ :param db_vnfrs:
+ :param ns_RO_info:
+ :return:
+ """
+ for vnf_index, db_vnfr in db_vnfrs.items():
+ vnfr_deployed = ns_RO_info.get(vnf_index)
+ if not vnfr_deployed:
+ continue
+ vnfr_update = {}
+ db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnfr_deployed.get("ip_address")
+ for index, vdur in enumerate(db_vnfr["vdur"]):
+ vdu_deployed = vnfr_deployed["vdur"].get(vdur["vdu-id-ref"])
+ if not vdu_deployed:
+ continue
+ vnfr_update["vdur.{}.vim-id".format(index)] = vdu_deployed.get("vim_id")
+ db_vnfr["vdur"][index]["vim-id"] = vnfr_update["vdur.{}.vim-id".format(index)]
+ vnfr_update["vdur.{}.ip-address".format(index)] = vdu_deployed.get("ip_address")
+ db_vnfr["vdur"][index]["ip-address"] = vnfr_update["vdur.{}.ip-address".format(index)]
+ for index2, interface in enumerate(vdur["interfaces"]):
+ iface_deployed = vdu_deployed["interfaces"].get(interface["name"])
+ if not iface_deployed:
+ continue
+ db_vnfr["vdur"][index]["interfaces"][index2]["vim-id"] =\
+ vnfr_update["vdur.{}.interfaces.{}.vim-id".format(index, index2)] = iface_deployed.get("vim_id")
+ db_vnfr["vdur"][index]["interfaces"][index2]["ip-address"] =\
+ vnfr_update["vdur.{}.interfaces.{}.ip-address".format(index, index2)] = iface_deployed.get(
+ "ip_address")
+ db_vnfr["vdur"][index]["interfaces"][index2]["mac-address"] =\
+ vnfr_update["vdur.{}.interfaces.{}.mac-address".format(index, index2)] = iface_deployed.get(
+ "mac_address")
+ self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
+
+ def ns_update_vnfr_2(self, db_vnfrs, nsr_desc_RO):
+ """
+ Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
+ :param db_vnfrs:
+ :param nsr_desc_RO:
+ :return:
+ """
+ for vnf_index, db_vnfr in db_vnfrs.items():
+ for vnf_RO in nsr_desc_RO["vnfs"]:
+ if vnf_RO["member_vnf_index"] == vnf_index:
+ vnfr_update = {}
+ db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO.get("ip_address")
+ vdur_list = []
+ for vdur_RO in vnf_RO.get("vms", ()):
+ vdur = {
+ "vim-id": vdur_RO.get("vim_vm_id"),
+ "ip-address": vdur_RO.get("ip_address"),
+ "vdu-id-ref": vdur_RO.get("vdu_osm_id"),
+ "name": vdur_RO.get("vim_name"),
+ "status": vdur_RO.get("status"),
+ "status-detailed": vdur_RO.get("error_msg"),
+ "interfaces": []
+ }
+
+ for interface_RO in vdur_RO.get("interfaces", ()):
+ vdur["interfaces"].append({
+ "ip-address": interface_RO.get("ip_address"),
+ "mac-address": interface_RO.get("mac_address"),
+ "name": interface_RO.get("external_name"),
+ })
+ vdur_list.append(vdur)
+ db_vnfr["vdur"] = vnfr_update["vdur"] = vdur_list
+ self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
+ break
+
+ else:
+ raise LcmException("ns_update_vnfr_2: Not found member_vnf_index={} at RO info".format(vnf_index))
+
+ async def create_monitoring(self, nsr_id, vnf_member_index, vnfd_desc):
+ if not vnfd_desc.get("scaling-group-descriptor"):
+ return
+ for scaling_group in vnfd_desc["scaling-group-descriptor"]:
+ scaling_policy_desc = {}
+ scaling_desc = {
+ "ns_id": nsr_id,
+ "scaling_group_descriptor": {
+ "name": scaling_group["name"],
+ "scaling_policy": scaling_policy_desc
+ }
+ }
+ for scaling_policy in scaling_group.get("scaling-policy"):
+ scaling_policy_desc["scale_in_operation_type"] = scaling_policy_desc["scale_out_operation_type"] = \
+ scaling_policy["scaling-type"]
+ scaling_policy_desc["threshold_time"] = scaling_policy["threshold-time"]
+ scaling_policy_desc["cooldown_time"] = scaling_policy["cooldown-time"]
+ scaling_policy_desc["scaling_criteria"] = []
+ for scaling_criteria in scaling_policy.get("scaling-criteria"):
+ scaling_criteria_desc = {"scale_in_threshold": scaling_criteria.get("scale-in-threshold"),
+ "scale_out_threshold": scaling_criteria.get("scale-out-threshold"),
+ }
+ if not scaling_criteria.get("vnf-monitoring-param-ref"):
+ continue
+ for monitoring_param in vnfd_desc.get("monitoring-param", ()):
+ if monitoring_param["id"] == scaling_criteria["vnf-monitoring-param-ref"]:
+ scaling_criteria_desc["monitoring_param"] = {
+ "id": monitoring_param["id"],
+ "name": monitoring_param["name"],
+ "aggregation_type": monitoring_param.get("aggregation-type"),
+ "vdu_name": monitoring_param.get("vdu-ref"),
+ "vnf_member_index": vnf_member_index,
+ }
+
+ scaling_policy_desc["scaling_criteria"].append(scaling_criteria_desc)
+ break
+ else:
+ self.logger.error(
+ "Task ns={} member_vnf_index={} Invalid vnfd vnf-monitoring-param-ref={} not in "
+ "monitoring-param list".format(nsr_id, vnf_member_index,
+ scaling_criteria["vnf-monitoring-param-ref"]))
+
+ await self.msg.aiowrite("lcm_pm", "configure_scaling", scaling_desc, self.loop)
+
async def ns_instantiate(self, nsr_id, nslcmop_id):
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
db_nsr = None
db_nslcmop = None
- db_vnfr = {}
+ db_nsr_update = {}
+ db_nslcmop_update = {}
+ db_vnfrs = {}
+ RO_descriptor_number = 0 # number of descriptors created at RO
+ descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
exc = None
try:
step = "Getting nslcmop={} from db".format(nslcmop_id)
for c_vnf in nsd["constituent-vnfd"]:
vnfd_id = c_vnf["vnfd-id-ref"]
vnfr_filter["member-vnf-index-ref"] = c_vnf["member-vnf-index"]
- db_vnfr[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
+ step = "Getting vnfr={} of nsr={} from db".format(c_vnf["member-vnf-index"], nsr_id)
+ db_vnfrs[c_vnf["member-vnf-index"]] = self.db.get_one("vnfrs", vnfr_filter)
if vnfd_id not in needed_vnfd:
step = "Getting vnfd={} from db".format(vnfd_id)
needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
"nsr_ip": {},
"VCA": {},
}
- db_nsr["detailed-status"] = "creating"
- db_nsr["operational-status"] = "init"
+ db_nsr_update["detailed-status"] = "creating"
+ db_nsr_update["operational-status"] = "init"
RO = ROclient.ROClient(self.loop, **self.ro_config)
# get vnfds, instantiate at RO
for vnfd_id, vnfd in needed_vnfd.items():
- step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
+ step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
# self.logger.debug(logging_text + step)
- vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
+ vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_id[:23])
+ descriptor_id_2_RO[vnfd_id] = vnfd_id_RO
+ RO_descriptor_number += 1
# look if present
vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
if vnfd_list:
- nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
+ db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
vnfd_id, vnfd_list[0]["uuid"]))
else:
vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
desc = await RO.create("vnfd", descriptor=vnfd_RO)
- nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
- db_nsr["_admin"]["nsState"] = "INSTANTIATED"
+ db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
vnfd_id, desc["uuid"]))
- self.update_db("nsrs", nsr_id, db_nsr)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
# create nsd at RO
nsd_id = nsd["id"]
- step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
+ step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
# self.logger.debug(logging_text + step)
- nsd_id_RO = nsr_id + "." + nsd_id[:200]
- nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
+ RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_id[:23])
+ descriptor_id_2_RO[nsd_id] = RO_osm_nsd_id
+ RO_descriptor_number += 1
+ nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
if nsd_list:
- nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
+ db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
- nsd_id, nsd_list[0]["uuid"]))
+ nsd_id, RO_nsd_uuid))
else:
nsd_RO = deepcopy(nsd)
- nsd_RO["id"] = nsd_id_RO
+ nsd_RO["id"] = RO_osm_nsd_id
nsd_RO.pop("_id", None)
nsd_RO.pop("_admin", None)
for c_vnf in nsd_RO["constituent-vnfd"]:
vnfd_id = c_vnf["vnfd-id-ref"]
- c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
+ c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
desc = await RO.create("nsd", descriptor=nsd_RO)
- db_nsr["_admin"]["nsState"] = "INSTANTIATED"
- nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
- self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, desc["uuid"]))
- self.update_db("nsrs", nsr_id, db_nsr)
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
+ db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
+ self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_id, RO_nsd_uuid))
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
# Crate ns at RO
# if present use it unless in error status
- RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
+ RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
if RO_nsr_id:
try:
- step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
+ step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
# self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
desc = await RO.show("ns", RO_nsr_id)
except ROclient.ROClientException as e:
if e.http_code != HTTPStatus.NOT_FOUND:
raise
- RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
+ RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
if RO_nsr_id:
ns_status, ns_status_info = RO.check_ns_status(desc)
- nsr_lcm["RO"]["nsr_status"] = ns_status
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
if ns_status == "ERROR":
- step = db_nsr["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
+ step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
self.logger.debug(logging_text + step)
await RO.delete("ns", RO_nsr_id)
- RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
+ RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
if not RO_nsr_id:
- step = db_nsr["detailed-status"] = "Creating ns at RO"
+ step = db_nsr_update["detailed-status"] = "Creating ns at RO"
# self.logger.debug(logging_text + step)
RO_ns_params = self.ns_params_2_RO(db_nsr.get("instantiate_params"))
desc = await RO.create("ns", descriptor=RO_ns_params,
name=db_nsr["name"],
- scenario=nsr_lcm["RO"]["nsd_id"])
- RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
- db_nsr["_admin"]["nsState"] = "INSTANTIATED"
- nsr_lcm["RO"]["nsr_status"] = "BUILD"
+ scenario=RO_nsd_uuid)
+ RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
- self.update_db("nsrs", nsr_id, db_nsr)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
# update VNFR vimAccount
step = "Updating VNFR vimAcccount"
- for vnf_index, vnfr in db_vnfr.items():
+ for vnf_index, vnfr in db_vnfrs.items():
if vnfr.get("vim-account-id"):
continue
- vnfr["vim-account-id"] = db_nsr["instantiate_params"]["vimAccountId"]
+ vnfr_update = {"vim-account-id": db_nsr["instantiate_params"]["vimAccountId"]}
if db_nsr["instantiate_params"].get("vnf"):
for vnf_params in db_nsr["instantiate_params"]["vnf"]:
if vnf_params.get("member-vnf-index") == vnf_index:
if vnf_params.get("vimAccountId"):
- vnfr["vim-account-id"] = vnf_params.get("vimAccountId")
+ vnfr_update["vim-account-id"] = vnf_params.get("vimAccountId")
break
- self.update_db("vnfrs", vnfr["_id"], vnfr)
+ self.update_db_2("vnfrs", vnfr["_id"], vnfr_update)
# wait until NS is ready
- step = ns_status_detailed = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
- db_nsr["detailed-status"] = ns_status_detailed
+ step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
+ detailed_status_old = None
self.logger.debug(logging_text + step)
+
deployment_timeout = 2 * 3600 # Two hours
while deployment_timeout > 0:
desc = await RO.show("ns", RO_nsr_id)
ns_status, ns_status_info = RO.check_ns_status(desc)
- nsr_lcm["RO"]["nsr_status"] = ns_status
+ db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
if ns_status == "ERROR":
raise ROclient.ROClientException(ns_status_info)
elif ns_status == "BUILD":
- db_nsr_detailed_status_old = db_nsr["detailed-status"]
- db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
- if db_nsr_detailed_status_old != db_nsr["detailed-status"]:
- self.update_db("nsrs", nsr_id, db_nsr)
+ detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
elif ns_status == "ACTIVE":
- step = "Waiting for management IP address from VIM"
+ step = detailed_status = "Waiting for management IP address reported by the VIM"
try:
- ns_RO_info = nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
+ nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
break
except ROclient.ROClientException as e:
if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
raise e
else:
assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
+ if detailed_status != detailed_status_old:
+ detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
await asyncio.sleep(5, loop=self.loop)
deployment_timeout -= 5
if deployment_timeout <= 0:
raise ROclient.ROClientException("Timeout waiting ns to be ready")
step = "Updating VNFRs"
- for vnf_index, vnfr_deployed in ns_RO_info.items():
- vnfr = db_vnfr[vnf_index]
- vnfr["ip-address"] = vnfr_deployed.get("ip_address")
- for vdu_id, vdu_deployed in vnfr_deployed["vdur"].items():
- for vdur in vnfr["vdur"]:
- if vdur["vdu-id-ref"] == vdu_id:
- vdur["vim-id"] = vdu_deployed.get("vim_id")
- vdur["ip-address"] = vdu_deployed.get("ip_address")
- break
- self.update_db("vnfrs", vnfr["_id"], vnfr)
+ # self.ns_update_vnfr(db_vnfrs, ns_RO_info)
+ self.ns_update_vnfr_2(db_vnfrs, desc)
db_nsr["detailed-status"] = "Configuring vnfr"
- self.update_db("nsrs", nsr_id, db_nsr)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
# The parameters we'll need to deploy a charm
number_to_configure = 0
)
# Setup the runtime parameters for this VNF
- params['rw_mgmt_ip'] = db_vnfr[vnf_index]["ip-address"]
+ params['rw_mgmt_ip'] = db_vnfrs[vnf_index]["ip-address"]
# ns_name will be ignored in the current version of N2VC
# but will be implemented for the next point release.
vnf_index,
vnfd['name'],
)
-
- nsr_lcm["VCA"][vnf_index] = {
+ if not nsr_lcm.get("VCA"):
+ nsr_lcm["VCA"] = {}
+ nsr_lcm["VCA"][vnf_index] = db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = {
"model": model_name,
"application": application_name,
"operational-status": "init",
"detailed-status": "",
"vnfd_id": vnfd_id,
}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
proxy_charm))
db_nsr, db_nslcmop, vnf_index))
self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task
- # TODO: Make this call inside deploy()
- # Login to the VCA. If there are multiple calls to login(),
- # subsequent calls will be a nop and return immediately.
- await self.n2vc.login()
-
step = "Looking for needed vnfd to configure"
self.logger.debug(logging_text + step)
+
for c_vnf in nsd["constituent-vnfd"]:
vnfd_id = c_vnf["vnfd-id-ref"]
vnf_index = str(c_vnf["member-vnf-index"])
if 'initial-config-primitive' in vnf_config:
params['initial-config-primitive'] = vnf_config['initial-config-primitive']
+ # Login to the VCA. If there are multiple calls to login(),
+ # subsequent calls will be a nop and return immediately.
+ step = "connecting to N2VC to configure vnf {}".format(vnf_index)
+ await self.n2vc.login()
deploy()
number_to_configure += 1
params['initial-config-primitive'] = vdu_config['initial-config-primitive']
if proxy_charm:
+ step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
+ await self.n2vc.login()
deploy()
number_to_configure += 1
if number_to_configure:
- db_nsr["config-status"] = "configuring"
- db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure)
- db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure)
+ db_nsr_update["config-status"] = "configuring"
+ db_nsr_update["operational-status"] = "running"
+ db_nsr_update["detailed-status"] = "configuring: init: {}".format(number_to_configure)
+ db_nslcmop_update["detailed-status"] = "configuring: init: {}".format(number_to_configure)
else:
- db_nslcmop["operationState"] = "COMPLETED"
- db_nslcmop["statusEnteredTime"] = time()
- db_nslcmop["detailed-status"] = "done"
- db_nsr["config-status"] = "configured"
- db_nsr["detailed-status"] = "done"
- db_nsr["operational-status"] = "running"
- self.update_db("nsrs", nsr_id, db_nsr)
- self.update_db("nslcmops", nslcmop_id, db_nslcmop)
- self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id))
- return nsr_lcm
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["statusEnteredTime"] = time()
+ db_nslcmop_update["detailed-status"] = "done"
+ db_nsr_update["config-status"] = "configured"
+ db_nsr_update["detailed-status"] = "done"
+ db_nsr_update["operational-status"] = "running"
+ step = "Sending monitoring parameters to PM"
+ # for c_vnf in nsd["constituent-vnfd"]:
+ # await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]])
+ try:
+ await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+
+ self.logger.debug(logging_text + "Exit")
+ return
except (ROclient.ROClientException, DbException, LcmException) as e:
self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
except Exception as e:
+ exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
exc_info=True)
- exc = e
finally:
if exc:
if db_nsr:
- db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
- db_nsr["operational-status"] = "failed"
- self.update_db("nsrs", nsr_id, db_nsr)
+ db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
+ db_nsr_update["operational-status"] = "failed"
if db_nslcmop:
- db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc)
- db_nslcmop["operationState"] = "FAILED"
- db_nslcmop["statusEnteredTime"] = time()
- self.update_db("nslcmops", nslcmop_id, db_nslcmop)
+ db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
+ db_nslcmop_update["operationState"] = "FAILED"
+ db_nslcmop_update["statusEnteredTime"] = time()
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ if db_nslcmop_update:
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
async def ns_terminate(self, nsr_id, nslcmop_id):
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
db_nsr = None
db_nslcmop = None
exc = None
- step = "Getting nsr, nslcmop from db"
failed_detail = [] # annotates all failed error messages
vca_task_list = []
vca_task_dict = {}
+ db_nsr_update = {}
+ db_nslcmop_update = {}
try:
+ step = "Getting nslcmop={} from db".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ step = "Getting nsr={} from db".format(nsr_id)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
# nsd = db_nsr["nsd"]
- nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"])
+ nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed"))
if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
return
# TODO ALF remove
# #TODO check if VIM is creating and wait
# RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
- db_nsr_update = {
- "operational-status": "terminating",
- "config-status": "terminating",
- "detailed-status": "Deleting charms",
- }
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ db_nsr_update["operational-status"] = "terminating"
+ db_nsr_update["config-status"] = "terminating"
- try:
- self.logger.debug(logging_text + step)
- for vnf_index, deploy_info in nsr_lcm["VCA"].items():
- if deploy_info and deploy_info.get("application"):
- task = asyncio.ensure_future(
- self.n2vc.RemoveCharms(
- deploy_info['model'],
- deploy_info['application'],
- # self.n2vc_callback,
- # db_nsr,
- # db_nslcmop,
- # vnf_index,
+ if nsr_lcm and nsr_lcm.get("VCA"):
+ try:
+ step = "Scheduling configuration charms removing"
+ db_nsr_update["detailed-status"] = "Deleting charms"
+ self.logger.debug(logging_text + step)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ for vnf_index, deploy_info in nsr_lcm["VCA"].items():
+ if deploy_info and deploy_info.get("application"):
+ task = asyncio.ensure_future(
+ self.n2vc.RemoveCharms(
+ deploy_info['model'],
+ deploy_info['application'],
+ # self.n2vc_callback,
+ # db_nsr,
+ # db_nslcmop,
+ # vnf_index,
+ )
)
- )
- vca_task_list.append(task)
- vca_task_dict[vnf_index] = task
- # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
- # deploy_info['application'], None, db_nsr,
- # db_nslcmop, vnf_index))
- self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
- except Exception as e:
- self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
- # remove from RO
+ vca_task_list.append(task)
+ vca_task_dict[vnf_index] = task
+ # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
+ # deploy_info['application'], None, db_nsr,
+ # db_nslcmop, vnf_index))
+ self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task
+ except Exception as e:
+ self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
+ # remove from RO
+ RO_fail = False
RO = ROclient.ROClient(self.loop, **self.ro_config)
# Delete ns
- RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
- if RO_nsr_id:
+ if nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsr_id"):
+ RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
try:
- step = db_nsr["detailed-status"] = "Deleting ns at RO"
+ step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
self.logger.debug(logging_text + step)
await RO.delete("ns", RO_nsr_id)
- nsr_lcm["RO"]["nsr_id"] = None
- nsr_lcm["RO"]["nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
except ROclient.ROClientException as e:
if e.http_code == 404: # not found
- nsr_lcm["RO"]["nsr_id"] = None
- nsr_lcm["RO"]["nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
elif e.http_code == 409: # conflict
failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
self.logger.debug(logging_text + failed_detail[-1])
+ RO_fail = True
else:
failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
self.logger.error(logging_text + failed_detail[-1])
+ RO_fail = True
# Delete nsd
- RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
- if RO_nsd_id:
+ if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"):
+ RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
try:
- step = db_nsr["detailed-status"] = "Deleting nsd at RO"
+ step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
+ "Deleting nsd at RO"
await RO.delete("nsd", RO_nsd_id)
self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
- nsr_lcm["RO"]["nsd_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsd_id"] = None
except ROclient.ROClientException as e:
if e.http_code == 404: # not found
- nsr_lcm["RO"]["nsd_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsd_id"] = None
self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
elif e.http_code == 409: # conflict
failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
self.logger.debug(logging_text + failed_detail[-1])
+ RO_fail = True
else:
failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
self.logger.error(logging_text + failed_detail[-1])
+ RO_fail = True
- for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
- if not RO_vnfd_id:
- continue
- try:
- step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
- await RO.delete("vnfd", RO_vnfd_id)
- self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
- nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
- except ROclient.ROClientException as e:
- if e.http_code == 404: # not found
- nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
- self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
- elif e.http_code == 409: # conflict
- failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
- self.logger.debug(logging_text + failed_detail[-1])
- else:
- failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
- self.logger.error(logging_text + failed_detail[-1])
+ if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"):
+ for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
+ if not RO_vnfd_id:
+ continue
+ try:
+ step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
+ "Deleting vnfd={} at RO".format(vnf_id)
+ await RO.delete("vnfd", RO_vnfd_id)
+ self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
+ db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
+ self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
+ elif e.http_code == 409: # conflict
+ failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
+ self.logger.debug(logging_text + failed_detail[-1])
+ else:
+ failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
+ self.logger.error(logging_text + failed_detail[-1])
if vca_task_list:
+ db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
+ "Waiting for deletion of configuration charms"
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
await asyncio.wait(vca_task_list, timeout=300)
for vnf_index, task in vca_task_dict.items():
if task.cancelled():
if exc:
failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc))
else:
- nsr_lcm["VCA"][vnf_index] = None
+ db_nsr_update["_admin.deployed.VCA.{}".format(vnf_index)] = None
else: # timeout
# TODO Should it be cancelled?!!
task.cancel()
if failed_detail:
self.logger.error(logging_text + " ;".join(failed_detail))
- db_nsr_update = {
- "operational-status": "failed",
- "detailed-status": "Deletion errors " + "; ".join(failed_detail),
- "_admin.deployed": nsr_lcm
- }
- db_nslcmop_update = {
- "detailed-status": "; ".join(failed_detail),
- "operationState": "FAILED",
- "statusEnteredTime": time()
- }
+ db_nsr_update["operational-status"] = "failed"
+ db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
+ db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
+ db_nslcmop_update["operationState"] = "FAILED"
+ db_nslcmop_update["statusEnteredTime"] = time()
elif db_nslcmop["operationParams"].get("autoremove"):
self.db.del_one("nsrs", {"_id": nsr_id})
+ db_nsr_update.clear()
self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
+ db_nslcmop_update.clear()
self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
self.logger.debug(logging_text + "Delete from database")
else:
- db_nsr_update = {
- "operational-status": "terminated",
- "detailed-status": "Done",
- "_admin.deployed": nsr_lcm,
- "_admin.nsState": "NOT_INSTANTIATED"
- }
- db_nslcmop_update = {
- "detailed-status": "Done",
- "operationState": "COMPLETED",
- "statusEnteredTime": time()
- }
+ db_nsr_update["operational-status"] = "terminated"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["statusEnteredTime"] = time()
+ try:
+ await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
except (ROclient.ROClientException, DbException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
except Exception as e:
+ exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
- exc = e
finally:
if exc and db_nslcmop:
db_nslcmop_update = {
if db_nsr_update:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ async def _ns_execute_primitive(self, db_deployed, member_vnf_index, primitive, primitive_params):
+ vca_deployed = db_deployed["VCA"].get(member_vnf_index)
+ if not vca_deployed:
+ raise LcmException("charm for member_vnf_index={} is not deployed".format(member_vnf_index))
+ model_name = vca_deployed.get("model")
+ application_name = vca_deployed.get("application")
+ if not model_name or not application_name:
+ raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index))
+ if vca_deployed["operational-status"] != "active":
+ raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
+ member_vnf_index, vca_deployed["operational-status"]))
+ callback = None # self.n2vc_callback
+ callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
+ await self.n2vc.login()
+ task = asyncio.ensure_future(
+ self.n2vc.ExecutePrimitive(
+ model_name,
+ application_name,
+ primitive, callback,
+ *callback_args,
+ **primitive_params
+ )
+ )
+ # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
+ # db_nsr, db_nslcmop, member_vnf_index))
+ # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
+ # wait until completed with timeout
+ await asyncio.wait((task,), timeout=600)
+
+ result = "FAILED" # by default
+ result_detail = ""
+ if task.cancelled():
+ result_detail = "Task has been cancelled"
+ elif task.done():
+ exc = task.exception()
+ if exc:
+ result_detail = str(exc)
+ else:
+ # TODO revise with Adam if action is finished and ok when task is done or callback is needed
+ result = "COMPLETED"
+ result_detail = "Done"
+ else: # timeout
+ # TODO Should it be cancelled?!!
+ task.cancel()
+ result_detail = "timeout"
+ return result, result_detail
+
async def ns_action(self, nsr_id, nslcmop_id):
logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
# TODO check if ns is in a proper status
- vca_deployed = nsr_lcm["VCA"].get(vnf_index)
- if not vca_deployed:
- raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index))
- model_name = vca_deployed.get("model")
- application_name = vca_deployed.get("application")
- if not model_name or not application_name:
- raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index))
- if vca_deployed["operational-status"] != "active":
- raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
- vnf_index, vca_deployed["operational-status"]))
primitive = db_nslcmop["operationParams"]["primitive"]
primitive_params = db_nslcmop["operationParams"]["primitive_params"]
- callback = None # self.n2vc_callback
- callback_args = () # [db_nsr, db_nslcmop, vnf_index, None]
- await self.n2vc.login()
- task = asyncio.ensure_future(
- self.n2vc.ExecutePrimitive(
- model_name,
- application_name,
- primitive, callback,
- *callback_args,
- **primitive_params
- )
- )
- # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
- # db_nsr, db_nslcmop, vnf_index))
- # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
- # wait until completed with timeout
- await asyncio.wait((task,), timeout=300)
-
- result = "FAILED" # by default
- result_detail = ""
- if task.cancelled():
- db_nslcmop["detailed-status"] = "Task has been cancelled"
- elif task.done():
- exc = task.exception()
- if exc:
- result_detail = str(exc)
- else:
- self.logger.debug(logging_text + " task Done")
- # TODO revise with Adam if action is finished and ok when task is done or callback is needed
- result = "COMPLETED"
- result_detail = "Done"
- else: # timeout
- # TODO Should it be cancelled?!!
- task.cancel()
- result_detail = "timeout"
-
+ result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index, primitive, primitive_params)
db_nslcmop_update = {
"detailed-status": result_detail,
"operationState": result,
except (DbException, LcmException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
except Exception as e:
+ exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
- exc = e
finally:
if exc and db_nslcmop:
db_nslcmop_update = {
if db_nslcmop_update:
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+ async def ns_scale(self, nsr_id, nslcmop_id):
+ logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ db_nsr_update = {}
+ exc = None
+ try:
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ step = "Parsing scaling parameters"
+ nsr_lcm = db_nsr["_admin"].get("deployed")
+ RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
+ vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
+ scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
+ scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
+ scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
+ step = "Getting vnfd from database"
+ db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+ step = "Getting scaling-group-descriptor"
+ for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
+ if scaling_descriptor["name"] == scaling_group:
+ break
+ else:
+ raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
+ "at vnfd:scaling-group-descriptor".format(scaling_group))
+ cooldown_time = 0
+ for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
+ cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
+ if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
+ break
+
+ # TODO check if ns is in a proper status
+ step = "Sending scale order to RO"
+ nb_scale_op = 0
+ if not db_nsr["_admin"].get("scaling-group"):
+ self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
+ admin_scale_index = 0
+ else:
+ for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
+ if admin_scale_info["name"] == scaling_group:
+ nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
+ break
+ RO_scaling_info = []
+ vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
+ if scaling_type == "SCALE_OUT":
+ # count if max-instance-count is reached
+ if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
+ max_instance_count = int(scaling_descriptor["max-instance-count"])
+ if nb_scale_op >= max_instance_count:
+ raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
+ " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
+ nb_scale_op = nb_scale_op + 1
+ vdu_scaling_info["scaling_direction"] = "OUT"
+ vdu_scaling_info["vdu-create"] = {}
+ for vdu_scale_info in scaling_descriptor["vdu"]:
+ RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
+ "type": "create", "count": vdu_scale_info.get("count", 1)})
+ vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
+ elif scaling_type == "SCALE_IN":
+ # count if min-instance-count is reached
+ if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
+ min_instance_count = int(scaling_descriptor["min-instance-count"])
+ if nb_scale_op <= min_instance_count:
+ raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
+ nb_scale_op = nb_scale_op - 1
+ vdu_scaling_info["scaling_direction"] = "IN"
+ vdu_scaling_info["vdu-delete"] = {}
+ for vdu_scale_info in scaling_descriptor["vdu"]:
+ RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
+ "type": "delete", "count": vdu_scale_info.get("count", 1)})
+ vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
+
+ # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
+ if vdu_scaling_info["scaling_direction"] == "IN":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_scaling_info["vdu-delete"].get(vdur["vdu-id-ref"]):
+ vdu_scaling_info["vdu-delete"][vdur["vdu-id-ref"]] -= 1
+ vdu_scaling_info["vdu"].append({
+ "name": vdur["name"],
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": []
+ })
+ for interface in vdur["interfaces"]:
+ vdu_scaling_info["vdu"][-1]["interface"].append({
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ del vdu_scaling_info["vdu-delete"]
+
+ # execute primitive service PRE-SCALING
+ step = "Executing pre-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor["scaling-config-action"]:
+ if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
+ and scaling_type == "SCALE_IN":
+ vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
+ step = db_nslcmop_update["detailed-status"] = \
+ "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
+ # look for primitive
+ primitive_params = {}
+ for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ for parameter in config_primitive.get("parameter", ()):
+ if 'default-value' in parameter and \
+ parameter['default-value'] == "<VDU_SCALE_INFO>":
+ primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
+ default_flow_style=True,
+ width=256)
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
+ "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
+ "primitive".format(scaling_group, config_primitive))
+ result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
+ vnf_config_primitive, primitive_params)
+ self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail))
+ if result == "FAILED":
+ raise LcmException(result_detail)
+
+ if RO_scaling_info:
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
+ db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
+ db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
+ # TODO mark db_nsr_update as scaling
+ # wait until ready
+ RO_nslcmop_id = RO_desc["instance_action_id"]
+ db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
+
+ RO_task_done = False
+ step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
+ detailed_status_old = None
+ self.logger.debug(logging_text + step)
+
+ deployment_timeout = 1 * 3600 # One hours
+ while deployment_timeout > 0:
+ if not RO_task_done:
+ desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
+ extra_item_id=RO_nslcmop_id)
+ ns_status, ns_status_info = RO.check_action_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ detailed_status = step + "; {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ RO_task_done = True
+ step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
+ self.logger.debug(logging_text + step)
+ else:
+ assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
+ else:
+ desc = await RO.show("ns", RO_nsr_id)
+ ns_status, ns_status_info = RO.check_ns_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ detailed_status = step + "; {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ step = detailed_status = "Waiting for management IP address reported by the VIM"
+ try:
+ desc = await RO.show("ns", RO_nsr_id)
+ nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
+ break
+ except ROclient.ROClientException as e:
+ if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
+ raise e
+ else:
+ assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
+ if detailed_status != detailed_status_old:
+ detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+
+ await asyncio.sleep(5, loop=self.loop)
+ deployment_timeout -= 5
+ if deployment_timeout <= 0:
+ raise ROclient.ROClientException("Timeout waiting ns to be ready")
+
+ step = "Updating VNFRs"
+ # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info)
+ self.ns_update_vnfr_2({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
+
+ # update VDU_SCALING_INFO with the obtained ip_addresses
+ if vdu_scaling_info["scaling_direction"] == "OUT":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
+ vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
+ vdu_scaling_info["vdu"].append({
+ "name": vdur["name"],
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": []
+ })
+ for interface in vdur["interfaces"]:
+ vdu_scaling_info["vdu"][-1]["interface"].append({
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ del vdu_scaling_info["vdu-create"]
+
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # execute primitive service POST-SCALING
+ step = "Executing post-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor["scaling-config-action"]:
+ if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
+ and scaling_type == "SCALE_OUT":
+ vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
+ step = db_nslcmop_update["detailed-status"] = \
+ "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
+ # look for primitive
+ primitive_params = {}
+ for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ for parameter in config_primitive.get("parameter", ()):
+ if 'default-value' in parameter and \
+ parameter['default-value'] == "<VDU_SCALE_INFO>":
+ primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
+ default_flow_style=True,
+ width=256)
+ break
+ else:
+ raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
+ "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
+ "match any vnf-cnfiguration:config-primitive".format(scaling_group,
+ config_primitive))
+ result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
+ vnf_config_primitive, primitive_params)
+ self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail))
+ if result == "FAILED":
+ raise LcmException(result_detail)
+
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["statusEnteredTime"] = time()
+ db_nslcmop_update["detailed-status"] = "done"
+ db_nsr_update["detailed-status"] = "done"
+ try:
+ await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ if cooldown_time:
+ await asyncio.sleep(cooldown_time)
+ await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+ self.logger.debug(logging_text + "Exit Ok")
+ return
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ finally:
+ if exc:
+ db_nsr_update = None
+ if db_nslcmop:
+ db_nslcmop_update = {
+ "detailed-status": "FAILED {}: {}".format(step, exc),
+ "operationState": "FAILED",
+ "statusEnteredTime": time(),
+ }
+ if db_nslcmop_update:
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
async def test(self, param=None):
self.logger.debug("Starting/Ending test task: {}".format(param))
self.lcm_ns_tasks[nsr_id] = {}
self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task}
continue
+ elif command == "scale":
+ # self.logger.debug("Update NS {}".format(nsr_id))
+ nslcmop = params
+ nslcmop_id = nslcmop["_id"]
+ nsr_id = nslcmop["nsInstanceId"]
+ task = asyncio.ensure_future(self.ns_scale(nsr_id, nslcmop_id))
+ if nsr_id not in self.lcm_ns_tasks:
+ self.lcm_ns_tasks[nsr_id] = {}
+ self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_scale": task}
+ continue
elif command == "show":
try:
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
def start(self):
self.loop = asyncio.get_event_loop()
+
+ # check RO version
+ self.loop.run_until_complete(self.check_RO_version())
+
self.loop.run_until_complete(asyncio.gather(
self.kafka_read(),
self.kafka_ping()
exit(1)
lcm = Lcm(config_file)
lcm.start()
- except getopt.GetoptError as e:
+ except (LcmException, getopt.GetoptError) as e:
print(str(e), file=sys.stderr)
# usage()
exit(1)