1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import SystemRandom
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
148 self
.lcm_tasks
= lcm_tasks
149 self
.timeout
= config
.timeout
150 self
.ro_config
= config
.RO
151 self
.vca_config
= config
.VCA
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
156 on_update_db
=self
._on
_update
_n
2vc
_db
,
161 self
.conn_helm_ee
= LCMHelmConn(
163 vca_config
=self
.vca_config
,
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
167 self
.k8sclusterhelm2
= K8sHelmConnector(
168 kubectl_command
=self
.vca_config
.kubectlpath
,
169 helm_command
=self
.vca_config
.helmpath
,
176 self
.k8sclusterhelm3
= K8sHelm3Connector(
177 kubectl_command
=self
.vca_config
.kubectlpath
,
178 helm_command
=self
.vca_config
.helm3path
,
185 self
.k8sclusterjuju
= K8sJujuConnector(
186 kubectl_command
=self
.vca_config
.kubectlpath
,
187 juju_command
=self
.vca_config
.jujupath
,
189 on_update_db
=self
._on
_update
_k
8s
_db
,
194 self
.k8scluster_map
= {
195 "helm-chart": self
.k8sclusterhelm2
,
196 "helm-chart-v3": self
.k8sclusterhelm3
,
197 "chart": self
.k8sclusterhelm3
,
198 "juju-bundle": self
.k8sclusterjuju
,
199 "juju": self
.k8sclusterjuju
,
203 "lxc_proxy_charm": self
.n2vc
,
204 "native_charm": self
.n2vc
,
205 "k8s_proxy_charm": self
.n2vc
,
206 "helm": self
.conn_helm_ee
,
207 "helm-v3": self
.conn_helm_ee
,
211 self
.RO
= NgRoClient(**self
.ro_config
.to_dict())
213 self
.op_status_map
= {
214 "instantiation": self
.RO
.status
,
215 "termination": self
.RO
.status
,
216 "migrate": self
.RO
.status
,
217 "healing": self
.RO
.recreate_status
,
218 "verticalscale": self
.RO
.status
,
219 "start_stop_rebuild": self
.RO
.status
,
223 def increment_ip_mac(ip_mac
, vm_index
=1):
224 if not isinstance(ip_mac
, str):
227 # try with ipv4 look for last dot
228 i
= ip_mac
.rfind(".")
231 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
232 # try with ipv6 or mac look for last colon. Operate in hex
233 i
= ip_mac
.rfind(":")
236 # format in hex, len can be 2 for mac or 4 for ipv6
237 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
238 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
244 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
245 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
248 # TODO filter RO descriptor fields...
252 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
253 db_dict
["deploymentStatus"] = ro_descriptor
254 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
256 except Exception as e
:
258 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
261 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
262 # remove last dot from path (if exists)
263 if path
.endswith("."):
266 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
267 # .format(table, filter, path, updated_data))
269 nsr_id
= filter.get("_id")
271 # read ns record from database
272 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
273 current_ns_status
= nsr
.get("nsState")
275 # get vca status for NS
276 status_dict
= await self
.n2vc
.get_status(
277 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
282 db_dict
["vcaStatus"] = status_dict
284 # update configurationStatus for this VCA
286 vca_index
= int(path
[path
.rfind(".") + 1 :])
289 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
291 vca_status
= vca_list
[vca_index
].get("status")
293 configuration_status_list
= nsr
.get("configurationStatus")
294 config_status
= configuration_status_list
[vca_index
].get("status")
296 if config_status
== "BROKEN" and vca_status
!= "failed":
297 db_dict
["configurationStatus"][vca_index
] = "READY"
298 elif config_status
!= "BROKEN" and vca_status
== "failed":
299 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
300 except Exception as e
:
301 # not update configurationStatus
302 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
304 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
305 # if nsState = 'DEGRADED' check if all is OK
307 if current_ns_status
in ("READY", "DEGRADED"):
308 error_description
= ""
310 if status_dict
.get("machines"):
311 for machine_id
in status_dict
.get("machines"):
312 machine
= status_dict
.get("machines").get(machine_id
)
313 # check machine agent-status
314 if machine
.get("agent-status"):
315 s
= machine
.get("agent-status").get("status")
318 error_description
+= (
319 "machine {} agent-status={} ; ".format(
323 # check machine instance status
324 if machine
.get("instance-status"):
325 s
= machine
.get("instance-status").get("status")
328 error_description
+= (
329 "machine {} instance-status={} ; ".format(
334 if status_dict
.get("applications"):
335 for app_id
in status_dict
.get("applications"):
336 app
= status_dict
.get("applications").get(app_id
)
337 # check application status
338 if app
.get("status"):
339 s
= app
.get("status").get("status")
342 error_description
+= (
343 "application {} status={} ; ".format(app_id
, s
)
346 if error_description
:
347 db_dict
["errorDescription"] = error_description
348 if current_ns_status
== "READY" and is_degraded
:
349 db_dict
["nsState"] = "DEGRADED"
350 if current_ns_status
== "DEGRADED" and not is_degraded
:
351 db_dict
["nsState"] = "READY"
354 self
.update_db_2("nsrs", nsr_id
, db_dict
)
356 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
358 except Exception as e
:
359 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
361 async def _on_update_k8s_db(
362 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
365 Updating vca status in NSR record
366 :param cluster_uuid: UUID of a k8s cluster
367 :param kdu_instance: The unique name of the KDU instance
368 :param filter: To get nsr_id
369 :cluster_type: The cluster type (juju, k8s)
373 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
374 # .format(cluster_uuid, kdu_instance, filter))
376 nsr_id
= filter.get("_id")
378 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
379 cluster_uuid
=cluster_uuid
,
380 kdu_instance
=kdu_instance
,
382 complete_status
=True,
388 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
391 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
395 self
.update_db_2("nsrs", nsr_id
, db_dict
)
396 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
398 except Exception as e
:
399 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
402 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
405 undefined
=StrictUndefined
,
406 autoescape
=select_autoescape(default_for_string
=True, default
=True),
408 template
= env
.from_string(cloud_init_text
)
409 return template
.render(additional_params
or {})
410 except UndefinedError
as e
:
412 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
413 "file, must be provided in the instantiation parameters inside the "
414 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
416 except (TemplateError
, TemplateNotFound
) as e
:
418 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
423 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
424 cloud_init_content
= cloud_init_file
= None
426 if vdu
.get("cloud-init-file"):
427 base_folder
= vnfd
["_admin"]["storage"]
428 if base_folder
["pkg-dir"]:
429 cloud_init_file
= "{}/{}/cloud_init/{}".format(
430 base_folder
["folder"],
431 base_folder
["pkg-dir"],
432 vdu
["cloud-init-file"],
435 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
436 base_folder
["folder"],
437 vdu
["cloud-init-file"],
439 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
440 cloud_init_content
= ci_file
.read()
441 elif vdu
.get("cloud-init"):
442 cloud_init_content
= vdu
["cloud-init"]
444 return cloud_init_content
445 except FsException
as e
:
447 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
448 vnfd
["id"], vdu
["id"], cloud_init_file
, e
452 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
454 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
456 additional_params
= vdur
.get("additionalParams")
457 return parse_yaml_strings(additional_params
)
459 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
461 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
462 :param vnfd: input vnfd
463 :param new_id: overrides vnf id if provided
464 :param additionalParams: Instantiation params for VNFs provided
465 :param nsrId: Id of the NSR
466 :return: copy of vnfd
468 vnfd_RO
= deepcopy(vnfd
)
469 # remove unused by RO configuration, monitoring, scaling and internal keys
470 vnfd_RO
.pop("_id", None)
471 vnfd_RO
.pop("_admin", None)
472 vnfd_RO
.pop("monitoring-param", None)
473 vnfd_RO
.pop("scaling-group-descriptor", None)
474 vnfd_RO
.pop("kdu", None)
475 vnfd_RO
.pop("k8s-cluster", None)
477 vnfd_RO
["id"] = new_id
479 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
480 for vdu
in get_iterable(vnfd_RO
, "vdu"):
481 vdu
.pop("cloud-init-file", None)
482 vdu
.pop("cloud-init", None)
486 def ip_profile_2_RO(ip_profile
):
487 RO_ip_profile
= deepcopy(ip_profile
)
488 if "dns-server" in RO_ip_profile
:
489 if isinstance(RO_ip_profile
["dns-server"], list):
490 RO_ip_profile
["dns-address"] = []
491 for ds
in RO_ip_profile
.pop("dns-server"):
492 RO_ip_profile
["dns-address"].append(ds
["address"])
494 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
495 if RO_ip_profile
.get("ip-version") == "ipv4":
496 RO_ip_profile
["ip-version"] = "IPv4"
497 if RO_ip_profile
.get("ip-version") == "ipv6":
498 RO_ip_profile
["ip-version"] = "IPv6"
499 if "dhcp-params" in RO_ip_profile
:
500 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
503 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
504 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
505 if db_vim
["_admin"]["operationalState"] != "ENABLED":
507 "VIM={} is not available. operationalState={}".format(
508 vim_account
, db_vim
["_admin"]["operationalState"]
511 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
514 def get_ro_wim_id_for_wim_account(self
, wim_account
):
515 if isinstance(wim_account
, str):
516 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
517 if db_wim
["_admin"]["operationalState"] != "ENABLED":
519 "WIM={} is not available. operationalState={}".format(
520 wim_account
, db_wim
["_admin"]["operationalState"]
523 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
528 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
529 db_vdu_push_list
= []
531 db_update
= {"_admin.modified": time()}
533 for vdu_id
, vdu_count
in vdu_create
.items():
537 for vdur
in reversed(db_vnfr
["vdur"])
538 if vdur
["vdu-id-ref"] == vdu_id
543 # Read the template saved in the db:
545 "No vdur in the database. Using the vdur-template to scale"
547 vdur_template
= db_vnfr
.get("vdur-template")
548 if not vdur_template
:
550 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
554 vdur
= vdur_template
[0]
555 # Delete a template from the database after using it
558 {"_id": db_vnfr
["_id"]},
560 pull
={"vdur-template": {"_id": vdur
["_id"]}},
562 for count
in range(vdu_count
):
563 vdur_copy
= deepcopy(vdur
)
564 vdur_copy
["status"] = "BUILD"
565 vdur_copy
["status-detailed"] = None
566 vdur_copy
["ip-address"] = None
567 vdur_copy
["_id"] = str(uuid4())
568 vdur_copy
["count-index"] += count
+ 1
569 vdur_copy
["id"] = "{}-{}".format(
570 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
572 vdur_copy
.pop("vim_info", None)
573 for iface
in vdur_copy
["interfaces"]:
574 if iface
.get("fixed-ip"):
575 iface
["ip-address"] = self
.increment_ip_mac(
576 iface
["ip-address"], count
+ 1
579 iface
.pop("ip-address", None)
580 if iface
.get("fixed-mac"):
581 iface
["mac-address"] = self
.increment_ip_mac(
582 iface
["mac-address"], count
+ 1
585 iface
.pop("mac-address", None)
589 ) # only first vdu can be managment of vnf
590 db_vdu_push_list
.append(vdur_copy
)
591 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
593 if len(db_vnfr
["vdur"]) == 1:
594 # The scale will move to 0 instances
596 "Scaling to 0 !, creating the template with the last vdur"
598 template_vdur
= [db_vnfr
["vdur"][0]]
599 for vdu_id
, vdu_count
in vdu_delete
.items():
601 indexes_to_delete
= [
603 for iv
in enumerate(db_vnfr
["vdur"])
604 if iv
[1]["vdu-id-ref"] == vdu_id
608 "vdur.{}.status".format(i
): "DELETING"
609 for i
in indexes_to_delete
[-vdu_count
:]
613 # it must be deleted one by one because common.db does not allow otherwise
616 for v
in reversed(db_vnfr
["vdur"])
617 if v
["vdu-id-ref"] == vdu_id
619 for vdu
in vdus_to_delete
[:vdu_count
]:
622 {"_id": db_vnfr
["_id"]},
624 pull
={"vdur": {"_id": vdu
["_id"]}},
628 db_push
["vdur"] = db_vdu_push_list
630 db_push
["vdur-template"] = template_vdur
633 db_vnfr
["vdur-template"] = template_vdur
634 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
635 # modify passed dictionary db_vnfr
636 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
637 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
639 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
641 Updates database nsr with the RO info for the created vld
642 :param ns_update_nsr: dictionary to be filled with the updated info
643 :param db_nsr: content of db_nsr. This is also modified
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
648 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
649 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
650 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
652 vld
["vim-id"] = net_RO
.get("vim_net_id")
653 vld
["name"] = net_RO
.get("vim_name")
654 vld
["status"] = net_RO
.get("status")
655 vld
["status-detailed"] = net_RO
.get("error_msg")
656 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
660 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
663 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
665 for db_vnfr
in db_vnfrs
.values():
666 vnfr_update
= {"status": "ERROR"}
667 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
668 if "status" not in vdur
:
669 vdur
["status"] = "ERROR"
670 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
672 vdur
["status-detailed"] = str(error_text
)
674 "vdur.{}.status-detailed".format(vdu_index
)
676 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
677 except DbException
as e
:
678 self
.logger
.error("Cannot update vnf. {}".format(e
))
680 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
682 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
683 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
684 :param nsr_desc_RO: nsr descriptor from RO
685 :return: Nothing, LcmException is raised on errors
687 for vnf_index
, db_vnfr
in db_vnfrs
.items():
688 for vnf_RO
in nsr_desc_RO
["vnfs"]:
689 if vnf_RO
["member_vnf_index"] != vnf_index
:
692 if vnf_RO
.get("ip_address"):
693 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
696 elif not db_vnfr
.get("ip-address"):
697 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
698 raise LcmExceptionNoMgmtIP(
699 "ns member_vnf_index '{}' has no IP address".format(
704 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
705 vdur_RO_count_index
= 0
706 if vdur
.get("pdu-type"):
708 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
709 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
711 if vdur
["count-index"] != vdur_RO_count_index
:
712 vdur_RO_count_index
+= 1
714 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
715 if vdur_RO
.get("ip_address"):
716 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
718 vdur
["ip-address"] = None
719 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
720 vdur
["name"] = vdur_RO
.get("vim_name")
721 vdur
["status"] = vdur_RO
.get("status")
722 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
723 for ifacer
in get_iterable(vdur
, "interfaces"):
724 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
725 if ifacer
["name"] == interface_RO
.get("internal_name"):
726 ifacer
["ip-address"] = interface_RO
.get(
729 ifacer
["mac-address"] = interface_RO
.get(
735 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
736 "from VIM info".format(
737 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
740 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
744 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
746 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
750 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
751 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
752 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
754 vld
["vim-id"] = net_RO
.get("vim_net_id")
755 vld
["name"] = net_RO
.get("vim_name")
756 vld
["status"] = net_RO
.get("status")
757 vld
["status-detailed"] = net_RO
.get("error_msg")
758 vnfr_update
["vld.{}".format(vld_index
)] = vld
762 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
767 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
772 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
777 def _get_ns_config_info(self
, nsr_id
):
779 Generates a mapping between vnf,vdu elements and the N2VC id
780 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
781 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
782 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
783 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
786 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
788 ns_config_info
= {"osm-config-mapping": mapping
}
789 for vca
in vca_deployed_list
:
790 if not vca
["member-vnf-index"]:
792 if not vca
["vdu_id"]:
793 mapping
[vca
["member-vnf-index"]] = vca
["application"]
797 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
799 ] = vca
["application"]
800 return ns_config_info
802 async def _instantiate_ng_ro(
818 def get_vim_account(vim_account_id
):
820 if vim_account_id
in db_vims
:
821 return db_vims
[vim_account_id
]
822 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
823 db_vims
[vim_account_id
] = db_vim
826 # modify target_vld info with instantiation parameters
827 def parse_vld_instantiation_params(
828 target_vim
, target_vld
, vld_params
, target_sdn
830 if vld_params
.get("ip-profile"):
831 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
832 vld_params
["ip-profile"]
834 if vld_params
.get("provider-network"):
835 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
838 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
839 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
843 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
844 # if wim_account_id is specified in vld_params, validate if it is feasible.
845 wim_account_id
, db_wim
= select_feasible_wim_account(
846 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
850 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
851 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
852 # update vld_params with correct WIM account Id
853 vld_params
["wimAccountId"] = wim_account_id
855 target_wim
= "wim:{}".format(wim_account_id
)
856 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
857 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
858 if len(sdn_ports
) > 0:
859 target_vld
["vim_info"][target_wim
] = target_wim_attrs
860 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
863 "Target VLD with WIM data: {:s}".format(str(target_vld
))
866 for param
in ("vim-network-name", "vim-network-id"):
867 if vld_params
.get(param
):
868 if isinstance(vld_params
[param
], dict):
869 for vim
, vim_net
in vld_params
[param
].items():
870 other_target_vim
= "vim:" + vim
872 target_vld
["vim_info"],
873 (other_target_vim
, param
.replace("-", "_")),
876 else: # isinstance str
877 target_vld
["vim_info"][target_vim
][
878 param
.replace("-", "_")
879 ] = vld_params
[param
]
880 if vld_params
.get("common_id"):
881 target_vld
["common_id"] = vld_params
.get("common_id")
883 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
884 def update_ns_vld_target(target
, ns_params
):
885 for vnf_params
in ns_params
.get("vnf", ()):
886 if vnf_params
.get("vimAccountId"):
890 for vnfr
in db_vnfrs
.values()
891 if vnf_params
["member-vnf-index"]
892 == vnfr
["member-vnf-index-ref"]
896 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
899 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
900 target_vld
= find_in_list(
901 get_iterable(vdur
, "interfaces"),
902 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
905 vld_params
= find_in_list(
906 get_iterable(ns_params
, "vld"),
907 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
910 if vnf_params
.get("vimAccountId") not in a_vld
.get(
913 target_vim_network_list
= [
914 v
for _
, v
in a_vld
.get("vim_info").items()
916 target_vim_network_name
= next(
918 item
.get("vim_network_name", "")
919 for item
in target_vim_network_list
924 target
["ns"]["vld"][a_index
].get("vim_info").update(
926 "vim:{}".format(vnf_params
["vimAccountId"]): {
927 "vim_network_name": target_vim_network_name
,
933 for param
in ("vim-network-name", "vim-network-id"):
934 if vld_params
.get(param
) and isinstance(
935 vld_params
[param
], dict
937 for vim
, vim_net
in vld_params
[
940 other_target_vim
= "vim:" + vim
942 target
["ns"]["vld"][a_index
].get(
947 param
.replace("-", "_"),
952 nslcmop_id
= db_nslcmop
["_id"]
954 "name": db_nsr
["name"],
957 "image": deepcopy(db_nsr
["image"]),
958 "flavor": deepcopy(db_nsr
["flavor"]),
959 "action_id": nslcmop_id
,
960 "cloud_init_content": {},
962 for image
in target
["image"]:
963 image
["vim_info"] = {}
964 for flavor
in target
["flavor"]:
965 flavor
["vim_info"] = {}
966 if db_nsr
.get("affinity-or-anti-affinity-group"):
967 target
["affinity-or-anti-affinity-group"] = deepcopy(
968 db_nsr
["affinity-or-anti-affinity-group"]
970 for affinity_or_anti_affinity_group
in target
[
971 "affinity-or-anti-affinity-group"
973 affinity_or_anti_affinity_group
["vim_info"] = {}
975 if db_nslcmop
.get("lcmOperationType") != "instantiate":
976 # get parameters of instantiation:
977 db_nslcmop_instantiate
= self
.db
.get_list(
980 "nsInstanceId": db_nslcmop
["nsInstanceId"],
981 "lcmOperationType": "instantiate",
984 ns_params
= db_nslcmop_instantiate
.get("operationParams")
986 ns_params
= db_nslcmop
.get("operationParams")
987 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
988 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
991 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
992 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
996 "mgmt-network": vld
.get("mgmt-network", False),
997 "type": vld
.get("type"),
1000 "vim_network_name": vld
.get("vim-network-name"),
1001 "vim_account_id": ns_params
["vimAccountId"],
1005 # check if this network needs SDN assist
1006 if vld
.get("pci-interfaces"):
1007 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1008 if vim_config
:= db_vim
.get("config"):
1009 if sdnc_id
:= vim_config
.get("sdn-controller"):
1010 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1011 target_sdn
= "sdn:{}".format(sdnc_id
)
1012 target_vld
["vim_info"][target_sdn
] = {
1014 "target_vim": target_vim
,
1016 "type": vld
.get("type"),
1019 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1020 for nsd_vnf_profile
in nsd_vnf_profiles
:
1021 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1022 if cp
["virtual-link-profile-id"] == vld
["id"]:
1024 "member_vnf:{}.{}".format(
1025 cp
["constituent-cpd-id"][0][
1026 "constituent-base-element-id"
1028 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1030 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1032 # check at nsd descriptor, if there is an ip-profile
1034 nsd_vlp
= find_in_list(
1035 get_virtual_link_profiles(nsd
),
1036 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1041 and nsd_vlp
.get("virtual-link-protocol-data")
1042 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1044 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1048 # update vld_params with instantiation params
1049 vld_instantiation_params
= find_in_list(
1050 get_iterable(ns_params
, "vld"),
1051 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1053 if vld_instantiation_params
:
1054 vld_params
.update(vld_instantiation_params
)
1055 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1056 target
["ns"]["vld"].append(target_vld
)
1057 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1058 update_ns_vld_target(target
, ns_params
)
1060 for vnfr
in db_vnfrs
.values():
1061 vnfd
= find_in_list(
1062 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1064 vnf_params
= find_in_list(
1065 get_iterable(ns_params
, "vnf"),
1066 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1068 target_vnf
= deepcopy(vnfr
)
1069 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1070 for vld
in target_vnf
.get("vld", ()):
1071 # check if connected to a ns.vld, to fill target'
1072 vnf_cp
= find_in_list(
1073 vnfd
.get("int-virtual-link-desc", ()),
1074 lambda cpd
: cpd
.get("id") == vld
["id"],
1077 ns_cp
= "member_vnf:{}.{}".format(
1078 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1080 if cp2target
.get(ns_cp
):
1081 vld
["target"] = cp2target
[ns_cp
]
1084 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1086 # check if this network needs SDN assist
1088 if vld
.get("pci-interfaces"):
1089 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1090 sdnc_id
= db_vim
["config"].get("sdn-controller")
1092 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1093 target_sdn
= "sdn:{}".format(sdnc_id
)
1094 vld
["vim_info"][target_sdn
] = {
1096 "target_vim": target_vim
,
1098 "type": vld
.get("type"),
1101 # check at vnfd descriptor, if there is an ip-profile
1103 vnfd_vlp
= find_in_list(
1104 get_virtual_link_profiles(vnfd
),
1105 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1109 and vnfd_vlp
.get("virtual-link-protocol-data")
1110 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1112 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1115 # update vld_params with instantiation params
1117 vld_instantiation_params
= find_in_list(
1118 get_iterable(vnf_params
, "internal-vld"),
1119 lambda i_vld
: i_vld
["name"] == vld
["id"],
1121 if vld_instantiation_params
:
1122 vld_params
.update(vld_instantiation_params
)
1123 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1126 for vdur
in target_vnf
.get("vdur", ()):
1127 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1128 continue # This vdu must not be created
1129 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1131 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1134 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1135 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1138 and vdu_configuration
.get("config-access")
1139 and vdu_configuration
.get("config-access").get("ssh-access")
1141 vdur
["ssh-keys"] = ssh_keys_all
1142 vdur
["ssh-access-required"] = vdu_configuration
[
1144 ]["ssh-access"]["required"]
1147 and vnf_configuration
.get("config-access")
1148 and vnf_configuration
.get("config-access").get("ssh-access")
1149 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1151 vdur
["ssh-keys"] = ssh_keys_all
1152 vdur
["ssh-access-required"] = vnf_configuration
[
1154 ]["ssh-access"]["required"]
1155 elif ssh_keys_instantiation
and find_in_list(
1156 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1158 vdur
["ssh-keys"] = ssh_keys_instantiation
1160 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1162 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1164 if vdud
.get("cloud-init-file"):
1165 vdur
["cloud-init"] = "{}:file:{}".format(
1166 vnfd
["_id"], vdud
.get("cloud-init-file")
1168 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1169 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1170 base_folder
= vnfd
["_admin"]["storage"]
1171 if base_folder
["pkg-dir"]:
1172 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1173 base_folder
["folder"],
1174 base_folder
["pkg-dir"],
1175 vdud
.get("cloud-init-file"),
1178 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1179 base_folder
["folder"],
1180 vdud
.get("cloud-init-file"),
1182 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1183 target
["cloud_init_content"][
1186 elif vdud
.get("cloud-init"):
1187 vdur
["cloud-init"] = "{}:vdu:{}".format(
1188 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1190 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1191 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1194 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1195 deploy_params_vdu
= self
._format
_additional
_params
(
1196 vdur
.get("additionalParams") or {}
1198 deploy_params_vdu
["OSM"] = get_osm_params(
1199 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1201 vdur
["additionalParams"] = deploy_params_vdu
1204 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1205 if target_vim
not in ns_flavor
["vim_info"]:
1206 ns_flavor
["vim_info"][target_vim
] = {}
1209 # in case alternative images are provided we must check if they should be applied
1210 # for the vim_type, modify the vim_type taking into account
1211 ns_image_id
= int(vdur
["ns-image-id"])
1212 if vdur
.get("alt-image-ids"):
1213 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1214 vim_type
= db_vim
["vim_type"]
1215 for alt_image_id
in vdur
.get("alt-image-ids"):
1216 ns_alt_image
= target
["image"][int(alt_image_id
)]
1217 if vim_type
== ns_alt_image
.get("vim-type"):
1218 # must use alternative image
1220 "use alternative image id: {}".format(alt_image_id
)
1222 ns_image_id
= alt_image_id
1223 vdur
["ns-image-id"] = ns_image_id
1225 ns_image
= target
["image"][int(ns_image_id
)]
1226 if target_vim
not in ns_image
["vim_info"]:
1227 ns_image
["vim_info"][target_vim
] = {}
1230 if vdur
.get("affinity-or-anti-affinity-group-id"):
1231 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1232 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1233 if target_vim
not in ns_ags
["vim_info"]:
1234 ns_ags
["vim_info"][target_vim
] = {}
1236 vdur
["vim_info"] = {target_vim
: {}}
1237 # instantiation parameters
1239 vdu_instantiation_params
= find_in_list(
1240 get_iterable(vnf_params
, "vdu"),
1241 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1243 if vdu_instantiation_params
:
1244 # Parse the vdu_volumes from the instantiation params
1245 vdu_volumes
= get_volumes_from_instantiation_params(
1246 vdu_instantiation_params
, vdud
1248 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1249 vdur
["additionalParams"]["OSM"][
1251 ] = vdu_instantiation_params
.get("vim-flavor-id")
1252 vdur_list
.append(vdur
)
1253 target_vnf
["vdur"] = vdur_list
1254 target
["vnf"].append(target_vnf
)
1256 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1257 desc
= await self
.RO
.deploy(nsr_id
, target
)
1258 self
.logger
.debug("RO return > {}".format(desc
))
1259 action_id
= desc
["action_id"]
1260 await self
._wait
_ng
_ro
(
1267 operation
="instantiation",
1272 "_admin.deployed.RO.operational-status": "running",
1273 "detailed-status": " ".join(stage
),
1275 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1276 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1277 self
._write
_op
_status
(nslcmop_id
, stage
)
1279 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1283 async def _wait_ng_ro(
1293 detailed_status_old
= None
1295 start_time
= start_time
or time()
1296 while time() <= start_time
+ timeout
:
1297 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1298 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1299 if desc_status
["status"] == "FAILED":
1300 raise NgRoException(desc_status
["details"])
1301 elif desc_status
["status"] == "BUILD":
1303 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1304 elif desc_status
["status"] == "DONE":
1306 stage
[2] = "Deployed at VIM"
1309 assert False, "ROclient.check_ns_status returns unknown {}".format(
1310 desc_status
["status"]
1312 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1313 detailed_status_old
= stage
[2]
1314 db_nsr_update
["detailed-status"] = " ".join(stage
)
1315 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1316 self
._write
_op
_status
(nslcmop_id
, stage
)
1317 await asyncio
.sleep(15)
1318 else: # timeout_ns_deploy
1319 raise NgRoException("Timeout waiting ns to deploy")
1321 async def _terminate_ng_ro(
1322 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1327 start_deploy
= time()
1334 "action_id": nslcmop_id
,
1336 desc
= await self
.RO
.deploy(nsr_id
, target
)
1337 action_id
= desc
["action_id"]
1338 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1341 + "ns terminate action at RO. action_id={}".format(action_id
)
1345 delete_timeout
= 20 * 60 # 20 minutes
1346 await self
._wait
_ng
_ro
(
1353 operation
="termination",
1355 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1357 await self
.RO
.delete(nsr_id
)
1358 except NgRoException
as e
:
1359 if e
.http_code
== 404: # not found
1360 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1361 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1363 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1365 elif e
.http_code
== 409: # conflict
1366 failed_detail
.append("delete conflict: {}".format(e
))
1369 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1372 failed_detail
.append("delete error: {}".format(e
))
1375 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1377 except Exception as e
:
1378 failed_detail
.append("delete error: {}".format(e
))
1380 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1384 stage
[2] = "Error deleting from VIM"
1386 stage
[2] = "Deleted from VIM"
1387 db_nsr_update
["detailed-status"] = " ".join(stage
)
1388 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1389 self
._write
_op
_status
(nslcmop_id
, stage
)
1392 raise LcmException("; ".join(failed_detail
))
1395 async def instantiate_RO(
1409 :param logging_text: preffix text to use at logging
1410 :param nsr_id: nsr identity
1411 :param nsd: database content of ns descriptor
1412 :param db_nsr: database content of ns record
1413 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1415 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1416 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1417 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1418 :return: None or exception
1421 start_deploy
= time()
1422 ns_params
= db_nslcmop
.get("operationParams")
1423 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1424 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1426 timeout_ns_deploy
= self
.timeout
.ns_deploy
1428 # Check for and optionally request placement optimization. Database will be updated if placement activated
1429 stage
[2] = "Waiting for Placement."
1430 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1431 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1432 for vnfr
in db_vnfrs
.values():
1433 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1436 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1438 return await self
._instantiate
_ng
_ro
(
1451 except Exception as e
:
1452 stage
[2] = "ERROR deploying at VIM"
1453 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1455 "Error deploying at VIM {}".format(e
),
1456 exc_info
=not isinstance(
1459 ROclient
.ROClientException
,
1468 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1470 Wait for kdu to be up, get ip address
1471 :param logging_text: prefix use for logging
1475 :return: IP address, K8s services
1478 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1481 while nb_tries
< 360:
1482 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1486 for x
in get_iterable(db_vnfr
, "kdur")
1487 if x
.get("kdu-name") == kdu_name
1493 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1495 if kdur
.get("status"):
1496 if kdur
["status"] in ("READY", "ENABLED"):
1497 return kdur
.get("ip-address"), kdur
.get("services")
1500 "target KDU={} is in error state".format(kdu_name
)
1503 await asyncio
.sleep(10)
1505 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1507 async def wait_vm_up_insert_key_ro(
1508 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1511 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1512 :param logging_text: prefix use for logging
1517 :param pub_key: public ssh key to inject, None to skip
1518 :param user: user to apply the public ssh key
1522 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1524 target_vdu_id
= None
1529 if ro_retries
>= 360: # 1 hour
1531 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1534 await asyncio
.sleep(10)
1537 if not target_vdu_id
:
1538 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1540 if not vdu_id
: # for the VNF case
1541 if db_vnfr
.get("status") == "ERROR":
1543 "Cannot inject ssh-key because target VNF is in error state"
1545 ip_address
= db_vnfr
.get("ip-address")
1551 for x
in get_iterable(db_vnfr
, "vdur")
1552 if x
.get("ip-address") == ip_address
1560 for x
in get_iterable(db_vnfr
, "vdur")
1561 if x
.get("vdu-id-ref") == vdu_id
1562 and x
.get("count-index") == vdu_index
1568 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1569 ): # If only one, this should be the target vdu
1570 vdur
= db_vnfr
["vdur"][0]
1573 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1574 vnfr_id
, vdu_id
, vdu_index
1577 # New generation RO stores information at "vim_info"
1580 if vdur
.get("vim_info"):
1582 t
for t
in vdur
["vim_info"]
1583 ) # there should be only one key
1584 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1586 vdur
.get("pdu-type")
1587 or vdur
.get("status") == "ACTIVE"
1588 or ng_ro_status
== "ACTIVE"
1590 ip_address
= vdur
.get("ip-address")
1593 target_vdu_id
= vdur
["vdu-id-ref"]
1594 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1596 "Cannot inject ssh-key because target VM is in error state"
1599 if not target_vdu_id
:
1602 # inject public key into machine
1603 if pub_key
and user
:
1604 self
.logger
.debug(logging_text
+ "Inserting RO key")
1605 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1606 if vdur
.get("pdu-type"):
1607 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1612 "action": "inject_ssh_key",
1616 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1618 desc
= await self
.RO
.deploy(nsr_id
, target
)
1619 action_id
= desc
["action_id"]
1620 await self
._wait
_ng
_ro
(
1621 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1624 except NgRoException
as e
:
1626 "Reaching max tries injecting key. Error: {}".format(e
)
1633 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1635 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1637 my_vca
= vca_deployed_list
[vca_index
]
1638 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1639 # vdu or kdu: no dependencies
1643 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1644 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1645 configuration_status_list
= db_nsr
["configurationStatus"]
1646 for index
, vca_deployed
in enumerate(configuration_status_list
):
1647 if index
== vca_index
:
1650 if not my_vca
.get("member-vnf-index") or (
1651 vca_deployed
.get("member-vnf-index")
1652 == my_vca
.get("member-vnf-index")
1654 internal_status
= configuration_status_list
[index
].get("status")
1655 if internal_status
== "READY":
1657 elif internal_status
== "BROKEN":
1659 "Configuration aborted because dependent charm/s has failed"
1664 # no dependencies, return
1666 await asyncio
.sleep(10)
1669 raise LcmException("Configuration aborted because dependent charm/s timeout")
1671 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1674 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1676 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1677 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1680 async def instantiate_N2VC(
1698 ee_config_descriptor
,
1700 nsr_id
= db_nsr
["_id"]
1701 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1702 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1703 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1704 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1706 "collection": "nsrs",
1707 "filter": {"_id": nsr_id
},
1708 "path": db_update_entry
,
1713 element_under_configuration
= nsr_id
1717 vnfr_id
= db_vnfr
["_id"]
1718 osm_config
["osm"]["vnf_id"] = vnfr_id
1720 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1722 if vca_type
== "native_charm":
1725 index_number
= vdu_index
or 0
1728 element_type
= "VNF"
1729 element_under_configuration
= vnfr_id
1730 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1732 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1733 element_type
= "VDU"
1734 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1735 osm_config
["osm"]["vdu_id"] = vdu_id
1737 namespace
+= ".{}".format(kdu_name
)
1738 element_type
= "KDU"
1739 element_under_configuration
= kdu_name
1740 osm_config
["osm"]["kdu_name"] = kdu_name
1743 if base_folder
["pkg-dir"]:
1744 artifact_path
= "{}/{}/{}/{}".format(
1745 base_folder
["folder"],
1746 base_folder
["pkg-dir"],
1749 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1754 artifact_path
= "{}/Scripts/{}/{}/".format(
1755 base_folder
["folder"],
1758 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1763 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1765 # get initial_config_primitive_list that applies to this element
1766 initial_config_primitive_list
= config_descriptor
.get(
1767 "initial-config-primitive"
1771 "Initial config primitive list > {}".format(
1772 initial_config_primitive_list
1776 # add config if not present for NS charm
1777 ee_descriptor_id
= ee_config_descriptor
.get("id")
1778 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1779 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1780 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1784 "Initial config primitive list #2 > {}".format(
1785 initial_config_primitive_list
1788 # n2vc_redesign STEP 3.1
1789 # find old ee_id if exists
1790 ee_id
= vca_deployed
.get("ee_id")
1792 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1793 # create or register execution environment in VCA
1794 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1795 self
._write
_configuration
_status
(
1797 vca_index
=vca_index
,
1799 element_under_configuration
=element_under_configuration
,
1800 element_type
=element_type
,
1803 step
= "create execution environment"
1804 self
.logger
.debug(logging_text
+ step
)
1808 if vca_type
== "k8s_proxy_charm":
1809 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1810 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1811 namespace
=namespace
,
1812 artifact_path
=artifact_path
,
1816 elif vca_type
== "helm" or vca_type
== "helm-v3":
1817 ee_id
, credentials
= await self
.vca_map
[
1819 ].create_execution_environment(
1820 namespace
=namespace
,
1824 artifact_path
=artifact_path
,
1825 chart_model
=vca_name
,
1829 ee_id
, credentials
= await self
.vca_map
[
1831 ].create_execution_environment(
1832 namespace
=namespace
,
1838 elif vca_type
== "native_charm":
1839 step
= "Waiting to VM being up and getting IP address"
1840 self
.logger
.debug(logging_text
+ step
)
1841 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1850 credentials
= {"hostname": rw_mgmt_ip
}
1852 username
= deep_get(
1853 config_descriptor
, ("config-access", "ssh-access", "default-user")
1855 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1856 # merged. Meanwhile let's get username from initial-config-primitive
1857 if not username
and initial_config_primitive_list
:
1858 for config_primitive
in initial_config_primitive_list
:
1859 for param
in config_primitive
.get("parameter", ()):
1860 if param
["name"] == "ssh-username":
1861 username
= param
["value"]
1865 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1866 "'config-access.ssh-access.default-user'"
1868 credentials
["username"] = username
1869 # n2vc_redesign STEP 3.2
1871 self
._write
_configuration
_status
(
1873 vca_index
=vca_index
,
1874 status
="REGISTERING",
1875 element_under_configuration
=element_under_configuration
,
1876 element_type
=element_type
,
1879 step
= "register execution environment {}".format(credentials
)
1880 self
.logger
.debug(logging_text
+ step
)
1881 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1882 credentials
=credentials
,
1883 namespace
=namespace
,
1888 # for compatibility with MON/POL modules, the need model and application name at database
1889 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1890 ee_id_parts
= ee_id
.split(".")
1891 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1892 if len(ee_id_parts
) >= 2:
1893 model_name
= ee_id_parts
[0]
1894 application_name
= ee_id_parts
[1]
1895 db_nsr_update
[db_update_entry
+ "model"] = model_name
1896 db_nsr_update
[db_update_entry
+ "application"] = application_name
1898 # n2vc_redesign STEP 3.3
1899 step
= "Install configuration Software"
1901 self
._write
_configuration
_status
(
1903 vca_index
=vca_index
,
1904 status
="INSTALLING SW",
1905 element_under_configuration
=element_under_configuration
,
1906 element_type
=element_type
,
1907 other_update
=db_nsr_update
,
1910 # TODO check if already done
1911 self
.logger
.debug(logging_text
+ step
)
1913 if vca_type
== "native_charm":
1914 config_primitive
= next(
1915 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1918 if config_primitive
:
1919 config
= self
._map
_primitive
_params
(
1920 config_primitive
, {}, deploy_params
1923 if vca_type
== "lxc_proxy_charm":
1924 if element_type
== "NS":
1925 num_units
= db_nsr
.get("config-units") or 1
1926 elif element_type
== "VNF":
1927 num_units
= db_vnfr
.get("config-units") or 1
1928 elif element_type
== "VDU":
1929 for v
in db_vnfr
["vdur"]:
1930 if vdu_id
== v
["vdu-id-ref"]:
1931 num_units
= v
.get("config-units") or 1
1933 if vca_type
!= "k8s_proxy_charm":
1934 await self
.vca_map
[vca_type
].install_configuration_sw(
1936 artifact_path
=artifact_path
,
1939 num_units
=num_units
,
1944 # write in db flag of configuration_sw already installed
1946 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1949 # add relations for this VCA (wait for other peers related with this VCA)
1950 is_relation_added
= await self
._add
_vca
_relations
(
1951 logging_text
=logging_text
,
1954 vca_index
=vca_index
,
1957 if not is_relation_added
:
1958 raise LcmException("Relations could not be added to VCA.")
1960 # if SSH access is required, then get execution environment SSH public
1961 # if native charm we have waited already to VM be UP
1962 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1965 # self.logger.debug("get ssh key block")
1967 config_descriptor
, ("config-access", "ssh-access", "required")
1969 # self.logger.debug("ssh key needed")
1970 # Needed to inject a ssh key
1973 ("config-access", "ssh-access", "default-user"),
1975 step
= "Install configuration Software, getting public ssh key"
1976 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1977 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1980 step
= "Insert public key into VM user={} ssh_key={}".format(
1984 # self.logger.debug("no need to get ssh key")
1985 step
= "Waiting to VM being up and getting IP address"
1986 self
.logger
.debug(logging_text
+ step
)
1988 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1991 # n2vc_redesign STEP 5.1
1992 # wait for RO (ip-address) Insert pub_key into VM
1995 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1996 logging_text
, nsr_id
, vnfr_id
, kdu_name
1998 vnfd
= self
.db
.get_one(
2000 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2002 kdu
= get_kdu(vnfd
, kdu_name
)
2004 service
["name"] for service
in get_kdu_services(kdu
)
2006 exposed_services
= []
2007 for service
in services
:
2008 if any(s
in service
["name"] for s
in kdu_services
):
2009 exposed_services
.append(service
)
2010 await self
.vca_map
[vca_type
].exec_primitive(
2012 primitive_name
="config",
2014 "osm-config": json
.dumps(
2016 k8s
={"services": exposed_services
}
2023 # This verification is needed in order to avoid trying to add a public key
2024 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2025 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2026 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2028 elif db_vnfr
.get("vdur"):
2029 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2039 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2041 # store rw_mgmt_ip in deploy params for later replacement
2042 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2044 # n2vc_redesign STEP 6 Execute initial config primitive
2045 step
= "execute initial config primitive"
2047 # wait for dependent primitives execution (NS -> VNF -> VDU)
2048 if initial_config_primitive_list
:
2049 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2051 # stage, in function of element type: vdu, kdu, vnf or ns
2052 my_vca
= vca_deployed_list
[vca_index
]
2053 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2055 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2056 elif my_vca
.get("member-vnf-index"):
2058 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2061 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2063 self
._write
_configuration
_status
(
2064 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2067 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2069 check_if_terminated_needed
= True
2070 for initial_config_primitive
in initial_config_primitive_list
:
2071 # adding information on the vca_deployed if it is a NS execution environment
2072 if not vca_deployed
["member-vnf-index"]:
2073 deploy_params
["ns_config_info"] = json
.dumps(
2074 self
._get
_ns
_config
_info
(nsr_id
)
2076 # TODO check if already done
2077 primitive_params_
= self
._map
_primitive
_params
(
2078 initial_config_primitive
, {}, deploy_params
2081 step
= "execute primitive '{}' params '{}'".format(
2082 initial_config_primitive
["name"], primitive_params_
2084 self
.logger
.debug(logging_text
+ step
)
2085 await self
.vca_map
[vca_type
].exec_primitive(
2087 primitive_name
=initial_config_primitive
["name"],
2088 params_dict
=primitive_params_
,
2093 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2094 if check_if_terminated_needed
:
2095 if config_descriptor
.get("terminate-config-primitive"):
2097 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2099 check_if_terminated_needed
= False
2101 # TODO register in database that primitive is done
2103 # STEP 7 Configure metrics
2104 if vca_type
== "helm" or vca_type
== "helm-v3":
2105 # TODO: review for those cases where the helm chart is a reference and
2106 # is not part of the NF package
2107 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2109 artifact_path
=artifact_path
,
2110 ee_config_descriptor
=ee_config_descriptor
,
2113 target_ip
=rw_mgmt_ip
,
2114 element_type
=element_type
,
2115 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2117 vdu_index
=vdu_index
,
2119 kdu_index
=kdu_index
,
2125 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2128 for job
in prometheus_jobs
:
2131 {"job_name": job
["job_name"]},
2134 fail_on_empty
=False,
2137 step
= "instantiated at VCA"
2138 self
.logger
.debug(logging_text
+ step
)
2140 self
._write
_configuration
_status
(
2141 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2144 except Exception as e
: # TODO not use Exception but N2VC exception
2145 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2147 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2150 "Exception while {} : {}".format(step
, e
), exc_info
=True
2152 self
._write
_configuration
_status
(
2153 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2155 raise LcmException("{}. {}".format(step
, e
)) from e
2157 def _write_ns_status(
2161 current_operation
: str,
2162 current_operation_id
: str,
2163 error_description
: str = None,
2164 error_detail
: str = None,
2165 other_update
: dict = None,
2168 Update db_nsr fields.
2171 :param current_operation:
2172 :param current_operation_id:
2173 :param error_description:
2174 :param error_detail:
2175 :param other_update: Other required changes at database if provided, will be cleared
2179 db_dict
= other_update
or {}
2182 ] = current_operation_id
# for backward compatibility
2183 db_dict
["_admin.current-operation"] = current_operation_id
2184 db_dict
["_admin.operation-type"] = (
2185 current_operation
if current_operation
!= "IDLE" else None
2187 db_dict
["currentOperation"] = current_operation
2188 db_dict
["currentOperationID"] = current_operation_id
2189 db_dict
["errorDescription"] = error_description
2190 db_dict
["errorDetail"] = error_detail
2193 db_dict
["nsState"] = ns_state
2194 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2195 except DbException
as e
:
2196 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2198 def _write_op_status(
2202 error_message
: str = None,
2203 queuePosition
: int = 0,
2204 operation_state
: str = None,
2205 other_update
: dict = None,
2208 db_dict
= other_update
or {}
2209 db_dict
["queuePosition"] = queuePosition
2210 if isinstance(stage
, list):
2211 db_dict
["stage"] = stage
[0]
2212 db_dict
["detailed-status"] = " ".join(stage
)
2213 elif stage
is not None:
2214 db_dict
["stage"] = str(stage
)
2216 if error_message
is not None:
2217 db_dict
["errorMessage"] = error_message
2218 if operation_state
is not None:
2219 db_dict
["operationState"] = operation_state
2220 db_dict
["statusEnteredTime"] = time()
2221 self
.update_db_2("nslcmops", op_id
, db_dict
)
2222 except DbException
as e
:
2224 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2227 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2229 nsr_id
= db_nsr
["_id"]
2230 # configurationStatus
2231 config_status
= db_nsr
.get("configurationStatus")
2234 "configurationStatus.{}.status".format(index
): status
2235 for index
, v
in enumerate(config_status
)
2239 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2241 except DbException
as e
:
2243 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2246 def _write_configuration_status(
2251 element_under_configuration
: str = None,
2252 element_type
: str = None,
2253 other_update
: dict = None,
2255 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2256 # .format(vca_index, status))
2259 db_path
= "configurationStatus.{}.".format(vca_index
)
2260 db_dict
= other_update
or {}
2262 db_dict
[db_path
+ "status"] = status
2263 if element_under_configuration
:
2265 db_path
+ "elementUnderConfiguration"
2266 ] = element_under_configuration
2268 db_dict
[db_path
+ "elementType"] = element_type
2269 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2270 except DbException
as e
:
2272 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2273 status
, nsr_id
, vca_index
, e
2277 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2279 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2280 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2281 Database is used because the result can be obtained from a different LCM worker in case of HA.
2282 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2283 :param db_nslcmop: database content of nslcmop
2284 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2285 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2286 computed 'vim-account-id'
2289 nslcmop_id
= db_nslcmop
["_id"]
2290 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2291 if placement_engine
== "PLA":
2293 logging_text
+ "Invoke and wait for placement optimization"
2295 await self
.msg
.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id
})
2296 db_poll_interval
= 5
2297 wait
= db_poll_interval
* 10
2299 while not pla_result
and wait
>= 0:
2300 await asyncio
.sleep(db_poll_interval
)
2301 wait
-= db_poll_interval
2302 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2303 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2307 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2310 for pla_vnf
in pla_result
["vnf"]:
2311 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2312 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2317 {"_id": vnfr
["_id"]},
2318 {"vim-account-id": pla_vnf
["vimAccountId"]},
2321 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2324 def _gather_vnfr_healing_alerts(self
, vnfr
, vnfd
):
2326 nsr_id
= vnfr
["nsr-id-ref"]
2327 df
= vnfd
.get("df", [{}])[0]
2328 # Checking for auto-healing configuration
2329 if "healing-aspect" in df
:
2330 healing_aspects
= df
["healing-aspect"]
2331 for healing
in healing_aspects
:
2332 for healing_policy
in healing
.get("healing-policy", ()):
2333 vdu_id
= healing_policy
["vdu-id"]
2335 (vdur
for vdur
in vnfr
["vdur"] if vdu_id
== vdur
["vdu-id-ref"]),
2340 metric_name
= "vm_status"
2341 vdu_name
= vdur
.get("name")
2342 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2344 name
= f
"healing_{uuid}"
2345 action
= healing_policy
2346 # action_on_recovery = healing.get("action-on-recovery")
2347 # cooldown_time = healing.get("cooldown-time")
2348 # day1 = healing.get("day1")
2352 "metric": metric_name
,
2355 "vnf_member_index": vnf_member_index
,
2356 "vdu_name": vdu_name
,
2358 "alarm_status": "ok",
2359 "action_type": "healing",
2362 alerts
.append(alert
)
2365 def _gather_vnfr_scaling_alerts(self
, vnfr
, vnfd
):
2367 nsr_id
= vnfr
["nsr-id-ref"]
2368 df
= vnfd
.get("df", [{}])[0]
2369 # Checking for auto-scaling configuration
2370 if "scaling-aspect" in df
:
2371 rel_operation_types
= {
2379 scaling_aspects
= df
["scaling-aspect"]
2380 all_vnfd_monitoring_params
= {}
2381 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
2382 for mp
in ivld
.get("monitoring-parameters", ()):
2383 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2384 for vdu
in vnfd
.get("vdu", ()):
2385 for mp
in vdu
.get("monitoring-parameter", ()):
2386 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2387 for df
in vnfd
.get("df", ()):
2388 for mp
in df
.get("monitoring-parameter", ()):
2389 all_vnfd_monitoring_params
[mp
.get("id")] = mp
2390 for scaling_aspect
in scaling_aspects
:
2391 scaling_group_name
= scaling_aspect
.get("name", "")
2392 # Get monitored VDUs
2393 all_monitored_vdus
= set()
2394 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get(
2397 for vdu_delta
in delta
.get("vdu-delta", ()):
2398 all_monitored_vdus
.add(vdu_delta
.get("id"))
2399 monitored_vdurs
= list(
2401 lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
,
2405 if not monitored_vdurs
:
2407 "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
2410 for scaling_policy
in scaling_aspect
.get("scaling-policy", ()):
2411 if scaling_policy
["scaling-type"] != "automatic":
2413 threshold_time
= scaling_policy
.get("threshold-time", "1")
2414 cooldown_time
= scaling_policy
.get("cooldown-time", "0")
2415 for scaling_criteria
in scaling_policy
["scaling-criteria"]:
2416 monitoring_param_ref
= scaling_criteria
.get(
2417 "vnf-monitoring-param-ref"
2419 vnf_monitoring_param
= all_vnfd_monitoring_params
[
2420 monitoring_param_ref
2422 for vdur
in monitored_vdurs
:
2423 vdu_id
= vdur
["vdu-id-ref"]
2424 metric_name
= vnf_monitoring_param
.get("performance-metric")
2425 metric_name
= f
"osm_{metric_name}"
2426 vnf_member_index
= vnfr
["member-vnf-index-ref"]
2427 scalein_threshold
= scaling_criteria
.get(
2428 "scale-in-threshold"
2430 scaleout_threshold
= scaling_criteria
.get(
2431 "scale-out-threshold"
2433 # Looking for min/max-number-of-instances
2434 instances_min_number
= 1
2435 instances_max_number
= 1
2436 vdu_profile
= df
["vdu-profile"]
2439 item
for item
in vdu_profile
if item
["id"] == vdu_id
2441 instances_min_number
= profile
.get(
2442 "min-number-of-instances", 1
2444 instances_max_number
= profile
.get(
2445 "max-number-of-instances", 1
2448 if scalein_threshold
:
2450 name
= f
"scalein_{uuid}"
2451 operation
= scaling_criteria
[
2452 "scale-in-relational-operation"
2454 rel_operator
= rel_operation_types
.get(operation
, "<=")
2455 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2456 expression
= f
"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
2459 "vnf_member_index": vnf_member_index
,
2465 "for": str(threshold_time
) + "m",
2468 action
= scaling_policy
2470 "scaling-group": scaling_group_name
,
2471 "cooldown-time": cooldown_time
,
2476 "metric": metric_name
,
2479 "vnf_member_index": vnf_member_index
,
2482 "alarm_status": "ok",
2483 "action_type": "scale_in",
2485 "prometheus_config": prom_cfg
,
2487 alerts
.append(alert
)
2489 if scaleout_threshold
:
2491 name
= f
"scaleout_{uuid}"
2492 operation
= scaling_criteria
[
2493 "scale-out-relational-operation"
2495 rel_operator
= rel_operation_types
.get(operation
, "<=")
2496 metric_selector
= f
'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
2497 expression
= f
"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
2500 "vnf_member_index": vnf_member_index
,
2506 "for": str(threshold_time
) + "m",
2509 action
= scaling_policy
2511 "scaling-group": scaling_group_name
,
2512 "cooldown-time": cooldown_time
,
2517 "metric": metric_name
,
2520 "vnf_member_index": vnf_member_index
,
2523 "alarm_status": "ok",
2524 "action_type": "scale_out",
2526 "prometheus_config": prom_cfg
,
2528 alerts
.append(alert
)
def update_nsrs_with_pla_result(self, params):
    """Persist the PLA placement result into the corresponding nslcmop.

    :param params: message payload; ``params["placement"]`` holds the PLA
        output, including ``nslcmopId`` identifying the operation record.
    :return: None. Failures are logged and swallowed (best-effort update).
    """
    # Pre-bind so the except clause can always reference it, even when
    # deep_get itself raises before the assignment (otherwise the handler
    # would die with NameError instead of logging the real failure).
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2540 async def instantiate(self
, nsr_id
, nslcmop_id
):
2543 :param nsr_id: ns instance to deploy
2544 :param nslcmop_id: operation to run
2548 # Try to lock HA task here
2549 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2550 if not task_is_locked_by_me
:
2552 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2556 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2557 self
.logger
.debug(logging_text
+ "Enter")
2559 # get all needed from database
2561 # database nsrs record
2564 # database nslcmops record
2567 # update operation on nsrs
2569 # update operation on nslcmops
2570 db_nslcmop_update
= {}
2572 timeout_ns_deploy
= self
.timeout
.ns_deploy
2574 nslcmop_operation_state
= None
2575 db_vnfrs
= {} # vnf's info indexed by member-index
2577 tasks_dict_info
= {} # from task to info text
2581 "Stage 1/5: preparation of the environment.",
2582 "Waiting for previous operations to terminate.",
2585 # ^ stage, step, VIM progress
2587 # wait for any previous tasks in process
2588 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2590 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2591 stage
[1] = "Reading from database."
2592 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2593 db_nsr_update
["detailed-status"] = "creating"
2594 db_nsr_update
["operational-status"] = "init"
2595 self
._write
_ns
_status
(
2597 ns_state
="BUILDING",
2598 current_operation
="INSTANTIATING",
2599 current_operation_id
=nslcmop_id
,
2600 other_update
=db_nsr_update
,
2602 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2604 # read from db: operation
2605 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2606 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2607 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2608 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2609 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2611 ns_params
= db_nslcmop
.get("operationParams")
2612 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2613 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2616 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2617 self
.logger
.debug(logging_text
+ stage
[1])
2618 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2619 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2620 self
.logger
.debug(logging_text
+ stage
[1])
2621 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2622 self
.fs
.sync(db_nsr
["nsd-id"])
2624 # nsr_name = db_nsr["name"] # TODO short-name??
2626 # read from db: vnf's of this ns
2627 stage
[1] = "Getting vnfrs from db."
2628 self
.logger
.debug(logging_text
+ stage
[1])
2629 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2631 # read from db: vnfd's for every vnf
2632 db_vnfds
= [] # every vnfd data
2634 # for each vnf in ns, read vnfd
2635 for vnfr
in db_vnfrs_list
:
2636 if vnfr
.get("kdur"):
2638 for kdur
in vnfr
["kdur"]:
2639 if kdur
.get("additionalParams"):
2640 kdur
["additionalParams"] = json
.loads(
2641 kdur
["additionalParams"]
2643 kdur_list
.append(kdur
)
2644 vnfr
["kdur"] = kdur_list
2646 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2647 vnfd_id
= vnfr
["vnfd-id"]
2648 vnfd_ref
= vnfr
["vnfd-ref"]
2649 self
.fs
.sync(vnfd_id
)
2651 # if we haven't this vnfd, read it from db
2652 if vnfd_id
not in db_vnfds
:
2654 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2657 self
.logger
.debug(logging_text
+ stage
[1])
2658 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2661 db_vnfds
.append(vnfd
)
2663 # Get or generates the _admin.deployed.VCA list
2664 vca_deployed_list
= None
2665 if db_nsr
["_admin"].get("deployed"):
2666 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2667 if vca_deployed_list
is None:
2668 vca_deployed_list
= []
2669 configuration_status_list
= []
2670 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2671 db_nsr_update
["configurationStatus"] = configuration_status_list
2672 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2673 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2674 elif isinstance(vca_deployed_list
, dict):
2675 # maintain backward compatibility. Change a dict to list at database
2676 vca_deployed_list
= list(vca_deployed_list
.values())
2677 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2678 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2681 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2683 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2684 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2686 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2687 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2688 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2690 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2693 # n2vc_redesign STEP 2 Deploy Network Scenario
2694 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2695 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2697 stage
[1] = "Deploying KDUs."
2698 # self.logger.debug(logging_text + "Before deploy_kdus")
2699 # Call to deploy_kdus in case exists the "vdu:kdu" param
2700 await self
.deploy_kdus(
2701 logging_text
=logging_text
,
2703 nslcmop_id
=nslcmop_id
,
2706 task_instantiation_info
=tasks_dict_info
,
2709 stage
[1] = "Getting VCA public key."
2710 # n2vc_redesign STEP 1 Get VCA public ssh-key
2711 # feature 1429. Add n2vc public key to needed VMs
2712 n2vc_key
= self
.n2vc
.get_public_key()
2713 n2vc_key_list
= [n2vc_key
]
2714 if self
.vca_config
.public_key
:
2715 n2vc_key_list
.append(self
.vca_config
.public_key
)
2717 stage
[1] = "Deploying NS at VIM."
2718 task_ro
= asyncio
.ensure_future(
2719 self
.instantiate_RO(
2720 logging_text
=logging_text
,
2724 db_nslcmop
=db_nslcmop
,
2727 n2vc_key_list
=n2vc_key_list
,
2731 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2732 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2734 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2735 stage
[1] = "Deploying Execution Environments."
2736 self
.logger
.debug(logging_text
+ stage
[1])
2738 # create namespace and certificate if any helm based EE is present in the NS
2739 if check_helm_ee_in_ns(db_vnfds
):
2740 # TODO: create EE namespace
2741 # create TLS certificates
2742 await self
.vca_map
["helm-v3"].create_tls_certificate(
2743 secret_name
="ee-tls-{}".format(nsr_id
),
2746 usage
="server auth",
2749 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2750 for vnf_profile
in get_vnf_profiles(nsd
):
2751 vnfd_id
= vnf_profile
["vnfd-id"]
2752 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2753 member_vnf_index
= str(vnf_profile
["id"])
2754 db_vnfr
= db_vnfrs
[member_vnf_index
]
2755 base_folder
= vnfd
["_admin"]["storage"]
2762 # Get additional parameters
2763 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2764 if db_vnfr
.get("additionalParamsForVnf"):
2765 deploy_params
.update(
2766 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2769 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2770 if descriptor_config
:
2772 logging_text
=logging_text
2773 + "member_vnf_index={} ".format(member_vnf_index
),
2776 nslcmop_id
=nslcmop_id
,
2782 member_vnf_index
=member_vnf_index
,
2783 vdu_index
=vdu_index
,
2784 kdu_index
=kdu_index
,
2786 deploy_params
=deploy_params
,
2787 descriptor_config
=descriptor_config
,
2788 base_folder
=base_folder
,
2789 task_instantiation_info
=tasks_dict_info
,
2793 # Deploy charms for each VDU that supports one.
2794 for vdud
in get_vdu_list(vnfd
):
2796 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2797 vdur
= find_in_list(
2798 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2801 if vdur
.get("additionalParams"):
2802 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2804 deploy_params_vdu
= deploy_params
2805 deploy_params_vdu
["OSM"] = get_osm_params(
2806 db_vnfr
, vdu_id
, vdu_count_index
=0
2808 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2810 self
.logger
.debug("VDUD > {}".format(vdud
))
2812 "Descriptor config > {}".format(descriptor_config
)
2814 if descriptor_config
:
2818 for vdu_index
in range(vdud_count
):
2819 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2821 logging_text
=logging_text
2822 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2823 member_vnf_index
, vdu_id
, vdu_index
2827 nslcmop_id
=nslcmop_id
,
2833 kdu_index
=kdu_index
,
2834 member_vnf_index
=member_vnf_index
,
2835 vdu_index
=vdu_index
,
2837 deploy_params
=deploy_params_vdu
,
2838 descriptor_config
=descriptor_config
,
2839 base_folder
=base_folder
,
2840 task_instantiation_info
=tasks_dict_info
,
2843 for kdud
in get_kdu_list(vnfd
):
2844 kdu_name
= kdud
["name"]
2845 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2846 if descriptor_config
:
2850 kdu_index
, kdur
= next(
2852 for x
in enumerate(db_vnfr
["kdur"])
2853 if x
[1]["kdu-name"] == kdu_name
2855 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2856 if kdur
.get("additionalParams"):
2857 deploy_params_kdu
.update(
2858 parse_yaml_strings(kdur
["additionalParams"].copy())
2862 logging_text
=logging_text
,
2865 nslcmop_id
=nslcmop_id
,
2871 member_vnf_index
=member_vnf_index
,
2872 vdu_index
=vdu_index
,
2873 kdu_index
=kdu_index
,
2875 deploy_params
=deploy_params_kdu
,
2876 descriptor_config
=descriptor_config
,
2877 base_folder
=base_folder
,
2878 task_instantiation_info
=tasks_dict_info
,
2882 # Check if each vnf has exporter for metric collection if so update prometheus job records
2883 if "exporters-endpoints" in vnfd
.get("df")[0]:
2884 exporter_config
= vnfd
.get("df")[0].get("exporters-endpoints")
2885 self
.logger
.debug("exporter config :{}".format(exporter_config
))
2886 artifact_path
= "{}/{}/{}".format(
2887 base_folder
["folder"],
2888 base_folder
["pkg-dir"],
2889 "exporter-endpoint",
2892 ee_config_descriptor
= exporter_config
2893 vnfr_id
= db_vnfr
["id"]
2894 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2903 self
.logger
.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip
))
2904 self
.logger
.debug("Artifact_path:{}".format(artifact_path
))
2905 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
2906 vdu_id_for_prom
= None
2907 vdu_index_for_prom
= None
2908 for x
in get_iterable(db_vnfr
, "vdur"):
2909 vdu_id_for_prom
= x
.get("vdu-id-ref")
2910 vdu_index_for_prom
= x
.get("count-index")
2911 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2913 artifact_path
=artifact_path
,
2914 ee_config_descriptor
=ee_config_descriptor
,
2917 target_ip
=rw_mgmt_ip
,
2919 vdu_id
=vdu_id_for_prom
,
2920 vdu_index
=vdu_index_for_prom
,
2923 self
.logger
.debug("Prometheus job:{}".format(prometheus_jobs
))
2925 db_nsr_update
["_admin.deployed.prometheus_jobs"] = prometheus_jobs
2932 for job
in prometheus_jobs
:
2935 {"job_name": job
["job_name"]},
2938 fail_on_empty
=False,
2941 # Check if this NS has a charm configuration
2942 descriptor_config
= nsd
.get("ns-configuration")
2943 if descriptor_config
and descriptor_config
.get("juju"):
2946 member_vnf_index
= None
2953 # Get additional parameters
2954 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2955 if db_nsr
.get("additionalParamsForNs"):
2956 deploy_params
.update(
2957 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2959 base_folder
= nsd
["_admin"]["storage"]
2961 logging_text
=logging_text
,
2964 nslcmop_id
=nslcmop_id
,
2970 member_vnf_index
=member_vnf_index
,
2971 vdu_index
=vdu_index
,
2972 kdu_index
=kdu_index
,
2974 deploy_params
=deploy_params
,
2975 descriptor_config
=descriptor_config
,
2976 base_folder
=base_folder
,
2977 task_instantiation_info
=tasks_dict_info
,
2981 # rest of staff will be done at finally
2984 ROclient
.ROClientException
,
2990 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2993 except asyncio
.CancelledError
:
2995 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2997 exc
= "Operation was cancelled"
2998 except Exception as e
:
2999 exc
= traceback
.format_exc()
3000 self
.logger
.critical(
3001 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
3006 error_list
.append(str(exc
))
3008 # wait for pending tasks
3010 stage
[1] = "Waiting for instantiate pending tasks."
3011 self
.logger
.debug(logging_text
+ stage
[1])
3012 error_list
+= await self
._wait
_for
_tasks
(
3020 stage
[1] = stage
[2] = ""
3021 except asyncio
.CancelledError
:
3022 error_list
.append("Cancelled")
3023 # TODO cancel all tasks
3024 except Exception as exc
:
3025 error_list
.append(str(exc
))
3027 # update operation-status
3028 db_nsr_update
["operational-status"] = "running"
3029 # let's begin with VCA 'configured' status (later we can change it)
3030 db_nsr_update
["config-status"] = "configured"
3031 for task
, task_name
in tasks_dict_info
.items():
3032 if not task
.done() or task
.cancelled() or task
.exception():
3033 if task_name
.startswith(self
.task_name_deploy_vca
):
3034 # A N2VC task is pending
3035 db_nsr_update
["config-status"] = "failed"
3037 # RO or KDU task is pending
3038 db_nsr_update
["operational-status"] = "failed"
3040 # update status at database
3042 error_detail
= ". ".join(error_list
)
3043 self
.logger
.error(logging_text
+ error_detail
)
3044 error_description_nslcmop
= "{} Detail: {}".format(
3045 stage
[0], error_detail
3047 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
3048 nslcmop_id
, stage
[0]
3051 db_nsr_update
["detailed-status"] = (
3052 error_description_nsr
+ " Detail: " + error_detail
3054 db_nslcmop_update
["detailed-status"] = error_detail
3055 nslcmop_operation_state
= "FAILED"
3059 error_description_nsr
= error_description_nslcmop
= None
3061 db_nsr_update
["detailed-status"] = "Done"
3062 db_nslcmop_update
["detailed-status"] = "Done"
3063 nslcmop_operation_state
= "COMPLETED"
3064 # Gather auto-healing and auto-scaling alerts for each vnfr
3067 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3069 (sub
for sub
in db_vnfds
if sub
["_id"] == vnfr
["vnfd-id"]), None
3071 healing_alerts
= self
._gather
_vnfr
_healing
_alerts
(vnfr
, vnfd
)
3072 for alert
in healing_alerts
:
3073 self
.logger
.info(f
"Storing healing alert in MongoDB: {alert}")
3074 self
.db
.create("alerts", alert
)
3076 scaling_alerts
= self
._gather
_vnfr
_scaling
_alerts
(vnfr
, vnfd
)
3077 for alert
in scaling_alerts
:
3078 self
.logger
.info(f
"Storing scaling alert in MongoDB: {alert}")
3079 self
.db
.create("alerts", alert
)
3082 self
._write
_ns
_status
(
3085 current_operation
="IDLE",
3086 current_operation_id
=None,
3087 error_description
=error_description_nsr
,
3088 error_detail
=error_detail
,
3089 other_update
=db_nsr_update
,
3091 self
._write
_op
_status
(
3094 error_message
=error_description_nslcmop
,
3095 operation_state
=nslcmop_operation_state
,
3096 other_update
=db_nslcmop_update
,
3099 if nslcmop_operation_state
:
3101 await self
.msg
.aiowrite(
3106 "nslcmop_id": nslcmop_id
,
3107 "operationState": nslcmop_operation_state
,
3110 except Exception as e
:
3112 logging_text
+ "kafka_write notification Exception {}".format(e
)
3115 self
.logger
.debug(logging_text
+ "Exit")
3116 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
3118 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
3119 if vnfd_id
not in cached_vnfds
:
3120 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
3121 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
3123 return cached_vnfds
[vnfd_id
]
3125 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
3126 if vnf_profile_id
not in cached_vnfrs
:
3127 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
3130 "member-vnf-index-ref": vnf_profile_id
,
3131 "nsr-id-ref": nsr_id
,
3134 return cached_vnfrs
[vnf_profile_id
]
3136 def _is_deployed_vca_in_relation(
3137 self
, vca
: DeployedVCA
, relation
: Relation
3140 for endpoint
in (relation
.provider
, relation
.requirer
):
3141 if endpoint
["kdu-resource-profile-id"]:
3144 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
3145 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
3146 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
3152 def _update_ee_relation_data_with_implicit_data(
3153 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
3155 ee_relation_data
= safe_get_ee_relation(
3156 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
3158 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
3159 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
3160 "execution-environment-ref"
3162 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
3163 vnfd_id
= vnf_profile
["vnfd-id"]
3164 project
= nsd
["_admin"]["projects_read"][0]
3165 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3168 if ee_relation_level
== EELevel
.VNF
3169 else ee_relation_data
["vdu-profile-id"]
3171 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
3174 f
"not execution environments found for ee_relation {ee_relation_data}"
3176 ee_relation_data
["execution-environment-ref"] = ee
["id"]
3177 return ee_relation_data
3179 def _get_ns_relations(
3182 nsd
: Dict
[str, Any
],
3184 cached_vnfds
: Dict
[str, Any
],
3185 ) -> List
[Relation
]:
3187 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
3188 for r
in db_ns_relations
:
3189 provider_dict
= None
3190 requirer_dict
= None
3191 if all(key
in r
for key
in ("provider", "requirer")):
3192 provider_dict
= r
["provider"]
3193 requirer_dict
= r
["requirer"]
3194 elif "entities" in r
:
3195 provider_id
= r
["entities"][0]["id"]
3198 "endpoint": r
["entities"][0]["endpoint"],
3200 if provider_id
!= nsd
["id"]:
3201 provider_dict
["vnf-profile-id"] = provider_id
3202 requirer_id
= r
["entities"][1]["id"]
3205 "endpoint": r
["entities"][1]["endpoint"],
3207 if requirer_id
!= nsd
["id"]:
3208 requirer_dict
["vnf-profile-id"] = requirer_id
3211 "provider/requirer or entities must be included in the relation."
3213 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3214 nsr_id
, nsd
, provider_dict
, cached_vnfds
3216 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3217 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3219 provider
= EERelation(relation_provider
)
3220 requirer
= EERelation(relation_requirer
)
3221 relation
= Relation(r
["name"], provider
, requirer
)
3222 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3224 relations
.append(relation
)
3227 def _get_vnf_relations(
3230 nsd
: Dict
[str, Any
],
3232 cached_vnfds
: Dict
[str, Any
],
3233 ) -> List
[Relation
]:
3235 if vca
.target_element
== "ns":
3236 self
.logger
.debug("VCA is a NS charm, not a VNF.")
3238 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3239 vnf_profile_id
= vnf_profile
["id"]
3240 vnfd_id
= vnf_profile
["vnfd-id"]
3241 project
= nsd
["_admin"]["projects_read"][0]
3242 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3243 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3244 for r
in db_vnf_relations
:
3245 provider_dict
= None
3246 requirer_dict
= None
3247 if all(key
in r
for key
in ("provider", "requirer")):
3248 provider_dict
= r
["provider"]
3249 requirer_dict
= r
["requirer"]
3250 elif "entities" in r
:
3251 provider_id
= r
["entities"][0]["id"]
3254 "vnf-profile-id": vnf_profile_id
,
3255 "endpoint": r
["entities"][0]["endpoint"],
3257 if provider_id
!= vnfd_id
:
3258 provider_dict
["vdu-profile-id"] = provider_id
3259 requirer_id
= r
["entities"][1]["id"]
3262 "vnf-profile-id": vnf_profile_id
,
3263 "endpoint": r
["entities"][1]["endpoint"],
3265 if requirer_id
!= vnfd_id
:
3266 requirer_dict
["vdu-profile-id"] = requirer_id
3269 "provider/requirer or entities must be included in the relation."
3271 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3272 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3274 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3275 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3277 provider
= EERelation(relation_provider
)
3278 requirer
= EERelation(relation_requirer
)
3279 relation
= Relation(r
["name"], provider
, requirer
)
3280 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3282 relations
.append(relation
)
3285 def _get_kdu_resource_data(
3287 ee_relation
: EERelation
,
3288 db_nsr
: Dict
[str, Any
],
3289 cached_vnfds
: Dict
[str, Any
],
3290 ) -> DeployedK8sResource
:
3291 nsd
= get_nsd(db_nsr
)
3292 vnf_profiles
= get_vnf_profiles(nsd
)
3293 vnfd_id
= find_in_list(
3295 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3297 project
= nsd
["_admin"]["projects_read"][0]
3298 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3299 kdu_resource_profile
= get_kdu_resource_profile(
3300 db_vnfd
, ee_relation
.kdu_resource_profile_id
3302 kdu_name
= kdu_resource_profile
["kdu-name"]
3303 deployed_kdu
, _
= get_deployed_kdu(
3304 db_nsr
.get("_admin", ()).get("deployed", ()),
3306 ee_relation
.vnf_profile_id
,
3308 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3311 def _get_deployed_component(
3313 ee_relation
: EERelation
,
3314 db_nsr
: Dict
[str, Any
],
3315 cached_vnfds
: Dict
[str, Any
],
3316 ) -> DeployedComponent
:
3317 nsr_id
= db_nsr
["_id"]
3318 deployed_component
= None
3319 ee_level
= EELevel
.get_level(ee_relation
)
3320 if ee_level
== EELevel
.NS
:
3321 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3323 deployed_component
= DeployedVCA(nsr_id
, vca
)
3324 elif ee_level
== EELevel
.VNF
:
3325 vca
= get_deployed_vca(
3329 "member-vnf-index": ee_relation
.vnf_profile_id
,
3330 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3334 deployed_component
= DeployedVCA(nsr_id
, vca
)
3335 elif ee_level
== EELevel
.VDU
:
3336 vca
= get_deployed_vca(
3339 "vdu_id": ee_relation
.vdu_profile_id
,
3340 "member-vnf-index": ee_relation
.vnf_profile_id
,
3341 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3345 deployed_component
= DeployedVCA(nsr_id
, vca
)
3346 elif ee_level
== EELevel
.KDU
:
3347 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3348 ee_relation
, db_nsr
, cached_vnfds
3350 if kdu_resource_data
:
3351 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3352 return deployed_component
3354 async def _add_relation(
3358 db_nsr
: Dict
[str, Any
],
3359 cached_vnfds
: Dict
[str, Any
],
3360 cached_vnfrs
: Dict
[str, Any
],
3362 deployed_provider
= self
._get
_deployed
_component
(
3363 relation
.provider
, db_nsr
, cached_vnfds
3365 deployed_requirer
= self
._get
_deployed
_component
(
3366 relation
.requirer
, db_nsr
, cached_vnfds
3370 and deployed_requirer
3371 and deployed_provider
.config_sw_installed
3372 and deployed_requirer
.config_sw_installed
3374 provider_db_vnfr
= (
3376 relation
.provider
.nsr_id
,
3377 relation
.provider
.vnf_profile_id
,
3380 if relation
.provider
.vnf_profile_id
3383 requirer_db_vnfr
= (
3385 relation
.requirer
.nsr_id
,
3386 relation
.requirer
.vnf_profile_id
,
3389 if relation
.requirer
.vnf_profile_id
3392 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3393 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3394 provider_relation_endpoint
= RelationEndpoint(
3395 deployed_provider
.ee_id
,
3397 relation
.provider
.endpoint
,
3399 requirer_relation_endpoint
= RelationEndpoint(
3400 deployed_requirer
.ee_id
,
3402 relation
.requirer
.endpoint
,
3405 await self
.vca_map
[vca_type
].add_relation(
3406 provider
=provider_relation_endpoint
,
3407 requirer
=requirer_relation_endpoint
,
3409 except N2VCException
as exception
:
3410 self
.logger
.error(exception
)
3411 raise LcmException(exception
)
3415 async def _add_vca_relations(
3421 timeout
: int = 3600,
3424 # 1. find all relations for this VCA
3425 # 2. wait for other peers related
3429 # STEP 1: find all relations for this VCA
3432 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3433 nsd
= get_nsd(db_nsr
)
3436 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3437 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3442 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3443 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3445 # if no relations, terminate
3447 self
.logger
.debug(logging_text
+ " No relations")
3450 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3457 if now
- start
>= timeout
:
3458 self
.logger
.error(logging_text
+ " : timeout adding relations")
3461 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3462 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3464 # for each relation, find the VCA's related
3465 for relation
in relations
.copy():
3466 added
= await self
._add
_relation
(
3474 relations
.remove(relation
)
3477 self
.logger
.debug("Relations added")
3479 await asyncio
.sleep(5.0)
3483 except Exception as e
:
3484 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3487 async def _install_kdu(
3495 k8s_instance_info
: dict,
3496 k8params
: dict = None,
3501 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3504 "collection": "nsrs",
3505 "filter": {"_id": nsr_id
},
3506 "path": nsr_db_path
,
3509 if k8s_instance_info
.get("kdu-deployment-name"):
3510 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3512 kdu_instance
= self
.k8scluster_map
[
3514 ].generate_kdu_instance_name(
3515 db_dict
=db_dict_install
,
3516 kdu_model
=k8s_instance_info
["kdu-model"],
3517 kdu_name
=k8s_instance_info
["kdu-name"],
3520 # Update the nsrs table with the kdu-instance value
3524 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3527 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3528 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3529 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3530 # namespace, this first verification could be removed, and the next step would be done for any kind
3532 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3533 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3534 if k8sclustertype
in ("juju", "juju-bundle"):
3535 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3536 # that the user passed a namespace which he wants its KDU to be deployed in)
3542 "_admin.projects_write": k8s_instance_info
["namespace"],
3543 "_admin.projects_read": k8s_instance_info
["namespace"],
3549 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3554 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3556 k8s_instance_info
["namespace"] = kdu_instance
3558 await self
.k8scluster_map
[k8sclustertype
].install(
3559 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3560 kdu_model
=k8s_instance_info
["kdu-model"],
3563 db_dict
=db_dict_install
,
3565 kdu_name
=k8s_instance_info
["kdu-name"],
3566 namespace
=k8s_instance_info
["namespace"],
3567 kdu_instance
=kdu_instance
,
3571 # Obtain services to obtain management service ip
3572 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3573 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3574 kdu_instance
=kdu_instance
,
3575 namespace
=k8s_instance_info
["namespace"],
3578 # Obtain management service info (if exists)
3579 vnfr_update_dict
= {}
3580 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3582 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3587 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3590 for service
in kdud
.get("service", [])
3591 if service
.get("mgmt-service")
3593 for mgmt_service
in mgmt_services
:
3594 for service
in services
:
3595 if service
["name"].startswith(mgmt_service
["name"]):
3596 # Mgmt service found, Obtain service ip
3597 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3598 if isinstance(ip
, list) and len(ip
) == 1:
3602 "kdur.{}.ip-address".format(kdu_index
)
3605 # Check if must update also mgmt ip at the vnf
3606 service_external_cp
= mgmt_service
.get(
3607 "external-connection-point-ref"
3609 if service_external_cp
:
3611 deep_get(vnfd
, ("mgmt-interface", "cp"))
3612 == service_external_cp
3614 vnfr_update_dict
["ip-address"] = ip
3619 "external-connection-point-ref", ""
3621 == service_external_cp
,
3624 "kdur.{}.ip-address".format(kdu_index
)
3629 "Mgmt service name: {} not found".format(
3630 mgmt_service
["name"]
3634 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3635 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3637 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3640 and kdu_config
.get("initial-config-primitive")
3641 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3643 initial_config_primitive_list
= kdu_config
.get(
3644 "initial-config-primitive"
3646 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3648 for initial_config_primitive
in initial_config_primitive_list
:
3649 primitive_params_
= self
._map
_primitive
_params
(
3650 initial_config_primitive
, {}, {}
3653 await asyncio
.wait_for(
3654 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3655 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3656 kdu_instance
=kdu_instance
,
3657 primitive_name
=initial_config_primitive
["name"],
3658 params
=primitive_params_
,
3659 db_dict
=db_dict_install
,
3665 except Exception as e
:
3666 # Prepare update db with error and raise exception
3669 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3673 vnfr_data
.get("_id"),
3674 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3677 # ignore to keep original exception
3679 # reraise original error
3684 async def deploy_kdus(
3691 task_instantiation_info
,
3693 # Launch kdus if present in the descriptor
3695 k8scluster_id_2_uuic
= {
3696 "helm-chart-v3": {},
3701 async def _get_cluster_id(cluster_id
, cluster_type
):
3702 nonlocal k8scluster_id_2_uuic
3703 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3704 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3706 # check if K8scluster is creating and wait look if previous tasks in process
3707 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3708 "k8scluster", cluster_id
3711 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3712 task_name
, cluster_id
3714 self
.logger
.debug(logging_text
+ text
)
3715 await asyncio
.wait(task_dependency
, timeout
=3600)
3717 db_k8scluster
= self
.db
.get_one(
3718 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3720 if not db_k8scluster
:
3721 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3723 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3725 if cluster_type
== "helm-chart-v3":
3727 # backward compatibility for existing clusters that have not been initialized for helm v3
3728 k8s_credentials
= yaml
.safe_dump(
3729 db_k8scluster
.get("credentials")
3731 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3732 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3734 db_k8scluster_update
= {}
3735 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3736 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3737 db_k8scluster_update
[
3738 "_admin.helm-chart-v3.created"
3740 db_k8scluster_update
[
3741 "_admin.helm-chart-v3.operationalState"
3744 "k8sclusters", cluster_id
, db_k8scluster_update
3746 except Exception as e
:
3749 + "error initializing helm-v3 cluster: {}".format(str(e
))
3752 "K8s cluster '{}' has not been initialized for '{}'".format(
3753 cluster_id
, cluster_type
3758 "K8s cluster '{}' has not been initialized for '{}'".format(
3759 cluster_id
, cluster_type
3762 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3765 logging_text
+= "Deploy kdus: "
3768 db_nsr_update
= {"_admin.deployed.K8s": []}
3769 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3772 updated_cluster_list
= []
3773 updated_v3_cluster_list
= []
3775 for vnfr_data
in db_vnfrs
.values():
3776 vca_id
= self
.get_vca_id(vnfr_data
, {})
3777 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3778 # Step 0: Prepare and set parameters
3779 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3780 vnfd_id
= vnfr_data
.get("vnfd-id")
3781 vnfd_with_id
= find_in_list(
3782 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3786 for kdud
in vnfd_with_id
["kdu"]
3787 if kdud
["name"] == kdur
["kdu-name"]
3789 namespace
= kdur
.get("k8s-namespace")
3790 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3791 if kdur
.get("helm-chart"):
3792 kdumodel
= kdur
["helm-chart"]
3793 # Default version: helm3, if helm-version is v2 assign v2
3794 k8sclustertype
= "helm-chart-v3"
3795 self
.logger
.debug("kdur: {}".format(kdur
))
3797 kdur
.get("helm-version")
3798 and kdur
.get("helm-version") == "v2"
3800 k8sclustertype
= "helm-chart"
3801 elif kdur
.get("juju-bundle"):
3802 kdumodel
= kdur
["juju-bundle"]
3803 k8sclustertype
= "juju-bundle"
3806 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3807 "juju-bundle. Maybe an old NBI version is running".format(
3808 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3811 # check if kdumodel is a file and exists
3813 vnfd_with_id
= find_in_list(
3814 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3816 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3817 if storage
: # may be not present if vnfd has not artifacts
3818 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3819 if storage
["pkg-dir"]:
3820 filename
= "{}/{}/{}s/{}".format(
3827 filename
= "{}/Scripts/{}s/{}".format(
3832 if self
.fs
.file_exists(
3833 filename
, mode
="file"
3834 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3835 kdumodel
= self
.fs
.path
+ filename
3836 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3838 except Exception: # it is not a file
3841 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3842 step
= "Synchronize repos for k8s cluster '{}'".format(
3845 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3849 k8sclustertype
== "helm-chart"
3850 and cluster_uuid
not in updated_cluster_list
3852 k8sclustertype
== "helm-chart-v3"
3853 and cluster_uuid
not in updated_v3_cluster_list
3855 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3856 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3857 cluster_uuid
=cluster_uuid
3860 if del_repo_list
or added_repo_dict
:
3861 if k8sclustertype
== "helm-chart":
3863 "_admin.helm_charts_added." + item
: None
3864 for item
in del_repo_list
3867 "_admin.helm_charts_added." + item
: name
3868 for item
, name
in added_repo_dict
.items()
3870 updated_cluster_list
.append(cluster_uuid
)
3871 elif k8sclustertype
== "helm-chart-v3":
3873 "_admin.helm_charts_v3_added." + item
: None
3874 for item
in del_repo_list
3877 "_admin.helm_charts_v3_added." + item
: name
3878 for item
, name
in added_repo_dict
.items()
3880 updated_v3_cluster_list
.append(cluster_uuid
)
3882 logging_text
+ "repos synchronized on k8s cluster "
3883 "'{}' to_delete: {}, to_add: {}".format(
3884 k8s_cluster_id
, del_repo_list
, added_repo_dict
3889 {"_id": k8s_cluster_id
},
3895 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3896 vnfr_data
["member-vnf-index-ref"],
3900 k8s_instance_info
= {
3901 "kdu-instance": None,
3902 "k8scluster-uuid": cluster_uuid
,
3903 "k8scluster-type": k8sclustertype
,
3904 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3905 "kdu-name": kdur
["kdu-name"],
3906 "kdu-model": kdumodel
,
3907 "namespace": namespace
,
3908 "kdu-deployment-name": kdu_deployment_name
,
3910 db_path
= "_admin.deployed.K8s.{}".format(index
)
3911 db_nsr_update
[db_path
] = k8s_instance_info
3912 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3913 vnfd_with_id
= find_in_list(
3914 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3916 task
= asyncio
.ensure_future(
3925 k8params
=desc_params
,
3930 self
.lcm_tasks
.register(
3934 "instantiate_KDU-{}".format(index
),
3937 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3943 except (LcmException
, asyncio
.CancelledError
):
3945 except Exception as e
:
3946 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3947 if isinstance(e
, (N2VCException
, DbException
)):
3948 self
.logger
.error(logging_text
+ msg
)
3950 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3951 raise LcmException(msg
)
3954 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3974 task_instantiation_info
,
3977 # launch instantiate_N2VC in a asyncio task and register task object
3978 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3979 # if not found, create one entry and update database
3980 # fill db_nsr._admin.deployed.VCA.<index>
3983 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3987 get_charm_name
= False
3988 if "execution-environment-list" in descriptor_config
:
3989 ee_list
= descriptor_config
.get("execution-environment-list", [])
3990 elif "juju" in descriptor_config
:
3991 ee_list
= [descriptor_config
] # ns charms
3992 if "execution-environment-list" not in descriptor_config
:
3993 # charm name is only required for ns charms
3994 get_charm_name
= True
3995 else: # other types as script are not supported
3998 for ee_item
in ee_list
:
4001 + "_deploy_n2vc ee_item juju={}, helm={}".format(
4002 ee_item
.get("juju"), ee_item
.get("helm-chart")
4005 ee_descriptor_id
= ee_item
.get("id")
4006 if ee_item
.get("juju"):
4007 vca_name
= ee_item
["juju"].get("charm")
4009 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
4012 if ee_item
["juju"].get("charm") is not None
4015 if ee_item
["juju"].get("cloud") == "k8s":
4016 vca_type
= "k8s_proxy_charm"
4017 elif ee_item
["juju"].get("proxy") is False:
4018 vca_type
= "native_charm"
4019 elif ee_item
.get("helm-chart"):
4020 vca_name
= ee_item
["helm-chart"]
4021 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
4024 vca_type
= "helm-v3"
4027 logging_text
+ "skipping non juju neither charm configuration"
4032 for vca_index
, vca_deployed
in enumerate(
4033 db_nsr
["_admin"]["deployed"]["VCA"]
4035 if not vca_deployed
:
4038 vca_deployed
.get("member-vnf-index") == member_vnf_index
4039 and vca_deployed
.get("vdu_id") == vdu_id
4040 and vca_deployed
.get("kdu_name") == kdu_name
4041 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
4042 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
4046 # not found, create one.
4048 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
4051 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
4053 target
+= "/kdu/{}".format(kdu_name
)
4055 "target_element": target
,
4056 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
4057 "member-vnf-index": member_vnf_index
,
4059 "kdu_name": kdu_name
,
4060 "vdu_count_index": vdu_index
,
4061 "operational-status": "init", # TODO revise
4062 "detailed-status": "", # TODO revise
4063 "step": "initial-deploy", # TODO revise
4065 "vdu_name": vdu_name
,
4067 "ee_descriptor_id": ee_descriptor_id
,
4068 "charm_name": charm_name
,
4072 # create VCA and configurationStatus in db
4074 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
4075 "configurationStatus.{}".format(vca_index
): dict(),
4077 self
.update_db_2("nsrs", nsr_id
, db_dict
)
4079 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
4081 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
4082 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
4083 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
4086 task_n2vc
= asyncio
.ensure_future(
4087 self
.instantiate_N2VC(
4088 logging_text
=logging_text
,
4089 vca_index
=vca_index
,
4095 vdu_index
=vdu_index
,
4096 kdu_index
=kdu_index
,
4097 deploy_params
=deploy_params
,
4098 config_descriptor
=descriptor_config
,
4099 base_folder
=base_folder
,
4100 nslcmop_id
=nslcmop_id
,
4104 ee_config_descriptor
=ee_item
,
4107 self
.lcm_tasks
.register(
4111 "instantiate_N2VC-{}".format(vca_index
),
4114 task_instantiation_info
[
4116 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
4117 member_vnf_index
or "", vdu_id
or ""
4121 def _create_nslcmop(nsr_id
, operation
, params
):
4123 Creates a ns-lcm-opp content to be stored at database.
4124 :param nsr_id: internal id of the instance
4125 :param operation: instantiate, terminate, scale, action, ...
4126 :param params: user parameters for the operation
4127 :return: dictionary following SOL005 format
4129 # Raise exception if invalid arguments
4130 if not (nsr_id
and operation
and params
):
4132 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
4139 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
4140 "operationState": "PROCESSING",
4141 "statusEnteredTime": now
,
4142 "nsInstanceId": nsr_id
,
4143 "lcmOperationType": operation
,
4145 "isAutomaticInvocation": False,
4146 "operationParams": params
,
4147 "isCancelPending": False,
4149 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
4150 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
4155 def _format_additional_params(self
, params
):
4156 params
= params
or {}
4157 for key
, value
in params
.items():
4158 if str(value
).startswith("!!yaml "):
4159 params
[key
] = yaml
.safe_load(value
[7:])
4162 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
4163 primitive
= seq
.get("name")
4164 primitive_params
= {}
4166 "member_vnf_index": vnf_index
,
4167 "primitive": primitive
,
4168 "primitive_params": primitive_params
,
4171 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
4175 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
4176 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
4177 if op
.get("operationState") == "COMPLETED":
4178 # b. Skip sub-operation
4179 # _ns_execute_primitive() or RO.create_action() will NOT be executed
4180 return self
.SUBOPERATION_STATUS_SKIP
4182 # c. retry executing sub-operation
4183 # The sub-operation exists, and operationState != 'COMPLETED'
4184 # Update operationState = 'PROCESSING' to indicate a retry.
4185 operationState
= "PROCESSING"
4186 detailed_status
= "In progress"
4187 self
._update
_suboperation
_status
(
4188 db_nslcmop
, op_index
, operationState
, detailed_status
4190 # Return the sub-operation index
4191 # _ns_execute_primitive() or RO.create_action() will be called from scale()
4192 # with arguments extracted from the sub-operation
4195 # Find a sub-operation where all keys in a matching dictionary must match
4196 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4197 def _find_suboperation(self
, db_nslcmop
, match
):
4198 if db_nslcmop
and match
:
4199 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4200 for i
, op
in enumerate(op_list
):
4201 if all(op
.get(k
) == match
[k
] for k
in match
):
4203 return self
.SUBOPERATION_STATUS_NOT_FOUND
4205 # Update status for a sub-operation given its index
4206 def _update_suboperation_status(
4207 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4209 # Update DB for HA tasks
4210 q_filter
= {"_id": db_nslcmop
["_id"]}
4212 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4213 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4216 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4219 # Add sub-operation, return the index of the added sub-operation
4220 # Optionally, set operationState, detailed-status, and operationType
4221 # Status and type are currently set for 'scale' sub-operations:
4222 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4223 # 'detailed-status' : status message
4224 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4225 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4226 def _add_suboperation(
4234 mapped_primitive_params
,
4235 operationState
=None,
4236 detailed_status
=None,
4239 RO_scaling_info
=None,
4242 return self
.SUBOPERATION_STATUS_NOT_FOUND
4243 # Get the "_admin.operations" list, if it exists
4244 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4245 op_list
= db_nslcmop_admin
.get("operations")
4246 # Create or append to the "_admin.operations" list
4248 "member_vnf_index": vnf_index
,
4250 "vdu_count_index": vdu_count_index
,
4251 "primitive": primitive
,
4252 "primitive_params": mapped_primitive_params
,
4255 new_op
["operationState"] = operationState
4257 new_op
["detailed-status"] = detailed_status
4259 new_op
["lcmOperationType"] = operationType
4261 new_op
["RO_nsr_id"] = RO_nsr_id
4263 new_op
["RO_scaling_info"] = RO_scaling_info
4265 # No existing operations, create key 'operations' with current operation as first list element
4266 db_nslcmop_admin
.update({"operations": [new_op
]})
4267 op_list
= db_nslcmop_admin
.get("operations")
4269 # Existing operations, append operation to list
4270 op_list
.append(new_op
)
4272 db_nslcmop_update
= {"_admin.operations": op_list
}
4273 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4274 op_index
= len(op_list
) - 1
4277 # Helper methods for scale() sub-operations
4279 # pre-scale/post-scale:
4280 # Check for 3 different cases:
4281 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4282 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4283 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4284 def _check_or_add_scale_suboperation(
4288 vnf_config_primitive
,
4292 RO_scaling_info
=None,
4294 # Find this sub-operation
4295 if RO_nsr_id
and RO_scaling_info
:
4296 operationType
= "SCALE-RO"
4298 "member_vnf_index": vnf_index
,
4299 "RO_nsr_id": RO_nsr_id
,
4300 "RO_scaling_info": RO_scaling_info
,
4304 "member_vnf_index": vnf_index
,
4305 "primitive": vnf_config_primitive
,
4306 "primitive_params": primitive_params
,
4307 "lcmOperationType": operationType
,
4309 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4310 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4311 # a. New sub-operation
4312 # The sub-operation does not exist, add it.
4313 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4314 # The following parameters are set to None for all kind of scaling:
4316 vdu_count_index
= None
4318 if RO_nsr_id
and RO_scaling_info
:
4319 vnf_config_primitive
= None
4320 primitive_params
= None
4323 RO_scaling_info
= None
4324 # Initial status for sub-operation
4325 operationState
= "PROCESSING"
4326 detailed_status
= "In progress"
4327 # Add sub-operation for pre/post-scaling (zero or more operations)
4328 self
._add
_suboperation
(
4334 vnf_config_primitive
,
4342 return self
.SUBOPERATION_STATUS_NEW
4344 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4345 # or op_index (operationState != 'COMPLETED')
4346 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4348 # Function to return execution_environment id
4350 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4351 # TODO vdu_index_count
4352 for vca
in vca_deployed_list
:
4353 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4354 return vca
.get("ee_id")
4356 async def destroy_N2VC(
4364 exec_primitives
=True,
4369 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4370 :param logging_text:
4372 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4373 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4374 :param vca_index: index in the database _admin.deployed.VCA
4375 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4376 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4377 not executed properly
4378 :param scaling_in: True destroys the application, False destroys the model
4379 :return: None or exception
4384 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4385 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4389 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4391 # execute terminate_primitives
4393 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4394 config_descriptor
.get("terminate-config-primitive"),
4395 vca_deployed
.get("ee_descriptor_id"),
4397 vdu_id
= vca_deployed
.get("vdu_id")
4398 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4399 vdu_name
= vca_deployed
.get("vdu_name")
4400 vnf_index
= vca_deployed
.get("member-vnf-index")
4401 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4402 for seq
in terminate_primitives
:
4403 # For each sequence in list, get primitive and call _ns_execute_primitive()
4404 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4405 vnf_index
, seq
.get("name")
4407 self
.logger
.debug(logging_text
+ step
)
4408 # Create the primitive for each sequence, i.e. "primitive": "touch"
4409 primitive
= seq
.get("name")
4410 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4415 self
._add
_suboperation
(
4422 mapped_primitive_params
,
4424 # Sub-operations: Call _ns_execute_primitive() instead of action()
4426 result
, result_detail
= await self
._ns
_execute
_primitive
(
4427 vca_deployed
["ee_id"],
4429 mapped_primitive_params
,
4433 except LcmException
:
4434 # this happens when VCA is not deployed. In this case it is not needed to terminate
4436 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4437 if result
not in result_ok
:
4439 "terminate_primitive {} for vnf_member_index={} fails with "
4440 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4442 # set that this VCA do not need terminated
4443 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4447 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4450 # Delete Prometheus Jobs if any
4451 # This uses NSR_ID, so it will destroy any jobs under this index
4452 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4455 await self
.vca_map
[vca_type
].delete_execution_environment(
4456 vca_deployed
["ee_id"],
4457 scaling_in
=scaling_in
,
4462 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4463 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4464 namespace
= "." + db_nsr
["_id"]
4466 await self
.n2vc
.delete_namespace(
4467 namespace
=namespace
,
4468 total_timeout
=self
.timeout
.charm_delete
,
4471 except N2VCNotFound
: # already deleted. Skip
4473 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4475 async def terminate(self
, nsr_id
, nslcmop_id
):
4476 # Try to lock HA task here
4477 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4478 if not task_is_locked_by_me
:
4481 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4482 self
.logger
.debug(logging_text
+ "Enter")
4483 timeout_ns_terminate
= self
.timeout
.ns_terminate
4486 operation_params
= None
4488 error_list
= [] # annotates all failed error messages
4489 db_nslcmop_update
= {}
4490 autoremove
= False # autoremove after terminated
4491 tasks_dict_info
= {}
4494 "Stage 1/3: Preparing task.",
4495 "Waiting for previous operations to terminate.",
4498 # ^ contains [stage, step, VIM-status]
4500 # wait for any previous tasks in process
4501 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4503 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4504 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4505 operation_params
= db_nslcmop
.get("operationParams") or {}
4506 if operation_params
.get("timeout_ns_terminate"):
4507 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4508 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4509 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4511 db_nsr_update
["operational-status"] = "terminating"
4512 db_nsr_update
["config-status"] = "terminating"
4513 self
._write
_ns
_status
(
4515 ns_state
="TERMINATING",
4516 current_operation
="TERMINATING",
4517 current_operation_id
=nslcmop_id
,
4518 other_update
=db_nsr_update
,
4520 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4521 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4522 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4525 stage
[1] = "Getting vnf descriptors from db."
4526 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4528 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4530 db_vnfds_from_id
= {}
4531 db_vnfds_from_member_index
= {}
4533 for vnfr
in db_vnfrs_list
:
4534 vnfd_id
= vnfr
["vnfd-id"]
4535 if vnfd_id
not in db_vnfds_from_id
:
4536 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4537 db_vnfds_from_id
[vnfd_id
] = vnfd
4538 db_vnfds_from_member_index
[
4539 vnfr
["member-vnf-index-ref"]
4540 ] = db_vnfds_from_id
[vnfd_id
]
4542 # Destroy individual execution environments when there are terminating primitives.
4543 # Rest of EE will be deleted at once
4544 # TODO - check before calling _destroy_N2VC
4545 # if not operation_params.get("skip_terminate_primitives"):#
4546 # or not vca.get("needed_terminate"):
4547 stage
[0] = "Stage 2/3 execute terminating primitives."
4548 self
.logger
.debug(logging_text
+ stage
[0])
4549 stage
[1] = "Looking execution environment that needs terminate."
4550 self
.logger
.debug(logging_text
+ stage
[1])
4552 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4553 config_descriptor
= None
4554 vca_member_vnf_index
= vca
.get("member-vnf-index")
4555 vca_id
= self
.get_vca_id(
4556 db_vnfrs_dict
.get(vca_member_vnf_index
)
4557 if vca_member_vnf_index
4561 if not vca
or not vca
.get("ee_id"):
4563 if not vca
.get("member-vnf-index"):
4565 config_descriptor
= db_nsr
.get("ns-configuration")
4566 elif vca
.get("vdu_id"):
4567 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4568 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4569 elif vca
.get("kdu_name"):
4570 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4571 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4573 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4574 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4575 vca_type
= vca
.get("type")
4576 exec_terminate_primitives
= not operation_params
.get(
4577 "skip_terminate_primitives"
4578 ) and vca
.get("needed_terminate")
4579 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4580 # pending native charms
4582 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4584 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4585 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4586 task
= asyncio
.ensure_future(
4594 exec_terminate_primitives
,
4598 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4600 # wait for pending tasks of terminate primitives
4604 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4606 error_list
= await self
._wait
_for
_tasks
(
4609 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4613 tasks_dict_info
.clear()
4615 return # raise LcmException("; ".join(error_list))
4617 # remove All execution environments at once
4618 stage
[0] = "Stage 3/3 delete all."
4620 if nsr_deployed
.get("VCA"):
4621 stage
[1] = "Deleting all execution environments."
4622 self
.logger
.debug(logging_text
+ stage
[1])
4623 vca_id
= self
.get_vca_id({}, db_nsr
)
4624 task_delete_ee
= asyncio
.ensure_future(
4626 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4627 timeout
=self
.timeout
.charm_delete
,
4630 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4631 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4633 # Delete Namespace and Certificates if necessary
4634 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4635 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4636 certificate_name
=db_nslcmop
["nsInstanceId"],
4638 # TODO: Delete namespace
4640 # Delete from k8scluster
4641 stage
[1] = "Deleting KDUs."
4642 self
.logger
.debug(logging_text
+ stage
[1])
4643 # print(nsr_deployed)
4644 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4645 if not kdu
or not kdu
.get("kdu-instance"):
4647 kdu_instance
= kdu
.get("kdu-instance")
4648 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4649 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4650 vca_id
= self
.get_vca_id({}, db_nsr
)
4651 task_delete_kdu_instance
= asyncio
.ensure_future(
4652 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4653 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4654 kdu_instance
=kdu_instance
,
4656 namespace
=kdu
.get("namespace"),
4662 + "Unknown k8s deployment type {}".format(
4663 kdu
.get("k8scluster-type")
4668 task_delete_kdu_instance
4669 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4672 stage
[1] = "Deleting ns from VIM."
4673 if self
.ro_config
.ng
:
4674 task_delete_ro
= asyncio
.ensure_future(
4675 self
._terminate
_ng
_ro
(
4676 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4679 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4681 # rest of staff will be done at finally
4684 ROclient
.ROClientException
,
4689 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4691 except asyncio
.CancelledError
:
4693 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4695 exc
= "Operation was cancelled"
4696 except Exception as e
:
4697 exc
= traceback
.format_exc()
4698 self
.logger
.critical(
4699 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4704 error_list
.append(str(exc
))
4706 # wait for pending tasks
4708 stage
[1] = "Waiting for terminate pending tasks."
4709 self
.logger
.debug(logging_text
+ stage
[1])
4710 error_list
+= await self
._wait
_for
_tasks
(
4713 timeout_ns_terminate
,
4717 stage
[1] = stage
[2] = ""
4718 except asyncio
.CancelledError
:
4719 error_list
.append("Cancelled")
4720 # TODO cancell all tasks
4721 except Exception as exc
:
4722 error_list
.append(str(exc
))
4723 # update status at database
4725 error_detail
= "; ".join(error_list
)
4726 # self.logger.error(logging_text + error_detail)
4727 error_description_nslcmop
= "{} Detail: {}".format(
4728 stage
[0], error_detail
4730 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4731 nslcmop_id
, stage
[0]
4734 db_nsr_update
["operational-status"] = "failed"
4735 db_nsr_update
["detailed-status"] = (
4736 error_description_nsr
+ " Detail: " + error_detail
4738 db_nslcmop_update
["detailed-status"] = error_detail
4739 nslcmop_operation_state
= "FAILED"
4743 error_description_nsr
= error_description_nslcmop
= None
4744 ns_state
= "NOT_INSTANTIATED"
4745 db_nsr_update
["operational-status"] = "terminated"
4746 db_nsr_update
["detailed-status"] = "Done"
4747 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4748 db_nslcmop_update
["detailed-status"] = "Done"
4749 nslcmop_operation_state
= "COMPLETED"
4752 self
._write
_ns
_status
(
4755 current_operation
="IDLE",
4756 current_operation_id
=None,
4757 error_description
=error_description_nsr
,
4758 error_detail
=error_detail
,
4759 other_update
=db_nsr_update
,
4761 self
._write
_op
_status
(
4764 error_message
=error_description_nslcmop
,
4765 operation_state
=nslcmop_operation_state
,
4766 other_update
=db_nslcmop_update
,
4768 if ns_state
== "NOT_INSTANTIATED":
4772 {"nsr-id-ref": nsr_id
},
4773 {"_admin.nsState": "NOT_INSTANTIATED"},
4775 except DbException
as e
:
4778 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4782 if operation_params
:
4783 autoremove
= operation_params
.get("autoremove", False)
4784 if nslcmop_operation_state
:
4786 await self
.msg
.aiowrite(
4791 "nslcmop_id": nslcmop_id
,
4792 "operationState": nslcmop_operation_state
,
4793 "autoremove": autoremove
,
4796 except Exception as e
:
4798 logging_text
+ "kafka_write notification Exception {}".format(e
)
4800 self
.logger
.debug(f
"Deleting alerts: ns_id={nsr_id}")
4801 self
.db
.del_list("alerts", {"tags.ns_id": nsr_id
})
4803 self
.logger
.debug(logging_text
+ "Exit")
4804 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """Wait for a set of asyncio tasks to finish, collecting error details.

    :param logging_text: prefix for every log line
    :param created_tasks_info: dict mapping task -> human-readable description
    :param timeout: overall timeout in seconds for the whole group
    :param stage: 3-element list; stage[1] is updated with "done/total" progress
    :param nslcmop_id: operation id used to persist progress via _write_op_status
    :param nsr_id: if given, errors are also written to the nsrs record
    :return: list of error-detail strings (empty when all tasks succeeded)
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        # remaining budget shrinks as wall-clock time passes
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
                self.logger.error(logging_text + new_error)
            break
        for task in done:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        N2VCException,
                        ROclient.ROClientException,
                        LcmException,
                        K8sException,
                        NgRoException,
                    ),
                ):
                    # expected/controlled failure: message only
                    self.logger.error(logging_text + new_error)
                else:
                    # unexpected exception: dump full traceback
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text
                        + created_tasks_info[task]
                        + " "
                        + exc_traceback
                    )
            else:
                self.logger.debug(
                    logging_text + created_tasks_info[task] + ": Done"
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if error_list:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
4884 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4886 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4887 The default-value is used. If it is between < > it look for a value at instantiation_params
4888 :param primitive_desc: portion of VNFD/NSD that describes primitive
4889 :param params: Params provided by user
4890 :param instantiation_params: Instantiation params provided by user
4891 :return: a dictionary with the calculated params
4893 calculated_params
= {}
4894 for parameter
in primitive_desc
.get("parameter", ()):
4895 param_name
= parameter
["name"]
4896 if param_name
in params
:
4897 calculated_params
[param_name
] = params
[param_name
]
4898 elif "default-value" in parameter
or "value" in parameter
:
4899 if "value" in parameter
:
4900 calculated_params
[param_name
] = parameter
["value"]
4902 calculated_params
[param_name
] = parameter
["default-value"]
4904 isinstance(calculated_params
[param_name
], str)
4905 and calculated_params
[param_name
].startswith("<")
4906 and calculated_params
[param_name
].endswith(">")
4908 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4909 calculated_params
[param_name
] = instantiation_params
[
4910 calculated_params
[param_name
][1:-1]
4914 "Parameter {} needed to execute primitive {} not provided".format(
4915 calculated_params
[param_name
], primitive_desc
["name"]
4920 "Parameter {} needed to execute primitive {} not provided".format(
4921 param_name
, primitive_desc
["name"]
4925 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4926 calculated_params
[param_name
] = yaml
.safe_dump(
4927 calculated_params
[param_name
], default_flow_style
=True, width
=256
4929 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4931 ].startswith("!!yaml "):
4932 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4933 if parameter
.get("data-type") == "INTEGER":
4935 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4936 except ValueError: # error converting string to int
4938 "Parameter {} of primitive {} must be integer".format(
4939 param_name
, primitive_desc
["name"]
4942 elif parameter
.get("data-type") == "BOOLEAN":
4943 calculated_params
[param_name
] = not (
4944 (str(calculated_params
[param_name
])).lower() == "false"
4947 # add always ns_config_info if primitive name is config
4948 if primitive_desc
["name"] == "config":
4949 if "ns_config_info" in instantiation_params
:
4950 calculated_params
["ns_config_info"] = instantiation_params
[
4953 return calculated_params
4955 def _look_for_deployed_vca(
4962 ee_descriptor_id
=None,
4964 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4965 for vca
in deployed_vca
:
4968 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4971 vdu_count_index
is not None
4972 and vdu_count_index
!= vca
["vdu_count_index"]
4975 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4977 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4981 # vca_deployed not found
4983 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4984 " is not deployed".format(
4993 ee_id
= vca
.get("ee_id")
4995 "type", "lxc_proxy_charm"
4996 ) # default value for backward compatibility - proxy charm
4999 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
5000 "execution environment".format(
5001 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5004 return ee_id
, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id=None,
) -> (str, str):
    """Execute a charm primitive through the VCA connector, with retries.

    :param ee_id: execution environment id
    :param primitive: primitive name; "config" wraps params as {"params": ...}
    :param primitive_params: params already mapped for the charm
    :param retries: number of additional attempts on failure
    :param retries_interval: seconds to sleep between attempts
    :param timeout: per-attempt timeout; defaults to self.timeout.primitive
    :param vca_type: connector key; defaults to "lxc_proxy_charm"
    :return: tuple ("COMPLETED"|"FAILED"|"FAIL", details)
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout.progress_primitive,
                        total_timeout=self.timeout.primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout.primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:
                retries -= 1
                if retries >= 0:
                    self.logger.debug(
                        "Error executing action {} on {} -> {}".format(
                            primitive, ee_id, e
                        )
                    )
                    # wait and retry
                    await asyncio.sleep(retries_interval)
                else:
                    if isinstance(e, asyncio.TimeoutError):
                        e = N2VCException(
                            message="Timed out waiting for action to complete"
                        )
                    return "FAILED", getattr(e, "message", repr(e))

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if db_nsr["_admin"]["deployed"]["K8s"]:
        # K8s-based deployment: refresh each KDU status in the nsrs record
        for k8s in db_nsr["_admin"]["deployed"]["K8s"]:
            cluster_uuid, kdu_instance, cluster_type = (
                k8s["k8scluster-uuid"],
                k8s["kdu-instance"],
                k8s["k8scluster-type"],
            )
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # charm-based deployment: refresh each VCA entry
        # NOTE: locals renamed from "table"/"filter" to avoid shadowing the
        # builtin filter(); behavior is unchanged.
        for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            db_table, db_filter = "nsrs", {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db(db_table, db_filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """Run an NS/VNF/VDU/KDU primitive (ns action operation).

    Dispatches either to the K8s connector (KDU primitives and the built-in
    upgrade/rollback/status actions) or to the VCA connector (charm
    primitives), then persists the operation result.

    :param nsr_id: NS instance id
    :param nslcmop_id: ns lcm operation id
    :return: (operation_state, detailed_status) from the finally block
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    step = ""
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            # primitive_params arrives JSON-encoded from NBI
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout.primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only KDU built-in actions may run without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            actions = set()
            # get primitives declared for this KDU
            for primitive in kdu_configuration.get("initial-config-primitive", []):
                actions.add(primitive["name"])
            for primitive in kdu_configuration.get("config-primitive", []):
                actions.add(primitive["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda kdu: kdu_name == kdu["kdu-name"]
                and kdu["member-vnf-index"] == vnf_index,
            )
            kdu_action = (
                True
                if primitive_name in actions
                and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
                else False
            )

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    if kdu_model.count("/") < 2:  # helm chart is not embedded
                        # extract chart name from "repo:version" style reference
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]
                if desc_params.get("kdu_atomic_upgrade"):
                    atomic_upgrade = desc_params.get(
                        "kdu_atomic_upgrade"
                    ).lower() in ("yes", "true", "1")
                    del desc_params["kdu_atomic_upgrade"]
                else:
                    atomic_upgrade = True

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=atomic_upgrade,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + "Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: 3-element progress list; stage[2] is updated here
        logging_text: log-line prefix
    """
    vca_scaling_info = []
    scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
    scaling_info["scaling_direction"] = "IN"
    scaling_info["vdu-delete"] = {}
    scaling_info["kdu-delete"] = {}
    db_vdur = db_vnfr.get("vdur")
    vdur_list = copy(db_vdur)
    count_index = 0
    for index, vdu in enumerate(vdur_list):
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu["vdu-id-ref"],
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
        scaling_info["vdu"].append(
            {
                "name": vdu.get("name") or vdu.get("vdu-name"),
                "vdu_id": vdu["vdu-id-ref"],
                "interface": [],
            }
        )
        for interface in vdu["interfaces"]:
            scaling_info["vdu"][index]["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
    self.logger.info("NS update scaling info{}".format(scaling_info))
    stage[2] = "Terminating VDUs"
    if scaling_info.get("vdu-delete"):
        # scale_process = "RO"
        if self.ro_config.ng:
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                db_vnfr,
                scaling_info,
                stage,
            )
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count > 1:
            stage = ["", "", ""]
            step = "Getting nslcmop from database"
            self.logger.debug(
                step + " after having waited for previous tasks to be completed"
            )
            # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
            member_vnf_index = db_vnfr["member-vnf-index-ref"]
            """ db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """

            update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            await self.terminate_vdus(
                db_vnfr,
                member_vnf_index,
                db_nsr,
                update_db_nslcmops,
                stage,
                logging_text,
            )

            # drop the removed VNF from the NS constituent list, then persist
            constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
            constituent_vnfr.remove(db_vnfr.get("_id"))
            db_nsr_update["constituent-vnfr-ref"] = db_nsr.get(
                "constituent-vnfr-ref"
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
            # NOTE: a second identical update_db_2("nsrs", ...) call was removed
            # here; db_nsr_update had not changed, so it was redundant.
            return "COMPLETED", "Done"
        else:
            step = "Terminate VNF Failed with"
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(
    self,
    nsr_id,
    nslcmop_id,
    db_vnfd,
    db_vnfr,
    db_nsr,
):
    """This method updates and redeploys VNF instances

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # old_vnfd_id = db_vnfr["vnfd-id"]
        # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
        new_db_vnfd = db_vnfd
        # new_vnfd_ref = new_db_vnfd["id"]
        # new_vnfd_id = vnfd_id

        # Rebuild the vnfr connection points from the new descriptor
        new_vnfr_cp = []
        for cp in new_db_vnfd.get("ext-cpd", ()):
            vnf_cp = {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            new_vnfr_cp.append(vnf_cp)
        new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
        # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
        # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": new_vdur,
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        updated_db_vnfr = self.db.get_one(
            "vnfrs",
            {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
        )

        # Instantiate new VNF resources
        # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
        scaling_info["scaling_direction"] = "OUT"
        scaling_info["vdu-create"] = {}
        scaling_info["kdu-create"] = {}
        vdud_instantiate_list = db_vnfd["vdu"]
        for index, vdud in enumerate(vdud_instantiate_list):
            cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                    or {}
                )
            cloud_init_list = []
            if cloud_init_text:
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud["id"], 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud["id"],
                    )
                )
            vca_scaling_info.append(
                {
                    "osm_vdu_id": vdud["id"],
                    "member-vnf-index": member_vnf_index,
                    "type": "create",
                    "vdu_index": count_index,
                }
            )
            scaling_info["vdu-create"][vdud["id"]] = count_index
        if self.ro_config.ng:
            self.logger.debug(
                "New Resources to be deployed: {}".format(scaling_info)
            )
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                updated_db_vnfr,
                scaling_info,
                stage,
            )
            return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
async def _ns_charm_upgrade(
    self,
    ee_id,
    charm_id,
    charm_type,
    path,
    timeout: float = None,
) -> (str, str):
    """This method upgrade charms in VNF instances

    Args:
        ee_id: Execution environment id
        path: Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float) Timeout for the ns update operation

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        charm_type = charm_type or "lxc_proxy_charm"
        output = await self.vca_map[charm_type].upgrade_charm(
            ee_id=ee_id,
            path=path,
            charm_id=charm_id,
            charm_type=charm_type,
            timeout=timeout or self.timeout.ns_update,
        )

        if output:
            return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise

    except Exception as e:
        self.logger.debug("Error upgrading charm {}".format(path))
        return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5759 async def update(self
, nsr_id
, nslcmop_id
):
5760 """Update NS according to different update types
5762 This method performs upgrade of VNF instances then updates the revision
5763 number in VNF record
5766 nsr_id: Network service will be updated
5767 nslcmop_id: ns lcm operation id
5770 It may raise DbException, LcmException, N2VCException, K8sException
5773 # Try to lock HA task here
5774 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5775 if not task_is_locked_by_me
:
5778 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5779 self
.logger
.debug(logging_text
+ "Enter")
5781 # Set the required variables to be filled up later
5783 db_nslcmop_update
= {}
5785 nslcmop_operation_state
= None
5787 error_description_nslcmop
= ""
5789 change_type
= "updated"
5790 detailed_status
= ""
5791 member_vnf_index
= None
5794 # wait for any previous tasks in process
5795 step
= "Waiting for previous operations to terminate"
5796 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5797 self
._write
_ns
_status
(
5800 current_operation
="UPDATING",
5801 current_operation_id
=nslcmop_id
,
5804 step
= "Getting nslcmop from database"
5805 db_nslcmop
= self
.db
.get_one(
5806 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5808 update_type
= db_nslcmop
["operationParams"]["updateType"]
5810 step
= "Getting nsr from database"
5811 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5812 old_operational_status
= db_nsr
["operational-status"]
5813 db_nsr_update
["operational-status"] = "updating"
5814 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5815 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5817 if update_type
== "CHANGE_VNFPKG":
5818 # Get the input parameters given through update request
5819 vnf_instance_id
= db_nslcmop
["operationParams"][
5820 "changeVnfPackageData"
5821 ].get("vnfInstanceId")
5823 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5826 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5828 step
= "Getting vnfr from database"
5829 db_vnfr
= self
.db
.get_one(
5830 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5833 step
= "Getting vnfds from database"
5835 latest_vnfd
= self
.db
.get_one(
5836 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5838 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5841 current_vnf_revision
= db_vnfr
.get("revision", 1)
5842 current_vnfd
= self
.db
.get_one(
5844 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5845 fail_on_empty
=False,
5847 # Charm artifact paths will be filled up later
5849 current_charm_artifact_path
,
5850 target_charm_artifact_path
,
5851 charm_artifact_paths
,
5853 ) = ([], [], [], [])
5855 step
= "Checking if revision has changed in VNFD"
5856 if current_vnf_revision
!= latest_vnfd_revision
:
5857 change_type
= "policy_updated"
5859 # There is new revision of VNFD, update operation is required
5860 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5861 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5863 step
= "Removing the VNFD packages if they exist in the local path"
5864 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5865 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5867 step
= "Get the VNFD packages from FSMongo"
5868 self
.fs
.sync(from_path
=latest_vnfd_path
)
5869 self
.fs
.sync(from_path
=current_vnfd_path
)
5872 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5874 current_base_folder
= current_vnfd
["_admin"]["storage"]
5875 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5877 for vca_index
, vca_deployed
in enumerate(
5878 get_iterable(nsr_deployed
, "VCA")
5880 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5882 # Getting charm-id and charm-type
5883 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5884 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5885 vca_type
= vca_deployed
.get("type")
5886 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5889 ee_id
= vca_deployed
.get("ee_id")
5891 step
= "Getting descriptor config"
5892 if current_vnfd
.get("kdu"):
5893 search_key
= "kdu_name"
5895 search_key
= "vnfd_id"
5897 entity_id
= vca_deployed
.get(search_key
)
5899 descriptor_config
= get_configuration(
5900 current_vnfd
, entity_id
5903 if "execution-environment-list" in descriptor_config
:
5904 ee_list
= descriptor_config
.get(
5905 "execution-environment-list", []
5910 # There could be several charm used in the same VNF
5911 for ee_item
in ee_list
:
5912 if ee_item
.get("juju"):
5913 step
= "Getting charm name"
5914 charm_name
= ee_item
["juju"].get("charm")
5916 step
= "Setting Charm artifact paths"
5917 current_charm_artifact_path
.append(
5918 get_charm_artifact_path(
5919 current_base_folder
,
5922 current_vnf_revision
,
5925 target_charm_artifact_path
.append(
5926 get_charm_artifact_path(
5930 latest_vnfd_revision
,
5933 elif ee_item
.get("helm-chart"):
5934 # add chart to list and all parameters
5935 step
= "Getting helm chart name"
5936 chart_name
= ee_item
.get("helm-chart")
5938 ee_item
.get("helm-version")
5939 and ee_item
.get("helm-version") == "v2"
5943 vca_type
= "helm-v3"
5944 step
= "Setting Helm chart artifact paths"
5946 helm_artifacts
.append(
5948 "current_artifact_path": get_charm_artifact_path(
5949 current_base_folder
,
5952 current_vnf_revision
,
5954 "target_artifact_path": get_charm_artifact_path(
5958 latest_vnfd_revision
,
5961 "vca_index": vca_index
,
5962 "vdu_index": vdu_count_index
,
5966 charm_artifact_paths
= zip(
5967 current_charm_artifact_path
, target_charm_artifact_path
5970 step
= "Checking if software version has changed in VNFD"
5971 if find_software_version(current_vnfd
) != find_software_version(
5974 step
= "Checking if existing VNF has charm"
5975 for current_charm_path
, target_charm_path
in list(
5976 charm_artifact_paths
5978 if current_charm_path
:
5980 "Software version change is not supported as VNF instance {} has charm.".format(
5985 # There is no change in the charm package, then redeploy the VNF
5986 # based on new descriptor
5987 step
= "Redeploying VNF"
5988 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5989 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5990 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5992 if result
== "FAILED":
5993 nslcmop_operation_state
= result
5994 error_description_nslcmop
= detailed_status
5995 db_nslcmop_update
["detailed-status"] = detailed_status
5998 + " step {} Done with result {} {}".format(
5999 step
, nslcmop_operation_state
, detailed_status
6004 step
= "Checking if any charm package has changed or not"
6005 for current_charm_path
, target_charm_path
in list(
6006 charm_artifact_paths
6010 and target_charm_path
6011 and self
.check_charm_hash_changed(
6012 current_charm_path
, target_charm_path
6015 step
= "Checking whether VNF uses juju bundle"
6016 if check_juju_bundle_existence(current_vnfd
):
6018 "Charm upgrade is not supported for the instance which"
6019 " uses juju-bundle: {}".format(
6020 check_juju_bundle_existence(current_vnfd
)
6024 step
= "Upgrading Charm"
6028 ) = await self
._ns
_charm
_upgrade
(
6031 charm_type
=vca_type
,
6032 path
=self
.fs
.path
+ target_charm_path
,
6033 timeout
=timeout_seconds
,
6036 if result
== "FAILED":
6037 nslcmop_operation_state
= result
6038 error_description_nslcmop
= detailed_status
6040 db_nslcmop_update
["detailed-status"] = detailed_status
6043 + " step {} Done with result {} {}".format(
6044 step
, nslcmop_operation_state
, detailed_status
6048 step
= "Updating policies"
6049 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6050 result
= "COMPLETED"
6051 detailed_status
= "Done"
6052 db_nslcmop_update
["detailed-status"] = "Done"
6055 for item
in helm_artifacts
:
6057 item
["current_artifact_path"]
6058 and item
["target_artifact_path"]
6059 and self
.check_charm_hash_changed(
6060 item
["current_artifact_path"],
6061 item
["target_artifact_path"],
6065 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6068 vnfr_id
= db_vnfr
["_id"]
6069 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6071 "collection": "nsrs",
6072 "filter": {"_id": nsr_id
},
6073 "path": db_update_entry
,
6075 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6076 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6077 namespace
=namespace
,
6081 artifact_path
=item
["target_artifact_path"],
6084 vnf_id
= db_vnfr
.get("vnfd-ref")
6085 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6086 self
.logger
.debug("get ssh key block")
6090 ("config-access", "ssh-access", "required"),
6092 # Needed to inject a ssh key
6095 ("config-access", "ssh-access", "default-user"),
6098 "Install configuration Software, getting public ssh key"
6100 pub_key
= await self
.vca_map
[
6102 ].get_ee_ssh_public__key(
6103 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6107 "Insert public key into VM user={} ssh_key={}".format(
6111 self
.logger
.debug(logging_text
+ step
)
6113 # wait for RO (ip-address) Insert pub_key into VM
6114 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6124 initial_config_primitive_list
= config_descriptor
.get(
6125 "initial-config-primitive"
6127 config_primitive
= next(
6130 for p
in initial_config_primitive_list
6131 if p
["name"] == "config"
6135 if not config_primitive
:
6138 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6140 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6141 if db_vnfr
.get("additionalParamsForVnf"):
6142 deploy_params
.update(
6144 db_vnfr
["additionalParamsForVnf"].copy()
6147 primitive_params_
= self
._map
_primitive
_params
(
6148 config_primitive
, {}, deploy_params
6151 step
= "execute primitive '{}' params '{}'".format(
6152 config_primitive
["name"], primitive_params_
6154 self
.logger
.debug(logging_text
+ step
)
6155 await self
.vca_map
[vca_type
].exec_primitive(
6157 primitive_name
=config_primitive
["name"],
6158 params_dict
=primitive_params_
,
6164 step
= "Updating policies"
6165 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6166 detailed_status
= "Done"
6167 db_nslcmop_update
["detailed-status"] = "Done"
6169 # If nslcmop_operation_state is None, so any operation is not failed.
6170 if not nslcmop_operation_state
:
6171 nslcmop_operation_state
= "COMPLETED"
6173 # If update CHANGE_VNFPKG nslcmop_operation is successful
6174 # vnf revision need to be updated
6175 vnfr_update
["revision"] = latest_vnfd_revision
6176 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6180 + " task Done with result {} {}".format(
6181 nslcmop_operation_state
, detailed_status
6184 elif update_type
== "REMOVE_VNF":
6185 # This part is included in https://osm.etsi.org/gerrit/11876
6186 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6187 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6188 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6189 step
= "Removing VNF"
6190 (result
, detailed_status
) = await self
.remove_vnf(
6191 nsr_id
, nslcmop_id
, vnf_instance_id
6193 if result
== "FAILED":
6194 nslcmop_operation_state
= result
6195 error_description_nslcmop
= detailed_status
6196 db_nslcmop_update
["detailed-status"] = detailed_status
6197 change_type
= "vnf_terminated"
6198 if not nslcmop_operation_state
:
6199 nslcmop_operation_state
= "COMPLETED"
6202 + " task Done with result {} {}".format(
6203 nslcmop_operation_state
, detailed_status
6207 elif update_type
== "OPERATE_VNF":
6208 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6211 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6214 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6217 (result
, detailed_status
) = await self
.rebuild_start_stop(
6218 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6220 if result
== "FAILED":
6221 nslcmop_operation_state
= result
6222 error_description_nslcmop
= detailed_status
6223 db_nslcmop_update
["detailed-status"] = detailed_status
6224 if not nslcmop_operation_state
:
6225 nslcmop_operation_state
= "COMPLETED"
6228 + " task Done with result {} {}".format(
6229 nslcmop_operation_state
, detailed_status
6233 # If nslcmop_operation_state is None, so any operation is not failed.
6234 # All operations are executed in overall.
6235 if not nslcmop_operation_state
:
6236 nslcmop_operation_state
= "COMPLETED"
6237 db_nsr_update
["operational-status"] = old_operational_status
6239 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6240 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6242 except asyncio
.CancelledError
:
6244 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6246 exc
= "Operation was cancelled"
6247 except asyncio
.TimeoutError
:
6248 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6250 except Exception as e
:
6251 exc
= traceback
.format_exc()
6252 self
.logger
.critical(
6253 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6262 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6263 nslcmop_operation_state
= "FAILED"
6264 db_nsr_update
["operational-status"] = old_operational_status
6266 self
._write
_ns
_status
(
6268 ns_state
=db_nsr
["nsState"],
6269 current_operation
="IDLE",
6270 current_operation_id
=None,
6271 other_update
=db_nsr_update
,
6274 self
._write
_op
_status
(
6277 error_message
=error_description_nslcmop
,
6278 operation_state
=nslcmop_operation_state
,
6279 other_update
=db_nslcmop_update
,
6282 if nslcmop_operation_state
:
6286 "nslcmop_id": nslcmop_id
,
6287 "operationState": nslcmop_operation_state
,
6290 change_type
in ("vnf_terminated", "policy_updated")
6291 and member_vnf_index
6293 msg
.update({"vnf_member_index": member_vnf_index
})
6294 await self
.msg
.aiowrite("ns", change_type
, msg
)
6295 except Exception as e
:
6297 logging_text
+ "kafka_write notification Exception {}".format(e
)
6299 self
.logger
.debug(logging_text
+ "Exit")
6300 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6301 return nslcmop_operation_state
, detailed_status
6303 async def scale(self
, nsr_id
, nslcmop_id
):
6304 # Try to lock HA task here
6305 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6306 if not task_is_locked_by_me
:
6309 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6310 stage
= ["", "", ""]
6311 tasks_dict_info
= {}
6312 # ^ stage, step, VIM progress
6313 self
.logger
.debug(logging_text
+ "Enter")
6314 # get all needed from database
6316 db_nslcmop_update
= {}
6319 # in case of error, indicates what part of scale was failed to put nsr at error status
6320 scale_process
= None
6321 old_operational_status
= ""
6322 old_config_status
= ""
6325 # wait for any previous tasks in process
6326 step
= "Waiting for previous operations to terminate"
6327 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6328 self
._write
_ns
_status
(
6331 current_operation
="SCALING",
6332 current_operation_id
=nslcmop_id
,
6335 step
= "Getting nslcmop from database"
6337 step
+ " after having waited for previous tasks to be completed"
6339 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6341 step
= "Getting nsr from database"
6342 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6343 old_operational_status
= db_nsr
["operational-status"]
6344 old_config_status
= db_nsr
["config-status"]
6346 step
= "Parsing scaling parameters"
6347 db_nsr_update
["operational-status"] = "scaling"
6348 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6349 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6351 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6353 ]["member-vnf-index"]
6354 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6356 ]["scaling-group-descriptor"]
6357 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6358 # for backward compatibility
6359 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6360 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6361 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6362 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6364 step
= "Getting vnfr from database"
6365 db_vnfr
= self
.db
.get_one(
6366 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6369 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6371 step
= "Getting vnfd from database"
6372 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6374 base_folder
= db_vnfd
["_admin"]["storage"]
6376 step
= "Getting scaling-group-descriptor"
6377 scaling_descriptor
= find_in_list(
6378 get_scaling_aspect(db_vnfd
),
6379 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6381 if not scaling_descriptor
:
6383 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6384 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6387 step
= "Sending scale order to VIM"
6388 # TODO check if ns is in a proper status
6390 if not db_nsr
["_admin"].get("scaling-group"):
6395 "_admin.scaling-group": [
6396 {"name": scaling_group
, "nb-scale-op": 0}
6400 admin_scale_index
= 0
6402 for admin_scale_index
, admin_scale_info
in enumerate(
6403 db_nsr
["_admin"]["scaling-group"]
6405 if admin_scale_info
["name"] == scaling_group
:
6406 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6408 else: # not found, set index one plus last element and add new entry with the name
6409 admin_scale_index
+= 1
6411 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6414 vca_scaling_info
= []
6415 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6416 if scaling_type
== "SCALE_OUT":
6417 if "aspect-delta-details" not in scaling_descriptor
:
6419 "Aspect delta details not fount in scaling descriptor {}".format(
6420 scaling_descriptor
["name"]
6423 # count if max-instance-count is reached
6424 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6426 scaling_info
["scaling_direction"] = "OUT"
6427 scaling_info
["vdu-create"] = {}
6428 scaling_info
["kdu-create"] = {}
6429 for delta
in deltas
:
6430 for vdu_delta
in delta
.get("vdu-delta", {}):
6431 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6432 # vdu_index also provides the number of instance of the targeted vdu
6433 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6434 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6438 additional_params
= (
6439 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6442 cloud_init_list
= []
6444 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6445 max_instance_count
= 10
6446 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6447 max_instance_count
= vdu_profile
.get(
6448 "max-number-of-instances", 10
6451 default_instance_num
= get_number_of_instances(
6454 instances_number
= vdu_delta
.get("number-of-instances", 1)
6455 nb_scale_op
+= instances_number
6457 new_instance_count
= nb_scale_op
+ default_instance_num
6458 # Control if new count is over max and vdu count is less than max.
6459 # Then assign new instance count
6460 if new_instance_count
> max_instance_count
> vdu_count
:
6461 instances_number
= new_instance_count
- max_instance_count
6463 instances_number
= instances_number
6465 if new_instance_count
> max_instance_count
:
6467 "reached the limit of {} (max-instance-count) "
6468 "scaling-out operations for the "
6469 "scaling-group-descriptor '{}'".format(
6470 nb_scale_op
, scaling_group
6473 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6475 # TODO Information of its own ip is not available because db_vnfr is not updated.
6476 additional_params
["OSM"] = get_osm_params(
6477 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6479 cloud_init_list
.append(
6480 self
._parse
_cloud
_init
(
6487 vca_scaling_info
.append(
6489 "osm_vdu_id": vdu_delta
["id"],
6490 "member-vnf-index": vnf_index
,
6492 "vdu_index": vdu_index
+ x
,
6495 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6496 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6497 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6498 kdu_name
= kdu_profile
["kdu-name"]
6499 resource_name
= kdu_profile
.get("resource-name", "")
6501 # Might have different kdus in the same delta
6502 # Should have list for each kdu
6503 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6504 scaling_info
["kdu-create"][kdu_name
] = []
6506 kdur
= get_kdur(db_vnfr
, kdu_name
)
6507 if kdur
.get("helm-chart"):
6508 k8s_cluster_type
= "helm-chart-v3"
6509 self
.logger
.debug("kdur: {}".format(kdur
))
6511 kdur
.get("helm-version")
6512 and kdur
.get("helm-version") == "v2"
6514 k8s_cluster_type
= "helm-chart"
6515 elif kdur
.get("juju-bundle"):
6516 k8s_cluster_type
= "juju-bundle"
6519 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6520 "juju-bundle. Maybe an old NBI version is running".format(
6521 db_vnfr
["member-vnf-index-ref"], kdu_name
6525 max_instance_count
= 10
6526 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6527 max_instance_count
= kdu_profile
.get(
6528 "max-number-of-instances", 10
6531 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6532 deployed_kdu
, _
= get_deployed_kdu(
6533 nsr_deployed
, kdu_name
, vnf_index
6535 if deployed_kdu
is None:
6537 "KDU '{}' for vnf '{}' not deployed".format(
6541 kdu_instance
= deployed_kdu
.get("kdu-instance")
6542 instance_num
= await self
.k8scluster_map
[
6548 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6549 kdu_model
=deployed_kdu
.get("kdu-model"),
6551 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6552 "number-of-instances", 1
6555 # Control if new count is over max and instance_num is less than max.
6556 # Then assign max instance number to kdu replica count
6557 if kdu_replica_count
> max_instance_count
> instance_num
:
6558 kdu_replica_count
= max_instance_count
6559 if kdu_replica_count
> max_instance_count
:
6561 "reached the limit of {} (max-instance-count) "
6562 "scaling-out operations for the "
6563 "scaling-group-descriptor '{}'".format(
6564 instance_num
, scaling_group
6568 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6569 vca_scaling_info
.append(
6571 "osm_kdu_id": kdu_name
,
6572 "member-vnf-index": vnf_index
,
6574 "kdu_index": instance_num
+ x
- 1,
6577 scaling_info
["kdu-create"][kdu_name
].append(
6579 "member-vnf-index": vnf_index
,
6581 "k8s-cluster-type": k8s_cluster_type
,
6582 "resource-name": resource_name
,
6583 "scale": kdu_replica_count
,
6586 elif scaling_type
== "SCALE_IN":
6587 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6589 scaling_info
["scaling_direction"] = "IN"
6590 scaling_info
["vdu-delete"] = {}
6591 scaling_info
["kdu-delete"] = {}
6593 for delta
in deltas
:
6594 for vdu_delta
in delta
.get("vdu-delta", {}):
6595 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6596 min_instance_count
= 0
6597 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6598 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6599 min_instance_count
= vdu_profile
["min-number-of-instances"]
6601 default_instance_num
= get_number_of_instances(
6602 db_vnfd
, vdu_delta
["id"]
6604 instance_num
= vdu_delta
.get("number-of-instances", 1)
6605 nb_scale_op
-= instance_num
6607 new_instance_count
= nb_scale_op
+ default_instance_num
6609 if new_instance_count
< min_instance_count
< vdu_count
:
6610 instances_number
= min_instance_count
- new_instance_count
6612 instances_number
= instance_num
6614 if new_instance_count
< min_instance_count
:
6616 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6617 "scaling-group-descriptor '{}'".format(
6618 nb_scale_op
, scaling_group
6621 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6622 vca_scaling_info
.append(
6624 "osm_vdu_id": vdu_delta
["id"],
6625 "member-vnf-index": vnf_index
,
6627 "vdu_index": vdu_index
- 1 - x
,
6630 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6631 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6632 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6633 kdu_name
= kdu_profile
["kdu-name"]
6634 resource_name
= kdu_profile
.get("resource-name", "")
6636 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6637 scaling_info
["kdu-delete"][kdu_name
] = []
6639 kdur
= get_kdur(db_vnfr
, kdu_name
)
6640 if kdur
.get("helm-chart"):
6641 k8s_cluster_type
= "helm-chart-v3"
6642 self
.logger
.debug("kdur: {}".format(kdur
))
6644 kdur
.get("helm-version")
6645 and kdur
.get("helm-version") == "v2"
6647 k8s_cluster_type
= "helm-chart"
6648 elif kdur
.get("juju-bundle"):
6649 k8s_cluster_type
= "juju-bundle"
6652 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6653 "juju-bundle. Maybe an old NBI version is running".format(
6654 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6658 min_instance_count
= 0
6659 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6660 min_instance_count
= kdu_profile
["min-number-of-instances"]
6662 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6663 deployed_kdu
, _
= get_deployed_kdu(
6664 nsr_deployed
, kdu_name
, vnf_index
6666 if deployed_kdu
is None:
6668 "KDU '{}' for vnf '{}' not deployed".format(
6672 kdu_instance
= deployed_kdu
.get("kdu-instance")
6673 instance_num
= await self
.k8scluster_map
[
6679 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6680 kdu_model
=deployed_kdu
.get("kdu-model"),
6682 kdu_replica_count
= instance_num
- kdu_delta
.get(
6683 "number-of-instances", 1
6686 if kdu_replica_count
< min_instance_count
< instance_num
:
6687 kdu_replica_count
= min_instance_count
6688 if kdu_replica_count
< min_instance_count
:
6690 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6691 "scaling-group-descriptor '{}'".format(
6692 instance_num
, scaling_group
6696 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6697 vca_scaling_info
.append(
6699 "osm_kdu_id": kdu_name
,
6700 "member-vnf-index": vnf_index
,
6702 "kdu_index": instance_num
- x
- 1,
6705 scaling_info
["kdu-delete"][kdu_name
].append(
6707 "member-vnf-index": vnf_index
,
6709 "k8s-cluster-type": k8s_cluster_type
,
6710 "resource-name": resource_name
,
6711 "scale": kdu_replica_count
,
6715 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6716 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6717 if scaling_info
["scaling_direction"] == "IN":
6718 for vdur
in reversed(db_vnfr
["vdur"]):
6719 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6720 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6721 scaling_info
["vdu"].append(
6723 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6724 "vdu_id": vdur
["vdu-id-ref"],
6728 for interface
in vdur
["interfaces"]:
6729 scaling_info
["vdu"][-1]["interface"].append(
6731 "name": interface
["name"],
6732 "ip_address": interface
["ip-address"],
6733 "mac_address": interface
.get("mac-address"),
6736 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6739 step
= "Executing pre-scale vnf-config-primitive"
6740 if scaling_descriptor
.get("scaling-config-action"):
6741 for scaling_config_action
in scaling_descriptor
[
6742 "scaling-config-action"
6745 scaling_config_action
.get("trigger") == "pre-scale-in"
6746 and scaling_type
== "SCALE_IN"
6748 scaling_config_action
.get("trigger") == "pre-scale-out"
6749 and scaling_type
== "SCALE_OUT"
6751 vnf_config_primitive
= scaling_config_action
[
6752 "vnf-config-primitive-name-ref"
6754 step
= db_nslcmop_update
[
6756 ] = "executing pre-scale scaling-config-action '{}'".format(
6757 vnf_config_primitive
6760 # look for primitive
6761 for config_primitive
in (
6762 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6763 ).get("config-primitive", ()):
6764 if config_primitive
["name"] == vnf_config_primitive
:
6768 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6769 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6770 "primitive".format(scaling_group
, vnf_config_primitive
)
6773 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6774 if db_vnfr
.get("additionalParamsForVnf"):
6775 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6777 scale_process
= "VCA"
6778 db_nsr_update
["config-status"] = "configuring pre-scaling"
6779 primitive_params
= self
._map
_primitive
_params
(
6780 config_primitive
, {}, vnfr_params
6783 # Pre-scale retry check: Check if this sub-operation has been executed before
6784 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6787 vnf_config_primitive
,
6791 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6792 # Skip sub-operation
6793 result
= "COMPLETED"
6794 result_detail
= "Done"
6797 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6798 vnf_config_primitive
, result
, result_detail
6802 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6803 # New sub-operation: Get index of this sub-operation
6805 len(db_nslcmop
.get("_admin", {}).get("operations"))
6810 + "vnf_config_primitive={} New sub-operation".format(
6811 vnf_config_primitive
6815 # retry: Get registered params for this existing sub-operation
6816 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6819 vnf_index
= op
.get("member_vnf_index")
6820 vnf_config_primitive
= op
.get("primitive")
6821 primitive_params
= op
.get("primitive_params")
6824 + "vnf_config_primitive={} Sub-operation retry".format(
6825 vnf_config_primitive
6828 # Execute the primitive, either with new (first-time) or registered (reintent) args
6829 ee_descriptor_id
= config_primitive
.get(
6830 "execution-environment-ref"
6832 primitive_name
= config_primitive
.get(
6833 "execution-environment-primitive", vnf_config_primitive
6835 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6836 nsr_deployed
["VCA"],
6837 member_vnf_index
=vnf_index
,
6839 vdu_count_index
=None,
6840 ee_descriptor_id
=ee_descriptor_id
,
6842 result
, result_detail
= await self
._ns
_execute
_primitive
(
6851 + "vnf_config_primitive={} Done with result {} {}".format(
6852 vnf_config_primitive
, result
, result_detail
6855 # Update operationState = COMPLETED | FAILED
6856 self
._update
_suboperation
_status
(
6857 db_nslcmop
, op_index
, result
, result_detail
6860 if result
== "FAILED":
6861 raise LcmException(result_detail
)
6862 db_nsr_update
["config-status"] = old_config_status
6863 scale_process
= None
6867 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6870 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6873 # SCALE-IN VCA - BEGIN
6874 if vca_scaling_info
:
6875 step
= db_nslcmop_update
[
6877 ] = "Deleting the execution environments"
6878 scale_process
= "VCA"
6879 for vca_info
in vca_scaling_info
:
6880 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6881 member_vnf_index
= str(vca_info
["member-vnf-index"])
6883 logging_text
+ "vdu info: {}".format(vca_info
)
6885 if vca_info
.get("osm_vdu_id"):
6886 vdu_id
= vca_info
["osm_vdu_id"]
6887 vdu_index
= int(vca_info
["vdu_index"])
6890 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6891 member_vnf_index
, vdu_id
, vdu_index
6893 stage
[2] = step
= "Scaling in VCA"
6894 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6895 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6896 config_update
= db_nsr
["configurationStatus"]
6897 for vca_index
, vca
in enumerate(vca_update
):
6899 (vca
or vca
.get("ee_id"))
6900 and vca
["member-vnf-index"] == member_vnf_index
6901 and vca
["vdu_count_index"] == vdu_index
6903 if vca
.get("vdu_id"):
6904 config_descriptor
= get_configuration(
6905 db_vnfd
, vca
.get("vdu_id")
6907 elif vca
.get("kdu_name"):
6908 config_descriptor
= get_configuration(
6909 db_vnfd
, vca
.get("kdu_name")
6912 config_descriptor
= get_configuration(
6913 db_vnfd
, db_vnfd
["id"]
6915 operation_params
= (
6916 db_nslcmop
.get("operationParams") or {}
6918 exec_terminate_primitives
= not operation_params
.get(
6919 "skip_terminate_primitives"
6920 ) and vca
.get("needed_terminate")
6921 task
= asyncio
.ensure_future(
6930 exec_primitives
=exec_terminate_primitives
,
6934 timeout
=self
.timeout
.charm_delete
,
6937 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6940 del vca_update
[vca_index
]
6941 del config_update
[vca_index
]
6942 # wait for pending tasks of terminate primitives
6946 + "Waiting for tasks {}".format(
6947 list(tasks_dict_info
.keys())
6950 error_list
= await self
._wait
_for
_tasks
(
6954 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6959 tasks_dict_info
.clear()
6961 raise LcmException("; ".join(error_list
))
6963 db_vca_and_config_update
= {
6964 "_admin.deployed.VCA": vca_update
,
6965 "configurationStatus": config_update
,
6968 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6970 scale_process
= None
6971 # SCALE-IN VCA - END
6974 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6975 scale_process
= "RO"
6976 if self
.ro_config
.ng
:
6977 await self
._scale
_ng
_ro
(
6978 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6980 scaling_info
.pop("vdu-create", None)
6981 scaling_info
.pop("vdu-delete", None)
6983 scale_process
= None
6987 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6988 scale_process
= "KDU"
6989 await self
._scale
_kdu
(
6990 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6992 scaling_info
.pop("kdu-create", None)
6993 scaling_info
.pop("kdu-delete", None)
6995 scale_process
= None
6999 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7001 # SCALE-UP VCA - BEGIN
7002 if vca_scaling_info
:
7003 step
= db_nslcmop_update
[
7005 ] = "Creating new execution environments"
7006 scale_process
= "VCA"
7007 for vca_info
in vca_scaling_info
:
7008 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7009 member_vnf_index
= str(vca_info
["member-vnf-index"])
7011 logging_text
+ "vdu info: {}".format(vca_info
)
7013 vnfd_id
= db_vnfr
["vnfd-ref"]
7014 if vca_info
.get("osm_vdu_id"):
7015 vdu_index
= int(vca_info
["vdu_index"])
7016 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7017 if db_vnfr
.get("additionalParamsForVnf"):
7018 deploy_params
.update(
7020 db_vnfr
["additionalParamsForVnf"].copy()
7023 descriptor_config
= get_configuration(
7024 db_vnfd
, db_vnfd
["id"]
7026 if descriptor_config
:
7032 logging_text
=logging_text
7033 + "member_vnf_index={} ".format(member_vnf_index
),
7036 nslcmop_id
=nslcmop_id
,
7042 kdu_index
=kdu_index
,
7043 member_vnf_index
=member_vnf_index
,
7044 vdu_index
=vdu_index
,
7046 deploy_params
=deploy_params
,
7047 descriptor_config
=descriptor_config
,
7048 base_folder
=base_folder
,
7049 task_instantiation_info
=tasks_dict_info
,
7052 vdu_id
= vca_info
["osm_vdu_id"]
7053 vdur
= find_in_list(
7054 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7056 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7057 if vdur
.get("additionalParams"):
7058 deploy_params_vdu
= parse_yaml_strings(
7059 vdur
["additionalParams"]
7062 deploy_params_vdu
= deploy_params
7063 deploy_params_vdu
["OSM"] = get_osm_params(
7064 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7066 if descriptor_config
:
7072 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7073 member_vnf_index
, vdu_id
, vdu_index
7075 stage
[2] = step
= "Scaling out VCA"
7076 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7078 logging_text
=logging_text
7079 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7080 member_vnf_index
, vdu_id
, vdu_index
7084 nslcmop_id
=nslcmop_id
,
7090 member_vnf_index
=member_vnf_index
,
7091 vdu_index
=vdu_index
,
7092 kdu_index
=kdu_index
,
7094 deploy_params
=deploy_params_vdu
,
7095 descriptor_config
=descriptor_config
,
7096 base_folder
=base_folder
,
7097 task_instantiation_info
=tasks_dict_info
,
7100 # SCALE-UP VCA - END
7101 scale_process
= None
7104 # execute primitive service POST-SCALING
7105 step
= "Executing post-scale vnf-config-primitive"
7106 if scaling_descriptor
.get("scaling-config-action"):
7107 for scaling_config_action
in scaling_descriptor
[
7108 "scaling-config-action"
7111 scaling_config_action
.get("trigger") == "post-scale-in"
7112 and scaling_type
== "SCALE_IN"
7114 scaling_config_action
.get("trigger") == "post-scale-out"
7115 and scaling_type
== "SCALE_OUT"
7117 vnf_config_primitive
= scaling_config_action
[
7118 "vnf-config-primitive-name-ref"
7120 step
= db_nslcmop_update
[
7122 ] = "executing post-scale scaling-config-action '{}'".format(
7123 vnf_config_primitive
7126 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7127 if db_vnfr
.get("additionalParamsForVnf"):
7128 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7130 # look for primitive
7131 for config_primitive
in (
7132 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7133 ).get("config-primitive", ()):
7134 if config_primitive
["name"] == vnf_config_primitive
:
7138 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7139 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7140 "config-primitive".format(
7141 scaling_group
, vnf_config_primitive
7144 scale_process
= "VCA"
7145 db_nsr_update
["config-status"] = "configuring post-scaling"
7146 primitive_params
= self
._map
_primitive
_params
(
7147 config_primitive
, {}, vnfr_params
7150 # Post-scale retry check: Check if this sub-operation has been executed before
7151 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7154 vnf_config_primitive
,
7158 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7159 # Skip sub-operation
7160 result
= "COMPLETED"
7161 result_detail
= "Done"
7164 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7165 vnf_config_primitive
, result
, result_detail
7169 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7170 # New sub-operation: Get index of this sub-operation
7172 len(db_nslcmop
.get("_admin", {}).get("operations"))
7177 + "vnf_config_primitive={} New sub-operation".format(
7178 vnf_config_primitive
7182 # retry: Get registered params for this existing sub-operation
7183 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7186 vnf_index
= op
.get("member_vnf_index")
7187 vnf_config_primitive
= op
.get("primitive")
7188 primitive_params
= op
.get("primitive_params")
7191 + "vnf_config_primitive={} Sub-operation retry".format(
7192 vnf_config_primitive
7195 # Execute the primitive, either with new (first-time) or registered (reintent) args
7196 ee_descriptor_id
= config_primitive
.get(
7197 "execution-environment-ref"
7199 primitive_name
= config_primitive
.get(
7200 "execution-environment-primitive", vnf_config_primitive
7202 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7203 nsr_deployed
["VCA"],
7204 member_vnf_index
=vnf_index
,
7206 vdu_count_index
=None,
7207 ee_descriptor_id
=ee_descriptor_id
,
7209 result
, result_detail
= await self
._ns
_execute
_primitive
(
7218 + "vnf_config_primitive={} Done with result {} {}".format(
7219 vnf_config_primitive
, result
, result_detail
7222 # Update operationState = COMPLETED | FAILED
7223 self
._update
_suboperation
_status
(
7224 db_nslcmop
, op_index
, result
, result_detail
7227 if result
== "FAILED":
7228 raise LcmException(result_detail
)
7229 db_nsr_update
["config-status"] = old_config_status
7230 scale_process
= None
7235 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7236 db_nsr_update
["operational-status"] = (
7238 if old_operational_status
== "failed"
7239 else old_operational_status
7241 db_nsr_update
["config-status"] = old_config_status
7244 ROclient
.ROClientException
,
7249 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7251 except asyncio
.CancelledError
:
7253 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7255 exc
= "Operation was cancelled"
7256 except Exception as e
:
7257 exc
= traceback
.format_exc()
7258 self
.logger
.critical(
7259 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7263 self
._write
_ns
_status
(
7266 current_operation
="IDLE",
7267 current_operation_id
=None,
7270 stage
[1] = "Waiting for instantiate pending tasks."
7271 self
.logger
.debug(logging_text
+ stage
[1])
7272 exc
= await self
._wait
_for
_tasks
(
7275 self
.timeout
.ns_deploy
,
7283 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7284 nslcmop_operation_state
= "FAILED"
7286 db_nsr_update
["operational-status"] = old_operational_status
7287 db_nsr_update
["config-status"] = old_config_status
7288 db_nsr_update
["detailed-status"] = ""
7290 if "VCA" in scale_process
:
7291 db_nsr_update
["config-status"] = "failed"
7292 if "RO" in scale_process
:
7293 db_nsr_update
["operational-status"] = "failed"
7296 ] = "FAILED scaling nslcmop={} {}: {}".format(
7297 nslcmop_id
, step
, exc
7300 error_description_nslcmop
= None
7301 nslcmop_operation_state
= "COMPLETED"
7302 db_nslcmop_update
["detailed-status"] = "Done"
7304 self
._write
_op
_status
(
7307 error_message
=error_description_nslcmop
,
7308 operation_state
=nslcmop_operation_state
,
7309 other_update
=db_nslcmop_update
,
7312 self
._write
_ns
_status
(
7315 current_operation
="IDLE",
7316 current_operation_id
=None,
7317 other_update
=db_nsr_update
,
7320 if nslcmop_operation_state
:
7324 "nslcmop_id": nslcmop_id
,
7325 "operationState": nslcmop_operation_state
,
7327 await self
.msg
.aiowrite("ns", "scaled", msg
)
7328 except Exception as e
:
7330 logging_text
+ "kafka_write notification Exception {}".format(e
)
7332 self
.logger
.debug(logging_text
+ "Exit")
7333 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7335 async def _scale_kdu(
7336 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7338 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7339 for kdu_name
in _scaling_info
:
7340 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7341 deployed_kdu
, index
= get_deployed_kdu(
7342 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7344 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7345 kdu_instance
= deployed_kdu
["kdu-instance"]
7346 kdu_model
= deployed_kdu
.get("kdu-model")
7347 scale
= int(kdu_scaling_info
["scale"])
7348 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7351 "collection": "nsrs",
7352 "filter": {"_id": nsr_id
},
7353 "path": "_admin.deployed.K8s.{}".format(index
),
7356 step
= "scaling application {}".format(
7357 kdu_scaling_info
["resource-name"]
7359 self
.logger
.debug(logging_text
+ step
)
7361 if kdu_scaling_info
["type"] == "delete":
7362 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7365 and kdu_config
.get("terminate-config-primitive")
7366 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7368 terminate_config_primitive_list
= kdu_config
.get(
7369 "terminate-config-primitive"
7371 terminate_config_primitive_list
.sort(
7372 key
=lambda val
: int(val
["seq"])
7376 terminate_config_primitive
7377 ) in terminate_config_primitive_list
:
7378 primitive_params_
= self
._map
_primitive
_params
(
7379 terminate_config_primitive
, {}, {}
7381 step
= "execute terminate config primitive"
7382 self
.logger
.debug(logging_text
+ step
)
7383 await asyncio
.wait_for(
7384 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7385 cluster_uuid
=cluster_uuid
,
7386 kdu_instance
=kdu_instance
,
7387 primitive_name
=terminate_config_primitive
["name"],
7388 params
=primitive_params_
,
7390 total_timeout
=self
.timeout
.primitive
,
7393 timeout
=self
.timeout
.primitive
7394 * self
.timeout
.primitive_outer_factor
,
7397 await asyncio
.wait_for(
7398 self
.k8scluster_map
[k8s_cluster_type
].scale(
7399 kdu_instance
=kdu_instance
,
7401 resource_name
=kdu_scaling_info
["resource-name"],
7402 total_timeout
=self
.timeout
.scale_on_error
,
7404 cluster_uuid
=cluster_uuid
,
7405 kdu_model
=kdu_model
,
7409 timeout
=self
.timeout
.scale_on_error
7410 * self
.timeout
.scale_on_error_outer_factor
,
7413 if kdu_scaling_info
["type"] == "create":
7414 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7417 and kdu_config
.get("initial-config-primitive")
7418 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7420 initial_config_primitive_list
= kdu_config
.get(
7421 "initial-config-primitive"
7423 initial_config_primitive_list
.sort(
7424 key
=lambda val
: int(val
["seq"])
7427 for initial_config_primitive
in initial_config_primitive_list
:
7428 primitive_params_
= self
._map
_primitive
_params
(
7429 initial_config_primitive
, {}, {}
7431 step
= "execute initial config primitive"
7432 self
.logger
.debug(logging_text
+ step
)
7433 await asyncio
.wait_for(
7434 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7435 cluster_uuid
=cluster_uuid
,
7436 kdu_instance
=kdu_instance
,
7437 primitive_name
=initial_config_primitive
["name"],
7438 params
=primitive_params_
,
7445 async def _scale_ng_ro(
7446 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7448 nsr_id
= db_nslcmop
["nsInstanceId"]
7449 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7452 # read from db: vnfd's for every vnf
7455 # for each vnf in ns, read vnfd
7456 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7457 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7458 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7459 # if we haven't this vnfd, read it from db
7460 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7462 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7463 db_vnfds
.append(vnfd
)
7464 n2vc_key
= self
.n2vc
.get_public_key()
7465 n2vc_key_list
= [n2vc_key
]
7468 vdu_scaling_info
.get("vdu-create"),
7469 vdu_scaling_info
.get("vdu-delete"),
7472 # db_vnfr has been updated, update db_vnfrs to use it
7473 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7474 await self
._instantiate
_ng
_ro
(
7484 start_deploy
=time(),
7485 timeout_ns_deploy
=self
.timeout
.ns_deploy
,
7487 if vdu_scaling_info
.get("vdu-delete"):
7489 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7492 async def extract_prometheus_scrape_jobs(
7496 ee_config_descriptor
: dict,
7501 vnf_member_index
: str = "",
7503 vdu_index
: int = None,
7505 kdu_index
: int = None,
7507 """Method to extract prometheus scrape jobs from EE's Prometheus template job file
7508 This method will wait until the corresponding VDU or KDU is fully instantiated
7511 ee_id (str): Execution Environment ID
7512 artifact_path (str): Path where the EE's content is (including the Prometheus template file)
7513 ee_config_descriptor (dict): Execution Environment's configuration descriptor
7514 vnfr_id (str): VNFR ID where this EE applies
7515 nsr_id (str): NSR ID where this EE applies
7516 target_ip (str): VDU/KDU instance IP address
7517 element_type (str): NS or VNF or VDU or KDU
7518 vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
7519 vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
7520 vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
7521 kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
7522 kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
7525 LcmException: When the VDU or KDU instance was not found in an hour
7528 _type_: Prometheus jobs
7530 # default the vdur and kdur names to an empty string, to avoid any later
7531 # problem with Prometheus when the element type is not VDU or KDU
7535 # look if exist a file called 'prometheus*.j2' and
7536 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7540 for f
in artifact_content
7541 if f
.startswith("prometheus") and f
.endswith(".j2")
7547 self
.logger
.debug("Artifact path{}".format(artifact_path
))
7548 self
.logger
.debug("job file{}".format(job_file
))
7549 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7552 # obtain the VDUR or KDUR, if the element type is VDU or KDU
7553 if element_type
in ("VDU", "KDU"):
7554 for _
in range(360):
7555 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
7556 if vdu_id
and vdu_index
is not None:
7560 for x
in get_iterable(db_vnfr
, "vdur")
7562 x
.get("vdu-id-ref") == vdu_id
7563 and x
.get("count-index") == vdu_index
7568 if vdur
.get("name"):
7569 vdur_name
= vdur
.get("name")
7571 if kdu_name
and kdu_index
is not None:
7575 for x
in get_iterable(db_vnfr
, "kdur")
7577 x
.get("kdu-name") == kdu_name
7578 and x
.get("count-index") == kdu_index
7583 if kdur
.get("name"):
7584 kdur_name
= kdur
.get("name")
7587 await asyncio
.sleep(10)
7589 if vdu_id
and vdu_index
is not None:
7591 f
"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
7593 if kdu_name
and kdu_index
is not None:
7595 f
"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
7599 if ee_id
is not None:
7600 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7601 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7603 vnfr_id
= vnfr_id
.replace("-", "")
7605 "JOB_NAME": vnfr_id
,
7606 "TARGET_IP": target_ip
,
7607 "EXPORTER_POD_IP": host_name
,
7608 "EXPORTER_POD_PORT": host_port
,
7610 "VNF_MEMBER_INDEX": vnf_member_index
,
7611 "VDUR_NAME": vdur_name
,
7612 "KDUR_NAME": kdur_name
,
7613 "ELEMENT_TYPE": element_type
,
7616 metric_path
= ee_config_descriptor
["metric-path"]
7617 target_port
= ee_config_descriptor
["metric-port"]
7618 vnfr_id
= vnfr_id
.replace("-", "")
7620 "JOB_NAME": vnfr_id
,
7621 "TARGET_IP": target_ip
,
7622 "TARGET_PORT": target_port
,
7623 "METRIC_PATH": metric_path
,
7626 job_list
= parse_job(job_data
, variables
)
7627 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7628 for job
in job_list
:
7630 not isinstance(job
.get("job_name"), str)
7631 or vnfr_id
not in job
["job_name"]
7633 job
["job_name"] = vnfr_id
+ "_" + str(SystemRandom().randint(1, 10000))
7634 job
["nsr_id"] = nsr_id
7635 job
["vnfr_id"] = vnfr_id
7638 async def rebuild_start_stop(
7639 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7641 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7642 self
.logger
.info(logging_text
+ "Enter")
7643 stage
= ["Preparing the environment", ""]
7644 # database nsrs record
7648 # in case of error, indicates what part of scale was failed to put nsr at error status
7649 start_deploy
= time()
7651 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7652 vim_account_id
= db_vnfr
.get("vim-account-id")
7653 vim_info_key
= "vim:" + vim_account_id
7654 vdu_id
= additional_param
["vdu_id"]
7655 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7656 vdur
= find_in_list(
7657 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7660 vdu_vim_name
= vdur
["name"]
7661 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7662 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7664 raise LcmException("Target vdu is not found")
7665 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7666 # wait for any previous tasks in process
7667 stage
[1] = "Waiting for previous operations to terminate"
7668 self
.logger
.info(stage
[1])
7669 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7671 stage
[1] = "Reading from database."
7672 self
.logger
.info(stage
[1])
7673 self
._write
_ns
_status
(
7676 current_operation
=operation_type
.upper(),
7677 current_operation_id
=nslcmop_id
,
7679 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7682 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7683 db_nsr_update
["operational-status"] = operation_type
7684 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7688 "vim_vm_id": vim_vm_id
,
7690 "vdu_index": additional_param
["count-index"],
7691 "vdu_id": vdur
["id"],
7692 "target_vim": target_vim
,
7693 "vim_account_id": vim_account_id
,
7696 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7697 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7698 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7699 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7700 self
.logger
.info("response from RO: {}".format(result_dict
))
7701 action_id
= result_dict
["action_id"]
7702 await self
._wait
_ng
_ro
(
7707 self
.timeout
.operate
,
7709 "start_stop_rebuild",
7711 return "COMPLETED", "Done"
7712 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7713 self
.logger
.error("Exit Exception {}".format(e
))
7715 except asyncio
.CancelledError
:
7716 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7717 exc
= "Operation was cancelled"
7718 except Exception as e
:
7719 exc
= traceback
.format_exc()
7720 self
.logger
.critical(
7721 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7723 return "FAILED", "Error in operate VNF {}".format(exc
)
7725 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7727 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7729 :param: vim_account_id: VIM Account ID
7731 :return: (cloud_name, cloud_credential)
7733 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7734 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7736 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7738 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7740 :param: vim_account_id: VIM Account ID
7742 :return: (cloud_name, cloud_credential)
7744 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7745 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7747 async def migrate(self
, nsr_id
, nslcmop_id
):
7749 Migrate VNFs and VDUs instances in a NS
7751 :param: nsr_id: NS Instance ID
7752 :param: nslcmop_id: nslcmop ID of migrate
7755 # Try to lock HA task here
7756 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7757 if not task_is_locked_by_me
:
7759 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7760 self
.logger
.debug(logging_text
+ "Enter")
7761 # get all needed from database
7763 db_nslcmop_update
= {}
7764 nslcmop_operation_state
= None
7768 # in case of error, indicates what part of scale was failed to put nsr at error status
7769 start_deploy
= time()
7772 # wait for any previous tasks in process
7773 step
= "Waiting for previous operations to terminate"
7774 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7776 self
._write
_ns
_status
(
7779 current_operation
="MIGRATING",
7780 current_operation_id
=nslcmop_id
,
7782 step
= "Getting nslcmop from database"
7784 step
+ " after having waited for previous tasks to be completed"
7786 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7787 migrate_params
= db_nslcmop
.get("operationParams")
7790 target
.update(migrate_params
)
7791 desc
= await self
.RO
.migrate(nsr_id
, target
)
7792 self
.logger
.debug("RO return > {}".format(desc
))
7793 action_id
= desc
["action_id"]
7794 await self
._wait
_ng
_ro
(
7799 self
.timeout
.migrate
,
7800 operation
="migrate",
7802 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7803 self
.logger
.error("Exit Exception {}".format(e
))
7805 except asyncio
.CancelledError
:
7806 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7807 exc
= "Operation was cancelled"
7808 except Exception as e
:
7809 exc
= traceback
.format_exc()
7810 self
.logger
.critical(
7811 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7814 self
._write
_ns
_status
(
7817 current_operation
="IDLE",
7818 current_operation_id
=None,
7821 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7822 nslcmop_operation_state
= "FAILED"
7824 nslcmop_operation_state
= "COMPLETED"
7825 db_nslcmop_update
["detailed-status"] = "Done"
7826 db_nsr_update
["detailed-status"] = "Done"
7828 self
._write
_op
_status
(
7832 operation_state
=nslcmop_operation_state
,
7833 other_update
=db_nslcmop_update
,
7835 if nslcmop_operation_state
:
7839 "nslcmop_id": nslcmop_id
,
7840 "operationState": nslcmop_operation_state
,
7842 await self
.msg
.aiowrite("ns", "migrated", msg
)
7843 except Exception as e
:
7845 logging_text
+ "kafka_write notification Exception {}".format(e
)
7847 self
.logger
.debug(logging_text
+ "Exit")
7848 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7850 async def heal(self
, nsr_id
, nslcmop_id
):
7854 :param nsr_id: ns instance to heal
7855 :param nslcmop_id: operation to run
7859 # Try to lock HA task here
7860 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7861 if not task_is_locked_by_me
:
7864 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7865 stage
= ["", "", ""]
7866 tasks_dict_info
= {}
7867 # ^ stage, step, VIM progress
7868 self
.logger
.debug(logging_text
+ "Enter")
7869 # get all needed from database
7871 db_nslcmop_update
= {}
7873 db_vnfrs
= {} # vnf's info indexed by _id
7875 old_operational_status
= ""
7876 old_config_status
= ""
7879 # wait for any previous tasks in process
7880 step
= "Waiting for previous operations to terminate"
7881 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7882 self
._write
_ns
_status
(
7885 current_operation
="HEALING",
7886 current_operation_id
=nslcmop_id
,
7889 step
= "Getting nslcmop from database"
7891 step
+ " after having waited for previous tasks to be completed"
7893 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7895 step
= "Getting nsr from database"
7896 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7897 old_operational_status
= db_nsr
["operational-status"]
7898 old_config_status
= db_nsr
["config-status"]
7901 "_admin.deployed.RO.operational-status": "healing",
7903 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7905 step
= "Sending heal order to VIM"
7907 logging_text
=logging_text
,
7909 db_nslcmop
=db_nslcmop
,
7914 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7915 self
.logger
.debug(logging_text
+ stage
[1])
7916 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7917 self
.fs
.sync(db_nsr
["nsd-id"])
7919 # read from db: vnfr's of this ns
7920 step
= "Getting vnfrs from db"
7921 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7922 for vnfr
in db_vnfrs_list
:
7923 db_vnfrs
[vnfr
["_id"]] = vnfr
7924 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7926 # Check for each target VNF
7927 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7928 for target_vnf
in target_list
:
7929 # Find this VNF in the list from DB
7930 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7932 db_vnfr
= db_vnfrs
[vnfr_id
]
7933 vnfd_id
= db_vnfr
.get("vnfd-id")
7934 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7935 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7936 base_folder
= vnfd
["_admin"]["storage"]
7941 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7942 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7944 # Check each target VDU and deploy N2VC
7945 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7948 if not target_vdu_list
:
7949 # Codigo nuevo para crear diccionario
7950 target_vdu_list
= []
7951 for existing_vdu
in db_vnfr
.get("vdur"):
7952 vdu_name
= existing_vdu
.get("vdu-name", None)
7953 vdu_index
= existing_vdu
.get("count-index", 0)
7954 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7957 vdu_to_be_healed
= {
7959 "count-index": vdu_index
,
7960 "run-day1": vdu_run_day1
,
7962 target_vdu_list
.append(vdu_to_be_healed
)
7963 for target_vdu
in target_vdu_list
:
7964 deploy_params_vdu
= target_vdu
7965 # Set run-day1 vnf level value if not vdu level value exists
7966 if not deploy_params_vdu
.get("run-day1") and target_vnf
.get(
7967 "additionalParams", {}
7969 deploy_params_vdu
["run-day1"] = target_vnf
[
7972 vdu_name
= target_vdu
.get("vdu-id", None)
7973 # TODO: Get vdu_id from vdud.
7975 # For multi instance VDU count-index is mandatory
7976 # For single session VDU count-indes is 0
7977 vdu_index
= target_vdu
.get("count-index", 0)
7979 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7980 stage
[1] = "Deploying Execution Environments."
7981 self
.logger
.debug(logging_text
+ stage
[1])
7983 # VNF Level charm. Normal case when proxy charms.
7984 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7985 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7986 if descriptor_config
:
7987 # Continue if healed machine is management machine
7988 vnf_ip_address
= db_vnfr
.get("ip-address")
7989 target_instance
= None
7990 for instance
in db_vnfr
.get("vdur", None):
7992 instance
["vdu-name"] == vdu_name
7993 and instance
["count-index"] == vdu_index
7995 target_instance
= instance
7997 if vnf_ip_address
== target_instance
.get("ip-address"):
7999 logging_text
=logging_text
8000 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8001 member_vnf_index
, vdu_name
, vdu_index
8005 nslcmop_id
=nslcmop_id
,
8011 member_vnf_index
=member_vnf_index
,
8014 deploy_params
=deploy_params_vdu
,
8015 descriptor_config
=descriptor_config
,
8016 base_folder
=base_folder
,
8017 task_instantiation_info
=tasks_dict_info
,
8021 # VDU Level charm. Normal case with native charms.
8022 descriptor_config
= get_configuration(vnfd
, vdu_name
)
8023 if descriptor_config
:
8025 logging_text
=logging_text
8026 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
8027 member_vnf_index
, vdu_name
, vdu_index
8031 nslcmop_id
=nslcmop_id
,
8037 member_vnf_index
=member_vnf_index
,
8038 vdu_index
=vdu_index
,
8040 deploy_params
=deploy_params_vdu
,
8041 descriptor_config
=descriptor_config
,
8042 base_folder
=base_folder
,
8043 task_instantiation_info
=tasks_dict_info
,
8048 ROclient
.ROClientException
,
8053 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
8055 except asyncio
.CancelledError
:
8057 logging_text
+ "Cancelled Exception while '{}'".format(step
)
8059 exc
= "Operation was cancelled"
8060 except Exception as e
:
8061 exc
= traceback
.format_exc()
8062 self
.logger
.critical(
8063 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
8068 stage
[1] = "Waiting for healing pending tasks."
8069 self
.logger
.debug(logging_text
+ stage
[1])
8070 exc
= await self
._wait
_for
_tasks
(
8073 self
.timeout
.ns_deploy
,
8081 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
8082 nslcmop_operation_state
= "FAILED"
8084 db_nsr_update
["operational-status"] = old_operational_status
8085 db_nsr_update
["config-status"] = old_config_status
8088 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
8089 for task
, task_name
in tasks_dict_info
.items():
8090 if not task
.done() or task
.cancelled() or task
.exception():
8091 if task_name
.startswith(self
.task_name_deploy_vca
):
8092 # A N2VC task is pending
8093 db_nsr_update
["config-status"] = "failed"
8095 # RO task is pending
8096 db_nsr_update
["operational-status"] = "failed"
8098 error_description_nslcmop
= None
8099 nslcmop_operation_state
= "COMPLETED"
8100 db_nslcmop_update
["detailed-status"] = "Done"
8101 db_nsr_update
["detailed-status"] = "Done"
8102 db_nsr_update
["operational-status"] = "running"
8103 db_nsr_update
["config-status"] = "configured"
8105 self
._write
_op
_status
(
8108 error_message
=error_description_nslcmop
,
8109 operation_state
=nslcmop_operation_state
,
8110 other_update
=db_nslcmop_update
,
8113 self
._write
_ns
_status
(
8116 current_operation
="IDLE",
8117 current_operation_id
=None,
8118 other_update
=db_nsr_update
,
8121 if nslcmop_operation_state
:
8125 "nslcmop_id": nslcmop_id
,
8126 "operationState": nslcmop_operation_state
,
8128 await self
.msg
.aiowrite("ns", "healed", msg
)
8129 except Exception as e
:
8131 logging_text
+ "kafka_write notification Exception {}".format(e
)
8133 self
.logger
.debug(logging_text
+ "Exit")
8134 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8145 :param logging_text: preffix text to use at logging
8146 :param nsr_id: nsr identity
8147 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8148 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8149 :return: None or exception
8152 def get_vim_account(vim_account_id
):
8154 if vim_account_id
in db_vims
:
8155 return db_vims
[vim_account_id
]
8156 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8157 db_vims
[vim_account_id
] = db_vim
8162 ns_params
= db_nslcmop
.get("operationParams")
8163 if ns_params
and ns_params
.get("timeout_ns_heal"):
8164 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8166 timeout_ns_heal
= self
.timeout
.ns_heal
8170 nslcmop_id
= db_nslcmop
["_id"]
8172 "action_id": nslcmop_id
,
8174 self
.logger
.warning(
8175 "db_nslcmop={} and timeout_ns_heal={}".format(
8176 db_nslcmop
, timeout_ns_heal
8179 target
.update(db_nslcmop
.get("operationParams", {}))
8181 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8182 desc
= await self
.RO
.recreate(nsr_id
, target
)
8183 self
.logger
.debug("RO return > {}".format(desc
))
8184 action_id
= desc
["action_id"]
8185 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8186 await self
._wait
_ng
_ro
(
8193 operation
="healing",
8198 "_admin.deployed.RO.operational-status": "running",
8199 "detailed-status": " ".join(stage
),
8201 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8202 self
._write
_op
_status
(nslcmop_id
, stage
)
8204 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8207 except Exception as e
:
8208 stage
[2] = "ERROR healing at VIM"
8209 # self.set_vnfr_at_error(db_vnfrs, str(e))
8211 "Error healing at VIM {}".format(e
),
8212 exc_info
=not isinstance(
8215 ROclient
.ROClientException
,
8241 task_instantiation_info
,
8244 # launch instantiate_N2VC in a asyncio task and register task object
8245 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8246 # if not found, create one entry and update database
8247 # fill db_nsr._admin.deployed.VCA.<index>
8250 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8254 get_charm_name
= False
8255 if "execution-environment-list" in descriptor_config
:
8256 ee_list
= descriptor_config
.get("execution-environment-list", [])
8257 elif "juju" in descriptor_config
:
8258 ee_list
= [descriptor_config
] # ns charms
8259 if "execution-environment-list" not in descriptor_config
:
8260 # charm name is only required for ns charms
8261 get_charm_name
= True
8262 else: # other types as script are not supported
8265 for ee_item
in ee_list
:
8268 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8269 ee_item
.get("juju"), ee_item
.get("helm-chart")
8272 ee_descriptor_id
= ee_item
.get("id")
8273 if ee_item
.get("juju"):
8274 vca_name
= ee_item
["juju"].get("charm")
8276 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8279 if ee_item
["juju"].get("charm") is not None
8282 if ee_item
["juju"].get("cloud") == "k8s":
8283 vca_type
= "k8s_proxy_charm"
8284 elif ee_item
["juju"].get("proxy") is False:
8285 vca_type
= "native_charm"
8286 elif ee_item
.get("helm-chart"):
8287 vca_name
= ee_item
["helm-chart"]
8288 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8291 vca_type
= "helm-v3"
8294 logging_text
+ "skipping non juju neither charm configuration"
8299 for vca_index
, vca_deployed
in enumerate(
8300 db_nsr
["_admin"]["deployed"]["VCA"]
8302 if not vca_deployed
:
8305 vca_deployed
.get("member-vnf-index") == member_vnf_index
8306 and vca_deployed
.get("vdu_id") == vdu_id
8307 and vca_deployed
.get("kdu_name") == kdu_name
8308 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8309 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8313 # not found, create one.
8315 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8318 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8320 target
+= "/kdu/{}".format(kdu_name
)
8322 "target_element": target
,
8323 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8324 "member-vnf-index": member_vnf_index
,
8326 "kdu_name": kdu_name
,
8327 "vdu_count_index": vdu_index
,
8328 "operational-status": "init", # TODO revise
8329 "detailed-status": "", # TODO revise
8330 "step": "initial-deploy", # TODO revise
8332 "vdu_name": vdu_name
,
8334 "ee_descriptor_id": ee_descriptor_id
,
8335 "charm_name": charm_name
,
8339 # create VCA and configurationStatus in db
8341 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8342 "configurationStatus.{}".format(vca_index
): dict(),
8344 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8346 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8348 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8349 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8350 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8353 task_n2vc
= asyncio
.ensure_future(
8355 logging_text
=logging_text
,
8356 vca_index
=vca_index
,
8362 vdu_index
=vdu_index
,
8363 deploy_params
=deploy_params
,
8364 config_descriptor
=descriptor_config
,
8365 base_folder
=base_folder
,
8366 nslcmop_id
=nslcmop_id
,
8370 ee_config_descriptor
=ee_item
,
8373 self
.lcm_tasks
.register(
8377 "instantiate_N2VC-{}".format(vca_index
),
8380 task_instantiation_info
[
8382 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8383 member_vnf_index
or "", vdu_id
or ""
    # NOTE(review): this chunk reached review as a lossy, line-mangled listing;
    # elided lines (signature parameters, closing brackets, `else:`/`try:` lines)
    # were restored to match the visible tokens — confirm against the repository.
    async def heal_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """Re-attach/heal the execution environment (VCA) of one NS/VNF/VDU/KDU element.

        Mirrors the instantiate-time N2VC deployment flow, but only registers a new
        execution environment for native charms (proxy charms survive the heal) and
        runs Day-1 primitives only when the operation requests it (``run-day1``).

        :param logging_text: prefix for all log lines of this task
        :param vca_index: index of this VCA inside ``db_nsr["_admin"]["deployed"]["VCA"]``
        :param nsi_id: network-slice instance id (may be falsy for plain NS)
        :param db_nsr: NS record from database
        :param db_vnfr: VNF record from database (None/falsy for an NS-level charm)
        :param vdu_id: VDU id when the charm targets a VDU, else None
        :param kdu_name: KDU name when the charm targets a KDU, else None
        :param vdu_index: VDU count index (may be None; treated as 0)
        :param config_descriptor: descriptor section holding config primitives/ssh-access
        :param deploy_params: parameters for primitive substitution; gets "rw_mgmt_ip" added
        :param base_folder: artifact location info ("folder", "pkg-dir")
        :param nslcmop_id: current operation id (for status reporting)
        :param stage: 3-item stage list mutated in place for operation status
        :param vca_type: one of native_charm / lxc_proxy_charm / k8s_proxy_charm / helm / helm-v3
        :param vca_name: charm or helm-chart name (artifact sub-path)
        :param ee_config_descriptor: execution-environment item from the descriptor
        :raises LcmException: wrapping any failure, after marking the config status BROKEN
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        # db_dict is passed down so N2VC can write progress under this VCA entry
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        step = ""
        try:
            element_type = "NS"
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

            # Native charms are deployed once per VNF/VDU (index 0); proxy charms
            # keep the per-instance count index.
            if vca_type == "native_charm":
                index_number = 0
            else:
                index_number = vdu_index or 0

            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, index_number)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, index_number)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, index_number)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path: charms vs helm-charts live in different folders,
            # and SOL004 packages ("pkg-dir") differ from SOL007-style layouts.
            if base_folder["pkg-dir"]:
                artifact_path = "{}/{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            else:
                artifact_path = "{}/Scripts/{}/{}/".format(
                    base_folder["folder"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )

            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )

            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA. Only for native charms when healing
            if vca_type == "native_charm":
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                #  merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username

                # n2vc_redesign STEP 3.2
                # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

                # update ee_id en db
                db_dict_ee_id = {
                    "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id,
                }
                self.update_db_2("nsrs", nsr_id, db_dict_ee_id)

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            # Not sure if this need to be done when healing
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            # Install configuration software. Only for native charms.
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                # other_update=db_nsr_update,
                other_update=None,
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # the special "config" primitive carries charm config values
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=1,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )

            # Not sure if this need to be done when healing

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_type=vca_type,
                vca_index=vca_index,
            )

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                # IMPORTANT: We need do wait for RO to complete healing operation.
                await self._wait_heal_ro(nsr_id, self.timeout.ns_heal)
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # Day1 operations.
            # get run-day1 operation parameter
            runDay1 = deploy_params.get("run-day1", False)
            self.logger.debug(
                "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id, vdu_id, runDay1)
            )
            if runDay1:
                # n2vc_redesign STEP 6  Execute initial config primitive
                step = "execute initial config primitive"

                # wait for dependent primitives execution (NS -> VNF -> VDU)
                if initial_config_primitive_list:
                    await self._wait_dependent_n2vc(
                        nsr_id, vca_deployed_list, vca_index
                    )

                # stage, in function of element type: vdu, kdu, vnf or ns
                my_vca = vca_deployed_list[vca_index]
                if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                    # VDU or KDU
                    stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
                elif my_vca.get("member-vnf-index"):
                    # VNF
                    stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
                else:
                    # NS
                    stage[0] = "Stage 5/5: running Day-1 primitives for NS."

                self._write_configuration_status(
                    nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
                )

                self._write_op_status(op_id=nslcmop_id, stage=stage)

                check_if_terminated_needed = True
                for initial_config_primitive in initial_config_primitive_list:
                    # adding information on the vca_deployed if it is a NS execution environment
                    if not vca_deployed["member-vnf-index"]:
                        deploy_params["ns_config_info"] = json.dumps(
                            self._get_ns_config_info(nsr_id)
                        )
                    # TODO check if already done
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, deploy_params
                    )

                    step = "execute primitive '{}' params '{}'".format(
                        initial_config_primitive["name"], primitive_params_
                    )
                    self.logger.debug(logging_text + step)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=initial_config_primitive["name"],
                        params_dict=primitive_params_,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    )
                    # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                    if check_if_terminated_needed:
                        if config_descriptor.get("terminate-config-primitive"):
                            self.update_db_2(
                                "nsrs",
                                nsr_id,
                                {db_update_entry + "needed_terminate": True},
                            )
                            check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            # Not sure if this need to be done when healing
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )

                    for job in prometheus_jobs:
                        self.db.set_one(
                            "prometheus_jobs",
                            {"job_name": job["job_name"]},
                            job,
                            upsert=True,
                            fail_on_empty=False,
                        )

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            # Log with traceback only for unexpected exception types; known LCM/N2VC
            # errors and cancellations are reported by the caller.
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{} {}".format(step, e)) from e
    async def _wait_heal_ro(
        self,
        nsr_id,
        timeout=600,
    ):
        """Block until RO finishes healing the NS, polling the database.

        Polls ``nsrs._admin.deployed.RO.operational-status`` every 15 seconds
        and returns as soon as it is no longer "healing".

        :param nsr_id: NS record id to watch
        :param timeout: maximum seconds to wait (default 600)
        :raises NgRoException: if the status is still "healing" when the timeout expires
        """
        # NOTE(review): signature lines were elided in this listing; parameters
        # restored from call site `self._wait_heal_ro(nsr_id, self.timeout.ns_heal)`
        # — confirm against the repository.
        start_time = time()
        while time() <= start_time + timeout:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
                "operational-status"
            ]
            self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
            if operational_status_ro != "healing":
                break
            await asyncio.sleep(15)
        else:  # timeout_ns_deploy
            # NOTE(review): message says "deploy" although this waits for a heal
            # operation — consider rewording in a future change.
            raise NgRoException("Timeout waiting ns to deploy")
    async def vertical_scale(self, nsr_id, nslcmop_id):
        """
        Vertical Scale the VDUs in a NS

        :param: nsr_id: NS Instance ID
        :param: nslcmop_id: nslcmop ID of migrate

        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM instance owns this operation; nothing to do here
            return
        logging_text = "Task ns={} vertical scale ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nslcmop = None
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        db_nsr_update = {}
        target = {}
        exc = None
        # in case of error, indicates what part of scale was failed to put nsr at error status
        start_deploy = time()

        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="VerticalScale",
                current_operation_id=nslcmop_id,
            )
            step = "Getting nslcmop from database"
            self.logger.debug(
                step + " after having waited for previous tasks to be completed"
            )
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operationParams = db_nslcmop.get("operationParams")
            # operation parameters become the RO request body
            target.update(operationParams)
            # delegate the actual scaling to RO and wait for its action to finish
            desc = await self.RO.vertical_scale(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_deploy,
                self.timeout.verticalscale,
                operation="verticalscale",
            )
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error("Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error("Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
            )
        finally:
            # always restore NS status, persist the operation result, and notify
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
            )
            if exc:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            else:
                nslcmop_operation_state = "COMPLETED"
                db_nslcmop_update["detailed-status"] = "Done"
                db_nsr_update["detailed-status"] = "Done"

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message="",
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if nslcmop_operation_state:
                try:
                    msg = {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    }
                    # publish result so other modules (NBI, MON...) learn the outcome
                    await self.msg.aiowrite("ns", "verticalscaled", msg)
                except Exception as e:
                    # best-effort notification; do not mask the operation result
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")