##
# version moved to lcm.py. uncomment if LCM is installed as library and installed
-version = '6.0.2.post1'
-version_date = '2018-08-30'
+version = '6.0.2.post2'
+version_date = '2018-09-11'
__author__ = "Alfonso Tierno"
-min_RO_version = "0.6.3"
+min_RO_version = "6.0.2"
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"
# uncomment if LCM is installed as library and installed, and get them from __init__.py
self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
async def check_RO_version(self):
- try:
- RO = ROclient.ROClient(self.loop, **self.ro_config)
- RO_version = await RO.get_version()
- if versiontuple(RO_version) < versiontuple(min_RO_version):
- raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format(
- *RO_version, *min_RO_version
- ))
- except ROclient.ROClientException as e:
- error_text = "Error while conneting to osm/RO " + str(e)
- self.logger.critical(error_text, exc_info=True)
- raise LcmException(error_text)
+ tries = 14
+ last_error = None
+ while True:
+ try:
+ ro_server = ROclient.ROClient(self.loop, **self.ro_config)
+ ro_version = await ro_server.get_version()
+ if versiontuple(ro_version) < versiontuple(min_RO_version):
+ raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
+ ro_version, min_RO_version))
+ self.logger.info("Connected to RO version {}".format(ro_version))
+ return
+ except ROclient.ROClientException as e:
+ tries -= 1
+ error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e)
+ if tries <= 0:
+ self.logger.critical(error_text)
+ raise LcmException(error_text)
+ if last_error != error_text:
+ last_error = error_text
+ self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries))
+ await asyncio.sleep(5)
async def test(self, param=None):
self.logger.debug("Starting/Ending test task: {}".format(param))
"""
filled = []
for point in v.split("."):
- filled.append(point.zfill(16))
+ point, _, _ = point.partition("+")
+ point, _, _ = point.partition("-")
+ filled.append(point.zfill(20))
return tuple(filled)
"wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
# "scenario": ns_params["nsdId"],
}
- if n2vc_key_list:
- for vnfd_ref, vnfd in vnfd_dict.items():
- vdu_needed_access = []
- mgmt_cp = None
- if vnfd.get("vnf-configuration"):
- if vnfd.get("mgmt-interface"):
- if vnfd["mgmt-interface"].get("vdu-id"):
- vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
- elif vnfd["mgmt-interface"].get("cp"):
- mgmt_cp = vnfd["mgmt-interface"]["cp"]
-
- for vdu in vnfd.get("vdu", ()):
- if vdu.get("vdu-configuration"):
+ n2vc_key_list = n2vc_key_list or []
+ for vnfd_ref, vnfd in vnfd_dict.items():
+ vdu_needed_access = []
+ mgmt_cp = None
+ if vnfd.get("vnf-configuration"):
+ ssh_required = vnfd["vnf-configuration"].get("config-access", {}).get("ssh-access").get("required")
+ if ssh_required and vnfd.get("mgmt-interface"):
+ if vnfd["mgmt-interface"].get("vdu-id"):
+ vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
+ elif vnfd["mgmt-interface"].get("cp"):
+ mgmt_cp = vnfd["mgmt-interface"]["cp"]
+
+ for vdu in vnfd.get("vdu", ()):
+ if vdu.get("vdu-configuration"):
+ ssh_required = vdu["vdu-configuration"].get("config-access", {}).get("ssh-access").get("required")
+ if ssh_required:
vdu_needed_access.append(vdu["id"])
- elif mgmt_cp:
- for vdu_interface in vdu.get("interface"):
- if vdu_interface.get("external-connection-point-ref") and \
- vdu_interface["external-connection-point-ref"] == mgmt_cp:
- vdu_needed_access.append(vdu["id"])
- mgmt_cp = None
- break
+ elif mgmt_cp:
+ for vdu_interface in vdu.get("interface"):
+ if vdu_interface.get("external-connection-point-ref") and \
+ vdu_interface["external-connection-point-ref"] == mgmt_cp:
+ vdu_needed_access.append(vdu["id"])
+ mgmt_cp = None
+ break
- if vdu_needed_access:
- for vnf_member in nsd.get("constituent-vnfd"):
- if vnf_member["vnfd-id-ref"] != vnfd_ref:
- continue
- for vdu in vdu_needed_access:
- populate_dict(RO_ns_params,
- ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
- n2vc_key_list)
+ if vdu_needed_access:
+ for vnf_member in nsd.get("constituent-vnfd"):
+ if vnf_member["vnfd-id-ref"] != vnfd_ref:
+ continue
+ for vdu in vdu_needed_access:
+ populate_dict(RO_ns_params,
+ ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
+ n2vc_key_list)
if ns_params.get("vduImage"):
RO_ns_params["vduImage"] = ns_params["vduImage"]
""" Perform unittests using asynctest of osm_lcm.ns module
It allows, if some testing ENV are supplied, testing without mocking some external libraries for debugging:
OSMLCMTEST_NS_PUBKEY: public ssh-key returned by N2VC to inject to VMs
+ OSMLCMTEST_NS_NAME: change name of NS
OSMLCMTEST_PACKAGES_PATH: path where the vnf-packages are stored (de-compressed), each one on a 'vnfd_id' folder
OSMLCMTEST_NS_IPADDRESS: IP address where emulated VMs are reached. Comma separate list
+ OSMLCMTEST_RO_VIMID: VIM id of RO target vim IP. Obtain it with openmano datcenter-list on RO container
OSMLCMTEST_VCA_NOMOCK: Do no mock the VCA, N2VC library, for debugging it
OSMLCMTEST_RO_NOMOCK: Do no mock the ROClient library, for debugging it
OSMLCMTEST_DB_NOMOCK: Do no mock the database library, for debugging it
vcpu-count: 1
version: '1.0'
vnf-configuration:
+ config-access:
+ ssh-access:
+ required: True
+ default-user: ubuntu
config-primitive:
- name: touch
parameter:
self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[])
self.my_ns.RO.create = asynctest.CoroutineMock(self.my_ns.RO.create, side_effect=self._ro_create())
self.my_ns.RO.show = asynctest.CoroutineMock(self.my_ns.RO.show, side_effect=self._ro_show())
+ self.my_ns.RO.create_action = asynctest.CoroutineMock(self.my_ns.RO.create_action)
@asynctest.fail_on(active_handles=True) # all async tasks must be completed
async def test_instantiate(self):
for db_vdur in db_vnfr["vdur"]:
db_vdur.pop("ip_address", None)
db_vdur.pop("mac_address", None)
+ if getenv("OSMLCMTEST_RO_VIMID"):
+ self.db_content["vim_accounts"][0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
+ if getenv("OSMLCMTEST_RO_VIMID"):
+ self.db_content["nsrs"][0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
await self.my_ns.instantiate(nsr_id, nslcmop_id)