1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
64 from osm_lcm
.data_utils
.nsd
import (
65 get_ns_configuration_relation_list
,
69 from osm_lcm
.data_utils
.vnfd
import (
75 get_ee_sorted_initial_config_primitive_list
,
76 get_ee_sorted_terminate_config_primitive_list
,
78 get_virtual_link_profiles
,
83 get_number_of_instances
,
85 get_kdu_resource_profile
,
86 find_software_version
,
89 from osm_lcm
.data_utils
.list_utils
import find_in_list
90 from osm_lcm
.data_utils
.vnfr
import (
94 get_volumes_from_instantiation_params
,
96 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
97 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
98 from n2vc
.definitions
import RelationEndpoint
99 from n2vc
.k8s_helm_conn
import K8sHelmConnector
100 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
101 from n2vc
.k8s_juju_conn
import K8sJujuConnector
103 from osm_common
.dbbase
import DbException
104 from osm_common
.fsbase
import FsException
106 from osm_lcm
.data_utils
.database
.database
import Database
107 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
108 from osm_lcm
.data_utils
.wim
import (
110 get_target_wim_attrs
,
111 select_feasible_wim_account
,
114 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
115 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
117 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
118 from osm_lcm
.osm_config
import OsmConfigBuilder
119 from osm_lcm
.prometheus
import parse_job
121 from copy
import copy
, deepcopy
122 from time
import time
123 from uuid
import uuid4
125 from random
import randint
127 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
130 class NsLcm(LcmBase
):
131 timeout_scale_on_error
= (
133 ) # Time a charm may stay at blocked/error status, counted from the first time it is seen there, before it is marked as failed
134 timeout_scale_on_error_outer_factor
= 1.05 # Factor applied to timeout_scale_on_error to obtain the timeout used by the enclosing asyncio.wait_for coroutine
135 timeout_ns_deploy
= 2 * 3600 # default global timeout for deploying a ns
136 timeout_ns_terminate
= 1800 # default global timeout for undeploying a ns
137 timeout_ns_heal
= 1800 # default global timeout for healing a ns (going by the attribute name) - TODO confirm
138 timeout_charm_delete
= 10 * 60
139 timeout_primitive
= 30 * 60 # Timeout for primitive execution
140 timeout_primitive_outer_factor
= 1.05 # Factor applied to timeout_primitive to obtain the timeout used by the enclosing asyncio.wait_for coroutine
141 timeout_ns_update
= 30 * 60 # timeout for ns update
142 timeout_progress_primitive
= (
144 ) # timeout for some progress in a primitive execution
145 timeout_migrate
= 1800 # default global timeout for migrating vnfs
146 timeout_operate
= 1800 # default global timeout for operate actions on vnfs (going by the attribute name) - TODO confirm
147 timeout_verticalscale
= 1800 # default global timeout for vertical scaling
148 SUBOPERATION_STATUS_NOT_FOUND
= -1
149 SUBOPERATION_STATUS_NEW
= -2
150 SUBOPERATION_STATUS_SKIP
= -3
151 task_name_deploy_vca
= "Deploying VCA"
153 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
155 Init, Connect to database, filesystem storage, and messaging
156 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
159 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
161 self
.db
= Database().instance
.db
162 self
.fs
= Filesystem().instance
.fs
164 self
.lcm_tasks
= lcm_tasks
165 self
.timeout
= config
["timeout"]
166 self
.ro_config
= config
["ro_config"]
167 self
.ng_ro
= config
["ro_config"].get("ng")
168 self
.vca_config
= config
["VCA"].copy()
170 # create N2VC connector
171 self
.n2vc
= N2VCJujuConnector(
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
179 self
.conn_helm_ee
= LCMHelmConn(
182 vca_config
=self
.vca_config
,
183 on_update_db
=self
._on
_update
_n
2vc
_db
,
186 self
.k8sclusterhelm2
= K8sHelmConnector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helmpath"),
195 self
.k8sclusterhelm3
= K8sHelm3Connector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 helm_command
=self
.vca_config
.get("helm3path"),
204 self
.k8sclusterjuju
= K8sJujuConnector(
205 kubectl_command
=self
.vca_config
.get("kubectlpath"),
206 juju_command
=self
.vca_config
.get("jujupath"),
209 on_update_db
=self
._on
_update
_k
8s
_db
,
214 self
.k8scluster_map
= {
215 "helm-chart": self
.k8sclusterhelm2
,
216 "helm-chart-v3": self
.k8sclusterhelm3
,
217 "chart": self
.k8sclusterhelm3
,
218 "juju-bundle": self
.k8sclusterjuju
,
219 "juju": self
.k8sclusterjuju
,
223 "lxc_proxy_charm": self
.n2vc
,
224 "native_charm": self
.n2vc
,
225 "k8s_proxy_charm": self
.n2vc
,
226 "helm": self
.conn_helm_ee
,
227 "helm-v3": self
.conn_helm_ee
,
231 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
233 self
.op_status_map
= {
234 "instantiation": self
.RO
.status
,
235 "termination": self
.RO
.status
,
236 "migrate": self
.RO
.status
,
237 "healing": self
.RO
.recreate_status
,
238 "verticalscale": self
.RO
.status
,
239 "start_stop_rebuild": self
.RO
.status
,
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Return ip_mac advanced by vm_index positions.

    The address is handled textually so that the caller's formatting is kept
    whenever the result still fits in the last field:
      - IPv4 (contains a dot): the last octet is incremented in decimal.
      - IPv6 / MAC (contains a colon): the last group is incremented in hex
        and zero-padded to its original width (2 for MAC, up to 4 for IPv6).
    When the last field would overflow (e.g. "10.0.0.255" + 1 or
    "...:ff" + 1) the carry is propagated to the preceding fields instead of
    producing an invalid field such as "256" or "100".

    Elsewhere in this file it is invoked as
    self.increment_ip_mac(value, offset), i.e. without an implicit instance
    argument.

    :param ip_mac: address as text; any non-string value is returned as is
    :param vm_index: offset to add, 1 by default
    :return: the new address as text, or None if ip_mac cannot be parsed or
        the result falls outside the address space
    """
    # Function-scope import: keeps this block self-contained.
    import ipaddress

    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: look for the last dot
        head, _, last = ip_mac.rpartition(".")
        if head:
            octet = int(last) + vm_index
            if 0 <= octet <= 255:
                return "{}.{}".format(head, octet)
            # last octet overflowed: let ipaddress carry into the other octets
            return str(ipaddress.IPv4Address(ip_mac) + vm_index)

        # try with ipv6 or mac: look for the last colon. Operate in hex
        head, _, last = ip_mac.rpartition(":")
        if head:
            width = len(last)  # 2 for mac, up to 4 for ipv6
            value = int(last, 16) + vm_index
            if 0 <= value < 16**width:
                return "{}:{:0{}x}".format(head, value, width)
            groups = ip_mac.split(":")
            if len(groups) == 6 and all(len(group) == 2 for group in groups):
                # MAC: do the arithmetic on the whole 48-bit value
                mac = int("".join(groups), 16) + vm_index
                if not 0 <= mac < 1 << 48:
                    return None
                digits = "{:012x}".format(mac)
                return ":".join(digits[k : k + 2] for k in range(0, 12, 2))
            # IPv6: ipaddress carries across groups (result is in compressed form)
            return str(ipaddress.IPv6Address(ip_mac) + vm_index)
    except (ValueError, TypeError):
        # malformed text, non-numeric field, non-integer offset or result out
        # of range: reported to the caller as None
        pass
    return None
264 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
266 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
269 # TODO filter RO descriptor fields...
273 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
274 db_dict
["deploymentStatus"] = ro_descriptor
275 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
277 except Exception as e
:
279 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
282 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
284 # remove last dot from path (if exists)
285 if path
.endswith("."):
288 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
289 # .format(table, filter, path, updated_data))
292 nsr_id
= filter.get("_id")
294 # read ns record from database
295 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
296 current_ns_status
= nsr
.get("nsState")
298 # get vca status for NS
299 status_dict
= await self
.n2vc
.get_status(
300 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
305 db_dict
["vcaStatus"] = status_dict
307 # update configurationStatus for this VCA
309 vca_index
= int(path
[path
.rfind(".") + 1 :])
312 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
314 vca_status
= vca_list
[vca_index
].get("status")
316 configuration_status_list
= nsr
.get("configurationStatus")
317 config_status
= configuration_status_list
[vca_index
].get("status")
319 if config_status
== "BROKEN" and vca_status
!= "failed":
320 db_dict
["configurationStatus"][vca_index
] = "READY"
321 elif config_status
!= "BROKEN" and vca_status
== "failed":
322 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
323 except Exception as e
:
324 # not update configurationStatus
325 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
327 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
328 # if nsState = 'DEGRADED' check if all is OK
330 if current_ns_status
in ("READY", "DEGRADED"):
331 error_description
= ""
333 if status_dict
.get("machines"):
334 for machine_id
in status_dict
.get("machines"):
335 machine
= status_dict
.get("machines").get(machine_id
)
336 # check machine agent-status
337 if machine
.get("agent-status"):
338 s
= machine
.get("agent-status").get("status")
341 error_description
+= (
342 "machine {} agent-status={} ; ".format(
346 # check machine instance status
347 if machine
.get("instance-status"):
348 s
= machine
.get("instance-status").get("status")
351 error_description
+= (
352 "machine {} instance-status={} ; ".format(
357 if status_dict
.get("applications"):
358 for app_id
in status_dict
.get("applications"):
359 app
= status_dict
.get("applications").get(app_id
)
360 # check application status
361 if app
.get("status"):
362 s
= app
.get("status").get("status")
365 error_description
+= (
366 "application {} status={} ; ".format(app_id
, s
)
369 if error_description
:
370 db_dict
["errorDescription"] = error_description
371 if current_ns_status
== "READY" and is_degraded
:
372 db_dict
["nsState"] = "DEGRADED"
373 if current_ns_status
== "DEGRADED" and not is_degraded
:
374 db_dict
["nsState"] = "READY"
377 self
.update_db_2("nsrs", nsr_id
, db_dict
)
379 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
381 except Exception as e
:
382 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
384 async def _on_update_k8s_db(
385 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
388 Updating vca status in NSR record
389 :param cluster_uuid: UUID of a k8s cluster
390 :param kdu_instance: The unique name of the KDU instance
391 :param filter: To get nsr_id
392 :cluster_type: The cluster type (juju, k8s)
396 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
397 # .format(cluster_uuid, kdu_instance, filter))
399 nsr_id
= filter.get("_id")
401 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
402 cluster_uuid
=cluster_uuid
,
403 kdu_instance
=kdu_instance
,
405 complete_status
=True,
411 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
414 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
418 self
.update_db_2("nsrs", nsr_id
, db_dict
)
419 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
421 except Exception as e
:
422 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
425 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
428 undefined
=StrictUndefined
,
429 autoescape
=select_autoescape(default_for_string
=True, default
=True),
431 template
= env
.from_string(cloud_init_text
)
432 return template
.render(additional_params
or {})
433 except UndefinedError
as e
:
435 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
436 "file, must be provided in the instantiation parameters inside the "
437 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
439 except (TemplateError
, TemplateNotFound
) as e
:
441 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
446 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
447 cloud_init_content
= cloud_init_file
= None
449 if vdu
.get("cloud-init-file"):
450 base_folder
= vnfd
["_admin"]["storage"]
451 if base_folder
["pkg-dir"]:
452 cloud_init_file
= "{}/{}/cloud_init/{}".format(
453 base_folder
["folder"],
454 base_folder
["pkg-dir"],
455 vdu
["cloud-init-file"],
458 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
459 base_folder
["folder"],
460 vdu
["cloud-init-file"],
462 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
463 cloud_init_content
= ci_file
.read()
464 elif vdu
.get("cloud-init"):
465 cloud_init_content
= vdu
["cloud-init"]
467 return cloud_init_content
468 except FsException
as e
:
470 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
471 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the "additionalParams" of the first vdur that refers to vdu_id.

    :param db_vnfr: vnfr content; its "vdur" list is searched
    :param vdu_id: value to match against the "vdu-id-ref" of each vdur
    :return: the result of parse_yaml_strings over the "additionalParams" of
        the matching vdur, or over None when no vdur matches
    """
    # A vnfr without "vdur" (missing key or None) is handled as "no match"
    # instead of failing when iterating over None.
    vdur = next(
        (
            item
            for item in (db_vnfr.get("vdur") or ())
            if vdu_id == item["vdu-id-ref"]
        ),
        {},
    )
    return parse_yaml_strings(vdur.get("additionalParams"))
482 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
484 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
485 :param vnfd: input vnfd
486 :param new_id: overrides vnf id if provided
487 :param additionalParams: Instantiation params for VNFs provided
488 :param nsrId: Id of the NSR
489 :return: copy of vnfd
491 vnfd_RO
= deepcopy(vnfd
)
492 # remove unused by RO configuration, monitoring, scaling and internal keys
493 vnfd_RO
.pop("_id", None)
494 vnfd_RO
.pop("_admin", None)
495 vnfd_RO
.pop("monitoring-param", None)
496 vnfd_RO
.pop("scaling-group-descriptor", None)
497 vnfd_RO
.pop("kdu", None)
498 vnfd_RO
.pop("k8s-cluster", None)
500 vnfd_RO
["id"] = new_id
502 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
503 for vdu
in get_iterable(vnfd_RO
, "vdu"):
504 vdu
.pop("cloud-init-file", None)
505 vdu
.pop("cloud-init", None)
def ip_profile_2_RO(ip_profile):
    """
    Build the RO flavour of an ip-profile. The input is not modified.

    Changes applied on a deep copy:
      - "dns-server" is renamed to "dns-address"; a list of entries is reduced
        to the list of their "address" values, any other value is moved as is
      - "ip-version" values "ipv4"/"ipv6" become "IPv4"/"IPv6"
      - "dhcp-params" is renamed to "dhcp"

    :param ip_profile: dictionary with the ip-profile
    :return: new dictionary in RO format
    """
    RO_ip_profile = deepcopy(ip_profile)

    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            dns_server = [entry["address"] for entry in dns_server]
        RO_ip_profile["dns-address"] = dns_server

    ip_version = RO_ip_profile.get("ip-version")
    if ip_version == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif ip_version == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"

    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
526 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
527 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
528 if db_vim
["_admin"]["operationalState"] != "ENABLED":
530 "VIM={} is not available. operationalState={}".format(
531 vim_account
, db_vim
["_admin"]["operationalState"]
534 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
537 def get_ro_wim_id_for_wim_account(self
, wim_account
):
538 if isinstance(wim_account
, str):
539 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
540 if db_wim
["_admin"]["operationalState"] != "ENABLED":
542 "WIM={} is not available. operationalState={}".format(
543 wim_account
, db_wim
["_admin"]["operationalState"]
546 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
551 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
553 db_vdu_push_list
= []
555 db_update
= {"_admin.modified": time()}
557 for vdu_id
, vdu_count
in vdu_create
.items():
561 for vdur
in reversed(db_vnfr
["vdur"])
562 if vdur
["vdu-id-ref"] == vdu_id
567 # Read the template saved in the db:
569 "No vdur in the database. Using the vdur-template to scale"
571 vdur_template
= db_vnfr
.get("vdur-template")
572 if not vdur_template
:
574 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
578 vdur
= vdur_template
[0]
579 # Delete a template from the database after using it
582 {"_id": db_vnfr
["_id"]},
584 pull
={"vdur-template": {"_id": vdur
["_id"]}},
586 for count
in range(vdu_count
):
587 vdur_copy
= deepcopy(vdur
)
588 vdur_copy
["status"] = "BUILD"
589 vdur_copy
["status-detailed"] = None
590 vdur_copy
["ip-address"] = None
591 vdur_copy
["_id"] = str(uuid4())
592 vdur_copy
["count-index"] += count
+ 1
593 vdur_copy
["id"] = "{}-{}".format(
594 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
596 vdur_copy
.pop("vim_info", None)
597 for iface
in vdur_copy
["interfaces"]:
598 if iface
.get("fixed-ip"):
599 iface
["ip-address"] = self
.increment_ip_mac(
600 iface
["ip-address"], count
+ 1
603 iface
.pop("ip-address", None)
604 if iface
.get("fixed-mac"):
605 iface
["mac-address"] = self
.increment_ip_mac(
606 iface
["mac-address"], count
+ 1
609 iface
.pop("mac-address", None)
613 ) # only first vdu can be managment of vnf
614 db_vdu_push_list
.append(vdur_copy
)
615 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
617 if len(db_vnfr
["vdur"]) == 1:
618 # The scale will move to 0 instances
620 "Scaling to 0 !, creating the template with the last vdur"
622 template_vdur
= [db_vnfr
["vdur"][0]]
623 for vdu_id
, vdu_count
in vdu_delete
.items():
625 indexes_to_delete
= [
627 for iv
in enumerate(db_vnfr
["vdur"])
628 if iv
[1]["vdu-id-ref"] == vdu_id
632 "vdur.{}.status".format(i
): "DELETING"
633 for i
in indexes_to_delete
[-vdu_count
:]
637 # it must be deleted one by one because common.db does not allow otherwise
640 for v
in reversed(db_vnfr
["vdur"])
641 if v
["vdu-id-ref"] == vdu_id
643 for vdu
in vdus_to_delete
[:vdu_count
]:
646 {"_id": db_vnfr
["_id"]},
648 pull
={"vdur": {"_id": vdu
["_id"]}},
652 db_push
["vdur"] = db_vdu_push_list
654 db_push
["vdur-template"] = template_vdur
657 db_vnfr
["vdur-template"] = template_vdur
658 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
659 # modify passed dictionary db_vnfr
660 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
661 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
663 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
665 Updates database nsr with the RO info for the created vld
666 :param ns_update_nsr: dictionary to be filled with the updated info
667 :param db_nsr: content of db_nsr. This is also modified
668 :param nsr_desc_RO: nsr descriptor from RO
669 :return: Nothing, LcmException is raised on errors
672 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
673 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
674 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
676 vld
["vim-id"] = net_RO
.get("vim_net_id")
677 vld
["name"] = net_RO
.get("vim_name")
678 vld
["status"] = net_RO
.get("status")
679 vld
["status-detailed"] = net_RO
.get("error_msg")
680 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
684 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
687 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
689 for db_vnfr
in db_vnfrs
.values():
690 vnfr_update
= {"status": "ERROR"}
691 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
692 if "status" not in vdur
:
693 vdur
["status"] = "ERROR"
694 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
696 vdur
["status-detailed"] = str(error_text
)
698 "vdur.{}.status-detailed".format(vdu_index
)
700 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
701 except DbException
as e
:
702 self
.logger
.error("Cannot update vnf. {}".format(e
))
704 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
706 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
707 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
708 :param nsr_desc_RO: nsr descriptor from RO
709 :return: Nothing, LcmException is raised on errors
711 for vnf_index
, db_vnfr
in db_vnfrs
.items():
712 for vnf_RO
in nsr_desc_RO
["vnfs"]:
713 if vnf_RO
["member_vnf_index"] != vnf_index
:
716 if vnf_RO
.get("ip_address"):
717 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
720 elif not db_vnfr
.get("ip-address"):
721 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
722 raise LcmExceptionNoMgmtIP(
723 "ns member_vnf_index '{}' has no IP address".format(
728 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
729 vdur_RO_count_index
= 0
730 if vdur
.get("pdu-type"):
732 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
733 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
735 if vdur
["count-index"] != vdur_RO_count_index
:
736 vdur_RO_count_index
+= 1
738 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
739 if vdur_RO
.get("ip_address"):
740 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
742 vdur
["ip-address"] = None
743 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
744 vdur
["name"] = vdur_RO
.get("vim_name")
745 vdur
["status"] = vdur_RO
.get("status")
746 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
747 for ifacer
in get_iterable(vdur
, "interfaces"):
748 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
749 if ifacer
["name"] == interface_RO
.get("internal_name"):
750 ifacer
["ip-address"] = interface_RO
.get(
753 ifacer
["mac-address"] = interface_RO
.get(
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
760 "from VIM info".format(
761 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
764 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
768 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
770 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
774 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
775 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
776 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
778 vld
["vim-id"] = net_RO
.get("vim_net_id")
779 vld
["name"] = net_RO
.get("vim_name")
780 vld
["status"] = net_RO
.get("status")
781 vld
["status-detailed"] = net_RO
.get("error_msg")
782 vnfr_update
["vld.{}".format(vld_index
)] = vld
786 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
791 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
796 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
801 def _get_ns_config_info(self
, nsr_id
):
803 Generates a mapping between vnf,vdu elements and the N2VC id
804 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
805 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
806 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
807 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
809 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
810 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
812 ns_config_info
= {"osm-config-mapping": mapping
}
813 for vca
in vca_deployed_list
:
814 if not vca
["member-vnf-index"]:
816 if not vca
["vdu_id"]:
817 mapping
[vca
["member-vnf-index"]] = vca
["application"]
821 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
823 ] = vca
["application"]
824 return ns_config_info
826 async def _instantiate_ng_ro(
843 def get_vim_account(vim_account_id
):
845 if vim_account_id
in db_vims
:
846 return db_vims
[vim_account_id
]
847 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
848 db_vims
[vim_account_id
] = db_vim
851 # modify target_vld info with instantiation parameters
852 def parse_vld_instantiation_params(
853 target_vim
, target_vld
, vld_params
, target_sdn
855 if vld_params
.get("ip-profile"):
856 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
859 if vld_params
.get("provider-network"):
860 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
863 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
864 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
868 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
869 # if wim_account_id is specified in vld_params, validate if it is feasible.
870 wim_account_id
, db_wim
= select_feasible_wim_account(
871 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
875 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
876 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
877 # update vld_params with correct WIM account Id
878 vld_params
["wimAccountId"] = wim_account_id
880 target_wim
= "wim:{}".format(wim_account_id
)
881 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
882 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
883 if len(sdn_ports
) > 0:
884 target_vld
["vim_info"][target_wim
] = target_wim_attrs
885 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
888 "Target VLD with WIM data: {:s}".format(str(target_vld
))
891 for param
in ("vim-network-name", "vim-network-id"):
892 if vld_params
.get(param
):
893 if isinstance(vld_params
[param
], dict):
894 for vim
, vim_net
in vld_params
[param
].items():
895 other_target_vim
= "vim:" + vim
897 target_vld
["vim_info"],
898 (other_target_vim
, param
.replace("-", "_")),
901 else: # isinstance str
902 target_vld
["vim_info"][target_vim
][
903 param
.replace("-", "_")
904 ] = vld_params
[param
]
905 if vld_params
.get("common_id"):
906 target_vld
["common_id"] = vld_params
.get("common_id")
908 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
909 def update_ns_vld_target(target
, ns_params
):
910 for vnf_params
in ns_params
.get("vnf", ()):
911 if vnf_params
.get("vimAccountId"):
915 for vnfr
in db_vnfrs
.values()
916 if vnf_params
["member-vnf-index"]
917 == vnfr
["member-vnf-index-ref"]
921 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
924 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
925 target_vld
= find_in_list(
926 get_iterable(vdur
, "interfaces"),
927 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
930 vld_params
= find_in_list(
931 get_iterable(ns_params
, "vld"),
932 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
936 if vnf_params
.get("vimAccountId") not in a_vld
.get(
939 target_vim_network_list
= [
940 v
for _
, v
in a_vld
.get("vim_info").items()
942 target_vim_network_name
= next(
944 item
.get("vim_network_name", "")
945 for item
in target_vim_network_list
950 target
["ns"]["vld"][a_index
].get("vim_info").update(
952 "vim:{}".format(vnf_params
["vimAccountId"]): {
953 "vim_network_name": target_vim_network_name
,
959 for param
in ("vim-network-name", "vim-network-id"):
960 if vld_params
.get(param
) and isinstance(
961 vld_params
[param
], dict
963 for vim
, vim_net
in vld_params
[
966 other_target_vim
= "vim:" + vim
968 target
["ns"]["vld"][a_index
].get(
973 param
.replace("-", "_"),
978 nslcmop_id
= db_nslcmop
["_id"]
980 "name": db_nsr
["name"],
983 "image": deepcopy(db_nsr
["image"]),
984 "flavor": deepcopy(db_nsr
["flavor"]),
985 "action_id": nslcmop_id
,
986 "cloud_init_content": {},
988 for image
in target
["image"]:
989 image
["vim_info"] = {}
990 for flavor
in target
["flavor"]:
991 flavor
["vim_info"] = {}
992 if db_nsr
.get("affinity-or-anti-affinity-group"):
993 target
["affinity-or-anti-affinity-group"] = deepcopy(
994 db_nsr
["affinity-or-anti-affinity-group"]
996 for affinity_or_anti_affinity_group
in target
[
997 "affinity-or-anti-affinity-group"
999 affinity_or_anti_affinity_group
["vim_info"] = {}
1001 if db_nslcmop
.get("lcmOperationType") != "instantiate":
1002 # get parameters of instantiation:
1003 db_nslcmop_instantiate
= self
.db
.get_list(
1006 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1007 "lcmOperationType": "instantiate",
1010 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1012 ns_params
= db_nslcmop
.get("operationParams")
1013 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1014 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1017 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1018 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1021 "name": vld
["name"],
1022 "mgmt-network": vld
.get("mgmt-network", False),
1023 "type": vld
.get("type"),
1026 "vim_network_name": vld
.get("vim-network-name"),
1027 "vim_account_id": ns_params
["vimAccountId"],
1031 # check if this network needs SDN assist
1032 if vld
.get("pci-interfaces"):
1033 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1034 sdnc_id
= db_vim
["config"].get("sdn-controller")
1036 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1037 target_sdn
= "sdn:{}".format(sdnc_id
)
1038 target_vld
["vim_info"][target_sdn
] = {
1040 "target_vim": target_vim
,
1042 "type": vld
.get("type"),
1045 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1046 for nsd_vnf_profile
in nsd_vnf_profiles
:
1047 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1048 if cp
["virtual-link-profile-id"] == vld
["id"]:
1050 "member_vnf:{}.{}".format(
1051 cp
["constituent-cpd-id"][0][
1052 "constituent-base-element-id"
1054 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1056 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1058 # check at nsd descriptor, if there is an ip-profile
1060 nsd_vlp
= find_in_list(
1061 get_virtual_link_profiles(nsd
),
1062 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1067 and nsd_vlp
.get("virtual-link-protocol-data")
1068 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1070 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1073 ip_profile_dest_data
= {}
1074 if "ip-version" in ip_profile_source_data
:
1075 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1078 if "cidr" in ip_profile_source_data
:
1079 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1082 if "gateway-ip" in ip_profile_source_data
:
1083 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1086 if "dhcp-enabled" in ip_profile_source_data
:
1087 ip_profile_dest_data
["dhcp-params"] = {
1088 "enabled": ip_profile_source_data
["dhcp-enabled"]
1090 vld_params
["ip-profile"] = ip_profile_dest_data
1092 # update vld_params with instantiation params
1093 vld_instantiation_params
= find_in_list(
1094 get_iterable(ns_params
, "vld"),
1095 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1097 if vld_instantiation_params
:
1098 vld_params
.update(vld_instantiation_params
)
1099 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1100 target
["ns"]["vld"].append(target_vld
)
1101 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1102 update_ns_vld_target(target
, ns_params
)
1104 for vnfr
in db_vnfrs
.values():
1105 vnfd
= find_in_list(
1106 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1108 vnf_params
= find_in_list(
1109 get_iterable(ns_params
, "vnf"),
1110 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1112 target_vnf
= deepcopy(vnfr
)
1113 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1114 for vld
in target_vnf
.get("vld", ()):
1115 # check if connected to a ns.vld, to fill target'
1116 vnf_cp
= find_in_list(
1117 vnfd
.get("int-virtual-link-desc", ()),
1118 lambda cpd
: cpd
.get("id") == vld
["id"],
1121 ns_cp
= "member_vnf:{}.{}".format(
1122 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1124 if cp2target
.get(ns_cp
):
1125 vld
["target"] = cp2target
[ns_cp
]
1128 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1130 # check if this network needs SDN assist
1132 if vld
.get("pci-interfaces"):
1133 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1134 sdnc_id
= db_vim
["config"].get("sdn-controller")
1136 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1137 target_sdn
= "sdn:{}".format(sdnc_id
)
1138 vld
["vim_info"][target_sdn
] = {
1140 "target_vim": target_vim
,
1142 "type": vld
.get("type"),
1145 # check at vnfd descriptor, if there is an ip-profile
1147 vnfd_vlp
= find_in_list(
1148 get_virtual_link_profiles(vnfd
),
1149 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1153 and vnfd_vlp
.get("virtual-link-protocol-data")
1154 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1156 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1159 ip_profile_dest_data
= {}
1160 if "ip-version" in ip_profile_source_data
:
1161 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1164 if "cidr" in ip_profile_source_data
:
1165 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1168 if "gateway-ip" in ip_profile_source_data
:
1169 ip_profile_dest_data
[
1171 ] = ip_profile_source_data
["gateway-ip"]
1172 if "dhcp-enabled" in ip_profile_source_data
:
1173 ip_profile_dest_data
["dhcp-params"] = {
1174 "enabled": ip_profile_source_data
["dhcp-enabled"]
1177 vld_params
["ip-profile"] = ip_profile_dest_data
1178 # update vld_params with instantiation params
1180 vld_instantiation_params
= find_in_list(
1181 get_iterable(vnf_params
, "internal-vld"),
1182 lambda i_vld
: i_vld
["name"] == vld
["id"],
1184 if vld_instantiation_params
:
1185 vld_params
.update(vld_instantiation_params
)
1186 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1189 for vdur
in target_vnf
.get("vdur", ()):
1190 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1191 continue # This vdu must not be created
1192 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1194 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1197 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1198 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1201 and vdu_configuration
.get("config-access")
1202 and vdu_configuration
.get("config-access").get("ssh-access")
1204 vdur
["ssh-keys"] = ssh_keys_all
1205 vdur
["ssh-access-required"] = vdu_configuration
[
1207 ]["ssh-access"]["required"]
1210 and vnf_configuration
.get("config-access")
1211 and vnf_configuration
.get("config-access").get("ssh-access")
1212 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1214 vdur
["ssh-keys"] = ssh_keys_all
1215 vdur
["ssh-access-required"] = vnf_configuration
[
1217 ]["ssh-access"]["required"]
1218 elif ssh_keys_instantiation
and find_in_list(
1219 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1221 vdur
["ssh-keys"] = ssh_keys_instantiation
1223 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1225 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1227 if vdud
.get("cloud-init-file"):
1228 vdur
["cloud-init"] = "{}:file:{}".format(
1229 vnfd
["_id"], vdud
.get("cloud-init-file")
1231 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1232 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1233 base_folder
= vnfd
["_admin"]["storage"]
1234 if base_folder
["pkg-dir"]:
1235 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1236 base_folder
["folder"],
1237 base_folder
["pkg-dir"],
1238 vdud
.get("cloud-init-file"),
1241 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1242 base_folder
["folder"],
1243 vdud
.get("cloud-init-file"),
1245 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1246 target
["cloud_init_content"][
1249 elif vdud
.get("cloud-init"):
1250 vdur
["cloud-init"] = "{}:vdu:{}".format(
1251 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1253 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1254 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1257 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1258 deploy_params_vdu
= self
._format
_additional
_params
(
1259 vdur
.get("additionalParams") or {}
1261 deploy_params_vdu
["OSM"] = get_osm_params(
1262 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1264 vdur
["additionalParams"] = deploy_params_vdu
1267 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1268 if target_vim
not in ns_flavor
["vim_info"]:
1269 ns_flavor
["vim_info"][target_vim
] = {}
1272 # in case alternative images are provided we must check if they should be applied
1273 # for the vim_type, modify the vim_type taking into account
1274 ns_image_id
= int(vdur
["ns-image-id"])
1275 if vdur
.get("alt-image-ids"):
1276 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1277 vim_type
= db_vim
["vim_type"]
1278 for alt_image_id
in vdur
.get("alt-image-ids"):
1279 ns_alt_image
= target
["image"][int(alt_image_id
)]
1280 if vim_type
== ns_alt_image
.get("vim-type"):
1281 # must use alternative image
1283 "use alternative image id: {}".format(alt_image_id
)
1285 ns_image_id
= alt_image_id
1286 vdur
["ns-image-id"] = ns_image_id
1288 ns_image
= target
["image"][int(ns_image_id
)]
1289 if target_vim
not in ns_image
["vim_info"]:
1290 ns_image
["vim_info"][target_vim
] = {}
1293 if vdur
.get("affinity-or-anti-affinity-group-id"):
1294 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1295 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1296 if target_vim
not in ns_ags
["vim_info"]:
1297 ns_ags
["vim_info"][target_vim
] = {}
1299 vdur
["vim_info"] = {target_vim
: {}}
1300 # instantiation parameters
1302 vdu_instantiation_params
= find_in_list(
1303 get_iterable(vnf_params
, "vdu"),
1304 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1306 if vdu_instantiation_params
:
1307 # Parse the vdu_volumes from the instantiation params
1308 vdu_volumes
= get_volumes_from_instantiation_params(
1309 vdu_instantiation_params
, vdud
1311 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1312 vdur_list
.append(vdur
)
1313 target_vnf
["vdur"] = vdur_list
1314 target
["vnf"].append(target_vnf
)
1316 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1317 desc
= await self
.RO
.deploy(nsr_id
, target
)
1318 self
.logger
.debug("RO return > {}".format(desc
))
1319 action_id
= desc
["action_id"]
1320 await self
._wait
_ng
_ro
(
1327 operation
="instantiation",
1332 "_admin.deployed.RO.operational-status": "running",
1333 "detailed-status": " ".join(stage
),
1335 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1336 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1337 self
._write
_op
_status
(nslcmop_id
, stage
)
1339 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
async def _wait_ng_ro(
    self,
    nsr_id,
    action_id,
    nslcmop_id=None,
    start_time=None,
    timeout=600,
    stage=None,
    operation=None,
):
    """
    Poll NG-RO until the given action is done, fails, or the timeout expires.

    :param nsr_id: ns record identity the action belongs to
    :param action_id: identifier of the RO action to poll
    :param nslcmop_id: operation id; when provided together with 'stage', status changes are
        written to the database
    :param start_time: epoch seconds the timeout is counted from; defaults to now
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items; item [2] (vim specific) is overwritten with the RO status
    :param operation: key of self.op_status_map selecting the coroutine used to query the status
    :return: None
    :raises NgRoException: if RO reports FAILED or an unknown status, or on timeout
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.op_status_map[operation](nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        status = desc_status["status"]
        if status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # Raise explicitly instead of "assert False": asserts are stripped with -O,
            # which would turn an unknown status into a silent wait until timeout
            raise NgRoException(
                "ROclient.check_ns_status returns unknown {}".format(status)
            )
        # Write to database only when the vim specific status text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # 'loop' argument is not passed: it was removed from asyncio.sleep in Python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Remove the ns from NG-RO: deploy an empty target, wait for it and delete the RO record.

    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: operation id, also sent to RO as the action_id of the empty target
    :param stage: list with 3 items; item [2] (vim specific) is overwritten with the result
    :return: None
    :raises LcmException: if RO reports a conflict or any other error different from not found
    """
    db_nsr_update = {}
    failed_detail = []
    action_id = None
    start_deploy = time()
    try:
        # An empty target makes RO remove everything deployed for this ns
        target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        desc = await self.RO.deploy(nsr_id, target)
        action_id = desc["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(
            logging_text
            + "ns terminate action at RO. action_id={}".format(action_id)
        )
        # wait until done
        delete_timeout = 20 * 60  # 20 minutes
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            delete_timeout,
            stage,
            operation="termination",
        )

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        if isinstance(e, NgRoException) and e.http_code == 404:  # not found
            # Already removed at RO: this is not considered an error
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_action_id={} already deleted".format(action_id)
            )
        elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_action_id={} delete conflict: {}".format(action_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text
                + "RO_action_id={} delete error: {}".format(action_id, e)
            )

    # Status is written to database in any case, before reporting the failure
    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; its values are iterated and 'vim-account-id' is read
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # No vnfr uses the ns vim account: take the one of the last vnfr.
                # (this was a '==' comparison with no effect; the intent is an assignment)
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exception types that are not expected here
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr that contains the kdu
    :param kdu_name: value of 'kdu-name' of the kdur entry to wait for
    :return: IP address, K8s services
    :raises LcmException: if the kdur is not found, has a status different from READY/ENABLED,
        or no status is set after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        # vnfr is read again at every try, as it is updated elsewhere while the kdu is deployed
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address"), kdur.get("services")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # 'loop' argument is not passed: it was removed from asyncio.sleep in Python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
    self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: _id of the vnfr to look at
    :param vdu_id: 'vdu-id-ref' of the target vdur; falsy for the VNF case (vnfr 'ip-address' is used)
    :param vdu_index: 'count-index' of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, target in error state, target not found or key injection failure
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException(
                "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
            )

        # 'loop' argument is not passed: it was removed from asyncio.sleep in Python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VNF is in error state"
                    )
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("ip-address") == ip_address
                    ),
                    None,
                )
            else:  # VDU case
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if x.get("vdu-id-ref") == vdu_id
                        and x.get("count-index") == vdu_index
                    ),
                    None,
                )

            if (
                not vdur and len(db_vnfr.get("vdur", ())) == 1
            ):  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException(
                    "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                        vnfr_id, vdu_id, vdu_index
                    )
                )
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(
                    t for t in vdur["vim_info"]
                )  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if (
                vdur.get("pdu-type")
                or vdur.get("status") == "ACTIVE"
                or ng_ro_status == "ACTIVE"
            ):
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException(
                    "Cannot inject ssh-key because target VM is in error state"
                )

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(
                    db_vnfr["member-vnf-index-ref"], target_vdu_id
                )  # TODO add vdu_index
                if self.ng_ro:
                    target = {
                        "action": {
                            "action": "inject_ssh_key",
                            "key": pub_key,
                            "user": user,
                        },
                        "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                    }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(
                        nsr_id, action_id, timeout=600, operation="instantiation"
                    )
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(
                            db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                        )
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={
                            "add_public_key": pub_key,
                            "vms": [ro_vm_id],
                            "user": user,
                        },
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException(
                            "Unknown response from RO when injecting key"
                        )
                    # NOTE: only the first result is examined, as both branches leave the loop
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                    break
            except NgRoException as e:
                raise LcmException(
                    "Reaching max tries injecting key. Error: {}".format(e)
                ) from e
            except ROclient.ROClientException as e:
                # Injection is retried at the next iterations, up to 20 tries
                if not nb_tries:
                    self.logger.debug(
                        logging_text
                        + "error injecting key: {}. Retrying until {} seconds".format(
                            e, 20 * 10
                        )
                    )
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    ) from e
        else:
            break

    return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

    :param nsr_id: ns record identity, used to read 'configurationStatus' from database
    :param vca_deployed_list: list of deployed VCA; only the entry at vca_index is read
    :param vca_index: position of this VCA at vca_deployed_list and at 'configurationStatus'
    :return: None
    :raises LcmException: if a dependent configuration is BROKEN, or they are not READY
        after the maximum number of polls
    """
    my_vca = vca_deployed_list[vca_index]
    if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return
    my_vnf_index = my_vca.get("member-vnf-index")
    timeout = 300  # number of remaining polls (one every 10 seconds), not seconds
    while timeout >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        configuration_status_list = db_nsr["configurationStatus"]
        for index, configuration_status in enumerate(configuration_status_list):
            if index == vca_index:
                # myself
                continue
            # NS level (no member-vnf-index) depends on every other entry;
            # VNF level depends only on entries of the same member-vnf-index
            if not my_vnf_index or (
                configuration_status.get("member-vnf-index") == my_vnf_index
            ):
                internal_status = configuration_status.get("status")
                if internal_status == "READY":
                    continue
                elif internal_status == "BROKEN":
                    raise LcmException(
                        "Configuration aborted because dependent charm/s has failed"
                    )
                else:
                    # some dependency is still in progress: wait and poll again
                    break
        else:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        timeout -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Get the VCA identifier that applies to a vnf or to a ns.

    :param db_vnfr: database content of the vnfr. If provided, its 'vca-id' is returned
    :param db_nsr: database content of the nsr. Used only when db_vnfr is empty: the 'vca' of the
        vim account at instantiate_params.vimAccountId is returned
    :return: the vca id, or None if not set or if neither db_vnfr nor db_nsr is provided
    """
    vca_id = None
    if db_vnfr:
        vca_id = deep_get(db_vnfr, ("vca-id",))
    elif db_nsr:
        vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
        vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
    return vca_id
async def instantiate_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Create or register the execution environment of one VCA, install its configuration
    software and run its initial config primitives (Day-1).

    :param logging_text: prefix text to use at logging
    :param vca_index: position of this VCA at db_nsr _admin.deployed.VCA and at configurationStatus
    :param nsi_id: network slice instance id, or empty; used as first part of the namespace
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of the vnfr, or empty for a NS level VCA
    :param vdu_id: vdu identifier for a VDU level VCA, otherwise empty
    :param kdu_name: kdu name for a KDU level VCA, otherwise empty
    :param vdu_index: vdu count index; ignored (0 is used) for native charms
    :param config_descriptor: configuration descriptor; 'initial-config-primitive',
        'terminate-config-primitive' and 'config-access' are read
    :param deploy_params: parameters for primitive replacement; 'rw_mgmt_ip' and, for NS level,
        'ns_config_info' are written here
    :param base_folder: package storage info; 'folder' and 'pkg-dir' are read
    :param nslcmop_id: operation id, used to write the operation status
    :param stage: list with 3 items; item [0] is overwritten with the Day-1 stage text
    :param vca_type: one of "native_charm", "lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3";
        selects the connector at self.vca_map
    :param vca_name: name of the charm or helm chart inside the package
    :param ee_config_descriptor: execution environment descriptor; its 'id' is read
    :return: None
    :raises LcmException: on any failure; configuration status is set to BROKEN
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }
    # 'step' is updated along the method; it is used at the error message if something fails
    step = ""
    try:

        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="CREATING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            elif vca_type in ("helm", "helm-v3"):
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    chart_model=vca_name,
                    vca_type=vca_type,
                )
            else:
                ee_id, credentials = await self.vca_map[
                    vca_type
                ].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split(".")
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
                vca_id=vca_id,
                vca_type=vca_type,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2(
            "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
        )

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # default rw_mgmt_ip to None, avoiding the non definition of the variable
            rw_mgmt_ip = None

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip, services = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                    vnfd = self.db.get_one(
                        "vnfds_revisions",
                        {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                    )
                    kdu = get_kdu(vnfd, kdu_name)
                    kdu_services = [
                        service["name"] for service in get_kdu_services(kdu)
                    ]
                    # keep only the services whose name contains a service name of the descriptor
                    exposed_services = []
                    for service in services:
                        if any(s in service["name"] for s in kdu_services):
                            exposed_services.append(service)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name="config",
                        params_dict={
                            "osm-config": json.dumps(
                                OsmConfigBuilder(
                                    k8s={"services": exposed_services}
                                ).build()
                            )
                        },
                        vca_id=vca_id,
                    )

                # This verification is needed in order to avoid trying to add a public key
                # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                # for a KNF and not for its KDUs, the previous verification gives False, and the code
                # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                # or it is a KNF)
                elif db_vnfr.get("vdur"):
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = "execute initial config primitive"

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
        else:
            # NS
            stage[0] = "Stage 5/5: running Day-1 primitives for NS."

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
        )

        self._write_op_status(op_id=nslcmop_id, stage=stage)

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            # (.get is used, as for my_vca above, so a missing key means NS level
            # instead of failing with KeyError)
            if not vca_deployed.get("member-vnf-index"):
                deploy_params["ns_config_info"] = json.dumps(
                    self._get_ns_config_info(nsr_id)
                )
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(
                initial_config_primitive, {}, deploy_params
            )

            step = "execute primitive '{}' params '{}'".format(
                initial_config_primitive["name"], primitive_params_
            )
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict,
                vca_id=vca_id,
                vca_type=vca_type,
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get("terminate-config-primitive"):
                    self.update_db_2(
                        "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                    )
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type in ("helm", "helm-v3"):
            # TODO: review for those cases where the helm chart is a reference and
            # is not part of the NF package
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # traceback is logged only for exception types that are not expected here
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(
    self,
    nsr_id: str,
    ns_state: str,
    current_operation: str,
    current_operation_id: str,
    error_description: str = None,
    error_detail: str = None,
    other_update: dict = None,
):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the ns record to update
    :param ns_state: new 'nsState'; not written if empty
    :param current_operation: operation type; "IDLE" is stored as None at '_admin.operation-type'
    :param current_operation_id: operation id
    :param error_description: value for 'errorDescription'; always written, also when None
    :param error_detail: value for 'errorDetail'; always written, also when None
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning and not raised
    """
    try:
        # other_update is modified in place when provided
        db_dict = other_update or {}
        db_dict[
            "_admin.nslcmop"
        ] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = (
            current_operation if current_operation != "IDLE" else None
        )
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing NS status, ns={}: {}".format(nsr_id, e)
        )
def _write_op_status(
    self,
    op_id: str,
    stage: list = None,
    error_message: str = None,
    queuePosition: int = 0,
    operation_state: str = None,
    other_update: dict = None,
):
    """
    Update the status fields of an operation at database collection 'nslcmops'.

    :param op_id: _id of the operation to update
    :param stage: if a list, item [0] is written as 'stage' and all items joined with a space
        as 'detailed-status'; any other non None value is written as 'stage' converted to str
    :param error_message: value for 'errorMessage'; not written if None
    :param queuePosition: value for 'queuePosition'; always written
    :param operation_state: value for 'operationState'; if not None 'statusEnteredTime' is
        set to the current time too
    :param other_update: other changes to write in the same update; modified in place
    :return: None. A DbException is logged as a warning and not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["queuePosition"] = queuePosition
        if isinstance(stage, list):
            db_dict["stage"] = stage[0]
            db_dict["detailed-status"] = " ".join(stage)
        elif stage is not None:
            db_dict["stage"] = str(stage)

        if error_message is not None:
            db_dict["errorMessage"] = error_message
        if operation_state is not None:
            db_dict["operationState"] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
        )
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Set the same status to every non empty entry of 'configurationStatus' of a ns record.

    :param db_nsr: database content of ns record; '_id' and 'configurationStatus' are read
    :param status: status to write at each entry
    :return: None. Nothing is written if there is no 'configurationStatus'.
        A DbException is logged as a warning and not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            # empty entries are skipped, but they still count for the index
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
def _write_configuration_status(
    self,
    nsr_id: str,
    vca_index: int,
    status: str = None,
    element_under_configuration: str = None,
    element_type: str = None,
    other_update: dict = None,
):
    """
    Update one entry of 'configurationStatus' of a ns record.

    :param nsr_id: _id of the ns record to update
    :param vca_index: position of the entry at 'configurationStatus'
    :param status: value for 'status'; not written if empty
    :param element_under_configuration: value for 'elementUnderConfiguration'; not written if empty
    :param element_type: value for 'elementType'; not written if empty
    :param other_update: other changes to write in the same update; modified in place
    :return: None. A DbException is logged as a warning and not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = "configurationStatus.{}.".format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + "status"] = status
        if element_under_configuration:
            db_dict[
                db_path + "elementUnderConfiguration"
            ] = element_under_configuration
        if element_type:
            db_dict[db_path + "elementType"] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # logger.warn is a deprecated alias of logger.warning
        self.logger.warning(
            "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
                status, nsr_id, vca_index, e
            )
        )
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'
    :raises LcmException: if no placement result is found at database after polling
    """
    modified = False
    nslcmop_id = db_nslcmop["_id"]
    placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
    if placement_engine == "PLA":
        self.logger.debug(
            logging_text + "Invoke and wait for placement optimization"
        )
        await self.msg.aiowrite(
            "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
        )
        db_poll_interval = 5
        wait = db_poll_interval * 10
        pla_result = None
        # poll the database every db_poll_interval seconds until the result is there
        while not pla_result and wait >= 0:
            await asyncio.sleep(db_poll_interval)
            wait -= db_poll_interval
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

        if not pla_result:
            raise LcmException(
                "Placement timeout for nslcmopId={}".format(nslcmop_id)
            )

        for pla_vnf in pla_result["vnf"]:
            vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
            # skip results without vim account or for a vnf that is not part of this ns
            if not pla_vnf.get("vimAccountId") or not vnfr:
                continue
            modified = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": pla_vnf["vimAccountId"]},
            )
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
    return modified
def update_nsrs_with_pla_result(self, params):
    """
    Store the "placement" content of params at "_admin.pla" of the nslcmop record.

    :param params: dict with a "placement" entry; its "nslcmopId" selects the nslcmop to update
    :return: None. Any exception is logged as a warning and not propagated (best effort)
    """
    # bound before the try so the warning below can always be formatted
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2442 async def instantiate(self
, nsr_id
, nslcmop_id
):
2445 :param nsr_id: ns instance to deploy
2446 :param nslcmop_id: operation to run
2450 # Try to lock HA task here
2451 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2452 if not task_is_locked_by_me
:
2454 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2458 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2459 self
.logger
.debug(logging_text
+ "Enter")
2461 # get all needed from database
2463 # database nsrs record
2466 # database nslcmops record
2469 # update operation on nsrs
2471 # update operation on nslcmops
2472 db_nslcmop_update
= {}
2474 nslcmop_operation_state
= None
2475 db_vnfrs
= {} # vnf's info indexed by member-index
2477 tasks_dict_info
= {} # from task to info text
2481 "Stage 1/5: preparation of the environment.",
2482 "Waiting for previous operations to terminate.",
2485 # ^ stage, step, VIM progress
2487 # wait for any previous tasks in process
2488 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2490 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2491 stage
[1] = "Reading from database."
2492 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2493 db_nsr_update
["detailed-status"] = "creating"
2494 db_nsr_update
["operational-status"] = "init"
2495 self
._write
_ns
_status
(
2497 ns_state
="BUILDING",
2498 current_operation
="INSTANTIATING",
2499 current_operation_id
=nslcmop_id
,
2500 other_update
=db_nsr_update
,
2502 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2504 # read from db: operation
2505 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2506 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2507 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2508 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2509 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2511 ns_params
= db_nslcmop
.get("operationParams")
2512 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2513 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2515 timeout_ns_deploy
= self
.timeout
.get(
2516 "ns_deploy", self
.timeout_ns_deploy
2520 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2521 self
.logger
.debug(logging_text
+ stage
[1])
2522 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2523 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2524 self
.logger
.debug(logging_text
+ stage
[1])
2525 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2526 self
.fs
.sync(db_nsr
["nsd-id"])
2528 # nsr_name = db_nsr["name"] # TODO short-name??
2530 # read from db: vnf's of this ns
2531 stage
[1] = "Getting vnfrs from db."
2532 self
.logger
.debug(logging_text
+ stage
[1])
2533 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2535 # read from db: vnfd's for every vnf
2536 db_vnfds
= [] # every vnfd data
2538 # for each vnf in ns, read vnfd
2539 for vnfr
in db_vnfrs_list
:
2540 if vnfr
.get("kdur"):
2542 for kdur
in vnfr
["kdur"]:
2543 if kdur
.get("additionalParams"):
2544 kdur
["additionalParams"] = json
.loads(
2545 kdur
["additionalParams"]
2547 kdur_list
.append(kdur
)
2548 vnfr
["kdur"] = kdur_list
2550 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2551 vnfd_id
= vnfr
["vnfd-id"]
2552 vnfd_ref
= vnfr
["vnfd-ref"]
2553 self
.fs
.sync(vnfd_id
)
2555 # if we haven't this vnfd, read it from db
2556 if vnfd_id
not in db_vnfds
:
2558 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2561 self
.logger
.debug(logging_text
+ stage
[1])
2562 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2565 db_vnfds
.append(vnfd
)
2567 # Get or generates the _admin.deployed.VCA list
2568 vca_deployed_list
= None
2569 if db_nsr
["_admin"].get("deployed"):
2570 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2571 if vca_deployed_list
is None:
2572 vca_deployed_list
= []
2573 configuration_status_list
= []
2574 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2575 db_nsr_update
["configurationStatus"] = configuration_status_list
2576 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2577 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2578 elif isinstance(vca_deployed_list
, dict):
2579 # maintain backward compatibility. Change a dict to list at database
2580 vca_deployed_list
= list(vca_deployed_list
.values())
2581 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2582 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2585 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2587 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2588 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2590 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2591 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2592 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2594 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2597 # n2vc_redesign STEP 2 Deploy Network Scenario
2598 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2599 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2601 stage
[1] = "Deploying KDUs."
2602 # self.logger.debug(logging_text + "Before deploy_kdus")
2603 # Call to deploy_kdus in case exists the "vdu:kdu" param
2604 await self
.deploy_kdus(
2605 logging_text
=logging_text
,
2607 nslcmop_id
=nslcmop_id
,
2610 task_instantiation_info
=tasks_dict_info
,
2613 stage
[1] = "Getting VCA public key."
2614 # n2vc_redesign STEP 1 Get VCA public ssh-key
2615 # feature 1429. Add n2vc public key to needed VMs
2616 n2vc_key
= self
.n2vc
.get_public_key()
2617 n2vc_key_list
= [n2vc_key
]
2618 if self
.vca_config
.get("public_key"):
2619 n2vc_key_list
.append(self
.vca_config
["public_key"])
2621 stage
[1] = "Deploying NS at VIM."
2622 task_ro
= asyncio
.ensure_future(
2623 self
.instantiate_RO(
2624 logging_text
=logging_text
,
2628 db_nslcmop
=db_nslcmop
,
2631 n2vc_key_list
=n2vc_key_list
,
2635 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2636 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2638 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2639 stage
[1] = "Deploying Execution Environments."
2640 self
.logger
.debug(logging_text
+ stage
[1])
2642 # create namespace and certificate if any helm based EE is present in the NS
2643 if check_helm_ee_in_ns(db_vnfds
):
2644 # TODO: create EE namespace
2645 # create TLS certificates
2646 await self
.vca_map
["helm-v3"].create_tls_certificate(
2647 secret_name
="ee-tls-{}".format(nsr_id
),
2650 usage
="server auth",
2653 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2654 for vnf_profile
in get_vnf_profiles(nsd
):
2655 vnfd_id
= vnf_profile
["vnfd-id"]
2656 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2657 member_vnf_index
= str(vnf_profile
["id"])
2658 db_vnfr
= db_vnfrs
[member_vnf_index
]
2659 base_folder
= vnfd
["_admin"]["storage"]
2665 # Get additional parameters
2666 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2667 if db_vnfr
.get("additionalParamsForVnf"):
2668 deploy_params
.update(
2669 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2672 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2673 if descriptor_config
:
2675 logging_text
=logging_text
2676 + "member_vnf_index={} ".format(member_vnf_index
),
2679 nslcmop_id
=nslcmop_id
,
2685 member_vnf_index
=member_vnf_index
,
2686 vdu_index
=vdu_index
,
2688 deploy_params
=deploy_params
,
2689 descriptor_config
=descriptor_config
,
2690 base_folder
=base_folder
,
2691 task_instantiation_info
=tasks_dict_info
,
2695 # Deploy charms for each VDU that supports one.
2696 for vdud
in get_vdu_list(vnfd
):
2698 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2699 vdur
= find_in_list(
2700 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2703 if vdur
.get("additionalParams"):
2704 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2706 deploy_params_vdu
= deploy_params
2707 deploy_params_vdu
["OSM"] = get_osm_params(
2708 db_vnfr
, vdu_id
, vdu_count_index
=0
2710 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2712 self
.logger
.debug("VDUD > {}".format(vdud
))
2714 "Descriptor config > {}".format(descriptor_config
)
2716 if descriptor_config
:
2719 for vdu_index
in range(vdud_count
):
2720 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2722 logging_text
=logging_text
2723 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2724 member_vnf_index
, vdu_id
, vdu_index
2728 nslcmop_id
=nslcmop_id
,
2734 member_vnf_index
=member_vnf_index
,
2735 vdu_index
=vdu_index
,
2737 deploy_params
=deploy_params_vdu
,
2738 descriptor_config
=descriptor_config
,
2739 base_folder
=base_folder
,
2740 task_instantiation_info
=tasks_dict_info
,
2743 for kdud
in get_kdu_list(vnfd
):
2744 kdu_name
= kdud
["name"]
2745 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2746 if descriptor_config
:
2751 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2753 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2754 if kdur
.get("additionalParams"):
2755 deploy_params_kdu
.update(
2756 parse_yaml_strings(kdur
["additionalParams"].copy())
2760 logging_text
=logging_text
,
2763 nslcmop_id
=nslcmop_id
,
2769 member_vnf_index
=member_vnf_index
,
2770 vdu_index
=vdu_index
,
2772 deploy_params
=deploy_params_kdu
,
2773 descriptor_config
=descriptor_config
,
2774 base_folder
=base_folder
,
2775 task_instantiation_info
=tasks_dict_info
,
2779 # Check if this NS has a charm configuration
2780 descriptor_config
= nsd
.get("ns-configuration")
2781 if descriptor_config
and descriptor_config
.get("juju"):
2784 member_vnf_index
= None
2790 # Get additional parameters
2791 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2792 if db_nsr
.get("additionalParamsForNs"):
2793 deploy_params
.update(
2794 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2796 base_folder
= nsd
["_admin"]["storage"]
2798 logging_text
=logging_text
,
2801 nslcmop_id
=nslcmop_id
,
2807 member_vnf_index
=member_vnf_index
,
2808 vdu_index
=vdu_index
,
2810 deploy_params
=deploy_params
,
2811 descriptor_config
=descriptor_config
,
2812 base_folder
=base_folder
,
2813 task_instantiation_info
=tasks_dict_info
,
2817 # rest of staff will be done at finally
2820 ROclient
.ROClientException
,
2826 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2829 except asyncio
.CancelledError
:
2831 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2833 exc
= "Operation was cancelled"
2834 except Exception as e
:
2835 exc
= traceback
.format_exc()
2836 self
.logger
.critical(
2837 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2842 error_list
.append(str(exc
))
2844 # wait for pending tasks
2846 stage
[1] = "Waiting for instantiate pending tasks."
2847 self
.logger
.debug(logging_text
+ stage
[1])
2848 error_list
+= await self
._wait
_for
_tasks
(
2856 stage
[1] = stage
[2] = ""
2857 except asyncio
.CancelledError
:
2858 error_list
.append("Cancelled")
2859 # TODO cancel all tasks
2860 except Exception as exc
:
2861 error_list
.append(str(exc
))
2863 # update operation-status
2864 db_nsr_update
["operational-status"] = "running"
2865 # let's begin with VCA 'configured' status (later we can change it)
2866 db_nsr_update
["config-status"] = "configured"
2867 for task
, task_name
in tasks_dict_info
.items():
2868 if not task
.done() or task
.cancelled() or task
.exception():
2869 if task_name
.startswith(self
.task_name_deploy_vca
):
2870 # A N2VC task is pending
2871 db_nsr_update
["config-status"] = "failed"
2873 # RO or KDU task is pending
2874 db_nsr_update
["operational-status"] = "failed"
2876 # update status at database
2878 error_detail
= ". ".join(error_list
)
2879 self
.logger
.error(logging_text
+ error_detail
)
2880 error_description_nslcmop
= "{} Detail: {}".format(
2881 stage
[0], error_detail
2883 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2884 nslcmop_id
, stage
[0]
2887 db_nsr_update
["detailed-status"] = (
2888 error_description_nsr
+ " Detail: " + error_detail
2890 db_nslcmop_update
["detailed-status"] = error_detail
2891 nslcmop_operation_state
= "FAILED"
2895 error_description_nsr
= error_description_nslcmop
= None
2897 db_nsr_update
["detailed-status"] = "Done"
2898 db_nslcmop_update
["detailed-status"] = "Done"
2899 nslcmop_operation_state
= "COMPLETED"
2902 self
._write
_ns
_status
(
2905 current_operation
="IDLE",
2906 current_operation_id
=None,
2907 error_description
=error_description_nsr
,
2908 error_detail
=error_detail
,
2909 other_update
=db_nsr_update
,
2911 self
._write
_op
_status
(
2914 error_message
=error_description_nslcmop
,
2915 operation_state
=nslcmop_operation_state
,
2916 other_update
=db_nslcmop_update
,
2919 if nslcmop_operation_state
:
2921 await self
.msg
.aiowrite(
2926 "nslcmop_id": nslcmop_id
,
2927 "operationState": nslcmop_operation_state
,
2931 except Exception as e
:
2933 logging_text
+ "kafka_write notification Exception {}".format(e
)
2936 self
.logger
.debug(logging_text
+ "Exit")
2937 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2939 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2940 if vnfd_id
not in cached_vnfds
:
2941 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2942 return cached_vnfds
[vnfd_id
]
2944 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2945 if vnf_profile_id
not in cached_vnfrs
:
2946 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2949 "member-vnf-index-ref": vnf_profile_id
,
2950 "nsr-id-ref": nsr_id
,
2953 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Tell whether the deployed VCA is one of the two endpoints of the relation.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer endpoints are inspected
    :return: True if an endpoint without "kdu-resource-profile-id" has the same vnf profile id,
        vdu profile id and execution environment ref as the VCA; False otherwise
    """
    for endpoint in (relation.provider, relation.requirer):
        # endpoints that reference a KDU resource profile are never matched against a VCA
        if endpoint["kdu-resource-profile-id"]:
            continue
        if (
            vca.vnf_profile_id == endpoint.vnf_profile_id
            and vca.vdu_profile_id == endpoint.vdu_profile_id
            and vca.execution_environment_ref == endpoint.execution_environment_ref
        ):
            return True
    return False
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Normalize the relation endpoint data and fill its "execution-environment-ref" when missing.

    :param nsr_id: ns instance id, passed to safe_get_ee_relation
    :param nsd: ns descriptor where the vnf profile of the endpoint is looked up
    :param ee_relation_data: relation endpoint data as found in the descriptor
    :param cached_vnfds: vnfd cache used by _get_vnfd
    :param vnf_profile_id: optional vnf profile id, passed to safe_get_ee_relation
    :return: the endpoint data returned by safe_get_ee_relation, with "execution-environment-ref"
        set for VNF and VDU level endpoints that did not provide it
    :raises Exception: if no juju execution environment is found for the VNF/VDU entity
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(ee_relation_data)
    # only VNF and VDU endpoints lacking an explicit reference need to be completed
    if (
        level not in (EELevel.VNF, EELevel.VDU)
        or ee_relation_data["execution-environment-ref"]
    ):
        return ee_relation_data

    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]
    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
2997 def _get_ns_relations(
3000 nsd
: Dict
[str, Any
],
3002 cached_vnfds
: Dict
[str, Any
],
3003 ) -> List
[Relation
]:
3005 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
3006 for r
in db_ns_relations
:
3007 provider_dict
= None
3008 requirer_dict
= None
3009 if all(key
in r
for key
in ("provider", "requirer")):
3010 provider_dict
= r
["provider"]
3011 requirer_dict
= r
["requirer"]
3012 elif "entities" in r
:
3013 provider_id
= r
["entities"][0]["id"]
3016 "endpoint": r
["entities"][0]["endpoint"],
3018 if provider_id
!= nsd
["id"]:
3019 provider_dict
["vnf-profile-id"] = provider_id
3020 requirer_id
= r
["entities"][1]["id"]
3023 "endpoint": r
["entities"][1]["endpoint"],
3025 if requirer_id
!= nsd
["id"]:
3026 requirer_dict
["vnf-profile-id"] = requirer_id
3029 "provider/requirer or entities must be included in the relation."
3031 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3032 nsr_id
, nsd
, provider_dict
, cached_vnfds
3034 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3035 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3037 provider
= EERelation(relation_provider
)
3038 requirer
= EERelation(relation_requirer
)
3039 relation
= Relation(r
["name"], provider
, requirer
)
3040 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3042 relations
.append(relation
)
3045 def _get_vnf_relations(
3048 nsd
: Dict
[str, Any
],
3050 cached_vnfds
: Dict
[str, Any
],
3051 ) -> List
[Relation
]:
3053 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3054 vnf_profile_id
= vnf_profile
["id"]
3055 vnfd_id
= vnf_profile
["vnfd-id"]
3056 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3057 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3058 for r
in db_vnf_relations
:
3059 provider_dict
= None
3060 requirer_dict
= None
3061 if all(key
in r
for key
in ("provider", "requirer")):
3062 provider_dict
= r
["provider"]
3063 requirer_dict
= r
["requirer"]
3064 elif "entities" in r
:
3065 provider_id
= r
["entities"][0]["id"]
3068 "vnf-profile-id": vnf_profile_id
,
3069 "endpoint": r
["entities"][0]["endpoint"],
3071 if provider_id
!= vnfd_id
:
3072 provider_dict
["vdu-profile-id"] = provider_id
3073 requirer_id
= r
["entities"][1]["id"]
3076 "vnf-profile-id": vnf_profile_id
,
3077 "endpoint": r
["entities"][1]["endpoint"],
3079 if requirer_id
!= vnfd_id
:
3080 requirer_dict
["vdu-profile-id"] = requirer_id
3083 "provider/requirer or entities must be included in the relation."
3085 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3086 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3088 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3089 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3091 provider
= EERelation(relation_provider
)
3092 requirer
= EERelation(relation_requirer
)
3093 relation
= Relation(r
["name"], provider
, requirer
)
3094 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3096 relations
.append(relation
)
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the deployed KDU data referenced by a KDU level relation endpoint.

    The vnf profile of the endpoint gives the vnfd, the vnfd gives the KDU resource profile, and
    the "kdu-name" of that profile is used to look up the deployed KDU at the nsr.

    :param ee_relation: relation endpoint; vnf_profile_id and kdu_resource_profile_id are read
    :param db_nsr: database content of the nsr
    :param cached_vnfds: vnfd cache used by _get_vnfd
    :return: the deployed KDU data, updated with the "resource-name" of the resource profile
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # dict defaults: a tuple default has no .get() and fails when "_admin" is absent
    nsr_deployed = db_nsr.get("_admin", {}).get("deployed", {})
    deployed_kdu, _ = get_deployed_kdu(
        nsr_deployed,
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # NOTE(review): assumes get_deployed_kdu found a matching KDU; confirm what it returns
    # when nothing matches, since the caller checks this result for falsiness
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Build the deployed component (VCA or K8s resource) that a relation endpoint refers to.

    :param ee_relation: relation endpoint; its level (NS, VNF, VDU or KDU) selects the lookup
    :param db_nsr: database content of the nsr
    :param cached_vnfds: vnfd cache, used for KDU level endpoints
    :return: a DeployedVCA for NS/VNF/VDU endpoints, a DeployedK8sResource for KDU endpoints,
        or None when the level is none of them or nothing deployed is found
    """
    nsr_id = db_nsr["_id"]
    ee_level = EELevel.get_level(ee_relation)

    # NS, VNF and VDU levels only differ in the filter used to find the deployed VCA
    if ee_level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif ee_level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if not kdu_resource_data:
            return None
        return DeployedK8sResource(kdu_resource_data)
    else:
        return None

    vca = get_deployed_vca(db_nsr, vca_filter)
    if not vca:
        return None
    return DeployedVCA(nsr_id, vca)
3167 async def _add_relation(
3171 db_nsr
: Dict
[str, Any
],
3172 cached_vnfds
: Dict
[str, Any
],
3173 cached_vnfrs
: Dict
[str, Any
],
3175 deployed_provider
= self
._get
_deployed
_component
(
3176 relation
.provider
, db_nsr
, cached_vnfds
3178 deployed_requirer
= self
._get
_deployed
_component
(
3179 relation
.requirer
, db_nsr
, cached_vnfds
3183 and deployed_requirer
3184 and deployed_provider
.config_sw_installed
3185 and deployed_requirer
.config_sw_installed
3187 provider_db_vnfr
= (
3189 relation
.provider
.nsr_id
,
3190 relation
.provider
.vnf_profile_id
,
3193 if relation
.provider
.vnf_profile_id
3196 requirer_db_vnfr
= (
3198 relation
.requirer
.nsr_id
,
3199 relation
.requirer
.vnf_profile_id
,
3202 if relation
.requirer
.vnf_profile_id
3205 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3206 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3207 provider_relation_endpoint
= RelationEndpoint(
3208 deployed_provider
.ee_id
,
3210 relation
.provider
.endpoint
,
3212 requirer_relation_endpoint
= RelationEndpoint(
3213 deployed_requirer
.ee_id
,
3215 relation
.requirer
.endpoint
,
3217 await self
.vca_map
[vca_type
].add_relation(
3218 provider
=provider_relation_endpoint
,
3219 requirer
=requirer_relation_endpoint
,
3221 # remove entry from relations list
3225 async def _add_vca_relations(
3231 timeout
: int = 3600,
3235 # 1. find all relations for this VCA
3236 # 2. wait for other peers related
3240 # STEP 1: find all relations for this VCA
3243 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3244 nsd
= get_nsd(db_nsr
)
3247 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3248 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3253 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3254 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3256 # if no relations, terminate
3258 self
.logger
.debug(logging_text
+ " No relations")
3261 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3268 if now
- start
>= timeout
:
3269 self
.logger
.error(logging_text
+ " : timeout adding relations")
3272 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3273 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3275 # for each relation, find the VCA's related
3276 for relation
in relations
.copy():
3277 added
= await self
._add
_relation
(
3285 relations
.remove(relation
)
3288 self
.logger
.debug("Relations added")
3290 await asyncio
.sleep(5.0)
3294 except Exception as e
:
3295 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3298 async def _install_kdu(
3306 k8s_instance_info
: dict,
3307 k8params
: dict = None,
3313 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3316 "collection": "nsrs",
3317 "filter": {"_id": nsr_id
},
3318 "path": nsr_db_path
,
3321 if k8s_instance_info
.get("kdu-deployment-name"):
3322 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3324 kdu_instance
= self
.k8scluster_map
[
3326 ].generate_kdu_instance_name(
3327 db_dict
=db_dict_install
,
3328 kdu_model
=k8s_instance_info
["kdu-model"],
3329 kdu_name
=k8s_instance_info
["kdu-name"],
3332 # Update the nsrs table with the kdu-instance value
3336 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3339 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3340 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3341 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3342 # namespace, this first verification could be removed, and the next step would be done for any kind
3344 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3345 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3346 if k8sclustertype
in ("juju", "juju-bundle"):
3347 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3348 # that the user passed a namespace which he wants its KDU to be deployed in)
3354 "_admin.projects_write": k8s_instance_info
["namespace"],
3355 "_admin.projects_read": k8s_instance_info
["namespace"],
3361 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3366 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3368 k8s_instance_info
["namespace"] = kdu_instance
3370 await self
.k8scluster_map
[k8sclustertype
].install(
3371 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3372 kdu_model
=k8s_instance_info
["kdu-model"],
3375 db_dict
=db_dict_install
,
3377 kdu_name
=k8s_instance_info
["kdu-name"],
3378 namespace
=k8s_instance_info
["namespace"],
3379 kdu_instance
=kdu_instance
,
3383 # Obtain services to obtain management service ip
3384 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3385 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3386 kdu_instance
=kdu_instance
,
3387 namespace
=k8s_instance_info
["namespace"],
3390 # Obtain management service info (if exists)
3391 vnfr_update_dict
= {}
3392 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3394 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3399 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3402 for service
in kdud
.get("service", [])
3403 if service
.get("mgmt-service")
3405 for mgmt_service
in mgmt_services
:
3406 for service
in services
:
3407 if service
["name"].startswith(mgmt_service
["name"]):
3408 # Mgmt service found, Obtain service ip
3409 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3410 if isinstance(ip
, list) and len(ip
) == 1:
3414 "kdur.{}.ip-address".format(kdu_index
)
3417 # Check if must update also mgmt ip at the vnf
3418 service_external_cp
= mgmt_service
.get(
3419 "external-connection-point-ref"
3421 if service_external_cp
:
3423 deep_get(vnfd
, ("mgmt-interface", "cp"))
3424 == service_external_cp
3426 vnfr_update_dict
["ip-address"] = ip
3431 "external-connection-point-ref", ""
3433 == service_external_cp
,
3436 "kdur.{}.ip-address".format(kdu_index
)
3441 "Mgmt service name: {} not found".format(
3442 mgmt_service
["name"]
3446 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3447 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3449 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3452 and kdu_config
.get("initial-config-primitive")
3453 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3455 initial_config_primitive_list
= kdu_config
.get(
3456 "initial-config-primitive"
3458 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3460 for initial_config_primitive
in initial_config_primitive_list
:
3461 primitive_params_
= self
._map
_primitive
_params
(
3462 initial_config_primitive
, {}, {}
3465 await asyncio
.wait_for(
3466 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3467 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3468 kdu_instance
=kdu_instance
,
3469 primitive_name
=initial_config_primitive
["name"],
3470 params
=primitive_params_
,
3471 db_dict
=db_dict_install
,
3477 except Exception as e
:
3478 # Prepare update db with error and raise exception
3481 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3485 vnfr_data
.get("_id"),
3486 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3489 # ignore to keep original exception
3491 # reraise original error
3496 async def deploy_kdus(
3503 task_instantiation_info
,
3505 # Launch kdus if present in the descriptor
3507 k8scluster_id_2_uuic
= {
3508 "helm-chart-v3": {},
3513 async def _get_cluster_id(cluster_id
, cluster_type
):
3514 nonlocal k8scluster_id_2_uuic
3515 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3516 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3518 # check if K8scluster is creating and wait look if previous tasks in process
3519 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3520 "k8scluster", cluster_id
3523 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3524 task_name
, cluster_id
3526 self
.logger
.debug(logging_text
+ text
)
3527 await asyncio
.wait(task_dependency
, timeout
=3600)
3529 db_k8scluster
= self
.db
.get_one(
3530 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3532 if not db_k8scluster
:
3533 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3535 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3537 if cluster_type
== "helm-chart-v3":
3539 # backward compatibility for existing clusters that have not been initialized for helm v3
3540 k8s_credentials
= yaml
.safe_dump(
3541 db_k8scluster
.get("credentials")
3543 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3544 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3546 db_k8scluster_update
= {}
3547 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3548 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3549 db_k8scluster_update
[
3550 "_admin.helm-chart-v3.created"
3552 db_k8scluster_update
[
3553 "_admin.helm-chart-v3.operationalState"
3556 "k8sclusters", cluster_id
, db_k8scluster_update
3558 except Exception as e
:
3561 + "error initializing helm-v3 cluster: {}".format(str(e
))
3564 "K8s cluster '{}' has not been initialized for '{}'".format(
3565 cluster_id
, cluster_type
3570 "K8s cluster '{}' has not been initialized for '{}'".format(
3571 cluster_id
, cluster_type
3574 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3577 logging_text
+= "Deploy kdus: "
3580 db_nsr_update
= {"_admin.deployed.K8s": []}
3581 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3584 updated_cluster_list
= []
3585 updated_v3_cluster_list
= []
3587 for vnfr_data
in db_vnfrs
.values():
3588 vca_id
= self
.get_vca_id(vnfr_data
, {})
3589 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3590 # Step 0: Prepare and set parameters
3591 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3592 vnfd_id
= vnfr_data
.get("vnfd-id")
3593 vnfd_with_id
= find_in_list(
3594 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3598 for kdud
in vnfd_with_id
["kdu"]
3599 if kdud
["name"] == kdur
["kdu-name"]
3601 namespace
= kdur
.get("k8s-namespace")
3602 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3603 if kdur
.get("helm-chart"):
3604 kdumodel
= kdur
["helm-chart"]
3605 # Default version: helm3, if helm-version is v2 assign v2
3606 k8sclustertype
= "helm-chart-v3"
3607 self
.logger
.debug("kdur: {}".format(kdur
))
3609 kdur
.get("helm-version")
3610 and kdur
.get("helm-version") == "v2"
3612 k8sclustertype
= "helm-chart"
3613 elif kdur
.get("juju-bundle"):
3614 kdumodel
= kdur
["juju-bundle"]
3615 k8sclustertype
= "juju-bundle"
3618 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3619 "juju-bundle. Maybe an old NBI version is running".format(
3620 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3623 # check if kdumodel is a file and exists
3625 vnfd_with_id
= find_in_list(
3626 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3628 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3629 if storage
: # may be not present if vnfd has not artifacts
3630 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3631 if storage
["pkg-dir"]:
3632 filename
= "{}/{}/{}s/{}".format(
3639 filename
= "{}/Scripts/{}s/{}".format(
3644 if self
.fs
.file_exists(
3645 filename
, mode
="file"
3646 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3647 kdumodel
= self
.fs
.path
+ filename
3648 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3650 except Exception: # it is not a file
3653 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3654 step
= "Synchronize repos for k8s cluster '{}'".format(
3657 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3661 k8sclustertype
== "helm-chart"
3662 and cluster_uuid
not in updated_cluster_list
3664 k8sclustertype
== "helm-chart-v3"
3665 and cluster_uuid
not in updated_v3_cluster_list
3667 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3668 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3669 cluster_uuid
=cluster_uuid
3672 if del_repo_list
or added_repo_dict
:
3673 if k8sclustertype
== "helm-chart":
3675 "_admin.helm_charts_added." + item
: None
3676 for item
in del_repo_list
3679 "_admin.helm_charts_added." + item
: name
3680 for item
, name
in added_repo_dict
.items()
3682 updated_cluster_list
.append(cluster_uuid
)
3683 elif k8sclustertype
== "helm-chart-v3":
3685 "_admin.helm_charts_v3_added." + item
: None
3686 for item
in del_repo_list
3689 "_admin.helm_charts_v3_added." + item
: name
3690 for item
, name
in added_repo_dict
.items()
3692 updated_v3_cluster_list
.append(cluster_uuid
)
3694 logging_text
+ "repos synchronized on k8s cluster "
3695 "'{}' to_delete: {}, to_add: {}".format(
3696 k8s_cluster_id
, del_repo_list
, added_repo_dict
3701 {"_id": k8s_cluster_id
},
3707 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3708 vnfr_data
["member-vnf-index-ref"],
3712 k8s_instance_info
= {
3713 "kdu-instance": None,
3714 "k8scluster-uuid": cluster_uuid
,
3715 "k8scluster-type": k8sclustertype
,
3716 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3717 "kdu-name": kdur
["kdu-name"],
3718 "kdu-model": kdumodel
,
3719 "namespace": namespace
,
3720 "kdu-deployment-name": kdu_deployment_name
,
3722 db_path
= "_admin.deployed.K8s.{}".format(index
)
3723 db_nsr_update
[db_path
] = k8s_instance_info
3724 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3725 vnfd_with_id
= find_in_list(
3726 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3728 task
= asyncio
.ensure_future(
3737 k8params
=desc_params
,
3742 self
.lcm_tasks
.register(
3746 "instantiate_KDU-{}".format(index
),
3749 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3755 except (LcmException
, asyncio
.CancelledError
):
3757 except Exception as e
:
3758 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3759 if isinstance(e
, (N2VCException
, DbException
)):
3760 self
.logger
.error(logging_text
+ msg
)
3762 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3763 raise LcmException(msg
)
3766 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3785 task_instantiation_info
,
3788 # launch instantiate_N2VC in a asyncio task and register task object
3789 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3790 # if not found, create one entry and update database
3791 # fill db_nsr._admin.deployed.VCA.<index>
3794 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3798 get_charm_name
= False
3799 if "execution-environment-list" in descriptor_config
:
3800 ee_list
= descriptor_config
.get("execution-environment-list", [])
3801 elif "juju" in descriptor_config
:
3802 ee_list
= [descriptor_config
] # ns charms
3803 if "execution-environment-list" not in descriptor_config
:
3804 # charm name is only required for ns charms
3805 get_charm_name
= True
3806 else: # other types as script are not supported
3809 for ee_item
in ee_list
:
3812 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3813 ee_item
.get("juju"), ee_item
.get("helm-chart")
3816 ee_descriptor_id
= ee_item
.get("id")
3817 if ee_item
.get("juju"):
3818 vca_name
= ee_item
["juju"].get("charm")
3820 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3823 if ee_item
["juju"].get("charm") is not None
3826 if ee_item
["juju"].get("cloud") == "k8s":
3827 vca_type
= "k8s_proxy_charm"
3828 elif ee_item
["juju"].get("proxy") is False:
3829 vca_type
= "native_charm"
3830 elif ee_item
.get("helm-chart"):
3831 vca_name
= ee_item
["helm-chart"]
3832 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3835 vca_type
= "helm-v3"
3838 logging_text
+ "skipping non juju neither charm configuration"
3843 for vca_index
, vca_deployed
in enumerate(
3844 db_nsr
["_admin"]["deployed"]["VCA"]
3846 if not vca_deployed
:
3849 vca_deployed
.get("member-vnf-index") == member_vnf_index
3850 and vca_deployed
.get("vdu_id") == vdu_id
3851 and vca_deployed
.get("kdu_name") == kdu_name
3852 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3853 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3857 # not found, create one.
3859 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3862 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3864 target
+= "/kdu/{}".format(kdu_name
)
3866 "target_element": target
,
3867 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3868 "member-vnf-index": member_vnf_index
,
3870 "kdu_name": kdu_name
,
3871 "vdu_count_index": vdu_index
,
3872 "operational-status": "init", # TODO revise
3873 "detailed-status": "", # TODO revise
3874 "step": "initial-deploy", # TODO revise
3876 "vdu_name": vdu_name
,
3878 "ee_descriptor_id": ee_descriptor_id
,
3879 "charm_name": charm_name
,
3883 # create VCA and configurationStatus in db
3885 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3886 "configurationStatus.{}".format(vca_index
): dict(),
3888 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3890 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3892 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3893 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3894 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3897 task_n2vc
= asyncio
.ensure_future(
3898 self
.instantiate_N2VC(
3899 logging_text
=logging_text
,
3900 vca_index
=vca_index
,
3906 vdu_index
=vdu_index
,
3907 deploy_params
=deploy_params
,
3908 config_descriptor
=descriptor_config
,
3909 base_folder
=base_folder
,
3910 nslcmop_id
=nslcmop_id
,
3914 ee_config_descriptor
=ee_item
,
3917 self
.lcm_tasks
.register(
3921 "instantiate_N2VC-{}".format(vca_index
),
3924 task_instantiation_info
[
3926 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3927 member_vnf_index
or "", vdu_id
or ""
3931 def _create_nslcmop(nsr_id
, operation
, params
):
3933 Creates a ns-lcm-opp content to be stored at database.
3934 :param nsr_id: internal id of the instance
3935 :param operation: instantiate, terminate, scale, action, ...
3936 :param params: user parameters for the operation
3937 :return: dictionary following SOL005 format
3939 # Raise exception if invalid arguments
3940 if not (nsr_id
and operation
and params
):
3942 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3949 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3950 "operationState": "PROCESSING",
3951 "statusEnteredTime": now
,
3952 "nsInstanceId": nsr_id
,
3953 "lcmOperationType": operation
,
3955 "isAutomaticInvocation": False,
3956 "operationParams": params
,
3957 "isCancelPending": False,
3959 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3960 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3965 def _format_additional_params(self
, params
):
3966 params
= params
or {}
3967 for key
, value
in params
.items():
3968 if str(value
).startswith("!!yaml "):
3969 params
[key
] = yaml
.safe_load(value
[7:])
3972 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3973 primitive
= seq
.get("name")
3974 primitive_params
= {}
3976 "member_vnf_index": vnf_index
,
3977 "primitive": primitive
,
3978 "primitive_params": primitive_params
,
3981 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: nslcmop record holding the sub-operations at _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is COMPLETED; otherwise op_index
    """
    operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    op = operations[op_index]
    if op.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
4005 # Find a sub-operation where all keys in a matching dictionary must match
4006 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
4007 def _find_suboperation(self
, db_nslcmop
, match
):
4008 if db_nslcmop
and match
:
4009 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
4010 for i
, op
in enumerate(op_list
):
4011 if all(op
.get(k
) == match
[k
] for k
in match
):
4013 return self
.SUBOPERATION_STATUS_NOT_FOUND
4015 # Update status for a sub-operation given its index
4016 def _update_suboperation_status(
4017 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4019 # Update DB for HA tasks
4020 q_filter
= {"_id": db_nslcmop
["_id"]}
4022 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4023 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4026 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4029 # Add sub-operation, return the index of the added sub-operation
4030 # Optionally, set operationState, detailed-status, and operationType
4031 # Status and type are currently set for 'scale' sub-operations:
4032 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4033 # 'detailed-status' : status message
4034 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4035 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4036 def _add_suboperation(
4044 mapped_primitive_params
,
4045 operationState
=None,
4046 detailed_status
=None,
4049 RO_scaling_info
=None,
4052 return self
.SUBOPERATION_STATUS_NOT_FOUND
4053 # Get the "_admin.operations" list, if it exists
4054 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4055 op_list
= db_nslcmop_admin
.get("operations")
4056 # Create or append to the "_admin.operations" list
4058 "member_vnf_index": vnf_index
,
4060 "vdu_count_index": vdu_count_index
,
4061 "primitive": primitive
,
4062 "primitive_params": mapped_primitive_params
,
4065 new_op
["operationState"] = operationState
4067 new_op
["detailed-status"] = detailed_status
4069 new_op
["lcmOperationType"] = operationType
4071 new_op
["RO_nsr_id"] = RO_nsr_id
4073 new_op
["RO_scaling_info"] = RO_scaling_info
4075 # No existing operations, create key 'operations' with current operation as first list element
4076 db_nslcmop_admin
.update({"operations": [new_op
]})
4077 op_list
= db_nslcmop_admin
.get("operations")
4079 # Existing operations, append operation to list
4080 op_list
.append(new_op
)
4082 db_nslcmop_update
= {"_admin.operations": op_list
}
4083 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4084 op_index
= len(op_list
) - 1
4087 # Helper methods for scale() sub-operations
4089 # pre-scale/post-scale:
4090 # Check for 3 different cases:
4091 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4092 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4093 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4094 def _check_or_add_scale_suboperation(
4098 vnf_config_primitive
,
4102 RO_scaling_info
=None,
4104 # Find this sub-operation
4105 if RO_nsr_id
and RO_scaling_info
:
4106 operationType
= "SCALE-RO"
4108 "member_vnf_index": vnf_index
,
4109 "RO_nsr_id": RO_nsr_id
,
4110 "RO_scaling_info": RO_scaling_info
,
4114 "member_vnf_index": vnf_index
,
4115 "primitive": vnf_config_primitive
,
4116 "primitive_params": primitive_params
,
4117 "lcmOperationType": operationType
,
4119 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4120 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4121 # a. New sub-operation
4122 # The sub-operation does not exist, add it.
4123 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4124 # The following parameters are set to None for all kind of scaling:
4126 vdu_count_index
= None
4128 if RO_nsr_id
and RO_scaling_info
:
4129 vnf_config_primitive
= None
4130 primitive_params
= None
4133 RO_scaling_info
= None
4134 # Initial status for sub-operation
4135 operationState
= "PROCESSING"
4136 detailed_status
= "In progress"
4137 # Add sub-operation for pre/post-scaling (zero or more operations)
4138 self
._add
_suboperation
(
4144 vnf_config_primitive
,
4152 return self
.SUBOPERATION_STATUS_NEW
4154 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4155 # or op_index (operationState != 'COMPLETED')
4156 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4158 # Function to return execution_environment id
4160 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4161 # TODO vdu_index_count
4162 for vca
in vca_deployed_list
:
4163 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives of a VCA and destroy its execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            vnf_index = vca_deployed.get("member-vnf-index")
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            accepted_results = ("COMPLETED", "PARTIALLY_COMPLETED")
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, primitive
                )
                self.logger.debug(logging_text + step)
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result not in accepted_results:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(primitive, vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if not destroy_ee:
        return
    await self.vca_map[vca_type].delete_execution_environment(
        vca_deployed["ee_id"],
        scaling_in=scaling_in,
        vca_type=vca_type,
        vca_id=vca_id,
    )
4272 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4273 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4274 namespace
= "." + db_nsr
["_id"]
4276 await self
.n2vc
.delete_namespace(
4277 namespace
=namespace
,
4278 total_timeout
=self
.timeout_charm_delete
,
4281 except N2VCNotFound
: # already deleted. Skip
4283 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO: deletes the ns and waits until it is removed from the VIM, then
    deletes the nsd and finally every vnfd registered at RO for this deployment. The nsd and the vnfds
    are only deleted when no previous deletion has failed.
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsrs record where the progress is written
    :param nslcmop_id: id of the operation whose status is written with _write_op_status
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException, with all the collected errors, if any deletion fails
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # Raised explicitly instead of using an assert statement, which is removed
                    # when running with -O and would let the loop go on with an unknown status
                    raise AssertionError(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # The "loop" argument of asyncio.sleep was removed in Python 3.10; the running
                # loop is always used
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds; a failure on one of them does not stop the deletion of the rest
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
4485 async def terminate(self
, nsr_id
, nslcmop_id
):
4486 # Try to lock HA task here
4487 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4488 if not task_is_locked_by_me
:
4491 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4492 self
.logger
.debug(logging_text
+ "Enter")
4493 timeout_ns_terminate
= self
.timeout_ns_terminate
4496 operation_params
= None
4498 error_list
= [] # annotates all failed error messages
4499 db_nslcmop_update
= {}
4500 autoremove
= False # autoremove after terminated
4501 tasks_dict_info
= {}
4504 "Stage 1/3: Preparing task.",
4505 "Waiting for previous operations to terminate.",
4508 # ^ contains [stage, step, VIM-status]
4510 # wait for any previous tasks in process
4511 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4513 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4514 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4515 operation_params
= db_nslcmop
.get("operationParams") or {}
4516 if operation_params
.get("timeout_ns_terminate"):
4517 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4518 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4519 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4521 db_nsr_update
["operational-status"] = "terminating"
4522 db_nsr_update
["config-status"] = "terminating"
4523 self
._write
_ns
_status
(
4525 ns_state
="TERMINATING",
4526 current_operation
="TERMINATING",
4527 current_operation_id
=nslcmop_id
,
4528 other_update
=db_nsr_update
,
4530 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4531 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4532 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4535 stage
[1] = "Getting vnf descriptors from db."
4536 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4538 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4540 db_vnfds_from_id
= {}
4541 db_vnfds_from_member_index
= {}
4543 for vnfr
in db_vnfrs_list
:
4544 vnfd_id
= vnfr
["vnfd-id"]
4545 if vnfd_id
not in db_vnfds_from_id
:
4546 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4547 db_vnfds_from_id
[vnfd_id
] = vnfd
4548 db_vnfds_from_member_index
[
4549 vnfr
["member-vnf-index-ref"]
4550 ] = db_vnfds_from_id
[vnfd_id
]
4552 # Destroy individual execution environments when there are terminating primitives.
4553 # Rest of EE will be deleted at once
4554 # TODO - check before calling _destroy_N2VC
4555 # if not operation_params.get("skip_terminate_primitives"):#
4556 # or not vca.get("needed_terminate"):
4557 stage
[0] = "Stage 2/3 execute terminating primitives."
4558 self
.logger
.debug(logging_text
+ stage
[0])
4559 stage
[1] = "Looking execution environment that needs terminate."
4560 self
.logger
.debug(logging_text
+ stage
[1])
4562 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4563 config_descriptor
= None
4564 vca_member_vnf_index
= vca
.get("member-vnf-index")
4565 vca_id
= self
.get_vca_id(
4566 db_vnfrs_dict
.get(vca_member_vnf_index
)
4567 if vca_member_vnf_index
4571 if not vca
or not vca
.get("ee_id"):
4573 if not vca
.get("member-vnf-index"):
4575 config_descriptor
= db_nsr
.get("ns-configuration")
4576 elif vca
.get("vdu_id"):
4577 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4578 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4579 elif vca
.get("kdu_name"):
4580 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4581 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4583 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4584 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4585 vca_type
= vca
.get("type")
4586 exec_terminate_primitives
= not operation_params
.get(
4587 "skip_terminate_primitives"
4588 ) and vca
.get("needed_terminate")
4589 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4590 # pending native charms
4592 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4594 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4595 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4596 task
= asyncio
.ensure_future(
4604 exec_terminate_primitives
,
4608 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4610 # wait for pending tasks of terminate primitives
4614 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4616 error_list
= await self
._wait
_for
_tasks
(
4619 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4623 tasks_dict_info
.clear()
4625 return # raise LcmException("; ".join(error_list))
4627 # remove All execution environments at once
4628 stage
[0] = "Stage 3/3 delete all."
4630 if nsr_deployed
.get("VCA"):
4631 stage
[1] = "Deleting all execution environments."
4632 self
.logger
.debug(logging_text
+ stage
[1])
4633 vca_id
= self
.get_vca_id({}, db_nsr
)
4634 task_delete_ee
= asyncio
.ensure_future(
4636 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4637 timeout
=self
.timeout_charm_delete
,
4640 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4641 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4643 # Delete Namespace and Certificates if necessary
4644 if check_helm_ee_in_ns(list(db_vnfds_from_member_index
.values())):
4645 await self
.vca_map
["helm-v3"].delete_tls_certificate(
4646 certificate_name
=db_nslcmop
["nsInstanceId"],
4648 # TODO: Delete namespace
4650 # Delete from k8scluster
4651 stage
[1] = "Deleting KDUs."
4652 self
.logger
.debug(logging_text
+ stage
[1])
4653 # print(nsr_deployed)
4654 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4655 if not kdu
or not kdu
.get("kdu-instance"):
4657 kdu_instance
= kdu
.get("kdu-instance")
4658 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4659 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4660 vca_id
= self
.get_vca_id({}, db_nsr
)
4661 task_delete_kdu_instance
= asyncio
.ensure_future(
4662 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4663 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4664 kdu_instance
=kdu_instance
,
4666 namespace
=kdu
.get("namespace"),
4672 + "Unknown k8s deployment type {}".format(
4673 kdu
.get("k8scluster-type")
4678 task_delete_kdu_instance
4679 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4682 stage
[1] = "Deleting ns from VIM."
4684 task_delete_ro
= asyncio
.ensure_future(
4685 self
._terminate
_ng
_ro
(
4686 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4690 task_delete_ro
= asyncio
.ensure_future(
4692 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4695 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4697 # rest of staff will be done at finally
4700 ROclient
.ROClientException
,
4705 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4707 except asyncio
.CancelledError
:
4709 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4711 exc
= "Operation was cancelled"
4712 except Exception as e
:
4713 exc
= traceback
.format_exc()
4714 self
.logger
.critical(
4715 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4720 error_list
.append(str(exc
))
4722 # wait for pending tasks
4724 stage
[1] = "Waiting for terminate pending tasks."
4725 self
.logger
.debug(logging_text
+ stage
[1])
4726 error_list
+= await self
._wait
_for
_tasks
(
4729 timeout_ns_terminate
,
4733 stage
[1] = stage
[2] = ""
4734 except asyncio
.CancelledError
:
4735 error_list
.append("Cancelled")
4736 # TODO cancell all tasks
4737 except Exception as exc
:
4738 error_list
.append(str(exc
))
4739 # update status at database
4741 error_detail
= "; ".join(error_list
)
4742 # self.logger.error(logging_text + error_detail)
4743 error_description_nslcmop
= "{} Detail: {}".format(
4744 stage
[0], error_detail
4746 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4747 nslcmop_id
, stage
[0]
4750 db_nsr_update
["operational-status"] = "failed"
4751 db_nsr_update
["detailed-status"] = (
4752 error_description_nsr
+ " Detail: " + error_detail
4754 db_nslcmop_update
["detailed-status"] = error_detail
4755 nslcmop_operation_state
= "FAILED"
4759 error_description_nsr
= error_description_nslcmop
= None
4760 ns_state
= "NOT_INSTANTIATED"
4761 db_nsr_update
["operational-status"] = "terminated"
4762 db_nsr_update
["detailed-status"] = "Done"
4763 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4764 db_nslcmop_update
["detailed-status"] = "Done"
4765 nslcmop_operation_state
= "COMPLETED"
4768 self
._write
_ns
_status
(
4771 current_operation
="IDLE",
4772 current_operation_id
=None,
4773 error_description
=error_description_nsr
,
4774 error_detail
=error_detail
,
4775 other_update
=db_nsr_update
,
4777 self
._write
_op
_status
(
4780 error_message
=error_description_nslcmop
,
4781 operation_state
=nslcmop_operation_state
,
4782 other_update
=db_nslcmop_update
,
4784 if ns_state
== "NOT_INSTANTIATED":
4788 {"nsr-id-ref": nsr_id
},
4789 {"_admin.nsState": "NOT_INSTANTIATED"},
4791 except DbException
as e
:
4794 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4798 if operation_params
:
4799 autoremove
= operation_params
.get("autoremove", False)
4800 if nslcmop_operation_state
:
4802 await self
.msg
.aiowrite(
4807 "nslcmop_id": nslcmop_id
,
4808 "operationState": nslcmop_operation_state
,
4809 "autoremove": autoremove
,
4813 except Exception as e
:
4815 logging_text
+ "kafka_write notification Exception {}".format(e
)
4818 self
.logger
.debug(logging_text
+ "Exit")
4819 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4821 async def _wait_for_tasks(
4822 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4825 error_detail_list
= []
4827 pending_tasks
= list(created_tasks_info
.keys())
4828 num_tasks
= len(pending_tasks
)
4830 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4831 self
._write
_op
_status
(nslcmop_id
, stage
)
4832 while pending_tasks
:
4834 _timeout
= timeout
+ time_start
- time()
4835 done
, pending_tasks
= await asyncio
.wait(
4836 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4838 num_done
+= len(done
)
4839 if not done
: # Timeout
4840 for task
in pending_tasks
:
4841 new_error
= created_tasks_info
[task
] + ": Timeout"
4842 error_detail_list
.append(new_error
)
4843 error_list
.append(new_error
)
4846 if task
.cancelled():
4849 exc
= task
.exception()
4851 if isinstance(exc
, asyncio
.TimeoutError
):
4853 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4854 error_list
.append(created_tasks_info
[task
])
4855 error_detail_list
.append(new_error
)
4862 ROclient
.ROClientException
,
4868 self
.logger
.error(logging_text
+ new_error
)
4870 exc_traceback
= "".join(
4871 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4875 + created_tasks_info
[task
]
4881 logging_text
+ created_tasks_info
[task
] + ": Done"
4883 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4885 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4886 if nsr_id
: # update also nsr
4891 "errorDescription": "Error at: " + ", ".join(error_list
),
4892 "errorDetail": ". ".join(error_detail_list
),
4895 self
._write
_op
_status
(nslcmop_id
, stage
)
4896 return error_detail_list
4899 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4901 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4902 The default-value is used. If it is between < > it look for a value at instantiation_params
4903 :param primitive_desc: portion of VNFD/NSD that describes primitive
4904 :param params: Params provided by user
4905 :param instantiation_params: Instantiation params provided by user
4906 :return: a dictionary with the calculated params
4908 calculated_params
= {}
4909 for parameter
in primitive_desc
.get("parameter", ()):
4910 param_name
= parameter
["name"]
4911 if param_name
in params
:
4912 calculated_params
[param_name
] = params
[param_name
]
4913 elif "default-value" in parameter
or "value" in parameter
:
4914 if "value" in parameter
:
4915 calculated_params
[param_name
] = parameter
["value"]
4917 calculated_params
[param_name
] = parameter
["default-value"]
4919 isinstance(calculated_params
[param_name
], str)
4920 and calculated_params
[param_name
].startswith("<")
4921 and calculated_params
[param_name
].endswith(">")
4923 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4924 calculated_params
[param_name
] = instantiation_params
[
4925 calculated_params
[param_name
][1:-1]
4929 "Parameter {} needed to execute primitive {} not provided".format(
4930 calculated_params
[param_name
], primitive_desc
["name"]
4935 "Parameter {} needed to execute primitive {} not provided".format(
4936 param_name
, primitive_desc
["name"]
4940 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4941 calculated_params
[param_name
] = yaml
.safe_dump(
4942 calculated_params
[param_name
], default_flow_style
=True, width
=256
4944 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4946 ].startswith("!!yaml "):
4947 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4948 if parameter
.get("data-type") == "INTEGER":
4950 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4951 except ValueError: # error converting string to int
4953 "Parameter {} of primitive {} must be integer".format(
4954 param_name
, primitive_desc
["name"]
4957 elif parameter
.get("data-type") == "BOOLEAN":
4958 calculated_params
[param_name
] = not (
4959 (str(calculated_params
[param_name
])).lower() == "false"
4962 # add always ns_config_info if primitive name is config
4963 if primitive_desc
["name"] == "config":
4964 if "ns_config_info" in instantiation_params
:
4965 calculated_params
["ns_config_info"] = instantiation_params
[
4968 return calculated_params
4970 def _look_for_deployed_vca(
4977 ee_descriptor_id
=None,
4979 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4980 for vca
in deployed_vca
:
4983 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4986 vdu_count_index
is not None
4987 and vdu_count_index
!= vca
["vdu_count_index"]
4990 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4992 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4996 # vca_deployed not found
4998 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4999 " is not deployed".format(
5008 ee_id
= vca
.get("ee_id")
5010 "type", "lxc_proxy_charm"
5011 ) # default value for backward compatibility - proxy charm
5014 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
5015 "execution environment".format(
5016 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5019 return ee_id
, vca_type
5021 async def _ns_execute_primitive(
5027 retries_interval
=30,
5034 if primitive
== "config":
5035 primitive_params
= {"params": primitive_params
}
5037 vca_type
= vca_type
or "lxc_proxy_charm"
5041 output
= await asyncio
.wait_for(
5042 self
.vca_map
[vca_type
].exec_primitive(
5044 primitive_name
=primitive
,
5045 params_dict
=primitive_params
,
5046 progress_timeout
=self
.timeout_progress_primitive
,
5047 total_timeout
=self
.timeout_primitive
,
5052 timeout
=timeout
or self
.timeout_primitive
,
5056 except asyncio
.CancelledError
:
5058 except Exception as e
:
5062 "Error executing action {} on {} -> {}".format(
5067 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5069 if isinstance(e
, asyncio
.TimeoutError
):
5071 message
="Timed out waiting for action to complete"
5073 return "FAILED", getattr(e
, "message", repr(e
))
5075 return "COMPLETED", output
5077 except (LcmException
, asyncio
.CancelledError
):
5079 except Exception as e
:
5080 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed = db_nsr["_admin"]["deployed"]
    if deployed["K8s"]:
        # there are KDUs deployed: refresh each one of them
        for k8s in deployed["K8s"]:
            await self._on_update_k8s_db(
                cluster_uuid=k8s["k8scluster-uuid"],
                kdu_instance=k8s["kdu-instance"],
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=k8s["k8scluster-type"],
            )
    else:
        # no KDUs: refresh each deployed VCA using its index at the database path
        for vca_index in range(len(deployed["VCA"])):
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
5116 async def action(self
, nsr_id
, nslcmop_id
):
5117 # Try to lock HA task here
5118 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5119 if not task_is_locked_by_me
:
5122 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5123 self
.logger
.debug(logging_text
+ "Enter")
5124 # get all needed from database
5128 db_nslcmop_update
= {}
5129 nslcmop_operation_state
= None
5130 error_description_nslcmop
= None
5133 # wait for any previous tasks in process
5134 step
= "Waiting for previous operations to terminate"
5135 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5137 self
._write
_ns
_status
(
5140 current_operation
="RUNNING ACTION",
5141 current_operation_id
=nslcmop_id
,
5144 step
= "Getting information from database"
5145 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5146 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5147 if db_nslcmop
["operationParams"].get("primitive_params"):
5148 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5149 db_nslcmop
["operationParams"]["primitive_params"]
5152 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5153 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5154 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5155 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5156 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5157 primitive
= db_nslcmop
["operationParams"]["primitive"]
5158 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5159 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5160 "timeout_ns_action", self
.timeout_primitive
5164 step
= "Getting vnfr from database"
5165 db_vnfr
= self
.db
.get_one(
5166 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5168 if db_vnfr
.get("kdur"):
5170 for kdur
in db_vnfr
["kdur"]:
5171 if kdur
.get("additionalParams"):
5172 kdur
["additionalParams"] = json
.loads(
5173 kdur
["additionalParams"]
5175 kdur_list
.append(kdur
)
5176 db_vnfr
["kdur"] = kdur_list
5177 step
= "Getting vnfd from database"
5178 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5180 # Sync filesystem before running a primitive
5181 self
.fs
.sync(db_vnfr
["vnfd-id"])
5183 step
= "Getting nsd from database"
5184 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5186 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5187 # for backward compatibility
5188 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5189 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5190 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5191 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5193 # look for primitive
5194 config_primitive_desc
= descriptor_configuration
= None
5196 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5198 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5200 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5202 descriptor_configuration
= db_nsd
.get("ns-configuration")
5204 if descriptor_configuration
and descriptor_configuration
.get(
5207 for config_primitive
in descriptor_configuration
["config-primitive"]:
5208 if config_primitive
["name"] == primitive
:
5209 config_primitive_desc
= config_primitive
5212 if not config_primitive_desc
:
5213 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5215 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5219 primitive_name
= primitive
5220 ee_descriptor_id
= None
5222 primitive_name
= config_primitive_desc
.get(
5223 "execution-environment-primitive", primitive
5225 ee_descriptor_id
= config_primitive_desc
.get(
5226 "execution-environment-ref"
5232 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5234 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5237 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5239 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5241 desc_params
= parse_yaml_strings(
5242 db_vnfr
.get("additionalParamsForVnf")
5245 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5246 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5247 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5249 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5250 actions
.add(primitive
["name"])
5251 for primitive
in kdu_configuration
.get("config-primitive", []):
5252 actions
.add(primitive
["name"])
5254 nsr_deployed
["K8s"],
5255 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5256 and kdu
["member-vnf-index"] == vnf_index
,
5260 if primitive_name
in actions
5261 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5265 # TODO check if ns is in a proper status
5267 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5269 # kdur and desc_params already set from before
5270 if primitive_params
:
5271 desc_params
.update(primitive_params
)
5272 # TODO Check if we will need something at vnf level
5273 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5275 kdu_name
== kdu
["kdu-name"]
5276 and kdu
["member-vnf-index"] == vnf_index
5281 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5284 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5285 msg
= "unknown k8scluster-type '{}'".format(
5286 kdu
.get("k8scluster-type")
5288 raise LcmException(msg
)
5291 "collection": "nsrs",
5292 "filter": {"_id": nsr_id
},
5293 "path": "_admin.deployed.K8s.{}".format(index
),
5297 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5299 step
= "Executing kdu {}".format(primitive_name
)
5300 if primitive_name
== "upgrade":
5301 if desc_params
.get("kdu_model"):
5302 kdu_model
= desc_params
.get("kdu_model")
5303 del desc_params
["kdu_model"]
5305 kdu_model
= kdu
.get("kdu-model")
5306 parts
= kdu_model
.split(sep
=":")
5308 kdu_model
= parts
[0]
5309 if desc_params
.get("kdu_atomic_upgrade"):
5310 atomic_upgrade
= desc_params
.get(
5311 "kdu_atomic_upgrade"
5312 ).lower() in ("yes", "true", "1")
5313 del desc_params
["kdu_atomic_upgrade"]
5315 atomic_upgrade
= True
5317 detailed_status
= await asyncio
.wait_for(
5318 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5319 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5320 kdu_instance
=kdu
.get("kdu-instance"),
5321 atomic
=atomic_upgrade
,
5322 kdu_model
=kdu_model
,
5325 timeout
=timeout_ns_action
,
5327 timeout
=timeout_ns_action
+ 10,
5330 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5332 elif primitive_name
== "rollback":
5333 detailed_status
= await asyncio
.wait_for(
5334 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5335 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5336 kdu_instance
=kdu
.get("kdu-instance"),
5339 timeout
=timeout_ns_action
,
5341 elif primitive_name
== "status":
5342 detailed_status
= await asyncio
.wait_for(
5343 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5344 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5345 kdu_instance
=kdu
.get("kdu-instance"),
5348 timeout
=timeout_ns_action
,
5351 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5352 kdu
["kdu-name"], nsr_id
5354 params
= self
._map
_primitive
_params
(
5355 config_primitive_desc
, primitive_params
, desc_params
5358 detailed_status
= await asyncio
.wait_for(
5359 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5360 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5361 kdu_instance
=kdu_instance
,
5362 primitive_name
=primitive_name
,
5365 timeout
=timeout_ns_action
,
5368 timeout
=timeout_ns_action
,
5372 nslcmop_operation_state
= "COMPLETED"
5374 detailed_status
= ""
5375 nslcmop_operation_state
= "FAILED"
5377 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5378 nsr_deployed
["VCA"],
5379 member_vnf_index
=vnf_index
,
5381 vdu_count_index
=vdu_count_index
,
5382 ee_descriptor_id
=ee_descriptor_id
,
5384 for vca_index
, vca_deployed
in enumerate(
5385 db_nsr
["_admin"]["deployed"]["VCA"]
5387 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5389 "collection": "nsrs",
5390 "filter": {"_id": nsr_id
},
5391 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5395 nslcmop_operation_state
,
5397 ) = await self
._ns
_execute
_primitive
(
5399 primitive
=primitive_name
,
5400 primitive_params
=self
._map
_primitive
_params
(
5401 config_primitive_desc
, primitive_params
, desc_params
5403 timeout
=timeout_ns_action
,
5409 db_nslcmop_update
["detailed-status"] = detailed_status
5410 error_description_nslcmop
= (
5411 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5415 + "Done with result {} {}".format(
5416 nslcmop_operation_state
, detailed_status
5419 return # database update is called inside finally
5421 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5422 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5424 except asyncio
.CancelledError
:
5426 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5428 exc
= "Operation was cancelled"
5429 except asyncio
.TimeoutError
:
5430 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5432 except Exception as e
:
5433 exc
= traceback
.format_exc()
5434 self
.logger
.critical(
5435 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5444 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5445 nslcmop_operation_state
= "FAILED"
5447 self
._write
_ns
_status
(
5451 ], # TODO check if degraded. For the moment use previous status
5452 current_operation
="IDLE",
5453 current_operation_id
=None,
5454 # error_description=error_description_nsr,
5455 # error_detail=error_detail,
5456 other_update
=db_nsr_update
,
5459 self
._write
_op
_status
(
5462 error_message
=error_description_nslcmop
,
5463 operation_state
=nslcmop_operation_state
,
5464 other_update
=db_nslcmop_update
,
5467 if nslcmop_operation_state
:
5469 await self
.msg
.aiowrite(
5474 "nslcmop_id": nslcmop_id
,
5475 "operationState": nslcmop_operation_state
,
5479 except Exception as e
:
5481 logging_text
+ "kafka_write notification Exception {}".format(e
)
5483 self
.logger
.debug(logging_text
+ "Exit")
5484 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5485 return nslcmop_operation_state
, detailed_status
5487 async def terminate_vdus(
5488 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5490 """This method terminates VDUs
5493 db_vnfr: VNF instance record
5494 member_vnf_index: VNF index to identify the VDUs to be removed
5495 db_nsr: NS instance record
5496 update_db_nslcmops: Nslcmop update record
5498 vca_scaling_info
= []
5499 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5500 scaling_info
["scaling_direction"] = "IN"
5501 scaling_info
["vdu-delete"] = {}
5502 scaling_info
["kdu-delete"] = {}
5503 db_vdur
= db_vnfr
.get("vdur")
5504 vdur_list
= copy(db_vdur
)
5506 for index
, vdu
in enumerate(vdur_list
):
5507 vca_scaling_info
.append(
5509 "osm_vdu_id": vdu
["vdu-id-ref"],
5510 "member-vnf-index": member_vnf_index
,
5512 "vdu_index": count_index
,
5515 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5516 scaling_info
["vdu"].append(
5518 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5519 "vdu_id": vdu
["vdu-id-ref"],
5523 for interface
in vdu
["interfaces"]:
5524 scaling_info
["vdu"][index
]["interface"].append(
5526 "name": interface
["name"],
5527 "ip_address": interface
["ip-address"],
5528 "mac_address": interface
.get("mac-address"),
5531 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5532 stage
[2] = "Terminating VDUs"
5533 if scaling_info
.get("vdu-delete"):
5534 # scale_process = "RO"
5535 if self
.ro_config
.get("ng"):
5536 await self
._scale
_ng
_ro
(
async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
    """This method is to Remove VNF instances from NS.

    The VDUs of the VNF are terminated, the VNF is removed from the
    "constituent-vnfr-ref" list of the NS record and its vnfr is deleted.
    The last VNF of a NS cannot be removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: when the NS has only one VNF
    """
    try:
        db_nsr_update = {}
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException(
                "{} Cannot terminate the last VNF in this NS.".format(
                    vnf_instance_id
                )
            )

        stage = ["", "", ""]
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr,
            member_vnf_index,
            db_nsr,
            update_db_nslcmops,
            stage,
            logging_text,
        )

        # the list is modified in place, so db_nsr already contains the new content
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        constituent_vnfr.remove(db_vnfr.get("_id"))
        db_nsr_update["constituent-vnfr-ref"] = db_nsr.get(
            "constituent-vnfr-ref"
        )
        # written only once; the same update used to be repeated after deleting the vnfr
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
5606 async def _ns_redeploy_vnf(
5614 """This method updates and redeploys VNF instances
5617 nsr_id: NS instance id
5618 nslcmop_id: nslcmop id
5619 db_vnfd: VNF descriptor
5620 db_vnfr: VNF instance record
5621 db_nsr: NS instance record
5624 result: (str, str) COMPLETED/FAILED, details
5628 stage
= ["", "", ""]
5629 logging_text
= "Task ns={} update ".format(nsr_id
)
5630 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5631 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5633 # Terminate old VNF resources
5634 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5635 await self
.terminate_vdus(
5644 # old_vnfd_id = db_vnfr["vnfd-id"]
5645 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5646 new_db_vnfd
= db_vnfd
5647 # new_vnfd_ref = new_db_vnfd["id"]
5648 # new_vnfd_id = vnfd_id
5652 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5654 "name": cp
.get("id"),
5655 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5656 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5659 new_vnfr_cp
.append(vnf_cp
)
5660 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5661 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5662 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5664 "revision": latest_vnfd_revision
,
5665 "connection-point": new_vnfr_cp
,
5669 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5670 updated_db_vnfr
= self
.db
.get_one(
5672 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5675 # Instantiate new VNF resources
5676 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5677 vca_scaling_info
= []
5678 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5679 scaling_info
["scaling_direction"] = "OUT"
5680 scaling_info
["vdu-create"] = {}
5681 scaling_info
["kdu-create"] = {}
5682 vdud_instantiate_list
= db_vnfd
["vdu"]
5683 for index
, vdud
in enumerate(vdud_instantiate_list
):
5684 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5686 additional_params
= (
5687 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5690 cloud_init_list
= []
5692 # TODO Information of its own ip is not available because db_vnfr is not updated.
5693 additional_params
["OSM"] = get_osm_params(
5694 updated_db_vnfr
, vdud
["id"], 1
5696 cloud_init_list
.append(
5697 self
._parse
_cloud
_init
(
5704 vca_scaling_info
.append(
5706 "osm_vdu_id": vdud
["id"],
5707 "member-vnf-index": member_vnf_index
,
5709 "vdu_index": count_index
,
5712 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5713 if self
.ro_config
.get("ng"):
5715 "New Resources to be deployed: {}".format(scaling_info
)
5717 await self
._scale
_ng
_ro
(
5725 return "COMPLETED", "Done"
5726 except (LcmException
, asyncio
.CancelledError
):
5728 except Exception as e
:
5729 self
.logger
.debug("Error updating VNF {}".format(e
))
5730 return "FAILED", "Error updating VNF {}".format(e
)
5732 async def _ns_charm_upgrade(
5738 timeout
: float = None,
5740 """This method upgrade charms in VNF instances
5743 ee_id: Execution environment id
5744 path: Local path to the charm
5746 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5747 timeout: (Float) Timeout for the ns update operation
5750 result: (str, str) COMPLETED/FAILED, details
5753 charm_type
= charm_type
or "lxc_proxy_charm"
5754 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5758 charm_type
=charm_type
,
5759 timeout
=timeout
or self
.timeout_ns_update
,
5763 return "COMPLETED", output
5765 except (LcmException
, asyncio
.CancelledError
):
5768 except Exception as e
:
5770 self
.logger
.debug("Error upgrading charm {}".format(path
))
5772 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5774 async def update(self
, nsr_id
, nslcmop_id
):
5775 """Update NS according to different update types
5777 This method performs upgrade of VNF instances then updates the revision
5778 number in VNF record
5781 nsr_id: Network service will be updated
5782 nslcmop_id: ns lcm operation id
5785 It may raise DbException, LcmException, N2VCException, K8sException
5788 # Try to lock HA task here
5789 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5790 if not task_is_locked_by_me
:
5793 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5794 self
.logger
.debug(logging_text
+ "Enter")
5796 # Set the required variables to be filled up later
5798 db_nslcmop_update
= {}
5800 nslcmop_operation_state
= None
5802 error_description_nslcmop
= ""
5804 change_type
= "updated"
5805 detailed_status
= ""
5808 # wait for any previous tasks in process
5809 step
= "Waiting for previous operations to terminate"
5810 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5811 self
._write
_ns
_status
(
5814 current_operation
="UPDATING",
5815 current_operation_id
=nslcmop_id
,
5818 step
= "Getting nslcmop from database"
5819 db_nslcmop
= self
.db
.get_one(
5820 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5822 update_type
= db_nslcmop
["operationParams"]["updateType"]
5824 step
= "Getting nsr from database"
5825 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5826 old_operational_status
= db_nsr
["operational-status"]
5827 db_nsr_update
["operational-status"] = "updating"
5828 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5829 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5831 if update_type
== "CHANGE_VNFPKG":
5833 # Get the input parameters given through update request
5834 vnf_instance_id
= db_nslcmop
["operationParams"][
5835 "changeVnfPackageData"
5836 ].get("vnfInstanceId")
5838 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5841 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5843 step
= "Getting vnfr from database"
5844 db_vnfr
= self
.db
.get_one(
5845 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5848 step
= "Getting vnfds from database"
5850 latest_vnfd
= self
.db
.get_one(
5851 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5853 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5856 current_vnf_revision
= db_vnfr
.get("revision", 1)
5857 current_vnfd
= self
.db
.get_one(
5859 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5860 fail_on_empty
=False,
5862 # Charm artifact paths will be filled up later
5864 current_charm_artifact_path
,
5865 target_charm_artifact_path
,
5866 charm_artifact_paths
,
5868 ) = ([], [], [], [])
5870 step
= "Checking if revision has changed in VNFD"
5871 if current_vnf_revision
!= latest_vnfd_revision
:
5873 change_type
= "policy_updated"
5875 # There is new revision of VNFD, update operation is required
5876 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5877 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5879 step
= "Removing the VNFD packages if they exist in the local path"
5880 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5881 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5883 step
= "Get the VNFD packages from FSMongo"
5884 self
.fs
.sync(from_path
=latest_vnfd_path
)
5885 self
.fs
.sync(from_path
=current_vnfd_path
)
5888 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5890 current_base_folder
= current_vnfd
["_admin"]["storage"]
5891 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5893 for vca_index
, vca_deployed
in enumerate(
5894 get_iterable(nsr_deployed
, "VCA")
5896 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5898 # Getting charm-id and charm-type
5899 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5900 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5901 vca_type
= vca_deployed
.get("type")
5902 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5905 ee_id
= vca_deployed
.get("ee_id")
5907 step
= "Getting descriptor config"
5908 descriptor_config
= get_configuration(
5909 current_vnfd
, current_vnfd
["id"]
5912 if "execution-environment-list" in descriptor_config
:
5913 ee_list
= descriptor_config
.get(
5914 "execution-environment-list", []
5919 # There could be several charms used in the same VNF
5920 for ee_item
in ee_list
:
5921 if ee_item
.get("juju"):
5923 step
= "Getting charm name"
5924 charm_name
= ee_item
["juju"].get("charm")
5926 step
= "Setting Charm artifact paths"
5927 current_charm_artifact_path
.append(
5928 get_charm_artifact_path(
5929 current_base_folder
,
5932 current_vnf_revision
,
5935 target_charm_artifact_path
.append(
5936 get_charm_artifact_path(
5940 latest_vnfd_revision
,
5943 elif ee_item
.get("helm-chart"):
5944 # add chart to list and all parameters
5945 step
= "Getting helm chart name"
5946 chart_name
= ee_item
.get("helm-chart")
5948 ee_item
.get("helm-version")
5949 and ee_item
.get("helm-version") == "v2"
5953 vca_type
= "helm-v3"
5954 step
= "Setting Helm chart artifact paths"
5956 helm_artifacts
.append(
5958 "current_artifact_path": get_charm_artifact_path(
5959 current_base_folder
,
5962 current_vnf_revision
,
5964 "target_artifact_path": get_charm_artifact_path(
5968 latest_vnfd_revision
,
5971 "vca_index": vca_index
,
5972 "vdu_index": vdu_count_index
,
5976 charm_artifact_paths
= zip(
5977 current_charm_artifact_path
, target_charm_artifact_path
5980 step
= "Checking if software version has changed in VNFD"
5981 if find_software_version(current_vnfd
) != find_software_version(
5985 step
= "Checking if existing VNF has charm"
5986 for current_charm_path
, target_charm_path
in list(
5987 charm_artifact_paths
5989 if current_charm_path
:
5991 "Software version change is not supported as VNF instance {} has charm.".format(
5996 # There is no change in the charm package, then redeploy the VNF
5997 # based on new descriptor
5998 step
= "Redeploying VNF"
5999 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6000 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
6001 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
6003 if result
== "FAILED":
6004 nslcmop_operation_state
= result
6005 error_description_nslcmop
= detailed_status
6006 db_nslcmop_update
["detailed-status"] = detailed_status
6009 + " step {} Done with result {} {}".format(
6010 step
, nslcmop_operation_state
, detailed_status
6015 step
= "Checking if any charm package has changed or not"
6016 for current_charm_path
, target_charm_path
in list(
6017 charm_artifact_paths
6021 and target_charm_path
6022 and self
.check_charm_hash_changed(
6023 current_charm_path
, target_charm_path
6027 step
= "Checking whether VNF uses juju bundle"
6028 if check_juju_bundle_existence(current_vnfd
):
6031 "Charm upgrade is not supported for the instance which"
6032 " uses juju-bundle: {}".format(
6033 check_juju_bundle_existence(current_vnfd
)
6037 step
= "Upgrading Charm"
6041 ) = await self
._ns
_charm
_upgrade
(
6044 charm_type
=vca_type
,
6045 path
=self
.fs
.path
+ target_charm_path
,
6046 timeout
=timeout_seconds
,
6049 if result
== "FAILED":
6050 nslcmop_operation_state
= result
6051 error_description_nslcmop
= detailed_status
6053 db_nslcmop_update
["detailed-status"] = detailed_status
6056 + " step {} Done with result {} {}".format(
6057 step
, nslcmop_operation_state
, detailed_status
6061 step
= "Updating policies"
6062 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6063 result
= "COMPLETED"
6064 detailed_status
= "Done"
6065 db_nslcmop_update
["detailed-status"] = "Done"
6068 for item
in helm_artifacts
:
6070 item
["current_artifact_path"]
6071 and item
["target_artifact_path"]
6072 and self
.check_charm_hash_changed(
6073 item
["current_artifact_path"],
6074 item
["target_artifact_path"],
6078 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6081 vnfr_id
= db_vnfr
["_id"]
6082 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6084 "collection": "nsrs",
6085 "filter": {"_id": nsr_id
},
6086 "path": db_update_entry
,
6088 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6089 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6090 namespace
=namespace
,
6094 artifact_path
=item
["target_artifact_path"],
6097 vnf_id
= db_vnfr
.get("vnfd-ref")
6098 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6099 self
.logger
.debug("get ssh key block")
6103 ("config-access", "ssh-access", "required"),
6105 # Needed to inject a ssh key
6108 ("config-access", "ssh-access", "default-user"),
6111 "Install configuration Software, getting public ssh key"
6113 pub_key
= await self
.vca_map
[
6115 ].get_ee_ssh_public__key(
6116 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6120 "Insert public key into VM user={} ssh_key={}".format(
6124 self
.logger
.debug(logging_text
+ step
)
6126 # wait for RO (ip-address) Insert pub_key into VM
6127 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6137 initial_config_primitive_list
= config_descriptor
.get(
6138 "initial-config-primitive"
6140 config_primitive
= next(
6143 for p
in initial_config_primitive_list
6144 if p
["name"] == "config"
6148 if not config_primitive
:
6151 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6153 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6154 if db_vnfr
.get("additionalParamsForVnf"):
6155 deploy_params
.update(
6157 db_vnfr
["additionalParamsForVnf"].copy()
6160 primitive_params_
= self
._map
_primitive
_params
(
6161 config_primitive
, {}, deploy_params
6164 step
= "execute primitive '{}' params '{}'".format(
6165 config_primitive
["name"], primitive_params_
6167 self
.logger
.debug(logging_text
+ step
)
6168 await self
.vca_map
[vca_type
].exec_primitive(
6170 primitive_name
=config_primitive
["name"],
6171 params_dict
=primitive_params_
,
6177 step
= "Updating policies"
6178 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6179 detailed_status
= "Done"
6180 db_nslcmop_update
["detailed-status"] = "Done"
6182 # If nslcmop_operation_state is None, no operation has failed.
6183 if not nslcmop_operation_state
:
6184 nslcmop_operation_state
= "COMPLETED"
6186 # If the CHANGE_VNFPKG update operation is successful,
6187 # the vnf revision needs to be updated
6188 vnfr_update
["revision"] = latest_vnfd_revision
6189 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6193 + " task Done with result {} {}".format(
6194 nslcmop_operation_state
, detailed_status
6197 elif update_type
== "REMOVE_VNF":
6198 # This part is included in https://osm.etsi.org/gerrit/11876
6199 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6200 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6201 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6202 step
= "Removing VNF"
6203 (result
, detailed_status
) = await self
.remove_vnf(
6204 nsr_id
, nslcmop_id
, vnf_instance_id
6206 if result
== "FAILED":
6207 nslcmop_operation_state
= result
6208 error_description_nslcmop
= detailed_status
6209 db_nslcmop_update
["detailed-status"] = detailed_status
6210 change_type
= "vnf_terminated"
6211 if not nslcmop_operation_state
:
6212 nslcmop_operation_state
= "COMPLETED"
6215 + " task Done with result {} {}".format(
6216 nslcmop_operation_state
, detailed_status
6220 elif update_type
== "OPERATE_VNF":
6221 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6224 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6227 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6230 (result
, detailed_status
) = await self
.rebuild_start_stop(
6231 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6233 if result
== "FAILED":
6234 nslcmop_operation_state
= result
6235 error_description_nslcmop
= detailed_status
6236 db_nslcmop_update
["detailed-status"] = detailed_status
6237 if not nslcmop_operation_state
:
6238 nslcmop_operation_state
= "COMPLETED"
6241 + " task Done with result {} {}".format(
6242 nslcmop_operation_state
, detailed_status
6246 # If nslcmop_operation_state is None, no operation has failed.
6247 # All operations have been executed.
6248 if not nslcmop_operation_state
:
6249 nslcmop_operation_state
= "COMPLETED"
6250 db_nsr_update
["operational-status"] = old_operational_status
6252 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6253 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6255 except asyncio
.CancelledError
:
6257 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6259 exc
= "Operation was cancelled"
6260 except asyncio
.TimeoutError
:
6261 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6263 except Exception as e
:
6264 exc
= traceback
.format_exc()
6265 self
.logger
.critical(
6266 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6275 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6276 nslcmop_operation_state
= "FAILED"
6277 db_nsr_update
["operational-status"] = old_operational_status
6279 self
._write
_ns
_status
(
6281 ns_state
=db_nsr
["nsState"],
6282 current_operation
="IDLE",
6283 current_operation_id
=None,
6284 other_update
=db_nsr_update
,
6287 self
._write
_op
_status
(
6290 error_message
=error_description_nslcmop
,
6291 operation_state
=nslcmop_operation_state
,
6292 other_update
=db_nslcmop_update
,
6295 if nslcmop_operation_state
:
6299 "nslcmop_id": nslcmop_id
,
6300 "operationState": nslcmop_operation_state
,
6302 if change_type
in ("vnf_terminated", "policy_updated"):
6303 msg
.update({"vnf_member_index": member_vnf_index
})
6304 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6305 except Exception as e
:
6307 logging_text
+ "kafka_write notification Exception {}".format(e
)
6309 self
.logger
.debug(logging_text
+ "Exit")
6310 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6311 return nslcmop_operation_state
, detailed_status
6313 async def scale(self
, nsr_id
, nslcmop_id
):
6314 # Try to lock HA task here
6315 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6316 if not task_is_locked_by_me
:
6319 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6320 stage
= ["", "", ""]
6321 tasks_dict_info
= {}
6322 # ^ stage, step, VIM progress
6323 self
.logger
.debug(logging_text
+ "Enter")
6324 # get all needed from database
6326 db_nslcmop_update
= {}
6329 # in case of error, indicates which part of the scale operation failed, in order to put the nsr in error status
6330 scale_process
= None
6331 old_operational_status
= ""
6332 old_config_status
= ""
6335 # wait for any previous tasks in process
6336 step
= "Waiting for previous operations to terminate"
6337 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6338 self
._write
_ns
_status
(
6341 current_operation
="SCALING",
6342 current_operation_id
=nslcmop_id
,
6345 step
= "Getting nslcmop from database"
6347 step
+ " after having waited for previous tasks to be completed"
6349 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6351 step
= "Getting nsr from database"
6352 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6353 old_operational_status
= db_nsr
["operational-status"]
6354 old_config_status
= db_nsr
["config-status"]
6356 step
= "Parsing scaling parameters"
6357 db_nsr_update
["operational-status"] = "scaling"
6358 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6359 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6361 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6363 ]["member-vnf-index"]
6364 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6366 ]["scaling-group-descriptor"]
6367 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6368 # for backward compatibility
6369 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6370 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6371 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6372 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6374 step
= "Getting vnfr from database"
6375 db_vnfr
= self
.db
.get_one(
6376 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6379 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6381 step
= "Getting vnfd from database"
6382 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6384 base_folder
= db_vnfd
["_admin"]["storage"]
6386 step
= "Getting scaling-group-descriptor"
6387 scaling_descriptor
= find_in_list(
6388 get_scaling_aspect(db_vnfd
),
6389 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6391 if not scaling_descriptor
:
6393 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6394 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6397 step
= "Sending scale order to VIM"
6398 # TODO check if ns is in a proper status
6400 if not db_nsr
["_admin"].get("scaling-group"):
6405 "_admin.scaling-group": [
6406 {"name": scaling_group
, "nb-scale-op": 0}
6410 admin_scale_index
= 0
6412 for admin_scale_index
, admin_scale_info
in enumerate(
6413 db_nsr
["_admin"]["scaling-group"]
6415 if admin_scale_info
["name"] == scaling_group
:
6416 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6418 else: # not found, set index one plus last element and add new entry with the name
6419 admin_scale_index
+= 1
6421 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6424 vca_scaling_info
= []
6425 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6426 if scaling_type
== "SCALE_OUT":
6427 if "aspect-delta-details" not in scaling_descriptor
:
6429 "Aspect delta details not fount in scaling descriptor {}".format(
6430 scaling_descriptor
["name"]
6433 # count if max-instance-count is reached
6434 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6436 scaling_info
["scaling_direction"] = "OUT"
6437 scaling_info
["vdu-create"] = {}
6438 scaling_info
["kdu-create"] = {}
6439 for delta
in deltas
:
6440 for vdu_delta
in delta
.get("vdu-delta", {}):
6441 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6442 # vdu_index also provides the number of instances of the targeted vdu
6443 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6444 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6448 additional_params
= (
6449 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6452 cloud_init_list
= []
6454 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6455 max_instance_count
= 10
6456 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6457 max_instance_count
= vdu_profile
.get(
6458 "max-number-of-instances", 10
6461 default_instance_num
= get_number_of_instances(
6464 instances_number
= vdu_delta
.get("number-of-instances", 1)
6465 nb_scale_op
+= instances_number
6467 new_instance_count
= nb_scale_op
+ default_instance_num
6468 # Check whether the new count is over max while the vdu count is still below max.
6469 # If so, assign the new instance count
6470 if new_instance_count
> max_instance_count
> vdu_count
:
6471 instances_number
= new_instance_count
- max_instance_count
6473 instances_number
= instances_number
6475 if new_instance_count
> max_instance_count
:
6477 "reached the limit of {} (max-instance-count) "
6478 "scaling-out operations for the "
6479 "scaling-group-descriptor '{}'".format(
6480 nb_scale_op
, scaling_group
6483 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6485 # TODO Information of its own ip is not available because db_vnfr is not updated.
6486 additional_params
["OSM"] = get_osm_params(
6487 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6489 cloud_init_list
.append(
6490 self
._parse
_cloud
_init
(
6497 vca_scaling_info
.append(
6499 "osm_vdu_id": vdu_delta
["id"],
6500 "member-vnf-index": vnf_index
,
6502 "vdu_index": vdu_index
+ x
,
6505 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6506 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6507 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6508 kdu_name
= kdu_profile
["kdu-name"]
6509 resource_name
= kdu_profile
.get("resource-name", "")
6511 # Might have different kdus in the same delta
6512 # Should have list for each kdu
6513 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6514 scaling_info
["kdu-create"][kdu_name
] = []
6516 kdur
= get_kdur(db_vnfr
, kdu_name
)
6517 if kdur
.get("helm-chart"):
6518 k8s_cluster_type
= "helm-chart-v3"
6519 self
.logger
.debug("kdur: {}".format(kdur
))
6521 kdur
.get("helm-version")
6522 and kdur
.get("helm-version") == "v2"
6524 k8s_cluster_type
= "helm-chart"
6525 elif kdur
.get("juju-bundle"):
6526 k8s_cluster_type
= "juju-bundle"
6529 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6530 "juju-bundle. Maybe an old NBI version is running".format(
6531 db_vnfr
["member-vnf-index-ref"], kdu_name
6535 max_instance_count
= 10
6536 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6537 max_instance_count
= kdu_profile
.get(
6538 "max-number-of-instances", 10
6541 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6542 deployed_kdu
, _
= get_deployed_kdu(
6543 nsr_deployed
, kdu_name
, vnf_index
6545 if deployed_kdu
is None:
6547 "KDU '{}' for vnf '{}' not deployed".format(
6551 kdu_instance
= deployed_kdu
.get("kdu-instance")
6552 instance_num
= await self
.k8scluster_map
[
6558 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6559 kdu_model
=deployed_kdu
.get("kdu-model"),
6561 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6562 "number-of-instances", 1
6565 # Check whether the new count is over max while instance_num is still below max.
6566 # If so, assign the max instance number to the kdu replica count
6567 if kdu_replica_count
> max_instance_count
> instance_num
:
6568 kdu_replica_count
= max_instance_count
6569 if kdu_replica_count
> max_instance_count
:
6571 "reached the limit of {} (max-instance-count) "
6572 "scaling-out operations for the "
6573 "scaling-group-descriptor '{}'".format(
6574 instance_num
, scaling_group
6578 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6579 vca_scaling_info
.append(
6581 "osm_kdu_id": kdu_name
,
6582 "member-vnf-index": vnf_index
,
6584 "kdu_index": instance_num
+ x
- 1,
6587 scaling_info
["kdu-create"][kdu_name
].append(
6589 "member-vnf-index": vnf_index
,
6591 "k8s-cluster-type": k8s_cluster_type
,
6592 "resource-name": resource_name
,
6593 "scale": kdu_replica_count
,
6596 elif scaling_type
== "SCALE_IN":
6597 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6599 scaling_info
["scaling_direction"] = "IN"
6600 scaling_info
["vdu-delete"] = {}
6601 scaling_info
["kdu-delete"] = {}
6603 for delta
in deltas
:
6604 for vdu_delta
in delta
.get("vdu-delta", {}):
6605 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6606 min_instance_count
= 0
6607 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6608 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6609 min_instance_count
= vdu_profile
["min-number-of-instances"]
6611 default_instance_num
= get_number_of_instances(
6612 db_vnfd
, vdu_delta
["id"]
6614 instance_num
= vdu_delta
.get("number-of-instances", 1)
6615 nb_scale_op
-= instance_num
6617 new_instance_count
= nb_scale_op
+ default_instance_num
6619 if new_instance_count
< min_instance_count
< vdu_count
:
6620 instances_number
= min_instance_count
- new_instance_count
6622 instances_number
= instance_num
6624 if new_instance_count
< min_instance_count
:
6626 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6627 "scaling-group-descriptor '{}'".format(
6628 nb_scale_op
, scaling_group
6631 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6632 vca_scaling_info
.append(
6634 "osm_vdu_id": vdu_delta
["id"],
6635 "member-vnf-index": vnf_index
,
6637 "vdu_index": vdu_index
- 1 - x
,
6640 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6641 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6642 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6643 kdu_name
= kdu_profile
["kdu-name"]
6644 resource_name
= kdu_profile
.get("resource-name", "")
6646 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6647 scaling_info
["kdu-delete"][kdu_name
] = []
6649 kdur
= get_kdur(db_vnfr
, kdu_name
)
6650 if kdur
.get("helm-chart"):
6651 k8s_cluster_type
= "helm-chart-v3"
6652 self
.logger
.debug("kdur: {}".format(kdur
))
6654 kdur
.get("helm-version")
6655 and kdur
.get("helm-version") == "v2"
6657 k8s_cluster_type
= "helm-chart"
6658 elif kdur
.get("juju-bundle"):
6659 k8s_cluster_type
= "juju-bundle"
6662 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6663 "juju-bundle. Maybe an old NBI version is running".format(
6664 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6668 min_instance_count
= 0
6669 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6670 min_instance_count
= kdu_profile
["min-number-of-instances"]
6672 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6673 deployed_kdu
, _
= get_deployed_kdu(
6674 nsr_deployed
, kdu_name
, vnf_index
6676 if deployed_kdu
is None:
6678 "KDU '{}' for vnf '{}' not deployed".format(
6682 kdu_instance
= deployed_kdu
.get("kdu-instance")
6683 instance_num
= await self
.k8scluster_map
[
6689 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6690 kdu_model
=deployed_kdu
.get("kdu-model"),
6692 kdu_replica_count
= instance_num
- kdu_delta
.get(
6693 "number-of-instances", 1
6696 if kdu_replica_count
< min_instance_count
< instance_num
:
6697 kdu_replica_count
= min_instance_count
6698 if kdu_replica_count
< min_instance_count
:
6700 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6701 "scaling-group-descriptor '{}'".format(
6702 instance_num
, scaling_group
6706 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6707 vca_scaling_info
.append(
6709 "osm_kdu_id": kdu_name
,
6710 "member-vnf-index": vnf_index
,
6712 "kdu_index": instance_num
- x
- 1,
6715 scaling_info
["kdu-delete"][kdu_name
].append(
6717 "member-vnf-index": vnf_index
,
6719 "k8s-cluster-type": k8s_cluster_type
,
6720 "resource-name": resource_name
,
6721 "scale": kdu_replica_count
,
6725 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6726 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6727 if scaling_info
["scaling_direction"] == "IN":
6728 for vdur
in reversed(db_vnfr
["vdur"]):
6729 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6730 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6731 scaling_info
["vdu"].append(
6733 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6734 "vdu_id": vdur
["vdu-id-ref"],
6738 for interface
in vdur
["interfaces"]:
6739 scaling_info
["vdu"][-1]["interface"].append(
6741 "name": interface
["name"],
6742 "ip_address": interface
["ip-address"],
6743 "mac_address": interface
.get("mac-address"),
6746 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6749 step
= "Executing pre-scale vnf-config-primitive"
6750 if scaling_descriptor
.get("scaling-config-action"):
6751 for scaling_config_action
in scaling_descriptor
[
6752 "scaling-config-action"
6755 scaling_config_action
.get("trigger") == "pre-scale-in"
6756 and scaling_type
== "SCALE_IN"
6758 scaling_config_action
.get("trigger") == "pre-scale-out"
6759 and scaling_type
== "SCALE_OUT"
6761 vnf_config_primitive
= scaling_config_action
[
6762 "vnf-config-primitive-name-ref"
6764 step
= db_nslcmop_update
[
6766 ] = "executing pre-scale scaling-config-action '{}'".format(
6767 vnf_config_primitive
6770 # look for primitive
6771 for config_primitive
in (
6772 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6773 ).get("config-primitive", ()):
6774 if config_primitive
["name"] == vnf_config_primitive
:
6778 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6779 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6780 "primitive".format(scaling_group
, vnf_config_primitive
)
6783 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6784 if db_vnfr
.get("additionalParamsForVnf"):
6785 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6787 scale_process
= "VCA"
6788 db_nsr_update
["config-status"] = "configuring pre-scaling"
6789 primitive_params
= self
._map
_primitive
_params
(
6790 config_primitive
, {}, vnfr_params
6793 # Pre-scale retry check: Check if this sub-operation has been executed before
6794 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6797 vnf_config_primitive
,
6801 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6802 # Skip sub-operation
6803 result
= "COMPLETED"
6804 result_detail
= "Done"
6807 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6808 vnf_config_primitive
, result
, result_detail
6812 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6813 # New sub-operation: Get index of this sub-operation
6815 len(db_nslcmop
.get("_admin", {}).get("operations"))
6820 + "vnf_config_primitive={} New sub-operation".format(
6821 vnf_config_primitive
6825 # retry: Get registered params for this existing sub-operation
6826 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6829 vnf_index
= op
.get("member_vnf_index")
6830 vnf_config_primitive
= op
.get("primitive")
6831 primitive_params
= op
.get("primitive_params")
6834 + "vnf_config_primitive={} Sub-operation retry".format(
6835 vnf_config_primitive
6838 # Execute the primitive, either with new (first-time) or registered (retry) args
6839 ee_descriptor_id
= config_primitive
.get(
6840 "execution-environment-ref"
6842 primitive_name
= config_primitive
.get(
6843 "execution-environment-primitive", vnf_config_primitive
6845 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6846 nsr_deployed
["VCA"],
6847 member_vnf_index
=vnf_index
,
6849 vdu_count_index
=None,
6850 ee_descriptor_id
=ee_descriptor_id
,
6852 result
, result_detail
= await self
._ns
_execute
_primitive
(
6861 + "vnf_config_primitive={} Done with result {} {}".format(
6862 vnf_config_primitive
, result
, result_detail
6865 # Update operationState = COMPLETED | FAILED
6866 self
._update
_suboperation
_status
(
6867 db_nslcmop
, op_index
, result
, result_detail
6870 if result
== "FAILED":
6871 raise LcmException(result_detail
)
6872 db_nsr_update
["config-status"] = old_config_status
6873 scale_process
= None
6877 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6880 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6883 # SCALE-IN VCA - BEGIN
6884 if vca_scaling_info
:
6885 step
= db_nslcmop_update
[
6887 ] = "Deleting the execution environments"
6888 scale_process
= "VCA"
6889 for vca_info
in vca_scaling_info
:
6890 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6891 member_vnf_index
= str(vca_info
["member-vnf-index"])
6893 logging_text
+ "vdu info: {}".format(vca_info
)
6895 if vca_info
.get("osm_vdu_id"):
6896 vdu_id
= vca_info
["osm_vdu_id"]
6897 vdu_index
= int(vca_info
["vdu_index"])
6900 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6901 member_vnf_index
, vdu_id
, vdu_index
6903 stage
[2] = step
= "Scaling in VCA"
6904 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6905 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6906 config_update
= db_nsr
["configurationStatus"]
6907 for vca_index
, vca
in enumerate(vca_update
):
6909 (vca
or vca
.get("ee_id"))
6910 and vca
["member-vnf-index"] == member_vnf_index
6911 and vca
["vdu_count_index"] == vdu_index
6913 if vca
.get("vdu_id"):
6914 config_descriptor
= get_configuration(
6915 db_vnfd
, vca
.get("vdu_id")
6917 elif vca
.get("kdu_name"):
6918 config_descriptor
= get_configuration(
6919 db_vnfd
, vca
.get("kdu_name")
6922 config_descriptor
= get_configuration(
6923 db_vnfd
, db_vnfd
["id"]
6925 operation_params
= (
6926 db_nslcmop
.get("operationParams") or {}
6928 exec_terminate_primitives
= not operation_params
.get(
6929 "skip_terminate_primitives"
6930 ) and vca
.get("needed_terminate")
6931 task
= asyncio
.ensure_future(
6940 exec_primitives
=exec_terminate_primitives
,
6944 timeout
=self
.timeout_charm_delete
,
6947 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6950 del vca_update
[vca_index
]
6951 del config_update
[vca_index
]
6952 # wait for pending tasks of terminate primitives
6956 + "Waiting for tasks {}".format(
6957 list(tasks_dict_info
.keys())
6960 error_list
= await self
._wait
_for
_tasks
(
6964 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6969 tasks_dict_info
.clear()
6971 raise LcmException("; ".join(error_list
))
6973 db_vca_and_config_update
= {
6974 "_admin.deployed.VCA": vca_update
,
6975 "configurationStatus": config_update
,
6978 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6980 scale_process
= None
6981 # SCALE-IN VCA - END
6984 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6985 scale_process
= "RO"
6986 if self
.ro_config
.get("ng"):
6987 await self
._scale
_ng
_ro
(
6988 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6990 scaling_info
.pop("vdu-create", None)
6991 scaling_info
.pop("vdu-delete", None)
6993 scale_process
= None
6997 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6998 scale_process
= "KDU"
6999 await self
._scale
_kdu
(
7000 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7002 scaling_info
.pop("kdu-create", None)
7003 scaling_info
.pop("kdu-delete", None)
7005 scale_process
= None
7009 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7011 # SCALE-UP VCA - BEGIN
7012 if vca_scaling_info
:
7013 step
= db_nslcmop_update
[
7015 ] = "Creating new execution environments"
7016 scale_process
= "VCA"
7017 for vca_info
in vca_scaling_info
:
7018 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7019 member_vnf_index
= str(vca_info
["member-vnf-index"])
7021 logging_text
+ "vdu info: {}".format(vca_info
)
7023 vnfd_id
= db_vnfr
["vnfd-ref"]
7024 if vca_info
.get("osm_vdu_id"):
7025 vdu_index
= int(vca_info
["vdu_index"])
7026 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7027 if db_vnfr
.get("additionalParamsForVnf"):
7028 deploy_params
.update(
7030 db_vnfr
["additionalParamsForVnf"].copy()
7033 descriptor_config
= get_configuration(
7034 db_vnfd
, db_vnfd
["id"]
7036 if descriptor_config
:
7041 logging_text
=logging_text
7042 + "member_vnf_index={} ".format(member_vnf_index
),
7045 nslcmop_id
=nslcmop_id
,
7051 member_vnf_index
=member_vnf_index
,
7052 vdu_index
=vdu_index
,
7054 deploy_params
=deploy_params
,
7055 descriptor_config
=descriptor_config
,
7056 base_folder
=base_folder
,
7057 task_instantiation_info
=tasks_dict_info
,
7060 vdu_id
= vca_info
["osm_vdu_id"]
7061 vdur
= find_in_list(
7062 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7064 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7065 if vdur
.get("additionalParams"):
7066 deploy_params_vdu
= parse_yaml_strings(
7067 vdur
["additionalParams"]
7070 deploy_params_vdu
= deploy_params
7071 deploy_params_vdu
["OSM"] = get_osm_params(
7072 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7074 if descriptor_config
:
7079 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7080 member_vnf_index
, vdu_id
, vdu_index
7082 stage
[2] = step
= "Scaling out VCA"
7083 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7085 logging_text
=logging_text
7086 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7087 member_vnf_index
, vdu_id
, vdu_index
7091 nslcmop_id
=nslcmop_id
,
7097 member_vnf_index
=member_vnf_index
,
7098 vdu_index
=vdu_index
,
7100 deploy_params
=deploy_params_vdu
,
7101 descriptor_config
=descriptor_config
,
7102 base_folder
=base_folder
,
7103 task_instantiation_info
=tasks_dict_info
,
7106 # SCALE-UP VCA - END
7107 scale_process
= None
7110 # execute primitive service POST-SCALING
7111 step
= "Executing post-scale vnf-config-primitive"
7112 if scaling_descriptor
.get("scaling-config-action"):
7113 for scaling_config_action
in scaling_descriptor
[
7114 "scaling-config-action"
7117 scaling_config_action
.get("trigger") == "post-scale-in"
7118 and scaling_type
== "SCALE_IN"
7120 scaling_config_action
.get("trigger") == "post-scale-out"
7121 and scaling_type
== "SCALE_OUT"
7123 vnf_config_primitive
= scaling_config_action
[
7124 "vnf-config-primitive-name-ref"
7126 step
= db_nslcmop_update
[
7128 ] = "executing post-scale scaling-config-action '{}'".format(
7129 vnf_config_primitive
7132 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7133 if db_vnfr
.get("additionalParamsForVnf"):
7134 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7136 # look for primitive
7137 for config_primitive
in (
7138 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7139 ).get("config-primitive", ()):
7140 if config_primitive
["name"] == vnf_config_primitive
:
7144 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7145 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7146 "config-primitive".format(
7147 scaling_group
, vnf_config_primitive
7150 scale_process
= "VCA"
7151 db_nsr_update
["config-status"] = "configuring post-scaling"
7152 primitive_params
= self
._map
_primitive
_params
(
7153 config_primitive
, {}, vnfr_params
7156 # Post-scale retry check: Check if this sub-operation has been executed before
7157 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7160 vnf_config_primitive
,
7164 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7165 # Skip sub-operation
7166 result
= "COMPLETED"
7167 result_detail
= "Done"
7170 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7171 vnf_config_primitive
, result
, result_detail
7175 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7176 # New sub-operation: Get index of this sub-operation
7178 len(db_nslcmop
.get("_admin", {}).get("operations"))
7183 + "vnf_config_primitive={} New sub-operation".format(
7184 vnf_config_primitive
7188 # retry: Get registered params for this existing sub-operation
7189 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7192 vnf_index
= op
.get("member_vnf_index")
7193 vnf_config_primitive
= op
.get("primitive")
7194 primitive_params
= op
.get("primitive_params")
7197 + "vnf_config_primitive={} Sub-operation retry".format(
7198 vnf_config_primitive
7201 # Execute the primitive, either with new (first-time) or registered (reintent) args
7202 ee_descriptor_id
= config_primitive
.get(
7203 "execution-environment-ref"
7205 primitive_name
= config_primitive
.get(
7206 "execution-environment-primitive", vnf_config_primitive
7208 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7209 nsr_deployed
["VCA"],
7210 member_vnf_index
=vnf_index
,
7212 vdu_count_index
=None,
7213 ee_descriptor_id
=ee_descriptor_id
,
7215 result
, result_detail
= await self
._ns
_execute
_primitive
(
7224 + "vnf_config_primitive={} Done with result {} {}".format(
7225 vnf_config_primitive
, result
, result_detail
7228 # Update operationState = COMPLETED | FAILED
7229 self
._update
_suboperation
_status
(
7230 db_nslcmop
, op_index
, result
, result_detail
7233 if result
== "FAILED":
7234 raise LcmException(result_detail
)
7235 db_nsr_update
["config-status"] = old_config_status
7236 scale_process
= None
7241 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7242 db_nsr_update
["operational-status"] = (
7244 if old_operational_status
== "failed"
7245 else old_operational_status
7247 db_nsr_update
["config-status"] = old_config_status
7250 ROclient
.ROClientException
,
7255 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7257 except asyncio
.CancelledError
:
7259 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7261 exc
= "Operation was cancelled"
7262 except Exception as e
:
7263 exc
= traceback
.format_exc()
7264 self
.logger
.critical(
7265 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7269 self
._write
_ns
_status
(
7272 current_operation
="IDLE",
7273 current_operation_id
=None,
7276 stage
[1] = "Waiting for instantiate pending tasks."
7277 self
.logger
.debug(logging_text
+ stage
[1])
7278 exc
= await self
._wait
_for
_tasks
(
7281 self
.timeout_ns_deploy
,
7289 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7290 nslcmop_operation_state
= "FAILED"
7292 db_nsr_update
["operational-status"] = old_operational_status
7293 db_nsr_update
["config-status"] = old_config_status
7294 db_nsr_update
["detailed-status"] = ""
7296 if "VCA" in scale_process
:
7297 db_nsr_update
["config-status"] = "failed"
7298 if "RO" in scale_process
:
7299 db_nsr_update
["operational-status"] = "failed"
7302 ] = "FAILED scaling nslcmop={} {}: {}".format(
7303 nslcmop_id
, step
, exc
7306 error_description_nslcmop
= None
7307 nslcmop_operation_state
= "COMPLETED"
7308 db_nslcmop_update
["detailed-status"] = "Done"
7310 self
._write
_op
_status
(
7313 error_message
=error_description_nslcmop
,
7314 operation_state
=nslcmop_operation_state
,
7315 other_update
=db_nslcmop_update
,
7318 self
._write
_ns
_status
(
7321 current_operation
="IDLE",
7322 current_operation_id
=None,
7323 other_update
=db_nsr_update
,
7326 if nslcmop_operation_state
:
7330 "nslcmop_id": nslcmop_id
,
7331 "operationState": nslcmop_operation_state
,
7333 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7334 except Exception as e
:
7336 logging_text
+ "kafka_write notification Exception {}".format(e
)
7338 self
.logger
.debug(logging_text
+ "Exit")
7339 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
# _scale_kdu: apply the KDU scaling requests carried in scaling_info.
#
# Reads scaling_info["kdu-create"] (or, when that is falsy, scaling_info["kdu-delete"]),
# which maps a kdu name to a list of per-KDU scaling entries. For every entry it:
#   * looks up the deployed KDU with get_deployed_kdu(nsr_deployed, kdu_name, member-vnf-index),
#   * for entries of type "delete": runs the KDU "terminate-config-primitive" list, sorted by
#     int(seq), through k8scluster_map[k8s-cluster-type].exec_primitive(),
#   * calls k8scluster_map[k8s-cluster-type].scale() wrapped in asyncio.wait_for(),
#   * for entries of type "create": runs the "initial-config-primitive" list, sorted by
#     int(seq), through exec_primitive().
# The primitive passes are guarded by get_juju_ee_ref(db_vnfd, kdu_name) being None.
#
# :param logging_text: prefix added to every debug message
# :param nsr_id: used as the "_id" filter of the "nsrs" collection
# :param nsr_deployed: passed to get_deployed_kdu()
# :param db_vnfd: descriptor passed to get_configuration() / get_juju_ee_ref()
# :param vca_id: not referenced in the lines visible in this listing
# :param scaling_info: dict with "kdu-create" and/or "kdu-delete"
#
# NOTE(review): this listing omits some original lines (see the gaps in the embedded
# numbering); confirm the exact call arguments against the complete file.
7341 async def _scale_kdu(
7342 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
# "kdu-create" takes precedence; "kdu-delete" is only used when "kdu-create" is falsy.
7344 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7345 for kdu_name
in _scaling_info
:
7346 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7347 deployed_kdu
, index
= get_deployed_kdu(
7348 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7350 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7351 kdu_instance
= deployed_kdu
["kdu-instance"]
7352 kdu_model
= deployed_kdu
.get("kdu-model")
7353 scale
= int(kdu_scaling_info
["scale"])
7354 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7357 "collection": "nsrs",
7358 "filter": {"_id": nsr_id
},
7359 "path": "_admin.deployed.K8s.{}".format(index
),
7362 step
= "scaling application {}".format(
7363 kdu_scaling_info
["resource-name"]
7365 self
.logger
.debug(logging_text
+ step
)
# Scale-in: terminate primitives appear before the scale() call in this listing.
7367 if kdu_scaling_info
["type"] == "delete":
7368 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7371 and kdu_config
.get("terminate-config-primitive")
7372 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7374 terminate_config_primitive_list
= kdu_config
.get(
7375 "terminate-config-primitive"
# Primitives are executed in ascending "seq" order.
7377 terminate_config_primitive_list
.sort(
7378 key
=lambda val
: int(val
["seq"])
7382 terminate_config_primitive
7383 ) in terminate_config_primitive_list
:
7384 primitive_params_
= self
._map
_primitive
_params
(
7385 terminate_config_primitive
, {}, {}
7387 step
= "execute terminate config primitive"
7388 self
.logger
.debug(logging_text
+ step
)
7389 await asyncio
.wait_for(
7390 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7391 cluster_uuid
=cluster_uuid
,
7392 kdu_instance
=kdu_instance
,
7393 primitive_name
=terminate_config_primitive
["name"],
7394 params
=primitive_params_
,
7396 total_timeout
=self
.timeout_primitive
,
7399 timeout
=self
.timeout_primitive
7400 * self
.timeout_primitive_outer_factor
,
# The connector gets total_timeout; asyncio.wait_for gets that value times the outer factor.
7403 await asyncio
.wait_for(
7404 self
.k8scluster_map
[k8s_cluster_type
].scale(
7405 kdu_instance
=kdu_instance
,
7407 resource_name
=kdu_scaling_info
["resource-name"],
7408 total_timeout
=self
.timeout_scale_on_error
,
7410 cluster_uuid
=cluster_uuid
,
7411 kdu_model
=kdu_model
,
7415 timeout
=self
.timeout_scale_on_error
7416 * self
.timeout_scale_on_error_outer_factor
,
# Scale-out: initial primitives appear after the scale() call in this listing.
7419 if kdu_scaling_info
["type"] == "create":
7420 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7423 and kdu_config
.get("initial-config-primitive")
7424 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7426 initial_config_primitive_list
= kdu_config
.get(
7427 "initial-config-primitive"
7429 initial_config_primitive_list
.sort(
7430 key
=lambda val
: int(val
["seq"])
7433 for initial_config_primitive
in initial_config_primitive_list
:
7434 primitive_params_
= self
._map
_primitive
_params
(
7435 initial_config_primitive
, {}, {}
7437 step
= "execute initial config primitive"
7438 self
.logger
.debug(logging_text
+ step
)
7439 await asyncio
.wait_for(
7440 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7441 cluster_uuid
=cluster_uuid
,
7442 kdu_instance
=kdu_instance
,
7443 primitive_name
=initial_config_primitive
["name"],
7444 params
=primitive_params_
,
# _scale_ng_ro: push a VDU scaling change to NG-RO.
#
# Takes the NS id from db_nslcmop["nsInstanceId"], loads the NSD referenced by
# db_nsr["nsd-id"], collects every vnfr of the NS (indexed by "member-vnf-index-ref")
# together with the vnfds they reference, replaces the entry of the scaled VNF with the
# caller's db_vnfr, and awaits self._instantiate_ng_ro(...) with start_deploy=time() and
# timeout_ns_deploy=self.timeout_ns_deploy.
#
# :param logging_text: log prefix (not referenced in the lines visible here)
# :param db_nsr: nsr record; only "nsd-id" is read in the visible lines
# :param db_nslcmop: operation record; only "nsInstanceId" is read in the visible lines
# :param db_vnfr: vnfr of the VNF being scaled
# :param vdu_scaling_info: dict read through its "vdu-create" / "vdu-delete" keys
# :param stage: not referenced in the lines visible here
#
# NOTE(review): several original lines are absent from this listing (the callees that
# receive the "vdu-create"/"vdu-delete" values and most positional arguments of
# _instantiate_ng_ro); confirm against the complete file.
7451 async def _scale_ng_ro(
7452 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7454 nsr_id
= db_nslcmop
["nsInstanceId"]
7455 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7458 # read from db: vnfd's for every vnf
7461 # for each vnf in ns, read vnfd
7462 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7463 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7464 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7465 # if we haven't this vnfd, read it from db
# NOTE(review): the cache lookup compares the descriptor's "id" field with vnfd_id, while the
# read below filters on "_id"; if those two values differ the lookup never hits and the
# vnfd is fetched again for every vnfr - confirm which key is intended.
7466 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7468 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7469 db_vnfds
.append(vnfd
)
7470 n2vc_key
= self
.n2vc
.get_public_key()
7471 n2vc_key_list
= [n2vc_key
]
7474 vdu_scaling_info
.get("vdu-create"),
7475 vdu_scaling_info
.get("vdu-delete"),
7478 # db_vnfr has been updated, update db_vnfrs to use it
7479 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7480 await self
._instantiate
_ng
_ro
(
7490 start_deploy
=time(),
7491 timeout_ns_deploy
=self
.timeout_ns_deploy
,
# Second pass over the deleted VDUs after RO has finished, this time with mark_delete=False.
7493 if vdu_scaling_info
.get("vdu-delete"):
7495 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
# extract_prometheus_scrape_jobs: build Prometheus scrape jobs from a charm/chart template.
#
# Lists artifact_path with self.fs.dir_ls(), selects an entry whose name starts with
# "prometheus" and ends with ".j2", opens it with self.fs.file_open(), and renders it with
# parse_job() using the variables JOB_NAME, TARGET_IP, EXPORTER_POD_IP and
# EXPORTER_POD_PORT. Every resulting job gets "nsr_id" and "vnfr_id" keys; its "job_name"
# is replaced by "<vnfr_id>_<random 1..10000>" when it is not a string containing vnfr_id.
#
# :param ee_id: execution environment id; the text after the first "." is used as service name
# :param artifact_path: folder inspected for the template
# :param ee_config_descriptor: must contain "metric-service" (suffix of the exporter host name)
# :param vnfr_id: vnfr id; dashes are removed before it is used as job name
# :param nsr_id: stored in every job under "nsr_id"
# :param target_ip: value of the TARGET_IP template variable
#
# NOTE(review): lines that assign job_file, job_data, host_port and variables, and the
# return statements, are not visible in this listing.
7498 async def extract_prometheus_scrape_jobs(
7499 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7501 # look for a file called 'prometheus*.j2' among the artifacts
7502 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7506 for f
in artifact_content
7507 if f
.startswith("prometheus") and f
.endswith(".j2")
7513 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7517 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7518 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
# Dashes are stripped from the id before it is used as job name and stored in the jobs.
7520 vnfr_id
= vnfr_id
.replace("-", "")
7522 "JOB_NAME": vnfr_id
,
7523 "TARGET_IP": target_ip
,
7524 "EXPORTER_POD_IP": host_name
,
7525 "EXPORTER_POD_PORT": host_port
,
7527 job_list
= parse_job(job_data
, variables
)
7528 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7529 for job
in job_list
:
7531 not isinstance(job
.get("job_name"), str)
7532 or vnfr_id
not in job
["job_name"]
7534 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7535 job
["nsr_id"] = nsr_id
7536 job
["vnfr_id"] = vnfr_id
# rebuild_start_stop: ask RO to run operation_type on one VDU instance of a VNF.
#
# The VDU instance is the vdur of vnfr vnf_id whose "vdu-id-ref" equals
# additional_param["vdu_id"] and whose "count-index" equals additional_param["count-index"].
# The request is sent with self.RO.operate(nsr_id, desc, operation_type) and its
# "action_id" is awaited through self._wait_ng_ro(...) using self.timeout_operate.
#
# :param nsr_id: NS instance id (used for logging, nsrs updates and the RO call)
# :param nslcmop_id: operation id (used for HA waiting and operation status writes)
# :param vnf_id: "_id" of the vnfr holding the target VDU
# :param additional_param: dict with "vdu_id" and "count-index"
# :param operation_type: operation name; also written as the nsr "operational-status" and,
#     upper-cased, as current_operation
# :return: ("COMPLETED", "Done") on success, otherwise
#     ("FAILED", "Error in operate VNF <exc>")
#
# NOTE(review): the "try:" line and a few assignments are not visible in this listing, so it
# cannot be confirmed from here which statements are covered by the except clauses.
7539 async def rebuild_start_stop(
7540 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7542 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7543 self
.logger
.info(logging_text
+ "Enter")
7544 stage
= ["Preparing the environment", ""]
7545 # database nsrs record
7549 # in case of error, indicates what part of scale was failed to put nsr at error status
7550 start_deploy
= time()
7552 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7553 vim_account_id
= db_vnfr
.get("vim-account-id")
# NOTE(review): vim_account_id comes from .get() and may be None, in which case this
# concatenation raises TypeError - confirm the field is always present.
7554 vim_info_key
= "vim:" + vim_account_id
7555 vdu_id
= additional_param
["vdu_id"]
7556 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7557 vdur
= find_in_list(
7558 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7561 vdu_vim_name
= vdur
["name"]
7562 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
# target_vim is the first key of the vdur "vim_info" mapping.
7563 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7565 raise LcmException("Target vdu is not found")
7566 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7567 # wait for any previous tasks in process
7568 stage
[1] = "Waiting for previous operations to terminate"
7569 self
.logger
.info(stage
[1])
7570 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7572 stage
[1] = "Reading from database."
7573 self
.logger
.info(stage
[1])
7574 self
._write
_ns
_status
(
7577 current_operation
=operation_type
.upper(),
7578 current_operation_id
=nslcmop_id
,
7580 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7583 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7584 db_nsr_update
["operational-status"] = operation_type
7585 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7589 "vim_vm_id": vim_vm_id
,
7591 "vdu_index": additional_param
["count-index"],
7592 "vdu_id": vdur
["id"],
7593 "target_vim": target_vim
,
7594 "vim_account_id": vim_account_id
,
7597 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7598 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7599 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7600 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7601 self
.logger
.info("response from RO: {}".format(result_dict
))
7602 action_id
= result_dict
["action_id"]
7603 await self
._wait
_ng
_ro
(
7608 self
.timeout_operate
,
7610 "start_stop_rebuild",
7612 return "COMPLETED", "Done"
7613 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7614 self
.logger
.error("Exit Exception {}".format(e
))
7616 except asyncio
.CancelledError
:
7617 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7618 exc
= "Operation was cancelled"
7619 except Exception as e
:
7620 exc
= traceback
.format_exc()
7621 self
.logger
.critical(
7622 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7624 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud settings stored in the configuration of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_cloud" and
        "vca_cloud_credential" entries of the account "config"; an entry that is
        not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vca_settings = vim_account.get("config", {})
    cloud_name = vca_settings.get("vca_cloud")
    cloud_credential = vca_settings.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud settings stored in the configuration of a VIM account.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" entries of the account "config"; an entry that
        is not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vca_settings = vim_account.get("config", {})
    k8s_cloud_name = vca_settings.get("vca_k8s_cloud")
    k8s_cloud_credential = vca_settings.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
# migrate: run the "migrate" operation of an NS through RO.
#
# Visible flow: take the HA lock for the nslcmop, wait for related operations, flag the NS
# as current_operation="MIGRATING", read the nslcmop and merge its "operationParams" into
# the target sent with self.RO.migrate(nsr_id, target), then wait for the returned
# "action_id" through self._wait_ng_ro(...) with self.timeout_migrate and
# operation="migrate". Afterwards the NS is set back to current_operation="IDLE", the
# operation is written as "FAILED" ("FAILED <step>: <exc>") or "COMPLETED" ("Done"), an
# "ns"/"migrated" message is published on self.msg, and the task is removed from
# self.lcm_tasks under the name "ns_migrate".
#
# NOTE(review): the try/finally/if/else lines are not visible in this listing, so the exact
# branch structure must be confirmed against the complete file.
7648 async def migrate(self
, nsr_id
, nslcmop_id
):
7650 Migrate VNFs and VDUs instances in a NS
7652 :param: nsr_id: NS Instance ID
7653 :param: nslcmop_id: nslcmop ID of migrate
7656 # Try to lock HA task here
7657 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7658 if not task_is_locked_by_me
:
7660 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7661 self
.logger
.debug(logging_text
+ "Enter")
7662 # get all needed from database
7664 db_nslcmop_update
= {}
7665 nslcmop_operation_state
= None
7669 # in case of error, indicates what part of scale was failed to put nsr at error status
7670 start_deploy
= time()
7673 # wait for any previous tasks in process
7674 step
= "Waiting for previous operations to terminate"
7675 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7677 self
._write
_ns
_status
(
7680 current_operation
="MIGRATING",
7681 current_operation_id
=nslcmop_id
,
7683 step
= "Getting nslcmop from database"
7685 step
+ " after having waited for previous tasks to be completed"
7687 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7688 migrate_params
= db_nslcmop
.get("operationParams")
# The operation parameters are merged into the RO target as they are.
7691 target
.update(migrate_params
)
7692 desc
= await self
.RO
.migrate(nsr_id
, target
)
7693 self
.logger
.debug("RO return > {}".format(desc
))
7694 action_id
= desc
["action_id"]
7695 await self
._wait
_ng
_ro
(
7700 self
.timeout_migrate
,
7701 operation
="migrate",
7703 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7704 self
.logger
.error("Exit Exception {}".format(e
))
7706 except asyncio
.CancelledError
:
7707 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7708 exc
= "Operation was cancelled"
7709 except Exception as e
:
7710 exc
= traceback
.format_exc()
7711 self
.logger
.critical(
7712 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7715 self
._write
_ns
_status
(
7718 current_operation
="IDLE",
7719 current_operation_id
=None,
7722 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7723 nslcmop_operation_state
= "FAILED"
7725 nslcmop_operation_state
= "COMPLETED"
7726 db_nslcmop_update
["detailed-status"] = "Done"
7727 db_nsr_update
["detailed-status"] = "Done"
7729 self
._write
_op
_status
(
7733 operation_state
=nslcmop_operation_state
,
7734 other_update
=db_nslcmop_update
,
# A notification is only attempted once a final operation state has been set.
7736 if nslcmop_operation_state
:
7740 "nslcmop_id": nslcmop_id
,
7741 "operationState": nslcmop_operation_state
,
7743 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7744 except Exception as e
:
7746 logging_text
+ "kafka_write notification Exception {}".format(e
)
7748 self
.logger
.debug(logging_text
+ "Exit")
7749 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
# heal: run the "heal" operation of an NS.
#
# Visible flow:
#   1. take the HA lock for the nslcmop and wait for related operations;
#   2. flag the NS as current_operation="HEALING", read the nslcmop and the nsr, remember the
#      previous "operational-status" / "config-status" and set
#      "_admin.deployed.RO.operational-status" to "healing";
#   3. step "Sending heal order to VIM";
#   4. read the nsd and every vnfr of the NS (indexed by "_id");
#   5. for each entry of operationParams.healVnfData: resolve the vnfr by "vnfInstanceId" and
#      its vnfd, build the list of target VDUs (from the request, or from every vdur of the
#      vnfr when the request names none) and launch the VNF-level and VDU-level
#      configuration deployments, collecting tasks in tasks_dict_info;
#   6. wait for the pending tasks (self._wait_for_tasks with self.timeout_ns_deploy), write
#      the final operation and NS status, publish an "ns"/"healed" message and remove the
#      "ns_heal" task from self.lcm_tasks.
#
# :param nsr_id: ns instance to heal
# :param nslcmop_id: operation to run
#
# NOTE(review): many control-flow lines (try/finally/if/else and the names of the called
# helpers) are not visible in this listing; confirm the branch structure against the
# complete file.
7751 async def heal(self
, nsr_id
, nslcmop_id
):
7755 :param nsr_id: ns instance to heal
7756 :param nslcmop_id: operation to run
7760 # Try to lock HA task here
7761 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7762 if not task_is_locked_by_me
:
7765 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7766 stage
= ["", "", ""]
7767 tasks_dict_info
= {}
7768 # ^ stage, step, VIM progress
7769 self
.logger
.debug(logging_text
+ "Enter")
7770 # get all needed from database
7772 db_nslcmop_update
= {}
7774 db_vnfrs
= {} # vnf's info indexed by _id
7776 old_operational_status
= ""
7777 old_config_status
= ""
7780 # wait for any previous tasks in process
7781 step
= "Waiting for previous operations to terminate"
7782 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7783 self
._write
_ns
_status
(
7786 current_operation
="HEALING",
7787 current_operation_id
=nslcmop_id
,
7790 step
= "Getting nslcmop from database"
7792 step
+ " after having waited for previous tasks to be completed"
7794 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7796 step
= "Getting nsr from database"
7797 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# Remembered so the previous values can be written back when the operation fails.
7798 old_operational_status
= db_nsr
["operational-status"]
7799 old_config_status
= db_nsr
["config-status"]
7802 "_admin.deployed.RO.operational-status": "healing",
7804 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7806 step
= "Sending heal order to VIM"
7808 logging_text
=logging_text
,
7810 db_nslcmop
=db_nslcmop
,
7815 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7816 self
.logger
.debug(logging_text
+ stage
[1])
7817 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7818 self
.fs
.sync(db_nsr
["nsd-id"])
7820 # read from db: vnfr's of this ns
7821 step
= "Getting vnfrs from db"
7822 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7823 for vnfr
in db_vnfrs_list
:
7824 db_vnfrs
[vnfr
["_id"]] = vnfr
7825 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7827 # Check for each target VNF
7828 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7829 for target_vnf
in target_list
:
7830 # Find this VNF in the list from DB
7831 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7833 db_vnfr
= db_vnfrs
[vnfr_id
]
7834 vnfd_id
= db_vnfr
.get("vnfd-id")
7835 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7836 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7837 base_folder
= vnfd
["_admin"]["storage"]
7842 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7843 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7845 # Check each target VDU and deploy N2VC
7846 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7849 if not target_vdu_list
:
7850 # New code to build the dictionary
7851 target_vdu_list
= []
7852 for existing_vdu
in db_vnfr
.get("vdur"):
7853 vdu_name
= existing_vdu
.get("vdu-name", None)
7854 vdu_index
= existing_vdu
.get("count-index", 0)
7855 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7858 vdu_to_be_healed
= {
7860 "count-index": vdu_index
,
7861 "run-day1": vdu_run_day1
,
7863 target_vdu_list
.append(vdu_to_be_healed
)
7864 for target_vdu
in target_vdu_list
:
7865 deploy_params_vdu
= target_vdu
7866 # Set run-day1 vnf level value if not vdu level value exists
7867 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7870 deploy_params_vdu
["run-day1"] = target_vnf
[
7873 vdu_name
= target_vdu
.get("vdu-id", None)
7874 # TODO: Get vdu_id from vdud.
7876 # For multi instance VDU count-index is mandatory
7877 # For single session VDU count-index is 0
7878 vdu_index
= target_vdu
.get("count-index", 0)
7880 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7881 stage
[1] = "Deploying Execution Environments."
7882 self
.logger
.debug(logging_text
+ stage
[1])
7884 # VNF Level charm. Normal case when proxy charms.
7885 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7886 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7887 if descriptor_config
:
7888 # Continue if healed machine is management machine
7889 vnf_ip_address
= db_vnfr
.get("ip-address")
7890 target_instance
= None
7891 for instance
in db_vnfr
.get("vdur", None):
7893 instance
["vdu-name"] == vdu_name
7894 and instance
["count-index"] == vdu_index
7896 target_instance
= instance
# NOTE(review): target_instance starts as None; if no vdur matches the name and index the
# .get() below raises AttributeError - confirm whether a guard exists in the omitted lines.
7898 if vnf_ip_address
== target_instance
.get("ip-address"):
7900 logging_text
=logging_text
7901 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7902 member_vnf_index
, vdu_name
, vdu_index
7906 nslcmop_id
=nslcmop_id
,
7912 member_vnf_index
=member_vnf_index
,
7915 deploy_params
=deploy_params_vdu
,
7916 descriptor_config
=descriptor_config
,
7917 base_folder
=base_folder
,
7918 task_instantiation_info
=tasks_dict_info
,
7922 # VDU Level charm. Normal case with native charms.
7923 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7924 if descriptor_config
:
7926 logging_text
=logging_text
7927 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7928 member_vnf_index
, vdu_name
, vdu_index
7932 nslcmop_id
=nslcmop_id
,
7938 member_vnf_index
=member_vnf_index
,
7939 vdu_index
=vdu_index
,
7941 deploy_params
=deploy_params_vdu
,
7942 descriptor_config
=descriptor_config
,
7943 base_folder
=base_folder
,
7944 task_instantiation_info
=tasks_dict_info
,
7949 ROclient
.ROClientException
,
7954 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7956 except asyncio
.CancelledError
:
7958 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7960 exc
= "Operation was cancelled"
7961 except Exception as e
:
7962 exc
= traceback
.format_exc()
7963 self
.logger
.critical(
7964 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7969 stage
[1] = "Waiting for healing pending tasks."
7970 self
.logger
.debug(logging_text
+ stage
[1])
7971 exc
= await self
._wait
_for
_tasks
(
7974 self
.timeout_ns_deploy
,
7982 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7983 nslcmop_operation_state
= "FAILED"
7985 db_nsr_update
["operational-status"] = old_operational_status
7986 db_nsr_update
["config-status"] = old_config_status
7989 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
# Any task that is unfinished, cancelled or failed marks the matching NS status as "failed".
7990 for task
, task_name
in tasks_dict_info
.items():
7991 if not task
.done() or task
.cancelled() or task
.exception():
7992 if task_name
.startswith(self
.task_name_deploy_vca
):
7993 # A N2VC task is pending
7994 db_nsr_update
["config-status"] = "failed"
7996 # RO task is pending
7997 db_nsr_update
["operational-status"] = "failed"
7999 error_description_nslcmop
= None
8000 nslcmop_operation_state
= "COMPLETED"
8001 db_nslcmop_update
["detailed-status"] = "Done"
8002 db_nsr_update
["detailed-status"] = "Done"
8003 db_nsr_update
["operational-status"] = "running"
8004 db_nsr_update
["config-status"] = "configured"
8006 self
._write
_op
_status
(
8009 error_message
=error_description_nslcmop
,
8010 operation_state
=nslcmop_operation_state
,
8011 other_update
=db_nslcmop_update
,
8014 self
._write
_ns
_status
(
8017 current_operation
="IDLE",
8018 current_operation_id
=None,
8019 other_update
=db_nsr_update
,
8022 if nslcmop_operation_state
:
8026 "nslcmop_id": nslcmop_id
,
8027 "operationState": nslcmop_operation_state
,
8029 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8030 except Exception as e
:
8032 logging_text
+ "kafka_write notification Exception {}".format(e
)
8034 self
.logger
.debug(logging_text
+ "Exit")
8035 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8046 :param logging_text: preffix text to use at logging
8047 :param nsr_id: nsr identity
8048 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8049 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8050 :return: None or exception
8053 def get_vim_account(vim_account_id
):
8055 if vim_account_id
in db_vims
:
8056 return db_vims
[vim_account_id
]
8057 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8058 db_vims
[vim_account_id
] = db_vim
8063 ns_params
= db_nslcmop
.get("operationParams")
8064 if ns_params
and ns_params
.get("timeout_ns_heal"):
8065 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8067 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
8071 nslcmop_id
= db_nslcmop
["_id"]
8073 "action_id": nslcmop_id
,
8075 self
.logger
.warning(
8076 "db_nslcmop={} and timeout_ns_heal={}".format(
8077 db_nslcmop
, timeout_ns_heal
8080 target
.update(db_nslcmop
.get("operationParams", {}))
8082 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8083 desc
= await self
.RO
.recreate(nsr_id
, target
)
8084 self
.logger
.debug("RO return > {}".format(desc
))
8085 action_id
= desc
["action_id"]
8086 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8087 await self
._wait
_ng
_ro
(
8094 operation
="healing",
8099 "_admin.deployed.RO.operational-status": "running",
8100 "detailed-status": " ".join(stage
),
8102 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8103 self
._write
_op
_status
(nslcmop_id
, stage
)
8105 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8108 except Exception as e
:
8109 stage
[2] = "ERROR healing at VIM"
8110 # self.set_vnfr_at_error(db_vnfrs, str(e))
8112 "Error healing at VIM {}".format(e
),
8113 exc_info
=not isinstance(
8116 ROclient
.ROClientException
,
8142 task_instantiation_info
,
8145 # launch instantiate_N2VC in a asyncio task and register task object
8146 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8147 # if not found, create one entry and update database
8148 # fill db_nsr._admin.deployed.VCA.<index>
8151 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8155 get_charm_name
= False
8156 if "execution-environment-list" in descriptor_config
:
8157 ee_list
= descriptor_config
.get("execution-environment-list", [])
8158 elif "juju" in descriptor_config
:
8159 ee_list
= [descriptor_config
] # ns charms
8160 if "execution-environment-list" not in descriptor_config
:
8161 # charm name is only required for ns charms
8162 get_charm_name
= True
8163 else: # other types as script are not supported
8166 for ee_item
in ee_list
:
8169 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8170 ee_item
.get("juju"), ee_item
.get("helm-chart")
8173 ee_descriptor_id
= ee_item
.get("id")
8174 if ee_item
.get("juju"):
8175 vca_name
= ee_item
["juju"].get("charm")
8177 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8180 if ee_item
["juju"].get("charm") is not None
8183 if ee_item
["juju"].get("cloud") == "k8s":
8184 vca_type
= "k8s_proxy_charm"
8185 elif ee_item
["juju"].get("proxy") is False:
8186 vca_type
= "native_charm"
8187 elif ee_item
.get("helm-chart"):
8188 vca_name
= ee_item
["helm-chart"]
8189 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8192 vca_type
= "helm-v3"
8195 logging_text
+ "skipping non juju neither charm configuration"
8200 for vca_index
, vca_deployed
in enumerate(
8201 db_nsr
["_admin"]["deployed"]["VCA"]
8203 if not vca_deployed
:
8206 vca_deployed
.get("member-vnf-index") == member_vnf_index
8207 and vca_deployed
.get("vdu_id") == vdu_id
8208 and vca_deployed
.get("kdu_name") == kdu_name
8209 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8210 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8214 # not found, create one.
8216 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8219 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8221 target
+= "/kdu/{}".format(kdu_name
)
8223 "target_element": target
,
8224 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8225 "member-vnf-index": member_vnf_index
,
8227 "kdu_name": kdu_name
,
8228 "vdu_count_index": vdu_index
,
8229 "operational-status": "init", # TODO revise
8230 "detailed-status": "", # TODO revise
8231 "step": "initial-deploy", # TODO revise
8233 "vdu_name": vdu_name
,
8235 "ee_descriptor_id": ee_descriptor_id
,
8236 "charm_name": charm_name
,
8240 # create VCA and configurationStatus in db
8242 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8243 "configurationStatus.{}".format(vca_index
): dict(),
8245 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8247 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8249 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8250 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8251 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8254 task_n2vc
= asyncio
.ensure_future(
8256 logging_text
=logging_text
,
8257 vca_index
=vca_index
,
8263 vdu_index
=vdu_index
,
8264 deploy_params
=deploy_params
,
8265 config_descriptor
=descriptor_config
,
8266 base_folder
=base_folder
,
8267 nslcmop_id
=nslcmop_id
,
8271 ee_config_descriptor
=ee_item
,
8274 self
.lcm_tasks
.register(
8278 "instantiate_N2VC-{}".format(vca_index
),
8281 task_instantiation_info
[
8283 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8284 member_vnf_index
or "", vdu_id
or ""
8287 async def heal_N2VC(
8304 ee_config_descriptor
,
8306 nsr_id
= db_nsr
["_id"]
8307 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8308 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8309 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8310 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8312 "collection": "nsrs",
8313 "filter": {"_id": nsr_id
},
8314 "path": db_update_entry
,
8320 element_under_configuration
= nsr_id
8324 vnfr_id
= db_vnfr
["_id"]
8325 osm_config
["osm"]["vnf_id"] = vnfr_id
8327 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8329 if vca_type
== "native_charm":
8332 index_number
= vdu_index
or 0
8335 element_type
= "VNF"
8336 element_under_configuration
= vnfr_id
8337 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8339 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8340 element_type
= "VDU"
8341 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8342 osm_config
["osm"]["vdu_id"] = vdu_id
8344 namespace
+= ".{}".format(kdu_name
)
8345 element_type
= "KDU"
8346 element_under_configuration
= kdu_name
8347 osm_config
["osm"]["kdu_name"] = kdu_name
8350 if base_folder
["pkg-dir"]:
8351 artifact_path
= "{}/{}/{}/{}".format(
8352 base_folder
["folder"],
8353 base_folder
["pkg-dir"],
8356 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8361 artifact_path
= "{}/Scripts/{}/{}/".format(
8362 base_folder
["folder"],
8365 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8370 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8372 # get initial_config_primitive_list that applies to this element
8373 initial_config_primitive_list
= config_descriptor
.get(
8374 "initial-config-primitive"
8378 "Initial config primitive list > {}".format(
8379 initial_config_primitive_list
8383 # add config if not present for NS charm
8384 ee_descriptor_id
= ee_config_descriptor
.get("id")
8385 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8386 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8387 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8391 "Initial config primitive list #2 > {}".format(
8392 initial_config_primitive_list
8395 # n2vc_redesign STEP 3.1
8396 # find old ee_id if exists
8397 ee_id
= vca_deployed
.get("ee_id")
8399 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8400 # create or register execution environment in VCA. Only for native charms when healing
8401 if vca_type
== "native_charm":
8402 step
= "Waiting to VM being up and getting IP address"
8403 self
.logger
.debug(logging_text
+ step
)
8404 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8413 credentials
= {"hostname": rw_mgmt_ip
}
8415 username
= deep_get(
8416 config_descriptor
, ("config-access", "ssh-access", "default-user")
8418 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8419 # merged. Meanwhile let's get username from initial-config-primitive
8420 if not username
and initial_config_primitive_list
:
8421 for config_primitive
in initial_config_primitive_list
:
8422 for param
in config_primitive
.get("parameter", ()):
8423 if param
["name"] == "ssh-username":
8424 username
= param
["value"]
8428 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8429 "'config-access.ssh-access.default-user'"
8431 credentials
["username"] = username
8433 # n2vc_redesign STEP 3.2
8434 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8435 self
._write
_configuration
_status
(
8437 vca_index
=vca_index
,
8438 status
="REGISTERING",
8439 element_under_configuration
=element_under_configuration
,
8440 element_type
=element_type
,
8443 step
= "register execution environment {}".format(credentials
)
8444 self
.logger
.debug(logging_text
+ step
)
8445 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8446 credentials
=credentials
,
8447 namespace
=namespace
,
8452 # update ee_id en db
8454 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8456 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8458 # for compatibility with MON/POL modules, the need model and application name at database
8459 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8460 # Not sure if this need to be done when healing
8462 ee_id_parts = ee_id.split(".")
8463 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8464 if len(ee_id_parts) >= 2:
8465 model_name = ee_id_parts[0]
8466 application_name = ee_id_parts[1]
8467 db_nsr_update[db_update_entry + "model"] = model_name
8468 db_nsr_update[db_update_entry + "application"] = application_name
8471 # n2vc_redesign STEP 3.3
8472 # Install configuration software. Only for native charms.
8473 step
= "Install configuration Software"
8475 self
._write
_configuration
_status
(
8477 vca_index
=vca_index
,
8478 status
="INSTALLING SW",
8479 element_under_configuration
=element_under_configuration
,
8480 element_type
=element_type
,
8481 # other_update=db_nsr_update,
8485 # TODO check if already done
8486 self
.logger
.debug(logging_text
+ step
)
8488 if vca_type
== "native_charm":
8489 config_primitive
= next(
8490 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8493 if config_primitive
:
8494 config
= self
._map
_primitive
_params
(
8495 config_primitive
, {}, deploy_params
8497 await self
.vca_map
[vca_type
].install_configuration_sw(
8499 artifact_path
=artifact_path
,
8507 # write in db flag of configuration_sw already installed
8509 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8512 # Not sure if this need to be done when healing
8514 # add relations for this VCA (wait for other peers related with this VCA)
8515 await self._add_vca_relations(
8516 logging_text=logging_text,
8519 vca_index=vca_index,
8523 # if SSH access is required, then get execution environment SSH public
8524 # if native charm we have waited already to VM be UP
8525 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8528 # self.logger.debug("get ssh key block")
8530 config_descriptor
, ("config-access", "ssh-access", "required")
8532 # self.logger.debug("ssh key needed")
8533 # Needed to inject a ssh key
8536 ("config-access", "ssh-access", "default-user"),
8538 step
= "Install configuration Software, getting public ssh key"
8539 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8540 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8543 step
= "Insert public key into VM user={} ssh_key={}".format(
8547 # self.logger.debug("no need to get ssh key")
8548 step
= "Waiting to VM being up and getting IP address"
8549 self
.logger
.debug(logging_text
+ step
)
8551 # n2vc_redesign STEP 5.1
8552 # wait for RO (ip-address) Insert pub_key into VM
8553 # IMPORTANT: We need do wait for RO to complete healing operation.
8554 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8557 rw_mgmt_ip
= await self
.wait_kdu_up(
8558 logging_text
, nsr_id
, vnfr_id
, kdu_name
8561 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8571 rw_mgmt_ip
= None # This is for a NS configuration
8573 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8575 # store rw_mgmt_ip in deploy params for later replacement
8576 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8579 # get run-day1 operation parameter
8580 runDay1
= deploy_params
.get("run-day1", False)
8582 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8585 # n2vc_redesign STEP 6 Execute initial config primitive
8586 step
= "execute initial config primitive"
8588 # wait for dependent primitives execution (NS -> VNF -> VDU)
8589 if initial_config_primitive_list
:
8590 await self
._wait
_dependent
_n
2vc
(
8591 nsr_id
, vca_deployed_list
, vca_index
8594 # stage, in function of element type: vdu, kdu, vnf or ns
8595 my_vca
= vca_deployed_list
[vca_index
]
8596 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8598 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8599 elif my_vca
.get("member-vnf-index"):
8601 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8604 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8606 self
._write
_configuration
_status
(
8607 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8610 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8612 check_if_terminated_needed
= True
8613 for initial_config_primitive
in initial_config_primitive_list
:
8614 # adding information on the vca_deployed if it is a NS execution environment
8615 if not vca_deployed
["member-vnf-index"]:
8616 deploy_params
["ns_config_info"] = json
.dumps(
8617 self
._get
_ns
_config
_info
(nsr_id
)
8619 # TODO check if already done
8620 primitive_params_
= self
._map
_primitive
_params
(
8621 initial_config_primitive
, {}, deploy_params
8624 step
= "execute primitive '{}' params '{}'".format(
8625 initial_config_primitive
["name"], primitive_params_
8627 self
.logger
.debug(logging_text
+ step
)
8628 await self
.vca_map
[vca_type
].exec_primitive(
8630 primitive_name
=initial_config_primitive
["name"],
8631 params_dict
=primitive_params_
,
8636 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8637 if check_if_terminated_needed
:
8638 if config_descriptor
.get("terminate-config-primitive"):
8642 {db_update_entry
+ "needed_terminate": True},
8644 check_if_terminated_needed
= False
8646 # TODO register in database that primitive is done
8648 # STEP 7 Configure metrics
8649 # Not sure if this need to be done when healing
8651 if vca_type == "helm" or vca_type == "helm-v3":
8652 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8654 artifact_path=artifact_path,
8655 ee_config_descriptor=ee_config_descriptor,
8658 target_ip=rw_mgmt_ip,
8664 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8667 for job in prometheus_jobs:
8670 {"job_name": job["job_name"]},
8673 fail_on_empty=False,
8677 step
= "instantiated at VCA"
8678 self
.logger
.debug(logging_text
+ step
)
8680 self
._write
_configuration
_status
(
8681 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8684 except Exception as e
: # TODO not use Exception but N2VC exception
8685 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8687 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8690 "Exception while {} : {}".format(step
, e
), exc_info
=True
8692 self
._write
_configuration
_status
(
8693 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8695 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """Wait until RO no longer reports the NS as being healed.

    Reads the NS record from the "nsrs" collection every 15 seconds and
    returns as soon as ``_admin.deployed.RO.operational-status`` is anything
    other than ``"healing"``.

    :param nsr_id: ``_id`` of the NS record to poll
    :param timeout: maximum waiting time, measured with ``time()``
    :return: None
    :raises NgRoException: if the status is still "healing" when the timeout
        expires
    """
    deadline = time() + timeout
    while time() <= deadline:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            return
        # The explicit ``loop`` argument of asyncio.sleep() was deprecated in
        # Python 3.8 and removed in 3.10; the running loop is used implicitly.
        await asyncio.sleep(15)
    # Deadline passed while RO was still healing
    raise NgRoException("Timeout waiting ns to heal")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Sends the "operationParams" stored in the nslcmop record to RO, waits for
    the returned RO action to finish and then records the outcome in the
    operation status and on the message bus (topic "ns", key
    "verticalscaled"). Errors are not propagated to the caller: they are
    reported through the operation state ("FAILED") and its
    "detailed-status".

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    exc = None
    # reference time handed over to _wait_ng_ro
    start_deploy = time()
    # in case of error, indicates which part of the operation failed
    step = "Waiting for previous operations to terminate"

    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # RO receives a copy, so the database record object is left untouched
        target = {}
        target.update(db_nslcmop.get("operationParams"))
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures (NgRoException now included so RO errors and
        # timeouts are reported concisely instead of as a critical traceback)
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # Notification is best effort: a bus failure is only logged
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")