1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 SUBOPERATION_STATUS_NOT_FOUND
= -1
138 SUBOPERATION_STATUS_NEW
= -2
139 SUBOPERATION_STATUS_SKIP
= -3
140 task_name_deploy_vca
= "Deploying VCA"
142 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
144 Init, Connect to database, filesystem storage, and messaging
145 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
148 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
150 self
.db
= Database().instance
.db
151 self
.fs
= Filesystem().instance
.fs
153 self
.lcm_tasks
= lcm_tasks
154 self
.timeout
= config
["timeout"]
155 self
.ro_config
= config
["ro_config"]
156 self
.ng_ro
= config
["ro_config"].get("ng")
157 self
.vca_config
= config
["VCA"].copy()
159 # create N2VC connector
160 self
.n2vc
= N2VCJujuConnector(
163 on_update_db
=self
._on
_update
_n
2vc
_db
,
168 self
.conn_helm_ee
= LCMHelmConn(
171 vca_config
=self
.vca_config
,
172 on_update_db
=self
._on
_update
_n
2vc
_db
,
175 self
.k8sclusterhelm2
= K8sHelmConnector(
176 kubectl_command
=self
.vca_config
.get("kubectlpath"),
177 helm_command
=self
.vca_config
.get("helmpath"),
184 self
.k8sclusterhelm3
= K8sHelm3Connector(
185 kubectl_command
=self
.vca_config
.get("kubectlpath"),
186 helm_command
=self
.vca_config
.get("helm3path"),
193 self
.k8sclusterjuju
= K8sJujuConnector(
194 kubectl_command
=self
.vca_config
.get("kubectlpath"),
195 juju_command
=self
.vca_config
.get("jujupath"),
198 on_update_db
=self
._on
_update
_k
8s
_db
,
203 self
.k8scluster_map
= {
204 "helm-chart": self
.k8sclusterhelm2
,
205 "helm-chart-v3": self
.k8sclusterhelm3
,
206 "chart": self
.k8sclusterhelm3
,
207 "juju-bundle": self
.k8sclusterjuju
,
208 "juju": self
.k8sclusterjuju
,
212 "lxc_proxy_charm": self
.n2vc
,
213 "native_charm": self
.n2vc
,
214 "k8s_proxy_charm": self
.n2vc
,
215 "helm": self
.conn_helm_ee
,
216 "helm-v3": self
.conn_helm_ee
,
220 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
222 self
.op_status_map
= {
223 "instantiation": self
.RO
.status
,
224 "termination": self
.RO
.status
,
225 "migrate": self
.RO
.status
,
226 "healing": self
.RO
.recreate_status
,
230 def increment_ip_mac(ip_mac
, vm_index
=1):
231 if not isinstance(ip_mac
, str):
234 # try with ipv4 look for last dot
235 i
= ip_mac
.rfind(".")
238 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
239 # try with ipv6 or mac look for last colon. Operate in hex
240 i
= ip_mac
.rfind(":")
243 # format in hex, len can be 2 for mac or 4 for ipv6
244 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
245 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
251 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
253 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
256 # TODO filter RO descriptor fields...
260 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
261 db_dict
["deploymentStatus"] = ro_descriptor
262 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
264 except Exception as e
:
266 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
269 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
271 # remove last dot from path (if exists)
272 if path
.endswith("."):
275 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
276 # .format(table, filter, path, updated_data))
279 nsr_id
= filter.get("_id")
281 # read ns record from database
282 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
283 current_ns_status
= nsr
.get("nsState")
285 # get vca status for NS
286 status_dict
= await self
.n2vc
.get_status(
287 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
292 db_dict
["vcaStatus"] = status_dict
293 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
295 # update configurationStatus for this VCA
297 vca_index
= int(path
[path
.rfind(".") + 1 :])
300 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
302 vca_status
= vca_list
[vca_index
].get("status")
304 configuration_status_list
= nsr
.get("configurationStatus")
305 config_status
= configuration_status_list
[vca_index
].get("status")
307 if config_status
== "BROKEN" and vca_status
!= "failed":
308 db_dict
["configurationStatus"][vca_index
] = "READY"
309 elif config_status
!= "BROKEN" and vca_status
== "failed":
310 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
311 except Exception as e
:
312 # not update configurationStatus
313 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
315 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
316 # if nsState = 'DEGRADED' check if all is OK
318 if current_ns_status
in ("READY", "DEGRADED"):
319 error_description
= ""
321 if status_dict
.get("machines"):
322 for machine_id
in status_dict
.get("machines"):
323 machine
= status_dict
.get("machines").get(machine_id
)
324 # check machine agent-status
325 if machine
.get("agent-status"):
326 s
= machine
.get("agent-status").get("status")
329 error_description
+= (
330 "machine {} agent-status={} ; ".format(
334 # check machine instance status
335 if machine
.get("instance-status"):
336 s
= machine
.get("instance-status").get("status")
339 error_description
+= (
340 "machine {} instance-status={} ; ".format(
345 if status_dict
.get("applications"):
346 for app_id
in status_dict
.get("applications"):
347 app
= status_dict
.get("applications").get(app_id
)
348 # check application status
349 if app
.get("status"):
350 s
= app
.get("status").get("status")
353 error_description
+= (
354 "application {} status={} ; ".format(app_id
, s
)
357 if error_description
:
358 db_dict
["errorDescription"] = error_description
359 if current_ns_status
== "READY" and is_degraded
:
360 db_dict
["nsState"] = "DEGRADED"
361 if current_ns_status
== "DEGRADED" and not is_degraded
:
362 db_dict
["nsState"] = "READY"
365 self
.update_db_2("nsrs", nsr_id
, db_dict
)
367 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
369 except Exception as e
:
370 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
372 async def _on_update_k8s_db(
373 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
376 Updating vca status in NSR record
377 :param cluster_uuid: UUID of a k8s cluster
378 :param kdu_instance: The unique name of the KDU instance
379 :param filter: To get nsr_id
380 :cluster_type: The cluster type (juju, k8s)
384 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
385 # .format(cluster_uuid, kdu_instance, filter))
387 nsr_id
= filter.get("_id")
389 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
390 cluster_uuid
=cluster_uuid
,
391 kdu_instance
=kdu_instance
,
393 complete_status
=True,
399 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
401 if cluster_type
in ("juju-bundle", "juju"):
402 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
403 # status in a similar way between Juju Bundles and Helm Charts on this side
404 await self
.k8sclusterjuju
.update_vca_status(
405 db_dict
["vcaStatus"],
411 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
415 self
.update_db_2("nsrs", nsr_id
, db_dict
)
416 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
418 except Exception as e
:
419 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
422 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
424 env
= Environment(undefined
=StrictUndefined
)
425 template
= env
.from_string(cloud_init_text
)
426 return template
.render(additional_params
or {})
427 except UndefinedError
as e
:
429 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
430 "file, must be provided in the instantiation parameters inside the "
431 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
433 except (TemplateError
, TemplateNotFound
) as e
:
435 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
440 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
441 cloud_init_content
= cloud_init_file
= None
443 if vdu
.get("cloud-init-file"):
444 base_folder
= vnfd
["_admin"]["storage"]
445 if base_folder
["pkg-dir"]:
446 cloud_init_file
= "{}/{}/cloud_init/{}".format(
447 base_folder
["folder"],
448 base_folder
["pkg-dir"],
449 vdu
["cloud-init-file"],
452 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
453 base_folder
["folder"],
454 vdu
["cloud-init-file"],
456 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
457 cloud_init_content
= ci_file
.read()
458 elif vdu
.get("cloud-init"):
459 cloud_init_content
= vdu
["cloud-init"]
461 return cloud_init_content
462 except FsException
as e
:
464 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
465 vnfd
["id"], vdu
["id"], cloud_init_file
, e
469 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
471 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
473 additional_params
= vdur
.get("additionalParams")
474 return parse_yaml_strings(additional_params
)
476 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
478 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
479 :param vnfd: input vnfd
480 :param new_id: overrides vnf id if provided
481 :param additionalParams: Instantiation params for VNFs provided
482 :param nsrId: Id of the NSR
483 :return: copy of vnfd
485 vnfd_RO
= deepcopy(vnfd
)
486 # remove unused by RO configuration, monitoring, scaling and internal keys
487 vnfd_RO
.pop("_id", None)
488 vnfd_RO
.pop("_admin", None)
489 vnfd_RO
.pop("monitoring-param", None)
490 vnfd_RO
.pop("scaling-group-descriptor", None)
491 vnfd_RO
.pop("kdu", None)
492 vnfd_RO
.pop("k8s-cluster", None)
494 vnfd_RO
["id"] = new_id
496 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
497 for vdu
in get_iterable(vnfd_RO
, "vdu"):
498 vdu
.pop("cloud-init-file", None)
499 vdu
.pop("cloud-init", None)
503 def ip_profile_2_RO(ip_profile
):
504 RO_ip_profile
= deepcopy(ip_profile
)
505 if "dns-server" in RO_ip_profile
:
506 if isinstance(RO_ip_profile
["dns-server"], list):
507 RO_ip_profile
["dns-address"] = []
508 for ds
in RO_ip_profile
.pop("dns-server"):
509 RO_ip_profile
["dns-address"].append(ds
["address"])
511 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
512 if RO_ip_profile
.get("ip-version") == "ipv4":
513 RO_ip_profile
["ip-version"] = "IPv4"
514 if RO_ip_profile
.get("ip-version") == "ipv6":
515 RO_ip_profile
["ip-version"] = "IPv6"
516 if "dhcp-params" in RO_ip_profile
:
517 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
520 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
521 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
522 if db_vim
["_admin"]["operationalState"] != "ENABLED":
524 "VIM={} is not available. operationalState={}".format(
525 vim_account
, db_vim
["_admin"]["operationalState"]
528 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
531 def get_ro_wim_id_for_wim_account(self
, wim_account
):
532 if isinstance(wim_account
, str):
533 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
534 if db_wim
["_admin"]["operationalState"] != "ENABLED":
536 "WIM={} is not available. operationalState={}".format(
537 wim_account
, db_wim
["_admin"]["operationalState"]
540 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
545 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
547 db_vdu_push_list
= []
549 db_update
= {"_admin.modified": time()}
551 for vdu_id
, vdu_count
in vdu_create
.items():
555 for vdur
in reversed(db_vnfr
["vdur"])
556 if vdur
["vdu-id-ref"] == vdu_id
561 # Read the template saved in the db:
563 "No vdur in the database. Using the vdur-template to scale"
565 vdur_template
= db_vnfr
.get("vdur-template")
566 if not vdur_template
:
568 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
572 vdur
= vdur_template
[0]
573 # Delete a template from the database after using it
576 {"_id": db_vnfr
["_id"]},
578 pull
={"vdur-template": {"_id": vdur
["_id"]}},
580 for count
in range(vdu_count
):
581 vdur_copy
= deepcopy(vdur
)
582 vdur_copy
["status"] = "BUILD"
583 vdur_copy
["status-detailed"] = None
584 vdur_copy
["ip-address"] = None
585 vdur_copy
["_id"] = str(uuid4())
586 vdur_copy
["count-index"] += count
+ 1
587 vdur_copy
["id"] = "{}-{}".format(
588 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
590 vdur_copy
.pop("vim_info", None)
591 for iface
in vdur_copy
["interfaces"]:
592 if iface
.get("fixed-ip"):
593 iface
["ip-address"] = self
.increment_ip_mac(
594 iface
["ip-address"], count
+ 1
597 iface
.pop("ip-address", None)
598 if iface
.get("fixed-mac"):
599 iface
["mac-address"] = self
.increment_ip_mac(
600 iface
["mac-address"], count
+ 1
603 iface
.pop("mac-address", None)
607 ) # only first vdu can be managment of vnf
608 db_vdu_push_list
.append(vdur_copy
)
609 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
611 if len(db_vnfr
["vdur"]) == 1:
612 # The scale will move to 0 instances
614 "Scaling to 0 !, creating the template with the last vdur"
616 template_vdur
= [db_vnfr
["vdur"][0]]
617 for vdu_id
, vdu_count
in vdu_delete
.items():
619 indexes_to_delete
= [
621 for iv
in enumerate(db_vnfr
["vdur"])
622 if iv
[1]["vdu-id-ref"] == vdu_id
626 "vdur.{}.status".format(i
): "DELETING"
627 for i
in indexes_to_delete
[-vdu_count
:]
631 # it must be deleted one by one because common.db does not allow otherwise
634 for v
in reversed(db_vnfr
["vdur"])
635 if v
["vdu-id-ref"] == vdu_id
637 for vdu
in vdus_to_delete
[:vdu_count
]:
640 {"_id": db_vnfr
["_id"]},
642 pull
={"vdur": {"_id": vdu
["_id"]}},
646 db_push
["vdur"] = db_vdu_push_list
648 db_push
["vdur-template"] = template_vdur
651 db_vnfr
["vdur-template"] = template_vdur
652 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
653 # modify passed dictionary db_vnfr
654 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
655 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
657 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
659 Updates database nsr with the RO info for the created vld
660 :param ns_update_nsr: dictionary to be filled with the updated info
661 :param db_nsr: content of db_nsr. This is also modified
662 :param nsr_desc_RO: nsr descriptor from RO
663 :return: Nothing, LcmException is raised on errors
666 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
667 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
668 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
670 vld
["vim-id"] = net_RO
.get("vim_net_id")
671 vld
["name"] = net_RO
.get("vim_name")
672 vld
["status"] = net_RO
.get("status")
673 vld
["status-detailed"] = net_RO
.get("error_msg")
674 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
678 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
681 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
683 for db_vnfr
in db_vnfrs
.values():
684 vnfr_update
= {"status": "ERROR"}
685 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
686 if "status" not in vdur
:
687 vdur
["status"] = "ERROR"
688 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
690 vdur
["status-detailed"] = str(error_text
)
692 "vdur.{}.status-detailed".format(vdu_index
)
694 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
695 except DbException
as e
:
696 self
.logger
.error("Cannot update vnf. {}".format(e
))
698 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
700 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
701 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
702 :param nsr_desc_RO: nsr descriptor from RO
703 :return: Nothing, LcmException is raised on errors
705 for vnf_index
, db_vnfr
in db_vnfrs
.items():
706 for vnf_RO
in nsr_desc_RO
["vnfs"]:
707 if vnf_RO
["member_vnf_index"] != vnf_index
:
710 if vnf_RO
.get("ip_address"):
711 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
714 elif not db_vnfr
.get("ip-address"):
715 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
716 raise LcmExceptionNoMgmtIP(
717 "ns member_vnf_index '{}' has no IP address".format(
722 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
723 vdur_RO_count_index
= 0
724 if vdur
.get("pdu-type"):
726 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
727 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
729 if vdur
["count-index"] != vdur_RO_count_index
:
730 vdur_RO_count_index
+= 1
732 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
733 if vdur_RO
.get("ip_address"):
734 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
736 vdur
["ip-address"] = None
737 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
738 vdur
["name"] = vdur_RO
.get("vim_name")
739 vdur
["status"] = vdur_RO
.get("status")
740 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
741 for ifacer
in get_iterable(vdur
, "interfaces"):
742 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
743 if ifacer
["name"] == interface_RO
.get("internal_name"):
744 ifacer
["ip-address"] = interface_RO
.get(
747 ifacer
["mac-address"] = interface_RO
.get(
753 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
754 "from VIM info".format(
755 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
758 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
762 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
764 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
768 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
769 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
770 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
772 vld
["vim-id"] = net_RO
.get("vim_net_id")
773 vld
["name"] = net_RO
.get("vim_name")
774 vld
["status"] = net_RO
.get("status")
775 vld
["status-detailed"] = net_RO
.get("error_msg")
776 vnfr_update
["vld.{}".format(vld_index
)] = vld
780 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
785 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
790 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
795 def _get_ns_config_info(self
, nsr_id
):
797 Generates a mapping between vnf,vdu elements and the N2VC id
798 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
799 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
800 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
801 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
803 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
804 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
806 ns_config_info
= {"osm-config-mapping": mapping
}
807 for vca
in vca_deployed_list
:
808 if not vca
["member-vnf-index"]:
810 if not vca
["vdu_id"]:
811 mapping
[vca
["member-vnf-index"]] = vca
["application"]
815 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
817 ] = vca
["application"]
818 return ns_config_info
820 async def _instantiate_ng_ro(
837 def get_vim_account(vim_account_id
):
839 if vim_account_id
in db_vims
:
840 return db_vims
[vim_account_id
]
841 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
842 db_vims
[vim_account_id
] = db_vim
845 # modify target_vld info with instantiation parameters
846 def parse_vld_instantiation_params(
847 target_vim
, target_vld
, vld_params
, target_sdn
849 if vld_params
.get("ip-profile"):
850 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
853 if vld_params
.get("provider-network"):
854 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
857 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
858 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
861 if vld_params
.get("wimAccountId"):
862 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
863 target_vld
["vim_info"][target_wim
] = {}
864 for param
in ("vim-network-name", "vim-network-id"):
865 if vld_params
.get(param
):
866 if isinstance(vld_params
[param
], dict):
867 for vim
, vim_net
in vld_params
[param
].items():
868 other_target_vim
= "vim:" + vim
870 target_vld
["vim_info"],
871 (other_target_vim
, param
.replace("-", "_")),
874 else: # isinstance str
875 target_vld
["vim_info"][target_vim
][
876 param
.replace("-", "_")
877 ] = vld_params
[param
]
878 if vld_params
.get("common_id"):
879 target_vld
["common_id"] = vld_params
.get("common_id")
881 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
882 def update_ns_vld_target(target
, ns_params
):
883 for vnf_params
in ns_params
.get("vnf", ()):
884 if vnf_params
.get("vimAccountId"):
888 for vnfr
in db_vnfrs
.values()
889 if vnf_params
["member-vnf-index"]
890 == vnfr
["member-vnf-index-ref"]
894 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
895 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
896 target_vld
= find_in_list(
897 get_iterable(vdur
, "interfaces"),
898 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
901 if vnf_params
.get("vimAccountId") not in a_vld
.get(
904 target
["ns"]["vld"][a_index
].get("vim_info").update(
906 "vim:{}".format(vnf_params
["vimAccountId"]): {
907 "vim_network_name": ""
912 nslcmop_id
= db_nslcmop
["_id"]
914 "name": db_nsr
["name"],
917 "image": deepcopy(db_nsr
["image"]),
918 "flavor": deepcopy(db_nsr
["flavor"]),
919 "action_id": nslcmop_id
,
920 "cloud_init_content": {},
922 for image
in target
["image"]:
923 image
["vim_info"] = {}
924 for flavor
in target
["flavor"]:
925 flavor
["vim_info"] = {}
926 if db_nsr
.get("affinity-or-anti-affinity-group"):
927 target
["affinity-or-anti-affinity-group"] = deepcopy(
928 db_nsr
["affinity-or-anti-affinity-group"]
930 for affinity_or_anti_affinity_group
in target
[
931 "affinity-or-anti-affinity-group"
933 affinity_or_anti_affinity_group
["vim_info"] = {}
935 if db_nslcmop
.get("lcmOperationType") != "instantiate":
936 # get parameters of instantiation:
937 db_nslcmop_instantiate
= self
.db
.get_list(
940 "nsInstanceId": db_nslcmop
["nsInstanceId"],
941 "lcmOperationType": "instantiate",
944 ns_params
= db_nslcmop_instantiate
.get("operationParams")
946 ns_params
= db_nslcmop
.get("operationParams")
947 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
948 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
951 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
952 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
956 "mgmt-network": vld
.get("mgmt-network", False),
957 "type": vld
.get("type"),
960 "vim_network_name": vld
.get("vim-network-name"),
961 "vim_account_id": ns_params
["vimAccountId"],
965 # check if this network needs SDN assist
966 if vld
.get("pci-interfaces"):
967 db_vim
= get_vim_account(ns_params
["vimAccountId"])
968 sdnc_id
= db_vim
["config"].get("sdn-controller")
970 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
971 target_sdn
= "sdn:{}".format(sdnc_id
)
972 target_vld
["vim_info"][target_sdn
] = {
974 "target_vim": target_vim
,
976 "type": vld
.get("type"),
979 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
980 for nsd_vnf_profile
in nsd_vnf_profiles
:
981 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
982 if cp
["virtual-link-profile-id"] == vld
["id"]:
984 "member_vnf:{}.{}".format(
985 cp
["constituent-cpd-id"][0][
986 "constituent-base-element-id"
988 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
990 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
992 # check at nsd descriptor, if there is an ip-profile
994 nsd_vlp
= find_in_list(
995 get_virtual_link_profiles(nsd
),
996 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1001 and nsd_vlp
.get("virtual-link-protocol-data")
1002 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1004 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1007 ip_profile_dest_data
= {}
1008 if "ip-version" in ip_profile_source_data
:
1009 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1012 if "cidr" in ip_profile_source_data
:
1013 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1016 if "gateway-ip" in ip_profile_source_data
:
1017 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1020 if "dhcp-enabled" in ip_profile_source_data
:
1021 ip_profile_dest_data
["dhcp-params"] = {
1022 "enabled": ip_profile_source_data
["dhcp-enabled"]
1024 vld_params
["ip-profile"] = ip_profile_dest_data
1026 # update vld_params with instantiation params
1027 vld_instantiation_params
= find_in_list(
1028 get_iterable(ns_params
, "vld"),
1029 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1031 if vld_instantiation_params
:
1032 vld_params
.update(vld_instantiation_params
)
1033 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1034 target
["ns"]["vld"].append(target_vld
)
1035 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1036 update_ns_vld_target(target
, ns_params
)
1038 for vnfr
in db_vnfrs
.values():
1039 vnfd
= find_in_list(
1040 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1042 vnf_params
= find_in_list(
1043 get_iterable(ns_params
, "vnf"),
1044 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1046 target_vnf
= deepcopy(vnfr
)
1047 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1048 for vld
in target_vnf
.get("vld", ()):
1049 # check if connected to a ns.vld, to fill target'
1050 vnf_cp
= find_in_list(
1051 vnfd
.get("int-virtual-link-desc", ()),
1052 lambda cpd
: cpd
.get("id") == vld
["id"],
1055 ns_cp
= "member_vnf:{}.{}".format(
1056 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1058 if cp2target
.get(ns_cp
):
1059 vld
["target"] = cp2target
[ns_cp
]
1062 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1064 # check if this network needs SDN assist
1066 if vld
.get("pci-interfaces"):
1067 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1068 sdnc_id
= db_vim
["config"].get("sdn-controller")
1070 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1071 target_sdn
= "sdn:{}".format(sdnc_id
)
1072 vld
["vim_info"][target_sdn
] = {
1074 "target_vim": target_vim
,
1076 "type": vld
.get("type"),
1079 # check at vnfd descriptor, if there is an ip-profile
1081 vnfd_vlp
= find_in_list(
1082 get_virtual_link_profiles(vnfd
),
1083 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1087 and vnfd_vlp
.get("virtual-link-protocol-data")
1088 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1090 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1093 ip_profile_dest_data
= {}
1094 if "ip-version" in ip_profile_source_data
:
1095 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1098 if "cidr" in ip_profile_source_data
:
1099 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1102 if "gateway-ip" in ip_profile_source_data
:
1103 ip_profile_dest_data
[
1105 ] = ip_profile_source_data
["gateway-ip"]
1106 if "dhcp-enabled" in ip_profile_source_data
:
1107 ip_profile_dest_data
["dhcp-params"] = {
1108 "enabled": ip_profile_source_data
["dhcp-enabled"]
1111 vld_params
["ip-profile"] = ip_profile_dest_data
1112 # update vld_params with instantiation params
1114 vld_instantiation_params
= find_in_list(
1115 get_iterable(vnf_params
, "internal-vld"),
1116 lambda i_vld
: i_vld
["name"] == vld
["id"],
1118 if vld_instantiation_params
:
1119 vld_params
.update(vld_instantiation_params
)
1120 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1123 for vdur
in target_vnf
.get("vdur", ()):
1124 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1125 continue # This vdu must not be created
1126 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1128 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1131 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1132 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1135 and vdu_configuration
.get("config-access")
1136 and vdu_configuration
.get("config-access").get("ssh-access")
1138 vdur
["ssh-keys"] = ssh_keys_all
1139 vdur
["ssh-access-required"] = vdu_configuration
[
1141 ]["ssh-access"]["required"]
1144 and vnf_configuration
.get("config-access")
1145 and vnf_configuration
.get("config-access").get("ssh-access")
1146 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1148 vdur
["ssh-keys"] = ssh_keys_all
1149 vdur
["ssh-access-required"] = vnf_configuration
[
1151 ]["ssh-access"]["required"]
1152 elif ssh_keys_instantiation
and find_in_list(
1153 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1155 vdur
["ssh-keys"] = ssh_keys_instantiation
1157 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1159 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1161 if vdud
.get("cloud-init-file"):
1162 vdur
["cloud-init"] = "{}:file:{}".format(
1163 vnfd
["_id"], vdud
.get("cloud-init-file")
1165 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1166 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1167 base_folder
= vnfd
["_admin"]["storage"]
1168 if base_folder
["pkg-dir"]:
1169 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1170 base_folder
["folder"],
1171 base_folder
["pkg-dir"],
1172 vdud
.get("cloud-init-file"),
1175 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1176 base_folder
["folder"],
1177 vdud
.get("cloud-init-file"),
1179 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1180 target
["cloud_init_content"][
1183 elif vdud
.get("cloud-init"):
1184 vdur
["cloud-init"] = "{}:vdu:{}".format(
1185 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1187 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1188 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1191 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1192 deploy_params_vdu
= self
._format
_additional
_params
(
1193 vdur
.get("additionalParams") or {}
1195 deploy_params_vdu
["OSM"] = get_osm_params(
1196 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1198 vdur
["additionalParams"] = deploy_params_vdu
1201 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1202 if target_vim
not in ns_flavor
["vim_info"]:
1203 ns_flavor
["vim_info"][target_vim
] = {}
1206 # in case alternative images are provided we must check if they should be applied
1207 # for the vim_type, modify the vim_type taking into account
1208 ns_image_id
= int(vdur
["ns-image-id"])
1209 if vdur
.get("alt-image-ids"):
1210 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1211 vim_type
= db_vim
["vim_type"]
1212 for alt_image_id
in vdur
.get("alt-image-ids"):
1213 ns_alt_image
= target
["image"][int(alt_image_id
)]
1214 if vim_type
== ns_alt_image
.get("vim-type"):
1215 # must use alternative image
1217 "use alternative image id: {}".format(alt_image_id
)
1219 ns_image_id
= alt_image_id
1220 vdur
["ns-image-id"] = ns_image_id
1222 ns_image
= target
["image"][int(ns_image_id
)]
1223 if target_vim
not in ns_image
["vim_info"]:
1224 ns_image
["vim_info"][target_vim
] = {}
1227 if vdur
.get("affinity-or-anti-affinity-group-id"):
1228 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1229 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1230 if target_vim
not in ns_ags
["vim_info"]:
1231 ns_ags
["vim_info"][target_vim
] = {}
1233 vdur
["vim_info"] = {target_vim
: {}}
1234 # instantiation parameters
1236 vdu_instantiation_params
= find_in_list(
1237 get_iterable(vnf_params
, "vdu"),
1238 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1240 if vdu_instantiation_params
:
1241 # Parse the vdu_volumes from the instantiation params
1242 vdu_volumes
= get_volumes_from_instantiation_params(
1243 vdu_instantiation_params
, vdud
1245 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1246 vdur_list
.append(vdur
)
1247 target_vnf
["vdur"] = vdur_list
1248 target
["vnf"].append(target_vnf
)
1250 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1251 desc
= await self
.RO
.deploy(nsr_id
, target
)
1252 self
.logger
.debug("RO return > {}".format(desc
))
1253 action_id
= desc
["action_id"]
1254 await self
._wait
_ng
_ro
(
1255 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1256 operation
="instantiation"
1261 "_admin.deployed.RO.operational-status": "running",
1262 "detailed-status": " ".join(stage
),
1264 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1265 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1266 self
._write
_op
_status
(nslcmop_id
, stage
)
1268 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1272 async def _wait_ng_ro(
1282 detailed_status_old
= None
1284 start_time
= start_time
or time()
1285 while time() <= start_time
+ timeout
:
1286 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1287 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1288 if desc_status
["status"] == "FAILED":
1289 raise NgRoException(desc_status
["details"])
1290 elif desc_status
["status"] == "BUILD":
1292 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1293 elif desc_status
["status"] == "DONE":
1295 stage
[2] = "Deployed at VIM"
1298 assert False, "ROclient.check_ns_status returns unknown {}".format(
1299 desc_status
["status"]
1301 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1302 detailed_status_old
= stage
[2]
1303 db_nsr_update
["detailed-status"] = " ".join(stage
)
1304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1305 self
._write
_op
_status
(nslcmop_id
, stage
)
1306 await asyncio
.sleep(15, loop
=self
.loop
)
1307 else: # timeout_ns_deploy
1308 raise NgRoException("Timeout waiting ns to deploy")
1310 async def _terminate_ng_ro(
1311 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1316 start_deploy
= time()
1323 "action_id": nslcmop_id
,
1325 desc
= await self
.RO
.deploy(nsr_id
, target
)
1326 action_id
= desc
["action_id"]
1327 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1328 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1331 + "ns terminate action at RO. action_id={}".format(action_id
)
1335 delete_timeout
= 20 * 60 # 20 minutes
1336 await self
._wait
_ng
_ro
(
1337 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1338 operation
="termination"
1341 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1342 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1344 await self
.RO
.delete(nsr_id
)
1345 except Exception as e
:
1346 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1347 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1348 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1349 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1351 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1353 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1354 failed_detail
.append("delete conflict: {}".format(e
))
1357 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1360 failed_detail
.append("delete error: {}".format(e
))
1363 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1367 stage
[2] = "Error deleting from VIM"
1369 stage
[2] = "Deleted from VIM"
1370 db_nsr_update
["detailed-status"] = " ".join(stage
)
1371 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1372 self
._write
_op
_status
(nslcmop_id
, stage
)
1375 raise LcmException("; ".join(failed_detail
))
1378 async def instantiate_RO(
1392 :param logging_text: preffix text to use at logging
1393 :param nsr_id: nsr identity
1394 :param nsd: database content of ns descriptor
1395 :param db_nsr: database content of ns record
1396 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1398 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1399 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1400 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1401 :return: None or exception
1404 start_deploy
= time()
1405 ns_params
= db_nslcmop
.get("operationParams")
1406 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1407 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1409 timeout_ns_deploy
= self
.timeout
.get(
1410 "ns_deploy", self
.timeout_ns_deploy
1413 # Check for and optionally request placement optimization. Database will be updated if placement activated
1414 stage
[2] = "Waiting for Placement."
1415 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1416 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1417 for vnfr
in db_vnfrs
.values():
1418 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1421 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1423 return await self
._instantiate
_ng
_ro
(
1436 except Exception as e
:
1437 stage
[2] = "ERROR deploying at VIM"
1438 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1440 "Error deploying at VIM {}".format(e
),
1441 exc_info
=not isinstance(
1444 ROclient
.ROClientException
,
1453 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1455 Wait for kdu to be up, get ip address
1456 :param logging_text: prefix use for logging
1460 :return: IP address, K8s services
1463 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1466 while nb_tries
< 360:
1467 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1471 for x
in get_iterable(db_vnfr
, "kdur")
1472 if x
.get("kdu-name") == kdu_name
1478 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1480 if kdur
.get("status"):
1481 if kdur
["status"] in ("READY", "ENABLED"):
1482 return kdur
.get("ip-address"), kdur
.get("services")
1485 "target KDU={} is in error state".format(kdu_name
)
1488 await asyncio
.sleep(10, loop
=self
.loop
)
1490 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1492 async def wait_vm_up_insert_key_ro(
1493 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1496 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1497 :param logging_text: prefix use for logging
1502 :param pub_key: public ssh key to inject, None to skip
1503 :param user: user to apply the public ssh key
1507 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1511 target_vdu_id
= None
1517 if ro_retries
>= 360: # 1 hour
1519 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1522 await asyncio
.sleep(10, loop
=self
.loop
)
1525 if not target_vdu_id
:
1526 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1528 if not vdu_id
: # for the VNF case
1529 if db_vnfr
.get("status") == "ERROR":
1531 "Cannot inject ssh-key because target VNF is in error state"
1533 ip_address
= db_vnfr
.get("ip-address")
1539 for x
in get_iterable(db_vnfr
, "vdur")
1540 if x
.get("ip-address") == ip_address
1548 for x
in get_iterable(db_vnfr
, "vdur")
1549 if x
.get("vdu-id-ref") == vdu_id
1550 and x
.get("count-index") == vdu_index
1556 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1557 ): # If only one, this should be the target vdu
1558 vdur
= db_vnfr
["vdur"][0]
1561 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1562 vnfr_id
, vdu_id
, vdu_index
1565 # New generation RO stores information at "vim_info"
1568 if vdur
.get("vim_info"):
1570 t
for t
in vdur
["vim_info"]
1571 ) # there should be only one key
1572 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1574 vdur
.get("pdu-type")
1575 or vdur
.get("status") == "ACTIVE"
1576 or ng_ro_status
== "ACTIVE"
1578 ip_address
= vdur
.get("ip-address")
1581 target_vdu_id
= vdur
["vdu-id-ref"]
1582 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1584 "Cannot inject ssh-key because target VM is in error state"
1587 if not target_vdu_id
:
1590 # inject public key into machine
1591 if pub_key
and user
:
1592 self
.logger
.debug(logging_text
+ "Inserting RO key")
1593 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1594 if vdur
.get("pdu-type"):
1595 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1598 ro_vm_id
= "{}-{}".format(
1599 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1600 ) # TODO add vdu_index
1604 "action": "inject_ssh_key",
1608 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1610 desc
= await self
.RO
.deploy(nsr_id
, target
)
1611 action_id
= desc
["action_id"]
1612 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1615 # wait until NS is deployed at RO
1617 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1618 ro_nsr_id
= deep_get(
1619 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1623 result_dict
= await self
.RO
.create_action(
1625 item_id_name
=ro_nsr_id
,
1627 "add_public_key": pub_key
,
1632 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1633 if not result_dict
or not isinstance(result_dict
, dict):
1635 "Unknown response from RO when injecting key"
1637 for result
in result_dict
.values():
1638 if result
.get("vim_result") == 200:
1641 raise ROclient
.ROClientException(
1642 "error injecting key: {}".format(
1643 result
.get("description")
1647 except NgRoException
as e
:
1649 "Reaching max tries injecting key. Error: {}".format(e
)
1651 except ROclient
.ROClientException
as e
:
1655 + "error injecting key: {}. Retrying until {} seconds".format(
1662 "Reaching max tries injecting key. Error: {}".format(e
)
1669 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1671 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1673 my_vca
= vca_deployed_list
[vca_index
]
1674 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1675 # vdu or kdu: no dependencies
1679 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1680 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1681 configuration_status_list
= db_nsr
["configurationStatus"]
1682 for index
, vca_deployed
in enumerate(configuration_status_list
):
1683 if index
== vca_index
:
1686 if not my_vca
.get("member-vnf-index") or (
1687 vca_deployed
.get("member-vnf-index")
1688 == my_vca
.get("member-vnf-index")
1690 internal_status
= configuration_status_list
[index
].get("status")
1691 if internal_status
== "READY":
1693 elif internal_status
== "BROKEN":
1695 "Configuration aborted because dependent charm/s has failed"
1700 # no dependencies, return
1702 await asyncio
.sleep(10)
1705 raise LcmException("Configuration aborted because dependent charm/s timeout")
1707 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1710 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1712 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1713 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1716 async def instantiate_N2VC(
1733 ee_config_descriptor
,
1735 nsr_id
= db_nsr
["_id"]
1736 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1737 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1738 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1739 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1741 "collection": "nsrs",
1742 "filter": {"_id": nsr_id
},
1743 "path": db_update_entry
,
1749 element_under_configuration
= nsr_id
1753 vnfr_id
= db_vnfr
["_id"]
1754 osm_config
["osm"]["vnf_id"] = vnfr_id
1756 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1758 if vca_type
== "native_charm":
1761 index_number
= vdu_index
or 0
1764 element_type
= "VNF"
1765 element_under_configuration
= vnfr_id
1766 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1768 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1769 element_type
= "VDU"
1770 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1771 osm_config
["osm"]["vdu_id"] = vdu_id
1773 namespace
+= ".{}".format(kdu_name
)
1774 element_type
= "KDU"
1775 element_under_configuration
= kdu_name
1776 osm_config
["osm"]["kdu_name"] = kdu_name
1779 if base_folder
["pkg-dir"]:
1780 artifact_path
= "{}/{}/{}/{}".format(
1781 base_folder
["folder"],
1782 base_folder
["pkg-dir"],
1785 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1790 artifact_path
= "{}/Scripts/{}/{}/".format(
1791 base_folder
["folder"],
1794 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1799 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1801 # get initial_config_primitive_list that applies to this element
1802 initial_config_primitive_list
= config_descriptor
.get(
1803 "initial-config-primitive"
1807 "Initial config primitive list > {}".format(
1808 initial_config_primitive_list
1812 # add config if not present for NS charm
1813 ee_descriptor_id
= ee_config_descriptor
.get("id")
1814 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1815 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1816 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1820 "Initial config primitive list #2 > {}".format(
1821 initial_config_primitive_list
1824 # n2vc_redesign STEP 3.1
1825 # find old ee_id if exists
1826 ee_id
= vca_deployed
.get("ee_id")
1828 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1829 # create or register execution environment in VCA
1830 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1832 self
._write
_configuration
_status
(
1834 vca_index
=vca_index
,
1836 element_under_configuration
=element_under_configuration
,
1837 element_type
=element_type
,
1840 step
= "create execution environment"
1841 self
.logger
.debug(logging_text
+ step
)
1845 if vca_type
== "k8s_proxy_charm":
1846 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1847 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1848 namespace
=namespace
,
1849 artifact_path
=artifact_path
,
1853 elif vca_type
== "helm" or vca_type
== "helm-v3":
1854 ee_id
, credentials
= await self
.vca_map
[
1856 ].create_execution_environment(
1857 namespace
=namespace
,
1861 artifact_path
=artifact_path
,
1865 ee_id
, credentials
= await self
.vca_map
[
1867 ].create_execution_environment(
1868 namespace
=namespace
,
1874 elif vca_type
== "native_charm":
1875 step
= "Waiting to VM being up and getting IP address"
1876 self
.logger
.debug(logging_text
+ step
)
1877 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1886 credentials
= {"hostname": rw_mgmt_ip
}
1888 username
= deep_get(
1889 config_descriptor
, ("config-access", "ssh-access", "default-user")
1891 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1892 # merged. Meanwhile let's get username from initial-config-primitive
1893 if not username
and initial_config_primitive_list
:
1894 for config_primitive
in initial_config_primitive_list
:
1895 for param
in config_primitive
.get("parameter", ()):
1896 if param
["name"] == "ssh-username":
1897 username
= param
["value"]
1901 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1902 "'config-access.ssh-access.default-user'"
1904 credentials
["username"] = username
1905 # n2vc_redesign STEP 3.2
1907 self
._write
_configuration
_status
(
1909 vca_index
=vca_index
,
1910 status
="REGISTERING",
1911 element_under_configuration
=element_under_configuration
,
1912 element_type
=element_type
,
1915 step
= "register execution environment {}".format(credentials
)
1916 self
.logger
.debug(logging_text
+ step
)
1917 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1918 credentials
=credentials
,
1919 namespace
=namespace
,
1924 # for compatibility with MON/POL modules, the need model and application name at database
1925 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1926 ee_id_parts
= ee_id
.split(".")
1927 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1928 if len(ee_id_parts
) >= 2:
1929 model_name
= ee_id_parts
[0]
1930 application_name
= ee_id_parts
[1]
1931 db_nsr_update
[db_update_entry
+ "model"] = model_name
1932 db_nsr_update
[db_update_entry
+ "application"] = application_name
1934 # n2vc_redesign STEP 3.3
1935 step
= "Install configuration Software"
1937 self
._write
_configuration
_status
(
1939 vca_index
=vca_index
,
1940 status
="INSTALLING SW",
1941 element_under_configuration
=element_under_configuration
,
1942 element_type
=element_type
,
1943 other_update
=db_nsr_update
,
1946 # TODO check if already done
1947 self
.logger
.debug(logging_text
+ step
)
1949 if vca_type
== "native_charm":
1950 config_primitive
= next(
1951 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1954 if config_primitive
:
1955 config
= self
._map
_primitive
_params
(
1956 config_primitive
, {}, deploy_params
1959 if vca_type
== "lxc_proxy_charm":
1960 if element_type
== "NS":
1961 num_units
= db_nsr
.get("config-units") or 1
1962 elif element_type
== "VNF":
1963 num_units
= db_vnfr
.get("config-units") or 1
1964 elif element_type
== "VDU":
1965 for v
in db_vnfr
["vdur"]:
1966 if vdu_id
== v
["vdu-id-ref"]:
1967 num_units
= v
.get("config-units") or 1
1969 if vca_type
!= "k8s_proxy_charm":
1970 await self
.vca_map
[vca_type
].install_configuration_sw(
1972 artifact_path
=artifact_path
,
1975 num_units
=num_units
,
1980 # write in db flag of configuration_sw already installed
1982 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1985 # add relations for this VCA (wait for other peers related with this VCA)
1986 await self
._add
_vca
_relations
(
1987 logging_text
=logging_text
,
1990 vca_index
=vca_index
,
1993 # if SSH access is required, then get execution environment SSH public
1994 # if native charm we have waited already to VM be UP
1995 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1998 # self.logger.debug("get ssh key block")
2000 config_descriptor
, ("config-access", "ssh-access", "required")
2002 # self.logger.debug("ssh key needed")
2003 # Needed to inject a ssh key
2006 ("config-access", "ssh-access", "default-user"),
2008 step
= "Install configuration Software, getting public ssh key"
2009 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2010 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2013 step
= "Insert public key into VM user={} ssh_key={}".format(
2017 # self.logger.debug("no need to get ssh key")
2018 step
= "Waiting to VM being up and getting IP address"
2019 self
.logger
.debug(logging_text
+ step
)
2021 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2024 # n2vc_redesign STEP 5.1
2025 # wait for RO (ip-address) Insert pub_key into VM
2028 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2029 logging_text
, nsr_id
, vnfr_id
, kdu_name
2031 vnfd
= self
.db
.get_one(
2033 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2035 kdu
= get_kdu(vnfd
, kdu_name
)
2037 service
["name"] for service
in get_kdu_services(kdu
)
2039 exposed_services
= []
2040 for service
in services
:
2041 if any(s
in service
["name"] for s
in kdu_services
):
2042 exposed_services
.append(service
)
2043 await self
.vca_map
[vca_type
].exec_primitive(
2045 primitive_name
="config",
2047 "osm-config": json
.dumps(
2049 k8s
={"services": exposed_services
}
2056 # This verification is needed in order to avoid trying to add a public key
2057 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2058 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2059 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2061 elif db_vnfr
.get('vdur'):
2062 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2072 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2074 # store rw_mgmt_ip in deploy params for later replacement
2075 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2077 # n2vc_redesign STEP 6 Execute initial config primitive
2078 step
= "execute initial config primitive"
2080 # wait for dependent primitives execution (NS -> VNF -> VDU)
2081 if initial_config_primitive_list
:
2082 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2084 # stage, in function of element type: vdu, kdu, vnf or ns
2085 my_vca
= vca_deployed_list
[vca_index
]
2086 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2088 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2089 elif my_vca
.get("member-vnf-index"):
2091 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2094 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2096 self
._write
_configuration
_status
(
2097 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2100 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2102 check_if_terminated_needed
= True
2103 for initial_config_primitive
in initial_config_primitive_list
:
2104 # adding information on the vca_deployed if it is a NS execution environment
2105 if not vca_deployed
["member-vnf-index"]:
2106 deploy_params
["ns_config_info"] = json
.dumps(
2107 self
._get
_ns
_config
_info
(nsr_id
)
2109 # TODO check if already done
2110 primitive_params_
= self
._map
_primitive
_params
(
2111 initial_config_primitive
, {}, deploy_params
2114 step
= "execute primitive '{}' params '{}'".format(
2115 initial_config_primitive
["name"], primitive_params_
2117 self
.logger
.debug(logging_text
+ step
)
2118 await self
.vca_map
[vca_type
].exec_primitive(
2120 primitive_name
=initial_config_primitive
["name"],
2121 params_dict
=primitive_params_
,
2126 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2127 if check_if_terminated_needed
:
2128 if config_descriptor
.get("terminate-config-primitive"):
2130 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2132 check_if_terminated_needed
= False
2134 # TODO register in database that primitive is done
2136 # STEP 7 Configure metrics
2137 if vca_type
== "helm" or vca_type
== "helm-v3":
2138 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2140 artifact_path
=artifact_path
,
2141 ee_config_descriptor
=ee_config_descriptor
,
2144 target_ip
=rw_mgmt_ip
,
2150 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2153 for job
in prometheus_jobs
:
2156 {"job_name": job
["job_name"]},
2159 fail_on_empty
=False,
2162 step
= "instantiated at VCA"
2163 self
.logger
.debug(logging_text
+ step
)
2165 self
._write
_configuration
_status
(
2166 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2169 except Exception as e
: # TODO not use Exception but N2VC exception
2170 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2172 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2175 "Exception while {} : {}".format(step
, e
), exc_info
=True
2177 self
._write
_configuration
_status
(
2178 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2180 raise LcmException("{} {}".format(step
, e
)) from e
2182 def _write_ns_status(
2186 current_operation
: str,
2187 current_operation_id
: str,
2188 error_description
: str = None,
2189 error_detail
: str = None,
2190 other_update
: dict = None,
2193 Update db_nsr fields.
2196 :param current_operation:
2197 :param current_operation_id:
2198 :param error_description:
2199 :param error_detail:
2200 :param other_update: Other required changes at database if provided, will be cleared
2204 db_dict
= other_update
or {}
2207 ] = current_operation_id
# for backward compatibility
2208 db_dict
["_admin.current-operation"] = current_operation_id
2209 db_dict
["_admin.operation-type"] = (
2210 current_operation
if current_operation
!= "IDLE" else None
2212 db_dict
["currentOperation"] = current_operation
2213 db_dict
["currentOperationID"] = current_operation_id
2214 db_dict
["errorDescription"] = error_description
2215 db_dict
["errorDetail"] = error_detail
2218 db_dict
["nsState"] = ns_state
2219 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2220 except DbException
as e
:
2221 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2223 def _write_op_status(
2227 error_message
: str = None,
2228 queuePosition
: int = 0,
2229 operation_state
: str = None,
2230 other_update
: dict = None,
2233 db_dict
= other_update
or {}
2234 db_dict
["queuePosition"] = queuePosition
2235 if isinstance(stage
, list):
2236 db_dict
["stage"] = stage
[0]
2237 db_dict
["detailed-status"] = " ".join(stage
)
2238 elif stage
is not None:
2239 db_dict
["stage"] = str(stage
)
2241 if error_message
is not None:
2242 db_dict
["errorMessage"] = error_message
2243 if operation_state
is not None:
2244 db_dict
["operationState"] = operation_state
2245 db_dict
["statusEnteredTime"] = time()
2246 self
.update_db_2("nslcmops", op_id
, db_dict
)
2247 except DbException
as e
:
2249 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2252 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2254 nsr_id
= db_nsr
["_id"]
2255 # configurationStatus
2256 config_status
= db_nsr
.get("configurationStatus")
2259 "configurationStatus.{}.status".format(index
): status
2260 for index
, v
in enumerate(config_status
)
2264 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2266 except DbException
as e
:
2268 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2271 def _write_configuration_status(
2276 element_under_configuration
: str = None,
2277 element_type
: str = None,
2278 other_update
: dict = None,
2281 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2282 # .format(vca_index, status))
2285 db_path
= "configurationStatus.{}.".format(vca_index
)
2286 db_dict
= other_update
or {}
2288 db_dict
[db_path
+ "status"] = status
2289 if element_under_configuration
:
2291 db_path
+ "elementUnderConfiguration"
2292 ] = element_under_configuration
2294 db_dict
[db_path
+ "elementType"] = element_type
2295 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2296 except DbException
as e
:
2298 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2299 status
, nsr_id
, vca_index
, e
2303 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2305 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2306 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2307 Database is used because the result can be obtained from a different LCM worker in case of HA.
2308 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2309 :param db_nslcmop: database content of nslcmop
2310 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2311 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2312 computed 'vim-account-id'
2315 nslcmop_id
= db_nslcmop
["_id"]
2316 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2317 if placement_engine
== "PLA":
2319 logging_text
+ "Invoke and wait for placement optimization"
2321 await self
.msg
.aiowrite(
2322 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2324 db_poll_interval
= 5
2325 wait
= db_poll_interval
* 10
2327 while not pla_result
and wait
>= 0:
2328 await asyncio
.sleep(db_poll_interval
)
2329 wait
-= db_poll_interval
2330 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2331 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2335 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2338 for pla_vnf
in pla_result
["vnf"]:
2339 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2340 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2345 {"_id": vnfr
["_id"]},
2346 {"vim-account-id": pla_vnf
["vimAccountId"]},
2349 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2352 def update_nsrs_with_pla_result(self
, params
):
2354 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2356 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2358 except Exception as e
:
2359 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2361 async def instantiate(self
, nsr_id
, nslcmop_id
):
2364 :param nsr_id: ns instance to deploy
2365 :param nslcmop_id: operation to run
2369 # Try to lock HA task here
2370 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2371 if not task_is_locked_by_me
:
2373 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2377 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2378 self
.logger
.debug(logging_text
+ "Enter")
2380 # get all needed from database
2382 # database nsrs record
2385 # database nslcmops record
2388 # update operation on nsrs
2390 # update operation on nslcmops
2391 db_nslcmop_update
= {}
2393 nslcmop_operation_state
= None
2394 db_vnfrs
= {} # vnf's info indexed by member-index
2396 tasks_dict_info
= {} # from task to info text
2400 "Stage 1/5: preparation of the environment.",
2401 "Waiting for previous operations to terminate.",
2404 # ^ stage, step, VIM progress
2406 # wait for any previous tasks in process
2407 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2409 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2410 stage
[1] = "Reading from database."
2411 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2412 db_nsr_update
["detailed-status"] = "creating"
2413 db_nsr_update
["operational-status"] = "init"
2414 self
._write
_ns
_status
(
2416 ns_state
="BUILDING",
2417 current_operation
="INSTANTIATING",
2418 current_operation_id
=nslcmop_id
,
2419 other_update
=db_nsr_update
,
2421 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2423 # read from db: operation
2424 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2425 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2426 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2427 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2428 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2430 ns_params
= db_nslcmop
.get("operationParams")
2431 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2432 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2434 timeout_ns_deploy
= self
.timeout
.get(
2435 "ns_deploy", self
.timeout_ns_deploy
2439 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2440 self
.logger
.debug(logging_text
+ stage
[1])
2441 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2442 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2443 self
.logger
.debug(logging_text
+ stage
[1])
2444 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2445 self
.fs
.sync(db_nsr
["nsd-id"])
2447 # nsr_name = db_nsr["name"] # TODO short-name??
2449 # read from db: vnf's of this ns
2450 stage
[1] = "Getting vnfrs from db."
2451 self
.logger
.debug(logging_text
+ stage
[1])
2452 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2454 # read from db: vnfd's for every vnf
2455 db_vnfds
= [] # every vnfd data
2457 # for each vnf in ns, read vnfd
2458 for vnfr
in db_vnfrs_list
:
2459 if vnfr
.get("kdur"):
2461 for kdur
in vnfr
["kdur"]:
2462 if kdur
.get("additionalParams"):
2463 kdur
["additionalParams"] = json
.loads(
2464 kdur
["additionalParams"]
2466 kdur_list
.append(kdur
)
2467 vnfr
["kdur"] = kdur_list
2469 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2470 vnfd_id
= vnfr
["vnfd-id"]
2471 vnfd_ref
= vnfr
["vnfd-ref"]
2472 self
.fs
.sync(vnfd_id
)
2474 # if we haven't this vnfd, read it from db
2475 if vnfd_id
not in db_vnfds
:
2477 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2480 self
.logger
.debug(logging_text
+ stage
[1])
2481 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2484 db_vnfds
.append(vnfd
)
2486 # Get or generates the _admin.deployed.VCA list
2487 vca_deployed_list
= None
2488 if db_nsr
["_admin"].get("deployed"):
2489 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2490 if vca_deployed_list
is None:
2491 vca_deployed_list
= []
2492 configuration_status_list
= []
2493 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2494 db_nsr_update
["configurationStatus"] = configuration_status_list
2495 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2496 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2497 elif isinstance(vca_deployed_list
, dict):
2498 # maintain backward compatibility. Change a dict to list at database
2499 vca_deployed_list
= list(vca_deployed_list
.values())
2500 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2501 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2504 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2506 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2507 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2509 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2510 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2511 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2513 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2516 # n2vc_redesign STEP 2 Deploy Network Scenario
2517 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2518 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2520 stage
[1] = "Deploying KDUs."
2521 # self.logger.debug(logging_text + "Before deploy_kdus")
2522 # Call to deploy_kdus in case exists the "vdu:kdu" param
2523 await self
.deploy_kdus(
2524 logging_text
=logging_text
,
2526 nslcmop_id
=nslcmop_id
,
2529 task_instantiation_info
=tasks_dict_info
,
2532 stage
[1] = "Getting VCA public key."
2533 # n2vc_redesign STEP 1 Get VCA public ssh-key
2534 # feature 1429. Add n2vc public key to needed VMs
2535 n2vc_key
= self
.n2vc
.get_public_key()
2536 n2vc_key_list
= [n2vc_key
]
2537 if self
.vca_config
.get("public_key"):
2538 n2vc_key_list
.append(self
.vca_config
["public_key"])
2540 stage
[1] = "Deploying NS at VIM."
2541 task_ro
= asyncio
.ensure_future(
2542 self
.instantiate_RO(
2543 logging_text
=logging_text
,
2547 db_nslcmop
=db_nslcmop
,
2550 n2vc_key_list
=n2vc_key_list
,
2554 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2555 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2557 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2558 stage
[1] = "Deploying Execution Environments."
2559 self
.logger
.debug(logging_text
+ stage
[1])
2561 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2562 for vnf_profile
in get_vnf_profiles(nsd
):
2563 vnfd_id
= vnf_profile
["vnfd-id"]
2564 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2565 member_vnf_index
= str(vnf_profile
["id"])
2566 db_vnfr
= db_vnfrs
[member_vnf_index
]
2567 base_folder
= vnfd
["_admin"]["storage"]
2573 # Get additional parameters
2574 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2575 if db_vnfr
.get("additionalParamsForVnf"):
2576 deploy_params
.update(
2577 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2580 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2581 if descriptor_config
:
2583 logging_text
=logging_text
2584 + "member_vnf_index={} ".format(member_vnf_index
),
2587 nslcmop_id
=nslcmop_id
,
2593 member_vnf_index
=member_vnf_index
,
2594 vdu_index
=vdu_index
,
2596 deploy_params
=deploy_params
,
2597 descriptor_config
=descriptor_config
,
2598 base_folder
=base_folder
,
2599 task_instantiation_info
=tasks_dict_info
,
2603 # Deploy charms for each VDU that supports one.
2604 for vdud
in get_vdu_list(vnfd
):
2606 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2607 vdur
= find_in_list(
2608 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2611 if vdur
.get("additionalParams"):
2612 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2614 deploy_params_vdu
= deploy_params
2615 deploy_params_vdu
["OSM"] = get_osm_params(
2616 db_vnfr
, vdu_id
, vdu_count_index
=0
2618 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2620 self
.logger
.debug("VDUD > {}".format(vdud
))
2622 "Descriptor config > {}".format(descriptor_config
)
2624 if descriptor_config
:
2627 for vdu_index
in range(vdud_count
):
2628 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2630 logging_text
=logging_text
2631 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2632 member_vnf_index
, vdu_id
, vdu_index
2636 nslcmop_id
=nslcmop_id
,
2642 member_vnf_index
=member_vnf_index
,
2643 vdu_index
=vdu_index
,
2645 deploy_params
=deploy_params_vdu
,
2646 descriptor_config
=descriptor_config
,
2647 base_folder
=base_folder
,
2648 task_instantiation_info
=tasks_dict_info
,
2651 for kdud
in get_kdu_list(vnfd
):
2652 kdu_name
= kdud
["name"]
2653 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2654 if descriptor_config
:
2659 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2661 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2662 if kdur
.get("additionalParams"):
2663 deploy_params_kdu
.update(
2664 parse_yaml_strings(kdur
["additionalParams"].copy())
2668 logging_text
=logging_text
,
2671 nslcmop_id
=nslcmop_id
,
2677 member_vnf_index
=member_vnf_index
,
2678 vdu_index
=vdu_index
,
2680 deploy_params
=deploy_params_kdu
,
2681 descriptor_config
=descriptor_config
,
2682 base_folder
=base_folder
,
2683 task_instantiation_info
=tasks_dict_info
,
2687 # Check if this NS has a charm configuration
2688 descriptor_config
= nsd
.get("ns-configuration")
2689 if descriptor_config
and descriptor_config
.get("juju"):
2692 member_vnf_index
= None
2698 # Get additional parameters
2699 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2700 if db_nsr
.get("additionalParamsForNs"):
2701 deploy_params
.update(
2702 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2704 base_folder
= nsd
["_admin"]["storage"]
2706 logging_text
=logging_text
,
2709 nslcmop_id
=nslcmop_id
,
2715 member_vnf_index
=member_vnf_index
,
2716 vdu_index
=vdu_index
,
2718 deploy_params
=deploy_params
,
2719 descriptor_config
=descriptor_config
,
2720 base_folder
=base_folder
,
2721 task_instantiation_info
=tasks_dict_info
,
2725 # rest of staff will be done at finally
2728 ROclient
.ROClientException
,
2734 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2737 except asyncio
.CancelledError
:
2739 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2741 exc
= "Operation was cancelled"
2742 except Exception as e
:
2743 exc
= traceback
.format_exc()
2744 self
.logger
.critical(
2745 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2750 error_list
.append(str(exc
))
2752 # wait for pending tasks
2754 stage
[1] = "Waiting for instantiate pending tasks."
2755 self
.logger
.debug(logging_text
+ stage
[1])
2756 error_list
+= await self
._wait
_for
_tasks
(
2764 stage
[1] = stage
[2] = ""
2765 except asyncio
.CancelledError
:
2766 error_list
.append("Cancelled")
2767 # TODO cancel all tasks
2768 except Exception as exc
:
2769 error_list
.append(str(exc
))
2771 # update operation-status
2772 db_nsr_update
["operational-status"] = "running"
2773 # let's begin with VCA 'configured' status (later we can change it)
2774 db_nsr_update
["config-status"] = "configured"
2775 for task
, task_name
in tasks_dict_info
.items():
2776 if not task
.done() or task
.cancelled() or task
.exception():
2777 if task_name
.startswith(self
.task_name_deploy_vca
):
2778 # A N2VC task is pending
2779 db_nsr_update
["config-status"] = "failed"
2781 # RO or KDU task is pending
2782 db_nsr_update
["operational-status"] = "failed"
2784 # update status at database
2786 error_detail
= ". ".join(error_list
)
2787 self
.logger
.error(logging_text
+ error_detail
)
2788 error_description_nslcmop
= "{} Detail: {}".format(
2789 stage
[0], error_detail
2791 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2792 nslcmop_id
, stage
[0]
2795 db_nsr_update
["detailed-status"] = (
2796 error_description_nsr
+ " Detail: " + error_detail
2798 db_nslcmop_update
["detailed-status"] = error_detail
2799 nslcmop_operation_state
= "FAILED"
2803 error_description_nsr
= error_description_nslcmop
= None
2805 db_nsr_update
["detailed-status"] = "Done"
2806 db_nslcmop_update
["detailed-status"] = "Done"
2807 nslcmop_operation_state
= "COMPLETED"
2810 self
._write
_ns
_status
(
2813 current_operation
="IDLE",
2814 current_operation_id
=None,
2815 error_description
=error_description_nsr
,
2816 error_detail
=error_detail
,
2817 other_update
=db_nsr_update
,
2819 self
._write
_op
_status
(
2822 error_message
=error_description_nslcmop
,
2823 operation_state
=nslcmop_operation_state
,
2824 other_update
=db_nslcmop_update
,
2827 if nslcmop_operation_state
:
2829 await self
.msg
.aiowrite(
2834 "nslcmop_id": nslcmop_id
,
2835 "operationState": nslcmop_operation_state
,
2839 except Exception as e
:
2841 logging_text
+ "kafka_write notification Exception {}".format(e
)
2844 self
.logger
.debug(logging_text
+ "Exit")
2845 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2847 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2848 if vnfd_id
not in cached_vnfds
:
2849 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2850 return cached_vnfds
[vnfd_id
]
2852 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2853 if vnf_profile_id
not in cached_vnfrs
:
2854 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2857 "member-vnf-index-ref": vnf_profile_id
,
2858 "nsr-id-ref": nsr_id
,
2861 return cached_vnfrs
[vnf_profile_id
]
2863 def _is_deployed_vca_in_relation(
2864 self
, vca
: DeployedVCA
, relation
: Relation
2867 for endpoint
in (relation
.provider
, relation
.requirer
):
2868 if endpoint
["kdu-resource-profile-id"]:
2871 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2872 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2873 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2879 def _update_ee_relation_data_with_implicit_data(
2880 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2882 ee_relation_data
= safe_get_ee_relation(
2883 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2885 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2886 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2887 "execution-environment-ref"
2889 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2890 vnfd_id
= vnf_profile
["vnfd-id"]
2891 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2894 if ee_relation_level
== EELevel
.VNF
2895 else ee_relation_data
["vdu-profile-id"]
2897 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2900 f
"not execution environments found for ee_relation {ee_relation_data}"
2902 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2903 return ee_relation_data
2905 def _get_ns_relations(
2908 nsd
: Dict
[str, Any
],
2910 cached_vnfds
: Dict
[str, Any
],
2911 ) -> List
[Relation
]:
2913 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2914 for r
in db_ns_relations
:
2915 provider_dict
= None
2916 requirer_dict
= None
2917 if all(key
in r
for key
in ("provider", "requirer")):
2918 provider_dict
= r
["provider"]
2919 requirer_dict
= r
["requirer"]
2920 elif "entities" in r
:
2921 provider_id
= r
["entities"][0]["id"]
2924 "endpoint": r
["entities"][0]["endpoint"],
2926 if provider_id
!= nsd
["id"]:
2927 provider_dict
["vnf-profile-id"] = provider_id
2928 requirer_id
= r
["entities"][1]["id"]
2931 "endpoint": r
["entities"][1]["endpoint"],
2933 if requirer_id
!= nsd
["id"]:
2934 requirer_dict
["vnf-profile-id"] = requirer_id
2937 "provider/requirer or entities must be included in the relation."
2939 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2940 nsr_id
, nsd
, provider_dict
, cached_vnfds
2942 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2943 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2945 provider
= EERelation(relation_provider
)
2946 requirer
= EERelation(relation_requirer
)
2947 relation
= Relation(r
["name"], provider
, requirer
)
2948 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2950 relations
.append(relation
)
2953 def _get_vnf_relations(
2956 nsd
: Dict
[str, Any
],
2958 cached_vnfds
: Dict
[str, Any
],
2959 ) -> List
[Relation
]:
2961 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2962 vnf_profile_id
= vnf_profile
["id"]
2963 vnfd_id
= vnf_profile
["vnfd-id"]
2964 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2965 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2966 for r
in db_vnf_relations
:
2967 provider_dict
= None
2968 requirer_dict
= None
2969 if all(key
in r
for key
in ("provider", "requirer")):
2970 provider_dict
= r
["provider"]
2971 requirer_dict
= r
["requirer"]
2972 elif "entities" in r
:
2973 provider_id
= r
["entities"][0]["id"]
2976 "vnf-profile-id": vnf_profile_id
,
2977 "endpoint": r
["entities"][0]["endpoint"],
2979 if provider_id
!= vnfd_id
:
2980 provider_dict
["vdu-profile-id"] = provider_id
2981 requirer_id
= r
["entities"][1]["id"]
2984 "vnf-profile-id": vnf_profile_id
,
2985 "endpoint": r
["entities"][1]["endpoint"],
2987 if requirer_id
!= vnfd_id
:
2988 requirer_dict
["vdu-profile-id"] = requirer_id
2991 "provider/requirer or entities must be included in the relation."
2993 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2994 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2996 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2997 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2999 provider
= EERelation(relation_provider
)
3000 requirer
= EERelation(relation_requirer
)
3001 relation
= Relation(r
["name"], provider
, requirer
)
3002 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3004 relations
.append(relation
)
3007 def _get_kdu_resource_data(
3009 ee_relation
: EERelation
,
3010 db_nsr
: Dict
[str, Any
],
3011 cached_vnfds
: Dict
[str, Any
],
3012 ) -> DeployedK8sResource
:
3013 nsd
= get_nsd(db_nsr
)
3014 vnf_profiles
= get_vnf_profiles(nsd
)
3015 vnfd_id
= find_in_list(
3017 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3019 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3020 kdu_resource_profile
= get_kdu_resource_profile(
3021 db_vnfd
, ee_relation
.kdu_resource_profile_id
3023 kdu_name
= kdu_resource_profile
["kdu-name"]
3024 deployed_kdu
, _
= get_deployed_kdu(
3025 db_nsr
.get("_admin", ()).get("deployed", ()),
3027 ee_relation
.vnf_profile_id
,
3029 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3032 def _get_deployed_component(
3034 ee_relation
: EERelation
,
3035 db_nsr
: Dict
[str, Any
],
3036 cached_vnfds
: Dict
[str, Any
],
3037 ) -> DeployedComponent
:
3038 nsr_id
= db_nsr
["_id"]
3039 deployed_component
= None
3040 ee_level
= EELevel
.get_level(ee_relation
)
3041 if ee_level
== EELevel
.NS
:
3042 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3044 deployed_component
= DeployedVCA(nsr_id
, vca
)
3045 elif ee_level
== EELevel
.VNF
:
3046 vca
= get_deployed_vca(
3050 "member-vnf-index": ee_relation
.vnf_profile_id
,
3051 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3055 deployed_component
= DeployedVCA(nsr_id
, vca
)
3056 elif ee_level
== EELevel
.VDU
:
3057 vca
= get_deployed_vca(
3060 "vdu_id": ee_relation
.vdu_profile_id
,
3061 "member-vnf-index": ee_relation
.vnf_profile_id
,
3062 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3066 deployed_component
= DeployedVCA(nsr_id
, vca
)
3067 elif ee_level
== EELevel
.KDU
:
3068 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3069 ee_relation
, db_nsr
, cached_vnfds
3071 if kdu_resource_data
:
3072 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3073 return deployed_component
3075 async def _add_relation(
3079 db_nsr
: Dict
[str, Any
],
3080 cached_vnfds
: Dict
[str, Any
],
3081 cached_vnfrs
: Dict
[str, Any
],
3083 deployed_provider
= self
._get
_deployed
_component
(
3084 relation
.provider
, db_nsr
, cached_vnfds
3086 deployed_requirer
= self
._get
_deployed
_component
(
3087 relation
.requirer
, db_nsr
, cached_vnfds
3091 and deployed_requirer
3092 and deployed_provider
.config_sw_installed
3093 and deployed_requirer
.config_sw_installed
3095 provider_db_vnfr
= (
3097 relation
.provider
.nsr_id
,
3098 relation
.provider
.vnf_profile_id
,
3101 if relation
.provider
.vnf_profile_id
3104 requirer_db_vnfr
= (
3106 relation
.requirer
.nsr_id
,
3107 relation
.requirer
.vnf_profile_id
,
3110 if relation
.requirer
.vnf_profile_id
3113 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3114 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3115 provider_relation_endpoint
= RelationEndpoint(
3116 deployed_provider
.ee_id
,
3118 relation
.provider
.endpoint
,
3120 requirer_relation_endpoint
= RelationEndpoint(
3121 deployed_requirer
.ee_id
,
3123 relation
.requirer
.endpoint
,
3125 await self
.vca_map
[vca_type
].add_relation(
3126 provider
=provider_relation_endpoint
,
3127 requirer
=requirer_relation_endpoint
,
3129 # remove entry from relations list
3133 async def _add_vca_relations(
3139 timeout
: int = 3600,
3143 # 1. find all relations for this VCA
3144 # 2. wait for other peers related
3148 # STEP 1: find all relations for this VCA
3151 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3152 nsd
= get_nsd(db_nsr
)
3155 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3156 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3161 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3162 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3164 # if no relations, terminate
3166 self
.logger
.debug(logging_text
+ " No relations")
3169 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3176 if now
- start
>= timeout
:
3177 self
.logger
.error(logging_text
+ " : timeout adding relations")
3180 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3181 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3183 # for each relation, find the VCA's related
3184 for relation
in relations
.copy():
3185 added
= await self
._add
_relation
(
3193 relations
.remove(relation
)
3196 self
.logger
.debug("Relations added")
3198 await asyncio
.sleep(5.0)
3202 except Exception as e
:
3203 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3206 async def _install_kdu(
3214 k8s_instance_info
: dict,
3215 k8params
: dict = None,
3221 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3224 "collection": "nsrs",
3225 "filter": {"_id": nsr_id
},
3226 "path": nsr_db_path
,
3229 if k8s_instance_info
.get("kdu-deployment-name"):
3230 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3232 kdu_instance
= self
.k8scluster_map
[
3234 ].generate_kdu_instance_name(
3235 db_dict
=db_dict_install
,
3236 kdu_model
=k8s_instance_info
["kdu-model"],
3237 kdu_name
=k8s_instance_info
["kdu-name"],
3240 # Update the nsrs table with the kdu-instance value
3244 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3247 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3248 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3249 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3250 # namespace, this first verification could be removed, and the next step would be done for any kind
3252 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3253 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3254 if k8sclustertype
in ("juju", "juju-bundle"):
3255 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3256 # that the user passed a namespace which he wants its KDU to be deployed in)
3262 "_admin.projects_write": k8s_instance_info
["namespace"],
3263 "_admin.projects_read": k8s_instance_info
["namespace"],
3269 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3274 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3276 k8s_instance_info
["namespace"] = kdu_instance
3278 await self
.k8scluster_map
[k8sclustertype
].install(
3279 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3280 kdu_model
=k8s_instance_info
["kdu-model"],
3283 db_dict
=db_dict_install
,
3285 kdu_name
=k8s_instance_info
["kdu-name"],
3286 namespace
=k8s_instance_info
["namespace"],
3287 kdu_instance
=kdu_instance
,
3291 # Obtain services to obtain management service ip
3292 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3293 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3294 kdu_instance
=kdu_instance
,
3295 namespace
=k8s_instance_info
["namespace"],
3298 # Obtain management service info (if exists)
3299 vnfr_update_dict
= {}
3300 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3302 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3307 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3310 for service
in kdud
.get("service", [])
3311 if service
.get("mgmt-service")
3313 for mgmt_service
in mgmt_services
:
3314 for service
in services
:
3315 if service
["name"].startswith(mgmt_service
["name"]):
3316 # Mgmt service found, Obtain service ip
3317 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3318 if isinstance(ip
, list) and len(ip
) == 1:
3322 "kdur.{}.ip-address".format(kdu_index
)
3325 # Check if must update also mgmt ip at the vnf
3326 service_external_cp
= mgmt_service
.get(
3327 "external-connection-point-ref"
3329 if service_external_cp
:
3331 deep_get(vnfd
, ("mgmt-interface", "cp"))
3332 == service_external_cp
3334 vnfr_update_dict
["ip-address"] = ip
3339 "external-connection-point-ref", ""
3341 == service_external_cp
,
3344 "kdur.{}.ip-address".format(kdu_index
)
3349 "Mgmt service name: {} not found".format(
3350 mgmt_service
["name"]
3354 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3355 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3357 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3360 and kdu_config
.get("initial-config-primitive")
3361 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3363 initial_config_primitive_list
= kdu_config
.get(
3364 "initial-config-primitive"
3366 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3368 for initial_config_primitive
in initial_config_primitive_list
:
3369 primitive_params_
= self
._map
_primitive
_params
(
3370 initial_config_primitive
, {}, {}
3373 await asyncio
.wait_for(
3374 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3375 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3376 kdu_instance
=kdu_instance
,
3377 primitive_name
=initial_config_primitive
["name"],
3378 params
=primitive_params_
,
3379 db_dict
=db_dict_install
,
3385 except Exception as e
:
3386 # Prepare update db with error and raise exception
3389 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3393 vnfr_data
.get("_id"),
3394 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3397 # ignore to keep original exception
3399 # reraise original error
3404 async def deploy_kdus(
3411 task_instantiation_info
,
3413 # Launch kdus if present in the descriptor
3415 k8scluster_id_2_uuic
= {
3416 "helm-chart-v3": {},
3421 async def _get_cluster_id(cluster_id
, cluster_type
):
3422 nonlocal k8scluster_id_2_uuic
3423 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3424 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3426 # check if K8scluster is creating and wait look if previous tasks in process
3427 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3428 "k8scluster", cluster_id
3431 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3432 task_name
, cluster_id
3434 self
.logger
.debug(logging_text
+ text
)
3435 await asyncio
.wait(task_dependency
, timeout
=3600)
3437 db_k8scluster
= self
.db
.get_one(
3438 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3440 if not db_k8scluster
:
3441 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3443 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3445 if cluster_type
== "helm-chart-v3":
3447 # backward compatibility for existing clusters that have not been initialized for helm v3
3448 k8s_credentials
= yaml
.safe_dump(
3449 db_k8scluster
.get("credentials")
3451 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3452 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3454 db_k8scluster_update
= {}
3455 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3456 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3457 db_k8scluster_update
[
3458 "_admin.helm-chart-v3.created"
3460 db_k8scluster_update
[
3461 "_admin.helm-chart-v3.operationalState"
3464 "k8sclusters", cluster_id
, db_k8scluster_update
3466 except Exception as e
:
3469 + "error initializing helm-v3 cluster: {}".format(str(e
))
3472 "K8s cluster '{}' has not been initialized for '{}'".format(
3473 cluster_id
, cluster_type
3478 "K8s cluster '{}' has not been initialized for '{}'".format(
3479 cluster_id
, cluster_type
3482 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3485 logging_text
+= "Deploy kdus: "
3488 db_nsr_update
= {"_admin.deployed.K8s": []}
3489 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3492 updated_cluster_list
= []
3493 updated_v3_cluster_list
= []
3495 for vnfr_data
in db_vnfrs
.values():
3496 vca_id
= self
.get_vca_id(vnfr_data
, {})
3497 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3498 # Step 0: Prepare and set parameters
3499 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3500 vnfd_id
= vnfr_data
.get("vnfd-id")
3501 vnfd_with_id
= find_in_list(
3502 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3506 for kdud
in vnfd_with_id
["kdu"]
3507 if kdud
["name"] == kdur
["kdu-name"]
3509 namespace
= kdur
.get("k8s-namespace")
3510 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3511 if kdur
.get("helm-chart"):
3512 kdumodel
= kdur
["helm-chart"]
3513 # Default version: helm3, if helm-version is v2 assign v2
3514 k8sclustertype
= "helm-chart-v3"
3515 self
.logger
.debug("kdur: {}".format(kdur
))
3517 kdur
.get("helm-version")
3518 and kdur
.get("helm-version") == "v2"
3520 k8sclustertype
= "helm-chart"
3521 elif kdur
.get("juju-bundle"):
3522 kdumodel
= kdur
["juju-bundle"]
3523 k8sclustertype
= "juju-bundle"
3526 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3527 "juju-bundle. Maybe an old NBI version is running".format(
3528 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3531 # check if kdumodel is a file and exists
3533 vnfd_with_id
= find_in_list(
3534 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3536 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3537 if storage
: # may be not present if vnfd has not artifacts
3538 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3539 if storage
["pkg-dir"]:
3540 filename
= "{}/{}/{}s/{}".format(
3547 filename
= "{}/Scripts/{}s/{}".format(
3552 if self
.fs
.file_exists(
3553 filename
, mode
="file"
3554 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3555 kdumodel
= self
.fs
.path
+ filename
3556 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3558 except Exception: # it is not a file
3561 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3562 step
= "Synchronize repos for k8s cluster '{}'".format(
3565 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3569 k8sclustertype
== "helm-chart"
3570 and cluster_uuid
not in updated_cluster_list
3572 k8sclustertype
== "helm-chart-v3"
3573 and cluster_uuid
not in updated_v3_cluster_list
3575 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3576 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3577 cluster_uuid
=cluster_uuid
3580 if del_repo_list
or added_repo_dict
:
3581 if k8sclustertype
== "helm-chart":
3583 "_admin.helm_charts_added." + item
: None
3584 for item
in del_repo_list
3587 "_admin.helm_charts_added." + item
: name
3588 for item
, name
in added_repo_dict
.items()
3590 updated_cluster_list
.append(cluster_uuid
)
3591 elif k8sclustertype
== "helm-chart-v3":
3593 "_admin.helm_charts_v3_added." + item
: None
3594 for item
in del_repo_list
3597 "_admin.helm_charts_v3_added." + item
: name
3598 for item
, name
in added_repo_dict
.items()
3600 updated_v3_cluster_list
.append(cluster_uuid
)
3602 logging_text
+ "repos synchronized on k8s cluster "
3603 "'{}' to_delete: {}, to_add: {}".format(
3604 k8s_cluster_id
, del_repo_list
, added_repo_dict
3609 {"_id": k8s_cluster_id
},
3615 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3616 vnfr_data
["member-vnf-index-ref"],
3620 k8s_instance_info
= {
3621 "kdu-instance": None,
3622 "k8scluster-uuid": cluster_uuid
,
3623 "k8scluster-type": k8sclustertype
,
3624 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3625 "kdu-name": kdur
["kdu-name"],
3626 "kdu-model": kdumodel
,
3627 "namespace": namespace
,
3628 "kdu-deployment-name": kdu_deployment_name
,
3630 db_path
= "_admin.deployed.K8s.{}".format(index
)
3631 db_nsr_update
[db_path
] = k8s_instance_info
3632 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3633 vnfd_with_id
= find_in_list(
3634 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3636 task
= asyncio
.ensure_future(
3645 k8params
=desc_params
,
3650 self
.lcm_tasks
.register(
3654 "instantiate_KDU-{}".format(index
),
3657 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3663 except (LcmException
, asyncio
.CancelledError
):
3665 except Exception as e
:
3666 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3667 if isinstance(e
, (N2VCException
, DbException
)):
3668 self
.logger
.error(logging_text
+ msg
)
3670 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3671 raise LcmException(msg
)
3674 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3693 task_instantiation_info
,
3696 # launch instantiate_N2VC in a asyncio task and register task object
3697 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3698 # if not found, create one entry and update database
3699 # fill db_nsr._admin.deployed.VCA.<index>
3702 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3704 if "execution-environment-list" in descriptor_config
:
3705 ee_list
= descriptor_config
.get("execution-environment-list", [])
3706 elif "juju" in descriptor_config
:
3707 ee_list
= [descriptor_config
] # ns charms
3708 else: # other types as script are not supported
3711 for ee_item
in ee_list
:
3714 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3715 ee_item
.get("juju"), ee_item
.get("helm-chart")
3718 ee_descriptor_id
= ee_item
.get("id")
3719 if ee_item
.get("juju"):
3720 vca_name
= ee_item
["juju"].get("charm")
3723 if ee_item
["juju"].get("charm") is not None
3726 if ee_item
["juju"].get("cloud") == "k8s":
3727 vca_type
= "k8s_proxy_charm"
3728 elif ee_item
["juju"].get("proxy") is False:
3729 vca_type
= "native_charm"
3730 elif ee_item
.get("helm-chart"):
3731 vca_name
= ee_item
["helm-chart"]
3732 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3735 vca_type
= "helm-v3"
3738 logging_text
+ "skipping non juju neither charm configuration"
3743 for vca_index
, vca_deployed
in enumerate(
3744 db_nsr
["_admin"]["deployed"]["VCA"]
3746 if not vca_deployed
:
3749 vca_deployed
.get("member-vnf-index") == member_vnf_index
3750 and vca_deployed
.get("vdu_id") == vdu_id
3751 and vca_deployed
.get("kdu_name") == kdu_name
3752 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3753 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3757 # not found, create one.
3759 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3762 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3764 target
+= "/kdu/{}".format(kdu_name
)
3766 "target_element": target
,
3767 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3768 "member-vnf-index": member_vnf_index
,
3770 "kdu_name": kdu_name
,
3771 "vdu_count_index": vdu_index
,
3772 "operational-status": "init", # TODO revise
3773 "detailed-status": "", # TODO revise
3774 "step": "initial-deploy", # TODO revise
3776 "vdu_name": vdu_name
,
3778 "ee_descriptor_id": ee_descriptor_id
,
3782 # create VCA and configurationStatus in db
3784 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3785 "configurationStatus.{}".format(vca_index
): dict(),
3787 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3789 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3791 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3792 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3793 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3796 task_n2vc
= asyncio
.ensure_future(
3797 self
.instantiate_N2VC(
3798 logging_text
=logging_text
,
3799 vca_index
=vca_index
,
3805 vdu_index
=vdu_index
,
3806 deploy_params
=deploy_params
,
3807 config_descriptor
=descriptor_config
,
3808 base_folder
=base_folder
,
3809 nslcmop_id
=nslcmop_id
,
3813 ee_config_descriptor
=ee_item
,
3816 self
.lcm_tasks
.register(
3820 "instantiate_N2VC-{}".format(vca_index
),
3823 task_instantiation_info
[
3825 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3826 member_vnf_index
or "", vdu_id
or ""
3830 def _create_nslcmop(nsr_id
, operation
, params
):
3832 Creates a ns-lcm-opp content to be stored at database.
3833 :param nsr_id: internal id of the instance
3834 :param operation: instantiate, terminate, scale, action, ...
3835 :param params: user parameters for the operation
3836 :return: dictionary following SOL005 format
3838 # Raise exception if invalid arguments
3839 if not (nsr_id
and operation
and params
):
3841 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3848 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3849 "operationState": "PROCESSING",
3850 "statusEnteredTime": now
,
3851 "nsInstanceId": nsr_id
,
3852 "lcmOperationType": operation
,
3854 "isAutomaticInvocation": False,
3855 "operationParams": params
,
3856 "isCancelPending": False,
3858 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3859 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3864 def _format_additional_params(self
, params
):
3865 params
= params
or {}
3866 for key
, value
in params
.items():
3867 if str(value
).startswith("!!yaml "):
3868 params
[key
] = yaml
.safe_load(value
[7:])
3871 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3872 primitive
= seq
.get("name")
3873 primitive_params
= {}
3875 "member_vnf_index": vnf_index
,
3876 "primitive": primitive
,
3877 "primitive_params": primitive_params
,
3880 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3884 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3885 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3886 if op
.get("operationState") == "COMPLETED":
3887 # b. Skip sub-operation
3888 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3889 return self
.SUBOPERATION_STATUS_SKIP
3891 # c. retry executing sub-operation
3892 # The sub-operation exists, and operationState != 'COMPLETED'
3893 # Update operationState = 'PROCESSING' to indicate a retry.
3894 operationState
= "PROCESSING"
3895 detailed_status
= "In progress"
3896 self
._update
_suboperation
_status
(
3897 db_nslcmop
, op_index
, operationState
, detailed_status
3899 # Return the sub-operation index
3900 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3901 # with arguments extracted from the sub-operation
3904 # Find a sub-operation where all keys in a matching dictionary must match
3905 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3906 def _find_suboperation(self
, db_nslcmop
, match
):
3907 if db_nslcmop
and match
:
3908 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3909 for i
, op
in enumerate(op_list
):
3910 if all(op
.get(k
) == match
[k
] for k
in match
):
3912 return self
.SUBOPERATION_STATUS_NOT_FOUND
3914 # Update status for a sub-operation given its index
3915 def _update_suboperation_status(
3916 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3918 # Update DB for HA tasks
3919 q_filter
= {"_id": db_nslcmop
["_id"]}
3921 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3922 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3925 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3928 # Add sub-operation, return the index of the added sub-operation
3929 # Optionally, set operationState, detailed-status, and operationType
3930 # Status and type are currently set for 'scale' sub-operations:
3931 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3932 # 'detailed-status' : status message
3933 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3934 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3935 def _add_suboperation(
3943 mapped_primitive_params
,
3944 operationState
=None,
3945 detailed_status
=None,
3948 RO_scaling_info
=None,
3951 return self
.SUBOPERATION_STATUS_NOT_FOUND
3952 # Get the "_admin.operations" list, if it exists
3953 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3954 op_list
= db_nslcmop_admin
.get("operations")
3955 # Create or append to the "_admin.operations" list
3957 "member_vnf_index": vnf_index
,
3959 "vdu_count_index": vdu_count_index
,
3960 "primitive": primitive
,
3961 "primitive_params": mapped_primitive_params
,
3964 new_op
["operationState"] = operationState
3966 new_op
["detailed-status"] = detailed_status
3968 new_op
["lcmOperationType"] = operationType
3970 new_op
["RO_nsr_id"] = RO_nsr_id
3972 new_op
["RO_scaling_info"] = RO_scaling_info
3974 # No existing operations, create key 'operations' with current operation as first list element
3975 db_nslcmop_admin
.update({"operations": [new_op
]})
3976 op_list
= db_nslcmop_admin
.get("operations")
3978 # Existing operations, append operation to list
3979 op_list
.append(new_op
)
3981 db_nslcmop_update
= {"_admin.operations": op_list
}
3982 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3983 op_index
= len(op_list
) - 1
3986 # Helper methods for scale() sub-operations
3988 # pre-scale/post-scale:
3989 # Check for 3 different cases:
3990 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3991 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3992 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3993 def _check_or_add_scale_suboperation(
3997 vnf_config_primitive
,
4001 RO_scaling_info
=None,
4003 # Find this sub-operation
4004 if RO_nsr_id
and RO_scaling_info
:
4005 operationType
= "SCALE-RO"
4007 "member_vnf_index": vnf_index
,
4008 "RO_nsr_id": RO_nsr_id
,
4009 "RO_scaling_info": RO_scaling_info
,
4013 "member_vnf_index": vnf_index
,
4014 "primitive": vnf_config_primitive
,
4015 "primitive_params": primitive_params
,
4016 "lcmOperationType": operationType
,
4018 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4019 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4020 # a. New sub-operation
4021 # The sub-operation does not exist, add it.
4022 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4023 # The following parameters are set to None for all kind of scaling:
4025 vdu_count_index
= None
4027 if RO_nsr_id
and RO_scaling_info
:
4028 vnf_config_primitive
= None
4029 primitive_params
= None
4032 RO_scaling_info
= None
4033 # Initial status for sub-operation
4034 operationState
= "PROCESSING"
4035 detailed_status
= "In progress"
4036 # Add sub-operation for pre/post-scaling (zero or more operations)
4037 self
._add
_suboperation
(
4043 vnf_config_primitive
,
4051 return self
.SUBOPERATION_STATUS_NEW
4053 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4054 # or op_index (operationState != 'COMPLETED')
4055 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4057 # Function to return execution_environment id
4059 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4060 # TODO vdu_index_count
4061 for vca
in vca_deployed_list
:
4062 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4065 async def destroy_N2VC(
4073 exec_primitives
=True,
4078 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4079 :param logging_text:
4081 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4082 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4083 :param vca_index: index in the database _admin.deployed.VCA
4084 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4085 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4086 not executed properly
4087 :param scaling_in: True destroys the application, False destroys the model
4088 :return: None or exception
4093 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4094 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4098 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4100 # execute terminate_primitives
4102 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4103 config_descriptor
.get("terminate-config-primitive"),
4104 vca_deployed
.get("ee_descriptor_id"),
4106 vdu_id
= vca_deployed
.get("vdu_id")
4107 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4108 vdu_name
= vca_deployed
.get("vdu_name")
4109 vnf_index
= vca_deployed
.get("member-vnf-index")
4110 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4111 for seq
in terminate_primitives
:
4112 # For each sequence in list, get primitive and call _ns_execute_primitive()
4113 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4114 vnf_index
, seq
.get("name")
4116 self
.logger
.debug(logging_text
+ step
)
4117 # Create the primitive for each sequence, i.e. "primitive": "touch"
4118 primitive
= seq
.get("name")
4119 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4124 self
._add
_suboperation
(
4131 mapped_primitive_params
,
4133 # Sub-operations: Call _ns_execute_primitive() instead of action()
4135 result
, result_detail
= await self
._ns
_execute
_primitive
(
4136 vca_deployed
["ee_id"],
4138 mapped_primitive_params
,
4142 except LcmException
:
4143 # this happens when VCA is not deployed. In this case it is not needed to terminate
4145 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4146 if result
not in result_ok
:
4148 "terminate_primitive {} for vnf_member_index={} fails with "
4149 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4151 # set that this VCA do not need terminated
4152 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4156 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4159 # Delete Prometheus Jobs if any
4160 # This uses NSR_ID, so it will destroy any jobs under this index
4161 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4164 await self
.vca_map
[vca_type
].delete_execution_environment(
4165 vca_deployed
["ee_id"],
4166 scaling_in
=scaling_in
,
4171 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4172 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4173 namespace
= "." + db_nsr
["_id"]
4175 await self
.n2vc
.delete_namespace(
4176 namespace
=namespace
,
4177 total_timeout
=self
.timeout_charm_delete
,
4180 except N2VCNotFound
: # already deleted. Skip
4182 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4184 async def _terminate_RO(
4185 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4188 Terminates a deployment from RO
4189 :param logging_text:
4190 :param nsr_deployed: db_nsr._admin.deployed
4193 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4194 this method will update only the index 2, but it will write on database the concatenated content of the list
4199 ro_nsr_id
= ro_delete_action
= None
4200 if nsr_deployed
and nsr_deployed
.get("RO"):
4201 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4202 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4205 stage
[2] = "Deleting ns from VIM."
4206 db_nsr_update
["detailed-status"] = " ".join(stage
)
4207 self
._write
_op
_status
(nslcmop_id
, stage
)
4208 self
.logger
.debug(logging_text
+ stage
[2])
4209 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4210 self
._write
_op
_status
(nslcmop_id
, stage
)
4211 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4212 ro_delete_action
= desc
["action_id"]
4214 "_admin.deployed.RO.nsr_delete_action_id"
4215 ] = ro_delete_action
4216 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4217 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4218 if ro_delete_action
:
4219 # wait until NS is deleted from VIM
4220 stage
[2] = "Waiting ns deleted from VIM."
4221 detailed_status_old
= None
4225 + " RO_id={} ro_delete_action={}".format(
4226 ro_nsr_id
, ro_delete_action
4229 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4230 self
._write
_op
_status
(nslcmop_id
, stage
)
4232 delete_timeout
= 20 * 60 # 20 minutes
4233 while delete_timeout
> 0:
4234 desc
= await self
.RO
.show(
4236 item_id_name
=ro_nsr_id
,
4237 extra_item
="action",
4238 extra_item_id
=ro_delete_action
,
4242 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4244 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4245 if ns_status
== "ERROR":
4246 raise ROclient
.ROClientException(ns_status_info
)
4247 elif ns_status
== "BUILD":
4248 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4249 elif ns_status
== "ACTIVE":
4250 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4251 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4256 ), "ROclient.check_action_status returns unknown {}".format(
4259 if stage
[2] != detailed_status_old
:
4260 detailed_status_old
= stage
[2]
4261 db_nsr_update
["detailed-status"] = " ".join(stage
)
4262 self
._write
_op
_status
(nslcmop_id
, stage
)
4263 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4264 await asyncio
.sleep(5, loop
=self
.loop
)
4266 else: # delete_timeout <= 0:
4267 raise ROclient
.ROClientException(
4268 "Timeout waiting ns deleted from VIM"
4271 except Exception as e
:
4272 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4274 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4276 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4277 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4278 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4280 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4283 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4285 failed_detail
.append("delete conflict: {}".format(e
))
4288 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4291 failed_detail
.append("delete error: {}".format(e
))
4293 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4297 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4298 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4300 stage
[2] = "Deleting nsd from RO."
4301 db_nsr_update
["detailed-status"] = " ".join(stage
)
4302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4303 self
._write
_op
_status
(nslcmop_id
, stage
)
4304 await self
.RO
.delete("nsd", ro_nsd_id
)
4306 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4308 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4309 except Exception as e
:
4311 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4313 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4315 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4318 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4320 failed_detail
.append(
4321 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4323 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4325 failed_detail
.append(
4326 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4328 self
.logger
.error(logging_text
+ failed_detail
[-1])
4330 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4331 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4332 if not vnf_deployed
or not vnf_deployed
["id"]:
4335 ro_vnfd_id
= vnf_deployed
["id"]
4338 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4339 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4341 db_nsr_update
["detailed-status"] = " ".join(stage
)
4342 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4343 self
._write
_op
_status
(nslcmop_id
, stage
)
4344 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4346 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4348 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4349 except Exception as e
:
4351 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4354 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4358 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4361 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4363 failed_detail
.append(
4364 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4366 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4368 failed_detail
.append(
4369 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4371 self
.logger
.error(logging_text
+ failed_detail
[-1])
4374 stage
[2] = "Error deleting from VIM"
4376 stage
[2] = "Deleted from VIM"
4377 db_nsr_update
["detailed-status"] = " ".join(stage
)
4378 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4379 self
._write
_op
_status
(nslcmop_id
, stage
)
4382 raise LcmException("; ".join(failed_detail
))
4384 async def terminate(self
, nsr_id
, nslcmop_id
):
4385 # Try to lock HA task here
4386 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4387 if not task_is_locked_by_me
:
4390 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4391 self
.logger
.debug(logging_text
+ "Enter")
4392 timeout_ns_terminate
= self
.timeout_ns_terminate
4395 operation_params
= None
4397 error_list
= [] # annotates all failed error messages
4398 db_nslcmop_update
= {}
4399 autoremove
= False # autoremove after terminated
4400 tasks_dict_info
= {}
4403 "Stage 1/3: Preparing task.",
4404 "Waiting for previous operations to terminate.",
4407 # ^ contains [stage, step, VIM-status]
4409 # wait for any previous tasks in process
4410 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4412 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4413 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4414 operation_params
= db_nslcmop
.get("operationParams") or {}
4415 if operation_params
.get("timeout_ns_terminate"):
4416 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4417 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4418 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4420 db_nsr_update
["operational-status"] = "terminating"
4421 db_nsr_update
["config-status"] = "terminating"
4422 self
._write
_ns
_status
(
4424 ns_state
="TERMINATING",
4425 current_operation
="TERMINATING",
4426 current_operation_id
=nslcmop_id
,
4427 other_update
=db_nsr_update
,
4429 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4430 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4431 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4434 stage
[1] = "Getting vnf descriptors from db."
4435 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4437 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4439 db_vnfds_from_id
= {}
4440 db_vnfds_from_member_index
= {}
4442 for vnfr
in db_vnfrs_list
:
4443 vnfd_id
= vnfr
["vnfd-id"]
4444 if vnfd_id
not in db_vnfds_from_id
:
4445 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4446 db_vnfds_from_id
[vnfd_id
] = vnfd
4447 db_vnfds_from_member_index
[
4448 vnfr
["member-vnf-index-ref"]
4449 ] = db_vnfds_from_id
[vnfd_id
]
4451 # Destroy individual execution environments when there are terminating primitives.
4452 # Rest of EE will be deleted at once
4453 # TODO - check before calling _destroy_N2VC
4454 # if not operation_params.get("skip_terminate_primitives"):#
4455 # or not vca.get("needed_terminate"):
4456 stage
[0] = "Stage 2/3 execute terminating primitives."
4457 self
.logger
.debug(logging_text
+ stage
[0])
4458 stage
[1] = "Looking execution environment that needs terminate."
4459 self
.logger
.debug(logging_text
+ stage
[1])
4461 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4462 config_descriptor
= None
4463 vca_member_vnf_index
= vca
.get("member-vnf-index")
4464 vca_id
= self
.get_vca_id(
4465 db_vnfrs_dict
.get(vca_member_vnf_index
)
4466 if vca_member_vnf_index
4470 if not vca
or not vca
.get("ee_id"):
4472 if not vca
.get("member-vnf-index"):
4474 config_descriptor
= db_nsr
.get("ns-configuration")
4475 elif vca
.get("vdu_id"):
4476 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4477 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4478 elif vca
.get("kdu_name"):
4479 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4480 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4482 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4483 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4484 vca_type
= vca
.get("type")
4485 exec_terminate_primitives
= not operation_params
.get(
4486 "skip_terminate_primitives"
4487 ) and vca
.get("needed_terminate")
4488 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4489 # pending native charms
4491 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4493 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4494 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4495 task
= asyncio
.ensure_future(
4503 exec_terminate_primitives
,
4507 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4509 # wait for pending tasks of terminate primitives
4513 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4515 error_list
= await self
._wait
_for
_tasks
(
4518 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4522 tasks_dict_info
.clear()
4524 return # raise LcmException("; ".join(error_list))
4526 # remove All execution environments at once
4527 stage
[0] = "Stage 3/3 delete all."
4529 if nsr_deployed
.get("VCA"):
4530 stage
[1] = "Deleting all execution environments."
4531 self
.logger
.debug(logging_text
+ stage
[1])
4532 vca_id
= self
.get_vca_id({}, db_nsr
)
4533 task_delete_ee
= asyncio
.ensure_future(
4535 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4536 timeout
=self
.timeout_charm_delete
,
4539 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4540 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4542 # Delete from k8scluster
4543 stage
[1] = "Deleting KDUs."
4544 self
.logger
.debug(logging_text
+ stage
[1])
4545 # print(nsr_deployed)
4546 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4547 if not kdu
or not kdu
.get("kdu-instance"):
4549 kdu_instance
= kdu
.get("kdu-instance")
4550 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4551 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4552 vca_id
= self
.get_vca_id({}, db_nsr
)
4553 task_delete_kdu_instance
= asyncio
.ensure_future(
4554 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4555 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4556 kdu_instance
=kdu_instance
,
4558 namespace
=kdu
.get("namespace"),
4564 + "Unknown k8s deployment type {}".format(
4565 kdu
.get("k8scluster-type")
4570 task_delete_kdu_instance
4571 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4574 stage
[1] = "Deleting ns from VIM."
4576 task_delete_ro
= asyncio
.ensure_future(
4577 self
._terminate
_ng
_ro
(
4578 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4582 task_delete_ro
= asyncio
.ensure_future(
4584 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4587 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4589 # rest of staff will be done at finally
4592 ROclient
.ROClientException
,
4597 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4599 except asyncio
.CancelledError
:
4601 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4603 exc
= "Operation was cancelled"
4604 except Exception as e
:
4605 exc
= traceback
.format_exc()
4606 self
.logger
.critical(
4607 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4612 error_list
.append(str(exc
))
4614 # wait for pending tasks
4616 stage
[1] = "Waiting for terminate pending tasks."
4617 self
.logger
.debug(logging_text
+ stage
[1])
4618 error_list
+= await self
._wait
_for
_tasks
(
4621 timeout_ns_terminate
,
4625 stage
[1] = stage
[2] = ""
4626 except asyncio
.CancelledError
:
4627 error_list
.append("Cancelled")
4628 # TODO cancell all tasks
4629 except Exception as exc
:
4630 error_list
.append(str(exc
))
4631 # update status at database
4633 error_detail
= "; ".join(error_list
)
4634 # self.logger.error(logging_text + error_detail)
4635 error_description_nslcmop
= "{} Detail: {}".format(
4636 stage
[0], error_detail
4638 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4639 nslcmop_id
, stage
[0]
4642 db_nsr_update
["operational-status"] = "failed"
4643 db_nsr_update
["detailed-status"] = (
4644 error_description_nsr
+ " Detail: " + error_detail
4646 db_nslcmop_update
["detailed-status"] = error_detail
4647 nslcmop_operation_state
= "FAILED"
4651 error_description_nsr
= error_description_nslcmop
= None
4652 ns_state
= "NOT_INSTANTIATED"
4653 db_nsr_update
["operational-status"] = "terminated"
4654 db_nsr_update
["detailed-status"] = "Done"
4655 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4656 db_nslcmop_update
["detailed-status"] = "Done"
4657 nslcmop_operation_state
= "COMPLETED"
4660 self
._write
_ns
_status
(
4663 current_operation
="IDLE",
4664 current_operation_id
=None,
4665 error_description
=error_description_nsr
,
4666 error_detail
=error_detail
,
4667 other_update
=db_nsr_update
,
4669 self
._write
_op
_status
(
4672 error_message
=error_description_nslcmop
,
4673 operation_state
=nslcmop_operation_state
,
4674 other_update
=db_nslcmop_update
,
4676 if ns_state
== "NOT_INSTANTIATED":
4680 {"nsr-id-ref": nsr_id
},
4681 {"_admin.nsState": "NOT_INSTANTIATED"},
4683 except DbException
as e
:
4686 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4690 if operation_params
:
4691 autoremove
= operation_params
.get("autoremove", False)
4692 if nslcmop_operation_state
:
4694 await self
.msg
.aiowrite(
4699 "nslcmop_id": nslcmop_id
,
4700 "operationState": nslcmop_operation_state
,
4701 "autoremove": autoremove
,
4705 except Exception as e
:
4707 logging_text
+ "kafka_write notification Exception {}".format(e
)
4710 self
.logger
.debug(logging_text
+ "Exit")
4711 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4713 async def _wait_for_tasks(
4714 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4717 error_detail_list
= []
4719 pending_tasks
= list(created_tasks_info
.keys())
4720 num_tasks
= len(pending_tasks
)
4722 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4723 self
._write
_op
_status
(nslcmop_id
, stage
)
4724 while pending_tasks
:
4726 _timeout
= timeout
+ time_start
- time()
4727 done
, pending_tasks
= await asyncio
.wait(
4728 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4730 num_done
+= len(done
)
4731 if not done
: # Timeout
4732 for task
in pending_tasks
:
4733 new_error
= created_tasks_info
[task
] + ": Timeout"
4734 error_detail_list
.append(new_error
)
4735 error_list
.append(new_error
)
4738 if task
.cancelled():
4741 exc
= task
.exception()
4743 if isinstance(exc
, asyncio
.TimeoutError
):
4745 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4746 error_list
.append(created_tasks_info
[task
])
4747 error_detail_list
.append(new_error
)
4754 ROclient
.ROClientException
,
4760 self
.logger
.error(logging_text
+ new_error
)
4762 exc_traceback
= "".join(
4763 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4767 + created_tasks_info
[task
]
4773 logging_text
+ created_tasks_info
[task
] + ": Done"
4775 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4777 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4778 if nsr_id
: # update also nsr
4783 "errorDescription": "Error at: " + ", ".join(error_list
),
4784 "errorDetail": ". ".join(error_detail_list
),
4787 self
._write
_op
_status
(nslcmop_id
, stage
)
4788 return error_detail_list
4791 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4793 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4794 The default-value is used. If it is between < > it look for a value at instantiation_params
4795 :param primitive_desc: portion of VNFD/NSD that describes primitive
4796 :param params: Params provided by user
4797 :param instantiation_params: Instantiation params provided by user
4798 :return: a dictionary with the calculated params
4800 calculated_params
= {}
4801 for parameter
in primitive_desc
.get("parameter", ()):
4802 param_name
= parameter
["name"]
4803 if param_name
in params
:
4804 calculated_params
[param_name
] = params
[param_name
]
4805 elif "default-value" in parameter
or "value" in parameter
:
4806 if "value" in parameter
:
4807 calculated_params
[param_name
] = parameter
["value"]
4809 calculated_params
[param_name
] = parameter
["default-value"]
4811 isinstance(calculated_params
[param_name
], str)
4812 and calculated_params
[param_name
].startswith("<")
4813 and calculated_params
[param_name
].endswith(">")
4815 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4816 calculated_params
[param_name
] = instantiation_params
[
4817 calculated_params
[param_name
][1:-1]
4821 "Parameter {} needed to execute primitive {} not provided".format(
4822 calculated_params
[param_name
], primitive_desc
["name"]
4827 "Parameter {} needed to execute primitive {} not provided".format(
4828 param_name
, primitive_desc
["name"]
4832 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4833 calculated_params
[param_name
] = yaml
.safe_dump(
4834 calculated_params
[param_name
], default_flow_style
=True, width
=256
4836 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4838 ].startswith("!!yaml "):
4839 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4840 if parameter
.get("data-type") == "INTEGER":
4842 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4843 except ValueError: # error converting string to int
4845 "Parameter {} of primitive {} must be integer".format(
4846 param_name
, primitive_desc
["name"]
4849 elif parameter
.get("data-type") == "BOOLEAN":
4850 calculated_params
[param_name
] = not (
4851 (str(calculated_params
[param_name
])).lower() == "false"
4854 # add always ns_config_info if primitive name is config
4855 if primitive_desc
["name"] == "config":
4856 if "ns_config_info" in instantiation_params
:
4857 calculated_params
["ns_config_info"] = instantiation_params
[
4860 return calculated_params
4862 def _look_for_deployed_vca(
4869 ee_descriptor_id
=None,
4871 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4872 for vca
in deployed_vca
:
4875 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4878 vdu_count_index
is not None
4879 and vdu_count_index
!= vca
["vdu_count_index"]
4882 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4884 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4888 # vca_deployed not found
4890 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4891 " is not deployed".format(
4900 ee_id
= vca
.get("ee_id")
4902 "type", "lxc_proxy_charm"
4903 ) # default value for backward compatibility - proxy charm
4906 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4907 "execution environment".format(
4908 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4911 return ee_id
, vca_type
4913 async def _ns_execute_primitive(
4919 retries_interval
=30,
4926 if primitive
== "config":
4927 primitive_params
= {"params": primitive_params
}
4929 vca_type
= vca_type
or "lxc_proxy_charm"
4933 output
= await asyncio
.wait_for(
4934 self
.vca_map
[vca_type
].exec_primitive(
4936 primitive_name
=primitive
,
4937 params_dict
=primitive_params
,
4938 progress_timeout
=self
.timeout_progress_primitive
,
4939 total_timeout
=self
.timeout_primitive
,
4944 timeout
=timeout
or self
.timeout_primitive
,
4948 except asyncio
.CancelledError
:
4950 except Exception as e
: # asyncio.TimeoutError
4951 if isinstance(e
, asyncio
.TimeoutError
):
4956 "Error executing action {} on {} -> {}".format(
4961 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4963 return "FAILED", str(e
)
4965 return "COMPLETED", output
4967 except (LcmException
, asyncio
.CancelledError
):
4969 except Exception as e
:
4970 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4972 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4974 Updating the vca_status with latest juju information in nsrs record
4975 :param: nsr_id: Id of the nsr
4976 :param: nslcmop_id: Id of the nslcmop
4980 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4981 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4982 vca_id
= self
.get_vca_id({}, db_nsr
)
4983 if db_nsr
["_admin"]["deployed"]["K8s"]:
4984 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4985 cluster_uuid
, kdu_instance
, cluster_type
= (
4986 k8s
["k8scluster-uuid"],
4987 k8s
["kdu-instance"],
4988 k8s
["k8scluster-type"],
4990 await self
._on
_update
_k
8s
_db
(
4991 cluster_uuid
=cluster_uuid
,
4992 kdu_instance
=kdu_instance
,
4993 filter={"_id": nsr_id
},
4995 cluster_type
=cluster_type
,
4998 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4999 table
, filter = "nsrs", {"_id": nsr_id
}
5000 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5001 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5003 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5004 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5006 async def action(self
, nsr_id
, nslcmop_id
):
5007 # Try to lock HA task here
5008 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5009 if not task_is_locked_by_me
:
5012 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5013 self
.logger
.debug(logging_text
+ "Enter")
5014 # get all needed from database
5018 db_nslcmop_update
= {}
5019 nslcmop_operation_state
= None
5020 error_description_nslcmop
= None
5023 # wait for any previous tasks in process
5024 step
= "Waiting for previous operations to terminate"
5025 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5027 self
._write
_ns
_status
(
5030 current_operation
="RUNNING ACTION",
5031 current_operation_id
=nslcmop_id
,
5034 step
= "Getting information from database"
5035 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5036 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5037 if db_nslcmop
["operationParams"].get("primitive_params"):
5038 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5039 db_nslcmop
["operationParams"]["primitive_params"]
5042 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5043 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5044 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5045 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5046 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5047 primitive
= db_nslcmop
["operationParams"]["primitive"]
5048 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5049 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5050 "timeout_ns_action", self
.timeout_primitive
5054 step
= "Getting vnfr from database"
5055 db_vnfr
= self
.db
.get_one(
5056 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5058 if db_vnfr
.get("kdur"):
5060 for kdur
in db_vnfr
["kdur"]:
5061 if kdur
.get("additionalParams"):
5062 kdur
["additionalParams"] = json
.loads(
5063 kdur
["additionalParams"]
5065 kdur_list
.append(kdur
)
5066 db_vnfr
["kdur"] = kdur_list
5067 step
= "Getting vnfd from database"
5068 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5070 # Sync filesystem before running a primitive
5071 self
.fs
.sync(db_vnfr
["vnfd-id"])
5073 step
= "Getting nsd from database"
5074 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5076 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5077 # for backward compatibility
5078 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5079 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5080 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5081 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5083 # look for primitive
5084 config_primitive_desc
= descriptor_configuration
= None
5086 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5088 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5090 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5092 descriptor_configuration
= db_nsd
.get("ns-configuration")
5094 if descriptor_configuration
and descriptor_configuration
.get(
5097 for config_primitive
in descriptor_configuration
["config-primitive"]:
5098 if config_primitive
["name"] == primitive
:
5099 config_primitive_desc
= config_primitive
5102 if not config_primitive_desc
:
5103 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5105 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5109 primitive_name
= primitive
5110 ee_descriptor_id
= None
5112 primitive_name
= config_primitive_desc
.get(
5113 "execution-environment-primitive", primitive
5115 ee_descriptor_id
= config_primitive_desc
.get(
5116 "execution-environment-ref"
5122 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5124 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5127 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5129 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5131 desc_params
= parse_yaml_strings(
5132 db_vnfr
.get("additionalParamsForVnf")
5135 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5136 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5137 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5139 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5140 actions
.add(primitive
["name"])
5141 for primitive
in kdu_configuration
.get("config-primitive", []):
5142 actions
.add(primitive
["name"])
5144 nsr_deployed
["K8s"],
5145 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5146 and kdu
["member-vnf-index"] == vnf_index
,
5150 if primitive_name
in actions
5151 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5155 # TODO check if ns is in a proper status
5157 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5159 # kdur and desc_params already set from before
5160 if primitive_params
:
5161 desc_params
.update(primitive_params
)
5162 # TODO Check if we will need something at vnf level
5163 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5165 kdu_name
== kdu
["kdu-name"]
5166 and kdu
["member-vnf-index"] == vnf_index
5171 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5174 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5175 msg
= "unknown k8scluster-type '{}'".format(
5176 kdu
.get("k8scluster-type")
5178 raise LcmException(msg
)
5181 "collection": "nsrs",
5182 "filter": {"_id": nsr_id
},
5183 "path": "_admin.deployed.K8s.{}".format(index
),
5187 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5189 step
= "Executing kdu {}".format(primitive_name
)
5190 if primitive_name
== "upgrade":
5191 if desc_params
.get("kdu_model"):
5192 kdu_model
= desc_params
.get("kdu_model")
5193 del desc_params
["kdu_model"]
5195 kdu_model
= kdu
.get("kdu-model")
5196 parts
= kdu_model
.split(sep
=":")
5198 kdu_model
= parts
[0]
5200 detailed_status
= await asyncio
.wait_for(
5201 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5202 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5203 kdu_instance
=kdu
.get("kdu-instance"),
5205 kdu_model
=kdu_model
,
5208 timeout
=timeout_ns_action
,
5210 timeout
=timeout_ns_action
+ 10,
5213 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5215 elif primitive_name
== "rollback":
5216 detailed_status
= await asyncio
.wait_for(
5217 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5218 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5219 kdu_instance
=kdu
.get("kdu-instance"),
5222 timeout
=timeout_ns_action
,
5224 elif primitive_name
== "status":
5225 detailed_status
= await asyncio
.wait_for(
5226 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5227 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5228 kdu_instance
=kdu
.get("kdu-instance"),
5231 timeout
=timeout_ns_action
,
5234 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5235 kdu
["kdu-name"], nsr_id
5237 params
= self
._map
_primitive
_params
(
5238 config_primitive_desc
, primitive_params
, desc_params
5241 detailed_status
= await asyncio
.wait_for(
5242 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5243 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5244 kdu_instance
=kdu_instance
,
5245 primitive_name
=primitive_name
,
5248 timeout
=timeout_ns_action
,
5251 timeout
=timeout_ns_action
,
5255 nslcmop_operation_state
= "COMPLETED"
5257 detailed_status
= ""
5258 nslcmop_operation_state
= "FAILED"
5260 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5261 nsr_deployed
["VCA"],
5262 member_vnf_index
=vnf_index
,
5264 vdu_count_index
=vdu_count_index
,
5265 ee_descriptor_id
=ee_descriptor_id
,
5267 for vca_index
, vca_deployed
in enumerate(
5268 db_nsr
["_admin"]["deployed"]["VCA"]
5270 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5272 "collection": "nsrs",
5273 "filter": {"_id": nsr_id
},
5274 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5278 nslcmop_operation_state
,
5280 ) = await self
._ns
_execute
_primitive
(
5282 primitive
=primitive_name
,
5283 primitive_params
=self
._map
_primitive
_params
(
5284 config_primitive_desc
, primitive_params
, desc_params
5286 timeout
=timeout_ns_action
,
5292 db_nslcmop_update
["detailed-status"] = detailed_status
5293 error_description_nslcmop
= (
5294 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5298 + " task Done with result {} {}".format(
5299 nslcmop_operation_state
, detailed_status
5302 return # database update is called inside finally
5304 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5305 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5307 except asyncio
.CancelledError
:
5309 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5311 exc
= "Operation was cancelled"
5312 except asyncio
.TimeoutError
:
5313 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5315 except Exception as e
:
5316 exc
= traceback
.format_exc()
5317 self
.logger
.critical(
5318 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5327 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5328 nslcmop_operation_state
= "FAILED"
5330 self
._write
_ns
_status
(
5334 ], # TODO check if degraded. For the moment use previous status
5335 current_operation
="IDLE",
5336 current_operation_id
=None,
5337 # error_description=error_description_nsr,
5338 # error_detail=error_detail,
5339 other_update
=db_nsr_update
,
5342 self
._write
_op
_status
(
5345 error_message
=error_description_nslcmop
,
5346 operation_state
=nslcmop_operation_state
,
5347 other_update
=db_nslcmop_update
,
5350 if nslcmop_operation_state
:
5352 await self
.msg
.aiowrite(
5357 "nslcmop_id": nslcmop_id
,
5358 "operationState": nslcmop_operation_state
,
5362 except Exception as e
:
5364 logging_text
+ "kafka_write notification Exception {}".format(e
)
5366 self
.logger
.debug(logging_text
+ "Exit")
5367 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5368 return nslcmop_operation_state
, detailed_status
5370 async def terminate_vdus(
5371 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5373 """This method terminates VDUs
5376 db_vnfr: VNF instance record
5377 member_vnf_index: VNF index to identify the VDUs to be removed
5378 db_nsr: NS instance record
5379 update_db_nslcmops: Nslcmop update record
5381 vca_scaling_info
= []
5382 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5383 scaling_info
["scaling_direction"] = "IN"
5384 scaling_info
["vdu-delete"] = {}
5385 scaling_info
["kdu-delete"] = {}
5386 db_vdur
= db_vnfr
.get("vdur")
5387 vdur_list
= copy(db_vdur
)
5389 for index
, vdu
in enumerate(vdur_list
):
5390 vca_scaling_info
.append(
5392 "osm_vdu_id": vdu
["vdu-id-ref"],
5393 "member-vnf-index": member_vnf_index
,
5395 "vdu_index": count_index
,
5397 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5398 scaling_info
["vdu"].append(
5400 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5401 "vdu_id": vdu
["vdu-id-ref"],
5404 for interface
in vdu
["interfaces"]:
5405 scaling_info
["vdu"][index
]["interface"].append(
5407 "name": interface
["name"],
5408 "ip_address": interface
["ip-address"],
5409 "mac_address": interface
.get("mac-address"),
5411 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5412 stage
[2] = "Terminating VDUs"
5413 if scaling_info
.get("vdu-delete"):
5414 # scale_process = "RO"
5415 if self
.ro_config
.get("ng"):
5416 await self
._scale
_ng
_ro
(
5417 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5420 async def remove_vnf(
5421 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5423 """This method is to Remove VNF instances from NS.
5426 nsr_id: NS instance id
5427 nslcmop_id: nslcmop id of update
5428 vnf_instance_id: id of the VNF instance to be removed
5431 result: (str, str) COMPLETED/FAILED, details
5435 logging_text
= "Task ns={} update ".format(nsr_id
)
5436 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5437 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5438 if check_vnfr_count
> 1:
5439 stage
= ["", "", ""]
5440 step
= "Getting nslcmop from database"
5441 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5442 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5443 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5444 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5445 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5446 """ db_vnfr = self.db.get_one(
5447 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5449 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5450 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5452 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5453 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5454 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5455 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5456 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5457 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5458 return "COMPLETED", "Done"
5460 step
= "Terminate VNF Failed with"
5461 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5463 except (LcmException
, asyncio
.CancelledError
):
5465 except Exception as e
:
5466 self
.logger
.debug("Error removing VNF {}".format(e
))
5467 return "FAILED", "Error removing VNF {}".format(e
)
5469 async def _ns_redeploy_vnf(
5470 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5472 """This method updates and redeploys VNF instances
5475 nsr_id: NS instance id
5476 nslcmop_id: nslcmop id
5477 db_vnfd: VNF descriptor
5478 db_vnfr: VNF instance record
5479 db_nsr: NS instance record
5482 result: (str, str) COMPLETED/FAILED, details
5486 stage
= ["", "", ""]
5487 logging_text
= "Task ns={} update ".format(nsr_id
)
5488 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5489 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5491 # Terminate old VNF resources
5492 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5493 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5495 # old_vnfd_id = db_vnfr["vnfd-id"]
5496 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5497 new_db_vnfd
= db_vnfd
5498 # new_vnfd_ref = new_db_vnfd["id"]
5499 # new_vnfd_id = vnfd_id
5503 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5505 "name": cp
.get("id"),
5506 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5507 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5510 new_vnfr_cp
.append(vnf_cp
)
5511 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5512 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5513 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5514 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5515 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5516 updated_db_vnfr
= self
.db
.get_one(
5517 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5520 # Instantiate new VNF resources
5521 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5522 vca_scaling_info
= []
5523 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5524 scaling_info
["scaling_direction"] = "OUT"
5525 scaling_info
["vdu-create"] = {}
5526 scaling_info
["kdu-create"] = {}
5527 vdud_instantiate_list
= db_vnfd
["vdu"]
5528 for index
, vdud
in enumerate(vdud_instantiate_list
):
5529 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5533 additional_params
= (
5534 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5537 cloud_init_list
= []
5539 # TODO Information of its own ip is not available because db_vnfr is not updated.
5540 additional_params
["OSM"] = get_osm_params(
5541 updated_db_vnfr
, vdud
["id"], 1
5543 cloud_init_list
.append(
5544 self
._parse
_cloud
_init
(
5551 vca_scaling_info
.append(
5553 "osm_vdu_id": vdud
["id"],
5554 "member-vnf-index": member_vnf_index
,
5556 "vdu_index": count_index
,
5559 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5560 if self
.ro_config
.get("ng"):
5562 "New Resources to be deployed: {}".format(scaling_info
))
5563 await self
._scale
_ng
_ro
(
5564 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5566 return "COMPLETED", "Done"
5567 except (LcmException
, asyncio
.CancelledError
):
5569 except Exception as e
:
5570 self
.logger
.debug("Error updating VNF {}".format(e
))
5571 return "FAILED", "Error updating VNF {}".format(e
)
5573 async def _ns_charm_upgrade(
5579 timeout
: float = None,
5581 """This method upgrade charms in VNF instances
5584 ee_id: Execution environment id
5585 path: Local path to the charm
5587 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5588 timeout: (Float) Timeout for the ns update operation
5591 result: (str, str) COMPLETED/FAILED, details
5594 charm_type
= charm_type
or "lxc_proxy_charm"
5595 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5599 charm_type
=charm_type
,
5600 timeout
=timeout
or self
.timeout_ns_update
,
5604 return "COMPLETED", output
5606 except (LcmException
, asyncio
.CancelledError
):
5609 except Exception as e
:
5611 self
.logger
.debug("Error upgrading charm {}".format(path
))
5613 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5615 async def update(self
, nsr_id
, nslcmop_id
):
5616 """Update NS according to different update types
5618 This method performs upgrade of VNF instances then updates the revision
5619 number in VNF record
5622 nsr_id: Network service will be updated
5623 nslcmop_id: ns lcm operation id
5626 It may raise DbException, LcmException, N2VCException, K8sException
5629 # Try to lock HA task here
5630 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5631 if not task_is_locked_by_me
:
5634 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5635 self
.logger
.debug(logging_text
+ "Enter")
5637 # Set the required variables to be filled up later
5639 db_nslcmop_update
= {}
5641 nslcmop_operation_state
= None
5643 error_description_nslcmop
= ""
5645 change_type
= "updated"
5646 detailed_status
= ""
5649 # wait for any previous tasks in process
5650 step
= "Waiting for previous operations to terminate"
5651 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5652 self
._write
_ns
_status
(
5655 current_operation
="UPDATING",
5656 current_operation_id
=nslcmop_id
,
5659 step
= "Getting nslcmop from database"
5660 db_nslcmop
= self
.db
.get_one(
5661 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5663 update_type
= db_nslcmop
["operationParams"]["updateType"]
5665 step
= "Getting nsr from database"
5666 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5667 old_operational_status
= db_nsr
["operational-status"]
5668 db_nsr_update
["operational-status"] = "updating"
5669 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5670 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5672 if update_type
== "CHANGE_VNFPKG":
5674 # Get the input parameters given through update request
5675 vnf_instance_id
= db_nslcmop
["operationParams"][
5676 "changeVnfPackageData"
5677 ].get("vnfInstanceId")
5679 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5682 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5684 step
= "Getting vnfr from database"
5685 db_vnfr
= self
.db
.get_one(
5686 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5689 step
= "Getting vnfds from database"
5691 latest_vnfd
= self
.db
.get_one(
5692 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5694 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5697 current_vnf_revision
= db_vnfr
.get("revision", 1)
5698 current_vnfd
= self
.db
.get_one(
5700 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5701 fail_on_empty
=False,
5703 # Charm artifact paths will be filled up later
5705 current_charm_artifact_path
,
5706 target_charm_artifact_path
,
5707 charm_artifact_paths
,
5710 step
= "Checking if revision has changed in VNFD"
5711 if current_vnf_revision
!= latest_vnfd_revision
:
5713 change_type
= "policy_updated"
5715 # There is new revision of VNFD, update operation is required
5716 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5717 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5719 step
= "Removing the VNFD packages if they exist in the local path"
5720 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5721 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5723 step
= "Get the VNFD packages from FSMongo"
5724 self
.fs
.sync(from_path
=latest_vnfd_path
)
5725 self
.fs
.sync(from_path
=current_vnfd_path
)
5728 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5730 base_folder
= latest_vnfd
["_admin"]["storage"]
5732 for charm_index
, charm_deployed
in enumerate(
5733 get_iterable(nsr_deployed
, "VCA")
5735 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5737 # Getting charm-id and charm-type
5738 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5739 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5740 charm_type
= charm_deployed
.get("type")
5743 ee_id
= charm_deployed
.get("ee_id")
5745 step
= "Getting descriptor config"
5746 descriptor_config
= get_configuration(
5747 current_vnfd
, current_vnfd
["id"]
5750 if "execution-environment-list" in descriptor_config
:
5751 ee_list
= descriptor_config
.get(
5752 "execution-environment-list", []
5757 # There could be several charm used in the same VNF
5758 for ee_item
in ee_list
:
5759 if ee_item
.get("juju"):
5761 step
= "Getting charm name"
5762 charm_name
= ee_item
["juju"].get("charm")
5764 step
= "Setting Charm artifact paths"
5765 current_charm_artifact_path
.append(
5766 get_charm_artifact_path(
5770 current_vnf_revision
,
5773 target_charm_artifact_path
.append(
5774 get_charm_artifact_path(
5778 latest_vnfd_revision
,
5782 charm_artifact_paths
= zip(
5783 current_charm_artifact_path
, target_charm_artifact_path
5786 step
= "Checking if software version has changed in VNFD"
5787 if find_software_version(current_vnfd
) != find_software_version(
5791 step
= "Checking if existing VNF has charm"
5792 for current_charm_path
, target_charm_path
in list(
5793 charm_artifact_paths
5795 if current_charm_path
:
5797 "Software version change is not supported as VNF instance {} has charm.".format(
5802 # There is no change in the charm package, then redeploy the VNF
5803 # based on new descriptor
5804 step
= "Redeploying VNF"
5805 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5809 ) = await self
._ns
_redeploy
_vnf
(
5816 if result
== "FAILED":
5817 nslcmop_operation_state
= result
5818 error_description_nslcmop
= detailed_status
5819 db_nslcmop_update
["detailed-status"] = detailed_status
5822 + " step {} Done with result {} {}".format(
5823 step
, nslcmop_operation_state
, detailed_status
5828 step
= "Checking if any charm package has changed or not"
5829 for current_charm_path
, target_charm_path
in list(
5830 charm_artifact_paths
5834 and target_charm_path
5835 and self
.check_charm_hash_changed(
5836 current_charm_path
, target_charm_path
5840 step
= "Checking whether VNF uses juju bundle"
5841 if check_juju_bundle_existence(current_vnfd
):
5844 "Charm upgrade is not supported for the instance which"
5845 " uses juju-bundle: {}".format(
5846 check_juju_bundle_existence(current_vnfd
)
5850 step
= "Upgrading Charm"
5854 ) = await self
._ns
_charm
_upgrade
(
5857 charm_type
=charm_type
,
5858 path
=self
.fs
.path
+ target_charm_path
,
5859 timeout
=timeout_seconds
,
5862 if result
== "FAILED":
5863 nslcmop_operation_state
= result
5864 error_description_nslcmop
= detailed_status
5866 db_nslcmop_update
["detailed-status"] = detailed_status
5869 + " step {} Done with result {} {}".format(
5870 step
, nslcmop_operation_state
, detailed_status
5874 step
= "Updating policies"
5875 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5876 result
= "COMPLETED"
5877 detailed_status
= "Done"
5878 db_nslcmop_update
["detailed-status"] = "Done"
5880 # If nslcmop_operation_state is None, so any operation is not failed.
5881 if not nslcmop_operation_state
:
5882 nslcmop_operation_state
= "COMPLETED"
5884 # If update CHANGE_VNFPKG nslcmop_operation is successful
5885 # vnf revision need to be updated
5886 vnfr_update
["revision"] = latest_vnfd_revision
5887 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5891 + " task Done with result {} {}".format(
5892 nslcmop_operation_state
, detailed_status
5895 elif update_type
== "REMOVE_VNF":
5896 # This part is included in https://osm.etsi.org/gerrit/11876
5897 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5898 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5899 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5900 step
= "Removing VNF"
5901 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5902 if result
== "FAILED":
5903 nslcmop_operation_state
= result
5904 error_description_nslcmop
= detailed_status
5905 db_nslcmop_update
["detailed-status"] = detailed_status
5906 change_type
= "vnf_terminated"
5907 if not nslcmop_operation_state
:
5908 nslcmop_operation_state
= "COMPLETED"
5911 + " task Done with result {} {}".format(
5912 nslcmop_operation_state
, detailed_status
5916 elif update_type
== "OPERATE_VNF":
5917 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5918 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5919 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5920 (result
, detailed_status
) = await self
.rebuild_start_stop(
5921 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5923 if result
== "FAILED":
5924 nslcmop_operation_state
= result
5925 error_description_nslcmop
= detailed_status
5926 db_nslcmop_update
["detailed-status"] = detailed_status
5927 if not nslcmop_operation_state
:
5928 nslcmop_operation_state
= "COMPLETED"
5931 + " task Done with result {} {}".format(
5932 nslcmop_operation_state
, detailed_status
5936 # If nslcmop_operation_state is None, so any operation is not failed.
5937 # All operations are executed in overall.
5938 if not nslcmop_operation_state
:
5939 nslcmop_operation_state
= "COMPLETED"
5940 db_nsr_update
["operational-status"] = old_operational_status
5942 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5943 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5945 except asyncio
.CancelledError
:
5947 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5949 exc
= "Operation was cancelled"
5950 except asyncio
.TimeoutError
:
5951 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5953 except Exception as e
:
5954 exc
= traceback
.format_exc()
5955 self
.logger
.critical(
5956 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5965 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5966 nslcmop_operation_state
= "FAILED"
5967 db_nsr_update
["operational-status"] = old_operational_status
5969 self
._write
_ns
_status
(
5971 ns_state
=db_nsr
["nsState"],
5972 current_operation
="IDLE",
5973 current_operation_id
=None,
5974 other_update
=db_nsr_update
,
5977 self
._write
_op
_status
(
5980 error_message
=error_description_nslcmop
,
5981 operation_state
=nslcmop_operation_state
,
5982 other_update
=db_nslcmop_update
,
5985 if nslcmop_operation_state
:
5989 "nslcmop_id": nslcmop_id
,
5990 "operationState": nslcmop_operation_state
,
5992 if change_type
in ("vnf_terminated", "policy_updated"):
5993 msg
.update({"vnf_member_index": member_vnf_index
})
5994 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
5995 except Exception as e
:
5997 logging_text
+ "kafka_write notification Exception {}".format(e
)
5999 self
.logger
.debug(logging_text
+ "Exit")
6000 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6001 return nslcmop_operation_state
, detailed_status
6003 async def scale(self
, nsr_id
, nslcmop_id
):
6004 # Try to lock HA task here
6005 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6006 if not task_is_locked_by_me
:
6009 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6010 stage
= ["", "", ""]
6011 tasks_dict_info
= {}
6012 # ^ stage, step, VIM progress
6013 self
.logger
.debug(logging_text
+ "Enter")
6014 # get all needed from database
6016 db_nslcmop_update
= {}
6019 # in case of error, indicates what part of scale was failed to put nsr at error status
6020 scale_process
= None
6021 old_operational_status
= ""
6022 old_config_status
= ""
6025 # wait for any previous tasks in process
6026 step
= "Waiting for previous operations to terminate"
6027 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6028 self
._write
_ns
_status
(
6031 current_operation
="SCALING",
6032 current_operation_id
=nslcmop_id
,
6035 step
= "Getting nslcmop from database"
6037 step
+ " after having waited for previous tasks to be completed"
6039 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6041 step
= "Getting nsr from database"
6042 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6043 old_operational_status
= db_nsr
["operational-status"]
6044 old_config_status
= db_nsr
["config-status"]
6046 step
= "Parsing scaling parameters"
6047 db_nsr_update
["operational-status"] = "scaling"
6048 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6049 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6051 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6053 ]["member-vnf-index"]
6054 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6056 ]["scaling-group-descriptor"]
6057 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6058 # for backward compatibility
6059 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6060 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6061 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6062 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6064 step
= "Getting vnfr from database"
6065 db_vnfr
= self
.db
.get_one(
6066 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6069 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6071 step
= "Getting vnfd from database"
6072 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6074 base_folder
= db_vnfd
["_admin"]["storage"]
6076 step
= "Getting scaling-group-descriptor"
6077 scaling_descriptor
= find_in_list(
6078 get_scaling_aspect(db_vnfd
),
6079 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6081 if not scaling_descriptor
:
6083 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6084 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6087 step
= "Sending scale order to VIM"
6088 # TODO check if ns is in a proper status
6090 if not db_nsr
["_admin"].get("scaling-group"):
6095 "_admin.scaling-group": [
6096 {"name": scaling_group
, "nb-scale-op": 0}
6100 admin_scale_index
= 0
6102 for admin_scale_index
, admin_scale_info
in enumerate(
6103 db_nsr
["_admin"]["scaling-group"]
6105 if admin_scale_info
["name"] == scaling_group
:
6106 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6108 else: # not found, set index one plus last element and add new entry with the name
6109 admin_scale_index
+= 1
6111 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6114 vca_scaling_info
= []
6115 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6116 if scaling_type
== "SCALE_OUT":
6117 if "aspect-delta-details" not in scaling_descriptor
:
6119 "Aspect delta details not fount in scaling descriptor {}".format(
6120 scaling_descriptor
["name"]
6123 # count if max-instance-count is reached
6124 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6126 scaling_info
["scaling_direction"] = "OUT"
6127 scaling_info
["vdu-create"] = {}
6128 scaling_info
["kdu-create"] = {}
6129 for delta
in deltas
:
6130 for vdu_delta
in delta
.get("vdu-delta", {}):
6131 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6132 # vdu_index also provides the number of instance of the targeted vdu
6133 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6134 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6138 additional_params
= (
6139 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6142 cloud_init_list
= []
6144 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6145 max_instance_count
= 10
6146 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6147 max_instance_count
= vdu_profile
.get(
6148 "max-number-of-instances", 10
6151 default_instance_num
= get_number_of_instances(
6154 instances_number
= vdu_delta
.get("number-of-instances", 1)
6155 nb_scale_op
+= instances_number
6157 new_instance_count
= nb_scale_op
+ default_instance_num
6158 # Control if new count is over max and vdu count is less than max.
6159 # Then assign new instance count
6160 if new_instance_count
> max_instance_count
> vdu_count
:
6161 instances_number
= new_instance_count
- max_instance_count
6163 instances_number
= instances_number
6165 if new_instance_count
> max_instance_count
:
6167 "reached the limit of {} (max-instance-count) "
6168 "scaling-out operations for the "
6169 "scaling-group-descriptor '{}'".format(
6170 nb_scale_op
, scaling_group
6173 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6175 # TODO Information of its own ip is not available because db_vnfr is not updated.
6176 additional_params
["OSM"] = get_osm_params(
6177 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6179 cloud_init_list
.append(
6180 self
._parse
_cloud
_init
(
6187 vca_scaling_info
.append(
6189 "osm_vdu_id": vdu_delta
["id"],
6190 "member-vnf-index": vnf_index
,
6192 "vdu_index": vdu_index
+ x
,
6195 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6196 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6197 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6198 kdu_name
= kdu_profile
["kdu-name"]
6199 resource_name
= kdu_profile
.get("resource-name", "")
6201 # Might have different kdus in the same delta
6202 # Should have list for each kdu
6203 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6204 scaling_info
["kdu-create"][kdu_name
] = []
6206 kdur
= get_kdur(db_vnfr
, kdu_name
)
6207 if kdur
.get("helm-chart"):
6208 k8s_cluster_type
= "helm-chart-v3"
6209 self
.logger
.debug("kdur: {}".format(kdur
))
6211 kdur
.get("helm-version")
6212 and kdur
.get("helm-version") == "v2"
6214 k8s_cluster_type
= "helm-chart"
6215 elif kdur
.get("juju-bundle"):
6216 k8s_cluster_type
= "juju-bundle"
6219 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6220 "juju-bundle. Maybe an old NBI version is running".format(
6221 db_vnfr
["member-vnf-index-ref"], kdu_name
6225 max_instance_count
= 10
6226 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6227 max_instance_count
= kdu_profile
.get(
6228 "max-number-of-instances", 10
6231 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6232 deployed_kdu
, _
= get_deployed_kdu(
6233 nsr_deployed
, kdu_name
, vnf_index
6235 if deployed_kdu
is None:
6237 "KDU '{}' for vnf '{}' not deployed".format(
6241 kdu_instance
= deployed_kdu
.get("kdu-instance")
6242 instance_num
= await self
.k8scluster_map
[
6248 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6249 kdu_model
=deployed_kdu
.get("kdu-model"),
6251 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6252 "number-of-instances", 1
6255 # Control if new count is over max and instance_num is less than max.
6256 # Then assign max instance number to kdu replica count
6257 if kdu_replica_count
> max_instance_count
> instance_num
:
6258 kdu_replica_count
= max_instance_count
6259 if kdu_replica_count
> max_instance_count
:
6261 "reached the limit of {} (max-instance-count) "
6262 "scaling-out operations for the "
6263 "scaling-group-descriptor '{}'".format(
6264 instance_num
, scaling_group
6268 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6269 vca_scaling_info
.append(
6271 "osm_kdu_id": kdu_name
,
6272 "member-vnf-index": vnf_index
,
6274 "kdu_index": instance_num
+ x
- 1,
6277 scaling_info
["kdu-create"][kdu_name
].append(
6279 "member-vnf-index": vnf_index
,
6281 "k8s-cluster-type": k8s_cluster_type
,
6282 "resource-name": resource_name
,
6283 "scale": kdu_replica_count
,
6286 elif scaling_type
== "SCALE_IN":
6287 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6289 scaling_info
["scaling_direction"] = "IN"
6290 scaling_info
["vdu-delete"] = {}
6291 scaling_info
["kdu-delete"] = {}
6293 for delta
in deltas
:
6294 for vdu_delta
in delta
.get("vdu-delta", {}):
6295 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6296 min_instance_count
= 0
6297 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6298 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6299 min_instance_count
= vdu_profile
["min-number-of-instances"]
6301 default_instance_num
= get_number_of_instances(
6302 db_vnfd
, vdu_delta
["id"]
6304 instance_num
= vdu_delta
.get("number-of-instances", 1)
6305 nb_scale_op
-= instance_num
6307 new_instance_count
= nb_scale_op
+ default_instance_num
6309 if new_instance_count
< min_instance_count
< vdu_count
:
6310 instances_number
= min_instance_count
- new_instance_count
6312 instances_number
= instance_num
6314 if new_instance_count
< min_instance_count
:
6316 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6317 "scaling-group-descriptor '{}'".format(
6318 nb_scale_op
, scaling_group
6321 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6322 vca_scaling_info
.append(
6324 "osm_vdu_id": vdu_delta
["id"],
6325 "member-vnf-index": vnf_index
,
6327 "vdu_index": vdu_index
- 1 - x
,
6330 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6331 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6332 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6333 kdu_name
= kdu_profile
["kdu-name"]
6334 resource_name
= kdu_profile
.get("resource-name", "")
6336 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6337 scaling_info
["kdu-delete"][kdu_name
] = []
6339 kdur
= get_kdur(db_vnfr
, kdu_name
)
6340 if kdur
.get("helm-chart"):
6341 k8s_cluster_type
= "helm-chart-v3"
6342 self
.logger
.debug("kdur: {}".format(kdur
))
6344 kdur
.get("helm-version")
6345 and kdur
.get("helm-version") == "v2"
6347 k8s_cluster_type
= "helm-chart"
6348 elif kdur
.get("juju-bundle"):
6349 k8s_cluster_type
= "juju-bundle"
6352 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6353 "juju-bundle. Maybe an old NBI version is running".format(
6354 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6358 min_instance_count
= 0
6359 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6360 min_instance_count
= kdu_profile
["min-number-of-instances"]
6362 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6363 deployed_kdu
, _
= get_deployed_kdu(
6364 nsr_deployed
, kdu_name
, vnf_index
6366 if deployed_kdu
is None:
6368 "KDU '{}' for vnf '{}' not deployed".format(
6372 kdu_instance
= deployed_kdu
.get("kdu-instance")
6373 instance_num
= await self
.k8scluster_map
[
6379 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6380 kdu_model
=deployed_kdu
.get("kdu-model"),
6382 kdu_replica_count
= instance_num
- kdu_delta
.get(
6383 "number-of-instances", 1
6386 if kdu_replica_count
< min_instance_count
< instance_num
:
6387 kdu_replica_count
= min_instance_count
6388 if kdu_replica_count
< min_instance_count
:
6390 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6391 "scaling-group-descriptor '{}'".format(
6392 instance_num
, scaling_group
6396 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6397 vca_scaling_info
.append(
6399 "osm_kdu_id": kdu_name
,
6400 "member-vnf-index": vnf_index
,
6402 "kdu_index": instance_num
- x
- 1,
6405 scaling_info
["kdu-delete"][kdu_name
].append(
6407 "member-vnf-index": vnf_index
,
6409 "k8s-cluster-type": k8s_cluster_type
,
6410 "resource-name": resource_name
,
6411 "scale": kdu_replica_count
,
6415 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6416 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6417 if scaling_info
["scaling_direction"] == "IN":
6418 for vdur
in reversed(db_vnfr
["vdur"]):
6419 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6420 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6421 scaling_info
["vdu"].append(
6423 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6424 "vdu_id": vdur
["vdu-id-ref"],
6428 for interface
in vdur
["interfaces"]:
6429 scaling_info
["vdu"][-1]["interface"].append(
6431 "name": interface
["name"],
6432 "ip_address": interface
["ip-address"],
6433 "mac_address": interface
.get("mac-address"),
6436 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6439 step
= "Executing pre-scale vnf-config-primitive"
6440 if scaling_descriptor
.get("scaling-config-action"):
6441 for scaling_config_action
in scaling_descriptor
[
6442 "scaling-config-action"
6445 scaling_config_action
.get("trigger") == "pre-scale-in"
6446 and scaling_type
== "SCALE_IN"
6448 scaling_config_action
.get("trigger") == "pre-scale-out"
6449 and scaling_type
== "SCALE_OUT"
6451 vnf_config_primitive
= scaling_config_action
[
6452 "vnf-config-primitive-name-ref"
6454 step
= db_nslcmop_update
[
6456 ] = "executing pre-scale scaling-config-action '{}'".format(
6457 vnf_config_primitive
6460 # look for primitive
6461 for config_primitive
in (
6462 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6463 ).get("config-primitive", ()):
6464 if config_primitive
["name"] == vnf_config_primitive
:
6468 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6469 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6470 "primitive".format(scaling_group
, vnf_config_primitive
)
6473 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6474 if db_vnfr
.get("additionalParamsForVnf"):
6475 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6477 scale_process
= "VCA"
6478 db_nsr_update
["config-status"] = "configuring pre-scaling"
6479 primitive_params
= self
._map
_primitive
_params
(
6480 config_primitive
, {}, vnfr_params
6483 # Pre-scale retry check: Check if this sub-operation has been executed before
6484 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6487 vnf_config_primitive
,
6491 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6492 # Skip sub-operation
6493 result
= "COMPLETED"
6494 result_detail
= "Done"
6497 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6498 vnf_config_primitive
, result
, result_detail
6502 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6503 # New sub-operation: Get index of this sub-operation
6505 len(db_nslcmop
.get("_admin", {}).get("operations"))
6510 + "vnf_config_primitive={} New sub-operation".format(
6511 vnf_config_primitive
6515 # retry: Get registered params for this existing sub-operation
6516 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6519 vnf_index
= op
.get("member_vnf_index")
6520 vnf_config_primitive
= op
.get("primitive")
6521 primitive_params
= op
.get("primitive_params")
6524 + "vnf_config_primitive={} Sub-operation retry".format(
6525 vnf_config_primitive
6528 # Execute the primitive, either with new (first-time) or registered (reintent) args
6529 ee_descriptor_id
= config_primitive
.get(
6530 "execution-environment-ref"
6532 primitive_name
= config_primitive
.get(
6533 "execution-environment-primitive", vnf_config_primitive
6535 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6536 nsr_deployed
["VCA"],
6537 member_vnf_index
=vnf_index
,
6539 vdu_count_index
=None,
6540 ee_descriptor_id
=ee_descriptor_id
,
6542 result
, result_detail
= await self
._ns
_execute
_primitive
(
6551 + "vnf_config_primitive={} Done with result {} {}".format(
6552 vnf_config_primitive
, result
, result_detail
6555 # Update operationState = COMPLETED | FAILED
6556 self
._update
_suboperation
_status
(
6557 db_nslcmop
, op_index
, result
, result_detail
6560 if result
== "FAILED":
6561 raise LcmException(result_detail
)
6562 db_nsr_update
["config-status"] = old_config_status
6563 scale_process
= None
6567 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6570 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6573 # SCALE-IN VCA - BEGIN
6574 if vca_scaling_info
:
6575 step
= db_nslcmop_update
[
6577 ] = "Deleting the execution environments"
6578 scale_process
= "VCA"
6579 for vca_info
in vca_scaling_info
:
6580 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6581 member_vnf_index
= str(vca_info
["member-vnf-index"])
6583 logging_text
+ "vdu info: {}".format(vca_info
)
6585 if vca_info
.get("osm_vdu_id"):
6586 vdu_id
= vca_info
["osm_vdu_id"]
6587 vdu_index
= int(vca_info
["vdu_index"])
6590 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6591 member_vnf_index
, vdu_id
, vdu_index
6593 stage
[2] = step
= "Scaling in VCA"
6594 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6595 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6596 config_update
= db_nsr
["configurationStatus"]
6597 for vca_index
, vca
in enumerate(vca_update
):
6599 (vca
or vca
.get("ee_id"))
6600 and vca
["member-vnf-index"] == member_vnf_index
6601 and vca
["vdu_count_index"] == vdu_index
6603 if vca
.get("vdu_id"):
6604 config_descriptor
= get_configuration(
6605 db_vnfd
, vca
.get("vdu_id")
6607 elif vca
.get("kdu_name"):
6608 config_descriptor
= get_configuration(
6609 db_vnfd
, vca
.get("kdu_name")
6612 config_descriptor
= get_configuration(
6613 db_vnfd
, db_vnfd
["id"]
6615 operation_params
= (
6616 db_nslcmop
.get("operationParams") or {}
6618 exec_terminate_primitives
= not operation_params
.get(
6619 "skip_terminate_primitives"
6620 ) and vca
.get("needed_terminate")
6621 task
= asyncio
.ensure_future(
6630 exec_primitives
=exec_terminate_primitives
,
6634 timeout
=self
.timeout_charm_delete
,
6637 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6640 del vca_update
[vca_index
]
6641 del config_update
[vca_index
]
6642 # wait for pending tasks of terminate primitives
6646 + "Waiting for tasks {}".format(
6647 list(tasks_dict_info
.keys())
6650 error_list
= await self
._wait
_for
_tasks
(
6654 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6659 tasks_dict_info
.clear()
6661 raise LcmException("; ".join(error_list
))
6663 db_vca_and_config_update
= {
6664 "_admin.deployed.VCA": vca_update
,
6665 "configurationStatus": config_update
,
6668 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6670 scale_process
= None
6671 # SCALE-IN VCA - END
6674 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6675 scale_process
= "RO"
6676 if self
.ro_config
.get("ng"):
6677 await self
._scale
_ng
_ro
(
6678 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6680 scaling_info
.pop("vdu-create", None)
6681 scaling_info
.pop("vdu-delete", None)
6683 scale_process
= None
6687 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6688 scale_process
= "KDU"
6689 await self
._scale
_kdu
(
6690 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6692 scaling_info
.pop("kdu-create", None)
6693 scaling_info
.pop("kdu-delete", None)
6695 scale_process
= None
6699 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6701 # SCALE-UP VCA - BEGIN
6702 if vca_scaling_info
:
6703 step
= db_nslcmop_update
[
6705 ] = "Creating new execution environments"
6706 scale_process
= "VCA"
6707 for vca_info
in vca_scaling_info
:
6708 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6709 member_vnf_index
= str(vca_info
["member-vnf-index"])
6711 logging_text
+ "vdu info: {}".format(vca_info
)
6713 vnfd_id
= db_vnfr
["vnfd-ref"]
6714 if vca_info
.get("osm_vdu_id"):
6715 vdu_index
= int(vca_info
["vdu_index"])
6716 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6717 if db_vnfr
.get("additionalParamsForVnf"):
6718 deploy_params
.update(
6720 db_vnfr
["additionalParamsForVnf"].copy()
6723 descriptor_config
= get_configuration(
6724 db_vnfd
, db_vnfd
["id"]
6726 if descriptor_config
:
6731 logging_text
=logging_text
6732 + "member_vnf_index={} ".format(member_vnf_index
),
6735 nslcmop_id
=nslcmop_id
,
6741 member_vnf_index
=member_vnf_index
,
6742 vdu_index
=vdu_index
,
6744 deploy_params
=deploy_params
,
6745 descriptor_config
=descriptor_config
,
6746 base_folder
=base_folder
,
6747 task_instantiation_info
=tasks_dict_info
,
6750 vdu_id
= vca_info
["osm_vdu_id"]
6751 vdur
= find_in_list(
6752 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6754 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6755 if vdur
.get("additionalParams"):
6756 deploy_params_vdu
= parse_yaml_strings(
6757 vdur
["additionalParams"]
6760 deploy_params_vdu
= deploy_params
6761 deploy_params_vdu
["OSM"] = get_osm_params(
6762 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6764 if descriptor_config
:
6769 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6770 member_vnf_index
, vdu_id
, vdu_index
6772 stage
[2] = step
= "Scaling out VCA"
6773 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6775 logging_text
=logging_text
6776 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6777 member_vnf_index
, vdu_id
, vdu_index
6781 nslcmop_id
=nslcmop_id
,
6787 member_vnf_index
=member_vnf_index
,
6788 vdu_index
=vdu_index
,
6790 deploy_params
=deploy_params_vdu
,
6791 descriptor_config
=descriptor_config
,
6792 base_folder
=base_folder
,
6793 task_instantiation_info
=tasks_dict_info
,
6796 # SCALE-UP VCA - END
6797 scale_process
= None
6800 # execute primitive service POST-SCALING
6801 step
= "Executing post-scale vnf-config-primitive"
6802 if scaling_descriptor
.get("scaling-config-action"):
6803 for scaling_config_action
in scaling_descriptor
[
6804 "scaling-config-action"
6807 scaling_config_action
.get("trigger") == "post-scale-in"
6808 and scaling_type
== "SCALE_IN"
6810 scaling_config_action
.get("trigger") == "post-scale-out"
6811 and scaling_type
== "SCALE_OUT"
6813 vnf_config_primitive
= scaling_config_action
[
6814 "vnf-config-primitive-name-ref"
6816 step
= db_nslcmop_update
[
6818 ] = "executing post-scale scaling-config-action '{}'".format(
6819 vnf_config_primitive
6822 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6823 if db_vnfr
.get("additionalParamsForVnf"):
6824 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6826 # look for primitive
6827 for config_primitive
in (
6828 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6829 ).get("config-primitive", ()):
6830 if config_primitive
["name"] == vnf_config_primitive
:
6834 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6835 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6836 "config-primitive".format(
6837 scaling_group
, vnf_config_primitive
6840 scale_process
= "VCA"
6841 db_nsr_update
["config-status"] = "configuring post-scaling"
6842 primitive_params
= self
._map
_primitive
_params
(
6843 config_primitive
, {}, vnfr_params
6846 # Post-scale retry check: Check if this sub-operation has been executed before
6847 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6850 vnf_config_primitive
,
6854 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6855 # Skip sub-operation
6856 result
= "COMPLETED"
6857 result_detail
= "Done"
6860 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6861 vnf_config_primitive
, result
, result_detail
6865 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6866 # New sub-operation: Get index of this sub-operation
6868 len(db_nslcmop
.get("_admin", {}).get("operations"))
6873 + "vnf_config_primitive={} New sub-operation".format(
6874 vnf_config_primitive
6878 # retry: Get registered params for this existing sub-operation
6879 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6882 vnf_index
= op
.get("member_vnf_index")
6883 vnf_config_primitive
= op
.get("primitive")
6884 primitive_params
= op
.get("primitive_params")
6887 + "vnf_config_primitive={} Sub-operation retry".format(
6888 vnf_config_primitive
6891 # Execute the primitive, either with new (first-time) or registered (reintent) args
6892 ee_descriptor_id
= config_primitive
.get(
6893 "execution-environment-ref"
6895 primitive_name
= config_primitive
.get(
6896 "execution-environment-primitive", vnf_config_primitive
6898 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6899 nsr_deployed
["VCA"],
6900 member_vnf_index
=vnf_index
,
6902 vdu_count_index
=None,
6903 ee_descriptor_id
=ee_descriptor_id
,
6905 result
, result_detail
= await self
._ns
_execute
_primitive
(
6914 + "vnf_config_primitive={} Done with result {} {}".format(
6915 vnf_config_primitive
, result
, result_detail
6918 # Update operationState = COMPLETED | FAILED
6919 self
._update
_suboperation
_status
(
6920 db_nslcmop
, op_index
, result
, result_detail
6923 if result
== "FAILED":
6924 raise LcmException(result_detail
)
6925 db_nsr_update
["config-status"] = old_config_status
6926 scale_process
= None
6931 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6932 db_nsr_update
["operational-status"] = (
6934 if old_operational_status
== "failed"
6935 else old_operational_status
6937 db_nsr_update
["config-status"] = old_config_status
6940 ROclient
.ROClientException
,
6945 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6947 except asyncio
.CancelledError
:
6949 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6951 exc
= "Operation was cancelled"
6952 except Exception as e
:
6953 exc
= traceback
.format_exc()
6954 self
.logger
.critical(
6955 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6959 self
._write
_ns
_status
(
6962 current_operation
="IDLE",
6963 current_operation_id
=None,
6966 stage
[1] = "Waiting for instantiate pending tasks."
6967 self
.logger
.debug(logging_text
+ stage
[1])
6968 exc
= await self
._wait
_for
_tasks
(
6971 self
.timeout_ns_deploy
,
6979 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6980 nslcmop_operation_state
= "FAILED"
6982 db_nsr_update
["operational-status"] = old_operational_status
6983 db_nsr_update
["config-status"] = old_config_status
6984 db_nsr_update
["detailed-status"] = ""
6986 if "VCA" in scale_process
:
6987 db_nsr_update
["config-status"] = "failed"
6988 if "RO" in scale_process
:
6989 db_nsr_update
["operational-status"] = "failed"
6992 ] = "FAILED scaling nslcmop={} {}: {}".format(
6993 nslcmop_id
, step
, exc
6996 error_description_nslcmop
= None
6997 nslcmop_operation_state
= "COMPLETED"
6998 db_nslcmop_update
["detailed-status"] = "Done"
7000 self
._write
_op
_status
(
7003 error_message
=error_description_nslcmop
,
7004 operation_state
=nslcmop_operation_state
,
7005 other_update
=db_nslcmop_update
,
7008 self
._write
_ns
_status
(
7011 current_operation
="IDLE",
7012 current_operation_id
=None,
7013 other_update
=db_nsr_update
,
7016 if nslcmop_operation_state
:
7020 "nslcmop_id": nslcmop_id
,
7021 "operationState": nslcmop_operation_state
,
7023 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7024 except Exception as e
:
7026 logging_text
+ "kafka_write notification Exception {}".format(e
)
7028 self
.logger
.debug(logging_text
+ "Exit")
7029 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7031 async def _scale_kdu(
7032 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7034 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7035 for kdu_name
in _scaling_info
:
7036 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7037 deployed_kdu
, index
= get_deployed_kdu(
7038 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7040 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7041 kdu_instance
= deployed_kdu
["kdu-instance"]
7042 kdu_model
= deployed_kdu
.get("kdu-model")
7043 scale
= int(kdu_scaling_info
["scale"])
7044 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7047 "collection": "nsrs",
7048 "filter": {"_id": nsr_id
},
7049 "path": "_admin.deployed.K8s.{}".format(index
),
7052 step
= "scaling application {}".format(
7053 kdu_scaling_info
["resource-name"]
7055 self
.logger
.debug(logging_text
+ step
)
7057 if kdu_scaling_info
["type"] == "delete":
7058 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7061 and kdu_config
.get("terminate-config-primitive")
7062 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7064 terminate_config_primitive_list
= kdu_config
.get(
7065 "terminate-config-primitive"
7067 terminate_config_primitive_list
.sort(
7068 key
=lambda val
: int(val
["seq"])
7072 terminate_config_primitive
7073 ) in terminate_config_primitive_list
:
7074 primitive_params_
= self
._map
_primitive
_params
(
7075 terminate_config_primitive
, {}, {}
7077 step
= "execute terminate config primitive"
7078 self
.logger
.debug(logging_text
+ step
)
7079 await asyncio
.wait_for(
7080 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7081 cluster_uuid
=cluster_uuid
,
7082 kdu_instance
=kdu_instance
,
7083 primitive_name
=terminate_config_primitive
["name"],
7084 params
=primitive_params_
,
7091 await asyncio
.wait_for(
7092 self
.k8scluster_map
[k8s_cluster_type
].scale(
7095 kdu_scaling_info
["resource-name"],
7097 cluster_uuid
=cluster_uuid
,
7098 kdu_model
=kdu_model
,
7102 timeout
=self
.timeout_vca_on_error
,
7105 if kdu_scaling_info
["type"] == "create":
7106 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7109 and kdu_config
.get("initial-config-primitive")
7110 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7112 initial_config_primitive_list
= kdu_config
.get(
7113 "initial-config-primitive"
7115 initial_config_primitive_list
.sort(
7116 key
=lambda val
: int(val
["seq"])
7119 for initial_config_primitive
in initial_config_primitive_list
:
7120 primitive_params_
= self
._map
_primitive
_params
(
7121 initial_config_primitive
, {}, {}
7123 step
= "execute initial config primitive"
7124 self
.logger
.debug(logging_text
+ step
)
7125 await asyncio
.wait_for(
7126 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7127 cluster_uuid
=cluster_uuid
,
7128 kdu_instance
=kdu_instance
,
7129 primitive_name
=initial_config_primitive
["name"],
7130 params
=primitive_params_
,
7137 async def _scale_ng_ro(
7138 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7140 nsr_id
= db_nslcmop
["nsInstanceId"]
7141 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7144 # read from db: vnfd's for every vnf
7147 # for each vnf in ns, read vnfd
7148 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7149 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7150 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7151 # if we haven't this vnfd, read it from db
7152 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7154 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7155 db_vnfds
.append(vnfd
)
7156 n2vc_key
= self
.n2vc
.get_public_key()
7157 n2vc_key_list
= [n2vc_key
]
7160 vdu_scaling_info
.get("vdu-create"),
7161 vdu_scaling_info
.get("vdu-delete"),
7164 # db_vnfr has been updated, update db_vnfrs to use it
7165 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7166 await self
._instantiate
_ng
_ro
(
7176 start_deploy
=time(),
7177 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7179 if vdu_scaling_info
.get("vdu-delete"):
7181 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7184 async def extract_prometheus_scrape_jobs(
7185 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7187 # look if exist a file called 'prometheus*.j2' and
7188 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7192 for f
in artifact_content
7193 if f
.startswith("prometheus") and f
.endswith(".j2")
7199 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7203 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7204 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7206 vnfr_id
= vnfr_id
.replace("-", "")
7208 "JOB_NAME": vnfr_id
,
7209 "TARGET_IP": target_ip
,
7210 "EXPORTER_POD_IP": host_name
,
7211 "EXPORTER_POD_PORT": host_port
,
7213 job_list
= parse_job(job_data
, variables
)
7214 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7215 for job
in job_list
:
7217 not isinstance(job
.get("job_name"), str)
7218 or vnfr_id
not in job
["job_name"]
7220 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7221 job
["nsr_id"] = nsr_id
7222 job
["vnfr_id"] = vnfr_id
7225 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7226 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7227 self
.logger
.info(logging_text
+ "Enter")
7228 stage
= ["Preparing the environment", ""]
7229 # database nsrs record
7233 # in case of error, indicates what part of scale was failed to put nsr at error status
7234 start_deploy
= time()
7236 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7237 vim_account_id
= db_vnfr
.get("vim-account-id")
7238 vim_info_key
= "vim:" + vim_account_id
7239 vdur
= find_in_list(
7240 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7243 vdu_vim_name
= vdur
["name"]
7244 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7245 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7246 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7247 # wait for any previous tasks in process
7248 stage
[1] = "Waiting for previous operations to terminate"
7249 self
.logger
.info(stage
[1])
7250 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7252 stage
[1] = "Reading from database."
7253 self
.logger
.info(stage
[1])
7254 self
._write
_ns
_status
(
7257 current_operation
=operation_type
.upper(),
7258 current_operation_id
=nslcmop_id
7260 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7263 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7264 db_nsr_update
["operational-status"] = operation_type
7265 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7269 "vim_vm_id": vim_vm_id
,
7271 "vdu_index": additional_param
["count-index"],
7272 "vdu_id": vdur
["id"],
7273 "target_vim": target_vim
,
7274 "vim_account_id": vim_account_id
7277 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7278 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7279 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7280 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7281 self
.logger
.info("response from RO: {}".format(result_dict
))
7282 action_id
= result_dict
["action_id"]
7283 await self
._wait
_ng
_ro
(
7284 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7286 return "COMPLETED", "Done"
7287 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7288 self
.logger
.error("Exit Exception {}".format(e
))
7290 except asyncio
.CancelledError
:
7291 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7292 exc
= "Operation was cancelled"
7293 except Exception as e
:
7294 exc
= traceback
.format_exc()
7295 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7296 return "FAILED", "Error in operate VNF {}".format(exc
)
7298 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7300 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7302 :param: vim_account_id: VIM Account ID
7304 :return: (cloud_name, cloud_credential)
7306 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7307 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7309 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7311 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7313 :param: vim_account_id: VIM Account ID
7315 :return: (cloud_name, cloud_credential)
7317 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7318 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7320 async def migrate(self
, nsr_id
, nslcmop_id
):
7322 Migrate VNFs and VDUs instances in a NS
7324 :param: nsr_id: NS Instance ID
7325 :param: nslcmop_id: nslcmop ID of migrate
7328 # Try to lock HA task here
7329 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7330 if not task_is_locked_by_me
:
7332 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7333 self
.logger
.debug(logging_text
+ "Enter")
7334 # get all needed from database
7336 db_nslcmop_update
= {}
7337 nslcmop_operation_state
= None
7341 # in case of error, indicates what part of scale was failed to put nsr at error status
7342 start_deploy
= time()
7345 # wait for any previous tasks in process
7346 step
= "Waiting for previous operations to terminate"
7347 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7349 self
._write
_ns
_status
(
7352 current_operation
="MIGRATING",
7353 current_operation_id
=nslcmop_id
,
7355 step
= "Getting nslcmop from database"
7357 step
+ " after having waited for previous tasks to be completed"
7359 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7360 migrate_params
= db_nslcmop
.get("operationParams")
7363 target
.update(migrate_params
)
7364 desc
= await self
.RO
.migrate(nsr_id
, target
)
7365 self
.logger
.debug("RO return > {}".format(desc
))
7366 action_id
= desc
["action_id"]
7367 await self
._wait
_ng
_ro
(
7368 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7371 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7372 self
.logger
.error("Exit Exception {}".format(e
))
7374 except asyncio
.CancelledError
:
7375 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7376 exc
= "Operation was cancelled"
7377 except Exception as e
:
7378 exc
= traceback
.format_exc()
7379 self
.logger
.critical(
7380 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7383 self
._write
_ns
_status
(
7386 current_operation
="IDLE",
7387 current_operation_id
=None,
7390 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7391 nslcmop_operation_state
= "FAILED"
7393 nslcmop_operation_state
= "COMPLETED"
7394 db_nslcmop_update
["detailed-status"] = "Done"
7395 db_nsr_update
["detailed-status"] = "Done"
7397 self
._write
_op
_status
(
7401 operation_state
=nslcmop_operation_state
,
7402 other_update
=db_nslcmop_update
,
7404 if nslcmop_operation_state
:
7408 "nslcmop_id": nslcmop_id
,
7409 "operationState": nslcmop_operation_state
,
7411 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7412 except Exception as e
:
7414 logging_text
+ "kafka_write notification Exception {}".format(e
)
7416 self
.logger
.debug(logging_text
+ "Exit")
7417 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7420 async def heal(self
, nsr_id
, nslcmop_id
):
7424 :param nsr_id: ns instance to heal
7425 :param nslcmop_id: operation to run
7429 # Try to lock HA task here
7430 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7431 if not task_is_locked_by_me
:
7434 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7435 stage
= ["", "", ""]
7436 tasks_dict_info
= {}
7437 # ^ stage, step, VIM progress
7438 self
.logger
.debug(logging_text
+ "Enter")
7439 # get all needed from database
7441 db_nslcmop_update
= {}
7443 db_vnfrs
= {} # vnf's info indexed by _id
7445 old_operational_status
= ""
7446 old_config_status
= ""
7449 # wait for any previous tasks in process
7450 step
= "Waiting for previous operations to terminate"
7451 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7452 self
._write
_ns
_status
(
7455 current_operation
="HEALING",
7456 current_operation_id
=nslcmop_id
,
7459 step
= "Getting nslcmop from database"
7461 step
+ " after having waited for previous tasks to be completed"
7463 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7465 step
= "Getting nsr from database"
7466 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7467 old_operational_status
= db_nsr
["operational-status"]
7468 old_config_status
= db_nsr
["config-status"]
7471 "_admin.deployed.RO.operational-status": "healing",
7473 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7475 step
= "Sending heal order to VIM"
7476 task_ro
= asyncio
.ensure_future(
7478 logging_text
=logging_text
,
7480 db_nslcmop
=db_nslcmop
,
7484 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7485 tasks_dict_info
[task_ro
] = "Healing at VIM"
7489 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7490 self
.logger
.debug(logging_text
+ stage
[1])
7491 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7492 self
.fs
.sync(db_nsr
["nsd-id"])
7494 # read from db: vnfr's of this ns
7495 step
= "Getting vnfrs from db"
7496 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7497 for vnfr
in db_vnfrs_list
:
7498 db_vnfrs
[vnfr
["_id"]] = vnfr
7499 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7501 # Check for each target VNF
7502 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7503 for target_vnf
in target_list
:
7504 # Find this VNF in the list from DB
7505 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7507 db_vnfr
= db_vnfrs
[vnfr_id
]
7508 vnfd_id
= db_vnfr
.get("vnfd-id")
7509 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7510 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7511 base_folder
= vnfd
["_admin"]["storage"]
7516 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7517 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7519 # Check each target VDU and deploy N2VC
7520 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7521 deploy_params_vdu
= target_vdu
7522 # Set run-day1 vnf level value if not vdu level value exists
7523 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7524 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7525 vdu_name
= target_vdu
.get("vdu-id", None)
7526 # TODO: Get vdu_id from vdud.
7528 # For multi instance VDU count-index is mandatory
7529 # For single session VDU count-indes is 0
7530 vdu_index
= target_vdu
.get("count-index",0)
7532 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7533 stage
[1] = "Deploying Execution Environments."
7534 self
.logger
.debug(logging_text
+ stage
[1])
7536 # VNF Level charm. Normal case when proxy charms.
7537 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7538 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7539 if descriptor_config
:
7540 # Continue if healed machine is management machine
7541 vnf_ip_address
= db_vnfr
.get("ip-address")
7542 target_instance
= None
7543 for instance
in db_vnfr
.get("vdur", None):
7544 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7545 target_instance
= instance
7547 if vnf_ip_address
== target_instance
.get("ip-address"):
7549 logging_text
=logging_text
7550 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7551 member_vnf_index
, vdu_name
, vdu_index
7555 nslcmop_id
=nslcmop_id
,
7561 member_vnf_index
=member_vnf_index
,
7564 deploy_params
=deploy_params_vdu
,
7565 descriptor_config
=descriptor_config
,
7566 base_folder
=base_folder
,
7567 task_instantiation_info
=tasks_dict_info
,
7571 # VDU Level charm. Normal case with native charms.
7572 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7573 if descriptor_config
:
7575 logging_text
=logging_text
7576 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7577 member_vnf_index
, vdu_name
, vdu_index
7581 nslcmop_id
=nslcmop_id
,
7587 member_vnf_index
=member_vnf_index
,
7588 vdu_index
=vdu_index
,
7590 deploy_params
=deploy_params_vdu
,
7591 descriptor_config
=descriptor_config
,
7592 base_folder
=base_folder
,
7593 task_instantiation_info
=tasks_dict_info
,
7598 ROclient
.ROClientException
,
7603 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7605 except asyncio
.CancelledError
:
7607 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7609 exc
= "Operation was cancelled"
7610 except Exception as e
:
7611 exc
= traceback
.format_exc()
7612 self
.logger
.critical(
7613 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7618 stage
[1] = "Waiting for healing pending tasks."
7619 self
.logger
.debug(logging_text
+ stage
[1])
7620 exc
= await self
._wait
_for
_tasks
(
7623 self
.timeout_ns_deploy
,
7631 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7632 nslcmop_operation_state
= "FAILED"
7634 db_nsr_update
["operational-status"] = old_operational_status
7635 db_nsr_update
["config-status"] = old_config_status
7638 ] = "FAILED healing nslcmop={} {}: {}".format(
7639 nslcmop_id
, step
, exc
7641 for task
, task_name
in tasks_dict_info
.items():
7642 if not task
.done() or task
.cancelled() or task
.exception():
7643 if task_name
.startswith(self
.task_name_deploy_vca
):
7644 # A N2VC task is pending
7645 db_nsr_update
["config-status"] = "failed"
7647 # RO task is pending
7648 db_nsr_update
["operational-status"] = "failed"
7650 error_description_nslcmop
= None
7651 nslcmop_operation_state
= "COMPLETED"
7652 db_nslcmop_update
["detailed-status"] = "Done"
7653 db_nsr_update
["detailed-status"] = "Done"
7654 db_nsr_update
["operational-status"] = "running"
7655 db_nsr_update
["config-status"] = "configured"
7657 self
._write
_op
_status
(
7660 error_message
=error_description_nslcmop
,
7661 operation_state
=nslcmop_operation_state
,
7662 other_update
=db_nslcmop_update
,
7665 self
._write
_ns
_status
(
7668 current_operation
="IDLE",
7669 current_operation_id
=None,
7670 other_update
=db_nsr_update
,
7673 if nslcmop_operation_state
:
7677 "nslcmop_id": nslcmop_id
,
7678 "operationState": nslcmop_operation_state
,
7680 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7681 except Exception as e
:
7683 logging_text
+ "kafka_write notification Exception {}".format(e
)
7685 self
.logger
.debug(logging_text
+ "Exit")
7686 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7697 :param logging_text: preffix text to use at logging
7698 :param nsr_id: nsr identity
7699 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7700 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7701 :return: None or exception
7703 def get_vim_account(vim_account_id
):
7705 if vim_account_id
in db_vims
:
7706 return db_vims
[vim_account_id
]
7707 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7708 db_vims
[vim_account_id
] = db_vim
7713 ns_params
= db_nslcmop
.get("operationParams")
7714 if ns_params
and ns_params
.get("timeout_ns_heal"):
7715 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7717 timeout_ns_heal
= self
.timeout
.get(
7718 "ns_heal", self
.timeout_ns_heal
7723 nslcmop_id
= db_nslcmop
["_id"]
7725 "action_id": nslcmop_id
,
7727 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7728 target
.update(db_nslcmop
.get("operationParams", {}))
7730 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7731 desc
= await self
.RO
.recreate(nsr_id
, target
)
7732 self
.logger
.debug("RO return > {}".format(desc
))
7733 action_id
= desc
["action_id"]
7734 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7735 await self
._wait
_ng
_ro
(
7736 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7742 "_admin.deployed.RO.operational-status": "running",
7743 "detailed-status": " ".join(stage
),
7745 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7746 self
._write
_op
_status
(nslcmop_id
, stage
)
7748 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7751 except Exception as e
:
7752 stage
[2] = "ERROR healing at VIM"
7753 #self.set_vnfr_at_error(db_vnfrs, str(e))
7755 "Error healing at VIM {}".format(e
),
7756 exc_info
=not isinstance(
7759 ROclient
.ROClientException
,
7785 task_instantiation_info
,
7788 # launch instantiate_N2VC in a asyncio task and register task object
7789 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7790 # if not found, create one entry and update database
7791 # fill db_nsr._admin.deployed.VCA.<index>
7794 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7796 if "execution-environment-list" in descriptor_config
:
7797 ee_list
= descriptor_config
.get("execution-environment-list", [])
7798 elif "juju" in descriptor_config
:
7799 ee_list
= [descriptor_config
] # ns charms
7800 else: # other types as script are not supported
7803 for ee_item
in ee_list
:
7806 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7807 ee_item
.get("juju"), ee_item
.get("helm-chart")
7810 ee_descriptor_id
= ee_item
.get("id")
7811 if ee_item
.get("juju"):
7812 vca_name
= ee_item
["juju"].get("charm")
7815 if ee_item
["juju"].get("charm") is not None
7818 if ee_item
["juju"].get("cloud") == "k8s":
7819 vca_type
= "k8s_proxy_charm"
7820 elif ee_item
["juju"].get("proxy") is False:
7821 vca_type
= "native_charm"
7822 elif ee_item
.get("helm-chart"):
7823 vca_name
= ee_item
["helm-chart"]
7824 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7827 vca_type
= "helm-v3"
7830 logging_text
+ "skipping non juju neither charm configuration"
7835 for vca_index
, vca_deployed
in enumerate(
7836 db_nsr
["_admin"]["deployed"]["VCA"]
7838 if not vca_deployed
:
7841 vca_deployed
.get("member-vnf-index") == member_vnf_index
7842 and vca_deployed
.get("vdu_id") == vdu_id
7843 and vca_deployed
.get("kdu_name") == kdu_name
7844 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7845 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7849 # not found, create one.
7851 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7854 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7856 target
+= "/kdu/{}".format(kdu_name
)
7858 "target_element": target
,
7859 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7860 "member-vnf-index": member_vnf_index
,
7862 "kdu_name": kdu_name
,
7863 "vdu_count_index": vdu_index
,
7864 "operational-status": "init", # TODO revise
7865 "detailed-status": "", # TODO revise
7866 "step": "initial-deploy", # TODO revise
7868 "vdu_name": vdu_name
,
7870 "ee_descriptor_id": ee_descriptor_id
,
7874 # create VCA and configurationStatus in db
7876 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7877 "configurationStatus.{}".format(vca_index
): dict(),
7879 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7881 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7883 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7884 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7885 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7888 task_n2vc
= asyncio
.ensure_future(
7890 logging_text
=logging_text
,
7891 vca_index
=vca_index
,
7897 vdu_index
=vdu_index
,
7898 deploy_params
=deploy_params
,
7899 config_descriptor
=descriptor_config
,
7900 base_folder
=base_folder
,
7901 nslcmop_id
=nslcmop_id
,
7905 ee_config_descriptor
=ee_item
,
7908 self
.lcm_tasks
.register(
7912 "instantiate_N2VC-{}".format(vca_index
),
7915 task_instantiation_info
[
7917 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7918 member_vnf_index
or "", vdu_id
or ""
7921 async def heal_N2VC(
7938 ee_config_descriptor
,
7940 nsr_id
= db_nsr
["_id"]
7941 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7942 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7943 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7944 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7946 "collection": "nsrs",
7947 "filter": {"_id": nsr_id
},
7948 "path": db_update_entry
,
7954 element_under_configuration
= nsr_id
7958 vnfr_id
= db_vnfr
["_id"]
7959 osm_config
["osm"]["vnf_id"] = vnfr_id
7961 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
7963 if vca_type
== "native_charm":
7966 index_number
= vdu_index
or 0
7969 element_type
= "VNF"
7970 element_under_configuration
= vnfr_id
7971 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
7973 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
7974 element_type
= "VDU"
7975 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
7976 osm_config
["osm"]["vdu_id"] = vdu_id
7978 namespace
+= ".{}".format(kdu_name
)
7979 element_type
= "KDU"
7980 element_under_configuration
= kdu_name
7981 osm_config
["osm"]["kdu_name"] = kdu_name
7984 if base_folder
["pkg-dir"]:
7985 artifact_path
= "{}/{}/{}/{}".format(
7986 base_folder
["folder"],
7987 base_folder
["pkg-dir"],
7990 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
7995 artifact_path
= "{}/Scripts/{}/{}/".format(
7996 base_folder
["folder"],
7999 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8004 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8006 # get initial_config_primitive_list that applies to this element
8007 initial_config_primitive_list
= config_descriptor
.get(
8008 "initial-config-primitive"
8012 "Initial config primitive list > {}".format(
8013 initial_config_primitive_list
8017 # add config if not present for NS charm
8018 ee_descriptor_id
= ee_config_descriptor
.get("id")
8019 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8020 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8021 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8025 "Initial config primitive list #2 > {}".format(
8026 initial_config_primitive_list
8029 # n2vc_redesign STEP 3.1
8030 # find old ee_id if exists
8031 ee_id
= vca_deployed
.get("ee_id")
8033 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8034 # create or register execution environment in VCA. Only for native charms when healing
8035 if vca_type
== "native_charm":
8036 step
= "Waiting to VM being up and getting IP address"
8037 self
.logger
.debug(logging_text
+ step
)
8038 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8047 credentials
= {"hostname": rw_mgmt_ip
}
8049 username
= deep_get(
8050 config_descriptor
, ("config-access", "ssh-access", "default-user")
8052 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8053 # merged. Meanwhile let's get username from initial-config-primitive
8054 if not username
and initial_config_primitive_list
:
8055 for config_primitive
in initial_config_primitive_list
:
8056 for param
in config_primitive
.get("parameter", ()):
8057 if param
["name"] == "ssh-username":
8058 username
= param
["value"]
8062 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8063 "'config-access.ssh-access.default-user'"
8065 credentials
["username"] = username
8067 # n2vc_redesign STEP 3.2
8068 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8069 self
._write
_configuration
_status
(
8071 vca_index
=vca_index
,
8072 status
="REGISTERING",
8073 element_under_configuration
=element_under_configuration
,
8074 element_type
=element_type
,
8077 step
= "register execution environment {}".format(credentials
)
8078 self
.logger
.debug(logging_text
+ step
)
8079 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8080 credentials
=credentials
,
8081 namespace
=namespace
,
8086 # update ee_id en db
8088 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8090 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8092 # for compatibility with MON/POL modules, the need model and application name at database
8093 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8094 # Not sure if this need to be done when healing
8096 ee_id_parts = ee_id.split(".")
8097 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8098 if len(ee_id_parts) >= 2:
8099 model_name = ee_id_parts[0]
8100 application_name = ee_id_parts[1]
8101 db_nsr_update[db_update_entry + "model"] = model_name
8102 db_nsr_update[db_update_entry + "application"] = application_name
8105 # n2vc_redesign STEP 3.3
8106 # Install configuration software. Only for native charms.
8107 step
= "Install configuration Software"
8109 self
._write
_configuration
_status
(
8111 vca_index
=vca_index
,
8112 status
="INSTALLING SW",
8113 element_under_configuration
=element_under_configuration
,
8114 element_type
=element_type
,
8115 #other_update=db_nsr_update,
8119 # TODO check if already done
8120 self
.logger
.debug(logging_text
+ step
)
8122 if vca_type
== "native_charm":
8123 config_primitive
= next(
8124 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8127 if config_primitive
:
8128 config
= self
._map
_primitive
_params
(
8129 config_primitive
, {}, deploy_params
8131 await self
.vca_map
[vca_type
].install_configuration_sw(
8133 artifact_path
=artifact_path
,
8141 # write in db flag of configuration_sw already installed
8143 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8146 # Not sure if this need to be done when healing
8148 # add relations for this VCA (wait for other peers related with this VCA)
8149 await self._add_vca_relations(
8150 logging_text=logging_text,
8153 vca_index=vca_index,
8157 # if SSH access is required, then get execution environment SSH public
8158 # if native charm we have waited already to VM be UP
8159 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8162 # self.logger.debug("get ssh key block")
8164 config_descriptor
, ("config-access", "ssh-access", "required")
8166 # self.logger.debug("ssh key needed")
8167 # Needed to inject a ssh key
8170 ("config-access", "ssh-access", "default-user"),
8172 step
= "Install configuration Software, getting public ssh key"
8173 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8174 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8177 step
= "Insert public key into VM user={} ssh_key={}".format(
8181 # self.logger.debug("no need to get ssh key")
8182 step
= "Waiting to VM being up and getting IP address"
8183 self
.logger
.debug(logging_text
+ step
)
8185 # n2vc_redesign STEP 5.1
8186 # wait for RO (ip-address) Insert pub_key into VM
8187 # IMPORTANT: We need do wait for RO to complete healing operation.
8188 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8191 rw_mgmt_ip
= await self
.wait_kdu_up(
8192 logging_text
, nsr_id
, vnfr_id
, kdu_name
8195 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8205 rw_mgmt_ip
= None # This is for a NS configuration
8207 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8209 # store rw_mgmt_ip in deploy params for later replacement
8210 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8213 # get run-day1 operation parameter
8214 runDay1
= deploy_params
.get("run-day1",False)
8215 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8217 # n2vc_redesign STEP 6 Execute initial config primitive
8218 step
= "execute initial config primitive"
8220 # wait for dependent primitives execution (NS -> VNF -> VDU)
8221 if initial_config_primitive_list
:
8222 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8224 # stage, in function of element type: vdu, kdu, vnf or ns
8225 my_vca
= vca_deployed_list
[vca_index
]
8226 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8228 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8229 elif my_vca
.get("member-vnf-index"):
8231 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8234 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8236 self
._write
_configuration
_status
(
8237 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8240 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8242 check_if_terminated_needed
= True
8243 for initial_config_primitive
in initial_config_primitive_list
:
8244 # adding information on the vca_deployed if it is a NS execution environment
8245 if not vca_deployed
["member-vnf-index"]:
8246 deploy_params
["ns_config_info"] = json
.dumps(
8247 self
._get
_ns
_config
_info
(nsr_id
)
8249 # TODO check if already done
8250 primitive_params_
= self
._map
_primitive
_params
(
8251 initial_config_primitive
, {}, deploy_params
8254 step
= "execute primitive '{}' params '{}'".format(
8255 initial_config_primitive
["name"], primitive_params_
8257 self
.logger
.debug(logging_text
+ step
)
8258 await self
.vca_map
[vca_type
].exec_primitive(
8260 primitive_name
=initial_config_primitive
["name"],
8261 params_dict
=primitive_params_
,
8266 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8267 if check_if_terminated_needed
:
8268 if config_descriptor
.get("terminate-config-primitive"):
8270 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8272 check_if_terminated_needed
= False
8274 # TODO register in database that primitive is done
8276 # STEP 7 Configure metrics
8277 # Not sure if this need to be done when healing
8279 if vca_type == "helm" or vca_type == "helm-v3":
8280 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8282 artifact_path=artifact_path,
8283 ee_config_descriptor=ee_config_descriptor,
8286 target_ip=rw_mgmt_ip,
8292 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8295 for job in prometheus_jobs:
8298 {"job_name": job["job_name"]},
8301 fail_on_empty=False,
8305 step
= "instantiated at VCA"
8306 self
.logger
.debug(logging_text
+ step
)
8308 self
._write
_configuration
_status
(
8309 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8312 except Exception as e
: # TODO not use Exception but N2VC exception
8313 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8315 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8318 "Exception while {} : {}".format(step
, e
), exc_info
=True
8320 self
._write
_configuration
_status
(
8321 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8323 raise LcmException("{} {}".format(step
, e
)) from e
8325 async def _wait_heal_ro(
8331 while time() <= start_time
+ timeout
:
8332 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8333 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8334 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8335 if operational_status_ro
!= "healing":
8337 await asyncio
.sleep(15, loop
=self
.loop
)
8338 else: # timeout_ns_deploy
8339 raise NgRoException("Timeout waiting ns to deploy")