1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import SystemRandom
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
161 self
.conn_helm_ee
= LCMHelmConn(
163 vca_config
=self
.vca_config
,
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
167 self
.k8sclusterhelm2
= K8sHelmConnector(
168 kubectl_command
=self
.vca_config
.kubectlpath
,
169 helm_command
=self
.vca_config
.helmpath
,
176 self
.k8sclusterhelm3
= K8sHelm3Connector(
177 kubectl_command
=self
.vca_config
.kubectlpath
,
178 helm_command
=self
.vca_config
.helm3path
,
185 self
.k8sclusterjuju
= K8sJujuConnector(
186 kubectl_command
=self
.vca_config
.kubectlpath
,
187 juju_command
=self
.vca_config
.jujupath
,
189 on_update_db
=self
._on
_update
_k
8s
_db
,
194 self
.k8scluster_map
= {
195 "helm-chart": self
.k8sclusterhelm2
,
196 "helm-chart-v3": self
.k8sclusterhelm3
,
197 "chart": self
.k8sclusterhelm3
,
198 "juju-bundle": self
.k8sclusterjuju
,
199 "juju": self
.k8sclusterjuju
,
203 "lxc_proxy_charm": self
.n2vc
,
204 "native_charm": self
.n2vc
,
205 "k8s_proxy_charm": self
.n2vc
,
206 "helm": self
.conn_helm_ee
,
207 "helm-v3": self
.conn_helm_ee
,
211 self
.RO
= NgRoClient(**self
.ro_config
.to_dict())
213 self
.op_status_map
= {
214 "instantiation": self
.RO
.status
,
215 "termination": self
.RO
.status
,
216 "migrate": self
.RO
.status
,
217 "healing": self
.RO
.recreate_status
,
218 "verticalscale": self
.RO
.status
,
219 "start_stop_rebuild": self
.RO
.status
,
223 def increment_ip_mac(ip_mac
, vm_index
=1):
224 if not isinstance(ip_mac
, str):
227 # try with ipv4 look for last dot
228 i
= ip_mac
.rfind(".")
231 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
232 # try with ipv6 or mac look for last colon. Operate in hex
233 i
= ip_mac
.rfind(":")
236 # format in hex, len can be 2 for mac or 4 for ipv6
237 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
238 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
244 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
245 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
248 # TODO filter RO descriptor fields...
252 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
253 db_dict
["deploymentStatus"] = ro_descriptor
254 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
256 except Exception as e
:
258 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
261 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
262 # remove last dot from path (if exists)
263 if path
.endswith("."):
266 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
267 # .format(table, filter, path, updated_data))
269 nsr_id
= filter.get("_id")
271 # read ns record from database
272 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
273 current_ns_status
= nsr
.get("nsState")
275 # get vca status for NS
276 status_dict
= await self
.n2vc
.get_status(
277 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
282 db_dict
["vcaStatus"] = status_dict
284 # update configurationStatus for this VCA
286 vca_index
= int(path
[path
.rfind(".") + 1 :])
289 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
291 vca_status
= vca_list
[vca_index
].get("status")
293 configuration_status_list
= nsr
.get("configurationStatus")
294 config_status
= configuration_status_list
[vca_index
].get("status")
296 if config_status
== "BROKEN" and vca_status
!= "failed":
297 db_dict
["configurationStatus"][vca_index
] = "READY"
298 elif config_status
!= "BROKEN" and vca_status
== "failed":
299 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
300 except Exception as e
:
301 # not update configurationStatus
302 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
304 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
305 # if nsState = 'DEGRADED' check if all is OK
307 if current_ns_status
in ("READY", "DEGRADED"):
308 error_description
= ""
310 if status_dict
.get("machines"):
311 for machine_id
in status_dict
.get("machines"):
312 machine
= status_dict
.get("machines").get(machine_id
)
313 # check machine agent-status
314 if machine
.get("agent-status"):
315 s
= machine
.get("agent-status").get("status")
318 error_description
+= (
319 "machine {} agent-status={} ; ".format(
323 # check machine instance status
324 if machine
.get("instance-status"):
325 s
= machine
.get("instance-status").get("status")
328 error_description
+= (
329 "machine {} instance-status={} ; ".format(
334 if status_dict
.get("applications"):
335 for app_id
in status_dict
.get("applications"):
336 app
= status_dict
.get("applications").get(app_id
)
337 # check application status
338 if app
.get("status"):
339 s
= app
.get("status").get("status")
342 error_description
+= (
343 "application {} status={} ; ".format(app_id
, s
)
346 if error_description
:
347 db_dict
["errorDescription"] = error_description
348 if current_ns_status
== "READY" and is_degraded
:
349 db_dict
["nsState"] = "DEGRADED"
350 if current_ns_status
== "DEGRADED" and not is_degraded
:
351 db_dict
["nsState"] = "READY"
354 self
.update_db_2("nsrs", nsr_id
, db_dict
)
356 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
358 except Exception as e
:
359 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
361 async def _on_update_k8s_db(
362 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
365 Updating vca status in NSR record
366 :param cluster_uuid: UUID of a k8s cluster
367 :param kdu_instance: The unique name of the KDU instance
368 :param filter: To get nsr_id
369 :cluster_type: The cluster type (juju, k8s)
373 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
374 # .format(cluster_uuid, kdu_instance, filter))
376 nsr_id
= filter.get("_id")
378 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
379 cluster_uuid
=cluster_uuid
,
380 kdu_instance
=kdu_instance
,
382 complete_status
=True,
388 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
391 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
395 self
.update_db_2("nsrs", nsr_id
, db_dict
)
396 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
398 except Exception as e
:
399 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
402 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
405 undefined
=StrictUndefined
,
406 autoescape
=select_autoescape(default_for_string
=True, default
=True),
408 template
= env
.from_string(cloud_init_text
)
409 return template
.render(additional_params
or {})
410 except UndefinedError
as e
:
412 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
413 "file, must be provided in the instantiation parameters inside the "
414 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
416 except (TemplateError
, TemplateNotFound
) as e
:
418 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
423 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
424 cloud_init_content
= cloud_init_file
= None
426 if vdu
.get("cloud-init-file"):
427 base_folder
= vnfd
["_admin"]["storage"]
428 if base_folder
["pkg-dir"]:
429 cloud_init_file
= "{}/{}/cloud_init/{}".format(
430 base_folder
["folder"],
431 base_folder
["pkg-dir"],
432 vdu
["cloud-init-file"],
435 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
436 base_folder
["folder"],
437 vdu
["cloud-init-file"],
439 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
440 cloud_init_content
= ci_file
.read()
441 elif vdu
.get("cloud-init"):
442 cloud_init_content
= vdu
["cloud-init"]
444 return cloud_init_content
445 except FsException
as e
:
447 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
448 vnfd
["id"], vdu
["id"], cloud_init_file
, e
452 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
454 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
456 additional_params
= vdur
.get("additionalParams")
457 return parse_yaml_strings(additional_params
)
459 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
461 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
462 :param vnfd: input vnfd
463 :param new_id: overrides vnf id if provided
464 :param additionalParams: Instantiation params for VNFs provided
465 :param nsrId: Id of the NSR
466 :return: copy of vnfd
468 vnfd_RO
= deepcopy(vnfd
)
469 # remove unused by RO configuration, monitoring, scaling and internal keys
470 vnfd_RO
.pop("_id", None)
471 vnfd_RO
.pop("_admin", None)
472 vnfd_RO
.pop("monitoring-param", None)
473 vnfd_RO
.pop("scaling-group-descriptor", None)
474 vnfd_RO
.pop("kdu", None)
475 vnfd_RO
.pop("k8s-cluster", None)
477 vnfd_RO
["id"] = new_id
479 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
480 for vdu
in get_iterable(vnfd_RO
, "vdu"):
481 vdu
.pop("cloud-init-file", None)
482 vdu
.pop("cloud-init", None)
486 def ip_profile_2_RO(ip_profile
):
487 RO_ip_profile
= deepcopy(ip_profile
)
488 if "dns-server" in RO_ip_profile
:
489 if isinstance(RO_ip_profile
["dns-server"], list):
490 RO_ip_profile
["dns-address"] = []
491 for ds
in RO_ip_profile
.pop("dns-server"):
492 RO_ip_profile
["dns-address"].append(ds
["address"])
494 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
495 if RO_ip_profile
.get("ip-version") == "ipv4":
496 RO_ip_profile
["ip-version"] = "IPv4"
497 if RO_ip_profile
.get("ip-version") == "ipv6":
498 RO_ip_profile
["ip-version"] = "IPv6"
499 if "dhcp-params" in RO_ip_profile
:
500 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
503 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
504 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
505 if db_vim
["_admin"]["operationalState"] != "ENABLED":
507 "VIM={} is not available. operationalState={}".format(
508 vim_account
, db_vim
["_admin"]["operationalState"]
511 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
514 def get_ro_wim_id_for_wim_account(self
, wim_account
):
515 if isinstance(wim_account
, str):
516 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
517 if db_wim
["_admin"]["operationalState"] != "ENABLED":
519 "WIM={} is not available. operationalState={}".format(
520 wim_account
, db_wim
["_admin"]["operationalState"]
523 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
528 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
529 db_vdu_push_list
= []
531 db_update
= {"_admin.modified": time()}
533 for vdu_id
, vdu_count
in vdu_create
.items():
537 for vdur
in reversed(db_vnfr
["vdur"])
538 if vdur
["vdu-id-ref"] == vdu_id
543 # Read the template saved in the db:
545 "No vdur in the database. Using the vdur-template to scale"
547 vdur_template
= db_vnfr
.get("vdur-template")
548 if not vdur_template
:
550 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
554 vdur
= vdur_template
[0]
555 # Delete a template from the database after using it
558 {"_id": db_vnfr
["_id"]},
560 pull
={"vdur-template": {"_id": vdur
["_id"]}},
562 for count
in range(vdu_count
):
563 vdur_copy
= deepcopy(vdur
)
564 vdur_copy
["status"] = "BUILD"
565 vdur_copy
["status-detailed"] = None
566 vdur_copy
["ip-address"] = None
567 vdur_copy
["_id"] = str(uuid4())
568 vdur_copy
["count-index"] += count
+ 1
569 vdur_copy
["id"] = "{}-{}".format(
570 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
572 vdur_copy
.pop("vim_info", None)
573 for iface
in vdur_copy
["interfaces"]:
574 if iface
.get("fixed-ip"):
575 iface
["ip-address"] = self
.increment_ip_mac(
576 iface
["ip-address"], count
+ 1
579 iface
.pop("ip-address", None)
580 if iface
.get("fixed-mac"):
581 iface
["mac-address"] = self
.increment_ip_mac(
582 iface
["mac-address"], count
+ 1
585 iface
.pop("mac-address", None)
589 ) # only first vdu can be managment of vnf
590 db_vdu_push_list
.append(vdur_copy
)
591 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
593 if len(db_vnfr
["vdur"]) == 1:
594 # The scale will move to 0 instances
596 "Scaling to 0 !, creating the template with the last vdur"
598 template_vdur
= [db_vnfr
["vdur"][0]]
599 for vdu_id
, vdu_count
in vdu_delete
.items():
601 indexes_to_delete
= [
603 for iv
in enumerate(db_vnfr
["vdur"])
604 if iv
[1]["vdu-id-ref"] == vdu_id
608 "vdur.{}.status".format(i
): "DELETING"
609 for i
in indexes_to_delete
[-vdu_count
:]
613 # it must be deleted one by one because common.db does not allow otherwise
616 for v
in reversed(db_vnfr
["vdur"])
617 if v
["vdu-id-ref"] == vdu_id
619 for vdu
in vdus_to_delete
[:vdu_count
]:
622 {"_id": db_vnfr
["_id"]},
624 pull
={"vdur": {"_id": vdu
["_id"]}},
628 db_push
["vdur"] = db_vdu_push_list
630 db_push
["vdur-template"] = template_vdur
633 db_vnfr
["vdur-template"] = template_vdur
634 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
635 # modify passed dictionary db_vnfr
636 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
637 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
639 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
641 Updates database nsr with the RO info for the created vld
642 :param ns_update_nsr: dictionary to be filled with the updated info
643 :param db_nsr: content of db_nsr. This is also modified
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
648 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
649 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
650 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
652 vld
["vim-id"] = net_RO
.get("vim_net_id")
653 vld
["name"] = net_RO
.get("vim_name")
654 vld
["status"] = net_RO
.get("status")
655 vld
["status-detailed"] = net_RO
.get("error_msg")
656 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
660 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
663 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
665 for db_vnfr
in db_vnfrs
.values():
666 vnfr_update
= {"status": "ERROR"}
667 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
668 if "status" not in vdur
:
669 vdur
["status"] = "ERROR"
670 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
672 vdur
["status-detailed"] = str(error_text
)
674 "vdur.{}.status-detailed".format(vdu_index
)
676 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
677 except DbException
as e
:
678 self
.logger
.error("Cannot update vnf. {}".format(e
))
680 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
682 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
683 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
684 :param nsr_desc_RO: nsr descriptor from RO
685 :return: Nothing, LcmException is raised on errors
687 for vnf_index
, db_vnfr
in db_vnfrs
.items():
688 for vnf_RO
in nsr_desc_RO
["vnfs"]:
689 if vnf_RO
["member_vnf_index"] != vnf_index
:
692 if vnf_RO
.get("ip_address"):
693 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
696 elif not db_vnfr
.get("ip-address"):
697 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
698 raise LcmExceptionNoMgmtIP(
699 "ns member_vnf_index '{}' has no IP address".format(
704 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
705 vdur_RO_count_index
= 0
706 if vdur
.get("pdu-type"):
708 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
709 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
711 if vdur
["count-index"] != vdur_RO_count_index
:
712 vdur_RO_count_index
+= 1
714 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
715 if vdur_RO
.get("ip_address"):
716 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
718 vdur
["ip-address"] = None
719 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
720 vdur
["name"] = vdur_RO
.get("vim_name")
721 vdur
["status"] = vdur_RO
.get("status")
722 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
723 for ifacer
in get_iterable(vdur
, "interfaces"):
724 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
725 if ifacer
["name"] == interface_RO
.get("internal_name"):
726 ifacer
["ip-address"] = interface_RO
.get(
729 ifacer
["mac-address"] = interface_RO
.get(
735 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
736 "from VIM info".format(
737 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
740 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
744 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
746 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
750 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
751 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
752 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
754 vld
["vim-id"] = net_RO
.get("vim_net_id")
755 vld
["name"] = net_RO
.get("vim_name")
756 vld
["status"] = net_RO
.get("status")
757 vld
["status-detailed"] = net_RO
.get("error_msg")
758 vnfr_update
["vld.{}".format(vld_index
)] = vld
762 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
767 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
772 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
777 def _get_ns_config_info(self
, nsr_id
):
779 Generates a mapping between vnf,vdu elements and the N2VC id
780 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
781 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
782 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
783 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
786 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
788 ns_config_info
= {"osm-config-mapping": mapping
}
789 for vca
in vca_deployed_list
:
790 if not vca
["member-vnf-index"]:
792 if not vca
["vdu_id"]:
793 mapping
[vca
["member-vnf-index"]] = vca
["application"]
797 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
799 ] = vca
["application"]
800 return ns_config_info
802 async def _instantiate_ng_ro(
818 def get_vim_account(vim_account_id
):
820 if vim_account_id
in db_vims
:
821 return db_vims
[vim_account_id
]
822 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
823 db_vims
[vim_account_id
] = db_vim
826 # modify target_vld info with instantiation parameters
827 def parse_vld_instantiation_params(
828 target_vim
, target_vld
, vld_params
, target_sdn
830 if vld_params
.get("ip-profile"):
831 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
832 vld_params
["ip-profile"]
834 if vld_params
.get("provider-network"):
835 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
838 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
839 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
843 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
844 # if wim_account_id is specified in vld_params, validate if it is feasible.
845 wim_account_id
, db_wim
= select_feasible_wim_account(
846 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
850 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
851 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
852 # update vld_params with correct WIM account Id
853 vld_params
["wimAccountId"] = wim_account_id
855 target_wim
= "wim:{}".format(wim_account_id
)
856 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
857 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
858 if len(sdn_ports
) > 0:
859 target_vld
["vim_info"][target_wim
] = target_wim_attrs
860 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
863 "Target VLD with WIM data: {:s}".format(str(target_vld
))
866 for param
in ("vim-network-name", "vim-network-id"):
867 if vld_params
.get(param
):
868 if isinstance(vld_params
[param
], dict):
869 for vim
, vim_net
in vld_params
[param
].items():
870 other_target_vim
= "vim:" + vim
872 target_vld
["vim_info"],
873 (other_target_vim
, param
.replace("-", "_")),
876 else: # isinstance str
877 target_vld
["vim_info"][target_vim
][
878 param
.replace("-", "_")
879 ] = vld_params
[param
]
880 if vld_params
.get("common_id"):
881 target_vld
["common_id"] = vld_params
.get("common_id")
883 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
884 def update_ns_vld_target(target
, ns_params
):
885 for vnf_params
in ns_params
.get("vnf", ()):
886 if vnf_params
.get("vimAccountId"):
890 for vnfr
in db_vnfrs
.values()
891 if vnf_params
["member-vnf-index"]
892 == vnfr
["member-vnf-index-ref"]
896 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
899 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
900 target_vld
= find_in_list(
901 get_iterable(vdur
, "interfaces"),
902 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
905 vld_params
= find_in_list(
906 get_iterable(ns_params
, "vld"),
907 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
910 if vnf_params
.get("vimAccountId") not in a_vld
.get(
913 target_vim_network_list
= [
914 v
for _
, v
in a_vld
.get("vim_info").items()
916 target_vim_network_name
= next(
918 item
.get("vim_network_name", "")
919 for item
in target_vim_network_list
924 target
["ns"]["vld"][a_index
].get("vim_info").update(
926 "vim:{}".format(vnf_params
["vimAccountId"]): {
927 "vim_network_name": target_vim_network_name
,
933 for param
in ("vim-network-name", "vim-network-id"):
934 if vld_params
.get(param
) and isinstance(
935 vld_params
[param
], dict
937 for vim
, vim_net
in vld_params
[
940 other_target_vim
= "vim:" + vim
942 target
["ns"]["vld"][a_index
].get(
947 param
.replace("-", "_"),
952 nslcmop_id
= db_nslcmop
["_id"]
954 "name": db_nsr
["name"],
957 "image": deepcopy(db_nsr
["image"]),
958 "flavor": deepcopy(db_nsr
["flavor"]),
959 "action_id": nslcmop_id
,
960 "cloud_init_content": {},
962 for image
in target
["image"]:
963 image
["vim_info"] = {}
964 for flavor
in target
["flavor"]:
965 flavor
["vim_info"] = {}
966 if db_nsr
.get("affinity-or-anti-affinity-group"):
967 target
["affinity-or-anti-affinity-group"] = deepcopy(
968 db_nsr
["affinity-or-anti-affinity-group"]
970 for affinity_or_anti_affinity_group
in target
[
971 "affinity-or-anti-affinity-group"
973 affinity_or_anti_affinity_group
["vim_info"] = {}
975 if db_nslcmop
.get("lcmOperationType") != "instantiate":
976 # get parameters of instantiation:
977 db_nslcmop_instantiate
= self
.db
.get_list(
980 "nsInstanceId": db_nslcmop
["nsInstanceId"],
981 "lcmOperationType": "instantiate",
984 ns_params
= db_nslcmop_instantiate
.get("operationParams")
986 ns_params
= db_nslcmop
.get("operationParams")
987 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
988 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
991 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
992 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
996 "mgmt-network": vld
.get("mgmt-network", False),
997 "type": vld
.get("type"),
1000 "vim_network_name": vld
.get("vim-network-name"),
1001 "vim_account_id": ns_params
["vimAccountId"],
1005 # check if this network needs SDN assist
1006 if vld
.get("pci-interfaces"):
1007 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1008 if vim_config
:= db_vim
.get("config"):
1009 if sdnc_id
:= vim_config
.get("sdn-controller"):
1010 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1011 target_sdn
= "sdn:{}".format(sdnc_id
)
1012 target_vld
["vim_info"][target_sdn
] = {
1014 "target_vim": target_vim
,
1016 "type": vld
.get("type"),
1019 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1020 for nsd_vnf_profile
in nsd_vnf_profiles
:
1021 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1022 if cp
["virtual-link-profile-id"] == vld
["id"]:
1024 "member_vnf:{}.{}".format(
1025 cp
["constituent-cpd-id"][0][
1026 "constituent-base-element-id"
1028 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1030 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1032 # check at nsd descriptor, if there is an ip-profile
1034 nsd_vlp
= find_in_list(
1035 get_virtual_link_profiles(nsd
),
1036 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1041 and nsd_vlp
.get("virtual-link-protocol-data")
1042 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1044 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1048 # update vld_params with instantiation params
1049 vld_instantiation_params
= find_in_list(
1050 get_iterable(ns_params
, "vld"),
1051 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1053 if vld_instantiation_params
:
1054 vld_params
.update(vld_instantiation_params
)
1055 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1056 target
["ns"]["vld"].append(target_vld
)
1057 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1058 update_ns_vld_target(target
, ns_params
)
1060 for vnfr
in db_vnfrs
.values():
1061 vnfd
= find_in_list(
1062 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1064 vnf_params
= find_in_list(
1065 get_iterable(ns_params
, "vnf"),
1066 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1068 target_vnf
= deepcopy(vnfr
)
1069 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1070 for vld
in target_vnf
.get("vld", ()):
1071 # check if connected to a ns.vld, to fill target'
1072 vnf_cp
= find_in_list(
1073 vnfd
.get("int-virtual-link-desc", ()),
1074 lambda cpd
: cpd
.get("id") == vld
["id"],
1077 ns_cp
= "member_vnf:{}.{}".format(
1078 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1080 if cp2target
.get(ns_cp
):
1081 vld
["target"] = cp2target
[ns_cp
]
1084 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1086 # check if this network needs SDN assist
1088 if vld
.get("pci-interfaces"):
1089 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1090 sdnc_id
= db_vim
["config"].get("sdn-controller")
1092 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1093 target_sdn
= "sdn:{}".format(sdnc_id
)
1094 vld
["vim_info"][target_sdn
] = {
1096 "target_vim": target_vim
,
1098 "type": vld
.get("type"),
1101 # check at vnfd descriptor, if there is an ip-profile
1103 vnfd_vlp
= find_in_list(
1104 get_virtual_link_profiles(vnfd
),
1105 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1109 and vnfd_vlp
.get("virtual-link-protocol-data")
1110 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1112 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1115 # update vld_params with instantiation params
1117 vld_instantiation_params
= find_in_list(
1118 get_iterable(vnf_params
, "internal-vld"),
1119 lambda i_vld
: i_vld
["name"] == vld
["id"],
1121 if vld_instantiation_params
:
1122 vld_params
.update(vld_instantiation_params
)
1123 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1126 for vdur
in target_vnf
.get("vdur", ()):
1127 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1128 continue # This vdu must not be created
1129 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1131 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1134 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1135 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1138 and vdu_configuration
.get("config-access")
1139 and vdu_configuration
.get("config-access").get("ssh-access")
1141 vdur
["ssh-keys"] = ssh_keys_all
1142 vdur
["ssh-access-required"] = vdu_configuration
[
1144 ]["ssh-access"]["required"]
1147 and vnf_configuration
.get("config-access")
1148 and vnf_configuration
.get("config-access").get("ssh-access")
1149 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1151 vdur
["ssh-keys"] = ssh_keys_all
1152 vdur
["ssh-access-required"] = vnf_configuration
[
1154 ]["ssh-access"]["required"]
1155 elif ssh_keys_instantiation
and find_in_list(
1156 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1158 vdur
["ssh-keys"] = ssh_keys_instantiation
1160 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1162 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1164 if vdud
.get("cloud-init-file"):
1165 vdur
["cloud-init"] = "{}:file:{}".format(
1166 vnfd
["_id"], vdud
.get("cloud-init-file")
1168 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1169 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1170 base_folder
= vnfd
["_admin"]["storage"]
1171 if base_folder
["pkg-dir"]:
1172 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1173 base_folder
["folder"],
1174 base_folder
["pkg-dir"],
1175 vdud
.get("cloud-init-file"),
1178 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1179 base_folder
["folder"],
1180 vdud
.get("cloud-init-file"),
1182 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1183 target
["cloud_init_content"][
1186 elif vdud
.get("cloud-init"):
1187 vdur
["cloud-init"] = "{}:vdu:{}".format(
1188 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1190 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1191 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1194 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1195 deploy_params_vdu
= self
._format
_additional
_params
(
1196 vdur
.get("additionalParams") or {}
1198 deploy_params_vdu
["OSM"] = get_osm_params(
1199 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1201 vdur
["additionalParams"] = deploy_params_vdu
1204 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1205 if target_vim
not in ns_flavor
["vim_info"]:
1206 ns_flavor
["vim_info"][target_vim
] = {}
1209 # in case alternative images are provided we must check if they should be applied
1210 # for the vim_type, modify the vim_type taking into account
1211 ns_image_id
= int(vdur
["ns-image-id"])
1212 if vdur
.get("alt-image-ids"):
1213 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1214 vim_type
= db_vim
["vim_type"]
1215 for alt_image_id
in vdur
.get("alt-image-ids"):
1216 ns_alt_image
= target
["image"][int(alt_image_id
)]
1217 if vim_type
== ns_alt_image
.get("vim-type"):
1218 # must use alternative image
1220 "use alternative image id: {}".format(alt_image_id
)
1222 ns_image_id
= alt_image_id
1223 vdur
["ns-image-id"] = ns_image_id
1225 ns_image
= target
["image"][int(ns_image_id
)]
1226 if target_vim
not in ns_image
["vim_info"]:
1227 ns_image
["vim_info"][target_vim
] = {}
1230 if vdur
.get("affinity-or-anti-affinity-group-id"):
1231 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1232 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1233 if target_vim
not in ns_ags
["vim_info"]:
1234 ns_ags
["vim_info"][target_vim
] = {}
1236 vdur
["vim_info"] = {target_vim
: {}}
1237 # instantiation parameters
1239 vdu_instantiation_params
= find_in_list(
1240 get_iterable(vnf_params
, "vdu"),
1241 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1243 if vdu_instantiation_params
:
1244 # Parse the vdu_volumes from the instantiation params
1245 vdu_volumes
= get_volumes_from_instantiation_params(
1246 vdu_instantiation_params
, vdud
1248 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1249 vdur
["additionalParams"]["OSM"][
1251 ] = vdu_instantiation_params
.get("vim-flavor-id")
1252 vdur_list
.append(vdur
)
1253 target_vnf
["vdur"] = vdur_list
1254 target
["vnf"].append(target_vnf
)
1256 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1257 desc
= await self
.RO
.deploy(nsr_id
, target
)
1258 self
.logger
.debug("RO return > {}".format(desc
))
1259 action_id
= desc
["action_id"]
1260 await self
._wait
_ng
_ro
(
1267 operation
="instantiation",
1272 "_admin.deployed.RO.operational-status": "running",
1273 "detailed-status": " ".join(stage
),
1275 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1276 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1277 self
._write
_op
_status
(nslcmop_id
, stage
)
1279 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1283 async def _wait_ng_ro(
1293 detailed_status_old
= None
1295 start_time
= start_time
or time()
1296 while time() <= start_time
+ timeout
:
1297 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1298 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1299 if desc_status
["status"] == "FAILED":
1300 raise NgRoException(desc_status
["details"])
1301 elif desc_status
["status"] == "BUILD":
1303 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1304 elif desc_status
["status"] == "DONE":
1306 stage
[2] = "Deployed at VIM"
1309 assert False, "ROclient.check_ns_status returns unknown {}".format(
1310 desc_status
["status"]
1312 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1313 detailed_status_old
= stage
[2]
1314 db_nsr_update
["detailed-status"] = " ".join(stage
)
1315 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1316 self
._write
_op
_status
(nslcmop_id
, stage
)
1317 await asyncio
.sleep(15)
1318 else: # timeout_ns_deploy
1319 raise NgRoException("Timeout waiting ns to deploy")
1321 async def _terminate_ng_ro(
1322 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1327 start_deploy
= time()
1334 "action_id": nslcmop_id
,
1336 desc
= await self
.RO
.deploy(nsr_id
, target
)
1337 action_id
= desc
["action_id"]
1338 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1341 + "ns terminate action at RO. action_id={}".format(action_id
)
1345 delete_timeout
= 20 * 60 # 20 minutes
1346 await self
._wait
_ng
_ro
(
1353 operation
="termination",
1355 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1357 await self
.RO
.delete(nsr_id
)
1358 except NgRoException
as e
:
1359 if e
.http_code
== 404: # not found
1360 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1361 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1363 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1365 elif e
.http_code
== 409: # conflict
1366 failed_detail
.append("delete conflict: {}".format(e
))
1369 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1372 failed_detail
.append("delete error: {}".format(e
))
1375 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1377 except Exception as e
:
1378 failed_detail
.append("delete error: {}".format(e
))
1380 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1384 stage
[2] = "Error deleting from VIM"
1386 stage
[2] = "Deleted from VIM"
1387 db_nsr_update
["detailed-status"] = " ".join(stage
)
1388 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1389 self
._write
_op
_status
(nslcmop_id
, stage
)
1392 raise LcmException("; ".join(failed_detail
))
1395 async def instantiate_RO(
1409 :param logging_text: preffix text to use at logging
1410 :param nsr_id: nsr identity
1411 :param nsd: database content of ns descriptor
1412 :param db_nsr: database content of ns record
1413 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1415 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1416 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1417 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1418 :return: None or exception
1421 start_deploy
= time()
1422 ns_params
= db_nslcmop
.get("operationParams")
1423 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1424 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1426 timeout_ns_deploy
= self
.timeout
.ns_deploy
1428 # Check for and optionally request placement optimization. Database will be updated if placement activated
1429 stage
[2] = "Waiting for Placement."
1430 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1431 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1432 for vnfr
in db_vnfrs
.values():
1433 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1436 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1438 return await self
._instantiate
_ng
_ro
(
1451 except Exception as e
:
1452 stage
[2] = "ERROR deploying at VIM"
1453 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1455 "Error deploying at VIM {}".format(e
),
1456 exc_info
=not isinstance(
1459 ROclient
.ROClientException
,
1468 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1470 Wait for kdu to be up, get ip address
1471 :param logging_text: prefix use for logging
1475 :return: IP address, K8s services
1478 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1481 while nb_tries
< 360:
1482 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1486 for x
in get_iterable(db_vnfr
, "kdur")
1487 if x
.get("kdu-name") == kdu_name
1493 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1495 if kdur
.get("status"):
1496 if kdur
["status"] in ("READY", "ENABLED"):
1497 return kdur
.get("ip-address"), kdur
.get("services")
1500 "target KDU={} is in error state".format(kdu_name
)
1503 await asyncio
.sleep(10)
1505 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1507 async def wait_vm_up_insert_key_ro(
1508 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1511 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1512 :param logging_text: prefix use for logging
1517 :param pub_key: public ssh key to inject, None to skip
1518 :param user: user to apply the public ssh key
1522 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1524 target_vdu_id
= None
1529 if ro_retries
>= 360: # 1 hour
1531 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1534 await asyncio
.sleep(10)
1537 if not target_vdu_id
:
1538 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1540 if not vdu_id
: # for the VNF case
1541 if db_vnfr
.get("status") == "ERROR":
1543 "Cannot inject ssh-key because target VNF is in error state"
1545 ip_address
= db_vnfr
.get("ip-address")
1551 for x
in get_iterable(db_vnfr
, "vdur")
1552 if x
.get("ip-address") == ip_address
1560 for x
in get_iterable(db_vnfr
, "vdur")
1561 if x
.get("vdu-id-ref") == vdu_id
1562 and x
.get("count-index") == vdu_index
1568 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1569 ): # If only one, this should be the target vdu
1570 vdur
= db_vnfr
["vdur"][0]
1573 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1574 vnfr_id
, vdu_id
, vdu_index
1577 # New generation RO stores information at "vim_info"
1580 if vdur
.get("vim_info"):
1582 t
for t
in vdur
["vim_info"]
1583 ) # there should be only one key
1584 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1586 vdur
.get("pdu-type")
1587 or vdur
.get("status") == "ACTIVE"
1588 or ng_ro_status
== "ACTIVE"
1590 ip_address
= vdur
.get("ip-address")
1593 target_vdu_id
= vdur
["vdu-id-ref"]
1594 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1596 "Cannot inject ssh-key because target VM is in error state"
1599 if not target_vdu_id
:
1602 # inject public key into machine
1603 if pub_key
and user
:
1604 self
.logger
.debug(logging_text
+ "Inserting RO key")
1605 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1606 if vdur
.get("pdu-type"):
1607 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1612 "action": "inject_ssh_key",
1616 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1618 desc
= await self
.RO
.deploy(nsr_id
, target
)
1619 action_id
= desc
["action_id"]
1620 await self
._wait
_ng
_ro
(
1621 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1624 except NgRoException
as e
:
1626 "Reaching max tries injecting key. Error: {}".format(e
)
1633 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1635 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1637 my_vca
= vca_deployed_list
[vca_index
]
1638 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1639 # vdu or kdu: no dependencies
1643 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1644 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1645 configuration_status_list
= db_nsr
["configurationStatus"]
1646 for index
, vca_deployed
in enumerate(configuration_status_list
):
1647 if index
== vca_index
:
1650 if not my_vca
.get("member-vnf-index") or (
1651 vca_deployed
.get("member-vnf-index")
1652 == my_vca
.get("member-vnf-index")
1654 internal_status
= configuration_status_list
[index
].get("status")
1655 if internal_status
== "READY":
1657 elif internal_status
== "BROKEN":
1659 "Configuration aborted because dependent charm/s has failed"
1664 # no dependencies, return
1666 await asyncio
.sleep(10)
1669 raise LcmException("Configuration aborted because dependent charm/s timeout")
1671 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1674 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1676 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1677 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1680 async def instantiate_N2VC(
1698 ee_config_descriptor
,
1700 nsr_id
= db_nsr
["_id"]
1701 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1702 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1703 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1704 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1706 "collection": "nsrs",
1707 "filter": {"_id": nsr_id
},
1708 "path": db_update_entry
,
1713 element_under_configuration
= nsr_id
1717 vnfr_id
= db_vnfr
["_id"]
1718 osm_config
["osm"]["vnf_id"] = vnfr_id
1720 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1722 if vca_type
== "native_charm":
1725 index_number
= vdu_index
or 0
1728 element_type
= "VNF"
1729 element_under_configuration
= vnfr_id
1730 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1732 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1733 element_type
= "VDU"
1734 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1735 osm_config
["osm"]["vdu_id"] = vdu_id
1737 namespace
+= ".{}".format(kdu_name
)
1738 element_type
= "KDU"
1739 element_under_configuration
= kdu_name
1740 osm_config
["osm"]["kdu_name"] = kdu_name
1743 if base_folder
["pkg-dir"]:
1744 artifact_path
= "{}/{}/{}/{}".format(
1745 base_folder
["folder"],
1746 base_folder
["pkg-dir"],
1749 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1754 artifact_path
= "{}/Scripts/{}/{}/".format(
1755 base_folder
["folder"],
1758 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1763 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1765 # get initial_config_primitive_list that applies to this element
1766 initial_config_primitive_list
= config_descriptor
.get(
1767 "initial-config-primitive"
1771 "Initial config primitive list > {}".format(
1772 initial_config_primitive_list
1776 # add config if not present for NS charm
1777 ee_descriptor_id
= ee_config_descriptor
.get("id")
1778 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1779 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1780 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1784 "Initial config primitive list #2 > {}".format(
1785 initial_config_primitive_list
1788 # n2vc_redesign STEP 3.1
1789 # find old ee_id if exists
1790 ee_id
= vca_deployed
.get("ee_id")
1792 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1793 # create or register execution environment in VCA
1794 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1795 self
._write
_configuration
_status
(
1797 vca_index
=vca_index
,
1799 element_under_configuration
=element_under_configuration
,
1800 element_type
=element_type
,
1803 step
= "create execution environment"
1804 self
.logger
.debug(logging_text
+ step
)
1808 if vca_type
== "k8s_proxy_charm":
1809 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1810 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1811 namespace
=namespace
,
1812 artifact_path
=artifact_path
,
1816 elif vca_type
== "helm" or vca_type
== "helm-v3":
1817 ee_id
, credentials
= await self
.vca_map
[
1819 ].create_execution_environment(
1820 namespace
=namespace
,
1824 artifact_path
=artifact_path
,
1825 chart_model
=vca_name
,
1829 ee_id
, credentials
= await self
.vca_map
[
1831 ].create_execution_environment(
1832 namespace
=namespace
,
1838 elif vca_type
== "native_charm":
1839 step
= "Waiting to VM being up and getting IP address"
1840 self
.logger
.debug(logging_text
+ step
)
1841 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1850 credentials
= {"hostname": rw_mgmt_ip
}
1852 username
= deep_get(
1853 config_descriptor
, ("config-access", "ssh-access", "default-user")
1855 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1856 # merged. Meanwhile let's get username from initial-config-primitive
1857 if not username
and initial_config_primitive_list
:
1858 for config_primitive
in initial_config_primitive_list
:
1859 for param
in config_primitive
.get("parameter", ()):
1860 if param
["name"] == "ssh-username":
1861 username
= param
["value"]
1865 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1866 "'config-access.ssh-access.default-user'"
1868 credentials
["username"] = username
1869 # n2vc_redesign STEP 3.2
1871 self
._write
_configuration
_status
(
1873 vca_index
=vca_index
,
1874 status
="REGISTERING",
1875 element_under_configuration
=element_under_configuration
,
1876 element_type
=element_type
,
1879 step
= "register execution environment {}".format(credentials
)
1880 self
.logger
.debug(logging_text
+ step
)
1881 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1882 credentials
=credentials
,
1883 namespace
=namespace
,
1888 # for compatibility with MON/POL modules, the need model and application name at database
1889 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1890 ee_id_parts
= ee_id
.split(".")
1891 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1892 if len(ee_id_parts
) >= 2:
1893 model_name
= ee_id_parts
[0]
1894 application_name
= ee_id_parts
[1]
1895 db_nsr_update
[db_update_entry
+ "model"] = model_name
1896 db_nsr_update
[db_update_entry
+ "application"] = application_name
1898 # n2vc_redesign STEP 3.3
1899 step
= "Install configuration Software"
1901 self
._write
_configuration
_status
(
1903 vca_index
=vca_index
,
1904 status
="INSTALLING SW",
1905 element_under_configuration
=element_under_configuration
,
1906 element_type
=element_type
,
1907 other_update
=db_nsr_update
,
1910 # TODO check if already done
1911 self
.logger
.debug(logging_text
+ step
)
1913 if vca_type
== "native_charm":
1914 config_primitive
= next(
1915 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1918 if config_primitive
:
1919 config
= self
._map
_primitive
_params
(
1920 config_primitive
, {}, deploy_params
1923 if vca_type
== "lxc_proxy_charm":
1924 if element_type
== "NS":
1925 num_units
= db_nsr
.get("config-units") or 1
1926 elif element_type
== "VNF":
1927 num_units
= db_vnfr
.get("config-units") or 1
1928 elif element_type
== "VDU":
1929 for v
in db_vnfr
["vdur"]:
1930 if vdu_id
== v
["vdu-id-ref"]:
1931 num_units
= v
.get("config-units") or 1
1933 if vca_type
!= "k8s_proxy_charm":
1934 await self
.vca_map
[vca_type
].install_configuration_sw(
1936 artifact_path
=artifact_path
,
1939 num_units
=num_units
,
1944 # write in db flag of configuration_sw already installed
1946 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1949 # add relations for this VCA (wait for other peers related with this VCA)
1950 is_relation_added
= await self
._add
_vca
_relations
(
1951 logging_text
=logging_text
,
1954 vca_index
=vca_index
,
1957 if not is_relation_added
:
1958 raise LcmException("Relations could not be added to VCA.")
1960 # if SSH access is required, then get execution environment SSH public
1961 # if native charm we have waited already to VM be UP
1962 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1965 # self.logger.debug("get ssh key block")
1967 config_descriptor
, ("config-access", "ssh-access", "required")
1969 # self.logger.debug("ssh key needed")
1970 # Needed to inject a ssh key
1973 ("config-access", "ssh-access", "default-user"),
1975 step
= "Install configuration Software, getting public ssh key"
1976 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1977 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1980 step
= "Insert public key into VM user={} ssh_key={}".format(
1984 # self.logger.debug("no need to get ssh key")
1985 step
= "Waiting to VM being up and getting IP address"
1986 self
.logger
.debug(logging_text
+ step
)
1988 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1991 # n2vc_redesign STEP 5.1
1992 # wait for RO (ip-address) Insert pub_key into VM
1995 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1996 logging_text
, nsr_id
, vnfr_id
, kdu_name
1998 vnfd
= self
.db
.get_one(
2000 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2002 kdu
= get_kdu(vnfd
, kdu_name
)
2004 service
["name"] for service
in get_kdu_services(kdu
)
2006 exposed_services
= []
2007 for service
in services
:
2008 if any(s
in service
["name"] for s
in kdu_services
):
2009 exposed_services
.append(service
)
2010 await self
.vca_map
[vca_type
].exec_primitive(
2012 primitive_name
="config",
2014 "osm-config": json
.dumps(
2016 k8s
={"services": exposed_services
}
2023 # This verification is needed in order to avoid trying to add a public key
2024 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2025 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2026 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2028 elif db_vnfr
.get("vdur"):
2029 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2039 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2041 # store rw_mgmt_ip in deploy params for later replacement
2042 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2044 # n2vc_redesign STEP 6 Execute initial config primitive
2045 step
= "execute initial config primitive"
2047 # wait for dependent primitives execution (NS -> VNF -> VDU)
2048 if initial_config_primitive_list
:
2049 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2051 # stage, in function of element type: vdu, kdu, vnf or ns
2052 my_vca
= vca_deployed_list
[vca_index
]
2053 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2055 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2056 elif my_vca
.get("member-vnf-index"):
2058 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2061 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2063 self
._write
_configuration
_status
(
2064 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2067 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2069 check_if_terminated_needed
= True
2070 for initial_config_primitive
in initial_config_primitive_list
:
2071 # adding information on the vca_deployed if it is a NS execution environment
2072 if not vca_deployed
["member-vnf-index"]:
2073 deploy_params
["ns_config_info"] = json
.dumps(
2074 self
._get
_ns
_config
_info
(nsr_id
)
2076 # TODO check if already done
2077 primitive_params_
= self
._map
_primitive
_params
(
2078 initial_config_primitive
, {}, deploy_params
2081 step
= "execute primitive '{}' params '{}'".format(
2082 initial_config_primitive
["name"], primitive_params_
2084 self
.logger
.debug(logging_text
+ step
)
2085 await self
.vca_map
[vca_type
].exec_primitive(
2087 primitive_name
=initial_config_primitive
["name"],
2088 params_dict
=primitive_params_
,
2093 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2094 if check_if_terminated_needed
:
2095 if config_descriptor
.get("terminate-config-primitive"):
2097 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2099 check_if_terminated_needed
= False
2101 # TODO register in database that primitive is done
2103 # STEP 7 Configure metrics
2104 if vca_type
== "helm" or vca_type
== "helm-v3":
2105 # TODO: review for those cases where the helm chart is a reference and
2106 # is not part of the NF package
2107 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2109 artifact_path
=artifact_path
,
2110 ee_config_descriptor
=ee_config_descriptor
,
2113 target_ip
=rw_mgmt_ip
,
2114 element_type
=element_type
,
2115 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2117 vdu_index
=vdu_index
,
2119 kdu_index
=kdu_index
,
2125 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2128 for job
in prometheus_jobs
:
2131 {"job_name": job
["job_name"]},
2134 fail_on_empty
=False,
2137 step
= "instantiated at VCA"
2138 self
.logger
.debug(logging_text
+ step
)
2140 self
._write
_configuration
_status
(
2141 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2144 except Exception as e
: # TODO not use Exception but N2VC exception
2145 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2147 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2150 "Exception while {} : {}".format(step
, e
), exc_info
=True
2152 self
._write
_configuration
_status
(
2153 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2155 raise LcmException("{}. {}".format(step
, e
)) from e
2157 def _write_ns_status(
2161 current_operation
: str,
2162 current_operation_id
: str,
2163 error_description
: str = None,
2164 error_detail
: str = None,
2165 other_update
: dict = None,
2168 Update db_nsr fields.
2171 :param current_operation:
2172 :param current_operation_id:
2173 :param error_description:
2174 :param error_detail:
2175 :param other_update: Other required changes at database if provided, will be cleared
2179 db_dict
= other_update
or {}
2182 ] = current_operation_id
# for backward compatibility
2183 db_dict
["_admin.current-operation"] = current_operation_id
2184 db_dict
["_admin.operation-type"] = (
2185 current_operation
if current_operation
!= "IDLE" else None
2187 db_dict
["currentOperation"] = current_operation
2188 db_dict
["currentOperationID"] = current_operation_id
2189 db_dict
["errorDescription"] = error_description
2190 db_dict
["errorDetail"] = error_detail
2193 db_dict
["nsState"] = ns_state
2194 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2195 except DbException
as e
:
2196 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2198 def _write_op_status(
2202 error_message
: str = None,
2203 queuePosition
: int = 0,
2204 operation_state
: str = None,
2205 other_update
: dict = None,
2208 db_dict
= other_update
or {}
2209 db_dict
["queuePosition"] = queuePosition
2210 if isinstance(stage
, list):
2211 db_dict
["stage"] = stage
[0]
2212 db_dict
["detailed-status"] = " ".join(stage
)
2213 elif stage
is not None:
2214 db_dict
["stage"] = str(stage
)
2216 if error_message
is not None:
2217 db_dict
["errorMessage"] = error_message
2218 if operation_state
is not None:
2219 db_dict
["operationState"] = operation_state
2220 db_dict
["statusEnteredTime"] = time()
2221 self
.update_db_2("nslcmops", op_id
, db_dict
)
2222 except DbException
as e
:
2224 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2227 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2229 nsr_id
= db_nsr
["_id"]
2230 # configurationStatus
2231 config_status
= db_nsr
.get("configurationStatus")
2234 "configurationStatus.{}.status".format(index
): status
2235 for index
, v
in enumerate(config_status
)
2239 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2241 except DbException
as e
:
2243 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2246 def _write_configuration_status(
2251 element_under_configuration
: str = None,
2252 element_type
: str = None,
2253 other_update
: dict = None,
2255 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2256 # .format(vca_index, status))
2259 db_path
= "configurationStatus.{}.".format(vca_index
)
2260 db_dict
= other_update
or {}
2262 db_dict
[db_path
+ "status"] = status
2263 if element_under_configuration
:
2265 db_path
+ "elementUnderConfiguration"
2266 ] = element_under_configuration
2268 db_dict
[db_path
+ "elementType"] = element_type
2269 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2270 except DbException
as e
:
2272 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2273 status
, nsr_id
, vca_index
, e
2277 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2279 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2280 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2281 Database is used because the result can be obtained from a different LCM worker in case of HA.
2282 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2283 :param db_nslcmop: database content of nslcmop
2284 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2285 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2286 computed 'vim-account-id'
2289 nslcmop_id
= db_nslcmop
["_id"]
2290 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2291 if placement_engine
== "PLA":
2293 logging_text
+ "Invoke and wait for placement optimization"
2295 await self
.msg
.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id
})
2296 db_poll_interval
= 5
2297 wait
= db_poll_interval
* 10
2299 while not pla_result
and wait
>= 0:
2300 await asyncio
.sleep(db_poll_interval
)
2301 wait
-= db_poll_interval
2302 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2303 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2307 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2310 for pla_vnf
in pla_result
["vnf"]:
2311 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2312 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2317 {"_id": vnfr
["_id"]},
2318 {"vim-account-id": pla_vnf
["vimAccountId"]},
2321 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2324 def _gather_vnfr_healing_alerts(self
, vnfr
, vnfd
):
2326 nsr_id
= vnfr
["nsr-id-ref"]
2327 df
= vnfd
.get("df", [{}])[0]
2328 # Checking for auto-healing configuration
2329 if "healing-aspect" in df
:
2330 healing_aspects
= df
["healing-aspect"]
2331 for healing
in healing_aspects
:
2332 for healing_policy
in healing
.get("healing-policy", ()):
2333 vdu_id
= healing_policy
["vdu-id"]
2335 (vdur
for vdur
in vnfr
["vdur"] if vdu_id
== vdur
["vdu-id-ref"]),
2340 metric_name
= "vm_status"
2341 vdu_name
= vdur
.get("name")
2342 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2344 name
= f
"healing_{uuid}"
2345 action
= healing_policy
2346 # action_on_recovery = healing.get("action-on-recovery")
2347 # cooldown_time = healing.get("cooldown-time")
2348 # day1 = healing.get("day1")
2352 "metric": metric_name
,
2355 "vnf_member_index": vnf_member_index
,
2356 "vdu_name": vdu_name
,
2358 "alarm_status": "ok",
2359 "action_type": "healing",
2362 alerts
.append(alert
)
2365 def _gather_vnfr_scaling_alerts(self
, vnfr
, vnfd
):
2367 nsr_id
= vnfr
["nsr-id-ref"]
2368 df
= vnfd
.get("df", [{}])[0]
2369 # Checking for auto-scaling configuration
2370 if "scaling-aspect" in df
:
2371 rel_operation_types
= {
2379 scaling_aspects
= df
["scaling-aspect"]
2380 all_vnfd_monitoring_params
= {}
2381 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
2382 for mp
in ivld
.get("monitoring-parameters", ()):
2383 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2384 for vdu
in vnfd
.get("vdu", ()):
2385 for mp
in vdu
.get("monitoring-parameter", ()):
2386 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2387 for df
in vnfd
.get("df", ()):
2388 for mp
in df
.get("monitoring-parameter", ()):
2389 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2390 for scaling_aspect
in scaling_aspects
:
2391 scaling_group_name
= scaling_aspect
.get("name", "")
2392 # Get monitored VDUs
2393 all_monitored_vdus
= set()
2394 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get(
2397 for vdu_delta
in delta
.get("vdu-delta", ()):
2398 all_monitored_vdus
.add(vdu_delta
.get("id"))
2399 monitored_vdurs
= list(
2401 lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
,
2405 if not monitored_vdurs
:
2407 "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
2410 for scaling_policy
in scaling_aspect
.get("scaling-policy", ()):
2411 if scaling_policy
["scaling-type"] != "automatic":
2413 threshold_time
= scaling_policy
.get("threshold-time", "1")
2414 cooldown_time
= scaling_policy
.get("cooldown-time", "0")
2415 for scaling_criteria
in scaling_policy
["scaling-criteria"]:
2416 monitoring_param_ref
= scaling_criteria
.get(
2417 "vnf-monitoring-param-ref"
2419 vnf_monitoring_param
= all_vnfd_monitoring_params
[
2420 monitoring_param_ref
2422 for vdur
in monitored_vdurs
:
2423 vdu_id
= vdur
["vdu-id-ref"]
2424 metric_name
= vnf_monitoring_param
.get("performance-metric")
2425 metric_name
= f
"osm_{metric_name}"
2426 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2427 scalein_threshold
= scaling_criteria
.get(
2428 "scale-in-threshold"
2430 scaleout_threshold
= scaling_criteria
.get(
2431 "scale-out-threshold"
2433 # Looking for min/max-number-of-instances
2434 instances_min_number
= 1
2435 instances_max_number
= 1
2436 vdu_profile
= df
["vdu-profile"]
2439 item
for item
in vdu_profile
if item
["id"] == vdu_id
2441 instances_min_number
= profile
.get(
2442 "min-number-of-instances", 1
2444 instances_max_number
= profile
.get(
2445 "max-number-of-instances", 1
2448 if scalein_threshold
:
2450 name
= f
"scalein_{uuid}"
2451 operation
= scaling_criteria
[
2452 "scale-in-relational-operation"
2454 rel_operator
= rel_operation_types
.get(operation
, "<=")
2455 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2456 expression
= f
"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
2459 "vnf_member_index": vnf_member_index
,
2465 "for": str(threshold_time
) + "m",
2468 action
= scaling_policy
2470 "scaling-group": scaling_group_name
,
2471 "cooldown-time": cooldown_time
,
2476 "metric": metric_name
,
2479 "vnf_member_index": vnf_member_index
,
2482 "alarm_status": "ok",
2483 "action_type": "scale_in",
2485 "prometheus_config": prom_cfg
,
2487 alerts
.append(alert
)
2489 if scaleout_threshold
:
2491 name
= f
"scaleout_{uuid}"
2492 operation
= scaling_criteria
[
2493 "scale-out-relational-operation"
2495 rel_operator
= rel_operation_types
.get(operation
, "<=")
2496 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2497 expression
= f
"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
2500 "vnf_member_index": vnf_member_index
,
2506 "for": str(threshold_time
) + "m",
2509 action
= scaling_policy
2511 "scaling-group": scaling_group_name
,
2512 "cooldown-time": cooldown_time
,
2517 "metric": metric_name
,
2520 "vnf_member_index": vnf_member_index
,
2523 "alarm_status": "ok",
2524 "action_type": "scale_out",
2526 "prometheus_config": prom_cfg
,
2528 alerts
.append(alert
)
2531 def update_nsrs_with_pla_result(self
, params
):
2533 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2535 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2537 except Exception as e
:
2538 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2540 async def instantiate(self
, nsr_id
, nslcmop_id
):
2543 :param nsr_id: ns instance to deploy
2544 :param nslcmop_id: operation to run
2548 # Try to lock HA task here
2549 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2550 if not task_is_locked_by_me
:
2552 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2556 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2557 self
.logger
.debug(logging_text
+ "Enter")
2559 # get all needed from database
2561 # database nsrs record
2564 # database nslcmops record
2567 # update operation on nsrs
2569 # update operation on nslcmops
2570 db_nslcmop_update
= {}
2572 timeout_ns_deploy
= self
.timeout
.ns_deploy
2574 nslcmop_operation_state
= None
2575 db_vnfrs
= {} # vnf's info indexed by member-index
2577 tasks_dict_info
= {} # from task to info text
2581 "Stage 1/5: preparation of the environment.",
2582 "Waiting for previous operations to terminate.",
2585 # ^ stage, step, VIM progress
2587 # wait for any previous tasks in process
2588 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2590 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2591 stage
[1] = "Reading from database."
2592 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2593 db_nsr_update
["detailed-status"] = "creating"
2594 db_nsr_update
["operational-status"] = "init"
2595 self
._write
_ns
_status
(
2597 ns_state
="BUILDING",
2598 current_operation
="INSTANTIATING",
2599 current_operation_id
=nslcmop_id
,
2600 other_update
=db_nsr_update
,
2602 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2604 # read from db: operation
2605 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2606 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2607 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2608 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2609 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2611 ns_params
= db_nslcmop
.get("operationParams")
2612 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2613 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2616 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2617 self
.logger
.debug(logging_text
+ stage
[1])
2618 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2619 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2620 self
.logger
.debug(logging_text
+ stage
[1])
2621 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2622 self
.fs
.sync(db_nsr
["nsd-id"])
2624 # nsr_name = db_nsr["name"] # TODO short-name??
2626 # read from db: vnf's of this ns
2627 stage
[1] = "Getting vnfrs from db."
2628 self
.logger
.debug(logging_text
+ stage
[1])
2629 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2631 # read from db: vnfd's for every vnf
2632 db_vnfds
= [] # every vnfd data
2634 # for each vnf in ns, read vnfd
2635 for vnfr
in db_vnfrs_list
:
2636 if vnfr
.get("kdur"):
2638 for kdur
in vnfr
["kdur"]:
2639 if kdur
.get("additionalParams"):
2640 kdur
["additionalParams"] = json
.loads(
2641 kdur
["additionalParams"]
2643 kdur_list
.append(kdur
)
2644 vnfr
["kdur"] = kdur_list
2646 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2647 vnfd_id
= vnfr
["vnfd-id"]
2648 vnfd_ref
= vnfr
["vnfd-ref"]
2649 self
.fs
.sync(vnfd_id
)
2651 # if we haven't this vnfd, read it from db
2652 if vnfd_id
not in db_vnfds
:
2654 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2657 self
.logger
.debug(logging_text
+ stage
[1])
2658 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2661 db_vnfds
.append(vnfd
)
2663 # Get or generates the _admin.deployed.VCA list
2664 vca_deployed_list
= None
2665 if db_nsr
["_admin"].get("deployed"):
2666 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2667 if vca_deployed_list
is None:
2668 vca_deployed_list
= []
2669 configuration_status_list
= []
2670 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2671 db_nsr_update
["configurationStatus"] = configuration_status_list
2672 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2673 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2674 elif isinstance(vca_deployed_list
, dict):
2675 # maintain backward compatibility. Change a dict to list at database
2676 vca_deployed_list
= list(vca_deployed_list
.values())
2677 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2678 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2681 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2683 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2684 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2686 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2687 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2688 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2690 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2693 # n2vc_redesign STEP 2 Deploy Network Scenario
2694 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2695 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2697 stage
[1] = "Deploying KDUs."
2698 # self.logger.debug(logging_text + "Before deploy_kdus")
2699 # Call to deploy_kdus in case exists the "vdu:kdu" param
2700 await self
.deploy_kdus(
2701 logging_text
=logging_text
,
2703 nslcmop_id
=nslcmop_id
,
2706 task_instantiation_info
=tasks_dict_info
,
2709 stage
[1] = "Getting VCA public key."
2710 # n2vc_redesign STEP 1 Get VCA public ssh-key
2711 # feature 1429. Add n2vc public key to needed VMs
2712 n2vc_key
= self
.n2vc
.get_public_key()
2713 n2vc_key_list
= [n2vc_key
]
2714 if self
.vca_config
.public_key
:
2715 n2vc_key_list
.append(self
.vca_config
.public_key
)
2717 stage
[1] = "Deploying NS at VIM."
2718 task_ro
= asyncio
.ensure_future(
2719 self
.instantiate_RO(
2720 logging_text
=logging_text
,
2724 db_nslcmop
=db_nslcmop
,
2727 n2vc_key_list
=n2vc_key_list
,
2731 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2732 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2734 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2735 stage
[1] = "Deploying Execution Environments."
2736 self
.logger
.debug(logging_text
+ stage
[1])
2738 # create namespace and certificate if any helm based EE is present in the NS
2739 if check_helm_ee_in_ns(db_vnfds
):
2740 # TODO: create EE namespace
2741 # create TLS certificates
2742 await self
.vca_map
["helm-v3"].create_tls_certificate(
2743 secret_name
="ee-tls-{}".format(nsr_id
),
2746 usage
="server auth",
2749 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2750 for vnf_profile
in get_vnf_profiles(nsd
):
2751 vnfd_id
= vnf_profile
["vnfd-id"]
2752 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2753 member_vnf_index
= str(vnf_profile
["id"])
2754 db_vnfr
= db_vnfrs
[member_vnf_index
]
2755 base_folder
= vnfd
["_admin"]["storage"]
2762 # Get additional parameters
2763 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2764 if db_vnfr
.get("additionalParamsForVnf"):
2765 deploy_params
.update(
2766 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2769 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2770 if descriptor_config
:
2772 logging_text
=logging_text
2773 + "member_vnf_index={} ".format(member_vnf_index
),
2776 nslcmop_id
=nslcmop_id
,
2782 member_vnf_index
=member_vnf_index
,
2783 vdu_index
=vdu_index
,
2784 kdu_index
=kdu_index
,
2786 deploy_params
=deploy_params
,
2787 descriptor_config
=descriptor_config
,
2788 base_folder
=base_folder
,
2789 task_instantiation_info
=tasks_dict_info
,
2793 # Deploy charms for each VDU that supports one.
2794 for vdud
in get_vdu_list(vnfd
):
2796 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2797 vdur
= find_in_list(
2798 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2801 if vdur
.get("additionalParams"):
2802 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2804 deploy_params_vdu
= deploy_params
2805 deploy_params_vdu
["OSM"] = get_osm_params(
2806 db_vnfr
, vdu_id
, vdu_count_index
=0
2808 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2810 self
.logger
.debug("VDUD > {}".format(vdud
))
2812 "Descriptor config > {}".format(descriptor_config
)
2814 if descriptor_config
:
2818 for vdu_index
in range(vdud_count
):
2819 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2821 logging_text
=logging_text
2822 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2823 member_vnf_index
, vdu_id
, vdu_index
2827 nslcmop_id
=nslcmop_id
,
2833 kdu_index
=kdu_index
,
2834 member_vnf_index
=member_vnf_index
,
2835 vdu_index
=vdu_index
,
2837 deploy_params
=deploy_params_vdu
,
2838 descriptor_config
=descriptor_config
,
2839 base_folder
=base_folder
,
2840 task_instantiation_info
=tasks_dict_info
,
2843 for kdud
in get_kdu_list(vnfd
):
2844 kdu_name
= kdud
["name"]
2845 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2846 if descriptor_config
:
2850 kdu_index
, kdur
= next(
2852 for x
in enumerate(db_vnfr
["kdur"])
2853 if x
[1]["kdu-name"] == kdu_name
2855 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2856 if kdur
.get("additionalParams"):
2857 deploy_params_kdu
.update(
2858 parse_yaml_strings(kdur
["additionalParams"].copy())
2862 logging_text
=logging_text
,
2865 nslcmop_id
=nslcmop_id
,
2871 member_vnf_index
=member_vnf_index
,
2872 vdu_index
=vdu_index
,
2873 kdu_index
=kdu_index
,
2875 deploy_params
=deploy_params_kdu
,
2876 descriptor_config
=descriptor_config
,
2877 base_folder
=base_folder
,
2878 task_instantiation_info
=tasks_dict_info
,
2882 # Check if this NS has a charm configuration
2883 descriptor_config
= nsd
.get("ns-configuration")
2884 if descriptor_config
and descriptor_config
.get("juju"):
2887 member_vnf_index
= None
2894 # Get additional parameters
2895 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2896 if db_nsr
.get("additionalParamsForNs"):
2897 deploy_params
.update(
2898 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2900 base_folder
= nsd
["_admin"]["storage"]
2902 logging_text
=logging_text
,
2905 nslcmop_id
=nslcmop_id
,
2911 member_vnf_index
=member_vnf_index
,
2912 vdu_index
=vdu_index
,
2913 kdu_index
=kdu_index
,
2915 deploy_params
=deploy_params
,
2916 descriptor_config
=descriptor_config
,
2917 base_folder
=base_folder
,
2918 task_instantiation_info
=tasks_dict_info
,
2922 # rest of staff will be done at finally
2925 ROclient
.ROClientException
,
2931 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2934 except asyncio
.CancelledError
:
2936 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2938 exc
= "Operation was cancelled"
2939 except Exception as e
:
2940 exc
= traceback
.format_exc()
2941 self
.logger
.critical(
2942 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2947 error_list
.append(str(exc
))
2949 # wait for pending tasks
2951 stage
[1] = "Waiting for instantiate pending tasks."
2952 self
.logger
.debug(logging_text
+ stage
[1])
2953 error_list
+= await self
._wait
_for
_tasks
(
2961 stage
[1] = stage
[2] = ""
2962 except asyncio
.CancelledError
:
2963 error_list
.append("Cancelled")
2964 # TODO cancel all tasks
2965 except Exception as exc
:
2966 error_list
.append(str(exc
))
2968 # update operation-status
2969 db_nsr_update
["operational-status"] = "running"
2970 # let's begin with VCA 'configured' status (later we can change it)
2971 db_nsr_update
["config-status"] = "configured"
2972 for task
, task_name
in tasks_dict_info
.items():
2973 if not task
.done() or task
.cancelled() or task
.exception():
2974 if task_name
.startswith(self
.task_name_deploy_vca
):
2975 # A N2VC task is pending
2976 db_nsr_update
["config-status"] = "failed"
2978 # RO or KDU task is pending
2979 db_nsr_update
["operational-status"] = "failed"
2981 # update status at database
2983 error_detail
= ". ".join(error_list
)
2984 self
.logger
.error(logging_text
+ error_detail
)
2985 error_description_nslcmop
= "{} Detail: {}".format(
2986 stage
[0], error_detail
2988 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2989 nslcmop_id
, stage
[0]
2992 db_nsr_update
["detailed-status"] = (
2993 error_description_nsr
+ " Detail: " + error_detail
2995 db_nslcmop_update
["detailed-status"] = error_detail
2996 nslcmop_operation_state
= "FAILED"
3000 error_description_nsr
= error_description_nslcmop
= None
3002 db_nsr_update
["detailed-status"] = "Done"
3003 db_nslcmop_update
["detailed-status"] = "Done"
3004 nslcmop_operation_state
= "COMPLETED"
3005 # Gather auto-healing and auto-scaling alerts for each vnfr
3008 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3010 (sub
for sub
in db_vnfds
if sub
["_id"] == vnfr
["vnfd-id"]), None
3012 healing_alerts
= self
._gather
_vnfr
_healing
_alerts
(vnfr
, vnfd
)
3013 for alert
in healing_alerts
:
3014 self
.logger
.info(f
"Storing healing alert in MongoDB: {alert}")
3015 self
.db
.create("alerts", alert
)
3017 scaling_alerts
= self
._gather
_vnfr
_scaling
_alerts
(vnfr
, vnfd
)
3018 for alert
in scaling_alerts
:
3019 self
.logger
.info(f
"Storing scaling alert in MongoDB: {alert}")
3020 self
.db
.create("alerts", alert
)
3023 self
._write
_ns
_status
(
3026 current_operation
="IDLE",
3027 current_operation_id
=None,
3028 error_description
=error_description_nsr
,
3029 error_detail
=error_detail
,
3030 other_update
=db_nsr_update
,
3032 self
._write
_op
_status
(
3035 error_message
=error_description_nslcmop
,
3036 operation_state
=nslcmop_operation_state
,
3037 other_update
=db_nslcmop_update
,
3040 if nslcmop_operation_state
:
3042 await self
.msg
.aiowrite(
3047 "nslcmop_id": nslcmop_id
,
3048 "operationState": nslcmop_operation_state
,
3051 except Exception as e
:
3053 logging_text
+ "kafka_write notification Exception {}".format(e
)
3056 self
.logger
.debug(logging_text
+ "Exit")
3057 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
3059 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
3060 if vnfd_id
not in cached_vnfds
:
3061 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
3062 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
3064 return cached_vnfds
[vnfd_id
]
3066 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
3067 if vnf_profile_id
not in cached_vnfrs
:
3068 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
3071 "member-vnf-index-ref": vnf_profile_id
,
3072 "nsr-id-ref": nsr_id
,
3075 return cached_vnfrs
[vnf_profile_id
]
3077 def _is_deployed_vca_in_relation(
3078 self
, vca
: DeployedVCA
, relation
: Relation
3081 for endpoint
in (relation
.provider
, relation
.requirer
):
3082 if endpoint
["kdu-resource-profile-id"]:
3085 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
3086 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
3087 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
3093 def _update_ee_relation_data_with_implicit_data(
3094 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
3096 ee_relation_data
= safe_get_ee_relation(
3097 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
3099 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
3100 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
3101 "execution-environment-ref"
3103 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
3104 vnfd_id
= vnf_profile
["vnfd-id"]
3105 project
= nsd
["_admin"]["projects_read"][0]
3106 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3109 if ee_relation_level
== EELevel
.VNF
3110 else ee_relation_data
["vdu-profile-id"]
3112 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
3115 f
"not execution environments found for ee_relation {ee_relation_data}"
3117 ee_relation_data
["execution-environment-ref"] = ee
["id"]
3118 return ee_relation_data
3120 def _get_ns_relations(
3123 nsd
: Dict
[str, Any
],
3125 cached_vnfds
: Dict
[str, Any
],
3126 ) -> List
[Relation
]:
3128 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
3129 for r
in db_ns_relations
:
3130 provider_dict
= None
3131 requirer_dict
= None
3132 if all(key
in r
for key
in ("provider", "requirer")):
3133 provider_dict
= r
["provider"]
3134 requirer_dict
= r
["requirer"]
3135 elif "entities" in r
:
3136 provider_id
= r
["entities"][0]["id"]
3139 "endpoint": r
["entities"][0]["endpoint"],
3141 if provider_id
!= nsd
["id"]:
3142 provider_dict
["vnf-profile-id"] = provider_id
3143 requirer_id
= r
["entities"][1]["id"]
3146 "endpoint": r
["entities"][1]["endpoint"],
3148 if requirer_id
!= nsd
["id"]:
3149 requirer_dict
["vnf-profile-id"] = requirer_id
3152 "provider/requirer or entities must be included in the relation."
3154 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3155 nsr_id
, nsd
, provider_dict
, cached_vnfds
3157 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3158 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3160 provider
= EERelation(relation_provider
)
3161 requirer
= EERelation(relation_requirer
)
3162 relation
= Relation(r
["name"], provider
, requirer
)
3163 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3165 relations
.append(relation
)
3168 def _get_vnf_relations(
3171 nsd
: Dict
[str, Any
],
3173 cached_vnfds
: Dict
[str, Any
],
3174 ) -> List
[Relation
]:
3176 if vca
.target_element
== "ns":
3177 self
.logger
.debug("VCA is a NS charm, not a VNF.")
3179 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3180 vnf_profile_id
= vnf_profile
["id"]
3181 vnfd_id
= vnf_profile
["vnfd-id"]
3182 project
= nsd
["_admin"]["projects_read"][0]
3183 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3184 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3185 for r
in db_vnf_relations
:
3186 provider_dict
= None
3187 requirer_dict
= None
3188 if all(key
in r
for key
in ("provider", "requirer")):
3189 provider_dict
= r
["provider"]
3190 requirer_dict
= r
["requirer"]
3191 elif "entities" in r
:
3192 provider_id
= r
["entities"][0]["id"]
3195 "vnf-profile-id": vnf_profile_id
,
3196 "endpoint": r
["entities"][0]["endpoint"],
3198 if provider_id
!= vnfd_id
:
3199 provider_dict
["vdu-profile-id"] = provider_id
3200 requirer_id
= r
["entities"][1]["id"]
3203 "vnf-profile-id": vnf_profile_id
,
3204 "endpoint": r
["entities"][1]["endpoint"],
3206 if requirer_id
!= vnfd_id
:
3207 requirer_dict
["vdu-profile-id"] = requirer_id
3210 "provider/requirer or entities must be included in the relation."
3212 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3213 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3215 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3216 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3218 provider
= EERelation(relation_provider
)
3219 requirer
= EERelation(relation_requirer
)
3220 relation
= Relation(r
["name"], provider
, requirer
)
3221 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3223 relations
.append(relation
)
3226 def _get_kdu_resource_data(
3228 ee_relation
: EERelation
,
3229 db_nsr
: Dict
[str, Any
],
3230 cached_vnfds
: Dict
[str, Any
],
3231 ) -> DeployedK8sResource
:
3232 nsd
= get_nsd(db_nsr
)
3233 vnf_profiles
= get_vnf_profiles(nsd
)
3234 vnfd_id
= find_in_list(
3236 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3238 project
= nsd
["_admin"]["projects_read"][0]
3239 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3240 kdu_resource_profile
= get_kdu_resource_profile(
3241 db_vnfd
, ee_relation
.kdu_resource_profile_id
3243 kdu_name
= kdu_resource_profile
["kdu-name"]
3244 deployed_kdu
, _
= get_deployed_kdu(
3245 db_nsr
.get("_admin", ()).get("deployed", ()),
3247 ee_relation
.vnf_profile_id
,
3249 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3252 def _get_deployed_component(
3254 ee_relation
: EERelation
,
3255 db_nsr
: Dict
[str, Any
],
3256 cached_vnfds
: Dict
[str, Any
],
3257 ) -> DeployedComponent
:
3258 nsr_id
= db_nsr
["_id"]
3259 deployed_component
= None
3260 ee_level
= EELevel
.get_level(ee_relation
)
3261 if ee_level
== EELevel
.NS
:
3262 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3264 deployed_component
= DeployedVCA(nsr_id
, vca
)
3265 elif ee_level
== EELevel
.VNF
:
3266 vca
= get_deployed_vca(
3270 "member-vnf-index": ee_relation
.vnf_profile_id
,
3271 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3275 deployed_component
= DeployedVCA(nsr_id
, vca
)
3276 elif ee_level
== EELevel
.VDU
:
3277 vca
= get_deployed_vca(
3280 "vdu_id": ee_relation
.vdu_profile_id
,
3281 "member-vnf-index": ee_relation
.vnf_profile_id
,
3282 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3286 deployed_component
= DeployedVCA(nsr_id
, vca
)
3287 elif ee_level
== EELevel
.KDU
:
3288 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3289 ee_relation
, db_nsr
, cached_vnfds
3291 if kdu_resource_data
:
3292 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3293 return deployed_component
3295 async def _add_relation(
3299 db_nsr
: Dict
[str, Any
],
3300 cached_vnfds
: Dict
[str, Any
],
3301 cached_vnfrs
: Dict
[str, Any
],
3303 deployed_provider
= self
._get
_deployed
_component
(
3304 relation
.provider
, db_nsr
, cached_vnfds
3306 deployed_requirer
= self
._get
_deployed
_component
(
3307 relation
.requirer
, db_nsr
, cached_vnfds
3311 and deployed_requirer
3312 and deployed_provider
.config_sw_installed
3313 and deployed_requirer
.config_sw_installed
3315 provider_db_vnfr
= (
3317 relation
.provider
.nsr_id
,
3318 relation
.provider
.vnf_profile_id
,
3321 if relation
.provider
.vnf_profile_id
3324 requirer_db_vnfr
= (
3326 relation
.requirer
.nsr_id
,
3327 relation
.requirer
.vnf_profile_id
,
3330 if relation
.requirer
.vnf_profile_id
3333 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3334 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3335 provider_relation_endpoint
= RelationEndpoint(
3336 deployed_provider
.ee_id
,
3338 relation
.provider
.endpoint
,
3340 requirer_relation_endpoint
= RelationEndpoint(
3341 deployed_requirer
.ee_id
,
3343 relation
.requirer
.endpoint
,
3346 await self
.vca_map
[vca_type
].add_relation(
3347 provider
=provider_relation_endpoint
,
3348 requirer
=requirer_relation_endpoint
,
3350 except N2VCException
as exception
:
3351 self
.logger
.error(exception
)
3352 raise LcmException(exception
)
3356 async def _add_vca_relations(
3362 timeout
: int = 3600,
3365 # 1. find all relations for this VCA
3366 # 2. wait for other peers related
3370 # STEP 1: find all relations for this VCA
3373 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3374 nsd
= get_nsd(db_nsr
)
3377 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3378 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3383 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3384 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3386 # if no relations, terminate
3388 self
.logger
.debug(logging_text
+ " No relations")
3391 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3398 if now
- start
>= timeout
:
3399 self
.logger
.error(logging_text
+ " : timeout adding relations")
3402 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3403 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3405 # for each relation, find the VCA's related
3406 for relation
in relations
.copy():
3407 added
= await self
._add
_relation
(
3415 relations
.remove(relation
)
3418 self
.logger
.debug("Relations added")
3420 await asyncio
.sleep(5.0)
3424 except Exception as e
:
3425 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3428 async def _install_kdu(
3436 k8s_instance_info
: dict,
3437 k8params
: dict = None,
3442 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3445 "collection": "nsrs",
3446 "filter": {"_id": nsr_id
},
3447 "path": nsr_db_path
,
3450 if k8s_instance_info
.get("kdu-deployment-name"):
3451 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3453 kdu_instance
= self
.k8scluster_map
[
3455 ].generate_kdu_instance_name(
3456 db_dict
=db_dict_install
,
3457 kdu_model
=k8s_instance_info
["kdu-model"],
3458 kdu_name
=k8s_instance_info
["kdu-name"],
3461 # Update the nsrs table with the kdu-instance value
3465 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3468 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3469 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3470 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3471 # namespace, this first verification could be removed, and the next step would be done for any kind
3473 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3474 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3475 if k8sclustertype
in ("juju", "juju-bundle"):
3476 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3477 # that the user passed a namespace which he wants its KDU to be deployed in)
3483 "_admin.projects_write": k8s_instance_info
["namespace"],
3484 "_admin.projects_read": k8s_instance_info
["namespace"],
3490 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3495 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3497 k8s_instance_info
["namespace"] = kdu_instance
3499 await self
.k8scluster_map
[k8sclustertype
].install(
3500 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3501 kdu_model
=k8s_instance_info
["kdu-model"],
3504 db_dict
=db_dict_install
,
3506 kdu_name
=k8s_instance_info
["kdu-name"],
3507 namespace
=k8s_instance_info
["namespace"],
3508 kdu_instance
=kdu_instance
,
3512 # Obtain services to obtain management service ip
3513 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3514 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3515 kdu_instance
=kdu_instance
,
3516 namespace
=k8s_instance_info
["namespace"],
3519 # Obtain management service info (if exists)
3520 vnfr_update_dict
= {}
3521 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3523 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3528 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3531 for service
in kdud
.get("service", [])
3532 if service
.get("mgmt-service")
3534 for mgmt_service
in mgmt_services
:
3535 for service
in services
:
3536 if service
["name"].startswith(mgmt_service
["name"]):
3537 # Mgmt service found, Obtain service ip
3538 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3539 if isinstance(ip
, list) and len(ip
) == 1:
3543 "kdur.{}.ip-address".format(kdu_index
)
3546 # Check if must update also mgmt ip at the vnf
3547 service_external_cp
= mgmt_service
.get(
3548 "external-connection-point-ref"
3550 if service_external_cp
:
3552 deep_get(vnfd
, ("mgmt-interface", "cp"))
3553 == service_external_cp
3555 vnfr_update_dict
["ip-address"] = ip
3560 "external-connection-point-ref", ""
3562 == service_external_cp
,
3565 "kdur.{}.ip-address".format(kdu_index
)
3570 "Mgmt service name: {} not found".format(
3571 mgmt_service
["name"]
3575 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3576 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3578 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3581 and kdu_config
.get("initial-config-primitive")
3582 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3584 initial_config_primitive_list
= kdu_config
.get(
3585 "initial-config-primitive"
3587 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3589 for initial_config_primitive
in initial_config_primitive_list
:
3590 primitive_params_
= self
._map
_primitive
_params
(
3591 initial_config_primitive
, {}, {}
3594 await asyncio
.wait_for(
3595 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3596 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3597 kdu_instance
=kdu_instance
,
3598 primitive_name
=initial_config_primitive
["name"],
3599 params
=primitive_params_
,
3600 db_dict
=db_dict_install
,
3606 except Exception as e
:
3607 # Prepare update db with error and raise exception
3610 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3614 vnfr_data
.get("_id"),
3615 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3618 # ignore to keep original exception
3620 # reraise original error
3625 async def deploy_kdus(
3632 task_instantiation_info
,
3634 # Launch kdus if present in the descriptor
3636 k8scluster_id_2_uuic
= {
3637 "helm-chart-v3": {},
3642 async def _get_cluster_id(cluster_id
, cluster_type
):
3643 nonlocal k8scluster_id_2_uuic
3644 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3645 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3647 # check if K8scluster is creating and wait look if previous tasks in process
3648 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3649 "k8scluster", cluster_id
3652 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3653 task_name
, cluster_id
3655 self
.logger
.debug(logging_text
+ text
)
3656 await asyncio
.wait(task_dependency
, timeout
=3600)
3658 db_k8scluster
= self
.db
.get_one(
3659 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3661 if not db_k8scluster
:
3662 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3664 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3666 if cluster_type
== "helm-chart-v3":
3668 # backward compatibility for existing clusters that have not been initialized for helm v3
3669 k8s_credentials
= yaml
.safe_dump(
3670 db_k8scluster
.get("credentials")
3672 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3673 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3675 db_k8scluster_update
= {}
3676 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3677 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3678 db_k8scluster_update
[
3679 "_admin.helm-chart-v3.created"
3681 db_k8scluster_update
[
3682 "_admin.helm-chart-v3.operationalState"
3685 "k8sclusters", cluster_id
, db_k8scluster_update
3687 except Exception as e
:
3690 + "error initializing helm-v3 cluster: {}".format(str(e
))
3693 "K8s cluster '{}' has not been initialized for '{}'".format(
3694 cluster_id
, cluster_type
3699 "K8s cluster '{}' has not been initialized for '{}'".format(
3700 cluster_id
, cluster_type
3703 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3706 logging_text
+= "Deploy kdus: "
3709 db_nsr_update
= {"_admin.deployed.K8s": []}
3710 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3713 updated_cluster_list
= []
3714 updated_v3_cluster_list
= []
3716 for vnfr_data
in db_vnfrs
.values():
3717 vca_id
= self
.get_vca_id(vnfr_data
, {})
3718 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3719 # Step 0: Prepare and set parameters
3720 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3721 vnfd_id
= vnfr_data
.get("vnfd-id")
3722 vnfd_with_id
= find_in_list(
3723 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3727 for kdud
in vnfd_with_id
["kdu"]
3728 if kdud
["name"] == kdur
["kdu-name"]
3730 namespace
= kdur
.get("k8s-namespace")
3731 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3732 if kdur
.get("helm-chart"):
3733 kdumodel
= kdur
["helm-chart"]
3734 # Default version: helm3, if helm-version is v2 assign v2
3735 k8sclustertype
= "helm-chart-v3"
3736 self
.logger
.debug("kdur: {}".format(kdur
))
3738 kdur
.get("helm-version")
3739 and kdur
.get("helm-version") == "v2"
3741 k8sclustertype
= "helm-chart"
3742 elif kdur
.get("juju-bundle"):
3743 kdumodel
= kdur
["juju-bundle"]
3744 k8sclustertype
= "juju-bundle"
3747 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3748 "juju-bundle. Maybe an old NBI version is running".format(
3749 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3752 # check if kdumodel is a file and exists
3754 vnfd_with_id
= find_in_list(
3755 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3757 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3758 if storage
: # may be not present if vnfd has not artifacts
3759 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3760 if storage
["pkg-dir"]:
3761 filename
= "{}/{}/{}s/{}".format(
3768 filename
= "{}/Scripts/{}s/{}".format(
3773 if self
.fs
.file_exists(
3774 filename
, mode
="file"
3775 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3776 kdumodel
= self
.fs
.path
+ filename
3777 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3779 except Exception: # it is not a file
3782 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3783 step
= "Synchronize repos for k8s cluster '{}'".format(
3786 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3790 k8sclustertype
== "helm-chart"
3791 and cluster_uuid
not in updated_cluster_list
3793 k8sclustertype
== "helm-chart-v3"
3794 and cluster_uuid
not in updated_v3_cluster_list
3796 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3797 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3798 cluster_uuid
=cluster_uuid
3801 if del_repo_list
or added_repo_dict
:
3802 if k8sclustertype
== "helm-chart":
3804 "_admin.helm_charts_added." + item
: None
3805 for item
in del_repo_list
3808 "_admin.helm_charts_added." + item
: name
3809 for item
, name
in added_repo_dict
.items()
3811 updated_cluster_list
.append(cluster_uuid
)
3812 elif k8sclustertype
== "helm-chart-v3":
3814 "_admin.helm_charts_v3_added." + item
: None
3815 for item
in del_repo_list
3818 "_admin.helm_charts_v3_added." + item
: name
3819 for item
, name
in added_repo_dict
.items()
3821 updated_v3_cluster_list
.append(cluster_uuid
)
3823 logging_text
+ "repos synchronized on k8s cluster "
3824 "'{}' to_delete: {}, to_add: {}".format(
3825 k8s_cluster_id
, del_repo_list
, added_repo_dict
3830 {"_id": k8s_cluster_id
},
3836 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3837 vnfr_data
["member-vnf-index-ref"],
3841 k8s_instance_info
= {
3842 "kdu-instance": None,
3843 "k8scluster-uuid": cluster_uuid
,
3844 "k8scluster-type": k8sclustertype
,
3845 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3846 "kdu-name": kdur
["kdu-name"],
3847 "kdu-model": kdumodel
,
3848 "namespace": namespace
,
3849 "kdu-deployment-name": kdu_deployment_name
,
3851 db_path
= "_admin.deployed.K8s.{}".format(index
)
3852 db_nsr_update
[db_path
] = k8s_instance_info
3853 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3854 vnfd_with_id
= find_in_list(
3855 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3857 task
= asyncio
.ensure_future(
3866 k8params
=desc_params
,
3871 self
.lcm_tasks
.register(
3875 "instantiate_KDU-{}".format(index
),
3878 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3884 except (LcmException
, asyncio
.CancelledError
):
3886 except Exception as e
:
3887 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3888 if isinstance(e
, (N2VCException
, DbException
)):
3889 self
.logger
.error(logging_text
+ msg
)
3891 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3892 raise LcmException(msg
)
3895 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3915 task_instantiation_info
,
3918 # launch instantiate_N2VC in a asyncio task and register task object
3919 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3920 # if not found, create one entry and update database
3921 # fill db_nsr._admin.deployed.VCA.<index>
3924 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3928 get_charm_name
= False
3929 if "execution-environment-list" in descriptor_config
:
3930 ee_list
= descriptor_config
.get("execution-environment-list", [])
3931 elif "juju" in descriptor_config
:
3932 ee_list
= [descriptor_config
] # ns charms
3933 if "execution-environment-list" not in descriptor_config
:
3934 # charm name is only required for ns charms
3935 get_charm_name
= True
3936 else: # other types as script are not supported
3939 for ee_item
in ee_list
:
3942 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3943 ee_item
.get("juju"), ee_item
.get("helm-chart")
3946 ee_descriptor_id
= ee_item
.get("id")
3947 if ee_item
.get("juju"):
3948 vca_name
= ee_item
["juju"].get("charm")
3950 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3953 if ee_item
["juju"].get("charm") is not None
3956 if ee_item
["juju"].get("cloud") == "k8s":
3957 vca_type
= "k8s_proxy_charm"
3958 elif ee_item
["juju"].get("proxy") is False:
3959 vca_type
= "native_charm"
3960 elif ee_item
.get("helm-chart"):
3961 vca_name
= ee_item
["helm-chart"]
3962 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3965 vca_type
= "helm-v3"
3968 logging_text
+ "skipping non juju neither charm configuration"
3973 for vca_index
, vca_deployed
in enumerate(
3974 db_nsr
["_admin"]["deployed"]["VCA"]
3976 if not vca_deployed
:
3979 vca_deployed
.get("member-vnf-index") == member_vnf_index
3980 and vca_deployed
.get("vdu_id") == vdu_id
3981 and vca_deployed
.get("kdu_name") == kdu_name
3982 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3983 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3987 # not found, create one.
3989 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3992 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3994 target
+= "/kdu/{}".format(kdu_name
)
3996 "target_element": target
,
3997 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3998 "member-vnf-index": member_vnf_index
,
4000 "kdu_name": kdu_name
,
4001 "vdu_count_index": vdu_index
,
4002 "operational-status": "init", # TODO revise
4003 "detailed-status": "", # TODO revise
4004 "step": "initial-deploy", # TODO revise
4006 "vdu_name": vdu_name
,
4008 "ee_descriptor_id": ee_descriptor_id
,
4009 "charm_name": charm_name
,
4013 # create VCA and configurationStatus in db
4015 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
4016 "configurationStatus.{}".format(vca_index
): dict(),
4018 self
.update_db_2("nsrs", nsr_id
, db_dict
)
4020 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
4022 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
4023 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
4024 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
4027 task_n2vc
= asyncio
.ensure_future(
4028 self
.instantiate_N2VC(
4029 logging_text
=logging_text
,
4030 vca_index
=vca_index
,
4036 vdu_index
=vdu_index
,
4037 kdu_index
=kdu_index
,
4038 deploy_params
=deploy_params
,
4039 config_descriptor
=descriptor_config
,
4040 base_folder
=base_folder
,
4041 nslcmop_id
=nslcmop_id
,
4045 ee_config_descriptor
=ee_item
,
4048 self
.lcm_tasks
.register(
4052 "instantiate_N2VC-{}".format(vca_index
),
4055 task_instantiation_info
[
4057 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
4058 member_vnf_index
or "", vdu_id
or ""
4062 def _create_nslcmop(nsr_id
, operation
, params
):
4064 Creates a ns-lcm-opp content to be stored at database.
4065 :param nsr_id: internal id of the instance
4066 :param operation: instantiate, terminate, scale, action, ...
4067 :param params: user parameters for the operation
4068 :return: dictionary following SOL005 format
4070 # Raise exception if invalid arguments
4071 if not (nsr_id
and operation
and params
):
4073 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
4080 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
4081 "operationState": "PROCESSING",
4082 "statusEnteredTime": now
,
4083 "nsInstanceId": nsr_id
,
4084 "lcmOperationType": operation
,
4086 "isAutomaticInvocation": False,
4087 "operationParams": params
,
4088 "isCancelPending": False,
4090 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
4091 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
4096 def _format_additional_params(self
, params
):
4097 params
= params
or {}
4098 for key
, value
in params
.items():
4099 if str(value
).startswith("!!yaml "):
4100 params
[key
] = yaml
.safe_load(value
[7:])
4103 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
4104 primitive
= seq
.get("name")
4105 primitive_params
= {}
4107 "member_vnf_index": vnf_index
,
4108 "primitive": primitive
,
4109 "primitive_params": primitive_params
,
4112 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
4116 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
4117 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
4118 if op
.get("operationState") == "COMPLETED":
4119 # b. Skip sub-operation
4120 # _ns_execute_primitive() or RO.create_action() will NOT be executed
4121 return self
.SUBOPERATION_STATUS_SKIP
4123 # c. retry executing sub-operation
4124 # The sub-operation exists, and operationState != 'COMPLETED'
4125 # Update operationState = 'PROCESSING' to indicate a retry.
4126 operationState
= "PROCESSING"
4127 detailed_status
= "In progress"
4128 self
._update
_suboperation
_status
(
4129 db_nslcmop
, op_index
, operationState
, detailed_status
4131 # Return the sub-operation index
4132 # _ns_execute_primitive() or RO.create_action() will be called from scale()
4133 # with arguments extracted from the sub-operation
4136 # Find a sub-operation where all keys in a matching dictionary must match
4137 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4138 def _find_suboperation(self
, db_nslcmop
, match
):
4139 if db_nslcmop
and match
:
4140 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4141 for i
, op
in enumerate(op_list
):
4142 if all(op
.get(k
) == match
[k
] for k
in match
):
4144 return self
.SUBOPERATION_STATUS_NOT_FOUND
4146 # Update status for a sub-operation given its index
4147 def _update_suboperation_status(
4148 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4150 # Update DB for HA tasks
4151 q_filter
= {"_id": db_nslcmop
["_id"]}
4153 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4154 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4157 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4160 # Add sub-operation, return the index of the added sub-operation
4161 # Optionally, set operationState, detailed-status, and operationType
4162 # Status and type are currently set for 'scale' sub-operations:
4163 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4164 # 'detailed-status' : status message
4165 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4166 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4167 def _add_suboperation(
4175 mapped_primitive_params
,
4176 operationState
=None,
4177 detailed_status
=None,
4180 RO_scaling_info
=None,
4183 return self
.SUBOPERATION_STATUS_NOT_FOUND
4184 # Get the "_admin.operations" list, if it exists
4185 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4186 op_list
= db_nslcmop_admin
.get("operations")
4187 # Create or append to the "_admin.operations" list
4189 "member_vnf_index": vnf_index
,
4191 "vdu_count_index": vdu_count_index
,
4192 "primitive": primitive
,
4193 "primitive_params": mapped_primitive_params
,
4196 new_op
["operationState"] = operationState
4198 new_op
["detailed-status"] = detailed_status
4200 new_op
["lcmOperationType"] = operationType
4202 new_op
["RO_nsr_id"] = RO_nsr_id
4204 new_op
["RO_scaling_info"] = RO_scaling_info
4206 # No existing operations, create key 'operations' with current operation as first list element
4207 db_nslcmop_admin
.update({"operations": [new_op
]})
4208 op_list
= db_nslcmop_admin
.get("operations")
4210 # Existing operations, append operation to list
4211 op_list
.append(new_op
)
4213 db_nslcmop_update
= {"_admin.operations": op_list
}
4214 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4215 op_index
= len(op_list
) - 1
4218 # Helper methods for scale() sub-operations
4220 # pre-scale/post-scale:
4221 # Check for 3 different cases:
4222 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4223 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4224 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4225 def _check_or_add_scale_suboperation(
4229 vnf_config_primitive
,
4233 RO_scaling_info
=None,
4235 # Find this sub-operation
4236 if RO_nsr_id
and RO_scaling_info
:
4237 operationType
= "SCALE-RO"
4239 "member_vnf_index": vnf_index
,
4240 "RO_nsr_id": RO_nsr_id
,
4241 "RO_scaling_info": RO_scaling_info
,
4245 "member_vnf_index": vnf_index
,
4246 "primitive": vnf_config_primitive
,
4247 "primitive_params": primitive_params
,
4248 "lcmOperationType": operationType
,
4250 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4251 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4252 # a. New sub-operation
4253 # The sub-operation does not exist, add it.
4254 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4255 # The following parameters are set to None for all kind of scaling:
4257 vdu_count_index
= None
4259 if RO_nsr_id
and RO_scaling_info
:
4260 vnf_config_primitive
= None
4261 primitive_params
= None
4264 RO_scaling_info
= None
4265 # Initial status for sub-operation
4266 operationState
= "PROCESSING"
4267 detailed_status
= "In progress"
4268 # Add sub-operation for pre/post-scaling (zero or more operations)
4269 self
._add
_suboperation
(
4275 vnf_config_primitive
,
4283 return self
.SUBOPERATION_STATUS_NEW
4285 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4286 # or op_index (operationState != 'COMPLETED')
4287 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4289 # Function to return execution_environment id
4291 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4292 # TODO vdu_index_count
4293 for vca
in vca_deployed_list
:
4294 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4297 async def destroy_N2VC(
4305 exec_primitives
=True,
4310 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4311 :param logging_text:
4313 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4314 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4315 :param vca_index: index in the database _admin.deployed.VCA
4316 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4317 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4318 not executed properly
4319 :param scaling_in: True destroys the application, False destroys the model
4320 :return: None or exception
4325 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4326 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4330 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4332 # execute terminate_primitives
4334 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4335 config_descriptor
.get("terminate-config-primitive"),
4336 vca_deployed
.get("ee_descriptor_id"),
4338 vdu_id
= vca_deployed
.get("vdu_id")
4339 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4340 vdu_name
= vca_deployed
.get("vdu_name")
4341 vnf_index
= vca_deployed
.get("member-vnf-index")
4342 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4343 for seq
in terminate_primitives
:
4344 # For each sequence in list, get primitive and call _ns_execute_primitive()
4345 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4346 vnf_index
, seq
.get("name")
4348 self
.logger
.debug(logging_text
+ step
)
4349 # Create the primitive for each sequence, i.e. "primitive": "touch"
4350 primitive
= seq
.get("name")
4351 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4356 self
._add
_suboperation
(
4363 mapped_primitive_params
,
4365 # Sub-operations: Call _ns_execute_primitive() instead of action()
4367 result
, result_detail
= await self
._ns
_execute
_primitive
(
4368 vca_deployed
["ee_id"],
4370 mapped_primitive_params
,
4374 except LcmException
:
4375 # this happens when VCA is not deployed. In this case it is not needed to terminate
4377 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4378 if result
not in result_ok
:
4380 "terminate_primitive {} for vnf_member_index={} fails with "
4381 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4383 # set that this VCA do not need terminated
4384 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4388 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4391 # Delete Prometheus Jobs if any
4392 # This uses NSR_ID, so it will destroy any jobs under this index
4393 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4396 await self
.vca_map
[vca_type
].delete_execution_environment(
4397 vca_deployed
["ee_id"],
4398 scaling_in
=scaling_in
,
4403 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4404 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4405 namespace
= "." + db_nsr
["_id"]
4407 await self
.n2vc
.delete_namespace(
4408 namespace
=namespace
,
4409 total_timeout
=self
.timeout
.charm_delete
,
4412 except N2VCNotFound
: # already deleted. Skip
4414 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4416 async def terminate(self
, nsr_id
, nslcmop_id
):
4417 # Try to lock HA task here
4418 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4419 if not task_is_locked_by_me
:
4422 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4423 self
.logger
.debug(logging_text
+ "Enter")
4424 timeout_ns_terminate
= self
.timeout
.ns_terminate
4427 operation_params
= None
4429 error_list
= [] # annotates all failed error messages
4430 db_nslcmop_update
= {}
4431 autoremove
= False # autoremove after terminated
4432 tasks_dict_info
= {}
4435 "Stage 1/3: Preparing task.",
4436 "Waiting for previous operations to terminate.",
4439 # ^ contains [stage, step, VIM-status]
4441 # wait for any previous tasks in process
4442 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4444 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4445 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4446 operation_params
= db_nslcmop
.get("operationParams") or {}
4447 if operation_params
.get("timeout_ns_terminate"):
4448 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4449 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4450 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4452 db_nsr_update
["operational-status"] = "terminating"
4453 db_nsr_update
["config-status"] = "terminating"
4454 self
._write
_ns
_status
(
4456 ns_state
="TERMINATING",
4457 current_operation
="TERMINATING",
4458 current_operation_id
=nslcmop_id
,
4459 other_update
=db_nsr_update
,
4461 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4462 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4463 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4466 stage
[1] = "Getting vnf descriptors from db."
4467 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4469 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4471 db_vnfds_from_id
= {}
4472 db_vnfds_from_member_index
= {}
4474 for vnfr
in db_vnfrs_list
:
4475 vnfd_id
= vnfr
["vnfd-id"]
4476 if vnfd_id
not in db_vnfds_from_id
:
4477 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4478 db_vnfds_from_id
[vnfd_id
] = vnfd
4479 db_vnfds_from_member_index
[
4480 vnfr
["member-vnf-index-ref"]
4481 ] = db_vnfds_from_id
[vnfd_id
]
4483 # Destroy individual execution environments when there are terminating primitives.
4484 # Rest of EE will be deleted at once
4485 # TODO - check before calling _destroy_N2VC
4486 # if not operation_params.get("skip_terminate_primitives"):#
4487 # or not vca.get("needed_terminate"):
4488 stage
[0] = "Stage 2/3 execute terminating primitives."
4489 self
.logger
.debug(logging_text
+ stage
[0])
4490 stage
[1] = "Looking execution environment that needs terminate."
4491 self
.logger
.debug(logging_text
+ stage
[1])
4493 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4494 config_descriptor
= None
4495 vca_member_vnf_index
= vca
.get("member-vnf-index")
4496 vca_id
= self
.get_vca_id(
4497 db_vnfrs_dict
.get(vca_member_vnf_index
)
4498 if vca_member_vnf_index
4502 if not vca
or not vca
.get("ee_id"):
4504 if not vca
.get("member-vnf-index"):
4506 config_descriptor
= db_nsr
.get("ns-configuration")
4507 elif vca
.get("vdu_id"):
4508 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4509 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4510 elif vca
.get("kdu_name"):
4511 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4512 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4514 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4515 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4516 vca_type
= vca
.get("type")
4517 exec_terminate_primitives
= not operation_params
.get(
4518 "skip_terminate_primitives"
4519 ) and vca
.get("needed_terminate")
4520 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4521 # pending native charms
4523 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4525 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4526 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4527 task
= asyncio
.ensure_future(
4535 exec_terminate_primitives
,
4539 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4541 # wait for pending tasks of terminate primitives
4545 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4547 error_list
= await self
._wait
_for
_tasks
(
4550 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4554 tasks_dict_info
.clear()
4556 return # raise LcmException("; ".join(error_list))
4558 # remove All execution environments at once
4559 stage
[0] = "Stage 3/3 delete all."
4561 if nsr_deployed
.get("VCA"):
4562 stage
[1] = "Deleting all execution environments."
4563 self
.logger
.debug(logging_text
+ stage
[1])
4564 vca_id
= self
.get_vca_id({}, db_nsr
)
4565 task_delete_ee
= asyncio
.ensure_future(
4567 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4568 timeout
=self
.timeout
.charm_delete
,
4571 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4572 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4574 # Delete Namespace and Certificates if necessary
4575 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4576 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4577 certificate_name
=db_nslcmop
["nsInstanceId"],
4579 # TODO: Delete namespace
4581 # Delete from k8scluster
4582 stage
[1] = "Deleting KDUs."
4583 self
.logger
.debug(logging_text
+ stage
[1])
4584 # print(nsr_deployed)
4585 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4586 if not kdu
or not kdu
.get("kdu-instance"):
4588 kdu_instance
= kdu
.get("kdu-instance")
4589 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4590 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4591 vca_id
= self
.get_vca_id({}, db_nsr
)
4592 task_delete_kdu_instance
= asyncio
.ensure_future(
4593 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4594 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4595 kdu_instance
=kdu_instance
,
4597 namespace
=kdu
.get("namespace"),
4603 + "Unknown k8s deployment type {}".format(
4604 kdu
.get("k8scluster-type")
4609 task_delete_kdu_instance
4610 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4613 stage
[1] = "Deleting ns from VIM."
4614 if self
.ro_config
.ng
:
4615 task_delete_ro
= asyncio
.ensure_future(
4616 self
._terminate
_ng
_ro
(
4617 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4620 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4622 # rest of staff will be done at finally
4625 ROclient
.ROClientException
,
4630 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4632 except asyncio
.CancelledError
:
4634 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4636 exc
= "Operation was cancelled"
4637 except Exception as e
:
4638 exc
= traceback
.format_exc()
4639 self
.logger
.critical(
4640 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4645 error_list
.append(str(exc
))
4647 # wait for pending tasks
4649 stage
[1] = "Waiting for terminate pending tasks."
4650 self
.logger
.debug(logging_text
+ stage
[1])
4651 error_list
+= await self
._wait
_for
_tasks
(
4654 timeout_ns_terminate
,
4658 stage
[1] = stage
[2] = ""
4659 except asyncio
.CancelledError
:
4660 error_list
.append("Cancelled")
4661 # TODO cancell all tasks
4662 except Exception as exc
:
4663 error_list
.append(str(exc
))
4664 # update status at database
4666 error_detail
= "; ".join(error_list
)
4667 # self.logger.error(logging_text + error_detail)
4668 error_description_nslcmop
= "{} Detail: {}".format(
4669 stage
[0], error_detail
4671 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4672 nslcmop_id
, stage
[0]
4675 db_nsr_update
["operational-status"] = "failed"
4676 db_nsr_update
["detailed-status"] = (
4677 error_description_nsr
+ " Detail: " + error_detail
4679 db_nslcmop_update
["detailed-status"] = error_detail
4680 nslcmop_operation_state
= "FAILED"
4684 error_description_nsr
= error_description_nslcmop
= None
4685 ns_state
= "NOT_INSTANTIATED"
4686 db_nsr_update
["operational-status"] = "terminated"
4687 db_nsr_update
["detailed-status"] = "Done"
4688 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4689 db_nslcmop_update
["detailed-status"] = "Done"
4690 nslcmop_operation_state
= "COMPLETED"
4693 self
._write
_ns
_status
(
4696 current_operation
="IDLE",
4697 current_operation_id
=None,
4698 error_description
=error_description_nsr
,
4699 error_detail
=error_detail
,
4700 other_update
=db_nsr_update
,
4702 self
._write
_op
_status
(
4705 error_message
=error_description_nslcmop
,
4706 operation_state
=nslcmop_operation_state
,
4707 other_update
=db_nslcmop_update
,
4709 if ns_state
== "NOT_INSTANTIATED":
4713 {"nsr-id-ref": nsr_id
},
4714 {"_admin.nsState": "NOT_INSTANTIATED"},
4716 except DbException
as e
:
4719 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4723 if operation_params
:
4724 autoremove
= operation_params
.get("autoremove", False)
4725 if nslcmop_operation_state
:
4727 await self
.msg
.aiowrite(
4732 "nslcmop_id": nslcmop_id
,
4733 "operationState": nslcmop_operation_state
,
4734 "autoremove": autoremove
,
4737 except Exception as e
:
4739 logging_text
+ "kafka_write notification Exception {}".format(e
)
4741 self
.logger
.debug(f
"Deleting alerts: ns_id={nsr_id}")
4742 self
.db
.del_list("alerts", {"tags.ns_id": nsr_id
})
4744 self
.logger
.debug(logging_text
+ "Exit")
4745 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4747 async def _wait_for_tasks(
4748 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4751 error_detail_list
= []
4753 pending_tasks
= list(created_tasks_info
.keys())
4754 num_tasks
= len(pending_tasks
)
4756 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4757 self
._write
_op
_status
(nslcmop_id
, stage
)
4758 while pending_tasks
:
4760 _timeout
= timeout
+ time_start
- time()
4761 done
, pending_tasks
= await asyncio
.wait(
4762 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4764 num_done
+= len(done
)
4765 if not done
: # Timeout
4766 for task
in pending_tasks
:
4767 new_error
= created_tasks_info
[task
] + ": Timeout"
4768 error_detail_list
.append(new_error
)
4769 error_list
.append(new_error
)
4772 if task
.cancelled():
4775 exc
= task
.exception()
4777 if isinstance(exc
, asyncio
.TimeoutError
):
4779 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4780 error_list
.append(created_tasks_info
[task
])
4781 error_detail_list
.append(new_error
)
4788 ROclient
.ROClientException
,
4794 self
.logger
.error(logging_text
+ new_error
)
4796 exc_traceback
= "".join(
4797 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4801 + created_tasks_info
[task
]
4807 logging_text
+ created_tasks_info
[task
] + ": Done"
4809 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4811 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4812 if nsr_id
: # update also nsr
4817 "errorDescription": "Error at: " + ", ".join(error_list
),
4818 "errorDetail": ". ".join(error_detail_list
),
4821 self
._write
_op
_status
(nslcmop_id
, stage
)
4822 return error_detail_list
4825 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4827 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4828 The default-value is used. If it is between < > it look for a value at instantiation_params
4829 :param primitive_desc: portion of VNFD/NSD that describes primitive
4830 :param params: Params provided by user
4831 :param instantiation_params: Instantiation params provided by user
4832 :return: a dictionary with the calculated params
4834 calculated_params
= {}
4835 for parameter
in primitive_desc
.get("parameter", ()):
4836 param_name
= parameter
["name"]
4837 if param_name
in params
:
4838 calculated_params
[param_name
] = params
[param_name
]
4839 elif "default-value" in parameter
or "value" in parameter
:
4840 if "value" in parameter
:
4841 calculated_params
[param_name
] = parameter
["value"]
4843 calculated_params
[param_name
] = parameter
["default-value"]
4845 isinstance(calculated_params
[param_name
], str)
4846 and calculated_params
[param_name
].startswith("<")
4847 and calculated_params
[param_name
].endswith(">")
4849 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4850 calculated_params
[param_name
] = instantiation_params
[
4851 calculated_params
[param_name
][1:-1]
4855 "Parameter {} needed to execute primitive {} not provided".format(
4856 calculated_params
[param_name
], primitive_desc
["name"]
4861 "Parameter {} needed to execute primitive {} not provided".format(
4862 param_name
, primitive_desc
["name"]
4866 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4867 calculated_params
[param_name
] = yaml
.safe_dump(
4868 calculated_params
[param_name
], default_flow_style
=True, width
=256
4870 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4872 ].startswith("!!yaml "):
4873 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4874 if parameter
.get("data-type") == "INTEGER":
4876 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4877 except ValueError: # error converting string to int
4879 "Parameter {} of primitive {} must be integer".format(
4880 param_name
, primitive_desc
["name"]
4883 elif parameter
.get("data-type") == "BOOLEAN":
4884 calculated_params
[param_name
] = not (
4885 (str(calculated_params
[param_name
])).lower() == "false"
4888 # add always ns_config_info if primitive name is config
4889 if primitive_desc
["name"] == "config":
4890 if "ns_config_info" in instantiation_params
:
4891 calculated_params
["ns_config_info"] = instantiation_params
[
4894 return calculated_params
4896 def _look_for_deployed_vca(
4903 ee_descriptor_id
=None,
4905 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4906 for vca
in deployed_vca
:
4909 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4912 vdu_count_index
is not None
4913 and vdu_count_index
!= vca
["vdu_count_index"]
4916 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4918 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4922 # vca_deployed not found
4924 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4925 " is not deployed".format(
4934 ee_id
= vca
.get("ee_id")
4936 "type", "lxc_proxy_charm"
4937 ) # default value for backward compatibility - proxy charm
4940 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4941 "execution environment".format(
4942 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4945 return ee_id
, vca_type
4947 async def _ns_execute_primitive(
4953 retries_interval
=30,
4960 if primitive
== "config":
4961 primitive_params
= {"params": primitive_params
}
4963 vca_type
= vca_type
or "lxc_proxy_charm"
4967 output
= await asyncio
.wait_for(
4968 self
.vca_map
[vca_type
].exec_primitive(
4970 primitive_name
=primitive
,
4971 params_dict
=primitive_params
,
4972 progress_timeout
=self
.timeout
.progress_primitive
,
4973 total_timeout
=self
.timeout
.primitive
,
4978 timeout
=timeout
or self
.timeout
.primitive
,
4982 except asyncio
.CancelledError
:
4984 except Exception as e
:
4988 "Error executing action {} on {} -> {}".format(
4993 await asyncio
.sleep(retries_interval
)
4995 if isinstance(e
, asyncio
.TimeoutError
):
4997 message
="Timed out waiting for action to complete"
4999 return "FAILED", getattr(e
, "message", repr(e
))
5001 return "COMPLETED", output
5003 except (LcmException
, asyncio
.CancelledError
):
5005 except Exception as e
:
5006 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5008 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5010 Updating the vca_status with latest juju information in nsrs record
5011 :param: nsr_id: Id of the nsr
5012 :param: nslcmop_id: Id of the nslcmop
5016 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5017 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5018 vca_id
= self
.get_vca_id({}, db_nsr
)
5019 if db_nsr
["_admin"]["deployed"]["K8s"]:
5020 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5021 cluster_uuid
, kdu_instance
, cluster_type
= (
5022 k8s
["k8scluster-uuid"],
5023 k8s
["kdu-instance"],
5024 k8s
["k8scluster-type"],
5026 await self
._on
_update
_k
8s
_db
(
5027 cluster_uuid
=cluster_uuid
,
5028 kdu_instance
=kdu_instance
,
5029 filter={"_id": nsr_id
},
5031 cluster_type
=cluster_type
,
5034 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5035 table
, filter = "nsrs", {"_id": nsr_id
}
5036 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5037 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5039 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5040 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5042 async def action(self
, nsr_id
, nslcmop_id
):
5043 # Try to lock HA task here
5044 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5045 if not task_is_locked_by_me
:
5048 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5049 self
.logger
.debug(logging_text
+ "Enter")
5050 # get all needed from database
5054 db_nslcmop_update
= {}
5055 nslcmop_operation_state
= None
5056 error_description_nslcmop
= None
5060 # wait for any previous tasks in process
5061 step
= "Waiting for previous operations to terminate"
5062 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5064 self
._write
_ns
_status
(
5067 current_operation
="RUNNING ACTION",
5068 current_operation_id
=nslcmop_id
,
5071 step
= "Getting information from database"
5072 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5073 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5074 if db_nslcmop
["operationParams"].get("primitive_params"):
5075 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5076 db_nslcmop
["operationParams"]["primitive_params"]
5079 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5080 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5081 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5082 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5083 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5084 primitive
= db_nslcmop
["operationParams"]["primitive"]
5085 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5086 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5087 "timeout_ns_action", self
.timeout
.primitive
5091 step
= "Getting vnfr from database"
5092 db_vnfr
= self
.db
.get_one(
5093 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5095 if db_vnfr
.get("kdur"):
5097 for kdur
in db_vnfr
["kdur"]:
5098 if kdur
.get("additionalParams"):
5099 kdur
["additionalParams"] = json
.loads(
5100 kdur
["additionalParams"]
5102 kdur_list
.append(kdur
)
5103 db_vnfr
["kdur"] = kdur_list
5104 step
= "Getting vnfd from database"
5105 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5107 # Sync filesystem before running a primitive
5108 self
.fs
.sync(db_vnfr
["vnfd-id"])
5110 step
= "Getting nsd from database"
5111 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5113 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5114 # for backward compatibility
5115 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5116 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5117 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5118 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5120 # look for primitive
5121 config_primitive_desc
= descriptor_configuration
= None
5123 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5125 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5127 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5129 descriptor_configuration
= db_nsd
.get("ns-configuration")
5131 if descriptor_configuration
and descriptor_configuration
.get(
5134 for config_primitive
in descriptor_configuration
["config-primitive"]:
5135 if config_primitive
["name"] == primitive
:
5136 config_primitive_desc
= config_primitive
5139 if not config_primitive_desc
:
5140 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5142 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5146 primitive_name
= primitive
5147 ee_descriptor_id
= None
5149 primitive_name
= config_primitive_desc
.get(
5150 "execution-environment-primitive", primitive
5152 ee_descriptor_id
= config_primitive_desc
.get(
5153 "execution-environment-ref"
5159 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5161 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5164 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5166 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5168 desc_params
= parse_yaml_strings(
5169 db_vnfr
.get("additionalParamsForVnf")
5172 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5173 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5174 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5176 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5177 actions
.add(primitive
["name"])
5178 for primitive
in kdu_configuration
.get("config-primitive", []):
5179 actions
.add(primitive
["name"])
5181 nsr_deployed
["K8s"],
5182 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5183 and kdu
["member-vnf-index"] == vnf_index
,
5187 if primitive_name
in actions
5188 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5192 # TODO check if ns is in a proper status
5194 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5196 # kdur and desc_params already set from before
5197 if primitive_params
:
5198 desc_params
.update(primitive_params
)
5199 # TODO Check if we will need something at vnf level
5200 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5202 kdu_name
== kdu
["kdu-name"]
5203 and kdu
["member-vnf-index"] == vnf_index
5208 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5211 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5212 msg
= "unknown k8scluster-type '{}'".format(
5213 kdu
.get("k8scluster-type")
5215 raise LcmException(msg
)
5218 "collection": "nsrs",
5219 "filter": {"_id": nsr_id
},
5220 "path": "_admin.deployed.K8s.{}".format(index
),
5224 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5226 step
= "Executing kdu {}".format(primitive_name
)
5227 if primitive_name
== "upgrade":
5228 if desc_params
.get("kdu_model"):
5229 kdu_model
= desc_params
.get("kdu_model")
5230 del desc_params
["kdu_model"]
5232 kdu_model
= kdu
.get("kdu-model")
5233 if kdu_model
.count("/") < 2: # helm chart is not embedded
5234 parts
= kdu_model
.split(sep
=":")
5236 kdu_model
= parts
[0]
5237 if desc_params
.get("kdu_atomic_upgrade"):
5238 atomic_upgrade
= desc_params
.get(
5239 "kdu_atomic_upgrade"
5240 ).lower() in ("yes", "true", "1")
5241 del desc_params
["kdu_atomic_upgrade"]
5243 atomic_upgrade
= True
5245 detailed_status
= await asyncio
.wait_for(
5246 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5247 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5248 kdu_instance
=kdu
.get("kdu-instance"),
5249 atomic
=atomic_upgrade
,
5250 kdu_model
=kdu_model
,
5253 timeout
=timeout_ns_action
,
5255 timeout
=timeout_ns_action
+ 10,
5258 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5260 elif primitive_name
== "rollback":
5261 detailed_status
= await asyncio
.wait_for(
5262 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5263 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5264 kdu_instance
=kdu
.get("kdu-instance"),
5267 timeout
=timeout_ns_action
,
5269 elif primitive_name
== "status":
5270 detailed_status
= await asyncio
.wait_for(
5271 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5272 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5273 kdu_instance
=kdu
.get("kdu-instance"),
5276 timeout
=timeout_ns_action
,
5279 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5280 kdu
["kdu-name"], nsr_id
5282 params
= self
._map
_primitive
_params
(
5283 config_primitive_desc
, primitive_params
, desc_params
5286 detailed_status
= await asyncio
.wait_for(
5287 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5288 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5289 kdu_instance
=kdu_instance
,
5290 primitive_name
=primitive_name
,
5293 timeout
=timeout_ns_action
,
5296 timeout
=timeout_ns_action
,
5300 nslcmop_operation_state
= "COMPLETED"
5302 detailed_status
= ""
5303 nslcmop_operation_state
= "FAILED"
5305 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5306 nsr_deployed
["VCA"],
5307 member_vnf_index
=vnf_index
,
5309 vdu_count_index
=vdu_count_index
,
5310 ee_descriptor_id
=ee_descriptor_id
,
5312 for vca_index
, vca_deployed
in enumerate(
5313 db_nsr
["_admin"]["deployed"]["VCA"]
5315 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5317 "collection": "nsrs",
5318 "filter": {"_id": nsr_id
},
5319 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5323 nslcmop_operation_state
,
5325 ) = await self
._ns
_execute
_primitive
(
5327 primitive
=primitive_name
,
5328 primitive_params
=self
._map
_primitive
_params
(
5329 config_primitive_desc
, primitive_params
, desc_params
5331 timeout
=timeout_ns_action
,
5337 db_nslcmop_update
["detailed-status"] = detailed_status
5338 error_description_nslcmop
= (
5339 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5343 + "Done with result {} {}".format(
5344 nslcmop_operation_state
, detailed_status
5347 return # database update is called inside finally
5349 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5350 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5352 except asyncio
.CancelledError
:
5354 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5356 exc
= "Operation was cancelled"
5357 except asyncio
.TimeoutError
:
5358 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5360 except Exception as e
:
5361 exc
= traceback
.format_exc()
5362 self
.logger
.critical(
5363 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5372 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5373 nslcmop_operation_state
= "FAILED"
5375 self
._write
_ns
_status
(
5379 ], # TODO check if degraded. For the moment use previous status
5380 current_operation
="IDLE",
5381 current_operation_id
=None,
5382 # error_description=error_description_nsr,
5383 # error_detail=error_detail,
5384 other_update
=db_nsr_update
,
5387 self
._write
_op
_status
(
5390 error_message
=error_description_nslcmop
,
5391 operation_state
=nslcmop_operation_state
,
5392 other_update
=db_nslcmop_update
,
5395 if nslcmop_operation_state
:
5397 await self
.msg
.aiowrite(
5402 "nslcmop_id": nslcmop_id
,
5403 "operationState": nslcmop_operation_state
,
5406 except Exception as e
:
5408 logging_text
+ "kafka_write notification Exception {}".format(e
)
5410 self
.logger
.debug(logging_text
+ "Exit")
5411 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5412 return nslcmop_operation_state
, detailed_status
5414 async def terminate_vdus(
5415 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5417 """This method terminates VDUs
5420 db_vnfr: VNF instance record
5421 member_vnf_index: VNF index to identify the VDUs to be removed
5422 db_nsr: NS instance record
5423 update_db_nslcmops: Nslcmop update record
5425 vca_scaling_info
= []
5426 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5427 scaling_info
["scaling_direction"] = "IN"
5428 scaling_info
["vdu-delete"] = {}
5429 scaling_info
["kdu-delete"] = {}
5430 db_vdur
= db_vnfr
.get("vdur")
5431 vdur_list
= copy(db_vdur
)
5433 for index
, vdu
in enumerate(vdur_list
):
5434 vca_scaling_info
.append(
5436 "osm_vdu_id": vdu
["vdu-id-ref"],
5437 "member-vnf-index": member_vnf_index
,
5439 "vdu_index": count_index
,
5442 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5443 scaling_info
["vdu"].append(
5445 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5446 "vdu_id": vdu
["vdu-id-ref"],
5450 for interface
in vdu
["interfaces"]:
5451 scaling_info
["vdu"][index
]["interface"].append(
5453 "name": interface
["name"],
5454 "ip_address": interface
["ip-address"],
5455 "mac_address": interface
.get("mac-address"),
5458 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5459 stage
[2] = "Terminating VDUs"
5460 if scaling_info
.get("vdu-delete"):
5461 # scale_process = "RO"
5462 if self
.ro_config
.ng
:
5463 await self
._scale
_ng
_ro
(
5472 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5473 """This method is to Remove VNF instances from NS.
5476 nsr_id: NS instance id
5477 nslcmop_id: nslcmop id of update
5478 vnf_instance_id: id of the VNF instance to be removed
5481 result: (str, str) COMPLETED/FAILED, details
5485 logging_text
= "Task ns={} update ".format(nsr_id
)
5486 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5487 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5488 if check_vnfr_count
> 1:
5489 stage
= ["", "", ""]
5490 step
= "Getting nslcmop from database"
5492 step
+ " after having waited for previous tasks to be completed"
5494 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5495 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5496 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5497 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5498 """ db_vnfr = self.db.get_one(
5499 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5501 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5502 await self
.terminate_vdus(
5511 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5512 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5513 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5514 "constituent-vnfr-ref"
5516 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5517 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5518 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5519 return "COMPLETED", "Done"
5521 step
= "Terminate VNF Failed with"
5523 "{} Cannot terminate the last VNF in this NS.".format(
5527 except (LcmException
, asyncio
.CancelledError
):
5529 except Exception as e
:
5530 self
.logger
.debug("Error removing VNF {}".format(e
))
5531 return "FAILED", "Error removing VNF {}".format(e
)
5533 async def _ns_redeploy_vnf(
5541 """This method updates and redeploys VNF instances
5544 nsr_id: NS instance id
5545 nslcmop_id: nslcmop id
5546 db_vnfd: VNF descriptor
5547 db_vnfr: VNF instance record
5548 db_nsr: NS instance record
5551 result: (str, str) COMPLETED/FAILED, details
5555 stage
= ["", "", ""]
5556 logging_text
= "Task ns={} update ".format(nsr_id
)
5557 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5558 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5560 # Terminate old VNF resources
5561 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5562 await self
.terminate_vdus(
5571 # old_vnfd_id = db_vnfr["vnfd-id"]
5572 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5573 new_db_vnfd
= db_vnfd
5574 # new_vnfd_ref = new_db_vnfd["id"]
5575 # new_vnfd_id = vnfd_id
5579 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5581 "name": cp
.get("id"),
5582 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5583 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5586 new_vnfr_cp
.append(vnf_cp
)
5587 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5588 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5589 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5591 "revision": latest_vnfd_revision
,
5592 "connection-point": new_vnfr_cp
,
5596 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5597 updated_db_vnfr
= self
.db
.get_one(
5599 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5602 # Instantiate new VNF resources
5603 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5604 vca_scaling_info
= []
5605 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5606 scaling_info
["scaling_direction"] = "OUT"
5607 scaling_info
["vdu-create"] = {}
5608 scaling_info
["kdu-create"] = {}
5609 vdud_instantiate_list
= db_vnfd
["vdu"]
5610 for index
, vdud
in enumerate(vdud_instantiate_list
):
5611 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5613 additional_params
= (
5614 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5617 cloud_init_list
= []
5619 # TODO Information of its own ip is not available because db_vnfr is not updated.
5620 additional_params
["OSM"] = get_osm_params(
5621 updated_db_vnfr
, vdud
["id"], 1
5623 cloud_init_list
.append(
5624 self
._parse
_cloud
_init
(
5631 vca_scaling_info
.append(
5633 "osm_vdu_id": vdud
["id"],
5634 "member-vnf-index": member_vnf_index
,
5636 "vdu_index": count_index
,
5639 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5640 if self
.ro_config
.ng
:
5642 "New Resources to be deployed: {}".format(scaling_info
)
5644 await self
._scale
_ng
_ro
(
5652 return "COMPLETED", "Done"
5653 except (LcmException
, asyncio
.CancelledError
):
5655 except Exception as e
:
5656 self
.logger
.debug("Error updating VNF {}".format(e
))
5657 return "FAILED", "Error updating VNF {}".format(e
)
5659 async def _ns_charm_upgrade(
5665 timeout
: float = None,
5667 """This method upgrade charms in VNF instances
5670 ee_id: Execution environment id
5671 path: Local path to the charm
5673 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5674 timeout: (Float) Timeout for the ns update operation
5677 result: (str, str) COMPLETED/FAILED, details
5680 charm_type
= charm_type
or "lxc_proxy_charm"
5681 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5685 charm_type
=charm_type
,
5686 timeout
=timeout
or self
.timeout
.ns_update
,
5690 return "COMPLETED", output
5692 except (LcmException
, asyncio
.CancelledError
):
5695 except Exception as e
:
5696 self
.logger
.debug("Error upgrading charm {}".format(path
))
5698 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5700 async def update(self
, nsr_id
, nslcmop_id
):
5701 """Update NS according to different update types
5703 This method performs upgrade of VNF instances then updates the revision
5704 number in VNF record
5707 nsr_id: Network service will be updated
5708 nslcmop_id: ns lcm operation id
5711 It may raise DbException, LcmException, N2VCException, K8sException
5714 # Try to lock HA task here
5715 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5716 if not task_is_locked_by_me
:
5719 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5720 self
.logger
.debug(logging_text
+ "Enter")
5722 # Set the required variables to be filled up later
5724 db_nslcmop_update
= {}
5726 nslcmop_operation_state
= None
5728 error_description_nslcmop
= ""
5730 change_type
= "updated"
5731 detailed_status
= ""
5732 member_vnf_index
= None
5735 # wait for any previous tasks in process
5736 step
= "Waiting for previous operations to terminate"
5737 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5738 self
._write
_ns
_status
(
5741 current_operation
="UPDATING",
5742 current_operation_id
=nslcmop_id
,
5745 step
= "Getting nslcmop from database"
5746 db_nslcmop
= self
.db
.get_one(
5747 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5749 update_type
= db_nslcmop
["operationParams"]["updateType"]
5751 step
= "Getting nsr from database"
5752 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5753 old_operational_status
= db_nsr
["operational-status"]
5754 db_nsr_update
["operational-status"] = "updating"
5755 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5756 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5758 if update_type
== "CHANGE_VNFPKG":
5759 # Get the input parameters given through update request
5760 vnf_instance_id
= db_nslcmop
["operationParams"][
5761 "changeVnfPackageData"
5762 ].get("vnfInstanceId")
5764 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5767 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5769 step
= "Getting vnfr from database"
5770 db_vnfr
= self
.db
.get_one(
5771 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5774 step
= "Getting vnfds from database"
5776 latest_vnfd
= self
.db
.get_one(
5777 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5779 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5782 current_vnf_revision
= db_vnfr
.get("revision", 1)
5783 current_vnfd
= self
.db
.get_one(
5785 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5786 fail_on_empty
=False,
5788 # Charm artifact paths will be filled up later
5790 current_charm_artifact_path
,
5791 target_charm_artifact_path
,
5792 charm_artifact_paths
,
5794 ) = ([], [], [], [])
5796 step
= "Checking if revision has changed in VNFD"
5797 if current_vnf_revision
!= latest_vnfd_revision
:
5798 change_type
= "policy_updated"
5800 # There is new revision of VNFD, update operation is required
5801 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5802 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5804 step
= "Removing the VNFD packages if they exist in the local path"
5805 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5806 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5808 step
= "Get the VNFD packages from FSMongo"
5809 self
.fs
.sync(from_path
=latest_vnfd_path
)
5810 self
.fs
.sync(from_path
=current_vnfd_path
)
5813 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5815 current_base_folder
= current_vnfd
["_admin"]["storage"]
5816 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5818 for vca_index
, vca_deployed
in enumerate(
5819 get_iterable(nsr_deployed
, "VCA")
5821 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5823 # Getting charm-id and charm-type
5824 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5825 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5826 vca_type
= vca_deployed
.get("type")
5827 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5830 ee_id
= vca_deployed
.get("ee_id")
5832 step
= "Getting descriptor config"
5833 if current_vnfd
.get("kdu"):
5834 search_key
= "kdu_name"
5836 search_key
= "vnfd_id"
5838 entity_id
= vca_deployed
.get(search_key
)
5840 descriptor_config
= get_configuration(
5841 current_vnfd
, entity_id
5844 if "execution-environment-list" in descriptor_config
:
5845 ee_list
= descriptor_config
.get(
5846 "execution-environment-list", []
5851 # There could be several charm used in the same VNF
5852 for ee_item
in ee_list
:
5853 if ee_item
.get("juju"):
5854 step
= "Getting charm name"
5855 charm_name
= ee_item
["juju"].get("charm")
5857 step
= "Setting Charm artifact paths"
5858 current_charm_artifact_path
.append(
5859 get_charm_artifact_path(
5860 current_base_folder
,
5863 current_vnf_revision
,
5866 target_charm_artifact_path
.append(
5867 get_charm_artifact_path(
5871 latest_vnfd_revision
,
5874 elif ee_item
.get("helm-chart"):
5875 # add chart to list and all parameters
5876 step
= "Getting helm chart name"
5877 chart_name
= ee_item
.get("helm-chart")
5879 ee_item
.get("helm-version")
5880 and ee_item
.get("helm-version") == "v2"
5884 vca_type
= "helm-v3"
5885 step
= "Setting Helm chart artifact paths"
5887 helm_artifacts
.append(
5889 "current_artifact_path": get_charm_artifact_path(
5890 current_base_folder
,
5893 current_vnf_revision
,
5895 "target_artifact_path": get_charm_artifact_path(
5899 latest_vnfd_revision
,
5902 "vca_index": vca_index
,
5903 "vdu_index": vdu_count_index
,
5907 charm_artifact_paths
= zip(
5908 current_charm_artifact_path
, target_charm_artifact_path
5911 step
= "Checking if software version has changed in VNFD"
5912 if find_software_version(current_vnfd
) != find_software_version(
5915 step
= "Checking if existing VNF has charm"
5916 for current_charm_path
, target_charm_path
in list(
5917 charm_artifact_paths
5919 if current_charm_path
:
5921 "Software version change is not supported as VNF instance {} has charm.".format(
5926 # There is no change in the charm package, then redeploy the VNF
5927 # based on new descriptor
5928 step
= "Redeploying VNF"
5929 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5930 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5931 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5933 if result
== "FAILED":
5934 nslcmop_operation_state
= result
5935 error_description_nslcmop
= detailed_status
5936 db_nslcmop_update
["detailed-status"] = detailed_status
5939 + " step {} Done with result {} {}".format(
5940 step
, nslcmop_operation_state
, detailed_status
5945 step
= "Checking if any charm package has changed or not"
5946 for current_charm_path
, target_charm_path
in list(
5947 charm_artifact_paths
5951 and target_charm_path
5952 and self
.check_charm_hash_changed(
5953 current_charm_path
, target_charm_path
5956 step
= "Checking whether VNF uses juju bundle"
5957 if check_juju_bundle_existence(current_vnfd
):
5959 "Charm upgrade is not supported for the instance which"
5960 " uses juju-bundle: {}".format(
5961 check_juju_bundle_existence(current_vnfd
)
5965 step
= "Upgrading Charm"
5969 ) = await self
._ns
_charm
_upgrade
(
5972 charm_type
=vca_type
,
5973 path
=self
.fs
.path
+ target_charm_path
,
5974 timeout
=timeout_seconds
,
5977 if result
== "FAILED":
5978 nslcmop_operation_state
= result
5979 error_description_nslcmop
= detailed_status
5981 db_nslcmop_update
["detailed-status"] = detailed_status
5984 + " step {} Done with result {} {}".format(
5985 step
, nslcmop_operation_state
, detailed_status
5989 step
= "Updating policies"
5990 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5991 result
= "COMPLETED"
5992 detailed_status
= "Done"
5993 db_nslcmop_update
["detailed-status"] = "Done"
5996 for item
in helm_artifacts
:
5998 item
["current_artifact_path"]
5999 and item
["target_artifact_path"]
6000 and self
.check_charm_hash_changed(
6001 item
["current_artifact_path"],
6002 item
["target_artifact_path"],
6006 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6009 vnfr_id
= db_vnfr
["_id"]
6010 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6012 "collection": "nsrs",
6013 "filter": {"_id": nsr_id
},
6014 "path": db_update_entry
,
6016 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6017 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6018 namespace
=namespace
,
6022 artifact_path
=item
["target_artifact_path"],
6025 vnf_id
= db_vnfr
.get("vnfd-ref")
6026 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6027 self
.logger
.debug("get ssh key block")
6031 ("config-access", "ssh-access", "required"),
6033 # Needed to inject a ssh key
6036 ("config-access", "ssh-access", "default-user"),
6039 "Install configuration Software, getting public ssh key"
6041 pub_key
= await self
.vca_map
[
6043 ].get_ee_ssh_public__key(
6044 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6048 "Insert public key into VM user={} ssh_key={}".format(
6052 self
.logger
.debug(logging_text
+ step
)
6054 # wait for RO (ip-address) Insert pub_key into VM
6055 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6065 initial_config_primitive_list
= config_descriptor
.get(
6066 "initial-config-primitive"
6068 config_primitive
= next(
6071 for p
in initial_config_primitive_list
6072 if p
["name"] == "config"
6076 if not config_primitive
:
6079 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6081 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6082 if db_vnfr
.get("additionalParamsForVnf"):
6083 deploy_params
.update(
6085 db_vnfr
["additionalParamsForVnf"].copy()
6088 primitive_params_
= self
._map
_primitive
_params
(
6089 config_primitive
, {}, deploy_params
6092 step
= "execute primitive '{}' params '{}'".format(
6093 config_primitive
["name"], primitive_params_
6095 self
.logger
.debug(logging_text
+ step
)
6096 await self
.vca_map
[vca_type
].exec_primitive(
6098 primitive_name
=config_primitive
["name"],
6099 params_dict
=primitive_params_
,
6105 step
= "Updating policies"
6106 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6107 detailed_status
= "Done"
6108 db_nslcmop_update
["detailed-status"] = "Done"
6110 # If nslcmop_operation_state is None, so any operation is not failed.
6111 if not nslcmop_operation_state
:
6112 nslcmop_operation_state
= "COMPLETED"
6114 # If update CHANGE_VNFPKG nslcmop_operation is successful
6115 # vnf revision need to be updated
6116 vnfr_update
["revision"] = latest_vnfd_revision
6117 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6121 + " task Done with result {} {}".format(
6122 nslcmop_operation_state
, detailed_status
6125 elif update_type
== "REMOVE_VNF":
6126 # This part is included in https://osm.etsi.org/gerrit/11876
6127 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6128 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6129 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6130 step
= "Removing VNF"
6131 (result
, detailed_status
) = await self
.remove_vnf(
6132 nsr_id
, nslcmop_id
, vnf_instance_id
6134 if result
== "FAILED":
6135 nslcmop_operation_state
= result
6136 error_description_nslcmop
= detailed_status
6137 db_nslcmop_update
["detailed-status"] = detailed_status
6138 change_type
= "vnf_terminated"
6139 if not nslcmop_operation_state
:
6140 nslcmop_operation_state
= "COMPLETED"
6143 + " task Done with result {} {}".format(
6144 nslcmop_operation_state
, detailed_status
6148 elif update_type
== "OPERATE_VNF":
6149 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6152 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6155 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6158 (result
, detailed_status
) = await self
.rebuild_start_stop(
6159 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6161 if result
== "FAILED":
6162 nslcmop_operation_state
= result
6163 error_description_nslcmop
= detailed_status
6164 db_nslcmop_update
["detailed-status"] = detailed_status
6165 if not nslcmop_operation_state
:
6166 nslcmop_operation_state
= "COMPLETED"
6169 + " task Done with result {} {}".format(
6170 nslcmop_operation_state
, detailed_status
6174 # If nslcmop_operation_state is None, so any operation is not failed.
6175 # All operations are executed in overall.
6176 if not nslcmop_operation_state
:
6177 nslcmop_operation_state
= "COMPLETED"
6178 db_nsr_update
["operational-status"] = old_operational_status
6180 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6181 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6183 except asyncio
.CancelledError
:
6185 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6187 exc
= "Operation was cancelled"
6188 except asyncio
.TimeoutError
:
6189 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6191 except Exception as e
:
6192 exc
= traceback
.format_exc()
6193 self
.logger
.critical(
6194 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6203 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6204 nslcmop_operation_state
= "FAILED"
6205 db_nsr_update
["operational-status"] = old_operational_status
6207 self
._write
_ns
_status
(
6209 ns_state
=db_nsr
["nsState"],
6210 current_operation
="IDLE",
6211 current_operation_id
=None,
6212 other_update
=db_nsr_update
,
6215 self
._write
_op
_status
(
6218 error_message
=error_description_nslcmop
,
6219 operation_state
=nslcmop_operation_state
,
6220 other_update
=db_nslcmop_update
,
6223 if nslcmop_operation_state
:
6227 "nslcmop_id": nslcmop_id
,
6228 "operationState": nslcmop_operation_state
,
6231 change_type
in ("vnf_terminated", "policy_updated")
6232 and member_vnf_index
6234 msg
.update({"vnf_member_index": member_vnf_index
})
6235 await self
.msg
.aiowrite("ns", change_type
, msg
)
6236 except Exception as e
:
6238 logging_text
+ "kafka_write notification Exception {}".format(e
)
6240 self
.logger
.debug(logging_text
+ "Exit")
6241 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6242 return nslcmop_operation_state
, detailed_status
6244 async def scale(self
, nsr_id
, nslcmop_id
):
6245 # Try to lock HA task here
6246 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6247 if not task_is_locked_by_me
:
6250 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6251 stage
= ["", "", ""]
6252 tasks_dict_info
= {}
6253 # ^ stage, step, VIM progress
6254 self
.logger
.debug(logging_text
+ "Enter")
6255 # get all needed from database
6257 db_nslcmop_update
= {}
6260 # in case of error, indicates what part of scale was failed to put nsr at error status
6261 scale_process
= None
6262 old_operational_status
= ""
6263 old_config_status
= ""
6266 # wait for any previous tasks in process
6267 step
= "Waiting for previous operations to terminate"
6268 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6269 self
._write
_ns
_status
(
6272 current_operation
="SCALING",
6273 current_operation_id
=nslcmop_id
,
6276 step
= "Getting nslcmop from database"
6278 step
+ " after having waited for previous tasks to be completed"
6280 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6282 step
= "Getting nsr from database"
6283 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6284 old_operational_status
= db_nsr
["operational-status"]
6285 old_config_status
= db_nsr
["config-status"]
6287 step
= "Parsing scaling parameters"
6288 db_nsr_update
["operational-status"] = "scaling"
6289 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6290 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6292 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6294 ]["member-vnf-index"]
6295 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6297 ]["scaling-group-descriptor"]
6298 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6299 # for backward compatibility
6300 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6301 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6302 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6305 step
= "Getting vnfr from database"
6306 db_vnfr
= self
.db
.get_one(
6307 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6310 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6312 step
= "Getting vnfd from database"
6313 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6315 base_folder
= db_vnfd
["_admin"]["storage"]
6317 step
= "Getting scaling-group-descriptor"
6318 scaling_descriptor
= find_in_list(
6319 get_scaling_aspect(db_vnfd
),
6320 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6322 if not scaling_descriptor
:
6324 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6325 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6328 step
= "Sending scale order to VIM"
6329 # TODO check if ns is in a proper status
6331 if not db_nsr
["_admin"].get("scaling-group"):
6336 "_admin.scaling-group": [
6337 {"name": scaling_group
, "nb-scale-op": 0}
6341 admin_scale_index
= 0
6343 for admin_scale_index
, admin_scale_info
in enumerate(
6344 db_nsr
["_admin"]["scaling-group"]
6346 if admin_scale_info
["name"] == scaling_group
:
6347 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6349 else: # not found, set index one plus last element and add new entry with the name
6350 admin_scale_index
+= 1
6352 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6355 vca_scaling_info
= []
6356 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6357 if scaling_type
== "SCALE_OUT":
6358 if "aspect-delta-details" not in scaling_descriptor
:
6360 "Aspect delta details not fount in scaling descriptor {}".format(
6361 scaling_descriptor
["name"]
6364 # count if max-instance-count is reached
6365 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6367 scaling_info
["scaling_direction"] = "OUT"
6368 scaling_info
["vdu-create"] = {}
6369 scaling_info
["kdu-create"] = {}
6370 for delta
in deltas
:
6371 for vdu_delta
in delta
.get("vdu-delta", {}):
6372 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6373 # vdu_index also provides the number of instance of the targeted vdu
6374 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6375 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6379 additional_params
= (
6380 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6383 cloud_init_list
= []
6385 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6386 max_instance_count
= 10
6387 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6388 max_instance_count
= vdu_profile
.get(
6389 "max-number-of-instances", 10
6392 default_instance_num
= get_number_of_instances(
6395 instances_number
= vdu_delta
.get("number-of-instances", 1)
6396 nb_scale_op
+= instances_number
6398 new_instance_count
= nb_scale_op
+ default_instance_num
6399 # Control if new count is over max and vdu count is less than max.
6400 # Then assign new instance count
6401 if new_instance_count
> max_instance_count
> vdu_count
:
6402 instances_number
= new_instance_count
- max_instance_count
6404 instances_number
= instances_number
6406 if new_instance_count
> max_instance_count
:
6408 "reached the limit of {} (max-instance-count) "
6409 "scaling-out operations for the "
6410 "scaling-group-descriptor '{}'".format(
6411 nb_scale_op
, scaling_group
6414 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6416 # TODO Information of its own ip is not available because db_vnfr is not updated.
6417 additional_params
["OSM"] = get_osm_params(
6418 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6420 cloud_init_list
.append(
6421 self
._parse
_cloud
_init
(
6428 vca_scaling_info
.append(
6430 "osm_vdu_id": vdu_delta
["id"],
6431 "member-vnf-index": vnf_index
,
6433 "vdu_index": vdu_index
+ x
,
6436 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6437 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6438 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6439 kdu_name
= kdu_profile
["kdu-name"]
6440 resource_name
= kdu_profile
.get("resource-name", "")
6442 # Might have different kdus in the same delta
6443 # Should have list for each kdu
6444 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6445 scaling_info
["kdu-create"][kdu_name
] = []
6447 kdur
= get_kdur(db_vnfr
, kdu_name
)
6448 if kdur
.get("helm-chart"):
6449 k8s_cluster_type
= "helm-chart-v3"
6450 self
.logger
.debug("kdur: {}".format(kdur
))
6452 kdur
.get("helm-version")
6453 and kdur
.get("helm-version") == "v2"
6455 k8s_cluster_type
= "helm-chart"
6456 elif kdur
.get("juju-bundle"):
6457 k8s_cluster_type
= "juju-bundle"
6460 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6461 "juju-bundle. Maybe an old NBI version is running".format(
6462 db_vnfr
["member-vnf-index-ref"], kdu_name
6466 max_instance_count
= 10
6467 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6468 max_instance_count
= kdu_profile
.get(
6469 "max-number-of-instances", 10
6472 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6473 deployed_kdu
, _
= get_deployed_kdu(
6474 nsr_deployed
, kdu_name
, vnf_index
6476 if deployed_kdu
is None:
6478 "KDU '{}' for vnf '{}' not deployed".format(
6482 kdu_instance
= deployed_kdu
.get("kdu-instance")
6483 instance_num
= await self
.k8scluster_map
[
6489 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6490 kdu_model
=deployed_kdu
.get("kdu-model"),
6492 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6493 "number-of-instances", 1
6496 # Control if new count is over max and instance_num is less than max.
6497 # Then assign max instance number to kdu replica count
6498 if kdu_replica_count
> max_instance_count
> instance_num
:
6499 kdu_replica_count
= max_instance_count
6500 if kdu_replica_count
> max_instance_count
:
6502 "reached the limit of {} (max-instance-count) "
6503 "scaling-out operations for the "
6504 "scaling-group-descriptor '{}'".format(
6505 instance_num
, scaling_group
6509 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6510 vca_scaling_info
.append(
6512 "osm_kdu_id": kdu_name
,
6513 "member-vnf-index": vnf_index
,
6515 "kdu_index": instance_num
+ x
- 1,
6518 scaling_info
["kdu-create"][kdu_name
].append(
6520 "member-vnf-index": vnf_index
,
6522 "k8s-cluster-type": k8s_cluster_type
,
6523 "resource-name": resource_name
,
6524 "scale": kdu_replica_count
,
6527 elif scaling_type
== "SCALE_IN":
6528 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6530 scaling_info
["scaling_direction"] = "IN"
6531 scaling_info
["vdu-delete"] = {}
6532 scaling_info
["kdu-delete"] = {}
6534 for delta
in deltas
:
6535 for vdu_delta
in delta
.get("vdu-delta", {}):
6536 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6537 min_instance_count
= 0
6538 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6539 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6540 min_instance_count
= vdu_profile
["min-number-of-instances"]
6542 default_instance_num
= get_number_of_instances(
6543 db_vnfd
, vdu_delta
["id"]
6545 instance_num
= vdu_delta
.get("number-of-instances", 1)
6546 nb_scale_op
-= instance_num
6548 new_instance_count
= nb_scale_op
+ default_instance_num
6550 if new_instance_count
< min_instance_count
< vdu_count
:
6551 instances_number
= min_instance_count
- new_instance_count
6553 instances_number
= instance_num
6555 if new_instance_count
< min_instance_count
:
6557 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6558 "scaling-group-descriptor '{}'".format(
6559 nb_scale_op
, scaling_group
6562 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6563 vca_scaling_info
.append(
6565 "osm_vdu_id": vdu_delta
["id"],
6566 "member-vnf-index": vnf_index
,
6568 "vdu_index": vdu_index
- 1 - x
,
6571 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6572 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6573 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6574 kdu_name
= kdu_profile
["kdu-name"]
6575 resource_name
= kdu_profile
.get("resource-name", "")
6577 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6578 scaling_info
["kdu-delete"][kdu_name
] = []
6580 kdur
= get_kdur(db_vnfr
, kdu_name
)
6581 if kdur
.get("helm-chart"):
6582 k8s_cluster_type
= "helm-chart-v3"
6583 self
.logger
.debug("kdur: {}".format(kdur
))
6585 kdur
.get("helm-version")
6586 and kdur
.get("helm-version") == "v2"
6588 k8s_cluster_type
= "helm-chart"
6589 elif kdur
.get("juju-bundle"):
6590 k8s_cluster_type
= "juju-bundle"
6593 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6594 "juju-bundle. Maybe an old NBI version is running".format(
6595 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6599 min_instance_count
= 0
6600 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6601 min_instance_count
= kdu_profile
["min-number-of-instances"]
6603 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6604 deployed_kdu
, _
= get_deployed_kdu(
6605 nsr_deployed
, kdu_name
, vnf_index
6607 if deployed_kdu
is None:
6609 "KDU '{}' for vnf '{}' not deployed".format(
6613 kdu_instance
= deployed_kdu
.get("kdu-instance")
6614 instance_num
= await self
.k8scluster_map
[
6620 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6621 kdu_model
=deployed_kdu
.get("kdu-model"),
6623 kdu_replica_count
= instance_num
- kdu_delta
.get(
6624 "number-of-instances", 1
6627 if kdu_replica_count
< min_instance_count
< instance_num
:
6628 kdu_replica_count
= min_instance_count
6629 if kdu_replica_count
< min_instance_count
:
6631 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6632 "scaling-group-descriptor '{}'".format(
6633 instance_num
, scaling_group
6637 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6638 vca_scaling_info
.append(
6640 "osm_kdu_id": kdu_name
,
6641 "member-vnf-index": vnf_index
,
6643 "kdu_index": instance_num
- x
- 1,
6646 scaling_info
["kdu-delete"][kdu_name
].append(
6648 "member-vnf-index": vnf_index
,
6650 "k8s-cluster-type": k8s_cluster_type
,
6651 "resource-name": resource_name
,
6652 "scale": kdu_replica_count
,
6656 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6657 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6658 if scaling_info
["scaling_direction"] == "IN":
6659 for vdur
in reversed(db_vnfr
["vdur"]):
6660 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6661 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6662 scaling_info
["vdu"].append(
6664 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6665 "vdu_id": vdur
["vdu-id-ref"],
6669 for interface
in vdur
["interfaces"]:
6670 scaling_info
["vdu"][-1]["interface"].append(
6672 "name": interface
["name"],
6673 "ip_address": interface
["ip-address"],
6674 "mac_address": interface
.get("mac-address"),
6677 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6680 step
= "Executing pre-scale vnf-config-primitive"
6681 if scaling_descriptor
.get("scaling-config-action"):
6682 for scaling_config_action
in scaling_descriptor
[
6683 "scaling-config-action"
6686 scaling_config_action
.get("trigger") == "pre-scale-in"
6687 and scaling_type
== "SCALE_IN"
6689 scaling_config_action
.get("trigger") == "pre-scale-out"
6690 and scaling_type
== "SCALE_OUT"
6692 vnf_config_primitive
= scaling_config_action
[
6693 "vnf-config-primitive-name-ref"
6695 step
= db_nslcmop_update
[
6697 ] = "executing pre-scale scaling-config-action '{}'".format(
6698 vnf_config_primitive
6701 # look for primitive
6702 for config_primitive
in (
6703 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6704 ).get("config-primitive", ()):
6705 if config_primitive
["name"] == vnf_config_primitive
:
6709 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6710 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6711 "primitive".format(scaling_group
, vnf_config_primitive
)
6714 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6715 if db_vnfr
.get("additionalParamsForVnf"):
6716 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6718 scale_process
= "VCA"
6719 db_nsr_update
["config-status"] = "configuring pre-scaling"
6720 primitive_params
= self
._map
_primitive
_params
(
6721 config_primitive
, {}, vnfr_params
6724 # Pre-scale retry check: Check if this sub-operation has been executed before
6725 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6728 vnf_config_primitive
,
6732 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6733 # Skip sub-operation
6734 result
= "COMPLETED"
6735 result_detail
= "Done"
6738 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6739 vnf_config_primitive
, result
, result_detail
6743 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6744 # New sub-operation: Get index of this sub-operation
6746 len(db_nslcmop
.get("_admin", {}).get("operations"))
6751 + "vnf_config_primitive={} New sub-operation".format(
6752 vnf_config_primitive
6756 # retry: Get registered params for this existing sub-operation
6757 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6760 vnf_index
= op
.get("member_vnf_index")
6761 vnf_config_primitive
= op
.get("primitive")
6762 primitive_params
= op
.get("primitive_params")
6765 + "vnf_config_primitive={} Sub-operation retry".format(
6766 vnf_config_primitive
6769 # Execute the primitive, either with new (first-time) or registered (reintent) args
6770 ee_descriptor_id
= config_primitive
.get(
6771 "execution-environment-ref"
6773 primitive_name
= config_primitive
.get(
6774 "execution-environment-primitive", vnf_config_primitive
6776 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6777 nsr_deployed
["VCA"],
6778 member_vnf_index
=vnf_index
,
6780 vdu_count_index
=None,
6781 ee_descriptor_id
=ee_descriptor_id
,
6783 result
, result_detail
= await self
._ns
_execute
_primitive
(
6792 + "vnf_config_primitive={} Done with result {} {}".format(
6793 vnf_config_primitive
, result
, result_detail
6796 # Update operationState = COMPLETED | FAILED
6797 self
._update
_suboperation
_status
(
6798 db_nslcmop
, op_index
, result
, result_detail
6801 if result
== "FAILED":
6802 raise LcmException(result_detail
)
6803 db_nsr_update
["config-status"] = old_config_status
6804 scale_process
= None
6808 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6811 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6814 # SCALE-IN VCA - BEGIN
6815 if vca_scaling_info
:
6816 step
= db_nslcmop_update
[
6818 ] = "Deleting the execution environments"
6819 scale_process
= "VCA"
6820 for vca_info
in vca_scaling_info
:
6821 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6822 member_vnf_index
= str(vca_info
["member-vnf-index"])
6824 logging_text
+ "vdu info: {}".format(vca_info
)
6826 if vca_info
.get("osm_vdu_id"):
6827 vdu_id
= vca_info
["osm_vdu_id"]
6828 vdu_index
= int(vca_info
["vdu_index"])
6831 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6832 member_vnf_index
, vdu_id
, vdu_index
6834 stage
[2] = step
= "Scaling in VCA"
6835 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6836 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6837 config_update
= db_nsr
["configurationStatus"]
6838 for vca_index
, vca
in enumerate(vca_update
):
6840 (vca
or vca
.get("ee_id"))
6841 and vca
["member-vnf-index"] == member_vnf_index
6842 and vca
["vdu_count_index"] == vdu_index
6844 if vca
.get("vdu_id"):
6845 config_descriptor
= get_configuration(
6846 db_vnfd
, vca
.get("vdu_id")
6848 elif vca
.get("kdu_name"):
6849 config_descriptor
= get_configuration(
6850 db_vnfd
, vca
.get("kdu_name")
6853 config_descriptor
= get_configuration(
6854 db_vnfd
, db_vnfd
["id"]
6856 operation_params
= (
6857 db_nslcmop
.get("operationParams") or {}
6859 exec_terminate_primitives
= not operation_params
.get(
6860 "skip_terminate_primitives"
6861 ) and vca
.get("needed_terminate")
6862 task
= asyncio
.ensure_future(
6871 exec_primitives
=exec_terminate_primitives
,
6875 timeout
=self
.timeout
.charm_delete
,
6878 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6881 del vca_update
[vca_index
]
6882 del config_update
[vca_index
]
6883 # wait for pending tasks of terminate primitives
6887 + "Waiting for tasks {}".format(
6888 list(tasks_dict_info
.keys())
6891 error_list
= await self
._wait
_for
_tasks
(
6895 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6900 tasks_dict_info
.clear()
6902 raise LcmException("; ".join(error_list
))
6904 db_vca_and_config_update
= {
6905 "_admin.deployed.VCA": vca_update
,
6906 "configurationStatus": config_update
,
6909 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6911 scale_process
= None
6912 # SCALE-IN VCA - END
6915 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6916 scale_process
= "RO"
6917 if self
.ro_config
.ng
:
6918 await self
._scale
_ng
_ro
(
6919 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6921 scaling_info
.pop("vdu-create", None)
6922 scaling_info
.pop("vdu-delete", None)
6924 scale_process
= None
6928 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6929 scale_process
= "KDU"
6930 await self
._scale
_kdu
(
6931 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6933 scaling_info
.pop("kdu-create", None)
6934 scaling_info
.pop("kdu-delete", None)
6936 scale_process
= None
6940 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6942 # SCALE-UP VCA - BEGIN
6943 if vca_scaling_info
:
6944 step
= db_nslcmop_update
[
6946 ] = "Creating new execution environments"
6947 scale_process
= "VCA"
6948 for vca_info
in vca_scaling_info
:
6949 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6950 member_vnf_index
= str(vca_info
["member-vnf-index"])
6952 logging_text
+ "vdu info: {}".format(vca_info
)
6954 vnfd_id
= db_vnfr
["vnfd-ref"]
6955 if vca_info
.get("osm_vdu_id"):
6956 vdu_index
= int(vca_info
["vdu_index"])
6957 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6958 if db_vnfr
.get("additionalParamsForVnf"):
6959 deploy_params
.update(
6961 db_vnfr
["additionalParamsForVnf"].copy()
6964 descriptor_config
= get_configuration(
6965 db_vnfd
, db_vnfd
["id"]
6967 if descriptor_config
:
6973 logging_text
=logging_text
6974 + "member_vnf_index={} ".format(member_vnf_index
),
6977 nslcmop_id
=nslcmop_id
,
6983 kdu_index
=kdu_index
,
6984 member_vnf_index
=member_vnf_index
,
6985 vdu_index
=vdu_index
,
6987 deploy_params
=deploy_params
,
6988 descriptor_config
=descriptor_config
,
6989 base_folder
=base_folder
,
6990 task_instantiation_info
=tasks_dict_info
,
6993 vdu_id
= vca_info
["osm_vdu_id"]
6994 vdur
= find_in_list(
6995 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6997 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6998 if vdur
.get("additionalParams"):
6999 deploy_params_vdu
= parse_yaml_strings(
7000 vdur
["additionalParams"]
7003 deploy_params_vdu
= deploy_params
7004 deploy_params_vdu
["OSM"] = get_osm_params(
7005 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7007 if descriptor_config
:
7013 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7014 member_vnf_index
, vdu_id
, vdu_index
7016 stage
[2] = step
= "Scaling out VCA"
7017 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7019 logging_text
=logging_text
7020 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7021 member_vnf_index
, vdu_id
, vdu_index
7025 nslcmop_id
=nslcmop_id
,
7031 member_vnf_index
=member_vnf_index
,
7032 vdu_index
=vdu_index
,
7033 kdu_index
=kdu_index
,
7035 deploy_params
=deploy_params_vdu
,
7036 descriptor_config
=descriptor_config
,
7037 base_folder
=base_folder
,
7038 task_instantiation_info
=tasks_dict_info
,
7041 # SCALE-UP VCA - END
7042 scale_process
= None
7045 # execute primitive service POST-SCALING
7046 step
= "Executing post-scale vnf-config-primitive"
7047 if scaling_descriptor
.get("scaling-config-action"):
7048 for scaling_config_action
in scaling_descriptor
[
7049 "scaling-config-action"
7052 scaling_config_action
.get("trigger") == "post-scale-in"
7053 and scaling_type
== "SCALE_IN"
7055 scaling_config_action
.get("trigger") == "post-scale-out"
7056 and scaling_type
== "SCALE_OUT"
7058 vnf_config_primitive
= scaling_config_action
[
7059 "vnf-config-primitive-name-ref"
7061 step
= db_nslcmop_update
[
7063 ] = "executing post-scale scaling-config-action '{}'".format(
7064 vnf_config_primitive
7067 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7068 if db_vnfr
.get("additionalParamsForVnf"):
7069 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7071 # look for primitive
7072 for config_primitive
in (
7073 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7074 ).get("config-primitive", ()):
7075 if config_primitive
["name"] == vnf_config_primitive
:
7079 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7080 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7081 "config-primitive".format(
7082 scaling_group
, vnf_config_primitive
7085 scale_process
= "VCA"
7086 db_nsr_update
["config-status"] = "configuring post-scaling"
7087 primitive_params
= self
._map
_primitive
_params
(
7088 config_primitive
, {}, vnfr_params
7091 # Post-scale retry check: Check if this sub-operation has been executed before
7092 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7095 vnf_config_primitive
,
7099 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7100 # Skip sub-operation
7101 result
= "COMPLETED"
7102 result_detail
= "Done"
7105 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7106 vnf_config_primitive
, result
, result_detail
7110 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7111 # New sub-operation: Get index of this sub-operation
7113 len(db_nslcmop
.get("_admin", {}).get("operations"))
7118 + "vnf_config_primitive={} New sub-operation".format(
7119 vnf_config_primitive
7123 # retry: Get registered params for this existing sub-operation
7124 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7127 vnf_index
= op
.get("member_vnf_index")
7128 vnf_config_primitive
= op
.get("primitive")
7129 primitive_params
= op
.get("primitive_params")
7132 + "vnf_config_primitive={} Sub-operation retry".format(
7133 vnf_config_primitive
7136 # Execute the primitive, either with new (first-time) or registered (reintent) args
7137 ee_descriptor_id
= config_primitive
.get(
7138 "execution-environment-ref"
7140 primitive_name
= config_primitive
.get(
7141 "execution-environment-primitive", vnf_config_primitive
7143 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7144 nsr_deployed
["VCA"],
7145 member_vnf_index
=vnf_index
,
7147 vdu_count_index
=None,
7148 ee_descriptor_id
=ee_descriptor_id
,
7150 result
, result_detail
= await self
._ns
_execute
_primitive
(
7159 + "vnf_config_primitive={} Done with result {} {}".format(
7160 vnf_config_primitive
, result
, result_detail
7163 # Update operationState = COMPLETED | FAILED
7164 self
._update
_suboperation
_status
(
7165 db_nslcmop
, op_index
, result
, result_detail
7168 if result
== "FAILED":
7169 raise LcmException(result_detail
)
7170 db_nsr_update
["config-status"] = old_config_status
7171 scale_process
= None
7176 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7177 db_nsr_update
["operational-status"] = (
7179 if old_operational_status
== "failed"
7180 else old_operational_status
7182 db_nsr_update
["config-status"] = old_config_status
7185 ROclient
.ROClientException
,
7190 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7192 except asyncio
.CancelledError
:
7194 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7196 exc
= "Operation was cancelled"
7197 except Exception as e
:
7198 exc
= traceback
.format_exc()
7199 self
.logger
.critical(
7200 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7204 self
._write
_ns
_status
(
7207 current_operation
="IDLE",
7208 current_operation_id
=None,
7211 stage
[1] = "Waiting for instantiate pending tasks."
7212 self
.logger
.debug(logging_text
+ stage
[1])
7213 exc
= await self
._wait
_for
_tasks
(
7216 self
.timeout
.ns_deploy
,
7224 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7225 nslcmop_operation_state
= "FAILED"
7227 db_nsr_update
["operational-status"] = old_operational_status
7228 db_nsr_update
["config-status"] = old_config_status
7229 db_nsr_update
["detailed-status"] = ""
7231 if "VCA" in scale_process
:
7232 db_nsr_update
["config-status"] = "failed"
7233 if "RO" in scale_process
:
7234 db_nsr_update
["operational-status"] = "failed"
7237 ] = "FAILED scaling nslcmop={} {}: {}".format(
7238 nslcmop_id
, step
, exc
7241 error_description_nslcmop
= None
7242 nslcmop_operation_state
= "COMPLETED"
7243 db_nslcmop_update
["detailed-status"] = "Done"
7245 self
._write
_op
_status
(
7248 error_message
=error_description_nslcmop
,
7249 operation_state
=nslcmop_operation_state
,
7250 other_update
=db_nslcmop_update
,
7253 self
._write
_ns
_status
(
7256 current_operation
="IDLE",
7257 current_operation_id
=None,
7258 other_update
=db_nsr_update
,
7261 if nslcmop_operation_state
:
7265 "nslcmop_id": nslcmop_id
,
7266 "operationState": nslcmop_operation_state
,
7268 await self
.msg
.aiowrite("ns", "scaled", msg
)
7269 except Exception as e
:
7271 logging_text
+ "kafka_write notification Exception {}".format(e
)
7273 self
.logger
.debug(logging_text
+ "Exit")
7274 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7276 async def _scale_kdu(
7277 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7279 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7280 for kdu_name
in _scaling_info
:
7281 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7282 deployed_kdu
, index
= get_deployed_kdu(
7283 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7285 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7286 kdu_instance
= deployed_kdu
["kdu-instance"]
7287 kdu_model
= deployed_kdu
.get("kdu-model")
7288 scale
= int(kdu_scaling_info
["scale"])
7289 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7292 "collection": "nsrs",
7293 "filter": {"_id": nsr_id
},
7294 "path": "_admin.deployed.K8s.{}".format(index
),
7297 step
= "scaling application {}".format(
7298 kdu_scaling_info
["resource-name"]
7300 self
.logger
.debug(logging_text
+ step
)
7302 if kdu_scaling_info
["type"] == "delete":
7303 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7306 and kdu_config
.get("terminate-config-primitive")
7307 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7309 terminate_config_primitive_list
= kdu_config
.get(
7310 "terminate-config-primitive"
7312 terminate_config_primitive_list
.sort(
7313 key
=lambda val
: int(val
["seq"])
7317 terminate_config_primitive
7318 ) in terminate_config_primitive_list
:
7319 primitive_params_
= self
._map
_primitive
_params
(
7320 terminate_config_primitive
, {}, {}
7322 step
= "execute terminate config primitive"
7323 self
.logger
.debug(logging_text
+ step
)
7324 await asyncio
.wait_for(
7325 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7326 cluster_uuid
=cluster_uuid
,
7327 kdu_instance
=kdu_instance
,
7328 primitive_name
=terminate_config_primitive
["name"],
7329 params
=primitive_params_
,
7331 total_timeout
=self
.timeout
.primitive
,
7334 timeout
=self
.timeout
.primitive
7335 * self
.timeout
.primitive_outer_factor
,
7338 await asyncio
.wait_for(
7339 self
.k8scluster_map
[k8s_cluster_type
].scale(
7340 kdu_instance
=kdu_instance
,
7342 resource_name
=kdu_scaling_info
["resource-name"],
7343 total_timeout
=self
.timeout
.scale_on_error
,
7345 cluster_uuid
=cluster_uuid
,
7346 kdu_model
=kdu_model
,
7350 timeout
=self
.timeout
.scale_on_error
7351 * self
.timeout
.scale_on_error_outer_factor
,
7354 if kdu_scaling_info
["type"] == "create":
7355 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7358 and kdu_config
.get("initial-config-primitive")
7359 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7361 initial_config_primitive_list
= kdu_config
.get(
7362 "initial-config-primitive"
7364 initial_config_primitive_list
.sort(
7365 key
=lambda val
: int(val
["seq"])
7368 for initial_config_primitive
in initial_config_primitive_list
:
7369 primitive_params_
= self
._map
_primitive
_params
(
7370 initial_config_primitive
, {}, {}
7372 step
= "execute initial config primitive"
7373 self
.logger
.debug(logging_text
+ step
)
7374 await asyncio
.wait_for(
7375 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7376 cluster_uuid
=cluster_uuid
,
7377 kdu_instance
=kdu_instance
,
7378 primitive_name
=initial_config_primitive
["name"],
7379 params
=primitive_params_
,
7386 async def _scale_ng_ro(
7387 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7389 nsr_id
= db_nslcmop
["nsInstanceId"]
7390 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7393 # read from db: vnfd's for every vnf
7396 # for each vnf in ns, read vnfd
7397 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7398 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7399 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7400 # if we haven't this vnfd, read it from db
7401 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7403 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7404 db_vnfds
.append(vnfd
)
7405 n2vc_key
= self
.n2vc
.get_public_key()
7406 n2vc_key_list
= [n2vc_key
]
7409 vdu_scaling_info
.get("vdu-create"),
7410 vdu_scaling_info
.get("vdu-delete"),
7413 # db_vnfr has been updated, update db_vnfrs to use it
7414 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7415 await self
._instantiate
_ng
_ro
(
7425 start_deploy
=time(),
7426 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7428 if vdu_scaling_info
.get("vdu-delete"):
7430 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7433 async def extract_prometheus_scrape_jobs(
7437 ee_config_descriptor
: dict,
7442 vnf_member_index
: str = "",
7444 vdu_index
: int = None,
7446 kdu_index
: int = None,
7448 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7449 This method will wait until the corresponding VDU or KDU is fully instantiated
7452 ee_id (str): Execution Environment ID
7453 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7454 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7455 vnfr_id (str): VNFR ID where this EE applies
7456 nsr_id (str): NSR ID where this EE applies
7457 target_ip (str): VDU/KDU instance IP address
7458 element_type (str): NS or VNF or VDU or KDU
7459 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7460 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7461 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7462 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7463 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7466 LcmException: When the VDU or KDU instance was not found in an hour
7469 _type_: Prometheus jobs
7471 # default the vdur and kdur names to an empty string, to avoid any later
7472 # problem with Prometheus when the element type is not VDU or KDU
7476 # look if exist a file called 'prometheus*.j2' and
7477 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7481 for f
in artifact_content
7482 if f
.startswith("prometheus") and f
.endswith(".j2")
7488 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7491 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7492 if element_type
in ("VDU", "KDU"):
7493 for _
in range(360):
7494 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7495 if vdu_id
and vdu_index
is not None:
7499 for x
in get_iterable(db_vnfr
, "vdur")
7501 x
.get("vdu-id-ref") == vdu_id
7502 and x
.get("count-index") == vdu_index
7507 if vdur
.get("name"):
7508 vdur_name
= vdur
.get("name")
7510 if kdu_name
and kdu_index
is not None:
7514 for x
in get_iterable(db_vnfr
, "kdur")
7516 x
.get("kdu-name") == kdu_name
7517 and x
.get("count-index") == kdu_index
7522 if kdur
.get("name"):
7523 kdur_name
= kdur
.get("name")
7526 await asyncio
.sleep(10)
7528 if vdu_id
and vdu_index
is not None:
7530 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7532 if kdu_name
and kdu_index
is not None:
7534 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7538 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7539 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7541 vnfr_id
= vnfr_id
.replace("-", "")
7543 "JOB_NAME": vnfr_id
,
7544 "TARGET_IP": target_ip
,
7545 "EXPORTER_POD_IP": host_name
,
7546 "EXPORTER_POD_PORT": host_port
,
7548 "VNF_MEMBER_INDEX": vnf_member_index
,
7549 "VDUR_NAME": vdur_name
,
7550 "KDUR_NAME": kdur_name
,
7551 "ELEMENT_TYPE": element_type
,
7553 job_list
= parse_job(job_data
, variables
)
7554 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7555 for job
in job_list
:
7557 not isinstance(job
.get("job_name"), str)
7558 or vnfr_id
not in job
["job_name"]
7560 job
["job_name"] = vnfr_id
+ "_" + str(SystemRandom().randint(1, 10000))
7561 job
["nsr_id"] = nsr_id
7562 job
["vnfr_id"] = vnfr_id
7565 async def rebuild_start_stop(
7566 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7568 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7569 self
.logger
.info(logging_text
+ "Enter")
7570 stage
= ["Preparing the environment", ""]
7571 # database nsrs record
7575 # in case of error, indicates what part of scale was failed to put nsr at error status
7576 start_deploy
= time()
7578 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7579 vim_account_id
= db_vnfr
.get("vim-account-id")
7580 vim_info_key
= "vim:" + vim_account_id
7581 vdu_id
= additional_param
["vdu_id"]
7582 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7583 vdur
= find_in_list(
7584 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7587 vdu_vim_name
= vdur
["name"]
7588 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7589 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7591 raise LcmException("Target vdu is not found")
7592 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7593 # wait for any previous tasks in process
7594 stage
[1] = "Waiting for previous operations to terminate"
7595 self
.logger
.info(stage
[1])
7596 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7598 stage
[1] = "Reading from database."
7599 self
.logger
.info(stage
[1])
7600 self
._write
_ns
_status
(
7603 current_operation
=operation_type
.upper(),
7604 current_operation_id
=nslcmop_id
,
7606 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7609 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7610 db_nsr_update
["operational-status"] = operation_type
7611 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7615 "vim_vm_id": vim_vm_id
,
7617 "vdu_index": additional_param
["count-index"],
7618 "vdu_id": vdur
["id"],
7619 "target_vim": target_vim
,
7620 "vim_account_id": vim_account_id
,
7623 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7624 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7625 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7626 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7627 self
.logger
.info("response from RO: {}".format(result_dict
))
7628 action_id
= result_dict
["action_id"]
7629 await self
._wait
_ng
_ro
(
7634 self
.timeout
.operate
,
7636 "start_stop_rebuild",
7638 return "COMPLETED", "Done"
7639 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7640 self
.logger
.error("Exit Exception {}".format(e
))
7642 except asyncio
.CancelledError
:
7643 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7644 exc
= "Operation was cancelled"
7645 except Exception as e
:
7646 exc
= traceback
.format_exc()
7647 self
.logger
.critical(
7648 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7650 return "FAILED", "Error in operate VNF {}".format(exc
)
7652 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7654 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7656 :param: vim_account_id: VIM Account ID
7658 :return: (cloud_name, cloud_credential)
7660 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7661 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7663 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7665 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7667 :param: vim_account_id: VIM Account ID
7669 :return: (cloud_name, cloud_credential)
7671 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7672 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7674 async def migrate(self
, nsr_id
, nslcmop_id
):
7676 Migrate VNFs and VDUs instances in a NS
7678 :param: nsr_id: NS Instance ID
7679 :param: nslcmop_id: nslcmop ID of migrate
7682 # Try to lock HA task here
7683 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7684 if not task_is_locked_by_me
:
7686 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7687 self
.logger
.debug(logging_text
+ "Enter")
7688 # get all needed from database
7690 db_nslcmop_update
= {}
7691 nslcmop_operation_state
= None
7695 # in case of error, indicates what part of scale was failed to put nsr at error status
7696 start_deploy
= time()
7699 # wait for any previous tasks in process
7700 step
= "Waiting for previous operations to terminate"
7701 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7703 self
._write
_ns
_status
(
7706 current_operation
="MIGRATING",
7707 current_operation_id
=nslcmop_id
,
7709 step
= "Getting nslcmop from database"
7711 step
+ " after having waited for previous tasks to be completed"
7713 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7714 migrate_params
= db_nslcmop
.get("operationParams")
7717 target
.update(migrate_params
)
7718 desc
= await self
.RO
.migrate(nsr_id
, target
)
7719 self
.logger
.debug("RO return > {}".format(desc
))
7720 action_id
= desc
["action_id"]
7721 await self
._wait
_ng
_ro
(
7726 self
.timeout
.migrate
,
7727 operation
="migrate",
7729 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7730 self
.logger
.error("Exit Exception {}".format(e
))
7732 except asyncio
.CancelledError
:
7733 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7734 exc
= "Operation was cancelled"
7735 except Exception as e
:
7736 exc
= traceback
.format_exc()
7737 self
.logger
.critical(
7738 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7741 self
._write
_ns
_status
(
7744 current_operation
="IDLE",
7745 current_operation_id
=None,
7748 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7749 nslcmop_operation_state
= "FAILED"
7751 nslcmop_operation_state
= "COMPLETED"
7752 db_nslcmop_update
["detailed-status"] = "Done"
7753 db_nsr_update
["detailed-status"] = "Done"
7755 self
._write
_op
_status
(
7759 operation_state
=nslcmop_operation_state
,
7760 other_update
=db_nslcmop_update
,
7762 if nslcmop_operation_state
:
7766 "nslcmop_id": nslcmop_id
,
7767 "operationState": nslcmop_operation_state
,
7769 await self
.msg
.aiowrite("ns", "migrated", msg
)
7770 except Exception as e
:
7772 logging_text
+ "kafka_write notification Exception {}".format(e
)
7774 self
.logger
.debug(logging_text
+ "Exit")
7775 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7777 async def heal(self
, nsr_id
, nslcmop_id
):
7781 :param nsr_id: ns instance to heal
7782 :param nslcmop_id: operation to run
7786 # Try to lock HA task here
7787 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7788 if not task_is_locked_by_me
:
7791 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7792 stage
= ["", "", ""]
7793 tasks_dict_info
= {}
7794 # ^ stage, step, VIM progress
7795 self
.logger
.debug(logging_text
+ "Enter")
7796 # get all needed from database
7798 db_nslcmop_update
= {}
7800 db_vnfrs
= {} # vnf's info indexed by _id
7802 old_operational_status
= ""
7803 old_config_status
= ""
7806 # wait for any previous tasks in process
7807 step
= "Waiting for previous operations to terminate"
7808 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7809 self
._write
_ns
_status
(
7812 current_operation
="HEALING",
7813 current_operation_id
=nslcmop_id
,
7816 step
= "Getting nslcmop from database"
7818 step
+ " after having waited for previous tasks to be completed"
7820 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7822 step
= "Getting nsr from database"
7823 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7824 old_operational_status
= db_nsr
["operational-status"]
7825 old_config_status
= db_nsr
["config-status"]
7828 "_admin.deployed.RO.operational-status": "healing",
7830 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7832 step
= "Sending heal order to VIM"
7834 logging_text
=logging_text
,
7836 db_nslcmop
=db_nslcmop
,
7841 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7842 self
.logger
.debug(logging_text
+ stage
[1])
7843 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7844 self
.fs
.sync(db_nsr
["nsd-id"])
7846 # read from db: vnfr's of this ns
7847 step
= "Getting vnfrs from db"
7848 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7849 for vnfr
in db_vnfrs_list
:
7850 db_vnfrs
[vnfr
["_id"]] = vnfr
7851 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7853 # Check for each target VNF
7854 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7855 for target_vnf
in target_list
:
7856 # Find this VNF in the list from DB
7857 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7859 db_vnfr
= db_vnfrs
[vnfr_id
]
7860 vnfd_id
= db_vnfr
.get("vnfd-id")
7861 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7862 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7863 base_folder
= vnfd
["_admin"]["storage"]
7868 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7869 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7871 # Check each target VDU and deploy N2VC
7872 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7875 if not target_vdu_list
:
7876 # Codigo nuevo para crear diccionario
7877 target_vdu_list
= []
7878 for existing_vdu
in db_vnfr
.get("vdur"):
7879 vdu_name
= existing_vdu
.get("vdu-name", None)
7880 vdu_index
= existing_vdu
.get("count-index", 0)
7881 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7884 vdu_to_be_healed
= {
7886 "count-index": vdu_index
,
7887 "run-day1": vdu_run_day1
,
7889 target_vdu_list
.append(vdu_to_be_healed
)
7890 for target_vdu
in target_vdu_list
:
7891 deploy_params_vdu
= target_vdu
7892 # Set run-day1 vnf level value if not vdu level value exists
7893 if not deploy_params_vdu
.get("run-day1") and target_vnf
.get(
7894 "additionalParams", {}
7896 deploy_params_vdu
["run-day1"] = target_vnf
[
7899 vdu_name
= target_vdu
.get("vdu-id", None)
7900 # TODO: Get vdu_id from vdud.
7902 # For multi instance VDU count-index is mandatory
7903 # For single session VDU count-indes is 0
7904 vdu_index
= target_vdu
.get("count-index", 0)
7906 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7907 stage
[1] = "Deploying Execution Environments."
7908 self
.logger
.debug(logging_text
+ stage
[1])
7910 # VNF Level charm. Normal case when proxy charms.
7911 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7912 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7913 if descriptor_config
:
7914 # Continue if healed machine is management machine
7915 vnf_ip_address
= db_vnfr
.get("ip-address")
7916 target_instance
= None
7917 for instance
in db_vnfr
.get("vdur", None):
7919 instance
["vdu-name"] == vdu_name
7920 and instance
["count-index"] == vdu_index
7922 target_instance
= instance
7924 if vnf_ip_address
== target_instance
.get("ip-address"):
7926 logging_text
=logging_text
7927 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7928 member_vnf_index
, vdu_name
, vdu_index
7932 nslcmop_id
=nslcmop_id
,
7938 member_vnf_index
=member_vnf_index
,
7941 deploy_params
=deploy_params_vdu
,
7942 descriptor_config
=descriptor_config
,
7943 base_folder
=base_folder
,
7944 task_instantiation_info
=tasks_dict_info
,
7948 # VDU Level charm. Normal case with native charms.
7949 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7950 if descriptor_config
:
7952 logging_text
=logging_text
7953 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7954 member_vnf_index
, vdu_name
, vdu_index
7958 nslcmop_id
=nslcmop_id
,
7964 member_vnf_index
=member_vnf_index
,
7965 vdu_index
=vdu_index
,
7967 deploy_params
=deploy_params_vdu
,
7968 descriptor_config
=descriptor_config
,
7969 base_folder
=base_folder
,
7970 task_instantiation_info
=tasks_dict_info
,
7975 ROclient
.ROClientException
,
7980 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7982 except asyncio
.CancelledError
:
7984 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7986 exc
= "Operation was cancelled"
7987 except Exception as e
:
7988 exc
= traceback
.format_exc()
7989 self
.logger
.critical(
7990 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7995 stage
[1] = "Waiting for healing pending tasks."
7996 self
.logger
.debug(logging_text
+ stage
[1])
7997 exc
= await self
._wait
_for
_tasks
(
8000 self
.timeout
.ns_deploy
,
8008 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
8009 nslcmop_operation_state
= "FAILED"
8011 db_nsr_update
["operational-status"] = old_operational_status
8012 db_nsr_update
["config-status"] = old_config_status
8015 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
8016 for task
, task_name
in tasks_dict_info
.items():
8017 if not task
.done() or task
.cancelled() or task
.exception():
8018 if task_name
.startswith(self
.task_name_deploy_vca
):
8019 # A N2VC task is pending
8020 db_nsr_update
["config-status"] = "failed"
8022 # RO task is pending
8023 db_nsr_update
["operational-status"] = "failed"
8025 error_description_nslcmop
= None
8026 nslcmop_operation_state
= "COMPLETED"
8027 db_nslcmop_update
["detailed-status"] = "Done"
8028 db_nsr_update
["detailed-status"] = "Done"
8029 db_nsr_update
["operational-status"] = "running"
8030 db_nsr_update
["config-status"] = "configured"
8032 self
._write
_op
_status
(
8035 error_message
=error_description_nslcmop
,
8036 operation_state
=nslcmop_operation_state
,
8037 other_update
=db_nslcmop_update
,
8040 self
._write
_ns
_status
(
8043 current_operation
="IDLE",
8044 current_operation_id
=None,
8045 other_update
=db_nsr_update
,
8048 if nslcmop_operation_state
:
8052 "nslcmop_id": nslcmop_id
,
8053 "operationState": nslcmop_operation_state
,
8055 await self
.msg
.aiowrite("ns", "healed", msg
)
8056 except Exception as e
:
8058 logging_text
+ "kafka_write notification Exception {}".format(e
)
8060 self
.logger
.debug(logging_text
+ "Exit")
8061 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8072 :param logging_text: preffix text to use at logging
8073 :param nsr_id: nsr identity
8074 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8075 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8076 :return: None or exception
8079 def get_vim_account(vim_account_id
):
8081 if vim_account_id
in db_vims
:
8082 return db_vims
[vim_account_id
]
8083 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8084 db_vims
[vim_account_id
] = db_vim
8089 ns_params
= db_nslcmop
.get("operationParams")
8090 if ns_params
and ns_params
.get("timeout_ns_heal"):
8091 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8093 timeout_ns_heal
= self
.timeout
.ns_heal
8097 nslcmop_id
= db_nslcmop
["_id"]
8099 "action_id": nslcmop_id
,
8101 self
.logger
.warning(
8102 "db_nslcmop={} and timeout_ns_heal={}".format(
8103 db_nslcmop
, timeout_ns_heal
8106 target
.update(db_nslcmop
.get("operationParams", {}))
8108 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8109 desc
= await self
.RO
.recreate(nsr_id
, target
)
8110 self
.logger
.debug("RO return > {}".format(desc
))
8111 action_id
= desc
["action_id"]
8112 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8113 await self
._wait
_ng
_ro
(
8120 operation
="healing",
8125 "_admin.deployed.RO.operational-status": "running",
8126 "detailed-status": " ".join(stage
),
8128 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8129 self
._write
_op
_status
(nslcmop_id
, stage
)
8131 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8134 except Exception as e
:
8135 stage
[2] = "ERROR healing at VIM"
8136 # self.set_vnfr_at_error(db_vnfrs, str(e))
8138 "Error healing at VIM {}".format(e
),
8139 exc_info
=not isinstance(
8142 ROclient
.ROClientException
,
8168 task_instantiation_info
,
8171 # launch instantiate_N2VC in a asyncio task and register task object
8172 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8173 # if not found, create one entry and update database
8174 # fill db_nsr._admin.deployed.VCA.<index>
8177 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8181 get_charm_name
= False
8182 if "execution-environment-list" in descriptor_config
:
8183 ee_list
= descriptor_config
.get("execution-environment-list", [])
8184 elif "juju" in descriptor_config
:
8185 ee_list
= [descriptor_config
] # ns charms
8186 if "execution-environment-list" not in descriptor_config
:
8187 # charm name is only required for ns charms
8188 get_charm_name
= True
8189 else: # other types as script are not supported
8192 for ee_item
in ee_list
:
8195 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8196 ee_item
.get("juju"), ee_item
.get("helm-chart")
8199 ee_descriptor_id
= ee_item
.get("id")
8200 if ee_item
.get("juju"):
8201 vca_name
= ee_item
["juju"].get("charm")
8203 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8206 if ee_item
["juju"].get("charm") is not None
8209 if ee_item
["juju"].get("cloud") == "k8s":
8210 vca_type
= "k8s_proxy_charm"
8211 elif ee_item
["juju"].get("proxy") is False:
8212 vca_type
= "native_charm"
8213 elif ee_item
.get("helm-chart"):
8214 vca_name
= ee_item
["helm-chart"]
8215 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8218 vca_type
= "helm-v3"
8221 logging_text
+ "skipping non juju neither charm configuration"
8226 for vca_index
, vca_deployed
in enumerate(
8227 db_nsr
["_admin"]["deployed"]["VCA"]
8229 if not vca_deployed
:
8232 vca_deployed
.get("member-vnf-index") == member_vnf_index
8233 and vca_deployed
.get("vdu_id") == vdu_id
8234 and vca_deployed
.get("kdu_name") == kdu_name
8235 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8236 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8240 # not found, create one.
8242 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8245 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8247 target
+= "/kdu/{}".format(kdu_name
)
8249 "target_element": target
,
8250 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8251 "member-vnf-index": member_vnf_index
,
8253 "kdu_name": kdu_name
,
8254 "vdu_count_index": vdu_index
,
8255 "operational-status": "init", # TODO revise
8256 "detailed-status": "", # TODO revise
8257 "step": "initial-deploy", # TODO revise
8259 "vdu_name": vdu_name
,
8261 "ee_descriptor_id": ee_descriptor_id
,
8262 "charm_name": charm_name
,
8266 # create VCA and configurationStatus in db
8268 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8269 "configurationStatus.{}".format(vca_index
): dict(),
8271 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8273 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8275 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8276 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8277 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8280 task_n2vc
= asyncio
.ensure_future(
8282 logging_text
=logging_text
,
8283 vca_index
=vca_index
,
8289 vdu_index
=vdu_index
,
8290 deploy_params
=deploy_params
,
8291 config_descriptor
=descriptor_config
,
8292 base_folder
=base_folder
,
8293 nslcmop_id
=nslcmop_id
,
8297 ee_config_descriptor
=ee_item
,
8300 self
.lcm_tasks
.register(
8304 "instantiate_N2VC-{}".format(vca_index
),
8307 task_instantiation_info
[
8309 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8310 member_vnf_index
or "", vdu_id
or ""
8313 async def heal_N2VC(
8330 ee_config_descriptor
,
8332 nsr_id
= db_nsr
["_id"]
8333 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8334 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8335 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8336 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8338 "collection": "nsrs",
8339 "filter": {"_id": nsr_id
},
8340 "path": db_update_entry
,
8345 element_under_configuration
= nsr_id
8349 vnfr_id
= db_vnfr
["_id"]
8350 osm_config
["osm"]["vnf_id"] = vnfr_id
8352 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8354 if vca_type
== "native_charm":
8357 index_number
= vdu_index
or 0
8360 element_type
= "VNF"
8361 element_under_configuration
= vnfr_id
8362 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8364 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8365 element_type
= "VDU"
8366 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8367 osm_config
["osm"]["vdu_id"] = vdu_id
8369 namespace
+= ".{}".format(kdu_name
)
8370 element_type
= "KDU"
8371 element_under_configuration
= kdu_name
8372 osm_config
["osm"]["kdu_name"] = kdu_name
8375 if base_folder
["pkg-dir"]:
8376 artifact_path
= "{}/{}/{}/{}".format(
8377 base_folder
["folder"],
8378 base_folder
["pkg-dir"],
8381 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8386 artifact_path
= "{}/Scripts/{}/{}/".format(
8387 base_folder
["folder"],
8390 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8395 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8397 # get initial_config_primitive_list that applies to this element
8398 initial_config_primitive_list
= config_descriptor
.get(
8399 "initial-config-primitive"
8403 "Initial config primitive list > {}".format(
8404 initial_config_primitive_list
8408 # add config if not present for NS charm
8409 ee_descriptor_id
= ee_config_descriptor
.get("id")
8410 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8411 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8412 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8416 "Initial config primitive list #2 > {}".format(
8417 initial_config_primitive_list
8420 # n2vc_redesign STEP 3.1
8421 # find old ee_id if exists
8422 ee_id
= vca_deployed
.get("ee_id")
8424 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8425 # create or register execution environment in VCA. Only for native charms when healing
8426 if vca_type
== "native_charm":
8427 step
= "Waiting to VM being up and getting IP address"
8428 self
.logger
.debug(logging_text
+ step
)
8429 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8438 credentials
= {"hostname": rw_mgmt_ip
}
8440 username
= deep_get(
8441 config_descriptor
, ("config-access", "ssh-access", "default-user")
8443 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8444 # merged. Meanwhile let's get username from initial-config-primitive
8445 if not username
and initial_config_primitive_list
:
8446 for config_primitive
in initial_config_primitive_list
:
8447 for param
in config_primitive
.get("parameter", ()):
8448 if param
["name"] == "ssh-username":
8449 username
= param
["value"]
8453 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8454 "'config-access.ssh-access.default-user'"
8456 credentials
["username"] = username
8458 # n2vc_redesign STEP 3.2
8459 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8460 self
._write
_configuration
_status
(
8462 vca_index
=vca_index
,
8463 status
="REGISTERING",
8464 element_under_configuration
=element_under_configuration
,
8465 element_type
=element_type
,
8468 step
= "register execution environment {}".format(credentials
)
8469 self
.logger
.debug(logging_text
+ step
)
8470 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8471 credentials
=credentials
,
8472 namespace
=namespace
,
8477 # update ee_id en db
8479 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8481 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8483 # for compatibility with MON/POL modules, the need model and application name at database
8484 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8485 # Not sure if this need to be done when healing
8487 ee_id_parts = ee_id.split(".")
8488 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8489 if len(ee_id_parts) >= 2:
8490 model_name = ee_id_parts[0]
8491 application_name = ee_id_parts[1]
8492 db_nsr_update[db_update_entry + "model"] = model_name
8493 db_nsr_update[db_update_entry + "application"] = application_name
8496 # n2vc_redesign STEP 3.3
8497 # Install configuration software. Only for native charms.
8498 step
= "Install configuration Software"
8500 self
._write
_configuration
_status
(
8502 vca_index
=vca_index
,
8503 status
="INSTALLING SW",
8504 element_under_configuration
=element_under_configuration
,
8505 element_type
=element_type
,
8506 # other_update=db_nsr_update,
8510 # TODO check if already done
8511 self
.logger
.debug(logging_text
+ step
)
8513 if vca_type
== "native_charm":
8514 config_primitive
= next(
8515 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8518 if config_primitive
:
8519 config
= self
._map
_primitive
_params
(
8520 config_primitive
, {}, deploy_params
8522 await self
.vca_map
[vca_type
].install_configuration_sw(
8524 artifact_path
=artifact_path
,
8532 # write in db flag of configuration_sw already installed
8534 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8537 # Not sure if this need to be done when healing
8539 # add relations for this VCA (wait for other peers related with this VCA)
8540 await self._add_vca_relations(
8541 logging_text=logging_text,
8544 vca_index=vca_index,
8548 # if SSH access is required, then get execution environment SSH public
8549 # if native charm we have waited already to VM be UP
8550 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8553 # self.logger.debug("get ssh key block")
8555 config_descriptor
, ("config-access", "ssh-access", "required")
8557 # self.logger.debug("ssh key needed")
8558 # Needed to inject a ssh key
8561 ("config-access", "ssh-access", "default-user"),
8563 step
= "Install configuration Software, getting public ssh key"
8564 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8565 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8568 step
= "Insert public key into VM user={} ssh_key={}".format(
8572 # self.logger.debug("no need to get ssh key")
8573 step
= "Waiting to VM being up and getting IP address"
8574 self
.logger
.debug(logging_text
+ step
)
8576 # n2vc_redesign STEP 5.1
8577 # wait for RO (ip-address) Insert pub_key into VM
8578 # IMPORTANT: We need do wait for RO to complete healing operation.
8579 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8582 rw_mgmt_ip
= await self
.wait_kdu_up(
8583 logging_text
, nsr_id
, vnfr_id
, kdu_name
8586 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8596 rw_mgmt_ip
= None # This is for a NS configuration
8598 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8600 # store rw_mgmt_ip in deploy params for later replacement
8601 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8604 # get run-day1 operation parameter
8605 runDay1
= deploy_params
.get("run-day1", False)
8607 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8610 # n2vc_redesign STEP 6 Execute initial config primitive
8611 step
= "execute initial config primitive"
8613 # wait for dependent primitives execution (NS -> VNF -> VDU)
8614 if initial_config_primitive_list
:
8615 await self
._wait
_dependent
_n
2vc
(
8616 nsr_id
, vca_deployed_list
, vca_index
8619 # stage, in function of element type: vdu, kdu, vnf or ns
8620 my_vca
= vca_deployed_list
[vca_index
]
8621 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8623 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8624 elif my_vca
.get("member-vnf-index"):
8626 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8629 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8631 self
._write
_configuration
_status
(
8632 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8635 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8637 check_if_terminated_needed
= True
8638 for initial_config_primitive
in initial_config_primitive_list
:
8639 # adding information on the vca_deployed if it is a NS execution environment
8640 if not vca_deployed
["member-vnf-index"]:
8641 deploy_params
["ns_config_info"] = json
.dumps(
8642 self
._get
_ns
_config
_info
(nsr_id
)
8644 # TODO check if already done
8645 primitive_params_
= self
._map
_primitive
_params
(
8646 initial_config_primitive
, {}, deploy_params
8649 step
= "execute primitive '{}' params '{}'".format(
8650 initial_config_primitive
["name"], primitive_params_
8652 self
.logger
.debug(logging_text
+ step
)
8653 await self
.vca_map
[vca_type
].exec_primitive(
8655 primitive_name
=initial_config_primitive
["name"],
8656 params_dict
=primitive_params_
,
8661 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8662 if check_if_terminated_needed
:
8663 if config_descriptor
.get("terminate-config-primitive"):
8667 {db_update_entry
+ "needed_terminate": True},
8669 check_if_terminated_needed
= False
8671 # TODO register in database that primitive is done
8673 # STEP 7 Configure metrics
8674 # Not sure if this need to be done when healing
8676 if vca_type == "helm" or vca_type == "helm-v3":
8677 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8679 artifact_path=artifact_path,
8680 ee_config_descriptor=ee_config_descriptor,
8683 target_ip=rw_mgmt_ip,
8689 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8692 for job in prometheus_jobs:
8695 {"job_name": job["job_name"]},
8698 fail_on_empty=False,
8702 step
= "instantiated at VCA"
8703 self
.logger
.debug(logging_text
+ step
)
8705 self
._write
_configuration
_status
(
8706 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8709 except Exception as e
: # TODO not use Exception but N2VC exception
8710 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8712 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8715 "Exception while {} : {}".format(step
, e
), exc_info
=True
8717 self
._write
_configuration
_status
(
8718 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8720 raise LcmException("{} {}".format(step
, e
)) from e
8722 async def _wait_heal_ro(
8728 while time() <= start_time
+ timeout
:
8729 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8730 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8731 "operational-status"
8733 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8734 if operational_status_ro
!= "healing":
8736 await asyncio
.sleep(15)
8737 else: # timeout_ns_deploy
8738 raise NgRoException("Timeout waiting ns to deploy")
8740 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8742 Vertical Scale the VDUs in a NS
8744 :param: nsr_id: NS Instance ID
8745 :param: nslcmop_id: nslcmop ID of migrate
8748 # Try to lock HA task here
8749 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8750 if not task_is_locked_by_me
:
8752 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8753 self
.logger
.debug(logging_text
+ "Enter")
8754 # get all needed from database
8756 db_nslcmop_update
= {}
8757 nslcmop_operation_state
= None
8761 # in case of error, indicates what part of scale was failed to put nsr at error status
8762 start_deploy
= time()
8765 # wait for any previous tasks in process
8766 step
= "Waiting for previous operations to terminate"
8767 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8769 self
._write
_ns
_status
(
8772 current_operation
="VerticalScale",
8773 current_operation_id
=nslcmop_id
,
8775 step
= "Getting nslcmop from database"
8777 step
+ " after having waited for previous tasks to be completed"
8779 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8780 operationParams
= db_nslcmop
.get("operationParams")
8782 target
.update(operationParams
)
8783 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8784 self
.logger
.debug("RO return > {}".format(desc
))
8785 action_id
= desc
["action_id"]
8786 await self
._wait
_ng
_ro
(
8791 self
.timeout
.verticalscale
,
8792 operation
="verticalscale",
8794 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8795 self
.logger
.error("Exit Exception {}".format(e
))
8797 except asyncio
.CancelledError
:
8798 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8799 exc
= "Operation was cancelled"
8800 except Exception as e
:
8801 exc
= traceback
.format_exc()
8802 self
.logger
.critical(
8803 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8806 self
._write
_ns
_status
(
8809 current_operation
="IDLE",
8810 current_operation_id
=None,
8813 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8814 nslcmop_operation_state
= "FAILED"
8816 nslcmop_operation_state
= "COMPLETED"
8817 db_nslcmop_update
["detailed-status"] = "Done"
8818 db_nsr_update
["detailed-status"] = "Done"
8820 self
._write
_op
_status
(
8824 operation_state
=nslcmop_operation_state
,
8825 other_update
=db_nslcmop_update
,
8827 if nslcmop_operation_state
:
8831 "nslcmop_id": nslcmop_id
,
8832 "operationState": nslcmop_operation_state
,
8834 await self
.msg
.aiowrite("ns", "verticalscaled", msg
)
8835 except Exception as e
:
8837 logging_text
+ "kafka_write notification Exception {}".format(e
)
8839 self
.logger
.debug(logging_text
+ "Exit")
8840 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")