1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
100 from copy
import copy
, deepcopy
101 from time
import time
102 from uuid
import uuid4
104 from random
import randint
106 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
109 class NsLcm(LcmBase
):
110 timeout_vca_on_error
= (
112 ) # Time for charm from first time at blocked,error status to mark as failed
113 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
114 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
115 timeout_charm_delete
= 10 * 60
116 timeout_primitive
= 30 * 60 # timeout for primitive execution
117 timeout_progress_primitive
= (
119 ) # timeout for some progress in a primitive execution
121 SUBOPERATION_STATUS_NOT_FOUND
= -1
122 SUBOPERATION_STATUS_NEW
= -2
123 SUBOPERATION_STATUS_SKIP
= -3
124 task_name_deploy_vca
= "Deploying VCA"
126 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
128 Init, Connect to database, filesystem storage, and messaging
129 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
132 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
134 self
.db
= Database().instance
.db
135 self
.fs
= Filesystem().instance
.fs
137 self
.lcm_tasks
= lcm_tasks
138 self
.timeout
= config
["timeout"]
139 self
.ro_config
= config
["ro_config"]
140 self
.ng_ro
= config
["ro_config"].get("ng")
141 self
.vca_config
= config
["VCA"].copy()
143 # create N2VC connector
144 self
.n2vc
= N2VCJujuConnector(
147 on_update_db
=self
._on
_update
_n
2vc
_db
,
152 self
.conn_helm_ee
= LCMHelmConn(
155 vca_config
=self
.vca_config
,
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
159 self
.k8sclusterhelm2
= K8sHelmConnector(
160 kubectl_command
=self
.vca_config
.get("kubectlpath"),
161 helm_command
=self
.vca_config
.get("helmpath"),
168 self
.k8sclusterhelm3
= K8sHelm3Connector(
169 kubectl_command
=self
.vca_config
.get("kubectlpath"),
170 helm_command
=self
.vca_config
.get("helm3path"),
177 self
.k8sclusterjuju
= K8sJujuConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 juju_command
=self
.vca_config
.get("jujupath"),
182 on_update_db
=self
._on
_update
_k
8s
_db
,
187 self
.k8scluster_map
= {
188 "helm-chart": self
.k8sclusterhelm2
,
189 "helm-chart-v3": self
.k8sclusterhelm3
,
190 "chart": self
.k8sclusterhelm3
,
191 "juju-bundle": self
.k8sclusterjuju
,
192 "juju": self
.k8sclusterjuju
,
196 "lxc_proxy_charm": self
.n2vc
,
197 "native_charm": self
.n2vc
,
198 "k8s_proxy_charm": self
.n2vc
,
199 "helm": self
.conn_helm_ee
,
200 "helm-v3": self
.conn_helm_ee
,
203 self
.prometheus
= prometheus
206 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
209 def increment_ip_mac(ip_mac
, vm_index
=1):
210 if not isinstance(ip_mac
, str):
213 # try with ipv4 look for last dot
214 i
= ip_mac
.rfind(".")
217 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
218 # try with ipv6 or mac look for last colon. Operate in hex
219 i
= ip_mac
.rfind(":")
222 # format in hex, len can be 2 for mac or 4 for ipv6
223 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
224 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """Save the RO descriptor in the NS record under "deploymentStatus".

    Any exception raised while writing is logged and not propagated.

    :param nsrs_id: _id of the record in the "nsrs" collection
    :param ro_descriptor: content stored as "deploymentStatus"
    """
    # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
    try:
        # TODO filter RO descriptor fields...
        # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
        db_dict = {"deploymentStatus": ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)
    except Exception as e:
        # NOTE(review): the logging call itself is not visible in the
        # extracted source; warning level is assumed.
        self.logger.warning(
            "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
        )
248 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
250 # remove last dot from path (if exists)
251 if path
.endswith("."):
254 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
255 # .format(table, filter, path, updated_data))
258 nsr_id
= filter.get("_id")
260 # read ns record from database
261 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
262 current_ns_status
= nsr
.get("nsState")
264 # get vca status for NS
265 status_dict
= await self
.n2vc
.get_status(
266 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
271 db_dict
["vcaStatus"] = status_dict
272 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
274 # update configurationStatus for this VCA
276 vca_index
= int(path
[path
.rfind(".") + 1 :])
279 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
281 vca_status
= vca_list
[vca_index
].get("status")
283 configuration_status_list
= nsr
.get("configurationStatus")
284 config_status
= configuration_status_list
[vca_index
].get("status")
286 if config_status
== "BROKEN" and vca_status
!= "failed":
287 db_dict
["configurationStatus"][vca_index
] = "READY"
288 elif config_status
!= "BROKEN" and vca_status
== "failed":
289 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
290 except Exception as e
:
291 # not update configurationStatus
292 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
294 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
295 # if nsState = 'DEGRADED' check if all is OK
297 if current_ns_status
in ("READY", "DEGRADED"):
298 error_description
= ""
300 if status_dict
.get("machines"):
301 for machine_id
in status_dict
.get("machines"):
302 machine
= status_dict
.get("machines").get(machine_id
)
303 # check machine agent-status
304 if machine
.get("agent-status"):
305 s
= machine
.get("agent-status").get("status")
308 error_description
+= (
309 "machine {} agent-status={} ; ".format(
313 # check machine instance status
314 if machine
.get("instance-status"):
315 s
= machine
.get("instance-status").get("status")
318 error_description
+= (
319 "machine {} instance-status={} ; ".format(
324 if status_dict
.get("applications"):
325 for app_id
in status_dict
.get("applications"):
326 app
= status_dict
.get("applications").get(app_id
)
327 # check application status
328 if app
.get("status"):
329 s
= app
.get("status").get("status")
332 error_description
+= (
333 "application {} status={} ; ".format(app_id
, s
)
336 if error_description
:
337 db_dict
["errorDescription"] = error_description
338 if current_ns_status
== "READY" and is_degraded
:
339 db_dict
["nsState"] = "DEGRADED"
340 if current_ns_status
== "DEGRADED" and not is_degraded
:
341 db_dict
["nsState"] = "READY"
344 self
.update_db_2("nsrs", nsr_id
, db_dict
)
346 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
348 except Exception as e
:
349 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
351 async def _on_update_k8s_db(
352 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
355 Updating vca status in NSR record
356 :param cluster_uuid: UUID of a k8s cluster
357 :param kdu_instance: The unique name of the KDU instance
358 :param filter: To get nsr_id
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
366 nsr_id
= filter.get("_id")
368 # get vca status for NS
369 vca_status
= await self
.k8sclusterjuju
.status_kdu(
372 complete_status
=True,
378 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
380 await self
.k8sclusterjuju
.update_vca_status(
381 db_dict
["vcaStatus"],
387 self
.update_db_2("nsrs", nsr_id
, db_dict
)
389 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
391 except Exception as e
:
392 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render a cloud-init Jinja2 template.

    The environment uses StrictUndefined, so a variable that is not present
    in additional_params is an error instead of rendering as empty text.

    :param cloud_init_text: template text
    :param additional_params: dict of template variables; None is treated as {}
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: the rendered text
    :raises LcmException: on an undefined variable or a template error
        (NOTE(review): the raise lines are not visible in the extracted
        source; LcmException is assumed)
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """Return the cloud-init text of a vdu, or None if it has none.

    "cloud-init-file" takes precedence: the file is read through self.fs
    from "<folder>/<pkg-dir>/cloud_init/<file>", with folder and pkg-dir
    taken from vnfd["_admin"]["storage"]. Otherwise the inline "cloud-init"
    value is returned.

    :param vdu: vdu descriptor
    :param vnfd: vnfd descriptor that contains the vdu
    :return: cloud-init content or None
    :raises LcmException: when the file cannot be read (NOTE(review): the
        raise line is not visible in the extracted source; LcmException is
        assumed)
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"],
            )
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]

        return cloud_init_content
    except FsException as e:
        raise LcmException(
            "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                vnfd["id"], vdu["id"], cloud_init_file, e
            )
        ) from e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed "additionalParams" of the first vdur of a vdu.

    :param db_vnfr: vnfr content with a "vdur" list
    :param vdu_id: value matched against each vdur "vdu-id-ref"
    :return: result of parse_yaml_strings over the vdur "additionalParams"
    """
    # NOTE(review): the visible generator line carries no default value, so
    # next() is reconstructed without one; a vdu_id with no vdur then raises
    # StopIteration. Confirm against the callers.
    vdur = next(
        vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd; it is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not read by this body)
    :param nsrId: Id of the NSR (not read by this body)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
470 def ip_profile_2_RO(ip_profile
):
471 RO_ip_profile
= deepcopy(ip_profile
)
472 if "dns-server" in RO_ip_profile
:
473 if isinstance(RO_ip_profile
["dns-server"], list):
474 RO_ip_profile
["dns-address"] = []
475 for ds
in RO_ip_profile
.pop("dns-server"):
476 RO_ip_profile
["dns-address"].append(ds
["address"])
478 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
479 if RO_ip_profile
.get("ip-version") == "ipv4":
480 RO_ip_profile
["ip-version"] = "IPv4"
481 if RO_ip_profile
.get("ip-version") == "ipv6":
482 RO_ip_profile
["ip-version"] = "IPv6"
483 if "dhcp-params" in RO_ip_profile
:
484 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """Return the RO identifier stored for a VIM account.

    :param vim_account: _id of the record in "vim_accounts"
    :return: value of _admin.deployed.RO of that record
    :raises LcmException: when _admin.operationalState is not "ENABLED"
        (NOTE(review): the raise line is not visible in the extracted
        source; LcmException is assumed)
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    if db_vim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]
            )
        )
    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
    return RO_vim_id
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Return the RO identifier stored for a WIM account.

    :param wim_account: _id of the record in "wim_accounts" when it is a string
    :return: value of _admin.deployed.RO-account of that record
    :raises LcmException: when _admin.operationalState is not "ENABLED"
        (NOTE(review): the raise line is not visible in the extracted
        source; LcmException is assumed)
    """
    if isinstance(wim_account, str):
        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException(
                "WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]
                )
            )
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        return RO_wim_id
    # NOTE(review): the branch for non-string values is not visible in the
    # extracted source; passing the value through unchanged is assumed.
    return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """Add or remove vdur entries of a vnfr in the database when scaling.

    :param db_vnfr: vnfr content; its "vdur" list is reloaded from the
        database before returning
    :param vdu_create: dict {vdu-id: number of instances to add}
    :param vdu_delete: dict {vdu-id: number of instances to remove}
    :param mark_delete: if True the last instances are only marked with
        status "DELETING"; otherwise they are pulled from the record
    :raises LcmException: when scaling out a vdu-id that has no vdur to copy
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            # the last existing vdur of this vdu is used as the template
            vdur = next(
                (
                    vdur
                    for vdur in reversed(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ),
                None,
            )
            if not vdur:
                raise LcmException(
                    "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
                        vdu_id
                    )
                )

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                # was `vdur_copy["ip-address"]: None`, an annotation that
                # assigns nothing and left the template address in the copy
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(
                    vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                )
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(
                            iface["ip-address"], count + 1
                        )
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(
                            iface["mac-address"], count + 1
                        )
                    else:
                        iface.pop("mac-address", None)
                    # NOTE(review): the popped key is not visible in the
                    # extracted source; "mgmt_vnf" is assumed. Other code in
                    # this file reads "mgmt-vnf", so confirm the spelling.
                    iface.pop(
                        "mgmt_vnf", None
                    )  # only first vdu can be managment of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [
                    index
                    for index, vdur in enumerate(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ]
                # a plain [-vdu_count:] would select every index when
                # vdu_count is 0, so the start is computed explicitly
                first = max(0, len(indexes_to_delete) - vdu_count)
                db_update.update(
                    {
                        "vdur.{}.status".format(i): "DELETING"
                        for i in indexes_to_delete[first:]
                    }
                )
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [
                    v
                    for v in reversed(db_vnfr["vdur"])
                    if v["vdu-id-ref"] == vdu_id
                ]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur": {"_id": vdu["_id"]}},
                    )
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no RO net matched this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """Mark every vnfr, and each of its vdur that has no status, as ERROR.

    The change is applied to the passed dictionaries and written to the
    "vnfrs" collection. A DbException while writing is logged, not raised.

    :param db_vnfrs: dictionary whose values are vnfr contents
    :param error_text: text stored as "status-detailed" of the affected vdur
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    # NOTE(review): the guard and the value written to the
                    # database are not visible in the extracted source; the
                    # database value is made to mirror the in-memory one.
                    if error_text:
                        vdur["status-detailed"] = str(error_text)
                        vnfr_update[
                            "vdur.{}.status-detailed".format(vdu_index)
                        ] = str(error_text)
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
637 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
639 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
640 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
641 :param nsr_desc_RO: nsr descriptor from RO
642 :return: Nothing, LcmException is raised on errors
644 for vnf_index
, db_vnfr
in db_vnfrs
.items():
645 for vnf_RO
in nsr_desc_RO
["vnfs"]:
646 if vnf_RO
["member_vnf_index"] != vnf_index
:
649 if vnf_RO
.get("ip_address"):
650 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
653 elif not db_vnfr
.get("ip-address"):
654 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
655 raise LcmExceptionNoMgmtIP(
656 "ns member_vnf_index '{}' has no IP address".format(
661 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
662 vdur_RO_count_index
= 0
663 if vdur
.get("pdu-type"):
665 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
666 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
668 if vdur
["count-index"] != vdur_RO_count_index
:
669 vdur_RO_count_index
+= 1
671 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
672 if vdur_RO
.get("ip_address"):
673 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
675 vdur
["ip-address"] = None
676 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
677 vdur
["name"] = vdur_RO
.get("vim_name")
678 vdur
["status"] = vdur_RO
.get("status")
679 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
680 for ifacer
in get_iterable(vdur
, "interfaces"):
681 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
682 if ifacer
["name"] == interface_RO
.get("internal_name"):
683 ifacer
["ip-address"] = interface_RO
.get(
686 ifacer
["mac-address"] = interface_RO
.get(
692 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
693 "from VIM info".format(
694 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
697 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
703 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
707 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
708 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
709 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
711 vld
["vim-id"] = net_RO
.get("vim_net_id")
712 vld
["name"] = net_RO
.get("vim_name")
713 vld
["status"] = net_RO
.get("status")
714 vld
["status-detailed"] = net_RO
.get("error_msg")
715 vnfr_update
["vld.{}".format(vld_index
)] = vld
719 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
724 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
729 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            # entries without member-vnf-index are left out of the mapping
            continue
        if not vca["vdu_id"]:
            mapping[vca["member-vnf-index"]] = vca["application"]
        else:
            mapping[
                "{}.{}.{}".format(
                    vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
                )
            ] = vca["application"]
    return ns_config_info
759 async def _instantiate_ng_ro(
776 def get_vim_account(vim_account_id
):
778 if vim_account_id
in db_vims
:
779 return db_vims
[vim_account_id
]
780 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
781 db_vims
[vim_account_id
] = db_vim
784 # modify target_vld info with instantiation parameters
785 def parse_vld_instantiation_params(
786 target_vim
, target_vld
, vld_params
, target_sdn
788 if vld_params
.get("ip-profile"):
789 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
792 if vld_params
.get("provider-network"):
793 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
796 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
797 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
800 if vld_params
.get("wimAccountId"):
801 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
802 target_vld
["vim_info"][target_wim
] = {}
803 for param
in ("vim-network-name", "vim-network-id"):
804 if vld_params
.get(param
):
805 if isinstance(vld_params
[param
], dict):
806 for vim
, vim_net
in vld_params
[param
].items():
807 other_target_vim
= "vim:" + vim
809 target_vld
["vim_info"],
810 (other_target_vim
, param
.replace("-", "_")),
813 else: # isinstance str
814 target_vld
["vim_info"][target_vim
][
815 param
.replace("-", "_")
816 ] = vld_params
[param
]
817 if vld_params
.get("common_id"):
818 target_vld
["common_id"] = vld_params
.get("common_id")
820 nslcmop_id
= db_nslcmop
["_id"]
822 "name": db_nsr
["name"],
825 "image": deepcopy(db_nsr
["image"]),
826 "flavor": deepcopy(db_nsr
["flavor"]),
827 "action_id": nslcmop_id
,
828 "cloud_init_content": {},
830 for image
in target
["image"]:
831 image
["vim_info"] = {}
832 for flavor
in target
["flavor"]:
833 flavor
["vim_info"] = {}
835 if db_nslcmop
.get("lcmOperationType") != "instantiate":
836 # get parameters of instantiation:
837 db_nslcmop_instantiate
= self
.db
.get_list(
840 "nsInstanceId": db_nslcmop
["nsInstanceId"],
841 "lcmOperationType": "instantiate",
844 ns_params
= db_nslcmop_instantiate
.get("operationParams")
846 ns_params
= db_nslcmop
.get("operationParams")
847 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
848 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
851 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
852 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
856 "mgmt-network": vld
.get("mgmt-network", False),
857 "type": vld
.get("type"),
860 "vim_network_name": vld
.get("vim-network-name"),
861 "vim_account_id": ns_params
["vimAccountId"],
865 # check if this network needs SDN assist
866 if vld
.get("pci-interfaces"):
867 db_vim
= get_vim_account(ns_params
["vimAccountId"])
868 sdnc_id
= db_vim
["config"].get("sdn-controller")
870 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
871 target_sdn
= "sdn:{}".format(sdnc_id
)
872 target_vld
["vim_info"][target_sdn
] = {
874 "target_vim": target_vim
,
876 "type": vld
.get("type"),
879 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
880 for nsd_vnf_profile
in nsd_vnf_profiles
:
881 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
882 if cp
["virtual-link-profile-id"] == vld
["id"]:
884 "member_vnf:{}.{}".format(
885 cp
["constituent-cpd-id"][0][
886 "constituent-base-element-id"
888 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
890 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
892 # check at nsd descriptor, if there is an ip-profile
894 nsd_vlp
= find_in_list(
895 get_virtual_link_profiles(nsd
),
896 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
901 and nsd_vlp
.get("virtual-link-protocol-data")
902 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
904 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
907 ip_profile_dest_data
= {}
908 if "ip-version" in ip_profile_source_data
:
909 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
912 if "cidr" in ip_profile_source_data
:
913 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
916 if "gateway-ip" in ip_profile_source_data
:
917 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
920 if "dhcp-enabled" in ip_profile_source_data
:
921 ip_profile_dest_data
["dhcp-params"] = {
922 "enabled": ip_profile_source_data
["dhcp-enabled"]
924 vld_params
["ip-profile"] = ip_profile_dest_data
926 # update vld_params with instantiation params
927 vld_instantiation_params
= find_in_list(
928 get_iterable(ns_params
, "vld"),
929 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
931 if vld_instantiation_params
:
932 vld_params
.update(vld_instantiation_params
)
933 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
934 target
["ns"]["vld"].append(target_vld
)
936 for vnfr
in db_vnfrs
.values():
938 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
940 vnf_params
= find_in_list(
941 get_iterable(ns_params
, "vnf"),
942 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
944 target_vnf
= deepcopy(vnfr
)
945 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
946 for vld
in target_vnf
.get("vld", ()):
947 # check if connected to a ns.vld, to fill target'
948 vnf_cp
= find_in_list(
949 vnfd
.get("int-virtual-link-desc", ()),
950 lambda cpd
: cpd
.get("id") == vld
["id"],
953 ns_cp
= "member_vnf:{}.{}".format(
954 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
956 if cp2target
.get(ns_cp
):
957 vld
["target"] = cp2target
[ns_cp
]
960 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
962 # check if this network needs SDN assist
964 if vld
.get("pci-interfaces"):
965 db_vim
= get_vim_account(vnfr
["vim-account-id"])
966 sdnc_id
= db_vim
["config"].get("sdn-controller")
968 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
969 target_sdn
= "sdn:{}".format(sdnc_id
)
970 vld
["vim_info"][target_sdn
] = {
972 "target_vim": target_vim
,
974 "type": vld
.get("type"),
977 # check at vnfd descriptor, if there is an ip-profile
979 vnfd_vlp
= find_in_list(
980 get_virtual_link_profiles(vnfd
),
981 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
985 and vnfd_vlp
.get("virtual-link-protocol-data")
986 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
988 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
991 ip_profile_dest_data
= {}
992 if "ip-version" in ip_profile_source_data
:
993 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
996 if "cidr" in ip_profile_source_data
:
997 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1000 if "gateway-ip" in ip_profile_source_data
:
1001 ip_profile_dest_data
[
1003 ] = ip_profile_source_data
["gateway-ip"]
1004 if "dhcp-enabled" in ip_profile_source_data
:
1005 ip_profile_dest_data
["dhcp-params"] = {
1006 "enabled": ip_profile_source_data
["dhcp-enabled"]
1009 vld_params
["ip-profile"] = ip_profile_dest_data
1010 # update vld_params with instantiation params
1012 vld_instantiation_params
= find_in_list(
1013 get_iterable(vnf_params
, "internal-vld"),
1014 lambda i_vld
: i_vld
["name"] == vld
["id"],
1016 if vld_instantiation_params
:
1017 vld_params
.update(vld_instantiation_params
)
1018 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1021 for vdur
in target_vnf
.get("vdur", ()):
1022 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1023 continue # This vdu must not be created
1024 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1026 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1029 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1030 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1033 and vdu_configuration
.get("config-access")
1034 and vdu_configuration
.get("config-access").get("ssh-access")
1036 vdur
["ssh-keys"] = ssh_keys_all
1037 vdur
["ssh-access-required"] = vdu_configuration
[
1039 ]["ssh-access"]["required"]
1042 and vnf_configuration
.get("config-access")
1043 and vnf_configuration
.get("config-access").get("ssh-access")
1044 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1046 vdur
["ssh-keys"] = ssh_keys_all
1047 vdur
["ssh-access-required"] = vnf_configuration
[
1049 ]["ssh-access"]["required"]
1050 elif ssh_keys_instantiation
and find_in_list(
1051 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1053 vdur
["ssh-keys"] = ssh_keys_instantiation
1055 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1057 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1059 if vdud
.get("cloud-init-file"):
1060 vdur
["cloud-init"] = "{}:file:{}".format(
1061 vnfd
["_id"], vdud
.get("cloud-init-file")
1063 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1064 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1065 base_folder
= vnfd
["_admin"]["storage"]
1066 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1067 base_folder
["folder"],
1068 base_folder
["pkg-dir"],
1069 vdud
.get("cloud-init-file"),
1071 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1072 target
["cloud_init_content"][
1075 elif vdud
.get("cloud-init"):
1076 vdur
["cloud-init"] = "{}:vdu:{}".format(
1077 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1079 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1080 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1083 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1084 deploy_params_vdu
= self
._format
_additional
_params
(
1085 vdur
.get("additionalParams") or {}
1087 deploy_params_vdu
["OSM"] = get_osm_params(
1088 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1090 vdur
["additionalParams"] = deploy_params_vdu
1093 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1094 if target_vim
not in ns_flavor
["vim_info"]:
1095 ns_flavor
["vim_info"][target_vim
] = {}
1098 # in case alternative images are provided we must check if they should be applied
1099 # for the vim_type, modify the vim_type taking into account
1100 ns_image_id
= int(vdur
["ns-image-id"])
1101 if vdur
.get("alt-image-ids"):
1102 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1103 vim_type
= db_vim
["vim_type"]
1104 for alt_image_id
in vdur
.get("alt-image-ids"):
1105 ns_alt_image
= target
["image"][int(alt_image_id
)]
1106 if vim_type
== ns_alt_image
.get("vim-type"):
1107 # must use alternative image
1109 "use alternative image id: {}".format(alt_image_id
)
1111 ns_image_id
= alt_image_id
1112 vdur
["ns-image-id"] = ns_image_id
1114 ns_image
= target
["image"][int(ns_image_id
)]
1115 if target_vim
not in ns_image
["vim_info"]:
1116 ns_image
["vim_info"][target_vim
] = {}
1118 vdur
["vim_info"] = {target_vim
: {}}
1119 # instantiation parameters
1121 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1122 # vdud["id"]), None)
1123 vdur_list
.append(vdur
)
1124 target_vnf
["vdur"] = vdur_list
1125 target
["vnf"].append(target_vnf
)
1127 desc
= await self
.RO
.deploy(nsr_id
, target
)
1128 self
.logger
.debug("RO return > {}".format(desc
))
1129 action_id
= desc
["action_id"]
1130 await self
._wait
_ng
_ro
(
1131 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1136 "_admin.deployed.RO.operational-status": "running",
1137 "detailed-status": " ".join(stage
),
1139 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1140 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1141 self
._write
_op
_status
(nslcmop_id
, stage
)
1143 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
):
    """
    Poll NG-RO until the given action is done, has failed or the timeout expires.

    :param nsr_id: ns record identity, passed to self.RO.status
    :param action_id: RO action identity, passed to self.RO.status
    :param nslcmop_id: operation identity. When provided together with stage, every change of stage[2] is
        written at database (nsrs 'detailed-status' and the operation status)
    :param start_time: epoch seconds used as origin of the timeout. Current time is used if not provided
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None
    :raises NgRoException: if RO reports FAILED or an unknown status, or if the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # An explicit exception instead of 'assert False': asserts are stripped with
            # 'python -O' and the loop would then silently keep polling until timeout
            raise NgRoException(
                "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            )
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            # write at database only when the VIM specific text has changed
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # 'loop' argument of asyncio.sleep was removed at python 3.10; the running loop is used
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Remove from NG-RO everything deployed for a ns and wait until it is done.

    A target without items is deployed, so that RO deletes what it has, and afterwards the ns is
    deleted at RO. A 404 answer from RO is considered as already deleted. Any other error is collected
    and raised at the end, once the resulting status has been written at database.

    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: nsr identity
    :param nslcmop_id: operation identity. It is also sent to RO as the 'action_id' of the target
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None
    :raises LcmException: if the deletion at RO has failed
    """
    started_at = time()
    action_id = None
    errors = []
    nsr_changes = {}
    try:
        removal_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        ro_answer = await self.RO.deploy(nsr_id, removal_target)
        action_id = ro_answer["action_id"]
        nsr_changes["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        nsr_changes["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )

        # wait until done
        twenty_minutes = 20 * 60
        await self._wait_ng_ro(
            nsr_id, action_id, nslcmop_id, started_at, twenty_minutes, stage
        )

        nsr_changes["_admin.deployed.RO.nsr_delete_action_id"] = None
        nsr_changes["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as exc:
        # only RO exceptions carry an http code
        http_code = exc.http_code if isinstance(exc, NgRoException) else None
        if http_code == 404:  # not found
            nsr_changes["_admin.deployed.RO.nsr_id"] = None
            nsr_changes["_admin.deployed.RO.nsr_status"] = "DELETED"
            nsr_changes["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_action_id={} already deleted".format(action_id)
            )
        elif http_code == 409:  # conflict
            errors.append("delete conflict: {}".format(exc))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, exc)
            )
        else:
            errors.append("delete error: {}".format(exc))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, exc)
            )

    stage[2] = "Error deleting from VIM" if errors else "Deleted from VIM"
    nsr_changes["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, nsr_changes)
    self._write_op_status(nslcmop_id, stage)

    if errors:
        raise LcmException("; ".join(errors))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Only its values are iterated here; each one provides
        'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: what self._instantiate_ng_ro returns. Any exception is logged, vnfrs are set at error and
        the exception is raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params["vimAccountId"] if not present at any vnfrs
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # This was a comparison ('=='), so nothing was changed. The vim account of the
                # last vnfr is assigned, skipping the case of no vnfrs at all
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is only logged for exceptions that are not one of the expected types
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr that is read from database at every try
    :param kdu_name: 'kdu-name' of the wanted entry inside the vnfr 'kdur' list
    :return: IP address ('ip-address' of the kdur) once its status is READY or ENABLED
    :raises LcmException: if the kdur is not found, it has any other status, or after 360 tries
        without a status
    """
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # 'loop' argument of asyncio.sleep was removed at python 3.10; the running loop is used
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: _id of the vnfr that is read from database
    :param vdu_id: 'vdu-id-ref' of the target vdur. If empty, the vdur holding the vnfr 'ip-address' is used
    :param vdu_index: 'count-index' of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the target is in error state, it is not found, or retries are exhausted
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # 'loop' argument of asyncio.sleep was removed at python 3.10; the running loop is used
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ng_ro:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                # keep the RO exception chained as the cause
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
            except ROclient.ROClientException as e:
                # old RO: retry on the next iterations; the retry is logged only the first time
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    ) from e
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until the VCA deployments this one depends on are READY.

    A vdu or kdu element has no dependencies. Otherwise the 'configurationStatus' list of the nsr is
    read from database every 10 seconds: all the other entries are considered when this element has no
    'member-vnf-index', and only the ones with the same 'member-vnf-index' when it has one.

    :param nsr_id: nsr identity
    :param vca_deployed_list: list of deployed VCA; the item at vca_index is the waiting element
    :param vca_index: position of the waiting element, skipped at the 'configurationStatus' list
    :return: None
    :raises LcmException: if a dependency is BROKEN or they are not READY after the retries
    """
    my_vca = vca_deployed_list[vca_index]
    if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    remaining_polls = 300
    while remaining_polls >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]
        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            if my_vca.get("member-vnf-index") and entry.get(
                "member-vnf-index"
            ) != my_vca.get("member-vnf-index"):
                # it belongs to a different vnf: not a dependency
                continue
            dependency_status = entry.get("status")
            if dependency_status == "BROKEN":
                raise LcmException(
                    "Configuration aborted because dependent charm/s has failed"
                )
            if dependency_status != "READY":
                pending = True
                break
        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        remaining_polls -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the VCA identity that applies to a vnf or to a ns.

    :param db_vnfr: database content of the vnfr. When not empty, its 'vca-id' is returned
    :param db_nsr: database content of the nsr. Used only if db_vnfr is empty: the 'vca' of the vim
        account referenced by instantiate_params.vimAccountId is returned
    :return: the vca id, or None when both parameters are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
        return vim_account.get("vca")
    return None
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Create or register the execution environment of one VCA element (NS, VNF, VDU or KDU), install
    its configuration software, add its relations and execute its initial config primitives.
    The configuration status of the element is written at database along the process.

    :param logging_text: prefix text to use at logging
    :param vca_index: index of this element at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: first part of the namespace; empty string is used if not provided
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of the vnfr, empty for a NS element
    :param vdu_id: vdu identity, for a VDU element
    :param kdu_name: kdu name, for a KDU element
    :param vdu_index: used as index at the namespace, except for native charms where 0 is used
    :param config_descriptor: configuration content; 'initial-config-primitive', 'config-access' and
        'terminate-config-primitive' are read from it
    :param deploy_params: parameters for the primitives. 'rw_mgmt_ip' (and 'ns_config_info' for a NS
        element) are added to it
    :param base_folder: dictionary with 'folder' and 'pkg-dir', used to compose the artifact path
    :param nslcmop_id: operation identity, where the stage is written
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over
        the general stage
    :param vca_type: handled values are native_charm, lxc_proxy_charm, k8s_proxy_charm, helm, helm-v3
    :param vca_name: last part of the artifact path
    :param ee_config_descriptor: execution environment content; its 'id' selects the primitives
    :return: None
    :raises LcmException: on any failure, chained to the original exception. Status is set to BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    step = ""
    # defined here so that it is never read before assignment, whatever the vca_type is
    rw_mgmt_ip = None
    try:

        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        artifact_path = "{}/{}/{}/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            "charms"
            if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
            else "helm-charts",
            vca_name,
        )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                else:
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # .get() as done above with my_vca: a missing key is handled as a NS element
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.add_prometheus_metrics(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # traceback is only logged for exceptions that are not one of the expected types
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the nsr to update
    :param ns_state: written at 'nsState' only if it is not empty
    :param current_operation: written at 'currentOperation', and at '_admin.operation-type' unless it is IDLE
    :param current_operation_id: written at 'currentOperationID', '_admin.current-operation' and
        '_admin.nslcmop'
    :param error_description: written at 'errorDescription'
    :param error_detail: written at 'errorDetail'
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A database exception is logged as a warning, not raised
    """
    try:
        # the same dictionary provided at other_update is completed and sent to database
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing NS status, ns={}: {}".format(nsr_id, e)
        )
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the status fields of an operation at the 'nslcmops' collection.
    :param op_id: _id of the operation to update
    :param stage: if it is a list, its first item is written at 'stage' and all of them, joined by
        spaces, at 'detailed-status'. Any other value but None is written at 'stage' as a string
    :param error_message: written at 'errorMessage' if it is not None
    :param queuePosition: written at 'queuePosition'
    :param operation_state: if it is not None, it is written at 'operationState' and the current time
        at 'statusEnteredTime'
    :param other_update: Other required changes at database if provided; the same dictionary is
        completed and sent to database
    :return: None. A database exception is logged as a warning, not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non-empty item of the nsr 'configurationStatus' list.
    :param db_nsr: database content of ns record; '_id' and 'configurationStatus' are read
    :param status: value to write at 'configurationStatus.<index>.status'
    :return: None. A database exception is logged as a warning, not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            # empty items of the list are skipped, but they still count for the index
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update one item of the nsr 'configurationStatus' list at database.
    :param nsr_id: _id of the nsr to update
    :param vca_index: index of the item, used to compose the 'configurationStatus.<index>.' path
    :param status: written at '<path>status' if it is not empty
    :param element_under_configuration: written at '<path>elementUnderConfiguration' if it is not empty
    :param element_type: written at '<path>elementType' if it is not empty
    :param other_update: Other required changes at database if provided; the same dictionary is
        completed and sent to database
    :return: None. A database exception is logged as a warning, not raised
    """
    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). Nothing is done unless the
    operation parameter 'placement-engine' is PLA. In that case the request is sent via the message
    bus and the result is polled at database (nslcmops _admin.pla) every 5 seconds.
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with
        the computed 'vim-account-id'
    :raises LcmException: if the placement result is not at database after the polling
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    poll_seconds = 5
    seconds_left = poll_seconds * 10
    placement = None
    while not placement and seconds_left >= 0:
        await asyncio.sleep(poll_seconds)
        seconds_left -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for vnf_placement in placement["vnf"]:
        vnfr = db_vnfrs.get(vnf_placement["member-vnf-index"])
        if vnf_placement.get("vimAccountId") and vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": vnf_placement["vimAccountId"]},
            )
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = vnf_placement["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the '_admin.pla' field of its operation (nslcmops collection).
    :param params: dictionary whose 'placement' content is stored; the operation identity is read from
        placement.nslcmopId
    :return: None. Any exception is logged as a warning, not raised
    """
    # defined before the 'try' so that the handler can always format it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2183 async def instantiate(self
, nsr_id
, nslcmop_id
):
2186 :param nsr_id: ns instance to deploy
2187 :param nslcmop_id: operation to run
2191 # Try to lock HA task here
2192 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2193 if not task_is_locked_by_me
:
2195 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2199 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2200 self
.logger
.debug(logging_text
+ "Enter")
2202 # get all needed from database
2204 # database nsrs record
2207 # database nslcmops record
2210 # update operation on nsrs
2212 # update operation on nslcmops
2213 db_nslcmop_update
= {}
2215 nslcmop_operation_state
= None
2216 db_vnfrs
= {} # vnf's info indexed by member-index
2218 tasks_dict_info
= {} # from task to info text
2222 "Stage 1/5: preparation of the environment.",
2223 "Waiting for previous operations to terminate.",
2226 # ^ stage, step, VIM progress
2228 # wait for any previous tasks in process
2229 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2231 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2232 stage
[1] = "Reading from database."
2233 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2234 db_nsr_update
["detailed-status"] = "creating"
2235 db_nsr_update
["operational-status"] = "init"
2236 self
._write
_ns
_status
(
2238 ns_state
="BUILDING",
2239 current_operation
="INSTANTIATING",
2240 current_operation_id
=nslcmop_id
,
2241 other_update
=db_nsr_update
,
2243 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2245 # read from db: operation
2246 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2247 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2248 ns_params
= db_nslcmop
.get("operationParams")
2249 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2250 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2252 timeout_ns_deploy
= self
.timeout
.get(
2253 "ns_deploy", self
.timeout_ns_deploy
2257 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2258 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2259 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2260 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2261 self
.fs
.sync(db_nsr
["nsd-id"])
2263 # nsr_name = db_nsr["name"] # TODO short-name??
2265 # read from db: vnf's of this ns
2266 stage
[1] = "Getting vnfrs from db."
2267 self
.logger
.debug(logging_text
+ stage
[1])
2268 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2270 # read from db: vnfd's for every vnf
2271 db_vnfds
= [] # every vnfd data
2273 # for each vnf in ns, read vnfd
2274 for vnfr
in db_vnfrs_list
:
2275 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2276 vnfd_id
= vnfr
["vnfd-id"]
2277 vnfd_ref
= vnfr
["vnfd-ref"]
2278 self
.fs
.sync(vnfd_id
)
2280 # if we haven't this vnfd, read it from db
2281 if vnfd_id
not in db_vnfds
:
2283 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2286 self
.logger
.debug(logging_text
+ stage
[1])
2287 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2290 db_vnfds
.append(vnfd
)
2292 # Get or generates the _admin.deployed.VCA list
2293 vca_deployed_list
= None
2294 if db_nsr
["_admin"].get("deployed"):
2295 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2296 if vca_deployed_list
is None:
2297 vca_deployed_list
= []
2298 configuration_status_list
= []
2299 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2300 db_nsr_update
["configurationStatus"] = configuration_status_list
2301 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2302 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2303 elif isinstance(vca_deployed_list
, dict):
2304 # maintain backward compatibility. Change a dict to list at database
2305 vca_deployed_list
= list(vca_deployed_list
.values())
2306 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2307 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2310 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2312 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2313 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2315 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2316 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2317 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2319 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2322 # n2vc_redesign STEP 2 Deploy Network Scenario
2323 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2324 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2326 stage
[1] = "Deploying KDUs."
2327 # self.logger.debug(logging_text + "Before deploy_kdus")
2328 # Call to deploy_kdus in case exists the "vdu:kdu" param
2329 await self
.deploy_kdus(
2330 logging_text
=logging_text
,
2332 nslcmop_id
=nslcmop_id
,
2335 task_instantiation_info
=tasks_dict_info
,
2338 stage
[1] = "Getting VCA public key."
2339 # n2vc_redesign STEP 1 Get VCA public ssh-key
2340 # feature 1429. Add n2vc public key to needed VMs
2341 n2vc_key
= self
.n2vc
.get_public_key()
2342 n2vc_key_list
= [n2vc_key
]
2343 if self
.vca_config
.get("public_key"):
2344 n2vc_key_list
.append(self
.vca_config
["public_key"])
2346 stage
[1] = "Deploying NS at VIM."
2347 task_ro
= asyncio
.ensure_future(
2348 self
.instantiate_RO(
2349 logging_text
=logging_text
,
2353 db_nslcmop
=db_nslcmop
,
2356 n2vc_key_list
=n2vc_key_list
,
2360 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2361 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2363 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2364 stage
[1] = "Deploying Execution Environments."
2365 self
.logger
.debug(logging_text
+ stage
[1])
2367 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2368 for vnf_profile
in get_vnf_profiles(nsd
):
2369 vnfd_id
= vnf_profile
["vnfd-id"]
2370 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2371 member_vnf_index
= str(vnf_profile
["id"])
2372 db_vnfr
= db_vnfrs
[member_vnf_index
]
2373 base_folder
= vnfd
["_admin"]["storage"]
2379 # Get additional parameters
2380 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2381 if db_vnfr
.get("additionalParamsForVnf"):
2382 deploy_params
.update(
2383 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2386 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2387 if descriptor_config
:
2389 logging_text
=logging_text
2390 + "member_vnf_index={} ".format(member_vnf_index
),
2393 nslcmop_id
=nslcmop_id
,
2399 member_vnf_index
=member_vnf_index
,
2400 vdu_index
=vdu_index
,
2402 deploy_params
=deploy_params
,
2403 descriptor_config
=descriptor_config
,
2404 base_folder
=base_folder
,
2405 task_instantiation_info
=tasks_dict_info
,
2409 # Deploy charms for each VDU that supports one.
2410 for vdud
in get_vdu_list(vnfd
):
2412 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2413 vdur
= find_in_list(
2414 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2417 if vdur
.get("additionalParams"):
2418 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2420 deploy_params_vdu
= deploy_params
2421 deploy_params_vdu
["OSM"] = get_osm_params(
2422 db_vnfr
, vdu_id
, vdu_count_index
=0
2424 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2426 self
.logger
.debug("VDUD > {}".format(vdud
))
2428 "Descriptor config > {}".format(descriptor_config
)
2430 if descriptor_config
:
2433 for vdu_index
in range(vdud_count
):
2434 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2436 logging_text
=logging_text
2437 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2438 member_vnf_index
, vdu_id
, vdu_index
2442 nslcmop_id
=nslcmop_id
,
2448 member_vnf_index
=member_vnf_index
,
2449 vdu_index
=vdu_index
,
2451 deploy_params
=deploy_params_vdu
,
2452 descriptor_config
=descriptor_config
,
2453 base_folder
=base_folder
,
2454 task_instantiation_info
=tasks_dict_info
,
2457 for kdud
in get_kdu_list(vnfd
):
2458 kdu_name
= kdud
["name"]
2459 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2460 if descriptor_config
:
2465 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2467 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2468 if kdur
.get("additionalParams"):
2469 deploy_params_kdu
= parse_yaml_strings(
2470 kdur
["additionalParams"]
2474 logging_text
=logging_text
,
2477 nslcmop_id
=nslcmop_id
,
2483 member_vnf_index
=member_vnf_index
,
2484 vdu_index
=vdu_index
,
2486 deploy_params
=deploy_params_kdu
,
2487 descriptor_config
=descriptor_config
,
2488 base_folder
=base_folder
,
2489 task_instantiation_info
=tasks_dict_info
,
2493 # Check if this NS has a charm configuration
2494 descriptor_config
= nsd
.get("ns-configuration")
2495 if descriptor_config
and descriptor_config
.get("juju"):
2498 member_vnf_index
= None
2504 # Get additional parameters
2505 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2506 if db_nsr
.get("additionalParamsForNs"):
2507 deploy_params
.update(
2508 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2510 base_folder
= nsd
["_admin"]["storage"]
2512 logging_text
=logging_text
,
2515 nslcmop_id
=nslcmop_id
,
2521 member_vnf_index
=member_vnf_index
,
2522 vdu_index
=vdu_index
,
2524 deploy_params
=deploy_params
,
2525 descriptor_config
=descriptor_config
,
2526 base_folder
=base_folder
,
2527 task_instantiation_info
=tasks_dict_info
,
2531 # rest of staff will be done at finally
2534 ROclient
.ROClientException
,
2540 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2543 except asyncio
.CancelledError
:
2545 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2547 exc
= "Operation was cancelled"
2548 except Exception as e
:
2549 exc
= traceback
.format_exc()
2550 self
.logger
.critical(
2551 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2556 error_list
.append(str(exc
))
2558 # wait for pending tasks
2560 stage
[1] = "Waiting for instantiate pending tasks."
2561 self
.logger
.debug(logging_text
+ stage
[1])
2562 error_list
+= await self
._wait
_for
_tasks
(
2570 stage
[1] = stage
[2] = ""
2571 except asyncio
.CancelledError
:
2572 error_list
.append("Cancelled")
2573 # TODO cancel all tasks
2574 except Exception as exc
:
2575 error_list
.append(str(exc
))
2577 # update operation-status
2578 db_nsr_update
["operational-status"] = "running"
2579 # let's begin with VCA 'configured' status (later we can change it)
2580 db_nsr_update
["config-status"] = "configured"
2581 for task
, task_name
in tasks_dict_info
.items():
2582 if not task
.done() or task
.cancelled() or task
.exception():
2583 if task_name
.startswith(self
.task_name_deploy_vca
):
2584 # A N2VC task is pending
2585 db_nsr_update
["config-status"] = "failed"
2587 # RO or KDU task is pending
2588 db_nsr_update
["operational-status"] = "failed"
2590 # update status at database
2592 error_detail
= ". ".join(error_list
)
2593 self
.logger
.error(logging_text
+ error_detail
)
2594 error_description_nslcmop
= "{} Detail: {}".format(
2595 stage
[0], error_detail
2597 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2598 nslcmop_id
, stage
[0]
2601 db_nsr_update
["detailed-status"] = (
2602 error_description_nsr
+ " Detail: " + error_detail
2604 db_nslcmop_update
["detailed-status"] = error_detail
2605 nslcmop_operation_state
= "FAILED"
2609 error_description_nsr
= error_description_nslcmop
= None
2611 db_nsr_update
["detailed-status"] = "Done"
2612 db_nslcmop_update
["detailed-status"] = "Done"
2613 nslcmop_operation_state
= "COMPLETED"
2616 self
._write
_ns
_status
(
2619 current_operation
="IDLE",
2620 current_operation_id
=None,
2621 error_description
=error_description_nsr
,
2622 error_detail
=error_detail
,
2623 other_update
=db_nsr_update
,
2625 self
._write
_op
_status
(
2628 error_message
=error_description_nslcmop
,
2629 operation_state
=nslcmop_operation_state
,
2630 other_update
=db_nslcmop_update
,
2633 if nslcmop_operation_state
:
2635 await self
.msg
.aiowrite(
2640 "nslcmop_id": nslcmop_id
,
2641 "operationState": nslcmop_operation_state
,
2645 except Exception as e
:
2647 logging_text
+ "kafka_write notification Exception {}".format(e
)
2650 self
.logger
.debug(logging_text
+ "Exit")
2651 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2653 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2654 if vnfd_id
not in cached_vnfds
:
2655 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2656 return cached_vnfds
[vnfd_id
]
2658 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2659 if vnf_profile_id
not in cached_vnfrs
:
2660 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2663 "member-vnf-index-ref": vnf_profile_id
,
2664 "nsr-id-ref": nsr_id
,
2667 return cached_vnfrs
[vnf_profile_id
]
2669 def _is_deployed_vca_in_relation(
2670 self
, vca
: DeployedVCA
, relation
: Relation
2673 for endpoint
in (relation
.provider
, relation
.requirer
):
2674 if endpoint
["kdu-resource-profile-id"]:
2677 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2678 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2679 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2685 def _update_ee_relation_data_with_implicit_data(
2686 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2688 ee_relation_data
= safe_get_ee_relation(
2689 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2691 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2692 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2693 "execution-environment-ref"
2695 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2696 vnfd_id
= vnf_profile
["vnfd-id"]
2697 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2700 if ee_relation_level
== EELevel
.VNF
2701 else ee_relation_data
["vdu-profile-id"]
2703 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2706 f
"not execution environments found for ee_relation {ee_relation_data}"
2708 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2709 return ee_relation_data
2711 def _get_ns_relations(
2714 nsd
: Dict
[str, Any
],
2716 cached_vnfds
: Dict
[str, Any
],
2719 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2720 for r
in db_ns_relations
:
2721 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2722 nsr_id
, nsd
, r
["provider"], cached_vnfds
2724 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2725 nsr_id
, nsd
, r
["requirer"], cached_vnfds
2727 provider
= EERelation(relation_provider
)
2728 requirer
= EERelation(relation_requirer
)
2729 relation
= Relation(r
["name"], provider
, requirer
)
2730 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2732 relations
.append(relation
)
2735 def _get_vnf_relations(
2738 nsd
: Dict
[str, Any
],
2740 cached_vnfds
: Dict
[str, Any
],
2743 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2744 vnf_profile_id
= vnf_profile
["id"]
2745 vnfd_id
= vnf_profile
["vnfd-id"]
2746 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2747 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2748 for r
in db_vnf_relations
:
2749 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2750 nsr_id
, nsd
, r
["provider"], cached_vnfds
, vnf_profile_id
=vnf_profile_id
2752 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2753 nsr_id
, nsd
, r
["requirer"], cached_vnfds
, vnf_profile_id
=vnf_profile_id
2755 provider
= EERelation(relation_provider
)
2756 requirer
= EERelation(relation_requirer
)
2757 relation
= Relation(r
["name"], provider
, requirer
)
2758 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2760 relations
.append(relation
)
2763 def _get_kdu_resource_data(
2765 ee_relation
: EERelation
,
2766 db_nsr
: Dict
[str, Any
],
2767 cached_vnfds
: Dict
[str, Any
],
2768 ) -> DeployedK8sResource
:
2769 nsd
= get_nsd(db_nsr
)
2770 vnf_profiles
= get_vnf_profiles(nsd
)
2771 vnfd_id
= find_in_list(
2773 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2775 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2776 kdu_resource_profile
= get_kdu_resource_profile(
2777 db_vnfd
, ee_relation
.kdu_resource_profile_id
2779 kdu_name
= kdu_resource_profile
["kdu-name"]
2780 deployed_kdu
, _
= get_deployed_kdu(
2781 db_nsr
.get("_admin", ()).get("deployed", ()),
2783 ee_relation
.vnf_profile_id
,
2785 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2788 def _get_deployed_component(
2790 ee_relation
: EERelation
,
2791 db_nsr
: Dict
[str, Any
],
2792 cached_vnfds
: Dict
[str, Any
],
2793 ) -> DeployedComponent
:
2794 nsr_id
= db_nsr
["_id"]
2795 deployed_component
= None
2796 ee_level
= EELevel
.get_level(ee_relation
)
2797 if ee_level
== EELevel
.NS
:
2798 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2800 deployed_component
= DeployedVCA(nsr_id
, vca
)
2801 elif ee_level
== EELevel
.VNF
:
2802 vca
= get_deployed_vca(
2806 "member-vnf-index": ee_relation
.vnf_profile_id
,
2807 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2811 deployed_component
= DeployedVCA(nsr_id
, vca
)
2812 elif ee_level
== EELevel
.VDU
:
2813 vca
= get_deployed_vca(
2816 "vdu_id": ee_relation
.vdu_profile_id
,
2817 "member-vnf-index": ee_relation
.vnf_profile_id
,
2818 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2822 deployed_component
= DeployedVCA(nsr_id
, vca
)
2823 elif ee_level
== EELevel
.KDU
:
2824 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2825 ee_relation
, db_nsr
, cached_vnfds
2827 if kdu_resource_data
:
2828 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2829 return deployed_component
2831 async def _add_relation(
2835 db_nsr
: Dict
[str, Any
],
2836 cached_vnfds
: Dict
[str, Any
],
2837 cached_vnfrs
: Dict
[str, Any
],
2839 deployed_provider
= self
._get
_deployed
_component
(
2840 relation
.provider
, db_nsr
, cached_vnfds
2842 deployed_requirer
= self
._get
_deployed
_component
(
2843 relation
.requirer
, db_nsr
, cached_vnfds
2847 and deployed_requirer
2848 and deployed_provider
.config_sw_installed
2849 and deployed_requirer
.config_sw_installed
2851 provider_db_vnfr
= (
2853 relation
.provider
.nsr_id
,
2854 relation
.provider
.vnf_profile_id
,
2857 if relation
.provider
.vnf_profile_id
2860 requirer_db_vnfr
= (
2862 relation
.requirer
.nsr_id
,
2863 relation
.requirer
.vnf_profile_id
,
2866 if relation
.requirer
.vnf_profile_id
2869 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
2870 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
2871 provider_relation_endpoint
= RelationEndpoint(
2872 deployed_provider
.ee_id
,
2874 relation
.provider
.endpoint
,
2876 requirer_relation_endpoint
= RelationEndpoint(
2877 deployed_requirer
.ee_id
,
2879 relation
.requirer
.endpoint
,
2881 await self
.vca_map
[vca_type
].add_relation(
2882 provider
=provider_relation_endpoint
,
2883 requirer
=requirer_relation_endpoint
,
2885 # remove entry from relations list
2889 async def _add_vca_relations(
2895 timeout
: int = 3600,
2899 # 1. find all relations for this VCA
2900 # 2. wait for other peers related
2904 # STEP 1: find all relations for this VCA
2907 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2908 nsd
= get_nsd(db_nsr
)
2911 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
2912 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
2917 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
2918 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
2920 # if no relations, terminate
2922 self
.logger
.debug(logging_text
+ " No relations")
2925 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
2932 if now
- start
>= timeout
:
2933 self
.logger
.error(logging_text
+ " : timeout adding relations")
2936 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2937 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2939 # for each relation, find the VCA's related
2940 for relation
in relations
.copy():
2941 added
= await self
._add
_relation
(
2949 relations
.remove(relation
)
2952 self
.logger
.debug("Relations added")
2954 await asyncio
.sleep(5.0)
2958 except Exception as e
:
2959 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
2962 async def _install_kdu(
2970 k8s_instance_info
: dict,
2971 k8params
: dict = None,
2977 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2980 "collection": "nsrs",
2981 "filter": {"_id": nsr_id
},
2982 "path": nsr_db_path
,
2985 if k8s_instance_info
.get("kdu-deployment-name"):
2986 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2988 kdu_instance
= self
.k8scluster_map
[
2990 ].generate_kdu_instance_name(
2991 db_dict
=db_dict_install
,
2992 kdu_model
=k8s_instance_info
["kdu-model"],
2993 kdu_name
=k8s_instance_info
["kdu-name"],
2996 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2998 await self
.k8scluster_map
[k8sclustertype
].install(
2999 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3000 kdu_model
=k8s_instance_info
["kdu-model"],
3003 db_dict
=db_dict_install
,
3005 kdu_name
=k8s_instance_info
["kdu-name"],
3006 namespace
=k8s_instance_info
["namespace"],
3007 kdu_instance
=kdu_instance
,
3011 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3014 # Obtain services to obtain management service ip
3015 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3016 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3017 kdu_instance
=kdu_instance
,
3018 namespace
=k8s_instance_info
["namespace"],
3021 # Obtain management service info (if exists)
3022 vnfr_update_dict
= {}
3023 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3025 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3030 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3033 for service
in kdud
.get("service", [])
3034 if service
.get("mgmt-service")
3036 for mgmt_service
in mgmt_services
:
3037 for service
in services
:
3038 if service
["name"].startswith(mgmt_service
["name"]):
3039 # Mgmt service found, Obtain service ip
3040 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3041 if isinstance(ip
, list) and len(ip
) == 1:
3045 "kdur.{}.ip-address".format(kdu_index
)
3048 # Check if must update also mgmt ip at the vnf
3049 service_external_cp
= mgmt_service
.get(
3050 "external-connection-point-ref"
3052 if service_external_cp
:
3054 deep_get(vnfd
, ("mgmt-interface", "cp"))
3055 == service_external_cp
3057 vnfr_update_dict
["ip-address"] = ip
3062 "external-connection-point-ref", ""
3064 == service_external_cp
,
3067 "kdur.{}.ip-address".format(kdu_index
)
3072 "Mgmt service name: {} not found".format(
3073 mgmt_service
["name"]
3077 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3078 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3080 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3083 and kdu_config
.get("initial-config-primitive")
3084 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3086 initial_config_primitive_list
= kdu_config
.get(
3087 "initial-config-primitive"
3089 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3091 for initial_config_primitive
in initial_config_primitive_list
:
3092 primitive_params_
= self
._map
_primitive
_params
(
3093 initial_config_primitive
, {}, {}
3096 await asyncio
.wait_for(
3097 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3098 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3099 kdu_instance
=kdu_instance
,
3100 primitive_name
=initial_config_primitive
["name"],
3101 params
=primitive_params_
,
3102 db_dict
=db_dict_install
,
3108 except Exception as e
:
3109 # Prepare update db with error and raise exception
3112 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3116 vnfr_data
.get("_id"),
3117 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3120 # ignore to keep original exception
3122 # reraise original error
3127 async def deploy_kdus(
3134 task_instantiation_info
,
3136 # Launch kdus if present in the descriptor
3138 k8scluster_id_2_uuic
= {
3139 "helm-chart-v3": {},
3144 async def _get_cluster_id(cluster_id
, cluster_type
):
3145 nonlocal k8scluster_id_2_uuic
3146 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3147 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3149 # check if K8scluster is creating and wait look if previous tasks in process
3150 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3151 "k8scluster", cluster_id
3154 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3155 task_name
, cluster_id
3157 self
.logger
.debug(logging_text
+ text
)
3158 await asyncio
.wait(task_dependency
, timeout
=3600)
3160 db_k8scluster
= self
.db
.get_one(
3161 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3163 if not db_k8scluster
:
3164 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3166 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3168 if cluster_type
== "helm-chart-v3":
3170 # backward compatibility for existing clusters that have not been initialized for helm v3
3171 k8s_credentials
= yaml
.safe_dump(
3172 db_k8scluster
.get("credentials")
3174 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3175 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3177 db_k8scluster_update
= {}
3178 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3179 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3180 db_k8scluster_update
[
3181 "_admin.helm-chart-v3.created"
3183 db_k8scluster_update
[
3184 "_admin.helm-chart-v3.operationalState"
3187 "k8sclusters", cluster_id
, db_k8scluster_update
3189 except Exception as e
:
3192 + "error initializing helm-v3 cluster: {}".format(str(e
))
3195 "K8s cluster '{}' has not been initialized for '{}'".format(
3196 cluster_id
, cluster_type
3201 "K8s cluster '{}' has not been initialized for '{}'".format(
3202 cluster_id
, cluster_type
3205 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3208 logging_text
+= "Deploy kdus: "
3211 db_nsr_update
= {"_admin.deployed.K8s": []}
3212 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3215 updated_cluster_list
= []
3216 updated_v3_cluster_list
= []
3218 for vnfr_data
in db_vnfrs
.values():
3219 vca_id
= self
.get_vca_id(vnfr_data
, {})
3220 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3221 # Step 0: Prepare and set parameters
3222 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3223 vnfd_id
= vnfr_data
.get("vnfd-id")
3224 vnfd_with_id
= find_in_list(
3225 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3229 for kdud
in vnfd_with_id
["kdu"]
3230 if kdud
["name"] == kdur
["kdu-name"]
3232 namespace
= kdur
.get("k8s-namespace")
3233 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3234 if kdur
.get("helm-chart"):
3235 kdumodel
= kdur
["helm-chart"]
3236 # Default version: helm3, if helm-version is v2 assign v2
3237 k8sclustertype
= "helm-chart-v3"
3238 self
.logger
.debug("kdur: {}".format(kdur
))
3240 kdur
.get("helm-version")
3241 and kdur
.get("helm-version") == "v2"
3243 k8sclustertype
= "helm-chart"
3244 elif kdur
.get("juju-bundle"):
3245 kdumodel
= kdur
["juju-bundle"]
3246 k8sclustertype
= "juju-bundle"
3249 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3250 "juju-bundle. Maybe an old NBI version is running".format(
3251 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3254 # check if kdumodel is a file and exists
3256 vnfd_with_id
= find_in_list(
3257 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3259 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3260 if storage
and storage
.get(
3262 ): # may be not present if vnfd has not artifacts
3263 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3264 filename
= "{}/{}/{}s/{}".format(
3270 if self
.fs
.file_exists(
3271 filename
, mode
="file"
3272 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3273 kdumodel
= self
.fs
.path
+ filename
3274 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3276 except Exception: # it is not a file
3279 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3280 step
= "Synchronize repos for k8s cluster '{}'".format(
3283 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3287 k8sclustertype
== "helm-chart"
3288 and cluster_uuid
not in updated_cluster_list
3290 k8sclustertype
== "helm-chart-v3"
3291 and cluster_uuid
not in updated_v3_cluster_list
3293 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3294 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3295 cluster_uuid
=cluster_uuid
3298 if del_repo_list
or added_repo_dict
:
3299 if k8sclustertype
== "helm-chart":
3301 "_admin.helm_charts_added." + item
: None
3302 for item
in del_repo_list
3305 "_admin.helm_charts_added." + item
: name
3306 for item
, name
in added_repo_dict
.items()
3308 updated_cluster_list
.append(cluster_uuid
)
3309 elif k8sclustertype
== "helm-chart-v3":
3311 "_admin.helm_charts_v3_added." + item
: None
3312 for item
in del_repo_list
3315 "_admin.helm_charts_v3_added." + item
: name
3316 for item
, name
in added_repo_dict
.items()
3318 updated_v3_cluster_list
.append(cluster_uuid
)
3320 logging_text
+ "repos synchronized on k8s cluster "
3321 "'{}' to_delete: {}, to_add: {}".format(
3322 k8s_cluster_id
, del_repo_list
, added_repo_dict
3327 {"_id": k8s_cluster_id
},
3333 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3334 vnfr_data
["member-vnf-index-ref"],
3338 k8s_instance_info
= {
3339 "kdu-instance": None,
3340 "k8scluster-uuid": cluster_uuid
,
3341 "k8scluster-type": k8sclustertype
,
3342 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3343 "kdu-name": kdur
["kdu-name"],
3344 "kdu-model": kdumodel
,
3345 "namespace": namespace
,
3346 "kdu-deployment-name": kdu_deployment_name
,
3348 db_path
= "_admin.deployed.K8s.{}".format(index
)
3349 db_nsr_update
[db_path
] = k8s_instance_info
3350 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3351 vnfd_with_id
= find_in_list(
3352 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3354 task
= asyncio
.ensure_future(
3363 k8params
=desc_params
,
3368 self
.lcm_tasks
.register(
3372 "instantiate_KDU-{}".format(index
),
3375 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3381 except (LcmException
, asyncio
.CancelledError
):
3383 except Exception as e
:
3384 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3385 if isinstance(e
, (N2VCException
, DbException
)):
3386 self
.logger
.error(logging_text
+ msg
)
3388 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3389 raise LcmException(msg
)
3392 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3411 task_instantiation_info
,
3414 # launch instantiate_N2VC in a asyncio task and register task object
3415 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3416 # if not found, create one entry and update database
3417 # fill db_nsr._admin.deployed.VCA.<index>
3420 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3422 if "execution-environment-list" in descriptor_config
:
3423 ee_list
= descriptor_config
.get("execution-environment-list", [])
3424 elif "juju" in descriptor_config
:
3425 ee_list
= [descriptor_config
] # ns charms
3426 else: # other types as script are not supported
3429 for ee_item
in ee_list
:
3432 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3433 ee_item
.get("juju"), ee_item
.get("helm-chart")
3436 ee_descriptor_id
= ee_item
.get("id")
3437 if ee_item
.get("juju"):
3438 vca_name
= ee_item
["juju"].get("charm")
3441 if ee_item
["juju"].get("charm") is not None
3444 if ee_item
["juju"].get("cloud") == "k8s":
3445 vca_type
= "k8s_proxy_charm"
3446 elif ee_item
["juju"].get("proxy") is False:
3447 vca_type
= "native_charm"
3448 elif ee_item
.get("helm-chart"):
3449 vca_name
= ee_item
["helm-chart"]
3450 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3453 vca_type
= "helm-v3"
3456 logging_text
+ "skipping non juju neither charm configuration"
3461 for vca_index
, vca_deployed
in enumerate(
3462 db_nsr
["_admin"]["deployed"]["VCA"]
3464 if not vca_deployed
:
3467 vca_deployed
.get("member-vnf-index") == member_vnf_index
3468 and vca_deployed
.get("vdu_id") == vdu_id
3469 and vca_deployed
.get("kdu_name") == kdu_name
3470 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3471 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3475 # not found, create one.
3477 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3480 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3482 target
+= "/kdu/{}".format(kdu_name
)
3484 "target_element": target
,
3485 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3486 "member-vnf-index": member_vnf_index
,
3488 "kdu_name": kdu_name
,
3489 "vdu_count_index": vdu_index
,
3490 "operational-status": "init", # TODO revise
3491 "detailed-status": "", # TODO revise
3492 "step": "initial-deploy", # TODO revise
3494 "vdu_name": vdu_name
,
3496 "ee_descriptor_id": ee_descriptor_id
,
3500 # create VCA and configurationStatus in db
3502 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3503 "configurationStatus.{}".format(vca_index
): dict(),
3505 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3507 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3509 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3510 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3511 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3514 task_n2vc
= asyncio
.ensure_future(
3515 self
.instantiate_N2VC(
3516 logging_text
=logging_text
,
3517 vca_index
=vca_index
,
3523 vdu_index
=vdu_index
,
3524 deploy_params
=deploy_params
,
3525 config_descriptor
=descriptor_config
,
3526 base_folder
=base_folder
,
3527 nslcmop_id
=nslcmop_id
,
3531 ee_config_descriptor
=ee_item
,
3534 self
.lcm_tasks
.register(
3538 "instantiate_N2VC-{}".format(vca_index
),
3541 task_instantiation_info
[
3543 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3544 member_vnf_index
or "", vdu_id
or ""
3548 def _create_nslcmop(nsr_id
, operation
, params
):
3550 Creates a ns-lcm-opp content to be stored at database.
3551 :param nsr_id: internal id of the instance
3552 :param operation: instantiate, terminate, scale, action, ...
3553 :param params: user parameters for the operation
3554 :return: dictionary following SOL005 format
3556 # Raise exception if invalid arguments
3557 if not (nsr_id
and operation
and params
):
3559 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3566 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3567 "operationState": "PROCESSING",
3568 "statusEnteredTime": now
,
3569 "nsInstanceId": nsr_id
,
3570 "lcmOperationType": operation
,
3572 "isAutomaticInvocation": False,
3573 "operationParams": params
,
3574 "isCancelPending": False,
3576 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3577 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3582 def _format_additional_params(self
, params
):
3583 params
= params
or {}
3584 for key
, value
in params
.items():
3585 if str(value
).startswith("!!yaml "):
3586 params
[key
] = yaml
.safe_load(value
[7:])
3589 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3590 primitive
= seq
.get("name")
3591 primitive_params
= {}
3593 "member_vnf_index": vnf_index
,
3594 "primitive": primitive
,
3595 "primitive_params": primitive_params
,
3598 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3602 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3603 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3604 if op
.get("operationState") == "COMPLETED":
3605 # b. Skip sub-operation
3606 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3607 return self
.SUBOPERATION_STATUS_SKIP
3609 # c. retry executing sub-operation
3610 # The sub-operation exists, and operationState != 'COMPLETED'
3611 # Update operationState = 'PROCESSING' to indicate a retry.
3612 operationState
= "PROCESSING"
3613 detailed_status
= "In progress"
3614 self
._update
_suboperation
_status
(
3615 db_nslcmop
, op_index
, operationState
, detailed_status
3617 # Return the sub-operation index
3618 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3619 # with arguments extracted from the sub-operation
3622 # Find a sub-operation where all keys in a matching dictionary must match
3623 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3624 def _find_suboperation(self
, db_nslcmop
, match
):
3625 if db_nslcmop
and match
:
3626 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3627 for i
, op
in enumerate(op_list
):
3628 if all(op
.get(k
) == match
[k
] for k
in match
):
3630 return self
.SUBOPERATION_STATUS_NOT_FOUND
3632 # Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write the state of one sub-operation into the "nslcmops" record.

    :param db_nslcmop: nslcmop record; only its "_id" is used, as the filter
    :param op_index: index of the sub-operation inside "_admin.operations"
    :param operationState: value stored in the sub-operation "operationState"
    :param detailed_status: value stored in the sub-operation "detailed-status"
    """
    # Update DB for HA tasks
    record_filter = {"_id": db_nslcmop["_id"]}
    changes = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    # NOTE(review): the line naming the database call is not visible in this
    # chunk; self.db.set_one is assumed from the keyword arguments - confirm.
    self.db.set_one(
        "nslcmops",
        q_filter=record_filter,
        update_dict=changes,
        fail_on_empty=False,
    )
3646 # Add sub-operation, return the index of the added sub-operation
3647 # Optionally, set operationState, detailed-status, and operationType
3648 # Status and type are currently set for 'scale' sub-operations:
3649 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3650 # 'detailed-status' : status message
3651 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3652 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3653 def _add_suboperation(
3661 mapped_primitive_params
,
3662 operationState
=None,
3663 detailed_status
=None,
3666 RO_scaling_info
=None,
3669 return self
.SUBOPERATION_STATUS_NOT_FOUND
3670 # Get the "_admin.operations" list, if it exists
3671 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3672 op_list
= db_nslcmop_admin
.get("operations")
3673 # Create or append to the "_admin.operations" list
3675 "member_vnf_index": vnf_index
,
3677 "vdu_count_index": vdu_count_index
,
3678 "primitive": primitive
,
3679 "primitive_params": mapped_primitive_params
,
3682 new_op
["operationState"] = operationState
3684 new_op
["detailed-status"] = detailed_status
3686 new_op
["lcmOperationType"] = operationType
3688 new_op
["RO_nsr_id"] = RO_nsr_id
3690 new_op
["RO_scaling_info"] = RO_scaling_info
3692 # No existing operations, create key 'operations' with current operation as first list element
3693 db_nslcmop_admin
.update({"operations": [new_op
]})
3694 op_list
= db_nslcmop_admin
.get("operations")
3696 # Existing operations, append operation to list
3697 op_list
.append(new_op
)
3699 db_nslcmop_update
= {"_admin.operations": op_list
}
3700 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3701 op_index
= len(op_list
) - 1
3704 # Helper methods for scale() sub-operations
3706 # pre-scale/post-scale:
3707 # Check for 3 different cases:
3708 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3709 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3710 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3711 def _check_or_add_scale_suboperation(
3715 vnf_config_primitive
,
3719 RO_scaling_info
=None,
3721 # Find this sub-operation
3722 if RO_nsr_id
and RO_scaling_info
:
3723 operationType
= "SCALE-RO"
3725 "member_vnf_index": vnf_index
,
3726 "RO_nsr_id": RO_nsr_id
,
3727 "RO_scaling_info": RO_scaling_info
,
3731 "member_vnf_index": vnf_index
,
3732 "primitive": vnf_config_primitive
,
3733 "primitive_params": primitive_params
,
3734 "lcmOperationType": operationType
,
3736 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3737 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3738 # a. New sub-operation
3739 # The sub-operation does not exist, add it.
3740 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3741 # The following parameters are set to None for all kind of scaling:
3743 vdu_count_index
= None
3745 if RO_nsr_id
and RO_scaling_info
:
3746 vnf_config_primitive
= None
3747 primitive_params
= None
3750 RO_scaling_info
= None
3751 # Initial status for sub-operation
3752 operationState
= "PROCESSING"
3753 detailed_status
= "In progress"
3754 # Add sub-operation for pre/post-scaling (zero or more operations)
3755 self
._add
_suboperation
(
3761 vnf_config_primitive
,
3769 return self
.SUBOPERATION_STATUS_NEW
3771 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3772 # or op_index (operationState != 'COMPLETED')
3773 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3775 # Function to return execution_environment id
3777 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3778 # TODO vdu_index_count
3779 for vca
in vca_deployed_list
:
3780 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3783 async def destroy_N2VC(
3791 exec_primitives
=True,
3796 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3797 :param logging_text:
3799 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3800 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3801 :param vca_index: index in the database _admin.deployed.VCA
3802 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
3803 :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
3804 not executed properly
3805 :param scaling_in: True destroys the application, False destroys the model
3806 :return: None or exception
3811 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3812 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3816 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3818 # execute terminate_primitives
3820 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3821 config_descriptor
.get("terminate-config-primitive"),
3822 vca_deployed
.get("ee_descriptor_id"),
3824 vdu_id
= vca_deployed
.get("vdu_id")
3825 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3826 vdu_name
= vca_deployed
.get("vdu_name")
3827 vnf_index
= vca_deployed
.get("member-vnf-index")
3828 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3829 for seq
in terminate_primitives
:
3830 # For each sequence in list, get primitive and call _ns_execute_primitive()
3831 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3832 vnf_index
, seq
.get("name")
3834 self
.logger
.debug(logging_text
+ step
)
3835 # Create the primitive for each sequence, i.e. "primitive": "touch"
3836 primitive
= seq
.get("name")
3837 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3842 self
._add
_suboperation
(
3849 mapped_primitive_params
,
3851 # Sub-operations: Call _ns_execute_primitive() instead of action()
3853 result
, result_detail
= await self
._ns
_execute
_primitive
(
3854 vca_deployed
["ee_id"],
3856 mapped_primitive_params
,
3860 except LcmException
:
3861 # this happens when VCA is not deployed. In this case it is not needed to terminate
3863 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3864 if result
not in result_ok
:
3866 "terminate_primitive {} for vnf_member_index={} fails with "
3867 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3869 # set that this VCA do not need terminated
3870 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
3874 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
3877 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3878 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3881 await self
.vca_map
[vca_type
].delete_execution_environment(
3882 vca_deployed
["ee_id"],
3883 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a network service.

    The configuration status is written as "TERMINATING" before the deletion
    and as "DELETED" afterwards. A namespace that is already gone is not an error.

    :param db_nsr: nsr record; its "_id" prefixed with "." is the namespace
    :param vca_id: id of the VCA forwarded to the namespace deletion
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    try:
        # NOTE(review): one argument line of this call is not visible in this
        # chunk; `vca_id=vca_id` is assumed since the parameter is otherwise
        # unused - confirm.
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3901 async def _terminate_RO(
3902 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
3905 Terminates a deployment from RO
3906 :param logging_text:
3907 :param nsr_deployed: db_nsr._admin.deployed
3910 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3911 this method will update only the index 2, but it will write on database the concatenated content of the list
3916 ro_nsr_id
= ro_delete_action
= None
3917 if nsr_deployed
and nsr_deployed
.get("RO"):
3918 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3919 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3922 stage
[2] = "Deleting ns from VIM."
3923 db_nsr_update
["detailed-status"] = " ".join(stage
)
3924 self
._write
_op
_status
(nslcmop_id
, stage
)
3925 self
.logger
.debug(logging_text
+ stage
[2])
3926 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3927 self
._write
_op
_status
(nslcmop_id
, stage
)
3928 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3929 ro_delete_action
= desc
["action_id"]
3931 "_admin.deployed.RO.nsr_delete_action_id"
3932 ] = ro_delete_action
3933 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3934 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3935 if ro_delete_action
:
3936 # wait until NS is deleted from VIM
3937 stage
[2] = "Waiting ns deleted from VIM."
3938 detailed_status_old
= None
3942 + " RO_id={} ro_delete_action={}".format(
3943 ro_nsr_id
, ro_delete_action
3946 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3947 self
._write
_op
_status
(nslcmop_id
, stage
)
3949 delete_timeout
= 20 * 60 # 20 minutes
3950 while delete_timeout
> 0:
3951 desc
= await self
.RO
.show(
3953 item_id_name
=ro_nsr_id
,
3954 extra_item
="action",
3955 extra_item_id
=ro_delete_action
,
3959 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3961 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3962 if ns_status
== "ERROR":
3963 raise ROclient
.ROClientException(ns_status_info
)
3964 elif ns_status
== "BUILD":
3965 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3966 elif ns_status
== "ACTIVE":
3967 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3968 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3973 ), "ROclient.check_action_status returns unknown {}".format(
3976 if stage
[2] != detailed_status_old
:
3977 detailed_status_old
= stage
[2]
3978 db_nsr_update
["detailed-status"] = " ".join(stage
)
3979 self
._write
_op
_status
(nslcmop_id
, stage
)
3980 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3981 await asyncio
.sleep(5, loop
=self
.loop
)
3983 else: # delete_timeout <= 0:
3984 raise ROclient
.ROClientException(
3985 "Timeout waiting ns deleted from VIM"
3988 except Exception as e
:
3989 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3991 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
3993 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3994 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3995 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3997 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4000 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4002 failed_detail
.append("delete conflict: {}".format(e
))
4005 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4008 failed_detail
.append("delete error: {}".format(e
))
4010 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4014 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4015 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4017 stage
[2] = "Deleting nsd from RO."
4018 db_nsr_update
["detailed-status"] = " ".join(stage
)
4019 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4020 self
._write
_op
_status
(nslcmop_id
, stage
)
4021 await self
.RO
.delete("nsd", ro_nsd_id
)
4023 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4025 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4026 except Exception as e
:
4028 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4030 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4032 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4035 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4037 failed_detail
.append(
4038 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4040 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4042 failed_detail
.append(
4043 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4045 self
.logger
.error(logging_text
+ failed_detail
[-1])
4047 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4048 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4049 if not vnf_deployed
or not vnf_deployed
["id"]:
4052 ro_vnfd_id
= vnf_deployed
["id"]
4055 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4056 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4058 db_nsr_update
["detailed-status"] = " ".join(stage
)
4059 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4060 self
._write
_op
_status
(nslcmop_id
, stage
)
4061 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4063 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4065 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4066 except Exception as e
:
4068 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4071 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4075 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4078 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4080 failed_detail
.append(
4081 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4083 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4085 failed_detail
.append(
4086 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4088 self
.logger
.error(logging_text
+ failed_detail
[-1])
4091 stage
[2] = "Error deleting from VIM"
4093 stage
[2] = "Deleted from VIM"
4094 db_nsr_update
["detailed-status"] = " ".join(stage
)
4095 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4096 self
._write
_op
_status
(nslcmop_id
, stage
)
4099 raise LcmException("; ".join(failed_detail
))
4101 async def terminate(self
, nsr_id
, nslcmop_id
):
4102 # Try to lock HA task here
4103 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4104 if not task_is_locked_by_me
:
4107 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4108 self
.logger
.debug(logging_text
+ "Enter")
4109 timeout_ns_terminate
= self
.timeout_ns_terminate
4112 operation_params
= None
4114 error_list
= [] # annotates all failed error messages
4115 db_nslcmop_update
= {}
4116 autoremove
= False # autoremove after terminated
4117 tasks_dict_info
= {}
4120 "Stage 1/3: Preparing task.",
4121 "Waiting for previous operations to terminate.",
4124 # ^ contains [stage, step, VIM-status]
4126 # wait for any previous tasks in process
4127 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4129 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4130 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4131 operation_params
= db_nslcmop
.get("operationParams") or {}
4132 if operation_params
.get("timeout_ns_terminate"):
4133 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4134 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4135 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4137 db_nsr_update
["operational-status"] = "terminating"
4138 db_nsr_update
["config-status"] = "terminating"
4139 self
._write
_ns
_status
(
4141 ns_state
="TERMINATING",
4142 current_operation
="TERMINATING",
4143 current_operation_id
=nslcmop_id
,
4144 other_update
=db_nsr_update
,
4146 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4147 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4148 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4151 stage
[1] = "Getting vnf descriptors from db."
4152 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4154 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4156 db_vnfds_from_id
= {}
4157 db_vnfds_from_member_index
= {}
4159 for vnfr
in db_vnfrs_list
:
4160 vnfd_id
= vnfr
["vnfd-id"]
4161 if vnfd_id
not in db_vnfds_from_id
:
4162 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4163 db_vnfds_from_id
[vnfd_id
] = vnfd
4164 db_vnfds_from_member_index
[
4165 vnfr
["member-vnf-index-ref"]
4166 ] = db_vnfds_from_id
[vnfd_id
]
4168 # Destroy individual execution environments when there are terminating primitives.
4169 # Rest of EE will be deleted at once
4170 # TODO - check before calling _destroy_N2VC
4171 # if not operation_params.get("skip_terminate_primitives"):#
4172 # or not vca.get("needed_terminate"):
4173 stage
[0] = "Stage 2/3 execute terminating primitives."
4174 self
.logger
.debug(logging_text
+ stage
[0])
4175 stage
[1] = "Looking execution environment that needs terminate."
4176 self
.logger
.debug(logging_text
+ stage
[1])
4178 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4179 config_descriptor
= None
4180 vca_member_vnf_index
= vca
.get("member-vnf-index")
4181 vca_id
= self
.get_vca_id(
4182 db_vnfrs_dict
.get(vca_member_vnf_index
)
4183 if vca_member_vnf_index
4187 if not vca
or not vca
.get("ee_id"):
4189 if not vca
.get("member-vnf-index"):
4191 config_descriptor
= db_nsr
.get("ns-configuration")
4192 elif vca
.get("vdu_id"):
4193 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4194 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4195 elif vca
.get("kdu_name"):
4196 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4197 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4199 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4200 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4201 vca_type
= vca
.get("type")
4202 exec_terminate_primitives
= not operation_params
.get(
4203 "skip_terminate_primitives"
4204 ) and vca
.get("needed_terminate")
4205 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4206 # pending native charms
4208 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4210 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4211 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4212 task
= asyncio
.ensure_future(
4220 exec_terminate_primitives
,
4224 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4226 # wait for pending tasks of terminate primitives
4230 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4232 error_list
= await self
._wait
_for
_tasks
(
4235 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4239 tasks_dict_info
.clear()
4241 return # raise LcmException("; ".join(error_list))
4243 # remove All execution environments at once
4244 stage
[0] = "Stage 3/3 delete all."
4246 if nsr_deployed
.get("VCA"):
4247 stage
[1] = "Deleting all execution environments."
4248 self
.logger
.debug(logging_text
+ stage
[1])
4249 vca_id
= self
.get_vca_id({}, db_nsr
)
4250 task_delete_ee
= asyncio
.ensure_future(
4252 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4253 timeout
=self
.timeout_charm_delete
,
4256 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4257 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4259 # Delete from k8scluster
4260 stage
[1] = "Deleting KDUs."
4261 self
.logger
.debug(logging_text
+ stage
[1])
4262 # print(nsr_deployed)
4263 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4264 if not kdu
or not kdu
.get("kdu-instance"):
4266 kdu_instance
= kdu
.get("kdu-instance")
4267 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4268 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4269 vca_id
= self
.get_vca_id({}, db_nsr
)
4270 task_delete_kdu_instance
= asyncio
.ensure_future(
4271 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4272 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4273 kdu_instance
=kdu_instance
,
4280 + "Unknown k8s deployment type {}".format(
4281 kdu
.get("k8scluster-type")
4286 task_delete_kdu_instance
4287 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4290 stage
[1] = "Deleting ns from VIM."
4292 task_delete_ro
= asyncio
.ensure_future(
4293 self
._terminate
_ng
_ro
(
4294 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4298 task_delete_ro
= asyncio
.ensure_future(
4300 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4303 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4305 # rest of the work will be done in the finally block
4308 ROclient
.ROClientException
,
4313 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4315 except asyncio
.CancelledError
:
4317 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4319 exc
= "Operation was cancelled"
4320 except Exception as e
:
4321 exc
= traceback
.format_exc()
4322 self
.logger
.critical(
4323 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4328 error_list
.append(str(exc
))
4330 # wait for pending tasks
4332 stage
[1] = "Waiting for terminate pending tasks."
4333 self
.logger
.debug(logging_text
+ stage
[1])
4334 error_list
+= await self
._wait
_for
_tasks
(
4337 timeout_ns_terminate
,
4341 stage
[1] = stage
[2] = ""
4342 except asyncio
.CancelledError
:
4343 error_list
.append("Cancelled")
4344 # TODO cancel all tasks
4345 except Exception as exc
:
4346 error_list
.append(str(exc
))
4347 # update status at database
4349 error_detail
= "; ".join(error_list
)
4350 # self.logger.error(logging_text + error_detail)
4351 error_description_nslcmop
= "{} Detail: {}".format(
4352 stage
[0], error_detail
4354 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4355 nslcmop_id
, stage
[0]
4358 db_nsr_update
["operational-status"] = "failed"
4359 db_nsr_update
["detailed-status"] = (
4360 error_description_nsr
+ " Detail: " + error_detail
4362 db_nslcmop_update
["detailed-status"] = error_detail
4363 nslcmop_operation_state
= "FAILED"
4367 error_description_nsr
= error_description_nslcmop
= None
4368 ns_state
= "NOT_INSTANTIATED"
4369 db_nsr_update
["operational-status"] = "terminated"
4370 db_nsr_update
["detailed-status"] = "Done"
4371 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4372 db_nslcmop_update
["detailed-status"] = "Done"
4373 nslcmop_operation_state
= "COMPLETED"
4376 self
._write
_ns
_status
(
4379 current_operation
="IDLE",
4380 current_operation_id
=None,
4381 error_description
=error_description_nsr
,
4382 error_detail
=error_detail
,
4383 other_update
=db_nsr_update
,
4385 self
._write
_op
_status
(
4388 error_message
=error_description_nslcmop
,
4389 operation_state
=nslcmop_operation_state
,
4390 other_update
=db_nslcmop_update
,
4392 if ns_state
== "NOT_INSTANTIATED":
4396 {"nsr-id-ref": nsr_id
},
4397 {"_admin.nsState": "NOT_INSTANTIATED"},
4399 except DbException
as e
:
4402 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4406 if operation_params
:
4407 autoremove
= operation_params
.get("autoremove", False)
4408 if nslcmop_operation_state
:
4410 await self
.msg
.aiowrite(
4415 "nslcmop_id": nslcmop_id
,
4416 "operationState": nslcmop_operation_state
,
4417 "autoremove": autoremove
,
4421 except Exception as e
:
4423 logging_text
+ "kafka_write notification Exception {}".format(e
)
4426 self
.logger
.debug(logging_text
+ "Exit")
4427 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4429 async def _wait_for_tasks(
4430 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4433 error_detail_list
= []
4435 pending_tasks
= list(created_tasks_info
.keys())
4436 num_tasks
= len(pending_tasks
)
4438 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4439 self
._write
_op
_status
(nslcmop_id
, stage
)
4440 while pending_tasks
:
4442 _timeout
= timeout
+ time_start
- time()
4443 done
, pending_tasks
= await asyncio
.wait(
4444 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4446 num_done
+= len(done
)
4447 if not done
: # Timeout
4448 for task
in pending_tasks
:
4449 new_error
= created_tasks_info
[task
] + ": Timeout"
4450 error_detail_list
.append(new_error
)
4451 error_list
.append(new_error
)
4454 if task
.cancelled():
4457 exc
= task
.exception()
4459 if isinstance(exc
, asyncio
.TimeoutError
):
4461 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4462 error_list
.append(created_tasks_info
[task
])
4463 error_detail_list
.append(new_error
)
4470 ROclient
.ROClientException
,
4476 self
.logger
.error(logging_text
+ new_error
)
4478 exc_traceback
= "".join(
4479 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4483 + created_tasks_info
[task
]
4489 logging_text
+ created_tasks_info
[task
] + ": Done"
4491 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4493 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4494 if nsr_id
: # update also nsr
4499 "errorDescription": "Error at: " + ", ".join(error_list
),
4500 "errorDetail": ". ".join(error_detail_list
),
4503 self
._write
_op
_status
(nslcmop_id
, stage
)
4504 return error_detail_list
4507 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4509 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4510 The default-value is used. If it is between < > it look for a value at instantiation_params
4511 :param primitive_desc: portion of VNFD/NSD that describes primitive
4512 :param params: Params provided by user
4513 :param instantiation_params: Instantiation params provided by user
4514 :return: a dictionary with the calculated params
4516 calculated_params
= {}
4517 for parameter
in primitive_desc
.get("parameter", ()):
4518 param_name
= parameter
["name"]
4519 if param_name
in params
:
4520 calculated_params
[param_name
] = params
[param_name
]
4521 elif "default-value" in parameter
or "value" in parameter
:
4522 if "value" in parameter
:
4523 calculated_params
[param_name
] = parameter
["value"]
4525 calculated_params
[param_name
] = parameter
["default-value"]
4527 isinstance(calculated_params
[param_name
], str)
4528 and calculated_params
[param_name
].startswith("<")
4529 and calculated_params
[param_name
].endswith(">")
4531 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4532 calculated_params
[param_name
] = instantiation_params
[
4533 calculated_params
[param_name
][1:-1]
4537 "Parameter {} needed to execute primitive {} not provided".format(
4538 calculated_params
[param_name
], primitive_desc
["name"]
4543 "Parameter {} needed to execute primitive {} not provided".format(
4544 param_name
, primitive_desc
["name"]
4548 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4549 calculated_params
[param_name
] = yaml
.safe_dump(
4550 calculated_params
[param_name
], default_flow_style
=True, width
=256
4552 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4554 ].startswith("!!yaml "):
4555 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4556 if parameter
.get("data-type") == "INTEGER":
4558 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4559 except ValueError: # error converting string to int
4561 "Parameter {} of primitive {} must be integer".format(
4562 param_name
, primitive_desc
["name"]
4565 elif parameter
.get("data-type") == "BOOLEAN":
4566 calculated_params
[param_name
] = not (
4567 (str(calculated_params
[param_name
])).lower() == "false"
4570 # add always ns_config_info if primitive name is config
4571 if primitive_desc
["name"] == "config":
4572 if "ns_config_info" in instantiation_params
:
4573 calculated_params
["ns_config_info"] = instantiation_params
[
4576 return calculated_params
4578 def _look_for_deployed_vca(
4585 ee_descriptor_id
=None,
4587 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4588 for vca
in deployed_vca
:
4591 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4594 vdu_count_index
is not None
4595 and vdu_count_index
!= vca
["vdu_count_index"]
4598 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4600 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4604 # vca_deployed not found
4606 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4607 " is not deployed".format(
4616 ee_id
= vca
.get("ee_id")
4618 "type", "lxc_proxy_charm"
4619 ) # default value for backward compatibility - proxy charm
4622 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4623 "execution environment".format(
4624 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4627 return ee_id
, vca_type
4629 async def _ns_execute_primitive(
4635 retries_interval
=30,
4642 if primitive
== "config":
4643 primitive_params
= {"params": primitive_params
}
4645 vca_type
= vca_type
or "lxc_proxy_charm"
4649 output
= await asyncio
.wait_for(
4650 self
.vca_map
[vca_type
].exec_primitive(
4652 primitive_name
=primitive
,
4653 params_dict
=primitive_params
,
4654 progress_timeout
=self
.timeout_progress_primitive
,
4655 total_timeout
=self
.timeout_primitive
,
4660 timeout
=timeout
or self
.timeout_primitive
,
4664 except asyncio
.CancelledError
:
4666 except Exception as e
: # asyncio.TimeoutError
4667 if isinstance(e
, asyncio
.TimeoutError
):
4672 "Error executing action {} on {} -> {}".format(
4677 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4679 return "FAILED", str(e
)
4681 return "COMPLETED", output
4683 except (LcmException
, asyncio
.CancelledError
):
4685 except Exception as e
:
4686 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4688 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4690 Updating the vca_status with latest juju information in nsrs record
4691 :param: nsr_id: Id of the nsr
4692 :param: nslcmop_id: Id of the nslcmop
4696 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4697 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4698 vca_id
= self
.get_vca_id({}, db_nsr
)
4699 if db_nsr
["_admin"]["deployed"]["K8s"]:
4700 for k8s_index
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4701 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
4702 await self
._on
_update
_k
8s
_db
(
4703 cluster_uuid
, kdu_instance
, filter={"_id": nsr_id
}, vca_id
=vca_id
4706 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4707 table
, filter = "nsrs", {"_id": nsr_id
}
4708 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4709 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4711 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4712 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
# action(nsr_id, nslcmop_id): run the primitive named in the nslcmop's
# operationParams, either against a KDU through self.k8scluster_map or against
# a deployed VCA through self._ns_execute_primitive, record the outcome in the
# nsr/nslcmop records, publish a notification and return the pair
# (nslcmop_operation_state, detailed_status).
# NOTE(review): this extract omits some statements of the method (the embedded
# line numbers have gaps); the comments below describe only what is visible.
4714 async def action(self
, nsr_id
, nslcmop_id
):
4715 # Try to lock HA task here
4716 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
# NOTE(review): the statement executed when the lock is not obtained is not
# visible in this extract.
4717 if not task_is_locked_by_me
:
# Prefix reused by every log message emitted below.
4720 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4721 self
.logger
.debug(logging_text
+ "Enter")
4722 # get all needed from database
4726 db_nslcmop_update
= {}
4727 nslcmop_operation_state
= None
4728 error_description_nslcmop
= None
4731 # wait for any previous tasks in process
# `step` holds a description of the current phase; the error handlers further
# down embed it in their messages.
4732 step
= "Waiting for previous operations to terminate"
4733 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
# Mark the NS as running this operation.
4735 self
._write
_ns
_status
(
4738 current_operation
="RUNNING ACTION",
4739 current_operation_id
=nslcmop_id
,
4742 step
= "Getting information from database"
4743 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4744 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# Operation parameters: the target selectors are optional (.get), while
# "primitive" and "primitive_params" are read with [] and must be present.
4746 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4747 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4748 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4749 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4750 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4751 primitive
= db_nslcmop
["operationParams"]["primitive"]
4752 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
# The timeout falls back to self.timeout_primitive when the request has none.
4753 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4754 "timeout_ns_action", self
.timeout_primitive
# NOTE(review): the conditions selecting between the vnfr/vnfd reads and the
# nsd read are not visible in this extract.
4758 step
= "Getting vnfr from database"
4759 db_vnfr
= self
.db
.get_one(
4760 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4762 step
= "Getting vnfd from database"
4763 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4765 step
= "Getting nsd from database"
4766 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4768 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4769 # for backward compatibility
# A dict-shaped "VCA" section is converted to a list and written back.
4770 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4771 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4772 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4773 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4775 # look for primitive
# The configuration is taken from the vdu, kdu or vnf descriptor, or from the
# nsd "ns-configuration"; the selecting conditions are not visible here.
4776 config_primitive_desc
= descriptor_configuration
= None
4778 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4780 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4782 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4784 descriptor_configuration
= db_nsd
.get("ns-configuration")
# Find the config-primitive whose name equals the requested primitive.
4786 if descriptor_configuration
and descriptor_configuration
.get(
4789 for config_primitive
in descriptor_configuration
["config-primitive"]:
4790 if config_primitive
["name"] == primitive
:
4791 config_primitive_desc
= config_primitive
# No descriptor entry found: "upgrade", "rollback" and "status" on a KDU are
# exempt from the "not found" message; the raw primitive name is then used with
# no execution-environment reference.
4794 if not config_primitive_desc
:
4795 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4797 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4801 primitive_name
= primitive
4802 ee_descriptor_id
= None
# Descriptor entry found: it may rename the primitive for the execution
# environment and name the execution environment to use.
4804 primitive_name
= config_primitive_desc
.get(
4805 "execution-environment-primitive", primitive
4807 ee_descriptor_id
= config_primitive_desc
.get(
4808 "execution-environment-ref"
# desc_params comes from the additional params of the matching vdur, kdur, vnfr
# or nsr, parsed with parse_yaml_strings.
4814 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4816 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4819 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4821 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4823 desc_params
= parse_yaml_strings(
4824 db_vnfr
.get("additionalParamsForVnf")
4827 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
# Collect the primitive names declared in the KDU configuration; kdu_action is
# True when the requested primitive is one of them.
# NOTE(review): the loops reuse the name `primitive`, overwriting the requested
# primitive read above; the visible code after this point uses primitive_name.
4828 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4829 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4831 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4832 actions
.add(primitive
["name"])
4833 for primitive
in kdu_configuration
.get("config-primitive", []):
4834 actions
.add(primitive
["name"])
4835 kdu_action
= True if primitive_name
in actions
else False
4837 # TODO check if ns is in a proper status
# KDU path: taken for the built-in operations or for a KDU-declared primitive
# (the start of this condition is not visible in the extract).
4839 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4841 # kdur and desc_params already set from before
4842 if primitive_params
:
4843 desc_params
.update(primitive_params
)
4844 # TODO Check if we will need something at vnf level
# Locate the deployed K8s entry matching both the kdu name and the vnf index.
4845 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4847 kdu_name
== kdu
["kdu-name"]
4848 and kdu
["member-vnf-index"] == vnf_index
4853 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
# Reject cluster types that have no connector registered in k8scluster_map.
4856 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4857 msg
= "unknown k8scluster-type '{}'".format(
4858 kdu
.get("k8scluster-type")
4860 raise LcmException(msg
)
# Location of this KDU inside the nsr record.
4863 "collection": "nsrs",
4864 "filter": {"_id": nsr_id
},
4865 "path": "_admin.deployed.K8s.{}".format(index
),
4869 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
4871 step
= "Executing kdu {}".format(primitive_name
)
# "upgrade": a "kdu_model" given in the params takes precedence and is removed
# from desc_params; otherwise the deployed "kdu-model" is used, split on ":".
# NOTE(review): the condition guarding `kdu_model = parts[0]` is not visible.
4872 if primitive_name
== "upgrade":
4873 if desc_params
.get("kdu_model"):
4874 kdu_model
= desc_params
.get("kdu_model")
4875 del desc_params
["kdu_model"]
4877 kdu_model
= kdu
.get("kdu-model")
4878 parts
= kdu_model
.split(sep
=":")
4880 kdu_model
= parts
[0]
# The outer asyncio.wait_for timeout is 10 more than the one given to upgrade().
4882 detailed_status
= await asyncio
.wait_for(
4883 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
4884 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4885 kdu_instance
=kdu
.get("kdu-instance"),
4887 kdu_model
=kdu_model
,
4890 timeout
=timeout_ns_action
,
4892 timeout
=timeout_ns_action
+ 10,
4895 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
# "rollback" of the deployed KDU instance.
4897 elif primitive_name
== "rollback":
4898 detailed_status
= await asyncio
.wait_for(
4899 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
4900 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4901 kdu_instance
=kdu
.get("kdu-instance"),
4904 timeout
=timeout_ns_action
,
# "status" query of the deployed KDU instance.
4906 elif primitive_name
== "status":
4907 detailed_status
= await asyncio
.wait_for(
4908 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
4909 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4910 kdu_instance
=kdu
.get("kdu-instance"),
4913 timeout
=timeout_ns_action
,
# Any other KDU primitive goes through exec_primitive; when the record holds no
# "kdu-instance", a name is built from the kdu name and the nsr id.
4916 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
4917 kdu
["kdu-name"], nsr_id
4919 params
= self
._map
_primitive
_params
(
4920 config_primitive_desc
, primitive_params
, desc_params
4923 detailed_status
= await asyncio
.wait_for(
4924 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
4925 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4926 kdu_instance
=kdu_instance
,
4927 primitive_name
=primitive_name
,
4930 timeout
=timeout_ns_action
,
4933 timeout
=timeout_ns_action
,
# Outcome of the KDU call; the condition choosing between the two results is
# not visible in this extract.
4937 nslcmop_operation_state
= "COMPLETED"
4939 detailed_status
= ""
4940 nslcmop_operation_state
= "FAILED"
# VCA path: resolve the execution environment id and type for the target.
4942 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
4943 nsr_deployed
["VCA"],
4944 member_vnf_index
=vnf_index
,
4946 vdu_count_index
=vdu_count_index
,
4947 ee_descriptor_id
=ee_descriptor_id
,
# Build the nsr location of the VCA entry belonging to this vnf index.
4949 for vca_index
, vca_deployed
in enumerate(
4950 db_nsr
["_admin"]["deployed"]["VCA"]
4952 if vca_deployed
.get("member-vnf-index") == vnf_index
:
4954 "collection": "nsrs",
4955 "filter": {"_id": nsr_id
},
4956 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
# Execute the primitive with parameters mapped from the descriptor entry.
4960 nslcmop_operation_state
,
4962 ) = await self
._ns
_execute
_primitive
(
4964 primitive
=primitive_name
,
4965 primitive_params
=self
._map
_primitive
_params
(
4966 config_primitive_desc
, primitive_params
, desc_params
4968 timeout
=timeout_ns_action
,
# Store the result; an error message is kept only when the state is "FAILED".
4974 db_nslcmop_update
["detailed-status"] = detailed_status
4975 error_description_nslcmop
= (
4976 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
4980 + " task Done with result {} {}".format(
4981 nslcmop_operation_state
, detailed_status
4984 return # database update is called inside finally
# Error handling: known exception types are logged at error level, anything
# else at critical level with the formatted traceback kept in `exc`.
4986 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
4987 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4989 except asyncio
.CancelledError
:
4991 logging_text
+ "Cancelled Exception while '{}'".format(step
)
4993 exc
= "Operation was cancelled"
4994 except asyncio
.TimeoutError
:
4995 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
4997 except Exception as e
:
4998 exc
= traceback
.format_exc()
4999 self
.logger
.critical(
5000 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
# Failure summary built from the failing step and `exc`; forces state "FAILED".
5009 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5010 nslcmop_operation_state
= "FAILED"
# Put the NS back to "IDLE" with no current operation id.
5012 self
._write
_ns
_status
(
5016 ], # TODO check if degraded. For the moment use previous status
5017 current_operation
="IDLE",
5018 current_operation_id
=None,
5019 # error_description=error_description_nsr,
5020 # error_detail=error_detail,
5021 other_update
=db_nsr_update
,
# Persist the final operation state and error message on the nslcmop.
5024 self
._write
_op
_status
(
5027 error_message
=error_description_nslcmop
,
5028 operation_state
=nslcmop_operation_state
,
5029 other_update
=db_nslcmop_update
,
# Publish the result through self.msg; a failure to publish is logged only.
5032 if nslcmop_operation_state
:
5034 await self
.msg
.aiowrite(
5039 "nslcmop_id": nslcmop_id
,
5040 "operationState": nslcmop_operation_state
,
5044 except Exception as e
:
5046 logging_text
+ "kafka_write notification Exception {}".format(e
)
# Deregister the task and hand the outcome back to the caller.
5048 self
.logger
.debug(logging_text
+ "Exit")
5049 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5050 return nslcmop_operation_state
, detailed_status
5052 async def scale(self
, nsr_id
, nslcmop_id
):
5053 # Try to lock HA task here
5054 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5055 if not task_is_locked_by_me
:
5058 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5059 stage
= ["", "", ""]
5060 tasks_dict_info
= {}
5061 # ^ stage, step, VIM progress
5062 self
.logger
.debug(logging_text
+ "Enter")
5063 # get all needed from database
5065 db_nslcmop_update
= {}
5068 # in case of error, indicates what part of scale was failed to put nsr at error status
5069 scale_process
= None
5070 old_operational_status
= ""
5071 old_config_status
= ""
5074 # wait for any previous tasks in process
5075 step
= "Waiting for previous operations to terminate"
5076 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5077 self
._write
_ns
_status
(
5080 current_operation
="SCALING",
5081 current_operation_id
=nslcmop_id
,
5084 step
= "Getting nslcmop from database"
5086 step
+ " after having waited for previous tasks to be completed"
5088 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5090 step
= "Getting nsr from database"
5091 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5092 old_operational_status
= db_nsr
["operational-status"]
5093 old_config_status
= db_nsr
["config-status"]
5095 step
= "Parsing scaling parameters"
5096 db_nsr_update
["operational-status"] = "scaling"
5097 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5098 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5100 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5102 ]["member-vnf-index"]
5103 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5105 ]["scaling-group-descriptor"]
5106 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5107 # for backward compatibility
5108 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5109 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5110 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5111 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5113 step
= "Getting vnfr from database"
5114 db_vnfr
= self
.db
.get_one(
5115 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5118 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5120 step
= "Getting vnfd from database"
5121 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5123 base_folder
= db_vnfd
["_admin"]["storage"]
5125 step
= "Getting scaling-group-descriptor"
5126 scaling_descriptor
= find_in_list(
5127 get_scaling_aspect(db_vnfd
),
5128 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5130 if not scaling_descriptor
:
5132 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5133 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5136 step
= "Sending scale order to VIM"
5137 # TODO check if ns is in a proper status
5139 if not db_nsr
["_admin"].get("scaling-group"):
5144 "_admin.scaling-group": [
5145 {"name": scaling_group
, "nb-scale-op": 0}
5149 admin_scale_index
= 0
5151 for admin_scale_index
, admin_scale_info
in enumerate(
5152 db_nsr
["_admin"]["scaling-group"]
5154 if admin_scale_info
["name"] == scaling_group
:
5155 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5157 else: # not found, set index one plus last element and add new entry with the name
5158 admin_scale_index
+= 1
5160 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5163 vca_scaling_info
= []
5164 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5165 if scaling_type
== "SCALE_OUT":
5166 if "aspect-delta-details" not in scaling_descriptor
:
5168 "Aspect delta details not fount in scaling descriptor {}".format(
5169 scaling_descriptor
["name"]
5172 # count if max-instance-count is reached
5173 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5175 scaling_info
["scaling_direction"] = "OUT"
5176 scaling_info
["vdu-create"] = {}
5177 scaling_info
["kdu-create"] = {}
5178 for delta
in deltas
:
5179 for vdu_delta
in delta
.get("vdu-delta", {}):
5180 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5181 # vdu_index also provides the number of instance of the targeted vdu
5182 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5183 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5187 additional_params
= (
5188 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5191 cloud_init_list
= []
5193 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5194 max_instance_count
= 10
5195 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5196 max_instance_count
= vdu_profile
.get(
5197 "max-number-of-instances", 10
5200 default_instance_num
= get_number_of_instances(
5203 instances_number
= vdu_delta
.get("number-of-instances", 1)
5204 nb_scale_op
+= instances_number
5206 new_instance_count
= nb_scale_op
+ default_instance_num
5207 # Control if new count is over max and vdu count is less than max.
5208 # Then assign new instance count
5209 if new_instance_count
> max_instance_count
> vdu_count
:
5210 instances_number
= new_instance_count
- max_instance_count
5212 instances_number
= instances_number
5214 if new_instance_count
> max_instance_count
:
5216 "reached the limit of {} (max-instance-count) "
5217 "scaling-out operations for the "
5218 "scaling-group-descriptor '{}'".format(
5219 nb_scale_op
, scaling_group
5222 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5224 # TODO Information of its own ip is not available because db_vnfr is not updated.
5225 additional_params
["OSM"] = get_osm_params(
5226 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5228 cloud_init_list
.append(
5229 self
._parse
_cloud
_init
(
5236 vca_scaling_info
.append(
5238 "osm_vdu_id": vdu_delta
["id"],
5239 "member-vnf-index": vnf_index
,
5241 "vdu_index": vdu_index
+ x
,
5244 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5245 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5246 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5247 kdu_name
= kdu_profile
["kdu-name"]
5248 resource_name
= kdu_profile
["resource-name"]
5250 # Might have different kdus in the same delta
5251 # Should have list for each kdu
5252 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5253 scaling_info
["kdu-create"][kdu_name
] = []
5255 kdur
= get_kdur(db_vnfr
, kdu_name
)
5256 if kdur
.get("helm-chart"):
5257 k8s_cluster_type
= "helm-chart-v3"
5258 self
.logger
.debug("kdur: {}".format(kdur
))
5260 kdur
.get("helm-version")
5261 and kdur
.get("helm-version") == "v2"
5263 k8s_cluster_type
= "helm-chart"
5264 raise NotImplementedError
5265 elif kdur
.get("juju-bundle"):
5266 k8s_cluster_type
= "juju-bundle"
5269 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5270 "juju-bundle. Maybe an old NBI version is running".format(
5271 db_vnfr
["member-vnf-index-ref"], kdu_name
5275 max_instance_count
= 10
5276 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5277 max_instance_count
= kdu_profile
.get(
5278 "max-number-of-instances", 10
5281 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5282 deployed_kdu
, _
= get_deployed_kdu(
5283 nsr_deployed
, kdu_name
, vnf_index
5285 if deployed_kdu
is None:
5287 "KDU '{}' for vnf '{}' not deployed".format(
5291 kdu_instance
= deployed_kdu
.get("kdu-instance")
5292 instance_num
= await self
.k8scluster_map
[
5294 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5295 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5296 "number-of-instances", 1
5299 # Control if new count is over max and instance_num is less than max.
5300 # Then assign max instance number to kdu replica count
5301 if kdu_replica_count
> max_instance_count
> instance_num
:
5302 kdu_replica_count
= max_instance_count
5303 if kdu_replica_count
> max_instance_count
:
5305 "reached the limit of {} (max-instance-count) "
5306 "scaling-out operations for the "
5307 "scaling-group-descriptor '{}'".format(
5308 instance_num
, scaling_group
5312 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5313 vca_scaling_info
.append(
5315 "osm_kdu_id": kdu_name
,
5316 "member-vnf-index": vnf_index
,
5318 "kdu_index": instance_num
+ x
- 1,
5321 scaling_info
["kdu-create"][kdu_name
].append(
5323 "member-vnf-index": vnf_index
,
5325 "k8s-cluster-type": k8s_cluster_type
,
5326 "resource-name": resource_name
,
5327 "scale": kdu_replica_count
,
5330 elif scaling_type
== "SCALE_IN":
5331 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5333 scaling_info
["scaling_direction"] = "IN"
5334 scaling_info
["vdu-delete"] = {}
5335 scaling_info
["kdu-delete"] = {}
5337 for delta
in deltas
:
5338 for vdu_delta
in delta
.get("vdu-delta", {}):
5339 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5340 min_instance_count
= 0
5341 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5342 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5343 min_instance_count
= vdu_profile
["min-number-of-instances"]
5345 default_instance_num
= get_number_of_instances(
5346 db_vnfd
, vdu_delta
["id"]
5348 instance_num
= vdu_delta
.get("number-of-instances", 1)
5349 nb_scale_op
-= instance_num
5351 new_instance_count
= nb_scale_op
+ default_instance_num
5353 if new_instance_count
< min_instance_count
< vdu_count
:
5354 instances_number
= min_instance_count
- new_instance_count
5356 instances_number
= instance_num
5358 if new_instance_count
< min_instance_count
:
5360 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5361 "scaling-group-descriptor '{}'".format(
5362 nb_scale_op
, scaling_group
5365 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5366 vca_scaling_info
.append(
5368 "osm_vdu_id": vdu_delta
["id"],
5369 "member-vnf-index": vnf_index
,
5371 "vdu_index": vdu_index
- 1 - x
,
5374 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5375 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5376 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5377 kdu_name
= kdu_profile
["kdu-name"]
5378 resource_name
= kdu_profile
["resource-name"]
5380 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5381 scaling_info
["kdu-delete"][kdu_name
] = []
5383 kdur
= get_kdur(db_vnfr
, kdu_name
)
5384 if kdur
.get("helm-chart"):
5385 k8s_cluster_type
= "helm-chart-v3"
5386 self
.logger
.debug("kdur: {}".format(kdur
))
5388 kdur
.get("helm-version")
5389 and kdur
.get("helm-version") == "v2"
5391 k8s_cluster_type
= "helm-chart"
5392 raise NotImplementedError
5393 elif kdur
.get("juju-bundle"):
5394 k8s_cluster_type
= "juju-bundle"
5397 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5398 "juju-bundle. Maybe an old NBI version is running".format(
5399 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5403 min_instance_count
= 0
5404 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5405 min_instance_count
= kdu_profile
["min-number-of-instances"]
5407 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5408 deployed_kdu
, _
= get_deployed_kdu(
5409 nsr_deployed
, kdu_name
, vnf_index
5411 if deployed_kdu
is None:
5413 "KDU '{}' for vnf '{}' not deployed".format(
5417 kdu_instance
= deployed_kdu
.get("kdu-instance")
5418 instance_num
= await self
.k8scluster_map
[
5420 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5421 kdu_replica_count
= instance_num
- kdu_delta
.get(
5422 "number-of-instances", 1
5425 if kdu_replica_count
< min_instance_count
< instance_num
:
5426 kdu_replica_count
= min_instance_count
5427 if kdu_replica_count
< min_instance_count
:
5429 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5430 "scaling-group-descriptor '{}'".format(
5431 instance_num
, scaling_group
5435 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5436 vca_scaling_info
.append(
5438 "osm_kdu_id": kdu_name
,
5439 "member-vnf-index": vnf_index
,
5441 "kdu_index": instance_num
- x
- 1,
5444 scaling_info
["kdu-delete"][kdu_name
].append(
5446 "member-vnf-index": vnf_index
,
5448 "k8s-cluster-type": k8s_cluster_type
,
5449 "resource-name": resource_name
,
5450 "scale": kdu_replica_count
,
5454 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5455 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5456 if scaling_info
["scaling_direction"] == "IN":
5457 for vdur
in reversed(db_vnfr
["vdur"]):
5458 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5459 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5460 scaling_info
["vdu"].append(
5462 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5463 "vdu_id": vdur
["vdu-id-ref"],
5467 for interface
in vdur
["interfaces"]:
5468 scaling_info
["vdu"][-1]["interface"].append(
5470 "name": interface
["name"],
5471 "ip_address": interface
["ip-address"],
5472 "mac_address": interface
.get("mac-address"),
5475 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5478 step
= "Executing pre-scale vnf-config-primitive"
5479 if scaling_descriptor
.get("scaling-config-action"):
5480 for scaling_config_action
in scaling_descriptor
[
5481 "scaling-config-action"
5484 scaling_config_action
.get("trigger") == "pre-scale-in"
5485 and scaling_type
== "SCALE_IN"
5487 scaling_config_action
.get("trigger") == "pre-scale-out"
5488 and scaling_type
== "SCALE_OUT"
5490 vnf_config_primitive
= scaling_config_action
[
5491 "vnf-config-primitive-name-ref"
5493 step
= db_nslcmop_update
[
5495 ] = "executing pre-scale scaling-config-action '{}'".format(
5496 vnf_config_primitive
5499 # look for primitive
5500 for config_primitive
in (
5501 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5502 ).get("config-primitive", ()):
5503 if config_primitive
["name"] == vnf_config_primitive
:
5507 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5508 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5509 "primitive".format(scaling_group
, vnf_config_primitive
)
5512 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5513 if db_vnfr
.get("additionalParamsForVnf"):
5514 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5516 scale_process
= "VCA"
5517 db_nsr_update
["config-status"] = "configuring pre-scaling"
5518 primitive_params
= self
._map
_primitive
_params
(
5519 config_primitive
, {}, vnfr_params
5522 # Pre-scale retry check: Check if this sub-operation has been executed before
5523 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5526 vnf_config_primitive
,
5530 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5531 # Skip sub-operation
5532 result
= "COMPLETED"
5533 result_detail
= "Done"
5536 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5537 vnf_config_primitive
, result
, result_detail
5541 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5542 # New sub-operation: Get index of this sub-operation
5544 len(db_nslcmop
.get("_admin", {}).get("operations"))
5549 + "vnf_config_primitive={} New sub-operation".format(
5550 vnf_config_primitive
5554 # retry: Get registered params for this existing sub-operation
5555 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5558 vnf_index
= op
.get("member_vnf_index")
5559 vnf_config_primitive
= op
.get("primitive")
5560 primitive_params
= op
.get("primitive_params")
5563 + "vnf_config_primitive={} Sub-operation retry".format(
5564 vnf_config_primitive
5567 # Execute the primitive, either with new (first-time) or registered (reintent) args
5568 ee_descriptor_id
= config_primitive
.get(
5569 "execution-environment-ref"
5571 primitive_name
= config_primitive
.get(
5572 "execution-environment-primitive", vnf_config_primitive
5574 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5575 nsr_deployed
["VCA"],
5576 member_vnf_index
=vnf_index
,
5578 vdu_count_index
=None,
5579 ee_descriptor_id
=ee_descriptor_id
,
5581 result
, result_detail
= await self
._ns
_execute
_primitive
(
5590 + "vnf_config_primitive={} Done with result {} {}".format(
5591 vnf_config_primitive
, result
, result_detail
5594 # Update operationState = COMPLETED | FAILED
5595 self
._update
_suboperation
_status
(
5596 db_nslcmop
, op_index
, result
, result_detail
5599 if result
== "FAILED":
5600 raise LcmException(result_detail
)
5601 db_nsr_update
["config-status"] = old_config_status
5602 scale_process
= None
5606 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5609 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5612 # SCALE-IN VCA - BEGIN
5613 if vca_scaling_info
:
5614 step
= db_nslcmop_update
[
5616 ] = "Deleting the execution environments"
5617 scale_process
= "VCA"
5618 for vca_info
in vca_scaling_info
:
5619 if vca_info
["type"] == "delete":
5620 member_vnf_index
= str(vca_info
["member-vnf-index"])
5622 logging_text
+ "vdu info: {}".format(vca_info
)
5624 if vca_info
.get("osm_vdu_id"):
5625 vdu_id
= vca_info
["osm_vdu_id"]
5626 vdu_index
= int(vca_info
["vdu_index"])
5629 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5630 member_vnf_index
, vdu_id
, vdu_index
5634 kdu_id
= vca_info
["osm_kdu_id"]
5637 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5638 member_vnf_index
, kdu_id
, vdu_index
5640 stage
[2] = step
= "Scaling in VCA"
5641 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5642 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5643 config_update
= db_nsr
["configurationStatus"]
5644 for vca_index
, vca
in enumerate(vca_update
):
5646 (vca
or vca
.get("ee_id"))
5647 and vca
["member-vnf-index"] == member_vnf_index
5648 and vca
["vdu_count_index"] == vdu_index
5650 if vca
.get("vdu_id"):
5651 config_descriptor
= get_configuration(
5652 db_vnfd
, vca
.get("vdu_id")
5654 elif vca
.get("kdu_name"):
5655 config_descriptor
= get_configuration(
5656 db_vnfd
, vca
.get("kdu_name")
5659 config_descriptor
= get_configuration(
5660 db_vnfd
, db_vnfd
["id"]
5662 operation_params
= (
5663 db_nslcmop
.get("operationParams") or {}
5665 exec_terminate_primitives
= not operation_params
.get(
5666 "skip_terminate_primitives"
5667 ) and vca
.get("needed_terminate")
5668 task
= asyncio
.ensure_future(
5677 exec_primitives
=exec_terminate_primitives
,
5681 timeout
=self
.timeout_charm_delete
,
5684 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5687 del vca_update
[vca_index
]
5688 del config_update
[vca_index
]
5689 # wait for pending tasks of terminate primitives
5693 + "Waiting for tasks {}".format(
5694 list(tasks_dict_info
.keys())
5697 error_list
= await self
._wait
_for
_tasks
(
5701 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5706 tasks_dict_info
.clear()
5708 raise LcmException("; ".join(error_list
))
5710 db_vca_and_config_update
= {
5711 "_admin.deployed.VCA": vca_update
,
5712 "configurationStatus": config_update
,
5715 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5717 scale_process
= None
5718 # SCALE-IN VCA - END
5721 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5722 scale_process
= "RO"
5723 if self
.ro_config
.get("ng"):
5724 await self
._scale
_ng
_ro
(
5725 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5727 scaling_info
.pop("vdu-create", None)
5728 scaling_info
.pop("vdu-delete", None)
5730 scale_process
= None
5734 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5735 scale_process
= "KDU"
5736 await self
._scale
_kdu
(
5737 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5739 scaling_info
.pop("kdu-create", None)
5740 scaling_info
.pop("kdu-delete", None)
5742 scale_process
= None
5746 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5748 # SCALE-UP VCA - BEGIN
5749 if vca_scaling_info
:
5750 step
= db_nslcmop_update
[
5752 ] = "Creating new execution environments"
5753 scale_process
= "VCA"
5754 for vca_info
in vca_scaling_info
:
5755 if vca_info
["type"] == "create":
5756 member_vnf_index
= str(vca_info
["member-vnf-index"])
5758 logging_text
+ "vdu info: {}".format(vca_info
)
5760 vnfd_id
= db_vnfr
["vnfd-ref"]
5761 if vca_info
.get("osm_vdu_id"):
5762 vdu_index
= int(vca_info
["vdu_index"])
5763 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5764 if db_vnfr
.get("additionalParamsForVnf"):
5765 deploy_params
.update(
5767 db_vnfr
["additionalParamsForVnf"].copy()
5770 descriptor_config
= get_configuration(
5771 db_vnfd
, db_vnfd
["id"]
5773 if descriptor_config
:
5778 logging_text
=logging_text
5779 + "member_vnf_index={} ".format(member_vnf_index
),
5782 nslcmop_id
=nslcmop_id
,
5788 member_vnf_index
=member_vnf_index
,
5789 vdu_index
=vdu_index
,
5791 deploy_params
=deploy_params
,
5792 descriptor_config
=descriptor_config
,
5793 base_folder
=base_folder
,
5794 task_instantiation_info
=tasks_dict_info
,
5797 vdu_id
= vca_info
["osm_vdu_id"]
5798 vdur
= find_in_list(
5799 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5801 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5802 if vdur
.get("additionalParams"):
5803 deploy_params_vdu
= parse_yaml_strings(
5804 vdur
["additionalParams"]
5807 deploy_params_vdu
= deploy_params
5808 deploy_params_vdu
["OSM"] = get_osm_params(
5809 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5811 if descriptor_config
:
5816 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5817 member_vnf_index
, vdu_id
, vdu_index
5819 stage
[2] = step
= "Scaling out VCA"
5820 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5822 logging_text
=logging_text
5823 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5824 member_vnf_index
, vdu_id
, vdu_index
5828 nslcmop_id
=nslcmop_id
,
5834 member_vnf_index
=member_vnf_index
,
5835 vdu_index
=vdu_index
,
5837 deploy_params
=deploy_params_vdu
,
5838 descriptor_config
=descriptor_config
,
5839 base_folder
=base_folder
,
5840 task_instantiation_info
=tasks_dict_info
,
5844 kdu_name
= vca_info
["osm_kdu_id"]
5845 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5846 if descriptor_config
:
5848 kdu_index
= int(vca_info
["kdu_index"])
5852 for x
in db_vnfr
["kdur"]
5853 if x
["kdu-name"] == kdu_name
5855 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5856 if kdur
.get("additionalParams"):
5857 deploy_params_kdu
= parse_yaml_strings(
5858 kdur
["additionalParams"]
5862 logging_text
=logging_text
,
5865 nslcmop_id
=nslcmop_id
,
5871 member_vnf_index
=member_vnf_index
,
5872 vdu_index
=kdu_index
,
5874 deploy_params
=deploy_params_kdu
,
5875 descriptor_config
=descriptor_config
,
5876 base_folder
=base_folder
,
5877 task_instantiation_info
=tasks_dict_info
,
5880 # SCALE-UP VCA - END
5881 scale_process
= None
5884 # execute primitive service POST-SCALING
5885 step
= "Executing post-scale vnf-config-primitive"
5886 if scaling_descriptor
.get("scaling-config-action"):
5887 for scaling_config_action
in scaling_descriptor
[
5888 "scaling-config-action"
5891 scaling_config_action
.get("trigger") == "post-scale-in"
5892 and scaling_type
== "SCALE_IN"
5894 scaling_config_action
.get("trigger") == "post-scale-out"
5895 and scaling_type
== "SCALE_OUT"
5897 vnf_config_primitive
= scaling_config_action
[
5898 "vnf-config-primitive-name-ref"
5900 step
= db_nslcmop_update
[
5902 ] = "executing post-scale scaling-config-action '{}'".format(
5903 vnf_config_primitive
5906 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5907 if db_vnfr
.get("additionalParamsForVnf"):
5908 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5910 # look for primitive
5911 for config_primitive
in (
5912 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5913 ).get("config-primitive", ()):
5914 if config_primitive
["name"] == vnf_config_primitive
:
5918 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5919 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5920 "config-primitive".format(
5921 scaling_group
, vnf_config_primitive
5924 scale_process
= "VCA"
5925 db_nsr_update
["config-status"] = "configuring post-scaling"
5926 primitive_params
= self
._map
_primitive
_params
(
5927 config_primitive
, {}, vnfr_params
5930 # Post-scale retry check: Check if this sub-operation has been executed before
5931 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5934 vnf_config_primitive
,
5938 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5939 # Skip sub-operation
5940 result
= "COMPLETED"
5941 result_detail
= "Done"
5944 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5945 vnf_config_primitive
, result
, result_detail
5949 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5950 # New sub-operation: Get index of this sub-operation
5952 len(db_nslcmop
.get("_admin", {}).get("operations"))
5957 + "vnf_config_primitive={} New sub-operation".format(
5958 vnf_config_primitive
5962 # retry: Get registered params for this existing sub-operation
5963 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5966 vnf_index
= op
.get("member_vnf_index")
5967 vnf_config_primitive
= op
.get("primitive")
5968 primitive_params
= op
.get("primitive_params")
5971 + "vnf_config_primitive={} Sub-operation retry".format(
5972 vnf_config_primitive
5975 # Execute the primitive, either with new (first-time) or registered (reintent) args
5976 ee_descriptor_id
= config_primitive
.get(
5977 "execution-environment-ref"
5979 primitive_name
= config_primitive
.get(
5980 "execution-environment-primitive", vnf_config_primitive
5982 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5983 nsr_deployed
["VCA"],
5984 member_vnf_index
=vnf_index
,
5986 vdu_count_index
=None,
5987 ee_descriptor_id
=ee_descriptor_id
,
5989 result
, result_detail
= await self
._ns
_execute
_primitive
(
5998 + "vnf_config_primitive={} Done with result {} {}".format(
5999 vnf_config_primitive
, result
, result_detail
6002 # Update operationState = COMPLETED | FAILED
6003 self
._update
_suboperation
_status
(
6004 db_nslcmop
, op_index
, result
, result_detail
6007 if result
== "FAILED":
6008 raise LcmException(result_detail
)
6009 db_nsr_update
["config-status"] = old_config_status
6010 scale_process
= None
6015 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6016 db_nsr_update
["operational-status"] = (
6018 if old_operational_status
== "failed"
6019 else old_operational_status
6021 db_nsr_update
["config-status"] = old_config_status
6024 ROclient
.ROClientException
,
6029 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6031 except asyncio
.CancelledError
:
6033 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6035 exc
= "Operation was cancelled"
6036 except Exception as e
:
6037 exc
= traceback
.format_exc()
6038 self
.logger
.critical(
6039 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6043 self
._write
_ns
_status
(
6046 current_operation
="IDLE",
6047 current_operation_id
=None,
6050 stage
[1] = "Waiting for instantiate pending tasks."
6051 self
.logger
.debug(logging_text
+ stage
[1])
6052 exc
= await self
._wait
_for
_tasks
(
6055 self
.timeout_ns_deploy
,
6063 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6064 nslcmop_operation_state
= "FAILED"
6066 db_nsr_update
["operational-status"] = old_operational_status
6067 db_nsr_update
["config-status"] = old_config_status
6068 db_nsr_update
["detailed-status"] = ""
6070 if "VCA" in scale_process
:
6071 db_nsr_update
["config-status"] = "failed"
6072 if "RO" in scale_process
:
6073 db_nsr_update
["operational-status"] = "failed"
6076 ] = "FAILED scaling nslcmop={} {}: {}".format(
6077 nslcmop_id
, step
, exc
6080 error_description_nslcmop
= None
6081 nslcmop_operation_state
= "COMPLETED"
6082 db_nslcmop_update
["detailed-status"] = "Done"
6084 self
._write
_op
_status
(
6087 error_message
=error_description_nslcmop
,
6088 operation_state
=nslcmop_operation_state
,
6089 other_update
=db_nslcmop_update
,
6092 self
._write
_ns
_status
(
6095 current_operation
="IDLE",
6096 current_operation_id
=None,
6097 other_update
=db_nsr_update
,
6100 if nslcmop_operation_state
:
6104 "nslcmop_id": nslcmop_id
,
6105 "operationState": nslcmop_operation_state
,
6107 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6108 except Exception as e
:
6110 logging_text
+ "kafka_write notification Exception {}".format(e
)
6112 self
.logger
.debug(logging_text
+ "Exit")
6113 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale the KDU resources listed in scaling_info.

    For every scaling request the deployed KDU is looked up with
    get_deployed_kdu. When the request type is "delete" its
    terminate-config-primitives are executed before scaling; when the type is
    "create" its initial-config-primitives are executed after scaling. Config
    primitives are only executed here when get_juju_ee_ref returns None for
    the KDU.

    :param logging_text: prefix prepended to every log message
    :param nsr_id: NS record id, used as filter of the database reference
    :param nsr_deployed: deployed information where the KDU is looked up
    :param db_vnfd: VNF descriptor holding the KDU configuration
    :param vca_id: VCA identifier forwarded to the K8s connector calls
    :param scaling_info: dict whose "kdu-create" or "kdu-delete" entry maps each
        kdu name to a list of scaling requests
    :return: None
    """

    async def _run_config_primitives(
        primitive_key,
        step,
        kdu_name,
        k8s_cluster_type,
        cluster_uuid,
        kdu_instance,
        db_dict,
    ):
        # Execute, ordered by "seq", the primitives stored under primitive_key
        kdu_config = get_configuration(db_vnfd, kdu_name)
        if not (
            kdu_config
            and kdu_config.get(primitive_key)
            and get_juju_ee_ref(db_vnfd, kdu_name) is None
        ):
            return
        # sorted() instead of list.sort(): the descriptor list is left untouched
        for config_primitive in sorted(
            kdu_config[primitive_key], key=lambda val: int(val["seq"])
        ):
            primitive_params_ = self._map_primitive_params(config_primitive, {}, {})
            self.logger.debug(logging_text + step)
            # NOTE(review): db_dict, vca_id and the 600 s timeout sit on lines
            # that are not visible in the reviewed extract - confirm
            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    primitive_name=config_primitive["name"],
                    params=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                ),
                timeout=600,
            )

    # Fall back to an empty dict so that a scaling_info without KDU entries is
    # a no-op instead of a TypeError when iterating over None
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name, kdu_scaling_list in _scaling_info.items():
        for kdu_scaling_info in kdu_scaling_list:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            # database location of this KDU inside the NS record
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                await _run_config_primitives(
                    "terminate-config-primitive",
                    "execute terminate config primitive",
                    kdu_name,
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )

            # NOTE(review): the leading positional arguments and vca_id sit on
            # lines that are not visible in the reviewed extract - confirm
            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    kdu_scaling_info["resource-name"],
                    vca_id=vca_id,
                ),
                timeout=self.timeout_vca_on_error,
            )

            if kdu_scaling_info["type"] == "create":
                await _run_config_primitives(
                    "initial-config-primitive",
                    "execute initial config primitive",
                    kdu_name,
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling request through NG-RO.

    Reads the nsd, every vnfr of the NS and the vnfd of each vnfr from the
    database, updates db_vnfr with the requested vdu creations/deletions
    (scale_vnfr) and re-runs _instantiate_ng_ro with the updated records.
    When vdus were deleted, scale_vnfr is called a second time with
    mark_delete=False.

    :param logging_text: prefix prepended to every log message
    :param db_nsr: NS record; its "nsd-id" selects the nsd to read
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the VNF being scaled, updated by scale_vnfr
    :param vdu_scaling_info: dict with optional "vdu-create" / "vdu-delete"
    :param stage: stage list forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # "_id" of every vnfd already read. vnfr["vnfd-id"] is the value used for
    # the "_id" lookup below, so that is the key to de-duplicate on; comparing
    # it against the descriptor "id" never matched and re-read the same vnfd
    # for each vnfr sharing it
    fetched_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id not in fetched_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            fetched_vnfd_ids.add(vnfd_id)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    # NOTE(review): the callee name, its first argument and mark_delete=True sit
    # on lines that are not visible in the reviewed extract - confirm
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    # NOTE(review): the arguments before start_deploy sit on lines that are not
    # visible in the reviewed extract - confirm their order
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def add_prometheus_metrics(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Register in prometheus the jobs described by a 'prometheus*.j2' artifact.

    Nothing is done (and None is returned) when prometheus is not configured
    or when artifact_path holds no file whose name starts with "prometheus"
    and ends with ".j2".

    :param ee_id: execution environment id; the text after its first "." is
        used as service name
    :param artifact_path: folder where the job template is looked for
    :param ee_config_descriptor: descriptor providing "metric-service"
    :param vnfr_id: VNF record id; used as job name once its "-" are removed
    :param nsr_id: NS record id, stored in every job as "nsr_id"
    :param target_ip: value given to the template as TARGET_IP
    :return: list of job names when the prometheus update reports success,
        None otherwise
    """
    if not self.prometheus:
        return

    # look for the first file called 'prometheus*.j2'
    job_file = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            job_file = file_name
            break
    if not job_file:
        return

    with self.fs.file_open((artifact_path, job_file), "r") as template:
        job_data = template.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    # NOTE(review): the host_port assignment sits on a line that is not visible
    # in the reviewed extract; "80" is assumed - confirm
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = dict(
        JOB_NAME=vnfr_id,
        TARGET_IP=target_ip,
        EXPORTER_POD_IP=host_name,
        EXPORTER_POD_PORT=host_port,
    )
    job_list = self.prometheus.parse_job(job_data, variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id

    job_dict = {}
    for job in job_list:
        job_dict[job["job_name"]] = job

    updated = await self.prometheus.update(job_dict)
    if updated:
        return list(job_dict.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account; a
    value is None when its key ("vca_cloud" / "vca_cloud_credential") is
    not present there.

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    Both values are read from the "config" section of the VIM account; a
    value is None when its key ("vca_k8s_cloud" /
    "vca_k8s_cloud_credential") is not present there.

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_k8s_cloud")
    cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return cloud_name, cloud_credential