# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z $APT_PROXY ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
-
-RUN python3 -m easy_install pip==21.3.1
-RUN pip install tox==3.24.5
+ python3-setuptools \
+ python3-pip \
+ tox
rm -rf dists
mkdir -p pool/$MDG
mv deb_dist/*.deb pool/$MDG/
-mkdir -p dists/unstable/$MDG/binary-amd64/
-apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages
-gzip -9fk dists/unstable/$MDG/binary-amd64/Packages
-echo "dists/**,pool/$MDG/*.deb"
+
timeout_large = 120
timeout_short = 30
- def __init__(self, loop, uri, **kwargs):
- self.loop = loop
+ def __init__(self, uri, **kwargs):
self.uri = uri
self.username = kwargs.get("username")
except asyncio.TimeoutError:
raise ROClientException("Timeout", http_code=504)
except Exception as e:
+ self.logger.critical(
+ "Got invalid version text: '{}'; causing exception {}".format(
+ response_text, str(e)
+ )
+ )
raise ROClientException(
"Got invalid version text: '{}'; causing exception {}".format(
response_text, e
raise ROClientException("Invalid item {}".format(item))
if item == "tenant":
all_tenants = None
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
content = await self._list_item(
session,
self.client_to_RO[item],
elif item == "vim_account":
all_tenants = False
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
content = await self._get_item(
session,
self.client_to_RO[item],
if item in ("tenant", "vim", "wim"):
all_tenants = None
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
result = await self._del_item(
session,
self.client_to_RO[item],
create_desc = self._create_envelop(item, desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
_all_tenants = all_tenants
if item == "vim":
_all_tenants = True
create_desc = self._create_envelop(item, desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
outdata = await self._create_item(
session,
self.client_to_RO[item],
# create_desc = self._create_envelop(item, desc)
create_desc = desc
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
_all_tenants = all_tenants
if item == "vim":
_all_tenants = True
)
create_desc = self._create_envelop(item, desc)
payload_req = yaml.safe_dump(create_desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
# check that exist
item_id = await self._get_item_uuid(
session, self.client_to_RO[item], item_id_name, all_tenants=True
async def detach(self, item, item_id_name=None):
# TODO replace the code with delete_item(vim_account,...)
try:
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
# check that exist
item_id = await self._get_item_uuid(
session, self.client_to_RO[item], item_id_name, all_tenants=False
raise ROClientException(str(content), http_code=mano_response.status)
else:
raise ROClientException("Unknown value for action '{}".format(str(action)))
-
-
-if __name__ == "__main__":
- RO_URL = "http://localhost:9090/openmano"
- TEST_TENANT = "myTenant"
- TEST_VIM1 = "myvim"
- TEST_URL1 = "https://localhost:5000/v1"
- TEST_TYPE1 = "openstack"
- TEST_CONFIG1 = {"use_floating_ip": True}
- TEST_VIM2 = "myvim2"
- TEST_URL2 = "https://localhost:5000/v2"
- TEST_TYPE2 = "openvim"
- TEST_CONFIG2 = {"config2": "config2", "config3": True}
-
- streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
- logging.basicConfig(format=streamformat)
- logger = logging.getLogger("ROClient")
-
- tenant_id = None
- vim_id = False
- loop = asyncio.get_event_loop()
- myClient = ROClient(uri=RO_URL, loop=loop, loglevel="DEBUG")
- try:
- # test tenant
- content = loop.run_until_complete(myClient.get_list("tenant"))
- print("tenants", content)
- content = loop.run_until_complete(myClient.create("tenant", name=TEST_TENANT))
- tenant_id = True
- content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
- print("tenant", TEST_TENANT, content)
- content = loop.run_until_complete(
- myClient.edit("tenant", TEST_TENANT, description="another description")
- )
- content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
- print("tenant edited", TEST_TENANT, content)
- myClient["tenant"] = TEST_TENANT
-
- # test VIM
- content = loop.run_until_complete(
- myClient.create(
- "vim",
- name=TEST_VIM1,
- type=TEST_TYPE1,
- vim_url=TEST_URL1,
- config=TEST_CONFIG1,
- )
- )
- vim_id = True
- content = loop.run_until_complete(myClient.get_list("vim"))
- print("vim", content)
- content = loop.run_until_complete(myClient.show("vim", TEST_VIM1))
- print("vim", TEST_VIM1, content)
- content = loop.run_until_complete(
- myClient.edit(
- "vim",
- TEST_VIM1,
- description="another description",
- name=TEST_VIM2,
- type=TEST_TYPE2,
- vim_url=TEST_URL2,
- config=TEST_CONFIG2,
- )
- )
- content = loop.run_until_complete(myClient.show("vim", TEST_VIM2))
- print("vim edited", TEST_VIM2, content)
-
- # test VIM_ACCOUNT
- content = loop.run_until_complete(
- myClient.attach_datacenter(
- TEST_VIM2,
- vim_username="user",
- vim_password="pass",
- vim_tenant_name="vimtenant1",
- config=TEST_CONFIG1,
- )
- )
- vim_id = True
- content = loop.run_until_complete(myClient.get_list("vim_account"))
- print("vim_account", content)
- content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
- print("vim_account", TEST_VIM2, content)
- content = loop.run_until_complete(
- myClient.edit(
- "vim_account",
- TEST_VIM2,
- vim_username="user2",
- vim_password="pass2",
- vim_tenant_name="vimtenant2",
- config=TEST_CONFIG2,
- )
- )
- content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
- print("vim_account edited", TEST_VIM2, content)
-
- myClient["vim"] = TEST_VIM2
-
- except Exception as e:
- logger.error("Error {}".format(e), exc_info=True)
-
- for item in (
- ("vim_account", TEST_VIM1),
- ("vim", TEST_VIM1),
- ("vim_account", TEST_VIM2),
- ("vim", TEST_VIM2),
- ("tenant", TEST_TENANT),
- ):
- try:
- content = loop.run_until_complete(myClient.delete(item[0], item[1]))
- print("{} {} deleted; {}".format(item[0], item[1], content))
- except Exception as e:
- if e.http_code == 404:
- print("{} {} not present or already deleted".format(item[0], item[1]))
- else:
- logger.error("Error {}".format(e), exc_info=True)
-
- loop.close()
eegrpcinittimeout: int = None
eegrpctimeout: int = None
eegrpc_tls_enforce: bool = False
+ eegrpc_pod_admission_policy: str = "baseline"
loglevel: str = "DEBUG"
logfile: str = None
ca_store: str = "/etc/ssl/certs/osm-ca.crt"
+ client_cert_path: str = "/etc/ssl/lcm-client/tls.crt"
+ client_key_path: str = "/etc/ssl/lcm-client/tls.key"
kubectl_osm_namespace: str = "osm"
kubectl_osm_cluster_name: str = "_system-osm-k8s"
helm_ee_service_port: int = 50050
candidate_wims = get_candidate_wims(vims_to_connect)
logger.info("Candidate WIMs: {:s}".format(str(candidate_wims)))
+ # check if there are no WIM candidates
+ if len(candidate_wims) == 0:
+ logger.info("No WIM accounts found")
+ return None, None
+
# check if a desired wim_account_id is specified in vld_params
wim_account_id = vld_params.get("wimAccountId")
if wim_account_id:
main_config = LcmCfg()
- def __init__(self, config_file, loop=None):
+ def __init__(self, config_file):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
# TODO: check if lcm_hc.py is necessary
self.health_check_file = get_health_check_file(self.main_config.to_dict())
- self.loop = loop or asyncio.get_event_loop()
self.ns = (
self.netslice
) = (
# copy message configuration in order to remove 'group_id' for msg_admin
config_message = self.main_config.message.to_dict()
- config_message["loop"] = self.loop
+ config_message["loop"] = asyncio.get_event_loop()
if config_message["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg.connect(config_message)
# try new RO, if fail old RO
try:
self.main_config.RO.uri = ro_uri + "ro"
- ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
+ ro_server = NgRoClient(**self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
self.main_config.RO.ng = True
except Exception:
self.main_config.RO.uri = ro_uri + "openmano"
- ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
+ ro_server = ROClient(**self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
self.main_config.RO.ng = False
if versiontuple(ro_version) < versiontuple(min_RO_version):
"worker_id": self.worker_id,
"version": lcm_version,
},
- self.loop,
)
# time between pings are low when it is not received and at starting
wait_time = (
if not self.pings_not_received:
kafka_has_received = True
self.pings_not_received += 1
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
if self.pings_not_received > 10:
raise LcmException("It is not receiving pings from Kafka bus")
consecutive_errors = 0
"Task kafka_read retrying after Exception {}".format(e)
)
wait_time = 2 if not first_start else 5
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
def kafka_read_callback(self, topic, command, params):
order_id = 1
sys.stdout.flush()
return
elif command == "test":
- asyncio.Task(self.test(params), loop=self.loop)
+ asyncio.Task(self.test(params))
return
if topic == "admin":
topics_admin = ("admin",)
await asyncio.gather(
self.msg.aioread(
- topics, self.loop, self.kafka_read_callback, from_beginning=True
+ topics, self.kafka_read_callback, from_beginning=True
),
self.msg_admin.aioread(
topics_admin,
- self.loop,
self.kafka_read_callback,
group_id=False,
),
"Task kafka_read retrying after Exception {}".format(e)
)
wait_time = 2 if not self.first_start else 5
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
# self.logger.debug("Task kafka_read terminating")
self.logger.debug("Task kafka_read exit")
- def start(self):
+ async def kafka_read_ping(self):
+ await asyncio.gather(self.kafka_read(), self.kafka_ping())
+
+ async def start(self):
# check RO version
- self.loop.run_until_complete(self.check_RO_version())
+ await self.check_RO_version()
- self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
+ self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
# TODO: modify the rest of classes to use the LcmCfg object instead of dicts
self.netslice = netslice.NetsliceLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
- )
- self.vim = vim_sdn.VimLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.wim = vim_sdn.WimLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.sdn = vim_sdn.SdnLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
)
+ self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+ self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+ self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
self.k8scluster = vim_sdn.K8sClusterLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.vca = vim_sdn.VcaLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
)
+ self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
self.k8srepo = vim_sdn.K8sRepoLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
)
- self.loop.run_until_complete(
- asyncio.gather(self.kafka_read(), self.kafka_ping())
- )
+ await self.kafka_read_ping()
# TODO
# self.logger.debug("Terminating cancelling creation tasks")
# timeout = 200
# while self.is_pending_tasks():
# self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
- # await asyncio.sleep(2, loop=self.loop)
+ # await asyncio.sleep(2)
# timeout -= 2
# if not timeout:
# self.lcm_tasks.cancel("ALL", "ALL")
- self.loop.close()
- self.loop = None
if self.db:
self.db.db_disconnect()
if self.msg:
)
exit(1)
lcm = Lcm(config_file)
- lcm.start()
+ asyncio.run(lcm.start())
except (LcmException, getopt.GetoptError) as e:
print(str(e), file=sys.stderr)
# usage()
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
def wrapper(func):
- retry_exceptions = ConnectionRefusedError
+ retry_exceptions = (ConnectionRefusedError, TimeoutError)
@functools.wraps(func)
async def wrapped(*args, **kwargs):
def create_secure_context(
- trusted: str,
+ trusted: str, client_cert_path: str, client_key_path: str
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
- # TODO: client TLS
- # ctx.load_cert_chain(str(client_cert), str(client_key))
+ ctx.load_cert_chain(client_cert_path, client_key_path)
ctx.load_verify_locations(trusted)
ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
ctx.set_alpn_protocols(["h2"])
- try:
- ctx.set_npn_protocols(["h2"])
- except NotImplementedError:
- pass
return ctx
def __init__(
self,
log: object = None,
- loop: object = None,
vca_config: VcaConfig = None,
on_update_db=None,
):
# parent class constructor
N2VCConnector.__init__(
- self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
+ self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
)
self.vca_config = vca_config
and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
"""
+ if not namespace:
+ namespace = self.vca_config.kubectl_osm_namespace
+
self.log.info(
"create_execution_environment: namespace: {}, artifact_path: {}, "
"chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
system_cluster_uuid,
kdu_model=kdu_model,
kdu_instance=helm_id,
- namespace=self.vca_config.kubectl_osm_namespace,
+ namespace=namespace,
params=config,
db_dict=db_dict,
timeout=progress_timeout,
system_cluster_uuid,
kdu_model=kdu_model,
kdu_instance=helm_id,
- namespace=self.vca_config.kubectl_osm_namespace,
+ namespace=namespace,
params=config,
db_dict=db_dict,
timeout=progress_timeout,
)
- ee_id = "{}:{}.{}".format(
- vca_type, self.vca_config.kubectl_osm_namespace, helm_id
- )
+ ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id)
return ee_id, None
except N2VCException:
raise
certificate_name=certificate_name,
)
+    async def setup_ns_namespace(
+        self,
+        name: str,
+    ):
+        # Prepare a dedicated kubernetes namespace `name` in the OSM "system
+        # cluster" (id obtained via self._get_system_cluster_id()) for the
+        # execution environments of one NS:
+        #   1. create the namespace, labelled with the configured pod-security
+        #      admission policy (vca_config.eegrpc_pod_admission_policy),
+        #   2. grant the namespace's "default" service account get-only RBAC
+        #      access to secrets (role "ee-role"),
+        #   3. copy the "ca.crt" entry of the "osm-ca" secret from the OSM
+        #      namespace into the new one — presumably so EE clients can
+        #      verify TLS against the OSM CA (confirm against EE gRPC setup).
+        # Obtain system cluster id from database
+        system_cluster_uuid = await self._get_system_cluster_id()
+        await self._k8sclusterhelm3.create_namespace(
+            namespace=name,
+            cluster_uuid=system_cluster_uuid,
+            labels={
+                "pod-security.kubernetes.io/enforce": self.vca_config.eegrpc_pod_admission_policy
+            },
+        )
+        await self._k8sclusterhelm3.setup_default_rbac(
+            name="ee-role",
+            namespace=name,
+            api_groups=[""],
+            resources=["secrets"],
+            verbs=["get"],
+            service_account="default",
+            cluster_uuid=system_cluster_uuid,
+        )
+        await self._k8sclusterhelm3.copy_secret_data(
+            src_secret="osm-ca",
+            dst_secret="osm-ca",
+            src_namespace=self.vca_config.kubectl_osm_namespace,
+            dst_namespace=name,
+            cluster_uuid=system_cluster_uuid,
+            data_key="ca.crt",
+        )
+
async def register_execution_environment(
self,
namespace: str,
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
-        # method not implemented for this connector, execution environments must be deleted individually
-        pass
+        # Delete the whole per-NS namespace from the OSM system cluster,
+        # removing every execution environment deployed in it at once.
+        # NOTE(review): db_dict and total_timeout are kept for interface
+        # compatibility but are not used by this implementation.
+        # Obtain system cluster id from database
+        system_cluster_uuid = await self._get_system_cluster_id()
+        await self._k8sclusterhelm3.delete_namespace(
+            namespace=namespace,
+            cluster_uuid=system_cluster_uuid,
+        )
async def install_k8s_proxy_charm(
self,
else:
return "ERROR", "No result received"
- ssl_context = create_secure_context(self.vca_config.ca_store)
+ ssl_context = create_secure_context(
+ self.vca_config.ca_store,
+ self.vca_config.client_cert_path,
+ self.vca_config.client_key_path,
+ )
channel = Channel(
ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
)
return version, namespace, helm_id
+def vld_to_ro_ip_profile(source_data):
+    # Translate an OSM VLD "ip-profile" (or l3-protocol-data-shaped) dict
+    # into the flat ip_profile structure consumed by (NG-)RO deploy targets.
+    # Accepts both key spellings seen upstream (e.g. "cidr" vs
+    # "subnet-address", "gateway-ip" vs "gateway-address", "dhcp-params" vs
+    # "dhcp-enabled"). Returns None when source_data is empty/None (the
+    # implicit fall-through of the "if" below).
+    if source_data:
+        return {
+            # Any version string containing "v4" (default "ipv4") -> IPv4
+            "ip_version": "IPv4"
+            if "v4" in source_data.get("ip-version", "ipv4")
+            else "IPv6",
+            "subnet_address": source_data.get("cidr")
+            or source_data.get("subnet-address"),
+            "gateway_address": source_data.get("gateway-ip")
+            or source_data.get("gateway-address"),
+            # Semicolon-separated list of the DNS server addresses present
+            "dns_address": ";".join(
+                [v["address"] for v in source_data["dns-server"] if v.get("address")]
+            )
+            if source_data.get("dns-server")
+            else None,
+            "dhcp_enabled": source_data.get("dhcp-params", {}).get("enabled", False)
+            or source_data.get("dhcp-enabled", False),
+            "dhcp_start_address": source_data["dhcp-params"].get("start-address")
+            if source_data.get("dhcp-params")
+            else None,
+            "dhcp_count": source_data["dhcp-params"].get("count")
+            if source_data.get("dhcp-params")
+            else None,
+            "ipv6_address_mode": source_data["ipv6-address-mode"]
+            if "ipv6-address-mode" in source_data
+            else None,
+        }
+
+
class LcmBase:
def __init__(self, msg, logger):
"""
class NetsliceLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop, ns):
+ def __init__(self, msg, lcm_tasks, config, ns):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
# logging
self.logger = logging.getLogger("lcm.netslice")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ns = ns
self.ro_config = config["RO"]
db_nsilcmop_update = {}
nsilcmop_operation_state = None
vim_2_RO = {}
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
nsi_vld_instantiationi_params = {}
def ip_profile_2_RO(ip_profile):
break
# TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300)
- await asyncio.sleep(5, loop=self.loop)
+ await asyncio.sleep(5)
else: # timeout_nsi_deploy reached:
raise LcmException("Timeout waiting nsi to be ready.")
db_nsilcmop = None
db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
db_nsilcmop_update = {}
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
nsir_deployed = None
failed_detail = [] # annotates all failed error messages
nsilcmop_operation_state = None
)
break
- await asyncio.sleep(5, loop=self.loop)
+ await asyncio.sleep(5)
termination_timeout -= 5
if termination_timeout <= 0:
"operationState": nsilcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
timeout_large = 120
timeout_short = 30
- def __init__(self, loop, uri, **kwargs):
- self.loop = loop
+ def __init__(self, uri, **kwargs):
self.endpoint_url = uri
if not self.endpoint_url.endswith("/"):
self.endpoint_url += "/"
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/migrate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
url = "{}/ns/v1/{operation_type}/{nsr_id}".format(
self.endpoint_url, operation_type=operation_type, nsr_id=nsr_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
url = "{}/ns/v1/deploy/{nsr_id}/{action_id}".format(
self.endpoint_url, nsr_id=nsr_id, action_id=action_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("GET %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
async with session.get(url, headers=self.headers_req) as response:
async def delete(self, nsr_id):
try:
url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("DELETE %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
async with session.delete(url, headers=self.headers_req) as response:
"""
try:
response_text = ""
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
url = "{}/version".format(self.endpoint_url)
self.logger.debug("RO GET %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/recreate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
async with session.post(
url, headers=self.headers_req, data=payload_req
url = "{}/ns/v1/recreate/{nsr_id}/{action_id}".format(
self.endpoint_url, nsr_id=nsr_id, action_id=action_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("GET %s", url)
async with session.get(url, headers=self.headers_req) as response:
response_text = await response.read()
url = "{}/ns/v1/verticalscale/{nsr_id}".format(
self.endpoint_url, nsr_id=nsr_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
async with session.post(
url, headers=self.headers_req, data=payload_req
check_juju_bundle_existence,
get_charm_artifact_path,
get_ee_id_parts,
+ vld_to_ro_ip_profile,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
from time import time
from uuid import uuid4
-from random import randint
+from random import SystemRandom
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
+ EE_TLS_NAME = "ee-tls"
task_name_deploy_vca = "Deploying VCA"
-
- def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
+ rel_operation_types = {
+ "GE": ">=",
+ "LE": "<=",
+ "GT": ">",
+ "LT": "<",
+ "EQ": "==",
+ "NE": "!=",
+ }
+
+ def __init__(self, msg, lcm_tasks, config: LcmCfg):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.timeout = config.timeout
self.ro_config = config.RO
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db,
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
- loop=self.loop,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
)
kubectl_command=self.vca_config.kubectlpath,
juju_command=self.vca_config.jujupath,
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_k8s_db,
fs=self.fs,
db=self.db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
+ self.RO = NgRoClient(**self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
target_vim, target_vld, vld_params, target_sdn
):
if vld_params.get("ip-profile"):
- target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
- "ip-profile"
- ]
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
+ vld_params["ip-profile"]
+ )
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
"provider-network"
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("shared-volumes"):
+ target["shared-volumes"] = deepcopy(db_nsr["shared-volumes"])
+ for shared_volumes in target["shared-volumes"]:
+ shared_volumes["vim_info"] = {}
if db_nsr.get("affinity-or-anti-affinity-group"):
target["affinity-or-anti-affinity-group"] = deepcopy(
db_nsr["affinity-or-anti-affinity-group"]
and nsd_vlp.get("virtual-link-protocol-data")
and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
- "gateway-ip"
- ]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(
and vnfd_vlp.get("virtual-link-protocol-data")
and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data[
- "gateway-address"
- ] = ip_profile_source_data["gateway-ip"]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
-
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(
if target_vim not in ns_ags["vim_info"]:
ns_ags["vim_info"][target_vim] = {}
+ # shared-volumes
+ if vdur.get("shared-volumes-id"):
+ for sv_id in vdur["shared-volumes-id"]:
+ ns_sv = find_in_list(
+ target["shared-volumes"], lambda sv: sv_id in sv["id"]
+ )
+ if ns_sv:
+ ns_sv["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
if vnf_params:
vdu_instantiation_params, vdud
)
vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
+ vdur["additionalParams"]["OSM"][
+ "vim_flavor_id"
+ ] = vdu_instantiation_params.get("vim-flavor-id")
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"target KDU={} is in error state".format(kdu_name)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
ee_id, credentials = await self.vca_map[
vca_type
].create_execution_environment(
- namespace=namespace,
+ namespace=nsr_id,
reuse_ee_id=ee_id,
db_dict=db_dict,
config=osm_config,
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
- await self.msg.aiowrite(
- "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
- )
+ await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
+    def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
+        # Build the list of auto-healing alert descriptors for one VNF
+        # instance: one "vm_status" alert per healing-policy whose vdu-id
+        # matches a VDU record present in vnfr["vdur"]. Returns [] when the
+        # VNFD declares no healing-aspect.
+        # NOTE(review): only the first deployment flavour (df[0]) is
+        # inspected — confirm that multi-df VNFDs are out of scope here.
+        alerts = []
+        nsr_id = vnfr["nsr-id-ref"]
+        df = vnfd.get("df", [{}])[0]
+        # Checking for auto-healing configuration
+        if "healing-aspect" in df:
+            healing_aspects = df["healing-aspect"]
+            for healing in healing_aspects:
+                for healing_policy in healing.get("healing-policy", ()):
+                    vdu_id = healing_policy["vdu-id"]
+                    # First vdur whose vdu-id-ref matches the policy's vdu-id
+                    vdur = next(
+                        (vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]),
+                        {},
+                    )
+                    # Skip policies that reference a VDU with no instance record
+                    if not vdur:
+                        continue
+                    metric_name = "vm_status"
+                    vdu_name = vdur.get("name")
+                    vnf_member_index = vnfr["member-vnf-index-ref"]
+                    uuid = str(uuid4())
+                    name = f"healing_{uuid}"
+                    # The whole healing-policy dict is carried as the action
+                    action = healing_policy
+                    # action_on_recovery = healing.get("action-on-recovery")
+                    # cooldown_time = healing.get("cooldown-time")
+                    # day1 = healing.get("day1")
+                    alert = {
+                        "uuid": uuid,
+                        "name": name,
+                        "metric": metric_name,
+                        "tags": {
+                            "ns_id": nsr_id,
+                            "vnf_member_index": vnf_member_index,
+                            "vdu_name": vdu_name,
+                        },
+                        "alarm_status": "ok",
+                        "action_type": "healing",
+                        "action": action,
+                    }
+                    alerts.append(alert)
+        return alerts
+
+    def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
+        """Build the list of auto-scaling alert definitions for one VNFR.
+
+        :param vnfr: VNF record (dict) providing "nsr-id-ref",
+            "member-vnf-index-ref" and the "vdur" list
+        :param vnfd: VNF descriptor (dict); scaling configuration is read from
+            its first deployment flavour ("df") entry
+        :return: list of alert dicts, each embedding a Prometheus rule config
+        """
+        alerts = []
+        nsr_id = vnfr["nsr-id-ref"]
+        # Scaling configuration is taken from the first deployment flavour only
+        df = vnfd.get("df", [{}])[0]
+        # Checking for auto-scaling configuration
+        if "scaling-aspect" in df:
+            scaling_aspects = df["scaling-aspect"]
+            # Index every monitoring parameter declared anywhere in the VNFD
+            # (internal VLDs, VDUs and deployment flavours) by its id
+            all_vnfd_monitoring_params = {}
+            for ivld in vnfd.get("int-virtual-link-desc", ()):
+                for mp in ivld.get("monitoring-parameters", ()):
+                    all_vnfd_monitoring_params[mp.get("id")] = mp
+            for vdu in vnfd.get("vdu", ()):
+                for mp in vdu.get("monitoring-parameter", ()):
+                    all_vnfd_monitoring_params[mp.get("id")] = mp
+            # A dedicated loop variable is used here: reusing "df" would shadow
+            # the first-flavour "df" selected above and make the "vdu-profile"
+            # lookup below read the *last* flavour instead of the first one.
+            for flavour in vnfd.get("df", ()):
+                for mp in flavour.get("monitoring-parameter", ()):
+                    all_vnfd_monitoring_params[mp.get("id")] = mp
+            for scaling_aspect in scaling_aspects:
+                scaling_group_name = scaling_aspect.get("name", "")
+                # Get monitored VDUs
+                all_monitored_vdus = set()
+                for delta in scaling_aspect.get("aspect-delta-details", {}).get(
+                    "deltas", ()
+                ):
+                    for vdu_delta in delta.get("vdu-delta", ()):
+                        all_monitored_vdus.add(vdu_delta.get("id"))
+                monitored_vdurs = list(
+                    filter(
+                        lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus,
+                        vnfr["vdur"],
+                    )
+                )
+                if not monitored_vdurs:
+                    self.logger.error(
+                        "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
+                    )
+                    continue
+                for scaling_policy in scaling_aspect.get("scaling-policy", ()):
+                    if scaling_policy["scaling-type"] != "automatic":
+                        continue
+                    threshold_time = scaling_policy.get("threshold-time", "1")
+                    cooldown_time = scaling_policy.get("cooldown-time", "0")
+                    for scaling_criteria in scaling_policy["scaling-criteria"]:
+                        monitoring_param_ref = scaling_criteria.get(
+                            "vnf-monitoring-param-ref"
+                        )
+                        # NOTE(review): raises KeyError if the criteria refers
+                        # to an undeclared monitoring param — confirm that the
+                        # descriptor is validated upstream.
+                        vnf_monitoring_param = all_vnfd_monitoring_params[
+                            monitoring_param_ref
+                        ]
+                        for vdur in monitored_vdurs:
+                            vdu_id = vdur["vdu-id-ref"]
+                            metric_name = vnf_monitoring_param.get("performance-metric")
+                            metric_name = f"osm_{metric_name}"
+                            vnf_member_index = vnfr["member-vnf-index-ref"]
+                            scalein_threshold = scaling_criteria.get(
+                                "scale-in-threshold"
+                            )
+                            scaleout_threshold = scaling_criteria.get(
+                                "scale-out-threshold"
+                            )
+                            # Looking for min/max-number-of-instances
+                            instances_min_number = 1
+                            instances_max_number = 1
+                            # ".get" + "next(..., None)" keep the defaults when
+                            # the flavour has no vdu-profile, or no profile
+                            # entry for this VDU, instead of raising
+                            vdu_profile = df.get("vdu-profile")
+                            if vdu_profile:
+                                profile = next(
+                                    (
+                                        item
+                                        for item in vdu_profile
+                                        if item["id"] == vdu_id
+                                    ),
+                                    None,
+                                )
+                                if profile:
+                                    instances_min_number = profile.get(
+                                        "min-number-of-instances", 1
+                                    )
+                                    instances_max_number = profile.get(
+                                        "max-number-of-instances", 1
+                                    )
+
+                            if scalein_threshold:
+                                uuid = str(uuid4())
+                                name = f"scalein_{uuid}"
+                                operation = scaling_criteria[
+                                    "scale-in-relational-operation"
+                                ]
+                                rel_operator = self.rel_operation_types.get(
+                                    operation, "<="
+                                )
+                                # Only scale in while more than the minimum
+                                # number of instances are running
+                                metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+                                expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
+                                labels = {
+                                    "ns_id": nsr_id,
+                                    "vnf_member_index": vnf_member_index,
+                                    "vdu_id": vdu_id,
+                                }
+                                prom_cfg = {
+                                    "alert": name,
+                                    "expr": expression,
+                                    "for": str(threshold_time) + "m",
+                                    "labels": labels,
+                                }
+                                # "action" tells the alert consumer which
+                                # scaling group to act on (a dead
+                                # "action = scaling_policy" store was removed)
+                                action = {
+                                    "scaling-group": scaling_group_name,
+                                    "cooldown-time": cooldown_time,
+                                }
+                                alert = {
+                                    "uuid": uuid,
+                                    "name": name,
+                                    "metric": metric_name,
+                                    "tags": {
+                                        "ns_id": nsr_id,
+                                        "vnf_member_index": vnf_member_index,
+                                        "vdu_id": vdu_id,
+                                    },
+                                    "alarm_status": "ok",
+                                    "action_type": "scale_in",
+                                    "action": action,
+                                    "prometheus_config": prom_cfg,
+                                }
+                                alerts.append(alert)
+
+                            if scaleout_threshold:
+                                uuid = str(uuid4())
+                                name = f"scaleout_{uuid}"
+                                operation = scaling_criteria[
+                                    "scale-out-relational-operation"
+                                ]
+                                # NOTE(review): "<=" is the same fallback used
+                                # for scale-in; confirm it is intended for the
+                                # scale-out direction as well
+                                rel_operator = self.rel_operation_types.get(
+                                    operation, "<="
+                                )
+                                # Only scale out while fewer than the maximum
+                                # number of instances are running
+                                metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+                                expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
+                                labels = {
+                                    "ns_id": nsr_id,
+                                    "vnf_member_index": vnf_member_index,
+                                    "vdu_id": vdu_id,
+                                }
+                                prom_cfg = {
+                                    "alert": name,
+                                    "expr": expression,
+                                    "for": str(threshold_time) + "m",
+                                    "labels": labels,
+                                }
+                                action = {
+                                    "scaling-group": scaling_group_name,
+                                    "cooldown-time": cooldown_time,
+                                }
+                                alert = {
+                                    "uuid": uuid,
+                                    "name": name,
+                                    "metric": metric_name,
+                                    "tags": {
+                                        "ns_id": nsr_id,
+                                        "vnf_member_index": vnf_member_index,
+                                        "vdu_id": vdu_id,
+                                    },
+                                    "alarm_status": "ok",
+                                    "action_type": "scale_out",
+                                    "action": action,
+                                    "prometheus_config": prom_cfg,
+                                }
+                                alerts.append(alert)
+        return alerts
+
+    def _gather_vnfr_alarm_alerts(self, vnfr, vnfd):
+        """Build the list of VDU-alarm alert definitions for one VNFR.
+
+        For every VDUR whose VDU descriptor carries an "alarm" section, one
+        alert (with a Prometheus rule config) is produced per alarm descriptor
+        that references a valid VDU monitoring parameter.
+
+        :param vnfr: VNF record (dict) providing "nsr-id-ref",
+            "member-vnf-index-ref" and the "vdur" list
+        :param vnfd: VNF descriptor (dict) whose "vdu" entries may carry
+            "alarm" and "monitoring-parameter" sections
+        :return: list of alert dicts
+        """
+        alerts = []
+        nsr_id = vnfr["nsr-id-ref"]
+        vnf_member_index = vnfr["member-vnf-index-ref"]
+
+        # Checking for VNF alarm configuration
+        for vdur in vnfr["vdur"]:
+            vdu_id = vdur["vdu-id-ref"]
+            # NOTE(review): next() on the bare filter raises StopIteration if
+            # the VDUR references a VDU id absent from the VNFD — presumably
+            # guaranteed consistent by descriptor validation; confirm.
+            vdu = next(filter(lambda vdu: vdu["id"] == vdu_id, vnfd["vdu"]))
+            if "alarm" in vdu:
+                # Get VDU monitoring params, since alerts are based on them
+                vdu_monitoring_params = {}
+                for mp in vdu.get("monitoring-parameter", []):
+                    vdu_monitoring_params[mp.get("id")] = mp
+                if not vdu_monitoring_params:
+                    self.logger.error(
+                        "VDU alarm refers to a VDU monitoring param, but there are no VDU monitoring params in the VDU"
+                    )
+                    continue
+                # Get alarms in the VDU
+                alarm_descriptors = vdu["alarm"]
+                # Create VDU alarms for each alarm in the VDU
+                for alarm_descriptor in alarm_descriptors:
+                    # Check that the VDU alarm refers to a proper monitoring param
+                    alarm_monitoring_param = alarm_descriptor.get(
+                        "vnf-monitoring-param-ref", ""
+                    )
+                    vdu_specific_monitoring_param = vdu_monitoring_params.get(
+                        alarm_monitoring_param, {}
+                    )
+                    if not vdu_specific_monitoring_param:
+                        self.logger.error(
+                            "VDU alarm refers to a VDU monitoring param not present in the VDU"
+                        )
+                        continue
+                    metric_name = vdu_specific_monitoring_param.get(
+                        "performance-metric"
+                    )
+                    if not metric_name:
+                        self.logger.error(
+                            "VDU alarm refers to a VDU monitoring param that has no associated performance-metric"
+                        )
+                        continue
+                    # Set params of the alarm to be created in Prometheus
+                    # All OSM-collected metrics carry the "osm_" prefix
+                    metric_name = f"osm_{metric_name}"
+                    metric_threshold = alarm_descriptor.get("value")
+                    uuid = str(uuid4())
+                    alert_name = f"vdu_alarm_{uuid}"
+                    operation = alarm_descriptor["operation"]
+                    rel_operator = self.rel_operation_types.get(operation, "<=")
+                    metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+                    expression = f"{metric_selector} {rel_operator} {metric_threshold}"
+                    labels = {
+                        "ns_id": nsr_id,
+                        "vnf_member_index": vnf_member_index,
+                        "vdu_id": vdu_id,
+                        # Prometheus template: resolved per-series at rule
+                        # evaluation time, not here
+                        "vdu_name": "{{ $labels.vdu_name }}",
+                    }
+                    prom_cfg = {
+                        "alert": alert_name,
+                        "expr": expression,
+                        "for": "1m",  # default value. Ideally, this should be related to an IM param, but there is not such param
+                        "labels": labels,
+                    }
+                    # Collect the descriptor's per-state actions (if any)
+                    alarm_action = dict()
+                    for action_type in ["ok", "insufficient-data", "alarm"]:
+                        if (
+                            "actions" in alarm_descriptor
+                            and action_type in alarm_descriptor["actions"]
+                        ):
+                            alarm_action[action_type] = alarm_descriptor["actions"][
+                                action_type
+                            ]
+                    alert = {
+                        "uuid": uuid,
+                        "name": alert_name,
+                        "metric": metric_name,
+                        "tags": {
+                            "ns_id": nsr_id,
+                            "vnf_member_index": vnf_member_index,
+                            "vdu_id": vdu_id,
+                        },
+                        "alarm_status": "ok",
+                        "action_type": "vdu_alarm",
+                        "action": alarm_action,
+                        "prometheus_config": prom_cfg,
+                    }
+                    alerts.append(alert)
+        return alerts
+
def update_nsrs_with_pla_result(self, params):
try:
nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
# create namespace and certificate if any helm based EE is present in the NS
if check_helm_ee_in_ns(db_vnfds):
- # TODO: create EE namespace
+ await self.vca_map["helm-v3"].setup_ns_namespace(
+ name=nsr_id,
+ )
# create TLS certificates
await self.vca_map["helm-v3"].create_tls_certificate(
- secret_name="ee-tls-{}".format(nsr_id),
+ secret_name=self.EE_TLS_NAME,
dns_prefix="*",
nsr_id=nsr_id,
usage="server auth",
+ namespace=nsr_id,
)
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
stage=stage,
)
+ # Check if each vnf has exporter for metric collection if so update prometheus job records
+ if "exporters-endpoints" in vnfd.get("df")[0]:
+ exporter_config = vnfd.get("df")[0].get("exporters-endpoints")
+ self.logger.debug("exporter config :{}".format(exporter_config))
+ artifact_path = "{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "exporter-endpoint",
+ )
+ ee_id = None
+ ee_config_descriptor = exporter_config
+ vnfr_id = db_vnfr["id"]
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id=None,
+ vdu_index=None,
+ user=None,
+ pub_key=None,
+ )
+ self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip))
+ self.logger.debug("Artifact_path:{}".format(artifact_path))
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ vdu_id_for_prom = None
+ vdu_index_for_prom = None
+ for x in get_iterable(db_vnfr, "vdur"):
+ vdu_id_for_prom = x.get("vdu-id-ref")
+ vdu_index_for_prom = x.get("count-index")
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ element_type="VDU",
+ vdu_id=vdu_id_for_prom,
+ vdu_index=vdu_index_for_prom,
+ )
+
+ self.logger.debug("Prometheus job:{}".format(prometheus_jobs))
+ if prometheus_jobs:
+ db_nsr_update["_admin.deployed.prometheus_jobs"] = prometheus_jobs
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ db_nsr_update,
+ )
+
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {"job_name": job["job_name"]},
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
# Check if this NS has a charm configuration
descriptor_config = nsd.get("ns-configuration")
if descriptor_config and descriptor_config.get("juju"):
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
-
+ # Gather auto-healing and auto-scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ vnfd = next(
+ (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
+ )
+ healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
+ for alert in healing_alerts:
+ self.logger.info(f"Storing healing alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
+ for alert in scaling_alerts:
+ self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ alarm_alerts = self._gather_vnfr_alarm_alerts(vnfr, vnfd)
+ for alert in alarm_alerts:
+ self.logger.info(f"Storing VNF alarm alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
if db_nsr:
self._write_ns_status(
nsr_id=nsr_id,
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
# TODO vdu_index_count
for vca in vca_deployed_list:
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
- return vca["ee_id"]
+ return vca.get("ee_id")
async def destroy_N2VC(
self,
# Delete Namespace and Certificates if necessary
if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())):
await self.vca_map["helm-v3"].delete_tls_certificate(
- certificate_name=db_nslcmop["nsInstanceId"],
+ namespace=db_nslcmop["nsInstanceId"],
+ certificate_name=self.EE_TLS_NAME,
+ )
+ await self.vca_map["helm-v3"].delete_namespace(
+ namespace=db_nslcmop["nsInstanceId"],
)
- # TODO: Delete namespace
# Delete from k8scluster
stage[1] = "Deleting KDUs."
"operationState": nslcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
+ self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
+ self.db.del_list("alerts", {"tags.ns_id": nsr_id})
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
)
)
# wait and retry
- await asyncio.sleep(retries_interval, loop=self.loop)
+ await asyncio.sleep(retries_interval)
else:
if isinstance(e, asyncio.TimeoutError):
e = N2VCException(
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
+ if kdu_model.count("/") < 2: # helm chart is not embedded
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
if desc_params.get("kdu_atomic_upgrade"):
atomic_upgrade = desc_params.get(
"kdu_atomic_upgrade"
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
and member_vnf_index
):
msg.update({"vnf_member_index": member_vnf_index})
- await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ await self.msg.aiowrite("ns", change_type, msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
if not job_file:
return
+ self.logger.debug("Artifact path{}".format(artifact_path))
+ self.logger.debug("job file{}".format(job_file))
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
kdur_name = kdur.get("name")
break
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
else:
if vdu_id and vdu_index is not None:
raise LcmException(
)
# TODO get_service
- _, _, service = ee_id.partition(".") # remove prefix "namespace."
- host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
- host_port = "80"
- vnfr_id = vnfr_id.replace("-", "")
- variables = {
- "JOB_NAME": vnfr_id,
- "TARGET_IP": target_ip,
- "EXPORTER_POD_IP": host_name,
- "EXPORTER_POD_PORT": host_port,
- "NSR_ID": nsr_id,
- "VNF_MEMBER_INDEX": vnf_member_index,
- "VDUR_NAME": vdur_name,
- "KDUR_NAME": kdur_name,
- "ELEMENT_TYPE": element_type,
- }
+ if ee_id is not None:
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ "NSR_ID": nsr_id,
+ "VNF_MEMBER_INDEX": vnf_member_index,
+ "VDUR_NAME": vdur_name,
+ "KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
+ }
+ else:
+ metric_path = ee_config_descriptor["metric-path"]
+ target_port = ee_config_descriptor["metric-port"]
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "TARGET_PORT": target_port,
+ "METRIC_PATH": metric_path,
+ }
+
job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id
for job in job_list:
not isinstance(job.get("job_name"), str)
or vnfr_id not in job["job_name"]
):
- job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000))
job["nsr_id"] = nsr_id
job["vnfr_id"] = vnfr_id
return job_list
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "migrated", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
for target_vdu in target_vdu_list:
deploy_params_vdu = target_vdu
# Set run-day1 vnf level value if not vdu level value exists
- if not deploy_params_vdu.get("run-day1") and target_vnf[
- "additionalParams"
- ].get("run-day1"):
+ if not deploy_params_vdu.get("run-day1") and target_vnf.get(
+ "additionalParams", {}
+ ).get("run-day1"):
deploy_params_vdu["run-day1"] = target_vnf[
"additionalParams"
].get("run-day1")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "healed", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
lcm_helm_conn.K8sHelm3Connector
)
vca_config = VcaConfig(vca_config)
- self.helm_conn = LCMHelmConn(
- loop=self.loop, vca_config=vca_config, log=self.logger
- )
+ self.helm_conn = LCMHelmConn(vca_config=vca_config, log=self.logger)
@asynctest.fail_on(active_handles=True)
async def test_create_execution_environment(self):
)
self.assertEqual(
ee_id,
- "{}:{}.{}".format("helm-v3", "osm", helm_chart_id),
- "Check ee_id format: <helm-version>:<default namespace>.<helm_chart-id>",
+ "{}:{}.{}".format("helm-v3", namespace, helm_chart_id),
+ "Check ee_id format: <helm-version>:<NS ID>.<helm_chart-id>",
)
self.helm_conn._k8sclusterhelm3.install.assert_called_once_with(
"myk8s_id",
kdu_model="/helm_sample_charm",
kdu_instance=helm_chart_id,
- namespace="osm",
+ namespace=namespace,
db_dict=db_dict,
params=None,
timeout=None,
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
-from osm_lcm.lcm_utils import LcmBase, LcmException
+from osm_lcm.lcm_utils import LcmBase, LcmException, vld_to_ro_ip_profile
from osm_lcm.tests import test_db_descriptors as descriptors
tmpdir = tempfile.mkdtemp()[1]
with self.assertRaises(FileNotFoundError):
LcmBase.compare_charmdir_hash(charm_dir1, charm_dir2)
self.assertEqual(mock_checksum.dirhash.call_count, 1)
+
+
+class TestLcmUtils(TestCase):
+    """Unit tests for the vld_to_ro_ip_profile() helper.
+
+    Covers the None / empty / unrecognized inputs and the IPv4 and IPv6
+    profile translations from VLD (IM) keys to RO ip-profile keys.
+    """
+
+    def setUp(self):
+        pass
+
+    def test__vld_to_ro_ip_profile_with_none(self):
+        # A missing VLD yields no RO ip-profile at all
+        vld_data = None
+
+        result = vld_to_ro_ip_profile(
+            source_data=vld_data,
+        )
+
+        self.assertIsNone(result)
+
+    def test__vld_to_ro_ip_profile_with_empty_profile(self):
+        # An empty dict is treated the same as no profile
+        vld_data = {}
+
+        result = vld_to_ro_ip_profile(
+            source_data=vld_data,
+        )
+
+        self.assertIsNone(result)
+
+    def test__vld_to_ro_ip_profile_with_wrong_profile(self):
+        # Unrecognized keys are ignored: the helper still returns a profile
+        # populated entirely with defaults (IPv4, DHCP disabled, no addresses)
+        vld_data = {
+            "no-profile": "here",
+        }
+        expected_result = {
+            "ip_version": "IPv4",
+            "subnet_address": None,
+            "gateway_address": None,
+            "dns_address": None,
+            "dhcp_enabled": False,
+            "dhcp_start_address": None,
+            "dhcp_count": None,
+            "ipv6_address_mode": None,
+        }
+
+        result = vld_to_ro_ip_profile(
+            source_data=vld_data,
+        )
+
+        self.assertDictEqual(expected_result, result)
+
+    def test__vld_to_ro_ip_profile_with_ipv4_profile(self):
+        # IPv4: cidr/gateway/dhcp map across; only the first dns-server
+        # address is carried over into "dns_address"
+        vld_data = {
+            "ip-version": "ipv4",
+            "cidr": "192.168.0.0/24",
+            "gateway-ip": "192.168.0.254",
+            "dhcp-enabled": True,
+            "dns-server": [{"address": "8.8.8.8"}],
+        }
+        expected_result = {
+            "ip_version": "IPv4",
+            "subnet_address": "192.168.0.0/24",
+            "gateway_address": "192.168.0.254",
+            "dns_address": "8.8.8.8",
+            "dhcp_enabled": True,
+            "dhcp_start_address": None,
+            "dhcp_count": None,
+            "ipv6_address_mode": None,
+        }
+
+        result = vld_to_ro_ip_profile(
+            source_data=vld_data,
+        )
+
+        self.assertDictEqual(expected_result, result)
+
+    def test__vld_to_ro_ip_profile_with_ipv6_profile(self):
+        # IPv6: version string is normalized to "IPv6"; address mode stays
+        # unset when the VLD does not specify one
+        vld_data = {
+            "ip-version": "ipv6",
+            "cidr": "2001:0200:0001::/48",
+            "gateway-ip": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
+            "dhcp-enabled": True,
+        }
+        expected_result = {
+            "ip_version": "IPv6",
+            "subnet_address": "2001:0200:0001::/48",
+            "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
+            "dns_address": None,
+            "dhcp_enabled": True,
+            "dhcp_start_address": None,
+            "dhcp_count": None,
+            "ipv6_address_mode": None,
+        }
+
+        result = vld_to_ro_ip_profile(
+            source_data=vld_data,
+        )
+
+        self.assertDictEqual(expected_result, result)
("active", "Ready!"),
):
# call callback after some time
- asyncio.sleep(5, loop=self.loop)
+ asyncio.sleep(5)
callback(model_name, application_name, status, message, *callback_args)
@staticmethod
ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn)
def create_nslcm_class(self):
- self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config, self.loop)
+ self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config)
self.my_ns.fs = self.fs
self.my_ns.db = self.db
self.my_ns._wait_dependent_n2vc = asynctest.CoroutineMock()
def mock_ro(self):
if not getenv("OSMLCMTEST_RO_NOMOCK"):
- self.my_ns.RO = asynctest.Mock(
- NgRoClient(self.loop, **lcm_config.RO.to_dict())
- )
+ self.my_ns.RO = asynctest.Mock(NgRoClient(**lcm_config.RO.to_dict()))
# TODO first time should be empty list, following should return a dict
# self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[])
self.my_ns.RO.deploy = asynctest.CoroutineMock(
# "description": "done"}})
self.my_ns.RO.delete = asynctest.CoroutineMock(self.my_ns.RO.delete)
- # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
- # async def test_instantiate(self):
- # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
- # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
- # # print("Test instantiate started")
-
- # # delete deployed information of database
- # if not getenv("OSMLCMTEST_DB_NOMOCK"):
- # if self.db.get_list("nsrs")[0]["_admin"].get("deployed"):
- # del self.db.get_list("nsrs")[0]["_admin"]["deployed"]
- # for db_vnfr in self.db.get_list("vnfrs"):
- # db_vnfr.pop("ip_address", None)
- # for db_vdur in db_vnfr["vdur"]:
- # db_vdur.pop("ip_address", None)
- # db_vdur.pop("mac_address", None)
- # if getenv("OSMLCMTEST_RO_VIMID"):
- # self.db.get_list("vim_accounts")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
- # if getenv("OSMLCMTEST_RO_VIMID"):
- # self.db.get_list("nsrs")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
-
- # await self.my_ns.instantiate(nsr_id, nslcmop_id)
-
- # self.msg.aiowrite.assert_called_once_with("ns", "instantiated",
- # {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
- # "operationState": "COMPLETED"},
- # loop=self.loop)
- # self.lcm_tasks.lock_HA.assert_called_once_with('ns', 'nslcmops', nslcmop_id)
- # if not getenv("OSMLCMTEST_LOGGING_NOMOCK"):
- # self.assertTrue(self.my_ns.logger.debug.called, "Debug method not called")
- # self.my_ns.logger.error.assert_not_called()
- # self.my_ns.logger.exception().assert_not_called()
-
- # if not getenv("OSMLCMTEST_DB_NOMOCK"):
- # self.assertTrue(self.db.set_one.called, "db.set_one not called")
- # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
- # self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
- # for vnfr in db_vnfrs_list:
- # self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
-
- # if not getenv("OSMLCMTEST_VCA_NOMOCK"):
- # # check intial-primitives called
- # self.assertTrue(self.my_ns.n2vc.exec_primitive.called,
- # "Exec primitive not called for initial config primitive")
- # for _call in self.my_ns.n2vc.exec_primitive.call_args_list:
- # self.assertIn(_call[1]["primitive_name"], ("config", "touch"),
- # "called exec primitive with a primitive different than config or touch")
-
- # # TODO add more checks of called methods
- # # TODO add a terminate
-
- # async def test_instantiate_ee_list(self):
- # # Using modern IM where configuration is in the new format of execution_environment_list
- # ee_descriptor_id = "charm_simple"
- # non_used_initial_primitive = {
- # "name": "not_to_be_called",
- # "seq": 3,
- # "execution-environment-ref": "not_used_ee"
- # }
- # ee_list = [
- # {
- # "id": ee_descriptor_id,
- # "juju": {"charm": "simple"},
-
- # },
- # ]
-
- # self.db.set_one(
- # "vnfds",
- # q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"},
- # update_dict={"vnf-configuration.0.execution-environment-list": ee_list,
- # "vnf-configuration.0.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.initial-config-primitive.2": non_used_initial_primitive,
- # "vnf-configuration.0.config-primitive.0.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.config-primitive.0.execution-environment-primitive": "touch_charm",
- # },
- # unset={"vnf-configuration.juju": None})
- # await self.test_instantiate()
- # # this will check that the initial-congig-primitive 'not_to_be_called' is not called
-
class TestMyNS(TestBaseNS):
@asynctest.fail_on(active_handles=True)
self.msg = Mock(msgbase.MsgBase())
self.lcm_tasks = Mock()
self.config = {"database": {"driver": "mongo"}}
- self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config)
self.vca_lcm.db = Mock()
self.vca_lcm.fs = Mock()
"kubectlpath": "/usr/bin/kubectl",
}
}
- self.k8scluster_lcm = K8sClusterLcm(
- self.msg, self.lcm_tasks, self.vca_config, self.loop
- )
+ self.k8scluster_lcm = K8sClusterLcm(self.msg, self.lcm_tasks, self.vca_config)
self.k8scluster_lcm.db = Mock()
self.k8scluster_lcm.fs = Mock()
),
}
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_vim_update["_admin.deployed.RO"] = None
db_vim_update["_admin.detailed-status"] = step
self.update_db_2("vim_accounts", vim_id, db_vim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
step = "Editing vim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
and db_vim["_admin"]["deployed"].get("RO")
):
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching vim from RO tenant"
try:
await RO.detach("vim_account", RO_vim_id)
# values that are encrypted at wim config because they are passwords
wim_config_encrypted = ()
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
step = "Creating wim at RO"
db_wim_update["_admin.detailed-status"] = step
self.update_db_2("wim_accounts", wim_id, db_wim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
step = "Editing wim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
and db_wim["_admin"]["deployed"].get("RO")
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching wim from RO tenant"
try:
await RO.detach("wim_account", RO_wim_id)
class SdnLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.sdn")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_sdn_update["_admin.detailed-status"] = step
self.update_db_2("sdns", sdn_id, db_sdn_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
sdn_RO.pop("_admin", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Editing sdn at RO"
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Deleting sdn from RO"
try:
await RO.delete("sdn", RO_sdn_id)
class K8sClusterLcm(LcmBase):
timeout_create = 300
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8scluster")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]
kubectl_command=self.vca_config.get("kubectlpath"),
juju_command=self.vca_config.get("jujupath"),
log=self.logger,
- loop=self.loop,
on_update_db=None,
db=self.db,
fs=self.fs,
db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"
if k8s_h3c_id:
- step = "Removing helm-chart-v3 '{}'".format(k8s_hc_id)
+ step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
uninstall_sw = (
deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
or False
class VcaLcm(LcmBase):
timeout_create = 30
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vca")
- self.loop = loop
self.lcm_tasks = lcm_tasks
super().__init__(msg, self.logger)
# create N2VC connector
- self.n2vc = N2VCJujuConnector(
- log=self.logger, loop=self.loop, fs=self.fs, db=self.db
- )
+ self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)
def _get_vca_by_id(self, vca_id: str) -> dict:
db_vca = self.db.get_one("vca", {"_id": vca_id})
class K8sRepoLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8srepo")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# google-auth
-certifi==2022.12.7
+certifi==2023.5.7
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# kubernetes
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# cryptography
# pynacl
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# requests
-cryptography==39.0.1
+cryptography==40.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# paramiko
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
-google-auth==2.16.1
+dnspython==2.3.0
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
+ # pymongo
+google-auth==2.17.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# kubernetes
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# juju
-motor==1.3.1
+ # theblues
+motor==3.1.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# requests-oauthlib
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@paas
# via -r requirements-dev.in
-packaging==23.0
+packaging==23.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# aiokafka
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# macaroonbakery
# temporalio
-pyasn1==0.4.8
+pyasn1==0.5.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# juju
# pyasn1-modules
# rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# google-auth
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# macaroonbakery
-pymongo==3.13.0
+pymongo==4.3.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# kubernetes
# temporalio
-pytz==2022.7.1
+pytz==2023.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# pyrfc3339
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# juju
# kubernetes
-requests==2.28.2
+requests==2.30.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# hvac
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# juju
-urllib3==1.26.14
+urllib3==2.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# kubernetes
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# kubernetes
-websockets==8.1
+websockets==11.0.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=paas
# juju
#######################################################################################
asynctest==0.13.0
# via -r requirements-test.in
-coverage==7.2.3
+coverage==7.2.5
# via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
# via -r requirements-test.in
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
parameterized==0.9.0
# via -r requirements-test.in
jinja2
pyyaml==5.4.1
pydantic
+protobuf==3.20.3
temporalio
# via
# -r requirements.in
# aiohttp
-attrs==22.2.0
+attrs==23.1.0
# via
# aiohttp
# glom
# via
# face
# glom
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via aiohttp
checksumdir==1.2.0
# via -r requirements.in
# via
# aiohttp
# aiosignal
-glom==23.1.1
+glom==23.3.0
# via config-man
-grpcio==1.51.3
+grpcio==1.54.2
# via grpcio-tools
grpcio-tools==1.48.1
# via -r requirements.in
-grpclib==0.4.3
+grpclib==0.4.4
# via -r requirements.in
h2==4.1.0
# via grpclib
# yarl
protobuf==3.20.3
# via
+ # -r requirements.in
# grpcio-tools
# temporalio
-pydantic==1.10.5
+pydantic==1.10.7
# via -r requirements.in
python-dateutil==2.8.2
# via temporalio
# via
# pydantic
# temporalio
-yarl==1.8.2
+yarl==1.9.2
# via aiohttp
# The following packages are considered to be unsafe in a requirements file:
[testenv]
usedevelop = True
-basepython = python3.8
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
-r{toxinidir}/requirements-test.txt
pylint
commands =
- pylint -E osm_lcm --extension-pkg-whitelist=pydantic --disable=E1101 # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961)
+ pylint -E osm_lcm --extension-pkg-allow-list=pydantic --disable=E1101 # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961)
#######################################################################################
[testenv:pip-compile]
deps = pip-tools==6.6.2
skip_install = true
-whitelist_externals = bash
+allowlist_externals = bash
[
commands =
- bash -c "for file in requirements*.in ; do \
python3 setup.py --command-packages=stdeb.command sdist_dsc
sh -c 'cd deb_dist/osm-lcm*/ && dpkg-buildpackage -rfakeroot -uc -us'
sh -c 'rm osm_lcm/requirements.txt'
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
[flake8]