self.n2vc = N2VCJujuConnector(
log=self.logger,
loop=self.loop,
- url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
- username=self.vca_config.get('user', None),
- vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
loop=self.loop,
- url=None,
- username=None,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db
)
log=self.logger,
loop=self.loop,
on_update_db=self._on_update_k8s_db,
- vca_config=self.vca_config,
fs=self.fs,
db=self.db
)
except Exception as e:
self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
- async def _on_update_n2vc_db(self, table, filter, path, updated_data):
+ async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
# remove last dot from path (if exists)
if path.endswith('.'):
current_ns_status = nsr.get('nsState')
# get vca status for NS
- status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
+ status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id)
# vcaStatus
db_dict = dict()
db_dict['vcaStatus'] = status_dict
- await self.n2vc.update_vca_status(db_dict['vcaStatus'])
+ await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id)
# update configurationStatus for this VCA
try:
except Exception as e:
self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
- async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
+ async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None):
"""
Updating vca status in NSR record
:param cluster_uuid: UUID of a k8s cluster
nsr_id = filter.get('_id')
# get vca status for NS
- vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
- kdu_instance,
- complete_status=True,
- yaml_format=False)
+ vca_status = await self.k8sclusterjuju.status_kdu(
+ cluster_uuid,
+ kdu_instance,
+ complete_status=True,
+ yaml_format=False,
+ vca_id=vca_id,
+ )
# vcaStatus
db_dict = dict()
db_dict['vcaStatus'] = {nsr_id: vca_status}
- await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance)
+ await self.k8sclusterjuju.update_vca_status(
+ db_dict['vcaStatus'],
+ kdu_instance,
+ vca_id=vca_id,
+ )
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
raise LcmException("Configuration aborted because dependent charm/s timeout")
+ def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
+ return (
+ deep_get(db_vnfr, ("vca-id",)) or
+ deep_get(db_nsr, ("instantiate_params", "vcaId"))
+ )
+
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
ee_config_descriptor):
# find old ee_id if exists
ee_id = vca_deployed.get("ee_id")
- vim_account_id = (
- deep_get(db_vnfr, ("vim-account-id",)) or
- deep_get(deploy_params, ("OSM", "vim_account_id"))
- )
- vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
- vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
namespace=namespace,
artifact_path=artifact_path,
db_dict=db_dict,
- cloud_name=vca_k8s_cloud,
- credential_name=vca_k8s_cloud_credential,
+ vca_id=vca_id,
)
elif vca_type == "helm" or vca_type == "helm-v3":
ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
- cloud_name=vca_cloud,
- credential_name=vca_cloud_credential,
+ vca_id=vca_id,
)
elif vca_type == "native_charm":
credentials=credentials,
namespace=namespace,
db_dict=db_dict,
- cloud_name=vca_cloud,
- credential_name=vca_cloud_credential,
+ vca_id=vca_id,
)
# for compatibility with MON/POL modules, the need model and application name at database
db_dict=db_dict,
config=config,
num_units=num_units,
+ vca_id=vca_id,
)
# write in db flag of configuration_sw already installed
# add relations for this VCA (wait for other peers related with this VCA)
await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
- vca_index=vca_index, vca_type=vca_type)
+ vca_index=vca_index, vca_id=vca_id, vca_type=vca_type)
# if SSH access is required, then get execution environment SSH public
# if native charm we have waited already to VM be UP
# Needed to inject a ssh key
user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
step = "Install configuration Software, getting public ssh key"
- pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
+ ee_id=ee_id,
+ db_dict=db_dict,
+ vca_id=vca_id
+ )
step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
else:
ee_id=ee_id,
primitive_name=initial_config_primitive["name"],
params_dict=primitive_params_,
- db_dict=db_dict
+ db_dict=db_dict,
+ vca_id=vca_id,
)
# Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
if check_if_terminated_needed:
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
- async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
- timeout: int = 3600, vca_type: str = None) -> bool:
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_index: int,
+ timeout: int = 3600,
+ vca_type: str = None,
+ vca_id: str = None,
+ ) -> bool:
# steps:
# 1. find all relations for this VCA
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
- endpoint_2=to_vca_endpoint)
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
# remove entry from relations list
ns_relations.remove(r)
else:
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
- endpoint_2=to_vca_endpoint)
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
# remove entry from relations list
vnf_relations.remove(r)
else:
return False
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
- vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600,
+ vca_id: str = None):
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
kdu_name=k8s_instance_info["kdu-name"],
namespace=k8s_instance_info["namespace"],
kdu_instance=kdu_instance,
+ vca_id=vca_id,
)
self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_instance=kdu_instance,
primitive_name=initial_config_primitive["name"],
- params=primitive_params_, db_dict=db_dict_install),
- timeout=timeout)
+ params=primitive_params_, db_dict=db_dict_install,
+ vca_id=vca_id,
+ ),
+ timeout=timeout
+ )
except Exception as e:
# Prepare update db with error and raise exception
updated_v3_cluster_list = []
for vnfr_data in db_vnfrs.values():
+ vca_id = self.get_vca_id(vnfr_data, {})
for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
# Step 0: Prepare and set parameters
desc_params = parse_yaml_strings(kdur.get("additionalParams"))
vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
task = asyncio.ensure_future(
self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
- k8s_instance_info, k8params=desc_params, timeout=600))
+ k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
return vca["ee_id"]
- async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
- vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
+ async def destroy_N2VC(
+ self,
+ logging_text,
+ db_nslcmop,
+ vca_deployed,
+ config_descriptor,
+ vca_index,
+ destroy_ee=True,
+ exec_primitives=True,
+ scaling_in=False,
+ vca_id: str = None,
+ ):
"""
Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
:param logging_text:
mapped_primitive_params)
# Sub-operations: Call _ns_execute_primitive() instead of action()
try:
- result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
- mapped_primitive_params,
- vca_type=vca_type)
+ result, result_detail = await self._ns_execute_primitive(
+ vca_deployed["ee_id"], primitive,
+ mapped_primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
except LcmException:
# this happens when VCA is not deployed. In this case it is not needed to terminate
continue
await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
if destroy_ee:
- await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
+ await self.vca_map[vca_type].delete_execution_environment(
+ vca_deployed["ee_id"],
+ scaling_in=scaling_in,
+ vca_id=vca_id,
+ )
- async def _delete_all_N2VC(self, db_nsr: dict):
+ async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
namespace = "." + db_nsr["_id"]
try:
- await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+ await self.n2vc.delete_namespace(
+ namespace=namespace,
+ total_timeout=self.timeout_charm_delete,
+ vca_id=vca_id,
+ )
except N2VCNotFound: # already deleted. Skip
pass
self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
stage[1] = "Getting vnf descriptors from db."
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list}
db_vnfds_from_id = {}
db_vnfds_from_member_index = {}
# Loop over VNFRs
for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
config_descriptor = None
+
+ vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr)
if not vca or not vca.get("ee_id"):
continue
if not vca.get("member-vnf-index"):
# self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
# vca_index, vca.get("ee_id"), vca_type, destroy_ee))
task = asyncio.ensure_future(
- self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
- destroy_ee, exec_terminate_primitives))
+ self.destroy_N2VC(
+ logging_text,
+ db_nslcmop,
+ vca,
+ config_descriptor,
+ vca_index,
+ destroy_ee,
+ exec_terminate_primitives,
+ vca_id=vca_id,
+ )
+ )
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
# wait for pending tasks of terminate primitives
if nsr_deployed.get("VCA"):
stage[1] = "Deleting all execution environments."
self.logger.debug(logging_text + stage[1])
- task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
- timeout=self.timeout_charm_delete))
+ vca_id = self.get_vca_id({}, db_nsr)
+ task_delete_ee = asyncio.ensure_future(
+ asyncio.wait_for(
+ self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
+ timeout=self.timeout_charm_delete
+ )
+ )
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
tasks_dict_info[task_delete_ee] = "Terminating all VCA"
continue
kdu_instance = kdu.get("kdu-instance")
if kdu.get("k8scluster-type") in self.k8scluster_map:
+ # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
+ vca_id = self.get_vca_id({}, db_nsr)
task_delete_kdu_instance = asyncio.ensure_future(
self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu_instance))
+ kdu_instance=kdu_instance,
+ vca_id=vca_id,
+ )
+ )
else:
self.logger.error(logging_text + "Unknown k8s deployment type {}".
format(kdu.get("k8scluster-type")))
.format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
return ee_id, vca_type
- async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
- timeout=None, vca_type=None, db_dict=None) -> (str, str):
+ async def _ns_execute_primitive(
+ self,
+ ee_id,
+ primitive,
+ primitive_params,
+ retries=0,
+ retries_interval=30,
+ timeout=None,
+ vca_type=None,
+ db_dict=None,
+ vca_id: str = None,
+ ) -> (str, str):
try:
if primitive == "config":
primitive_params = {"params": primitive_params}
params_dict=primitive_params,
progress_timeout=self.timeout_progress_primitive,
total_timeout=self.timeout_primitive,
- db_dict=db_dict),
+ db_dict=db_dict,
+ vca_id=vca_id,
+ ),
timeout=timeout or self.timeout_primitive)
# execution was OK
break
self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_id = self.get_vca_id({}, db_nsr)
if db_nsr['_admin']['deployed']['K8s']:
for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
- await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id})
+ await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id)
else:
for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
table, filter = "nsrs", {"_id": nsr_id}
step = "Getting nsd from database"
db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance")),
- timeout=timeout_ns_action)
+ kdu_instance=kdu.get("kdu-instance"),
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action
+ )
else:
kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
kdu_instance=kdu_instance,
primitive_name=primitive_name,
params=params, db_dict=db_dict,
- timeout=timeout_ns_action),
- timeout=timeout_ns_action)
+ timeout=timeout_ns_action,
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action
+ )
if detailed_status:
nslcmop_operation_state = 'COMPLETED'
primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
timeout=timeout_ns_action,
vca_type=vca_type,
- db_dict=db_dict)
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
db_nslcmop_update["detailed-status"] = detailed_status
error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
step = "Getting vnfr from database"
db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- ee_id, primitive_name, primitive_params, vca_type=vca_type)
+ ee_id, primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
operation_params = db_nslcmop.get("operationParams") or {}
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
- task = asyncio.ensure_future(asyncio.wait_for(
- self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, destroy_ee=True,
- exec_primitives=exec_terminate_primitives,
- scaling_in=True), timeout=self.timeout_charm_delete))
+ task = asyncio.ensure_future(
+ asyncio.wait_for(
+ self.destroy_N2VC(
+ logging_text,
+ db_nslcmop,
+ vca,
+ config_descriptor,
+ vca_index,
+ destroy_ee=True,
+ exec_primitives=exec_terminate_primitives,
+ scaling_in=True,
+ vca_id=vca_id,
+ ),
+ timeout=self.timeout_charm_delete
+ )
+ )
# wait before next removal
await asyncio.sleep(30)
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- ee_id, primitive_name, primitive_params, vca_type=vca_type)
+ ee_id,
+ primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from unittest import TestCase
+from unittest.mock import Mock, patch, MagicMock
+
+
+from osm_common import msgbase
+from osm_common.dbbase import DbException
+from osm_lcm.vim_sdn import VcaLcm
+
+
+class AsyncMock(MagicMock):
+ async def __call__(self, *args, **kwargs):
+ return super(AsyncMock, self).__call__(*args, **kwargs)
+
+
+class TestVcaLcm(TestCase):
+ @patch("osm_lcm.lcm_utils.Database")
+ @patch("osm_lcm.lcm_utils.Filesystem")
+ def setUp(self, mock_filesystem, mock_database):
+ self.loop = asyncio.get_event_loop()
+ self.msg = Mock(msgbase.MsgBase())
+ self.lcm_tasks = Mock()
+ self.config = {"database": {"driver": "mongo"}}
+ self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.vca_lcm.db = Mock()
+ self.vca_lcm.fs = Mock()
+
+ def test_vca_lcm_create(self):
+ vca_content = {"op_id": "order-id", "_id": "id"}
+ db_vca = {
+ "_id": "vca-id",
+ "secret": "secret",
+ "cacert": "cacert",
+ "schema_version": "1.11",
+ }
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.vca_lcm.db.get_one.return_value = db_vca
+ self.vca_lcm.n2vc.validate_vca = AsyncMock()
+ self.vca_lcm.update_db_2 = Mock()
+
+ self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id))
+
+ self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id")
+ self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with(
+ db_vca,
+ "decrypt",
+ ["secret", "cacert"],
+ schema_version="1.11",
+ salt="vca-id",
+ )
+ self.vca_lcm.update_db_2.assert_called_with(
+ "vca",
+ "id",
+ {
+ "_admin.operationalState": "ENABLED",
+ "_admin.detailed-status": "Connectivity: ok",
+ },
+ )
+ self.lcm_tasks.unlock_HA.assert_called_with(
+ "vca",
+ "create",
+ "order-id",
+ operationState="COMPLETED",
+ detailed_status="VCA validated",
+ )
+ self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+ def test_vca_lcm_create_exception(self):
+ vca_content = {"op_id": "order-id", "_id": "id"}
+ db_vca = {
+ "_id": "vca-id",
+ "secret": "secret",
+ "cacert": "cacert",
+ "schema_version": "1.11",
+ }
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.vca_lcm.db.get_one.return_value = db_vca
+ self.vca_lcm.n2vc.validate_vca = AsyncMock()
+ self.vca_lcm.n2vc.validate_vca.side_effect = Exception("failed")
+ self.vca_lcm.update_db_2 = Mock()
+ self.vca_lcm.update_db_2.side_effect = DbException("failed")
+ self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id))
+
+ self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id")
+ self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with(
+ db_vca,
+ "decrypt",
+ ["secret", "cacert"],
+ schema_version="1.11",
+ salt="vca-id",
+ )
+ self.vca_lcm.update_db_2.assert_called_with(
+ "vca",
+ "id",
+ {
+ "_admin.operationalState": "ERROR",
+ "_admin.detailed-status": "Failed with exception: failed",
+ },
+ )
+ self.lcm_tasks.unlock_HA.assert_not_called()
+ self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+ def test_vca_lcm_delete(self):
+ vca_content = {"op_id": "order-id", "_id": "id"}
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.vca_lcm.update_db_2 = Mock()
+
+ self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id))
+
+ self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id")
+ self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"})
+ self.vca_lcm.update_db_2.assert_called_with("vca", "id", None)
+ self.lcm_tasks.unlock_HA.assert_called_with(
+ "vca",
+ "delete",
+ "order-id",
+ operationState="COMPLETED",
+ detailed_status="deleted",
+ )
+ self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+ def test_vca_lcm_delete_exception(self):
+ vca_content = {"op_id": "order-id", "_id": "id"}
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.vca_lcm.update_db_2 = Mock()
+ self.vca_lcm.db.del_one.side_effect = Exception("failed deleting")
+ self.vca_lcm.update_db_2.side_effect = DbException("failed")
+
+ self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id))
+
+ self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id")
+ self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"})
+ self.vca_lcm.update_db_2.assert_called_with(
+ "vca",
+ "id",
+ {
+ "_admin.operationalState": "ERROR",
+ "_admin.detailed-status": "Failed with exception: failed deleting",
+ },
+ )
+ self.lcm_tasks.unlock_HA.not_called()
+ self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.k8s_juju_conn import K8sJujuConnector
+from n2vc.n2vc_juju_conn import N2VCJujuConnector
from n2vc.exceptions import K8sException, N2VCException
from osm_common.dbbase import DbException
from copy import deepcopy
log=self.logger,
loop=self.loop,
on_update_db=None,
- vca_config=self.vca_config,
db=self.db,
fs=self.fs
)
for task_name in ("helm-chart", "juju-bundle", "helm-chart-v3"):
if init_target and task_name not in init_target:
continue
- task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials,
- reuse_cluster_uuid=k8scluster_id))
+ task = asyncio.ensure_future(
+ self.k8s_map[task_name].init_env(
+ k8s_credentials,
+ reuse_cluster_uuid=k8scluster_id,
+ vca_id=db_k8scluster.get("vca_id"),
+ )
+ )
pending_tasks.append(task)
task2name[task] = task_name
if k8s_jb_id: # delete in reverse order of creation
step = "Removing juju-bundle '{}'".format(k8s_jb_id)
uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False
- cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw)
+ cluster_removed = await self.juju_k8scluster.reset(
+ cluster_uuid=k8s_jb_id,
+ uninstall_sw=uninstall_sw,
+ vca_id=db_k8scluster.get("vca_id"),
+ )
db_k8scluster_update["_admin.juju-bundle.id"] = None
db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"
self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
+class VcaLcm(LcmBase):
+ timeout_create = 30
+
+ def __init__(self, msg, lcm_tasks, config, loop):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.vca")
+ self.loop = loop
+ self.lcm_tasks = lcm_tasks
+
+ super().__init__(msg, self.logger)
+
+ # create N2VC connector
+ self.n2vc = N2VCJujuConnector(
+ log=self.logger,
+ loop=self.loop,
+ fs=self.fs,
+ db=self.db
+ )
+
+ def _get_vca_by_id(self, vca_id: str) -> dict:
+ db_vca = self.db.get_one("vca", {"_id": vca_id})
+ self.db.encrypt_decrypt_fields(
+ db_vca,
+ "decrypt",
+ ["secret", "cacert"],
+ schema_version=db_vca["schema_version"], salt=db_vca["_id"]
+ )
+ return db_vca
+
+ async def create(self, vca_content, order_id):
+ op_id = vca_content.pop("op_id", None)
+ if not self.lcm_tasks.lock_HA("vca", "create", op_id):
+ return
+
+ vca_id = vca_content["_id"]
+ self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter"))
+
+ db_vca = None
+ db_vca_update = {}
+
+ try:
+ self.logger.debug("Task vca_create={} {}".format(vca_id, "Getting vca from db"))
+ db_vca = self._get_vca_by_id(vca_id)
+
+ task = asyncio.ensure_future(
+ asyncio.wait_for(
+ self.n2vc.validate_vca(db_vca["_id"]),
+ timeout=self.timeout_create,
+ )
+ )
+
+ await asyncio.wait([task], return_when=asyncio.FIRST_COMPLETED)
+ if task.exception():
+ raise task.exception()
+ self.logger.debug("Task vca_create={} {}".format(vca_id, "vca registered and validated successfully"))
+ db_vca_update["_admin.operationalState"] = "ENABLED"
+ db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
+ operation_details = "VCA validated"
+ operation_state = "COMPLETED"
+
+ self.logger.debug("Task vca_create={} {}".format(vca_id, "Done. Result: {}".format(operation_state)))
+
+ except Exception as e:
+ error_msg = "Failed with exception: {}".format(e)
+ self.logger.error("Task vca_create={} {}".format(vca_id, error_msg))
+ db_vca_update["_admin.operationalState"] = "ERROR"
+ db_vca_update["_admin.detailed-status"] = error_msg
+ operation_state = "FAILED"
+ operation_details = error_msg
+ finally:
+ try:
+ self.update_db_2("vca", vca_id, db_vca_update)
+
+ # Register the operation and unlock
+ self.lcm_tasks.unlock_HA(
+ "vca",
+ "create",
+ op_id,
+ operationState=operation_state,
+ detailed_status=operation_details
+ )
+ except DbException as e:
+ self.logger.error("Task vca_create={} {}".format(vca_id, "Cannot update database: {}".format(e)))
+ self.lcm_tasks.remove("vca", vca_id, order_id)
+
+ async def delete(self, vca_content, order_id):
+
+ # HA tasks and backward compatibility:
+ # If "vim_content" does not include "op_id", we a running a legacy NBI version.
+ # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
+ # Register "delete" task here for related future HA operations
+ op_id = vca_content.pop("op_id", None)
+ if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
+ return
+
+ db_vca_update = {}
+ vca_id = vca_content["_id"]
+
+ try:
+ self.logger.debug("Task vca_delete={} {}".format(vca_id, "Deleting vca from db"))
+ self.db.del_one("vca", {"_id": vca_id})
+ db_vca_update = None
+ operation_details = "deleted"
+ operation_state = "COMPLETED"
+
+ self.logger.debug("Task vca_delete={} {}".format(vca_id, "Done. Result: {}".format(operation_state)))
+ except Exception as e:
+ error_msg = "Failed with exception: {}".format(e)
+ self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg))
+ db_vca_update["_admin.operationalState"] = "ERROR"
+ db_vca_update["_admin.detailed-status"] = error_msg
+ operation_state = "FAILED"
+ operation_details = error_msg
+ finally:
+ try:
+ self.update_db_2("vca", vca_id, db_vca_update)
+ self.lcm_tasks.unlock_HA(
+ "vca",
+ "delete",
+ op_id,
+ operationState=operation_state,
+ detailed_status=operation_details,
+ )
+ except DbException as e:
+ self.logger.error("Task vca_delete={} {}".format(vca_id, "Cannot update database: {}".format(e)))
+ self.lcm_tasks.remove("vca", vca_id, order_id)
+
+
class K8sRepoLcm(LcmBase):
def __init__(self, msg, lcm_tasks, config, loop):