1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
124 timeout_scale_on_error
= (
126 ) # Time for charm from first time at blocked,error status to mark as failed
127 timeout_scale_on_error_outer_factor
= 1.05 # Factor in relation to timeout_scale_on_error related to the timeout to be applied within the asyncio.wait_for coroutine
128 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
129 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
130 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
131 timeout_charm_delete
= 10 * 60
132 timeout_primitive
= 30 * 60 # Timeout for primitive execution
133 timeout_primitive_outer_factor
= 1.05 # Factor in relation to timeout_primitive related to the timeout to be applied within the asyncio.wait_for coroutine
134 timeout_ns_update
= 30 * 60 # timeout for ns update
135 timeout_progress_primitive
= (
137 ) # timeout for some progress in a primitive execution
138 timeout_migrate
= 1800 # default global timeout for migrating vnfs
139 timeout_operate
= 1800 # default global timeout for migrating vnfs
140 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
141 SUBOPERATION_STATUS_NOT_FOUND
= -1
142 SUBOPERATION_STATUS_NEW
= -2
143 SUBOPERATION_STATUS_SKIP
= -3
144 task_name_deploy_vca
= "Deploying VCA"
146 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
148 Init, Connect to database, filesystem storage, and messaging
149 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
152 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
154 self
.db
= Database().instance
.db
155 self
.fs
= Filesystem().instance
.fs
157 self
.lcm_tasks
= lcm_tasks
158 self
.timeout
= config
["timeout"]
159 self
.ro_config
= config
["ro_config"]
160 self
.ng_ro
= config
["ro_config"].get("ng")
161 self
.vca_config
= config
["VCA"].copy()
163 # create N2VC connector
164 self
.n2vc
= N2VCJujuConnector(
167 on_update_db
=self
._on
_update
_n
2vc
_db
,
172 self
.conn_helm_ee
= LCMHelmConn(
175 vca_config
=self
.vca_config
,
176 on_update_db
=self
._on
_update
_n
2vc
_db
,
179 self
.k8sclusterhelm2
= K8sHelmConnector(
180 kubectl_command
=self
.vca_config
.get("kubectlpath"),
181 helm_command
=self
.vca_config
.get("helmpath"),
188 self
.k8sclusterhelm3
= K8sHelm3Connector(
189 kubectl_command
=self
.vca_config
.get("kubectlpath"),
190 helm_command
=self
.vca_config
.get("helm3path"),
197 self
.k8sclusterjuju
= K8sJujuConnector(
198 kubectl_command
=self
.vca_config
.get("kubectlpath"),
199 juju_command
=self
.vca_config
.get("jujupath"),
202 on_update_db
=self
._on
_update
_k
8s
_db
,
207 self
.k8scluster_map
= {
208 "helm-chart": self
.k8sclusterhelm2
,
209 "helm-chart-v3": self
.k8sclusterhelm3
,
210 "chart": self
.k8sclusterhelm3
,
211 "juju-bundle": self
.k8sclusterjuju
,
212 "juju": self
.k8sclusterjuju
,
216 "lxc_proxy_charm": self
.n2vc
,
217 "native_charm": self
.n2vc
,
218 "k8s_proxy_charm": self
.n2vc
,
219 "helm": self
.conn_helm_ee
,
220 "helm-v3": self
.conn_helm_ee
,
224 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
226 self
.op_status_map
= {
227 "instantiation": self
.RO
.status
,
228 "termination": self
.RO
.status
,
229 "migrate": self
.RO
.status
,
230 "healing": self
.RO
.recreate_status
,
231 "verticalscale": self
.RO
.status
,
232 "start_stop_rebuild": self
.RO
.status
,
236 def increment_ip_mac(ip_mac
, vm_index
=1):
237 if not isinstance(ip_mac
, str):
240 # try with ipv4 look for last dot
241 i
= ip_mac
.rfind(".")
244 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
245 # try with ipv6 or mac look for last colon. Operate in hex
246 i
= ip_mac
.rfind(":")
249 # format in hex, len can be 2 for mac or 4 for ipv6
250 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
251 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
257 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
259 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
262 # TODO filter RO descriptor fields...
266 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
267 db_dict
["deploymentStatus"] = ro_descriptor
268 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
270 except Exception as e
:
272 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
275 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
277 # remove last dot from path (if exists)
278 if path
.endswith("."):
281 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
282 # .format(table, filter, path, updated_data))
285 nsr_id
= filter.get("_id")
287 # read ns record from database
288 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
289 current_ns_status
= nsr
.get("nsState")
291 # get vca status for NS
292 status_dict
= await self
.n2vc
.get_status(
293 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
298 db_dict
["vcaStatus"] = status_dict
300 # update configurationStatus for this VCA
302 vca_index
= int(path
[path
.rfind(".") + 1 :])
305 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
307 vca_status
= vca_list
[vca_index
].get("status")
309 configuration_status_list
= nsr
.get("configurationStatus")
310 config_status
= configuration_status_list
[vca_index
].get("status")
312 if config_status
== "BROKEN" and vca_status
!= "failed":
313 db_dict
["configurationStatus"][vca_index
] = "READY"
314 elif config_status
!= "BROKEN" and vca_status
== "failed":
315 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
316 except Exception as e
:
317 # not update configurationStatus
318 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
320 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
321 # if nsState = 'DEGRADED' check if all is OK
323 if current_ns_status
in ("READY", "DEGRADED"):
324 error_description
= ""
326 if status_dict
.get("machines"):
327 for machine_id
in status_dict
.get("machines"):
328 machine
= status_dict
.get("machines").get(machine_id
)
329 # check machine agent-status
330 if machine
.get("agent-status"):
331 s
= machine
.get("agent-status").get("status")
334 error_description
+= (
335 "machine {} agent-status={} ; ".format(
339 # check machine instance status
340 if machine
.get("instance-status"):
341 s
= machine
.get("instance-status").get("status")
344 error_description
+= (
345 "machine {} instance-status={} ; ".format(
350 if status_dict
.get("applications"):
351 for app_id
in status_dict
.get("applications"):
352 app
= status_dict
.get("applications").get(app_id
)
353 # check application status
354 if app
.get("status"):
355 s
= app
.get("status").get("status")
358 error_description
+= (
359 "application {} status={} ; ".format(app_id
, s
)
362 if error_description
:
363 db_dict
["errorDescription"] = error_description
364 if current_ns_status
== "READY" and is_degraded
:
365 db_dict
["nsState"] = "DEGRADED"
366 if current_ns_status
== "DEGRADED" and not is_degraded
:
367 db_dict
["nsState"] = "READY"
370 self
.update_db_2("nsrs", nsr_id
, db_dict
)
372 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
374 except Exception as e
:
375 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
377 async def _on_update_k8s_db(
378 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
381 Updating vca status in NSR record
382 :param cluster_uuid: UUID of a k8s cluster
383 :param kdu_instance: The unique name of the KDU instance
384 :param filter: To get nsr_id
385 :cluster_type: The cluster type (juju, k8s)
389 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
390 # .format(cluster_uuid, kdu_instance, filter))
392 nsr_id
= filter.get("_id")
394 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
395 cluster_uuid
=cluster_uuid
,
396 kdu_instance
=kdu_instance
,
398 complete_status
=True,
404 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
407 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
411 self
.update_db_2("nsrs", nsr_id
, db_dict
)
412 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
414 except Exception as e
:
415 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
418 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
421 undefined
=StrictUndefined
,
422 autoescape
=select_autoescape(default_for_string
=True, default
=True),
424 template
= env
.from_string(cloud_init_text
)
425 return template
.render(additional_params
or {})
426 except UndefinedError
as e
:
428 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
429 "file, must be provided in the instantiation parameters inside the "
430 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
432 except (TemplateError
, TemplateNotFound
) as e
:
434 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
439 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
440 cloud_init_content
= cloud_init_file
= None
442 if vdu
.get("cloud-init-file"):
443 base_folder
= vnfd
["_admin"]["storage"]
444 if base_folder
["pkg-dir"]:
445 cloud_init_file
= "{}/{}/cloud_init/{}".format(
446 base_folder
["folder"],
447 base_folder
["pkg-dir"],
448 vdu
["cloud-init-file"],
451 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
452 base_folder
["folder"],
453 vdu
["cloud-init-file"],
455 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
456 cloud_init_content
= ci_file
.read()
457 elif vdu
.get("cloud-init"):
458 cloud_init_content
= vdu
["cloud-init"]
460 return cloud_init_content
461 except FsException
as e
:
463 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
464 vnfd
["id"], vdu
["id"], cloud_init_file
, e
468 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
470 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
472 additional_params
= vdur
.get("additionalParams")
473 return parse_yaml_strings(additional_params
)
475 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
477 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
478 :param vnfd: input vnfd
479 :param new_id: overrides vnf id if provided
480 :param additionalParams: Instantiation params for VNFs provided
481 :param nsrId: Id of the NSR
482 :return: copy of vnfd
484 vnfd_RO
= deepcopy(vnfd
)
485 # remove unused by RO configuration, monitoring, scaling and internal keys
486 vnfd_RO
.pop("_id", None)
487 vnfd_RO
.pop("_admin", None)
488 vnfd_RO
.pop("monitoring-param", None)
489 vnfd_RO
.pop("scaling-group-descriptor", None)
490 vnfd_RO
.pop("kdu", None)
491 vnfd_RO
.pop("k8s-cluster", None)
493 vnfd_RO
["id"] = new_id
495 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
496 for vdu
in get_iterable(vnfd_RO
, "vdu"):
497 vdu
.pop("cloud-init-file", None)
498 vdu
.pop("cloud-init", None)
502 def ip_profile_2_RO(ip_profile
):
503 RO_ip_profile
= deepcopy(ip_profile
)
504 if "dns-server" in RO_ip_profile
:
505 if isinstance(RO_ip_profile
["dns-server"], list):
506 RO_ip_profile
["dns-address"] = []
507 for ds
in RO_ip_profile
.pop("dns-server"):
508 RO_ip_profile
["dns-address"].append(ds
["address"])
510 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
511 if RO_ip_profile
.get("ip-version") == "ipv4":
512 RO_ip_profile
["ip-version"] = "IPv4"
513 if RO_ip_profile
.get("ip-version") == "ipv6":
514 RO_ip_profile
["ip-version"] = "IPv6"
515 if "dhcp-params" in RO_ip_profile
:
516 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
519 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
520 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
521 if db_vim
["_admin"]["operationalState"] != "ENABLED":
523 "VIM={} is not available. operationalState={}".format(
524 vim_account
, db_vim
["_admin"]["operationalState"]
527 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
530 def get_ro_wim_id_for_wim_account(self
, wim_account
):
531 if isinstance(wim_account
, str):
532 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
533 if db_wim
["_admin"]["operationalState"] != "ENABLED":
535 "WIM={} is not available. operationalState={}".format(
536 wim_account
, db_wim
["_admin"]["operationalState"]
539 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
544 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
546 db_vdu_push_list
= []
548 db_update
= {"_admin.modified": time()}
550 for vdu_id
, vdu_count
in vdu_create
.items():
554 for vdur
in reversed(db_vnfr
["vdur"])
555 if vdur
["vdu-id-ref"] == vdu_id
560 # Read the template saved in the db:
562 "No vdur in the database. Using the vdur-template to scale"
564 vdur_template
= db_vnfr
.get("vdur-template")
565 if not vdur_template
:
567 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
571 vdur
= vdur_template
[0]
572 # Delete a template from the database after using it
575 {"_id": db_vnfr
["_id"]},
577 pull
={"vdur-template": {"_id": vdur
["_id"]}},
579 for count
in range(vdu_count
):
580 vdur_copy
= deepcopy(vdur
)
581 vdur_copy
["status"] = "BUILD"
582 vdur_copy
["status-detailed"] = None
583 vdur_copy
["ip-address"] = None
584 vdur_copy
["_id"] = str(uuid4())
585 vdur_copy
["count-index"] += count
+ 1
586 vdur_copy
["id"] = "{}-{}".format(
587 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
589 vdur_copy
.pop("vim_info", None)
590 for iface
in vdur_copy
["interfaces"]:
591 if iface
.get("fixed-ip"):
592 iface
["ip-address"] = self
.increment_ip_mac(
593 iface
["ip-address"], count
+ 1
596 iface
.pop("ip-address", None)
597 if iface
.get("fixed-mac"):
598 iface
["mac-address"] = self
.increment_ip_mac(
599 iface
["mac-address"], count
+ 1
602 iface
.pop("mac-address", None)
606 ) # only first vdu can be managment of vnf
607 db_vdu_push_list
.append(vdur_copy
)
608 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
610 if len(db_vnfr
["vdur"]) == 1:
611 # The scale will move to 0 instances
613 "Scaling to 0 !, creating the template with the last vdur"
615 template_vdur
= [db_vnfr
["vdur"][0]]
616 for vdu_id
, vdu_count
in vdu_delete
.items():
618 indexes_to_delete
= [
620 for iv
in enumerate(db_vnfr
["vdur"])
621 if iv
[1]["vdu-id-ref"] == vdu_id
625 "vdur.{}.status".format(i
): "DELETING"
626 for i
in indexes_to_delete
[-vdu_count
:]
630 # it must be deleted one by one because common.db does not allow otherwise
633 for v
in reversed(db_vnfr
["vdur"])
634 if v
["vdu-id-ref"] == vdu_id
636 for vdu
in vdus_to_delete
[:vdu_count
]:
639 {"_id": db_vnfr
["_id"]},
641 pull
={"vdur": {"_id": vdu
["_id"]}},
645 db_push
["vdur"] = db_vdu_push_list
647 db_push
["vdur-template"] = template_vdur
650 db_vnfr
["vdur-template"] = template_vdur
651 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
652 # modify passed dictionary db_vnfr
653 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
654 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
656 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
658 Updates database nsr with the RO info for the created vld
659 :param ns_update_nsr: dictionary to be filled with the updated info
660 :param db_nsr: content of db_nsr. This is also modified
661 :param nsr_desc_RO: nsr descriptor from RO
662 :return: Nothing, LcmException is raised on errors
665 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
666 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
667 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
669 vld
["vim-id"] = net_RO
.get("vim_net_id")
670 vld
["name"] = net_RO
.get("vim_name")
671 vld
["status"] = net_RO
.get("status")
672 vld
["status-detailed"] = net_RO
.get("error_msg")
673 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
677 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
680 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
682 for db_vnfr
in db_vnfrs
.values():
683 vnfr_update
= {"status": "ERROR"}
684 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
685 if "status" not in vdur
:
686 vdur
["status"] = "ERROR"
687 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
689 vdur
["status-detailed"] = str(error_text
)
691 "vdur.{}.status-detailed".format(vdu_index
)
693 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
694 except DbException
as e
:
695 self
.logger
.error("Cannot update vnf. {}".format(e
))
697 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
699 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
700 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
701 :param nsr_desc_RO: nsr descriptor from RO
702 :return: Nothing, LcmException is raised on errors
704 for vnf_index
, db_vnfr
in db_vnfrs
.items():
705 for vnf_RO
in nsr_desc_RO
["vnfs"]:
706 if vnf_RO
["member_vnf_index"] != vnf_index
:
709 if vnf_RO
.get("ip_address"):
710 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
713 elif not db_vnfr
.get("ip-address"):
714 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
715 raise LcmExceptionNoMgmtIP(
716 "ns member_vnf_index '{}' has no IP address".format(
721 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
722 vdur_RO_count_index
= 0
723 if vdur
.get("pdu-type"):
725 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
726 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
728 if vdur
["count-index"] != vdur_RO_count_index
:
729 vdur_RO_count_index
+= 1
731 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
732 if vdur_RO
.get("ip_address"):
733 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
735 vdur
["ip-address"] = None
736 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
737 vdur
["name"] = vdur_RO
.get("vim_name")
738 vdur
["status"] = vdur_RO
.get("status")
739 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
740 for ifacer
in get_iterable(vdur
, "interfaces"):
741 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
742 if ifacer
["name"] == interface_RO
.get("internal_name"):
743 ifacer
["ip-address"] = interface_RO
.get(
746 ifacer
["mac-address"] = interface_RO
.get(
752 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
753 "from VIM info".format(
754 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
757 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
761 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
763 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
767 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
768 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
769 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
771 vld
["vim-id"] = net_RO
.get("vim_net_id")
772 vld
["name"] = net_RO
.get("vim_name")
773 vld
["status"] = net_RO
.get("status")
774 vld
["status-detailed"] = net_RO
.get("error_msg")
775 vnfr_update
["vld.{}".format(vld_index
)] = vld
779 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
784 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
789 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
794 def _get_ns_config_info(self
, nsr_id
):
796 Generates a mapping between vnf,vdu elements and the N2VC id
797 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
798 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
799 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
800 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
802 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
803 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
805 ns_config_info
= {"osm-config-mapping": mapping
}
806 for vca
in vca_deployed_list
:
807 if not vca
["member-vnf-index"]:
809 if not vca
["vdu_id"]:
810 mapping
[vca
["member-vnf-index"]] = vca
["application"]
814 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
816 ] = vca
["application"]
817 return ns_config_info
819 async def _instantiate_ng_ro(
836 def get_vim_account(vim_account_id
):
838 if vim_account_id
in db_vims
:
839 return db_vims
[vim_account_id
]
840 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
841 db_vims
[vim_account_id
] = db_vim
844 # modify target_vld info with instantiation parameters
845 def parse_vld_instantiation_params(
846 target_vim
, target_vld
, vld_params
, target_sdn
848 if vld_params
.get("ip-profile"):
849 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
852 if vld_params
.get("provider-network"):
853 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
856 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
857 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
860 if vld_params
.get("wimAccountId"):
861 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
862 target_vld
["vim_info"][target_wim
] = {}
863 for param
in ("vim-network-name", "vim-network-id"):
864 if vld_params
.get(param
):
865 if isinstance(vld_params
[param
], dict):
866 for vim
, vim_net
in vld_params
[param
].items():
867 other_target_vim
= "vim:" + vim
869 target_vld
["vim_info"],
870 (other_target_vim
, param
.replace("-", "_")),
873 else: # isinstance str
874 target_vld
["vim_info"][target_vim
][
875 param
.replace("-", "_")
876 ] = vld_params
[param
]
877 if vld_params
.get("common_id"):
878 target_vld
["common_id"] = vld_params
.get("common_id")
880 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
881 def update_ns_vld_target(target
, ns_params
):
882 for vnf_params
in ns_params
.get("vnf", ()):
883 if vnf_params
.get("vimAccountId"):
887 for vnfr
in db_vnfrs
.values()
888 if vnf_params
["member-vnf-index"]
889 == vnfr
["member-vnf-index-ref"]
893 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
896 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
897 target_vld
= find_in_list(
898 get_iterable(vdur
, "interfaces"),
899 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
902 vld_params
= find_in_list(
903 get_iterable(ns_params
, "vld"),
904 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
908 if vnf_params
.get("vimAccountId") not in a_vld
.get(
911 target_vim_network_list
= [
912 v
for _
, v
in a_vld
.get("vim_info").items()
914 target_vim_network_name
= next(
916 item
.get("vim_network_name", "")
917 for item
in target_vim_network_list
922 target
["ns"]["vld"][a_index
].get("vim_info").update(
924 "vim:{}".format(vnf_params
["vimAccountId"]): {
925 "vim_network_name": target_vim_network_name
,
931 for param
in ("vim-network-name", "vim-network-id"):
932 if vld_params
.get(param
) and isinstance(
933 vld_params
[param
], dict
935 for vim
, vim_net
in vld_params
[
938 other_target_vim
= "vim:" + vim
940 target
["ns"]["vld"][a_index
].get(
945 param
.replace("-", "_"),
950 nslcmop_id
= db_nslcmop
["_id"]
952 "name": db_nsr
["name"],
955 "image": deepcopy(db_nsr
["image"]),
956 "flavor": deepcopy(db_nsr
["flavor"]),
957 "action_id": nslcmop_id
,
958 "cloud_init_content": {},
960 for image
in target
["image"]:
961 image
["vim_info"] = {}
962 for flavor
in target
["flavor"]:
963 flavor
["vim_info"] = {}
964 if db_nsr
.get("affinity-or-anti-affinity-group"):
965 target
["affinity-or-anti-affinity-group"] = deepcopy(
966 db_nsr
["affinity-or-anti-affinity-group"]
968 for affinity_or_anti_affinity_group
in target
[
969 "affinity-or-anti-affinity-group"
971 affinity_or_anti_affinity_group
["vim_info"] = {}
973 if db_nslcmop
.get("lcmOperationType") != "instantiate":
974 # get parameters of instantiation:
975 db_nslcmop_instantiate
= self
.db
.get_list(
978 "nsInstanceId": db_nslcmop
["nsInstanceId"],
979 "lcmOperationType": "instantiate",
982 ns_params
= db_nslcmop_instantiate
.get("operationParams")
984 ns_params
= db_nslcmop
.get("operationParams")
985 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
986 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
989 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
990 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
994 "mgmt-network": vld
.get("mgmt-network", False),
995 "type": vld
.get("type"),
998 "vim_network_name": vld
.get("vim-network-name"),
999 "vim_account_id": ns_params
["vimAccountId"],
1003 # check if this network needs SDN assist
1004 if vld
.get("pci-interfaces"):
1005 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1006 sdnc_id
= db_vim
["config"].get("sdn-controller")
1008 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1009 target_sdn
= "sdn:{}".format(sdnc_id
)
1010 target_vld
["vim_info"][target_sdn
] = {
1012 "target_vim": target_vim
,
1014 "type": vld
.get("type"),
1017 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1018 for nsd_vnf_profile
in nsd_vnf_profiles
:
1019 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1020 if cp
["virtual-link-profile-id"] == vld
["id"]:
1022 "member_vnf:{}.{}".format(
1023 cp
["constituent-cpd-id"][0][
1024 "constituent-base-element-id"
1026 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1028 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1030 # check at nsd descriptor, if there is an ip-profile
1032 nsd_vlp
= find_in_list(
1033 get_virtual_link_profiles(nsd
),
1034 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1039 and nsd_vlp
.get("virtual-link-protocol-data")
1040 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1042 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1045 ip_profile_dest_data
= {}
1046 if "ip-version" in ip_profile_source_data
:
1047 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1050 if "cidr" in ip_profile_source_data
:
1051 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1054 if "gateway-ip" in ip_profile_source_data
:
1055 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1058 if "dhcp-enabled" in ip_profile_source_data
:
1059 ip_profile_dest_data
["dhcp-params"] = {
1060 "enabled": ip_profile_source_data
["dhcp-enabled"]
1062 vld_params
["ip-profile"] = ip_profile_dest_data
1064 # update vld_params with instantiation params
1065 vld_instantiation_params
= find_in_list(
1066 get_iterable(ns_params
, "vld"),
1067 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1069 if vld_instantiation_params
:
1070 vld_params
.update(vld_instantiation_params
)
1071 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1072 target
["ns"]["vld"].append(target_vld
)
1073 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1074 update_ns_vld_target(target
, ns_params
)
1076 for vnfr
in db_vnfrs
.values():
1077 vnfd
= find_in_list(
1078 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1080 vnf_params
= find_in_list(
1081 get_iterable(ns_params
, "vnf"),
1082 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1084 target_vnf
= deepcopy(vnfr
)
1085 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1086 for vld
in target_vnf
.get("vld", ()):
1087 # check if connected to a ns.vld, to fill target'
1088 vnf_cp
= find_in_list(
1089 vnfd
.get("int-virtual-link-desc", ()),
1090 lambda cpd
: cpd
.get("id") == vld
["id"],
1093 ns_cp
= "member_vnf:{}.{}".format(
1094 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1096 if cp2target
.get(ns_cp
):
1097 vld
["target"] = cp2target
[ns_cp
]
1100 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1102 # check if this network needs SDN assist
1104 if vld
.get("pci-interfaces"):
1105 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1106 sdnc_id
= db_vim
["config"].get("sdn-controller")
1108 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1109 target_sdn
= "sdn:{}".format(sdnc_id
)
1110 vld
["vim_info"][target_sdn
] = {
1112 "target_vim": target_vim
,
1114 "type": vld
.get("type"),
1117 # check at vnfd descriptor, if there is an ip-profile
1119 vnfd_vlp
= find_in_list(
1120 get_virtual_link_profiles(vnfd
),
1121 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1125 and vnfd_vlp
.get("virtual-link-protocol-data")
1126 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1128 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1131 ip_profile_dest_data
= {}
1132 if "ip-version" in ip_profile_source_data
:
1133 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1136 if "cidr" in ip_profile_source_data
:
1137 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1140 if "gateway-ip" in ip_profile_source_data
:
1141 ip_profile_dest_data
[
1143 ] = ip_profile_source_data
["gateway-ip"]
1144 if "dhcp-enabled" in ip_profile_source_data
:
1145 ip_profile_dest_data
["dhcp-params"] = {
1146 "enabled": ip_profile_source_data
["dhcp-enabled"]
1149 vld_params
["ip-profile"] = ip_profile_dest_data
1150 # update vld_params with instantiation params
1152 vld_instantiation_params
= find_in_list(
1153 get_iterable(vnf_params
, "internal-vld"),
1154 lambda i_vld
: i_vld
["name"] == vld
["id"],
1156 if vld_instantiation_params
:
1157 vld_params
.update(vld_instantiation_params
)
1158 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1161 for vdur
in target_vnf
.get("vdur", ()):
1162 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1163 continue # This vdu must not be created
1164 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1166 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1169 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1170 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1173 and vdu_configuration
.get("config-access")
1174 and vdu_configuration
.get("config-access").get("ssh-access")
1176 vdur
["ssh-keys"] = ssh_keys_all
1177 vdur
["ssh-access-required"] = vdu_configuration
[
1179 ]["ssh-access"]["required"]
1182 and vnf_configuration
.get("config-access")
1183 and vnf_configuration
.get("config-access").get("ssh-access")
1184 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1186 vdur
["ssh-keys"] = ssh_keys_all
1187 vdur
["ssh-access-required"] = vnf_configuration
[
1189 ]["ssh-access"]["required"]
1190 elif ssh_keys_instantiation
and find_in_list(
1191 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1193 vdur
["ssh-keys"] = ssh_keys_instantiation
1195 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1197 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1199 if vdud
.get("cloud-init-file"):
1200 vdur
["cloud-init"] = "{}:file:{}".format(
1201 vnfd
["_id"], vdud
.get("cloud-init-file")
1203 # read file and put content at target.cloud_init_content, so that ng_ro does not need to use the shared package system
1204 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1205 base_folder
= vnfd
["_admin"]["storage"]
1206 if base_folder
["pkg-dir"]:
1207 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1208 base_folder
["folder"],
1209 base_folder
["pkg-dir"],
1210 vdud
.get("cloud-init-file"),
1213 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1214 base_folder
["folder"],
1215 vdud
.get("cloud-init-file"),
1217 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1218 target
["cloud_init_content"][
1221 elif vdud
.get("cloud-init"):
1222 vdur
["cloud-init"] = "{}:vdu:{}".format(
1223 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1225 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1226 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1229 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1230 deploy_params_vdu
= self
._format
_additional
_params
(
1231 vdur
.get("additionalParams") or {}
1233 deploy_params_vdu
["OSM"] = get_osm_params(
1234 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1236 vdur
["additionalParams"] = deploy_params_vdu
1239 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1240 if target_vim
not in ns_flavor
["vim_info"]:
1241 ns_flavor
["vim_info"][target_vim
] = {}
1244 # in case alternative images are provided we must check if they should be applied
1245 # for the vim_type, modify the vim_type taking into account
1246 ns_image_id
= int(vdur
["ns-image-id"])
1247 if vdur
.get("alt-image-ids"):
1248 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1249 vim_type
= db_vim
["vim_type"]
1250 for alt_image_id
in vdur
.get("alt-image-ids"):
1251 ns_alt_image
= target
["image"][int(alt_image_id
)]
1252 if vim_type
== ns_alt_image
.get("vim-type"):
1253 # must use alternative image
1255 "use alternative image id: {}".format(alt_image_id
)
1257 ns_image_id
= alt_image_id
1258 vdur
["ns-image-id"] = ns_image_id
1260 ns_image
= target
["image"][int(ns_image_id
)]
1261 if target_vim
not in ns_image
["vim_info"]:
1262 ns_image
["vim_info"][target_vim
] = {}
1265 if vdur
.get("affinity-or-anti-affinity-group-id"):
1266 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1267 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1268 if target_vim
not in ns_ags
["vim_info"]:
1269 ns_ags
["vim_info"][target_vim
] = {}
1271 vdur
["vim_info"] = {target_vim
: {}}
1272 # instantiation parameters
1274 vdu_instantiation_params
= find_in_list(
1275 get_iterable(vnf_params
, "vdu"),
1276 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1278 if vdu_instantiation_params
:
1279 # Parse the vdu_volumes from the instantiation params
1280 vdu_volumes
= get_volumes_from_instantiation_params(
1281 vdu_instantiation_params
, vdud
1283 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1284 vdur_list
.append(vdur
)
1285 target_vnf
["vdur"] = vdur_list
1286 target
["vnf"].append(target_vnf
)
1288 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1289 desc
= await self
.RO
.deploy(nsr_id
, target
)
1290 self
.logger
.debug("RO return > {}".format(desc
))
1291 action_id
= desc
["action_id"]
1292 await self
._wait
_ng
_ro
(
1299 operation
="instantiation",
1304 "_admin.deployed.RO.operational-status": "running",
1305 "detailed-status": " ".join(stage
),
1307 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1308 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1309 self
._write
_op
_status
(nslcmop_id
, stage
)
1311 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1315 async def _wait_ng_ro(
1325 detailed_status_old
= None
1327 start_time
= start_time
or time()
1328 while time() <= start_time
+ timeout
:
1329 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1330 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1331 if desc_status
["status"] == "FAILED":
1332 raise NgRoException(desc_status
["details"])
1333 elif desc_status
["status"] == "BUILD":
1335 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1336 elif desc_status
["status"] == "DONE":
1338 stage
[2] = "Deployed at VIM"
1341 assert False, "ROclient.check_ns_status returns unknown {}".format(
1342 desc_status
["status"]
1344 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1345 detailed_status_old
= stage
[2]
1346 db_nsr_update
["detailed-status"] = " ".join(stage
)
1347 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1348 self
._write
_op
_status
(nslcmop_id
, stage
)
1349 await asyncio
.sleep(15, loop
=self
.loop
)
1350 else: # timeout_ns_deploy
1351 raise NgRoException("Timeout waiting ns to deploy")
1353 async def _terminate_ng_ro(
1354 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1359 start_deploy
= time()
1366 "action_id": nslcmop_id
,
1368 desc
= await self
.RO
.deploy(nsr_id
, target
)
1369 action_id
= desc
["action_id"]
1370 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1371 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1374 + "ns terminate action at RO. action_id={}".format(action_id
)
1378 delete_timeout
= 20 * 60 # 20 minutes
1379 await self
._wait
_ng
_ro
(
1386 operation
="termination",
1389 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1390 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1392 await self
.RO
.delete(nsr_id
)
1393 except Exception as e
:
1394 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1395 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1396 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1397 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1399 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1401 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1402 failed_detail
.append("delete conflict: {}".format(e
))
1405 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1408 failed_detail
.append("delete error: {}".format(e
))
1411 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1415 stage
[2] = "Error deleting from VIM"
1417 stage
[2] = "Deleted from VIM"
1418 db_nsr_update
["detailed-status"] = " ".join(stage
)
1419 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1420 self
._write
_op
_status
(nslcmop_id
, stage
)
1423 raise LcmException("; ".join(failed_detail
))
1426 async def instantiate_RO(
1440 :param logging_text: prefix text to use at logging
1441 :param nsr_id: nsr identity
1442 :param nsd: database content of ns descriptor
1443 :param db_nsr: database content of ns record
1444 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1446 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1447 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1448 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1449 :return: None or exception
1452 start_deploy
= time()
1453 ns_params
= db_nslcmop
.get("operationParams")
1454 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1455 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1457 timeout_ns_deploy
= self
.timeout
.get(
1458 "ns_deploy", self
.timeout_ns_deploy
1461 # Check for and optionally request placement optimization. Database will be updated if placement activated
1462 stage
[2] = "Waiting for Placement."
1463 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1464 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1465 for vnfr
in db_vnfrs
.values():
1466 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1469 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1471 return await self
._instantiate
_ng
_ro
(
1484 except Exception as e
:
1485 stage
[2] = "ERROR deploying at VIM"
1486 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1488 "Error deploying at VIM {}".format(e
),
1489 exc_info
=not isinstance(
1492 ROclient
.ROClientException
,
1501 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1503 Wait for kdu to be up, get ip address
1504 :param logging_text: prefix use for logging
1508 :return: IP address, K8s services
1511 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1514 while nb_tries
< 360:
1515 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1519 for x
in get_iterable(db_vnfr
, "kdur")
1520 if x
.get("kdu-name") == kdu_name
1526 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1528 if kdur
.get("status"):
1529 if kdur
["status"] in ("READY", "ENABLED"):
1530 return kdur
.get("ip-address"), kdur
.get("services")
1533 "target KDU={} is in error state".format(kdu_name
)
1536 await asyncio
.sleep(10, loop
=self
.loop
)
1538 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1540 async def wait_vm_up_insert_key_ro(
1541 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1544 Wait for IP address at RO and, optionally, insert public key in virtual machine
1545 :param logging_text: prefix use for logging
1550 :param pub_key: public ssh key to inject, None to skip
1551 :param user: user to apply the public ssh key
1555 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1559 target_vdu_id
= None
1565 if ro_retries
>= 360: # 1 hour
1567 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1570 await asyncio
.sleep(10, loop
=self
.loop
)
1573 if not target_vdu_id
:
1574 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1576 if not vdu_id
: # for the VNF case
1577 if db_vnfr
.get("status") == "ERROR":
1579 "Cannot inject ssh-key because target VNF is in error state"
1581 ip_address
= db_vnfr
.get("ip-address")
1587 for x
in get_iterable(db_vnfr
, "vdur")
1588 if x
.get("ip-address") == ip_address
1596 for x
in get_iterable(db_vnfr
, "vdur")
1597 if x
.get("vdu-id-ref") == vdu_id
1598 and x
.get("count-index") == vdu_index
1604 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1605 ): # If only one, this should be the target vdu
1606 vdur
= db_vnfr
["vdur"][0]
1609 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1610 vnfr_id
, vdu_id
, vdu_index
1613 # New generation RO stores information at "vim_info"
1616 if vdur
.get("vim_info"):
1618 t
for t
in vdur
["vim_info"]
1619 ) # there should be only one key
1620 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1622 vdur
.get("pdu-type")
1623 or vdur
.get("status") == "ACTIVE"
1624 or ng_ro_status
== "ACTIVE"
1626 ip_address
= vdur
.get("ip-address")
1629 target_vdu_id
= vdur
["vdu-id-ref"]
1630 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1632 "Cannot inject ssh-key because target VM is in error state"
1635 if not target_vdu_id
:
1638 # inject public key into machine
1639 if pub_key
and user
:
1640 self
.logger
.debug(logging_text
+ "Inserting RO key")
1641 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1642 if vdur
.get("pdu-type"):
1643 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1646 ro_vm_id
= "{}-{}".format(
1647 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1648 ) # TODO add vdu_index
1652 "action": "inject_ssh_key",
1656 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1658 desc
= await self
.RO
.deploy(nsr_id
, target
)
1659 action_id
= desc
["action_id"]
1660 await self
._wait
_ng
_ro
(
1661 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1665 # wait until NS is deployed at RO
1667 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1668 ro_nsr_id
= deep_get(
1669 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1673 result_dict
= await self
.RO
.create_action(
1675 item_id_name
=ro_nsr_id
,
1677 "add_public_key": pub_key
,
1682 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1683 if not result_dict
or not isinstance(result_dict
, dict):
1685 "Unknown response from RO when injecting key"
1687 for result
in result_dict
.values():
1688 if result
.get("vim_result") == 200:
1691 raise ROclient
.ROClientException(
1692 "error injecting key: {}".format(
1693 result
.get("description")
1697 except NgRoException
as e
:
1699 "Reaching max tries injecting key. Error: {}".format(e
)
1701 except ROclient
.ROClientException
as e
:
1705 + "error injecting key: {}. Retrying until {} seconds".format(
1712 "Reaching max tries injecting key. Error: {}".format(e
)
1719 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1721 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1723 my_vca
= vca_deployed_list
[vca_index
]
1724 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1725 # vdu or kdu: no dependencies
1729 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1730 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1731 configuration_status_list
= db_nsr
["configurationStatus"]
1732 for index
, vca_deployed
in enumerate(configuration_status_list
):
1733 if index
== vca_index
:
1736 if not my_vca
.get("member-vnf-index") or (
1737 vca_deployed
.get("member-vnf-index")
1738 == my_vca
.get("member-vnf-index")
1740 internal_status
= configuration_status_list
[index
].get("status")
1741 if internal_status
== "READY":
1743 elif internal_status
== "BROKEN":
1745 "Configuration aborted because dependent charm/s has failed"
1750 # no dependencies, return
1752 await asyncio
.sleep(10)
1755 raise LcmException("Configuration aborted because dependent charm/s timeout")
1757 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1760 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1762 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1763 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1766 async def instantiate_N2VC(
1783 ee_config_descriptor
,
1785 nsr_id
= db_nsr
["_id"]
1786 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1787 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1788 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1789 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1791 "collection": "nsrs",
1792 "filter": {"_id": nsr_id
},
1793 "path": db_update_entry
,
1799 element_under_configuration
= nsr_id
1803 vnfr_id
= db_vnfr
["_id"]
1804 osm_config
["osm"]["vnf_id"] = vnfr_id
1806 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1808 if vca_type
== "native_charm":
1811 index_number
= vdu_index
or 0
1814 element_type
= "VNF"
1815 element_under_configuration
= vnfr_id
1816 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1818 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1819 element_type
= "VDU"
1820 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1821 osm_config
["osm"]["vdu_id"] = vdu_id
1823 namespace
+= ".{}".format(kdu_name
)
1824 element_type
= "KDU"
1825 element_under_configuration
= kdu_name
1826 osm_config
["osm"]["kdu_name"] = kdu_name
1829 if base_folder
["pkg-dir"]:
1830 artifact_path
= "{}/{}/{}/{}".format(
1831 base_folder
["folder"],
1832 base_folder
["pkg-dir"],
1835 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1840 artifact_path
= "{}/Scripts/{}/{}/".format(
1841 base_folder
["folder"],
1844 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1849 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1851 # get initial_config_primitive_list that applies to this element
1852 initial_config_primitive_list
= config_descriptor
.get(
1853 "initial-config-primitive"
1857 "Initial config primitive list > {}".format(
1858 initial_config_primitive_list
1862 # add config if not present for NS charm
1863 ee_descriptor_id
= ee_config_descriptor
.get("id")
1864 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1865 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1866 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1870 "Initial config primitive list #2 > {}".format(
1871 initial_config_primitive_list
1874 # n2vc_redesign STEP 3.1
1875 # find old ee_id if exists
1876 ee_id
= vca_deployed
.get("ee_id")
1878 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1879 # create or register execution environment in VCA
1880 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1882 self
._write
_configuration
_status
(
1884 vca_index
=vca_index
,
1886 element_under_configuration
=element_under_configuration
,
1887 element_type
=element_type
,
1890 step
= "create execution environment"
1891 self
.logger
.debug(logging_text
+ step
)
1895 if vca_type
== "k8s_proxy_charm":
1896 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1897 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1898 namespace
=namespace
,
1899 artifact_path
=artifact_path
,
1903 elif vca_type
== "helm" or vca_type
== "helm-v3":
1904 ee_id
, credentials
= await self
.vca_map
[
1906 ].create_execution_environment(
1907 namespace
=namespace
,
1911 artifact_path
=artifact_path
,
1912 chart_model
=vca_name
,
1916 ee_id
, credentials
= await self
.vca_map
[
1918 ].create_execution_environment(
1919 namespace
=namespace
,
1925 elif vca_type
== "native_charm":
1926 step
= "Waiting to VM being up and getting IP address"
1927 self
.logger
.debug(logging_text
+ step
)
1928 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1937 credentials
= {"hostname": rw_mgmt_ip
}
1939 username
= deep_get(
1940 config_descriptor
, ("config-access", "ssh-access", "default-user")
1942 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1943 # merged. Meanwhile let's get username from initial-config-primitive
1944 if not username
and initial_config_primitive_list
:
1945 for config_primitive
in initial_config_primitive_list
:
1946 for param
in config_primitive
.get("parameter", ()):
1947 if param
["name"] == "ssh-username":
1948 username
= param
["value"]
1952 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1953 "'config-access.ssh-access.default-user'"
1955 credentials
["username"] = username
1956 # n2vc_redesign STEP 3.2
1958 self
._write
_configuration
_status
(
1960 vca_index
=vca_index
,
1961 status
="REGISTERING",
1962 element_under_configuration
=element_under_configuration
,
1963 element_type
=element_type
,
1966 step
= "register execution environment {}".format(credentials
)
1967 self
.logger
.debug(logging_text
+ step
)
1968 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1969 credentials
=credentials
,
1970 namespace
=namespace
,
1975 # for compatibility with MON/POL modules, they need the model and application name at database
1976 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1977 ee_id_parts
= ee_id
.split(".")
1978 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1979 if len(ee_id_parts
) >= 2:
1980 model_name
= ee_id_parts
[0]
1981 application_name
= ee_id_parts
[1]
1982 db_nsr_update
[db_update_entry
+ "model"] = model_name
1983 db_nsr_update
[db_update_entry
+ "application"] = application_name
1985 # n2vc_redesign STEP 3.3
1986 step
= "Install configuration Software"
1988 self
._write
_configuration
_status
(
1990 vca_index
=vca_index
,
1991 status
="INSTALLING SW",
1992 element_under_configuration
=element_under_configuration
,
1993 element_type
=element_type
,
1994 other_update
=db_nsr_update
,
1997 # TODO check if already done
1998 self
.logger
.debug(logging_text
+ step
)
2000 if vca_type
== "native_charm":
2001 config_primitive
= next(
2002 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2005 if config_primitive
:
2006 config
= self
._map
_primitive
_params
(
2007 config_primitive
, {}, deploy_params
2010 if vca_type
== "lxc_proxy_charm":
2011 if element_type
== "NS":
2012 num_units
= db_nsr
.get("config-units") or 1
2013 elif element_type
== "VNF":
2014 num_units
= db_vnfr
.get("config-units") or 1
2015 elif element_type
== "VDU":
2016 for v
in db_vnfr
["vdur"]:
2017 if vdu_id
== v
["vdu-id-ref"]:
2018 num_units
= v
.get("config-units") or 1
2020 if vca_type
!= "k8s_proxy_charm":
2021 await self
.vca_map
[vca_type
].install_configuration_sw(
2023 artifact_path
=artifact_path
,
2026 num_units
=num_units
,
2031 # write in db flag of configuration_sw already installed
2033 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2036 # add relations for this VCA (wait for other peers related with this VCA)
2037 await self
._add
_vca
_relations
(
2038 logging_text
=logging_text
,
2041 vca_index
=vca_index
,
2044 # if SSH access is required, then get the execution environment SSH public key
2045 # for a native charm we have already waited for the VM to be up
2046 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2049 # self.logger.debug("get ssh key block")
2051 config_descriptor
, ("config-access", "ssh-access", "required")
2053 # self.logger.debug("ssh key needed")
2054 # Needed to inject a ssh key
2057 ("config-access", "ssh-access", "default-user"),
2059 step
= "Install configuration Software, getting public ssh key"
2060 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2061 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2064 step
= "Insert public key into VM user={} ssh_key={}".format(
2068 # self.logger.debug("no need to get ssh key")
2069 step
= "Waiting to VM being up and getting IP address"
2070 self
.logger
.debug(logging_text
+ step
)
2072 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2075 # n2vc_redesign STEP 5.1
2076 # wait for RO (ip-address) Insert pub_key into VM
2079 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2080 logging_text
, nsr_id
, vnfr_id
, kdu_name
2082 vnfd
= self
.db
.get_one(
2084 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2086 kdu
= get_kdu(vnfd
, kdu_name
)
2088 service
["name"] for service
in get_kdu_services(kdu
)
2090 exposed_services
= []
2091 for service
in services
:
2092 if any(s
in service
["name"] for s
in kdu_services
):
2093 exposed_services
.append(service
)
2094 await self
.vca_map
[vca_type
].exec_primitive(
2096 primitive_name
="config",
2098 "osm-config": json
.dumps(
2100 k8s
={"services": exposed_services
}
2107 # This verification is needed in order to avoid trying to add a public key
2108 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2109 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2110 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2112 elif db_vnfr
.get("vdur"):
2113 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2123 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2125 # store rw_mgmt_ip in deploy params for later replacement
2126 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2128 # n2vc_redesign STEP 6 Execute initial config primitive
2129 step
= "execute initial config primitive"
2131 # wait for dependent primitives execution (NS -> VNF -> VDU)
2132 if initial_config_primitive_list
:
2133 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2135 # stage, in function of element type: vdu, kdu, vnf or ns
2136 my_vca
= vca_deployed_list
[vca_index
]
2137 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2139 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2140 elif my_vca
.get("member-vnf-index"):
2142 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2145 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2147 self
._write
_configuration
_status
(
2148 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2151 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2153 check_if_terminated_needed
= True
2154 for initial_config_primitive
in initial_config_primitive_list
:
2155 # adding information on the vca_deployed if it is a NS execution environment
2156 if not vca_deployed
["member-vnf-index"]:
2157 deploy_params
["ns_config_info"] = json
.dumps(
2158 self
._get
_ns
_config
_info
(nsr_id
)
2160 # TODO check if already done
2161 primitive_params_
= self
._map
_primitive
_params
(
2162 initial_config_primitive
, {}, deploy_params
2165 step
= "execute primitive '{}' params '{}'".format(
2166 initial_config_primitive
["name"], primitive_params_
2168 self
.logger
.debug(logging_text
+ step
)
2169 await self
.vca_map
[vca_type
].exec_primitive(
2171 primitive_name
=initial_config_primitive
["name"],
2172 params_dict
=primitive_params_
,
2177 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2178 if check_if_terminated_needed
:
2179 if config_descriptor
.get("terminate-config-primitive"):
2181 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2183 check_if_terminated_needed
= False
2185 # TODO register in database that primitive is done
2187 # STEP 7 Configure metrics
2188 if vca_type
== "helm" or vca_type
== "helm-v3":
2189 # TODO: review for those cases where the helm chart is a reference and
2190 # is not part of the NF package
2191 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2193 artifact_path
=artifact_path
,
2194 ee_config_descriptor
=ee_config_descriptor
,
2197 target_ip
=rw_mgmt_ip
,
2203 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2206 for job
in prometheus_jobs
:
2209 {"job_name": job
["job_name"]},
2212 fail_on_empty
=False,
2215 step
= "instantiated at VCA"
2216 self
.logger
.debug(logging_text
+ step
)
2218 self
._write
_configuration
_status
(
2219 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2222 except Exception as e
: # TODO not use Exception but N2VC exception
2223 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2225 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2228 "Exception while {} : {}".format(step
, e
), exc_info
=True
2230 self
._write
_configuration
_status
(
2231 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2233 raise LcmException("{} {}".format(step
, e
)) from e
2235 def _write_ns_status(
2239 current_operation
: str,
2240 current_operation_id
: str,
2241 error_description
: str = None,
2242 error_detail
: str = None,
2243 other_update
: dict = None,
2246 Update db_nsr fields.
2249 :param current_operation:
2250 :param current_operation_id:
2251 :param error_description:
2252 :param error_detail:
2253 :param other_update: Other required changes at database if provided, will be cleared
2257 db_dict
= other_update
or {}
2260 ] = current_operation_id
# for backward compatibility
2261 db_dict
["_admin.current-operation"] = current_operation_id
2262 db_dict
["_admin.operation-type"] = (
2263 current_operation
if current_operation
!= "IDLE" else None
2265 db_dict
["currentOperation"] = current_operation
2266 db_dict
["currentOperationID"] = current_operation_id
2267 db_dict
["errorDescription"] = error_description
2268 db_dict
["errorDetail"] = error_detail
2271 db_dict
["nsState"] = ns_state
2272 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2273 except DbException
as e
:
2274 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2276 def _write_op_status(
2280 error_message
: str = None,
2281 queuePosition
: int = 0,
2282 operation_state
: str = None,
2283 other_update
: dict = None,
2286 db_dict
= other_update
or {}
2287 db_dict
["queuePosition"] = queuePosition
2288 if isinstance(stage
, list):
2289 db_dict
["stage"] = stage
[0]
2290 db_dict
["detailed-status"] = " ".join(stage
)
2291 elif stage
is not None:
2292 db_dict
["stage"] = str(stage
)
2294 if error_message
is not None:
2295 db_dict
["errorMessage"] = error_message
2296 if operation_state
is not None:
2297 db_dict
["operationState"] = operation_state
2298 db_dict
["statusEnteredTime"] = time()
2299 self
.update_db_2("nslcmops", op_id
, db_dict
)
2300 except DbException
as e
:
2302 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2305 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2307 nsr_id
= db_nsr
["_id"]
2308 # configurationStatus
2309 config_status
= db_nsr
.get("configurationStatus")
2312 "configurationStatus.{}.status".format(index
): status
2313 for index
, v
in enumerate(config_status
)
2317 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2319 except DbException
as e
:
2321 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2324 def _write_configuration_status(
2329 element_under_configuration
: str = None,
2330 element_type
: str = None,
2331 other_update
: dict = None,
2334 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2335 # .format(vca_index, status))
2338 db_path
= "configurationStatus.{}.".format(vca_index
)
2339 db_dict
= other_update
or {}
2341 db_dict
[db_path
+ "status"] = status
2342 if element_under_configuration
:
2344 db_path
+ "elementUnderConfiguration"
2345 ] = element_under_configuration
2347 db_dict
[db_path
+ "elementType"] = element_type
2348 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2349 except DbException
as e
:
2351 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2352 status
, nsr_id
, vca_index
, e
2356 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2358 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2359 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2360 Database is used because the result can be obtained from a different LCM worker in case of HA.
2361 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2362 :param db_nslcmop: database content of nslcmop
2363 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2364 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2365 computed 'vim-account-id'
2368 nslcmop_id
= db_nslcmop
["_id"]
2369 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2370 if placement_engine
== "PLA":
2372 logging_text
+ "Invoke and wait for placement optimization"
2374 await self
.msg
.aiowrite(
2375 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2377 db_poll_interval
= 5
2378 wait
= db_poll_interval
* 10
2380 while not pla_result
and wait
>= 0:
2381 await asyncio
.sleep(db_poll_interval
)
2382 wait
-= db_poll_interval
2383 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2384 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2388 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2391 for pla_vnf
in pla_result
["vnf"]:
2392 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2393 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2398 {"_id": vnfr
["_id"]},
2399 {"vim-account-id": pla_vnf
["vimAccountId"]},
2402 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result into the nslcmops record it belongs to.

    The operation id is read from params["placement"]["nslcmopId"] and the whole "placement" content
    is written at "_admin.pla" of that nslcmops record.
    :param params: content received with the placement result; expected to contain "placement"
    :return: None. Any failure is logged as a warning and not propagated.
    """
    # bound before the try so the except handler can always report it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn() is a deprecated alias (removed in Python 3.13); use warning()
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2414 async def instantiate(self
, nsr_id
, nslcmop_id
):
2417 :param nsr_id: ns instance to deploy
2418 :param nslcmop_id: operation to run
2422 # Try to lock HA task here
2423 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2424 if not task_is_locked_by_me
:
2426 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2430 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2431 self
.logger
.debug(logging_text
+ "Enter")
2433 # get all needed from database
2435 # database nsrs record
2438 # database nslcmops record
2441 # update operation on nsrs
2443 # update operation on nslcmops
2444 db_nslcmop_update
= {}
2446 nslcmop_operation_state
= None
2447 db_vnfrs
= {} # vnf's info indexed by member-index
2449 tasks_dict_info
= {} # from task to info text
2453 "Stage 1/5: preparation of the environment.",
2454 "Waiting for previous operations to terminate.",
2457 # ^ stage, step, VIM progress
2459 # wait for any previous tasks in process
2460 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2462 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2463 stage
[1] = "Reading from database."
2464 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2465 db_nsr_update
["detailed-status"] = "creating"
2466 db_nsr_update
["operational-status"] = "init"
2467 self
._write
_ns
_status
(
2469 ns_state
="BUILDING",
2470 current_operation
="INSTANTIATING",
2471 current_operation_id
=nslcmop_id
,
2472 other_update
=db_nsr_update
,
2474 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2476 # read from db: operation
2477 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2478 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2479 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2480 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2481 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2483 ns_params
= db_nslcmop
.get("operationParams")
2484 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2485 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2487 timeout_ns_deploy
= self
.timeout
.get(
2488 "ns_deploy", self
.timeout_ns_deploy
2492 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2493 self
.logger
.debug(logging_text
+ stage
[1])
2494 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2495 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2496 self
.logger
.debug(logging_text
+ stage
[1])
2497 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2498 self
.fs
.sync(db_nsr
["nsd-id"])
2500 # nsr_name = db_nsr["name"] # TODO short-name??
2502 # read from db: vnf's of this ns
2503 stage
[1] = "Getting vnfrs from db."
2504 self
.logger
.debug(logging_text
+ stage
[1])
2505 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2507 # read from db: vnfd's for every vnf
2508 db_vnfds
= [] # every vnfd data
2510 # for each vnf in ns, read vnfd
2511 for vnfr
in db_vnfrs_list
:
2512 if vnfr
.get("kdur"):
2514 for kdur
in vnfr
["kdur"]:
2515 if kdur
.get("additionalParams"):
2516 kdur
["additionalParams"] = json
.loads(
2517 kdur
["additionalParams"]
2519 kdur_list
.append(kdur
)
2520 vnfr
["kdur"] = kdur_list
2522 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2523 vnfd_id
= vnfr
["vnfd-id"]
2524 vnfd_ref
= vnfr
["vnfd-ref"]
2525 self
.fs
.sync(vnfd_id
)
2527 # if we haven't this vnfd, read it from db
2528 if vnfd_id
not in db_vnfds
:
2530 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2533 self
.logger
.debug(logging_text
+ stage
[1])
2534 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2537 db_vnfds
.append(vnfd
)
2539 # Get or generates the _admin.deployed.VCA list
2540 vca_deployed_list
= None
2541 if db_nsr
["_admin"].get("deployed"):
2542 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2543 if vca_deployed_list
is None:
2544 vca_deployed_list
= []
2545 configuration_status_list
= []
2546 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2547 db_nsr_update
["configurationStatus"] = configuration_status_list
2548 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2549 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2550 elif isinstance(vca_deployed_list
, dict):
2551 # maintain backward compatibility. Change a dict to list at database
2552 vca_deployed_list
= list(vca_deployed_list
.values())
2553 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2554 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2557 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2559 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2560 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2562 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2563 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2564 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2566 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2569 # n2vc_redesign STEP 2 Deploy Network Scenario
2570 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2571 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2573 stage
[1] = "Deploying KDUs."
2574 # self.logger.debug(logging_text + "Before deploy_kdus")
2575 # Call to deploy_kdus in case exists the "vdu:kdu" param
2576 await self
.deploy_kdus(
2577 logging_text
=logging_text
,
2579 nslcmop_id
=nslcmop_id
,
2582 task_instantiation_info
=tasks_dict_info
,
2585 stage
[1] = "Getting VCA public key."
2586 # n2vc_redesign STEP 1 Get VCA public ssh-key
2587 # feature 1429. Add n2vc public key to needed VMs
2588 n2vc_key
= self
.n2vc
.get_public_key()
2589 n2vc_key_list
= [n2vc_key
]
2590 if self
.vca_config
.get("public_key"):
2591 n2vc_key_list
.append(self
.vca_config
["public_key"])
2593 stage
[1] = "Deploying NS at VIM."
2594 task_ro
= asyncio
.ensure_future(
2595 self
.instantiate_RO(
2596 logging_text
=logging_text
,
2600 db_nslcmop
=db_nslcmop
,
2603 n2vc_key_list
=n2vc_key_list
,
2607 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2608 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2610 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2611 stage
[1] = "Deploying Execution Environments."
2612 self
.logger
.debug(logging_text
+ stage
[1])
2614 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2615 for vnf_profile
in get_vnf_profiles(nsd
):
2616 vnfd_id
= vnf_profile
["vnfd-id"]
2617 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2618 member_vnf_index
= str(vnf_profile
["id"])
2619 db_vnfr
= db_vnfrs
[member_vnf_index
]
2620 base_folder
= vnfd
["_admin"]["storage"]
2626 # Get additional parameters
2627 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2628 if db_vnfr
.get("additionalParamsForVnf"):
2629 deploy_params
.update(
2630 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2633 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2634 if descriptor_config
:
2636 logging_text
=logging_text
2637 + "member_vnf_index={} ".format(member_vnf_index
),
2640 nslcmop_id
=nslcmop_id
,
2646 member_vnf_index
=member_vnf_index
,
2647 vdu_index
=vdu_index
,
2649 deploy_params
=deploy_params
,
2650 descriptor_config
=descriptor_config
,
2651 base_folder
=base_folder
,
2652 task_instantiation_info
=tasks_dict_info
,
2656 # Deploy charms for each VDU that supports one.
2657 for vdud
in get_vdu_list(vnfd
):
2659 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2660 vdur
= find_in_list(
2661 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2664 if vdur
.get("additionalParams"):
2665 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2667 deploy_params_vdu
= deploy_params
2668 deploy_params_vdu
["OSM"] = get_osm_params(
2669 db_vnfr
, vdu_id
, vdu_count_index
=0
2671 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2673 self
.logger
.debug("VDUD > {}".format(vdud
))
2675 "Descriptor config > {}".format(descriptor_config
)
2677 if descriptor_config
:
2680 for vdu_index
in range(vdud_count
):
2681 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2683 logging_text
=logging_text
2684 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2685 member_vnf_index
, vdu_id
, vdu_index
2689 nslcmop_id
=nslcmop_id
,
2695 member_vnf_index
=member_vnf_index
,
2696 vdu_index
=vdu_index
,
2698 deploy_params
=deploy_params_vdu
,
2699 descriptor_config
=descriptor_config
,
2700 base_folder
=base_folder
,
2701 task_instantiation_info
=tasks_dict_info
,
2704 for kdud
in get_kdu_list(vnfd
):
2705 kdu_name
= kdud
["name"]
2706 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2707 if descriptor_config
:
2712 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2714 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2715 if kdur
.get("additionalParams"):
2716 deploy_params_kdu
.update(
2717 parse_yaml_strings(kdur
["additionalParams"].copy())
2721 logging_text
=logging_text
,
2724 nslcmop_id
=nslcmop_id
,
2730 member_vnf_index
=member_vnf_index
,
2731 vdu_index
=vdu_index
,
2733 deploy_params
=deploy_params_kdu
,
2734 descriptor_config
=descriptor_config
,
2735 base_folder
=base_folder
,
2736 task_instantiation_info
=tasks_dict_info
,
2740 # Check if this NS has a charm configuration
2741 descriptor_config
= nsd
.get("ns-configuration")
2742 if descriptor_config
and descriptor_config
.get("juju"):
2745 member_vnf_index
= None
2751 # Get additional parameters
2752 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2753 if db_nsr
.get("additionalParamsForNs"):
2754 deploy_params
.update(
2755 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2757 base_folder
= nsd
["_admin"]["storage"]
2759 logging_text
=logging_text
,
2762 nslcmop_id
=nslcmop_id
,
2768 member_vnf_index
=member_vnf_index
,
2769 vdu_index
=vdu_index
,
2771 deploy_params
=deploy_params
,
2772 descriptor_config
=descriptor_config
,
2773 base_folder
=base_folder
,
2774 task_instantiation_info
=tasks_dict_info
,
2778 # rest of staff will be done at finally
2781 ROclient
.ROClientException
,
2787 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2790 except asyncio
.CancelledError
:
2792 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2794 exc
= "Operation was cancelled"
2795 except Exception as e
:
2796 exc
= traceback
.format_exc()
2797 self
.logger
.critical(
2798 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2803 error_list
.append(str(exc
))
2805 # wait for pending tasks
2807 stage
[1] = "Waiting for instantiate pending tasks."
2808 self
.logger
.debug(logging_text
+ stage
[1])
2809 error_list
+= await self
._wait
_for
_tasks
(
2817 stage
[1] = stage
[2] = ""
2818 except asyncio
.CancelledError
:
2819 error_list
.append("Cancelled")
2820 # TODO cancel all tasks
2821 except Exception as exc
:
2822 error_list
.append(str(exc
))
2824 # update operation-status
2825 db_nsr_update
["operational-status"] = "running"
2826 # let's begin with VCA 'configured' status (later we can change it)
2827 db_nsr_update
["config-status"] = "configured"
2828 for task
, task_name
in tasks_dict_info
.items():
2829 if not task
.done() or task
.cancelled() or task
.exception():
2830 if task_name
.startswith(self
.task_name_deploy_vca
):
2831 # A N2VC task is pending
2832 db_nsr_update
["config-status"] = "failed"
2834 # RO or KDU task is pending
2835 db_nsr_update
["operational-status"] = "failed"
2837 # update status at database
2839 error_detail
= ". ".join(error_list
)
2840 self
.logger
.error(logging_text
+ error_detail
)
2841 error_description_nslcmop
= "{} Detail: {}".format(
2842 stage
[0], error_detail
2844 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2845 nslcmop_id
, stage
[0]
2848 db_nsr_update
["detailed-status"] = (
2849 error_description_nsr
+ " Detail: " + error_detail
2851 db_nslcmop_update
["detailed-status"] = error_detail
2852 nslcmop_operation_state
= "FAILED"
2856 error_description_nsr
= error_description_nslcmop
= None
2858 db_nsr_update
["detailed-status"] = "Done"
2859 db_nslcmop_update
["detailed-status"] = "Done"
2860 nslcmop_operation_state
= "COMPLETED"
2863 self
._write
_ns
_status
(
2866 current_operation
="IDLE",
2867 current_operation_id
=None,
2868 error_description
=error_description_nsr
,
2869 error_detail
=error_detail
,
2870 other_update
=db_nsr_update
,
2872 self
._write
_op
_status
(
2875 error_message
=error_description_nslcmop
,
2876 operation_state
=nslcmop_operation_state
,
2877 other_update
=db_nslcmop_update
,
2880 if nslcmop_operation_state
:
2882 await self
.msg
.aiowrite(
2887 "nslcmop_id": nslcmop_id
,
2888 "operationState": nslcmop_operation_state
,
2892 except Exception as e
:
2894 logging_text
+ "kafka_write notification Exception {}".format(e
)
2897 self
.logger
.debug(logging_text
+ "Exit")
2898 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """
    Return the vnfd with the given descriptor id, reading it from database only once.

    :param vnfd_id: value of the "id" field of the vnfd
    :param cached_vnfds: cache indexed by vnfd id; it is filled in with the vnfd when missing
    :return: the vnfd content stored in the cache
    """
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]
    descriptor = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = descriptor
    return descriptor
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
    """
    Return the vnfr of a vnf profile inside a ns, reading it from database only once.

    :param nsr_id: value matched against "nsr-id-ref" of the vnfr
    :param vnf_profile_id: value matched against "member-vnf-index-ref" of the vnfr
    :param cached_vnfrs: cache indexed by vnf profile id; it is filled in with the vnfr when missing
    :return: the vnfr content stored in the cache
    """
    if vnf_profile_id in cached_vnfrs:
        return cached_vnfrs[vnf_profile_id]
    query = {
        "member-vnf-index-ref": vnf_profile_id,
        "nsr-id-ref": nsr_id,
    }
    record = self.db.get_one("vnfrs", query)
    cached_vnfrs[vnf_profile_id] = record
    return record
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether a deployed VCA is one of the two endpoints of a relation.

    Endpoints with a "kdu-resource-profile-id" are ignored. Any other endpoint matches when its vnf
    profile id, vdu profile id and execution environment reference are all equal to those of the VCA.
    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected, in that order
    :return: True if the provider or the requirer matches the VCA
    """
    return any(
        vca.vnf_profile_id == endpoint.vnf_profile_id
        and vca.vdu_profile_id == endpoint.vdu_profile_id
        and vca.execution_environment_ref == endpoint.execution_environment_ref
        for endpoint in (relation.provider, relation.requirer)
        if not endpoint["kdu-resource-profile-id"]
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Normalize the data of a relation endpoint and fill in its execution environment when omitted.

    The endpoint data is first rebuilt with safe_get_ee_relation(). For VNF and VDU level endpoints
    without "execution-environment-ref", the reference is taken from the juju execution environment
    that get_juju_ee_ref() finds in the vnfd for the vnf (VNF level) or for the vdu (VDU level).
    :param nsr_id: ns record identifier
    :param nsd: ns descriptor, used to find the vnf profile of the endpoint
    :param ee_relation_data: endpoint data as found in the descriptor
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param vnf_profile_id: vnf profile passed on to safe_get_ee_relation()
    :return: the normalized endpoint data
    :raises Exception: when the execution environment is needed and none is found
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)
    if (
        level not in (EELevel.VNF, EELevel.VDU)
        or ee_relation_data["execution-environment-ref"]
    ):
        # nothing implicit to resolve
        return ee_relation_data

    profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]
    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared at ns level in which the given VCA takes part.

    Each relation of the ns configuration is accepted in two formats: with explicit "provider" and
    "requirer" entries, or with an "entities" list whose first item is the provider and second item
    the requirer. In the "entities" format an entity id different from the nsd id is taken as a vnf
    profile id.
    :param nsr_id: ns record identifier
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of Relation that involve the VCA
    :raises Exception: when a relation has neither provider/requirer nor entities
    """
    matching = []
    for descriptor in get_ns_configuration_relation_list(nsd):
        if "provider" in descriptor and "requirer" in descriptor:
            endpoints = [descriptor["provider"], descriptor["requirer"]]
        elif "entities" in descriptor:
            endpoints = []
            for position in (0, 1):  # 0: provider, 1: requirer
                entity_id = descriptor["entities"][position]["id"]
                endpoint_data = {
                    "nsr-id": nsr_id,
                    "endpoint": descriptor["entities"][position]["endpoint"],
                }
                if entity_id != nsd["id"]:
                    endpoint_data["vnf-profile-id"] = entity_id
                endpoints.append(endpoint_data)
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )

        provider_data, requirer_data = [
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, endpoint_data, cached_vnfds
            )
            for endpoint_data in endpoints
        ]
        provider = EERelation(provider_data)
        requirer = EERelation(requirer_data)
        relation = Relation(descriptor["name"], provider, requirer)
        if self._is_deployed_vca_in_relation(vca, relation):
            matching.append(relation)
    return matching
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared in the vnfd of the VCA's vnf in which the given VCA takes part.

    Each relation is accepted in two formats: with explicit "provider" and "requirer" entries, or
    with an "entities" list whose first item is the provider and second item the requirer. In the
    "entities" format an entity id different from the vnfd id is taken as a vdu profile id.
    :param nsr_id: ns record identifier
    :param nsd: ns descriptor, used to find the vnf profile of the VCA
    :param vca: deployed VCA that must be an endpoint of the returned relations
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of Relation that involve the VCA; empty when the VCA has no vnf profile
    :raises Exception: when a relation has neither provider/requirer nor entities
    """
    relations = []
    if vca.vnf_profile_id is None:
        # A VCA without vnf profile (presumably a ns level charm) has no vnfd to read relations
        # from; looking its profile up would fail instead of yielding "no vnf relations".
        return relations
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
    for r in db_vnf_relations:
        provider_dict = None
        requirer_dict = None
        if all(key in r for key in ("provider", "requirer")):
            provider_dict = r["provider"]
            requirer_dict = r["requirer"]
        elif "entities" in r:
            provider_id = r["entities"][0]["id"]
            provider_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][0]["endpoint"],
            }
            if provider_id != vnfd_id:
                provider_dict["vdu-profile-id"] = provider_id
            requirer_id = r["entities"][1]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": r["entities"][1]["endpoint"],
            }
            if requirer_id != vnfd_id:
                requirer_dict["vdu-profile-id"] = requirer_id
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        provider = EERelation(relation_provider)
        requirer = EERelation(relation_requirer)
        relation = Relation(r["name"], provider, requirer)
        vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
        if vca_in_relation:
            relations.append(relation)
    return relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the data of the deployed KDU that backs a KDU level relation endpoint.

    The kdu resource profile of the endpoint is looked up in the vnfd of the endpoint's vnf profile;
    the deployed KDU with that profile's "kdu-name" is then taken from the ns record and extended in
    place with the "resource-name" of the profile.
    :param ee_relation: relation endpoint; its vnf_profile_id and kdu_resource_profile_id are used
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: the deployed KDU data as returned by get_deployed_kdu() plus "resource-name".
        NOTE(review): the value is what get_deployed_kdu() yields, not a DeployedK8sResource
        instance; the caller in this file wraps it.
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    deployed_kdu, _ = get_deployed_kdu(
        # dict defaults: a tuple default has no .get() and failed when "_admin" was missing
        db_nsr.get("_admin", {}).get("deployed", {}),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Find the deployed component (VCA or K8s resource) behind a relation endpoint.

    For NS, VNF and VDU level endpoints the deployed VCA is searched in the ns record with a filter
    built from the endpoint; for KDU level endpoints the deployed KDU resource data is used.
    :param ee_relation: relation endpoint
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: DeployedVCA or DeployedK8sResource, or None when nothing deployed matches
    """
    nsr_id = db_nsr["_id"]
    level = EELevel.get_level(ee_relation)

    if level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif level == EELevel.KDU:
        resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        return DeployedK8sResource(resource_data) if resource_data else None
    else:
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation between its provider and requirer if both are deployed and ready.

    Both endpoints must resolve to a deployed component whose config_sw_installed is set;
    otherwise nothing is done so that the caller can retry later.
    :param relation: relation to add
    :param vca_type: key of self.vca_map selecting the connector used to add the relation
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param cached_vnfrs: cache of vnfrs indexed by vnf profile id
    :return: True if the relation has been added, False if some endpoint is not ready yet
    """
    provider_component = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    requirer_component = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    both_ready = (
        provider_component
        and requirer_component
        and provider_component.config_sw_installed
        and requirer_component.config_sw_installed
    )
    if not both_ready:
        return False

    # the vnfr is only needed for endpoints that belong to a vnf
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )

    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_endpoint = RelationEndpoint(
        provider_component.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_endpoint = RelationEndpoint(
        requirer_component.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_endpoint,
        requirer=requirer_endpoint,
    )
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations in which a deployed VCA takes part, waiting for its peers.

    Steps:
    1. find all relations (ns level and vnf level) for this VCA
    2. wait for the other peers related
    3. add relations
    :param logging_text: prefix for logging
    :param nsr_id: ns record identifier
    :param vca_type: key of self.vca_map selecting the connector used to add the relations
    :param vca_index: position of this VCA in the deployed VCA list of the ns record
    :param timeout: maximum time, in seconds, to wait for all relations to be added
    :return: True if there were no relations or all of them were added; False on timeout or error
        (the error is logged, not propagated)
    """
    # local import: a monotonic clock keeps the timeout immune to wall-clock adjustments
    from time import monotonic

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = monotonic()
        while True:
            # check timeout
            if monotonic() - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related; iterate a copy because
            # added relations are removed from the pending list
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn() is a deprecated alias (removed in Python 3.13); use warning()
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
3259 async def _install_kdu(
3267 k8s_instance_info
: dict,
3268 k8params
: dict = None,
3274 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3277 "collection": "nsrs",
3278 "filter": {"_id": nsr_id
},
3279 "path": nsr_db_path
,
3282 if k8s_instance_info
.get("kdu-deployment-name"):
3283 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3285 kdu_instance
= self
.k8scluster_map
[
3287 ].generate_kdu_instance_name(
3288 db_dict
=db_dict_install
,
3289 kdu_model
=k8s_instance_info
["kdu-model"],
3290 kdu_name
=k8s_instance_info
["kdu-name"],
3293 # Update the nsrs table with the kdu-instance value
3297 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3300 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3301 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3302 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3303 # namespace, this first verification could be removed, and the next step would be done for any kind
3305 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3306 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3307 if k8sclustertype
in ("juju", "juju-bundle"):
3308 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3309 # that the user passed a namespace which he wants its KDU to be deployed in)
3315 "_admin.projects_write": k8s_instance_info
["namespace"],
3316 "_admin.projects_read": k8s_instance_info
["namespace"],
3322 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3327 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3329 k8s_instance_info
["namespace"] = kdu_instance
3331 await self
.k8scluster_map
[k8sclustertype
].install(
3332 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3333 kdu_model
=k8s_instance_info
["kdu-model"],
3336 db_dict
=db_dict_install
,
3338 kdu_name
=k8s_instance_info
["kdu-name"],
3339 namespace
=k8s_instance_info
["namespace"],
3340 kdu_instance
=kdu_instance
,
3344 # Obtain services to obtain management service ip
3345 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3346 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3347 kdu_instance
=kdu_instance
,
3348 namespace
=k8s_instance_info
["namespace"],
3351 # Obtain management service info (if exists)
3352 vnfr_update_dict
= {}
3353 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3355 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3360 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3363 for service
in kdud
.get("service", [])
3364 if service
.get("mgmt-service")
3366 for mgmt_service
in mgmt_services
:
3367 for service
in services
:
3368 if service
["name"].startswith(mgmt_service
["name"]):
3369 # Mgmt service found, Obtain service ip
3370 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3371 if isinstance(ip
, list) and len(ip
) == 1:
3375 "kdur.{}.ip-address".format(kdu_index
)
3378 # Check if must update also mgmt ip at the vnf
3379 service_external_cp
= mgmt_service
.get(
3380 "external-connection-point-ref"
3382 if service_external_cp
:
3384 deep_get(vnfd
, ("mgmt-interface", "cp"))
3385 == service_external_cp
3387 vnfr_update_dict
["ip-address"] = ip
3392 "external-connection-point-ref", ""
3394 == service_external_cp
,
3397 "kdur.{}.ip-address".format(kdu_index
)
3402 "Mgmt service name: {} not found".format(
3403 mgmt_service
["name"]
3407 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3408 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3410 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3413 and kdu_config
.get("initial-config-primitive")
3414 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3416 initial_config_primitive_list
= kdu_config
.get(
3417 "initial-config-primitive"
3419 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3421 for initial_config_primitive
in initial_config_primitive_list
:
3422 primitive_params_
= self
._map
_primitive
_params
(
3423 initial_config_primitive
, {}, {}
3426 await asyncio
.wait_for(
3427 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3428 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3429 kdu_instance
=kdu_instance
,
3430 primitive_name
=initial_config_primitive
["name"],
3431 params
=primitive_params_
,
3432 db_dict
=db_dict_install
,
3438 except Exception as e
:
3439 # Prepare update db with error and raise exception
3442 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3446 vnfr_data
.get("_id"),
3447 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3450 # ignore to keep original exception
3452 # reraise original error
3457 async def deploy_kdus(
3464 task_instantiation_info
,
3466 # Launch kdus if present in the descriptor
3468 k8scluster_id_2_uuic
= {
3469 "helm-chart-v3": {},
3474 async def _get_cluster_id(cluster_id
, cluster_type
):
3475 nonlocal k8scluster_id_2_uuic
3476 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3477 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3479 # check if K8scluster is creating and wait look if previous tasks in process
3480 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3481 "k8scluster", cluster_id
3484 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3485 task_name
, cluster_id
3487 self
.logger
.debug(logging_text
+ text
)
3488 await asyncio
.wait(task_dependency
, timeout
=3600)
3490 db_k8scluster
= self
.db
.get_one(
3491 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3493 if not db_k8scluster
:
3494 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3496 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3498 if cluster_type
== "helm-chart-v3":
3500 # backward compatibility for existing clusters that have not been initialized for helm v3
3501 k8s_credentials
= yaml
.safe_dump(
3502 db_k8scluster
.get("credentials")
3504 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3505 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3507 db_k8scluster_update
= {}
3508 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3509 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3510 db_k8scluster_update
[
3511 "_admin.helm-chart-v3.created"
3513 db_k8scluster_update
[
3514 "_admin.helm-chart-v3.operationalState"
3517 "k8sclusters", cluster_id
, db_k8scluster_update
3519 except Exception as e
:
3522 + "error initializing helm-v3 cluster: {}".format(str(e
))
3525 "K8s cluster '{}' has not been initialized for '{}'".format(
3526 cluster_id
, cluster_type
3531 "K8s cluster '{}' has not been initialized for '{}'".format(
3532 cluster_id
, cluster_type
3535 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3538 logging_text
+= "Deploy kdus: "
3541 db_nsr_update
= {"_admin.deployed.K8s": []}
3542 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3545 updated_cluster_list
= []
3546 updated_v3_cluster_list
= []
3548 for vnfr_data
in db_vnfrs
.values():
3549 vca_id
= self
.get_vca_id(vnfr_data
, {})
3550 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3551 # Step 0: Prepare and set parameters
3552 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3553 vnfd_id
= vnfr_data
.get("vnfd-id")
3554 vnfd_with_id
= find_in_list(
3555 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3559 for kdud
in vnfd_with_id
["kdu"]
3560 if kdud
["name"] == kdur
["kdu-name"]
3562 namespace
= kdur
.get("k8s-namespace")
3563 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3564 if kdur
.get("helm-chart"):
3565 kdumodel
= kdur
["helm-chart"]
3566 # Default version: helm3, if helm-version is v2 assign v2
3567 k8sclustertype
= "helm-chart-v3"
3568 self
.logger
.debug("kdur: {}".format(kdur
))
3570 kdur
.get("helm-version")
3571 and kdur
.get("helm-version") == "v2"
3573 k8sclustertype
= "helm-chart"
3574 elif kdur
.get("juju-bundle"):
3575 kdumodel
= kdur
["juju-bundle"]
3576 k8sclustertype
= "juju-bundle"
3579 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3580 "juju-bundle. Maybe an old NBI version is running".format(
3581 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3584 # check if kdumodel is a file and exists
3586 vnfd_with_id
= find_in_list(
3587 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3589 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3590 if storage
: # may be not present if vnfd has not artifacts
3591 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3592 if storage
["pkg-dir"]:
3593 filename
= "{}/{}/{}s/{}".format(
3600 filename
= "{}/Scripts/{}s/{}".format(
3605 if self
.fs
.file_exists(
3606 filename
, mode
="file"
3607 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3608 kdumodel
= self
.fs
.path
+ filename
3609 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3611 except Exception: # it is not a file
3614 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3615 step
= "Synchronize repos for k8s cluster '{}'".format(
3618 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3622 k8sclustertype
== "helm-chart"
3623 and cluster_uuid
not in updated_cluster_list
3625 k8sclustertype
== "helm-chart-v3"
3626 and cluster_uuid
not in updated_v3_cluster_list
3628 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3629 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3630 cluster_uuid
=cluster_uuid
3633 if del_repo_list
or added_repo_dict
:
3634 if k8sclustertype
== "helm-chart":
3636 "_admin.helm_charts_added." + item
: None
3637 for item
in del_repo_list
3640 "_admin.helm_charts_added." + item
: name
3641 for item
, name
in added_repo_dict
.items()
3643 updated_cluster_list
.append(cluster_uuid
)
3644 elif k8sclustertype
== "helm-chart-v3":
3646 "_admin.helm_charts_v3_added." + item
: None
3647 for item
in del_repo_list
3650 "_admin.helm_charts_v3_added." + item
: name
3651 for item
, name
in added_repo_dict
.items()
3653 updated_v3_cluster_list
.append(cluster_uuid
)
3655 logging_text
+ "repos synchronized on k8s cluster "
3656 "'{}' to_delete: {}, to_add: {}".format(
3657 k8s_cluster_id
, del_repo_list
, added_repo_dict
3662 {"_id": k8s_cluster_id
},
3668 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3669 vnfr_data
["member-vnf-index-ref"],
3673 k8s_instance_info
= {
3674 "kdu-instance": None,
3675 "k8scluster-uuid": cluster_uuid
,
3676 "k8scluster-type": k8sclustertype
,
3677 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3678 "kdu-name": kdur
["kdu-name"],
3679 "kdu-model": kdumodel
,
3680 "namespace": namespace
,
3681 "kdu-deployment-name": kdu_deployment_name
,
3683 db_path
= "_admin.deployed.K8s.{}".format(index
)
3684 db_nsr_update
[db_path
] = k8s_instance_info
3685 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3686 vnfd_with_id
= find_in_list(
3687 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3689 task
= asyncio
.ensure_future(
3698 k8params
=desc_params
,
3703 self
.lcm_tasks
.register(
3707 "instantiate_KDU-{}".format(index
),
3710 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3716 except (LcmException
, asyncio
.CancelledError
):
3718 except Exception as e
:
3719 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3720 if isinstance(e
, (N2VCException
, DbException
)):
3721 self
.logger
.error(logging_text
+ msg
)
3723 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3724 raise LcmException(msg
)
3727 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3746 task_instantiation_info
,
3749 # launch instantiate_N2VC in a asyncio task and register task object
3750 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3751 # if not found, create one entry and update database
3752 # fill db_nsr._admin.deployed.VCA.<index>
3755 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3759 get_charm_name
= False
3760 if "execution-environment-list" in descriptor_config
:
3761 ee_list
= descriptor_config
.get("execution-environment-list", [])
3762 elif "juju" in descriptor_config
:
3763 ee_list
= [descriptor_config
] # ns charms
3764 if "execution-environment-list" not in descriptor_config
:
3765 # charm name is only required for ns charms
3766 get_charm_name
= True
3767 else: # other types as script are not supported
3770 for ee_item
in ee_list
:
3773 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3774 ee_item
.get("juju"), ee_item
.get("helm-chart")
3777 ee_descriptor_id
= ee_item
.get("id")
3778 if ee_item
.get("juju"):
3779 vca_name
= ee_item
["juju"].get("charm")
3781 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3784 if ee_item
["juju"].get("charm") is not None
3787 if ee_item
["juju"].get("cloud") == "k8s":
3788 vca_type
= "k8s_proxy_charm"
3789 elif ee_item
["juju"].get("proxy") is False:
3790 vca_type
= "native_charm"
3791 elif ee_item
.get("helm-chart"):
3792 vca_name
= ee_item
["helm-chart"]
3793 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3796 vca_type
= "helm-v3"
3799 logging_text
+ "skipping non juju neither charm configuration"
3804 for vca_index
, vca_deployed
in enumerate(
3805 db_nsr
["_admin"]["deployed"]["VCA"]
3807 if not vca_deployed
:
3810 vca_deployed
.get("member-vnf-index") == member_vnf_index
3811 and vca_deployed
.get("vdu_id") == vdu_id
3812 and vca_deployed
.get("kdu_name") == kdu_name
3813 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3814 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3818 # not found, create one.
3820 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3823 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3825 target
+= "/kdu/{}".format(kdu_name
)
3827 "target_element": target
,
3828 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3829 "member-vnf-index": member_vnf_index
,
3831 "kdu_name": kdu_name
,
3832 "vdu_count_index": vdu_index
,
3833 "operational-status": "init", # TODO revise
3834 "detailed-status": "", # TODO revise
3835 "step": "initial-deploy", # TODO revise
3837 "vdu_name": vdu_name
,
3839 "ee_descriptor_id": ee_descriptor_id
,
3840 "charm_name": charm_name
,
3844 # create VCA and configurationStatus in db
3846 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3847 "configurationStatus.{}".format(vca_index
): dict(),
3849 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3851 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3853 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3854 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3855 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3858 task_n2vc
= asyncio
.ensure_future(
3859 self
.instantiate_N2VC(
3860 logging_text
=logging_text
,
3861 vca_index
=vca_index
,
3867 vdu_index
=vdu_index
,
3868 deploy_params
=deploy_params
,
3869 config_descriptor
=descriptor_config
,
3870 base_folder
=base_folder
,
3871 nslcmop_id
=nslcmop_id
,
3875 ee_config_descriptor
=ee_item
,
3878 self
.lcm_tasks
.register(
3882 "instantiate_N2VC-{}".format(vca_index
),
3885 task_instantiation_info
[
3887 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3888 member_vnf_index
or "", vdu_id
or ""
3892 def _create_nslcmop(nsr_id
, operation
, params
):
3894 Creates a ns-lcm-opp content to be stored at database.
3895 :param nsr_id: internal id of the instance
3896 :param operation: instantiate, terminate, scale, action, ...
3897 :param params: user parameters for the operation
3898 :return: dictionary following SOL005 format
3900 # Raise exception if invalid arguments
3901 if not (nsr_id
and operation
and params
):
3903 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3910 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3911 "operationState": "PROCESSING",
3912 "statusEnteredTime": now
,
3913 "nsInstanceId": nsr_id
,
3914 "lcmOperationType": operation
,
3916 "isAutomaticInvocation": False,
3917 "operationParams": params
,
3918 "isCancelPending": False,
3920 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3921 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3926 def _format_additional_params(self
, params
):
3927 params
= params
or {}
3928 for key
, value
in params
.items():
3929 if str(value
).startswith("!!yaml "):
3930 params
[key
] = yaml
.safe_load(value
[7:])
3933 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3934 primitive
= seq
.get("name")
3935 primitive_params
= {}
3937 "member_vnf_index": vnf_index
,
3938 "primitive": primitive
,
3939 "primitive_params": primitive_params
,
3942 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record holding the "_admin.operations" list
    :param op_index: index of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP if it is COMPLETED, otherwise op_index
    """
    sub_operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    if sub_operations[op_index].get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3966 # Find a sub-operation where all keys in a matching dictionary must match
3967 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3968 def _find_suboperation(self
, db_nslcmop
, match
):
3969 if db_nslcmop
and match
:
3970 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3971 for i
, op
in enumerate(op_list
):
3972 if all(op
.get(k
) == match
[k
] for k
in match
):
3974 return self
.SUBOPERATION_STATUS_NOT_FOUND
3976 # Update status for a sub-operation given its index
3977 def _update_suboperation_status(
3978 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3980 # Update DB for HA tasks
3981 q_filter
= {"_id": db_nslcmop
["_id"]}
3983 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3984 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3987 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3990 # Add sub-operation, return the index of the added sub-operation
3991 # Optionally, set operationState, detailed-status, and operationType
3992 # Status and type are currently set for 'scale' sub-operations:
3993 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3994 # 'detailed-status' : status message
3995 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3996 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3997 def _add_suboperation(
4005 mapped_primitive_params
,
4006 operationState
=None,
4007 detailed_status
=None,
4010 RO_scaling_info
=None,
4013 return self
.SUBOPERATION_STATUS_NOT_FOUND
4014 # Get the "_admin.operations" list, if it exists
4015 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4016 op_list
= db_nslcmop_admin
.get("operations")
4017 # Create or append to the "_admin.operations" list
4019 "member_vnf_index": vnf_index
,
4021 "vdu_count_index": vdu_count_index
,
4022 "primitive": primitive
,
4023 "primitive_params": mapped_primitive_params
,
4026 new_op
["operationState"] = operationState
4028 new_op
["detailed-status"] = detailed_status
4030 new_op
["lcmOperationType"] = operationType
4032 new_op
["RO_nsr_id"] = RO_nsr_id
4034 new_op
["RO_scaling_info"] = RO_scaling_info
4036 # No existing operations, create key 'operations' with current operation as first list element
4037 db_nslcmop_admin
.update({"operations": [new_op
]})
4038 op_list
= db_nslcmop_admin
.get("operations")
4040 # Existing operations, append operation to list
4041 op_list
.append(new_op
)
4043 db_nslcmop_update
= {"_admin.operations": op_list
}
4044 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4045 op_index
= len(op_list
) - 1
4048 # Helper methods for scale() sub-operations
4050 # pre-scale/post-scale:
4051 # Check for 3 different cases:
4052 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4053 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4054 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4055 def _check_or_add_scale_suboperation(
4059 vnf_config_primitive
,
4063 RO_scaling_info
=None,
4065 # Find this sub-operation
4066 if RO_nsr_id
and RO_scaling_info
:
4067 operationType
= "SCALE-RO"
4069 "member_vnf_index": vnf_index
,
4070 "RO_nsr_id": RO_nsr_id
,
4071 "RO_scaling_info": RO_scaling_info
,
4075 "member_vnf_index": vnf_index
,
4076 "primitive": vnf_config_primitive
,
4077 "primitive_params": primitive_params
,
4078 "lcmOperationType": operationType
,
4080 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4081 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4082 # a. New sub-operation
4083 # The sub-operation does not exist, add it.
4084 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4085 # The following parameters are set to None for all kind of scaling:
4087 vdu_count_index
= None
4089 if RO_nsr_id
and RO_scaling_info
:
4090 vnf_config_primitive
= None
4091 primitive_params
= None
4094 RO_scaling_info
= None
4095 # Initial status for sub-operation
4096 operationState
= "PROCESSING"
4097 detailed_status
= "In progress"
4098 # Add sub-operation for pre/post-scaling (zero or more operations)
4099 self
._add
_suboperation
(
4105 vnf_config_primitive
,
4113 return self
.SUBOPERATION_STATUS_NEW
4115 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4116 # or op_index (operationState != 'COMPLETED')
4117 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4119 # Function to return execution_environment id
4121 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4122 # TODO vdu_index_count
4123 for vca
in vca_deployed_list
:
4124 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix for every log message
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is read
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu.
        None is accepted and handled as a descriptor without terminate primitives
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the EE deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # A missing descriptor (None) is treated like an empty one instead of
        # failing with AttributeError on ".get"
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            (config_descriptor or {}).get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
4233 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4234 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4235 namespace
= "." + db_nsr
["_id"]
4237 await self
.n2vc
.delete_namespace(
4238 namespace
=namespace
,
4239 total_timeout
=self
.timeout_charm_delete
,
4242 except N2VCNotFound
: # already deleted. Skip
4244 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text: prefix for every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record to update
    :param nslcmop_id: id of the operation whose status is updated
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the collected details if any deletion failed
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # An explicit raise (not "assert False") so the check is
                    # kept when Python runs with -O; it is reported below as
                    # a "delete error" like any other unexpected failure.
                    raise LcmException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # asyncio.sleep() no longer accepts "loop" (removed in
                # Python 3.10); the running loop is used.
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd (only attempted when the ns deletion did not fail)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds (only attempted when nothing failed so far)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS instance: run terminate primitives, delete execution
    environments and KDUs, and remove the deployment from the VIM.

    The final status is always written to the nsr and nslcmop records and a
    "terminated" message is sent, whatever the result.

    :param nsr_id: id of the NS record to terminate
    :param nslcmop_id: id of the terminate operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing deployed; the "finally" block still records the result
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs; each vnfd is read from database only once
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # Skip empty slots BEFORE reading anything from the entry;
            # otherwise an empty (None) slot raises AttributeError and aborts
            # the whole termination.
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                        namespace=kdu.get("namespace"),
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of stuff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                # best effort: the NS itself is already recorded as terminated
                self.logger.warning(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4775 async def _wait_for_tasks(
4776 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4779 error_detail_list
= []
4781 pending_tasks
= list(created_tasks_info
.keys())
4782 num_tasks
= len(pending_tasks
)
4784 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4785 self
._write
_op
_status
(nslcmop_id
, stage
)
4786 while pending_tasks
:
4788 _timeout
= timeout
+ time_start
- time()
4789 done
, pending_tasks
= await asyncio
.wait(
4790 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4792 num_done
+= len(done
)
4793 if not done
: # Timeout
4794 for task
in pending_tasks
:
4795 new_error
= created_tasks_info
[task
] + ": Timeout"
4796 error_detail_list
.append(new_error
)
4797 error_list
.append(new_error
)
4800 if task
.cancelled():
4803 exc
= task
.exception()
4805 if isinstance(exc
, asyncio
.TimeoutError
):
4807 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4808 error_list
.append(created_tasks_info
[task
])
4809 error_detail_list
.append(new_error
)
4816 ROclient
.ROClientException
,
4822 self
.logger
.error(logging_text
+ new_error
)
4824 exc_traceback
= "".join(
4825 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4829 + created_tasks_info
[task
]
4835 logging_text
+ created_tasks_info
[task
] + ": Done"
4837 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4839 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4840 if nsr_id
: # update also nsr
4845 "errorDescription": "Error at: " + ", ".join(error_list
),
4846 "errorDetail": ". ".join(error_detail_list
),
4849 self
._write
_op
_status
(nslcmop_id
, stage
)
4850 return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive.

    For every parameter declared in the descriptor the value is taken, in this order, from:
    the params provided by the user, the descriptor "value", the descriptor "default-value".
    A descriptor value written between < > is looked up (without the brackets) at
    instantiation_params. User provided values are never looked up this way.
    Afterwards dict/list/tuple values are dumped as one-line YAML text, a leading "!!yaml "
    marker is removed from strings, and the value is converted according to the descriptor
    "data-type" (INTEGER or BOOLEAN; any BOOLEAN text other than "false" means True).
    If the primitive is called "config", "ns_config_info" is copied from instantiation_params
    when present there.

    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is handled as "no params"
    :param instantiation_params: Instantiation params provided by user. None is handled as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has no value at all, if a <key> reference is not found at
        instantiation_params, or if an INTEGER parameter cannot be converted to int
    """
    # Callers may hand over None when nothing was supplied; the membership tests below need a mapping
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" wins over "default-value" when the descriptor carries both
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            # "<key>" is a reference that must be resolved against the instantiation params
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
                value = instantiation_params[reference]
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            # structured values are passed as a single line of YAML text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # drop the 7 characters of the "!!yaml " marker, keep the YAML text itself
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (TypeError, ValueError) as e:
                # ValueError: text that is not a number; TypeError: None or other non-numeric types
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from e
        elif data_type == "BOOLEAN":
            value = str(value).lower() != "false"

        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
4924 def _look_for_deployed_vca(
4931 ee_descriptor_id
=None,
4933 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4934 for vca
in deployed_vca
:
4937 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4940 vdu_count_index
is not None
4941 and vdu_count_index
!= vca
["vdu_count_index"]
4944 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4946 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4950 # vca_deployed not found
4952 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4953 " is not deployed".format(
4962 ee_id
= vca
.get("ee_id")
4964 "type", "lxc_proxy_charm"
4965 ) # default value for backward compatibility - proxy charm
4968 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4969 "execution environment".format(
4970 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4973 return ee_id
, vca_type
4975 async def _ns_execute_primitive(
4981 retries_interval
=30,
4988 if primitive
== "config":
4989 primitive_params
= {"params": primitive_params
}
4991 vca_type
= vca_type
or "lxc_proxy_charm"
4995 output
= await asyncio
.wait_for(
4996 self
.vca_map
[vca_type
].exec_primitive(
4998 primitive_name
=primitive
,
4999 params_dict
=primitive_params
,
5000 progress_timeout
=self
.timeout_progress_primitive
,
5001 total_timeout
=self
.timeout_primitive
,
5006 timeout
=timeout
or self
.timeout_primitive
,
5010 except asyncio
.CancelledError
:
5012 except Exception as e
:
5016 "Error executing action {} on {} -> {}".format(
5021 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5023 if isinstance(e
, asyncio
.TimeoutError
):
5025 message
="Timed out waiting for action to complete"
5027 return "FAILED", getattr(e
, "message", repr(e
))
5029 return "COMPLETED", output
5031 except (LcmException
, asyncio
.CancelledError
):
5033 except Exception as e
:
5034 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5036 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5038 Updating the vca_status with latest juju information in nsrs record
5039 :param: nsr_id: Id of the nsr
5040 :param: nslcmop_id: Id of the nslcmop
5044 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5045 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5046 vca_id
= self
.get_vca_id({}, db_nsr
)
5047 if db_nsr
["_admin"]["deployed"]["K8s"]:
5048 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5049 cluster_uuid
, kdu_instance
, cluster_type
= (
5050 k8s
["k8scluster-uuid"],
5051 k8s
["kdu-instance"],
5052 k8s
["k8scluster-type"],
5054 await self
._on
_update
_k
8s
_db
(
5055 cluster_uuid
=cluster_uuid
,
5056 kdu_instance
=kdu_instance
,
5057 filter={"_id": nsr_id
},
5059 cluster_type
=cluster_type
,
5062 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5063 table
, filter = "nsrs", {"_id": nsr_id
}
5064 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5065 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5067 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5068 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5070 async def action(self
, nsr_id
, nslcmop_id
):
5071 # Try to lock HA task here
5072 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5073 if not task_is_locked_by_me
:
5076 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5077 self
.logger
.debug(logging_text
+ "Enter")
5078 # get all needed from database
5082 db_nslcmop_update
= {}
5083 nslcmop_operation_state
= None
5084 error_description_nslcmop
= None
5087 # wait for any previous tasks in process
5088 step
= "Waiting for previous operations to terminate"
5089 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5091 self
._write
_ns
_status
(
5094 current_operation
="RUNNING ACTION",
5095 current_operation_id
=nslcmop_id
,
5098 step
= "Getting information from database"
5099 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5100 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5101 if db_nslcmop
["operationParams"].get("primitive_params"):
5102 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5103 db_nslcmop
["operationParams"]["primitive_params"]
5106 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5107 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5108 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5109 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5110 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5111 primitive
= db_nslcmop
["operationParams"]["primitive"]
5112 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5113 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5114 "timeout_ns_action", self
.timeout_primitive
5118 step
= "Getting vnfr from database"
5119 db_vnfr
= self
.db
.get_one(
5120 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5122 if db_vnfr
.get("kdur"):
5124 for kdur
in db_vnfr
["kdur"]:
5125 if kdur
.get("additionalParams"):
5126 kdur
["additionalParams"] = json
.loads(
5127 kdur
["additionalParams"]
5129 kdur_list
.append(kdur
)
5130 db_vnfr
["kdur"] = kdur_list
5131 step
= "Getting vnfd from database"
5132 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5134 # Sync filesystem before running a primitive
5135 self
.fs
.sync(db_vnfr
["vnfd-id"])
5137 step
= "Getting nsd from database"
5138 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5140 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5141 # for backward compatibility
5142 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5143 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5144 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5145 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5147 # look for primitive
5148 config_primitive_desc
= descriptor_configuration
= None
5150 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5152 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5154 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5156 descriptor_configuration
= db_nsd
.get("ns-configuration")
5158 if descriptor_configuration
and descriptor_configuration
.get(
5161 for config_primitive
in descriptor_configuration
["config-primitive"]:
5162 if config_primitive
["name"] == primitive
:
5163 config_primitive_desc
= config_primitive
5166 if not config_primitive_desc
:
5167 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5169 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5173 primitive_name
= primitive
5174 ee_descriptor_id
= None
5176 primitive_name
= config_primitive_desc
.get(
5177 "execution-environment-primitive", primitive
5179 ee_descriptor_id
= config_primitive_desc
.get(
5180 "execution-environment-ref"
5186 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5188 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5191 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5193 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5195 desc_params
= parse_yaml_strings(
5196 db_vnfr
.get("additionalParamsForVnf")
5199 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5200 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5201 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5203 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5204 actions
.add(primitive
["name"])
5205 for primitive
in kdu_configuration
.get("config-primitive", []):
5206 actions
.add(primitive
["name"])
5208 nsr_deployed
["K8s"],
5209 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5210 and kdu
["member-vnf-index"] == vnf_index
,
5214 if primitive_name
in actions
5215 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5219 # TODO check if ns is in a proper status
5221 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5223 # kdur and desc_params already set from before
5224 if primitive_params
:
5225 desc_params
.update(primitive_params
)
5226 # TODO Check if we will need something at vnf level
5227 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5229 kdu_name
== kdu
["kdu-name"]
5230 and kdu
["member-vnf-index"] == vnf_index
5235 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5238 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5239 msg
= "unknown k8scluster-type '{}'".format(
5240 kdu
.get("k8scluster-type")
5242 raise LcmException(msg
)
5245 "collection": "nsrs",
5246 "filter": {"_id": nsr_id
},
5247 "path": "_admin.deployed.K8s.{}".format(index
),
5251 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5253 step
= "Executing kdu {}".format(primitive_name
)
5254 if primitive_name
== "upgrade":
5255 if desc_params
.get("kdu_model"):
5256 kdu_model
= desc_params
.get("kdu_model")
5257 del desc_params
["kdu_model"]
5259 kdu_model
= kdu
.get("kdu-model")
5260 parts
= kdu_model
.split(sep
=":")
5262 kdu_model
= parts
[0]
5263 if desc_params
.get("kdu_atomic_upgrade"):
5264 atomic_upgrade
= desc_params
.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
5265 del desc_params
["kdu_atomic_upgrade"]
5267 atomic_upgrade
= True
5269 detailed_status
= await asyncio
.wait_for(
5270 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5271 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5272 kdu_instance
=kdu
.get("kdu-instance"),
5273 atomic
=atomic_upgrade
,
5274 kdu_model
=kdu_model
,
5277 timeout
=timeout_ns_action
,
5279 timeout
=timeout_ns_action
+ 10,
5282 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5284 elif primitive_name
== "rollback":
5285 detailed_status
= await asyncio
.wait_for(
5286 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5287 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5288 kdu_instance
=kdu
.get("kdu-instance"),
5291 timeout
=timeout_ns_action
,
5293 elif primitive_name
== "status":
5294 detailed_status
= await asyncio
.wait_for(
5295 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5296 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5297 kdu_instance
=kdu
.get("kdu-instance"),
5300 timeout
=timeout_ns_action
,
5303 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5304 kdu
["kdu-name"], nsr_id
5306 params
= self
._map
_primitive
_params
(
5307 config_primitive_desc
, primitive_params
, desc_params
5310 detailed_status
= await asyncio
.wait_for(
5311 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5312 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5313 kdu_instance
=kdu_instance
,
5314 primitive_name
=primitive_name
,
5317 timeout
=timeout_ns_action
,
5320 timeout
=timeout_ns_action
,
5324 nslcmop_operation_state
= "COMPLETED"
5326 detailed_status
= ""
5327 nslcmop_operation_state
= "FAILED"
5329 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5330 nsr_deployed
["VCA"],
5331 member_vnf_index
=vnf_index
,
5333 vdu_count_index
=vdu_count_index
,
5334 ee_descriptor_id
=ee_descriptor_id
,
5336 for vca_index
, vca_deployed
in enumerate(
5337 db_nsr
["_admin"]["deployed"]["VCA"]
5339 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5341 "collection": "nsrs",
5342 "filter": {"_id": nsr_id
},
5343 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5347 nslcmop_operation_state
,
5349 ) = await self
._ns
_execute
_primitive
(
5351 primitive
=primitive_name
,
5352 primitive_params
=self
._map
_primitive
_params
(
5353 config_primitive_desc
, primitive_params
, desc_params
5355 timeout
=timeout_ns_action
,
5361 db_nslcmop_update
["detailed-status"] = detailed_status
5362 error_description_nslcmop
= (
5363 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5367 + "Done with result {} {}".format(
5368 nslcmop_operation_state
, detailed_status
5371 return # database update is called inside finally
5373 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5374 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5376 except asyncio
.CancelledError
:
5378 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5380 exc
= "Operation was cancelled"
5381 except asyncio
.TimeoutError
:
5382 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5384 except Exception as e
:
5385 exc
= traceback
.format_exc()
5386 self
.logger
.critical(
5387 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5396 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5397 nslcmop_operation_state
= "FAILED"
5399 self
._write
_ns
_status
(
5403 ], # TODO check if degraded. For the moment use previous status
5404 current_operation
="IDLE",
5405 current_operation_id
=None,
5406 # error_description=error_description_nsr,
5407 # error_detail=error_detail,
5408 other_update
=db_nsr_update
,
5411 self
._write
_op
_status
(
5414 error_message
=error_description_nslcmop
,
5415 operation_state
=nslcmop_operation_state
,
5416 other_update
=db_nslcmop_update
,
5419 if nslcmop_operation_state
:
5421 await self
.msg
.aiowrite(
5426 "nslcmop_id": nslcmop_id
,
5427 "operationState": nslcmop_operation_state
,
5431 except Exception as e
:
5433 logging_text
+ "kafka_write notification Exception {}".format(e
)
5435 self
.logger
.debug(logging_text
+ "Exit")
5436 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5437 return nslcmop_operation_state
, detailed_status
5439 async def terminate_vdus(
5440 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5442 """This method terminates VDUs
5445 db_vnfr: VNF instance record
5446 member_vnf_index: VNF index to identify the VDUs to be removed
5447 db_nsr: NS instance record
5448 update_db_nslcmops: Nslcmop update record
5450 vca_scaling_info
= []
5451 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5452 scaling_info
["scaling_direction"] = "IN"
5453 scaling_info
["vdu-delete"] = {}
5454 scaling_info
["kdu-delete"] = {}
5455 db_vdur
= db_vnfr
.get("vdur")
5456 vdur_list
= copy(db_vdur
)
5458 for index
, vdu
in enumerate(vdur_list
):
5459 vca_scaling_info
.append(
5461 "osm_vdu_id": vdu
["vdu-id-ref"],
5462 "member-vnf-index": member_vnf_index
,
5464 "vdu_index": count_index
,
5467 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5468 scaling_info
["vdu"].append(
5470 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5471 "vdu_id": vdu
["vdu-id-ref"],
5475 for interface
in vdu
["interfaces"]:
5476 scaling_info
["vdu"][index
]["interface"].append(
5478 "name": interface
["name"],
5479 "ip_address": interface
["ip-address"],
5480 "mac_address": interface
.get("mac-address"),
5483 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5484 stage
[2] = "Terminating VDUs"
5485 if scaling_info
.get("vdu-delete"):
5486 # scale_process = "RO"
5487 if self
.ro_config
.get("ng"):
5488 await self
._scale
_ng
_ro
(
5497 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5498 """This method is to Remove VNF instances from NS.
5501 nsr_id: NS instance id
5502 nslcmop_id: nslcmop id of update
5503 vnf_instance_id: id of the VNF instance to be removed
5506 result: (str, str) COMPLETED/FAILED, details
5510 logging_text
= "Task ns={} update ".format(nsr_id
)
5511 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5512 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5513 if check_vnfr_count
> 1:
5514 stage
= ["", "", ""]
5515 step
= "Getting nslcmop from database"
5517 step
+ " after having waited for previous tasks to be completed"
5519 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5520 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5521 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5522 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5523 """ db_vnfr = self.db.get_one(
5524 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5526 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5527 await self
.terminate_vdus(
5536 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5537 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5538 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5539 "constituent-vnfr-ref"
5541 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5542 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5543 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5544 return "COMPLETED", "Done"
5546 step
= "Terminate VNF Failed with"
5548 "{} Cannot terminate the last VNF in this NS.".format(
5552 except (LcmException
, asyncio
.CancelledError
):
5554 except Exception as e
:
5555 self
.logger
.debug("Error removing VNF {}".format(e
))
5556 return "FAILED", "Error removing VNF {}".format(e
)
5558 async def _ns_redeploy_vnf(
5566 """This method updates and redeploys VNF instances
5569 nsr_id: NS instance id
5570 nslcmop_id: nslcmop id
5571 db_vnfd: VNF descriptor
5572 db_vnfr: VNF instance record
5573 db_nsr: NS instance record
5576 result: (str, str) COMPLETED/FAILED, details
5580 stage
= ["", "", ""]
5581 logging_text
= "Task ns={} update ".format(nsr_id
)
5582 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5583 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5585 # Terminate old VNF resources
5586 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5587 await self
.terminate_vdus(
5596 # old_vnfd_id = db_vnfr["vnfd-id"]
5597 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5598 new_db_vnfd
= db_vnfd
5599 # new_vnfd_ref = new_db_vnfd["id"]
5600 # new_vnfd_id = vnfd_id
5604 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5606 "name": cp
.get("id"),
5607 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5608 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5611 new_vnfr_cp
.append(vnf_cp
)
5612 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5613 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5614 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5616 "revision": latest_vnfd_revision
,
5617 "connection-point": new_vnfr_cp
,
5621 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5622 updated_db_vnfr
= self
.db
.get_one(
5624 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5627 # Instantiate new VNF resources
5628 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5629 vca_scaling_info
= []
5630 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5631 scaling_info
["scaling_direction"] = "OUT"
5632 scaling_info
["vdu-create"] = {}
5633 scaling_info
["kdu-create"] = {}
5634 vdud_instantiate_list
= db_vnfd
["vdu"]
5635 for index
, vdud
in enumerate(vdud_instantiate_list
):
5636 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5638 additional_params
= (
5639 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5642 cloud_init_list
= []
5644 # TODO Information of its own ip is not available because db_vnfr is not updated.
5645 additional_params
["OSM"] = get_osm_params(
5646 updated_db_vnfr
, vdud
["id"], 1
5648 cloud_init_list
.append(
5649 self
._parse
_cloud
_init
(
5656 vca_scaling_info
.append(
5658 "osm_vdu_id": vdud
["id"],
5659 "member-vnf-index": member_vnf_index
,
5661 "vdu_index": count_index
,
5664 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5665 if self
.ro_config
.get("ng"):
5667 "New Resources to be deployed: {}".format(scaling_info
)
5669 await self
._scale
_ng
_ro
(
5677 return "COMPLETED", "Done"
5678 except (LcmException
, asyncio
.CancelledError
):
5680 except Exception as e
:
5681 self
.logger
.debug("Error updating VNF {}".format(e
))
5682 return "FAILED", "Error updating VNF {}".format(e
)
5684 async def _ns_charm_upgrade(
5690 timeout
: float = None,
5692 """This method upgrade charms in VNF instances
5695 ee_id: Execution environment id
5696 path: Local path to the charm
5698 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5699 timeout: (Float) Timeout for the ns update operation
5702 result: (str, str) COMPLETED/FAILED, details
5705 charm_type
= charm_type
or "lxc_proxy_charm"
5706 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5710 charm_type
=charm_type
,
5711 timeout
=timeout
or self
.timeout_ns_update
,
5715 return "COMPLETED", output
5717 except (LcmException
, asyncio
.CancelledError
):
5720 except Exception as e
:
5722 self
.logger
.debug("Error upgrading charm {}".format(path
))
5724 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5726 async def update(self
, nsr_id
, nslcmop_id
):
5727 """Update NS according to different update types
5729 This method performs upgrade of VNF instances then updates the revision
5730 number in VNF record
5733 nsr_id: Network service will be updated
5734 nslcmop_id: ns lcm operation id
5737 It may raise DbException, LcmException, N2VCException, K8sException
5740 # Try to lock HA task here
5741 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5742 if not task_is_locked_by_me
:
5745 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5746 self
.logger
.debug(logging_text
+ "Enter")
5748 # Set the required variables to be filled up later
5750 db_nslcmop_update
= {}
5752 nslcmop_operation_state
= None
5754 error_description_nslcmop
= ""
5756 change_type
= "updated"
5757 detailed_status
= ""
5760 # wait for any previous tasks in process
5761 step
= "Waiting for previous operations to terminate"
5762 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5763 self
._write
_ns
_status
(
5766 current_operation
="UPDATING",
5767 current_operation_id
=nslcmop_id
,
5770 step
= "Getting nslcmop from database"
5771 db_nslcmop
= self
.db
.get_one(
5772 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5774 update_type
= db_nslcmop
["operationParams"]["updateType"]
5776 step
= "Getting nsr from database"
5777 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5778 old_operational_status
= db_nsr
["operational-status"]
5779 db_nsr_update
["operational-status"] = "updating"
5780 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5781 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5783 if update_type
== "CHANGE_VNFPKG":
5785 # Get the input parameters given through update request
5786 vnf_instance_id
= db_nslcmop
["operationParams"][
5787 "changeVnfPackageData"
5788 ].get("vnfInstanceId")
5790 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5793 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5795 step
= "Getting vnfr from database"
5796 db_vnfr
= self
.db
.get_one(
5797 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5800 step
= "Getting vnfds from database"
5802 latest_vnfd
= self
.db
.get_one(
5803 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5805 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5808 current_vnf_revision
= db_vnfr
.get("revision", 1)
5809 current_vnfd
= self
.db
.get_one(
5811 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5812 fail_on_empty
=False,
5814 # Charm artifact paths will be filled up later
5816 current_charm_artifact_path
,
5817 target_charm_artifact_path
,
5818 charm_artifact_paths
,
5821 step
= "Checking if revision has changed in VNFD"
5822 if current_vnf_revision
!= latest_vnfd_revision
:
5824 change_type
= "policy_updated"
5826 # There is new revision of VNFD, update operation is required
5827 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5828 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5830 step
= "Removing the VNFD packages if they exist in the local path"
5831 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5832 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5834 step
= "Get the VNFD packages from FSMongo"
5835 self
.fs
.sync(from_path
=latest_vnfd_path
)
5836 self
.fs
.sync(from_path
=current_vnfd_path
)
5839 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5841 base_folder
= latest_vnfd
["_admin"]["storage"]
5843 for charm_index
, charm_deployed
in enumerate(
5844 get_iterable(nsr_deployed
, "VCA")
5846 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5848 # Getting charm-id and charm-type
5849 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5850 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5851 charm_type
= charm_deployed
.get("type")
5854 ee_id
= charm_deployed
.get("ee_id")
5856 step
= "Getting descriptor config"
5857 descriptor_config
= get_configuration(
5858 current_vnfd
, current_vnfd
["id"]
5861 if "execution-environment-list" in descriptor_config
:
5862 ee_list
= descriptor_config
.get(
5863 "execution-environment-list", []
5868 # There could be several charm used in the same VNF
5869 for ee_item
in ee_list
:
5870 if ee_item
.get("juju"):
5872 step
= "Getting charm name"
5873 charm_name
= ee_item
["juju"].get("charm")
5875 step
= "Setting Charm artifact paths"
5876 current_charm_artifact_path
.append(
5877 get_charm_artifact_path(
5881 current_vnf_revision
,
5884 target_charm_artifact_path
.append(
5885 get_charm_artifact_path(
5889 latest_vnfd_revision
,
5893 charm_artifact_paths
= zip(
5894 current_charm_artifact_path
, target_charm_artifact_path
5897 step
= "Checking if software version has changed in VNFD"
5898 if find_software_version(current_vnfd
) != find_software_version(
5902 step
= "Checking if existing VNF has charm"
5903 for current_charm_path
, target_charm_path
in list(
5904 charm_artifact_paths
5906 if current_charm_path
:
5908 "Software version change is not supported as VNF instance {} has charm.".format(
5913 # There is no change in the charm package, then redeploy the VNF
5914 # based on new descriptor
5915 step
= "Redeploying VNF"
5916 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5917 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5918 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5920 if result
== "FAILED":
5921 nslcmop_operation_state
= result
5922 error_description_nslcmop
= detailed_status
5923 db_nslcmop_update
["detailed-status"] = detailed_status
5926 + " step {} Done with result {} {}".format(
5927 step
, nslcmop_operation_state
, detailed_status
5932 step
= "Checking if any charm package has changed or not"
5933 for current_charm_path
, target_charm_path
in list(
5934 charm_artifact_paths
5938 and target_charm_path
5939 and self
.check_charm_hash_changed(
5940 current_charm_path
, target_charm_path
5944 step
= "Checking whether VNF uses juju bundle"
5945 if check_juju_bundle_existence(current_vnfd
):
5948 "Charm upgrade is not supported for the instance which"
5949 " uses juju-bundle: {}".format(
5950 check_juju_bundle_existence(current_vnfd
)
5954 step
= "Upgrading Charm"
5958 ) = await self
._ns
_charm
_upgrade
(
5961 charm_type
=charm_type
,
5962 path
=self
.fs
.path
+ target_charm_path
,
5963 timeout
=timeout_seconds
,
5966 if result
== "FAILED":
5967 nslcmop_operation_state
= result
5968 error_description_nslcmop
= detailed_status
5970 db_nslcmop_update
["detailed-status"] = detailed_status
5973 + " step {} Done with result {} {}".format(
5974 step
, nslcmop_operation_state
, detailed_status
5978 step
= "Updating policies"
5979 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5980 result
= "COMPLETED"
5981 detailed_status
= "Done"
5982 db_nslcmop_update
["detailed-status"] = "Done"
5984 # If nslcmop_operation_state is None, so any operation is not failed.
5985 if not nslcmop_operation_state
:
5986 nslcmop_operation_state
= "COMPLETED"
5988 # If update CHANGE_VNFPKG nslcmop_operation is successful
5989 # vnf revision need to be updated
5990 vnfr_update
["revision"] = latest_vnfd_revision
5991 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5995 + " task Done with result {} {}".format(
5996 nslcmop_operation_state
, detailed_status
5999 elif update_type
== "REMOVE_VNF":
6000 # This part is included in https://osm.etsi.org/gerrit/11876
6001 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6002 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6003 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6004 step
= "Removing VNF"
6005 (result
, detailed_status
) = await self
.remove_vnf(
6006 nsr_id
, nslcmop_id
, vnf_instance_id
6008 if result
== "FAILED":
6009 nslcmop_operation_state
= result
6010 error_description_nslcmop
= detailed_status
6011 db_nslcmop_update
["detailed-status"] = detailed_status
6012 change_type
= "vnf_terminated"
6013 if not nslcmop_operation_state
:
6014 nslcmop_operation_state
= "COMPLETED"
6017 + " task Done with result {} {}".format(
6018 nslcmop_operation_state
, detailed_status
6022 elif update_type
== "OPERATE_VNF":
6023 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6026 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6029 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6032 (result
, detailed_status
) = await self
.rebuild_start_stop(
6033 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6035 if result
== "FAILED":
6036 nslcmop_operation_state
= result
6037 error_description_nslcmop
= detailed_status
6038 db_nslcmop_update
["detailed-status"] = detailed_status
6039 if not nslcmop_operation_state
:
6040 nslcmop_operation_state
= "COMPLETED"
6043 + " task Done with result {} {}".format(
6044 nslcmop_operation_state
, detailed_status
6048 # If nslcmop_operation_state is None, so any operation is not failed.
6049 # All operations are executed in overall.
6050 if not nslcmop_operation_state
:
6051 nslcmop_operation_state
= "COMPLETED"
6052 db_nsr_update
["operational-status"] = old_operational_status
6054 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6055 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6057 except asyncio
.CancelledError
:
6059 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6061 exc
= "Operation was cancelled"
6062 except asyncio
.TimeoutError
:
6063 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6065 except Exception as e
:
6066 exc
= traceback
.format_exc()
6067 self
.logger
.critical(
6068 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6077 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6078 nslcmop_operation_state
= "FAILED"
6079 db_nsr_update
["operational-status"] = old_operational_status
6081 self
._write
_ns
_status
(
6083 ns_state
=db_nsr
["nsState"],
6084 current_operation
="IDLE",
6085 current_operation_id
=None,
6086 other_update
=db_nsr_update
,
6089 self
._write
_op
_status
(
6092 error_message
=error_description_nslcmop
,
6093 operation_state
=nslcmop_operation_state
,
6094 other_update
=db_nslcmop_update
,
6097 if nslcmop_operation_state
:
6101 "nslcmop_id": nslcmop_id
,
6102 "operationState": nslcmop_operation_state
,
6104 if change_type
in ("vnf_terminated", "policy_updated"):
6105 msg
.update({"vnf_member_index": member_vnf_index
})
6106 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6107 except Exception as e
:
6109 logging_text
+ "kafka_write notification Exception {}".format(e
)
6111 self
.logger
.debug(logging_text
+ "Exit")
6112 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6113 return nslcmop_operation_state
, detailed_status
async def scale(self, nsr_id, nslcmop_id):
    """Execute the scale operation ``nslcmop_id`` on the NS ``nsr_id``.

    The method first tries to take the HA lock for the operation and
    returns immediately when another worker owns it. Otherwise it:

    1. reads the nslcmop, nsr, vnfr and vnfd records from the database;
    2. builds ``scaling_info`` / ``vca_scaling_info`` from the deltas of the
       requested scaling aspect, enforcing the profile limits
       (``max-number-of-instances`` defaults to 10,
       ``min-number-of-instances`` defaults to 0);
    3. runs the matching pre-scale ``scaling-config-action`` primitives;
    4. destroys the execution environments of the instances to remove;
    5. scales VDUs through ``_scale_ng_ro`` (only when ``ro_config["ng"]``
       is set) and KDUs through ``_scale_kdu``;
    6. deploys execution environments for the new instances;
    7. runs the matching post-scale ``scaling-config-action`` primitives.

    Whatever the outcome, the ``finally`` section waits for pending
    execution-environment tasks, writes the final operation / NS status,
    publishes a ``scaled`` message on the ``ns`` topic and removes the task
    from ``lcm_tasks``.

    :param nsr_id: ``_id`` of the NS record being scaled.
    :param nslcmop_id: ``_id`` of the NS LCM operation record.
    :return: None. Failures raised inside the main flow are caught, logged
        and stored in the operation record (operation state ``FAILED``).
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
            "scaleByStepData"
        ]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one(
            "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
        )

        vca_id = self.get_vca_id(db_vnfr, db_nsr)

        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        base_folder = db_vnfd["_admin"]["storage"]

        step = "Getting scaling-group-descriptor"
        scaling_descriptor = find_in_list(
            get_scaling_aspect(db_vnfd),
            lambda scale_desc: scale_desc["name"] == scaling_group,
        )
        if not scaling_descriptor:
            raise LcmException(
                "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                "at vnfd:scaling-group-descriptor".format(scaling_group)
            )

        step = "Sending scale order to VIM"
        # TODO check if ns is in a proper status
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2(
                "nsrs",
                nsr_id,
                {
                    "_admin.scaling-group": [
                        {"name": scaling_group, "nb-scale-op": 0}
                    ]
                },
            )
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(
                db_nsr["_admin"]["scaling-group"]
            ):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update[
                    "_admin.scaling-group.{}.name".format(admin_scale_index)
                ] = scaling_group

        vca_scaling_info = []
        scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
        if scaling_type == "SCALE_OUT":
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )
            # count if max-instance-count is reached
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdud = get_vdu(db_vnfd, vdu_delta["id"])
                    # vdu_index also provides the number of instance of the targeted vdu
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    cloud_init_text = self._get_vdu_cloud_init_content(
                        vdud, db_vnfd
                    )
                    if cloud_init_text:
                        additional_params = (
                            self._get_vdu_additional_params(db_vnfr, vdud["id"])
                            or {}
                        )
                    cloud_init_list = []

                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    max_instance_count = 10
                    if vdu_profile and "max-number-of-instances" in vdu_profile:
                        max_instance_count = vdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdud["id"]
                    )
                    instances_number = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op += instances_number

                    new_instance_count = nb_scale_op + default_instance_num
                    # Control if new count is over max and vdu count is less than max.
                    # Then assign new instance count
                    if new_instance_count > max_instance_count > vdu_count:
                        instances_number = new_instance_count - max_instance_count

                    if new_instance_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        if cloud_init_text:
                            # TODO Information of its own ip is not available because db_vnfr is not updated.
                            additional_params["OSM"] = get_osm_params(
                                db_vnfr, vdu_delta["id"], vdu_index + x
                            )
                            cloud_init_list.append(
                                self._parse_cloud_init(
                                    cloud_init_text,
                                    additional_params,
                                    db_vnfd["id"],
                                    vdud["id"],
                                )
                            )
                        vca_scaling_info.append(
                            {
                                "osm_vdu_id": vdu_delta["id"],
                                "member-vnf-index": vnf_index,
                                "type": "create",
                                "vdu_index": vdu_index + x,
                            }
                        )
                    scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    # Might have different kdus in the same delta
                    # Should have list for each kdu
                    if not scaling_info["kdu-create"].get(kdu_name, None):
                        scaling_info["kdu-create"][kdu_name] = []

                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdu_name
                            )
                        )

                    max_instance_count = 10
                    if kdu_profile and "max-number-of-instances" in kdu_profile:
                        max_instance_count = kdu_profile.get(
                            "max-number-of-instances", 10
                        )

                    nb_scale_op += kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num + kdu_delta.get(
                        "number-of-instances", 1
                    )

                    # Control if new count is over max and instance_num is less than max.
                    # Then assign max instance number to kdu replica count
                    if kdu_replica_count > max_instance_count > instance_num:
                        kdu_replica_count = max_instance_count
                    if kdu_replica_count > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "create",
                                "kdu_index": instance_num + x - 1,
                            }
                        )
                    scaling_info["kdu-create"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "create",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )
        elif scaling_type == "SCALE_IN":
            # Same guard as SCALE_OUT: report a descriptor problem as an
            # LcmException instead of failing on a None subscript.
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            scaling_info["scaling_direction"] = "IN"
            scaling_info["vdu-delete"] = {}
            scaling_info["kdu-delete"] = {}

            for delta in deltas:
                for vdu_delta in delta.get("vdu-delta", {}):
                    vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                    min_instance_count = 0
                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    if vdu_profile and "min-number-of-instances" in vdu_profile:
                        min_instance_count = vdu_profile["min-number-of-instances"]

                    default_instance_num = get_number_of_instances(
                        db_vnfd, vdu_delta["id"]
                    )
                    instance_num = vdu_delta.get("number-of-instances", 1)
                    nb_scale_op -= instance_num

                    new_instance_count = nb_scale_op + default_instance_num

                    if new_instance_count < min_instance_count < vdu_count:
                        instances_number = min_instance_count - new_instance_count
                    else:
                        instances_number = instance_num

                    if new_instance_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                nb_scale_op, scaling_group
                            )
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_vdu_id": vdu_delta["id"],
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "vdu_index": vdu_index - 1 - x,
                            }
                        )
                    scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
                for kdu_delta in delta.get("kdu-resource-delta", {}):
                    kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                    kdu_name = kdu_profile["kdu-name"]
                    resource_name = kdu_profile.get("resource-name", "")

                    if not scaling_info["kdu-delete"].get(kdu_name, None):
                        scaling_info["kdu-delete"][kdu_name] = []

                    kdur = get_kdur(db_vnfr, kdu_name)
                    if kdur.get("helm-chart"):
                        k8s_cluster_type = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8s_cluster_type = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        k8s_cluster_type = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )

                    min_instance_count = 0
                    if kdu_profile and "min-number-of-instances" in kdu_profile:
                        min_instance_count = kdu_profile["min-number-of-instances"]

                    nb_scale_op -= kdu_delta.get("number-of-instances", 1)
                    deployed_kdu, _ = get_deployed_kdu(
                        nsr_deployed, kdu_name, vnf_index
                    )
                    if deployed_kdu is None:
                        raise LcmException(
                            "KDU '{}' for vnf '{}' not deployed".format(
                                kdu_name, vnf_index
                            )
                        )
                    kdu_instance = deployed_kdu.get("kdu-instance")
                    instance_num = await self.k8scluster_map[
                        k8s_cluster_type
                    ].get_scale_count(
                        resource_name,
                        kdu_instance,
                        vca_id=vca_id,
                        cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
                        kdu_model=deployed_kdu.get("kdu-model"),
                    )
                    kdu_replica_count = instance_num - kdu_delta.get(
                        "number-of-instances", 1
                    )

                    if kdu_replica_count < min_instance_count < instance_num:
                        kdu_replica_count = min_instance_count
                    if kdu_replica_count < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(
                                instance_num, scaling_group
                            )
                        )

                    for x in range(kdu_delta.get("number-of-instances", 1)):
                        vca_scaling_info.append(
                            {
                                "osm_kdu_id": kdu_name,
                                "member-vnf-index": vnf_index,
                                "type": "delete",
                                "kdu_index": instance_num - x - 1,
                            }
                        )
                    scaling_info["kdu-delete"][kdu_name].append(
                        {
                            "member-vnf-index": vnf_index,
                            "type": "delete",
                            "k8s-cluster-type": k8s_cluster_type,
                            "resource-name": resource_name,
                            "scale": kdu_replica_count,
                        }
                    )

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_delete = copy(scaling_info.get("vdu-delete"))
        if scaling_info["scaling_direction"] == "IN":
            # Walk the vdur list backwards so the most recently added
            # instances of each VDU are the ones reported for deletion.
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    scaling_info["vdu"].append(
                        {
                            "name": vdur.get("name") or vdur.get("vdu-name"),
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": [],
                        }
                    )
                    for interface in vdur["interfaces"]:
                        scaling_info["vdu"][-1]["interface"].append(
                            {
                                "name": interface["name"],
                                "ip_address": interface["ip-address"],
                                "mac_address": interface.get("mac-address"),
                            }
                        )
            # vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "pre-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "pre-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing pre-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive)
                        )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "PRE-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        db_nsr_update[
            "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
        ] = nb_scale_op
        db_nsr_update[
            "_admin.scaling-group.{}.time".format(admin_scale_index)
        ] = time()

        # SCALE-IN VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Deleting the execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    if vca_info.get("osm_vdu_id"):
                        vdu_id = vca_info["osm_vdu_id"]
                        vdu_index = int(vca_info["vdu_index"])
                        stage[
                            1
                        ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                            member_vnf_index, vdu_id, vdu_index
                        )
                    stage[2] = step = "Scaling in VCA"
                    self._write_op_status(op_id=nslcmop_id, stage=stage)
                    vca_update = db_nsr["_admin"]["deployed"]["VCA"]
                    config_update = db_nsr["configurationStatus"]
                    # NOTE(review): entries are deleted from vca_update while it
                    # is being enumerated, so the element following a deleted
                    # one is not examined - confirm this is intended.
                    for vca_index, vca in enumerate(vca_update):
                        # An empty (None / {}) slot is skipped instead of being
                        # dereferenced.
                        if (
                            vca
                            and vca["member-vnf-index"] == member_vnf_index
                            and vca["vdu_count_index"] == vdu_index
                        ):
                            if vca.get("vdu_id"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("vdu_id")
                                )
                            elif vca.get("kdu_name"):
                                config_descriptor = get_configuration(
                                    db_vnfd, vca.get("kdu_name")
                                )
                            else:
                                config_descriptor = get_configuration(
                                    db_vnfd, db_vnfd["id"]
                                )
                            operation_params = (
                                db_nslcmop.get("operationParams") or {}
                            )
                            exec_terminate_primitives = not operation_params.get(
                                "skip_terminate_primitives"
                            ) and vca.get("needed_terminate")
                            task = asyncio.ensure_future(
                                asyncio.wait_for(
                                    self.destroy_N2VC(
                                        logging_text,
                                        db_nslcmop,
                                        vca,
                                        config_descriptor,
                                        vca_index,
                                        destroy_ee=True,
                                        exec_primitives=exec_terminate_primitives,
                                        scaling_in=True,
                                        vca_id=vca_id,
                                    ),
                                    timeout=self.timeout_charm_delete,
                                )
                            )
                            tasks_dict_info[task] = "Terminating VCA {}".format(
                                vca.get("ee_id")
                            )
                            del vca_update[vca_index]
                            del config_update[vca_index]
                    # wait for pending tasks of terminate primitives
                    if tasks_dict_info:
                        self.logger.debug(
                            logging_text
                            + "Waiting for tasks {}".format(
                                list(tasks_dict_info.keys())
                            )
                        )
                        error_list = await self._wait_for_tasks(
                            logging_text,
                            tasks_dict_info,
                            min(
                                self.timeout_charm_delete, self.timeout_ns_terminate
                            ),
                            stage,
                            nslcmop_id,
                        )
                        tasks_dict_info.clear()
                        if error_list:
                            raise LcmException("; ".join(error_list))

                    db_vca_and_config_update = {
                        "_admin.deployed.VCA": vca_update,
                        "configurationStatus": config_update,
                    }
                    self.update_db_2(
                        "nsrs", db_nsr["_id"], db_vca_and_config_update
                    )
            scale_process = None
        # SCALE-IN VCA - END

        # SCALE RO - BEGIN
        if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
            scale_process = "RO"
            if self.ro_config.get("ng"):
                await self._scale_ng_ro(
                    logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
                )
        scaling_info.pop("vdu-create", None)
        scaling_info.pop("vdu-delete", None)

        scale_process = None
        # SCALE RO - END

        # SCALE KDU - BEGIN
        if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
            scale_process = "KDU"
            await self._scale_kdu(
                logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
            )
        scaling_info.pop("kdu-create", None)
        scaling_info.pop("kdu-delete", None)

        scale_process = None
        # SCALE KDU - END

        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # SCALE-UP VCA - BEGIN
        if vca_scaling_info:
            step = db_nslcmop_update[
                "detailed-status"
            ] = "Creating new execution environments"
            scale_process = "VCA"
            for vca_info in vca_scaling_info:
                if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
                    member_vnf_index = str(vca_info["member-vnf-index"])
                    self.logger.debug(
                        logging_text + "vdu info: {}".format(vca_info)
                    )
                    vnfd_id = db_vnfr["vnfd-ref"]
                    if vca_info.get("osm_vdu_id"):
                        vdu_index = int(vca_info["vdu_index"])
                        deploy_params = {"OSM": get_osm_params(db_vnfr)}
                        if db_vnfr.get("additionalParamsForVnf"):
                            deploy_params.update(
                                parse_yaml_strings(
                                    db_vnfr["additionalParamsForVnf"].copy()
                                )
                            )
                        # VNF-level configuration first, then the VDU-level one
                        descriptor_config = get_configuration(
                            db_vnfd, db_vnfd["id"]
                        )
                        if descriptor_config:
                            vdu_id = None
                            vdu_name = None
                            kdu_name = None
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={} ".format(member_vnf_index),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                        vdu_id = vca_info["osm_vdu_id"]
                        vdur = find_in_list(
                            db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                        )
                        descriptor_config = get_configuration(db_vnfd, vdu_id)
                        if vdur.get("additionalParams"):
                            deploy_params_vdu = parse_yaml_strings(
                                vdur["additionalParams"]
                            )
                        else:
                            deploy_params_vdu = deploy_params
                        deploy_params_vdu["OSM"] = get_osm_params(
                            db_vnfr, vdu_id, vdu_count_index=vdu_index
                        )
                        if descriptor_config:
                            vdu_name = None
                            kdu_name = None
                            stage[
                                1
                            ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            )
                            stage[2] = step = "Scaling out VCA"
                            self._write_op_status(op_id=nslcmop_id, stage=stage)
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
        # SCALE-UP VCA - END
        scale_process = None

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor[
                "scaling-config-action"
            ]:
                if (
                    scaling_config_action.get("trigger") == "post-scale-in"
                    and scaling_type == "SCALE_IN"
                ) or (
                    scaling_config_action.get("trigger") == "post-scale-out"
                    and scaling_type == "SCALE_OUT"
                ):
                    vnf_config_primitive = scaling_config_action[
                        "vnf-config-primitive-name-ref"
                    ]
                    step = db_nslcmop_update[
                        "detailed-status"
                    ] = "executing post-scale scaling-config-action '{}'".format(
                        vnf_config_primitive
                    )

                    vnfr_params = {"VDU_SCALE_INFO": scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in (
                        get_configuration(db_vnfd, db_vnfd["id"]) or {}
                    ).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(
                                scaling_group, vnf_config_primitive
                            )
                        )
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(
                        config_primitive, {}, vnfr_params
                    )

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vnf_config_primitive,
                        primitive_params,
                        "POST-SCALE",
                    )
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = "COMPLETED"
                        result_detail = "Done"
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = (
                                len(db_nslcmop.get("_admin", {}).get("operations"))
                                - 1
                            )
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} New sub-operation".format(
                                    vnf_config_primitive
                                )
                            )
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get("_admin", {}).get("operations", [])[
                                op_index
                            ]
                            vnf_index = op.get("member_vnf_index")
                            vnf_config_primitive = op.get("primitive")
                            primitive_params = op.get("primitive_params")
                            self.logger.debug(
                                logging_text
                                + "vnf_config_primitive={} Sub-operation retry".format(
                                    vnf_config_primitive
                                )
                            )
                        # Execute the primitive, either with new (first-time) or registered (retry) args
                        ee_descriptor_id = config_primitive.get(
                            "execution-environment-ref"
                        )
                        primitive_name = config_primitive.get(
                            "execution-environment-primitive", vnf_config_primitive
                        )
                        ee_id, vca_type = self._look_for_deployed_vca(
                            nsr_deployed["VCA"],
                            member_vnf_index=vnf_index,
                            vdu_id=None,
                            vdu_count_index=None,
                            ee_descriptor_id=ee_descriptor_id,
                        )
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id,
                            primitive_name,
                            primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                        self.logger.debug(
                            logging_text
                            + "vnf_config_primitive={} Done with result {} {}".format(
                                vnf_config_primitive, result, result_detail
                            )
                        )
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail
                        )

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update[
            "detailed-status"
        ] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = (
            "running"
            if old_operational_status == "failed"
            else old_operational_status
        )
        db_nsr_update["config-status"] = old_config_status
        return
    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if tasks_dict_info:
            stage[1] = "Waiting for instantiate pending tasks."
            self.logger.debug(logging_text + stage[1])
            pending_tasks_error = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
            if pending_tasks_error:
                # Never let the result of the pending tasks erase an error
                # already caught above: a failed scaling flow whose queued
                # tasks succeed must still be reported as FAILED.
                exc = (
                    "{}; {}".format(exc, pending_tasks_error)
                    if exc
                    else pending_tasks_error
                )
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process names the phase that was running when the
                # failure happened; it selects which NS status is set to failed.
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update[
                        "detailed-status"
                    ] = "FAILED scaling nslcmop={} {}: {}".format(
                        nslcmop_id, step, exc
                    )
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a notification failure must not fail the operation
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
7143 async def _scale_kdu(
7144 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7146 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7147 for kdu_name
in _scaling_info
:
7148 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7149 deployed_kdu
, index
= get_deployed_kdu(
7150 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7152 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7153 kdu_instance
= deployed_kdu
["kdu-instance"]
7154 kdu_model
= deployed_kdu
.get("kdu-model")
7155 scale
= int(kdu_scaling_info
["scale"])
7156 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7159 "collection": "nsrs",
7160 "filter": {"_id": nsr_id
},
7161 "path": "_admin.deployed.K8s.{}".format(index
),
7164 step
= "scaling application {}".format(
7165 kdu_scaling_info
["resource-name"]
7167 self
.logger
.debug(logging_text
+ step
)
7169 if kdu_scaling_info
["type"] == "delete":
7170 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7173 and kdu_config
.get("terminate-config-primitive")
7174 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7176 terminate_config_primitive_list
= kdu_config
.get(
7177 "terminate-config-primitive"
7179 terminate_config_primitive_list
.sort(
7180 key
=lambda val
: int(val
["seq"])
7184 terminate_config_primitive
7185 ) in terminate_config_primitive_list
:
7186 primitive_params_
= self
._map
_primitive
_params
(
7187 terminate_config_primitive
, {}, {}
7189 step
= "execute terminate config primitive"
7190 self
.logger
.debug(logging_text
+ step
)
7191 await asyncio
.wait_for(
7192 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7193 cluster_uuid
=cluster_uuid
,
7194 kdu_instance
=kdu_instance
,
7195 primitive_name
=terminate_config_primitive
["name"],
7196 params
=primitive_params_
,
7198 total_timeout
=self
.timeout_primitive
,
7201 timeout
=self
.timeout_primitive
7202 * self
.timeout_primitive_outer_factor
,
7205 await asyncio
.wait_for(
7206 self
.k8scluster_map
[k8s_cluster_type
].scale(
7207 kdu_instance
=kdu_instance
,
7209 resource_name
=kdu_scaling_info
["resource-name"],
7210 total_timeout
=self
.timeout_scale_on_error
,
7212 cluster_uuid
=cluster_uuid
,
7213 kdu_model
=kdu_model
,
7217 timeout
=self
.timeout_scale_on_error
7218 * self
.timeout_scale_on_error_outer_factor
,
7221 if kdu_scaling_info
["type"] == "create":
7222 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7225 and kdu_config
.get("initial-config-primitive")
7226 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7228 initial_config_primitive_list
= kdu_config
.get(
7229 "initial-config-primitive"
7231 initial_config_primitive_list
.sort(
7232 key
=lambda val
: int(val
["seq"])
7235 for initial_config_primitive
in initial_config_primitive_list
:
7236 primitive_params_
= self
._map
_primitive
_params
(
7237 initial_config_primitive
, {}, {}
7239 step
= "execute initial config primitive"
7240 self
.logger
.debug(logging_text
+ step
)
7241 await asyncio
.wait_for(
7242 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7243 cluster_uuid
=cluster_uuid
,
7244 kdu_instance
=kdu_instance
,
7245 primitive_name
=initial_config_primitive
["name"],
7246 params
=primitive_params_
,
7253 async def _scale_ng_ro(
7254 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7256 nsr_id
= db_nslcmop
["nsInstanceId"]
7257 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7260 # read from db: vnfd's for every vnf
7263 # for each vnf in ns, read vnfd
7264 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7265 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7266 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7267 # if we haven't this vnfd, read it from db
7268 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7270 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7271 db_vnfds
.append(vnfd
)
7272 n2vc_key
= self
.n2vc
.get_public_key()
7273 n2vc_key_list
= [n2vc_key
]
7276 vdu_scaling_info
.get("vdu-create"),
7277 vdu_scaling_info
.get("vdu-delete"),
7280 # db_vnfr has been updated, update db_vnfrs to use it
7281 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7282 await self
._instantiate
_ng
_ro
(
7292 start_deploy
=time(),
7293 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7295 if vdu_scaling_info
.get("vdu-delete"):
7297 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7300 async def extract_prometheus_scrape_jobs(
7301 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7303 # look if exist a file called 'prometheus*.j2' and
7304 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7308 for f
in artifact_content
7309 if f
.startswith("prometheus") and f
.endswith(".j2")
7315 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7319 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7320 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7322 vnfr_id
= vnfr_id
.replace("-", "")
7324 "JOB_NAME": vnfr_id
,
7325 "TARGET_IP": target_ip
,
7326 "EXPORTER_POD_IP": host_name
,
7327 "EXPORTER_POD_PORT": host_port
,
7329 job_list
= parse_job(job_data
, variables
)
7330 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7331 for job
in job_list
:
7333 not isinstance(job
.get("job_name"), str)
7334 or vnfr_id
not in job
["job_name"]
7336 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7337 job
["nsr_id"] = nsr_id
7338 job
["vnfr_id"] = vnfr_id
7341 async def rebuild_start_stop(
7342 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7344 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7345 self
.logger
.info(logging_text
+ "Enter")
7346 stage
= ["Preparing the environment", ""]
7347 # database nsrs record
7351 # in case of error, indicates what part of scale was failed to put nsr at error status
7352 start_deploy
= time()
7354 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7355 vim_account_id
= db_vnfr
.get("vim-account-id")
7356 vim_info_key
= "vim:" + vim_account_id
7357 vdu_id
= additional_param
["vdu_id"]
7358 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7359 vdur
= find_in_list(
7360 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7363 vdu_vim_name
= vdur
["name"]
7364 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7365 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7367 raise LcmException("Target vdu is not found")
7368 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7369 # wait for any previous tasks in process
7370 stage
[1] = "Waiting for previous operations to terminate"
7371 self
.logger
.info(stage
[1])
7372 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7374 stage
[1] = "Reading from database."
7375 self
.logger
.info(stage
[1])
7376 self
._write
_ns
_status
(
7379 current_operation
=operation_type
.upper(),
7380 current_operation_id
=nslcmop_id
,
7382 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7385 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7386 db_nsr_update
["operational-status"] = operation_type
7387 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7391 "vim_vm_id": vim_vm_id
,
7393 "vdu_index": additional_param
["count-index"],
7394 "vdu_id": vdur
["id"],
7395 "target_vim": target_vim
,
7396 "vim_account_id": vim_account_id
,
7399 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7400 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7401 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7402 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7403 self
.logger
.info("response from RO: {}".format(result_dict
))
7404 action_id
= result_dict
["action_id"]
7405 await self
._wait
_ng
_ro
(
7410 self
.timeout_operate
,
7412 "start_stop_rebuild",
7414 return "COMPLETED", "Done"
7415 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7416 self
.logger
.error("Exit Exception {}".format(e
))
7418 except asyncio
.CancelledError
:
7419 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7420 exc
= "Operation was cancelled"
7421 except Exception as e
:
7422 exc
= traceback
.format_exc()
7423 self
.logger
.critical(
7424 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7426 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud settings kept in the configuration of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_cloud" and
        "vca_cloud_credential" entries of the account "config" section; an
        entry that is not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section yields (None, None)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud settings kept in the configuration of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), taken from the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" entries of the account "config" section; an
        entry that is not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # An account without a "config" section yields (None, None)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
7450 async def migrate(self
, nsr_id
, nslcmop_id
):
7452 Migrate VNFs and VDUs instances in a NS
7454 :param: nsr_id: NS Instance ID
7455 :param: nslcmop_id: nslcmop ID of migrate
7458 # Try to lock HA task here
7459 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7460 if not task_is_locked_by_me
:
7462 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7463 self
.logger
.debug(logging_text
+ "Enter")
7464 # get all needed from database
7466 db_nslcmop_update
= {}
7467 nslcmop_operation_state
= None
7471 # in case of error, indicates what part of scale was failed to put nsr at error status
7472 start_deploy
= time()
7475 # wait for any previous tasks in process
7476 step
= "Waiting for previous operations to terminate"
7477 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7479 self
._write
_ns
_status
(
7482 current_operation
="MIGRATING",
7483 current_operation_id
=nslcmop_id
,
7485 step
= "Getting nslcmop from database"
7487 step
+ " after having waited for previous tasks to be completed"
7489 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7490 migrate_params
= db_nslcmop
.get("operationParams")
7493 target
.update(migrate_params
)
7494 desc
= await self
.RO
.migrate(nsr_id
, target
)
7495 self
.logger
.debug("RO return > {}".format(desc
))
7496 action_id
= desc
["action_id"]
7497 await self
._wait
_ng
_ro
(
7502 self
.timeout_migrate
,
7503 operation
="migrate",
7505 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7506 self
.logger
.error("Exit Exception {}".format(e
))
7508 except asyncio
.CancelledError
:
7509 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7510 exc
= "Operation was cancelled"
7511 except Exception as e
:
7512 exc
= traceback
.format_exc()
7513 self
.logger
.critical(
7514 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7517 self
._write
_ns
_status
(
7520 current_operation
="IDLE",
7521 current_operation_id
=None,
7524 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7525 nslcmop_operation_state
= "FAILED"
7527 nslcmop_operation_state
= "COMPLETED"
7528 db_nslcmop_update
["detailed-status"] = "Done"
7529 db_nsr_update
["detailed-status"] = "Done"
7531 self
._write
_op
_status
(
7535 operation_state
=nslcmop_operation_state
,
7536 other_update
=db_nslcmop_update
,
7538 if nslcmop_operation_state
:
7542 "nslcmop_id": nslcmop_id
,
7543 "operationState": nslcmop_operation_state
,
7545 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7546 except Exception as e
:
7548 logging_text
+ "kafka_write notification Exception {}".format(e
)
7550 self
.logger
.debug(logging_text
+ "Exit")
7551 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7553 async def heal(self
, nsr_id
, nslcmop_id
):
7557 :param nsr_id: ns instance to heal
7558 :param nslcmop_id: operation to run
7562 # Try to lock HA task here
7563 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7564 if not task_is_locked_by_me
:
7567 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7568 stage
= ["", "", ""]
7569 tasks_dict_info
= {}
7570 # ^ stage, step, VIM progress
7571 self
.logger
.debug(logging_text
+ "Enter")
7572 # get all needed from database
7574 db_nslcmop_update
= {}
7576 db_vnfrs
= {} # vnf's info indexed by _id
7578 old_operational_status
= ""
7579 old_config_status
= ""
7582 # wait for any previous tasks in process
7583 step
= "Waiting for previous operations to terminate"
7584 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7585 self
._write
_ns
_status
(
7588 current_operation
="HEALING",
7589 current_operation_id
=nslcmop_id
,
7592 step
= "Getting nslcmop from database"
7594 step
+ " after having waited for previous tasks to be completed"
7596 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7598 step
= "Getting nsr from database"
7599 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7600 old_operational_status
= db_nsr
["operational-status"]
7601 old_config_status
= db_nsr
["config-status"]
7604 "_admin.deployed.RO.operational-status": "healing",
7606 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7608 step
= "Sending heal order to VIM"
7609 task_ro
= asyncio
.ensure_future(
7611 logging_text
=logging_text
,
7613 db_nslcmop
=db_nslcmop
,
7617 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7618 tasks_dict_info
[task_ro
] = "Healing at VIM"
7622 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7623 self
.logger
.debug(logging_text
+ stage
[1])
7624 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7625 self
.fs
.sync(db_nsr
["nsd-id"])
7627 # read from db: vnfr's of this ns
7628 step
= "Getting vnfrs from db"
7629 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7630 for vnfr
in db_vnfrs_list
:
7631 db_vnfrs
[vnfr
["_id"]] = vnfr
7632 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7634 # Check for each target VNF
7635 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7636 for target_vnf
in target_list
:
7637 # Find this VNF in the list from DB
7638 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7640 db_vnfr
= db_vnfrs
[vnfr_id
]
7641 vnfd_id
= db_vnfr
.get("vnfd-id")
7642 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7643 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7644 base_folder
= vnfd
["_admin"]["storage"]
7649 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7650 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7652 # Check each target VDU and deploy N2VC
7653 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7656 if not target_vdu_list
:
7657 # New code: build the dictionary of each VDU to be healed
7658 target_vdu_list
= []
7659 for existing_vdu
in db_vnfr
.get("vdur"):
7660 vdu_name
= existing_vdu
.get("vdu-name", None)
7661 vdu_index
= existing_vdu
.get("count-index", 0)
7662 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7665 vdu_to_be_healed
= {
7667 "count-index": vdu_index
,
7668 "run-day1": vdu_run_day1
,
7670 target_vdu_list
.append(vdu_to_be_healed
)
7671 for target_vdu
in target_vdu_list
:
7672 deploy_params_vdu
= target_vdu
7673 # Set run-day1 vnf level value if not vdu level value exists
7674 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7677 deploy_params_vdu
["run-day1"] = target_vnf
[
7680 vdu_name
= target_vdu
.get("vdu-id", None)
7681 # TODO: Get vdu_id from vdud.
7683 # For multi instance VDU count-index is mandatory
7684 # For single session VDU count-index is 0
7685 vdu_index
= target_vdu
.get("count-index", 0)
7687 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7688 stage
[1] = "Deploying Execution Environments."
7689 self
.logger
.debug(logging_text
+ stage
[1])
7691 # VNF Level charm. Normal case when proxy charms.
7692 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7693 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7694 if descriptor_config
:
7695 # Continue if healed machine is management machine
7696 vnf_ip_address
= db_vnfr
.get("ip-address")
7697 target_instance
= None
7698 for instance
in db_vnfr
.get("vdur", None):
7700 instance
["vdu-name"] == vdu_name
7701 and instance
["count-index"] == vdu_index
7703 target_instance
= instance
7705 if vnf_ip_address
== target_instance
.get("ip-address"):
7707 logging_text
=logging_text
7708 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7709 member_vnf_index
, vdu_name
, vdu_index
7713 nslcmop_id
=nslcmop_id
,
7719 member_vnf_index
=member_vnf_index
,
7722 deploy_params
=deploy_params_vdu
,
7723 descriptor_config
=descriptor_config
,
7724 base_folder
=base_folder
,
7725 task_instantiation_info
=tasks_dict_info
,
7729 # VDU Level charm. Normal case with native charms.
7730 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7731 if descriptor_config
:
7733 logging_text
=logging_text
7734 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7735 member_vnf_index
, vdu_name
, vdu_index
7739 nslcmop_id
=nslcmop_id
,
7745 member_vnf_index
=member_vnf_index
,
7746 vdu_index
=vdu_index
,
7748 deploy_params
=deploy_params_vdu
,
7749 descriptor_config
=descriptor_config
,
7750 base_folder
=base_folder
,
7751 task_instantiation_info
=tasks_dict_info
,
7756 ROclient
.ROClientException
,
7761 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7763 except asyncio
.CancelledError
:
7765 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7767 exc
= "Operation was cancelled"
7768 except Exception as e
:
7769 exc
= traceback
.format_exc()
7770 self
.logger
.critical(
7771 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7776 stage
[1] = "Waiting for healing pending tasks."
7777 self
.logger
.debug(logging_text
+ stage
[1])
7778 exc
= await self
._wait
_for
_tasks
(
7781 self
.timeout_ns_deploy
,
7789 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7790 nslcmop_operation_state
= "FAILED"
7792 db_nsr_update
["operational-status"] = old_operational_status
7793 db_nsr_update
["config-status"] = old_config_status
7796 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7797 for task
, task_name
in tasks_dict_info
.items():
7798 if not task
.done() or task
.cancelled() or task
.exception():
7799 if task_name
.startswith(self
.task_name_deploy_vca
):
7800 # A N2VC task is pending
7801 db_nsr_update
["config-status"] = "failed"
7803 # RO task is pending
7804 db_nsr_update
["operational-status"] = "failed"
7806 error_description_nslcmop
= None
7807 nslcmop_operation_state
= "COMPLETED"
7808 db_nslcmop_update
["detailed-status"] = "Done"
7809 db_nsr_update
["detailed-status"] = "Done"
7810 db_nsr_update
["operational-status"] = "running"
7811 db_nsr_update
["config-status"] = "configured"
7813 self
._write
_op
_status
(
7816 error_message
=error_description_nslcmop
,
7817 operation_state
=nslcmop_operation_state
,
7818 other_update
=db_nslcmop_update
,
7821 self
._write
_ns
_status
(
7824 current_operation
="IDLE",
7825 current_operation_id
=None,
7826 other_update
=db_nsr_update
,
7829 if nslcmop_operation_state
:
7833 "nslcmop_id": nslcmop_id
,
7834 "operationState": nslcmop_operation_state
,
7836 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7837 except Exception as e
:
7839 logging_text
+ "kafka_write notification Exception {}".format(e
)
7841 self
.logger
.debug(logging_text
+ "Exit")
7842 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7853 :param logging_text: preffix text to use at logging
7854 :param nsr_id: nsr identity
7855 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7856 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7857 :return: None or exception
7860 def get_vim_account(vim_account_id
):
7862 if vim_account_id
in db_vims
:
7863 return db_vims
[vim_account_id
]
7864 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7865 db_vims
[vim_account_id
] = db_vim
7870 ns_params
= db_nslcmop
.get("operationParams")
7871 if ns_params
and ns_params
.get("timeout_ns_heal"):
7872 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7874 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7878 nslcmop_id
= db_nslcmop
["_id"]
7880 "action_id": nslcmop_id
,
7882 self
.logger
.warning(
7883 "db_nslcmop={} and timeout_ns_heal={}".format(
7884 db_nslcmop
, timeout_ns_heal
7887 target
.update(db_nslcmop
.get("operationParams", {}))
7889 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7890 desc
= await self
.RO
.recreate(nsr_id
, target
)
7891 self
.logger
.debug("RO return > {}".format(desc
))
7892 action_id
= desc
["action_id"]
7893 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7894 await self
._wait
_ng
_ro
(
7901 operation
="healing",
7906 "_admin.deployed.RO.operational-status": "running",
7907 "detailed-status": " ".join(stage
),
7909 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7910 self
._write
_op
_status
(nslcmop_id
, stage
)
7912 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7915 except Exception as e
:
7916 stage
[2] = "ERROR healing at VIM"
7917 # self.set_vnfr_at_error(db_vnfrs, str(e))
7919 "Error healing at VIM {}".format(e
),
7920 exc_info
=not isinstance(
7923 ROclient
.ROClientException
,
7949 task_instantiation_info
,
7952 # launch instantiate_N2VC in a asyncio task and register task object
7953 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7954 # if not found, create one entry and update database
7955 # fill db_nsr._admin.deployed.VCA.<index>
7958 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7962 get_charm_name
= False
7963 if "execution-environment-list" in descriptor_config
:
7964 ee_list
= descriptor_config
.get("execution-environment-list", [])
7965 elif "juju" in descriptor_config
:
7966 ee_list
= [descriptor_config
] # ns charms
7967 if "execution-environment-list" not in descriptor_config
:
7968 # charm name is only required for ns charms
7969 get_charm_name
= True
7970 else: # other types as script are not supported
7973 for ee_item
in ee_list
:
7976 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7977 ee_item
.get("juju"), ee_item
.get("helm-chart")
7980 ee_descriptor_id
= ee_item
.get("id")
7981 if ee_item
.get("juju"):
7982 vca_name
= ee_item
["juju"].get("charm")
7984 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7987 if ee_item
["juju"].get("charm") is not None
7990 if ee_item
["juju"].get("cloud") == "k8s":
7991 vca_type
= "k8s_proxy_charm"
7992 elif ee_item
["juju"].get("proxy") is False:
7993 vca_type
= "native_charm"
7994 elif ee_item
.get("helm-chart"):
7995 vca_name
= ee_item
["helm-chart"]
7996 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7999 vca_type
= "helm-v3"
8002 logging_text
+ "skipping non juju neither charm configuration"
8007 for vca_index
, vca_deployed
in enumerate(
8008 db_nsr
["_admin"]["deployed"]["VCA"]
8010 if not vca_deployed
:
8013 vca_deployed
.get("member-vnf-index") == member_vnf_index
8014 and vca_deployed
.get("vdu_id") == vdu_id
8015 and vca_deployed
.get("kdu_name") == kdu_name
8016 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8017 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8021 # not found, create one.
8023 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8026 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8028 target
+= "/kdu/{}".format(kdu_name
)
8030 "target_element": target
,
8031 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8032 "member-vnf-index": member_vnf_index
,
8034 "kdu_name": kdu_name
,
8035 "vdu_count_index": vdu_index
,
8036 "operational-status": "init", # TODO revise
8037 "detailed-status": "", # TODO revise
8038 "step": "initial-deploy", # TODO revise
8040 "vdu_name": vdu_name
,
8042 "ee_descriptor_id": ee_descriptor_id
,
8043 "charm_name": charm_name
,
8047 # create VCA and configurationStatus in db
8049 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8050 "configurationStatus.{}".format(vca_index
): dict(),
8052 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8054 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8056 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8057 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8058 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8061 task_n2vc
= asyncio
.ensure_future(
8063 logging_text
=logging_text
,
8064 vca_index
=vca_index
,
8070 vdu_index
=vdu_index
,
8071 deploy_params
=deploy_params
,
8072 config_descriptor
=descriptor_config
,
8073 base_folder
=base_folder
,
8074 nslcmop_id
=nslcmop_id
,
8078 ee_config_descriptor
=ee_item
,
8081 self
.lcm_tasks
.register(
8085 "instantiate_N2VC-{}".format(vca_index
),
8088 task_instantiation_info
[
8090 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8091 member_vnf_index
or "", vdu_id
or ""
8094 async def heal_N2VC(
8111 ee_config_descriptor
,
8113 nsr_id
= db_nsr
["_id"]
8114 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8115 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8116 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8117 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8119 "collection": "nsrs",
8120 "filter": {"_id": nsr_id
},
8121 "path": db_update_entry
,
8127 element_under_configuration
= nsr_id
8131 vnfr_id
= db_vnfr
["_id"]
8132 osm_config
["osm"]["vnf_id"] = vnfr_id
8134 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8136 if vca_type
== "native_charm":
8139 index_number
= vdu_index
or 0
8142 element_type
= "VNF"
8143 element_under_configuration
= vnfr_id
8144 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8146 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8147 element_type
= "VDU"
8148 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8149 osm_config
["osm"]["vdu_id"] = vdu_id
8151 namespace
+= ".{}".format(kdu_name
)
8152 element_type
= "KDU"
8153 element_under_configuration
= kdu_name
8154 osm_config
["osm"]["kdu_name"] = kdu_name
8157 if base_folder
["pkg-dir"]:
8158 artifact_path
= "{}/{}/{}/{}".format(
8159 base_folder
["folder"],
8160 base_folder
["pkg-dir"],
8163 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8168 artifact_path
= "{}/Scripts/{}/{}/".format(
8169 base_folder
["folder"],
8172 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8177 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8179 # get initial_config_primitive_list that applies to this element
8180 initial_config_primitive_list
= config_descriptor
.get(
8181 "initial-config-primitive"
8185 "Initial config primitive list > {}".format(
8186 initial_config_primitive_list
8190 # add config if not present for NS charm
8191 ee_descriptor_id
= ee_config_descriptor
.get("id")
8192 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8193 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8194 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8198 "Initial config primitive list #2 > {}".format(
8199 initial_config_primitive_list
8202 # n2vc_redesign STEP 3.1
8203 # find old ee_id if exists
8204 ee_id
= vca_deployed
.get("ee_id")
8206 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8207 # create or register execution environment in VCA. Only for native charms when healing
8208 if vca_type
== "native_charm":
8209 step
= "Waiting to VM being up and getting IP address"
8210 self
.logger
.debug(logging_text
+ step
)
8211 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8220 credentials
= {"hostname": rw_mgmt_ip
}
8222 username
= deep_get(
8223 config_descriptor
, ("config-access", "ssh-access", "default-user")
8225 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8226 # merged. Meanwhile let's get username from initial-config-primitive
8227 if not username
and initial_config_primitive_list
:
8228 for config_primitive
in initial_config_primitive_list
:
8229 for param
in config_primitive
.get("parameter", ()):
8230 if param
["name"] == "ssh-username":
8231 username
= param
["value"]
8235 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8236 "'config-access.ssh-access.default-user'"
8238 credentials
["username"] = username
8240 # n2vc_redesign STEP 3.2
8241 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8242 self
._write
_configuration
_status
(
8244 vca_index
=vca_index
,
8245 status
="REGISTERING",
8246 element_under_configuration
=element_under_configuration
,
8247 element_type
=element_type
,
8250 step
= "register execution environment {}".format(credentials
)
8251 self
.logger
.debug(logging_text
+ step
)
8252 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8253 credentials
=credentials
,
8254 namespace
=namespace
,
8259 # update ee_id en db
8261 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8263 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8265 # for compatibility with MON/POL modules, the need model and application name at database
8266 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8267 # Not sure if this need to be done when healing
8269 ee_id_parts = ee_id.split(".")
8270 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8271 if len(ee_id_parts) >= 2:
8272 model_name = ee_id_parts[0]
8273 application_name = ee_id_parts[1]
8274 db_nsr_update[db_update_entry + "model"] = model_name
8275 db_nsr_update[db_update_entry + "application"] = application_name
8278 # n2vc_redesign STEP 3.3
8279 # Install configuration software. Only for native charms.
8280 step
= "Install configuration Software"
8282 self
._write
_configuration
_status
(
8284 vca_index
=vca_index
,
8285 status
="INSTALLING SW",
8286 element_under_configuration
=element_under_configuration
,
8287 element_type
=element_type
,
8288 # other_update=db_nsr_update,
8292 # TODO check if already done
8293 self
.logger
.debug(logging_text
+ step
)
8295 if vca_type
== "native_charm":
8296 config_primitive
= next(
8297 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8300 if config_primitive
:
8301 config
= self
._map
_primitive
_params
(
8302 config_primitive
, {}, deploy_params
8304 await self
.vca_map
[vca_type
].install_configuration_sw(
8306 artifact_path
=artifact_path
,
8314 # write in db flag of configuration_sw already installed
8316 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8319 # Not sure if this need to be done when healing
8321 # add relations for this VCA (wait for other peers related with this VCA)
8322 await self._add_vca_relations(
8323 logging_text=logging_text,
8326 vca_index=vca_index,
8330 # if SSH access is required, then get execution environment SSH public
8331 # if native charm we have waited already to VM be UP
8332 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8335 # self.logger.debug("get ssh key block")
8337 config_descriptor
, ("config-access", "ssh-access", "required")
8339 # self.logger.debug("ssh key needed")
8340 # Needed to inject a ssh key
8343 ("config-access", "ssh-access", "default-user"),
8345 step
= "Install configuration Software, getting public ssh key"
8346 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8347 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8350 step
= "Insert public key into VM user={} ssh_key={}".format(
8354 # self.logger.debug("no need to get ssh key")
8355 step
= "Waiting to VM being up and getting IP address"
8356 self
.logger
.debug(logging_text
+ step
)
8358 # n2vc_redesign STEP 5.1
8359 # wait for RO (ip-address) Insert pub_key into VM
8360 # IMPORTANT: We need do wait for RO to complete healing operation.
8361 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8364 rw_mgmt_ip
= await self
.wait_kdu_up(
8365 logging_text
, nsr_id
, vnfr_id
, kdu_name
8368 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8378 rw_mgmt_ip
= None # This is for a NS configuration
8380 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8382 # store rw_mgmt_ip in deploy params for later replacement
8383 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8386 # get run-day1 operation parameter
8387 runDay1
= deploy_params
.get("run-day1", False)
8389 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8392 # n2vc_redesign STEP 6 Execute initial config primitive
8393 step
= "execute initial config primitive"
8395 # wait for dependent primitives execution (NS -> VNF -> VDU)
8396 if initial_config_primitive_list
:
8397 await self
._wait
_dependent
_n
2vc
(
8398 nsr_id
, vca_deployed_list
, vca_index
8401 # stage, in function of element type: vdu, kdu, vnf or ns
8402 my_vca
= vca_deployed_list
[vca_index
]
8403 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8405 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8406 elif my_vca
.get("member-vnf-index"):
8408 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8411 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8413 self
._write
_configuration
_status
(
8414 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8417 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8419 check_if_terminated_needed
= True
8420 for initial_config_primitive
in initial_config_primitive_list
:
8421 # adding information on the vca_deployed if it is a NS execution environment
8422 if not vca_deployed
["member-vnf-index"]:
8423 deploy_params
["ns_config_info"] = json
.dumps(
8424 self
._get
_ns
_config
_info
(nsr_id
)
8426 # TODO check if already done
8427 primitive_params_
= self
._map
_primitive
_params
(
8428 initial_config_primitive
, {}, deploy_params
8431 step
= "execute primitive '{}' params '{}'".format(
8432 initial_config_primitive
["name"], primitive_params_
8434 self
.logger
.debug(logging_text
+ step
)
8435 await self
.vca_map
[vca_type
].exec_primitive(
8437 primitive_name
=initial_config_primitive
["name"],
8438 params_dict
=primitive_params_
,
8443 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8444 if check_if_terminated_needed
:
8445 if config_descriptor
.get("terminate-config-primitive"):
8449 {db_update_entry
+ "needed_terminate": True},
8451 check_if_terminated_needed
= False
8453 # TODO register in database that primitive is done
8455 # STEP 7 Configure metrics
8456 # Not sure if this need to be done when healing
8458 if vca_type == "helm" or vca_type == "helm-v3":
8459 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8461 artifact_path=artifact_path,
8462 ee_config_descriptor=ee_config_descriptor,
8465 target_ip=rw_mgmt_ip,
8471 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8474 for job in prometheus_jobs:
8477 {"job_name": job["job_name"]},
8480 fail_on_empty=False,
8484 step
= "instantiated at VCA"
8485 self
.logger
.debug(logging_text
+ step
)
8487 self
._write
_configuration
_status
(
8488 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8491 except Exception as e
: # TODO not use Exception but N2VC exception
8492 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8494 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8497 "Exception while {} : {}".format(step
, e
), exc_info
=True
8499 self
._write
_configuration
_status
(
8500 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8502 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
    poll_interval=15,
):
    """
    Wait until RO stops reporting a heal in progress for a NS.

    Repeatedly reads the "nsrs" record and inspects
    ``_admin.deployed.RO.operational-status``; returns as soon as that
    value is anything other than "healing".

    NOTE(review): the signature lines were not visible in the reviewed
    extract; ``timeout=600`` is assumed - confirm against the file.

    :param nsr_id: NS Instance ID (``_id`` of the "nsrs" record)
    :param timeout: maximum number of seconds to keep polling
    :param poll_interval: seconds to sleep between two database reads
    :return: None
    :raises NgRoException: if the status is still "healing" once the
        timeout has expired
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # The explicit "loop" argument of asyncio.sleep was removed in
        # Python 3.10; the running loop is always used.
        await asyncio.sleep(poll_interval)
    else:
        # The while condition became false without hitting "break": timeout.
        # Message text kept as-is even though the operation is a heal.
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock for the operation, waits for related operations,
    sends the operation parameters to RO, waits for the RO action to
    finish, then records the outcome on the nslcmop and publishes a
    "verticalscaled" message on the "ns" topic.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None. Errors are not raised to the caller; they are
        reported through the operation state ("FAILED") instead.
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    exc = None
    # start time handed to _wait_ng_ro together with the timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # Work on a copy so the database record is not aliased.
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # Expected failure modes: keep the short message, no traceback.
        # NgRoException is included because the RO client / wait helper
        # presumably raise it (as _wait_heal_ro does) - confirm.
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # Best-effort notification: a failure here must not mask the
        # operation result, so it is only logged.
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")