self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
+ self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)
+ self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.vca_config, self.loop)
async def check_RO_version(self):
tries = 14
except Exception as e:
self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e))
return
+ elif topic == "k8scluster":
+ if command == "create" or command == "created":
+ k8scluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8scluster.create(params, order_id))
+ self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_create", task)
+ return
+ elif command == "delete" or command == "deleted":
+ k8scluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
+ self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
+ return
+ elif topic == "k8srepo":
+ if command == "create" or command == "created":
+ k8srepo_id = params.get("_id")
+ self.logger.debug("k8srepo_id = {}".format(k8srepo_id))
+ task = asyncio.ensure_future(self.k8srepo.create(params, order_id))
+ self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_create", task)
+ return
+ elif command == "delete" or command == "deleted":
+ k8srepo_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8srepo.delete(params, order_id))
+ self.lcm_tasks.register("k8srepo", k8srepo_id, order_id, "k8srepo_delete", task)
+ return
elif topic == "ns":
- if command == "instantiate":
+ if command == "instantiate" or command == "instantiated":
# self.logger.debug("Deploying NS {}".format(nsr_id))
nslcmop = params
nslcmop_id = nslcmop["_id"]
task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
return
- elif command == "terminate":
+ elif command == "terminate" or command == "terminated":
# self.logger.debug("Deleting NS {}".format(nsr_id))
nslcmop = params
nslcmop_id = nslcmop["_id"]
elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
return
elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
- if command == "instantiate":
+ if command == "instantiate" or command == "instantiated":
# self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
nsilcmop = params
nsilcmop_id = nsilcmop["_id"] # slice operation id
task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
return
- elif command == "terminate":
+ elif command == "terminate" or command == "terminated":
# self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
nsilcmop = params
nsilcmop_id = nsilcmop["_id"] # slice operation id
return
elif topic == "vim_account":
vim_id = params["_id"]
- if command == "create":
+ if command == "create" or command == "created":
task = asyncio.ensure_future(self.vim.create(params, order_id))
self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
return
- elif command == "delete":
+ elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, vim_id)
task = asyncio.ensure_future(self.vim.delete(params, order_id))
self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task)
print("not implemented show with vim_account")
sys.stdout.flush()
return
- elif command == "edit":
+ elif command == "edit" or command == "edited":
task = asyncio.ensure_future(self.vim.edit(params, order_id))
self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
return
elif topic == "wim_account":
wim_id = params["_id"]
- if command == "create":
+ if command == "create" or command == "created":
task = asyncio.ensure_future(self.wim.create(params, order_id))
self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
return
- elif command == "delete":
+ elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, wim_id)
task = asyncio.ensure_future(self.wim.delete(params, order_id))
self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
print("not implemented show with wim_account")
sys.stdout.flush()
return
- elif command == "edit":
+ elif command == "edit" or command == "edited":
task = asyncio.ensure_future(self.wim.edit(params, order_id))
self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
return
elif topic == "sdn":
_sdn_id = params["_id"]
- if command == "create":
+ if command == "create" or command == "created":
task = asyncio.ensure_future(self.sdn.create(params, order_id))
self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
return
- elif command == "delete":
+ elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, _sdn_id)
task = asyncio.ensure_future(self.sdn.delete(params, order_id))
self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task)
return
- elif command == "edit":
+ elif command == "edit" or command == "edited":
task = asyncio.ensure_future(self.sdn.edit(params, order_id))
self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task)
return
self.first_start = True
while self.consecutive_errors < 10:
try:
- topics = ("ns", "vim_account", "wim_account", "sdn", "nsi")
+ topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo")
topics_admin = ("admin", )
await asyncio.gather(
self.msg.aioread(topics, self.loop, self.kafka_read_callback),
from osm_lcm import ROclient
from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
+from n2vc.k8s_helm_conn import K8sHelmConnector
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
# api_proxy=vca_config.get('apiproxy')
)
+ self.k8sclusterhelm = K8sHelmConnector(
+ kubectl_command=self.vca_config.get("kubectlpath"),
+ helm_command=self.vca_config.get("helmpath"),
+ fs=self.fs,
+ log=self.logger,
+ db=self.db,
+ on_update_db=None,
+ )
+
# create RO client
self.RO = ROclient.ROClient(self.loop, **self.ro_config)
def _on_update_n2vc_db(self, table, filter, path, updated_data):
- self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={})'
+ self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
.format(table, filter, path, updated_data))
+ return
# write NS status to database
# try:
# # nsrs_id = filter.get('_id')
+ # # print(nsrs_id)
# # get ns record
# nsr = self.db.get_one(table=table, q_filter=filter)
# # get VCA deployed list
# vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
# # get RO deployed
- # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
+ # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
# for vca in vca_list:
- # status = vca.get('status')
- # detailed_status = vca.get('detailed-status')
- # for ro in ro_list:
- # pass
+ # # status = vca.get('status')
+ # # print(status)
+ # # detailed_status = vca.get('detailed-status')
+ # # print(detailed_status)
+ # # for ro in ro_list:
+ # # print(ro)
#
# except Exception as e:
- # self.logger.error('_on_update_n2vc_db(table={},filter={},path={},updated_data={}) Error updating db: {}'
- # .format(table, filter, path, updated_data, e))
+ # self.logger.error('Error writing NS status to db: {}'.format(e))
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
"""
vnfd_RO.pop("vnf-configuration", None)
vnfd_RO.pop("monitoring-param", None)
vnfd_RO.pop("scaling-group-descriptor", None)
+ vnfd_RO.pop("kdu", None)
+ vnfd_RO.pop("k8s-cluster", None)
if new_id:
vnfd_RO["id"] = new_id
RO_descriptor_number = 0 # number of descriptors created at RO
vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
start_deploy = time()
+ vdu_flag = False # If any of the VNFDs has VDUs
ns_params = db_nslcmop.get("operationParams")
# deploy RO
# get vnfds, instantiate at RO
+
for c_vnf in nsd.get("constituent-vnfd", ()):
member_vnf_index = c_vnf["member-vnf-index"]
vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
+ if vnfd.get("vdu"):
+ vdu_flag = True
vnfd_ref = vnfd["id"]
- step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(
- vnfd_ref, member_vnf_index)
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
+ " RO".format(vnfd_ref, member_vnf_index)
# self.logger.debug(logging_text + step)
vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# create nsd at RO
nsd_ref = nsd["id"]
- step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
# self.logger.debug(logging_text + step)
RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# Crate ns at RO
# if present use it unless in error status
RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
if RO_nsr_id:
try:
- step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
# self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
desc = await self.RO.show("ns", RO_nsr_id)
except ROclient.ROClientException as e:
ns_status, ns_status_info = self.RO.check_ns_status(desc)
db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
if ns_status == "ERROR":
- step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
+ .format(RO_nsr_id)
self.logger.debug(logging_text + step)
await self.RO.delete("ns", RO_nsr_id)
RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
if not RO_nsr_id:
- step = db_nsr_update["detailed-status"] = "Checking dependencies"
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
# self.logger.debug(logging_text + step)
# check if VIM is creating and wait look if previous tasks in process
self.logger.debug(logging_text + step)
await asyncio.wait(task_dependency, timeout=3600)
- step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
+ step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
+ # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# wait until NS is ready
step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
elif ns_status == "ACTIVE":
step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
try:
- self.ns_update_vnfr(db_vnfrs, desc)
+ if vdu_flag:
+ self.ns_update_vnfr(db_vnfrs, desc)
break
except LcmExceptionNoMgmtIP:
pass
else:
assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
if detailed_status != detailed_status_old:
- detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
+ detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
await asyncio.sleep(5, loop=self.loop)
else: # total_deploy_timeout
raise ROclient.ROClientException("Timeout waiting ns to be ready")
step = "Updating NSR"
self.ns_update_nsr(db_nsr_update, db_nsr, desc)
- db_nsr_update["operational-status"] = "running"
- db_nsr["detailed-status"] = "Deployed at VIM"
- db_nsr_update["detailed-status"] = "Deployed at VIM"
+ db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
+ db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
+ db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
step = "Deployed at VIM"
self.logger.debug(logging_text + step)
# returns IP address
async def insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
+ self.logger.debug(logging_text + "Starting insert_key_ro")
+
ro_nsr_id = None
ip_address = None
nb_tries = 0
if not target_vdu_id:
continue
+ self.logger.debug(logging_text + "IP address={}".format(ip_address))
+
# inject public key into machine
if pub_key and user:
+ self.logger.debug(logging_text + "Inserting RO key")
try:
ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
result_dict = await self.RO.create_action(
raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
self.logger.debug(logging_text + "error injecting key: {}".format(e))
else:
- # no user or pub_key ---> break while true
break
return ip_address
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
- vdu_index, config_descriptor, deploy_params, base_folder):
+ kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
'path': db_update_entry
}
+
logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} "\
.format(db_vnfr["member-vnf-index-ref"], vdu_id, vdu_index)
db_nslcmop = None
# update operation on nsrs
- db_nsr_update = {"_admin.nslcmop": nslcmop_id}
+ db_nsr_update = {"_admin.nslcmop": nslcmop_id,
+ "_admin.current-operation": nslcmop_id,
+ "_admin.operation-type": "instantiate"}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
# update operation on nslcmops
db_nslcmop_update = {}
try:
# wait for any previous tasks in process
step = "Waiting for previous tasks"
+ self.logger.debug(logging_text + step)
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
# STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
# read from db: vnf's of this ns
step = "Getting vnfrs from db"
+ self.logger.debug(logging_text + step)
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
# read from db: vnfd's for every vnf
if vnfd_id not in db_vnfds:
# read from cb
step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
+ self.logger.debug(logging_text + step)
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
# store vnfd
# set state to INSTANTIATED. When instantiated NBI will not delete directly
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
-
+ self.logger.debug(logging_text + "Before deploy_kdus")
+ db_k8scluster_list = self.db.get_list("k8sclusters", {})
+ # Call to deploy_kdus in case exists the "vdu:kdu" param
+ task_kdu = asyncio.ensure_future(
+ self.deploy_kdus(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nsd=nsd,
+ db_nsr=db_nsr,
+ db_nslcmop=db_nslcmop,
+ db_vnfrs=db_vnfrs,
+ db_vnfds_ref=db_vnfds_ref,
+ db_k8scluster=db_k8scluster_list
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
+ task_instantiation_list.append(task_kdu)
# n2vc_redesign STEP 1 Get VCA public ssh-key
# feature 1429. Add n2vc public key to needed VMs
n2vc_key = await self.n2vc.get_public_key()
vdu_id = None
vdu_index = 0
vdu_name = None
+ kdu_name = None
# Get additional parameters
deploy_params = {}
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
+ kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
# "member_vnf_index={}".format(vdu_id, member_vnf_index))
# vdu_name = vdur.get("name")
vdu_name = None
+ kdu_name = None
for vdu_index in range(int(vdud.get("count", 1))):
# TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
self._deploy_n2vc(
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
+ kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
base_folder=base_folder,
task_instantiation_list=task_instantiation_list
)
+ for kdud in get_iterable(vnfd, 'kdu'):
+ kdu_name = kdud["name"]
+ descriptor_config = kdud.get('kdu-configuration')
+ if descriptor_config and descriptor_config.get("juju"):
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ # look for vdu index in the db_vnfr["vdu"] section
+ # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
+ # if vdur["vdu-id-ref"] == vdu_id:
+ # break
+ # else:
+ # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
+ # "member_vnf_index={}".format(vdu_id, member_vnf_index))
+ # vdu_name = vdur.get("name")
+ # vdu_name = None
+
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_list=task_instantiation_list
+ )
# Check if this NS has a charm configuration
descriptor_config = nsd.get("ns-configuration")
db_vnfr = None
member_vnf_index = None
vdu_id = None
+ kdu_name = None
vdu_index = 0
vdu_name = None
# Get additional parameters
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
+ kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
db_nsr_update["config-status"] = "configured"
db_nsr_update["detailed-status"] = "done"
- return
-
except (ROclient.ROClientException, DbException, LcmException) as e:
self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
exc = e
try:
if db_nsr:
db_nsr_update["_admin.nslcmop"] = None
+ db_nsr_update["_admin.current-operation"] = None
+ db_nsr_update["_admin.operation-type"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
if db_nslcmop_update:
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+ async def deploy_kdus(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, db_k8scluster):
+ # Launch kdus if present in the descriptor
+ logging_text = "Deploy kdus: "
+ db_nsr_update = {}
+ db_nsr_update["_admin.deployed.K8s"] = []
+ try:
+ # Look for all vnfds
+ # db_nsr_update["_admin.deployed.K8s"] = []
+ vnf_update = []
+ task_list = []
+ for c_vnf in nsd.get("constituent-vnfd", ()):
+ vnfr = db_vnfrs[c_vnf["member-vnf-index"]]
+ member_vnf_index = c_vnf["member-vnf-index"]
+ vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
+ vnfd_ref = vnfd["id"]
+ desc_params = {}
+
+ step = "Checking kdu from vnf: {} - member-vnf-index: {}".format(vnfd_ref, member_vnf_index)
+ self.logger.debug(logging_text + step)
+ if vnfd.get("kdu"):
+ step = "vnf: {} has kdus".format(vnfd_ref)
+ self.logger.debug(logging_text + step)
+ for vnfr_name, vnfr_data in db_vnfrs.items():
+ if vnfr_data["vnfd-ref"] == vnfd["id"]:
+ if vnfr_data.get("additionalParamsForVnf"):
+ desc_params = self._format_additional_params(vnfr_data["additionalParamsForVnf"])
+ break
+ else:
+ raise LcmException("VNF descriptor not found with id: {}".format(vnfr_data["vnfd-ref"]))
+ self.logger.debug(logging_text + step)
+
+ for kdur in vnfr.get("kdur"):
+ index = 0
+ for k8scluster in db_k8scluster:
+ if kdur["k8s-cluster"]["id"] == k8scluster["_id"]:
+ cluster_uuid = k8scluster["cluster-uuid"]
+ break
+ else:
+ raise LcmException("K8scluster not found with id: {}".format(kdur["k8s-cluster"]["id"]))
+ self.logger.debug(logging_text + step)
+
+ step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
+ self.logger.debug(logging_text + step)
+ for kdu in vnfd.get("kdu"):
+ if kdu.get("name") == kdur["kdu-name"]:
+ break
+ else:
+ raise LcmException("KDU not found with name: {} in VNFD {}".format(kdur["kdu-name"],
+ vnfd["name"]))
+ self.logger.debug(logging_text + step)
+ kdumodel = None
+ k8sclustertype = None
+ if kdu.get("helm-chart"):
+ kdumodel = kdu["helm-chart"]
+ k8sclustertype = "chart"
+ elif kdu.get("juju-bundle"):
+ kdumodel = kdu["juju-bundle"]
+ k8sclustertype = "juju"
+ k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
+ "vnfr-id": vnfr["id"], "k8scluster-type": k8sclustertype,
+ "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
+ db_nsr_update["_admin.deployed.K8s"].append(k8s_instace_info)
+ db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
+ "{}".format(index)}
+ if k8sclustertype == "chart":
+ task = self.k8sclusterhelm.install(cluster_uuid=cluster_uuid,
+ kdu_model=kdumodel, atomic=True, params=desc_params,
+ db_dict=db_dict, timeout=300)
+ else:
+ # TODO I need the juju connector in place
+ pass
+ task_list.append(task)
+ index += 1
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ done = None
+ pending = None
+ if len(task_list) > 0:
+ self.logger.debug('Waiting for terminate pending tasks...')
+ done, pending = await asyncio.wait(task_list, timeout=3600)
+ if not pending:
+ for fut in done:
+ k8s_instance = fut.result()
+ k8s_instace_info = {"kdu-instance": k8s_instance, "k8scluster-uuid": cluster_uuid,
+ "vnfr-id": vnfr["id"], "k8scluster-type": k8sclustertype,
+ "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
+ vnf_update.append(k8s_instace_info)
+ self.logger.debug('All tasks finished...')
+ else:
+ self.logger.info('There are pending tasks: {}'.format(pending))
+
+ db_nsr_update["_admin.deployed.K8s"] = vnf_update
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
+ raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
+ finally:
+ # TODO Write in data base
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
- member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
+ kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
base_folder, task_instantiation_list):
# launch instantiate_N2VC in a asyncio task and register task object
# Look where information of this charm is at database <nsrs>._admin.deployed.VCA
continue
if vca_deployed.get("member-vnf-index") == member_vnf_index and \
vca_deployed.get("vdu_id") == vdu_id and \
+ vca_deployed.get("kdu_name") == kdu_name and \
vca_deployed.get("vdu_count_index", 0) == vdu_index:
break
else:
vca_deployed = {
"member-vnf-index": member_vnf_index,
"vdu_id": vdu_id,
+ "kdu_name": kdu_name,
"vdu_count_index": vdu_index,
"operational-status": "init", # TODO revise
"detailed-status": "", # TODO revise
db_nsr=db_nsr,
db_vnfr=db_vnfr,
vdu_id=vdu_id,
+ kdu_name=kdu_name,
vdu_index=vdu_index,
deploy_params=deploy_params,
config_descriptor=descriptor_config,
}
return nslcmop
+ def _format_additional_params(self, params):
+
+ for key, value in params.items():
+ if str(value).startswith("!!yaml "):
+ params[key] = yaml.safe_load(value[7:])
+
+ return params
+
def _get_terminate_primitive_params(self, seq, vnf_index):
primitive = seq.get('name')
primitive_params = {}
db_nslcmop = None
exc = None
failed_detail = [] # annotates all failed error messages
- db_nsr_update = {"_admin.nslcmop": nslcmop_id}
+ db_nsr_update = {"_admin.nslcmop": nslcmop_id,
+ "_admin.current-operation": nslcmop_id,
+ "_admin.operation-type": "terminate"}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
db_nslcmop_update = {}
nslcmop_operation_state = None
autoremove = False # autoremove after terminated
# Call internal terminate action
await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
+ pending_tasks = []
+
db_nsr_update["operational-status"] = "terminating"
db_nsr_update["config-status"] = "terminating"
self.logger.error(msg)
failed_detail.append(msg)
+ try:
+ # Delete from k8scluster
+ step = "delete kdus"
+ self.logger.debug(logging_text + step)
+ print(nsr_deployed)
+ if nsr_deployed:
+ for kdu in nsr_deployed.get("K8s"):
+ if kdu.get("k8scluster-type") == "chart":
+ task_delete_kdu_instance = asyncio.ensure_future(self.k8sclusterhelm.uninstall(
+ cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu.get("kdu-instance")))
+ elif kdu.get("k8scluster-type") == "juju":
+ # TODO Juju connector needed
+ pass
+ else:
+ msg = "k8scluster-type not defined"
+ raise LcmException(msg)
+
+ pending_tasks.append(task_delete_kdu_instance)
+ except LcmException as e:
+ msg = "Failed while deleting KDUs from NS: {}".format(e)
+ self.logger.error(msg)
+ failed_detail.append(msg)
+
# remove from RO
RO_fail = False
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
if db_nsr:
db_nsr_update["_admin.nslcmop"] = None
+ db_nsr_update["_admin.current-operation"] = None
+ db_nsr_update["_admin.operation-type"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
# get all needed from database
db_nsr = None
db_nslcmop = None
- db_nsr_update = {"_admin.nslcmop": nslcmop_id}
+ db_nsr_update = {"_admin.nslcmop": nslcmop_id,
+ "_admin.current-operation": nslcmop_id,
+ "_admin.operation-type": "action"}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
db_nslcmop_update = {}
nslcmop_operation_state = None
nslcmop_operation_state_detail = None
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
vdu_id = db_nslcmop["operationParams"].get("vdu_id")
+ kdu_name = db_nslcmop["operationParams"].get("kdu_name")
vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
vdu_name = db_nslcmop["operationParams"].get("vdu_name")
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
+ elif kdu_name:
+ self.logger.debug(logging_text + "Checking actions in KDUs")
+ desc_params = {}
+ if vnf_index:
+ if db_vnfr.get("additionalParamsForVnf") and db_vnfr["additionalParamsForVnf"].\
+ get("member-vnf-index") == vnf_index:
+ desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].
+ get("additionalParams"))
+ if primitive_params:
+ desc_params.update(primitive_params)
+ # TODO Check if we will need something at vnf level
+ index = 0
+ for kdu in get_iterable(nsr_deployed, "K8s"):
+ if kdu_name == kdu["kdu-name"]:
+ db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index)}
+ if primitive == "upgrade":
+ if desc_params.get("kdu_model"):
+ kdu_model = desc_params.get("kdu_model")
+ del desc_params["kdu_model"]
+ else:
+ kdu_model = kdu.get("kdu-model")
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+
+ if kdu.get("k8scluster-type") == "chart":
+ output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ atomic=True, kdu_model=kdu_model,
+ params=desc_params, db_dict=db_dict, timeout=300)
+ elif kdu.get("k8scluster-type") == "juju":
+ # TODO Juju connector needed
+ pass
+ else:
+ msg = "k8scluster-type not defined"
+ raise LcmException(msg)
+
+ self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
+ break
+ elif primitive == "rollback":
+ if kdu.get("k8scluster-type") == "chart":
+ output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ db_dict=db_dict)
+ elif kdu.get("k8scluster-type") == "juju":
+ # TODO Juju connector needed
+ pass
+ else:
+ msg = "k8scluster-type not defined"
+ raise LcmException(msg)
+ break
+ elif primitive == "status":
+ if kdu.get("k8scluster-type") == "chart":
+ output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"))
+ elif kdu.get("k8scluster-type") == "juju":
+ # TODO Juju connector needed
+ pass
+ else:
+ msg = "k8scluster-type not defined"
+ raise LcmException(msg)
+ break
+ index += 1
+
+ else:
+ raise LcmException("KDU '{}' not found".format(kdu_name))
+ if output:
+ db_nslcmop_update["detailed-status"] = output
+ db_nslcmop_update["operationState"] = 'COMPLETED'
+ db_nslcmop_update["statusEnteredTime"] = time()
+ else:
+ db_nslcmop_update["detailed-status"] = ''
+ db_nslcmop_update["operationState"] = 'FAILED'
+ db_nslcmop_update["statusEnteredTime"] = time()
+ return
elif vnf_index:
for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
if config_primitive["name"] == primitive:
if db_vnfr.get("additionalParamsForVnf"):
desc_params.update(db_vnfr["additionalParamsForVnf"])
else:
- if db_nsr.get("additionalParamsForVnf"):
+ if db_nsr.get("additionalParamsForNs"):
desc_params.update(db_nsr["additionalParamsForNs"])
# TODO check if ns is in a proper status
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
if db_nsr:
db_nsr_update["_admin.nslcmop"] = None
+ db_nsr_update["_admin.operation-type"] = None
+ db_nsr_update["_admin.nslcmop"] = None
+ db_nsr_update["_admin.current-operation"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
db_nslcmop = None
db_nslcmop_update = {}
nslcmop_operation_state = None
- db_nsr_update = {"_admin.nslcmop": nslcmop_id}
+ db_nsr_update = {"_admin.nslcmop": nslcmop_id,
+ "_admin.current-operation": nslcmop_id,
+ "_admin.operation-type": "scale"}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
exc = None
# in case of error, indicates what part of scale was failed to put nsr at error status
scale_process = None
db_nsr_update["operational-status"] = "scaling"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ #######
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+ vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
+ vdu_id = db_nslcmop["operationParams"].get("vdu_id")
+ vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
+ vdu_name = db_nslcmop["operationParams"].get("vdu_name")
+ #######
+
RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
if db_nslcmop and db_nslcmop_update:
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
if db_nsr:
+ db_nsr_update["_admin.current-operation"] = None
+ db_nsr_update["_admin.operation-type"] = None
db_nsr_update["_admin.nslcmop"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
# under the License.
##
+import asyncio
+import yaml
import logging
import logging.handlers
from osm_lcm import ROclient
from osm_lcm.lcm_utils import LcmException, LcmBase
+from n2vc.k8s_helm_conn import K8sHelmConnector
from osm_common.dbbase import DbException
from copy import deepcopy
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
self.lcm_tasks.remove("sdn", sdn_id, order_id)
+
+
+class K8sClusterLcm(LcmBase):
+
+ def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger('lcm.k8scluster')
+ self.loop = loop
+ self.lcm_tasks = lcm_tasks
+ self.vca_config = vca_config
+ self.fs = fs
+ self.db = db
+
+ self.k8scluster = K8sHelmConnector(
+ kubectl_command=self.vca_config.get("kubectlpath"),
+ helm_command=self.vca_config.get("helmpath"),
+ fs=self.fs,
+ log=self.logger,
+ db=self.db,
+ on_update_db=None
+ )
+
+ super().__init__(db, msg, fs, self.logger)
+
+ async def create(self, k8scluster_content, order_id):
+
+ # HA tasks and backward compatibility:
+ # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
+ # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
+ # Register 'create' task here for related future HA operations
+ op_id = k8scluster_content.pop('op_id', None)
+ if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
+ return
+
+ k8scluster_id = k8scluster_content["_id"]
+ k8scluster_content.pop("op_id", None)
+ logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
+ self.logger.debug(logging_text + "Enter")
+
+ db_k8scluster = None
+ db_k8scluster_update = {}
+
+ exc = None
+ operationState_HA = ''
+ detailed_status_HA = ''
+ try:
+ step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
+ self.logger.debug(logging_text + step)
+ db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
+ self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
+ schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
+ print(db_k8scluster.get("credentials"))
+ print("\n\n\n FIN CREDENTIALS")
+ print(yaml.safe_dump(db_k8scluster.get("credentials")))
+ print("\n\n\n FIN OUTPUT")
+ cluster_uuid, uninstall_sw = await self.k8scluster.init_env(yaml.safe_dump(db_k8scluster.
+ get("credentials")))
+ db_k8scluster_update["cluster-uuid"] = cluster_uuid
+ if uninstall_sw:
+ db_k8scluster_update["uninstall-sw"] = uninstall_sw
+ step = "Getting the list of repos"
+ self.logger.debug(logging_text + step)
+ task_list = []
+ db_k8srepo_list = self.db.get_list("k8srepos", {})
+ for repo in db_k8srepo_list:
+ step = "Adding repo {} to cluster: {}".format(repo["name"], cluster_uuid)
+ self.logger.debug(logging_text + step)
+ task = asyncio.ensure_future(self.k8scluster.repo_add(cluster_uuid=cluster_uuid,
+ name=repo["name"], url=repo["url"],
+ repo_type="chart"))
+ task_list.append(task)
+ if not repo["_admin"].get("cluster-inserted"):
+ repo["_admin"]["cluster-inserted"] = []
+ repo["_admin"]["cluster-inserted"].append(cluster_uuid)
+ self.update_db_2("k8srepos", repo["_id"], repo)
+
+ done = None
+ pending = None
+ if len(task_list) > 0:
+ self.logger.debug('Waiting for terminate pending tasks...')
+ done, pending = await asyncio.wait(task_list, timeout=3600)
+ if not pending:
+ self.logger.debug('All tasks finished...')
+ else:
+ self.logger.info('There are pending tasks: {}'.format(pending))
+ db_k8scluster_update["_admin.operationalState"] = "ENABLED"
+
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_k8scluster:
+ db_k8scluster_update["_admin.operationalState"] = "ERROR"
+ db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ # Mark the k8scluster 'create' HA task as erroneous
+ operationState_HA = 'FAILED'
+ detailed_status_HA = "ERROR {}: {}".format(step, exc)
+ try:
+ if db_k8scluster_update:
+ self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
+ # Register the K8scluster 'create' HA task either
+ # succesful or erroneous, or do nothing (if legacy NBI)
+ self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
+ operationState=operationState_HA,
+ detailed_status=detailed_status_HA)
+ except DbException as e:
+ self.logger.error(logging_text + "Cannot update database: {}".format(e))
+ self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
+
+ async def delete(self, k8scluster_content, order_id):
+
+ # HA tasks and backward compatibility:
+ # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
+ # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
+ # Register 'delete' task here for related future HA operations
+ op_id = k8scluster_content.pop('op_id', None)
+ if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
+ return
+
+ k8scluster_id = k8scluster_content["_id"]
+ k8scluster_content.pop("op_id", None)
+ logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
+ self.logger.debug(logging_text + "Enter")
+
+ db_k8scluster = None
+ db_k8scluster_update = {}
+ exc = None
+ operationState_HA = ''
+ detailed_status_HA = ''
+ try:
+ step = "Getting k8scluster='{}' from db".format(k8scluster_id)
+ self.logger.debug(logging_text + step)
+ db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
+ uninstall_sw = db_k8scluster.get("uninstall-sw")
+ if uninstall_sw is False or uninstall_sw is None:
+ uninstall_sw = False
+ cluster_removed = await self.k8scluster.reset(cluster_uuid=db_k8scluster.get("cluster-uuid"),
+ uninstall_sw=uninstall_sw)
+
+ if cluster_removed:
+ step = "Removing k8scluster='{}' from db".format(k8scluster_id)
+ self.logger.debug(logging_text + step)
+ db_k8srepo_list = self.db.get_list("k8srepos", {})
+ for k8srepo in db_k8srepo_list:
+ index = 0
+ for cluster in k8srepo["_admin"]["cluster-inserted"]:
+ if db_k8scluster.get("cluster-uuid") == cluster:
+ del(k8srepo["_admin"]["cluster-inserted"][index])
+ break
+ index += 1
+ self.update_db_2("k8srepos", k8srepo["_id"], k8srepo)
+ self.db.del_one("k8sclusters", {"_id": k8scluster_id})
+ else:
+ raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id))
+ # if not cluster_removed:
+ # raise Exception("K8scluster was not properly removed")
+
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_k8scluster:
+ db_k8scluster_update["_admin.operationalState"] = "ERROR"
+ db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ # Mark the WIM 'create' HA task as erroneous
+ operationState_HA = 'FAILED'
+ detailed_status_HA = "ERROR {}: {}".format(step, exc)
+ try:
+ if db_k8scluster_update:
+ self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
+ # Register the K8scluster 'delete' HA task either
+ # succesful or erroneous, or do nothing (if legacy NBI)
+ self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
+ operationState=operationState_HA,
+ detailed_status=detailed_status_HA)
+ except DbException as e:
+ self.logger.error(logging_text + "Cannot update database: {}".format(e))
+ self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
+
+
+class K8sRepoLcm(LcmBase):
+
+ def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger('lcm.k8srepo')
+ self.loop = loop
+ self.lcm_tasks = lcm_tasks
+ self.vca_config = vca_config
+ self.fs = fs
+ self.db = db
+
+ self.k8srepo = K8sHelmConnector(
+ kubectl_command=self.vca_config.get("kubectlpath"),
+ helm_command=self.vca_config.get("helmpath"),
+ fs=self.fs,
+ log=self.logger,
+ db=self.db,
+ on_update_db=None
+ )
+
+ super().__init__(db, msg, fs, self.logger)
+
+ async def create(self, k8srepo_content, order_id):
+
+ # HA tasks and backward compatibility:
+ # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
+ # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
+ # Register 'create' task here for related future HA operations
+
+ op_id = k8srepo_content.pop('op_id', None)
+ if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
+ return
+
+ k8srepo_id = k8srepo_content.get("_id")
+ logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
+ self.logger.debug(logging_text + "Enter")
+
+ db_k8srepo = None
+ db_k8srepo_update = {}
+ exc = None
+ operationState_HA = ''
+ detailed_status_HA = ''
+ try:
+ step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
+ self.logger.debug(logging_text + step)
+ db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
+ step = "Getting k8scluster_list from db"
+ self.logger.debug(logging_text + step)
+ db_k8scluster_list = self.db.get_list("k8sclusters", {})
+ db_k8srepo_update["_admin.cluster-inserted"] = []
+ task_list = []
+ for k8scluster in db_k8scluster_list:
+ step = "Adding repo to cluster: {}".format(k8scluster["cluster-uuid"])
+ self.logger.debug(logging_text + step)
+ task = asyncio.ensure_future(self.k8srepo.repo_add(cluster_uuid=k8scluster["cluster-uuid"],
+ name=db_k8srepo["name"], url=db_k8srepo["url"],
+ repo_type="chart"))
+ task_list.append(task)
+ db_k8srepo_update["_admin.cluster-inserted"].append(k8scluster["cluster-uuid"])
+
+ done = None
+ pending = None
+ if len(task_list) > 0:
+ self.logger.debug('Waiting for terminate pending tasks...')
+ done, pending = await asyncio.wait(task_list, timeout=3600)
+ if not pending:
+ self.logger.debug('All tasks finished...')
+ else:
+ self.logger.info('There are pending tasks: {}'.format(pending))
+ db_k8srepo_update["_admin.operationalState"] = "ENABLED"
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_k8srepo:
+ db_k8srepo_update["_admin.operationalState"] = "ERROR"
+ db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ # Mark the WIM 'create' HA task as erroneous
+ operationState_HA = 'FAILED'
+ detailed_status_HA = "ERROR {}: {}".format(step, exc)
+ try:
+ if db_k8srepo_update:
+ self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
+ # Register the K8srepo 'create' HA task either
+ # succesful or erroneous, or do nothing (if legacy NBI)
+ self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
+ operationState=operationState_HA,
+ detailed_status=detailed_status_HA)
+ except DbException as e:
+ self.logger.error(logging_text + "Cannot update database: {}".format(e))
+ self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
+
+ async def delete(self, k8srepo_content, order_id):
+
+ # HA tasks and backward compatibility:
+ # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
+ # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
+ # Register 'delete' task here for related future HA operations
+ op_id = k8srepo_content.pop('op_id', None)
+ if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
+ return
+
+ k8srepo_id = k8srepo_content.get("_id")
+ logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
+ self.logger.debug(logging_text + "Enter")
+
+ db_k8srepo = None
+ db_k8srepo_update = {}
+
+ operationState_HA = ''
+ detailed_status_HA = ''
+ try:
+ step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
+ self.logger.debug(logging_text + step)
+ db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
+ step = "Getting k8scluster_list from db"
+ self.logger.debug(logging_text + step)
+ db_k8scluster_list = self.db.get_list("k8sclusters", {})
+
+ task_list = []
+ for k8scluster in db_k8scluster_list:
+ task = asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=k8scluster["cluster-uuid"],
+ name=db_k8srepo["name"]))
+ task_list.append(task)
+ done = None
+ pending = None
+ if len(task_list) > 0:
+ self.logger.debug('Waiting for terminate pending tasks...')
+ done, pending = await asyncio.wait(task_list, timeout=3600)
+ if not pending:
+ self.logger.debug('All tasks finished...')
+ else:
+ self.logger.info('There are pending tasks: {}'.format(pending))
+ self.db.del_one("k8srepos", {"_id": k8srepo_id})
+
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_k8srepo:
+ db_k8srepo_update["_admin.operationalState"] = "ERROR"
+ db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ # Mark the WIM 'create' HA task as erroneous
+ operationState_HA = 'FAILED'
+ detailed_status_HA = "ERROR {}: {}".format(step, exc)
+ try:
+ if db_k8srepo_update:
+ self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
+ # Register the K8srepo 'delete' HA task either
+ # succesful or erroneous, or do nothing (if legacy NBI)
+ self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
+ operationState=operationState_HA,
+ detailed_status=detailed_status_HA)
+ except DbException as e:
+ self.logger.error(logging_text + "Cannot update database: {}".format(e))
+ self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)