From: Dario Faccin Date: Thu, 1 Jun 2023 08:15:34 +0000 (+0200) Subject: Update from master X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F49%2F13449%2F2;p=osm%2FLCM.git Update from master Squashed commit of the following: commit 7494498b9271bca751fce47c9695891d475b3dbf Author: k4.rahul Date: Wed Apr 19 17:00:52 2023 +0530 Feature 10945: Service KPI of VNF using exporter endpoint Change-Id: Ic62d3e2a5dc315768b851135c800609d3805078b Signed-off-by: k4.rahul commit cf47a3b2ef1fe3e095c81dc6033c67b067adb08c Author: k4.rahul Date: Thu Apr 27 12:08:48 2023 +0530 Coverity-CWE 330: Use of Insufficiently Random Values (137944 Cryptographically weak PRNG) replaced SystemRandom().randint() instead of randint() Change-Id: I02dd1387b795c856d0cd05d0366b7bb410016a8b Signed-off-by: k4.rahul commit f4c1d2fe2123e408eec90f4a7c24504479c29837 Author: Gulsum Atici Date: Mon May 15 15:45:31 2023 +0300 Fix Healing operation: additionalParams KeyError If target_vnf does not have additionalParams KeyError is raised and additionalParams become optional within this change. Change-Id: I69cb579882b305befd2a64a60bede450e8e6c10d Signed-off-by: Gulsum Atici commit 1addc93e479dcb97fdfecc74606559d9897217ec Author: Mark Beierl Date: Thu May 18 15:11:34 2023 -0400 Making main async The initializers for Helm attempt to schedule tasks in the event loop, but now that we are not starting one explicitly and passing it around, we need to have a loop already started at the time of constructor. By making start() async, there is a running loop, and functions like k8s_helm_conn.py (~ #81) can call asyncio.create_task Change-Id: Ia4bf25bd5060dc27f07e63c7395dae3a88247a0e Signed-off-by: Mark Beierl commit e789898a1681e9b8568f57608d7604a447250fe5 Author: Gabriel Cuba Date: Thu May 11 01:57:21 2023 -0500 Ubuntu 22.04 and Python 3.10 preparation Change-Id: I8871964f01afac8f601678249acf779426c25090 Signed-off-by: Gabriel Cuba Signed-off-by: Mark Beierl commit c7aeb55412c899de7ed6d5e8e3d792759298bdd6 Author: garciadeblas Date: Tue Apr 18 15:08:24 2023 +0200 Clean stage-archive.sh Change-Id: Ifc77f07d81a215c0f824f46df85d906917dadf97 Signed-off-by: garciadeblas commit e416ea07b6029cf9f2d65248720b9b562a409200 Author: aguilard Date: Mon May 8 15:09:37 2023 +0000 Update ns.py to append osm to metric_name to be BWC Change-Id: I9319296e0f79dcab95878a925977514847c770db Signed-off-by: aguilard commit 1ae3c56140dbb3426addacceffe85be0bdee3c45 Author: aguilard Date: Thu Feb 16 17:24:35 2023 +0000 Feature 10981: added Mongo accesses needed for NGSA Change-Id: If3942d060f468382c7796a7e610bce9b21ab93fc Signed-off-by: aguilard commit 63f9af6180fc2d4561fd73b92f66bea0434ab678 Author: k4.rahul Date: Thu Apr 27 16:38:28 2023 +0530 Coverity-CWE 398: 7PK - Code Quality (137908 Copy-paste error) Coverity fix for CWE 398: 7PK - Code Quality (137908 Copy-paste error) Change-Id: I9e390dd01ff91d3525f327170e5fcd8074dc3a2f Signed-off-by: k4.rahul commit 734c32b3d1f47f41cfc1a926449a8eccacb3e645 Author: garciadeblas Date: Wed Mar 1 11:39:15 2023 +0100 Change in tox.ini to use allowlist_externals Change-Id: Ib9c7c76859fa3e92db8baba65c5b625f40c4fddd Signed-off-by: garciadeblas commit e19017dba6e578c4923d65e8da8e1285f74be476 Author: Gabriel Cuba Date: Mon Mar 13 22:34:44 2023 -0500 Feature 10975: Get vim-flavor-id from instantiation params Change-Id: If0d5e836a1cd61ac3e62ade1bc5768d1f08ee147 Signed-off-by: Gabriel Cuba commit 0ceae9a95032d5d101c3eb19354733ab40c53200 Author: Gabriel Cuba Date: Wed Apr 26 10:50:30 2023 -0500 Bug 2236 - Upgrade of Helm Charts is failing when the Helm Chart is embeded in the package Change-Id: Ib1675664d7eb2363444ba5b0d9438b8673ae8715 Signed-off-by: Gabriel Cuba commit 8b2a7ea1130ea1850093648ac9bb080fb2b54506 Author: Pedro Escaleira Date: Tue Mar 28 18:44:11 2023 +0100 Bug 2230 fixed: added verification for when there are no WIM accounts Change-Id: Ib12c9ea9262b8aa46d12f3f760e952ce4df8c056 Signed-off-by: Pedro Escaleira commit f0af5e6329b1cf966707d98a35281f8cf284aff4 Author: Gabriel Cuba Date: Tue Mar 14 00:27:49 2023 -0500 Feature 10978: Add ipv6_address_mode to ip_profile Change-Id: Ib9c999abe7a80486a0c8283a6ce622ebde9ca6da Signed-off-by: Gabriel Cuba commit 87f5f03155d092c22f2bdf7303f10abf06f42531 Author: Gabriel Cuba Date: Tue Feb 28 18:51:19 2023 -0500 Adds TimeoutError handling to the retryer wrapper function. This will ensure that gRPC calls succed while upgrading a Helm based EE. Change-Id: I49a0ec370986e45e8b779f361ee4d72ff5a15ef1 Signed-off-by: Gabriel Cuba commit c773744f338d3c019b4978b0fc591d14c614b4cf Author: Gabriel Cuba Date: Tue Feb 14 13:09:18 2023 -0500 Build IP profile using the format RO expects, so no further translation is needed. Related RO change: https://osm.etsi.org/gerrit/12966 Change-Id: I5f686f00a5c1d500255d38ae135f81544df32556 Signed-off-by: Gabriel Cuba Change-Id: I4130018de79a00a85ec27604757edb3f9cf2ae36 Signed-off-by: Dario Faccin --- diff --git a/Dockerfile b/Dockerfile index c597522..78946a6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ # devops-stages/stage-build.sh # -FROM ubuntu:20.04 +FROM ubuntu:22.04 ARG APT_PROXY RUN if [ ! -z $APT_PROXY ] ; then \ @@ -37,7 +37,6 @@ RUN DEBIAN_FRONTEND=noninteractive apt-get update && \ python3 \ python3-all \ python3-dev \ - python3-setuptools - -RUN python3 -m easy_install pip==21.3.1 -RUN pip install tox==3.24.5 + python3-setuptools \ + python3-pip \ + tox diff --git a/devops-stages/stage-archive.sh b/devops-stages/stage-archive.sh index 8759429..91a6a73 100755 --- a/devops-stages/stage-archive.sh +++ b/devops-stages/stage-archive.sh @@ -19,7 +19,4 @@ rm -rf pool rm -rf dists mkdir -p pool/$MDG mv deb_dist/*.deb pool/$MDG/ -mkdir -p dists/unstable/$MDG/binary-amd64/ -apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages -gzip -9fk dists/unstable/$MDG/binary-amd64/Packages -echo "dists/**,pool/$MDG/*.deb" + diff --git a/osm_lcm/ROclient.py b/osm_lcm/ROclient.py index 8d6f510..b924906 100644 --- a/osm_lcm/ROclient.py +++ b/osm_lcm/ROclient.py @@ -131,8 +131,7 @@ class ROClient: timeout_large = 120 timeout_short = 30 - def __init__(self, loop, uri, **kwargs): - self.loop = loop + def __init__(self, uri, **kwargs): self.uri = uri self.username = kwargs.get("username") @@ -829,6 +828,11 @@ class ROClient: except asyncio.TimeoutError: raise ROClientException("Timeout", http_code=504) except Exception as e: + self.logger.critical( + "Got invalid version text: '{}'; causing exception {}".format( + response_text, str(e) + ) + ) raise ROClientException( "Got invalid version text: '{}'; causing exception {}".format( response_text, e @@ -849,7 +853,7 @@ class ROClient: raise ROClientException("Invalid item {}".format(item)) if item == "tenant": all_tenants = None - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: content = await self._list_item( session, self.client_to_RO[item], @@ -899,7 +903,7 @@ class ROClient: elif item == "vim_account": all_tenants = False - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: content = await self._get_item( session, self.client_to_RO[item], @@ -928,7 +932,7 @@ class ROClient: if item in ("tenant", "vim", "wim"): all_tenants = None - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: result = await self._del_item( session, self.client_to_RO[item], @@ -980,7 +984,7 @@ class ROClient: create_desc = self._create_envelop(item, desc) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: _all_tenants = all_tenants if item == "vim": _all_tenants = True @@ -1044,7 +1048,7 @@ class ROClient: create_desc = self._create_envelop(item, desc) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: outdata = await self._create_item( session, self.client_to_RO[item], @@ -1099,7 +1103,7 @@ class ROClient: # create_desc = self._create_envelop(item, desc) create_desc = desc - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: _all_tenants = all_tenants if item == "vim": _all_tenants = True @@ -1161,7 +1165,7 @@ class ROClient: ) create_desc = self._create_envelop(item, desc) payload_req = yaml.safe_dump(create_desc) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: # check that exist item_id = await self._get_item_uuid( session, self.client_to_RO[item], item_id_name, all_tenants=True @@ -1202,7 +1206,7 @@ class ROClient: async def detach(self, item, item_id_name=None): # TODO replace the code with delete_item(vim_account,...) try: - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: # check that exist item_id = await self._get_item_uuid( session, self.client_to_RO[item], item_id_name, all_tenants=False @@ -1445,119 +1449,3 @@ class ROClient: raise ROClientException(str(content), http_code=mano_response.status) else: raise ROClientException("Unknown value for action '{}".format(str(action))) - - -if __name__ == "__main__": - RO_URL = "http://localhost:9090/openmano" - TEST_TENANT = "myTenant" - TEST_VIM1 = "myvim" - TEST_URL1 = "https://localhost:5000/v1" - TEST_TYPE1 = "openstack" - TEST_CONFIG1 = {"use_floating_ip": True} - TEST_VIM2 = "myvim2" - TEST_URL2 = "https://localhost:5000/v2" - TEST_TYPE2 = "openvim" - TEST_CONFIG2 = {"config2": "config2", "config3": True} - - streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" - logging.basicConfig(format=streamformat) - logger = logging.getLogger("ROClient") - - tenant_id = None - vim_id = False - loop = asyncio.get_event_loop() - myClient = ROClient(uri=RO_URL, loop=loop, loglevel="DEBUG") - try: - # test tenant - content = loop.run_until_complete(myClient.get_list("tenant")) - print("tenants", content) - content = loop.run_until_complete(myClient.create("tenant", name=TEST_TENANT)) - tenant_id = True - content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT)) - print("tenant", TEST_TENANT, content) - content = loop.run_until_complete( - myClient.edit("tenant", TEST_TENANT, description="another description") - ) - content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT)) - print("tenant edited", TEST_TENANT, content) - myClient["tenant"] = TEST_TENANT - - # test VIM - content = loop.run_until_complete( - myClient.create( - "vim", - name=TEST_VIM1, - type=TEST_TYPE1, - vim_url=TEST_URL1, - config=TEST_CONFIG1, - ) - ) - vim_id = True - content = loop.run_until_complete(myClient.get_list("vim")) - print("vim", content) - content = loop.run_until_complete(myClient.show("vim", TEST_VIM1)) - print("vim", TEST_VIM1, content) - content = loop.run_until_complete( - myClient.edit( - "vim", - TEST_VIM1, - description="another description", - name=TEST_VIM2, - type=TEST_TYPE2, - vim_url=TEST_URL2, - config=TEST_CONFIG2, - ) - ) - content = loop.run_until_complete(myClient.show("vim", TEST_VIM2)) - print("vim edited", TEST_VIM2, content) - - # test VIM_ACCOUNT - content = loop.run_until_complete( - myClient.attach_datacenter( - TEST_VIM2, - vim_username="user", - vim_password="pass", - vim_tenant_name="vimtenant1", - config=TEST_CONFIG1, - ) - ) - vim_id = True - content = loop.run_until_complete(myClient.get_list("vim_account")) - print("vim_account", content) - content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2)) - print("vim_account", TEST_VIM2, content) - content = loop.run_until_complete( - myClient.edit( - "vim_account", - TEST_VIM2, - vim_username="user2", - vim_password="pass2", - vim_tenant_name="vimtenant2", - config=TEST_CONFIG2, - ) - ) - content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2)) - print("vim_account edited", TEST_VIM2, content) - - myClient["vim"] = TEST_VIM2 - - except Exception as e: - logger.error("Error {}".format(e), exc_info=True) - - for item in ( - ("vim_account", TEST_VIM1), - ("vim", TEST_VIM1), - ("vim_account", TEST_VIM2), - ("vim", TEST_VIM2), - ("tenant", TEST_TENANT), - ): - try: - content = loop.run_until_complete(myClient.delete(item[0], item[1])) - print("{} {} deleted; {}".format(item[0], item[1], content)) - except Exception as e: - if e.http_code == 404: - print("{} {} not present or already deleted".format(item[0], item[1])) - else: - logger.error("Error {}".format(e), exc_info=True) - - loop.close() diff --git a/osm_lcm/data_utils/lcm_config.py b/osm_lcm/data_utils/lcm_config.py index ffd236e..2e57d3c 100644 --- a/osm_lcm/data_utils/lcm_config.py +++ b/osm_lcm/data_utils/lcm_config.py @@ -122,9 +122,12 @@ class VcaConfig(OsmConfigman): eegrpcinittimeout: int = None eegrpctimeout: int = None eegrpc_tls_enforce: bool = False + eegrpc_pod_admission_policy: str = "baseline" loglevel: str = "DEBUG" logfile: str = None ca_store: str = "/etc/ssl/certs/osm-ca.crt" + client_cert_path: str = "/etc/ssl/lcm-client/tls.crt" + client_key_path: str = "/etc/ssl/lcm-client/tls.key" kubectl_osm_namespace: str = "osm" kubectl_osm_cluster_name: str = "_system-osm-k8s" helm_ee_service_port: int = 50050 diff --git a/osm_lcm/data_utils/wim.py b/osm_lcm/data_utils/wim.py index c8ce0bf..ce2c26c 100644 --- a/osm_lcm/data_utils/wim.py +++ b/osm_lcm/data_utils/wim.py @@ -80,6 +80,11 @@ def select_feasible_wim_account(db_nsr, db_vnfrs, target_vld, vld_params, logger candidate_wims = get_candidate_wims(vims_to_connect) logger.info("Candidate WIMs: {:s}".format(str(candidate_wims))) + # check if there are no WIM candidates + if len(candidate_wims) == 0: + logger.info("No WIM accounts found") + return None, None + # check if a desired wim_account_id is specified in vld_params wim_account_id = vld_params.get("wimAccountId") if wim_account_id: diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 723ca7a..2fc479f 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -70,7 +70,7 @@ class Lcm: main_config = LcmCfg() - def __init__(self, config_file, loop=None): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -96,7 +96,6 @@ class Lcm: self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict())) # TODO: check if lcm_hc.py is necessary self.health_check_file = get_health_check_file(self.main_config.to_dict()) - self.loop = loop or asyncio.get_event_loop() self.ns = ( self.netslice ) = ( @@ -168,7 +167,7 @@ class Lcm: # copy message configuration in order to remove 'group_id' for msg_admin config_message = self.main_config.message.to_dict() - config_message["loop"] = self.loop + config_message["loop"] = asyncio.get_event_loop() if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) @@ -205,12 +204,12 @@ class Lcm: # try new RO, if fail old RO try: self.main_config.RO.uri = ro_uri + "ro" - ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = NgRoClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = True except Exception: self.main_config.RO.uri = ro_uri + "openmano" - ro_server = ROClient(self.loop, **self.main_config.RO.to_dict()) + ro_server = ROClient(**self.main_config.RO.to_dict()) ro_version = await ro_server.get_version() self.main_config.RO.ng = False if versiontuple(ro_version) < versiontuple(min_RO_version): @@ -262,7 +261,6 @@ class Lcm: "worker_id": self.worker_id, "version": lcm_version, }, - self.loop, ) # time between pings are low when it is not received and at starting wait_time = ( @@ -273,7 +271,7 @@ class Lcm: if not self.pings_not_received: kafka_has_received = True self.pings_not_received += 1 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) if self.pings_not_received > 10: raise LcmException("It is not receiving pings from Kafka bus") consecutive_errors = 0 @@ -295,7 +293,7 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) def kafka_read_callback(self, topic, command, params): order_id = 1 @@ -317,7 +315,7 @@ class Lcm: sys.stdout.flush() return elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) + asyncio.Task(self.test(params)) return if topic == "admin": @@ -672,11 +670,10 @@ class Lcm: topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.loop, self.kafka_read_callback, from_beginning=True + topics, self.kafka_read_callback, from_beginning=True ), self.msg_admin.aioread( topics_admin, - self.loop, self.kafka_read_callback, group_id=False, ), @@ -700,42 +697,35 @@ class Lcm: "Task kafka_read retrying after Exception {}".format(e) ) wait_time = 2 if not self.first_start else 5 - await asyncio.sleep(wait_time, loop=self.loop) + await asyncio.sleep(wait_time) # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def start(self): + async def kafka_read_ping(self): + await asyncio.gather(self.kafka_read(), self.kafka_ping()) + + async def start(self): # check RO version - self.loop.run_until_complete(self.check_RO_version()) + await self.check_RO_version() - self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config) # TODO: modify the rest of classes to use the LcmCfg object instead of dicts self.netslice = netslice.NetsliceLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns - ) - self.vim = vim_sdn.VimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.wim = vim_sdn.WimLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.sdn = vim_sdn.SdnLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns ) + self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8scluster = vim_sdn.K8sClusterLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop - ) - self.vca = vim_sdn.VcaLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.k8srepo = vim_sdn.K8sRepoLcm( - self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop + self.msg, self.lcm_tasks, self.main_config.to_dict() ) - self.loop.run_until_complete( - asyncio.gather(self.kafka_read(), self.kafka_ping()) - ) + await self.kafka_read_ping() # TODO # self.logger.debug("Terminating cancelling creation tasks") @@ -743,12 +733,10 @@ class Lcm: # timeout = 200 # while self.is_pending_tasks(): # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") - # await asyncio.sleep(2, loop=self.loop) + # await asyncio.sleep(2) # timeout -= 2 # if not timeout: # self.lcm_tasks.cancel("ALL", "ALL") - self.loop.close() - self.loop = None if self.db: self.db.db_disconnect() if self.msg: @@ -856,7 +844,7 @@ if __name__ == "__main__": ) exit(1) lcm = Lcm(config_file) - lcm.start() + asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage() diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 4d1bfb2..d7db639 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -47,7 +47,7 @@ from osm_lcm.lcm_utils import deep_get def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"): def wrapper(func): - retry_exceptions = ConnectionRefusedError + retry_exceptions = (ConnectionRefusedError, TimeoutError) @functools.wraps(func) async def wrapped(*args, **kwargs): @@ -79,21 +79,16 @@ def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_dela def create_secure_context( - trusted: str, + trusted: str, client_cert_path: str, client_key_path: str ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.verify_mode = ssl.CERT_REQUIRED ctx.check_hostname = True ctx.minimum_version = ssl.TLSVersion.TLSv1_2 - # TODO: client TLS - # ctx.load_cert_chain(str(client_cert), str(client_key)) + ctx.load_cert_chain(client_cert_path, client_key_path) ctx.load_verify_locations(trusted) ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20") ctx.set_alpn_protocols(["h2"]) - try: - ctx.set_npn_protocols(["h2"]) - except NotImplementedError: - pass return ctx @@ -101,7 +96,6 @@ class LCMHelmConn(N2VCConnector, LcmBase): def __init__( self, log: object = None, - loop: object = None, vca_config: VcaConfig = None, on_update_db=None, ): @@ -114,7 +108,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): # parent class constructor N2VCConnector.__init__( - self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs + self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs ) self.vca_config = vca_config @@ -191,6 +185,9 @@ class LCMHelmConn(N2VCConnector, LcmBase): and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ + if not namespace: + namespace = self.vca_config.kubectl_osm_namespace + self.log.info( "create_execution_environment: namespace: {}, artifact_path: {}, " "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format( @@ -254,7 +251,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): system_cluster_uuid, kdu_model=kdu_model, kdu_instance=helm_id, - namespace=self.vca_config.kubectl_osm_namespace, + namespace=namespace, params=config, db_dict=db_dict, timeout=progress_timeout, @@ -268,15 +265,13 @@ class LCMHelmConn(N2VCConnector, LcmBase): system_cluster_uuid, kdu_model=kdu_model, kdu_instance=helm_id, - namespace=self.vca_config.kubectl_osm_namespace, + namespace=namespace, params=config, db_dict=db_dict, timeout=progress_timeout, ) - ee_id = "{}:{}.{}".format( - vca_type, self.vca_config.kubectl_osm_namespace, helm_id - ) + ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id) return ee_id, None except N2VCException: raise @@ -428,6 +423,37 @@ class LCMHelmConn(N2VCConnector, LcmBase): certificate_name=certificate_name, ) + async def setup_ns_namespace( + self, + name: str, + ): + # Obtain system cluster id from database + system_cluster_uuid = await self._get_system_cluster_id() + await self._k8sclusterhelm3.create_namespace( + namespace=name, + cluster_uuid=system_cluster_uuid, + labels={ + "pod-security.kubernetes.io/enforce": self.vca_config.eegrpc_pod_admission_policy + }, + ) + await self._k8sclusterhelm3.setup_default_rbac( + name="ee-role", + namespace=name, + api_groups=[""], + resources=["secrets"], + verbs=["get"], + service_account="default", + cluster_uuid=system_cluster_uuid, + ) + await self._k8sclusterhelm3.copy_secret_data( + src_secret="osm-ca", + dst_secret="osm-ca", + src_namespace=self.vca_config.kubectl_osm_namespace, + dst_namespace=name, + cluster_uuid=system_cluster_uuid, + data_key="ca.crt", + ) + async def register_execution_environment( self, namespace: str, @@ -698,8 +724,12 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def delete_namespace( self, namespace: str, db_dict: dict = None, total_timeout: float = None ): - # method not implemented for this connector, execution environments must be deleted individually - pass + # Obtain system cluster id from database + system_cluster_uuid = await self._get_system_cluster_id() + await self._k8sclusterhelm3.delete_namespace( + namespace=namespace, + cluster_uuid=system_cluster_uuid, + ) async def install_k8s_proxy_charm( self, @@ -772,7 +802,11 @@ class LCMHelmConn(N2VCConnector, LcmBase): else: return "ERROR", "No result received" - ssl_context = create_secure_context(self.vca_config.ca_store) + ssl_context = create_secure_context( + self.vca_config.ca_store, + self.vca_config.client_cert_path, + self.vca_config.client_key_path, + ) channel = Channel( ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context ) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 7ce1841..d91eaed 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -173,6 +173,35 @@ def get_ee_id_parts(ee_id): return version, namespace, helm_id +def vld_to_ro_ip_profile(source_data): + if source_data: + return { + "ip_version": "IPv4" + if "v4" in source_data.get("ip-version", "ipv4") + else "IPv6", + "subnet_address": source_data.get("cidr") + or source_data.get("subnet-address"), + "gateway_address": source_data.get("gateway-ip") + or source_data.get("gateway-address"), + "dns_address": ";".join( + [v["address"] for v in source_data["dns-server"] if v.get("address")] + ) + if source_data.get("dns-server") + else None, + "dhcp_enabled": source_data.get("dhcp-params", {}).get("enabled", False) + or source_data.get("dhcp-enabled", False), + "dhcp_start_address": source_data["dhcp-params"].get("start-address") + if source_data.get("dhcp-params") + else None, + "dhcp_count": source_data["dhcp-params"].get("count") + if source_data.get("dhcp-params") + else None, + "ipv6_address_mode": source_data["ipv6-address-mode"] + if "ipv6-address-mode" in source_data + else None, + } + + class LcmBase: def __init__(self, msg, logger): """ diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py index 47e3f19..8fe291b 100644 --- a/osm_lcm/netslice.py +++ b/osm_lcm/netslice.py @@ -34,7 +34,7 @@ __author__ = "Felipe Vicens, Pol Alemany, Alfonso Tierno" class NetsliceLcm(LcmBase): - def __init__(self, msg, lcm_tasks, config, loop, ns): + def __init__(self, msg, lcm_tasks, config, ns): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -42,7 +42,6 @@ class NetsliceLcm(LcmBase): """ # logging self.logger = logging.getLogger("lcm.netslice") - self.loop = loop self.lcm_tasks = lcm_tasks self.ns = ns self.ro_config = config["RO"] @@ -90,7 +89,7 @@ class NetsliceLcm(LcmBase): db_nsilcmop_update = {} nsilcmop_operation_state = None vim_2_RO = {} - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) nsi_vld_instantiationi_params = {} def ip_profile_2_RO(ip_profile): @@ -498,7 +497,7 @@ class NetsliceLcm(LcmBase): break # TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300) - await asyncio.sleep(5, loop=self.loop) + await asyncio.sleep(5) else: # timeout_nsi_deploy reached: raise LcmException("Timeout waiting nsi to be ready.") @@ -583,7 +582,7 @@ class NetsliceLcm(LcmBase): db_nsilcmop = None db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} db_nsilcmop_update = {} - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) nsir_deployed = None failed_detail = [] # annotates all failed error messages nsilcmop_operation_state = None @@ -724,7 +723,7 @@ class NetsliceLcm(LcmBase): ) break - await asyncio.sleep(5, loop=self.loop) + await asyncio.sleep(5) termination_timeout -= 5 if termination_timeout <= 0: @@ -857,7 +856,6 @@ class NetsliceLcm(LcmBase): "operationState": nsilcmop_operation_state, "autoremove": autoremove, }, - loop=self.loop, ) except Exception as e: self.logger.error( diff --git a/osm_lcm/ng_ro.py b/osm_lcm/ng_ro.py index 95aa5c9..9426488 100644 --- a/osm_lcm/ng_ro.py +++ b/osm_lcm/ng_ro.py @@ -67,8 +67,7 @@ class NgRoClient: timeout_large = 120 timeout_short = 30 - def __init__(self, loop, uri, **kwargs): - self.loop = loop + def __init__(self, uri, **kwargs): self.endpoint_url = uri if not self.endpoint_url.endswith("/"): self.endpoint_url += "/" @@ -103,7 +102,7 @@ class NgRoClient: payload_req = yaml.safe_dump(target) url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("NG-RO POST %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) async with session.post( @@ -136,7 +135,7 @@ class NgRoClient: payload_req = yaml.safe_dump(target) url = "{}/ns/v1/migrate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("NG-RO POST %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) async with session.post( @@ -172,7 +171,7 @@ class NgRoClient: url = "{}/ns/v1/{operation_type}/{nsr_id}".format( self.endpoint_url, operation_type=operation_type, nsr_id=nsr_id ) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("NG-RO POST %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) async with session.post( @@ -197,7 +196,7 @@ class NgRoClient: url = "{}/ns/v1/deploy/{nsr_id}/{action_id}".format( self.endpoint_url, nsr_id=nsr_id, action_id=action_id ) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("GET %s", url) # timeout = aiohttp.ClientTimeout(total=self.timeout_short) async with session.get(url, headers=self.headers_req) as response: @@ -219,7 +218,7 @@ class NgRoClient: async def delete(self, nsr_id): try: url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("DELETE %s", url) # timeout = aiohttp.ClientTimeout(total=self.timeout_short) async with session.delete(url, headers=self.headers_req) as response: @@ -242,7 +241,7 @@ class NgRoClient: """ try: response_text = "" - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: url = "{}/version".format(self.endpoint_url) self.logger.debug("RO GET %s", url) # timeout = aiohttp.ClientTimeout(total=self.timeout_short) @@ -293,7 +292,7 @@ class NgRoClient: payload_req = yaml.safe_dump(target) url = "{}/ns/v1/recreate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("NG-RO POST %s %s", url, payload_req) async with session.post( url, headers=self.headers_req, data=payload_req @@ -317,7 +316,7 @@ class NgRoClient: url = "{}/ns/v1/recreate/{nsr_id}/{action_id}".format( self.endpoint_url, nsr_id=nsr_id, action_id=action_id ) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("GET %s", url) async with session.get(url, headers=self.headers_req) as response: response_text = await response.read() @@ -350,7 +349,7 @@ class NgRoClient: url = "{}/ns/v1/verticalscale/{nsr_id}".format( self.endpoint_url, nsr_id=nsr_id ) - async with aiohttp.ClientSession(loop=self.loop) as session: + async with aiohttp.ClientSession() as session: self.logger.debug("NG-RO POST %s %s", url, payload_req) async with session.post( url, headers=self.headers_req, data=payload_req diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 915581f..999aee6 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -61,6 +61,7 @@ from osm_lcm.lcm_utils import ( check_juju_bundle_existence, get_charm_artifact_path, get_ee_id_parts, + vld_to_ro_ip_profile, ) from osm_lcm.data_utils.nsd import ( get_ns_configuration_relation_list, @@ -123,7 +124,7 @@ from copy import copy, deepcopy from time import time from uuid import uuid4 -from random import randint +from random import SystemRandom __author__ = "Alfonso Tierno " @@ -132,9 +133,18 @@ class NsLcm(LcmBase): SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 SUBOPERATION_STATUS_SKIP = -3 + EE_TLS_NAME = "ee-tls" task_name_deploy_vca = "Deploying VCA" - - def __init__(self, msg, lcm_tasks, config: LcmCfg, loop): + rel_operation_types = { + "GE": ">=", + "LE": "<=", + "GT": ">", + "LT": "<", + "EQ": "==", + "NE": "!=", + } + + def __init__(self, msg, lcm_tasks, config: LcmCfg): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -144,7 +154,6 @@ class NsLcm(LcmBase): self.db = Database().instance.db self.fs = Filesystem().instance.fs - self.loop = loop self.lcm_tasks = lcm_tasks self.timeout = config.timeout self.ro_config = config.RO @@ -153,7 +162,6 @@ class NsLcm(LcmBase): # create N2VC connector self.n2vc = N2VCJujuConnector( log=self.logger, - loop=self.loop, on_update_db=self._on_update_n2vc_db, fs=self.fs, db=self.db, @@ -161,7 +169,6 @@ class NsLcm(LcmBase): self.conn_helm_ee = LCMHelmConn( log=self.logger, - loop=self.loop, vca_config=self.vca_config, on_update_db=self._on_update_n2vc_db, ) @@ -188,7 +195,6 @@ class NsLcm(LcmBase): kubectl_command=self.vca_config.kubectlpath, juju_command=self.vca_config.jujupath, log=self.logger, - loop=self.loop, on_update_db=self._on_update_k8s_db, fs=self.fs, db=self.db, @@ -211,7 +217,7 @@ class NsLcm(LcmBase): } # create RO client - self.RO = NgRoClient(self.loop, **self.ro_config.to_dict()) + self.RO = NgRoClient(**self.ro_config.to_dict()) self.op_status_map = { "instantiation": self.RO.status, @@ -831,9 +837,9 @@ class NsLcm(LcmBase): target_vim, target_vld, vld_params, target_sdn ): if vld_params.get("ip-profile"): - target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[ - "ip-profile" - ] + target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile( + vld_params["ip-profile"] + ) if vld_params.get("provider-network"): target_vld["vim_info"][target_vim]["provider_network"] = vld_params[ "provider-network" @@ -966,6 +972,10 @@ class NsLcm(LcmBase): image["vim_info"] = {} for flavor in target["flavor"]: flavor["vim_info"] = {} + if db_nsr.get("shared-volumes"): + target["shared-volumes"] = deepcopy(db_nsr["shared-volumes"]) + for shared_volumes in target["shared-volumes"]: + shared_volumes["vim_info"] = {} if db_nsr.get("affinity-or-anti-affinity-group"): target["affinity-or-anti-affinity-group"] = deepcopy( db_nsr["affinity-or-anti-affinity-group"] @@ -1044,27 +1054,9 @@ class NsLcm(LcmBase): and nsd_vlp.get("virtual-link-protocol-data") and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data") ): - ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][ + vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][ "l3-protocol-data" ] - ip_profile_dest_data = {} - if "ip-version" in ip_profile_source_data: - ip_profile_dest_data["ip-version"] = ip_profile_source_data[ - "ip-version" - ] - if "cidr" in ip_profile_source_data: - ip_profile_dest_data["subnet-address"] = ip_profile_source_data[ - "cidr" - ] - if "gateway-ip" in ip_profile_source_data: - ip_profile_dest_data["gateway-address"] = ip_profile_source_data[ - "gateway-ip" - ] - if "dhcp-enabled" in ip_profile_source_data: - ip_profile_dest_data["dhcp-params"] = { - "enabled": ip_profile_source_data["dhcp-enabled"] - } - vld_params["ip-profile"] = ip_profile_dest_data # update vld_params with instantiation params vld_instantiation_params = find_in_list( @@ -1130,28 +1122,9 @@ class NsLcm(LcmBase): and vnfd_vlp.get("virtual-link-protocol-data") and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data") ): - ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][ + vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][ "l3-protocol-data" ] - ip_profile_dest_data = {} - if "ip-version" in ip_profile_source_data: - ip_profile_dest_data["ip-version"] = ip_profile_source_data[ - "ip-version" - ] - if "cidr" in ip_profile_source_data: - ip_profile_dest_data["subnet-address"] = ip_profile_source_data[ - "cidr" - ] - if "gateway-ip" in ip_profile_source_data: - ip_profile_dest_data[ - "gateway-address" - ] = ip_profile_source_data["gateway-ip"] - if "dhcp-enabled" in ip_profile_source_data: - ip_profile_dest_data["dhcp-params"] = { - "enabled": ip_profile_source_data["dhcp-enabled"] - } - - vld_params["ip-profile"] = ip_profile_dest_data # update vld_params with instantiation params if vnf_params: vld_instantiation_params = find_in_list( @@ -1273,6 +1246,15 @@ class NsLcm(LcmBase): if target_vim not in ns_ags["vim_info"]: ns_ags["vim_info"][target_vim] = {} + # shared-volumes + if vdur.get("shared-volumes-id"): + for sv_id in vdur["shared-volumes-id"]: + ns_sv = find_in_list( + target["shared-volumes"], lambda sv: sv_id in sv["id"] + ) + if ns_sv: + ns_sv["vim_info"][target_vim] = {} + vdur["vim_info"] = {target_vim: {}} # instantiation parameters if vnf_params: @@ -1286,6 +1268,9 @@ class NsLcm(LcmBase): vdu_instantiation_params, vdud ) vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes + vdur["additionalParams"]["OSM"][ + "vim_flavor_id" + ] = vdu_instantiation_params.get("vim-flavor-id") vdur_list.append(vdur) target_vnf["vdur"] = vdur_list target["vnf"].append(target_vnf) @@ -1351,7 +1336,7 @@ class NsLcm(LcmBase): db_nsr_update["detailed-status"] = " ".join(stage) self.update_db_2("nsrs", nsr_id, db_nsr_update) self._write_op_status(nslcmop_id, stage) - await asyncio.sleep(15, loop=self.loop) + await asyncio.sleep(15) else: # timeout_ns_deploy raise NgRoException("Timeout waiting ns to deploy") @@ -1537,7 +1522,7 @@ class NsLcm(LcmBase): "target KDU={} is in error state".format(kdu_name) ) - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) nb_tries += 1 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name)) @@ -1568,7 +1553,7 @@ class NsLcm(LcmBase): "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id) ) - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) # get ip address if not target_vdu_id: @@ -1854,7 +1839,7 @@ class NsLcm(LcmBase): ee_id, credentials = await self.vca_map[ vca_type ].create_execution_environment( - namespace=namespace, + namespace=nsr_id, reuse_ee_id=ee_id, db_dict=db_dict, config=osm_config, @@ -2329,9 +2314,7 @@ class NsLcm(LcmBase): self.logger.debug( logging_text + "Invoke and wait for placement optimization" ) - await self.msg.aiowrite( - "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop - ) + await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id}) db_poll_interval = 5 wait = db_poll_interval * 10 pla_result = None @@ -2360,6 +2343,299 @@ class NsLcm(LcmBase): vnfr["vim-account-id"] = pla_vnf["vimAccountId"] return modified + def _gather_vnfr_healing_alerts(self, vnfr, vnfd): + alerts = [] + nsr_id = vnfr["nsr-id-ref"] + df = vnfd.get("df", [{}])[0] + # Checking for auto-healing configuration + if "healing-aspect" in df: + healing_aspects = df["healing-aspect"] + for healing in healing_aspects: + for healing_policy in healing.get("healing-policy", ()): + vdu_id = healing_policy["vdu-id"] + vdur = next( + (vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]), + {}, + ) + if not vdur: + continue + metric_name = "vm_status" + vdu_name = vdur.get("name") + vnf_member_index = vnfr["member-vnf-index-ref"] + uuid = str(uuid4()) + name = f"healing_{uuid}" + action = healing_policy + # action_on_recovery = healing.get("action-on-recovery") + # cooldown_time = healing.get("cooldown-time") + # day1 = healing.get("day1") + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_name": vdu_name, + }, + "alarm_status": "ok", + "action_type": "healing", + "action": action, + } + alerts.append(alert) + return alerts + + def _gather_vnfr_scaling_alerts(self, vnfr, vnfd): + alerts = [] + nsr_id = vnfr["nsr-id-ref"] + df = vnfd.get("df", [{}])[0] + # Checking for auto-scaling configuration + if "scaling-aspect" in df: + scaling_aspects = df["scaling-aspect"] + all_vnfd_monitoring_params = {} + for ivld in vnfd.get("int-virtual-link-desc", ()): + for mp in ivld.get("monitoring-parameters", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for vdu in vnfd.get("vdu", ()): + for mp in vdu.get("monitoring-parameter", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for df in vnfd.get("df", ()): + for mp in df.get("monitoring-parameter", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for scaling_aspect in scaling_aspects: + scaling_group_name = scaling_aspect.get("name", "") + # Get monitored VDUs + all_monitored_vdus = set() + for delta in scaling_aspect.get("aspect-delta-details", {}).get( + "deltas", () + ): + for vdu_delta in delta.get("vdu-delta", ()): + all_monitored_vdus.add(vdu_delta.get("id")) + monitored_vdurs = list( + filter( + lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus, + vnfr["vdur"], + ) + ) + if not monitored_vdurs: + self.logger.error( + "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric" + ) + continue + for scaling_policy in scaling_aspect.get("scaling-policy", ()): + if scaling_policy["scaling-type"] != "automatic": + continue + threshold_time = scaling_policy.get("threshold-time", "1") + cooldown_time = scaling_policy.get("cooldown-time", "0") + for scaling_criteria in scaling_policy["scaling-criteria"]: + monitoring_param_ref = scaling_criteria.get( + "vnf-monitoring-param-ref" + ) + vnf_monitoring_param = all_vnfd_monitoring_params[ + monitoring_param_ref + ] + for vdur in monitored_vdurs: + vdu_id = vdur["vdu-id-ref"] + metric_name = vnf_monitoring_param.get("performance-metric") + metric_name = f"osm_{metric_name}" + vnf_member_index = vnfr["member-vnf-index-ref"] + scalein_threshold = scaling_criteria.get( + "scale-in-threshold" + ) + scaleout_threshold = scaling_criteria.get( + "scale-out-threshold" + ) + # Looking for min/max-number-of-instances + instances_min_number = 1 + instances_max_number = 1 + vdu_profile = df["vdu-profile"] + if vdu_profile: + profile = next( + item for item in vdu_profile if item["id"] == vdu_id + ) + instances_min_number = profile.get( + "min-number-of-instances", 1 + ) + instances_max_number = profile.get( + "max-number-of-instances", 1 + ) + + if scalein_threshold: + uuid = str(uuid4()) + name = f"scalein_{uuid}" + operation = scaling_criteria[ + "scale-in-relational-operation" + ] + rel_operator = self.rel_operation_types.get( + operation, "<=" + ) + metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' + expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})" + labels = { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + } + prom_cfg = { + "alert": name, + "expr": expression, + "for": str(threshold_time) + "m", + "labels": labels, + } + action = scaling_policy + action = { + "scaling-group": scaling_group_name, + "cooldown-time": cooldown_time, + } + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + }, + "alarm_status": "ok", + "action_type": "scale_in", + "action": action, + "prometheus_config": prom_cfg, + } + alerts.append(alert) + + if scaleout_threshold: + uuid = str(uuid4()) + name = f"scaleout_{uuid}" + operation = scaling_criteria[ + "scale-out-relational-operation" + ] + rel_operator = self.rel_operation_types.get( + operation, "<=" + ) + metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' + expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})" + labels = { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + } + prom_cfg = { + "alert": name, + "expr": expression, + "for": str(threshold_time) + "m", + "labels": labels, + } + action = scaling_policy + action = { + "scaling-group": scaling_group_name, + "cooldown-time": cooldown_time, + } + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + }, + "alarm_status": "ok", + "action_type": "scale_out", + "action": action, + "prometheus_config": prom_cfg, + } + alerts.append(alert) + return alerts + + def _gather_vnfr_alarm_alerts(self, vnfr, vnfd): + alerts = [] + nsr_id = vnfr["nsr-id-ref"] + vnf_member_index = vnfr["member-vnf-index-ref"] + + # Checking for VNF alarm configuration + for vdur in vnfr["vdur"]: + vdu_id = vdur["vdu-id-ref"] + vdu = next(filter(lambda vdu: vdu["id"] == vdu_id, vnfd["vdu"])) + if "alarm" in vdu: + # Get VDU monitoring params, since alerts are based on them + vdu_monitoring_params = {} + for mp in vdu.get("monitoring-parameter", []): + vdu_monitoring_params[mp.get("id")] = mp + if not vdu_monitoring_params: + self.logger.error( + "VDU alarm refers to a VDU monitoring param, but there are no VDU monitoring params in the VDU" + ) + continue + # Get alarms in the VDU + alarm_descriptors = vdu["alarm"] + # Create VDU alarms for each alarm in the VDU + for alarm_descriptor in alarm_descriptors: + # Check that the VDU alarm refers to a proper monitoring param + alarm_monitoring_param = alarm_descriptor.get( + "vnf-monitoring-param-ref", "" + ) + vdu_specific_monitoring_param = vdu_monitoring_params.get( + alarm_monitoring_param, {} + ) + if not vdu_specific_monitoring_param: + self.logger.error( + "VDU alarm refers to a VDU monitoring param not present in the VDU" + ) + continue + metric_name = vdu_specific_monitoring_param.get( + "performance-metric" + ) + if not metric_name: + self.logger.error( + "VDU alarm refers to a VDU monitoring param that has no associated performance-metric" + ) + continue + # Set params of the alarm to be created in Prometheus + metric_name = f"osm_{metric_name}" + metric_threshold = alarm_descriptor.get("value") + uuid = str(uuid4()) + alert_name = f"vdu_alarm_{uuid}" + operation = alarm_descriptor["operation"] + rel_operator = self.rel_operation_types.get(operation, "<=") + metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' + expression = f"{metric_selector} {rel_operator} {metric_threshold}" + labels = { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + "vdu_name": "{{ $labels.vdu_name }}", + } + prom_cfg = { + "alert": alert_name, + "expr": expression, + "for": "1m", # default value. Ideally, this should be related to an IM param, but there is not such param + "labels": labels, + } + alarm_action = dict() + for action_type in ["ok", "insufficient-data", "alarm"]: + if ( + "actions" in alarm_descriptor + and action_type in alarm_descriptor["actions"] + ): + alarm_action[action_type] = alarm_descriptor["actions"][ + action_type + ] + alert = { + "uuid": uuid, + "name": alert_name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + }, + "alarm_status": "ok", + "action_type": "vdu_alarm", + "action": alarm_action, + "prometheus_config": prom_cfg, + } + alerts.append(alert) + return alerts + def update_nsrs_with_pla_result(self, params): try: nslcmop_id = deep_get(params, ("placement", "nslcmopId")) @@ -2569,13 +2845,16 @@ class NsLcm(LcmBase): # create namespace and certificate if any helm based EE is present in the NS if check_helm_ee_in_ns(db_vnfds): - # TODO: create EE namespace + await self.vca_map["helm-v3"].setup_ns_namespace( + name=nsr_id, + ) # create TLS certificates await self.vca_map["helm-v3"].create_tls_certificate( - secret_name="ee-tls-{}".format(nsr_id), + secret_name=self.EE_TLS_NAME, dns_prefix="*", nsr_id=nsr_id, usage="server auth", + namespace=nsr_id, ) nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI @@ -2711,6 +2990,65 @@ class NsLcm(LcmBase): stage=stage, ) + # Check if each vnf has exporter for metric collection if so update prometheus job records + if "exporters-endpoints" in vnfd.get("df")[0]: + exporter_config = vnfd.get("df")[0].get("exporters-endpoints") + self.logger.debug("exporter config :{}".format(exporter_config)) + artifact_path = "{}/{}/{}".format( + base_folder["folder"], + base_folder["pkg-dir"], + "exporter-endpoint", + ) + ee_id = None + ee_config_descriptor = exporter_config + vnfr_id = db_vnfr["id"] + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( + logging_text, + nsr_id, + vnfr_id, + vdu_id=None, + vdu_index=None, + user=None, + pub_key=None, + ) + self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip)) + self.logger.debug("Artifact_path:{}".format(artifact_path)) + db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}) + vdu_id_for_prom = None + vdu_index_for_prom = None + for x in get_iterable(db_vnfr, "vdur"): + vdu_id_for_prom = x.get("vdu-id-ref") + vdu_index_for_prom = x.get("count-index") + prometheus_jobs = await self.extract_prometheus_scrape_jobs( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, + element_type="VDU", + vdu_id=vdu_id_for_prom, + vdu_index=vdu_index_for_prom, + ) + + self.logger.debug("Prometheus job:{}".format(prometheus_jobs)) + if prometheus_jobs: + db_nsr_update["_admin.deployed.prometheus_jobs"] = prometheus_jobs + self.update_db_2( + "nsrs", + nsr_id, + db_nsr_update, + ) + + for job in prometheus_jobs: + self.db.set_one( + "prometheus_jobs", + {"job_name": job["job_name"]}, + job, + upsert=True, + fail_on_empty=False, + ) + # Check if this NS has a charm configuration descriptor_config = nsd.get("ns-configuration") if descriptor_config and descriptor_config.get("juju"): @@ -2834,7 +3172,27 @@ class NsLcm(LcmBase): db_nsr_update["detailed-status"] = "Done" db_nslcmop_update["detailed-status"] = "Done" nslcmop_operation_state = "COMPLETED" - + # Gather auto-healing and auto-scaling alerts for each vnfr + healing_alerts = [] + scaling_alerts = [] + for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}): + vnfd = next( + (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None + ) + healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd) + for alert in healing_alerts: + self.logger.info(f"Storing healing alert in MongoDB: {alert}") + self.db.create("alerts", alert) + + scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd) + for alert in scaling_alerts: + self.logger.info(f"Storing scaling alert in MongoDB: {alert}") + self.db.create("alerts", alert) + + alarm_alerts = self._gather_vnfr_alarm_alerts(vnfr, vnfd) + for alert in alarm_alerts: + self.logger.info(f"Storing VNF alarm alert in MongoDB: {alert}") + self.db.create("alerts", alert) if db_nsr: self._write_ns_status( nsr_id=nsr_id, @@ -2863,7 +3221,6 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, }, - loop=self.loop, ) except Exception as e: self.logger.error( @@ -4109,7 +4466,7 @@ class NsLcm(LcmBase): # TODO vdu_index_count for vca in vca_deployed_list: if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id: - return vca["ee_id"] + return vca.get("ee_id") async def destroy_N2VC( self, @@ -4391,9 +4748,12 @@ class NsLcm(LcmBase): # Delete Namespace and Certificates if necessary if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())): await self.vca_map["helm-v3"].delete_tls_certificate( - certificate_name=db_nslcmop["nsInstanceId"], + namespace=db_nslcmop["nsInstanceId"], + certificate_name=self.EE_TLS_NAME, + ) + await self.vca_map["helm-v3"].delete_namespace( + namespace=db_nslcmop["nsInstanceId"], ) - # TODO: Delete namespace # Delete from k8scluster stage[1] = "Deleting KDUs." @@ -4550,12 +4910,13 @@ class NsLcm(LcmBase): "operationState": nslcmop_operation_state, "autoremove": autoremove, }, - loop=self.loop, ) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) ) + self.logger.debug(f"Deleting alerts: ns_id={nsr_id}") + self.db.del_list("alerts", {"tags.ns_id": nsr_id}) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") @@ -4806,7 +5167,7 @@ class NsLcm(LcmBase): ) ) # wait and retry - await asyncio.sleep(retries_interval, loop=self.loop) + await asyncio.sleep(retries_interval) else: if isinstance(e, asyncio.TimeoutError): e = N2VCException( @@ -5046,9 +5407,10 @@ class NsLcm(LcmBase): del desc_params["kdu_model"] else: kdu_model = kdu.get("kdu-model") - parts = kdu_model.split(sep=":") - if len(parts) == 2: - kdu_model = parts[0] + if kdu_model.count("/") < 2: # helm chart is not embedded + parts = kdu_model.split(sep=":") + if len(parts) == 2: + kdu_model = parts[0] if desc_params.get("kdu_atomic_upgrade"): atomic_upgrade = desc_params.get( "kdu_atomic_upgrade" @@ -5217,7 +5579,6 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, }, - loop=self.loop, ) except Exception as e: self.logger.error( @@ -6048,7 +6409,7 @@ class NsLcm(LcmBase): and member_vnf_index ): msg.update({"vnf_member_index": member_vnf_index}) - await self.msg.aiowrite("ns", change_type, msg, loop=self.loop) + await self.msg.aiowrite("ns", change_type, msg) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) @@ -7081,7 +7442,7 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, } - await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop) + await self.msg.aiowrite("ns", "scaled", msg) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) @@ -7301,6 +7662,8 @@ class NsLcm(LcmBase): ) if not job_file: return + self.logger.debug("Artifact path{}".format(artifact_path)) + self.logger.debug("job file{}".format(job_file)) with self.fs.file_open((artifact_path, job_file), "r") as f: job_data = f.read() @@ -7339,7 +7702,7 @@ class NsLcm(LcmBase): kdur_name = kdur.get("name") break - await asyncio.sleep(10, loop=self.loop) + await asyncio.sleep(10) else: if vdu_id and vdu_index is not None: raise LcmException( @@ -7351,21 +7714,33 @@ class NsLcm(LcmBase): ) # TODO get_service - _, _, service = ee_id.partition(".") # remove prefix "namespace." - host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) - host_port = "80" - vnfr_id = vnfr_id.replace("-", "") - variables = { - "JOB_NAME": vnfr_id, - "TARGET_IP": target_ip, - "EXPORTER_POD_IP": host_name, - "EXPORTER_POD_PORT": host_port, - "NSR_ID": nsr_id, - "VNF_MEMBER_INDEX": vnf_member_index, - "VDUR_NAME": vdur_name, - "KDUR_NAME": kdur_name, - "ELEMENT_TYPE": element_type, - } + if ee_id is not None: + _, _, service = ee_id.partition(".") # remove prefix "namespace." + host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) + host_port = "80" + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "EXPORTER_POD_IP": host_name, + "EXPORTER_POD_PORT": host_port, + "NSR_ID": nsr_id, + "VNF_MEMBER_INDEX": vnf_member_index, + "VDUR_NAME": vdur_name, + "KDUR_NAME": kdur_name, + "ELEMENT_TYPE": element_type, + } + else: + metric_path = ee_config_descriptor["metric-path"] + target_port = ee_config_descriptor["metric-port"] + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "TARGET_PORT": target_port, + "METRIC_PATH": metric_path, + } + job_list = parse_job(job_data, variables) # ensure job_name is using the vnfr_id. Adding the metadata nsr_id for job in job_list: @@ -7373,7 +7748,7 @@ class NsLcm(LcmBase): not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"] ): - job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) + job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000)) job["nsr_id"] = nsr_id job["vnfr_id"] = vnfr_id return job_list @@ -7582,7 +7957,7 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, } - await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop) + await self.msg.aiowrite("ns", "migrated", msg) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) @@ -7706,9 +8081,9 @@ class NsLcm(LcmBase): for target_vdu in target_vdu_list: deploy_params_vdu = target_vdu # Set run-day1 vnf level value if not vdu level value exists - if not deploy_params_vdu.get("run-day1") and target_vnf[ - "additionalParams" - ].get("run-day1"): + if not deploy_params_vdu.get("run-day1") and target_vnf.get( + "additionalParams", {} + ).get("run-day1"): deploy_params_vdu["run-day1"] = target_vnf[ "additionalParams" ].get("run-day1") @@ -7868,7 +8243,7 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, } - await self.msg.aiowrite("ns", "healed", msg, loop=self.loop) + await self.msg.aiowrite("ns", "healed", msg) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) @@ -8549,7 +8924,7 @@ class NsLcm(LcmBase): self.logger.debug("Wait Heal RO > {}".format(operational_status_ro)) if operational_status_ro != "healing": break - await asyncio.sleep(15, loop=self.loop) + await asyncio.sleep(15) else: # timeout_ns_deploy raise NgRoException("Timeout waiting ns to deploy") @@ -8647,7 +9022,7 @@ class NsLcm(LcmBase): "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state, } - await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop) + await self.msg.aiowrite("ns", "verticalscaled", msg) except Exception as e: self.logger.error( logging_text + "kafka_write notification Exception {}".format(e) diff --git a/osm_lcm/tests/test_lcm_helm_conn.py b/osm_lcm/tests/test_lcm_helm_conn.py index 2ea9ae8..b4af5a3 100644 --- a/osm_lcm/tests/test_lcm_helm_conn.py +++ b/osm_lcm/tests/test_lcm_helm_conn.py @@ -56,9 +56,7 @@ class TestLcmHelmConn(asynctest.TestCase): lcm_helm_conn.K8sHelm3Connector ) vca_config = VcaConfig(vca_config) - self.helm_conn = LCMHelmConn( - loop=self.loop, vca_config=vca_config, log=self.logger - ) + self.helm_conn = LCMHelmConn(vca_config=vca_config, log=self.logger) @asynctest.fail_on(active_handles=True) async def test_create_execution_environment(self): @@ -89,14 +87,14 @@ class TestLcmHelmConn(asynctest.TestCase): ) self.assertEqual( ee_id, - "{}:{}.{}".format("helm-v3", "osm", helm_chart_id), - "Check ee_id format: :.", + "{}:{}.{}".format("helm-v3", namespace, helm_chart_id), + "Check ee_id format: :.", ) self.helm_conn._k8sclusterhelm3.install.assert_called_once_with( "myk8s_id", kdu_model="/helm_sample_charm", kdu_instance=helm_chart_id, - namespace="osm", + namespace=namespace, db_dict=db_dict, params=None, timeout=None, diff --git a/osm_lcm/tests/test_lcm_utils.py b/osm_lcm/tests/test_lcm_utils.py index 3305fa6..5557e70 100644 --- a/osm_lcm/tests/test_lcm_utils.py +++ b/osm_lcm/tests/test_lcm_utils.py @@ -27,7 +27,7 @@ from osm_common.msgkafka import MsgKafka from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem -from osm_lcm.lcm_utils import LcmBase, LcmException +from osm_lcm.lcm_utils import LcmBase, LcmException, vld_to_ro_ip_profile from osm_lcm.tests import test_db_descriptors as descriptors tmpdir = tempfile.mkdtemp()[1] @@ -438,3 +438,96 @@ class TestLcmBase(TestCase): with self.assertRaises(FileNotFoundError): LcmBase.compare_charmdir_hash(charm_dir1, charm_dir2) self.assertEqual(mock_checksum.dirhash.call_count, 1) + + +class TestLcmUtils(TestCase): + def setUp(self): + pass + + def test__vld_to_ro_ip_profile_with_none(self): + vld_data = None + + result = vld_to_ro_ip_profile( + source_data=vld_data, + ) + + self.assertIsNone(result) + + def test__vld_to_ro_ip_profile_with_empty_profile(self): + vld_data = {} + + result = vld_to_ro_ip_profile( + source_data=vld_data, + ) + + self.assertIsNone(result) + + def test__vld_to_ro_ip_profile_with_wrong_profile(self): + vld_data = { + "no-profile": "here", + } + expected_result = { + "ip_version": "IPv4", + "subnet_address": None, + "gateway_address": None, + "dns_address": None, + "dhcp_enabled": False, + "dhcp_start_address": None, + "dhcp_count": None, + "ipv6_address_mode": None, + } + + result = vld_to_ro_ip_profile( + source_data=vld_data, + ) + + self.assertDictEqual(expected_result, result) + + def test__vld_to_ro_ip_profile_with_ipv4_profile(self): + vld_data = { + "ip-version": "ipv4", + "cidr": "192.168.0.0/24", + "gateway-ip": "192.168.0.254", + "dhcp-enabled": True, + "dns-server": [{"address": "8.8.8.8"}], + } + expected_result = { + "ip_version": "IPv4", + "subnet_address": "192.168.0.0/24", + "gateway_address": "192.168.0.254", + "dns_address": "8.8.8.8", + "dhcp_enabled": True, + "dhcp_start_address": None, + "dhcp_count": None, + "ipv6_address_mode": None, + } + + result = vld_to_ro_ip_profile( + source_data=vld_data, + ) + + self.assertDictEqual(expected_result, result) + + def test__vld_to_ro_ip_profile_with_ipv6_profile(self): + vld_data = { + "ip-version": "ipv6", + "cidr": "2001:0200:0001::/48", + "gateway-ip": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe", + "dhcp-enabled": True, + } + expected_result = { + "ip_version": "IPv6", + "subnet_address": "2001:0200:0001::/48", + "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe", + "dns_address": None, + "dhcp_enabled": True, + "dhcp_start_address": None, + "dhcp_count": None, + "ipv6_address_mode": None, + } + + result = vld_to_ro_ip_profile( + source_data=vld_data, + ) + + self.assertDictEqual(expected_result, result) diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 7e72700..91ad6a3 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -117,7 +117,7 @@ class TestBaseNS(asynctest.TestCase): ("active", "Ready!"), ): # call callback after some time - asyncio.sleep(5, loop=self.loop) + asyncio.sleep(5) callback(model_name, application_name, status, message, *callback_args) @staticmethod @@ -253,7 +253,7 @@ class TestBaseNS(asynctest.TestCase): ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn) def create_nslcm_class(self): - self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config, self.loop) + self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config) self.my_ns.fs = self.fs self.my_ns.db = self.db self.my_ns._wait_dependent_n2vc = asynctest.CoroutineMock() @@ -312,9 +312,7 @@ class TestBaseNS(asynctest.TestCase): def mock_ro(self): if not getenv("OSMLCMTEST_RO_NOMOCK"): - self.my_ns.RO = asynctest.Mock( - NgRoClient(self.loop, **lcm_config.RO.to_dict()) - ) + self.my_ns.RO = asynctest.Mock(NgRoClient(**lcm_config.RO.to_dict())) # TODO first time should be empty list, following should return a dict # self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[]) self.my_ns.RO.deploy = asynctest.CoroutineMock( @@ -326,87 +324,6 @@ class TestBaseNS(asynctest.TestCase): # "description": "done"}}) self.my_ns.RO.delete = asynctest.CoroutineMock(self.my_ns.RO.delete) - # @asynctest.fail_on(active_handles=True) # all async tasks must be completed - # async def test_instantiate(self): - # nsr_id = descriptors.test_ids["TEST-A"]["ns"] - # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"] - # # print("Test instantiate started") - - # # delete deployed information of database - # if not getenv("OSMLCMTEST_DB_NOMOCK"): - # if self.db.get_list("nsrs")[0]["_admin"].get("deployed"): - # del self.db.get_list("nsrs")[0]["_admin"]["deployed"] - # for db_vnfr in self.db.get_list("vnfrs"): - # db_vnfr.pop("ip_address", None) - # for db_vdur in db_vnfr["vdur"]: - # db_vdur.pop("ip_address", None) - # db_vdur.pop("mac_address", None) - # if getenv("OSMLCMTEST_RO_VIMID"): - # self.db.get_list("vim_accounts")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID") - # if getenv("OSMLCMTEST_RO_VIMID"): - # self.db.get_list("nsrs")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID") - - # await self.my_ns.instantiate(nsr_id, nslcmop_id) - - # self.msg.aiowrite.assert_called_once_with("ns", "instantiated", - # {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, - # "operationState": "COMPLETED"}, - # loop=self.loop) - # self.lcm_tasks.lock_HA.assert_called_once_with('ns', 'nslcmops', nslcmop_id) - # if not getenv("OSMLCMTEST_LOGGING_NOMOCK"): - # self.assertTrue(self.my_ns.logger.debug.called, "Debug method not called") - # self.my_ns.logger.error.assert_not_called() - # self.my_ns.logger.exception().assert_not_called() - - # if not getenv("OSMLCMTEST_DB_NOMOCK"): - # self.assertTrue(self.db.set_one.called, "db.set_one not called") - # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) - # self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated") - # for vnfr in db_vnfrs_list: - # self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated") - - # if not getenv("OSMLCMTEST_VCA_NOMOCK"): - # # check intial-primitives called - # self.assertTrue(self.my_ns.n2vc.exec_primitive.called, - # "Exec primitive not called for initial config primitive") - # for _call in self.my_ns.n2vc.exec_primitive.call_args_list: - # self.assertIn(_call[1]["primitive_name"], ("config", "touch"), - # "called exec primitive with a primitive different than config or touch") - - # # TODO add more checks of called methods - # # TODO add a terminate - - # async def test_instantiate_ee_list(self): - # # Using modern IM where configuration is in the new format of execution_environment_list - # ee_descriptor_id = "charm_simple" - # non_used_initial_primitive = { - # "name": "not_to_be_called", - # "seq": 3, - # "execution-environment-ref": "not_used_ee" - # } - # ee_list = [ - # { - # "id": ee_descriptor_id, - # "juju": {"charm": "simple"}, - - # }, - # ] - - # self.db.set_one( - # "vnfds", - # q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}, - # update_dict={"vnf-configuration.0.execution-environment-list": ee_list, - # "vnf-configuration.0.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id, - # "vnf-configuration.0.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id, - # "vnf-configuration.0.initial-config-primitive.2": non_used_initial_primitive, - # "vnf-configuration.0.config-primitive.0.execution-environment-ref": ee_descriptor_id, - # "vnf-configuration.0.config-primitive.0.execution-environment-primitive": "touch_charm", - # }, - # unset={"vnf-configuration.juju": None}) - # await self.test_instantiate() - # # this will check that the initial-congig-primitive 'not_to_be_called' is not called - class TestMyNS(TestBaseNS): @asynctest.fail_on(active_handles=True) diff --git a/osm_lcm/tests/test_vim_sdn.py b/osm_lcm/tests/test_vim_sdn.py index 98b3c5d..7bd6c65 100644 --- a/osm_lcm/tests/test_vim_sdn.py +++ b/osm_lcm/tests/test_vim_sdn.py @@ -35,7 +35,7 @@ class TestVcaLcm(TestCase): self.msg = Mock(msgbase.MsgBase()) self.lcm_tasks = Mock() self.config = {"database": {"driver": "mongo"}} - self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config) self.vca_lcm.db = Mock() self.vca_lcm.fs = Mock() @@ -343,9 +343,7 @@ class TestK8SClusterLcm(TestCase): "kubectlpath": "/usr/bin/kubectl", } } - self.k8scluster_lcm = K8sClusterLcm( - self.msg, self.lcm_tasks, self.vca_config, self.loop - ) + self.k8scluster_lcm = K8sClusterLcm(self.msg, self.lcm_tasks, self.vca_config) self.k8scluster_lcm.db = Mock() self.k8scluster_lcm.fs = Mock() diff --git a/osm_lcm/vim_sdn.py b/osm_lcm/vim_sdn.py index ea25c2b..47015c0 100644 --- a/osm_lcm/vim_sdn.py +++ b/osm_lcm/vim_sdn.py @@ -46,7 +46,7 @@ class VimLcm(LcmBase): ), } - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -54,7 +54,6 @@ class VimLcm(LcmBase): """ self.logger = logging.getLogger("lcm.vim") - self.loop = loop self.lcm_tasks = lcm_tasks self.ro_config = config["RO"] @@ -111,7 +110,7 @@ class VimLcm(LcmBase): db_vim_update["_admin.deployed.RO"] = None db_vim_update["_admin.detailed-status"] = step self.update_db_2("vim_accounts", vim_id, db_vim_update) - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) vim_RO = deepcopy(vim_content) vim_RO.pop("_id", None) vim_RO.pop("_admin", None) @@ -270,7 +269,7 @@ class VimLcm(LcmBase): RO_vim_id = db_vim["_admin"]["deployed"]["RO"] step = "Editing vim at RO" - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) vim_RO = deepcopy(vim_content) vim_RO.pop("_id", None) vim_RO.pop("_admin", None) @@ -394,7 +393,7 @@ class VimLcm(LcmBase): and db_vim["_admin"]["deployed"].get("RO") ): RO_vim_id = db_vim["_admin"]["deployed"]["RO"] - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) step = "Detaching vim from RO tenant" try: await RO.detach("vim_account", RO_vim_id) @@ -465,7 +464,7 @@ class WimLcm(LcmBase): # values that are encrypted at wim config because they are passwords wim_config_encrypted = () - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -473,7 +472,6 @@ class WimLcm(LcmBase): """ self.logger = logging.getLogger("lcm.vim") - self.loop = loop self.lcm_tasks = lcm_tasks self.ro_config = config["RO"] @@ -502,7 +500,7 @@ class WimLcm(LcmBase): step = "Creating wim at RO" db_wim_update["_admin.detailed-status"] = step self.update_db_2("wim_accounts", wim_id, db_wim_update) - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) wim_RO = deepcopy(wim_content) wim_RO.pop("_id", None) wim_RO.pop("_admin", None) @@ -625,7 +623,7 @@ class WimLcm(LcmBase): ): RO_wim_id = db_wim["_admin"]["deployed"]["RO"] step = "Editing wim at RO" - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) wim_RO = deepcopy(wim_content) wim_RO.pop("_id", None) wim_RO.pop("_admin", None) @@ -741,7 +739,7 @@ class WimLcm(LcmBase): and db_wim["_admin"]["deployed"].get("RO") ): RO_wim_id = db_wim["_admin"]["deployed"]["RO"] - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) step = "Detaching wim from RO tenant" try: await RO.detach("wim_account", RO_wim_id) @@ -809,7 +807,7 @@ class WimLcm(LcmBase): class SdnLcm(LcmBase): - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -817,7 +815,6 @@ class SdnLcm(LcmBase): """ self.logger = logging.getLogger("lcm.sdn") - self.loop = loop self.lcm_tasks = lcm_tasks self.ro_config = config["RO"] @@ -848,7 +845,7 @@ class SdnLcm(LcmBase): db_sdn_update["_admin.detailed-status"] = step self.update_db_2("sdns", sdn_id, db_sdn_update) - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) sdn_RO = deepcopy(sdn_content) sdn_RO.pop("_id", None) sdn_RO.pop("_admin", None) @@ -931,7 +928,7 @@ class SdnLcm(LcmBase): and db_sdn["_admin"]["deployed"].get("RO") ): RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"] - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) step = "Editing sdn at RO" sdn_RO = deepcopy(sdn_content) sdn_RO.pop("_id", None) @@ -1011,7 +1008,7 @@ class SdnLcm(LcmBase): and db_sdn["_admin"]["deployed"].get("RO") ): RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"] - RO = ROclient.ROClient(self.loop, **self.ro_config) + RO = ROclient.ROClient(**self.ro_config) step = "Deleting sdn from RO" try: await RO.delete("sdn", RO_sdn_id) @@ -1068,7 +1065,7 @@ class SdnLcm(LcmBase): class K8sClusterLcm(LcmBase): timeout_create = 300 - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -1076,7 +1073,6 @@ class K8sClusterLcm(LcmBase): """ self.logger = logging.getLogger("lcm.k8scluster") - self.loop = loop self.lcm_tasks = lcm_tasks self.vca_config = config["VCA"] @@ -1104,7 +1100,6 @@ class K8sClusterLcm(LcmBase): kubectl_command=self.vca_config.get("kubectlpath"), juju_command=self.vca_config.get("jujupath"), log=self.logger, - loop=self.loop, on_update_db=None, db=self.db, fs=self.fs, @@ -1366,7 +1361,7 @@ class K8sClusterLcm(LcmBase): db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED" if k8s_h3c_id: - step = "Removing helm-chart-v3 '{}'".format(k8s_hc_id) + step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id) uninstall_sw = ( deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created")) or False @@ -1451,7 +1446,7 @@ class K8sClusterLcm(LcmBase): class VcaLcm(LcmBase): timeout_create = 30 - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -1459,15 +1454,12 @@ class VcaLcm(LcmBase): """ self.logger = logging.getLogger("lcm.vca") - self.loop = loop self.lcm_tasks = lcm_tasks super().__init__(msg, self.logger) # create N2VC connector - self.n2vc = N2VCJujuConnector( - log=self.logger, loop=self.loop, fs=self.fs, db=self.db - ) + self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db) def _get_vca_by_id(self, vca_id: str) -> dict: db_vca = self.db.get_one("vca", {"_id": vca_id}) @@ -1687,7 +1679,7 @@ class VcaLcm(LcmBase): class K8sRepoLcm(LcmBase): - def __init__(self, msg, lcm_tasks, config, loop): + def __init__(self, msg, lcm_tasks, config): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -1695,7 +1687,6 @@ class K8sRepoLcm(LcmBase): """ self.logger = logging.getLogger("lcm.k8srepo") - self.loop = loop self.lcm_tasks = lcm_tasks self.vca_config = config["VCA"] diff --git a/requirements-dev.txt b/requirements-dev.txt index d2653cc..2d76d5b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -30,7 +30,7 @@ cachetools==5.3.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # google-auth -certifi==2022.12.7 +certifi==2023.5.7 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # kubernetes @@ -40,18 +40,23 @@ cffi==1.15.1 # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # cryptography # pynacl -charset-normalizer==3.0.1 +charset-normalizer==3.1.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas # requests -cryptography==39.0.1 +cryptography==40.0.2 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # paramiko dataclasses==0.6 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas -google-auth==2.16.1 +dnspython==2.3.0 + # via + # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas + # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas + # pymongo +google-auth==2.17.3 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # kubernetes @@ -77,7 +82,8 @@ macaroonbakery==1.3.1 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # juju -motor==1.3.1 + # theblues +motor==3.1.2 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas @@ -93,7 +99,7 @@ oauthlib==3.2.2 # requests-oauthlib osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@paas # via -r requirements-dev.in -packaging==23.0 +packaging==23.1 # via # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas # aiokafka @@ -107,13 +113,13 @@ protobuf==3.20.3 # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas # macaroonbakery # temporalio -pyasn1==0.4.8 +pyasn1==0.5.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # juju # pyasn1-modules # rsa -pyasn1-modules==0.2.8 +pyasn1-modules==0.3.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # google-auth @@ -131,7 +137,7 @@ pymacaroons==0.13.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # macaroonbakery -pymongo==3.13.0 +pymongo==4.3.3 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas @@ -153,7 +159,7 @@ python-dateutil==2.8.2 # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas # kubernetes # temporalio -pytz==2022.7.1 +pytz==2023.3 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # pyrfc3339 @@ -163,7 +169,7 @@ pyyaml==5.4.1 # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas # juju # kubernetes -requests==2.28.2 +requests==2.30.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # hvac @@ -210,7 +216,7 @@ typing-inspect==0.8.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # juju -urllib3==1.26.14 +urllib3==2.0.2 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # kubernetes @@ -219,7 +225,7 @@ websocket-client==1.5.1 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # kubernetes -websockets==8.1 +websockets==11.0.3 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas # juju diff --git a/requirements-test.txt b/requirements-test.txt index 9842e2b..bbe5881 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -16,11 +16,11 @@ ####################################################################################### asynctest==0.13.0 # via -r requirements-test.in -coverage==7.2.3 +coverage==7.2.5 # via -r requirements-test.in -mock==5.0.1 +mock==5.0.2 # via -r requirements-test.in -nose2==0.12.0 +nose2==0.13.0 # via -r requirements-test.in parameterized==0.9.0 # via -r requirements-test.in diff --git a/requirements.in b/requirements.in index 7fde0fd..8a96125 100644 --- a/requirements.in +++ b/requirements.in @@ -22,4 +22,5 @@ idna jinja2 pyyaml==5.4.1 pydantic +protobuf==3.20.3 temporalio diff --git a/requirements.txt b/requirements.txt index 6436e26..25a8e68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,7 +22,7 @@ async-timeout==4.0.2 # via # -r requirements.in # aiohttp -attrs==22.2.0 +attrs==23.1.0 # via # aiohttp # glom @@ -30,7 +30,7 @@ boltons==23.0.0 # via # face # glom -charset-normalizer==3.0.1 +charset-normalizer==3.1.0 # via aiohttp checksumdir==1.2.0 # via -r requirements.in @@ -42,13 +42,13 @@ frozenlist==1.3.3 # via # aiohttp # aiosignal -glom==23.1.1 +glom==23.3.0 # via config-man -grpcio==1.51.3 +grpcio==1.54.2 # via grpcio-tools grpcio-tools==1.48.1 # via -r requirements.in -grpclib==0.4.3 +grpclib==0.4.4 # via -r requirements.in h2==4.1.0 # via grpclib @@ -71,9 +71,10 @@ multidict==6.0.4 # yarl protobuf==3.20.3 # via + # -r requirements.in # grpcio-tools # temporalio -pydantic==1.10.5 +pydantic==1.10.7 # via -r requirements.in python-dateutil==2.8.2 # via temporalio @@ -89,7 +90,7 @@ typing-extensions==4.5.0 # via # pydantic # temporalio -yarl==1.8.2 +yarl==1.9.2 # via aiohttp # The following packages are considered to be unsafe in a requirements file: diff --git a/tox.ini b/tox.ini index 0b5b8f7..9d18409 100644 --- a/tox.ini +++ b/tox.ini @@ -23,7 +23,7 @@ toxworkdir = /tmp/.tox [testenv] usedevelop = True -basepython = python3.8 +basepython = python3.10 setenv = VIRTUAL_ENV={envdir} PYTHONDONTWRITEBYTECODE = 1 deps = -r{toxinidir}/requirements.txt @@ -50,7 +50,7 @@ commands = coverage report --omit='*tests*' coverage html -d ./cover --omit='*tests*' coverage xml -o coverage.xml --omit=*tests* -whitelist_externals = sh +allowlist_externals = sh ####################################################################################### @@ -67,7 +67,7 @@ deps = {[testenv]deps} -r{toxinidir}/requirements-test.txt pylint commands = - pylint -E osm_lcm --extension-pkg-whitelist=pydantic --disable=E1101 # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961) + pylint -E osm_lcm --extension-pkg-allow-list=pydantic --disable=E1101 # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961) ####################################################################################### @@ -85,7 +85,7 @@ commands = [testenv:pip-compile] deps = pip-tools==6.6.2 skip_install = true -whitelist_externals = bash +allowlist_externals = bash [ commands = - bash -c "for file in requirements*.in ; do \ @@ -109,7 +109,7 @@ commands = python3 setup.py --command-packages=stdeb.command sdist_dsc sh -c 'cd deb_dist/osm-lcm*/ && dpkg-buildpackage -rfakeroot -uc -us' sh -c 'rm osm_lcm/requirements.txt' -whitelist_externals = sh +allowlist_externals = sh ####################################################################################### [flake8]