# -*- coding: utf-8 -*-

# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import SystemRandom
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
161 self
.conn_helm_ee
= LCMHelmConn(
163 vca_config
=self
.vca_config
,
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
167 self
.k8sclusterhelm2
= K8sHelmConnector(
168 kubectl_command
=self
.vca_config
.kubectlpath
,
169 helm_command
=self
.vca_config
.helmpath
,
176 self
.k8sclusterhelm3
= K8sHelm3Connector(
177 kubectl_command
=self
.vca_config
.kubectlpath
,
178 helm_command
=self
.vca_config
.helm3path
,
185 self
.k8sclusterjuju
= K8sJujuConnector(
186 kubectl_command
=self
.vca_config
.kubectlpath
,
187 juju_command
=self
.vca_config
.jujupath
,
189 on_update_db
=self
._on
_update
_k
8s
_db
,
194 self
.k8scluster_map
= {
195 "helm-chart": self
.k8sclusterhelm2
,
196 "helm-chart-v3": self
.k8sclusterhelm3
,
197 "chart": self
.k8sclusterhelm3
,
198 "juju-bundle": self
.k8sclusterjuju
,
199 "juju": self
.k8sclusterjuju
,
203 "lxc_proxy_charm": self
.n2vc
,
204 "native_charm": self
.n2vc
,
205 "k8s_proxy_charm": self
.n2vc
,
206 "helm": self
.conn_helm_ee
,
207 "helm-v3": self
.conn_helm_ee
,
211 self
.RO
= NgRoClient(**self
.ro_config
.to_dict())
213 self
.op_status_map
= {
214 "instantiation": self
.RO
.status
,
215 "termination": self
.RO
.status
,
216 "migrate": self
.RO
.status
,
217 "healing": self
.RO
.recreate_status
,
218 "verticalscale": self
.RO
.status
,
219 "start_stop_rebuild": self
.RO
.status
,
223 def increment_ip_mac(ip_mac
, vm_index
=1):
224 if not isinstance(ip_mac
, str):
227 # try with ipv4 look for last dot
228 i
= ip_mac
.rfind(".")
231 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
232 # try with ipv6 or mac look for last colon. Operate in hex
233 i
= ip_mac
.rfind(":")
236 # format in hex, len can be 2 for mac or 4 for ipv6
237 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
238 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
244 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
245 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
248 # TODO filter RO descriptor fields...
252 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
253 db_dict
["deploymentStatus"] = ro_descriptor
254 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
256 except Exception as e
:
258 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
261 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
262 # remove last dot from path (if exists)
263 if path
.endswith("."):
266 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
267 # .format(table, filter, path, updated_data))
269 nsr_id
= filter.get("_id")
271 # read ns record from database
272 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
273 current_ns_status
= nsr
.get("nsState")
275 # get vca status for NS
276 status_dict
= await self
.n2vc
.get_status(
277 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
282 db_dict
["vcaStatus"] = status_dict
284 # update configurationStatus for this VCA
286 vca_index
= int(path
[path
.rfind(".") + 1 :])
289 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
291 vca_status
= vca_list
[vca_index
].get("status")
293 configuration_status_list
= nsr
.get("configurationStatus")
294 config_status
= configuration_status_list
[vca_index
].get("status")
296 if config_status
== "BROKEN" and vca_status
!= "failed":
297 db_dict
["configurationStatus"][vca_index
] = "READY"
298 elif config_status
!= "BROKEN" and vca_status
== "failed":
299 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
300 except Exception as e
:
301 # not update configurationStatus
302 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
304 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
305 # if nsState = 'DEGRADED' check if all is OK
307 if current_ns_status
in ("READY", "DEGRADED"):
308 error_description
= ""
310 if status_dict
.get("machines"):
311 for machine_id
in status_dict
.get("machines"):
312 machine
= status_dict
.get("machines").get(machine_id
)
313 # check machine agent-status
314 if machine
.get("agent-status"):
315 s
= machine
.get("agent-status").get("status")
318 error_description
+= (
319 "machine {} agent-status={} ; ".format(
323 # check machine instance status
324 if machine
.get("instance-status"):
325 s
= machine
.get("instance-status").get("status")
328 error_description
+= (
329 "machine {} instance-status={} ; ".format(
334 if status_dict
.get("applications"):
335 for app_id
in status_dict
.get("applications"):
336 app
= status_dict
.get("applications").get(app_id
)
337 # check application status
338 if app
.get("status"):
339 s
= app
.get("status").get("status")
342 error_description
+= (
343 "application {} status={} ; ".format(app_id
, s
)
346 if error_description
:
347 db_dict
["errorDescription"] = error_description
348 if current_ns_status
== "READY" and is_degraded
:
349 db_dict
["nsState"] = "DEGRADED"
350 if current_ns_status
== "DEGRADED" and not is_degraded
:
351 db_dict
["nsState"] = "READY"
354 self
.update_db_2("nsrs", nsr_id
, db_dict
)
356 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
358 except Exception as e
:
359 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
361 async def _on_update_k8s_db(
362 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
365 Updating vca status in NSR record
366 :param cluster_uuid: UUID of a k8s cluster
367 :param kdu_instance: The unique name of the KDU instance
368 :param filter: To get nsr_id
369 :cluster_type: The cluster type (juju, k8s)
373 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
374 # .format(cluster_uuid, kdu_instance, filter))
376 nsr_id
= filter.get("_id")
378 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
379 cluster_uuid
=cluster_uuid
,
380 kdu_instance
=kdu_instance
,
382 complete_status
=True,
388 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
391 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
395 self
.update_db_2("nsrs", nsr_id
, db_dict
)
396 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
398 except Exception as e
:
399 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
402 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
405 undefined
=StrictUndefined
,
406 autoescape
=select_autoescape(default_for_string
=True, default
=True),
408 template
= env
.from_string(cloud_init_text
)
409 return template
.render(additional_params
or {})
410 except UndefinedError
as e
:
412 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
413 "file, must be provided in the instantiation parameters inside the "
414 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
416 except (TemplateError
, TemplateNotFound
) as e
:
418 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
423 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
424 cloud_init_content
= cloud_init_file
= None
426 if vdu
.get("cloud-init-file"):
427 base_folder
= vnfd
["_admin"]["storage"]
428 if base_folder
["pkg-dir"]:
429 cloud_init_file
= "{}/{}/cloud_init/{}".format(
430 base_folder
["folder"],
431 base_folder
["pkg-dir"],
432 vdu
["cloud-init-file"],
435 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
436 base_folder
["folder"],
437 vdu
["cloud-init-file"],
439 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
440 cloud_init_content
= ci_file
.read()
441 elif vdu
.get("cloud-init"):
442 cloud_init_content
= vdu
["cloud-init"]
444 return cloud_init_content
445 except FsException
as e
:
447 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
448 vnfd
["id"], vdu
["id"], cloud_init_file
, e
452 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
454 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
456 additional_params
= vdur
.get("additionalParams")
457 return parse_yaml_strings(additional_params
)
459 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
461 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
462 :param vnfd: input vnfd
463 :param new_id: overrides vnf id if provided
464 :param additionalParams: Instantiation params for VNFs provided
465 :param nsrId: Id of the NSR
466 :return: copy of vnfd
468 vnfd_RO
= deepcopy(vnfd
)
469 # remove unused by RO configuration, monitoring, scaling and internal keys
470 vnfd_RO
.pop("_id", None)
471 vnfd_RO
.pop("_admin", None)
472 vnfd_RO
.pop("monitoring-param", None)
473 vnfd_RO
.pop("scaling-group-descriptor", None)
474 vnfd_RO
.pop("kdu", None)
475 vnfd_RO
.pop("k8s-cluster", None)
477 vnfd_RO
["id"] = new_id
479 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
480 for vdu
in get_iterable(vnfd_RO
, "vdu"):
481 vdu
.pop("cloud-init-file", None)
482 vdu
.pop("cloud-init", None)
486 def ip_profile_2_RO(ip_profile
):
487 RO_ip_profile
= deepcopy(ip_profile
)
488 if "dns-server" in RO_ip_profile
:
489 if isinstance(RO_ip_profile
["dns-server"], list):
490 RO_ip_profile
["dns-address"] = []
491 for ds
in RO_ip_profile
.pop("dns-server"):
492 RO_ip_profile
["dns-address"].append(ds
["address"])
494 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
495 if RO_ip_profile
.get("ip-version") == "ipv4":
496 RO_ip_profile
["ip-version"] = "IPv4"
497 if RO_ip_profile
.get("ip-version") == "ipv6":
498 RO_ip_profile
["ip-version"] = "IPv6"
499 if "dhcp-params" in RO_ip_profile
:
500 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
503 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
504 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
505 if db_vim
["_admin"]["operationalState"] != "ENABLED":
507 "VIM={} is not available. operationalState={}".format(
508 vim_account
, db_vim
["_admin"]["operationalState"]
511 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
514 def get_ro_wim_id_for_wim_account(self
, wim_account
):
515 if isinstance(wim_account
, str):
516 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
517 if db_wim
["_admin"]["operationalState"] != "ENABLED":
519 "WIM={} is not available. operationalState={}".format(
520 wim_account
, db_wim
["_admin"]["operationalState"]
523 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
528 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
529 db_vdu_push_list
= []
531 db_update
= {"_admin.modified": time()}
533 for vdu_id
, vdu_count
in vdu_create
.items():
537 for vdur
in reversed(db_vnfr
["vdur"])
538 if vdur
["vdu-id-ref"] == vdu_id
543 # Read the template saved in the db:
545 "No vdur in the database. Using the vdur-template to scale"
547 vdur_template
= db_vnfr
.get("vdur-template")
548 if not vdur_template
:
550 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
554 vdur
= vdur_template
[0]
555 # Delete a template from the database after using it
558 {"_id": db_vnfr
["_id"]},
560 pull
={"vdur-template": {"_id": vdur
["_id"]}},
562 for count
in range(vdu_count
):
563 vdur_copy
= deepcopy(vdur
)
564 vdur_copy
["status"] = "BUILD"
565 vdur_copy
["status-detailed"] = None
566 vdur_copy
["ip-address"] = None
567 vdur_copy
["_id"] = str(uuid4())
568 vdur_copy
["count-index"] += count
+ 1
569 vdur_copy
["id"] = "{}-{}".format(
570 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
572 vdur_copy
.pop("vim_info", None)
573 for iface
in vdur_copy
["interfaces"]:
574 if iface
.get("fixed-ip"):
575 iface
["ip-address"] = self
.increment_ip_mac(
576 iface
["ip-address"], count
+ 1
579 iface
.pop("ip-address", None)
580 if iface
.get("fixed-mac"):
581 iface
["mac-address"] = self
.increment_ip_mac(
582 iface
["mac-address"], count
+ 1
585 iface
.pop("mac-address", None)
589 ) # only first vdu can be managment of vnf
590 db_vdu_push_list
.append(vdur_copy
)
591 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
593 if len(db_vnfr
["vdur"]) == 1:
594 # The scale will move to 0 instances
596 "Scaling to 0 !, creating the template with the last vdur"
598 template_vdur
= [db_vnfr
["vdur"][0]]
599 for vdu_id
, vdu_count
in vdu_delete
.items():
601 indexes_to_delete
= [
603 for iv
in enumerate(db_vnfr
["vdur"])
604 if iv
[1]["vdu-id-ref"] == vdu_id
608 "vdur.{}.status".format(i
): "DELETING"
609 for i
in indexes_to_delete
[-vdu_count
:]
613 # it must be deleted one by one because common.db does not allow otherwise
616 for v
in reversed(db_vnfr
["vdur"])
617 if v
["vdu-id-ref"] == vdu_id
619 for vdu
in vdus_to_delete
[:vdu_count
]:
622 {"_id": db_vnfr
["_id"]},
624 pull
={"vdur": {"_id": vdu
["_id"]}},
628 db_push
["vdur"] = db_vdu_push_list
630 db_push
["vdur-template"] = template_vdur
633 db_vnfr
["vdur-template"] = template_vdur
634 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
635 # modify passed dictionary db_vnfr
636 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
637 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
639 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
641 Updates database nsr with the RO info for the created vld
642 :param ns_update_nsr: dictionary to be filled with the updated info
643 :param db_nsr: content of db_nsr. This is also modified
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
648 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
649 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
650 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
652 vld
["vim-id"] = net_RO
.get("vim_net_id")
653 vld
["name"] = net_RO
.get("vim_name")
654 vld
["status"] = net_RO
.get("status")
655 vld
["status-detailed"] = net_RO
.get("error_msg")
656 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
660 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
663 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
665 for db_vnfr
in db_vnfrs
.values():
666 vnfr_update
= {"status": "ERROR"}
667 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
668 if "status" not in vdur
:
669 vdur
["status"] = "ERROR"
670 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
672 vdur
["status-detailed"] = str(error_text
)
674 "vdur.{}.status-detailed".format(vdu_index
)
676 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
677 except DbException
as e
:
678 self
.logger
.error("Cannot update vnf. {}".format(e
))
680 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
682 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
683 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
684 :param nsr_desc_RO: nsr descriptor from RO
685 :return: Nothing, LcmException is raised on errors
687 for vnf_index
, db_vnfr
in db_vnfrs
.items():
688 for vnf_RO
in nsr_desc_RO
["vnfs"]:
689 if vnf_RO
["member_vnf_index"] != vnf_index
:
692 if vnf_RO
.get("ip_address"):
693 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
696 elif not db_vnfr
.get("ip-address"):
697 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
698 raise LcmExceptionNoMgmtIP(
699 "ns member_vnf_index '{}' has no IP address".format(
704 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
705 vdur_RO_count_index
= 0
706 if vdur
.get("pdu-type"):
708 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
709 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
711 if vdur
["count-index"] != vdur_RO_count_index
:
712 vdur_RO_count_index
+= 1
714 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
715 if vdur_RO
.get("ip_address"):
716 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
718 vdur
["ip-address"] = None
719 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
720 vdur
["name"] = vdur_RO
.get("vim_name")
721 vdur
["status"] = vdur_RO
.get("status")
722 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
723 for ifacer
in get_iterable(vdur
, "interfaces"):
724 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
725 if ifacer
["name"] == interface_RO
.get("internal_name"):
726 ifacer
["ip-address"] = interface_RO
.get(
729 ifacer
["mac-address"] = interface_RO
.get(
735 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
736 "from VIM info".format(
737 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
740 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
744 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
746 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
750 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
751 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
752 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
754 vld
["vim-id"] = net_RO
.get("vim_net_id")
755 vld
["name"] = net_RO
.get("vim_name")
756 vld
["status"] = net_RO
.get("status")
757 vld
["status-detailed"] = net_RO
.get("error_msg")
758 vnfr_update
["vld.{}".format(vld_index
)] = vld
762 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
767 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
772 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
777 def _get_ns_config_info(self
, nsr_id
):
779 Generates a mapping between vnf,vdu elements and the N2VC id
780 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
781 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
782 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
783 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
786 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
788 ns_config_info
= {"osm-config-mapping": mapping
}
789 for vca
in vca_deployed_list
:
790 if not vca
["member-vnf-index"]:
792 if not vca
["vdu_id"]:
793 mapping
[vca
["member-vnf-index"]] = vca
["application"]
797 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
799 ] = vca
["application"]
800 return ns_config_info
802 async def _instantiate_ng_ro(
818 def get_vim_account(vim_account_id
):
820 if vim_account_id
in db_vims
:
821 return db_vims
[vim_account_id
]
822 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
823 db_vims
[vim_account_id
] = db_vim
826 # modify target_vld info with instantiation parameters
827 def parse_vld_instantiation_params(
828 target_vim
, target_vld
, vld_params
, target_sdn
830 if vld_params
.get("ip-profile"):
831 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
832 vld_params
["ip-profile"]
834 if vld_params
.get("provider-network"):
835 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
838 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
839 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
843 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
844 # if wim_account_id is specified in vld_params, validate if it is feasible.
845 wim_account_id
, db_wim
= select_feasible_wim_account(
846 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
850 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
851 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
852 # update vld_params with correct WIM account Id
853 vld_params
["wimAccountId"] = wim_account_id
855 target_wim
= "wim:{}".format(wim_account_id
)
856 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
857 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
858 if len(sdn_ports
) > 0:
859 target_vld
["vim_info"][target_wim
] = target_wim_attrs
860 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
863 "Target VLD with WIM data: {:s}".format(str(target_vld
))
866 for param
in ("vim-network-name", "vim-network-id"):
867 if vld_params
.get(param
):
868 if isinstance(vld_params
[param
], dict):
869 for vim
, vim_net
in vld_params
[param
].items():
870 other_target_vim
= "vim:" + vim
872 target_vld
["vim_info"],
873 (other_target_vim
, param
.replace("-", "_")),
876 else: # isinstance str
877 target_vld
["vim_info"][target_vim
][
878 param
.replace("-", "_")
879 ] = vld_params
[param
]
880 if vld_params
.get("common_id"):
881 target_vld
["common_id"] = vld_params
.get("common_id")
883 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
884 def update_ns_vld_target(target
, ns_params
):
885 for vnf_params
in ns_params
.get("vnf", ()):
886 if vnf_params
.get("vimAccountId"):
890 for vnfr
in db_vnfrs
.values()
891 if vnf_params
["member-vnf-index"]
892 == vnfr
["member-vnf-index-ref"]
896 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
899 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
900 target_vld
= find_in_list(
901 get_iterable(vdur
, "interfaces"),
902 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
905 vld_params
= find_in_list(
906 get_iterable(ns_params
, "vld"),
907 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
910 if vnf_params
.get("vimAccountId") not in a_vld
.get(
913 target_vim_network_list
= [
914 v
for _
, v
in a_vld
.get("vim_info").items()
916 target_vim_network_name
= next(
918 item
.get("vim_network_name", "")
919 for item
in target_vim_network_list
924 target
["ns"]["vld"][a_index
].get("vim_info").update(
926 "vim:{}".format(vnf_params
["vimAccountId"]): {
927 "vim_network_name": target_vim_network_name
,
933 for param
in ("vim-network-name", "vim-network-id"):
934 if vld_params
.get(param
) and isinstance(
935 vld_params
[param
], dict
937 for vim
, vim_net
in vld_params
[
940 other_target_vim
= "vim:" + vim
942 target
["ns"]["vld"][a_index
].get(
947 param
.replace("-", "_"),
952 nslcmop_id
= db_nslcmop
["_id"]
954 "name": db_nsr
["name"],
957 "image": deepcopy(db_nsr
["image"]),
958 "flavor": deepcopy(db_nsr
["flavor"]),
959 "action_id": nslcmop_id
,
960 "cloud_init_content": {},
962 for image
in target
["image"]:
963 image
["vim_info"] = {}
964 for flavor
in target
["flavor"]:
965 flavor
["vim_info"] = {}
966 if db_nsr
.get("shared-volumes"):
967 target
["shared-volumes"] = deepcopy(db_nsr
["shared-volumes"])
968 for shared_volumes
in target
["shared-volumes"]:
969 shared_volumes
["vim_info"] = {}
970 if db_nsr
.get("affinity-or-anti-affinity-group"):
971 target
["affinity-or-anti-affinity-group"] = deepcopy(
972 db_nsr
["affinity-or-anti-affinity-group"]
974 for affinity_or_anti_affinity_group
in target
[
975 "affinity-or-anti-affinity-group"
977 affinity_or_anti_affinity_group
["vim_info"] = {}
979 if db_nslcmop
.get("lcmOperationType") != "instantiate":
980 # get parameters of instantiation:
981 db_nslcmop_instantiate
= self
.db
.get_list(
984 "nsInstanceId": db_nslcmop
["nsInstanceId"],
985 "lcmOperationType": "instantiate",
988 ns_params
= db_nslcmop_instantiate
.get("operationParams")
990 ns_params
= db_nslcmop
.get("operationParams")
991 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
992 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
995 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
996 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1000 "mgmt-network": vld
.get("mgmt-network", False),
1001 "type": vld
.get("type"),
1004 "vim_network_name": vld
.get("vim-network-name"),
1005 "vim_account_id": ns_params
["vimAccountId"],
1009 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1012 if vim_config
:= db_vim
.get("config"):
1013 if sdnc_id
:= vim_config
.get("sdn-controller"):
1014 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 target_vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1024 for nsd_vnf_profile
in nsd_vnf_profiles
:
1025 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1026 if cp
["virtual-link-profile-id"] == vld
["id"]:
1028 "member_vnf:{}.{}".format(
1029 cp
["constituent-cpd-id"][0][
1030 "constituent-base-element-id"
1032 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1034 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1036 # check at nsd descriptor, if there is an ip-profile
1038 nsd_vlp
= find_in_list(
1039 get_virtual_link_profiles(nsd
),
1040 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1045 and nsd_vlp
.get("virtual-link-protocol-data")
1046 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1048 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1052 # update vld_params with instantiation params
1053 vld_instantiation_params
= find_in_list(
1054 get_iterable(ns_params
, "vld"),
1055 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1057 if vld_instantiation_params
:
1058 vld_params
.update(vld_instantiation_params
)
1059 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1060 target
["ns"]["vld"].append(target_vld
)
1061 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1062 update_ns_vld_target(target
, ns_params
)
1064 for vnfr
in db_vnfrs
.values():
1065 vnfd
= find_in_list(
1066 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1068 vnf_params
= find_in_list(
1069 get_iterable(ns_params
, "vnf"),
1070 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1072 target_vnf
= deepcopy(vnfr
)
1073 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1074 for vld
in target_vnf
.get("vld", ()):
1075 # check if connected to a ns.vld, to fill target'
1076 vnf_cp
= find_in_list(
1077 vnfd
.get("int-virtual-link-desc", ()),
1078 lambda cpd
: cpd
.get("id") == vld
["id"],
1081 ns_cp
= "member_vnf:{}.{}".format(
1082 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1084 if cp2target
.get(ns_cp
):
1085 vld
["target"] = cp2target
[ns_cp
]
1088 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1090 # check if this network needs SDN assist
1092 if vld
.get("pci-interfaces"):
1093 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1094 sdnc_id
= db_vim
["config"].get("sdn-controller")
1096 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1097 target_sdn
= "sdn:{}".format(sdnc_id
)
1098 vld
["vim_info"][target_sdn
] = {
1100 "target_vim": target_vim
,
1102 "type": vld
.get("type"),
1105 # check at vnfd descriptor, if there is an ip-profile
1107 vnfd_vlp
= find_in_list(
1108 get_virtual_link_profiles(vnfd
),
1109 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1113 and vnfd_vlp
.get("virtual-link-protocol-data")
1114 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1116 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1119 # update vld_params with instantiation params
1121 vld_instantiation_params
= find_in_list(
1122 get_iterable(vnf_params
, "internal-vld"),
1123 lambda i_vld
: i_vld
["name"] == vld
["id"],
1125 if vld_instantiation_params
:
1126 vld_params
.update(vld_instantiation_params
)
1127 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1130 for vdur
in target_vnf
.get("vdur", ()):
1131 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1132 continue # This vdu must not be created
1133 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1135 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1138 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1139 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1142 and vdu_configuration
.get("config-access")
1143 and vdu_configuration
.get("config-access").get("ssh-access")
1145 vdur
["ssh-keys"] = ssh_keys_all
1146 vdur
["ssh-access-required"] = vdu_configuration
[
1148 ]["ssh-access"]["required"]
1151 and vnf_configuration
.get("config-access")
1152 and vnf_configuration
.get("config-access").get("ssh-access")
1153 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1155 vdur
["ssh-keys"] = ssh_keys_all
1156 vdur
["ssh-access-required"] = vnf_configuration
[
1158 ]["ssh-access"]["required"]
1159 elif ssh_keys_instantiation
and find_in_list(
1160 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1162 vdur
["ssh-keys"] = ssh_keys_instantiation
1164 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1166 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1168 if vdud
.get("cloud-init-file"):
1169 vdur
["cloud-init"] = "{}:file:{}".format(
1170 vnfd
["_id"], vdud
.get("cloud-init-file")
1172 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1173 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1174 base_folder
= vnfd
["_admin"]["storage"]
1175 if base_folder
["pkg-dir"]:
1176 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1177 base_folder
["folder"],
1178 base_folder
["pkg-dir"],
1179 vdud
.get("cloud-init-file"),
1182 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1183 base_folder
["folder"],
1184 vdud
.get("cloud-init-file"),
1186 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1187 target
["cloud_init_content"][
1190 elif vdud
.get("cloud-init"):
1191 vdur
["cloud-init"] = "{}:vdu:{}".format(
1192 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1194 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1195 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1198 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1199 deploy_params_vdu
= self
._format
_additional
_params
(
1200 vdur
.get("additionalParams") or {}
1202 deploy_params_vdu
["OSM"] = get_osm_params(
1203 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1205 vdur
["additionalParams"] = deploy_params_vdu
1208 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1209 if target_vim
not in ns_flavor
["vim_info"]:
1210 ns_flavor
["vim_info"][target_vim
] = {}
1213 # in case alternative images are provided we must check if they should be applied
1214 # for the vim_type, modify the vim_type taking into account
1215 ns_image_id
= int(vdur
["ns-image-id"])
1216 if vdur
.get("alt-image-ids"):
1217 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1218 vim_type
= db_vim
["vim_type"]
1219 for alt_image_id
in vdur
.get("alt-image-ids"):
1220 ns_alt_image
= target
["image"][int(alt_image_id
)]
1221 if vim_type
== ns_alt_image
.get("vim-type"):
1222 # must use alternative image
1224 "use alternative image id: {}".format(alt_image_id
)
1226 ns_image_id
= alt_image_id
1227 vdur
["ns-image-id"] = ns_image_id
1229 ns_image
= target
["image"][int(ns_image_id
)]
1230 if target_vim
not in ns_image
["vim_info"]:
1231 ns_image
["vim_info"][target_vim
] = {}
1234 if vdur
.get("affinity-or-anti-affinity-group-id"):
1235 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1236 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1237 if target_vim
not in ns_ags
["vim_info"]:
1238 ns_ags
["vim_info"][target_vim
] = {}
1241 if vdur
.get("shared-volumes-id"):
1242 for sv_id
in vdur
["shared-volumes-id"]:
1243 ns_sv
= find_in_list(
1244 target
["shared-volumes"], lambda sv
: sv_id
in sv
["id"]
1247 ns_sv
["vim_info"][target_vim
] = {}
1249 vdur
["vim_info"] = {target_vim
: {}}
1250 # instantiation parameters
1252 vdu_instantiation_params
= find_in_list(
1253 get_iterable(vnf_params
, "vdu"),
1254 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1256 if vdu_instantiation_params
:
1257 # Parse the vdu_volumes from the instantiation params
1258 vdu_volumes
= get_volumes_from_instantiation_params(
1259 vdu_instantiation_params
, vdud
1261 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1262 vdur
["additionalParams"]["OSM"][
1264 ] = vdu_instantiation_params
.get("vim-flavor-id")
1265 vdur_list
.append(vdur
)
1266 target_vnf
["vdur"] = vdur_list
1267 target
["vnf"].append(target_vnf
)
1269 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1270 desc
= await self
.RO
.deploy(nsr_id
, target
)
1271 self
.logger
.debug("RO return > {}".format(desc
))
1272 action_id
= desc
["action_id"]
1273 await self
._wait
_ng
_ro
(
1280 operation
="instantiation",
1285 "_admin.deployed.RO.operational-status": "running",
1286 "detailed-status": " ".join(stage
),
1288 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1289 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1290 self
._write
_op
_status
(nslcmop_id
, stage
)
1292 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1296 async def _wait_ng_ro(
1306 detailed_status_old
= None
1308 start_time
= start_time
or time()
1309 while time() <= start_time
+ timeout
:
1310 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1311 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1312 if desc_status
["status"] == "FAILED":
1313 raise NgRoException(desc_status
["details"])
1314 elif desc_status
["status"] == "BUILD":
1316 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1317 elif desc_status
["status"] == "DONE":
1319 stage
[2] = "Deployed at VIM"
1322 assert False, "ROclient.check_ns_status returns unknown {}".format(
1323 desc_status
["status"]
1325 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1326 detailed_status_old
= stage
[2]
1327 db_nsr_update
["detailed-status"] = " ".join(stage
)
1328 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1329 self
._write
_op
_status
(nslcmop_id
, stage
)
1330 await asyncio
.sleep(15)
1331 else: # timeout_ns_deploy
1332 raise NgRoException("Timeout waiting ns to deploy")
1334 async def _terminate_ng_ro(
1335 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1340 start_deploy
= time()
1347 "action_id": nslcmop_id
,
1349 desc
= await self
.RO
.deploy(nsr_id
, target
)
1350 action_id
= desc
["action_id"]
1351 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1354 + "ns terminate action at RO. action_id={}".format(action_id
)
1358 delete_timeout
= 20 * 60 # 20 minutes
1359 await self
._wait
_ng
_ro
(
1366 operation
="termination",
1368 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1370 await self
.RO
.delete(nsr_id
)
1371 except NgRoException
as e
:
1372 if e
.http_code
== 404: # not found
1373 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1374 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1376 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1378 elif e
.http_code
== 409: # conflict
1379 failed_detail
.append("delete conflict: {}".format(e
))
1382 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1385 failed_detail
.append("delete error: {}".format(e
))
1388 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1390 except Exception as e
:
1391 failed_detail
.append("delete error: {}".format(e
))
1393 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1397 stage
[2] = "Error deleting from VIM"
1399 stage
[2] = "Deleted from VIM"
1400 db_nsr_update
["detailed-status"] = " ".join(stage
)
1401 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1402 self
._write
_op
_status
(nslcmop_id
, stage
)
1405 raise LcmException("; ".join(failed_detail
))
1408 async def instantiate_RO(
1422 :param logging_text: preffix text to use at logging
1423 :param nsr_id: nsr identity
1424 :param nsd: database content of ns descriptor
1425 :param db_nsr: database content of ns record
1426 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1428 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1429 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1430 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1431 :return: None or exception
1434 start_deploy
= time()
1435 ns_params
= db_nslcmop
.get("operationParams")
1436 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1437 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1439 timeout_ns_deploy
= self
.timeout
.ns_deploy
1441 # Check for and optionally request placement optimization. Database will be updated if placement activated
1442 stage
[2] = "Waiting for Placement."
1443 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1444 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1445 for vnfr
in db_vnfrs
.values():
1446 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1449 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1451 return await self
._instantiate
_ng
_ro
(
1464 except Exception as e
:
1465 stage
[2] = "ERROR deploying at VIM"
1466 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1468 "Error deploying at VIM {}".format(e
),
1469 exc_info
=not isinstance(
1472 ROclient
.ROClientException
,
1481 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1483 Wait for kdu to be up, get ip address
1484 :param logging_text: prefix use for logging
1488 :return: IP address, K8s services
1491 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1494 while nb_tries
< 360:
1495 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1499 for x
in get_iterable(db_vnfr
, "kdur")
1500 if x
.get("kdu-name") == kdu_name
1506 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1508 if kdur
.get("status"):
1509 if kdur
["status"] in ("READY", "ENABLED"):
1510 return kdur
.get("ip-address"), kdur
.get("services")
1513 "target KDU={} is in error state".format(kdu_name
)
1516 await asyncio
.sleep(10)
1518 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1520 async def wait_vm_up_insert_key_ro(
1521 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1524 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1525 :param logging_text: prefix use for logging
1530 :param pub_key: public ssh key to inject, None to skip
1531 :param user: user to apply the public ssh key
1535 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1537 target_vdu_id
= None
1542 if ro_retries
>= 360: # 1 hour
1544 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1547 await asyncio
.sleep(10)
1550 if not target_vdu_id
:
1551 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1553 if not vdu_id
: # for the VNF case
1554 if db_vnfr
.get("status") == "ERROR":
1556 "Cannot inject ssh-key because target VNF is in error state"
1558 ip_address
= db_vnfr
.get("ip-address")
1564 for x
in get_iterable(db_vnfr
, "vdur")
1565 if x
.get("ip-address") == ip_address
1573 for x
in get_iterable(db_vnfr
, "vdur")
1574 if x
.get("vdu-id-ref") == vdu_id
1575 and x
.get("count-index") == vdu_index
1581 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1582 ): # If only one, this should be the target vdu
1583 vdur
= db_vnfr
["vdur"][0]
1586 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1587 vnfr_id
, vdu_id
, vdu_index
1590 # New generation RO stores information at "vim_info"
1593 if vdur
.get("vim_info"):
1595 t
for t
in vdur
["vim_info"]
1596 ) # there should be only one key
1597 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1599 vdur
.get("pdu-type")
1600 or vdur
.get("status") == "ACTIVE"
1601 or ng_ro_status
== "ACTIVE"
1603 ip_address
= vdur
.get("ip-address")
1606 target_vdu_id
= vdur
["vdu-id-ref"]
1607 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1609 "Cannot inject ssh-key because target VM is in error state"
1612 if not target_vdu_id
:
1615 # inject public key into machine
1616 if pub_key
and user
:
1617 self
.logger
.debug(logging_text
+ "Inserting RO key")
1618 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1619 if vdur
.get("pdu-type"):
1620 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1625 "action": "inject_ssh_key",
1629 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1631 desc
= await self
.RO
.deploy(nsr_id
, target
)
1632 action_id
= desc
["action_id"]
1633 await self
._wait
_ng
_ro
(
1634 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1637 except NgRoException
as e
:
1639 "Reaching max tries injecting key. Error: {}".format(e
)
1646 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1648 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1650 my_vca
= vca_deployed_list
[vca_index
]
1651 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1652 # vdu or kdu: no dependencies
1656 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1657 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1658 configuration_status_list
= db_nsr
["configurationStatus"]
1659 for index
, vca_deployed
in enumerate(configuration_status_list
):
1660 if index
== vca_index
:
1663 if not my_vca
.get("member-vnf-index") or (
1664 vca_deployed
.get("member-vnf-index")
1665 == my_vca
.get("member-vnf-index")
1667 internal_status
= configuration_status_list
[index
].get("status")
1668 if internal_status
== "READY":
1670 elif internal_status
== "BROKEN":
1672 "Configuration aborted because dependent charm/s has failed"
1677 # no dependencies, return
1679 await asyncio
.sleep(10)
1682 raise LcmException("Configuration aborted because dependent charm/s timeout")
1684 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1687 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1689 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1690 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1693 async def instantiate_N2VC(
1711 ee_config_descriptor
,
1713 nsr_id
= db_nsr
["_id"]
1714 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1715 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1716 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1717 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1719 "collection": "nsrs",
1720 "filter": {"_id": nsr_id
},
1721 "path": db_update_entry
,
1726 element_under_configuration
= nsr_id
1730 vnfr_id
= db_vnfr
["_id"]
1731 osm_config
["osm"]["vnf_id"] = vnfr_id
1733 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1735 if vca_type
== "native_charm":
1738 index_number
= vdu_index
or 0
1741 element_type
= "VNF"
1742 element_under_configuration
= vnfr_id
1743 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1745 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1746 element_type
= "VDU"
1747 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1748 osm_config
["osm"]["vdu_id"] = vdu_id
1750 namespace
+= ".{}".format(kdu_name
)
1751 element_type
= "KDU"
1752 element_under_configuration
= kdu_name
1753 osm_config
["osm"]["kdu_name"] = kdu_name
1756 if base_folder
["pkg-dir"]:
1757 artifact_path
= "{}/{}/{}/{}".format(
1758 base_folder
["folder"],
1759 base_folder
["pkg-dir"],
1762 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1767 artifact_path
= "{}/Scripts/{}/{}/".format(
1768 base_folder
["folder"],
1771 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1776 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1778 # get initial_config_primitive_list that applies to this element
1779 initial_config_primitive_list
= config_descriptor
.get(
1780 "initial-config-primitive"
1784 "Initial config primitive list > {}".format(
1785 initial_config_primitive_list
1789 # add config if not present for NS charm
1790 ee_descriptor_id
= ee_config_descriptor
.get("id")
1791 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1792 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1793 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1797 "Initial config primitive list #2 > {}".format(
1798 initial_config_primitive_list
1801 # n2vc_redesign STEP 3.1
1802 # find old ee_id if exists
1803 ee_id
= vca_deployed
.get("ee_id")
1805 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1806 # create or register execution environment in VCA
1807 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1808 self
._write
_configuration
_status
(
1810 vca_index
=vca_index
,
1812 element_under_configuration
=element_under_configuration
,
1813 element_type
=element_type
,
1816 step
= "create execution environment"
1817 self
.logger
.debug(logging_text
+ step
)
1821 if vca_type
== "k8s_proxy_charm":
1822 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1823 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1824 namespace
=namespace
,
1825 artifact_path
=artifact_path
,
1829 elif vca_type
== "helm" or vca_type
== "helm-v3":
1830 ee_id
, credentials
= await self
.vca_map
[
1832 ].create_execution_environment(
1833 namespace
=namespace
,
1837 artifact_path
=artifact_path
,
1838 chart_model
=vca_name
,
1842 ee_id
, credentials
= await self
.vca_map
[
1844 ].create_execution_environment(
1845 namespace
=namespace
,
1851 elif vca_type
== "native_charm":
1852 step
= "Waiting to VM being up and getting IP address"
1853 self
.logger
.debug(logging_text
+ step
)
1854 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1863 credentials
= {"hostname": rw_mgmt_ip
}
1865 username
= deep_get(
1866 config_descriptor
, ("config-access", "ssh-access", "default-user")
1868 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1869 # merged. Meanwhile let's get username from initial-config-primitive
1870 if not username
and initial_config_primitive_list
:
1871 for config_primitive
in initial_config_primitive_list
:
1872 for param
in config_primitive
.get("parameter", ()):
1873 if param
["name"] == "ssh-username":
1874 username
= param
["value"]
1878 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1879 "'config-access.ssh-access.default-user'"
1881 credentials
["username"] = username
1882 # n2vc_redesign STEP 3.2
1884 self
._write
_configuration
_status
(
1886 vca_index
=vca_index
,
1887 status
="REGISTERING",
1888 element_under_configuration
=element_under_configuration
,
1889 element_type
=element_type
,
1892 step
= "register execution environment {}".format(credentials
)
1893 self
.logger
.debug(logging_text
+ step
)
1894 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1895 credentials
=credentials
,
1896 namespace
=namespace
,
1901 # for compatibility with MON/POL modules, the need model and application name at database
1902 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1903 ee_id_parts
= ee_id
.split(".")
1904 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1905 if len(ee_id_parts
) >= 2:
1906 model_name
= ee_id_parts
[0]
1907 application_name
= ee_id_parts
[1]
1908 db_nsr_update
[db_update_entry
+ "model"] = model_name
1909 db_nsr_update
[db_update_entry
+ "application"] = application_name
1911 # n2vc_redesign STEP 3.3
1912 step
= "Install configuration Software"
1914 self
._write
_configuration
_status
(
1916 vca_index
=vca_index
,
1917 status
="INSTALLING SW",
1918 element_under_configuration
=element_under_configuration
,
1919 element_type
=element_type
,
1920 other_update
=db_nsr_update
,
1923 # TODO check if already done
1924 self
.logger
.debug(logging_text
+ step
)
1926 if vca_type
== "native_charm":
1927 config_primitive
= next(
1928 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1931 if config_primitive
:
1932 config
= self
._map
_primitive
_params
(
1933 config_primitive
, {}, deploy_params
1936 if vca_type
== "lxc_proxy_charm":
1937 if element_type
== "NS":
1938 num_units
= db_nsr
.get("config-units") or 1
1939 elif element_type
== "VNF":
1940 num_units
= db_vnfr
.get("config-units") or 1
1941 elif element_type
== "VDU":
1942 for v
in db_vnfr
["vdur"]:
1943 if vdu_id
== v
["vdu-id-ref"]:
1944 num_units
= v
.get("config-units") or 1
1946 if vca_type
!= "k8s_proxy_charm":
1947 await self
.vca_map
[vca_type
].install_configuration_sw(
1949 artifact_path
=artifact_path
,
1952 num_units
=num_units
,
1957 # write in db flag of configuration_sw already installed
1959 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1962 # add relations for this VCA (wait for other peers related with this VCA)
1963 is_relation_added
= await self
._add
_vca
_relations
(
1964 logging_text
=logging_text
,
1967 vca_index
=vca_index
,
1970 if not is_relation_added
:
1971 raise LcmException("Relations could not be added to VCA.")
1973 # if SSH access is required, then get execution environment SSH public
1974 # if native charm we have waited already to VM be UP
1975 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1978 # self.logger.debug("get ssh key block")
1980 config_descriptor
, ("config-access", "ssh-access", "required")
1982 # self.logger.debug("ssh key needed")
1983 # Needed to inject a ssh key
1986 ("config-access", "ssh-access", "default-user"),
1988 step
= "Install configuration Software, getting public ssh key"
1989 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1990 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1993 step
= "Insert public key into VM user={} ssh_key={}".format(
1997 # self.logger.debug("no need to get ssh key")
1998 step
= "Waiting to VM being up and getting IP address"
1999 self
.logger
.debug(logging_text
+ step
)
2001 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2004 # n2vc_redesign STEP 5.1
2005 # wait for RO (ip-address) Insert pub_key into VM
2008 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2009 logging_text
, nsr_id
, vnfr_id
, kdu_name
2011 vnfd
= self
.db
.get_one(
2013 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2015 kdu
= get_kdu(vnfd
, kdu_name
)
2017 service
["name"] for service
in get_kdu_services(kdu
)
2019 exposed_services
= []
2020 for service
in services
:
2021 if any(s
in service
["name"] for s
in kdu_services
):
2022 exposed_services
.append(service
)
2023 await self
.vca_map
[vca_type
].exec_primitive(
2025 primitive_name
="config",
2027 "osm-config": json
.dumps(
2029 k8s
={"services": exposed_services
}
2036 # This verification is needed in order to avoid trying to add a public key
2037 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2038 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2039 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2041 elif db_vnfr
.get("vdur"):
2042 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2052 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2054 # store rw_mgmt_ip in deploy params for later replacement
2055 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2057 # n2vc_redesign STEP 6 Execute initial config primitive
2058 step
= "execute initial config primitive"
2060 # wait for dependent primitives execution (NS -> VNF -> VDU)
2061 if initial_config_primitive_list
:
2062 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2064 # stage, in function of element type: vdu, kdu, vnf or ns
2065 my_vca
= vca_deployed_list
[vca_index
]
2066 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2068 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2069 elif my_vca
.get("member-vnf-index"):
2071 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2074 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2076 self
._write
_configuration
_status
(
2077 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2080 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2082 check_if_terminated_needed
= True
2083 for initial_config_primitive
in initial_config_primitive_list
:
2084 # adding information on the vca_deployed if it is a NS execution environment
2085 if not vca_deployed
["member-vnf-index"]:
2086 deploy_params
["ns_config_info"] = json
.dumps(
2087 self
._get
_ns
_config
_info
(nsr_id
)
2089 # TODO check if already done
2090 primitive_params_
= self
._map
_primitive
_params
(
2091 initial_config_primitive
, {}, deploy_params
2094 step
= "execute primitive '{}' params '{}'".format(
2095 initial_config_primitive
["name"], primitive_params_
2097 self
.logger
.debug(logging_text
+ step
)
2098 await self
.vca_map
[vca_type
].exec_primitive(
2100 primitive_name
=initial_config_primitive
["name"],
2101 params_dict
=primitive_params_
,
2106 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2107 if check_if_terminated_needed
:
2108 if config_descriptor
.get("terminate-config-primitive"):
2110 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2112 check_if_terminated_needed
= False
2114 # TODO register in database that primitive is done
2116 # STEP 7 Configure metrics
2117 if vca_type
== "helm" or vca_type
== "helm-v3":
2118 # TODO: review for those cases where the helm chart is a reference and
2119 # is not part of the NF package
2120 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2122 artifact_path
=artifact_path
,
2123 ee_config_descriptor
=ee_config_descriptor
,
2126 target_ip
=rw_mgmt_ip
,
2127 element_type
=element_type
,
2128 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2130 vdu_index
=vdu_index
,
2132 kdu_index
=kdu_index
,
2138 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2141 for job
in prometheus_jobs
:
2144 {"job_name": job
["job_name"]},
2147 fail_on_empty
=False,
2150 step
= "instantiated at VCA"
2151 self
.logger
.debug(logging_text
+ step
)
2153 self
._write
_configuration
_status
(
2154 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2157 except Exception as e
: # TODO not use Exception but N2VC exception
2158 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2160 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2163 "Exception while {} : {}".format(step
, e
), exc_info
=True
2165 self
._write
_configuration
_status
(
2166 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2168 raise LcmException("{}. {}".format(step
, e
)) from e
2170 def _write_ns_status(
2174 current_operation
: str,
2175 current_operation_id
: str,
2176 error_description
: str = None,
2177 error_detail
: str = None,
2178 other_update
: dict = None,
2181 Update db_nsr fields.
2184 :param current_operation:
2185 :param current_operation_id:
2186 :param error_description:
2187 :param error_detail:
2188 :param other_update: Other required changes at database if provided, will be cleared
2192 db_dict
= other_update
or {}
2195 ] = current_operation_id
# for backward compatibility
2196 db_dict
["_admin.current-operation"] = current_operation_id
2197 db_dict
["_admin.operation-type"] = (
2198 current_operation
if current_operation
!= "IDLE" else None
2200 db_dict
["currentOperation"] = current_operation
2201 db_dict
["currentOperationID"] = current_operation_id
2202 db_dict
["errorDescription"] = error_description
2203 db_dict
["errorDetail"] = error_detail
2206 db_dict
["nsState"] = ns_state
2207 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2208 except DbException
as e
:
2209 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2211 def _write_op_status(
2215 error_message
: str = None,
2216 queuePosition
: int = 0,
2217 operation_state
: str = None,
2218 other_update
: dict = None,
2221 db_dict
= other_update
or {}
2222 db_dict
["queuePosition"] = queuePosition
2223 if isinstance(stage
, list):
2224 db_dict
["stage"] = stage
[0]
2225 db_dict
["detailed-status"] = " ".join(stage
)
2226 elif stage
is not None:
2227 db_dict
["stage"] = str(stage
)
2229 if error_message
is not None:
2230 db_dict
["errorMessage"] = error_message
2231 if operation_state
is not None:
2232 db_dict
["operationState"] = operation_state
2233 db_dict
["statusEnteredTime"] = time()
2234 self
.update_db_2("nslcmops", op_id
, db_dict
)
2235 except DbException
as e
:
2237 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2240 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2242 nsr_id
= db_nsr
["_id"]
2243 # configurationStatus
2244 config_status
= db_nsr
.get("configurationStatus")
2247 "configurationStatus.{}.status".format(index
): status
2248 for index
, v
in enumerate(config_status
)
2252 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2254 except DbException
as e
:
2256 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2259 def _write_configuration_status(
2264 element_under_configuration
: str = None,
2265 element_type
: str = None,
2266 other_update
: dict = None,
2268 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2269 # .format(vca_index, status))
2272 db_path
= "configurationStatus.{}.".format(vca_index
)
2273 db_dict
= other_update
or {}
2275 db_dict
[db_path
+ "status"] = status
2276 if element_under_configuration
:
2278 db_path
+ "elementUnderConfiguration"
2279 ] = element_under_configuration
2281 db_dict
[db_path
+ "elementType"] = element_type
2282 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2283 except DbException
as e
:
2285 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2286 status
, nsr_id
, vca_index
, e
2290 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2292 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2293 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2294 Database is used because the result can be obtained from a different LCM worker in case of HA.
2295 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2296 :param db_nslcmop: database content of nslcmop
2297 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2298 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2299 computed 'vim-account-id'
2302 nslcmop_id
= db_nslcmop
["_id"]
2303 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2304 if placement_engine
== "PLA":
2306 logging_text
+ "Invoke and wait for placement optimization"
2308 await self
.msg
.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id
})
2309 db_poll_interval
= 5
2310 wait
= db_poll_interval
* 10
2312 while not pla_result
and wait
>= 0:
2313 await asyncio
.sleep(db_poll_interval
)
2314 wait
-= db_poll_interval
2315 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2316 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2320 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2323 for pla_vnf
in pla_result
["vnf"]:
2324 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2325 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2330 {"_id": vnfr
["_id"]},
2331 {"vim-account-id": pla_vnf
["vimAccountId"]},
2334 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2337 def _gather_vnfr_healing_alerts(self
, vnfr
, vnfd
):
2339 nsr_id
= vnfr
["nsr-id-ref"]
2340 df
= vnfd
.get("df", [{}])[0]
2341 # Checking for auto-healing configuration
2342 if "healing-aspect" in df
:
2343 healing_aspects
= df
["healing-aspect"]
2344 for healing
in healing_aspects
:
2345 for healing_policy
in healing
.get("healing-policy", ()):
2346 vdu_id
= healing_policy
["vdu-id"]
2348 (vdur
for vdur
in vnfr
["vdur"] if vdu_id
== vdur
["vdu-id-ref"]),
2353 metric_name
= "vm_status"
2354 vdu_name
= vdur
.get("name")
2355 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2357 name
= f
"healing_{uuid}"
2358 action
= healing_policy
2359 # action_on_recovery = healing.get("action-on-recovery")
2360 # cooldown_time = healing.get("cooldown-time")
2361 # day1 = healing.get("day1")
2365 "metric": metric_name
,
2368 "vnf_member_index": vnf_member_index
,
2369 "vdu_name": vdu_name
,
2371 "alarm_status": "ok",
2372 "action_type": "healing",
2375 alerts
.append(alert
)
2378 def _gather_vnfr_scaling_alerts(self
, vnfr
, vnfd
):
2380 nsr_id
= vnfr
["nsr-id-ref"]
2381 df
= vnfd
.get("df", [{}])[0]
2382 # Checking for auto-scaling configuration
2383 if "scaling-aspect" in df
:
2384 rel_operation_types
= {
2392 scaling_aspects
= df
["scaling-aspect"]
2393 all_vnfd_monitoring_params
= {}
2394 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
2395 for mp
in ivld
.get("monitoring-parameters", ()):
2396 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2397 for vdu
in vnfd
.get("vdu", ()):
2398 for mp
in vdu
.get("monitoring-parameter", ()):
2399 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2400 for df
in vnfd
.get("df", ()):
2401 for mp
in df
.get("monitoring-parameter", ()):
2402 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2403 for scaling_aspect
in scaling_aspects
:
2404 scaling_group_name
= scaling_aspect
.get("name", "")
2405 # Get monitored VDUs
2406 all_monitored_vdus
= set()
2407 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get(
2410 for vdu_delta
in delta
.get("vdu-delta", ()):
2411 all_monitored_vdus
.add(vdu_delta
.get("id"))
2412 monitored_vdurs
= list(
2414 lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
,
2418 if not monitored_vdurs
:
2420 "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
2423 for scaling_policy
in scaling_aspect
.get("scaling-policy", ()):
2424 if scaling_policy
["scaling-type"] != "automatic":
2426 threshold_time
= scaling_policy
.get("threshold-time", "1")
2427 cooldown_time
= scaling_policy
.get("cooldown-time", "0")
2428 for scaling_criteria
in scaling_policy
["scaling-criteria"]:
2429 monitoring_param_ref
= scaling_criteria
.get(
2430 "vnf-monitoring-param-ref"
2432 vnf_monitoring_param
= all_vnfd_monitoring_params
[
2433 monitoring_param_ref
2435 for vdur
in monitored_vdurs
:
2436 vdu_id
= vdur
["vdu-id-ref"]
2437 metric_name
= vnf_monitoring_param
.get("performance-metric")
2438 metric_name
= f
"osm_{metric_name}"
2439 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2440 scalein_threshold
= scaling_criteria
.get(
2441 "scale-in-threshold"
2443 scaleout_threshold
= scaling_criteria
.get(
2444 "scale-out-threshold"
2446 # Looking for min/max-number-of-instances
2447 instances_min_number
= 1
2448 instances_max_number
= 1
2449 vdu_profile
= df
["vdu-profile"]
2452 item
for item
in vdu_profile
if item
["id"] == vdu_id
2454 instances_min_number
= profile
.get(
2455 "min-number-of-instances", 1
2457 instances_max_number
= profile
.get(
2458 "max-number-of-instances", 1
2461 if scalein_threshold
:
2463 name
= f
"scalein_{uuid}"
2464 operation
= scaling_criteria
[
2465 "scale-in-relational-operation"
2467 rel_operator
= rel_operation_types
.get(operation
, "<=")
2468 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2469 expression
= f
"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
2472 "vnf_member_index": vnf_member_index
,
2478 "for": str(threshold_time
) + "m",
2481 action
= scaling_policy
2483 "scaling-group": scaling_group_name
,
2484 "cooldown-time": cooldown_time
,
2489 "metric": metric_name
,
2492 "vnf_member_index": vnf_member_index
,
2495 "alarm_status": "ok",
2496 "action_type": "scale_in",
2498 "prometheus_config": prom_cfg
,
2500 alerts
.append(alert
)
2502 if scaleout_threshold
:
2504 name
= f
"scaleout_{uuid}"
2505 operation
= scaling_criteria
[
2506 "scale-out-relational-operation"
2508 rel_operator
= rel_operation_types
.get(operation
, "<=")
2509 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2510 expression
= f
"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
2513 "vnf_member_index": vnf_member_index
,
2519 "for": str(threshold_time
) + "m",
2522 action
= scaling_policy
2524 "scaling-group": scaling_group_name
,
2525 "cooldown-time": cooldown_time
,
2530 "metric": metric_name
,
2533 "vnf_member_index": vnf_member_index
,
2536 "alarm_status": "ok",
2537 "action_type": "scale_out",
2539 "prometheus_config": prom_cfg
,
2541 alerts
.append(alert
)
2544 def update_nsrs_with_pla_result(self
, params
):
2546 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2548 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2550 except Exception as e
:
2551 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2553 async def instantiate(self
, nsr_id
, nslcmop_id
):
2556 :param nsr_id: ns instance to deploy
2557 :param nslcmop_id: operation to run
2561 # Try to lock HA task here
2562 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2563 if not task_is_locked_by_me
:
2565 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2569 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2570 self
.logger
.debug(logging_text
+ "Enter")
2572 # get all needed from database
2574 # database nsrs record
2577 # database nslcmops record
2580 # update operation on nsrs
2582 # update operation on nslcmops
2583 db_nslcmop_update
= {}
2585 timeout_ns_deploy
= self
.timeout
.ns_deploy
2587 nslcmop_operation_state
= None
2588 db_vnfrs
= {} # vnf's info indexed by member-index
2590 tasks_dict_info
= {} # from task to info text
2594 "Stage 1/5: preparation of the environment.",
2595 "Waiting for previous operations to terminate.",
2598 # ^ stage, step, VIM progress
2600 # wait for any previous tasks in process
2601 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2603 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2604 stage
[1] = "Reading from database."
2605 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2606 db_nsr_update
["detailed-status"] = "creating"
2607 db_nsr_update
["operational-status"] = "init"
2608 self
._write
_ns
_status
(
2610 ns_state
="BUILDING",
2611 current_operation
="INSTANTIATING",
2612 current_operation_id
=nslcmop_id
,
2613 other_update
=db_nsr_update
,
2615 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2617 # read from db: operation
2618 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2619 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2620 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2621 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2622 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2624 ns_params
= db_nslcmop
.get("operationParams")
2625 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2626 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2629 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2630 self
.logger
.debug(logging_text
+ stage
[1])
2631 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2632 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2633 self
.logger
.debug(logging_text
+ stage
[1])
2634 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2635 self
.fs
.sync(db_nsr
["nsd-id"])
2637 # nsr_name = db_nsr["name"] # TODO short-name??
2639 # read from db: vnf's of this ns
2640 stage
[1] = "Getting vnfrs from db."
2641 self
.logger
.debug(logging_text
+ stage
[1])
2642 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2644 # read from db: vnfd's for every vnf
2645 db_vnfds
= [] # every vnfd data
2647 # for each vnf in ns, read vnfd
2648 for vnfr
in db_vnfrs_list
:
2649 if vnfr
.get("kdur"):
2651 for kdur
in vnfr
["kdur"]:
2652 if kdur
.get("additionalParams"):
2653 kdur
["additionalParams"] = json
.loads(
2654 kdur
["additionalParams"]
2656 kdur_list
.append(kdur
)
2657 vnfr
["kdur"] = kdur_list
2659 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2660 vnfd_id
= vnfr
["vnfd-id"]
2661 vnfd_ref
= vnfr
["vnfd-ref"]
2662 self
.fs
.sync(vnfd_id
)
2664 # if we haven't this vnfd, read it from db
2665 if vnfd_id
not in db_vnfds
:
2667 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2670 self
.logger
.debug(logging_text
+ stage
[1])
2671 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2674 db_vnfds
.append(vnfd
)
2676 # Get or generates the _admin.deployed.VCA list
2677 vca_deployed_list
= None
2678 if db_nsr
["_admin"].get("deployed"):
2679 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2680 if vca_deployed_list
is None:
2681 vca_deployed_list
= []
2682 configuration_status_list
= []
2683 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2684 db_nsr_update
["configurationStatus"] = configuration_status_list
2685 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2686 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2687 elif isinstance(vca_deployed_list
, dict):
2688 # maintain backward compatibility. Change a dict to list at database
2689 vca_deployed_list
= list(vca_deployed_list
.values())
2690 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2691 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2694 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2696 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2697 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2699 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2700 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2701 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2703 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2706 # n2vc_redesign STEP 2 Deploy Network Scenario
2707 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2708 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2710 stage
[1] = "Deploying KDUs."
2711 # self.logger.debug(logging_text + "Before deploy_kdus")
2712 # Call to deploy_kdus in case exists the "vdu:kdu" param
2713 await self
.deploy_kdus(
2714 logging_text
=logging_text
,
2716 nslcmop_id
=nslcmop_id
,
2719 task_instantiation_info
=tasks_dict_info
,
2722 stage
[1] = "Getting VCA public key."
2723 # n2vc_redesign STEP 1 Get VCA public ssh-key
2724 # feature 1429. Add n2vc public key to needed VMs
2725 n2vc_key
= self
.n2vc
.get_public_key()
2726 n2vc_key_list
= [n2vc_key
]
2727 if self
.vca_config
.public_key
:
2728 n2vc_key_list
.append(self
.vca_config
.public_key
)
2730 stage
[1] = "Deploying NS at VIM."
2731 task_ro
= asyncio
.ensure_future(
2732 self
.instantiate_RO(
2733 logging_text
=logging_text
,
2737 db_nslcmop
=db_nslcmop
,
2740 n2vc_key_list
=n2vc_key_list
,
2744 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2745 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2747 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2748 stage
[1] = "Deploying Execution Environments."
2749 self
.logger
.debug(logging_text
+ stage
[1])
2751 # create namespace and certificate if any helm based EE is present in the NS
2752 if check_helm_ee_in_ns(db_vnfds
):
2753 # TODO: create EE namespace
2754 # create TLS certificates
2755 await self
.vca_map
["helm-v3"].create_tls_certificate(
2756 secret_name
="ee-tls-{}".format(nsr_id
),
2759 usage
="server auth",
2762 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2763 for vnf_profile
in get_vnf_profiles(nsd
):
2764 vnfd_id
= vnf_profile
["vnfd-id"]
2765 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2766 member_vnf_index
= str(vnf_profile
["id"])
2767 db_vnfr
= db_vnfrs
[member_vnf_index
]
2768 base_folder
= vnfd
["_admin"]["storage"]
2775 # Get additional parameters
2776 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2777 if db_vnfr
.get("additionalParamsForVnf"):
2778 deploy_params
.update(
2779 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2782 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2783 if descriptor_config
:
2785 logging_text
=logging_text
2786 + "member_vnf_index={} ".format(member_vnf_index
),
2789 nslcmop_id
=nslcmop_id
,
2795 member_vnf_index
=member_vnf_index
,
2796 vdu_index
=vdu_index
,
2797 kdu_index
=kdu_index
,
2799 deploy_params
=deploy_params
,
2800 descriptor_config
=descriptor_config
,
2801 base_folder
=base_folder
,
2802 task_instantiation_info
=tasks_dict_info
,
2806 # Deploy charms for each VDU that supports one.
2807 for vdud
in get_vdu_list(vnfd
):
2809 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2810 vdur
= find_in_list(
2811 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2814 if vdur
.get("additionalParams"):
2815 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2817 deploy_params_vdu
= deploy_params
2818 deploy_params_vdu
["OSM"] = get_osm_params(
2819 db_vnfr
, vdu_id
, vdu_count_index
=0
2821 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2823 self
.logger
.debug("VDUD > {}".format(vdud
))
2825 "Descriptor config > {}".format(descriptor_config
)
2827 if descriptor_config
:
2831 for vdu_index
in range(vdud_count
):
2832 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2834 logging_text
=logging_text
2835 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2836 member_vnf_index
, vdu_id
, vdu_index
2840 nslcmop_id
=nslcmop_id
,
2846 kdu_index
=kdu_index
,
2847 member_vnf_index
=member_vnf_index
,
2848 vdu_index
=vdu_index
,
2850 deploy_params
=deploy_params_vdu
,
2851 descriptor_config
=descriptor_config
,
2852 base_folder
=base_folder
,
2853 task_instantiation_info
=tasks_dict_info
,
2856 for kdud
in get_kdu_list(vnfd
):
2857 kdu_name
= kdud
["name"]
2858 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2859 if descriptor_config
:
2863 kdu_index
, kdur
= next(
2865 for x
in enumerate(db_vnfr
["kdur"])
2866 if x
[1]["kdu-name"] == kdu_name
2868 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2869 if kdur
.get("additionalParams"):
2870 deploy_params_kdu
.update(
2871 parse_yaml_strings(kdur
["additionalParams"].copy())
2875 logging_text
=logging_text
,
2878 nslcmop_id
=nslcmop_id
,
2884 member_vnf_index
=member_vnf_index
,
2885 vdu_index
=vdu_index
,
2886 kdu_index
=kdu_index
,
2888 deploy_params
=deploy_params_kdu
,
2889 descriptor_config
=descriptor_config
,
2890 base_folder
=base_folder
,
2891 task_instantiation_info
=tasks_dict_info
,
2895 # Check if each vnf has exporter for metric collection if so update prometheus job records
2896 if "exporters-endpoints" in vnfd
.get("df")[0]:
2897 exporter_config
= vnfd
.get("df")[0].get("exporters-endpoints")
2898 self
.logger
.debug("exporter config :{}".format(exporter_config
))
2899 artifact_path
= "{}/{}/{}".format(
2900 base_folder
["folder"],
2901 base_folder
["pkg-dir"],
2902 "exporter-endpoint",
2905 ee_config_descriptor
= exporter_config
2906 vnfr_id
= db_vnfr
["id"]
2907 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2916 self
.logger
.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip
))
2917 self
.logger
.debug("Artifact_path:{}".format(artifact_path
))
2918 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
2919 vdu_id_for_prom
= None
2920 vdu_index_for_prom
= None
2921 for x
in get_iterable(db_vnfr
, "vdur"):
2922 vdu_id_for_prom
= x
.get("vdu-id-ref")
2923 vdu_index_for_prom
= x
.get("count-index")
2924 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2926 artifact_path
=artifact_path
,
2927 ee_config_descriptor
=ee_config_descriptor
,
2930 target_ip
=rw_mgmt_ip
,
2932 vdu_id
=vdu_id_for_prom
,
2933 vdu_index
=vdu_index_for_prom
,
2936 self
.logger
.debug("Prometheus job:{}".format(prometheus_jobs
))
2938 db_nsr_update
["_admin.deployed.prometheus_jobs"] = prometheus_jobs
2945 for job
in prometheus_jobs
:
2948 {"job_name": job
["job_name"]},
2951 fail_on_empty
=False,
2954 # Check if this NS has a charm configuration
2955 descriptor_config
= nsd
.get("ns-configuration")
2956 if descriptor_config
and descriptor_config
.get("juju"):
2959 member_vnf_index
= None
2966 # Get additional parameters
2967 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2968 if db_nsr
.get("additionalParamsForNs"):
2969 deploy_params
.update(
2970 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2972 base_folder
= nsd
["_admin"]["storage"]
2974 logging_text
=logging_text
,
2977 nslcmop_id
=nslcmop_id
,
2983 member_vnf_index
=member_vnf_index
,
2984 vdu_index
=vdu_index
,
2985 kdu_index
=kdu_index
,
2987 deploy_params
=deploy_params
,
2988 descriptor_config
=descriptor_config
,
2989 base_folder
=base_folder
,
2990 task_instantiation_info
=tasks_dict_info
,
2994 # rest of staff will be done at finally
2997 ROclient
.ROClientException
,
3003 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
3006 except asyncio
.CancelledError
:
3008 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
3010 exc
= "Operation was cancelled"
3011 except Exception as e
:
3012 exc
= traceback
.format_exc()
3013 self
.logger
.critical(
3014 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
3019 error_list
.append(str(exc
))
3021 # wait for pending tasks
3023 stage
[1] = "Waiting for instantiate pending tasks."
3024 self
.logger
.debug(logging_text
+ stage
[1])
3025 error_list
+= await self
._wait
_for
_tasks
(
3033 stage
[1] = stage
[2] = ""
3034 except asyncio
.CancelledError
:
3035 error_list
.append("Cancelled")
3036 # TODO cancel all tasks
3037 except Exception as exc
:
3038 error_list
.append(str(exc
))
3040 # update operation-status
3041 db_nsr_update
["operational-status"] = "running"
3042 # let's begin with VCA 'configured' status (later we can change it)
3043 db_nsr_update
["config-status"] = "configured"
3044 for task
, task_name
in tasks_dict_info
.items():
3045 if not task
.done() or task
.cancelled() or task
.exception():
3046 if task_name
.startswith(self
.task_name_deploy_vca
):
3047 # A N2VC task is pending
3048 db_nsr_update
["config-status"] = "failed"
3050 # RO or KDU task is pending
3051 db_nsr_update
["operational-status"] = "failed"
3053 # update status at database
3055 error_detail
= ". ".join(error_list
)
3056 self
.logger
.error(logging_text
+ error_detail
)
3057 error_description_nslcmop
= "{} Detail: {}".format(
3058 stage
[0], error_detail
3060 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
3061 nslcmop_id
, stage
[0]
3064 db_nsr_update
["detailed-status"] = (
3065 error_description_nsr
+ " Detail: " + error_detail
3067 db_nslcmop_update
["detailed-status"] = error_detail
3068 nslcmop_operation_state
= "FAILED"
3072 error_description_nsr
= error_description_nslcmop
= None
3074 db_nsr_update
["detailed-status"] = "Done"
3075 db_nslcmop_update
["detailed-status"] = "Done"
3076 nslcmop_operation_state
= "COMPLETED"
3077 # Gather auto-healing and auto-scaling alerts for each vnfr
3080 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3082 (sub
for sub
in db_vnfds
if sub
["_id"] == vnfr
["vnfd-id"]), None
3084 healing_alerts
= self
._gather
_vnfr
_healing
_alerts
(vnfr
, vnfd
)
3085 for alert
in healing_alerts
:
3086 self
.logger
.info(f
"Storing healing alert in MongoDB: {alert}")
3087 self
.db
.create("alerts", alert
)
3089 scaling_alerts
= self
._gather
_vnfr
_scaling
_alerts
(vnfr
, vnfd
)
3090 for alert
in scaling_alerts
:
3091 self
.logger
.info(f
"Storing scaling alert in MongoDB: {alert}")
3092 self
.db
.create("alerts", alert
)
3095 self
._write
_ns
_status
(
3098 current_operation
="IDLE",
3099 current_operation_id
=None,
3100 error_description
=error_description_nsr
,
3101 error_detail
=error_detail
,
3102 other_update
=db_nsr_update
,
3104 self
._write
_op
_status
(
3107 error_message
=error_description_nslcmop
,
3108 operation_state
=nslcmop_operation_state
,
3109 other_update
=db_nslcmop_update
,
3112 if nslcmop_operation_state
:
3114 await self
.msg
.aiowrite(
3119 "nslcmop_id": nslcmop_id
,
3120 "operationState": nslcmop_operation_state
,
3123 except Exception as e
:
3125 logging_text
+ "kafka_write notification Exception {}".format(e
)
3128 self
.logger
.debug(logging_text
+ "Exit")
3129 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
3131 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
3132 if vnfd_id
not in cached_vnfds
:
3133 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
3134 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
3136 return cached_vnfds
[vnfd_id
]
3138 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
3139 if vnf_profile_id
not in cached_vnfrs
:
3140 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
3143 "member-vnf-index-ref": vnf_profile_id
,
3144 "nsr-id-ref": nsr_id
,
3147 return cached_vnfrs
[vnf_profile_id
]
3149 def _is_deployed_vca_in_relation(
3150 self
, vca
: DeployedVCA
, relation
: Relation
3153 for endpoint
in (relation
.provider
, relation
.requirer
):
3154 if endpoint
["kdu-resource-profile-id"]:
3157 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
3158 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
3159 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
3165 def _update_ee_relation_data_with_implicit_data(
3166 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
3168 ee_relation_data
= safe_get_ee_relation(
3169 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
3171 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
3172 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
3173 "execution-environment-ref"
3175 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
3176 vnfd_id
= vnf_profile
["vnfd-id"]
3177 project
= nsd
["_admin"]["projects_read"][0]
3178 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3181 if ee_relation_level
== EELevel
.VNF
3182 else ee_relation_data
["vdu-profile-id"]
3184 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
3187 f
"not execution environments found for ee_relation {ee_relation_data}"
3189 ee_relation_data
["execution-environment-ref"] = ee
["id"]
3190 return ee_relation_data
3192 def _get_ns_relations(
3195 nsd
: Dict
[str, Any
],
3197 cached_vnfds
: Dict
[str, Any
],
3198 ) -> List
[Relation
]:
3200 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
3201 for r
in db_ns_relations
:
3202 provider_dict
= None
3203 requirer_dict
= None
3204 if all(key
in r
for key
in ("provider", "requirer")):
3205 provider_dict
= r
["provider"]
3206 requirer_dict
= r
["requirer"]
3207 elif "entities" in r
:
3208 provider_id
= r
["entities"][0]["id"]
3211 "endpoint": r
["entities"][0]["endpoint"],
3213 if provider_id
!= nsd
["id"]:
3214 provider_dict
["vnf-profile-id"] = provider_id
3215 requirer_id
= r
["entities"][1]["id"]
3218 "endpoint": r
["entities"][1]["endpoint"],
3220 if requirer_id
!= nsd
["id"]:
3221 requirer_dict
["vnf-profile-id"] = requirer_id
3224 "provider/requirer or entities must be included in the relation."
3226 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3227 nsr_id
, nsd
, provider_dict
, cached_vnfds
3229 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3230 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3232 provider
= EERelation(relation_provider
)
3233 requirer
= EERelation(relation_requirer
)
3234 relation
= Relation(r
["name"], provider
, requirer
)
3235 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3237 relations
.append(relation
)
3240 def _get_vnf_relations(
3243 nsd
: Dict
[str, Any
],
3245 cached_vnfds
: Dict
[str, Any
],
3246 ) -> List
[Relation
]:
3248 if vca
.target_element
== "ns":
3249 self
.logger
.debug("VCA is a NS charm, not a VNF.")
3251 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3252 vnf_profile_id
= vnf_profile
["id"]
3253 vnfd_id
= vnf_profile
["vnfd-id"]
3254 project
= nsd
["_admin"]["projects_read"][0]
3255 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3256 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3257 for r
in db_vnf_relations
:
3258 provider_dict
= None
3259 requirer_dict
= None
3260 if all(key
in r
for key
in ("provider", "requirer")):
3261 provider_dict
= r
["provider"]
3262 requirer_dict
= r
["requirer"]
3263 elif "entities" in r
:
3264 provider_id
= r
["entities"][0]["id"]
3267 "vnf-profile-id": vnf_profile_id
,
3268 "endpoint": r
["entities"][0]["endpoint"],
3270 if provider_id
!= vnfd_id
:
3271 provider_dict
["vdu-profile-id"] = provider_id
3272 requirer_id
= r
["entities"][1]["id"]
3275 "vnf-profile-id": vnf_profile_id
,
3276 "endpoint": r
["entities"][1]["endpoint"],
3278 if requirer_id
!= vnfd_id
:
3279 requirer_dict
["vdu-profile-id"] = requirer_id
3282 "provider/requirer or entities must be included in the relation."
3284 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3285 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3287 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3288 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3290 provider
= EERelation(relation_provider
)
3291 requirer
= EERelation(relation_requirer
)
3292 relation
= Relation(r
["name"], provider
, requirer
)
3293 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3295 relations
.append(relation
)
3298 def _get_kdu_resource_data(
3300 ee_relation
: EERelation
,
3301 db_nsr
: Dict
[str, Any
],
3302 cached_vnfds
: Dict
[str, Any
],
3303 ) -> DeployedK8sResource
:
3304 nsd
= get_nsd(db_nsr
)
3305 vnf_profiles
= get_vnf_profiles(nsd
)
3306 vnfd_id
= find_in_list(
3308 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3310 project
= nsd
["_admin"]["projects_read"][0]
3311 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3312 kdu_resource_profile
= get_kdu_resource_profile(
3313 db_vnfd
, ee_relation
.kdu_resource_profile_id
3315 kdu_name
= kdu_resource_profile
["kdu-name"]
3316 deployed_kdu
, _
= get_deployed_kdu(
3317 db_nsr
.get("_admin", ()).get("deployed", ()),
3319 ee_relation
.vnf_profile_id
,
3321 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3324 def _get_deployed_component(
3326 ee_relation
: EERelation
,
3327 db_nsr
: Dict
[str, Any
],
3328 cached_vnfds
: Dict
[str, Any
],
3329 ) -> DeployedComponent
:
3330 nsr_id
= db_nsr
["_id"]
3331 deployed_component
= None
3332 ee_level
= EELevel
.get_level(ee_relation
)
3333 if ee_level
== EELevel
.NS
:
3334 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3336 deployed_component
= DeployedVCA(nsr_id
, vca
)
3337 elif ee_level
== EELevel
.VNF
:
3338 vca
= get_deployed_vca(
3342 "member-vnf-index": ee_relation
.vnf_profile_id
,
3343 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3347 deployed_component
= DeployedVCA(nsr_id
, vca
)
3348 elif ee_level
== EELevel
.VDU
:
3349 vca
= get_deployed_vca(
3352 "vdu_id": ee_relation
.vdu_profile_id
,
3353 "member-vnf-index": ee_relation
.vnf_profile_id
,
3354 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3358 deployed_component
= DeployedVCA(nsr_id
, vca
)
3359 elif ee_level
== EELevel
.KDU
:
3360 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3361 ee_relation
, db_nsr
, cached_vnfds
3363 if kdu_resource_data
:
3364 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3365 return deployed_component
3367 async def _add_relation(
3371 db_nsr
: Dict
[str, Any
],
3372 cached_vnfds
: Dict
[str, Any
],
3373 cached_vnfrs
: Dict
[str, Any
],
3375 deployed_provider
= self
._get
_deployed
_component
(
3376 relation
.provider
, db_nsr
, cached_vnfds
3378 deployed_requirer
= self
._get
_deployed
_component
(
3379 relation
.requirer
, db_nsr
, cached_vnfds
3383 and deployed_requirer
3384 and deployed_provider
.config_sw_installed
3385 and deployed_requirer
.config_sw_installed
3387 provider_db_vnfr
= (
3389 relation
.provider
.nsr_id
,
3390 relation
.provider
.vnf_profile_id
,
3393 if relation
.provider
.vnf_profile_id
3396 requirer_db_vnfr
= (
3398 relation
.requirer
.nsr_id
,
3399 relation
.requirer
.vnf_profile_id
,
3402 if relation
.requirer
.vnf_profile_id
3405 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3406 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3407 provider_relation_endpoint
= RelationEndpoint(
3408 deployed_provider
.ee_id
,
3410 relation
.provider
.endpoint
,
3412 requirer_relation_endpoint
= RelationEndpoint(
3413 deployed_requirer
.ee_id
,
3415 relation
.requirer
.endpoint
,
3418 await self
.vca_map
[vca_type
].add_relation(
3419 provider
=provider_relation_endpoint
,
3420 requirer
=requirer_relation_endpoint
,
3422 except N2VCException
as exception
:
3423 self
.logger
.error(exception
)
3424 raise LcmException(exception
)
3428 async def _add_vca_relations(
3434 timeout
: int = 3600,
3437 # 1. find all relations for this VCA
3438 # 2. wait for other peers related
3442 # STEP 1: find all relations for this VCA
3445 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3446 nsd
= get_nsd(db_nsr
)
3449 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3450 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3455 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3456 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3458 # if no relations, terminate
3460 self
.logger
.debug(logging_text
+ " No relations")
3463 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3470 if now
- start
>= timeout
:
3471 self
.logger
.error(logging_text
+ " : timeout adding relations")
3474 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3475 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3477 # for each relation, find the VCA's related
3478 for relation
in relations
.copy():
3479 added
= await self
._add
_relation
(
3487 relations
.remove(relation
)
3490 self
.logger
.debug("Relations added")
3492 await asyncio
.sleep(5.0)
3496 except Exception as e
:
3497 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3500 async def _install_kdu(
3508 k8s_instance_info
: dict,
3509 k8params
: dict = None,
3514 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3517 "collection": "nsrs",
3518 "filter": {"_id": nsr_id
},
3519 "path": nsr_db_path
,
3522 if k8s_instance_info
.get("kdu-deployment-name"):
3523 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3525 kdu_instance
= self
.k8scluster_map
[
3527 ].generate_kdu_instance_name(
3528 db_dict
=db_dict_install
,
3529 kdu_model
=k8s_instance_info
["kdu-model"],
3530 kdu_name
=k8s_instance_info
["kdu-name"],
3533 # Update the nsrs table with the kdu-instance value
3537 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3540 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3541 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3542 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3543 # namespace, this first verification could be removed, and the next step would be done for any kind
3545 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3546 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3547 if k8sclustertype
in ("juju", "juju-bundle"):
3548 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3549 # that the user passed a namespace which he wants its KDU to be deployed in)
3555 "_admin.projects_write": k8s_instance_info
["namespace"],
3556 "_admin.projects_read": k8s_instance_info
["namespace"],
3562 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3567 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3569 k8s_instance_info
["namespace"] = kdu_instance
3571 await self
.k8scluster_map
[k8sclustertype
].install(
3572 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3573 kdu_model
=k8s_instance_info
["kdu-model"],
3576 db_dict
=db_dict_install
,
3578 kdu_name
=k8s_instance_info
["kdu-name"],
3579 namespace
=k8s_instance_info
["namespace"],
3580 kdu_instance
=kdu_instance
,
3584 # Obtain services to obtain management service ip
3585 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3586 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3587 kdu_instance
=kdu_instance
,
3588 namespace
=k8s_instance_info
["namespace"],
3591 # Obtain management service info (if exists)
3592 vnfr_update_dict
= {}
3593 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3595 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3600 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3603 for service
in kdud
.get("service", [])
3604 if service
.get("mgmt-service")
3606 for mgmt_service
in mgmt_services
:
3607 for service
in services
:
3608 if service
["name"].startswith(mgmt_service
["name"]):
3609 # Mgmt service found, Obtain service ip
3610 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3611 if isinstance(ip
, list) and len(ip
) == 1:
3615 "kdur.{}.ip-address".format(kdu_index
)
3618 # Check if must update also mgmt ip at the vnf
3619 service_external_cp
= mgmt_service
.get(
3620 "external-connection-point-ref"
3622 if service_external_cp
:
3624 deep_get(vnfd
, ("mgmt-interface", "cp"))
3625 == service_external_cp
3627 vnfr_update_dict
["ip-address"] = ip
3632 "external-connection-point-ref", ""
3634 == service_external_cp
,
3637 "kdur.{}.ip-address".format(kdu_index
)
3642 "Mgmt service name: {} not found".format(
3643 mgmt_service
["name"]
3647 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3648 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3650 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3653 and kdu_config
.get("initial-config-primitive")
3654 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3656 initial_config_primitive_list
= kdu_config
.get(
3657 "initial-config-primitive"
3659 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3661 for initial_config_primitive
in initial_config_primitive_list
:
3662 primitive_params_
= self
._map
_primitive
_params
(
3663 initial_config_primitive
, {}, {}
3666 await asyncio
.wait_for(
3667 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3668 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3669 kdu_instance
=kdu_instance
,
3670 primitive_name
=initial_config_primitive
["name"],
3671 params
=primitive_params_
,
3672 db_dict
=db_dict_install
,
3678 except Exception as e
:
3679 # Prepare update db with error and raise exception
3682 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3686 vnfr_data
.get("_id"),
3687 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3690 # ignore to keep original exception
3692 # reraise original error
3697 async def deploy_kdus(
3704 task_instantiation_info
,
3706 # Launch kdus if present in the descriptor
3708 k8scluster_id_2_uuic
= {
3709 "helm-chart-v3": {},
3714 async def _get_cluster_id(cluster_id
, cluster_type
):
3715 nonlocal k8scluster_id_2_uuic
3716 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3717 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3719 # check if K8scluster is creating and wait look if previous tasks in process
3720 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3721 "k8scluster", cluster_id
3724 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3725 task_name
, cluster_id
3727 self
.logger
.debug(logging_text
+ text
)
3728 await asyncio
.wait(task_dependency
, timeout
=3600)
3730 db_k8scluster
= self
.db
.get_one(
3731 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3733 if not db_k8scluster
:
3734 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3736 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3738 if cluster_type
== "helm-chart-v3":
3740 # backward compatibility for existing clusters that have not been initialized for helm v3
3741 k8s_credentials
= yaml
.safe_dump(
3742 db_k8scluster
.get("credentials")
3744 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3745 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3747 db_k8scluster_update
= {}
3748 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3749 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3750 db_k8scluster_update
[
3751 "_admin.helm-chart-v3.created"
3753 db_k8scluster_update
[
3754 "_admin.helm-chart-v3.operationalState"
3757 "k8sclusters", cluster_id
, db_k8scluster_update
3759 except Exception as e
:
3762 + "error initializing helm-v3 cluster: {}".format(str(e
))
3765 "K8s cluster '{}' has not been initialized for '{}'".format(
3766 cluster_id
, cluster_type
3771 "K8s cluster '{}' has not been initialized for '{}'".format(
3772 cluster_id
, cluster_type
3775 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3778 logging_text
+= "Deploy kdus: "
3781 db_nsr_update
= {"_admin.deployed.K8s": []}
3782 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3785 updated_cluster_list
= []
3786 updated_v3_cluster_list
= []
3788 for vnfr_data
in db_vnfrs
.values():
3789 vca_id
= self
.get_vca_id(vnfr_data
, {})
3790 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3791 # Step 0: Prepare and set parameters
3792 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3793 vnfd_id
= vnfr_data
.get("vnfd-id")
3794 vnfd_with_id
= find_in_list(
3795 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3799 for kdud
in vnfd_with_id
["kdu"]
3800 if kdud
["name"] == kdur
["kdu-name"]
3802 namespace
= kdur
.get("k8s-namespace")
3803 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3804 if kdur
.get("helm-chart"):
3805 kdumodel
= kdur
["helm-chart"]
3806 # Default version: helm3, if helm-version is v2 assign v2
3807 k8sclustertype
= "helm-chart-v3"
3808 self
.logger
.debug("kdur: {}".format(kdur
))
3810 kdur
.get("helm-version")
3811 and kdur
.get("helm-version") == "v2"
3813 k8sclustertype
= "helm-chart"
3814 elif kdur
.get("juju-bundle"):
3815 kdumodel
= kdur
["juju-bundle"]
3816 k8sclustertype
= "juju-bundle"
3819 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3820 "juju-bundle. Maybe an old NBI version is running".format(
3821 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3824 # check if kdumodel is a file and exists
3826 vnfd_with_id
= find_in_list(
3827 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3829 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3830 if storage
: # may be not present if vnfd has not artifacts
3831 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3832 if storage
["pkg-dir"]:
3833 filename
= "{}/{}/{}s/{}".format(
3840 filename
= "{}/Scripts/{}s/{}".format(
3845 if self
.fs
.file_exists(
3846 filename
, mode
="file"
3847 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3848 kdumodel
= self
.fs
.path
+ filename
3849 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3851 except Exception: # it is not a file
3854 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3855 step
= "Synchronize repos for k8s cluster '{}'".format(
3858 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3862 k8sclustertype
== "helm-chart"
3863 and cluster_uuid
not in updated_cluster_list
3865 k8sclustertype
== "helm-chart-v3"
3866 and cluster_uuid
not in updated_v3_cluster_list
3868 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3869 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3870 cluster_uuid
=cluster_uuid
3873 if del_repo_list
or added_repo_dict
:
3874 if k8sclustertype
== "helm-chart":
3876 "_admin.helm_charts_added." + item
: None
3877 for item
in del_repo_list
3880 "_admin.helm_charts_added." + item
: name
3881 for item
, name
in added_repo_dict
.items()
3883 updated_cluster_list
.append(cluster_uuid
)
3884 elif k8sclustertype
== "helm-chart-v3":
3886 "_admin.helm_charts_v3_added." + item
: None
3887 for item
in del_repo_list
3890 "_admin.helm_charts_v3_added." + item
: name
3891 for item
, name
in added_repo_dict
.items()
3893 updated_v3_cluster_list
.append(cluster_uuid
)
3895 logging_text
+ "repos synchronized on k8s cluster "
3896 "'{}' to_delete: {}, to_add: {}".format(
3897 k8s_cluster_id
, del_repo_list
, added_repo_dict
3902 {"_id": k8s_cluster_id
},
3908 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3909 vnfr_data
["member-vnf-index-ref"],
3913 k8s_instance_info
= {
3914 "kdu-instance": None,
3915 "k8scluster-uuid": cluster_uuid
,
3916 "k8scluster-type": k8sclustertype
,
3917 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3918 "kdu-name": kdur
["kdu-name"],
3919 "kdu-model": kdumodel
,
3920 "namespace": namespace
,
3921 "kdu-deployment-name": kdu_deployment_name
,
3923 db_path
= "_admin.deployed.K8s.{}".format(index
)
3924 db_nsr_update
[db_path
] = k8s_instance_info
3925 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3926 vnfd_with_id
= find_in_list(
3927 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3929 task
= asyncio
.ensure_future(
3938 k8params
=desc_params
,
3943 self
.lcm_tasks
.register(
3947 "instantiate_KDU-{}".format(index
),
3950 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3956 except (LcmException
, asyncio
.CancelledError
):
3958 except Exception as e
:
3959 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3960 if isinstance(e
, (N2VCException
, DbException
)):
3961 self
.logger
.error(logging_text
+ msg
)
3963 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3964 raise LcmException(msg
)
3967 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3987 task_instantiation_info
,
3990 # launch instantiate_N2VC in a asyncio task and register task object
3991 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3992 # if not found, create one entry and update database
3993 # fill db_nsr._admin.deployed.VCA.<index>
3996 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
4000 get_charm_name
= False
4001 if "execution-environment-list" in descriptor_config
:
4002 ee_list
= descriptor_config
.get("execution-environment-list", [])
4003 elif "juju" in descriptor_config
:
4004 ee_list
= [descriptor_config
] # ns charms
4005 if "execution-environment-list" not in descriptor_config
:
4006 # charm name is only required for ns charms
4007 get_charm_name
= True
4008 else: # other types as script are not supported
4011 for ee_item
in ee_list
:
4014 + "_deploy_n2vc ee_item juju={}, helm={}".format(
4015 ee_item
.get("juju"), ee_item
.get("helm-chart")
4018 ee_descriptor_id
= ee_item
.get("id")
4019 if ee_item
.get("juju"):
4020 vca_name
= ee_item
["juju"].get("charm")
4022 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
4025 if ee_item
["juju"].get("charm") is not None
4028 if ee_item
["juju"].get("cloud") == "k8s":
4029 vca_type
= "k8s_proxy_charm"
4030 elif ee_item
["juju"].get("proxy") is False:
4031 vca_type
= "native_charm"
4032 elif ee_item
.get("helm-chart"):
4033 vca_name
= ee_item
["helm-chart"]
4034 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
4037 vca_type
= "helm-v3"
4040 logging_text
+ "skipping non juju neither charm configuration"
4045 for vca_index
, vca_deployed
in enumerate(
4046 db_nsr
["_admin"]["deployed"]["VCA"]
4048 if not vca_deployed
:
4051 vca_deployed
.get("member-vnf-index") == member_vnf_index
4052 and vca_deployed
.get("vdu_id") == vdu_id
4053 and vca_deployed
.get("kdu_name") == kdu_name
4054 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
4055 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
4059 # not found, create one.
4061 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
4064 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
4066 target
+= "/kdu/{}".format(kdu_name
)
4068 "target_element": target
,
4069 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
4070 "member-vnf-index": member_vnf_index
,
4072 "kdu_name": kdu_name
,
4073 "vdu_count_index": vdu_index
,
4074 "operational-status": "init", # TODO revise
4075 "detailed-status": "", # TODO revise
4076 "step": "initial-deploy", # TODO revise
4078 "vdu_name": vdu_name
,
4080 "ee_descriptor_id": ee_descriptor_id
,
4081 "charm_name": charm_name
,
4085 # create VCA and configurationStatus in db
4087 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
4088 "configurationStatus.{}".format(vca_index
): dict(),
4090 self
.update_db_2("nsrs", nsr_id
, db_dict
)
4092 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
4094 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
4095 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
4096 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
4099 task_n2vc
= asyncio
.ensure_future(
4100 self
.instantiate_N2VC(
4101 logging_text
=logging_text
,
4102 vca_index
=vca_index
,
4108 vdu_index
=vdu_index
,
4109 kdu_index
=kdu_index
,
4110 deploy_params
=deploy_params
,
4111 config_descriptor
=descriptor_config
,
4112 base_folder
=base_folder
,
4113 nslcmop_id
=nslcmop_id
,
4117 ee_config_descriptor
=ee_item
,
4120 self
.lcm_tasks
.register(
4124 "instantiate_N2VC-{}".format(vca_index
),
4127 task_instantiation_info
[
4129 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
4130 member_vnf_index
or "", vdu_id
or ""
4134 def _create_nslcmop(nsr_id
, operation
, params
):
4136 Creates a ns-lcm-opp content to be stored at database.
4137 :param nsr_id: internal id of the instance
4138 :param operation: instantiate, terminate, scale, action, ...
4139 :param params: user parameters for the operation
4140 :return: dictionary following SOL005 format
4142 # Raise exception if invalid arguments
4143 if not (nsr_id
and operation
and params
):
4145 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
4152 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
4153 "operationState": "PROCESSING",
4154 "statusEnteredTime": now
,
4155 "nsInstanceId": nsr_id
,
4156 "lcmOperationType": operation
,
4158 "isAutomaticInvocation": False,
4159 "operationParams": params
,
4160 "isCancelPending": False,
4162 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
4163 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
4168 def _format_additional_params(self
, params
):
4169 params
= params
or {}
4170 for key
, value
in params
.items():
4171 if str(value
).startswith("!!yaml "):
4172 params
[key
] = yaml
.safe_load(value
[7:])
4175 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
4176 primitive
= seq
.get("name")
4177 primitive_params
= {}
4179 "member_vnf_index": vnf_index
,
4180 "primitive": primitive
,
4181 "primitive_params": primitive_params
,
4184 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
4188 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
4189 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
4190 if op
.get("operationState") == "COMPLETED":
4191 # b. Skip sub-operation
4192 # _ns_execute_primitive() or RO.create_action() will NOT be executed
4193 return self
.SUBOPERATION_STATUS_SKIP
4195 # c. retry executing sub-operation
4196 # The sub-operation exists, and operationState != 'COMPLETED'
4197 # Update operationState = 'PROCESSING' to indicate a retry.
4198 operationState
= "PROCESSING"
4199 detailed_status
= "In progress"
4200 self
._update
_suboperation
_status
(
4201 db_nslcmop
, op_index
, operationState
, detailed_status
4203 # Return the sub-operation index
4204 # _ns_execute_primitive() or RO.create_action() will be called from scale()
4205 # with arguments extracted from the sub-operation
4208 # Find a sub-operation where all keys in a matching dictionary must match
4209 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4210 def _find_suboperation(self
, db_nslcmop
, match
):
4211 if db_nslcmop
and match
:
4212 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4213 for i
, op
in enumerate(op_list
):
4214 if all(op
.get(k
) == match
[k
] for k
in match
):
4216 return self
.SUBOPERATION_STATUS_NOT_FOUND
4218 # Update status for a sub-operation given its index
4219 def _update_suboperation_status(
4220 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4222 # Update DB for HA tasks
4223 q_filter
= {"_id": db_nslcmop
["_id"]}
4225 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4226 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4229 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4232 # Add sub-operation, return the index of the added sub-operation
4233 # Optionally, set operationState, detailed-status, and operationType
4234 # Status and type are currently set for 'scale' sub-operations:
4235 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4236 # 'detailed-status' : status message
4237 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4238 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4239 def _add_suboperation(
4247 mapped_primitive_params
,
4248 operationState
=None,
4249 detailed_status
=None,
4252 RO_scaling_info
=None,
4255 return self
.SUBOPERATION_STATUS_NOT_FOUND
4256 # Get the "_admin.operations" list, if it exists
4257 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4258 op_list
= db_nslcmop_admin
.get("operations")
4259 # Create or append to the "_admin.operations" list
4261 "member_vnf_index": vnf_index
,
4263 "vdu_count_index": vdu_count_index
,
4264 "primitive": primitive
,
4265 "primitive_params": mapped_primitive_params
,
4268 new_op
["operationState"] = operationState
4270 new_op
["detailed-status"] = detailed_status
4272 new_op
["lcmOperationType"] = operationType
4274 new_op
["RO_nsr_id"] = RO_nsr_id
4276 new_op
["RO_scaling_info"] = RO_scaling_info
4278 # No existing operations, create key 'operations' with current operation as first list element
4279 db_nslcmop_admin
.update({"operations": [new_op
]})
4280 op_list
= db_nslcmop_admin
.get("operations")
4282 # Existing operations, append operation to list
4283 op_list
.append(new_op
)
4285 db_nslcmop_update
= {"_admin.operations": op_list
}
4286 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4287 op_index
= len(op_list
) - 1
4290 # Helper methods for scale() sub-operations
4292 # pre-scale/post-scale:
4293 # Check for 3 different cases:
4294 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4295 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4296 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4297 def _check_or_add_scale_suboperation(
4301 vnf_config_primitive
,
4305 RO_scaling_info
=None,
4307 # Find this sub-operation
4308 if RO_nsr_id
and RO_scaling_info
:
4309 operationType
= "SCALE-RO"
4311 "member_vnf_index": vnf_index
,
4312 "RO_nsr_id": RO_nsr_id
,
4313 "RO_scaling_info": RO_scaling_info
,
4317 "member_vnf_index": vnf_index
,
4318 "primitive": vnf_config_primitive
,
4319 "primitive_params": primitive_params
,
4320 "lcmOperationType": operationType
,
4322 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4323 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4324 # a. New sub-operation
4325 # The sub-operation does not exist, add it.
4326 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4327 # The following parameters are set to None for all kind of scaling:
4329 vdu_count_index
= None
4331 if RO_nsr_id
and RO_scaling_info
:
4332 vnf_config_primitive
= None
4333 primitive_params
= None
4336 RO_scaling_info
= None
4337 # Initial status for sub-operation
4338 operationState
= "PROCESSING"
4339 detailed_status
= "In progress"
4340 # Add sub-operation for pre/post-scaling (zero or more operations)
4341 self
._add
_suboperation
(
4347 vnf_config_primitive
,
4355 return self
.SUBOPERATION_STATUS_NEW
4357 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4358 # or op_index (operationState != 'COMPLETED')
4359 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4361 # Function to return execution_environment id
4363 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4364 # TODO vdu_index_count
4365 for vca
in vca_deployed_list
:
4366 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4367 return vca
.get("ee_id")
4369 async def destroy_N2VC(
4377 exec_primitives
=True,
4382 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4383 :param logging_text:
4385 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4386 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4387 :param vca_index: index in the database _admin.deployed.VCA
4388 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4389 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4390 not executed properly
4391 :param scaling_in: True destroys the application, False destroys the model
4392 :return: None or exception
4397 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4398 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4402 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4404 # execute terminate_primitives
4406 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4407 config_descriptor
.get("terminate-config-primitive"),
4408 vca_deployed
.get("ee_descriptor_id"),
4410 vdu_id
= vca_deployed
.get("vdu_id")
4411 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4412 vdu_name
= vca_deployed
.get("vdu_name")
4413 vnf_index
= vca_deployed
.get("member-vnf-index")
4414 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4415 for seq
in terminate_primitives
:
4416 # For each sequence in list, get primitive and call _ns_execute_primitive()
4417 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4418 vnf_index
, seq
.get("name")
4420 self
.logger
.debug(logging_text
+ step
)
4421 # Create the primitive for each sequence, i.e. "primitive": "touch"
4422 primitive
= seq
.get("name")
4423 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4428 self
._add
_suboperation
(
4435 mapped_primitive_params
,
4437 # Sub-operations: Call _ns_execute_primitive() instead of action()
4439 result
, result_detail
= await self
._ns
_execute
_primitive
(
4440 vca_deployed
["ee_id"],
4442 mapped_primitive_params
,
4446 except LcmException
:
4447 # this happens when VCA is not deployed. In this case it is not needed to terminate
4449 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4450 if result
not in result_ok
:
4452 "terminate_primitive {} for vnf_member_index={} fails with "
4453 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4455 # set that this VCA do not need terminated
4456 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4460 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4463 # Delete Prometheus Jobs if any
4464 # This uses NSR_ID, so it will destroy any jobs under this index
4465 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4468 await self
.vca_map
[vca_type
].delete_execution_environment(
4469 vca_deployed
["ee_id"],
4470 scaling_in
=scaling_in
,
4475 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4476 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4477 namespace
= "." + db_nsr
["_id"]
4479 await self
.n2vc
.delete_namespace(
4480 namespace
=namespace
,
4481 total_timeout
=self
.timeout
.charm_delete
,
4484 except N2VCNotFound
: # already deleted. Skip
4486 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4488 async def terminate(self
, nsr_id
, nslcmop_id
):
4489 # Try to lock HA task here
4490 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4491 if not task_is_locked_by_me
:
4494 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4495 self
.logger
.debug(logging_text
+ "Enter")
4496 timeout_ns_terminate
= self
.timeout
.ns_terminate
4499 operation_params
= None
4501 error_list
= [] # annotates all failed error messages
4502 db_nslcmop_update
= {}
4503 autoremove
= False # autoremove after terminated
4504 tasks_dict_info
= {}
4507 "Stage 1/3: Preparing task.",
4508 "Waiting for previous operations to terminate.",
4511 # ^ contains [stage, step, VIM-status]
4513 # wait for any previous tasks in process
4514 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4516 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4517 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4518 operation_params
= db_nslcmop
.get("operationParams") or {}
4519 if operation_params
.get("timeout_ns_terminate"):
4520 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4521 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4522 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4524 db_nsr_update
["operational-status"] = "terminating"
4525 db_nsr_update
["config-status"] = "terminating"
4526 self
._write
_ns
_status
(
4528 ns_state
="TERMINATING",
4529 current_operation
="TERMINATING",
4530 current_operation_id
=nslcmop_id
,
4531 other_update
=db_nsr_update
,
4533 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4534 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4535 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4538 stage
[1] = "Getting vnf descriptors from db."
4539 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4541 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4543 db_vnfds_from_id
= {}
4544 db_vnfds_from_member_index
= {}
4546 for vnfr
in db_vnfrs_list
:
4547 vnfd_id
= vnfr
["vnfd-id"]
4548 if vnfd_id
not in db_vnfds_from_id
:
4549 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4550 db_vnfds_from_id
[vnfd_id
] = vnfd
4551 db_vnfds_from_member_index
[
4552 vnfr
["member-vnf-index-ref"]
4553 ] = db_vnfds_from_id
[vnfd_id
]
4555 # Destroy individual execution environments when there are terminating primitives.
4556 # Rest of EE will be deleted at once
4557 # TODO - check before calling _destroy_N2VC
4558 # if not operation_params.get("skip_terminate_primitives"):#
4559 # or not vca.get("needed_terminate"):
4560 stage
[0] = "Stage 2/3 execute terminating primitives."
4561 self
.logger
.debug(logging_text
+ stage
[0])
4562 stage
[1] = "Looking execution environment that needs terminate."
4563 self
.logger
.debug(logging_text
+ stage
[1])
4565 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4566 config_descriptor
= None
4567 vca_member_vnf_index
= vca
.get("member-vnf-index")
4568 vca_id
= self
.get_vca_id(
4569 db_vnfrs_dict
.get(vca_member_vnf_index
)
4570 if vca_member_vnf_index
4574 if not vca
or not vca
.get("ee_id"):
4576 if not vca
.get("member-vnf-index"):
4578 config_descriptor
= db_nsr
.get("ns-configuration")
4579 elif vca
.get("vdu_id"):
4580 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4581 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4582 elif vca
.get("kdu_name"):
4583 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4584 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4586 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4587 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4588 vca_type
= vca
.get("type")
4589 exec_terminate_primitives
= not operation_params
.get(
4590 "skip_terminate_primitives"
4591 ) and vca
.get("needed_terminate")
4592 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4593 # pending native charms
4595 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4597 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4598 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4599 task
= asyncio
.ensure_future(
4607 exec_terminate_primitives
,
4611 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4613 # wait for pending tasks of terminate primitives
4617 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4619 error_list
= await self
._wait
_for
_tasks
(
4622 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4626 tasks_dict_info
.clear()
4628 return # raise LcmException("; ".join(error_list))
4630 # remove All execution environments at once
4631 stage
[0] = "Stage 3/3 delete all."
4633 if nsr_deployed
.get("VCA"):
4634 stage
[1] = "Deleting all execution environments."
4635 self
.logger
.debug(logging_text
+ stage
[1])
4636 vca_id
= self
.get_vca_id({}, db_nsr
)
4637 task_delete_ee
= asyncio
.ensure_future(
4639 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4640 timeout
=self
.timeout
.charm_delete
,
4643 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4644 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4646 # Delete Namespace and Certificates if necessary
4647 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4648 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4649 certificate_name
=db_nslcmop
["nsInstanceId"],
4651 # TODO: Delete namespace
4653 # Delete from k8scluster
4654 stage
[1] = "Deleting KDUs."
4655 self
.logger
.debug(logging_text
+ stage
[1])
4656 # print(nsr_deployed)
4657 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4658 if not kdu
or not kdu
.get("kdu-instance"):
4660 kdu_instance
= kdu
.get("kdu-instance")
4661 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4662 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4663 vca_id
= self
.get_vca_id({}, db_nsr
)
4664 task_delete_kdu_instance
= asyncio
.ensure_future(
4665 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4666 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4667 kdu_instance
=kdu_instance
,
4669 namespace
=kdu
.get("namespace"),
4675 + "Unknown k8s deployment type {}".format(
4676 kdu
.get("k8scluster-type")
4681 task_delete_kdu_instance
4682 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4685 stage
[1] = "Deleting ns from VIM."
4686 if self
.ro_config
.ng
:
4687 task_delete_ro
= asyncio
.ensure_future(
4688 self
._terminate
_ng
_ro
(
4689 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4692 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4694 # rest of staff will be done at finally
4697 ROclient
.ROClientException
,
4702 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4704 except asyncio
.CancelledError
:
4706 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4708 exc
= "Operation was cancelled"
4709 except Exception as e
:
4710 exc
= traceback
.format_exc()
4711 self
.logger
.critical(
4712 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4717 error_list
.append(str(exc
))
4719 # wait for pending tasks
4721 stage
[1] = "Waiting for terminate pending tasks."
4722 self
.logger
.debug(logging_text
+ stage
[1])
4723 error_list
+= await self
._wait
_for
_tasks
(
4726 timeout_ns_terminate
,
4730 stage
[1] = stage
[2] = ""
4731 except asyncio
.CancelledError
:
4732 error_list
.append("Cancelled")
4733 # TODO cancell all tasks
4734 except Exception as exc
:
4735 error_list
.append(str(exc
))
4736 # update status at database
4738 error_detail
= "; ".join(error_list
)
4739 # self.logger.error(logging_text + error_detail)
4740 error_description_nslcmop
= "{} Detail: {}".format(
4741 stage
[0], error_detail
4743 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4744 nslcmop_id
, stage
[0]
4747 db_nsr_update
["operational-status"] = "failed"
4748 db_nsr_update
["detailed-status"] = (
4749 error_description_nsr
+ " Detail: " + error_detail
4751 db_nslcmop_update
["detailed-status"] = error_detail
4752 nslcmop_operation_state
= "FAILED"
4756 error_description_nsr
= error_description_nslcmop
= None
4757 ns_state
= "NOT_INSTANTIATED"
4758 db_nsr_update
["operational-status"] = "terminated"
4759 db_nsr_update
["detailed-status"] = "Done"
4760 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4761 db_nslcmop_update
["detailed-status"] = "Done"
4762 nslcmop_operation_state
= "COMPLETED"
4765 self
._write
_ns
_status
(
4768 current_operation
="IDLE",
4769 current_operation_id
=None,
4770 error_description
=error_description_nsr
,
4771 error_detail
=error_detail
,
4772 other_update
=db_nsr_update
,
4774 self
._write
_op
_status
(
4777 error_message
=error_description_nslcmop
,
4778 operation_state
=nslcmop_operation_state
,
4779 other_update
=db_nslcmop_update
,
4781 if ns_state
== "NOT_INSTANTIATED":
4785 {"nsr-id-ref": nsr_id
},
4786 {"_admin.nsState": "NOT_INSTANTIATED"},
4788 except DbException
as e
:
4791 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4795 if operation_params
:
4796 autoremove
= operation_params
.get("autoremove", False)
4797 if nslcmop_operation_state
:
4799 await self
.msg
.aiowrite(
4804 "nslcmop_id": nslcmop_id
,
4805 "operationState": nslcmop_operation_state
,
4806 "autoremove": autoremove
,
4809 except Exception as e
:
4811 logging_text
+ "kafka_write notification Exception {}".format(e
)
4813 self
.logger
.debug(f
"Deleting alerts: ns_id={nsr_id}")
4814 self
.db
.del_list("alerts", {"tags.ns_id": nsr_id
})
4816 self
.logger
.debug(logging_text
+ "Exit")
4817 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4819 async def _wait_for_tasks(
4820 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4823 error_detail_list
= []
4825 pending_tasks
= list(created_tasks_info
.keys())
4826 num_tasks
= len(pending_tasks
)
4828 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4829 self
._write
_op
_status
(nslcmop_id
, stage
)
4830 while pending_tasks
:
4832 _timeout
= timeout
+ time_start
- time()
4833 done
, pending_tasks
= await asyncio
.wait(
4834 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4836 num_done
+= len(done
)
4837 if not done
: # Timeout
4838 for task
in pending_tasks
:
4839 new_error
= created_tasks_info
[task
] + ": Timeout"
4840 error_detail_list
.append(new_error
)
4841 error_list
.append(new_error
)
4844 if task
.cancelled():
4847 exc
= task
.exception()
4849 if isinstance(exc
, asyncio
.TimeoutError
):
4851 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4852 error_list
.append(created_tasks_info
[task
])
4853 error_detail_list
.append(new_error
)
4860 ROclient
.ROClientException
,
4866 self
.logger
.error(logging_text
+ new_error
)
4868 exc_traceback
= "".join(
4869 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4873 + created_tasks_info
[task
]
4879 logging_text
+ created_tasks_info
[task
] + ": Done"
4881 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4883 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4884 if nsr_id
: # update also nsr
4889 "errorDescription": "Error at: " + ", ".join(error_list
),
4890 "errorDetail": ". ".join(error_detail_list
),
4893 self
._write
_op
_status
(nslcmop_id
, stage
)
4894 return error_detail_list
4897 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4899 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4900 The default-value is used. If it is between < > it look for a value at instantiation_params
4901 :param primitive_desc: portion of VNFD/NSD that describes primitive
4902 :param params: Params provided by user
4903 :param instantiation_params: Instantiation params provided by user
4904 :return: a dictionary with the calculated params
4906 calculated_params
= {}
4907 for parameter
in primitive_desc
.get("parameter", ()):
4908 param_name
= parameter
["name"]
4909 if param_name
in params
:
4910 calculated_params
[param_name
] = params
[param_name
]
4911 elif "default-value" in parameter
or "value" in parameter
:
4912 if "value" in parameter
:
4913 calculated_params
[param_name
] = parameter
["value"]
4915 calculated_params
[param_name
] = parameter
["default-value"]
4917 isinstance(calculated_params
[param_name
], str)
4918 and calculated_params
[param_name
].startswith("<")
4919 and calculated_params
[param_name
].endswith(">")
4921 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4922 calculated_params
[param_name
] = instantiation_params
[
4923 calculated_params
[param_name
][1:-1]
4927 "Parameter {} needed to execute primitive {} not provided".format(
4928 calculated_params
[param_name
], primitive_desc
["name"]
4933 "Parameter {} needed to execute primitive {} not provided".format(
4934 param_name
, primitive_desc
["name"]
4938 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4939 calculated_params
[param_name
] = yaml
.safe_dump(
4940 calculated_params
[param_name
], default_flow_style
=True, width
=256
4942 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4944 ].startswith("!!yaml "):
4945 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4946 if parameter
.get("data-type") == "INTEGER":
4948 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4949 except ValueError: # error converting string to int
4951 "Parameter {} of primitive {} must be integer".format(
4952 param_name
, primitive_desc
["name"]
4955 elif parameter
.get("data-type") == "BOOLEAN":
4956 calculated_params
[param_name
] = not (
4957 (str(calculated_params
[param_name
])).lower() == "false"
4960 # add always ns_config_info if primitive name is config
4961 if primitive_desc
["name"] == "config":
4962 if "ns_config_info" in instantiation_params
:
4963 calculated_params
["ns_config_info"] = instantiation_params
[
4966 return calculated_params
4968 def _look_for_deployed_vca(
4975 ee_descriptor_id
=None,
4977 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4978 for vca
in deployed_vca
:
4981 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4984 vdu_count_index
is not None
4985 and vdu_count_index
!= vca
["vdu_count_index"]
4988 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4990 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4994 # vca_deployed not found
4996 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4997 " is not deployed".format(
5006 ee_id
= vca
.get("ee_id")
5008 "type", "lxc_proxy_charm"
5009 ) # default value for backward compatibility - proxy charm
5012 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
5013 "execution environment".format(
5014 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5017 return ee_id
, vca_type
5019 async def _ns_execute_primitive(
5025 retries_interval
=30,
5032 if primitive
== "config":
5033 primitive_params
= {"params": primitive_params
}
5035 vca_type
= vca_type
or "lxc_proxy_charm"
5039 output
= await asyncio
.wait_for(
5040 self
.vca_map
[vca_type
].exec_primitive(
5042 primitive_name
=primitive
,
5043 params_dict
=primitive_params
,
5044 progress_timeout
=self
.timeout
.progress_primitive
,
5045 total_timeout
=self
.timeout
.primitive
,
5050 timeout
=timeout
or self
.timeout
.primitive
,
5054 except asyncio
.CancelledError
:
5056 except Exception as e
:
5060 "Error executing action {} on {} -> {}".format(
5065 await asyncio
.sleep(retries_interval
)
5067 if isinstance(e
, asyncio
.TimeoutError
):
5069 message
="Timed out waiting for action to complete"
5071 return "FAILED", getattr(e
, "message", repr(e
))
5073 return "COMPLETED", output
5075 except (LcmException
, asyncio
.CancelledError
):
5077 except Exception as e
:
5078 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if db_nsr["_admin"]["deployed"]["K8s"]:
        # KDU-based deployment: refresh status of every deployed KDU instance
        for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
            cluster_uuid, kdu_instance, cluster_type = (
                k8s["k8scluster-uuid"],
                k8s["kdu-instance"],
                k8s["k8scluster-type"],
            )
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # charm-based deployment: refresh each deployed VCA record
        for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            table, filter = "nsrs", {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db(table, filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
5114 async def action(self
, nsr_id
, nslcmop_id
):
5115 # Try to lock HA task here
5116 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5117 if not task_is_locked_by_me
:
5120 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5121 self
.logger
.debug(logging_text
+ "Enter")
5122 # get all needed from database
5126 db_nslcmop_update
= {}
5127 nslcmop_operation_state
= None
5128 error_description_nslcmop
= None
5132 # wait for any previous tasks in process
5133 step
= "Waiting for previous operations to terminate"
5134 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5136 self
._write
_ns
_status
(
5139 current_operation
="RUNNING ACTION",
5140 current_operation_id
=nslcmop_id
,
5143 step
= "Getting information from database"
5144 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5145 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5146 if db_nslcmop
["operationParams"].get("primitive_params"):
5147 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5148 db_nslcmop
["operationParams"]["primitive_params"]
5151 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5152 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5153 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5154 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5155 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5156 primitive
= db_nslcmop
["operationParams"]["primitive"]
5157 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5158 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5159 "timeout_ns_action", self
.timeout
.primitive
5163 step
= "Getting vnfr from database"
5164 db_vnfr
= self
.db
.get_one(
5165 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5167 if db_vnfr
.get("kdur"):
5169 for kdur
in db_vnfr
["kdur"]:
5170 if kdur
.get("additionalParams"):
5171 kdur
["additionalParams"] = json
.loads(
5172 kdur
["additionalParams"]
5174 kdur_list
.append(kdur
)
5175 db_vnfr
["kdur"] = kdur_list
5176 step
= "Getting vnfd from database"
5177 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5179 # Sync filesystem before running a primitive
5180 self
.fs
.sync(db_vnfr
["vnfd-id"])
5182 step
= "Getting nsd from database"
5183 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5185 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5186 # for backward compatibility
5187 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5188 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5189 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5190 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5192 # look for primitive
5193 config_primitive_desc
= descriptor_configuration
= None
5195 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5197 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5199 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5201 descriptor_configuration
= db_nsd
.get("ns-configuration")
5203 if descriptor_configuration
and descriptor_configuration
.get(
5206 for config_primitive
in descriptor_configuration
["config-primitive"]:
5207 if config_primitive
["name"] == primitive
:
5208 config_primitive_desc
= config_primitive
5211 if not config_primitive_desc
:
5212 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5214 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5218 primitive_name
= primitive
5219 ee_descriptor_id
= None
5221 primitive_name
= config_primitive_desc
.get(
5222 "execution-environment-primitive", primitive
5224 ee_descriptor_id
= config_primitive_desc
.get(
5225 "execution-environment-ref"
5231 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5233 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5236 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5238 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5240 desc_params
= parse_yaml_strings(
5241 db_vnfr
.get("additionalParamsForVnf")
5244 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5245 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5246 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5248 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5249 actions
.add(primitive
["name"])
5250 for primitive
in kdu_configuration
.get("config-primitive", []):
5251 actions
.add(primitive
["name"])
5253 nsr_deployed
["K8s"],
5254 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5255 and kdu
["member-vnf-index"] == vnf_index
,
5259 if primitive_name
in actions
5260 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5264 # TODO check if ns is in a proper status
5266 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5268 # kdur and desc_params already set from before
5269 if primitive_params
:
5270 desc_params
.update(primitive_params
)
5271 # TODO Check if we will need something at vnf level
5272 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5274 kdu_name
== kdu
["kdu-name"]
5275 and kdu
["member-vnf-index"] == vnf_index
5280 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5283 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5284 msg
= "unknown k8scluster-type '{}'".format(
5285 kdu
.get("k8scluster-type")
5287 raise LcmException(msg
)
5290 "collection": "nsrs",
5291 "filter": {"_id": nsr_id
},
5292 "path": "_admin.deployed.K8s.{}".format(index
),
5296 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5298 step
= "Executing kdu {}".format(primitive_name
)
5299 if primitive_name
== "upgrade":
5300 if desc_params
.get("kdu_model"):
5301 kdu_model
= desc_params
.get("kdu_model")
5302 del desc_params
["kdu_model"]
5304 kdu_model
= kdu
.get("kdu-model")
5305 if kdu_model
.count("/") < 2: # helm chart is not embedded
5306 parts
= kdu_model
.split(sep
=":")
5308 kdu_model
= parts
[0]
5309 if desc_params
.get("kdu_atomic_upgrade"):
5310 atomic_upgrade
= desc_params
.get(
5311 "kdu_atomic_upgrade"
5312 ).lower() in ("yes", "true", "1")
5313 del desc_params
["kdu_atomic_upgrade"]
5315 atomic_upgrade
= True
5317 detailed_status
= await asyncio
.wait_for(
5318 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5319 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5320 kdu_instance
=kdu
.get("kdu-instance"),
5321 atomic
=atomic_upgrade
,
5322 kdu_model
=kdu_model
,
5325 timeout
=timeout_ns_action
,
5327 timeout
=timeout_ns_action
+ 10,
5330 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5332 elif primitive_name
== "rollback":
5333 detailed_status
= await asyncio
.wait_for(
5334 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5335 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5336 kdu_instance
=kdu
.get("kdu-instance"),
5339 timeout
=timeout_ns_action
,
5341 elif primitive_name
== "status":
5342 detailed_status
= await asyncio
.wait_for(
5343 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5344 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5345 kdu_instance
=kdu
.get("kdu-instance"),
5348 timeout
=timeout_ns_action
,
5351 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5352 kdu
["kdu-name"], nsr_id
5354 params
= self
._map
_primitive
_params
(
5355 config_primitive_desc
, primitive_params
, desc_params
5358 detailed_status
= await asyncio
.wait_for(
5359 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5360 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5361 kdu_instance
=kdu_instance
,
5362 primitive_name
=primitive_name
,
5365 timeout
=timeout_ns_action
,
5368 timeout
=timeout_ns_action
,
5372 nslcmop_operation_state
= "COMPLETED"
5374 detailed_status
= ""
5375 nslcmop_operation_state
= "FAILED"
5377 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5378 nsr_deployed
["VCA"],
5379 member_vnf_index
=vnf_index
,
5381 vdu_count_index
=vdu_count_index
,
5382 ee_descriptor_id
=ee_descriptor_id
,
5384 for vca_index
, vca_deployed
in enumerate(
5385 db_nsr
["_admin"]["deployed"]["VCA"]
5387 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5389 "collection": "nsrs",
5390 "filter": {"_id": nsr_id
},
5391 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5395 nslcmop_operation_state
,
5397 ) = await self
._ns
_execute
_primitive
(
5399 primitive
=primitive_name
,
5400 primitive_params
=self
._map
_primitive
_params
(
5401 config_primitive_desc
, primitive_params
, desc_params
5403 timeout
=timeout_ns_action
,
5409 db_nslcmop_update
["detailed-status"] = detailed_status
5410 error_description_nslcmop
= (
5411 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5415 + "Done with result {} {}".format(
5416 nslcmop_operation_state
, detailed_status
5419 return # database update is called inside finally
5421 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5422 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5424 except asyncio
.CancelledError
:
5426 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5428 exc
= "Operation was cancelled"
5429 except asyncio
.TimeoutError
:
5430 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5432 except Exception as e
:
5433 exc
= traceback
.format_exc()
5434 self
.logger
.critical(
5435 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5444 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5445 nslcmop_operation_state
= "FAILED"
5447 self
._write
_ns
_status
(
5451 ], # TODO check if degraded. For the moment use previous status
5452 current_operation
="IDLE",
5453 current_operation_id
=None,
5454 # error_description=error_description_nsr,
5455 # error_detail=error_detail,
5456 other_update
=db_nsr_update
,
5459 self
._write
_op
_status
(
5462 error_message
=error_description_nslcmop
,
5463 operation_state
=nslcmop_operation_state
,
5464 other_update
=db_nslcmop_update
,
5467 if nslcmop_operation_state
:
5469 await self
.msg
.aiowrite(
5474 "nslcmop_id": nslcmop_id
,
5475 "operationState": nslcmop_operation_state
,
5478 except Exception as e
:
5480 logging_text
+ "kafka_write notification Exception {}".format(e
)
5482 self
.logger
.debug(logging_text
+ "Exit")
5483 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5484 return nslcmop_operation_state
, detailed_status
5486 async def terminate_vdus(
5487 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5489 """This method terminates VDUs
5492 db_vnfr: VNF instance record
5493 member_vnf_index: VNF index to identify the VDUs to be removed
5494 db_nsr: NS instance record
5495 update_db_nslcmops: Nslcmop update record
5497 vca_scaling_info
= []
5498 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5499 scaling_info
["scaling_direction"] = "IN"
5500 scaling_info
["vdu-delete"] = {}
5501 scaling_info
["kdu-delete"] = {}
5502 db_vdur
= db_vnfr
.get("vdur")
5503 vdur_list
= copy(db_vdur
)
5505 for index
, vdu
in enumerate(vdur_list
):
5506 vca_scaling_info
.append(
5508 "osm_vdu_id": vdu
["vdu-id-ref"],
5509 "member-vnf-index": member_vnf_index
,
5511 "vdu_index": count_index
,
5514 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5515 scaling_info
["vdu"].append(
5517 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5518 "vdu_id": vdu
["vdu-id-ref"],
5522 for interface
in vdu
["interfaces"]:
5523 scaling_info
["vdu"][index
]["interface"].append(
5525 "name": interface
["name"],
5526 "ip_address": interface
["ip-address"],
5527 "mac_address": interface
.get("mac-address"),
5530 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5531 stage
[2] = "Terminating VDUs"
5532 if scaling_info
.get("vdu-delete"):
5533 # scale_process = "RO"
5534 if self
.ro_config
.ng
:
5535 await self
._scale
_ng
_ro
(
5544 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5545 """This method is to Remove VNF instances from NS.
5548 nsr_id: NS instance id
5549 nslcmop_id: nslcmop id of update
5550 vnf_instance_id: id of the VNF instance to be removed
5553 result: (str, str) COMPLETED/FAILED, details
5557 logging_text
= "Task ns={} update ".format(nsr_id
)
5558 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5559 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5560 if check_vnfr_count
> 1:
5561 stage
= ["", "", ""]
5562 step
= "Getting nslcmop from database"
5564 step
+ " after having waited for previous tasks to be completed"
5566 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5567 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5568 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5569 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5570 """ db_vnfr = self.db.get_one(
5571 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5573 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5574 await self
.terminate_vdus(
5583 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5584 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5585 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5586 "constituent-vnfr-ref"
5588 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5589 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5590 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5591 return "COMPLETED", "Done"
5593 step
= "Terminate VNF Failed with"
5595 "{} Cannot terminate the last VNF in this NS.".format(
5599 except (LcmException
, asyncio
.CancelledError
):
5601 except Exception as e
:
5602 self
.logger
.debug("Error removing VNF {}".format(e
))
5603 return "FAILED", "Error removing VNF {}".format(e
)
5605 async def _ns_redeploy_vnf(
5613 """This method updates and redeploys VNF instances
5616 nsr_id: NS instance id
5617 nslcmop_id: nslcmop id
5618 db_vnfd: VNF descriptor
5619 db_vnfr: VNF instance record
5620 db_nsr: NS instance record
5623 result: (str, str) COMPLETED/FAILED, details
5627 stage
= ["", "", ""]
5628 logging_text
= "Task ns={} update ".format(nsr_id
)
5629 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5630 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5632 # Terminate old VNF resources
5633 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5634 await self
.terminate_vdus(
5643 # old_vnfd_id = db_vnfr["vnfd-id"]
5644 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5645 new_db_vnfd
= db_vnfd
5646 # new_vnfd_ref = new_db_vnfd["id"]
5647 # new_vnfd_id = vnfd_id
5651 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5653 "name": cp
.get("id"),
5654 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5655 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5658 new_vnfr_cp
.append(vnf_cp
)
5659 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5660 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5661 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5663 "revision": latest_vnfd_revision
,
5664 "connection-point": new_vnfr_cp
,
5668 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5669 updated_db_vnfr
= self
.db
.get_one(
5671 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5674 # Instantiate new VNF resources
5675 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5676 vca_scaling_info
= []
5677 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5678 scaling_info
["scaling_direction"] = "OUT"
5679 scaling_info
["vdu-create"] = {}
5680 scaling_info
["kdu-create"] = {}
5681 vdud_instantiate_list
= db_vnfd
["vdu"]
5682 for index
, vdud
in enumerate(vdud_instantiate_list
):
5683 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5685 additional_params
= (
5686 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5689 cloud_init_list
= []
5691 # TODO Information of its own ip is not available because db_vnfr is not updated.
5692 additional_params
["OSM"] = get_osm_params(
5693 updated_db_vnfr
, vdud
["id"], 1
5695 cloud_init_list
.append(
5696 self
._parse
_cloud
_init
(
5703 vca_scaling_info
.append(
5705 "osm_vdu_id": vdud
["id"],
5706 "member-vnf-index": member_vnf_index
,
5708 "vdu_index": count_index
,
5711 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5712 if self
.ro_config
.ng
:
5714 "New Resources to be deployed: {}".format(scaling_info
)
5716 await self
._scale
_ng
_ro
(
5724 return "COMPLETED", "Done"
5725 except (LcmException
, asyncio
.CancelledError
):
5727 except Exception as e
:
5728 self
.logger
.debug("Error updating VNF {}".format(e
))
5729 return "FAILED", "Error updating VNF {}".format(e
)
5731 async def _ns_charm_upgrade(
5737 timeout
: float = None,
5739 """This method upgrade charms in VNF instances
5742 ee_id: Execution environment id
5743 path: Local path to the charm
5745 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5746 timeout: (Float) Timeout for the ns update operation
5749 result: (str, str) COMPLETED/FAILED, details
5752 charm_type
= charm_type
or "lxc_proxy_charm"
5753 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5757 charm_type
=charm_type
,
5758 timeout
=timeout
or self
.timeout
.ns_update
,
5762 return "COMPLETED", output
5764 except (LcmException
, asyncio
.CancelledError
):
5767 except Exception as e
:
5768 self
.logger
.debug("Error upgrading charm {}".format(path
))
5770 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5772 async def update(self
, nsr_id
, nslcmop_id
):
5773 """Update NS according to different update types
5775 This method performs upgrade of VNF instances then updates the revision
5776 number in VNF record
5779 nsr_id: Network service will be updated
5780 nslcmop_id: ns lcm operation id
5783 It may raise DbException, LcmException, N2VCException, K8sException
5786 # Try to lock HA task here
5787 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5788 if not task_is_locked_by_me
:
5791 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5792 self
.logger
.debug(logging_text
+ "Enter")
5794 # Set the required variables to be filled up later
5796 db_nslcmop_update
= {}
5798 nslcmop_operation_state
= None
5800 error_description_nslcmop
= ""
5802 change_type
= "updated"
5803 detailed_status
= ""
5804 member_vnf_index
= None
5807 # wait for any previous tasks in process
5808 step
= "Waiting for previous operations to terminate"
5809 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5810 self
._write
_ns
_status
(
5813 current_operation
="UPDATING",
5814 current_operation_id
=nslcmop_id
,
5817 step
= "Getting nslcmop from database"
5818 db_nslcmop
= self
.db
.get_one(
5819 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5821 update_type
= db_nslcmop
["operationParams"]["updateType"]
5823 step
= "Getting nsr from database"
5824 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5825 old_operational_status
= db_nsr
["operational-status"]
5826 db_nsr_update
["operational-status"] = "updating"
5827 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5828 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5830 if update_type
== "CHANGE_VNFPKG":
5831 # Get the input parameters given through update request
5832 vnf_instance_id
= db_nslcmop
["operationParams"][
5833 "changeVnfPackageData"
5834 ].get("vnfInstanceId")
5836 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5839 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5841 step
= "Getting vnfr from database"
5842 db_vnfr
= self
.db
.get_one(
5843 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5846 step
= "Getting vnfds from database"
5848 latest_vnfd
= self
.db
.get_one(
5849 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5851 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5854 current_vnf_revision
= db_vnfr
.get("revision", 1)
5855 current_vnfd
= self
.db
.get_one(
5857 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5858 fail_on_empty
=False,
5860 # Charm artifact paths will be filled up later
5862 current_charm_artifact_path
,
5863 target_charm_artifact_path
,
5864 charm_artifact_paths
,
5866 ) = ([], [], [], [])
5868 step
= "Checking if revision has changed in VNFD"
5869 if current_vnf_revision
!= latest_vnfd_revision
:
5870 change_type
= "policy_updated"
5872 # There is new revision of VNFD, update operation is required
5873 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5874 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5876 step
= "Removing the VNFD packages if they exist in the local path"
5877 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5878 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5880 step
= "Get the VNFD packages from FSMongo"
5881 self
.fs
.sync(from_path
=latest_vnfd_path
)
5882 self
.fs
.sync(from_path
=current_vnfd_path
)
5885 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5887 current_base_folder
= current_vnfd
["_admin"]["storage"]
5888 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5890 for vca_index
, vca_deployed
in enumerate(
5891 get_iterable(nsr_deployed
, "VCA")
5893 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5895 # Getting charm-id and charm-type
5896 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5897 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5898 vca_type
= vca_deployed
.get("type")
5899 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5902 ee_id
= vca_deployed
.get("ee_id")
5904 step
= "Getting descriptor config"
5905 if current_vnfd
.get("kdu"):
5906 search_key
= "kdu_name"
5908 search_key
= "vnfd_id"
5910 entity_id
= vca_deployed
.get(search_key
)
5912 descriptor_config
= get_configuration(
5913 current_vnfd
, entity_id
5916 if "execution-environment-list" in descriptor_config
:
5917 ee_list
= descriptor_config
.get(
5918 "execution-environment-list", []
5923 # There could be several charm used in the same VNF
5924 for ee_item
in ee_list
:
5925 if ee_item
.get("juju"):
5926 step
= "Getting charm name"
5927 charm_name
= ee_item
["juju"].get("charm")
5929 step
= "Setting Charm artifact paths"
5930 current_charm_artifact_path
.append(
5931 get_charm_artifact_path(
5932 current_base_folder
,
5935 current_vnf_revision
,
5938 target_charm_artifact_path
.append(
5939 get_charm_artifact_path(
5943 latest_vnfd_revision
,
5946 elif ee_item
.get("helm-chart"):
5947 # add chart to list and all parameters
5948 step
= "Getting helm chart name"
5949 chart_name
= ee_item
.get("helm-chart")
5951 ee_item
.get("helm-version")
5952 and ee_item
.get("helm-version") == "v2"
5956 vca_type
= "helm-v3"
5957 step
= "Setting Helm chart artifact paths"
5959 helm_artifacts
.append(
5961 "current_artifact_path": get_charm_artifact_path(
5962 current_base_folder
,
5965 current_vnf_revision
,
5967 "target_artifact_path": get_charm_artifact_path(
5971 latest_vnfd_revision
,
5974 "vca_index": vca_index
,
5975 "vdu_index": vdu_count_index
,
5979 charm_artifact_paths
= zip(
5980 current_charm_artifact_path
, target_charm_artifact_path
5983 step
= "Checking if software version has changed in VNFD"
5984 if find_software_version(current_vnfd
) != find_software_version(
5987 step
= "Checking if existing VNF has charm"
5988 for current_charm_path
, target_charm_path
in list(
5989 charm_artifact_paths
5991 if current_charm_path
:
5993 "Software version change is not supported as VNF instance {} has charm.".format(
5998 # There is no change in the charm package, then redeploy the VNF
5999 # based on new descriptor
6000 step
= "Redeploying VNF"
6001 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6002 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
6003 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
6005 if result
== "FAILED":
6006 nslcmop_operation_state
= result
6007 error_description_nslcmop
= detailed_status
6008 db_nslcmop_update
["detailed-status"] = detailed_status
6011 + " step {} Done with result {} {}".format(
6012 step
, nslcmop_operation_state
, detailed_status
6017 step
= "Checking if any charm package has changed or not"
6018 for current_charm_path
, target_charm_path
in list(
6019 charm_artifact_paths
6023 and target_charm_path
6024 and self
.check_charm_hash_changed(
6025 current_charm_path
, target_charm_path
6028 step
= "Checking whether VNF uses juju bundle"
6029 if check_juju_bundle_existence(current_vnfd
):
6031 "Charm upgrade is not supported for the instance which"
6032 " uses juju-bundle: {}".format(
6033 check_juju_bundle_existence(current_vnfd
)
6037 step
= "Upgrading Charm"
6041 ) = await self
._ns
_charm
_upgrade
(
6044 charm_type
=vca_type
,
6045 path
=self
.fs
.path
+ target_charm_path
,
6046 timeout
=timeout_seconds
,
6049 if result
== "FAILED":
6050 nslcmop_operation_state
= result
6051 error_description_nslcmop
= detailed_status
6053 db_nslcmop_update
["detailed-status"] = detailed_status
6056 + " step {} Done with result {} {}".format(
6057 step
, nslcmop_operation_state
, detailed_status
6061 step
= "Updating policies"
6062 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6063 result
= "COMPLETED"
6064 detailed_status
= "Done"
6065 db_nslcmop_update
["detailed-status"] = "Done"
6068 for item
in helm_artifacts
:
6070 item
["current_artifact_path"]
6071 and item
["target_artifact_path"]
6072 and self
.check_charm_hash_changed(
6073 item
["current_artifact_path"],
6074 item
["target_artifact_path"],
6078 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6081 vnfr_id
= db_vnfr
["_id"]
6082 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6084 "collection": "nsrs",
6085 "filter": {"_id": nsr_id
},
6086 "path": db_update_entry
,
6088 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6089 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6090 namespace
=namespace
,
6094 artifact_path
=item
["target_artifact_path"],
6097 vnf_id
= db_vnfr
.get("vnfd-ref")
6098 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6099 self
.logger
.debug("get ssh key block")
6103 ("config-access", "ssh-access", "required"),
6105 # Needed to inject a ssh key
6108 ("config-access", "ssh-access", "default-user"),
6111 "Install configuration Software, getting public ssh key"
6113 pub_key
= await self
.vca_map
[
6115 ].get_ee_ssh_public__key(
6116 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6120 "Insert public key into VM user={} ssh_key={}".format(
6124 self
.logger
.debug(logging_text
+ step
)
6126 # wait for RO (ip-address) Insert pub_key into VM
6127 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6137 initial_config_primitive_list
= config_descriptor
.get(
6138 "initial-config-primitive"
6140 config_primitive
= next(
6143 for p
in initial_config_primitive_list
6144 if p
["name"] == "config"
6148 if not config_primitive
:
6151 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6153 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6154 if db_vnfr
.get("additionalParamsForVnf"):
6155 deploy_params
.update(
6157 db_vnfr
["additionalParamsForVnf"].copy()
6160 primitive_params_
= self
._map
_primitive
_params
(
6161 config_primitive
, {}, deploy_params
6164 step
= "execute primitive '{}' params '{}'".format(
6165 config_primitive
["name"], primitive_params_
6167 self
.logger
.debug(logging_text
+ step
)
6168 await self
.vca_map
[vca_type
].exec_primitive(
6170 primitive_name
=config_primitive
["name"],
6171 params_dict
=primitive_params_
,
6177 step
= "Updating policies"
6178 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6179 detailed_status
= "Done"
6180 db_nslcmop_update
["detailed-status"] = "Done"
6182 # If nslcmop_operation_state is None, so any operation is not failed.
6183 if not nslcmop_operation_state
:
6184 nslcmop_operation_state
= "COMPLETED"
6186 # If update CHANGE_VNFPKG nslcmop_operation is successful
6187 # vnf revision need to be updated
6188 vnfr_update
["revision"] = latest_vnfd_revision
6189 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6193 + " task Done with result {} {}".format(
6194 nslcmop_operation_state
, detailed_status
6197 elif update_type
== "REMOVE_VNF":
6198 # This part is included in https://osm.etsi.org/gerrit/11876
6199 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6200 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6201 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6202 step
= "Removing VNF"
6203 (result
, detailed_status
) = await self
.remove_vnf(
6204 nsr_id
, nslcmop_id
, vnf_instance_id
6206 if result
== "FAILED":
6207 nslcmop_operation_state
= result
6208 error_description_nslcmop
= detailed_status
6209 db_nslcmop_update
["detailed-status"] = detailed_status
6210 change_type
= "vnf_terminated"
6211 if not nslcmop_operation_state
:
6212 nslcmop_operation_state
= "COMPLETED"
6215 + " task Done with result {} {}".format(
6216 nslcmop_operation_state
, detailed_status
6220 elif update_type
== "OPERATE_VNF":
6221 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6224 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6227 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6230 (result
, detailed_status
) = await self
.rebuild_start_stop(
6231 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6233 if result
== "FAILED":
6234 nslcmop_operation_state
= result
6235 error_description_nslcmop
= detailed_status
6236 db_nslcmop_update
["detailed-status"] = detailed_status
6237 if not nslcmop_operation_state
:
6238 nslcmop_operation_state
= "COMPLETED"
6241 + " task Done with result {} {}".format(
6242 nslcmop_operation_state
, detailed_status
6246 # If nslcmop_operation_state is None, so any operation is not failed.
6247 # All operations are executed in overall.
6248 if not nslcmop_operation_state
:
6249 nslcmop_operation_state
= "COMPLETED"
6250 db_nsr_update
["operational-status"] = old_operational_status
6252 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6253 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6255 except asyncio
.CancelledError
:
6257 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6259 exc
= "Operation was cancelled"
6260 except asyncio
.TimeoutError
:
6261 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6263 except Exception as e
:
6264 exc
= traceback
.format_exc()
6265 self
.logger
.critical(
6266 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6275 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6276 nslcmop_operation_state
= "FAILED"
6277 db_nsr_update
["operational-status"] = old_operational_status
6279 self
._write
_ns
_status
(
6281 ns_state
=db_nsr
["nsState"],
6282 current_operation
="IDLE",
6283 current_operation_id
=None,
6284 other_update
=db_nsr_update
,
6287 self
._write
_op
_status
(
6290 error_message
=error_description_nslcmop
,
6291 operation_state
=nslcmop_operation_state
,
6292 other_update
=db_nslcmop_update
,
6295 if nslcmop_operation_state
:
6299 "nslcmop_id": nslcmop_id
,
6300 "operationState": nslcmop_operation_state
,
6303 change_type
in ("vnf_terminated", "policy_updated")
6304 and member_vnf_index
6306 msg
.update({"vnf_member_index": member_vnf_index
})
6307 await self
.msg
.aiowrite("ns", change_type
, msg
)
6308 except Exception as e
:
6310 logging_text
+ "kafka_write notification Exception {}".format(e
)
6312 self
.logger
.debug(logging_text
+ "Exit")
6313 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6314 return nslcmop_operation_state
, detailed_status
6316 async def scale(self
, nsr_id
, nslcmop_id
):
6317 # Try to lock HA task here
6318 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6319 if not task_is_locked_by_me
:
6322 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6323 stage
= ["", "", ""]
6324 tasks_dict_info
= {}
6325 # ^ stage, step, VIM progress
6326 self
.logger
.debug(logging_text
+ "Enter")
6327 # get all needed from database
6329 db_nslcmop_update
= {}
6332 # in case of error, indicates what part of scale was failed to put nsr at error status
6333 scale_process
= None
6334 old_operational_status
= ""
6335 old_config_status
= ""
6338 # wait for any previous tasks in process
6339 step
= "Waiting for previous operations to terminate"
6340 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6341 self
._write
_ns
_status
(
6344 current_operation
="SCALING",
6345 current_operation_id
=nslcmop_id
,
6348 step
= "Getting nslcmop from database"
6350 step
+ " after having waited for previous tasks to be completed"
6352 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6354 step
= "Getting nsr from database"
6355 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6356 old_operational_status
= db_nsr
["operational-status"]
6357 old_config_status
= db_nsr
["config-status"]
6359 step
= "Parsing scaling parameters"
6360 db_nsr_update
["operational-status"] = "scaling"
6361 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6362 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6364 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6366 ]["member-vnf-index"]
6367 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6369 ]["scaling-group-descriptor"]
6370 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6371 # for backward compatibility
6372 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6373 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6374 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6375 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6377 step
= "Getting vnfr from database"
6378 db_vnfr
= self
.db
.get_one(
6379 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6382 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6384 step
= "Getting vnfd from database"
6385 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6387 base_folder
= db_vnfd
["_admin"]["storage"]
6389 step
= "Getting scaling-group-descriptor"
6390 scaling_descriptor
= find_in_list(
6391 get_scaling_aspect(db_vnfd
),
6392 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6394 if not scaling_descriptor
:
6396 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6397 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6400 step
= "Sending scale order to VIM"
6401 # TODO check if ns is in a proper status
6403 if not db_nsr
["_admin"].get("scaling-group"):
6408 "_admin.scaling-group": [
6409 {"name": scaling_group
, "nb-scale-op": 0}
6413 admin_scale_index
= 0
6415 for admin_scale_index
, admin_scale_info
in enumerate(
6416 db_nsr
["_admin"]["scaling-group"]
6418 if admin_scale_info
["name"] == scaling_group
:
6419 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6421 else: # not found, set index one plus last element and add new entry with the name
6422 admin_scale_index
+= 1
6424 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6427 vca_scaling_info
= []
6428 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6429 if scaling_type
== "SCALE_OUT":
6430 if "aspect-delta-details" not in scaling_descriptor
:
6432 "Aspect delta details not fount in scaling descriptor {}".format(
6433 scaling_descriptor
["name"]
6436 # count if max-instance-count is reached
6437 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6439 scaling_info
["scaling_direction"] = "OUT"
6440 scaling_info
["vdu-create"] = {}
6441 scaling_info
["kdu-create"] = {}
6442 for delta
in deltas
:
6443 for vdu_delta
in delta
.get("vdu-delta", {}):
6444 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6445 # vdu_index also provides the number of instance of the targeted vdu
6446 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6447 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6451 additional_params
= (
6452 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6455 cloud_init_list
= []
6457 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6458 max_instance_count
= 10
6459 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6460 max_instance_count
= vdu_profile
.get(
6461 "max-number-of-instances", 10
6464 default_instance_num
= get_number_of_instances(
6467 instances_number
= vdu_delta
.get("number-of-instances", 1)
6468 nb_scale_op
+= instances_number
6470 new_instance_count
= nb_scale_op
+ default_instance_num
6471 # Control if new count is over max and vdu count is less than max.
6472 # Then assign new instance count
6473 if new_instance_count
> max_instance_count
> vdu_count
:
6474 instances_number
= new_instance_count
- max_instance_count
6476 instances_number
= instances_number
6478 if new_instance_count
> max_instance_count
:
6480 "reached the limit of {} (max-instance-count) "
6481 "scaling-out operations for the "
6482 "scaling-group-descriptor '{}'".format(
6483 nb_scale_op
, scaling_group
6486 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6488 # TODO Information of its own ip is not available because db_vnfr is not updated.
6489 additional_params
["OSM"] = get_osm_params(
6490 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6492 cloud_init_list
.append(
6493 self
._parse
_cloud
_init
(
6500 vca_scaling_info
.append(
6502 "osm_vdu_id": vdu_delta
["id"],
6503 "member-vnf-index": vnf_index
,
6505 "vdu_index": vdu_index
+ x
,
6508 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6509 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6510 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6511 kdu_name
= kdu_profile
["kdu-name"]
6512 resource_name
= kdu_profile
.get("resource-name", "")
6514 # Might have different kdus in the same delta
6515 # Should have list for each kdu
6516 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6517 scaling_info
["kdu-create"][kdu_name
] = []
6519 kdur
= get_kdur(db_vnfr
, kdu_name
)
6520 if kdur
.get("helm-chart"):
6521 k8s_cluster_type
= "helm-chart-v3"
6522 self
.logger
.debug("kdur: {}".format(kdur
))
6524 kdur
.get("helm-version")
6525 and kdur
.get("helm-version") == "v2"
6527 k8s_cluster_type
= "helm-chart"
6528 elif kdur
.get("juju-bundle"):
6529 k8s_cluster_type
= "juju-bundle"
6532 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6533 "juju-bundle. Maybe an old NBI version is running".format(
6534 db_vnfr
["member-vnf-index-ref"], kdu_name
6538 max_instance_count
= 10
6539 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6540 max_instance_count
= kdu_profile
.get(
6541 "max-number-of-instances", 10
6544 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6545 deployed_kdu
, _
= get_deployed_kdu(
6546 nsr_deployed
, kdu_name
, vnf_index
6548 if deployed_kdu
is None:
6550 "KDU '{}' for vnf '{}' not deployed".format(
6554 kdu_instance
= deployed_kdu
.get("kdu-instance")
6555 instance_num
= await self
.k8scluster_map
[
6561 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6562 kdu_model
=deployed_kdu
.get("kdu-model"),
6564 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6565 "number-of-instances", 1
6568 # Control if new count is over max and instance_num is less than max.
6569 # Then assign max instance number to kdu replica count
6570 if kdu_replica_count
> max_instance_count
> instance_num
:
6571 kdu_replica_count
= max_instance_count
6572 if kdu_replica_count
> max_instance_count
:
6574 "reached the limit of {} (max-instance-count) "
6575 "scaling-out operations for the "
6576 "scaling-group-descriptor '{}'".format(
6577 instance_num
, scaling_group
6581 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6582 vca_scaling_info
.append(
6584 "osm_kdu_id": kdu_name
,
6585 "member-vnf-index": vnf_index
,
6587 "kdu_index": instance_num
+ x
- 1,
6590 scaling_info
["kdu-create"][kdu_name
].append(
6592 "member-vnf-index": vnf_index
,
6594 "k8s-cluster-type": k8s_cluster_type
,
6595 "resource-name": resource_name
,
6596 "scale": kdu_replica_count
,
6599 elif scaling_type
== "SCALE_IN":
6600 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6602 scaling_info
["scaling_direction"] = "IN"
6603 scaling_info
["vdu-delete"] = {}
6604 scaling_info
["kdu-delete"] = {}
6606 for delta
in deltas
:
6607 for vdu_delta
in delta
.get("vdu-delta", {}):
6608 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6609 min_instance_count
= 0
6610 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6611 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6612 min_instance_count
= vdu_profile
["min-number-of-instances"]
6614 default_instance_num
= get_number_of_instances(
6615 db_vnfd
, vdu_delta
["id"]
6617 instance_num
= vdu_delta
.get("number-of-instances", 1)
6618 nb_scale_op
-= instance_num
6620 new_instance_count
= nb_scale_op
+ default_instance_num
6622 if new_instance_count
< min_instance_count
< vdu_count
:
6623 instances_number
= min_instance_count
- new_instance_count
6625 instances_number
= instance_num
6627 if new_instance_count
< min_instance_count
:
6629 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6630 "scaling-group-descriptor '{}'".format(
6631 nb_scale_op
, scaling_group
6634 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6635 vca_scaling_info
.append(
6637 "osm_vdu_id": vdu_delta
["id"],
6638 "member-vnf-index": vnf_index
,
6640 "vdu_index": vdu_index
- 1 - x
,
6643 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6644 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6645 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6646 kdu_name
= kdu_profile
["kdu-name"]
6647 resource_name
= kdu_profile
.get("resource-name", "")
6649 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6650 scaling_info
["kdu-delete"][kdu_name
] = []
6652 kdur
= get_kdur(db_vnfr
, kdu_name
)
6653 if kdur
.get("helm-chart"):
6654 k8s_cluster_type
= "helm-chart-v3"
6655 self
.logger
.debug("kdur: {}".format(kdur
))
6657 kdur
.get("helm-version")
6658 and kdur
.get("helm-version") == "v2"
6660 k8s_cluster_type
= "helm-chart"
6661 elif kdur
.get("juju-bundle"):
6662 k8s_cluster_type
= "juju-bundle"
6665 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6666 "juju-bundle. Maybe an old NBI version is running".format(
6667 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6671 min_instance_count
= 0
6672 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6673 min_instance_count
= kdu_profile
["min-number-of-instances"]
6675 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6676 deployed_kdu
, _
= get_deployed_kdu(
6677 nsr_deployed
, kdu_name
, vnf_index
6679 if deployed_kdu
is None:
6681 "KDU '{}' for vnf '{}' not deployed".format(
6685 kdu_instance
= deployed_kdu
.get("kdu-instance")
6686 instance_num
= await self
.k8scluster_map
[
6692 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6693 kdu_model
=deployed_kdu
.get("kdu-model"),
6695 kdu_replica_count
= instance_num
- kdu_delta
.get(
6696 "number-of-instances", 1
6699 if kdu_replica_count
< min_instance_count
< instance_num
:
6700 kdu_replica_count
= min_instance_count
6701 if kdu_replica_count
< min_instance_count
:
6703 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6704 "scaling-group-descriptor '{}'".format(
6705 instance_num
, scaling_group
6709 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6710 vca_scaling_info
.append(
6712 "osm_kdu_id": kdu_name
,
6713 "member-vnf-index": vnf_index
,
6715 "kdu_index": instance_num
- x
- 1,
6718 scaling_info
["kdu-delete"][kdu_name
].append(
6720 "member-vnf-index": vnf_index
,
6722 "k8s-cluster-type": k8s_cluster_type
,
6723 "resource-name": resource_name
,
6724 "scale": kdu_replica_count
,
6728 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6729 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6730 if scaling_info
["scaling_direction"] == "IN":
6731 for vdur
in reversed(db_vnfr
["vdur"]):
6732 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6733 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6734 scaling_info
["vdu"].append(
6736 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6737 "vdu_id": vdur
["vdu-id-ref"],
6741 for interface
in vdur
["interfaces"]:
6742 scaling_info
["vdu"][-1]["interface"].append(
6744 "name": interface
["name"],
6745 "ip_address": interface
["ip-address"],
6746 "mac_address": interface
.get("mac-address"),
6749 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6752 step
= "Executing pre-scale vnf-config-primitive"
6753 if scaling_descriptor
.get("scaling-config-action"):
6754 for scaling_config_action
in scaling_descriptor
[
6755 "scaling-config-action"
6758 scaling_config_action
.get("trigger") == "pre-scale-in"
6759 and scaling_type
== "SCALE_IN"
6761 scaling_config_action
.get("trigger") == "pre-scale-out"
6762 and scaling_type
== "SCALE_OUT"
6764 vnf_config_primitive
= scaling_config_action
[
6765 "vnf-config-primitive-name-ref"
6767 step
= db_nslcmop_update
[
6769 ] = "executing pre-scale scaling-config-action '{}'".format(
6770 vnf_config_primitive
6773 # look for primitive
6774 for config_primitive
in (
6775 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6776 ).get("config-primitive", ()):
6777 if config_primitive
["name"] == vnf_config_primitive
:
6781 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6782 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6783 "primitive".format(scaling_group
, vnf_config_primitive
)
6786 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6787 if db_vnfr
.get("additionalParamsForVnf"):
6788 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6790 scale_process
= "VCA"
6791 db_nsr_update
["config-status"] = "configuring pre-scaling"
6792 primitive_params
= self
._map
_primitive
_params
(
6793 config_primitive
, {}, vnfr_params
6796 # Pre-scale retry check: Check if this sub-operation has been executed before
6797 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6800 vnf_config_primitive
,
6804 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6805 # Skip sub-operation
6806 result
= "COMPLETED"
6807 result_detail
= "Done"
6810 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6811 vnf_config_primitive
, result
, result_detail
6815 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6816 # New sub-operation: Get index of this sub-operation
6818 len(db_nslcmop
.get("_admin", {}).get("operations"))
6823 + "vnf_config_primitive={} New sub-operation".format(
6824 vnf_config_primitive
6828 # retry: Get registered params for this existing sub-operation
6829 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6832 vnf_index
= op
.get("member_vnf_index")
6833 vnf_config_primitive
= op
.get("primitive")
6834 primitive_params
= op
.get("primitive_params")
6837 + "vnf_config_primitive={} Sub-operation retry".format(
6838 vnf_config_primitive
6841 # Execute the primitive, either with new (first-time) or registered (reintent) args
6842 ee_descriptor_id
= config_primitive
.get(
6843 "execution-environment-ref"
6845 primitive_name
= config_primitive
.get(
6846 "execution-environment-primitive", vnf_config_primitive
6848 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6849 nsr_deployed
["VCA"],
6850 member_vnf_index
=vnf_index
,
6852 vdu_count_index
=None,
6853 ee_descriptor_id
=ee_descriptor_id
,
6855 result
, result_detail
= await self
._ns
_execute
_primitive
(
6864 + "vnf_config_primitive={} Done with result {} {}".format(
6865 vnf_config_primitive
, result
, result_detail
6868 # Update operationState = COMPLETED | FAILED
6869 self
._update
_suboperation
_status
(
6870 db_nslcmop
, op_index
, result
, result_detail
6873 if result
== "FAILED":
6874 raise LcmException(result_detail
)
6875 db_nsr_update
["config-status"] = old_config_status
6876 scale_process
= None
6880 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6883 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6886 # SCALE-IN VCA - BEGIN
6887 if vca_scaling_info
:
6888 step
= db_nslcmop_update
[
6890 ] = "Deleting the execution environments"
6891 scale_process
= "VCA"
6892 for vca_info
in vca_scaling_info
:
6893 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6894 member_vnf_index
= str(vca_info
["member-vnf-index"])
6896 logging_text
+ "vdu info: {}".format(vca_info
)
6898 if vca_info
.get("osm_vdu_id"):
6899 vdu_id
= vca_info
["osm_vdu_id"]
6900 vdu_index
= int(vca_info
["vdu_index"])
6903 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6904 member_vnf_index
, vdu_id
, vdu_index
6906 stage
[2] = step
= "Scaling in VCA"
6907 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6908 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6909 config_update
= db_nsr
["configurationStatus"]
6910 for vca_index
, vca
in enumerate(vca_update
):
6912 (vca
or vca
.get("ee_id"))
6913 and vca
["member-vnf-index"] == member_vnf_index
6914 and vca
["vdu_count_index"] == vdu_index
6916 if vca
.get("vdu_id"):
6917 config_descriptor
= get_configuration(
6918 db_vnfd
, vca
.get("vdu_id")
6920 elif vca
.get("kdu_name"):
6921 config_descriptor
= get_configuration(
6922 db_vnfd
, vca
.get("kdu_name")
6925 config_descriptor
= get_configuration(
6926 db_vnfd
, db_vnfd
["id"]
6928 operation_params
= (
6929 db_nslcmop
.get("operationParams") or {}
6931 exec_terminate_primitives
= not operation_params
.get(
6932 "skip_terminate_primitives"
6933 ) and vca
.get("needed_terminate")
6934 task
= asyncio
.ensure_future(
6943 exec_primitives
=exec_terminate_primitives
,
6947 timeout
=self
.timeout
.charm_delete
,
6950 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6953 del vca_update
[vca_index
]
6954 del config_update
[vca_index
]
6955 # wait for pending tasks of terminate primitives
6959 + "Waiting for tasks {}".format(
6960 list(tasks_dict_info
.keys())
6963 error_list
= await self
._wait
_for
_tasks
(
6967 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6972 tasks_dict_info
.clear()
6974 raise LcmException("; ".join(error_list
))
6976 db_vca_and_config_update
= {
6977 "_admin.deployed.VCA": vca_update
,
6978 "configurationStatus": config_update
,
6981 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6983 scale_process
= None
6984 # SCALE-IN VCA - END
6987 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6988 scale_process
= "RO"
6989 if self
.ro_config
.ng
:
6990 await self
._scale
_ng
_ro
(
6991 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6993 scaling_info
.pop("vdu-create", None)
6994 scaling_info
.pop("vdu-delete", None)
6996 scale_process
= None
7000 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
7001 scale_process
= "KDU"
7002 await self
._scale
_kdu
(
7003 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7005 scaling_info
.pop("kdu-create", None)
7006 scaling_info
.pop("kdu-delete", None)
7008 scale_process
= None
7012 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7014 # SCALE-UP VCA - BEGIN
7015 if vca_scaling_info
:
7016 step
= db_nslcmop_update
[
7018 ] = "Creating new execution environments"
7019 scale_process
= "VCA"
7020 for vca_info
in vca_scaling_info
:
7021 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7022 member_vnf_index
= str(vca_info
["member-vnf-index"])
7024 logging_text
+ "vdu info: {}".format(vca_info
)
7026 vnfd_id
= db_vnfr
["vnfd-ref"]
7027 if vca_info
.get("osm_vdu_id"):
7028 vdu_index
= int(vca_info
["vdu_index"])
7029 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7030 if db_vnfr
.get("additionalParamsForVnf"):
7031 deploy_params
.update(
7033 db_vnfr
["additionalParamsForVnf"].copy()
7036 descriptor_config
= get_configuration(
7037 db_vnfd
, db_vnfd
["id"]
7039 if descriptor_config
:
7045 logging_text
=logging_text
7046 + "member_vnf_index={} ".format(member_vnf_index
),
7049 nslcmop_id
=nslcmop_id
,
7055 kdu_index
=kdu_index
,
7056 member_vnf_index
=member_vnf_index
,
7057 vdu_index
=vdu_index
,
7059 deploy_params
=deploy_params
,
7060 descriptor_config
=descriptor_config
,
7061 base_folder
=base_folder
,
7062 task_instantiation_info
=tasks_dict_info
,
7065 vdu_id
= vca_info
["osm_vdu_id"]
7066 vdur
= find_in_list(
7067 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7069 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7070 if vdur
.get("additionalParams"):
7071 deploy_params_vdu
= parse_yaml_strings(
7072 vdur
["additionalParams"]
7075 deploy_params_vdu
= deploy_params
7076 deploy_params_vdu
["OSM"] = get_osm_params(
7077 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7079 if descriptor_config
:
7085 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7086 member_vnf_index
, vdu_id
, vdu_index
7088 stage
[2] = step
= "Scaling out VCA"
7089 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7091 logging_text
=logging_text
7092 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7093 member_vnf_index
, vdu_id
, vdu_index
7097 nslcmop_id
=nslcmop_id
,
7103 member_vnf_index
=member_vnf_index
,
7104 vdu_index
=vdu_index
,
7105 kdu_index
=kdu_index
,
7107 deploy_params
=deploy_params_vdu
,
7108 descriptor_config
=descriptor_config
,
7109 base_folder
=base_folder
,
7110 task_instantiation_info
=tasks_dict_info
,
7113 # SCALE-UP VCA - END
7114 scale_process
= None
7117 # execute primitive service POST-SCALING
7118 step
= "Executing post-scale vnf-config-primitive"
7119 if scaling_descriptor
.get("scaling-config-action"):
7120 for scaling_config_action
in scaling_descriptor
[
7121 "scaling-config-action"
7124 scaling_config_action
.get("trigger") == "post-scale-in"
7125 and scaling_type
== "SCALE_IN"
7127 scaling_config_action
.get("trigger") == "post-scale-out"
7128 and scaling_type
== "SCALE_OUT"
7130 vnf_config_primitive
= scaling_config_action
[
7131 "vnf-config-primitive-name-ref"
7133 step
= db_nslcmop_update
[
7135 ] = "executing post-scale scaling-config-action '{}'".format(
7136 vnf_config_primitive
7139 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7140 if db_vnfr
.get("additionalParamsForVnf"):
7141 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7143 # look for primitive
7144 for config_primitive
in (
7145 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7146 ).get("config-primitive", ()):
7147 if config_primitive
["name"] == vnf_config_primitive
:
7151 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7152 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7153 "config-primitive".format(
7154 scaling_group
, vnf_config_primitive
7157 scale_process
= "VCA"
7158 db_nsr_update
["config-status"] = "configuring post-scaling"
7159 primitive_params
= self
._map
_primitive
_params
(
7160 config_primitive
, {}, vnfr_params
7163 # Post-scale retry check: Check if this sub-operation has been executed before
7164 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7167 vnf_config_primitive
,
7171 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7172 # Skip sub-operation
7173 result
= "COMPLETED"
7174 result_detail
= "Done"
7177 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7178 vnf_config_primitive
, result
, result_detail
7182 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7183 # New sub-operation: Get index of this sub-operation
7185 len(db_nslcmop
.get("_admin", {}).get("operations"))
7190 + "vnf_config_primitive={} New sub-operation".format(
7191 vnf_config_primitive
7195 # retry: Get registered params for this existing sub-operation
7196 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7199 vnf_index
= op
.get("member_vnf_index")
7200 vnf_config_primitive
= op
.get("primitive")
7201 primitive_params
= op
.get("primitive_params")
7204 + "vnf_config_primitive={} Sub-operation retry".format(
7205 vnf_config_primitive
7208 # Execute the primitive, either with new (first-time) or registered (reintent) args
7209 ee_descriptor_id
= config_primitive
.get(
7210 "execution-environment-ref"
7212 primitive_name
= config_primitive
.get(
7213 "execution-environment-primitive", vnf_config_primitive
7215 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7216 nsr_deployed
["VCA"],
7217 member_vnf_index
=vnf_index
,
7219 vdu_count_index
=None,
7220 ee_descriptor_id
=ee_descriptor_id
,
7222 result
, result_detail
= await self
._ns
_execute
_primitive
(
7231 + "vnf_config_primitive={} Done with result {} {}".format(
7232 vnf_config_primitive
, result
, result_detail
7235 # Update operationState = COMPLETED | FAILED
7236 self
._update
_suboperation
_status
(
7237 db_nslcmop
, op_index
, result
, result_detail
7240 if result
== "FAILED":
7241 raise LcmException(result_detail
)
7242 db_nsr_update
["config-status"] = old_config_status
7243 scale_process
= None
7248 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7249 db_nsr_update
["operational-status"] = (
7251 if old_operational_status
== "failed"
7252 else old_operational_status
7254 db_nsr_update
["config-status"] = old_config_status
7257 ROclient
.ROClientException
,
7262 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7264 except asyncio
.CancelledError
:
7266 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7268 exc
= "Operation was cancelled"
7269 except Exception as e
:
7270 exc
= traceback
.format_exc()
7271 self
.logger
.critical(
7272 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7276 self
._write
_ns
_status
(
7279 current_operation
="IDLE",
7280 current_operation_id
=None,
7283 stage
[1] = "Waiting for instantiate pending tasks."
7284 self
.logger
.debug(logging_text
+ stage
[1])
7285 exc
= await self
._wait
_for
_tasks
(
7288 self
.timeout
.ns_deploy
,
7296 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7297 nslcmop_operation_state
= "FAILED"
7299 db_nsr_update
["operational-status"] = old_operational_status
7300 db_nsr_update
["config-status"] = old_config_status
7301 db_nsr_update
["detailed-status"] = ""
7303 if "VCA" in scale_process
:
7304 db_nsr_update
["config-status"] = "failed"
7305 if "RO" in scale_process
:
7306 db_nsr_update
["operational-status"] = "failed"
7309 ] = "FAILED scaling nslcmop={} {}: {}".format(
7310 nslcmop_id
, step
, exc
7313 error_description_nslcmop
= None
7314 nslcmop_operation_state
= "COMPLETED"
7315 db_nslcmop_update
["detailed-status"] = "Done"
7317 self
._write
_op
_status
(
7320 error_message
=error_description_nslcmop
,
7321 operation_state
=nslcmop_operation_state
,
7322 other_update
=db_nslcmop_update
,
7325 self
._write
_ns
_status
(
7328 current_operation
="IDLE",
7329 current_operation_id
=None,
7330 other_update
=db_nsr_update
,
7333 if nslcmop_operation_state
:
7337 "nslcmop_id": nslcmop_id
,
7338 "operationState": nslcmop_operation_state
,
7340 await self
.msg
.aiowrite("ns", "scaled", msg
)
7341 except Exception as e
:
7343 logging_text
+ "kafka_write notification Exception {}".format(e
)
7345 self
.logger
.debug(logging_text
+ "Exit")
7346 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7348 async def _scale_kdu(
7349 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7351 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7352 for kdu_name
in _scaling_info
:
7353 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7354 deployed_kdu
, index
= get_deployed_kdu(
7355 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7357 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7358 kdu_instance
= deployed_kdu
["kdu-instance"]
7359 kdu_model
= deployed_kdu
.get("kdu-model")
7360 scale
= int(kdu_scaling_info
["scale"])
7361 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7364 "collection": "nsrs",
7365 "filter": {"_id": nsr_id
},
7366 "path": "_admin.deployed.K8s.{}".format(index
),
7369 step
= "scaling application {}".format(
7370 kdu_scaling_info
["resource-name"]
7372 self
.logger
.debug(logging_text
+ step
)
7374 if kdu_scaling_info
["type"] == "delete":
7375 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7378 and kdu_config
.get("terminate-config-primitive")
7379 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7381 terminate_config_primitive_list
= kdu_config
.get(
7382 "terminate-config-primitive"
7384 terminate_config_primitive_list
.sort(
7385 key
=lambda val
: int(val
["seq"])
7389 terminate_config_primitive
7390 ) in terminate_config_primitive_list
:
7391 primitive_params_
= self
._map
_primitive
_params
(
7392 terminate_config_primitive
, {}, {}
7394 step
= "execute terminate config primitive"
7395 self
.logger
.debug(logging_text
+ step
)
7396 await asyncio
.wait_for(
7397 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7398 cluster_uuid
=cluster_uuid
,
7399 kdu_instance
=kdu_instance
,
7400 primitive_name
=terminate_config_primitive
["name"],
7401 params
=primitive_params_
,
7403 total_timeout
=self
.timeout
.primitive
,
7406 timeout
=self
.timeout
.primitive
7407 * self
.timeout
.primitive_outer_factor
,
7410 await asyncio
.wait_for(
7411 self
.k8scluster_map
[k8s_cluster_type
].scale(
7412 kdu_instance
=kdu_instance
,
7414 resource_name
=kdu_scaling_info
["resource-name"],
7415 total_timeout
=self
.timeout
.scale_on_error
,
7417 cluster_uuid
=cluster_uuid
,
7418 kdu_model
=kdu_model
,
7422 timeout
=self
.timeout
.scale_on_error
7423 * self
.timeout
.scale_on_error_outer_factor
,
7426 if kdu_scaling_info
["type"] == "create":
7427 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7430 and kdu_config
.get("initial-config-primitive")
7431 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7433 initial_config_primitive_list
= kdu_config
.get(
7434 "initial-config-primitive"
7436 initial_config_primitive_list
.sort(
7437 key
=lambda val
: int(val
["seq"])
7440 for initial_config_primitive
in initial_config_primitive_list
:
7441 primitive_params_
= self
._map
_primitive
_params
(
7442 initial_config_primitive
, {}, {}
7444 step
= "execute initial config primitive"
7445 self
.logger
.debug(logging_text
+ step
)
7446 await asyncio
.wait_for(
7447 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7448 cluster_uuid
=cluster_uuid
,
7449 kdu_instance
=kdu_instance
,
7450 primitive_name
=initial_config_primitive
["name"],
7451 params
=primitive_params_
,
7458 async def _scale_ng_ro(
7459 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7461 nsr_id
= db_nslcmop
["nsInstanceId"]
7462 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7465 # read from db: vnfd's for every vnf
7468 # for each vnf in ns, read vnfd
7469 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7470 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7471 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7472 # if we haven't this vnfd, read it from db
7473 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7475 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7476 db_vnfds
.append(vnfd
)
7477 n2vc_key
= self
.n2vc
.get_public_key()
7478 n2vc_key_list
= [n2vc_key
]
7481 vdu_scaling_info
.get("vdu-create"),
7482 vdu_scaling_info
.get("vdu-delete"),
7485 # db_vnfr has been updated, update db_vnfrs to use it
7486 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7487 await self
._instantiate
_ng
_ro
(
7497 start_deploy
=time(),
7498 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7500 if vdu_scaling_info
.get("vdu-delete"):
7502 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7505 async def extract_prometheus_scrape_jobs(
7509 ee_config_descriptor
: dict,
7514 vnf_member_index
: str = "",
7516 vdu_index
: int = None,
7518 kdu_index
: int = None,
7520 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7521 This method will wait until the corresponding VDU or KDU is fully instantiated
7524 ee_id (str): Execution Environment ID
7525 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7526 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7527 vnfr_id (str): VNFR ID where this EE applies
7528 nsr_id (str): NSR ID where this EE applies
7529 target_ip (str): VDU/KDU instance IP address
7530 element_type (str): NS or VNF or VDU or KDU
7531 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7532 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7533 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7534 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7535 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7538 LcmException: When the VDU or KDU instance was not found in an hour
7541 _type_: Prometheus jobs
7543 # default the vdur and kdur names to an empty string, to avoid any later
7544 # problem with Prometheus when the element type is not VDU or KDU
7548 # look if exist a file called 'prometheus*.j2' and
7549 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7553 for f
in artifact_content
7554 if f
.startswith("prometheus") and f
.endswith(".j2")
7560 self
.logger
.debug("Artifact path{}".format(artifact_path
))
7561 self
.logger
.debug("job file{}".format(job_file
))
7562 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7565 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7566 if element_type
in ("VDU", "KDU"):
7567 for _
in range(360):
7568 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7569 if vdu_id
and vdu_index
is not None:
7573 for x
in get_iterable(db_vnfr
, "vdur")
7575 x
.get("vdu-id-ref") == vdu_id
7576 and x
.get("count-index") == vdu_index
7581 if vdur
.get("name"):
7582 vdur_name
= vdur
.get("name")
7584 if kdu_name
and kdu_index
is not None:
7588 for x
in get_iterable(db_vnfr
, "kdur")
7590 x
.get("kdu-name") == kdu_name
7591 and x
.get("count-index") == kdu_index
7596 if kdur
.get("name"):
7597 kdur_name
= kdur
.get("name")
7600 await asyncio
.sleep(10)
7602 if vdu_id
and vdu_index
is not None:
7604 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7606 if kdu_name
and kdu_index
is not None:
7608 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7612 if ee_id
is not None:
7613 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7614 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7616 vnfr_id
= vnfr_id
.replace("-", "")
7618 "JOB_NAME": vnfr_id
,
7619 "TARGET_IP": target_ip
,
7620 "EXPORTER_POD_IP": host_name
,
7621 "EXPORTER_POD_PORT": host_port
,
7623 "VNF_MEMBER_INDEX": vnf_member_index
,
7624 "VDUR_NAME": vdur_name
,
7625 "KDUR_NAME": kdur_name
,
7626 "ELEMENT_TYPE": element_type
,
7629 metric_path
= ee_config_descriptor
["metric-path"]
7630 target_port
= ee_config_descriptor
["metric-port"]
7631 vnfr_id
= vnfr_id
.replace("-", "")
7633 "JOB_NAME": vnfr_id
,
7634 "TARGET_IP": target_ip
,
7635 "TARGET_PORT": target_port
,
7636 "METRIC_PATH": metric_path
,
7639 job_list
= parse_job(job_data
, variables
)
7640 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7641 for job
in job_list
:
7643 not isinstance(job
.get("job_name"), str)
7644 or vnfr_id
not in job
["job_name"]
7646 job
["job_name"] = vnfr_id
+ "_" + str(SystemRandom().randint(1, 10000))
7647 job
["nsr_id"] = nsr_id
7648 job
["vnfr_id"] = vnfr_id
7651 async def rebuild_start_stop(
7652 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7654 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7655 self
.logger
.info(logging_text
+ "Enter")
7656 stage
= ["Preparing the environment", ""]
7657 # database nsrs record
7661 # in case of error, indicates what part of scale was failed to put nsr at error status
7662 start_deploy
= time()
7664 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7665 vim_account_id
= db_vnfr
.get("vim-account-id")
7666 vim_info_key
= "vim:" + vim_account_id
7667 vdu_id
= additional_param
["vdu_id"]
7668 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7669 vdur
= find_in_list(
7670 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7673 vdu_vim_name
= vdur
["name"]
7674 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7675 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7677 raise LcmException("Target vdu is not found")
7678 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7679 # wait for any previous tasks in process
7680 stage
[1] = "Waiting for previous operations to terminate"
7681 self
.logger
.info(stage
[1])
7682 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7684 stage
[1] = "Reading from database."
7685 self
.logger
.info(stage
[1])
7686 self
._write
_ns
_status
(
7689 current_operation
=operation_type
.upper(),
7690 current_operation_id
=nslcmop_id
,
7692 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7695 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7696 db_nsr_update
["operational-status"] = operation_type
7697 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7701 "vim_vm_id": vim_vm_id
,
7703 "vdu_index": additional_param
["count-index"],
7704 "vdu_id": vdur
["id"],
7705 "target_vim": target_vim
,
7706 "vim_account_id": vim_account_id
,
7709 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7710 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7711 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7712 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7713 self
.logger
.info("response from RO: {}".format(result_dict
))
7714 action_id
= result_dict
["action_id"]
7715 await self
._wait
_ng
_ro
(
7720 self
.timeout
.operate
,
7722 "start_stop_rebuild",
7724 return "COMPLETED", "Done"
7725 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7726 self
.logger
.error("Exit Exception {}".format(e
))
7728 except asyncio
.CancelledError
:
7729 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7730 exc
= "Operation was cancelled"
7731 except Exception as e
:
7732 exc
= traceback
.format_exc()
7733 self
.logger
.critical(
7734 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7736 return "FAILED", "Error in operate VNF {}".format(exc
)
7738 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7740 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7742 :param: vim_account_id: VIM Account ID
7744 :return: (cloud_name, cloud_credential)
7746 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7747 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7749 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7751 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7753 :param: vim_account_id: VIM Account ID
7755 :return: (cloud_name, cloud_credential)
7757 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7758 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7760 async def migrate(self
, nsr_id
, nslcmop_id
):
7762 Migrate VNFs and VDUs instances in a NS
7764 :param: nsr_id: NS Instance ID
7765 :param: nslcmop_id: nslcmop ID of migrate
7768 # Try to lock HA task here
7769 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7770 if not task_is_locked_by_me
:
7772 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7773 self
.logger
.debug(logging_text
+ "Enter")
7774 # get all needed from database
7776 db_nslcmop_update
= {}
7777 nslcmop_operation_state
= None
7781 # in case of error, indicates what part of scale was failed to put nsr at error status
7782 start_deploy
= time()
7785 # wait for any previous tasks in process
7786 step
= "Waiting for previous operations to terminate"
7787 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7789 self
._write
_ns
_status
(
7792 current_operation
="MIGRATING",
7793 current_operation_id
=nslcmop_id
,
7795 step
= "Getting nslcmop from database"
7797 step
+ " after having waited for previous tasks to be completed"
7799 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7800 migrate_params
= db_nslcmop
.get("operationParams")
7803 target
.update(migrate_params
)
7804 desc
= await self
.RO
.migrate(nsr_id
, target
)
7805 self
.logger
.debug("RO return > {}".format(desc
))
7806 action_id
= desc
["action_id"]
7807 await self
._wait
_ng
_ro
(
7812 self
.timeout
.migrate
,
7813 operation
="migrate",
7815 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7816 self
.logger
.error("Exit Exception {}".format(e
))
7818 except asyncio
.CancelledError
:
7819 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7820 exc
= "Operation was cancelled"
7821 except Exception as e
:
7822 exc
= traceback
.format_exc()
7823 self
.logger
.critical(
7824 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7827 self
._write
_ns
_status
(
7830 current_operation
="IDLE",
7831 current_operation_id
=None,
7834 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7835 nslcmop_operation_state
= "FAILED"
7837 nslcmop_operation_state
= "COMPLETED"
7838 db_nslcmop_update
["detailed-status"] = "Done"
7839 db_nsr_update
["detailed-status"] = "Done"
7841 self
._write
_op
_status
(
7845 operation_state
=nslcmop_operation_state
,
7846 other_update
=db_nslcmop_update
,
7848 if nslcmop_operation_state
:
7852 "nslcmop_id": nslcmop_id
,
7853 "operationState": nslcmop_operation_state
,
7855 await self
.msg
.aiowrite("ns", "migrated", msg
)
7856 except Exception as e
:
7858 logging_text
+ "kafka_write notification Exception {}".format(e
)
7860 self
.logger
.debug(logging_text
+ "Exit")
7861 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7863 async def heal(self
, nsr_id
, nslcmop_id
):
7867 :param nsr_id: ns instance to heal
7868 :param nslcmop_id: operation to run
7872 # Try to lock HA task here
7873 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7874 if not task_is_locked_by_me
:
7877 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7878 stage
= ["", "", ""]
7879 tasks_dict_info
= {}
7880 # ^ stage, step, VIM progress
7881 self
.logger
.debug(logging_text
+ "Enter")
7882 # get all needed from database
7884 db_nslcmop_update
= {}
7886 db_vnfrs
= {} # vnf's info indexed by _id
7888 old_operational_status
= ""
7889 old_config_status
= ""
7892 # wait for any previous tasks in process
7893 step
= "Waiting for previous operations to terminate"
7894 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7895 self
._write
_ns
_status
(
7898 current_operation
="HEALING",
7899 current_operation_id
=nslcmop_id
,
7902 step
= "Getting nslcmop from database"
7904 step
+ " after having waited for previous tasks to be completed"
7906 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7908 step
= "Getting nsr from database"
7909 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7910 old_operational_status
= db_nsr
["operational-status"]
7911 old_config_status
= db_nsr
["config-status"]
7914 "_admin.deployed.RO.operational-status": "healing",
7916 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7918 step
= "Sending heal order to VIM"
7920 logging_text
=logging_text
,
7922 db_nslcmop
=db_nslcmop
,
7927 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7928 self
.logger
.debug(logging_text
+ stage
[1])
7929 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7930 self
.fs
.sync(db_nsr
["nsd-id"])
7932 # read from db: vnfr's of this ns
7933 step
= "Getting vnfrs from db"
7934 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7935 for vnfr
in db_vnfrs_list
:
7936 db_vnfrs
[vnfr
["_id"]] = vnfr
7937 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7939 # Check for each target VNF
7940 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7941 for target_vnf
in target_list
:
7942 # Find this VNF in the list from DB
7943 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7945 db_vnfr
= db_vnfrs
[vnfr_id
]
7946 vnfd_id
= db_vnfr
.get("vnfd-id")
7947 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7948 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7949 base_folder
= vnfd
["_admin"]["storage"]
7954 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7955 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7957 # Check each target VDU and deploy N2VC
7958 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7961 if not target_vdu_list
:
7962 # Codigo nuevo para crear diccionario
7963 target_vdu_list
= []
7964 for existing_vdu
in db_vnfr
.get("vdur"):
7965 vdu_name
= existing_vdu
.get("vdu-name", None)
7966 vdu_index
= existing_vdu
.get("count-index", 0)
7967 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7970 vdu_to_be_healed
= {
7972 "count-index": vdu_index
,
7973 "run-day1": vdu_run_day1
,
7975 target_vdu_list
.append(vdu_to_be_healed
)
7976 for target_vdu
in target_vdu_list
:
7977 deploy_params_vdu
= target_vdu
7978 # Set run-day1 vnf level value if not vdu level value exists
7979 if not deploy_params_vdu
.get("run-day1") and target_vnf
.get(
7980 "additionalParams", {}
7982 deploy_params_vdu
["run-day1"] = target_vnf
[
7985 vdu_name
= target_vdu
.get("vdu-id", None)
7986 # TODO: Get vdu_id from vdud.
7988 # For multi instance VDU count-index is mandatory
7989 # For single session VDU count-indes is 0
7990 vdu_index
= target_vdu
.get("count-index", 0)
7992 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7993 stage
[1] = "Deploying Execution Environments."
7994 self
.logger
.debug(logging_text
+ stage
[1])
7996 # VNF Level charm. Normal case when proxy charms.
7997 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7998 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7999 if descriptor_config
:
8000 # Continue if healed machine is management machine
8001 vnf_ip_address
= db_vnfr
.get("ip-address")
8002 target_instance
= None
8003 for instance
in db_vnfr
.get("vdur", None):
8005 instance
["vdu-name"] == vdu_name
8006 and instance
["count-index"] == vdu_index
8008 target_instance
= instance
8010 if vnf_ip_address
== target_instance
.get("ip-address"):
8012 logging_text
=logging_text
8013 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8014 member_vnf_index
, vdu_name
, vdu_index
8018 nslcmop_id
=nslcmop_id
,
8024 member_vnf_index
=member_vnf_index
,
8027 deploy_params
=deploy_params_vdu
,
8028 descriptor_config
=descriptor_config
,
8029 base_folder
=base_folder
,
8030 task_instantiation_info
=tasks_dict_info
,
8034 # VDU Level charm. Normal case with native charms.
8035 descriptor_config
= get_configuration(vnfd
, vdu_name
)
8036 if descriptor_config
:
8038 logging_text
=logging_text
8039 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8040 member_vnf_index
, vdu_name
, vdu_index
8044 nslcmop_id
=nslcmop_id
,
8050 member_vnf_index
=member_vnf_index
,
8051 vdu_index
=vdu_index
,
8053 deploy_params
=deploy_params_vdu
,
8054 descriptor_config
=descriptor_config
,
8055 base_folder
=base_folder
,
8056 task_instantiation_info
=tasks_dict_info
,
8061 ROclient
.ROClientException
,
8066 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
8068 except asyncio
.CancelledError
:
8070 logging_text
+ "Cancelled Exception while '{}'".format(step
)
8072 exc
= "Operation was cancelled"
8073 except Exception as e
:
8074 exc
= traceback
.format_exc()
8075 self
.logger
.critical(
8076 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
8081 stage
[1] = "Waiting for healing pending tasks."
8082 self
.logger
.debug(logging_text
+ stage
[1])
8083 exc
= await self
._wait
_for
_tasks
(
8086 self
.timeout
.ns_deploy
,
8094 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
8095 nslcmop_operation_state
= "FAILED"
8097 db_nsr_update
["operational-status"] = old_operational_status
8098 db_nsr_update
["config-status"] = old_config_status
8101 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
8102 for task
, task_name
in tasks_dict_info
.items():
8103 if not task
.done() or task
.cancelled() or task
.exception():
8104 if task_name
.startswith(self
.task_name_deploy_vca
):
8105 # A N2VC task is pending
8106 db_nsr_update
["config-status"] = "failed"
8108 # RO task is pending
8109 db_nsr_update
["operational-status"] = "failed"
8111 error_description_nslcmop
= None
8112 nslcmop_operation_state
= "COMPLETED"
8113 db_nslcmop_update
["detailed-status"] = "Done"
8114 db_nsr_update
["detailed-status"] = "Done"
8115 db_nsr_update
["operational-status"] = "running"
8116 db_nsr_update
["config-status"] = "configured"
8118 self
._write
_op
_status
(
8121 error_message
=error_description_nslcmop
,
8122 operation_state
=nslcmop_operation_state
,
8123 other_update
=db_nslcmop_update
,
8126 self
._write
_ns
_status
(
8129 current_operation
="IDLE",
8130 current_operation_id
=None,
8131 other_update
=db_nsr_update
,
8134 if nslcmop_operation_state
:
8138 "nslcmop_id": nslcmop_id
,
8139 "operationState": nslcmop_operation_state
,
8141 await self
.msg
.aiowrite("ns", "healed", msg
)
8142 except Exception as e
:
8144 logging_text
+ "kafka_write notification Exception {}".format(e
)
8146 self
.logger
.debug(logging_text
+ "Exit")
8147 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8158 :param logging_text: preffix text to use at logging
8159 :param nsr_id: nsr identity
8160 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8161 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8162 :return: None or exception
8165 def get_vim_account(vim_account_id
):
8167 if vim_account_id
in db_vims
:
8168 return db_vims
[vim_account_id
]
8169 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8170 db_vims
[vim_account_id
] = db_vim
8175 ns_params
= db_nslcmop
.get("operationParams")
8176 if ns_params
and ns_params
.get("timeout_ns_heal"):
8177 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8179 timeout_ns_heal
= self
.timeout
.ns_heal
8183 nslcmop_id
= db_nslcmop
["_id"]
8185 "action_id": nslcmop_id
,
8187 self
.logger
.warning(
8188 "db_nslcmop={} and timeout_ns_heal={}".format(
8189 db_nslcmop
, timeout_ns_heal
8192 target
.update(db_nslcmop
.get("operationParams", {}))
8194 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8195 desc
= await self
.RO
.recreate(nsr_id
, target
)
8196 self
.logger
.debug("RO return > {}".format(desc
))
8197 action_id
= desc
["action_id"]
8198 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8199 await self
._wait
_ng
_ro
(
8206 operation
="healing",
8211 "_admin.deployed.RO.operational-status": "running",
8212 "detailed-status": " ".join(stage
),
8214 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8215 self
._write
_op
_status
(nslcmop_id
, stage
)
8217 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8220 except Exception as e
:
8221 stage
[2] = "ERROR healing at VIM"
8222 # self.set_vnfr_at_error(db_vnfrs, str(e))
8224 "Error healing at VIM {}".format(e
),
8225 exc_info
=not isinstance(
8228 ROclient
.ROClientException
,
8254 task_instantiation_info
,
8257 # launch instantiate_N2VC in a asyncio task and register task object
8258 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8259 # if not found, create one entry and update database
8260 # fill db_nsr._admin.deployed.VCA.<index>
8263 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8267 get_charm_name
= False
8268 if "execution-environment-list" in descriptor_config
:
8269 ee_list
= descriptor_config
.get("execution-environment-list", [])
8270 elif "juju" in descriptor_config
:
8271 ee_list
= [descriptor_config
] # ns charms
8272 if "execution-environment-list" not in descriptor_config
:
8273 # charm name is only required for ns charms
8274 get_charm_name
= True
8275 else: # other types as script are not supported
8278 for ee_item
in ee_list
:
8281 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8282 ee_item
.get("juju"), ee_item
.get("helm-chart")
8285 ee_descriptor_id
= ee_item
.get("id")
8286 if ee_item
.get("juju"):
8287 vca_name
= ee_item
["juju"].get("charm")
8289 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8292 if ee_item
["juju"].get("charm") is not None
8295 if ee_item
["juju"].get("cloud") == "k8s":
8296 vca_type
= "k8s_proxy_charm"
8297 elif ee_item
["juju"].get("proxy") is False:
8298 vca_type
= "native_charm"
8299 elif ee_item
.get("helm-chart"):
8300 vca_name
= ee_item
["helm-chart"]
8301 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8304 vca_type
= "helm-v3"
8307 logging_text
+ "skipping non juju neither charm configuration"
8312 for vca_index
, vca_deployed
in enumerate(
8313 db_nsr
["_admin"]["deployed"]["VCA"]
8315 if not vca_deployed
:
8318 vca_deployed
.get("member-vnf-index") == member_vnf_index
8319 and vca_deployed
.get("vdu_id") == vdu_id
8320 and vca_deployed
.get("kdu_name") == kdu_name
8321 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8322 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8326 # not found, create one.
8328 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8331 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8333 target
+= "/kdu/{}".format(kdu_name
)
8335 "target_element": target
,
8336 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8337 "member-vnf-index": member_vnf_index
,
8339 "kdu_name": kdu_name
,
8340 "vdu_count_index": vdu_index
,
8341 "operational-status": "init", # TODO revise
8342 "detailed-status": "", # TODO revise
8343 "step": "initial-deploy", # TODO revise
8345 "vdu_name": vdu_name
,
8347 "ee_descriptor_id": ee_descriptor_id
,
8348 "charm_name": charm_name
,
8352 # create VCA and configurationStatus in db
8354 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8355 "configurationStatus.{}".format(vca_index
): dict(),
8357 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8359 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8361 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8362 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8363 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8366 task_n2vc
= asyncio
.ensure_future(
8368 logging_text
=logging_text
,
8369 vca_index
=vca_index
,
8375 vdu_index
=vdu_index
,
8376 deploy_params
=deploy_params
,
8377 config_descriptor
=descriptor_config
,
8378 base_folder
=base_folder
,
8379 nslcmop_id
=nslcmop_id
,
8383 ee_config_descriptor
=ee_item
,
8386 self
.lcm_tasks
.register(
8390 "instantiate_N2VC-{}".format(vca_index
),
8393 task_instantiation_info
[
8395 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8396 member_vnf_index
or "", vdu_id
or ""
# NOTE(review): heal_N2VC -- runs Day-1 (re)configuration of a VCA execution
# environment after an NS heal operation. The text below is a CORRUPTED
# EXTRACTION of the original method: statements are split into fragments
# across lines, the original file's line numbers (8399-8806) are fused into
# the code, and many original lines are missing entirely (the fused numbers
# jump). It is NOT valid Python as-is; restore this method from version
# control rather than editing the fragments in place. The comments added
# below only annotate the structure that is still visible.
8399 async def heal_N2VC(
8416 ee_config_descriptor
,
# --- Resolve NS/VCA records and build the "osm" config handed to the charm.
8418 nsr_id
= db_nsr
["_id"]
8419 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8420 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8421 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8422 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8424 "collection": "nsrs",
8425 "filter": {"_id": nsr_id
},
8426 "path": db_update_entry
,
8431 element_under_configuration
= nsr_id
8435 vnfr_id
= db_vnfr
["_id"]
8436 osm_config
["osm"]["vnf_id"] = vnfr_id
8438 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
# --- For native charms the namespace / element bookkeeping is extended per
# --- VNF / VDU / KDU scope (element_type drives later status reporting).
8440 if vca_type
== "native_charm":
8443 index_number
= vdu_index
or 0
8446 element_type
= "VNF"
8447 element_under_configuration
= vnfr_id
8448 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8450 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8451 element_type
= "VDU"
8452 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8453 osm_config
["osm"]["vdu_id"] = vdu_id
8455 namespace
+= ".{}".format(kdu_name
)
8456 element_type
= "KDU"
8457 element_under_configuration
= kdu_name
8458 osm_config
["osm"]["kdu_name"] = kdu_name
# --- Locate the charm artifact in the package: "pkg-dir" layout vs the
# --- "Scripts/" layout; path segment depends on proxy/native charm type.
8461 if base_folder
["pkg-dir"]:
8462 artifact_path
= "{}/{}/{}/{}".format(
8463 base_folder
["folder"],
8464 base_folder
["pkg-dir"],
8467 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8472 artifact_path
= "{}/Scripts/{}/{}/".format(
8473 base_folder
["folder"],
8476 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8481 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8483 # get initial_config_primitive_list that applies to this element
8484 initial_config_primitive_list
= config_descriptor
.get(
8485 "initial-config-primitive"
8489 "Initial config primitive list > {}".format(
8490 initial_config_primitive_list
8494 # add config if not present for NS charm
8495 ee_descriptor_id
= ee_config_descriptor
.get("id")
8496 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8497 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8498 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8502 "Initial config primitive list #2 > {}".format(
8503 initial_config_primitive_list
8506 # n2vc_redesign STEP 3.1
8507 # find old ee_id if exists
8508 ee_id
= vca_deployed
.get("ee_id")
8510 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8511 # create or register execution environment in VCA. Only for native charms when healing
# --- Native charm path: wait for the VM, then register a fresh execution
# --- environment in the VCA and persist the new ee_id in the nsr record.
8512 if vca_type
== "native_charm":
8513 step
= "Waiting to VM being up and getting IP address"
8514 self
.logger
.debug(logging_text
+ step
)
8515 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8524 credentials
= {"hostname": rw_mgmt_ip
}
8526 username
= deep_get(
8527 config_descriptor
, ("config-access", "ssh-access", "default-user")
8529 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8530 # merged. Meanwhile let's get username from initial-config-primitive
8531 if not username
and initial_config_primitive_list
:
8532 for config_primitive
in initial_config_primitive_list
:
8533 for param
in config_primitive
.get("parameter", ()):
8534 if param
["name"] == "ssh-username":
8535 username
= param
["value"]
8539 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8540 "'config-access.ssh-access.default-user'"
8542 credentials
["username"] = username
8544 # n2vc_redesign STEP 3.2
8545 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8546 self
._write
_configuration
_status
(
8548 vca_index
=vca_index
,
8549 status
="REGISTERING",
8550 element_under_configuration
=element_under_configuration
,
8551 element_type
=element_type
,
8554 step
= "register execution environment {}".format(credentials
)
8555 self
.logger
.debug(logging_text
+ step
)
8556 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8557 credentials
=credentials
,
8558 namespace
=namespace
,
8563 # update ee_id en db
8565 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8567 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8569 # for compatibility with MON/POL modules, the need model and application name at database
8570 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8571 # Not sure if this need to be done when healing
8573 ee_id_parts = ee_id.split(".")
8574 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8575 if len(ee_id_parts) >= 2:
8576 model_name = ee_id_parts[0]
8577 application_name = ee_id_parts[1]
8578 db_nsr_update[db_update_entry + "model"] = model_name
8579 db_nsr_update[db_update_entry + "application"] = application_name
8582 # n2vc_redesign STEP 3.3
8583 # Install configuration software. Only for native charms.
8584 step
= "Install configuration Software"
8586 self
._write
_configuration
_status
(
8588 vca_index
=vca_index
,
8589 status
="INSTALLING SW",
8590 element_under_configuration
=element_under_configuration
,
8591 element_type
=element_type
,
8592 # other_update=db_nsr_update,
8596 # TODO check if already done
8597 self
.logger
.debug(logging_text
+ step
)
# --- Native charms: pick the "config" primitive (if any) and install the
# --- configuration software; afterwards persist config_sw_installed=True.
8599 if vca_type
== "native_charm":
8600 config_primitive
= next(
8601 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8604 if config_primitive
:
8605 config
= self
._map
_primitive
_params
(
8606 config_primitive
, {}, deploy_params
8608 await self
.vca_map
[vca_type
].install_configuration_sw(
8610 artifact_path
=artifact_path
,
8618 # write in db flag of configuration_sw already installed
8620 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8623 # Not sure if this need to be done when healing
8625 # add relations for this VCA (wait for other peers related with this VCA)
8626 await self._add_vca_relations(
8627 logging_text=logging_text,
8630 vca_index=vca_index,
8634 # if SSH access is required, then get execution environment SSH public
8635 # if native charm we have waited already to VM be UP
8636 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8639 # self.logger.debug("get ssh key block")
8641 config_descriptor
, ("config-access", "ssh-access", "required")
8643 # self.logger.debug("ssh key needed")
8644 # Needed to inject a ssh key
8647 ("config-access", "ssh-access", "default-user"),
8649 step
= "Install configuration Software, getting public ssh key"
8650 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8651 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8654 step
= "Insert public key into VM user={} ssh_key={}".format(
8658 # self.logger.debug("no need to get ssh key")
8659 step
= "Waiting to VM being up and getting IP address"
8660 self
.logger
.debug(logging_text
+ step
)
8662 # n2vc_redesign STEP 5.1
8663 # wait for RO (ip-address) Insert pub_key into VM
8664 # IMPORTANT: We need do wait for RO to complete healing operation.
8665 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
# --- Obtain the management IP: KDU path waits for the KDU, VDU/VNF path
# --- waits for the VM (key insertion), plain NS configuration gets None.
8668 rw_mgmt_ip
= await self
.wait_kdu_up(
8669 logging_text
, nsr_id
, vnfr_id
, kdu_name
8672 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8682 rw_mgmt_ip
= None # This is for a NS configuration
8684 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8686 # store rw_mgmt_ip in deploy params for later replacement
8687 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8690 # get run-day1 operation parameter
8691 runDay1
= deploy_params
.get("run-day1", False)
8693 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8696 # n2vc_redesign STEP 6 Execute initial config primitive
8697 step
= "execute initial config primitive"
8699 # wait for dependent primitives execution (NS -> VNF -> VDU)
8700 if initial_config_primitive_list
:
8701 await self
._wait
_dependent
_n
2vc
(
8702 nsr_id
, vca_deployed_list
, vca_index
8705 # stage, in function of element type: vdu, kdu, vnf or ns
8706 my_vca
= vca_deployed_list
[vca_index
]
8707 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8709 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8710 elif my_vca
.get("member-vnf-index"):
8712 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8715 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8717 self
._write
_configuration
_status
(
8718 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8721 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
# --- Execute each Day-1 primitive in order; the first executed primitive
# --- also records whether terminate primitives will be needed later.
8723 check_if_terminated_needed
= True
8724 for initial_config_primitive
in initial_config_primitive_list
:
8725 # adding information on the vca_deployed if it is a NS execution environment
8726 if not vca_deployed
["member-vnf-index"]:
8727 deploy_params
["ns_config_info"] = json
.dumps(
8728 self
._get
_ns
_config
_info
(nsr_id
)
8730 # TODO check if already done
8731 primitive_params_
= self
._map
_primitive
_params
(
8732 initial_config_primitive
, {}, deploy_params
8735 step
= "execute primitive '{}' params '{}'".format(
8736 initial_config_primitive
["name"], primitive_params_
8738 self
.logger
.debug(logging_text
+ step
)
8739 await self
.vca_map
[vca_type
].exec_primitive(
8741 primitive_name
=initial_config_primitive
["name"],
8742 params_dict
=primitive_params_
,
8747 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8748 if check_if_terminated_needed
:
8749 if config_descriptor
.get("terminate-config-primitive"):
8753 {db_update_entry
+ "needed_terminate": True},
8755 check_if_terminated_needed
= False
8757 # TODO register in database that primitive is done
8759 # STEP 7 Configure metrics
8760 # Not sure if this need to be done when healing
# --- Helm-based EEs: extract Prometheus scrape jobs from the artifact and
# --- persist them; presumably existing jobs are looked up by job_name with
# --- fail_on_empty=False (fragments elided here -- confirm against VCS).
8762 if vca_type == "helm" or vca_type == "helm-v3":
8763 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8765 artifact_path=artifact_path,
8766 ee_config_descriptor=ee_config_descriptor,
8769 target_ip=rw_mgmt_ip,
8775 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8778 for job in prometheus_jobs:
8781 {"job_name": job["job_name"]},
8784 fail_on_empty=False,
8788 step
= "instantiated at VCA"
8789 self
.logger
.debug(logging_text
+ step
)
8791 self
._write
_configuration
_status
(
8792 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
# --- Any failure marks the configuration status BROKEN and is re-raised
# --- as LcmException chained to the original error.
8795 except Exception as e
: # TODO not use Exception but N2VC exception
8796 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8798 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8801 "Exception while {} : {}".format(step
, e
), exc_info
=True
8803 self
._write
_configuration
_status
(
8804 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8806 raise LcmException("{} {}".format(step
, e
)) from e
8808 async def _wait_heal_ro(
8814 while time() <= start_time
+ timeout
:
8815 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8816 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8817 "operational-status"
8819 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8820 if operational_status_ro
!= "healing":
8822 await asyncio
.sleep(15)
8823 else: # timeout_ns_deploy
8824 raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate
    """
    # NOTE(review): reconstructed into valid Python from a corrupted
    # extraction (statement fragments with fused line numbers, several lines
    # elided); verify the restored lines against version control.
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    target = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # the RO request target is taken directly from the operation params
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        # Always clear the current-operation marker, persist the operation
        # result, notify kafka and release the HA lock.
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "verticalscaled", msg)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")