1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Returns the given IPv4, IPv6 or MAC address with its last group increased by vm_index
    :param ip_mac: address as text. A value that is not a str is returned unchanged
    :param vm_index: amount added to the last group
    :return: new address as text; None if ip_mac has no '.' nor ':' separator or if its last group
        is not a number. The last group is not range checked and nothing is carried to the
        previous group
    """
    # NOTE(review): no 'self' here although scale_vnfr calls self.increment_ip_mac(...), so this
    # has to remain bound as a static method - confirm the decorator on the line above
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # first character after the separator
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except ValueError:
        # last group is not a (hex) number: treated as "cannot be incremented"
        pass
    return None
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 # get vca status for NS
368 vca_status
= await self
.k8sclusterjuju
.status_kdu(
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 await self
.k8sclusterjuju
.update_vca_status(
380 db_dict
["vcaStatus"],
386 self
.update_db_2("nsrs", nsr_id
, db_dict
)
388 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
390 except Exception as e
:
391 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Renders a cloud-init text as a Jinja2 template using the instantiation parameters
    :param cloud_init_text: cloud-init content, that can contain Jinja2 variables
    :param additional_params: dictionary with the values for the variables. Can be None
    :param vnfd_id: id of the vnfd, only used at the error text
    :param vdu_id: id of the vdu, only used at the error text
    :return: the rendered text. LcmException is raised when a variable is not provided or the
        template cannot be parsed
    """
    # NOTE(review): takes no 'self'; presumably bound as a static method - confirm the decorator
    try:
        # StrictUndefined: a variable without value is an error instead of an empty text
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # kept before the generic template errors so that it gets its own message
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
412 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
413 cloud_init_content
= cloud_init_file
= None
415 if vdu
.get("cloud-init-file"):
416 base_folder
= vnfd
["_admin"]["storage"]
417 if base_folder
["pkg-dir"]:
418 cloud_init_file
= "{}/{}/cloud_init/{}".format(
419 base_folder
["folder"],
420 base_folder
["pkg-dir"],
421 vdu
["cloud-init-file"],
424 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
425 base_folder
["folder"],
426 vdu
["cloud-init-file"],
428 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
429 cloud_init_content
= ci_file
.read()
430 elif vdu
.get("cloud-init"):
431 cloud_init_content
= vdu
["cloud-init"]
433 return cloud_init_content
434 except FsException
as e
:
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Gets the additional params stored at the first vdur of a vnfr that belongs to a vdu
    :param db_vnfr: vnfr database content, with a 'vdur' list
    :param vdu_id: vdu id, compared against the 'vdu-id-ref' of every vdur
    :return: the result of parse_yaml_strings over the 'additionalParams' of that vdur.
        StopIteration is raised if no vdur references vdu_id
    """
    vdur = next(
        vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used by this method
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Converts an OSM IM ip-profile into the ip-profile naming used by RO
    :param ip_profile: dictionary with the OSM ip-profile. It is not modified
    :return: a deep copy where 'dns-server' becomes 'dns-address' (a list of {'address': ...} is
        flattened to the list of addresses), 'ip-version' 'ipv4'/'ipv6' becomes 'IPv4'/'IPv6' and
        'dhcp-params' becomes 'dhcp'. Any other key is kept as it is
    """
    # NOTE(review): takes no 'self'; presumably bound as a static method - confirm the decorator
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
492 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
493 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
494 if db_vim
["_admin"]["operationalState"] != "ENABLED":
496 "VIM={} is not available. operationalState={}".format(
497 vim_account
, db_vim
["_admin"]["operationalState"]
500 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Gets the RO identifier stored at the database for a wim account
    :param wim_account: _id of the record at 'wim_accounts'. A value that is not a str is not
        looked up
    :return: content of '_admin.deployed.RO-account' of the record; a wim_account that is not a
        str is returned unchanged. LcmException is raised if '_admin.operationalState' of the
        record is not 'ENABLED'
    """
    if not isinstance(wim_account, str):
        # NOTE(review): pass-through of non str values (None, booleans) - confirm with callers
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
517 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
519 db_vdu_push_list
= []
520 db_update
= {"_admin.modified": time()}
522 for vdu_id
, vdu_count
in vdu_create
.items():
526 for vdur
in reversed(db_vnfr
["vdur"])
527 if vdur
["vdu-id-ref"] == vdu_id
533 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
538 for count
in range(vdu_count
):
539 vdur_copy
= deepcopy(vdur
)
540 vdur_copy
["status"] = "BUILD"
541 vdur_copy
["status-detailed"] = None
542 vdur_copy
["ip-address"] = None
543 vdur_copy
["_id"] = str(uuid4())
544 vdur_copy
["count-index"] += count
+ 1
545 vdur_copy
["id"] = "{}-{}".format(
546 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
548 vdur_copy
.pop("vim_info", None)
549 for iface
in vdur_copy
["interfaces"]:
550 if iface
.get("fixed-ip"):
551 iface
["ip-address"] = self
.increment_ip_mac(
552 iface
["ip-address"], count
+ 1
555 iface
.pop("ip-address", None)
556 if iface
.get("fixed-mac"):
557 iface
["mac-address"] = self
.increment_ip_mac(
558 iface
["mac-address"], count
+ 1
561 iface
.pop("mac-address", None)
564 ) # only first vdu can be management of vnf
565 db_vdu_push_list
.append(vdur_copy
)
566 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
568 for vdu_id
, vdu_count
in vdu_delete
.items():
570 indexes_to_delete
= [
572 for iv
in enumerate(db_vnfr
["vdur"])
573 if iv
[1]["vdu-id-ref"] == vdu_id
577 "vdur.{}.status".format(i
): "DELETING"
578 for i
in indexes_to_delete
[-vdu_count
:]
582 # it must be deleted one by one because common.db does not allow otherwise
585 for v
in reversed(db_vnfr
["vdur"])
586 if v
["vdu-id-ref"] == vdu_id
588 for vdu
in vdus_to_delete
[:vdu_count
]:
591 {"_id": db_vnfr
["_id"]},
593 pull
={"vdur": {"_id": vdu
["_id"]}},
595 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
596 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
597 # modify passed dictionary db_vnfr
598 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
599 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info, one 'vld.<index>' entry
        per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no RO net references this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
625 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
627 for db_vnfr
in db_vnfrs
.values():
628 vnfr_update
= {"status": "ERROR"}
629 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
630 if "status" not in vdur
:
631 vdur
["status"] = "ERROR"
632 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
634 vdur
["status-detailed"] = str(error_text
)
636 "vdur.{}.status-detailed".format(vdu_index
)
638 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
639 except DbException
as e
:
640 self
.logger
.error("Cannot update vnf. {}".format(e
))
642 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
644 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
645 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
646 :param nsr_desc_RO: nsr descriptor from RO
647 :return: Nothing, LcmException is raised on errors
649 for vnf_index
, db_vnfr
in db_vnfrs
.items():
650 for vnf_RO
in nsr_desc_RO
["vnfs"]:
651 if vnf_RO
["member_vnf_index"] != vnf_index
:
654 if vnf_RO
.get("ip_address"):
655 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
658 elif not db_vnfr
.get("ip-address"):
659 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
660 raise LcmExceptionNoMgmtIP(
661 "ns member_vnf_index '{}' has no IP address".format(
666 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
667 vdur_RO_count_index
= 0
668 if vdur
.get("pdu-type"):
670 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
671 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
673 if vdur
["count-index"] != vdur_RO_count_index
:
674 vdur_RO_count_index
+= 1
676 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
677 if vdur_RO
.get("ip_address"):
678 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
680 vdur
["ip-address"] = None
681 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
682 vdur
["name"] = vdur_RO
.get("vim_name")
683 vdur
["status"] = vdur_RO
.get("status")
684 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
685 for ifacer
in get_iterable(vdur
, "interfaces"):
686 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
687 if ifacer
["name"] == interface_RO
.get("internal_name"):
688 ifacer
["ip-address"] = interface_RO
.get(
691 ifacer
["mac-address"] = interface_RO
.get(
697 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
698 "from VIM info".format(
699 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
702 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
706 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
708 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
712 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
713 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
714 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
716 vld
["vim-id"] = net_RO
.get("vim_net_id")
717 vld
["name"] = net_RO
.get("vim_name")
718 vld
["status"] = net_RO
.get("status")
719 vld
["status-detailed"] = net_RO
.get("error_msg")
720 vnfr_update
["vld.{}".format(vld_index
)] = vld
724 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
729 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
734 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
739 def _get_ns_config_info(self
, nsr_id
):
741 Generates a mapping between vnf,vdu elements and the N2VC id
742 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
743 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
744 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
745 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
747 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
748 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
750 ns_config_info
= {"osm-config-mapping": mapping
}
751 for vca
in vca_deployed_list
:
752 if not vca
["member-vnf-index"]:
754 if not vca
["vdu_id"]:
755 mapping
[vca
["member-vnf-index"]] = vca
["application"]
759 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
761 ] = vca
["application"]
762 return ns_config_info
764 async def _instantiate_ng_ro(
781 def get_vim_account(vim_account_id
):
783 if vim_account_id
in db_vims
:
784 return db_vims
[vim_account_id
]
785 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
786 db_vims
[vim_account_id
] = db_vim
789 # modify target_vld info with instantiation parameters
790 def parse_vld_instantiation_params(
791 target_vim
, target_vld
, vld_params
, target_sdn
793 if vld_params
.get("ip-profile"):
794 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
797 if vld_params
.get("provider-network"):
798 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
801 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
802 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
805 if vld_params
.get("wimAccountId"):
806 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
807 target_vld
["vim_info"][target_wim
] = {}
808 for param
in ("vim-network-name", "vim-network-id"):
809 if vld_params
.get(param
):
810 if isinstance(vld_params
[param
], dict):
811 for vim
, vim_net
in vld_params
[param
].items():
812 other_target_vim
= "vim:" + vim
814 target_vld
["vim_info"],
815 (other_target_vim
, param
.replace("-", "_")),
818 else: # isinstance str
819 target_vld
["vim_info"][target_vim
][
820 param
.replace("-", "_")
821 ] = vld_params
[param
]
822 if vld_params
.get("common_id"):
823 target_vld
["common_id"] = vld_params
.get("common_id")
825 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
826 def update_ns_vld_target(target
, ns_params
):
827 for vnf_params
in ns_params
.get("vnf", ()):
828 if vnf_params
.get("vimAccountId"):
832 for vnfr
in db_vnfrs
.values()
833 if vnf_params
["member-vnf-index"]
834 == vnfr
["member-vnf-index-ref"]
838 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
839 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
840 target_vld
= find_in_list(
841 get_iterable(vdur
, "interfaces"),
842 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
845 if vnf_params
.get("vimAccountId") not in a_vld
.get(
848 target
["ns"]["vld"][a_index
].get("vim_info").update(
850 "vim:{}".format(vnf_params
["vimAccountId"]): {
851 "vim_network_name": ""
856 nslcmop_id
= db_nslcmop
["_id"]
858 "name": db_nsr
["name"],
861 "image": deepcopy(db_nsr
["image"]),
862 "flavor": deepcopy(db_nsr
["flavor"]),
863 "action_id": nslcmop_id
,
864 "cloud_init_content": {},
866 for image
in target
["image"]:
867 image
["vim_info"] = {}
868 for flavor
in target
["flavor"]:
869 flavor
["vim_info"] = {}
871 if db_nslcmop
.get("lcmOperationType") != "instantiate":
872 # get parameters of instantiation:
873 db_nslcmop_instantiate
= self
.db
.get_list(
876 "nsInstanceId": db_nslcmop
["nsInstanceId"],
877 "lcmOperationType": "instantiate",
880 ns_params
= db_nslcmop_instantiate
.get("operationParams")
882 ns_params
= db_nslcmop
.get("operationParams")
883 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
884 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
887 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
888 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
892 "mgmt-network": vld
.get("mgmt-network", False),
893 "type": vld
.get("type"),
896 "vim_network_name": vld
.get("vim-network-name"),
897 "vim_account_id": ns_params
["vimAccountId"],
901 # check if this network needs SDN assist
902 if vld
.get("pci-interfaces"):
903 db_vim
= get_vim_account(ns_params
["vimAccountId"])
904 sdnc_id
= db_vim
["config"].get("sdn-controller")
906 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
907 target_sdn
= "sdn:{}".format(sdnc_id
)
908 target_vld
["vim_info"][target_sdn
] = {
910 "target_vim": target_vim
,
912 "type": vld
.get("type"),
915 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
916 for nsd_vnf_profile
in nsd_vnf_profiles
:
917 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
918 if cp
["virtual-link-profile-id"] == vld
["id"]:
920 "member_vnf:{}.{}".format(
921 cp
["constituent-cpd-id"][0][
922 "constituent-base-element-id"
924 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
926 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
928 # check at nsd descriptor, if there is an ip-profile
930 nsd_vlp
= find_in_list(
931 get_virtual_link_profiles(nsd
),
932 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
937 and nsd_vlp
.get("virtual-link-protocol-data")
938 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
940 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
943 ip_profile_dest_data
= {}
944 if "ip-version" in ip_profile_source_data
:
945 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
948 if "cidr" in ip_profile_source_data
:
949 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
952 if "gateway-ip" in ip_profile_source_data
:
953 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
956 if "dhcp-enabled" in ip_profile_source_data
:
957 ip_profile_dest_data
["dhcp-params"] = {
958 "enabled": ip_profile_source_data
["dhcp-enabled"]
960 vld_params
["ip-profile"] = ip_profile_dest_data
962 # update vld_params with instantiation params
963 vld_instantiation_params
= find_in_list(
964 get_iterable(ns_params
, "vld"),
965 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
967 if vld_instantiation_params
:
968 vld_params
.update(vld_instantiation_params
)
969 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
970 target
["ns"]["vld"].append(target_vld
)
971 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
972 update_ns_vld_target(target
, ns_params
)
974 for vnfr
in db_vnfrs
.values():
976 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
978 vnf_params
= find_in_list(
979 get_iterable(ns_params
, "vnf"),
980 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
982 target_vnf
= deepcopy(vnfr
)
983 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
984 for vld
in target_vnf
.get("vld", ()):
985 # check if connected to a ns.vld, to fill target'
986 vnf_cp
= find_in_list(
987 vnfd
.get("int-virtual-link-desc", ()),
988 lambda cpd
: cpd
.get("id") == vld
["id"],
991 ns_cp
= "member_vnf:{}.{}".format(
992 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
994 if cp2target
.get(ns_cp
):
995 vld
["target"] = cp2target
[ns_cp
]
998 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1000 # check if this network needs SDN assist
1002 if vld
.get("pci-interfaces"):
1003 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1004 sdnc_id
= db_vim
["config"].get("sdn-controller")
1006 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1007 target_sdn
= "sdn:{}".format(sdnc_id
)
1008 vld
["vim_info"][target_sdn
] = {
1010 "target_vim": target_vim
,
1012 "type": vld
.get("type"),
1015 # check at vnfd descriptor, if there is an ip-profile
1017 vnfd_vlp
= find_in_list(
1018 get_virtual_link_profiles(vnfd
),
1019 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1023 and vnfd_vlp
.get("virtual-link-protocol-data")
1024 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1026 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1029 ip_profile_dest_data
= {}
1030 if "ip-version" in ip_profile_source_data
:
1031 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1034 if "cidr" in ip_profile_source_data
:
1035 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1038 if "gateway-ip" in ip_profile_source_data
:
1039 ip_profile_dest_data
[
1041 ] = ip_profile_source_data
["gateway-ip"]
1042 if "dhcp-enabled" in ip_profile_source_data
:
1043 ip_profile_dest_data
["dhcp-params"] = {
1044 "enabled": ip_profile_source_data
["dhcp-enabled"]
1047 vld_params
["ip-profile"] = ip_profile_dest_data
1048 # update vld_params with instantiation params
1050 vld_instantiation_params
= find_in_list(
1051 get_iterable(vnf_params
, "internal-vld"),
1052 lambda i_vld
: i_vld
["name"] == vld
["id"],
1054 if vld_instantiation_params
:
1055 vld_params
.update(vld_instantiation_params
)
1056 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1059 for vdur
in target_vnf
.get("vdur", ()):
1060 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1061 continue # This vdu must not be created
1062 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1064 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1067 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1068 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1071 and vdu_configuration
.get("config-access")
1072 and vdu_configuration
.get("config-access").get("ssh-access")
1074 vdur
["ssh-keys"] = ssh_keys_all
1075 vdur
["ssh-access-required"] = vdu_configuration
[
1077 ]["ssh-access"]["required"]
1080 and vnf_configuration
.get("config-access")
1081 and vnf_configuration
.get("config-access").get("ssh-access")
1082 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1084 vdur
["ssh-keys"] = ssh_keys_all
1085 vdur
["ssh-access-required"] = vnf_configuration
[
1087 ]["ssh-access"]["required"]
1088 elif ssh_keys_instantiation
and find_in_list(
1089 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1091 vdur
["ssh-keys"] = ssh_keys_instantiation
1093 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1095 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1097 if vdud
.get("cloud-init-file"):
1098 vdur
["cloud-init"] = "{}:file:{}".format(
1099 vnfd
["_id"], vdud
.get("cloud-init-file")
1101 # read file and put content at target.cloud_init_content. Avoid ng_ro to use shared package system
1102 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1103 base_folder
= vnfd
["_admin"]["storage"]
1104 if base_folder
["pkg-dir"]:
1105 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1106 base_folder
["folder"],
1107 base_folder
["pkg-dir"],
1108 vdud
.get("cloud-init-file"),
1111 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1112 base_folder
["folder"],
1113 vdud
.get("cloud-init-file"),
1115 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1116 target
["cloud_init_content"][
1119 elif vdud
.get("cloud-init"):
1120 vdur
["cloud-init"] = "{}:vdu:{}".format(
1121 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1123 # put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
1124 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1127 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1128 deploy_params_vdu
= self
._format
_additional
_params
(
1129 vdur
.get("additionalParams") or {}
1131 deploy_params_vdu
["OSM"] = get_osm_params(
1132 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1134 vdur
["additionalParams"] = deploy_params_vdu
1137 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1138 if target_vim
not in ns_flavor
["vim_info"]:
1139 ns_flavor
["vim_info"][target_vim
] = {}
1142 # in case alternative images are provided we must check if they should be applied
1143 # for the vim_type, modify the vim_type taking into account
1144 ns_image_id
= int(vdur
["ns-image-id"])
1145 if vdur
.get("alt-image-ids"):
1146 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1147 vim_type
= db_vim
["vim_type"]
1148 for alt_image_id
in vdur
.get("alt-image-ids"):
1149 ns_alt_image
= target
["image"][int(alt_image_id
)]
1150 if vim_type
== ns_alt_image
.get("vim-type"):
1151 # must use alternative image
1153 "use alternative image id: {}".format(alt_image_id
)
1155 ns_image_id
= alt_image_id
1156 vdur
["ns-image-id"] = ns_image_id
1158 ns_image
= target
["image"][int(ns_image_id
)]
1159 if target_vim
not in ns_image
["vim_info"]:
1160 ns_image
["vim_info"][target_vim
] = {}
1162 vdur
["vim_info"] = {target_vim
: {}}
1163 # instantiation parameters
1165 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1166 # vdud["id"]), None)
1167 vdur_list
.append(vdur
)
1168 target_vnf
["vdur"] = vdur_list
1169 target
["vnf"].append(target_vnf
)
1171 desc
= await self
.RO
.deploy(nsr_id
, target
)
1172 self
.logger
.debug("RO return > {}".format(desc
))
1173 action_id
= desc
["action_id"]
1174 await self
._wait
_ng
_ro
(
1175 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1180 "_admin.deployed.RO.operational-status": "running",
1181 "detailed-status": " ".join(stage
),
1183 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1184 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1185 self
._write
_op
_status
(nslcmop_id
, stage
)
1187 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
):
    """
    Poll RO for the status of an action until it is done, it fails or the timeout expires
    :param nsr_id: ns record identity, passed to RO.status
    :param action_id: RO action to poll
    :param nslcmop_id: operation identity. When provided together with stage, progress is written to database
    :param start_time: epoch seconds taken as the beginning of the wait. Current time if not provided
    :param timeout: maximum seconds to wait, counted from start_time
    :param stage: list whose item [2] is overwritten with the VIM progress text
    :return: None
    :raises NgRoException: if RO reports FAILED or an unknown status, or if the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # explicit raise instead of "assert False": asserts are removed when running with -O
            raise NgRoException(
                "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            )
        # write progress at database only when the text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the "loop" argument of asyncio.sleep was removed in python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Remove the ns from RO: deploy an empty target, wait for the action to finish and delete the RO record
    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: operation identity, also sent to RO as "action_id"
    :param stage: list with 3 items. This task will write over item [2]
    :return: None
    :raises LcmException: if the deletion fails with anything different from a 404 (not found)
    """
    started_at = time()
    action_id = None
    errors = []
    db_nsr_update = {}
    try:
        # an empty target tells RO that nothing must remain deployed
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        ro_answer = await self.RO.deploy(nsr_id, empty_target)
        action_id = ro_answer["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )

        # wait until done
        delete_timeout = 20 * 60  # 20 minutes
        await self._wait_ng_ro(
            nsr_id, action_id, nslcmop_id, started_at, delete_timeout, stage
        )

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        ro_http_code = e.http_code if isinstance(e, NgRoException) else None
        if ro_http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_action_id={} already deleted".format(action_id)
            )
        elif ro_http_code == 409:  # conflict
            errors.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, e)
            )
        else:
            errors.append("delete error: {}".format(e))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, e)
            )

    stage[2] = "Error deleting from VIM" if errors else "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if errors:
        raise LcmException("; ".join(errors))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Its values are read to get their 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: the result of _instantiate_ng_ro. Any exception is logged, marked at the vnfrs and raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # it was a comparison (==) with no effect; the intention is to assign the vim account
                # of the last vnfr. Nothing to assign when there are no vnfrs
                if db_vnfrs:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not among the expected ones
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr record that contains the kdu
    :param kdu_name: value of 'kdu-name' of the kdur to wait for
    :return: IP address
    :raises LcmException: if the kdur is not found, its status is not READY/ENABLED, or after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        # read the vnfr at each try, as the status is written at database by other tasks
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the "loop" argument of asyncio.sleep was removed in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: _id of the vnfr record to look at
    :param vdu_id: 'vdu-id-ref' of the target vdur. Empty for the VNF case (vdur with the vnfr 'ip-address')
    :param vdu_index: 'count-index' of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: target in error state, target not found, or maximum number of tries reached
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # the "loop" argument of asyncio.sleep was removed in python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ng_ro:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    # NOTE(review): only the first entry of result_dict is evaluated - confirm it is intended
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
            except ROclient.ROClientException as e:
                # retried at next iteration of the loop, up to 20 times
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    ) from e
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
    :param nsr_id: ns record identity, read from database at every try
    :param vca_deployed_list: list of deployed VCA; only the item at vca_index is used
    :param vca_index: position of this VCA at the list and at 'configurationStatus'
    :return: None when there is nothing else to wait for
    :raises LcmException: if a dependent configuration is BROKEN or the wait is exhausted
    """
    my_vca = vca_deployed_list[vca_index]
    if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    tries_left = 300
    while tries_left >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]
        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            is_dependency = not my_vca.get("member-vnf-index") or (
                entry.get("member-vnf-index") == my_vca.get("member-vnf-index")
            )
            if not is_dependency:
                continue
            dependency_status = status_entries[position].get("status")
            if dependency_status == "READY":
                continue
            if dependency_status == "BROKEN":
                raise LcmException(
                    "Configuration aborted because dependent charm/s has failed"
                )
            # some dependency is still in progress: stop looking and wait
            pending = True
            break
        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        tries_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the VCA identity to use: the one at the vnfr if a vnfr is provided, otherwise the one of the
    vim account used at the ns instantiation
    :param db_vnfr: database content of vnfr. Its 'vca-id' is returned when this is not empty
    :param db_nsr: database content of nsr. Only used when db_vnfr is empty
    :return: the vca id, or None
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        return VimAccountDB.get_vim_account_with_id(account_id).get("vca")
    return None
1633 async def instantiate_N2VC(
1650 ee_config_descriptor
,
1652 nsr_id
= db_nsr
["_id"]
1653 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1654 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1655 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1656 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1658 "collection": "nsrs",
1659 "filter": {"_id": nsr_id
},
1660 "path": db_update_entry
,
1666 element_under_configuration
= nsr_id
1670 vnfr_id
= db_vnfr
["_id"]
1671 osm_config
["osm"]["vnf_id"] = vnfr_id
1673 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1675 if vca_type
== "native_charm":
1678 index_number
= vdu_index
or 0
1681 element_type
= "VNF"
1682 element_under_configuration
= vnfr_id
1683 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1685 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1686 element_type
= "VDU"
1687 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1688 osm_config
["osm"]["vdu_id"] = vdu_id
1690 namespace
+= ".{}".format(kdu_name
)
1691 element_type
= "KDU"
1692 element_under_configuration
= kdu_name
1693 osm_config
["osm"]["kdu_name"] = kdu_name
1696 if base_folder
["pkg-dir"]:
1697 artifact_path
= "{}/{}/{}/{}".format(
1698 base_folder
["folder"],
1699 base_folder
["pkg-dir"],
1701 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1706 artifact_path
= "{}/Scripts/{}/{}/".format(
1707 base_folder
["folder"],
1709 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1714 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list
= config_descriptor
.get(
1718 "initial-config-primitive"
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1727 # add config if not present for NS charm
1728 ee_descriptor_id
= ee_config_descriptor
.get("id")
1729 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1730 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id
= vca_deployed
.get("ee_id")
1743 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1744 # create or register execution environment in VCA
1745 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1747 self
._write
_configuration
_status
(
1749 vca_index
=vca_index
,
1751 element_under_configuration
=element_under_configuration
,
1752 element_type
=element_type
,
1755 step
= "create execution environment"
1756 self
.logger
.debug(logging_text
+ step
)
1760 if vca_type
== "k8s_proxy_charm":
1761 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1762 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1763 namespace
=namespace
,
1764 artifact_path
=artifact_path
,
1768 elif vca_type
== "helm" or vca_type
== "helm-v3":
1769 ee_id
, credentials
= await self
.vca_map
[
1771 ].create_execution_environment(
1772 namespace
=namespace
,
1776 artifact_path
=artifact_path
,
1780 ee_id
, credentials
= await self
.vca_map
[
1782 ].create_execution_environment(
1783 namespace
=namespace
,
1789 elif vca_type
== "native_charm":
1790 step
= "Waiting to VM being up and getting IP address"
1791 self
.logger
.debug(logging_text
+ step
)
1792 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1801 credentials
= {"hostname": rw_mgmt_ip
}
1803 username
= deep_get(
1804 config_descriptor
, ("config-access", "ssh-access", "default-user")
1806 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1807 # merged. Meanwhile let's get username from initial-config-primitive
1808 if not username
and initial_config_primitive_list
:
1809 for config_primitive
in initial_config_primitive_list
:
1810 for param
in config_primitive
.get("parameter", ()):
1811 if param
["name"] == "ssh-username":
1812 username
= param
["value"]
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1819 credentials
["username"] = username
1820 # n2vc_redesign STEP 3.2
1822 self
._write
_configuration
_status
(
1824 vca_index
=vca_index
,
1825 status
="REGISTERING",
1826 element_under_configuration
=element_under_configuration
,
1827 element_type
=element_type
,
1830 step
= "register execution environment {}".format(credentials
)
1831 self
.logger
.debug(logging_text
+ step
)
1832 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1833 credentials
=credentials
,
1834 namespace
=namespace
,
# for compatibility with MON/POL modules, they need the model and application name at database
# TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1841 ee_id_parts
= ee_id
.split(".")
1842 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1843 if len(ee_id_parts
) >= 2:
1844 model_name
= ee_id_parts
[0]
1845 application_name
= ee_id_parts
[1]
1846 db_nsr_update
[db_update_entry
+ "model"] = model_name
1847 db_nsr_update
[db_update_entry
+ "application"] = application_name
1849 # n2vc_redesign STEP 3.3
1850 step
= "Install configuration Software"
1852 self
._write
_configuration
_status
(
1854 vca_index
=vca_index
,
1855 status
="INSTALLING SW",
1856 element_under_configuration
=element_under_configuration
,
1857 element_type
=element_type
,
1858 other_update
=db_nsr_update
,
1861 # TODO check if already done
1862 self
.logger
.debug(logging_text
+ step
)
1864 if vca_type
== "native_charm":
1865 config_primitive
= next(
1866 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1869 if config_primitive
:
1870 config
= self
._map
_primitive
_params
(
1871 config_primitive
, {}, deploy_params
1874 if vca_type
== "lxc_proxy_charm":
1875 if element_type
== "NS":
1876 num_units
= db_nsr
.get("config-units") or 1
1877 elif element_type
== "VNF":
1878 num_units
= db_vnfr
.get("config-units") or 1
1879 elif element_type
== "VDU":
1880 for v
in db_vnfr
["vdur"]:
1881 if vdu_id
== v
["vdu-id-ref"]:
1882 num_units
= v
.get("config-units") or 1
1884 if vca_type
!= "k8s_proxy_charm":
1885 await self
.vca_map
[vca_type
].install_configuration_sw(
1887 artifact_path
=artifact_path
,
1890 num_units
=num_units
,
1895 # write in db flag of configuration_sw already installed
1897 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self
._add
_vca
_relations
(
1902 logging_text
=logging_text
,
1905 vca_index
=vca_index
,
1908 # if SSH access is required, then get execution environment SSH public
1909 # if native charm we have waited already to VM be UP
1910 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1913 # self.logger.debug("get ssh key block")
1915 config_descriptor
, ("config-access", "ssh-access", "required")
1917 # self.logger.debug("ssh key needed")
1918 # Needed to inject a ssh key
1921 ("config-access", "ssh-access", "default-user"),
1923 step
= "Install configuration Software, getting public ssh key"
1924 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1925 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1928 step
= "Insert public key into VM user={} ssh_key={}".format(
1932 # self.logger.debug("no need to get ssh key")
1933 step
= "Waiting to VM being up and getting IP address"
1934 self
.logger
.debug(logging_text
+ step
)
1936 # n2vc_redesign STEP 5.1
1937 # wait for RO (ip-address) Insert pub_key into VM
1940 rw_mgmt_ip
= await self
.wait_kdu_up(
1941 logging_text
, nsr_id
, vnfr_id
, kdu_name
1944 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1954 rw_mgmt_ip
= None # This is for a NS configuration
1956 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1958 # store rw_mgmt_ip in deploy params for later replacement
1959 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1961 # n2vc_redesign STEP 6 Execute initial config primitive
1962 step
= "execute initial config primitive"
1964 # wait for dependent primitives execution (NS -> VNF -> VDU)
1965 if initial_config_primitive_list
:
1966 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1968 # stage, in function of element type: vdu, kdu, vnf or ns
1969 my_vca
= vca_deployed_list
[vca_index
]
1970 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1972 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1973 elif my_vca
.get("member-vnf-index"):
1975 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1978 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1980 self
._write
_configuration
_status
(
1981 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1984 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1986 check_if_terminated_needed
= True
1987 for initial_config_primitive
in initial_config_primitive_list
:
1988 # adding information on the vca_deployed if it is a NS execution environment
1989 if not vca_deployed
["member-vnf-index"]:
1990 deploy_params
["ns_config_info"] = json
.dumps(
1991 self
._get
_ns
_config
_info
(nsr_id
)
1993 # TODO check if already done
1994 primitive_params_
= self
._map
_primitive
_params
(
1995 initial_config_primitive
, {}, deploy_params
1998 step
= "execute primitive '{}' params '{}'".format(
1999 initial_config_primitive
["name"], primitive_params_
2001 self
.logger
.debug(logging_text
+ step
)
2002 await self
.vca_map
[vca_type
].exec_primitive(
2004 primitive_name
=initial_config_primitive
["name"],
2005 params_dict
=primitive_params_
,
2010 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2011 if check_if_terminated_needed
:
2012 if config_descriptor
.get("terminate-config-primitive"):
2014 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2016 check_if_terminated_needed
= False
2018 # TODO register in database that primitive is done
2020 # STEP 7 Configure metrics
2021 if vca_type
== "helm" or vca_type
== "helm-v3":
2022 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2024 artifact_path
=artifact_path
,
2025 ee_config_descriptor
=ee_config_descriptor
,
2028 target_ip
=rw_mgmt_ip
,
2034 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2037 for job
in prometheus_jobs
:
2041 "job_name": job
["job_name"]
2048 step
= "instantiated at VCA"
2049 self
.logger
.debug(logging_text
+ step
)
2051 self
._write
_configuration
_status
(
2052 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2055 except Exception as e
: # TODO not use Exception but N2VC exception
2056 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2058 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2061 "Exception while {} : {}".format(step
, e
), exc_info
=True
2063 self
._write
_configuration
_status
(
2064 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2066 raise LcmException("{} {}".format(step
, e
)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the record at "nsrs" to update
    :param ns_state: value for "nsState". It is not written when empty
    :param current_operation: value for "currentOperation"; "IDLE" is stored as None at "_admin.operation-type"
    :param current_operation_id: value for "currentOperationID" and the equivalent "_admin" fields
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        # the same dictionary provided by the caller is completed and sent to database
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in python 3.13
        self.logger.warning("Error writing NS status, ns={}: {}".format(nsr_id, e))
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the operation record at "nslcmops".
    :param op_id: _id of the operation record to update
    :param stage: a list writes its item [0] at "stage" and all items joined at "detailed-status";
        any other not None value is written as text at "stage"
    :param error_message: value for "errorMessage", written only if not None
    :param queuePosition: value for "queuePosition"
    :param operation_state: value for "operationState"; "statusEnteredTime" is set to now together with it
    :param other_update: other changes to write at database; the same dictionary is completed and sent
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in python 3.13
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non empty item of 'configurationStatus' of the ns record.
    :param db_nsr: database content of ns record; "_id" and "configurationStatus" are read
    :param status: text to write at "configurationStatus.<index>.status"
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias, removed in python 3.13
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update the item vca_index of 'configurationStatus' at the ns record. Empty values are not written.
    :param nsr_id: _id of the record at "nsrs" to update
    :param vca_index: position at the 'configurationStatus' list
    :param status: value for "status"
    :param element_under_configuration: value for "elementUnderConfiguration"
    :param element_type: value for "elementType"
    :param other_update: other changes to write at database; the same dictionary is completed and sent
    :return: None. A DbException is logged as a warning, not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias, removed in python 3.13
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: if the placement result is not found at database after polling
    """
    nslcmop_id = db_nslcmop["_id"]
    engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if engine != "PLA":
        # no external placement requested: nothing is modified
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll database until the result appears or the wait budget is consumed
    db_poll_interval = 5
    remaining = db_poll_interval * 10
    pla_result = None
    while not pla_result and remaining >= 0:
        await asyncio.sleep(db_poll_interval)
        remaining -= db_poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

    if not pla_result:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for pla_vnf in pla_result["vnf"]:
        vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
        if pla_vnf.get("vimAccountId") and vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": pla_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at the operation record (nslcmops _admin.pla).
    :param params: content received with the placement; "placement" is stored and its "nslcmopId" selects
        the operation record
    :return: None. Any exception is logged as a warning, not raised
    """
    # bound before the try, so that the warning below can always be composed
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias, removed in python 3.13
        self.logger.warning("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2247 async def instantiate(self
, nsr_id
, nslcmop_id
):
2250 :param nsr_id: ns instance to deploy
2251 :param nslcmop_id: operation to run
2255 # Try to lock HA task here
2256 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2257 if not task_is_locked_by_me
:
2259 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2263 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2264 self
.logger
.debug(logging_text
+ "Enter")
2266 # get all needed from database
2268 # database nsrs record
2271 # database nslcmops record
2274 # update operation on nsrs
2276 # update operation on nslcmops
2277 db_nslcmop_update
= {}
2279 nslcmop_operation_state
= None
2280 db_vnfrs
= {} # vnf's info indexed by member-index
2282 tasks_dict_info
= {} # from task to info text
2286 "Stage 1/5: preparation of the environment.",
2287 "Waiting for previous operations to terminate.",
2290 # ^ stage, step, VIM progress
2292 # wait for any previous tasks in process
2293 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2295 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2296 stage
[1] = "Reading from database."
2297 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2298 db_nsr_update
["detailed-status"] = "creating"
2299 db_nsr_update
["operational-status"] = "init"
2300 self
._write
_ns
_status
(
2302 ns_state
="BUILDING",
2303 current_operation
="INSTANTIATING",
2304 current_operation_id
=nslcmop_id
,
2305 other_update
=db_nsr_update
,
2307 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2309 # read from db: operation
2310 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2311 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2312 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2313 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2314 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2316 ns_params
= db_nslcmop
.get("operationParams")
2317 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2318 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2320 timeout_ns_deploy
= self
.timeout
.get(
2321 "ns_deploy", self
.timeout_ns_deploy
2325 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2326 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2327 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2328 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2329 self
.fs
.sync(db_nsr
["nsd-id"])
2331 # nsr_name = db_nsr["name"] # TODO short-name??
2333 # read from db: vnf's of this ns
2334 stage
[1] = "Getting vnfrs from db."
2335 self
.logger
.debug(logging_text
+ stage
[1])
2336 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2338 # read from db: vnfd's for every vnf
2339 db_vnfds
= [] # every vnfd data
2341 # for each vnf in ns, read vnfd
2342 for vnfr
in db_vnfrs_list
:
2343 if vnfr
.get("kdur"):
2345 for kdur
in vnfr
["kdur"]:
2346 if kdur
.get("additionalParams"):
2347 kdur
["additionalParams"] = json
.loads(
2348 kdur
["additionalParams"]
2350 kdur_list
.append(kdur
)
2351 vnfr
["kdur"] = kdur_list
2353 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2354 vnfd_id
= vnfr
["vnfd-id"]
2355 vnfd_ref
= vnfr
["vnfd-ref"]
2356 self
.fs
.sync(vnfd_id
)
2358 # if we haven't this vnfd, read it from db
2359 if vnfd_id
not in db_vnfds
:
2361 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2364 self
.logger
.debug(logging_text
+ stage
[1])
2365 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2368 db_vnfds
.append(vnfd
)
2370 # Get or generates the _admin.deployed.VCA list
2371 vca_deployed_list
= None
2372 if db_nsr
["_admin"].get("deployed"):
2373 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2374 if vca_deployed_list
is None:
2375 vca_deployed_list
= []
2376 configuration_status_list
= []
2377 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2378 db_nsr_update
["configurationStatus"] = configuration_status_list
2379 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2380 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2381 elif isinstance(vca_deployed_list
, dict):
2382 # maintain backward compatibility. Change a dict to list at database
2383 vca_deployed_list
= list(vca_deployed_list
.values())
2384 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2385 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2388 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2390 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2391 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2393 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2394 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2395 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2397 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2400 # n2vc_redesign STEP 2 Deploy Network Scenario
2401 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2402 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2404 stage
[1] = "Deploying KDUs."
2405 # self.logger.debug(logging_text + "Before deploy_kdus")
2406 # Call to deploy_kdus in case exists the "vdu:kdu" param
2407 await self
.deploy_kdus(
2408 logging_text
=logging_text
,
2410 nslcmop_id
=nslcmop_id
,
2413 task_instantiation_info
=tasks_dict_info
,
2416 stage
[1] = "Getting VCA public key."
2417 # n2vc_redesign STEP 1 Get VCA public ssh-key
2418 # feature 1429. Add n2vc public key to needed VMs
2419 n2vc_key
= self
.n2vc
.get_public_key()
2420 n2vc_key_list
= [n2vc_key
]
2421 if self
.vca_config
.get("public_key"):
2422 n2vc_key_list
.append(self
.vca_config
["public_key"])
2424 stage
[1] = "Deploying NS at VIM."
2425 task_ro
= asyncio
.ensure_future(
2426 self
.instantiate_RO(
2427 logging_text
=logging_text
,
2431 db_nslcmop
=db_nslcmop
,
2434 n2vc_key_list
=n2vc_key_list
,
2438 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2439 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2441 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2442 stage
[1] = "Deploying Execution Environments."
2443 self
.logger
.debug(logging_text
+ stage
[1])
2445 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2446 for vnf_profile
in get_vnf_profiles(nsd
):
2447 vnfd_id
= vnf_profile
["vnfd-id"]
2448 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2449 member_vnf_index
= str(vnf_profile
["id"])
2450 db_vnfr
= db_vnfrs
[member_vnf_index
]
2451 base_folder
= vnfd
["_admin"]["storage"]
2457 # Get additional parameters
2458 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2459 if db_vnfr
.get("additionalParamsForVnf"):
2460 deploy_params
.update(
2461 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2464 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2465 if descriptor_config
:
2467 logging_text
=logging_text
2468 + "member_vnf_index={} ".format(member_vnf_index
),
2471 nslcmop_id
=nslcmop_id
,
2477 member_vnf_index
=member_vnf_index
,
2478 vdu_index
=vdu_index
,
2480 deploy_params
=deploy_params
,
2481 descriptor_config
=descriptor_config
,
2482 base_folder
=base_folder
,
2483 task_instantiation_info
=tasks_dict_info
,
2487 # Deploy charms for each VDU that supports one.
2488 for vdud
in get_vdu_list(vnfd
):
2490 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2491 vdur
= find_in_list(
2492 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2495 if vdur
.get("additionalParams"):
2496 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2498 deploy_params_vdu
= deploy_params
2499 deploy_params_vdu
["OSM"] = get_osm_params(
2500 db_vnfr
, vdu_id
, vdu_count_index
=0
2502 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2504 self
.logger
.debug("VDUD > {}".format(vdud
))
2506 "Descriptor config > {}".format(descriptor_config
)
2508 if descriptor_config
:
2511 for vdu_index
in range(vdud_count
):
2512 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2514 logging_text
=logging_text
2515 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2516 member_vnf_index
, vdu_id
, vdu_index
2520 nslcmop_id
=nslcmop_id
,
2526 member_vnf_index
=member_vnf_index
,
2527 vdu_index
=vdu_index
,
2529 deploy_params
=deploy_params_vdu
,
2530 descriptor_config
=descriptor_config
,
2531 base_folder
=base_folder
,
2532 task_instantiation_info
=tasks_dict_info
,
2535 for kdud
in get_kdu_list(vnfd
):
2536 kdu_name
= kdud
["name"]
2537 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2538 if descriptor_config
:
2543 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2545 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2546 if kdur
.get("additionalParams"):
2547 deploy_params_kdu
.update(
2548 parse_yaml_strings(kdur
["additionalParams"].copy())
2552 logging_text
=logging_text
,
2555 nslcmop_id
=nslcmop_id
,
2561 member_vnf_index
=member_vnf_index
,
2562 vdu_index
=vdu_index
,
2564 deploy_params
=deploy_params_kdu
,
2565 descriptor_config
=descriptor_config
,
2566 base_folder
=base_folder
,
2567 task_instantiation_info
=tasks_dict_info
,
2571 # Check if this NS has a charm configuration
2572 descriptor_config
= nsd
.get("ns-configuration")
2573 if descriptor_config
and descriptor_config
.get("juju"):
2576 member_vnf_index
= None
2582 # Get additional parameters
2583 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2584 if db_nsr
.get("additionalParamsForNs"):
2585 deploy_params
.update(
2586 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2588 base_folder
= nsd
["_admin"]["storage"]
2590 logging_text
=logging_text
,
2593 nslcmop_id
=nslcmop_id
,
2599 member_vnf_index
=member_vnf_index
,
2600 vdu_index
=vdu_index
,
2602 deploy_params
=deploy_params
,
2603 descriptor_config
=descriptor_config
,
2604 base_folder
=base_folder
,
2605 task_instantiation_info
=tasks_dict_info
,
2609 # rest of stuff will be done at finally
2612 ROclient
.ROClientException
,
2618 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2621 except asyncio
.CancelledError
:
2623 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2625 exc
= "Operation was cancelled"
2626 except Exception as e
:
2627 exc
= traceback
.format_exc()
2628 self
.logger
.critical(
2629 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2634 error_list
.append(str(exc
))
2636 # wait for pending tasks
2638 stage
[1] = "Waiting for instantiate pending tasks."
2639 self
.logger
.debug(logging_text
+ stage
[1])
2640 error_list
+= await self
._wait
_for
_tasks
(
2648 stage
[1] = stage
[2] = ""
2649 except asyncio
.CancelledError
:
2650 error_list
.append("Cancelled")
2651 # TODO cancel all tasks
2652 except Exception as exc
:
2653 error_list
.append(str(exc
))
2655 # update operation-status
2656 db_nsr_update
["operational-status"] = "running"
2657 # let's begin with VCA 'configured' status (later we can change it)
2658 db_nsr_update
["config-status"] = "configured"
2659 for task
, task_name
in tasks_dict_info
.items():
2660 if not task
.done() or task
.cancelled() or task
.exception():
2661 if task_name
.startswith(self
.task_name_deploy_vca
):
2662 # A N2VC task is pending
2663 db_nsr_update
["config-status"] = "failed"
2665 # RO or KDU task is pending
2666 db_nsr_update
["operational-status"] = "failed"
2668 # update status at database
2670 error_detail
= ". ".join(error_list
)
2671 self
.logger
.error(logging_text
+ error_detail
)
2672 error_description_nslcmop
= "{} Detail: {}".format(
2673 stage
[0], error_detail
2675 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2676 nslcmop_id
, stage
[0]
2679 db_nsr_update
["detailed-status"] = (
2680 error_description_nsr
+ " Detail: " + error_detail
2682 db_nslcmop_update
["detailed-status"] = error_detail
2683 nslcmop_operation_state
= "FAILED"
2687 error_description_nsr
= error_description_nslcmop
= None
2689 db_nsr_update
["detailed-status"] = "Done"
2690 db_nslcmop_update
["detailed-status"] = "Done"
2691 nslcmop_operation_state
= "COMPLETED"
2694 self
._write
_ns
_status
(
2697 current_operation
="IDLE",
2698 current_operation_id
=None,
2699 error_description
=error_description_nsr
,
2700 error_detail
=error_detail
,
2701 other_update
=db_nsr_update
,
2703 self
._write
_op
_status
(
2706 error_message
=error_description_nslcmop
,
2707 operation_state
=nslcmop_operation_state
,
2708 other_update
=db_nslcmop_update
,
2711 if nslcmop_operation_state
:
2713 await self
.msg
.aiowrite(
2718 "nslcmop_id": nslcmop_id
,
2719 "operationState": nslcmop_operation_state
,
2723 except Exception as e
:
2725 logging_text
+ "kafka_write notification Exception {}".format(e
)
2728 self
.logger
.debug(logging_text
+ "Exit")
2729 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2731 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2732 if vnfd_id
not in cached_vnfds
:
2733 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2734 return cached_vnfds
[vnfd_id
]
2736 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2737 if vnf_profile_id
not in cached_vnfrs
:
2738 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2741 "member-vnf-index-ref": vnf_profile_id
,
2742 "nsr-id-ref": nsr_id
,
2745 return cached_vnfrs
[vnf_profile_id
]
2747 def _is_deployed_vca_in_relation(
2748 self
, vca
: DeployedVCA
, relation
: Relation
2751 for endpoint
in (relation
.provider
, relation
.requirer
):
2752 if endpoint
["kdu-resource-profile-id"]:
2755 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2756 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2757 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2763 def _update_ee_relation_data_with_implicit_data(
2764 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2766 ee_relation_data
= safe_get_ee_relation(
2767 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2769 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2770 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2771 "execution-environment-ref"
2773 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2774 vnfd_id
= vnf_profile
["vnfd-id"]
2775 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2778 if ee_relation_level
== EELevel
.VNF
2779 else ee_relation_data
["vdu-profile-id"]
2781 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2784 f
"not execution environments found for ee_relation {ee_relation_data}"
2786 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2787 return ee_relation_data
2789 def _get_ns_relations(
2792 nsd
: Dict
[str, Any
],
2794 cached_vnfds
: Dict
[str, Any
],
2795 ) -> List
[Relation
]:
2797 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2798 for r
in db_ns_relations
:
2799 provider_dict
= None
2800 requirer_dict
= None
2801 if all(key
in r
for key
in ("provider", "requirer")):
2802 provider_dict
= r
["provider"]
2803 requirer_dict
= r
["requirer"]
2804 elif "entities" in r
:
2805 provider_id
= r
["entities"][0]["id"]
2808 "endpoint": r
["entities"][0]["endpoint"],
2810 if provider_id
!= nsd
["id"]:
2811 provider_dict
["vnf-profile-id"] = provider_id
2812 requirer_id
= r
["entities"][1]["id"]
2815 "endpoint": r
["entities"][1]["endpoint"],
2817 if requirer_id
!= nsd
["id"]:
2818 requirer_dict
["vnf-profile-id"] = requirer_id
2820 raise Exception("provider/requirer or entities must be included in the relation.")
2821 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2822 nsr_id
, nsd
, provider_dict
, cached_vnfds
2824 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2825 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2827 provider
= EERelation(relation_provider
)
2828 requirer
= EERelation(relation_requirer
)
2829 relation
= Relation(r
["name"], provider
, requirer
)
2830 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2832 relations
.append(relation
)
2835 def _get_vnf_relations(
2838 nsd
: Dict
[str, Any
],
2840 cached_vnfds
: Dict
[str, Any
],
2841 ) -> List
[Relation
]:
2843 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2844 vnf_profile_id
= vnf_profile
["id"]
2845 vnfd_id
= vnf_profile
["vnfd-id"]
2846 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2847 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2848 for r
in db_vnf_relations
:
2849 provider_dict
= None
2850 requirer_dict
= None
2851 if all(key
in r
for key
in ("provider", "requirer")):
2852 provider_dict
= r
["provider"]
2853 requirer_dict
= r
["requirer"]
2854 elif "entities" in r
:
2855 provider_id
= r
["entities"][0]["id"]
2858 "vnf-profile-id": vnf_profile_id
,
2859 "endpoint": r
["entities"][0]["endpoint"],
2861 if provider_id
!= vnfd_id
:
2862 provider_dict
["vdu-profile-id"] = provider_id
2863 requirer_id
= r
["entities"][1]["id"]
2866 "vnf-profile-id": vnf_profile_id
,
2867 "endpoint": r
["entities"][1]["endpoint"],
2869 if requirer_id
!= vnfd_id
:
2870 requirer_dict
["vdu-profile-id"] = requirer_id
2872 raise Exception("provider/requirer or entities must be included in the relation.")
2873 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2874 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2876 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2877 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2879 provider
= EERelation(relation_provider
)
2880 requirer
= EERelation(relation_requirer
)
2881 relation
= Relation(r
["name"], provider
, requirer
)
2882 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2884 relations
.append(relation
)
2887 def _get_kdu_resource_data(
2889 ee_relation
: EERelation
,
2890 db_nsr
: Dict
[str, Any
],
2891 cached_vnfds
: Dict
[str, Any
],
2892 ) -> DeployedK8sResource
:
2893 nsd
= get_nsd(db_nsr
)
2894 vnf_profiles
= get_vnf_profiles(nsd
)
2895 vnfd_id
= find_in_list(
2897 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2899 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2900 kdu_resource_profile
= get_kdu_resource_profile(
2901 db_vnfd
, ee_relation
.kdu_resource_profile_id
2903 kdu_name
= kdu_resource_profile
["kdu-name"]
2904 deployed_kdu
, _
= get_deployed_kdu(
2905 db_nsr
.get("_admin", ()).get("deployed", ()),
2907 ee_relation
.vnf_profile_id
,
2909 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2912 def _get_deployed_component(
2914 ee_relation
: EERelation
,
2915 db_nsr
: Dict
[str, Any
],
2916 cached_vnfds
: Dict
[str, Any
],
2917 ) -> DeployedComponent
:
2918 nsr_id
= db_nsr
["_id"]
2919 deployed_component
= None
2920 ee_level
= EELevel
.get_level(ee_relation
)
2921 if ee_level
== EELevel
.NS
:
2922 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
2924 deployed_component
= DeployedVCA(nsr_id
, vca
)
2925 elif ee_level
== EELevel
.VNF
:
2926 vca
= get_deployed_vca(
2930 "member-vnf-index": ee_relation
.vnf_profile_id
,
2931 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2935 deployed_component
= DeployedVCA(nsr_id
, vca
)
2936 elif ee_level
== EELevel
.VDU
:
2937 vca
= get_deployed_vca(
2940 "vdu_id": ee_relation
.vdu_profile_id
,
2941 "member-vnf-index": ee_relation
.vnf_profile_id
,
2942 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
2946 deployed_component
= DeployedVCA(nsr_id
, vca
)
2947 elif ee_level
== EELevel
.KDU
:
2948 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
2949 ee_relation
, db_nsr
, cached_vnfds
2951 if kdu_resource_data
:
2952 deployed_component
= DeployedK8sResource(kdu_resource_data
)
2953 return deployed_component
2955 async def _add_relation(
2959 db_nsr
: Dict
[str, Any
],
2960 cached_vnfds
: Dict
[str, Any
],
2961 cached_vnfrs
: Dict
[str, Any
],
2963 deployed_provider
= self
._get
_deployed
_component
(
2964 relation
.provider
, db_nsr
, cached_vnfds
2966 deployed_requirer
= self
._get
_deployed
_component
(
2967 relation
.requirer
, db_nsr
, cached_vnfds
2971 and deployed_requirer
2972 and deployed_provider
.config_sw_installed
2973 and deployed_requirer
.config_sw_installed
2975 provider_db_vnfr
= (
2977 relation
.provider
.nsr_id
,
2978 relation
.provider
.vnf_profile_id
,
2981 if relation
.provider
.vnf_profile_id
2984 requirer_db_vnfr
= (
2986 relation
.requirer
.nsr_id
,
2987 relation
.requirer
.vnf_profile_id
,
2990 if relation
.requirer
.vnf_profile_id
2993 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
2994 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
2995 provider_relation_endpoint
= RelationEndpoint(
2996 deployed_provider
.ee_id
,
2998 relation
.provider
.endpoint
,
3000 requirer_relation_endpoint
= RelationEndpoint(
3001 deployed_requirer
.ee_id
,
3003 relation
.requirer
.endpoint
,
3005 await self
.vca_map
[vca_type
].add_relation(
3006 provider
=provider_relation_endpoint
,
3007 requirer
=requirer_relation_endpoint
,
3009 # remove entry from relations list
3013 async def _add_vca_relations(
3019 timeout
: int = 3600,
3023 # 1. find all relations for this VCA
3024 # 2. wait for other peers related
3028 # STEP 1: find all relations for this VCA
3031 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3032 nsd
= get_nsd(db_nsr
)
3035 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3036 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3041 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3042 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3044 # if no relations, terminate
3046 self
.logger
.debug(logging_text
+ " No relations")
3049 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3056 if now
- start
>= timeout
:
3057 self
.logger
.error(logging_text
+ " : timeout adding relations")
3060 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3061 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3063 # for each relation, find the VCA's related
3064 for relation
in relations
.copy():
3065 added
= await self
._add
_relation
(
3073 relations
.remove(relation
)
3076 self
.logger
.debug("Relations added")
3078 await asyncio
.sleep(5.0)
3082 except Exception as e
:
3083 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3086 async def _install_kdu(
3094 k8s_instance_info
: dict,
3095 k8params
: dict = None,
3101 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3104 "collection": "nsrs",
3105 "filter": {"_id": nsr_id
},
3106 "path": nsr_db_path
,
3109 if k8s_instance_info
.get("kdu-deployment-name"):
3110 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3112 kdu_instance
= self
.k8scluster_map
[
3114 ].generate_kdu_instance_name(
3115 db_dict
=db_dict_install
,
3116 kdu_model
=k8s_instance_info
["kdu-model"],
3117 kdu_name
=k8s_instance_info
["kdu-name"],
3120 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3122 await self
.k8scluster_map
[k8sclustertype
].install(
3123 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3124 kdu_model
=k8s_instance_info
["kdu-model"],
3127 db_dict
=db_dict_install
,
3129 kdu_name
=k8s_instance_info
["kdu-name"],
3130 namespace
=k8s_instance_info
["namespace"],
3131 kdu_instance
=kdu_instance
,
3135 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3138 # Obtain services to obtain management service ip
3139 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3140 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3141 kdu_instance
=kdu_instance
,
3142 namespace
=k8s_instance_info
["namespace"],
3145 # Obtain management service info (if exists)
3146 vnfr_update_dict
= {}
3147 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3149 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3154 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3157 for service
in kdud
.get("service", [])
3158 if service
.get("mgmt-service")
3160 for mgmt_service
in mgmt_services
:
3161 for service
in services
:
3162 if service
["name"].startswith(mgmt_service
["name"]):
3163 # Mgmt service found, Obtain service ip
3164 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3165 if isinstance(ip
, list) and len(ip
) == 1:
3169 "kdur.{}.ip-address".format(kdu_index
)
3172 # Check if must update also mgmt ip at the vnf
3173 service_external_cp
= mgmt_service
.get(
3174 "external-connection-point-ref"
3176 if service_external_cp
:
3178 deep_get(vnfd
, ("mgmt-interface", "cp"))
3179 == service_external_cp
3181 vnfr_update_dict
["ip-address"] = ip
3186 "external-connection-point-ref", ""
3188 == service_external_cp
,
3191 "kdur.{}.ip-address".format(kdu_index
)
3196 "Mgmt service name: {} not found".format(
3197 mgmt_service
["name"]
3201 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3202 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3204 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3207 and kdu_config
.get("initial-config-primitive")
3208 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3210 initial_config_primitive_list
= kdu_config
.get(
3211 "initial-config-primitive"
3213 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3215 for initial_config_primitive
in initial_config_primitive_list
:
3216 primitive_params_
= self
._map
_primitive
_params
(
3217 initial_config_primitive
, {}, {}
3220 await asyncio
.wait_for(
3221 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3222 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3223 kdu_instance
=kdu_instance
,
3224 primitive_name
=initial_config_primitive
["name"],
3225 params
=primitive_params_
,
3226 db_dict
=db_dict_install
,
3232 except Exception as e
:
3233 # Prepare update db with error and raise exception
3236 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3240 vnfr_data
.get("_id"),
3241 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3244 # ignore to keep original exception
3246 # reraise original error
3251 async def deploy_kdus(
3258 task_instantiation_info
,
3260 # Launch kdus if present in the descriptor
3262 k8scluster_id_2_uuic
= {
3263 "helm-chart-v3": {},
3268 async def _get_cluster_id(cluster_id
, cluster_type
):
3269 nonlocal k8scluster_id_2_uuic
3270 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3271 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3273 # check if K8scluster is creating and wait look if previous tasks in process
3274 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3275 "k8scluster", cluster_id
3278 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3279 task_name
, cluster_id
3281 self
.logger
.debug(logging_text
+ text
)
3282 await asyncio
.wait(task_dependency
, timeout
=3600)
3284 db_k8scluster
= self
.db
.get_one(
3285 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3287 if not db_k8scluster
:
3288 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3290 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3292 if cluster_type
== "helm-chart-v3":
3294 # backward compatibility for existing clusters that have not been initialized for helm v3
3295 k8s_credentials
= yaml
.safe_dump(
3296 db_k8scluster
.get("credentials")
3298 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3299 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3301 db_k8scluster_update
= {}
3302 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3303 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3304 db_k8scluster_update
[
3305 "_admin.helm-chart-v3.created"
3307 db_k8scluster_update
[
3308 "_admin.helm-chart-v3.operationalState"
3311 "k8sclusters", cluster_id
, db_k8scluster_update
3313 except Exception as e
:
3316 + "error initializing helm-v3 cluster: {}".format(str(e
))
3319 "K8s cluster '{}' has not been initialized for '{}'".format(
3320 cluster_id
, cluster_type
3325 "K8s cluster '{}' has not been initialized for '{}'".format(
3326 cluster_id
, cluster_type
3329 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3332 logging_text
+= "Deploy kdus: "
3335 db_nsr_update
= {"_admin.deployed.K8s": []}
3336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3339 updated_cluster_list
= []
3340 updated_v3_cluster_list
= []
3342 for vnfr_data
in db_vnfrs
.values():
3343 vca_id
= self
.get_vca_id(vnfr_data
, {})
3344 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3345 # Step 0: Prepare and set parameters
3346 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3347 vnfd_id
= vnfr_data
.get("vnfd-id")
3348 vnfd_with_id
= find_in_list(
3349 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3353 for kdud
in vnfd_with_id
["kdu"]
3354 if kdud
["name"] == kdur
["kdu-name"]
3356 namespace
= kdur
.get("k8s-namespace")
3357 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3358 if kdur
.get("helm-chart"):
3359 kdumodel
= kdur
["helm-chart"]
3360 # Default version: helm3, if helm-version is v2 assign v2
3361 k8sclustertype
= "helm-chart-v3"
3362 self
.logger
.debug("kdur: {}".format(kdur
))
3364 kdur
.get("helm-version")
3365 and kdur
.get("helm-version") == "v2"
3367 k8sclustertype
= "helm-chart"
3368 elif kdur
.get("juju-bundle"):
3369 kdumodel
= kdur
["juju-bundle"]
3370 k8sclustertype
= "juju-bundle"
3373 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3374 "juju-bundle. Maybe an old NBI version is running".format(
3375 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3378 # check if kdumodel is a file and exists
3380 vnfd_with_id
= find_in_list(
3381 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3383 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3384 if storage
: # may be not present if vnfd has not artifacts
3385 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3386 if storage
["pkg-dir"]:
3387 filename
= "{}/{}/{}s/{}".format(
3394 filename
= "{}/Scripts/{}s/{}".format(
3399 if self
.fs
.file_exists(
3400 filename
, mode
="file"
3401 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3402 kdumodel
= self
.fs
.path
+ filename
3403 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3405 except Exception: # it is not a file
3408 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3409 step
= "Synchronize repos for k8s cluster '{}'".format(
3412 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3416 k8sclustertype
== "helm-chart"
3417 and cluster_uuid
not in updated_cluster_list
3419 k8sclustertype
== "helm-chart-v3"
3420 and cluster_uuid
not in updated_v3_cluster_list
3422 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3423 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3424 cluster_uuid
=cluster_uuid
3427 if del_repo_list
or added_repo_dict
:
3428 if k8sclustertype
== "helm-chart":
3430 "_admin.helm_charts_added." + item
: None
3431 for item
in del_repo_list
3434 "_admin.helm_charts_added." + item
: name
3435 for item
, name
in added_repo_dict
.items()
3437 updated_cluster_list
.append(cluster_uuid
)
3438 elif k8sclustertype
== "helm-chart-v3":
3440 "_admin.helm_charts_v3_added." + item
: None
3441 for item
in del_repo_list
3444 "_admin.helm_charts_v3_added." + item
: name
3445 for item
, name
in added_repo_dict
.items()
3447 updated_v3_cluster_list
.append(cluster_uuid
)
3449 logging_text
+ "repos synchronized on k8s cluster "
3450 "'{}' to_delete: {}, to_add: {}".format(
3451 k8s_cluster_id
, del_repo_list
, added_repo_dict
3456 {"_id": k8s_cluster_id
},
3462 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3463 vnfr_data
["member-vnf-index-ref"],
3467 k8s_instance_info
= {
3468 "kdu-instance": None,
3469 "k8scluster-uuid": cluster_uuid
,
3470 "k8scluster-type": k8sclustertype
,
3471 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3472 "kdu-name": kdur
["kdu-name"],
3473 "kdu-model": kdumodel
,
3474 "namespace": namespace
,
3475 "kdu-deployment-name": kdu_deployment_name
,
3477 db_path
= "_admin.deployed.K8s.{}".format(index
)
3478 db_nsr_update
[db_path
] = k8s_instance_info
3479 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3480 vnfd_with_id
= find_in_list(
3481 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3483 task
= asyncio
.ensure_future(
3492 k8params
=desc_params
,
3497 self
.lcm_tasks
.register(
3501 "instantiate_KDU-{}".format(index
),
3504 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3510 except (LcmException
, asyncio
.CancelledError
):
3512 except Exception as e
:
3513 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3514 if isinstance(e
, (N2VCException
, DbException
)):
3515 self
.logger
.error(logging_text
+ msg
)
3517 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3518 raise LcmException(msg
)
3521 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3540 task_instantiation_info
,
3543 # launch instantiate_N2VC in a asyncio task and register task object
3544 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3545 # if not found, create one entry and update database
3546 # fill db_nsr._admin.deployed.VCA.<index>
3549 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3551 if "execution-environment-list" in descriptor_config
:
3552 ee_list
= descriptor_config
.get("execution-environment-list", [])
3553 elif "juju" in descriptor_config
:
3554 ee_list
= [descriptor_config
] # ns charms
3555 else: # other types as script are not supported
3558 for ee_item
in ee_list
:
3561 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3562 ee_item
.get("juju"), ee_item
.get("helm-chart")
3565 ee_descriptor_id
= ee_item
.get("id")
3566 if ee_item
.get("juju"):
3567 vca_name
= ee_item
["juju"].get("charm")
3570 if ee_item
["juju"].get("charm") is not None
3573 if ee_item
["juju"].get("cloud") == "k8s":
3574 vca_type
= "k8s_proxy_charm"
3575 elif ee_item
["juju"].get("proxy") is False:
3576 vca_type
= "native_charm"
3577 elif ee_item
.get("helm-chart"):
3578 vca_name
= ee_item
["helm-chart"]
3579 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3582 vca_type
= "helm-v3"
3585 logging_text
+ "skipping non juju neither charm configuration"
3590 for vca_index
, vca_deployed
in enumerate(
3591 db_nsr
["_admin"]["deployed"]["VCA"]
3593 if not vca_deployed
:
3596 vca_deployed
.get("member-vnf-index") == member_vnf_index
3597 and vca_deployed
.get("vdu_id") == vdu_id
3598 and vca_deployed
.get("kdu_name") == kdu_name
3599 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3600 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3604 # not found, create one.
3606 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3609 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3611 target
+= "/kdu/{}".format(kdu_name
)
3613 "target_element": target
,
3614 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3615 "member-vnf-index": member_vnf_index
,
3617 "kdu_name": kdu_name
,
3618 "vdu_count_index": vdu_index
,
3619 "operational-status": "init", # TODO revise
3620 "detailed-status": "", # TODO revise
3621 "step": "initial-deploy", # TODO revise
3623 "vdu_name": vdu_name
,
3625 "ee_descriptor_id": ee_descriptor_id
,
3629 # create VCA and configurationStatus in db
3631 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3632 "configurationStatus.{}".format(vca_index
): dict(),
3634 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3636 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3638 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3639 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3640 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3643 task_n2vc
= asyncio
.ensure_future(
3644 self
.instantiate_N2VC(
3645 logging_text
=logging_text
,
3646 vca_index
=vca_index
,
3652 vdu_index
=vdu_index
,
3653 deploy_params
=deploy_params
,
3654 config_descriptor
=descriptor_config
,
3655 base_folder
=base_folder
,
3656 nslcmop_id
=nslcmop_id
,
3660 ee_config_descriptor
=ee_item
,
3663 self
.lcm_tasks
.register(
3667 "instantiate_N2VC-{}".format(vca_index
),
3670 task_instantiation_info
[
3672 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3673 member_vnf_index
or "", vdu_id
or ""
3677 def _create_nslcmop(nsr_id
, operation
, params
):
3679 Creates a ns-lcm-opp content to be stored at database.
3680 :param nsr_id: internal id of the instance
3681 :param operation: instantiate, terminate, scale, action, ...
3682 :param params: user parameters for the operation
3683 :return: dictionary following SOL005 format
3685 # Raise exception if invalid arguments
3686 if not (nsr_id
and operation
and params
):
3688 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3695 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3696 "operationState": "PROCESSING",
3697 "statusEnteredTime": now
,
3698 "nsInstanceId": nsr_id
,
3699 "lcmOperationType": operation
,
3701 "isAutomaticInvocation": False,
3702 "operationParams": params
,
3703 "isCancelPending": False,
3705 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3706 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3711 def _format_additional_params(self
, params
):
3712 params
= params
or {}
3713 for key
, value
in params
.items():
3714 if str(value
).startswith("!!yaml "):
3715 params
[key
] = yaml
.safe_load(value
[7:])
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Calculate the params of a terminate config primitive.

    :param seq: descriptor entry of the terminate primitive; its "name" is used
    :param vnf_index: member vnf index the primitive applies to
    :return: the result of _map_primitive_params for this primitive, called with empty
        instantiation params
    """
    action_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get("name"),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, action_params, {})
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide what to do with an already registered sub-operation.

    :param db_nslcmop: nslcmop record containing the "_admin.operations" list
    :param op_index: index of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP if it is already COMPLETED; otherwise op_index, after
        marking the sub-operation as PROCESSING again
    """
    sub_operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    if sub_operations[op_index].get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3751 # Find a sub-operation where all keys in a matching dictionary must match
3752 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3753 def _find_suboperation(self
, db_nslcmop
, match
):
3754 if db_nslcmop
and match
:
3755 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3756 for i
, op
in enumerate(op_list
):
3757 if all(op
.get(k
) == match
[k
] for k
in match
):
3759 return self
.SUBOPERATION_STATUS_NOT_FOUND
# Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write the state of one sub-operation to the "nslcmops" collection.

    :param db_nslcmop: nslcmop record; only its "_id" is used, to locate the document
    :param op_index: index of the sub-operation inside "_admin.operations"
    :param operationState: new value for the sub-operation "operationState"
    :param detailed_status: new value for the sub-operation "detailed-status"
    """
    # Update DB for HA tasks
    record_filter = {"_id": db_nslcmop["_id"]}
    changes = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    self.db.set_one(
        "nslcmops",
        q_filter=record_filter,
        update_dict=changes,
        fail_on_empty=False,
    )
3775 # Add sub-operation, return the index of the added sub-operation
3776 # Optionally, set operationState, detailed-status, and operationType
3777 # Status and type are currently set for 'scale' sub-operations:
3778 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3779 # 'detailed-status' : status message
3780 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3781 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3782 def _add_suboperation(
3790 mapped_primitive_params
,
3791 operationState
=None,
3792 detailed_status
=None,
3795 RO_scaling_info
=None,
3798 return self
.SUBOPERATION_STATUS_NOT_FOUND
3799 # Get the "_admin.operations" list, if it exists
3800 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3801 op_list
= db_nslcmop_admin
.get("operations")
3802 # Create or append to the "_admin.operations" list
3804 "member_vnf_index": vnf_index
,
3806 "vdu_count_index": vdu_count_index
,
3807 "primitive": primitive
,
3808 "primitive_params": mapped_primitive_params
,
3811 new_op
["operationState"] = operationState
3813 new_op
["detailed-status"] = detailed_status
3815 new_op
["lcmOperationType"] = operationType
3817 new_op
["RO_nsr_id"] = RO_nsr_id
3819 new_op
["RO_scaling_info"] = RO_scaling_info
3821 # No existing operations, create key 'operations' with current operation as first list element
3822 db_nslcmop_admin
.update({"operations": [new_op
]})
3823 op_list
= db_nslcmop_admin
.get("operations")
3825 # Existing operations, append operation to list
3826 op_list
.append(new_op
)
3828 db_nslcmop_update
= {"_admin.operations": op_list
}
3829 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3830 op_index
= len(op_list
) - 1
# Helper methods for scale() sub-operations

# pre-scale/post-scale:
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Register a scale sub-operation, or report how to resume an existing one.

    When both RO_nsr_id and RO_scaling_info are provided the sub-operation is identified by
    them (type "SCALE-RO"); otherwise by the primitive, its params and operationType.

    :return: SUBOPERATION_STATUS_NEW if it has just been added, or the result of
        _retry_or_skip_suboperation if it already existed
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id: set to None for all kind of scaling
        None,  # vdu_count_index: set to None for all kind of scaling
        None,  # vdu_name: set to None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        "PROCESSING",  # initial operationState for the sub-operation
        "In progress",  # initial detailed-status for the sub-operation
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
3904 # Function to return execution_environment id
3906 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3907 # TODO vdu_index_count
3908 for vca
in vca_deployed_list
:
3909 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix of the log messages
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # default value when the record has no "type"
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    # the first failing primitive aborts the remaining ones
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a NS.

    The configuration status is written as "TERMINATING" before the deletion and as "DELETED"
    after it. A namespace that is not found is treated as already deleted.

    :param db_nsr: nsr record; the namespace is "." followed by its "_id"
    :param vca_id: passed through to n2vc.delete_namespace
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text: prefix of the log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record to update
    :param nslcmop_id: id of the nslcmop record to update
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the collected failures, if any deletion failed
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # Raised explicitly instead of "assert False": asserts are stripped
                    # under "python -O", which would keep polling an unknown status until
                    # the timeout. It is reported below as a generic delete error.
                    raise LcmException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no "loop" argument: it was removed from asyncio.sleep in Python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd (only if the ns deletion did not fail)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds (only if nothing failed so far)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS instance.

    Stage 1 reads the nslcmop and nsr records, stage 2 runs the terminate primitives of every
    deployed VCA, and stage 3 launches the deletion of all execution environments, the KDUs
    and the deployment at RO. The final state is always written to the nsr and nslcmop
    records and notified through the message bus, in the "finally" section.

    :param nsr_id: id of the nsr record
    :param nslcmop_id: id of the nslcmop record of this operation
    :return: None. Errors are not raised; they are written to the database records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # Skip empty slots before reading anything from the record: calling
            # vca.get() on an empty (None) entry would abort the whole termination.
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                # the errors are reported at the "finally" section
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as wait_exc:
            error_list.append(str(wait_exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait until all the tasks finish or the timeout expires, reporting the progress.

    :param logging_text: prefix of the log messages
    :param created_tasks_info: dictionary task -> text describing the task
    :param timeout: maximum time to wait, counted from the start of this call
    :param stage: list of strings; the index 1 is updated with "<done>/<total>." plus the errors
    :param nslcmop_id: id of the nslcmop record where stage is written
    :param nsr_id: if provided, errorDescription and errorDetail are also written to this nsr
    :return: list with one error detail string for every failed, cancelled or timed out task
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining time of the overall timeout
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            break
        for task in done:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                # these types are logged with the message only; any other exception is
                # logged with its traceback
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        N2VCException,
                        ROclient.ROClientException,
                        LcmException,
                        K8sException,
                        NgRoException,
                    ),
                ):
                    self.logger.error(logging_text + new_error)
                else:
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text
                        + created_tasks_info[task]
                        + " "
                        + exc_traceback
                    )
            else:
                self.logger.debug(
                    logging_text + created_tasks_info[task] + ": Done"
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
4637 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4639 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4640 The default-value is used. If it is between < > it look for a value at instantiation_params
4641 :param primitive_desc: portion of VNFD/NSD that describes primitive
4642 :param params: Params provided by user
4643 :param instantiation_params: Instantiation params provided by user
4644 :return: a dictionary with the calculated params
4646 calculated_params
= {}
4647 for parameter
in primitive_desc
.get("parameter", ()):
4648 param_name
= parameter
["name"]
4649 if param_name
in params
:
4650 calculated_params
[param_name
] = params
[param_name
]
4651 elif "default-value" in parameter
or "value" in parameter
:
4652 if "value" in parameter
:
4653 calculated_params
[param_name
] = parameter
["value"]
4655 calculated_params
[param_name
] = parameter
["default-value"]
4657 isinstance(calculated_params
[param_name
], str)
4658 and calculated_params
[param_name
].startswith("<")
4659 and calculated_params
[param_name
].endswith(">")
4661 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4662 calculated_params
[param_name
] = instantiation_params
[
4663 calculated_params
[param_name
][1:-1]
4667 "Parameter {} needed to execute primitive {} not provided".format(
4668 calculated_params
[param_name
], primitive_desc
["name"]
4673 "Parameter {} needed to execute primitive {} not provided".format(
4674 param_name
, primitive_desc
["name"]
4678 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4679 calculated_params
[param_name
] = yaml
.safe_dump(
4680 calculated_params
[param_name
], default_flow_style
=True, width
=256
4682 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4684 ].startswith("!!yaml "):
4685 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4686 if parameter
.get("data-type") == "INTEGER":
4688 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4689 except ValueError: # error converting string to int
4691 "Parameter {} of primitive {} must be integer".format(
4692 param_name
, primitive_desc
["name"]
4695 elif parameter
.get("data-type") == "BOOLEAN":
4696 calculated_params
[param_name
] = not (
4697 (str(calculated_params
[param_name
])).lower() == "false"
4700 # add always ns_config_info if primitive name is config
4701 if primitive_desc
["name"] == "config":
4702 if "ns_config_info" in instantiation_params
:
4703 calculated_params
["ns_config_info"] = instantiation_params
[
4706 return calculated_params
4708 def _look_for_deployed_vca(
4715 ee_descriptor_id
=None,
4717 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4718 for vca
in deployed_vca
:
4721 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4724 vdu_count_index
is not None
4725 and vdu_count_index
!= vca
["vdu_count_index"]
4728 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4730 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4734 # vca_deployed not found
4736 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4737 " is not deployed".format(
4746 ee_id
= vca
.get("ee_id")
4748 "type", "lxc_proxy_charm"
4749 ) # default value for backward compatibility - proxy charm
4752 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4753 "execution environment".format(
4754 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4757 return ee_id
, vca_type
4759 async def _ns_execute_primitive(
4765 retries_interval
=30,
4772 if primitive
== "config":
4773 primitive_params
= {"params": primitive_params
}
4775 vca_type
= vca_type
or "lxc_proxy_charm"
4779 output
= await asyncio
.wait_for(
4780 self
.vca_map
[vca_type
].exec_primitive(
4782 primitive_name
=primitive
,
4783 params_dict
=primitive_params
,
4784 progress_timeout
=self
.timeout_progress_primitive
,
4785 total_timeout
=self
.timeout_primitive
,
4790 timeout
=timeout
or self
.timeout_primitive
,
4794 except asyncio
.CancelledError
:
4796 except Exception as e
: # asyncio.TimeoutError
4797 if isinstance(e
, asyncio
.TimeoutError
):
4802 "Error executing action {} on {} -> {}".format(
4807 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4809 return "FAILED", str(e
)
4811 return "COMPLETED", output
4813 except (LcmException
, asyncio
.CancelledError
):
4815 except Exception as e
:
4816 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4818 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4820 Updating the vca_status with latest juju information in nsrs record
4821 :param: nsr_id: Id of the nsr
4822 :param: nslcmop_id: Id of the nslcmop
4826 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4827 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4828 vca_id
= self
.get_vca_id({}, db_nsr
)
4829 if db_nsr
["_admin"]["deployed"]["K8s"]:
4830 for k8s_index
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4831 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
4832 await self
._on
_update
_k
8s
_db
(
4833 cluster_uuid
, kdu_instance
, filter={"_id": nsr_id
}, vca_id
=vca_id
4836 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4837 table
, filter = "nsrs", {"_id": nsr_id
}
4838 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4839 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4841 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4842 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4844 async def action(self
, nsr_id
, nslcmop_id
):
4845 # Try to lock HA task here
4846 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4847 if not task_is_locked_by_me
:
4850 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4851 self
.logger
.debug(logging_text
+ "Enter")
4852 # get all needed from database
4856 db_nslcmop_update
= {}
4857 nslcmop_operation_state
= None
4858 error_description_nslcmop
= None
4861 # wait for any previous tasks in process
4862 step
= "Waiting for previous operations to terminate"
4863 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4865 self
._write
_ns
_status
(
4868 current_operation
="RUNNING ACTION",
4869 current_operation_id
=nslcmop_id
,
4872 step
= "Getting information from database"
4873 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4874 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4875 if db_nslcmop
["operationParams"].get("primitive_params"):
4876 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4877 db_nslcmop
["operationParams"]["primitive_params"]
4880 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4881 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4882 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4883 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4884 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4885 primitive
= db_nslcmop
["operationParams"]["primitive"]
4886 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4887 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4888 "timeout_ns_action", self
.timeout_primitive
4892 step
= "Getting vnfr from database"
4893 db_vnfr
= self
.db
.get_one(
4894 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4896 if db_vnfr
.get("kdur"):
4898 for kdur
in db_vnfr
["kdur"]:
4899 if kdur
.get("additionalParams"):
4900 kdur
["additionalParams"] = json
.loads(
4901 kdur
["additionalParams"]
4903 kdur_list
.append(kdur
)
4904 db_vnfr
["kdur"] = kdur_list
4905 step
= "Getting vnfd from database"
4906 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4908 step
= "Getting nsd from database"
4909 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4911 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4912 # for backward compatibility
4913 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4914 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4915 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4916 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4918 # look for primitive
4919 config_primitive_desc
= descriptor_configuration
= None
4921 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4923 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4925 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4927 descriptor_configuration
= db_nsd
.get("ns-configuration")
4929 if descriptor_configuration
and descriptor_configuration
.get(
4932 for config_primitive
in descriptor_configuration
["config-primitive"]:
4933 if config_primitive
["name"] == primitive
:
4934 config_primitive_desc
= config_primitive
4937 if not config_primitive_desc
:
4938 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4940 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4944 primitive_name
= primitive
4945 ee_descriptor_id
= None
4947 primitive_name
= config_primitive_desc
.get(
4948 "execution-environment-primitive", primitive
4950 ee_descriptor_id
= config_primitive_desc
.get(
4951 "execution-environment-ref"
4957 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4959 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4962 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4964 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4966 desc_params
= parse_yaml_strings(
4967 db_vnfr
.get("additionalParamsForVnf")
4970 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4971 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4972 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4974 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4975 actions
.add(primitive
["name"])
4976 for primitive
in kdu_configuration
.get("config-primitive", []):
4977 actions
.add(primitive
["name"])
4978 kdu_action
= True if primitive_name
in actions
else False
4980 # TODO check if ns is in a proper status
4982 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4984 # kdur and desc_params already set from before
4985 if primitive_params
:
4986 desc_params
.update(primitive_params
)
4987 # TODO Check if we will need something at vnf level
4988 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4990 kdu_name
== kdu
["kdu-name"]
4991 and kdu
["member-vnf-index"] == vnf_index
4996 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4999 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5000 msg
= "unknown k8scluster-type '{}'".format(
5001 kdu
.get("k8scluster-type")
5003 raise LcmException(msg
)
5006 "collection": "nsrs",
5007 "filter": {"_id": nsr_id
},
5008 "path": "_admin.deployed.K8s.{}".format(index
),
5012 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5014 step
= "Executing kdu {}".format(primitive_name
)
5015 if primitive_name
== "upgrade":
5016 if desc_params
.get("kdu_model"):
5017 kdu_model
= desc_params
.get("kdu_model")
5018 del desc_params
["kdu_model"]
5020 kdu_model
= kdu
.get("kdu-model")
5021 parts
= kdu_model
.split(sep
=":")
5023 kdu_model
= parts
[0]
5025 detailed_status
= await asyncio
.wait_for(
5026 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5027 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5028 kdu_instance
=kdu
.get("kdu-instance"),
5030 kdu_model
=kdu_model
,
5033 timeout
=timeout_ns_action
,
5035 timeout
=timeout_ns_action
+ 10,
5038 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5040 elif primitive_name
== "rollback":
5041 detailed_status
= await asyncio
.wait_for(
5042 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5043 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5044 kdu_instance
=kdu
.get("kdu-instance"),
5047 timeout
=timeout_ns_action
,
5049 elif primitive_name
== "status":
5050 detailed_status
= await asyncio
.wait_for(
5051 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5052 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5053 kdu_instance
=kdu
.get("kdu-instance"),
5056 timeout
=timeout_ns_action
,
5059 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5060 kdu
["kdu-name"], nsr_id
5062 params
= self
._map
_primitive
_params
(
5063 config_primitive_desc
, primitive_params
, desc_params
5066 detailed_status
= await asyncio
.wait_for(
5067 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5068 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5069 kdu_instance
=kdu_instance
,
5070 primitive_name
=primitive_name
,
5073 timeout
=timeout_ns_action
,
5076 timeout
=timeout_ns_action
,
5080 nslcmop_operation_state
= "COMPLETED"
5082 detailed_status
= ""
5083 nslcmop_operation_state
= "FAILED"
5085 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5086 nsr_deployed
["VCA"],
5087 member_vnf_index
=vnf_index
,
5089 vdu_count_index
=vdu_count_index
,
5090 ee_descriptor_id
=ee_descriptor_id
,
5092 for vca_index
, vca_deployed
in enumerate(
5093 db_nsr
["_admin"]["deployed"]["VCA"]
5095 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5097 "collection": "nsrs",
5098 "filter": {"_id": nsr_id
},
5099 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5103 nslcmop_operation_state
,
5105 ) = await self
._ns
_execute
_primitive
(
5107 primitive
=primitive_name
,
5108 primitive_params
=self
._map
_primitive
_params
(
5109 config_primitive_desc
, primitive_params
, desc_params
5111 timeout
=timeout_ns_action
,
5117 db_nslcmop_update
["detailed-status"] = detailed_status
5118 error_description_nslcmop
= (
5119 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5123 + " task Done with result {} {}".format(
5124 nslcmop_operation_state
, detailed_status
5127 return # database update is called inside finally
5129 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5130 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5132 except asyncio
.CancelledError
:
5134 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5136 exc
= "Operation was cancelled"
5137 except asyncio
.TimeoutError
:
5138 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5140 except Exception as e
:
5141 exc
= traceback
.format_exc()
5142 self
.logger
.critical(
5143 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5152 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5153 nslcmop_operation_state
= "FAILED"
5155 self
._write
_ns
_status
(
5159 ], # TODO check if degraded. For the moment use previous status
5160 current_operation
="IDLE",
5161 current_operation_id
=None,
5162 # error_description=error_description_nsr,
5163 # error_detail=error_detail,
5164 other_update
=db_nsr_update
,
5167 self
._write
_op
_status
(
5170 error_message
=error_description_nslcmop
,
5171 operation_state
=nslcmop_operation_state
,
5172 other_update
=db_nslcmop_update
,
5175 if nslcmop_operation_state
:
5177 await self
.msg
.aiowrite(
5182 "nslcmop_id": nslcmop_id
,
5183 "operationState": nslcmop_operation_state
,
5187 except Exception as e
:
5189 logging_text
+ "kafka_write notification Exception {}".format(e
)
5191 self
.logger
.debug(logging_text
+ "Exit")
5192 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5193 return nslcmop_operation_state
, detailed_status
5195 async def scale(self
, nsr_id
, nslcmop_id
):
5196 # Try to lock HA task here
5197 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5198 if not task_is_locked_by_me
:
5201 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5202 stage
= ["", "", ""]
5203 tasks_dict_info
= {}
5204 # ^ stage, step, VIM progress
5205 self
.logger
.debug(logging_text
+ "Enter")
5206 # get all needed from database
5208 db_nslcmop_update
= {}
5211 # in case of error, indicates what part of scale was failed to put nsr at error status
5212 scale_process
= None
5213 old_operational_status
= ""
5214 old_config_status
= ""
5217 # wait for any previous tasks in process
5218 step
= "Waiting for previous operations to terminate"
5219 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5220 self
._write
_ns
_status
(
5223 current_operation
="SCALING",
5224 current_operation_id
=nslcmop_id
,
5227 step
= "Getting nslcmop from database"
5229 step
+ " after having waited for previous tasks to be completed"
5231 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5233 step
= "Getting nsr from database"
5234 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5235 old_operational_status
= db_nsr
["operational-status"]
5236 old_config_status
= db_nsr
["config-status"]
5238 step
= "Parsing scaling parameters"
5239 db_nsr_update
["operational-status"] = "scaling"
5240 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5241 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5243 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5245 ]["member-vnf-index"]
5246 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5248 ]["scaling-group-descriptor"]
5249 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5250 # for backward compatibility
5251 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5252 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5253 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5254 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5256 step
= "Getting vnfr from database"
5257 db_vnfr
= self
.db
.get_one(
5258 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5261 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5263 step
= "Getting vnfd from database"
5264 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5266 base_folder
= db_vnfd
["_admin"]["storage"]
5268 step
= "Getting scaling-group-descriptor"
5269 scaling_descriptor
= find_in_list(
5270 get_scaling_aspect(db_vnfd
),
5271 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5273 if not scaling_descriptor
:
5275 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5276 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5279 step
= "Sending scale order to VIM"
5280 # TODO check if ns is in a proper status
5282 if not db_nsr
["_admin"].get("scaling-group"):
5287 "_admin.scaling-group": [
5288 {"name": scaling_group
, "nb-scale-op": 0}
5292 admin_scale_index
= 0
5294 for admin_scale_index
, admin_scale_info
in enumerate(
5295 db_nsr
["_admin"]["scaling-group"]
5297 if admin_scale_info
["name"] == scaling_group
:
5298 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5300 else: # not found, set index one plus last element and add new entry with the name
5301 admin_scale_index
+= 1
5303 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5306 vca_scaling_info
= []
5307 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5308 if scaling_type
== "SCALE_OUT":
5309 if "aspect-delta-details" not in scaling_descriptor
:
5311 "Aspect delta details not fount in scaling descriptor {}".format(
5312 scaling_descriptor
["name"]
5315 # count if max-instance-count is reached
5316 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5318 scaling_info
["scaling_direction"] = "OUT"
5319 scaling_info
["vdu-create"] = {}
5320 scaling_info
["kdu-create"] = {}
5321 for delta
in deltas
:
5322 for vdu_delta
in delta
.get("vdu-delta", {}):
5323 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5324 # vdu_index also provides the number of instance of the targeted vdu
5325 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5326 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5330 additional_params
= (
5331 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5334 cloud_init_list
= []
5336 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5337 max_instance_count
= 10
5338 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5339 max_instance_count
= vdu_profile
.get(
5340 "max-number-of-instances", 10
5343 default_instance_num
= get_number_of_instances(
5346 instances_number
= vdu_delta
.get("number-of-instances", 1)
5347 nb_scale_op
+= instances_number
5349 new_instance_count
= nb_scale_op
+ default_instance_num
5350 # Control if new count is over max and vdu count is less than max.
5351 # Then assign new instance count
5352 if new_instance_count
> max_instance_count
> vdu_count
:
5353 instances_number
= new_instance_count
- max_instance_count
5355 instances_number
= instances_number
5357 if new_instance_count
> max_instance_count
:
5359 "reached the limit of {} (max-instance-count) "
5360 "scaling-out operations for the "
5361 "scaling-group-descriptor '{}'".format(
5362 nb_scale_op
, scaling_group
5365 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5367 # TODO Information of its own ip is not available because db_vnfr is not updated.
5368 additional_params
["OSM"] = get_osm_params(
5369 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5371 cloud_init_list
.append(
5372 self
._parse
_cloud
_init
(
5379 vca_scaling_info
.append(
5381 "osm_vdu_id": vdu_delta
["id"],
5382 "member-vnf-index": vnf_index
,
5384 "vdu_index": vdu_index
+ x
,
5387 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5388 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5389 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5390 kdu_name
= kdu_profile
["kdu-name"]
5391 resource_name
= kdu_profile
["resource-name"]
5393 # Might have different kdus in the same delta
5394 # Should have list for each kdu
5395 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5396 scaling_info
["kdu-create"][kdu_name
] = []
5398 kdur
= get_kdur(db_vnfr
, kdu_name
)
5399 if kdur
.get("helm-chart"):
5400 k8s_cluster_type
= "helm-chart-v3"
5401 self
.logger
.debug("kdur: {}".format(kdur
))
5403 kdur
.get("helm-version")
5404 and kdur
.get("helm-version") == "v2"
5406 k8s_cluster_type
= "helm-chart"
5407 raise NotImplementedError
5408 elif kdur
.get("juju-bundle"):
5409 k8s_cluster_type
= "juju-bundle"
5412 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5413 "juju-bundle. Maybe an old NBI version is running".format(
5414 db_vnfr
["member-vnf-index-ref"], kdu_name
5418 max_instance_count
= 10
5419 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5420 max_instance_count
= kdu_profile
.get(
5421 "max-number-of-instances", 10
5424 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5425 deployed_kdu
, _
= get_deployed_kdu(
5426 nsr_deployed
, kdu_name
, vnf_index
5428 if deployed_kdu
is None:
5430 "KDU '{}' for vnf '{}' not deployed".format(
5434 kdu_instance
= deployed_kdu
.get("kdu-instance")
5435 instance_num
= await self
.k8scluster_map
[
5437 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5438 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5439 "number-of-instances", 1
5442 # Control if new count is over max and instance_num is less than max.
5443 # Then assign max instance number to kdu replica count
5444 if kdu_replica_count
> max_instance_count
> instance_num
:
5445 kdu_replica_count
= max_instance_count
5446 if kdu_replica_count
> max_instance_count
:
5448 "reached the limit of {} (max-instance-count) "
5449 "scaling-out operations for the "
5450 "scaling-group-descriptor '{}'".format(
5451 instance_num
, scaling_group
5455 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5456 vca_scaling_info
.append(
5458 "osm_kdu_id": kdu_name
,
5459 "member-vnf-index": vnf_index
,
5461 "kdu_index": instance_num
+ x
- 1,
5464 scaling_info
["kdu-create"][kdu_name
].append(
5466 "member-vnf-index": vnf_index
,
5468 "k8s-cluster-type": k8s_cluster_type
,
5469 "resource-name": resource_name
,
5470 "scale": kdu_replica_count
,
5473 elif scaling_type
== "SCALE_IN":
5474 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5476 scaling_info
["scaling_direction"] = "IN"
5477 scaling_info
["vdu-delete"] = {}
5478 scaling_info
["kdu-delete"] = {}
5480 for delta
in deltas
:
5481 for vdu_delta
in delta
.get("vdu-delta", {}):
5482 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5483 min_instance_count
= 0
5484 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5485 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5486 min_instance_count
= vdu_profile
["min-number-of-instances"]
5488 default_instance_num
= get_number_of_instances(
5489 db_vnfd
, vdu_delta
["id"]
5491 instance_num
= vdu_delta
.get("number-of-instances", 1)
5492 nb_scale_op
-= instance_num
5494 new_instance_count
= nb_scale_op
+ default_instance_num
5496 if new_instance_count
< min_instance_count
< vdu_count
:
5497 instances_number
= min_instance_count
- new_instance_count
5499 instances_number
= instance_num
5501 if new_instance_count
< min_instance_count
:
5503 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5504 "scaling-group-descriptor '{}'".format(
5505 nb_scale_op
, scaling_group
5508 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5509 vca_scaling_info
.append(
5511 "osm_vdu_id": vdu_delta
["id"],
5512 "member-vnf-index": vnf_index
,
5514 "vdu_index": vdu_index
- 1 - x
,
5517 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5518 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5519 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5520 kdu_name
= kdu_profile
["kdu-name"]
5521 resource_name
= kdu_profile
["resource-name"]
5523 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5524 scaling_info
["kdu-delete"][kdu_name
] = []
5526 kdur
= get_kdur(db_vnfr
, kdu_name
)
5527 if kdur
.get("helm-chart"):
5528 k8s_cluster_type
= "helm-chart-v3"
5529 self
.logger
.debug("kdur: {}".format(kdur
))
5531 kdur
.get("helm-version")
5532 and kdur
.get("helm-version") == "v2"
5534 k8s_cluster_type
= "helm-chart"
5535 raise NotImplementedError
5536 elif kdur
.get("juju-bundle"):
5537 k8s_cluster_type
= "juju-bundle"
5540 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5541 "juju-bundle. Maybe an old NBI version is running".format(
5542 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5546 min_instance_count
= 0
5547 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5548 min_instance_count
= kdu_profile
["min-number-of-instances"]
5550 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5551 deployed_kdu
, _
= get_deployed_kdu(
5552 nsr_deployed
, kdu_name
, vnf_index
5554 if deployed_kdu
is None:
5556 "KDU '{}' for vnf '{}' not deployed".format(
5560 kdu_instance
= deployed_kdu
.get("kdu-instance")
5561 instance_num
= await self
.k8scluster_map
[
5563 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5564 kdu_replica_count
= instance_num
- kdu_delta
.get(
5565 "number-of-instances", 1
5568 if kdu_replica_count
< min_instance_count
< instance_num
:
5569 kdu_replica_count
= min_instance_count
5570 if kdu_replica_count
< min_instance_count
:
5572 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5573 "scaling-group-descriptor '{}'".format(
5574 instance_num
, scaling_group
5578 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5579 vca_scaling_info
.append(
5581 "osm_kdu_id": kdu_name
,
5582 "member-vnf-index": vnf_index
,
5584 "kdu_index": instance_num
- x
- 1,
5587 scaling_info
["kdu-delete"][kdu_name
].append(
5589 "member-vnf-index": vnf_index
,
5591 "k8s-cluster-type": k8s_cluster_type
,
5592 "resource-name": resource_name
,
5593 "scale": kdu_replica_count
,
5597 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5598 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5599 if scaling_info
["scaling_direction"] == "IN":
5600 for vdur
in reversed(db_vnfr
["vdur"]):
5601 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5602 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5603 scaling_info
["vdu"].append(
5605 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5606 "vdu_id": vdur
["vdu-id-ref"],
5610 for interface
in vdur
["interfaces"]:
5611 scaling_info
["vdu"][-1]["interface"].append(
5613 "name": interface
["name"],
5614 "ip_address": interface
["ip-address"],
5615 "mac_address": interface
.get("mac-address"),
5618 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5621 step
= "Executing pre-scale vnf-config-primitive"
5622 if scaling_descriptor
.get("scaling-config-action"):
5623 for scaling_config_action
in scaling_descriptor
[
5624 "scaling-config-action"
5627 scaling_config_action
.get("trigger") == "pre-scale-in"
5628 and scaling_type
== "SCALE_IN"
5630 scaling_config_action
.get("trigger") == "pre-scale-out"
5631 and scaling_type
== "SCALE_OUT"
5633 vnf_config_primitive
= scaling_config_action
[
5634 "vnf-config-primitive-name-ref"
5636 step
= db_nslcmop_update
[
5638 ] = "executing pre-scale scaling-config-action '{}'".format(
5639 vnf_config_primitive
5642 # look for primitive
5643 for config_primitive
in (
5644 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5645 ).get("config-primitive", ()):
5646 if config_primitive
["name"] == vnf_config_primitive
:
5650 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5651 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5652 "primitive".format(scaling_group
, vnf_config_primitive
)
5655 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5656 if db_vnfr
.get("additionalParamsForVnf"):
5657 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5659 scale_process
= "VCA"
5660 db_nsr_update
["config-status"] = "configuring pre-scaling"
5661 primitive_params
= self
._map
_primitive
_params
(
5662 config_primitive
, {}, vnfr_params
5665 # Pre-scale retry check: Check if this sub-operation has been executed before
5666 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5669 vnf_config_primitive
,
5673 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5674 # Skip sub-operation
5675 result
= "COMPLETED"
5676 result_detail
= "Done"
5679 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5680 vnf_config_primitive
, result
, result_detail
5684 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5685 # New sub-operation: Get index of this sub-operation
5687 len(db_nslcmop
.get("_admin", {}).get("operations"))
5692 + "vnf_config_primitive={} New sub-operation".format(
5693 vnf_config_primitive
5697 # retry: Get registered params for this existing sub-operation
5698 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5701 vnf_index
= op
.get("member_vnf_index")
5702 vnf_config_primitive
= op
.get("primitive")
5703 primitive_params
= op
.get("primitive_params")
5706 + "vnf_config_primitive={} Sub-operation retry".format(
5707 vnf_config_primitive
5710 # Execute the primitive, either with new (first-time) or registered (reintent) args
5711 ee_descriptor_id
= config_primitive
.get(
5712 "execution-environment-ref"
5714 primitive_name
= config_primitive
.get(
5715 "execution-environment-primitive", vnf_config_primitive
5717 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5718 nsr_deployed
["VCA"],
5719 member_vnf_index
=vnf_index
,
5721 vdu_count_index
=None,
5722 ee_descriptor_id
=ee_descriptor_id
,
5724 result
, result_detail
= await self
._ns
_execute
_primitive
(
5733 + "vnf_config_primitive={} Done with result {} {}".format(
5734 vnf_config_primitive
, result
, result_detail
5737 # Update operationState = COMPLETED | FAILED
5738 self
._update
_suboperation
_status
(
5739 db_nslcmop
, op_index
, result
, result_detail
5742 if result
== "FAILED":
5743 raise LcmException(result_detail
)
5744 db_nsr_update
["config-status"] = old_config_status
5745 scale_process
= None
5749 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5752 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5755 # SCALE-IN VCA - BEGIN
5756 if vca_scaling_info
:
5757 step
= db_nslcmop_update
[
5759 ] = "Deleting the execution environments"
5760 scale_process
= "VCA"
5761 for vca_info
in vca_scaling_info
:
5762 if vca_info
["type"] == "delete":
5763 member_vnf_index
= str(vca_info
["member-vnf-index"])
5765 logging_text
+ "vdu info: {}".format(vca_info
)
5767 if vca_info
.get("osm_vdu_id"):
5768 vdu_id
= vca_info
["osm_vdu_id"]
5769 vdu_index
= int(vca_info
["vdu_index"])
5772 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5773 member_vnf_index
, vdu_id
, vdu_index
5777 kdu_id
= vca_info
["osm_kdu_id"]
5780 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5781 member_vnf_index
, kdu_id
, vdu_index
5783 stage
[2] = step
= "Scaling in VCA"
5784 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5785 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5786 config_update
= db_nsr
["configurationStatus"]
5787 for vca_index
, vca
in enumerate(vca_update
):
5789 (vca
or vca
.get("ee_id"))
5790 and vca
["member-vnf-index"] == member_vnf_index
5791 and vca
["vdu_count_index"] == vdu_index
5793 if vca
.get("vdu_id"):
5794 config_descriptor
= get_configuration(
5795 db_vnfd
, vca
.get("vdu_id")
5797 elif vca
.get("kdu_name"):
5798 config_descriptor
= get_configuration(
5799 db_vnfd
, vca
.get("kdu_name")
5802 config_descriptor
= get_configuration(
5803 db_vnfd
, db_vnfd
["id"]
5805 operation_params
= (
5806 db_nslcmop
.get("operationParams") or {}
5808 exec_terminate_primitives
= not operation_params
.get(
5809 "skip_terminate_primitives"
5810 ) and vca
.get("needed_terminate")
5811 task
= asyncio
.ensure_future(
5820 exec_primitives
=exec_terminate_primitives
,
5824 timeout
=self
.timeout_charm_delete
,
5827 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5830 del vca_update
[vca_index
]
5831 del config_update
[vca_index
]
5832 # wait for pending tasks of terminate primitives
5836 + "Waiting for tasks {}".format(
5837 list(tasks_dict_info
.keys())
5840 error_list
= await self
._wait
_for
_tasks
(
5844 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5849 tasks_dict_info
.clear()
5851 raise LcmException("; ".join(error_list
))
5853 db_vca_and_config_update
= {
5854 "_admin.deployed.VCA": vca_update
,
5855 "configurationStatus": config_update
,
5858 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5860 scale_process
= None
5861 # SCALE-IN VCA - END
5864 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5865 scale_process
= "RO"
5866 if self
.ro_config
.get("ng"):
5867 await self
._scale
_ng
_ro
(
5868 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5870 scaling_info
.pop("vdu-create", None)
5871 scaling_info
.pop("vdu-delete", None)
5873 scale_process
= None
5877 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5878 scale_process
= "KDU"
5879 await self
._scale
_kdu
(
5880 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5882 scaling_info
.pop("kdu-create", None)
5883 scaling_info
.pop("kdu-delete", None)
5885 scale_process
= None
5889 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5891 # SCALE-UP VCA - BEGIN
5892 if vca_scaling_info
:
5893 step
= db_nslcmop_update
[
5895 ] = "Creating new execution environments"
5896 scale_process
= "VCA"
5897 for vca_info
in vca_scaling_info
:
5898 if vca_info
["type"] == "create":
5899 member_vnf_index
= str(vca_info
["member-vnf-index"])
5901 logging_text
+ "vdu info: {}".format(vca_info
)
5903 vnfd_id
= db_vnfr
["vnfd-ref"]
5904 if vca_info
.get("osm_vdu_id"):
5905 vdu_index
= int(vca_info
["vdu_index"])
5906 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5907 if db_vnfr
.get("additionalParamsForVnf"):
5908 deploy_params
.update(
5910 db_vnfr
["additionalParamsForVnf"].copy()
5913 descriptor_config
= get_configuration(
5914 db_vnfd
, db_vnfd
["id"]
5916 if descriptor_config
:
5921 logging_text
=logging_text
5922 + "member_vnf_index={} ".format(member_vnf_index
),
5925 nslcmop_id
=nslcmop_id
,
5931 member_vnf_index
=member_vnf_index
,
5932 vdu_index
=vdu_index
,
5934 deploy_params
=deploy_params
,
5935 descriptor_config
=descriptor_config
,
5936 base_folder
=base_folder
,
5937 task_instantiation_info
=tasks_dict_info
,
5940 vdu_id
= vca_info
["osm_vdu_id"]
5941 vdur
= find_in_list(
5942 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5944 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5945 if vdur
.get("additionalParams"):
5946 deploy_params_vdu
= parse_yaml_strings(
5947 vdur
["additionalParams"]
5950 deploy_params_vdu
= deploy_params
5951 deploy_params_vdu
["OSM"] = get_osm_params(
5952 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5954 if descriptor_config
:
5959 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5960 member_vnf_index
, vdu_id
, vdu_index
5962 stage
[2] = step
= "Scaling out VCA"
5963 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5965 logging_text
=logging_text
5966 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5967 member_vnf_index
, vdu_id
, vdu_index
5971 nslcmop_id
=nslcmop_id
,
5977 member_vnf_index
=member_vnf_index
,
5978 vdu_index
=vdu_index
,
5980 deploy_params
=deploy_params_vdu
,
5981 descriptor_config
=descriptor_config
,
5982 base_folder
=base_folder
,
5983 task_instantiation_info
=tasks_dict_info
,
5987 kdu_name
= vca_info
["osm_kdu_id"]
5988 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5989 if descriptor_config
:
5991 kdu_index
= int(vca_info
["kdu_index"])
5995 for x
in db_vnfr
["kdur"]
5996 if x
["kdu-name"] == kdu_name
5998 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5999 if kdur
.get("additionalParams"):
6000 deploy_params_kdu
= parse_yaml_strings(
6001 kdur
["additionalParams"]
6005 logging_text
=logging_text
,
6008 nslcmop_id
=nslcmop_id
,
6014 member_vnf_index
=member_vnf_index
,
6015 vdu_index
=kdu_index
,
6017 deploy_params
=deploy_params_kdu
,
6018 descriptor_config
=descriptor_config
,
6019 base_folder
=base_folder
,
6020 task_instantiation_info
=tasks_dict_info
,
6023 # SCALE-UP VCA - END
6024 scale_process
= None
6027 # execute primitive service POST-SCALING
6028 step
= "Executing post-scale vnf-config-primitive"
6029 if scaling_descriptor
.get("scaling-config-action"):
6030 for scaling_config_action
in scaling_descriptor
[
6031 "scaling-config-action"
6034 scaling_config_action
.get("trigger") == "post-scale-in"
6035 and scaling_type
== "SCALE_IN"
6037 scaling_config_action
.get("trigger") == "post-scale-out"
6038 and scaling_type
== "SCALE_OUT"
6040 vnf_config_primitive
= scaling_config_action
[
6041 "vnf-config-primitive-name-ref"
6043 step
= db_nslcmop_update
[
6045 ] = "executing post-scale scaling-config-action '{}'".format(
6046 vnf_config_primitive
6049 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6050 if db_vnfr
.get("additionalParamsForVnf"):
6051 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6053 # look for primitive
6054 for config_primitive
in (
6055 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6056 ).get("config-primitive", ()):
6057 if config_primitive
["name"] == vnf_config_primitive
:
6061 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6062 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6063 "config-primitive".format(
6064 scaling_group
, vnf_config_primitive
6067 scale_process
= "VCA"
6068 db_nsr_update
["config-status"] = "configuring post-scaling"
6069 primitive_params
= self
._map
_primitive
_params
(
6070 config_primitive
, {}, vnfr_params
6073 # Post-scale retry check: Check if this sub-operation has been executed before
6074 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6077 vnf_config_primitive
,
6081 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6082 # Skip sub-operation
6083 result
= "COMPLETED"
6084 result_detail
= "Done"
6087 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6088 vnf_config_primitive
, result
, result_detail
6092 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6093 # New sub-operation: Get index of this sub-operation
6095 len(db_nslcmop
.get("_admin", {}).get("operations"))
6100 + "vnf_config_primitive={} New sub-operation".format(
6101 vnf_config_primitive
6105 # retry: Get registered params for this existing sub-operation
6106 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6109 vnf_index
= op
.get("member_vnf_index")
6110 vnf_config_primitive
= op
.get("primitive")
6111 primitive_params
= op
.get("primitive_params")
6114 + "vnf_config_primitive={} Sub-operation retry".format(
6115 vnf_config_primitive
6118 # Execute the primitive, either with new (first-time) or registered (reintent) args
6119 ee_descriptor_id
= config_primitive
.get(
6120 "execution-environment-ref"
6122 primitive_name
= config_primitive
.get(
6123 "execution-environment-primitive", vnf_config_primitive
6125 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6126 nsr_deployed
["VCA"],
6127 member_vnf_index
=vnf_index
,
6129 vdu_count_index
=None,
6130 ee_descriptor_id
=ee_descriptor_id
,
6132 result
, result_detail
= await self
._ns
_execute
_primitive
(
6141 + "vnf_config_primitive={} Done with result {} {}".format(
6142 vnf_config_primitive
, result
, result_detail
6145 # Update operationState = COMPLETED | FAILED
6146 self
._update
_suboperation
_status
(
6147 db_nslcmop
, op_index
, result
, result_detail
6150 if result
== "FAILED":
6151 raise LcmException(result_detail
)
6152 db_nsr_update
["config-status"] = old_config_status
6153 scale_process
= None
6158 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6159 db_nsr_update
["operational-status"] = (
6161 if old_operational_status
== "failed"
6162 else old_operational_status
6164 db_nsr_update
["config-status"] = old_config_status
6167 ROclient
.ROClientException
,
6172 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6174 except asyncio
.CancelledError
:
6176 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6178 exc
= "Operation was cancelled"
6179 except Exception as e
:
6180 exc
= traceback
.format_exc()
6181 self
.logger
.critical(
6182 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6186 self
._write
_ns
_status
(
6189 current_operation
="IDLE",
6190 current_operation_id
=None,
6193 stage
[1] = "Waiting for instantiate pending tasks."
6194 self
.logger
.debug(logging_text
+ stage
[1])
6195 exc
= await self
._wait
_for
_tasks
(
6198 self
.timeout_ns_deploy
,
6206 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6207 nslcmop_operation_state
= "FAILED"
6209 db_nsr_update
["operational-status"] = old_operational_status
6210 db_nsr_update
["config-status"] = old_config_status
6211 db_nsr_update
["detailed-status"] = ""
6213 if "VCA" in scale_process
:
6214 db_nsr_update
["config-status"] = "failed"
6215 if "RO" in scale_process
:
6216 db_nsr_update
["operational-status"] = "failed"
6219 ] = "FAILED scaling nslcmop={} {}: {}".format(
6220 nslcmop_id
, step
, exc
6223 error_description_nslcmop
= None
6224 nslcmop_operation_state
= "COMPLETED"
6225 db_nslcmop_update
["detailed-status"] = "Done"
6227 self
._write
_op
_status
(
6230 error_message
=error_description_nslcmop
,
6231 operation_state
=nslcmop_operation_state
,
6232 other_update
=db_nslcmop_update
,
6235 self
._write
_ns
_status
(
6238 current_operation
="IDLE",
6239 current_operation_id
=None,
6240 other_update
=db_nsr_update
,
6243 if nslcmop_operation_state
:
6247 "nslcmop_id": nslcmop_id
,
6248 "operationState": nslcmop_operation_state
,
6250 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6251 except Exception as e
:
6253 logging_text
+ "kafka_write notification Exception {}".format(e
)
6255 self
.logger
.debug(logging_text
+ "Exit")
6256 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale the KDU resources described in scaling_info.

    For every scaling entry, the K8s connector selected by its
    "k8s-cluster-type" is asked to scale "resource-name" to "scale".
    Entries of type "delete" run the KDU "terminate-config-primitive" list
    before scaling; entries of type "create" run the
    "initial-config-primitive" list after scaling.

    :param logging_text: prefix added to every log message
    :param nsr_id: used as the "_id" filter of the "nsrs" db_dict
    :param nsr_deployed: passed to get_deployed_kdu to locate the deployed KDU
    :param db_vnfd: descriptor passed to get_configuration / get_juju_ee_ref
    :param vca_id: forwarded to the K8s connector calls
    :param scaling_info: dict whose "kdu-create" value (or, when that is
        empty, "kdu-delete" value) maps each kdu name to a list of scaling
        entries. Nothing is done when both are missing.
    """
    # NOTE(review): several short lines of this method (the db_dict/vca_id
    # keyword arguments, the kdu_instance/scale arguments and the 600 s
    # timeout) were not visible in the reviewed extract and were restored
    # from context; confirm against the repository.
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name, kdu_scaling_list in _scaling_info.items():
        for kdu_scaling_info in kdu_scaling_list:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                await self._exec_kdu_config_primitives(
                    logging_text,
                    db_vnfd,
                    kdu_name,
                    "terminate-config-primitive",
                    "execute terminate config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                    vca_id,
                )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    kdu_scaling_info["resource-name"],
                    vca_id=vca_id,
                ),
                timeout=self.timeout_vca_on_error,
            )

            if kdu_scaling_info["type"] == "create":
                await self._exec_kdu_config_primitives(
                    logging_text,
                    db_vnfd,
                    kdu_name,
                    "initial-config-primitive",
                    "execute initial config primitive",
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                    vca_id,
                )


async def _exec_kdu_config_primitives(
    self,
    logging_text,
    db_vnfd,
    kdu_name,
    primitive_key,
    step,
    k8s_cluster_type,
    cluster_uuid,
    kdu_instance,
    db_dict,
    vca_id,
):
    """
    Run, in "seq" order, the KDU config primitives stored under primitive_key.

    Does nothing unless the KDU has a configuration holding a non-empty
    primitive_key list and get_juju_ee_ref(db_vnfd, kdu_name) is None.

    :param primitive_key: "terminate-config-primitive" or
        "initial-config-primitive"
    :param step: text logged before each primitive execution
    """
    kdu_config = get_configuration(db_vnfd, kdu_name)
    if not (
        kdu_config
        and kdu_config.get(primitive_key)
        and get_juju_ee_ref(db_vnfd, kdu_name) is None
    ):
        return

    # sorted() instead of list.sort(): the list belongs to the descriptor and
    # must not be reordered in place
    ordered_primitives = sorted(
        kdu_config.get(primitive_key), key=lambda val: int(val["seq"])
    )
    for config_primitive in ordered_primitives:
        primitive_params_ = self._map_primitive_params(config_primitive, {}, {})
        self.logger.debug(logging_text + step)
        await asyncio.wait_for(
            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                primitive_name=config_primitive["name"],
                params=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
            ),
            timeout=600,
        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling request by re-running the NG-RO instantiation.

    Reads the nsd, every vnfr of the NS and the vnfd of each vnfr from the
    database, updates db_vnfr with the VDUs to create/delete and then calls
    _instantiate_ng_ro with the refreshed records.

    :param logging_text: prefix for log messages
    :param db_nsr: NS record; its "nsd-id" selects the nsd to read
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the vnf being scaled
    :param vdu_scaling_info: dict with optional "vdu-create" / "vdu-delete"
    :param stage: forwarded to _instantiate_ng_ro
    """
    # NOTE(review): the name of the helper called with mark_delete
    # (scale_vnfr) and the positional arguments of _instantiate_ng_ro were
    # not visible in the reviewed extract and were restored from context;
    # confirm against the repository.
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db. vnfd_id is the "_id" used
        # in the query below, so the comparison must use "_id" too; comparing
        # the descriptor "id" re-read the same vnfd for every vnfr.
        if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )
    if vdu_scaling_info.get("vdu-delete"):
        # second pass with mark_delete=False, once the deployment is done
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs from the 'prometheus*.j2' template of an
    execution environment artifact.

    :param ee_id: execution environment id; the text after the first "." is
        used as service name
    :param artifact_path: folder listed with self.fs.dir_ls to find the template
    :param ee_config_descriptor: its "metric-service" completes the host name
    :param vnfr_id: used, without "-", as job name and as "vnfr_id" metadata
    :param nsr_id: stored as "nsr_id" metadata in every job
    :param target_ip: value of the TARGET_IP template variable
    :return: list of jobs returned by parse_job, or None when the artifact
        has no 'prometheus*.j2' file
    """
    # NOTE(review): the parameter order, host_port and the return statements
    # were not visible in the reviewed extract and were restored from
    # context; confirm against the repository.

    # the job template is the first file called 'prometheus*.j2'
    job_file = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            job_file = entry
            break
    if not job_file:
        return

    with self.fs.file_open((artifact_path, job_file), "r") as template:
        job_data = template.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)

    # every job_name must contain the vnfr_id; nsr_id and vnfr_id are added
    # as metadata
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and its credential from the VIM account configuration

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_cloud" and
             "vca_cloud_credential" keys of the account "config"; a value is
             None when its key is not present
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and its credential from the VIM account configuration

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_k8s_cloud"
             and "vca_k8s_cloud_credential" keys of the account "config"; a
             value is None when its key is not present
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_k8s_cloud")
    cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return cloud_name, cloud_credential