1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 SUBOPERATION_STATUS_NOT_FOUND
= -1
138 SUBOPERATION_STATUS_NEW
= -2
139 SUBOPERATION_STATUS_SKIP
= -3
140 task_name_deploy_vca
= "Deploying VCA"
142 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
144 Init, Connect to database, filesystem storage, and messaging
145 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
148 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
150 self
.db
= Database().instance
.db
151 self
.fs
= Filesystem().instance
.fs
153 self
.lcm_tasks
= lcm_tasks
154 self
.timeout
= config
["timeout"]
155 self
.ro_config
= config
["ro_config"]
156 self
.ng_ro
= config
["ro_config"].get("ng")
157 self
.vca_config
= config
["VCA"].copy()
159 # create N2VC connector
160 self
.n2vc
= N2VCJujuConnector(
163 on_update_db
=self
._on
_update
_n
2vc
_db
,
168 self
.conn_helm_ee
= LCMHelmConn(
171 vca_config
=self
.vca_config
,
172 on_update_db
=self
._on
_update
_n
2vc
_db
,
175 self
.k8sclusterhelm2
= K8sHelmConnector(
176 kubectl_command
=self
.vca_config
.get("kubectlpath"),
177 helm_command
=self
.vca_config
.get("helmpath"),
184 self
.k8sclusterhelm3
= K8sHelm3Connector(
185 kubectl_command
=self
.vca_config
.get("kubectlpath"),
186 helm_command
=self
.vca_config
.get("helm3path"),
193 self
.k8sclusterjuju
= K8sJujuConnector(
194 kubectl_command
=self
.vca_config
.get("kubectlpath"),
195 juju_command
=self
.vca_config
.get("jujupath"),
198 on_update_db
=self
._on
_update
_k
8s
_db
,
203 self
.k8scluster_map
= {
204 "helm-chart": self
.k8sclusterhelm2
,
205 "helm-chart-v3": self
.k8sclusterhelm3
,
206 "chart": self
.k8sclusterhelm3
,
207 "juju-bundle": self
.k8sclusterjuju
,
208 "juju": self
.k8sclusterjuju
,
212 "lxc_proxy_charm": self
.n2vc
,
213 "native_charm": self
.n2vc
,
214 "k8s_proxy_charm": self
.n2vc
,
215 "helm": self
.conn_helm_ee
,
216 "helm-v3": self
.conn_helm_ee
,
220 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
222 self
.op_status_map
= {
223 "instantiation": self
.RO
.status
,
224 "termination": self
.RO
.status
,
225 "migrate": self
.RO
.status
,
226 "healing": self
.RO
.recreate_status
,
230 def increment_ip_mac(ip_mac
, vm_index
=1):
231 if not isinstance(ip_mac
, str):
234 # try with ipv4 look for last dot
235 i
= ip_mac
.rfind(".")
238 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
239 # try with ipv6 or mac look for last colon. Operate in hex
240 i
= ip_mac
.rfind(":")
243 # format in hex, len can be 2 for mac or 4 for ipv6
244 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
245 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
251 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
253 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
256 # TODO filter RO descriptor fields...
260 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
261 db_dict
["deploymentStatus"] = ro_descriptor
262 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
264 except Exception as e
:
266 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
269 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
271 # remove last dot from path (if exists)
272 if path
.endswith("."):
275 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
276 # .format(table, filter, path, updated_data))
279 nsr_id
= filter.get("_id")
281 # read ns record from database
282 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
283 current_ns_status
= nsr
.get("nsState")
285 # get vca status for NS
286 status_dict
= await self
.n2vc
.get_status(
287 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
292 db_dict
["vcaStatus"] = status_dict
293 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
295 # update configurationStatus for this VCA
297 vca_index
= int(path
[path
.rfind(".") + 1 :])
300 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
302 vca_status
= vca_list
[vca_index
].get("status")
304 configuration_status_list
= nsr
.get("configurationStatus")
305 config_status
= configuration_status_list
[vca_index
].get("status")
307 if config_status
== "BROKEN" and vca_status
!= "failed":
308 db_dict
["configurationStatus"][vca_index
] = "READY"
309 elif config_status
!= "BROKEN" and vca_status
== "failed":
310 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
311 except Exception as e
:
312 # not update configurationStatus
313 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
315 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
316 # if nsState = 'DEGRADED' check if all is OK
318 if current_ns_status
in ("READY", "DEGRADED"):
319 error_description
= ""
321 if status_dict
.get("machines"):
322 for machine_id
in status_dict
.get("machines"):
323 machine
= status_dict
.get("machines").get(machine_id
)
324 # check machine agent-status
325 if machine
.get("agent-status"):
326 s
= machine
.get("agent-status").get("status")
329 error_description
+= (
330 "machine {} agent-status={} ; ".format(
334 # check machine instance status
335 if machine
.get("instance-status"):
336 s
= machine
.get("instance-status").get("status")
339 error_description
+= (
340 "machine {} instance-status={} ; ".format(
345 if status_dict
.get("applications"):
346 for app_id
in status_dict
.get("applications"):
347 app
= status_dict
.get("applications").get(app_id
)
348 # check application status
349 if app
.get("status"):
350 s
= app
.get("status").get("status")
353 error_description
+= (
354 "application {} status={} ; ".format(app_id
, s
)
357 if error_description
:
358 db_dict
["errorDescription"] = error_description
359 if current_ns_status
== "READY" and is_degraded
:
360 db_dict
["nsState"] = "DEGRADED"
361 if current_ns_status
== "DEGRADED" and not is_degraded
:
362 db_dict
["nsState"] = "READY"
365 self
.update_db_2("nsrs", nsr_id
, db_dict
)
367 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
369 except Exception as e
:
370 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
372 async def _on_update_k8s_db(
373 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
376 Updating vca status in NSR record
377 :param cluster_uuid: UUID of a k8s cluster
378 :param kdu_instance: The unique name of the KDU instance
379 :param filter: To get nsr_id
380 :cluster_type: The cluster type (juju, k8s)
384 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
385 # .format(cluster_uuid, kdu_instance, filter))
387 nsr_id
= filter.get("_id")
389 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
390 cluster_uuid
=cluster_uuid
,
391 kdu_instance
=kdu_instance
,
393 complete_status
=True,
399 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
401 if cluster_type
in ("juju-bundle", "juju"):
402 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
403 # status in a similar way between Juju Bundles and Helm Charts on this side
404 await self
.k8sclusterjuju
.update_vca_status(
405 db_dict
["vcaStatus"],
411 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
415 self
.update_db_2("nsrs", nsr_id
, db_dict
)
416 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
418 except Exception as e
:
419 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
422 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
424 env
= Environment(undefined
=StrictUndefined
)
425 template
= env
.from_string(cloud_init_text
)
426 return template
.render(additional_params
or {})
427 except UndefinedError
as e
:
429 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
430 "file, must be provided in the instantiation parameters inside the "
431 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
433 except (TemplateError
, TemplateNotFound
) as e
:
435 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
440 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
441 cloud_init_content
= cloud_init_file
= None
443 if vdu
.get("cloud-init-file"):
444 base_folder
= vnfd
["_admin"]["storage"]
445 if base_folder
["pkg-dir"]:
446 cloud_init_file
= "{}/{}/cloud_init/{}".format(
447 base_folder
["folder"],
448 base_folder
["pkg-dir"],
449 vdu
["cloud-init-file"],
452 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
453 base_folder
["folder"],
454 vdu
["cloud-init-file"],
456 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
457 cloud_init_content
= ci_file
.read()
458 elif vdu
.get("cloud-init"):
459 cloud_init_content
= vdu
["cloud-init"]
461 return cloud_init_content
462 except FsException
as e
:
464 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
465 vnfd
["id"], vdu
["id"], cloud_init_file
, e
469 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
471 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
473 additional_params
= vdur
.get("additionalParams")
474 return parse_yaml_strings(additional_params
)
476 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
478 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
479 :param vnfd: input vnfd
480 :param new_id: overrides vnf id if provided
481 :param additionalParams: Instantiation params for VNFs provided
482 :param nsrId: Id of the NSR
483 :return: copy of vnfd
485 vnfd_RO
= deepcopy(vnfd
)
486 # remove unused by RO configuration, monitoring, scaling and internal keys
487 vnfd_RO
.pop("_id", None)
488 vnfd_RO
.pop("_admin", None)
489 vnfd_RO
.pop("monitoring-param", None)
490 vnfd_RO
.pop("scaling-group-descriptor", None)
491 vnfd_RO
.pop("kdu", None)
492 vnfd_RO
.pop("k8s-cluster", None)
494 vnfd_RO
["id"] = new_id
496 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
497 for vdu
in get_iterable(vnfd_RO
, "vdu"):
498 vdu
.pop("cloud-init-file", None)
499 vdu
.pop("cloud-init", None)
503 def ip_profile_2_RO(ip_profile
):
504 RO_ip_profile
= deepcopy(ip_profile
)
505 if "dns-server" in RO_ip_profile
:
506 if isinstance(RO_ip_profile
["dns-server"], list):
507 RO_ip_profile
["dns-address"] = []
508 for ds
in RO_ip_profile
.pop("dns-server"):
509 RO_ip_profile
["dns-address"].append(ds
["address"])
511 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
512 if RO_ip_profile
.get("ip-version") == "ipv4":
513 RO_ip_profile
["ip-version"] = "IPv4"
514 if RO_ip_profile
.get("ip-version") == "ipv6":
515 RO_ip_profile
["ip-version"] = "IPv6"
516 if "dhcp-params" in RO_ip_profile
:
517 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
520 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
521 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
522 if db_vim
["_admin"]["operationalState"] != "ENABLED":
524 "VIM={} is not available. operationalState={}".format(
525 vim_account
, db_vim
["_admin"]["operationalState"]
528 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
531 def get_ro_wim_id_for_wim_account(self
, wim_account
):
532 if isinstance(wim_account
, str):
533 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
534 if db_wim
["_admin"]["operationalState"] != "ENABLED":
536 "WIM={} is not available. operationalState={}".format(
537 wim_account
, db_wim
["_admin"]["operationalState"]
540 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
545 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
547 db_vdu_push_list
= []
549 db_update
= {"_admin.modified": time()}
551 for vdu_id
, vdu_count
in vdu_create
.items():
555 for vdur
in reversed(db_vnfr
["vdur"])
556 if vdur
["vdu-id-ref"] == vdu_id
561 # Read the template saved in the db:
563 "No vdur in the database. Using the vdur-template to scale"
565 vdur_template
= db_vnfr
.get("vdur-template")
566 if not vdur_template
:
568 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
572 vdur
= vdur_template
[0]
573 # Delete a template from the database after using it
576 {"_id": db_vnfr
["_id"]},
578 pull
={"vdur-template": {"_id": vdur
["_id"]}},
580 for count
in range(vdu_count
):
581 vdur_copy
= deepcopy(vdur
)
582 vdur_copy
["status"] = "BUILD"
583 vdur_copy
["status-detailed"] = None
584 vdur_copy
["ip-address"] = None
585 vdur_copy
["_id"] = str(uuid4())
586 vdur_copy
["count-index"] += count
+ 1
587 vdur_copy
["id"] = "{}-{}".format(
588 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
590 vdur_copy
.pop("vim_info", None)
591 for iface
in vdur_copy
["interfaces"]:
592 if iface
.get("fixed-ip"):
593 iface
["ip-address"] = self
.increment_ip_mac(
594 iface
["ip-address"], count
+ 1
597 iface
.pop("ip-address", None)
598 if iface
.get("fixed-mac"):
599 iface
["mac-address"] = self
.increment_ip_mac(
600 iface
["mac-address"], count
+ 1
603 iface
.pop("mac-address", None)
607 ) # only first vdu can be managment of vnf
608 db_vdu_push_list
.append(vdur_copy
)
609 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
611 if len(db_vnfr
["vdur"]) == 1:
612 # The scale will move to 0 instances
614 "Scaling to 0 !, creating the template with the last vdur"
616 template_vdur
= [db_vnfr
["vdur"][0]]
617 for vdu_id
, vdu_count
in vdu_delete
.items():
619 indexes_to_delete
= [
621 for iv
in enumerate(db_vnfr
["vdur"])
622 if iv
[1]["vdu-id-ref"] == vdu_id
626 "vdur.{}.status".format(i
): "DELETING"
627 for i
in indexes_to_delete
[-vdu_count
:]
631 # it must be deleted one by one because common.db does not allow otherwise
634 for v
in reversed(db_vnfr
["vdur"])
635 if v
["vdu-id-ref"] == vdu_id
637 for vdu
in vdus_to_delete
[:vdu_count
]:
640 {"_id": db_vnfr
["_id"]},
642 pull
={"vdur": {"_id": vdu
["_id"]}},
646 db_push
["vdur"] = db_vdu_push_list
648 db_push
["vdur-template"] = template_vdur
651 db_vnfr
["vdur-template"] = template_vdur
652 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
653 # modify passed dictionary db_vnfr
654 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
655 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
657 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
659 Updates database nsr with the RO info for the created vld
660 :param ns_update_nsr: dictionary to be filled with the updated info
661 :param db_nsr: content of db_nsr. This is also modified
662 :param nsr_desc_RO: nsr descriptor from RO
663 :return: Nothing, LcmException is raised on errors
666 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
667 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
668 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
670 vld
["vim-id"] = net_RO
.get("vim_net_id")
671 vld
["name"] = net_RO
.get("vim_name")
672 vld
["status"] = net_RO
.get("status")
673 vld
["status-detailed"] = net_RO
.get("error_msg")
674 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
678 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
681 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
683 for db_vnfr
in db_vnfrs
.values():
684 vnfr_update
= {"status": "ERROR"}
685 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
686 if "status" not in vdur
:
687 vdur
["status"] = "ERROR"
688 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
690 vdur
["status-detailed"] = str(error_text
)
692 "vdur.{}.status-detailed".format(vdu_index
)
694 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
695 except DbException
as e
:
696 self
.logger
.error("Cannot update vnf. {}".format(e
))
698 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
700 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
701 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
702 :param nsr_desc_RO: nsr descriptor from RO
703 :return: Nothing, LcmException is raised on errors
705 for vnf_index
, db_vnfr
in db_vnfrs
.items():
706 for vnf_RO
in nsr_desc_RO
["vnfs"]:
707 if vnf_RO
["member_vnf_index"] != vnf_index
:
710 if vnf_RO
.get("ip_address"):
711 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
714 elif not db_vnfr
.get("ip-address"):
715 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
716 raise LcmExceptionNoMgmtIP(
717 "ns member_vnf_index '{}' has no IP address".format(
722 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
723 vdur_RO_count_index
= 0
724 if vdur
.get("pdu-type"):
726 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
727 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
729 if vdur
["count-index"] != vdur_RO_count_index
:
730 vdur_RO_count_index
+= 1
732 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
733 if vdur_RO
.get("ip_address"):
734 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
736 vdur
["ip-address"] = None
737 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
738 vdur
["name"] = vdur_RO
.get("vim_name")
739 vdur
["status"] = vdur_RO
.get("status")
740 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
741 for ifacer
in get_iterable(vdur
, "interfaces"):
742 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
743 if ifacer
["name"] == interface_RO
.get("internal_name"):
744 ifacer
["ip-address"] = interface_RO
.get(
747 ifacer
["mac-address"] = interface_RO
.get(
753 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
754 "from VIM info".format(
755 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
758 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
762 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
764 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
768 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
769 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
770 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
772 vld
["vim-id"] = net_RO
.get("vim_net_id")
773 vld
["name"] = net_RO
.get("vim_name")
774 vld
["status"] = net_RO
.get("status")
775 vld
["status-detailed"] = net_RO
.get("error_msg")
776 vnfr_update
["vld.{}".format(vld_index
)] = vld
780 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
785 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
790 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
795 def _get_ns_config_info(self
, nsr_id
):
797 Generates a mapping between vnf,vdu elements and the N2VC id
798 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
799 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
800 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
801 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
803 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
804 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
806 ns_config_info
= {"osm-config-mapping": mapping
}
807 for vca
in vca_deployed_list
:
808 if not vca
["member-vnf-index"]:
810 if not vca
["vdu_id"]:
811 mapping
[vca
["member-vnf-index"]] = vca
["application"]
815 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
817 ] = vca
["application"]
818 return ns_config_info
820 async def _instantiate_ng_ro(
837 def get_vim_account(vim_account_id
):
839 if vim_account_id
in db_vims
:
840 return db_vims
[vim_account_id
]
841 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
842 db_vims
[vim_account_id
] = db_vim
845 # modify target_vld info with instantiation parameters
846 def parse_vld_instantiation_params(
847 target_vim
, target_vld
, vld_params
, target_sdn
849 if vld_params
.get("ip-profile"):
850 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
853 if vld_params
.get("provider-network"):
854 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
857 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
858 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
861 if vld_params
.get("wimAccountId"):
862 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
863 target_vld
["vim_info"][target_wim
] = {}
864 for param
in ("vim-network-name", "vim-network-id"):
865 if vld_params
.get(param
):
866 if isinstance(vld_params
[param
], dict):
867 for vim
, vim_net
in vld_params
[param
].items():
868 other_target_vim
= "vim:" + vim
870 target_vld
["vim_info"],
871 (other_target_vim
, param
.replace("-", "_")),
874 else: # isinstance str
875 target_vld
["vim_info"][target_vim
][
876 param
.replace("-", "_")
877 ] = vld_params
[param
]
878 if vld_params
.get("common_id"):
879 target_vld
["common_id"] = vld_params
.get("common_id")
881 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
882 def update_ns_vld_target(target
, ns_params
):
883 for vnf_params
in ns_params
.get("vnf", ()):
884 if vnf_params
.get("vimAccountId"):
888 for vnfr
in db_vnfrs
.values()
889 if vnf_params
["member-vnf-index"]
890 == vnfr
["member-vnf-index-ref"]
894 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
895 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
896 target_vld
= find_in_list(
897 get_iterable(vdur
, "interfaces"),
898 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
901 vld_params
= find_in_list(
902 get_iterable(ns_params
, "vld"),
903 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
907 if vnf_params
.get("vimAccountId") not in a_vld
.get(
910 target_vim_network_list
= [
911 v
for _
, v
in a_vld
.get("vim_info").items()
913 target_vim_network_name
= next(
915 item
.get("vim_network_name", "")
916 for item
in target_vim_network_list
921 target
["ns"]["vld"][a_index
].get("vim_info").update(
923 "vim:{}".format(vnf_params
["vimAccountId"]): {
924 "vim_network_name": target_vim_network_name
,
930 for param
in ("vim-network-name", "vim-network-id"):
931 if vld_params
.get(param
) and isinstance(
932 vld_params
[param
], dict
934 for vim
, vim_net
in vld_params
[
937 other_target_vim
= "vim:" + vim
939 target
["ns"]["vld"][a_index
].get(
944 param
.replace("-", "_"),
949 nslcmop_id
= db_nslcmop
["_id"]
951 "name": db_nsr
["name"],
954 "image": deepcopy(db_nsr
["image"]),
955 "flavor": deepcopy(db_nsr
["flavor"]),
956 "action_id": nslcmop_id
,
957 "cloud_init_content": {},
959 for image
in target
["image"]:
960 image
["vim_info"] = {}
961 for flavor
in target
["flavor"]:
962 flavor
["vim_info"] = {}
963 if db_nsr
.get("affinity-or-anti-affinity-group"):
964 target
["affinity-or-anti-affinity-group"] = deepcopy(
965 db_nsr
["affinity-or-anti-affinity-group"]
967 for affinity_or_anti_affinity_group
in target
[
968 "affinity-or-anti-affinity-group"
970 affinity_or_anti_affinity_group
["vim_info"] = {}
972 if db_nslcmop
.get("lcmOperationType") != "instantiate":
973 # get parameters of instantiation:
974 db_nslcmop_instantiate
= self
.db
.get_list(
977 "nsInstanceId": db_nslcmop
["nsInstanceId"],
978 "lcmOperationType": "instantiate",
981 ns_params
= db_nslcmop_instantiate
.get("operationParams")
983 ns_params
= db_nslcmop
.get("operationParams")
984 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
985 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
988 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
989 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
993 "mgmt-network": vld
.get("mgmt-network", False),
994 "type": vld
.get("type"),
997 "vim_network_name": vld
.get("vim-network-name"),
998 "vim_account_id": ns_params
["vimAccountId"],
1002 # check if this network needs SDN assist
1003 if vld
.get("pci-interfaces"):
1004 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1005 sdnc_id
= db_vim
["config"].get("sdn-controller")
1007 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1008 target_sdn
= "sdn:{}".format(sdnc_id
)
1009 target_vld
["vim_info"][target_sdn
] = {
1011 "target_vim": target_vim
,
1013 "type": vld
.get("type"),
1016 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1017 for nsd_vnf_profile
in nsd_vnf_profiles
:
1018 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1019 if cp
["virtual-link-profile-id"] == vld
["id"]:
1021 "member_vnf:{}.{}".format(
1022 cp
["constituent-cpd-id"][0][
1023 "constituent-base-element-id"
1025 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1027 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1029 # check at nsd descriptor, if there is an ip-profile
1031 nsd_vlp
= find_in_list(
1032 get_virtual_link_profiles(nsd
),
1033 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1038 and nsd_vlp
.get("virtual-link-protocol-data")
1039 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1041 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1044 ip_profile_dest_data
= {}
1045 if "ip-version" in ip_profile_source_data
:
1046 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1049 if "cidr" in ip_profile_source_data
:
1050 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1053 if "gateway-ip" in ip_profile_source_data
:
1054 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1057 if "dhcp-enabled" in ip_profile_source_data
:
1058 ip_profile_dest_data
["dhcp-params"] = {
1059 "enabled": ip_profile_source_data
["dhcp-enabled"]
1061 vld_params
["ip-profile"] = ip_profile_dest_data
1063 # update vld_params with instantiation params
1064 vld_instantiation_params
= find_in_list(
1065 get_iterable(ns_params
, "vld"),
1066 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1068 if vld_instantiation_params
:
1069 vld_params
.update(vld_instantiation_params
)
1070 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1071 target
["ns"]["vld"].append(target_vld
)
1072 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1073 update_ns_vld_target(target
, ns_params
)
1075 for vnfr
in db_vnfrs
.values():
1076 vnfd
= find_in_list(
1077 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1079 vnf_params
= find_in_list(
1080 get_iterable(ns_params
, "vnf"),
1081 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1083 target_vnf
= deepcopy(vnfr
)
1084 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1085 for vld
in target_vnf
.get("vld", ()):
1086 # check if connected to a ns.vld, to fill target'
1087 vnf_cp
= find_in_list(
1088 vnfd
.get("int-virtual-link-desc", ()),
1089 lambda cpd
: cpd
.get("id") == vld
["id"],
1092 ns_cp
= "member_vnf:{}.{}".format(
1093 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1095 if cp2target
.get(ns_cp
):
1096 vld
["target"] = cp2target
[ns_cp
]
1099 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1101 # check if this network needs SDN assist
1103 if vld
.get("pci-interfaces"):
1104 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1105 sdnc_id
= db_vim
["config"].get("sdn-controller")
1107 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1108 target_sdn
= "sdn:{}".format(sdnc_id
)
1109 vld
["vim_info"][target_sdn
] = {
1111 "target_vim": target_vim
,
1113 "type": vld
.get("type"),
1116 # check at vnfd descriptor, if there is an ip-profile
1118 vnfd_vlp
= find_in_list(
1119 get_virtual_link_profiles(vnfd
),
1120 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1124 and vnfd_vlp
.get("virtual-link-protocol-data")
1125 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1127 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1130 ip_profile_dest_data
= {}
1131 if "ip-version" in ip_profile_source_data
:
1132 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1135 if "cidr" in ip_profile_source_data
:
1136 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1139 if "gateway-ip" in ip_profile_source_data
:
1140 ip_profile_dest_data
[
1142 ] = ip_profile_source_data
["gateway-ip"]
1143 if "dhcp-enabled" in ip_profile_source_data
:
1144 ip_profile_dest_data
["dhcp-params"] = {
1145 "enabled": ip_profile_source_data
["dhcp-enabled"]
1148 vld_params
["ip-profile"] = ip_profile_dest_data
1149 # update vld_params with instantiation params
1151 vld_instantiation_params
= find_in_list(
1152 get_iterable(vnf_params
, "internal-vld"),
1153 lambda i_vld
: i_vld
["name"] == vld
["id"],
1155 if vld_instantiation_params
:
1156 vld_params
.update(vld_instantiation_params
)
1157 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1160 for vdur
in target_vnf
.get("vdur", ()):
1161 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1162 continue # This vdu must not be created
1163 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1165 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1168 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1169 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1172 and vdu_configuration
.get("config-access")
1173 and vdu_configuration
.get("config-access").get("ssh-access")
1175 vdur
["ssh-keys"] = ssh_keys_all
1176 vdur
["ssh-access-required"] = vdu_configuration
[
1178 ]["ssh-access"]["required"]
1181 and vnf_configuration
.get("config-access")
1182 and vnf_configuration
.get("config-access").get("ssh-access")
1183 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1185 vdur
["ssh-keys"] = ssh_keys_all
1186 vdur
["ssh-access-required"] = vnf_configuration
[
1188 ]["ssh-access"]["required"]
1189 elif ssh_keys_instantiation
and find_in_list(
1190 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1192 vdur
["ssh-keys"] = ssh_keys_instantiation
1194 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1196 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1198 if vdud
.get("cloud-init-file"):
1199 vdur
["cloud-init"] = "{}:file:{}".format(
1200 vnfd
["_id"], vdud
.get("cloud-init-file")
1202 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1203 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1204 base_folder
= vnfd
["_admin"]["storage"]
1205 if base_folder
["pkg-dir"]:
1206 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1207 base_folder
["folder"],
1208 base_folder
["pkg-dir"],
1209 vdud
.get("cloud-init-file"),
1212 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1213 base_folder
["folder"],
1214 vdud
.get("cloud-init-file"),
1216 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1217 target
["cloud_init_content"][
1220 elif vdud
.get("cloud-init"):
1221 vdur
["cloud-init"] = "{}:vdu:{}".format(
1222 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1224 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1225 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1228 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1229 deploy_params_vdu
= self
._format
_additional
_params
(
1230 vdur
.get("additionalParams") or {}
1232 deploy_params_vdu
["OSM"] = get_osm_params(
1233 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1235 vdur
["additionalParams"] = deploy_params_vdu
1238 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1239 if target_vim
not in ns_flavor
["vim_info"]:
1240 ns_flavor
["vim_info"][target_vim
] = {}
1243 # in case alternative images are provided we must check if they should be applied
1244 # for the vim_type, modify the vim_type taking into account
1245 ns_image_id
= int(vdur
["ns-image-id"])
1246 if vdur
.get("alt-image-ids"):
1247 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1248 vim_type
= db_vim
["vim_type"]
1249 for alt_image_id
in vdur
.get("alt-image-ids"):
1250 ns_alt_image
= target
["image"][int(alt_image_id
)]
1251 if vim_type
== ns_alt_image
.get("vim-type"):
1252 # must use alternative image
1254 "use alternative image id: {}".format(alt_image_id
)
1256 ns_image_id
= alt_image_id
1257 vdur
["ns-image-id"] = ns_image_id
1259 ns_image
= target
["image"][int(ns_image_id
)]
1260 if target_vim
not in ns_image
["vim_info"]:
1261 ns_image
["vim_info"][target_vim
] = {}
1264 if vdur
.get("affinity-or-anti-affinity-group-id"):
1265 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1266 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1267 if target_vim
not in ns_ags
["vim_info"]:
1268 ns_ags
["vim_info"][target_vim
] = {}
1270 vdur
["vim_info"] = {target_vim
: {}}
1271 # instantiation parameters
1273 vdu_instantiation_params
= find_in_list(
1274 get_iterable(vnf_params
, "vdu"),
1275 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1277 if vdu_instantiation_params
:
1278 # Parse the vdu_volumes from the instantiation params
1279 vdu_volumes
= get_volumes_from_instantiation_params(
1280 vdu_instantiation_params
, vdud
1282 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1283 vdur_list
.append(vdur
)
1284 target_vnf
["vdur"] = vdur_list
1285 target
["vnf"].append(target_vnf
)
1287 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1288 desc
= await self
.RO
.deploy(nsr_id
, target
)
1289 self
.logger
.debug("RO return > {}".format(desc
))
1290 action_id
= desc
["action_id"]
1291 await self
._wait
_ng
_ro
(
1292 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1293 operation
="instantiation"
1298 "_admin.deployed.RO.operational-status": "running",
1299 "detailed-status": " ".join(stage
),
1301 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1303 self
._write
_op
_status
(nslcmop_id
, stage
)
1305 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1309 async def _wait_ng_ro(
1319 detailed_status_old
= None
1321 start_time
= start_time
or time()
1322 while time() <= start_time
+ timeout
:
1323 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1324 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1325 if desc_status
["status"] == "FAILED":
1326 raise NgRoException(desc_status
["details"])
1327 elif desc_status
["status"] == "BUILD":
1329 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1330 elif desc_status
["status"] == "DONE":
1332 stage
[2] = "Deployed at VIM"
1335 assert False, "ROclient.check_ns_status returns unknown {}".format(
1336 desc_status
["status"]
1338 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1339 detailed_status_old
= stage
[2]
1340 db_nsr_update
["detailed-status"] = " ".join(stage
)
1341 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1342 self
._write
_op
_status
(nslcmop_id
, stage
)
1343 await asyncio
.sleep(15, loop
=self
.loop
)
1344 else: # timeout_ns_deploy
1345 raise NgRoException("Timeout waiting ns to deploy")
1347 async def _terminate_ng_ro(
1348 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1353 start_deploy
= time()
1360 "action_id": nslcmop_id
,
1362 desc
= await self
.RO
.deploy(nsr_id
, target
)
1363 action_id
= desc
["action_id"]
1364 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1365 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1368 + "ns terminate action at RO. action_id={}".format(action_id
)
1372 delete_timeout
= 20 * 60 # 20 minutes
1373 await self
._wait
_ng
_ro
(
1374 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1375 operation
="termination"
1378 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1379 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1381 await self
.RO
.delete(nsr_id
)
1382 except Exception as e
:
1383 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1384 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1385 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1386 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1388 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1390 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1391 failed_detail
.append("delete conflict: {}".format(e
))
1394 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1397 failed_detail
.append("delete error: {}".format(e
))
1400 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1404 stage
[2] = "Error deleting from VIM"
1406 stage
[2] = "Deleted from VIM"
1407 db_nsr_update
["detailed-status"] = " ".join(stage
)
1408 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1409 self
._write
_op
_status
(nslcmop_id
, stage
)
1412 raise LcmException("; ".join(failed_detail
))
1415 async def instantiate_RO(
1429 :param logging_text: preffix text to use at logging
1430 :param nsr_id: nsr identity
1431 :param nsd: database content of ns descriptor
1432 :param db_nsr: database content of ns record
1433 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1435 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1436 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1437 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1438 :return: None or exception
1441 start_deploy
= time()
1442 ns_params
= db_nslcmop
.get("operationParams")
1443 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1444 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1446 timeout_ns_deploy
= self
.timeout
.get(
1447 "ns_deploy", self
.timeout_ns_deploy
1450 # Check for and optionally request placement optimization. Database will be updated if placement activated
1451 stage
[2] = "Waiting for Placement."
1452 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1453 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1454 for vnfr
in db_vnfrs
.values():
1455 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1458 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1460 return await self
._instantiate
_ng
_ro
(
1473 except Exception as e
:
1474 stage
[2] = "ERROR deploying at VIM"
1475 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1477 "Error deploying at VIM {}".format(e
),
1478 exc_info
=not isinstance(
1481 ROclient
.ROClientException
,
1490 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1492 Wait for kdu to be up, get ip address
1493 :param logging_text: prefix use for logging
1497 :return: IP address, K8s services
1500 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1503 while nb_tries
< 360:
1504 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1508 for x
in get_iterable(db_vnfr
, "kdur")
1509 if x
.get("kdu-name") == kdu_name
1515 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1517 if kdur
.get("status"):
1518 if kdur
["status"] in ("READY", "ENABLED"):
1519 return kdur
.get("ip-address"), kdur
.get("services")
1522 "target KDU={} is in error state".format(kdu_name
)
1525 await asyncio
.sleep(10, loop
=self
.loop
)
1527 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1529 async def wait_vm_up_insert_key_ro(
1530 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1533 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1534 :param logging_text: prefix use for logging
1539 :param pub_key: public ssh key to inject, None to skip
1540 :param user: user to apply the public ssh key
1544 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1548 target_vdu_id
= None
1554 if ro_retries
>= 360: # 1 hour
1556 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1559 await asyncio
.sleep(10, loop
=self
.loop
)
1562 if not target_vdu_id
:
1563 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1565 if not vdu_id
: # for the VNF case
1566 if db_vnfr
.get("status") == "ERROR":
1568 "Cannot inject ssh-key because target VNF is in error state"
1570 ip_address
= db_vnfr
.get("ip-address")
1576 for x
in get_iterable(db_vnfr
, "vdur")
1577 if x
.get("ip-address") == ip_address
1585 for x
in get_iterable(db_vnfr
, "vdur")
1586 if x
.get("vdu-id-ref") == vdu_id
1587 and x
.get("count-index") == vdu_index
1593 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1594 ): # If only one, this should be the target vdu
1595 vdur
= db_vnfr
["vdur"][0]
1598 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1599 vnfr_id
, vdu_id
, vdu_index
1602 # New generation RO stores information at "vim_info"
1605 if vdur
.get("vim_info"):
1607 t
for t
in vdur
["vim_info"]
1608 ) # there should be only one key
1609 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1611 vdur
.get("pdu-type")
1612 or vdur
.get("status") == "ACTIVE"
1613 or ng_ro_status
== "ACTIVE"
1615 ip_address
= vdur
.get("ip-address")
1618 target_vdu_id
= vdur
["vdu-id-ref"]
1619 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1621 "Cannot inject ssh-key because target VM is in error state"
1624 if not target_vdu_id
:
1627 # inject public key into machine
1628 if pub_key
and user
:
1629 self
.logger
.debug(logging_text
+ "Inserting RO key")
1630 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1631 if vdur
.get("pdu-type"):
1632 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1635 ro_vm_id
= "{}-{}".format(
1636 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1637 ) # TODO add vdu_index
1641 "action": "inject_ssh_key",
1645 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1647 desc
= await self
.RO
.deploy(nsr_id
, target
)
1648 action_id
= desc
["action_id"]
1649 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1652 # wait until NS is deployed at RO
1654 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1655 ro_nsr_id
= deep_get(
1656 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1660 result_dict
= await self
.RO
.create_action(
1662 item_id_name
=ro_nsr_id
,
1664 "add_public_key": pub_key
,
1669 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1670 if not result_dict
or not isinstance(result_dict
, dict):
1672 "Unknown response from RO when injecting key"
1674 for result
in result_dict
.values():
1675 if result
.get("vim_result") == 200:
1678 raise ROclient
.ROClientException(
1679 "error injecting key: {}".format(
1680 result
.get("description")
1684 except NgRoException
as e
:
1686 "Reaching max tries injecting key. Error: {}".format(e
)
1688 except ROclient
.ROClientException
as e
:
1692 + "error injecting key: {}. Retrying until {} seconds".format(
1699 "Reaching max tries injecting key. Error: {}".format(e
)
1706 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1708 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1710 my_vca
= vca_deployed_list
[vca_index
]
1711 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1712 # vdu or kdu: no dependencies
1716 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1717 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1718 configuration_status_list
= db_nsr
["configurationStatus"]
1719 for index
, vca_deployed
in enumerate(configuration_status_list
):
1720 if index
== vca_index
:
1723 if not my_vca
.get("member-vnf-index") or (
1724 vca_deployed
.get("member-vnf-index")
1725 == my_vca
.get("member-vnf-index")
1727 internal_status
= configuration_status_list
[index
].get("status")
1728 if internal_status
== "READY":
1730 elif internal_status
== "BROKEN":
1732 "Configuration aborted because dependent charm/s has failed"
1737 # no dependencies, return
1739 await asyncio
.sleep(10)
1742 raise LcmException("Configuration aborted because dependent charm/s timeout")
1744 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1747 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1749 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1750 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1753 async def instantiate_N2VC(
1770 ee_config_descriptor
,
1772 nsr_id
= db_nsr
["_id"]
1773 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1774 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1775 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1776 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1778 "collection": "nsrs",
1779 "filter": {"_id": nsr_id
},
1780 "path": db_update_entry
,
1786 element_under_configuration
= nsr_id
1790 vnfr_id
= db_vnfr
["_id"]
1791 osm_config
["osm"]["vnf_id"] = vnfr_id
1793 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1795 if vca_type
== "native_charm":
1798 index_number
= vdu_index
or 0
1801 element_type
= "VNF"
1802 element_under_configuration
= vnfr_id
1803 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1805 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1806 element_type
= "VDU"
1807 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1808 osm_config
["osm"]["vdu_id"] = vdu_id
1810 namespace
+= ".{}".format(kdu_name
)
1811 element_type
= "KDU"
1812 element_under_configuration
= kdu_name
1813 osm_config
["osm"]["kdu_name"] = kdu_name
1816 if base_folder
["pkg-dir"]:
1817 artifact_path
= "{}/{}/{}/{}".format(
1818 base_folder
["folder"],
1819 base_folder
["pkg-dir"],
1822 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1827 artifact_path
= "{}/Scripts/{}/{}/".format(
1828 base_folder
["folder"],
1831 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1836 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1838 # get initial_config_primitive_list that applies to this element
1839 initial_config_primitive_list
= config_descriptor
.get(
1840 "initial-config-primitive"
1844 "Initial config primitive list > {}".format(
1845 initial_config_primitive_list
1849 # add config if not present for NS charm
1850 ee_descriptor_id
= ee_config_descriptor
.get("id")
1851 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1852 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1853 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1857 "Initial config primitive list #2 > {}".format(
1858 initial_config_primitive_list
1861 # n2vc_redesign STEP 3.1
1862 # find old ee_id if exists
1863 ee_id
= vca_deployed
.get("ee_id")
1865 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1866 # create or register execution environment in VCA
1867 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1869 self
._write
_configuration
_status
(
1871 vca_index
=vca_index
,
1873 element_under_configuration
=element_under_configuration
,
1874 element_type
=element_type
,
1877 step
= "create execution environment"
1878 self
.logger
.debug(logging_text
+ step
)
1882 if vca_type
== "k8s_proxy_charm":
1883 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1884 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1885 namespace
=namespace
,
1886 artifact_path
=artifact_path
,
1890 elif vca_type
== "helm" or vca_type
== "helm-v3":
1891 ee_id
, credentials
= await self
.vca_map
[
1893 ].create_execution_environment(
1894 namespace
=namespace
,
1898 artifact_path
=artifact_path
,
1902 ee_id
, credentials
= await self
.vca_map
[
1904 ].create_execution_environment(
1905 namespace
=namespace
,
1911 elif vca_type
== "native_charm":
1912 step
= "Waiting to VM being up and getting IP address"
1913 self
.logger
.debug(logging_text
+ step
)
1914 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1923 credentials
= {"hostname": rw_mgmt_ip
}
1925 username
= deep_get(
1926 config_descriptor
, ("config-access", "ssh-access", "default-user")
1928 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1929 # merged. Meanwhile let's get username from initial-config-primitive
1930 if not username
and initial_config_primitive_list
:
1931 for config_primitive
in initial_config_primitive_list
:
1932 for param
in config_primitive
.get("parameter", ()):
1933 if param
["name"] == "ssh-username":
1934 username
= param
["value"]
1938 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1939 "'config-access.ssh-access.default-user'"
1941 credentials
["username"] = username
1942 # n2vc_redesign STEP 3.2
1944 self
._write
_configuration
_status
(
1946 vca_index
=vca_index
,
1947 status
="REGISTERING",
1948 element_under_configuration
=element_under_configuration
,
1949 element_type
=element_type
,
1952 step
= "register execution environment {}".format(credentials
)
1953 self
.logger
.debug(logging_text
+ step
)
1954 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1955 credentials
=credentials
,
1956 namespace
=namespace
,
1961 # for compatibility with MON/POL modules, the need model and application name at database
1962 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1963 ee_id_parts
= ee_id
.split(".")
1964 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1965 if len(ee_id_parts
) >= 2:
1966 model_name
= ee_id_parts
[0]
1967 application_name
= ee_id_parts
[1]
1968 db_nsr_update
[db_update_entry
+ "model"] = model_name
1969 db_nsr_update
[db_update_entry
+ "application"] = application_name
1971 # n2vc_redesign STEP 3.3
1972 step
= "Install configuration Software"
1974 self
._write
_configuration
_status
(
1976 vca_index
=vca_index
,
1977 status
="INSTALLING SW",
1978 element_under_configuration
=element_under_configuration
,
1979 element_type
=element_type
,
1980 other_update
=db_nsr_update
,
1983 # TODO check if already done
1984 self
.logger
.debug(logging_text
+ step
)
1986 if vca_type
== "native_charm":
1987 config_primitive
= next(
1988 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1991 if config_primitive
:
1992 config
= self
._map
_primitive
_params
(
1993 config_primitive
, {}, deploy_params
1996 if vca_type
== "lxc_proxy_charm":
1997 if element_type
== "NS":
1998 num_units
= db_nsr
.get("config-units") or 1
1999 elif element_type
== "VNF":
2000 num_units
= db_vnfr
.get("config-units") or 1
2001 elif element_type
== "VDU":
2002 for v
in db_vnfr
["vdur"]:
2003 if vdu_id
== v
["vdu-id-ref"]:
2004 num_units
= v
.get("config-units") or 1
2006 if vca_type
!= "k8s_proxy_charm":
2007 await self
.vca_map
[vca_type
].install_configuration_sw(
2009 artifact_path
=artifact_path
,
2012 num_units
=num_units
,
2017 # write in db flag of configuration_sw already installed
2019 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2022 # add relations for this VCA (wait for other peers related with this VCA)
2023 await self
._add
_vca
_relations
(
2024 logging_text
=logging_text
,
2027 vca_index
=vca_index
,
2030 # if SSH access is required, then get execution environment SSH public
2031 # if native charm we have waited already to VM be UP
2032 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2035 # self.logger.debug("get ssh key block")
2037 config_descriptor
, ("config-access", "ssh-access", "required")
2039 # self.logger.debug("ssh key needed")
2040 # Needed to inject a ssh key
2043 ("config-access", "ssh-access", "default-user"),
2045 step
= "Install configuration Software, getting public ssh key"
2046 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2047 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2050 step
= "Insert public key into VM user={} ssh_key={}".format(
2054 # self.logger.debug("no need to get ssh key")
2055 step
= "Waiting to VM being up and getting IP address"
2056 self
.logger
.debug(logging_text
+ step
)
2058 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2061 # n2vc_redesign STEP 5.1
2062 # wait for RO (ip-address) Insert pub_key into VM
2065 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2066 logging_text
, nsr_id
, vnfr_id
, kdu_name
2068 vnfd
= self
.db
.get_one(
2070 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2072 kdu
= get_kdu(vnfd
, kdu_name
)
2074 service
["name"] for service
in get_kdu_services(kdu
)
2076 exposed_services
= []
2077 for service
in services
:
2078 if any(s
in service
["name"] for s
in kdu_services
):
2079 exposed_services
.append(service
)
2080 await self
.vca_map
[vca_type
].exec_primitive(
2082 primitive_name
="config",
2084 "osm-config": json
.dumps(
2086 k8s
={"services": exposed_services
}
2093 # This verification is needed in order to avoid trying to add a public key
2094 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2095 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2096 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2098 elif db_vnfr
.get('vdur'):
2099 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2109 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2111 # store rw_mgmt_ip in deploy params for later replacement
2112 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2114 # n2vc_redesign STEP 6 Execute initial config primitive
2115 step
= "execute initial config primitive"
2117 # wait for dependent primitives execution (NS -> VNF -> VDU)
2118 if initial_config_primitive_list
:
2119 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2121 # stage, in function of element type: vdu, kdu, vnf or ns
2122 my_vca
= vca_deployed_list
[vca_index
]
2123 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2125 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2126 elif my_vca
.get("member-vnf-index"):
2128 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2131 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2133 self
._write
_configuration
_status
(
2134 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2137 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2139 check_if_terminated_needed
= True
2140 for initial_config_primitive
in initial_config_primitive_list
:
2141 # adding information on the vca_deployed if it is a NS execution environment
2142 if not vca_deployed
["member-vnf-index"]:
2143 deploy_params
["ns_config_info"] = json
.dumps(
2144 self
._get
_ns
_config
_info
(nsr_id
)
2146 # TODO check if already done
2147 primitive_params_
= self
._map
_primitive
_params
(
2148 initial_config_primitive
, {}, deploy_params
2151 step
= "execute primitive '{}' params '{}'".format(
2152 initial_config_primitive
["name"], primitive_params_
2154 self
.logger
.debug(logging_text
+ step
)
2155 await self
.vca_map
[vca_type
].exec_primitive(
2157 primitive_name
=initial_config_primitive
["name"],
2158 params_dict
=primitive_params_
,
2163 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2164 if check_if_terminated_needed
:
2165 if config_descriptor
.get("terminate-config-primitive"):
2167 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2169 check_if_terminated_needed
= False
2171 # TODO register in database that primitive is done
2173 # STEP 7 Configure metrics
2174 if vca_type
== "helm" or vca_type
== "helm-v3":
2175 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2177 artifact_path
=artifact_path
,
2178 ee_config_descriptor
=ee_config_descriptor
,
2181 target_ip
=rw_mgmt_ip
,
2187 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2190 for job
in prometheus_jobs
:
2193 {"job_name": job
["job_name"]},
2196 fail_on_empty
=False,
2199 step
= "instantiated at VCA"
2200 self
.logger
.debug(logging_text
+ step
)
2202 self
._write
_configuration
_status
(
2203 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2206 except Exception as e
: # TODO not use Exception but N2VC exception
2207 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2209 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2212 "Exception while {} : {}".format(step
, e
), exc_info
=True
2214 self
._write
_configuration
_status
(
2215 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2217 raise LcmException("{} {}".format(step
, e
)) from e
2219 def _write_ns_status(
2223 current_operation
: str,
2224 current_operation_id
: str,
2225 error_description
: str = None,
2226 error_detail
: str = None,
2227 other_update
: dict = None,
2230 Update db_nsr fields.
2233 :param current_operation:
2234 :param current_operation_id:
2235 :param error_description:
2236 :param error_detail:
2237 :param other_update: Other required changes at database if provided, will be cleared
2241 db_dict
= other_update
or {}
2244 ] = current_operation_id
# for backward compatibility
2245 db_dict
["_admin.current-operation"] = current_operation_id
2246 db_dict
["_admin.operation-type"] = (
2247 current_operation
if current_operation
!= "IDLE" else None
2249 db_dict
["currentOperation"] = current_operation
2250 db_dict
["currentOperationID"] = current_operation_id
2251 db_dict
["errorDescription"] = error_description
2252 db_dict
["errorDetail"] = error_detail
2255 db_dict
["nsState"] = ns_state
2256 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2257 except DbException
as e
:
2258 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2260 def _write_op_status(
2264 error_message
: str = None,
2265 queuePosition
: int = 0,
2266 operation_state
: str = None,
2267 other_update
: dict = None,
2270 db_dict
= other_update
or {}
2271 db_dict
["queuePosition"] = queuePosition
2272 if isinstance(stage
, list):
2273 db_dict
["stage"] = stage
[0]
2274 db_dict
["detailed-status"] = " ".join(stage
)
2275 elif stage
is not None:
2276 db_dict
["stage"] = str(stage
)
2278 if error_message
is not None:
2279 db_dict
["errorMessage"] = error_message
2280 if operation_state
is not None:
2281 db_dict
["operationState"] = operation_state
2282 db_dict
["statusEnteredTime"] = time()
2283 self
.update_db_2("nslcmops", op_id
, db_dict
)
2284 except DbException
as e
:
2286 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2289 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2291 nsr_id
= db_nsr
["_id"]
2292 # configurationStatus
2293 config_status
= db_nsr
.get("configurationStatus")
2296 "configurationStatus.{}.status".format(index
): status
2297 for index
, v
in enumerate(config_status
)
2301 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2303 except DbException
as e
:
2305 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2308 def _write_configuration_status(
2313 element_under_configuration
: str = None,
2314 element_type
: str = None,
2315 other_update
: dict = None,
2318 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2319 # .format(vca_index, status))
2322 db_path
= "configurationStatus.{}.".format(vca_index
)
2323 db_dict
= other_update
or {}
2325 db_dict
[db_path
+ "status"] = status
2326 if element_under_configuration
:
2328 db_path
+ "elementUnderConfiguration"
2329 ] = element_under_configuration
2331 db_dict
[db_path
+ "elementType"] = element_type
2332 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2333 except DbException
as e
:
2335 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2336 status
, nsr_id
, vca_index
, e
2340 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2342 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2343 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2344 Database is used because the result can be obtained from a different LCM worker in case of HA.
2345 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2346 :param db_nslcmop: database content of nslcmop
2347 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2348 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2349 computed 'vim-account-id'
2352 nslcmop_id
= db_nslcmop
["_id"]
2353 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2354 if placement_engine
== "PLA":
2356 logging_text
+ "Invoke and wait for placement optimization"
2358 await self
.msg
.aiowrite(
2359 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2361 db_poll_interval
= 5
2362 wait
= db_poll_interval
* 10
2364 while not pla_result
and wait
>= 0:
2365 await asyncio
.sleep(db_poll_interval
)
2366 wait
-= db_poll_interval
2367 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2368 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2372 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2375 for pla_vnf
in pla_result
["vnf"]:
2376 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2377 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2382 {"_id": vnfr
["_id"]},
2383 {"vim-account-id": pla_vnf
["vimAccountId"]},
2386 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2389 def update_nsrs_with_pla_result(self
, params
):
2391 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2393 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2395 except Exception as e
:
2396 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2398 async def instantiate(self
, nsr_id
, nslcmop_id
):
2401 :param nsr_id: ns instance to deploy
2402 :param nslcmop_id: operation to run
2406 # Try to lock HA task here
2407 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2408 if not task_is_locked_by_me
:
2410 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2414 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2415 self
.logger
.debug(logging_text
+ "Enter")
2417 # get all needed from database
2419 # database nsrs record
2422 # database nslcmops record
2425 # update operation on nsrs
2427 # update operation on nslcmops
2428 db_nslcmop_update
= {}
2430 nslcmop_operation_state
= None
2431 db_vnfrs
= {} # vnf's info indexed by member-index
2433 tasks_dict_info
= {} # from task to info text
2437 "Stage 1/5: preparation of the environment.",
2438 "Waiting for previous operations to terminate.",
2441 # ^ stage, step, VIM progress
2443 # wait for any previous tasks in process
2444 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2446 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2447 stage
[1] = "Reading from database."
2448 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2449 db_nsr_update
["detailed-status"] = "creating"
2450 db_nsr_update
["operational-status"] = "init"
2451 self
._write
_ns
_status
(
2453 ns_state
="BUILDING",
2454 current_operation
="INSTANTIATING",
2455 current_operation_id
=nslcmop_id
,
2456 other_update
=db_nsr_update
,
2458 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2460 # read from db: operation
2461 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2462 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2463 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2464 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2465 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2467 ns_params
= db_nslcmop
.get("operationParams")
2468 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2469 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2471 timeout_ns_deploy
= self
.timeout
.get(
2472 "ns_deploy", self
.timeout_ns_deploy
2476 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2477 self
.logger
.debug(logging_text
+ stage
[1])
2478 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2479 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2480 self
.logger
.debug(logging_text
+ stage
[1])
2481 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2482 self
.fs
.sync(db_nsr
["nsd-id"])
2484 # nsr_name = db_nsr["name"] # TODO short-name??
2486 # read from db: vnf's of this ns
2487 stage
[1] = "Getting vnfrs from db."
2488 self
.logger
.debug(logging_text
+ stage
[1])
2489 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2491 # read from db: vnfd's for every vnf
2492 db_vnfds
= [] # every vnfd data
2494 # for each vnf in ns, read vnfd
2495 for vnfr
in db_vnfrs_list
:
2496 if vnfr
.get("kdur"):
2498 for kdur
in vnfr
["kdur"]:
2499 if kdur
.get("additionalParams"):
2500 kdur
["additionalParams"] = json
.loads(
2501 kdur
["additionalParams"]
2503 kdur_list
.append(kdur
)
2504 vnfr
["kdur"] = kdur_list
2506 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2507 vnfd_id
= vnfr
["vnfd-id"]
2508 vnfd_ref
= vnfr
["vnfd-ref"]
2509 self
.fs
.sync(vnfd_id
)
2511 # if we haven't this vnfd, read it from db
2512 if vnfd_id
not in db_vnfds
:
2514 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2517 self
.logger
.debug(logging_text
+ stage
[1])
2518 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2521 db_vnfds
.append(vnfd
)
2523 # Get or generates the _admin.deployed.VCA list
2524 vca_deployed_list
= None
2525 if db_nsr
["_admin"].get("deployed"):
2526 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2527 if vca_deployed_list
is None:
2528 vca_deployed_list
= []
2529 configuration_status_list
= []
2530 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2531 db_nsr_update
["configurationStatus"] = configuration_status_list
2532 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2533 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2534 elif isinstance(vca_deployed_list
, dict):
2535 # maintain backward compatibility. Change a dict to list at database
2536 vca_deployed_list
= list(vca_deployed_list
.values())
2537 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2538 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2541 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2543 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2544 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2546 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2547 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2548 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2550 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2553 # n2vc_redesign STEP 2 Deploy Network Scenario
2554 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2555 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2557 stage
[1] = "Deploying KDUs."
2558 # self.logger.debug(logging_text + "Before deploy_kdus")
2559 # Call to deploy_kdus in case exists the "vdu:kdu" param
2560 await self
.deploy_kdus(
2561 logging_text
=logging_text
,
2563 nslcmop_id
=nslcmop_id
,
2566 task_instantiation_info
=tasks_dict_info
,
2569 stage
[1] = "Getting VCA public key."
2570 # n2vc_redesign STEP 1 Get VCA public ssh-key
2571 # feature 1429. Add n2vc public key to needed VMs
2572 n2vc_key
= self
.n2vc
.get_public_key()
2573 n2vc_key_list
= [n2vc_key
]
2574 if self
.vca_config
.get("public_key"):
2575 n2vc_key_list
.append(self
.vca_config
["public_key"])
2577 stage
[1] = "Deploying NS at VIM."
2578 task_ro
= asyncio
.ensure_future(
2579 self
.instantiate_RO(
2580 logging_text
=logging_text
,
2584 db_nslcmop
=db_nslcmop
,
2587 n2vc_key_list
=n2vc_key_list
,
2591 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2592 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2594 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2595 stage
[1] = "Deploying Execution Environments."
2596 self
.logger
.debug(logging_text
+ stage
[1])
2598 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2599 for vnf_profile
in get_vnf_profiles(nsd
):
2600 vnfd_id
= vnf_profile
["vnfd-id"]
2601 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2602 member_vnf_index
= str(vnf_profile
["id"])
2603 db_vnfr
= db_vnfrs
[member_vnf_index
]
2604 base_folder
= vnfd
["_admin"]["storage"]
2610 # Get additional parameters
2611 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2612 if db_vnfr
.get("additionalParamsForVnf"):
2613 deploy_params
.update(
2614 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2617 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2618 if descriptor_config
:
2620 logging_text
=logging_text
2621 + "member_vnf_index={} ".format(member_vnf_index
),
2624 nslcmop_id
=nslcmop_id
,
2630 member_vnf_index
=member_vnf_index
,
2631 vdu_index
=vdu_index
,
2633 deploy_params
=deploy_params
,
2634 descriptor_config
=descriptor_config
,
2635 base_folder
=base_folder
,
2636 task_instantiation_info
=tasks_dict_info
,
2640 # Deploy charms for each VDU that supports one.
2641 for vdud
in get_vdu_list(vnfd
):
2643 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2644 vdur
= find_in_list(
2645 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2648 if vdur
.get("additionalParams"):
2649 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2651 deploy_params_vdu
= deploy_params
2652 deploy_params_vdu
["OSM"] = get_osm_params(
2653 db_vnfr
, vdu_id
, vdu_count_index
=0
2655 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2657 self
.logger
.debug("VDUD > {}".format(vdud
))
2659 "Descriptor config > {}".format(descriptor_config
)
2661 if descriptor_config
:
2664 for vdu_index
in range(vdud_count
):
2665 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2667 logging_text
=logging_text
2668 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2669 member_vnf_index
, vdu_id
, vdu_index
2673 nslcmop_id
=nslcmop_id
,
2679 member_vnf_index
=member_vnf_index
,
2680 vdu_index
=vdu_index
,
2682 deploy_params
=deploy_params_vdu
,
2683 descriptor_config
=descriptor_config
,
2684 base_folder
=base_folder
,
2685 task_instantiation_info
=tasks_dict_info
,
2688 for kdud
in get_kdu_list(vnfd
):
2689 kdu_name
= kdud
["name"]
2690 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2691 if descriptor_config
:
2696 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2698 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2699 if kdur
.get("additionalParams"):
2700 deploy_params_kdu
.update(
2701 parse_yaml_strings(kdur
["additionalParams"].copy())
2705 logging_text
=logging_text
,
2708 nslcmop_id
=nslcmop_id
,
2714 member_vnf_index
=member_vnf_index
,
2715 vdu_index
=vdu_index
,
2717 deploy_params
=deploy_params_kdu
,
2718 descriptor_config
=descriptor_config
,
2719 base_folder
=base_folder
,
2720 task_instantiation_info
=tasks_dict_info
,
2724 # Check if this NS has a charm configuration
2725 descriptor_config
= nsd
.get("ns-configuration")
2726 if descriptor_config
and descriptor_config
.get("juju"):
2729 member_vnf_index
= None
2735 # Get additional parameters
2736 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2737 if db_nsr
.get("additionalParamsForNs"):
2738 deploy_params
.update(
2739 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2741 base_folder
= nsd
["_admin"]["storage"]
2743 logging_text
=logging_text
,
2746 nslcmop_id
=nslcmop_id
,
2752 member_vnf_index
=member_vnf_index
,
2753 vdu_index
=vdu_index
,
2755 deploy_params
=deploy_params
,
2756 descriptor_config
=descriptor_config
,
2757 base_folder
=base_folder
,
2758 task_instantiation_info
=tasks_dict_info
,
2762 # rest of staff will be done at finally
2765 ROclient
.ROClientException
,
2771 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2774 except asyncio
.CancelledError
:
2776 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2778 exc
= "Operation was cancelled"
2779 except Exception as e
:
2780 exc
= traceback
.format_exc()
2781 self
.logger
.critical(
2782 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2787 error_list
.append(str(exc
))
2789 # wait for pending tasks
2791 stage
[1] = "Waiting for instantiate pending tasks."
2792 self
.logger
.debug(logging_text
+ stage
[1])
2793 error_list
+= await self
._wait
_for
_tasks
(
2801 stage
[1] = stage
[2] = ""
2802 except asyncio
.CancelledError
:
2803 error_list
.append("Cancelled")
2804 # TODO cancel all tasks
2805 except Exception as exc
:
2806 error_list
.append(str(exc
))
2808 # update operation-status
2809 db_nsr_update
["operational-status"] = "running"
2810 # let's begin with VCA 'configured' status (later we can change it)
2811 db_nsr_update
["config-status"] = "configured"
2812 for task
, task_name
in tasks_dict_info
.items():
2813 if not task
.done() or task
.cancelled() or task
.exception():
2814 if task_name
.startswith(self
.task_name_deploy_vca
):
2815 # A N2VC task is pending
2816 db_nsr_update
["config-status"] = "failed"
2818 # RO or KDU task is pending
2819 db_nsr_update
["operational-status"] = "failed"
2821 # update status at database
2823 error_detail
= ". ".join(error_list
)
2824 self
.logger
.error(logging_text
+ error_detail
)
2825 error_description_nslcmop
= "{} Detail: {}".format(
2826 stage
[0], error_detail
2828 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2829 nslcmop_id
, stage
[0]
2832 db_nsr_update
["detailed-status"] = (
2833 error_description_nsr
+ " Detail: " + error_detail
2835 db_nslcmop_update
["detailed-status"] = error_detail
2836 nslcmop_operation_state
= "FAILED"
2840 error_description_nsr
= error_description_nslcmop
= None
2842 db_nsr_update
["detailed-status"] = "Done"
2843 db_nslcmop_update
["detailed-status"] = "Done"
2844 nslcmop_operation_state
= "COMPLETED"
2847 self
._write
_ns
_status
(
2850 current_operation
="IDLE",
2851 current_operation_id
=None,
2852 error_description
=error_description_nsr
,
2853 error_detail
=error_detail
,
2854 other_update
=db_nsr_update
,
2856 self
._write
_op
_status
(
2859 error_message
=error_description_nslcmop
,
2860 operation_state
=nslcmop_operation_state
,
2861 other_update
=db_nslcmop_update
,
2864 if nslcmop_operation_state
:
2866 await self
.msg
.aiowrite(
2871 "nslcmop_id": nslcmop_id
,
2872 "operationState": nslcmop_operation_state
,
2876 except Exception as e
:
2878 logging_text
+ "kafka_write notification Exception {}".format(e
)
2881 self
.logger
.debug(logging_text
+ "Exit")
2882 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2884 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2885 if vnfd_id
not in cached_vnfds
:
2886 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2887 return cached_vnfds
[vnfd_id
]
2889 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2890 if vnf_profile_id
not in cached_vnfrs
:
2891 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2894 "member-vnf-index-ref": vnf_profile_id
,
2895 "nsr-id-ref": nsr_id
,
2898 return cached_vnfrs
[vnf_profile_id
]
2900 def _is_deployed_vca_in_relation(
2901 self
, vca
: DeployedVCA
, relation
: Relation
2904 for endpoint
in (relation
.provider
, relation
.requirer
):
2905 if endpoint
["kdu-resource-profile-id"]:
2908 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2909 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2910 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2916 def _update_ee_relation_data_with_implicit_data(
2917 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2919 ee_relation_data
= safe_get_ee_relation(
2920 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2922 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2923 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2924 "execution-environment-ref"
2926 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2927 vnfd_id
= vnf_profile
["vnfd-id"]
2928 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2931 if ee_relation_level
== EELevel
.VNF
2932 else ee_relation_data
["vdu-profile-id"]
2934 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2937 f
"not execution environments found for ee_relation {ee_relation_data}"
2939 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2940 return ee_relation_data
2942 def _get_ns_relations(
2945 nsd
: Dict
[str, Any
],
2947 cached_vnfds
: Dict
[str, Any
],
2948 ) -> List
[Relation
]:
2950 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2951 for r
in db_ns_relations
:
2952 provider_dict
= None
2953 requirer_dict
= None
2954 if all(key
in r
for key
in ("provider", "requirer")):
2955 provider_dict
= r
["provider"]
2956 requirer_dict
= r
["requirer"]
2957 elif "entities" in r
:
2958 provider_id
= r
["entities"][0]["id"]
2961 "endpoint": r
["entities"][0]["endpoint"],
2963 if provider_id
!= nsd
["id"]:
2964 provider_dict
["vnf-profile-id"] = provider_id
2965 requirer_id
= r
["entities"][1]["id"]
2968 "endpoint": r
["entities"][1]["endpoint"],
2970 if requirer_id
!= nsd
["id"]:
2971 requirer_dict
["vnf-profile-id"] = requirer_id
2974 "provider/requirer or entities must be included in the relation."
2976 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2977 nsr_id
, nsd
, provider_dict
, cached_vnfds
2979 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2980 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2982 provider
= EERelation(relation_provider
)
2983 requirer
= EERelation(relation_requirer
)
2984 relation
= Relation(r
["name"], provider
, requirer
)
2985 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2987 relations
.append(relation
)
2990 def _get_vnf_relations(
2993 nsd
: Dict
[str, Any
],
2995 cached_vnfds
: Dict
[str, Any
],
2996 ) -> List
[Relation
]:
2998 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2999 vnf_profile_id
= vnf_profile
["id"]
3000 vnfd_id
= vnf_profile
["vnfd-id"]
3001 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3002 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3003 for r
in db_vnf_relations
:
3004 provider_dict
= None
3005 requirer_dict
= None
3006 if all(key
in r
for key
in ("provider", "requirer")):
3007 provider_dict
= r
["provider"]
3008 requirer_dict
= r
["requirer"]
3009 elif "entities" in r
:
3010 provider_id
= r
["entities"][0]["id"]
3013 "vnf-profile-id": vnf_profile_id
,
3014 "endpoint": r
["entities"][0]["endpoint"],
3016 if provider_id
!= vnfd_id
:
3017 provider_dict
["vdu-profile-id"] = provider_id
3018 requirer_id
= r
["entities"][1]["id"]
3021 "vnf-profile-id": vnf_profile_id
,
3022 "endpoint": r
["entities"][1]["endpoint"],
3024 if requirer_id
!= vnfd_id
:
3025 requirer_dict
["vdu-profile-id"] = requirer_id
3028 "provider/requirer or entities must be included in the relation."
3030 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3031 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3033 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3034 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3036 provider
= EERelation(relation_provider
)
3037 requirer
= EERelation(relation_requirer
)
3038 relation
= Relation(r
["name"], provider
, requirer
)
3039 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3041 relations
.append(relation
)
3044 def _get_kdu_resource_data(
3046 ee_relation
: EERelation
,
3047 db_nsr
: Dict
[str, Any
],
3048 cached_vnfds
: Dict
[str, Any
],
3049 ) -> DeployedK8sResource
:
3050 nsd
= get_nsd(db_nsr
)
3051 vnf_profiles
= get_vnf_profiles(nsd
)
3052 vnfd_id
= find_in_list(
3054 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3056 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3057 kdu_resource_profile
= get_kdu_resource_profile(
3058 db_vnfd
, ee_relation
.kdu_resource_profile_id
3060 kdu_name
= kdu_resource_profile
["kdu-name"]
3061 deployed_kdu
, _
= get_deployed_kdu(
3062 db_nsr
.get("_admin", ()).get("deployed", ()),
3064 ee_relation
.vnf_profile_id
,
3066 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3069 def _get_deployed_component(
3071 ee_relation
: EERelation
,
3072 db_nsr
: Dict
[str, Any
],
3073 cached_vnfds
: Dict
[str, Any
],
3074 ) -> DeployedComponent
:
3075 nsr_id
= db_nsr
["_id"]
3076 deployed_component
= None
3077 ee_level
= EELevel
.get_level(ee_relation
)
3078 if ee_level
== EELevel
.NS
:
3079 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3081 deployed_component
= DeployedVCA(nsr_id
, vca
)
3082 elif ee_level
== EELevel
.VNF
:
3083 vca
= get_deployed_vca(
3087 "member-vnf-index": ee_relation
.vnf_profile_id
,
3088 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3092 deployed_component
= DeployedVCA(nsr_id
, vca
)
3093 elif ee_level
== EELevel
.VDU
:
3094 vca
= get_deployed_vca(
3097 "vdu_id": ee_relation
.vdu_profile_id
,
3098 "member-vnf-index": ee_relation
.vnf_profile_id
,
3099 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3103 deployed_component
= DeployedVCA(nsr_id
, vca
)
3104 elif ee_level
== EELevel
.KDU
:
3105 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3106 ee_relation
, db_nsr
, cached_vnfds
3108 if kdu_resource_data
:
3109 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3110 return deployed_component
3112 async def _add_relation(
3116 db_nsr
: Dict
[str, Any
],
3117 cached_vnfds
: Dict
[str, Any
],
3118 cached_vnfrs
: Dict
[str, Any
],
3120 deployed_provider
= self
._get
_deployed
_component
(
3121 relation
.provider
, db_nsr
, cached_vnfds
3123 deployed_requirer
= self
._get
_deployed
_component
(
3124 relation
.requirer
, db_nsr
, cached_vnfds
3128 and deployed_requirer
3129 and deployed_provider
.config_sw_installed
3130 and deployed_requirer
.config_sw_installed
3132 provider_db_vnfr
= (
3134 relation
.provider
.nsr_id
,
3135 relation
.provider
.vnf_profile_id
,
3138 if relation
.provider
.vnf_profile_id
3141 requirer_db_vnfr
= (
3143 relation
.requirer
.nsr_id
,
3144 relation
.requirer
.vnf_profile_id
,
3147 if relation
.requirer
.vnf_profile_id
3150 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3151 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3152 provider_relation_endpoint
= RelationEndpoint(
3153 deployed_provider
.ee_id
,
3155 relation
.provider
.endpoint
,
3157 requirer_relation_endpoint
= RelationEndpoint(
3158 deployed_requirer
.ee_id
,
3160 relation
.requirer
.endpoint
,
3162 await self
.vca_map
[vca_type
].add_relation(
3163 provider
=provider_relation_endpoint
,
3164 requirer
=requirer_relation_endpoint
,
3166 # remove entry from relations list
3170 async def _add_vca_relations(
3176 timeout
: int = 3600,
3180 # 1. find all relations for this VCA
3181 # 2. wait for other peers related
3185 # STEP 1: find all relations for this VCA
3188 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3189 nsd
= get_nsd(db_nsr
)
3192 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3193 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3198 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3199 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3201 # if no relations, terminate
3203 self
.logger
.debug(logging_text
+ " No relations")
3206 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3213 if now
- start
>= timeout
:
3214 self
.logger
.error(logging_text
+ " : timeout adding relations")
3217 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3218 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3220 # for each relation, find the VCA's related
3221 for relation
in relations
.copy():
3222 added
= await self
._add
_relation
(
3230 relations
.remove(relation
)
3233 self
.logger
.debug("Relations added")
3235 await asyncio
.sleep(5.0)
3239 except Exception as e
:
3240 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3243 async def _install_kdu(
3251 k8s_instance_info
: dict,
3252 k8params
: dict = None,
3258 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3261 "collection": "nsrs",
3262 "filter": {"_id": nsr_id
},
3263 "path": nsr_db_path
,
3266 if k8s_instance_info
.get("kdu-deployment-name"):
3267 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3269 kdu_instance
= self
.k8scluster_map
[
3271 ].generate_kdu_instance_name(
3272 db_dict
=db_dict_install
,
3273 kdu_model
=k8s_instance_info
["kdu-model"],
3274 kdu_name
=k8s_instance_info
["kdu-name"],
3277 # Update the nsrs table with the kdu-instance value
3281 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3284 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3285 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3286 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3287 # namespace, this first verification could be removed, and the next step would be done for any kind
3289 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3290 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3291 if k8sclustertype
in ("juju", "juju-bundle"):
3292 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3293 # that the user passed a namespace which he wants its KDU to be deployed in)
3299 "_admin.projects_write": k8s_instance_info
["namespace"],
3300 "_admin.projects_read": k8s_instance_info
["namespace"],
3306 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3311 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3313 k8s_instance_info
["namespace"] = kdu_instance
3315 await self
.k8scluster_map
[k8sclustertype
].install(
3316 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3317 kdu_model
=k8s_instance_info
["kdu-model"],
3320 db_dict
=db_dict_install
,
3322 kdu_name
=k8s_instance_info
["kdu-name"],
3323 namespace
=k8s_instance_info
["namespace"],
3324 kdu_instance
=kdu_instance
,
3328 # Obtain services to obtain management service ip
3329 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3330 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3331 kdu_instance
=kdu_instance
,
3332 namespace
=k8s_instance_info
["namespace"],
3335 # Obtain management service info (if exists)
3336 vnfr_update_dict
= {}
3337 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3339 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3344 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3347 for service
in kdud
.get("service", [])
3348 if service
.get("mgmt-service")
3350 for mgmt_service
in mgmt_services
:
3351 for service
in services
:
3352 if service
["name"].startswith(mgmt_service
["name"]):
3353 # Mgmt service found, Obtain service ip
3354 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3355 if isinstance(ip
, list) and len(ip
) == 1:
3359 "kdur.{}.ip-address".format(kdu_index
)
3362 # Check if must update also mgmt ip at the vnf
3363 service_external_cp
= mgmt_service
.get(
3364 "external-connection-point-ref"
3366 if service_external_cp
:
3368 deep_get(vnfd
, ("mgmt-interface", "cp"))
3369 == service_external_cp
3371 vnfr_update_dict
["ip-address"] = ip
3376 "external-connection-point-ref", ""
3378 == service_external_cp
,
3381 "kdur.{}.ip-address".format(kdu_index
)
3386 "Mgmt service name: {} not found".format(
3387 mgmt_service
["name"]
3391 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3392 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3394 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3397 and kdu_config
.get("initial-config-primitive")
3398 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3400 initial_config_primitive_list
= kdu_config
.get(
3401 "initial-config-primitive"
3403 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3405 for initial_config_primitive
in initial_config_primitive_list
:
3406 primitive_params_
= self
._map
_primitive
_params
(
3407 initial_config_primitive
, {}, {}
3410 await asyncio
.wait_for(
3411 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3412 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3413 kdu_instance
=kdu_instance
,
3414 primitive_name
=initial_config_primitive
["name"],
3415 params
=primitive_params_
,
3416 db_dict
=db_dict_install
,
3422 except Exception as e
:
3423 # Prepare update db with error and raise exception
3426 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3430 vnfr_data
.get("_id"),
3431 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3434 # ignore to keep original exception
3436 # reraise original error
3441 async def deploy_kdus(
3448 task_instantiation_info
,
3450 # Launch kdus if present in the descriptor
3452 k8scluster_id_2_uuic
= {
3453 "helm-chart-v3": {},
3458 async def _get_cluster_id(cluster_id
, cluster_type
):
3459 nonlocal k8scluster_id_2_uuic
3460 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3461 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3463 # check if K8scluster is creating and wait look if previous tasks in process
3464 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3465 "k8scluster", cluster_id
3468 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3469 task_name
, cluster_id
3471 self
.logger
.debug(logging_text
+ text
)
3472 await asyncio
.wait(task_dependency
, timeout
=3600)
3474 db_k8scluster
= self
.db
.get_one(
3475 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3477 if not db_k8scluster
:
3478 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3480 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3482 if cluster_type
== "helm-chart-v3":
3484 # backward compatibility for existing clusters that have not been initialized for helm v3
3485 k8s_credentials
= yaml
.safe_dump(
3486 db_k8scluster
.get("credentials")
3488 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3489 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3491 db_k8scluster_update
= {}
3492 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3493 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3494 db_k8scluster_update
[
3495 "_admin.helm-chart-v3.created"
3497 db_k8scluster_update
[
3498 "_admin.helm-chart-v3.operationalState"
3501 "k8sclusters", cluster_id
, db_k8scluster_update
3503 except Exception as e
:
3506 + "error initializing helm-v3 cluster: {}".format(str(e
))
3509 "K8s cluster '{}' has not been initialized for '{}'".format(
3510 cluster_id
, cluster_type
3515 "K8s cluster '{}' has not been initialized for '{}'".format(
3516 cluster_id
, cluster_type
3519 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3522 logging_text
+= "Deploy kdus: "
3525 db_nsr_update
= {"_admin.deployed.K8s": []}
3526 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3529 updated_cluster_list
= []
3530 updated_v3_cluster_list
= []
3532 for vnfr_data
in db_vnfrs
.values():
3533 vca_id
= self
.get_vca_id(vnfr_data
, {})
3534 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3535 # Step 0: Prepare and set parameters
3536 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3537 vnfd_id
= vnfr_data
.get("vnfd-id")
3538 vnfd_with_id
= find_in_list(
3539 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3543 for kdud
in vnfd_with_id
["kdu"]
3544 if kdud
["name"] == kdur
["kdu-name"]
3546 namespace
= kdur
.get("k8s-namespace")
3547 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3548 if kdur
.get("helm-chart"):
3549 kdumodel
= kdur
["helm-chart"]
3550 # Default version: helm3, if helm-version is v2 assign v2
3551 k8sclustertype
= "helm-chart-v3"
3552 self
.logger
.debug("kdur: {}".format(kdur
))
3554 kdur
.get("helm-version")
3555 and kdur
.get("helm-version") == "v2"
3557 k8sclustertype
= "helm-chart"
3558 elif kdur
.get("juju-bundle"):
3559 kdumodel
= kdur
["juju-bundle"]
3560 k8sclustertype
= "juju-bundle"
3563 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3564 "juju-bundle. Maybe an old NBI version is running".format(
3565 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3568 # check if kdumodel is a file and exists
3570 vnfd_with_id
= find_in_list(
3571 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3573 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3574 if storage
: # may be not present if vnfd has not artifacts
3575 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3576 if storage
["pkg-dir"]:
3577 filename
= "{}/{}/{}s/{}".format(
3584 filename
= "{}/Scripts/{}s/{}".format(
3589 if self
.fs
.file_exists(
3590 filename
, mode
="file"
3591 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3592 kdumodel
= self
.fs
.path
+ filename
3593 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3595 except Exception: # it is not a file
3598 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3599 step
= "Synchronize repos for k8s cluster '{}'".format(
3602 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3606 k8sclustertype
== "helm-chart"
3607 and cluster_uuid
not in updated_cluster_list
3609 k8sclustertype
== "helm-chart-v3"
3610 and cluster_uuid
not in updated_v3_cluster_list
3612 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3613 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3614 cluster_uuid
=cluster_uuid
3617 if del_repo_list
or added_repo_dict
:
3618 if k8sclustertype
== "helm-chart":
3620 "_admin.helm_charts_added." + item
: None
3621 for item
in del_repo_list
3624 "_admin.helm_charts_added." + item
: name
3625 for item
, name
in added_repo_dict
.items()
3627 updated_cluster_list
.append(cluster_uuid
)
3628 elif k8sclustertype
== "helm-chart-v3":
3630 "_admin.helm_charts_v3_added." + item
: None
3631 for item
in del_repo_list
3634 "_admin.helm_charts_v3_added." + item
: name
3635 for item
, name
in added_repo_dict
.items()
3637 updated_v3_cluster_list
.append(cluster_uuid
)
3639 logging_text
+ "repos synchronized on k8s cluster "
3640 "'{}' to_delete: {}, to_add: {}".format(
3641 k8s_cluster_id
, del_repo_list
, added_repo_dict
3646 {"_id": k8s_cluster_id
},
3652 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3653 vnfr_data
["member-vnf-index-ref"],
3657 k8s_instance_info
= {
3658 "kdu-instance": None,
3659 "k8scluster-uuid": cluster_uuid
,
3660 "k8scluster-type": k8sclustertype
,
3661 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3662 "kdu-name": kdur
["kdu-name"],
3663 "kdu-model": kdumodel
,
3664 "namespace": namespace
,
3665 "kdu-deployment-name": kdu_deployment_name
,
3667 db_path
= "_admin.deployed.K8s.{}".format(index
)
3668 db_nsr_update
[db_path
] = k8s_instance_info
3669 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3670 vnfd_with_id
= find_in_list(
3671 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3673 task
= asyncio
.ensure_future(
3682 k8params
=desc_params
,
3687 self
.lcm_tasks
.register(
3691 "instantiate_KDU-{}".format(index
),
3694 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3700 except (LcmException
, asyncio
.CancelledError
):
3702 except Exception as e
:
3703 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3704 if isinstance(e
, (N2VCException
, DbException
)):
3705 self
.logger
.error(logging_text
+ msg
)
3707 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3708 raise LcmException(msg
)
3711 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3730 task_instantiation_info
,
3733 # launch instantiate_N2VC in a asyncio task and register task object
3734 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3735 # if not found, create one entry and update database
3736 # fill db_nsr._admin.deployed.VCA.<index>
3739 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3741 if "execution-environment-list" in descriptor_config
:
3742 ee_list
= descriptor_config
.get("execution-environment-list", [])
3743 elif "juju" in descriptor_config
:
3744 ee_list
= [descriptor_config
] # ns charms
3745 else: # other types as script are not supported
3748 for ee_item
in ee_list
:
3751 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3752 ee_item
.get("juju"), ee_item
.get("helm-chart")
3755 ee_descriptor_id
= ee_item
.get("id")
3756 if ee_item
.get("juju"):
3757 vca_name
= ee_item
["juju"].get("charm")
3760 if ee_item
["juju"].get("charm") is not None
3763 if ee_item
["juju"].get("cloud") == "k8s":
3764 vca_type
= "k8s_proxy_charm"
3765 elif ee_item
["juju"].get("proxy") is False:
3766 vca_type
= "native_charm"
3767 elif ee_item
.get("helm-chart"):
3768 vca_name
= ee_item
["helm-chart"]
3769 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3772 vca_type
= "helm-v3"
3775 logging_text
+ "skipping non juju neither charm configuration"
3780 for vca_index
, vca_deployed
in enumerate(
3781 db_nsr
["_admin"]["deployed"]["VCA"]
3783 if not vca_deployed
:
3786 vca_deployed
.get("member-vnf-index") == member_vnf_index
3787 and vca_deployed
.get("vdu_id") == vdu_id
3788 and vca_deployed
.get("kdu_name") == kdu_name
3789 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3790 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3794 # not found, create one.
3796 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3799 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3801 target
+= "/kdu/{}".format(kdu_name
)
3803 "target_element": target
,
3804 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3805 "member-vnf-index": member_vnf_index
,
3807 "kdu_name": kdu_name
,
3808 "vdu_count_index": vdu_index
,
3809 "operational-status": "init", # TODO revise
3810 "detailed-status": "", # TODO revise
3811 "step": "initial-deploy", # TODO revise
3813 "vdu_name": vdu_name
,
3815 "ee_descriptor_id": ee_descriptor_id
,
3819 # create VCA and configurationStatus in db
3821 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3822 "configurationStatus.{}".format(vca_index
): dict(),
3824 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3826 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3828 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3829 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3830 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3833 task_n2vc
= asyncio
.ensure_future(
3834 self
.instantiate_N2VC(
3835 logging_text
=logging_text
,
3836 vca_index
=vca_index
,
3842 vdu_index
=vdu_index
,
3843 deploy_params
=deploy_params
,
3844 config_descriptor
=descriptor_config
,
3845 base_folder
=base_folder
,
3846 nslcmop_id
=nslcmop_id
,
3850 ee_config_descriptor
=ee_item
,
3853 self
.lcm_tasks
.register(
3857 "instantiate_N2VC-{}".format(vca_index
),
3860 task_instantiation_info
[
3862 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3863 member_vnf_index
or "", vdu_id
or ""
3867 def _create_nslcmop(nsr_id
, operation
, params
):
3869 Creates a ns-lcm-opp content to be stored at database.
3870 :param nsr_id: internal id of the instance
3871 :param operation: instantiate, terminate, scale, action, ...
3872 :param params: user parameters for the operation
3873 :return: dictionary following SOL005 format
3875 # Raise exception if invalid arguments
3876 if not (nsr_id
and operation
and params
):
3878 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3885 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3886 "operationState": "PROCESSING",
3887 "statusEnteredTime": now
,
3888 "nsInstanceId": nsr_id
,
3889 "lcmOperationType": operation
,
3891 "isAutomaticInvocation": False,
3892 "operationParams": params
,
3893 "isCancelPending": False,
3895 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3896 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3901 def _format_additional_params(self
, params
):
3902 params
= params
or {}
3903 for key
, value
in params
.items():
3904 if str(value
).startswith("!!yaml "):
3905 params
[key
] = yaml
.safe_load(value
[7:])
3908 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3909 primitive
= seq
.get("name")
3910 primitive_params
= {}
3912 "member_vnf_index": vnf_index
,
3913 "primitive": primitive
,
3914 "primitive_params": primitive_params
,
3917 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3921 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3922 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3923 if op
.get("operationState") == "COMPLETED":
3924 # b. Skip sub-operation
3925 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3926 return self
.SUBOPERATION_STATUS_SKIP
3928 # c. retry executing sub-operation
3929 # The sub-operation exists, and operationState != 'COMPLETED'
3930 # Update operationState = 'PROCESSING' to indicate a retry.
3931 operationState
= "PROCESSING"
3932 detailed_status
= "In progress"
3933 self
._update
_suboperation
_status
(
3934 db_nslcmop
, op_index
, operationState
, detailed_status
3936 # Return the sub-operation index
3937 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3938 # with arguments extracted from the sub-operation
3941 # Find a sub-operation where all keys in a matching dictionary must match
3942 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3943 def _find_suboperation(self
, db_nslcmop
, match
):
3944 if db_nslcmop
and match
:
3945 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3946 for i
, op
in enumerate(op_list
):
3947 if all(op
.get(k
) == match
[k
] for k
in match
):
3949 return self
.SUBOPERATION_STATUS_NOT_FOUND
3951 # Update status for a sub-operation given its index
3952 def _update_suboperation_status(
3953 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3955 # Update DB for HA tasks
3956 q_filter
= {"_id": db_nslcmop
["_id"]}
3958 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3959 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3962 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3965 # Add sub-operation, return the index of the added sub-operation
3966 # Optionally, set operationState, detailed-status, and operationType
3967 # Status and type are currently set for 'scale' sub-operations:
3968 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3969 # 'detailed-status' : status message
3970 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3971 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3972 def _add_suboperation(
3980 mapped_primitive_params
,
3981 operationState
=None,
3982 detailed_status
=None,
3985 RO_scaling_info
=None,
3988 return self
.SUBOPERATION_STATUS_NOT_FOUND
3989 # Get the "_admin.operations" list, if it exists
3990 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3991 op_list
= db_nslcmop_admin
.get("operations")
3992 # Create or append to the "_admin.operations" list
3994 "member_vnf_index": vnf_index
,
3996 "vdu_count_index": vdu_count_index
,
3997 "primitive": primitive
,
3998 "primitive_params": mapped_primitive_params
,
4001 new_op
["operationState"] = operationState
4003 new_op
["detailed-status"] = detailed_status
4005 new_op
["lcmOperationType"] = operationType
4007 new_op
["RO_nsr_id"] = RO_nsr_id
4009 new_op
["RO_scaling_info"] = RO_scaling_info
4011 # No existing operations, create key 'operations' with current operation as first list element
4012 db_nslcmop_admin
.update({"operations": [new_op
]})
4013 op_list
= db_nslcmop_admin
.get("operations")
4015 # Existing operations, append operation to list
4016 op_list
.append(new_op
)
4018 db_nslcmop_update
= {"_admin.operations": op_list
}
4019 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4020 op_index
= len(op_list
) - 1
4023 # Helper methods for scale() sub-operations
4025 # pre-scale/post-scale:
4026 # Check for 3 different cases:
4027 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4028 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4029 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4030 def _check_or_add_scale_suboperation(
4034 vnf_config_primitive
,
4038 RO_scaling_info
=None,
4040 # Find this sub-operation
4041 if RO_nsr_id
and RO_scaling_info
:
4042 operationType
= "SCALE-RO"
4044 "member_vnf_index": vnf_index
,
4045 "RO_nsr_id": RO_nsr_id
,
4046 "RO_scaling_info": RO_scaling_info
,
4050 "member_vnf_index": vnf_index
,
4051 "primitive": vnf_config_primitive
,
4052 "primitive_params": primitive_params
,
4053 "lcmOperationType": operationType
,
4055 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4056 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4057 # a. New sub-operation
4058 # The sub-operation does not exist, add it.
4059 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4060 # The following parameters are set to None for all kind of scaling:
4062 vdu_count_index
= None
4064 if RO_nsr_id
and RO_scaling_info
:
4065 vnf_config_primitive
= None
4066 primitive_params
= None
4069 RO_scaling_info
= None
4070 # Initial status for sub-operation
4071 operationState
= "PROCESSING"
4072 detailed_status
= "In progress"
4073 # Add sub-operation for pre/post-scaling (zero or more operations)
4074 self
._add
_suboperation
(
4080 vnf_config_primitive
,
4088 return self
.SUBOPERATION_STATUS_NEW
4090 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4091 # or op_index (operationState != 'COMPLETED')
4092 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4094 # Function to return execution_environment id
4096 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4097 # TODO vdu_index_count
4098 for vca
in vca_deployed_list
:
4099 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4102 async def destroy_N2VC(
4110 exec_primitives
=True,
4115 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4116 :param logging_text:
4118 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4119 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4120 :param vca_index: index in the database _admin.deployed.VCA
4121 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4122 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4123 not executed properly
4124 :param scaling_in: True destroys the application, False destroys the model
4125 :return: None or exception
4130 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4131 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4135 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4137 # execute terminate_primitives
4139 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4140 config_descriptor
.get("terminate-config-primitive"),
4141 vca_deployed
.get("ee_descriptor_id"),
4143 vdu_id
= vca_deployed
.get("vdu_id")
4144 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4145 vdu_name
= vca_deployed
.get("vdu_name")
4146 vnf_index
= vca_deployed
.get("member-vnf-index")
4147 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4148 for seq
in terminate_primitives
:
4149 # For each sequence in list, get primitive and call _ns_execute_primitive()
4150 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4151 vnf_index
, seq
.get("name")
4153 self
.logger
.debug(logging_text
+ step
)
4154 # Create the primitive for each sequence, i.e. "primitive": "touch"
4155 primitive
= seq
.get("name")
4156 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4161 self
._add
_suboperation
(
4168 mapped_primitive_params
,
4170 # Sub-operations: Call _ns_execute_primitive() instead of action()
4172 result
, result_detail
= await self
._ns
_execute
_primitive
(
4173 vca_deployed
["ee_id"],
4175 mapped_primitive_params
,
4179 except LcmException
:
4180 # this happens when VCA is not deployed. In this case it is not needed to terminate
4182 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4183 if result
not in result_ok
:
4185 "terminate_primitive {} for vnf_member_index={} fails with "
4186 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4188 # set that this VCA do not need terminated
4189 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4193 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4196 # Delete Prometheus Jobs if any
4197 # This uses NSR_ID, so it will destroy any jobs under this index
4198 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4201 await self
.vca_map
[vca_type
].delete_execution_environment(
4202 vca_deployed
["ee_id"],
4203 scaling_in
=scaling_in
,
4208 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4209 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4210 namespace
= "." + db_nsr
["_id"]
4212 await self
.n2vc
.delete_namespace(
4213 namespace
=namespace
,
4214 total_timeout
=self
.timeout_charm_delete
,
4217 except N2VCNotFound
: # already deleted. Skip
4219 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4221 async def _terminate_RO(
4222 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4225 Terminates a deployment from RO
4226 :param logging_text:
4227 :param nsr_deployed: db_nsr._admin.deployed
4230 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4231 this method will update only the index 2, but it will write on database the concatenated content of the list
4236 ro_nsr_id
= ro_delete_action
= None
4237 if nsr_deployed
and nsr_deployed
.get("RO"):
4238 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4239 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4242 stage
[2] = "Deleting ns from VIM."
4243 db_nsr_update
["detailed-status"] = " ".join(stage
)
4244 self
._write
_op
_status
(nslcmop_id
, stage
)
4245 self
.logger
.debug(logging_text
+ stage
[2])
4246 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4247 self
._write
_op
_status
(nslcmop_id
, stage
)
4248 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4249 ro_delete_action
= desc
["action_id"]
4251 "_admin.deployed.RO.nsr_delete_action_id"
4252 ] = ro_delete_action
4253 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4254 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4255 if ro_delete_action
:
4256 # wait until NS is deleted from VIM
4257 stage
[2] = "Waiting ns deleted from VIM."
4258 detailed_status_old
= None
4262 + " RO_id={} ro_delete_action={}".format(
4263 ro_nsr_id
, ro_delete_action
4266 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4267 self
._write
_op
_status
(nslcmop_id
, stage
)
4269 delete_timeout
= 20 * 60 # 20 minutes
4270 while delete_timeout
> 0:
4271 desc
= await self
.RO
.show(
4273 item_id_name
=ro_nsr_id
,
4274 extra_item
="action",
4275 extra_item_id
=ro_delete_action
,
4279 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4281 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4282 if ns_status
== "ERROR":
4283 raise ROclient
.ROClientException(ns_status_info
)
4284 elif ns_status
== "BUILD":
4285 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4286 elif ns_status
== "ACTIVE":
4287 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4288 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4293 ), "ROclient.check_action_status returns unknown {}".format(
4296 if stage
[2] != detailed_status_old
:
4297 detailed_status_old
= stage
[2]
4298 db_nsr_update
["detailed-status"] = " ".join(stage
)
4299 self
._write
_op
_status
(nslcmop_id
, stage
)
4300 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4301 await asyncio
.sleep(5, loop
=self
.loop
)
4303 else: # delete_timeout <= 0:
4304 raise ROclient
.ROClientException(
4305 "Timeout waiting ns deleted from VIM"
4308 except Exception as e
:
4309 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4311 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4313 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4314 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4315 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4317 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4320 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4322 failed_detail
.append("delete conflict: {}".format(e
))
4325 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4328 failed_detail
.append("delete error: {}".format(e
))
4330 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4334 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4335 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4337 stage
[2] = "Deleting nsd from RO."
4338 db_nsr_update
["detailed-status"] = " ".join(stage
)
4339 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4340 self
._write
_op
_status
(nslcmop_id
, stage
)
4341 await self
.RO
.delete("nsd", ro_nsd_id
)
4343 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4345 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4346 except Exception as e
:
4348 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4350 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4352 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4355 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4357 failed_detail
.append(
4358 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4360 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4362 failed_detail
.append(
4363 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4365 self
.logger
.error(logging_text
+ failed_detail
[-1])
4367 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4368 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4369 if not vnf_deployed
or not vnf_deployed
["id"]:
4372 ro_vnfd_id
= vnf_deployed
["id"]
4375 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4376 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4378 db_nsr_update
["detailed-status"] = " ".join(stage
)
4379 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4380 self
._write
_op
_status
(nslcmop_id
, stage
)
4381 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4383 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4385 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4386 except Exception as e
:
4388 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4391 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4395 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4398 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4400 failed_detail
.append(
4401 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4403 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4405 failed_detail
.append(
4406 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4408 self
.logger
.error(logging_text
+ failed_detail
[-1])
4411 stage
[2] = "Error deleting from VIM"
4413 stage
[2] = "Deleted from VIM"
4414 db_nsr_update
["detailed-status"] = " ".join(stage
)
4415 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4416 self
._write
_op
_status
(nslcmop_id
, stage
)
4419 raise LcmException("; ".join(failed_detail
))
4421 async def terminate(self
, nsr_id
, nslcmop_id
):
4422 # Try to lock HA task here
4423 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4424 if not task_is_locked_by_me
:
4427 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4428 self
.logger
.debug(logging_text
+ "Enter")
4429 timeout_ns_terminate
= self
.timeout_ns_terminate
4432 operation_params
= None
4434 error_list
= [] # annotates all failed error messages
4435 db_nslcmop_update
= {}
4436 autoremove
= False # autoremove after terminated
4437 tasks_dict_info
= {}
4440 "Stage 1/3: Preparing task.",
4441 "Waiting for previous operations to terminate.",
4444 # ^ contains [stage, step, VIM-status]
4446 # wait for any previous tasks in process
4447 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4449 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4450 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4451 operation_params
= db_nslcmop
.get("operationParams") or {}
4452 if operation_params
.get("timeout_ns_terminate"):
4453 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4454 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4455 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4457 db_nsr_update
["operational-status"] = "terminating"
4458 db_nsr_update
["config-status"] = "terminating"
4459 self
._write
_ns
_status
(
4461 ns_state
="TERMINATING",
4462 current_operation
="TERMINATING",
4463 current_operation_id
=nslcmop_id
,
4464 other_update
=db_nsr_update
,
4466 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4467 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4468 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4471 stage
[1] = "Getting vnf descriptors from db."
4472 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4474 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4476 db_vnfds_from_id
= {}
4477 db_vnfds_from_member_index
= {}
4479 for vnfr
in db_vnfrs_list
:
4480 vnfd_id
= vnfr
["vnfd-id"]
4481 if vnfd_id
not in db_vnfds_from_id
:
4482 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4483 db_vnfds_from_id
[vnfd_id
] = vnfd
4484 db_vnfds_from_member_index
[
4485 vnfr
["member-vnf-index-ref"]
4486 ] = db_vnfds_from_id
[vnfd_id
]
4488 # Destroy individual execution environments when there are terminating primitives.
4489 # Rest of EE will be deleted at once
4490 # TODO - check before calling _destroy_N2VC
4491 # if not operation_params.get("skip_terminate_primitives"):#
4492 # or not vca.get("needed_terminate"):
4493 stage
[0] = "Stage 2/3 execute terminating primitives."
4494 self
.logger
.debug(logging_text
+ stage
[0])
4495 stage
[1] = "Looking execution environment that needs terminate."
4496 self
.logger
.debug(logging_text
+ stage
[1])
4498 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4499 config_descriptor
= None
4500 vca_member_vnf_index
= vca
.get("member-vnf-index")
4501 vca_id
= self
.get_vca_id(
4502 db_vnfrs_dict
.get(vca_member_vnf_index
)
4503 if vca_member_vnf_index
4507 if not vca
or not vca
.get("ee_id"):
4509 if not vca
.get("member-vnf-index"):
4511 config_descriptor
= db_nsr
.get("ns-configuration")
4512 elif vca
.get("vdu_id"):
4513 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4514 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4515 elif vca
.get("kdu_name"):
4516 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4517 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4519 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4520 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4521 vca_type
= vca
.get("type")
4522 exec_terminate_primitives
= not operation_params
.get(
4523 "skip_terminate_primitives"
4524 ) and vca
.get("needed_terminate")
4525 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4526 # pending native charms
4528 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4530 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4531 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4532 task
= asyncio
.ensure_future(
4540 exec_terminate_primitives
,
4544 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4546 # wait for pending tasks of terminate primitives
4550 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4552 error_list
= await self
._wait
_for
_tasks
(
4555 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4559 tasks_dict_info
.clear()
4561 return # raise LcmException("; ".join(error_list))
4563 # remove All execution environments at once
4564 stage
[0] = "Stage 3/3 delete all."
4566 if nsr_deployed
.get("VCA"):
4567 stage
[1] = "Deleting all execution environments."
4568 self
.logger
.debug(logging_text
+ stage
[1])
4569 vca_id
= self
.get_vca_id({}, db_nsr
)
4570 task_delete_ee
= asyncio
.ensure_future(
4572 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4573 timeout
=self
.timeout_charm_delete
,
4576 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4577 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4579 # Delete from k8scluster
4580 stage
[1] = "Deleting KDUs."
4581 self
.logger
.debug(logging_text
+ stage
[1])
4582 # print(nsr_deployed)
4583 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4584 if not kdu
or not kdu
.get("kdu-instance"):
4586 kdu_instance
= kdu
.get("kdu-instance")
4587 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4588 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4589 vca_id
= self
.get_vca_id({}, db_nsr
)
4590 task_delete_kdu_instance
= asyncio
.ensure_future(
4591 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4592 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4593 kdu_instance
=kdu_instance
,
4595 namespace
=kdu
.get("namespace"),
4601 + "Unknown k8s deployment type {}".format(
4602 kdu
.get("k8scluster-type")
4607 task_delete_kdu_instance
4608 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4611 stage
[1] = "Deleting ns from VIM."
4613 task_delete_ro
= asyncio
.ensure_future(
4614 self
._terminate
_ng
_ro
(
4615 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4619 task_delete_ro
= asyncio
.ensure_future(
4621 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4624 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4626 # rest of staff will be done at finally
4629 ROclient
.ROClientException
,
4634 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4636 except asyncio
.CancelledError
:
4638 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4640 exc
= "Operation was cancelled"
4641 except Exception as e
:
4642 exc
= traceback
.format_exc()
4643 self
.logger
.critical(
4644 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4649 error_list
.append(str(exc
))
4651 # wait for pending tasks
4653 stage
[1] = "Waiting for terminate pending tasks."
4654 self
.logger
.debug(logging_text
+ stage
[1])
4655 error_list
+= await self
._wait
_for
_tasks
(
4658 timeout_ns_terminate
,
4662 stage
[1] = stage
[2] = ""
4663 except asyncio
.CancelledError
:
4664 error_list
.append("Cancelled")
4665 # TODO cancell all tasks
4666 except Exception as exc
:
4667 error_list
.append(str(exc
))
4668 # update status at database
4670 error_detail
= "; ".join(error_list
)
4671 # self.logger.error(logging_text + error_detail)
4672 error_description_nslcmop
= "{} Detail: {}".format(
4673 stage
[0], error_detail
4675 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4676 nslcmop_id
, stage
[0]
4679 db_nsr_update
["operational-status"] = "failed"
4680 db_nsr_update
["detailed-status"] = (
4681 error_description_nsr
+ " Detail: " + error_detail
4683 db_nslcmop_update
["detailed-status"] = error_detail
4684 nslcmop_operation_state
= "FAILED"
4688 error_description_nsr
= error_description_nslcmop
= None
4689 ns_state
= "NOT_INSTANTIATED"
4690 db_nsr_update
["operational-status"] = "terminated"
4691 db_nsr_update
["detailed-status"] = "Done"
4692 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4693 db_nslcmop_update
["detailed-status"] = "Done"
4694 nslcmop_operation_state
= "COMPLETED"
4697 self
._write
_ns
_status
(
4700 current_operation
="IDLE",
4701 current_operation_id
=None,
4702 error_description
=error_description_nsr
,
4703 error_detail
=error_detail
,
4704 other_update
=db_nsr_update
,
4706 self
._write
_op
_status
(
4709 error_message
=error_description_nslcmop
,
4710 operation_state
=nslcmop_operation_state
,
4711 other_update
=db_nslcmop_update
,
4713 if ns_state
== "NOT_INSTANTIATED":
4717 {"nsr-id-ref": nsr_id
},
4718 {"_admin.nsState": "NOT_INSTANTIATED"},
4720 except DbException
as e
:
4723 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4727 if operation_params
:
4728 autoremove
= operation_params
.get("autoremove", False)
4729 if nslcmop_operation_state
:
4731 await self
.msg
.aiowrite(
4736 "nslcmop_id": nslcmop_id
,
4737 "operationState": nslcmop_operation_state
,
4738 "autoremove": autoremove
,
4742 except Exception as e
:
4744 logging_text
+ "kafka_write notification Exception {}".format(e
)
4747 self
.logger
.debug(logging_text
+ "Exit")
4748 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4750 async def _wait_for_tasks(
4751 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4754 error_detail_list
= []
4756 pending_tasks
= list(created_tasks_info
.keys())
4757 num_tasks
= len(pending_tasks
)
4759 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4760 self
._write
_op
_status
(nslcmop_id
, stage
)
4761 while pending_tasks
:
4763 _timeout
= timeout
+ time_start
- time()
4764 done
, pending_tasks
= await asyncio
.wait(
4765 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4767 num_done
+= len(done
)
4768 if not done
: # Timeout
4769 for task
in pending_tasks
:
4770 new_error
= created_tasks_info
[task
] + ": Timeout"
4771 error_detail_list
.append(new_error
)
4772 error_list
.append(new_error
)
4775 if task
.cancelled():
4778 exc
= task
.exception()
4780 if isinstance(exc
, asyncio
.TimeoutError
):
4782 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4783 error_list
.append(created_tasks_info
[task
])
4784 error_detail_list
.append(new_error
)
4791 ROclient
.ROClientException
,
4797 self
.logger
.error(logging_text
+ new_error
)
4799 exc_traceback
= "".join(
4800 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4804 + created_tasks_info
[task
]
4810 logging_text
+ created_tasks_info
[task
] + ": Done"
4812 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4814 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4815 if nsr_id
: # update also nsr
4820 "errorDescription": "Error at: " + ", ".join(error_list
),
4821 "errorDetail": ". ".join(error_detail_list
),
4824 self
._write
_op
_status
(nslcmop_id
, stage
)
4825 return error_detail_list
4828 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4830 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4831 The default-value is used. If it is between < > it look for a value at instantiation_params
4832 :param primitive_desc: portion of VNFD/NSD that describes primitive
4833 :param params: Params provided by user
4834 :param instantiation_params: Instantiation params provided by user
4835 :return: a dictionary with the calculated params
4837 calculated_params
= {}
4838 for parameter
in primitive_desc
.get("parameter", ()):
4839 param_name
= parameter
["name"]
4840 if param_name
in params
:
4841 calculated_params
[param_name
] = params
[param_name
]
4842 elif "default-value" in parameter
or "value" in parameter
:
4843 if "value" in parameter
:
4844 calculated_params
[param_name
] = parameter
["value"]
4846 calculated_params
[param_name
] = parameter
["default-value"]
4848 isinstance(calculated_params
[param_name
], str)
4849 and calculated_params
[param_name
].startswith("<")
4850 and calculated_params
[param_name
].endswith(">")
4852 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4853 calculated_params
[param_name
] = instantiation_params
[
4854 calculated_params
[param_name
][1:-1]
4858 "Parameter {} needed to execute primitive {} not provided".format(
4859 calculated_params
[param_name
], primitive_desc
["name"]
4864 "Parameter {} needed to execute primitive {} not provided".format(
4865 param_name
, primitive_desc
["name"]
4869 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4870 calculated_params
[param_name
] = yaml
.safe_dump(
4871 calculated_params
[param_name
], default_flow_style
=True, width
=256
4873 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4875 ].startswith("!!yaml "):
4876 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4877 if parameter
.get("data-type") == "INTEGER":
4879 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4880 except ValueError: # error converting string to int
4882 "Parameter {} of primitive {} must be integer".format(
4883 param_name
, primitive_desc
["name"]
4886 elif parameter
.get("data-type") == "BOOLEAN":
4887 calculated_params
[param_name
] = not (
4888 (str(calculated_params
[param_name
])).lower() == "false"
4891 # add always ns_config_info if primitive name is config
4892 if primitive_desc
["name"] == "config":
4893 if "ns_config_info" in instantiation_params
:
4894 calculated_params
["ns_config_info"] = instantiation_params
[
4897 return calculated_params
4899 def _look_for_deployed_vca(
4906 ee_descriptor_id
=None,
4908 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4909 for vca
in deployed_vca
:
4912 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4915 vdu_count_index
is not None
4916 and vdu_count_index
!= vca
["vdu_count_index"]
4919 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4921 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4925 # vca_deployed not found
4927 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4928 " is not deployed".format(
4937 ee_id
= vca
.get("ee_id")
4939 "type", "lxc_proxy_charm"
4940 ) # default value for backward compatibility - proxy charm
4943 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4944 "execution environment".format(
4945 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4948 return ee_id
, vca_type
4950 async def _ns_execute_primitive(
4956 retries_interval
=30,
4963 if primitive
== "config":
4964 primitive_params
= {"params": primitive_params
}
4966 vca_type
= vca_type
or "lxc_proxy_charm"
4970 output
= await asyncio
.wait_for(
4971 self
.vca_map
[vca_type
].exec_primitive(
4973 primitive_name
=primitive
,
4974 params_dict
=primitive_params
,
4975 progress_timeout
=self
.timeout_progress_primitive
,
4976 total_timeout
=self
.timeout_primitive
,
4981 timeout
=timeout
or self
.timeout_primitive
,
4985 except asyncio
.CancelledError
:
4987 except Exception as e
: # asyncio.TimeoutError
4988 if isinstance(e
, asyncio
.TimeoutError
):
4993 "Error executing action {} on {} -> {}".format(
4998 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5000 return "FAILED", str(e
)
5002 return "COMPLETED", output
5004 except (LcmException
, asyncio
.CancelledError
):
5006 except Exception as e
:
5007 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5009 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5011 Updating the vca_status with latest juju information in nsrs record
5012 :param: nsr_id: Id of the nsr
5013 :param: nslcmop_id: Id of the nslcmop
5017 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5018 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5019 vca_id
= self
.get_vca_id({}, db_nsr
)
5020 if db_nsr
["_admin"]["deployed"]["K8s"]:
5021 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5022 cluster_uuid
, kdu_instance
, cluster_type
= (
5023 k8s
["k8scluster-uuid"],
5024 k8s
["kdu-instance"],
5025 k8s
["k8scluster-type"],
5027 await self
._on
_update
_k
8s
_db
(
5028 cluster_uuid
=cluster_uuid
,
5029 kdu_instance
=kdu_instance
,
5030 filter={"_id": nsr_id
},
5032 cluster_type
=cluster_type
,
5035 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5036 table
, filter = "nsrs", {"_id": nsr_id
}
5037 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5038 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5040 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5041 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5043 async def action(self
, nsr_id
, nslcmop_id
):
5044 # Try to lock HA task here
5045 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5046 if not task_is_locked_by_me
:
5049 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5050 self
.logger
.debug(logging_text
+ "Enter")
5051 # get all needed from database
5055 db_nslcmop_update
= {}
5056 nslcmop_operation_state
= None
5057 error_description_nslcmop
= None
5060 # wait for any previous tasks in process
5061 step
= "Waiting for previous operations to terminate"
5062 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5064 self
._write
_ns
_status
(
5067 current_operation
="RUNNING ACTION",
5068 current_operation_id
=nslcmop_id
,
5071 step
= "Getting information from database"
5072 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5073 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5074 if db_nslcmop
["operationParams"].get("primitive_params"):
5075 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5076 db_nslcmop
["operationParams"]["primitive_params"]
5079 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5080 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5081 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5082 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5083 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5084 primitive
= db_nslcmop
["operationParams"]["primitive"]
5085 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5086 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5087 "timeout_ns_action", self
.timeout_primitive
5091 step
= "Getting vnfr from database"
5092 db_vnfr
= self
.db
.get_one(
5093 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5095 if db_vnfr
.get("kdur"):
5097 for kdur
in db_vnfr
["kdur"]:
5098 if kdur
.get("additionalParams"):
5099 kdur
["additionalParams"] = json
.loads(
5100 kdur
["additionalParams"]
5102 kdur_list
.append(kdur
)
5103 db_vnfr
["kdur"] = kdur_list
5104 step
= "Getting vnfd from database"
5105 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5107 # Sync filesystem before running a primitive
5108 self
.fs
.sync(db_vnfr
["vnfd-id"])
5110 step
= "Getting nsd from database"
5111 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5113 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5114 # for backward compatibility
5115 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5116 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5117 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5118 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5120 # look for primitive
5121 config_primitive_desc
= descriptor_configuration
= None
5123 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5125 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5127 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5129 descriptor_configuration
= db_nsd
.get("ns-configuration")
5131 if descriptor_configuration
and descriptor_configuration
.get(
5134 for config_primitive
in descriptor_configuration
["config-primitive"]:
5135 if config_primitive
["name"] == primitive
:
5136 config_primitive_desc
= config_primitive
5139 if not config_primitive_desc
:
5140 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5142 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5146 primitive_name
= primitive
5147 ee_descriptor_id
= None
5149 primitive_name
= config_primitive_desc
.get(
5150 "execution-environment-primitive", primitive
5152 ee_descriptor_id
= config_primitive_desc
.get(
5153 "execution-environment-ref"
5159 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5161 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5164 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5166 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5168 desc_params
= parse_yaml_strings(
5169 db_vnfr
.get("additionalParamsForVnf")
5172 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5173 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5174 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5176 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5177 actions
.add(primitive
["name"])
5178 for primitive
in kdu_configuration
.get("config-primitive", []):
5179 actions
.add(primitive
["name"])
5181 nsr_deployed
["K8s"],
5182 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5183 and kdu
["member-vnf-index"] == vnf_index
,
5187 if primitive_name
in actions
5188 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5192 # TODO check if ns is in a proper status
5194 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5196 # kdur and desc_params already set from before
5197 if primitive_params
:
5198 desc_params
.update(primitive_params
)
5199 # TODO Check if we will need something at vnf level
5200 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5202 kdu_name
== kdu
["kdu-name"]
5203 and kdu
["member-vnf-index"] == vnf_index
5208 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5211 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5212 msg
= "unknown k8scluster-type '{}'".format(
5213 kdu
.get("k8scluster-type")
5215 raise LcmException(msg
)
5218 "collection": "nsrs",
5219 "filter": {"_id": nsr_id
},
5220 "path": "_admin.deployed.K8s.{}".format(index
),
5224 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5226 step
= "Executing kdu {}".format(primitive_name
)
5227 if primitive_name
== "upgrade":
5228 if desc_params
.get("kdu_model"):
5229 kdu_model
= desc_params
.get("kdu_model")
5230 del desc_params
["kdu_model"]
5232 kdu_model
= kdu
.get("kdu-model")
5233 parts
= kdu_model
.split(sep
=":")
5235 kdu_model
= parts
[0]
5237 detailed_status
= await asyncio
.wait_for(
5238 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5239 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5240 kdu_instance
=kdu
.get("kdu-instance"),
5242 kdu_model
=kdu_model
,
5245 timeout
=timeout_ns_action
,
5247 timeout
=timeout_ns_action
+ 10,
5250 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5252 elif primitive_name
== "rollback":
5253 detailed_status
= await asyncio
.wait_for(
5254 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5255 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5256 kdu_instance
=kdu
.get("kdu-instance"),
5259 timeout
=timeout_ns_action
,
5261 elif primitive_name
== "status":
5262 detailed_status
= await asyncio
.wait_for(
5263 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5264 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5265 kdu_instance
=kdu
.get("kdu-instance"),
5268 timeout
=timeout_ns_action
,
5271 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5272 kdu
["kdu-name"], nsr_id
5274 params
= self
._map
_primitive
_params
(
5275 config_primitive_desc
, primitive_params
, desc_params
5278 detailed_status
= await asyncio
.wait_for(
5279 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5280 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5281 kdu_instance
=kdu_instance
,
5282 primitive_name
=primitive_name
,
5285 timeout
=timeout_ns_action
,
5288 timeout
=timeout_ns_action
,
5292 nslcmop_operation_state
= "COMPLETED"
5294 detailed_status
= ""
5295 nslcmop_operation_state
= "FAILED"
5297 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5298 nsr_deployed
["VCA"],
5299 member_vnf_index
=vnf_index
,
5301 vdu_count_index
=vdu_count_index
,
5302 ee_descriptor_id
=ee_descriptor_id
,
5304 for vca_index
, vca_deployed
in enumerate(
5305 db_nsr
["_admin"]["deployed"]["VCA"]
5307 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5309 "collection": "nsrs",
5310 "filter": {"_id": nsr_id
},
5311 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5315 nslcmop_operation_state
,
5317 ) = await self
._ns
_execute
_primitive
(
5319 primitive
=primitive_name
,
5320 primitive_params
=self
._map
_primitive
_params
(
5321 config_primitive_desc
, primitive_params
, desc_params
5323 timeout
=timeout_ns_action
,
5329 db_nslcmop_update
["detailed-status"] = detailed_status
5330 error_description_nslcmop
= (
5331 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5335 + " task Done with result {} {}".format(
5336 nslcmop_operation_state
, detailed_status
5339 return # database update is called inside finally
5341 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5342 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5344 except asyncio
.CancelledError
:
5346 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5348 exc
= "Operation was cancelled"
5349 except asyncio
.TimeoutError
:
5350 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5352 except Exception as e
:
5353 exc
= traceback
.format_exc()
5354 self
.logger
.critical(
5355 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5364 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5365 nslcmop_operation_state
= "FAILED"
5367 self
._write
_ns
_status
(
5371 ], # TODO check if degraded. For the moment use previous status
5372 current_operation
="IDLE",
5373 current_operation_id
=None,
5374 # error_description=error_description_nsr,
5375 # error_detail=error_detail,
5376 other_update
=db_nsr_update
,
5379 self
._write
_op
_status
(
5382 error_message
=error_description_nslcmop
,
5383 operation_state
=nslcmop_operation_state
,
5384 other_update
=db_nslcmop_update
,
5387 if nslcmop_operation_state
:
5389 await self
.msg
.aiowrite(
5394 "nslcmop_id": nslcmop_id
,
5395 "operationState": nslcmop_operation_state
,
5399 except Exception as e
:
5401 logging_text
+ "kafka_write notification Exception {}".format(e
)
5403 self
.logger
.debug(logging_text
+ "Exit")
5404 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5405 return nslcmop_operation_state
, detailed_status
5407 async def terminate_vdus(
5408 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5410 """This method terminates VDUs
5413 db_vnfr: VNF instance record
5414 member_vnf_index: VNF index to identify the VDUs to be removed
5415 db_nsr: NS instance record
5416 update_db_nslcmops: Nslcmop update record
5418 vca_scaling_info
= []
5419 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5420 scaling_info
["scaling_direction"] = "IN"
5421 scaling_info
["vdu-delete"] = {}
5422 scaling_info
["kdu-delete"] = {}
5423 db_vdur
= db_vnfr
.get("vdur")
5424 vdur_list
= copy(db_vdur
)
5426 for index
, vdu
in enumerate(vdur_list
):
5427 vca_scaling_info
.append(
5429 "osm_vdu_id": vdu
["vdu-id-ref"],
5430 "member-vnf-index": member_vnf_index
,
5432 "vdu_index": count_index
,
5434 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5435 scaling_info
["vdu"].append(
5437 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5438 "vdu_id": vdu
["vdu-id-ref"],
5441 for interface
in vdu
["interfaces"]:
5442 scaling_info
["vdu"][index
]["interface"].append(
5444 "name": interface
["name"],
5445 "ip_address": interface
["ip-address"],
5446 "mac_address": interface
.get("mac-address"),
5448 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5449 stage
[2] = "Terminating VDUs"
5450 if scaling_info
.get("vdu-delete"):
5451 # scale_process = "RO"
5452 if self
.ro_config
.get("ng"):
5453 await self
._scale
_ng
_ro
(
5454 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5457 async def remove_vnf(
5458 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5460 """This method is to Remove VNF instances from NS.
5463 nsr_id: NS instance id
5464 nslcmop_id: nslcmop id of update
5465 vnf_instance_id: id of the VNF instance to be removed
5468 result: (str, str) COMPLETED/FAILED, details
5472 logging_text
= "Task ns={} update ".format(nsr_id
)
5473 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5474 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5475 if check_vnfr_count
> 1:
5476 stage
= ["", "", ""]
5477 step
= "Getting nslcmop from database"
5478 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5479 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5480 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5481 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5482 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5483 """ db_vnfr = self.db.get_one(
5484 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5486 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5487 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5489 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5490 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5491 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5492 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5493 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5494 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5495 return "COMPLETED", "Done"
5497 step
= "Terminate VNF Failed with"
5498 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5500 except (LcmException
, asyncio
.CancelledError
):
5502 except Exception as e
:
5503 self
.logger
.debug("Error removing VNF {}".format(e
))
5504 return "FAILED", "Error removing VNF {}".format(e
)
5506 async def _ns_redeploy_vnf(
5507 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5509 """This method updates and redeploys VNF instances
5512 nsr_id: NS instance id
5513 nslcmop_id: nslcmop id
5514 db_vnfd: VNF descriptor
5515 db_vnfr: VNF instance record
5516 db_nsr: NS instance record
5519 result: (str, str) COMPLETED/FAILED, details
5523 stage
= ["", "", ""]
5524 logging_text
= "Task ns={} update ".format(nsr_id
)
5525 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5526 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5528 # Terminate old VNF resources
5529 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5530 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5532 # old_vnfd_id = db_vnfr["vnfd-id"]
5533 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5534 new_db_vnfd
= db_vnfd
5535 # new_vnfd_ref = new_db_vnfd["id"]
5536 # new_vnfd_id = vnfd_id
5540 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5542 "name": cp
.get("id"),
5543 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5544 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5547 new_vnfr_cp
.append(vnf_cp
)
5548 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5549 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5550 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5551 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5552 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5553 updated_db_vnfr
= self
.db
.get_one(
5554 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5557 # Instantiate new VNF resources
5558 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5559 vca_scaling_info
= []
5560 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5561 scaling_info
["scaling_direction"] = "OUT"
5562 scaling_info
["vdu-create"] = {}
5563 scaling_info
["kdu-create"] = {}
5564 vdud_instantiate_list
= db_vnfd
["vdu"]
5565 for index
, vdud
in enumerate(vdud_instantiate_list
):
5566 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5570 additional_params
= (
5571 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5574 cloud_init_list
= []
5576 # TODO Information of its own ip is not available because db_vnfr is not updated.
5577 additional_params
["OSM"] = get_osm_params(
5578 updated_db_vnfr
, vdud
["id"], 1
5580 cloud_init_list
.append(
5581 self
._parse
_cloud
_init
(
5588 vca_scaling_info
.append(
5590 "osm_vdu_id": vdud
["id"],
5591 "member-vnf-index": member_vnf_index
,
5593 "vdu_index": count_index
,
5596 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5597 if self
.ro_config
.get("ng"):
5599 "New Resources to be deployed: {}".format(scaling_info
))
5600 await self
._scale
_ng
_ro
(
5601 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5603 return "COMPLETED", "Done"
5604 except (LcmException
, asyncio
.CancelledError
):
5606 except Exception as e
:
5607 self
.logger
.debug("Error updating VNF {}".format(e
))
5608 return "FAILED", "Error updating VNF {}".format(e
)
5610 async def _ns_charm_upgrade(
5616 timeout
: float = None,
5618 """This method upgrade charms in VNF instances
5621 ee_id: Execution environment id
5622 path: Local path to the charm
5624 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5625 timeout: (Float) Timeout for the ns update operation
5628 result: (str, str) COMPLETED/FAILED, details
5631 charm_type
= charm_type
or "lxc_proxy_charm"
5632 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5636 charm_type
=charm_type
,
5637 timeout
=timeout
or self
.timeout_ns_update
,
5641 return "COMPLETED", output
5643 except (LcmException
, asyncio
.CancelledError
):
5646 except Exception as e
:
5648 self
.logger
.debug("Error upgrading charm {}".format(path
))
5650 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5652 async def update(self
, nsr_id
, nslcmop_id
):
5653 """Update NS according to different update types
5655 This method performs upgrade of VNF instances then updates the revision
5656 number in VNF record
5659 nsr_id: Network service will be updated
5660 nslcmop_id: ns lcm operation id
5663 It may raise DbException, LcmException, N2VCException, K8sException
5666 # Try to lock HA task here
5667 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5668 if not task_is_locked_by_me
:
5671 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5672 self
.logger
.debug(logging_text
+ "Enter")
5674 # Set the required variables to be filled up later
5676 db_nslcmop_update
= {}
5678 nslcmop_operation_state
= None
5680 error_description_nslcmop
= ""
5682 change_type
= "updated"
5683 detailed_status
= ""
5686 # wait for any previous tasks in process
5687 step
= "Waiting for previous operations to terminate"
5688 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5689 self
._write
_ns
_status
(
5692 current_operation
="UPDATING",
5693 current_operation_id
=nslcmop_id
,
5696 step
= "Getting nslcmop from database"
5697 db_nslcmop
= self
.db
.get_one(
5698 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5700 update_type
= db_nslcmop
["operationParams"]["updateType"]
5702 step
= "Getting nsr from database"
5703 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5704 old_operational_status
= db_nsr
["operational-status"]
5705 db_nsr_update
["operational-status"] = "updating"
5706 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5707 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5709 if update_type
== "CHANGE_VNFPKG":
5711 # Get the input parameters given through update request
5712 vnf_instance_id
= db_nslcmop
["operationParams"][
5713 "changeVnfPackageData"
5714 ].get("vnfInstanceId")
5716 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5719 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5721 step
= "Getting vnfr from database"
5722 db_vnfr
= self
.db
.get_one(
5723 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5726 step
= "Getting vnfds from database"
5728 latest_vnfd
= self
.db
.get_one(
5729 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5731 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5734 current_vnf_revision
= db_vnfr
.get("revision", 1)
5735 current_vnfd
= self
.db
.get_one(
5737 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5738 fail_on_empty
=False,
5740 # Charm artifact paths will be filled up later
5742 current_charm_artifact_path
,
5743 target_charm_artifact_path
,
5744 charm_artifact_paths
,
5747 step
= "Checking if revision has changed in VNFD"
5748 if current_vnf_revision
!= latest_vnfd_revision
:
5750 change_type
= "policy_updated"
5752 # There is new revision of VNFD, update operation is required
5753 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5754 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5756 step
= "Removing the VNFD packages if they exist in the local path"
5757 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5758 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5760 step
= "Get the VNFD packages from FSMongo"
5761 self
.fs
.sync(from_path
=latest_vnfd_path
)
5762 self
.fs
.sync(from_path
=current_vnfd_path
)
5765 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5767 base_folder
= latest_vnfd
["_admin"]["storage"]
5769 for charm_index
, charm_deployed
in enumerate(
5770 get_iterable(nsr_deployed
, "VCA")
5772 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5774 # Getting charm-id and charm-type
5775 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5776 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5777 charm_type
= charm_deployed
.get("type")
5780 ee_id
= charm_deployed
.get("ee_id")
5782 step
= "Getting descriptor config"
5783 descriptor_config
= get_configuration(
5784 current_vnfd
, current_vnfd
["id"]
5787 if "execution-environment-list" in descriptor_config
:
5788 ee_list
= descriptor_config
.get(
5789 "execution-environment-list", []
5794 # There could be several charm used in the same VNF
5795 for ee_item
in ee_list
:
5796 if ee_item
.get("juju"):
5798 step
= "Getting charm name"
5799 charm_name
= ee_item
["juju"].get("charm")
5801 step
= "Setting Charm artifact paths"
5802 current_charm_artifact_path
.append(
5803 get_charm_artifact_path(
5807 current_vnf_revision
,
5810 target_charm_artifact_path
.append(
5811 get_charm_artifact_path(
5815 latest_vnfd_revision
,
5819 charm_artifact_paths
= zip(
5820 current_charm_artifact_path
, target_charm_artifact_path
5823 step
= "Checking if software version has changed in VNFD"
5824 if find_software_version(current_vnfd
) != find_software_version(
5828 step
= "Checking if existing VNF has charm"
5829 for current_charm_path
, target_charm_path
in list(
5830 charm_artifact_paths
5832 if current_charm_path
:
5834 "Software version change is not supported as VNF instance {} has charm.".format(
5839 # There is no change in the charm package, then redeploy the VNF
5840 # based on new descriptor
5841 step
= "Redeploying VNF"
5842 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5846 ) = await self
._ns
_redeploy
_vnf
(
5853 if result
== "FAILED":
5854 nslcmop_operation_state
= result
5855 error_description_nslcmop
= detailed_status
5856 db_nslcmop_update
["detailed-status"] = detailed_status
5859 + " step {} Done with result {} {}".format(
5860 step
, nslcmop_operation_state
, detailed_status
5865 step
= "Checking if any charm package has changed or not"
5866 for current_charm_path
, target_charm_path
in list(
5867 charm_artifact_paths
5871 and target_charm_path
5872 and self
.check_charm_hash_changed(
5873 current_charm_path
, target_charm_path
5877 step
= "Checking whether VNF uses juju bundle"
5878 if check_juju_bundle_existence(current_vnfd
):
5881 "Charm upgrade is not supported for the instance which"
5882 " uses juju-bundle: {}".format(
5883 check_juju_bundle_existence(current_vnfd
)
5887 step
= "Upgrading Charm"
5891 ) = await self
._ns
_charm
_upgrade
(
5894 charm_type
=charm_type
,
5895 path
=self
.fs
.path
+ target_charm_path
,
5896 timeout
=timeout_seconds
,
5899 if result
== "FAILED":
5900 nslcmop_operation_state
= result
5901 error_description_nslcmop
= detailed_status
5903 db_nslcmop_update
["detailed-status"] = detailed_status
5906 + " step {} Done with result {} {}".format(
5907 step
, nslcmop_operation_state
, detailed_status
5911 step
= "Updating policies"
5912 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5913 result
= "COMPLETED"
5914 detailed_status
= "Done"
5915 db_nslcmop_update
["detailed-status"] = "Done"
5917 # If nslcmop_operation_state is None, so any operation is not failed.
5918 if not nslcmop_operation_state
:
5919 nslcmop_operation_state
= "COMPLETED"
5921 # If update CHANGE_VNFPKG nslcmop_operation is successful
5922 # vnf revision need to be updated
5923 vnfr_update
["revision"] = latest_vnfd_revision
5924 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5928 + " task Done with result {} {}".format(
5929 nslcmop_operation_state
, detailed_status
5932 elif update_type
== "REMOVE_VNF":
5933 # This part is included in https://osm.etsi.org/gerrit/11876
5934 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5935 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5936 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5937 step
= "Removing VNF"
5938 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5939 if result
== "FAILED":
5940 nslcmop_operation_state
= result
5941 error_description_nslcmop
= detailed_status
5942 db_nslcmop_update
["detailed-status"] = detailed_status
5943 change_type
= "vnf_terminated"
5944 if not nslcmop_operation_state
:
5945 nslcmop_operation_state
= "COMPLETED"
5948 + " task Done with result {} {}".format(
5949 nslcmop_operation_state
, detailed_status
5953 elif update_type
== "OPERATE_VNF":
5954 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5955 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5956 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5957 (result
, detailed_status
) = await self
.rebuild_start_stop(
5958 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5960 if result
== "FAILED":
5961 nslcmop_operation_state
= result
5962 error_description_nslcmop
= detailed_status
5963 db_nslcmop_update
["detailed-status"] = detailed_status
5964 if not nslcmop_operation_state
:
5965 nslcmop_operation_state
= "COMPLETED"
5968 + " task Done with result {} {}".format(
5969 nslcmop_operation_state
, detailed_status
5973 # If nslcmop_operation_state is None, so any operation is not failed.
5974 # All operations are executed in overall.
5975 if not nslcmop_operation_state
:
5976 nslcmop_operation_state
= "COMPLETED"
5977 db_nsr_update
["operational-status"] = old_operational_status
5979 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5980 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5982 except asyncio
.CancelledError
:
5984 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5986 exc
= "Operation was cancelled"
5987 except asyncio
.TimeoutError
:
5988 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5990 except Exception as e
:
5991 exc
= traceback
.format_exc()
5992 self
.logger
.critical(
5993 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6002 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6003 nslcmop_operation_state
= "FAILED"
6004 db_nsr_update
["operational-status"] = old_operational_status
6006 self
._write
_ns
_status
(
6008 ns_state
=db_nsr
["nsState"],
6009 current_operation
="IDLE",
6010 current_operation_id
=None,
6011 other_update
=db_nsr_update
,
6014 self
._write
_op
_status
(
6017 error_message
=error_description_nslcmop
,
6018 operation_state
=nslcmop_operation_state
,
6019 other_update
=db_nslcmop_update
,
6022 if nslcmop_operation_state
:
6026 "nslcmop_id": nslcmop_id
,
6027 "operationState": nslcmop_operation_state
,
6029 if change_type
in ("vnf_terminated", "policy_updated"):
6030 msg
.update({"vnf_member_index": member_vnf_index
})
6031 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6032 except Exception as e
:
6034 logging_text
+ "kafka_write notification Exception {}".format(e
)
6036 self
.logger
.debug(logging_text
+ "Exit")
6037 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6038 return nslcmop_operation_state
, detailed_status
6040 async def scale(self
, nsr_id
, nslcmop_id
):
6041 # Try to lock HA task here
6042 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6043 if not task_is_locked_by_me
:
6046 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6047 stage
= ["", "", ""]
6048 tasks_dict_info
= {}
6049 # ^ stage, step, VIM progress
6050 self
.logger
.debug(logging_text
+ "Enter")
6051 # get all needed from database
6053 db_nslcmop_update
= {}
6056 # in case of error, indicates what part of scale was failed to put nsr at error status
6057 scale_process
= None
6058 old_operational_status
= ""
6059 old_config_status
= ""
6062 # wait for any previous tasks in process
6063 step
= "Waiting for previous operations to terminate"
6064 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6065 self
._write
_ns
_status
(
6068 current_operation
="SCALING",
6069 current_operation_id
=nslcmop_id
,
6072 step
= "Getting nslcmop from database"
6074 step
+ " after having waited for previous tasks to be completed"
6076 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6078 step
= "Getting nsr from database"
6079 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6080 old_operational_status
= db_nsr
["operational-status"]
6081 old_config_status
= db_nsr
["config-status"]
6083 step
= "Parsing scaling parameters"
6084 db_nsr_update
["operational-status"] = "scaling"
6085 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6086 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6088 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6090 ]["member-vnf-index"]
6091 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6093 ]["scaling-group-descriptor"]
6094 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6095 # for backward compatibility
6096 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6097 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6098 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6099 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6101 step
= "Getting vnfr from database"
6102 db_vnfr
= self
.db
.get_one(
6103 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6106 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6108 step
= "Getting vnfd from database"
6109 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6111 base_folder
= db_vnfd
["_admin"]["storage"]
6113 step
= "Getting scaling-group-descriptor"
6114 scaling_descriptor
= find_in_list(
6115 get_scaling_aspect(db_vnfd
),
6116 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6118 if not scaling_descriptor
:
6120 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6121 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6124 step
= "Sending scale order to VIM"
6125 # TODO check if ns is in a proper status
6127 if not db_nsr
["_admin"].get("scaling-group"):
6132 "_admin.scaling-group": [
6133 {"name": scaling_group
, "nb-scale-op": 0}
6137 admin_scale_index
= 0
6139 for admin_scale_index
, admin_scale_info
in enumerate(
6140 db_nsr
["_admin"]["scaling-group"]
6142 if admin_scale_info
["name"] == scaling_group
:
6143 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6145 else: # not found, set index one plus last element and add new entry with the name
6146 admin_scale_index
+= 1
6148 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6151 vca_scaling_info
= []
6152 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6153 if scaling_type
== "SCALE_OUT":
6154 if "aspect-delta-details" not in scaling_descriptor
:
6156 "Aspect delta details not fount in scaling descriptor {}".format(
6157 scaling_descriptor
["name"]
6160 # count if max-instance-count is reached
6161 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6163 scaling_info
["scaling_direction"] = "OUT"
6164 scaling_info
["vdu-create"] = {}
6165 scaling_info
["kdu-create"] = {}
6166 for delta
in deltas
:
6167 for vdu_delta
in delta
.get("vdu-delta", {}):
6168 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6169 # vdu_index also provides the number of instance of the targeted vdu
6170 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6171 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6175 additional_params
= (
6176 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6179 cloud_init_list
= []
6181 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6182 max_instance_count
= 10
6183 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6184 max_instance_count
= vdu_profile
.get(
6185 "max-number-of-instances", 10
6188 default_instance_num
= get_number_of_instances(
6191 instances_number
= vdu_delta
.get("number-of-instances", 1)
6192 nb_scale_op
+= instances_number
6194 new_instance_count
= nb_scale_op
+ default_instance_num
6195 # Control if new count is over max and vdu count is less than max.
6196 # Then assign new instance count
6197 if new_instance_count
> max_instance_count
> vdu_count
:
6198 instances_number
= new_instance_count
- max_instance_count
6200 instances_number
= instances_number
6202 if new_instance_count
> max_instance_count
:
6204 "reached the limit of {} (max-instance-count) "
6205 "scaling-out operations for the "
6206 "scaling-group-descriptor '{}'".format(
6207 nb_scale_op
, scaling_group
6210 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6212 # TODO Information of its own ip is not available because db_vnfr is not updated.
6213 additional_params
["OSM"] = get_osm_params(
6214 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6216 cloud_init_list
.append(
6217 self
._parse
_cloud
_init
(
6224 vca_scaling_info
.append(
6226 "osm_vdu_id": vdu_delta
["id"],
6227 "member-vnf-index": vnf_index
,
6229 "vdu_index": vdu_index
+ x
,
6232 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6233 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6234 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6235 kdu_name
= kdu_profile
["kdu-name"]
6236 resource_name
= kdu_profile
.get("resource-name", "")
6238 # Might have different kdus in the same delta
6239 # Should have list for each kdu
6240 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6241 scaling_info
["kdu-create"][kdu_name
] = []
6243 kdur
= get_kdur(db_vnfr
, kdu_name
)
6244 if kdur
.get("helm-chart"):
6245 k8s_cluster_type
= "helm-chart-v3"
6246 self
.logger
.debug("kdur: {}".format(kdur
))
6248 kdur
.get("helm-version")
6249 and kdur
.get("helm-version") == "v2"
6251 k8s_cluster_type
= "helm-chart"
6252 elif kdur
.get("juju-bundle"):
6253 k8s_cluster_type
= "juju-bundle"
6256 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6257 "juju-bundle. Maybe an old NBI version is running".format(
6258 db_vnfr
["member-vnf-index-ref"], kdu_name
6262 max_instance_count
= 10
6263 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6264 max_instance_count
= kdu_profile
.get(
6265 "max-number-of-instances", 10
6268 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6269 deployed_kdu
, _
= get_deployed_kdu(
6270 nsr_deployed
, kdu_name
, vnf_index
6272 if deployed_kdu
is None:
6274 "KDU '{}' for vnf '{}' not deployed".format(
6278 kdu_instance
= deployed_kdu
.get("kdu-instance")
6279 instance_num
= await self
.k8scluster_map
[
6285 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6286 kdu_model
=deployed_kdu
.get("kdu-model"),
6288 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6289 "number-of-instances", 1
6292 # Control if new count is over max and instance_num is less than max.
6293 # Then assign max instance number to kdu replica count
6294 if kdu_replica_count
> max_instance_count
> instance_num
:
6295 kdu_replica_count
= max_instance_count
6296 if kdu_replica_count
> max_instance_count
:
6298 "reached the limit of {} (max-instance-count) "
6299 "scaling-out operations for the "
6300 "scaling-group-descriptor '{}'".format(
6301 instance_num
, scaling_group
6305 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6306 vca_scaling_info
.append(
6308 "osm_kdu_id": kdu_name
,
6309 "member-vnf-index": vnf_index
,
6311 "kdu_index": instance_num
+ x
- 1,
6314 scaling_info
["kdu-create"][kdu_name
].append(
6316 "member-vnf-index": vnf_index
,
6318 "k8s-cluster-type": k8s_cluster_type
,
6319 "resource-name": resource_name
,
6320 "scale": kdu_replica_count
,
6323 elif scaling_type
== "SCALE_IN":
6324 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6326 scaling_info
["scaling_direction"] = "IN"
6327 scaling_info
["vdu-delete"] = {}
6328 scaling_info
["kdu-delete"] = {}
6330 for delta
in deltas
:
6331 for vdu_delta
in delta
.get("vdu-delta", {}):
6332 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6333 min_instance_count
= 0
6334 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6335 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6336 min_instance_count
= vdu_profile
["min-number-of-instances"]
6338 default_instance_num
= get_number_of_instances(
6339 db_vnfd
, vdu_delta
["id"]
6341 instance_num
= vdu_delta
.get("number-of-instances", 1)
6342 nb_scale_op
-= instance_num
6344 new_instance_count
= nb_scale_op
+ default_instance_num
6346 if new_instance_count
< min_instance_count
< vdu_count
:
6347 instances_number
= min_instance_count
- new_instance_count
6349 instances_number
= instance_num
6351 if new_instance_count
< min_instance_count
:
6353 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6354 "scaling-group-descriptor '{}'".format(
6355 nb_scale_op
, scaling_group
6358 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6359 vca_scaling_info
.append(
6361 "osm_vdu_id": vdu_delta
["id"],
6362 "member-vnf-index": vnf_index
,
6364 "vdu_index": vdu_index
- 1 - x
,
6367 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6368 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6369 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6370 kdu_name
= kdu_profile
["kdu-name"]
6371 resource_name
= kdu_profile
.get("resource-name", "")
6373 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6374 scaling_info
["kdu-delete"][kdu_name
] = []
6376 kdur
= get_kdur(db_vnfr
, kdu_name
)
6377 if kdur
.get("helm-chart"):
6378 k8s_cluster_type
= "helm-chart-v3"
6379 self
.logger
.debug("kdur: {}".format(kdur
))
6381 kdur
.get("helm-version")
6382 and kdur
.get("helm-version") == "v2"
6384 k8s_cluster_type
= "helm-chart"
6385 elif kdur
.get("juju-bundle"):
6386 k8s_cluster_type
= "juju-bundle"
6389 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6390 "juju-bundle. Maybe an old NBI version is running".format(
6391 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6395 min_instance_count
= 0
6396 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6397 min_instance_count
= kdu_profile
["min-number-of-instances"]
6399 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6400 deployed_kdu
, _
= get_deployed_kdu(
6401 nsr_deployed
, kdu_name
, vnf_index
6403 if deployed_kdu
is None:
6405 "KDU '{}' for vnf '{}' not deployed".format(
6409 kdu_instance
= deployed_kdu
.get("kdu-instance")
6410 instance_num
= await self
.k8scluster_map
[
6416 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6417 kdu_model
=deployed_kdu
.get("kdu-model"),
6419 kdu_replica_count
= instance_num
- kdu_delta
.get(
6420 "number-of-instances", 1
6423 if kdu_replica_count
< min_instance_count
< instance_num
:
6424 kdu_replica_count
= min_instance_count
6425 if kdu_replica_count
< min_instance_count
:
6427 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6428 "scaling-group-descriptor '{}'".format(
6429 instance_num
, scaling_group
6433 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6434 vca_scaling_info
.append(
6436 "osm_kdu_id": kdu_name
,
6437 "member-vnf-index": vnf_index
,
6439 "kdu_index": instance_num
- x
- 1,
6442 scaling_info
["kdu-delete"][kdu_name
].append(
6444 "member-vnf-index": vnf_index
,
6446 "k8s-cluster-type": k8s_cluster_type
,
6447 "resource-name": resource_name
,
6448 "scale": kdu_replica_count
,
6452 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6453 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6454 if scaling_info
["scaling_direction"] == "IN":
6455 for vdur
in reversed(db_vnfr
["vdur"]):
6456 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6457 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6458 scaling_info
["vdu"].append(
6460 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6461 "vdu_id": vdur
["vdu-id-ref"],
6465 for interface
in vdur
["interfaces"]:
6466 scaling_info
["vdu"][-1]["interface"].append(
6468 "name": interface
["name"],
6469 "ip_address": interface
["ip-address"],
6470 "mac_address": interface
.get("mac-address"),
6473 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6476 step
= "Executing pre-scale vnf-config-primitive"
6477 if scaling_descriptor
.get("scaling-config-action"):
6478 for scaling_config_action
in scaling_descriptor
[
6479 "scaling-config-action"
6482 scaling_config_action
.get("trigger") == "pre-scale-in"
6483 and scaling_type
== "SCALE_IN"
6485 scaling_config_action
.get("trigger") == "pre-scale-out"
6486 and scaling_type
== "SCALE_OUT"
6488 vnf_config_primitive
= scaling_config_action
[
6489 "vnf-config-primitive-name-ref"
6491 step
= db_nslcmop_update
[
6493 ] = "executing pre-scale scaling-config-action '{}'".format(
6494 vnf_config_primitive
6497 # look for primitive
6498 for config_primitive
in (
6499 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6500 ).get("config-primitive", ()):
6501 if config_primitive
["name"] == vnf_config_primitive
:
6505 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6506 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6507 "primitive".format(scaling_group
, vnf_config_primitive
)
6510 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6511 if db_vnfr
.get("additionalParamsForVnf"):
6512 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6514 scale_process
= "VCA"
6515 db_nsr_update
["config-status"] = "configuring pre-scaling"
6516 primitive_params
= self
._map
_primitive
_params
(
6517 config_primitive
, {}, vnfr_params
6520 # Pre-scale retry check: Check if this sub-operation has been executed before
6521 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6524 vnf_config_primitive
,
6528 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6529 # Skip sub-operation
6530 result
= "COMPLETED"
6531 result_detail
= "Done"
6534 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6535 vnf_config_primitive
, result
, result_detail
6539 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6540 # New sub-operation: Get index of this sub-operation
6542 len(db_nslcmop
.get("_admin", {}).get("operations"))
6547 + "vnf_config_primitive={} New sub-operation".format(
6548 vnf_config_primitive
6552 # retry: Get registered params for this existing sub-operation
6553 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6556 vnf_index
= op
.get("member_vnf_index")
6557 vnf_config_primitive
= op
.get("primitive")
6558 primitive_params
= op
.get("primitive_params")
6561 + "vnf_config_primitive={} Sub-operation retry".format(
6562 vnf_config_primitive
6565 # Execute the primitive, either with new (first-time) or registered (reintent) args
6566 ee_descriptor_id
= config_primitive
.get(
6567 "execution-environment-ref"
6569 primitive_name
= config_primitive
.get(
6570 "execution-environment-primitive", vnf_config_primitive
6572 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6573 nsr_deployed
["VCA"],
6574 member_vnf_index
=vnf_index
,
6576 vdu_count_index
=None,
6577 ee_descriptor_id
=ee_descriptor_id
,
6579 result
, result_detail
= await self
._ns
_execute
_primitive
(
6588 + "vnf_config_primitive={} Done with result {} {}".format(
6589 vnf_config_primitive
, result
, result_detail
6592 # Update operationState = COMPLETED | FAILED
6593 self
._update
_suboperation
_status
(
6594 db_nslcmop
, op_index
, result
, result_detail
6597 if result
== "FAILED":
6598 raise LcmException(result_detail
)
6599 db_nsr_update
["config-status"] = old_config_status
6600 scale_process
= None
6604 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6607 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6610 # SCALE-IN VCA - BEGIN
6611 if vca_scaling_info
:
6612 step
= db_nslcmop_update
[
6614 ] = "Deleting the execution environments"
6615 scale_process
= "VCA"
6616 for vca_info
in vca_scaling_info
:
6617 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6618 member_vnf_index
= str(vca_info
["member-vnf-index"])
6620 logging_text
+ "vdu info: {}".format(vca_info
)
6622 if vca_info
.get("osm_vdu_id"):
6623 vdu_id
= vca_info
["osm_vdu_id"]
6624 vdu_index
= int(vca_info
["vdu_index"])
6627 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6628 member_vnf_index
, vdu_id
, vdu_index
6630 stage
[2] = step
= "Scaling in VCA"
6631 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6632 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6633 config_update
= db_nsr
["configurationStatus"]
6634 for vca_index
, vca
in enumerate(vca_update
):
6636 (vca
or vca
.get("ee_id"))
6637 and vca
["member-vnf-index"] == member_vnf_index
6638 and vca
["vdu_count_index"] == vdu_index
6640 if vca
.get("vdu_id"):
6641 config_descriptor
= get_configuration(
6642 db_vnfd
, vca
.get("vdu_id")
6644 elif vca
.get("kdu_name"):
6645 config_descriptor
= get_configuration(
6646 db_vnfd
, vca
.get("kdu_name")
6649 config_descriptor
= get_configuration(
6650 db_vnfd
, db_vnfd
["id"]
6652 operation_params
= (
6653 db_nslcmop
.get("operationParams") or {}
6655 exec_terminate_primitives
= not operation_params
.get(
6656 "skip_terminate_primitives"
6657 ) and vca
.get("needed_terminate")
6658 task
= asyncio
.ensure_future(
6667 exec_primitives
=exec_terminate_primitives
,
6671 timeout
=self
.timeout_charm_delete
,
6674 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6677 del vca_update
[vca_index
]
6678 del config_update
[vca_index
]
6679 # wait for pending tasks of terminate primitives
6683 + "Waiting for tasks {}".format(
6684 list(tasks_dict_info
.keys())
6687 error_list
= await self
._wait
_for
_tasks
(
6691 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6696 tasks_dict_info
.clear()
6698 raise LcmException("; ".join(error_list
))
6700 db_vca_and_config_update
= {
6701 "_admin.deployed.VCA": vca_update
,
6702 "configurationStatus": config_update
,
6705 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6707 scale_process
= None
6708 # SCALE-IN VCA - END
6711 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6712 scale_process
= "RO"
6713 if self
.ro_config
.get("ng"):
6714 await self
._scale
_ng
_ro
(
6715 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6717 scaling_info
.pop("vdu-create", None)
6718 scaling_info
.pop("vdu-delete", None)
6720 scale_process
= None
6724 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6725 scale_process
= "KDU"
6726 await self
._scale
_kdu
(
6727 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6729 scaling_info
.pop("kdu-create", None)
6730 scaling_info
.pop("kdu-delete", None)
6732 scale_process
= None
6736 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6738 # SCALE-UP VCA - BEGIN
6739 if vca_scaling_info
:
6740 step
= db_nslcmop_update
[
6742 ] = "Creating new execution environments"
6743 scale_process
= "VCA"
6744 for vca_info
in vca_scaling_info
:
6745 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6746 member_vnf_index
= str(vca_info
["member-vnf-index"])
6748 logging_text
+ "vdu info: {}".format(vca_info
)
6750 vnfd_id
= db_vnfr
["vnfd-ref"]
6751 if vca_info
.get("osm_vdu_id"):
6752 vdu_index
= int(vca_info
["vdu_index"])
6753 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6754 if db_vnfr
.get("additionalParamsForVnf"):
6755 deploy_params
.update(
6757 db_vnfr
["additionalParamsForVnf"].copy()
6760 descriptor_config
= get_configuration(
6761 db_vnfd
, db_vnfd
["id"]
6763 if descriptor_config
:
6768 logging_text
=logging_text
6769 + "member_vnf_index={} ".format(member_vnf_index
),
6772 nslcmop_id
=nslcmop_id
,
6778 member_vnf_index
=member_vnf_index
,
6779 vdu_index
=vdu_index
,
6781 deploy_params
=deploy_params
,
6782 descriptor_config
=descriptor_config
,
6783 base_folder
=base_folder
,
6784 task_instantiation_info
=tasks_dict_info
,
6787 vdu_id
= vca_info
["osm_vdu_id"]
6788 vdur
= find_in_list(
6789 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6791 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6792 if vdur
.get("additionalParams"):
6793 deploy_params_vdu
= parse_yaml_strings(
6794 vdur
["additionalParams"]
6797 deploy_params_vdu
= deploy_params
6798 deploy_params_vdu
["OSM"] = get_osm_params(
6799 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6801 if descriptor_config
:
6806 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6807 member_vnf_index
, vdu_id
, vdu_index
6809 stage
[2] = step
= "Scaling out VCA"
6810 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6812 logging_text
=logging_text
6813 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6814 member_vnf_index
, vdu_id
, vdu_index
6818 nslcmop_id
=nslcmop_id
,
6824 member_vnf_index
=member_vnf_index
,
6825 vdu_index
=vdu_index
,
6827 deploy_params
=deploy_params_vdu
,
6828 descriptor_config
=descriptor_config
,
6829 base_folder
=base_folder
,
6830 task_instantiation_info
=tasks_dict_info
,
6833 # SCALE-UP VCA - END
6834 scale_process
= None
6837 # execute primitive service POST-SCALING
6838 step
= "Executing post-scale vnf-config-primitive"
6839 if scaling_descriptor
.get("scaling-config-action"):
6840 for scaling_config_action
in scaling_descriptor
[
6841 "scaling-config-action"
6844 scaling_config_action
.get("trigger") == "post-scale-in"
6845 and scaling_type
== "SCALE_IN"
6847 scaling_config_action
.get("trigger") == "post-scale-out"
6848 and scaling_type
== "SCALE_OUT"
6850 vnf_config_primitive
= scaling_config_action
[
6851 "vnf-config-primitive-name-ref"
6853 step
= db_nslcmop_update
[
6855 ] = "executing post-scale scaling-config-action '{}'".format(
6856 vnf_config_primitive
6859 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6860 if db_vnfr
.get("additionalParamsForVnf"):
6861 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6863 # look for primitive
6864 for config_primitive
in (
6865 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6866 ).get("config-primitive", ()):
6867 if config_primitive
["name"] == vnf_config_primitive
:
6871 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6872 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6873 "config-primitive".format(
6874 scaling_group
, vnf_config_primitive
6877 scale_process
= "VCA"
6878 db_nsr_update
["config-status"] = "configuring post-scaling"
6879 primitive_params
= self
._map
_primitive
_params
(
6880 config_primitive
, {}, vnfr_params
6883 # Post-scale retry check: Check if this sub-operation has been executed before
6884 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6887 vnf_config_primitive
,
6891 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6892 # Skip sub-operation
6893 result
= "COMPLETED"
6894 result_detail
= "Done"
6897 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6898 vnf_config_primitive
, result
, result_detail
6902 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6903 # New sub-operation: Get index of this sub-operation
6905 len(db_nslcmop
.get("_admin", {}).get("operations"))
6910 + "vnf_config_primitive={} New sub-operation".format(
6911 vnf_config_primitive
6915 # retry: Get registered params for this existing sub-operation
6916 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6919 vnf_index
= op
.get("member_vnf_index")
6920 vnf_config_primitive
= op
.get("primitive")
6921 primitive_params
= op
.get("primitive_params")
6924 + "vnf_config_primitive={} Sub-operation retry".format(
6925 vnf_config_primitive
6928 # Execute the primitive, either with new (first-time) or registered (reintent) args
6929 ee_descriptor_id
= config_primitive
.get(
6930 "execution-environment-ref"
6932 primitive_name
= config_primitive
.get(
6933 "execution-environment-primitive", vnf_config_primitive
6935 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6936 nsr_deployed
["VCA"],
6937 member_vnf_index
=vnf_index
,
6939 vdu_count_index
=None,
6940 ee_descriptor_id
=ee_descriptor_id
,
6942 result
, result_detail
= await self
._ns
_execute
_primitive
(
6951 + "vnf_config_primitive={} Done with result {} {}".format(
6952 vnf_config_primitive
, result
, result_detail
6955 # Update operationState = COMPLETED | FAILED
6956 self
._update
_suboperation
_status
(
6957 db_nslcmop
, op_index
, result
, result_detail
6960 if result
== "FAILED":
6961 raise LcmException(result_detail
)
6962 db_nsr_update
["config-status"] = old_config_status
6963 scale_process
= None
6968 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6969 db_nsr_update
["operational-status"] = (
6971 if old_operational_status
== "failed"
6972 else old_operational_status
6974 db_nsr_update
["config-status"] = old_config_status
6977 ROclient
.ROClientException
,
6982 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6984 except asyncio
.CancelledError
:
6986 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6988 exc
= "Operation was cancelled"
6989 except Exception as e
:
6990 exc
= traceback
.format_exc()
6991 self
.logger
.critical(
6992 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6996 self
._write
_ns
_status
(
6999 current_operation
="IDLE",
7000 current_operation_id
=None,
7003 stage
[1] = "Waiting for instantiate pending tasks."
7004 self
.logger
.debug(logging_text
+ stage
[1])
7005 exc
= await self
._wait
_for
_tasks
(
7008 self
.timeout_ns_deploy
,
7016 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7017 nslcmop_operation_state
= "FAILED"
7019 db_nsr_update
["operational-status"] = old_operational_status
7020 db_nsr_update
["config-status"] = old_config_status
7021 db_nsr_update
["detailed-status"] = ""
7023 if "VCA" in scale_process
:
7024 db_nsr_update
["config-status"] = "failed"
7025 if "RO" in scale_process
:
7026 db_nsr_update
["operational-status"] = "failed"
7029 ] = "FAILED scaling nslcmop={} {}: {}".format(
7030 nslcmop_id
, step
, exc
7033 error_description_nslcmop
= None
7034 nslcmop_operation_state
= "COMPLETED"
7035 db_nslcmop_update
["detailed-status"] = "Done"
7037 self
._write
_op
_status
(
7040 error_message
=error_description_nslcmop
,
7041 operation_state
=nslcmop_operation_state
,
7042 other_update
=db_nslcmop_update
,
7045 self
._write
_ns
_status
(
7048 current_operation
="IDLE",
7049 current_operation_id
=None,
7050 other_update
=db_nsr_update
,
7053 if nslcmop_operation_state
:
7057 "nslcmop_id": nslcmop_id
,
7058 "operationState": nslcmop_operation_state
,
7060 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7061 except Exception as e
:
7063 logging_text
+ "kafka_write notification Exception {}".format(e
)
7065 self
.logger
.debug(logging_text
+ "Exit")
7066 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7068 async def _scale_kdu(
7069 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7071 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7072 for kdu_name
in _scaling_info
:
7073 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7074 deployed_kdu
, index
= get_deployed_kdu(
7075 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7077 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7078 kdu_instance
= deployed_kdu
["kdu-instance"]
7079 kdu_model
= deployed_kdu
.get("kdu-model")
7080 scale
= int(kdu_scaling_info
["scale"])
7081 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7084 "collection": "nsrs",
7085 "filter": {"_id": nsr_id
},
7086 "path": "_admin.deployed.K8s.{}".format(index
),
7089 step
= "scaling application {}".format(
7090 kdu_scaling_info
["resource-name"]
7092 self
.logger
.debug(logging_text
+ step
)
7094 if kdu_scaling_info
["type"] == "delete":
7095 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7098 and kdu_config
.get("terminate-config-primitive")
7099 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7101 terminate_config_primitive_list
= kdu_config
.get(
7102 "terminate-config-primitive"
7104 terminate_config_primitive_list
.sort(
7105 key
=lambda val
: int(val
["seq"])
7109 terminate_config_primitive
7110 ) in terminate_config_primitive_list
:
7111 primitive_params_
= self
._map
_primitive
_params
(
7112 terminate_config_primitive
, {}, {}
7114 step
= "execute terminate config primitive"
7115 self
.logger
.debug(logging_text
+ step
)
7116 await asyncio
.wait_for(
7117 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7118 cluster_uuid
=cluster_uuid
,
7119 kdu_instance
=kdu_instance
,
7120 primitive_name
=terminate_config_primitive
["name"],
7121 params
=primitive_params_
,
7128 await asyncio
.wait_for(
7129 self
.k8scluster_map
[k8s_cluster_type
].scale(
7132 kdu_scaling_info
["resource-name"],
7134 cluster_uuid
=cluster_uuid
,
7135 kdu_model
=kdu_model
,
7139 timeout
=self
.timeout_vca_on_error
,
7142 if kdu_scaling_info
["type"] == "create":
7143 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7146 and kdu_config
.get("initial-config-primitive")
7147 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7149 initial_config_primitive_list
= kdu_config
.get(
7150 "initial-config-primitive"
7152 initial_config_primitive_list
.sort(
7153 key
=lambda val
: int(val
["seq"])
7156 for initial_config_primitive
in initial_config_primitive_list
:
7157 primitive_params_
= self
._map
_primitive
_params
(
7158 initial_config_primitive
, {}, {}
7160 step
= "execute initial config primitive"
7161 self
.logger
.debug(logging_text
+ step
)
7162 await asyncio
.wait_for(
7163 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7164 cluster_uuid
=cluster_uuid
,
7165 kdu_instance
=kdu_instance
,
7166 primitive_name
=initial_config_primitive
["name"],
7167 params
=primitive_params_
,
7174 async def _scale_ng_ro(
7175 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7177 nsr_id
= db_nslcmop
["nsInstanceId"]
7178 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7181 # read from db: vnfd's for every vnf
7184 # for each vnf in ns, read vnfd
7185 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7186 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7187 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7188 # if we haven't this vnfd, read it from db
7189 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7191 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7192 db_vnfds
.append(vnfd
)
7193 n2vc_key
= self
.n2vc
.get_public_key()
7194 n2vc_key_list
= [n2vc_key
]
7197 vdu_scaling_info
.get("vdu-create"),
7198 vdu_scaling_info
.get("vdu-delete"),
7201 # db_vnfr has been updated, update db_vnfrs to use it
7202 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7203 await self
._instantiate
_ng
_ro
(
7213 start_deploy
=time(),
7214 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7216 if vdu_scaling_info
.get("vdu-delete"):
7218 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7221 async def extract_prometheus_scrape_jobs(
7222 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7224 # look if exist a file called 'prometheus*.j2' and
7225 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7229 for f
in artifact_content
7230 if f
.startswith("prometheus") and f
.endswith(".j2")
7236 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7240 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7241 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7243 vnfr_id
= vnfr_id
.replace("-", "")
7245 "JOB_NAME": vnfr_id
,
7246 "TARGET_IP": target_ip
,
7247 "EXPORTER_POD_IP": host_name
,
7248 "EXPORTER_POD_PORT": host_port
,
7250 job_list
= parse_job(job_data
, variables
)
7251 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7252 for job
in job_list
:
7254 not isinstance(job
.get("job_name"), str)
7255 or vnfr_id
not in job
["job_name"]
7257 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7258 job
["nsr_id"] = nsr_id
7259 job
["vnfr_id"] = vnfr_id
7262 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7263 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7264 self
.logger
.info(logging_text
+ "Enter")
7265 stage
= ["Preparing the environment", ""]
7266 # database nsrs record
7270 # in case of error, indicates what part of scale was failed to put nsr at error status
7271 start_deploy
= time()
7273 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7274 vim_account_id
= db_vnfr
.get("vim-account-id")
7275 vim_info_key
= "vim:" + vim_account_id
7276 vdur
= find_in_list(
7277 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7280 vdu_vim_name
= vdur
["name"]
7281 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7282 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7283 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7284 # wait for any previous tasks in process
7285 stage
[1] = "Waiting for previous operations to terminate"
7286 self
.logger
.info(stage
[1])
7287 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7289 stage
[1] = "Reading from database."
7290 self
.logger
.info(stage
[1])
7291 self
._write
_ns
_status
(
7294 current_operation
=operation_type
.upper(),
7295 current_operation_id
=nslcmop_id
7297 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7300 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7301 db_nsr_update
["operational-status"] = operation_type
7302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7306 "vim_vm_id": vim_vm_id
,
7308 "vdu_index": additional_param
["count-index"],
7309 "vdu_id": vdur
["id"],
7310 "target_vim": target_vim
,
7311 "vim_account_id": vim_account_id
7314 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7315 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7316 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7317 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7318 self
.logger
.info("response from RO: {}".format(result_dict
))
7319 action_id
= result_dict
["action_id"]
7320 await self
._wait
_ng
_ro
(
7321 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7323 return "COMPLETED", "Done"
7324 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7325 self
.logger
.error("Exit Exception {}".format(e
))
7327 except asyncio
.CancelledError
:
7328 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7329 exc
= "Operation was cancelled"
7330 except Exception as e
:
7331 exc
= traceback
.format_exc()
7332 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7333 return "FAILED", "Error in operate VNF {}".format(exc
)
7335 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7337 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7339 :param: vim_account_id: VIM Account ID
7341 :return: (cloud_name, cloud_credential)
7343 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7344 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7346 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7348 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7350 :param: vim_account_id: VIM Account ID
7352 :return: (cloud_name, cloud_credential)
7354 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7355 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7357 async def migrate(self
, nsr_id
, nslcmop_id
):
7359 Migrate VNFs and VDUs instances in a NS
7361 :param: nsr_id: NS Instance ID
7362 :param: nslcmop_id: nslcmop ID of migrate
7365 # Try to lock HA task here
7366 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7367 if not task_is_locked_by_me
:
7369 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7370 self
.logger
.debug(logging_text
+ "Enter")
7371 # get all needed from database
7373 db_nslcmop_update
= {}
7374 nslcmop_operation_state
= None
7378 # in case of error, indicates what part of scale was failed to put nsr at error status
7379 start_deploy
= time()
7382 # wait for any previous tasks in process
7383 step
= "Waiting for previous operations to terminate"
7384 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7386 self
._write
_ns
_status
(
7389 current_operation
="MIGRATING",
7390 current_operation_id
=nslcmop_id
,
7392 step
= "Getting nslcmop from database"
7394 step
+ " after having waited for previous tasks to be completed"
7396 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7397 migrate_params
= db_nslcmop
.get("operationParams")
7400 target
.update(migrate_params
)
7401 desc
= await self
.RO
.migrate(nsr_id
, target
)
7402 self
.logger
.debug("RO return > {}".format(desc
))
7403 action_id
= desc
["action_id"]
7404 await self
._wait
_ng
_ro
(
7405 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7408 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7409 self
.logger
.error("Exit Exception {}".format(e
))
7411 except asyncio
.CancelledError
:
7412 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7413 exc
= "Operation was cancelled"
7414 except Exception as e
:
7415 exc
= traceback
.format_exc()
7416 self
.logger
.critical(
7417 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7420 self
._write
_ns
_status
(
7423 current_operation
="IDLE",
7424 current_operation_id
=None,
7427 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7428 nslcmop_operation_state
= "FAILED"
7430 nslcmop_operation_state
= "COMPLETED"
7431 db_nslcmop_update
["detailed-status"] = "Done"
7432 db_nsr_update
["detailed-status"] = "Done"
7434 self
._write
_op
_status
(
7438 operation_state
=nslcmop_operation_state
,
7439 other_update
=db_nslcmop_update
,
7441 if nslcmop_operation_state
:
7445 "nslcmop_id": nslcmop_id
,
7446 "operationState": nslcmop_operation_state
,
7448 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7449 except Exception as e
:
7451 logging_text
+ "kafka_write notification Exception {}".format(e
)
7453 self
.logger
.debug(logging_text
+ "Exit")
7454 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7457 async def heal(self
, nsr_id
, nslcmop_id
):
7461 :param nsr_id: ns instance to heal
7462 :param nslcmop_id: operation to run
7466 # Try to lock HA task here
7467 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7468 if not task_is_locked_by_me
:
7471 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7472 stage
= ["", "", ""]
7473 tasks_dict_info
= {}
7474 # ^ stage, step, VIM progress
7475 self
.logger
.debug(logging_text
+ "Enter")
7476 # get all needed from database
7478 db_nslcmop_update
= {}
7480 db_vnfrs
= {} # vnf's info indexed by _id
7482 old_operational_status
= ""
7483 old_config_status
= ""
7486 # wait for any previous tasks in process
7487 step
= "Waiting for previous operations to terminate"
7488 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7489 self
._write
_ns
_status
(
7492 current_operation
="HEALING",
7493 current_operation_id
=nslcmop_id
,
7496 step
= "Getting nslcmop from database"
7498 step
+ " after having waited for previous tasks to be completed"
7500 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7502 step
= "Getting nsr from database"
7503 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7504 old_operational_status
= db_nsr
["operational-status"]
7505 old_config_status
= db_nsr
["config-status"]
7508 "_admin.deployed.RO.operational-status": "healing",
7510 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7512 step
= "Sending heal order to VIM"
7513 task_ro
= asyncio
.ensure_future(
7515 logging_text
=logging_text
,
7517 db_nslcmop
=db_nslcmop
,
7521 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7522 tasks_dict_info
[task_ro
] = "Healing at VIM"
7526 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7527 self
.logger
.debug(logging_text
+ stage
[1])
7528 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7529 self
.fs
.sync(db_nsr
["nsd-id"])
7531 # read from db: vnfr's of this ns
7532 step
= "Getting vnfrs from db"
7533 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7534 for vnfr
in db_vnfrs_list
:
7535 db_vnfrs
[vnfr
["_id"]] = vnfr
7536 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7538 # Check for each target VNF
7539 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7540 for target_vnf
in target_list
:
7541 # Find this VNF in the list from DB
7542 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7544 db_vnfr
= db_vnfrs
[vnfr_id
]
7545 vnfd_id
= db_vnfr
.get("vnfd-id")
7546 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7547 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7548 base_folder
= vnfd
["_admin"]["storage"]
7553 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7554 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7556 # Check each target VDU and deploy N2VC
7557 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7558 deploy_params_vdu
= target_vdu
7559 # Set run-day1 vnf level value if not vdu level value exists
7560 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7561 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7562 vdu_name
= target_vdu
.get("vdu-id", None)
7563 # TODO: Get vdu_id from vdud.
7565 # For multi instance VDU count-index is mandatory
7566 # For single session VDU count-indes is 0
7567 vdu_index
= target_vdu
.get("count-index",0)
7569 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7570 stage
[1] = "Deploying Execution Environments."
7571 self
.logger
.debug(logging_text
+ stage
[1])
7573 # VNF Level charm. Normal case when proxy charms.
7574 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7575 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7576 if descriptor_config
:
7577 # Continue if healed machine is management machine
7578 vnf_ip_address
= db_vnfr
.get("ip-address")
7579 target_instance
= None
7580 for instance
in db_vnfr
.get("vdur", None):
7581 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7582 target_instance
= instance
7584 if vnf_ip_address
== target_instance
.get("ip-address"):
7586 logging_text
=logging_text
7587 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7588 member_vnf_index
, vdu_name
, vdu_index
7592 nslcmop_id
=nslcmop_id
,
7598 member_vnf_index
=member_vnf_index
,
7601 deploy_params
=deploy_params_vdu
,
7602 descriptor_config
=descriptor_config
,
7603 base_folder
=base_folder
,
7604 task_instantiation_info
=tasks_dict_info
,
7608 # VDU Level charm. Normal case with native charms.
7609 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7610 if descriptor_config
:
7612 logging_text
=logging_text
7613 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7614 member_vnf_index
, vdu_name
, vdu_index
7618 nslcmop_id
=nslcmop_id
,
7624 member_vnf_index
=member_vnf_index
,
7625 vdu_index
=vdu_index
,
7627 deploy_params
=deploy_params_vdu
,
7628 descriptor_config
=descriptor_config
,
7629 base_folder
=base_folder
,
7630 task_instantiation_info
=tasks_dict_info
,
7635 ROclient
.ROClientException
,
7640 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7642 except asyncio
.CancelledError
:
7644 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7646 exc
= "Operation was cancelled"
7647 except Exception as e
:
7648 exc
= traceback
.format_exc()
7649 self
.logger
.critical(
7650 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7655 stage
[1] = "Waiting for healing pending tasks."
7656 self
.logger
.debug(logging_text
+ stage
[1])
7657 exc
= await self
._wait
_for
_tasks
(
7660 self
.timeout_ns_deploy
,
7668 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7669 nslcmop_operation_state
= "FAILED"
7671 db_nsr_update
["operational-status"] = old_operational_status
7672 db_nsr_update
["config-status"] = old_config_status
7675 ] = "FAILED healing nslcmop={} {}: {}".format(
7676 nslcmop_id
, step
, exc
7678 for task
, task_name
in tasks_dict_info
.items():
7679 if not task
.done() or task
.cancelled() or task
.exception():
7680 if task_name
.startswith(self
.task_name_deploy_vca
):
7681 # A N2VC task is pending
7682 db_nsr_update
["config-status"] = "failed"
7684 # RO task is pending
7685 db_nsr_update
["operational-status"] = "failed"
7687 error_description_nslcmop
= None
7688 nslcmop_operation_state
= "COMPLETED"
7689 db_nslcmop_update
["detailed-status"] = "Done"
7690 db_nsr_update
["detailed-status"] = "Done"
7691 db_nsr_update
["operational-status"] = "running"
7692 db_nsr_update
["config-status"] = "configured"
7694 self
._write
_op
_status
(
7697 error_message
=error_description_nslcmop
,
7698 operation_state
=nslcmop_operation_state
,
7699 other_update
=db_nslcmop_update
,
7702 self
._write
_ns
_status
(
7705 current_operation
="IDLE",
7706 current_operation_id
=None,
7707 other_update
=db_nsr_update
,
7710 if nslcmop_operation_state
:
7714 "nslcmop_id": nslcmop_id
,
7715 "operationState": nslcmop_operation_state
,
7717 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7718 except Exception as e
:
7720 logging_text
+ "kafka_write notification Exception {}".format(e
)
7722 self
.logger
.debug(logging_text
+ "Exit")
7723 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7734 :param logging_text: preffix text to use at logging
7735 :param nsr_id: nsr identity
7736 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7737 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7738 :return: None or exception
7740 def get_vim_account(vim_account_id
):
7742 if vim_account_id
in db_vims
:
7743 return db_vims
[vim_account_id
]
7744 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7745 db_vims
[vim_account_id
] = db_vim
7750 ns_params
= db_nslcmop
.get("operationParams")
7751 if ns_params
and ns_params
.get("timeout_ns_heal"):
7752 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7754 timeout_ns_heal
= self
.timeout
.get(
7755 "ns_heal", self
.timeout_ns_heal
7760 nslcmop_id
= db_nslcmop
["_id"]
7762 "action_id": nslcmop_id
,
7764 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7765 target
.update(db_nslcmop
.get("operationParams", {}))
7767 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7768 desc
= await self
.RO
.recreate(nsr_id
, target
)
7769 self
.logger
.debug("RO return > {}".format(desc
))
7770 action_id
= desc
["action_id"]
7771 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7772 await self
._wait
_ng
_ro
(
7773 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7779 "_admin.deployed.RO.operational-status": "running",
7780 "detailed-status": " ".join(stage
),
7782 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7783 self
._write
_op
_status
(nslcmop_id
, stage
)
7785 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7788 except Exception as e
:
7789 stage
[2] = "ERROR healing at VIM"
7790 #self.set_vnfr_at_error(db_vnfrs, str(e))
7792 "Error healing at VIM {}".format(e
),
7793 exc_info
=not isinstance(
7796 ROclient
.ROClientException
,
7822 task_instantiation_info
,
7825 # launch instantiate_N2VC in a asyncio task and register task object
7826 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7827 # if not found, create one entry and update database
7828 # fill db_nsr._admin.deployed.VCA.<index>
7831 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7833 if "execution-environment-list" in descriptor_config
:
7834 ee_list
= descriptor_config
.get("execution-environment-list", [])
7835 elif "juju" in descriptor_config
:
7836 ee_list
= [descriptor_config
] # ns charms
7837 else: # other types as script are not supported
7840 for ee_item
in ee_list
:
7843 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7844 ee_item
.get("juju"), ee_item
.get("helm-chart")
7847 ee_descriptor_id
= ee_item
.get("id")
7848 if ee_item
.get("juju"):
7849 vca_name
= ee_item
["juju"].get("charm")
7852 if ee_item
["juju"].get("charm") is not None
7855 if ee_item
["juju"].get("cloud") == "k8s":
7856 vca_type
= "k8s_proxy_charm"
7857 elif ee_item
["juju"].get("proxy") is False:
7858 vca_type
= "native_charm"
7859 elif ee_item
.get("helm-chart"):
7860 vca_name
= ee_item
["helm-chart"]
7861 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7864 vca_type
= "helm-v3"
7867 logging_text
+ "skipping non juju neither charm configuration"
7872 for vca_index
, vca_deployed
in enumerate(
7873 db_nsr
["_admin"]["deployed"]["VCA"]
7875 if not vca_deployed
:
7878 vca_deployed
.get("member-vnf-index") == member_vnf_index
7879 and vca_deployed
.get("vdu_id") == vdu_id
7880 and vca_deployed
.get("kdu_name") == kdu_name
7881 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7882 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7886 # not found, create one.
7888 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7891 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7893 target
+= "/kdu/{}".format(kdu_name
)
7895 "target_element": target
,
7896 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7897 "member-vnf-index": member_vnf_index
,
7899 "kdu_name": kdu_name
,
7900 "vdu_count_index": vdu_index
,
7901 "operational-status": "init", # TODO revise
7902 "detailed-status": "", # TODO revise
7903 "step": "initial-deploy", # TODO revise
7905 "vdu_name": vdu_name
,
7907 "ee_descriptor_id": ee_descriptor_id
,
7911 # create VCA and configurationStatus in db
7913 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7914 "configurationStatus.{}".format(vca_index
): dict(),
7916 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7918 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7920 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7921 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7922 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7925 task_n2vc
= asyncio
.ensure_future(
7927 logging_text
=logging_text
,
7928 vca_index
=vca_index
,
7934 vdu_index
=vdu_index
,
7935 deploy_params
=deploy_params
,
7936 config_descriptor
=descriptor_config
,
7937 base_folder
=base_folder
,
7938 nslcmop_id
=nslcmop_id
,
7942 ee_config_descriptor
=ee_item
,
7945 self
.lcm_tasks
.register(
7949 "instantiate_N2VC-{}".format(vca_index
),
7952 task_instantiation_info
[
7954 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7955 member_vnf_index
or "", vdu_id
or ""
7958 async def heal_N2VC(
7975 ee_config_descriptor
,
7977 nsr_id
= db_nsr
["_id"]
7978 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7979 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7980 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7981 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7983 "collection": "nsrs",
7984 "filter": {"_id": nsr_id
},
7985 "path": db_update_entry
,
7991 element_under_configuration
= nsr_id
7995 vnfr_id
= db_vnfr
["_id"]
7996 osm_config
["osm"]["vnf_id"] = vnfr_id
7998 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8000 if vca_type
== "native_charm":
8003 index_number
= vdu_index
or 0
8006 element_type
= "VNF"
8007 element_under_configuration
= vnfr_id
8008 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8010 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8011 element_type
= "VDU"
8012 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8013 osm_config
["osm"]["vdu_id"] = vdu_id
8015 namespace
+= ".{}".format(kdu_name
)
8016 element_type
= "KDU"
8017 element_under_configuration
= kdu_name
8018 osm_config
["osm"]["kdu_name"] = kdu_name
8021 if base_folder
["pkg-dir"]:
8022 artifact_path
= "{}/{}/{}/{}".format(
8023 base_folder
["folder"],
8024 base_folder
["pkg-dir"],
8027 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8032 artifact_path
= "{}/Scripts/{}/{}/".format(
8033 base_folder
["folder"],
8036 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8041 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8043 # get initial_config_primitive_list that applies to this element
8044 initial_config_primitive_list
= config_descriptor
.get(
8045 "initial-config-primitive"
8049 "Initial config primitive list > {}".format(
8050 initial_config_primitive_list
8054 # add config if not present for NS charm
8055 ee_descriptor_id
= ee_config_descriptor
.get("id")
8056 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8057 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8058 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8062 "Initial config primitive list #2 > {}".format(
8063 initial_config_primitive_list
8066 # n2vc_redesign STEP 3.1
8067 # find old ee_id if exists
8068 ee_id
= vca_deployed
.get("ee_id")
8070 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8071 # create or register execution environment in VCA. Only for native charms when healing
8072 if vca_type
== "native_charm":
8073 step
= "Waiting to VM being up and getting IP address"
8074 self
.logger
.debug(logging_text
+ step
)
8075 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8084 credentials
= {"hostname": rw_mgmt_ip
}
8086 username
= deep_get(
8087 config_descriptor
, ("config-access", "ssh-access", "default-user")
8089 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8090 # merged. Meanwhile let's get username from initial-config-primitive
8091 if not username
and initial_config_primitive_list
:
8092 for config_primitive
in initial_config_primitive_list
:
8093 for param
in config_primitive
.get("parameter", ()):
8094 if param
["name"] == "ssh-username":
8095 username
= param
["value"]
8099 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8100 "'config-access.ssh-access.default-user'"
8102 credentials
["username"] = username
8104 # n2vc_redesign STEP 3.2
8105 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8106 self
._write
_configuration
_status
(
8108 vca_index
=vca_index
,
8109 status
="REGISTERING",
8110 element_under_configuration
=element_under_configuration
,
8111 element_type
=element_type
,
8114 step
= "register execution environment {}".format(credentials
)
8115 self
.logger
.debug(logging_text
+ step
)
8116 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8117 credentials
=credentials
,
8118 namespace
=namespace
,
8123 # update ee_id en db
8125 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8127 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8129 # for compatibility with MON/POL modules, the need model and application name at database
8130 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8131 # Not sure if this need to be done when healing
8133 ee_id_parts = ee_id.split(".")
8134 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8135 if len(ee_id_parts) >= 2:
8136 model_name = ee_id_parts[0]
8137 application_name = ee_id_parts[1]
8138 db_nsr_update[db_update_entry + "model"] = model_name
8139 db_nsr_update[db_update_entry + "application"] = application_name
8142 # n2vc_redesign STEP 3.3
8143 # Install configuration software. Only for native charms.
8144 step
= "Install configuration Software"
8146 self
._write
_configuration
_status
(
8148 vca_index
=vca_index
,
8149 status
="INSTALLING SW",
8150 element_under_configuration
=element_under_configuration
,
8151 element_type
=element_type
,
8152 #other_update=db_nsr_update,
8156 # TODO check if already done
8157 self
.logger
.debug(logging_text
+ step
)
8159 if vca_type
== "native_charm":
8160 config_primitive
= next(
8161 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8164 if config_primitive
:
8165 config
= self
._map
_primitive
_params
(
8166 config_primitive
, {}, deploy_params
8168 await self
.vca_map
[vca_type
].install_configuration_sw(
8170 artifact_path
=artifact_path
,
8178 # write in db flag of configuration_sw already installed
8180 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8183 # Not sure if this need to be done when healing
8185 # add relations for this VCA (wait for other peers related with this VCA)
8186 await self._add_vca_relations(
8187 logging_text=logging_text,
8190 vca_index=vca_index,
8194 # if SSH access is required, then get execution environment SSH public
8195 # if native charm we have waited already to VM be UP
8196 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8199 # self.logger.debug("get ssh key block")
8201 config_descriptor
, ("config-access", "ssh-access", "required")
8203 # self.logger.debug("ssh key needed")
8204 # Needed to inject a ssh key
8207 ("config-access", "ssh-access", "default-user"),
8209 step
= "Install configuration Software, getting public ssh key"
8210 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8211 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8214 step
= "Insert public key into VM user={} ssh_key={}".format(
8218 # self.logger.debug("no need to get ssh key")
8219 step
= "Waiting to VM being up and getting IP address"
8220 self
.logger
.debug(logging_text
+ step
)
8222 # n2vc_redesign STEP 5.1
8223 # wait for RO (ip-address) Insert pub_key into VM
8224 # IMPORTANT: We need do wait for RO to complete healing operation.
8225 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8228 rw_mgmt_ip
= await self
.wait_kdu_up(
8229 logging_text
, nsr_id
, vnfr_id
, kdu_name
8232 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8242 rw_mgmt_ip
= None # This is for a NS configuration
8244 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8246 # store rw_mgmt_ip in deploy params for later replacement
8247 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8250 # get run-day1 operation parameter
8251 runDay1
= deploy_params
.get("run-day1",False)
8252 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8254 # n2vc_redesign STEP 6 Execute initial config primitive
8255 step
= "execute initial config primitive"
8257 # wait for dependent primitives execution (NS -> VNF -> VDU)
8258 if initial_config_primitive_list
:
8259 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8261 # stage, in function of element type: vdu, kdu, vnf or ns
8262 my_vca
= vca_deployed_list
[vca_index
]
8263 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8265 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8266 elif my_vca
.get("member-vnf-index"):
8268 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8271 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8273 self
._write
_configuration
_status
(
8274 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8277 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8279 check_if_terminated_needed
= True
8280 for initial_config_primitive
in initial_config_primitive_list
:
8281 # adding information on the vca_deployed if it is a NS execution environment
8282 if not vca_deployed
["member-vnf-index"]:
8283 deploy_params
["ns_config_info"] = json
.dumps(
8284 self
._get
_ns
_config
_info
(nsr_id
)
8286 # TODO check if already done
8287 primitive_params_
= self
._map
_primitive
_params
(
8288 initial_config_primitive
, {}, deploy_params
8291 step
= "execute primitive '{}' params '{}'".format(
8292 initial_config_primitive
["name"], primitive_params_
8294 self
.logger
.debug(logging_text
+ step
)
8295 await self
.vca_map
[vca_type
].exec_primitive(
8297 primitive_name
=initial_config_primitive
["name"],
8298 params_dict
=primitive_params_
,
8303 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8304 if check_if_terminated_needed
:
8305 if config_descriptor
.get("terminate-config-primitive"):
8307 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8309 check_if_terminated_needed
= False
8311 # TODO register in database that primitive is done
8313 # STEP 7 Configure metrics
8314 # Not sure if this need to be done when healing
8316 if vca_type == "helm" or vca_type == "helm-v3":
8317 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8319 artifact_path=artifact_path,
8320 ee_config_descriptor=ee_config_descriptor,
8323 target_ip=rw_mgmt_ip,
8329 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8332 for job in prometheus_jobs:
8335 {"job_name": job["job_name"]},
8338 fail_on_empty=False,
8342 step
= "instantiated at VCA"
8343 self
.logger
.debug(logging_text
+ step
)
8345 self
._write
_configuration
_status
(
8346 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8349 except Exception as e
: # TODO not use Exception but N2VC exception
8350 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8352 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8355 "Exception while {} : {}".format(step
, e
), exc_info
=True
8357 self
._write
_configuration
_status
(
8358 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8360 raise LcmException("{} {}".format(step
, e
)) from e
8362 async def _wait_heal_ro(
8368 while time() <= start_time
+ timeout
:
8369 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8370 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8371 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8372 if operational_status_ro
!= "healing":
8374 await asyncio
.sleep(15, loop
=self
.loop
)
8375 else: # timeout_ns_deploy
8376 raise NgRoException("Timeout waiting ns to deploy")