1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import randint
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
149 self
.lcm_tasks
= lcm_tasks
150 self
.timeout
= config
.timeout
151 self
.ro_config
= config
.RO
152 self
.vca_config
= config
.VCA
154 # create N2VC connector
155 self
.n2vc
= N2VCJujuConnector(
158 on_update_db
=self
._on
_update
_n
2vc
_db
,
163 self
.conn_helm_ee
= LCMHelmConn(
166 vca_config
=self
.vca_config
,
167 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.k8sclusterhelm2
= K8sHelmConnector(
171 kubectl_command
=self
.vca_config
.kubectlpath
,
172 helm_command
=self
.vca_config
.helmpath
,
179 self
.k8sclusterhelm3
= K8sHelm3Connector(
180 kubectl_command
=self
.vca_config
.kubectlpath
,
181 helm_command
=self
.vca_config
.helm3path
,
188 self
.k8sclusterjuju
= K8sJujuConnector(
189 kubectl_command
=self
.vca_config
.kubectlpath
,
190 juju_command
=self
.vca_config
.jujupath
,
193 on_update_db
=self
._on
_update
_k
8s
_db
,
198 self
.k8scluster_map
= {
199 "helm-chart": self
.k8sclusterhelm2
,
200 "helm-chart-v3": self
.k8sclusterhelm3
,
201 "chart": self
.k8sclusterhelm3
,
202 "juju-bundle": self
.k8sclusterjuju
,
203 "juju": self
.k8sclusterjuju
,
207 "lxc_proxy_charm": self
.n2vc
,
208 "native_charm": self
.n2vc
,
209 "k8s_proxy_charm": self
.n2vc
,
210 "helm": self
.conn_helm_ee
,
211 "helm-v3": self
.conn_helm_ee
,
215 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
217 self
.op_status_map
= {
218 "instantiation": self
.RO
.status
,
219 "termination": self
.RO
.status
,
220 "migrate": self
.RO
.status
,
221 "healing": self
.RO
.recreate_status
,
222 "verticalscale": self
.RO
.status
,
223 "start_stop_rebuild": self
.RO
.status
,
227 def increment_ip_mac(ip_mac
, vm_index
=1):
228 if not isinstance(ip_mac
, str):
231 # try with ipv4 look for last dot
232 i
= ip_mac
.rfind(".")
235 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
236 # try with ipv6 or mac look for last colon. Operate in hex
237 i
= ip_mac
.rfind(":")
240 # format in hex, len can be 2 for mac or 4 for ipv6
241 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
242 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
248 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
266 # remove last dot from path (if exists)
267 if path
.endswith("."):
270 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
271 # .format(table, filter, path, updated_data))
273 nsr_id
= filter.get("_id")
275 # read ns record from database
276 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
277 current_ns_status
= nsr
.get("nsState")
279 # get vca status for NS
280 status_dict
= await self
.n2vc
.get_status(
281 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
286 db_dict
["vcaStatus"] = status_dict
288 # update configurationStatus for this VCA
290 vca_index
= int(path
[path
.rfind(".") + 1 :])
293 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
295 vca_status
= vca_list
[vca_index
].get("status")
297 configuration_status_list
= nsr
.get("configurationStatus")
298 config_status
= configuration_status_list
[vca_index
].get("status")
300 if config_status
== "BROKEN" and vca_status
!= "failed":
301 db_dict
["configurationStatus"][vca_index
] = "READY"
302 elif config_status
!= "BROKEN" and vca_status
== "failed":
303 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
304 except Exception as e
:
305 # not update configurationStatus
306 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
308 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
309 # if nsState = 'DEGRADED' check if all is OK
311 if current_ns_status
in ("READY", "DEGRADED"):
312 error_description
= ""
314 if status_dict
.get("machines"):
315 for machine_id
in status_dict
.get("machines"):
316 machine
= status_dict
.get("machines").get(machine_id
)
317 # check machine agent-status
318 if machine
.get("agent-status"):
319 s
= machine
.get("agent-status").get("status")
322 error_description
+= (
323 "machine {} agent-status={} ; ".format(
327 # check machine instance status
328 if machine
.get("instance-status"):
329 s
= machine
.get("instance-status").get("status")
332 error_description
+= (
333 "machine {} instance-status={} ; ".format(
338 if status_dict
.get("applications"):
339 for app_id
in status_dict
.get("applications"):
340 app
= status_dict
.get("applications").get(app_id
)
341 # check application status
342 if app
.get("status"):
343 s
= app
.get("status").get("status")
346 error_description
+= (
347 "application {} status={} ; ".format(app_id
, s
)
350 if error_description
:
351 db_dict
["errorDescription"] = error_description
352 if current_ns_status
== "READY" and is_degraded
:
353 db_dict
["nsState"] = "DEGRADED"
354 if current_ns_status
== "DEGRADED" and not is_degraded
:
355 db_dict
["nsState"] = "READY"
358 self
.update_db_2("nsrs", nsr_id
, db_dict
)
360 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
362 except Exception as e
:
363 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
365 async def _on_update_k8s_db(
366 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
369 Updating vca status in NSR record
370 :param cluster_uuid: UUID of a k8s cluster
371 :param kdu_instance: The unique name of the KDU instance
372 :param filter: To get nsr_id
373 :cluster_type: The cluster type (juju, k8s)
377 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
378 # .format(cluster_uuid, kdu_instance, filter))
380 nsr_id
= filter.get("_id")
382 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
383 cluster_uuid
=cluster_uuid
,
384 kdu_instance
=kdu_instance
,
386 complete_status
=True,
392 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
395 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
399 self
.update_db_2("nsrs", nsr_id
, db_dict
)
400 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
402 except Exception as e
:
403 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
406 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
409 undefined
=StrictUndefined
,
410 autoescape
=select_autoescape(default_for_string
=True, default
=True),
412 template
= env
.from_string(cloud_init_text
)
413 return template
.render(additional_params
or {})
414 except UndefinedError
as e
:
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
420 except (TemplateError
, TemplateNotFound
) as e
:
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
427 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
428 cloud_init_content
= cloud_init_file
= None
430 if vdu
.get("cloud-init-file"):
431 base_folder
= vnfd
["_admin"]["storage"]
432 if base_folder
["pkg-dir"]:
433 cloud_init_file
= "{}/{}/cloud_init/{}".format(
434 base_folder
["folder"],
435 base_folder
["pkg-dir"],
436 vdu
["cloud-init-file"],
439 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
440 base_folder
["folder"],
441 vdu
["cloud-init-file"],
443 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
444 cloud_init_content
= ci_file
.read()
445 elif vdu
.get("cloud-init"):
446 cloud_init_content
= vdu
["cloud-init"]
448 return cloud_init_content
449 except FsException
as e
:
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd
["id"], vdu
["id"], cloud_init_file
, e
456 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
458 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
460 additional_params
= vdur
.get("additionalParams")
461 return parse_yaml_strings(additional_params
)
463 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
465 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
466 :param vnfd: input vnfd
467 :param new_id: overrides vnf id if provided
468 :param additionalParams: Instantiation params for VNFs provided
469 :param nsrId: Id of the NSR
470 :return: copy of vnfd
472 vnfd_RO
= deepcopy(vnfd
)
473 # remove unused by RO configuration, monitoring, scaling and internal keys
474 vnfd_RO
.pop("_id", None)
475 vnfd_RO
.pop("_admin", None)
476 vnfd_RO
.pop("monitoring-param", None)
477 vnfd_RO
.pop("scaling-group-descriptor", None)
478 vnfd_RO
.pop("kdu", None)
479 vnfd_RO
.pop("k8s-cluster", None)
481 vnfd_RO
["id"] = new_id
483 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
484 for vdu
in get_iterable(vnfd_RO
, "vdu"):
485 vdu
.pop("cloud-init-file", None)
486 vdu
.pop("cloud-init", None)
490 def ip_profile_2_RO(ip_profile
):
491 RO_ip_profile
= deepcopy(ip_profile
)
492 if "dns-server" in RO_ip_profile
:
493 if isinstance(RO_ip_profile
["dns-server"], list):
494 RO_ip_profile
["dns-address"] = []
495 for ds
in RO_ip_profile
.pop("dns-server"):
496 RO_ip_profile
["dns-address"].append(ds
["address"])
498 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
499 if RO_ip_profile
.get("ip-version") == "ipv4":
500 RO_ip_profile
["ip-version"] = "IPv4"
501 if RO_ip_profile
.get("ip-version") == "ipv6":
502 RO_ip_profile
["ip-version"] = "IPv6"
503 if "dhcp-params" in RO_ip_profile
:
504 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
507 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
508 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
509 if db_vim
["_admin"]["operationalState"] != "ENABLED":
511 "VIM={} is not available. operationalState={}".format(
512 vim_account
, db_vim
["_admin"]["operationalState"]
515 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
518 def get_ro_wim_id_for_wim_account(self
, wim_account
):
519 if isinstance(wim_account
, str):
520 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
521 if db_wim
["_admin"]["operationalState"] != "ENABLED":
523 "WIM={} is not available. operationalState={}".format(
524 wim_account
, db_wim
["_admin"]["operationalState"]
527 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
532 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
533 db_vdu_push_list
= []
535 db_update
= {"_admin.modified": time()}
537 for vdu_id
, vdu_count
in vdu_create
.items():
541 for vdur
in reversed(db_vnfr
["vdur"])
542 if vdur
["vdu-id-ref"] == vdu_id
547 # Read the template saved in the db:
549 "No vdur in the database. Using the vdur-template to scale"
551 vdur_template
= db_vnfr
.get("vdur-template")
552 if not vdur_template
:
554 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
558 vdur
= vdur_template
[0]
559 # Delete a template from the database after using it
562 {"_id": db_vnfr
["_id"]},
564 pull
={"vdur-template": {"_id": vdur
["_id"]}},
566 for count
in range(vdu_count
):
567 vdur_copy
= deepcopy(vdur
)
568 vdur_copy
["status"] = "BUILD"
569 vdur_copy
["status-detailed"] = None
570 vdur_copy
["ip-address"] = None
571 vdur_copy
["_id"] = str(uuid4())
572 vdur_copy
["count-index"] += count
+ 1
573 vdur_copy
["id"] = "{}-{}".format(
574 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
576 vdur_copy
.pop("vim_info", None)
577 for iface
in vdur_copy
["interfaces"]:
578 if iface
.get("fixed-ip"):
579 iface
["ip-address"] = self
.increment_ip_mac(
580 iface
["ip-address"], count
+ 1
583 iface
.pop("ip-address", None)
584 if iface
.get("fixed-mac"):
585 iface
["mac-address"] = self
.increment_ip_mac(
586 iface
["mac-address"], count
+ 1
589 iface
.pop("mac-address", None)
593 ) # only first vdu can be managment of vnf
594 db_vdu_push_list
.append(vdur_copy
)
595 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
597 if len(db_vnfr
["vdur"]) == 1:
598 # The scale will move to 0 instances
600 "Scaling to 0 !, creating the template with the last vdur"
602 template_vdur
= [db_vnfr
["vdur"][0]]
603 for vdu_id
, vdu_count
in vdu_delete
.items():
605 indexes_to_delete
= [
607 for iv
in enumerate(db_vnfr
["vdur"])
608 if iv
[1]["vdu-id-ref"] == vdu_id
612 "vdur.{}.status".format(i
): "DELETING"
613 for i
in indexes_to_delete
[-vdu_count
:]
617 # it must be deleted one by one because common.db does not allow otherwise
620 for v
in reversed(db_vnfr
["vdur"])
621 if v
["vdu-id-ref"] == vdu_id
623 for vdu
in vdus_to_delete
[:vdu_count
]:
626 {"_id": db_vnfr
["_id"]},
628 pull
={"vdur": {"_id": vdu
["_id"]}},
632 db_push
["vdur"] = db_vdu_push_list
634 db_push
["vdur-template"] = template_vdur
637 db_vnfr
["vdur-template"] = template_vdur
638 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
639 # modify passed dictionary db_vnfr
640 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
641 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
643 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
645 Updates database nsr with the RO info for the created vld
646 :param ns_update_nsr: dictionary to be filled with the updated info
647 :param db_nsr: content of db_nsr. This is also modified
648 :param nsr_desc_RO: nsr descriptor from RO
649 :return: Nothing, LcmException is raised on errors
652 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
653 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
654 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
656 vld
["vim-id"] = net_RO
.get("vim_net_id")
657 vld
["name"] = net_RO
.get("vim_name")
658 vld
["status"] = net_RO
.get("status")
659 vld
["status-detailed"] = net_RO
.get("error_msg")
660 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
664 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
667 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
669 for db_vnfr
in db_vnfrs
.values():
670 vnfr_update
= {"status": "ERROR"}
671 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
672 if "status" not in vdur
:
673 vdur
["status"] = "ERROR"
674 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
676 vdur
["status-detailed"] = str(error_text
)
678 "vdur.{}.status-detailed".format(vdu_index
)
680 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
681 except DbException
as e
:
682 self
.logger
.error("Cannot update vnf. {}".format(e
))
684 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
686 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
687 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
688 :param nsr_desc_RO: nsr descriptor from RO
689 :return: Nothing, LcmException is raised on errors
691 for vnf_index
, db_vnfr
in db_vnfrs
.items():
692 for vnf_RO
in nsr_desc_RO
["vnfs"]:
693 if vnf_RO
["member_vnf_index"] != vnf_index
:
696 if vnf_RO
.get("ip_address"):
697 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
700 elif not db_vnfr
.get("ip-address"):
701 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
702 raise LcmExceptionNoMgmtIP(
703 "ns member_vnf_index '{}' has no IP address".format(
708 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
709 vdur_RO_count_index
= 0
710 if vdur
.get("pdu-type"):
712 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
713 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
715 if vdur
["count-index"] != vdur_RO_count_index
:
716 vdur_RO_count_index
+= 1
718 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
719 if vdur_RO
.get("ip_address"):
720 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
722 vdur
["ip-address"] = None
723 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
724 vdur
["name"] = vdur_RO
.get("vim_name")
725 vdur
["status"] = vdur_RO
.get("status")
726 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
727 for ifacer
in get_iterable(vdur
, "interfaces"):
728 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
729 if ifacer
["name"] == interface_RO
.get("internal_name"):
730 ifacer
["ip-address"] = interface_RO
.get(
733 ifacer
["mac-address"] = interface_RO
.get(
739 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
740 "from VIM info".format(
741 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
744 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
748 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
750 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
754 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
755 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
756 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
758 vld
["vim-id"] = net_RO
.get("vim_net_id")
759 vld
["name"] = net_RO
.get("vim_name")
760 vld
["status"] = net_RO
.get("status")
761 vld
["status-detailed"] = net_RO
.get("error_msg")
762 vnfr_update
["vld.{}".format(vld_index
)] = vld
766 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
771 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
776 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
781 def _get_ns_config_info(self
, nsr_id
):
783 Generates a mapping between vnf,vdu elements and the N2VC id
784 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
785 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
786 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
787 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
789 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
790 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
792 ns_config_info
= {"osm-config-mapping": mapping
}
793 for vca
in vca_deployed_list
:
794 if not vca
["member-vnf-index"]:
796 if not vca
["vdu_id"]:
797 mapping
[vca
["member-vnf-index"]] = vca
["application"]
801 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
803 ] = vca
["application"]
804 return ns_config_info
806 async def _instantiate_ng_ro(
822 def get_vim_account(vim_account_id
):
824 if vim_account_id
in db_vims
:
825 return db_vims
[vim_account_id
]
826 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
827 db_vims
[vim_account_id
] = db_vim
830 # modify target_vld info with instantiation parameters
831 def parse_vld_instantiation_params(
832 target_vim
, target_vld
, vld_params
, target_sdn
834 if vld_params
.get("ip-profile"):
835 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
836 vld_params
["ip-profile"]
838 if vld_params
.get("provider-network"):
839 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
842 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
843 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
847 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
848 # if wim_account_id is specified in vld_params, validate if it is feasible.
849 wim_account_id
, db_wim
= select_feasible_wim_account(
850 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
854 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
855 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
856 # update vld_params with correct WIM account Id
857 vld_params
["wimAccountId"] = wim_account_id
859 target_wim
= "wim:{}".format(wim_account_id
)
860 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
861 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
862 if len(sdn_ports
) > 0:
863 target_vld
["vim_info"][target_wim
] = target_wim_attrs
864 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
867 "Target VLD with WIM data: {:s}".format(str(target_vld
))
870 for param
in ("vim-network-name", "vim-network-id"):
871 if vld_params
.get(param
):
872 if isinstance(vld_params
[param
], dict):
873 for vim
, vim_net
in vld_params
[param
].items():
874 other_target_vim
= "vim:" + vim
876 target_vld
["vim_info"],
877 (other_target_vim
, param
.replace("-", "_")),
880 else: # isinstance str
881 target_vld
["vim_info"][target_vim
][
882 param
.replace("-", "_")
883 ] = vld_params
[param
]
884 if vld_params
.get("common_id"):
885 target_vld
["common_id"] = vld_params
.get("common_id")
887 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
888 def update_ns_vld_target(target
, ns_params
):
889 for vnf_params
in ns_params
.get("vnf", ()):
890 if vnf_params
.get("vimAccountId"):
894 for vnfr
in db_vnfrs
.values()
895 if vnf_params
["member-vnf-index"]
896 == vnfr
["member-vnf-index-ref"]
900 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
903 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
904 target_vld
= find_in_list(
905 get_iterable(vdur
, "interfaces"),
906 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
909 vld_params
= find_in_list(
910 get_iterable(ns_params
, "vld"),
911 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
914 if vnf_params
.get("vimAccountId") not in a_vld
.get(
917 target_vim_network_list
= [
918 v
for _
, v
in a_vld
.get("vim_info").items()
920 target_vim_network_name
= next(
922 item
.get("vim_network_name", "")
923 for item
in target_vim_network_list
928 target
["ns"]["vld"][a_index
].get("vim_info").update(
930 "vim:{}".format(vnf_params
["vimAccountId"]): {
931 "vim_network_name": target_vim_network_name
,
937 for param
in ("vim-network-name", "vim-network-id"):
938 if vld_params
.get(param
) and isinstance(
939 vld_params
[param
], dict
941 for vim
, vim_net
in vld_params
[
944 other_target_vim
= "vim:" + vim
946 target
["ns"]["vld"][a_index
].get(
951 param
.replace("-", "_"),
956 nslcmop_id
= db_nslcmop
["_id"]
958 "name": db_nsr
["name"],
961 "image": deepcopy(db_nsr
["image"]),
962 "flavor": deepcopy(db_nsr
["flavor"]),
963 "action_id": nslcmop_id
,
964 "cloud_init_content": {},
966 for image
in target
["image"]:
967 image
["vim_info"] = {}
968 for flavor
in target
["flavor"]:
969 flavor
["vim_info"] = {}
970 if db_nsr
.get("affinity-or-anti-affinity-group"):
971 target
["affinity-or-anti-affinity-group"] = deepcopy(
972 db_nsr
["affinity-or-anti-affinity-group"]
974 for affinity_or_anti_affinity_group
in target
[
975 "affinity-or-anti-affinity-group"
977 affinity_or_anti_affinity_group
["vim_info"] = {}
979 if db_nslcmop
.get("lcmOperationType") != "instantiate":
980 # get parameters of instantiation:
981 db_nslcmop_instantiate
= self
.db
.get_list(
984 "nsInstanceId": db_nslcmop
["nsInstanceId"],
985 "lcmOperationType": "instantiate",
988 ns_params
= db_nslcmop_instantiate
.get("operationParams")
990 ns_params
= db_nslcmop
.get("operationParams")
991 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
992 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
995 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
996 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1000 "mgmt-network": vld
.get("mgmt-network", False),
1001 "type": vld
.get("type"),
1004 "vim_network_name": vld
.get("vim-network-name"),
1005 "vim_account_id": ns_params
["vimAccountId"],
1009 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1012 if vim_config
:= db_vim
.get("config"):
1013 if sdnc_id
:= vim_config
.get("sdn-controller"):
1014 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 target_vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1024 for nsd_vnf_profile
in nsd_vnf_profiles
:
1025 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1026 if cp
["virtual-link-profile-id"] == vld
["id"]:
1028 "member_vnf:{}.{}".format(
1029 cp
["constituent-cpd-id"][0][
1030 "constituent-base-element-id"
1032 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1034 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1036 # check at nsd descriptor, if there is an ip-profile
1038 nsd_vlp
= find_in_list(
1039 get_virtual_link_profiles(nsd
),
1040 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1045 and nsd_vlp
.get("virtual-link-protocol-data")
1046 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1048 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1052 # update vld_params with instantiation params
1053 vld_instantiation_params
= find_in_list(
1054 get_iterable(ns_params
, "vld"),
1055 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1057 if vld_instantiation_params
:
1058 vld_params
.update(vld_instantiation_params
)
1059 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1060 target
["ns"]["vld"].append(target_vld
)
1061 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1062 update_ns_vld_target(target
, ns_params
)
1064 for vnfr
in db_vnfrs
.values():
1065 vnfd
= find_in_list(
1066 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1068 vnf_params
= find_in_list(
1069 get_iterable(ns_params
, "vnf"),
1070 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1072 target_vnf
= deepcopy(vnfr
)
1073 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1074 for vld
in target_vnf
.get("vld", ()):
1075 # check if connected to a ns.vld, to fill target'
1076 vnf_cp
= find_in_list(
1077 vnfd
.get("int-virtual-link-desc", ()),
1078 lambda cpd
: cpd
.get("id") == vld
["id"],
1081 ns_cp
= "member_vnf:{}.{}".format(
1082 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1084 if cp2target
.get(ns_cp
):
1085 vld
["target"] = cp2target
[ns_cp
]
1088 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1090 # check if this network needs SDN assist
1092 if vld
.get("pci-interfaces"):
1093 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1094 sdnc_id
= db_vim
["config"].get("sdn-controller")
1096 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1097 target_sdn
= "sdn:{}".format(sdnc_id
)
1098 vld
["vim_info"][target_sdn
] = {
1100 "target_vim": target_vim
,
1102 "type": vld
.get("type"),
1105 # check at vnfd descriptor, if there is an ip-profile
1107 vnfd_vlp
= find_in_list(
1108 get_virtual_link_profiles(vnfd
),
1109 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1113 and vnfd_vlp
.get("virtual-link-protocol-data")
1114 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1116 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1119 # update vld_params with instantiation params
1121 vld_instantiation_params
= find_in_list(
1122 get_iterable(vnf_params
, "internal-vld"),
1123 lambda i_vld
: i_vld
["name"] == vld
["id"],
1125 if vld_instantiation_params
:
1126 vld_params
.update(vld_instantiation_params
)
1127 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1130 for vdur
in target_vnf
.get("vdur", ()):
1131 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1132 continue # This vdu must not be created
1133 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1135 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1138 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1139 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1142 and vdu_configuration
.get("config-access")
1143 and vdu_configuration
.get("config-access").get("ssh-access")
1145 vdur
["ssh-keys"] = ssh_keys_all
1146 vdur
["ssh-access-required"] = vdu_configuration
[
1148 ]["ssh-access"]["required"]
1151 and vnf_configuration
.get("config-access")
1152 and vnf_configuration
.get("config-access").get("ssh-access")
1153 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1155 vdur
["ssh-keys"] = ssh_keys_all
1156 vdur
["ssh-access-required"] = vnf_configuration
[
1158 ]["ssh-access"]["required"]
1159 elif ssh_keys_instantiation
and find_in_list(
1160 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1162 vdur
["ssh-keys"] = ssh_keys_instantiation
1164 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1166 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1168 if vdud
.get("cloud-init-file"):
1169 vdur
["cloud-init"] = "{}:file:{}".format(
1170 vnfd
["_id"], vdud
.get("cloud-init-file")
1172 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1173 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1174 base_folder
= vnfd
["_admin"]["storage"]
1175 if base_folder
["pkg-dir"]:
1176 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1177 base_folder
["folder"],
1178 base_folder
["pkg-dir"],
1179 vdud
.get("cloud-init-file"),
1182 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1183 base_folder
["folder"],
1184 vdud
.get("cloud-init-file"),
1186 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1187 target
["cloud_init_content"][
1190 elif vdud
.get("cloud-init"):
1191 vdur
["cloud-init"] = "{}:vdu:{}".format(
1192 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1194 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1195 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1198 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1199 deploy_params_vdu
= self
._format
_additional
_params
(
1200 vdur
.get("additionalParams") or {}
1202 deploy_params_vdu
["OSM"] = get_osm_params(
1203 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1205 vdur
["additionalParams"] = deploy_params_vdu
1208 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1209 if target_vim
not in ns_flavor
["vim_info"]:
1210 ns_flavor
["vim_info"][target_vim
] = {}
1213 # in case alternative images are provided we must check if they should be applied
1214 # for the vim_type, modify the vim_type taking into account
1215 ns_image_id
= int(vdur
["ns-image-id"])
1216 if vdur
.get("alt-image-ids"):
1217 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1218 vim_type
= db_vim
["vim_type"]
1219 for alt_image_id
in vdur
.get("alt-image-ids"):
1220 ns_alt_image
= target
["image"][int(alt_image_id
)]
1221 if vim_type
== ns_alt_image
.get("vim-type"):
1222 # must use alternative image
1224 "use alternative image id: {}".format(alt_image_id
)
1226 ns_image_id
= alt_image_id
1227 vdur
["ns-image-id"] = ns_image_id
1229 ns_image
= target
["image"][int(ns_image_id
)]
1230 if target_vim
not in ns_image
["vim_info"]:
1231 ns_image
["vim_info"][target_vim
] = {}
1234 if vdur
.get("affinity-or-anti-affinity-group-id"):
1235 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1236 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1237 if target_vim
not in ns_ags
["vim_info"]:
1238 ns_ags
["vim_info"][target_vim
] = {}
1240 vdur
["vim_info"] = {target_vim
: {}}
1241 # instantiation parameters
1243 vdu_instantiation_params
= find_in_list(
1244 get_iterable(vnf_params
, "vdu"),
1245 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1247 if vdu_instantiation_params
:
1248 # Parse the vdu_volumes from the instantiation params
1249 vdu_volumes
= get_volumes_from_instantiation_params(
1250 vdu_instantiation_params
, vdud
1252 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1253 vdur
["additionalParams"]["OSM"][
1255 ] = vdu_instantiation_params
.get("vim-flavor-id")
1256 vdur_list
.append(vdur
)
1257 target_vnf
["vdur"] = vdur_list
1258 target
["vnf"].append(target_vnf
)
1260 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1261 desc
= await self
.RO
.deploy(nsr_id
, target
)
1262 self
.logger
.debug("RO return > {}".format(desc
))
1263 action_id
= desc
["action_id"]
1264 await self
._wait
_ng
_ro
(
1271 operation
="instantiation",
1276 "_admin.deployed.RO.operational-status": "running",
1277 "detailed-status": " ".join(stage
),
1279 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1280 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1281 self
._write
_op
_status
(nslcmop_id
, stage
)
1283 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1287 async def _wait_ng_ro(
1297 detailed_status_old
= None
1299 start_time
= start_time
or time()
1300 while time() <= start_time
+ timeout
:
1301 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1302 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1303 if desc_status
["status"] == "FAILED":
1304 raise NgRoException(desc_status
["details"])
1305 elif desc_status
["status"] == "BUILD":
1307 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1308 elif desc_status
["status"] == "DONE":
1310 stage
[2] = "Deployed at VIM"
1313 assert False, "ROclient.check_ns_status returns unknown {}".format(
1314 desc_status
["status"]
1316 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1317 detailed_status_old
= stage
[2]
1318 db_nsr_update
["detailed-status"] = " ".join(stage
)
1319 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1320 self
._write
_op
_status
(nslcmop_id
, stage
)
1321 await asyncio
.sleep(15, loop
=self
.loop
)
1322 else: # timeout_ns_deploy
1323 raise NgRoException("Timeout waiting ns to deploy")
1325 async def _terminate_ng_ro(
1326 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1331 start_deploy
= time()
1338 "action_id": nslcmop_id
,
1340 desc
= await self
.RO
.deploy(nsr_id
, target
)
1341 action_id
= desc
["action_id"]
1342 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1345 + "ns terminate action at RO. action_id={}".format(action_id
)
1349 delete_timeout
= 20 * 60 # 20 minutes
1350 await self
._wait
_ng
_ro
(
1357 operation
="termination",
1359 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1361 await self
.RO
.delete(nsr_id
)
1362 except NgRoException
as e
:
1363 if e
.http_code
== 404: # not found
1364 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1365 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1367 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1369 elif e
.http_code
== 409: # conflict
1370 failed_detail
.append("delete conflict: {}".format(e
))
1373 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1376 failed_detail
.append("delete error: {}".format(e
))
1379 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1381 except Exception as e
:
1382 failed_detail
.append("delete error: {}".format(e
))
1384 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1388 stage
[2] = "Error deleting from VIM"
1390 stage
[2] = "Deleted from VIM"
1391 db_nsr_update
["detailed-status"] = " ".join(stage
)
1392 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1393 self
._write
_op
_status
(nslcmop_id
, stage
)
1396 raise LcmException("; ".join(failed_detail
))
1399 async def instantiate_RO(
1413 :param logging_text: preffix text to use at logging
1414 :param nsr_id: nsr identity
1415 :param nsd: database content of ns descriptor
1416 :param db_nsr: database content of ns record
1417 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1419 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1420 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1421 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1422 :return: None or exception
1425 start_deploy
= time()
1426 ns_params
= db_nslcmop
.get("operationParams")
1427 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1428 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1430 timeout_ns_deploy
= self
.timeout
.ns_deploy
1432 # Check for and optionally request placement optimization. Database will be updated if placement activated
1433 stage
[2] = "Waiting for Placement."
1434 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1435 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1436 for vnfr
in db_vnfrs
.values():
1437 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1440 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1442 return await self
._instantiate
_ng
_ro
(
1455 except Exception as e
:
1456 stage
[2] = "ERROR deploying at VIM"
1457 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1459 "Error deploying at VIM {}".format(e
),
1460 exc_info
=not isinstance(
1463 ROclient
.ROClientException
,
1472 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1474 Wait for kdu to be up, get ip address
1475 :param logging_text: prefix use for logging
1479 :return: IP address, K8s services
1482 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1485 while nb_tries
< 360:
1486 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1490 for x
in get_iterable(db_vnfr
, "kdur")
1491 if x
.get("kdu-name") == kdu_name
1497 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1499 if kdur
.get("status"):
1500 if kdur
["status"] in ("READY", "ENABLED"):
1501 return kdur
.get("ip-address"), kdur
.get("services")
1504 "target KDU={} is in error state".format(kdu_name
)
1507 await asyncio
.sleep(10, loop
=self
.loop
)
1509 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1511 async def wait_vm_up_insert_key_ro(
1512 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1515 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1516 :param logging_text: prefix use for logging
1521 :param pub_key: public ssh key to inject, None to skip
1522 :param user: user to apply the public ssh key
1526 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1528 target_vdu_id
= None
1533 if ro_retries
>= 360: # 1 hour
1535 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1538 await asyncio
.sleep(10, loop
=self
.loop
)
1541 if not target_vdu_id
:
1542 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1544 if not vdu_id
: # for the VNF case
1545 if db_vnfr
.get("status") == "ERROR":
1547 "Cannot inject ssh-key because target VNF is in error state"
1549 ip_address
= db_vnfr
.get("ip-address")
1555 for x
in get_iterable(db_vnfr
, "vdur")
1556 if x
.get("ip-address") == ip_address
1564 for x
in get_iterable(db_vnfr
, "vdur")
1565 if x
.get("vdu-id-ref") == vdu_id
1566 and x
.get("count-index") == vdu_index
1572 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1573 ): # If only one, this should be the target vdu
1574 vdur
= db_vnfr
["vdur"][0]
1577 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1578 vnfr_id
, vdu_id
, vdu_index
1581 # New generation RO stores information at "vim_info"
1584 if vdur
.get("vim_info"):
1586 t
for t
in vdur
["vim_info"]
1587 ) # there should be only one key
1588 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1590 vdur
.get("pdu-type")
1591 or vdur
.get("status") == "ACTIVE"
1592 or ng_ro_status
== "ACTIVE"
1594 ip_address
= vdur
.get("ip-address")
1597 target_vdu_id
= vdur
["vdu-id-ref"]
1598 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1600 "Cannot inject ssh-key because target VM is in error state"
1603 if not target_vdu_id
:
1606 # inject public key into machine
1607 if pub_key
and user
:
1608 self
.logger
.debug(logging_text
+ "Inserting RO key")
1609 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1610 if vdur
.get("pdu-type"):
1611 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1616 "action": "inject_ssh_key",
1620 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1622 desc
= await self
.RO
.deploy(nsr_id
, target
)
1623 action_id
= desc
["action_id"]
1624 await self
._wait
_ng
_ro
(
1625 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1628 except NgRoException
as e
:
1630 "Reaching max tries injecting key. Error: {}".format(e
)
1637 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1639 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1641 my_vca
= vca_deployed_list
[vca_index
]
1642 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1643 # vdu or kdu: no dependencies
1647 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1648 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1649 configuration_status_list
= db_nsr
["configurationStatus"]
1650 for index
, vca_deployed
in enumerate(configuration_status_list
):
1651 if index
== vca_index
:
1654 if not my_vca
.get("member-vnf-index") or (
1655 vca_deployed
.get("member-vnf-index")
1656 == my_vca
.get("member-vnf-index")
1658 internal_status
= configuration_status_list
[index
].get("status")
1659 if internal_status
== "READY":
1661 elif internal_status
== "BROKEN":
1663 "Configuration aborted because dependent charm/s has failed"
1668 # no dependencies, return
1670 await asyncio
.sleep(10)
1673 raise LcmException("Configuration aborted because dependent charm/s timeout")
1675 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1678 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1680 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1681 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1684 async def instantiate_N2VC(
1702 ee_config_descriptor
,
1704 nsr_id
= db_nsr
["_id"]
1705 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1706 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1707 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1708 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1710 "collection": "nsrs",
1711 "filter": {"_id": nsr_id
},
1712 "path": db_update_entry
,
1717 element_under_configuration
= nsr_id
1721 vnfr_id
= db_vnfr
["_id"]
1722 osm_config
["osm"]["vnf_id"] = vnfr_id
1724 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1726 if vca_type
== "native_charm":
1729 index_number
= vdu_index
or 0
1732 element_type
= "VNF"
1733 element_under_configuration
= vnfr_id
1734 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1736 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1737 element_type
= "VDU"
1738 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1739 osm_config
["osm"]["vdu_id"] = vdu_id
1741 namespace
+= ".{}".format(kdu_name
)
1742 element_type
= "KDU"
1743 element_under_configuration
= kdu_name
1744 osm_config
["osm"]["kdu_name"] = kdu_name
1747 if base_folder
["pkg-dir"]:
1748 artifact_path
= "{}/{}/{}/{}".format(
1749 base_folder
["folder"],
1750 base_folder
["pkg-dir"],
1753 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1758 artifact_path
= "{}/Scripts/{}/{}/".format(
1759 base_folder
["folder"],
1762 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1767 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1769 # get initial_config_primitive_list that applies to this element
1770 initial_config_primitive_list
= config_descriptor
.get(
1771 "initial-config-primitive"
1775 "Initial config primitive list > {}".format(
1776 initial_config_primitive_list
1780 # add config if not present for NS charm
1781 ee_descriptor_id
= ee_config_descriptor
.get("id")
1782 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1783 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1784 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1788 "Initial config primitive list #2 > {}".format(
1789 initial_config_primitive_list
1792 # n2vc_redesign STEP 3.1
1793 # find old ee_id if exists
1794 ee_id
= vca_deployed
.get("ee_id")
1796 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1797 # create or register execution environment in VCA
1798 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1799 self
._write
_configuration
_status
(
1801 vca_index
=vca_index
,
1803 element_under_configuration
=element_under_configuration
,
1804 element_type
=element_type
,
1807 step
= "create execution environment"
1808 self
.logger
.debug(logging_text
+ step
)
1812 if vca_type
== "k8s_proxy_charm":
1813 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1814 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1815 namespace
=namespace
,
1816 artifact_path
=artifact_path
,
1820 elif vca_type
== "helm" or vca_type
== "helm-v3":
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1828 artifact_path
=artifact_path
,
1829 chart_model
=vca_name
,
1833 ee_id
, credentials
= await self
.vca_map
[
1835 ].create_execution_environment(
1836 namespace
=namespace
,
1842 elif vca_type
== "native_charm":
1843 step
= "Waiting to VM being up and getting IP address"
1844 self
.logger
.debug(logging_text
+ step
)
1845 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1854 credentials
= {"hostname": rw_mgmt_ip
}
1856 username
= deep_get(
1857 config_descriptor
, ("config-access", "ssh-access", "default-user")
1859 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1860 # merged. Meanwhile let's get username from initial-config-primitive
1861 if not username
and initial_config_primitive_list
:
1862 for config_primitive
in initial_config_primitive_list
:
1863 for param
in config_primitive
.get("parameter", ()):
1864 if param
["name"] == "ssh-username":
1865 username
= param
["value"]
1869 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1870 "'config-access.ssh-access.default-user'"
1872 credentials
["username"] = username
1873 # n2vc_redesign STEP 3.2
1875 self
._write
_configuration
_status
(
1877 vca_index
=vca_index
,
1878 status
="REGISTERING",
1879 element_under_configuration
=element_under_configuration
,
1880 element_type
=element_type
,
1883 step
= "register execution environment {}".format(credentials
)
1884 self
.logger
.debug(logging_text
+ step
)
1885 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1886 credentials
=credentials
,
1887 namespace
=namespace
,
1892 # for compatibility with MON/POL modules, the need model and application name at database
1893 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1894 ee_id_parts
= ee_id
.split(".")
1895 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1896 if len(ee_id_parts
) >= 2:
1897 model_name
= ee_id_parts
[0]
1898 application_name
= ee_id_parts
[1]
1899 db_nsr_update
[db_update_entry
+ "model"] = model_name
1900 db_nsr_update
[db_update_entry
+ "application"] = application_name
1902 # n2vc_redesign STEP 3.3
1903 step
= "Install configuration Software"
1905 self
._write
_configuration
_status
(
1907 vca_index
=vca_index
,
1908 status
="INSTALLING SW",
1909 element_under_configuration
=element_under_configuration
,
1910 element_type
=element_type
,
1911 other_update
=db_nsr_update
,
1914 # TODO check if already done
1915 self
.logger
.debug(logging_text
+ step
)
1917 if vca_type
== "native_charm":
1918 config_primitive
= next(
1919 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1922 if config_primitive
:
1923 config
= self
._map
_primitive
_params
(
1924 config_primitive
, {}, deploy_params
1927 if vca_type
== "lxc_proxy_charm":
1928 if element_type
== "NS":
1929 num_units
= db_nsr
.get("config-units") or 1
1930 elif element_type
== "VNF":
1931 num_units
= db_vnfr
.get("config-units") or 1
1932 elif element_type
== "VDU":
1933 for v
in db_vnfr
["vdur"]:
1934 if vdu_id
== v
["vdu-id-ref"]:
1935 num_units
= v
.get("config-units") or 1
1937 if vca_type
!= "k8s_proxy_charm":
1938 await self
.vca_map
[vca_type
].install_configuration_sw(
1940 artifact_path
=artifact_path
,
1943 num_units
=num_units
,
1948 # write in db flag of configuration_sw already installed
1950 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1953 # add relations for this VCA (wait for other peers related with this VCA)
1954 is_relation_added
= await self
._add
_vca
_relations
(
1955 logging_text
=logging_text
,
1958 vca_index
=vca_index
,
1961 if not is_relation_added
:
1962 raise LcmException("Relations could not be added to VCA.")
1964 # if SSH access is required, then get execution environment SSH public
1965 # if native charm we have waited already to VM be UP
1966 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1969 # self.logger.debug("get ssh key block")
1971 config_descriptor
, ("config-access", "ssh-access", "required")
1973 # self.logger.debug("ssh key needed")
1974 # Needed to inject a ssh key
1977 ("config-access", "ssh-access", "default-user"),
1979 step
= "Install configuration Software, getting public ssh key"
1980 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1981 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1984 step
= "Insert public key into VM user={} ssh_key={}".format(
1988 # self.logger.debug("no need to get ssh key")
1989 step
= "Waiting to VM being up and getting IP address"
1990 self
.logger
.debug(logging_text
+ step
)
1992 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1995 # n2vc_redesign STEP 5.1
1996 # wait for RO (ip-address) Insert pub_key into VM
1999 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2000 logging_text
, nsr_id
, vnfr_id
, kdu_name
2002 vnfd
= self
.db
.get_one(
2004 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2006 kdu
= get_kdu(vnfd
, kdu_name
)
2008 service
["name"] for service
in get_kdu_services(kdu
)
2010 exposed_services
= []
2011 for service
in services
:
2012 if any(s
in service
["name"] for s
in kdu_services
):
2013 exposed_services
.append(service
)
2014 await self
.vca_map
[vca_type
].exec_primitive(
2016 primitive_name
="config",
2018 "osm-config": json
.dumps(
2020 k8s
={"services": exposed_services
}
2027 # This verification is needed in order to avoid trying to add a public key
2028 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2029 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2030 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2032 elif db_vnfr
.get("vdur"):
2033 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2043 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2045 # store rw_mgmt_ip in deploy params for later replacement
2046 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2048 # n2vc_redesign STEP 6 Execute initial config primitive
2049 step
= "execute initial config primitive"
2051 # wait for dependent primitives execution (NS -> VNF -> VDU)
2052 if initial_config_primitive_list
:
2053 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2055 # stage, in function of element type: vdu, kdu, vnf or ns
2056 my_vca
= vca_deployed_list
[vca_index
]
2057 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2059 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2060 elif my_vca
.get("member-vnf-index"):
2062 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2065 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2067 self
._write
_configuration
_status
(
2068 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2071 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2073 check_if_terminated_needed
= True
2074 for initial_config_primitive
in initial_config_primitive_list
:
2075 # adding information on the vca_deployed if it is a NS execution environment
2076 if not vca_deployed
["member-vnf-index"]:
2077 deploy_params
["ns_config_info"] = json
.dumps(
2078 self
._get
_ns
_config
_info
(nsr_id
)
2080 # TODO check if already done
2081 primitive_params_
= self
._map
_primitive
_params
(
2082 initial_config_primitive
, {}, deploy_params
2085 step
= "execute primitive '{}' params '{}'".format(
2086 initial_config_primitive
["name"], primitive_params_
2088 self
.logger
.debug(logging_text
+ step
)
2089 await self
.vca_map
[vca_type
].exec_primitive(
2091 primitive_name
=initial_config_primitive
["name"],
2092 params_dict
=primitive_params_
,
2097 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2098 if check_if_terminated_needed
:
2099 if config_descriptor
.get("terminate-config-primitive"):
2101 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2103 check_if_terminated_needed
= False
2105 # TODO register in database that primitive is done
2107 # STEP 7 Configure metrics
2108 if vca_type
== "helm" or vca_type
== "helm-v3":
2109 # TODO: review for those cases where the helm chart is a reference and
2110 # is not part of the NF package
2111 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2113 artifact_path
=artifact_path
,
2114 ee_config_descriptor
=ee_config_descriptor
,
2117 target_ip
=rw_mgmt_ip
,
2118 element_type
=element_type
,
2119 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2121 vdu_index
=vdu_index
,
2123 kdu_index
=kdu_index
,
2129 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2132 for job
in prometheus_jobs
:
2135 {"job_name": job
["job_name"]},
2138 fail_on_empty
=False,
2141 step
= "instantiated at VCA"
2142 self
.logger
.debug(logging_text
+ step
)
2144 self
._write
_configuration
_status
(
2145 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2148 except Exception as e
: # TODO not use Exception but N2VC exception
2149 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2151 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2154 "Exception while {} : {}".format(step
, e
), exc_info
=True
2156 self
._write
_configuration
_status
(
2157 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2159 raise LcmException("{}. {}".format(step
, e
)) from e
2161 def _write_ns_status(
2165 current_operation
: str,
2166 current_operation_id
: str,
2167 error_description
: str = None,
2168 error_detail
: str = None,
2169 other_update
: dict = None,
2172 Update db_nsr fields.
2175 :param current_operation:
2176 :param current_operation_id:
2177 :param error_description:
2178 :param error_detail:
2179 :param other_update: Other required changes at database if provided, will be cleared
2183 db_dict
= other_update
or {}
2186 ] = current_operation_id
# for backward compatibility
2187 db_dict
["_admin.current-operation"] = current_operation_id
2188 db_dict
["_admin.operation-type"] = (
2189 current_operation
if current_operation
!= "IDLE" else None
2191 db_dict
["currentOperation"] = current_operation
2192 db_dict
["currentOperationID"] = current_operation_id
2193 db_dict
["errorDescription"] = error_description
2194 db_dict
["errorDetail"] = error_detail
2197 db_dict
["nsState"] = ns_state
2198 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2199 except DbException
as e
:
2200 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2202 def _write_op_status(
2206 error_message
: str = None,
2207 queuePosition
: int = 0,
2208 operation_state
: str = None,
2209 other_update
: dict = None,
2212 db_dict
= other_update
or {}
2213 db_dict
["queuePosition"] = queuePosition
2214 if isinstance(stage
, list):
2215 db_dict
["stage"] = stage
[0]
2216 db_dict
["detailed-status"] = " ".join(stage
)
2217 elif stage
is not None:
2218 db_dict
["stage"] = str(stage
)
2220 if error_message
is not None:
2221 db_dict
["errorMessage"] = error_message
2222 if operation_state
is not None:
2223 db_dict
["operationState"] = operation_state
2224 db_dict
["statusEnteredTime"] = time()
2225 self
.update_db_2("nslcmops", op_id
, db_dict
)
2226 except DbException
as e
:
2228 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2231 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2233 nsr_id
= db_nsr
["_id"]
2234 # configurationStatus
2235 config_status
= db_nsr
.get("configurationStatus")
2238 "configurationStatus.{}.status".format(index
): status
2239 for index
, v
in enumerate(config_status
)
2243 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2245 except DbException
as e
:
2247 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2250 def _write_configuration_status(
2255 element_under_configuration
: str = None,
2256 element_type
: str = None,
2257 other_update
: dict = None,
2259 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2260 # .format(vca_index, status))
2263 db_path
= "configurationStatus.{}.".format(vca_index
)
2264 db_dict
= other_update
or {}
2266 db_dict
[db_path
+ "status"] = status
2267 if element_under_configuration
:
2269 db_path
+ "elementUnderConfiguration"
2270 ] = element_under_configuration
2272 db_dict
[db_path
+ "elementType"] = element_type
2273 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2274 except DbException
as e
:
2276 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2277 status
, nsr_id
, vca_index
, e
2281 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2283 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2284 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2285 Database is used because the result can be obtained from a different LCM worker in case of HA.
2286 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2287 :param db_nslcmop: database content of nslcmop
2288 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2289 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2290 computed 'vim-account-id'
2293 nslcmop_id
= db_nslcmop
["_id"]
2294 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2295 if placement_engine
== "PLA":
2297 logging_text
+ "Invoke and wait for placement optimization"
2299 await self
.msg
.aiowrite(
2300 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2302 db_poll_interval
= 5
2303 wait
= db_poll_interval
* 10
2305 while not pla_result
and wait
>= 0:
2306 await asyncio
.sleep(db_poll_interval
)
2307 wait
-= db_poll_interval
2308 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2309 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2313 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2316 for pla_vnf
in pla_result
["vnf"]:
2317 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2318 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2323 {"_id": vnfr
["_id"]},
2324 {"vim-account-id": pla_vnf
["vimAccountId"]},
2327 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2330 def _gather_vnfr_healing_alerts(self
, vnfr
, vnfd
):
2332 nsr_id
= vnfr
["nsr-id-ref"]
2333 df
= vnfd
.get("df", [{}])[0]
2334 # Checking for auto-healing configuration
2335 if "healing-aspect" in df
:
2336 healing_aspects
= df
["healing-aspect"]
2337 for healing
in healing_aspects
:
2338 for healing_policy
in healing
.get("healing-policy", ()):
2339 vdu_id
= healing_policy
["vdu-id"]
2341 (vdur
for vdur
in vnfr
["vdur"] if vdu_id
== vdur
["vdu-id-ref"]),
2346 metric_name
= "vm_status"
2347 vdu_name
= vdur
.get("name")
2348 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2350 name
= f
"healing_{uuid}"
2351 action
= healing_policy
2352 # action_on_recovery = healing.get("action-on-recovery")
2353 # cooldown_time = healing.get("cooldown-time")
2354 # day1 = healing.get("day1")
2358 "metric": metric_name
,
2361 "vnf_member_index": vnf_member_index
,
2362 "vdu_name": vdu_name
,
2364 "alarm_status": "ok",
2365 "action_type": "healing",
2368 alerts
.append(alert
)
2371 def _gather_vnfr_scaling_alerts(self
, vnfr
, vnfd
):
2373 nsr_id
= vnfr
["nsr-id-ref"]
2374 df
= vnfd
.get("df", [{}])[0]
2375 # Checking for auto-scaling configuration
2376 if "scaling-aspect" in df
:
2377 rel_operation_types
= {
2385 scaling_aspects
= df
["scaling-aspect"]
2386 all_vnfd_monitoring_params
= {}
2387 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
2388 for mp
in ivld
.get("monitoring-parameters", ()):
2389 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2390 for vdu
in vnfd
.get("vdu", ()):
2391 for mp
in vdu
.get("monitoring-parameter", ()):
2392 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2393 for df
in vnfd
.get("df", ()):
2394 for mp
in df
.get("monitoring-parameter", ()):
2395 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2396 for scaling_aspect
in scaling_aspects
:
2397 scaling_group_name
= scaling_aspect
.get("name", "")
2398 # Get monitored VDUs
2399 all_monitored_vdus
= set()
2400 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get(
2403 for vdu_delta
in delta
.get("vdu-delta", ()):
2404 all_monitored_vdus
.add(vdu_delta
.get("id"))
2405 monitored_vdurs
= list(
2407 lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
,
2411 if not monitored_vdurs
:
2413 "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
2416 for scaling_policy
in scaling_aspect
.get("scaling-policy", ()):
2417 if scaling_policy
["scaling-type"] != "automatic":
2419 threshold_time
= scaling_policy
.get("threshold-time", "1")
2420 cooldown_time
= scaling_policy
.get("cooldown-time", "0")
2421 for scaling_criteria
in scaling_policy
["scaling-criteria"]:
2422 monitoring_param_ref
= scaling_criteria
.get(
2423 "vnf-monitoring-param-ref"
2425 vnf_monitoring_param
= all_vnfd_monitoring_params
[
2426 monitoring_param_ref
2428 for vdur
in monitored_vdurs
:
2429 vdu_id
= vdur
["vdu-id-ref"]
2430 metric_name
= vnf_monitoring_param
.get("performance-metric")
2431 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2432 scalein_threshold
= scaling_criteria
.get(
2433 "scale-in-threshold"
2435 scaleout_threshold
= scaling_criteria
.get(
2436 "scale-out-threshold"
2438 # Looking for min/max-number-of-instances
2439 instances_min_number
= 1
2440 instances_max_number
= 1
2441 vdu_profile
= df
["vdu-profile"]
2444 item
for item
in vdu_profile
if item
["id"] == vdu_id
2446 instances_min_number
= profile
.get(
2447 "min-number-of-instances", 1
2449 instances_max_number
= profile
.get(
2450 "max-number-of-instances", 1
2453 if scalein_threshold
:
2455 name
= f
"scalein_{uuid}"
2456 operation
= scaling_criteria
[
2457 "scale-in-relational-operation"
2459 rel_operator
= rel_operation_types
.get(operation
, "<=")
2460 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2461 expression
= f
"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
2464 "vnf_member_index": vnf_member_index
,
2470 "for": str(threshold_time
) + "m",
2473 action
= scaling_policy
2475 "scaling-group": scaling_group_name
,
2476 "cooldown-time": cooldown_time
,
2481 "metric": metric_name
,
2484 "vnf_member_index": vnf_member_index
,
2487 "alarm_status": "ok",
2488 "action_type": "scale_in",
2490 "prometheus_config": prom_cfg
,
2492 alerts
.append(alert
)
2494 if scaleout_threshold
:
2496 name
= f
"scaleout_{uuid}"
2497 operation
= scaling_criteria
[
2498 "scale-out-relational-operation"
2500 rel_operator
= rel_operation_types
.get(operation
, "<=")
2501 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2502 expression
= f
"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
2505 "vnf_member_index": vnf_member_index
,
2511 "for": str(threshold_time
) + "m",
2514 action
= scaling_policy
2516 "scaling-group": scaling_group_name
,
2517 "cooldown-time": cooldown_time
,
2522 "metric": metric_name
,
2525 "vnf_member_index": vnf_member_index
,
2528 "alarm_status": "ok",
2529 "action_type": "scale_out",
2531 "prometheus_config": prom_cfg
,
2533 alerts
.append(alert
)
2536 def update_nsrs_with_pla_result(self
, params
):
2538 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2540 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2542 except Exception as e
:
2543 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2545 async def instantiate(self
, nsr_id
, nslcmop_id
):
2548 :param nsr_id: ns instance to deploy
2549 :param nslcmop_id: operation to run
2553 # Try to lock HA task here
2554 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2555 if not task_is_locked_by_me
:
2557 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2561 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2562 self
.logger
.debug(logging_text
+ "Enter")
2564 # get all needed from database
2566 # database nsrs record
2569 # database nslcmops record
2572 # update operation on nsrs
2574 # update operation on nslcmops
2575 db_nslcmop_update
= {}
2577 timeout_ns_deploy
= self
.timeout
.ns_deploy
2579 nslcmop_operation_state
= None
2580 db_vnfrs
= {} # vnf's info indexed by member-index
2582 tasks_dict_info
= {} # from task to info text
2586 "Stage 1/5: preparation of the environment.",
2587 "Waiting for previous operations to terminate.",
2590 # ^ stage, step, VIM progress
2592 # wait for any previous tasks in process
2593 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2595 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2596 stage
[1] = "Reading from database."
2597 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2598 db_nsr_update
["detailed-status"] = "creating"
2599 db_nsr_update
["operational-status"] = "init"
2600 self
._write
_ns
_status
(
2602 ns_state
="BUILDING",
2603 current_operation
="INSTANTIATING",
2604 current_operation_id
=nslcmop_id
,
2605 other_update
=db_nsr_update
,
2607 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2609 # read from db: operation
2610 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2611 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2612 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2613 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2614 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2616 ns_params
= db_nslcmop
.get("operationParams")
2617 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2618 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2621 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2622 self
.logger
.debug(logging_text
+ stage
[1])
2623 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2624 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2625 self
.logger
.debug(logging_text
+ stage
[1])
2626 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2627 self
.fs
.sync(db_nsr
["nsd-id"])
2629 # nsr_name = db_nsr["name"] # TODO short-name??
2631 # read from db: vnf's of this ns
2632 stage
[1] = "Getting vnfrs from db."
2633 self
.logger
.debug(logging_text
+ stage
[1])
2634 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2636 # read from db: vnfd's for every vnf
2637 db_vnfds
= [] # every vnfd data
2639 # for each vnf in ns, read vnfd
2640 for vnfr
in db_vnfrs_list
:
2641 if vnfr
.get("kdur"):
2643 for kdur
in vnfr
["kdur"]:
2644 if kdur
.get("additionalParams"):
2645 kdur
["additionalParams"] = json
.loads(
2646 kdur
["additionalParams"]
2648 kdur_list
.append(kdur
)
2649 vnfr
["kdur"] = kdur_list
2651 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2652 vnfd_id
= vnfr
["vnfd-id"]
2653 vnfd_ref
= vnfr
["vnfd-ref"]
2654 self
.fs
.sync(vnfd_id
)
2656 # if we haven't this vnfd, read it from db
2657 if vnfd_id
not in db_vnfds
:
2659 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2662 self
.logger
.debug(logging_text
+ stage
[1])
2663 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2666 db_vnfds
.append(vnfd
)
2668 # Get or generates the _admin.deployed.VCA list
2669 vca_deployed_list
= None
2670 if db_nsr
["_admin"].get("deployed"):
2671 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2672 if vca_deployed_list
is None:
2673 vca_deployed_list
= []
2674 configuration_status_list
= []
2675 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2676 db_nsr_update
["configurationStatus"] = configuration_status_list
2677 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2678 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2679 elif isinstance(vca_deployed_list
, dict):
2680 # maintain backward compatibility. Change a dict to list at database
2681 vca_deployed_list
= list(vca_deployed_list
.values())
2682 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2683 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2686 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2688 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2689 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2691 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2692 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2693 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2695 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2698 # n2vc_redesign STEP 2 Deploy Network Scenario
2699 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2700 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2702 stage
[1] = "Deploying KDUs."
2703 # self.logger.debug(logging_text + "Before deploy_kdus")
2704 # Call to deploy_kdus in case exists the "vdu:kdu" param
2705 await self
.deploy_kdus(
2706 logging_text
=logging_text
,
2708 nslcmop_id
=nslcmop_id
,
2711 task_instantiation_info
=tasks_dict_info
,
2714 stage
[1] = "Getting VCA public key."
2715 # n2vc_redesign STEP 1 Get VCA public ssh-key
2716 # feature 1429. Add n2vc public key to needed VMs
2717 n2vc_key
= self
.n2vc
.get_public_key()
2718 n2vc_key_list
= [n2vc_key
]
2719 if self
.vca_config
.public_key
:
2720 n2vc_key_list
.append(self
.vca_config
.public_key
)
2722 stage
[1] = "Deploying NS at VIM."
2723 task_ro
= asyncio
.ensure_future(
2724 self
.instantiate_RO(
2725 logging_text
=logging_text
,
2729 db_nslcmop
=db_nslcmop
,
2732 n2vc_key_list
=n2vc_key_list
,
2736 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2737 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2739 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2740 stage
[1] = "Deploying Execution Environments."
2741 self
.logger
.debug(logging_text
+ stage
[1])
2743 # create namespace and certificate if any helm based EE is present in the NS
2744 if check_helm_ee_in_ns(db_vnfds
):
2745 # TODO: create EE namespace
2746 # create TLS certificates
2747 await self
.vca_map
["helm-v3"].create_tls_certificate(
2748 secret_name
="ee-tls-{}".format(nsr_id
),
2751 usage
="server auth",
2754 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2755 for vnf_profile
in get_vnf_profiles(nsd
):
2756 vnfd_id
= vnf_profile
["vnfd-id"]
2757 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2758 member_vnf_index
= str(vnf_profile
["id"])
2759 db_vnfr
= db_vnfrs
[member_vnf_index
]
2760 base_folder
= vnfd
["_admin"]["storage"]
2767 # Get additional parameters
2768 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2769 if db_vnfr
.get("additionalParamsForVnf"):
2770 deploy_params
.update(
2771 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2774 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2775 if descriptor_config
:
2777 logging_text
=logging_text
2778 + "member_vnf_index={} ".format(member_vnf_index
),
2781 nslcmop_id
=nslcmop_id
,
2787 member_vnf_index
=member_vnf_index
,
2788 vdu_index
=vdu_index
,
2789 kdu_index
=kdu_index
,
2791 deploy_params
=deploy_params
,
2792 descriptor_config
=descriptor_config
,
2793 base_folder
=base_folder
,
2794 task_instantiation_info
=tasks_dict_info
,
2798 # Deploy charms for each VDU that supports one.
2799 for vdud
in get_vdu_list(vnfd
):
2801 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2802 vdur
= find_in_list(
2803 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2806 if vdur
.get("additionalParams"):
2807 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2809 deploy_params_vdu
= deploy_params
2810 deploy_params_vdu
["OSM"] = get_osm_params(
2811 db_vnfr
, vdu_id
, vdu_count_index
=0
2813 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2815 self
.logger
.debug("VDUD > {}".format(vdud
))
2817 "Descriptor config > {}".format(descriptor_config
)
2819 if descriptor_config
:
2823 for vdu_index
in range(vdud_count
):
2824 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2826 logging_text
=logging_text
2827 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2828 member_vnf_index
, vdu_id
, vdu_index
2832 nslcmop_id
=nslcmop_id
,
2838 kdu_index
=kdu_index
,
2839 member_vnf_index
=member_vnf_index
,
2840 vdu_index
=vdu_index
,
2842 deploy_params
=deploy_params_vdu
,
2843 descriptor_config
=descriptor_config
,
2844 base_folder
=base_folder
,
2845 task_instantiation_info
=tasks_dict_info
,
2848 for kdud
in get_kdu_list(vnfd
):
2849 kdu_name
= kdud
["name"]
2850 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2851 if descriptor_config
:
2855 kdu_index
, kdur
= next(
2857 for x
in enumerate(db_vnfr
["kdur"])
2858 if x
[1]["kdu-name"] == kdu_name
2860 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2861 if kdur
.get("additionalParams"):
2862 deploy_params_kdu
.update(
2863 parse_yaml_strings(kdur
["additionalParams"].copy())
2867 logging_text
=logging_text
,
2870 nslcmop_id
=nslcmop_id
,
2876 member_vnf_index
=member_vnf_index
,
2877 vdu_index
=vdu_index
,
2878 kdu_index
=kdu_index
,
2880 deploy_params
=deploy_params_kdu
,
2881 descriptor_config
=descriptor_config
,
2882 base_folder
=base_folder
,
2883 task_instantiation_info
=tasks_dict_info
,
2887 # Check if this NS has a charm configuration
2888 descriptor_config
= nsd
.get("ns-configuration")
2889 if descriptor_config
and descriptor_config
.get("juju"):
2892 member_vnf_index
= None
2899 # Get additional parameters
2900 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2901 if db_nsr
.get("additionalParamsForNs"):
2902 deploy_params
.update(
2903 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2905 base_folder
= nsd
["_admin"]["storage"]
2907 logging_text
=logging_text
,
2910 nslcmop_id
=nslcmop_id
,
2916 member_vnf_index
=member_vnf_index
,
2917 vdu_index
=vdu_index
,
2918 kdu_index
=kdu_index
,
2920 deploy_params
=deploy_params
,
2921 descriptor_config
=descriptor_config
,
2922 base_folder
=base_folder
,
2923 task_instantiation_info
=tasks_dict_info
,
2927 # rest of staff will be done at finally
2930 ROclient
.ROClientException
,
2936 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2939 except asyncio
.CancelledError
:
2941 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2943 exc
= "Operation was cancelled"
2944 except Exception as e
:
2945 exc
= traceback
.format_exc()
2946 self
.logger
.critical(
2947 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2952 error_list
.append(str(exc
))
2954 # wait for pending tasks
2956 stage
[1] = "Waiting for instantiate pending tasks."
2957 self
.logger
.debug(logging_text
+ stage
[1])
2958 error_list
+= await self
._wait
_for
_tasks
(
2966 stage
[1] = stage
[2] = ""
2967 except asyncio
.CancelledError
:
2968 error_list
.append("Cancelled")
2969 # TODO cancel all tasks
2970 except Exception as exc
:
2971 error_list
.append(str(exc
))
2973 # update operation-status
2974 db_nsr_update
["operational-status"] = "running"
2975 # let's begin with VCA 'configured' status (later we can change it)
2976 db_nsr_update
["config-status"] = "configured"
2977 for task
, task_name
in tasks_dict_info
.items():
2978 if not task
.done() or task
.cancelled() or task
.exception():
2979 if task_name
.startswith(self
.task_name_deploy_vca
):
2980 # A N2VC task is pending
2981 db_nsr_update
["config-status"] = "failed"
2983 # RO or KDU task is pending
2984 db_nsr_update
["operational-status"] = "failed"
2986 # update status at database
2988 error_detail
= ". ".join(error_list
)
2989 self
.logger
.error(logging_text
+ error_detail
)
2990 error_description_nslcmop
= "{} Detail: {}".format(
2991 stage
[0], error_detail
2993 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2994 nslcmop_id
, stage
[0]
2997 db_nsr_update
["detailed-status"] = (
2998 error_description_nsr
+ " Detail: " + error_detail
3000 db_nslcmop_update
["detailed-status"] = error_detail
3001 nslcmop_operation_state
= "FAILED"
3005 error_description_nsr
= error_description_nslcmop
= None
3007 db_nsr_update
["detailed-status"] = "Done"
3008 db_nslcmop_update
["detailed-status"] = "Done"
3009 nslcmop_operation_state
= "COMPLETED"
3010 # Gather auto-healing and auto-scaling alerts for each vnfr
3013 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3015 (sub
for sub
in db_vnfds
if sub
["_id"] == vnfr
["vnfd-id"]), None
3017 healing_alerts
= self
._gather
_vnfr
_healing
_alerts
(vnfr
, vnfd
)
3018 for alert
in healing_alerts
:
3019 self
.logger
.info(f
"Storing healing alert in MongoDB: {alert}")
3020 self
.db
.create("alerts", alert
)
3022 scaling_alerts
= self
._gather
_vnfr
_scaling
_alerts
(vnfr
, vnfd
)
3023 for alert
in scaling_alerts
:
3024 self
.logger
.info(f
"Storing scaling alert in MongoDB: {alert}")
3025 self
.db
.create("alerts", alert
)
3028 self
._write
_ns
_status
(
3031 current_operation
="IDLE",
3032 current_operation_id
=None,
3033 error_description
=error_description_nsr
,
3034 error_detail
=error_detail
,
3035 other_update
=db_nsr_update
,
3037 self
._write
_op
_status
(
3040 error_message
=error_description_nslcmop
,
3041 operation_state
=nslcmop_operation_state
,
3042 other_update
=db_nslcmop_update
,
3045 if nslcmop_operation_state
:
3047 await self
.msg
.aiowrite(
3052 "nslcmop_id": nslcmop_id
,
3053 "operationState": nslcmop_operation_state
,
3057 except Exception as e
:
3059 logging_text
+ "kafka_write notification Exception {}".format(e
)
3062 self
.logger
.debug(logging_text
+ "Exit")
3063 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
3065 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
3066 if vnfd_id
not in cached_vnfds
:
3067 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
3068 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
3070 return cached_vnfds
[vnfd_id
]
3072 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
3073 if vnf_profile_id
not in cached_vnfrs
:
3074 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
3077 "member-vnf-index-ref": vnf_profile_id
,
3078 "nsr-id-ref": nsr_id
,
3081 return cached_vnfrs
[vnf_profile_id
]
3083 def _is_deployed_vca_in_relation(
3084 self
, vca
: DeployedVCA
, relation
: Relation
3087 for endpoint
in (relation
.provider
, relation
.requirer
):
3088 if endpoint
["kdu-resource-profile-id"]:
3091 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
3092 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
3093 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
3099 def _update_ee_relation_data_with_implicit_data(
3100 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
3102 ee_relation_data
= safe_get_ee_relation(
3103 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
3105 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
3106 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
3107 "execution-environment-ref"
3109 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
3110 vnfd_id
= vnf_profile
["vnfd-id"]
3111 project
= nsd
["_admin"]["projects_read"][0]
3112 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3115 if ee_relation_level
== EELevel
.VNF
3116 else ee_relation_data
["vdu-profile-id"]
3118 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
3121 f
"not execution environments found for ee_relation {ee_relation_data}"
3123 ee_relation_data
["execution-environment-ref"] = ee
["id"]
3124 return ee_relation_data
3126 def _get_ns_relations(
3129 nsd
: Dict
[str, Any
],
3131 cached_vnfds
: Dict
[str, Any
],
3132 ) -> List
[Relation
]:
3134 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
3135 for r
in db_ns_relations
:
3136 provider_dict
= None
3137 requirer_dict
= None
3138 if all(key
in r
for key
in ("provider", "requirer")):
3139 provider_dict
= r
["provider"]
3140 requirer_dict
= r
["requirer"]
3141 elif "entities" in r
:
3142 provider_id
= r
["entities"][0]["id"]
3145 "endpoint": r
["entities"][0]["endpoint"],
3147 if provider_id
!= nsd
["id"]:
3148 provider_dict
["vnf-profile-id"] = provider_id
3149 requirer_id
= r
["entities"][1]["id"]
3152 "endpoint": r
["entities"][1]["endpoint"],
3154 if requirer_id
!= nsd
["id"]:
3155 requirer_dict
["vnf-profile-id"] = requirer_id
3158 "provider/requirer or entities must be included in the relation."
3160 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3161 nsr_id
, nsd
, provider_dict
, cached_vnfds
3163 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3164 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3166 provider
= EERelation(relation_provider
)
3167 requirer
= EERelation(relation_requirer
)
3168 relation
= Relation(r
["name"], provider
, requirer
)
3169 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3171 relations
.append(relation
)
3174 def _get_vnf_relations(
3177 nsd
: Dict
[str, Any
],
3179 cached_vnfds
: Dict
[str, Any
],
3180 ) -> List
[Relation
]:
3182 if vca
.target_element
== "ns":
3183 self
.logger
.debug("VCA is a NS charm, not a VNF.")
3185 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3186 vnf_profile_id
= vnf_profile
["id"]
3187 vnfd_id
= vnf_profile
["vnfd-id"]
3188 project
= nsd
["_admin"]["projects_read"][0]
3189 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3190 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3191 for r
in db_vnf_relations
:
3192 provider_dict
= None
3193 requirer_dict
= None
3194 if all(key
in r
for key
in ("provider", "requirer")):
3195 provider_dict
= r
["provider"]
3196 requirer_dict
= r
["requirer"]
3197 elif "entities" in r
:
3198 provider_id
= r
["entities"][0]["id"]
3201 "vnf-profile-id": vnf_profile_id
,
3202 "endpoint": r
["entities"][0]["endpoint"],
3204 if provider_id
!= vnfd_id
:
3205 provider_dict
["vdu-profile-id"] = provider_id
3206 requirer_id
= r
["entities"][1]["id"]
3209 "vnf-profile-id": vnf_profile_id
,
3210 "endpoint": r
["entities"][1]["endpoint"],
3212 if requirer_id
!= vnfd_id
:
3213 requirer_dict
["vdu-profile-id"] = requirer_id
3216 "provider/requirer or entities must be included in the relation."
3218 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3219 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3221 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3222 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3224 provider
= EERelation(relation_provider
)
3225 requirer
= EERelation(relation_requirer
)
3226 relation
= Relation(r
["name"], provider
, requirer
)
3227 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3229 relations
.append(relation
)
3232 def _get_kdu_resource_data(
3234 ee_relation
: EERelation
,
3235 db_nsr
: Dict
[str, Any
],
3236 cached_vnfds
: Dict
[str, Any
],
3237 ) -> DeployedK8sResource
:
3238 nsd
= get_nsd(db_nsr
)
3239 vnf_profiles
= get_vnf_profiles(nsd
)
3240 vnfd_id
= find_in_list(
3242 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3244 project
= nsd
["_admin"]["projects_read"][0]
3245 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3246 kdu_resource_profile
= get_kdu_resource_profile(
3247 db_vnfd
, ee_relation
.kdu_resource_profile_id
3249 kdu_name
= kdu_resource_profile
["kdu-name"]
3250 deployed_kdu
, _
= get_deployed_kdu(
3251 db_nsr
.get("_admin", ()).get("deployed", ()),
3253 ee_relation
.vnf_profile_id
,
3255 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3258 def _get_deployed_component(
3260 ee_relation
: EERelation
,
3261 db_nsr
: Dict
[str, Any
],
3262 cached_vnfds
: Dict
[str, Any
],
3263 ) -> DeployedComponent
:
3264 nsr_id
= db_nsr
["_id"]
3265 deployed_component
= None
3266 ee_level
= EELevel
.get_level(ee_relation
)
3267 if ee_level
== EELevel
.NS
:
3268 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3270 deployed_component
= DeployedVCA(nsr_id
, vca
)
3271 elif ee_level
== EELevel
.VNF
:
3272 vca
= get_deployed_vca(
3276 "member-vnf-index": ee_relation
.vnf_profile_id
,
3277 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3281 deployed_component
= DeployedVCA(nsr_id
, vca
)
3282 elif ee_level
== EELevel
.VDU
:
3283 vca
= get_deployed_vca(
3286 "vdu_id": ee_relation
.vdu_profile_id
,
3287 "member-vnf-index": ee_relation
.vnf_profile_id
,
3288 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3292 deployed_component
= DeployedVCA(nsr_id
, vca
)
3293 elif ee_level
== EELevel
.KDU
:
3294 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3295 ee_relation
, db_nsr
, cached_vnfds
3297 if kdu_resource_data
:
3298 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3299 return deployed_component
3301 async def _add_relation(
3305 db_nsr
: Dict
[str, Any
],
3306 cached_vnfds
: Dict
[str, Any
],
3307 cached_vnfrs
: Dict
[str, Any
],
3309 deployed_provider
= self
._get
_deployed
_component
(
3310 relation
.provider
, db_nsr
, cached_vnfds
3312 deployed_requirer
= self
._get
_deployed
_component
(
3313 relation
.requirer
, db_nsr
, cached_vnfds
3317 and deployed_requirer
3318 and deployed_provider
.config_sw_installed
3319 and deployed_requirer
.config_sw_installed
3321 provider_db_vnfr
= (
3323 relation
.provider
.nsr_id
,
3324 relation
.provider
.vnf_profile_id
,
3327 if relation
.provider
.vnf_profile_id
3330 requirer_db_vnfr
= (
3332 relation
.requirer
.nsr_id
,
3333 relation
.requirer
.vnf_profile_id
,
3336 if relation
.requirer
.vnf_profile_id
3339 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3340 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3341 provider_relation_endpoint
= RelationEndpoint(
3342 deployed_provider
.ee_id
,
3344 relation
.provider
.endpoint
,
3346 requirer_relation_endpoint
= RelationEndpoint(
3347 deployed_requirer
.ee_id
,
3349 relation
.requirer
.endpoint
,
3352 await self
.vca_map
[vca_type
].add_relation(
3353 provider
=provider_relation_endpoint
,
3354 requirer
=requirer_relation_endpoint
,
3356 except N2VCException
as exception
:
3357 self
.logger
.error(exception
)
3358 raise LcmException(exception
)
3362 async def _add_vca_relations(
3368 timeout
: int = 3600,
3371 # 1. find all relations for this VCA
3372 # 2. wait for other peers related
3376 # STEP 1: find all relations for this VCA
3379 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3380 nsd
= get_nsd(db_nsr
)
3383 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3384 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3389 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3390 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3392 # if no relations, terminate
3394 self
.logger
.debug(logging_text
+ " No relations")
3397 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3404 if now
- start
>= timeout
:
3405 self
.logger
.error(logging_text
+ " : timeout adding relations")
3408 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3409 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3411 # for each relation, find the VCA's related
3412 for relation
in relations
.copy():
3413 added
= await self
._add
_relation
(
3421 relations
.remove(relation
)
3424 self
.logger
.debug("Relations added")
3426 await asyncio
.sleep(5.0)
3430 except Exception as e
:
3431 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3434 async def _install_kdu(
3442 k8s_instance_info
: dict,
3443 k8params
: dict = None,
3448 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3451 "collection": "nsrs",
3452 "filter": {"_id": nsr_id
},
3453 "path": nsr_db_path
,
3456 if k8s_instance_info
.get("kdu-deployment-name"):
3457 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3459 kdu_instance
= self
.k8scluster_map
[
3461 ].generate_kdu_instance_name(
3462 db_dict
=db_dict_install
,
3463 kdu_model
=k8s_instance_info
["kdu-model"],
3464 kdu_name
=k8s_instance_info
["kdu-name"],
3467 # Update the nsrs table with the kdu-instance value
3471 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3474 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3475 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3476 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3477 # namespace, this first verification could be removed, and the next step would be done for any kind
3479 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3480 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3481 if k8sclustertype
in ("juju", "juju-bundle"):
3482 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3483 # that the user passed a namespace which he wants its KDU to be deployed in)
3489 "_admin.projects_write": k8s_instance_info
["namespace"],
3490 "_admin.projects_read": k8s_instance_info
["namespace"],
3496 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3501 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3503 k8s_instance_info
["namespace"] = kdu_instance
3505 await self
.k8scluster_map
[k8sclustertype
].install(
3506 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3507 kdu_model
=k8s_instance_info
["kdu-model"],
3510 db_dict
=db_dict_install
,
3512 kdu_name
=k8s_instance_info
["kdu-name"],
3513 namespace
=k8s_instance_info
["namespace"],
3514 kdu_instance
=kdu_instance
,
3518 # Obtain services to obtain management service ip
3519 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3520 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3521 kdu_instance
=kdu_instance
,
3522 namespace
=k8s_instance_info
["namespace"],
3525 # Obtain management service info (if exists)
3526 vnfr_update_dict
= {}
3527 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3529 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3534 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3537 for service
in kdud
.get("service", [])
3538 if service
.get("mgmt-service")
3540 for mgmt_service
in mgmt_services
:
3541 for service
in services
:
3542 if service
["name"].startswith(mgmt_service
["name"]):
3543 # Mgmt service found, Obtain service ip
3544 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3545 if isinstance(ip
, list) and len(ip
) == 1:
3549 "kdur.{}.ip-address".format(kdu_index
)
3552 # Check if must update also mgmt ip at the vnf
3553 service_external_cp
= mgmt_service
.get(
3554 "external-connection-point-ref"
3556 if service_external_cp
:
3558 deep_get(vnfd
, ("mgmt-interface", "cp"))
3559 == service_external_cp
3561 vnfr_update_dict
["ip-address"] = ip
3566 "external-connection-point-ref", ""
3568 == service_external_cp
,
3571 "kdur.{}.ip-address".format(kdu_index
)
3576 "Mgmt service name: {} not found".format(
3577 mgmt_service
["name"]
3581 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3582 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3584 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3587 and kdu_config
.get("initial-config-primitive")
3588 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3590 initial_config_primitive_list
= kdu_config
.get(
3591 "initial-config-primitive"
3593 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3595 for initial_config_primitive
in initial_config_primitive_list
:
3596 primitive_params_
= self
._map
_primitive
_params
(
3597 initial_config_primitive
, {}, {}
3600 await asyncio
.wait_for(
3601 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3602 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3603 kdu_instance
=kdu_instance
,
3604 primitive_name
=initial_config_primitive
["name"],
3605 params
=primitive_params_
,
3606 db_dict
=db_dict_install
,
3612 except Exception as e
:
3613 # Prepare update db with error and raise exception
3616 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3620 vnfr_data
.get("_id"),
3621 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3624 # ignore to keep original exception
3626 # reraise original error
3631 async def deploy_kdus(
3638 task_instantiation_info
,
3640 # Launch kdus if present in the descriptor
3642 k8scluster_id_2_uuic
= {
3643 "helm-chart-v3": {},
3648 async def _get_cluster_id(cluster_id
, cluster_type
):
3649 nonlocal k8scluster_id_2_uuic
3650 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3651 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3653 # check if K8scluster is creating and wait look if previous tasks in process
3654 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3655 "k8scluster", cluster_id
3658 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3659 task_name
, cluster_id
3661 self
.logger
.debug(logging_text
+ text
)
3662 await asyncio
.wait(task_dependency
, timeout
=3600)
3664 db_k8scluster
= self
.db
.get_one(
3665 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3667 if not db_k8scluster
:
3668 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3670 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3672 if cluster_type
== "helm-chart-v3":
3674 # backward compatibility for existing clusters that have not been initialized for helm v3
3675 k8s_credentials
= yaml
.safe_dump(
3676 db_k8scluster
.get("credentials")
3678 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3679 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3681 db_k8scluster_update
= {}
3682 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3683 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3684 db_k8scluster_update
[
3685 "_admin.helm-chart-v3.created"
3687 db_k8scluster_update
[
3688 "_admin.helm-chart-v3.operationalState"
3691 "k8sclusters", cluster_id
, db_k8scluster_update
3693 except Exception as e
:
3696 + "error initializing helm-v3 cluster: {}".format(str(e
))
3699 "K8s cluster '{}' has not been initialized for '{}'".format(
3700 cluster_id
, cluster_type
3705 "K8s cluster '{}' has not been initialized for '{}'".format(
3706 cluster_id
, cluster_type
3709 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3712 logging_text
+= "Deploy kdus: "
3715 db_nsr_update
= {"_admin.deployed.K8s": []}
3716 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3719 updated_cluster_list
= []
3720 updated_v3_cluster_list
= []
3722 for vnfr_data
in db_vnfrs
.values():
3723 vca_id
= self
.get_vca_id(vnfr_data
, {})
3724 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3725 # Step 0: Prepare and set parameters
3726 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3727 vnfd_id
= vnfr_data
.get("vnfd-id")
3728 vnfd_with_id
= find_in_list(
3729 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3733 for kdud
in vnfd_with_id
["kdu"]
3734 if kdud
["name"] == kdur
["kdu-name"]
3736 namespace
= kdur
.get("k8s-namespace")
3737 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3738 if kdur
.get("helm-chart"):
3739 kdumodel
= kdur
["helm-chart"]
3740 # Default version: helm3, if helm-version is v2 assign v2
3741 k8sclustertype
= "helm-chart-v3"
3742 self
.logger
.debug("kdur: {}".format(kdur
))
3744 kdur
.get("helm-version")
3745 and kdur
.get("helm-version") == "v2"
3747 k8sclustertype
= "helm-chart"
3748 elif kdur
.get("juju-bundle"):
3749 kdumodel
= kdur
["juju-bundle"]
3750 k8sclustertype
= "juju-bundle"
3753 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3754 "juju-bundle. Maybe an old NBI version is running".format(
3755 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3758 # check if kdumodel is a file and exists
3760 vnfd_with_id
= find_in_list(
3761 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3763 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3764 if storage
: # may be not present if vnfd has not artifacts
3765 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3766 if storage
["pkg-dir"]:
3767 filename
= "{}/{}/{}s/{}".format(
3774 filename
= "{}/Scripts/{}s/{}".format(
3779 if self
.fs
.file_exists(
3780 filename
, mode
="file"
3781 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3782 kdumodel
= self
.fs
.path
+ filename
3783 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3785 except Exception: # it is not a file
3788 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3789 step
= "Synchronize repos for k8s cluster '{}'".format(
3792 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3796 k8sclustertype
== "helm-chart"
3797 and cluster_uuid
not in updated_cluster_list
3799 k8sclustertype
== "helm-chart-v3"
3800 and cluster_uuid
not in updated_v3_cluster_list
3802 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3803 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3804 cluster_uuid
=cluster_uuid
3807 if del_repo_list
or added_repo_dict
:
3808 if k8sclustertype
== "helm-chart":
3810 "_admin.helm_charts_added." + item
: None
3811 for item
in del_repo_list
3814 "_admin.helm_charts_added." + item
: name
3815 for item
, name
in added_repo_dict
.items()
3817 updated_cluster_list
.append(cluster_uuid
)
3818 elif k8sclustertype
== "helm-chart-v3":
3820 "_admin.helm_charts_v3_added." + item
: None
3821 for item
in del_repo_list
3824 "_admin.helm_charts_v3_added." + item
: name
3825 for item
, name
in added_repo_dict
.items()
3827 updated_v3_cluster_list
.append(cluster_uuid
)
3829 logging_text
+ "repos synchronized on k8s cluster "
3830 "'{}' to_delete: {}, to_add: {}".format(
3831 k8s_cluster_id
, del_repo_list
, added_repo_dict
3836 {"_id": k8s_cluster_id
},
3842 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3843 vnfr_data
["member-vnf-index-ref"],
3847 k8s_instance_info
= {
3848 "kdu-instance": None,
3849 "k8scluster-uuid": cluster_uuid
,
3850 "k8scluster-type": k8sclustertype
,
3851 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3852 "kdu-name": kdur
["kdu-name"],
3853 "kdu-model": kdumodel
,
3854 "namespace": namespace
,
3855 "kdu-deployment-name": kdu_deployment_name
,
3857 db_path
= "_admin.deployed.K8s.{}".format(index
)
3858 db_nsr_update
[db_path
] = k8s_instance_info
3859 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3860 vnfd_with_id
= find_in_list(
3861 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3863 task
= asyncio
.ensure_future(
3872 k8params
=desc_params
,
3877 self
.lcm_tasks
.register(
3881 "instantiate_KDU-{}".format(index
),
3884 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3890 except (LcmException
, asyncio
.CancelledError
):
3892 except Exception as e
:
3893 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3894 if isinstance(e
, (N2VCException
, DbException
)):
3895 self
.logger
.error(logging_text
+ msg
)
3897 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3898 raise LcmException(msg
)
3901 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3921 task_instantiation_info
,
3924 # launch instantiate_N2VC in a asyncio task and register task object
3925 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3926 # if not found, create one entry and update database
3927 # fill db_nsr._admin.deployed.VCA.<index>
3930 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3934 get_charm_name
= False
3935 if "execution-environment-list" in descriptor_config
:
3936 ee_list
= descriptor_config
.get("execution-environment-list", [])
3937 elif "juju" in descriptor_config
:
3938 ee_list
= [descriptor_config
] # ns charms
3939 if "execution-environment-list" not in descriptor_config
:
3940 # charm name is only required for ns charms
3941 get_charm_name
= True
3942 else: # other types as script are not supported
3945 for ee_item
in ee_list
:
3948 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3949 ee_item
.get("juju"), ee_item
.get("helm-chart")
3952 ee_descriptor_id
= ee_item
.get("id")
3953 if ee_item
.get("juju"):
3954 vca_name
= ee_item
["juju"].get("charm")
3956 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3959 if ee_item
["juju"].get("charm") is not None
3962 if ee_item
["juju"].get("cloud") == "k8s":
3963 vca_type
= "k8s_proxy_charm"
3964 elif ee_item
["juju"].get("proxy") is False:
3965 vca_type
= "native_charm"
3966 elif ee_item
.get("helm-chart"):
3967 vca_name
= ee_item
["helm-chart"]
3968 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3971 vca_type
= "helm-v3"
3974 logging_text
+ "skipping non juju neither charm configuration"
3979 for vca_index
, vca_deployed
in enumerate(
3980 db_nsr
["_admin"]["deployed"]["VCA"]
3982 if not vca_deployed
:
3985 vca_deployed
.get("member-vnf-index") == member_vnf_index
3986 and vca_deployed
.get("vdu_id") == vdu_id
3987 and vca_deployed
.get("kdu_name") == kdu_name
3988 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3989 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3993 # not found, create one.
3995 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3998 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
4000 target
+= "/kdu/{}".format(kdu_name
)
4002 "target_element": target
,
4003 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
4004 "member-vnf-index": member_vnf_index
,
4006 "kdu_name": kdu_name
,
4007 "vdu_count_index": vdu_index
,
4008 "operational-status": "init", # TODO revise
4009 "detailed-status": "", # TODO revise
4010 "step": "initial-deploy", # TODO revise
4012 "vdu_name": vdu_name
,
4014 "ee_descriptor_id": ee_descriptor_id
,
4015 "charm_name": charm_name
,
4019 # create VCA and configurationStatus in db
4021 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
4022 "configurationStatus.{}".format(vca_index
): dict(),
4024 self
.update_db_2("nsrs", nsr_id
, db_dict
)
4026 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
4028 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
4029 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
4030 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
4033 task_n2vc
= asyncio
.ensure_future(
4034 self
.instantiate_N2VC(
4035 logging_text
=logging_text
,
4036 vca_index
=vca_index
,
4042 vdu_index
=vdu_index
,
4043 kdu_index
=kdu_index
,
4044 deploy_params
=deploy_params
,
4045 config_descriptor
=descriptor_config
,
4046 base_folder
=base_folder
,
4047 nslcmop_id
=nslcmop_id
,
4051 ee_config_descriptor
=ee_item
,
4054 self
.lcm_tasks
.register(
4058 "instantiate_N2VC-{}".format(vca_index
),
4061 task_instantiation_info
[
4063 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
4064 member_vnf_index
or "", vdu_id
or ""
4068 def _create_nslcmop(nsr_id
, operation
, params
):
4070 Creates a ns-lcm-opp content to be stored at database.
4071 :param nsr_id: internal id of the instance
4072 :param operation: instantiate, terminate, scale, action, ...
4073 :param params: user parameters for the operation
4074 :return: dictionary following SOL005 format
4076 # Raise exception if invalid arguments
4077 if not (nsr_id
and operation
and params
):
4079 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
4086 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
4087 "operationState": "PROCESSING",
4088 "statusEnteredTime": now
,
4089 "nsInstanceId": nsr_id
,
4090 "lcmOperationType": operation
,
4092 "isAutomaticInvocation": False,
4093 "operationParams": params
,
4094 "isCancelPending": False,
4096 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
4097 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
4102 def _format_additional_params(self
, params
):
4103 params
= params
or {}
4104 for key
, value
in params
.items():
4105 if str(value
).startswith("!!yaml "):
4106 params
[key
] = yaml
.safe_load(value
[7:])
4109 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
4110 primitive
= seq
.get("name")
4111 primitive_params
= {}
4113 "member_vnf_index": vnf_index
,
4114 "primitive": primitive
,
4115 "primitive_params": primitive_params
,
4118 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
4122 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
4123 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
4124 if op
.get("operationState") == "COMPLETED":
4125 # b. Skip sub-operation
4126 # _ns_execute_primitive() or RO.create_action() will NOT be executed
4127 return self
.SUBOPERATION_STATUS_SKIP
4129 # c. retry executing sub-operation
4130 # The sub-operation exists, and operationState != 'COMPLETED'
4131 # Update operationState = 'PROCESSING' to indicate a retry.
4132 operationState
= "PROCESSING"
4133 detailed_status
= "In progress"
4134 self
._update
_suboperation
_status
(
4135 db_nslcmop
, op_index
, operationState
, detailed_status
4137 # Return the sub-operation index
4138 # _ns_execute_primitive() or RO.create_action() will be called from scale()
4139 # with arguments extracted from the sub-operation
4142 # Find a sub-operation where all keys in a matching dictionary must match
4143 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4144 def _find_suboperation(self
, db_nslcmop
, match
):
4145 if db_nslcmop
and match
:
4146 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4147 for i
, op
in enumerate(op_list
):
4148 if all(op
.get(k
) == match
[k
] for k
in match
):
4150 return self
.SUBOPERATION_STATUS_NOT_FOUND
4152 # Update status for a sub-operation given its index
4153 def _update_suboperation_status(
4154 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4156 # Update DB for HA tasks
4157 q_filter
= {"_id": db_nslcmop
["_id"]}
4159 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4160 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4163 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4166 # Add sub-operation, return the index of the added sub-operation
4167 # Optionally, set operationState, detailed-status, and operationType
4168 # Status and type are currently set for 'scale' sub-operations:
4169 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4170 # 'detailed-status' : status message
4171 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4172 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4173 def _add_suboperation(
4181 mapped_primitive_params
,
4182 operationState
=None,
4183 detailed_status
=None,
4186 RO_scaling_info
=None,
4189 return self
.SUBOPERATION_STATUS_NOT_FOUND
4190 # Get the "_admin.operations" list, if it exists
4191 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4192 op_list
= db_nslcmop_admin
.get("operations")
4193 # Create or append to the "_admin.operations" list
4195 "member_vnf_index": vnf_index
,
4197 "vdu_count_index": vdu_count_index
,
4198 "primitive": primitive
,
4199 "primitive_params": mapped_primitive_params
,
4202 new_op
["operationState"] = operationState
4204 new_op
["detailed-status"] = detailed_status
4206 new_op
["lcmOperationType"] = operationType
4208 new_op
["RO_nsr_id"] = RO_nsr_id
4210 new_op
["RO_scaling_info"] = RO_scaling_info
4212 # No existing operations, create key 'operations' with current operation as first list element
4213 db_nslcmop_admin
.update({"operations": [new_op
]})
4214 op_list
= db_nslcmop_admin
.get("operations")
4216 # Existing operations, append operation to list
4217 op_list
.append(new_op
)
4219 db_nslcmop_update
= {"_admin.operations": op_list
}
4220 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4221 op_index
= len(op_list
) - 1
4224 # Helper methods for scale() sub-operations
4226 # pre-scale/post-scale:
4227 # Check for 3 different cases:
4228 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4229 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4230 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4231 def _check_or_add_scale_suboperation(
4235 vnf_config_primitive
,
4239 RO_scaling_info
=None,
4241 # Find this sub-operation
4242 if RO_nsr_id
and RO_scaling_info
:
4243 operationType
= "SCALE-RO"
4245 "member_vnf_index": vnf_index
,
4246 "RO_nsr_id": RO_nsr_id
,
4247 "RO_scaling_info": RO_scaling_info
,
4251 "member_vnf_index": vnf_index
,
4252 "primitive": vnf_config_primitive
,
4253 "primitive_params": primitive_params
,
4254 "lcmOperationType": operationType
,
4256 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4257 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4258 # a. New sub-operation
4259 # The sub-operation does not exist, add it.
4260 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4261 # The following parameters are set to None for all kind of scaling:
4263 vdu_count_index
= None
4265 if RO_nsr_id
and RO_scaling_info
:
4266 vnf_config_primitive
= None
4267 primitive_params
= None
4270 RO_scaling_info
= None
4271 # Initial status for sub-operation
4272 operationState
= "PROCESSING"
4273 detailed_status
= "In progress"
4274 # Add sub-operation for pre/post-scaling (zero or more operations)
4275 self
._add
_suboperation
(
4281 vnf_config_primitive
,
4289 return self
.SUBOPERATION_STATUS_NEW
4291 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4292 # or op_index (operationState != 'COMPLETED')
4293 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4295 # Function to return execution_environment id
4297 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4298 # TODO vdu_index_count
4299 for vca
in vca_deployed_list
:
4300 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4303 async def destroy_N2VC(
4311 exec_primitives
=True,
4316 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4317 :param logging_text:
4319 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4320 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4321 :param vca_index: index in the database _admin.deployed.VCA
4322 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4323 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4324 not executed properly
4325 :param scaling_in: True destroys the application, False destroys the model
4326 :return: None or exception
4331 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4332 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4336 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4338 # execute terminate_primitives
4340 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4341 config_descriptor
.get("terminate-config-primitive"),
4342 vca_deployed
.get("ee_descriptor_id"),
4344 vdu_id
= vca_deployed
.get("vdu_id")
4345 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4346 vdu_name
= vca_deployed
.get("vdu_name")
4347 vnf_index
= vca_deployed
.get("member-vnf-index")
4348 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4349 for seq
in terminate_primitives
:
4350 # For each sequence in list, get primitive and call _ns_execute_primitive()
4351 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4352 vnf_index
, seq
.get("name")
4354 self
.logger
.debug(logging_text
+ step
)
4355 # Create the primitive for each sequence, i.e. "primitive": "touch"
4356 primitive
= seq
.get("name")
4357 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4362 self
._add
_suboperation
(
4369 mapped_primitive_params
,
4371 # Sub-operations: Call _ns_execute_primitive() instead of action()
4373 result
, result_detail
= await self
._ns
_execute
_primitive
(
4374 vca_deployed
["ee_id"],
4376 mapped_primitive_params
,
4380 except LcmException
:
4381 # this happens when VCA is not deployed. In this case it is not needed to terminate
4383 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4384 if result
not in result_ok
:
4386 "terminate_primitive {} for vnf_member_index={} fails with "
4387 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4389 # set that this VCA do not need terminated
4390 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4394 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4397 # Delete Prometheus Jobs if any
4398 # This uses NSR_ID, so it will destroy any jobs under this index
4399 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4402 await self
.vca_map
[vca_type
].delete_execution_environment(
4403 vca_deployed
["ee_id"],
4404 scaling_in
=scaling_in
,
4409 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4410 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4411 namespace
= "." + db_nsr
["_id"]
4413 await self
.n2vc
.delete_namespace(
4414 namespace
=namespace
,
4415 total_timeout
=self
.timeout
.charm_delete
,
4418 except N2VCNotFound
: # already deleted. Skip
4420 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4422 async def terminate(self
, nsr_id
, nslcmop_id
):
4423 # Try to lock HA task here
4424 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4425 if not task_is_locked_by_me
:
4428 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4429 self
.logger
.debug(logging_text
+ "Enter")
4430 timeout_ns_terminate
= self
.timeout
.ns_terminate
4433 operation_params
= None
4435 error_list
= [] # annotates all failed error messages
4436 db_nslcmop_update
= {}
4437 autoremove
= False # autoremove after terminated
4438 tasks_dict_info
= {}
4441 "Stage 1/3: Preparing task.",
4442 "Waiting for previous operations to terminate.",
4445 # ^ contains [stage, step, VIM-status]
4447 # wait for any previous tasks in process
4448 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4450 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4451 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4452 operation_params
= db_nslcmop
.get("operationParams") or {}
4453 if operation_params
.get("timeout_ns_terminate"):
4454 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4455 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4456 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4458 db_nsr_update
["operational-status"] = "terminating"
4459 db_nsr_update
["config-status"] = "terminating"
4460 self
._write
_ns
_status
(
4462 ns_state
="TERMINATING",
4463 current_operation
="TERMINATING",
4464 current_operation_id
=nslcmop_id
,
4465 other_update
=db_nsr_update
,
4467 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4468 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4469 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4472 stage
[1] = "Getting vnf descriptors from db."
4473 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4475 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4477 db_vnfds_from_id
= {}
4478 db_vnfds_from_member_index
= {}
4480 for vnfr
in db_vnfrs_list
:
4481 vnfd_id
= vnfr
["vnfd-id"]
4482 if vnfd_id
not in db_vnfds_from_id
:
4483 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4484 db_vnfds_from_id
[vnfd_id
] = vnfd
4485 db_vnfds_from_member_index
[
4486 vnfr
["member-vnf-index-ref"]
4487 ] = db_vnfds_from_id
[vnfd_id
]
4489 # Destroy individual execution environments when there are terminating primitives.
4490 # Rest of EE will be deleted at once
4491 # TODO - check before calling _destroy_N2VC
4492 # if not operation_params.get("skip_terminate_primitives"):#
4493 # or not vca.get("needed_terminate"):
4494 stage
[0] = "Stage 2/3 execute terminating primitives."
4495 self
.logger
.debug(logging_text
+ stage
[0])
4496 stage
[1] = "Looking execution environment that needs terminate."
4497 self
.logger
.debug(logging_text
+ stage
[1])
4499 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4500 config_descriptor
= None
4501 vca_member_vnf_index
= vca
.get("member-vnf-index")
4502 vca_id
= self
.get_vca_id(
4503 db_vnfrs_dict
.get(vca_member_vnf_index
)
4504 if vca_member_vnf_index
4508 if not vca
or not vca
.get("ee_id"):
4510 if not vca
.get("member-vnf-index"):
4512 config_descriptor
= db_nsr
.get("ns-configuration")
4513 elif vca
.get("vdu_id"):
4514 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4515 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4516 elif vca
.get("kdu_name"):
4517 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4518 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4520 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4521 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4522 vca_type
= vca
.get("type")
4523 exec_terminate_primitives
= not operation_params
.get(
4524 "skip_terminate_primitives"
4525 ) and vca
.get("needed_terminate")
4526 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4527 # pending native charms
4529 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4531 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4532 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4533 task
= asyncio
.ensure_future(
4541 exec_terminate_primitives
,
4545 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4547 # wait for pending tasks of terminate primitives
4551 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4553 error_list
= await self
._wait
_for
_tasks
(
4556 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4560 tasks_dict_info
.clear()
4562 return # raise LcmException("; ".join(error_list))
4564 # remove All execution environments at once
4565 stage
[0] = "Stage 3/3 delete all."
4567 if nsr_deployed
.get("VCA"):
4568 stage
[1] = "Deleting all execution environments."
4569 self
.logger
.debug(logging_text
+ stage
[1])
4570 vca_id
= self
.get_vca_id({}, db_nsr
)
4571 task_delete_ee
= asyncio
.ensure_future(
4573 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4574 timeout
=self
.timeout
.charm_delete
,
4577 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4578 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4580 # Delete Namespace and Certificates if necessary
4581 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4582 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4583 certificate_name
=db_nslcmop
["nsInstanceId"],
4585 # TODO: Delete namespace
4587 # Delete from k8scluster
4588 stage
[1] = "Deleting KDUs."
4589 self
.logger
.debug(logging_text
+ stage
[1])
4590 # print(nsr_deployed)
4591 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4592 if not kdu
or not kdu
.get("kdu-instance"):
4594 kdu_instance
= kdu
.get("kdu-instance")
4595 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4596 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4597 vca_id
= self
.get_vca_id({}, db_nsr
)
4598 task_delete_kdu_instance
= asyncio
.ensure_future(
4599 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4600 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4601 kdu_instance
=kdu_instance
,
4603 namespace
=kdu
.get("namespace"),
4609 + "Unknown k8s deployment type {}".format(
4610 kdu
.get("k8scluster-type")
4615 task_delete_kdu_instance
4616 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4619 stage
[1] = "Deleting ns from VIM."
4620 if self
.ro_config
.ng
:
4621 task_delete_ro
= asyncio
.ensure_future(
4622 self
._terminate
_ng
_ro
(
4623 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4626 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4628 # rest of staff will be done at finally
4631 ROclient
.ROClientException
,
4636 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4638 except asyncio
.CancelledError
:
4640 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4642 exc
= "Operation was cancelled"
4643 except Exception as e
:
4644 exc
= traceback
.format_exc()
4645 self
.logger
.critical(
4646 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4651 error_list
.append(str(exc
))
4653 # wait for pending tasks
4655 stage
[1] = "Waiting for terminate pending tasks."
4656 self
.logger
.debug(logging_text
+ stage
[1])
4657 error_list
+= await self
._wait
_for
_tasks
(
4660 timeout_ns_terminate
,
4664 stage
[1] = stage
[2] = ""
4665 except asyncio
.CancelledError
:
4666 error_list
.append("Cancelled")
4667 # TODO cancell all tasks
4668 except Exception as exc
:
4669 error_list
.append(str(exc
))
4670 # update status at database
4672 error_detail
= "; ".join(error_list
)
4673 # self.logger.error(logging_text + error_detail)
4674 error_description_nslcmop
= "{} Detail: {}".format(
4675 stage
[0], error_detail
4677 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4678 nslcmop_id
, stage
[0]
4681 db_nsr_update
["operational-status"] = "failed"
4682 db_nsr_update
["detailed-status"] = (
4683 error_description_nsr
+ " Detail: " + error_detail
4685 db_nslcmop_update
["detailed-status"] = error_detail
4686 nslcmop_operation_state
= "FAILED"
4690 error_description_nsr
= error_description_nslcmop
= None
4691 ns_state
= "NOT_INSTANTIATED"
4692 db_nsr_update
["operational-status"] = "terminated"
4693 db_nsr_update
["detailed-status"] = "Done"
4694 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4695 db_nslcmop_update
["detailed-status"] = "Done"
4696 nslcmop_operation_state
= "COMPLETED"
4699 self
._write
_ns
_status
(
4702 current_operation
="IDLE",
4703 current_operation_id
=None,
4704 error_description
=error_description_nsr
,
4705 error_detail
=error_detail
,
4706 other_update
=db_nsr_update
,
4708 self
._write
_op
_status
(
4711 error_message
=error_description_nslcmop
,
4712 operation_state
=nslcmop_operation_state
,
4713 other_update
=db_nslcmop_update
,
4715 if ns_state
== "NOT_INSTANTIATED":
4719 {"nsr-id-ref": nsr_id
},
4720 {"_admin.nsState": "NOT_INSTANTIATED"},
4722 except DbException
as e
:
4725 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4729 if operation_params
:
4730 autoremove
= operation_params
.get("autoremove", False)
4731 if nslcmop_operation_state
:
4733 await self
.msg
.aiowrite(
4738 "nslcmop_id": nslcmop_id
,
4739 "operationState": nslcmop_operation_state
,
4740 "autoremove": autoremove
,
4744 except Exception as e
:
4746 logging_text
+ "kafka_write notification Exception {}".format(e
)
4748 self
.logger
.debug(f
"Deleting alerts: ns_id={nsr_id}")
4749 self
.db
.del_list("alerts", {"tags.ns_id": nsr_id
})
4751 self
.logger
.debug(logging_text
+ "Exit")
4752 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4754 async def _wait_for_tasks(
4755 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4758 error_detail_list
= []
4760 pending_tasks
= list(created_tasks_info
.keys())
4761 num_tasks
= len(pending_tasks
)
4763 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4764 self
._write
_op
_status
(nslcmop_id
, stage
)
4765 while pending_tasks
:
4767 _timeout
= timeout
+ time_start
- time()
4768 done
, pending_tasks
= await asyncio
.wait(
4769 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4771 num_done
+= len(done
)
4772 if not done
: # Timeout
4773 for task
in pending_tasks
:
4774 new_error
= created_tasks_info
[task
] + ": Timeout"
4775 error_detail_list
.append(new_error
)
4776 error_list
.append(new_error
)
4779 if task
.cancelled():
4782 exc
= task
.exception()
4784 if isinstance(exc
, asyncio
.TimeoutError
):
4786 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4787 error_list
.append(created_tasks_info
[task
])
4788 error_detail_list
.append(new_error
)
4795 ROclient
.ROClientException
,
4801 self
.logger
.error(logging_text
+ new_error
)
4803 exc_traceback
= "".join(
4804 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4808 + created_tasks_info
[task
]
4814 logging_text
+ created_tasks_info
[task
] + ": Done"
4816 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4818 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4819 if nsr_id
: # update also nsr
4824 "errorDescription": "Error at: " + ", ".join(error_list
),
4825 "errorDetail": ". ".join(error_detail_list
),
4828 self
._write
_op
_status
(nslcmop_id
, stage
)
4829 return error_detail_list
4832 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4834 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4835 The default-value is used. If it is between < > it look for a value at instantiation_params
4836 :param primitive_desc: portion of VNFD/NSD that describes primitive
4837 :param params: Params provided by user
4838 :param instantiation_params: Instantiation params provided by user
4839 :return: a dictionary with the calculated params
4841 calculated_params
= {}
4842 for parameter
in primitive_desc
.get("parameter", ()):
4843 param_name
= parameter
["name"]
4844 if param_name
in params
:
4845 calculated_params
[param_name
] = params
[param_name
]
4846 elif "default-value" in parameter
or "value" in parameter
:
4847 if "value" in parameter
:
4848 calculated_params
[param_name
] = parameter
["value"]
4850 calculated_params
[param_name
] = parameter
["default-value"]
4852 isinstance(calculated_params
[param_name
], str)
4853 and calculated_params
[param_name
].startswith("<")
4854 and calculated_params
[param_name
].endswith(">")
4856 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4857 calculated_params
[param_name
] = instantiation_params
[
4858 calculated_params
[param_name
][1:-1]
4862 "Parameter {} needed to execute primitive {} not provided".format(
4863 calculated_params
[param_name
], primitive_desc
["name"]
4868 "Parameter {} needed to execute primitive {} not provided".format(
4869 param_name
, primitive_desc
["name"]
4873 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4874 calculated_params
[param_name
] = yaml
.safe_dump(
4875 calculated_params
[param_name
], default_flow_style
=True, width
=256
4877 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4879 ].startswith("!!yaml "):
4880 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4881 if parameter
.get("data-type") == "INTEGER":
4883 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4884 except ValueError: # error converting string to int
4886 "Parameter {} of primitive {} must be integer".format(
4887 param_name
, primitive_desc
["name"]
4890 elif parameter
.get("data-type") == "BOOLEAN":
4891 calculated_params
[param_name
] = not (
4892 (str(calculated_params
[param_name
])).lower() == "false"
4895 # add always ns_config_info if primitive name is config
4896 if primitive_desc
["name"] == "config":
4897 if "ns_config_info" in instantiation_params
:
4898 calculated_params
["ns_config_info"] = instantiation_params
[
4901 return calculated_params
4903 def _look_for_deployed_vca(
4910 ee_descriptor_id
=None,
4912 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4913 for vca
in deployed_vca
:
4916 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4919 vdu_count_index
is not None
4920 and vdu_count_index
!= vca
["vdu_count_index"]
4923 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4925 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4929 # vca_deployed not found
4931 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4932 " is not deployed".format(
4941 ee_id
= vca
.get("ee_id")
4943 "type", "lxc_proxy_charm"
4944 ) # default value for backward compatibility - proxy charm
4947 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4948 "execution environment".format(
4949 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4952 return ee_id
, vca_type
4954 async def _ns_execute_primitive(
4960 retries_interval
=30,
4967 if primitive
== "config":
4968 primitive_params
= {"params": primitive_params
}
4970 vca_type
= vca_type
or "lxc_proxy_charm"
4974 output
= await asyncio
.wait_for(
4975 self
.vca_map
[vca_type
].exec_primitive(
4977 primitive_name
=primitive
,
4978 params_dict
=primitive_params
,
4979 progress_timeout
=self
.timeout
.progress_primitive
,
4980 total_timeout
=self
.timeout
.primitive
,
4985 timeout
=timeout
or self
.timeout
.primitive
,
4989 except asyncio
.CancelledError
:
4991 except Exception as e
:
4995 "Error executing action {} on {} -> {}".format(
5000 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5002 if isinstance(e
, asyncio
.TimeoutError
):
5004 message
="Timed out waiting for action to complete"
5006 return "FAILED", getattr(e
, "message", repr(e
))
5008 return "COMPLETED", output
5010 except (LcmException
, asyncio
.CancelledError
):
5012 except Exception as e
:
5013 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5015 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5017 Updating the vca_status with latest juju information in nsrs record
5018 :param: nsr_id: Id of the nsr
5019 :param: nslcmop_id: Id of the nslcmop
5023 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5024 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5025 vca_id
= self
.get_vca_id({}, db_nsr
)
5026 if db_nsr
["_admin"]["deployed"]["K8s"]:
5027 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5028 cluster_uuid
, kdu_instance
, cluster_type
= (
5029 k8s
["k8scluster-uuid"],
5030 k8s
["kdu-instance"],
5031 k8s
["k8scluster-type"],
5033 await self
._on
_update
_k
8s
_db
(
5034 cluster_uuid
=cluster_uuid
,
5035 kdu_instance
=kdu_instance
,
5036 filter={"_id": nsr_id
},
5038 cluster_type
=cluster_type
,
5041 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5042 table
, filter = "nsrs", {"_id": nsr_id
}
5043 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5044 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5046 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5047 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5049 async def action(self
, nsr_id
, nslcmop_id
):
5050 # Try to lock HA task here
5051 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5052 if not task_is_locked_by_me
:
5055 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5056 self
.logger
.debug(logging_text
+ "Enter")
5057 # get all needed from database
5061 db_nslcmop_update
= {}
5062 nslcmop_operation_state
= None
5063 error_description_nslcmop
= None
5067 # wait for any previous tasks in process
5068 step
= "Waiting for previous operations to terminate"
5069 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5071 self
._write
_ns
_status
(
5074 current_operation
="RUNNING ACTION",
5075 current_operation_id
=nslcmop_id
,
5078 step
= "Getting information from database"
5079 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5080 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5081 if db_nslcmop
["operationParams"].get("primitive_params"):
5082 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5083 db_nslcmop
["operationParams"]["primitive_params"]
5086 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5087 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5088 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5089 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5090 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5091 primitive
= db_nslcmop
["operationParams"]["primitive"]
5092 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5093 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5094 "timeout_ns_action", self
.timeout
.primitive
5098 step
= "Getting vnfr from database"
5099 db_vnfr
= self
.db
.get_one(
5100 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5102 if db_vnfr
.get("kdur"):
5104 for kdur
in db_vnfr
["kdur"]:
5105 if kdur
.get("additionalParams"):
5106 kdur
["additionalParams"] = json
.loads(
5107 kdur
["additionalParams"]
5109 kdur_list
.append(kdur
)
5110 db_vnfr
["kdur"] = kdur_list
5111 step
= "Getting vnfd from database"
5112 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5114 # Sync filesystem before running a primitive
5115 self
.fs
.sync(db_vnfr
["vnfd-id"])
5117 step
= "Getting nsd from database"
5118 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5120 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5121 # for backward compatibility
5122 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5123 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5124 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5125 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5127 # look for primitive
5128 config_primitive_desc
= descriptor_configuration
= None
5130 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5132 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5134 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5136 descriptor_configuration
= db_nsd
.get("ns-configuration")
5138 if descriptor_configuration
and descriptor_configuration
.get(
5141 for config_primitive
in descriptor_configuration
["config-primitive"]:
5142 if config_primitive
["name"] == primitive
:
5143 config_primitive_desc
= config_primitive
5146 if not config_primitive_desc
:
5147 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5149 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5153 primitive_name
= primitive
5154 ee_descriptor_id
= None
5156 primitive_name
= config_primitive_desc
.get(
5157 "execution-environment-primitive", primitive
5159 ee_descriptor_id
= config_primitive_desc
.get(
5160 "execution-environment-ref"
5166 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5168 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5171 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5173 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5175 desc_params
= parse_yaml_strings(
5176 db_vnfr
.get("additionalParamsForVnf")
5179 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5180 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5181 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5183 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5184 actions
.add(primitive
["name"])
5185 for primitive
in kdu_configuration
.get("config-primitive", []):
5186 actions
.add(primitive
["name"])
5188 nsr_deployed
["K8s"],
5189 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5190 and kdu
["member-vnf-index"] == vnf_index
,
5194 if primitive_name
in actions
5195 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5199 # TODO check if ns is in a proper status
5201 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5203 # kdur and desc_params already set from before
5204 if primitive_params
:
5205 desc_params
.update(primitive_params
)
5206 # TODO Check if we will need something at vnf level
5207 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5209 kdu_name
== kdu
["kdu-name"]
5210 and kdu
["member-vnf-index"] == vnf_index
5215 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5218 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5219 msg
= "unknown k8scluster-type '{}'".format(
5220 kdu
.get("k8scluster-type")
5222 raise LcmException(msg
)
5225 "collection": "nsrs",
5226 "filter": {"_id": nsr_id
},
5227 "path": "_admin.deployed.K8s.{}".format(index
),
5231 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5233 step
= "Executing kdu {}".format(primitive_name
)
5234 if primitive_name
== "upgrade":
5235 if desc_params
.get("kdu_model"):
5236 kdu_model
= desc_params
.get("kdu_model")
5237 del desc_params
["kdu_model"]
5239 kdu_model
= kdu
.get("kdu-model")
5240 if kdu_model
.count("/") < 2: # helm chart is not embedded
5241 parts
= kdu_model
.split(sep
=":")
5243 kdu_model
= parts
[0]
5244 if desc_params
.get("kdu_atomic_upgrade"):
5245 atomic_upgrade
= desc_params
.get(
5246 "kdu_atomic_upgrade"
5247 ).lower() in ("yes", "true", "1")
5248 del desc_params
["kdu_atomic_upgrade"]
5250 atomic_upgrade
= True
5252 detailed_status
= await asyncio
.wait_for(
5253 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5254 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5255 kdu_instance
=kdu
.get("kdu-instance"),
5256 atomic
=atomic_upgrade
,
5257 kdu_model
=kdu_model
,
5260 timeout
=timeout_ns_action
,
5262 timeout
=timeout_ns_action
+ 10,
5265 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5267 elif primitive_name
== "rollback":
5268 detailed_status
= await asyncio
.wait_for(
5269 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5270 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5271 kdu_instance
=kdu
.get("kdu-instance"),
5274 timeout
=timeout_ns_action
,
5276 elif primitive_name
== "status":
5277 detailed_status
= await asyncio
.wait_for(
5278 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5279 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5280 kdu_instance
=kdu
.get("kdu-instance"),
5283 timeout
=timeout_ns_action
,
5286 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5287 kdu
["kdu-name"], nsr_id
5289 params
= self
._map
_primitive
_params
(
5290 config_primitive_desc
, primitive_params
, desc_params
5293 detailed_status
= await asyncio
.wait_for(
5294 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5295 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5296 kdu_instance
=kdu_instance
,
5297 primitive_name
=primitive_name
,
5300 timeout
=timeout_ns_action
,
5303 timeout
=timeout_ns_action
,
5307 nslcmop_operation_state
= "COMPLETED"
5309 detailed_status
= ""
5310 nslcmop_operation_state
= "FAILED"
5312 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5313 nsr_deployed
["VCA"],
5314 member_vnf_index
=vnf_index
,
5316 vdu_count_index
=vdu_count_index
,
5317 ee_descriptor_id
=ee_descriptor_id
,
5319 for vca_index
, vca_deployed
in enumerate(
5320 db_nsr
["_admin"]["deployed"]["VCA"]
5322 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5324 "collection": "nsrs",
5325 "filter": {"_id": nsr_id
},
5326 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5330 nslcmop_operation_state
,
5332 ) = await self
._ns
_execute
_primitive
(
5334 primitive
=primitive_name
,
5335 primitive_params
=self
._map
_primitive
_params
(
5336 config_primitive_desc
, primitive_params
, desc_params
5338 timeout
=timeout_ns_action
,
5344 db_nslcmop_update
["detailed-status"] = detailed_status
5345 error_description_nslcmop
= (
5346 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5350 + "Done with result {} {}".format(
5351 nslcmop_operation_state
, detailed_status
5354 return # database update is called inside finally
5356 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5357 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5359 except asyncio
.CancelledError
:
5361 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5363 exc
= "Operation was cancelled"
5364 except asyncio
.TimeoutError
:
5365 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5367 except Exception as e
:
5368 exc
= traceback
.format_exc()
5369 self
.logger
.critical(
5370 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5379 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5380 nslcmop_operation_state
= "FAILED"
5382 self
._write
_ns
_status
(
5386 ], # TODO check if degraded. For the moment use previous status
5387 current_operation
="IDLE",
5388 current_operation_id
=None,
5389 # error_description=error_description_nsr,
5390 # error_detail=error_detail,
5391 other_update
=db_nsr_update
,
5394 self
._write
_op
_status
(
5397 error_message
=error_description_nslcmop
,
5398 operation_state
=nslcmop_operation_state
,
5399 other_update
=db_nslcmop_update
,
5402 if nslcmop_operation_state
:
5404 await self
.msg
.aiowrite(
5409 "nslcmop_id": nslcmop_id
,
5410 "operationState": nslcmop_operation_state
,
5414 except Exception as e
:
5416 logging_text
+ "kafka_write notification Exception {}".format(e
)
5418 self
.logger
.debug(logging_text
+ "Exit")
5419 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5420 return nslcmop_operation_state
, detailed_status
5422 async def terminate_vdus(
5423 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5425 """This method terminates VDUs
5428 db_vnfr: VNF instance record
5429 member_vnf_index: VNF index to identify the VDUs to be removed
5430 db_nsr: NS instance record
5431 update_db_nslcmops: Nslcmop update record
5433 vca_scaling_info
= []
5434 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5435 scaling_info
["scaling_direction"] = "IN"
5436 scaling_info
["vdu-delete"] = {}
5437 scaling_info
["kdu-delete"] = {}
5438 db_vdur
= db_vnfr
.get("vdur")
5439 vdur_list
= copy(db_vdur
)
5441 for index
, vdu
in enumerate(vdur_list
):
5442 vca_scaling_info
.append(
5444 "osm_vdu_id": vdu
["vdu-id-ref"],
5445 "member-vnf-index": member_vnf_index
,
5447 "vdu_index": count_index
,
5450 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5451 scaling_info
["vdu"].append(
5453 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5454 "vdu_id": vdu
["vdu-id-ref"],
5458 for interface
in vdu
["interfaces"]:
5459 scaling_info
["vdu"][index
]["interface"].append(
5461 "name": interface
["name"],
5462 "ip_address": interface
["ip-address"],
5463 "mac_address": interface
.get("mac-address"),
5466 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5467 stage
[2] = "Terminating VDUs"
5468 if scaling_info
.get("vdu-delete"):
5469 # scale_process = "RO"
5470 if self
.ro_config
.ng
:
5471 await self
._scale
_ng
_ro
(
5480 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5481 """This method is to Remove VNF instances from NS.
5484 nsr_id: NS instance id
5485 nslcmop_id: nslcmop id of update
5486 vnf_instance_id: id of the VNF instance to be removed
5489 result: (str, str) COMPLETED/FAILED, details
5493 logging_text
= "Task ns={} update ".format(nsr_id
)
5494 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5495 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5496 if check_vnfr_count
> 1:
5497 stage
= ["", "", ""]
5498 step
= "Getting nslcmop from database"
5500 step
+ " after having waited for previous tasks to be completed"
5502 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5503 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5504 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5505 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5506 """ db_vnfr = self.db.get_one(
5507 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5509 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5510 await self
.terminate_vdus(
5519 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5520 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5521 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5522 "constituent-vnfr-ref"
5524 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5525 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5526 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5527 return "COMPLETED", "Done"
5529 step
= "Terminate VNF Failed with"
5531 "{} Cannot terminate the last VNF in this NS.".format(
5535 except (LcmException
, asyncio
.CancelledError
):
5537 except Exception as e
:
5538 self
.logger
.debug("Error removing VNF {}".format(e
))
5539 return "FAILED", "Error removing VNF {}".format(e
)
5541 async def _ns_redeploy_vnf(
5549 """This method updates and redeploys VNF instances
5552 nsr_id: NS instance id
5553 nslcmop_id: nslcmop id
5554 db_vnfd: VNF descriptor
5555 db_vnfr: VNF instance record
5556 db_nsr: NS instance record
5559 result: (str, str) COMPLETED/FAILED, details
5563 stage
= ["", "", ""]
5564 logging_text
= "Task ns={} update ".format(nsr_id
)
5565 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5566 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5568 # Terminate old VNF resources
5569 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5570 await self
.terminate_vdus(
5579 # old_vnfd_id = db_vnfr["vnfd-id"]
5580 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5581 new_db_vnfd
= db_vnfd
5582 # new_vnfd_ref = new_db_vnfd["id"]
5583 # new_vnfd_id = vnfd_id
5587 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5589 "name": cp
.get("id"),
5590 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5591 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5594 new_vnfr_cp
.append(vnf_cp
)
5595 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5596 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5597 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5599 "revision": latest_vnfd_revision
,
5600 "connection-point": new_vnfr_cp
,
5604 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5605 updated_db_vnfr
= self
.db
.get_one(
5607 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5610 # Instantiate new VNF resources
5611 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5612 vca_scaling_info
= []
5613 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5614 scaling_info
["scaling_direction"] = "OUT"
5615 scaling_info
["vdu-create"] = {}
5616 scaling_info
["kdu-create"] = {}
5617 vdud_instantiate_list
= db_vnfd
["vdu"]
5618 for index
, vdud
in enumerate(vdud_instantiate_list
):
5619 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5621 additional_params
= (
5622 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5625 cloud_init_list
= []
5627 # TODO Information of its own ip is not available because db_vnfr is not updated.
5628 additional_params
["OSM"] = get_osm_params(
5629 updated_db_vnfr
, vdud
["id"], 1
5631 cloud_init_list
.append(
5632 self
._parse
_cloud
_init
(
5639 vca_scaling_info
.append(
5641 "osm_vdu_id": vdud
["id"],
5642 "member-vnf-index": member_vnf_index
,
5644 "vdu_index": count_index
,
5647 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5648 if self
.ro_config
.ng
:
5650 "New Resources to be deployed: {}".format(scaling_info
)
5652 await self
._scale
_ng
_ro
(
5660 return "COMPLETED", "Done"
5661 except (LcmException
, asyncio
.CancelledError
):
5663 except Exception as e
:
5664 self
.logger
.debug("Error updating VNF {}".format(e
))
5665 return "FAILED", "Error updating VNF {}".format(e
)
5667 async def _ns_charm_upgrade(
5673 timeout
: float = None,
5675 """This method upgrade charms in VNF instances
5678 ee_id: Execution environment id
5679 path: Local path to the charm
5681 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5682 timeout: (Float) Timeout for the ns update operation
5685 result: (str, str) COMPLETED/FAILED, details
5688 charm_type
= charm_type
or "lxc_proxy_charm"
5689 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5693 charm_type
=charm_type
,
5694 timeout
=timeout
or self
.timeout
.ns_update
,
5698 return "COMPLETED", output
5700 except (LcmException
, asyncio
.CancelledError
):
5703 except Exception as e
:
5704 self
.logger
.debug("Error upgrading charm {}".format(path
))
5706 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5708 async def update(self
, nsr_id
, nslcmop_id
):
5709 """Update NS according to different update types
5711 This method performs upgrade of VNF instances then updates the revision
5712 number in VNF record
5715 nsr_id: Network service will be updated
5716 nslcmop_id: ns lcm operation id
5719 It may raise DbException, LcmException, N2VCException, K8sException
5722 # Try to lock HA task here
5723 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5724 if not task_is_locked_by_me
:
5727 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5728 self
.logger
.debug(logging_text
+ "Enter")
5730 # Set the required variables to be filled up later
5732 db_nslcmop_update
= {}
5734 nslcmop_operation_state
= None
5736 error_description_nslcmop
= ""
5738 change_type
= "updated"
5739 detailed_status
= ""
5740 member_vnf_index
= None
5743 # wait for any previous tasks in process
5744 step
= "Waiting for previous operations to terminate"
5745 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5746 self
._write
_ns
_status
(
5749 current_operation
="UPDATING",
5750 current_operation_id
=nslcmop_id
,
5753 step
= "Getting nslcmop from database"
5754 db_nslcmop
= self
.db
.get_one(
5755 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5757 update_type
= db_nslcmop
["operationParams"]["updateType"]
5759 step
= "Getting nsr from database"
5760 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5761 old_operational_status
= db_nsr
["operational-status"]
5762 db_nsr_update
["operational-status"] = "updating"
5763 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5764 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5766 if update_type
== "CHANGE_VNFPKG":
5767 # Get the input parameters given through update request
5768 vnf_instance_id
= db_nslcmop
["operationParams"][
5769 "changeVnfPackageData"
5770 ].get("vnfInstanceId")
5772 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5775 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5777 step
= "Getting vnfr from database"
5778 db_vnfr
= self
.db
.get_one(
5779 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5782 step
= "Getting vnfds from database"
5784 latest_vnfd
= self
.db
.get_one(
5785 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5787 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5790 current_vnf_revision
= db_vnfr
.get("revision", 1)
5791 current_vnfd
= self
.db
.get_one(
5793 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5794 fail_on_empty
=False,
5796 # Charm artifact paths will be filled up later
5798 current_charm_artifact_path
,
5799 target_charm_artifact_path
,
5800 charm_artifact_paths
,
5802 ) = ([], [], [], [])
5804 step
= "Checking if revision has changed in VNFD"
5805 if current_vnf_revision
!= latest_vnfd_revision
:
5806 change_type
= "policy_updated"
5808 # There is new revision of VNFD, update operation is required
5809 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5810 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5812 step
= "Removing the VNFD packages if they exist in the local path"
5813 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5814 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5816 step
= "Get the VNFD packages from FSMongo"
5817 self
.fs
.sync(from_path
=latest_vnfd_path
)
5818 self
.fs
.sync(from_path
=current_vnfd_path
)
5821 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5823 current_base_folder
= current_vnfd
["_admin"]["storage"]
5824 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5826 for vca_index
, vca_deployed
in enumerate(
5827 get_iterable(nsr_deployed
, "VCA")
5829 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5831 # Getting charm-id and charm-type
5832 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5833 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5834 vca_type
= vca_deployed
.get("type")
5835 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5838 ee_id
= vca_deployed
.get("ee_id")
5840 step
= "Getting descriptor config"
5841 if current_vnfd
.get("kdu"):
5842 search_key
= "kdu_name"
5844 search_key
= "vnfd_id"
5846 entity_id
= vca_deployed
.get(search_key
)
5848 descriptor_config
= get_configuration(
5849 current_vnfd
, entity_id
5852 if "execution-environment-list" in descriptor_config
:
5853 ee_list
= descriptor_config
.get(
5854 "execution-environment-list", []
5859 # There could be several charm used in the same VNF
5860 for ee_item
in ee_list
:
5861 if ee_item
.get("juju"):
5862 step
= "Getting charm name"
5863 charm_name
= ee_item
["juju"].get("charm")
5865 step
= "Setting Charm artifact paths"
5866 current_charm_artifact_path
.append(
5867 get_charm_artifact_path(
5868 current_base_folder
,
5871 current_vnf_revision
,
5874 target_charm_artifact_path
.append(
5875 get_charm_artifact_path(
5879 latest_vnfd_revision
,
5882 elif ee_item
.get("helm-chart"):
5883 # add chart to list and all parameters
5884 step
= "Getting helm chart name"
5885 chart_name
= ee_item
.get("helm-chart")
5887 ee_item
.get("helm-version")
5888 and ee_item
.get("helm-version") == "v2"
5892 vca_type
= "helm-v3"
5893 step
= "Setting Helm chart artifact paths"
5895 helm_artifacts
.append(
5897 "current_artifact_path": get_charm_artifact_path(
5898 current_base_folder
,
5901 current_vnf_revision
,
5903 "target_artifact_path": get_charm_artifact_path(
5907 latest_vnfd_revision
,
5910 "vca_index": vca_index
,
5911 "vdu_index": vdu_count_index
,
5915 charm_artifact_paths
= zip(
5916 current_charm_artifact_path
, target_charm_artifact_path
5919 step
= "Checking if software version has changed in VNFD"
5920 if find_software_version(current_vnfd
) != find_software_version(
5923 step
= "Checking if existing VNF has charm"
5924 for current_charm_path
, target_charm_path
in list(
5925 charm_artifact_paths
5927 if current_charm_path
:
5929 "Software version change is not supported as VNF instance {} has charm.".format(
5934 # There is no change in the charm package, then redeploy the VNF
5935 # based on new descriptor
5936 step
= "Redeploying VNF"
5937 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5938 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5939 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5941 if result
== "FAILED":
5942 nslcmop_operation_state
= result
5943 error_description_nslcmop
= detailed_status
5944 db_nslcmop_update
["detailed-status"] = detailed_status
5947 + " step {} Done with result {} {}".format(
5948 step
, nslcmop_operation_state
, detailed_status
5953 step
= "Checking if any charm package has changed or not"
5954 for current_charm_path
, target_charm_path
in list(
5955 charm_artifact_paths
5959 and target_charm_path
5960 and self
.check_charm_hash_changed(
5961 current_charm_path
, target_charm_path
5964 step
= "Checking whether VNF uses juju bundle"
5965 if check_juju_bundle_existence(current_vnfd
):
5967 "Charm upgrade is not supported for the instance which"
5968 " uses juju-bundle: {}".format(
5969 check_juju_bundle_existence(current_vnfd
)
5973 step
= "Upgrading Charm"
5977 ) = await self
._ns
_charm
_upgrade
(
5980 charm_type
=vca_type
,
5981 path
=self
.fs
.path
+ target_charm_path
,
5982 timeout
=timeout_seconds
,
5985 if result
== "FAILED":
5986 nslcmop_operation_state
= result
5987 error_description_nslcmop
= detailed_status
5989 db_nslcmop_update
["detailed-status"] = detailed_status
5992 + " step {} Done with result {} {}".format(
5993 step
, nslcmop_operation_state
, detailed_status
5997 step
= "Updating policies"
5998 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5999 result
= "COMPLETED"
6000 detailed_status
= "Done"
6001 db_nslcmop_update
["detailed-status"] = "Done"
6004 for item
in helm_artifacts
:
6006 item
["current_artifact_path"]
6007 and item
["target_artifact_path"]
6008 and self
.check_charm_hash_changed(
6009 item
["current_artifact_path"],
6010 item
["target_artifact_path"],
6014 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6017 vnfr_id
= db_vnfr
["_id"]
6018 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6020 "collection": "nsrs",
6021 "filter": {"_id": nsr_id
},
6022 "path": db_update_entry
,
6024 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6025 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6026 namespace
=namespace
,
6030 artifact_path
=item
["target_artifact_path"],
6033 vnf_id
= db_vnfr
.get("vnfd-ref")
6034 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6035 self
.logger
.debug("get ssh key block")
6039 ("config-access", "ssh-access", "required"),
6041 # Needed to inject a ssh key
6044 ("config-access", "ssh-access", "default-user"),
6047 "Install configuration Software, getting public ssh key"
6049 pub_key
= await self
.vca_map
[
6051 ].get_ee_ssh_public__key(
6052 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6056 "Insert public key into VM user={} ssh_key={}".format(
6060 self
.logger
.debug(logging_text
+ step
)
6062 # wait for RO (ip-address) Insert pub_key into VM
6063 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6073 initial_config_primitive_list
= config_descriptor
.get(
6074 "initial-config-primitive"
6076 config_primitive
= next(
6079 for p
in initial_config_primitive_list
6080 if p
["name"] == "config"
6084 if not config_primitive
:
6087 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6089 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6090 if db_vnfr
.get("additionalParamsForVnf"):
6091 deploy_params
.update(
6093 db_vnfr
["additionalParamsForVnf"].copy()
6096 primitive_params_
= self
._map
_primitive
_params
(
6097 config_primitive
, {}, deploy_params
6100 step
= "execute primitive '{}' params '{}'".format(
6101 config_primitive
["name"], primitive_params_
6103 self
.logger
.debug(logging_text
+ step
)
6104 await self
.vca_map
[vca_type
].exec_primitive(
6106 primitive_name
=config_primitive
["name"],
6107 params_dict
=primitive_params_
,
6113 step
= "Updating policies"
6114 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6115 detailed_status
= "Done"
6116 db_nslcmop_update
["detailed-status"] = "Done"
6118 # If nslcmop_operation_state is None, so any operation is not failed.
6119 if not nslcmop_operation_state
:
6120 nslcmop_operation_state
= "COMPLETED"
6122 # If update CHANGE_VNFPKG nslcmop_operation is successful
6123 # vnf revision need to be updated
6124 vnfr_update
["revision"] = latest_vnfd_revision
6125 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6129 + " task Done with result {} {}".format(
6130 nslcmop_operation_state
, detailed_status
6133 elif update_type
== "REMOVE_VNF":
6134 # This part is included in https://osm.etsi.org/gerrit/11876
6135 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6136 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6137 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6138 step
= "Removing VNF"
6139 (result
, detailed_status
) = await self
.remove_vnf(
6140 nsr_id
, nslcmop_id
, vnf_instance_id
6142 if result
== "FAILED":
6143 nslcmop_operation_state
= result
6144 error_description_nslcmop
= detailed_status
6145 db_nslcmop_update
["detailed-status"] = detailed_status
6146 change_type
= "vnf_terminated"
6147 if not nslcmop_operation_state
:
6148 nslcmop_operation_state
= "COMPLETED"
6151 + " task Done with result {} {}".format(
6152 nslcmop_operation_state
, detailed_status
6156 elif update_type
== "OPERATE_VNF":
6157 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6160 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6163 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6166 (result
, detailed_status
) = await self
.rebuild_start_stop(
6167 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6169 if result
== "FAILED":
6170 nslcmop_operation_state
= result
6171 error_description_nslcmop
= detailed_status
6172 db_nslcmop_update
["detailed-status"] = detailed_status
6173 if not nslcmop_operation_state
:
6174 nslcmop_operation_state
= "COMPLETED"
6177 + " task Done with result {} {}".format(
6178 nslcmop_operation_state
, detailed_status
6182 # If nslcmop_operation_state is None, so any operation is not failed.
6183 # All operations are executed in overall.
6184 if not nslcmop_operation_state
:
6185 nslcmop_operation_state
= "COMPLETED"
6186 db_nsr_update
["operational-status"] = old_operational_status
6188 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6189 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6191 except asyncio
.CancelledError
:
6193 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6195 exc
= "Operation was cancelled"
6196 except asyncio
.TimeoutError
:
6197 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6199 except Exception as e
:
6200 exc
= traceback
.format_exc()
6201 self
.logger
.critical(
6202 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6211 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6212 nslcmop_operation_state
= "FAILED"
6213 db_nsr_update
["operational-status"] = old_operational_status
6215 self
._write
_ns
_status
(
6217 ns_state
=db_nsr
["nsState"],
6218 current_operation
="IDLE",
6219 current_operation_id
=None,
6220 other_update
=db_nsr_update
,
6223 self
._write
_op
_status
(
6226 error_message
=error_description_nslcmop
,
6227 operation_state
=nslcmop_operation_state
,
6228 other_update
=db_nslcmop_update
,
6231 if nslcmop_operation_state
:
6235 "nslcmop_id": nslcmop_id
,
6236 "operationState": nslcmop_operation_state
,
6239 change_type
in ("vnf_terminated", "policy_updated")
6240 and member_vnf_index
6242 msg
.update({"vnf_member_index": member_vnf_index
})
6243 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6244 except Exception as e
:
6246 logging_text
+ "kafka_write notification Exception {}".format(e
)
6248 self
.logger
.debug(logging_text
+ "Exit")
6249 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6250 return nslcmop_operation_state
, detailed_status
6252 async def scale(self
, nsr_id
, nslcmop_id
):
6253 # Try to lock HA task here
6254 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6255 if not task_is_locked_by_me
:
6258 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6259 stage
= ["", "", ""]
6260 tasks_dict_info
= {}
6261 # ^ stage, step, VIM progress
6262 self
.logger
.debug(logging_text
+ "Enter")
6263 # get all needed from database
6265 db_nslcmop_update
= {}
6268 # in case of error, indicates what part of scale was failed to put nsr at error status
6269 scale_process
= None
6270 old_operational_status
= ""
6271 old_config_status
= ""
6274 # wait for any previous tasks in process
6275 step
= "Waiting for previous operations to terminate"
6276 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6277 self
._write
_ns
_status
(
6280 current_operation
="SCALING",
6281 current_operation_id
=nslcmop_id
,
6284 step
= "Getting nslcmop from database"
6286 step
+ " after having waited for previous tasks to be completed"
6288 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6290 step
= "Getting nsr from database"
6291 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6292 old_operational_status
= db_nsr
["operational-status"]
6293 old_config_status
= db_nsr
["config-status"]
6295 step
= "Parsing scaling parameters"
6296 db_nsr_update
["operational-status"] = "scaling"
6297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6298 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6300 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6302 ]["member-vnf-index"]
6303 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6305 ]["scaling-group-descriptor"]
6306 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6307 # for backward compatibility
6308 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6309 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6310 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6311 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6313 step
= "Getting vnfr from database"
6314 db_vnfr
= self
.db
.get_one(
6315 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6318 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6320 step
= "Getting vnfd from database"
6321 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6323 base_folder
= db_vnfd
["_admin"]["storage"]
6325 step
= "Getting scaling-group-descriptor"
6326 scaling_descriptor
= find_in_list(
6327 get_scaling_aspect(db_vnfd
),
6328 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6330 if not scaling_descriptor
:
6332 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6333 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6336 step
= "Sending scale order to VIM"
6337 # TODO check if ns is in a proper status
6339 if not db_nsr
["_admin"].get("scaling-group"):
6344 "_admin.scaling-group": [
6345 {"name": scaling_group
, "nb-scale-op": 0}
6349 admin_scale_index
= 0
6351 for admin_scale_index
, admin_scale_info
in enumerate(
6352 db_nsr
["_admin"]["scaling-group"]
6354 if admin_scale_info
["name"] == scaling_group
:
6355 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6357 else: # not found, set index one plus last element and add new entry with the name
6358 admin_scale_index
+= 1
6360 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6363 vca_scaling_info
= []
6364 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6365 if scaling_type
== "SCALE_OUT":
6366 if "aspect-delta-details" not in scaling_descriptor
:
6368 "Aspect delta details not fount in scaling descriptor {}".format(
6369 scaling_descriptor
["name"]
6372 # count if max-instance-count is reached
6373 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6375 scaling_info
["scaling_direction"] = "OUT"
6376 scaling_info
["vdu-create"] = {}
6377 scaling_info
["kdu-create"] = {}
6378 for delta
in deltas
:
6379 for vdu_delta
in delta
.get("vdu-delta", {}):
6380 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6381 # vdu_index also provides the number of instance of the targeted vdu
6382 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6383 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6387 additional_params
= (
6388 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6391 cloud_init_list
= []
6393 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6394 max_instance_count
= 10
6395 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6396 max_instance_count
= vdu_profile
.get(
6397 "max-number-of-instances", 10
6400 default_instance_num
= get_number_of_instances(
6403 instances_number
= vdu_delta
.get("number-of-instances", 1)
6404 nb_scale_op
+= instances_number
6406 new_instance_count
= nb_scale_op
+ default_instance_num
6407 # Control if new count is over max and vdu count is less than max.
6408 # Then assign new instance count
6409 if new_instance_count
> max_instance_count
> vdu_count
:
6410 instances_number
= new_instance_count
- max_instance_count
6412 instances_number
= instances_number
6414 if new_instance_count
> max_instance_count
:
6416 "reached the limit of {} (max-instance-count) "
6417 "scaling-out operations for the "
6418 "scaling-group-descriptor '{}'".format(
6419 nb_scale_op
, scaling_group
6422 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6424 # TODO Information of its own ip is not available because db_vnfr is not updated.
6425 additional_params
["OSM"] = get_osm_params(
6426 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6428 cloud_init_list
.append(
6429 self
._parse
_cloud
_init
(
6436 vca_scaling_info
.append(
6438 "osm_vdu_id": vdu_delta
["id"],
6439 "member-vnf-index": vnf_index
,
6441 "vdu_index": vdu_index
+ x
,
6444 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6445 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6446 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6447 kdu_name
= kdu_profile
["kdu-name"]
6448 resource_name
= kdu_profile
.get("resource-name", "")
6450 # Might have different kdus in the same delta
6451 # Should have list for each kdu
6452 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6453 scaling_info
["kdu-create"][kdu_name
] = []
6455 kdur
= get_kdur(db_vnfr
, kdu_name
)
6456 if kdur
.get("helm-chart"):
6457 k8s_cluster_type
= "helm-chart-v3"
6458 self
.logger
.debug("kdur: {}".format(kdur
))
6460 kdur
.get("helm-version")
6461 and kdur
.get("helm-version") == "v2"
6463 k8s_cluster_type
= "helm-chart"
6464 elif kdur
.get("juju-bundle"):
6465 k8s_cluster_type
= "juju-bundle"
6468 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6469 "juju-bundle. Maybe an old NBI version is running".format(
6470 db_vnfr
["member-vnf-index-ref"], kdu_name
6474 max_instance_count
= 10
6475 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6476 max_instance_count
= kdu_profile
.get(
6477 "max-number-of-instances", 10
6480 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6481 deployed_kdu
, _
= get_deployed_kdu(
6482 nsr_deployed
, kdu_name
, vnf_index
6484 if deployed_kdu
is None:
6486 "KDU '{}' for vnf '{}' not deployed".format(
6490 kdu_instance
= deployed_kdu
.get("kdu-instance")
6491 instance_num
= await self
.k8scluster_map
[
6497 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6498 kdu_model
=deployed_kdu
.get("kdu-model"),
6500 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6501 "number-of-instances", 1
6504 # Control if new count is over max and instance_num is less than max.
6505 # Then assign max instance number to kdu replica count
6506 if kdu_replica_count
> max_instance_count
> instance_num
:
6507 kdu_replica_count
= max_instance_count
6508 if kdu_replica_count
> max_instance_count
:
6510 "reached the limit of {} (max-instance-count) "
6511 "scaling-out operations for the "
6512 "scaling-group-descriptor '{}'".format(
6513 instance_num
, scaling_group
6517 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6518 vca_scaling_info
.append(
6520 "osm_kdu_id": kdu_name
,
6521 "member-vnf-index": vnf_index
,
6523 "kdu_index": instance_num
+ x
- 1,
6526 scaling_info
["kdu-create"][kdu_name
].append(
6528 "member-vnf-index": vnf_index
,
6530 "k8s-cluster-type": k8s_cluster_type
,
6531 "resource-name": resource_name
,
6532 "scale": kdu_replica_count
,
6535 elif scaling_type
== "SCALE_IN":
6536 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6538 scaling_info
["scaling_direction"] = "IN"
6539 scaling_info
["vdu-delete"] = {}
6540 scaling_info
["kdu-delete"] = {}
6542 for delta
in deltas
:
6543 for vdu_delta
in delta
.get("vdu-delta", {}):
6544 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6545 min_instance_count
= 0
6546 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6547 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6548 min_instance_count
= vdu_profile
["min-number-of-instances"]
6550 default_instance_num
= get_number_of_instances(
6551 db_vnfd
, vdu_delta
["id"]
6553 instance_num
= vdu_delta
.get("number-of-instances", 1)
6554 nb_scale_op
-= instance_num
6556 new_instance_count
= nb_scale_op
+ default_instance_num
6558 if new_instance_count
< min_instance_count
< vdu_count
:
6559 instances_number
= min_instance_count
- new_instance_count
6561 instances_number
= instance_num
6563 if new_instance_count
< min_instance_count
:
6565 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6566 "scaling-group-descriptor '{}'".format(
6567 nb_scale_op
, scaling_group
6570 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6571 vca_scaling_info
.append(
6573 "osm_vdu_id": vdu_delta
["id"],
6574 "member-vnf-index": vnf_index
,
6576 "vdu_index": vdu_index
- 1 - x
,
6579 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6580 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6581 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6582 kdu_name
= kdu_profile
["kdu-name"]
6583 resource_name
= kdu_profile
.get("resource-name", "")
6585 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6586 scaling_info
["kdu-delete"][kdu_name
] = []
6588 kdur
= get_kdur(db_vnfr
, kdu_name
)
6589 if kdur
.get("helm-chart"):
6590 k8s_cluster_type
= "helm-chart-v3"
6591 self
.logger
.debug("kdur: {}".format(kdur
))
6593 kdur
.get("helm-version")
6594 and kdur
.get("helm-version") == "v2"
6596 k8s_cluster_type
= "helm-chart"
6597 elif kdur
.get("juju-bundle"):
6598 k8s_cluster_type
= "juju-bundle"
6601 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6602 "juju-bundle. Maybe an old NBI version is running".format(
6603 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6607 min_instance_count
= 0
6608 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6609 min_instance_count
= kdu_profile
["min-number-of-instances"]
6611 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6612 deployed_kdu
, _
= get_deployed_kdu(
6613 nsr_deployed
, kdu_name
, vnf_index
6615 if deployed_kdu
is None:
6617 "KDU '{}' for vnf '{}' not deployed".format(
6621 kdu_instance
= deployed_kdu
.get("kdu-instance")
6622 instance_num
= await self
.k8scluster_map
[
6628 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6629 kdu_model
=deployed_kdu
.get("kdu-model"),
6631 kdu_replica_count
= instance_num
- kdu_delta
.get(
6632 "number-of-instances", 1
6635 if kdu_replica_count
< min_instance_count
< instance_num
:
6636 kdu_replica_count
= min_instance_count
6637 if kdu_replica_count
< min_instance_count
:
6639 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6640 "scaling-group-descriptor '{}'".format(
6641 instance_num
, scaling_group
6645 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6646 vca_scaling_info
.append(
6648 "osm_kdu_id": kdu_name
,
6649 "member-vnf-index": vnf_index
,
6651 "kdu_index": instance_num
- x
- 1,
6654 scaling_info
["kdu-delete"][kdu_name
].append(
6656 "member-vnf-index": vnf_index
,
6658 "k8s-cluster-type": k8s_cluster_type
,
6659 "resource-name": resource_name
,
6660 "scale": kdu_replica_count
,
6664 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6665 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6666 if scaling_info
["scaling_direction"] == "IN":
6667 for vdur
in reversed(db_vnfr
["vdur"]):
6668 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6669 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6670 scaling_info
["vdu"].append(
6672 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6673 "vdu_id": vdur
["vdu-id-ref"],
6677 for interface
in vdur
["interfaces"]:
6678 scaling_info
["vdu"][-1]["interface"].append(
6680 "name": interface
["name"],
6681 "ip_address": interface
["ip-address"],
6682 "mac_address": interface
.get("mac-address"),
6685 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6688 step
= "Executing pre-scale vnf-config-primitive"
6689 if scaling_descriptor
.get("scaling-config-action"):
6690 for scaling_config_action
in scaling_descriptor
[
6691 "scaling-config-action"
6694 scaling_config_action
.get("trigger") == "pre-scale-in"
6695 and scaling_type
== "SCALE_IN"
6697 scaling_config_action
.get("trigger") == "pre-scale-out"
6698 and scaling_type
== "SCALE_OUT"
6700 vnf_config_primitive
= scaling_config_action
[
6701 "vnf-config-primitive-name-ref"
6703 step
= db_nslcmop_update
[
6705 ] = "executing pre-scale scaling-config-action '{}'".format(
6706 vnf_config_primitive
6709 # look for primitive
6710 for config_primitive
in (
6711 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6712 ).get("config-primitive", ()):
6713 if config_primitive
["name"] == vnf_config_primitive
:
6717 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6718 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6719 "primitive".format(scaling_group
, vnf_config_primitive
)
6722 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6723 if db_vnfr
.get("additionalParamsForVnf"):
6724 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6726 scale_process
= "VCA"
6727 db_nsr_update
["config-status"] = "configuring pre-scaling"
6728 primitive_params
= self
._map
_primitive
_params
(
6729 config_primitive
, {}, vnfr_params
6732 # Pre-scale retry check: Check if this sub-operation has been executed before
6733 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6736 vnf_config_primitive
,
6740 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6741 # Skip sub-operation
6742 result
= "COMPLETED"
6743 result_detail
= "Done"
6746 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6747 vnf_config_primitive
, result
, result_detail
6751 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6752 # New sub-operation: Get index of this sub-operation
6754 len(db_nslcmop
.get("_admin", {}).get("operations"))
6759 + "vnf_config_primitive={} New sub-operation".format(
6760 vnf_config_primitive
6764 # retry: Get registered params for this existing sub-operation
6765 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6768 vnf_index
= op
.get("member_vnf_index")
6769 vnf_config_primitive
= op
.get("primitive")
6770 primitive_params
= op
.get("primitive_params")
6773 + "vnf_config_primitive={} Sub-operation retry".format(
6774 vnf_config_primitive
6777 # Execute the primitive, either with new (first-time) or registered (reintent) args
6778 ee_descriptor_id
= config_primitive
.get(
6779 "execution-environment-ref"
6781 primitive_name
= config_primitive
.get(
6782 "execution-environment-primitive", vnf_config_primitive
6784 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6785 nsr_deployed
["VCA"],
6786 member_vnf_index
=vnf_index
,
6788 vdu_count_index
=None,
6789 ee_descriptor_id
=ee_descriptor_id
,
6791 result
, result_detail
= await self
._ns
_execute
_primitive
(
6800 + "vnf_config_primitive={} Done with result {} {}".format(
6801 vnf_config_primitive
, result
, result_detail
6804 # Update operationState = COMPLETED | FAILED
6805 self
._update
_suboperation
_status
(
6806 db_nslcmop
, op_index
, result
, result_detail
6809 if result
== "FAILED":
6810 raise LcmException(result_detail
)
6811 db_nsr_update
["config-status"] = old_config_status
6812 scale_process
= None
6816 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6819 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6822 # SCALE-IN VCA - BEGIN
6823 if vca_scaling_info
:
6824 step
= db_nslcmop_update
[
6826 ] = "Deleting the execution environments"
6827 scale_process
= "VCA"
6828 for vca_info
in vca_scaling_info
:
6829 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6830 member_vnf_index
= str(vca_info
["member-vnf-index"])
6832 logging_text
+ "vdu info: {}".format(vca_info
)
6834 if vca_info
.get("osm_vdu_id"):
6835 vdu_id
= vca_info
["osm_vdu_id"]
6836 vdu_index
= int(vca_info
["vdu_index"])
6839 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6840 member_vnf_index
, vdu_id
, vdu_index
6842 stage
[2] = step
= "Scaling in VCA"
6843 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6844 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6845 config_update
= db_nsr
["configurationStatus"]
6846 for vca_index
, vca
in enumerate(vca_update
):
6848 (vca
or vca
.get("ee_id"))
6849 and vca
["member-vnf-index"] == member_vnf_index
6850 and vca
["vdu_count_index"] == vdu_index
6852 if vca
.get("vdu_id"):
6853 config_descriptor
= get_configuration(
6854 db_vnfd
, vca
.get("vdu_id")
6856 elif vca
.get("kdu_name"):
6857 config_descriptor
= get_configuration(
6858 db_vnfd
, vca
.get("kdu_name")
6861 config_descriptor
= get_configuration(
6862 db_vnfd
, db_vnfd
["id"]
6864 operation_params
= (
6865 db_nslcmop
.get("operationParams") or {}
6867 exec_terminate_primitives
= not operation_params
.get(
6868 "skip_terminate_primitives"
6869 ) and vca
.get("needed_terminate")
6870 task
= asyncio
.ensure_future(
6879 exec_primitives
=exec_terminate_primitives
,
6883 timeout
=self
.timeout
.charm_delete
,
6886 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6889 del vca_update
[vca_index
]
6890 del config_update
[vca_index
]
6891 # wait for pending tasks of terminate primitives
6895 + "Waiting for tasks {}".format(
6896 list(tasks_dict_info
.keys())
6899 error_list
= await self
._wait
_for
_tasks
(
6903 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6908 tasks_dict_info
.clear()
6910 raise LcmException("; ".join(error_list
))
6912 db_vca_and_config_update
= {
6913 "_admin.deployed.VCA": vca_update
,
6914 "configurationStatus": config_update
,
6917 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6919 scale_process
= None
6920 # SCALE-IN VCA - END
6923 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6924 scale_process
= "RO"
6925 if self
.ro_config
.ng
:
6926 await self
._scale
_ng
_ro
(
6927 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6929 scaling_info
.pop("vdu-create", None)
6930 scaling_info
.pop("vdu-delete", None)
6932 scale_process
= None
6936 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6937 scale_process
= "KDU"
6938 await self
._scale
_kdu
(
6939 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6941 scaling_info
.pop("kdu-create", None)
6942 scaling_info
.pop("kdu-delete", None)
6944 scale_process
= None
6948 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6950 # SCALE-UP VCA - BEGIN
6951 if vca_scaling_info
:
6952 step
= db_nslcmop_update
[
6954 ] = "Creating new execution environments"
6955 scale_process
= "VCA"
6956 for vca_info
in vca_scaling_info
:
6957 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6958 member_vnf_index
= str(vca_info
["member-vnf-index"])
6960 logging_text
+ "vdu info: {}".format(vca_info
)
6962 vnfd_id
= db_vnfr
["vnfd-ref"]
6963 if vca_info
.get("osm_vdu_id"):
6964 vdu_index
= int(vca_info
["vdu_index"])
6965 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6966 if db_vnfr
.get("additionalParamsForVnf"):
6967 deploy_params
.update(
6969 db_vnfr
["additionalParamsForVnf"].copy()
6972 descriptor_config
= get_configuration(
6973 db_vnfd
, db_vnfd
["id"]
6975 if descriptor_config
:
6981 logging_text
=logging_text
6982 + "member_vnf_index={} ".format(member_vnf_index
),
6985 nslcmop_id
=nslcmop_id
,
6991 kdu_index
=kdu_index
,
6992 member_vnf_index
=member_vnf_index
,
6993 vdu_index
=vdu_index
,
6995 deploy_params
=deploy_params
,
6996 descriptor_config
=descriptor_config
,
6997 base_folder
=base_folder
,
6998 task_instantiation_info
=tasks_dict_info
,
7001 vdu_id
= vca_info
["osm_vdu_id"]
7002 vdur
= find_in_list(
7003 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7005 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7006 if vdur
.get("additionalParams"):
7007 deploy_params_vdu
= parse_yaml_strings(
7008 vdur
["additionalParams"]
7011 deploy_params_vdu
= deploy_params
7012 deploy_params_vdu
["OSM"] = get_osm_params(
7013 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7015 if descriptor_config
:
7021 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7022 member_vnf_index
, vdu_id
, vdu_index
7024 stage
[2] = step
= "Scaling out VCA"
7025 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7027 logging_text
=logging_text
7028 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7029 member_vnf_index
, vdu_id
, vdu_index
7033 nslcmop_id
=nslcmop_id
,
7039 member_vnf_index
=member_vnf_index
,
7040 vdu_index
=vdu_index
,
7041 kdu_index
=kdu_index
,
7043 deploy_params
=deploy_params_vdu
,
7044 descriptor_config
=descriptor_config
,
7045 base_folder
=base_folder
,
7046 task_instantiation_info
=tasks_dict_info
,
7049 # SCALE-UP VCA - END
7050 scale_process
= None
7053 # execute primitive service POST-SCALING
7054 step
= "Executing post-scale vnf-config-primitive"
7055 if scaling_descriptor
.get("scaling-config-action"):
7056 for scaling_config_action
in scaling_descriptor
[
7057 "scaling-config-action"
7060 scaling_config_action
.get("trigger") == "post-scale-in"
7061 and scaling_type
== "SCALE_IN"
7063 scaling_config_action
.get("trigger") == "post-scale-out"
7064 and scaling_type
== "SCALE_OUT"
7066 vnf_config_primitive
= scaling_config_action
[
7067 "vnf-config-primitive-name-ref"
7069 step
= db_nslcmop_update
[
7071 ] = "executing post-scale scaling-config-action '{}'".format(
7072 vnf_config_primitive
7075 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7076 if db_vnfr
.get("additionalParamsForVnf"):
7077 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7079 # look for primitive
7080 for config_primitive
in (
7081 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7082 ).get("config-primitive", ()):
7083 if config_primitive
["name"] == vnf_config_primitive
:
7087 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7088 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7089 "config-primitive".format(
7090 scaling_group
, vnf_config_primitive
7093 scale_process
= "VCA"
7094 db_nsr_update
["config-status"] = "configuring post-scaling"
7095 primitive_params
= self
._map
_primitive
_params
(
7096 config_primitive
, {}, vnfr_params
7099 # Post-scale retry check: Check if this sub-operation has been executed before
7100 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7103 vnf_config_primitive
,
7107 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7108 # Skip sub-operation
7109 result
= "COMPLETED"
7110 result_detail
= "Done"
7113 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7114 vnf_config_primitive
, result
, result_detail
7118 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7119 # New sub-operation: Get index of this sub-operation
7121 len(db_nslcmop
.get("_admin", {}).get("operations"))
7126 + "vnf_config_primitive={} New sub-operation".format(
7127 vnf_config_primitive
7131 # retry: Get registered params for this existing sub-operation
7132 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7135 vnf_index
= op
.get("member_vnf_index")
7136 vnf_config_primitive
= op
.get("primitive")
7137 primitive_params
= op
.get("primitive_params")
7140 + "vnf_config_primitive={} Sub-operation retry".format(
7141 vnf_config_primitive
7144 # Execute the primitive, either with new (first-time) or registered (reintent) args
7145 ee_descriptor_id
= config_primitive
.get(
7146 "execution-environment-ref"
7148 primitive_name
= config_primitive
.get(
7149 "execution-environment-primitive", vnf_config_primitive
7151 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7152 nsr_deployed
["VCA"],
7153 member_vnf_index
=vnf_index
,
7155 vdu_count_index
=None,
7156 ee_descriptor_id
=ee_descriptor_id
,
7158 result
, result_detail
= await self
._ns
_execute
_primitive
(
7167 + "vnf_config_primitive={} Done with result {} {}".format(
7168 vnf_config_primitive
, result
, result_detail
7171 # Update operationState = COMPLETED | FAILED
7172 self
._update
_suboperation
_status
(
7173 db_nslcmop
, op_index
, result
, result_detail
7176 if result
== "FAILED":
7177 raise LcmException(result_detail
)
7178 db_nsr_update
["config-status"] = old_config_status
7179 scale_process
= None
7184 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7185 db_nsr_update
["operational-status"] = (
7187 if old_operational_status
== "failed"
7188 else old_operational_status
7190 db_nsr_update
["config-status"] = old_config_status
7193 ROclient
.ROClientException
,
7198 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7200 except asyncio
.CancelledError
:
7202 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7204 exc
= "Operation was cancelled"
7205 except Exception as e
:
7206 exc
= traceback
.format_exc()
7207 self
.logger
.critical(
7208 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7212 self
._write
_ns
_status
(
7215 current_operation
="IDLE",
7216 current_operation_id
=None,
7219 stage
[1] = "Waiting for instantiate pending tasks."
7220 self
.logger
.debug(logging_text
+ stage
[1])
7221 exc
= await self
._wait
_for
_tasks
(
7224 self
.timeout
.ns_deploy
,
7232 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7233 nslcmop_operation_state
= "FAILED"
7235 db_nsr_update
["operational-status"] = old_operational_status
7236 db_nsr_update
["config-status"] = old_config_status
7237 db_nsr_update
["detailed-status"] = ""
7239 if "VCA" in scale_process
:
7240 db_nsr_update
["config-status"] = "failed"
7241 if "RO" in scale_process
:
7242 db_nsr_update
["operational-status"] = "failed"
7245 ] = "FAILED scaling nslcmop={} {}: {}".format(
7246 nslcmop_id
, step
, exc
7249 error_description_nslcmop
= None
7250 nslcmop_operation_state
= "COMPLETED"
7251 db_nslcmop_update
["detailed-status"] = "Done"
7253 self
._write
_op
_status
(
7256 error_message
=error_description_nslcmop
,
7257 operation_state
=nslcmop_operation_state
,
7258 other_update
=db_nslcmop_update
,
7261 self
._write
_ns
_status
(
7264 current_operation
="IDLE",
7265 current_operation_id
=None,
7266 other_update
=db_nsr_update
,
7269 if nslcmop_operation_state
:
7273 "nslcmop_id": nslcmop_id
,
7274 "operationState": nslcmop_operation_state
,
7276 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7277 except Exception as e
:
7279 logging_text
+ "kafka_write notification Exception {}".format(e
)
7281 self
.logger
.debug(logging_text
+ "Exit")
7282 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7284 async def _scale_kdu(
7285 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7287 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7288 for kdu_name
in _scaling_info
:
7289 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7290 deployed_kdu
, index
= get_deployed_kdu(
7291 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7293 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7294 kdu_instance
= deployed_kdu
["kdu-instance"]
7295 kdu_model
= deployed_kdu
.get("kdu-model")
7296 scale
= int(kdu_scaling_info
["scale"])
7297 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7300 "collection": "nsrs",
7301 "filter": {"_id": nsr_id
},
7302 "path": "_admin.deployed.K8s.{}".format(index
),
7305 step
= "scaling application {}".format(
7306 kdu_scaling_info
["resource-name"]
7308 self
.logger
.debug(logging_text
+ step
)
7310 if kdu_scaling_info
["type"] == "delete":
7311 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7314 and kdu_config
.get("terminate-config-primitive")
7315 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7317 terminate_config_primitive_list
= kdu_config
.get(
7318 "terminate-config-primitive"
7320 terminate_config_primitive_list
.sort(
7321 key
=lambda val
: int(val
["seq"])
7325 terminate_config_primitive
7326 ) in terminate_config_primitive_list
:
7327 primitive_params_
= self
._map
_primitive
_params
(
7328 terminate_config_primitive
, {}, {}
7330 step
= "execute terminate config primitive"
7331 self
.logger
.debug(logging_text
+ step
)
7332 await asyncio
.wait_for(
7333 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7334 cluster_uuid
=cluster_uuid
,
7335 kdu_instance
=kdu_instance
,
7336 primitive_name
=terminate_config_primitive
["name"],
7337 params
=primitive_params_
,
7339 total_timeout
=self
.timeout
.primitive
,
7342 timeout
=self
.timeout
.primitive
7343 * self
.timeout
.primitive_outer_factor
,
7346 await asyncio
.wait_for(
7347 self
.k8scluster_map
[k8s_cluster_type
].scale(
7348 kdu_instance
=kdu_instance
,
7350 resource_name
=kdu_scaling_info
["resource-name"],
7351 total_timeout
=self
.timeout
.scale_on_error
,
7353 cluster_uuid
=cluster_uuid
,
7354 kdu_model
=kdu_model
,
7358 timeout
=self
.timeout
.scale_on_error
7359 * self
.timeout
.scale_on_error_outer_factor
,
7362 if kdu_scaling_info
["type"] == "create":
7363 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7366 and kdu_config
.get("initial-config-primitive")
7367 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7369 initial_config_primitive_list
= kdu_config
.get(
7370 "initial-config-primitive"
7372 initial_config_primitive_list
.sort(
7373 key
=lambda val
: int(val
["seq"])
7376 for initial_config_primitive
in initial_config_primitive_list
:
7377 primitive_params_
= self
._map
_primitive
_params
(
7378 initial_config_primitive
, {}, {}
7380 step
= "execute initial config primitive"
7381 self
.logger
.debug(logging_text
+ step
)
7382 await asyncio
.wait_for(
7383 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7384 cluster_uuid
=cluster_uuid
,
7385 kdu_instance
=kdu_instance
,
7386 primitive_name
=initial_config_primitive
["name"],
7387 params
=primitive_params_
,
7394 async def _scale_ng_ro(
7395 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7397 nsr_id
= db_nslcmop
["nsInstanceId"]
7398 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7401 # read from db: vnfd's for every vnf
7404 # for each vnf in ns, read vnfd
7405 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7406 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7407 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7408 # if we haven't this vnfd, read it from db
7409 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7411 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7412 db_vnfds
.append(vnfd
)
7413 n2vc_key
= self
.n2vc
.get_public_key()
7414 n2vc_key_list
= [n2vc_key
]
7417 vdu_scaling_info
.get("vdu-create"),
7418 vdu_scaling_info
.get("vdu-delete"),
7421 # db_vnfr has been updated, update db_vnfrs to use it
7422 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7423 await self
._instantiate
_ng
_ro
(
7433 start_deploy
=time(),
7434 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7436 if vdu_scaling_info
.get("vdu-delete"):
7438 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7441 async def extract_prometheus_scrape_jobs(
7445 ee_config_descriptor
: dict,
7450 vnf_member_index
: str = "",
7452 vdu_index
: int = None,
7454 kdu_index
: int = None,
7456 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7457 This method will wait until the corresponding VDU or KDU is fully instantiated
7460 ee_id (str): Execution Environment ID
7461 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7462 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7463 vnfr_id (str): VNFR ID where this EE applies
7464 nsr_id (str): NSR ID where this EE applies
7465 target_ip (str): VDU/KDU instance IP address
7466 element_type (str): NS or VNF or VDU or KDU
7467 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7468 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7469 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7470 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7471 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7474 LcmException: When the VDU or KDU instance was not found in an hour
7477 _type_: Prometheus jobs
7479 # default the vdur and kdur names to an empty string, to avoid any later
7480 # problem with Prometheus when the element type is not VDU or KDU
7484 # look if exist a file called 'prometheus*.j2' and
7485 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7489 for f
in artifact_content
7490 if f
.startswith("prometheus") and f
.endswith(".j2")
7496 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7499 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7500 if element_type
in ("VDU", "KDU"):
7501 for _
in range(360):
7502 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7503 if vdu_id
and vdu_index
is not None:
7507 for x
in get_iterable(db_vnfr
, "vdur")
7509 x
.get("vdu-id-ref") == vdu_id
7510 and x
.get("count-index") == vdu_index
7515 if vdur
.get("name"):
7516 vdur_name
= vdur
.get("name")
7518 if kdu_name
and kdu_index
is not None:
7522 for x
in get_iterable(db_vnfr
, "kdur")
7524 x
.get("kdu-name") == kdu_name
7525 and x
.get("count-index") == kdu_index
7530 if kdur
.get("name"):
7531 kdur_name
= kdur
.get("name")
7534 await asyncio
.sleep(10, loop
=self
.loop
)
7536 if vdu_id
and vdu_index
is not None:
7538 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7540 if kdu_name
and kdu_index
is not None:
7542 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7546 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7547 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7549 vnfr_id
= vnfr_id
.replace("-", "")
7551 "JOB_NAME": vnfr_id
,
7552 "TARGET_IP": target_ip
,
7553 "EXPORTER_POD_IP": host_name
,
7554 "EXPORTER_POD_PORT": host_port
,
7556 "VNF_MEMBER_INDEX": vnf_member_index
,
7557 "VDUR_NAME": vdur_name
,
7558 "KDUR_NAME": kdur_name
,
7559 "ELEMENT_TYPE": element_type
,
7561 job_list
= parse_job(job_data
, variables
)
7562 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7563 for job
in job_list
:
7565 not isinstance(job
.get("job_name"), str)
7566 or vnfr_id
not in job
["job_name"]
7568 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7569 job
["nsr_id"] = nsr_id
7570 job
["vnfr_id"] = vnfr_id
7573 async def rebuild_start_stop(
7574 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7576 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7577 self
.logger
.info(logging_text
+ "Enter")
7578 stage
= ["Preparing the environment", ""]
7579 # database nsrs record
7583 # in case of error, indicates what part of scale was failed to put nsr at error status
7584 start_deploy
= time()
7586 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7587 vim_account_id
= db_vnfr
.get("vim-account-id")
7588 vim_info_key
= "vim:" + vim_account_id
7589 vdu_id
= additional_param
["vdu_id"]
7590 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7591 vdur
= find_in_list(
7592 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7595 vdu_vim_name
= vdur
["name"]
7596 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7597 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7599 raise LcmException("Target vdu is not found")
7600 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7601 # wait for any previous tasks in process
7602 stage
[1] = "Waiting for previous operations to terminate"
7603 self
.logger
.info(stage
[1])
7604 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7606 stage
[1] = "Reading from database."
7607 self
.logger
.info(stage
[1])
7608 self
._write
_ns
_status
(
7611 current_operation
=operation_type
.upper(),
7612 current_operation_id
=nslcmop_id
,
7614 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7617 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7618 db_nsr_update
["operational-status"] = operation_type
7619 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7623 "vim_vm_id": vim_vm_id
,
7625 "vdu_index": additional_param
["count-index"],
7626 "vdu_id": vdur
["id"],
7627 "target_vim": target_vim
,
7628 "vim_account_id": vim_account_id
,
7631 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7632 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7633 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7634 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7635 self
.logger
.info("response from RO: {}".format(result_dict
))
7636 action_id
= result_dict
["action_id"]
7637 await self
._wait
_ng
_ro
(
7642 self
.timeout
.operate
,
7644 "start_stop_rebuild",
7646 return "COMPLETED", "Done"
7647 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7648 self
.logger
.error("Exit Exception {}".format(e
))
7650 except asyncio
.CancelledError
:
7651 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7652 exc
= "Operation was cancelled"
7653 except Exception as e
:
7654 exc
= traceback
.format_exc()
7655 self
.logger
.critical(
7656 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7658 return "FAILED", "Error in operate VNF {}".format(exc
)
7660 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7662 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7664 :param: vim_account_id: VIM Account ID
7666 :return: (cloud_name, cloud_credential)
7668 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7669 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7671 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7673 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7675 :param: vim_account_id: VIM Account ID
7677 :return: (cloud_name, cloud_credential)
7679 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7680 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7682 async def migrate(self
, nsr_id
, nslcmop_id
):
7684 Migrate VNFs and VDUs instances in a NS
7686 :param: nsr_id: NS Instance ID
7687 :param: nslcmop_id: nslcmop ID of migrate
7690 # Try to lock HA task here
7691 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7692 if not task_is_locked_by_me
:
7694 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7695 self
.logger
.debug(logging_text
+ "Enter")
7696 # get all needed from database
7698 db_nslcmop_update
= {}
7699 nslcmop_operation_state
= None
7703 # in case of error, indicates what part of scale was failed to put nsr at error status
7704 start_deploy
= time()
7707 # wait for any previous tasks in process
7708 step
= "Waiting for previous operations to terminate"
7709 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7711 self
._write
_ns
_status
(
7714 current_operation
="MIGRATING",
7715 current_operation_id
=nslcmop_id
,
7717 step
= "Getting nslcmop from database"
7719 step
+ " after having waited for previous tasks to be completed"
7721 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7722 migrate_params
= db_nslcmop
.get("operationParams")
7725 target
.update(migrate_params
)
7726 desc
= await self
.RO
.migrate(nsr_id
, target
)
7727 self
.logger
.debug("RO return > {}".format(desc
))
7728 action_id
= desc
["action_id"]
7729 await self
._wait
_ng
_ro
(
7734 self
.timeout
.migrate
,
7735 operation
="migrate",
7737 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7738 self
.logger
.error("Exit Exception {}".format(e
))
7740 except asyncio
.CancelledError
:
7741 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7742 exc
= "Operation was cancelled"
7743 except Exception as e
:
7744 exc
= traceback
.format_exc()
7745 self
.logger
.critical(
7746 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7749 self
._write
_ns
_status
(
7752 current_operation
="IDLE",
7753 current_operation_id
=None,
7756 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7757 nslcmop_operation_state
= "FAILED"
7759 nslcmop_operation_state
= "COMPLETED"
7760 db_nslcmop_update
["detailed-status"] = "Done"
7761 db_nsr_update
["detailed-status"] = "Done"
7763 self
._write
_op
_status
(
7767 operation_state
=nslcmop_operation_state
,
7768 other_update
=db_nslcmop_update
,
7770 if nslcmop_operation_state
:
7774 "nslcmop_id": nslcmop_id
,
7775 "operationState": nslcmop_operation_state
,
7777 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7778 except Exception as e
:
7780 logging_text
+ "kafka_write notification Exception {}".format(e
)
7782 self
.logger
.debug(logging_text
+ "Exit")
7783 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7785 async def heal(self
, nsr_id
, nslcmop_id
):
7789 :param nsr_id: ns instance to heal
7790 :param nslcmop_id: operation to run
7794 # Try to lock HA task here
7795 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7796 if not task_is_locked_by_me
:
7799 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7800 stage
= ["", "", ""]
7801 tasks_dict_info
= {}
7802 # ^ stage, step, VIM progress
7803 self
.logger
.debug(logging_text
+ "Enter")
7804 # get all needed from database
7806 db_nslcmop_update
= {}
7808 db_vnfrs
= {} # vnf's info indexed by _id
7810 old_operational_status
= ""
7811 old_config_status
= ""
7814 # wait for any previous tasks in process
7815 step
= "Waiting for previous operations to terminate"
7816 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7817 self
._write
_ns
_status
(
7820 current_operation
="HEALING",
7821 current_operation_id
=nslcmop_id
,
7824 step
= "Getting nslcmop from database"
7826 step
+ " after having waited for previous tasks to be completed"
7828 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7830 step
= "Getting nsr from database"
7831 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7832 old_operational_status
= db_nsr
["operational-status"]
7833 old_config_status
= db_nsr
["config-status"]
7836 "_admin.deployed.RO.operational-status": "healing",
7838 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7840 step
= "Sending heal order to VIM"
7842 logging_text
=logging_text
,
7844 db_nslcmop
=db_nslcmop
,
7849 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7850 self
.logger
.debug(logging_text
+ stage
[1])
7851 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7852 self
.fs
.sync(db_nsr
["nsd-id"])
7854 # read from db: vnfr's of this ns
7855 step
= "Getting vnfrs from db"
7856 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7857 for vnfr
in db_vnfrs_list
:
7858 db_vnfrs
[vnfr
["_id"]] = vnfr
7859 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7861 # Check for each target VNF
7862 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7863 for target_vnf
in target_list
:
7864 # Find this VNF in the list from DB
7865 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7867 db_vnfr
= db_vnfrs
[vnfr_id
]
7868 vnfd_id
= db_vnfr
.get("vnfd-id")
7869 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7870 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7871 base_folder
= vnfd
["_admin"]["storage"]
7876 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7877 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7879 # Check each target VDU and deploy N2VC
7880 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7883 if not target_vdu_list
:
7884 # Codigo nuevo para crear diccionario
7885 target_vdu_list
= []
7886 for existing_vdu
in db_vnfr
.get("vdur"):
7887 vdu_name
= existing_vdu
.get("vdu-name", None)
7888 vdu_index
= existing_vdu
.get("count-index", 0)
7889 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7892 vdu_to_be_healed
= {
7894 "count-index": vdu_index
,
7895 "run-day1": vdu_run_day1
,
7897 target_vdu_list
.append(vdu_to_be_healed
)
7898 for target_vdu
in target_vdu_list
:
7899 deploy_params_vdu
= target_vdu
7900 # Set run-day1 vnf level value if not vdu level value exists
7901 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7904 deploy_params_vdu
["run-day1"] = target_vnf
[
7907 vdu_name
= target_vdu
.get("vdu-id", None)
7908 # TODO: Get vdu_id from vdud.
7910 # For multi instance VDU count-index is mandatory
7911 # For single session VDU count-indes is 0
7912 vdu_index
= target_vdu
.get("count-index", 0)
7914 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7915 stage
[1] = "Deploying Execution Environments."
7916 self
.logger
.debug(logging_text
+ stage
[1])
7918 # VNF Level charm. Normal case when proxy charms.
7919 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7920 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7921 if descriptor_config
:
7922 # Continue if healed machine is management machine
7923 vnf_ip_address
= db_vnfr
.get("ip-address")
7924 target_instance
= None
7925 for instance
in db_vnfr
.get("vdur", None):
7927 instance
["vdu-name"] == vdu_name
7928 and instance
["count-index"] == vdu_index
7930 target_instance
= instance
7932 if vnf_ip_address
== target_instance
.get("ip-address"):
7934 logging_text
=logging_text
7935 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7936 member_vnf_index
, vdu_name
, vdu_index
7940 nslcmop_id
=nslcmop_id
,
7946 member_vnf_index
=member_vnf_index
,
7949 deploy_params
=deploy_params_vdu
,
7950 descriptor_config
=descriptor_config
,
7951 base_folder
=base_folder
,
7952 task_instantiation_info
=tasks_dict_info
,
7956 # VDU Level charm. Normal case with native charms.
7957 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7958 if descriptor_config
:
7960 logging_text
=logging_text
7961 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7962 member_vnf_index
, vdu_name
, vdu_index
7966 nslcmop_id
=nslcmop_id
,
7972 member_vnf_index
=member_vnf_index
,
7973 vdu_index
=vdu_index
,
7975 deploy_params
=deploy_params_vdu
,
7976 descriptor_config
=descriptor_config
,
7977 base_folder
=base_folder
,
7978 task_instantiation_info
=tasks_dict_info
,
7983 ROclient
.ROClientException
,
7988 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7990 except asyncio
.CancelledError
:
7992 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7994 exc
= "Operation was cancelled"
7995 except Exception as e
:
7996 exc
= traceback
.format_exc()
7997 self
.logger
.critical(
7998 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
8003 stage
[1] = "Waiting for healing pending tasks."
8004 self
.logger
.debug(logging_text
+ stage
[1])
8005 exc
= await self
._wait
_for
_tasks
(
8008 self
.timeout
.ns_deploy
,
8016 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
8017 nslcmop_operation_state
= "FAILED"
8019 db_nsr_update
["operational-status"] = old_operational_status
8020 db_nsr_update
["config-status"] = old_config_status
8023 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
8024 for task
, task_name
in tasks_dict_info
.items():
8025 if not task
.done() or task
.cancelled() or task
.exception():
8026 if task_name
.startswith(self
.task_name_deploy_vca
):
8027 # A N2VC task is pending
8028 db_nsr_update
["config-status"] = "failed"
8030 # RO task is pending
8031 db_nsr_update
["operational-status"] = "failed"
8033 error_description_nslcmop
= None
8034 nslcmop_operation_state
= "COMPLETED"
8035 db_nslcmop_update
["detailed-status"] = "Done"
8036 db_nsr_update
["detailed-status"] = "Done"
8037 db_nsr_update
["operational-status"] = "running"
8038 db_nsr_update
["config-status"] = "configured"
8040 self
._write
_op
_status
(
8043 error_message
=error_description_nslcmop
,
8044 operation_state
=nslcmop_operation_state
,
8045 other_update
=db_nslcmop_update
,
8048 self
._write
_ns
_status
(
8051 current_operation
="IDLE",
8052 current_operation_id
=None,
8053 other_update
=db_nsr_update
,
8056 if nslcmop_operation_state
:
8060 "nslcmop_id": nslcmop_id
,
8061 "operationState": nslcmop_operation_state
,
8063 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8064 except Exception as e
:
8066 logging_text
+ "kafka_write notification Exception {}".format(e
)
8068 self
.logger
.debug(logging_text
+ "Exit")
8069 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8080 :param logging_text: preffix text to use at logging
8081 :param nsr_id: nsr identity
8082 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8083 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8084 :return: None or exception
8087 def get_vim_account(vim_account_id
):
8089 if vim_account_id
in db_vims
:
8090 return db_vims
[vim_account_id
]
8091 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8092 db_vims
[vim_account_id
] = db_vim
8097 ns_params
= db_nslcmop
.get("operationParams")
8098 if ns_params
and ns_params
.get("timeout_ns_heal"):
8099 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8101 timeout_ns_heal
= self
.timeout
.ns_heal
8105 nslcmop_id
= db_nslcmop
["_id"]
8107 "action_id": nslcmop_id
,
8109 self
.logger
.warning(
8110 "db_nslcmop={} and timeout_ns_heal={}".format(
8111 db_nslcmop
, timeout_ns_heal
8114 target
.update(db_nslcmop
.get("operationParams", {}))
8116 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8117 desc
= await self
.RO
.recreate(nsr_id
, target
)
8118 self
.logger
.debug("RO return > {}".format(desc
))
8119 action_id
= desc
["action_id"]
8120 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8121 await self
._wait
_ng
_ro
(
8128 operation
="healing",
8133 "_admin.deployed.RO.operational-status": "running",
8134 "detailed-status": " ".join(stage
),
8136 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8137 self
._write
_op
_status
(nslcmop_id
, stage
)
8139 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8142 except Exception as e
:
8143 stage
[2] = "ERROR healing at VIM"
8144 # self.set_vnfr_at_error(db_vnfrs, str(e))
8146 "Error healing at VIM {}".format(e
),
8147 exc_info
=not isinstance(
8150 ROclient
.ROClientException
,
8176 task_instantiation_info
,
8179 # launch instantiate_N2VC in a asyncio task and register task object
8180 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8181 # if not found, create one entry and update database
8182 # fill db_nsr._admin.deployed.VCA.<index>
8185 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8189 get_charm_name
= False
8190 if "execution-environment-list" in descriptor_config
:
8191 ee_list
= descriptor_config
.get("execution-environment-list", [])
8192 elif "juju" in descriptor_config
:
8193 ee_list
= [descriptor_config
] # ns charms
8194 if "execution-environment-list" not in descriptor_config
:
8195 # charm name is only required for ns charms
8196 get_charm_name
= True
8197 else: # other types as script are not supported
8200 for ee_item
in ee_list
:
8203 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8204 ee_item
.get("juju"), ee_item
.get("helm-chart")
8207 ee_descriptor_id
= ee_item
.get("id")
8208 if ee_item
.get("juju"):
8209 vca_name
= ee_item
["juju"].get("charm")
8211 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8214 if ee_item
["juju"].get("charm") is not None
8217 if ee_item
["juju"].get("cloud") == "k8s":
8218 vca_type
= "k8s_proxy_charm"
8219 elif ee_item
["juju"].get("proxy") is False:
8220 vca_type
= "native_charm"
8221 elif ee_item
.get("helm-chart"):
8222 vca_name
= ee_item
["helm-chart"]
8223 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8226 vca_type
= "helm-v3"
8229 logging_text
+ "skipping non juju neither charm configuration"
8234 for vca_index
, vca_deployed
in enumerate(
8235 db_nsr
["_admin"]["deployed"]["VCA"]
8237 if not vca_deployed
:
8240 vca_deployed
.get("member-vnf-index") == member_vnf_index
8241 and vca_deployed
.get("vdu_id") == vdu_id
8242 and vca_deployed
.get("kdu_name") == kdu_name
8243 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8244 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8248 # not found, create one.
8250 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8253 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8255 target
+= "/kdu/{}".format(kdu_name
)
8257 "target_element": target
,
8258 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8259 "member-vnf-index": member_vnf_index
,
8261 "kdu_name": kdu_name
,
8262 "vdu_count_index": vdu_index
,
8263 "operational-status": "init", # TODO revise
8264 "detailed-status": "", # TODO revise
8265 "step": "initial-deploy", # TODO revise
8267 "vdu_name": vdu_name
,
8269 "ee_descriptor_id": ee_descriptor_id
,
8270 "charm_name": charm_name
,
8274 # create VCA and configurationStatus in db
8276 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8277 "configurationStatus.{}".format(vca_index
): dict(),
8279 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8281 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8283 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8284 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8285 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8288 task_n2vc
= asyncio
.ensure_future(
8290 logging_text
=logging_text
,
8291 vca_index
=vca_index
,
8297 vdu_index
=vdu_index
,
8298 deploy_params
=deploy_params
,
8299 config_descriptor
=descriptor_config
,
8300 base_folder
=base_folder
,
8301 nslcmop_id
=nslcmop_id
,
8305 ee_config_descriptor
=ee_item
,
8308 self
.lcm_tasks
.register(
8312 "instantiate_N2VC-{}".format(vca_index
),
8315 task_instantiation_info
[
8317 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8318 member_vnf_index
or "", vdu_id
or ""
8321 async def heal_N2VC(
8338 ee_config_descriptor
,
8340 nsr_id
= db_nsr
["_id"]
8341 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8342 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8343 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8344 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8346 "collection": "nsrs",
8347 "filter": {"_id": nsr_id
},
8348 "path": db_update_entry
,
8353 element_under_configuration
= nsr_id
8357 vnfr_id
= db_vnfr
["_id"]
8358 osm_config
["osm"]["vnf_id"] = vnfr_id
8360 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8362 if vca_type
== "native_charm":
8365 index_number
= vdu_index
or 0
8368 element_type
= "VNF"
8369 element_under_configuration
= vnfr_id
8370 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8372 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8373 element_type
= "VDU"
8374 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8375 osm_config
["osm"]["vdu_id"] = vdu_id
8377 namespace
+= ".{}".format(kdu_name
)
8378 element_type
= "KDU"
8379 element_under_configuration
= kdu_name
8380 osm_config
["osm"]["kdu_name"] = kdu_name
8383 if base_folder
["pkg-dir"]:
8384 artifact_path
= "{}/{}/{}/{}".format(
8385 base_folder
["folder"],
8386 base_folder
["pkg-dir"],
8389 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8394 artifact_path
= "{}/Scripts/{}/{}/".format(
8395 base_folder
["folder"],
8398 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8403 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8405 # get initial_config_primitive_list that applies to this element
8406 initial_config_primitive_list
= config_descriptor
.get(
8407 "initial-config-primitive"
8411 "Initial config primitive list > {}".format(
8412 initial_config_primitive_list
8416 # add config if not present for NS charm
8417 ee_descriptor_id
= ee_config_descriptor
.get("id")
8418 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8419 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8420 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8424 "Initial config primitive list #2 > {}".format(
8425 initial_config_primitive_list
8428 # n2vc_redesign STEP 3.1
8429 # find old ee_id if exists
8430 ee_id
= vca_deployed
.get("ee_id")
8432 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8433 # create or register execution environment in VCA. Only for native charms when healing
8434 if vca_type
== "native_charm":
8435 step
= "Waiting to VM being up and getting IP address"
8436 self
.logger
.debug(logging_text
+ step
)
8437 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8446 credentials
= {"hostname": rw_mgmt_ip
}
8448 username
= deep_get(
8449 config_descriptor
, ("config-access", "ssh-access", "default-user")
8451 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8452 # merged. Meanwhile let's get username from initial-config-primitive
8453 if not username
and initial_config_primitive_list
:
8454 for config_primitive
in initial_config_primitive_list
:
8455 for param
in config_primitive
.get("parameter", ()):
8456 if param
["name"] == "ssh-username":
8457 username
= param
["value"]
8461 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8462 "'config-access.ssh-access.default-user'"
8464 credentials
["username"] = username
8466 # n2vc_redesign STEP 3.2
8467 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8468 self
._write
_configuration
_status
(
8470 vca_index
=vca_index
,
8471 status
="REGISTERING",
8472 element_under_configuration
=element_under_configuration
,
8473 element_type
=element_type
,
8476 step
= "register execution environment {}".format(credentials
)
8477 self
.logger
.debug(logging_text
+ step
)
8478 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8479 credentials
=credentials
,
8480 namespace
=namespace
,
8485 # update ee_id en db
8487 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8489 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8491 # for compatibility with MON/POL modules, the need model and application name at database
8492 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8493 # Not sure if this need to be done when healing
8495 ee_id_parts = ee_id.split(".")
8496 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8497 if len(ee_id_parts) >= 2:
8498 model_name = ee_id_parts[0]
8499 application_name = ee_id_parts[1]
8500 db_nsr_update[db_update_entry + "model"] = model_name
8501 db_nsr_update[db_update_entry + "application"] = application_name
8504 # n2vc_redesign STEP 3.3
8505 # Install configuration software. Only for native charms.
8506 step
= "Install configuration Software"
8508 self
._write
_configuration
_status
(
8510 vca_index
=vca_index
,
8511 status
="INSTALLING SW",
8512 element_under_configuration
=element_under_configuration
,
8513 element_type
=element_type
,
8514 # other_update=db_nsr_update,
8518 # TODO check if already done
8519 self
.logger
.debug(logging_text
+ step
)
8521 if vca_type
== "native_charm":
8522 config_primitive
= next(
8523 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8526 if config_primitive
:
8527 config
= self
._map
_primitive
_params
(
8528 config_primitive
, {}, deploy_params
8530 await self
.vca_map
[vca_type
].install_configuration_sw(
8532 artifact_path
=artifact_path
,
8540 # write in db flag of configuration_sw already installed
8542 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8545 # Not sure if this need to be done when healing
8547 # add relations for this VCA (wait for other peers related with this VCA)
8548 await self._add_vca_relations(
8549 logging_text=logging_text,
8552 vca_index=vca_index,
8556 # if SSH access is required, then get execution environment SSH public
8557 # if native charm we have waited already to VM be UP
8558 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8561 # self.logger.debug("get ssh key block")
8563 config_descriptor
, ("config-access", "ssh-access", "required")
8565 # self.logger.debug("ssh key needed")
8566 # Needed to inject a ssh key
8569 ("config-access", "ssh-access", "default-user"),
8571 step
= "Install configuration Software, getting public ssh key"
8572 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8573 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8576 step
= "Insert public key into VM user={} ssh_key={}".format(
8580 # self.logger.debug("no need to get ssh key")
8581 step
= "Waiting to VM being up and getting IP address"
8582 self
.logger
.debug(logging_text
+ step
)
8584 # n2vc_redesign STEP 5.1
8585 # wait for RO (ip-address) Insert pub_key into VM
8586 # IMPORTANT: We need do wait for RO to complete healing operation.
8587 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8590 rw_mgmt_ip
= await self
.wait_kdu_up(
8591 logging_text
, nsr_id
, vnfr_id
, kdu_name
8594 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8604 rw_mgmt_ip
= None # This is for a NS configuration
8606 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8608 # store rw_mgmt_ip in deploy params for later replacement
8609 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8612 # get run-day1 operation parameter
8613 runDay1
= deploy_params
.get("run-day1", False)
8615 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8618 # n2vc_redesign STEP 6 Execute initial config primitive
8619 step
= "execute initial config primitive"
8621 # wait for dependent primitives execution (NS -> VNF -> VDU)
8622 if initial_config_primitive_list
:
8623 await self
._wait
_dependent
_n
2vc
(
8624 nsr_id
, vca_deployed_list
, vca_index
8627 # stage, in function of element type: vdu, kdu, vnf or ns
8628 my_vca
= vca_deployed_list
[vca_index
]
8629 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8631 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8632 elif my_vca
.get("member-vnf-index"):
8634 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8637 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8639 self
._write
_configuration
_status
(
8640 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8643 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8645 check_if_terminated_needed
= True
8646 for initial_config_primitive
in initial_config_primitive_list
:
8647 # adding information on the vca_deployed if it is a NS execution environment
8648 if not vca_deployed
["member-vnf-index"]:
8649 deploy_params
["ns_config_info"] = json
.dumps(
8650 self
._get
_ns
_config
_info
(nsr_id
)
8652 # TODO check if already done
8653 primitive_params_
= self
._map
_primitive
_params
(
8654 initial_config_primitive
, {}, deploy_params
8657 step
= "execute primitive '{}' params '{}'".format(
8658 initial_config_primitive
["name"], primitive_params_
8660 self
.logger
.debug(logging_text
+ step
)
8661 await self
.vca_map
[vca_type
].exec_primitive(
8663 primitive_name
=initial_config_primitive
["name"],
8664 params_dict
=primitive_params_
,
8669 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8670 if check_if_terminated_needed
:
8671 if config_descriptor
.get("terminate-config-primitive"):
8675 {db_update_entry
+ "needed_terminate": True},
8677 check_if_terminated_needed
= False
8679 # TODO register in database that primitive is done
8681 # STEP 7 Configure metrics
8682 # Not sure if this need to be done when healing
8684 if vca_type == "helm" or vca_type == "helm-v3":
8685 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8687 artifact_path=artifact_path,
8688 ee_config_descriptor=ee_config_descriptor,
8691 target_ip=rw_mgmt_ip,
8697 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8700 for job in prometheus_jobs:
8703 {"job_name": job["job_name"]},
8706 fail_on_empty=False,
8710 step
= "instantiated at VCA"
8711 self
.logger
.debug(logging_text
+ step
)
8713 self
._write
_configuration
_status
(
8714 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8717 except Exception as e
: # TODO not use Exception but N2VC exception
8718 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8720 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8723 "Exception while {} : {}".format(step
, e
), exc_info
=True
8725 self
._write
_configuration
_status
(
8726 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8728 raise LcmException("{} {}".format(step
, e
)) from e
8730 async def _wait_heal_ro(
8736 while time() <= start_time
+ timeout
:
8737 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8738 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8739 "operational-status"
8741 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8742 if operational_status_ro
!= "healing":
8744 await asyncio
.sleep(15, loop
=self
.loop
)
8745 else: # timeout_ns_deploy
8746 raise NgRoException("Timeout waiting ns to deploy")
8748 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8750 Vertical Scale the VDUs in a NS
8752 :param: nsr_id: NS Instance ID
8753 :param: nslcmop_id: nslcmop ID of migrate
8756 # Try to lock HA task here
8757 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8758 if not task_is_locked_by_me
:
8760 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8761 self
.logger
.debug(logging_text
+ "Enter")
8762 # get all needed from database
8764 db_nslcmop_update
= {}
8765 nslcmop_operation_state
= None
8769 # in case of error, indicates what part of scale was failed to put nsr at error status
8770 start_deploy
= time()
8773 # wait for any previous tasks in process
8774 step
= "Waiting for previous operations to terminate"
8775 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8777 self
._write
_ns
_status
(
8780 current_operation
="VerticalScale",
8781 current_operation_id
=nslcmop_id
,
8783 step
= "Getting nslcmop from database"
8785 step
+ " after having waited for previous tasks to be completed"
8787 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8788 operationParams
= db_nslcmop
.get("operationParams")
8790 target
.update(operationParams
)
8791 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8792 self
.logger
.debug("RO return > {}".format(desc
))
8793 action_id
= desc
["action_id"]
8794 await self
._wait
_ng
_ro
(
8799 self
.timeout
.verticalscale
,
8800 operation
="verticalscale",
8802 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8803 self
.logger
.error("Exit Exception {}".format(e
))
8805 except asyncio
.CancelledError
:
8806 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8807 exc
= "Operation was cancelled"
8808 except Exception as e
:
8809 exc
= traceback
.format_exc()
8810 self
.logger
.critical(
8811 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8814 self
._write
_ns
_status
(
8817 current_operation
="IDLE",
8818 current_operation_id
=None,
8821 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8822 nslcmop_operation_state
= "FAILED"
8824 nslcmop_operation_state
= "COMPLETED"
8825 db_nslcmop_update
["detailed-status"] = "Done"
8826 db_nsr_update
["detailed-status"] = "Done"
8828 self
._write
_op
_status
(
8832 operation_state
=nslcmop_operation_state
,
8833 other_update
=db_nslcmop_update
,
8835 if nslcmop_operation_state
:
8839 "nslcmop_id": nslcmop_id
,
8840 "operationState": nslcmop_operation_state
,
8842 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8843 except Exception as e
:
8845 logging_text
+ "kafka_write notification Exception {}".format(e
)
8847 self
.logger
.debug(logging_text
+ "Exit")
8848 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")