1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
65 from osm_lcm
.data_utils
.nsd
import (
66 get_ns_configuration_relation_list
,
70 from osm_lcm
.data_utils
.vnfd
import (
76 get_ee_sorted_initial_config_primitive_list
,
77 get_ee_sorted_terminate_config_primitive_list
,
79 get_virtual_link_profiles
,
84 get_number_of_instances
,
86 get_kdu_resource_profile
,
87 find_software_version
,
90 from osm_lcm
.data_utils
.list_utils
import find_in_list
91 from osm_lcm
.data_utils
.vnfr
import (
95 get_volumes_from_instantiation_params
,
97 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
98 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
99 from n2vc
.definitions
import RelationEndpoint
100 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
101 from n2vc
.k8s_juju_conn
import K8sJujuConnector
103 from osm_common
.dbbase
import DbException
104 from osm_common
.fsbase
import FsException
106 from osm_lcm
.data_utils
.database
.database
import Database
107 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
108 from osm_lcm
.data_utils
.wim
import (
110 get_target_wim_attrs
,
111 select_feasible_wim_account
,
114 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
115 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
117 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
118 from osm_lcm
.osm_config
import OsmConfigBuilder
119 from osm_lcm
.prometheus
import parse_job
121 from copy
import copy
, deepcopy
122 from time
import time
123 from uuid
import uuid4
125 from random
import SystemRandom
127 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
130 class NsLcm(LcmBase
):
131 SUBOPERATION_STATUS_NOT_FOUND
= -1
132 SUBOPERATION_STATUS_NEW
= -2
133 SUBOPERATION_STATUS_SKIP
= -3
134 EE_TLS_NAME
= "ee-tls"
135 task_name_deploy_vca
= "Deploying VCA"
136 rel_operation_types
= {
145 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
):
147 Init, Connect to database, filesystem storage, and messaging
148 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
151 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
153 self
.db
= Database().instance
.db
154 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
.timeout
157 self
.ro_config
= config
.RO
158 self
.vca_config
= config
.VCA
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
163 on_update_db
=self
._on
_update
_n
2vc
_db
,
168 self
.conn_helm_ee
= LCMHelmConn(
170 vca_config
=self
.vca_config
,
171 on_update_db
=self
._on
_update
_n
2vc
_db
,
174 self
.k8sclusterhelm3
= K8sHelm3Connector(
175 kubectl_command
=self
.vca_config
.kubectlpath
,
176 helm_command
=self
.vca_config
.helm3path
,
183 self
.k8sclusterjuju
= K8sJujuConnector(
184 kubectl_command
=self
.vca_config
.kubectlpath
,
185 juju_command
=self
.vca_config
.jujupath
,
187 on_update_db
=self
._on
_update
_k
8s
_db
,
192 self
.k8scluster_map
= {
193 "helm-chart-v3": self
.k8sclusterhelm3
,
194 "chart": self
.k8sclusterhelm3
,
195 "juju-bundle": self
.k8sclusterjuju
,
196 "juju": self
.k8sclusterjuju
,
200 "lxc_proxy_charm": self
.n2vc
,
201 "native_charm": self
.n2vc
,
202 "k8s_proxy_charm": self
.n2vc
,
203 "helm": self
.conn_helm_ee
,
204 "helm-v3": self
.conn_helm_ee
,
208 self
.RO
= NgRoClient(**self
.ro_config
.to_dict())
210 self
.op_status_map
= {
211 "instantiation": self
.RO
.status
,
212 "termination": self
.RO
.status
,
213 "migrate": self
.RO
.status
,
214 "healing": self
.RO
.recreate_status
,
215 "verticalscale": self
.RO
.status
,
216 "start_stop_rebuild": self
.RO
.status
,
def increment_ip_mac(ip_mac, vm_index=1):
    """Return *ip_mac* with its last numeric component increased by *vm_index*.

    Handles IPv4 addresses (last dotted octet, decimal) and IPv6/MAC
    addresses (last colon-separated group, hexadecimal, zero-padded to
    its original width: 2 for MAC, 4 for IPv6). Non-string inputs are
    returned unchanged; strings with no dot or colon yield None.
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # IPv4 case: bump the decimal value after the last dot
        dot_pos = ip_mac.rfind(".")
        if dot_pos > 0:
            head, tail = ip_mac[: dot_pos + 1], ip_mac[dot_pos + 1 :]
            return f"{head}{int(tail) + vm_index}"
        # IPv6 / MAC case: bump the hex value after the last colon,
        # preserving the original zero-padded field width
        colon_pos = ip_mac.rfind(":")
        if colon_pos > 0:
            head, tail = ip_mac[: colon_pos + 1], ip_mac[colon_pos + 1 :]
            return f"{head}{int(tail, 16) + vm_index:0{len(tail)}x}"
    except Exception:
        # unparsable component: fall through to the None return below
        pass
    return None
241 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
242 # remove last dot from path (if exists)
243 if path
.endswith("."):
246 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
247 # .format(table, filter, path, updated_data))
249 nsr_id
= filter.get("_id")
251 # read ns record from database
252 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
253 current_ns_status
= nsr
.get("nsState")
255 # get vca status for NS
256 status_dict
= await self
.n2vc
.get_status(
257 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
262 db_dict
["vcaStatus"] = status_dict
264 # update configurationStatus for this VCA
266 vca_index
= int(path
[path
.rfind(".") + 1 :])
269 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
271 vca_status
= vca_list
[vca_index
].get("status")
273 configuration_status_list
= nsr
.get("configurationStatus")
274 config_status
= configuration_status_list
[vca_index
].get("status")
276 if config_status
== "BROKEN" and vca_status
!= "failed":
277 db_dict
["configurationStatus"][vca_index
] = "READY"
278 elif config_status
!= "BROKEN" and vca_status
== "failed":
279 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
280 except Exception as e
:
281 # not update configurationStatus
282 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
284 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
285 # if nsState = 'DEGRADED' check if all is OK
287 if current_ns_status
in ("READY", "DEGRADED"):
288 error_description
= ""
290 if status_dict
.get("machines"):
291 for machine_id
in status_dict
.get("machines"):
292 machine
= status_dict
.get("machines").get(machine_id
)
293 # check machine agent-status
294 if machine
.get("agent-status"):
295 s
= machine
.get("agent-status").get("status")
298 error_description
+= (
299 "machine {} agent-status={} ; ".format(
303 # check machine instance status
304 if machine
.get("instance-status"):
305 s
= machine
.get("instance-status").get("status")
308 error_description
+= (
309 "machine {} instance-status={} ; ".format(
314 if status_dict
.get("applications"):
315 for app_id
in status_dict
.get("applications"):
316 app
= status_dict
.get("applications").get(app_id
)
317 # check application status
318 if app
.get("status"):
319 s
= app
.get("status").get("status")
322 error_description
+= (
323 "application {} status={} ; ".format(app_id
, s
)
326 if error_description
:
327 db_dict
["errorDescription"] = error_description
328 if current_ns_status
== "READY" and is_degraded
:
329 db_dict
["nsState"] = "DEGRADED"
330 if current_ns_status
== "DEGRADED" and not is_degraded
:
331 db_dict
["nsState"] = "READY"
334 self
.update_db_2("nsrs", nsr_id
, db_dict
)
336 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
338 except Exception as e
:
339 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
341 async def _on_update_k8s_db(
342 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
345 Updating vca status in NSR record
346 :param cluster_uuid: UUID of a k8s cluster
347 :param kdu_instance: The unique name of the KDU instance
348 :param filter: To get nsr_id
349 :cluster_type: The cluster type (juju, k8s)
353 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
354 # .format(cluster_uuid, kdu_instance, filter))
356 nsr_id
= filter.get("_id")
358 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
359 cluster_uuid
=cluster_uuid
,
360 kdu_instance
=kdu_instance
,
362 complete_status
=True,
368 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
371 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
375 self
.update_db_2("nsrs", nsr_id
, db_dict
)
376 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
378 except Exception as e
:
379 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
382 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
385 undefined
=StrictUndefined
,
386 autoescape
=select_autoescape(default_for_string
=True, default
=True),
388 template
= env
.from_string(cloud_init_text
)
389 return template
.render(additional_params
or {})
390 except UndefinedError
as e
:
392 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
393 "file, must be provided in the instantiation parameters inside the "
394 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
396 except (TemplateError
, TemplateNotFound
) as e
:
398 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
403 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
404 cloud_init_content
= cloud_init_file
= None
406 if vdu
.get("cloud-init-file"):
407 base_folder
= vnfd
["_admin"]["storage"]
408 if base_folder
["pkg-dir"]:
409 cloud_init_file
= "{}/{}/cloud_init/{}".format(
410 base_folder
["folder"],
411 base_folder
["pkg-dir"],
412 vdu
["cloud-init-file"],
415 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
416 base_folder
["folder"],
417 vdu
["cloud-init-file"],
419 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
420 cloud_init_content
= ci_file
.read()
421 elif vdu
.get("cloud-init"):
422 cloud_init_content
= vdu
["cloud-init"]
424 return cloud_init_content
425 except FsException
as e
:
427 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
428 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed additionalParams of the vdur matching *vdu_id*.

    Picks the first vdur in *db_vnfr* whose vdu-id-ref equals *vdu_id*
    (an empty dict when none matches) and runs its additionalParams
    through parse_yaml_strings.
    """
    matching_vdur = next(
        (entry for entry in db_vnfr.get("vdur") if entry["vdu-id-ref"] == vdu_id),
        {},
    )
    return parse_yaml_strings(matching_vdur.get("additionalParams"))
def ip_profile_2_RO(ip_profile):
    """Translate an OSM ip-profile dict into the key names the RO expects.

    Renames dns-server -> dns-address (flattening a list of
    {"address": ...} dicts into a plain address list), normalizes
    ip-version casing (ipv4/ipv6 -> IPv4/IPv6), and renames
    dhcp-params -> dhcp. The input dict is deep-copied, never mutated.
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns = RO_ip_profile.pop("dns-server")
        if isinstance(dns, list):
            RO_ip_profile["dns-address"] = [entry["address"] for entry in dns]
        else:
            RO_ip_profile["dns-address"] = dns
    # normalize version casing; at most one of the two keys can match
    version_map = {"ipv4": "IPv4", "ipv6": "IPv6"}
    current_version = RO_ip_profile.get("ip-version")
    if current_version in version_map:
        RO_ip_profile["ip-version"] = version_map[current_version]
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
457 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
458 db_vdu_push_list
= []
460 db_update
= {"_admin.modified": time()}
462 for vdu_id
, vdu_count
in vdu_create
.items():
466 for vdur
in reversed(db_vnfr
["vdur"])
467 if vdur
["vdu-id-ref"] == vdu_id
472 # Read the template saved in the db:
474 "No vdur in the database. Using the vdur-template to scale"
476 vdur_template
= db_vnfr
.get("vdur-template")
477 if not vdur_template
:
479 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
483 vdur
= vdur_template
[0]
484 # Delete a template from the database after using it
487 {"_id": db_vnfr
["_id"]},
489 pull
={"vdur-template": {"_id": vdur
["_id"]}},
491 for count
in range(vdu_count
):
492 vdur_copy
= deepcopy(vdur
)
493 vdur_copy
["status"] = "BUILD"
494 vdur_copy
["status-detailed"] = None
495 vdur_copy
["ip-address"] = None
496 vdur_copy
["_id"] = str(uuid4())
497 vdur_copy
["count-index"] += count
+ 1
498 vdur_copy
["id"] = "{}-{}".format(
499 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
501 vdur_copy
.pop("vim_info", None)
502 for iface
in vdur_copy
["interfaces"]:
503 if iface
.get("fixed-ip"):
504 iface
["ip-address"] = self
.increment_ip_mac(
505 iface
["ip-address"], count
+ 1
508 iface
.pop("ip-address", None)
509 if iface
.get("fixed-mac"):
510 iface
["mac-address"] = self
.increment_ip_mac(
511 iface
["mac-address"], count
+ 1
514 iface
.pop("mac-address", None)
518 ) # only first vdu can be managment of vnf
519 db_vdu_push_list
.append(vdur_copy
)
520 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
522 if len(db_vnfr
["vdur"]) == 1:
523 # The scale will move to 0 instances
525 "Scaling to 0 !, creating the template with the last vdur"
527 template_vdur
= [db_vnfr
["vdur"][0]]
528 for vdu_id
, vdu_count
in vdu_delete
.items():
530 indexes_to_delete
= [
532 for iv
in enumerate(db_vnfr
["vdur"])
533 if iv
[1]["vdu-id-ref"] == vdu_id
537 "vdur.{}.status".format(i
): "DELETING"
538 for i
in indexes_to_delete
[-vdu_count
:]
542 # it must be deleted one by one because common.db does not allow otherwise
545 for v
in reversed(db_vnfr
["vdur"])
546 if v
["vdu-id-ref"] == vdu_id
548 for vdu
in vdus_to_delete
[:vdu_count
]:
551 {"_id": db_vnfr
["_id"]},
553 pull
={"vdur": {"_id": vdu
["_id"]}},
557 db_push
["vdur"] = db_vdu_push_list
559 db_push
["vdur-template"] = template_vdur
562 db_vnfr
["vdur-template"] = template_vdur
563 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
564 # modify passed dictionary db_vnfr
565 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
566 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
568 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
570 Updates database nsr with the RO info for the created vld
571 :param ns_update_nsr: dictionary to be filled with the updated info
572 :param db_nsr: content of db_nsr. This is also modified
573 :param nsr_desc_RO: nsr descriptor from RO
574 :return: Nothing, LcmException is raised on errors
577 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
578 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
579 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
581 vld
["vim-id"] = net_RO
.get("vim_net_id")
582 vld
["name"] = net_RO
.get("vim_name")
583 vld
["status"] = net_RO
.get("status")
584 vld
["status-detailed"] = net_RO
.get("error_msg")
585 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
589 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
592 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
594 for db_vnfr
in db_vnfrs
.values():
595 vnfr_update
= {"status": "ERROR"}
596 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
597 if "status" not in vdur
:
598 vdur
["status"] = "ERROR"
599 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
601 vdur
["status-detailed"] = str(error_text
)
603 "vdur.{}.status-detailed".format(vdu_index
)
605 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
606 except DbException
as e
:
607 self
.logger
.error("Cannot update vnf. {}".format(e
))
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        member_index = vca["member-vnf-index"]
        if not member_index:
            # skip NS-level charms: they have no member-vnf-index
            continue
        if vca["vdu_id"]:
            # VDU-level charm: key is member-index.vdu-id.replica-index
            key = "{}.{}.{}".format(
                member_index, vca["vdu_id"], vca["vdu_count_index"]
            )
        else:
            # VNF-level charm: key is just the member index
            key = member_index
        mapping[key] = vca["application"]
    return ns_config_info
634 async def _instantiate_ng_ro(
650 def get_vim_account(vim_account_id
):
652 if vim_account_id
in db_vims
:
653 return db_vims
[vim_account_id
]
654 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
655 db_vims
[vim_account_id
] = db_vim
658 # modify target_vld info with instantiation parameters
659 def parse_vld_instantiation_params(
660 target_vim
, target_vld
, vld_params
, target_sdn
662 if vld_params
.get("ip-profile"):
663 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
664 vld_params
["ip-profile"]
666 if vld_params
.get("provider-network"):
667 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
670 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
671 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
675 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
676 # if wim_account_id is specified in vld_params, validate if it is feasible.
677 wim_account_id
, db_wim
= select_feasible_wim_account(
678 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
682 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
683 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
684 # update vld_params with correct WIM account Id
685 vld_params
["wimAccountId"] = wim_account_id
687 target_wim
= "wim:{}".format(wim_account_id
)
688 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
689 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
690 if len(sdn_ports
) > 0:
691 target_vld
["vim_info"][target_wim
] = target_wim_attrs
692 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
695 "Target VLD with WIM data: {:s}".format(str(target_vld
))
698 for param
in ("vim-network-name", "vim-network-id"):
699 if vld_params
.get(param
):
700 if isinstance(vld_params
[param
], dict):
701 for vim
, vim_net
in vld_params
[param
].items():
702 other_target_vim
= "vim:" + vim
704 target_vld
["vim_info"],
705 (other_target_vim
, param
.replace("-", "_")),
708 else: # isinstance str
709 target_vld
["vim_info"][target_vim
][
710 param
.replace("-", "_")
711 ] = vld_params
[param
]
712 if vld_params
.get("common_id"):
713 target_vld
["common_id"] = vld_params
.get("common_id")
715 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
716 def update_ns_vld_target(target
, ns_params
):
717 for vnf_params
in ns_params
.get("vnf", ()):
718 if vnf_params
.get("vimAccountId"):
722 for vnfr
in db_vnfrs
.values()
723 if vnf_params
["member-vnf-index"]
724 == vnfr
["member-vnf-index-ref"]
728 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
731 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
732 target_vld
= find_in_list(
733 get_iterable(vdur
, "interfaces"),
734 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
737 vld_params
= find_in_list(
738 get_iterable(ns_params
, "vld"),
739 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
742 if vnf_params
.get("vimAccountId") not in a_vld
.get(
745 target_vim_network_list
= [
746 v
for _
, v
in a_vld
.get("vim_info").items()
748 target_vim_network_name
= next(
750 item
.get("vim_network_name", "")
751 for item
in target_vim_network_list
756 target
["ns"]["vld"][a_index
].get("vim_info").update(
758 "vim:{}".format(vnf_params
["vimAccountId"]): {
759 "vim_network_name": target_vim_network_name
,
765 for param
in ("vim-network-name", "vim-network-id"):
766 if vld_params
.get(param
) and isinstance(
767 vld_params
[param
], dict
769 for vim
, vim_net
in vld_params
[
772 other_target_vim
= "vim:" + vim
774 target
["ns"]["vld"][a_index
].get(
779 param
.replace("-", "_"),
784 nslcmop_id
= db_nslcmop
["_id"]
786 "name": db_nsr
["name"],
789 "image": deepcopy(db_nsr
["image"]),
790 "flavor": deepcopy(db_nsr
["flavor"]),
791 "action_id": nslcmop_id
,
792 "cloud_init_content": {},
794 for image
in target
["image"]:
795 image
["vim_info"] = {}
796 for flavor
in target
["flavor"]:
797 flavor
["vim_info"] = {}
798 if db_nsr
.get("shared-volumes"):
799 target
["shared-volumes"] = deepcopy(db_nsr
["shared-volumes"])
800 for shared_volumes
in target
["shared-volumes"]:
801 shared_volumes
["vim_info"] = {}
802 if db_nsr
.get("affinity-or-anti-affinity-group"):
803 target
["affinity-or-anti-affinity-group"] = deepcopy(
804 db_nsr
["affinity-or-anti-affinity-group"]
806 for affinity_or_anti_affinity_group
in target
[
807 "affinity-or-anti-affinity-group"
809 affinity_or_anti_affinity_group
["vim_info"] = {}
811 if db_nslcmop
.get("lcmOperationType") != "instantiate":
812 # get parameters of instantiation:
813 db_nslcmop_instantiate
= self
.db
.get_list(
816 "nsInstanceId": db_nslcmop
["nsInstanceId"],
817 "lcmOperationType": "instantiate",
820 ns_params
= db_nslcmop_instantiate
.get("operationParams")
822 ns_params
= db_nslcmop
.get("operationParams")
823 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
824 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
827 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
828 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
832 "mgmt-network": vld
.get("mgmt-network", False),
833 "type": vld
.get("type"),
836 "vim_network_name": vld
.get("vim-network-name"),
837 "vim_account_id": ns_params
["vimAccountId"],
841 # check if this network needs SDN assist
842 if vld
.get("pci-interfaces"):
843 db_vim
= get_vim_account(ns_params
["vimAccountId"])
844 if vim_config
:= db_vim
.get("config"):
845 if sdnc_id
:= vim_config
.get("sdn-controller"):
846 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
847 target_sdn
= "sdn:{}".format(sdnc_id
)
848 target_vld
["vim_info"][target_sdn
] = {
850 "target_vim": target_vim
,
852 "type": vld
.get("type"),
855 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
856 for nsd_vnf_profile
in nsd_vnf_profiles
:
857 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
858 if cp
["virtual-link-profile-id"] == vld
["id"]:
860 "member_vnf:{}.{}".format(
861 cp
["constituent-cpd-id"][0][
862 "constituent-base-element-id"
864 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
866 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
868 # check at nsd descriptor, if there is an ip-profile
870 nsd_vlp
= find_in_list(
871 get_virtual_link_profiles(nsd
),
872 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
877 and nsd_vlp
.get("virtual-link-protocol-data")
878 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
880 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
884 # update vld_params with instantiation params
885 vld_instantiation_params
= find_in_list(
886 get_iterable(ns_params
, "vld"),
887 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
889 if vld_instantiation_params
:
890 vld_params
.update(vld_instantiation_params
)
891 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
892 target
["ns"]["vld"].append(target_vld
)
893 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
894 update_ns_vld_target(target
, ns_params
)
896 for vnfr
in db_vnfrs
.values():
898 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
900 vnf_params
= find_in_list(
901 get_iterable(ns_params
, "vnf"),
902 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
904 target_vnf
= deepcopy(vnfr
)
905 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
906 for vld
in target_vnf
.get("vld", ()):
907 # check if connected to a ns.vld, to fill target'
908 vnf_cp
= find_in_list(
909 vnfd
.get("int-virtual-link-desc", ()),
910 lambda cpd
: cpd
.get("id") == vld
["id"],
913 ns_cp
= "member_vnf:{}.{}".format(
914 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
916 if cp2target
.get(ns_cp
):
917 vld
["target"] = cp2target
[ns_cp
]
920 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
922 # check if this network needs SDN assist
924 if vld
.get("pci-interfaces"):
925 db_vim
= get_vim_account(vnfr
["vim-account-id"])
926 sdnc_id
= db_vim
["config"].get("sdn-controller")
928 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
929 target_sdn
= "sdn:{}".format(sdnc_id
)
930 vld
["vim_info"][target_sdn
] = {
932 "target_vim": target_vim
,
934 "type": vld
.get("type"),
937 # check at vnfd descriptor, if there is an ip-profile
939 vnfd_vlp
= find_in_list(
940 get_virtual_link_profiles(vnfd
),
941 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
945 and vnfd_vlp
.get("virtual-link-protocol-data")
946 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
948 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
951 # update vld_params with instantiation params
953 vld_instantiation_params
= find_in_list(
954 get_iterable(vnf_params
, "internal-vld"),
955 lambda i_vld
: i_vld
["name"] == vld
["id"],
957 if vld_instantiation_params
:
958 vld_params
.update(vld_instantiation_params
)
959 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
962 for vdur
in target_vnf
.get("vdur", ()):
963 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
964 continue # This vdu must not be created
965 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
967 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
970 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
971 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
974 and vdu_configuration
.get("config-access")
975 and vdu_configuration
.get("config-access").get("ssh-access")
977 vdur
["ssh-keys"] = ssh_keys_all
978 vdur
["ssh-access-required"] = vdu_configuration
[
980 ]["ssh-access"]["required"]
983 and vnf_configuration
.get("config-access")
984 and vnf_configuration
.get("config-access").get("ssh-access")
985 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
987 vdur
["ssh-keys"] = ssh_keys_all
988 vdur
["ssh-access-required"] = vnf_configuration
[
990 ]["ssh-access"]["required"]
991 elif ssh_keys_instantiation
and find_in_list(
992 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
994 vdur
["ssh-keys"] = ssh_keys_instantiation
996 self
.logger
.debug("NS > vdur > {}".format(vdur
))
998 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1000 if vdud
.get("cloud-init-file"):
1001 vdur
["cloud-init"] = "{}:file:{}".format(
1002 vnfd
["_id"], vdud
.get("cloud-init-file")
1004 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1005 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1006 base_folder
= vnfd
["_admin"]["storage"]
1007 if base_folder
["pkg-dir"]:
1008 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1009 base_folder
["folder"],
1010 base_folder
["pkg-dir"],
1011 vdud
.get("cloud-init-file"),
1014 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1015 base_folder
["folder"],
1016 vdud
.get("cloud-init-file"),
1018 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1019 target
["cloud_init_content"][
1022 elif vdud
.get("cloud-init"):
1023 vdur
["cloud-init"] = "{}:vdu:{}".format(
1024 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1026 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1027 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1030 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1031 deploy_params_vdu
= self
._format
_additional
_params
(
1032 vdur
.get("additionalParams") or {}
1034 deploy_params_vdu
["OSM"] = get_osm_params(
1035 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1037 vdur
["additionalParams"] = deploy_params_vdu
1040 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1041 if target_vim
not in ns_flavor
["vim_info"]:
1042 ns_flavor
["vim_info"][target_vim
] = {}
1045 # in case alternative images are provided we must check if they should be applied
1046 # for the vim_type, modify the vim_type taking into account
1047 ns_image_id
= int(vdur
["ns-image-id"])
1048 if vdur
.get("alt-image-ids"):
1049 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1050 vim_type
= db_vim
["vim_type"]
1051 for alt_image_id
in vdur
.get("alt-image-ids"):
1052 ns_alt_image
= target
["image"][int(alt_image_id
)]
1053 if vim_type
== ns_alt_image
.get("vim-type"):
1054 # must use alternative image
1056 "use alternative image id: {}".format(alt_image_id
)
1058 ns_image_id
= alt_image_id
1059 vdur
["ns-image-id"] = ns_image_id
1061 ns_image
= target
["image"][int(ns_image_id
)]
1062 if target_vim
not in ns_image
["vim_info"]:
1063 ns_image
["vim_info"][target_vim
] = {}
1066 if vdur
.get("affinity-or-anti-affinity-group-id"):
1067 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1068 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1069 if target_vim
not in ns_ags
["vim_info"]:
1070 ns_ags
["vim_info"][target_vim
] = {}
1073 if vdur
.get("shared-volumes-id"):
1074 for sv_id
in vdur
["shared-volumes-id"]:
1075 ns_sv
= find_in_list(
1076 target
["shared-volumes"], lambda sv
: sv_id
in sv
["id"]
1079 ns_sv
["vim_info"][target_vim
] = {}
1081 vdur
["vim_info"] = {target_vim
: {}}
1082 # instantiation parameters
1084 vdu_instantiation_params
= find_in_list(
1085 get_iterable(vnf_params
, "vdu"),
1086 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1088 if vdu_instantiation_params
:
1089 # Parse the vdu_volumes from the instantiation params
1090 vdu_volumes
= get_volumes_from_instantiation_params(
1091 vdu_instantiation_params
, vdud
1093 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1094 vdur
["additionalParams"]["OSM"][
1096 ] = vdu_instantiation_params
.get("vim-flavor-id")
1097 vdur_list
.append(vdur
)
1098 target_vnf
["vdur"] = vdur_list
1099 target
["vnf"].append(target_vnf
)
1101 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1102 desc
= await self
.RO
.deploy(nsr_id
, target
)
1103 self
.logger
.debug("RO return > {}".format(desc
))
1104 action_id
= desc
["action_id"]
1105 await self
._wait
_ng
_ro
(
1112 operation
="instantiation",
1117 "_admin.deployed.RO.operational-status": "running",
1118 "detailed-status": " ".join(stage
),
1120 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1121 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1122 self
._write
_op
_status
(nslcmop_id
, stage
)
1124 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1128 async def _wait_ng_ro(
1138 detailed_status_old
= None
1140 start_time
= start_time
or time()
1141 while time() <= start_time
+ timeout
:
1142 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1143 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1144 if desc_status
["status"] == "FAILED":
1145 raise NgRoException(desc_status
["details"])
1146 elif desc_status
["status"] == "BUILD":
1148 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1149 elif desc_status
["status"] == "DONE":
1151 stage
[2] = "Deployed at VIM"
1154 assert False, "ROclient.check_ns_status returns unknown {}".format(
1155 desc_status
["status"]
1157 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1158 detailed_status_old
= stage
[2]
1159 db_nsr_update
["detailed-status"] = " ".join(stage
)
1160 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1161 self
._write
_op
_status
(nslcmop_id
, stage
)
1162 await asyncio
.sleep(15)
1163 else: # timeout_ns_deploy
1164 raise NgRoException("Timeout waiting ns to deploy")
1166 async def _terminate_ng_ro(
1167 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1172 start_deploy
= time()
1179 "action_id": nslcmop_id
,
1181 desc
= await self
.RO
.deploy(nsr_id
, target
)
1182 action_id
= desc
["action_id"]
1183 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1186 + "ns terminate action at RO. action_id={}".format(action_id
)
1190 delete_timeout
= 20 * 60 # 20 minutes
1191 await self
._wait
_ng
_ro
(
1198 operation
="termination",
1200 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1202 await self
.RO
.delete(nsr_id
)
1203 except NgRoException
as e
:
1204 if e
.http_code
== 404: # not found
1205 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1206 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1208 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1210 elif e
.http_code
== 409: # conflict
1211 failed_detail
.append("delete conflict: {}".format(e
))
1214 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1217 failed_detail
.append("delete error: {}".format(e
))
1220 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1222 except Exception as e
:
1223 failed_detail
.append("delete error: {}".format(e
))
1225 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1229 stage
[2] = "Error deleting from VIM"
1231 stage
[2] = "Deleted from VIM"
1232 db_nsr_update
["detailed-status"] = " ".join(stage
)
1233 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1234 self
._write
_op
_status
(nslcmop_id
, stage
)
1237 raise LcmException("; ".join(failed_detail
))
1240 async def instantiate_RO(
1254 :param logging_text: preffix text to use at logging
1255 :param nsr_id: nsr identity
1256 :param nsd: database content of ns descriptor
1257 :param db_nsr: database content of ns record
1258 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1260 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1261 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1262 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1263 :return: None or exception
1266 start_deploy
= time()
1267 ns_params
= db_nslcmop
.get("operationParams")
1268 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1269 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1271 timeout_ns_deploy
= self
.timeout
.ns_deploy
1273 # Check for and optionally request placement optimization. Database will be updated if placement activated
1274 stage
[2] = "Waiting for Placement."
1275 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1276 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1277 for vnfr
in db_vnfrs
.values():
1278 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1281 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1283 return await self
._instantiate
_ng
_ro
(
1296 except Exception as e
:
1297 stage
[2] = "ERROR deploying at VIM"
1298 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1300 "Error deploying at VIM {}".format(e
),
1301 exc_info
=not isinstance(
1304 ROclient
.ROClientException
,
1313 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1315 Wait for kdu to be up, get ip address
1316 :param logging_text: prefix use for logging
1320 :return: IP address, K8s services
1323 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1326 while nb_tries
< 360:
1327 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1331 for x
in get_iterable(db_vnfr
, "kdur")
1332 if x
.get("kdu-name") == kdu_name
1338 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1340 if kdur
.get("status"):
1341 if kdur
["status"] in ("READY", "ENABLED"):
1342 return kdur
.get("ip-address"), kdur
.get("services")
1345 "target KDU={} is in error state".format(kdu_name
)
1348 await asyncio
.sleep(10)
1350 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1352 async def wait_vm_up_insert_key_ro(
1353 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1356 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1357 :param logging_text: prefix use for logging
1362 :param pub_key: public ssh key to inject, None to skip
1363 :param user: user to apply the public ssh key
1367 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1369 target_vdu_id
= None
1374 if ro_retries
>= 360: # 1 hour
1376 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1379 await asyncio
.sleep(10)
1382 if not target_vdu_id
:
1383 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1385 if not vdu_id
: # for the VNF case
1386 if db_vnfr
.get("status") == "ERROR":
1388 "Cannot inject ssh-key because target VNF is in error state"
1390 ip_address
= db_vnfr
.get("ip-address")
1396 for x
in get_iterable(db_vnfr
, "vdur")
1397 if x
.get("ip-address") == ip_address
1405 for x
in get_iterable(db_vnfr
, "vdur")
1406 if x
.get("vdu-id-ref") == vdu_id
1407 and x
.get("count-index") == vdu_index
1413 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1414 ): # If only one, this should be the target vdu
1415 vdur
= db_vnfr
["vdur"][0]
1418 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1419 vnfr_id
, vdu_id
, vdu_index
1422 # New generation RO stores information at "vim_info"
1425 if vdur
.get("vim_info"):
1427 t
for t
in vdur
["vim_info"]
1428 ) # there should be only one key
1429 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1431 vdur
.get("pdu-type")
1432 or vdur
.get("status") == "ACTIVE"
1433 or ng_ro_status
== "ACTIVE"
1435 ip_address
= vdur
.get("ip-address")
1438 target_vdu_id
= vdur
["vdu-id-ref"]
1439 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1441 "Cannot inject ssh-key because target VM is in error state"
1444 if not target_vdu_id
:
1447 # inject public key into machine
1448 if pub_key
and user
:
1449 self
.logger
.debug(logging_text
+ "Inserting RO key")
1450 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1451 if vdur
.get("pdu-type"):
1452 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1457 "action": "inject_ssh_key",
1461 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1463 desc
= await self
.RO
.deploy(nsr_id
, target
)
1464 action_id
= desc
["action_id"]
1465 await self
._wait
_ng
_ro
(
1466 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1469 except NgRoException
as e
:
1471 "Reaching max tries injecting key. Error: {}".format(e
)
1478 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1480 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1482 my_vca
= vca_deployed_list
[vca_index
]
1483 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1484 # vdu or kdu: no dependencies
1488 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1489 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1490 configuration_status_list
= db_nsr
["configurationStatus"]
1491 for index
, vca_deployed
in enumerate(configuration_status_list
):
1492 if index
== vca_index
:
1495 if not my_vca
.get("member-vnf-index") or (
1496 vca_deployed
.get("member-vnf-index")
1497 == my_vca
.get("member-vnf-index")
1499 internal_status
= configuration_status_list
[index
].get("status")
1500 if internal_status
== "READY":
1502 elif internal_status
== "BROKEN":
1504 "Configuration aborted because dependent charm/s has failed"
1509 # no dependencies, return
1511 await asyncio
.sleep(10)
1514 raise LcmException("Configuration aborted because dependent charm/s timeout")
1516 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1519 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1521 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1522 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1525 async def instantiate_N2VC(
1543 ee_config_descriptor
,
1545 nsr_id
= db_nsr
["_id"]
1546 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1547 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1548 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1549 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1551 "collection": "nsrs",
1552 "filter": {"_id": nsr_id
},
1553 "path": db_update_entry
,
1558 element_under_configuration
= nsr_id
1562 vnfr_id
= db_vnfr
["_id"]
1563 osm_config
["osm"]["vnf_id"] = vnfr_id
1565 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1567 if vca_type
== "native_charm":
1570 index_number
= vdu_index
or 0
1573 element_type
= "VNF"
1574 element_under_configuration
= vnfr_id
1575 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1577 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1578 element_type
= "VDU"
1579 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1580 osm_config
["osm"]["vdu_id"] = vdu_id
1582 namespace
+= ".{}".format(kdu_name
)
1583 element_type
= "KDU"
1584 element_under_configuration
= kdu_name
1585 osm_config
["osm"]["kdu_name"] = kdu_name
1588 if base_folder
["pkg-dir"]:
1589 artifact_path
= "{}/{}/{}/{}".format(
1590 base_folder
["folder"],
1591 base_folder
["pkg-dir"],
1594 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1599 artifact_path
= "{}/Scripts/{}/{}/".format(
1600 base_folder
["folder"],
1603 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1608 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1610 # get initial_config_primitive_list that applies to this element
1611 initial_config_primitive_list
= config_descriptor
.get(
1612 "initial-config-primitive"
1616 "Initial config primitive list > {}".format(
1617 initial_config_primitive_list
1621 # add config if not present for NS charm
1622 ee_descriptor_id
= ee_config_descriptor
.get("id")
1623 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1624 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1625 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1629 "Initial config primitive list #2 > {}".format(
1630 initial_config_primitive_list
1633 # n2vc_redesign STEP 3.1
1634 # find old ee_id if exists
1635 ee_id
= vca_deployed
.get("ee_id")
1637 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1638 # create or register execution environment in VCA
1639 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm-v3"):
1640 self
._write
_configuration
_status
(
1642 vca_index
=vca_index
,
1644 element_under_configuration
=element_under_configuration
,
1645 element_type
=element_type
,
1648 step
= "create execution environment"
1649 self
.logger
.debug(logging_text
+ step
)
1653 if vca_type
== "k8s_proxy_charm":
1654 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1655 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1656 namespace
=namespace
,
1657 artifact_path
=artifact_path
,
1661 elif vca_type
== "helm-v3":
1662 ee_id
, credentials
= await self
.vca_map
[
1664 ].create_execution_environment(
1669 artifact_path
=artifact_path
,
1670 chart_model
=vca_name
,
1674 ee_id
, credentials
= await self
.vca_map
[
1676 ].create_execution_environment(
1677 namespace
=namespace
,
1683 elif vca_type
== "native_charm":
1684 step
= "Waiting to VM being up and getting IP address"
1685 self
.logger
.debug(logging_text
+ step
)
1686 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1695 credentials
= {"hostname": rw_mgmt_ip
}
1697 username
= deep_get(
1698 config_descriptor
, ("config-access", "ssh-access", "default-user")
1700 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1701 # merged. Meanwhile let's get username from initial-config-primitive
1702 if not username
and initial_config_primitive_list
:
1703 for config_primitive
in initial_config_primitive_list
:
1704 for param
in config_primitive
.get("parameter", ()):
1705 if param
["name"] == "ssh-username":
1706 username
= param
["value"]
1710 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1711 "'config-access.ssh-access.default-user'"
1713 credentials
["username"] = username
1714 # n2vc_redesign STEP 3.2
1716 self
._write
_configuration
_status
(
1718 vca_index
=vca_index
,
1719 status
="REGISTERING",
1720 element_under_configuration
=element_under_configuration
,
1721 element_type
=element_type
,
1724 step
= "register execution environment {}".format(credentials
)
1725 self
.logger
.debug(logging_text
+ step
)
1726 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1727 credentials
=credentials
,
1728 namespace
=namespace
,
1733 # for compatibility with MON/POL modules, the need model and application name at database
1734 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1735 ee_id_parts
= ee_id
.split(".")
1736 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1737 if len(ee_id_parts
) >= 2:
1738 model_name
= ee_id_parts
[0]
1739 application_name
= ee_id_parts
[1]
1740 db_nsr_update
[db_update_entry
+ "model"] = model_name
1741 db_nsr_update
[db_update_entry
+ "application"] = application_name
1743 # n2vc_redesign STEP 3.3
1744 step
= "Install configuration Software"
1746 self
._write
_configuration
_status
(
1748 vca_index
=vca_index
,
1749 status
="INSTALLING SW",
1750 element_under_configuration
=element_under_configuration
,
1751 element_type
=element_type
,
1752 other_update
=db_nsr_update
,
1755 # TODO check if already done
1756 self
.logger
.debug(logging_text
+ step
)
1758 if vca_type
== "native_charm":
1759 config_primitive
= next(
1760 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1763 if config_primitive
:
1764 config
= self
._map
_primitive
_params
(
1765 config_primitive
, {}, deploy_params
1768 if vca_type
== "lxc_proxy_charm":
1769 if element_type
== "NS":
1770 num_units
= db_nsr
.get("config-units") or 1
1771 elif element_type
== "VNF":
1772 num_units
= db_vnfr
.get("config-units") or 1
1773 elif element_type
== "VDU":
1774 for v
in db_vnfr
["vdur"]:
1775 if vdu_id
== v
["vdu-id-ref"]:
1776 num_units
= v
.get("config-units") or 1
1778 if vca_type
!= "k8s_proxy_charm":
1779 await self
.vca_map
[vca_type
].install_configuration_sw(
1781 artifact_path
=artifact_path
,
1784 num_units
=num_units
,
1789 # write in db flag of configuration_sw already installed
1791 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1794 # add relations for this VCA (wait for other peers related with this VCA)
1795 is_relation_added
= await self
._add
_vca
_relations
(
1796 logging_text
=logging_text
,
1799 vca_index
=vca_index
,
1802 if not is_relation_added
:
1803 raise LcmException("Relations could not be added to VCA.")
1805 # if SSH access is required, then get execution environment SSH public
1806 # if native charm we have waited already to VM be UP
1807 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm-v3"):
1810 # self.logger.debug("get ssh key block")
1812 config_descriptor
, ("config-access", "ssh-access", "required")
1814 # self.logger.debug("ssh key needed")
1815 # Needed to inject a ssh key
1818 ("config-access", "ssh-access", "default-user"),
1820 step
= "Install configuration Software, getting public ssh key"
1821 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1822 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1825 step
= "Insert public key into VM user={} ssh_key={}".format(
1829 # self.logger.debug("no need to get ssh key")
1830 step
= "Waiting to VM being up and getting IP address"
1831 self
.logger
.debug(logging_text
+ step
)
1833 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1836 # n2vc_redesign STEP 5.1
1837 # wait for RO (ip-address) Insert pub_key into VM
1840 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1841 logging_text
, nsr_id
, vnfr_id
, kdu_name
1843 vnfd
= self
.db
.get_one(
1845 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
1847 kdu
= get_kdu(vnfd
, kdu_name
)
1849 service
["name"] for service
in get_kdu_services(kdu
)
1851 exposed_services
= []
1852 for service
in services
:
1853 if any(s
in service
["name"] for s
in kdu_services
):
1854 exposed_services
.append(service
)
1855 await self
.vca_map
[vca_type
].exec_primitive(
1857 primitive_name
="config",
1859 "osm-config": json
.dumps(
1861 k8s
={"services": exposed_services
}
1868 # This verification is needed in order to avoid trying to add a public key
1869 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1870 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1871 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1873 elif db_vnfr
.get("vdur"):
1874 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1884 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1886 # store rw_mgmt_ip in deploy params for later replacement
1887 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1889 # n2vc_redesign STEP 6 Execute initial config primitive
1890 step
= "execute initial config primitive"
1892 # wait for dependent primitives execution (NS -> VNF -> VDU)
1893 if initial_config_primitive_list
:
1894 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1896 # stage, in function of element type: vdu, kdu, vnf or ns
1897 my_vca
= vca_deployed_list
[vca_index
]
1898 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1900 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1901 elif my_vca
.get("member-vnf-index"):
1903 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1906 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1908 self
._write
_configuration
_status
(
1909 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1912 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1914 check_if_terminated_needed
= True
1915 for initial_config_primitive
in initial_config_primitive_list
:
1916 # adding information on the vca_deployed if it is a NS execution environment
1917 if not vca_deployed
["member-vnf-index"]:
1918 deploy_params
["ns_config_info"] = json
.dumps(
1919 self
._get
_ns
_config
_info
(nsr_id
)
1921 # TODO check if already done
1922 primitive_params_
= self
._map
_primitive
_params
(
1923 initial_config_primitive
, {}, deploy_params
1926 step
= "execute primitive '{}' params '{}'".format(
1927 initial_config_primitive
["name"], primitive_params_
1929 self
.logger
.debug(logging_text
+ step
)
1930 await self
.vca_map
[vca_type
].exec_primitive(
1932 primitive_name
=initial_config_primitive
["name"],
1933 params_dict
=primitive_params_
,
1938 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1939 if check_if_terminated_needed
:
1940 if config_descriptor
.get("terminate-config-primitive"):
1942 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
1944 check_if_terminated_needed
= False
1946 # TODO register in database that primitive is done
1948 # STEP 7 Configure metrics
1949 if vca_type
== "helm-v3":
1950 # TODO: review for those cases where the helm chart is a reference and
1951 # is not part of the NF package
1952 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
1954 artifact_path
=artifact_path
,
1955 ee_config_descriptor
=ee_config_descriptor
,
1958 target_ip
=rw_mgmt_ip
,
1959 element_type
=element_type
,
1960 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
1962 vdu_index
=vdu_index
,
1964 kdu_index
=kdu_index
,
1970 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
1973 for job
in prometheus_jobs
:
1976 {"job_name": job
["job_name"]},
1979 fail_on_empty
=False,
1982 step
= "instantiated at VCA"
1983 self
.logger
.debug(logging_text
+ step
)
1985 self
._write
_configuration
_status
(
1986 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
1989 except Exception as e
: # TODO not use Exception but N2VC exception
1990 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1992 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
1995 "Exception while {} : {}".format(step
, e
), exc_info
=True
1997 self
._write
_configuration
_status
(
1998 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2000 raise LcmException("{}. {}".format(step
, e
)) from e
2002 def _write_ns_status(
2006 current_operation
: str,
2007 current_operation_id
: str,
2008 error_description
: str = None,
2009 error_detail
: str = None,
2010 other_update
: dict = None,
2013 Update db_nsr fields.
2016 :param current_operation:
2017 :param current_operation_id:
2018 :param error_description:
2019 :param error_detail:
2020 :param other_update: Other required changes at database if provided, will be cleared
2024 db_dict
= other_update
or {}
2027 ] = current_operation_id
# for backward compatibility
2028 db_dict
["_admin.current-operation"] = current_operation_id
2029 db_dict
["_admin.operation-type"] = (
2030 current_operation
if current_operation
!= "IDLE" else None
2032 db_dict
["currentOperation"] = current_operation
2033 db_dict
["currentOperationID"] = current_operation_id
2034 db_dict
["errorDescription"] = error_description
2035 db_dict
["errorDetail"] = error_detail
2038 db_dict
["nsState"] = ns_state
2039 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2040 except DbException
as e
:
2041 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2043 def _write_op_status(
2047 error_message
: str = None,
2048 queuePosition
: int = 0,
2049 operation_state
: str = None,
2050 other_update
: dict = None,
2053 db_dict
= other_update
or {}
2054 db_dict
["queuePosition"] = queuePosition
2055 if isinstance(stage
, list):
2056 db_dict
["stage"] = stage
[0]
2057 db_dict
["detailed-status"] = " ".join(stage
)
2058 elif stage
is not None:
2059 db_dict
["stage"] = str(stage
)
2061 if error_message
is not None:
2062 db_dict
["errorMessage"] = error_message
2063 if operation_state
is not None:
2064 db_dict
["operationState"] = operation_state
2065 db_dict
["statusEnteredTime"] = time()
2066 self
.update_db_2("nslcmops", op_id
, db_dict
)
2067 except DbException
as e
:
2069 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2072 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2074 nsr_id
= db_nsr
["_id"]
2075 # configurationStatus
2076 config_status
= db_nsr
.get("configurationStatus")
2079 "configurationStatus.{}.status".format(index
): status
2080 for index
, v
in enumerate(config_status
)
2084 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2086 except DbException
as e
:
2088 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2091 def _write_configuration_status(
2096 element_under_configuration
: str = None,
2097 element_type
: str = None,
2098 other_update
: dict = None,
2100 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2101 # .format(vca_index, status))
2104 db_path
= "configurationStatus.{}.".format(vca_index
)
2105 db_dict
= other_update
or {}
2107 db_dict
[db_path
+ "status"] = status
2108 if element_under_configuration
:
2110 db_path
+ "elementUnderConfiguration"
2111 ] = element_under_configuration
2113 db_dict
[db_path
+ "elementType"] = element_type
2114 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2115 except DbException
as e
:
2117 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2118 status
, nsr_id
, vca_index
, e
2122 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2124 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2125 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2126 Database is used because the result can be obtained from a different LCM worker in case of HA.
2127 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2128 :param db_nslcmop: database content of nslcmop
2129 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2130 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2131 computed 'vim-account-id'
2134 nslcmop_id
= db_nslcmop
["_id"]
2135 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2136 if placement_engine
== "PLA":
2138 logging_text
+ "Invoke and wait for placement optimization"
2140 await self
.msg
.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id
})
2141 db_poll_interval
= 5
2142 wait
= db_poll_interval
* 10
2144 while not pla_result
and wait
>= 0:
2145 await asyncio
.sleep(db_poll_interval
)
2146 wait
-= db_poll_interval
2147 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2148 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2152 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2155 for pla_vnf
in pla_result
["vnf"]:
2156 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2157 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2162 {"_id": vnfr
["_id"]},
2163 {"vim-account-id": pla_vnf
["vimAccountId"]},
2166 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2169 def _gather_vnfr_healing_alerts(self
, vnfr
, vnfd
):
2171 nsr_id
= vnfr
["nsr-id-ref"]
2172 df
= vnfd
.get("df", [{}])[0]
2173 # Checking for auto-healing configuration
2174 if "healing-aspect" in df
:
2175 healing_aspects
= df
["healing-aspect"]
2176 for healing
in healing_aspects
:
2177 for healing_policy
in healing
.get("healing-policy", ()):
2178 vdu_id
= healing_policy
["vdu-id"]
2180 (vdur
for vdur
in vnfr
["vdur"] if vdu_id
== vdur
["vdu-id-ref"]),
2185 metric_name
= "vm_status"
2186 vdu_name
= vdur
.get("name")
2187 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2189 name
= f
"healing_{uuid}"
2190 action
= healing_policy
2191 # action_on_recovery = healing.get("action-on-recovery")
2192 # cooldown_time = healing.get("cooldown-time")
2193 # day1 = healing.get("day1")
2197 "metric": metric_name
,
2200 "vnf_member_index": vnf_member_index
,
2201 "vdu_name": vdu_name
,
2203 "alarm_status": "ok",
2204 "action_type": "healing",
2207 alerts
.append(alert
)
2210 def _gather_vnfr_scaling_alerts(self
, vnfr
, vnfd
):
2212 nsr_id
= vnfr
["nsr-id-ref"]
2213 df
= vnfd
.get("df", [{}])[0]
2214 # Checking for auto-scaling configuration
2215 if "scaling-aspect" in df
:
2216 scaling_aspects
= df
["scaling-aspect"]
2217 all_vnfd_monitoring_params
= {}
2218 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
2219 for mp
in ivld
.get("monitoring-parameters", ()):
2220 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2221 for vdu
in vnfd
.get("vdu", ()):
2222 for mp
in vdu
.get("monitoring-parameter", ()):
2223 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2224 for df
in vnfd
.get("df", ()):
2225 for mp
in df
.get("monitoring-parameter", ()):
2226 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2227 for scaling_aspect
in scaling_aspects
:
2228 scaling_group_name
= scaling_aspect
.get("name", "")
2229 # Get monitored VDUs
2230 all_monitored_vdus
= set()
2231 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get(
2234 for vdu_delta
in delta
.get("vdu-delta", ()):
2235 all_monitored_vdus
.add(vdu_delta
.get("id"))
2236 monitored_vdurs
= list(
2238 lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
,
2242 if not monitored_vdurs
:
2244 "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
2247 for scaling_policy
in scaling_aspect
.get("scaling-policy", ()):
2248 if scaling_policy
["scaling-type"] != "automatic":
2250 threshold_time
= scaling_policy
.get("threshold-time", "1")
2251 cooldown_time
= scaling_policy
.get("cooldown-time", "0")
2252 for scaling_criteria
in scaling_policy
["scaling-criteria"]:
2253 monitoring_param_ref
= scaling_criteria
.get(
2254 "vnf-monitoring-param-ref"
2256 vnf_monitoring_param
= all_vnfd_monitoring_params
[
2257 monitoring_param_ref
2259 for vdur
in monitored_vdurs
:
2260 vdu_id
= vdur
["vdu-id-ref"]
2261 metric_name
= vnf_monitoring_param
.get("performance-metric")
2262 metric_name
= f
"osm_{metric_name}"
2263 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2264 scalein_threshold
= scaling_criteria
.get(
2265 "scale-in-threshold"
2267 scaleout_threshold
= scaling_criteria
.get(
2268 "scale-out-threshold"
2270 # Looking for min/max-number-of-instances
2271 instances_min_number
= 1
2272 instances_max_number
= 1
2273 vdu_profile
= df
["vdu-profile"]
2276 item
for item
in vdu_profile
if item
["id"] == vdu_id
2278 instances_min_number
= profile
.get(
2279 "min-number-of-instances", 1
2281 instances_max_number
= profile
.get(
2282 "max-number-of-instances", 1
2285 if scalein_threshold
:
2287 name
= f
"scalein_{uuid}"
2288 operation
= scaling_criteria
[
2289 "scale-in-relational-operation"
2291 rel_operator
= self
.rel_operation_types
.get(
2294 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2295 expression
= f
"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
2298 "vnf_member_index": vnf_member_index
,
2304 "for": str(threshold_time
) + "m",
2307 action
= scaling_policy
2309 "scaling-group": scaling_group_name
,
2310 "cooldown-time": cooldown_time
,
2315 "metric": metric_name
,
2318 "vnf_member_index": vnf_member_index
,
2321 "alarm_status": "ok",
2322 "action_type": "scale_in",
2324 "prometheus_config": prom_cfg
,
2326 alerts
.append(alert
)
2328 if scaleout_threshold
:
2330 name
= f
"scaleout_{uuid}"
2331 operation
= scaling_criteria
[
2332 "scale-out-relational-operation"
2334 rel_operator
= self
.rel_operation_types
.get(
2337 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2338 expression
= f
"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
2341 "vnf_member_index": vnf_member_index
,
2347 "for": str(threshold_time
) + "m",
2350 action
= scaling_policy
2352 "scaling-group": scaling_group_name
,
2353 "cooldown-time": cooldown_time
,
2358 "metric": metric_name
,
2361 "vnf_member_index": vnf_member_index
,
2364 "alarm_status": "ok",
2365 "action_type": "scale_out",
2367 "prometheus_config": prom_cfg
,
2369 alerts
.append(alert
)
2372 def _gather_vnfr_alarm_alerts(self
, vnfr
, vnfd
):
2374 nsr_id
= vnfr
["nsr-id-ref"]
2375 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2377 # Checking for VNF alarm configuration
2378 for vdur
in vnfr
["vdur"]:
2379 vdu_id
= vdur
["vdu-id-ref"]
2380 vdu
= next(filter(lambda vdu
: vdu
["id"] == vdu_id
, vnfd
["vdu"]))
2382 # Get VDU monitoring params, since alerts are based on them
2383 vdu_monitoring_params
= {}
2384 for mp
in vdu
.get("monitoring-parameter", []):
2385 vdu_monitoring_params
[mp
.get("id")] = mp
2386 if not vdu_monitoring_params
:
2388 "VDU alarm refers to a VDU monitoring param, but there are no VDU monitoring params in the VDU"
2391 # Get alarms in the VDU
2392 alarm_descriptors
= vdu
["alarm"]
2393 # Create VDU alarms for each alarm in the VDU
2394 for alarm_descriptor
in alarm_descriptors
:
2395 # Check that the VDU alarm refers to a proper monitoring param
2396 alarm_monitoring_param
= alarm_descriptor
.get(
2397 "vnf-monitoring-param-ref", ""
2399 vdu_specific_monitoring_param
= vdu_monitoring_params
.get(
2400 alarm_monitoring_param
, {}
2402 if not vdu_specific_monitoring_param
:
2404 "VDU alarm refers to a VDU monitoring param not present in the VDU"
2407 metric_name
= vdu_specific_monitoring_param
.get(
2408 "performance-metric"
2412 "VDU alarm refers to a VDU monitoring param that has no associated performance-metric"
2415 # Set params of the alarm to be created in Prometheus
2416 metric_name
= f
"osm_{metric_name}"
2417 metric_threshold
= alarm_descriptor
.get("value")
2419 alert_name
= f
"vdu_alarm_{uuid}"
2420 operation
= alarm_descriptor
["operation"]
2421 rel_operator
= self
.rel_operation_types
.get(operation
, "<=")
2422 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2423 expression
= f
"{metric_selector} {rel_operator} {metric_threshold}"
2426 "vnf_member_index": vnf_member_index
,
2428 "vdu_name": "{{ $labels.vdu_name }}",
2431 "alert": alert_name
,
2433 "for": "1m", # default value. Ideally, this should be related to an IM param, but there is not such param
2436 alarm_action
= dict()
2437 for action_type
in ["ok", "insufficient-data", "alarm"]:
2439 "actions" in alarm_descriptor
2440 and action_type
in alarm_descriptor
["actions"]
2442 alarm_action
[action_type
] = alarm_descriptor
["actions"][
2448 "metric": metric_name
,
2451 "vnf_member_index": vnf_member_index
,
2454 "alarm_status": "ok",
2455 "action_type": "vdu_alarm",
2456 "action": alarm_action
,
2457 "prometheus_config": prom_cfg
,
2459 alerts
.append(alert
)
def update_nsrs_with_pla_result(self, params):
    """Store the PLA placement result under ``_admin.pla`` of the nslcmop.

    :param params: kafka message payload; the placement info is read from
        ``params["placement"]`` and the target operation id from its
        ``nslcmopId`` key.

    Any failure is logged as a warning and swallowed (best-effort update).
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    nslcmop_id = None  # pre-bind so the except clause can't hit a NameError
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2471 async def instantiate(self
, nsr_id
, nslcmop_id
):
2474 :param nsr_id: ns instance to deploy
2475 :param nslcmop_id: operation to run
2479 # Try to lock HA task here
2480 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2481 if not task_is_locked_by_me
:
2483 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2487 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2488 self
.logger
.debug(logging_text
+ "Enter")
2490 # get all needed from database
2492 # database nsrs record
2495 # database nslcmops record
2498 # update operation on nsrs
2500 # update operation on nslcmops
2501 db_nslcmop_update
= {}
2503 timeout_ns_deploy
= self
.timeout
.ns_deploy
2505 nslcmop_operation_state
= None
2506 db_vnfrs
= {} # vnf's info indexed by member-index
2508 tasks_dict_info
= {} # from task to info text
2512 "Stage 1/5: preparation of the environment.",
2513 "Waiting for previous operations to terminate.",
2516 # ^ stage, step, VIM progress
2518 # wait for any previous tasks in process
2519 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2521 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2522 stage
[1] = "Reading from database."
2523 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2524 db_nsr_update
["detailed-status"] = "creating"
2525 db_nsr_update
["operational-status"] = "init"
2526 self
._write
_ns
_status
(
2528 ns_state
="BUILDING",
2529 current_operation
="INSTANTIATING",
2530 current_operation_id
=nslcmop_id
,
2531 other_update
=db_nsr_update
,
2533 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2535 # read from db: operation
2536 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2537 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2538 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2539 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2540 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2542 ns_params
= db_nslcmop
.get("operationParams")
2543 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2544 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2547 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2548 self
.logger
.debug(logging_text
+ stage
[1])
2549 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2550 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2551 self
.logger
.debug(logging_text
+ stage
[1])
2552 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2553 self
.fs
.sync(db_nsr
["nsd-id"])
2555 # nsr_name = db_nsr["name"] # TODO short-name??
2557 # read from db: vnf's of this ns
2558 stage
[1] = "Getting vnfrs from db."
2559 self
.logger
.debug(logging_text
+ stage
[1])
2560 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2562 # read from db: vnfd's for every vnf
2563 db_vnfds
= [] # every vnfd data
2565 # for each vnf in ns, read vnfd
2566 for vnfr
in db_vnfrs_list
:
2567 if vnfr
.get("kdur"):
2569 for kdur
in vnfr
["kdur"]:
2570 if kdur
.get("additionalParams"):
2571 kdur
["additionalParams"] = json
.loads(
2572 kdur
["additionalParams"]
2574 kdur_list
.append(kdur
)
2575 vnfr
["kdur"] = kdur_list
2577 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2578 vnfd_id
= vnfr
["vnfd-id"]
2579 vnfd_ref
= vnfr
["vnfd-ref"]
2580 self
.fs
.sync(vnfd_id
)
2582 # if we haven't this vnfd, read it from db
2583 if vnfd_id
not in db_vnfds
:
2585 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2588 self
.logger
.debug(logging_text
+ stage
[1])
2589 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2592 db_vnfds
.append(vnfd
)
2594 # Get or generates the _admin.deployed.VCA list
2595 vca_deployed_list
= None
2596 if db_nsr
["_admin"].get("deployed"):
2597 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2598 if vca_deployed_list
is None:
2599 vca_deployed_list
= []
2600 configuration_status_list
= []
2601 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2602 db_nsr_update
["configurationStatus"] = configuration_status_list
2603 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2604 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2605 elif isinstance(vca_deployed_list
, dict):
2606 # maintain backward compatibility. Change a dict to list at database
2607 vca_deployed_list
= list(vca_deployed_list
.values())
2608 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2609 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2612 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2614 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2615 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2617 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2618 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2619 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2621 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2624 # n2vc_redesign STEP 2 Deploy Network Scenario
2625 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2626 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2628 stage
[1] = "Deploying KDUs."
2629 # self.logger.debug(logging_text + "Before deploy_kdus")
2630 # Call to deploy_kdus in case exists the "vdu:kdu" param
2631 await self
.deploy_kdus(
2632 logging_text
=logging_text
,
2634 nslcmop_id
=nslcmop_id
,
2637 task_instantiation_info
=tasks_dict_info
,
2640 stage
[1] = "Getting VCA public key."
2641 # n2vc_redesign STEP 1 Get VCA public ssh-key
2642 # feature 1429. Add n2vc public key to needed VMs
2643 n2vc_key
= self
.n2vc
.get_public_key()
2644 n2vc_key_list
= [n2vc_key
]
2645 if self
.vca_config
.public_key
:
2646 n2vc_key_list
.append(self
.vca_config
.public_key
)
2648 stage
[1] = "Deploying NS at VIM."
2649 task_ro
= asyncio
.ensure_future(
2650 self
.instantiate_RO(
2651 logging_text
=logging_text
,
2655 db_nslcmop
=db_nslcmop
,
2658 n2vc_key_list
=n2vc_key_list
,
2662 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2663 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2665 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2666 stage
[1] = "Deploying Execution Environments."
2667 self
.logger
.debug(logging_text
+ stage
[1])
2669 # create namespace and certificate if any helm based EE is present in the NS
2670 if check_helm_ee_in_ns(db_vnfds
):
2671 await self
.vca_map
["helm-v3"].setup_ns_namespace(
2674 # create TLS certificates
2675 await self
.vca_map
["helm-v3"].create_tls_certificate(
2676 secret_name
=self
.EE_TLS_NAME
,
2679 usage
="server auth",
2683 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2684 for vnf_profile
in get_vnf_profiles(nsd
):
2685 vnfd_id
= vnf_profile
["vnfd-id"]
2686 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2687 member_vnf_index
= str(vnf_profile
["id"])
2688 db_vnfr
= db_vnfrs
[member_vnf_index
]
2689 base_folder
= vnfd
["_admin"]["storage"]
2696 # Get additional parameters
2697 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2698 if db_vnfr
.get("additionalParamsForVnf"):
2699 deploy_params
.update(
2700 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2703 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2704 if descriptor_config
:
2706 logging_text
=logging_text
2707 + "member_vnf_index={} ".format(member_vnf_index
),
2710 nslcmop_id
=nslcmop_id
,
2716 member_vnf_index
=member_vnf_index
,
2717 vdu_index
=vdu_index
,
2718 kdu_index
=kdu_index
,
2720 deploy_params
=deploy_params
,
2721 descriptor_config
=descriptor_config
,
2722 base_folder
=base_folder
,
2723 task_instantiation_info
=tasks_dict_info
,
2727 # Deploy charms for each VDU that supports one.
2728 for vdud
in get_vdu_list(vnfd
):
2730 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2731 vdur
= find_in_list(
2732 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2735 if vdur
.get("additionalParams"):
2736 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2738 deploy_params_vdu
= deploy_params
2739 deploy_params_vdu
["OSM"] = get_osm_params(
2740 db_vnfr
, vdu_id
, vdu_count_index
=0
2742 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2744 self
.logger
.debug("VDUD > {}".format(vdud
))
2746 "Descriptor config > {}".format(descriptor_config
)
2748 if descriptor_config
:
2752 for vdu_index
in range(vdud_count
):
2753 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2755 logging_text
=logging_text
2756 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2757 member_vnf_index
, vdu_id
, vdu_index
2761 nslcmop_id
=nslcmop_id
,
2767 kdu_index
=kdu_index
,
2768 member_vnf_index
=member_vnf_index
,
2769 vdu_index
=vdu_index
,
2771 deploy_params
=deploy_params_vdu
,
2772 descriptor_config
=descriptor_config
,
2773 base_folder
=base_folder
,
2774 task_instantiation_info
=tasks_dict_info
,
2777 for kdud
in get_kdu_list(vnfd
):
2778 kdu_name
= kdud
["name"]
2779 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2780 if descriptor_config
:
2784 kdu_index
, kdur
= next(
2786 for x
in enumerate(db_vnfr
["kdur"])
2787 if x
[1]["kdu-name"] == kdu_name
2789 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2790 if kdur
.get("additionalParams"):
2791 deploy_params_kdu
.update(
2792 parse_yaml_strings(kdur
["additionalParams"].copy())
2796 logging_text
=logging_text
,
2799 nslcmop_id
=nslcmop_id
,
2805 member_vnf_index
=member_vnf_index
,
2806 vdu_index
=vdu_index
,
2807 kdu_index
=kdu_index
,
2809 deploy_params
=deploy_params_kdu
,
2810 descriptor_config
=descriptor_config
,
2811 base_folder
=base_folder
,
2812 task_instantiation_info
=tasks_dict_info
,
2816 # Check if each vnf has exporter for metric collection if so update prometheus job records
2817 if "exporters-endpoints" in vnfd
.get("df")[0]:
2818 exporter_config
= vnfd
.get("df")[0].get("exporters-endpoints")
2819 self
.logger
.debug("exporter config :{}".format(exporter_config
))
2820 artifact_path
= "{}/{}/{}".format(
2821 base_folder
["folder"],
2822 base_folder
["pkg-dir"],
2823 "exporter-endpoint",
2826 ee_config_descriptor
= exporter_config
2827 vnfr_id
= db_vnfr
["id"]
2828 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2837 self
.logger
.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip
))
2838 self
.logger
.debug("Artifact_path:{}".format(artifact_path
))
2839 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
2840 vdu_id_for_prom
= None
2841 vdu_index_for_prom
= None
2842 for x
in get_iterable(db_vnfr
, "vdur"):
2843 vdu_id_for_prom
= x
.get("vdu-id-ref")
2844 vdu_index_for_prom
= x
.get("count-index")
2845 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2847 artifact_path
=artifact_path
,
2848 ee_config_descriptor
=ee_config_descriptor
,
2851 target_ip
=rw_mgmt_ip
,
2853 vdu_id
=vdu_id_for_prom
,
2854 vdu_index
=vdu_index_for_prom
,
2857 self
.logger
.debug("Prometheus job:{}".format(prometheus_jobs
))
2859 db_nsr_update
["_admin.deployed.prometheus_jobs"] = prometheus_jobs
2866 for job
in prometheus_jobs
:
2869 {"job_name": job
["job_name"]},
2872 fail_on_empty
=False,
2875 # Check if this NS has a charm configuration
2876 descriptor_config
= nsd
.get("ns-configuration")
2877 if descriptor_config
and descriptor_config
.get("juju"):
2880 member_vnf_index
= None
2887 # Get additional parameters
2888 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2889 if db_nsr
.get("additionalParamsForNs"):
2890 deploy_params
.update(
2891 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2893 base_folder
= nsd
["_admin"]["storage"]
2895 logging_text
=logging_text
,
2898 nslcmop_id
=nslcmop_id
,
2904 member_vnf_index
=member_vnf_index
,
2905 vdu_index
=vdu_index
,
2906 kdu_index
=kdu_index
,
2908 deploy_params
=deploy_params
,
2909 descriptor_config
=descriptor_config
,
2910 base_folder
=base_folder
,
2911 task_instantiation_info
=tasks_dict_info
,
2915 # rest of staff will be done at finally
2918 ROclient
.ROClientException
,
2924 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2927 except asyncio
.CancelledError
:
2929 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2931 exc
= "Operation was cancelled"
2932 except Exception as e
:
2933 exc
= traceback
.format_exc()
2934 self
.logger
.critical(
2935 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2940 error_list
.append(str(exc
))
2942 # wait for pending tasks
2944 stage
[1] = "Waiting for instantiate pending tasks."
2945 self
.logger
.debug(logging_text
+ stage
[1])
2946 error_list
+= await self
._wait
_for
_tasks
(
2954 stage
[1] = stage
[2] = ""
2955 except asyncio
.CancelledError
:
2956 error_list
.append("Cancelled")
2957 await self
._cancel
_pending
_tasks
(logging_text
, tasks_dict_info
)
2958 await self
._wait
_for
_tasks
(
2966 except Exception as exc
:
2967 error_list
.append(str(exc
))
2969 # update operation-status
2970 db_nsr_update
["operational-status"] = "running"
2971 # let's begin with VCA 'configured' status (later we can change it)
2972 db_nsr_update
["config-status"] = "configured"
2973 for task
, task_name
in tasks_dict_info
.items():
2974 if not task
.done() or task
.cancelled() or task
.exception():
2975 if task_name
.startswith(self
.task_name_deploy_vca
):
2976 # A N2VC task is pending
2977 db_nsr_update
["config-status"] = "failed"
2979 # RO or KDU task is pending
2980 db_nsr_update
["operational-status"] = "failed"
2982 # update status at database
2984 error_detail
= ". ".join(error_list
)
2985 self
.logger
.error(logging_text
+ error_detail
)
2986 error_description_nslcmop
= "{} Detail: {}".format(
2987 stage
[0], error_detail
2989 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2990 nslcmop_id
, stage
[0]
2993 db_nsr_update
["detailed-status"] = (
2994 error_description_nsr
+ " Detail: " + error_detail
2996 db_nslcmop_update
["detailed-status"] = error_detail
2997 nslcmop_operation_state
= "FAILED"
3001 error_description_nsr
= error_description_nslcmop
= None
3003 db_nsr_update
["detailed-status"] = "Done"
3004 db_nslcmop_update
["detailed-status"] = "Done"
3005 nslcmop_operation_state
= "COMPLETED"
3006 # Gather auto-healing and auto-scaling alerts for each vnfr
3009 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3011 (sub
for sub
in db_vnfds
if sub
["_id"] == vnfr
["vnfd-id"]), None
3013 healing_alerts
= self
._gather
_vnfr
_healing
_alerts
(vnfr
, vnfd
)
3014 for alert
in healing_alerts
:
3015 self
.logger
.info(f
"Storing healing alert in MongoDB: {alert}")
3016 self
.db
.create("alerts", alert
)
3018 scaling_alerts
= self
._gather
_vnfr
_scaling
_alerts
(vnfr
, vnfd
)
3019 for alert
in scaling_alerts
:
3020 self
.logger
.info(f
"Storing scaling alert in MongoDB: {alert}")
3021 self
.db
.create("alerts", alert
)
3023 alarm_alerts
= self
._gather
_vnfr
_alarm
_alerts
(vnfr
, vnfd
)
3024 for alert
in alarm_alerts
:
3025 self
.logger
.info(f
"Storing VNF alarm alert in MongoDB: {alert}")
3026 self
.db
.create("alerts", alert
)
3028 self
._write
_ns
_status
(
3031 current_operation
="IDLE",
3032 current_operation_id
=None,
3033 error_description
=error_description_nsr
,
3034 error_detail
=error_detail
,
3035 other_update
=db_nsr_update
,
3037 self
._write
_op
_status
(
3040 error_message
=error_description_nslcmop
,
3041 operation_state
=nslcmop_operation_state
,
3042 other_update
=db_nslcmop_update
,
3045 if nslcmop_operation_state
:
3047 await self
.msg
.aiowrite(
3052 "nslcmop_id": nslcmop_id
,
3053 "operationState": nslcmop_operation_state
,
3054 "startTime": db_nslcmop
["startTime"],
3055 "links": db_nslcmop
["links"],
3056 "operationParams": {
3057 "nsInstanceId": nsr_id
,
3058 "nsdId": db_nsr
["nsd-id"],
3062 except Exception as e
:
3064 logging_text
+ "kafka_write notification Exception {}".format(e
)
3067 self
.logger
.debug(logging_text
+ "Exit")
3068 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
3070 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
3071 if vnfd_id
not in cached_vnfds
:
3072 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
3073 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
3075 return cached_vnfds
[vnfd_id
]
3077 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
3078 if vnf_profile_id
not in cached_vnfrs
:
3079 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
3082 "member-vnf-index-ref": vnf_profile_id
,
3083 "nsr-id-ref": nsr_id
,
3086 return cached_vnfrs
[vnf_profile_id
]
3088 def _is_deployed_vca_in_relation(
3089 self
, vca
: DeployedVCA
, relation
: Relation
3092 for endpoint
in (relation
.provider
, relation
.requirer
):
3093 if endpoint
["kdu-resource-profile-id"]:
3096 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
3097 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
3098 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """Complete a relation-endpoint dict with data the descriptor left implicit.

    Normalises *ee_relation_data* via ``safe_get_ee_relation`` and, for VNF/VDU
    level endpoints that lack an ``execution-environment-ref``, resolves it from
    the juju execution environment declared in the VNFD.

    :param nsr_id: NS record id
    :param nsd: NS descriptor (dict)
    :param ee_relation_data: raw endpoint dict to be completed (also returned)
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :param vnf_profile_id: optional VNF profile context for normalisation
    :return: the completed endpoint dict
    :raises Exception: when no execution environment can be resolved
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    ee_relation_level = EELevel.get_level(ee_relation_data)
    if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
        "execution-environment-ref"
    ]:
        vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
        vnfd_id = vnf_profile["vnfd-id"]
        project = nsd["_admin"]["projects_read"][0]
        db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
        # VNF-level endpoints resolve against the VNFD itself; VDU-level
        # endpoints against the referenced VDU profile.
        entity_id = (
            vnfd_id
            if ee_relation_level == EELevel.VNF
            else ee_relation_data["vdu-profile-id"]
        )
        ee = get_juju_ee_ref(db_vnfd, entity_id)
        if not ee:
            raise Exception(
                f"not execution environments found for ee_relation {ee_relation_data}"
            )
        ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the NS-level charm relations in which *vca* participates.

    Each relation from the NS configuration is normalised into
    provider/requirer endpoint dicts, completed with implicit data, and kept
    only when *vca* is one of its endpoints.

    :param nsr_id: NS record id
    :param nsd: NS descriptor (dict)
    :param vca: deployed VCA used as the filter
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :return: list of matching ``Relation`` objects
    :raises Exception: when a relation has neither provider/requirer keys nor
        an ``entities`` list
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    relations = []
    db_ns_relations = get_ns_configuration_relation_list(nsd)
    for r in db_ns_relations:
        provider_dict = None
        requirer_dict = None
        # A relation comes either explicitly (provider/requirer) or as a
        # two-element "entities" list that must be normalised here.
        if all(key in r for key in ("provider", "requirer")):
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            provider_id = r["entities"][0]["id"]
            provider_dict = {
                "nsr-id": nsr_id,
                "endpoint": r["entities"][0]["endpoint"],
            }
            # An entity id different from the NSD id denotes a VNF endpoint.
            if provider_id != nsd["id"]:
                provider_dict["vnf-profile-id"] = provider_id
            requirer_id = r["entities"][1]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "endpoint": r["entities"][1]["endpoint"],
            }
            if requirer_id != nsd["id"]:
                requirer_dict["vnf-profile-id"] = requirer_id
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(r["name"], provider, requirer)
        vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
        if vca_in_relation:
            relations.append(relation)
    return relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the VNF-level charm relations in which *vca* participates.

    The relations are read from the VNFD of the VNF profile the VCA targets,
    normalised into provider/requirer endpoint dicts, completed with implicit
    data, and filtered to those where *vca* is an endpoint. NS charms
    (``target_element == "ns"``) have no VNF relations.

    :param nsr_id: NS record id
    :param nsd: NS descriptor (dict)
    :param vca: deployed VCA used as the filter
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :return: list of matching ``Relation`` objects
    :raises Exception: when a relation has neither provider/requirer keys nor
        an ``entities`` list
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    relations = []
    if vca.target_element == "ns":
        self.logger.debug("VCA is a NS charm, not a VNF.")
        return relations
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
    for r in db_vnf_relations:
        provider_dict = None
        requirer_dict = None
        if all(key in r for key in ("provider", "requirer")):
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            provider_id = r["entities"][0]["id"]
            provider_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][0]["endpoint"],
            }
            # An entity id different from the VNFD id denotes a VDU endpoint.
            if provider_id != vnfd_id:
                provider_dict["vdu-profile-id"] = provider_id
            requirer_id = r["entities"][1]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][1]["endpoint"],
            }
            if requirer_id != vnfd_id:
                requirer_dict["vdu-profile-id"] = requirer_id
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(r["name"], provider, requirer)
        vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
        if vca_in_relation:
            relations.append(relation)
    return relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """Return the deployed-KDU record behind a KDU-level relation endpoint.

    Resolves the VNFD of the endpoint's VNF profile, looks up the KDU
    resource profile referenced by the endpoint, and attaches its
    ``resource-name`` to the deployed-KDU record found in the NSR.

    :param ee_relation: KDU-level relation endpoint
    :param db_nsr: NS record (dict), read for ``_admin.deployed``
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :return: the deployed-KDU dict, augmented with ``resource-name``
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", ()).get("deployed", ()),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """Resolve a relation endpoint to its deployed component (VCA or KDU).

    Dispatches on the endpoint's EE level: NS/VNF/VDU endpoints resolve to a
    ``DeployedVCA`` looked up in the NSR with progressively narrower filters;
    KDU endpoints resolve to a ``DeployedK8sResource``.

    :param ee_relation: relation endpoint to resolve
    :param db_nsr: NS record (dict)
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :return: the deployed component, or None when nothing matches
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    nsr_id = db_nsr["_id"]
    deployed_component = None
    ee_level = EELevel.get_level(ee_relation)
    if ee_level == EELevel.NS:
        vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.VNF:
        vca = get_deployed_vca(
            db_nsr,
            {
                "vdu_id": None,
                "member-vnf-index": ee_relation.vnf_profile_id,
                "ee_descriptor_id": ee_relation.execution_environment_ref,
            },
        )
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.VDU:
        vca = get_deployed_vca(
            db_nsr,
            {
                "vdu_id": ee_relation.vdu_profile_id,
                "member-vnf-index": ee_relation.vnf_profile_id,
                "ee_descriptor_id": ee_relation.execution_environment_ref,
            },
        )
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            deployed_component = DeployedK8sResource(kdu_resource_data)
    return deployed_component
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """Try to establish one juju relation between two deployed components.

    Resolves both endpoints to deployed components; when both exist and have
    their configuration software installed, builds the two
    ``RelationEndpoint`` objects and asks the VCA connector to add the
    relation.

    :param relation: relation to establish
    :param vca_type: key into ``self.vca_map`` selecting the connector
    :param db_nsr: NS record (dict)
    :param cached_vnfds: memo cache shared with ``_get_vnfd``
    :param cached_vnfrs: memo cache shared with ``_get_vnfr``
    :return: True when the relation was added, False when endpoints are not
        ready yet (caller is expected to retry)
    :raises LcmException: when the VCA connector fails with N2VCException
    """
    # NOTE(review): this span was corrupted in extraction; reconstructed to
    # match the visible fragments — confirm against upstream before merging.
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    if (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    ):
        # NS-level endpoints have no vnf_profile_id and therefore no VNFR.
        provider_db_vnfr = (
            self._get_vnfr(
                relation.provider.nsr_id,
                relation.provider.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.provider.vnf_profile_id
            else None
        )
        requirer_db_vnfr = (
            self._get_vnfr(
                relation.requirer.nsr_id,
                relation.requirer.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.requirer.vnf_profile_id
            else None
        )
        provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
        requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
        provider_relation_endpoint = RelationEndpoint(
            deployed_provider.ee_id,
            provider_vca_id,
            relation.provider.endpoint,
        )
        requirer_relation_endpoint = RelationEndpoint(
            deployed_requirer.ee_id,
            requirer_vca_id,
            relation.requirer.endpoint,
        )
        try:
            await self.vca_map[vca_type].add_relation(
                provider=provider_relation_endpoint,
                requirer=requirer_relation_endpoint,
            )
        except N2VCException as exception:
            self.logger.error(exception)
            raise LcmException(exception)
        return True
    return False
3367 async def _add_vca_relations(
3373 timeout
: int = 3600,
3376 # 1. find all relations for this VCA
3377 # 2. wait for other peers related
3381 # STEP 1: find all relations for this VCA
3384 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3385 nsd
= get_nsd(db_nsr
)
3388 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3389 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3394 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3395 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3397 # if no relations, terminate
3399 self
.logger
.debug(logging_text
+ " No relations")
3402 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3409 if now
- start
>= timeout
:
3410 self
.logger
.error(logging_text
+ " : timeout adding relations")
3413 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3414 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3416 # for each relation, find the VCA's related
3417 for relation
in relations
.copy():
3418 added
= await self
._add
_relation
(
3426 relations
.remove(relation
)
3429 self
.logger
.debug("Relations added")
3431 await asyncio
.sleep(5.0)
3435 except Exception as e
:
3436 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3439 async def _install_kdu(
3447 k8s_instance_info
: dict,
3448 k8params
: dict = None,
3453 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3456 "collection": "nsrs",
3457 "filter": {"_id": nsr_id
},
3458 "path": nsr_db_path
,
3461 if k8s_instance_info
.get("kdu-deployment-name"):
3462 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3464 kdu_instance
= self
.k8scluster_map
[
3466 ].generate_kdu_instance_name(
3467 db_dict
=db_dict_install
,
3468 kdu_model
=k8s_instance_info
["kdu-model"],
3469 kdu_name
=k8s_instance_info
["kdu-name"],
3472 # Update the nsrs table with the kdu-instance value
3476 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3479 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3480 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3481 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3482 # namespace, this first verification could be removed, and the next step would be done for any kind
3484 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3485 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3486 if k8sclustertype
in ("juju", "juju-bundle"):
3487 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3488 # that the user passed a namespace which he wants its KDU to be deployed in)
3494 "_admin.projects_write": k8s_instance_info
["namespace"],
3495 "_admin.projects_read": k8s_instance_info
["namespace"],
3501 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3506 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3508 k8s_instance_info
["namespace"] = kdu_instance
3510 await self
.k8scluster_map
[k8sclustertype
].install(
3511 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3512 kdu_model
=k8s_instance_info
["kdu-model"],
3515 db_dict
=db_dict_install
,
3517 kdu_name
=k8s_instance_info
["kdu-name"],
3518 namespace
=k8s_instance_info
["namespace"],
3519 kdu_instance
=kdu_instance
,
3523 # Obtain services to obtain management service ip
3524 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3525 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3526 kdu_instance
=kdu_instance
,
3527 namespace
=k8s_instance_info
["namespace"],
3530 # Obtain management service info (if exists)
3531 vnfr_update_dict
= {}
3532 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3534 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3539 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3542 for service
in kdud
.get("service", [])
3543 if service
.get("mgmt-service")
3545 for mgmt_service
in mgmt_services
:
3546 for service
in services
:
3547 if service
["name"].startswith(mgmt_service
["name"]):
3548 # Mgmt service found, Obtain service ip
3549 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3550 if isinstance(ip
, list) and len(ip
) == 1:
3554 "kdur.{}.ip-address".format(kdu_index
)
3557 # Check if must update also mgmt ip at the vnf
3558 service_external_cp
= mgmt_service
.get(
3559 "external-connection-point-ref"
3561 if service_external_cp
:
3563 deep_get(vnfd
, ("mgmt-interface", "cp"))
3564 == service_external_cp
3566 vnfr_update_dict
["ip-address"] = ip
3571 "external-connection-point-ref", ""
3573 == service_external_cp
,
3576 "kdur.{}.ip-address".format(kdu_index
)
3581 "Mgmt service name: {} not found".format(
3582 mgmt_service
["name"]
3586 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3587 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3589 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3592 and kdu_config
.get("initial-config-primitive")
3593 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3595 initial_config_primitive_list
= kdu_config
.get(
3596 "initial-config-primitive"
3598 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3600 for initial_config_primitive
in initial_config_primitive_list
:
3601 primitive_params_
= self
._map
_primitive
_params
(
3602 initial_config_primitive
, {}, {}
3605 await asyncio
.wait_for(
3606 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3607 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3608 kdu_instance
=kdu_instance
,
3609 primitive_name
=initial_config_primitive
["name"],
3610 params
=primitive_params_
,
3611 db_dict
=db_dict_install
,
3617 except Exception as e
:
3618 # Prepare update db with error and raise exception
3621 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3625 vnfr_data
.get("_id"),
3626 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3628 except Exception as error
:
3629 # ignore to keep original exception
3630 self
.logger
.warning(
3631 f
"An exception occurred while updating DB: {str(error)}"
3633 # reraise original error
3638 async def deploy_kdus(
3645 task_instantiation_info
,
3647 # Launch kdus if present in the descriptor
3649 k8scluster_id_2_uuic
= {
3650 "helm-chart-v3": {},
3654 async def _get_cluster_id(cluster_id
, cluster_type
):
3655 nonlocal k8scluster_id_2_uuic
3656 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3657 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3659 # check if K8scluster is creating and wait look if previous tasks in process
3660 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3661 "k8scluster", cluster_id
3664 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3665 task_name
, cluster_id
3667 self
.logger
.debug(logging_text
+ text
)
3668 await asyncio
.wait(task_dependency
, timeout
=3600)
3670 db_k8scluster
= self
.db
.get_one(
3671 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3673 if not db_k8scluster
:
3674 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3676 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3678 if cluster_type
== "helm-chart-v3":
3680 # backward compatibility for existing clusters that have not been initialized for helm v3
3681 k8s_credentials
= yaml
.safe_dump(
3682 db_k8scluster
.get("credentials")
3684 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3685 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3687 db_k8scluster_update
= {}
3688 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3689 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3690 db_k8scluster_update
[
3691 "_admin.helm-chart-v3.created"
3693 db_k8scluster_update
[
3694 "_admin.helm-chart-v3.operationalState"
3697 "k8sclusters", cluster_id
, db_k8scluster_update
3699 except Exception as e
:
3702 + "error initializing helm-v3 cluster: {}".format(str(e
))
3705 "K8s cluster '{}' has not been initialized for '{}'".format(
3706 cluster_id
, cluster_type
3711 "K8s cluster '{}' has not been initialized for '{}'".format(
3712 cluster_id
, cluster_type
3715 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3718 logging_text
+= "Deploy kdus: "
3721 db_nsr_update
= {"_admin.deployed.K8s": []}
3722 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3725 updated_cluster_list
= []
3726 updated_v3_cluster_list
= []
3728 for vnfr_data
in db_vnfrs
.values():
3729 vca_id
= self
.get_vca_id(vnfr_data
, {})
3730 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3731 # Step 0: Prepare and set parameters
3732 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3733 vnfd_id
= vnfr_data
.get("vnfd-id")
3734 vnfd_with_id
= find_in_list(
3735 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3739 for kdud
in vnfd_with_id
["kdu"]
3740 if kdud
["name"] == kdur
["kdu-name"]
3742 namespace
= kdur
.get("k8s-namespace")
3743 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3744 if kdur
.get("helm-chart"):
3745 kdumodel
= kdur
["helm-chart"]
3746 # Default version: helm3, if helm-version is v2 assign v2
3747 k8sclustertype
= "helm-chart-v3"
3748 self
.logger
.debug("kdur: {}".format(kdur
))
3749 elif kdur
.get("juju-bundle"):
3750 kdumodel
= kdur
["juju-bundle"]
3751 k8sclustertype
= "juju-bundle"
3754 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3755 "juju-bundle. Maybe an old NBI version is running".format(
3756 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3759 # check if kdumodel is a file and exists
3761 vnfd_with_id
= find_in_list(
3762 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3764 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3765 if storage
: # may be not present if vnfd has not artifacts
3766 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3767 if storage
["pkg-dir"]:
3768 filename
= "{}/{}/{}s/{}".format(
3775 filename
= "{}/Scripts/{}s/{}".format(
3780 if self
.fs
.file_exists(
3781 filename
, mode
="file"
3782 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3783 kdumodel
= self
.fs
.path
+ filename
3784 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3786 except Exception as e
: # it is not a file
3787 self
.logger
.warning(f
"An exception occurred: {str(e)}")
3789 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3790 step
= "Synchronize repos for k8s cluster '{}'".format(
3793 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3797 k8sclustertype
== "helm-chart"
3798 and cluster_uuid
not in updated_cluster_list
3800 k8sclustertype
== "helm-chart-v3"
3801 and cluster_uuid
not in updated_v3_cluster_list
3803 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3804 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3805 cluster_uuid
=cluster_uuid
3808 if del_repo_list
or added_repo_dict
:
3809 if k8sclustertype
== "helm-chart":
3811 "_admin.helm_charts_added." + item
: None
3812 for item
in del_repo_list
3815 "_admin.helm_charts_added." + item
: name
3816 for item
, name
in added_repo_dict
.items()
3818 updated_cluster_list
.append(cluster_uuid
)
3819 elif k8sclustertype
== "helm-chart-v3":
3821 "_admin.helm_charts_v3_added." + item
: None
3822 for item
in del_repo_list
3825 "_admin.helm_charts_v3_added." + item
: name
3826 for item
, name
in added_repo_dict
.items()
3828 updated_v3_cluster_list
.append(cluster_uuid
)
3830 logging_text
+ "repos synchronized on k8s cluster "
3831 "'{}' to_delete: {}, to_add: {}".format(
3832 k8s_cluster_id
, del_repo_list
, added_repo_dict
3837 {"_id": k8s_cluster_id
},
3843 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3844 vnfr_data
["member-vnf-index-ref"],
3848 k8s_instance_info
= {
3849 "kdu-instance": None,
3850 "k8scluster-uuid": cluster_uuid
,
3851 "k8scluster-type": k8sclustertype
,
3852 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3853 "kdu-name": kdur
["kdu-name"],
3854 "kdu-model": kdumodel
,
3855 "namespace": namespace
,
3856 "kdu-deployment-name": kdu_deployment_name
,
3858 db_path
= "_admin.deployed.K8s.{}".format(index
)
3859 db_nsr_update
[db_path
] = k8s_instance_info
3860 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3861 vnfd_with_id
= find_in_list(
3862 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3864 task
= asyncio
.ensure_future(
3873 k8params
=desc_params
,
3878 self
.lcm_tasks
.register(
3882 "instantiate_KDU-{}".format(index
),
3885 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3891 except (LcmException
, asyncio
.CancelledError
):
3893 except Exception as e
:
3894 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3895 if isinstance(e
, (N2VCException
, DbException
)):
3896 self
.logger
.error(logging_text
+ msg
)
3898 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3899 raise LcmException(msg
)
3902 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3922 task_instantiation_info
,
3925 # launch instantiate_N2VC in a asyncio task and register task object
3926 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3927 # if not found, create one entry and update database
3928 # fill db_nsr._admin.deployed.VCA.<index>
3931 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3935 get_charm_name
= False
3936 if "execution-environment-list" in descriptor_config
:
3937 ee_list
= descriptor_config
.get("execution-environment-list", [])
3938 elif "juju" in descriptor_config
:
3939 ee_list
= [descriptor_config
] # ns charms
3940 if "execution-environment-list" not in descriptor_config
:
3941 # charm name is only required for ns charms
3942 get_charm_name
= True
3943 else: # other types as script are not supported
3946 for ee_item
in ee_list
:
3949 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3950 ee_item
.get("juju"), ee_item
.get("helm-chart")
3953 ee_descriptor_id
= ee_item
.get("id")
3954 if ee_item
.get("juju"):
3955 vca_name
= ee_item
["juju"].get("charm")
3957 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3960 if ee_item
["juju"].get("charm") is not None
3963 if ee_item
["juju"].get("cloud") == "k8s":
3964 vca_type
= "k8s_proxy_charm"
3965 elif ee_item
["juju"].get("proxy") is False:
3966 vca_type
= "native_charm"
3967 elif ee_item
.get("helm-chart"):
3968 vca_name
= ee_item
["helm-chart"]
3969 vca_type
= "helm-v3"
3972 logging_text
+ "skipping non juju neither charm configuration"
3977 for vca_index
, vca_deployed
in enumerate(
3978 db_nsr
["_admin"]["deployed"]["VCA"]
3980 if not vca_deployed
:
3983 vca_deployed
.get("member-vnf-index") == member_vnf_index
3984 and vca_deployed
.get("vdu_id") == vdu_id
3985 and vca_deployed
.get("kdu_name") == kdu_name
3986 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3987 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3991 # not found, create one.
3993 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3996 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3998 target
+= "/kdu/{}".format(kdu_name
)
4000 "target_element": target
,
4001 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
4002 "member-vnf-index": member_vnf_index
,
4004 "kdu_name": kdu_name
,
4005 "vdu_count_index": vdu_index
,
4006 "operational-status": "init", # TODO revise
4007 "detailed-status": "", # TODO revise
4008 "step": "initial-deploy", # TODO revise
4010 "vdu_name": vdu_name
,
4012 "ee_descriptor_id": ee_descriptor_id
,
4013 "charm_name": charm_name
,
4017 # create VCA and configurationStatus in db
4019 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
4020 "configurationStatus.{}".format(vca_index
): dict(),
4022 self
.update_db_2("nsrs", nsr_id
, db_dict
)
4024 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
4026 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
4027 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
4028 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
4031 task_n2vc
= asyncio
.ensure_future(
4032 self
.instantiate_N2VC(
4033 logging_text
=logging_text
,
4034 vca_index
=vca_index
,
4040 vdu_index
=vdu_index
,
4041 kdu_index
=kdu_index
,
4042 deploy_params
=deploy_params
,
4043 config_descriptor
=descriptor_config
,
4044 base_folder
=base_folder
,
4045 nslcmop_id
=nslcmop_id
,
4049 ee_config_descriptor
=ee_item
,
4052 self
.lcm_tasks
.register(
4056 "instantiate_N2VC-{}".format(vca_index
),
4059 task_instantiation_info
[
4061 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
4062 member_vnf_index
or "", vdu_id
or ""
4065 def _format_additional_params(self
, params
):
4066 params
= params
or {}
4067 for key
, value
in params
.items():
4068 if str(value
).startswith("!!yaml "):
4069 params
[key
] = yaml
.safe_load(value
[7:])
def _get_terminate_primitive_params(self, seq, vnf_index):
    """Build the mapped parameters for one terminate primitive sequence.

    :param seq: one entry of the descriptor terminate-config-primitive list
    :param vnf_index: member-vnf-index the primitive applies to
    :return: params dict produced by self._map_primitive_params()
    """
    primitive = seq.get("name")
    primitive_params = {}
    params = {
        "member_vnf_index": vnf_index,
        "primitive": primitive,
        "primitive_params": primitive_params,
    }
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """Decide whether an existing sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop database record holding _admin.operations
    :param op_index: index of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the stored sub-operation is
        already COMPLETED, otherwise op_index after marking it PROCESSING
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP
    else:
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        operationState = "PROCESSING"
        detailed_status = "In progress"
        self._update_suboperation_status(
            db_nslcmop, op_index, operationState, detailed_status
        )
        # Return the sub-operation index:
        # _ns_execute_primitive() or RO.create_action() will be called from
        # scale() with arguments extracted from the sub-operation
        return op_index
4105 # Find a sub-operation where all keys in a matching dictionary must match
4106 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4107 def _find_suboperation(self
, db_nslcmop
, match
):
4108 if db_nslcmop
and match
:
4109 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4110 for i
, op
in enumerate(op_list
):
4111 if all(op
.get(k
) == match
[k
] for k
in match
):
4113 return self
.SUBOPERATION_STATUS_NOT_FOUND
4115 # Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """Persist state and detail of one sub-operation (HA task bookkeeping).

    Writes _admin.operations.<op_index>.operationState and .detailed-status
    on the given nslcmop record. A missing record is tolerated
    (fail_on_empty=False) so HA retries do not raise.
    """
    q_filter = {"_id": db_nslcmop["_id"]}
    update_dict = {
        "_admin.operations.{}.operationState".format(op_index): operationState,
        "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
    }
    # NOTE(review): the DB-call name was lost in the extract; set_one with
    # these exact kwargs matches the project's DB API — confirm upstream.
    self.db.set_one(
        "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
    )
4129 # Add sub-operation, return the index of the added sub-operation
4130 # Optionally, set operationState, detailed-status, and operationType
4131 # Status and type are currently set for 'scale' sub-operations:
4132 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4133 # 'detailed-status' : status message
4134 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4135 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """Add a sub-operation to _admin.operations and return its index.

    Optionally sets operationState, detailed-status and operationType
    (currently used for 'scale' sub-operations, not for 'terminate').

    :return: index of the appended sub-operation, or
        SUBOPERATION_STATUS_NOT_FOUND when db_nslcmop is missing
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    # Get the "_admin.operations" list, if it exists
    db_nslcmop_admin = db_nslcmop.get("_admin", {})
    op_list = db_nslcmop_admin.get("operations")
    # Create or append to the "_admin.operations" list
    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    # Optional fields are only stored when provided
    if operationState:
        new_op["operationState"] = operationState
    if detailed_status:
        new_op["detailed-status"] = detailed_status
    if operationType:
        new_op["lcmOperationType"] = operationType
    if RO_nsr_id:
        new_op["RO_nsr_id"] = RO_nsr_id
    if RO_scaling_info:
        new_op["RO_scaling_info"] = RO_scaling_info
    if not op_list:
        # No existing operations, create key 'operations' with current
        # operation as first list element
        db_nslcmop_admin.update({"operations": [new_op]})
        op_list = db_nslcmop_admin.get("operations")
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    db_nslcmop_update = {"_admin.operations": op_list}
    self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
    op_index = len(op_list) - 1
    return op_index
4187 # Helper methods for scale() sub-operations
4189 # pre-scale/post-scale:
4190 # Check for 3 different cases:
4191 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4192 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4193 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """Check for an existing scale sub-operation, adding it when new.

    Three outcomes:
      a. New: first-time execution -> SUBOPERATION_STATUS_NEW
      b. Skip: existing op, operationState == 'COMPLETED' -> STATUS_SKIP
      c. Retry: existing op, operationState != 'COMPLETED' -> op_index
    """
    # Find this sub-operation
    if RO_nsr_id and RO_scaling_info:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
        # a. New sub-operation
        # The sub-operation does not exist, add it.
        # _ns_execute_primitive() will be called from scale() as usual,
        # with non-modified arguments.
        # The following parameters are set to None for all kinds of scaling:
        vdu_id = None
        vdu_count_index = None
        vdu_name = None
        if RO_nsr_id and RO_scaling_info:
            vnf_config_primitive = None
            primitive_params = None
        else:
            RO_nsr_id = None
            RO_scaling_info = None
        # Initial status for sub-operation
        operationState = "PROCESSING"
        detailed_status = "In progress"
        # Add sub-operation for pre/post-scaling (zero or more operations)
        self._add_suboperation(
            db_nslcmop,
            vnf_index,
            vdu_id,
            vdu_count_index,
            vdu_name,
            vnf_config_primitive,
            primitive_params,
            operationState,
            detailed_status,
            operationType,
            RO_nsr_id,
            RO_scaling_info,
        )
        return self.SUBOPERATION_STATUS_NEW
    else:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)
4258 # Function to return execution_environment id
4260 async def destroy_N2VC(
4268 exec_primitives
=True,
4273 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4274 :param logging_text:
4276 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4277 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4278 :param vca_index: index in the database _admin.deployed.VCA
4279 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4280 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4281 not executed properly
4282 :param scaling_in: True destroys the application, False destroys the model
4283 :return: None or exception
4288 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4289 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4293 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4295 # execute terminate_primitives
4297 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4298 config_descriptor
.get("terminate-config-primitive"),
4299 vca_deployed
.get("ee_descriptor_id"),
4301 vdu_id
= vca_deployed
.get("vdu_id")
4302 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4303 vdu_name
= vca_deployed
.get("vdu_name")
4304 vnf_index
= vca_deployed
.get("member-vnf-index")
4305 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4306 for seq
in terminate_primitives
:
4307 # For each sequence in list, get primitive and call _ns_execute_primitive()
4308 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4309 vnf_index
, seq
.get("name")
4311 self
.logger
.debug(logging_text
+ step
)
4312 # Create the primitive for each sequence, i.e. "primitive": "touch"
4313 primitive
= seq
.get("name")
4314 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4319 self
._add
_suboperation
(
4326 mapped_primitive_params
,
4328 # Sub-operations: Call _ns_execute_primitive() instead of action()
4330 result
, result_detail
= await self
._ns
_execute
_primitive
(
4331 vca_deployed
["ee_id"],
4333 mapped_primitive_params
,
4337 except LcmException
:
4338 # this happens when VCA is not deployed. In this case it is not needed to terminate
4340 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4341 if result
not in result_ok
:
4343 "terminate_primitive {} for vnf_member_index={} fails with "
4344 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4346 # set that this VCA do not need terminated
4347 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4351 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4354 # Delete Prometheus Jobs if any
4355 # This uses NSR_ID, so it will destroy any jobs under this index
4356 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4359 await self
.vca_map
[vca_type
].delete_execution_environment(
4360 vca_deployed
["ee_id"],
4361 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """Delete every N2VC execution environment of this NS at once.

    Marks all config statuses TERMINATING, removes the per-NS juju
    namespace (a namespace already gone — N2VCNotFound — is tolerated)
    and finally marks the statuses DELETED.
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout.charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4379 async def terminate(self
, nsr_id
, nslcmop_id
):
4380 # Try to lock HA task here
4381 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4382 if not task_is_locked_by_me
:
4385 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4386 self
.logger
.debug(logging_text
+ "Enter")
4387 timeout_ns_terminate
= self
.timeout
.ns_terminate
4390 operation_params
= None
4392 error_list
= [] # annotates all failed error messages
4393 db_nslcmop_update
= {}
4394 autoremove
= False # autoremove after terminated
4395 tasks_dict_info
= {}
4398 "Stage 1/3: Preparing task.",
4399 "Waiting for previous operations to terminate.",
4402 # ^ contains [stage, step, VIM-status]
4404 # wait for any previous tasks in process
4405 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4407 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4408 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4409 operation_params
= db_nslcmop
.get("operationParams") or {}
4410 if operation_params
.get("timeout_ns_terminate"):
4411 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4412 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4413 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4415 db_nsr_update
["operational-status"] = "terminating"
4416 db_nsr_update
["config-status"] = "terminating"
4417 self
._write
_ns
_status
(
4419 ns_state
="TERMINATING",
4420 current_operation
="TERMINATING",
4421 current_operation_id
=nslcmop_id
,
4422 other_update
=db_nsr_update
,
4424 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4425 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4426 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4429 stage
[1] = "Getting vnf descriptors from db."
4430 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4432 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4434 db_vnfds_from_id
= {}
4435 db_vnfds_from_member_index
= {}
4437 for vnfr
in db_vnfrs_list
:
4438 vnfd_id
= vnfr
["vnfd-id"]
4439 if vnfd_id
not in db_vnfds_from_id
:
4440 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4441 db_vnfds_from_id
[vnfd_id
] = vnfd
4442 db_vnfds_from_member_index
[
4443 vnfr
["member-vnf-index-ref"]
4444 ] = db_vnfds_from_id
[vnfd_id
]
4446 # Destroy individual execution environments when there are terminating primitives.
4447 # Rest of EE will be deleted at once
4448 # TODO - check before calling _destroy_N2VC
4449 # if not operation_params.get("skip_terminate_primitives"):#
4450 # or not vca.get("needed_terminate"):
4451 stage
[0] = "Stage 2/3 execute terminating primitives."
4452 self
.logger
.debug(logging_text
+ stage
[0])
4453 stage
[1] = "Looking execution environment that needs terminate."
4454 self
.logger
.debug(logging_text
+ stage
[1])
4456 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4457 config_descriptor
= None
4458 vca_member_vnf_index
= vca
.get("member-vnf-index")
4459 vca_id
= self
.get_vca_id(
4460 db_vnfrs_dict
.get(vca_member_vnf_index
)
4461 if vca_member_vnf_index
4465 if not vca
or not vca
.get("ee_id"):
4467 if not vca
.get("member-vnf-index"):
4469 config_descriptor
= db_nsr
.get("ns-configuration")
4470 elif vca
.get("vdu_id"):
4471 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4472 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4473 elif vca
.get("kdu_name"):
4474 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4475 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4477 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4478 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4479 vca_type
= vca
.get("type")
4480 exec_terminate_primitives
= not operation_params
.get(
4481 "skip_terminate_primitives"
4482 ) and vca
.get("needed_terminate")
4483 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4484 # pending native charms
4485 destroy_ee
= True if vca_type
in ("helm-v3", "native_charm") else False
4486 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4487 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4488 task
= asyncio
.ensure_future(
4496 exec_terminate_primitives
,
4500 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4502 # wait for pending tasks of terminate primitives
4506 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4508 error_list
= await self
._wait
_for
_tasks
(
4511 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4515 tasks_dict_info
.clear()
4517 return # raise LcmException("; ".join(error_list))
4519 # remove All execution environments at once
4520 stage
[0] = "Stage 3/3 delete all."
4522 if nsr_deployed
.get("VCA"):
4523 stage
[1] = "Deleting all execution environments."
4524 self
.logger
.debug(logging_text
+ stage
[1])
4525 vca_id
= self
.get_vca_id({}, db_nsr
)
4526 task_delete_ee
= asyncio
.ensure_future(
4528 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4529 timeout
=self
.timeout
.charm_delete
,
4532 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4533 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4535 # Delete Namespace and Certificates if necessary
4536 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4537 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4538 namespace
=db_nslcmop
["nsInstanceId"],
4539 certificate_name
=self
.EE_TLS_NAME
,
4541 await self
.vca_map
["helm-v3"].delete_namespace(
4542 namespace
=db_nslcmop
["nsInstanceId"],
4545 # Delete from k8scluster
4546 stage
[1] = "Deleting KDUs."
4547 self
.logger
.debug(logging_text
+ stage
[1])
4548 # print(nsr_deployed)
4549 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4550 if not kdu
or not kdu
.get("kdu-instance"):
4552 kdu_instance
= kdu
.get("kdu-instance")
4553 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4554 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4555 vca_id
= self
.get_vca_id({}, db_nsr
)
4556 task_delete_kdu_instance
= asyncio
.ensure_future(
4557 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4558 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4559 kdu_instance
=kdu_instance
,
4561 namespace
=kdu
.get("namespace"),
4567 + "Unknown k8s deployment type {}".format(
4568 kdu
.get("k8scluster-type")
4573 task_delete_kdu_instance
4574 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4577 stage
[1] = "Deleting ns from VIM."
4578 if self
.ro_config
.ng
:
4579 task_delete_ro
= asyncio
.ensure_future(
4580 self
._terminate
_ng
_ro
(
4581 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4584 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4586 # rest of staff will be done at finally
4589 ROclient
.ROClientException
,
4594 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4596 except asyncio
.CancelledError
:
4598 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4600 exc
= "Operation was cancelled"
4601 except Exception as e
:
4602 exc
= traceback
.format_exc()
4603 self
.logger
.critical(
4604 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4609 error_list
.append(str(exc
))
4611 # wait for pending tasks
4613 stage
[1] = "Waiting for terminate pending tasks."
4614 self
.logger
.debug(logging_text
+ stage
[1])
4615 error_list
+= await self
._wait
_for
_tasks
(
4618 timeout_ns_terminate
,
4622 stage
[1] = stage
[2] = ""
4623 except asyncio
.CancelledError
:
4624 error_list
.append("Cancelled")
4625 await self
._cancel
_pending
_tasks
(logging_text
, tasks_dict_info
)
4626 await self
._wait
_for
_tasks
(
4629 timeout_ns_terminate
,
4633 except Exception as exc
:
4634 error_list
.append(str(exc
))
4635 # update status at database
4637 error_detail
= "; ".join(error_list
)
4638 # self.logger.error(logging_text + error_detail)
4639 error_description_nslcmop
= "{} Detail: {}".format(
4640 stage
[0], error_detail
4642 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4643 nslcmop_id
, stage
[0]
4646 db_nsr_update
["operational-status"] = "failed"
4647 db_nsr_update
["detailed-status"] = (
4648 error_description_nsr
+ " Detail: " + error_detail
4650 db_nslcmop_update
["detailed-status"] = error_detail
4651 nslcmop_operation_state
= "FAILED"
4655 error_description_nsr
= error_description_nslcmop
= None
4656 ns_state
= "NOT_INSTANTIATED"
4657 db_nsr_update
["operational-status"] = "terminated"
4658 db_nsr_update
["detailed-status"] = "Done"
4659 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4660 db_nslcmop_update
["detailed-status"] = "Done"
4661 nslcmop_operation_state
= "COMPLETED"
4664 self
._write
_ns
_status
(
4667 current_operation
="IDLE",
4668 current_operation_id
=None,
4669 error_description
=error_description_nsr
,
4670 error_detail
=error_detail
,
4671 other_update
=db_nsr_update
,
4673 self
._write
_op
_status
(
4676 error_message
=error_description_nslcmop
,
4677 operation_state
=nslcmop_operation_state
,
4678 other_update
=db_nslcmop_update
,
4680 if ns_state
== "NOT_INSTANTIATED":
4684 {"nsr-id-ref": nsr_id
},
4685 {"_admin.nsState": "NOT_INSTANTIATED"},
4687 except DbException
as e
:
4690 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4694 if operation_params
:
4695 autoremove
= operation_params
.get("autoremove", False)
4696 if nslcmop_operation_state
:
4698 await self
.msg
.aiowrite(
4703 "nslcmop_id": nslcmop_id
,
4704 "operationState": nslcmop_operation_state
,
4705 "autoremove": autoremove
,
4708 except Exception as e
:
4710 logging_text
+ "kafka_write notification Exception {}".format(e
)
4712 self
.logger
.debug(f
"Deleting alerts: ns_id={nsr_id}")
4713 self
.db
.del_list("alerts", {"tags.ns_id": nsr_id
})
4715 self
.logger
.debug(logging_text
+ "Exit")
4716 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4718 async def _wait_for_tasks(
4719 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4722 error_detail_list
= []
4724 pending_tasks
= list(created_tasks_info
.keys())
4725 num_tasks
= len(pending_tasks
)
4727 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4728 self
._write
_op
_status
(nslcmop_id
, stage
)
4729 while pending_tasks
:
4731 _timeout
= timeout
+ time_start
- time()
4732 done
, pending_tasks
= await asyncio
.wait(
4733 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4735 num_done
+= len(done
)
4736 if not done
: # Timeout
4737 for task
in pending_tasks
:
4738 new_error
= created_tasks_info
[task
] + ": Timeout"
4739 error_detail_list
.append(new_error
)
4740 error_list
.append(new_error
)
4743 if task
.cancelled():
4746 exc
= task
.exception()
4748 if isinstance(exc
, asyncio
.TimeoutError
):
4750 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4751 error_list
.append(created_tasks_info
[task
])
4752 error_detail_list
.append(new_error
)
4759 ROclient
.ROClientException
,
4765 self
.logger
.error(logging_text
+ new_error
)
4767 exc_traceback
= "".join(
4768 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4772 + created_tasks_info
[task
]
4778 logging_text
+ created_tasks_info
[task
] + ": Done"
4780 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4782 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4783 if nsr_id
: # update also nsr
4788 "errorDescription": "Error at: " + ", ".join(error_list
),
4789 "errorDetail": ". ".join(error_detail_list
),
4792 self
._write
_op
_status
(nslcmop_id
, stage
)
4793 return error_detail_list
4795 async def _cancel_pending_tasks(self
, logging_text
, created_tasks_info
):
4796 for task
, name
in created_tasks_info
.items():
4797 self
.logger
.debug(logging_text
+ "Cancelling task: " + name
)
4801 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4803 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4804 The default-value is used. If it is between < > it look for a value at instantiation_params
4805 :param primitive_desc: portion of VNFD/NSD that describes primitive
4806 :param params: Params provided by user
4807 :param instantiation_params: Instantiation params provided by user
4808 :return: a dictionary with the calculated params
4810 calculated_params
= {}
4811 for parameter
in primitive_desc
.get("parameter", ()):
4812 param_name
= parameter
["name"]
4813 if param_name
in params
:
4814 calculated_params
[param_name
] = params
[param_name
]
4815 elif "default-value" in parameter
or "value" in parameter
:
4816 if "value" in parameter
:
4817 calculated_params
[param_name
] = parameter
["value"]
4819 calculated_params
[param_name
] = parameter
["default-value"]
4821 isinstance(calculated_params
[param_name
], str)
4822 and calculated_params
[param_name
].startswith("<")
4823 and calculated_params
[param_name
].endswith(">")
4825 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4826 calculated_params
[param_name
] = instantiation_params
[
4827 calculated_params
[param_name
][1:-1]
4831 "Parameter {} needed to execute primitive {} not provided".format(
4832 calculated_params
[param_name
], primitive_desc
["name"]
4837 "Parameter {} needed to execute primitive {} not provided".format(
4838 param_name
, primitive_desc
["name"]
4842 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4843 calculated_params
[param_name
] = yaml
.safe_dump(
4844 calculated_params
[param_name
], default_flow_style
=True, width
=256
4846 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4848 ].startswith("!!yaml "):
4849 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4850 if parameter
.get("data-type") == "INTEGER":
4852 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4853 except ValueError: # error converting string to int
4855 "Parameter {} of primitive {} must be integer".format(
4856 param_name
, primitive_desc
["name"]
4859 elif parameter
.get("data-type") == "BOOLEAN":
4860 calculated_params
[param_name
] = not (
4861 (str(calculated_params
[param_name
])).lower() == "false"
4864 # add always ns_config_info if primitive name is config
4865 if primitive_desc
["name"] == "config":
4866 if "ns_config_info" in instantiation_params
:
4867 calculated_params
["ns_config_info"] = instantiation_params
[
4870 return calculated_params
4872 def _look_for_deployed_vca(
4879 ee_descriptor_id
=None,
4881 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4882 for vca
in deployed_vca
:
4885 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4888 vdu_count_index
is not None
4889 and vdu_count_index
!= vca
["vdu_count_index"]
4892 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4894 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4898 # vca_deployed not found
4900 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4901 " is not deployed".format(
4910 ee_id
= vca
.get("ee_id")
4912 "type", "lxc_proxy_charm"
4913 ) # default value for backward compatibility - proxy charm
4916 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4917 "execution environment".format(
4918 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4921 return ee_id
, vca_type
4923 async def _ns_execute_primitive(
4929 retries_interval
=30,
4936 if primitive
== "config":
4937 primitive_params
= {"params": primitive_params
}
4939 vca_type
= vca_type
or "lxc_proxy_charm"
4943 output
= await asyncio
.wait_for(
4944 self
.vca_map
[vca_type
].exec_primitive(
4946 primitive_name
=primitive
,
4947 params_dict
=primitive_params
,
4948 progress_timeout
=self
.timeout
.progress_primitive
,
4949 total_timeout
=self
.timeout
.primitive
,
4954 timeout
=timeout
or self
.timeout
.primitive
,
4958 except asyncio
.CancelledError
:
4960 except Exception as e
:
4964 "Error executing action {} on {} -> {}".format(
4969 await asyncio
.sleep(retries_interval
)
4971 if isinstance(e
, asyncio
.TimeoutError
):
4973 message
="Timed out waiting for action to complete"
4975 return "FAILED", getattr(e
, "message", repr(e
))
4977 return "COMPLETED", output
4979 except (LcmException
, asyncio
.CancelledError
):
4981 except Exception as e
:
4982 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if db_nsr["_admin"]["deployed"]["K8s"]:
        # KDU-based deployment: refresh the status of every deployed K8s unit
        for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
            cluster_uuid, kdu_instance, cluster_type = (
                k8s["k8scluster-uuid"],
                k8s["kdu-instance"],
                k8s["k8scluster-type"],
            )
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # charm-based deployment: refresh every deployed VCA record
        for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            table, filter = "nsrs", {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db(table, filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
5018 async def action(self
, nsr_id
, nslcmop_id
):
5019 # Try to lock HA task here
5020 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5021 if not task_is_locked_by_me
:
5024 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5025 self
.logger
.debug(logging_text
+ "Enter")
5026 # get all needed from database
5030 db_nslcmop_update
= {}
5031 nslcmop_operation_state
= None
5032 error_description_nslcmop
= None
5036 # wait for any previous tasks in process
5037 step
= "Waiting for previous operations to terminate"
5038 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5040 self
._write
_ns
_status
(
5043 current_operation
="RUNNING ACTION",
5044 current_operation_id
=nslcmop_id
,
5047 step
= "Getting information from database"
5048 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5049 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5050 if db_nslcmop
["operationParams"].get("primitive_params"):
5051 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5052 db_nslcmop
["operationParams"]["primitive_params"]
5055 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5056 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5057 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5058 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5059 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5060 primitive
= db_nslcmop
["operationParams"]["primitive"]
5061 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5062 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5063 "timeout_ns_action", self
.timeout
.primitive
5067 step
= "Getting vnfr from database"
5068 db_vnfr
= self
.db
.get_one(
5069 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5071 if db_vnfr
.get("kdur"):
5073 for kdur
in db_vnfr
["kdur"]:
5074 if kdur
.get("additionalParams"):
5075 kdur
["additionalParams"] = json
.loads(
5076 kdur
["additionalParams"]
5078 kdur_list
.append(kdur
)
5079 db_vnfr
["kdur"] = kdur_list
5080 step
= "Getting vnfd from database"
5081 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5083 # Sync filesystem before running a primitive
5084 self
.fs
.sync(db_vnfr
["vnfd-id"])
5086 step
= "Getting nsd from database"
5087 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5089 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5090 # for backward compatibility
5091 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5092 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5093 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5094 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5096 # look for primitive
5097 config_primitive_desc
= descriptor_configuration
= None
5099 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5101 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5103 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5105 descriptor_configuration
= db_nsd
.get("ns-configuration")
5107 if descriptor_configuration
and descriptor_configuration
.get(
5110 for config_primitive
in descriptor_configuration
["config-primitive"]:
5111 if config_primitive
["name"] == primitive
:
5112 config_primitive_desc
= config_primitive
5115 if not config_primitive_desc
:
5116 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5118 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5122 primitive_name
= primitive
5123 ee_descriptor_id
= None
5125 primitive_name
= config_primitive_desc
.get(
5126 "execution-environment-primitive", primitive
5128 ee_descriptor_id
= config_primitive_desc
.get(
5129 "execution-environment-ref"
5135 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5137 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5140 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5142 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5144 desc_params
= parse_yaml_strings(
5145 db_vnfr
.get("additionalParamsForVnf")
5148 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5149 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5150 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5152 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5153 actions
.add(primitive
["name"])
5154 for primitive
in kdu_configuration
.get("config-primitive", []):
5155 actions
.add(primitive
["name"])
5157 nsr_deployed
["K8s"],
5158 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5159 and kdu
["member-vnf-index"] == vnf_index
,
5163 if primitive_name
in actions
5164 and kdu
["k8scluster-type"] != "helm-chart-v3"
5168 # TODO check if ns is in a proper status
5170 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5172 # kdur and desc_params already set from before
5173 if primitive_params
:
5174 desc_params
.update(primitive_params
)
5175 # TODO Check if we will need something at vnf level
5176 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5178 kdu_name
== kdu
["kdu-name"]
5179 and kdu
["member-vnf-index"] == vnf_index
5184 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5187 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5188 msg
= "unknown k8scluster-type '{}'".format(
5189 kdu
.get("k8scluster-type")
5191 raise LcmException(msg
)
5194 "collection": "nsrs",
5195 "filter": {"_id": nsr_id
},
5196 "path": "_admin.deployed.K8s.{}".format(index
),
5200 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5202 step
= "Executing kdu {}".format(primitive_name
)
5203 if primitive_name
== "upgrade":
5204 if desc_params
.get("kdu_model"):
5205 kdu_model
= desc_params
.get("kdu_model")
5206 del desc_params
["kdu_model"]
5208 kdu_model
= kdu
.get("kdu-model")
5209 if kdu_model
.count("/") < 2: # helm chart is not embedded
5210 parts
= kdu_model
.split(sep
=":")
5212 kdu_model
= parts
[0]
5213 if desc_params
.get("kdu_atomic_upgrade"):
5214 atomic_upgrade
= desc_params
.get(
5215 "kdu_atomic_upgrade"
5216 ).lower() in ("yes", "true", "1")
5217 del desc_params
["kdu_atomic_upgrade"]
5219 atomic_upgrade
= True
5221 detailed_status
= await asyncio
.wait_for(
5222 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5223 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5224 kdu_instance
=kdu
.get("kdu-instance"),
5225 atomic
=atomic_upgrade
,
5226 kdu_model
=kdu_model
,
5229 timeout
=timeout_ns_action
,
5231 timeout
=timeout_ns_action
+ 10,
5234 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5236 elif primitive_name
== "rollback":
5237 detailed_status
= await asyncio
.wait_for(
5238 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5239 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5240 kdu_instance
=kdu
.get("kdu-instance"),
5243 timeout
=timeout_ns_action
,
5245 elif primitive_name
== "status":
5246 detailed_status
= await asyncio
.wait_for(
5247 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5248 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5249 kdu_instance
=kdu
.get("kdu-instance"),
5252 timeout
=timeout_ns_action
,
5255 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5256 kdu
["kdu-name"], nsr_id
5258 params
= self
._map
_primitive
_params
(
5259 config_primitive_desc
, primitive_params
, desc_params
5262 detailed_status
= await asyncio
.wait_for(
5263 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5264 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5265 kdu_instance
=kdu_instance
,
5266 primitive_name
=primitive_name
,
5269 timeout
=timeout_ns_action
,
5272 timeout
=timeout_ns_action
,
5276 nslcmop_operation_state
= "COMPLETED"
5278 detailed_status
= ""
5279 nslcmop_operation_state
= "FAILED"
5281 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5282 nsr_deployed
["VCA"],
5283 member_vnf_index
=vnf_index
,
5285 vdu_count_index
=vdu_count_index
,
5286 ee_descriptor_id
=ee_descriptor_id
,
5288 for vca_index
, vca_deployed
in enumerate(
5289 db_nsr
["_admin"]["deployed"]["VCA"]
5291 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5293 "collection": "nsrs",
5294 "filter": {"_id": nsr_id
},
5295 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5299 nslcmop_operation_state
,
5301 ) = await self
._ns
_execute
_primitive
(
5303 primitive
=primitive_name
,
5304 primitive_params
=self
._map
_primitive
_params
(
5305 config_primitive_desc
, primitive_params
, desc_params
5307 timeout
=timeout_ns_action
,
5313 db_nslcmop_update
["detailed-status"] = detailed_status
5314 error_description_nslcmop
= (
5315 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5319 + "Done with result {} {}".format(
5320 nslcmop_operation_state
, detailed_status
5323 return # database update is called inside finally
5325 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5326 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5328 except asyncio
.CancelledError
:
5330 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5332 exc
= "Operation was cancelled"
5333 except asyncio
.TimeoutError
:
5334 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5336 except Exception as e
:
5337 exc
= traceback
.format_exc()
5338 self
.logger
.critical(
5339 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5348 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5349 nslcmop_operation_state
= "FAILED"
5351 self
._write
_ns
_status
(
5355 ], # TODO check if degraded. For the moment use previous status
5356 current_operation
="IDLE",
5357 current_operation_id
=None,
5358 # error_description=error_description_nsr,
5359 # error_detail=error_detail,
5360 other_update
=db_nsr_update
,
5363 self
._write
_op
_status
(
5366 error_message
=error_description_nslcmop
,
5367 operation_state
=nslcmop_operation_state
,
5368 other_update
=db_nslcmop_update
,
5371 if nslcmop_operation_state
:
5373 await self
.msg
.aiowrite(
5378 "nslcmop_id": nslcmop_id
,
5379 "operationState": nslcmop_operation_state
,
5382 except Exception as e
:
5384 logging_text
+ "kafka_write notification Exception {}".format(e
)
5386 self
.logger
.debug(logging_text
+ "Exit")
5387 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5388 return nslcmop_operation_state
, detailed_status
5390 async def terminate_vdus(
5391 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5393 """This method terminates VDUs
5396 db_vnfr: VNF instance record
5397 member_vnf_index: VNF index to identify the VDUs to be removed
5398 db_nsr: NS instance record
5399 update_db_nslcmops: Nslcmop update record
5401 vca_scaling_info
= []
5402 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5403 scaling_info
["scaling_direction"] = "IN"
5404 scaling_info
["vdu-delete"] = {}
5405 scaling_info
["kdu-delete"] = {}
5406 db_vdur
= db_vnfr
.get("vdur")
5407 vdur_list
= copy(db_vdur
)
5409 for index
, vdu
in enumerate(vdur_list
):
5410 vca_scaling_info
.append(
5412 "osm_vdu_id": vdu
["vdu-id-ref"],
5413 "member-vnf-index": member_vnf_index
,
5415 "vdu_index": count_index
,
5418 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5419 scaling_info
["vdu"].append(
5421 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5422 "vdu_id": vdu
["vdu-id-ref"],
5426 for interface
in vdu
["interfaces"]:
5427 scaling_info
["vdu"][index
]["interface"].append(
5429 "name": interface
["name"],
5430 "ip_address": interface
["ip-address"],
5431 "mac_address": interface
.get("mac-address"),
5434 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5435 stage
[2] = "Terminating VDUs"
5436 if scaling_info
.get("vdu-delete"):
5437 # scale_process = "RO"
5438 if self
.ro_config
.ng
:
5439 await self
._scale
_ng
_ro
(
5448 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5449 """This method is to Remove VNF instances from NS.
5452 nsr_id: NS instance id
5453 nslcmop_id: nslcmop id of update
5454 vnf_instance_id: id of the VNF instance to be removed
5457 result: (str, str) COMPLETED/FAILED, details
5461 logging_text
= "Task ns={} update ".format(nsr_id
)
5462 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5463 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5464 if check_vnfr_count
> 1:
5465 stage
= ["", "", ""]
5466 step
= "Getting nslcmop from database"
5468 step
+ " after having waited for previous tasks to be completed"
5470 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5471 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5472 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5473 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5474 """ db_vnfr = self.db.get_one(
5475 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5477 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5478 await self
.terminate_vdus(
5487 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5488 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5489 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5490 "constituent-vnfr-ref"
5492 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5493 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5494 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5495 return "COMPLETED", "Done"
5497 step
= "Terminate VNF Failed with"
5499 "{} Cannot terminate the last VNF in this NS.".format(
5503 except (LcmException
, asyncio
.CancelledError
):
5505 except Exception as e
:
5506 self
.logger
.debug("Error removing VNF {}".format(e
))
5507 return "FAILED", "Error removing VNF {}".format(e
)
5509 async def _ns_redeploy_vnf(
5517 """This method updates and redeploys VNF instances
5520 nsr_id: NS instance id
5521 nslcmop_id: nslcmop id
5522 db_vnfd: VNF descriptor
5523 db_vnfr: VNF instance record
5524 db_nsr: NS instance record
5527 result: (str, str) COMPLETED/FAILED, details
5531 stage
= ["", "", ""]
5532 logging_text
= "Task ns={} update ".format(nsr_id
)
5533 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5534 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5536 # Terminate old VNF resources
5537 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5538 await self
.terminate_vdus(
5547 # old_vnfd_id = db_vnfr["vnfd-id"]
5548 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5549 new_db_vnfd
= db_vnfd
5550 # new_vnfd_ref = new_db_vnfd["id"]
5551 # new_vnfd_id = vnfd_id
5555 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5557 "name": cp
.get("id"),
5558 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5559 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5562 new_vnfr_cp
.append(vnf_cp
)
5563 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5564 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5565 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5567 "revision": latest_vnfd_revision
,
5568 "connection-point": new_vnfr_cp
,
5572 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5573 updated_db_vnfr
= self
.db
.get_one(
5575 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5578 # Instantiate new VNF resources
5579 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5580 vca_scaling_info
= []
5581 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5582 scaling_info
["scaling_direction"] = "OUT"
5583 scaling_info
["vdu-create"] = {}
5584 scaling_info
["kdu-create"] = {}
5585 vdud_instantiate_list
= db_vnfd
["vdu"]
5586 for index
, vdud
in enumerate(vdud_instantiate_list
):
5587 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5589 additional_params
= (
5590 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5593 cloud_init_list
= []
5595 # TODO Information of its own ip is not available because db_vnfr is not updated.
5596 additional_params
["OSM"] = get_osm_params(
5597 updated_db_vnfr
, vdud
["id"], 1
5599 cloud_init_list
.append(
5600 self
._parse
_cloud
_init
(
5607 vca_scaling_info
.append(
5609 "osm_vdu_id": vdud
["id"],
5610 "member-vnf-index": member_vnf_index
,
5612 "vdu_index": count_index
,
5615 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5616 if self
.ro_config
.ng
:
5618 "New Resources to be deployed: {}".format(scaling_info
)
5620 await self
._scale
_ng
_ro
(
5628 return "COMPLETED", "Done"
5629 except (LcmException
, asyncio
.CancelledError
):
5631 except Exception as e
:
5632 self
.logger
.debug("Error updating VNF {}".format(e
))
5633 return "FAILED", "Error updating VNF {}".format(e
)
5635 async def _ns_charm_upgrade(
5641 timeout
: float = None,
5643 """This method upgrade charms in VNF instances
5646 ee_id: Execution environment id
5647 path: Local path to the charm
5649 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5650 timeout: (Float) Timeout for the ns update operation
5653 result: (str, str) COMPLETED/FAILED, details
5656 charm_type
= charm_type
or "lxc_proxy_charm"
5657 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5661 charm_type
=charm_type
,
5662 timeout
=timeout
or self
.timeout
.ns_update
,
5666 return "COMPLETED", output
5668 except (LcmException
, asyncio
.CancelledError
):
5671 except Exception as e
:
5672 self
.logger
.debug("Error upgrading charm {}".format(path
))
5674 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5676 async def update(self
, nsr_id
, nslcmop_id
):
5677 """Update NS according to different update types
5679 This method performs upgrade of VNF instances then updates the revision
5680 number in VNF record
5683 nsr_id: Network service will be updated
5684 nslcmop_id: ns lcm operation id
5687 It may raise DbException, LcmException, N2VCException, K8sException
5690 # Try to lock HA task here
5691 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5692 if not task_is_locked_by_me
:
5695 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5696 self
.logger
.debug(logging_text
+ "Enter")
5698 # Set the required variables to be filled up later
5700 db_nslcmop_update
= {}
5702 nslcmop_operation_state
= None
5704 error_description_nslcmop
= ""
5706 change_type
= "updated"
5707 detailed_status
= ""
5708 member_vnf_index
= None
5711 # wait for any previous tasks in process
5712 step
= "Waiting for previous operations to terminate"
5713 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5714 self
._write
_ns
_status
(
5717 current_operation
="UPDATING",
5718 current_operation_id
=nslcmop_id
,
5721 step
= "Getting nslcmop from database"
5722 db_nslcmop
= self
.db
.get_one(
5723 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5725 update_type
= db_nslcmop
["operationParams"]["updateType"]
5727 step
= "Getting nsr from database"
5728 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5729 old_operational_status
= db_nsr
["operational-status"]
5730 db_nsr_update
["operational-status"] = "updating"
5731 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5732 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5734 if update_type
== "CHANGE_VNFPKG":
5735 # Get the input parameters given through update request
5736 vnf_instance_id
= db_nslcmop
["operationParams"][
5737 "changeVnfPackageData"
5738 ].get("vnfInstanceId")
5740 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5743 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5745 step
= "Getting vnfr from database"
5746 db_vnfr
= self
.db
.get_one(
5747 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5750 step
= "Getting vnfds from database"
5752 latest_vnfd
= self
.db
.get_one(
5753 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5755 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5758 current_vnf_revision
= db_vnfr
.get("revision", 1)
5759 current_vnfd
= self
.db
.get_one(
5761 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5762 fail_on_empty
=False,
5764 # Charm artifact paths will be filled up later
5766 current_charm_artifact_path
,
5767 target_charm_artifact_path
,
5768 charm_artifact_paths
,
5770 ) = ([], [], [], [])
5772 step
= "Checking if revision has changed in VNFD"
5773 if current_vnf_revision
!= latest_vnfd_revision
:
5774 change_type
= "policy_updated"
5776 # There is new revision of VNFD, update operation is required
5777 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5778 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5780 step
= "Removing the VNFD packages if they exist in the local path"
5781 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5782 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5784 step
= "Get the VNFD packages from FSMongo"
5785 self
.fs
.sync(from_path
=latest_vnfd_path
)
5786 self
.fs
.sync(from_path
=current_vnfd_path
)
5789 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5791 current_base_folder
= current_vnfd
["_admin"]["storage"]
5792 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5794 for vca_index
, vca_deployed
in enumerate(
5795 get_iterable(nsr_deployed
, "VCA")
5797 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5799 # Getting charm-id and charm-type
5800 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5801 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5802 vca_type
= vca_deployed
.get("type")
5803 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5806 ee_id
= vca_deployed
.get("ee_id")
5808 step
= "Getting descriptor config"
5809 if current_vnfd
.get("kdu"):
5810 search_key
= "kdu_name"
5812 search_key
= "vnfd_id"
5814 entity_id
= vca_deployed
.get(search_key
)
5816 descriptor_config
= get_configuration(
5817 current_vnfd
, entity_id
5820 if "execution-environment-list" in descriptor_config
:
5821 ee_list
= descriptor_config
.get(
5822 "execution-environment-list", []
5827 # There could be several charm used in the same VNF
5828 for ee_item
in ee_list
:
5829 if ee_item
.get("juju"):
5830 step
= "Getting charm name"
5831 charm_name
= ee_item
["juju"].get("charm")
5833 step
= "Setting Charm artifact paths"
5834 current_charm_artifact_path
.append(
5835 get_charm_artifact_path(
5836 current_base_folder
,
5839 current_vnf_revision
,
5842 target_charm_artifact_path
.append(
5843 get_charm_artifact_path(
5847 latest_vnfd_revision
,
5850 elif ee_item
.get("helm-chart"):
5851 # add chart to list and all parameters
5852 step
= "Getting helm chart name"
5853 chart_name
= ee_item
.get("helm-chart")
5854 vca_type
= "helm-v3"
5855 step
= "Setting Helm chart artifact paths"
5857 helm_artifacts
.append(
5859 "current_artifact_path": get_charm_artifact_path(
5860 current_base_folder
,
5863 current_vnf_revision
,
5865 "target_artifact_path": get_charm_artifact_path(
5869 latest_vnfd_revision
,
5872 "vca_index": vca_index
,
5873 "vdu_index": vdu_count_index
,
5877 charm_artifact_paths
= zip(
5878 current_charm_artifact_path
, target_charm_artifact_path
5881 step
= "Checking if software version has changed in VNFD"
5882 if find_software_version(current_vnfd
) != find_software_version(
5885 step
= "Checking if existing VNF has charm"
5886 for current_charm_path
, target_charm_path
in list(
5887 charm_artifact_paths
5889 if current_charm_path
:
5891 "Software version change is not supported as VNF instance {} has charm.".format(
5896 # There is no change in the charm package, then redeploy the VNF
5897 # based on new descriptor
5898 step
= "Redeploying VNF"
5899 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5900 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5901 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5903 if result
== "FAILED":
5904 nslcmop_operation_state
= result
5905 error_description_nslcmop
= detailed_status
5906 old_operational_status
= "failed"
5907 db_nslcmop_update
["detailed-status"] = detailed_status
5908 db_nsr_update
["detailed-status"] = detailed_status
5909 scaling_aspect
= get_scaling_aspect(latest_vnfd
)
5910 scaling_group_desc
= db_nsr
.get("_admin").get(
5911 "scaling-group", None
5913 if scaling_group_desc
:
5914 for aspect
in scaling_aspect
:
5915 scaling_group_id
= aspect
.get("id")
5916 for scale_index
, scaling_group
in enumerate(
5919 if scaling_group
.get("name") == scaling_group_id
:
5921 "_admin.scaling-group.{}.nb-scale-op".format(
5927 + " step {} Done with result {} {}".format(
5928 step
, nslcmop_operation_state
, detailed_status
5933 step
= "Checking if any charm package has changed or not"
5934 for current_charm_path
, target_charm_path
in list(
5935 charm_artifact_paths
5939 and target_charm_path
5940 and self
.check_charm_hash_changed(
5941 current_charm_path
, target_charm_path
5944 step
= "Checking whether VNF uses juju bundle"
5945 if check_juju_bundle_existence(current_vnfd
):
5947 "Charm upgrade is not supported for the instance which"
5948 " uses juju-bundle: {}".format(
5949 check_juju_bundle_existence(current_vnfd
)
5953 step
= "Upgrading Charm"
5957 ) = await self
._ns
_charm
_upgrade
(
5960 charm_type
=vca_type
,
5961 path
=self
.fs
.path
+ target_charm_path
,
5962 timeout
=timeout_seconds
,
5965 if result
== "FAILED":
5966 nslcmop_operation_state
= result
5967 error_description_nslcmop
= detailed_status
5969 db_nslcmop_update
["detailed-status"] = detailed_status
5972 + " step {} Done with result {} {}".format(
5973 step
, nslcmop_operation_state
, detailed_status
5977 step
= "Updating policies"
5978 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5979 result
= "COMPLETED"
5980 detailed_status
= "Done"
5981 db_nslcmop_update
["detailed-status"] = "Done"
5984 for item
in helm_artifacts
:
5986 item
["current_artifact_path"]
5987 and item
["target_artifact_path"]
5988 and self
.check_charm_hash_changed(
5989 item
["current_artifact_path"],
5990 item
["target_artifact_path"],
5994 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5997 vnfr_id
= db_vnfr
["_id"]
5998 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6000 "collection": "nsrs",
6001 "filter": {"_id": nsr_id
},
6002 "path": db_update_entry
,
6004 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6005 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6006 namespace
=namespace
,
6010 artifact_path
=item
["target_artifact_path"],
6013 vnf_id
= db_vnfr
.get("vnfd-ref")
6014 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6015 self
.logger
.debug("get ssh key block")
6019 ("config-access", "ssh-access", "required"),
6021 # Needed to inject a ssh key
6024 ("config-access", "ssh-access", "default-user"),
6027 "Install configuration Software, getting public ssh key"
6029 pub_key
= await self
.vca_map
[
6031 ].get_ee_ssh_public__key(
6032 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6036 "Insert public key into VM user={} ssh_key={}".format(
6040 self
.logger
.debug(logging_text
+ step
)
6042 # wait for RO (ip-address) Insert pub_key into VM
6043 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6053 initial_config_primitive_list
= config_descriptor
.get(
6054 "initial-config-primitive"
6056 config_primitive
= next(
6059 for p
in initial_config_primitive_list
6060 if p
["name"] == "config"
6064 if not config_primitive
:
6067 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6069 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6070 if db_vnfr
.get("additionalParamsForVnf"):
6071 deploy_params
.update(
6073 db_vnfr
["additionalParamsForVnf"].copy()
6076 primitive_params_
= self
._map
_primitive
_params
(
6077 config_primitive
, {}, deploy_params
6080 step
= "execute primitive '{}' params '{}'".format(
6081 config_primitive
["name"], primitive_params_
6083 self
.logger
.debug(logging_text
+ step
)
6084 await self
.vca_map
[vca_type
].exec_primitive(
6086 primitive_name
=config_primitive
["name"],
6087 params_dict
=primitive_params_
,
6093 step
= "Updating policies"
6094 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6095 detailed_status
= "Done"
6096 db_nslcmop_update
["detailed-status"] = "Done"
6098 # If nslcmop_operation_state is None, so any operation is not failed.
6099 if not nslcmop_operation_state
:
6100 nslcmop_operation_state
= "COMPLETED"
6102 # If update CHANGE_VNFPKG nslcmop_operation is successful
6103 # vnf revision need to be updated
6104 vnfr_update
["revision"] = latest_vnfd_revision
6105 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6109 + " task Done with result {} {}".format(
6110 nslcmop_operation_state
, detailed_status
6113 elif update_type
== "REMOVE_VNF":
6114 # This part is included in https://osm.etsi.org/gerrit/11876
6115 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6116 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6117 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6118 step
= "Removing VNF"
6119 (result
, detailed_status
) = await self
.remove_vnf(
6120 nsr_id
, nslcmop_id
, vnf_instance_id
6122 if result
== "FAILED":
6123 nslcmop_operation_state
= result
6124 error_description_nslcmop
= detailed_status
6125 db_nslcmop_update
["detailed-status"] = detailed_status
6126 change_type
= "vnf_terminated"
6127 if not nslcmop_operation_state
:
6128 nslcmop_operation_state
= "COMPLETED"
6131 + " task Done with result {} {}".format(
6132 nslcmop_operation_state
, detailed_status
6136 elif update_type
== "OPERATE_VNF":
6137 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6140 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6143 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6146 (result
, detailed_status
) = await self
.rebuild_start_stop(
6147 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6149 if result
== "FAILED":
6150 nslcmop_operation_state
= result
6151 error_description_nslcmop
= detailed_status
6152 db_nslcmop_update
["detailed-status"] = detailed_status
6153 if not nslcmop_operation_state
:
6154 nslcmop_operation_state
= "COMPLETED"
6157 + " task Done with result {} {}".format(
6158 nslcmop_operation_state
, detailed_status
6162 # If nslcmop_operation_state is None, so any operation is not failed.
6163 # All operations are executed in overall.
6164 if not nslcmop_operation_state
:
6165 nslcmop_operation_state
= "COMPLETED"
6166 db_nsr_update
["operational-status"] = old_operational_status
6168 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6169 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6171 except asyncio
.CancelledError
:
6173 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6175 exc
= "Operation was cancelled"
6176 except asyncio
.TimeoutError
:
6177 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6179 except Exception as e
:
6180 exc
= traceback
.format_exc()
6181 self
.logger
.critical(
6182 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6191 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6192 nslcmop_operation_state
= "FAILED"
6193 db_nsr_update
["operational-status"] = old_operational_status
6195 self
._write
_ns
_status
(
6197 ns_state
=db_nsr
["nsState"],
6198 current_operation
="IDLE",
6199 current_operation_id
=None,
6200 other_update
=db_nsr_update
,
6203 self
._write
_op
_status
(
6206 error_message
=error_description_nslcmop
,
6207 operation_state
=nslcmop_operation_state
,
6208 other_update
=db_nslcmop_update
,
6211 if nslcmop_operation_state
:
6215 "nslcmop_id": nslcmop_id
,
6216 "operationState": nslcmop_operation_state
,
6219 change_type
in ("vnf_terminated", "policy_updated")
6220 and member_vnf_index
6222 msg
.update({"vnf_member_index": member_vnf_index
})
6223 await self
.msg
.aiowrite("ns", change_type
, msg
)
6224 except Exception as e
:
6226 logging_text
+ "kafka_write notification Exception {}".format(e
)
6228 self
.logger
.debug(logging_text
+ "Exit")
6229 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6230 return nslcmop_operation_state
, detailed_status
6232 async def scale(self
, nsr_id
, nslcmop_id
):
6233 # Try to lock HA task here
6234 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6235 if not task_is_locked_by_me
:
6238 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6239 stage
= ["", "", ""]
6240 tasks_dict_info
= {}
6241 # ^ stage, step, VIM progress
6242 self
.logger
.debug(logging_text
+ "Enter")
6243 # get all needed from database
6245 db_nslcmop_update
= {}
6248 # in case of error, indicates what part of scale was failed to put nsr at error status
6249 scale_process
= None
6250 old_operational_status
= ""
6251 old_config_status
= ""
6255 # wait for any previous tasks in process
6256 step
= "Waiting for previous operations to terminate"
6257 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6258 self
._write
_ns
_status
(
6261 current_operation
="SCALING",
6262 current_operation_id
=nslcmop_id
,
6265 step
= "Getting nslcmop from database"
6267 step
+ " after having waited for previous tasks to be completed"
6269 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6271 step
= "Getting nsr from database"
6272 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6273 old_operational_status
= db_nsr
["operational-status"]
6274 old_config_status
= db_nsr
["config-status"]
6276 step
= "Parsing scaling parameters"
6277 db_nsr_update
["operational-status"] = "scaling"
6278 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6279 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6281 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6283 ]["member-vnf-index"]
6284 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6286 ]["scaling-group-descriptor"]
6287 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6288 # for backward compatibility
6289 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6290 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6291 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6292 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6294 step
= "Getting vnfr from database"
6295 db_vnfr
= self
.db
.get_one(
6296 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6299 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6301 step
= "Getting vnfd from database"
6302 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6304 base_folder
= db_vnfd
["_admin"]["storage"]
6306 step
= "Getting scaling-group-descriptor"
6307 scaling_descriptor
= find_in_list(
6308 get_scaling_aspect(db_vnfd
),
6309 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6311 if not scaling_descriptor
:
6313 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6314 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6317 step
= "Sending scale order to VIM"
6318 # TODO check if ns is in a proper status
6320 if not db_nsr
["_admin"].get("scaling-group"):
6325 "_admin.scaling-group": [
6326 {"name": scaling_group
, "nb-scale-op": 0}
6330 admin_scale_index
= 0
6332 for admin_scale_index
, admin_scale_info
in enumerate(
6333 db_nsr
["_admin"]["scaling-group"]
6335 if admin_scale_info
["name"] == scaling_group
:
6336 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6338 else: # not found, set index one plus last element and add new entry with the name
6339 admin_scale_index
+= 1
6341 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6344 vca_scaling_info
= []
6345 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6346 if scaling_type
== "SCALE_OUT":
6347 if "aspect-delta-details" not in scaling_descriptor
:
6349 "Aspect delta details not fount in scaling descriptor {}".format(
6350 scaling_descriptor
["name"]
6353 # count if max-instance-count is reached
6354 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6356 scaling_info
["scaling_direction"] = "OUT"
6357 scaling_info
["vdu-create"] = {}
6358 scaling_info
["kdu-create"] = {}
6359 for delta
in deltas
:
6360 for vdu_delta
in delta
.get("vdu-delta", {}):
6361 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6362 # vdu_index also provides the number of instance of the targeted vdu
6363 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6364 if vdu_index
<= len(db_vnfr
["vdur"]):
6365 vdu_name_id
= db_vnfr
["vdur"][vdu_index
- 1]["vdu-name"]
6367 db_vnfr
["_id"] + vdu_name_id
+ str(vdu_index
- 1)
6369 prom_job_name
= prom_job_name
.replace("_", "")
6370 prom_job_name
= prom_job_name
.replace("-", "")
6372 prom_job_name
= None
6373 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6377 additional_params
= (
6378 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6381 cloud_init_list
= []
6383 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6384 max_instance_count
= 10
6385 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6386 max_instance_count
= vdu_profile
.get(
6387 "max-number-of-instances", 10
6390 default_instance_num
= get_number_of_instances(
6393 instances_number
= vdu_delta
.get("number-of-instances", 1)
6394 nb_scale_op
+= instances_number
6396 new_instance_count
= nb_scale_op
+ default_instance_num
6397 # Control if new count is over max and vdu count is less than max.
6398 # Then assign new instance count
6399 if new_instance_count
> max_instance_count
> vdu_count
:
6400 instances_number
= new_instance_count
- max_instance_count
6402 instances_number
= instances_number
6404 if new_instance_count
> max_instance_count
:
6406 "reached the limit of {} (max-instance-count) "
6407 "scaling-out operations for the "
6408 "scaling-group-descriptor '{}'".format(
6409 nb_scale_op
, scaling_group
6412 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6414 # TODO Information of its own ip is not available because db_vnfr is not updated.
6415 additional_params
["OSM"] = get_osm_params(
6416 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6418 cloud_init_list
.append(
6419 self
._parse
_cloud
_init
(
6426 vca_scaling_info
.append(
6428 "osm_vdu_id": vdu_delta
["id"],
6429 "member-vnf-index": vnf_index
,
6431 "vdu_index": vdu_index
+ x
,
6434 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6435 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6436 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6437 kdu_name
= kdu_profile
["kdu-name"]
6438 resource_name
= kdu_profile
.get("resource-name", "")
6440 # Might have different kdus in the same delta
6441 # Should have list for each kdu
6442 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6443 scaling_info
["kdu-create"][kdu_name
] = []
6445 kdur
= get_kdur(db_vnfr
, kdu_name
)
6446 if kdur
.get("helm-chart"):
6447 k8s_cluster_type
= "helm-chart-v3"
6448 self
.logger
.debug("kdur: {}".format(kdur
))
6449 elif kdur
.get("juju-bundle"):
6450 k8s_cluster_type
= "juju-bundle"
6453 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6454 "juju-bundle. Maybe an old NBI version is running".format(
6455 db_vnfr
["member-vnf-index-ref"], kdu_name
6459 max_instance_count
= 10
6460 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6461 max_instance_count
= kdu_profile
.get(
6462 "max-number-of-instances", 10
6465 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6466 deployed_kdu
, _
= get_deployed_kdu(
6467 nsr_deployed
, kdu_name
, vnf_index
6469 if deployed_kdu
is None:
6471 "KDU '{}' for vnf '{}' not deployed".format(
6475 kdu_instance
= deployed_kdu
.get("kdu-instance")
6476 instance_num
= await self
.k8scluster_map
[
6482 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6483 kdu_model
=deployed_kdu
.get("kdu-model"),
6485 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6486 "number-of-instances", 1
6489 # Control if new count is over max and instance_num is less than max.
6490 # Then assign max instance number to kdu replica count
6491 if kdu_replica_count
> max_instance_count
> instance_num
:
6492 kdu_replica_count
= max_instance_count
6493 if kdu_replica_count
> max_instance_count
:
6495 "reached the limit of {} (max-instance-count) "
6496 "scaling-out operations for the "
6497 "scaling-group-descriptor '{}'".format(
6498 instance_num
, scaling_group
6502 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6503 vca_scaling_info
.append(
6505 "osm_kdu_id": kdu_name
,
6506 "member-vnf-index": vnf_index
,
6508 "kdu_index": instance_num
+ x
- 1,
6511 scaling_info
["kdu-create"][kdu_name
].append(
6513 "member-vnf-index": vnf_index
,
6515 "k8s-cluster-type": k8s_cluster_type
,
6516 "resource-name": resource_name
,
6517 "scale": kdu_replica_count
,
6520 elif scaling_type
== "SCALE_IN":
6521 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6523 scaling_info
["scaling_direction"] = "IN"
6524 scaling_info
["vdu-delete"] = {}
6525 scaling_info
["kdu-delete"] = {}
6527 for delta
in deltas
:
6528 for vdu_delta
in delta
.get("vdu-delta", {}):
6529 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6530 min_instance_count
= 0
6531 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6532 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6533 min_instance_count
= vdu_profile
["min-number-of-instances"]
6535 default_instance_num
= get_number_of_instances(
6536 db_vnfd
, vdu_delta
["id"]
6538 instance_num
= vdu_delta
.get("number-of-instances", 1)
6539 nb_scale_op
-= instance_num
6541 new_instance_count
= nb_scale_op
+ default_instance_num
6543 if new_instance_count
< min_instance_count
< vdu_count
:
6544 instances_number
= min_instance_count
- new_instance_count
6546 instances_number
= instance_num
6548 if new_instance_count
< min_instance_count
:
6550 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6551 "scaling-group-descriptor '{}'".format(
6552 nb_scale_op
, scaling_group
6555 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6556 vca_scaling_info
.append(
6558 "osm_vdu_id": vdu_delta
["id"],
6559 "member-vnf-index": vnf_index
,
6561 "vdu_index": vdu_index
- 1 - x
,
6564 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6565 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6566 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6567 kdu_name
= kdu_profile
["kdu-name"]
6568 resource_name
= kdu_profile
.get("resource-name", "")
6570 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6571 scaling_info
["kdu-delete"][kdu_name
] = []
6573 kdur
= get_kdur(db_vnfr
, kdu_name
)
6574 if kdur
.get("helm-chart"):
6575 k8s_cluster_type
= "helm-chart-v3"
6576 self
.logger
.debug("kdur: {}".format(kdur
))
6577 elif kdur
.get("juju-bundle"):
6578 k8s_cluster_type
= "juju-bundle"
6581 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6582 "juju-bundle. Maybe an old NBI version is running".format(
6583 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6587 min_instance_count
= 0
6588 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6589 min_instance_count
= kdu_profile
["min-number-of-instances"]
6591 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6592 deployed_kdu
, _
= get_deployed_kdu(
6593 nsr_deployed
, kdu_name
, vnf_index
6595 if deployed_kdu
is None:
6597 "KDU '{}' for vnf '{}' not deployed".format(
6601 kdu_instance
= deployed_kdu
.get("kdu-instance")
6602 instance_num
= await self
.k8scluster_map
[
6608 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6609 kdu_model
=deployed_kdu
.get("kdu-model"),
6611 kdu_replica_count
= instance_num
- kdu_delta
.get(
6612 "number-of-instances", 1
6615 if kdu_replica_count
< min_instance_count
< instance_num
:
6616 kdu_replica_count
= min_instance_count
6617 if kdu_replica_count
< min_instance_count
:
6619 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6620 "scaling-group-descriptor '{}'".format(
6621 instance_num
, scaling_group
6625 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6626 vca_scaling_info
.append(
6628 "osm_kdu_id": kdu_name
,
6629 "member-vnf-index": vnf_index
,
6631 "kdu_index": instance_num
- x
- 1,
6634 scaling_info
["kdu-delete"][kdu_name
].append(
6636 "member-vnf-index": vnf_index
,
6638 "k8s-cluster-type": k8s_cluster_type
,
6639 "resource-name": resource_name
,
6640 "scale": kdu_replica_count
,
6644 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6645 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6646 if scaling_info
["scaling_direction"] == "IN":
6647 for vdur
in reversed(db_vnfr
["vdur"]):
6648 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6649 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6650 scaling_info
["vdu"].append(
6652 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6653 "vdu_id": vdur
["vdu-id-ref"],
6657 for interface
in vdur
["interfaces"]:
6658 scaling_info
["vdu"][-1]["interface"].append(
6660 "name": interface
["name"],
6661 "ip_address": interface
["ip-address"],
6662 "mac_address": interface
.get("mac-address"),
6665 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6668 step
= "Executing pre-scale vnf-config-primitive"
6669 if scaling_descriptor
.get("scaling-config-action"):
6670 for scaling_config_action
in scaling_descriptor
[
6671 "scaling-config-action"
6674 scaling_config_action
.get("trigger") == "pre-scale-in"
6675 and scaling_type
== "SCALE_IN"
6677 scaling_config_action
.get("trigger") == "pre-scale-out"
6678 and scaling_type
== "SCALE_OUT"
6680 vnf_config_primitive
= scaling_config_action
[
6681 "vnf-config-primitive-name-ref"
6683 step
= db_nslcmop_update
[
6685 ] = "executing pre-scale scaling-config-action '{}'".format(
6686 vnf_config_primitive
6689 # look for primitive
6690 for config_primitive
in (
6691 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6692 ).get("config-primitive", ()):
6693 if config_primitive
["name"] == vnf_config_primitive
:
6697 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6698 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6699 "primitive".format(scaling_group
, vnf_config_primitive
)
6702 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6703 if db_vnfr
.get("additionalParamsForVnf"):
6704 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6706 scale_process
= "VCA"
6707 db_nsr_update
["config-status"] = "configuring pre-scaling"
6708 primitive_params
= self
._map
_primitive
_params
(
6709 config_primitive
, {}, vnfr_params
6712 # Pre-scale retry check: Check if this sub-operation has been executed before
6713 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6716 vnf_config_primitive
,
6720 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6721 # Skip sub-operation
6722 result
= "COMPLETED"
6723 result_detail
= "Done"
6726 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6727 vnf_config_primitive
, result
, result_detail
6731 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6732 # New sub-operation: Get index of this sub-operation
6734 len(db_nslcmop
.get("_admin", {}).get("operations"))
6739 + "vnf_config_primitive={} New sub-operation".format(
6740 vnf_config_primitive
6744 # retry: Get registered params for this existing sub-operation
6745 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6748 vnf_index
= op
.get("member_vnf_index")
6749 vnf_config_primitive
= op
.get("primitive")
6750 primitive_params
= op
.get("primitive_params")
6753 + "vnf_config_primitive={} Sub-operation retry".format(
6754 vnf_config_primitive
6757 # Execute the primitive, either with new (first-time) or registered (reintent) args
6758 ee_descriptor_id
= config_primitive
.get(
6759 "execution-environment-ref"
6761 primitive_name
= config_primitive
.get(
6762 "execution-environment-primitive", vnf_config_primitive
6764 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6765 nsr_deployed
["VCA"],
6766 member_vnf_index
=vnf_index
,
6768 vdu_count_index
=None,
6769 ee_descriptor_id
=ee_descriptor_id
,
6771 result
, result_detail
= await self
._ns
_execute
_primitive
(
6780 + "vnf_config_primitive={} Done with result {} {}".format(
6781 vnf_config_primitive
, result
, result_detail
6784 # Update operationState = COMPLETED | FAILED
6785 self
._update
_suboperation
_status
(
6786 db_nslcmop
, op_index
, result
, result_detail
6789 if result
== "FAILED":
6790 raise LcmException(result_detail
)
6791 db_nsr_update
["config-status"] = old_config_status
6792 scale_process
= None
6796 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6799 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6802 # SCALE-IN VCA - BEGIN
6803 if vca_scaling_info
:
6804 step
= db_nslcmop_update
[
6806 ] = "Deleting the execution environments"
6807 scale_process
= "VCA"
6808 for vca_info
in vca_scaling_info
:
6809 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6810 member_vnf_index
= str(vca_info
["member-vnf-index"])
6812 logging_text
+ "vdu info: {}".format(vca_info
)
6814 if vca_info
.get("osm_vdu_id"):
6815 vdu_id
= vca_info
["osm_vdu_id"]
6816 vdu_index
= int(vca_info
["vdu_index"])
6819 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6820 member_vnf_index
, vdu_id
, vdu_index
6822 stage
[2] = step
= "Scaling in VCA"
6823 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6824 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6825 config_update
= db_nsr
["configurationStatus"]
6826 for vca_index
, vca
in enumerate(vca_update
):
6828 (vca
or vca
.get("ee_id"))
6829 and vca
["member-vnf-index"] == member_vnf_index
6830 and vca
["vdu_count_index"] == vdu_index
6832 if vca
.get("vdu_id"):
6833 config_descriptor
= get_configuration(
6834 db_vnfd
, vca
.get("vdu_id")
6836 elif vca
.get("kdu_name"):
6837 config_descriptor
= get_configuration(
6838 db_vnfd
, vca
.get("kdu_name")
6841 config_descriptor
= get_configuration(
6842 db_vnfd
, db_vnfd
["id"]
6844 operation_params
= (
6845 db_nslcmop
.get("operationParams") or {}
6847 exec_terminate_primitives
= not operation_params
.get(
6848 "skip_terminate_primitives"
6849 ) and vca
.get("needed_terminate")
6850 task
= asyncio
.ensure_future(
6859 exec_primitives
=exec_terminate_primitives
,
6863 timeout
=self
.timeout
.charm_delete
,
6866 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6869 del vca_update
[vca_index
]
6870 del config_update
[vca_index
]
6871 # wait for pending tasks of terminate primitives
6875 + "Waiting for tasks {}".format(
6876 list(tasks_dict_info
.keys())
6879 error_list
= await self
._wait
_for
_tasks
(
6883 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6888 tasks_dict_info
.clear()
6890 raise LcmException("; ".join(error_list
))
6892 db_vca_and_config_update
= {
6893 "_admin.deployed.VCA": vca_update
,
6894 "configurationStatus": config_update
,
6897 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6899 scale_process
= None
6900 # SCALE-IN VCA - END
6903 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6904 scale_process
= "RO"
6905 if self
.ro_config
.ng
:
6906 await self
._scale
_ng
_ro
(
6907 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6909 scaling_info
.pop("vdu-create", None)
6910 scaling_info
.pop("vdu-delete", None)
6912 scale_process
= None
6916 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6917 scale_process
= "KDU"
6918 await self
._scale
_kdu
(
6919 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6921 scaling_info
.pop("kdu-create", None)
6922 scaling_info
.pop("kdu-delete", None)
6924 scale_process
= None
6928 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6930 # SCALE-UP VCA - BEGIN
6931 if vca_scaling_info
:
6932 step
= db_nslcmop_update
[
6934 ] = "Creating new execution environments"
6935 scale_process
= "VCA"
6936 for vca_info
in vca_scaling_info
:
6937 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6938 member_vnf_index
= str(vca_info
["member-vnf-index"])
6940 logging_text
+ "vdu info: {}".format(vca_info
)
6942 vnfd_id
= db_vnfr
["vnfd-ref"]
6943 if vca_info
.get("osm_vdu_id"):
6944 vdu_index
= int(vca_info
["vdu_index"])
6945 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6946 if db_vnfr
.get("additionalParamsForVnf"):
6947 deploy_params
.update(
6949 db_vnfr
["additionalParamsForVnf"].copy()
6952 descriptor_config
= get_configuration(
6953 db_vnfd
, db_vnfd
["id"]
6955 if descriptor_config
:
6961 logging_text
=logging_text
6962 + "member_vnf_index={} ".format(member_vnf_index
),
6965 nslcmop_id
=nslcmop_id
,
6971 kdu_index
=kdu_index
,
6972 member_vnf_index
=member_vnf_index
,
6973 vdu_index
=vdu_index
,
6975 deploy_params
=deploy_params
,
6976 descriptor_config
=descriptor_config
,
6977 base_folder
=base_folder
,
6978 task_instantiation_info
=tasks_dict_info
,
6981 vdu_id
= vca_info
["osm_vdu_id"]
6982 vdur
= find_in_list(
6983 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6985 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6986 if vdur
.get("additionalParams"):
6987 deploy_params_vdu
= parse_yaml_strings(
6988 vdur
["additionalParams"]
6991 deploy_params_vdu
= deploy_params
6992 deploy_params_vdu
["OSM"] = get_osm_params(
6993 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6995 if descriptor_config
:
7001 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7002 member_vnf_index
, vdu_id
, vdu_index
7004 stage
[2] = step
= "Scaling out VCA"
7005 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7007 logging_text
=logging_text
7008 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7009 member_vnf_index
, vdu_id
, vdu_index
7013 nslcmop_id
=nslcmop_id
,
7019 member_vnf_index
=member_vnf_index
,
7020 vdu_index
=vdu_index
,
7021 kdu_index
=kdu_index
,
7023 deploy_params
=deploy_params_vdu
,
7024 descriptor_config
=descriptor_config
,
7025 base_folder
=base_folder
,
7026 task_instantiation_info
=tasks_dict_info
,
7029 # SCALE-UP VCA - END
7030 scale_process
= None
7033 # execute primitive service POST-SCALING
7034 step
= "Executing post-scale vnf-config-primitive"
7035 if scaling_descriptor
.get("scaling-config-action"):
7036 for scaling_config_action
in scaling_descriptor
[
7037 "scaling-config-action"
7040 scaling_config_action
.get("trigger") == "post-scale-in"
7041 and scaling_type
== "SCALE_IN"
7043 scaling_config_action
.get("trigger") == "post-scale-out"
7044 and scaling_type
== "SCALE_OUT"
7046 vnf_config_primitive
= scaling_config_action
[
7047 "vnf-config-primitive-name-ref"
7049 step
= db_nslcmop_update
[
7051 ] = "executing post-scale scaling-config-action '{}'".format(
7052 vnf_config_primitive
7055 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7056 if db_vnfr
.get("additionalParamsForVnf"):
7057 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7059 # look for primitive
7060 for config_primitive
in (
7061 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7062 ).get("config-primitive", ()):
7063 if config_primitive
["name"] == vnf_config_primitive
:
7067 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7068 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7069 "config-primitive".format(
7070 scaling_group
, vnf_config_primitive
7073 scale_process
= "VCA"
7074 db_nsr_update
["config-status"] = "configuring post-scaling"
7075 primitive_params
= self
._map
_primitive
_params
(
7076 config_primitive
, {}, vnfr_params
7079 # Post-scale retry check: Check if this sub-operation has been executed before
7080 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7083 vnf_config_primitive
,
7087 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7088 # Skip sub-operation
7089 result
= "COMPLETED"
7090 result_detail
= "Done"
7093 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7094 vnf_config_primitive
, result
, result_detail
7098 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7099 # New sub-operation: Get index of this sub-operation
7101 len(db_nslcmop
.get("_admin", {}).get("operations"))
7106 + "vnf_config_primitive={} New sub-operation".format(
7107 vnf_config_primitive
7111 # retry: Get registered params for this existing sub-operation
7112 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7115 vnf_index
= op
.get("member_vnf_index")
7116 vnf_config_primitive
= op
.get("primitive")
7117 primitive_params
= op
.get("primitive_params")
7120 + "vnf_config_primitive={} Sub-operation retry".format(
7121 vnf_config_primitive
7124 # Execute the primitive, either with new (first-time) or registered (reintent) args
7125 ee_descriptor_id
= config_primitive
.get(
7126 "execution-environment-ref"
7128 primitive_name
= config_primitive
.get(
7129 "execution-environment-primitive", vnf_config_primitive
7131 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7132 nsr_deployed
["VCA"],
7133 member_vnf_index
=vnf_index
,
7135 vdu_count_index
=None,
7136 ee_descriptor_id
=ee_descriptor_id
,
7138 result
, result_detail
= await self
._ns
_execute
_primitive
(
7147 + "vnf_config_primitive={} Done with result {} {}".format(
7148 vnf_config_primitive
, result
, result_detail
7151 # Update operationState = COMPLETED | FAILED
7152 self
._update
_suboperation
_status
(
7153 db_nslcmop
, op_index
, result
, result_detail
7156 if result
== "FAILED":
7157 raise LcmException(result_detail
)
7158 db_nsr_update
["config-status"] = old_config_status
7159 scale_process
= None
7161 # Check if each vnf has exporter for metric collection if so update prometheus job records
7162 if scaling_type
== "SCALE_OUT":
7163 if "exporters-endpoints" in db_vnfd
.get("df")[0]:
7164 vnfr_id
= db_vnfr
["id"]
7165 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7166 exporter_config
= db_vnfd
.get("df")[0].get("exporters-endpoints")
7167 self
.logger
.debug("exporter config :{}".format(exporter_config
))
7168 artifact_path
= "{}/{}/{}".format(
7169 base_folder
["folder"],
7170 base_folder
["pkg-dir"],
7171 "exporter-endpoint",
7174 ee_config_descriptor
= exporter_config
7175 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
7179 vdu_id
=db_vnfr
["vdur"][-1]["vdu-id-ref"],
7180 vdu_index
=db_vnfr
["vdur"][-1]["count-index"],
7184 self
.logger
.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip
))
7185 self
.logger
.debug("Artifact_path:{}".format(artifact_path
))
7186 vdu_id_for_prom
= None
7187 vdu_index_for_prom
= None
7188 for x
in get_iterable(db_vnfr
, "vdur"):
7189 vdu_id_for_prom
= x
.get("vdu-id-ref")
7190 vdu_index_for_prom
= x
.get("count-index")
7191 vnfr_id
= vnfr_id
+ vdu_id
+ str(vdu_index
)
7192 vnfr_id
= vnfr_id
.replace("_", "")
7193 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
7195 artifact_path
=artifact_path
,
7196 ee_config_descriptor
=ee_config_descriptor
,
7199 target_ip
=rw_mgmt_ip
,
7201 vdu_id
=vdu_id_for_prom
,
7202 vdu_index
=vdu_index_for_prom
,
7205 self
.logger
.debug("Prometheus job:{}".format(prometheus_jobs
))
7208 "_admin.deployed.prometheus_jobs"
7216 for job
in prometheus_jobs
:
7222 fail_on_empty
=False,
7226 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7227 db_nsr_update
["operational-status"] = (
7229 if old_operational_status
== "failed"
7230 else old_operational_status
7232 db_nsr_update
["config-status"] = old_config_status
7235 ROclient
.ROClientException
,
7240 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7242 except asyncio
.CancelledError
:
7244 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7246 exc
= "Operation was cancelled"
7247 except Exception as e
:
7248 exc
= traceback
.format_exc()
7249 self
.logger
.critical(
7250 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7256 error_list
.append(str(exc
))
7257 self
._write
_ns
_status
(
7260 current_operation
="IDLE",
7261 current_operation_id
=None,
7265 stage
[1] = "Waiting for instantiate pending tasks."
7266 self
.logger
.debug(logging_text
+ stage
[1])
7267 exc
= await self
._wait
_for
_tasks
(
7270 self
.timeout
.ns_deploy
,
7275 except asyncio
.CancelledError
:
7276 error_list
.append("Cancelled")
7277 await self
._cancel
_pending
_tasks
(logging_text
, tasks_dict_info
)
7278 await self
._wait
_for
_tasks
(
7281 self
.timeout
.ns_deploy
,
7287 error_detail
= "; ".join(error_list
)
7290 ] = error_description_nslcmop
= "FAILED {}: {}".format(
7293 nslcmop_operation_state
= "FAILED"
7295 db_nsr_update
["operational-status"] = old_operational_status
7296 db_nsr_update
["config-status"] = old_config_status
7297 db_nsr_update
["detailed-status"] = ""
7299 if "VCA" in scale_process
:
7300 db_nsr_update
["config-status"] = "failed"
7301 if "RO" in scale_process
:
7302 db_nsr_update
["operational-status"] = "failed"
7305 ] = "FAILED scaling nslcmop={} {}: {}".format(
7306 nslcmop_id
, step
, error_detail
7309 error_description_nslcmop
= None
7310 nslcmop_operation_state
= "COMPLETED"
7311 db_nslcmop_update
["detailed-status"] = "Done"
7312 if scaling_type
== "SCALE_IN" and prom_job_name
is not None:
7315 {"job_name": prom_job_name
},
7316 fail_on_empty
=False,
7319 self
._write
_op
_status
(
7322 error_message
=error_description_nslcmop
,
7323 operation_state
=nslcmop_operation_state
,
7324 other_update
=db_nslcmop_update
,
7327 self
._write
_ns
_status
(
7330 current_operation
="IDLE",
7331 current_operation_id
=None,
7332 other_update
=db_nsr_update
,
7335 if nslcmop_operation_state
:
7339 "nslcmop_id": nslcmop_id
,
7340 "operationState": nslcmop_operation_state
,
7342 await self
.msg
.aiowrite("ns", "scaled", msg
)
7343 except Exception as e
:
7345 logging_text
+ "kafka_write notification Exception {}".format(e
)
7347 self
.logger
.debug(logging_text
+ "Exit")
7348 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7350 async def _scale_kdu(
7351 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7353 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7354 for kdu_name
in _scaling_info
:
7355 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7356 deployed_kdu
, index
= get_deployed_kdu(
7357 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7359 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7360 kdu_instance
= deployed_kdu
["kdu-instance"]
7361 kdu_model
= deployed_kdu
.get("kdu-model")
7362 scale
= int(kdu_scaling_info
["scale"])
7363 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7366 "collection": "nsrs",
7367 "filter": {"_id": nsr_id
},
7368 "path": "_admin.deployed.K8s.{}".format(index
),
7371 step
= "scaling application {}".format(
7372 kdu_scaling_info
["resource-name"]
7374 self
.logger
.debug(logging_text
+ step
)
7376 if kdu_scaling_info
["type"] == "delete":
7377 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7380 and kdu_config
.get("terminate-config-primitive")
7381 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7383 terminate_config_primitive_list
= kdu_config
.get(
7384 "terminate-config-primitive"
7386 terminate_config_primitive_list
.sort(
7387 key
=lambda val
: int(val
["seq"])
7391 terminate_config_primitive
7392 ) in terminate_config_primitive_list
:
7393 primitive_params_
= self
._map
_primitive
_params
(
7394 terminate_config_primitive
, {}, {}
7396 step
= "execute terminate config primitive"
7397 self
.logger
.debug(logging_text
+ step
)
7398 await asyncio
.wait_for(
7399 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7400 cluster_uuid
=cluster_uuid
,
7401 kdu_instance
=kdu_instance
,
7402 primitive_name
=terminate_config_primitive
["name"],
7403 params
=primitive_params_
,
7405 total_timeout
=self
.timeout
.primitive
,
7408 timeout
=self
.timeout
.primitive
7409 * self
.timeout
.primitive_outer_factor
,
7412 await asyncio
.wait_for(
7413 self
.k8scluster_map
[k8s_cluster_type
].scale(
7414 kdu_instance
=kdu_instance
,
7416 resource_name
=kdu_scaling_info
["resource-name"],
7417 total_timeout
=self
.timeout
.scale_on_error
,
7419 cluster_uuid
=cluster_uuid
,
7420 kdu_model
=kdu_model
,
7424 timeout
=self
.timeout
.scale_on_error
7425 * self
.timeout
.scale_on_error_outer_factor
,
7428 if kdu_scaling_info
["type"] == "create":
7429 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7432 and kdu_config
.get("initial-config-primitive")
7433 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7435 initial_config_primitive_list
= kdu_config
.get(
7436 "initial-config-primitive"
7438 initial_config_primitive_list
.sort(
7439 key
=lambda val
: int(val
["seq"])
7442 for initial_config_primitive
in initial_config_primitive_list
:
7443 primitive_params_
= self
._map
_primitive
_params
(
7444 initial_config_primitive
, {}, {}
7446 step
= "execute initial config primitive"
7447 self
.logger
.debug(logging_text
+ step
)
7448 await asyncio
.wait_for(
7449 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7450 cluster_uuid
=cluster_uuid
,
7451 kdu_instance
=kdu_instance
,
7452 primitive_name
=initial_config_primitive
["name"],
7453 params
=primitive_params_
,
7460 async def _scale_ng_ro(
7461 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7463 nsr_id
= db_nslcmop
["nsInstanceId"]
7464 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7467 # read from db: vnfd's for every vnf
7470 # for each vnf in ns, read vnfd
7471 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7472 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7473 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7474 # if we haven't this vnfd, read it from db
7475 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7477 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7478 db_vnfds
.append(vnfd
)
7479 n2vc_key
= self
.n2vc
.get_public_key()
7480 n2vc_key_list
= [n2vc_key
]
7483 vdu_scaling_info
.get("vdu-create"),
7484 vdu_scaling_info
.get("vdu-delete"),
7487 # db_vnfr has been updated, update db_vnfrs to use it
7488 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7489 await self
._instantiate
_ng
_ro
(
7499 start_deploy
=time(),
7500 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7502 if vdu_scaling_info
.get("vdu-delete"):
7504 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7507 async def extract_prometheus_scrape_jobs(
7511 ee_config_descriptor
: dict,
7516 vnf_member_index
: str = "",
7518 vdu_index
: int = None,
7520 kdu_index
: int = None,
7522 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7523 This method will wait until the corresponding VDU or KDU is fully instantiated
7526 ee_id (str): Execution Environment ID
7527 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7528 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7529 vnfr_id (str): VNFR ID where this EE applies
7530 nsr_id (str): NSR ID where this EE applies
7531 target_ip (str): VDU/KDU instance IP address
7532 element_type (str): NS or VNF or VDU or KDU
7533 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7534 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7535 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7536 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7537 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7540 LcmException: When the VDU or KDU instance was not found in an hour
7543 _type_: Prometheus jobs
7545 # default the vdur and kdur names to an empty string, to avoid any later
7546 # problem with Prometheus when the element type is not VDU or KDU
7550 # look if exist a file called 'prometheus*.j2' and
7551 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7555 for f
in artifact_content
7556 if f
.startswith("prometheus") and f
.endswith(".j2")
7562 self
.logger
.debug("Artifact path{}".format(artifact_path
))
7563 self
.logger
.debug("job file{}".format(job_file
))
7564 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7567 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7568 if element_type
in ("VDU", "KDU"):
7569 for _
in range(360):
7570 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7571 if vdu_id
and vdu_index
is not None:
7575 for x
in get_iterable(db_vnfr
, "vdur")
7577 x
.get("vdu-id-ref") == vdu_id
7578 and x
.get("count-index") == vdu_index
7583 if vdur
.get("name"):
7584 vdur_name
= vdur
.get("name")
7586 if kdu_name
and kdu_index
is not None:
7590 for x
in get_iterable(db_vnfr
, "kdur")
7592 x
.get("kdu-name") == kdu_name
7593 and x
.get("count-index") == kdu_index
7598 if kdur
.get("name"):
7599 kdur_name
= kdur
.get("name")
7602 await asyncio
.sleep(10)
7604 if vdu_id
and vdu_index
is not None:
7606 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7608 if kdu_name
and kdu_index
is not None:
7610 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7613 if ee_id
is not None:
7614 _
, namespace
, helm_id
= get_ee_id_parts(
7616 ) # get namespace and EE gRPC service name
7617 host_name
= f
'{helm_id}-{ee_config_descriptor["metric-service"]}.{namespace}.svc' # svc_name.namespace.svc
7619 vnfr_id
= vnfr_id
.replace("-", "")
7621 "JOB_NAME": vnfr_id
,
7622 "TARGET_IP": target_ip
,
7623 "EXPORTER_POD_IP": host_name
,
7624 "EXPORTER_POD_PORT": host_port
,
7626 "VNF_MEMBER_INDEX": vnf_member_index
,
7627 "VDUR_NAME": vdur_name
,
7628 "KDUR_NAME": kdur_name
,
7629 "ELEMENT_TYPE": element_type
,
7632 metric_path
= ee_config_descriptor
["metric-path"]
7633 target_port
= ee_config_descriptor
["metric-port"]
7634 vnfr_id
= vnfr_id
.replace("-", "")
7636 "JOB_NAME": vnfr_id
,
7637 "TARGET_IP": target_ip
,
7638 "TARGET_PORT": target_port
,
7639 "METRIC_PATH": metric_path
,
7642 job_list
= parse_job(job_data
, variables
)
7643 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7644 for job
in job_list
:
7646 not isinstance(job
.get("job_name"), str)
7647 or vnfr_id
not in job
["job_name"]
7649 job
["job_name"] = vnfr_id
+ "_" + str(SystemRandom().randint(1, 10000))
7650 job
["nsr_id"] = nsr_id
7651 job
["vnfr_id"] = vnfr_id
7654 async def rebuild_start_stop(
7655 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7657 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7658 self
.logger
.info(logging_text
+ "Enter")
7659 stage
= ["Preparing the environment", ""]
7660 # database nsrs record
7664 # in case of error, indicates what part of scale was failed to put nsr at error status
7665 start_deploy
= time()
7667 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7668 vim_account_id
= db_vnfr
.get("vim-account-id")
7669 vim_info_key
= "vim:" + vim_account_id
7670 vdu_id
= additional_param
["vdu_id"]
7671 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7672 vdur
= find_in_list(
7673 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7676 vdu_vim_name
= vdur
["name"]
7677 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7678 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7680 raise LcmException("Target vdu is not found")
7681 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7682 # wait for any previous tasks in process
7683 stage
[1] = "Waiting for previous operations to terminate"
7684 self
.logger
.info(stage
[1])
7685 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7687 stage
[1] = "Reading from database."
7688 self
.logger
.info(stage
[1])
7689 self
._write
_ns
_status
(
7692 current_operation
=operation_type
.upper(),
7693 current_operation_id
=nslcmop_id
,
7695 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7698 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7699 db_nsr_update
["operational-status"] = operation_type
7700 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7704 "vim_vm_id": vim_vm_id
,
7706 "vdu_index": additional_param
["count-index"],
7707 "vdu_id": vdur
["id"],
7708 "target_vim": target_vim
,
7709 "vim_account_id": vim_account_id
,
7712 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7713 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7714 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7715 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7716 self
.logger
.info("response from RO: {}".format(result_dict
))
7717 action_id
= result_dict
["action_id"]
7718 await self
._wait
_ng
_ro
(
7723 self
.timeout
.operate
,
7725 "start_stop_rebuild",
7727 return "COMPLETED", "Done"
7728 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7729 self
.logger
.error("Exit Exception {}".format(e
))
7731 except asyncio
.CancelledError
:
7732 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7733 exc
= "Operation was cancelled"
7734 except Exception as e
:
7735 exc
= traceback
.format_exc()
7736 self
.logger
.critical(
7737 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7739 return "FAILED", "Error in operate VNF {}".format(exc
)
7741 async def migrate(self
, nsr_id
, nslcmop_id
):
7743 Migrate VNFs and VDUs instances in a NS
7745 :param: nsr_id: NS Instance ID
7746 :param: nslcmop_id: nslcmop ID of migrate
7749 # Try to lock HA task here
7750 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7751 if not task_is_locked_by_me
:
7753 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7754 self
.logger
.debug(logging_text
+ "Enter")
7755 # get all needed from database
7757 db_nslcmop_update
= {}
7758 nslcmop_operation_state
= None
7762 # in case of error, indicates what part of scale was failed to put nsr at error status
7763 start_deploy
= time()
7766 # wait for any previous tasks in process
7767 step
= "Waiting for previous operations to terminate"
7768 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7770 self
._write
_ns
_status
(
7773 current_operation
="MIGRATING",
7774 current_operation_id
=nslcmop_id
,
7776 step
= "Getting nslcmop from database"
7778 step
+ " after having waited for previous tasks to be completed"
7780 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7781 migrate_params
= db_nslcmop
.get("operationParams")
7784 target
.update(migrate_params
)
7785 desc
= await self
.RO
.migrate(nsr_id
, target
)
7786 self
.logger
.debug("RO return > {}".format(desc
))
7787 action_id
= desc
["action_id"]
7788 await self
._wait
_ng
_ro
(
7793 self
.timeout
.migrate
,
7794 operation
="migrate",
7796 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7797 self
.logger
.error("Exit Exception {}".format(e
))
7799 except asyncio
.CancelledError
:
7800 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7801 exc
= "Operation was cancelled"
7802 except Exception as e
:
7803 exc
= traceback
.format_exc()
7804 self
.logger
.critical(
7805 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7808 self
._write
_ns
_status
(
7811 current_operation
="IDLE",
7812 current_operation_id
=None,
7815 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7816 nslcmop_operation_state
= "FAILED"
7818 nslcmop_operation_state
= "COMPLETED"
7819 db_nslcmop_update
["detailed-status"] = "Done"
7820 db_nsr_update
["detailed-status"] = "Done"
7822 self
._write
_op
_status
(
7826 operation_state
=nslcmop_operation_state
,
7827 other_update
=db_nslcmop_update
,
7829 if nslcmop_operation_state
:
7833 "nslcmop_id": nslcmop_id
,
7834 "operationState": nslcmop_operation_state
,
7836 await self
.msg
.aiowrite("ns", "migrated", msg
)
7837 except Exception as e
:
7839 logging_text
+ "kafka_write notification Exception {}".format(e
)
7841 self
.logger
.debug(logging_text
+ "Exit")
7842 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7844 async def heal(self
, nsr_id
, nslcmop_id
):
7848 :param nsr_id: ns instance to heal
7849 :param nslcmop_id: operation to run
7853 # Try to lock HA task here
7854 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7855 if not task_is_locked_by_me
:
7858 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7859 stage
= ["", "", ""]
7860 tasks_dict_info
= {}
7861 # ^ stage, step, VIM progress
7862 self
.logger
.debug(logging_text
+ "Enter")
7863 # get all needed from database
7865 db_nslcmop_update
= {}
7867 db_vnfrs
= {} # vnf's info indexed by _id
7869 old_operational_status
= ""
7870 old_config_status
= ""
7873 # wait for any previous tasks in process
7874 step
= "Waiting for previous operations to terminate"
7875 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7876 self
._write
_ns
_status
(
7879 current_operation
="HEALING",
7880 current_operation_id
=nslcmop_id
,
7883 step
= "Getting nslcmop from database"
7885 step
+ " after having waited for previous tasks to be completed"
7887 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7889 step
= "Getting nsr from database"
7890 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7891 old_operational_status
= db_nsr
["operational-status"]
7892 old_config_status
= db_nsr
["config-status"]
7895 "_admin.deployed.RO.operational-status": "healing",
7897 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7899 step
= "Sending heal order to VIM"
7901 logging_text
=logging_text
,
7903 db_nslcmop
=db_nslcmop
,
7908 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7909 self
.logger
.debug(logging_text
+ stage
[1])
7910 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7911 self
.fs
.sync(db_nsr
["nsd-id"])
7913 # read from db: vnfr's of this ns
7914 step
= "Getting vnfrs from db"
7915 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7916 for vnfr
in db_vnfrs_list
:
7917 db_vnfrs
[vnfr
["_id"]] = vnfr
7918 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7920 # Check for each target VNF
7921 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7922 for target_vnf
in target_list
:
7923 # Find this VNF in the list from DB
7924 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7926 db_vnfr
= db_vnfrs
[vnfr_id
]
7927 vnfd_id
= db_vnfr
.get("vnfd-id")
7928 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7929 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7930 base_folder
= vnfd
["_admin"]["storage"]
7935 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7936 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7938 # Check each target VDU and deploy N2VC
7939 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7942 if not target_vdu_list
:
7943 # Codigo nuevo para crear diccionario
7944 target_vdu_list
= []
7945 for existing_vdu
in db_vnfr
.get("vdur"):
7946 vdu_name
= existing_vdu
.get("vdu-name", None)
7947 vdu_index
= existing_vdu
.get("count-index", 0)
7948 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7951 vdu_to_be_healed
= {
7953 "count-index": vdu_index
,
7954 "run-day1": vdu_run_day1
,
7956 target_vdu_list
.append(vdu_to_be_healed
)
7957 for target_vdu
in target_vdu_list
:
7958 deploy_params_vdu
= target_vdu
7959 # Set run-day1 vnf level value if not vdu level value exists
7960 if not deploy_params_vdu
.get("run-day1") and target_vnf
.get(
7961 "additionalParams", {}
7963 deploy_params_vdu
["run-day1"] = target_vnf
[
7966 vdu_name
= target_vdu
.get("vdu-id", None)
7967 # TODO: Get vdu_id from vdud.
7969 # For multi instance VDU count-index is mandatory
7970 # For single session VDU count-indes is 0
7971 vdu_index
= target_vdu
.get("count-index", 0)
7973 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7974 stage
[1] = "Deploying Execution Environments."
7975 self
.logger
.debug(logging_text
+ stage
[1])
7977 # VNF Level charm. Normal case when proxy charms.
7978 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7979 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7980 if descriptor_config
:
7981 # Continue if healed machine is management machine
7982 vnf_ip_address
= db_vnfr
.get("ip-address")
7983 target_instance
= None
7984 for instance
in db_vnfr
.get("vdur", None):
7986 instance
["vdu-name"] == vdu_name
7987 and instance
["count-index"] == vdu_index
7989 target_instance
= instance
7991 if vnf_ip_address
== target_instance
.get("ip-address"):
7993 logging_text
=logging_text
7994 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7995 member_vnf_index
, vdu_name
, vdu_index
7999 nslcmop_id
=nslcmop_id
,
8005 member_vnf_index
=member_vnf_index
,
8008 deploy_params
=deploy_params_vdu
,
8009 descriptor_config
=descriptor_config
,
8010 base_folder
=base_folder
,
8011 task_instantiation_info
=tasks_dict_info
,
8015 # VDU Level charm. Normal case with native charms.
8016 descriptor_config
= get_configuration(vnfd
, vdu_name
)
8017 if descriptor_config
:
8019 logging_text
=logging_text
8020 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8021 member_vnf_index
, vdu_name
, vdu_index
8025 nslcmop_id
=nslcmop_id
,
8031 member_vnf_index
=member_vnf_index
,
8032 vdu_index
=vdu_index
,
8034 deploy_params
=deploy_params_vdu
,
8035 descriptor_config
=descriptor_config
,
8036 base_folder
=base_folder
,
8037 task_instantiation_info
=tasks_dict_info
,
8042 ROclient
.ROClientException
,
8047 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
8049 except asyncio
.CancelledError
:
8051 logging_text
+ "Cancelled Exception while '{}'".format(step
)
8053 exc
= "Operation was cancelled"
8054 except Exception as e
:
8055 exc
= traceback
.format_exc()
8056 self
.logger
.critical(
8057 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
8063 error_list
.append(str(exc
))
8066 stage
[1] = "Waiting for healing pending tasks."
8067 self
.logger
.debug(logging_text
+ stage
[1])
8068 exc
= await self
._wait
_for
_tasks
(
8071 self
.timeout
.ns_deploy
,
8076 except asyncio
.CancelledError
:
8077 error_list
.append("Cancelled")
8078 await self
._cancel
_pending
_tasks
(logging_text
, tasks_dict_info
)
8079 await self
._wait
_for
_tasks
(
8082 self
.timeout
.ns_deploy
,
8088 error_detail
= "; ".join(error_list
)
8091 ] = error_description_nslcmop
= "FAILED {}: {}".format(
8094 nslcmop_operation_state
= "FAILED"
8096 db_nsr_update
["operational-status"] = old_operational_status
8097 db_nsr_update
["config-status"] = old_config_status
8100 ] = "FAILED healing nslcmop={} {}: {}".format(
8101 nslcmop_id
, step
, error_detail
8103 for task
, task_name
in tasks_dict_info
.items():
8104 if not task
.done() or task
.cancelled() or task
.exception():
8105 if task_name
.startswith(self
.task_name_deploy_vca
):
8106 # A N2VC task is pending
8107 db_nsr_update
["config-status"] = "failed"
8109 # RO task is pending
8110 db_nsr_update
["operational-status"] = "failed"
8112 error_description_nslcmop
= None
8113 nslcmop_operation_state
= "COMPLETED"
8114 db_nslcmop_update
["detailed-status"] = "Done"
8115 db_nsr_update
["detailed-status"] = "Done"
8116 db_nsr_update
["operational-status"] = "running"
8117 db_nsr_update
["config-status"] = "configured"
8119 self
._write
_op
_status
(
8122 error_message
=error_description_nslcmop
,
8123 operation_state
=nslcmop_operation_state
,
8124 other_update
=db_nslcmop_update
,
8127 self
._write
_ns
_status
(
8130 current_operation
="IDLE",
8131 current_operation_id
=None,
8132 other_update
=db_nsr_update
,
8135 if nslcmop_operation_state
:
8139 "nslcmop_id": nslcmop_id
,
8140 "operationState": nslcmop_operation_state
,
8142 await self
.msg
.aiowrite("ns", "healed", msg
)
8143 except Exception as e
:
8145 logging_text
+ "kafka_write notification Exception {}".format(e
)
8147 self
.logger
.debug(logging_text
+ "Exit")
8148 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8159 :param logging_text: preffix text to use at logging
8160 :param nsr_id: nsr identity
8161 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8162 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8163 :return: None or exception
8166 def get_vim_account(vim_account_id
):
8168 if vim_account_id
in db_vims
:
8169 return db_vims
[vim_account_id
]
8170 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8171 db_vims
[vim_account_id
] = db_vim
8176 ns_params
= db_nslcmop
.get("operationParams")
8177 if ns_params
and ns_params
.get("timeout_ns_heal"):
8178 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8180 timeout_ns_heal
= self
.timeout
.ns_heal
8184 nslcmop_id
= db_nslcmop
["_id"]
8186 "action_id": nslcmop_id
,
8188 self
.logger
.warning(
8189 "db_nslcmop={} and timeout_ns_heal={}".format(
8190 db_nslcmop
, timeout_ns_heal
8193 target
.update(db_nslcmop
.get("operationParams", {}))
8195 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8196 desc
= await self
.RO
.recreate(nsr_id
, target
)
8197 self
.logger
.debug("RO return > {}".format(desc
))
8198 action_id
= desc
["action_id"]
8199 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8200 await self
._wait
_ng
_ro
(
8207 operation
="healing",
8212 "_admin.deployed.RO.operational-status": "running",
8213 "detailed-status": " ".join(stage
),
8215 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8216 self
._write
_op
_status
(nslcmop_id
, stage
)
8218 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8221 except Exception as e
:
8222 stage
[2] = "ERROR healing at VIM"
8223 # self.set_vnfr_at_error(db_vnfrs, str(e))
8225 "Error healing at VIM {}".format(e
),
8226 exc_info
=not isinstance(
8229 ROclient
.ROClientException
,
8255 task_instantiation_info
,
8258 # launch instantiate_N2VC in a asyncio task and register task object
8259 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8260 # if not found, create one entry and update database
8261 # fill db_nsr._admin.deployed.VCA.<index>
8264 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8268 get_charm_name
= False
8269 if "execution-environment-list" in descriptor_config
:
8270 ee_list
= descriptor_config
.get("execution-environment-list", [])
8271 elif "juju" in descriptor_config
:
8272 ee_list
= [descriptor_config
] # ns charms
8273 if "execution-environment-list" not in descriptor_config
:
8274 # charm name is only required for ns charms
8275 get_charm_name
= True
8276 else: # other types as script are not supported
8279 for ee_item
in ee_list
:
8282 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8283 ee_item
.get("juju"), ee_item
.get("helm-chart")
8286 ee_descriptor_id
= ee_item
.get("id")
8287 if ee_item
.get("juju"):
8288 vca_name
= ee_item
["juju"].get("charm")
8290 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8293 if ee_item
["juju"].get("charm") is not None
8296 if ee_item
["juju"].get("cloud") == "k8s":
8297 vca_type
= "k8s_proxy_charm"
8298 elif ee_item
["juju"].get("proxy") is False:
8299 vca_type
= "native_charm"
8300 elif ee_item
.get("helm-chart"):
8301 vca_name
= ee_item
["helm-chart"]
8302 vca_type
= "helm-v3"
8305 logging_text
+ "skipping non juju neither charm configuration"
8310 for vca_index
, vca_deployed
in enumerate(
8311 db_nsr
["_admin"]["deployed"]["VCA"]
8313 if not vca_deployed
:
8316 vca_deployed
.get("member-vnf-index") == member_vnf_index
8317 and vca_deployed
.get("vdu_id") == vdu_id
8318 and vca_deployed
.get("kdu_name") == kdu_name
8319 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8320 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8324 # not found, create one.
8326 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8329 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8331 target
+= "/kdu/{}".format(kdu_name
)
8333 "target_element": target
,
8334 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8335 "member-vnf-index": member_vnf_index
,
8337 "kdu_name": kdu_name
,
8338 "vdu_count_index": vdu_index
,
8339 "operational-status": "init", # TODO revise
8340 "detailed-status": "", # TODO revise
8341 "step": "initial-deploy", # TODO revise
8343 "vdu_name": vdu_name
,
8345 "ee_descriptor_id": ee_descriptor_id
,
8346 "charm_name": charm_name
,
8350 # create VCA and configurationStatus in db
8352 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8353 "configurationStatus.{}".format(vca_index
): dict(),
8355 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8357 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8359 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8360 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8361 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8364 task_n2vc
= asyncio
.ensure_future(
8366 logging_text
=logging_text
,
8367 vca_index
=vca_index
,
8373 vdu_index
=vdu_index
,
8374 deploy_params
=deploy_params
,
8375 config_descriptor
=descriptor_config
,
8376 base_folder
=base_folder
,
8377 nslcmop_id
=nslcmop_id
,
8381 ee_config_descriptor
=ee_item
,
8384 self
.lcm_tasks
.register(
8388 "instantiate_N2VC-{}".format(vca_index
),
8391 task_instantiation_info
[
8393 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8394 member_vnf_index
or "", vdu_id
or ""
# heal_N2VC: (re)register the VCA execution environment for one NS / VNF /
# VDU / KDU element during an NS heal operation and re-run its Day-1
# (initial-config) primitives.
# NOTE(review): this text is a garbled extraction. Statements are split
# across physical lines and the fused original line numbers (8397, 8414, ...)
# show many lines are missing entirely (gaps in the numbering). Do not treat
# this block as runnable Python; recover the full body from the original
# repository before editing logic.
8397 async def heal_N2VC(
# Signature fragment: parameters at original lines 8398-8413 are missing from
# the capture; only the trailing parameter is visible. Judging from the body,
# the missing ones include logging_text, vca_index, nsi_id, db_nsr, db_vnfr,
# vdu_id, kdu_name, vdu_index, config_descriptor, deploy_params, base_folder,
# nslcmop_id, stage, vca_type, vca_name -- TODO confirm against upstream.
8414 ee_config_descriptor
,
# --- Resolve ids and the db path of the VCA entry being healed -------------
8416 nsr_id
= db_nsr
["_id"]
8417 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8418 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8419 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8420 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
# db_dict fragment: points subsequent db writes at
# _admin.deployed.VCA.<vca_index> of this nsr record.
8422 "collection": "nsrs",
8423 "filter": {"_id": nsr_id
},
8424 "path": db_update_entry
,
# Default: the element under configuration is the NS itself; refined below to
# VNF / VDU / KDU when the corresponding ids are present.
8429 element_under_configuration
= nsr_id
8433 vnfr_id
= db_vnfr
["_id"]
8434 osm_config
["osm"]["vnf_id"] = vnfr_id
# Juju model namespace: "<nsi>.<ns>" (nsi part empty when there is no NSI),
# then extended per element type below.
8436 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8438 if vca_type
== "native_charm":
8441 index_number
= vdu_index
or 0
8444 element_type
= "VNF"
8445 element_under_configuration
= vnfr_id
8446 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8448 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8449 element_type
= "VDU"
8450 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8451 osm_config
["osm"]["vdu_id"] = vdu_id
8453 namespace
+= ".{}".format(kdu_name
)
8454 element_type
= "KDU"
8455 element_under_configuration
= kdu_name
8456 osm_config
["osm"]["kdu_name"] = kdu_name
# --- Build the charm artifact path from the package layout ------------------
# Two layouts: packages with a pkg-dir, and (presumably) SOL004-style
# packages using <folder>/Scripts/... -- the middle path components are
# missing from the capture; TODO confirm.
8459 if base_folder
["pkg-dir"]:
8460 artifact_path
= "{}/{}/{}/{}".format(
8461 base_folder
["folder"],
8462 base_folder
["pkg-dir"],
8465 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8470 artifact_path
= "{}/Scripts/{}/{}/".format(
8471 base_folder
["folder"],
8474 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8479 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8481 # get initial_config_primitive_list that applies to this element
8482 initial_config_primitive_list
= config_descriptor
.get(
8483 "initial-config-primitive"
8487 "Initial config primitive list > {}".format(
8488 initial_config_primitive_list
8492 # add config if not present for NS charm
8493 ee_descriptor_id
= ee_config_descriptor
.get("id")
8494 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
# Sort/filter the Day-1 primitives for this execution environment.
8495 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8496 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8500 "Initial config primitive list #2 > {}".format(
8501 initial_config_primitive_list
8504 # n2vc_redesign STEP 3.1
8505 # find old ee_id if exists
8506 ee_id
= vca_deployed
.get("ee_id")
8508 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8509 # create or register execution environment in VCA. Only for native charms when healing
8510 if vca_type
== "native_charm":
8511 step
= "Waiting to VM being up and getting IP address"
8512 self
.logger
.debug(logging_text
+ step
)
# Arguments to wait_vm_up_insert_key_ro (original 8514-8521) are missing
# from the capture.
8513 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8522 credentials
= {"hostname": rw_mgmt_ip
}
8524 username
= deep_get(
8525 config_descriptor
, ("config-access", "ssh-access", "default-user")
8527 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8528 # merged. Meanwhile let's get username from initial-config-primitive
8529 if not username
and initial_config_primitive_list
:
8530 for config_primitive
in initial_config_primitive_list
:
8531 for param
in config_primitive
.get("parameter", ()):
8532 if param
["name"] == "ssh-username":
8533 username
= param
["value"]
# Error path (raise statement missing from capture) when no username could
# be determined from either source:
8537 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8538 "'config-access.ssh-access.default-user'"
8540 credentials
["username"] = username
8542 # n2vc_redesign STEP 3.2
8543 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8544 self
._write
_configuration
_status
(
8546 vca_index
=vca_index
,
8547 status
="REGISTERING",
8548 element_under_configuration
=element_under_configuration
,
8549 element_type
=element_type
,
8552 step
= "register execution environment {}".format(credentials
)
8553 self
.logger
.debug(logging_text
+ step
)
# Register the EE with the VCA connector selected by vca_type; remaining
# keyword arguments (original 8557-8560) are missing from the capture.
8554 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8555 credentials
=credentials
,
8556 namespace
=namespace
,
8561 # update ee_id en db
8563 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8565 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8567 # for compatibility with MON/POL modules, the need model and application name at database
8568 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8569 # Not sure if this need to be done when healing
8571 ee_id_parts = ee_id.split(".")
8572 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8573 if len(ee_id_parts) >= 2:
8574 model_name = ee_id_parts[0]
8575 application_name = ee_id_parts[1]
8576 db_nsr_update[db_update_entry + "model"] = model_name
8577 db_nsr_update[db_update_entry + "application"] = application_name
8580 # n2vc_redesign STEP 3.3
8581 # Install configuration software. Only for native charms.
8582 step
= "Install configuration Software"
8584 self
._write
_configuration
_status
(
8586 vca_index
=vca_index
,
8587 status
="INSTALLING SW",
8588 element_under_configuration
=element_under_configuration
,
8589 element_type
=element_type
,
8590 # other_update=db_nsr_update,
8594 # TODO check if already done
8595 self
.logger
.debug(logging_text
+ step
)
8597 if vca_type
== "native_charm":
# Pick the primitive literally named "config" (if any) as initial charm
# configuration.
8598 config_primitive
= next(
8599 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8602 if config_primitive
:
8603 config
= self
._map
_primitive
_params
(
8604 config_primitive
, {}, deploy_params
# install_configuration_sw keyword arguments beyond artifact_path
# (original 8609-8616) are missing from the capture.
8606 await self
.vca_map
[vca_type
].install_configuration_sw(
8608 artifact_path
=artifact_path
,
8616 # write in db flag of configuration_sw already installed
8618 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8621 # Not sure if this need to be done when healing
8623 # add relations for this VCA (wait for other peers related with this VCA)
8624 await self._add_vca_relations(
8625 logging_text=logging_text,
8628 vca_index=vca_index,
8632 # if SSH access is required, then get execution environment SSH public
8633 # if native charm we have waited already to VM be UP
8634 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm-v3"):
8637 # self.logger.debug("get ssh key block")
# deep_get over config-access.ssh-access.required gates the ssh-key branch.
8639 config_descriptor
, ("config-access", "ssh-access", "required")
8641 # self.logger.debug("ssh key needed")
8642 # Needed to inject a ssh key
8645 ("config-access", "ssh-access", "default-user"),
8647 step
= "Install configuration Software, getting public ssh key"
8648 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8649 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8652 step
= "Insert public key into VM user={} ssh_key={}".format(
8656 # self.logger.debug("no need to get ssh key")
8657 step
= "Waiting to VM being up and getting IP address"
8658 self
.logger
.debug(logging_text
+ step
)
8660 # n2vc_redesign STEP 5.1
8661 # wait for RO (ip-address) Insert pub_key into VM
8662 # IMPORTANT: We need do wait for RO to complete healing operation.
8663 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
# Obtain the management IP: via wait_kdu_up for KDUs, otherwise via
# wait_vm_up_insert_key_ro (its arguments, original 8671-8679, are missing).
8666 rw_mgmt_ip
= await self
.wait_kdu_up(
8667 logging_text
, nsr_id
, vnfr_id
, kdu_name
8670 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8680 rw_mgmt_ip
= None # This is for a NS configuration
8682 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8684 # store rw_mgmt_ip in deploy params for later replacement
8685 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8688 # get run-day1 operation parameter
8689 runDay1
= deploy_params
.get("run-day1", False)
8691 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8694 # n2vc_redesign STEP 6 Execute initial config primitive
8695 step
= "execute initial config primitive"
8697 # wait for dependent primitives execution (NS -> VNF -> VDU)
8698 if initial_config_primitive_list
:
8699 await self
._wait
_dependent
_n
2vc
(
8700 nsr_id
, vca_deployed_list
, vca_index
8703 # stage, in function of element type: vdu, kdu, vnf or ns
8704 my_vca
= vca_deployed_list
[vca_index
]
8705 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8707 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8708 elif my_vca
.get("member-vnf-index"):
8710 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8713 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8715 self
._write
_configuration
_status
(
8716 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8719 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8721 check_if_terminated_needed
= True
8722 for initial_config_primitive
in initial_config_primitive_list
:
8723 # adding information on the vca_deployed if it is a NS execution environment
8724 if not vca_deployed
["member-vnf-index"]:
8725 deploy_params
["ns_config_info"] = json
.dumps(
8726 self
._get
_ns
_config
_info
(nsr_id
)
8728 # TODO check if already done
8729 primitive_params_
= self
._map
_primitive
_params
(
8730 initial_config_primitive
, {}, deploy_params
8733 step
= "execute primitive '{}' params '{}'".format(
8734 initial_config_primitive
["name"], primitive_params_
8736 self
.logger
.debug(logging_text
+ step
)
# exec_primitive call; remaining keyword arguments (original 8741-8744,
# presumably ee_id / db_dict / vca_id) are missing from the capture.
8737 await self
.vca_map
[vca_type
].exec_primitive(
8739 primitive_name
=initial_config_primitive
["name"],
8740 params_dict
=primitive_params_
,
8745 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8746 if check_if_terminated_needed
:
8747 if config_descriptor
.get("terminate-config-primitive"):
8751 {db_update_entry
+ "needed_terminate": True},
8753 check_if_terminated_needed
= False
8755 # TODO register in database that primitive is done
8757 # STEP 7 Configure metrics
8758 # Not sure if this need to be done when healing
8760 if vca_type == "helm" or vca_type == "helm-v3":
8761 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8763 artifact_path=artifact_path,
8764 ee_config_descriptor=ee_config_descriptor,
8767 target_ip=rw_mgmt_ip,
# Persist the extracted scrape jobs on the VCA db entry, then (presumably)
# upsert each job into a prometheus collection keyed by job_name -- the
# surrounding statements are missing from the capture; TODO confirm.
8773 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8776 for job in prometheus_jobs:
8779 {"job_name": job["job_name"]},
8782 fail_on_empty=False,
8786 step
= "instantiated at VCA"
8787 self
.logger
.debug(logging_text
+ step
)
8789 self
._write
_configuration
_status
(
8790 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
# --- Error handling: mark the VCA entry BROKEN and re-raise as LcmException
8793 except Exception as e
: # TODO not use Exception but N2VC exception
8794 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8796 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8799 "Exception while {} : {}".format(step
, e
), exc_info
=True
8801 self
._write
_configuration
_status
(
8802 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8804 raise LcmException("{} {}".format(step
, e
)) from e
8806 async def _wait_heal_ro(
8812 while time() <= start_time
+ timeout
:
8813 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8814 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8815 "operational-status"
8817 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8818 if operational_status_ro
!= "healing":
8820 await asyncio
.sleep(15)
8821 else: # timeout_ns_deploy
8822 raise NgRoException("Timeout waiting ns to deploy")
# vertical_scale: change the flavor (vCPU / memory / storage) of one VDU in a
# running NS via the RO, updating the nsrs/vnfrs records and reverting the
# flavor reference on failure.
# NOTE(review): this text is a garbled extraction. Statements are split
# across physical lines and gaps in the fused original numbering (8824,
# 8826, ...) show many lines are missing; do not treat this block as runnable
# Python.
8824 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
# Docstring fragment (quotes missing from the capture):
8826 Vertical Scale the VDUs in a NS
8828 :param: nsr_id: NS Instance ID
8829 :param: nslcmop_id: nslcmop ID of migrate
8832 # Try to lock HA task here
8833 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
# If another LCM worker holds the HA lock, this worker bails out (the return
# statement itself is missing from the capture).
8834 if not task_is_locked_by_me
:
8836 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8837 self
.logger
.debug(logging_text
+ "Enter")
8838 # get all needed from database
8840 db_nslcmop_update
= {}
8841 nslcmop_operation_state
= None
8844 old_vdu_index
= None
8845 old_flavor_id
= None
8849 # in case of error, indicates what part of scale was failed to put nsr at error status
8850 start_deploy
= time()
8853 # wait for any previous tasks in process
8854 step
= "Waiting for previous operations to terminate"
8855 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8857 self
._write
_ns
_status
(
8860 current_operation
="VerticalScale",
8861 current_operation_id
=nslcmop_id
,
8863 step
= "Getting nslcmop from database"
8865 step
+ " after having waited for previous tasks to be completed"
8867 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8868 operationParams
= db_nslcmop
.get("operationParams")
8869 # Update the VNFRS and NSRS with the requested flavour detail, So that ro tasks can function properly
8870 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8871 db_flavor
= db_nsr
.get("flavor")
# New flavor gets the next free index (list length as string).
8872 db_flavor_index
= str(len(db_flavor
))
8873 change_vnf_flavor_data
= operationParams
["changeVnfFlavorData"]
8874 flavor_dict
= change_vnf_flavor_data
["additionalParams"]
8875 count_index
= flavor_dict
["vduCountIndex"]
8876 vdu_id_ref
= flavor_dict
["vduid"]
# Build the new flavor entry from the requested virtual resources.
8877 flavor_dict_update
= {
8878 "id": db_flavor_index
,
8879 "memory-mb": flavor_dict
["virtualMemory"],
8880 "name": f
"{vdu_id_ref}-{count_index}-flv",
8881 "storage-gb": flavor_dict
["sizeOfStorage"],
8882 "vcpu-count": flavor_dict
["numVirtualCpu"],
8884 db_flavor
.append(flavor_dict_update
)
8886 db_update
["flavor"] = db_flavor
# Persist the extended flavor list on the nsr record (the db set call and
# ns_q_filter construction, original 8887-8891, are missing from the capture).
8892 q_filter
=ns_q_filter
,
8893 update_dict
=db_update
,
8896 db_vnfr
= self
.db
.get_one(
8897 "vnfrs", {"_id": change_vnf_flavor_data
["vnfInstanceId"]}
# Locate the target vdur by count-index + vdu-id-ref; remember its old
# flavor id and position so it can be reverted on failure.
8899 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
8901 vdur
.get("count-index") == count_index
8902 and vdur
.get("vdu-id-ref") == vdu_id_ref
8904 old_flavor_id
= vdur
.get("ns-flavor-id", 0)
8905 old_vdu_index
= vdu_index
8907 "_id": change_vnf_flavor_data
["vnfInstanceId"],
8908 "vdur.count-index": count_index
,
8909 "vdur.vdu-id-ref": vdu_id_ref
,
8911 q_filter
.update(filter_text
)
# Point vdur.<index>.ns-flavor-id at the newly appended flavor (the
# surrounding assignment/update call is missing from the capture).
8914 "vdur.{}.ns-flavor-id".format(vdu_index
)
8919 update_dict
=db_update
,
# --- Hand the scale request to RO and wait for completion -------------------
8923 target
.update(operationParams
)
8924 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8925 self
.logger
.debug("RO return > {}".format(desc
))
8926 action_id
= desc
["action_id"]
8927 await self
._wait
_ng
_ro
(
8932 self
.timeout
.verticalscale
,
8933 operation
="verticalscale",
# Exception handlers: RO client errors (and presumably DbException /
# LcmException / NgRoException -- the except header, original 8934-8940, is
# missing from the capture), then cancellation, then anything else.
8937 ROclient
.ROClientException
,
8941 self
.logger
.error("Exit Exception {}".format(e
))
8943 except asyncio
.CancelledError
:
8944 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8945 exc
= "Operation was cancelled"
8946 except Exception as e
:
8947 exc
= traceback
.format_exc()
8948 self
.logger
.critical(
8949 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
# --- finally: write final status, revert flavor on failure, notify kafka ----
8952 self
._write
_ns
_status
(
8955 current_operation
="IDLE",
8956 current_operation_id
=None,
8959 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8960 nslcmop_operation_state
= "FAILED"
# On failure, old_db_update restores the previous ns-flavor-id of the scaled
# vdur (assignment around original 8961-8964 is partially missing).
8962 "vdur.{}.ns-flavor-id".format(old_vdu_index
)
8965 nslcmop_operation_state
= "COMPLETED"
8966 db_nslcmop_update
["detailed-status"] = "Done"
8967 db_nsr_update
["detailed-status"] = "Done"
8969 self
._write
_op
_status
(
8973 operation_state
=nslcmop_operation_state
,
8974 other_update
=db_nslcmop_update
,
8976 if old_vdu_index
and old_db_update
!= {}:
8977 self
.logger
.critical(
8978 "Reverting Old Flavor -- : {}".format(old_db_update
)
8983 update_dict
=old_db_update
,
# Notify subscribers that the vertical scale finished (best-effort: kafka
# errors are logged, not raised).
8986 if nslcmop_operation_state
:
8990 "nslcmop_id": nslcmop_id
,
8991 "operationState": nslcmop_operation_state
,
8993 await self
.msg
.aiowrite("ns", "verticalscaled", msg
)
8994 except Exception as e
:
8996 logging_text
+ "kafka_write notification Exception {}".format(e
)
8998 self
.logger
.debug(logging_text
+ "Exit")
8999 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")