1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
20 from typing
import Any
, Dict
, List
23 import logging
.handlers
34 from osm_lcm
import ROclient
35 from osm_lcm
.data_utils
.nsr
import (
38 get_deployed_vca_list
,
41 from osm_lcm
.data_utils
.vca
import (
50 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
51 from osm_lcm
.lcm_utils
import (
59 from osm_lcm
.data_utils
.nsd
import (
60 get_ns_configuration_relation_list
,
64 from osm_lcm
.data_utils
.vnfd
import (
68 get_ee_sorted_initial_config_primitive_list
,
69 get_ee_sorted_terminate_config_primitive_list
,
71 get_virtual_link_profiles
,
76 get_number_of_instances
,
78 get_kdu_resource_profile
,
80 from osm_lcm
.data_utils
.list_utils
import find_in_list
81 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
82 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
83 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
84 from n2vc
.definitions
import RelationEndpoint
85 from n2vc
.k8s_helm_conn
import K8sHelmConnector
86 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
87 from n2vc
.k8s_juju_conn
import K8sJujuConnector
89 from osm_common
.dbbase
import DbException
90 from osm_common
.fsbase
import FsException
92 from osm_lcm
.data_utils
.database
.database
import Database
93 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
95 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
96 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
98 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
99 from osm_lcm
.prometheus
import parse_job
101 from copy
import copy
, deepcopy
102 from time
import time
103 from uuid
import uuid4
105 from random
import randint
107 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
110 class NsLcm(LcmBase
):
111 timeout_vca_on_error
= (
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete
= 10 * 60
117 timeout_primitive
= 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive
= (
120 ) # timeout for some progress in a primitive execution
122 SUBOPERATION_STATUS_NOT_FOUND
= -1
123 SUBOPERATION_STATUS_NEW
= -2
124 SUBOPERATION_STATUS_SKIP
= -3
125 task_name_deploy_vca
= "Deploying VCA"
127 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
133 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
135 self
.db
= Database().instance
.db
136 self
.fs
= Filesystem().instance
.fs
138 self
.lcm_tasks
= lcm_tasks
139 self
.timeout
= config
["timeout"]
140 self
.ro_config
= config
["ro_config"]
141 self
.ng_ro
= config
["ro_config"].get("ng")
142 self
.vca_config
= config
["VCA"].copy()
144 # create N2VC connector
145 self
.n2vc
= N2VCJujuConnector(
148 on_update_db
=self
._on
_update
_n
2vc
_db
,
153 self
.conn_helm_ee
= LCMHelmConn(
156 vca_config
=self
.vca_config
,
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
160 self
.k8sclusterhelm2
= K8sHelmConnector(
161 kubectl_command
=self
.vca_config
.get("kubectlpath"),
162 helm_command
=self
.vca_config
.get("helmpath"),
169 self
.k8sclusterhelm3
= K8sHelm3Connector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helm3path"),
178 self
.k8sclusterjuju
= K8sJujuConnector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 juju_command
=self
.vca_config
.get("jujupath"),
183 on_update_db
=self
._on
_update
_k
8s
_db
,
188 self
.k8scluster_map
= {
189 "helm-chart": self
.k8sclusterhelm2
,
190 "helm-chart-v3": self
.k8sclusterhelm3
,
191 "chart": self
.k8sclusterhelm3
,
192 "juju-bundle": self
.k8sclusterjuju
,
193 "juju": self
.k8sclusterjuju
,
197 "lxc_proxy_charm": self
.n2vc
,
198 "native_charm": self
.n2vc
,
199 "k8s_proxy_charm": self
.n2vc
,
200 "helm": self
.conn_helm_ee
,
201 "helm-v3": self
.conn_helm_ee
,
205 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Obtain a new address by adding an offset to the last group of an IP or MAC address

    If the text contains a dot, the group after the last dot is read as a decimal
    number (ipv4). Otherwise the group after the last colon is read as a hexadecimal
    number (ipv6 or mac) and written back zero-padded to the width it had in the input.
    :param ip_mac: address as text. A value that is not a str is returned unchanged
    :param vm_index: offset added to the last group
    :return: the new address as text, or None if no separator is found or the last
        group cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # first character after the separator
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (TypeError, ValueError):
        # last group is not a number in the expected base, or the offset is not numeric
        pass
    return None
229 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
234 # TODO filter RO descriptor fields...
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict
["deploymentStatus"] = ro_descriptor
240 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
242 except Exception as e
:
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
247 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
249 # remove last dot from path (if exists)
250 if path
.endswith("."):
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
257 nsr_id
= filter.get("_id")
259 # read ns record from database
260 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
261 current_ns_status
= nsr
.get("nsState")
263 # get vca status for NS
264 status_dict
= await self
.n2vc
.get_status(
265 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
270 db_dict
["vcaStatus"] = status_dict
271 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
273 # update configurationStatus for this VCA
275 vca_index
= int(path
[path
.rfind(".") + 1 :])
278 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
280 vca_status
= vca_list
[vca_index
].get("status")
282 configuration_status_list
= nsr
.get("configurationStatus")
283 config_status
= configuration_status_list
[vca_index
].get("status")
285 if config_status
== "BROKEN" and vca_status
!= "failed":
286 db_dict
["configurationStatus"][vca_index
] = "READY"
287 elif config_status
!= "BROKEN" and vca_status
== "failed":
288 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
289 except Exception as e
:
290 # not update configurationStatus
291 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
296 if current_ns_status
in ("READY", "DEGRADED"):
297 error_description
= ""
299 if status_dict
.get("machines"):
300 for machine_id
in status_dict
.get("machines"):
301 machine
= status_dict
.get("machines").get(machine_id
)
302 # check machine agent-status
303 if machine
.get("agent-status"):
304 s
= machine
.get("agent-status").get("status")
307 error_description
+= (
308 "machine {} agent-status={} ; ".format(
312 # check machine instance status
313 if machine
.get("instance-status"):
314 s
= machine
.get("instance-status").get("status")
317 error_description
+= (
318 "machine {} instance-status={} ; ".format(
323 if status_dict
.get("applications"):
324 for app_id
in status_dict
.get("applications"):
325 app
= status_dict
.get("applications").get(app_id
)
326 # check application status
327 if app
.get("status"):
328 s
= app
.get("status").get("status")
331 error_description
+= (
332 "application {} status={} ; ".format(app_id
, s
)
335 if error_description
:
336 db_dict
["errorDescription"] = error_description
337 if current_ns_status
== "READY" and is_degraded
:
338 db_dict
["nsState"] = "DEGRADED"
339 if current_ns_status
== "DEGRADED" and not is_degraded
:
340 db_dict
["nsState"] = "READY"
343 self
.update_db_2("nsrs", nsr_id
, db_dict
)
345 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
347 except Exception as e
:
348 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
350 async def _on_update_k8s_db(
351 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
365 nsr_id
= filter.get("_id")
367 # get vca status for NS
368 vca_status
= await self
.k8sclusterjuju
.status_kdu(
371 complete_status
=True,
377 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
379 await self
.k8sclusterjuju
.update_vca_status(
380 db_dict
["vcaStatus"],
386 self
.update_db_2("nsrs", nsr_id
, db_dict
)
388 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
390 except Exception as e
:
391 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template using the instantiation parameters

    The environment is created with StrictUndefined, so a variable used by the
    template and missing at additional_params is reported as an error.
    :param cloud_init_text: template text
    :param additional_params: dictionary with the variables; a false value is used as {}
    :param vnfd_id: vnfd id, only used to compose the error text
    :param vdu_id: vdu id, only used to compose the error text
    :return: the rendered text
    :raises LcmException: if a variable is not provided or the template cannot be parsed
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
412 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
413 cloud_init_content
= cloud_init_file
= None
415 if vdu
.get("cloud-init-file"):
416 base_folder
= vnfd
["_admin"]["storage"]
417 if base_folder
["pkg-dir"]:
418 cloud_init_file
= "{}/{}/cloud_init/{}".format(
419 base_folder
["folder"],
420 base_folder
["pkg-dir"],
421 vdu
["cloud-init-file"],
424 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
425 base_folder
["folder"],
426 vdu
["cloud-init-file"],
428 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
429 cloud_init_content
= ci_file
.read()
430 elif vdu
.get("cloud-init"):
431 cloud_init_content
= vdu
["cloud-init"]
433 return cloud_init_content
434 except FsException
as e
:
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the "additionalParams" of the first vdur of a vnfr that belongs to a vdu
    :param db_vnfr: vnfr content; its "vdur" list is searched
    :param vdu_id: value compared against the "vdu-id-ref" of each vdur
    :return: the result of parse_yaml_strings over the "additionalParams" of that vdur
    """
    # NOTE(review): no default is given to next(), so a vdu_id without vdur raises
    # StopIteration - confirm that callers rely on the vdur always being present
    vdur = next(
        vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used here
    :param nsrId: Id of the NSR. Not used here
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Translate an ip-profile to the key names and values used by RO

    - "dns-server" is replaced by "dns-address": a list of items with "address"
      becomes the list of those addresses; any other value is moved as it is
    - "ip-version" values "ipv4"/"ipv6" become "IPv4"/"IPv6"
    - "dhcp-params" is renamed to "dhcp"
    :param ip_profile: input ip-profile dictionary. It is not modified
    :return: a new dictionary with the translated content
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
492 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
493 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
494 if db_vim
["_admin"]["operationalState"] != "ENABLED":
496 "VIM={} is not available. operationalState={}".format(
497 vim_account
, db_vim
["_admin"]["operationalState"]
500 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Get the identifier stored at "_admin.deployed.RO-account" of a wim account
    :param wim_account: _id of the record at table "wim_accounts". A value that is
        not a str (e.g. None or a boolean) is returned unchanged without any lookup
    :return: the value of "_admin.deployed.RO-account", or wim_account itself
    :raises LcmException: if "_admin.operationalState" of the record is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return db_wim["_admin"]["deployed"]["RO-account"]
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Add or remove vdur entries of a vnfr at database, and refresh db_vnfr["vdur"]

    Scale out: for every vdu at vdu_create, the last vdur of that vdu is cloned
    vdu_count times. Every clone gets a new "_id", consecutive "count-index",
    status "BUILD" and no ip-address; interface addresses are only kept (and
    incremented) when the interface has "fixed-ip"/"fixed-mac".
    Scale in: for every vdu at vdu_delete, the last vdu_count vdur are either marked
    with status "DELETING" (mark_delete=True) or pulled from the database.
    :param db_vnfr: vnfr content. Its "vdur" is replaced by the content read from the
        database after the changes
    :param vdu_create: dictionary {vdu-id: number of vdur to add}, or None
    :param vdu_delete: dictionary {vdu-id: number of vdur to remove}, or None
    :param mark_delete: if True the vdur are marked instead of removed
    :return: None
    :raises LcmException: if there is no vdur of a vdu to be used as template
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            # the last vdur of this vdu is the template for the new ones
            vdur = next(
                (
                    vdur
                    for vdur in reversed(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ),
                None,
            )
            if not vdur:
                raise LcmException(
                    "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
                        vdu_id
                    )
                )

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(
                    vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                )
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(
                            iface["ip-address"], count + 1
                        )
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(
                            iface["mac-address"], count + 1
                        )
                    else:
                        iface.pop("mac-address", None)
                    # only first vdu can be management of vnf
                    # NOTE(review): key written with underscore; interfaces are read
                    # elsewhere in this file with "mgmt-vnf" - confirm the spelling
                    iface.pop("mgmt_vnf", None)
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [
                    index
                    for index, vdur in enumerate(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ]
                # a plain [-vdu_count:] slice selects every index when vdu_count is 0
                first = max(len(indexes_to_delete) - vdu_count, 0)
                db_update.update(
                    {
                        "vdur.{}.status".format(i): "DELETING"
                        for i in indexes_to_delete[first:]
                    }
                )
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [
                    v
                    for v in reversed(db_vnfr["vdur"])
                    if v["vdu-id-ref"] == vdu_id
                ]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur": {"_id": vdu["_id"]}},
                    )
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info. One key
        "vld.<index>" is added per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by
        "ns_net_osm_id" against the id of every vld
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no net of RO corresponds to this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
625 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
627 for db_vnfr
in db_vnfrs
.values():
628 vnfr_update
= {"status": "ERROR"}
629 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
630 if "status" not in vdur
:
631 vdur
["status"] = "ERROR"
632 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
634 vdur
["status-detailed"] = str(error_text
)
636 "vdur.{}.status-detailed".format(vdu_index
)
638 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
639 except DbException
as e
:
640 self
.logger
.error("Cannot update vnf. {}".format(e
))
642 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
644 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
645 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
646 :param nsr_desc_RO: nsr descriptor from RO
647 :return: Nothing, LcmException is raised on errors
649 for vnf_index
, db_vnfr
in db_vnfrs
.items():
650 for vnf_RO
in nsr_desc_RO
["vnfs"]:
651 if vnf_RO
["member_vnf_index"] != vnf_index
:
654 if vnf_RO
.get("ip_address"):
655 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
658 elif not db_vnfr
.get("ip-address"):
659 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
660 raise LcmExceptionNoMgmtIP(
661 "ns member_vnf_index '{}' has no IP address".format(
666 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
667 vdur_RO_count_index
= 0
668 if vdur
.get("pdu-type"):
670 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
671 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
673 if vdur
["count-index"] != vdur_RO_count_index
:
674 vdur_RO_count_index
+= 1
676 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
677 if vdur_RO
.get("ip_address"):
678 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
680 vdur
["ip-address"] = None
681 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
682 vdur
["name"] = vdur_RO
.get("vim_name")
683 vdur
["status"] = vdur_RO
.get("status")
684 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
685 for ifacer
in get_iterable(vdur
, "interfaces"):
686 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
687 if ifacer
["name"] == interface_RO
.get("internal_name"):
688 ifacer
["ip-address"] = interface_RO
.get(
691 ifacer
["mac-address"] = interface_RO
.get(
697 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
698 "from VIM info".format(
699 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
702 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
706 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
708 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
712 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
713 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
714 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
716 vld
["vim-id"] = net_RO
.get("vim_net_id")
717 vld
["name"] = net_RO
.get("vim_name")
718 vld
["status"] = net_RO
.get("status")
719 vld
["status-detailed"] = net_RO
.get("error_msg")
720 vnfr_update
["vld.{}".format(vld_index
)] = vld
724 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
729 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
734 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
739 def _get_ns_config_info(self
, nsr_id
):
741 Generates a mapping between vnf,vdu elements and the N2VC id
742 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
743 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
744 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
745 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
747 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
748 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
750 ns_config_info
= {"osm-config-mapping": mapping
}
751 for vca
in vca_deployed_list
:
752 if not vca
["member-vnf-index"]:
754 if not vca
["vdu_id"]:
755 mapping
[vca
["member-vnf-index"]] = vca
["application"]
759 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
761 ] = vca
["application"]
762 return ns_config_info
764 async def _instantiate_ng_ro(
781 def get_vim_account(vim_account_id
):
783 if vim_account_id
in db_vims
:
784 return db_vims
[vim_account_id
]
785 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
786 db_vims
[vim_account_id
] = db_vim
789 # modify target_vld info with instantiation parameters
790 def parse_vld_instantiation_params(
791 target_vim
, target_vld
, vld_params
, target_sdn
793 if vld_params
.get("ip-profile"):
794 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
797 if vld_params
.get("provider-network"):
798 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
801 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
802 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
805 if vld_params
.get("wimAccountId"):
806 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
807 target_vld
["vim_info"][target_wim
] = {}
808 for param
in ("vim-network-name", "vim-network-id"):
809 if vld_params
.get(param
):
810 if isinstance(vld_params
[param
], dict):
811 for vim
, vim_net
in vld_params
[param
].items():
812 other_target_vim
= "vim:" + vim
814 target_vld
["vim_info"],
815 (other_target_vim
, param
.replace("-", "_")),
818 else: # isinstance str
819 target_vld
["vim_info"][target_vim
][
820 param
.replace("-", "_")
821 ] = vld_params
[param
]
822 if vld_params
.get("common_id"):
823 target_vld
["common_id"] = vld_params
.get("common_id")
825 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
826 def update_ns_vld_target(target
, ns_params
):
827 for vnf_params
in ns_params
.get("vnf", ()):
828 if vnf_params
.get("vimAccountId"):
832 for vnfr
in db_vnfrs
.values()
833 if vnf_params
["member-vnf-index"]
834 == vnfr
["member-vnf-index-ref"]
838 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
839 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
840 target_vld
= find_in_list(
841 get_iterable(vdur
, "interfaces"),
842 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
845 if vnf_params
.get("vimAccountId") not in a_vld
.get(
848 target
["ns"]["vld"][a_index
].get("vim_info").update(
850 "vim:{}".format(vnf_params
["vimAccountId"]): {
851 "vim_network_name": ""
856 nslcmop_id
= db_nslcmop
["_id"]
858 "name": db_nsr
["name"],
861 "image": deepcopy(db_nsr
["image"]),
862 "flavor": deepcopy(db_nsr
["flavor"]),
863 "action_id": nslcmop_id
,
864 "cloud_init_content": {},
866 for image
in target
["image"]:
867 image
["vim_info"] = {}
868 for flavor
in target
["flavor"]:
869 flavor
["vim_info"] = {}
871 if db_nslcmop
.get("lcmOperationType") != "instantiate":
872 # get parameters of instantiation:
873 db_nslcmop_instantiate
= self
.db
.get_list(
876 "nsInstanceId": db_nslcmop
["nsInstanceId"],
877 "lcmOperationType": "instantiate",
880 ns_params
= db_nslcmop_instantiate
.get("operationParams")
882 ns_params
= db_nslcmop
.get("operationParams")
883 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
884 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
887 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
888 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
892 "mgmt-network": vld
.get("mgmt-network", False),
893 "type": vld
.get("type"),
896 "vim_network_name": vld
.get("vim-network-name"),
897 "vim_account_id": ns_params
["vimAccountId"],
901 # check if this network needs SDN assist
902 if vld
.get("pci-interfaces"):
903 db_vim
= get_vim_account(ns_params
["vimAccountId"])
904 sdnc_id
= db_vim
["config"].get("sdn-controller")
906 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
907 target_sdn
= "sdn:{}".format(sdnc_id
)
908 target_vld
["vim_info"][target_sdn
] = {
910 "target_vim": target_vim
,
912 "type": vld
.get("type"),
915 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
916 for nsd_vnf_profile
in nsd_vnf_profiles
:
917 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
918 if cp
["virtual-link-profile-id"] == vld
["id"]:
920 "member_vnf:{}.{}".format(
921 cp
["constituent-cpd-id"][0][
922 "constituent-base-element-id"
924 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
926 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
928 # check at nsd descriptor, if there is an ip-profile
930 nsd_vlp
= find_in_list(
931 get_virtual_link_profiles(nsd
),
932 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
937 and nsd_vlp
.get("virtual-link-protocol-data")
938 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
940 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
943 ip_profile_dest_data
= {}
944 if "ip-version" in ip_profile_source_data
:
945 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
948 if "cidr" in ip_profile_source_data
:
949 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
952 if "gateway-ip" in ip_profile_source_data
:
953 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
956 if "dhcp-enabled" in ip_profile_source_data
:
957 ip_profile_dest_data
["dhcp-params"] = {
958 "enabled": ip_profile_source_data
["dhcp-enabled"]
960 vld_params
["ip-profile"] = ip_profile_dest_data
962 # update vld_params with instantiation params
963 vld_instantiation_params
= find_in_list(
964 get_iterable(ns_params
, "vld"),
965 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
967 if vld_instantiation_params
:
968 vld_params
.update(vld_instantiation_params
)
969 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
970 target
["ns"]["vld"].append(target_vld
)
971 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
972 update_ns_vld_target(target
, ns_params
)
974 for vnfr
in db_vnfrs
.values():
976 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
978 vnf_params
= find_in_list(
979 get_iterable(ns_params
, "vnf"),
980 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
982 target_vnf
= deepcopy(vnfr
)
983 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
984 for vld
in target_vnf
.get("vld", ()):
985 # check if connected to a ns.vld, to fill target'
986 vnf_cp
= find_in_list(
987 vnfd
.get("int-virtual-link-desc", ()),
988 lambda cpd
: cpd
.get("id") == vld
["id"],
991 ns_cp
= "member_vnf:{}.{}".format(
992 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
994 if cp2target
.get(ns_cp
):
995 vld
["target"] = cp2target
[ns_cp
]
998 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1000 # check if this network needs SDN assist
1002 if vld
.get("pci-interfaces"):
1003 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1004 sdnc_id
= db_vim
["config"].get("sdn-controller")
1006 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1007 target_sdn
= "sdn:{}".format(sdnc_id
)
1008 vld
["vim_info"][target_sdn
] = {
1010 "target_vim": target_vim
,
1012 "type": vld
.get("type"),
1015 # check at vnfd descriptor, if there is an ip-profile
1017 vnfd_vlp
= find_in_list(
1018 get_virtual_link_profiles(vnfd
),
1019 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1023 and vnfd_vlp
.get("virtual-link-protocol-data")
1024 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1026 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1029 ip_profile_dest_data
= {}
1030 if "ip-version" in ip_profile_source_data
:
1031 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1034 if "cidr" in ip_profile_source_data
:
1035 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1038 if "gateway-ip" in ip_profile_source_data
:
1039 ip_profile_dest_data
[
1041 ] = ip_profile_source_data
["gateway-ip"]
1042 if "dhcp-enabled" in ip_profile_source_data
:
1043 ip_profile_dest_data
["dhcp-params"] = {
1044 "enabled": ip_profile_source_data
["dhcp-enabled"]
1047 vld_params
["ip-profile"] = ip_profile_dest_data
1048 # update vld_params with instantiation params
1050 vld_instantiation_params
= find_in_list(
1051 get_iterable(vnf_params
, "internal-vld"),
1052 lambda i_vld
: i_vld
["name"] == vld
["id"],
1054 if vld_instantiation_params
:
1055 vld_params
.update(vld_instantiation_params
)
1056 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1059 for vdur
in target_vnf
.get("vdur", ()):
1060 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1061 continue # This vdu must not be created
1062 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1064 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1067 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1068 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1071 and vdu_configuration
.get("config-access")
1072 and vdu_configuration
.get("config-access").get("ssh-access")
1074 vdur
["ssh-keys"] = ssh_keys_all
1075 vdur
["ssh-access-required"] = vdu_configuration
[
1077 ]["ssh-access"]["required"]
1080 and vnf_configuration
.get("config-access")
1081 and vnf_configuration
.get("config-access").get("ssh-access")
1082 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1084 vdur
["ssh-keys"] = ssh_keys_all
1085 vdur
["ssh-access-required"] = vnf_configuration
[
1087 ]["ssh-access"]["required"]
1088 elif ssh_keys_instantiation
and find_in_list(
1089 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1091 vdur
["ssh-keys"] = ssh_keys_instantiation
1093 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1095 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1097 if vdud
.get("cloud-init-file"):
1098 vdur
["cloud-init"] = "{}:file:{}".format(
1099 vnfd
["_id"], vdud
.get("cloud-init-file")
1101 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1102 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1103 base_folder
= vnfd
["_admin"]["storage"]
1104 if base_folder
["pkg-dir"]:
1105 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1106 base_folder
["folder"],
1107 base_folder
["pkg-dir"],
1108 vdud
.get("cloud-init-file"),
1111 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1112 base_folder
["folder"],
1113 vdud
.get("cloud-init-file"),
1115 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1116 target
["cloud_init_content"][
1119 elif vdud
.get("cloud-init"):
1120 vdur
["cloud-init"] = "{}:vdu:{}".format(
1121 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1123 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1124 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1127 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1128 deploy_params_vdu
= self
._format
_additional
_params
(
1129 vdur
.get("additionalParams") or {}
1131 deploy_params_vdu
["OSM"] = get_osm_params(
1132 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1134 vdur
["additionalParams"] = deploy_params_vdu
1137 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1138 if target_vim
not in ns_flavor
["vim_info"]:
1139 ns_flavor
["vim_info"][target_vim
] = {}
1142 # in case alternative images are provided we must check if they should be applied
1143 # for the vim_type, modify the vim_type taking into account
1144 ns_image_id
= int(vdur
["ns-image-id"])
1145 if vdur
.get("alt-image-ids"):
1146 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1147 vim_type
= db_vim
["vim_type"]
1148 for alt_image_id
in vdur
.get("alt-image-ids"):
1149 ns_alt_image
= target
["image"][int(alt_image_id
)]
1150 if vim_type
== ns_alt_image
.get("vim-type"):
1151 # must use alternative image
1153 "use alternative image id: {}".format(alt_image_id
)
1155 ns_image_id
= alt_image_id
1156 vdur
["ns-image-id"] = ns_image_id
1158 ns_image
= target
["image"][int(ns_image_id
)]
1159 if target_vim
not in ns_image
["vim_info"]:
1160 ns_image
["vim_info"][target_vim
] = {}
1162 vdur
["vim_info"] = {target_vim
: {}}
1163 # instantiation parameters
1165 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1166 # vdud["id"]), None)
1167 vdur_list
.append(vdur
)
1168 target_vnf
["vdur"] = vdur_list
1169 target
["vnf"].append(target_vnf
)
1171 desc
= await self
.RO
.deploy(nsr_id
, target
)
1172 self
.logger
.debug("RO return > {}".format(desc
))
1173 action_id
= desc
["action_id"]
1174 await self
._wait
_ng
_ro
(
1175 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1180 "_admin.deployed.RO.operational-status": "running",
1181 "detailed-status": " ".join(stage
),
1183 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1184 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1185 self
._write
_op
_status
(nslcmop_id
, stage
)
1187 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
):
    """
    Poll NG-RO until the given action is done, has failed, or the timeout expires.

    :param nsr_id: ns record identity, passed to self.RO.status
    :param action_id: RO action identity, passed to self.RO.status
    :param nslcmop_id: operation identity; when provided together with stage, progress is
        written to the database every time the VIM status text changes
    :param start_time: epoch seconds used as the origin of the timeout; defaults to now
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This method
        writes over vim_specific (index 2). May be None
    :return: None
    :raises NgRoException: if RO reports "FAILED" or the timeout expires
    :raises AssertionError: if RO reports a status this method does not know
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        ro_status = desc_status["status"]
        if ro_status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif ro_status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif ro_status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            # finished: leave the loop without triggering the timeout branch below
            break
        else:
            # Explicit raise instead of "assert False": assert statements are removed
            # when python runs with -O, which would silently keep polling
            raise AssertionError(
                "ROclient.check_ns_status returns unknown {}".format(ro_status)
            )
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            # only write to database when the VIM detailed status has changed
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the "loop" argument of asyncio.sleep was removed in python 3.10; this
        # coroutine already runs inside the loop, so it is not needed
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
1228 async def _terminate_ng_ro(
1229 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1234 start_deploy
= time()
1241 "action_id": nslcmop_id
,
1243 desc
= await self
.RO
.deploy(nsr_id
, target
)
1244 action_id
= desc
["action_id"]
1245 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1246 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1249 + "ns terminate action at RO. action_id={}".format(action_id
)
1253 delete_timeout
= 20 * 60 # 20 minutes
1254 await self
._wait
_ng
_ro
(
1255 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1258 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1259 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1261 await self
.RO
.delete(nsr_id
)
1262 except Exception as e
:
1263 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1264 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1265 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1266 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1268 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1270 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1271 failed_detail
.append("delete conflict: {}".format(e
))
1274 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1277 failed_detail
.append("delete error: {}".format(e
))
1280 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1284 stage
[2] = "Error deleting from VIM"
1286 stage
[2] = "Deleted from VIM"
1287 db_nsr_update
["detailed-status"] = " ".join(stage
)
1288 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1289 self
._write
_op
_status
(nslcmop_id
, stage
)
1292 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; only its values() are read here
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # No vnfr uses the ns-level VIM account: take the one of the last vnfr.
                # This must be an assignment; a comparison ("==") here has no effect.
                # The guard avoids an unbound "vnfr" when db_vnfrs is empty.
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for exception types that are not expected here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not read by this method
    :param vnfr_id: _id of the vnfr that is read from the "vnfrs" collection
    :param kdu_name: value matched against the "kdu-name" of each kdur entry
    :return: the "ip-address" of the kdur once its status is READY or ENABLED
    :raises LcmException: if the kdur is not found, has any other non-empty status,
        or does not get a status after 360 polls, 10 seconds apart
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the "loop" argument of asyncio.sleep was removed in python 3.10; this
        # coroutine already runs inside the loop, so it is not needed
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1409 async def wait_vm_up_insert_key_ro(
1410 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1413 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1414 :param logging_text: prefix use for logging
1419 :param pub_key: public ssh key to inject, None to skip
1420 :param user: user to apply the public ssh key
1424 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1428 target_vdu_id
= None
1434 if ro_retries
>= 360: # 1 hour
1436 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1439 await asyncio
.sleep(10, loop
=self
.loop
)
1442 if not target_vdu_id
:
1443 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1445 if not vdu_id
: # for the VNF case
1446 if db_vnfr
.get("status") == "ERROR":
1448 "Cannot inject ssh-key because target VNF is in error state"
1450 ip_address
= db_vnfr
.get("ip-address")
1456 for x
in get_iterable(db_vnfr
, "vdur")
1457 if x
.get("ip-address") == ip_address
1465 for x
in get_iterable(db_vnfr
, "vdur")
1466 if x
.get("vdu-id-ref") == vdu_id
1467 and x
.get("count-index") == vdu_index
1473 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1474 ): # If only one, this should be the target vdu
1475 vdur
= db_vnfr
["vdur"][0]
1478 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1479 vnfr_id
, vdu_id
, vdu_index
1482 # New generation RO stores information at "vim_info"
1485 if vdur
.get("vim_info"):
1487 t
for t
in vdur
["vim_info"]
1488 ) # there should be only one key
1489 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1491 vdur
.get("pdu-type")
1492 or vdur
.get("status") == "ACTIVE"
1493 or ng_ro_status
== "ACTIVE"
1495 ip_address
= vdur
.get("ip-address")
1498 target_vdu_id
= vdur
["vdu-id-ref"]
1499 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1501 "Cannot inject ssh-key because target VM is in error state"
1504 if not target_vdu_id
:
1507 # inject public key into machine
1508 if pub_key
and user
:
1509 self
.logger
.debug(logging_text
+ "Inserting RO key")
1510 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1511 if vdur
.get("pdu-type"):
1512 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1515 ro_vm_id
= "{}-{}".format(
1516 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1517 ) # TODO add vdu_index
1521 "action": "inject_ssh_key",
1525 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1527 desc
= await self
.RO
.deploy(nsr_id
, target
)
1528 action_id
= desc
["action_id"]
1529 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1532 # wait until NS is deployed at RO
1534 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1535 ro_nsr_id
= deep_get(
1536 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1540 result_dict
= await self
.RO
.create_action(
1542 item_id_name
=ro_nsr_id
,
1544 "add_public_key": pub_key
,
1549 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1550 if not result_dict
or not isinstance(result_dict
, dict):
1552 "Unknown response from RO when injecting key"
1554 for result
in result_dict
.values():
1555 if result
.get("vim_result") == 200:
1558 raise ROclient
.ROClientException(
1559 "error injecting key: {}".format(
1560 result
.get("description")
1564 except NgRoException
as e
:
1566 "Reaching max tries injecting key. Error: {}".format(e
)
1568 except ROclient
.ROClientException
as e
:
1572 + "error injecting key: {}. Retrying until {} seconds".format(
1579 "Reaching max tries injecting key. Error: {}".format(e
)
1586 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1588 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1590 my_vca
= vca_deployed_list
[vca_index
]
1591 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1592 # vdu or kdu: no dependencies
1596 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1597 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1598 configuration_status_list
= db_nsr
["configurationStatus"]
1599 for index
, vca_deployed
in enumerate(configuration_status_list
):
1600 if index
== vca_index
:
1603 if not my_vca
.get("member-vnf-index") or (
1604 vca_deployed
.get("member-vnf-index")
1605 == my_vca
.get("member-vnf-index")
1607 internal_status
= configuration_status_list
[index
].get("status")
1608 if internal_status
== "READY":
1610 elif internal_status
== "BROKEN":
1612 "Configuration aborted because dependent charm/s has failed"
1617 # no dependencies, return
1619 await asyncio
.sleep(10)
1622 raise LcmException("Configuration aborted because dependent charm/s timeout")
1624 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1627 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1629 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1630 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1633 async def instantiate_N2VC(
1650 ee_config_descriptor
,
1652 nsr_id
= db_nsr
["_id"]
1653 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1654 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1655 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1656 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1658 "collection": "nsrs",
1659 "filter": {"_id": nsr_id
},
1660 "path": db_update_entry
,
1666 element_under_configuration
= nsr_id
1670 vnfr_id
= db_vnfr
["_id"]
1671 osm_config
["osm"]["vnf_id"] = vnfr_id
1673 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1675 if vca_type
== "native_charm":
1678 index_number
= vdu_index
or 0
1681 element_type
= "VNF"
1682 element_under_configuration
= vnfr_id
1683 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1685 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1686 element_type
= "VDU"
1687 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1688 osm_config
["osm"]["vdu_id"] = vdu_id
1690 namespace
+= ".{}".format(kdu_name
)
1691 element_type
= "KDU"
1692 element_under_configuration
= kdu_name
1693 osm_config
["osm"]["kdu_name"] = kdu_name
1696 if base_folder
["pkg-dir"]:
1697 artifact_path
= "{}/{}/{}/{}".format(
1698 base_folder
["folder"],
1699 base_folder
["pkg-dir"],
1701 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1706 artifact_path
= "{}/Scripts/{}/{}/".format(
1707 base_folder
["folder"],
1709 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1714 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list
= config_descriptor
.get(
1718 "initial-config-primitive"
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1727 # add config if not present for NS charm
1728 ee_descriptor_id
= ee_config_descriptor
.get("id")
1729 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1730 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id
= vca_deployed
.get("ee_id")
1743 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1744 # create or register execution environment in VCA
1745 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1747 self
._write
_configuration
_status
(
1749 vca_index
=vca_index
,
1751 element_under_configuration
=element_under_configuration
,
1752 element_type
=element_type
,
1755 step
= "create execution environment"
1756 self
.logger
.debug(logging_text
+ step
)
1760 if vca_type
== "k8s_proxy_charm":
1761 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1762 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1763 namespace
=namespace
,
1764 artifact_path
=artifact_path
,
1768 elif vca_type
== "helm" or vca_type
== "helm-v3":
1769 ee_id
, credentials
= await self
.vca_map
[
1771 ].create_execution_environment(
1772 namespace
=namespace
,
1776 artifact_path
=artifact_path
,
1780 ee_id
, credentials
= await self
.vca_map
[
1782 ].create_execution_environment(
1783 namespace
=namespace
,
1789 elif vca_type
== "native_charm":
1790 step
= "Waiting to VM being up and getting IP address"
1791 self
.logger
.debug(logging_text
+ step
)
1792 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1801 credentials
= {"hostname": rw_mgmt_ip
}
1803 username
= deep_get(
1804 config_descriptor
, ("config-access", "ssh-access", "default-user")
1806 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1807 # merged. Meanwhile let's get username from initial-config-primitive
1808 if not username
and initial_config_primitive_list
:
1809 for config_primitive
in initial_config_primitive_list
:
1810 for param
in config_primitive
.get("parameter", ()):
1811 if param
["name"] == "ssh-username":
1812 username
= param
["value"]
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1819 credentials
["username"] = username
1820 # n2vc_redesign STEP 3.2
1822 self
._write
_configuration
_status
(
1824 vca_index
=vca_index
,
1825 status
="REGISTERING",
1826 element_under_configuration
=element_under_configuration
,
1827 element_type
=element_type
,
1830 step
= "register execution environment {}".format(credentials
)
1831 self
.logger
.debug(logging_text
+ step
)
1832 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1833 credentials
=credentials
,
1834 namespace
=namespace
,
1839 # for compatibility with MON/POL modules, the need model and application name at database
1840 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1841 ee_id_parts
= ee_id
.split(".")
1842 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1843 if len(ee_id_parts
) >= 2:
1844 model_name
= ee_id_parts
[0]
1845 application_name
= ee_id_parts
[1]
1846 db_nsr_update
[db_update_entry
+ "model"] = model_name
1847 db_nsr_update
[db_update_entry
+ "application"] = application_name
1849 # n2vc_redesign STEP 3.3
1850 step
= "Install configuration Software"
1852 self
._write
_configuration
_status
(
1854 vca_index
=vca_index
,
1855 status
="INSTALLING SW",
1856 element_under_configuration
=element_under_configuration
,
1857 element_type
=element_type
,
1858 other_update
=db_nsr_update
,
1861 # TODO check if already done
1862 self
.logger
.debug(logging_text
+ step
)
1864 if vca_type
== "native_charm":
1865 config_primitive
= next(
1866 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1869 if config_primitive
:
1870 config
= self
._map
_primitive
_params
(
1871 config_primitive
, {}, deploy_params
1874 if vca_type
== "lxc_proxy_charm":
1875 if element_type
== "NS":
1876 num_units
= db_nsr
.get("config-units") or 1
1877 elif element_type
== "VNF":
1878 num_units
= db_vnfr
.get("config-units") or 1
1879 elif element_type
== "VDU":
1880 for v
in db_vnfr
["vdur"]:
1881 if vdu_id
== v
["vdu-id-ref"]:
1882 num_units
= v
.get("config-units") or 1
1884 if vca_type
!= "k8s_proxy_charm":
1885 await self
.vca_map
[vca_type
].install_configuration_sw(
1887 artifact_path
=artifact_path
,
1890 num_units
=num_units
,
1895 # write in db flag of configuration_sw already installed
1897 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self
._add
_vca
_relations
(
1902 logging_text
=logging_text
,
1905 vca_index
=vca_index
,
1908 # if SSH access is required, then get execution environment SSH public
1909 # if native charm we have waited already to VM be UP
1910 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1913 # self.logger.debug("get ssh key block")
1915 config_descriptor
, ("config-access", "ssh-access", "required")
1917 # self.logger.debug("ssh key needed")
1918 # Needed to inject a ssh key
1921 ("config-access", "ssh-access", "default-user"),
1923 step
= "Install configuration Software, getting public ssh key"
1924 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1925 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1928 step
= "Insert public key into VM user={} ssh_key={}".format(
1932 # self.logger.debug("no need to get ssh key")
1933 step
= "Waiting to VM being up and getting IP address"
1934 self
.logger
.debug(logging_text
+ step
)
1936 # n2vc_redesign STEP 5.1
1937 # wait for RO (ip-address) Insert pub_key into VM
1940 rw_mgmt_ip
= await self
.wait_kdu_up(
1941 logging_text
, nsr_id
, vnfr_id
, kdu_name
1944 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1954 rw_mgmt_ip
= None # This is for a NS configuration
1956 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1958 # store rw_mgmt_ip in deploy params for later replacement
1959 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1961 # n2vc_redesign STEP 6 Execute initial config primitive
1962 step
= "execute initial config primitive"
1964 # wait for dependent primitives execution (NS -> VNF -> VDU)
1965 if initial_config_primitive_list
:
1966 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1968 # stage, in function of element type: vdu, kdu, vnf or ns
1969 my_vca
= vca_deployed_list
[vca_index
]
1970 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1972 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1973 elif my_vca
.get("member-vnf-index"):
1975 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1978 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1980 self
._write
_configuration
_status
(
1981 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1984 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1986 check_if_terminated_needed
= True
1987 for initial_config_primitive
in initial_config_primitive_list
:
1988 # adding information on the vca_deployed if it is a NS execution environment
1989 if not vca_deployed
["member-vnf-index"]:
1990 deploy_params
["ns_config_info"] = json
.dumps(
1991 self
._get
_ns
_config
_info
(nsr_id
)
1993 # TODO check if already done
1994 primitive_params_
= self
._map
_primitive
_params
(
1995 initial_config_primitive
, {}, deploy_params
1998 step
= "execute primitive '{}' params '{}'".format(
1999 initial_config_primitive
["name"], primitive_params_
2001 self
.logger
.debug(logging_text
+ step
)
2002 await self
.vca_map
[vca_type
].exec_primitive(
2004 primitive_name
=initial_config_primitive
["name"],
2005 params_dict
=primitive_params_
,
2010 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2011 if check_if_terminated_needed
:
2012 if config_descriptor
.get("terminate-config-primitive"):
2014 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2016 check_if_terminated_needed
= False
2018 # TODO register in database that primitive is done
2020 # STEP 7 Configure metrics
2021 if vca_type
== "helm" or vca_type
== "helm-v3":
2022 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2024 artifact_path
=artifact_path
,
2025 ee_config_descriptor
=ee_config_descriptor
,
2028 target_ip
=rw_mgmt_ip
,
2034 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2037 for job
in prometheus_jobs
:
2041 "job_name": job
["job_name"]
2048 step
= "instantiated at VCA"
2049 self
.logger
.debug(logging_text
+ step
)
2051 self
._write
_configuration
_status
(
2052 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2055 except Exception as e
: # TODO not use Exception but N2VC exception
2056 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2058 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2061 "Exception while {} : {}".format(step
, e
), exc_info
=True
2063 self
._write
_configuration
_status
(
2064 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2066 raise LcmException("{} {}".format(step
, e
)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the record to update at the "nsrs" collection
    :param ns_state: value for "nsState"; it is not written when empty or None
    :param current_operation: value for "currentOperation". "IDLE" is stored as None at
        "_admin.operation-type"
    :param current_operation_id: value for "currentOperationID" and its _admin copies
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        # other_update is extended in place and sent in the same database write
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); use warning
        self.logger.warning(
            "Error writing NS status, ns={}: {}".format(nsr_id, e)
        )
2109 def _write_op_status(
2113 error_message
: str = None,
2114 queuePosition
: int = 0,
2115 operation_state
: str = None,
2116 other_update
: dict = None,
2119 db_dict
= other_update
or {}
2120 db_dict
["queuePosition"] = queuePosition
2121 if isinstance(stage
, list):
2122 db_dict
["stage"] = stage
[0]
2123 db_dict
["detailed-status"] = " ".join(stage
)
2124 elif stage
is not None:
2125 db_dict
["stage"] = str(stage
)
2127 if error_message
is not None:
2128 db_dict
["errorMessage"] = error_message
2129 if operation_state
is not None:
2130 db_dict
["operationState"] = operation_state
2131 db_dict
["statusEnteredTime"] = time()
2132 self
.update_db_2("nslcmops", op_id
, db_dict
)
2133 except DbException
as e
:
2135 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2138 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2140 nsr_id
= db_nsr
["_id"]
2141 # configurationStatus
2142 config_status
= db_nsr
.get("configurationStatus")
2145 "configurationStatus.{}.status".format(index
): status
2146 for index
, v
in enumerate(config_status
)
2150 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2152 except DbException
as e
:
2154 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2157 def _write_configuration_status(
2162 element_under_configuration
: str = None,
2163 element_type
: str = None,
2164 other_update
: dict = None,
2167 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2168 # .format(vca_index, status))
2171 db_path
= "configurationStatus.{}.".format(vca_index
)
2172 db_dict
= other_update
or {}
2174 db_dict
[db_path
+ "status"] = status
2175 if element_under_configuration
:
2177 db_path
+ "elementUnderConfiguration"
2178 ] = element_under_configuration
2180 db_dict
[db_path
+ "elementType"] = element_type
2181 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2182 except DbException
as e
:
2184 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2185 status
, nsr_id
, vca_index
, e
2189 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2191 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2192 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2193 Database is used because the result can be obtained from a different LCM worker in case of HA.
2194 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2195 :param db_nslcmop: database content of nslcmop
2196 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2197 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2198 computed 'vim-account-id'
2201 nslcmop_id
= db_nslcmop
["_id"]
2202 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2203 if placement_engine
== "PLA":
2205 logging_text
+ "Invoke and wait for placement optimization"
2207 await self
.msg
.aiowrite(
2208 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2210 db_poll_interval
= 5
2211 wait
= db_poll_interval
* 10
2213 while not pla_result
and wait
>= 0:
2214 await asyncio
.sleep(db_poll_interval
)
2215 wait
-= db_poll_interval
2216 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2217 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2221 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2224 for pla_vnf
in pla_result
["vnf"]:
2225 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2226 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2231 {"_id": vnfr
["_id"]},
2232 {"vim-account-id": pla_vnf
["vimAccountId"]},
2235 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the nslcmops record, under "_admin.pla".

    :param params: dictionary with a "placement" entry that contains "nslcmopId"; the
        whole "placement" entry is the value that is stored
    :return: None. Any exception is logged as a warning and not propagated
    """
    # defined before the try so the except block can always format its message
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias (removed in python 3.13); use warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2247 async def instantiate(self
, nsr_id
, nslcmop_id
):
2250 :param nsr_id: ns instance to deploy
2251 :param nslcmop_id: operation to run
2255 # Try to lock HA task here
2256 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2257 if not task_is_locked_by_me
:
2259 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2263 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2264 self
.logger
.debug(logging_text
+ "Enter")
2266 # get all needed from database
2268 # database nsrs record
2271 # database nslcmops record
2274 # update operation on nsrs
2276 # update operation on nslcmops
2277 db_nslcmop_update
= {}
2279 nslcmop_operation_state
= None
2280 db_vnfrs
= {} # vnf's info indexed by member-index
2282 tasks_dict_info
= {} # from task to info text
2286 "Stage 1/5: preparation of the environment.",
2287 "Waiting for previous operations to terminate.",
2290 # ^ stage, step, VIM progress
2292 # wait for any previous tasks in process
2293 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2295 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2296 stage
[1] = "Reading from database."
2297 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2298 db_nsr_update
["detailed-status"] = "creating"
2299 db_nsr_update
["operational-status"] = "init"
2300 self
._write
_ns
_status
(
2302 ns_state
="BUILDING",
2303 current_operation
="INSTANTIATING",
2304 current_operation_id
=nslcmop_id
,
2305 other_update
=db_nsr_update
,
2307 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2309 # read from db: operation
2310 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2311 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2312 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2313 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2314 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2316 ns_params
= db_nslcmop
.get("operationParams")
2317 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2318 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2320 timeout_ns_deploy
= self
.timeout
.get(
2321 "ns_deploy", self
.timeout_ns_deploy
2325 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2326 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2327 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2328 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2329 self
.fs
.sync(db_nsr
["nsd-id"])
2331 # nsr_name = db_nsr["name"] # TODO short-name??
2333 # read from db: vnf's of this ns
2334 stage
[1] = "Getting vnfrs from db."
2335 self
.logger
.debug(logging_text
+ stage
[1])
2336 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2338 # read from db: vnfd's for every vnf
2339 db_vnfds
= [] # every vnfd data
2341 # for each vnf in ns, read vnfd
2342 for vnfr
in db_vnfrs_list
:
2343 if vnfr
.get("kdur"):
2345 for kdur
in vnfr
["kdur"]:
2346 if kdur
.get("additionalParams"):
2347 kdur
["additionalParams"] = json
.loads(kdur
["additionalParams"])
2348 kdur_list
.append(kdur
)
2349 vnfr
["kdur"] = kdur_list
2351 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2352 vnfd_id
= vnfr
["vnfd-id"]
2353 vnfd_ref
= vnfr
["vnfd-ref"]
2354 self
.fs
.sync(vnfd_id
)
2356 # if we haven't this vnfd, read it from db
2357 if vnfd_id
not in db_vnfds
:
2359 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2362 self
.logger
.debug(logging_text
+ stage
[1])
2363 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2366 db_vnfds
.append(vnfd
)
2368 # Get or generates the _admin.deployed.VCA list
2369 vca_deployed_list
= None
2370 if db_nsr
["_admin"].get("deployed"):
2371 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2372 if vca_deployed_list
is None:
2373 vca_deployed_list
= []
2374 configuration_status_list
= []
2375 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2376 db_nsr_update
["configurationStatus"] = configuration_status_list
2377 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2378 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2379 elif isinstance(vca_deployed_list
, dict):
2380 # maintain backward compatibility. Change a dict to list at database
2381 vca_deployed_list
= list(vca_deployed_list
.values())
2382 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2383 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2386 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2388 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2389 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2391 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2392 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2393 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2395 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2398 # n2vc_redesign STEP 2 Deploy Network Scenario
2399 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2400 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2402 stage
[1] = "Deploying KDUs."
2403 # self.logger.debug(logging_text + "Before deploy_kdus")
2404 # Call to deploy_kdus in case exists the "vdu:kdu" param
2405 await self
.deploy_kdus(
2406 logging_text
=logging_text
,
2408 nslcmop_id
=nslcmop_id
,
2411 task_instantiation_info
=tasks_dict_info
,
2414 stage
[1] = "Getting VCA public key."
2415 # n2vc_redesign STEP 1 Get VCA public ssh-key
2416 # feature 1429. Add n2vc public key to needed VMs
2417 n2vc_key
= self
.n2vc
.get_public_key()
2418 n2vc_key_list
= [n2vc_key
]
2419 if self
.vca_config
.get("public_key"):
2420 n2vc_key_list
.append(self
.vca_config
["public_key"])
2422 stage
[1] = "Deploying NS at VIM."
2423 task_ro
= asyncio
.ensure_future(
2424 self
.instantiate_RO(
2425 logging_text
=logging_text
,
2429 db_nslcmop
=db_nslcmop
,
2432 n2vc_key_list
=n2vc_key_list
,
2436 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2437 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2439 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2440 stage
[1] = "Deploying Execution Environments."
2441 self
.logger
.debug(logging_text
+ stage
[1])
2443 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2444 for vnf_profile
in get_vnf_profiles(nsd
):
2445 vnfd_id
= vnf_profile
["vnfd-id"]
2446 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2447 member_vnf_index
= str(vnf_profile
["id"])
2448 db_vnfr
= db_vnfrs
[member_vnf_index
]
2449 base_folder
= vnfd
["_admin"]["storage"]
2455 # Get additional parameters
2456 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2457 if db_vnfr
.get("additionalParamsForVnf"):
2458 deploy_params
.update(
2459 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2462 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2463 if descriptor_config
:
2465 logging_text
=logging_text
2466 + "member_vnf_index={} ".format(member_vnf_index
),
2469 nslcmop_id
=nslcmop_id
,
2475 member_vnf_index
=member_vnf_index
,
2476 vdu_index
=vdu_index
,
2478 deploy_params
=deploy_params
,
2479 descriptor_config
=descriptor_config
,
2480 base_folder
=base_folder
,
2481 task_instantiation_info
=tasks_dict_info
,
2485 # Deploy charms for each VDU that supports one.
2486 for vdud
in get_vdu_list(vnfd
):
2488 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2489 vdur
= find_in_list(
2490 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2493 if vdur
.get("additionalParams"):
2494 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2496 deploy_params_vdu
= deploy_params
2497 deploy_params_vdu
["OSM"] = get_osm_params(
2498 db_vnfr
, vdu_id
, vdu_count_index
=0
2500 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2502 self
.logger
.debug("VDUD > {}".format(vdud
))
2504 "Descriptor config > {}".format(descriptor_config
)
2506 if descriptor_config
:
2509 for vdu_index
in range(vdud_count
):
2510 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2512 logging_text
=logging_text
2513 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2514 member_vnf_index
, vdu_id
, vdu_index
2518 nslcmop_id
=nslcmop_id
,
2524 member_vnf_index
=member_vnf_index
,
2525 vdu_index
=vdu_index
,
2527 deploy_params
=deploy_params_vdu
,
2528 descriptor_config
=descriptor_config
,
2529 base_folder
=base_folder
,
2530 task_instantiation_info
=tasks_dict_info
,
2533 for kdud
in get_kdu_list(vnfd
):
2534 kdu_name
= kdud
["name"]
2535 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2536 if descriptor_config
:
2541 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2543 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2544 if kdur
.get("additionalParams"):
2545 deploy_params_kdu
= parse_yaml_strings(
2546 kdur
["additionalParams"]
2550 logging_text
=logging_text
,
2553 nslcmop_id
=nslcmop_id
,
2559 member_vnf_index
=member_vnf_index
,
2560 vdu_index
=vdu_index
,
2562 deploy_params
=deploy_params_kdu
,
2563 descriptor_config
=descriptor_config
,
2564 base_folder
=base_folder
,
2565 task_instantiation_info
=tasks_dict_info
,
2569 # Check if this NS has a charm configuration
2570 descriptor_config
= nsd
.get("ns-configuration")
2571 if descriptor_config
and descriptor_config
.get("juju"):
2574 member_vnf_index
= None
2580 # Get additional parameters
2581 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2582 if db_nsr
.get("additionalParamsForNs"):
2583 deploy_params
.update(
2584 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2586 base_folder
= nsd
["_admin"]["storage"]
2588 logging_text
=logging_text
,
2591 nslcmop_id
=nslcmop_id
,
2597 member_vnf_index
=member_vnf_index
,
2598 vdu_index
=vdu_index
,
2600 deploy_params
=deploy_params
,
2601 descriptor_config
=descriptor_config
,
2602 base_folder
=base_folder
,
2603 task_instantiation_info
=tasks_dict_info
,
2607 # rest of staff will be done at finally
2610 ROclient
.ROClientException
,
2616 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2619 except asyncio
.CancelledError
:
2621 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2623 exc
= "Operation was cancelled"
2624 except Exception as e
:
2625 exc
= traceback
.format_exc()
2626 self
.logger
.critical(
2627 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2632 error_list
.append(str(exc
))
2634 # wait for pending tasks
2636 stage
[1] = "Waiting for instantiate pending tasks."
2637 self
.logger
.debug(logging_text
+ stage
[1])
2638 error_list
+= await self
._wait
_for
_tasks
(
2646 stage
[1] = stage
[2] = ""
2647 except asyncio
.CancelledError
:
2648 error_list
.append("Cancelled")
2649 # TODO cancel all tasks
2650 except Exception as exc
:
2651 error_list
.append(str(exc
))
2653 # update operation-status
2654 db_nsr_update
["operational-status"] = "running"
2655 # let's begin with VCA 'configured' status (later we can change it)
2656 db_nsr_update
["config-status"] = "configured"
2657 for task
, task_name
in tasks_dict_info
.items():
2658 if not task
.done() or task
.cancelled() or task
.exception():
2659 if task_name
.startswith(self
.task_name_deploy_vca
):
2660 # A N2VC task is pending
2661 db_nsr_update
["config-status"] = "failed"
2663 # RO or KDU task is pending
2664 db_nsr_update
["operational-status"] = "failed"
2666 # update status at database
2668 error_detail
= ". ".join(error_list
)
2669 self
.logger
.error(logging_text
+ error_detail
)
2670 error_description_nslcmop
= "{} Detail: {}".format(
2671 stage
[0], error_detail
2673 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2674 nslcmop_id
, stage
[0]
2677 db_nsr_update
["detailed-status"] = (
2678 error_description_nsr
+ " Detail: " + error_detail
2680 db_nslcmop_update
["detailed-status"] = error_detail
2681 nslcmop_operation_state
= "FAILED"
2685 error_description_nsr
= error_description_nslcmop
= None
2687 db_nsr_update
["detailed-status"] = "Done"
2688 db_nslcmop_update
["detailed-status"] = "Done"
2689 nslcmop_operation_state
= "COMPLETED"
2692 self
._write
_ns
_status
(
2695 current_operation
="IDLE",
2696 current_operation_id
=None,
2697 error_description
=error_description_nsr
,
2698 error_detail
=error_detail
,
2699 other_update
=db_nsr_update
,
2701 self
._write
_op
_status
(
2704 error_message
=error_description_nslcmop
,
2705 operation_state
=nslcmop_operation_state
,
2706 other_update
=db_nslcmop_update
,
2709 if nslcmop_operation_state
:
2711 await self
.msg
.aiowrite(
2716 "nslcmop_id": nslcmop_id
,
2717 "operationState": nslcmop_operation_state
,
2721 except Exception as e
:
2723 logging_text
+ "kafka_write notification Exception {}".format(e
)
2726 self
.logger
.debug(logging_text
+ "Exit")
2727 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2729 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2730 if vnfd_id
not in cached_vnfds
:
2731 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2732 return cached_vnfds
[vnfd_id
]
2734 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2735 if vnf_profile_id
not in cached_vnfrs
:
2736 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2739 "member-vnf-index-ref": vnf_profile_id
,
2740 "nsr-id-ref": nsr_id
,
2743 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether ``vca`` is one of the two endpoints of ``relation``.

    Endpoints whose "kdu-resource-profile-id" is set are skipped. For the others the
    VCA matches when its vnf profile id, vdu profile id and execution environment
    reference are all equal to the endpoint's.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected, in that order
    :return: True if the VCA matches the provider or the requirer, False otherwise
    """
    return any(
        vca.vnf_profile_id == endpoint.vnf_profile_id
        and vca.vdu_profile_id == endpoint.vdu_profile_id
        and vca.execution_environment_ref == endpoint.execution_environment_ref
        for endpoint in (relation.provider, relation.requirer)
        if not endpoint["kdu-resource-profile-id"]
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Normalise one relation endpoint and fill in its execution environment when omitted.

    The endpoint is first passed through safe_get_ee_relation. For VNF and VDU level
    endpoints without an "execution-environment-ref", the reference is taken from
    get_juju_ee_ref() on the VNFD of the endpoint's vnf profile.

    :param nsr_id: NS record id, forwarded to safe_get_ee_relation
    :param nsd: NS descriptor where the vnf profile is looked up
    :param ee_relation_data: endpoint data as written in the descriptor
    :param cached_vnfds: VNFD cache used by self._get_vnfd
    :param vnf_profile_id: optional vnf profile id, forwarded to safe_get_ee_relation
    :return: the normalised endpoint data
    :raises Exception: if no execution environment is found for the endpoint
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)

    # Only VNF/VDU endpoints may rely on an implicit execution environment
    if level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    if ee_relation_data["execution-environment-ref"]:
        return ee_relation_data

    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]

    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the NS-level relations in which ``vca`` takes part.

    Each relation returned by get_ns_configuration_relation_list(nsd) is given either
    with explicit "provider"/"requirer" entries or with a two-element "entities" list
    (first entity is used as provider, second as requirer).

    :param nsr_id: NS record id, stored in every endpoint built from "entities"
    :param nsd: NS descriptor holding the relation list
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: VNFD cache forwarded to the endpoint normalisation
    :return: list of Relation objects involving ``vca``
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    matching_relations = []
    for r in get_ns_configuration_relation_list(nsd):
        if "provider" in r and "requirer" in r:
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            endpoint_dicts = []
            for position in (0, 1):
                entity_id = r["entities"][position]["id"]
                endpoint_dict = {
                    "nsr-id": nsr_id,
                    "endpoint": r["entities"][position]["endpoint"],
                }
                # An entity whose id is the NSD id has no vnf profile
                if entity_id != nsd["id"]:
                    endpoint_dict["vnf-profile-id"] = entity_id
                endpoint_dicts.append(endpoint_dict)
            provider_dict, requirer_dict = endpoint_dicts
        else:
            raise Exception("provider/requirer or entities must be included in the relation.")

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds
        )
        relation = Relation(
            r["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            matching_relations.append(relation)
    return matching_relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Collect the VNF-level relations in which ``vca`` takes part.

    The relations are those returned by get_relation_list() for the VNFD of the vnf
    profile of ``vca``. Each one is given either with explicit "provider"/"requirer"
    entries or with a two-element "entities" list (first entity is used as provider,
    second as requirer).

    :param nsr_id: NS record id, stored in every endpoint built from "entities"
    :param nsd: NS descriptor where the vnf profile of ``vca`` is looked up
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: VNFD cache used to read the VNFD and forwarded to the
        endpoint normalisation
    :return: list of Relation objects involving ``vca``
    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    matching_relations = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)

    for r in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in r and "requirer" in r:
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            endpoint_dicts = []
            for position in (0, 1):
                entity_id = r["entities"][position]["id"]
                endpoint_dict = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": r["entities"][position]["endpoint"],
                }
                # An entity whose id is the VNFD id has no vdu profile
                if entity_id != vnfd_id:
                    endpoint_dict["vdu-profile-id"] = entity_id
                endpoint_dicts.append(endpoint_dict)
            provider_dict, requirer_dict = endpoint_dicts
        else:
            raise Exception("provider/requirer or entities must be included in the relation.")

        provider_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        requirer_data = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation = Relation(
            r["name"], EERelation(provider_data), EERelation(requirer_data)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            matching_relations.append(relation)
    return matching_relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Return the deployed KDU data for a KDU-level relation endpoint.

    The KDU is located through the kdu resource profile referenced by the endpoint and
    the result is extended, in place, with the profile's "resource-name".

    :param ee_relation: relation endpoint; its vnf_profile_id and kdu_resource_profile_id are used
    :param db_nsr: NS record; the deployed KDUs are read from _admin.deployed
    :param cached_vnfds: VNFD cache used by self._get_vnfd
    :return: the deployed KDU entry with "resource-name" added, or None when
        get_deployed_kdu reports no entry for this KDU
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # Dict defaults (not tuples): the chained .get must also work when "_admin" or
    # "deployed" is absent from the record.
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    if deployed_kdu is None:
        # Nothing deployed for this KDU; the caller tests the result for truthiness
        return None
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Return the deployed component (VCA or K8s resource) behind a relation endpoint.

    :param ee_relation: relation endpoint; its level selects how the component is looked up
    :param db_nsr: NS record where the deployed components are searched
    :param cached_vnfds: VNFD cache, used for KDU-level endpoints
    :return: a DeployedVCA for NS/VNF/VDU endpoints, a DeployedK8sResource for KDU
        endpoints, or None when nothing deployed matches the endpoint
    """
    nsr_id = db_nsr["_id"]
    ee_level = EELevel.get_level(ee_relation)

    if ee_level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif ee_level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if not kdu_resource_data:
            return None
        return DeployedK8sResource(kdu_resource_data)
    else:
        return None

    # NS, VNF and VDU endpoints all resolve to a deployed VCA
    vca = get_deployed_vca(db_nsr, vca_filter)
    if not vca:
        return None
    return DeployedVCA(nsr_id, vca)
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add ``relation`` if both of its endpoints are deployed and have their config sw installed.

    :param relation: relation to add; provider and requirer are resolved independently
    :param vca_type: key of self.vca_map selecting the connector that adds the relation
    :param db_nsr: NS record used to resolve the deployed components and the VCA ids
    :param cached_vnfds: VNFD cache forwarded to self._get_deployed_component
    :param cached_vnfrs: VNF record cache used by self._get_vnfr
    :return: True if the relation was added, False if any endpoint is not ready yet
    """
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    )
    if not both_ready:
        return False

    # The VNF record is only looked up for endpoints that belong to a VNF
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    else:
        provider_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )
    else:
        requirer_db_vnfr = None

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_relation_endpoint = RelationEndpoint(
        deployed_provider.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        deployed_requirer.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_relation_endpoint,
        requirer=requirer_relation_endpoint,
    )
    # The caller drops the relation from its pending list when True is returned
    return True
3011 async def _add_vca_relations(
3017 timeout
: int = 3600,
3021 # 1. find all relations for this VCA
3022 # 2. wait for other peers related
3026 # STEP 1: find all relations for this VCA
3029 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3030 nsd
= get_nsd(db_nsr
)
3033 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3034 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3039 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3040 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3042 # if no relations, terminate
3044 self
.logger
.debug(logging_text
+ " No relations")
3047 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3054 if now
- start
>= timeout
:
3055 self
.logger
.error(logging_text
+ " : timeout adding relations")
3058 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3059 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3061 # for each relation, find the VCA's related
3062 for relation
in relations
.copy():
3063 added
= await self
._add
_relation
(
3071 relations
.remove(relation
)
3074 self
.logger
.debug("Relations added")
3076 await asyncio
.sleep(5.0)
3080 except Exception as e
:
3081 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3084 async def _install_kdu(
3092 k8s_instance_info
: dict,
3093 k8params
: dict = None,
3099 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3102 "collection": "nsrs",
3103 "filter": {"_id": nsr_id
},
3104 "path": nsr_db_path
,
3107 if k8s_instance_info
.get("kdu-deployment-name"):
3108 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3110 kdu_instance
= self
.k8scluster_map
[
3112 ].generate_kdu_instance_name(
3113 db_dict
=db_dict_install
,
3114 kdu_model
=k8s_instance_info
["kdu-model"],
3115 kdu_name
=k8s_instance_info
["kdu-name"],
3118 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3120 await self
.k8scluster_map
[k8sclustertype
].install(
3121 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3122 kdu_model
=k8s_instance_info
["kdu-model"],
3125 db_dict
=db_dict_install
,
3127 kdu_name
=k8s_instance_info
["kdu-name"],
3128 namespace
=k8s_instance_info
["namespace"],
3129 kdu_instance
=kdu_instance
,
3133 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3136 # Obtain services to obtain management service ip
3137 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3138 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3139 kdu_instance
=kdu_instance
,
3140 namespace
=k8s_instance_info
["namespace"],
3143 # Obtain management service info (if exists)
3144 vnfr_update_dict
= {}
3145 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3147 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3152 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3155 for service
in kdud
.get("service", [])
3156 if service
.get("mgmt-service")
3158 for mgmt_service
in mgmt_services
:
3159 for service
in services
:
3160 if service
["name"].startswith(mgmt_service
["name"]):
3161 # Mgmt service found, Obtain service ip
3162 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3163 if isinstance(ip
, list) and len(ip
) == 1:
3167 "kdur.{}.ip-address".format(kdu_index
)
3170 # Check if must update also mgmt ip at the vnf
3171 service_external_cp
= mgmt_service
.get(
3172 "external-connection-point-ref"
3174 if service_external_cp
:
3176 deep_get(vnfd
, ("mgmt-interface", "cp"))
3177 == service_external_cp
3179 vnfr_update_dict
["ip-address"] = ip
3184 "external-connection-point-ref", ""
3186 == service_external_cp
,
3189 "kdur.{}.ip-address".format(kdu_index
)
3194 "Mgmt service name: {} not found".format(
3195 mgmt_service
["name"]
3199 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3200 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3202 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3205 and kdu_config
.get("initial-config-primitive")
3206 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3208 initial_config_primitive_list
= kdu_config
.get(
3209 "initial-config-primitive"
3211 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3213 for initial_config_primitive
in initial_config_primitive_list
:
3214 primitive_params_
= self
._map
_primitive
_params
(
3215 initial_config_primitive
, {}, {}
3218 await asyncio
.wait_for(
3219 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3220 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3221 kdu_instance
=kdu_instance
,
3222 primitive_name
=initial_config_primitive
["name"],
3223 params
=primitive_params_
,
3224 db_dict
=db_dict_install
,
3230 except Exception as e
:
3231 # Prepare update db with error and raise exception
3234 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3238 vnfr_data
.get("_id"),
3239 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3242 # ignore to keep original exception
3244 # reraise original error
3249 async def deploy_kdus(
3256 task_instantiation_info
,
3258 # Launch kdus if present in the descriptor
3260 k8scluster_id_2_uuic
= {
3261 "helm-chart-v3": {},
3266 async def _get_cluster_id(cluster_id
, cluster_type
):
3267 nonlocal k8scluster_id_2_uuic
3268 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3269 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3271 # check if K8scluster is creating and wait look if previous tasks in process
3272 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3273 "k8scluster", cluster_id
3276 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3277 task_name
, cluster_id
3279 self
.logger
.debug(logging_text
+ text
)
3280 await asyncio
.wait(task_dependency
, timeout
=3600)
3282 db_k8scluster
= self
.db
.get_one(
3283 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3285 if not db_k8scluster
:
3286 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3288 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3290 if cluster_type
== "helm-chart-v3":
3292 # backward compatibility for existing clusters that have not been initialized for helm v3
3293 k8s_credentials
= yaml
.safe_dump(
3294 db_k8scluster
.get("credentials")
3296 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3297 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3299 db_k8scluster_update
= {}
3300 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3301 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3302 db_k8scluster_update
[
3303 "_admin.helm-chart-v3.created"
3305 db_k8scluster_update
[
3306 "_admin.helm-chart-v3.operationalState"
3309 "k8sclusters", cluster_id
, db_k8scluster_update
3311 except Exception as e
:
3314 + "error initializing helm-v3 cluster: {}".format(str(e
))
3317 "K8s cluster '{}' has not been initialized for '{}'".format(
3318 cluster_id
, cluster_type
3323 "K8s cluster '{}' has not been initialized for '{}'".format(
3324 cluster_id
, cluster_type
3327 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3330 logging_text
+= "Deploy kdus: "
3333 db_nsr_update
= {"_admin.deployed.K8s": []}
3334 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3337 updated_cluster_list
= []
3338 updated_v3_cluster_list
= []
3340 for vnfr_data
in db_vnfrs
.values():
3341 vca_id
= self
.get_vca_id(vnfr_data
, {})
3342 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3343 # Step 0: Prepare and set parameters
3344 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3345 vnfd_id
= vnfr_data
.get("vnfd-id")
3346 vnfd_with_id
= find_in_list(
3347 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3351 for kdud
in vnfd_with_id
["kdu"]
3352 if kdud
["name"] == kdur
["kdu-name"]
3354 namespace
= kdur
.get("k8s-namespace")
3355 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3356 if kdur
.get("helm-chart"):
3357 kdumodel
= kdur
["helm-chart"]
3358 # Default version: helm3, if helm-version is v2 assign v2
3359 k8sclustertype
= "helm-chart-v3"
3360 self
.logger
.debug("kdur: {}".format(kdur
))
3362 kdur
.get("helm-version")
3363 and kdur
.get("helm-version") == "v2"
3365 k8sclustertype
= "helm-chart"
3366 elif kdur
.get("juju-bundle"):
3367 kdumodel
= kdur
["juju-bundle"]
3368 k8sclustertype
= "juju-bundle"
3371 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3372 "juju-bundle. Maybe an old NBI version is running".format(
3373 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3376 # check if kdumodel is a file and exists
3378 vnfd_with_id
= find_in_list(
3379 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3381 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3382 if storage
: # may be not present if vnfd has not artifacts
3383 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3384 if storage
["pkg-dir"]:
3385 filename
= "{}/{}/{}s/{}".format(
3392 filename
= "{}/Scripts/{}s/{}".format(
3397 if self
.fs
.file_exists(
3398 filename
, mode
="file"
3399 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3400 kdumodel
= self
.fs
.path
+ filename
3401 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3403 except Exception: # it is not a file
3406 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3407 step
= "Synchronize repos for k8s cluster '{}'".format(
3410 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3414 k8sclustertype
== "helm-chart"
3415 and cluster_uuid
not in updated_cluster_list
3417 k8sclustertype
== "helm-chart-v3"
3418 and cluster_uuid
not in updated_v3_cluster_list
3420 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3421 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3422 cluster_uuid
=cluster_uuid
3425 if del_repo_list
or added_repo_dict
:
3426 if k8sclustertype
== "helm-chart":
3428 "_admin.helm_charts_added." + item
: None
3429 for item
in del_repo_list
3432 "_admin.helm_charts_added." + item
: name
3433 for item
, name
in added_repo_dict
.items()
3435 updated_cluster_list
.append(cluster_uuid
)
3436 elif k8sclustertype
== "helm-chart-v3":
3438 "_admin.helm_charts_v3_added." + item
: None
3439 for item
in del_repo_list
3442 "_admin.helm_charts_v3_added." + item
: name
3443 for item
, name
in added_repo_dict
.items()
3445 updated_v3_cluster_list
.append(cluster_uuid
)
3447 logging_text
+ "repos synchronized on k8s cluster "
3448 "'{}' to_delete: {}, to_add: {}".format(
3449 k8s_cluster_id
, del_repo_list
, added_repo_dict
3454 {"_id": k8s_cluster_id
},
3460 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3461 vnfr_data
["member-vnf-index-ref"],
3465 k8s_instance_info
= {
3466 "kdu-instance": None,
3467 "k8scluster-uuid": cluster_uuid
,
3468 "k8scluster-type": k8sclustertype
,
3469 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3470 "kdu-name": kdur
["kdu-name"],
3471 "kdu-model": kdumodel
,
3472 "namespace": namespace
,
3473 "kdu-deployment-name": kdu_deployment_name
,
3475 db_path
= "_admin.deployed.K8s.{}".format(index
)
3476 db_nsr_update
[db_path
] = k8s_instance_info
3477 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3478 vnfd_with_id
= find_in_list(
3479 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3481 task
= asyncio
.ensure_future(
3490 k8params
=desc_params
,
3495 self
.lcm_tasks
.register(
3499 "instantiate_KDU-{}".format(index
),
3502 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3508 except (LcmException
, asyncio
.CancelledError
):
3510 except Exception as e
:
3511 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3512 if isinstance(e
, (N2VCException
, DbException
)):
3513 self
.logger
.error(logging_text
+ msg
)
3515 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3516 raise LcmException(msg
)
3519 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3538 task_instantiation_info
,
3541 # launch instantiate_N2VC in a asyncio task and register task object
3542 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3543 # if not found, create one entry and update database
3544 # fill db_nsr._admin.deployed.VCA.<index>
3547 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3549 if "execution-environment-list" in descriptor_config
:
3550 ee_list
= descriptor_config
.get("execution-environment-list", [])
3551 elif "juju" in descriptor_config
:
3552 ee_list
= [descriptor_config
] # ns charms
3553 else: # other types as script are not supported
3556 for ee_item
in ee_list
:
3559 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3560 ee_item
.get("juju"), ee_item
.get("helm-chart")
3563 ee_descriptor_id
= ee_item
.get("id")
3564 if ee_item
.get("juju"):
3565 vca_name
= ee_item
["juju"].get("charm")
3568 if ee_item
["juju"].get("charm") is not None
3571 if ee_item
["juju"].get("cloud") == "k8s":
3572 vca_type
= "k8s_proxy_charm"
3573 elif ee_item
["juju"].get("proxy") is False:
3574 vca_type
= "native_charm"
3575 elif ee_item
.get("helm-chart"):
3576 vca_name
= ee_item
["helm-chart"]
3577 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3580 vca_type
= "helm-v3"
3583 logging_text
+ "skipping non juju neither charm configuration"
3588 for vca_index
, vca_deployed
in enumerate(
3589 db_nsr
["_admin"]["deployed"]["VCA"]
3591 if not vca_deployed
:
3594 vca_deployed
.get("member-vnf-index") == member_vnf_index
3595 and vca_deployed
.get("vdu_id") == vdu_id
3596 and vca_deployed
.get("kdu_name") == kdu_name
3597 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3598 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3602 # not found, create one.
3604 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3607 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3609 target
+= "/kdu/{}".format(kdu_name
)
3611 "target_element": target
,
3612 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3613 "member-vnf-index": member_vnf_index
,
3615 "kdu_name": kdu_name
,
3616 "vdu_count_index": vdu_index
,
3617 "operational-status": "init", # TODO revise
3618 "detailed-status": "", # TODO revise
3619 "step": "initial-deploy", # TODO revise
3621 "vdu_name": vdu_name
,
3623 "ee_descriptor_id": ee_descriptor_id
,
3627 # create VCA and configurationStatus in db
3629 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3630 "configurationStatus.{}".format(vca_index
): dict(),
3632 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3634 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3636 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3637 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3638 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3641 task_n2vc
= asyncio
.ensure_future(
3642 self
.instantiate_N2VC(
3643 logging_text
=logging_text
,
3644 vca_index
=vca_index
,
3650 vdu_index
=vdu_index
,
3651 deploy_params
=deploy_params
,
3652 config_descriptor
=descriptor_config
,
3653 base_folder
=base_folder
,
3654 nslcmop_id
=nslcmop_id
,
3658 ee_config_descriptor
=ee_item
,
3661 self
.lcm_tasks
.register(
3665 "instantiate_N2VC-{}".format(vca_index
),
3668 task_instantiation_info
[
3670 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3671 member_vnf_index
or "", vdu_id
or ""
def _create_nslcmop(nsr_id, operation, params):
    """
    Creates a ns-lcm-opp content to be stored at database.

    NOTE(review): this definition takes no ``self``; presumably it is decorated
    with ``@staticmethod`` on a line outside this block - confirm.
    NOTE(review): the "id", "_id", "startTime" and "links" entries and the
    ``now``/``_id`` assignments were on lines missing from the extracted
    source and have been restored - confirm against upstream.

    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: if any of the three arguments is empty
    """
    # Local import so the block does not depend on a file-level import that is
    # not visible from here.
    from uuid import uuid4

    # Raise exception if invalid arguments
    if not (nsr_id and operation and params):
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
        )
    now = time()
    _id = str(uuid4())
    nslcmop = {
        "id": _id,
        "_id": _id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": now,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
        },
    }
    return nslcmop
def _format_additional_params(self, params):
    """
    Decode the yaml-encoded values of an additional-params dictionary.

    Every string value that starts with the marker "!!yaml " is replaced, in
    place, by the result of parsing the rest of the string with
    yaml.safe_load. Other values are left untouched.

    :param params: dictionary of additional params; None or empty is treated as {}
    :return: the same dictionary (or a new empty one) with the yaml values decoded
    """
    params = params or {}
    for key, value in params.items():
        # Only real strings can carry the marker; checking the type first avoids
        # slicing an arbitrary object whose str() happens to start with it.
        if isinstance(value, str) and value.startswith("!!yaml "):
            # replacing the value of an existing key is safe while iterating
            params[key] = yaml.safe_load(value[7:])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters for one terminate primitive.

    The user-params dictionary is assembled from the primitive name and the
    vnf index, with empty primitive params, and handed to
    self._map_primitive_params together with an empty set of instantiation
    params.

    :param seq: descriptor entry of the terminate primitive; its "name" is read
    :param vnf_index: member vnf index the primitive belongs to
    :return: whatever self._map_primitive_params returns for this primitive
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get("name"),
        "primitive_params": {},
    }
    # No instantiation params are available at termination time
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record; the list at _admin.operations is read
    :param op_index: index of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is already
        "COMPLETED", otherwise op_index (after marking it as "PROCESSING")
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Look for the first sub-operation whose fields equal every entry of match.

    :param db_nslcmop: nslcmop record; the list at _admin.operations is searched
    :param match: dictionary of key/value pairs that the sub-operation must have
    :return: index of the first matching sub-operation, or
        self.SUBOPERATION_STATUS_NOT_FOUND when there is no match or when
        db_nslcmop or match is empty
    """
    if db_nslcmop and match:
        op_list = db_nslcmop.get("_admin", {}).get("operations", [])
        for i, op in enumerate(op_list):
            if all(op.get(k) == match[k] for k in match):
                return i
    return self.SUBOPERATION_STATUS_NOT_FOUND
# Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write the state and detailed status of one sub-operation to the database.

    NOTE(review): the call target ``self.db.set_one`` was on a line missing
    from the extracted source and has been restored from the keyword arguments
    that remain visible - confirm against upstream.

    :param db_nslcmop: nslcmop record; only its "_id" is read
    :param op_index: index of the sub-operation inside _admin.operations
    :param operationState: new value for operationState
    :param detailed_status: new value for detailed-status
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {"_id": db_nslcmop["_id"]}
    prefix = "_admin.operations.{}".format(op_index)
    update_dict = {
        prefix + ".operationState": operationState,
        prefix + ".detailed-status": detailed_status,
    }
    self.db.set_one(
        "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
    )
# Add sub-operation, return the index of the added sub-operation
# Optionally, set operationState, detailed-status, and operationType
# Status and type are currently set for 'scale' sub-operations:
# 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
# 'detailed-status' : status message
# 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
# Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Append a sub-operation to _admin.operations and store the list in the database.

    NOTE(review): parts of the signature and the ``if`` guards were on lines
    missing from the extracted source; they were restored from the visible
    call sites and assignments - confirm against upstream.

    :param db_nslcmop: nslcmop record; its "_admin" dict (when present) is updated in place
    :param vnf_index: member vnf index of the sub-operation
    :param vdu_id: vdu id of the sub-operation
    :param vdu_count_index: vdu count index of the sub-operation
    :param vdu_name: accepted for call compatibility; not stored
    :param primitive: primitive name
    :param mapped_primitive_params: primitive params already mapped
    :param operationState: optional state, stored only when truthy
    :param detailed_status: optional status message, stored only when truthy
    :param operationType: optional type, stored as "lcmOperationType" only when truthy
    :param RO_nsr_id: optional RO ns id, stored only when truthy
    :param RO_scaling_info: optional RO scaling info, stored only when truthy
    :return: index of the added sub-operation, or
        self.SUBOPERATION_STATUS_NOT_FOUND when db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    # Get the "_admin.operations" list, if it exists
    db_nslcmop_admin = db_nslcmop.get("_admin", {})
    op_list = db_nslcmop_admin.get("operations")
    # Create or append to the "_admin.operations" list
    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    # Optional fields are only stored when they carry a value
    optional_fields = (
        ("operationState", operationState),
        ("detailed-status", detailed_status),
        ("lcmOperationType", operationType),
        ("RO_nsr_id", RO_nsr_id),
        ("RO_scaling_info", RO_scaling_info),
    )
    for field_name, field_value in optional_fields:
        if field_value:
            new_op[field_name] = field_value
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        db_nslcmop_admin.update({"operations": [new_op]})
        op_list = db_nslcmop_admin.get("operations")
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    db_nslcmop_update = {"_admin.operations": op_list}
    self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
    op_index = len(op_list) - 1
    return op_index
# Helper methods for scale() sub-operations

# pre-scale/post-scale:
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Register a scale sub-operation, or report how an existing one must be handled.

    NOTE(review): parts of the signature and the ``else`` branches were on
    lines missing from the extracted source and have been restored - confirm
    against upstream.

    :param db_nslcmop: nslcmop record holding the sub-operations
    :param vnf_index: member vnf index of the sub-operation
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive params (ignored for RO scaling)
    :param operationType: type of operation; forced to "SCALE-RO" for RO scaling
    :param RO_nsr_id: RO ns id; together with RO_scaling_info selects RO scaling
    :param RO_scaling_info: RO scaling info
    :return: self.SUBOPERATION_STATUS_NEW when the sub-operation has just been
        added, otherwise the result of self._retry_or_skip_suboperation
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    # The following parameters are set to None for all kind of scaling:
    vdu_id = None
    vdu_count_index = None
    vdu_name = None
    if is_ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # Initial status for sub-operation
    operationState = "PROCESSING"
    detailed_status = "In progress"
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        vdu_id,
        vdu_count_index,
        vdu_name,
        vnf_config_primitive,
        primitive_params,
        operationState,
        detailed_status,
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
# Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the VCA deployed for a vnf/vdu.

    :param vnf_index: value compared with each record's "member-vnf-index"
    :param vdu_id: value compared with each record's "vdu_id"
    :param vca_deployed_list: iterable of deployed VCA records (dictionaries)
    :return: "ee_id" of the first matching record, or None when nothing matches
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list:
        if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
            return vca["ee_id"]
    return None
3910 async def destroy_N2VC(
3918 exec_primitives
=True,
3923 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3924 :param logging_text:
3926 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3927 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3928 :param vca_index: index in the database _admin.deployed.VCA
3929 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3930 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3931 not executed properly
3932 :param scaling_in: True destroys the application, False destroys the model
3933 :return: None or exception
3938 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3939 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3943 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3945 # execute terminate_primitives
3947 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3948 config_descriptor
.get("terminate-config-primitive"),
3949 vca_deployed
.get("ee_descriptor_id"),
3951 vdu_id
= vca_deployed
.get("vdu_id")
3952 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3953 vdu_name
= vca_deployed
.get("vdu_name")
3954 vnf_index
= vca_deployed
.get("member-vnf-index")
3955 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3956 for seq
in terminate_primitives
:
3957 # For each sequence in list, get primitive and call _ns_execute_primitive()
3958 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3959 vnf_index
, seq
.get("name")
3961 self
.logger
.debug(logging_text
+ step
)
3962 # Create the primitive for each sequence, i.e. "primitive": "touch"
3963 primitive
= seq
.get("name")
3964 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3969 self
._add
_suboperation
(
3976 mapped_primitive_params
,
3978 # Sub-operations: Call _ns_execute_primitive() instead of action()
3980 result
, result_detail
= await self
._ns
_execute
_primitive
(
3981 vca_deployed
["ee_id"],
3983 mapped_primitive_params
,
3987 except LcmException
:
3988 # this happens when VCA is not deployed. In this case it is not needed to terminate
3990 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3991 if result
not in result_ok
:
3993 "terminate_primitive {} for vnf_member_index={} fails with "
3994 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3996 # set that this VCA do not need terminated
3997 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4001 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4004 # Delete Prometheus Jobs if any
4005 # This uses NSR_ID, so it will destroy any jobs under this index
4006 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4009 await self
.vca_map
[vca_type
].delete_execution_environment(
4010 vca_deployed
["ee_id"],
4011 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a network service.

    The configuration status is written as "TERMINATING" before the deletion
    and as "DELETED" afterwards. A namespace that no longer exists is not an
    error.

    NOTE(review): the ``vca_id`` keyword passed to delete_namespace was on a
    line missing from the extracted source and has been restored - confirm
    against upstream.

    :param db_nsr: nsr record; its "_id" is used to build the namespace
    :param vca_id: id of the VCA, forwarded to delete_namespace
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4029 async def _terminate_RO(
4030 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4033 Terminates a deployment from RO
4034 :param logging_text:
4035 :param nsr_deployed: db_nsr._admin.deployed
4038 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4039 this method will update only the index 2, but it will write on database the concatenated content of the list
4044 ro_nsr_id
= ro_delete_action
= None
4045 if nsr_deployed
and nsr_deployed
.get("RO"):
4046 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4047 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4050 stage
[2] = "Deleting ns from VIM."
4051 db_nsr_update
["detailed-status"] = " ".join(stage
)
4052 self
._write
_op
_status
(nslcmop_id
, stage
)
4053 self
.logger
.debug(logging_text
+ stage
[2])
4054 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4055 self
._write
_op
_status
(nslcmop_id
, stage
)
4056 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4057 ro_delete_action
= desc
["action_id"]
4059 "_admin.deployed.RO.nsr_delete_action_id"
4060 ] = ro_delete_action
4061 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4062 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4063 if ro_delete_action
:
4064 # wait until NS is deleted from VIM
4065 stage
[2] = "Waiting ns deleted from VIM."
4066 detailed_status_old
= None
4070 + " RO_id={} ro_delete_action={}".format(
4071 ro_nsr_id
, ro_delete_action
4074 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4075 self
._write
_op
_status
(nslcmop_id
, stage
)
4077 delete_timeout
= 20 * 60 # 20 minutes
4078 while delete_timeout
> 0:
4079 desc
= await self
.RO
.show(
4081 item_id_name
=ro_nsr_id
,
4082 extra_item
="action",
4083 extra_item_id
=ro_delete_action
,
4087 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4089 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4090 if ns_status
== "ERROR":
4091 raise ROclient
.ROClientException(ns_status_info
)
4092 elif ns_status
== "BUILD":
4093 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4094 elif ns_status
== "ACTIVE":
4095 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4096 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4101 ), "ROclient.check_action_status returns unknown {}".format(
4104 if stage
[2] != detailed_status_old
:
4105 detailed_status_old
= stage
[2]
4106 db_nsr_update
["detailed-status"] = " ".join(stage
)
4107 self
._write
_op
_status
(nslcmop_id
, stage
)
4108 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4109 await asyncio
.sleep(5, loop
=self
.loop
)
4111 else: # delete_timeout <= 0:
4112 raise ROclient
.ROClientException(
4113 "Timeout waiting ns deleted from VIM"
4116 except Exception as e
:
4117 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4119 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4121 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4122 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4123 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4125 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4128 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4130 failed_detail
.append("delete conflict: {}".format(e
))
4133 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4136 failed_detail
.append("delete error: {}".format(e
))
4138 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4142 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4143 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4145 stage
[2] = "Deleting nsd from RO."
4146 db_nsr_update
["detailed-status"] = " ".join(stage
)
4147 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4148 self
._write
_op
_status
(nslcmop_id
, stage
)
4149 await self
.RO
.delete("nsd", ro_nsd_id
)
4151 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4153 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4154 except Exception as e
:
4156 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4158 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4160 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4163 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4165 failed_detail
.append(
4166 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4168 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4170 failed_detail
.append(
4171 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4173 self
.logger
.error(logging_text
+ failed_detail
[-1])
4175 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4176 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4177 if not vnf_deployed
or not vnf_deployed
["id"]:
4180 ro_vnfd_id
= vnf_deployed
["id"]
4183 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4184 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4186 db_nsr_update
["detailed-status"] = " ".join(stage
)
4187 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4188 self
._write
_op
_status
(nslcmop_id
, stage
)
4189 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4191 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4193 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4194 except Exception as e
:
4196 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4199 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4203 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4206 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4208 failed_detail
.append(
4209 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4211 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4213 failed_detail
.append(
4214 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4216 self
.logger
.error(logging_text
+ failed_detail
[-1])
4219 stage
[2] = "Error deleting from VIM"
4221 stage
[2] = "Deleted from VIM"
4222 db_nsr_update
["detailed-status"] = " ".join(stage
)
4223 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4224 self
._write
_op
_status
(nslcmop_id
, stage
)
4227 raise LcmException("; ".join(failed_detail
))
4229 async def terminate(self
, nsr_id
, nslcmop_id
):
4230 # Try to lock HA task here
4231 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4232 if not task_is_locked_by_me
:
4235 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4236 self
.logger
.debug(logging_text
+ "Enter")
4237 timeout_ns_terminate
= self
.timeout_ns_terminate
4240 operation_params
= None
4242 error_list
= [] # annotates all failed error messages
4243 db_nslcmop_update
= {}
4244 autoremove
= False # autoremove after terminated
4245 tasks_dict_info
= {}
4248 "Stage 1/3: Preparing task.",
4249 "Waiting for previous operations to terminate.",
4252 # ^ contains [stage, step, VIM-status]
4254 # wait for any previous tasks in process
4255 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4257 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4258 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4259 operation_params
= db_nslcmop
.get("operationParams") or {}
4260 if operation_params
.get("timeout_ns_terminate"):
4261 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4262 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4263 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4265 db_nsr_update
["operational-status"] = "terminating"
4266 db_nsr_update
["config-status"] = "terminating"
4267 self
._write
_ns
_status
(
4269 ns_state
="TERMINATING",
4270 current_operation
="TERMINATING",
4271 current_operation_id
=nslcmop_id
,
4272 other_update
=db_nsr_update
,
4274 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4275 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4276 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4279 stage
[1] = "Getting vnf descriptors from db."
4280 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4282 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4284 db_vnfds_from_id
= {}
4285 db_vnfds_from_member_index
= {}
4287 for vnfr
in db_vnfrs_list
:
4288 vnfd_id
= vnfr
["vnfd-id"]
4289 if vnfd_id
not in db_vnfds_from_id
:
4290 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4291 db_vnfds_from_id
[vnfd_id
] = vnfd
4292 db_vnfds_from_member_index
[
4293 vnfr
["member-vnf-index-ref"]
4294 ] = db_vnfds_from_id
[vnfd_id
]
4296 # Destroy individual execution environments when there are terminating primitives.
4297 # Rest of EE will be deleted at once
4298 # TODO - check before calling _destroy_N2VC
4299 # if not operation_params.get("skip_terminate_primitives"):#
4300 # or not vca.get("needed_terminate"):
4301 stage
[0] = "Stage 2/3 execute terminating primitives."
4302 self
.logger
.debug(logging_text
+ stage
[0])
4303 stage
[1] = "Looking execution environment that needs terminate."
4304 self
.logger
.debug(logging_text
+ stage
[1])
4306 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4307 config_descriptor
= None
4308 vca_member_vnf_index
= vca
.get("member-vnf-index")
4309 vca_id
= self
.get_vca_id(
4310 db_vnfrs_dict
.get(vca_member_vnf_index
)
4311 if vca_member_vnf_index
4315 if not vca
or not vca
.get("ee_id"):
4317 if not vca
.get("member-vnf-index"):
4319 config_descriptor
= db_nsr
.get("ns-configuration")
4320 elif vca
.get("vdu_id"):
4321 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4322 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4323 elif vca
.get("kdu_name"):
4324 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4325 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4327 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4328 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4329 vca_type
= vca
.get("type")
4330 exec_terminate_primitives
= not operation_params
.get(
4331 "skip_terminate_primitives"
4332 ) and vca
.get("needed_terminate")
4333 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4334 # pending native charms
4336 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4338 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4339 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4340 task
= asyncio
.ensure_future(
4348 exec_terminate_primitives
,
4352 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4354 # wait for pending tasks of terminate primitives
4358 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4360 error_list
= await self
._wait
_for
_tasks
(
4363 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4367 tasks_dict_info
.clear()
4369 return # raise LcmException("; ".join(error_list))
4371 # remove All execution environments at once
4372 stage
[0] = "Stage 3/3 delete all."
4374 if nsr_deployed
.get("VCA"):
4375 stage
[1] = "Deleting all execution environments."
4376 self
.logger
.debug(logging_text
+ stage
[1])
4377 vca_id
= self
.get_vca_id({}, db_nsr
)
4378 task_delete_ee
= asyncio
.ensure_future(
4380 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4381 timeout
=self
.timeout_charm_delete
,
4384 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4385 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4387 # Delete from k8scluster
4388 stage
[1] = "Deleting KDUs."
4389 self
.logger
.debug(logging_text
+ stage
[1])
4390 # print(nsr_deployed)
4391 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4392 if not kdu
or not kdu
.get("kdu-instance"):
4394 kdu_instance
= kdu
.get("kdu-instance")
4395 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4396 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4397 vca_id
= self
.get_vca_id({}, db_nsr
)
4398 task_delete_kdu_instance
= asyncio
.ensure_future(
4399 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4400 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4401 kdu_instance
=kdu_instance
,
4408 + "Unknown k8s deployment type {}".format(
4409 kdu
.get("k8scluster-type")
4414 task_delete_kdu_instance
4415 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4418 stage
[1] = "Deleting ns from VIM."
4420 task_delete_ro
= asyncio
.ensure_future(
4421 self
._terminate
_ng
_ro
(
4422 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4426 task_delete_ro
= asyncio
.ensure_future(
4428 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4431 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4433 # rest of staff will be done at finally
4436 ROclient
.ROClientException
,
4441 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4443 except asyncio
.CancelledError
:
4445 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4447 exc
= "Operation was cancelled"
4448 except Exception as e
:
4449 exc
= traceback
.format_exc()
4450 self
.logger
.critical(
4451 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4456 error_list
.append(str(exc
))
4458 # wait for pending tasks
4460 stage
[1] = "Waiting for terminate pending tasks."
4461 self
.logger
.debug(logging_text
+ stage
[1])
4462 error_list
+= await self
._wait
_for
_tasks
(
4465 timeout_ns_terminate
,
4469 stage
[1] = stage
[2] = ""
4470 except asyncio
.CancelledError
:
4471 error_list
.append("Cancelled")
4472 # TODO cancell all tasks
4473 except Exception as exc
:
4474 error_list
.append(str(exc
))
4475 # update status at database
4477 error_detail
= "; ".join(error_list
)
4478 # self.logger.error(logging_text + error_detail)
4479 error_description_nslcmop
= "{} Detail: {}".format(
4480 stage
[0], error_detail
4482 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4483 nslcmop_id
, stage
[0]
4486 db_nsr_update
["operational-status"] = "failed"
4487 db_nsr_update
["detailed-status"] = (
4488 error_description_nsr
+ " Detail: " + error_detail
4490 db_nslcmop_update
["detailed-status"] = error_detail
4491 nslcmop_operation_state
= "FAILED"
4495 error_description_nsr
= error_description_nslcmop
= None
4496 ns_state
= "NOT_INSTANTIATED"
4497 db_nsr_update
["operational-status"] = "terminated"
4498 db_nsr_update
["detailed-status"] = "Done"
4499 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4500 db_nslcmop_update
["detailed-status"] = "Done"
4501 nslcmop_operation_state
= "COMPLETED"
4504 self
._write
_ns
_status
(
4507 current_operation
="IDLE",
4508 current_operation_id
=None,
4509 error_description
=error_description_nsr
,
4510 error_detail
=error_detail
,
4511 other_update
=db_nsr_update
,
4513 self
._write
_op
_status
(
4516 error_message
=error_description_nslcmop
,
4517 operation_state
=nslcmop_operation_state
,
4518 other_update
=db_nslcmop_update
,
4520 if ns_state
== "NOT_INSTANTIATED":
4524 {"nsr-id-ref": nsr_id
},
4525 {"_admin.nsState": "NOT_INSTANTIATED"},
4527 except DbException
as e
:
4530 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4534 if operation_params
:
4535 autoremove
= operation_params
.get("autoremove", False)
4536 if nslcmop_operation_state
:
4538 await self
.msg
.aiowrite(
4543 "nslcmop_id": nslcmop_id
,
4544 "operationState": nslcmop_operation_state
,
4545 "autoremove": autoremove
,
4549 except Exception as e
:
4551 logging_text
+ "kafka_write notification Exception {}".format(e
)
4554 self
.logger
.debug(logging_text
+ "Exit")
4555 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4557 async def _wait_for_tasks(
4558 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4561 error_detail_list
= []
4563 pending_tasks
= list(created_tasks_info
.keys())
4564 num_tasks
= len(pending_tasks
)
4566 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4567 self
._write
_op
_status
(nslcmop_id
, stage
)
4568 while pending_tasks
:
4570 _timeout
= timeout
+ time_start
- time()
4571 done
, pending_tasks
= await asyncio
.wait(
4572 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4574 num_done
+= len(done
)
4575 if not done
: # Timeout
4576 for task
in pending_tasks
:
4577 new_error
= created_tasks_info
[task
] + ": Timeout"
4578 error_detail_list
.append(new_error
)
4579 error_list
.append(new_error
)
4582 if task
.cancelled():
4585 exc
= task
.exception()
4587 if isinstance(exc
, asyncio
.TimeoutError
):
4589 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4590 error_list
.append(created_tasks_info
[task
])
4591 error_detail_list
.append(new_error
)
4598 ROclient
.ROClientException
,
4604 self
.logger
.error(logging_text
+ new_error
)
4606 exc_traceback
= "".join(
4607 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4611 + created_tasks_info
[task
]
4617 logging_text
+ created_tasks_info
[task
] + ": Done"
4619 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4621 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4622 if nsr_id
: # update also nsr
4627 "errorDescription": "Error at: " + ", ".join(error_list
),
4628 "errorDetail": ". ".join(error_detail_list
),
4631 self
._write
_op
_status
(nslcmop_id
, stage
)
4632 return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    The default-value is used. If it is between < > it look for a value at instantiation_params

    After the value is chosen: dict/list/tuple values are serialized with
    yaml.safe_dump, a leading "!!yaml " marker is stripped from strings, and
    the value is converted according to the parameter "data-type" (INTEGER or
    BOOLEAN; everything except the text "false", case-insensitive, is True).

    NOTE(review): this definition takes no ``self`` and is called elsewhere in
    this file as ``self._map_primitive_params(seq, params, desc_params)``, so
    it is presumably decorated with ``@staticmethod`` on a line outside this
    block - confirm.

    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user
    :param instantiation_params: Instantiation params provided by user
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has no value or an INTEGER parameter
        cannot be converted
    """
    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" takes precedence over "default-value"
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            if (
                isinstance(value, str)
                and value.startswith("<")
                and value.endswith(">")
            ):
                # "<name>" is a reference to an instantiation parameter
                if value[1:-1] in instantiation_params:
                    value = instantiation_params[value[1:-1]]
                else:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except ValueError as e:  # error converting string to int
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from e
        elif data_type == "BOOLEAN":
            value = not (str(value).lower() == "false")

        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """
    Find the deployed VCA record for an action and return its execution environment.

    NOTE(review): part of the signature and the continue/break skeleton were
    on lines missing from the extracted source and have been restored -
    confirm against upstream.

    :param deployed_vca: list of deployed VCA records; empty entries are ignored
    :param member_vnf_index: must equal the record's "member-vnf-index"
    :param vdu_id: must equal the record's "vdu_id"
    :param vdu_count_index: when not None, must equal the record's "vdu_count_index"
    :param kdu_name: when truthy, must equal the record's "kdu_name"
    :param ee_descriptor_id: when truthy, must equal the record's "ee_descriptor_id"
    :return: tuple (ee_id, vca_type); vca_type defaults to "lxc_proxy_charm"
    :raises LcmException: if no record matches or the record has no "ee_id"
    """
    # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
    for vca in deployed_vca:
        if not vca:
            continue
        if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
            continue
        if (
            vdu_count_index is not None
            and vdu_count_index != vca["vdu_count_index"]
        ):
            continue
        if kdu_name and kdu_name != vca["kdu_name"]:
            continue
        if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
            continue
        break
    else:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )
    # get ee_id
    ee_id = vca.get("ee_id")
    vca_type = vca.get(
        "type", "lxc_proxy_charm"
    )  # default value for backward compatibility - proxy charm
    if not ee_id:
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
            "execution environment".format(
                member_vnf_index, vdu_id, kdu_name, vdu_count_index
            )
        )
    return ee_id, vca_type
4757 async def _ns_execute_primitive(
4763 retries_interval
=30,
4770 if primitive
== "config":
4771 primitive_params
= {"params": primitive_params
}
4773 vca_type
= vca_type
or "lxc_proxy_charm"
4777 output
= await asyncio
.wait_for(
4778 self
.vca_map
[vca_type
].exec_primitive(
4780 primitive_name
=primitive
,
4781 params_dict
=primitive_params
,
4782 progress_timeout
=self
.timeout_progress_primitive
,
4783 total_timeout
=self
.timeout_primitive
,
4788 timeout
=timeout
or self
.timeout_primitive
,
4792 except asyncio
.CancelledError
:
4794 except Exception as e
: # asyncio.TimeoutError
4795 if isinstance(e
, asyncio
.TimeoutError
):
4800 "Error executing action {} on {} -> {}".format(
4805 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4807 return "FAILED", str(e
)
4809 return "COMPLETED", output
4811 except (LcmException
, asyncio
.CancelledError
):
4813 except Exception as e
:
4814 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
# vca_status_refresh(nsr_id, nslcmop_id): coroutine that refreshes VCA status data for one NS record.
# Visible steps: read the nsr from the "nsrs" collection, resolve vca_id through
# self.get_vca_id, call self._on_update_k8s_db for every entry of _admin.deployed.K8s,
# call self._on_update_n2vc_db for every index of _admin.deployed.VCA, and finally remove
# the "ns_vca_status_refresh" task from self.lcm_tasks.
# NOTE(review): this extract is flattened and some original lines are absent; whether the
# VCA loop is an else-branch of the K8s check cannot be confirmed from here.
4816 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4818 Updating the vca_status with latest juju information in nsrs record
4819 :param: nsr_id: Id of the nsr
4820 :param: nslcmop_id: Id of the nslcmop
4824 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4825 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# get_vca_id is called with an empty dict as its first argument and the nsr as second
4826 vca_id
= self
.get_vca_id({}, db_nsr
)
4827 if db_nsr
["_admin"]["deployed"]["K8s"]:
# k8s_index is produced by enumerate but not used in the visible statements
4828 for k8s_index
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4829 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
4830 await self
._on
_update
_k
8s
_db
(
4831 cluster_uuid
, kdu_instance
, filter={"_id": nsr_id
}, vca_id
=vca_id
# only the index is used: each VCA entry is addressed by its path inside the nsr document
4834 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
# NOTE(review): the local name "filter" shadows the builtin inside this function
4835 table
, filter = "nsrs", {"_id": nsr_id
}
4836 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4837 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4839 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4840 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
# action(nsr_id, nslcmop_id): coroutine that runs one NS "action" operation.
# Visible flow: take the HA lock for the nslcmop, wait for related operations, load the
# nslcmop / nsr / vnfr / vnfd / nsd records, look up the requested config-primitive, then
# either run a KDU operation through self.k8scluster_map (upgrade, rollback, status or
# exec_primitive) or run a charm primitive through self._ns_execute_primitive. At the end
# the NS status and the operation status are written, a notification is sent through
# self.msg.aiowrite, the "ns_action" task is removed, and
# (nslcmop_operation_state, detailed_status) is returned.
# NOTE(review): this extract is flattened and many original lines are absent (try/raise/
# else/closing lines); the comments below describe only the statements that are visible.
4842 async def action(self
, nsr_id
, nslcmop_id
):
4843 # Try to lock HA task here
4844 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4845 if not task_is_locked_by_me
:
4848 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4849 self
.logger
.debug(logging_text
+ "Enter")
4850 # get all needed from database
4854 db_nslcmop_update
= {}
4855 nslcmop_operation_state
= None
4856 error_description_nslcmop
= None
4859 # wait for any previous tasks in process
4860 step
= "Waiting for previous operations to terminate"
4861 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4863 self
._write
_ns
_status
(
4866 current_operation
="RUNNING ACTION",
4867 current_operation_id
=nslcmop_id
,
4870 step
= "Getting information from database"
4871 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4872 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# primitive_params, when present, is decoded in place with json.loads
4873 if db_nslcmop
["operationParams"].get("primitive_params"):
4874 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4875 db_nslcmop
["operationParams"]["primitive_params"]
4878 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4879 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4880 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4881 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4882 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4883 primitive
= db_nslcmop
["operationParams"]["primitive"]
4884 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4885 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4886 "timeout_ns_action", self
.timeout_primitive
4890 step
= "Getting vnfr from database"
4891 db_vnfr
= self
.db
.get_one(
4892 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4894 if db_vnfr
.get("kdur"):
# each kdur's additionalParams, when present, is decoded with json.loads before the
# rebuilt list replaces db_vnfr["kdur"]
4896 for kdur
in db_vnfr
["kdur"]:
4897 if kdur
.get("additionalParams"):
4898 kdur
["additionalParams"] = json
.loads(kdur
["additionalParams"])
4899 kdur_list
.append(kdur
)
4900 db_vnfr
["kdur"] = kdur_list
4901 step
= "Getting vnfd from database"
4902 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4904 step
= "Getting nsd from database"
4905 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4907 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4908 # for backward compatibility
4909 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4910 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4911 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4912 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4914 # look for primitive
4915 config_primitive_desc
= descriptor_configuration
= None
4917 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4919 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4921 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4923 descriptor_configuration
= db_nsd
.get("ns-configuration")
4925 if descriptor_configuration
and descriptor_configuration
.get(
4928 for config_primitive
in descriptor_configuration
["config-primitive"]:
4929 if config_primitive
["name"] == primitive
:
4930 config_primitive_desc
= config_primitive
# without a matching config-primitive, only a KDU "upgrade", "rollback" or "status"
# request avoids the "Primitive {} not found" message (presumably an exception — the
# raising line is not visible here)
4933 if not config_primitive_desc
:
4934 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4936 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4940 primitive_name
= primitive
4941 ee_descriptor_id
= None
4943 primitive_name
= config_primitive_desc
.get(
4944 "execution-environment-primitive", primitive
4946 ee_descriptor_id
= config_primitive_desc
.get(
4947 "execution-environment-ref"
4953 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4955 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4958 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4960 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4962 desc_params
= parse_yaml_strings(
4963 db_vnfr
.get("additionalParamsForVnf")
4966 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4967 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4968 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
# NOTE(review): both loops below rebind "primitive", the operation's primitive name read
# at 4883; the statements after them use primitive_name
4970 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4971 actions
.add(primitive
["name"])
4972 for primitive
in kdu_configuration
.get("config-primitive", []):
4973 actions
.add(primitive
["name"])
4974 kdu_action
= True if primitive_name
in actions
else False
4976 # TODO check if ns is in a proper status
4978 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4980 # kdur and desc_params already set from before
4981 if primitive_params
:
4982 desc_params
.update(primitive_params
)
4983 # TODO Check if we will need something at vnf level
4984 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4986 kdu_name
== kdu
["kdu-name"]
4987 and kdu
["member-vnf-index"] == vnf_index
4992 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4995 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4996 msg
= "unknown k8scluster-type '{}'".format(
4997 kdu
.get("k8scluster-type")
4999 raise LcmException(msg
)
5002 "collection": "nsrs",
5003 "filter": {"_id": nsr_id
},
5004 "path": "_admin.deployed.K8s.{}".format(index
),
5008 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5010 step
= "Executing kdu {}".format(primitive_name
)
5011 if primitive_name
== "upgrade":
5012 if desc_params
.get("kdu_model"):
5013 kdu_model
= desc_params
.get("kdu_model")
5014 del desc_params
["kdu_model"]
5016 kdu_model
= kdu
.get("kdu-model")
# kdu_model is split on ":" and its first part is kept (the condition guarding the
# assignment at 5019 is not visible here)
5017 parts
= kdu_model
.split(sep
=":")
5019 kdu_model
= parts
[0]
5021 detailed_status
= await asyncio
.wait_for(
5022 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5023 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5024 kdu_instance
=kdu
.get("kdu-instance"),
5026 kdu_model
=kdu_model
,
5029 timeout
=timeout_ns_action
,
5031 timeout
=timeout_ns_action
+ 10,
5034 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5036 elif primitive_name
== "rollback":
5037 detailed_status
= await asyncio
.wait_for(
5038 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5039 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5040 kdu_instance
=kdu
.get("kdu-instance"),
5043 timeout
=timeout_ns_action
,
5045 elif primitive_name
== "status":
5046 detailed_status
= await asyncio
.wait_for(
5047 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5048 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5049 kdu_instance
=kdu
.get("kdu-instance"),
5052 timeout
=timeout_ns_action
,
5055 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5056 kdu
["kdu-name"], nsr_id
5058 params
= self
._map
_primitive
_params
(
5059 config_primitive_desc
, primitive_params
, desc_params
5062 detailed_status
= await asyncio
.wait_for(
5063 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5064 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5065 kdu_instance
=kdu_instance
,
5066 primitive_name
=primitive_name
,
5069 timeout
=timeout_ns_action
,
5072 timeout
=timeout_ns_action
,
5076 nslcmop_operation_state
= "COMPLETED"
5078 detailed_status
= ""
5079 nslcmop_operation_state
= "FAILED"
# charm path: locate the execution environment of the target, then run the primitive
# through _ns_execute_primitive, which yields the operation state and detailed status
5081 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5082 nsr_deployed
["VCA"],
5083 member_vnf_index
=vnf_index
,
5085 vdu_count_index
=vdu_count_index
,
5086 ee_descriptor_id
=ee_descriptor_id
,
5088 for vca_index
, vca_deployed
in enumerate(
5089 db_nsr
["_admin"]["deployed"]["VCA"]
5091 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5093 "collection": "nsrs",
5094 "filter": {"_id": nsr_id
},
5095 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5099 nslcmop_operation_state
,
5101 ) = await self
._ns
_execute
_primitive
(
5103 primitive
=primitive_name
,
5104 primitive_params
=self
._map
_primitive
_params
(
5105 config_primitive_desc
, primitive_params
, desc_params
5107 timeout
=timeout_ns_action
,
5113 db_nslcmop_update
["detailed-status"] = detailed_status
# the error description is filled only when the state equals "FAILED"
5114 error_description_nslcmop
= (
5115 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5119 + " task Done with result {} {}".format(
5120 nslcmop_operation_state
, detailed_status
5123 return # database update is called inside finally
5125 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5126 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5128 except asyncio
.CancelledError
:
5130 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5132 exc
= "Operation was cancelled"
5133 except asyncio
.TimeoutError
:
5134 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5136 except Exception as e
:
5137 exc
= traceback
.format_exc()
5138 self
.logger
.critical(
5139 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5148 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5149 nslcmop_operation_state
= "FAILED"
5151 self
._write
_ns
_status
(
5155 ], # TODO check if degraded. For the moment use previous status
5156 current_operation
="IDLE",
5157 current_operation_id
=None,
5158 # error_description=error_description_nsr,
5159 # error_detail=error_detail,
5160 other_update
=db_nsr_update
,
5163 self
._write
_op
_status
(
5166 error_message
=error_description_nslcmop
,
5167 operation_state
=nslcmop_operation_state
,
5168 other_update
=db_nslcmop_update
,
# when a state was set, a notification carrying nslcmop_id and operationState is sent
# through self.msg.aiowrite; an exception raised there is caught (it appears to be logged
# with the "kafka_write notification Exception" text)
5171 if nslcmop_operation_state
:
5173 await self
.msg
.aiowrite(
5178 "nslcmop_id": nslcmop_id
,
5179 "operationState": nslcmop_operation_state
,
5183 except Exception as e
:
5185 logging_text
+ "kafka_write notification Exception {}".format(e
)
5187 self
.logger
.debug(logging_text
+ "Exit")
5188 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5189 return nslcmop_operation_state
, detailed_status
5191 async def scale(self
, nsr_id
, nslcmop_id
):
5192 # Try to lock HA task here
5193 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5194 if not task_is_locked_by_me
:
5197 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5198 stage
= ["", "", ""]
5199 tasks_dict_info
= {}
5200 # ^ stage, step, VIM progress
5201 self
.logger
.debug(logging_text
+ "Enter")
5202 # get all needed from database
5204 db_nslcmop_update
= {}
5207 # in case of error, indicates what part of scale was failed to put nsr at error status
5208 scale_process
= None
5209 old_operational_status
= ""
5210 old_config_status
= ""
5213 # wait for any previous tasks in process
5214 step
= "Waiting for previous operations to terminate"
5215 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5216 self
._write
_ns
_status
(
5219 current_operation
="SCALING",
5220 current_operation_id
=nslcmop_id
,
5223 step
= "Getting nslcmop from database"
5225 step
+ " after having waited for previous tasks to be completed"
5227 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5229 step
= "Getting nsr from database"
5230 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5231 old_operational_status
= db_nsr
["operational-status"]
5232 old_config_status
= db_nsr
["config-status"]
5234 step
= "Parsing scaling parameters"
5235 db_nsr_update
["operational-status"] = "scaling"
5236 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5237 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5239 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5241 ]["member-vnf-index"]
5242 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5244 ]["scaling-group-descriptor"]
5245 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5246 # for backward compatibility
5247 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5248 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5249 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5250 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5252 step
= "Getting vnfr from database"
5253 db_vnfr
= self
.db
.get_one(
5254 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5257 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5259 step
= "Getting vnfd from database"
5260 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5262 base_folder
= db_vnfd
["_admin"]["storage"]
5264 step
= "Getting scaling-group-descriptor"
5265 scaling_descriptor
= find_in_list(
5266 get_scaling_aspect(db_vnfd
),
5267 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5269 if not scaling_descriptor
:
5271 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5272 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5275 step
= "Sending scale order to VIM"
5276 # TODO check if ns is in a proper status
5278 if not db_nsr
["_admin"].get("scaling-group"):
5283 "_admin.scaling-group": [
5284 {"name": scaling_group
, "nb-scale-op": 0}
5288 admin_scale_index
= 0
5290 for admin_scale_index
, admin_scale_info
in enumerate(
5291 db_nsr
["_admin"]["scaling-group"]
5293 if admin_scale_info
["name"] == scaling_group
:
5294 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5296 else: # not found, set index one plus last element and add new entry with the name
5297 admin_scale_index
+= 1
5299 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5302 vca_scaling_info
= []
5303 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5304 if scaling_type
== "SCALE_OUT":
5305 if "aspect-delta-details" not in scaling_descriptor
:
5307 "Aspect delta details not fount in scaling descriptor {}".format(
5308 scaling_descriptor
["name"]
5311 # count if max-instance-count is reached
5312 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5314 scaling_info
["scaling_direction"] = "OUT"
5315 scaling_info
["vdu-create"] = {}
5316 scaling_info
["kdu-create"] = {}
5317 for delta
in deltas
:
5318 for vdu_delta
in delta
.get("vdu-delta", {}):
5319 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5320 # vdu_index also provides the number of instance of the targeted vdu
5321 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5322 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5326 additional_params
= (
5327 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5330 cloud_init_list
= []
5332 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5333 max_instance_count
= 10
5334 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5335 max_instance_count
= vdu_profile
.get(
5336 "max-number-of-instances", 10
5339 default_instance_num
= get_number_of_instances(
5342 instances_number
= vdu_delta
.get("number-of-instances", 1)
5343 nb_scale_op
+= instances_number
5345 new_instance_count
= nb_scale_op
+ default_instance_num
5346 # Control if new count is over max and vdu count is less than max.
5347 # Then assign new instance count
5348 if new_instance_count
> max_instance_count
> vdu_count
:
5349 instances_number
= new_instance_count
- max_instance_count
5351 instances_number
= instances_number
5353 if new_instance_count
> max_instance_count
:
5355 "reached the limit of {} (max-instance-count) "
5356 "scaling-out operations for the "
5357 "scaling-group-descriptor '{}'".format(
5358 nb_scale_op
, scaling_group
5361 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5363 # TODO Information of its own ip is not available because db_vnfr is not updated.
5364 additional_params
["OSM"] = get_osm_params(
5365 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5367 cloud_init_list
.append(
5368 self
._parse
_cloud
_init
(
5375 vca_scaling_info
.append(
5377 "osm_vdu_id": vdu_delta
["id"],
5378 "member-vnf-index": vnf_index
,
5380 "vdu_index": vdu_index
+ x
,
5383 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5384 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5385 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5386 kdu_name
= kdu_profile
["kdu-name"]
5387 resource_name
= kdu_profile
["resource-name"]
5389 # Might have different kdus in the same delta
5390 # Should have list for each kdu
5391 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5392 scaling_info
["kdu-create"][kdu_name
] = []
5394 kdur
= get_kdur(db_vnfr
, kdu_name
)
5395 if kdur
.get("helm-chart"):
5396 k8s_cluster_type
= "helm-chart-v3"
5397 self
.logger
.debug("kdur: {}".format(kdur
))
5399 kdur
.get("helm-version")
5400 and kdur
.get("helm-version") == "v2"
5402 k8s_cluster_type
= "helm-chart"
5403 raise NotImplementedError
5404 elif kdur
.get("juju-bundle"):
5405 k8s_cluster_type
= "juju-bundle"
5408 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5409 "juju-bundle. Maybe an old NBI version is running".format(
5410 db_vnfr
["member-vnf-index-ref"], kdu_name
5414 max_instance_count
= 10
5415 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5416 max_instance_count
= kdu_profile
.get(
5417 "max-number-of-instances", 10
5420 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5421 deployed_kdu
, _
= get_deployed_kdu(
5422 nsr_deployed
, kdu_name
, vnf_index
5424 if deployed_kdu
is None:
5426 "KDU '{}' for vnf '{}' not deployed".format(
5430 kdu_instance
= deployed_kdu
.get("kdu-instance")
5431 instance_num
= await self
.k8scluster_map
[
5433 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5434 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5435 "number-of-instances", 1
5438 # Control if new count is over max and instance_num is less than max.
5439 # Then assign max instance number to kdu replica count
5440 if kdu_replica_count
> max_instance_count
> instance_num
:
5441 kdu_replica_count
= max_instance_count
5442 if kdu_replica_count
> max_instance_count
:
5444 "reached the limit of {} (max-instance-count) "
5445 "scaling-out operations for the "
5446 "scaling-group-descriptor '{}'".format(
5447 instance_num
, scaling_group
5451 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5452 vca_scaling_info
.append(
5454 "osm_kdu_id": kdu_name
,
5455 "member-vnf-index": vnf_index
,
5457 "kdu_index": instance_num
+ x
- 1,
5460 scaling_info
["kdu-create"][kdu_name
].append(
5462 "member-vnf-index": vnf_index
,
5464 "k8s-cluster-type": k8s_cluster_type
,
5465 "resource-name": resource_name
,
5466 "scale": kdu_replica_count
,
5469 elif scaling_type
== "SCALE_IN":
5470 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5472 scaling_info
["scaling_direction"] = "IN"
5473 scaling_info
["vdu-delete"] = {}
5474 scaling_info
["kdu-delete"] = {}
5476 for delta
in deltas
:
5477 for vdu_delta
in delta
.get("vdu-delta", {}):
5478 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5479 min_instance_count
= 0
5480 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5481 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5482 min_instance_count
= vdu_profile
["min-number-of-instances"]
5484 default_instance_num
= get_number_of_instances(
5485 db_vnfd
, vdu_delta
["id"]
5487 instance_num
= vdu_delta
.get("number-of-instances", 1)
5488 nb_scale_op
-= instance_num
5490 new_instance_count
= nb_scale_op
+ default_instance_num
5492 if new_instance_count
< min_instance_count
< vdu_count
:
5493 instances_number
= min_instance_count
- new_instance_count
5495 instances_number
= instance_num
5497 if new_instance_count
< min_instance_count
:
5499 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5500 "scaling-group-descriptor '{}'".format(
5501 nb_scale_op
, scaling_group
5504 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5505 vca_scaling_info
.append(
5507 "osm_vdu_id": vdu_delta
["id"],
5508 "member-vnf-index": vnf_index
,
5510 "vdu_index": vdu_index
- 1 - x
,
5513 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5514 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5515 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
5516 kdu_name
= kdu_profile
["kdu-name"]
5517 resource_name
= kdu_profile
["resource-name"]
5519 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5520 scaling_info
["kdu-delete"][kdu_name
] = []
5522 kdur
= get_kdur(db_vnfr
, kdu_name
)
5523 if kdur
.get("helm-chart"):
5524 k8s_cluster_type
= "helm-chart-v3"
5525 self
.logger
.debug("kdur: {}".format(kdur
))
5527 kdur
.get("helm-version")
5528 and kdur
.get("helm-version") == "v2"
5530 k8s_cluster_type
= "helm-chart"
5531 raise NotImplementedError
5532 elif kdur
.get("juju-bundle"):
5533 k8s_cluster_type
= "juju-bundle"
5536 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5537 "juju-bundle. Maybe an old NBI version is running".format(
5538 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5542 min_instance_count
= 0
5543 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5544 min_instance_count
= kdu_profile
["min-number-of-instances"]
5546 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5547 deployed_kdu
, _
= get_deployed_kdu(
5548 nsr_deployed
, kdu_name
, vnf_index
5550 if deployed_kdu
is None:
5552 "KDU '{}' for vnf '{}' not deployed".format(
5556 kdu_instance
= deployed_kdu
.get("kdu-instance")
5557 instance_num
= await self
.k8scluster_map
[
5559 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5560 kdu_replica_count
= instance_num
- kdu_delta
.get(
5561 "number-of-instances", 1
5564 if kdu_replica_count
< min_instance_count
< instance_num
:
5565 kdu_replica_count
= min_instance_count
5566 if kdu_replica_count
< min_instance_count
:
5568 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5569 "scaling-group-descriptor '{}'".format(
5570 instance_num
, scaling_group
5574 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5575 vca_scaling_info
.append(
5577 "osm_kdu_id": kdu_name
,
5578 "member-vnf-index": vnf_index
,
5580 "kdu_index": instance_num
- x
- 1,
5583 scaling_info
["kdu-delete"][kdu_name
].append(
5585 "member-vnf-index": vnf_index
,
5587 "k8s-cluster-type": k8s_cluster_type
,
5588 "resource-name": resource_name
,
5589 "scale": kdu_replica_count
,
5593 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5594 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5595 if scaling_info
["scaling_direction"] == "IN":
5596 for vdur
in reversed(db_vnfr
["vdur"]):
5597 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5598 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5599 scaling_info
["vdu"].append(
5601 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5602 "vdu_id": vdur
["vdu-id-ref"],
5606 for interface
in vdur
["interfaces"]:
5607 scaling_info
["vdu"][-1]["interface"].append(
5609 "name": interface
["name"],
5610 "ip_address": interface
["ip-address"],
5611 "mac_address": interface
.get("mac-address"),
5614 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5617 step
= "Executing pre-scale vnf-config-primitive"
5618 if scaling_descriptor
.get("scaling-config-action"):
5619 for scaling_config_action
in scaling_descriptor
[
5620 "scaling-config-action"
5623 scaling_config_action
.get("trigger") == "pre-scale-in"
5624 and scaling_type
== "SCALE_IN"
5626 scaling_config_action
.get("trigger") == "pre-scale-out"
5627 and scaling_type
== "SCALE_OUT"
5629 vnf_config_primitive
= scaling_config_action
[
5630 "vnf-config-primitive-name-ref"
5632 step
= db_nslcmop_update
[
5634 ] = "executing pre-scale scaling-config-action '{}'".format(
5635 vnf_config_primitive
5638 # look for primitive
5639 for config_primitive
in (
5640 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5641 ).get("config-primitive", ()):
5642 if config_primitive
["name"] == vnf_config_primitive
:
5646 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5647 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5648 "primitive".format(scaling_group
, vnf_config_primitive
)
5651 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5652 if db_vnfr
.get("additionalParamsForVnf"):
5653 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5655 scale_process
= "VCA"
5656 db_nsr_update
["config-status"] = "configuring pre-scaling"
5657 primitive_params
= self
._map
_primitive
_params
(
5658 config_primitive
, {}, vnfr_params
5661 # Pre-scale retry check: Check if this sub-operation has been executed before
5662 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5665 vnf_config_primitive
,
5669 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5670 # Skip sub-operation
5671 result
= "COMPLETED"
5672 result_detail
= "Done"
5675 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5676 vnf_config_primitive
, result
, result_detail
5680 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5681 # New sub-operation: Get index of this sub-operation
5683 len(db_nslcmop
.get("_admin", {}).get("operations"))
5688 + "vnf_config_primitive={} New sub-operation".format(
5689 vnf_config_primitive
5693 # retry: Get registered params for this existing sub-operation
5694 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5697 vnf_index
= op
.get("member_vnf_index")
5698 vnf_config_primitive
= op
.get("primitive")
5699 primitive_params
= op
.get("primitive_params")
5702 + "vnf_config_primitive={} Sub-operation retry".format(
5703 vnf_config_primitive
5706 # Execute the primitive, either with new (first-time) or registered (reintent) args
5707 ee_descriptor_id
= config_primitive
.get(
5708 "execution-environment-ref"
5710 primitive_name
= config_primitive
.get(
5711 "execution-environment-primitive", vnf_config_primitive
5713 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5714 nsr_deployed
["VCA"],
5715 member_vnf_index
=vnf_index
,
5717 vdu_count_index
=None,
5718 ee_descriptor_id
=ee_descriptor_id
,
5720 result
, result_detail
= await self
._ns
_execute
_primitive
(
5729 + "vnf_config_primitive={} Done with result {} {}".format(
5730 vnf_config_primitive
, result
, result_detail
5733 # Update operationState = COMPLETED | FAILED
5734 self
._update
_suboperation
_status
(
5735 db_nslcmop
, op_index
, result
, result_detail
5738 if result
== "FAILED":
5739 raise LcmException(result_detail
)
5740 db_nsr_update
["config-status"] = old_config_status
5741 scale_process
= None
5745 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5748 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5751 # SCALE-IN VCA - BEGIN
5752 if vca_scaling_info
:
5753 step
= db_nslcmop_update
[
5755 ] = "Deleting the execution environments"
5756 scale_process
= "VCA"
5757 for vca_info
in vca_scaling_info
:
5758 if vca_info
["type"] == "delete":
5759 member_vnf_index
= str(vca_info
["member-vnf-index"])
5761 logging_text
+ "vdu info: {}".format(vca_info
)
5763 if vca_info
.get("osm_vdu_id"):
5764 vdu_id
= vca_info
["osm_vdu_id"]
5765 vdu_index
= int(vca_info
["vdu_index"])
5768 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5769 member_vnf_index
, vdu_id
, vdu_index
5773 kdu_id
= vca_info
["osm_kdu_id"]
5776 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5777 member_vnf_index
, kdu_id
, vdu_index
5779 stage
[2] = step
= "Scaling in VCA"
5780 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5781 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5782 config_update
= db_nsr
["configurationStatus"]
5783 for vca_index
, vca
in enumerate(vca_update
):
5785 (vca
or vca
.get("ee_id"))
5786 and vca
["member-vnf-index"] == member_vnf_index
5787 and vca
["vdu_count_index"] == vdu_index
5789 if vca
.get("vdu_id"):
5790 config_descriptor
= get_configuration(
5791 db_vnfd
, vca
.get("vdu_id")
5793 elif vca
.get("kdu_name"):
5794 config_descriptor
= get_configuration(
5795 db_vnfd
, vca
.get("kdu_name")
5798 config_descriptor
= get_configuration(
5799 db_vnfd
, db_vnfd
["id"]
5801 operation_params
= (
5802 db_nslcmop
.get("operationParams") or {}
5804 exec_terminate_primitives
= not operation_params
.get(
5805 "skip_terminate_primitives"
5806 ) and vca
.get("needed_terminate")
5807 task
= asyncio
.ensure_future(
5816 exec_primitives
=exec_terminate_primitives
,
5820 timeout
=self
.timeout_charm_delete
,
5823 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5826 del vca_update
[vca_index
]
5827 del config_update
[vca_index
]
5828 # wait for pending tasks of terminate primitives
5832 + "Waiting for tasks {}".format(
5833 list(tasks_dict_info
.keys())
5836 error_list
= await self
._wait
_for
_tasks
(
5840 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5845 tasks_dict_info
.clear()
5847 raise LcmException("; ".join(error_list
))
5849 db_vca_and_config_update
= {
5850 "_admin.deployed.VCA": vca_update
,
5851 "configurationStatus": config_update
,
5854 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5856 scale_process
= None
5857 # SCALE-IN VCA - END
5860 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5861 scale_process
= "RO"
5862 if self
.ro_config
.get("ng"):
5863 await self
._scale
_ng
_ro
(
5864 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5866 scaling_info
.pop("vdu-create", None)
5867 scaling_info
.pop("vdu-delete", None)
5869 scale_process
= None
5873 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5874 scale_process
= "KDU"
5875 await self
._scale
_kdu
(
5876 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5878 scaling_info
.pop("kdu-create", None)
5879 scaling_info
.pop("kdu-delete", None)
5881 scale_process
= None
5885 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5887 # SCALE-UP VCA - BEGIN
5888 if vca_scaling_info
:
5889 step
= db_nslcmop_update
[
5891 ] = "Creating new execution environments"
5892 scale_process
= "VCA"
5893 for vca_info
in vca_scaling_info
:
5894 if vca_info
["type"] == "create":
5895 member_vnf_index
= str(vca_info
["member-vnf-index"])
5897 logging_text
+ "vdu info: {}".format(vca_info
)
5899 vnfd_id
= db_vnfr
["vnfd-ref"]
5900 if vca_info
.get("osm_vdu_id"):
5901 vdu_index
= int(vca_info
["vdu_index"])
5902 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5903 if db_vnfr
.get("additionalParamsForVnf"):
5904 deploy_params
.update(
5906 db_vnfr
["additionalParamsForVnf"].copy()
5909 descriptor_config
= get_configuration(
5910 db_vnfd
, db_vnfd
["id"]
5912 if descriptor_config
:
5917 logging_text
=logging_text
5918 + "member_vnf_index={} ".format(member_vnf_index
),
5921 nslcmop_id
=nslcmop_id
,
5927 member_vnf_index
=member_vnf_index
,
5928 vdu_index
=vdu_index
,
5930 deploy_params
=deploy_params
,
5931 descriptor_config
=descriptor_config
,
5932 base_folder
=base_folder
,
5933 task_instantiation_info
=tasks_dict_info
,
5936 vdu_id
= vca_info
["osm_vdu_id"]
5937 vdur
= find_in_list(
5938 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5940 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5941 if vdur
.get("additionalParams"):
5942 deploy_params_vdu
= parse_yaml_strings(
5943 vdur
["additionalParams"]
5946 deploy_params_vdu
= deploy_params
5947 deploy_params_vdu
["OSM"] = get_osm_params(
5948 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5950 if descriptor_config
:
5955 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5956 member_vnf_index
, vdu_id
, vdu_index
5958 stage
[2] = step
= "Scaling out VCA"
5959 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5961 logging_text
=logging_text
5962 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5963 member_vnf_index
, vdu_id
, vdu_index
5967 nslcmop_id
=nslcmop_id
,
5973 member_vnf_index
=member_vnf_index
,
5974 vdu_index
=vdu_index
,
5976 deploy_params
=deploy_params_vdu
,
5977 descriptor_config
=descriptor_config
,
5978 base_folder
=base_folder
,
5979 task_instantiation_info
=tasks_dict_info
,
5983 kdu_name
= vca_info
["osm_kdu_id"]
5984 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5985 if descriptor_config
:
5987 kdu_index
= int(vca_info
["kdu_index"])
5991 for x
in db_vnfr
["kdur"]
5992 if x
["kdu-name"] == kdu_name
5994 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5995 if kdur
.get("additionalParams"):
5996 deploy_params_kdu
= parse_yaml_strings(
5997 kdur
["additionalParams"]
6001 logging_text
=logging_text
,
6004 nslcmop_id
=nslcmop_id
,
6010 member_vnf_index
=member_vnf_index
,
6011 vdu_index
=kdu_index
,
6013 deploy_params
=deploy_params_kdu
,
6014 descriptor_config
=descriptor_config
,
6015 base_folder
=base_folder
,
6016 task_instantiation_info
=tasks_dict_info
,
6019 # SCALE-UP VCA - END
6020 scale_process
= None
6023 # execute primitive service POST-SCALING
6024 step
= "Executing post-scale vnf-config-primitive"
6025 if scaling_descriptor
.get("scaling-config-action"):
6026 for scaling_config_action
in scaling_descriptor
[
6027 "scaling-config-action"
6030 scaling_config_action
.get("trigger") == "post-scale-in"
6031 and scaling_type
== "SCALE_IN"
6033 scaling_config_action
.get("trigger") == "post-scale-out"
6034 and scaling_type
== "SCALE_OUT"
6036 vnf_config_primitive
= scaling_config_action
[
6037 "vnf-config-primitive-name-ref"
6039 step
= db_nslcmop_update
[
6041 ] = "executing post-scale scaling-config-action '{}'".format(
6042 vnf_config_primitive
6045 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6046 if db_vnfr
.get("additionalParamsForVnf"):
6047 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6049 # look for primitive
6050 for config_primitive
in (
6051 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6052 ).get("config-primitive", ()):
6053 if config_primitive
["name"] == vnf_config_primitive
:
6057 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6058 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6059 "config-primitive".format(
6060 scaling_group
, vnf_config_primitive
6063 scale_process
= "VCA"
6064 db_nsr_update
["config-status"] = "configuring post-scaling"
6065 primitive_params
= self
._map
_primitive
_params
(
6066 config_primitive
, {}, vnfr_params
6069 # Post-scale retry check: Check if this sub-operation has been executed before
6070 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6073 vnf_config_primitive
,
6077 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6078 # Skip sub-operation
6079 result
= "COMPLETED"
6080 result_detail
= "Done"
6083 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6084 vnf_config_primitive
, result
, result_detail
6088 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6089 # New sub-operation: Get index of this sub-operation
6091 len(db_nslcmop
.get("_admin", {}).get("operations"))
6096 + "vnf_config_primitive={} New sub-operation".format(
6097 vnf_config_primitive
6101 # retry: Get registered params for this existing sub-operation
6102 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6105 vnf_index
= op
.get("member_vnf_index")
6106 vnf_config_primitive
= op
.get("primitive")
6107 primitive_params
= op
.get("primitive_params")
6110 + "vnf_config_primitive={} Sub-operation retry".format(
6111 vnf_config_primitive
6114 # Execute the primitive, either with new (first-time) or registered (reintent) args
6115 ee_descriptor_id
= config_primitive
.get(
6116 "execution-environment-ref"
6118 primitive_name
= config_primitive
.get(
6119 "execution-environment-primitive", vnf_config_primitive
6121 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6122 nsr_deployed
["VCA"],
6123 member_vnf_index
=vnf_index
,
6125 vdu_count_index
=None,
6126 ee_descriptor_id
=ee_descriptor_id
,
6128 result
, result_detail
= await self
._ns
_execute
_primitive
(
6137 + "vnf_config_primitive={} Done with result {} {}".format(
6138 vnf_config_primitive
, result
, result_detail
6141 # Update operationState = COMPLETED | FAILED
6142 self
._update
_suboperation
_status
(
6143 db_nslcmop
, op_index
, result
, result_detail
6146 if result
== "FAILED":
6147 raise LcmException(result_detail
)
6148 db_nsr_update
["config-status"] = old_config_status
6149 scale_process
= None
6154 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6155 db_nsr_update
["operational-status"] = (
6157 if old_operational_status
== "failed"
6158 else old_operational_status
6160 db_nsr_update
["config-status"] = old_config_status
6163 ROclient
.ROClientException
,
6168 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6170 except asyncio
.CancelledError
:
6172 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6174 exc
= "Operation was cancelled"
6175 except Exception as e
:
6176 exc
= traceback
.format_exc()
6177 self
.logger
.critical(
6178 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6182 self
._write
_ns
_status
(
6185 current_operation
="IDLE",
6186 current_operation_id
=None,
6189 stage
[1] = "Waiting for instantiate pending tasks."
6190 self
.logger
.debug(logging_text
+ stage
[1])
6191 exc
= await self
._wait
_for
_tasks
(
6194 self
.timeout_ns_deploy
,
6202 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6203 nslcmop_operation_state
= "FAILED"
6205 db_nsr_update
["operational-status"] = old_operational_status
6206 db_nsr_update
["config-status"] = old_config_status
6207 db_nsr_update
["detailed-status"] = ""
6209 if "VCA" in scale_process
:
6210 db_nsr_update
["config-status"] = "failed"
6211 if "RO" in scale_process
:
6212 db_nsr_update
["operational-status"] = "failed"
6215 ] = "FAILED scaling nslcmop={} {}: {}".format(
6216 nslcmop_id
, step
, exc
6219 error_description_nslcmop
= None
6220 nslcmop_operation_state
= "COMPLETED"
6221 db_nslcmop_update
["detailed-status"] = "Done"
6223 self
._write
_op
_status
(
6226 error_message
=error_description_nslcmop
,
6227 operation_state
=nslcmop_operation_state
,
6228 other_update
=db_nslcmop_update
,
6231 self
._write
_ns
_status
(
6234 current_operation
="IDLE",
6235 current_operation_id
=None,
6236 other_update
=db_nsr_update
,
6239 if nslcmop_operation_state
:
6243 "nslcmop_id": nslcmop_id
,
6244 "operationState": nslcmop_operation_state
,
6246 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6247 except Exception as e
:
6249 logging_text
+ "kafka_write notification Exception {}".format(e
)
6251 self
.logger
.debug(logging_text
+ "Exit")
6252 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale, one after another, the KDU resources listed in scaling_info

    For every scaling entry the K8s connector selected by its "k8s-cluster-type"
    is asked to scale "resource-name" of the deployed kdu-instance to "scale".
    When the KDU has a configuration and get_juju_ee_ref returns None for it, the
    "terminate-config-primitive" list is executed before a "delete" entry is
    scaled and the "initial-config-primitive" list after a "create" entry is
    scaled, both ordered by their "seq".

    :param logging_text: prefix used in the log messages
    :param nsr_id: nsr _id, stored in the db_dict handed to the connector
    :param nsr_deployed: deployed information where the KDU is looked up
    :param db_vnfd: vnfd content, read to obtain the KDU configuration
    :param vca_id: VCA identifier forwarded to the K8s connector
    :param scaling_info: dict with "kdu-create" or "kdu-delete", each of them a
        dict {kdu_name: [scaling entry, ...]}. If both are present only
        "kdu-create" is processed; if none is present nothing is done
    :return: None
    """
    # a missing/empty group must not end up iterating over None
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name, kdu_scaling_list in _scaling_info.items():
        for kdu_scaling_info in kdu_scaling_list:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("terminate-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    # sorted() returns a new list: the descriptor held by the
                    # caller is left untouched
                    terminate_config_primitive_list = sorted(
                        kdu_config.get("terminate-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for terminate_config_primitive in terminate_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            terminate_config_primitive, {}, {}
                        )
                        step = "execute terminate config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=terminate_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                vca_id=vca_id,
                            ),
                            timeout=600,
                        )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    kdu_scaling_info["resource-name"],
                    vca_id=vca_id,
                ),
                timeout=self.timeout_vca_on_error,
            )

            if kdu_scaling_info["type"] == "create":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("initial-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    # same as above: order by "seq" without altering db_vnfd
                    initial_config_primitive_list = sorted(
                        kdu_config.get("initial-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for initial_config_primitive in initial_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            initial_config_primitive, {}, {}
                        )
                        step = "execute initial config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=initial_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                vca_id=vca_id,
                            ),
                            timeout=600,
                        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a vdu scaling through NG-RO

    Reads the nsd, all the vnfrs of the ns and their vnfds from the database,
    updates db_vnfr with the vdus to create/delete (scale_vnfr) and calls
    _instantiate_ng_ro with the resulting records. When there are vdus to
    delete, scale_vnfr is called again after it with mark_delete=False.

    :param logging_text: prefix used in the log messages
    :param db_nsr: nsr content; its "nsd-id" selects the nsd to read
    :param db_nslcmop: operation content; its "nsInstanceId" is the nsr id
    :param db_vnfr: vnfr being scaled; it replaces the copy read from database
    :param vdu_scaling_info: dict with the optional "vdu-create"/"vdu-delete"
    :param stage: operation stage, forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # "_id" of the vnfds already stored at db_vnfds
    read_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db. The check is done against
        # the "_id" used at the query, so that a vnfd shared by several vnfs
        # is read and stored only once
        if vnfd_id not in read_vnfd_ids:
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)
            read_vnfd_ids.add(vnfd_id)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self,
    ee_id,
    artifact_path,
    ee_config_descriptor,
    vnfr_id,
    nsr_id,
    target_ip,
):
    """
    Generate the prometheus scrape jobs of an execution environment

    The first file of the artifact folder named 'prometheus*.j2' is read and
    handed to parse_job together with the variables JOB_NAME, TARGET_IP,
    EXPORTER_POD_IP and EXPORTER_POD_PORT.

    :param ee_id: execution environment id; the text after the first "." is
        used as service name
    :param artifact_path: folder, inside self.fs, where the file is looked for
    :param ee_config_descriptor: its "metric-service" completes the host name
    :param vnfr_id: vnfr id; it is used, without dashes, for the job names
    :param nsr_id: nsr id, added as metadata to every job
    :param target_ip: value for the TARGET_IP variable
    :return: the jobs returned by parse_job, each with "nsr_id" and "vnfr_id"
        added; None when the folder contains no 'prometheus*.j2' file
    """
    # look if exist a file called 'prometheus*.j2' and take the first one
    job_file = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            job_file = file_name
            break
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as template_file:
        job_data = template_file.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        job_name = job.get("job_name")
        if not (isinstance(job_name, str) and vnfr_id in job_name):
            # name not usable: generate one from the vnfr_id
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA Cloud and the VCA Cloud Credentials set in a VIM account

    Both values come from the "config" of the account ("vca_cloud" and
    "vca_cloud_credential" keys); a key that is not present is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s Cloud and the VCA K8s Cloud Credentials set in a VIM account

    Both values come from the "config" of the account ("vca_k8s_cloud" and
    "vca_k8s_cloud_credential" keys); a key that is not present is returned as
    None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential