1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.lcm_config
import LcmCfg
38 from osm_lcm
.data_utils
.nsr
import (
41 get_deployed_vca_list
,
44 from osm_lcm
.data_utils
.vca
import (
53 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
54 from osm_lcm
.lcm_utils
import (
61 check_juju_bundle_existence
,
62 get_charm_artifact_path
,
66 from osm_lcm
.data_utils
.nsd
import (
67 get_ns_configuration_relation_list
,
71 from osm_lcm
.data_utils
.vnfd
import (
77 get_ee_sorted_initial_config_primitive_list
,
78 get_ee_sorted_terminate_config_primitive_list
,
80 get_virtual_link_profiles
,
85 get_number_of_instances
,
87 get_kdu_resource_profile
,
88 find_software_version
,
91 from osm_lcm
.data_utils
.list_utils
import find_in_list
92 from osm_lcm
.data_utils
.vnfr
import (
96 get_volumes_from_instantiation_params
,
98 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
99 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
100 from n2vc
.definitions
import RelationEndpoint
101 from n2vc
.k8s_helm_conn
import K8sHelmConnector
102 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
103 from n2vc
.k8s_juju_conn
import K8sJujuConnector
105 from osm_common
.dbbase
import DbException
106 from osm_common
.fsbase
import FsException
108 from osm_lcm
.data_utils
.database
.database
import Database
109 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
110 from osm_lcm
.data_utils
.wim
import (
112 get_target_wim_attrs
,
113 select_feasible_wim_account
,
116 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
117 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
119 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
120 from osm_lcm
.osm_config
import OsmConfigBuilder
121 from osm_lcm
.prometheus
import parse_job
123 from copy
import copy
, deepcopy
124 from time
import time
125 from uuid
import uuid4
127 from random
import randint
129 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
132 class NsLcm(LcmBase
):
133 SUBOPERATION_STATUS_NOT_FOUND
= -1
134 SUBOPERATION_STATUS_NEW
= -2
135 SUBOPERATION_STATUS_SKIP
= -3
136 task_name_deploy_vca
= "Deploying VCA"
138 def __init__(self
, msg
, lcm_tasks
, config
: LcmCfg
, loop
):
140 Init, Connect to database, filesystem storage, and messaging
141 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
144 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
146 self
.db
= Database().instance
.db
147 self
.fs
= Filesystem().instance
.fs
149 self
.lcm_tasks
= lcm_tasks
150 self
.timeout
= config
.timeout
151 self
.ro_config
= config
.RO
152 self
.vca_config
= config
.VCA
154 # create N2VC connector
155 self
.n2vc
= N2VCJujuConnector(
158 on_update_db
=self
._on
_update
_n
2vc
_db
,
163 self
.conn_helm_ee
= LCMHelmConn(
166 vca_config
=self
.vca_config
,
167 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.k8sclusterhelm2
= K8sHelmConnector(
171 kubectl_command
=self
.vca_config
.kubectlpath
,
172 helm_command
=self
.vca_config
.helmpath
,
179 self
.k8sclusterhelm3
= K8sHelm3Connector(
180 kubectl_command
=self
.vca_config
.kubectlpath
,
181 helm_command
=self
.vca_config
.helm3path
,
188 self
.k8sclusterjuju
= K8sJujuConnector(
189 kubectl_command
=self
.vca_config
.kubectlpath
,
190 juju_command
=self
.vca_config
.jujupath
,
193 on_update_db
=self
._on
_update
_k
8s
_db
,
198 self
.k8scluster_map
= {
199 "helm-chart": self
.k8sclusterhelm2
,
200 "helm-chart-v3": self
.k8sclusterhelm3
,
201 "chart": self
.k8sclusterhelm3
,
202 "juju-bundle": self
.k8sclusterjuju
,
203 "juju": self
.k8sclusterjuju
,
207 "lxc_proxy_charm": self
.n2vc
,
208 "native_charm": self
.n2vc
,
209 "k8s_proxy_charm": self
.n2vc
,
210 "helm": self
.conn_helm_ee
,
211 "helm-v3": self
.conn_helm_ee
,
215 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
.to_dict())
217 self
.op_status_map
= {
218 "instantiation": self
.RO
.status
,
219 "termination": self
.RO
.status
,
220 "migrate": self
.RO
.status
,
221 "healing": self
.RO
.recreate_status
,
222 "verticalscale": self
.RO
.status
,
223 "start_stop_rebuild": self
.RO
.status
,
227 def increment_ip_mac(ip_mac
, vm_index
=1):
228 if not isinstance(ip_mac
, str):
231 # try with ipv4 look for last dot
232 i
= ip_mac
.rfind(".")
235 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
236 # try with ipv6 or mac look for last colon. Operate in hex
237 i
= ip_mac
.rfind(":")
240 # format in hex, len can be 2 for mac or 4 for ipv6
241 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
242 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
248 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
249 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
252 # TODO filter RO descriptor fields...
256 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
257 db_dict
["deploymentStatus"] = ro_descriptor
258 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
260 except Exception as e
:
262 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
265 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
266 # remove last dot from path (if exists)
267 if path
.endswith("."):
270 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
271 # .format(table, filter, path, updated_data))
273 nsr_id
= filter.get("_id")
275 # read ns record from database
276 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
277 current_ns_status
= nsr
.get("nsState")
279 # get vca status for NS
280 status_dict
= await self
.n2vc
.get_status(
281 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
286 db_dict
["vcaStatus"] = status_dict
288 # update configurationStatus for this VCA
290 vca_index
= int(path
[path
.rfind(".") + 1 :])
293 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
295 vca_status
= vca_list
[vca_index
].get("status")
297 configuration_status_list
= nsr
.get("configurationStatus")
298 config_status
= configuration_status_list
[vca_index
].get("status")
300 if config_status
== "BROKEN" and vca_status
!= "failed":
301 db_dict
["configurationStatus"][vca_index
] = "READY"
302 elif config_status
!= "BROKEN" and vca_status
== "failed":
303 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
304 except Exception as e
:
305 # not update configurationStatus
306 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
308 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
309 # if nsState = 'DEGRADED' check if all is OK
311 if current_ns_status
in ("READY", "DEGRADED"):
312 error_description
= ""
314 if status_dict
.get("machines"):
315 for machine_id
in status_dict
.get("machines"):
316 machine
= status_dict
.get("machines").get(machine_id
)
317 # check machine agent-status
318 if machine
.get("agent-status"):
319 s
= machine
.get("agent-status").get("status")
322 error_description
+= (
323 "machine {} agent-status={} ; ".format(
327 # check machine instance status
328 if machine
.get("instance-status"):
329 s
= machine
.get("instance-status").get("status")
332 error_description
+= (
333 "machine {} instance-status={} ; ".format(
338 if status_dict
.get("applications"):
339 for app_id
in status_dict
.get("applications"):
340 app
= status_dict
.get("applications").get(app_id
)
341 # check application status
342 if app
.get("status"):
343 s
= app
.get("status").get("status")
346 error_description
+= (
347 "application {} status={} ; ".format(app_id
, s
)
350 if error_description
:
351 db_dict
["errorDescription"] = error_description
352 if current_ns_status
== "READY" and is_degraded
:
353 db_dict
["nsState"] = "DEGRADED"
354 if current_ns_status
== "DEGRADED" and not is_degraded
:
355 db_dict
["nsState"] = "READY"
358 self
.update_db_2("nsrs", nsr_id
, db_dict
)
360 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
362 except Exception as e
:
363 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
365 async def _on_update_k8s_db(
366 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
369 Updating vca status in NSR record
370 :param cluster_uuid: UUID of a k8s cluster
371 :param kdu_instance: The unique name of the KDU instance
372 :param filter: To get nsr_id
373 :cluster_type: The cluster type (juju, k8s)
377 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
378 # .format(cluster_uuid, kdu_instance, filter))
380 nsr_id
= filter.get("_id")
382 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
383 cluster_uuid
=cluster_uuid
,
384 kdu_instance
=kdu_instance
,
386 complete_status
=True,
392 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
395 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
399 self
.update_db_2("nsrs", nsr_id
, db_dict
)
400 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
402 except Exception as e
:
403 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
406 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
409 undefined
=StrictUndefined
,
410 autoescape
=select_autoescape(default_for_string
=True, default
=True),
412 template
= env
.from_string(cloud_init_text
)
413 return template
.render(additional_params
or {})
414 except UndefinedError
as e
:
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
420 except (TemplateError
, TemplateNotFound
) as e
:
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
427 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
428 cloud_init_content
= cloud_init_file
= None
430 if vdu
.get("cloud-init-file"):
431 base_folder
= vnfd
["_admin"]["storage"]
432 if base_folder
["pkg-dir"]:
433 cloud_init_file
= "{}/{}/cloud_init/{}".format(
434 base_folder
["folder"],
435 base_folder
["pkg-dir"],
436 vdu
["cloud-init-file"],
439 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
440 base_folder
["folder"],
441 vdu
["cloud-init-file"],
443 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
444 cloud_init_content
= ci_file
.read()
445 elif vdu
.get("cloud-init"):
446 cloud_init_content
= vdu
["cloud-init"]
448 return cloud_init_content
449 except FsException
as e
:
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd
["id"], vdu
["id"], cloud_init_file
, e
456 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
458 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
460 additional_params
= vdur
.get("additionalParams")
461 return parse_yaml_strings(additional_params
)
463 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
465 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
466 :param vnfd: input vnfd
467 :param new_id: overrides vnf id if provided
468 :param additionalParams: Instantiation params for VNFs provided
469 :param nsrId: Id of the NSR
470 :return: copy of vnfd
472 vnfd_RO
= deepcopy(vnfd
)
473 # remove unused by RO configuration, monitoring, scaling and internal keys
474 vnfd_RO
.pop("_id", None)
475 vnfd_RO
.pop("_admin", None)
476 vnfd_RO
.pop("monitoring-param", None)
477 vnfd_RO
.pop("scaling-group-descriptor", None)
478 vnfd_RO
.pop("kdu", None)
479 vnfd_RO
.pop("k8s-cluster", None)
481 vnfd_RO
["id"] = new_id
483 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
484 for vdu
in get_iterable(vnfd_RO
, "vdu"):
485 vdu
.pop("cloud-init-file", None)
486 vdu
.pop("cloud-init", None)
490 def ip_profile_2_RO(ip_profile
):
491 RO_ip_profile
= deepcopy(ip_profile
)
492 if "dns-server" in RO_ip_profile
:
493 if isinstance(RO_ip_profile
["dns-server"], list):
494 RO_ip_profile
["dns-address"] = []
495 for ds
in RO_ip_profile
.pop("dns-server"):
496 RO_ip_profile
["dns-address"].append(ds
["address"])
498 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
499 if RO_ip_profile
.get("ip-version") == "ipv4":
500 RO_ip_profile
["ip-version"] = "IPv4"
501 if RO_ip_profile
.get("ip-version") == "ipv6":
502 RO_ip_profile
["ip-version"] = "IPv6"
503 if "dhcp-params" in RO_ip_profile
:
504 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
507 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
508 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
509 if db_vim
["_admin"]["operationalState"] != "ENABLED":
511 "VIM={} is not available. operationalState={}".format(
512 vim_account
, db_vim
["_admin"]["operationalState"]
515 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
518 def get_ro_wim_id_for_wim_account(self
, wim_account
):
519 if isinstance(wim_account
, str):
520 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
521 if db_wim
["_admin"]["operationalState"] != "ENABLED":
523 "WIM={} is not available. operationalState={}".format(
524 wim_account
, db_wim
["_admin"]["operationalState"]
527 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
532 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
533 db_vdu_push_list
= []
535 db_update
= {"_admin.modified": time()}
537 for vdu_id
, vdu_count
in vdu_create
.items():
541 for vdur
in reversed(db_vnfr
["vdur"])
542 if vdur
["vdu-id-ref"] == vdu_id
547 # Read the template saved in the db:
549 "No vdur in the database. Using the vdur-template to scale"
551 vdur_template
= db_vnfr
.get("vdur-template")
552 if not vdur_template
:
554 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
558 vdur
= vdur_template
[0]
559 # Delete a template from the database after using it
562 {"_id": db_vnfr
["_id"]},
564 pull
={"vdur-template": {"_id": vdur
["_id"]}},
566 for count
in range(vdu_count
):
567 vdur_copy
= deepcopy(vdur
)
568 vdur_copy
["status"] = "BUILD"
569 vdur_copy
["status-detailed"] = None
570 vdur_copy
["ip-address"] = None
571 vdur_copy
["_id"] = str(uuid4())
572 vdur_copy
["count-index"] += count
+ 1
573 vdur_copy
["id"] = "{}-{}".format(
574 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
576 vdur_copy
.pop("vim_info", None)
577 for iface
in vdur_copy
["interfaces"]:
578 if iface
.get("fixed-ip"):
579 iface
["ip-address"] = self
.increment_ip_mac(
580 iface
["ip-address"], count
+ 1
583 iface
.pop("ip-address", None)
584 if iface
.get("fixed-mac"):
585 iface
["mac-address"] = self
.increment_ip_mac(
586 iface
["mac-address"], count
+ 1
589 iface
.pop("mac-address", None)
593 ) # only first vdu can be managment of vnf
594 db_vdu_push_list
.append(vdur_copy
)
595 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
597 if len(db_vnfr
["vdur"]) == 1:
598 # The scale will move to 0 instances
600 "Scaling to 0 !, creating the template with the last vdur"
602 template_vdur
= [db_vnfr
["vdur"][0]]
603 for vdu_id
, vdu_count
in vdu_delete
.items():
605 indexes_to_delete
= [
607 for iv
in enumerate(db_vnfr
["vdur"])
608 if iv
[1]["vdu-id-ref"] == vdu_id
612 "vdur.{}.status".format(i
): "DELETING"
613 for i
in indexes_to_delete
[-vdu_count
:]
617 # it must be deleted one by one because common.db does not allow otherwise
620 for v
in reversed(db_vnfr
["vdur"])
621 if v
["vdu-id-ref"] == vdu_id
623 for vdu
in vdus_to_delete
[:vdu_count
]:
626 {"_id": db_vnfr
["_id"]},
628 pull
={"vdur": {"_id": vdu
["_id"]}},
632 db_push
["vdur"] = db_vdu_push_list
634 db_push
["vdur-template"] = template_vdur
637 db_vnfr
["vdur-template"] = template_vdur
638 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
639 # modify passed dictionary db_vnfr
640 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
641 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
643 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
645 Updates database nsr with the RO info for the created vld
646 :param ns_update_nsr: dictionary to be filled with the updated info
647 :param db_nsr: content of db_nsr. This is also modified
648 :param nsr_desc_RO: nsr descriptor from RO
649 :return: Nothing, LcmException is raised on errors
652 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
653 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
654 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
656 vld
["vim-id"] = net_RO
.get("vim_net_id")
657 vld
["name"] = net_RO
.get("vim_name")
658 vld
["status"] = net_RO
.get("status")
659 vld
["status-detailed"] = net_RO
.get("error_msg")
660 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
664 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
667 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
669 for db_vnfr
in db_vnfrs
.values():
670 vnfr_update
= {"status": "ERROR"}
671 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
672 if "status" not in vdur
:
673 vdur
["status"] = "ERROR"
674 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
676 vdur
["status-detailed"] = str(error_text
)
678 "vdur.{}.status-detailed".format(vdu_index
)
680 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
681 except DbException
as e
:
682 self
.logger
.error("Cannot update vnf. {}".format(e
))
684 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
686 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
687 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
688 :param nsr_desc_RO: nsr descriptor from RO
689 :return: Nothing, LcmException is raised on errors
691 for vnf_index
, db_vnfr
in db_vnfrs
.items():
692 for vnf_RO
in nsr_desc_RO
["vnfs"]:
693 if vnf_RO
["member_vnf_index"] != vnf_index
:
696 if vnf_RO
.get("ip_address"):
697 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
700 elif not db_vnfr
.get("ip-address"):
701 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
702 raise LcmExceptionNoMgmtIP(
703 "ns member_vnf_index '{}' has no IP address".format(
708 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
709 vdur_RO_count_index
= 0
710 if vdur
.get("pdu-type"):
712 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
713 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
715 if vdur
["count-index"] != vdur_RO_count_index
:
716 vdur_RO_count_index
+= 1
718 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
719 if vdur_RO
.get("ip_address"):
720 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
722 vdur
["ip-address"] = None
723 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
724 vdur
["name"] = vdur_RO
.get("vim_name")
725 vdur
["status"] = vdur_RO
.get("status")
726 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
727 for ifacer
in get_iterable(vdur
, "interfaces"):
728 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
729 if ifacer
["name"] == interface_RO
.get("internal_name"):
730 ifacer
["ip-address"] = interface_RO
.get(
733 ifacer
["mac-address"] = interface_RO
.get(
739 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
740 "from VIM info".format(
741 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
744 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
748 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
750 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
754 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
755 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
756 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
758 vld
["vim-id"] = net_RO
.get("vim_net_id")
759 vld
["name"] = net_RO
.get("vim_name")
760 vld
["status"] = net_RO
.get("status")
761 vld
["status-detailed"] = net_RO
.get("error_msg")
762 vnfr_update
["vld.{}".format(vld_index
)] = vld
766 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
771 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
776 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
781 def _get_ns_config_info(self
, nsr_id
):
783 Generates a mapping between vnf,vdu elements and the N2VC id
784 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
785 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
786 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
787 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
789 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
790 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
792 ns_config_info
= {"osm-config-mapping": mapping
}
793 for vca
in vca_deployed_list
:
794 if not vca
["member-vnf-index"]:
796 if not vca
["vdu_id"]:
797 mapping
[vca
["member-vnf-index"]] = vca
["application"]
801 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
803 ] = vca
["application"]
804 return ns_config_info
806 async def _instantiate_ng_ro(
822 def get_vim_account(vim_account_id
):
824 if vim_account_id
in db_vims
:
825 return db_vims
[vim_account_id
]
826 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
827 db_vims
[vim_account_id
] = db_vim
830 # modify target_vld info with instantiation parameters
831 def parse_vld_instantiation_params(
832 target_vim
, target_vld
, vld_params
, target_sdn
834 if vld_params
.get("ip-profile"):
835 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_to_ro_ip_profile(
836 vld_params
["ip-profile"]
838 if vld_params
.get("provider-network"):
839 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
842 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
843 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
847 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
848 # if wim_account_id is specified in vld_params, validate if it is feasible.
849 wim_account_id
, db_wim
= select_feasible_wim_account(
850 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
854 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
855 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
856 # update vld_params with correct WIM account Id
857 vld_params
["wimAccountId"] = wim_account_id
859 target_wim
= "wim:{}".format(wim_account_id
)
860 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
861 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
862 if len(sdn_ports
) > 0:
863 target_vld
["vim_info"][target_wim
] = target_wim_attrs
864 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
867 "Target VLD with WIM data: {:s}".format(str(target_vld
))
870 for param
in ("vim-network-name", "vim-network-id"):
871 if vld_params
.get(param
):
872 if isinstance(vld_params
[param
], dict):
873 for vim
, vim_net
in vld_params
[param
].items():
874 other_target_vim
= "vim:" + vim
876 target_vld
["vim_info"],
877 (other_target_vim
, param
.replace("-", "_")),
880 else: # isinstance str
881 target_vld
["vim_info"][target_vim
][
882 param
.replace("-", "_")
883 ] = vld_params
[param
]
884 if vld_params
.get("common_id"):
885 target_vld
["common_id"] = vld_params
.get("common_id")
887 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
888 def update_ns_vld_target(target
, ns_params
):
889 for vnf_params
in ns_params
.get("vnf", ()):
890 if vnf_params
.get("vimAccountId"):
894 for vnfr
in db_vnfrs
.values()
895 if vnf_params
["member-vnf-index"]
896 == vnfr
["member-vnf-index-ref"]
900 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
903 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
904 target_vld
= find_in_list(
905 get_iterable(vdur
, "interfaces"),
906 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
909 vld_params
= find_in_list(
910 get_iterable(ns_params
, "vld"),
911 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
914 if vnf_params
.get("vimAccountId") not in a_vld
.get(
917 target_vim_network_list
= [
918 v
for _
, v
in a_vld
.get("vim_info").items()
920 target_vim_network_name
= next(
922 item
.get("vim_network_name", "")
923 for item
in target_vim_network_list
928 target
["ns"]["vld"][a_index
].get("vim_info").update(
930 "vim:{}".format(vnf_params
["vimAccountId"]): {
931 "vim_network_name": target_vim_network_name
,
937 for param
in ("vim-network-name", "vim-network-id"):
938 if vld_params
.get(param
) and isinstance(
939 vld_params
[param
], dict
941 for vim
, vim_net
in vld_params
[
944 other_target_vim
= "vim:" + vim
946 target
["ns"]["vld"][a_index
].get(
951 param
.replace("-", "_"),
956 nslcmop_id
= db_nslcmop
["_id"]
958 "name": db_nsr
["name"],
961 "image": deepcopy(db_nsr
["image"]),
962 "flavor": deepcopy(db_nsr
["flavor"]),
963 "action_id": nslcmop_id
,
964 "cloud_init_content": {},
966 for image
in target
["image"]:
967 image
["vim_info"] = {}
968 for flavor
in target
["flavor"]:
969 flavor
["vim_info"] = {}
970 if db_nsr
.get("affinity-or-anti-affinity-group"):
971 target
["affinity-or-anti-affinity-group"] = deepcopy(
972 db_nsr
["affinity-or-anti-affinity-group"]
974 for affinity_or_anti_affinity_group
in target
[
975 "affinity-or-anti-affinity-group"
977 affinity_or_anti_affinity_group
["vim_info"] = {}
979 if db_nslcmop
.get("lcmOperationType") != "instantiate":
980 # get parameters of instantiation:
981 db_nslcmop_instantiate
= self
.db
.get_list(
984 "nsInstanceId": db_nslcmop
["nsInstanceId"],
985 "lcmOperationType": "instantiate",
988 ns_params
= db_nslcmop_instantiate
.get("operationParams")
990 ns_params
= db_nslcmop
.get("operationParams")
991 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
992 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
995 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
996 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1000 "mgmt-network": vld
.get("mgmt-network", False),
1001 "type": vld
.get("type"),
1004 "vim_network_name": vld
.get("vim-network-name"),
1005 "vim_account_id": ns_params
["vimAccountId"],
1009 # check if this network needs SDN assist
1010 if vld
.get("pci-interfaces"):
1011 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1012 if vim_config
:= db_vim
.get("config"):
1013 if sdnc_id
:= vim_config
.get("sdn-controller"):
1014 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1015 target_sdn
= "sdn:{}".format(sdnc_id
)
1016 target_vld
["vim_info"][target_sdn
] = {
1018 "target_vim": target_vim
,
1020 "type": vld
.get("type"),
1023 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1024 for nsd_vnf_profile
in nsd_vnf_profiles
:
1025 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1026 if cp
["virtual-link-profile-id"] == vld
["id"]:
1028 "member_vnf:{}.{}".format(
1029 cp
["constituent-cpd-id"][0][
1030 "constituent-base-element-id"
1032 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1034 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1036 # check at nsd descriptor, if there is an ip-profile
1038 nsd_vlp
= find_in_list(
1039 get_virtual_link_profiles(nsd
),
1040 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1045 and nsd_vlp
.get("virtual-link-protocol-data")
1046 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1048 vld_params
["ip-profile"] = nsd_vlp
["virtual-link-protocol-data"][
1052 # update vld_params with instantiation params
1053 vld_instantiation_params
= find_in_list(
1054 get_iterable(ns_params
, "vld"),
1055 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1057 if vld_instantiation_params
:
1058 vld_params
.update(vld_instantiation_params
)
1059 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1060 target
["ns"]["vld"].append(target_vld
)
1061 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1062 update_ns_vld_target(target
, ns_params
)
1064 for vnfr
in db_vnfrs
.values():
1065 vnfd
= find_in_list(
1066 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1068 vnf_params
= find_in_list(
1069 get_iterable(ns_params
, "vnf"),
1070 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1072 target_vnf
= deepcopy(vnfr
)
1073 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1074 for vld
in target_vnf
.get("vld", ()):
1075 # check if connected to a ns.vld, to fill target'
1076 vnf_cp
= find_in_list(
1077 vnfd
.get("int-virtual-link-desc", ()),
1078 lambda cpd
: cpd
.get("id") == vld
["id"],
1081 ns_cp
= "member_vnf:{}.{}".format(
1082 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1084 if cp2target
.get(ns_cp
):
1085 vld
["target"] = cp2target
[ns_cp
]
1088 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1090 # check if this network needs SDN assist
1092 if vld
.get("pci-interfaces"):
1093 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1094 sdnc_id
= db_vim
["config"].get("sdn-controller")
1096 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1097 target_sdn
= "sdn:{}".format(sdnc_id
)
1098 vld
["vim_info"][target_sdn
] = {
1100 "target_vim": target_vim
,
1102 "type": vld
.get("type"),
1105 # check at vnfd descriptor, if there is an ip-profile
1107 vnfd_vlp
= find_in_list(
1108 get_virtual_link_profiles(vnfd
),
1109 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1113 and vnfd_vlp
.get("virtual-link-protocol-data")
1114 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1116 vld_params
["ip-profile"] = vnfd_vlp
["virtual-link-protocol-data"][
1119 # update vld_params with instantiation params
1121 vld_instantiation_params
= find_in_list(
1122 get_iterable(vnf_params
, "internal-vld"),
1123 lambda i_vld
: i_vld
["name"] == vld
["id"],
1125 if vld_instantiation_params
:
1126 vld_params
.update(vld_instantiation_params
)
1127 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1130 for vdur
in target_vnf
.get("vdur", ()):
1131 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1132 continue # This vdu must not be created
1133 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1135 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1138 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1139 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1142 and vdu_configuration
.get("config-access")
1143 and vdu_configuration
.get("config-access").get("ssh-access")
1145 vdur
["ssh-keys"] = ssh_keys_all
1146 vdur
["ssh-access-required"] = vdu_configuration
[
1148 ]["ssh-access"]["required"]
1151 and vnf_configuration
.get("config-access")
1152 and vnf_configuration
.get("config-access").get("ssh-access")
1153 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1155 vdur
["ssh-keys"] = ssh_keys_all
1156 vdur
["ssh-access-required"] = vnf_configuration
[
1158 ]["ssh-access"]["required"]
1159 elif ssh_keys_instantiation
and find_in_list(
1160 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1162 vdur
["ssh-keys"] = ssh_keys_instantiation
1164 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1166 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1168 if vdud
.get("cloud-init-file"):
1169 vdur
["cloud-init"] = "{}:file:{}".format(
1170 vnfd
["_id"], vdud
.get("cloud-init-file")
1172 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1173 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1174 base_folder
= vnfd
["_admin"]["storage"]
1175 if base_folder
["pkg-dir"]:
1176 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1177 base_folder
["folder"],
1178 base_folder
["pkg-dir"],
1179 vdud
.get("cloud-init-file"),
1182 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1183 base_folder
["folder"],
1184 vdud
.get("cloud-init-file"),
1186 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1187 target
["cloud_init_content"][
1190 elif vdud
.get("cloud-init"):
1191 vdur
["cloud-init"] = "{}:vdu:{}".format(
1192 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1194 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1195 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1198 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1199 deploy_params_vdu
= self
._format
_additional
_params
(
1200 vdur
.get("additionalParams") or {}
1202 deploy_params_vdu
["OSM"] = get_osm_params(
1203 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1205 vdur
["additionalParams"] = deploy_params_vdu
1208 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1209 if target_vim
not in ns_flavor
["vim_info"]:
1210 ns_flavor
["vim_info"][target_vim
] = {}
1213 # in case alternative images are provided we must check if they should be applied
1214 # for the vim_type, modify the vim_type taking into account
1215 ns_image_id
= int(vdur
["ns-image-id"])
1216 if vdur
.get("alt-image-ids"):
1217 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1218 vim_type
= db_vim
["vim_type"]
1219 for alt_image_id
in vdur
.get("alt-image-ids"):
1220 ns_alt_image
= target
["image"][int(alt_image_id
)]
1221 if vim_type
== ns_alt_image
.get("vim-type"):
1222 # must use alternative image
1224 "use alternative image id: {}".format(alt_image_id
)
1226 ns_image_id
= alt_image_id
1227 vdur
["ns-image-id"] = ns_image_id
1229 ns_image
= target
["image"][int(ns_image_id
)]
1230 if target_vim
not in ns_image
["vim_info"]:
1231 ns_image
["vim_info"][target_vim
] = {}
1234 if vdur
.get("affinity-or-anti-affinity-group-id"):
1235 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1236 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1237 if target_vim
not in ns_ags
["vim_info"]:
1238 ns_ags
["vim_info"][target_vim
] = {}
1240 vdur
["vim_info"] = {target_vim
: {}}
1241 # instantiation parameters
1243 vdu_instantiation_params
= find_in_list(
1244 get_iterable(vnf_params
, "vdu"),
1245 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1247 if vdu_instantiation_params
:
1248 # Parse the vdu_volumes from the instantiation params
1249 vdu_volumes
= get_volumes_from_instantiation_params(
1250 vdu_instantiation_params
, vdud
1252 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1253 vdur
["additionalParams"]["OSM"][
1255 ] = vdu_instantiation_params
.get("vim-flavor-id")
1256 vdur_list
.append(vdur
)
1257 target_vnf
["vdur"] = vdur_list
1258 target
["vnf"].append(target_vnf
)
1260 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1261 desc
= await self
.RO
.deploy(nsr_id
, target
)
1262 self
.logger
.debug("RO return > {}".format(desc
))
1263 action_id
= desc
["action_id"]
1264 await self
._wait
_ng
_ro
(
1271 operation
="instantiation",
1276 "_admin.deployed.RO.operational-status": "running",
1277 "detailed-status": " ".join(stage
),
1279 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1280 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1281 self
._write
_op
_status
(nslcmop_id
, stage
)
1283 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
    operation=None,
):
    """
    Poll NG-RO until the given action is done, fails or the timeout expires.

    :param nsr_id: ns record identity the action belongs to
    :param action_id: identity of the RO action to poll
    :param nslcmop_id: ns operation identity. Progress is only written to database when both this and
        stage are provided
    :param start_time: epoch seconds the timeout is counted from. Defaults to the moment of the call
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :param operation: key of self.op_status_map that selects the coroutine used to query the status
    :return: None
    :raises NgRoException: if RO reports status FAILED, or the action is not DONE before the timeout
    :raises AssertionError: if RO reports a status other than FAILED, BUILD or DONE
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.op_status_map[operation](nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # raised explicitly instead of using "assert", so that the check is kept under "python -O"
            raise AssertionError(
                "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            )
        # write progress at database only when the vim specific text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the "loop" argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
1325 async def _terminate_ng_ro(
1326 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1331 start_deploy
= time()
1338 "action_id": nslcmop_id
,
1340 desc
= await self
.RO
.deploy(nsr_id
, target
)
1341 action_id
= desc
["action_id"]
1342 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1345 + "ns terminate action at RO. action_id={}".format(action_id
)
1349 delete_timeout
= 20 * 60 # 20 minutes
1350 await self
._wait
_ng
_ro
(
1357 operation
="termination",
1359 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1361 await self
.RO
.delete(nsr_id
)
1362 except NgRoException
as e
:
1363 if e
.http_code
== 404: # not found
1364 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1365 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1367 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1369 elif e
.http_code
== 409: # conflict
1370 failed_detail
.append("delete conflict: {}".format(e
))
1373 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1376 failed_detail
.append("delete error: {}".format(e
))
1379 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1381 except Exception as e
:
1382 failed_detail
.append("delete error: {}".format(e
))
1384 logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
)
1388 stage
[2] = "Error deleting from VIM"
1390 stage
[2] = "Deleted from VIM"
1391 db_nsr_update
["detailed-status"] = " ".join(stage
)
1392 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1393 self
._write
_op
_status
(nslcmop_id
, stage
)
1396 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.ns_deploy

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # no vnfr uses the requested vim account: take the one of the last vnfr.
                # (This was a "==" comparison without effect; it must be an assignment)
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is only logged for unexpected exception types
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: identity of the vnfr that contains the kdu
    :param kdu_name: name of the kdu to wait for, compared against the "kdu-name" of each kdur
    :return: IP address, K8s services
    :raises LcmException: if the kdu is not found at the vnfr, if it has a status other than
        READY or ENABLED, or if it gets no status after 360 polls
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        # read the vnfr again at every iteration, so that the status written by others is seen
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the "loop" argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1511 async def wait_vm_up_insert_key_ro(
1512 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1515 Wait for ip address at RO, and optionally, insert public key in virtual machine
1516 :param logging_text: prefix use for logging
1521 :param pub_key: public ssh key to inject, None to skip
1522 :param user: user to apply the public ssh key
1526 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1528 target_vdu_id
= None
1533 if ro_retries
>= 360: # 1 hour
1535 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1538 await asyncio
.sleep(10, loop
=self
.loop
)
1541 if not target_vdu_id
:
1542 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1544 if not vdu_id
: # for the VNF case
1545 if db_vnfr
.get("status") == "ERROR":
1547 "Cannot inject ssh-key because target VNF is in error state"
1549 ip_address
= db_vnfr
.get("ip-address")
1555 for x
in get_iterable(db_vnfr
, "vdur")
1556 if x
.get("ip-address") == ip_address
1564 for x
in get_iterable(db_vnfr
, "vdur")
1565 if x
.get("vdu-id-ref") == vdu_id
1566 and x
.get("count-index") == vdu_index
1572 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1573 ): # If only one, this should be the target vdu
1574 vdur
= db_vnfr
["vdur"][0]
1577 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1578 vnfr_id
, vdu_id
, vdu_index
1581 # New generation RO stores information at "vim_info"
1584 if vdur
.get("vim_info"):
1586 t
for t
in vdur
["vim_info"]
1587 ) # there should be only one key
1588 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1590 vdur
.get("pdu-type")
1591 or vdur
.get("status") == "ACTIVE"
1592 or ng_ro_status
== "ACTIVE"
1594 ip_address
= vdur
.get("ip-address")
1597 target_vdu_id
= vdur
["vdu-id-ref"]
1598 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1600 "Cannot inject ssh-key because target VM is in error state"
1603 if not target_vdu_id
:
1606 # inject public key into machine
1607 if pub_key
and user
:
1608 self
.logger
.debug(logging_text
+ "Inserting RO key")
1609 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1610 if vdur
.get("pdu-type"):
1611 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1616 "action": "inject_ssh_key",
1620 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1622 desc
= await self
.RO
.deploy(nsr_id
, target
)
1623 action_id
= desc
["action_id"]
1624 await self
._wait
_ng
_ro
(
1625 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1628 except NgRoException
as e
:
1630 "Reaching max tries injecting key. Error: {}".format(e
)
1637 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1639 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1641 my_vca
= vca_deployed_list
[vca_index
]
1642 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1643 # vdu or kdu: no dependencies
1647 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1648 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1649 configuration_status_list
= db_nsr
["configurationStatus"]
1650 for index
, vca_deployed
in enumerate(configuration_status_list
):
1651 if index
== vca_index
:
1654 if not my_vca
.get("member-vnf-index") or (
1655 vca_deployed
.get("member-vnf-index")
1656 == my_vca
.get("member-vnf-index")
1658 internal_status
= configuration_status_list
[index
].get("status")
1659 if internal_status
== "READY":
1661 elif internal_status
== "BROKEN":
1663 "Configuration aborted because dependent charm/s has failed"
1668 # no dependencies, return
1670 await asyncio
.sleep(10)
1673 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identity that applies to a vnf or to a ns.
    :param db_vnfr: database content of the vnf record. When provided, its "vca-id" is the result
    :param db_nsr: database content of the ns record. Only used when db_vnfr is empty: the result is
        the "vca" of the vim account referenced at instantiate_params.vimAccountId
    :return: the VCA identity, or None if neither record is provided
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if db_nsr:
        vim_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        return VimAccountDB.get_vim_account_with_id(vim_id).get("vca")
    return None
1684 async def instantiate_N2VC(
1702 ee_config_descriptor
,
1704 nsr_id
= db_nsr
["_id"]
1705 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1706 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1707 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1708 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1710 "collection": "nsrs",
1711 "filter": {"_id": nsr_id
},
1712 "path": db_update_entry
,
1717 element_under_configuration
= nsr_id
1721 vnfr_id
= db_vnfr
["_id"]
1722 osm_config
["osm"]["vnf_id"] = vnfr_id
1724 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1726 if vca_type
== "native_charm":
1729 index_number
= vdu_index
or 0
1732 element_type
= "VNF"
1733 element_under_configuration
= vnfr_id
1734 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1736 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1737 element_type
= "VDU"
1738 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1739 osm_config
["osm"]["vdu_id"] = vdu_id
1741 namespace
+= ".{}".format(kdu_name
)
1742 element_type
= "KDU"
1743 element_under_configuration
= kdu_name
1744 osm_config
["osm"]["kdu_name"] = kdu_name
1747 if base_folder
["pkg-dir"]:
1748 artifact_path
= "{}/{}/{}/{}".format(
1749 base_folder
["folder"],
1750 base_folder
["pkg-dir"],
1753 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1758 artifact_path
= "{}/Scripts/{}/{}/".format(
1759 base_folder
["folder"],
1762 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1767 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1769 # get initial_config_primitive_list that applies to this element
1770 initial_config_primitive_list
= config_descriptor
.get(
1771 "initial-config-primitive"
1775 "Initial config primitive list > {}".format(
1776 initial_config_primitive_list
1780 # add config if not present for NS charm
1781 ee_descriptor_id
= ee_config_descriptor
.get("id")
1782 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1783 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1784 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1788 "Initial config primitive list #2 > {}".format(
1789 initial_config_primitive_list
1792 # n2vc_redesign STEP 3.1
1793 # find old ee_id if exists
1794 ee_id
= vca_deployed
.get("ee_id")
1796 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1797 # create or register execution environment in VCA
1798 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1799 self
._write
_configuration
_status
(
1801 vca_index
=vca_index
,
1803 element_under_configuration
=element_under_configuration
,
1804 element_type
=element_type
,
1807 step
= "create execution environment"
1808 self
.logger
.debug(logging_text
+ step
)
1812 if vca_type
== "k8s_proxy_charm":
1813 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1814 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1815 namespace
=namespace
,
1816 artifact_path
=artifact_path
,
1820 elif vca_type
== "helm" or vca_type
== "helm-v3":
1821 ee_id
, credentials
= await self
.vca_map
[
1823 ].create_execution_environment(
1824 namespace
=namespace
,
1828 artifact_path
=artifact_path
,
1829 chart_model
=vca_name
,
1833 ee_id
, credentials
= await self
.vca_map
[
1835 ].create_execution_environment(
1836 namespace
=namespace
,
1842 elif vca_type
== "native_charm":
1843 step
= "Waiting to VM being up and getting IP address"
1844 self
.logger
.debug(logging_text
+ step
)
1845 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1854 credentials
= {"hostname": rw_mgmt_ip
}
1856 username
= deep_get(
1857 config_descriptor
, ("config-access", "ssh-access", "default-user")
1859 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1860 # merged. Meanwhile let's get username from initial-config-primitive
1861 if not username
and initial_config_primitive_list
:
1862 for config_primitive
in initial_config_primitive_list
:
1863 for param
in config_primitive
.get("parameter", ()):
1864 if param
["name"] == "ssh-username":
1865 username
= param
["value"]
1869 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1870 "'config-access.ssh-access.default-user'"
1872 credentials
["username"] = username
1873 # n2vc_redesign STEP 3.2
1875 self
._write
_configuration
_status
(
1877 vca_index
=vca_index
,
1878 status
="REGISTERING",
1879 element_under_configuration
=element_under_configuration
,
1880 element_type
=element_type
,
1883 step
= "register execution environment {}".format(credentials
)
1884 self
.logger
.debug(logging_text
+ step
)
1885 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1886 credentials
=credentials
,
1887 namespace
=namespace
,
1892 # for compatibility with MON/POL modules, they need the model and application name at database
1893 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1894 ee_id_parts
= ee_id
.split(".")
1895 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1896 if len(ee_id_parts
) >= 2:
1897 model_name
= ee_id_parts
[0]
1898 application_name
= ee_id_parts
[1]
1899 db_nsr_update
[db_update_entry
+ "model"] = model_name
1900 db_nsr_update
[db_update_entry
+ "application"] = application_name
1902 # n2vc_redesign STEP 3.3
1903 step
= "Install configuration Software"
1905 self
._write
_configuration
_status
(
1907 vca_index
=vca_index
,
1908 status
="INSTALLING SW",
1909 element_under_configuration
=element_under_configuration
,
1910 element_type
=element_type
,
1911 other_update
=db_nsr_update
,
1914 # TODO check if already done
1915 self
.logger
.debug(logging_text
+ step
)
1917 if vca_type
== "native_charm":
1918 config_primitive
= next(
1919 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1922 if config_primitive
:
1923 config
= self
._map
_primitive
_params
(
1924 config_primitive
, {}, deploy_params
1927 if vca_type
== "lxc_proxy_charm":
1928 if element_type
== "NS":
1929 num_units
= db_nsr
.get("config-units") or 1
1930 elif element_type
== "VNF":
1931 num_units
= db_vnfr
.get("config-units") or 1
1932 elif element_type
== "VDU":
1933 for v
in db_vnfr
["vdur"]:
1934 if vdu_id
== v
["vdu-id-ref"]:
1935 num_units
= v
.get("config-units") or 1
1937 if vca_type
!= "k8s_proxy_charm":
1938 await self
.vca_map
[vca_type
].install_configuration_sw(
1940 artifact_path
=artifact_path
,
1943 num_units
=num_units
,
1948 # write in db flag of configuration_sw already installed
1950 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1953 # add relations for this VCA (wait for other peers related with this VCA)
1954 is_relation_added
= await self
._add
_vca
_relations
(
1955 logging_text
=logging_text
,
1958 vca_index
=vca_index
,
1961 if not is_relation_added
:
1962 raise LcmException("Relations could not be added to VCA.")
1964 # if SSH access is required, then get execution environment SSH public
1965 # if native charm we have waited already to VM be UP
1966 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1969 # self.logger.debug("get ssh key block")
1971 config_descriptor
, ("config-access", "ssh-access", "required")
1973 # self.logger.debug("ssh key needed")
1974 # Needed to inject a ssh key
1977 ("config-access", "ssh-access", "default-user"),
1979 step
= "Install configuration Software, getting public ssh key"
1980 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1981 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1984 step
= "Insert public key into VM user={} ssh_key={}".format(
1988 # self.logger.debug("no need to get ssh key")
1989 step
= "Waiting to VM being up and getting IP address"
1990 self
.logger
.debug(logging_text
+ step
)
1992 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1995 # n2vc_redesign STEP 5.1
1996 # wait for RO (ip-address) Insert pub_key into VM
1999 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2000 logging_text
, nsr_id
, vnfr_id
, kdu_name
2002 vnfd
= self
.db
.get_one(
2004 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2006 kdu
= get_kdu(vnfd
, kdu_name
)
2008 service
["name"] for service
in get_kdu_services(kdu
)
2010 exposed_services
= []
2011 for service
in services
:
2012 if any(s
in service
["name"] for s
in kdu_services
):
2013 exposed_services
.append(service
)
2014 await self
.vca_map
[vca_type
].exec_primitive(
2016 primitive_name
="config",
2018 "osm-config": json
.dumps(
2020 k8s
={"services": exposed_services
}
2027 # This verification is needed in order to avoid trying to add a public key
2028 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2029 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2030 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2032 elif db_vnfr
.get("vdur"):
2033 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2043 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2045 # store rw_mgmt_ip in deploy params for later replacement
2046 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2048 # n2vc_redesign STEP 6 Execute initial config primitive
2049 step
= "execute initial config primitive"
2051 # wait for dependent primitives execution (NS -> VNF -> VDU)
2052 if initial_config_primitive_list
:
2053 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2055 # stage, in function of element type: vdu, kdu, vnf or ns
2056 my_vca
= vca_deployed_list
[vca_index
]
2057 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2059 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2060 elif my_vca
.get("member-vnf-index"):
2062 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2065 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2067 self
._write
_configuration
_status
(
2068 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2071 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2073 check_if_terminated_needed
= True
2074 for initial_config_primitive
in initial_config_primitive_list
:
2075 # adding information on the vca_deployed if it is a NS execution environment
2076 if not vca_deployed
["member-vnf-index"]:
2077 deploy_params
["ns_config_info"] = json
.dumps(
2078 self
._get
_ns
_config
_info
(nsr_id
)
2080 # TODO check if already done
2081 primitive_params_
= self
._map
_primitive
_params
(
2082 initial_config_primitive
, {}, deploy_params
2085 step
= "execute primitive '{}' params '{}'".format(
2086 initial_config_primitive
["name"], primitive_params_
2088 self
.logger
.debug(logging_text
+ step
)
2089 await self
.vca_map
[vca_type
].exec_primitive(
2091 primitive_name
=initial_config_primitive
["name"],
2092 params_dict
=primitive_params_
,
2097 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2098 if check_if_terminated_needed
:
2099 if config_descriptor
.get("terminate-config-primitive"):
2101 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2103 check_if_terminated_needed
= False
2105 # TODO register in database that primitive is done
2107 # STEP 7 Configure metrics
2108 if vca_type
== "helm" or vca_type
== "helm-v3":
2109 # TODO: review for those cases where the helm chart is a reference and
2110 # is not part of the NF package
2111 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2113 artifact_path
=artifact_path
,
2114 ee_config_descriptor
=ee_config_descriptor
,
2117 target_ip
=rw_mgmt_ip
,
2118 element_type
=element_type
,
2119 vnf_member_index
=db_vnfr
.get("member-vnf-index-ref", ""),
2121 vdu_index
=vdu_index
,
2123 kdu_index
=kdu_index
,
2129 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2132 for job
in prometheus_jobs
:
2135 {"job_name": job
["job_name"]},
2138 fail_on_empty
=False,
2141 step
= "instantiated at VCA"
2142 self
.logger
.debug(logging_text
+ step
)
2144 self
._write
_configuration
_status
(
2145 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2148 except Exception as e
: # TODO not use Exception but N2VC exception
2149 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2151 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2154 "Exception while {} : {}".format(step
, e
), exc_info
=True
2156 self
._write
_configuration
_status
(
2157 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2159 raise LcmException("{}. {}".format(step
, e
)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: identity of the ns record to update
    :param ns_state: value for "nsState". It is not written when empty
    :param current_operation: value for "currentOperation". "IDLE" is stored as None at
        "_admin.operation-type"
    :param current_operation_id: identity of the operation in course
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        # the caller's dictionary is reused on purpose: the status fields are added to it
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed in python 3.13)
        self.logger.warning(
            "Error writing NS status, ns={}: {}".format(nsr_id, e)
        )
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the status fields of an operation at the "nslcmops" collection.
    :param op_id: identity of the operation record to update
    :param stage: when it is a list, its first item is stored as "stage" and all items joined by
        spaces as "detailed-status". Any other non None value is stored as text at "stage"
    :param error_message: value for "errorMessage". Not written when None
    :param queuePosition: value for "queuePosition"
    :param operation_state: value for "operationState". Not written when None; when written,
        "statusEnteredTime" is set to the current time
    :param other_update: other changes to write at the same record. The status fields are added to it
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed in python 3.13)
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non empty entry of the ns "configurationStatus" list.
    :param db_nsr: database content of ns record. Its "_id" and "configurationStatus" are read
    :param status: text to store at "configurationStatus.<index>.status"
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias (removed in python 3.13)
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update one entry of the ns "configurationStatus" list at database.
    :param nsr_id: identity of the ns record to update
    :param vca_index: index of the entry inside "configurationStatus"
    :param status: value for the "status" of the entry. Not written when empty
    :param element_under_configuration: value for "elementUnderConfiguration". Not written when empty
    :param element_type: value for "elementType". Not written when empty
    :param other_update: other changes to write at the same record. The status fields are added to it
    :return: None. A DbException is logged as a warning and not propagated
    """
    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias (removed in python 3.13)
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        # placement is not delegated to an external engine: nothing to modify
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll the operation record until the result is stored at _admin.pla
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while remaining >= 0 and not placement:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        refreshed_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(refreshed_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement["vnf"]:
        target_vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and target_vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": target_vnfr["_id"]},
                {"vim-account-id": placed_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs
            target_vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at the operation record (nslcmops _admin.pla).
    :param params: dictionary whose "placement" entry is stored; "placement.nslcmopId" identifies
        the operation record to update
    :return: None. Any exception is logged as a warning and not propagated
    """
    # defined before the "try", so that it can always be used at the log of the exception handler
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # best effort on purpose. Logger.warn is a deprecated alias (removed in python 3.13)
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2339 async def instantiate(self
, nsr_id
, nslcmop_id
):
2342 :param nsr_id: ns instance to deploy
2343 :param nslcmop_id: operation to run
2347 # Try to lock HA task here
2348 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2349 if not task_is_locked_by_me
:
2351 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2355 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2356 self
.logger
.debug(logging_text
+ "Enter")
2358 # get all needed from database
2360 # database nsrs record
2363 # database nslcmops record
2366 # update operation on nsrs
2368 # update operation on nslcmops
2369 db_nslcmop_update
= {}
2371 timeout_ns_deploy
= self
.timeout
.ns_deploy
2373 nslcmop_operation_state
= None
2374 db_vnfrs
= {} # vnf's info indexed by member-index
2376 tasks_dict_info
= {} # from task to info text
2380 "Stage 1/5: preparation of the environment.",
2381 "Waiting for previous operations to terminate.",
2384 # ^ stage, step, VIM progress
2386 # wait for any previous tasks in process
2387 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2389 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2390 stage
[1] = "Reading from database."
2391 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2392 db_nsr_update
["detailed-status"] = "creating"
2393 db_nsr_update
["operational-status"] = "init"
2394 self
._write
_ns
_status
(
2396 ns_state
="BUILDING",
2397 current_operation
="INSTANTIATING",
2398 current_operation_id
=nslcmop_id
,
2399 other_update
=db_nsr_update
,
2401 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2403 # read from db: operation
2404 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2405 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2406 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2407 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2408 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2410 ns_params
= db_nslcmop
.get("operationParams")
2411 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2412 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2415 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2416 self
.logger
.debug(logging_text
+ stage
[1])
2417 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2418 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2419 self
.logger
.debug(logging_text
+ stage
[1])
2420 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2421 self
.fs
.sync(db_nsr
["nsd-id"])
2423 # nsr_name = db_nsr["name"] # TODO short-name??
2425 # read from db: vnf's of this ns
2426 stage
[1] = "Getting vnfrs from db."
2427 self
.logger
.debug(logging_text
+ stage
[1])
2428 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2430 # read from db: vnfd's for every vnf
2431 db_vnfds
= [] # every vnfd data
2433 # for each vnf in ns, read vnfd
2434 for vnfr
in db_vnfrs_list
:
2435 if vnfr
.get("kdur"):
2437 for kdur
in vnfr
["kdur"]:
2438 if kdur
.get("additionalParams"):
2439 kdur
["additionalParams"] = json
.loads(
2440 kdur
["additionalParams"]
2442 kdur_list
.append(kdur
)
2443 vnfr
["kdur"] = kdur_list
2445 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2446 vnfd_id
= vnfr
["vnfd-id"]
2447 vnfd_ref
= vnfr
["vnfd-ref"]
2448 self
.fs
.sync(vnfd_id
)
2450 # if we haven't this vnfd, read it from db
2451 if vnfd_id
not in db_vnfds
:
2453 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2456 self
.logger
.debug(logging_text
+ stage
[1])
2457 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2460 db_vnfds
.append(vnfd
)
2462 # Get or generates the _admin.deployed.VCA list
2463 vca_deployed_list
= None
2464 if db_nsr
["_admin"].get("deployed"):
2465 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2466 if vca_deployed_list
is None:
2467 vca_deployed_list
= []
2468 configuration_status_list
= []
2469 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2470 db_nsr_update
["configurationStatus"] = configuration_status_list
2471 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2472 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2473 elif isinstance(vca_deployed_list
, dict):
2474 # maintain backward compatibility. Change a dict to list at database
2475 vca_deployed_list
= list(vca_deployed_list
.values())
2476 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2477 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2480 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2482 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2483 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2485 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2486 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2487 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2489 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2492 # n2vc_redesign STEP 2 Deploy Network Scenario
2493 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2494 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2496 stage
[1] = "Deploying KDUs."
2497 # self.logger.debug(logging_text + "Before deploy_kdus")
2498 # Call to deploy_kdus in case exists the "vdu:kdu" param
2499 await self
.deploy_kdus(
2500 logging_text
=logging_text
,
2502 nslcmop_id
=nslcmop_id
,
2505 task_instantiation_info
=tasks_dict_info
,
2508 stage
[1] = "Getting VCA public key."
2509 # n2vc_redesign STEP 1 Get VCA public ssh-key
2510 # feature 1429. Add n2vc public key to needed VMs
2511 n2vc_key
= self
.n2vc
.get_public_key()
2512 n2vc_key_list
= [n2vc_key
]
2513 if self
.vca_config
.public_key
:
2514 n2vc_key_list
.append(self
.vca_config
.public_key
)
2516 stage
[1] = "Deploying NS at VIM."
2517 task_ro
= asyncio
.ensure_future(
2518 self
.instantiate_RO(
2519 logging_text
=logging_text
,
2523 db_nslcmop
=db_nslcmop
,
2526 n2vc_key_list
=n2vc_key_list
,
2530 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2531 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2533 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2534 stage
[1] = "Deploying Execution Environments."
2535 self
.logger
.debug(logging_text
+ stage
[1])
2537 # create namespace and certificate if any helm based EE is present in the NS
2538 if check_helm_ee_in_ns(db_vnfds
):
2539 # TODO: create EE namespace
2540 # create TLS certificates
2541 await self
.vca_map
["helm-v3"].create_tls_certificate(
2542 secret_name
="ee-tls-{}".format(nsr_id
),
2545 usage
="server auth",
2548 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2549 for vnf_profile
in get_vnf_profiles(nsd
):
2550 vnfd_id
= vnf_profile
["vnfd-id"]
2551 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2552 member_vnf_index
= str(vnf_profile
["id"])
2553 db_vnfr
= db_vnfrs
[member_vnf_index
]
2554 base_folder
= vnfd
["_admin"]["storage"]
2561 # Get additional parameters
2562 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2563 if db_vnfr
.get("additionalParamsForVnf"):
2564 deploy_params
.update(
2565 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2568 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2569 if descriptor_config
:
2571 logging_text
=logging_text
2572 + "member_vnf_index={} ".format(member_vnf_index
),
2575 nslcmop_id
=nslcmop_id
,
2581 member_vnf_index
=member_vnf_index
,
2582 vdu_index
=vdu_index
,
2583 kdu_index
=kdu_index
,
2585 deploy_params
=deploy_params
,
2586 descriptor_config
=descriptor_config
,
2587 base_folder
=base_folder
,
2588 task_instantiation_info
=tasks_dict_info
,
2592 # Deploy charms for each VDU that supports one.
2593 for vdud
in get_vdu_list(vnfd
):
2595 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2596 vdur
= find_in_list(
2597 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2600 if vdur
.get("additionalParams"):
2601 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2603 deploy_params_vdu
= deploy_params
2604 deploy_params_vdu
["OSM"] = get_osm_params(
2605 db_vnfr
, vdu_id
, vdu_count_index
=0
2607 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2609 self
.logger
.debug("VDUD > {}".format(vdud
))
2611 "Descriptor config > {}".format(descriptor_config
)
2613 if descriptor_config
:
2617 for vdu_index
in range(vdud_count
):
2618 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2620 logging_text
=logging_text
2621 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2622 member_vnf_index
, vdu_id
, vdu_index
2626 nslcmop_id
=nslcmop_id
,
2632 kdu_index
=kdu_index
,
2633 member_vnf_index
=member_vnf_index
,
2634 vdu_index
=vdu_index
,
2636 deploy_params
=deploy_params_vdu
,
2637 descriptor_config
=descriptor_config
,
2638 base_folder
=base_folder
,
2639 task_instantiation_info
=tasks_dict_info
,
2642 for kdud
in get_kdu_list(vnfd
):
2643 kdu_name
= kdud
["name"]
2644 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2645 if descriptor_config
:
2649 kdu_index
, kdur
= next(
2651 for x
in enumerate(db_vnfr
["kdur"])
2652 if x
[1]["kdu-name"] == kdu_name
2654 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2655 if kdur
.get("additionalParams"):
2656 deploy_params_kdu
.update(
2657 parse_yaml_strings(kdur
["additionalParams"].copy())
2661 logging_text
=logging_text
,
2664 nslcmop_id
=nslcmop_id
,
2670 member_vnf_index
=member_vnf_index
,
2671 vdu_index
=vdu_index
,
2672 kdu_index
=kdu_index
,
2674 deploy_params
=deploy_params_kdu
,
2675 descriptor_config
=descriptor_config
,
2676 base_folder
=base_folder
,
2677 task_instantiation_info
=tasks_dict_info
,
2681 # Check if this NS has a charm configuration
2682 descriptor_config
= nsd
.get("ns-configuration")
2683 if descriptor_config
and descriptor_config
.get("juju"):
2686 member_vnf_index
= None
2693 # Get additional parameters
2694 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2695 if db_nsr
.get("additionalParamsForNs"):
2696 deploy_params
.update(
2697 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2699 base_folder
= nsd
["_admin"]["storage"]
2701 logging_text
=logging_text
,
2704 nslcmop_id
=nslcmop_id
,
2710 member_vnf_index
=member_vnf_index
,
2711 vdu_index
=vdu_index
,
2712 kdu_index
=kdu_index
,
2714 deploy_params
=deploy_params
,
2715 descriptor_config
=descriptor_config
,
2716 base_folder
=base_folder
,
2717 task_instantiation_info
=tasks_dict_info
,
2721 # rest of staff will be done at finally
2724 ROclient
.ROClientException
,
2730 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2733 except asyncio
.CancelledError
:
2735 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2737 exc
= "Operation was cancelled"
2738 except Exception as e
:
2739 exc
= traceback
.format_exc()
2740 self
.logger
.critical(
2741 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2746 error_list
.append(str(exc
))
2748 # wait for pending tasks
2750 stage
[1] = "Waiting for instantiate pending tasks."
2751 self
.logger
.debug(logging_text
+ stage
[1])
2752 error_list
+= await self
._wait
_for
_tasks
(
2760 stage
[1] = stage
[2] = ""
2761 except asyncio
.CancelledError
:
2762 error_list
.append("Cancelled")
2763 # TODO cancel all tasks
2764 except Exception as exc
:
2765 error_list
.append(str(exc
))
2767 # update operation-status
2768 db_nsr_update
["operational-status"] = "running"
2769 # let's begin with VCA 'configured' status (later we can change it)
2770 db_nsr_update
["config-status"] = "configured"
2771 for task
, task_name
in tasks_dict_info
.items():
2772 if not task
.done() or task
.cancelled() or task
.exception():
2773 if task_name
.startswith(self
.task_name_deploy_vca
):
2774 # A N2VC task is pending
2775 db_nsr_update
["config-status"] = "failed"
2777 # RO or KDU task is pending
2778 db_nsr_update
["operational-status"] = "failed"
2780 # update status at database
2782 error_detail
= ". ".join(error_list
)
2783 self
.logger
.error(logging_text
+ error_detail
)
2784 error_description_nslcmop
= "{} Detail: {}".format(
2785 stage
[0], error_detail
2787 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2788 nslcmop_id
, stage
[0]
2791 db_nsr_update
["detailed-status"] = (
2792 error_description_nsr
+ " Detail: " + error_detail
2794 db_nslcmop_update
["detailed-status"] = error_detail
2795 nslcmop_operation_state
= "FAILED"
2799 error_description_nsr
= error_description_nslcmop
= None
2801 db_nsr_update
["detailed-status"] = "Done"
2802 db_nslcmop_update
["detailed-status"] = "Done"
2803 nslcmop_operation_state
= "COMPLETED"
2806 self
._write
_ns
_status
(
2809 current_operation
="IDLE",
2810 current_operation_id
=None,
2811 error_description
=error_description_nsr
,
2812 error_detail
=error_detail
,
2813 other_update
=db_nsr_update
,
2815 self
._write
_op
_status
(
2818 error_message
=error_description_nslcmop
,
2819 operation_state
=nslcmop_operation_state
,
2820 other_update
=db_nslcmop_update
,
2823 if nslcmop_operation_state
:
2825 await self
.msg
.aiowrite(
2830 "nslcmop_id": nslcmop_id
,
2831 "operationState": nslcmop_operation_state
,
2835 except Exception as e
:
2837 logging_text
+ "kafka_write notification Exception {}".format(e
)
2840 self
.logger
.debug(logging_text
+ "Exit")
2841 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2843 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2844 if vnfd_id
not in cached_vnfds
:
2845 cached_vnfds
[vnfd_id
] = self
.db
.get_one(
2846 "vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
}
2848 return cached_vnfds
[vnfd_id
]
2850 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2851 if vnf_profile_id
not in cached_vnfrs
:
2852 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2855 "member-vnf-index-ref": vnf_profile_id
,
2856 "nsr-id-ref": nsr_id
,
2859 return cached_vnfrs
[vnf_profile_id
]
2861 def _is_deployed_vca_in_relation(
2862 self
, vca
: DeployedVCA
, relation
: Relation
2865 for endpoint
in (relation
.provider
, relation
.requirer
):
2866 if endpoint
["kdu-resource-profile-id"]:
2869 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2870 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2871 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2877 def _update_ee_relation_data_with_implicit_data(
2878 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2880 ee_relation_data
= safe_get_ee_relation(
2881 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2883 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2884 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2885 "execution-environment-ref"
2887 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2888 vnfd_id
= vnf_profile
["vnfd-id"]
2889 project
= nsd
["_admin"]["projects_read"][0]
2890 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2893 if ee_relation_level
== EELevel
.VNF
2894 else ee_relation_data
["vdu-profile-id"]
2896 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2899 f
"not execution environments found for ee_relation {ee_relation_data}"
2901 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2902 return ee_relation_data
2904 def _get_ns_relations(
2907 nsd
: Dict
[str, Any
],
2909 cached_vnfds
: Dict
[str, Any
],
2910 ) -> List
[Relation
]:
2912 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2913 for r
in db_ns_relations
:
2914 provider_dict
= None
2915 requirer_dict
= None
2916 if all(key
in r
for key
in ("provider", "requirer")):
2917 provider_dict
= r
["provider"]
2918 requirer_dict
= r
["requirer"]
2919 elif "entities" in r
:
2920 provider_id
= r
["entities"][0]["id"]
2923 "endpoint": r
["entities"][0]["endpoint"],
2925 if provider_id
!= nsd
["id"]:
2926 provider_dict
["vnf-profile-id"] = provider_id
2927 requirer_id
= r
["entities"][1]["id"]
2930 "endpoint": r
["entities"][1]["endpoint"],
2932 if requirer_id
!= nsd
["id"]:
2933 requirer_dict
["vnf-profile-id"] = requirer_id
2936 "provider/requirer or entities must be included in the relation."
2938 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2939 nsr_id
, nsd
, provider_dict
, cached_vnfds
2941 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2942 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2944 provider
= EERelation(relation_provider
)
2945 requirer
= EERelation(relation_requirer
)
2946 relation
= Relation(r
["name"], provider
, requirer
)
2947 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2949 relations
.append(relation
)
2952 def _get_vnf_relations(
2955 nsd
: Dict
[str, Any
],
2957 cached_vnfds
: Dict
[str, Any
],
2958 ) -> List
[Relation
]:
2960 if vca
.target_element
== "ns":
2961 self
.logger
.debug("VCA is a NS charm, not a VNF.")
2963 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2964 vnf_profile_id
= vnf_profile
["id"]
2965 vnfd_id
= vnf_profile
["vnfd-id"]
2966 project
= nsd
["_admin"]["projects_read"][0]
2967 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2968 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2969 for r
in db_vnf_relations
:
2970 provider_dict
= None
2971 requirer_dict
= None
2972 if all(key
in r
for key
in ("provider", "requirer")):
2973 provider_dict
= r
["provider"]
2974 requirer_dict
= r
["requirer"]
2975 elif "entities" in r
:
2976 provider_id
= r
["entities"][0]["id"]
2979 "vnf-profile-id": vnf_profile_id
,
2980 "endpoint": r
["entities"][0]["endpoint"],
2982 if provider_id
!= vnfd_id
:
2983 provider_dict
["vdu-profile-id"] = provider_id
2984 requirer_id
= r
["entities"][1]["id"]
2987 "vnf-profile-id": vnf_profile_id
,
2988 "endpoint": r
["entities"][1]["endpoint"],
2990 if requirer_id
!= vnfd_id
:
2991 requirer_dict
["vdu-profile-id"] = requirer_id
2994 "provider/requirer or entities must be included in the relation."
2996 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2997 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2999 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3000 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3002 provider
= EERelation(relation_provider
)
3003 requirer
= EERelation(relation_requirer
)
3004 relation
= Relation(r
["name"], provider
, requirer
)
3005 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3007 relations
.append(relation
)
3010 def _get_kdu_resource_data(
3012 ee_relation
: EERelation
,
3013 db_nsr
: Dict
[str, Any
],
3014 cached_vnfds
: Dict
[str, Any
],
3015 ) -> DeployedK8sResource
:
3016 nsd
= get_nsd(db_nsr
)
3017 vnf_profiles
= get_vnf_profiles(nsd
)
3018 vnfd_id
= find_in_list(
3020 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3022 project
= nsd
["_admin"]["projects_read"][0]
3023 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3024 kdu_resource_profile
= get_kdu_resource_profile(
3025 db_vnfd
, ee_relation
.kdu_resource_profile_id
3027 kdu_name
= kdu_resource_profile
["kdu-name"]
3028 deployed_kdu
, _
= get_deployed_kdu(
3029 db_nsr
.get("_admin", ()).get("deployed", ()),
3031 ee_relation
.vnf_profile_id
,
3033 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3036 def _get_deployed_component(
3038 ee_relation
: EERelation
,
3039 db_nsr
: Dict
[str, Any
],
3040 cached_vnfds
: Dict
[str, Any
],
3041 ) -> DeployedComponent
:
3042 nsr_id
= db_nsr
["_id"]
3043 deployed_component
= None
3044 ee_level
= EELevel
.get_level(ee_relation
)
3045 if ee_level
== EELevel
.NS
:
3046 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3048 deployed_component
= DeployedVCA(nsr_id
, vca
)
3049 elif ee_level
== EELevel
.VNF
:
3050 vca
= get_deployed_vca(
3054 "member-vnf-index": ee_relation
.vnf_profile_id
,
3055 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3059 deployed_component
= DeployedVCA(nsr_id
, vca
)
3060 elif ee_level
== EELevel
.VDU
:
3061 vca
= get_deployed_vca(
3064 "vdu_id": ee_relation
.vdu_profile_id
,
3065 "member-vnf-index": ee_relation
.vnf_profile_id
,
3066 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3070 deployed_component
= DeployedVCA(nsr_id
, vca
)
3071 elif ee_level
== EELevel
.KDU
:
3072 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3073 ee_relation
, db_nsr
, cached_vnfds
3075 if kdu_resource_data
:
3076 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3077 return deployed_component
3079 async def _add_relation(
3083 db_nsr
: Dict
[str, Any
],
3084 cached_vnfds
: Dict
[str, Any
],
3085 cached_vnfrs
: Dict
[str, Any
],
3087 deployed_provider
= self
._get
_deployed
_component
(
3088 relation
.provider
, db_nsr
, cached_vnfds
3090 deployed_requirer
= self
._get
_deployed
_component
(
3091 relation
.requirer
, db_nsr
, cached_vnfds
3095 and deployed_requirer
3096 and deployed_provider
.config_sw_installed
3097 and deployed_requirer
.config_sw_installed
3099 provider_db_vnfr
= (
3101 relation
.provider
.nsr_id
,
3102 relation
.provider
.vnf_profile_id
,
3105 if relation
.provider
.vnf_profile_id
3108 requirer_db_vnfr
= (
3110 relation
.requirer
.nsr_id
,
3111 relation
.requirer
.vnf_profile_id
,
3114 if relation
.requirer
.vnf_profile_id
3117 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3118 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3119 provider_relation_endpoint
= RelationEndpoint(
3120 deployed_provider
.ee_id
,
3122 relation
.provider
.endpoint
,
3124 requirer_relation_endpoint
= RelationEndpoint(
3125 deployed_requirer
.ee_id
,
3127 relation
.requirer
.endpoint
,
3130 await self
.vca_map
[vca_type
].add_relation(
3131 provider
=provider_relation_endpoint
,
3132 requirer
=requirer_relation_endpoint
,
3134 except N2VCException
as exception
:
3135 self
.logger
.error(exception
)
3136 raise LcmException(exception
)
3140 async def _add_vca_relations(
3146 timeout
: int = 3600,
3149 # 1. find all relations for this VCA
3150 # 2. wait for other peers related
3154 # STEP 1: find all relations for this VCA
3157 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3158 nsd
= get_nsd(db_nsr
)
3161 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3162 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3167 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3168 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3170 # if no relations, terminate
3172 self
.logger
.debug(logging_text
+ " No relations")
3175 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3182 if now
- start
>= timeout
:
3183 self
.logger
.error(logging_text
+ " : timeout adding relations")
3186 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3187 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3189 # for each relation, find the VCA's related
3190 for relation
in relations
.copy():
3191 added
= await self
._add
_relation
(
3199 relations
.remove(relation
)
3202 self
.logger
.debug("Relations added")
3204 await asyncio
.sleep(5.0)
3208 except Exception as e
:
3209 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3212 async def _install_kdu(
3220 k8s_instance_info
: dict,
3221 k8params
: dict = None,
3226 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3229 "collection": "nsrs",
3230 "filter": {"_id": nsr_id
},
3231 "path": nsr_db_path
,
3234 if k8s_instance_info
.get("kdu-deployment-name"):
3235 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3237 kdu_instance
= self
.k8scluster_map
[
3239 ].generate_kdu_instance_name(
3240 db_dict
=db_dict_install
,
3241 kdu_model
=k8s_instance_info
["kdu-model"],
3242 kdu_name
=k8s_instance_info
["kdu-name"],
3245 # Update the nsrs table with the kdu-instance value
3249 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3252 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3253 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3254 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3255 # namespace, this first verification could be removed, and the next step would be done for any kind
3257 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3258 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3259 if k8sclustertype
in ("juju", "juju-bundle"):
3260 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3261 # that the user passed a namespace which he wants its KDU to be deployed in)
3267 "_admin.projects_write": k8s_instance_info
["namespace"],
3268 "_admin.projects_read": k8s_instance_info
["namespace"],
3274 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3279 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3281 k8s_instance_info
["namespace"] = kdu_instance
3283 await self
.k8scluster_map
[k8sclustertype
].install(
3284 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3285 kdu_model
=k8s_instance_info
["kdu-model"],
3288 db_dict
=db_dict_install
,
3290 kdu_name
=k8s_instance_info
["kdu-name"],
3291 namespace
=k8s_instance_info
["namespace"],
3292 kdu_instance
=kdu_instance
,
3296 # Obtain services to obtain management service ip
3297 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3298 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3299 kdu_instance
=kdu_instance
,
3300 namespace
=k8s_instance_info
["namespace"],
3303 # Obtain management service info (if exists)
3304 vnfr_update_dict
= {}
3305 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3307 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3312 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3315 for service
in kdud
.get("service", [])
3316 if service
.get("mgmt-service")
3318 for mgmt_service
in mgmt_services
:
3319 for service
in services
:
3320 if service
["name"].startswith(mgmt_service
["name"]):
3321 # Mgmt service found, Obtain service ip
3322 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3323 if isinstance(ip
, list) and len(ip
) == 1:
3327 "kdur.{}.ip-address".format(kdu_index
)
3330 # Check if must update also mgmt ip at the vnf
3331 service_external_cp
= mgmt_service
.get(
3332 "external-connection-point-ref"
3334 if service_external_cp
:
3336 deep_get(vnfd
, ("mgmt-interface", "cp"))
3337 == service_external_cp
3339 vnfr_update_dict
["ip-address"] = ip
3344 "external-connection-point-ref", ""
3346 == service_external_cp
,
3349 "kdur.{}.ip-address".format(kdu_index
)
3354 "Mgmt service name: {} not found".format(
3355 mgmt_service
["name"]
3359 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3360 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3362 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3365 and kdu_config
.get("initial-config-primitive")
3366 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3368 initial_config_primitive_list
= kdu_config
.get(
3369 "initial-config-primitive"
3371 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3373 for initial_config_primitive
in initial_config_primitive_list
:
3374 primitive_params_
= self
._map
_primitive
_params
(
3375 initial_config_primitive
, {}, {}
3378 await asyncio
.wait_for(
3379 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3380 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3381 kdu_instance
=kdu_instance
,
3382 primitive_name
=initial_config_primitive
["name"],
3383 params
=primitive_params_
,
3384 db_dict
=db_dict_install
,
3390 except Exception as e
:
3391 # Prepare update db with error and raise exception
3394 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3398 vnfr_data
.get("_id"),
3399 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3402 # ignore to keep original exception
3404 # reraise original error
3409 async def deploy_kdus(
3416 task_instantiation_info
,
3418 # Launch kdus if present in the descriptor
3420 k8scluster_id_2_uuic
= {
3421 "helm-chart-v3": {},
3426 async def _get_cluster_id(cluster_id
, cluster_type
):
3427 nonlocal k8scluster_id_2_uuic
3428 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3429 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3431 # check if K8scluster is creating and wait look if previous tasks in process
3432 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3433 "k8scluster", cluster_id
3436 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3437 task_name
, cluster_id
3439 self
.logger
.debug(logging_text
+ text
)
3440 await asyncio
.wait(task_dependency
, timeout
=3600)
3442 db_k8scluster
= self
.db
.get_one(
3443 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3445 if not db_k8scluster
:
3446 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3448 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3450 if cluster_type
== "helm-chart-v3":
3452 # backward compatibility for existing clusters that have not been initialized for helm v3
3453 k8s_credentials
= yaml
.safe_dump(
3454 db_k8scluster
.get("credentials")
3456 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3457 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3459 db_k8scluster_update
= {}
3460 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3461 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3462 db_k8scluster_update
[
3463 "_admin.helm-chart-v3.created"
3465 db_k8scluster_update
[
3466 "_admin.helm-chart-v3.operationalState"
3469 "k8sclusters", cluster_id
, db_k8scluster_update
3471 except Exception as e
:
3474 + "error initializing helm-v3 cluster: {}".format(str(e
))
3477 "K8s cluster '{}' has not been initialized for '{}'".format(
3478 cluster_id
, cluster_type
3483 "K8s cluster '{}' has not been initialized for '{}'".format(
3484 cluster_id
, cluster_type
3487 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3490 logging_text
+= "Deploy kdus: "
3493 db_nsr_update
= {"_admin.deployed.K8s": []}
3494 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3497 updated_cluster_list
= []
3498 updated_v3_cluster_list
= []
3500 for vnfr_data
in db_vnfrs
.values():
3501 vca_id
= self
.get_vca_id(vnfr_data
, {})
3502 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3503 # Step 0: Prepare and set parameters
3504 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3505 vnfd_id
= vnfr_data
.get("vnfd-id")
3506 vnfd_with_id
= find_in_list(
3507 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3511 for kdud
in vnfd_with_id
["kdu"]
3512 if kdud
["name"] == kdur
["kdu-name"]
3514 namespace
= kdur
.get("k8s-namespace")
3515 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3516 if kdur
.get("helm-chart"):
3517 kdumodel
= kdur
["helm-chart"]
3518 # Default version: helm3, if helm-version is v2 assign v2
3519 k8sclustertype
= "helm-chart-v3"
3520 self
.logger
.debug("kdur: {}".format(kdur
))
3522 kdur
.get("helm-version")
3523 and kdur
.get("helm-version") == "v2"
3525 k8sclustertype
= "helm-chart"
3526 elif kdur
.get("juju-bundle"):
3527 kdumodel
= kdur
["juju-bundle"]
3528 k8sclustertype
= "juju-bundle"
3531 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3532 "juju-bundle. Maybe an old NBI version is running".format(
3533 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3536 # check if kdumodel is a file and exists
3538 vnfd_with_id
= find_in_list(
3539 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3541 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3542 if storage
: # may be not present if vnfd has not artifacts
3543 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3544 if storage
["pkg-dir"]:
3545 filename
= "{}/{}/{}s/{}".format(
3552 filename
= "{}/Scripts/{}s/{}".format(
3557 if self
.fs
.file_exists(
3558 filename
, mode
="file"
3559 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3560 kdumodel
= self
.fs
.path
+ filename
3561 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3563 except Exception: # it is not a file
3566 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3567 step
= "Synchronize repos for k8s cluster '{}'".format(
3570 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3574 k8sclustertype
== "helm-chart"
3575 and cluster_uuid
not in updated_cluster_list
3577 k8sclustertype
== "helm-chart-v3"
3578 and cluster_uuid
not in updated_v3_cluster_list
3580 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3581 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3582 cluster_uuid
=cluster_uuid
3585 if del_repo_list
or added_repo_dict
:
3586 if k8sclustertype
== "helm-chart":
3588 "_admin.helm_charts_added." + item
: None
3589 for item
in del_repo_list
3592 "_admin.helm_charts_added." + item
: name
3593 for item
, name
in added_repo_dict
.items()
3595 updated_cluster_list
.append(cluster_uuid
)
3596 elif k8sclustertype
== "helm-chart-v3":
3598 "_admin.helm_charts_v3_added." + item
: None
3599 for item
in del_repo_list
3602 "_admin.helm_charts_v3_added." + item
: name
3603 for item
, name
in added_repo_dict
.items()
3605 updated_v3_cluster_list
.append(cluster_uuid
)
3607 logging_text
+ "repos synchronized on k8s cluster "
3608 "'{}' to_delete: {}, to_add: {}".format(
3609 k8s_cluster_id
, del_repo_list
, added_repo_dict
3614 {"_id": k8s_cluster_id
},
3620 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3621 vnfr_data
["member-vnf-index-ref"],
3625 k8s_instance_info
= {
3626 "kdu-instance": None,
3627 "k8scluster-uuid": cluster_uuid
,
3628 "k8scluster-type": k8sclustertype
,
3629 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3630 "kdu-name": kdur
["kdu-name"],
3631 "kdu-model": kdumodel
,
3632 "namespace": namespace
,
3633 "kdu-deployment-name": kdu_deployment_name
,
3635 db_path
= "_admin.deployed.K8s.{}".format(index
)
3636 db_nsr_update
[db_path
] = k8s_instance_info
3637 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3638 vnfd_with_id
= find_in_list(
3639 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3641 task
= asyncio
.ensure_future(
3650 k8params
=desc_params
,
3655 self
.lcm_tasks
.register(
3659 "instantiate_KDU-{}".format(index
),
3662 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3668 except (LcmException
, asyncio
.CancelledError
):
3670 except Exception as e
:
3671 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3672 if isinstance(e
, (N2VCException
, DbException
)):
3673 self
.logger
.error(logging_text
+ msg
)
3675 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3676 raise LcmException(msg
)
3679 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3699 task_instantiation_info
,
3702 # launch instantiate_N2VC in a asyncio task and register task object
3703 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3704 # if not found, create one entry and update database
3705 # fill db_nsr._admin.deployed.VCA.<index>
3708 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3712 get_charm_name
= False
3713 if "execution-environment-list" in descriptor_config
:
3714 ee_list
= descriptor_config
.get("execution-environment-list", [])
3715 elif "juju" in descriptor_config
:
3716 ee_list
= [descriptor_config
] # ns charms
3717 if "execution-environment-list" not in descriptor_config
:
3718 # charm name is only required for ns charms
3719 get_charm_name
= True
3720 else: # other types as script are not supported
3723 for ee_item
in ee_list
:
3726 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3727 ee_item
.get("juju"), ee_item
.get("helm-chart")
3730 ee_descriptor_id
= ee_item
.get("id")
3731 if ee_item
.get("juju"):
3732 vca_name
= ee_item
["juju"].get("charm")
3734 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3737 if ee_item
["juju"].get("charm") is not None
3740 if ee_item
["juju"].get("cloud") == "k8s":
3741 vca_type
= "k8s_proxy_charm"
3742 elif ee_item
["juju"].get("proxy") is False:
3743 vca_type
= "native_charm"
3744 elif ee_item
.get("helm-chart"):
3745 vca_name
= ee_item
["helm-chart"]
3746 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3749 vca_type
= "helm-v3"
3752 logging_text
+ "skipping non juju neither charm configuration"
3757 for vca_index
, vca_deployed
in enumerate(
3758 db_nsr
["_admin"]["deployed"]["VCA"]
3760 if not vca_deployed
:
3763 vca_deployed
.get("member-vnf-index") == member_vnf_index
3764 and vca_deployed
.get("vdu_id") == vdu_id
3765 and vca_deployed
.get("kdu_name") == kdu_name
3766 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3767 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3771 # not found, create one.
3773 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3776 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3778 target
+= "/kdu/{}".format(kdu_name
)
3780 "target_element": target
,
3781 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3782 "member-vnf-index": member_vnf_index
,
3784 "kdu_name": kdu_name
,
3785 "vdu_count_index": vdu_index
,
3786 "operational-status": "init", # TODO revise
3787 "detailed-status": "", # TODO revise
3788 "step": "initial-deploy", # TODO revise
3790 "vdu_name": vdu_name
,
3792 "ee_descriptor_id": ee_descriptor_id
,
3793 "charm_name": charm_name
,
3797 # create VCA and configurationStatus in db
3799 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3800 "configurationStatus.{}".format(vca_index
): dict(),
3802 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3804 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3806 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3807 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3808 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3811 task_n2vc
= asyncio
.ensure_future(
3812 self
.instantiate_N2VC(
3813 logging_text
=logging_text
,
3814 vca_index
=vca_index
,
3820 vdu_index
=vdu_index
,
3821 kdu_index
=kdu_index
,
3822 deploy_params
=deploy_params
,
3823 config_descriptor
=descriptor_config
,
3824 base_folder
=base_folder
,
3825 nslcmop_id
=nslcmop_id
,
3829 ee_config_descriptor
=ee_item
,
3832 self
.lcm_tasks
.register(
3836 "instantiate_N2VC-{}".format(vca_index
),
3839 task_instantiation_info
[
3841 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3842 member_vnf_index
or "", vdu_id
or ""
3846 def _create_nslcmop(nsr_id
, operation
, params
):
3848 Creates a ns-lcm-opp content to be stored at database.
3849 :param nsr_id: internal id of the instance
3850 :param operation: instantiate, terminate, scale, action, ...
3851 :param params: user parameters for the operation
3852 :return: dictionary following SOL005 format
3854 # Raise exception if invalid arguments
3855 if not (nsr_id
and operation
and params
):
3857 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3864 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3865 "operationState": "PROCESSING",
3866 "statusEnteredTime": now
,
3867 "nsInstanceId": nsr_id
,
3868 "lcmOperationType": operation
,
3870 "isAutomaticInvocation": False,
3871 "operationParams": params
,
3872 "isCancelPending": False,
3874 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3875 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
def _format_additional_params(self, params):
    """
    Decode the additional params whose value is a string tagged with the "!!yaml " prefix.
    The received dictionary is modified in place (values are replaced, keys are never added or removed).
    :param params: dictionary of additional params; None or empty is accepted
    :return: the same dictionary, with every "!!yaml "-prefixed string replaced by its parsed YAML content;
        a new empty dictionary when params is None or empty
    """
    params = params or {}
    for key, value in params.items():
        # Only genuine strings can carry the tag; checking the type first guarantees that the
        # slice below is applied to a string and never to an arbitrary object
        if isinstance(value, str) and value.startswith("!!yaml "):
            # drop the 7-character "!!yaml " tag and parse the remainder
            params[key] = yaml.safe_load(value[7:])
    return params
3887 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3888 primitive
= seq
.get("name")
3889 primitive_params
= {}
3891 "member_vnf_index": vnf_index
,
3892 "primitive": primitive
,
3893 "primitive_params": primitive_params
,
3896 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation has to be skipped or executed again.
    :param db_nslcmop: nslcmop record containing the "_admin.operations" list
    :param op_index: position of the sub-operation inside "_admin.operations"
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is already COMPLETED; otherwise op_index,
        after setting its status to PROCESSING through _update_suboperation_status()
    :raises IndexError: if op_index does not exist in the operations list
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3920 # Find a sub-operation where all keys in a matching dictionary must match
3921 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Find the first sub-operation whose content agrees with every key/value pair of 'match'.
    A key that is absent from a sub-operation is compared as None.
    :param db_nslcmop: nslcmop record; sub-operations are read from its "_admin.operations" list
    :param match: dictionary with the key/value pairs that must all be equal in the sub-operation
    :return: index of the first matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND when
        db_nslcmop or match is empty, or no sub-operation matches
    """
    if db_nslcmop and match:
        op_list = db_nslcmop.get("_admin", {}).get("operations", [])
        for i, op in enumerate(op_list):
            if all(op.get(k) == v for k, v in match.items()):
                # first match wins
                return i
    return self.SUBOPERATION_STATUS_NOT_FOUND
3930 # Update status for a sub-operation given its index
3931 def _update_suboperation_status(
3932 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3934 # Update DB for HA tasks
3935 q_filter
= {"_id": db_nslcmop
["_id"]}
3937 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3938 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3941 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3944 # Add sub-operation, return the index of the added sub-operation
3945 # Optionally, set operationState, detailed-status, and operationType
3946 # Status and type are currently set for 'scale' sub-operations:
3947 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3948 # 'detailed-status' : status message
3949 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3950 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3951 def _add_suboperation(
3959 mapped_primitive_params
,
3960 operationState
=None,
3961 detailed_status
=None,
3964 RO_scaling_info
=None,
3967 return self
.SUBOPERATION_STATUS_NOT_FOUND
3968 # Get the "_admin.operations" list, if it exists
3969 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3970 op_list
= db_nslcmop_admin
.get("operations")
3971 # Create or append to the "_admin.operations" list
3973 "member_vnf_index": vnf_index
,
3975 "vdu_count_index": vdu_count_index
,
3976 "primitive": primitive
,
3977 "primitive_params": mapped_primitive_params
,
3980 new_op
["operationState"] = operationState
3982 new_op
["detailed-status"] = detailed_status
3984 new_op
["lcmOperationType"] = operationType
3986 new_op
["RO_nsr_id"] = RO_nsr_id
3988 new_op
["RO_scaling_info"] = RO_scaling_info
3990 # No existing operations, create key 'operations' with current operation as first list element
3991 db_nslcmop_admin
.update({"operations": [new_op
]})
3992 op_list
= db_nslcmop_admin
.get("operations")
3994 # Existing operations, append operation to list
3995 op_list
.append(new_op
)
3997 db_nslcmop_update
= {"_admin.operations": op_list
}
3998 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3999 op_index
= len(op_list
) - 1
4002 # Helper methods for scale() sub-operations
4004 # pre-scale/post-scale:
4005 # Check for 3 different cases:
4006 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4007 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4008 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4009 def _check_or_add_scale_suboperation(
4013 vnf_config_primitive
,
4017 RO_scaling_info
=None,
4019 # Find this sub-operation
4020 if RO_nsr_id
and RO_scaling_info
:
4021 operationType
= "SCALE-RO"
4023 "member_vnf_index": vnf_index
,
4024 "RO_nsr_id": RO_nsr_id
,
4025 "RO_scaling_info": RO_scaling_info
,
4029 "member_vnf_index": vnf_index
,
4030 "primitive": vnf_config_primitive
,
4031 "primitive_params": primitive_params
,
4032 "lcmOperationType": operationType
,
4034 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4035 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4036 # a. New sub-operation
4037 # The sub-operation does not exist, add it.
4038 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4039 # The following parameters are set to None for all kind of scaling:
4041 vdu_count_index
= None
4043 if RO_nsr_id
and RO_scaling_info
:
4044 vnf_config_primitive
= None
4045 primitive_params
= None
4048 RO_scaling_info
= None
4049 # Initial status for sub-operation
4050 operationState
= "PROCESSING"
4051 detailed_status
= "In progress"
4052 # Add sub-operation for pre/post-scaling (zero or more operations)
4053 self
._add
_suboperation
(
4059 vnf_config_primitive
,
4067 return self
.SUBOPERATION_STATUS_NEW
4069 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4070 # or op_index (operationState != 'COMPLETED')
4071 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4073 # Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Look for the execution environment id of the VCA deployed for a given vnf/vdu.
    :param vnf_index: value compared against the "member-vnf-index" of each VCA record
    :param vdu_id: value compared against the "vdu_id" of each VCA record
    :param vca_deployed_list: list of deployed VCA records (presumably the content of
        db_nsr._admin.deployed.VCA - confirm against callers); empty/None items are ignored
    :return: "ee_id" of the first matching record, or None if there is no match
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        # the deployed list may contain empty slots; skip them instead of failing on indexing
        if not vca:
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
4081 async def destroy_N2VC(
4089 exec_primitives
=True,
4094 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4095 :param logging_text:
4097 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4098 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4099 :param vca_index: index in the database _admin.deployed.VCA
4100 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4101 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4102 not executed properly
4103 :param scaling_in: True destroys the application, False destroys the model
4104 :return: None or exception
4109 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4110 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4114 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4116 # execute terminate_primitives
4118 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4119 config_descriptor
.get("terminate-config-primitive"),
4120 vca_deployed
.get("ee_descriptor_id"),
4122 vdu_id
= vca_deployed
.get("vdu_id")
4123 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4124 vdu_name
= vca_deployed
.get("vdu_name")
4125 vnf_index
= vca_deployed
.get("member-vnf-index")
4126 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4127 for seq
in terminate_primitives
:
4128 # For each sequence in list, get primitive and call _ns_execute_primitive()
4129 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4130 vnf_index
, seq
.get("name")
4132 self
.logger
.debug(logging_text
+ step
)
4133 # Create the primitive for each sequence, i.e. "primitive": "touch"
4134 primitive
= seq
.get("name")
4135 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4140 self
._add
_suboperation
(
4147 mapped_primitive_params
,
4149 # Sub-operations: Call _ns_execute_primitive() instead of action()
4151 result
, result_detail
= await self
._ns
_execute
_primitive
(
4152 vca_deployed
["ee_id"],
4154 mapped_primitive_params
,
4158 except LcmException
:
4159 # this happens when VCA is not deployed. In this case it is not needed to terminate
4161 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4162 if result
not in result_ok
:
4164 "terminate_primitive {} for vnf_member_index={} fails with "
4165 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4167 # set that this VCA do not need terminated
4168 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4172 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4175 # Delete Prometheus Jobs if any
4176 # This uses NSR_ID, so it will destroy any jobs under this index
4177 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4180 await self
.vca_map
[vca_type
].delete_execution_environment(
4181 vca_deployed
["ee_id"],
4182 scaling_in
=scaling_in
,
4187 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4188 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4189 namespace
= "." + db_nsr
["_id"]
4191 await self
.n2vc
.delete_namespace(
4192 namespace
=namespace
,
4193 total_timeout
=self
.timeout
.charm_delete
,
4196 except N2VCNotFound
: # already deleted. Skip
4198 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4200 async def terminate(self
, nsr_id
, nslcmop_id
):
4201 # Try to lock HA task here
4202 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4203 if not task_is_locked_by_me
:
4206 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4207 self
.logger
.debug(logging_text
+ "Enter")
4208 timeout_ns_terminate
= self
.timeout
.ns_terminate
4211 operation_params
= None
4213 error_list
= [] # annotates all failed error messages
4214 db_nslcmop_update
= {}
4215 autoremove
= False # autoremove after terminated
4216 tasks_dict_info
= {}
4219 "Stage 1/3: Preparing task.",
4220 "Waiting for previous operations to terminate.",
4223 # ^ contains [stage, step, VIM-status]
4225 # wait for any previous tasks in process
4226 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4228 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4229 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4230 operation_params
= db_nslcmop
.get("operationParams") or {}
4231 if operation_params
.get("timeout_ns_terminate"):
4232 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4233 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4234 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4236 db_nsr_update
["operational-status"] = "terminating"
4237 db_nsr_update
["config-status"] = "terminating"
4238 self
._write
_ns
_status
(
4240 ns_state
="TERMINATING",
4241 current_operation
="TERMINATING",
4242 current_operation_id
=nslcmop_id
,
4243 other_update
=db_nsr_update
,
4245 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4246 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4247 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4250 stage
[1] = "Getting vnf descriptors from db."
4251 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4253 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4255 db_vnfds_from_id
= {}
4256 db_vnfds_from_member_index
= {}
4258 for vnfr
in db_vnfrs_list
:
4259 vnfd_id
= vnfr
["vnfd-id"]
4260 if vnfd_id
not in db_vnfds_from_id
:
4261 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4262 db_vnfds_from_id
[vnfd_id
] = vnfd
4263 db_vnfds_from_member_index
[
4264 vnfr
["member-vnf-index-ref"]
4265 ] = db_vnfds_from_id
[vnfd_id
]
4267 # Destroy individual execution environments when there are terminating primitives.
4268 # Rest of EE will be deleted at once
4269 # TODO - check before calling _destroy_N2VC
4270 # if not operation_params.get("skip_terminate_primitives"):#
4271 # or not vca.get("needed_terminate"):
4272 stage
[0] = "Stage 2/3 execute terminating primitives."
4273 self
.logger
.debug(logging_text
+ stage
[0])
4274 stage
[1] = "Looking execution environment that needs terminate."
4275 self
.logger
.debug(logging_text
+ stage
[1])
4277 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4278 config_descriptor
= None
4279 vca_member_vnf_index
= vca
.get("member-vnf-index")
4280 vca_id
= self
.get_vca_id(
4281 db_vnfrs_dict
.get(vca_member_vnf_index
)
4282 if vca_member_vnf_index
4286 if not vca
or not vca
.get("ee_id"):
4288 if not vca
.get("member-vnf-index"):
4290 config_descriptor
= db_nsr
.get("ns-configuration")
4291 elif vca
.get("vdu_id"):
4292 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4293 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4294 elif vca
.get("kdu_name"):
4295 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4296 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4298 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4299 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4300 vca_type
= vca
.get("type")
4301 exec_terminate_primitives
= not operation_params
.get(
4302 "skip_terminate_primitives"
4303 ) and vca
.get("needed_terminate")
4304 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4305 # pending native charms
4307 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4309 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4310 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4311 task
= asyncio
.ensure_future(
4319 exec_terminate_primitives
,
4323 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4325 # wait for pending tasks of terminate primitives
4329 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4331 error_list
= await self
._wait
_for
_tasks
(
4334 min(self
.timeout
.charm_delete
, timeout_ns_terminate
),
4338 tasks_dict_info
.clear()
4340 return # raise LcmException("; ".join(error_list))
4342 # remove All execution environments at once
4343 stage
[0] = "Stage 3/3 delete all."
4345 if nsr_deployed
.get("VCA"):
4346 stage
[1] = "Deleting all execution environments."
4347 self
.logger
.debug(logging_text
+ stage
[1])
4348 vca_id
= self
.get_vca_id({}, db_nsr
)
4349 task_delete_ee
= asyncio
.ensure_future(
4351 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4352 timeout
=self
.timeout
.charm_delete
,
4355 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4356 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4358 # Delete Namespace and Certificates if necessary
4359 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4360 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4361 certificate_name
=db_nslcmop
["nsInstanceId"],
4363 # TODO: Delete namespace
4365 # Delete from k8scluster
4366 stage
[1] = "Deleting KDUs."
4367 self
.logger
.debug(logging_text
+ stage
[1])
4368 # print(nsr_deployed)
4369 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4370 if not kdu
or not kdu
.get("kdu-instance"):
4372 kdu_instance
= kdu
.get("kdu-instance")
4373 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4374 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4375 vca_id
= self
.get_vca_id({}, db_nsr
)
4376 task_delete_kdu_instance
= asyncio
.ensure_future(
4377 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4378 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4379 kdu_instance
=kdu_instance
,
4381 namespace
=kdu
.get("namespace"),
4387 + "Unknown k8s deployment type {}".format(
4388 kdu
.get("k8scluster-type")
4393 task_delete_kdu_instance
4394 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4397 stage
[1] = "Deleting ns from VIM."
4398 if self
.ro_config
.ng
:
4399 task_delete_ro
= asyncio
.ensure_future(
4400 self
._terminate
_ng
_ro
(
4401 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4404 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4406 # rest of staff will be done at finally
4409 ROclient
.ROClientException
,
4414 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4416 except asyncio
.CancelledError
:
4418 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4420 exc
= "Operation was cancelled"
4421 except Exception as e
:
4422 exc
= traceback
.format_exc()
4423 self
.logger
.critical(
4424 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4429 error_list
.append(str(exc
))
4431 # wait for pending tasks
4433 stage
[1] = "Waiting for terminate pending tasks."
4434 self
.logger
.debug(logging_text
+ stage
[1])
4435 error_list
+= await self
._wait
_for
_tasks
(
4438 timeout_ns_terminate
,
4442 stage
[1] = stage
[2] = ""
4443 except asyncio
.CancelledError
:
4444 error_list
.append("Cancelled")
4445 # TODO cancell all tasks
4446 except Exception as exc
:
4447 error_list
.append(str(exc
))
4448 # update status at database
4450 error_detail
= "; ".join(error_list
)
4451 # self.logger.error(logging_text + error_detail)
4452 error_description_nslcmop
= "{} Detail: {}".format(
4453 stage
[0], error_detail
4455 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4456 nslcmop_id
, stage
[0]
4459 db_nsr_update
["operational-status"] = "failed"
4460 db_nsr_update
["detailed-status"] = (
4461 error_description_nsr
+ " Detail: " + error_detail
4463 db_nslcmop_update
["detailed-status"] = error_detail
4464 nslcmop_operation_state
= "FAILED"
4468 error_description_nsr
= error_description_nslcmop
= None
4469 ns_state
= "NOT_INSTANTIATED"
4470 db_nsr_update
["operational-status"] = "terminated"
4471 db_nsr_update
["detailed-status"] = "Done"
4472 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4473 db_nslcmop_update
["detailed-status"] = "Done"
4474 nslcmop_operation_state
= "COMPLETED"
4477 self
._write
_ns
_status
(
4480 current_operation
="IDLE",
4481 current_operation_id
=None,
4482 error_description
=error_description_nsr
,
4483 error_detail
=error_detail
,
4484 other_update
=db_nsr_update
,
4486 self
._write
_op
_status
(
4489 error_message
=error_description_nslcmop
,
4490 operation_state
=nslcmop_operation_state
,
4491 other_update
=db_nslcmop_update
,
4493 if ns_state
== "NOT_INSTANTIATED":
4497 {"nsr-id-ref": nsr_id
},
4498 {"_admin.nsState": "NOT_INSTANTIATED"},
4500 except DbException
as e
:
4503 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4507 if operation_params
:
4508 autoremove
= operation_params
.get("autoremove", False)
4509 if nslcmop_operation_state
:
4511 await self
.msg
.aiowrite(
4516 "nslcmop_id": nslcmop_id
,
4517 "operationState": nslcmop_operation_state
,
4518 "autoremove": autoremove
,
4522 except Exception as e
:
4524 logging_text
+ "kafka_write notification Exception {}".format(e
)
4527 self
.logger
.debug(logging_text
+ "Exit")
4528 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4530 async def _wait_for_tasks(
4531 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4534 error_detail_list
= []
4536 pending_tasks
= list(created_tasks_info
.keys())
4537 num_tasks
= len(pending_tasks
)
4539 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4540 self
._write
_op
_status
(nslcmop_id
, stage
)
4541 while pending_tasks
:
4543 _timeout
= timeout
+ time_start
- time()
4544 done
, pending_tasks
= await asyncio
.wait(
4545 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4547 num_done
+= len(done
)
4548 if not done
: # Timeout
4549 for task
in pending_tasks
:
4550 new_error
= created_tasks_info
[task
] + ": Timeout"
4551 error_detail_list
.append(new_error
)
4552 error_list
.append(new_error
)
4555 if task
.cancelled():
4558 exc
= task
.exception()
4560 if isinstance(exc
, asyncio
.TimeoutError
):
4562 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4563 error_list
.append(created_tasks_info
[task
])
4564 error_detail_list
.append(new_error
)
4571 ROclient
.ROClientException
,
4577 self
.logger
.error(logging_text
+ new_error
)
4579 exc_traceback
= "".join(
4580 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4584 + created_tasks_info
[task
]
4590 logging_text
+ created_tasks_info
[task
] + ": Done"
4592 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4594 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4595 if nsr_id
: # update also nsr
4600 "errorDescription": "Error at: " + ", ".join(error_list
),
4601 "errorDetail": ". ".join(error_detail_list
),
4604 self
._write
_op
_status
(nslcmop_id
, stage
)
4605 return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" (or, failing that, the "default-value") of the descriptor is used. If that value is enclosed
    between < >, it is looked up at instantiation_params.
    This function takes no 'self'; elsewhere in this file it is invoked as self._map_primitive_params(...).
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as empty
    :param instantiation_params: Instantiation params provided by user. None is treated as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither user value nor default, if a <reference> is not found at
        instantiation_params, or if an INTEGER parameter cannot be converted to int
    """
    params = params or {}
    instantiation_params = instantiation_params or {}
    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" takes precedence over "default-value"
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            if (
                isinstance(value, str)
                and value.startswith("<")
                and value.endswith(">")
            ):
                # <name> is a reference to an instantiation parameter
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
                value = instantiation_params[reference]
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            # structured values are serialized to a single-line YAML string
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # already a YAML text: only remove the 7-character tag
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError) as e:
                # ValueError: non numeric string; TypeError: None or other non convertible type
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from e
        elif data_type == "BOOLEAN":
            # anything but the text "false" (case insensitive) is considered True
            value = str(value).lower() != "false"

        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
4679 def _look_for_deployed_vca(
4686 ee_descriptor_id
=None,
4688 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4689 for vca
in deployed_vca
:
4692 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4695 vdu_count_index
is not None
4696 and vdu_count_index
!= vca
["vdu_count_index"]
4699 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4701 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4705 # vca_deployed not found
4707 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4708 " is not deployed".format(
4717 ee_id
= vca
.get("ee_id")
4719 "type", "lxc_proxy_charm"
4720 ) # default value for backward compatibility - proxy charm
4723 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4724 "execution environment".format(
4725 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4728 return ee_id
, vca_type
4730 async def _ns_execute_primitive(
4736 retries_interval
=30,
4743 if primitive
== "config":
4744 primitive_params
= {"params": primitive_params
}
4746 vca_type
= vca_type
or "lxc_proxy_charm"
4750 output
= await asyncio
.wait_for(
4751 self
.vca_map
[vca_type
].exec_primitive(
4753 primitive_name
=primitive
,
4754 params_dict
=primitive_params
,
4755 progress_timeout
=self
.timeout
.progress_primitive
,
4756 total_timeout
=self
.timeout
.primitive
,
4761 timeout
=timeout
or self
.timeout
.primitive
,
4765 except asyncio
.CancelledError
:
4767 except Exception as e
:
4771 "Error executing action {} on {} -> {}".format(
4776 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4778 if isinstance(e
, asyncio
.TimeoutError
):
4780 message
="Timed out waiting for action to complete"
4782 return "FAILED", getattr(e
, "message", repr(e
))
4784 return "COMPLETED", output
4786 except (LcmException
, asyncio
.CancelledError
):
4788 except Exception as e
:
4789 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4791 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4793 Updating the vca_status with latest juju information in nsrs record
4794 :param: nsr_id: Id of the nsr
4795 :param: nslcmop_id: Id of the nslcmop
4799 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4801 vca_id
= self
.get_vca_id({}, db_nsr
)
4802 if db_nsr
["_admin"]["deployed"]["K8s"]:
4803 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4804 cluster_uuid
, kdu_instance
, cluster_type
= (
4805 k8s
["k8scluster-uuid"],
4806 k8s
["kdu-instance"],
4807 k8s
["k8scluster-type"],
4809 await self
._on
_update
_k
8s
_db
(
4810 cluster_uuid
=cluster_uuid
,
4811 kdu_instance
=kdu_instance
,
4812 filter={"_id": nsr_id
},
4814 cluster_type
=cluster_type
,
4817 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4818 table
, filter = "nsrs", {"_id": nsr_id
}
4819 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4820 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4822 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4823 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4825 async def action(self
, nsr_id
, nslcmop_id
):
4826 # Try to lock HA task here
4827 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4828 if not task_is_locked_by_me
:
4831 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4832 self
.logger
.debug(logging_text
+ "Enter")
4833 # get all needed from database
4837 db_nslcmop_update
= {}
4838 nslcmop_operation_state
= None
4839 error_description_nslcmop
= None
4843 # wait for any previous tasks in process
4844 step
= "Waiting for previous operations to terminate"
4845 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4847 self
._write
_ns
_status
(
4850 current_operation
="RUNNING ACTION",
4851 current_operation_id
=nslcmop_id
,
4854 step
= "Getting information from database"
4855 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4856 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4857 if db_nslcmop
["operationParams"].get("primitive_params"):
4858 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4859 db_nslcmop
["operationParams"]["primitive_params"]
4862 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4863 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4864 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4865 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4866 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4867 primitive
= db_nslcmop
["operationParams"]["primitive"]
4868 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4869 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4870 "timeout_ns_action", self
.timeout
.primitive
4874 step
= "Getting vnfr from database"
4875 db_vnfr
= self
.db
.get_one(
4876 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4878 if db_vnfr
.get("kdur"):
4880 for kdur
in db_vnfr
["kdur"]:
4881 if kdur
.get("additionalParams"):
4882 kdur
["additionalParams"] = json
.loads(
4883 kdur
["additionalParams"]
4885 kdur_list
.append(kdur
)
4886 db_vnfr
["kdur"] = kdur_list
4887 step
= "Getting vnfd from database"
4888 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4890 # Sync filesystem before running a primitive
4891 self
.fs
.sync(db_vnfr
["vnfd-id"])
4893 step
= "Getting nsd from database"
4894 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4896 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4897 # for backward compatibility
4898 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4899 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4900 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4901 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4903 # look for primitive
4904 config_primitive_desc
= descriptor_configuration
= None
4906 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4908 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4910 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4912 descriptor_configuration
= db_nsd
.get("ns-configuration")
4914 if descriptor_configuration
and descriptor_configuration
.get(
4917 for config_primitive
in descriptor_configuration
["config-primitive"]:
4918 if config_primitive
["name"] == primitive
:
4919 config_primitive_desc
= config_primitive
4922 if not config_primitive_desc
:
4923 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4925 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4929 primitive_name
= primitive
4930 ee_descriptor_id
= None
4932 primitive_name
= config_primitive_desc
.get(
4933 "execution-environment-primitive", primitive
4935 ee_descriptor_id
= config_primitive_desc
.get(
4936 "execution-environment-ref"
4942 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4944 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4947 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4949 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4951 desc_params
= parse_yaml_strings(
4952 db_vnfr
.get("additionalParamsForVnf")
4955 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
4956 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4957 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4959 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4960 actions
.add(primitive
["name"])
4961 for primitive
in kdu_configuration
.get("config-primitive", []):
4962 actions
.add(primitive
["name"])
4964 nsr_deployed
["K8s"],
4965 lambda kdu
: kdu_name
== kdu
["kdu-name"]
4966 and kdu
["member-vnf-index"] == vnf_index
,
4970 if primitive_name
in actions
4971 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
4975 # TODO check if ns is in a proper status
4977 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4979 # kdur and desc_params already set from before
4980 if primitive_params
:
4981 desc_params
.update(primitive_params
)
4982 # TODO Check if we will need something at vnf level
4983 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4985 kdu_name
== kdu
["kdu-name"]
4986 and kdu
["member-vnf-index"] == vnf_index
4991 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
4994 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4995 msg
= "unknown k8scluster-type '{}'".format(
4996 kdu
.get("k8scluster-type")
4998 raise LcmException(msg
)
5001 "collection": "nsrs",
5002 "filter": {"_id": nsr_id
},
5003 "path": "_admin.deployed.K8s.{}".format(index
),
5007 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5009 step
= "Executing kdu {}".format(primitive_name
)
5010 if primitive_name
== "upgrade":
5011 if desc_params
.get("kdu_model"):
5012 kdu_model
= desc_params
.get("kdu_model")
5013 del desc_params
["kdu_model"]
5015 kdu_model
= kdu
.get("kdu-model")
5016 if kdu_model
.count("/") < 2: # helm chart is not embedded
5017 parts
= kdu_model
.split(sep
=":")
5019 kdu_model
= parts
[0]
5020 if desc_params
.get("kdu_atomic_upgrade"):
5021 atomic_upgrade
= desc_params
.get(
5022 "kdu_atomic_upgrade"
5023 ).lower() in ("yes", "true", "1")
5024 del desc_params
["kdu_atomic_upgrade"]
5026 atomic_upgrade
= True
5028 detailed_status
= await asyncio
.wait_for(
5029 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5030 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5031 kdu_instance
=kdu
.get("kdu-instance"),
5032 atomic
=atomic_upgrade
,
5033 kdu_model
=kdu_model
,
5036 timeout
=timeout_ns_action
,
5038 timeout
=timeout_ns_action
+ 10,
5041 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5043 elif primitive_name
== "rollback":
5044 detailed_status
= await asyncio
.wait_for(
5045 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5046 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5047 kdu_instance
=kdu
.get("kdu-instance"),
5050 timeout
=timeout_ns_action
,
5052 elif primitive_name
== "status":
5053 detailed_status
= await asyncio
.wait_for(
5054 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5055 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5056 kdu_instance
=kdu
.get("kdu-instance"),
5059 timeout
=timeout_ns_action
,
5062 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5063 kdu
["kdu-name"], nsr_id
5065 params
= self
._map
_primitive
_params
(
5066 config_primitive_desc
, primitive_params
, desc_params
5069 detailed_status
= await asyncio
.wait_for(
5070 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5071 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5072 kdu_instance
=kdu_instance
,
5073 primitive_name
=primitive_name
,
5076 timeout
=timeout_ns_action
,
5079 timeout
=timeout_ns_action
,
5083 nslcmop_operation_state
= "COMPLETED"
5085 detailed_status
= ""
5086 nslcmop_operation_state
= "FAILED"
5088 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5089 nsr_deployed
["VCA"],
5090 member_vnf_index
=vnf_index
,
5092 vdu_count_index
=vdu_count_index
,
5093 ee_descriptor_id
=ee_descriptor_id
,
5095 for vca_index
, vca_deployed
in enumerate(
5096 db_nsr
["_admin"]["deployed"]["VCA"]
5098 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5100 "collection": "nsrs",
5101 "filter": {"_id": nsr_id
},
5102 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5106 nslcmop_operation_state
,
5108 ) = await self
._ns
_execute
_primitive
(
5110 primitive
=primitive_name
,
5111 primitive_params
=self
._map
_primitive
_params
(
5112 config_primitive_desc
, primitive_params
, desc_params
5114 timeout
=timeout_ns_action
,
5120 db_nslcmop_update
["detailed-status"] = detailed_status
5121 error_description_nslcmop
= (
5122 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5126 + "Done with result {} {}".format(
5127 nslcmop_operation_state
, detailed_status
5130 return # database update is called inside finally
5132 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5133 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5135 except asyncio
.CancelledError
:
5137 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5139 exc
= "Operation was cancelled"
5140 except asyncio
.TimeoutError
:
5141 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5143 except Exception as e
:
5144 exc
= traceback
.format_exc()
5145 self
.logger
.critical(
5146 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5155 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5156 nslcmop_operation_state
= "FAILED"
5158 self
._write
_ns
_status
(
5162 ], # TODO check if degraded. For the moment use previous status
5163 current_operation
="IDLE",
5164 current_operation_id
=None,
5165 # error_description=error_description_nsr,
5166 # error_detail=error_detail,
5167 other_update
=db_nsr_update
,
5170 self
._write
_op
_status
(
5173 error_message
=error_description_nslcmop
,
5174 operation_state
=nslcmop_operation_state
,
5175 other_update
=db_nslcmop_update
,
5178 if nslcmop_operation_state
:
5180 await self
.msg
.aiowrite(
5185 "nslcmop_id": nslcmop_id
,
5186 "operationState": nslcmop_operation_state
,
5190 except Exception as e
:
5192 logging_text
+ "kafka_write notification Exception {}".format(e
)
5194 self
.logger
.debug(logging_text
+ "Exit")
5195 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5196 return nslcmop_operation_state
, detailed_status
5198 async def terminate_vdus(
5199 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5201 """This method terminates VDUs
5204 db_vnfr: VNF instance record
5205 member_vnf_index: VNF index to identify the VDUs to be removed
5206 db_nsr: NS instance record
5207 update_db_nslcmops: Nslcmop update record
5209 vca_scaling_info
= []
5210 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5211 scaling_info
["scaling_direction"] = "IN"
5212 scaling_info
["vdu-delete"] = {}
5213 scaling_info
["kdu-delete"] = {}
5214 db_vdur
= db_vnfr
.get("vdur")
5215 vdur_list
= copy(db_vdur
)
5217 for index
, vdu
in enumerate(vdur_list
):
5218 vca_scaling_info
.append(
5220 "osm_vdu_id": vdu
["vdu-id-ref"],
5221 "member-vnf-index": member_vnf_index
,
5223 "vdu_index": count_index
,
5226 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5227 scaling_info
["vdu"].append(
5229 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5230 "vdu_id": vdu
["vdu-id-ref"],
5234 for interface
in vdu
["interfaces"]:
5235 scaling_info
["vdu"][index
]["interface"].append(
5237 "name": interface
["name"],
5238 "ip_address": interface
["ip-address"],
5239 "mac_address": interface
.get("mac-address"),
5242 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5243 stage
[2] = "Terminating VDUs"
5244 if scaling_info
.get("vdu-delete"):
5245 # scale_process = "RO"
5246 if self
.ro_config
.ng
:
5247 await self
._scale
_ng
_ro
(
5256 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5257 """This method is to Remove VNF instances from NS.
5260 nsr_id: NS instance id
5261 nslcmop_id: nslcmop id of update
5262 vnf_instance_id: id of the VNF instance to be removed
5265 result: (str, str) COMPLETED/FAILED, details
5269 logging_text
= "Task ns={} update ".format(nsr_id
)
5270 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5271 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5272 if check_vnfr_count
> 1:
5273 stage
= ["", "", ""]
5274 step
= "Getting nslcmop from database"
5276 step
+ " after having waited for previous tasks to be completed"
5278 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5279 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5280 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5281 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5282 """ db_vnfr = self.db.get_one(
5283 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5285 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5286 await self
.terminate_vdus(
5295 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5296 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5297 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5298 "constituent-vnfr-ref"
5300 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5301 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5303 return "COMPLETED", "Done"
5305 step
= "Terminate VNF Failed with"
5307 "{} Cannot terminate the last VNF in this NS.".format(
5311 except (LcmException
, asyncio
.CancelledError
):
5313 except Exception as e
:
5314 self
.logger
.debug("Error removing VNF {}".format(e
))
5315 return "FAILED", "Error removing VNF {}".format(e
)
5317 async def _ns_redeploy_vnf(
5325 """This method updates and redeploys VNF instances
5328 nsr_id: NS instance id
5329 nslcmop_id: nslcmop id
5330 db_vnfd: VNF descriptor
5331 db_vnfr: VNF instance record
5332 db_nsr: NS instance record
5335 result: (str, str) COMPLETED/FAILED, details
5339 stage
= ["", "", ""]
5340 logging_text
= "Task ns={} update ".format(nsr_id
)
5341 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5342 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5344 # Terminate old VNF resources
5345 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5346 await self
.terminate_vdus(
5355 # old_vnfd_id = db_vnfr["vnfd-id"]
5356 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5357 new_db_vnfd
= db_vnfd
5358 # new_vnfd_ref = new_db_vnfd["id"]
5359 # new_vnfd_id = vnfd_id
5363 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5365 "name": cp
.get("id"),
5366 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5367 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5370 new_vnfr_cp
.append(vnf_cp
)
5371 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5372 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5373 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5375 "revision": latest_vnfd_revision
,
5376 "connection-point": new_vnfr_cp
,
5380 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5381 updated_db_vnfr
= self
.db
.get_one(
5383 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5386 # Instantiate new VNF resources
5387 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5388 vca_scaling_info
= []
5389 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5390 scaling_info
["scaling_direction"] = "OUT"
5391 scaling_info
["vdu-create"] = {}
5392 scaling_info
["kdu-create"] = {}
5393 vdud_instantiate_list
= db_vnfd
["vdu"]
5394 for index
, vdud
in enumerate(vdud_instantiate_list
):
5395 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5397 additional_params
= (
5398 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5401 cloud_init_list
= []
5403 # TODO Information of its own ip is not available because db_vnfr is not updated.
5404 additional_params
["OSM"] = get_osm_params(
5405 updated_db_vnfr
, vdud
["id"], 1
5407 cloud_init_list
.append(
5408 self
._parse
_cloud
_init
(
5415 vca_scaling_info
.append(
5417 "osm_vdu_id": vdud
["id"],
5418 "member-vnf-index": member_vnf_index
,
5420 "vdu_index": count_index
,
5423 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5424 if self
.ro_config
.ng
:
5426 "New Resources to be deployed: {}".format(scaling_info
)
5428 await self
._scale
_ng
_ro
(
5436 return "COMPLETED", "Done"
5437 except (LcmException
, asyncio
.CancelledError
):
5439 except Exception as e
:
5440 self
.logger
.debug("Error updating VNF {}".format(e
))
5441 return "FAILED", "Error updating VNF {}".format(e
)
5443 async def _ns_charm_upgrade(
5449 timeout
: float = None,
5451 """This method upgrade charms in VNF instances
5454 ee_id: Execution environment id
5455 path: Local path to the charm
5457 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5458 timeout: (Float) Timeout for the ns update operation
5461 result: (str, str) COMPLETED/FAILED, details
5464 charm_type
= charm_type
or "lxc_proxy_charm"
5465 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5469 charm_type
=charm_type
,
5470 timeout
=timeout
or self
.timeout
.ns_update
,
5474 return "COMPLETED", output
5476 except (LcmException
, asyncio
.CancelledError
):
5479 except Exception as e
:
5480 self
.logger
.debug("Error upgrading charm {}".format(path
))
5482 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5484 async def update(self
, nsr_id
, nslcmop_id
):
5485 """Update NS according to different update types
5487 This method performs upgrade of VNF instances then updates the revision
5488 number in VNF record
5491 nsr_id: Network service will be updated
5492 nslcmop_id: ns lcm operation id
5495 It may raise DbException, LcmException, N2VCException, K8sException
5498 # Try to lock HA task here
5499 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5500 if not task_is_locked_by_me
:
5503 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5504 self
.logger
.debug(logging_text
+ "Enter")
5506 # Set the required variables to be filled up later
5508 db_nslcmop_update
= {}
5510 nslcmop_operation_state
= None
5512 error_description_nslcmop
= ""
5514 change_type
= "updated"
5515 detailed_status
= ""
5516 member_vnf_index
= None
5519 # wait for any previous tasks in process
5520 step
= "Waiting for previous operations to terminate"
5521 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5522 self
._write
_ns
_status
(
5525 current_operation
="UPDATING",
5526 current_operation_id
=nslcmop_id
,
5529 step
= "Getting nslcmop from database"
5530 db_nslcmop
= self
.db
.get_one(
5531 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5533 update_type
= db_nslcmop
["operationParams"]["updateType"]
5535 step
= "Getting nsr from database"
5536 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5537 old_operational_status
= db_nsr
["operational-status"]
5538 db_nsr_update
["operational-status"] = "updating"
5539 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5540 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5542 if update_type
== "CHANGE_VNFPKG":
5543 # Get the input parameters given through update request
5544 vnf_instance_id
= db_nslcmop
["operationParams"][
5545 "changeVnfPackageData"
5546 ].get("vnfInstanceId")
5548 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5551 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5553 step
= "Getting vnfr from database"
5554 db_vnfr
= self
.db
.get_one(
5555 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5558 step
= "Getting vnfds from database"
5560 latest_vnfd
= self
.db
.get_one(
5561 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5563 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5566 current_vnf_revision
= db_vnfr
.get("revision", 1)
5567 current_vnfd
= self
.db
.get_one(
5569 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5570 fail_on_empty
=False,
5572 # Charm artifact paths will be filled up later
5574 current_charm_artifact_path
,
5575 target_charm_artifact_path
,
5576 charm_artifact_paths
,
5578 ) = ([], [], [], [])
5580 step
= "Checking if revision has changed in VNFD"
5581 if current_vnf_revision
!= latest_vnfd_revision
:
5582 change_type
= "policy_updated"
5584 # There is new revision of VNFD, update operation is required
5585 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5586 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5588 step
= "Removing the VNFD packages if they exist in the local path"
5589 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5590 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5592 step
= "Get the VNFD packages from FSMongo"
5593 self
.fs
.sync(from_path
=latest_vnfd_path
)
5594 self
.fs
.sync(from_path
=current_vnfd_path
)
5597 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5599 current_base_folder
= current_vnfd
["_admin"]["storage"]
5600 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5602 for vca_index
, vca_deployed
in enumerate(
5603 get_iterable(nsr_deployed
, "VCA")
5605 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5607 # Getting charm-id and charm-type
5608 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5609 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5610 vca_type
= vca_deployed
.get("type")
5611 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5614 ee_id
= vca_deployed
.get("ee_id")
5616 step
= "Getting descriptor config"
5617 if current_vnfd
.get("kdu"):
5618 search_key
= "kdu_name"
5620 search_key
= "vnfd_id"
5622 entity_id
= vca_deployed
.get(search_key
)
5624 descriptor_config
= get_configuration(
5625 current_vnfd
, entity_id
5628 if "execution-environment-list" in descriptor_config
:
5629 ee_list
= descriptor_config
.get(
5630 "execution-environment-list", []
5635 # There could be several charm used in the same VNF
5636 for ee_item
in ee_list
:
5637 if ee_item
.get("juju"):
5638 step
= "Getting charm name"
5639 charm_name
= ee_item
["juju"].get("charm")
5641 step
= "Setting Charm artifact paths"
5642 current_charm_artifact_path
.append(
5643 get_charm_artifact_path(
5644 current_base_folder
,
5647 current_vnf_revision
,
5650 target_charm_artifact_path
.append(
5651 get_charm_artifact_path(
5655 latest_vnfd_revision
,
5658 elif ee_item
.get("helm-chart"):
5659 # add chart to list and all parameters
5660 step
= "Getting helm chart name"
5661 chart_name
= ee_item
.get("helm-chart")
5663 ee_item
.get("helm-version")
5664 and ee_item
.get("helm-version") == "v2"
5668 vca_type
= "helm-v3"
5669 step
= "Setting Helm chart artifact paths"
5671 helm_artifacts
.append(
5673 "current_artifact_path": get_charm_artifact_path(
5674 current_base_folder
,
5677 current_vnf_revision
,
5679 "target_artifact_path": get_charm_artifact_path(
5683 latest_vnfd_revision
,
5686 "vca_index": vca_index
,
5687 "vdu_index": vdu_count_index
,
5691 charm_artifact_paths
= zip(
5692 current_charm_artifact_path
, target_charm_artifact_path
5695 step
= "Checking if software version has changed in VNFD"
5696 if find_software_version(current_vnfd
) != find_software_version(
5699 step
= "Checking if existing VNF has charm"
5700 for current_charm_path
, target_charm_path
in list(
5701 charm_artifact_paths
5703 if current_charm_path
:
5705 "Software version change is not supported as VNF instance {} has charm.".format(
5710 # There is no change in the charm package, then redeploy the VNF
5711 # based on new descriptor
5712 step
= "Redeploying VNF"
5713 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5714 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5715 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5717 if result
== "FAILED":
5718 nslcmop_operation_state
= result
5719 error_description_nslcmop
= detailed_status
5720 db_nslcmop_update
["detailed-status"] = detailed_status
5723 + " step {} Done with result {} {}".format(
5724 step
, nslcmop_operation_state
, detailed_status
5729 step
= "Checking if any charm package has changed or not"
5730 for current_charm_path
, target_charm_path
in list(
5731 charm_artifact_paths
5735 and target_charm_path
5736 and self
.check_charm_hash_changed(
5737 current_charm_path
, target_charm_path
5740 step
= "Checking whether VNF uses juju bundle"
5741 if check_juju_bundle_existence(current_vnfd
):
5743 "Charm upgrade is not supported for the instance which"
5744 " uses juju-bundle: {}".format(
5745 check_juju_bundle_existence(current_vnfd
)
5749 step
= "Upgrading Charm"
5753 ) = await self
._ns
_charm
_upgrade
(
5756 charm_type
=vca_type
,
5757 path
=self
.fs
.path
+ target_charm_path
,
5758 timeout
=timeout_seconds
,
5761 if result
== "FAILED":
5762 nslcmop_operation_state
= result
5763 error_description_nslcmop
= detailed_status
5765 db_nslcmop_update
["detailed-status"] = detailed_status
5768 + " step {} Done with result {} {}".format(
5769 step
, nslcmop_operation_state
, detailed_status
5773 step
= "Updating policies"
5774 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5775 result
= "COMPLETED"
5776 detailed_status
= "Done"
5777 db_nslcmop_update
["detailed-status"] = "Done"
5780 for item
in helm_artifacts
:
5782 item
["current_artifact_path"]
5783 and item
["target_artifact_path"]
5784 and self
.check_charm_hash_changed(
5785 item
["current_artifact_path"],
5786 item
["target_artifact_path"],
5790 db_update_entry
= "_admin.deployed.VCA.{}.".format(
5793 vnfr_id
= db_vnfr
["_id"]
5794 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
5796 "collection": "nsrs",
5797 "filter": {"_id": nsr_id
},
5798 "path": db_update_entry
,
5800 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
5801 await self
.vca_map
[vca_type
].upgrade_execution_environment(
5802 namespace
=namespace
,
5806 artifact_path
=item
["target_artifact_path"],
5809 vnf_id
= db_vnfr
.get("vnfd-ref")
5810 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
5811 self
.logger
.debug("get ssh key block")
5815 ("config-access", "ssh-access", "required"),
5817 # Needed to inject a ssh key
5820 ("config-access", "ssh-access", "default-user"),
5823 "Install configuration Software, getting public ssh key"
5825 pub_key
= await self
.vca_map
[
5827 ].get_ee_ssh_public__key(
5828 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
5832 "Insert public key into VM user={} ssh_key={}".format(
5836 self
.logger
.debug(logging_text
+ step
)
5838 # wait for RO (ip-address) Insert pub_key into VM
5839 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
5849 initial_config_primitive_list
= config_descriptor
.get(
5850 "initial-config-primitive"
5852 config_primitive
= next(
5855 for p
in initial_config_primitive_list
5856 if p
["name"] == "config"
5860 if not config_primitive
:
5863 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5865 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
5866 if db_vnfr
.get("additionalParamsForVnf"):
5867 deploy_params
.update(
5869 db_vnfr
["additionalParamsForVnf"].copy()
5872 primitive_params_
= self
._map
_primitive
_params
(
5873 config_primitive
, {}, deploy_params
5876 step
= "execute primitive '{}' params '{}'".format(
5877 config_primitive
["name"], primitive_params_
5879 self
.logger
.debug(logging_text
+ step
)
5880 await self
.vca_map
[vca_type
].exec_primitive(
5882 primitive_name
=config_primitive
["name"],
5883 params_dict
=primitive_params_
,
5889 step
= "Updating policies"
5890 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5891 detailed_status
= "Done"
5892 db_nslcmop_update
["detailed-status"] = "Done"
5894 # If nslcmop_operation_state is None, so any operation is not failed.
5895 if not nslcmop_operation_state
:
5896 nslcmop_operation_state
= "COMPLETED"
5898 # If update CHANGE_VNFPKG nslcmop_operation is successful
5899 # vnf revision need to be updated
5900 vnfr_update
["revision"] = latest_vnfd_revision
5901 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5905 + " task Done with result {} {}".format(
5906 nslcmop_operation_state
, detailed_status
5909 elif update_type
== "REMOVE_VNF":
5910 # This part is included in https://osm.etsi.org/gerrit/11876
5911 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5912 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5913 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5914 step
= "Removing VNF"
5915 (result
, detailed_status
) = await self
.remove_vnf(
5916 nsr_id
, nslcmop_id
, vnf_instance_id
5918 if result
== "FAILED":
5919 nslcmop_operation_state
= result
5920 error_description_nslcmop
= detailed_status
5921 db_nslcmop_update
["detailed-status"] = detailed_status
5922 change_type
= "vnf_terminated"
5923 if not nslcmop_operation_state
:
5924 nslcmop_operation_state
= "COMPLETED"
5927 + " task Done with result {} {}".format(
5928 nslcmop_operation_state
, detailed_status
5932 elif update_type
== "OPERATE_VNF":
5933 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
5936 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
5939 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
5942 (result
, detailed_status
) = await self
.rebuild_start_stop(
5943 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5945 if result
== "FAILED":
5946 nslcmop_operation_state
= result
5947 error_description_nslcmop
= detailed_status
5948 db_nslcmop_update
["detailed-status"] = detailed_status
5949 if not nslcmop_operation_state
:
5950 nslcmop_operation_state
= "COMPLETED"
5953 + " task Done with result {} {}".format(
5954 nslcmop_operation_state
, detailed_status
5958 # If nslcmop_operation_state is None, so any operation is not failed.
5959 # All operations are executed in overall.
5960 if not nslcmop_operation_state
:
5961 nslcmop_operation_state
= "COMPLETED"
5962 db_nsr_update
["operational-status"] = old_operational_status
5964 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5965 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5967 except asyncio
.CancelledError
:
5969 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5971 exc
= "Operation was cancelled"
5972 except asyncio
.TimeoutError
:
5973 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5975 except Exception as e
:
5976 exc
= traceback
.format_exc()
5977 self
.logger
.critical(
5978 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5987 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5988 nslcmop_operation_state
= "FAILED"
5989 db_nsr_update
["operational-status"] = old_operational_status
5991 self
._write
_ns
_status
(
5993 ns_state
=db_nsr
["nsState"],
5994 current_operation
="IDLE",
5995 current_operation_id
=None,
5996 other_update
=db_nsr_update
,
5999 self
._write
_op
_status
(
6002 error_message
=error_description_nslcmop
,
6003 operation_state
=nslcmop_operation_state
,
6004 other_update
=db_nslcmop_update
,
6007 if nslcmop_operation_state
:
6011 "nslcmop_id": nslcmop_id
,
6012 "operationState": nslcmop_operation_state
,
6015 change_type
in ("vnf_terminated", "policy_updated")
6016 and member_vnf_index
6018 msg
.update({"vnf_member_index": member_vnf_index
})
6019 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6020 except Exception as e
:
6022 logging_text
+ "kafka_write notification Exception {}".format(e
)
6024 self
.logger
.debug(logging_text
+ "Exit")
6025 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6026 return nslcmop_operation_state
, detailed_status
6028 async def scale(self
, nsr_id
, nslcmop_id
):
6029 # Try to lock HA task here
6030 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6031 if not task_is_locked_by_me
:
6034 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6035 stage
= ["", "", ""]
6036 tasks_dict_info
= {}
6037 # ^ stage, step, VIM progress
6038 self
.logger
.debug(logging_text
+ "Enter")
6039 # get all needed from database
6041 db_nslcmop_update
= {}
6044 # in case of error, indicates what part of scale was failed to put nsr at error status
6045 scale_process
= None
6046 old_operational_status
= ""
6047 old_config_status
= ""
6050 # wait for any previous tasks in process
6051 step
= "Waiting for previous operations to terminate"
6052 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6053 self
._write
_ns
_status
(
6056 current_operation
="SCALING",
6057 current_operation_id
=nslcmop_id
,
6060 step
= "Getting nslcmop from database"
6062 step
+ " after having waited for previous tasks to be completed"
6064 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6066 step
= "Getting nsr from database"
6067 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6068 old_operational_status
= db_nsr
["operational-status"]
6069 old_config_status
= db_nsr
["config-status"]
6071 step
= "Parsing scaling parameters"
6072 db_nsr_update
["operational-status"] = "scaling"
6073 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6074 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6076 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6078 ]["member-vnf-index"]
6079 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6081 ]["scaling-group-descriptor"]
6082 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6083 # for backward compatibility
6084 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6085 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6086 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6087 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6089 step
= "Getting vnfr from database"
6090 db_vnfr
= self
.db
.get_one(
6091 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6094 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6096 step
= "Getting vnfd from database"
6097 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6099 base_folder
= db_vnfd
["_admin"]["storage"]
6101 step
= "Getting scaling-group-descriptor"
6102 scaling_descriptor
= find_in_list(
6103 get_scaling_aspect(db_vnfd
),
6104 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6106 if not scaling_descriptor
:
6108 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6109 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6112 step
= "Sending scale order to VIM"
6113 # TODO check if ns is in a proper status
6115 if not db_nsr
["_admin"].get("scaling-group"):
6120 "_admin.scaling-group": [
6121 {"name": scaling_group
, "nb-scale-op": 0}
6125 admin_scale_index
= 0
6127 for admin_scale_index
, admin_scale_info
in enumerate(
6128 db_nsr
["_admin"]["scaling-group"]
6130 if admin_scale_info
["name"] == scaling_group
:
6131 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6133 else: # not found, set index one plus last element and add new entry with the name
6134 admin_scale_index
+= 1
6136 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6139 vca_scaling_info
= []
6140 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6141 if scaling_type
== "SCALE_OUT":
6142 if "aspect-delta-details" not in scaling_descriptor
:
6144 "Aspect delta details not fount in scaling descriptor {}".format(
6145 scaling_descriptor
["name"]
6148 # count if max-instance-count is reached
6149 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6151 scaling_info
["scaling_direction"] = "OUT"
6152 scaling_info
["vdu-create"] = {}
6153 scaling_info
["kdu-create"] = {}
6154 for delta
in deltas
:
6155 for vdu_delta
in delta
.get("vdu-delta", {}):
6156 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6157 # vdu_index also provides the number of instance of the targeted vdu
6158 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6159 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6163 additional_params
= (
6164 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6167 cloud_init_list
= []
6169 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6170 max_instance_count
= 10
6171 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6172 max_instance_count
= vdu_profile
.get(
6173 "max-number-of-instances", 10
6176 default_instance_num
= get_number_of_instances(
6179 instances_number
= vdu_delta
.get("number-of-instances", 1)
6180 nb_scale_op
+= instances_number
6182 new_instance_count
= nb_scale_op
+ default_instance_num
6183 # Control if new count is over max and vdu count is less than max.
6184 # Then assign new instance count
6185 if new_instance_count
> max_instance_count
> vdu_count
:
6186 instances_number
= new_instance_count
- max_instance_count
6188 instances_number
= instances_number
6190 if new_instance_count
> max_instance_count
:
6192 "reached the limit of {} (max-instance-count) "
6193 "scaling-out operations for the "
6194 "scaling-group-descriptor '{}'".format(
6195 nb_scale_op
, scaling_group
6198 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6200 # TODO Information of its own ip is not available because db_vnfr is not updated.
6201 additional_params
["OSM"] = get_osm_params(
6202 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6204 cloud_init_list
.append(
6205 self
._parse
_cloud
_init
(
6212 vca_scaling_info
.append(
6214 "osm_vdu_id": vdu_delta
["id"],
6215 "member-vnf-index": vnf_index
,
6217 "vdu_index": vdu_index
+ x
,
6220 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6221 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6222 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6223 kdu_name
= kdu_profile
["kdu-name"]
6224 resource_name
= kdu_profile
.get("resource-name", "")
6226 # Might have different kdus in the same delta
6227 # Should have list for each kdu
6228 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6229 scaling_info
["kdu-create"][kdu_name
] = []
6231 kdur
= get_kdur(db_vnfr
, kdu_name
)
6232 if kdur
.get("helm-chart"):
6233 k8s_cluster_type
= "helm-chart-v3"
6234 self
.logger
.debug("kdur: {}".format(kdur
))
6236 kdur
.get("helm-version")
6237 and kdur
.get("helm-version") == "v2"
6239 k8s_cluster_type
= "helm-chart"
6240 elif kdur
.get("juju-bundle"):
6241 k8s_cluster_type
= "juju-bundle"
6244 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6245 "juju-bundle. Maybe an old NBI version is running".format(
6246 db_vnfr
["member-vnf-index-ref"], kdu_name
6250 max_instance_count
= 10
6251 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6252 max_instance_count
= kdu_profile
.get(
6253 "max-number-of-instances", 10
6256 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6257 deployed_kdu
, _
= get_deployed_kdu(
6258 nsr_deployed
, kdu_name
, vnf_index
6260 if deployed_kdu
is None:
6262 "KDU '{}' for vnf '{}' not deployed".format(
6266 kdu_instance
= deployed_kdu
.get("kdu-instance")
6267 instance_num
= await self
.k8scluster_map
[
6273 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6274 kdu_model
=deployed_kdu
.get("kdu-model"),
6276 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6277 "number-of-instances", 1
6280 # Control if new count is over max and instance_num is less than max.
6281 # Then assign max instance number to kdu replica count
6282 if kdu_replica_count
> max_instance_count
> instance_num
:
6283 kdu_replica_count
= max_instance_count
6284 if kdu_replica_count
> max_instance_count
:
6286 "reached the limit of {} (max-instance-count) "
6287 "scaling-out operations for the "
6288 "scaling-group-descriptor '{}'".format(
6289 instance_num
, scaling_group
6293 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6294 vca_scaling_info
.append(
6296 "osm_kdu_id": kdu_name
,
6297 "member-vnf-index": vnf_index
,
6299 "kdu_index": instance_num
+ x
- 1,
6302 scaling_info
["kdu-create"][kdu_name
].append(
6304 "member-vnf-index": vnf_index
,
6306 "k8s-cluster-type": k8s_cluster_type
,
6307 "resource-name": resource_name
,
6308 "scale": kdu_replica_count
,
6311 elif scaling_type
== "SCALE_IN":
6312 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6314 scaling_info
["scaling_direction"] = "IN"
6315 scaling_info
["vdu-delete"] = {}
6316 scaling_info
["kdu-delete"] = {}
6318 for delta
in deltas
:
6319 for vdu_delta
in delta
.get("vdu-delta", {}):
6320 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6321 min_instance_count
= 0
6322 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6323 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6324 min_instance_count
= vdu_profile
["min-number-of-instances"]
6326 default_instance_num
= get_number_of_instances(
6327 db_vnfd
, vdu_delta
["id"]
6329 instance_num
= vdu_delta
.get("number-of-instances", 1)
6330 nb_scale_op
-= instance_num
6332 new_instance_count
= nb_scale_op
+ default_instance_num
6334 if new_instance_count
< min_instance_count
< vdu_count
:
6335 instances_number
= min_instance_count
- new_instance_count
6337 instances_number
= instance_num
6339 if new_instance_count
< min_instance_count
:
6341 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6342 "scaling-group-descriptor '{}'".format(
6343 nb_scale_op
, scaling_group
6346 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6347 vca_scaling_info
.append(
6349 "osm_vdu_id": vdu_delta
["id"],
6350 "member-vnf-index": vnf_index
,
6352 "vdu_index": vdu_index
- 1 - x
,
6355 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6356 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6357 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6358 kdu_name
= kdu_profile
["kdu-name"]
6359 resource_name
= kdu_profile
.get("resource-name", "")
6361 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6362 scaling_info
["kdu-delete"][kdu_name
] = []
6364 kdur
= get_kdur(db_vnfr
, kdu_name
)
6365 if kdur
.get("helm-chart"):
6366 k8s_cluster_type
= "helm-chart-v3"
6367 self
.logger
.debug("kdur: {}".format(kdur
))
6369 kdur
.get("helm-version")
6370 and kdur
.get("helm-version") == "v2"
6372 k8s_cluster_type
= "helm-chart"
6373 elif kdur
.get("juju-bundle"):
6374 k8s_cluster_type
= "juju-bundle"
6377 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6378 "juju-bundle. Maybe an old NBI version is running".format(
6379 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6383 min_instance_count
= 0
6384 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6385 min_instance_count
= kdu_profile
["min-number-of-instances"]
6387 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6388 deployed_kdu
, _
= get_deployed_kdu(
6389 nsr_deployed
, kdu_name
, vnf_index
6391 if deployed_kdu
is None:
6393 "KDU '{}' for vnf '{}' not deployed".format(
6397 kdu_instance
= deployed_kdu
.get("kdu-instance")
6398 instance_num
= await self
.k8scluster_map
[
6404 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6405 kdu_model
=deployed_kdu
.get("kdu-model"),
6407 kdu_replica_count
= instance_num
- kdu_delta
.get(
6408 "number-of-instances", 1
6411 if kdu_replica_count
< min_instance_count
< instance_num
:
6412 kdu_replica_count
= min_instance_count
6413 if kdu_replica_count
< min_instance_count
:
6415 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6416 "scaling-group-descriptor '{}'".format(
6417 instance_num
, scaling_group
6421 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6422 vca_scaling_info
.append(
6424 "osm_kdu_id": kdu_name
,
6425 "member-vnf-index": vnf_index
,
6427 "kdu_index": instance_num
- x
- 1,
6430 scaling_info
["kdu-delete"][kdu_name
].append(
6432 "member-vnf-index": vnf_index
,
6434 "k8s-cluster-type": k8s_cluster_type
,
6435 "resource-name": resource_name
,
6436 "scale": kdu_replica_count
,
6440 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6441 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6442 if scaling_info
["scaling_direction"] == "IN":
6443 for vdur
in reversed(db_vnfr
["vdur"]):
6444 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6445 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6446 scaling_info
["vdu"].append(
6448 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6449 "vdu_id": vdur
["vdu-id-ref"],
6453 for interface
in vdur
["interfaces"]:
6454 scaling_info
["vdu"][-1]["interface"].append(
6456 "name": interface
["name"],
6457 "ip_address": interface
["ip-address"],
6458 "mac_address": interface
.get("mac-address"),
6461 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6464 step
= "Executing pre-scale vnf-config-primitive"
6465 if scaling_descriptor
.get("scaling-config-action"):
6466 for scaling_config_action
in scaling_descriptor
[
6467 "scaling-config-action"
6470 scaling_config_action
.get("trigger") == "pre-scale-in"
6471 and scaling_type
== "SCALE_IN"
6473 scaling_config_action
.get("trigger") == "pre-scale-out"
6474 and scaling_type
== "SCALE_OUT"
6476 vnf_config_primitive
= scaling_config_action
[
6477 "vnf-config-primitive-name-ref"
6479 step
= db_nslcmop_update
[
6481 ] = "executing pre-scale scaling-config-action '{}'".format(
6482 vnf_config_primitive
6485 # look for primitive
6486 for config_primitive
in (
6487 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6488 ).get("config-primitive", ()):
6489 if config_primitive
["name"] == vnf_config_primitive
:
6493 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6494 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6495 "primitive".format(scaling_group
, vnf_config_primitive
)
6498 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6499 if db_vnfr
.get("additionalParamsForVnf"):
6500 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6502 scale_process
= "VCA"
6503 db_nsr_update
["config-status"] = "configuring pre-scaling"
6504 primitive_params
= self
._map
_primitive
_params
(
6505 config_primitive
, {}, vnfr_params
6508 # Pre-scale retry check: Check if this sub-operation has been executed before
6509 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6512 vnf_config_primitive
,
6516 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6517 # Skip sub-operation
6518 result
= "COMPLETED"
6519 result_detail
= "Done"
6522 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6523 vnf_config_primitive
, result
, result_detail
6527 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6528 # New sub-operation: Get index of this sub-operation
6530 len(db_nslcmop
.get("_admin", {}).get("operations"))
6535 + "vnf_config_primitive={} New sub-operation".format(
6536 vnf_config_primitive
6540 # retry: Get registered params for this existing sub-operation
6541 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6544 vnf_index
= op
.get("member_vnf_index")
6545 vnf_config_primitive
= op
.get("primitive")
6546 primitive_params
= op
.get("primitive_params")
6549 + "vnf_config_primitive={} Sub-operation retry".format(
6550 vnf_config_primitive
6553 # Execute the primitive, either with new (first-time) or registered (reintent) args
6554 ee_descriptor_id
= config_primitive
.get(
6555 "execution-environment-ref"
6557 primitive_name
= config_primitive
.get(
6558 "execution-environment-primitive", vnf_config_primitive
6560 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6561 nsr_deployed
["VCA"],
6562 member_vnf_index
=vnf_index
,
6564 vdu_count_index
=None,
6565 ee_descriptor_id
=ee_descriptor_id
,
6567 result
, result_detail
= await self
._ns
_execute
_primitive
(
6576 + "vnf_config_primitive={} Done with result {} {}".format(
6577 vnf_config_primitive
, result
, result_detail
6580 # Update operationState = COMPLETED | FAILED
6581 self
._update
_suboperation
_status
(
6582 db_nslcmop
, op_index
, result
, result_detail
6585 if result
== "FAILED":
6586 raise LcmException(result_detail
)
6587 db_nsr_update
["config-status"] = old_config_status
6588 scale_process
= None
6592 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6595 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6598 # SCALE-IN VCA - BEGIN
6599 if vca_scaling_info
:
6600 step
= db_nslcmop_update
[
6602 ] = "Deleting the execution environments"
6603 scale_process
= "VCA"
6604 for vca_info
in vca_scaling_info
:
6605 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6606 member_vnf_index
= str(vca_info
["member-vnf-index"])
6608 logging_text
+ "vdu info: {}".format(vca_info
)
6610 if vca_info
.get("osm_vdu_id"):
6611 vdu_id
= vca_info
["osm_vdu_id"]
6612 vdu_index
= int(vca_info
["vdu_index"])
6615 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6616 member_vnf_index
, vdu_id
, vdu_index
6618 stage
[2] = step
= "Scaling in VCA"
6619 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6620 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6621 config_update
= db_nsr
["configurationStatus"]
6622 for vca_index
, vca
in enumerate(vca_update
):
6624 (vca
or vca
.get("ee_id"))
6625 and vca
["member-vnf-index"] == member_vnf_index
6626 and vca
["vdu_count_index"] == vdu_index
6628 if vca
.get("vdu_id"):
6629 config_descriptor
= get_configuration(
6630 db_vnfd
, vca
.get("vdu_id")
6632 elif vca
.get("kdu_name"):
6633 config_descriptor
= get_configuration(
6634 db_vnfd
, vca
.get("kdu_name")
6637 config_descriptor
= get_configuration(
6638 db_vnfd
, db_vnfd
["id"]
6640 operation_params
= (
6641 db_nslcmop
.get("operationParams") or {}
6643 exec_terminate_primitives
= not operation_params
.get(
6644 "skip_terminate_primitives"
6645 ) and vca
.get("needed_terminate")
6646 task
= asyncio
.ensure_future(
6655 exec_primitives
=exec_terminate_primitives
,
6659 timeout
=self
.timeout
.charm_delete
,
6662 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6665 del vca_update
[vca_index
]
6666 del config_update
[vca_index
]
6667 # wait for pending tasks of terminate primitives
6671 + "Waiting for tasks {}".format(
6672 list(tasks_dict_info
.keys())
6675 error_list
= await self
._wait
_for
_tasks
(
6679 self
.timeout
.charm_delete
, self
.timeout
.ns_terminate
6684 tasks_dict_info
.clear()
6686 raise LcmException("; ".join(error_list
))
6688 db_vca_and_config_update
= {
6689 "_admin.deployed.VCA": vca_update
,
6690 "configurationStatus": config_update
,
6693 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6695 scale_process
= None
6696 # SCALE-IN VCA - END
6699 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6700 scale_process
= "RO"
6701 if self
.ro_config
.ng
:
6702 await self
._scale
_ng
_ro
(
6703 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6705 scaling_info
.pop("vdu-create", None)
6706 scaling_info
.pop("vdu-delete", None)
6708 scale_process
= None
6712 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6713 scale_process
= "KDU"
6714 await self
._scale
_kdu
(
6715 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6717 scaling_info
.pop("kdu-create", None)
6718 scaling_info
.pop("kdu-delete", None)
6720 scale_process
= None
6724 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6726 # SCALE-UP VCA - BEGIN
6727 if vca_scaling_info
:
6728 step
= db_nslcmop_update
[
6730 ] = "Creating new execution environments"
6731 scale_process
= "VCA"
6732 for vca_info
in vca_scaling_info
:
6733 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6734 member_vnf_index
= str(vca_info
["member-vnf-index"])
6736 logging_text
+ "vdu info: {}".format(vca_info
)
6738 vnfd_id
= db_vnfr
["vnfd-ref"]
6739 if vca_info
.get("osm_vdu_id"):
6740 vdu_index
= int(vca_info
["vdu_index"])
6741 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6742 if db_vnfr
.get("additionalParamsForVnf"):
6743 deploy_params
.update(
6745 db_vnfr
["additionalParamsForVnf"].copy()
6748 descriptor_config
= get_configuration(
6749 db_vnfd
, db_vnfd
["id"]
6751 if descriptor_config
:
6757 logging_text
=logging_text
6758 + "member_vnf_index={} ".format(member_vnf_index
),
6761 nslcmop_id
=nslcmop_id
,
6767 kdu_index
=kdu_index
,
6768 member_vnf_index
=member_vnf_index
,
6769 vdu_index
=vdu_index
,
6771 deploy_params
=deploy_params
,
6772 descriptor_config
=descriptor_config
,
6773 base_folder
=base_folder
,
6774 task_instantiation_info
=tasks_dict_info
,
6777 vdu_id
= vca_info
["osm_vdu_id"]
6778 vdur
= find_in_list(
6779 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6781 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6782 if vdur
.get("additionalParams"):
6783 deploy_params_vdu
= parse_yaml_strings(
6784 vdur
["additionalParams"]
6787 deploy_params_vdu
= deploy_params
6788 deploy_params_vdu
["OSM"] = get_osm_params(
6789 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6791 if descriptor_config
:
6797 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6798 member_vnf_index
, vdu_id
, vdu_index
6800 stage
[2] = step
= "Scaling out VCA"
6801 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6803 logging_text
=logging_text
6804 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6805 member_vnf_index
, vdu_id
, vdu_index
6809 nslcmop_id
=nslcmop_id
,
6815 member_vnf_index
=member_vnf_index
,
6816 vdu_index
=vdu_index
,
6817 kdu_index
=kdu_index
,
6819 deploy_params
=deploy_params_vdu
,
6820 descriptor_config
=descriptor_config
,
6821 base_folder
=base_folder
,
6822 task_instantiation_info
=tasks_dict_info
,
6825 # SCALE-UP VCA - END
6826 scale_process
= None
6829 # execute primitive service POST-SCALING
6830 step
= "Executing post-scale vnf-config-primitive"
6831 if scaling_descriptor
.get("scaling-config-action"):
6832 for scaling_config_action
in scaling_descriptor
[
6833 "scaling-config-action"
6836 scaling_config_action
.get("trigger") == "post-scale-in"
6837 and scaling_type
== "SCALE_IN"
6839 scaling_config_action
.get("trigger") == "post-scale-out"
6840 and scaling_type
== "SCALE_OUT"
6842 vnf_config_primitive
= scaling_config_action
[
6843 "vnf-config-primitive-name-ref"
6845 step
= db_nslcmop_update
[
6847 ] = "executing post-scale scaling-config-action '{}'".format(
6848 vnf_config_primitive
6851 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6852 if db_vnfr
.get("additionalParamsForVnf"):
6853 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6855 # look for primitive
6856 for config_primitive
in (
6857 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6858 ).get("config-primitive", ()):
6859 if config_primitive
["name"] == vnf_config_primitive
:
6863 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6864 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6865 "config-primitive".format(
6866 scaling_group
, vnf_config_primitive
6869 scale_process
= "VCA"
6870 db_nsr_update
["config-status"] = "configuring post-scaling"
6871 primitive_params
= self
._map
_primitive
_params
(
6872 config_primitive
, {}, vnfr_params
6875 # Post-scale retry check: Check if this sub-operation has been executed before
6876 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6879 vnf_config_primitive
,
6883 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6884 # Skip sub-operation
6885 result
= "COMPLETED"
6886 result_detail
= "Done"
6889 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6890 vnf_config_primitive
, result
, result_detail
6894 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6895 # New sub-operation: Get index of this sub-operation
6897 len(db_nslcmop
.get("_admin", {}).get("operations"))
6902 + "vnf_config_primitive={} New sub-operation".format(
6903 vnf_config_primitive
6907 # retry: Get registered params for this existing sub-operation
6908 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6911 vnf_index
= op
.get("member_vnf_index")
6912 vnf_config_primitive
= op
.get("primitive")
6913 primitive_params
= op
.get("primitive_params")
6916 + "vnf_config_primitive={} Sub-operation retry".format(
6917 vnf_config_primitive
6920 # Execute the primitive, either with new (first-time) or registered (reintent) args
6921 ee_descriptor_id
= config_primitive
.get(
6922 "execution-environment-ref"
6924 primitive_name
= config_primitive
.get(
6925 "execution-environment-primitive", vnf_config_primitive
6927 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6928 nsr_deployed
["VCA"],
6929 member_vnf_index
=vnf_index
,
6931 vdu_count_index
=None,
6932 ee_descriptor_id
=ee_descriptor_id
,
6934 result
, result_detail
= await self
._ns
_execute
_primitive
(
6943 + "vnf_config_primitive={} Done with result {} {}".format(
6944 vnf_config_primitive
, result
, result_detail
6947 # Update operationState = COMPLETED | FAILED
6948 self
._update
_suboperation
_status
(
6949 db_nslcmop
, op_index
, result
, result_detail
6952 if result
== "FAILED":
6953 raise LcmException(result_detail
)
6954 db_nsr_update
["config-status"] = old_config_status
6955 scale_process
= None
6960 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6961 db_nsr_update
["operational-status"] = (
6963 if old_operational_status
== "failed"
6964 else old_operational_status
6966 db_nsr_update
["config-status"] = old_config_status
6969 ROclient
.ROClientException
,
6974 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6976 except asyncio
.CancelledError
:
6978 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6980 exc
= "Operation was cancelled"
6981 except Exception as e
:
6982 exc
= traceback
.format_exc()
6983 self
.logger
.critical(
6984 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6988 self
._write
_ns
_status
(
6991 current_operation
="IDLE",
6992 current_operation_id
=None,
6995 stage
[1] = "Waiting for instantiate pending tasks."
6996 self
.logger
.debug(logging_text
+ stage
[1])
6997 exc
= await self
._wait
_for
_tasks
(
7000 self
.timeout
.ns_deploy
,
7008 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7009 nslcmop_operation_state
= "FAILED"
7011 db_nsr_update
["operational-status"] = old_operational_status
7012 db_nsr_update
["config-status"] = old_config_status
7013 db_nsr_update
["detailed-status"] = ""
7015 if "VCA" in scale_process
:
7016 db_nsr_update
["config-status"] = "failed"
7017 if "RO" in scale_process
:
7018 db_nsr_update
["operational-status"] = "failed"
7021 ] = "FAILED scaling nslcmop={} {}: {}".format(
7022 nslcmop_id
, step
, exc
7025 error_description_nslcmop
= None
7026 nslcmop_operation_state
= "COMPLETED"
7027 db_nslcmop_update
["detailed-status"] = "Done"
7029 self
._write
_op
_status
(
7032 error_message
=error_description_nslcmop
,
7033 operation_state
=nslcmop_operation_state
,
7034 other_update
=db_nslcmop_update
,
7037 self
._write
_ns
_status
(
7040 current_operation
="IDLE",
7041 current_operation_id
=None,
7042 other_update
=db_nsr_update
,
7045 if nslcmop_operation_state
:
7049 "nslcmop_id": nslcmop_id
,
7050 "operationState": nslcmop_operation_state
,
7052 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7053 except Exception as e
:
7055 logging_text
+ "kafka_write notification Exception {}".format(e
)
7057 self
.logger
.debug(logging_text
+ "Exit")
7058 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """Apply the KDU part of a scaling operation.

    The entries under ``scaling_info["kdu-create"]`` are processed or, when
    that is empty or missing, the ones under ``scaling_info["kdu-delete"]``.
    For each entry the K8s connector selected by its ``k8s-cluster-type`` is
    asked to scale ``resource-name`` to ``scale`` replicas.

    Entries of type ``"delete"`` run the KDU's ``terminate-config-primitive``
    list *before* scaling; entries of type ``"create"`` run its
    ``initial-config-primitive`` list *after* scaling. In both cases the
    primitives are executed in ascending ``seq`` order and only when
    ``get_juju_ee_ref(db_vnfd, kdu_name)`` is ``None``.

    :param logging_text: prefix prepended to every log line
    :param nsr_id: ``_id`` of the NS record, used to build the db location
        handed to the connector
    :param nsr_deployed: deployment information searched by
        ``get_deployed_kdu``
    :param db_vnfd: VNF descriptor holding the KDU configuration
    :param vca_id: VCA identifier forwarded to the connector calls
    :param scaling_info: dict with ``kdu-create`` / ``kdu-delete`` mappings of
        ``kdu_name`` to a list of per-KDU scaling entries
    :raises LcmException: if a referenced KDU is not found in ``nsr_deployed``
    :raises asyncio.TimeoutError: if a connector call exceeds its timeout
    """
    # Fall back to an empty mapping so a scaling_info without KDU entries is
    # a no-op instead of a TypeError when iterating None.
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name, kdu_scaling_list in _scaling_info.items():
        for kdu_scaling_info in kdu_scaling_list:
            member_vnf_index = kdu_scaling_info["member-vnf-index"]
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, member_vnf_index
            )
            # Same check (and message) that scale() applies before building
            # scaling_info; without it an undeployed KDU fails with an opaque
            # "'NoneType' object is not subscriptable".
            if deployed_kdu is None:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(
                        kdu_name, member_vnf_index
                    )
                )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
            k8s_connector = self.k8scluster_map[k8s_cluster_type]

            # Location of this KDU inside the NS record.
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("terminate-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    # sorted() instead of list.sort(): do not reorder the
                    # descriptor owned by the caller.
                    terminate_config_primitive_list = sorted(
                        kdu_config.get("terminate-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for (
                        terminate_config_primitive
                    ) in terminate_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            terminate_config_primitive, {}, {}
                        )
                        step = "execute terminate config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            k8s_connector.exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=terminate_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                total_timeout=self.timeout.primitive,
                                vca_id=vca_id,
                            ),
                            timeout=self.timeout.primitive
                            * self.timeout.primitive_outer_factor,
                        )

            await asyncio.wait_for(
                k8s_connector.scale(
                    kdu_instance=kdu_instance,
                    scale=scale,
                    resource_name=kdu_scaling_info["resource-name"],
                    total_timeout=self.timeout.scale_on_error,
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout.scale_on_error
                * self.timeout.scale_on_error_outer_factor,
            )

            if kdu_scaling_info["type"] == "create":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                if (
                    kdu_config
                    and kdu_config.get("initial-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    initial_config_primitive_list = sorted(
                        kdu_config.get("initial-config-primitive"),
                        key=lambda val: int(val["seq"]),
                    )

                    for initial_config_primitive in initial_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            initial_config_primitive, {}, {}
                        )
                        step = "execute initial config primitive"
                        self.logger.debug(logging_text + step)
                        # NOTE(review): fixed 600 s limit here, whereas the
                        # terminate branch derives its limit from
                        # self.timeout.primitive — confirm this is intended.
                        await asyncio.wait_for(
                            k8s_connector.exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=initial_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                vca_id=vca_id,
                            ),
                            timeout=600,
                        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """Apply a VDU scaling operation by re-deploying the NS through NG-RO.

    The method gathers the NSD, every VNFR of the NS and the VNFD referenced
    by each VNFR, updates ``db_vnfr`` through ``self.scale_vnfr`` and then
    awaits ``self._instantiate_ng_ro`` with the collected records.

    :param logging_text: prefix text to use at logging
    :param db_nsr: NS record; only ``db_nsr["nsd-id"]`` is read here
    :param db_nslcmop: NS operation record; ``nsInstanceId`` gives the NS id
    :param db_vnfr: VNF record being scaled; it replaces the copy read from
        the database under its ``member-vnf-index-ref``
    :param vdu_scaling_info: dict that may hold "vdu-create" and "vdu-delete"
    :param stage: forwarded untouched to ``_instantiate_ng_ro``
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # "_id" values of the vnfds already loaded. The original check compared
    # the descriptor "id" field against this uuid, so it did not deduplicate
    # on the key used for the query and re-read the vnfd for every vnfr.
    fetched_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id not in fetched_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            fetched_vnfd_ids.add(vnfd_id)

    n2vc_key_list = [self.n2vc.get_public_key()]
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout.ns_deploy,
    )
    # second pass once RO has finished, this time with mark_delete=False
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self,
    ee_id: str,
    artifact_path: str,
    ee_config_descriptor: dict,
    vnfr_id: str,
    nsr_id: str,
    target_ip: str,
    element_type: str,
    vnf_member_index: str = "",
    vdu_id: str = "",
    vdu_index: int = None,
    kdu_name: str = "",
    kdu_index: int = None,
) -> dict:
    """Method to extract prometheus scrape jobs from EE's Prometheus template job file
        This method will wait until the corresponding VDU or KDU is fully instantiated

    Args:
        ee_id (str): Execution Environment ID
        artifact_path (str): Path where the EE's content is (including the Prometheus template file)
        ee_config_descriptor (dict): Execution Environment's configuration descriptor
        vnfr_id (str): VNFR ID where this EE applies
        nsr_id (str): NSR ID where this EE applies
        target_ip (str): VDU/KDU instance IP address
        element_type (str): NS or VNF or VDU or KDU
        vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
        vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
        vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
        kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
        kdu_index (int, optional): KDU index where this EE applies. Defaults to None.

    Raises:
        LcmException: When the VDU or KDU instance was not found in an hour

    Returns:
        The job list produced by ``parse_job`` with "nsr_id" and "vnfr_id"
        added to every job, or None when ``artifact_path`` holds no
        'prometheus*.j2' file.
        NOTE(review): the signature is annotated ``-> dict`` although the
        value comes straight from ``parse_job``; confirm and align.
    """
    # default the vdur and kdur names to an empty string, to avoid any later
    # problem with Prometheus when the element type is not VDU or KDU
    vdur_name = ""
    kdur_name = ""

    # look if exist a file called 'prometheus*.j2' and
    artifact_content = self.fs.dir_ls(artifact_path)
    job_file = next(
        (
            f
            for f in artifact_content
            if f.startswith("prometheus") and f.endswith(".j2")
        ),
        None,
    )
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as f:
        job_data = f.read()

    # obtain the VDUR or KDUR, if the element type is VDU or KDU
    look_for_vdur = bool(vdu_id) and vdu_index is not None
    look_for_kdur = bool(kdu_name) and kdu_index is not None
    # Only poll when there is something to look up: without an id/index pair
    # the loop below could never break and would just sleep for the whole hour.
    if element_type in ("VDU", "KDU") and (look_for_vdur or look_for_kdur):
        # 360 polls, 10 seconds apart: one hour in total
        for _ in range(360):
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
            if look_for_vdur:
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if (
                            x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        )
                    ),
                    {},
                )
                if vdur.get("name"):
                    vdur_name = vdur.get("name")
                    break
            if look_for_kdur:
                kdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "kdur")
                        if (
                            x.get("kdu-name") == kdu_name
                            and x.get("count-index") == kdu_index
                        )
                    ),
                    {},
                )
                if kdur.get("name"):
                    kdur_name = kdur.get("name")
                    break

            # The explicit ``loop`` argument was deprecated in Python 3.8 and
            # removed in 3.10; the running loop is used implicitly.
            await asyncio.sleep(10)
        else:
            if look_for_vdur:
                raise LcmException(
                    f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
                )
            if look_for_kdur:
                raise LcmException(
                    f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
                )

    # TODO get_service
    _, _, service = ee_id.partition(".")  # remove prefix   "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
        "NSR_ID": nsr_id,
        "VNF_MEMBER_INDEX": vnf_member_index,
        "VDUR_NAME": vdur_name,
        "KDUR_NAME": kdur_name,
        "ELEMENT_TYPE": element_type,
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        if (
            not isinstance(job.get("job_name"), str)
            or vnfr_id not in job["job_name"]
        ):
            # random suffix keeps generated names apart from one another
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(
    self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
):
    """Ask RO to run ``operation_type`` on a single VDU instance of a VNF.

    The VDU is picked from the VNFR by ``additional_param["vdu_id"]`` and
    ``additional_param["count-index"]``. The request is sent with
    ``self.RO.operate`` and the method then awaits ``self._wait_ng_ro``.

    :param nsr_id: NS instance id
    :param nslcmop_id: id of the NS operation being executed
    :param vnf_id: ``_id`` of the VNFR that owns the VDU
    :param additional_param: dict holding "vdu_id" and "count-index"
    :param operation_type: operation name; used as the key of the RO payload,
        as the NSR "operational-status" and, upper-cased, as the current
        operation
    :return: ("COMPLETED", "Done") on success, otherwise
        ("FAILED", "Error in operate VNF <reason>")
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        wanted_vdu_id = additional_param["vdu_id"]
        candidates = [
            entry for entry in db_vnfr["vdur"] if entry["vdu-id-ref"] == wanted_vdu_id
        ]
        vdur = find_in_list(
            candidates,
            lambda entry: entry["count-index"] == additional_param["count-index"],
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # first key of vim_info, in iteration order
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))

        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.update_db_2("nsrs", nsr_id, {"operational-status": operation_type})

        # Payload for RO
        operation_payload = {
            "vim_vm_id": vim_vm_id,
            "vnf_id": vnf_id,
            "vdu_index": additional_param["count-index"],
            "vdu_id": vdur["id"],
            "target_vim": target_vim,
            "vim_account_id": vim_account_id,
        }
        desc = {operation_type: operation_payload}
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        await self._wait_ng_ro(
            nsr_id,
            result_dict["action_id"],
            nslcmop_id,
            start_deploy,
            self.timeout.operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        failure = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        failure = "Operation was cancelled"
    except Exception as e:
        failure = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    # every handler above falls through to this single failure result
    return "FAILED", "Error in operate VNF {}".format(failure)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and its credential from the "config" of a VIM account

    The account is fetched with VimAccountDB.get_vim_account_with_id; an
    account without "config" is handled as if its config were empty.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and its credential from the "config" of a VIM account

    The account is fetched with VimAccountDB.get_vim_account_with_id; an
    account without "config" is handled as if its config were empty.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The "operationParams" of the nslcmop are sent as they are to
    self.RO.migrate and the returned action is awaited with _wait_ng_ro.
    Whatever the outcome, the NS goes back to "IDLE", the operation status is
    written, a "migrated" message is published and the task is removed from
    self.lcm_tasks.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    """
    # Try to lock HA task here
    if not self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id):
        return

    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    # NOTE(review): db_nsr_update is filled below but never written anywhere
    # inside this method.
    db_nsr_update = {}
    failure = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        # the RO request body is a copy of the operation parameters
        target = {}
        target.update(db_nslcmop.get("operationParams"))
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        await self._wait_ng_ro(
            nsr_id,
            desc["action_id"],
            nslcmop_id,
            start_deploy,
            self.timeout.migrate,
            operation="migrate",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        failure = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        failure = "Operation was cancelled"
    except Exception as e:
        failure = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if failure:
            nslcmop_operation_state = "FAILED"
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(
                step, failure
            )
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            notification = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            # a failure to notify is only logged, it does not change the result
            try:
                await self.msg.aiowrite(
                    "ns", "migrated", notification, loop=self.loop
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal NS

    Steps, in order: lock the HA task, wait for related operations, mark the
    NS as "HEALING", send the heal order through self.heal_RO and then, for
    every entry of operationParams.healVnfData that carries a
    "vnfInstanceId", schedule execution-environment healing with
    self._heal_n2vc for each target VDU. The pending tasks are awaited in the
    ``finally`` block, where the final operation and NS status are written, a
    "healed" message is published and the task is removed from lcm_tasks.

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # kept so that a failed heal can put the previous values back
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Sending heal order to VIM"
        await self.heal_RO(
            logging_text=logging_text,
            nsr_id=nsr_id,
            db_nslcmop=db_nslcmop,
            stage=stage,
        )
        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        # the returned nsd is not used afterwards; the read is kept as it was
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})  # noqa: F841
        self.fs.sync(db_nsr["nsd-id"])

        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))

        # Check for each target VNF
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if vnfr_id:
                db_vnfr = db_vnfrs[vnfr_id]
                vnfd_id = db_vnfr.get("vnfd-id")
                vnfd_ref = db_vnfr.get("vnfd-ref")
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None
                nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
                member_vnf_index = db_vnfr.get("member-vnf-index-ref")

                # "additionalParams" is optional in the request: read it once
                # and defensively. The run-day1 fallback used to index
                # target_vnf["additionalParams"] directly and raised KeyError
                # when the key was missing.
                additional_params = target_vnf.get("additionalParams") or {}
                vnf_run_day1 = additional_params.get("run-day1")

                # Check each target VDU and deploy N2VC
                target_vdu_list = additional_params.get("vdu", [])
                if not target_vdu_list:
                    # No VDU list in the request: build one that covers every
                    # VDU instance present in the vnfr
                    target_vdu_list = []
                    for existing_vdu in db_vnfr.get("vdur"):
                        vdu_name = existing_vdu.get("vdu-name", None)
                        vdu_index = existing_vdu.get("count-index", 0)
                        vdu_run_day1 = additional_params.get("run-day1", False)
                        vdu_to_be_healed = {
                            "vdu-id": vdu_name,
                            "count-index": vdu_index,
                            "run-day1": vdu_run_day1,
                        }
                        target_vdu_list.append(vdu_to_be_healed)
                for target_vdu in target_vdu_list:
                    # same dict object as the target entry: the run-day1
                    # value set below is written into that entry
                    deploy_params_vdu = target_vdu
                    # Set run-day1 vnf level value if not vdu level value exists
                    if not deploy_params_vdu.get("run-day1") and vnf_run_day1:
                        deploy_params_vdu["run-day1"] = vnf_run_day1
                    vdu_name = target_vdu.get("vdu-id", None)
                    # TODO: Get vdu_id from vdud.
                    vdu_id = vdu_name
                    # For multi instance VDU count-index is mandatory
                    # For single session VDU count-indes is 0
                    vdu_index = target_vdu.get("count-index", 0)

                    # n2vc_redesign STEP 3 to 6 Deploy N2VC
                    stage[1] = "Deploying Execution Environments."
                    self.logger.debug(logging_text + stage[1])

                    # VNF Level charm. Normal case when proxy charms.
                    # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
                    descriptor_config = get_configuration(vnfd, vnfd_ref)
                    if descriptor_config:
                        # Continue if healed machine is management machine
                        vnf_ip_address = db_vnfr.get("ip-address")
                        target_instance = None
                        for instance in db_vnfr.get("vdur", None):
                            if (
                                instance["vdu-name"] == vdu_name
                                and instance["count-index"] == vdu_index
                            ):
                                target_instance = instance
                                break
                        # NOTE(review): target_instance stays None when no
                        # vdur matches, and the next line then raises
                        # AttributeError (reported as a failed heal) - confirm
                        # whether that is the intended outcome.
                        if vnf_ip_address == target_instance.get("ip-address"):
                            self._heal_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_name, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_ref,
                                vdu_id=None,
                                kdu_name=None,
                                member_vnf_index=member_vnf_index,
                                vdu_index=0,
                                vdu_name=None,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )

                    # VDU Level charm. Normal case with native charms.
                    descriptor_config = get_configuration(vnfd, vdu_name)
                    if descriptor_config:
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if tasks_dict_info:
            stage[1] = "Waiting for healing pending tasks."
            self.logger.debug(logging_text + stage[1])
            # the value returned here replaces any error recorded above
            exc = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout.ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc)
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
            except Exception as e:
                # a failure to notify is only logged
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
7856 :param logging_text: preffix text to use at logging
7857 :param nsr_id: nsr identity
7858 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7859 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7860 :return: None or exception
7863 def get_vim_account(vim_account_id
):
7865 if vim_account_id
in db_vims
:
7866 return db_vims
[vim_account_id
]
7867 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7868 db_vims
[vim_account_id
] = db_vim
7873 ns_params
= db_nslcmop
.get("operationParams")
7874 if ns_params
and ns_params
.get("timeout_ns_heal"):
7875 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7877 timeout_ns_heal
= self
.timeout
.ns_heal
7881 nslcmop_id
= db_nslcmop
["_id"]
7883 "action_id": nslcmop_id
,
7885 self
.logger
.warning(
7886 "db_nslcmop={} and timeout_ns_heal={}".format(
7887 db_nslcmop
, timeout_ns_heal
7890 target
.update(db_nslcmop
.get("operationParams", {}))
7892 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7893 desc
= await self
.RO
.recreate(nsr_id
, target
)
7894 self
.logger
.debug("RO return > {}".format(desc
))
7895 action_id
= desc
["action_id"]
7896 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7897 await self
._wait
_ng
_ro
(
7904 operation
="healing",
7909 "_admin.deployed.RO.operational-status": "running",
7910 "detailed-status": " ".join(stage
),
7912 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7913 self
._write
_op
_status
(nslcmop_id
, stage
)
7915 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7918 except Exception as e
:
7919 stage
[2] = "ERROR healing at VIM"
7920 # self.set_vnfr_at_error(db_vnfrs, str(e))
7922 "Error healing at VIM {}".format(e
),
7923 exc_info
=not isinstance(
7926 ROclient
.ROClientException
,
7952 task_instantiation_info
,
7955 # launch instantiate_N2VC in a asyncio task and register task object
7956 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7957 # if not found, create one entry and update database
7958 # fill db_nsr._admin.deployed.VCA.<index>
7961 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7965 get_charm_name
= False
7966 if "execution-environment-list" in descriptor_config
:
7967 ee_list
= descriptor_config
.get("execution-environment-list", [])
7968 elif "juju" in descriptor_config
:
7969 ee_list
= [descriptor_config
] # ns charms
7970 if "execution-environment-list" not in descriptor_config
:
7971 # charm name is only required for ns charms
7972 get_charm_name
= True
7973 else: # other types as script are not supported
7976 for ee_item
in ee_list
:
7979 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7980 ee_item
.get("juju"), ee_item
.get("helm-chart")
7983 ee_descriptor_id
= ee_item
.get("id")
7984 if ee_item
.get("juju"):
7985 vca_name
= ee_item
["juju"].get("charm")
7987 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7990 if ee_item
["juju"].get("charm") is not None
7993 if ee_item
["juju"].get("cloud") == "k8s":
7994 vca_type
= "k8s_proxy_charm"
7995 elif ee_item
["juju"].get("proxy") is False:
7996 vca_type
= "native_charm"
7997 elif ee_item
.get("helm-chart"):
7998 vca_name
= ee_item
["helm-chart"]
7999 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8002 vca_type
= "helm-v3"
8005 logging_text
+ "skipping non juju neither charm configuration"
8010 for vca_index
, vca_deployed
in enumerate(
8011 db_nsr
["_admin"]["deployed"]["VCA"]
8013 if not vca_deployed
:
8016 vca_deployed
.get("member-vnf-index") == member_vnf_index
8017 and vca_deployed
.get("vdu_id") == vdu_id
8018 and vca_deployed
.get("kdu_name") == kdu_name
8019 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8020 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8024 # not found, create one.
8026 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8029 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8031 target
+= "/kdu/{}".format(kdu_name
)
8033 "target_element": target
,
8034 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8035 "member-vnf-index": member_vnf_index
,
8037 "kdu_name": kdu_name
,
8038 "vdu_count_index": vdu_index
,
8039 "operational-status": "init", # TODO revise
8040 "detailed-status": "", # TODO revise
8041 "step": "initial-deploy", # TODO revise
8043 "vdu_name": vdu_name
,
8045 "ee_descriptor_id": ee_descriptor_id
,
8046 "charm_name": charm_name
,
8050 # create VCA and configurationStatus in db
8052 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8053 "configurationStatus.{}".format(vca_index
): dict(),
8055 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8057 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8059 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8060 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8061 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8064 task_n2vc
= asyncio
.ensure_future(
8066 logging_text
=logging_text
,
8067 vca_index
=vca_index
,
8073 vdu_index
=vdu_index
,
8074 deploy_params
=deploy_params
,
8075 config_descriptor
=descriptor_config
,
8076 base_folder
=base_folder
,
8077 nslcmop_id
=nslcmop_id
,
8081 ee_config_descriptor
=ee_item
,
8084 self
.lcm_tasks
.register(
8088 "instantiate_N2VC-{}".format(vca_index
),
8091 task_instantiation_info
[
8093 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8094 member_vnf_index
or "", vdu_id
or ""
8097 async def heal_N2VC(
8114 ee_config_descriptor
,
8116 nsr_id
= db_nsr
["_id"]
8117 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8118 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8119 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8120 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8122 "collection": "nsrs",
8123 "filter": {"_id": nsr_id
},
8124 "path": db_update_entry
,
8129 element_under_configuration
= nsr_id
8133 vnfr_id
= db_vnfr
["_id"]
8134 osm_config
["osm"]["vnf_id"] = vnfr_id
8136 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8138 if vca_type
== "native_charm":
8141 index_number
= vdu_index
or 0
8144 element_type
= "VNF"
8145 element_under_configuration
= vnfr_id
8146 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8148 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8149 element_type
= "VDU"
8150 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8151 osm_config
["osm"]["vdu_id"] = vdu_id
8153 namespace
+= ".{}".format(kdu_name
)
8154 element_type
= "KDU"
8155 element_under_configuration
= kdu_name
8156 osm_config
["osm"]["kdu_name"] = kdu_name
8159 if base_folder
["pkg-dir"]:
8160 artifact_path
= "{}/{}/{}/{}".format(
8161 base_folder
["folder"],
8162 base_folder
["pkg-dir"],
8165 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8170 artifact_path
= "{}/Scripts/{}/{}/".format(
8171 base_folder
["folder"],
8174 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8179 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8181 # get initial_config_primitive_list that applies to this element
8182 initial_config_primitive_list
= config_descriptor
.get(
8183 "initial-config-primitive"
8187 "Initial config primitive list > {}".format(
8188 initial_config_primitive_list
8192 # add config if not present for NS charm
8193 ee_descriptor_id
= ee_config_descriptor
.get("id")
8194 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8195 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8196 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8200 "Initial config primitive list #2 > {}".format(
8201 initial_config_primitive_list
8204 # n2vc_redesign STEP 3.1
8205 # find old ee_id if exists
8206 ee_id
= vca_deployed
.get("ee_id")
8208 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8209 # create or register execution environment in VCA. Only for native charms when healing
8210 if vca_type
== "native_charm":
8211 step
= "Waiting to VM being up and getting IP address"
8212 self
.logger
.debug(logging_text
+ step
)
8213 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8222 credentials
= {"hostname": rw_mgmt_ip
}
8224 username
= deep_get(
8225 config_descriptor
, ("config-access", "ssh-access", "default-user")
8227 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8228 # merged. Meanwhile let's get username from initial-config-primitive
8229 if not username
and initial_config_primitive_list
:
8230 for config_primitive
in initial_config_primitive_list
:
8231 for param
in config_primitive
.get("parameter", ()):
8232 if param
["name"] == "ssh-username":
8233 username
= param
["value"]
8237 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8238 "'config-access.ssh-access.default-user'"
8240 credentials
["username"] = username
8242 # n2vc_redesign STEP 3.2
8243 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8244 self
._write
_configuration
_status
(
8246 vca_index
=vca_index
,
8247 status
="REGISTERING",
8248 element_under_configuration
=element_under_configuration
,
8249 element_type
=element_type
,
8252 step
= "register execution environment {}".format(credentials
)
8253 self
.logger
.debug(logging_text
+ step
)
8254 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8255 credentials
=credentials
,
8256 namespace
=namespace
,
8261 # update ee_id en db
8263 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8265 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8267 # for compatibility with MON/POL modules, the need model and application name at database
8268 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8269 # Not sure if this need to be done when healing
8271 ee_id_parts = ee_id.split(".")
8272 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8273 if len(ee_id_parts) >= 2:
8274 model_name = ee_id_parts[0]
8275 application_name = ee_id_parts[1]
8276 db_nsr_update[db_update_entry + "model"] = model_name
8277 db_nsr_update[db_update_entry + "application"] = application_name
8280 # n2vc_redesign STEP 3.3
8281 # Install configuration software. Only for native charms.
8282 step
= "Install configuration Software"
8284 self
._write
_configuration
_status
(
8286 vca_index
=vca_index
,
8287 status
="INSTALLING SW",
8288 element_under_configuration
=element_under_configuration
,
8289 element_type
=element_type
,
8290 # other_update=db_nsr_update,
8294 # TODO check if already done
8295 self
.logger
.debug(logging_text
+ step
)
8297 if vca_type
== "native_charm":
8298 config_primitive
= next(
8299 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8302 if config_primitive
:
8303 config
= self
._map
_primitive
_params
(
8304 config_primitive
, {}, deploy_params
8306 await self
.vca_map
[vca_type
].install_configuration_sw(
8308 artifact_path
=artifact_path
,
8316 # write in db flag of configuration_sw already installed
8318 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8321 # Not sure if this need to be done when healing
8323 # add relations for this VCA (wait for other peers related with this VCA)
8324 await self._add_vca_relations(
8325 logging_text=logging_text,
8328 vca_index=vca_index,
8332 # if SSH access is required, then get execution environment SSH public
8333 # if native charm we have waited already to VM be UP
8334 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8337 # self.logger.debug("get ssh key block")
8339 config_descriptor
, ("config-access", "ssh-access", "required")
8341 # self.logger.debug("ssh key needed")
8342 # Needed to inject a ssh key
8345 ("config-access", "ssh-access", "default-user"),
8347 step
= "Install configuration Software, getting public ssh key"
8348 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8349 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8352 step
= "Insert public key into VM user={} ssh_key={}".format(
8356 # self.logger.debug("no need to get ssh key")
8357 step
= "Waiting to VM being up and getting IP address"
8358 self
.logger
.debug(logging_text
+ step
)
8360 # n2vc_redesign STEP 5.1
8361 # wait for RO (ip-address) Insert pub_key into VM
8362 # IMPORTANT: We need do wait for RO to complete healing operation.
8363 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout
.ns_heal
)
8366 rw_mgmt_ip
= await self
.wait_kdu_up(
8367 logging_text
, nsr_id
, vnfr_id
, kdu_name
8370 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8380 rw_mgmt_ip
= None # This is for a NS configuration
8382 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8384 # store rw_mgmt_ip in deploy params for later replacement
8385 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8388 # get run-day1 operation parameter
8389 runDay1
= deploy_params
.get("run-day1", False)
8391 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8394 # n2vc_redesign STEP 6 Execute initial config primitive
8395 step
= "execute initial config primitive"
8397 # wait for dependent primitives execution (NS -> VNF -> VDU)
8398 if initial_config_primitive_list
:
8399 await self
._wait
_dependent
_n
2vc
(
8400 nsr_id
, vca_deployed_list
, vca_index
8403 # stage, in function of element type: vdu, kdu, vnf or ns
8404 my_vca
= vca_deployed_list
[vca_index
]
8405 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8407 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8408 elif my_vca
.get("member-vnf-index"):
8410 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8413 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8415 self
._write
_configuration
_status
(
8416 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8419 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8421 check_if_terminated_needed
= True
8422 for initial_config_primitive
in initial_config_primitive_list
:
8423 # adding information on the vca_deployed if it is a NS execution environment
8424 if not vca_deployed
["member-vnf-index"]:
8425 deploy_params
["ns_config_info"] = json
.dumps(
8426 self
._get
_ns
_config
_info
(nsr_id
)
8428 # TODO check if already done
8429 primitive_params_
= self
._map
_primitive
_params
(
8430 initial_config_primitive
, {}, deploy_params
8433 step
= "execute primitive '{}' params '{}'".format(
8434 initial_config_primitive
["name"], primitive_params_
8436 self
.logger
.debug(logging_text
+ step
)
8437 await self
.vca_map
[vca_type
].exec_primitive(
8439 primitive_name
=initial_config_primitive
["name"],
8440 params_dict
=primitive_params_
,
8445 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8446 if check_if_terminated_needed
:
8447 if config_descriptor
.get("terminate-config-primitive"):
8451 {db_update_entry
+ "needed_terminate": True},
8453 check_if_terminated_needed
= False
8455 # TODO register in database that primitive is done
8457 # STEP 7 Configure metrics
8458 # Not sure if this need to be done when healing
8460 if vca_type == "helm" or vca_type == "helm-v3":
8461 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8463 artifact_path=artifact_path,
8464 ee_config_descriptor=ee_config_descriptor,
8467 target_ip=rw_mgmt_ip,
8473 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8476 for job in prometheus_jobs:
8479 {"job_name": job["job_name"]},
8482 fail_on_empty=False,
8486 step
= "instantiated at VCA"
8487 self
.logger
.debug(logging_text
+ step
)
8489 self
._write
_configuration
_status
(
8490 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8493 except Exception as e
: # TODO not use Exception but N2VC exception
8494 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8496 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8499 "Exception while {} : {}".format(step
, e
), exc_info
=True
8501 self
._write
_configuration
_status
(
8502 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8504 raise LcmException("{} {}".format(step
, e
)) from e
8506 async def _wait_heal_ro(
8512 while time() <= start_time
+ timeout
:
8513 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8514 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8515 "operational-status"
8517 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8518 if operational_status_ro
!= "healing":
8520 await asyncio
.sleep(15, loop
=self
.loop
)
8521 else: # timeout_ns_deploy
8522 raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock for the operation, waits for related operations,
    forwards the "operationParams" of the nslcmop record to RO and waits for
    the RO action to finish. Whatever the outcome, the NS is set back to
    "IDLE", the operation state is written and a "verticalscaled" message is
    published.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None. Errors are not raised; they are recorded in the
        "detailed-status" of the nslcmop and the state is set to "FAILED"
    """
    # Another worker already owns this operation: nothing to do here
    if not self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id):
        return

    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")

    nslcmop_changes = {}
    # NOTE(review): this dict is filled on success but this method never
    # writes it to the database - confirm whether that is intended
    db_nsr_update = {}
    failure = None
    # reference time handed over to _wait_ng_ro
    start_deploy = time()

    try:
        # "step" names the phase in progress; it is reported on failure
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        requested_params = nslcmop.get("operationParams")

        # RO receives a copy of the operation parameters
        target = {}
        target.update(requested_params)
        ro_reply = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(ro_reply))
        action_id = ro_reply["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        failure = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        failure = "Operation was cancelled"
    except Exception as e:
        # unexpected error: keep the whole traceback as failure detail
        failure = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )

        if failure:
            nslcmop_changes["detailed-status"] = "FAILED {}: {}".format(
                step, failure
            )
            final_state = "FAILED"
        else:
            final_state = "COMPLETED"
            nslcmop_changes["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=final_state,
            other_update=nslcmop_changes,
        )

        if final_state:
            # best effort notification: a kafka failure is only logged
            try:
                notification = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": final_state,
                }
                await self.msg.aiowrite(
                    "ns", "verticalscaled", notification, loop=self.loop
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")