1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
64 from osm_lcm
.data_utils
.nsd
import (
65 get_ns_configuration_relation_list
,
69 from osm_lcm
.data_utils
.vnfd
import (
75 get_ee_sorted_initial_config_primitive_list
,
76 get_ee_sorted_terminate_config_primitive_list
,
78 get_virtual_link_profiles
,
83 get_number_of_instances
,
85 get_kdu_resource_profile
,
86 find_software_version
,
88 from osm_lcm
.data_utils
.list_utils
import find_in_list
89 from osm_lcm
.data_utils
.vnfr
import (
93 get_volumes_from_instantiation_params
,
95 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
96 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
97 from n2vc
.definitions
import RelationEndpoint
98 from n2vc
.k8s_helm_conn
import K8sHelmConnector
99 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
100 from n2vc
.k8s_juju_conn
import K8sJujuConnector
102 from osm_common
.dbbase
import DbException
103 from osm_common
.fsbase
import FsException
105 from osm_lcm
.data_utils
.database
.database
import Database
106 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from osm_lcm
.data_utils
.wim
import (
109 get_target_wim_attrs
,
110 select_feasible_wim_account
,
113 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
114 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
116 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
117 from osm_lcm
.osm_config
import OsmConfigBuilder
118 from osm_lcm
.prometheus
import parse_job
120 from copy
import copy
, deepcopy
121 from time
import time
122 from uuid
import uuid4
124 from random
import randint
126 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
129 class NsLcm(LcmBase
):
130 timeout_scale_on_error
= (
132 ) # Time for charm from first time at blocked,error status to mark as failed
133 timeout_scale_on_error_outer_factor
= 1.05 # Factor in relation to timeout_scale_on_error related to the timeout to be applied within the asyncio.wait_for coroutine
134 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
135 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
136 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
137 timeout_charm_delete
= 10 * 60
138 timeout_primitive
= 30 * 60 # Timeout for primitive execution
139 timeout_primitive_outer_factor
= 1.05 # Factor in relation to timeout_primitive related to the timeout to be applied within the asyncio.wait_for coroutine
140 timeout_ns_update
= 30 * 60 # timeout for ns update
141 timeout_progress_primitive
= (
143 ) # timeout for some progress in a primitive execution
144 timeout_migrate
= 1800 # default global timeout for migrating vnfs
145 timeout_operate
= 1800 # default global timeout for migrating vnfs
146 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
147 SUBOPERATION_STATUS_NOT_FOUND
= -1
148 SUBOPERATION_STATUS_NEW
= -2
149 SUBOPERATION_STATUS_SKIP
= -3
150 task_name_deploy_vca
= "Deploying VCA"
152 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
154 Init, Connect to database, filesystem storage, and messaging
155 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
158 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
160 self
.db
= Database().instance
.db
161 self
.fs
= Filesystem().instance
.fs
163 self
.lcm_tasks
= lcm_tasks
164 self
.timeout
= config
["timeout"]
165 self
.ro_config
= config
["ro_config"]
166 self
.ng_ro
= config
["ro_config"].get("ng")
167 self
.vca_config
= config
["VCA"].copy()
169 # create N2VC connector
170 self
.n2vc
= N2VCJujuConnector(
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
178 self
.conn_helm_ee
= LCMHelmConn(
181 vca_config
=self
.vca_config
,
182 on_update_db
=self
._on
_update
_n
2vc
_db
,
185 self
.k8sclusterhelm2
= K8sHelmConnector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helmpath"),
194 self
.k8sclusterhelm3
= K8sHelm3Connector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 helm_command
=self
.vca_config
.get("helm3path"),
203 self
.k8sclusterjuju
= K8sJujuConnector(
204 kubectl_command
=self
.vca_config
.get("kubectlpath"),
205 juju_command
=self
.vca_config
.get("jujupath"),
208 on_update_db
=self
._on
_update
_k
8s
_db
,
213 self
.k8scluster_map
= {
214 "helm-chart": self
.k8sclusterhelm2
,
215 "helm-chart-v3": self
.k8sclusterhelm3
,
216 "chart": self
.k8sclusterhelm3
,
217 "juju-bundle": self
.k8sclusterjuju
,
218 "juju": self
.k8sclusterjuju
,
222 "lxc_proxy_charm": self
.n2vc
,
223 "native_charm": self
.n2vc
,
224 "k8s_proxy_charm": self
.n2vc
,
225 "helm": self
.conn_helm_ee
,
226 "helm-v3": self
.conn_helm_ee
,
230 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
232 self
.op_status_map
= {
233 "instantiation": self
.RO
.status
,
234 "termination": self
.RO
.status
,
235 "migrate": self
.RO
.status
,
236 "healing": self
.RO
.recreate_status
,
237 "verticalscale": self
.RO
.status
,
238 "start_stop_rebuild": self
.RO
.status
,
242 def increment_ip_mac(ip_mac
, vm_index
=1):
243 if not isinstance(ip_mac
, str):
246 # try with ipv4 look for last dot
247 i
= ip_mac
.rfind(".")
250 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
251 # try with ipv6 or mac look for last colon. Operate in hex
252 i
= ip_mac
.rfind(":")
255 # format in hex, len can be 2 for mac or 4 for ipv6
256 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
257 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
263 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
265 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
268 # TODO filter RO descriptor fields...
272 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
273 db_dict
["deploymentStatus"] = ro_descriptor
274 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
276 except Exception as e
:
278 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
281 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
283 # remove last dot from path (if exists)
284 if path
.endswith("."):
287 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
288 # .format(table, filter, path, updated_data))
291 nsr_id
= filter.get("_id")
293 # read ns record from database
294 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
295 current_ns_status
= nsr
.get("nsState")
297 # get vca status for NS
298 status_dict
= await self
.n2vc
.get_status(
299 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
304 db_dict
["vcaStatus"] = status_dict
306 # update configurationStatus for this VCA
308 vca_index
= int(path
[path
.rfind(".") + 1 :])
311 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
313 vca_status
= vca_list
[vca_index
].get("status")
315 configuration_status_list
= nsr
.get("configurationStatus")
316 config_status
= configuration_status_list
[vca_index
].get("status")
318 if config_status
== "BROKEN" and vca_status
!= "failed":
319 db_dict
["configurationStatus"][vca_index
] = "READY"
320 elif config_status
!= "BROKEN" and vca_status
== "failed":
321 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
322 except Exception as e
:
323 # not update configurationStatus
324 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
326 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
327 # if nsState = 'DEGRADED' check if all is OK
329 if current_ns_status
in ("READY", "DEGRADED"):
330 error_description
= ""
332 if status_dict
.get("machines"):
333 for machine_id
in status_dict
.get("machines"):
334 machine
= status_dict
.get("machines").get(machine_id
)
335 # check machine agent-status
336 if machine
.get("agent-status"):
337 s
= machine
.get("agent-status").get("status")
340 error_description
+= (
341 "machine {} agent-status={} ; ".format(
345 # check machine instance status
346 if machine
.get("instance-status"):
347 s
= machine
.get("instance-status").get("status")
350 error_description
+= (
351 "machine {} instance-status={} ; ".format(
356 if status_dict
.get("applications"):
357 for app_id
in status_dict
.get("applications"):
358 app
= status_dict
.get("applications").get(app_id
)
359 # check application status
360 if app
.get("status"):
361 s
= app
.get("status").get("status")
364 error_description
+= (
365 "application {} status={} ; ".format(app_id
, s
)
368 if error_description
:
369 db_dict
["errorDescription"] = error_description
370 if current_ns_status
== "READY" and is_degraded
:
371 db_dict
["nsState"] = "DEGRADED"
372 if current_ns_status
== "DEGRADED" and not is_degraded
:
373 db_dict
["nsState"] = "READY"
376 self
.update_db_2("nsrs", nsr_id
, db_dict
)
378 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
380 except Exception as e
:
381 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
383 async def _on_update_k8s_db(
384 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
387 Updating vca status in NSR record
388 :param cluster_uuid: UUID of a k8s cluster
389 :param kdu_instance: The unique name of the KDU instance
390 :param filter: To get nsr_id
391 :cluster_type: The cluster type (juju, k8s)
395 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
396 # .format(cluster_uuid, kdu_instance, filter))
398 nsr_id
= filter.get("_id")
400 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
401 cluster_uuid
=cluster_uuid
,
402 kdu_instance
=kdu_instance
,
404 complete_status
=True,
410 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
413 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
417 self
.update_db_2("nsrs", nsr_id
, db_dict
)
418 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
420 except Exception as e
:
421 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
424 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
427 undefined
=StrictUndefined
,
428 autoescape
=select_autoescape(default_for_string
=True, default
=True),
430 template
= env
.from_string(cloud_init_text
)
431 return template
.render(additional_params
or {})
432 except UndefinedError
as e
:
434 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
435 "file, must be provided in the instantiation parameters inside the "
436 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
438 except (TemplateError
, TemplateNotFound
) as e
:
440 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
445 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
446 cloud_init_content
= cloud_init_file
= None
448 if vdu
.get("cloud-init-file"):
449 base_folder
= vnfd
["_admin"]["storage"]
450 if base_folder
["pkg-dir"]:
451 cloud_init_file
= "{}/{}/cloud_init/{}".format(
452 base_folder
["folder"],
453 base_folder
["pkg-dir"],
454 vdu
["cloud-init-file"],
457 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
458 base_folder
["folder"],
459 vdu
["cloud-init-file"],
461 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
462 cloud_init_content
= ci_file
.read()
463 elif vdu
.get("cloud-init"):
464 cloud_init_content
= vdu
["cloud-init"]
466 return cloud_init_content
467 except FsException
as e
:
469 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
470 vnfd
["id"], vdu
["id"], cloud_init_file
, e
474 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
476 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
478 additional_params
= vdur
.get("additionalParams")
479 return parse_yaml_strings(additional_params
)
481 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
483 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
484 :param vnfd: input vnfd
485 :param new_id: overrides vnf id if provided
486 :param additionalParams: Instantiation params for VNFs provided
487 :param nsrId: Id of the NSR
488 :return: copy of vnfd
490 vnfd_RO
= deepcopy(vnfd
)
491 # remove unused by RO configuration, monitoring, scaling and internal keys
492 vnfd_RO
.pop("_id", None)
493 vnfd_RO
.pop("_admin", None)
494 vnfd_RO
.pop("monitoring-param", None)
495 vnfd_RO
.pop("scaling-group-descriptor", None)
496 vnfd_RO
.pop("kdu", None)
497 vnfd_RO
.pop("k8s-cluster", None)
499 vnfd_RO
["id"] = new_id
501 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
502 for vdu
in get_iterable(vnfd_RO
, "vdu"):
503 vdu
.pop("cloud-init-file", None)
504 vdu
.pop("cloud-init", None)
508 def ip_profile_2_RO(ip_profile
):
509 RO_ip_profile
= deepcopy(ip_profile
)
510 if "dns-server" in RO_ip_profile
:
511 if isinstance(RO_ip_profile
["dns-server"], list):
512 RO_ip_profile
["dns-address"] = []
513 for ds
in RO_ip_profile
.pop("dns-server"):
514 RO_ip_profile
["dns-address"].append(ds
["address"])
516 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
517 if RO_ip_profile
.get("ip-version") == "ipv4":
518 RO_ip_profile
["ip-version"] = "IPv4"
519 if RO_ip_profile
.get("ip-version") == "ipv6":
520 RO_ip_profile
["ip-version"] = "IPv6"
521 if "dhcp-params" in RO_ip_profile
:
522 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
525 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
526 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
527 if db_vim
["_admin"]["operationalState"] != "ENABLED":
529 "VIM={} is not available. operationalState={}".format(
530 vim_account
, db_vim
["_admin"]["operationalState"]
533 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
536 def get_ro_wim_id_for_wim_account(self
, wim_account
):
537 if isinstance(wim_account
, str):
538 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
539 if db_wim
["_admin"]["operationalState"] != "ENABLED":
541 "WIM={} is not available. operationalState={}".format(
542 wim_account
, db_wim
["_admin"]["operationalState"]
545 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
550 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
552 db_vdu_push_list
= []
554 db_update
= {"_admin.modified": time()}
556 for vdu_id
, vdu_count
in vdu_create
.items():
560 for vdur
in reversed(db_vnfr
["vdur"])
561 if vdur
["vdu-id-ref"] == vdu_id
566 # Read the template saved in the db:
568 "No vdur in the database. Using the vdur-template to scale"
570 vdur_template
= db_vnfr
.get("vdur-template")
571 if not vdur_template
:
573 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
577 vdur
= vdur_template
[0]
578 # Delete a template from the database after using it
581 {"_id": db_vnfr
["_id"]},
583 pull
={"vdur-template": {"_id": vdur
["_id"]}},
585 for count
in range(vdu_count
):
586 vdur_copy
= deepcopy(vdur
)
587 vdur_copy
["status"] = "BUILD"
588 vdur_copy
["status-detailed"] = None
589 vdur_copy
["ip-address"] = None
590 vdur_copy
["_id"] = str(uuid4())
591 vdur_copy
["count-index"] += count
+ 1
592 vdur_copy
["id"] = "{}-{}".format(
593 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
595 vdur_copy
.pop("vim_info", None)
596 for iface
in vdur_copy
["interfaces"]:
597 if iface
.get("fixed-ip"):
598 iface
["ip-address"] = self
.increment_ip_mac(
599 iface
["ip-address"], count
+ 1
602 iface
.pop("ip-address", None)
603 if iface
.get("fixed-mac"):
604 iface
["mac-address"] = self
.increment_ip_mac(
605 iface
["mac-address"], count
+ 1
608 iface
.pop("mac-address", None)
612 ) # only first vdu can be managment of vnf
613 db_vdu_push_list
.append(vdur_copy
)
614 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
616 if len(db_vnfr
["vdur"]) == 1:
617 # The scale will move to 0 instances
619 "Scaling to 0 !, creating the template with the last vdur"
621 template_vdur
= [db_vnfr
["vdur"][0]]
622 for vdu_id
, vdu_count
in vdu_delete
.items():
624 indexes_to_delete
= [
626 for iv
in enumerate(db_vnfr
["vdur"])
627 if iv
[1]["vdu-id-ref"] == vdu_id
631 "vdur.{}.status".format(i
): "DELETING"
632 for i
in indexes_to_delete
[-vdu_count
:]
636 # it must be deleted one by one because common.db does not allow otherwise
639 for v
in reversed(db_vnfr
["vdur"])
640 if v
["vdu-id-ref"] == vdu_id
642 for vdu
in vdus_to_delete
[:vdu_count
]:
645 {"_id": db_vnfr
["_id"]},
647 pull
={"vdur": {"_id": vdu
["_id"]}},
651 db_push
["vdur"] = db_vdu_push_list
653 db_push
["vdur-template"] = template_vdur
656 db_vnfr
["vdur-template"] = template_vdur
657 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
658 # modify passed dictionary db_vnfr
659 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
660 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
662 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
664 Updates database nsr with the RO info for the created vld
665 :param ns_update_nsr: dictionary to be filled with the updated info
666 :param db_nsr: content of db_nsr. This is also modified
667 :param nsr_desc_RO: nsr descriptor from RO
668 :return: Nothing, LcmException is raised on errors
671 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
672 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
673 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
675 vld
["vim-id"] = net_RO
.get("vim_net_id")
676 vld
["name"] = net_RO
.get("vim_name")
677 vld
["status"] = net_RO
.get("status")
678 vld
["status-detailed"] = net_RO
.get("error_msg")
679 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
683 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
686 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
688 for db_vnfr
in db_vnfrs
.values():
689 vnfr_update
= {"status": "ERROR"}
690 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
691 if "status" not in vdur
:
692 vdur
["status"] = "ERROR"
693 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
695 vdur
["status-detailed"] = str(error_text
)
697 "vdur.{}.status-detailed".format(vdu_index
)
699 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
700 except DbException
as e
:
701 self
.logger
.error("Cannot update vnf. {}".format(e
))
703 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
705 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
706 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
707 :param nsr_desc_RO: nsr descriptor from RO
708 :return: Nothing, LcmException is raised on errors
710 for vnf_index
, db_vnfr
in db_vnfrs
.items():
711 for vnf_RO
in nsr_desc_RO
["vnfs"]:
712 if vnf_RO
["member_vnf_index"] != vnf_index
:
715 if vnf_RO
.get("ip_address"):
716 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
719 elif not db_vnfr
.get("ip-address"):
720 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
721 raise LcmExceptionNoMgmtIP(
722 "ns member_vnf_index '{}' has no IP address".format(
727 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
728 vdur_RO_count_index
= 0
729 if vdur
.get("pdu-type"):
731 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
732 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
734 if vdur
["count-index"] != vdur_RO_count_index
:
735 vdur_RO_count_index
+= 1
737 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
738 if vdur_RO
.get("ip_address"):
739 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
741 vdur
["ip-address"] = None
742 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
743 vdur
["name"] = vdur_RO
.get("vim_name")
744 vdur
["status"] = vdur_RO
.get("status")
745 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
746 for ifacer
in get_iterable(vdur
, "interfaces"):
747 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
748 if ifacer
["name"] == interface_RO
.get("internal_name"):
749 ifacer
["ip-address"] = interface_RO
.get(
752 ifacer
["mac-address"] = interface_RO
.get(
758 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
759 "from VIM info".format(
760 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
763 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
767 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
769 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
773 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
774 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
775 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
777 vld
["vim-id"] = net_RO
.get("vim_net_id")
778 vld
["name"] = net_RO
.get("vim_name")
779 vld
["status"] = net_RO
.get("status")
780 vld
["status-detailed"] = net_RO
.get("error_msg")
781 vnfr_update
["vld.{}".format(vld_index
)] = vld
785 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
790 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
795 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
800 def _get_ns_config_info(self
, nsr_id
):
802 Generates a mapping between vnf,vdu elements and the N2VC id
803 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
804 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
805 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
806 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
808 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
809 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
811 ns_config_info
= {"osm-config-mapping": mapping
}
812 for vca
in vca_deployed_list
:
813 if not vca
["member-vnf-index"]:
815 if not vca
["vdu_id"]:
816 mapping
[vca
["member-vnf-index"]] = vca
["application"]
820 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
822 ] = vca
["application"]
823 return ns_config_info
825 async def _instantiate_ng_ro(
842 def get_vim_account(vim_account_id
):
844 if vim_account_id
in db_vims
:
845 return db_vims
[vim_account_id
]
846 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
847 db_vims
[vim_account_id
] = db_vim
850 # modify target_vld info with instantiation parameters
851 def parse_vld_instantiation_params(
852 target_vim
, target_vld
, vld_params
, target_sdn
854 if vld_params
.get("ip-profile"):
855 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
858 if vld_params
.get("provider-network"):
859 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
862 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
863 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
867 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
868 # if wim_account_id is specified in vld_params, validate if it is feasible.
869 wim_account_id
, db_wim
= select_feasible_wim_account(
870 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
874 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
875 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
876 # update vld_params with correct WIM account Id
877 vld_params
["wimAccountId"] = wim_account_id
879 target_wim
= "wim:{}".format(wim_account_id
)
880 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
881 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
882 if len(sdn_ports
) > 0:
883 target_vld
["vim_info"][target_wim
] = target_wim_attrs
884 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
887 "Target VLD with WIM data: {:s}".format(str(target_vld
))
890 for param
in ("vim-network-name", "vim-network-id"):
891 if vld_params
.get(param
):
892 if isinstance(vld_params
[param
], dict):
893 for vim
, vim_net
in vld_params
[param
].items():
894 other_target_vim
= "vim:" + vim
896 target_vld
["vim_info"],
897 (other_target_vim
, param
.replace("-", "_")),
900 else: # isinstance str
901 target_vld
["vim_info"][target_vim
][
902 param
.replace("-", "_")
903 ] = vld_params
[param
]
904 if vld_params
.get("common_id"):
905 target_vld
["common_id"] = vld_params
.get("common_id")
907 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
908 def update_ns_vld_target(target
, ns_params
):
909 for vnf_params
in ns_params
.get("vnf", ()):
910 if vnf_params
.get("vimAccountId"):
914 for vnfr
in db_vnfrs
.values()
915 if vnf_params
["member-vnf-index"]
916 == vnfr
["member-vnf-index-ref"]
920 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
923 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
924 target_vld
= find_in_list(
925 get_iterable(vdur
, "interfaces"),
926 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
929 vld_params
= find_in_list(
930 get_iterable(ns_params
, "vld"),
931 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
935 if vnf_params
.get("vimAccountId") not in a_vld
.get(
938 target_vim_network_list
= [
939 v
for _
, v
in a_vld
.get("vim_info").items()
941 target_vim_network_name
= next(
943 item
.get("vim_network_name", "")
944 for item
in target_vim_network_list
949 target
["ns"]["vld"][a_index
].get("vim_info").update(
951 "vim:{}".format(vnf_params
["vimAccountId"]): {
952 "vim_network_name": target_vim_network_name
,
958 for param
in ("vim-network-name", "vim-network-id"):
959 if vld_params
.get(param
) and isinstance(
960 vld_params
[param
], dict
962 for vim
, vim_net
in vld_params
[
965 other_target_vim
= "vim:" + vim
967 target
["ns"]["vld"][a_index
].get(
972 param
.replace("-", "_"),
977 nslcmop_id
= db_nslcmop
["_id"]
979 "name": db_nsr
["name"],
982 "image": deepcopy(db_nsr
["image"]),
983 "flavor": deepcopy(db_nsr
["flavor"]),
984 "action_id": nslcmop_id
,
985 "cloud_init_content": {},
987 for image
in target
["image"]:
988 image
["vim_info"] = {}
989 for flavor
in target
["flavor"]:
990 flavor
["vim_info"] = {}
991 if db_nsr
.get("affinity-or-anti-affinity-group"):
992 target
["affinity-or-anti-affinity-group"] = deepcopy(
993 db_nsr
["affinity-or-anti-affinity-group"]
995 for affinity_or_anti_affinity_group
in target
[
996 "affinity-or-anti-affinity-group"
998 affinity_or_anti_affinity_group
["vim_info"] = {}
1000 if db_nslcmop
.get("lcmOperationType") != "instantiate":
1001 # get parameters of instantiation:
1002 db_nslcmop_instantiate
= self
.db
.get_list(
1005 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1006 "lcmOperationType": "instantiate",
1009 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1011 ns_params
= db_nslcmop
.get("operationParams")
1012 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1013 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1016 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1017 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1020 "name": vld
["name"],
1021 "mgmt-network": vld
.get("mgmt-network", False),
1022 "type": vld
.get("type"),
1025 "vim_network_name": vld
.get("vim-network-name"),
1026 "vim_account_id": ns_params
["vimAccountId"],
1030 # check if this network needs SDN assist
1031 if vld
.get("pci-interfaces"):
1032 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1033 sdnc_id
= db_vim
["config"].get("sdn-controller")
1035 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1036 target_sdn
= "sdn:{}".format(sdnc_id
)
1037 target_vld
["vim_info"][target_sdn
] = {
1039 "target_vim": target_vim
,
1041 "type": vld
.get("type"),
1044 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1045 for nsd_vnf_profile
in nsd_vnf_profiles
:
1046 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1047 if cp
["virtual-link-profile-id"] == vld
["id"]:
1049 "member_vnf:{}.{}".format(
1050 cp
["constituent-cpd-id"][0][
1051 "constituent-base-element-id"
1053 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1055 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1057 # check at nsd descriptor, if there is an ip-profile
1059 nsd_vlp
= find_in_list(
1060 get_virtual_link_profiles(nsd
),
1061 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1066 and nsd_vlp
.get("virtual-link-protocol-data")
1067 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1069 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1072 ip_profile_dest_data
= {}
1073 if "ip-version" in ip_profile_source_data
:
1074 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1077 if "cidr" in ip_profile_source_data
:
1078 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1081 if "gateway-ip" in ip_profile_source_data
:
1082 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1085 if "dhcp-enabled" in ip_profile_source_data
:
1086 ip_profile_dest_data
["dhcp-params"] = {
1087 "enabled": ip_profile_source_data
["dhcp-enabled"]
1089 vld_params
["ip-profile"] = ip_profile_dest_data
1091 # update vld_params with instantiation params
1092 vld_instantiation_params
= find_in_list(
1093 get_iterable(ns_params
, "vld"),
1094 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1096 if vld_instantiation_params
:
1097 vld_params
.update(vld_instantiation_params
)
1098 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1099 target
["ns"]["vld"].append(target_vld
)
1100 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1101 update_ns_vld_target(target
, ns_params
)
1103 for vnfr
in db_vnfrs
.values():
1104 vnfd
= find_in_list(
1105 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1107 vnf_params
= find_in_list(
1108 get_iterable(ns_params
, "vnf"),
1109 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1111 target_vnf
= deepcopy(vnfr
)
1112 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1113 for vld
in target_vnf
.get("vld", ()):
1114 # check if connected to a ns.vld, to fill target'
1115 vnf_cp
= find_in_list(
1116 vnfd
.get("int-virtual-link-desc", ()),
1117 lambda cpd
: cpd
.get("id") == vld
["id"],
1120 ns_cp
= "member_vnf:{}.{}".format(
1121 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1123 if cp2target
.get(ns_cp
):
1124 vld
["target"] = cp2target
[ns_cp
]
1127 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1129 # check if this network needs SDN assist
1131 if vld
.get("pci-interfaces"):
1132 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1133 sdnc_id
= db_vim
["config"].get("sdn-controller")
1135 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1136 target_sdn
= "sdn:{}".format(sdnc_id
)
1137 vld
["vim_info"][target_sdn
] = {
1139 "target_vim": target_vim
,
1141 "type": vld
.get("type"),
1144 # check at vnfd descriptor, if there is an ip-profile
1146 vnfd_vlp
= find_in_list(
1147 get_virtual_link_profiles(vnfd
),
1148 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1152 and vnfd_vlp
.get("virtual-link-protocol-data")
1153 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1155 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1158 ip_profile_dest_data
= {}
1159 if "ip-version" in ip_profile_source_data
:
1160 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1163 if "cidr" in ip_profile_source_data
:
1164 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1167 if "gateway-ip" in ip_profile_source_data
:
1168 ip_profile_dest_data
[
1170 ] = ip_profile_source_data
["gateway-ip"]
1171 if "dhcp-enabled" in ip_profile_source_data
:
1172 ip_profile_dest_data
["dhcp-params"] = {
1173 "enabled": ip_profile_source_data
["dhcp-enabled"]
1176 vld_params
["ip-profile"] = ip_profile_dest_data
1177 # update vld_params with instantiation params
1179 vld_instantiation_params
= find_in_list(
1180 get_iterable(vnf_params
, "internal-vld"),
1181 lambda i_vld
: i_vld
["name"] == vld
["id"],
1183 if vld_instantiation_params
:
1184 vld_params
.update(vld_instantiation_params
)
1185 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1188 for vdur
in target_vnf
.get("vdur", ()):
1189 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1190 continue # This vdu must not be created
1191 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1193 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1196 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1197 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1200 and vdu_configuration
.get("config-access")
1201 and vdu_configuration
.get("config-access").get("ssh-access")
1203 vdur
["ssh-keys"] = ssh_keys_all
1204 vdur
["ssh-access-required"] = vdu_configuration
[
1206 ]["ssh-access"]["required"]
1209 and vnf_configuration
.get("config-access")
1210 and vnf_configuration
.get("config-access").get("ssh-access")
1211 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1213 vdur
["ssh-keys"] = ssh_keys_all
1214 vdur
["ssh-access-required"] = vnf_configuration
[
1216 ]["ssh-access"]["required"]
1217 elif ssh_keys_instantiation
and find_in_list(
1218 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1220 vdur
["ssh-keys"] = ssh_keys_instantiation
1222 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1224 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1226 if vdud
.get("cloud-init-file"):
1227 vdur
["cloud-init"] = "{}:file:{}".format(
1228 vnfd
["_id"], vdud
.get("cloud-init-file")
1230 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1231 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1232 base_folder
= vnfd
["_admin"]["storage"]
1233 if base_folder
["pkg-dir"]:
1234 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1235 base_folder
["folder"],
1236 base_folder
["pkg-dir"],
1237 vdud
.get("cloud-init-file"),
1240 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1241 base_folder
["folder"],
1242 vdud
.get("cloud-init-file"),
1244 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1245 target
["cloud_init_content"][
1248 elif vdud
.get("cloud-init"):
1249 vdur
["cloud-init"] = "{}:vdu:{}".format(
1250 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1252 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1253 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1256 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1257 deploy_params_vdu
= self
._format
_additional
_params
(
1258 vdur
.get("additionalParams") or {}
1260 deploy_params_vdu
["OSM"] = get_osm_params(
1261 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1263 vdur
["additionalParams"] = deploy_params_vdu
1266 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1267 if target_vim
not in ns_flavor
["vim_info"]:
1268 ns_flavor
["vim_info"][target_vim
] = {}
1271 # in case alternative images are provided we must check if they should be applied
1272 # for the vim_type, modify the vim_type taking into account
1273 ns_image_id
= int(vdur
["ns-image-id"])
1274 if vdur
.get("alt-image-ids"):
1275 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1276 vim_type
= db_vim
["vim_type"]
1277 for alt_image_id
in vdur
.get("alt-image-ids"):
1278 ns_alt_image
= target
["image"][int(alt_image_id
)]
1279 if vim_type
== ns_alt_image
.get("vim-type"):
1280 # must use alternative image
1282 "use alternative image id: {}".format(alt_image_id
)
1284 ns_image_id
= alt_image_id
1285 vdur
["ns-image-id"] = ns_image_id
1287 ns_image
= target
["image"][int(ns_image_id
)]
1288 if target_vim
not in ns_image
["vim_info"]:
1289 ns_image
["vim_info"][target_vim
] = {}
1292 if vdur
.get("affinity-or-anti-affinity-group-id"):
1293 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1294 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1295 if target_vim
not in ns_ags
["vim_info"]:
1296 ns_ags
["vim_info"][target_vim
] = {}
1298 vdur
["vim_info"] = {target_vim
: {}}
1299 # instantiation parameters
1301 vdu_instantiation_params
= find_in_list(
1302 get_iterable(vnf_params
, "vdu"),
1303 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1305 if vdu_instantiation_params
:
1306 # Parse the vdu_volumes from the instantiation params
1307 vdu_volumes
= get_volumes_from_instantiation_params(
1308 vdu_instantiation_params
, vdud
1310 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1311 vdur_list
.append(vdur
)
1312 target_vnf
["vdur"] = vdur_list
1313 target
["vnf"].append(target_vnf
)
1315 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1316 desc
= await self
.RO
.deploy(nsr_id
, target
)
1317 self
.logger
.debug("RO return > {}".format(desc
))
1318 action_id
= desc
["action_id"]
1319 await self
._wait
_ng
_ro
(
1326 operation
="instantiation",
1331 "_admin.deployed.RO.operational-status": "running",
1332 "detailed-status": " ".join(stage
),
1334 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1335 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1336 self
._write
_op
_status
(nslcmop_id
, stage
)
1338 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1342 async def _wait_ng_ro(
1352 detailed_status_old
= None
1354 start_time
= start_time
or time()
1355 while time() <= start_time
+ timeout
:
1356 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1357 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1358 if desc_status
["status"] == "FAILED":
1359 raise NgRoException(desc_status
["details"])
1360 elif desc_status
["status"] == "BUILD":
1362 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1363 elif desc_status
["status"] == "DONE":
1365 stage
[2] = "Deployed at VIM"
1368 assert False, "ROclient.check_ns_status returns unknown {}".format(
1369 desc_status
["status"]
1371 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1372 detailed_status_old
= stage
[2]
1373 db_nsr_update
["detailed-status"] = " ".join(stage
)
1374 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1375 self
._write
_op
_status
(nslcmop_id
, stage
)
1376 await asyncio
.sleep(15, loop
=self
.loop
)
1377 else: # timeout_ns_deploy
1378 raise NgRoException("Timeout waiting ns to deploy")
1380 async def _terminate_ng_ro(
1381 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1386 start_deploy
= time()
1393 "action_id": nslcmop_id
,
1395 desc
= await self
.RO
.deploy(nsr_id
, target
)
1396 action_id
= desc
["action_id"]
1397 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1401 + "ns terminate action at RO. action_id={}".format(action_id
)
1405 delete_timeout
= 20 * 60 # 20 minutes
1406 await self
._wait
_ng
_ro
(
1413 operation
="termination",
1416 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1417 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1419 await self
.RO
.delete(nsr_id
)
1420 except Exception as e
:
1421 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1422 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1423 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1424 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1426 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1428 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1429 failed_detail
.append("delete conflict: {}".format(e
))
1432 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1435 failed_detail
.append("delete error: {}".format(e
))
1438 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1442 stage
[2] = "Error deleting from VIM"
1444 stage
[2] = "Deleted from VIM"
1445 db_nsr_update
["detailed-status"] = " ".join(stage
)
1446 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1447 self
._write
_op
_status
(nslcmop_id
, stage
)
1450 raise LcmException("; ".join(failed_detail
))
1453 async def instantiate_RO(
1467 :param logging_text: preffix text to use at logging
1468 :param nsr_id: nsr identity
1469 :param nsd: database content of ns descriptor
1470 :param db_nsr: database content of ns record
1471 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1473 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1474 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1475 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1476 :return: None or exception
1479 start_deploy
= time()
1480 ns_params
= db_nslcmop
.get("operationParams")
1481 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1482 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1484 timeout_ns_deploy
= self
.timeout
.get(
1485 "ns_deploy", self
.timeout_ns_deploy
1488 # Check for and optionally request placement optimization. Database will be updated if placement activated
1489 stage
[2] = "Waiting for Placement."
1490 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1491 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1492 for vnfr
in db_vnfrs
.values():
1493 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1496 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1498 return await self
._instantiate
_ng
_ro
(
1511 except Exception as e
:
1512 stage
[2] = "ERROR deploying at VIM"
1513 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1515 "Error deploying at VIM {}".format(e
),
1516 exc_info
=not isinstance(
1519 ROclient
.ROClientException
,
1528 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1530 Wait for kdu to be up, get ip address
1531 :param logging_text: prefix use for logging
1535 :return: IP address, K8s services
1538 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1541 while nb_tries
< 360:
1542 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1546 for x
in get_iterable(db_vnfr
, "kdur")
1547 if x
.get("kdu-name") == kdu_name
1553 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1555 if kdur
.get("status"):
1556 if kdur
["status"] in ("READY", "ENABLED"):
1557 return kdur
.get("ip-address"), kdur
.get("services")
1560 "target KDU={} is in error state".format(kdu_name
)
1563 await asyncio
.sleep(10, loop
=self
.loop
)
1565 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1567 async def wait_vm_up_insert_key_ro(
1568 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1571 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1572 :param logging_text: prefix use for logging
1577 :param pub_key: public ssh key to inject, None to skip
1578 :param user: user to apply the public ssh key
1582 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1586 target_vdu_id
= None
1592 if ro_retries
>= 360: # 1 hour
1594 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1597 await asyncio
.sleep(10, loop
=self
.loop
)
1600 if not target_vdu_id
:
1601 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1603 if not vdu_id
: # for the VNF case
1604 if db_vnfr
.get("status") == "ERROR":
1606 "Cannot inject ssh-key because target VNF is in error state"
1608 ip_address
= db_vnfr
.get("ip-address")
1614 for x
in get_iterable(db_vnfr
, "vdur")
1615 if x
.get("ip-address") == ip_address
1623 for x
in get_iterable(db_vnfr
, "vdur")
1624 if x
.get("vdu-id-ref") == vdu_id
1625 and x
.get("count-index") == vdu_index
1631 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1632 ): # If only one, this should be the target vdu
1633 vdur
= db_vnfr
["vdur"][0]
1636 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1637 vnfr_id
, vdu_id
, vdu_index
1640 # New generation RO stores information at "vim_info"
1643 if vdur
.get("vim_info"):
1645 t
for t
in vdur
["vim_info"]
1646 ) # there should be only one key
1647 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1649 vdur
.get("pdu-type")
1650 or vdur
.get("status") == "ACTIVE"
1651 or ng_ro_status
== "ACTIVE"
1653 ip_address
= vdur
.get("ip-address")
1656 target_vdu_id
= vdur
["vdu-id-ref"]
1657 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1659 "Cannot inject ssh-key because target VM is in error state"
1662 if not target_vdu_id
:
1665 # inject public key into machine
1666 if pub_key
and user
:
1667 self
.logger
.debug(logging_text
+ "Inserting RO key")
1668 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1669 if vdur
.get("pdu-type"):
1670 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1673 ro_vm_id
= "{}-{}".format(
1674 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1675 ) # TODO add vdu_index
1679 "action": "inject_ssh_key",
1683 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1685 desc
= await self
.RO
.deploy(nsr_id
, target
)
1686 action_id
= desc
["action_id"]
1687 await self
._wait
_ng
_ro
(
1688 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1692 # wait until NS is deployed at RO
1694 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1695 ro_nsr_id
= deep_get(
1696 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1700 result_dict
= await self
.RO
.create_action(
1702 item_id_name
=ro_nsr_id
,
1704 "add_public_key": pub_key
,
1709 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1710 if not result_dict
or not isinstance(result_dict
, dict):
1712 "Unknown response from RO when injecting key"
1714 for result
in result_dict
.values():
1715 if result
.get("vim_result") == 200:
1718 raise ROclient
.ROClientException(
1719 "error injecting key: {}".format(
1720 result
.get("description")
1724 except NgRoException
as e
:
1726 "Reaching max tries injecting key. Error: {}".format(e
)
1728 except ROclient
.ROClientException
as e
:
1732 + "error injecting key: {}. Retrying until {} seconds".format(
1739 "Reaching max tries injecting key. Error: {}".format(e
)
1746 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1748 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1750 my_vca
= vca_deployed_list
[vca_index
]
1751 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1752 # vdu or kdu: no dependencies
1756 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1757 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1758 configuration_status_list
= db_nsr
["configurationStatus"]
1759 for index
, vca_deployed
in enumerate(configuration_status_list
):
1760 if index
== vca_index
:
1763 if not my_vca
.get("member-vnf-index") or (
1764 vca_deployed
.get("member-vnf-index")
1765 == my_vca
.get("member-vnf-index")
1767 internal_status
= configuration_status_list
[index
].get("status")
1768 if internal_status
== "READY":
1770 elif internal_status
== "BROKEN":
1772 "Configuration aborted because dependent charm/s has failed"
1777 # no dependencies, return
1779 await asyncio
.sleep(10)
1782 raise LcmException("Configuration aborted because dependent charm/s timeout")
1784 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1787 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1789 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1790 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1793 async def instantiate_N2VC(
1810 ee_config_descriptor
,
1812 nsr_id
= db_nsr
["_id"]
1813 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1814 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1815 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1816 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1818 "collection": "nsrs",
1819 "filter": {"_id": nsr_id
},
1820 "path": db_update_entry
,
1826 element_under_configuration
= nsr_id
1830 vnfr_id
= db_vnfr
["_id"]
1831 osm_config
["osm"]["vnf_id"] = vnfr_id
1833 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1835 if vca_type
== "native_charm":
1838 index_number
= vdu_index
or 0
1841 element_type
= "VNF"
1842 element_under_configuration
= vnfr_id
1843 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1845 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1846 element_type
= "VDU"
1847 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1848 osm_config
["osm"]["vdu_id"] = vdu_id
1850 namespace
+= ".{}".format(kdu_name
)
1851 element_type
= "KDU"
1852 element_under_configuration
= kdu_name
1853 osm_config
["osm"]["kdu_name"] = kdu_name
1856 if base_folder
["pkg-dir"]:
1857 artifact_path
= "{}/{}/{}/{}".format(
1858 base_folder
["folder"],
1859 base_folder
["pkg-dir"],
1862 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1867 artifact_path
= "{}/Scripts/{}/{}/".format(
1868 base_folder
["folder"],
1871 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1876 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1878 # get initial_config_primitive_list that applies to this element
1879 initial_config_primitive_list
= config_descriptor
.get(
1880 "initial-config-primitive"
1884 "Initial config primitive list > {}".format(
1885 initial_config_primitive_list
1889 # add config if not present for NS charm
1890 ee_descriptor_id
= ee_config_descriptor
.get("id")
1891 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1892 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1893 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1897 "Initial config primitive list #2 > {}".format(
1898 initial_config_primitive_list
1901 # n2vc_redesign STEP 3.1
1902 # find old ee_id if exists
1903 ee_id
= vca_deployed
.get("ee_id")
1905 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1906 # create or register execution environment in VCA
1907 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1909 self
._write
_configuration
_status
(
1911 vca_index
=vca_index
,
1913 element_under_configuration
=element_under_configuration
,
1914 element_type
=element_type
,
1917 step
= "create execution environment"
1918 self
.logger
.debug(logging_text
+ step
)
1922 if vca_type
== "k8s_proxy_charm":
1923 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1924 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1925 namespace
=namespace
,
1926 artifact_path
=artifact_path
,
1930 elif vca_type
== "helm" or vca_type
== "helm-v3":
1931 ee_id
, credentials
= await self
.vca_map
[
1933 ].create_execution_environment(
1934 namespace
=namespace
,
1938 artifact_path
=artifact_path
,
1939 chart_model
=vca_name
,
1943 ee_id
, credentials
= await self
.vca_map
[
1945 ].create_execution_environment(
1946 namespace
=namespace
,
1952 elif vca_type
== "native_charm":
1953 step
= "Waiting to VM being up and getting IP address"
1954 self
.logger
.debug(logging_text
+ step
)
1955 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1964 credentials
= {"hostname": rw_mgmt_ip
}
1966 username
= deep_get(
1967 config_descriptor
, ("config-access", "ssh-access", "default-user")
1969 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1970 # merged. Meanwhile let's get username from initial-config-primitive
1971 if not username
and initial_config_primitive_list
:
1972 for config_primitive
in initial_config_primitive_list
:
1973 for param
in config_primitive
.get("parameter", ()):
1974 if param
["name"] == "ssh-username":
1975 username
= param
["value"]
1979 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1980 "'config-access.ssh-access.default-user'"
1982 credentials
["username"] = username
1983 # n2vc_redesign STEP 3.2
1985 self
._write
_configuration
_status
(
1987 vca_index
=vca_index
,
1988 status
="REGISTERING",
1989 element_under_configuration
=element_under_configuration
,
1990 element_type
=element_type
,
1993 step
= "register execution environment {}".format(credentials
)
1994 self
.logger
.debug(logging_text
+ step
)
1995 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1996 credentials
=credentials
,
1997 namespace
=namespace
,
2002 # for compatibility with MON/POL modules, the need model and application name at database
2003 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
2004 ee_id_parts
= ee_id
.split(".")
2005 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
2006 if len(ee_id_parts
) >= 2:
2007 model_name
= ee_id_parts
[0]
2008 application_name
= ee_id_parts
[1]
2009 db_nsr_update
[db_update_entry
+ "model"] = model_name
2010 db_nsr_update
[db_update_entry
+ "application"] = application_name
2012 # n2vc_redesign STEP 3.3
2013 step
= "Install configuration Software"
2015 self
._write
_configuration
_status
(
2017 vca_index
=vca_index
,
2018 status
="INSTALLING SW",
2019 element_under_configuration
=element_under_configuration
,
2020 element_type
=element_type
,
2021 other_update
=db_nsr_update
,
2024 # TODO check if already done
2025 self
.logger
.debug(logging_text
+ step
)
2027 if vca_type
== "native_charm":
2028 config_primitive
= next(
2029 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2032 if config_primitive
:
2033 config
= self
._map
_primitive
_params
(
2034 config_primitive
, {}, deploy_params
2037 if vca_type
== "lxc_proxy_charm":
2038 if element_type
== "NS":
2039 num_units
= db_nsr
.get("config-units") or 1
2040 elif element_type
== "VNF":
2041 num_units
= db_vnfr
.get("config-units") or 1
2042 elif element_type
== "VDU":
2043 for v
in db_vnfr
["vdur"]:
2044 if vdu_id
== v
["vdu-id-ref"]:
2045 num_units
= v
.get("config-units") or 1
2047 if vca_type
!= "k8s_proxy_charm":
2048 await self
.vca_map
[vca_type
].install_configuration_sw(
2050 artifact_path
=artifact_path
,
2053 num_units
=num_units
,
2058 # write in db flag of configuration_sw already installed
2060 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2063 # add relations for this VCA (wait for other peers related with this VCA)
2064 await self
._add
_vca
_relations
(
2065 logging_text
=logging_text
,
2068 vca_index
=vca_index
,
2071 # if SSH access is required, then get execution environment SSH public
2072 # if native charm we have waited already to VM be UP
2073 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2076 # self.logger.debug("get ssh key block")
2078 config_descriptor
, ("config-access", "ssh-access", "required")
2080 # self.logger.debug("ssh key needed")
2081 # Needed to inject a ssh key
2084 ("config-access", "ssh-access", "default-user"),
2086 step
= "Install configuration Software, getting public ssh key"
2087 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2088 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2091 step
= "Insert public key into VM user={} ssh_key={}".format(
2095 # self.logger.debug("no need to get ssh key")
2096 step
= "Waiting to VM being up and getting IP address"
2097 self
.logger
.debug(logging_text
+ step
)
2099 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2102 # n2vc_redesign STEP 5.1
2103 # wait for RO (ip-address) Insert pub_key into VM
2106 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2107 logging_text
, nsr_id
, vnfr_id
, kdu_name
2109 vnfd
= self
.db
.get_one(
2111 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2113 kdu
= get_kdu(vnfd
, kdu_name
)
2115 service
["name"] for service
in get_kdu_services(kdu
)
2117 exposed_services
= []
2118 for service
in services
:
2119 if any(s
in service
["name"] for s
in kdu_services
):
2120 exposed_services
.append(service
)
2121 await self
.vca_map
[vca_type
].exec_primitive(
2123 primitive_name
="config",
2125 "osm-config": json
.dumps(
2127 k8s
={"services": exposed_services
}
2134 # This verification is needed in order to avoid trying to add a public key
2135 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2136 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2137 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2139 elif db_vnfr
.get("vdur"):
2140 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2150 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2152 # store rw_mgmt_ip in deploy params for later replacement
2153 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2155 # n2vc_redesign STEP 6 Execute initial config primitive
2156 step
= "execute initial config primitive"
2158 # wait for dependent primitives execution (NS -> VNF -> VDU)
2159 if initial_config_primitive_list
:
2160 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2162 # stage, in function of element type: vdu, kdu, vnf or ns
2163 my_vca
= vca_deployed_list
[vca_index
]
2164 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2166 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2167 elif my_vca
.get("member-vnf-index"):
2169 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2172 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2174 self
._write
_configuration
_status
(
2175 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2178 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2180 check_if_terminated_needed
= True
2181 for initial_config_primitive
in initial_config_primitive_list
:
2182 # adding information on the vca_deployed if it is a NS execution environment
2183 if not vca_deployed
["member-vnf-index"]:
2184 deploy_params
["ns_config_info"] = json
.dumps(
2185 self
._get
_ns
_config
_info
(nsr_id
)
2187 # TODO check if already done
2188 primitive_params_
= self
._map
_primitive
_params
(
2189 initial_config_primitive
, {}, deploy_params
2192 step
= "execute primitive '{}' params '{}'".format(
2193 initial_config_primitive
["name"], primitive_params_
2195 self
.logger
.debug(logging_text
+ step
)
2196 await self
.vca_map
[vca_type
].exec_primitive(
2198 primitive_name
=initial_config_primitive
["name"],
2199 params_dict
=primitive_params_
,
2204 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2205 if check_if_terminated_needed
:
2206 if config_descriptor
.get("terminate-config-primitive"):
2208 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2210 check_if_terminated_needed
= False
2212 # TODO register in database that primitive is done
2214 # STEP 7 Configure metrics
2215 if vca_type
== "helm" or vca_type
== "helm-v3":
2216 # TODO: review for those cases where the helm chart is a reference and
2217 # is not part of the NF package
2218 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2220 artifact_path
=artifact_path
,
2221 ee_config_descriptor
=ee_config_descriptor
,
2224 target_ip
=rw_mgmt_ip
,
2230 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2233 for job
in prometheus_jobs
:
2236 {"job_name": job
["job_name"]},
2239 fail_on_empty
=False,
2242 step
= "instantiated at VCA"
2243 self
.logger
.debug(logging_text
+ step
)
2245 self
._write
_configuration
_status
(
2246 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2249 except Exception as e
: # TODO not use Exception but N2VC exception
2250 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2252 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2255 "Exception while {} : {}".format(step
, e
), exc_info
=True
2257 self
._write
_configuration
_status
(
2258 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2260 raise LcmException("{} {}".format(step
, e
)) from e
2262 def _write_ns_status(
2266 current_operation
: str,
2267 current_operation_id
: str,
2268 error_description
: str = None,
2269 error_detail
: str = None,
2270 other_update
: dict = None,
2273 Update db_nsr fields.
2276 :param current_operation:
2277 :param current_operation_id:
2278 :param error_description:
2279 :param error_detail:
2280 :param other_update: Other required changes at database if provided, will be cleared
2284 db_dict
= other_update
or {}
2287 ] = current_operation_id
# for backward compatibility
2288 db_dict
["_admin.current-operation"] = current_operation_id
2289 db_dict
["_admin.operation-type"] = (
2290 current_operation
if current_operation
!= "IDLE" else None
2292 db_dict
["currentOperation"] = current_operation
2293 db_dict
["currentOperationID"] = current_operation_id
2294 db_dict
["errorDescription"] = error_description
2295 db_dict
["errorDetail"] = error_detail
2298 db_dict
["nsState"] = ns_state
2299 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2300 except DbException
as e
:
2301 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2303 def _write_op_status(
2307 error_message
: str = None,
2308 queuePosition
: int = 0,
2309 operation_state
: str = None,
2310 other_update
: dict = None,
2313 db_dict
= other_update
or {}
2314 db_dict
["queuePosition"] = queuePosition
2315 if isinstance(stage
, list):
2316 db_dict
["stage"] = stage
[0]
2317 db_dict
["detailed-status"] = " ".join(stage
)
2318 elif stage
is not None:
2319 db_dict
["stage"] = str(stage
)
2321 if error_message
is not None:
2322 db_dict
["errorMessage"] = error_message
2323 if operation_state
is not None:
2324 db_dict
["operationState"] = operation_state
2325 db_dict
["statusEnteredTime"] = time()
2326 self
.update_db_2("nslcmops", op_id
, db_dict
)
2327 except DbException
as e
:
2329 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2332 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2334 nsr_id
= db_nsr
["_id"]
2335 # configurationStatus
2336 config_status
= db_nsr
.get("configurationStatus")
2339 "configurationStatus.{}.status".format(index
): status
2340 for index
, v
in enumerate(config_status
)
2344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2346 except DbException
as e
:
2348 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2351 def _write_configuration_status(
2356 element_under_configuration
: str = None,
2357 element_type
: str = None,
2358 other_update
: dict = None,
2361 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2362 # .format(vca_index, status))
2365 db_path
= "configurationStatus.{}.".format(vca_index
)
2366 db_dict
= other_update
or {}
2368 db_dict
[db_path
+ "status"] = status
2369 if element_under_configuration
:
2371 db_path
+ "elementUnderConfiguration"
2372 ] = element_under_configuration
2374 db_dict
[db_path
+ "elementType"] = element_type
2375 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2376 except DbException
as e
:
2378 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2379 status
, nsr_id
, vca_index
, e
2383 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2385 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2386 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2387 Database is used because the result can be obtained from a different LCM worker in case of HA.
2388 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2389 :param db_nslcmop: database content of nslcmop
2390 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2391 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2392 computed 'vim-account-id'
2395 nslcmop_id
= db_nslcmop
["_id"]
2396 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2397 if placement_engine
== "PLA":
2399 logging_text
+ "Invoke and wait for placement optimization"
2401 await self
.msg
.aiowrite(
2402 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2404 db_poll_interval
= 5
2405 wait
= db_poll_interval
* 10
2407 while not pla_result
and wait
>= 0:
2408 await asyncio
.sleep(db_poll_interval
)
2409 wait
-= db_poll_interval
2410 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2411 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2415 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2418 for pla_vnf
in pla_result
["vnf"]:
2419 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2420 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2425 {"_id": vnfr
["_id"]},
2426 {"vim-account-id": pla_vnf
["vimAccountId"]},
2429 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2432 def update_nsrs_with_pla_result(self
, params
):
2434 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2436 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2438 except Exception as e
:
2439 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2441 async def instantiate(self
, nsr_id
, nslcmop_id
):
2444 :param nsr_id: ns instance to deploy
2445 :param nslcmop_id: operation to run
2449 # Try to lock HA task here
2450 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2451 if not task_is_locked_by_me
:
2453 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2457 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2458 self
.logger
.debug(logging_text
+ "Enter")
2460 # get all needed from database
2462 # database nsrs record
2465 # database nslcmops record
2468 # update operation on nsrs
2470 # update operation on nslcmops
2471 db_nslcmop_update
= {}
2473 nslcmop_operation_state
= None
2474 db_vnfrs
= {} # vnf's info indexed by member-index
2476 tasks_dict_info
= {} # from task to info text
2480 "Stage 1/5: preparation of the environment.",
2481 "Waiting for previous operations to terminate.",
2484 # ^ stage, step, VIM progress
2486 # wait for any previous tasks in process
2487 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2489 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2490 stage
[1] = "Reading from database."
2491 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2492 db_nsr_update
["detailed-status"] = "creating"
2493 db_nsr_update
["operational-status"] = "init"
2494 self
._write
_ns
_status
(
2496 ns_state
="BUILDING",
2497 current_operation
="INSTANTIATING",
2498 current_operation_id
=nslcmop_id
,
2499 other_update
=db_nsr_update
,
2501 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2503 # read from db: operation
2504 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2505 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2506 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2507 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2508 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2510 ns_params
= db_nslcmop
.get("operationParams")
2511 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2512 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2514 timeout_ns_deploy
= self
.timeout
.get(
2515 "ns_deploy", self
.timeout_ns_deploy
2519 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2520 self
.logger
.debug(logging_text
+ stage
[1])
2521 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2522 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2523 self
.logger
.debug(logging_text
+ stage
[1])
2524 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2525 self
.fs
.sync(db_nsr
["nsd-id"])
2527 # nsr_name = db_nsr["name"] # TODO short-name??
2529 # read from db: vnf's of this ns
2530 stage
[1] = "Getting vnfrs from db."
2531 self
.logger
.debug(logging_text
+ stage
[1])
2532 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2534 # read from db: vnfd's for every vnf
2535 db_vnfds
= [] # every vnfd data
2537 # for each vnf in ns, read vnfd
2538 for vnfr
in db_vnfrs_list
:
2539 if vnfr
.get("kdur"):
2541 for kdur
in vnfr
["kdur"]:
2542 if kdur
.get("additionalParams"):
2543 kdur
["additionalParams"] = json
.loads(
2544 kdur
["additionalParams"]
2546 kdur_list
.append(kdur
)
2547 vnfr
["kdur"] = kdur_list
2549 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2550 vnfd_id
= vnfr
["vnfd-id"]
2551 vnfd_ref
= vnfr
["vnfd-ref"]
2552 self
.fs
.sync(vnfd_id
)
2554 # if we haven't this vnfd, read it from db
2555 if vnfd_id
not in db_vnfds
:
2557 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2560 self
.logger
.debug(logging_text
+ stage
[1])
2561 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2564 db_vnfds
.append(vnfd
)
2566 # Get or generates the _admin.deployed.VCA list
2567 vca_deployed_list
= None
2568 if db_nsr
["_admin"].get("deployed"):
2569 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2570 if vca_deployed_list
is None:
2571 vca_deployed_list
= []
2572 configuration_status_list
= []
2573 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2574 db_nsr_update
["configurationStatus"] = configuration_status_list
2575 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2576 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2577 elif isinstance(vca_deployed_list
, dict):
2578 # maintain backward compatibility. Change a dict to list at database
2579 vca_deployed_list
= list(vca_deployed_list
.values())
2580 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2581 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2584 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2586 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2587 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2589 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2590 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2591 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2593 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2596 # n2vc_redesign STEP 2 Deploy Network Scenario
2597 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2598 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2600 stage
[1] = "Deploying KDUs."
2601 # self.logger.debug(logging_text + "Before deploy_kdus")
2602 # Call to deploy_kdus in case exists the "vdu:kdu" param
2603 await self
.deploy_kdus(
2604 logging_text
=logging_text
,
2606 nslcmop_id
=nslcmop_id
,
2609 task_instantiation_info
=tasks_dict_info
,
2612 stage
[1] = "Getting VCA public key."
2613 # n2vc_redesign STEP 1 Get VCA public ssh-key
2614 # feature 1429. Add n2vc public key to needed VMs
2615 n2vc_key
= self
.n2vc
.get_public_key()
2616 n2vc_key_list
= [n2vc_key
]
2617 if self
.vca_config
.get("public_key"):
2618 n2vc_key_list
.append(self
.vca_config
["public_key"])
2620 stage
[1] = "Deploying NS at VIM."
2621 task_ro
= asyncio
.ensure_future(
2622 self
.instantiate_RO(
2623 logging_text
=logging_text
,
2627 db_nslcmop
=db_nslcmop
,
2630 n2vc_key_list
=n2vc_key_list
,
2634 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2635 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2637 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2638 stage
[1] = "Deploying Execution Environments."
2639 self
.logger
.debug(logging_text
+ stage
[1])
2641 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2642 for vnf_profile
in get_vnf_profiles(nsd
):
2643 vnfd_id
= vnf_profile
["vnfd-id"]
2644 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2645 member_vnf_index
= str(vnf_profile
["id"])
2646 db_vnfr
= db_vnfrs
[member_vnf_index
]
2647 base_folder
= vnfd
["_admin"]["storage"]
2653 # Get additional parameters
2654 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2655 if db_vnfr
.get("additionalParamsForVnf"):
2656 deploy_params
.update(
2657 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2660 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2661 if descriptor_config
:
2663 logging_text
=logging_text
2664 + "member_vnf_index={} ".format(member_vnf_index
),
2667 nslcmop_id
=nslcmop_id
,
2673 member_vnf_index
=member_vnf_index
,
2674 vdu_index
=vdu_index
,
2676 deploy_params
=deploy_params
,
2677 descriptor_config
=descriptor_config
,
2678 base_folder
=base_folder
,
2679 task_instantiation_info
=tasks_dict_info
,
2683 # Deploy charms for each VDU that supports one.
2684 for vdud
in get_vdu_list(vnfd
):
2686 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2687 vdur
= find_in_list(
2688 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2691 if vdur
.get("additionalParams"):
2692 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2694 deploy_params_vdu
= deploy_params
2695 deploy_params_vdu
["OSM"] = get_osm_params(
2696 db_vnfr
, vdu_id
, vdu_count_index
=0
2698 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2700 self
.logger
.debug("VDUD > {}".format(vdud
))
2702 "Descriptor config > {}".format(descriptor_config
)
2704 if descriptor_config
:
2707 for vdu_index
in range(vdud_count
):
2708 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2710 logging_text
=logging_text
2711 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2712 member_vnf_index
, vdu_id
, vdu_index
2716 nslcmop_id
=nslcmop_id
,
2722 member_vnf_index
=member_vnf_index
,
2723 vdu_index
=vdu_index
,
2725 deploy_params
=deploy_params_vdu
,
2726 descriptor_config
=descriptor_config
,
2727 base_folder
=base_folder
,
2728 task_instantiation_info
=tasks_dict_info
,
2731 for kdud
in get_kdu_list(vnfd
):
2732 kdu_name
= kdud
["name"]
2733 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2734 if descriptor_config
:
2739 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2741 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2742 if kdur
.get("additionalParams"):
2743 deploy_params_kdu
.update(
2744 parse_yaml_strings(kdur
["additionalParams"].copy())
2748 logging_text
=logging_text
,
2751 nslcmop_id
=nslcmop_id
,
2757 member_vnf_index
=member_vnf_index
,
2758 vdu_index
=vdu_index
,
2760 deploy_params
=deploy_params_kdu
,
2761 descriptor_config
=descriptor_config
,
2762 base_folder
=base_folder
,
2763 task_instantiation_info
=tasks_dict_info
,
2767 # Check if this NS has a charm configuration
2768 descriptor_config
= nsd
.get("ns-configuration")
2769 if descriptor_config
and descriptor_config
.get("juju"):
2772 member_vnf_index
= None
2778 # Get additional parameters
2779 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2780 if db_nsr
.get("additionalParamsForNs"):
2781 deploy_params
.update(
2782 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2784 base_folder
= nsd
["_admin"]["storage"]
2786 logging_text
=logging_text
,
2789 nslcmop_id
=nslcmop_id
,
2795 member_vnf_index
=member_vnf_index
,
2796 vdu_index
=vdu_index
,
2798 deploy_params
=deploy_params
,
2799 descriptor_config
=descriptor_config
,
2800 base_folder
=base_folder
,
2801 task_instantiation_info
=tasks_dict_info
,
2805 # rest of staff will be done at finally
2808 ROclient
.ROClientException
,
2814 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2817 except asyncio
.CancelledError
:
2819 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2821 exc
= "Operation was cancelled"
2822 except Exception as e
:
2823 exc
= traceback
.format_exc()
2824 self
.logger
.critical(
2825 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2830 error_list
.append(str(exc
))
2832 # wait for pending tasks
2834 stage
[1] = "Waiting for instantiate pending tasks."
2835 self
.logger
.debug(logging_text
+ stage
[1])
2836 error_list
+= await self
._wait
_for
_tasks
(
2844 stage
[1] = stage
[2] = ""
2845 except asyncio
.CancelledError
:
2846 error_list
.append("Cancelled")
2847 # TODO cancel all tasks
2848 except Exception as exc
:
2849 error_list
.append(str(exc
))
2851 # update operation-status
2852 db_nsr_update
["operational-status"] = "running"
2853 # let's begin with VCA 'configured' status (later we can change it)
2854 db_nsr_update
["config-status"] = "configured"
2855 for task
, task_name
in tasks_dict_info
.items():
2856 if not task
.done() or task
.cancelled() or task
.exception():
2857 if task_name
.startswith(self
.task_name_deploy_vca
):
2858 # A N2VC task is pending
2859 db_nsr_update
["config-status"] = "failed"
2861 # RO or KDU task is pending
2862 db_nsr_update
["operational-status"] = "failed"
2864 # update status at database
2866 error_detail
= ". ".join(error_list
)
2867 self
.logger
.error(logging_text
+ error_detail
)
2868 error_description_nslcmop
= "{} Detail: {}".format(
2869 stage
[0], error_detail
2871 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2872 nslcmop_id
, stage
[0]
2875 db_nsr_update
["detailed-status"] = (
2876 error_description_nsr
+ " Detail: " + error_detail
2878 db_nslcmop_update
["detailed-status"] = error_detail
2879 nslcmop_operation_state
= "FAILED"
2883 error_description_nsr
= error_description_nslcmop
= None
2885 db_nsr_update
["detailed-status"] = "Done"
2886 db_nslcmop_update
["detailed-status"] = "Done"
2887 nslcmop_operation_state
= "COMPLETED"
2890 self
._write
_ns
_status
(
2893 current_operation
="IDLE",
2894 current_operation_id
=None,
2895 error_description
=error_description_nsr
,
2896 error_detail
=error_detail
,
2897 other_update
=db_nsr_update
,
2899 self
._write
_op
_status
(
2902 error_message
=error_description_nslcmop
,
2903 operation_state
=nslcmop_operation_state
,
2904 other_update
=db_nslcmop_update
,
2907 if nslcmop_operation_state
:
2909 await self
.msg
.aiowrite(
2914 "nslcmop_id": nslcmop_id
,
2915 "operationState": nslcmop_operation_state
,
2919 except Exception as e
:
2921 logging_text
+ "kafka_write notification Exception {}".format(e
)
2924 self
.logger
.debug(logging_text
+ "Exit")
2925 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2927 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2928 if vnfd_id
not in cached_vnfds
:
2929 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2930 return cached_vnfds
[vnfd_id
]
2932 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2933 if vnf_profile_id
not in cached_vnfrs
:
2934 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2937 "member-vnf-index-ref": vnf_profile_id
,
2938 "nsr-id-ref": nsr_id
,
2941 return cached_vnfrs
[vnf_profile_id
]
2943 def _is_deployed_vca_in_relation(
2944 self
, vca
: DeployedVCA
, relation
: Relation
2947 for endpoint
in (relation
.provider
, relation
.requirer
):
2948 if endpoint
["kdu-resource-profile-id"]:
2951 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2952 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2953 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2959 def _update_ee_relation_data_with_implicit_data(
2960 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2962 ee_relation_data
= safe_get_ee_relation(
2963 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2965 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2966 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2967 "execution-environment-ref"
2969 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2970 vnfd_id
= vnf_profile
["vnfd-id"]
2971 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2974 if ee_relation_level
== EELevel
.VNF
2975 else ee_relation_data
["vdu-profile-id"]
2977 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2980 f
"not execution environments found for ee_relation {ee_relation_data}"
2982 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2983 return ee_relation_data
2985 def _get_ns_relations(
2988 nsd
: Dict
[str, Any
],
2990 cached_vnfds
: Dict
[str, Any
],
2991 ) -> List
[Relation
]:
2993 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2994 for r
in db_ns_relations
:
2995 provider_dict
= None
2996 requirer_dict
= None
2997 if all(key
in r
for key
in ("provider", "requirer")):
2998 provider_dict
= r
["provider"]
2999 requirer_dict
= r
["requirer"]
3000 elif "entities" in r
:
3001 provider_id
= r
["entities"][0]["id"]
3004 "endpoint": r
["entities"][0]["endpoint"],
3006 if provider_id
!= nsd
["id"]:
3007 provider_dict
["vnf-profile-id"] = provider_id
3008 requirer_id
= r
["entities"][1]["id"]
3011 "endpoint": r
["entities"][1]["endpoint"],
3013 if requirer_id
!= nsd
["id"]:
3014 requirer_dict
["vnf-profile-id"] = requirer_id
3017 "provider/requirer or entities must be included in the relation."
3019 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3020 nsr_id
, nsd
, provider_dict
, cached_vnfds
3022 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3023 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3025 provider
= EERelation(relation_provider
)
3026 requirer
= EERelation(relation_requirer
)
3027 relation
= Relation(r
["name"], provider
, requirer
)
3028 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3030 relations
.append(relation
)
3033 def _get_vnf_relations(
3036 nsd
: Dict
[str, Any
],
3038 cached_vnfds
: Dict
[str, Any
],
3039 ) -> List
[Relation
]:
3041 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3042 vnf_profile_id
= vnf_profile
["id"]
3043 vnfd_id
= vnf_profile
["vnfd-id"]
3044 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3045 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3046 for r
in db_vnf_relations
:
3047 provider_dict
= None
3048 requirer_dict
= None
3049 if all(key
in r
for key
in ("provider", "requirer")):
3050 provider_dict
= r
["provider"]
3051 requirer_dict
= r
["requirer"]
3052 elif "entities" in r
:
3053 provider_id
= r
["entities"][0]["id"]
3056 "vnf-profile-id": vnf_profile_id
,
3057 "endpoint": r
["entities"][0]["endpoint"],
3059 if provider_id
!= vnfd_id
:
3060 provider_dict
["vdu-profile-id"] = provider_id
3061 requirer_id
= r
["entities"][1]["id"]
3064 "vnf-profile-id": vnf_profile_id
,
3065 "endpoint": r
["entities"][1]["endpoint"],
3067 if requirer_id
!= vnfd_id
:
3068 requirer_dict
["vdu-profile-id"] = requirer_id
3071 "provider/requirer or entities must be included in the relation."
3073 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3074 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3076 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3077 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3079 provider
= EERelation(relation_provider
)
3080 requirer
= EERelation(relation_requirer
)
3081 relation
= Relation(r
["name"], provider
, requirer
)
3082 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3084 relations
.append(relation
)
3087 def _get_kdu_resource_data(
3089 ee_relation
: EERelation
,
3090 db_nsr
: Dict
[str, Any
],
3091 cached_vnfds
: Dict
[str, Any
],
3092 ) -> DeployedK8sResource
:
3093 nsd
= get_nsd(db_nsr
)
3094 vnf_profiles
= get_vnf_profiles(nsd
)
3095 vnfd_id
= find_in_list(
3097 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3099 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3100 kdu_resource_profile
= get_kdu_resource_profile(
3101 db_vnfd
, ee_relation
.kdu_resource_profile_id
3103 kdu_name
= kdu_resource_profile
["kdu-name"]
3104 deployed_kdu
, _
= get_deployed_kdu(
3105 db_nsr
.get("_admin", ()).get("deployed", ()),
3107 ee_relation
.vnf_profile_id
,
3109 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3112 def _get_deployed_component(
3114 ee_relation
: EERelation
,
3115 db_nsr
: Dict
[str, Any
],
3116 cached_vnfds
: Dict
[str, Any
],
3117 ) -> DeployedComponent
:
3118 nsr_id
= db_nsr
["_id"]
3119 deployed_component
= None
3120 ee_level
= EELevel
.get_level(ee_relation
)
3121 if ee_level
== EELevel
.NS
:
3122 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3124 deployed_component
= DeployedVCA(nsr_id
, vca
)
3125 elif ee_level
== EELevel
.VNF
:
3126 vca
= get_deployed_vca(
3130 "member-vnf-index": ee_relation
.vnf_profile_id
,
3131 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3135 deployed_component
= DeployedVCA(nsr_id
, vca
)
3136 elif ee_level
== EELevel
.VDU
:
3137 vca
= get_deployed_vca(
3140 "vdu_id": ee_relation
.vdu_profile_id
,
3141 "member-vnf-index": ee_relation
.vnf_profile_id
,
3142 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3146 deployed_component
= DeployedVCA(nsr_id
, vca
)
3147 elif ee_level
== EELevel
.KDU
:
3148 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3149 ee_relation
, db_nsr
, cached_vnfds
3151 if kdu_resource_data
:
3152 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3153 return deployed_component
3155 async def _add_relation(
3159 db_nsr
: Dict
[str, Any
],
3160 cached_vnfds
: Dict
[str, Any
],
3161 cached_vnfrs
: Dict
[str, Any
],
3163 deployed_provider
= self
._get
_deployed
_component
(
3164 relation
.provider
, db_nsr
, cached_vnfds
3166 deployed_requirer
= self
._get
_deployed
_component
(
3167 relation
.requirer
, db_nsr
, cached_vnfds
3171 and deployed_requirer
3172 and deployed_provider
.config_sw_installed
3173 and deployed_requirer
.config_sw_installed
3175 provider_db_vnfr
= (
3177 relation
.provider
.nsr_id
,
3178 relation
.provider
.vnf_profile_id
,
3181 if relation
.provider
.vnf_profile_id
3184 requirer_db_vnfr
= (
3186 relation
.requirer
.nsr_id
,
3187 relation
.requirer
.vnf_profile_id
,
3190 if relation
.requirer
.vnf_profile_id
3193 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3194 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3195 provider_relation_endpoint
= RelationEndpoint(
3196 deployed_provider
.ee_id
,
3198 relation
.provider
.endpoint
,
3200 requirer_relation_endpoint
= RelationEndpoint(
3201 deployed_requirer
.ee_id
,
3203 relation
.requirer
.endpoint
,
3205 await self
.vca_map
[vca_type
].add_relation(
3206 provider
=provider_relation_endpoint
,
3207 requirer
=requirer_relation_endpoint
,
3209 # remove entry from relations list
3213 async def _add_vca_relations(
3219 timeout
: int = 3600,
3223 # 1. find all relations for this VCA
3224 # 2. wait for other peers related
3228 # STEP 1: find all relations for this VCA
3231 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3232 nsd
= get_nsd(db_nsr
)
3235 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3236 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3241 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3242 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3244 # if no relations, terminate
3246 self
.logger
.debug(logging_text
+ " No relations")
3249 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3256 if now
- start
>= timeout
:
3257 self
.logger
.error(logging_text
+ " : timeout adding relations")
3260 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3261 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3263 # for each relation, find the VCA's related
3264 for relation
in relations
.copy():
3265 added
= await self
._add
_relation
(
3273 relations
.remove(relation
)
3276 self
.logger
.debug("Relations added")
3278 await asyncio
.sleep(5.0)
3282 except Exception as e
:
3283 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3286 async def _install_kdu(
3294 k8s_instance_info
: dict,
3295 k8params
: dict = None,
3301 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3304 "collection": "nsrs",
3305 "filter": {"_id": nsr_id
},
3306 "path": nsr_db_path
,
3309 if k8s_instance_info
.get("kdu-deployment-name"):
3310 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3312 kdu_instance
= self
.k8scluster_map
[
3314 ].generate_kdu_instance_name(
3315 db_dict
=db_dict_install
,
3316 kdu_model
=k8s_instance_info
["kdu-model"],
3317 kdu_name
=k8s_instance_info
["kdu-name"],
3320 # Update the nsrs table with the kdu-instance value
3324 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3327 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3328 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3329 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3330 # namespace, this first verification could be removed, and the next step would be done for any kind
3332 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3333 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3334 if k8sclustertype
in ("juju", "juju-bundle"):
3335 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3336 # that the user passed a namespace which he wants its KDU to be deployed in)
3342 "_admin.projects_write": k8s_instance_info
["namespace"],
3343 "_admin.projects_read": k8s_instance_info
["namespace"],
3349 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3354 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3356 k8s_instance_info
["namespace"] = kdu_instance
3358 await self
.k8scluster_map
[k8sclustertype
].install(
3359 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3360 kdu_model
=k8s_instance_info
["kdu-model"],
3363 db_dict
=db_dict_install
,
3365 kdu_name
=k8s_instance_info
["kdu-name"],
3366 namespace
=k8s_instance_info
["namespace"],
3367 kdu_instance
=kdu_instance
,
3371 # Obtain services to obtain management service ip
3372 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3373 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3374 kdu_instance
=kdu_instance
,
3375 namespace
=k8s_instance_info
["namespace"],
3378 # Obtain management service info (if exists)
3379 vnfr_update_dict
= {}
3380 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3382 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3387 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3390 for service
in kdud
.get("service", [])
3391 if service
.get("mgmt-service")
3393 for mgmt_service
in mgmt_services
:
3394 for service
in services
:
3395 if service
["name"].startswith(mgmt_service
["name"]):
3396 # Mgmt service found, Obtain service ip
3397 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3398 if isinstance(ip
, list) and len(ip
) == 1:
3402 "kdur.{}.ip-address".format(kdu_index
)
3405 # Check if must update also mgmt ip at the vnf
3406 service_external_cp
= mgmt_service
.get(
3407 "external-connection-point-ref"
3409 if service_external_cp
:
3411 deep_get(vnfd
, ("mgmt-interface", "cp"))
3412 == service_external_cp
3414 vnfr_update_dict
["ip-address"] = ip
3419 "external-connection-point-ref", ""
3421 == service_external_cp
,
3424 "kdur.{}.ip-address".format(kdu_index
)
3429 "Mgmt service name: {} not found".format(
3430 mgmt_service
["name"]
3434 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3435 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3437 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3440 and kdu_config
.get("initial-config-primitive")
3441 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3443 initial_config_primitive_list
= kdu_config
.get(
3444 "initial-config-primitive"
3446 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3448 for initial_config_primitive
in initial_config_primitive_list
:
3449 primitive_params_
= self
._map
_primitive
_params
(
3450 initial_config_primitive
, {}, {}
3453 await asyncio
.wait_for(
3454 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3455 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3456 kdu_instance
=kdu_instance
,
3457 primitive_name
=initial_config_primitive
["name"],
3458 params
=primitive_params_
,
3459 db_dict
=db_dict_install
,
3465 except Exception as e
:
3466 # Prepare update db with error and raise exception
3469 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3473 vnfr_data
.get("_id"),
3474 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3477 # ignore to keep original exception
3479 # reraise original error
3484 async def deploy_kdus(
3491 task_instantiation_info
,
3493 # Launch kdus if present in the descriptor
3495 k8scluster_id_2_uuic
= {
3496 "helm-chart-v3": {},
3501 async def _get_cluster_id(cluster_id
, cluster_type
):
3502 nonlocal k8scluster_id_2_uuic
3503 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3504 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3506 # check if K8scluster is creating and wait look if previous tasks in process
3507 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3508 "k8scluster", cluster_id
3511 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3512 task_name
, cluster_id
3514 self
.logger
.debug(logging_text
+ text
)
3515 await asyncio
.wait(task_dependency
, timeout
=3600)
3517 db_k8scluster
= self
.db
.get_one(
3518 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3520 if not db_k8scluster
:
3521 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3523 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3525 if cluster_type
== "helm-chart-v3":
3527 # backward compatibility for existing clusters that have not been initialized for helm v3
3528 k8s_credentials
= yaml
.safe_dump(
3529 db_k8scluster
.get("credentials")
3531 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3532 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3534 db_k8scluster_update
= {}
3535 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3536 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3537 db_k8scluster_update
[
3538 "_admin.helm-chart-v3.created"
3540 db_k8scluster_update
[
3541 "_admin.helm-chart-v3.operationalState"
3544 "k8sclusters", cluster_id
, db_k8scluster_update
3546 except Exception as e
:
3549 + "error initializing helm-v3 cluster: {}".format(str(e
))
3552 "K8s cluster '{}' has not been initialized for '{}'".format(
3553 cluster_id
, cluster_type
3558 "K8s cluster '{}' has not been initialized for '{}'".format(
3559 cluster_id
, cluster_type
3562 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3565 logging_text
+= "Deploy kdus: "
3568 db_nsr_update
= {"_admin.deployed.K8s": []}
3569 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3572 updated_cluster_list
= []
3573 updated_v3_cluster_list
= []
3575 for vnfr_data
in db_vnfrs
.values():
3576 vca_id
= self
.get_vca_id(vnfr_data
, {})
3577 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3578 # Step 0: Prepare and set parameters
3579 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3580 vnfd_id
= vnfr_data
.get("vnfd-id")
3581 vnfd_with_id
= find_in_list(
3582 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3586 for kdud
in vnfd_with_id
["kdu"]
3587 if kdud
["name"] == kdur
["kdu-name"]
3589 namespace
= kdur
.get("k8s-namespace")
3590 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3591 if kdur
.get("helm-chart"):
3592 kdumodel
= kdur
["helm-chart"]
3593 # Default version: helm3, if helm-version is v2 assign v2
3594 k8sclustertype
= "helm-chart-v3"
3595 self
.logger
.debug("kdur: {}".format(kdur
))
3597 kdur
.get("helm-version")
3598 and kdur
.get("helm-version") == "v2"
3600 k8sclustertype
= "helm-chart"
3601 elif kdur
.get("juju-bundle"):
3602 kdumodel
= kdur
["juju-bundle"]
3603 k8sclustertype
= "juju-bundle"
3606 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3607 "juju-bundle. Maybe an old NBI version is running".format(
3608 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3611 # check if kdumodel is a file and exists
3613 vnfd_with_id
= find_in_list(
3614 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3616 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3617 if storage
: # may be not present if vnfd has not artifacts
3618 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3619 if storage
["pkg-dir"]:
3620 filename
= "{}/{}/{}s/{}".format(
3627 filename
= "{}/Scripts/{}s/{}".format(
3632 if self
.fs
.file_exists(
3633 filename
, mode
="file"
3634 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3635 kdumodel
= self
.fs
.path
+ filename
3636 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3638 except Exception: # it is not a file
3641 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3642 step
= "Synchronize repos for k8s cluster '{}'".format(
3645 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3649 k8sclustertype
== "helm-chart"
3650 and cluster_uuid
not in updated_cluster_list
3652 k8sclustertype
== "helm-chart-v3"
3653 and cluster_uuid
not in updated_v3_cluster_list
3655 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3656 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3657 cluster_uuid
=cluster_uuid
3660 if del_repo_list
or added_repo_dict
:
3661 if k8sclustertype
== "helm-chart":
3663 "_admin.helm_charts_added." + item
: None
3664 for item
in del_repo_list
3667 "_admin.helm_charts_added." + item
: name
3668 for item
, name
in added_repo_dict
.items()
3670 updated_cluster_list
.append(cluster_uuid
)
3671 elif k8sclustertype
== "helm-chart-v3":
3673 "_admin.helm_charts_v3_added." + item
: None
3674 for item
in del_repo_list
3677 "_admin.helm_charts_v3_added." + item
: name
3678 for item
, name
in added_repo_dict
.items()
3680 updated_v3_cluster_list
.append(cluster_uuid
)
3682 logging_text
+ "repos synchronized on k8s cluster "
3683 "'{}' to_delete: {}, to_add: {}".format(
3684 k8s_cluster_id
, del_repo_list
, added_repo_dict
3689 {"_id": k8s_cluster_id
},
3695 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3696 vnfr_data
["member-vnf-index-ref"],
3700 k8s_instance_info
= {
3701 "kdu-instance": None,
3702 "k8scluster-uuid": cluster_uuid
,
3703 "k8scluster-type": k8sclustertype
,
3704 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3705 "kdu-name": kdur
["kdu-name"],
3706 "kdu-model": kdumodel
,
3707 "namespace": namespace
,
3708 "kdu-deployment-name": kdu_deployment_name
,
3710 db_path
= "_admin.deployed.K8s.{}".format(index
)
3711 db_nsr_update
[db_path
] = k8s_instance_info
3712 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3713 vnfd_with_id
= find_in_list(
3714 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3716 task
= asyncio
.ensure_future(
3725 k8params
=desc_params
,
3730 self
.lcm_tasks
.register(
3734 "instantiate_KDU-{}".format(index
),
3737 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3743 except (LcmException
, asyncio
.CancelledError
):
3745 except Exception as e
:
3746 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3747 if isinstance(e
, (N2VCException
, DbException
)):
3748 self
.logger
.error(logging_text
+ msg
)
3750 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3751 raise LcmException(msg
)
3754 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3773 task_instantiation_info
,
3776 # launch instantiate_N2VC in a asyncio task and register task object
3777 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3778 # if not found, create one entry and update database
3779 # fill db_nsr._admin.deployed.VCA.<index>
3782 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3786 get_charm_name
= False
3787 if "execution-environment-list" in descriptor_config
:
3788 ee_list
= descriptor_config
.get("execution-environment-list", [])
3789 elif "juju" in descriptor_config
:
3790 ee_list
= [descriptor_config
] # ns charms
3791 if "execution-environment-list" not in descriptor_config
:
3792 # charm name is only required for ns charms
3793 get_charm_name
= True
3794 else: # other types as script are not supported
3797 for ee_item
in ee_list
:
3800 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3801 ee_item
.get("juju"), ee_item
.get("helm-chart")
3804 ee_descriptor_id
= ee_item
.get("id")
3805 if ee_item
.get("juju"):
3806 vca_name
= ee_item
["juju"].get("charm")
3808 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3811 if ee_item
["juju"].get("charm") is not None
3814 if ee_item
["juju"].get("cloud") == "k8s":
3815 vca_type
= "k8s_proxy_charm"
3816 elif ee_item
["juju"].get("proxy") is False:
3817 vca_type
= "native_charm"
3818 elif ee_item
.get("helm-chart"):
3819 vca_name
= ee_item
["helm-chart"]
3820 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3823 vca_type
= "helm-v3"
3826 logging_text
+ "skipping non juju neither charm configuration"
3831 for vca_index
, vca_deployed
in enumerate(
3832 db_nsr
["_admin"]["deployed"]["VCA"]
3834 if not vca_deployed
:
3837 vca_deployed
.get("member-vnf-index") == member_vnf_index
3838 and vca_deployed
.get("vdu_id") == vdu_id
3839 and vca_deployed
.get("kdu_name") == kdu_name
3840 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3841 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3845 # not found, create one.
3847 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3850 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3852 target
+= "/kdu/{}".format(kdu_name
)
3854 "target_element": target
,
3855 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3856 "member-vnf-index": member_vnf_index
,
3858 "kdu_name": kdu_name
,
3859 "vdu_count_index": vdu_index
,
3860 "operational-status": "init", # TODO revise
3861 "detailed-status": "", # TODO revise
3862 "step": "initial-deploy", # TODO revise
3864 "vdu_name": vdu_name
,
3866 "ee_descriptor_id": ee_descriptor_id
,
3867 "charm_name": charm_name
,
3871 # create VCA and configurationStatus in db
3873 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3874 "configurationStatus.{}".format(vca_index
): dict(),
3876 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3878 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3880 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3881 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3882 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3885 task_n2vc
= asyncio
.ensure_future(
3886 self
.instantiate_N2VC(
3887 logging_text
=logging_text
,
3888 vca_index
=vca_index
,
3894 vdu_index
=vdu_index
,
3895 deploy_params
=deploy_params
,
3896 config_descriptor
=descriptor_config
,
3897 base_folder
=base_folder
,
3898 nslcmop_id
=nslcmop_id
,
3902 ee_config_descriptor
=ee_item
,
3905 self
.lcm_tasks
.register(
3909 "instantiate_N2VC-{}".format(vca_index
),
3912 task_instantiation_info
[
3914 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3915 member_vnf_index
or "", vdu_id
or ""
3919 def _create_nslcmop(nsr_id
, operation
, params
):
3921 Creates a ns-lcm-opp content to be stored at database.
3922 :param nsr_id: internal id of the instance
3923 :param operation: instantiate, terminate, scale, action, ...
3924 :param params: user parameters for the operation
3925 :return: dictionary following SOL005 format
3927 # Raise exception if invalid arguments
3928 if not (nsr_id
and operation
and params
):
3930 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3937 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3938 "operationState": "PROCESSING",
3939 "statusEnteredTime": now
,
3940 "nsInstanceId": nsr_id
,
3941 "lcmOperationType": operation
,
3943 "isAutomaticInvocation": False,
3944 "operationParams": params
,
3945 "isCancelPending": False,
3947 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3948 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3953 def _format_additional_params(self
, params
):
3954 params
= params
or {}
3955 for key
, value
in params
.items():
3956 if str(value
).startswith("!!yaml "):
3957 params
[key
] = yaml
.safe_load(value
[7:])
3960 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3961 primitive
= seq
.get("name")
3962 primitive_params
= {}
3964 "member_vnf_index": vnf_index
,
3965 "primitive": primitive
,
3966 "primitive_params": primitive_params
,
3969 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3973 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3974 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3975 if op
.get("operationState") == "COMPLETED":
3976 # b. Skip sub-operation
3977 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3978 return self
.SUBOPERATION_STATUS_SKIP
3980 # c. retry executing sub-operation
3981 # The sub-operation exists, and operationState != 'COMPLETED'
3982 # Update operationState = 'PROCESSING' to indicate a retry.
3983 operationState
= "PROCESSING"
3984 detailed_status
= "In progress"
3985 self
._update
_suboperation
_status
(
3986 db_nslcmop
, op_index
, operationState
, detailed_status
3988 # Return the sub-operation index
3989 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3990 # with arguments extracted from the sub-operation
3993 # Find a sub-operation where all keys in a matching dictionary must match
3994 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3995 def _find_suboperation(self
, db_nslcmop
, match
):
3996 if db_nslcmop
and match
:
3997 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3998 for i
, op
in enumerate(op_list
):
3999 if all(op
.get(k
) == match
[k
] for k
in match
):
4001 return self
.SUBOPERATION_STATUS_NOT_FOUND
4003 # Update status for a sub-operation given its index
4004 def _update_suboperation_status(
4005 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4007 # Update DB for HA tasks
4008 q_filter
= {"_id": db_nslcmop
["_id"]}
4010 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4011 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4014 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4017 # Add sub-operation, return the index of the added sub-operation
4018 # Optionally, set operationState, detailed-status, and operationType
4019 # Status and type are currently set for 'scale' sub-operations:
4020 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4021 # 'detailed-status' : status message
4022 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4023 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4024 def _add_suboperation(
4032 mapped_primitive_params
,
4033 operationState
=None,
4034 detailed_status
=None,
4037 RO_scaling_info
=None,
4040 return self
.SUBOPERATION_STATUS_NOT_FOUND
4041 # Get the "_admin.operations" list, if it exists
4042 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4043 op_list
= db_nslcmop_admin
.get("operations")
4044 # Create or append to the "_admin.operations" list
4046 "member_vnf_index": vnf_index
,
4048 "vdu_count_index": vdu_count_index
,
4049 "primitive": primitive
,
4050 "primitive_params": mapped_primitive_params
,
4053 new_op
["operationState"] = operationState
4055 new_op
["detailed-status"] = detailed_status
4057 new_op
["lcmOperationType"] = operationType
4059 new_op
["RO_nsr_id"] = RO_nsr_id
4061 new_op
["RO_scaling_info"] = RO_scaling_info
4063 # No existing operations, create key 'operations' with current operation as first list element
4064 db_nslcmop_admin
.update({"operations": [new_op
]})
4065 op_list
= db_nslcmop_admin
.get("operations")
4067 # Existing operations, append operation to list
4068 op_list
.append(new_op
)
4070 db_nslcmop_update
= {"_admin.operations": op_list
}
4071 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4072 op_index
= len(op_list
) - 1
4075 # Helper methods for scale() sub-operations
4077 # pre-scale/post-scale:
4078 # Check for 3 different cases:
4079 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4080 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4081 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4082 def _check_or_add_scale_suboperation(
4086 vnf_config_primitive
,
4090 RO_scaling_info
=None,
4092 # Find this sub-operation
4093 if RO_nsr_id
and RO_scaling_info
:
4094 operationType
= "SCALE-RO"
4096 "member_vnf_index": vnf_index
,
4097 "RO_nsr_id": RO_nsr_id
,
4098 "RO_scaling_info": RO_scaling_info
,
4102 "member_vnf_index": vnf_index
,
4103 "primitive": vnf_config_primitive
,
4104 "primitive_params": primitive_params
,
4105 "lcmOperationType": operationType
,
4107 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4108 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4109 # a. New sub-operation
4110 # The sub-operation does not exist, add it.
4111 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4112 # The following parameters are set to None for all kind of scaling:
4114 vdu_count_index
= None
4116 if RO_nsr_id
and RO_scaling_info
:
4117 vnf_config_primitive
= None
4118 primitive_params
= None
4121 RO_scaling_info
= None
4122 # Initial status for sub-operation
4123 operationState
= "PROCESSING"
4124 detailed_status
= "In progress"
4125 # Add sub-operation for pre/post-scaling (zero or more operations)
4126 self
._add
_suboperation
(
4132 vnf_config_primitive
,
4140 return self
.SUBOPERATION_STATUS_NEW
4142 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4143 # or op_index (operationState != 'COMPLETED')
4144 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4146 # Function to return execution_environment id
4148 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4149 # TODO vdu_index_count
4150 for vca
in vca_deployed_list
:
4151 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4154 async def destroy_N2VC(
4162 exec_primitives
=True,
4167 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4168 :param logging_text:
4170 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4171 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4172 :param vca_index: index in the database _admin.deployed.VCA
4173 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4174 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4175 not executed properly
4176 :param scaling_in: True destroys the application, False destroys the model
4177 :return: None or exception
4182 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4183 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4187 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4189 # execute terminate_primitives
4191 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4192 config_descriptor
.get("terminate-config-primitive"),
4193 vca_deployed
.get("ee_descriptor_id"),
4195 vdu_id
= vca_deployed
.get("vdu_id")
4196 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4197 vdu_name
= vca_deployed
.get("vdu_name")
4198 vnf_index
= vca_deployed
.get("member-vnf-index")
4199 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4200 for seq
in terminate_primitives
:
4201 # For each sequence in list, get primitive and call _ns_execute_primitive()
4202 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4203 vnf_index
, seq
.get("name")
4205 self
.logger
.debug(logging_text
+ step
)
4206 # Create the primitive for each sequence, i.e. "primitive": "touch"
4207 primitive
= seq
.get("name")
4208 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4213 self
._add
_suboperation
(
4220 mapped_primitive_params
,
4222 # Sub-operations: Call _ns_execute_primitive() instead of action()
4224 result
, result_detail
= await self
._ns
_execute
_primitive
(
4225 vca_deployed
["ee_id"],
4227 mapped_primitive_params
,
4231 except LcmException
:
4232 # this happens when VCA is not deployed. In this case it is not needed to terminate
4234 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4235 if result
not in result_ok
:
4237 "terminate_primitive {} for vnf_member_index={} fails with "
4238 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4240 # set that this VCA do not need terminated
4241 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4245 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4248 # Delete Prometheus Jobs if any
4249 # This uses NSR_ID, so it will destroy any jobs under this index
4250 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4253 await self
.vca_map
[vca_type
].delete_execution_environment(
4254 vca_deployed
["ee_id"],
4255 scaling_in
=scaling_in
,
4260 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4261 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4262 namespace
= "." + db_nsr
["_id"]
4264 await self
.n2vc
.delete_namespace(
4265 namespace
=namespace
,
4266 total_timeout
=self
.timeout_charm_delete
,
4269 except N2VCNotFound
: # already deleted. Skip
4271 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4273 async def _terminate_RO(
4274 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4277 Terminates a deployment from RO
4278 :param logging_text:
4279 :param nsr_deployed: db_nsr._admin.deployed
4282 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4283 this method will update only the index 2, but it will write on database the concatenated content of the list
4288 ro_nsr_id
= ro_delete_action
= None
4289 if nsr_deployed
and nsr_deployed
.get("RO"):
4290 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4291 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4294 stage
[2] = "Deleting ns from VIM."
4295 db_nsr_update
["detailed-status"] = " ".join(stage
)
4296 self
._write
_op
_status
(nslcmop_id
, stage
)
4297 self
.logger
.debug(logging_text
+ stage
[2])
4298 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4299 self
._write
_op
_status
(nslcmop_id
, stage
)
4300 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4301 ro_delete_action
= desc
["action_id"]
4303 "_admin.deployed.RO.nsr_delete_action_id"
4304 ] = ro_delete_action
4305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4307 if ro_delete_action
:
4308 # wait until NS is deleted from VIM
4309 stage
[2] = "Waiting ns deleted from VIM."
4310 detailed_status_old
= None
4314 + " RO_id={} ro_delete_action={}".format(
4315 ro_nsr_id
, ro_delete_action
4318 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4319 self
._write
_op
_status
(nslcmop_id
, stage
)
4321 delete_timeout
= 20 * 60 # 20 minutes
4322 while delete_timeout
> 0:
4323 desc
= await self
.RO
.show(
4325 item_id_name
=ro_nsr_id
,
4326 extra_item
="action",
4327 extra_item_id
=ro_delete_action
,
4331 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4333 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4334 if ns_status
== "ERROR":
4335 raise ROclient
.ROClientException(ns_status_info
)
4336 elif ns_status
== "BUILD":
4337 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4338 elif ns_status
== "ACTIVE":
4339 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4340 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4345 ), "ROclient.check_action_status returns unknown {}".format(
4348 if stage
[2] != detailed_status_old
:
4349 detailed_status_old
= stage
[2]
4350 db_nsr_update
["detailed-status"] = " ".join(stage
)
4351 self
._write
_op
_status
(nslcmop_id
, stage
)
4352 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4353 await asyncio
.sleep(5, loop
=self
.loop
)
4355 else: # delete_timeout <= 0:
4356 raise ROclient
.ROClientException(
4357 "Timeout waiting ns deleted from VIM"
4360 except Exception as e
:
4361 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4363 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4365 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4366 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4367 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4369 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4372 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4374 failed_detail
.append("delete conflict: {}".format(e
))
4377 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4380 failed_detail
.append("delete error: {}".format(e
))
4382 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4386 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4387 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4389 stage
[2] = "Deleting nsd from RO."
4390 db_nsr_update
["detailed-status"] = " ".join(stage
)
4391 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4392 self
._write
_op
_status
(nslcmop_id
, stage
)
4393 await self
.RO
.delete("nsd", ro_nsd_id
)
4395 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4397 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4398 except Exception as e
:
4400 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4402 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4404 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4407 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4409 failed_detail
.append(
4410 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4412 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4414 failed_detail
.append(
4415 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4417 self
.logger
.error(logging_text
+ failed_detail
[-1])
4419 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4420 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4421 if not vnf_deployed
or not vnf_deployed
["id"]:
4424 ro_vnfd_id
= vnf_deployed
["id"]
4427 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4428 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4430 db_nsr_update
["detailed-status"] = " ".join(stage
)
4431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4432 self
._write
_op
_status
(nslcmop_id
, stage
)
4433 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4435 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4437 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4438 except Exception as e
:
4440 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4443 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4447 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4450 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4452 failed_detail
.append(
4453 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4455 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4457 failed_detail
.append(
4458 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4460 self
.logger
.error(logging_text
+ failed_detail
[-1])
4463 stage
[2] = "Error deleting from VIM"
4465 stage
[2] = "Deleted from VIM"
4466 db_nsr_update
["detailed-status"] = " ".join(stage
)
4467 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4468 self
._write
_op
_status
(nslcmop_id
, stage
)
4471 raise LcmException("; ".join(failed_detail
))
4473 async def terminate(self
, nsr_id
, nslcmop_id
):
4474 # Try to lock HA task here
4475 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4476 if not task_is_locked_by_me
:
4479 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4480 self
.logger
.debug(logging_text
+ "Enter")
4481 timeout_ns_terminate
= self
.timeout_ns_terminate
4484 operation_params
= None
4486 error_list
= [] # annotates all failed error messages
4487 db_nslcmop_update
= {}
4488 autoremove
= False # autoremove after terminated
4489 tasks_dict_info
= {}
4492 "Stage 1/3: Preparing task.",
4493 "Waiting for previous operations to terminate.",
4496 # ^ contains [stage, step, VIM-status]
4498 # wait for any previous tasks in process
4499 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4501 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4502 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4503 operation_params
= db_nslcmop
.get("operationParams") or {}
4504 if operation_params
.get("timeout_ns_terminate"):
4505 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4506 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4507 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4509 db_nsr_update
["operational-status"] = "terminating"
4510 db_nsr_update
["config-status"] = "terminating"
4511 self
._write
_ns
_status
(
4513 ns_state
="TERMINATING",
4514 current_operation
="TERMINATING",
4515 current_operation_id
=nslcmop_id
,
4516 other_update
=db_nsr_update
,
4518 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4519 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4520 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4523 stage
[1] = "Getting vnf descriptors from db."
4524 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4526 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4528 db_vnfds_from_id
= {}
4529 db_vnfds_from_member_index
= {}
4531 for vnfr
in db_vnfrs_list
:
4532 vnfd_id
= vnfr
["vnfd-id"]
4533 if vnfd_id
not in db_vnfds_from_id
:
4534 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4535 db_vnfds_from_id
[vnfd_id
] = vnfd
4536 db_vnfds_from_member_index
[
4537 vnfr
["member-vnf-index-ref"]
4538 ] = db_vnfds_from_id
[vnfd_id
]
4540 # Destroy individual execution environments when there are terminating primitives.
4541 # Rest of EE will be deleted at once
4542 # TODO - check before calling _destroy_N2VC
4543 # if not operation_params.get("skip_terminate_primitives"):#
4544 # or not vca.get("needed_terminate"):
4545 stage
[0] = "Stage 2/3 execute terminating primitives."
4546 self
.logger
.debug(logging_text
+ stage
[0])
4547 stage
[1] = "Looking execution environment that needs terminate."
4548 self
.logger
.debug(logging_text
+ stage
[1])
4550 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4551 config_descriptor
= None
4552 vca_member_vnf_index
= vca
.get("member-vnf-index")
4553 vca_id
= self
.get_vca_id(
4554 db_vnfrs_dict
.get(vca_member_vnf_index
)
4555 if vca_member_vnf_index
4559 if not vca
or not vca
.get("ee_id"):
4561 if not vca
.get("member-vnf-index"):
4563 config_descriptor
= db_nsr
.get("ns-configuration")
4564 elif vca
.get("vdu_id"):
4565 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4566 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4567 elif vca
.get("kdu_name"):
4568 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4569 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4571 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4572 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4573 vca_type
= vca
.get("type")
4574 exec_terminate_primitives
= not operation_params
.get(
4575 "skip_terminate_primitives"
4576 ) and vca
.get("needed_terminate")
4577 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4578 # pending native charms
4580 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4582 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4583 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4584 task
= asyncio
.ensure_future(
4592 exec_terminate_primitives
,
4596 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4598 # wait for pending tasks of terminate primitives
4602 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4604 error_list
= await self
._wait
_for
_tasks
(
4607 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4611 tasks_dict_info
.clear()
4613 return # raise LcmException("; ".join(error_list))
4615 # remove All execution environments at once
4616 stage
[0] = "Stage 3/3 delete all."
4618 if nsr_deployed
.get("VCA"):
4619 stage
[1] = "Deleting all execution environments."
4620 self
.logger
.debug(logging_text
+ stage
[1])
4621 vca_id
= self
.get_vca_id({}, db_nsr
)
4622 task_delete_ee
= asyncio
.ensure_future(
4624 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4625 timeout
=self
.timeout_charm_delete
,
4628 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4629 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4631 # Delete from k8scluster
4632 stage
[1] = "Deleting KDUs."
4633 self
.logger
.debug(logging_text
+ stage
[1])
4634 # print(nsr_deployed)
4635 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4636 if not kdu
or not kdu
.get("kdu-instance"):
4638 kdu_instance
= kdu
.get("kdu-instance")
4639 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4640 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4641 vca_id
= self
.get_vca_id({}, db_nsr
)
4642 task_delete_kdu_instance
= asyncio
.ensure_future(
4643 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4644 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4645 kdu_instance
=kdu_instance
,
4647 namespace
=kdu
.get("namespace"),
4653 + "Unknown k8s deployment type {}".format(
4654 kdu
.get("k8scluster-type")
4659 task_delete_kdu_instance
4660 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4663 stage
[1] = "Deleting ns from VIM."
4665 task_delete_ro
= asyncio
.ensure_future(
4666 self
._terminate
_ng
_ro
(
4667 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4671 task_delete_ro
= asyncio
.ensure_future(
4673 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4676 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4678 # rest of staff will be done at finally
4681 ROclient
.ROClientException
,
4686 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4688 except asyncio
.CancelledError
:
4690 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4692 exc
= "Operation was cancelled"
4693 except Exception as e
:
4694 exc
= traceback
.format_exc()
4695 self
.logger
.critical(
4696 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4701 error_list
.append(str(exc
))
4703 # wait for pending tasks
4705 stage
[1] = "Waiting for terminate pending tasks."
4706 self
.logger
.debug(logging_text
+ stage
[1])
4707 error_list
+= await self
._wait
_for
_tasks
(
4710 timeout_ns_terminate
,
4714 stage
[1] = stage
[2] = ""
4715 except asyncio
.CancelledError
:
4716 error_list
.append("Cancelled")
4717 # TODO cancell all tasks
4718 except Exception as exc
:
4719 error_list
.append(str(exc
))
4720 # update status at database
4722 error_detail
= "; ".join(error_list
)
4723 # self.logger.error(logging_text + error_detail)
4724 error_description_nslcmop
= "{} Detail: {}".format(
4725 stage
[0], error_detail
4727 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4728 nslcmop_id
, stage
[0]
4731 db_nsr_update
["operational-status"] = "failed"
4732 db_nsr_update
["detailed-status"] = (
4733 error_description_nsr
+ " Detail: " + error_detail
4735 db_nslcmop_update
["detailed-status"] = error_detail
4736 nslcmop_operation_state
= "FAILED"
4740 error_description_nsr
= error_description_nslcmop
= None
4741 ns_state
= "NOT_INSTANTIATED"
4742 db_nsr_update
["operational-status"] = "terminated"
4743 db_nsr_update
["detailed-status"] = "Done"
4744 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4745 db_nslcmop_update
["detailed-status"] = "Done"
4746 nslcmop_operation_state
= "COMPLETED"
4749 self
._write
_ns
_status
(
4752 current_operation
="IDLE",
4753 current_operation_id
=None,
4754 error_description
=error_description_nsr
,
4755 error_detail
=error_detail
,
4756 other_update
=db_nsr_update
,
4758 self
._write
_op
_status
(
4761 error_message
=error_description_nslcmop
,
4762 operation_state
=nslcmop_operation_state
,
4763 other_update
=db_nslcmop_update
,
4765 if ns_state
== "NOT_INSTANTIATED":
4769 {"nsr-id-ref": nsr_id
},
4770 {"_admin.nsState": "NOT_INSTANTIATED"},
4772 except DbException
as e
:
4775 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4779 if operation_params
:
4780 autoremove
= operation_params
.get("autoremove", False)
4781 if nslcmop_operation_state
:
4783 await self
.msg
.aiowrite(
4788 "nslcmop_id": nslcmop_id
,
4789 "operationState": nslcmop_operation_state
,
4790 "autoremove": autoremove
,
4794 except Exception as e
:
4796 logging_text
+ "kafka_write notification Exception {}".format(e
)
4799 self
.logger
.debug(logging_text
+ "Exit")
4800 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4802 async def _wait_for_tasks(
4803 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4806 error_detail_list
= []
4808 pending_tasks
= list(created_tasks_info
.keys())
4809 num_tasks
= len(pending_tasks
)
4811 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4812 self
._write
_op
_status
(nslcmop_id
, stage
)
4813 while pending_tasks
:
4815 _timeout
= timeout
+ time_start
- time()
4816 done
, pending_tasks
= await asyncio
.wait(
4817 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4819 num_done
+= len(done
)
4820 if not done
: # Timeout
4821 for task
in pending_tasks
:
4822 new_error
= created_tasks_info
[task
] + ": Timeout"
4823 error_detail_list
.append(new_error
)
4824 error_list
.append(new_error
)
4827 if task
.cancelled():
4830 exc
= task
.exception()
4832 if isinstance(exc
, asyncio
.TimeoutError
):
4834 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4835 error_list
.append(created_tasks_info
[task
])
4836 error_detail_list
.append(new_error
)
4843 ROclient
.ROClientException
,
4849 self
.logger
.error(logging_text
+ new_error
)
4851 exc_traceback
= "".join(
4852 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4856 + created_tasks_info
[task
]
4862 logging_text
+ created_tasks_info
[task
] + ": Done"
4864 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4866 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4867 if nsr_id
: # update also nsr
4872 "errorDescription": "Error at: " + ", ".join(error_list
),
4873 "errorDetail": ". ".join(error_detail_list
),
4876 self
._write
_op
_status
(nslcmop_id
, stage
)
4877 return error_detail_list
4880 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4882 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4883 The default-value is used. If it is between < > it look for a value at instantiation_params
4884 :param primitive_desc: portion of VNFD/NSD that describes primitive
4885 :param params: Params provided by user
4886 :param instantiation_params: Instantiation params provided by user
4887 :return: a dictionary with the calculated params
4889 calculated_params
= {}
4890 for parameter
in primitive_desc
.get("parameter", ()):
4891 param_name
= parameter
["name"]
4892 if param_name
in params
:
4893 calculated_params
[param_name
] = params
[param_name
]
4894 elif "default-value" in parameter
or "value" in parameter
:
4895 if "value" in parameter
:
4896 calculated_params
[param_name
] = parameter
["value"]
4898 calculated_params
[param_name
] = parameter
["default-value"]
4900 isinstance(calculated_params
[param_name
], str)
4901 and calculated_params
[param_name
].startswith("<")
4902 and calculated_params
[param_name
].endswith(">")
4904 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4905 calculated_params
[param_name
] = instantiation_params
[
4906 calculated_params
[param_name
][1:-1]
4910 "Parameter {} needed to execute primitive {} not provided".format(
4911 calculated_params
[param_name
], primitive_desc
["name"]
4916 "Parameter {} needed to execute primitive {} not provided".format(
4917 param_name
, primitive_desc
["name"]
4921 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4922 calculated_params
[param_name
] = yaml
.safe_dump(
4923 calculated_params
[param_name
], default_flow_style
=True, width
=256
4925 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4927 ].startswith("!!yaml "):
4928 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4929 if parameter
.get("data-type") == "INTEGER":
4931 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4932 except ValueError: # error converting string to int
4934 "Parameter {} of primitive {} must be integer".format(
4935 param_name
, primitive_desc
["name"]
4938 elif parameter
.get("data-type") == "BOOLEAN":
4939 calculated_params
[param_name
] = not (
4940 (str(calculated_params
[param_name
])).lower() == "false"
4943 # add always ns_config_info if primitive name is config
4944 if primitive_desc
["name"] == "config":
4945 if "ns_config_info" in instantiation_params
:
4946 calculated_params
["ns_config_info"] = instantiation_params
[
4949 return calculated_params
4951 def _look_for_deployed_vca(
4958 ee_descriptor_id
=None,
4960 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4961 for vca
in deployed_vca
:
4964 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4967 vdu_count_index
is not None
4968 and vdu_count_index
!= vca
["vdu_count_index"]
4971 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4973 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4977 # vca_deployed not found
4979 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4980 " is not deployed".format(
4989 ee_id
= vca
.get("ee_id")
4991 "type", "lxc_proxy_charm"
4992 ) # default value for backward compatibility - proxy charm
4995 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4996 "execution environment".format(
4997 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5000 return ee_id
, vca_type
5002 async def _ns_execute_primitive(
5008 retries_interval
=30,
5015 if primitive
== "config":
5016 primitive_params
= {"params": primitive_params
}
5018 vca_type
= vca_type
or "lxc_proxy_charm"
5022 output
= await asyncio
.wait_for(
5023 self
.vca_map
[vca_type
].exec_primitive(
5025 primitive_name
=primitive
,
5026 params_dict
=primitive_params
,
5027 progress_timeout
=self
.timeout_progress_primitive
,
5028 total_timeout
=self
.timeout_primitive
,
5033 timeout
=timeout
or self
.timeout_primitive
,
5037 except asyncio
.CancelledError
:
5039 except Exception as e
:
5043 "Error executing action {} on {} -> {}".format(
5048 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5050 if isinstance(e
, asyncio
.TimeoutError
):
5052 message
="Timed out waiting for action to complete"
5054 return "FAILED", getattr(e
, "message", repr(e
))
5056 return "COMPLETED", output
5058 except (LcmException
, asyncio
.CancelledError
):
5060 except Exception as e
:
5061 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5063 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5065 Updating the vca_status with latest juju information in nsrs record
5066 :param: nsr_id: Id of the nsr
5067 :param: nslcmop_id: Id of the nslcmop
5071 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5072 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5073 vca_id
= self
.get_vca_id({}, db_nsr
)
5074 if db_nsr
["_admin"]["deployed"]["K8s"]:
5075 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5076 cluster_uuid
, kdu_instance
, cluster_type
= (
5077 k8s
["k8scluster-uuid"],
5078 k8s
["kdu-instance"],
5079 k8s
["k8scluster-type"],
5081 await self
._on
_update
_k
8s
_db
(
5082 cluster_uuid
=cluster_uuid
,
5083 kdu_instance
=kdu_instance
,
5084 filter={"_id": nsr_id
},
5086 cluster_type
=cluster_type
,
5089 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5090 table
, filter = "nsrs", {"_id": nsr_id
}
5091 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5092 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5094 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5095 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5097 async def action(self
, nsr_id
, nslcmop_id
):
5098 # Try to lock HA task here
5099 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5100 if not task_is_locked_by_me
:
5103 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5104 self
.logger
.debug(logging_text
+ "Enter")
5105 # get all needed from database
5109 db_nslcmop_update
= {}
5110 nslcmop_operation_state
= None
5111 error_description_nslcmop
= None
5114 # wait for any previous tasks in process
5115 step
= "Waiting for previous operations to terminate"
5116 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5118 self
._write
_ns
_status
(
5121 current_operation
="RUNNING ACTION",
5122 current_operation_id
=nslcmop_id
,
5125 step
= "Getting information from database"
5126 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5127 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5128 if db_nslcmop
["operationParams"].get("primitive_params"):
5129 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5130 db_nslcmop
["operationParams"]["primitive_params"]
5133 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5134 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5135 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5136 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5137 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5138 primitive
= db_nslcmop
["operationParams"]["primitive"]
5139 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5140 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5141 "timeout_ns_action", self
.timeout_primitive
5145 step
= "Getting vnfr from database"
5146 db_vnfr
= self
.db
.get_one(
5147 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5149 if db_vnfr
.get("kdur"):
5151 for kdur
in db_vnfr
["kdur"]:
5152 if kdur
.get("additionalParams"):
5153 kdur
["additionalParams"] = json
.loads(
5154 kdur
["additionalParams"]
5156 kdur_list
.append(kdur
)
5157 db_vnfr
["kdur"] = kdur_list
5158 step
= "Getting vnfd from database"
5159 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5161 # Sync filesystem before running a primitive
5162 self
.fs
.sync(db_vnfr
["vnfd-id"])
5164 step
= "Getting nsd from database"
5165 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5167 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5168 # for backward compatibility
5169 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5170 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5171 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5172 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5174 # look for primitive
5175 config_primitive_desc
= descriptor_configuration
= None
5177 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5179 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5181 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5183 descriptor_configuration
= db_nsd
.get("ns-configuration")
5185 if descriptor_configuration
and descriptor_configuration
.get(
5188 for config_primitive
in descriptor_configuration
["config-primitive"]:
5189 if config_primitive
["name"] == primitive
:
5190 config_primitive_desc
= config_primitive
5193 if not config_primitive_desc
:
5194 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5196 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5200 primitive_name
= primitive
5201 ee_descriptor_id
= None
5203 primitive_name
= config_primitive_desc
.get(
5204 "execution-environment-primitive", primitive
5206 ee_descriptor_id
= config_primitive_desc
.get(
5207 "execution-environment-ref"
5213 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5215 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5218 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5220 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5222 desc_params
= parse_yaml_strings(
5223 db_vnfr
.get("additionalParamsForVnf")
5226 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5227 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5228 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5230 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5231 actions
.add(primitive
["name"])
5232 for primitive
in kdu_configuration
.get("config-primitive", []):
5233 actions
.add(primitive
["name"])
5235 nsr_deployed
["K8s"],
5236 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5237 and kdu
["member-vnf-index"] == vnf_index
,
5241 if primitive_name
in actions
5242 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5246 # TODO check if ns is in a proper status
5248 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5250 # kdur and desc_params already set from before
5251 if primitive_params
:
5252 desc_params
.update(primitive_params
)
5253 # TODO Check if we will need something at vnf level
5254 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5256 kdu_name
== kdu
["kdu-name"]
5257 and kdu
["member-vnf-index"] == vnf_index
5262 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5265 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5266 msg
= "unknown k8scluster-type '{}'".format(
5267 kdu
.get("k8scluster-type")
5269 raise LcmException(msg
)
5272 "collection": "nsrs",
5273 "filter": {"_id": nsr_id
},
5274 "path": "_admin.deployed.K8s.{}".format(index
),
5278 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5280 step
= "Executing kdu {}".format(primitive_name
)
5281 if primitive_name
== "upgrade":
5282 if desc_params
.get("kdu_model"):
5283 kdu_model
= desc_params
.get("kdu_model")
5284 del desc_params
["kdu_model"]
5286 kdu_model
= kdu
.get("kdu-model")
5287 parts
= kdu_model
.split(sep
=":")
5289 kdu_model
= parts
[0]
5290 if desc_params
.get("kdu_atomic_upgrade"):
5291 atomic_upgrade
= desc_params
.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
5292 del desc_params
["kdu_atomic_upgrade"]
5294 atomic_upgrade
= True
5296 detailed_status
= await asyncio
.wait_for(
5297 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5298 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5299 kdu_instance
=kdu
.get("kdu-instance"),
5300 atomic
=atomic_upgrade
,
5301 kdu_model
=kdu_model
,
5304 timeout
=timeout_ns_action
,
5306 timeout
=timeout_ns_action
+ 10,
5309 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5311 elif primitive_name
== "rollback":
5312 detailed_status
= await asyncio
.wait_for(
5313 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5314 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5315 kdu_instance
=kdu
.get("kdu-instance"),
5318 timeout
=timeout_ns_action
,
5320 elif primitive_name
== "status":
5321 detailed_status
= await asyncio
.wait_for(
5322 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5323 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5324 kdu_instance
=kdu
.get("kdu-instance"),
5327 timeout
=timeout_ns_action
,
5330 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5331 kdu
["kdu-name"], nsr_id
5333 params
= self
._map
_primitive
_params
(
5334 config_primitive_desc
, primitive_params
, desc_params
5337 detailed_status
= await asyncio
.wait_for(
5338 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5339 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5340 kdu_instance
=kdu_instance
,
5341 primitive_name
=primitive_name
,
5344 timeout
=timeout_ns_action
,
5347 timeout
=timeout_ns_action
,
5351 nslcmop_operation_state
= "COMPLETED"
5353 detailed_status
= ""
5354 nslcmop_operation_state
= "FAILED"
5356 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5357 nsr_deployed
["VCA"],
5358 member_vnf_index
=vnf_index
,
5360 vdu_count_index
=vdu_count_index
,
5361 ee_descriptor_id
=ee_descriptor_id
,
5363 for vca_index
, vca_deployed
in enumerate(
5364 db_nsr
["_admin"]["deployed"]["VCA"]
5366 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5368 "collection": "nsrs",
5369 "filter": {"_id": nsr_id
},
5370 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5374 nslcmop_operation_state
,
5376 ) = await self
._ns
_execute
_primitive
(
5378 primitive
=primitive_name
,
5379 primitive_params
=self
._map
_primitive
_params
(
5380 config_primitive_desc
, primitive_params
, desc_params
5382 timeout
=timeout_ns_action
,
5388 db_nslcmop_update
["detailed-status"] = detailed_status
5389 error_description_nslcmop
= (
5390 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5394 + "Done with result {} {}".format(
5395 nslcmop_operation_state
, detailed_status
5398 return # database update is called inside finally
5400 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5401 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5403 except asyncio
.CancelledError
:
5405 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5407 exc
= "Operation was cancelled"
5408 except asyncio
.TimeoutError
:
5409 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5411 except Exception as e
:
5412 exc
= traceback
.format_exc()
5413 self
.logger
.critical(
5414 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5423 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5424 nslcmop_operation_state
= "FAILED"
5426 self
._write
_ns
_status
(
5430 ], # TODO check if degraded. For the moment use previous status
5431 current_operation
="IDLE",
5432 current_operation_id
=None,
5433 # error_description=error_description_nsr,
5434 # error_detail=error_detail,
5435 other_update
=db_nsr_update
,
5438 self
._write
_op
_status
(
5441 error_message
=error_description_nslcmop
,
5442 operation_state
=nslcmop_operation_state
,
5443 other_update
=db_nslcmop_update
,
5446 if nslcmop_operation_state
:
5448 await self
.msg
.aiowrite(
5453 "nslcmop_id": nslcmop_id
,
5454 "operationState": nslcmop_operation_state
,
5458 except Exception as e
:
5460 logging_text
+ "kafka_write notification Exception {}".format(e
)
5462 self
.logger
.debug(logging_text
+ "Exit")
5463 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5464 return nslcmop_operation_state
, detailed_status
5466 async def terminate_vdus(
5467 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5469 """This method terminates VDUs
5472 db_vnfr: VNF instance record
5473 member_vnf_index: VNF index to identify the VDUs to be removed
5474 db_nsr: NS instance record
5475 update_db_nslcmops: Nslcmop update record
5477 vca_scaling_info
= []
5478 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5479 scaling_info
["scaling_direction"] = "IN"
5480 scaling_info
["vdu-delete"] = {}
5481 scaling_info
["kdu-delete"] = {}
5482 db_vdur
= db_vnfr
.get("vdur")
5483 vdur_list
= copy(db_vdur
)
5485 for index
, vdu
in enumerate(vdur_list
):
5486 vca_scaling_info
.append(
5488 "osm_vdu_id": vdu
["vdu-id-ref"],
5489 "member-vnf-index": member_vnf_index
,
5491 "vdu_index": count_index
,
5494 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5495 scaling_info
["vdu"].append(
5497 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5498 "vdu_id": vdu
["vdu-id-ref"],
5502 for interface
in vdu
["interfaces"]:
5503 scaling_info
["vdu"][index
]["interface"].append(
5505 "name": interface
["name"],
5506 "ip_address": interface
["ip-address"],
5507 "mac_address": interface
.get("mac-address"),
5510 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5511 stage
[2] = "Terminating VDUs"
5512 if scaling_info
.get("vdu-delete"):
5513 # scale_process = "RO"
5514 if self
.ro_config
.get("ng"):
5515 await self
._scale
_ng
_ro
(
5524 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5525 """This method is to Remove VNF instances from NS.
5528 nsr_id: NS instance id
5529 nslcmop_id: nslcmop id of update
5530 vnf_instance_id: id of the VNF instance to be removed
5533 result: (str, str) COMPLETED/FAILED, details
5537 logging_text
= "Task ns={} update ".format(nsr_id
)
5538 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5539 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5540 if check_vnfr_count
> 1:
5541 stage
= ["", "", ""]
5542 step
= "Getting nslcmop from database"
5544 step
+ " after having waited for previous tasks to be completed"
5546 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5547 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5548 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5549 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5550 """ db_vnfr = self.db.get_one(
5551 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5553 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5554 await self
.terminate_vdus(
5563 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5564 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5565 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5566 "constituent-vnfr-ref"
5568 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5569 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5570 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5571 return "COMPLETED", "Done"
5573 step
= "Terminate VNF Failed with"
5575 "{} Cannot terminate the last VNF in this NS.".format(
5579 except (LcmException
, asyncio
.CancelledError
):
5581 except Exception as e
:
5582 self
.logger
.debug("Error removing VNF {}".format(e
))
5583 return "FAILED", "Error removing VNF {}".format(e
)
5585 async def _ns_redeploy_vnf(
5593 """This method updates and redeploys VNF instances
5596 nsr_id: NS instance id
5597 nslcmop_id: nslcmop id
5598 db_vnfd: VNF descriptor
5599 db_vnfr: VNF instance record
5600 db_nsr: NS instance record
5603 result: (str, str) COMPLETED/FAILED, details
5607 stage
= ["", "", ""]
5608 logging_text
= "Task ns={} update ".format(nsr_id
)
5609 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5610 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5612 # Terminate old VNF resources
5613 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5614 await self
.terminate_vdus(
5623 # old_vnfd_id = db_vnfr["vnfd-id"]
5624 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5625 new_db_vnfd
= db_vnfd
5626 # new_vnfd_ref = new_db_vnfd["id"]
5627 # new_vnfd_id = vnfd_id
5631 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5633 "name": cp
.get("id"),
5634 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5635 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5638 new_vnfr_cp
.append(vnf_cp
)
5639 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5640 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5641 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5643 "revision": latest_vnfd_revision
,
5644 "connection-point": new_vnfr_cp
,
5648 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5649 updated_db_vnfr
= self
.db
.get_one(
5651 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5654 # Instantiate new VNF resources
5655 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5656 vca_scaling_info
= []
5657 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5658 scaling_info
["scaling_direction"] = "OUT"
5659 scaling_info
["vdu-create"] = {}
5660 scaling_info
["kdu-create"] = {}
5661 vdud_instantiate_list
= db_vnfd
["vdu"]
5662 for index
, vdud
in enumerate(vdud_instantiate_list
):
5663 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5665 additional_params
= (
5666 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5669 cloud_init_list
= []
5671 # TODO Information of its own ip is not available because db_vnfr is not updated.
5672 additional_params
["OSM"] = get_osm_params(
5673 updated_db_vnfr
, vdud
["id"], 1
5675 cloud_init_list
.append(
5676 self
._parse
_cloud
_init
(
5683 vca_scaling_info
.append(
5685 "osm_vdu_id": vdud
["id"],
5686 "member-vnf-index": member_vnf_index
,
5688 "vdu_index": count_index
,
5691 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5692 if self
.ro_config
.get("ng"):
5694 "New Resources to be deployed: {}".format(scaling_info
)
5696 await self
._scale
_ng
_ro
(
5704 return "COMPLETED", "Done"
5705 except (LcmException
, asyncio
.CancelledError
):
5707 except Exception as e
:
5708 self
.logger
.debug("Error updating VNF {}".format(e
))
5709 return "FAILED", "Error updating VNF {}".format(e
)
5711 async def _ns_charm_upgrade(
5717 timeout
: float = None,
5719 """This method upgrade charms in VNF instances
5722 ee_id: Execution environment id
5723 path: Local path to the charm
5725 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5726 timeout: (Float) Timeout for the ns update operation
5729 result: (str, str) COMPLETED/FAILED, details
5732 charm_type
= charm_type
or "lxc_proxy_charm"
5733 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5737 charm_type
=charm_type
,
5738 timeout
=timeout
or self
.timeout_ns_update
,
5742 return "COMPLETED", output
5744 except (LcmException
, asyncio
.CancelledError
):
5747 except Exception as e
:
5749 self
.logger
.debug("Error upgrading charm {}".format(path
))
5751 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5753 async def update(self
, nsr_id
, nslcmop_id
):
5754 """Update NS according to different update types
5756 This method performs upgrade of VNF instances then updates the revision
5757 number in VNF record
5760 nsr_id: Network service will be updated
5761 nslcmop_id: ns lcm operation id
5764 It may raise DbException, LcmException, N2VCException, K8sException
5767 # Try to lock HA task here
5768 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5769 if not task_is_locked_by_me
:
5772 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5773 self
.logger
.debug(logging_text
+ "Enter")
5775 # Set the required variables to be filled up later
5777 db_nslcmop_update
= {}
5779 nslcmop_operation_state
= None
5781 error_description_nslcmop
= ""
5783 change_type
= "updated"
5784 detailed_status
= ""
5787 # wait for any previous tasks in process
5788 step
= "Waiting for previous operations to terminate"
5789 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5790 self
._write
_ns
_status
(
5793 current_operation
="UPDATING",
5794 current_operation_id
=nslcmop_id
,
5797 step
= "Getting nslcmop from database"
5798 db_nslcmop
= self
.db
.get_one(
5799 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5801 update_type
= db_nslcmop
["operationParams"]["updateType"]
5803 step
= "Getting nsr from database"
5804 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5805 old_operational_status
= db_nsr
["operational-status"]
5806 db_nsr_update
["operational-status"] = "updating"
5807 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5808 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5810 if update_type
== "CHANGE_VNFPKG":
5812 # Get the input parameters given through update request
5813 vnf_instance_id
= db_nslcmop
["operationParams"][
5814 "changeVnfPackageData"
5815 ].get("vnfInstanceId")
5817 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5820 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5822 step
= "Getting vnfr from database"
5823 db_vnfr
= self
.db
.get_one(
5824 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5827 step
= "Getting vnfds from database"
5829 latest_vnfd
= self
.db
.get_one(
5830 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5832 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5835 current_vnf_revision
= db_vnfr
.get("revision", 1)
5836 current_vnfd
= self
.db
.get_one(
5838 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5839 fail_on_empty
=False,
5841 # Charm artifact paths will be filled up later
5843 current_charm_artifact_path
,
5844 target_charm_artifact_path
,
5845 charm_artifact_paths
,
5847 ) = ([], [], [], [])
5849 step
= "Checking if revision has changed in VNFD"
5850 if current_vnf_revision
!= latest_vnfd_revision
:
5852 change_type
= "policy_updated"
5854 # There is new revision of VNFD, update operation is required
5855 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5856 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5858 step
= "Removing the VNFD packages if they exist in the local path"
5859 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5860 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5862 step
= "Get the VNFD packages from FSMongo"
5863 self
.fs
.sync(from_path
=latest_vnfd_path
)
5864 self
.fs
.sync(from_path
=current_vnfd_path
)
5867 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5869 current_base_folder
= current_vnfd
["_admin"]["storage"]
5870 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5872 for vca_index
, vca_deployed
in enumerate(
5873 get_iterable(nsr_deployed
, "VCA")
5875 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5877 # Getting charm-id and charm-type
5878 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5879 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5880 vca_type
= vca_deployed
.get("type")
5881 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5884 ee_id
= vca_deployed
.get("ee_id")
5886 step
= "Getting descriptor config"
5887 descriptor_config
= get_configuration(
5888 current_vnfd
, current_vnfd
["id"]
5891 if "execution-environment-list" in descriptor_config
:
5892 ee_list
= descriptor_config
.get(
5893 "execution-environment-list", []
5898 # There could be several charm used in the same VNF
5899 for ee_item
in ee_list
:
5900 if ee_item
.get("juju"):
5902 step
= "Getting charm name"
5903 charm_name
= ee_item
["juju"].get("charm")
5905 step
= "Setting Charm artifact paths"
5906 current_charm_artifact_path
.append(
5907 get_charm_artifact_path(
5908 current_base_folder
,
5911 current_vnf_revision
,
5914 target_charm_artifact_path
.append(
5915 get_charm_artifact_path(
5919 latest_vnfd_revision
,
5922 elif ee_item
.get("helm-chart"):
5923 # add chart to list and all parameters
5924 step
= "Getting helm chart name"
5925 chart_name
= ee_item
.get("helm-chart")
5926 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
5929 vca_type
= "helm-v3"
5930 step
= "Setting Helm chart artifact paths"
5932 helm_artifacts
.append({
5933 "current_artifact_path": get_charm_artifact_path(
5934 current_base_folder
,
5937 current_vnf_revision
,
5939 "target_artifact_path": get_charm_artifact_path(
5943 latest_vnfd_revision
,
5946 "vca_index": vca_index
,
5947 "vdu_index": vdu_count_index
,
5950 charm_artifact_paths
= zip(
5951 current_charm_artifact_path
, target_charm_artifact_path
5954 step
= "Checking if software version has changed in VNFD"
5955 if find_software_version(current_vnfd
) != find_software_version(
5959 step
= "Checking if existing VNF has charm"
5960 for current_charm_path
, target_charm_path
in list(
5961 charm_artifact_paths
5963 if current_charm_path
:
5965 "Software version change is not supported as VNF instance {} has charm.".format(
5970 # There is no change in the charm package, then redeploy the VNF
5971 # based on new descriptor
5972 step
= "Redeploying VNF"
5973 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5974 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5975 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5977 if result
== "FAILED":
5978 nslcmop_operation_state
= result
5979 error_description_nslcmop
= detailed_status
5980 db_nslcmop_update
["detailed-status"] = detailed_status
5983 + " step {} Done with result {} {}".format(
5984 step
, nslcmop_operation_state
, detailed_status
5989 step
= "Checking if any charm package has changed or not"
5990 for current_charm_path
, target_charm_path
in list(
5991 charm_artifact_paths
5995 and target_charm_path
5996 and self
.check_charm_hash_changed(
5997 current_charm_path
, target_charm_path
6001 step
= "Checking whether VNF uses juju bundle"
6002 if check_juju_bundle_existence(current_vnfd
):
6005 "Charm upgrade is not supported for the instance which"
6006 " uses juju-bundle: {}".format(
6007 check_juju_bundle_existence(current_vnfd
)
6011 step
= "Upgrading Charm"
6015 ) = await self
._ns
_charm
_upgrade
(
6018 charm_type
=vca_type
,
6019 path
=self
.fs
.path
+ target_charm_path
,
6020 timeout
=timeout_seconds
,
6023 if result
== "FAILED":
6024 nslcmop_operation_state
= result
6025 error_description_nslcmop
= detailed_status
6027 db_nslcmop_update
["detailed-status"] = detailed_status
6030 + " step {} Done with result {} {}".format(
6031 step
, nslcmop_operation_state
, detailed_status
6035 step
= "Updating policies"
6036 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6037 result
= "COMPLETED"
6038 detailed_status
= "Done"
6039 db_nslcmop_update
["detailed-status"] = "Done"
6042 for item
in helm_artifacts
:
6045 item
["current_artifact_path"]
6046 and item
["target_artifact_path"]
6047 and self
.check_charm_hash_changed(
6048 item
["current_artifact_path"], item
["target_artifact_path"]
6053 db_update_entry
= "_admin.deployed.VCA.{}.".format(item
["vca_index"])
6054 vnfr_id
= db_vnfr
["_id"]
6055 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6057 "collection": "nsrs",
6058 "filter": {"_id": nsr_id
},
6059 "path": db_update_entry
,
6061 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6064 ].upgrade_execution_environment(
6065 namespace
=namespace
,
6069 artifact_path
=item
["target_artifact_path"],
6072 vnf_id
= db_vnfr
.get("vnfd-ref")
6073 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6074 self
.logger
.debug("get ssh key block")
6077 config_descriptor
, ("config-access", "ssh-access", "required")
6079 # Needed to inject a ssh key
6082 ("config-access", "ssh-access", "default-user"),
6084 step
= "Install configuration Software, getting public ssh key"
6085 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
6086 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6089 step
= "Insert public key into VM user={} ssh_key={}".format(
6092 self
.logger
.debug(logging_text
+ step
)
6094 # wait for RO (ip-address) Insert pub_key into VM
6095 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6105 initial_config_primitive_list
= config_descriptor
.get(
6106 "initial-config-primitive"
6108 config_primitive
= next(
6109 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
6112 if not config_primitive
:
6115 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6117 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6118 if db_vnfr
.get("additionalParamsForVnf"):
6119 deploy_params
.update(
6120 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
6122 primitive_params_
= self
._map
_primitive
_params
(
6123 config_primitive
, {}, deploy_params
6126 step
= "execute primitive '{}' params '{}'".format(
6127 config_primitive
["name"], primitive_params_
6129 self
.logger
.debug(logging_text
+ step
)
6130 await self
.vca_map
[vca_type
].exec_primitive(
6132 primitive_name
=config_primitive
["name"],
6133 params_dict
=primitive_params_
,
6139 step
= "Updating policies"
6140 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6141 detailed_status
= "Done"
6142 db_nslcmop_update
["detailed-status"] = "Done"
6144 # If nslcmop_operation_state is None, so any operation is not failed.
6145 if not nslcmop_operation_state
:
6146 nslcmop_operation_state
= "COMPLETED"
6148 # If update CHANGE_VNFPKG nslcmop_operation is successful
6149 # vnf revision need to be updated
6150 vnfr_update
["revision"] = latest_vnfd_revision
6151 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6155 + " task Done with result {} {}".format(
6156 nslcmop_operation_state
, detailed_status
6159 elif update_type
== "REMOVE_VNF":
6160 # This part is included in https://osm.etsi.org/gerrit/11876
6161 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6162 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6163 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6164 step
= "Removing VNF"
6165 (result
, detailed_status
) = await self
.remove_vnf(
6166 nsr_id
, nslcmop_id
, vnf_instance_id
6168 if result
== "FAILED":
6169 nslcmop_operation_state
= result
6170 error_description_nslcmop
= detailed_status
6171 db_nslcmop_update
["detailed-status"] = detailed_status
6172 change_type
= "vnf_terminated"
6173 if not nslcmop_operation_state
:
6174 nslcmop_operation_state
= "COMPLETED"
6177 + " task Done with result {} {}".format(
6178 nslcmop_operation_state
, detailed_status
6182 elif update_type
== "OPERATE_VNF":
6183 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6186 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6189 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6192 (result
, detailed_status
) = await self
.rebuild_start_stop(
6193 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6195 if result
== "FAILED":
6196 nslcmop_operation_state
= result
6197 error_description_nslcmop
= detailed_status
6198 db_nslcmop_update
["detailed-status"] = detailed_status
6199 if not nslcmop_operation_state
:
6200 nslcmop_operation_state
= "COMPLETED"
6203 + " task Done with result {} {}".format(
6204 nslcmop_operation_state
, detailed_status
6208 # If nslcmop_operation_state is None, so any operation is not failed.
6209 # All operations are executed in overall.
6210 if not nslcmop_operation_state
:
6211 nslcmop_operation_state
= "COMPLETED"
6212 db_nsr_update
["operational-status"] = old_operational_status
6214 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6215 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6217 except asyncio
.CancelledError
:
6219 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6221 exc
= "Operation was cancelled"
6222 except asyncio
.TimeoutError
:
6223 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6225 except Exception as e
:
6226 exc
= traceback
.format_exc()
6227 self
.logger
.critical(
6228 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6237 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6238 nslcmop_operation_state
= "FAILED"
6239 db_nsr_update
["operational-status"] = old_operational_status
6241 self
._write
_ns
_status
(
6243 ns_state
=db_nsr
["nsState"],
6244 current_operation
="IDLE",
6245 current_operation_id
=None,
6246 other_update
=db_nsr_update
,
6249 self
._write
_op
_status
(
6252 error_message
=error_description_nslcmop
,
6253 operation_state
=nslcmop_operation_state
,
6254 other_update
=db_nslcmop_update
,
6257 if nslcmop_operation_state
:
6261 "nslcmop_id": nslcmop_id
,
6262 "operationState": nslcmop_operation_state
,
6264 if change_type
in ("vnf_terminated", "policy_updated"):
6265 msg
.update({"vnf_member_index": member_vnf_index
})
6266 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6267 except Exception as e
:
6269 logging_text
+ "kafka_write notification Exception {}".format(e
)
6271 self
.logger
.debug(logging_text
+ "Exit")
6272 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6273 return nslcmop_operation_state
, detailed_status
6275 async def scale(self
, nsr_id
, nslcmop_id
):
6276 # Try to lock HA task here
6277 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6278 if not task_is_locked_by_me
:
6281 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6282 stage
= ["", "", ""]
6283 tasks_dict_info
= {}
6284 # ^ stage, step, VIM progress
6285 self
.logger
.debug(logging_text
+ "Enter")
6286 # get all needed from database
6288 db_nslcmop_update
= {}
6291 # in case of error, indicates what part of scale was failed to put nsr at error status
6292 scale_process
= None
6293 old_operational_status
= ""
6294 old_config_status
= ""
6297 # wait for any previous tasks in process
6298 step
= "Waiting for previous operations to terminate"
6299 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6300 self
._write
_ns
_status
(
6303 current_operation
="SCALING",
6304 current_operation_id
=nslcmop_id
,
6307 step
= "Getting nslcmop from database"
6309 step
+ " after having waited for previous tasks to be completed"
6311 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6313 step
= "Getting nsr from database"
6314 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6315 old_operational_status
= db_nsr
["operational-status"]
6316 old_config_status
= db_nsr
["config-status"]
6318 step
= "Parsing scaling parameters"
6319 db_nsr_update
["operational-status"] = "scaling"
6320 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6321 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6323 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6325 ]["member-vnf-index"]
6326 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6328 ]["scaling-group-descriptor"]
6329 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6330 # for backward compatibility
6331 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6332 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6333 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6334 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6336 step
= "Getting vnfr from database"
6337 db_vnfr
= self
.db
.get_one(
6338 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6341 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6343 step
= "Getting vnfd from database"
6344 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6346 base_folder
= db_vnfd
["_admin"]["storage"]
6348 step
= "Getting scaling-group-descriptor"
6349 scaling_descriptor
= find_in_list(
6350 get_scaling_aspect(db_vnfd
),
6351 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6353 if not scaling_descriptor
:
6355 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6356 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6359 step
= "Sending scale order to VIM"
6360 # TODO check if ns is in a proper status
6362 if not db_nsr
["_admin"].get("scaling-group"):
6367 "_admin.scaling-group": [
6368 {"name": scaling_group
, "nb-scale-op": 0}
6372 admin_scale_index
= 0
6374 for admin_scale_index
, admin_scale_info
in enumerate(
6375 db_nsr
["_admin"]["scaling-group"]
6377 if admin_scale_info
["name"] == scaling_group
:
6378 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6380 else: # not found, set index one plus last element and add new entry with the name
6381 admin_scale_index
+= 1
6383 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6386 vca_scaling_info
= []
6387 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6388 if scaling_type
== "SCALE_OUT":
6389 if "aspect-delta-details" not in scaling_descriptor
:
6391 "Aspect delta details not fount in scaling descriptor {}".format(
6392 scaling_descriptor
["name"]
6395 # count if max-instance-count is reached
6396 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6398 scaling_info
["scaling_direction"] = "OUT"
6399 scaling_info
["vdu-create"] = {}
6400 scaling_info
["kdu-create"] = {}
6401 for delta
in deltas
:
6402 for vdu_delta
in delta
.get("vdu-delta", {}):
6403 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6404 # vdu_index also provides the number of instance of the targeted vdu
6405 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6406 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6410 additional_params
= (
6411 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6414 cloud_init_list
= []
6416 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6417 max_instance_count
= 10
6418 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6419 max_instance_count
= vdu_profile
.get(
6420 "max-number-of-instances", 10
6423 default_instance_num
= get_number_of_instances(
6426 instances_number
= vdu_delta
.get("number-of-instances", 1)
6427 nb_scale_op
+= instances_number
6429 new_instance_count
= nb_scale_op
+ default_instance_num
6430 # Control if new count is over max and vdu count is less than max.
6431 # Then assign new instance count
6432 if new_instance_count
> max_instance_count
> vdu_count
:
6433 instances_number
= new_instance_count
- max_instance_count
6435 instances_number
= instances_number
6437 if new_instance_count
> max_instance_count
:
6439 "reached the limit of {} (max-instance-count) "
6440 "scaling-out operations for the "
6441 "scaling-group-descriptor '{}'".format(
6442 nb_scale_op
, scaling_group
6445 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6447 # TODO Information of its own ip is not available because db_vnfr is not updated.
6448 additional_params
["OSM"] = get_osm_params(
6449 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6451 cloud_init_list
.append(
6452 self
._parse
_cloud
_init
(
6459 vca_scaling_info
.append(
6461 "osm_vdu_id": vdu_delta
["id"],
6462 "member-vnf-index": vnf_index
,
6464 "vdu_index": vdu_index
+ x
,
6467 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6468 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6469 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6470 kdu_name
= kdu_profile
["kdu-name"]
6471 resource_name
= kdu_profile
.get("resource-name", "")
6473 # Might have different kdus in the same delta
6474 # Should have list for each kdu
6475 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6476 scaling_info
["kdu-create"][kdu_name
] = []
6478 kdur
= get_kdur(db_vnfr
, kdu_name
)
6479 if kdur
.get("helm-chart"):
6480 k8s_cluster_type
= "helm-chart-v3"
6481 self
.logger
.debug("kdur: {}".format(kdur
))
6483 kdur
.get("helm-version")
6484 and kdur
.get("helm-version") == "v2"
6486 k8s_cluster_type
= "helm-chart"
6487 elif kdur
.get("juju-bundle"):
6488 k8s_cluster_type
= "juju-bundle"
6491 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6492 "juju-bundle. Maybe an old NBI version is running".format(
6493 db_vnfr
["member-vnf-index-ref"], kdu_name
6497 max_instance_count
= 10
6498 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6499 max_instance_count
= kdu_profile
.get(
6500 "max-number-of-instances", 10
6503 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6504 deployed_kdu
, _
= get_deployed_kdu(
6505 nsr_deployed
, kdu_name
, vnf_index
6507 if deployed_kdu
is None:
6509 "KDU '{}' for vnf '{}' not deployed".format(
6513 kdu_instance
= deployed_kdu
.get("kdu-instance")
6514 instance_num
= await self
.k8scluster_map
[
6520 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6521 kdu_model
=deployed_kdu
.get("kdu-model"),
6523 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6524 "number-of-instances", 1
6527 # Control if new count is over max and instance_num is less than max.
6528 # Then assign max instance number to kdu replica count
6529 if kdu_replica_count
> max_instance_count
> instance_num
:
6530 kdu_replica_count
= max_instance_count
6531 if kdu_replica_count
> max_instance_count
:
6533 "reached the limit of {} (max-instance-count) "
6534 "scaling-out operations for the "
6535 "scaling-group-descriptor '{}'".format(
6536 instance_num
, scaling_group
6540 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6541 vca_scaling_info
.append(
6543 "osm_kdu_id": kdu_name
,
6544 "member-vnf-index": vnf_index
,
6546 "kdu_index": instance_num
+ x
- 1,
6549 scaling_info
["kdu-create"][kdu_name
].append(
6551 "member-vnf-index": vnf_index
,
6553 "k8s-cluster-type": k8s_cluster_type
,
6554 "resource-name": resource_name
,
6555 "scale": kdu_replica_count
,
6558 elif scaling_type
== "SCALE_IN":
6559 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6561 scaling_info
["scaling_direction"] = "IN"
6562 scaling_info
["vdu-delete"] = {}
6563 scaling_info
["kdu-delete"] = {}
6565 for delta
in deltas
:
6566 for vdu_delta
in delta
.get("vdu-delta", {}):
6567 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6568 min_instance_count
= 0
6569 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6570 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6571 min_instance_count
= vdu_profile
["min-number-of-instances"]
6573 default_instance_num
= get_number_of_instances(
6574 db_vnfd
, vdu_delta
["id"]
6576 instance_num
= vdu_delta
.get("number-of-instances", 1)
6577 nb_scale_op
-= instance_num
6579 new_instance_count
= nb_scale_op
+ default_instance_num
6581 if new_instance_count
< min_instance_count
< vdu_count
:
6582 instances_number
= min_instance_count
- new_instance_count
6584 instances_number
= instance_num
6586 if new_instance_count
< min_instance_count
:
6588 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6589 "scaling-group-descriptor '{}'".format(
6590 nb_scale_op
, scaling_group
6593 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6594 vca_scaling_info
.append(
6596 "osm_vdu_id": vdu_delta
["id"],
6597 "member-vnf-index": vnf_index
,
6599 "vdu_index": vdu_index
- 1 - x
,
6602 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6603 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6604 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6605 kdu_name
= kdu_profile
["kdu-name"]
6606 resource_name
= kdu_profile
.get("resource-name", "")
6608 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6609 scaling_info
["kdu-delete"][kdu_name
] = []
6611 kdur
= get_kdur(db_vnfr
, kdu_name
)
6612 if kdur
.get("helm-chart"):
6613 k8s_cluster_type
= "helm-chart-v3"
6614 self
.logger
.debug("kdur: {}".format(kdur
))
6616 kdur
.get("helm-version")
6617 and kdur
.get("helm-version") == "v2"
6619 k8s_cluster_type
= "helm-chart"
6620 elif kdur
.get("juju-bundle"):
6621 k8s_cluster_type
= "juju-bundle"
6624 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6625 "juju-bundle. Maybe an old NBI version is running".format(
6626 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6630 min_instance_count
= 0
6631 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6632 min_instance_count
= kdu_profile
["min-number-of-instances"]
6634 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6635 deployed_kdu
, _
= get_deployed_kdu(
6636 nsr_deployed
, kdu_name
, vnf_index
6638 if deployed_kdu
is None:
6640 "KDU '{}' for vnf '{}' not deployed".format(
6644 kdu_instance
= deployed_kdu
.get("kdu-instance")
6645 instance_num
= await self
.k8scluster_map
[
6651 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6652 kdu_model
=deployed_kdu
.get("kdu-model"),
6654 kdu_replica_count
= instance_num
- kdu_delta
.get(
6655 "number-of-instances", 1
6658 if kdu_replica_count
< min_instance_count
< instance_num
:
6659 kdu_replica_count
= min_instance_count
6660 if kdu_replica_count
< min_instance_count
:
6662 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6663 "scaling-group-descriptor '{}'".format(
6664 instance_num
, scaling_group
6668 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6669 vca_scaling_info
.append(
6671 "osm_kdu_id": kdu_name
,
6672 "member-vnf-index": vnf_index
,
6674 "kdu_index": instance_num
- x
- 1,
6677 scaling_info
["kdu-delete"][kdu_name
].append(
6679 "member-vnf-index": vnf_index
,
6681 "k8s-cluster-type": k8s_cluster_type
,
6682 "resource-name": resource_name
,
6683 "scale": kdu_replica_count
,
6687 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6688 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6689 if scaling_info
["scaling_direction"] == "IN":
6690 for vdur
in reversed(db_vnfr
["vdur"]):
6691 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6692 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6693 scaling_info
["vdu"].append(
6695 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6696 "vdu_id": vdur
["vdu-id-ref"],
6700 for interface
in vdur
["interfaces"]:
6701 scaling_info
["vdu"][-1]["interface"].append(
6703 "name": interface
["name"],
6704 "ip_address": interface
["ip-address"],
6705 "mac_address": interface
.get("mac-address"),
6708 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6711 step
= "Executing pre-scale vnf-config-primitive"
6712 if scaling_descriptor
.get("scaling-config-action"):
6713 for scaling_config_action
in scaling_descriptor
[
6714 "scaling-config-action"
6717 scaling_config_action
.get("trigger") == "pre-scale-in"
6718 and scaling_type
== "SCALE_IN"
6720 scaling_config_action
.get("trigger") == "pre-scale-out"
6721 and scaling_type
== "SCALE_OUT"
6723 vnf_config_primitive
= scaling_config_action
[
6724 "vnf-config-primitive-name-ref"
6726 step
= db_nslcmop_update
[
6728 ] = "executing pre-scale scaling-config-action '{}'".format(
6729 vnf_config_primitive
6732 # look for primitive
6733 for config_primitive
in (
6734 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6735 ).get("config-primitive", ()):
6736 if config_primitive
["name"] == vnf_config_primitive
:
6740 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6741 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6742 "primitive".format(scaling_group
, vnf_config_primitive
)
6745 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6746 if db_vnfr
.get("additionalParamsForVnf"):
6747 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6749 scale_process
= "VCA"
6750 db_nsr_update
["config-status"] = "configuring pre-scaling"
6751 primitive_params
= self
._map
_primitive
_params
(
6752 config_primitive
, {}, vnfr_params
6755 # Pre-scale retry check: Check if this sub-operation has been executed before
6756 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6759 vnf_config_primitive
,
6763 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6764 # Skip sub-operation
6765 result
= "COMPLETED"
6766 result_detail
= "Done"
6769 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6770 vnf_config_primitive
, result
, result_detail
6774 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6775 # New sub-operation: Get index of this sub-operation
6777 len(db_nslcmop
.get("_admin", {}).get("operations"))
6782 + "vnf_config_primitive={} New sub-operation".format(
6783 vnf_config_primitive
6787 # retry: Get registered params for this existing sub-operation
6788 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6791 vnf_index
= op
.get("member_vnf_index")
6792 vnf_config_primitive
= op
.get("primitive")
6793 primitive_params
= op
.get("primitive_params")
6796 + "vnf_config_primitive={} Sub-operation retry".format(
6797 vnf_config_primitive
6800 # Execute the primitive, either with new (first-time) or registered (reintent) args
6801 ee_descriptor_id
= config_primitive
.get(
6802 "execution-environment-ref"
6804 primitive_name
= config_primitive
.get(
6805 "execution-environment-primitive", vnf_config_primitive
6807 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6808 nsr_deployed
["VCA"],
6809 member_vnf_index
=vnf_index
,
6811 vdu_count_index
=None,
6812 ee_descriptor_id
=ee_descriptor_id
,
6814 result
, result_detail
= await self
._ns
_execute
_primitive
(
6823 + "vnf_config_primitive={} Done with result {} {}".format(
6824 vnf_config_primitive
, result
, result_detail
6827 # Update operationState = COMPLETED | FAILED
6828 self
._update
_suboperation
_status
(
6829 db_nslcmop
, op_index
, result
, result_detail
6832 if result
== "FAILED":
6833 raise LcmException(result_detail
)
6834 db_nsr_update
["config-status"] = old_config_status
6835 scale_process
= None
6839 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6842 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6845 # SCALE-IN VCA - BEGIN
6846 if vca_scaling_info
:
6847 step
= db_nslcmop_update
[
6849 ] = "Deleting the execution environments"
6850 scale_process
= "VCA"
6851 for vca_info
in vca_scaling_info
:
6852 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6853 member_vnf_index
= str(vca_info
["member-vnf-index"])
6855 logging_text
+ "vdu info: {}".format(vca_info
)
6857 if vca_info
.get("osm_vdu_id"):
6858 vdu_id
= vca_info
["osm_vdu_id"]
6859 vdu_index
= int(vca_info
["vdu_index"])
6862 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6863 member_vnf_index
, vdu_id
, vdu_index
6865 stage
[2] = step
= "Scaling in VCA"
6866 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6867 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6868 config_update
= db_nsr
["configurationStatus"]
6869 for vca_index
, vca
in enumerate(vca_update
):
6871 (vca
or vca
.get("ee_id"))
6872 and vca
["member-vnf-index"] == member_vnf_index
6873 and vca
["vdu_count_index"] == vdu_index
6875 if vca
.get("vdu_id"):
6876 config_descriptor
= get_configuration(
6877 db_vnfd
, vca
.get("vdu_id")
6879 elif vca
.get("kdu_name"):
6880 config_descriptor
= get_configuration(
6881 db_vnfd
, vca
.get("kdu_name")
6884 config_descriptor
= get_configuration(
6885 db_vnfd
, db_vnfd
["id"]
6887 operation_params
= (
6888 db_nslcmop
.get("operationParams") or {}
6890 exec_terminate_primitives
= not operation_params
.get(
6891 "skip_terminate_primitives"
6892 ) and vca
.get("needed_terminate")
6893 task
= asyncio
.ensure_future(
6902 exec_primitives
=exec_terminate_primitives
,
6906 timeout
=self
.timeout_charm_delete
,
6909 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6912 del vca_update
[vca_index
]
6913 del config_update
[vca_index
]
6914 # wait for pending tasks of terminate primitives
6918 + "Waiting for tasks {}".format(
6919 list(tasks_dict_info
.keys())
6922 error_list
= await self
._wait
_for
_tasks
(
6926 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6931 tasks_dict_info
.clear()
6933 raise LcmException("; ".join(error_list
))
6935 db_vca_and_config_update
= {
6936 "_admin.deployed.VCA": vca_update
,
6937 "configurationStatus": config_update
,
6940 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6942 scale_process
= None
6943 # SCALE-IN VCA - END
6946 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6947 scale_process
= "RO"
6948 if self
.ro_config
.get("ng"):
6949 await self
._scale
_ng
_ro
(
6950 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6952 scaling_info
.pop("vdu-create", None)
6953 scaling_info
.pop("vdu-delete", None)
6955 scale_process
= None
6959 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6960 scale_process
= "KDU"
6961 await self
._scale
_kdu
(
6962 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6964 scaling_info
.pop("kdu-create", None)
6965 scaling_info
.pop("kdu-delete", None)
6967 scale_process
= None
6971 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6973 # SCALE-UP VCA - BEGIN
6974 if vca_scaling_info
:
6975 step
= db_nslcmop_update
[
6977 ] = "Creating new execution environments"
6978 scale_process
= "VCA"
6979 for vca_info
in vca_scaling_info
:
6980 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6981 member_vnf_index
= str(vca_info
["member-vnf-index"])
6983 logging_text
+ "vdu info: {}".format(vca_info
)
6985 vnfd_id
= db_vnfr
["vnfd-ref"]
6986 if vca_info
.get("osm_vdu_id"):
6987 vdu_index
= int(vca_info
["vdu_index"])
6988 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6989 if db_vnfr
.get("additionalParamsForVnf"):
6990 deploy_params
.update(
6992 db_vnfr
["additionalParamsForVnf"].copy()
6995 descriptor_config
= get_configuration(
6996 db_vnfd
, db_vnfd
["id"]
6998 if descriptor_config
:
7003 logging_text
=logging_text
7004 + "member_vnf_index={} ".format(member_vnf_index
),
7007 nslcmop_id
=nslcmop_id
,
7013 member_vnf_index
=member_vnf_index
,
7014 vdu_index
=vdu_index
,
7016 deploy_params
=deploy_params
,
7017 descriptor_config
=descriptor_config
,
7018 base_folder
=base_folder
,
7019 task_instantiation_info
=tasks_dict_info
,
7022 vdu_id
= vca_info
["osm_vdu_id"]
7023 vdur
= find_in_list(
7024 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7026 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7027 if vdur
.get("additionalParams"):
7028 deploy_params_vdu
= parse_yaml_strings(
7029 vdur
["additionalParams"]
7032 deploy_params_vdu
= deploy_params
7033 deploy_params_vdu
["OSM"] = get_osm_params(
7034 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7036 if descriptor_config
:
7041 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7042 member_vnf_index
, vdu_id
, vdu_index
7044 stage
[2] = step
= "Scaling out VCA"
7045 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7047 logging_text
=logging_text
7048 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7049 member_vnf_index
, vdu_id
, vdu_index
7053 nslcmop_id
=nslcmop_id
,
7059 member_vnf_index
=member_vnf_index
,
7060 vdu_index
=vdu_index
,
7062 deploy_params
=deploy_params_vdu
,
7063 descriptor_config
=descriptor_config
,
7064 base_folder
=base_folder
,
7065 task_instantiation_info
=tasks_dict_info
,
7068 # SCALE-UP VCA - END
7069 scale_process
= None
7072 # execute primitive service POST-SCALING
7073 step
= "Executing post-scale vnf-config-primitive"
7074 if scaling_descriptor
.get("scaling-config-action"):
7075 for scaling_config_action
in scaling_descriptor
[
7076 "scaling-config-action"
7079 scaling_config_action
.get("trigger") == "post-scale-in"
7080 and scaling_type
== "SCALE_IN"
7082 scaling_config_action
.get("trigger") == "post-scale-out"
7083 and scaling_type
== "SCALE_OUT"
7085 vnf_config_primitive
= scaling_config_action
[
7086 "vnf-config-primitive-name-ref"
7088 step
= db_nslcmop_update
[
7090 ] = "executing post-scale scaling-config-action '{}'".format(
7091 vnf_config_primitive
7094 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7095 if db_vnfr
.get("additionalParamsForVnf"):
7096 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7098 # look for primitive
7099 for config_primitive
in (
7100 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7101 ).get("config-primitive", ()):
7102 if config_primitive
["name"] == vnf_config_primitive
:
7106 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7107 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7108 "config-primitive".format(
7109 scaling_group
, vnf_config_primitive
7112 scale_process
= "VCA"
7113 db_nsr_update
["config-status"] = "configuring post-scaling"
7114 primitive_params
= self
._map
_primitive
_params
(
7115 config_primitive
, {}, vnfr_params
7118 # Post-scale retry check: Check if this sub-operation has been executed before
7119 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7122 vnf_config_primitive
,
7126 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7127 # Skip sub-operation
7128 result
= "COMPLETED"
7129 result_detail
= "Done"
7132 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7133 vnf_config_primitive
, result
, result_detail
7137 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7138 # New sub-operation: Get index of this sub-operation
7140 len(db_nslcmop
.get("_admin", {}).get("operations"))
7145 + "vnf_config_primitive={} New sub-operation".format(
7146 vnf_config_primitive
7150 # retry: Get registered params for this existing sub-operation
7151 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7154 vnf_index
= op
.get("member_vnf_index")
7155 vnf_config_primitive
= op
.get("primitive")
7156 primitive_params
= op
.get("primitive_params")
7159 + "vnf_config_primitive={} Sub-operation retry".format(
7160 vnf_config_primitive
7163 # Execute the primitive, either with new (first-time) or registered (reintent) args
7164 ee_descriptor_id
= config_primitive
.get(
7165 "execution-environment-ref"
7167 primitive_name
= config_primitive
.get(
7168 "execution-environment-primitive", vnf_config_primitive
7170 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7171 nsr_deployed
["VCA"],
7172 member_vnf_index
=vnf_index
,
7174 vdu_count_index
=None,
7175 ee_descriptor_id
=ee_descriptor_id
,
7177 result
, result_detail
= await self
._ns
_execute
_primitive
(
7186 + "vnf_config_primitive={} Done with result {} {}".format(
7187 vnf_config_primitive
, result
, result_detail
7190 # Update operationState = COMPLETED | FAILED
7191 self
._update
_suboperation
_status
(
7192 db_nslcmop
, op_index
, result
, result_detail
7195 if result
== "FAILED":
7196 raise LcmException(result_detail
)
7197 db_nsr_update
["config-status"] = old_config_status
7198 scale_process
= None
7203 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7204 db_nsr_update
["operational-status"] = (
7206 if old_operational_status
== "failed"
7207 else old_operational_status
7209 db_nsr_update
["config-status"] = old_config_status
7212 ROclient
.ROClientException
,
7217 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7219 except asyncio
.CancelledError
:
7221 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7223 exc
= "Operation was cancelled"
7224 except Exception as e
:
7225 exc
= traceback
.format_exc()
7226 self
.logger
.critical(
7227 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7231 self
._write
_ns
_status
(
7234 current_operation
="IDLE",
7235 current_operation_id
=None,
7238 stage
[1] = "Waiting for instantiate pending tasks."
7239 self
.logger
.debug(logging_text
+ stage
[1])
7240 exc
= await self
._wait
_for
_tasks
(
7243 self
.timeout_ns_deploy
,
7251 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7252 nslcmop_operation_state
= "FAILED"
7254 db_nsr_update
["operational-status"] = old_operational_status
7255 db_nsr_update
["config-status"] = old_config_status
7256 db_nsr_update
["detailed-status"] = ""
7258 if "VCA" in scale_process
:
7259 db_nsr_update
["config-status"] = "failed"
7260 if "RO" in scale_process
:
7261 db_nsr_update
["operational-status"] = "failed"
7264 ] = "FAILED scaling nslcmop={} {}: {}".format(
7265 nslcmop_id
, step
, exc
7268 error_description_nslcmop
= None
7269 nslcmop_operation_state
= "COMPLETED"
7270 db_nslcmop_update
["detailed-status"] = "Done"
7272 self
._write
_op
_status
(
7275 error_message
=error_description_nslcmop
,
7276 operation_state
=nslcmop_operation_state
,
7277 other_update
=db_nslcmop_update
,
7280 self
._write
_ns
_status
(
7283 current_operation
="IDLE",
7284 current_operation_id
=None,
7285 other_update
=db_nsr_update
,
7288 if nslcmop_operation_state
:
7292 "nslcmop_id": nslcmop_id
,
7293 "operationState": nslcmop_operation_state
,
7295 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7296 except Exception as e
:
7298 logging_text
+ "kafka_write notification Exception {}".format(e
)
7300 self
.logger
.debug(logging_text
+ "Exit")
7301 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7303 async def _scale_kdu(
7304 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7306 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7307 for kdu_name
in _scaling_info
:
7308 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7309 deployed_kdu
, index
= get_deployed_kdu(
7310 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7312 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7313 kdu_instance
= deployed_kdu
["kdu-instance"]
7314 kdu_model
= deployed_kdu
.get("kdu-model")
7315 scale
= int(kdu_scaling_info
["scale"])
7316 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7319 "collection": "nsrs",
7320 "filter": {"_id": nsr_id
},
7321 "path": "_admin.deployed.K8s.{}".format(index
),
7324 step
= "scaling application {}".format(
7325 kdu_scaling_info
["resource-name"]
7327 self
.logger
.debug(logging_text
+ step
)
7329 if kdu_scaling_info
["type"] == "delete":
7330 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7333 and kdu_config
.get("terminate-config-primitive")
7334 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7336 terminate_config_primitive_list
= kdu_config
.get(
7337 "terminate-config-primitive"
7339 terminate_config_primitive_list
.sort(
7340 key
=lambda val
: int(val
["seq"])
7344 terminate_config_primitive
7345 ) in terminate_config_primitive_list
:
7346 primitive_params_
= self
._map
_primitive
_params
(
7347 terminate_config_primitive
, {}, {}
7349 step
= "execute terminate config primitive"
7350 self
.logger
.debug(logging_text
+ step
)
7351 await asyncio
.wait_for(
7352 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7353 cluster_uuid
=cluster_uuid
,
7354 kdu_instance
=kdu_instance
,
7355 primitive_name
=terminate_config_primitive
["name"],
7356 params
=primitive_params_
,
7358 total_timeout
=self
.timeout_primitive
,
7361 timeout
=self
.timeout_primitive
7362 * self
.timeout_primitive_outer_factor
,
7365 await asyncio
.wait_for(
7366 self
.k8scluster_map
[k8s_cluster_type
].scale(
7367 kdu_instance
=kdu_instance
,
7369 resource_name
=kdu_scaling_info
["resource-name"],
7370 total_timeout
=self
.timeout_scale_on_error
,
7372 cluster_uuid
=cluster_uuid
,
7373 kdu_model
=kdu_model
,
7377 timeout
=self
.timeout_scale_on_error
7378 * self
.timeout_scale_on_error_outer_factor
,
7381 if kdu_scaling_info
["type"] == "create":
7382 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7385 and kdu_config
.get("initial-config-primitive")
7386 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7388 initial_config_primitive_list
= kdu_config
.get(
7389 "initial-config-primitive"
7391 initial_config_primitive_list
.sort(
7392 key
=lambda val
: int(val
["seq"])
7395 for initial_config_primitive
in initial_config_primitive_list
:
7396 primitive_params_
= self
._map
_primitive
_params
(
7397 initial_config_primitive
, {}, {}
7399 step
= "execute initial config primitive"
7400 self
.logger
.debug(logging_text
+ step
)
7401 await asyncio
.wait_for(
7402 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7403 cluster_uuid
=cluster_uuid
,
7404 kdu_instance
=kdu_instance
,
7405 primitive_name
=initial_config_primitive
["name"],
7406 params
=primitive_params_
,
7413 async def _scale_ng_ro(
7414 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7416 nsr_id
= db_nslcmop
["nsInstanceId"]
7417 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7420 # read from db: vnfd's for every vnf
7423 # for each vnf in ns, read vnfd
7424 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7425 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7426 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7427 # if we haven't this vnfd, read it from db
7428 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7430 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7431 db_vnfds
.append(vnfd
)
7432 n2vc_key
= self
.n2vc
.get_public_key()
7433 n2vc_key_list
= [n2vc_key
]
7436 vdu_scaling_info
.get("vdu-create"),
7437 vdu_scaling_info
.get("vdu-delete"),
7440 # db_vnfr has been updated, update db_vnfrs to use it
7441 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7442 await self
._instantiate
_ng
_ro
(
7452 start_deploy
=time(),
7453 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7455 if vdu_scaling_info
.get("vdu-delete"):
7457 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7460 async def extract_prometheus_scrape_jobs(
7461 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7463 # look if exist a file called 'prometheus*.j2' and
7464 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7468 for f
in artifact_content
7469 if f
.startswith("prometheus") and f
.endswith(".j2")
7475 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7479 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7480 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7482 vnfr_id
= vnfr_id
.replace("-", "")
7484 "JOB_NAME": vnfr_id
,
7485 "TARGET_IP": target_ip
,
7486 "EXPORTER_POD_IP": host_name
,
7487 "EXPORTER_POD_PORT": host_port
,
7489 job_list
= parse_job(job_data
, variables
)
7490 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7491 for job
in job_list
:
7493 not isinstance(job
.get("job_name"), str)
7494 or vnfr_id
not in job
["job_name"]
7496 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7497 job
["nsr_id"] = nsr_id
7498 job
["vnfr_id"] = vnfr_id
7501 async def rebuild_start_stop(
7502 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7504 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7505 self
.logger
.info(logging_text
+ "Enter")
7506 stage
= ["Preparing the environment", ""]
7507 # database nsrs record
7511 # in case of error, indicates what part of scale was failed to put nsr at error status
7512 start_deploy
= time()
7514 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7515 vim_account_id
= db_vnfr
.get("vim-account-id")
7516 vim_info_key
= "vim:" + vim_account_id
7517 vdu_id
= additional_param
["vdu_id"]
7518 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7519 vdur
= find_in_list(
7520 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7523 vdu_vim_name
= vdur
["name"]
7524 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7525 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7527 raise LcmException("Target vdu is not found")
7528 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7529 # wait for any previous tasks in process
7530 stage
[1] = "Waiting for previous operations to terminate"
7531 self
.logger
.info(stage
[1])
7532 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7534 stage
[1] = "Reading from database."
7535 self
.logger
.info(stage
[1])
7536 self
._write
_ns
_status
(
7539 current_operation
=operation_type
.upper(),
7540 current_operation_id
=nslcmop_id
,
7542 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7545 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7546 db_nsr_update
["operational-status"] = operation_type
7547 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7551 "vim_vm_id": vim_vm_id
,
7553 "vdu_index": additional_param
["count-index"],
7554 "vdu_id": vdur
["id"],
7555 "target_vim": target_vim
,
7556 "vim_account_id": vim_account_id
,
7559 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7560 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7561 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7562 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7563 self
.logger
.info("response from RO: {}".format(result_dict
))
7564 action_id
= result_dict
["action_id"]
7565 await self
._wait
_ng
_ro
(
7570 self
.timeout_operate
,
7572 "start_stop_rebuild",
7574 return "COMPLETED", "Done"
7575 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7576 self
.logger
.error("Exit Exception {}".format(e
))
7578 except asyncio
.CancelledError
:
7579 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7580 exc
= "Operation was cancelled"
7581 except Exception as e
:
7582 exc
= traceback
.format_exc()
7583 self
.logger
.critical(
7584 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7586 return "FAILED", "Error in operate VNF {}".format(exc
)
7588 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7590 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7592 :param: vim_account_id: VIM Account ID
7594 :return: (cloud_name, cloud_credential)
7596 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7597 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7599 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7601 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7603 :param: vim_account_id: VIM Account ID
7605 :return: (cloud_name, cloud_credential)
7607 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7608 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7610 async def migrate(self
, nsr_id
, nslcmop_id
):
7612 Migrate VNFs and VDUs instances in a NS
7614 :param: nsr_id: NS Instance ID
7615 :param: nslcmop_id: nslcmop ID of migrate
7618 # Try to lock HA task here
7619 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7620 if not task_is_locked_by_me
:
7622 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7623 self
.logger
.debug(logging_text
+ "Enter")
7624 # get all needed from database
7626 db_nslcmop_update
= {}
7627 nslcmop_operation_state
= None
7631 # in case of error, indicates what part of scale was failed to put nsr at error status
7632 start_deploy
= time()
7635 # wait for any previous tasks in process
7636 step
= "Waiting for previous operations to terminate"
7637 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7639 self
._write
_ns
_status
(
7642 current_operation
="MIGRATING",
7643 current_operation_id
=nslcmop_id
,
7645 step
= "Getting nslcmop from database"
7647 step
+ " after having waited for previous tasks to be completed"
7649 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7650 migrate_params
= db_nslcmop
.get("operationParams")
7653 target
.update(migrate_params
)
7654 desc
= await self
.RO
.migrate(nsr_id
, target
)
7655 self
.logger
.debug("RO return > {}".format(desc
))
7656 action_id
= desc
["action_id"]
7657 await self
._wait
_ng
_ro
(
7662 self
.timeout_migrate
,
7663 operation
="migrate",
7665 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7666 self
.logger
.error("Exit Exception {}".format(e
))
7668 except asyncio
.CancelledError
:
7669 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7670 exc
= "Operation was cancelled"
7671 except Exception as e
:
7672 exc
= traceback
.format_exc()
7673 self
.logger
.critical(
7674 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7677 self
._write
_ns
_status
(
7680 current_operation
="IDLE",
7681 current_operation_id
=None,
7684 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7685 nslcmop_operation_state
= "FAILED"
7687 nslcmop_operation_state
= "COMPLETED"
7688 db_nslcmop_update
["detailed-status"] = "Done"
7689 db_nsr_update
["detailed-status"] = "Done"
7691 self
._write
_op
_status
(
7695 operation_state
=nslcmop_operation_state
,
7696 other_update
=db_nslcmop_update
,
7698 if nslcmop_operation_state
:
7702 "nslcmop_id": nslcmop_id
,
7703 "operationState": nslcmop_operation_state
,
7705 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7706 except Exception as e
:
7708 logging_text
+ "kafka_write notification Exception {}".format(e
)
7710 self
.logger
.debug(logging_text
+ "Exit")
7711 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7713 async def heal(self
, nsr_id
, nslcmop_id
):
7717 :param nsr_id: ns instance to heal
7718 :param nslcmop_id: operation to run
7722 # Try to lock HA task here
7723 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7724 if not task_is_locked_by_me
:
7727 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7728 stage
= ["", "", ""]
7729 tasks_dict_info
= {}
7730 # ^ stage, step, VIM progress
7731 self
.logger
.debug(logging_text
+ "Enter")
7732 # get all needed from database
7734 db_nslcmop_update
= {}
7736 db_vnfrs
= {} # vnf's info indexed by _id
7738 old_operational_status
= ""
7739 old_config_status
= ""
7742 # wait for any previous tasks in process
7743 step
= "Waiting for previous operations to terminate"
7744 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7745 self
._write
_ns
_status
(
7748 current_operation
="HEALING",
7749 current_operation_id
=nslcmop_id
,
7752 step
= "Getting nslcmop from database"
7754 step
+ " after having waited for previous tasks to be completed"
7756 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7758 step
= "Getting nsr from database"
7759 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7760 old_operational_status
= db_nsr
["operational-status"]
7761 old_config_status
= db_nsr
["config-status"]
7764 "_admin.deployed.RO.operational-status": "healing",
7766 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7768 step
= "Sending heal order to VIM"
7769 task_ro
= asyncio
.ensure_future(
7771 logging_text
=logging_text
,
7773 db_nslcmop
=db_nslcmop
,
7777 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7778 tasks_dict_info
[task_ro
] = "Healing at VIM"
7782 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7783 self
.logger
.debug(logging_text
+ stage
[1])
7784 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7785 self
.fs
.sync(db_nsr
["nsd-id"])
7787 # read from db: vnfr's of this ns
7788 step
= "Getting vnfrs from db"
7789 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7790 for vnfr
in db_vnfrs_list
:
7791 db_vnfrs
[vnfr
["_id"]] = vnfr
7792 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7794 # Check for each target VNF
7795 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7796 for target_vnf
in target_list
:
7797 # Find this VNF in the list from DB
7798 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7800 db_vnfr
= db_vnfrs
[vnfr_id
]
7801 vnfd_id
= db_vnfr
.get("vnfd-id")
7802 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7803 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7804 base_folder
= vnfd
["_admin"]["storage"]
7809 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7810 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7812 # Check each target VDU and deploy N2VC
7813 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7816 if not target_vdu_list
:
7817 # Codigo nuevo para crear diccionario
7818 target_vdu_list
= []
7819 for existing_vdu
in db_vnfr
.get("vdur"):
7820 vdu_name
= existing_vdu
.get("vdu-name", None)
7821 vdu_index
= existing_vdu
.get("count-index", 0)
7822 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7825 vdu_to_be_healed
= {
7827 "count-index": vdu_index
,
7828 "run-day1": vdu_run_day1
,
7830 target_vdu_list
.append(vdu_to_be_healed
)
7831 for target_vdu
in target_vdu_list
:
7832 deploy_params_vdu
= target_vdu
7833 # Set run-day1 vnf level value if not vdu level value exists
7834 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7837 deploy_params_vdu
["run-day1"] = target_vnf
[
7840 vdu_name
= target_vdu
.get("vdu-id", None)
7841 # TODO: Get vdu_id from vdud.
7843 # For multi instance VDU count-index is mandatory
7844 # For single session VDU count-indes is 0
7845 vdu_index
= target_vdu
.get("count-index", 0)
7847 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7848 stage
[1] = "Deploying Execution Environments."
7849 self
.logger
.debug(logging_text
+ stage
[1])
7851 # VNF Level charm. Normal case when proxy charms.
7852 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7853 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7854 if descriptor_config
:
7855 # Continue if healed machine is management machine
7856 vnf_ip_address
= db_vnfr
.get("ip-address")
7857 target_instance
= None
7858 for instance
in db_vnfr
.get("vdur", None):
7860 instance
["vdu-name"] == vdu_name
7861 and instance
["count-index"] == vdu_index
7863 target_instance
= instance
7865 if vnf_ip_address
== target_instance
.get("ip-address"):
7867 logging_text
=logging_text
7868 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7869 member_vnf_index
, vdu_name
, vdu_index
7873 nslcmop_id
=nslcmop_id
,
7879 member_vnf_index
=member_vnf_index
,
7882 deploy_params
=deploy_params_vdu
,
7883 descriptor_config
=descriptor_config
,
7884 base_folder
=base_folder
,
7885 task_instantiation_info
=tasks_dict_info
,
7889 # VDU Level charm. Normal case with native charms.
7890 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7891 if descriptor_config
:
7893 logging_text
=logging_text
7894 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7895 member_vnf_index
, vdu_name
, vdu_index
7899 nslcmop_id
=nslcmop_id
,
7905 member_vnf_index
=member_vnf_index
,
7906 vdu_index
=vdu_index
,
7908 deploy_params
=deploy_params_vdu
,
7909 descriptor_config
=descriptor_config
,
7910 base_folder
=base_folder
,
7911 task_instantiation_info
=tasks_dict_info
,
7916 ROclient
.ROClientException
,
7921 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7923 except asyncio
.CancelledError
:
7925 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7927 exc
= "Operation was cancelled"
7928 except Exception as e
:
7929 exc
= traceback
.format_exc()
7930 self
.logger
.critical(
7931 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7936 stage
[1] = "Waiting for healing pending tasks."
7937 self
.logger
.debug(logging_text
+ stage
[1])
7938 exc
= await self
._wait
_for
_tasks
(
7941 self
.timeout_ns_deploy
,
7949 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7950 nslcmop_operation_state
= "FAILED"
7952 db_nsr_update
["operational-status"] = old_operational_status
7953 db_nsr_update
["config-status"] = old_config_status
7956 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7957 for task
, task_name
in tasks_dict_info
.items():
7958 if not task
.done() or task
.cancelled() or task
.exception():
7959 if task_name
.startswith(self
.task_name_deploy_vca
):
7960 # A N2VC task is pending
7961 db_nsr_update
["config-status"] = "failed"
7963 # RO task is pending
7964 db_nsr_update
["operational-status"] = "failed"
7966 error_description_nslcmop
= None
7967 nslcmop_operation_state
= "COMPLETED"
7968 db_nslcmop_update
["detailed-status"] = "Done"
7969 db_nsr_update
["detailed-status"] = "Done"
7970 db_nsr_update
["operational-status"] = "running"
7971 db_nsr_update
["config-status"] = "configured"
7973 self
._write
_op
_status
(
7976 error_message
=error_description_nslcmop
,
7977 operation_state
=nslcmop_operation_state
,
7978 other_update
=db_nslcmop_update
,
7981 self
._write
_ns
_status
(
7984 current_operation
="IDLE",
7985 current_operation_id
=None,
7986 other_update
=db_nsr_update
,
7989 if nslcmop_operation_state
:
7993 "nslcmop_id": nslcmop_id
,
7994 "operationState": nslcmop_operation_state
,
7996 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7997 except Exception as e
:
7999 logging_text
+ "kafka_write notification Exception {}".format(e
)
8001 self
.logger
.debug(logging_text
+ "Exit")
8002 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8013 :param logging_text: preffix text to use at logging
8014 :param nsr_id: nsr identity
8015 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8016 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8017 :return: None or exception
8020 def get_vim_account(vim_account_id
):
8022 if vim_account_id
in db_vims
:
8023 return db_vims
[vim_account_id
]
8024 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8025 db_vims
[vim_account_id
] = db_vim
8030 ns_params
= db_nslcmop
.get("operationParams")
8031 if ns_params
and ns_params
.get("timeout_ns_heal"):
8032 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8034 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
8038 nslcmop_id
= db_nslcmop
["_id"]
8040 "action_id": nslcmop_id
,
8042 self
.logger
.warning(
8043 "db_nslcmop={} and timeout_ns_heal={}".format(
8044 db_nslcmop
, timeout_ns_heal
8047 target
.update(db_nslcmop
.get("operationParams", {}))
8049 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8050 desc
= await self
.RO
.recreate(nsr_id
, target
)
8051 self
.logger
.debug("RO return > {}".format(desc
))
8052 action_id
= desc
["action_id"]
8053 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8054 await self
._wait
_ng
_ro
(
8061 operation
="healing",
8066 "_admin.deployed.RO.operational-status": "running",
8067 "detailed-status": " ".join(stage
),
8069 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8070 self
._write
_op
_status
(nslcmop_id
, stage
)
8072 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8075 except Exception as e
:
8076 stage
[2] = "ERROR healing at VIM"
8077 # self.set_vnfr_at_error(db_vnfrs, str(e))
8079 "Error healing at VIM {}".format(e
),
8080 exc_info
=not isinstance(
8083 ROclient
.ROClientException
,
8109 task_instantiation_info
,
8112 # launch instantiate_N2VC in a asyncio task and register task object
8113 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8114 # if not found, create one entry and update database
8115 # fill db_nsr._admin.deployed.VCA.<index>
8118 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8122 get_charm_name
= False
8123 if "execution-environment-list" in descriptor_config
:
8124 ee_list
= descriptor_config
.get("execution-environment-list", [])
8125 elif "juju" in descriptor_config
:
8126 ee_list
= [descriptor_config
] # ns charms
8127 if "execution-environment-list" not in descriptor_config
:
8128 # charm name is only required for ns charms
8129 get_charm_name
= True
8130 else: # other types as script are not supported
8133 for ee_item
in ee_list
:
8136 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8137 ee_item
.get("juju"), ee_item
.get("helm-chart")
8140 ee_descriptor_id
= ee_item
.get("id")
8141 if ee_item
.get("juju"):
8142 vca_name
= ee_item
["juju"].get("charm")
8144 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8147 if ee_item
["juju"].get("charm") is not None
8150 if ee_item
["juju"].get("cloud") == "k8s":
8151 vca_type
= "k8s_proxy_charm"
8152 elif ee_item
["juju"].get("proxy") is False:
8153 vca_type
= "native_charm"
8154 elif ee_item
.get("helm-chart"):
8155 vca_name
= ee_item
["helm-chart"]
8156 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8159 vca_type
= "helm-v3"
8162 logging_text
+ "skipping non juju neither charm configuration"
8167 for vca_index
, vca_deployed
in enumerate(
8168 db_nsr
["_admin"]["deployed"]["VCA"]
8170 if not vca_deployed
:
8173 vca_deployed
.get("member-vnf-index") == member_vnf_index
8174 and vca_deployed
.get("vdu_id") == vdu_id
8175 and vca_deployed
.get("kdu_name") == kdu_name
8176 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8177 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8181 # not found, create one.
8183 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8186 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8188 target
+= "/kdu/{}".format(kdu_name
)
8190 "target_element": target
,
8191 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8192 "member-vnf-index": member_vnf_index
,
8194 "kdu_name": kdu_name
,
8195 "vdu_count_index": vdu_index
,
8196 "operational-status": "init", # TODO revise
8197 "detailed-status": "", # TODO revise
8198 "step": "initial-deploy", # TODO revise
8200 "vdu_name": vdu_name
,
8202 "ee_descriptor_id": ee_descriptor_id
,
8203 "charm_name": charm_name
,
8207 # create VCA and configurationStatus in db
8209 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8210 "configurationStatus.{}".format(vca_index
): dict(),
8212 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8214 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8216 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8217 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8218 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8221 task_n2vc
= asyncio
.ensure_future(
8223 logging_text
=logging_text
,
8224 vca_index
=vca_index
,
8230 vdu_index
=vdu_index
,
8231 deploy_params
=deploy_params
,
8232 config_descriptor
=descriptor_config
,
8233 base_folder
=base_folder
,
8234 nslcmop_id
=nslcmop_id
,
8238 ee_config_descriptor
=ee_item
,
8241 self
.lcm_tasks
.register(
8245 "instantiate_N2VC-{}".format(vca_index
),
8248 task_instantiation_info
[
8250 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8251 member_vnf_index
or "", vdu_id
or ""
8254 async def heal_N2VC(
8271 ee_config_descriptor
,
8273 nsr_id
= db_nsr
["_id"]
8274 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8275 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8276 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8277 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8279 "collection": "nsrs",
8280 "filter": {"_id": nsr_id
},
8281 "path": db_update_entry
,
8287 element_under_configuration
= nsr_id
8291 vnfr_id
= db_vnfr
["_id"]
8292 osm_config
["osm"]["vnf_id"] = vnfr_id
8294 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8296 if vca_type
== "native_charm":
8299 index_number
= vdu_index
or 0
8302 element_type
= "VNF"
8303 element_under_configuration
= vnfr_id
8304 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8306 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8307 element_type
= "VDU"
8308 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8309 osm_config
["osm"]["vdu_id"] = vdu_id
8311 namespace
+= ".{}".format(kdu_name
)
8312 element_type
= "KDU"
8313 element_under_configuration
= kdu_name
8314 osm_config
["osm"]["kdu_name"] = kdu_name
8317 if base_folder
["pkg-dir"]:
8318 artifact_path
= "{}/{}/{}/{}".format(
8319 base_folder
["folder"],
8320 base_folder
["pkg-dir"],
8323 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8328 artifact_path
= "{}/Scripts/{}/{}/".format(
8329 base_folder
["folder"],
8332 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8337 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8339 # get initial_config_primitive_list that applies to this element
8340 initial_config_primitive_list
= config_descriptor
.get(
8341 "initial-config-primitive"
8345 "Initial config primitive list > {}".format(
8346 initial_config_primitive_list
8350 # add config if not present for NS charm
8351 ee_descriptor_id
= ee_config_descriptor
.get("id")
8352 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8353 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8354 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8358 "Initial config primitive list #2 > {}".format(
8359 initial_config_primitive_list
8362 # n2vc_redesign STEP 3.1
8363 # find old ee_id if exists
8364 ee_id
= vca_deployed
.get("ee_id")
8366 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8367 # create or register execution environment in VCA. Only for native charms when healing
8368 if vca_type
== "native_charm":
8369 step
= "Waiting to VM being up and getting IP address"
8370 self
.logger
.debug(logging_text
+ step
)
8371 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8380 credentials
= {"hostname": rw_mgmt_ip
}
8382 username
= deep_get(
8383 config_descriptor
, ("config-access", "ssh-access", "default-user")
8385 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8386 # merged. Meanwhile let's get username from initial-config-primitive
8387 if not username
and initial_config_primitive_list
:
8388 for config_primitive
in initial_config_primitive_list
:
8389 for param
in config_primitive
.get("parameter", ()):
8390 if param
["name"] == "ssh-username":
8391 username
= param
["value"]
8395 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8396 "'config-access.ssh-access.default-user'"
8398 credentials
["username"] = username
8400 # n2vc_redesign STEP 3.2
8401 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8402 self
._write
_configuration
_status
(
8404 vca_index
=vca_index
,
8405 status
="REGISTERING",
8406 element_under_configuration
=element_under_configuration
,
8407 element_type
=element_type
,
8410 step
= "register execution environment {}".format(credentials
)
8411 self
.logger
.debug(logging_text
+ step
)
8412 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8413 credentials
=credentials
,
8414 namespace
=namespace
,
8419 # update ee_id en db
8421 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8423 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8425 # for compatibility with MON/POL modules, the need model and application name at database
8426 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8427 # Not sure if this need to be done when healing
8429 ee_id_parts = ee_id.split(".")
8430 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8431 if len(ee_id_parts) >= 2:
8432 model_name = ee_id_parts[0]
8433 application_name = ee_id_parts[1]
8434 db_nsr_update[db_update_entry + "model"] = model_name
8435 db_nsr_update[db_update_entry + "application"] = application_name
8438 # n2vc_redesign STEP 3.3
8439 # Install configuration software. Only for native charms.
8440 step
= "Install configuration Software"
8442 self
._write
_configuration
_status
(
8444 vca_index
=vca_index
,
8445 status
="INSTALLING SW",
8446 element_under_configuration
=element_under_configuration
,
8447 element_type
=element_type
,
8448 # other_update=db_nsr_update,
8452 # TODO check if already done
8453 self
.logger
.debug(logging_text
+ step
)
8455 if vca_type
== "native_charm":
8456 config_primitive
= next(
8457 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8460 if config_primitive
:
8461 config
= self
._map
_primitive
_params
(
8462 config_primitive
, {}, deploy_params
8464 await self
.vca_map
[vca_type
].install_configuration_sw(
8466 artifact_path
=artifact_path
,
8474 # write in db flag of configuration_sw already installed
8476 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8479 # Not sure if this need to be done when healing
8481 # add relations for this VCA (wait for other peers related with this VCA)
8482 await self._add_vca_relations(
8483 logging_text=logging_text,
8486 vca_index=vca_index,
8490 # if SSH access is required, then get execution environment SSH public
8491 # if native charm we have waited already to VM be UP
8492 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8495 # self.logger.debug("get ssh key block")
8497 config_descriptor
, ("config-access", "ssh-access", "required")
8499 # self.logger.debug("ssh key needed")
8500 # Needed to inject a ssh key
8503 ("config-access", "ssh-access", "default-user"),
8505 step
= "Install configuration Software, getting public ssh key"
8506 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8507 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8510 step
= "Insert public key into VM user={} ssh_key={}".format(
8514 # self.logger.debug("no need to get ssh key")
8515 step
= "Waiting to VM being up and getting IP address"
8516 self
.logger
.debug(logging_text
+ step
)
8518 # n2vc_redesign STEP 5.1
8519 # wait for RO (ip-address) Insert pub_key into VM
8520 # IMPORTANT: We need do wait for RO to complete healing operation.
8521 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8524 rw_mgmt_ip
= await self
.wait_kdu_up(
8525 logging_text
, nsr_id
, vnfr_id
, kdu_name
8528 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8538 rw_mgmt_ip
= None # This is for a NS configuration
8540 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8542 # store rw_mgmt_ip in deploy params for later replacement
8543 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8546 # get run-day1 operation parameter
8547 runDay1
= deploy_params
.get("run-day1", False)
8549 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8552 # n2vc_redesign STEP 6 Execute initial config primitive
8553 step
= "execute initial config primitive"
8555 # wait for dependent primitives execution (NS -> VNF -> VDU)
8556 if initial_config_primitive_list
:
8557 await self
._wait
_dependent
_n
2vc
(
8558 nsr_id
, vca_deployed_list
, vca_index
8561 # stage, in function of element type: vdu, kdu, vnf or ns
8562 my_vca
= vca_deployed_list
[vca_index
]
8563 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8565 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8566 elif my_vca
.get("member-vnf-index"):
8568 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8571 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8573 self
._write
_configuration
_status
(
8574 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8577 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8579 check_if_terminated_needed
= True
8580 for initial_config_primitive
in initial_config_primitive_list
:
8581 # adding information on the vca_deployed if it is a NS execution environment
8582 if not vca_deployed
["member-vnf-index"]:
8583 deploy_params
["ns_config_info"] = json
.dumps(
8584 self
._get
_ns
_config
_info
(nsr_id
)
8586 # TODO check if already done
8587 primitive_params_
= self
._map
_primitive
_params
(
8588 initial_config_primitive
, {}, deploy_params
8591 step
= "execute primitive '{}' params '{}'".format(
8592 initial_config_primitive
["name"], primitive_params_
8594 self
.logger
.debug(logging_text
+ step
)
8595 await self
.vca_map
[vca_type
].exec_primitive(
8597 primitive_name
=initial_config_primitive
["name"],
8598 params_dict
=primitive_params_
,
8603 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8604 if check_if_terminated_needed
:
8605 if config_descriptor
.get("terminate-config-primitive"):
8609 {db_update_entry
+ "needed_terminate": True},
8611 check_if_terminated_needed
= False
8613 # TODO register in database that primitive is done
8615 # STEP 7 Configure metrics
8616 # Not sure if this need to be done when healing
8618 if vca_type == "helm" or vca_type == "helm-v3":
8619 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8621 artifact_path=artifact_path,
8622 ee_config_descriptor=ee_config_descriptor,
8625 target_ip=rw_mgmt_ip,
8631 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8634 for job in prometheus_jobs:
8637 {"job_name": job["job_name"]},
8640 fail_on_empty=False,
8644 step
= "instantiated at VCA"
8645 self
.logger
.debug(logging_text
+ step
)
8647 self
._write
_configuration
_status
(
8648 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8651 except Exception as e
: # TODO not use Exception but N2VC exception
8652 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8654 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8657 "Exception while {} : {}".format(step
, e
), exc_info
=True
8659 self
._write
_configuration
_status
(
8660 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8662 raise LcmException("{} {}".format(step
, e
)) from e
8664 async def _wait_heal_ro(
8670 while time() <= start_time
+ timeout
:
8671 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8672 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8673 "operational-status"
8675 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8676 if operational_status_ro
!= "healing":
8678 await asyncio
.sleep(15, loop
=self
.loop
)
8679 else: # timeout_ns_deploy
8680 raise NgRoException("Timeout waiting ns to deploy")
8682 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8684 Vertical Scale the VDUs in a NS
8686 :param: nsr_id: NS Instance ID
8687 :param: nslcmop_id: nslcmop ID of migrate
8690 # Try to lock HA task here
8691 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8692 if not task_is_locked_by_me
:
8694 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8695 self
.logger
.debug(logging_text
+ "Enter")
8696 # get all needed from database
8698 db_nslcmop_update
= {}
8699 nslcmop_operation_state
= None
8703 # in case of error, indicates what part of scale was failed to put nsr at error status
8704 start_deploy
= time()
8707 # wait for any previous tasks in process
8708 step
= "Waiting for previous operations to terminate"
8709 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8711 self
._write
_ns
_status
(
8714 current_operation
="VerticalScale",
8715 current_operation_id
=nslcmop_id
,
8717 step
= "Getting nslcmop from database"
8719 step
+ " after having waited for previous tasks to be completed"
8721 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8722 operationParams
= db_nslcmop
.get("operationParams")
8724 target
.update(operationParams
)
8725 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8726 self
.logger
.debug("RO return > {}".format(desc
))
8727 action_id
= desc
["action_id"]
8728 await self
._wait
_ng
_ro
(
8733 self
.timeout_verticalscale
,
8734 operation
="verticalscale",
8736 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8737 self
.logger
.error("Exit Exception {}".format(e
))
8739 except asyncio
.CancelledError
:
8740 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8741 exc
= "Operation was cancelled"
8742 except Exception as e
:
8743 exc
= traceback
.format_exc()
8744 self
.logger
.critical(
8745 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8748 self
._write
_ns
_status
(
8751 current_operation
="IDLE",
8752 current_operation_id
=None,
8755 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8756 nslcmop_operation_state
= "FAILED"
8758 nslcmop_operation_state
= "COMPLETED"
8759 db_nslcmop_update
["detailed-status"] = "Done"
8760 db_nsr_update
["detailed-status"] = "Done"
8762 self
._write
_op
_status
(
8766 operation_state
=nslcmop_operation_state
,
8767 other_update
=db_nslcmop_update
,
8769 if nslcmop_operation_state
:
8773 "nslcmop_id": nslcmop_id
,
8774 "operationState": nslcmop_operation_state
,
8776 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8777 except Exception as e
:
8779 logging_text
+ "kafka_write notification Exception {}".format(e
)
8781 self
.logger
.debug(logging_text
+ "Exit")
8782 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")