1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
228 "verticalscale": self
.RO
.status
,
232 def increment_ip_mac(ip_mac
, vm_index
=1):
233 if not isinstance(ip_mac
, str):
236 # try with ipv4 look for last dot
237 i
= ip_mac
.rfind(".")
240 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
241 # try with ipv6 or mac look for last colon. Operate in hex
242 i
= ip_mac
.rfind(":")
245 # format in hex, len can be 2 for mac or 4 for ipv6
246 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
247 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
253 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
255 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
258 # TODO filter RO descriptor fields...
262 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
263 db_dict
["deploymentStatus"] = ro_descriptor
264 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
266 except Exception as e
:
268 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
271 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
273 # remove last dot from path (if exists)
274 if path
.endswith("."):
277 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
278 # .format(table, filter, path, updated_data))
281 nsr_id
= filter.get("_id")
283 # read ns record from database
284 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
285 current_ns_status
= nsr
.get("nsState")
287 # get vca status for NS
288 status_dict
= await self
.n2vc
.get_status(
289 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
294 db_dict
["vcaStatus"] = status_dict
296 # update configurationStatus for this VCA
298 vca_index
= int(path
[path
.rfind(".") + 1 :])
301 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
303 vca_status
= vca_list
[vca_index
].get("status")
305 configuration_status_list
= nsr
.get("configurationStatus")
306 config_status
= configuration_status_list
[vca_index
].get("status")
308 if config_status
== "BROKEN" and vca_status
!= "failed":
309 db_dict
["configurationStatus"][vca_index
] = "READY"
310 elif config_status
!= "BROKEN" and vca_status
== "failed":
311 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
312 except Exception as e
:
313 # not update configurationStatus
314 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
316 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
317 # if nsState = 'DEGRADED' check if all is OK
319 if current_ns_status
in ("READY", "DEGRADED"):
320 error_description
= ""
322 if status_dict
.get("machines"):
323 for machine_id
in status_dict
.get("machines"):
324 machine
= status_dict
.get("machines").get(machine_id
)
325 # check machine agent-status
326 if machine
.get("agent-status"):
327 s
= machine
.get("agent-status").get("status")
330 error_description
+= (
331 "machine {} agent-status={} ; ".format(
335 # check machine instance status
336 if machine
.get("instance-status"):
337 s
= machine
.get("instance-status").get("status")
340 error_description
+= (
341 "machine {} instance-status={} ; ".format(
346 if status_dict
.get("applications"):
347 for app_id
in status_dict
.get("applications"):
348 app
= status_dict
.get("applications").get(app_id
)
349 # check application status
350 if app
.get("status"):
351 s
= app
.get("status").get("status")
354 error_description
+= (
355 "application {} status={} ; ".format(app_id
, s
)
358 if error_description
:
359 db_dict
["errorDescription"] = error_description
360 if current_ns_status
== "READY" and is_degraded
:
361 db_dict
["nsState"] = "DEGRADED"
362 if current_ns_status
== "DEGRADED" and not is_degraded
:
363 db_dict
["nsState"] = "READY"
366 self
.update_db_2("nsrs", nsr_id
, db_dict
)
368 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
370 except Exception as e
:
371 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
373 async def _on_update_k8s_db(
374 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
377 Updating vca status in NSR record
378 :param cluster_uuid: UUID of a k8s cluster
379 :param kdu_instance: The unique name of the KDU instance
380 :param filter: To get nsr_id
381 :cluster_type: The cluster type (juju, k8s)
385 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
386 # .format(cluster_uuid, kdu_instance, filter))
388 nsr_id
= filter.get("_id")
390 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
391 cluster_uuid
=cluster_uuid
,
392 kdu_instance
=kdu_instance
,
394 complete_status
=True,
400 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
403 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
407 self
.update_db_2("nsrs", nsr_id
, db_dict
)
408 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
410 except Exception as e
:
411 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
414 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
416 env
= Environment(undefined
=StrictUndefined
)
417 template
= env
.from_string(cloud_init_text
)
418 return template
.render(additional_params
or {})
419 except UndefinedError
as e
:
421 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
422 "file, must be provided in the instantiation parameters inside the "
423 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
425 except (TemplateError
, TemplateNotFound
) as e
:
427 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
432 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
433 cloud_init_content
= cloud_init_file
= None
435 if vdu
.get("cloud-init-file"):
436 base_folder
= vnfd
["_admin"]["storage"]
437 if base_folder
["pkg-dir"]:
438 cloud_init_file
= "{}/{}/cloud_init/{}".format(
439 base_folder
["folder"],
440 base_folder
["pkg-dir"],
441 vdu
["cloud-init-file"],
444 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
445 base_folder
["folder"],
446 vdu
["cloud-init-file"],
448 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
449 cloud_init_content
= ci_file
.read()
450 elif vdu
.get("cloud-init"):
451 cloud_init_content
= vdu
["cloud-init"]
453 return cloud_init_content
454 except FsException
as e
:
456 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
457 vnfd
["id"], vdu
["id"], cloud_init_file
, e
461 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
463 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
465 additional_params
= vdur
.get("additionalParams")
466 return parse_yaml_strings(additional_params
)
468 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
470 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
471 :param vnfd: input vnfd
472 :param new_id: overrides vnf id if provided
473 :param additionalParams: Instantiation params for VNFs provided
474 :param nsrId: Id of the NSR
475 :return: copy of vnfd
477 vnfd_RO
= deepcopy(vnfd
)
478 # remove unused by RO configuration, monitoring, scaling and internal keys
479 vnfd_RO
.pop("_id", None)
480 vnfd_RO
.pop("_admin", None)
481 vnfd_RO
.pop("monitoring-param", None)
482 vnfd_RO
.pop("scaling-group-descriptor", None)
483 vnfd_RO
.pop("kdu", None)
484 vnfd_RO
.pop("k8s-cluster", None)
486 vnfd_RO
["id"] = new_id
488 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
489 for vdu
in get_iterable(vnfd_RO
, "vdu"):
490 vdu
.pop("cloud-init-file", None)
491 vdu
.pop("cloud-init", None)
495 def ip_profile_2_RO(ip_profile
):
496 RO_ip_profile
= deepcopy(ip_profile
)
497 if "dns-server" in RO_ip_profile
:
498 if isinstance(RO_ip_profile
["dns-server"], list):
499 RO_ip_profile
["dns-address"] = []
500 for ds
in RO_ip_profile
.pop("dns-server"):
501 RO_ip_profile
["dns-address"].append(ds
["address"])
503 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
504 if RO_ip_profile
.get("ip-version") == "ipv4":
505 RO_ip_profile
["ip-version"] = "IPv4"
506 if RO_ip_profile
.get("ip-version") == "ipv6":
507 RO_ip_profile
["ip-version"] = "IPv6"
508 if "dhcp-params" in RO_ip_profile
:
509 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
512 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
513 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
514 if db_vim
["_admin"]["operationalState"] != "ENABLED":
516 "VIM={} is not available. operationalState={}".format(
517 vim_account
, db_vim
["_admin"]["operationalState"]
520 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
523 def get_ro_wim_id_for_wim_account(self
, wim_account
):
524 if isinstance(wim_account
, str):
525 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
526 if db_wim
["_admin"]["operationalState"] != "ENABLED":
528 "WIM={} is not available. operationalState={}".format(
529 wim_account
, db_wim
["_admin"]["operationalState"]
532 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
537 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
539 db_vdu_push_list
= []
541 db_update
= {"_admin.modified": time()}
543 for vdu_id
, vdu_count
in vdu_create
.items():
547 for vdur
in reversed(db_vnfr
["vdur"])
548 if vdur
["vdu-id-ref"] == vdu_id
553 # Read the template saved in the db:
555 "No vdur in the database. Using the vdur-template to scale"
557 vdur_template
= db_vnfr
.get("vdur-template")
558 if not vdur_template
:
560 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
564 vdur
= vdur_template
[0]
565 # Delete a template from the database after using it
568 {"_id": db_vnfr
["_id"]},
570 pull
={"vdur-template": {"_id": vdur
["_id"]}},
572 for count
in range(vdu_count
):
573 vdur_copy
= deepcopy(vdur
)
574 vdur_copy
["status"] = "BUILD"
575 vdur_copy
["status-detailed"] = None
576 vdur_copy
["ip-address"] = None
577 vdur_copy
["_id"] = str(uuid4())
578 vdur_copy
["count-index"] += count
+ 1
579 vdur_copy
["id"] = "{}-{}".format(
580 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
582 vdur_copy
.pop("vim_info", None)
583 for iface
in vdur_copy
["interfaces"]:
584 if iface
.get("fixed-ip"):
585 iface
["ip-address"] = self
.increment_ip_mac(
586 iface
["ip-address"], count
+ 1
589 iface
.pop("ip-address", None)
590 if iface
.get("fixed-mac"):
591 iface
["mac-address"] = self
.increment_ip_mac(
592 iface
["mac-address"], count
+ 1
595 iface
.pop("mac-address", None)
599 ) # only first vdu can be managment of vnf
600 db_vdu_push_list
.append(vdur_copy
)
601 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
603 if len(db_vnfr
["vdur"]) == 1:
604 # The scale will move to 0 instances
606 "Scaling to 0 !, creating the template with the last vdur"
608 template_vdur
= [db_vnfr
["vdur"][0]]
609 for vdu_id
, vdu_count
in vdu_delete
.items():
611 indexes_to_delete
= [
613 for iv
in enumerate(db_vnfr
["vdur"])
614 if iv
[1]["vdu-id-ref"] == vdu_id
618 "vdur.{}.status".format(i
): "DELETING"
619 for i
in indexes_to_delete
[-vdu_count
:]
623 # it must be deleted one by one because common.db does not allow otherwise
626 for v
in reversed(db_vnfr
["vdur"])
627 if v
["vdu-id-ref"] == vdu_id
629 for vdu
in vdus_to_delete
[:vdu_count
]:
632 {"_id": db_vnfr
["_id"]},
634 pull
={"vdur": {"_id": vdu
["_id"]}},
638 db_push
["vdur"] = db_vdu_push_list
640 db_push
["vdur-template"] = template_vdur
643 db_vnfr
["vdur-template"] = template_vdur
644 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
645 # modify passed dictionary db_vnfr
646 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
647 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
649 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
651 Updates database nsr with the RO info for the created vld
652 :param ns_update_nsr: dictionary to be filled with the updated info
653 :param db_nsr: content of db_nsr. This is also modified
654 :param nsr_desc_RO: nsr descriptor from RO
655 :return: Nothing, LcmException is raised on errors
658 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
659 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
660 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
662 vld
["vim-id"] = net_RO
.get("vim_net_id")
663 vld
["name"] = net_RO
.get("vim_name")
664 vld
["status"] = net_RO
.get("status")
665 vld
["status-detailed"] = net_RO
.get("error_msg")
666 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
670 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
673 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
675 for db_vnfr
in db_vnfrs
.values():
676 vnfr_update
= {"status": "ERROR"}
677 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
678 if "status" not in vdur
:
679 vdur
["status"] = "ERROR"
680 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
682 vdur
["status-detailed"] = str(error_text
)
684 "vdur.{}.status-detailed".format(vdu_index
)
686 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
687 except DbException
as e
:
688 self
.logger
.error("Cannot update vnf. {}".format(e
))
690 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
692 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
693 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
694 :param nsr_desc_RO: nsr descriptor from RO
695 :return: Nothing, LcmException is raised on errors
697 for vnf_index
, db_vnfr
in db_vnfrs
.items():
698 for vnf_RO
in nsr_desc_RO
["vnfs"]:
699 if vnf_RO
["member_vnf_index"] != vnf_index
:
702 if vnf_RO
.get("ip_address"):
703 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
706 elif not db_vnfr
.get("ip-address"):
707 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
708 raise LcmExceptionNoMgmtIP(
709 "ns member_vnf_index '{}' has no IP address".format(
714 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
715 vdur_RO_count_index
= 0
716 if vdur
.get("pdu-type"):
718 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
719 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
721 if vdur
["count-index"] != vdur_RO_count_index
:
722 vdur_RO_count_index
+= 1
724 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
725 if vdur_RO
.get("ip_address"):
726 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
728 vdur
["ip-address"] = None
729 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
730 vdur
["name"] = vdur_RO
.get("vim_name")
731 vdur
["status"] = vdur_RO
.get("status")
732 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
733 for ifacer
in get_iterable(vdur
, "interfaces"):
734 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
735 if ifacer
["name"] == interface_RO
.get("internal_name"):
736 ifacer
["ip-address"] = interface_RO
.get(
739 ifacer
["mac-address"] = interface_RO
.get(
745 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
746 "from VIM info".format(
747 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
750 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
754 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
756 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
760 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
761 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
762 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
764 vld
["vim-id"] = net_RO
.get("vim_net_id")
765 vld
["name"] = net_RO
.get("vim_name")
766 vld
["status"] = net_RO
.get("status")
767 vld
["status-detailed"] = net_RO
.get("error_msg")
768 vnfr_update
["vld.{}".format(vld_index
)] = vld
772 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
777 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
782 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
787 def _get_ns_config_info(self
, nsr_id
):
789 Generates a mapping between vnf,vdu elements and the N2VC id
790 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
791 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
792 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
793 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
795 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
796 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
798 ns_config_info
= {"osm-config-mapping": mapping
}
799 for vca
in vca_deployed_list
:
800 if not vca
["member-vnf-index"]:
802 if not vca
["vdu_id"]:
803 mapping
[vca
["member-vnf-index"]] = vca
["application"]
807 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
809 ] = vca
["application"]
810 return ns_config_info
812 async def _instantiate_ng_ro(
829 def get_vim_account(vim_account_id
):
831 if vim_account_id
in db_vims
:
832 return db_vims
[vim_account_id
]
833 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
834 db_vims
[vim_account_id
] = db_vim
837 # modify target_vld info with instantiation parameters
838 def parse_vld_instantiation_params(
839 target_vim
, target_vld
, vld_params
, target_sdn
841 if vld_params
.get("ip-profile"):
842 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
845 if vld_params
.get("provider-network"):
846 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
849 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
850 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
853 if vld_params
.get("wimAccountId"):
854 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
855 target_vld
["vim_info"][target_wim
] = {}
856 for param
in ("vim-network-name", "vim-network-id"):
857 if vld_params
.get(param
):
858 if isinstance(vld_params
[param
], dict):
859 for vim
, vim_net
in vld_params
[param
].items():
860 other_target_vim
= "vim:" + vim
862 target_vld
["vim_info"],
863 (other_target_vim
, param
.replace("-", "_")),
866 else: # isinstance str
867 target_vld
["vim_info"][target_vim
][
868 param
.replace("-", "_")
869 ] = vld_params
[param
]
870 if vld_params
.get("common_id"):
871 target_vld
["common_id"] = vld_params
.get("common_id")
873 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
874 def update_ns_vld_target(target
, ns_params
):
875 for vnf_params
in ns_params
.get("vnf", ()):
876 if vnf_params
.get("vimAccountId"):
880 for vnfr
in db_vnfrs
.values()
881 if vnf_params
["member-vnf-index"]
882 == vnfr
["member-vnf-index-ref"]
886 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
887 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
888 target_vld
= find_in_list(
889 get_iterable(vdur
, "interfaces"),
890 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
893 vld_params
= find_in_list(
894 get_iterable(ns_params
, "vld"),
895 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
899 if vnf_params
.get("vimAccountId") not in a_vld
.get(
902 target_vim_network_list
= [
903 v
for _
, v
in a_vld
.get("vim_info").items()
905 target_vim_network_name
= next(
907 item
.get("vim_network_name", "")
908 for item
in target_vim_network_list
913 target
["ns"]["vld"][a_index
].get("vim_info").update(
915 "vim:{}".format(vnf_params
["vimAccountId"]): {
916 "vim_network_name": target_vim_network_name
,
922 for param
in ("vim-network-name", "vim-network-id"):
923 if vld_params
.get(param
) and isinstance(
924 vld_params
[param
], dict
926 for vim
, vim_net
in vld_params
[
929 other_target_vim
= "vim:" + vim
931 target
["ns"]["vld"][a_index
].get(
936 param
.replace("-", "_"),
941 nslcmop_id
= db_nslcmop
["_id"]
943 "name": db_nsr
["name"],
946 "image": deepcopy(db_nsr
["image"]),
947 "flavor": deepcopy(db_nsr
["flavor"]),
948 "action_id": nslcmop_id
,
949 "cloud_init_content": {},
951 for image
in target
["image"]:
952 image
["vim_info"] = {}
953 for flavor
in target
["flavor"]:
954 flavor
["vim_info"] = {}
955 if db_nsr
.get("affinity-or-anti-affinity-group"):
956 target
["affinity-or-anti-affinity-group"] = deepcopy(
957 db_nsr
["affinity-or-anti-affinity-group"]
959 for affinity_or_anti_affinity_group
in target
[
960 "affinity-or-anti-affinity-group"
962 affinity_or_anti_affinity_group
["vim_info"] = {}
964 if db_nslcmop
.get("lcmOperationType") != "instantiate":
965 # get parameters of instantiation:
966 db_nslcmop_instantiate
= self
.db
.get_list(
969 "nsInstanceId": db_nslcmop
["nsInstanceId"],
970 "lcmOperationType": "instantiate",
973 ns_params
= db_nslcmop_instantiate
.get("operationParams")
975 ns_params
= db_nslcmop
.get("operationParams")
976 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
977 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
980 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
981 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
985 "mgmt-network": vld
.get("mgmt-network", False),
986 "type": vld
.get("type"),
989 "vim_network_name": vld
.get("vim-network-name"),
990 "vim_account_id": ns_params
["vimAccountId"],
994 # check if this network needs SDN assist
995 if vld
.get("pci-interfaces"):
996 db_vim
= get_vim_account(ns_params
["vimAccountId"])
997 sdnc_id
= db_vim
["config"].get("sdn-controller")
999 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1000 target_sdn
= "sdn:{}".format(sdnc_id
)
1001 target_vld
["vim_info"][target_sdn
] = {
1003 "target_vim": target_vim
,
1005 "type": vld
.get("type"),
1008 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1009 for nsd_vnf_profile
in nsd_vnf_profiles
:
1010 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1011 if cp
["virtual-link-profile-id"] == vld
["id"]:
1013 "member_vnf:{}.{}".format(
1014 cp
["constituent-cpd-id"][0][
1015 "constituent-base-element-id"
1017 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1019 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1021 # check at nsd descriptor, if there is an ip-profile
1023 nsd_vlp
= find_in_list(
1024 get_virtual_link_profiles(nsd
),
1025 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1030 and nsd_vlp
.get("virtual-link-protocol-data")
1031 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1033 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1036 ip_profile_dest_data
= {}
1037 if "ip-version" in ip_profile_source_data
:
1038 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1041 if "cidr" in ip_profile_source_data
:
1042 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1045 if "gateway-ip" in ip_profile_source_data
:
1046 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1049 if "dhcp-enabled" in ip_profile_source_data
:
1050 ip_profile_dest_data
["dhcp-params"] = {
1051 "enabled": ip_profile_source_data
["dhcp-enabled"]
1053 vld_params
["ip-profile"] = ip_profile_dest_data
1055 # update vld_params with instantiation params
1056 vld_instantiation_params
= find_in_list(
1057 get_iterable(ns_params
, "vld"),
1058 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1060 if vld_instantiation_params
:
1061 vld_params
.update(vld_instantiation_params
)
1062 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1063 target
["ns"]["vld"].append(target_vld
)
1064 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1065 update_ns_vld_target(target
, ns_params
)
1067 for vnfr
in db_vnfrs
.values():
1068 vnfd
= find_in_list(
1069 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1071 vnf_params
= find_in_list(
1072 get_iterable(ns_params
, "vnf"),
1073 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1075 target_vnf
= deepcopy(vnfr
)
1076 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1077 for vld
in target_vnf
.get("vld", ()):
1078 # check if connected to a ns.vld, to fill target'
1079 vnf_cp
= find_in_list(
1080 vnfd
.get("int-virtual-link-desc", ()),
1081 lambda cpd
: cpd
.get("id") == vld
["id"],
1084 ns_cp
= "member_vnf:{}.{}".format(
1085 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1087 if cp2target
.get(ns_cp
):
1088 vld
["target"] = cp2target
[ns_cp
]
1091 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1093 # check if this network needs SDN assist
1095 if vld
.get("pci-interfaces"):
1096 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1097 sdnc_id
= db_vim
["config"].get("sdn-controller")
1099 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1100 target_sdn
= "sdn:{}".format(sdnc_id
)
1101 vld
["vim_info"][target_sdn
] = {
1103 "target_vim": target_vim
,
1105 "type": vld
.get("type"),
1108 # check at vnfd descriptor, if there is an ip-profile
1110 vnfd_vlp
= find_in_list(
1111 get_virtual_link_profiles(vnfd
),
1112 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1116 and vnfd_vlp
.get("virtual-link-protocol-data")
1117 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1119 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1122 ip_profile_dest_data
= {}
1123 if "ip-version" in ip_profile_source_data
:
1124 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1127 if "cidr" in ip_profile_source_data
:
1128 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1131 if "gateway-ip" in ip_profile_source_data
:
1132 ip_profile_dest_data
[
1134 ] = ip_profile_source_data
["gateway-ip"]
1135 if "dhcp-enabled" in ip_profile_source_data
:
1136 ip_profile_dest_data
["dhcp-params"] = {
1137 "enabled": ip_profile_source_data
["dhcp-enabled"]
1140 vld_params
["ip-profile"] = ip_profile_dest_data
1141 # update vld_params with instantiation params
1143 vld_instantiation_params
= find_in_list(
1144 get_iterable(vnf_params
, "internal-vld"),
1145 lambda i_vld
: i_vld
["name"] == vld
["id"],
1147 if vld_instantiation_params
:
1148 vld_params
.update(vld_instantiation_params
)
1149 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1152 for vdur
in target_vnf
.get("vdur", ()):
1153 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1154 continue # This vdu must not be created
1155 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1157 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1160 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1161 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1164 and vdu_configuration
.get("config-access")
1165 and vdu_configuration
.get("config-access").get("ssh-access")
1167 vdur
["ssh-keys"] = ssh_keys_all
1168 vdur
["ssh-access-required"] = vdu_configuration
[
1170 ]["ssh-access"]["required"]
1173 and vnf_configuration
.get("config-access")
1174 and vnf_configuration
.get("config-access").get("ssh-access")
1175 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1177 vdur
["ssh-keys"] = ssh_keys_all
1178 vdur
["ssh-access-required"] = vnf_configuration
[
1180 ]["ssh-access"]["required"]
1181 elif ssh_keys_instantiation
and find_in_list(
1182 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1184 vdur
["ssh-keys"] = ssh_keys_instantiation
1186 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1188 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1190 if vdud
.get("cloud-init-file"):
1191 vdur
["cloud-init"] = "{}:file:{}".format(
1192 vnfd
["_id"], vdud
.get("cloud-init-file")
1194 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1195 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1196 base_folder
= vnfd
["_admin"]["storage"]
1197 if base_folder
["pkg-dir"]:
1198 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1199 base_folder
["folder"],
1200 base_folder
["pkg-dir"],
1201 vdud
.get("cloud-init-file"),
1204 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1205 base_folder
["folder"],
1206 vdud
.get("cloud-init-file"),
1208 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1209 target
["cloud_init_content"][
1212 elif vdud
.get("cloud-init"):
1213 vdur
["cloud-init"] = "{}:vdu:{}".format(
1214 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1216 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1217 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1220 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1221 deploy_params_vdu
= self
._format
_additional
_params
(
1222 vdur
.get("additionalParams") or {}
1224 deploy_params_vdu
["OSM"] = get_osm_params(
1225 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1227 vdur
["additionalParams"] = deploy_params_vdu
1230 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1231 if target_vim
not in ns_flavor
["vim_info"]:
1232 ns_flavor
["vim_info"][target_vim
] = {}
1235 # in case alternative images are provided we must check if they should be applied
1236 # for the vim_type, modify the vim_type taking into account
1237 ns_image_id
= int(vdur
["ns-image-id"])
1238 if vdur
.get("alt-image-ids"):
1239 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1240 vim_type
= db_vim
["vim_type"]
1241 for alt_image_id
in vdur
.get("alt-image-ids"):
1242 ns_alt_image
= target
["image"][int(alt_image_id
)]
1243 if vim_type
== ns_alt_image
.get("vim-type"):
1244 # must use alternative image
1246 "use alternative image id: {}".format(alt_image_id
)
1248 ns_image_id
= alt_image_id
1249 vdur
["ns-image-id"] = ns_image_id
1251 ns_image
= target
["image"][int(ns_image_id
)]
1252 if target_vim
not in ns_image
["vim_info"]:
1253 ns_image
["vim_info"][target_vim
] = {}
1256 if vdur
.get("affinity-or-anti-affinity-group-id"):
1257 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1258 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1259 if target_vim
not in ns_ags
["vim_info"]:
1260 ns_ags
["vim_info"][target_vim
] = {}
1262 vdur
["vim_info"] = {target_vim
: {}}
1263 # instantiation parameters
1265 vdu_instantiation_params
= find_in_list(
1266 get_iterable(vnf_params
, "vdu"),
1267 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1269 if vdu_instantiation_params
:
1270 # Parse the vdu_volumes from the instantiation params
1271 vdu_volumes
= get_volumes_from_instantiation_params(
1272 vdu_instantiation_params
, vdud
1274 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1275 vdur_list
.append(vdur
)
1276 target_vnf
["vdur"] = vdur_list
1277 target
["vnf"].append(target_vnf
)
1279 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1280 desc
= await self
.RO
.deploy(nsr_id
, target
)
1281 self
.logger
.debug("RO return > {}".format(desc
))
1282 action_id
= desc
["action_id"]
1283 await self
._wait
_ng
_ro
(
1284 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1285 operation
="instantiation"
1290 "_admin.deployed.RO.operational-status": "running",
1291 "detailed-status": " ".join(stage
),
1293 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1294 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1295 self
._write
_op
_status
(nslcmop_id
, stage
)
1297 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
    operation=None,
):
    """Poll NG-RO until the given action finishes or the timeout expires.

    NOTE(review): reconstructed from a corrupted extraction of this file;
    the parameter list is inferred from the two visible call sites
    (positional nsr_id/action_id/nslcmop_id/start_time/timeout/stage and
    keyword timeout=/operation=) — verify against repository history.

    :param nsr_id: ns record id, passed to the RO status query
    :param action_id: RO action to wait for
    :param nslcmop_id: if provided together with stage, progress is written to db
    :param start_time: epoch seconds marking the start of the wait; defaults to now
    :param timeout: max seconds to wait before raising NgRoException
    :param stage: 3-item list; stage[2] is updated with VIM progress
    :param operation: key into self.op_status_map ("instantiation"/"termination"/...)
    :raises NgRoException: on RO-reported FAILED status or on timeout
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.op_status_map[operation](nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            assert False, "ROclient.check_ns_status returns unknown {}".format(
                desc_status["status"]
            )
        # write progress to database only when it actually changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # NOTE: the explicit loop= argument is deprecated since Python 3.8
        # and removed in 3.10; kept for consistency with the rest of the file
        await asyncio.sleep(15, loop=self.loop)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """Request NS termination at NG-RO and wait until it completes.

    Sends an empty deployment target (which NG-RO interprets as "delete
    everything"), waits for the resulting action, then deletes the ns at RO.
    Errors are accumulated in failed_detail and raised at the end so the
    database status is always written first.

    NOTE(review): reconstructed from a corrupted extraction; dropped lines
    (try/else scaffolding) inferred — verify against repository history.

    :raises LcmException: if any non-404 deletion error happened
    """
    db_nsr_update = {}
    failed_detail = []
    action_id = None
    start_deploy = time()
    try:
        target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        desc = await self.RO.deploy(nsr_id, target)
        action_id = desc["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )

        # wait until NS is deleted from VIM
        delete_timeout = 20 * 60  # 20 minutes
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            delete_timeout,
            stage,
            operation="termination",
        )

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr record at RO
        await self.RO.delete(nsr_id)
    except Exception as e:
        if isinstance(e, NgRoException) and e.http_code == 404:  # not found
            # already gone at RO: treat as success and clear RO references
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text
                + "RO_action_id={} already deleted".format(action_id)
            )
        elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, e)
            )

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """Resolve placement and instantiate the NS at NG-RO.

    :param logging_text: preffix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception

    NOTE(review): reconstructed from a corrupted extraction — verify
    against repository history.
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization.
        # Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # BUGFIX: original code used '==' here, a no-op comparison;
                # the intent is to propagate the vim account of the last vnfr
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            # only dump a traceback for unexpected exception types
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id:
    :param vnfr_id:
    :param kdu_name:
    :return: IP address, K8s services

    NOTE(review): reconstructed from a corrupted extraction; the retry
    counter and 360-iteration cap (~1 hour at 10 s per try) are inferred
    from the visible "while nb_tries < 360" fragment — verify.
    """
    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        # re-read the vnfr on every iteration: another worker updates it
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # NOTE: loop= is deprecated (removed in py3.10); kept for file consistency
        await asyncio.sleep(10, loop=self.loop)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip addres at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id:
    :param vnfr_id:
    :param vdu_id:
    :param vdu_index:
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address

    NOTE(review): reconstructed from a corrupted extraction; several
    control-flow lines (branch keywords, break/continue) were dropped from
    the source text and are inferred — verify against repository history.
    """
    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")

    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:
        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # NOTE: loop= is deprecated (removed in py3.10); kept for file consistency
        await asyncio.sleep(10, loop=self.loop)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ng_ro:
                    # NG-RO path: injection is done via a deploy action
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(
                        nsr_id, action_id, timeout=600, operation="instantiation"
                    )
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                )
            except ROclient.ROClientException as e:
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

    NOTE(review): reconstructed from a corrupted extraction; the timeout
    counter value and decrement step were dropped from the source and are
    inferred — verify against repository history.
    """
    my_vca = vca_deployed_list[vca_index]
    if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return
    timeout = 300
    while timeout >= 0:
        # re-read from database: dependent charms are deployed by other tasks
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        configuration_status_list = db_nsr["configurationStatus"]
        for index, vca_deployed in enumerate(configuration_status_list):
            if index == vca_index:
                # myself
                continue
            if not my_vca.get("member-vnf-index") or (
                vca_deployed.get("member-vnf-index")
                == my_vca.get("member-vnf-index")
            ):
                internal_status = configuration_status_list[index].get("status")
                if internal_status == "READY":
                    continue
                elif internal_status == "BROKEN":
                    raise LcmException(
                        "Configuration aborted because dependent charm/s has failed"
                    )
                else:
                    break
        else:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        timeout -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """Return the VCA id for this record.

    Preference order: the vnfr's own "vca-id"; otherwise the "vca" field of
    the VIM account referenced by the nsr's instantiate_params.

    :param db_vnfr: database content of the vnf record (may lack "vca-id")
    :param db_nsr: database content of the ns record
    :return: vca id string or None

    NOTE(review): reconstructed from a corrupted extraction; the guard
    condition between the two lookups was dropped and is inferred — verify.
    """
    vca_id = deep_get(db_vnfr, ("vca-id",))
    if not vca_id and db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
    return vca_id
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """Create/register an execution environment at VCA, install the charm
    software, inject ssh keys where required and run the Day-1 primitives.

    NOTE(review): reconstructed from a corrupted extraction of this file;
    many scaffolding lines (branch keywords, closing brackets, keyword
    arguments without identifiers) were dropped from the source text and
    are inferred — verify the whole body against repository history.

    :raises LcmException: wrapping any failure, after writing BROKEN status
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    step = ""
    try:
        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path: charm-based VCAs live under "charms",
        # helm-based ones under "helm-charts"
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    chart_model=vca_name,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # default rw_mgmt_ip to None, avoiding the non definition of the variable
            rw_mgmt_ip = None

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip, services = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                    vnfd = self.db.get_one(
                        "vnfds_revisions",
                        {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                    )
                    kdu = get_kdu(vnfd, kdu_name)
                    kdu_services = [
                        service["name"] for service in get_kdu_services(kdu)
                    ]
                    exposed_services = []
                    for service in services:
                        if any(s in service["name"] for s in kdu_services):
                            exposed_services.append(service)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name="config",
                        params_dict={
                            "osm-config": json.dumps(
                                OsmConfigBuilder(
                                    k8s={"services": exposed_services}
                                ).build()
                            )
                        },
                        vca_id=vca_id,
                    )

                # This verification is needed in order to avoid trying to add a public key
                # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                # for a KNF and not for its KDUs, the previous verification gives False, and the code
                # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                # or it is a KNF)
                elif db_vnfr.get("vdur"):
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )

        self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id:
    :param ns_state:
    :param current_operation:
    :param current_operation_id:
    :param error_description:
    :param error_detail:
    :param other_update: Other required changes at database if provided, will be cleared
    :return:

    NOTE(review): reconstructed from a corrupted extraction — verify.
    """
    try:
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # best-effort: a failed status write must not break the operation
        self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """Update the nslcmops record with progress/result information.

    :param op_id: nslcmop database id
    :param stage: either a 3-item list (joined into detailed-status) or a string
    :param error_message: set "errorMessage" when not None
    :param queuePosition: always written
    :param operation_state: set "operationState" (and the entered time) when not None
    :param other_update: extra db fields to write in the same update

    NOTE(review): reconstructed from a corrupted extraction; op_id/stage
    parameters inferred from call sites — verify.
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # best-effort: a failed status write must not break the operation
        self.logger.warn(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """Set the same status on every entry of the nsr's configurationStatus.

    NOTE(review): reconstructed from a corrupted extraction — verify.
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # best-effort: a failed status write must not break the operation
        self.logger.warn(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """Update one configurationStatus entry (indexed by vca_index) at the nsr.

    Only the provided fields are written; other_update fields are merged
    into the same database update.

    NOTE(review): reconstructed from a corrupted extraction;
    nsr_id/vca_index/status parameters inferred from call sites — verify.
    """
    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # best-effort: a failed status write must not break the operation
        self.logger.warn(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'

    NOTE(review): reconstructed from a corrupted extraction — verify.
    """
    modified = False
    nslcmop_id = db_nslcmop["_id"]
    placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if placement_engine == "PLA":
        self.logger.debug(
            logging_text + "Invoke and wait for placement optimization"
        )
        await self.msg.aiowrite(
            "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
        )
        db_poll_interval = 5
        wait = db_poll_interval * 10
        pla_result = None
        # poll the database: the PLA answer may be written by another LCM worker
        while not pla_result and wait >= 0:
            await asyncio.sleep(db_poll_interval)
            wait -= db_poll_interval
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

        if not pla_result:
            raise LcmException(
                "Placement timeout for nslcmopId={}".format(nslcmop_id)
            )

        for pla_vnf in pla_result["vnf"]:
            vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
            if not pla_vnf.get("vimAccountId") or not vnfr:
                continue
            modified = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": pla_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs so the caller sees the new placement too
            vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
    return modified
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result received from PLA into the nslcmop record.

    :param params: kafka message content; the placement result is expected
        at params["placement"], with the target operation id at
        params["placement"]["nslcmopId"].
    """
    # Fix: define nslcmop_id before the try block so the except-handler log
    # cannot raise NameError when deep_get itself fails.
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2390 async def instantiate(self
, nsr_id
, nslcmop_id
):
2393 :param nsr_id: ns instance to deploy
2394 :param nslcmop_id: operation to run
2398 # Try to lock HA task here
2399 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2400 if not task_is_locked_by_me
:
2402 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2406 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2407 self
.logger
.debug(logging_text
+ "Enter")
2409 # get all needed from database
2411 # database nsrs record
2414 # database nslcmops record
2417 # update operation on nsrs
2419 # update operation on nslcmops
2420 db_nslcmop_update
= {}
2422 nslcmop_operation_state
= None
2423 db_vnfrs
= {} # vnf's info indexed by member-index
2425 tasks_dict_info
= {} # from task to info text
2429 "Stage 1/5: preparation of the environment.",
2430 "Waiting for previous operations to terminate.",
2433 # ^ stage, step, VIM progress
2435 # wait for any previous tasks in process
2436 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2438 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2439 stage
[1] = "Reading from database."
2440 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2441 db_nsr_update
["detailed-status"] = "creating"
2442 db_nsr_update
["operational-status"] = "init"
2443 self
._write
_ns
_status
(
2445 ns_state
="BUILDING",
2446 current_operation
="INSTANTIATING",
2447 current_operation_id
=nslcmop_id
,
2448 other_update
=db_nsr_update
,
2450 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2452 # read from db: operation
2453 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2454 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2455 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2456 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2457 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2459 ns_params
= db_nslcmop
.get("operationParams")
2460 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2461 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2463 timeout_ns_deploy
= self
.timeout
.get(
2464 "ns_deploy", self
.timeout_ns_deploy
2468 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2469 self
.logger
.debug(logging_text
+ stage
[1])
2470 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2471 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2472 self
.logger
.debug(logging_text
+ stage
[1])
2473 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2474 self
.fs
.sync(db_nsr
["nsd-id"])
2476 # nsr_name = db_nsr["name"] # TODO short-name??
2478 # read from db: vnf's of this ns
2479 stage
[1] = "Getting vnfrs from db."
2480 self
.logger
.debug(logging_text
+ stage
[1])
2481 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2483 # read from db: vnfd's for every vnf
2484 db_vnfds
= [] # every vnfd data
2486 # for each vnf in ns, read vnfd
2487 for vnfr
in db_vnfrs_list
:
2488 if vnfr
.get("kdur"):
2490 for kdur
in vnfr
["kdur"]:
2491 if kdur
.get("additionalParams"):
2492 kdur
["additionalParams"] = json
.loads(
2493 kdur
["additionalParams"]
2495 kdur_list
.append(kdur
)
2496 vnfr
["kdur"] = kdur_list
2498 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2499 vnfd_id
= vnfr
["vnfd-id"]
2500 vnfd_ref
= vnfr
["vnfd-ref"]
2501 self
.fs
.sync(vnfd_id
)
2503 # if we haven't this vnfd, read it from db
2504 if vnfd_id
not in db_vnfds
:
2506 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2509 self
.logger
.debug(logging_text
+ stage
[1])
2510 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2513 db_vnfds
.append(vnfd
)
2515 # Get or generates the _admin.deployed.VCA list
2516 vca_deployed_list
= None
2517 if db_nsr
["_admin"].get("deployed"):
2518 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2519 if vca_deployed_list
is None:
2520 vca_deployed_list
= []
2521 configuration_status_list
= []
2522 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2523 db_nsr_update
["configurationStatus"] = configuration_status_list
2524 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2525 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2526 elif isinstance(vca_deployed_list
, dict):
2527 # maintain backward compatibility. Change a dict to list at database
2528 vca_deployed_list
= list(vca_deployed_list
.values())
2529 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2530 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2533 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2535 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2536 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2538 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2539 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2540 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2542 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2545 # n2vc_redesign STEP 2 Deploy Network Scenario
2546 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2547 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2549 stage
[1] = "Deploying KDUs."
2550 # self.logger.debug(logging_text + "Before deploy_kdus")
2551 # Call to deploy_kdus in case exists the "vdu:kdu" param
2552 await self
.deploy_kdus(
2553 logging_text
=logging_text
,
2555 nslcmop_id
=nslcmop_id
,
2558 task_instantiation_info
=tasks_dict_info
,
2561 stage
[1] = "Getting VCA public key."
2562 # n2vc_redesign STEP 1 Get VCA public ssh-key
2563 # feature 1429. Add n2vc public key to needed VMs
2564 n2vc_key
= self
.n2vc
.get_public_key()
2565 n2vc_key_list
= [n2vc_key
]
2566 if self
.vca_config
.get("public_key"):
2567 n2vc_key_list
.append(self
.vca_config
["public_key"])
2569 stage
[1] = "Deploying NS at VIM."
2570 task_ro
= asyncio
.ensure_future(
2571 self
.instantiate_RO(
2572 logging_text
=logging_text
,
2576 db_nslcmop
=db_nslcmop
,
2579 n2vc_key_list
=n2vc_key_list
,
2583 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2584 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2586 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2587 stage
[1] = "Deploying Execution Environments."
2588 self
.logger
.debug(logging_text
+ stage
[1])
2590 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2591 for vnf_profile
in get_vnf_profiles(nsd
):
2592 vnfd_id
= vnf_profile
["vnfd-id"]
2593 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2594 member_vnf_index
= str(vnf_profile
["id"])
2595 db_vnfr
= db_vnfrs
[member_vnf_index
]
2596 base_folder
= vnfd
["_admin"]["storage"]
2602 # Get additional parameters
2603 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2604 if db_vnfr
.get("additionalParamsForVnf"):
2605 deploy_params
.update(
2606 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2609 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2610 if descriptor_config
:
2612 logging_text
=logging_text
2613 + "member_vnf_index={} ".format(member_vnf_index
),
2616 nslcmop_id
=nslcmop_id
,
2622 member_vnf_index
=member_vnf_index
,
2623 vdu_index
=vdu_index
,
2625 deploy_params
=deploy_params
,
2626 descriptor_config
=descriptor_config
,
2627 base_folder
=base_folder
,
2628 task_instantiation_info
=tasks_dict_info
,
2632 # Deploy charms for each VDU that supports one.
2633 for vdud
in get_vdu_list(vnfd
):
2635 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2636 vdur
= find_in_list(
2637 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2640 if vdur
.get("additionalParams"):
2641 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2643 deploy_params_vdu
= deploy_params
2644 deploy_params_vdu
["OSM"] = get_osm_params(
2645 db_vnfr
, vdu_id
, vdu_count_index
=0
2647 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2649 self
.logger
.debug("VDUD > {}".format(vdud
))
2651 "Descriptor config > {}".format(descriptor_config
)
2653 if descriptor_config
:
2656 for vdu_index
in range(vdud_count
):
2657 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2659 logging_text
=logging_text
2660 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2661 member_vnf_index
, vdu_id
, vdu_index
2665 nslcmop_id
=nslcmop_id
,
2671 member_vnf_index
=member_vnf_index
,
2672 vdu_index
=vdu_index
,
2674 deploy_params
=deploy_params_vdu
,
2675 descriptor_config
=descriptor_config
,
2676 base_folder
=base_folder
,
2677 task_instantiation_info
=tasks_dict_info
,
2680 for kdud
in get_kdu_list(vnfd
):
2681 kdu_name
= kdud
["name"]
2682 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2683 if descriptor_config
:
2688 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2690 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2691 if kdur
.get("additionalParams"):
2692 deploy_params_kdu
.update(
2693 parse_yaml_strings(kdur
["additionalParams"].copy())
2697 logging_text
=logging_text
,
2700 nslcmop_id
=nslcmop_id
,
2706 member_vnf_index
=member_vnf_index
,
2707 vdu_index
=vdu_index
,
2709 deploy_params
=deploy_params_kdu
,
2710 descriptor_config
=descriptor_config
,
2711 base_folder
=base_folder
,
2712 task_instantiation_info
=tasks_dict_info
,
2716 # Check if this NS has a charm configuration
2717 descriptor_config
= nsd
.get("ns-configuration")
2718 if descriptor_config
and descriptor_config
.get("juju"):
2721 member_vnf_index
= None
2727 # Get additional parameters
2728 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2729 if db_nsr
.get("additionalParamsForNs"):
2730 deploy_params
.update(
2731 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2733 base_folder
= nsd
["_admin"]["storage"]
2735 logging_text
=logging_text
,
2738 nslcmop_id
=nslcmop_id
,
2744 member_vnf_index
=member_vnf_index
,
2745 vdu_index
=vdu_index
,
2747 deploy_params
=deploy_params
,
2748 descriptor_config
=descriptor_config
,
2749 base_folder
=base_folder
,
2750 task_instantiation_info
=tasks_dict_info
,
2754 # rest of staff will be done at finally
2757 ROclient
.ROClientException
,
2763 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2766 except asyncio
.CancelledError
:
2768 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2770 exc
= "Operation was cancelled"
2771 except Exception as e
:
2772 exc
= traceback
.format_exc()
2773 self
.logger
.critical(
2774 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2779 error_list
.append(str(exc
))
2781 # wait for pending tasks
2783 stage
[1] = "Waiting for instantiate pending tasks."
2784 self
.logger
.debug(logging_text
+ stage
[1])
2785 error_list
+= await self
._wait
_for
_tasks
(
2793 stage
[1] = stage
[2] = ""
2794 except asyncio
.CancelledError
:
2795 error_list
.append("Cancelled")
2796 # TODO cancel all tasks
2797 except Exception as exc
:
2798 error_list
.append(str(exc
))
2800 # update operation-status
2801 db_nsr_update
["operational-status"] = "running"
2802 # let's begin with VCA 'configured' status (later we can change it)
2803 db_nsr_update
["config-status"] = "configured"
2804 for task
, task_name
in tasks_dict_info
.items():
2805 if not task
.done() or task
.cancelled() or task
.exception():
2806 if task_name
.startswith(self
.task_name_deploy_vca
):
2807 # A N2VC task is pending
2808 db_nsr_update
["config-status"] = "failed"
2810 # RO or KDU task is pending
2811 db_nsr_update
["operational-status"] = "failed"
2813 # update status at database
2815 error_detail
= ". ".join(error_list
)
2816 self
.logger
.error(logging_text
+ error_detail
)
2817 error_description_nslcmop
= "{} Detail: {}".format(
2818 stage
[0], error_detail
2820 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2821 nslcmop_id
, stage
[0]
2824 db_nsr_update
["detailed-status"] = (
2825 error_description_nsr
+ " Detail: " + error_detail
2827 db_nslcmop_update
["detailed-status"] = error_detail
2828 nslcmop_operation_state
= "FAILED"
2832 error_description_nsr
= error_description_nslcmop
= None
2834 db_nsr_update
["detailed-status"] = "Done"
2835 db_nslcmop_update
["detailed-status"] = "Done"
2836 nslcmop_operation_state
= "COMPLETED"
2839 self
._write
_ns
_status
(
2842 current_operation
="IDLE",
2843 current_operation_id
=None,
2844 error_description
=error_description_nsr
,
2845 error_detail
=error_detail
,
2846 other_update
=db_nsr_update
,
2848 self
._write
_op
_status
(
2851 error_message
=error_description_nslcmop
,
2852 operation_state
=nslcmop_operation_state
,
2853 other_update
=db_nslcmop_update
,
2856 if nslcmop_operation_state
:
2858 await self
.msg
.aiowrite(
2863 "nslcmop_id": nslcmop_id
,
2864 "operationState": nslcmop_operation_state
,
2868 except Exception as e
:
2870 logging_text
+ "kafka_write notification Exception {}".format(e
)
2873 self
.logger
.debug(logging_text
+ "Exit")
2874 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2876 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2877 if vnfd_id
not in cached_vnfds
:
2878 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2879 return cached_vnfds
[vnfd_id
]
2881 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2882 if vnf_profile_id
not in cached_vnfrs
:
2883 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2886 "member-vnf-index-ref": vnf_profile_id
,
2887 "nsr-id-ref": nsr_id
,
2890 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """Return True if *vca* is one of the two endpoints of *relation*.

    KDU endpoints (those with a kdu-resource-profile-id) are skipped: a VCA
    can only match charm-based endpoints.
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    found = False
    for endpoint in (relation.provider, relation.requirer):
        if endpoint["kdu-resource-profile-id"]:
            continue
        found = (
            vca.vnf_profile_id == endpoint.vnf_profile_id
            and vca.vdu_profile_id == endpoint.vdu_profile_id
            and vca.execution_environment_ref == endpoint.execution_environment_ref
        )
        if found:
            break
    return found
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """Fill in the implicit execution-environment reference of a relation endpoint.

    For VNF- and VDU-level endpoints that do not carry an explicit
    "execution-environment-ref", the juju EE is looked up in the VNFD and
    the reference is written back into *ee_relation_data*.

    :raises Exception: if no execution environment is found for the endpoint
    :return: the (possibly updated) ee_relation_data dict
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    ee_relation_level = EELevel.get_level(ee_relation_data)
    if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
        "execution-environment-ref"
    ]:
        vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
        vnfd_id = vnf_profile["vnfd-id"]
        db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
        # VNF-level endpoints resolve against the VNFD itself; VDU-level
        # endpoints resolve against the vdu profile.
        entity_id = (
            vnfd_id
            if ee_relation_level == EELevel.VNF
            else ee_relation_data["vdu-profile-id"]
        )
        ee = get_juju_ee_ref(db_vnfd, entity_id)
        if not ee:
            raise Exception(
                f"not execution environments found for ee_relation {ee_relation_data}"
            )
        ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the NS-level relations (from nsd ns-configuration) that involve *vca*.

    Each relation may be expressed either as explicit provider/requirer
    dicts or as a two-entry "entities" list; both forms are normalized,
    implicit EE references are resolved, and only relations in which *vca*
    is an endpoint are returned.

    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    relations = []
    db_ns_relations = get_ns_configuration_relation_list(nsd)
    for r in db_ns_relations:
        provider_dict = None
        requirer_dict = None
        if all(key in r for key in ("provider", "requirer")):
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            provider_id = r["entities"][0]["id"]
            provider_dict = {
                "nsr-id": nsr_id,
                "endpoint": r["entities"][0]["endpoint"],
            }
            # An entity id equal to the NSD id means an NS-level endpoint;
            # anything else is a vnf-profile endpoint.
            if provider_id != nsd["id"]:
                provider_dict["vnf-profile-id"] = provider_id
            requirer_id = r["entities"][1]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "endpoint": r["entities"][1]["endpoint"],
            }
            if requirer_id != nsd["id"]:
                requirer_dict["vnf-profile-id"] = requirer_id
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(r["name"], provider, requirer)
        vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
        if vca_in_relation:
            relations.append(relation)
    return relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the VNF-level relations (from the VNFD of *vca*'s vnf profile) involving *vca*.

    Mirrors :meth:`_get_ns_relations` but resolves entity ids against the
    VNFD: an entity id equal to the vnfd id is a VNF-level endpoint,
    anything else is a vdu-profile endpoint.

    :raises Exception: if a relation has neither provider/requirer nor entities
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    relations = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
    for r in db_vnf_relations:
        provider_dict = None
        requirer_dict = None
        if all(key in r for key in ("provider", "requirer")):
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            provider_id = r["entities"][0]["id"]
            provider_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][0]["endpoint"],
            }
            if provider_id != vnfd_id:
                provider_dict["vdu-profile-id"] = provider_id
            requirer_id = r["entities"][1]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][1]["endpoint"],
            }
            if requirer_id != vnfd_id:
                requirer_dict["vdu-profile-id"] = requirer_id
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(r["name"], provider, requirer)
        vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
        if vca_in_relation:
            relations.append(relation)
    return relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """Return the deployed KDU data for a KDU-level relation endpoint.

    Resolves the VNFD of the endpoint's vnf profile, looks up its
    kdu-resource-profile, finds the matching deployed KDU in the NS record
    and annotates it with the k8s resource name.
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", ()).get("deployed", ()),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """Return the deployed component (VCA or K8s resource) backing a relation endpoint.

    Dispatches on the endpoint level (NS / VNF / VDU / KDU) and builds the
    matching filter for the deployed VCA list, or resolves the KDU resource.
    Returns None when nothing deployed matches.
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    nsr_id = db_nsr["_id"]
    deployed_component = None
    ee_level = EELevel.get_level(ee_relation)
    if ee_level == EELevel.NS:
        vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.VNF:
        vca = get_deployed_vca(
            db_nsr,
            {
                "vdu_id": None,
                "member-vnf-index": ee_relation.vnf_profile_id,
                "ee_descriptor_id": ee_relation.execution_environment_ref,
            },
        )
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.VDU:
        vca = get_deployed_vca(
            db_nsr,
            {
                "vdu_id": ee_relation.vdu_profile_id,
                "member-vnf-index": ee_relation.vnf_profile_id,
                "ee_descriptor_id": ee_relation.execution_environment_ref,
            },
        )
        if vca:
            deployed_component = DeployedVCA(nsr_id, vca)
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            deployed_component = DeployedK8sResource(kdu_resource_data)
    return deployed_component
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """Try to add one juju relation between its two deployed endpoints.

    The relation can only be added once both endpoints are deployed and
    have their configuration software installed; otherwise False is
    returned so the caller retries later.

    :return: True if the relation was added (caller removes it from its
        pending list), False to retry on the next polling cycle.
    """
    # NOTE(review): reconstructed from a garbled extraction; verify against
    # the repository.
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    if (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    ):
        # NS-level endpoints have no vnf profile, hence no VNFR to resolve
        provider_db_vnfr = (
            self._get_vnfr(
                relation.provider.nsr_id,
                relation.provider.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.provider.vnf_profile_id
            else None
        )
        requirer_db_vnfr = (
            self._get_vnfr(
                relation.requirer.nsr_id,
                relation.requirer.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.requirer.vnf_profile_id
            else None
        )
        provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
        requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
        provider_relation_endpoint = RelationEndpoint(
            deployed_provider.ee_id,
            provider_vca_id,
            relation.provider.endpoint,
        )
        requirer_relation_endpoint = RelationEndpoint(
            deployed_requirer.ee_id,
            requirer_vca_id,
            relation.requirer.endpoint,
        )
        await self.vca_map[vca_type].add_relation(
            provider=provider_relation_endpoint,
            requirer=requirer_relation_endpoint,
        )
        # relation added: signal caller to remove it from the pending list
        return True
    return False
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """Find and add all juju relations that involve the VCA at *vca_index*.

    Steps:
      1. collect the NS- and VNF-level relations for this VCA
      2. poll until every peer endpoint is deployed, adding each relation
         as soon as both sides are ready
    Best-effort: any exception is logged and False is returned instead of
    being propagated.

    :param timeout: maximum seconds to wait for peers before giving up
    :return: True when all relations were added (or there were none),
        False on timeout or error.
    """
    # NOTE(review): reconstructed from a garbled extraction (try/except and
    # loop scaffolding were missing); verify against the repository.
    try:
        # STEP 1: find all relations for this VCA
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # STEP 2: wait for peers and add each relation as soon as possible
        start = time()
        while True:
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # iterate a copy so pending relations can be removed in place
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break

            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
        return False
3235 async def _install_kdu(
3243 k8s_instance_info
: dict,
3244 k8params
: dict = None,
3250 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3253 "collection": "nsrs",
3254 "filter": {"_id": nsr_id
},
3255 "path": nsr_db_path
,
3258 if k8s_instance_info
.get("kdu-deployment-name"):
3259 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3261 kdu_instance
= self
.k8scluster_map
[
3263 ].generate_kdu_instance_name(
3264 db_dict
=db_dict_install
,
3265 kdu_model
=k8s_instance_info
["kdu-model"],
3266 kdu_name
=k8s_instance_info
["kdu-name"],
3269 # Update the nsrs table with the kdu-instance value
3273 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3276 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3277 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3278 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3279 # namespace, this first verification could be removed, and the next step would be done for any kind
3281 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3282 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3283 if k8sclustertype
in ("juju", "juju-bundle"):
3284 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3285 # that the user passed a namespace which he wants its KDU to be deployed in)
3291 "_admin.projects_write": k8s_instance_info
["namespace"],
3292 "_admin.projects_read": k8s_instance_info
["namespace"],
3298 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3303 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3305 k8s_instance_info
["namespace"] = kdu_instance
3307 await self
.k8scluster_map
[k8sclustertype
].install(
3308 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3309 kdu_model
=k8s_instance_info
["kdu-model"],
3312 db_dict
=db_dict_install
,
3314 kdu_name
=k8s_instance_info
["kdu-name"],
3315 namespace
=k8s_instance_info
["namespace"],
3316 kdu_instance
=kdu_instance
,
3320 # Obtain services to obtain management service ip
3321 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3322 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3323 kdu_instance
=kdu_instance
,
3324 namespace
=k8s_instance_info
["namespace"],
3327 # Obtain management service info (if exists)
3328 vnfr_update_dict
= {}
3329 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3331 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3336 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3339 for service
in kdud
.get("service", [])
3340 if service
.get("mgmt-service")
3342 for mgmt_service
in mgmt_services
:
3343 for service
in services
:
3344 if service
["name"].startswith(mgmt_service
["name"]):
3345 # Mgmt service found, Obtain service ip
3346 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3347 if isinstance(ip
, list) and len(ip
) == 1:
3351 "kdur.{}.ip-address".format(kdu_index
)
3354 # Check if must update also mgmt ip at the vnf
3355 service_external_cp
= mgmt_service
.get(
3356 "external-connection-point-ref"
3358 if service_external_cp
:
3360 deep_get(vnfd
, ("mgmt-interface", "cp"))
3361 == service_external_cp
3363 vnfr_update_dict
["ip-address"] = ip
3368 "external-connection-point-ref", ""
3370 == service_external_cp
,
3373 "kdur.{}.ip-address".format(kdu_index
)
3378 "Mgmt service name: {} not found".format(
3379 mgmt_service
["name"]
3383 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3384 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3386 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3389 and kdu_config
.get("initial-config-primitive")
3390 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3392 initial_config_primitive_list
= kdu_config
.get(
3393 "initial-config-primitive"
3395 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3397 for initial_config_primitive
in initial_config_primitive_list
:
3398 primitive_params_
= self
._map
_primitive
_params
(
3399 initial_config_primitive
, {}, {}
3402 await asyncio
.wait_for(
3403 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3404 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3405 kdu_instance
=kdu_instance
,
3406 primitive_name
=initial_config_primitive
["name"],
3407 params
=primitive_params_
,
3408 db_dict
=db_dict_install
,
3414 except Exception as e
:
3415 # Prepare update db with error and raise exception
3418 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3422 vnfr_data
.get("_id"),
3423 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3426 # ignore to keep original exception
3428 # reraise original error
3433 async def deploy_kdus(
3440 task_instantiation_info
,
3442 # Launch kdus if present in the descriptor
3444 k8scluster_id_2_uuic
= {
3445 "helm-chart-v3": {},
3450 async def _get_cluster_id(cluster_id
, cluster_type
):
3451 nonlocal k8scluster_id_2_uuic
3452 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3453 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3455 # check if K8scluster is creating and wait look if previous tasks in process
3456 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3457 "k8scluster", cluster_id
3460 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3461 task_name
, cluster_id
3463 self
.logger
.debug(logging_text
+ text
)
3464 await asyncio
.wait(task_dependency
, timeout
=3600)
3466 db_k8scluster
= self
.db
.get_one(
3467 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3469 if not db_k8scluster
:
3470 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3472 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3474 if cluster_type
== "helm-chart-v3":
3476 # backward compatibility for existing clusters that have not been initialized for helm v3
3477 k8s_credentials
= yaml
.safe_dump(
3478 db_k8scluster
.get("credentials")
3480 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3481 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3483 db_k8scluster_update
= {}
3484 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3485 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3486 db_k8scluster_update
[
3487 "_admin.helm-chart-v3.created"
3489 db_k8scluster_update
[
3490 "_admin.helm-chart-v3.operationalState"
3493 "k8sclusters", cluster_id
, db_k8scluster_update
3495 except Exception as e
:
3498 + "error initializing helm-v3 cluster: {}".format(str(e
))
3501 "K8s cluster '{}' has not been initialized for '{}'".format(
3502 cluster_id
, cluster_type
3507 "K8s cluster '{}' has not been initialized for '{}'".format(
3508 cluster_id
, cluster_type
3511 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3514 logging_text
+= "Deploy kdus: "
3517 db_nsr_update
= {"_admin.deployed.K8s": []}
3518 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3521 updated_cluster_list
= []
3522 updated_v3_cluster_list
= []
3524 for vnfr_data
in db_vnfrs
.values():
3525 vca_id
= self
.get_vca_id(vnfr_data
, {})
3526 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3527 # Step 0: Prepare and set parameters
3528 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3529 vnfd_id
= vnfr_data
.get("vnfd-id")
3530 vnfd_with_id
= find_in_list(
3531 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3535 for kdud
in vnfd_with_id
["kdu"]
3536 if kdud
["name"] == kdur
["kdu-name"]
3538 namespace
= kdur
.get("k8s-namespace")
3539 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3540 if kdur
.get("helm-chart"):
3541 kdumodel
= kdur
["helm-chart"]
3542 # Default version: helm3, if helm-version is v2 assign v2
3543 k8sclustertype
= "helm-chart-v3"
3544 self
.logger
.debug("kdur: {}".format(kdur
))
3546 kdur
.get("helm-version")
3547 and kdur
.get("helm-version") == "v2"
3549 k8sclustertype
= "helm-chart"
3550 elif kdur
.get("juju-bundle"):
3551 kdumodel
= kdur
["juju-bundle"]
3552 k8sclustertype
= "juju-bundle"
3555 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3556 "juju-bundle. Maybe an old NBI version is running".format(
3557 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3560 # check if kdumodel is a file and exists
3562 vnfd_with_id
= find_in_list(
3563 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3565 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3566 if storage
: # may be not present if vnfd has not artifacts
3567 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3568 if storage
["pkg-dir"]:
3569 filename
= "{}/{}/{}s/{}".format(
3576 filename
= "{}/Scripts/{}s/{}".format(
3581 if self
.fs
.file_exists(
3582 filename
, mode
="file"
3583 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3584 kdumodel
= self
.fs
.path
+ filename
3585 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3587 except Exception: # it is not a file
3590 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3591 step
= "Synchronize repos for k8s cluster '{}'".format(
3594 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3598 k8sclustertype
== "helm-chart"
3599 and cluster_uuid
not in updated_cluster_list
3601 k8sclustertype
== "helm-chart-v3"
3602 and cluster_uuid
not in updated_v3_cluster_list
3604 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3605 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3606 cluster_uuid
=cluster_uuid
3609 if del_repo_list
or added_repo_dict
:
3610 if k8sclustertype
== "helm-chart":
3612 "_admin.helm_charts_added." + item
: None
3613 for item
in del_repo_list
3616 "_admin.helm_charts_added." + item
: name
3617 for item
, name
in added_repo_dict
.items()
3619 updated_cluster_list
.append(cluster_uuid
)
3620 elif k8sclustertype
== "helm-chart-v3":
3622 "_admin.helm_charts_v3_added." + item
: None
3623 for item
in del_repo_list
3626 "_admin.helm_charts_v3_added." + item
: name
3627 for item
, name
in added_repo_dict
.items()
3629 updated_v3_cluster_list
.append(cluster_uuid
)
3631 logging_text
+ "repos synchronized on k8s cluster "
3632 "'{}' to_delete: {}, to_add: {}".format(
3633 k8s_cluster_id
, del_repo_list
, added_repo_dict
3638 {"_id": k8s_cluster_id
},
3644 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3645 vnfr_data
["member-vnf-index-ref"],
3649 k8s_instance_info
= {
3650 "kdu-instance": None,
3651 "k8scluster-uuid": cluster_uuid
,
3652 "k8scluster-type": k8sclustertype
,
3653 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3654 "kdu-name": kdur
["kdu-name"],
3655 "kdu-model": kdumodel
,
3656 "namespace": namespace
,
3657 "kdu-deployment-name": kdu_deployment_name
,
3659 db_path
= "_admin.deployed.K8s.{}".format(index
)
3660 db_nsr_update
[db_path
] = k8s_instance_info
3661 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3662 vnfd_with_id
= find_in_list(
3663 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3665 task
= asyncio
.ensure_future(
3674 k8params
=desc_params
,
3679 self
.lcm_tasks
.register(
3683 "instantiate_KDU-{}".format(index
),
3686 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3692 except (LcmException
, asyncio
.CancelledError
):
3694 except Exception as e
:
3695 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3696 if isinstance(e
, (N2VCException
, DbException
)):
3697 self
.logger
.error(logging_text
+ msg
)
3699 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3700 raise LcmException(msg
)
3703 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3722 task_instantiation_info
,
3725 # launch instantiate_N2VC in a asyncio task and register task object
3726 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3727 # if not found, create one entry and update database
3728 # fill db_nsr._admin.deployed.VCA.<index>
3731 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3733 if "execution-environment-list" in descriptor_config
:
3734 ee_list
= descriptor_config
.get("execution-environment-list", [])
3735 elif "juju" in descriptor_config
:
3736 ee_list
= [descriptor_config
] # ns charms
3737 else: # other types as script are not supported
3740 for ee_item
in ee_list
:
3743 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3744 ee_item
.get("juju"), ee_item
.get("helm-chart")
3747 ee_descriptor_id
= ee_item
.get("id")
3748 if ee_item
.get("juju"):
3749 vca_name
= ee_item
["juju"].get("charm")
3752 if ee_item
["juju"].get("charm") is not None
3755 if ee_item
["juju"].get("cloud") == "k8s":
3756 vca_type
= "k8s_proxy_charm"
3757 elif ee_item
["juju"].get("proxy") is False:
3758 vca_type
= "native_charm"
3759 elif ee_item
.get("helm-chart"):
3760 vca_name
= ee_item
["helm-chart"]
3761 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3764 vca_type
= "helm-v3"
3767 logging_text
+ "skipping non juju neither charm configuration"
3772 for vca_index
, vca_deployed
in enumerate(
3773 db_nsr
["_admin"]["deployed"]["VCA"]
3775 if not vca_deployed
:
3778 vca_deployed
.get("member-vnf-index") == member_vnf_index
3779 and vca_deployed
.get("vdu_id") == vdu_id
3780 and vca_deployed
.get("kdu_name") == kdu_name
3781 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3782 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3786 # not found, create one.
3788 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3791 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3793 target
+= "/kdu/{}".format(kdu_name
)
3795 "target_element": target
,
3796 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3797 "member-vnf-index": member_vnf_index
,
3799 "kdu_name": kdu_name
,
3800 "vdu_count_index": vdu_index
,
3801 "operational-status": "init", # TODO revise
3802 "detailed-status": "", # TODO revise
3803 "step": "initial-deploy", # TODO revise
3805 "vdu_name": vdu_name
,
3807 "ee_descriptor_id": ee_descriptor_id
,
3811 # create VCA and configurationStatus in db
3813 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3814 "configurationStatus.{}".format(vca_index
): dict(),
3816 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3818 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3820 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3821 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3822 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3825 task_n2vc
= asyncio
.ensure_future(
3826 self
.instantiate_N2VC(
3827 logging_text
=logging_text
,
3828 vca_index
=vca_index
,
3834 vdu_index
=vdu_index
,
3835 deploy_params
=deploy_params
,
3836 config_descriptor
=descriptor_config
,
3837 base_folder
=base_folder
,
3838 nslcmop_id
=nslcmop_id
,
3842 ee_config_descriptor
=ee_item
,
3845 self
.lcm_tasks
.register(
3849 "instantiate_N2VC-{}".format(vca_index
),
3852 task_instantiation_info
[
3854 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3855 member_vnf_index
or "", vdu_id
or ""
3859 def _create_nslcmop(nsr_id
, operation
, params
):
3861 Creates a ns-lcm-opp content to be stored at database.
3862 :param nsr_id: internal id of the instance
3863 :param operation: instantiate, terminate, scale, action, ...
3864 :param params: user parameters for the operation
3865 :return: dictionary following SOL005 format
3867 # Raise exception if invalid arguments
3868 if not (nsr_id
and operation
and params
):
3870 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3877 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3878 "operationState": "PROCESSING",
3879 "statusEnteredTime": now
,
3880 "nsInstanceId": nsr_id
,
3881 "lcmOperationType": operation
,
3883 "isAutomaticInvocation": False,
3884 "operationParams": params
,
3885 "isCancelPending": False,
3887 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3888 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3893 def _format_additional_params(self
, params
):
3894 params
= params
or {}
3895 for key
, value
in params
.items():
3896 if str(value
).startswith("!!yaml "):
3897 params
[key
] = yaml
.safe_load(value
[7:])
def _get_terminate_primitive_params(self, seq, vnf_index):
    """Build the mapped parameter dict for one terminate config-primitive.

    :param seq: one entry of a terminate-config-primitive list (has "name", ...)
    :param vnf_index: member-vnf-index the primitive applies to
    :return: result of self._map_primitive_params for this sequence
    """
    primitive = seq.get("name")
    primitive_params = {}
    params = {
        "member_vnf_index": vnf_index,
        "primitive": primitive,
        "primitive_params": primitive_params,
    }
    desc_params = {}  # no descriptor-level defaults for terminate primitives
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """Decide whether an existing sub-operation is skipped or retried.

    :param db_nslcmop: ns-lcm-op record holding _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP when already COMPLETED, otherwise
        op_index (after marking the sub-operation as PROCESSING for retry)
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP
    else:
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        operationState = "PROCESSING"
        detailed_status = "In progress"
        self._update_suboperation_status(
            db_nslcmop, op_index, operationState, detailed_status
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
3933 # Find a sub-operation where all keys in a matching dictionary must match
3934 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3935 def _find_suboperation(self
, db_nslcmop
, match
):
3936 if db_nslcmop
and match
:
3937 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3938 for i
, op
in enumerate(op_list
):
3939 if all(op
.get(k
) == match
[k
] for k
in match
):
3941 return self
.SUBOPERATION_STATUS_NOT_FOUND
3943 # Update status for a sub-operation given its index
3944 def _update_suboperation_status(
3945 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3947 # Update DB for HA tasks
3948 q_filter
= {"_id": db_nslcmop
["_id"]}
3950 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3951 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3954 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3957 # Add sub-operation, return the index of the added sub-operation
3958 # Optionally, set operationState, detailed-status, and operationType
3959 # Status and type are currently set for 'scale' sub-operations:
3960 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3961 # 'detailed-status' : status message
3962 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3963 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3964 def _add_suboperation(
3972 mapped_primitive_params
,
3973 operationState
=None,
3974 detailed_status
=None,
3977 RO_scaling_info
=None,
3980 return self
.SUBOPERATION_STATUS_NOT_FOUND
3981 # Get the "_admin.operations" list, if it exists
3982 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3983 op_list
= db_nslcmop_admin
.get("operations")
3984 # Create or append to the "_admin.operations" list
3986 "member_vnf_index": vnf_index
,
3988 "vdu_count_index": vdu_count_index
,
3989 "primitive": primitive
,
3990 "primitive_params": mapped_primitive_params
,
3993 new_op
["operationState"] = operationState
3995 new_op
["detailed-status"] = detailed_status
3997 new_op
["lcmOperationType"] = operationType
3999 new_op
["RO_nsr_id"] = RO_nsr_id
4001 new_op
["RO_scaling_info"] = RO_scaling_info
4003 # No existing operations, create key 'operations' with current operation as first list element
4004 db_nslcmop_admin
.update({"operations": [new_op
]})
4005 op_list
= db_nslcmop_admin
.get("operations")
4007 # Existing operations, append operation to list
4008 op_list
.append(new_op
)
4010 db_nslcmop_update
= {"_admin.operations": op_list
}
4011 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4012 op_index
= len(op_list
) - 1
4015 # Helper methods for scale() sub-operations
4017 # pre-scale/post-scale:
4018 # Check for 3 different cases:
4019 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4020 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4021 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4022 def _check_or_add_scale_suboperation(
4026 vnf_config_primitive
,
4030 RO_scaling_info
=None,
4032 # Find this sub-operation
4033 if RO_nsr_id
and RO_scaling_info
:
4034 operationType
= "SCALE-RO"
4036 "member_vnf_index": vnf_index
,
4037 "RO_nsr_id": RO_nsr_id
,
4038 "RO_scaling_info": RO_scaling_info
,
4042 "member_vnf_index": vnf_index
,
4043 "primitive": vnf_config_primitive
,
4044 "primitive_params": primitive_params
,
4045 "lcmOperationType": operationType
,
4047 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4048 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4049 # a. New sub-operation
4050 # The sub-operation does not exist, add it.
4051 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4052 # The following parameters are set to None for all kind of scaling:
4054 vdu_count_index
= None
4056 if RO_nsr_id
and RO_scaling_info
:
4057 vnf_config_primitive
= None
4058 primitive_params
= None
4061 RO_scaling_info
= None
4062 # Initial status for sub-operation
4063 operationState
= "PROCESSING"
4064 detailed_status
= "In progress"
4065 # Add sub-operation for pre/post-scaling (zero or more operations)
4066 self
._add
_suboperation
(
4072 vnf_config_primitive
,
4080 return self
.SUBOPERATION_STATUS_NEW
4082 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4083 # or op_index (operationState != 'COMPLETED')
4084 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4086 # Function to return execution_environment id
4088 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4089 # TODO vdu_index_count
4090 for vca
in vca_deployed_list
:
4091 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4094 async def destroy_N2VC(
4102 exec_primitives
=True,
4107 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4108 :param logging_text:
4110 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4111 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4112 :param vca_index: index in the database _admin.deployed.VCA
4113 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4114 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4115 not executed properly
4116 :param scaling_in: True destroys the application, False destroys the model
4117 :return: None or exception
4122 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4123 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4127 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4129 # execute terminate_primitives
4131 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4132 config_descriptor
.get("terminate-config-primitive"),
4133 vca_deployed
.get("ee_descriptor_id"),
4135 vdu_id
= vca_deployed
.get("vdu_id")
4136 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4137 vdu_name
= vca_deployed
.get("vdu_name")
4138 vnf_index
= vca_deployed
.get("member-vnf-index")
4139 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4140 for seq
in terminate_primitives
:
4141 # For each sequence in list, get primitive and call _ns_execute_primitive()
4142 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4143 vnf_index
, seq
.get("name")
4145 self
.logger
.debug(logging_text
+ step
)
4146 # Create the primitive for each sequence, i.e. "primitive": "touch"
4147 primitive
= seq
.get("name")
4148 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4153 self
._add
_suboperation
(
4160 mapped_primitive_params
,
4162 # Sub-operations: Call _ns_execute_primitive() instead of action()
4164 result
, result_detail
= await self
._ns
_execute
_primitive
(
4165 vca_deployed
["ee_id"],
4167 mapped_primitive_params
,
4171 except LcmException
:
4172 # this happens when VCA is not deployed. In this case it is not needed to terminate
4174 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4175 if result
not in result_ok
:
4177 "terminate_primitive {} for vnf_member_index={} fails with "
4178 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4180 # set that this VCA do not need terminated
4181 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4185 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4188 # Delete Prometheus Jobs if any
4189 # This uses NSR_ID, so it will destroy any jobs under this index
4190 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4193 await self
.vca_map
[vca_type
].delete_execution_environment(
4194 vca_deployed
["ee_id"],
4195 scaling_in
=scaling_in
,
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """Delete the whole namespace (all execution environments) of this NS.

    Writes TERMINATING config status, deletes the juju namespace derived from
    the NS id, then writes DELETED status.

    :param db_nsr: nsrs database record (its _id names the namespace)
    :param vca_id: optional VCA account id  # presumably selects the VCA connection - confirm
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4213 async def _terminate_RO(
4214 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4217 Terminates a deployment from RO
4218 :param logging_text:
4219 :param nsr_deployed: db_nsr._admin.deployed
4222 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4223 this method will update only the index 2, but it will write on database the concatenated content of the list
4228 ro_nsr_id
= ro_delete_action
= None
4229 if nsr_deployed
and nsr_deployed
.get("RO"):
4230 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4231 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4234 stage
[2] = "Deleting ns from VIM."
4235 db_nsr_update
["detailed-status"] = " ".join(stage
)
4236 self
._write
_op
_status
(nslcmop_id
, stage
)
4237 self
.logger
.debug(logging_text
+ stage
[2])
4238 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4239 self
._write
_op
_status
(nslcmop_id
, stage
)
4240 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4241 ro_delete_action
= desc
["action_id"]
4243 "_admin.deployed.RO.nsr_delete_action_id"
4244 ] = ro_delete_action
4245 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4246 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4247 if ro_delete_action
:
4248 # wait until NS is deleted from VIM
4249 stage
[2] = "Waiting ns deleted from VIM."
4250 detailed_status_old
= None
4254 + " RO_id={} ro_delete_action={}".format(
4255 ro_nsr_id
, ro_delete_action
4258 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4259 self
._write
_op
_status
(nslcmop_id
, stage
)
4261 delete_timeout
= 20 * 60 # 20 minutes
4262 while delete_timeout
> 0:
4263 desc
= await self
.RO
.show(
4265 item_id_name
=ro_nsr_id
,
4266 extra_item
="action",
4267 extra_item_id
=ro_delete_action
,
4271 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4273 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4274 if ns_status
== "ERROR":
4275 raise ROclient
.ROClientException(ns_status_info
)
4276 elif ns_status
== "BUILD":
4277 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4278 elif ns_status
== "ACTIVE":
4279 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4280 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4285 ), "ROclient.check_action_status returns unknown {}".format(
4288 if stage
[2] != detailed_status_old
:
4289 detailed_status_old
= stage
[2]
4290 db_nsr_update
["detailed-status"] = " ".join(stage
)
4291 self
._write
_op
_status
(nslcmop_id
, stage
)
4292 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4293 await asyncio
.sleep(5, loop
=self
.loop
)
4295 else: # delete_timeout <= 0:
4296 raise ROclient
.ROClientException(
4297 "Timeout waiting ns deleted from VIM"
4300 except Exception as e
:
4301 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4303 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4307 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4309 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4312 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4314 failed_detail
.append("delete conflict: {}".format(e
))
4317 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4320 failed_detail
.append("delete error: {}".format(e
))
4322 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4326 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4327 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4329 stage
[2] = "Deleting nsd from RO."
4330 db_nsr_update
["detailed-status"] = " ".join(stage
)
4331 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4332 self
._write
_op
_status
(nslcmop_id
, stage
)
4333 await self
.RO
.delete("nsd", ro_nsd_id
)
4335 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4337 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4338 except Exception as e
:
4340 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4342 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4344 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4347 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4349 failed_detail
.append(
4350 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4352 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4354 failed_detail
.append(
4355 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4357 self
.logger
.error(logging_text
+ failed_detail
[-1])
4359 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4360 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4361 if not vnf_deployed
or not vnf_deployed
["id"]:
4364 ro_vnfd_id
= vnf_deployed
["id"]
4367 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4368 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4370 db_nsr_update
["detailed-status"] = " ".join(stage
)
4371 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4372 self
._write
_op
_status
(nslcmop_id
, stage
)
4373 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4375 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4377 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4378 except Exception as e
:
4380 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4383 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4387 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4390 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4392 failed_detail
.append(
4393 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4395 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4397 failed_detail
.append(
4398 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4400 self
.logger
.error(logging_text
+ failed_detail
[-1])
4403 stage
[2] = "Error deleting from VIM"
4405 stage
[2] = "Deleted from VIM"
4406 db_nsr_update
["detailed-status"] = " ".join(stage
)
4407 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4408 self
._write
_op
_status
(nslcmop_id
, stage
)
4411 raise LcmException("; ".join(failed_detail
))
4413 async def terminate(self
, nsr_id
, nslcmop_id
):
4414 # Try to lock HA task here
4415 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4416 if not task_is_locked_by_me
:
4419 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4420 self
.logger
.debug(logging_text
+ "Enter")
4421 timeout_ns_terminate
= self
.timeout_ns_terminate
4424 operation_params
= None
4426 error_list
= [] # annotates all failed error messages
4427 db_nslcmop_update
= {}
4428 autoremove
= False # autoremove after terminated
4429 tasks_dict_info
= {}
4432 "Stage 1/3: Preparing task.",
4433 "Waiting for previous operations to terminate.",
4436 # ^ contains [stage, step, VIM-status]
4438 # wait for any previous tasks in process
4439 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4441 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4442 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4443 operation_params
= db_nslcmop
.get("operationParams") or {}
4444 if operation_params
.get("timeout_ns_terminate"):
4445 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4446 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4447 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4449 db_nsr_update
["operational-status"] = "terminating"
4450 db_nsr_update
["config-status"] = "terminating"
4451 self
._write
_ns
_status
(
4453 ns_state
="TERMINATING",
4454 current_operation
="TERMINATING",
4455 current_operation_id
=nslcmop_id
,
4456 other_update
=db_nsr_update
,
4458 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4459 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4460 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4463 stage
[1] = "Getting vnf descriptors from db."
4464 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4466 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4468 db_vnfds_from_id
= {}
4469 db_vnfds_from_member_index
= {}
4471 for vnfr
in db_vnfrs_list
:
4472 vnfd_id
= vnfr
["vnfd-id"]
4473 if vnfd_id
not in db_vnfds_from_id
:
4474 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4475 db_vnfds_from_id
[vnfd_id
] = vnfd
4476 db_vnfds_from_member_index
[
4477 vnfr
["member-vnf-index-ref"]
4478 ] = db_vnfds_from_id
[vnfd_id
]
4480 # Destroy individual execution environments when there are terminating primitives.
4481 # Rest of EE will be deleted at once
4482 # TODO - check before calling _destroy_N2VC
4483 # if not operation_params.get("skip_terminate_primitives"):#
4484 # or not vca.get("needed_terminate"):
4485 stage
[0] = "Stage 2/3 execute terminating primitives."
4486 self
.logger
.debug(logging_text
+ stage
[0])
4487 stage
[1] = "Looking execution environment that needs terminate."
4488 self
.logger
.debug(logging_text
+ stage
[1])
4490 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4491 config_descriptor
= None
4492 vca_member_vnf_index
= vca
.get("member-vnf-index")
4493 vca_id
= self
.get_vca_id(
4494 db_vnfrs_dict
.get(vca_member_vnf_index
)
4495 if vca_member_vnf_index
4499 if not vca
or not vca
.get("ee_id"):
4501 if not vca
.get("member-vnf-index"):
4503 config_descriptor
= db_nsr
.get("ns-configuration")
4504 elif vca
.get("vdu_id"):
4505 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4506 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4507 elif vca
.get("kdu_name"):
4508 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4509 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4511 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4512 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4513 vca_type
= vca
.get("type")
4514 exec_terminate_primitives
= not operation_params
.get(
4515 "skip_terminate_primitives"
4516 ) and vca
.get("needed_terminate")
4517 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4518 # pending native charms
4520 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4522 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4523 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4524 task
= asyncio
.ensure_future(
4532 exec_terminate_primitives
,
4536 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4538 # wait for pending tasks of terminate primitives
4542 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4544 error_list
= await self
._wait
_for
_tasks
(
4547 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4551 tasks_dict_info
.clear()
4553 return # raise LcmException("; ".join(error_list))
4555 # remove All execution environments at once
4556 stage
[0] = "Stage 3/3 delete all."
4558 if nsr_deployed
.get("VCA"):
4559 stage
[1] = "Deleting all execution environments."
4560 self
.logger
.debug(logging_text
+ stage
[1])
4561 vca_id
= self
.get_vca_id({}, db_nsr
)
4562 task_delete_ee
= asyncio
.ensure_future(
4564 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4565 timeout
=self
.timeout_charm_delete
,
4568 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4569 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4571 # Delete from k8scluster
4572 stage
[1] = "Deleting KDUs."
4573 self
.logger
.debug(logging_text
+ stage
[1])
4574 # print(nsr_deployed)
4575 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4576 if not kdu
or not kdu
.get("kdu-instance"):
4578 kdu_instance
= kdu
.get("kdu-instance")
4579 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4580 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4581 vca_id
= self
.get_vca_id({}, db_nsr
)
4582 task_delete_kdu_instance
= asyncio
.ensure_future(
4583 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4584 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4585 kdu_instance
=kdu_instance
,
4587 namespace
=kdu
.get("namespace"),
4593 + "Unknown k8s deployment type {}".format(
4594 kdu
.get("k8scluster-type")
4599 task_delete_kdu_instance
4600 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4603 stage
[1] = "Deleting ns from VIM."
4605 task_delete_ro
= asyncio
.ensure_future(
4606 self
._terminate
_ng
_ro
(
4607 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4611 task_delete_ro
= asyncio
.ensure_future(
4613 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4616 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4618 # rest of staff will be done at finally
4621 ROclient
.ROClientException
,
4626 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4628 except asyncio
.CancelledError
:
4630 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4632 exc
= "Operation was cancelled"
4633 except Exception as e
:
4634 exc
= traceback
.format_exc()
4635 self
.logger
.critical(
4636 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4641 error_list
.append(str(exc
))
4643 # wait for pending tasks
4645 stage
[1] = "Waiting for terminate pending tasks."
4646 self
.logger
.debug(logging_text
+ stage
[1])
4647 error_list
+= await self
._wait
_for
_tasks
(
4650 timeout_ns_terminate
,
4654 stage
[1] = stage
[2] = ""
4655 except asyncio
.CancelledError
:
4656 error_list
.append("Cancelled")
4657 # TODO cancell all tasks
4658 except Exception as exc
:
4659 error_list
.append(str(exc
))
4660 # update status at database
4662 error_detail
= "; ".join(error_list
)
4663 # self.logger.error(logging_text + error_detail)
4664 error_description_nslcmop
= "{} Detail: {}".format(
4665 stage
[0], error_detail
4667 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4668 nslcmop_id
, stage
[0]
4671 db_nsr_update
["operational-status"] = "failed"
4672 db_nsr_update
["detailed-status"] = (
4673 error_description_nsr
+ " Detail: " + error_detail
4675 db_nslcmop_update
["detailed-status"] = error_detail
4676 nslcmop_operation_state
= "FAILED"
4680 error_description_nsr
= error_description_nslcmop
= None
4681 ns_state
= "NOT_INSTANTIATED"
4682 db_nsr_update
["operational-status"] = "terminated"
4683 db_nsr_update
["detailed-status"] = "Done"
4684 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4685 db_nslcmop_update
["detailed-status"] = "Done"
4686 nslcmop_operation_state
= "COMPLETED"
4689 self
._write
_ns
_status
(
4692 current_operation
="IDLE",
4693 current_operation_id
=None,
4694 error_description
=error_description_nsr
,
4695 error_detail
=error_detail
,
4696 other_update
=db_nsr_update
,
4698 self
._write
_op
_status
(
4701 error_message
=error_description_nslcmop
,
4702 operation_state
=nslcmop_operation_state
,
4703 other_update
=db_nslcmop_update
,
4705 if ns_state
== "NOT_INSTANTIATED":
4709 {"nsr-id-ref": nsr_id
},
4710 {"_admin.nsState": "NOT_INSTANTIATED"},
4712 except DbException
as e
:
4715 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4719 if operation_params
:
4720 autoremove
= operation_params
.get("autoremove", False)
4721 if nslcmop_operation_state
:
4723 await self
.msg
.aiowrite(
4728 "nslcmop_id": nslcmop_id
,
4729 "operationState": nslcmop_operation_state
,
4730 "autoremove": autoremove
,
4734 except Exception as e
:
4736 logging_text
+ "kafka_write notification Exception {}".format(e
)
4739 self
.logger
.debug(logging_text
+ "Exit")
4740 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4742 async def _wait_for_tasks(
4743 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4746 error_detail_list
= []
4748 pending_tasks
= list(created_tasks_info
.keys())
4749 num_tasks
= len(pending_tasks
)
4751 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4752 self
._write
_op
_status
(nslcmop_id
, stage
)
4753 while pending_tasks
:
4755 _timeout
= timeout
+ time_start
- time()
4756 done
, pending_tasks
= await asyncio
.wait(
4757 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4759 num_done
+= len(done
)
4760 if not done
: # Timeout
4761 for task
in pending_tasks
:
4762 new_error
= created_tasks_info
[task
] + ": Timeout"
4763 error_detail_list
.append(new_error
)
4764 error_list
.append(new_error
)
4767 if task
.cancelled():
4770 exc
= task
.exception()
4772 if isinstance(exc
, asyncio
.TimeoutError
):
4774 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4775 error_list
.append(created_tasks_info
[task
])
4776 error_detail_list
.append(new_error
)
4783 ROclient
.ROClientException
,
4789 self
.logger
.error(logging_text
+ new_error
)
4791 exc_traceback
= "".join(
4792 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4796 + created_tasks_info
[task
]
4802 logging_text
+ created_tasks_info
[task
] + ": Done"
4804 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4806 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4807 if nsr_id
: # update also nsr
4812 "errorDescription": "Error at: " + ", ".join(error_list
),
4813 "errorDetail": ". ".join(error_detail_list
),
4816 self
._write
_op
_status
(nslcmop_id
, stage
)
4817 return error_detail_list
4820 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4822 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4823 The default-value is used. If it is between < > it look for a value at instantiation_params
4824 :param primitive_desc: portion of VNFD/NSD that describes primitive
4825 :param params: Params provided by user
4826 :param instantiation_params: Instantiation params provided by user
4827 :return: a dictionary with the calculated params
4829 calculated_params
= {}
4830 for parameter
in primitive_desc
.get("parameter", ()):
4831 param_name
= parameter
["name"]
4832 if param_name
in params
:
4833 calculated_params
[param_name
] = params
[param_name
]
4834 elif "default-value" in parameter
or "value" in parameter
:
4835 if "value" in parameter
:
4836 calculated_params
[param_name
] = parameter
["value"]
4838 calculated_params
[param_name
] = parameter
["default-value"]
4840 isinstance(calculated_params
[param_name
], str)
4841 and calculated_params
[param_name
].startswith("<")
4842 and calculated_params
[param_name
].endswith(">")
4844 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4845 calculated_params
[param_name
] = instantiation_params
[
4846 calculated_params
[param_name
][1:-1]
4850 "Parameter {} needed to execute primitive {} not provided".format(
4851 calculated_params
[param_name
], primitive_desc
["name"]
4856 "Parameter {} needed to execute primitive {} not provided".format(
4857 param_name
, primitive_desc
["name"]
4861 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4862 calculated_params
[param_name
] = yaml
.safe_dump(
4863 calculated_params
[param_name
], default_flow_style
=True, width
=256
4865 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4867 ].startswith("!!yaml "):
4868 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4869 if parameter
.get("data-type") == "INTEGER":
4871 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4872 except ValueError: # error converting string to int
4874 "Parameter {} of primitive {} must be integer".format(
4875 param_name
, primitive_desc
["name"]
4878 elif parameter
.get("data-type") == "BOOLEAN":
4879 calculated_params
[param_name
] = not (
4880 (str(calculated_params
[param_name
])).lower() == "false"
4883 # add always ns_config_info if primitive name is config
4884 if primitive_desc
["name"] == "config":
4885 if "ns_config_info" in instantiation_params
:
4886 calculated_params
["ns_config_info"] = instantiation_params
[
4889 return calculated_params
4891 def _look_for_deployed_vca(
4898 ee_descriptor_id
=None,
4900 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4901 for vca
in deployed_vca
:
4904 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4907 vdu_count_index
is not None
4908 and vdu_count_index
!= vca
["vdu_count_index"]
4911 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4913 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4917 # vca_deployed not found
4919 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4920 " is not deployed".format(
4929 ee_id
= vca
.get("ee_id")
4931 "type", "lxc_proxy_charm"
4932 ) # default value for backward compatibility - proxy charm
4935 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4936 "execution environment".format(
4937 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4940 return ee_id
, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id=None,
):
    """Execute a charm primitive through the VCA connector, with optional retries.

    NOTE(review): reconstructed from a corrupted source; the parameter list
    beyond retries_interval was inferred from call-site fragments — verify
    against upstream.

    :param ee_id: execution environment id of the target charm
    :param primitive: primitive name; "config" wraps its params in {"params": ...}
    :param primitive_params: dict of parameters for the primitive
    :param retries: number of additional attempts after a failure
    :param retries_interval: seconds to sleep between attempts
    :param timeout: per-attempt timeout; falls back to self.timeout_primitive
    :return: tuple (state, details) where state is COMPLETED/FAILED/FAIL
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug(
                        "Error executing action {} on {} -> {}".format(
                            primitive, ee_id, e
                        )
                    )
                    # wait and retry
                    await asyncio.sleep(retries_interval, loop=self.loop)
                else:
                    return "FAILED", str(e)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    # NOTE(review): reconstructed from a corrupted source — verify against upstream.
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if db_nsr["_admin"]["deployed"]["K8s"]:
        # KDU-based deployment: refresh status per deployed K8s record
        for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
            cluster_uuid, kdu_instance, cluster_type = (
                k8s["k8scluster-uuid"],
                k8s["kdu-instance"],
                k8s["k8scluster-type"],
            )
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # charm-based deployment: refresh status per deployed VCA record
        for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            table, filter = "nsrs", {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db(table, filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """Run a (config) primitive on an NS, VNF, VDU or KDU.

    NOTE(review): reconstructed from a heavily corrupted source. The overall
    control flow follows the visible fragments; lines that were missing from
    the corrupted text were filled conservatively — verify against upstream.
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                # kdur additionalParams are stored JSON-encoded; decode them
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # NOTE(review): db_vnfr is only bound in the vnf_index branch above;
        # an NS-level action reaching this call looks unsafe — confirm upstream.
        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # kdu built-in operations do not need a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            actions = set()
            for primitive in kdu_configuration.get("initial-config-primitive", []):
                actions.add(primitive["name"])
            for primitive in kdu_configuration.get("config-primitive", []):
                actions.add(primitive["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda kdu: kdu_name == kdu["kdu-name"]
                and kdu["member-vnf-index"] == vnf_index,
            )
            kdu_action = (
                True
                if primitive_name in actions
                and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
                else False
            )

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
    """
    # NOTE(review): reconstructed from a corrupted source — verify against upstream.
    vca_scaling_info = []
    scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
    scaling_info["scaling_direction"] = "IN"
    scaling_info["vdu-delete"] = {}
    scaling_info["kdu-delete"] = {}
    db_vdur = db_vnfr.get("vdur")
    vdur_list = copy(db_vdur)
    count_index = 0
    for index, vdu in enumerate(vdur_list):
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu["vdu-id-ref"],
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
        scaling_info["vdu"].append(
            {
                "name": vdu.get("name") or vdu.get("vdu-name"),
                "vdu_id": vdu["vdu-id-ref"],
                "interface": [],
            }
        )
        for interface in vdu["interfaces"]:
            scaling_info["vdu"][index]["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
    self.logger.info("NS update scaling info{}".format(scaling_info))
    stage[2] = "Terminating VDUs"
    if scaling_info.get("vdu-delete"):
        # scale_process = "RO"
        if self.ro_config.get("ng"):
            await self._scale_ng_ro(
                logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
            )
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    # NOTE(review): reconstructed from a corrupted source — verify against upstream.
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        # refuse to remove the last remaining VNF of the NS
        if check_vnfr_count > 1:
            stage = ["", "", ""]
            step = "Getting nslcmop from database"
            self.logger.debug(
                step + " after having waited for previous tasks to be completed"
            )
            # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
            member_vnf_index = db_vnfr["member-vnf-index-ref"]
            """ db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """

            update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            await self.terminate_vdus(
                db_vnfr,
                member_vnf_index,
                db_nsr,
                update_db_nslcmops,
                stage,
                logging_text,
            )

            constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
            constituent_vnfr.remove(db_vnfr.get("_id"))
            db_nsr_update["constituent-vnfr-ref"] = db_nsr.get(
                "constituent-vnfr-ref"
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            return "COMPLETED", "Done"
        else:
            step = "Terminate VNF Failed with"
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(step)
            )
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr):
    """This method updates and redeploys VNF instances

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    # NOTE(review): reconstructed from a corrupted source; missing lines were
    # filled to match the visible fragments — verify against upstream.
    try:
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # old_vnfd_id = db_vnfr["vnfd-id"]
        # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
        new_db_vnfd = db_vnfd
        # new_vnfd_ref = new_db_vnfd["id"]
        # new_vnfd_id = vnfd_id

        # Rebuild connection points from the (new) descriptor's ext-cpd
        new_vnfr_cp = []
        for cp in new_db_vnfd.get("ext-cpd", ()):
            vnf_cp = {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            new_vnfr_cp.append(vnf_cp)
        new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
        # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
        # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": new_vdur,
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        updated_db_vnfr = self.db.get_one(
            "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
        )

        # Instantiate new VNF resources
        # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        vca_scaling_info = []
        scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
        scaling_info["scaling_direction"] = "OUT"
        scaling_info["vdu-create"] = {}
        scaling_info["kdu-create"] = {}
        vdud_instantiate_list = db_vnfd["vdu"]
        for index, vdud in enumerate(vdud_instantiate_list):
            cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                    or {}
                )
            cloud_init_list = []
            if cloud_init_text:
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud["id"], 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud["id"],
                    )
                )
            vca_scaling_info.append(
                {
                    "osm_vdu_id": vdud["id"],
                    "member-vnf-index": member_vnf_index,
                    "type": "create",
                    "vdu_index": count_index,
                }
            )
            scaling_info["vdu-create"][vdud["id"]] = count_index
        if self.ro_config.get("ng"):
            self.logger.debug(
                "New Resources to be deployed: {}".format(scaling_info)
            )
            await self._scale_ng_ro(
                logging_text,
                db_nsr,
                update_db_nslcmops,
                updated_db_vnfr,
                scaling_info,
                stage,
            )
            return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
async def _ns_charm_upgrade(
    self,
    ee_id,
    charm_id,
    charm_type,
    path,
    timeout: float = None,
):
    """This method upgrade charms in VNF instances

    Args:
        ee_id: Execution environment id
        path: Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float) Timeout for the ns update operation

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    # NOTE(review): reconstructed from a corrupted source; the parameter order
    # and upgrade_charm keyword arguments were inferred — verify upstream.
    try:
        charm_type = charm_type or "lxc_proxy_charm"
        output = await self.vca_map[charm_type].upgrade_charm(
            ee_id=ee_id,
            path=path,
            charm_id=charm_id,
            charm_type=charm_type,
            timeout=timeout or self.timeout_ns_update,
        )

        if output:
            return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise

    except Exception as e:
        self.logger.debug("Error upgrading charm {}".format(path))
        return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5644 async def update(self
, nsr_id
, nslcmop_id
):
5645 """Update NS according to different update types
5647 This method performs upgrade of VNF instances then updates the revision
5648 number in VNF record
5651 nsr_id: Network service will be updated
5652 nslcmop_id: ns lcm operation id
5655 It may raise DbException, LcmException, N2VCException, K8sException
5658 # Try to lock HA task here
5659 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5660 if not task_is_locked_by_me
:
5663 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5664 self
.logger
.debug(logging_text
+ "Enter")
5666 # Set the required variables to be filled up later
5668 db_nslcmop_update
= {}
5670 nslcmop_operation_state
= None
5672 error_description_nslcmop
= ""
5674 change_type
= "updated"
5675 detailed_status
= ""
5678 # wait for any previous tasks in process
5679 step
= "Waiting for previous operations to terminate"
5680 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5681 self
._write
_ns
_status
(
5684 current_operation
="UPDATING",
5685 current_operation_id
=nslcmop_id
,
5688 step
= "Getting nslcmop from database"
5689 db_nslcmop
= self
.db
.get_one(
5690 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5692 update_type
= db_nslcmop
["operationParams"]["updateType"]
5694 step
= "Getting nsr from database"
5695 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5696 old_operational_status
= db_nsr
["operational-status"]
5697 db_nsr_update
["operational-status"] = "updating"
5698 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5699 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5701 if update_type
== "CHANGE_VNFPKG":
5703 # Get the input parameters given through update request
5704 vnf_instance_id
= db_nslcmop
["operationParams"][
5705 "changeVnfPackageData"
5706 ].get("vnfInstanceId")
5708 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5711 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5713 step
= "Getting vnfr from database"
5714 db_vnfr
= self
.db
.get_one(
5715 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5718 step
= "Getting vnfds from database"
5720 latest_vnfd
= self
.db
.get_one(
5721 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5723 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5726 current_vnf_revision
= db_vnfr
.get("revision", 1)
5727 current_vnfd
= self
.db
.get_one(
5729 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5730 fail_on_empty
=False,
5732 # Charm artifact paths will be filled up later
5734 current_charm_artifact_path
,
5735 target_charm_artifact_path
,
5736 charm_artifact_paths
,
5739 step
= "Checking if revision has changed in VNFD"
5740 if current_vnf_revision
!= latest_vnfd_revision
:
5742 change_type
= "policy_updated"
5744 # There is new revision of VNFD, update operation is required
5745 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5746 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5748 step
= "Removing the VNFD packages if they exist in the local path"
5749 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5750 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5752 step
= "Get the VNFD packages from FSMongo"
5753 self
.fs
.sync(from_path
=latest_vnfd_path
)
5754 self
.fs
.sync(from_path
=current_vnfd_path
)
5757 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5759 base_folder
= latest_vnfd
["_admin"]["storage"]
5761 for charm_index
, charm_deployed
in enumerate(
5762 get_iterable(nsr_deployed
, "VCA")
5764 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5766 # Getting charm-id and charm-type
5767 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5768 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5769 charm_type
= charm_deployed
.get("type")
5772 ee_id
= charm_deployed
.get("ee_id")
5774 step
= "Getting descriptor config"
5775 descriptor_config
= get_configuration(
5776 current_vnfd
, current_vnfd
["id"]
5779 if "execution-environment-list" in descriptor_config
:
5780 ee_list
= descriptor_config
.get(
5781 "execution-environment-list", []
5786 # There could be several charm used in the same VNF
5787 for ee_item
in ee_list
:
5788 if ee_item
.get("juju"):
5790 step
= "Getting charm name"
5791 charm_name
= ee_item
["juju"].get("charm")
5793 step
= "Setting Charm artifact paths"
5794 current_charm_artifact_path
.append(
5795 get_charm_artifact_path(
5799 current_vnf_revision
,
5802 target_charm_artifact_path
.append(
5803 get_charm_artifact_path(
5807 latest_vnfd_revision
,
5811 charm_artifact_paths
= zip(
5812 current_charm_artifact_path
, target_charm_artifact_path
5815 step
= "Checking if software version has changed in VNFD"
5816 if find_software_version(current_vnfd
) != find_software_version(
5820 step
= "Checking if existing VNF has charm"
5821 for current_charm_path
, target_charm_path
in list(
5822 charm_artifact_paths
5824 if current_charm_path
:
5826 "Software version change is not supported as VNF instance {} has charm.".format(
5831 # There is no change in the charm package, then redeploy the VNF
5832 # based on new descriptor
5833 step
= "Redeploying VNF"
5834 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5838 ) = await self
._ns
_redeploy
_vnf
(
5845 if result
== "FAILED":
5846 nslcmop_operation_state
= result
5847 error_description_nslcmop
= detailed_status
5848 db_nslcmop_update
["detailed-status"] = detailed_status
5851 + " step {} Done with result {} {}".format(
5852 step
, nslcmop_operation_state
, detailed_status
5857 step
= "Checking if any charm package has changed or not"
5858 for current_charm_path
, target_charm_path
in list(
5859 charm_artifact_paths
5863 and target_charm_path
5864 and self
.check_charm_hash_changed(
5865 current_charm_path
, target_charm_path
5869 step
= "Checking whether VNF uses juju bundle"
5870 if check_juju_bundle_existence(current_vnfd
):
5873 "Charm upgrade is not supported for the instance which"
5874 " uses juju-bundle: {}".format(
5875 check_juju_bundle_existence(current_vnfd
)
5879 step
= "Upgrading Charm"
5883 ) = await self
._ns
_charm
_upgrade
(
5886 charm_type
=charm_type
,
5887 path
=self
.fs
.path
+ target_charm_path
,
5888 timeout
=timeout_seconds
,
5891 if result
== "FAILED":
5892 nslcmop_operation_state
= result
5893 error_description_nslcmop
= detailed_status
5895 db_nslcmop_update
["detailed-status"] = detailed_status
5898 + " step {} Done with result {} {}".format(
5899 step
, nslcmop_operation_state
, detailed_status
5903 step
= "Updating policies"
5904 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5905 result
= "COMPLETED"
5906 detailed_status
= "Done"
5907 db_nslcmop_update
["detailed-status"] = "Done"
5909 # If nslcmop_operation_state is None, so any operation is not failed.
5910 if not nslcmop_operation_state
:
5911 nslcmop_operation_state
= "COMPLETED"
5913 # If update CHANGE_VNFPKG nslcmop_operation is successful
5914 # vnf revision need to be updated
5915 vnfr_update
["revision"] = latest_vnfd_revision
5916 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5920 + " task Done with result {} {}".format(
5921 nslcmop_operation_state
, detailed_status
5924 elif update_type
== "REMOVE_VNF":
5925 # This part is included in https://osm.etsi.org/gerrit/11876
5926 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5927 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5928 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5929 step
= "Removing VNF"
5930 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5931 if result
== "FAILED":
5932 nslcmop_operation_state
= result
5933 error_description_nslcmop
= detailed_status
5934 db_nslcmop_update
["detailed-status"] = detailed_status
5935 change_type
= "vnf_terminated"
5936 if not nslcmop_operation_state
:
5937 nslcmop_operation_state
= "COMPLETED"
5940 + " task Done with result {} {}".format(
5941 nslcmop_operation_state
, detailed_status
5945 elif update_type
== "OPERATE_VNF":
5946 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5947 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5948 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5949 (result
, detailed_status
) = await self
.rebuild_start_stop(
5950 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5952 if result
== "FAILED":
5953 nslcmop_operation_state
= result
5954 error_description_nslcmop
= detailed_status
5955 db_nslcmop_update
["detailed-status"] = detailed_status
5956 if not nslcmop_operation_state
:
5957 nslcmop_operation_state
= "COMPLETED"
5960 + " task Done with result {} {}".format(
5961 nslcmop_operation_state
, detailed_status
5965 # If nslcmop_operation_state is None, so any operation is not failed.
5966 # All operations are executed in overall.
5967 if not nslcmop_operation_state
:
5968 nslcmop_operation_state
= "COMPLETED"
5969 db_nsr_update
["operational-status"] = old_operational_status
5971 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5972 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5974 except asyncio
.CancelledError
:
5976 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5978 exc
= "Operation was cancelled"
5979 except asyncio
.TimeoutError
:
5980 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5982 except Exception as e
:
5983 exc
= traceback
.format_exc()
5984 self
.logger
.critical(
5985 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5994 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5995 nslcmop_operation_state
= "FAILED"
5996 db_nsr_update
["operational-status"] = old_operational_status
5998 self
._write
_ns
_status
(
6000 ns_state
=db_nsr
["nsState"],
6001 current_operation
="IDLE",
6002 current_operation_id
=None,
6003 other_update
=db_nsr_update
,
6006 self
._write
_op
_status
(
6009 error_message
=error_description_nslcmop
,
6010 operation_state
=nslcmop_operation_state
,
6011 other_update
=db_nslcmop_update
,
6014 if nslcmop_operation_state
:
6018 "nslcmop_id": nslcmop_id
,
6019 "operationState": nslcmop_operation_state
,
6021 if change_type
in ("vnf_terminated", "policy_updated"):
6022 msg
.update({"vnf_member_index": member_vnf_index
})
6023 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6024 except Exception as e
:
6026 logging_text
+ "kafka_write notification Exception {}".format(e
)
6028 self
.logger
.debug(logging_text
+ "Exit")
6029 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6030 return nslcmop_operation_state
, detailed_status
6032 async def scale(self
, nsr_id
, nslcmop_id
):
6033 # Try to lock HA task here
6034 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6035 if not task_is_locked_by_me
:
6038 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6039 stage
= ["", "", ""]
6040 tasks_dict_info
= {}
6041 # ^ stage, step, VIM progress
6042 self
.logger
.debug(logging_text
+ "Enter")
6043 # get all needed from database
6045 db_nslcmop_update
= {}
6048 # in case of error, indicates what part of scale was failed to put nsr at error status
6049 scale_process
= None
6050 old_operational_status
= ""
6051 old_config_status
= ""
6054 # wait for any previous tasks in process
6055 step
= "Waiting for previous operations to terminate"
6056 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6057 self
._write
_ns
_status
(
6060 current_operation
="SCALING",
6061 current_operation_id
=nslcmop_id
,
6064 step
= "Getting nslcmop from database"
6066 step
+ " after having waited for previous tasks to be completed"
6068 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6070 step
= "Getting nsr from database"
6071 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6072 old_operational_status
= db_nsr
["operational-status"]
6073 old_config_status
= db_nsr
["config-status"]
6075 step
= "Parsing scaling parameters"
6076 db_nsr_update
["operational-status"] = "scaling"
6077 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6078 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6080 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6082 ]["member-vnf-index"]
6083 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6085 ]["scaling-group-descriptor"]
6086 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6087 # for backward compatibility
6088 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6089 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6090 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6091 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6093 step
= "Getting vnfr from database"
6094 db_vnfr
= self
.db
.get_one(
6095 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6098 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6100 step
= "Getting vnfd from database"
6101 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6103 base_folder
= db_vnfd
["_admin"]["storage"]
6105 step
= "Getting scaling-group-descriptor"
6106 scaling_descriptor
= find_in_list(
6107 get_scaling_aspect(db_vnfd
),
6108 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6110 if not scaling_descriptor
:
6112 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6113 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6116 step
= "Sending scale order to VIM"
6117 # TODO check if ns is in a proper status
6119 if not db_nsr
["_admin"].get("scaling-group"):
6124 "_admin.scaling-group": [
6125 {"name": scaling_group
, "nb-scale-op": 0}
6129 admin_scale_index
= 0
6131 for admin_scale_index
, admin_scale_info
in enumerate(
6132 db_nsr
["_admin"]["scaling-group"]
6134 if admin_scale_info
["name"] == scaling_group
:
6135 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6137 else: # not found, set index one plus last element and add new entry with the name
6138 admin_scale_index
+= 1
6140 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6143 vca_scaling_info
= []
6144 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6145 if scaling_type
== "SCALE_OUT":
6146 if "aspect-delta-details" not in scaling_descriptor
:
6148 "Aspect delta details not fount in scaling descriptor {}".format(
6149 scaling_descriptor
["name"]
6152 # count if max-instance-count is reached
6153 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6155 scaling_info
["scaling_direction"] = "OUT"
6156 scaling_info
["vdu-create"] = {}
6157 scaling_info
["kdu-create"] = {}
6158 for delta
in deltas
:
6159 for vdu_delta
in delta
.get("vdu-delta", {}):
6160 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6161 # vdu_index also provides the number of instance of the targeted vdu
6162 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6163 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6167 additional_params
= (
6168 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6171 cloud_init_list
= []
6173 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6174 max_instance_count
= 10
6175 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6176 max_instance_count
= vdu_profile
.get(
6177 "max-number-of-instances", 10
6180 default_instance_num
= get_number_of_instances(
6183 instances_number
= vdu_delta
.get("number-of-instances", 1)
6184 nb_scale_op
+= instances_number
6186 new_instance_count
= nb_scale_op
+ default_instance_num
6187 # Control if new count is over max and vdu count is less than max.
6188 # Then assign new instance count
6189 if new_instance_count
> max_instance_count
> vdu_count
:
6190 instances_number
= new_instance_count
- max_instance_count
6192 instances_number
= instances_number
6194 if new_instance_count
> max_instance_count
:
6196 "reached the limit of {} (max-instance-count) "
6197 "scaling-out operations for the "
6198 "scaling-group-descriptor '{}'".format(
6199 nb_scale_op
, scaling_group
6202 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6204 # TODO Information of its own ip is not available because db_vnfr is not updated.
6205 additional_params
["OSM"] = get_osm_params(
6206 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6208 cloud_init_list
.append(
6209 self
._parse
_cloud
_init
(
6216 vca_scaling_info
.append(
6218 "osm_vdu_id": vdu_delta
["id"],
6219 "member-vnf-index": vnf_index
,
6221 "vdu_index": vdu_index
+ x
,
6224 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6225 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6226 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6227 kdu_name
= kdu_profile
["kdu-name"]
6228 resource_name
= kdu_profile
.get("resource-name", "")
6230 # Might have different kdus in the same delta
6231 # Should have list for each kdu
6232 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6233 scaling_info
["kdu-create"][kdu_name
] = []
6235 kdur
= get_kdur(db_vnfr
, kdu_name
)
6236 if kdur
.get("helm-chart"):
6237 k8s_cluster_type
= "helm-chart-v3"
6238 self
.logger
.debug("kdur: {}".format(kdur
))
6240 kdur
.get("helm-version")
6241 and kdur
.get("helm-version") == "v2"
6243 k8s_cluster_type
= "helm-chart"
6244 elif kdur
.get("juju-bundle"):
6245 k8s_cluster_type
= "juju-bundle"
6248 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6249 "juju-bundle. Maybe an old NBI version is running".format(
6250 db_vnfr
["member-vnf-index-ref"], kdu_name
6254 max_instance_count
= 10
6255 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6256 max_instance_count
= kdu_profile
.get(
6257 "max-number-of-instances", 10
6260 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6261 deployed_kdu
, _
= get_deployed_kdu(
6262 nsr_deployed
, kdu_name
, vnf_index
6264 if deployed_kdu
is None:
6266 "KDU '{}' for vnf '{}' not deployed".format(
6270 kdu_instance
= deployed_kdu
.get("kdu-instance")
6271 instance_num
= await self
.k8scluster_map
[
6277 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6278 kdu_model
=deployed_kdu
.get("kdu-model"),
6280 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6281 "number-of-instances", 1
6284 # Control if new count is over max and instance_num is less than max.
6285 # Then assign max instance number to kdu replica count
6286 if kdu_replica_count
> max_instance_count
> instance_num
:
6287 kdu_replica_count
= max_instance_count
6288 if kdu_replica_count
> max_instance_count
:
6290 "reached the limit of {} (max-instance-count) "
6291 "scaling-out operations for the "
6292 "scaling-group-descriptor '{}'".format(
6293 instance_num
, scaling_group
6297 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6298 vca_scaling_info
.append(
6300 "osm_kdu_id": kdu_name
,
6301 "member-vnf-index": vnf_index
,
6303 "kdu_index": instance_num
+ x
- 1,
6306 scaling_info
["kdu-create"][kdu_name
].append(
6308 "member-vnf-index": vnf_index
,
6310 "k8s-cluster-type": k8s_cluster_type
,
6311 "resource-name": resource_name
,
6312 "scale": kdu_replica_count
,
6315 elif scaling_type
== "SCALE_IN":
6316 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6318 scaling_info
["scaling_direction"] = "IN"
6319 scaling_info
["vdu-delete"] = {}
6320 scaling_info
["kdu-delete"] = {}
6322 for delta
in deltas
:
6323 for vdu_delta
in delta
.get("vdu-delta", {}):
6324 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6325 min_instance_count
= 0
6326 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6327 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6328 min_instance_count
= vdu_profile
["min-number-of-instances"]
6330 default_instance_num
= get_number_of_instances(
6331 db_vnfd
, vdu_delta
["id"]
6333 instance_num
= vdu_delta
.get("number-of-instances", 1)
6334 nb_scale_op
-= instance_num
6336 new_instance_count
= nb_scale_op
+ default_instance_num
6338 if new_instance_count
< min_instance_count
< vdu_count
:
6339 instances_number
= min_instance_count
- new_instance_count
6341 instances_number
= instance_num
6343 if new_instance_count
< min_instance_count
:
6345 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6346 "scaling-group-descriptor '{}'".format(
6347 nb_scale_op
, scaling_group
6350 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6351 vca_scaling_info
.append(
6353 "osm_vdu_id": vdu_delta
["id"],
6354 "member-vnf-index": vnf_index
,
6356 "vdu_index": vdu_index
- 1 - x
,
6359 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6360 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6361 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6362 kdu_name
= kdu_profile
["kdu-name"]
6363 resource_name
= kdu_profile
.get("resource-name", "")
6365 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6366 scaling_info
["kdu-delete"][kdu_name
] = []
6368 kdur
= get_kdur(db_vnfr
, kdu_name
)
6369 if kdur
.get("helm-chart"):
6370 k8s_cluster_type
= "helm-chart-v3"
6371 self
.logger
.debug("kdur: {}".format(kdur
))
6373 kdur
.get("helm-version")
6374 and kdur
.get("helm-version") == "v2"
6376 k8s_cluster_type
= "helm-chart"
6377 elif kdur
.get("juju-bundle"):
6378 k8s_cluster_type
= "juju-bundle"
6381 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6382 "juju-bundle. Maybe an old NBI version is running".format(
6383 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6387 min_instance_count
= 0
6388 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6389 min_instance_count
= kdu_profile
["min-number-of-instances"]
6391 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6392 deployed_kdu
, _
= get_deployed_kdu(
6393 nsr_deployed
, kdu_name
, vnf_index
6395 if deployed_kdu
is None:
6397 "KDU '{}' for vnf '{}' not deployed".format(
6401 kdu_instance
= deployed_kdu
.get("kdu-instance")
6402 instance_num
= await self
.k8scluster_map
[
6408 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6409 kdu_model
=deployed_kdu
.get("kdu-model"),
6411 kdu_replica_count
= instance_num
- kdu_delta
.get(
6412 "number-of-instances", 1
6415 if kdu_replica_count
< min_instance_count
< instance_num
:
6416 kdu_replica_count
= min_instance_count
6417 if kdu_replica_count
< min_instance_count
:
6419 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6420 "scaling-group-descriptor '{}'".format(
6421 instance_num
, scaling_group
6425 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6426 vca_scaling_info
.append(
6428 "osm_kdu_id": kdu_name
,
6429 "member-vnf-index": vnf_index
,
6431 "kdu_index": instance_num
- x
- 1,
6434 scaling_info
["kdu-delete"][kdu_name
].append(
6436 "member-vnf-index": vnf_index
,
6438 "k8s-cluster-type": k8s_cluster_type
,
6439 "resource-name": resource_name
,
6440 "scale": kdu_replica_count
,
6444 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6445 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6446 if scaling_info
["scaling_direction"] == "IN":
6447 for vdur
in reversed(db_vnfr
["vdur"]):
6448 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6449 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6450 scaling_info
["vdu"].append(
6452 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6453 "vdu_id": vdur
["vdu-id-ref"],
6457 for interface
in vdur
["interfaces"]:
6458 scaling_info
["vdu"][-1]["interface"].append(
6460 "name": interface
["name"],
6461 "ip_address": interface
["ip-address"],
6462 "mac_address": interface
.get("mac-address"),
6465 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6468 step
= "Executing pre-scale vnf-config-primitive"
6469 if scaling_descriptor
.get("scaling-config-action"):
6470 for scaling_config_action
in scaling_descriptor
[
6471 "scaling-config-action"
6474 scaling_config_action
.get("trigger") == "pre-scale-in"
6475 and scaling_type
== "SCALE_IN"
6477 scaling_config_action
.get("trigger") == "pre-scale-out"
6478 and scaling_type
== "SCALE_OUT"
6480 vnf_config_primitive
= scaling_config_action
[
6481 "vnf-config-primitive-name-ref"
6483 step
= db_nslcmop_update
[
6485 ] = "executing pre-scale scaling-config-action '{}'".format(
6486 vnf_config_primitive
6489 # look for primitive
6490 for config_primitive
in (
6491 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6492 ).get("config-primitive", ()):
6493 if config_primitive
["name"] == vnf_config_primitive
:
6497 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6498 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6499 "primitive".format(scaling_group
, vnf_config_primitive
)
6502 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6503 if db_vnfr
.get("additionalParamsForVnf"):
6504 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6506 scale_process
= "VCA"
6507 db_nsr_update
["config-status"] = "configuring pre-scaling"
6508 primitive_params
= self
._map
_primitive
_params
(
6509 config_primitive
, {}, vnfr_params
6512 # Pre-scale retry check: Check if this sub-operation has been executed before
6513 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6516 vnf_config_primitive
,
6520 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6521 # Skip sub-operation
6522 result
= "COMPLETED"
6523 result_detail
= "Done"
6526 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6527 vnf_config_primitive
, result
, result_detail
6531 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6532 # New sub-operation: Get index of this sub-operation
6534 len(db_nslcmop
.get("_admin", {}).get("operations"))
6539 + "vnf_config_primitive={} New sub-operation".format(
6540 vnf_config_primitive
6544 # retry: Get registered params for this existing sub-operation
6545 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6548 vnf_index
= op
.get("member_vnf_index")
6549 vnf_config_primitive
= op
.get("primitive")
6550 primitive_params
= op
.get("primitive_params")
6553 + "vnf_config_primitive={} Sub-operation retry".format(
6554 vnf_config_primitive
6557 # Execute the primitive, either with new (first-time) or registered (reintent) args
6558 ee_descriptor_id
= config_primitive
.get(
6559 "execution-environment-ref"
6561 primitive_name
= config_primitive
.get(
6562 "execution-environment-primitive", vnf_config_primitive
6564 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6565 nsr_deployed
["VCA"],
6566 member_vnf_index
=vnf_index
,
6568 vdu_count_index
=None,
6569 ee_descriptor_id
=ee_descriptor_id
,
6571 result
, result_detail
= await self
._ns
_execute
_primitive
(
6580 + "vnf_config_primitive={} Done with result {} {}".format(
6581 vnf_config_primitive
, result
, result_detail
6584 # Update operationState = COMPLETED | FAILED
6585 self
._update
_suboperation
_status
(
6586 db_nslcmop
, op_index
, result
, result_detail
6589 if result
== "FAILED":
6590 raise LcmException(result_detail
)
6591 db_nsr_update
["config-status"] = old_config_status
6592 scale_process
= None
6596 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6599 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6602 # SCALE-IN VCA - BEGIN
6603 if vca_scaling_info
:
6604 step
= db_nslcmop_update
[
6606 ] = "Deleting the execution environments"
6607 scale_process
= "VCA"
6608 for vca_info
in vca_scaling_info
:
6609 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6610 member_vnf_index
= str(vca_info
["member-vnf-index"])
6612 logging_text
+ "vdu info: {}".format(vca_info
)
6614 if vca_info
.get("osm_vdu_id"):
6615 vdu_id
= vca_info
["osm_vdu_id"]
6616 vdu_index
= int(vca_info
["vdu_index"])
6619 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6620 member_vnf_index
, vdu_id
, vdu_index
6622 stage
[2] = step
= "Scaling in VCA"
6623 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6624 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6625 config_update
= db_nsr
["configurationStatus"]
6626 for vca_index
, vca
in enumerate(vca_update
):
6628 (vca
or vca
.get("ee_id"))
6629 and vca
["member-vnf-index"] == member_vnf_index
6630 and vca
["vdu_count_index"] == vdu_index
6632 if vca
.get("vdu_id"):
6633 config_descriptor
= get_configuration(
6634 db_vnfd
, vca
.get("vdu_id")
6636 elif vca
.get("kdu_name"):
6637 config_descriptor
= get_configuration(
6638 db_vnfd
, vca
.get("kdu_name")
6641 config_descriptor
= get_configuration(
6642 db_vnfd
, db_vnfd
["id"]
6644 operation_params
= (
6645 db_nslcmop
.get("operationParams") or {}
6647 exec_terminate_primitives
= not operation_params
.get(
6648 "skip_terminate_primitives"
6649 ) and vca
.get("needed_terminate")
6650 task
= asyncio
.ensure_future(
6659 exec_primitives
=exec_terminate_primitives
,
6663 timeout
=self
.timeout_charm_delete
,
6666 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6669 del vca_update
[vca_index
]
6670 del config_update
[vca_index
]
6671 # wait for pending tasks of terminate primitives
6675 + "Waiting for tasks {}".format(
6676 list(tasks_dict_info
.keys())
6679 error_list
= await self
._wait
_for
_tasks
(
6683 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6688 tasks_dict_info
.clear()
6690 raise LcmException("; ".join(error_list
))
6692 db_vca_and_config_update
= {
6693 "_admin.deployed.VCA": vca_update
,
6694 "configurationStatus": config_update
,
6697 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6699 scale_process
= None
6700 # SCALE-IN VCA - END
6703 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6704 scale_process
= "RO"
6705 if self
.ro_config
.get("ng"):
6706 await self
._scale
_ng
_ro
(
6707 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6709 scaling_info
.pop("vdu-create", None)
6710 scaling_info
.pop("vdu-delete", None)
6712 scale_process
= None
6716 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6717 scale_process
= "KDU"
6718 await self
._scale
_kdu
(
6719 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6721 scaling_info
.pop("kdu-create", None)
6722 scaling_info
.pop("kdu-delete", None)
6724 scale_process
= None
6728 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6730 # SCALE-UP VCA - BEGIN
6731 if vca_scaling_info
:
6732 step
= db_nslcmop_update
[
6734 ] = "Creating new execution environments"
6735 scale_process
= "VCA"
6736 for vca_info
in vca_scaling_info
:
6737 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6738 member_vnf_index
= str(vca_info
["member-vnf-index"])
6740 logging_text
+ "vdu info: {}".format(vca_info
)
6742 vnfd_id
= db_vnfr
["vnfd-ref"]
6743 if vca_info
.get("osm_vdu_id"):
6744 vdu_index
= int(vca_info
["vdu_index"])
6745 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6746 if db_vnfr
.get("additionalParamsForVnf"):
6747 deploy_params
.update(
6749 db_vnfr
["additionalParamsForVnf"].copy()
6752 descriptor_config
= get_configuration(
6753 db_vnfd
, db_vnfd
["id"]
6755 if descriptor_config
:
6760 logging_text
=logging_text
6761 + "member_vnf_index={} ".format(member_vnf_index
),
6764 nslcmop_id
=nslcmop_id
,
6770 member_vnf_index
=member_vnf_index
,
6771 vdu_index
=vdu_index
,
6773 deploy_params
=deploy_params
,
6774 descriptor_config
=descriptor_config
,
6775 base_folder
=base_folder
,
6776 task_instantiation_info
=tasks_dict_info
,
6779 vdu_id
= vca_info
["osm_vdu_id"]
6780 vdur
= find_in_list(
6781 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6783 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6784 if vdur
.get("additionalParams"):
6785 deploy_params_vdu
= parse_yaml_strings(
6786 vdur
["additionalParams"]
6789 deploy_params_vdu
= deploy_params
6790 deploy_params_vdu
["OSM"] = get_osm_params(
6791 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6793 if descriptor_config
:
6798 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6799 member_vnf_index
, vdu_id
, vdu_index
6801 stage
[2] = step
= "Scaling out VCA"
6802 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6804 logging_text
=logging_text
6805 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6806 member_vnf_index
, vdu_id
, vdu_index
6810 nslcmop_id
=nslcmop_id
,
6816 member_vnf_index
=member_vnf_index
,
6817 vdu_index
=vdu_index
,
6819 deploy_params
=deploy_params_vdu
,
6820 descriptor_config
=descriptor_config
,
6821 base_folder
=base_folder
,
6822 task_instantiation_info
=tasks_dict_info
,
6825 # SCALE-UP VCA - END
6826 scale_process
= None
6829 # execute primitive service POST-SCALING
6830 step
= "Executing post-scale vnf-config-primitive"
6831 if scaling_descriptor
.get("scaling-config-action"):
6832 for scaling_config_action
in scaling_descriptor
[
6833 "scaling-config-action"
6836 scaling_config_action
.get("trigger") == "post-scale-in"
6837 and scaling_type
== "SCALE_IN"
6839 scaling_config_action
.get("trigger") == "post-scale-out"
6840 and scaling_type
== "SCALE_OUT"
6842 vnf_config_primitive
= scaling_config_action
[
6843 "vnf-config-primitive-name-ref"
6845 step
= db_nslcmop_update
[
6847 ] = "executing post-scale scaling-config-action '{}'".format(
6848 vnf_config_primitive
6851 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6852 if db_vnfr
.get("additionalParamsForVnf"):
6853 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6855 # look for primitive
6856 for config_primitive
in (
6857 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6858 ).get("config-primitive", ()):
6859 if config_primitive
["name"] == vnf_config_primitive
:
6863 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6864 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6865 "config-primitive".format(
6866 scaling_group
, vnf_config_primitive
6869 scale_process
= "VCA"
6870 db_nsr_update
["config-status"] = "configuring post-scaling"
6871 primitive_params
= self
._map
_primitive
_params
(
6872 config_primitive
, {}, vnfr_params
6875 # Post-scale retry check: Check if this sub-operation has been executed before
6876 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6879 vnf_config_primitive
,
6883 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6884 # Skip sub-operation
6885 result
= "COMPLETED"
6886 result_detail
= "Done"
6889 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6890 vnf_config_primitive
, result
, result_detail
6894 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6895 # New sub-operation: Get index of this sub-operation
6897 len(db_nslcmop
.get("_admin", {}).get("operations"))
6902 + "vnf_config_primitive={} New sub-operation".format(
6903 vnf_config_primitive
6907 # retry: Get registered params for this existing sub-operation
6908 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6911 vnf_index
= op
.get("member_vnf_index")
6912 vnf_config_primitive
= op
.get("primitive")
6913 primitive_params
= op
.get("primitive_params")
6916 + "vnf_config_primitive={} Sub-operation retry".format(
6917 vnf_config_primitive
6920 # Execute the primitive, either with new (first-time) or registered (reintent) args
6921 ee_descriptor_id
= config_primitive
.get(
6922 "execution-environment-ref"
6924 primitive_name
= config_primitive
.get(
6925 "execution-environment-primitive", vnf_config_primitive
6927 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6928 nsr_deployed
["VCA"],
6929 member_vnf_index
=vnf_index
,
6931 vdu_count_index
=None,
6932 ee_descriptor_id
=ee_descriptor_id
,
6934 result
, result_detail
= await self
._ns
_execute
_primitive
(
6943 + "vnf_config_primitive={} Done with result {} {}".format(
6944 vnf_config_primitive
, result
, result_detail
6947 # Update operationState = COMPLETED | FAILED
6948 self
._update
_suboperation
_status
(
6949 db_nslcmop
, op_index
, result
, result_detail
6952 if result
== "FAILED":
6953 raise LcmException(result_detail
)
6954 db_nsr_update
["config-status"] = old_config_status
6955 scale_process
= None
6960 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6961 db_nsr_update
["operational-status"] = (
6963 if old_operational_status
== "failed"
6964 else old_operational_status
6966 db_nsr_update
["config-status"] = old_config_status
6969 ROclient
.ROClientException
,
6974 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6976 except asyncio
.CancelledError
:
6978 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6980 exc
= "Operation was cancelled"
6981 except Exception as e
:
6982 exc
= traceback
.format_exc()
6983 self
.logger
.critical(
6984 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6988 self
._write
_ns
_status
(
6991 current_operation
="IDLE",
6992 current_operation_id
=None,
6995 stage
[1] = "Waiting for instantiate pending tasks."
6996 self
.logger
.debug(logging_text
+ stage
[1])
6997 exc
= await self
._wait
_for
_tasks
(
7000 self
.timeout_ns_deploy
,
7008 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7009 nslcmop_operation_state
= "FAILED"
7011 db_nsr_update
["operational-status"] = old_operational_status
7012 db_nsr_update
["config-status"] = old_config_status
7013 db_nsr_update
["detailed-status"] = ""
7015 if "VCA" in scale_process
:
7016 db_nsr_update
["config-status"] = "failed"
7017 if "RO" in scale_process
:
7018 db_nsr_update
["operational-status"] = "failed"
7021 ] = "FAILED scaling nslcmop={} {}: {}".format(
7022 nslcmop_id
, step
, exc
7025 error_description_nslcmop
= None
7026 nslcmop_operation_state
= "COMPLETED"
7027 db_nslcmop_update
["detailed-status"] = "Done"
7029 self
._write
_op
_status
(
7032 error_message
=error_description_nslcmop
,
7033 operation_state
=nslcmop_operation_state
,
7034 other_update
=db_nslcmop_update
,
7037 self
._write
_ns
_status
(
7040 current_operation
="IDLE",
7041 current_operation_id
=None,
7042 other_update
=db_nsr_update
,
7045 if nslcmop_operation_state
:
7049 "nslcmop_id": nslcmop_id
,
7050 "operationState": nslcmop_operation_state
,
7052 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7053 except Exception as e
:
7055 logging_text
+ "kafka_write notification Exception {}".format(e
)
7057 self
.logger
.debug(logging_text
+ "Exit")
7058 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7060 async def _scale_kdu(
7061 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7063 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7064 for kdu_name
in _scaling_info
:
7065 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7066 deployed_kdu
, index
= get_deployed_kdu(
7067 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7069 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7070 kdu_instance
= deployed_kdu
["kdu-instance"]
7071 kdu_model
= deployed_kdu
.get("kdu-model")
7072 scale
= int(kdu_scaling_info
["scale"])
7073 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7076 "collection": "nsrs",
7077 "filter": {"_id": nsr_id
},
7078 "path": "_admin.deployed.K8s.{}".format(index
),
7081 step
= "scaling application {}".format(
7082 kdu_scaling_info
["resource-name"]
7084 self
.logger
.debug(logging_text
+ step
)
7086 if kdu_scaling_info
["type"] == "delete":
7087 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7090 and kdu_config
.get("terminate-config-primitive")
7091 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7093 terminate_config_primitive_list
= kdu_config
.get(
7094 "terminate-config-primitive"
7096 terminate_config_primitive_list
.sort(
7097 key
=lambda val
: int(val
["seq"])
7101 terminate_config_primitive
7102 ) in terminate_config_primitive_list
:
7103 primitive_params_
= self
._map
_primitive
_params
(
7104 terminate_config_primitive
, {}, {}
7106 step
= "execute terminate config primitive"
7107 self
.logger
.debug(logging_text
+ step
)
7108 await asyncio
.wait_for(
7109 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7110 cluster_uuid
=cluster_uuid
,
7111 kdu_instance
=kdu_instance
,
7112 primitive_name
=terminate_config_primitive
["name"],
7113 params
=primitive_params_
,
7120 await asyncio
.wait_for(
7121 self
.k8scluster_map
[k8s_cluster_type
].scale(
7124 kdu_scaling_info
["resource-name"],
7126 cluster_uuid
=cluster_uuid
,
7127 kdu_model
=kdu_model
,
7131 timeout
=self
.timeout_vca_on_error
,
7134 if kdu_scaling_info
["type"] == "create":
7135 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7138 and kdu_config
.get("initial-config-primitive")
7139 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7141 initial_config_primitive_list
= kdu_config
.get(
7142 "initial-config-primitive"
7144 initial_config_primitive_list
.sort(
7145 key
=lambda val
: int(val
["seq"])
7148 for initial_config_primitive
in initial_config_primitive_list
:
7149 primitive_params_
= self
._map
_primitive
_params
(
7150 initial_config_primitive
, {}, {}
7152 step
= "execute initial config primitive"
7153 self
.logger
.debug(logging_text
+ step
)
7154 await asyncio
.wait_for(
7155 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7156 cluster_uuid
=cluster_uuid
,
7157 kdu_instance
=kdu_instance
,
7158 primitive_name
=initial_config_primitive
["name"],
7159 params
=primitive_params_
,
7166 async def _scale_ng_ro(
7167 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7169 nsr_id
= db_nslcmop
["nsInstanceId"]
7170 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7173 # read from db: vnfd's for every vnf
7176 # for each vnf in ns, read vnfd
7177 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7178 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7179 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7180 # if we haven't this vnfd, read it from db
7181 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7183 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7184 db_vnfds
.append(vnfd
)
7185 n2vc_key
= self
.n2vc
.get_public_key()
7186 n2vc_key_list
= [n2vc_key
]
7189 vdu_scaling_info
.get("vdu-create"),
7190 vdu_scaling_info
.get("vdu-delete"),
7193 # db_vnfr has been updated, update db_vnfrs to use it
7194 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7195 await self
._instantiate
_ng
_ro
(
7205 start_deploy
=time(),
7206 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7208 if vdu_scaling_info
.get("vdu-delete"):
7210 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7213 async def extract_prometheus_scrape_jobs(
7214 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7216 # look if exist a file called 'prometheus*.j2' and
7217 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7221 for f
in artifact_content
7222 if f
.startswith("prometheus") and f
.endswith(".j2")
7228 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7232 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7233 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7235 vnfr_id
= vnfr_id
.replace("-", "")
7237 "JOB_NAME": vnfr_id
,
7238 "TARGET_IP": target_ip
,
7239 "EXPORTER_POD_IP": host_name
,
7240 "EXPORTER_POD_PORT": host_port
,
7242 job_list
= parse_job(job_data
, variables
)
7243 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7244 for job
in job_list
:
7246 not isinstance(job
.get("job_name"), str)
7247 or vnfr_id
not in job
["job_name"]
7249 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7250 job
["nsr_id"] = nsr_id
7251 job
["vnfr_id"] = vnfr_id
7254 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7255 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7256 self
.logger
.info(logging_text
+ "Enter")
7257 stage
= ["Preparing the environment", ""]
7258 # database nsrs record
7262 # in case of error, indicates what part of scale was failed to put nsr at error status
7263 start_deploy
= time()
7265 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7266 vim_account_id
= db_vnfr
.get("vim-account-id")
7267 vim_info_key
= "vim:" + vim_account_id
7268 vdur
= find_in_list(
7269 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7272 vdu_vim_name
= vdur
["name"]
7273 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7274 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7275 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7276 # wait for any previous tasks in process
7277 stage
[1] = "Waiting for previous operations to terminate"
7278 self
.logger
.info(stage
[1])
7279 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7281 stage
[1] = "Reading from database."
7282 self
.logger
.info(stage
[1])
7283 self
._write
_ns
_status
(
7286 current_operation
=operation_type
.upper(),
7287 current_operation_id
=nslcmop_id
7289 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7292 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7293 db_nsr_update
["operational-status"] = operation_type
7294 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7298 "vim_vm_id": vim_vm_id
,
7300 "vdu_index": additional_param
["count-index"],
7301 "vdu_id": vdur
["id"],
7302 "target_vim": target_vim
,
7303 "vim_account_id": vim_account_id
7306 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7307 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7308 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7309 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7310 self
.logger
.info("response from RO: {}".format(result_dict
))
7311 action_id
= result_dict
["action_id"]
7312 await self
._wait
_ng
_ro
(
7313 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7315 return "COMPLETED", "Done"
7316 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7317 self
.logger
.error("Exit Exception {}".format(e
))
7319 except asyncio
.CancelledError
:
7320 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7321 exc
= "Operation was cancelled"
7322 except Exception as e
:
7323 exc
= traceback
.format_exc()
7324 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7325 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> "tuple[str, str]":
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account.

    :param: vim_account_id: VIM Account ID
    :return: (cloud_name, cloud_credential)
    """
    # NOTE: the original annotation was `-> (str, str)`, which is a tuple of
    # type objects, not a valid PEP 484 type; the string form "tuple[str, str]"
    # is understood by type checkers and needs no extra import at runtime.
    # A missing "config" key (or missing entries inside it) degrades
    # gracefully: both elements come back as None via dict.get().
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> "tuple[str, str]":
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account.

    :param: vim_account_id: VIM Account ID
    :return: (cloud_name, cloud_credential)
    """
    # NOTE: the original annotation was `-> (str, str)`, which is a tuple of
    # type objects, not a valid PEP 484 type; the string form "tuple[str, str]"
    # is understood by type checkers and needs no extra import at runtime.
    # Missing "config" (or missing keys inside it) yields (None, None).
    config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7349 async def migrate(self
, nsr_id
, nslcmop_id
):
7351 Migrate VNFs and VDUs instances in a NS
7353 :param: nsr_id: NS Instance ID
7354 :param: nslcmop_id: nslcmop ID of migrate
7357 # Try to lock HA task here
7358 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7359 if not task_is_locked_by_me
:
7361 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7362 self
.logger
.debug(logging_text
+ "Enter")
7363 # get all needed from database
7365 db_nslcmop_update
= {}
7366 nslcmop_operation_state
= None
7370 # in case of error, indicates what part of scale was failed to put nsr at error status
7371 start_deploy
= time()
7374 # wait for any previous tasks in process
7375 step
= "Waiting for previous operations to terminate"
7376 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7378 self
._write
_ns
_status
(
7381 current_operation
="MIGRATING",
7382 current_operation_id
=nslcmop_id
,
7384 step
= "Getting nslcmop from database"
7386 step
+ " after having waited for previous tasks to be completed"
7388 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7389 migrate_params
= db_nslcmop
.get("operationParams")
7392 target
.update(migrate_params
)
7393 desc
= await self
.RO
.migrate(nsr_id
, target
)
7394 self
.logger
.debug("RO return > {}".format(desc
))
7395 action_id
= desc
["action_id"]
7396 await self
._wait
_ng
_ro
(
7397 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7400 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7401 self
.logger
.error("Exit Exception {}".format(e
))
7403 except asyncio
.CancelledError
:
7404 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7405 exc
= "Operation was cancelled"
7406 except Exception as e
:
7407 exc
= traceback
.format_exc()
7408 self
.logger
.critical(
7409 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7412 self
._write
_ns
_status
(
7415 current_operation
="IDLE",
7416 current_operation_id
=None,
7419 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7420 nslcmop_operation_state
= "FAILED"
7422 nslcmop_operation_state
= "COMPLETED"
7423 db_nslcmop_update
["detailed-status"] = "Done"
7424 db_nsr_update
["detailed-status"] = "Done"
7426 self
._write
_op
_status
(
7430 operation_state
=nslcmop_operation_state
,
7431 other_update
=db_nslcmop_update
,
7433 if nslcmop_operation_state
:
7437 "nslcmop_id": nslcmop_id
,
7438 "operationState": nslcmop_operation_state
,
7440 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7441 except Exception as e
:
7443 logging_text
+ "kafka_write notification Exception {}".format(e
)
7445 self
.logger
.debug(logging_text
+ "Exit")
7446 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7449 async def heal(self
, nsr_id
, nslcmop_id
):
7453 :param nsr_id: ns instance to heal
7454 :param nslcmop_id: operation to run
7458 # Try to lock HA task here
7459 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7460 if not task_is_locked_by_me
:
7463 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7464 stage
= ["", "", ""]
7465 tasks_dict_info
= {}
7466 # ^ stage, step, VIM progress
7467 self
.logger
.debug(logging_text
+ "Enter")
7468 # get all needed from database
7470 db_nslcmop_update
= {}
7472 db_vnfrs
= {} # vnf's info indexed by _id
7474 old_operational_status
= ""
7475 old_config_status
= ""
7478 # wait for any previous tasks in process
7479 step
= "Waiting for previous operations to terminate"
7480 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7481 self
._write
_ns
_status
(
7484 current_operation
="HEALING",
7485 current_operation_id
=nslcmop_id
,
7488 step
= "Getting nslcmop from database"
7490 step
+ " after having waited for previous tasks to be completed"
7492 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7494 step
= "Getting nsr from database"
7495 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7496 old_operational_status
= db_nsr
["operational-status"]
7497 old_config_status
= db_nsr
["config-status"]
7500 "_admin.deployed.RO.operational-status": "healing",
7502 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7504 step
= "Sending heal order to VIM"
7505 task_ro
= asyncio
.ensure_future(
7507 logging_text
=logging_text
,
7509 db_nslcmop
=db_nslcmop
,
7513 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7514 tasks_dict_info
[task_ro
] = "Healing at VIM"
7518 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7519 self
.logger
.debug(logging_text
+ stage
[1])
7520 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7521 self
.fs
.sync(db_nsr
["nsd-id"])
7523 # read from db: vnfr's of this ns
7524 step
= "Getting vnfrs from db"
7525 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7526 for vnfr
in db_vnfrs_list
:
7527 db_vnfrs
[vnfr
["_id"]] = vnfr
7528 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7530 # Check for each target VNF
7531 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7532 for target_vnf
in target_list
:
7533 # Find this VNF in the list from DB
7534 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7536 db_vnfr
= db_vnfrs
[vnfr_id
]
7537 vnfd_id
= db_vnfr
.get("vnfd-id")
7538 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7539 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7540 base_folder
= vnfd
["_admin"]["storage"]
7545 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7546 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7548 # Check each target VDU and deploy N2VC
7549 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7550 deploy_params_vdu
= target_vdu
7551 # Set run-day1 vnf level value if not vdu level value exists
7552 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7553 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7554 vdu_name
= target_vdu
.get("vdu-id", None)
7555 # TODO: Get vdu_id from vdud.
7557 # For multi instance VDU count-index is mandatory
7558 # For single session VDU count-indes is 0
7559 vdu_index
= target_vdu
.get("count-index",0)
7561 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7562 stage
[1] = "Deploying Execution Environments."
7563 self
.logger
.debug(logging_text
+ stage
[1])
7565 # VNF Level charm. Normal case when proxy charms.
7566 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7567 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7568 if descriptor_config
:
7569 # Continue if healed machine is management machine
7570 vnf_ip_address
= db_vnfr
.get("ip-address")
7571 target_instance
= None
7572 for instance
in db_vnfr
.get("vdur", None):
7573 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7574 target_instance
= instance
7576 if vnf_ip_address
== target_instance
.get("ip-address"):
7578 logging_text
=logging_text
7579 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7580 member_vnf_index
, vdu_name
, vdu_index
7584 nslcmop_id
=nslcmop_id
,
7590 member_vnf_index
=member_vnf_index
,
7593 deploy_params
=deploy_params_vdu
,
7594 descriptor_config
=descriptor_config
,
7595 base_folder
=base_folder
,
7596 task_instantiation_info
=tasks_dict_info
,
7600 # VDU Level charm. Normal case with native charms.
7601 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7602 if descriptor_config
:
7604 logging_text
=logging_text
7605 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7606 member_vnf_index
, vdu_name
, vdu_index
7610 nslcmop_id
=nslcmop_id
,
7616 member_vnf_index
=member_vnf_index
,
7617 vdu_index
=vdu_index
,
7619 deploy_params
=deploy_params_vdu
,
7620 descriptor_config
=descriptor_config
,
7621 base_folder
=base_folder
,
7622 task_instantiation_info
=tasks_dict_info
,
7627 ROclient
.ROClientException
,
7632 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7634 except asyncio
.CancelledError
:
7636 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7638 exc
= "Operation was cancelled"
7639 except Exception as e
:
7640 exc
= traceback
.format_exc()
7641 self
.logger
.critical(
7642 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7647 stage
[1] = "Waiting for healing pending tasks."
7648 self
.logger
.debug(logging_text
+ stage
[1])
7649 exc
= await self
._wait
_for
_tasks
(
7652 self
.timeout_ns_deploy
,
7660 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7661 nslcmop_operation_state
= "FAILED"
7663 db_nsr_update
["operational-status"] = old_operational_status
7664 db_nsr_update
["config-status"] = old_config_status
7667 ] = "FAILED healing nslcmop={} {}: {}".format(
7668 nslcmop_id
, step
, exc
7670 for task
, task_name
in tasks_dict_info
.items():
7671 if not task
.done() or task
.cancelled() or task
.exception():
7672 if task_name
.startswith(self
.task_name_deploy_vca
):
7673 # A N2VC task is pending
7674 db_nsr_update
["config-status"] = "failed"
7676 # RO task is pending
7677 db_nsr_update
["operational-status"] = "failed"
7679 error_description_nslcmop
= None
7680 nslcmop_operation_state
= "COMPLETED"
7681 db_nslcmop_update
["detailed-status"] = "Done"
7682 db_nsr_update
["detailed-status"] = "Done"
7683 db_nsr_update
["operational-status"] = "running"
7684 db_nsr_update
["config-status"] = "configured"
7686 self
._write
_op
_status
(
7689 error_message
=error_description_nslcmop
,
7690 operation_state
=nslcmop_operation_state
,
7691 other_update
=db_nslcmop_update
,
7694 self
._write
_ns
_status
(
7697 current_operation
="IDLE",
7698 current_operation_id
=None,
7699 other_update
=db_nsr_update
,
7702 if nslcmop_operation_state
:
7706 "nslcmop_id": nslcmop_id
,
7707 "operationState": nslcmop_operation_state
,
7709 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7710 except Exception as e
:
7712 logging_text
+ "kafka_write notification Exception {}".format(e
)
7714 self
.logger
.debug(logging_text
+ "Exit")
7715 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7726 :param logging_text: preffix text to use at logging
7727 :param nsr_id: nsr identity
7728 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7729 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7730 :return: None or exception
7732 def get_vim_account(vim_account_id
):
7734 if vim_account_id
in db_vims
:
7735 return db_vims
[vim_account_id
]
7736 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7737 db_vims
[vim_account_id
] = db_vim
7742 ns_params
= db_nslcmop
.get("operationParams")
7743 if ns_params
and ns_params
.get("timeout_ns_heal"):
7744 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7746 timeout_ns_heal
= self
.timeout
.get(
7747 "ns_heal", self
.timeout_ns_heal
7752 nslcmop_id
= db_nslcmop
["_id"]
7754 "action_id": nslcmop_id
,
7756 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7757 target
.update(db_nslcmop
.get("operationParams", {}))
7759 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7760 desc
= await self
.RO
.recreate(nsr_id
, target
)
7761 self
.logger
.debug("RO return > {}".format(desc
))
7762 action_id
= desc
["action_id"]
7763 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7764 await self
._wait
_ng
_ro
(
7765 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7771 "_admin.deployed.RO.operational-status": "running",
7772 "detailed-status": " ".join(stage
),
7774 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7775 self
._write
_op
_status
(nslcmop_id
, stage
)
7777 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7780 except Exception as e
:
7781 stage
[2] = "ERROR healing at VIM"
7782 #self.set_vnfr_at_error(db_vnfrs, str(e))
7784 "Error healing at VIM {}".format(e
),
7785 exc_info
=not isinstance(
7788 ROclient
.ROClientException
,
7814 task_instantiation_info
,
7817 # launch instantiate_N2VC in a asyncio task and register task object
7818 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7819 # if not found, create one entry and update database
7820 # fill db_nsr._admin.deployed.VCA.<index>
7823 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7825 if "execution-environment-list" in descriptor_config
:
7826 ee_list
= descriptor_config
.get("execution-environment-list", [])
7827 elif "juju" in descriptor_config
:
7828 ee_list
= [descriptor_config
] # ns charms
7829 else: # other types as script are not supported
7832 for ee_item
in ee_list
:
7835 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7836 ee_item
.get("juju"), ee_item
.get("helm-chart")
7839 ee_descriptor_id
= ee_item
.get("id")
7840 if ee_item
.get("juju"):
7841 vca_name
= ee_item
["juju"].get("charm")
7844 if ee_item
["juju"].get("charm") is not None
7847 if ee_item
["juju"].get("cloud") == "k8s":
7848 vca_type
= "k8s_proxy_charm"
7849 elif ee_item
["juju"].get("proxy") is False:
7850 vca_type
= "native_charm"
7851 elif ee_item
.get("helm-chart"):
7852 vca_name
= ee_item
["helm-chart"]
7853 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7856 vca_type
= "helm-v3"
7859 logging_text
+ "skipping non juju neither charm configuration"
7864 for vca_index
, vca_deployed
in enumerate(
7865 db_nsr
["_admin"]["deployed"]["VCA"]
7867 if not vca_deployed
:
7870 vca_deployed
.get("member-vnf-index") == member_vnf_index
7871 and vca_deployed
.get("vdu_id") == vdu_id
7872 and vca_deployed
.get("kdu_name") == kdu_name
7873 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7874 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7878 # not found, create one.
7880 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7883 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7885 target
+= "/kdu/{}".format(kdu_name
)
7887 "target_element": target
,
7888 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7889 "member-vnf-index": member_vnf_index
,
7891 "kdu_name": kdu_name
,
7892 "vdu_count_index": vdu_index
,
7893 "operational-status": "init", # TODO revise
7894 "detailed-status": "", # TODO revise
7895 "step": "initial-deploy", # TODO revise
7897 "vdu_name": vdu_name
,
7899 "ee_descriptor_id": ee_descriptor_id
,
7903 # create VCA and configurationStatus in db
7905 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7906 "configurationStatus.{}".format(vca_index
): dict(),
7908 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7910 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7912 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7913 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7914 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7917 task_n2vc
= asyncio
.ensure_future(
7919 logging_text
=logging_text
,
7920 vca_index
=vca_index
,
7926 vdu_index
=vdu_index
,
7927 deploy_params
=deploy_params
,
7928 config_descriptor
=descriptor_config
,
7929 base_folder
=base_folder
,
7930 nslcmop_id
=nslcmop_id
,
7934 ee_config_descriptor
=ee_item
,
7937 self
.lcm_tasks
.register(
7941 "instantiate_N2VC-{}".format(vca_index
),
7944 task_instantiation_info
[
7946 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7947 member_vnf_index
or "", vdu_id
or ""
7950 async def heal_N2VC(
7967 ee_config_descriptor
,
7969 nsr_id
= db_nsr
["_id"]
7970 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7971 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7972 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7973 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7975 "collection": "nsrs",
7976 "filter": {"_id": nsr_id
},
7977 "path": db_update_entry
,
7983 element_under_configuration
= nsr_id
7987 vnfr_id
= db_vnfr
["_id"]
7988 osm_config
["osm"]["vnf_id"] = vnfr_id
7990 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
7992 if vca_type
== "native_charm":
7995 index_number
= vdu_index
or 0
7998 element_type
= "VNF"
7999 element_under_configuration
= vnfr_id
8000 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8002 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8003 element_type
= "VDU"
8004 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8005 osm_config
["osm"]["vdu_id"] = vdu_id
8007 namespace
+= ".{}".format(kdu_name
)
8008 element_type
= "KDU"
8009 element_under_configuration
= kdu_name
8010 osm_config
["osm"]["kdu_name"] = kdu_name
8013 if base_folder
["pkg-dir"]:
8014 artifact_path
= "{}/{}/{}/{}".format(
8015 base_folder
["folder"],
8016 base_folder
["pkg-dir"],
8019 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8024 artifact_path
= "{}/Scripts/{}/{}/".format(
8025 base_folder
["folder"],
8028 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8033 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8035 # get initial_config_primitive_list that applies to this element
8036 initial_config_primitive_list
= config_descriptor
.get(
8037 "initial-config-primitive"
8041 "Initial config primitive list > {}".format(
8042 initial_config_primitive_list
8046 # add config if not present for NS charm
8047 ee_descriptor_id
= ee_config_descriptor
.get("id")
8048 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8049 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8050 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8054 "Initial config primitive list #2 > {}".format(
8055 initial_config_primitive_list
8058 # n2vc_redesign STEP 3.1
8059 # find old ee_id if exists
8060 ee_id
= vca_deployed
.get("ee_id")
8062 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8063 # create or register execution environment in VCA. Only for native charms when healing
8064 if vca_type
== "native_charm":
8065 step
= "Waiting to VM being up and getting IP address"
8066 self
.logger
.debug(logging_text
+ step
)
8067 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8076 credentials
= {"hostname": rw_mgmt_ip
}
8078 username
= deep_get(
8079 config_descriptor
, ("config-access", "ssh-access", "default-user")
8081 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8082 # merged. Meanwhile let's get username from initial-config-primitive
8083 if not username
and initial_config_primitive_list
:
8084 for config_primitive
in initial_config_primitive_list
:
8085 for param
in config_primitive
.get("parameter", ()):
8086 if param
["name"] == "ssh-username":
8087 username
= param
["value"]
8091 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8092 "'config-access.ssh-access.default-user'"
8094 credentials
["username"] = username
8096 # n2vc_redesign STEP 3.2
8097 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8098 self
._write
_configuration
_status
(
8100 vca_index
=vca_index
,
8101 status
="REGISTERING",
8102 element_under_configuration
=element_under_configuration
,
8103 element_type
=element_type
,
8106 step
= "register execution environment {}".format(credentials
)
8107 self
.logger
.debug(logging_text
+ step
)
8108 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8109 credentials
=credentials
,
8110 namespace
=namespace
,
8115 # update ee_id en db
8117 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8119 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8121 # for compatibility with MON/POL modules, the need model and application name at database
8122 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8123 # Not sure if this need to be done when healing
8125 ee_id_parts = ee_id.split(".")
8126 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8127 if len(ee_id_parts) >= 2:
8128 model_name = ee_id_parts[0]
8129 application_name = ee_id_parts[1]
8130 db_nsr_update[db_update_entry + "model"] = model_name
8131 db_nsr_update[db_update_entry + "application"] = application_name
8134 # n2vc_redesign STEP 3.3
8135 # Install configuration software. Only for native charms.
8136 step
= "Install configuration Software"
8138 self
._write
_configuration
_status
(
8140 vca_index
=vca_index
,
8141 status
="INSTALLING SW",
8142 element_under_configuration
=element_under_configuration
,
8143 element_type
=element_type
,
8144 #other_update=db_nsr_update,
8148 # TODO check if already done
8149 self
.logger
.debug(logging_text
+ step
)
8151 if vca_type
== "native_charm":
8152 config_primitive
= next(
8153 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8156 if config_primitive
:
8157 config
= self
._map
_primitive
_params
(
8158 config_primitive
, {}, deploy_params
8160 await self
.vca_map
[vca_type
].install_configuration_sw(
8162 artifact_path
=artifact_path
,
8170 # write in db flag of configuration_sw already installed
8172 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8175 # Not sure if this need to be done when healing
8177 # add relations for this VCA (wait for other peers related with this VCA)
8178 await self._add_vca_relations(
8179 logging_text=logging_text,
8182 vca_index=vca_index,
8186 # if SSH access is required, then get execution environment SSH public
8187 # if native charm we have waited already to VM be UP
8188 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8191 # self.logger.debug("get ssh key block")
8193 config_descriptor
, ("config-access", "ssh-access", "required")
8195 # self.logger.debug("ssh key needed")
8196 # Needed to inject a ssh key
8199 ("config-access", "ssh-access", "default-user"),
8201 step
= "Install configuration Software, getting public ssh key"
8202 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8203 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8206 step
= "Insert public key into VM user={} ssh_key={}".format(
8210 # self.logger.debug("no need to get ssh key")
8211 step
= "Waiting to VM being up and getting IP address"
8212 self
.logger
.debug(logging_text
+ step
)
8214 # n2vc_redesign STEP 5.1
8215 # wait for RO (ip-address) Insert pub_key into VM
8216 # IMPORTANT: We need do wait for RO to complete healing operation.
8217 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8220 rw_mgmt_ip
= await self
.wait_kdu_up(
8221 logging_text
, nsr_id
, vnfr_id
, kdu_name
8224 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8234 rw_mgmt_ip
= None # This is for a NS configuration
8236 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8238 # store rw_mgmt_ip in deploy params for later replacement
8239 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8242 # get run-day1 operation parameter
8243 runDay1
= deploy_params
.get("run-day1",False)
8244 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8246 # n2vc_redesign STEP 6 Execute initial config primitive
8247 step
= "execute initial config primitive"
8249 # wait for dependent primitives execution (NS -> VNF -> VDU)
8250 if initial_config_primitive_list
:
8251 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8253 # stage, in function of element type: vdu, kdu, vnf or ns
8254 my_vca
= vca_deployed_list
[vca_index
]
8255 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8257 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8258 elif my_vca
.get("member-vnf-index"):
8260 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8263 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8265 self
._write
_configuration
_status
(
8266 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8269 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8271 check_if_terminated_needed
= True
8272 for initial_config_primitive
in initial_config_primitive_list
:
8273 # adding information on the vca_deployed if it is a NS execution environment
8274 if not vca_deployed
["member-vnf-index"]:
8275 deploy_params
["ns_config_info"] = json
.dumps(
8276 self
._get
_ns
_config
_info
(nsr_id
)
8278 # TODO check if already done
8279 primitive_params_
= self
._map
_primitive
_params
(
8280 initial_config_primitive
, {}, deploy_params
8283 step
= "execute primitive '{}' params '{}'".format(
8284 initial_config_primitive
["name"], primitive_params_
8286 self
.logger
.debug(logging_text
+ step
)
8287 await self
.vca_map
[vca_type
].exec_primitive(
8289 primitive_name
=initial_config_primitive
["name"],
8290 params_dict
=primitive_params_
,
8295 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8296 if check_if_terminated_needed
:
8297 if config_descriptor
.get("terminate-config-primitive"):
8299 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8301 check_if_terminated_needed
= False
8303 # TODO register in database that primitive is done
8305 # STEP 7 Configure metrics
8306 # Not sure if this need to be done when healing
8308 if vca_type == "helm" or vca_type == "helm-v3":
8309 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8311 artifact_path=artifact_path,
8312 ee_config_descriptor=ee_config_descriptor,
8315 target_ip=rw_mgmt_ip,
8321 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8324 for job in prometheus_jobs:
8327 {"job_name": job["job_name"]},
8330 fail_on_empty=False,
8334 step
= "instantiated at VCA"
8335 self
.logger
.debug(logging_text
+ step
)
8337 self
._write
_configuration
_status
(
8338 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8341 except Exception as e
: # TODO not use Exception but N2VC exception
8342 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8344 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8347 "Exception while {} : {}".format(step
, e
), exc_info
=True
8349 self
._write
_configuration
_status
(
8350 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8352 raise LcmException("{} {}".format(step
, e
)) from e
8354 async def _wait_heal_ro(
8360 while time() <= start_time
+ timeout
:
8361 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8362 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8363 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8364 if operational_status_ro
!= "healing":
8366 await asyncio
.sleep(15, loop
=self
.loop
)
8367 else: # timeout_ns_deploy
8368 raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        # Another LCM instance owns this operation; nothing to do here.
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    # NOTE(review): db_nsr_update is filled below but never persisted in this
    # method — kept for parity with the other operations; confirm intent.
    db_nsr_update = {}
    target = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # the operation parameters become the RO vertical-scale target
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        # wait until RO reports the vertical-scale action finished
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        # always release the "current operation" marker on the nsr
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            # notify result on the message bus; a kafka failure must not
            # mask the operation outcome, so it is only logged
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite(
                    "ns", "verticalscaled", msg, loop=self.loop
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")