1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
64 from osm_lcm
.data_utils
.nsd
import (
65 get_ns_configuration_relation_list
,
69 from osm_lcm
.data_utils
.vnfd
import (
75 get_ee_sorted_initial_config_primitive_list
,
76 get_ee_sorted_terminate_config_primitive_list
,
78 get_virtual_link_profiles
,
83 get_number_of_instances
,
85 get_kdu_resource_profile
,
86 find_software_version
,
88 from osm_lcm
.data_utils
.list_utils
import find_in_list
89 from osm_lcm
.data_utils
.vnfr
import (
93 get_volumes_from_instantiation_params
,
95 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
96 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
97 from n2vc
.definitions
import RelationEndpoint
98 from n2vc
.k8s_helm_conn
import K8sHelmConnector
99 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
100 from n2vc
.k8s_juju_conn
import K8sJujuConnector
102 from osm_common
.dbbase
import DbException
103 from osm_common
.fsbase
import FsException
105 from osm_lcm
.data_utils
.database
.database
import Database
106 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from osm_lcm
.data_utils
.wim
import (
109 get_target_wim_attrs
,
110 select_feasible_wim_account
,
113 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
114 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
116 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
117 from osm_lcm
.osm_config
import OsmConfigBuilder
118 from osm_lcm
.prometheus
import parse_job
120 from copy
import copy
, deepcopy
121 from time
import time
122 from uuid
import uuid4
124 from random
import randint
126 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
129 class NsLcm(LcmBase
):
130 timeout_scale_on_error
= (
132 ) # Time for charm from first time at blocked,error status to mark as failed
133 timeout_scale_on_error_outer_factor
= 1.05 # Factor in relation to timeout_scale_on_error related to the timeout to be applied within the asyncio.wait_for coroutine
134 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
135 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
136 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
137 timeout_charm_delete
= 10 * 60
138 timeout_primitive
= 30 * 60 # Timeout for primitive execution
139 timeout_primitive_outer_factor
= 1.05 # Factor in relation to timeout_primitive related to the timeout to be applied within the asyncio.wait_for coroutine
140 timeout_ns_update
= 30 * 60 # timeout for ns update
141 timeout_progress_primitive
= (
143 ) # timeout for some progress in a primitive execution
144 timeout_migrate
= 1800 # default global timeout for migrating vnfs
145 timeout_operate
= 1800 # default global timeout for migrating vnfs
146 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
147 SUBOPERATION_STATUS_NOT_FOUND
= -1
148 SUBOPERATION_STATUS_NEW
= -2
149 SUBOPERATION_STATUS_SKIP
= -3
150 task_name_deploy_vca
= "Deploying VCA"
152 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
154 Init, Connect to database, filesystem storage, and messaging
155 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
158 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
160 self
.db
= Database().instance
.db
161 self
.fs
= Filesystem().instance
.fs
163 self
.lcm_tasks
= lcm_tasks
164 self
.timeout
= config
["timeout"]
165 self
.ro_config
= config
["ro_config"]
166 self
.ng_ro
= config
["ro_config"].get("ng")
167 self
.vca_config
= config
["VCA"].copy()
169 # create N2VC connector
170 self
.n2vc
= N2VCJujuConnector(
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
178 self
.conn_helm_ee
= LCMHelmConn(
181 vca_config
=self
.vca_config
,
182 on_update_db
=self
._on
_update
_n
2vc
_db
,
185 self
.k8sclusterhelm2
= K8sHelmConnector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helmpath"),
194 self
.k8sclusterhelm3
= K8sHelm3Connector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 helm_command
=self
.vca_config
.get("helm3path"),
203 self
.k8sclusterjuju
= K8sJujuConnector(
204 kubectl_command
=self
.vca_config
.get("kubectlpath"),
205 juju_command
=self
.vca_config
.get("jujupath"),
208 on_update_db
=self
._on
_update
_k
8s
_db
,
213 self
.k8scluster_map
= {
214 "helm-chart": self
.k8sclusterhelm2
,
215 "helm-chart-v3": self
.k8sclusterhelm3
,
216 "chart": self
.k8sclusterhelm3
,
217 "juju-bundle": self
.k8sclusterjuju
,
218 "juju": self
.k8sclusterjuju
,
222 "lxc_proxy_charm": self
.n2vc
,
223 "native_charm": self
.n2vc
,
224 "k8s_proxy_charm": self
.n2vc
,
225 "helm": self
.conn_helm_ee
,
226 "helm-v3": self
.conn_helm_ee
,
230 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
232 self
.op_status_map
= {
233 "instantiation": self
.RO
.status
,
234 "termination": self
.RO
.status
,
235 "migrate": self
.RO
.status
,
236 "healing": self
.RO
.recreate_status
,
237 "verticalscale": self
.RO
.status
,
238 "start_stop_rebuild": self
.RO
.status
,
242 def increment_ip_mac(ip_mac
, vm_index
=1):
243 if not isinstance(ip_mac
, str):
246 # try with ipv4 look for last dot
247 i
= ip_mac
.rfind(".")
250 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
251 # try with ipv6 or mac look for last colon. Operate in hex
252 i
= ip_mac
.rfind(":")
255 # format in hex, len can be 2 for mac or 4 for ipv6
256 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
257 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
263 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
265 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
268 # TODO filter RO descriptor fields...
272 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
273 db_dict
["deploymentStatus"] = ro_descriptor
274 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
276 except Exception as e
:
278 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
281 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
283 # remove last dot from path (if exists)
284 if path
.endswith("."):
287 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
288 # .format(table, filter, path, updated_data))
291 nsr_id
= filter.get("_id")
293 # read ns record from database
294 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
295 current_ns_status
= nsr
.get("nsState")
297 # get vca status for NS
298 status_dict
= await self
.n2vc
.get_status(
299 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
304 db_dict
["vcaStatus"] = status_dict
306 # update configurationStatus for this VCA
308 vca_index
= int(path
[path
.rfind(".") + 1 :])
311 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
313 vca_status
= vca_list
[vca_index
].get("status")
315 configuration_status_list
= nsr
.get("configurationStatus")
316 config_status
= configuration_status_list
[vca_index
].get("status")
318 if config_status
== "BROKEN" and vca_status
!= "failed":
319 db_dict
["configurationStatus"][vca_index
] = "READY"
320 elif config_status
!= "BROKEN" and vca_status
== "failed":
321 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
322 except Exception as e
:
323 # not update configurationStatus
324 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
326 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
327 # if nsState = 'DEGRADED' check if all is OK
329 if current_ns_status
in ("READY", "DEGRADED"):
330 error_description
= ""
332 if status_dict
.get("machines"):
333 for machine_id
in status_dict
.get("machines"):
334 machine
= status_dict
.get("machines").get(machine_id
)
335 # check machine agent-status
336 if machine
.get("agent-status"):
337 s
= machine
.get("agent-status").get("status")
340 error_description
+= (
341 "machine {} agent-status={} ; ".format(
345 # check machine instance status
346 if machine
.get("instance-status"):
347 s
= machine
.get("instance-status").get("status")
350 error_description
+= (
351 "machine {} instance-status={} ; ".format(
356 if status_dict
.get("applications"):
357 for app_id
in status_dict
.get("applications"):
358 app
= status_dict
.get("applications").get(app_id
)
359 # check application status
360 if app
.get("status"):
361 s
= app
.get("status").get("status")
364 error_description
+= (
365 "application {} status={} ; ".format(app_id
, s
)
368 if error_description
:
369 db_dict
["errorDescription"] = error_description
370 if current_ns_status
== "READY" and is_degraded
:
371 db_dict
["nsState"] = "DEGRADED"
372 if current_ns_status
== "DEGRADED" and not is_degraded
:
373 db_dict
["nsState"] = "READY"
376 self
.update_db_2("nsrs", nsr_id
, db_dict
)
378 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
380 except Exception as e
:
381 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
383 async def _on_update_k8s_db(
384 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
387 Updating vca status in NSR record
388 :param cluster_uuid: UUID of a k8s cluster
389 :param kdu_instance: The unique name of the KDU instance
390 :param filter: To get nsr_id
391 :cluster_type: The cluster type (juju, k8s)
395 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
396 # .format(cluster_uuid, kdu_instance, filter))
398 nsr_id
= filter.get("_id")
400 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
401 cluster_uuid
=cluster_uuid
,
402 kdu_instance
=kdu_instance
,
404 complete_status
=True,
410 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
413 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
417 self
.update_db_2("nsrs", nsr_id
, db_dict
)
418 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
420 except Exception as e
:
421 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
424 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
427 undefined
=StrictUndefined
,
428 autoescape
=select_autoescape(default_for_string
=True, default
=True),
430 template
= env
.from_string(cloud_init_text
)
431 return template
.render(additional_params
or {})
432 except UndefinedError
as e
:
434 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
435 "file, must be provided in the instantiation parameters inside the "
436 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
438 except (TemplateError
, TemplateNotFound
) as e
:
440 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
445 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
446 cloud_init_content
= cloud_init_file
= None
448 if vdu
.get("cloud-init-file"):
449 base_folder
= vnfd
["_admin"]["storage"]
450 if base_folder
["pkg-dir"]:
451 cloud_init_file
= "{}/{}/cloud_init/{}".format(
452 base_folder
["folder"],
453 base_folder
["pkg-dir"],
454 vdu
["cloud-init-file"],
457 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
458 base_folder
["folder"],
459 vdu
["cloud-init-file"],
461 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
462 cloud_init_content
= ci_file
.read()
463 elif vdu
.get("cloud-init"):
464 cloud_init_content
= vdu
["cloud-init"]
466 return cloud_init_content
467 except FsException
as e
:
469 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
470 vnfd
["id"], vdu
["id"], cloud_init_file
, e
474 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
476 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
478 additional_params
= vdur
.get("additionalParams")
479 return parse_yaml_strings(additional_params
)
481 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
483 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
484 :param vnfd: input vnfd
485 :param new_id: overrides vnf id if provided
486 :param additionalParams: Instantiation params for VNFs provided
487 :param nsrId: Id of the NSR
488 :return: copy of vnfd
490 vnfd_RO
= deepcopy(vnfd
)
491 # remove unused by RO configuration, monitoring, scaling and internal keys
492 vnfd_RO
.pop("_id", None)
493 vnfd_RO
.pop("_admin", None)
494 vnfd_RO
.pop("monitoring-param", None)
495 vnfd_RO
.pop("scaling-group-descriptor", None)
496 vnfd_RO
.pop("kdu", None)
497 vnfd_RO
.pop("k8s-cluster", None)
499 vnfd_RO
["id"] = new_id
501 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
502 for vdu
in get_iterable(vnfd_RO
, "vdu"):
503 vdu
.pop("cloud-init-file", None)
504 vdu
.pop("cloud-init", None)
508 def ip_profile_2_RO(ip_profile
):
509 RO_ip_profile
= deepcopy(ip_profile
)
510 if "dns-server" in RO_ip_profile
:
511 if isinstance(RO_ip_profile
["dns-server"], list):
512 RO_ip_profile
["dns-address"] = []
513 for ds
in RO_ip_profile
.pop("dns-server"):
514 RO_ip_profile
["dns-address"].append(ds
["address"])
516 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
517 if RO_ip_profile
.get("ip-version") == "ipv4":
518 RO_ip_profile
["ip-version"] = "IPv4"
519 if RO_ip_profile
.get("ip-version") == "ipv6":
520 RO_ip_profile
["ip-version"] = "IPv6"
521 if "dhcp-params" in RO_ip_profile
:
522 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
525 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
526 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
527 if db_vim
["_admin"]["operationalState"] != "ENABLED":
529 "VIM={} is not available. operationalState={}".format(
530 vim_account
, db_vim
["_admin"]["operationalState"]
533 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
536 def get_ro_wim_id_for_wim_account(self
, wim_account
):
537 if isinstance(wim_account
, str):
538 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
539 if db_wim
["_admin"]["operationalState"] != "ENABLED":
541 "WIM={} is not available. operationalState={}".format(
542 wim_account
, db_wim
["_admin"]["operationalState"]
545 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
550 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
552 db_vdu_push_list
= []
554 db_update
= {"_admin.modified": time()}
556 for vdu_id
, vdu_count
in vdu_create
.items():
560 for vdur
in reversed(db_vnfr
["vdur"])
561 if vdur
["vdu-id-ref"] == vdu_id
566 # Read the template saved in the db:
568 "No vdur in the database. Using the vdur-template to scale"
570 vdur_template
= db_vnfr
.get("vdur-template")
571 if not vdur_template
:
573 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
577 vdur
= vdur_template
[0]
578 # Delete a template from the database after using it
581 {"_id": db_vnfr
["_id"]},
583 pull
={"vdur-template": {"_id": vdur
["_id"]}},
585 for count
in range(vdu_count
):
586 vdur_copy
= deepcopy(vdur
)
587 vdur_copy
["status"] = "BUILD"
588 vdur_copy
["status-detailed"] = None
589 vdur_copy
["ip-address"] = None
590 vdur_copy
["_id"] = str(uuid4())
591 vdur_copy
["count-index"] += count
+ 1
592 vdur_copy
["id"] = "{}-{}".format(
593 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
595 vdur_copy
.pop("vim_info", None)
596 for iface
in vdur_copy
["interfaces"]:
597 if iface
.get("fixed-ip"):
598 iface
["ip-address"] = self
.increment_ip_mac(
599 iface
["ip-address"], count
+ 1
602 iface
.pop("ip-address", None)
603 if iface
.get("fixed-mac"):
604 iface
["mac-address"] = self
.increment_ip_mac(
605 iface
["mac-address"], count
+ 1
608 iface
.pop("mac-address", None)
612 ) # only first vdu can be managment of vnf
613 db_vdu_push_list
.append(vdur_copy
)
614 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
616 if len(db_vnfr
["vdur"]) == 1:
617 # The scale will move to 0 instances
619 "Scaling to 0 !, creating the template with the last vdur"
621 template_vdur
= [db_vnfr
["vdur"][0]]
622 for vdu_id
, vdu_count
in vdu_delete
.items():
624 indexes_to_delete
= [
626 for iv
in enumerate(db_vnfr
["vdur"])
627 if iv
[1]["vdu-id-ref"] == vdu_id
631 "vdur.{}.status".format(i
): "DELETING"
632 for i
in indexes_to_delete
[-vdu_count
:]
636 # it must be deleted one by one because common.db does not allow otherwise
639 for v
in reversed(db_vnfr
["vdur"])
640 if v
["vdu-id-ref"] == vdu_id
642 for vdu
in vdus_to_delete
[:vdu_count
]:
645 {"_id": db_vnfr
["_id"]},
647 pull
={"vdur": {"_id": vdu
["_id"]}},
651 db_push
["vdur"] = db_vdu_push_list
653 db_push
["vdur-template"] = template_vdur
656 db_vnfr
["vdur-template"] = template_vdur
657 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
658 # modify passed dictionary db_vnfr
659 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
660 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
662 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
664 Updates database nsr with the RO info for the created vld
665 :param ns_update_nsr: dictionary to be filled with the updated info
666 :param db_nsr: content of db_nsr. This is also modified
667 :param nsr_desc_RO: nsr descriptor from RO
668 :return: Nothing, LcmException is raised on errors
671 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
672 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
673 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
675 vld
["vim-id"] = net_RO
.get("vim_net_id")
676 vld
["name"] = net_RO
.get("vim_name")
677 vld
["status"] = net_RO
.get("status")
678 vld
["status-detailed"] = net_RO
.get("error_msg")
679 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
683 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
686 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
688 for db_vnfr
in db_vnfrs
.values():
689 vnfr_update
= {"status": "ERROR"}
690 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
691 if "status" not in vdur
:
692 vdur
["status"] = "ERROR"
693 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
695 vdur
["status-detailed"] = str(error_text
)
697 "vdur.{}.status-detailed".format(vdu_index
)
699 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
700 except DbException
as e
:
701 self
.logger
.error("Cannot update vnf. {}".format(e
))
703 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
705 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
706 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
707 :param nsr_desc_RO: nsr descriptor from RO
708 :return: Nothing, LcmException is raised on errors
710 for vnf_index
, db_vnfr
in db_vnfrs
.items():
711 for vnf_RO
in nsr_desc_RO
["vnfs"]:
712 if vnf_RO
["member_vnf_index"] != vnf_index
:
715 if vnf_RO
.get("ip_address"):
716 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
719 elif not db_vnfr
.get("ip-address"):
720 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
721 raise LcmExceptionNoMgmtIP(
722 "ns member_vnf_index '{}' has no IP address".format(
727 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
728 vdur_RO_count_index
= 0
729 if vdur
.get("pdu-type"):
731 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
732 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
734 if vdur
["count-index"] != vdur_RO_count_index
:
735 vdur_RO_count_index
+= 1
737 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
738 if vdur_RO
.get("ip_address"):
739 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
741 vdur
["ip-address"] = None
742 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
743 vdur
["name"] = vdur_RO
.get("vim_name")
744 vdur
["status"] = vdur_RO
.get("status")
745 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
746 for ifacer
in get_iterable(vdur
, "interfaces"):
747 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
748 if ifacer
["name"] == interface_RO
.get("internal_name"):
749 ifacer
["ip-address"] = interface_RO
.get(
752 ifacer
["mac-address"] = interface_RO
.get(
758 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
759 "from VIM info".format(
760 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
763 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
767 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
769 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
773 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
774 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
775 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
777 vld
["vim-id"] = net_RO
.get("vim_net_id")
778 vld
["name"] = net_RO
.get("vim_name")
779 vld
["status"] = net_RO
.get("status")
780 vld
["status-detailed"] = net_RO
.get("error_msg")
781 vnfr_update
["vld.{}".format(vld_index
)] = vld
785 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
790 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
795 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
800 def _get_ns_config_info(self
, nsr_id
):
802 Generates a mapping between vnf,vdu elements and the N2VC id
803 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
804 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
805 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
806 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
808 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
809 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
811 ns_config_info
= {"osm-config-mapping": mapping
}
812 for vca
in vca_deployed_list
:
813 if not vca
["member-vnf-index"]:
815 if not vca
["vdu_id"]:
816 mapping
[vca
["member-vnf-index"]] = vca
["application"]
820 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
822 ] = vca
["application"]
823 return ns_config_info
825 async def _instantiate_ng_ro(
842 def get_vim_account(vim_account_id
):
844 if vim_account_id
in db_vims
:
845 return db_vims
[vim_account_id
]
846 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
847 db_vims
[vim_account_id
] = db_vim
850 # modify target_vld info with instantiation parameters
851 def parse_vld_instantiation_params(
852 target_vim
, target_vld
, vld_params
, target_sdn
854 if vld_params
.get("ip-profile"):
855 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
858 if vld_params
.get("provider-network"):
859 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
862 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
863 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
867 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
868 # if wim_account_id is specified in vld_params, validate if it is feasible.
869 wim_account_id
, db_wim
= select_feasible_wim_account(
870 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
874 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
875 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
876 # update vld_params with correct WIM account Id
877 vld_params
["wimAccountId"] = wim_account_id
879 target_wim
= "wim:{}".format(wim_account_id
)
880 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
881 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
882 if len(sdn_ports
) > 0:
883 target_vld
["vim_info"][target_wim
] = target_wim_attrs
884 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
887 "Target VLD with WIM data: {:s}".format(str(target_vld
))
890 for param
in ("vim-network-name", "vim-network-id"):
891 if vld_params
.get(param
):
892 if isinstance(vld_params
[param
], dict):
893 for vim
, vim_net
in vld_params
[param
].items():
894 other_target_vim
= "vim:" + vim
896 target_vld
["vim_info"],
897 (other_target_vim
, param
.replace("-", "_")),
900 else: # isinstance str
901 target_vld
["vim_info"][target_vim
][
902 param
.replace("-", "_")
903 ] = vld_params
[param
]
904 if vld_params
.get("common_id"):
905 target_vld
["common_id"] = vld_params
.get("common_id")
907 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
908 def update_ns_vld_target(target
, ns_params
):
909 for vnf_params
in ns_params
.get("vnf", ()):
910 if vnf_params
.get("vimAccountId"):
914 for vnfr
in db_vnfrs
.values()
915 if vnf_params
["member-vnf-index"]
916 == vnfr
["member-vnf-index-ref"]
920 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
923 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
924 target_vld
= find_in_list(
925 get_iterable(vdur
, "interfaces"),
926 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
929 vld_params
= find_in_list(
930 get_iterable(ns_params
, "vld"),
931 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
935 if vnf_params
.get("vimAccountId") not in a_vld
.get(
938 target_vim_network_list
= [
939 v
for _
, v
in a_vld
.get("vim_info").items()
941 target_vim_network_name
= next(
943 item
.get("vim_network_name", "")
944 for item
in target_vim_network_list
949 target
["ns"]["vld"][a_index
].get("vim_info").update(
951 "vim:{}".format(vnf_params
["vimAccountId"]): {
952 "vim_network_name": target_vim_network_name
,
958 for param
in ("vim-network-name", "vim-network-id"):
959 if vld_params
.get(param
) and isinstance(
960 vld_params
[param
], dict
962 for vim
, vim_net
in vld_params
[
965 other_target_vim
= "vim:" + vim
967 target
["ns"]["vld"][a_index
].get(
972 param
.replace("-", "_"),
977 nslcmop_id
= db_nslcmop
["_id"]
979 "name": db_nsr
["name"],
982 "image": deepcopy(db_nsr
["image"]),
983 "flavor": deepcopy(db_nsr
["flavor"]),
984 "action_id": nslcmop_id
,
985 "cloud_init_content": {},
987 for image
in target
["image"]:
988 image
["vim_info"] = {}
989 for flavor
in target
["flavor"]:
990 flavor
["vim_info"] = {}
991 if db_nsr
.get("affinity-or-anti-affinity-group"):
992 target
["affinity-or-anti-affinity-group"] = deepcopy(
993 db_nsr
["affinity-or-anti-affinity-group"]
995 for affinity_or_anti_affinity_group
in target
[
996 "affinity-or-anti-affinity-group"
998 affinity_or_anti_affinity_group
["vim_info"] = {}
1000 if db_nslcmop
.get("lcmOperationType") != "instantiate":
1001 # get parameters of instantiation:
1002 db_nslcmop_instantiate
= self
.db
.get_list(
1005 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1006 "lcmOperationType": "instantiate",
1009 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1011 ns_params
= db_nslcmop
.get("operationParams")
1012 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1013 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1016 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1017 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1020 "name": vld
["name"],
1021 "mgmt-network": vld
.get("mgmt-network", False),
1022 "type": vld
.get("type"),
1025 "vim_network_name": vld
.get("vim-network-name"),
1026 "vim_account_id": ns_params
["vimAccountId"],
1030 # check if this network needs SDN assist
1031 if vld
.get("pci-interfaces"):
1032 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1033 sdnc_id
= db_vim
["config"].get("sdn-controller")
1035 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1036 target_sdn
= "sdn:{}".format(sdnc_id
)
1037 target_vld
["vim_info"][target_sdn
] = {
1039 "target_vim": target_vim
,
1041 "type": vld
.get("type"),
1044 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1045 for nsd_vnf_profile
in nsd_vnf_profiles
:
1046 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1047 if cp
["virtual-link-profile-id"] == vld
["id"]:
1049 "member_vnf:{}.{}".format(
1050 cp
["constituent-cpd-id"][0][
1051 "constituent-base-element-id"
1053 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1055 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1057 # check at nsd descriptor, if there is an ip-profile
1059 nsd_vlp
= find_in_list(
1060 get_virtual_link_profiles(nsd
),
1061 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1066 and nsd_vlp
.get("virtual-link-protocol-data")
1067 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1069 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1072 ip_profile_dest_data
= {}
1073 if "ip-version" in ip_profile_source_data
:
1074 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1077 if "cidr" in ip_profile_source_data
:
1078 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1081 if "gateway-ip" in ip_profile_source_data
:
1082 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1085 if "dhcp-enabled" in ip_profile_source_data
:
1086 ip_profile_dest_data
["dhcp-params"] = {
1087 "enabled": ip_profile_source_data
["dhcp-enabled"]
1089 vld_params
["ip-profile"] = ip_profile_dest_data
1091 # update vld_params with instantiation params
1092 vld_instantiation_params
= find_in_list(
1093 get_iterable(ns_params
, "vld"),
1094 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1096 if vld_instantiation_params
:
1097 vld_params
.update(vld_instantiation_params
)
1098 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1099 target
["ns"]["vld"].append(target_vld
)
1100 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1101 update_ns_vld_target(target
, ns_params
)
1103 for vnfr
in db_vnfrs
.values():
1104 vnfd
= find_in_list(
1105 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1107 vnf_params
= find_in_list(
1108 get_iterable(ns_params
, "vnf"),
1109 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1111 target_vnf
= deepcopy(vnfr
)
1112 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1113 for vld
in target_vnf
.get("vld", ()):
1114 # check if connected to a ns.vld, to fill target'
1115 vnf_cp
= find_in_list(
1116 vnfd
.get("int-virtual-link-desc", ()),
1117 lambda cpd
: cpd
.get("id") == vld
["id"],
1120 ns_cp
= "member_vnf:{}.{}".format(
1121 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1123 if cp2target
.get(ns_cp
):
1124 vld
["target"] = cp2target
[ns_cp
]
1127 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1129 # check if this network needs SDN assist
1131 if vld
.get("pci-interfaces"):
1132 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1133 sdnc_id
= db_vim
["config"].get("sdn-controller")
1135 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1136 target_sdn
= "sdn:{}".format(sdnc_id
)
1137 vld
["vim_info"][target_sdn
] = {
1139 "target_vim": target_vim
,
1141 "type": vld
.get("type"),
1144 # check at vnfd descriptor, if there is an ip-profile
1146 vnfd_vlp
= find_in_list(
1147 get_virtual_link_profiles(vnfd
),
1148 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1152 and vnfd_vlp
.get("virtual-link-protocol-data")
1153 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1155 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1158 ip_profile_dest_data
= {}
1159 if "ip-version" in ip_profile_source_data
:
1160 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1163 if "cidr" in ip_profile_source_data
:
1164 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1167 if "gateway-ip" in ip_profile_source_data
:
1168 ip_profile_dest_data
[
1170 ] = ip_profile_source_data
["gateway-ip"]
1171 if "dhcp-enabled" in ip_profile_source_data
:
1172 ip_profile_dest_data
["dhcp-params"] = {
1173 "enabled": ip_profile_source_data
["dhcp-enabled"]
1176 vld_params
["ip-profile"] = ip_profile_dest_data
1177 # update vld_params with instantiation params
1179 vld_instantiation_params
= find_in_list(
1180 get_iterable(vnf_params
, "internal-vld"),
1181 lambda i_vld
: i_vld
["name"] == vld
["id"],
1183 if vld_instantiation_params
:
1184 vld_params
.update(vld_instantiation_params
)
1185 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1188 for vdur
in target_vnf
.get("vdur", ()):
1189 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1190 continue # This vdu must not be created
1191 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1193 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1196 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1197 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1200 and vdu_configuration
.get("config-access")
1201 and vdu_configuration
.get("config-access").get("ssh-access")
1203 vdur
["ssh-keys"] = ssh_keys_all
1204 vdur
["ssh-access-required"] = vdu_configuration
[
1206 ]["ssh-access"]["required"]
1209 and vnf_configuration
.get("config-access")
1210 and vnf_configuration
.get("config-access").get("ssh-access")
1211 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1213 vdur
["ssh-keys"] = ssh_keys_all
1214 vdur
["ssh-access-required"] = vnf_configuration
[
1216 ]["ssh-access"]["required"]
1217 elif ssh_keys_instantiation
and find_in_list(
1218 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1220 vdur
["ssh-keys"] = ssh_keys_instantiation
1222 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1224 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1226 if vdud
.get("cloud-init-file"):
1227 vdur
["cloud-init"] = "{}:file:{}".format(
1228 vnfd
["_id"], vdud
.get("cloud-init-file")
1230 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1231 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1232 base_folder
= vnfd
["_admin"]["storage"]
1233 if base_folder
["pkg-dir"]:
1234 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1235 base_folder
["folder"],
1236 base_folder
["pkg-dir"],
1237 vdud
.get("cloud-init-file"),
1240 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1241 base_folder
["folder"],
1242 vdud
.get("cloud-init-file"),
1244 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1245 target
["cloud_init_content"][
1248 elif vdud
.get("cloud-init"):
1249 vdur
["cloud-init"] = "{}:vdu:{}".format(
1250 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1252 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1253 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1256 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1257 deploy_params_vdu
= self
._format
_additional
_params
(
1258 vdur
.get("additionalParams") or {}
1260 deploy_params_vdu
["OSM"] = get_osm_params(
1261 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1263 vdur
["additionalParams"] = deploy_params_vdu
1266 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1267 if target_vim
not in ns_flavor
["vim_info"]:
1268 ns_flavor
["vim_info"][target_vim
] = {}
1271 # in case alternative images are provided we must check if they should be applied
1272 # for the vim_type, modify the vim_type taking into account
1273 ns_image_id
= int(vdur
["ns-image-id"])
1274 if vdur
.get("alt-image-ids"):
1275 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1276 vim_type
= db_vim
["vim_type"]
1277 for alt_image_id
in vdur
.get("alt-image-ids"):
1278 ns_alt_image
= target
["image"][int(alt_image_id
)]
1279 if vim_type
== ns_alt_image
.get("vim-type"):
1280 # must use alternative image
1282 "use alternative image id: {}".format(alt_image_id
)
1284 ns_image_id
= alt_image_id
1285 vdur
["ns-image-id"] = ns_image_id
1287 ns_image
= target
["image"][int(ns_image_id
)]
1288 if target_vim
not in ns_image
["vim_info"]:
1289 ns_image
["vim_info"][target_vim
] = {}
1292 if vdur
.get("affinity-or-anti-affinity-group-id"):
1293 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1294 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1295 if target_vim
not in ns_ags
["vim_info"]:
1296 ns_ags
["vim_info"][target_vim
] = {}
1298 vdur
["vim_info"] = {target_vim
: {}}
1299 # instantiation parameters
1301 vdu_instantiation_params
= find_in_list(
1302 get_iterable(vnf_params
, "vdu"),
1303 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1305 if vdu_instantiation_params
:
1306 # Parse the vdu_volumes from the instantiation params
1307 vdu_volumes
= get_volumes_from_instantiation_params(
1308 vdu_instantiation_params
, vdud
1310 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1311 vdur_list
.append(vdur
)
1312 target_vnf
["vdur"] = vdur_list
1313 target
["vnf"].append(target_vnf
)
1315 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1316 desc
= await self
.RO
.deploy(nsr_id
, target
)
1317 self
.logger
.debug("RO return > {}".format(desc
))
1318 action_id
= desc
["action_id"]
1319 await self
._wait
_ng
_ro
(
1326 operation
="instantiation",
1331 "_admin.deployed.RO.operational-status": "running",
1332 "detailed-status": " ".join(stage
),
1334 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1335 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1336 self
._write
_op
_status
(nslcmop_id
, stage
)
1338 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1342 async def _wait_ng_ro(
1352 detailed_status_old
= None
1354 start_time
= start_time
or time()
1355 while time() <= start_time
+ timeout
:
1356 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1357 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1358 if desc_status
["status"] == "FAILED":
1359 raise NgRoException(desc_status
["details"])
1360 elif desc_status
["status"] == "BUILD":
1362 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1363 elif desc_status
["status"] == "DONE":
1365 stage
[2] = "Deployed at VIM"
1368 assert False, "ROclient.check_ns_status returns unknown {}".format(
1369 desc_status
["status"]
1371 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1372 detailed_status_old
= stage
[2]
1373 db_nsr_update
["detailed-status"] = " ".join(stage
)
1374 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1375 self
._write
_op
_status
(nslcmop_id
, stage
)
1376 await asyncio
.sleep(15, loop
=self
.loop
)
1377 else: # timeout_ns_deploy
1378 raise NgRoException("Timeout waiting ns to deploy")
1380 async def _terminate_ng_ro(
1381 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1386 start_deploy
= time()
1393 "action_id": nslcmop_id
,
1395 desc
= await self
.RO
.deploy(nsr_id
, target
)
1396 action_id
= desc
["action_id"]
1397 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1398 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1401 + "ns terminate action at RO. action_id={}".format(action_id
)
1405 delete_timeout
= 20 * 60 # 20 minutes
1406 await self
._wait
_ng
_ro
(
1413 operation
="termination",
1416 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1417 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1419 await self
.RO
.delete(nsr_id
)
1420 except Exception as e
:
1421 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1422 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1423 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1424 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1426 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1428 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1429 failed_detail
.append("delete conflict: {}".format(e
))
1432 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1435 failed_detail
.append("delete error: {}".format(e
))
1438 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1442 stage
[2] = "Error deleting from VIM"
1444 stage
[2] = "Deleted from VIM"
1445 db_nsr_update
["detailed-status"] = " ".join(stage
)
1446 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1447 self
._write
_op
_status
(nslcmop_id
, stage
)
1450 raise LcmException("; ".join(failed_detail
))
1453 async def instantiate_RO(
1467 :param logging_text: preffix text to use at logging
1468 :param nsr_id: nsr identity
1469 :param nsd: database content of ns descriptor
1470 :param db_nsr: database content of ns record
1471 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1473 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1474 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1475 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1476 :return: None or exception
1479 start_deploy
= time()
1480 ns_params
= db_nslcmop
.get("operationParams")
1481 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1482 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1484 timeout_ns_deploy
= self
.timeout
.get(
1485 "ns_deploy", self
.timeout_ns_deploy
1488 # Check for and optionally request placement optimization. Database will be updated if placement activated
1489 stage
[2] = "Waiting for Placement."
1490 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1491 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1492 for vnfr
in db_vnfrs
.values():
1493 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1496 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1498 return await self
._instantiate
_ng
_ro
(
1511 except Exception as e
:
1512 stage
[2] = "ERROR deploying at VIM"
1513 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1515 "Error deploying at VIM {}".format(e
),
1516 exc_info
=not isinstance(
1519 ROclient
.ROClientException
,
1528 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1530 Wait for kdu to be up, get ip address
1531 :param logging_text: prefix use for logging
1535 :return: IP address, K8s services
1538 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1541 while nb_tries
< 360:
1542 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1546 for x
in get_iterable(db_vnfr
, "kdur")
1547 if x
.get("kdu-name") == kdu_name
1553 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1555 if kdur
.get("status"):
1556 if kdur
["status"] in ("READY", "ENABLED"):
1557 return kdur
.get("ip-address"), kdur
.get("services")
1560 "target KDU={} is in error state".format(kdu_name
)
1563 await asyncio
.sleep(10, loop
=self
.loop
)
1565 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1567 async def wait_vm_up_insert_key_ro(
1568 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1571 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1572 :param logging_text: prefix use for logging
1577 :param pub_key: public ssh key to inject, None to skip
1578 :param user: user to apply the public ssh key
1582 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1586 target_vdu_id
= None
1592 if ro_retries
>= 360: # 1 hour
1594 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1597 await asyncio
.sleep(10, loop
=self
.loop
)
1600 if not target_vdu_id
:
1601 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1603 if not vdu_id
: # for the VNF case
1604 if db_vnfr
.get("status") == "ERROR":
1606 "Cannot inject ssh-key because target VNF is in error state"
1608 ip_address
= db_vnfr
.get("ip-address")
1614 for x
in get_iterable(db_vnfr
, "vdur")
1615 if x
.get("ip-address") == ip_address
1623 for x
in get_iterable(db_vnfr
, "vdur")
1624 if x
.get("vdu-id-ref") == vdu_id
1625 and x
.get("count-index") == vdu_index
1631 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1632 ): # If only one, this should be the target vdu
1633 vdur
= db_vnfr
["vdur"][0]
1636 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1637 vnfr_id
, vdu_id
, vdu_index
1640 # New generation RO stores information at "vim_info"
1643 if vdur
.get("vim_info"):
1645 t
for t
in vdur
["vim_info"]
1646 ) # there should be only one key
1647 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1649 vdur
.get("pdu-type")
1650 or vdur
.get("status") == "ACTIVE"
1651 or ng_ro_status
== "ACTIVE"
1653 ip_address
= vdur
.get("ip-address")
1656 target_vdu_id
= vdur
["vdu-id-ref"]
1657 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1659 "Cannot inject ssh-key because target VM is in error state"
1662 if not target_vdu_id
:
1665 # inject public key into machine
1666 if pub_key
and user
:
1667 self
.logger
.debug(logging_text
+ "Inserting RO key")
1668 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1669 if vdur
.get("pdu-type"):
1670 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1673 ro_vm_id
= "{}-{}".format(
1674 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1675 ) # TODO add vdu_index
1679 "action": "inject_ssh_key",
1683 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1685 desc
= await self
.RO
.deploy(nsr_id
, target
)
1686 action_id
= desc
["action_id"]
1687 await self
._wait
_ng
_ro
(
1688 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1692 # wait until NS is deployed at RO
1694 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1695 ro_nsr_id
= deep_get(
1696 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1700 result_dict
= await self
.RO
.create_action(
1702 item_id_name
=ro_nsr_id
,
1704 "add_public_key": pub_key
,
1709 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1710 if not result_dict
or not isinstance(result_dict
, dict):
1712 "Unknown response from RO when injecting key"
1714 for result
in result_dict
.values():
1715 if result
.get("vim_result") == 200:
1718 raise ROclient
.ROClientException(
1719 "error injecting key: {}".format(
1720 result
.get("description")
1724 except NgRoException
as e
:
1726 "Reaching max tries injecting key. Error: {}".format(e
)
1728 except ROclient
.ROClientException
as e
:
1732 + "error injecting key: {}. Retrying until {} seconds".format(
1739 "Reaching max tries injecting key. Error: {}".format(e
)
1746 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1748 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1750 my_vca
= vca_deployed_list
[vca_index
]
1751 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1752 # vdu or kdu: no dependencies
1756 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1757 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1758 configuration_status_list
= db_nsr
["configurationStatus"]
1759 for index
, vca_deployed
in enumerate(configuration_status_list
):
1760 if index
== vca_index
:
1763 if not my_vca
.get("member-vnf-index") or (
1764 vca_deployed
.get("member-vnf-index")
1765 == my_vca
.get("member-vnf-index")
1767 internal_status
= configuration_status_list
[index
].get("status")
1768 if internal_status
== "READY":
1770 elif internal_status
== "BROKEN":
1772 "Configuration aborted because dependent charm/s has failed"
1777 # no dependencies, return
1779 await asyncio
.sleep(10)
1782 raise LcmException("Configuration aborted because dependent charm/s timeout")
1784 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1787 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1789 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1790 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1793 async def instantiate_N2VC(
1810 ee_config_descriptor
,
1812 nsr_id
= db_nsr
["_id"]
1813 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1814 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1815 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1816 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1818 "collection": "nsrs",
1819 "filter": {"_id": nsr_id
},
1820 "path": db_update_entry
,
1826 element_under_configuration
= nsr_id
1830 vnfr_id
= db_vnfr
["_id"]
1831 osm_config
["osm"]["vnf_id"] = vnfr_id
1833 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1835 if vca_type
== "native_charm":
1838 index_number
= vdu_index
or 0
1841 element_type
= "VNF"
1842 element_under_configuration
= vnfr_id
1843 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1845 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1846 element_type
= "VDU"
1847 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1848 osm_config
["osm"]["vdu_id"] = vdu_id
1850 namespace
+= ".{}".format(kdu_name
)
1851 element_type
= "KDU"
1852 element_under_configuration
= kdu_name
1853 osm_config
["osm"]["kdu_name"] = kdu_name
1856 if base_folder
["pkg-dir"]:
1857 artifact_path
= "{}/{}/{}/{}".format(
1858 base_folder
["folder"],
1859 base_folder
["pkg-dir"],
1862 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1867 artifact_path
= "{}/Scripts/{}/{}/".format(
1868 base_folder
["folder"],
1871 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1876 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1878 # get initial_config_primitive_list that applies to this element
1879 initial_config_primitive_list
= config_descriptor
.get(
1880 "initial-config-primitive"
1884 "Initial config primitive list > {}".format(
1885 initial_config_primitive_list
1889 # add config if not present for NS charm
1890 ee_descriptor_id
= ee_config_descriptor
.get("id")
1891 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1892 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1893 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1897 "Initial config primitive list #2 > {}".format(
1898 initial_config_primitive_list
1901 # n2vc_redesign STEP 3.1
1902 # find old ee_id if exists
1903 ee_id
= vca_deployed
.get("ee_id")
1905 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1906 # create or register execution environment in VCA
1907 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1909 self
._write
_configuration
_status
(
1911 vca_index
=vca_index
,
1913 element_under_configuration
=element_under_configuration
,
1914 element_type
=element_type
,
1917 step
= "create execution environment"
1918 self
.logger
.debug(logging_text
+ step
)
1922 if vca_type
== "k8s_proxy_charm":
1923 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1924 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1925 namespace
=namespace
,
1926 artifact_path
=artifact_path
,
1930 elif vca_type
== "helm" or vca_type
== "helm-v3":
1931 ee_id
, credentials
= await self
.vca_map
[
1933 ].create_execution_environment(
1934 namespace
=namespace
,
1938 artifact_path
=artifact_path
,
1939 chart_model
=vca_name
,
1943 ee_id
, credentials
= await self
.vca_map
[
1945 ].create_execution_environment(
1946 namespace
=namespace
,
1952 elif vca_type
== "native_charm":
1953 step
= "Waiting to VM being up and getting IP address"
1954 self
.logger
.debug(logging_text
+ step
)
1955 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1964 credentials
= {"hostname": rw_mgmt_ip
}
1966 username
= deep_get(
1967 config_descriptor
, ("config-access", "ssh-access", "default-user")
1969 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1970 # merged. Meanwhile let's get username from initial-config-primitive
1971 if not username
and initial_config_primitive_list
:
1972 for config_primitive
in initial_config_primitive_list
:
1973 for param
in config_primitive
.get("parameter", ()):
1974 if param
["name"] == "ssh-username":
1975 username
= param
["value"]
1979 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1980 "'config-access.ssh-access.default-user'"
1982 credentials
["username"] = username
1983 # n2vc_redesign STEP 3.2
1985 self
._write
_configuration
_status
(
1987 vca_index
=vca_index
,
1988 status
="REGISTERING",
1989 element_under_configuration
=element_under_configuration
,
1990 element_type
=element_type
,
1993 step
= "register execution environment {}".format(credentials
)
1994 self
.logger
.debug(logging_text
+ step
)
1995 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1996 credentials
=credentials
,
1997 namespace
=namespace
,
2002 # for compatibility with MON/POL modules, the need model and application name at database
2003 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
2004 ee_id_parts
= ee_id
.split(".")
2005 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
2006 if len(ee_id_parts
) >= 2:
2007 model_name
= ee_id_parts
[0]
2008 application_name
= ee_id_parts
[1]
2009 db_nsr_update
[db_update_entry
+ "model"] = model_name
2010 db_nsr_update
[db_update_entry
+ "application"] = application_name
2012 # n2vc_redesign STEP 3.3
2013 step
= "Install configuration Software"
2015 self
._write
_configuration
_status
(
2017 vca_index
=vca_index
,
2018 status
="INSTALLING SW",
2019 element_under_configuration
=element_under_configuration
,
2020 element_type
=element_type
,
2021 other_update
=db_nsr_update
,
2024 # TODO check if already done
2025 self
.logger
.debug(logging_text
+ step
)
2027 if vca_type
== "native_charm":
2028 config_primitive
= next(
2029 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2032 if config_primitive
:
2033 config
= self
._map
_primitive
_params
(
2034 config_primitive
, {}, deploy_params
2037 if vca_type
== "lxc_proxy_charm":
2038 if element_type
== "NS":
2039 num_units
= db_nsr
.get("config-units") or 1
2040 elif element_type
== "VNF":
2041 num_units
= db_vnfr
.get("config-units") or 1
2042 elif element_type
== "VDU":
2043 for v
in db_vnfr
["vdur"]:
2044 if vdu_id
== v
["vdu-id-ref"]:
2045 num_units
= v
.get("config-units") or 1
2047 if vca_type
!= "k8s_proxy_charm":
2048 await self
.vca_map
[vca_type
].install_configuration_sw(
2050 artifact_path
=artifact_path
,
2053 num_units
=num_units
,
2058 # write in db flag of configuration_sw already installed
2060 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2063 # add relations for this VCA (wait for other peers related with this VCA)
2064 await self
._add
_vca
_relations
(
2065 logging_text
=logging_text
,
2068 vca_index
=vca_index
,
2071 # if SSH access is required, then get execution environment SSH public
2072 # if native charm we have waited already to VM be UP
2073 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2076 # self.logger.debug("get ssh key block")
2078 config_descriptor
, ("config-access", "ssh-access", "required")
2080 # self.logger.debug("ssh key needed")
2081 # Needed to inject a ssh key
2084 ("config-access", "ssh-access", "default-user"),
2086 step
= "Install configuration Software, getting public ssh key"
2087 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2088 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2091 step
= "Insert public key into VM user={} ssh_key={}".format(
2095 # self.logger.debug("no need to get ssh key")
2096 step
= "Waiting to VM being up and getting IP address"
2097 self
.logger
.debug(logging_text
+ step
)
2099 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2102 # n2vc_redesign STEP 5.1
2103 # wait for RO (ip-address) Insert pub_key into VM
2106 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2107 logging_text
, nsr_id
, vnfr_id
, kdu_name
2109 vnfd
= self
.db
.get_one(
2111 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2113 kdu
= get_kdu(vnfd
, kdu_name
)
2115 service
["name"] for service
in get_kdu_services(kdu
)
2117 exposed_services
= []
2118 for service
in services
:
2119 if any(s
in service
["name"] for s
in kdu_services
):
2120 exposed_services
.append(service
)
2121 await self
.vca_map
[vca_type
].exec_primitive(
2123 primitive_name
="config",
2125 "osm-config": json
.dumps(
2127 k8s
={"services": exposed_services
}
2134 # This verification is needed in order to avoid trying to add a public key
2135 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2136 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2137 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2139 elif db_vnfr
.get("vdur"):
2140 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2150 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2152 # store rw_mgmt_ip in deploy params for later replacement
2153 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2155 # n2vc_redesign STEP 6 Execute initial config primitive
2156 step
= "execute initial config primitive"
2158 # wait for dependent primitives execution (NS -> VNF -> VDU)
2159 if initial_config_primitive_list
:
2160 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2162 # stage, in function of element type: vdu, kdu, vnf or ns
2163 my_vca
= vca_deployed_list
[vca_index
]
2164 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2166 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2167 elif my_vca
.get("member-vnf-index"):
2169 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2172 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2174 self
._write
_configuration
_status
(
2175 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2178 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2180 check_if_terminated_needed
= True
2181 for initial_config_primitive
in initial_config_primitive_list
:
2182 # adding information on the vca_deployed if it is a NS execution environment
2183 if not vca_deployed
["member-vnf-index"]:
2184 deploy_params
["ns_config_info"] = json
.dumps(
2185 self
._get
_ns
_config
_info
(nsr_id
)
2187 # TODO check if already done
2188 primitive_params_
= self
._map
_primitive
_params
(
2189 initial_config_primitive
, {}, deploy_params
2192 step
= "execute primitive '{}' params '{}'".format(
2193 initial_config_primitive
["name"], primitive_params_
2195 self
.logger
.debug(logging_text
+ step
)
2196 await self
.vca_map
[vca_type
].exec_primitive(
2198 primitive_name
=initial_config_primitive
["name"],
2199 params_dict
=primitive_params_
,
2204 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2205 if check_if_terminated_needed
:
2206 if config_descriptor
.get("terminate-config-primitive"):
2208 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2210 check_if_terminated_needed
= False
2212 # TODO register in database that primitive is done
2214 # STEP 7 Configure metrics
2215 if vca_type
== "helm" or vca_type
== "helm-v3":
2216 # TODO: review for those cases where the helm chart is a reference and
2217 # is not part of the NF package
2218 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2220 artifact_path
=artifact_path
,
2221 ee_config_descriptor
=ee_config_descriptor
,
2224 target_ip
=rw_mgmt_ip
,
2230 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2233 for job
in prometheus_jobs
:
2236 {"job_name": job
["job_name"]},
2239 fail_on_empty
=False,
2242 step
= "instantiated at VCA"
2243 self
.logger
.debug(logging_text
+ step
)
2245 self
._write
_configuration
_status
(
2246 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2249 except Exception as e
: # TODO not use Exception but N2VC exception
2250 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2252 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2255 "Exception while {} : {}".format(step
, e
), exc_info
=True
2257 self
._write
_configuration
_status
(
2258 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2260 raise LcmException("{} {}".format(step
, e
)) from e
2262 def _write_ns_status(
2266 current_operation
: str,
2267 current_operation_id
: str,
2268 error_description
: str = None,
2269 error_detail
: str = None,
2270 other_update
: dict = None,
2273 Update db_nsr fields.
2276 :param current_operation:
2277 :param current_operation_id:
2278 :param error_description:
2279 :param error_detail:
2280 :param other_update: Other required changes at database if provided, will be cleared
2284 db_dict
= other_update
or {}
2287 ] = current_operation_id
# for backward compatibility
2288 db_dict
["_admin.current-operation"] = current_operation_id
2289 db_dict
["_admin.operation-type"] = (
2290 current_operation
if current_operation
!= "IDLE" else None
2292 db_dict
["currentOperation"] = current_operation
2293 db_dict
["currentOperationID"] = current_operation_id
2294 db_dict
["errorDescription"] = error_description
2295 db_dict
["errorDetail"] = error_detail
2298 db_dict
["nsState"] = ns_state
2299 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2300 except DbException
as e
:
2301 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2303 def _write_op_status(
2307 error_message
: str = None,
2308 queuePosition
: int = 0,
2309 operation_state
: str = None,
2310 other_update
: dict = None,
2313 db_dict
= other_update
or {}
2314 db_dict
["queuePosition"] = queuePosition
2315 if isinstance(stage
, list):
2316 db_dict
["stage"] = stage
[0]
2317 db_dict
["detailed-status"] = " ".join(stage
)
2318 elif stage
is not None:
2319 db_dict
["stage"] = str(stage
)
2321 if error_message
is not None:
2322 db_dict
["errorMessage"] = error_message
2323 if operation_state
is not None:
2324 db_dict
["operationState"] = operation_state
2325 db_dict
["statusEnteredTime"] = time()
2326 self
.update_db_2("nslcmops", op_id
, db_dict
)
2327 except DbException
as e
:
2329 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2332 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2334 nsr_id
= db_nsr
["_id"]
2335 # configurationStatus
2336 config_status
= db_nsr
.get("configurationStatus")
2339 "configurationStatus.{}.status".format(index
): status
2340 for index
, v
in enumerate(config_status
)
2344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2346 except DbException
as e
:
2348 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2351 def _write_configuration_status(
2356 element_under_configuration
: str = None,
2357 element_type
: str = None,
2358 other_update
: dict = None,
2361 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2362 # .format(vca_index, status))
2365 db_path
= "configurationStatus.{}.".format(vca_index
)
2366 db_dict
= other_update
or {}
2368 db_dict
[db_path
+ "status"] = status
2369 if element_under_configuration
:
2371 db_path
+ "elementUnderConfiguration"
2372 ] = element_under_configuration
2374 db_dict
[db_path
+ "elementType"] = element_type
2375 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2376 except DbException
as e
:
2378 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2379 status
, nsr_id
, vca_index
, e
2383 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2385 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2386 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2387 Database is used because the result can be obtained from a different LCM worker in case of HA.
2388 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2389 :param db_nslcmop: database content of nslcmop
2390 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2391 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2392 computed 'vim-account-id'
2395 nslcmop_id
= db_nslcmop
["_id"]
2396 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2397 if placement_engine
== "PLA":
2399 logging_text
+ "Invoke and wait for placement optimization"
2401 await self
.msg
.aiowrite(
2402 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2404 db_poll_interval
= 5
2405 wait
= db_poll_interval
* 10
2407 while not pla_result
and wait
>= 0:
2408 await asyncio
.sleep(db_poll_interval
)
2409 wait
-= db_poll_interval
2410 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2411 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2415 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2418 for pla_vnf
in pla_result
["vnf"]:
2419 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2420 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2425 {"_id": vnfr
["_id"]},
2426 {"vim-account-id": pla_vnf
["vimAccountId"]},
2429 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2432 def update_nsrs_with_pla_result(self
, params
):
2434 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2436 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2438 except Exception as e
:
2439 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2441 async def instantiate(self
, nsr_id
, nslcmop_id
):
2444 :param nsr_id: ns instance to deploy
2445 :param nslcmop_id: operation to run
2449 # Try to lock HA task here
2450 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2451 if not task_is_locked_by_me
:
2453 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2457 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2458 self
.logger
.debug(logging_text
+ "Enter")
2460 # get all needed from database
2462 # database nsrs record
2465 # database nslcmops record
2468 # update operation on nsrs
2470 # update operation on nslcmops
2471 db_nslcmop_update
= {}
2473 nslcmop_operation_state
= None
2474 db_vnfrs
= {} # vnf's info indexed by member-index
2476 tasks_dict_info
= {} # from task to info text
2480 "Stage 1/5: preparation of the environment.",
2481 "Waiting for previous operations to terminate.",
2484 # ^ stage, step, VIM progress
2486 # wait for any previous tasks in process
2487 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2489 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2490 stage
[1] = "Reading from database."
2491 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2492 db_nsr_update
["detailed-status"] = "creating"
2493 db_nsr_update
["operational-status"] = "init"
2494 self
._write
_ns
_status
(
2496 ns_state
="BUILDING",
2497 current_operation
="INSTANTIATING",
2498 current_operation_id
=nslcmop_id
,
2499 other_update
=db_nsr_update
,
2501 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2503 # read from db: operation
2504 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2505 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2506 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2507 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2508 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2510 ns_params
= db_nslcmop
.get("operationParams")
2511 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2512 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2514 timeout_ns_deploy
= self
.timeout
.get(
2515 "ns_deploy", self
.timeout_ns_deploy
2519 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2520 self
.logger
.debug(logging_text
+ stage
[1])
2521 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2522 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2523 self
.logger
.debug(logging_text
+ stage
[1])
2524 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2525 self
.fs
.sync(db_nsr
["nsd-id"])
2527 # nsr_name = db_nsr["name"] # TODO short-name??
2529 # read from db: vnf's of this ns
2530 stage
[1] = "Getting vnfrs from db."
2531 self
.logger
.debug(logging_text
+ stage
[1])
2532 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2534 # read from db: vnfd's for every vnf
2535 db_vnfds
= [] # every vnfd data
2537 # for each vnf in ns, read vnfd
2538 for vnfr
in db_vnfrs_list
:
2539 if vnfr
.get("kdur"):
2541 for kdur
in vnfr
["kdur"]:
2542 if kdur
.get("additionalParams"):
2543 kdur
["additionalParams"] = json
.loads(
2544 kdur
["additionalParams"]
2546 kdur_list
.append(kdur
)
2547 vnfr
["kdur"] = kdur_list
2549 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2550 vnfd_id
= vnfr
["vnfd-id"]
2551 vnfd_ref
= vnfr
["vnfd-ref"]
2552 self
.fs
.sync(vnfd_id
)
2554 # if we haven't this vnfd, read it from db
2555 if vnfd_id
not in db_vnfds
:
2557 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2560 self
.logger
.debug(logging_text
+ stage
[1])
2561 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2564 db_vnfds
.append(vnfd
)
2566 # Get or generates the _admin.deployed.VCA list
2567 vca_deployed_list
= None
2568 if db_nsr
["_admin"].get("deployed"):
2569 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2570 if vca_deployed_list
is None:
2571 vca_deployed_list
= []
2572 configuration_status_list
= []
2573 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2574 db_nsr_update
["configurationStatus"] = configuration_status_list
2575 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2576 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2577 elif isinstance(vca_deployed_list
, dict):
2578 # maintain backward compatibility. Change a dict to list at database
2579 vca_deployed_list
= list(vca_deployed_list
.values())
2580 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2581 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2584 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2586 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2587 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2589 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2590 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2591 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2593 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2596 # n2vc_redesign STEP 2 Deploy Network Scenario
2597 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2598 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2600 stage
[1] = "Deploying KDUs."
2601 # self.logger.debug(logging_text + "Before deploy_kdus")
2602 # Call to deploy_kdus in case exists the "vdu:kdu" param
2603 await self
.deploy_kdus(
2604 logging_text
=logging_text
,
2606 nslcmop_id
=nslcmop_id
,
2609 task_instantiation_info
=tasks_dict_info
,
2612 stage
[1] = "Getting VCA public key."
2613 # n2vc_redesign STEP 1 Get VCA public ssh-key
2614 # feature 1429. Add n2vc public key to needed VMs
2615 n2vc_key
= self
.n2vc
.get_public_key()
2616 n2vc_key_list
= [n2vc_key
]
2617 if self
.vca_config
.get("public_key"):
2618 n2vc_key_list
.append(self
.vca_config
["public_key"])
2620 stage
[1] = "Deploying NS at VIM."
2621 task_ro
= asyncio
.ensure_future(
2622 self
.instantiate_RO(
2623 logging_text
=logging_text
,
2627 db_nslcmop
=db_nslcmop
,
2630 n2vc_key_list
=n2vc_key_list
,
2634 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2635 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2637 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2638 stage
[1] = "Deploying Execution Environments."
2639 self
.logger
.debug(logging_text
+ stage
[1])
2641 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2642 for vnf_profile
in get_vnf_profiles(nsd
):
2643 vnfd_id
= vnf_profile
["vnfd-id"]
2644 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2645 member_vnf_index
= str(vnf_profile
["id"])
2646 db_vnfr
= db_vnfrs
[member_vnf_index
]
2647 base_folder
= vnfd
["_admin"]["storage"]
2653 # Get additional parameters
2654 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2655 if db_vnfr
.get("additionalParamsForVnf"):
2656 deploy_params
.update(
2657 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2660 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2661 if descriptor_config
:
2663 logging_text
=logging_text
2664 + "member_vnf_index={} ".format(member_vnf_index
),
2667 nslcmop_id
=nslcmop_id
,
2673 member_vnf_index
=member_vnf_index
,
2674 vdu_index
=vdu_index
,
2676 deploy_params
=deploy_params
,
2677 descriptor_config
=descriptor_config
,
2678 base_folder
=base_folder
,
2679 task_instantiation_info
=tasks_dict_info
,
2683 # Deploy charms for each VDU that supports one.
2684 for vdud
in get_vdu_list(vnfd
):
2686 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2687 vdur
= find_in_list(
2688 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2691 if vdur
.get("additionalParams"):
2692 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2694 deploy_params_vdu
= deploy_params
2695 deploy_params_vdu
["OSM"] = get_osm_params(
2696 db_vnfr
, vdu_id
, vdu_count_index
=0
2698 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2700 self
.logger
.debug("VDUD > {}".format(vdud
))
2702 "Descriptor config > {}".format(descriptor_config
)
2704 if descriptor_config
:
2707 for vdu_index
in range(vdud_count
):
2708 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2710 logging_text
=logging_text
2711 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2712 member_vnf_index
, vdu_id
, vdu_index
2716 nslcmop_id
=nslcmop_id
,
2722 member_vnf_index
=member_vnf_index
,
2723 vdu_index
=vdu_index
,
2725 deploy_params
=deploy_params_vdu
,
2726 descriptor_config
=descriptor_config
,
2727 base_folder
=base_folder
,
2728 task_instantiation_info
=tasks_dict_info
,
2731 for kdud
in get_kdu_list(vnfd
):
2732 kdu_name
= kdud
["name"]
2733 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2734 if descriptor_config
:
2739 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2741 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2742 if kdur
.get("additionalParams"):
2743 deploy_params_kdu
.update(
2744 parse_yaml_strings(kdur
["additionalParams"].copy())
2748 logging_text
=logging_text
,
2751 nslcmop_id
=nslcmop_id
,
2757 member_vnf_index
=member_vnf_index
,
2758 vdu_index
=vdu_index
,
2760 deploy_params
=deploy_params_kdu
,
2761 descriptor_config
=descriptor_config
,
2762 base_folder
=base_folder
,
2763 task_instantiation_info
=tasks_dict_info
,
2767 # Check if this NS has a charm configuration
2768 descriptor_config
= nsd
.get("ns-configuration")
2769 if descriptor_config
and descriptor_config
.get("juju"):
2772 member_vnf_index
= None
2778 # Get additional parameters
2779 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2780 if db_nsr
.get("additionalParamsForNs"):
2781 deploy_params
.update(
2782 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2784 base_folder
= nsd
["_admin"]["storage"]
2786 logging_text
=logging_text
,
2789 nslcmop_id
=nslcmop_id
,
2795 member_vnf_index
=member_vnf_index
,
2796 vdu_index
=vdu_index
,
2798 deploy_params
=deploy_params
,
2799 descriptor_config
=descriptor_config
,
2800 base_folder
=base_folder
,
2801 task_instantiation_info
=tasks_dict_info
,
2805 # rest of staff will be done at finally
2808 ROclient
.ROClientException
,
2814 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2817 except asyncio
.CancelledError
:
2819 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2821 exc
= "Operation was cancelled"
2822 except Exception as e
:
2823 exc
= traceback
.format_exc()
2824 self
.logger
.critical(
2825 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2830 error_list
.append(str(exc
))
2832 # wait for pending tasks
2834 stage
[1] = "Waiting for instantiate pending tasks."
2835 self
.logger
.debug(logging_text
+ stage
[1])
2836 error_list
+= await self
._wait
_for
_tasks
(
2844 stage
[1] = stage
[2] = ""
2845 except asyncio
.CancelledError
:
2846 error_list
.append("Cancelled")
2847 # TODO cancel all tasks
2848 except Exception as exc
:
2849 error_list
.append(str(exc
))
2851 # update operation-status
2852 db_nsr_update
["operational-status"] = "running"
2853 # let's begin with VCA 'configured' status (later we can change it)
2854 db_nsr_update
["config-status"] = "configured"
2855 for task
, task_name
in tasks_dict_info
.items():
2856 if not task
.done() or task
.cancelled() or task
.exception():
2857 if task_name
.startswith(self
.task_name_deploy_vca
):
2858 # A N2VC task is pending
2859 db_nsr_update
["config-status"] = "failed"
2861 # RO or KDU task is pending
2862 db_nsr_update
["operational-status"] = "failed"
2864 # update status at database
2866 error_detail
= ". ".join(error_list
)
2867 self
.logger
.error(logging_text
+ error_detail
)
2868 error_description_nslcmop
= "{} Detail: {}".format(
2869 stage
[0], error_detail
2871 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2872 nslcmop_id
, stage
[0]
2875 db_nsr_update
["detailed-status"] = (
2876 error_description_nsr
+ " Detail: " + error_detail
2878 db_nslcmop_update
["detailed-status"] = error_detail
2879 nslcmop_operation_state
= "FAILED"
2883 error_description_nsr
= error_description_nslcmop
= None
2885 db_nsr_update
["detailed-status"] = "Done"
2886 db_nslcmop_update
["detailed-status"] = "Done"
2887 nslcmop_operation_state
= "COMPLETED"
2890 self
._write
_ns
_status
(
2893 current_operation
="IDLE",
2894 current_operation_id
=None,
2895 error_description
=error_description_nsr
,
2896 error_detail
=error_detail
,
2897 other_update
=db_nsr_update
,
2899 self
._write
_op
_status
(
2902 error_message
=error_description_nslcmop
,
2903 operation_state
=nslcmop_operation_state
,
2904 other_update
=db_nslcmop_update
,
2907 if nslcmop_operation_state
:
2909 await self
.msg
.aiowrite(
2914 "nslcmop_id": nslcmop_id
,
2915 "operationState": nslcmop_operation_state
,
2919 except Exception as e
:
2921 logging_text
+ "kafka_write notification Exception {}".format(e
)
2924 self
.logger
.debug(logging_text
+ "Exit")
2925 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2927 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2928 if vnfd_id
not in cached_vnfds
:
2929 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2930 return cached_vnfds
[vnfd_id
]
2932 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2933 if vnf_profile_id
not in cached_vnfrs
:
2934 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2937 "member-vnf-index-ref": vnf_profile_id
,
2938 "nsr-id-ref": nsr_id
,
2941 return cached_vnfrs
[vnf_profile_id
]
2943 def _is_deployed_vca_in_relation(
2944 self
, vca
: DeployedVCA
, relation
: Relation
2947 for endpoint
in (relation
.provider
, relation
.requirer
):
2948 if endpoint
["kdu-resource-profile-id"]:
2951 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2952 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2953 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2959 def _update_ee_relation_data_with_implicit_data(
2960 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2962 ee_relation_data
= safe_get_ee_relation(
2963 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2965 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2966 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2967 "execution-environment-ref"
2969 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2970 vnfd_id
= vnf_profile
["vnfd-id"]
2971 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2974 if ee_relation_level
== EELevel
.VNF
2975 else ee_relation_data
["vdu-profile-id"]
2977 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2980 f
"not execution environments found for ee_relation {ee_relation_data}"
2982 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2983 return ee_relation_data
2985 def _get_ns_relations(
2988 nsd
: Dict
[str, Any
],
2990 cached_vnfds
: Dict
[str, Any
],
2991 ) -> List
[Relation
]:
2993 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2994 for r
in db_ns_relations
:
2995 provider_dict
= None
2996 requirer_dict
= None
2997 if all(key
in r
for key
in ("provider", "requirer")):
2998 provider_dict
= r
["provider"]
2999 requirer_dict
= r
["requirer"]
3000 elif "entities" in r
:
3001 provider_id
= r
["entities"][0]["id"]
3004 "endpoint": r
["entities"][0]["endpoint"],
3006 if provider_id
!= nsd
["id"]:
3007 provider_dict
["vnf-profile-id"] = provider_id
3008 requirer_id
= r
["entities"][1]["id"]
3011 "endpoint": r
["entities"][1]["endpoint"],
3013 if requirer_id
!= nsd
["id"]:
3014 requirer_dict
["vnf-profile-id"] = requirer_id
3017 "provider/requirer or entities must be included in the relation."
3019 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3020 nsr_id
, nsd
, provider_dict
, cached_vnfds
3022 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3023 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3025 provider
= EERelation(relation_provider
)
3026 requirer
= EERelation(relation_requirer
)
3027 relation
= Relation(r
["name"], provider
, requirer
)
3028 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3030 relations
.append(relation
)
3033 def _get_vnf_relations(
3036 nsd
: Dict
[str, Any
],
3038 cached_vnfds
: Dict
[str, Any
],
3039 ) -> List
[Relation
]:
3041 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3042 vnf_profile_id
= vnf_profile
["id"]
3043 vnfd_id
= vnf_profile
["vnfd-id"]
3044 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3045 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3046 for r
in db_vnf_relations
:
3047 provider_dict
= None
3048 requirer_dict
= None
3049 if all(key
in r
for key
in ("provider", "requirer")):
3050 provider_dict
= r
["provider"]
3051 requirer_dict
= r
["requirer"]
3052 elif "entities" in r
:
3053 provider_id
= r
["entities"][0]["id"]
3056 "vnf-profile-id": vnf_profile_id
,
3057 "endpoint": r
["entities"][0]["endpoint"],
3059 if provider_id
!= vnfd_id
:
3060 provider_dict
["vdu-profile-id"] = provider_id
3061 requirer_id
= r
["entities"][1]["id"]
3064 "vnf-profile-id": vnf_profile_id
,
3065 "endpoint": r
["entities"][1]["endpoint"],
3067 if requirer_id
!= vnfd_id
:
3068 requirer_dict
["vdu-profile-id"] = requirer_id
3071 "provider/requirer or entities must be included in the relation."
3073 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3074 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3076 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3077 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3079 provider
= EERelation(relation_provider
)
3080 requirer
= EERelation(relation_requirer
)
3081 relation
= Relation(r
["name"], provider
, requirer
)
3082 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3084 relations
.append(relation
)
3087 def _get_kdu_resource_data(
3089 ee_relation
: EERelation
,
3090 db_nsr
: Dict
[str, Any
],
3091 cached_vnfds
: Dict
[str, Any
],
3092 ) -> DeployedK8sResource
:
3093 nsd
= get_nsd(db_nsr
)
3094 vnf_profiles
= get_vnf_profiles(nsd
)
3095 vnfd_id
= find_in_list(
3097 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3099 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3100 kdu_resource_profile
= get_kdu_resource_profile(
3101 db_vnfd
, ee_relation
.kdu_resource_profile_id
3103 kdu_name
= kdu_resource_profile
["kdu-name"]
3104 deployed_kdu
, _
= get_deployed_kdu(
3105 db_nsr
.get("_admin", ()).get("deployed", ()),
3107 ee_relation
.vnf_profile_id
,
3109 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3112 def _get_deployed_component(
3114 ee_relation
: EERelation
,
3115 db_nsr
: Dict
[str, Any
],
3116 cached_vnfds
: Dict
[str, Any
],
3117 ) -> DeployedComponent
:
3118 nsr_id
= db_nsr
["_id"]
3119 deployed_component
= None
3120 ee_level
= EELevel
.get_level(ee_relation
)
3121 if ee_level
== EELevel
.NS
:
3122 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3124 deployed_component
= DeployedVCA(nsr_id
, vca
)
3125 elif ee_level
== EELevel
.VNF
:
3126 vca
= get_deployed_vca(
3130 "member-vnf-index": ee_relation
.vnf_profile_id
,
3131 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3135 deployed_component
= DeployedVCA(nsr_id
, vca
)
3136 elif ee_level
== EELevel
.VDU
:
3137 vca
= get_deployed_vca(
3140 "vdu_id": ee_relation
.vdu_profile_id
,
3141 "member-vnf-index": ee_relation
.vnf_profile_id
,
3142 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3146 deployed_component
= DeployedVCA(nsr_id
, vca
)
3147 elif ee_level
== EELevel
.KDU
:
3148 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3149 ee_relation
, db_nsr
, cached_vnfds
3151 if kdu_resource_data
:
3152 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3153 return deployed_component
3155 async def _add_relation(
3159 db_nsr
: Dict
[str, Any
],
3160 cached_vnfds
: Dict
[str, Any
],
3161 cached_vnfrs
: Dict
[str, Any
],
3163 deployed_provider
= self
._get
_deployed
_component
(
3164 relation
.provider
, db_nsr
, cached_vnfds
3166 deployed_requirer
= self
._get
_deployed
_component
(
3167 relation
.requirer
, db_nsr
, cached_vnfds
3171 and deployed_requirer
3172 and deployed_provider
.config_sw_installed
3173 and deployed_requirer
.config_sw_installed
3175 provider_db_vnfr
= (
3177 relation
.provider
.nsr_id
,
3178 relation
.provider
.vnf_profile_id
,
3181 if relation
.provider
.vnf_profile_id
3184 requirer_db_vnfr
= (
3186 relation
.requirer
.nsr_id
,
3187 relation
.requirer
.vnf_profile_id
,
3190 if relation
.requirer
.vnf_profile_id
3193 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3194 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3195 provider_relation_endpoint
= RelationEndpoint(
3196 deployed_provider
.ee_id
,
3198 relation
.provider
.endpoint
,
3200 requirer_relation_endpoint
= RelationEndpoint(
3201 deployed_requirer
.ee_id
,
3203 relation
.requirer
.endpoint
,
3205 await self
.vca_map
[vca_type
].add_relation(
3206 provider
=provider_relation_endpoint
,
3207 requirer
=requirer_relation_endpoint
,
3209 # remove entry from relations list
3213 async def _add_vca_relations(
3219 timeout
: int = 3600,
3223 # 1. find all relations for this VCA
3224 # 2. wait for other peers related
3228 # STEP 1: find all relations for this VCA
3231 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3232 nsd
= get_nsd(db_nsr
)
3235 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3236 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3241 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3242 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3244 # if no relations, terminate
3246 self
.logger
.debug(logging_text
+ " No relations")
3249 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3256 if now
- start
>= timeout
:
3257 self
.logger
.error(logging_text
+ " : timeout adding relations")
3260 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3261 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3263 # for each relation, find the VCA's related
3264 for relation
in relations
.copy():
3265 added
= await self
._add
_relation
(
3273 relations
.remove(relation
)
3276 self
.logger
.debug("Relations added")
3278 await asyncio
.sleep(5.0)
3282 except Exception as e
:
3283 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3286 async def _install_kdu(
3294 k8s_instance_info
: dict,
3295 k8params
: dict = None,
3301 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3304 "collection": "nsrs",
3305 "filter": {"_id": nsr_id
},
3306 "path": nsr_db_path
,
3309 if k8s_instance_info
.get("kdu-deployment-name"):
3310 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3312 kdu_instance
= self
.k8scluster_map
[
3314 ].generate_kdu_instance_name(
3315 db_dict
=db_dict_install
,
3316 kdu_model
=k8s_instance_info
["kdu-model"],
3317 kdu_name
=k8s_instance_info
["kdu-name"],
3320 # Update the nsrs table with the kdu-instance value
3324 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3327 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3328 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3329 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3330 # namespace, this first verification could be removed, and the next step would be done for any kind
3332 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3333 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3334 if k8sclustertype
in ("juju", "juju-bundle"):
3335 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3336 # that the user passed a namespace which he wants its KDU to be deployed in)
3342 "_admin.projects_write": k8s_instance_info
["namespace"],
3343 "_admin.projects_read": k8s_instance_info
["namespace"],
3349 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3354 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3356 k8s_instance_info
["namespace"] = kdu_instance
3358 await self
.k8scluster_map
[k8sclustertype
].install(
3359 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3360 kdu_model
=k8s_instance_info
["kdu-model"],
3363 db_dict
=db_dict_install
,
3365 kdu_name
=k8s_instance_info
["kdu-name"],
3366 namespace
=k8s_instance_info
["namespace"],
3367 kdu_instance
=kdu_instance
,
3371 # Obtain services to obtain management service ip
3372 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3373 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3374 kdu_instance
=kdu_instance
,
3375 namespace
=k8s_instance_info
["namespace"],
3378 # Obtain management service info (if exists)
3379 vnfr_update_dict
= {}
3380 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3382 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3387 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3390 for service
in kdud
.get("service", [])
3391 if service
.get("mgmt-service")
3393 for mgmt_service
in mgmt_services
:
3394 for service
in services
:
3395 if service
["name"].startswith(mgmt_service
["name"]):
3396 # Mgmt service found, Obtain service ip
3397 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3398 if isinstance(ip
, list) and len(ip
) == 1:
3402 "kdur.{}.ip-address".format(kdu_index
)
3405 # Check if must update also mgmt ip at the vnf
3406 service_external_cp
= mgmt_service
.get(
3407 "external-connection-point-ref"
3409 if service_external_cp
:
3411 deep_get(vnfd
, ("mgmt-interface", "cp"))
3412 == service_external_cp
3414 vnfr_update_dict
["ip-address"] = ip
3419 "external-connection-point-ref", ""
3421 == service_external_cp
,
3424 "kdur.{}.ip-address".format(kdu_index
)
3429 "Mgmt service name: {} not found".format(
3430 mgmt_service
["name"]
3434 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3435 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3437 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3440 and kdu_config
.get("initial-config-primitive")
3441 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3443 initial_config_primitive_list
= kdu_config
.get(
3444 "initial-config-primitive"
3446 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3448 for initial_config_primitive
in initial_config_primitive_list
:
3449 primitive_params_
= self
._map
_primitive
_params
(
3450 initial_config_primitive
, {}, {}
3453 await asyncio
.wait_for(
3454 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3455 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3456 kdu_instance
=kdu_instance
,
3457 primitive_name
=initial_config_primitive
["name"],
3458 params
=primitive_params_
,
3459 db_dict
=db_dict_install
,
3465 except Exception as e
:
3466 # Prepare update db with error and raise exception
3469 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3473 vnfr_data
.get("_id"),
3474 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3477 # ignore to keep original exception
3479 # reraise original error
3484 async def deploy_kdus(
3491 task_instantiation_info
,
3493 # Launch kdus if present in the descriptor
3495 k8scluster_id_2_uuic
= {
3496 "helm-chart-v3": {},
3501 async def _get_cluster_id(cluster_id
, cluster_type
):
3502 nonlocal k8scluster_id_2_uuic
3503 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3504 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3506 # check if K8scluster is creating and wait look if previous tasks in process
3507 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3508 "k8scluster", cluster_id
3511 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3512 task_name
, cluster_id
3514 self
.logger
.debug(logging_text
+ text
)
3515 await asyncio
.wait(task_dependency
, timeout
=3600)
3517 db_k8scluster
= self
.db
.get_one(
3518 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3520 if not db_k8scluster
:
3521 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3523 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3525 if cluster_type
== "helm-chart-v3":
3527 # backward compatibility for existing clusters that have not been initialized for helm v3
3528 k8s_credentials
= yaml
.safe_dump(
3529 db_k8scluster
.get("credentials")
3531 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3532 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3534 db_k8scluster_update
= {}
3535 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3536 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3537 db_k8scluster_update
[
3538 "_admin.helm-chart-v3.created"
3540 db_k8scluster_update
[
3541 "_admin.helm-chart-v3.operationalState"
3544 "k8sclusters", cluster_id
, db_k8scluster_update
3546 except Exception as e
:
3549 + "error initializing helm-v3 cluster: {}".format(str(e
))
3552 "K8s cluster '{}' has not been initialized for '{}'".format(
3553 cluster_id
, cluster_type
3558 "K8s cluster '{}' has not been initialized for '{}'".format(
3559 cluster_id
, cluster_type
3562 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3565 logging_text
+= "Deploy kdus: "
3568 db_nsr_update
= {"_admin.deployed.K8s": []}
3569 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3572 updated_cluster_list
= []
3573 updated_v3_cluster_list
= []
3575 for vnfr_data
in db_vnfrs
.values():
3576 vca_id
= self
.get_vca_id(vnfr_data
, {})
3577 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3578 # Step 0: Prepare and set parameters
3579 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3580 vnfd_id
= vnfr_data
.get("vnfd-id")
3581 vnfd_with_id
= find_in_list(
3582 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3586 for kdud
in vnfd_with_id
["kdu"]
3587 if kdud
["name"] == kdur
["kdu-name"]
3589 namespace
= kdur
.get("k8s-namespace")
3590 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3591 if kdur
.get("helm-chart"):
3592 kdumodel
= kdur
["helm-chart"]
3593 # Default version: helm3, if helm-version is v2 assign v2
3594 k8sclustertype
= "helm-chart-v3"
3595 self
.logger
.debug("kdur: {}".format(kdur
))
3597 kdur
.get("helm-version")
3598 and kdur
.get("helm-version") == "v2"
3600 k8sclustertype
= "helm-chart"
3601 elif kdur
.get("juju-bundle"):
3602 kdumodel
= kdur
["juju-bundle"]
3603 k8sclustertype
= "juju-bundle"
3606 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3607 "juju-bundle. Maybe an old NBI version is running".format(
3608 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3611 # check if kdumodel is a file and exists
3613 vnfd_with_id
= find_in_list(
3614 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3616 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3617 if storage
: # may be not present if vnfd has not artifacts
3618 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3619 if storage
["pkg-dir"]:
3620 filename
= "{}/{}/{}s/{}".format(
3627 filename
= "{}/Scripts/{}s/{}".format(
3632 if self
.fs
.file_exists(
3633 filename
, mode
="file"
3634 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3635 kdumodel
= self
.fs
.path
+ filename
3636 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3638 except Exception: # it is not a file
3641 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3642 step
= "Synchronize repos for k8s cluster '{}'".format(
3645 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3649 k8sclustertype
== "helm-chart"
3650 and cluster_uuid
not in updated_cluster_list
3652 k8sclustertype
== "helm-chart-v3"
3653 and cluster_uuid
not in updated_v3_cluster_list
3655 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3656 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3657 cluster_uuid
=cluster_uuid
3660 if del_repo_list
or added_repo_dict
:
3661 if k8sclustertype
== "helm-chart":
3663 "_admin.helm_charts_added." + item
: None
3664 for item
in del_repo_list
3667 "_admin.helm_charts_added." + item
: name
3668 for item
, name
in added_repo_dict
.items()
3670 updated_cluster_list
.append(cluster_uuid
)
3671 elif k8sclustertype
== "helm-chart-v3":
3673 "_admin.helm_charts_v3_added." + item
: None
3674 for item
in del_repo_list
3677 "_admin.helm_charts_v3_added." + item
: name
3678 for item
, name
in added_repo_dict
.items()
3680 updated_v3_cluster_list
.append(cluster_uuid
)
3682 logging_text
+ "repos synchronized on k8s cluster "
3683 "'{}' to_delete: {}, to_add: {}".format(
3684 k8s_cluster_id
, del_repo_list
, added_repo_dict
3689 {"_id": k8s_cluster_id
},
3695 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3696 vnfr_data
["member-vnf-index-ref"],
3700 k8s_instance_info
= {
3701 "kdu-instance": None,
3702 "k8scluster-uuid": cluster_uuid
,
3703 "k8scluster-type": k8sclustertype
,
3704 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3705 "kdu-name": kdur
["kdu-name"],
3706 "kdu-model": kdumodel
,
3707 "namespace": namespace
,
3708 "kdu-deployment-name": kdu_deployment_name
,
3710 db_path
= "_admin.deployed.K8s.{}".format(index
)
3711 db_nsr_update
[db_path
] = k8s_instance_info
3712 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3713 vnfd_with_id
= find_in_list(
3714 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3716 task
= asyncio
.ensure_future(
3725 k8params
=desc_params
,
3730 self
.lcm_tasks
.register(
3734 "instantiate_KDU-{}".format(index
),
3737 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3743 except (LcmException
, asyncio
.CancelledError
):
3745 except Exception as e
:
3746 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3747 if isinstance(e
, (N2VCException
, DbException
)):
3748 self
.logger
.error(logging_text
+ msg
)
3750 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3751 raise LcmException(msg
)
3754 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3773 task_instantiation_info
,
3776 # launch instantiate_N2VC in a asyncio task and register task object
3777 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3778 # if not found, create one entry and update database
3779 # fill db_nsr._admin.deployed.VCA.<index>
3782 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3786 get_charm_name
= False
3787 if "execution-environment-list" in descriptor_config
:
3788 ee_list
= descriptor_config
.get("execution-environment-list", [])
3789 elif "juju" in descriptor_config
:
3790 ee_list
= [descriptor_config
] # ns charms
3791 if "execution-environment-list" not in descriptor_config
:
3792 # charm name is only required for ns charms
3793 get_charm_name
= True
3794 else: # other types as script are not supported
3797 for ee_item
in ee_list
:
3800 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3801 ee_item
.get("juju"), ee_item
.get("helm-chart")
3804 ee_descriptor_id
= ee_item
.get("id")
3805 if ee_item
.get("juju"):
3806 vca_name
= ee_item
["juju"].get("charm")
3808 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3811 if ee_item
["juju"].get("charm") is not None
3814 if ee_item
["juju"].get("cloud") == "k8s":
3815 vca_type
= "k8s_proxy_charm"
3816 elif ee_item
["juju"].get("proxy") is False:
3817 vca_type
= "native_charm"
3818 elif ee_item
.get("helm-chart"):
3819 vca_name
= ee_item
["helm-chart"]
3820 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3823 vca_type
= "helm-v3"
3826 logging_text
+ "skipping non juju neither charm configuration"
3831 for vca_index
, vca_deployed
in enumerate(
3832 db_nsr
["_admin"]["deployed"]["VCA"]
3834 if not vca_deployed
:
3837 vca_deployed
.get("member-vnf-index") == member_vnf_index
3838 and vca_deployed
.get("vdu_id") == vdu_id
3839 and vca_deployed
.get("kdu_name") == kdu_name
3840 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3841 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3845 # not found, create one.
3847 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3850 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3852 target
+= "/kdu/{}".format(kdu_name
)
3854 "target_element": target
,
3855 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3856 "member-vnf-index": member_vnf_index
,
3858 "kdu_name": kdu_name
,
3859 "vdu_count_index": vdu_index
,
3860 "operational-status": "init", # TODO revise
3861 "detailed-status": "", # TODO revise
3862 "step": "initial-deploy", # TODO revise
3864 "vdu_name": vdu_name
,
3866 "ee_descriptor_id": ee_descriptor_id
,
3867 "charm_name": charm_name
,
3871 # create VCA and configurationStatus in db
3873 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3874 "configurationStatus.{}".format(vca_index
): dict(),
3876 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3878 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3880 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3881 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3882 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3885 task_n2vc
= asyncio
.ensure_future(
3886 self
.instantiate_N2VC(
3887 logging_text
=logging_text
,
3888 vca_index
=vca_index
,
3894 vdu_index
=vdu_index
,
3895 deploy_params
=deploy_params
,
3896 config_descriptor
=descriptor_config
,
3897 base_folder
=base_folder
,
3898 nslcmop_id
=nslcmop_id
,
3902 ee_config_descriptor
=ee_item
,
3905 self
.lcm_tasks
.register(
3909 "instantiate_N2VC-{}".format(vca_index
),
3912 task_instantiation_info
[
3914 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3915 member_vnf_index
or "", vdu_id
or ""
3919 def _create_nslcmop(nsr_id
, operation
, params
):
3921 Creates a ns-lcm-opp content to be stored at database.
3922 :param nsr_id: internal id of the instance
3923 :param operation: instantiate, terminate, scale, action, ...
3924 :param params: user parameters for the operation
3925 :return: dictionary following SOL005 format
3927 # Raise exception if invalid arguments
3928 if not (nsr_id
and operation
and params
):
3930 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3937 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3938 "operationState": "PROCESSING",
3939 "statusEnteredTime": now
,
3940 "nsInstanceId": nsr_id
,
3941 "lcmOperationType": operation
,
3943 "isAutomaticInvocation": False,
3944 "operationParams": params
,
3945 "isCancelPending": False,
3947 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3948 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3953 def _format_additional_params(self
, params
):
3954 params
= params
or {}
3955 for key
, value
in params
.items():
3956 if str(value
).startswith("!!yaml "):
3957 params
[key
] = yaml
.safe_load(value
[7:])
3960 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3961 primitive
= seq
.get("name")
3962 primitive_params
= {}
3964 "member_vnf_index": vnf_index
,
3965 "primitive": primitive
,
3966 "primitive_params": primitive_params
,
3969 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3973 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3974 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3975 if op
.get("operationState") == "COMPLETED":
3976 # b. Skip sub-operation
3977 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3978 return self
.SUBOPERATION_STATUS_SKIP
3980 # c. retry executing sub-operation
3981 # The sub-operation exists, and operationState != 'COMPLETED'
3982 # Update operationState = 'PROCESSING' to indicate a retry.
3983 operationState
= "PROCESSING"
3984 detailed_status
= "In progress"
3985 self
._update
_suboperation
_status
(
3986 db_nslcmop
, op_index
, operationState
, detailed_status
3988 # Return the sub-operation index
3989 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3990 # with arguments extracted from the sub-operation
3993 # Find a sub-operation where all keys in a matching dictionary must match
3994 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3995 def _find_suboperation(self
, db_nslcmop
, match
):
3996 if db_nslcmop
and match
:
3997 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3998 for i
, op
in enumerate(op_list
):
3999 if all(op
.get(k
) == match
[k
] for k
in match
):
4001 return self
.SUBOPERATION_STATUS_NOT_FOUND
4003 # Update status for a sub-operation given its index
4004 def _update_suboperation_status(
4005 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4007 # Update DB for HA tasks
4008 q_filter
= {"_id": db_nslcmop
["_id"]}
4010 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4011 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4014 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4017 # Add sub-operation, return the index of the added sub-operation
4018 # Optionally, set operationState, detailed-status, and operationType
4019 # Status and type are currently set for 'scale' sub-operations:
4020 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4021 # 'detailed-status' : status message
4022 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4023 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4024 def _add_suboperation(
4032 mapped_primitive_params
,
4033 operationState
=None,
4034 detailed_status
=None,
4037 RO_scaling_info
=None,
4040 return self
.SUBOPERATION_STATUS_NOT_FOUND
4041 # Get the "_admin.operations" list, if it exists
4042 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4043 op_list
= db_nslcmop_admin
.get("operations")
4044 # Create or append to the "_admin.operations" list
4046 "member_vnf_index": vnf_index
,
4048 "vdu_count_index": vdu_count_index
,
4049 "primitive": primitive
,
4050 "primitive_params": mapped_primitive_params
,
4053 new_op
["operationState"] = operationState
4055 new_op
["detailed-status"] = detailed_status
4057 new_op
["lcmOperationType"] = operationType
4059 new_op
["RO_nsr_id"] = RO_nsr_id
4061 new_op
["RO_scaling_info"] = RO_scaling_info
4063 # No existing operations, create key 'operations' with current operation as first list element
4064 db_nslcmop_admin
.update({"operations": [new_op
]})
4065 op_list
= db_nslcmop_admin
.get("operations")
4067 # Existing operations, append operation to list
4068 op_list
.append(new_op
)
4070 db_nslcmop_update
= {"_admin.operations": op_list
}
4071 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4072 op_index
= len(op_list
) - 1
4075 # Helper methods for scale() sub-operations
4077 # pre-scale/post-scale:
4078 # Check for 3 different cases:
4079 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4080 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4081 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4082 def _check_or_add_scale_suboperation(
4086 vnf_config_primitive
,
4090 RO_scaling_info
=None,
4092 # Find this sub-operation
4093 if RO_nsr_id
and RO_scaling_info
:
4094 operationType
= "SCALE-RO"
4096 "member_vnf_index": vnf_index
,
4097 "RO_nsr_id": RO_nsr_id
,
4098 "RO_scaling_info": RO_scaling_info
,
4102 "member_vnf_index": vnf_index
,
4103 "primitive": vnf_config_primitive
,
4104 "primitive_params": primitive_params
,
4105 "lcmOperationType": operationType
,
4107 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4108 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4109 # a. New sub-operation
4110 # The sub-operation does not exist, add it.
4111 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4112 # The following parameters are set to None for all kind of scaling:
4114 vdu_count_index
= None
4116 if RO_nsr_id
and RO_scaling_info
:
4117 vnf_config_primitive
= None
4118 primitive_params
= None
4121 RO_scaling_info
= None
4122 # Initial status for sub-operation
4123 operationState
= "PROCESSING"
4124 detailed_status
= "In progress"
4125 # Add sub-operation for pre/post-scaling (zero or more operations)
4126 self
._add
_suboperation
(
4132 vnf_config_primitive
,
4140 return self
.SUBOPERATION_STATUS_NEW
4142 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4143 # or op_index (operationState != 'COMPLETED')
4144 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4146 # Function to return execution_environment id
4148 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4149 # TODO vdu_index_count
4150 for vca
in vca_deployed_list
:
4151 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4154 async def destroy_N2VC(
4162 exec_primitives
=True,
4167 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4168 :param logging_text:
4170 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4171 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4172 :param vca_index: index in the database _admin.deployed.VCA
4173 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4174 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4175 not executed properly
4176 :param scaling_in: True destroys the application, False destroys the model
4177 :return: None or exception
4182 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4183 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4187 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4189 # execute terminate_primitives
4191 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4192 config_descriptor
.get("terminate-config-primitive"),
4193 vca_deployed
.get("ee_descriptor_id"),
4195 vdu_id
= vca_deployed
.get("vdu_id")
4196 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4197 vdu_name
= vca_deployed
.get("vdu_name")
4198 vnf_index
= vca_deployed
.get("member-vnf-index")
4199 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4200 for seq
in terminate_primitives
:
4201 # For each sequence in list, get primitive and call _ns_execute_primitive()
4202 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4203 vnf_index
, seq
.get("name")
4205 self
.logger
.debug(logging_text
+ step
)
4206 # Create the primitive for each sequence, i.e. "primitive": "touch"
4207 primitive
= seq
.get("name")
4208 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4213 self
._add
_suboperation
(
4220 mapped_primitive_params
,
4222 # Sub-operations: Call _ns_execute_primitive() instead of action()
4224 result
, result_detail
= await self
._ns
_execute
_primitive
(
4225 vca_deployed
["ee_id"],
4227 mapped_primitive_params
,
4231 except LcmException
:
4232 # this happens when VCA is not deployed. In this case it is not needed to terminate
4234 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4235 if result
not in result_ok
:
4237 "terminate_primitive {} for vnf_member_index={} fails with "
4238 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4240 # set that this VCA do not need terminated
4241 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4245 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4248 # Delete Prometheus Jobs if any
4249 # This uses NSR_ID, so it will destroy any jobs under this index
4250 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4253 await self
.vca_map
[vca_type
].delete_execution_environment(
4254 vca_deployed
["ee_id"],
4255 scaling_in
=scaling_in
,
4260 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4261 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4262 namespace
= "." + db_nsr
["_id"]
4264 await self
.n2vc
.delete_namespace(
4265 namespace
=namespace
,
4266 total_timeout
=self
.timeout_charm_delete
,
4269 except N2VCNotFound
: # already deleted. Skip
4271 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4273 async def _terminate_RO(
4274 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4277 Terminates a deployment from RO
4278 :param logging_text:
4279 :param nsr_deployed: db_nsr._admin.deployed
4282 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4283 this method will update only the index 2, but it will write on database the concatenated content of the list
4288 ro_nsr_id
= ro_delete_action
= None
4289 if nsr_deployed
and nsr_deployed
.get("RO"):
4290 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4291 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4294 stage
[2] = "Deleting ns from VIM."
4295 db_nsr_update
["detailed-status"] = " ".join(stage
)
4296 self
._write
_op
_status
(nslcmop_id
, stage
)
4297 self
.logger
.debug(logging_text
+ stage
[2])
4298 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4299 self
._write
_op
_status
(nslcmop_id
, stage
)
4300 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4301 ro_delete_action
= desc
["action_id"]
4303 "_admin.deployed.RO.nsr_delete_action_id"
4304 ] = ro_delete_action
4305 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4306 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4307 if ro_delete_action
:
4308 # wait until NS is deleted from VIM
4309 stage
[2] = "Waiting ns deleted from VIM."
4310 detailed_status_old
= None
4314 + " RO_id={} ro_delete_action={}".format(
4315 ro_nsr_id
, ro_delete_action
4318 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4319 self
._write
_op
_status
(nslcmop_id
, stage
)
4321 delete_timeout
= 20 * 60 # 20 minutes
4322 while delete_timeout
> 0:
4323 desc
= await self
.RO
.show(
4325 item_id_name
=ro_nsr_id
,
4326 extra_item
="action",
4327 extra_item_id
=ro_delete_action
,
4331 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4333 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4334 if ns_status
== "ERROR":
4335 raise ROclient
.ROClientException(ns_status_info
)
4336 elif ns_status
== "BUILD":
4337 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4338 elif ns_status
== "ACTIVE":
4339 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4340 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4345 ), "ROclient.check_action_status returns unknown {}".format(
4348 if stage
[2] != detailed_status_old
:
4349 detailed_status_old
= stage
[2]
4350 db_nsr_update
["detailed-status"] = " ".join(stage
)
4351 self
._write
_op
_status
(nslcmop_id
, stage
)
4352 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4353 await asyncio
.sleep(5, loop
=self
.loop
)
4355 else: # delete_timeout <= 0:
4356 raise ROclient
.ROClientException(
4357 "Timeout waiting ns deleted from VIM"
4360 except Exception as e
:
4361 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4363 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4365 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4366 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4367 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4369 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4372 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4374 failed_detail
.append("delete conflict: {}".format(e
))
4377 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4380 failed_detail
.append("delete error: {}".format(e
))
4382 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4386 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4387 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4389 stage
[2] = "Deleting nsd from RO."
4390 db_nsr_update
["detailed-status"] = " ".join(stage
)
4391 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4392 self
._write
_op
_status
(nslcmop_id
, stage
)
4393 await self
.RO
.delete("nsd", ro_nsd_id
)
4395 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4397 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4398 except Exception as e
:
4400 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4402 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4404 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4407 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4409 failed_detail
.append(
4410 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4412 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4414 failed_detail
.append(
4415 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4417 self
.logger
.error(logging_text
+ failed_detail
[-1])
4419 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4420 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4421 if not vnf_deployed
or not vnf_deployed
["id"]:
4424 ro_vnfd_id
= vnf_deployed
["id"]
4427 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4428 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4430 db_nsr_update
["detailed-status"] = " ".join(stage
)
4431 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4432 self
._write
_op
_status
(nslcmop_id
, stage
)
4433 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4435 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4437 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4438 except Exception as e
:
4440 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4443 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4447 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4450 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4452 failed_detail
.append(
4453 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4455 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4457 failed_detail
.append(
4458 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4460 self
.logger
.error(logging_text
+ failed_detail
[-1])
4463 stage
[2] = "Error deleting from VIM"
4465 stage
[2] = "Deleted from VIM"
4466 db_nsr_update
["detailed-status"] = " ".join(stage
)
4467 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4468 self
._write
_op
_status
(nslcmop_id
, stage
)
4471 raise LcmException("; ".join(failed_detail
))
4473 async def terminate(self
, nsr_id
, nslcmop_id
):
4474 # Try to lock HA task here
4475 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4476 if not task_is_locked_by_me
:
4479 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4480 self
.logger
.debug(logging_text
+ "Enter")
4481 timeout_ns_terminate
= self
.timeout_ns_terminate
4484 operation_params
= None
4486 error_list
= [] # annotates all failed error messages
4487 db_nslcmop_update
= {}
4488 autoremove
= False # autoremove after terminated
4489 tasks_dict_info
= {}
4492 "Stage 1/3: Preparing task.",
4493 "Waiting for previous operations to terminate.",
4496 # ^ contains [stage, step, VIM-status]
4498 # wait for any previous tasks in process
4499 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4501 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4502 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4503 operation_params
= db_nslcmop
.get("operationParams") or {}
4504 if operation_params
.get("timeout_ns_terminate"):
4505 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4506 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4507 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4509 db_nsr_update
["operational-status"] = "terminating"
4510 db_nsr_update
["config-status"] = "terminating"
4511 self
._write
_ns
_status
(
4513 ns_state
="TERMINATING",
4514 current_operation
="TERMINATING",
4515 current_operation_id
=nslcmop_id
,
4516 other_update
=db_nsr_update
,
4518 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4519 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4520 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4523 stage
[1] = "Getting vnf descriptors from db."
4524 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4526 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4528 db_vnfds_from_id
= {}
4529 db_vnfds_from_member_index
= {}
4531 for vnfr
in db_vnfrs_list
:
4532 vnfd_id
= vnfr
["vnfd-id"]
4533 if vnfd_id
not in db_vnfds_from_id
:
4534 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4535 db_vnfds_from_id
[vnfd_id
] = vnfd
4536 db_vnfds_from_member_index
[
4537 vnfr
["member-vnf-index-ref"]
4538 ] = db_vnfds_from_id
[vnfd_id
]
4540 # Destroy individual execution environments when there are terminating primitives.
4541 # Rest of EE will be deleted at once
4542 # TODO - check before calling _destroy_N2VC
4543 # if not operation_params.get("skip_terminate_primitives"):#
4544 # or not vca.get("needed_terminate"):
4545 stage
[0] = "Stage 2/3 execute terminating primitives."
4546 self
.logger
.debug(logging_text
+ stage
[0])
4547 stage
[1] = "Looking execution environment that needs terminate."
4548 self
.logger
.debug(logging_text
+ stage
[1])
4550 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4551 config_descriptor
= None
4552 vca_member_vnf_index
= vca
.get("member-vnf-index")
4553 vca_id
= self
.get_vca_id(
4554 db_vnfrs_dict
.get(vca_member_vnf_index
)
4555 if vca_member_vnf_index
4559 if not vca
or not vca
.get("ee_id"):
4561 if not vca
.get("member-vnf-index"):
4563 config_descriptor
= db_nsr
.get("ns-configuration")
4564 elif vca
.get("vdu_id"):
4565 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4566 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4567 elif vca
.get("kdu_name"):
4568 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4569 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4571 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4572 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4573 vca_type
= vca
.get("type")
4574 exec_terminate_primitives
= not operation_params
.get(
4575 "skip_terminate_primitives"
4576 ) and vca
.get("needed_terminate")
4577 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4578 # pending native charms
4580 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4582 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4583 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4584 task
= asyncio
.ensure_future(
4592 exec_terminate_primitives
,
4596 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4598 # wait for pending tasks of terminate primitives
4602 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4604 error_list
= await self
._wait
_for
_tasks
(
4607 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4611 tasks_dict_info
.clear()
4613 return # raise LcmException("; ".join(error_list))
4615 # remove All execution environments at once
4616 stage
[0] = "Stage 3/3 delete all."
4618 if nsr_deployed
.get("VCA"):
4619 stage
[1] = "Deleting all execution environments."
4620 self
.logger
.debug(logging_text
+ stage
[1])
4621 vca_id
= self
.get_vca_id({}, db_nsr
)
4622 task_delete_ee
= asyncio
.ensure_future(
4624 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4625 timeout
=self
.timeout_charm_delete
,
4628 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4629 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4631 # Delete from k8scluster
4632 stage
[1] = "Deleting KDUs."
4633 self
.logger
.debug(logging_text
+ stage
[1])
4634 # print(nsr_deployed)
4635 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4636 if not kdu
or not kdu
.get("kdu-instance"):
4638 kdu_instance
= kdu
.get("kdu-instance")
4639 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4640 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4641 vca_id
= self
.get_vca_id({}, db_nsr
)
4642 task_delete_kdu_instance
= asyncio
.ensure_future(
4643 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4644 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4645 kdu_instance
=kdu_instance
,
4647 namespace
=kdu
.get("namespace"),
4653 + "Unknown k8s deployment type {}".format(
4654 kdu
.get("k8scluster-type")
4659 task_delete_kdu_instance
4660 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4663 stage
[1] = "Deleting ns from VIM."
4665 task_delete_ro
= asyncio
.ensure_future(
4666 self
._terminate
_ng
_ro
(
4667 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4671 task_delete_ro
= asyncio
.ensure_future(
4673 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4676 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4678 # rest of staff will be done at finally
4681 ROclient
.ROClientException
,
4686 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4688 except asyncio
.CancelledError
:
4690 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4692 exc
= "Operation was cancelled"
4693 except Exception as e
:
4694 exc
= traceback
.format_exc()
4695 self
.logger
.critical(
4696 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4701 error_list
.append(str(exc
))
4703 # wait for pending tasks
4705 stage
[1] = "Waiting for terminate pending tasks."
4706 self
.logger
.debug(logging_text
+ stage
[1])
4707 error_list
+= await self
._wait
_for
_tasks
(
4710 timeout_ns_terminate
,
4714 stage
[1] = stage
[2] = ""
4715 except asyncio
.CancelledError
:
4716 error_list
.append("Cancelled")
4717 # TODO cancell all tasks
4718 except Exception as exc
:
4719 error_list
.append(str(exc
))
4720 # update status at database
4722 error_detail
= "; ".join(error_list
)
4723 # self.logger.error(logging_text + error_detail)
4724 error_description_nslcmop
= "{} Detail: {}".format(
4725 stage
[0], error_detail
4727 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4728 nslcmop_id
, stage
[0]
4731 db_nsr_update
["operational-status"] = "failed"
4732 db_nsr_update
["detailed-status"] = (
4733 error_description_nsr
+ " Detail: " + error_detail
4735 db_nslcmop_update
["detailed-status"] = error_detail
4736 nslcmop_operation_state
= "FAILED"
4740 error_description_nsr
= error_description_nslcmop
= None
4741 ns_state
= "NOT_INSTANTIATED"
4742 db_nsr_update
["operational-status"] = "terminated"
4743 db_nsr_update
["detailed-status"] = "Done"
4744 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4745 db_nslcmop_update
["detailed-status"] = "Done"
4746 nslcmop_operation_state
= "COMPLETED"
4749 self
._write
_ns
_status
(
4752 current_operation
="IDLE",
4753 current_operation_id
=None,
4754 error_description
=error_description_nsr
,
4755 error_detail
=error_detail
,
4756 other_update
=db_nsr_update
,
4758 self
._write
_op
_status
(
4761 error_message
=error_description_nslcmop
,
4762 operation_state
=nslcmop_operation_state
,
4763 other_update
=db_nslcmop_update
,
4765 if ns_state
== "NOT_INSTANTIATED":
4769 {"nsr-id-ref": nsr_id
},
4770 {"_admin.nsState": "NOT_INSTANTIATED"},
4772 except DbException
as e
:
4775 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4779 if operation_params
:
4780 autoremove
= operation_params
.get("autoremove", False)
4781 if nslcmop_operation_state
:
4783 await self
.msg
.aiowrite(
4788 "nslcmop_id": nslcmop_id
,
4789 "operationState": nslcmop_operation_state
,
4790 "autoremove": autoremove
,
4794 except Exception as e
:
4796 logging_text
+ "kafka_write notification Exception {}".format(e
)
4799 self
.logger
.debug(logging_text
+ "Exit")
4800 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4802 async def _wait_for_tasks(
4803 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4806 error_detail_list
= []
4808 pending_tasks
= list(created_tasks_info
.keys())
4809 num_tasks
= len(pending_tasks
)
4811 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4812 self
._write
_op
_status
(nslcmop_id
, stage
)
4813 while pending_tasks
:
4815 _timeout
= timeout
+ time_start
- time()
4816 done
, pending_tasks
= await asyncio
.wait(
4817 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4819 num_done
+= len(done
)
4820 if not done
: # Timeout
4821 for task
in pending_tasks
:
4822 new_error
= created_tasks_info
[task
] + ": Timeout"
4823 error_detail_list
.append(new_error
)
4824 error_list
.append(new_error
)
4827 if task
.cancelled():
4830 exc
= task
.exception()
4832 if isinstance(exc
, asyncio
.TimeoutError
):
4834 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4835 error_list
.append(created_tasks_info
[task
])
4836 error_detail_list
.append(new_error
)
4843 ROclient
.ROClientException
,
4849 self
.logger
.error(logging_text
+ new_error
)
4851 exc_traceback
= "".join(
4852 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4856 + created_tasks_info
[task
]
4862 logging_text
+ created_tasks_info
[task
] + ": Done"
4864 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4866 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4867 if nsr_id
: # update also nsr
4872 "errorDescription": "Error at: " + ", ".join(error_list
),
4873 "errorDetail": ". ".join(error_detail_list
),
4876 self
._write
_op
_status
(nslcmop_id
, stage
)
4877 return error_detail_list
4880 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4882 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4883 The default-value is used. If it is between < > it look for a value at instantiation_params
4884 :param primitive_desc: portion of VNFD/NSD that describes primitive
4885 :param params: Params provided by user
4886 :param instantiation_params: Instantiation params provided by user
4887 :return: a dictionary with the calculated params
4889 calculated_params
= {}
4890 for parameter
in primitive_desc
.get("parameter", ()):
4891 param_name
= parameter
["name"]
4892 if param_name
in params
:
4893 calculated_params
[param_name
] = params
[param_name
]
4894 elif "default-value" in parameter
or "value" in parameter
:
4895 if "value" in parameter
:
4896 calculated_params
[param_name
] = parameter
["value"]
4898 calculated_params
[param_name
] = parameter
["default-value"]
4900 isinstance(calculated_params
[param_name
], str)
4901 and calculated_params
[param_name
].startswith("<")
4902 and calculated_params
[param_name
].endswith(">")
4904 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4905 calculated_params
[param_name
] = instantiation_params
[
4906 calculated_params
[param_name
][1:-1]
4910 "Parameter {} needed to execute primitive {} not provided".format(
4911 calculated_params
[param_name
], primitive_desc
["name"]
4916 "Parameter {} needed to execute primitive {} not provided".format(
4917 param_name
, primitive_desc
["name"]
4921 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4922 calculated_params
[param_name
] = yaml
.safe_dump(
4923 calculated_params
[param_name
], default_flow_style
=True, width
=256
4925 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4927 ].startswith("!!yaml "):
4928 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4929 if parameter
.get("data-type") == "INTEGER":
4931 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4932 except ValueError: # error converting string to int
4934 "Parameter {} of primitive {} must be integer".format(
4935 param_name
, primitive_desc
["name"]
4938 elif parameter
.get("data-type") == "BOOLEAN":
4939 calculated_params
[param_name
] = not (
4940 (str(calculated_params
[param_name
])).lower() == "false"
4943 # add always ns_config_info if primitive name is config
4944 if primitive_desc
["name"] == "config":
4945 if "ns_config_info" in instantiation_params
:
4946 calculated_params
["ns_config_info"] = instantiation_params
[
4949 return calculated_params
4951 def _look_for_deployed_vca(
4958 ee_descriptor_id
=None,
4960 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4961 for vca
in deployed_vca
:
4964 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4967 vdu_count_index
is not None
4968 and vdu_count_index
!= vca
["vdu_count_index"]
4971 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4973 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4977 # vca_deployed not found
4979 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4980 " is not deployed".format(
4989 ee_id
= vca
.get("ee_id")
4991 "type", "lxc_proxy_charm"
4992 ) # default value for backward compatibility - proxy charm
4995 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4996 "execution environment".format(
4997 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
5000 return ee_id
, vca_type
5002 async def _ns_execute_primitive(
5008 retries_interval
=30,
5015 if primitive
== "config":
5016 primitive_params
= {"params": primitive_params
}
5018 vca_type
= vca_type
or "lxc_proxy_charm"
5022 output
= await asyncio
.wait_for(
5023 self
.vca_map
[vca_type
].exec_primitive(
5025 primitive_name
=primitive
,
5026 params_dict
=primitive_params
,
5027 progress_timeout
=self
.timeout_progress_primitive
,
5028 total_timeout
=self
.timeout_primitive
,
5033 timeout
=timeout
or self
.timeout_primitive
,
5037 except asyncio
.CancelledError
:
5039 except Exception as e
:
5043 "Error executing action {} on {} -> {}".format(
5048 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5050 if isinstance(e
, asyncio
.TimeoutError
):
5052 message
="Timed out waiting for action to complete"
5054 return "FAILED", getattr(e
, "message", repr(e
))
5056 return "COMPLETED", output
5058 except (LcmException
, asyncio
.CancelledError
):
5060 except Exception as e
:
5061 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5063 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5065 Updating the vca_status with latest juju information in nsrs record
5066 :param: nsr_id: Id of the nsr
5067 :param: nslcmop_id: Id of the nslcmop
5071 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5072 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5073 vca_id
= self
.get_vca_id({}, db_nsr
)
5074 if db_nsr
["_admin"]["deployed"]["K8s"]:
5075 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5076 cluster_uuid
, kdu_instance
, cluster_type
= (
5077 k8s
["k8scluster-uuid"],
5078 k8s
["kdu-instance"],
5079 k8s
["k8scluster-type"],
5081 await self
._on
_update
_k
8s
_db
(
5082 cluster_uuid
=cluster_uuid
,
5083 kdu_instance
=kdu_instance
,
5084 filter={"_id": nsr_id
},
5086 cluster_type
=cluster_type
,
5089 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5090 table
, filter = "nsrs", {"_id": nsr_id
}
5091 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5092 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5094 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5095 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5097 async def action(self
, nsr_id
, nslcmop_id
):
5098 # Try to lock HA task here
5099 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5100 if not task_is_locked_by_me
:
5103 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5104 self
.logger
.debug(logging_text
+ "Enter")
5105 # get all needed from database
5109 db_nslcmop_update
= {}
5110 nslcmop_operation_state
= None
5111 error_description_nslcmop
= None
5114 # wait for any previous tasks in process
5115 step
= "Waiting for previous operations to terminate"
5116 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5118 self
._write
_ns
_status
(
5121 current_operation
="RUNNING ACTION",
5122 current_operation_id
=nslcmop_id
,
5125 step
= "Getting information from database"
5126 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5127 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5128 if db_nslcmop
["operationParams"].get("primitive_params"):
5129 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5130 db_nslcmop
["operationParams"]["primitive_params"]
5133 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5134 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5135 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5136 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5137 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5138 primitive
= db_nslcmop
["operationParams"]["primitive"]
5139 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5140 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5141 "timeout_ns_action", self
.timeout_primitive
5145 step
= "Getting vnfr from database"
5146 db_vnfr
= self
.db
.get_one(
5147 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5149 if db_vnfr
.get("kdur"):
5151 for kdur
in db_vnfr
["kdur"]:
5152 if kdur
.get("additionalParams"):
5153 kdur
["additionalParams"] = json
.loads(
5154 kdur
["additionalParams"]
5156 kdur_list
.append(kdur
)
5157 db_vnfr
["kdur"] = kdur_list
5158 step
= "Getting vnfd from database"
5159 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5161 # Sync filesystem before running a primitive
5162 self
.fs
.sync(db_vnfr
["vnfd-id"])
5164 step
= "Getting nsd from database"
5165 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5167 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5168 # for backward compatibility
5169 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5170 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5171 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5172 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5174 # look for primitive
5175 config_primitive_desc
= descriptor_configuration
= None
5177 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5179 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5181 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5183 descriptor_configuration
= db_nsd
.get("ns-configuration")
5185 if descriptor_configuration
and descriptor_configuration
.get(
5188 for config_primitive
in descriptor_configuration
["config-primitive"]:
5189 if config_primitive
["name"] == primitive
:
5190 config_primitive_desc
= config_primitive
5193 if not config_primitive_desc
:
5194 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5196 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5200 primitive_name
= primitive
5201 ee_descriptor_id
= None
5203 primitive_name
= config_primitive_desc
.get(
5204 "execution-environment-primitive", primitive
5206 ee_descriptor_id
= config_primitive_desc
.get(
5207 "execution-environment-ref"
5213 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5215 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5218 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5220 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5222 desc_params
= parse_yaml_strings(
5223 db_vnfr
.get("additionalParamsForVnf")
5226 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5227 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5228 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5230 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5231 actions
.add(primitive
["name"])
5232 for primitive
in kdu_configuration
.get("config-primitive", []):
5233 actions
.add(primitive
["name"])
5235 nsr_deployed
["K8s"],
5236 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5237 and kdu
["member-vnf-index"] == vnf_index
,
5241 if primitive_name
in actions
5242 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5246 # TODO check if ns is in a proper status
5248 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5250 # kdur and desc_params already set from before
5251 if primitive_params
:
5252 desc_params
.update(primitive_params
)
5253 # TODO Check if we will need something at vnf level
5254 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5256 kdu_name
== kdu
["kdu-name"]
5257 and kdu
["member-vnf-index"] == vnf_index
5262 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5265 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5266 msg
= "unknown k8scluster-type '{}'".format(
5267 kdu
.get("k8scluster-type")
5269 raise LcmException(msg
)
5272 "collection": "nsrs",
5273 "filter": {"_id": nsr_id
},
5274 "path": "_admin.deployed.K8s.{}".format(index
),
5278 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5280 step
= "Executing kdu {}".format(primitive_name
)
5281 if primitive_name
== "upgrade":
5282 if desc_params
.get("kdu_model"):
5283 kdu_model
= desc_params
.get("kdu_model")
5284 del desc_params
["kdu_model"]
5286 kdu_model
= kdu
.get("kdu-model")
5287 parts
= kdu_model
.split(sep
=":")
5289 kdu_model
= parts
[0]
5290 if desc_params
.get("kdu_atomic_upgrade"):
5291 atomic_upgrade
= desc_params
.get(
5292 "kdu_atomic_upgrade"
5293 ).lower() in ("yes", "true", "1")
5294 del desc_params
["kdu_atomic_upgrade"]
5296 atomic_upgrade
= True
5298 detailed_status
= await asyncio
.wait_for(
5299 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5300 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5301 kdu_instance
=kdu
.get("kdu-instance"),
5302 atomic
=atomic_upgrade
,
5303 kdu_model
=kdu_model
,
5306 timeout
=timeout_ns_action
,
5308 timeout
=timeout_ns_action
+ 10,
5311 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5313 elif primitive_name
== "rollback":
5314 detailed_status
= await asyncio
.wait_for(
5315 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5316 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5317 kdu_instance
=kdu
.get("kdu-instance"),
5320 timeout
=timeout_ns_action
,
5322 elif primitive_name
== "status":
5323 detailed_status
= await asyncio
.wait_for(
5324 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5325 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5326 kdu_instance
=kdu
.get("kdu-instance"),
5329 timeout
=timeout_ns_action
,
5332 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5333 kdu
["kdu-name"], nsr_id
5335 params
= self
._map
_primitive
_params
(
5336 config_primitive_desc
, primitive_params
, desc_params
5339 detailed_status
= await asyncio
.wait_for(
5340 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5341 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5342 kdu_instance
=kdu_instance
,
5343 primitive_name
=primitive_name
,
5346 timeout
=timeout_ns_action
,
5349 timeout
=timeout_ns_action
,
5353 nslcmop_operation_state
= "COMPLETED"
5355 detailed_status
= ""
5356 nslcmop_operation_state
= "FAILED"
5358 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5359 nsr_deployed
["VCA"],
5360 member_vnf_index
=vnf_index
,
5362 vdu_count_index
=vdu_count_index
,
5363 ee_descriptor_id
=ee_descriptor_id
,
5365 for vca_index
, vca_deployed
in enumerate(
5366 db_nsr
["_admin"]["deployed"]["VCA"]
5368 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5370 "collection": "nsrs",
5371 "filter": {"_id": nsr_id
},
5372 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5376 nslcmop_operation_state
,
5378 ) = await self
._ns
_execute
_primitive
(
5380 primitive
=primitive_name
,
5381 primitive_params
=self
._map
_primitive
_params
(
5382 config_primitive_desc
, primitive_params
, desc_params
5384 timeout
=timeout_ns_action
,
5390 db_nslcmop_update
["detailed-status"] = detailed_status
5391 error_description_nslcmop
= (
5392 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5396 + "Done with result {} {}".format(
5397 nslcmop_operation_state
, detailed_status
5400 return # database update is called inside finally
5402 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5403 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5405 except asyncio
.CancelledError
:
5407 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5409 exc
= "Operation was cancelled"
5410 except asyncio
.TimeoutError
:
5411 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5413 except Exception as e
:
5414 exc
= traceback
.format_exc()
5415 self
.logger
.critical(
5416 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5425 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5426 nslcmop_operation_state
= "FAILED"
5428 self
._write
_ns
_status
(
5432 ], # TODO check if degraded. For the moment use previous status
5433 current_operation
="IDLE",
5434 current_operation_id
=None,
5435 # error_description=error_description_nsr,
5436 # error_detail=error_detail,
5437 other_update
=db_nsr_update
,
5440 self
._write
_op
_status
(
5443 error_message
=error_description_nslcmop
,
5444 operation_state
=nslcmop_operation_state
,
5445 other_update
=db_nslcmop_update
,
5448 if nslcmop_operation_state
:
5450 await self
.msg
.aiowrite(
5455 "nslcmop_id": nslcmop_id
,
5456 "operationState": nslcmop_operation_state
,
5460 except Exception as e
:
5462 logging_text
+ "kafka_write notification Exception {}".format(e
)
5464 self
.logger
.debug(logging_text
+ "Exit")
5465 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5466 return nslcmop_operation_state
, detailed_status
5468 async def terminate_vdus(
5469 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5471 """This method terminates VDUs
5474 db_vnfr: VNF instance record
5475 member_vnf_index: VNF index to identify the VDUs to be removed
5476 db_nsr: NS instance record
5477 update_db_nslcmops: Nslcmop update record
5479 vca_scaling_info
= []
5480 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5481 scaling_info
["scaling_direction"] = "IN"
5482 scaling_info
["vdu-delete"] = {}
5483 scaling_info
["kdu-delete"] = {}
5484 db_vdur
= db_vnfr
.get("vdur")
5485 vdur_list
= copy(db_vdur
)
5487 for index
, vdu
in enumerate(vdur_list
):
5488 vca_scaling_info
.append(
5490 "osm_vdu_id": vdu
["vdu-id-ref"],
5491 "member-vnf-index": member_vnf_index
,
5493 "vdu_index": count_index
,
5496 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5497 scaling_info
["vdu"].append(
5499 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5500 "vdu_id": vdu
["vdu-id-ref"],
5504 for interface
in vdu
["interfaces"]:
5505 scaling_info
["vdu"][index
]["interface"].append(
5507 "name": interface
["name"],
5508 "ip_address": interface
["ip-address"],
5509 "mac_address": interface
.get("mac-address"),
5512 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5513 stage
[2] = "Terminating VDUs"
5514 if scaling_info
.get("vdu-delete"):
5515 # scale_process = "RO"
5516 if self
.ro_config
.get("ng"):
5517 await self
._scale
_ng
_ro
(
5526 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5527 """This method is to Remove VNF instances from NS.
5530 nsr_id: NS instance id
5531 nslcmop_id: nslcmop id of update
5532 vnf_instance_id: id of the VNF instance to be removed
5535 result: (str, str) COMPLETED/FAILED, details
5539 logging_text
= "Task ns={} update ".format(nsr_id
)
5540 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5541 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5542 if check_vnfr_count
> 1:
5543 stage
= ["", "", ""]
5544 step
= "Getting nslcmop from database"
5546 step
+ " after having waited for previous tasks to be completed"
5548 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5549 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5550 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5551 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5552 """ db_vnfr = self.db.get_one(
5553 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5555 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5556 await self
.terminate_vdus(
5565 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5566 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5567 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5568 "constituent-vnfr-ref"
5570 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5571 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5572 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5573 return "COMPLETED", "Done"
5575 step
= "Terminate VNF Failed with"
5577 "{} Cannot terminate the last VNF in this NS.".format(
5581 except (LcmException
, asyncio
.CancelledError
):
5583 except Exception as e
:
5584 self
.logger
.debug("Error removing VNF {}".format(e
))
5585 return "FAILED", "Error removing VNF {}".format(e
)
5587 async def _ns_redeploy_vnf(
5595 """This method updates and redeploys VNF instances
5598 nsr_id: NS instance id
5599 nslcmop_id: nslcmop id
5600 db_vnfd: VNF descriptor
5601 db_vnfr: VNF instance record
5602 db_nsr: NS instance record
5605 result: (str, str) COMPLETED/FAILED, details
5609 stage
= ["", "", ""]
5610 logging_text
= "Task ns={} update ".format(nsr_id
)
5611 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5612 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5614 # Terminate old VNF resources
5615 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5616 await self
.terminate_vdus(
5625 # old_vnfd_id = db_vnfr["vnfd-id"]
5626 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5627 new_db_vnfd
= db_vnfd
5628 # new_vnfd_ref = new_db_vnfd["id"]
5629 # new_vnfd_id = vnfd_id
5633 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5635 "name": cp
.get("id"),
5636 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5637 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5640 new_vnfr_cp
.append(vnf_cp
)
5641 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5642 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5643 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5645 "revision": latest_vnfd_revision
,
5646 "connection-point": new_vnfr_cp
,
5650 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5651 updated_db_vnfr
= self
.db
.get_one(
5653 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5656 # Instantiate new VNF resources
5657 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5658 vca_scaling_info
= []
5659 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5660 scaling_info
["scaling_direction"] = "OUT"
5661 scaling_info
["vdu-create"] = {}
5662 scaling_info
["kdu-create"] = {}
5663 vdud_instantiate_list
= db_vnfd
["vdu"]
5664 for index
, vdud
in enumerate(vdud_instantiate_list
):
5665 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5667 additional_params
= (
5668 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5671 cloud_init_list
= []
5673 # TODO Information of its own ip is not available because db_vnfr is not updated.
5674 additional_params
["OSM"] = get_osm_params(
5675 updated_db_vnfr
, vdud
["id"], 1
5677 cloud_init_list
.append(
5678 self
._parse
_cloud
_init
(
5685 vca_scaling_info
.append(
5687 "osm_vdu_id": vdud
["id"],
5688 "member-vnf-index": member_vnf_index
,
5690 "vdu_index": count_index
,
5693 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5694 if self
.ro_config
.get("ng"):
5696 "New Resources to be deployed: {}".format(scaling_info
)
5698 await self
._scale
_ng
_ro
(
5706 return "COMPLETED", "Done"
5707 except (LcmException
, asyncio
.CancelledError
):
5709 except Exception as e
:
5710 self
.logger
.debug("Error updating VNF {}".format(e
))
5711 return "FAILED", "Error updating VNF {}".format(e
)
5713 async def _ns_charm_upgrade(
5719 timeout
: float = None,
5721 """This method upgrade charms in VNF instances
5724 ee_id: Execution environment id
5725 path: Local path to the charm
5727 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5728 timeout: (Float) Timeout for the ns update operation
5731 result: (str, str) COMPLETED/FAILED, details
5734 charm_type
= charm_type
or "lxc_proxy_charm"
5735 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5739 charm_type
=charm_type
,
5740 timeout
=timeout
or self
.timeout_ns_update
,
5744 return "COMPLETED", output
5746 except (LcmException
, asyncio
.CancelledError
):
5749 except Exception as e
:
5751 self
.logger
.debug("Error upgrading charm {}".format(path
))
5753 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5755 async def update(self
, nsr_id
, nslcmop_id
):
5756 """Update NS according to different update types
5758 This method performs upgrade of VNF instances then updates the revision
5759 number in VNF record
5762 nsr_id: Network service will be updated
5763 nslcmop_id: ns lcm operation id
5766 It may raise DbException, LcmException, N2VCException, K8sException
5769 # Try to lock HA task here
5770 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5771 if not task_is_locked_by_me
:
5774 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5775 self
.logger
.debug(logging_text
+ "Enter")
5777 # Set the required variables to be filled up later
5779 db_nslcmop_update
= {}
5781 nslcmop_operation_state
= None
5783 error_description_nslcmop
= ""
5785 change_type
= "updated"
5786 detailed_status
= ""
5789 # wait for any previous tasks in process
5790 step
= "Waiting for previous operations to terminate"
5791 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5792 self
._write
_ns
_status
(
5795 current_operation
="UPDATING",
5796 current_operation_id
=nslcmop_id
,
5799 step
= "Getting nslcmop from database"
5800 db_nslcmop
= self
.db
.get_one(
5801 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5803 update_type
= db_nslcmop
["operationParams"]["updateType"]
5805 step
= "Getting nsr from database"
5806 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5807 old_operational_status
= db_nsr
["operational-status"]
5808 db_nsr_update
["operational-status"] = "updating"
5809 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5810 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5812 if update_type
== "CHANGE_VNFPKG":
5814 # Get the input parameters given through update request
5815 vnf_instance_id
= db_nslcmop
["operationParams"][
5816 "changeVnfPackageData"
5817 ].get("vnfInstanceId")
5819 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5822 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5824 step
= "Getting vnfr from database"
5825 db_vnfr
= self
.db
.get_one(
5826 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5829 step
= "Getting vnfds from database"
5831 latest_vnfd
= self
.db
.get_one(
5832 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5834 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5837 current_vnf_revision
= db_vnfr
.get("revision", 1)
5838 current_vnfd
= self
.db
.get_one(
5840 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5841 fail_on_empty
=False,
5843 # Charm artifact paths will be filled up later
5845 current_charm_artifact_path
,
5846 target_charm_artifact_path
,
5847 charm_artifact_paths
,
5849 ) = ([], [], [], [])
5851 step
= "Checking if revision has changed in VNFD"
5852 if current_vnf_revision
!= latest_vnfd_revision
:
5854 change_type
= "policy_updated"
5856 # There is new revision of VNFD, update operation is required
5857 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5858 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5860 step
= "Removing the VNFD packages if they exist in the local path"
5861 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5862 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5864 step
= "Get the VNFD packages from FSMongo"
5865 self
.fs
.sync(from_path
=latest_vnfd_path
)
5866 self
.fs
.sync(from_path
=current_vnfd_path
)
5869 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5871 current_base_folder
= current_vnfd
["_admin"]["storage"]
5872 latest_base_folder
= latest_vnfd
["_admin"]["storage"]
5874 for vca_index
, vca_deployed
in enumerate(
5875 get_iterable(nsr_deployed
, "VCA")
5877 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5879 # Getting charm-id and charm-type
5880 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5881 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5882 vca_type
= vca_deployed
.get("type")
5883 vdu_count_index
= vca_deployed
.get("vdu_count_index")
5886 ee_id
= vca_deployed
.get("ee_id")
5888 step
= "Getting descriptor config"
5889 descriptor_config
= get_configuration(
5890 current_vnfd
, current_vnfd
["id"]
5893 if "execution-environment-list" in descriptor_config
:
5894 ee_list
= descriptor_config
.get(
5895 "execution-environment-list", []
5900 # There could be several charm used in the same VNF
5901 for ee_item
in ee_list
:
5902 if ee_item
.get("juju"):
5904 step
= "Getting charm name"
5905 charm_name
= ee_item
["juju"].get("charm")
5907 step
= "Setting Charm artifact paths"
5908 current_charm_artifact_path
.append(
5909 get_charm_artifact_path(
5910 current_base_folder
,
5913 current_vnf_revision
,
5916 target_charm_artifact_path
.append(
5917 get_charm_artifact_path(
5921 latest_vnfd_revision
,
5924 elif ee_item
.get("helm-chart"):
5925 # add chart to list and all parameters
5926 step
= "Getting helm chart name"
5927 chart_name
= ee_item
.get("helm-chart")
5929 ee_item
.get("helm-version")
5930 and ee_item
.get("helm-version") == "v2"
5934 vca_type
= "helm-v3"
5935 step
= "Setting Helm chart artifact paths"
5937 helm_artifacts
.append(
5939 "current_artifact_path": get_charm_artifact_path(
5940 current_base_folder
,
5943 current_vnf_revision
,
5945 "target_artifact_path": get_charm_artifact_path(
5949 latest_vnfd_revision
,
5952 "vca_index": vca_index
,
5953 "vdu_index": vdu_count_index
,
5957 charm_artifact_paths
= zip(
5958 current_charm_artifact_path
, target_charm_artifact_path
5961 step
= "Checking if software version has changed in VNFD"
5962 if find_software_version(current_vnfd
) != find_software_version(
5966 step
= "Checking if existing VNF has charm"
5967 for current_charm_path
, target_charm_path
in list(
5968 charm_artifact_paths
5970 if current_charm_path
:
5972 "Software version change is not supported as VNF instance {} has charm.".format(
5977 # There is no change in the charm package, then redeploy the VNF
5978 # based on new descriptor
5979 step
= "Redeploying VNF"
5980 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5981 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5982 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5984 if result
== "FAILED":
5985 nslcmop_operation_state
= result
5986 error_description_nslcmop
= detailed_status
5987 db_nslcmop_update
["detailed-status"] = detailed_status
5990 + " step {} Done with result {} {}".format(
5991 step
, nslcmop_operation_state
, detailed_status
5996 step
= "Checking if any charm package has changed or not"
5997 for current_charm_path
, target_charm_path
in list(
5998 charm_artifact_paths
6002 and target_charm_path
6003 and self
.check_charm_hash_changed(
6004 current_charm_path
, target_charm_path
6008 step
= "Checking whether VNF uses juju bundle"
6009 if check_juju_bundle_existence(current_vnfd
):
6012 "Charm upgrade is not supported for the instance which"
6013 " uses juju-bundle: {}".format(
6014 check_juju_bundle_existence(current_vnfd
)
6018 step
= "Upgrading Charm"
6022 ) = await self
._ns
_charm
_upgrade
(
6025 charm_type
=vca_type
,
6026 path
=self
.fs
.path
+ target_charm_path
,
6027 timeout
=timeout_seconds
,
6030 if result
== "FAILED":
6031 nslcmop_operation_state
= result
6032 error_description_nslcmop
= detailed_status
6034 db_nslcmop_update
["detailed-status"] = detailed_status
6037 + " step {} Done with result {} {}".format(
6038 step
, nslcmop_operation_state
, detailed_status
6042 step
= "Updating policies"
6043 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6044 result
= "COMPLETED"
6045 detailed_status
= "Done"
6046 db_nslcmop_update
["detailed-status"] = "Done"
6049 for item
in helm_artifacts
:
6051 item
["current_artifact_path"]
6052 and item
["target_artifact_path"]
6053 and self
.check_charm_hash_changed(
6054 item
["current_artifact_path"],
6055 item
["target_artifact_path"],
6059 db_update_entry
= "_admin.deployed.VCA.{}.".format(
6062 vnfr_id
= db_vnfr
["_id"]
6063 osm_config
= {"osm": {"ns_id": nsr_id
, "vnf_id": vnfr_id
}}
6065 "collection": "nsrs",
6066 "filter": {"_id": nsr_id
},
6067 "path": db_update_entry
,
6069 vca_type
, namespace
, helm_id
= get_ee_id_parts(item
["ee_id"])
6070 await self
.vca_map
[vca_type
].upgrade_execution_environment(
6071 namespace
=namespace
,
6075 artifact_path
=item
["target_artifact_path"],
6078 vnf_id
= db_vnfr
.get("vnfd-ref")
6079 config_descriptor
= get_configuration(latest_vnfd
, vnf_id
)
6080 self
.logger
.debug("get ssh key block")
6084 ("config-access", "ssh-access", "required"),
6086 # Needed to inject a ssh key
6089 ("config-access", "ssh-access", "default-user"),
6092 "Install configuration Software, getting public ssh key"
6094 pub_key
= await self
.vca_map
[
6096 ].get_ee_ssh_public__key(
6097 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
6101 "Insert public key into VM user={} ssh_key={}".format(
6105 self
.logger
.debug(logging_text
+ step
)
6107 # wait for RO (ip-address) Insert pub_key into VM
6108 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
6118 initial_config_primitive_list
= config_descriptor
.get(
6119 "initial-config-primitive"
6121 config_primitive
= next(
6124 for p
in initial_config_primitive_list
6125 if p
["name"] == "config"
6129 if not config_primitive
:
6132 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6134 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
6135 if db_vnfr
.get("additionalParamsForVnf"):
6136 deploy_params
.update(
6138 db_vnfr
["additionalParamsForVnf"].copy()
6141 primitive_params_
= self
._map
_primitive
_params
(
6142 config_primitive
, {}, deploy_params
6145 step
= "execute primitive '{}' params '{}'".format(
6146 config_primitive
["name"], primitive_params_
6148 self
.logger
.debug(logging_text
+ step
)
6149 await self
.vca_map
[vca_type
].exec_primitive(
6151 primitive_name
=config_primitive
["name"],
6152 params_dict
=primitive_params_
,
6158 step
= "Updating policies"
6159 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6160 detailed_status
= "Done"
6161 db_nslcmop_update
["detailed-status"] = "Done"
6163 # If nslcmop_operation_state is None, so any operation is not failed.
6164 if not nslcmop_operation_state
:
6165 nslcmop_operation_state
= "COMPLETED"
6167 # If update CHANGE_VNFPKG nslcmop_operation is successful
6168 # vnf revision need to be updated
6169 vnfr_update
["revision"] = latest_vnfd_revision
6170 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6174 + " task Done with result {} {}".format(
6175 nslcmop_operation_state
, detailed_status
6178 elif update_type
== "REMOVE_VNF":
6179 # This part is included in https://osm.etsi.org/gerrit/11876
6180 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6181 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6182 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6183 step
= "Removing VNF"
6184 (result
, detailed_status
) = await self
.remove_vnf(
6185 nsr_id
, nslcmop_id
, vnf_instance_id
6187 if result
== "FAILED":
6188 nslcmop_operation_state
= result
6189 error_description_nslcmop
= detailed_status
6190 db_nslcmop_update
["detailed-status"] = detailed_status
6191 change_type
= "vnf_terminated"
6192 if not nslcmop_operation_state
:
6193 nslcmop_operation_state
= "COMPLETED"
6196 + " task Done with result {} {}".format(
6197 nslcmop_operation_state
, detailed_status
6201 elif update_type
== "OPERATE_VNF":
6202 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6205 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6208 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6211 (result
, detailed_status
) = await self
.rebuild_start_stop(
6212 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6214 if result
== "FAILED":
6215 nslcmop_operation_state
= result
6216 error_description_nslcmop
= detailed_status
6217 db_nslcmop_update
["detailed-status"] = detailed_status
6218 if not nslcmop_operation_state
:
6219 nslcmop_operation_state
= "COMPLETED"
6222 + " task Done with result {} {}".format(
6223 nslcmop_operation_state
, detailed_status
6227 # If nslcmop_operation_state is None, so any operation is not failed.
6228 # All operations are executed in overall.
6229 if not nslcmop_operation_state
:
6230 nslcmop_operation_state
= "COMPLETED"
6231 db_nsr_update
["operational-status"] = old_operational_status
6233 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6234 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6236 except asyncio
.CancelledError
:
6238 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6240 exc
= "Operation was cancelled"
6241 except asyncio
.TimeoutError
:
6242 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6244 except Exception as e
:
6245 exc
= traceback
.format_exc()
6246 self
.logger
.critical(
6247 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6256 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6257 nslcmop_operation_state
= "FAILED"
6258 db_nsr_update
["operational-status"] = old_operational_status
6260 self
._write
_ns
_status
(
6262 ns_state
=db_nsr
["nsState"],
6263 current_operation
="IDLE",
6264 current_operation_id
=None,
6265 other_update
=db_nsr_update
,
6268 self
._write
_op
_status
(
6271 error_message
=error_description_nslcmop
,
6272 operation_state
=nslcmop_operation_state
,
6273 other_update
=db_nslcmop_update
,
6276 if nslcmop_operation_state
:
6280 "nslcmop_id": nslcmop_id
,
6281 "operationState": nslcmop_operation_state
,
6283 if change_type
in ("vnf_terminated", "policy_updated"):
6284 msg
.update({"vnf_member_index": member_vnf_index
})
6285 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6286 except Exception as e
:
6288 logging_text
+ "kafka_write notification Exception {}".format(e
)
6290 self
.logger
.debug(logging_text
+ "Exit")
6291 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6292 return nslcmop_operation_state
, detailed_status
6294 async def scale(self
, nsr_id
, nslcmop_id
):
6295 # Try to lock HA task here
6296 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6297 if not task_is_locked_by_me
:
6300 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6301 stage
= ["", "", ""]
6302 tasks_dict_info
= {}
6303 # ^ stage, step, VIM progress
6304 self
.logger
.debug(logging_text
+ "Enter")
6305 # get all needed from database
6307 db_nslcmop_update
= {}
6310 # in case of error, indicates what part of scale was failed to put nsr at error status
6311 scale_process
= None
6312 old_operational_status
= ""
6313 old_config_status
= ""
6316 # wait for any previous tasks in process
6317 step
= "Waiting for previous operations to terminate"
6318 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6319 self
._write
_ns
_status
(
6322 current_operation
="SCALING",
6323 current_operation_id
=nslcmop_id
,
6326 step
= "Getting nslcmop from database"
6328 step
+ " after having waited for previous tasks to be completed"
6330 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6332 step
= "Getting nsr from database"
6333 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6334 old_operational_status
= db_nsr
["operational-status"]
6335 old_config_status
= db_nsr
["config-status"]
6337 step
= "Parsing scaling parameters"
6338 db_nsr_update
["operational-status"] = "scaling"
6339 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6340 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6342 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6344 ]["member-vnf-index"]
6345 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6347 ]["scaling-group-descriptor"]
6348 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6349 # for backward compatibility
6350 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6351 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6352 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6353 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6355 step
= "Getting vnfr from database"
6356 db_vnfr
= self
.db
.get_one(
6357 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6360 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6362 step
= "Getting vnfd from database"
6363 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6365 base_folder
= db_vnfd
["_admin"]["storage"]
6367 step
= "Getting scaling-group-descriptor"
6368 scaling_descriptor
= find_in_list(
6369 get_scaling_aspect(db_vnfd
),
6370 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6372 if not scaling_descriptor
:
6374 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6375 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6378 step
= "Sending scale order to VIM"
6379 # TODO check if ns is in a proper status
6381 if not db_nsr
["_admin"].get("scaling-group"):
6386 "_admin.scaling-group": [
6387 {"name": scaling_group
, "nb-scale-op": 0}
6391 admin_scale_index
= 0
6393 for admin_scale_index
, admin_scale_info
in enumerate(
6394 db_nsr
["_admin"]["scaling-group"]
6396 if admin_scale_info
["name"] == scaling_group
:
6397 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6399 else: # not found, set index one plus last element and add new entry with the name
6400 admin_scale_index
+= 1
6402 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6405 vca_scaling_info
= []
6406 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6407 if scaling_type
== "SCALE_OUT":
6408 if "aspect-delta-details" not in scaling_descriptor
:
6410 "Aspect delta details not fount in scaling descriptor {}".format(
6411 scaling_descriptor
["name"]
6414 # count if max-instance-count is reached
6415 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6417 scaling_info
["scaling_direction"] = "OUT"
6418 scaling_info
["vdu-create"] = {}
6419 scaling_info
["kdu-create"] = {}
6420 for delta
in deltas
:
6421 for vdu_delta
in delta
.get("vdu-delta", {}):
6422 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6423 # vdu_index also provides the number of instance of the targeted vdu
6424 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6425 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6429 additional_params
= (
6430 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6433 cloud_init_list
= []
6435 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6436 max_instance_count
= 10
6437 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6438 max_instance_count
= vdu_profile
.get(
6439 "max-number-of-instances", 10
6442 default_instance_num
= get_number_of_instances(
6445 instances_number
= vdu_delta
.get("number-of-instances", 1)
6446 nb_scale_op
+= instances_number
6448 new_instance_count
= nb_scale_op
+ default_instance_num
6449 # Control if new count is over max and vdu count is less than max.
6450 # Then assign new instance count
6451 if new_instance_count
> max_instance_count
> vdu_count
:
6452 instances_number
= new_instance_count
- max_instance_count
6454 instances_number
= instances_number
6456 if new_instance_count
> max_instance_count
:
6458 "reached the limit of {} (max-instance-count) "
6459 "scaling-out operations for the "
6460 "scaling-group-descriptor '{}'".format(
6461 nb_scale_op
, scaling_group
6464 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6466 # TODO Information of its own ip is not available because db_vnfr is not updated.
6467 additional_params
["OSM"] = get_osm_params(
6468 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6470 cloud_init_list
.append(
6471 self
._parse
_cloud
_init
(
6478 vca_scaling_info
.append(
6480 "osm_vdu_id": vdu_delta
["id"],
6481 "member-vnf-index": vnf_index
,
6483 "vdu_index": vdu_index
+ x
,
6486 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6487 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6488 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6489 kdu_name
= kdu_profile
["kdu-name"]
6490 resource_name
= kdu_profile
.get("resource-name", "")
6492 # Might have different kdus in the same delta
6493 # Should have list for each kdu
6494 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6495 scaling_info
["kdu-create"][kdu_name
] = []
6497 kdur
= get_kdur(db_vnfr
, kdu_name
)
6498 if kdur
.get("helm-chart"):
6499 k8s_cluster_type
= "helm-chart-v3"
6500 self
.logger
.debug("kdur: {}".format(kdur
))
6502 kdur
.get("helm-version")
6503 and kdur
.get("helm-version") == "v2"
6505 k8s_cluster_type
= "helm-chart"
6506 elif kdur
.get("juju-bundle"):
6507 k8s_cluster_type
= "juju-bundle"
6510 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6511 "juju-bundle. Maybe an old NBI version is running".format(
6512 db_vnfr
["member-vnf-index-ref"], kdu_name
6516 max_instance_count
= 10
6517 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6518 max_instance_count
= kdu_profile
.get(
6519 "max-number-of-instances", 10
6522 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6523 deployed_kdu
, _
= get_deployed_kdu(
6524 nsr_deployed
, kdu_name
, vnf_index
6526 if deployed_kdu
is None:
6528 "KDU '{}' for vnf '{}' not deployed".format(
6532 kdu_instance
= deployed_kdu
.get("kdu-instance")
6533 instance_num
= await self
.k8scluster_map
[
6539 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6540 kdu_model
=deployed_kdu
.get("kdu-model"),
6542 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6543 "number-of-instances", 1
6546 # Control if new count is over max and instance_num is less than max.
6547 # Then assign max instance number to kdu replica count
6548 if kdu_replica_count
> max_instance_count
> instance_num
:
6549 kdu_replica_count
= max_instance_count
6550 if kdu_replica_count
> max_instance_count
:
6552 "reached the limit of {} (max-instance-count) "
6553 "scaling-out operations for the "
6554 "scaling-group-descriptor '{}'".format(
6555 instance_num
, scaling_group
6559 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6560 vca_scaling_info
.append(
6562 "osm_kdu_id": kdu_name
,
6563 "member-vnf-index": vnf_index
,
6565 "kdu_index": instance_num
+ x
- 1,
6568 scaling_info
["kdu-create"][kdu_name
].append(
6570 "member-vnf-index": vnf_index
,
6572 "k8s-cluster-type": k8s_cluster_type
,
6573 "resource-name": resource_name
,
6574 "scale": kdu_replica_count
,
6577 elif scaling_type
== "SCALE_IN":
6578 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6580 scaling_info
["scaling_direction"] = "IN"
6581 scaling_info
["vdu-delete"] = {}
6582 scaling_info
["kdu-delete"] = {}
6584 for delta
in deltas
:
6585 for vdu_delta
in delta
.get("vdu-delta", {}):
6586 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6587 min_instance_count
= 0
6588 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6589 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6590 min_instance_count
= vdu_profile
["min-number-of-instances"]
6592 default_instance_num
= get_number_of_instances(
6593 db_vnfd
, vdu_delta
["id"]
6595 instance_num
= vdu_delta
.get("number-of-instances", 1)
6596 nb_scale_op
-= instance_num
6598 new_instance_count
= nb_scale_op
+ default_instance_num
6600 if new_instance_count
< min_instance_count
< vdu_count
:
6601 instances_number
= min_instance_count
- new_instance_count
6603 instances_number
= instance_num
6605 if new_instance_count
< min_instance_count
:
6607 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6608 "scaling-group-descriptor '{}'".format(
6609 nb_scale_op
, scaling_group
6612 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6613 vca_scaling_info
.append(
6615 "osm_vdu_id": vdu_delta
["id"],
6616 "member-vnf-index": vnf_index
,
6618 "vdu_index": vdu_index
- 1 - x
,
6621 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6622 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6623 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6624 kdu_name
= kdu_profile
["kdu-name"]
6625 resource_name
= kdu_profile
.get("resource-name", "")
6627 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6628 scaling_info
["kdu-delete"][kdu_name
] = []
6630 kdur
= get_kdur(db_vnfr
, kdu_name
)
6631 if kdur
.get("helm-chart"):
6632 k8s_cluster_type
= "helm-chart-v3"
6633 self
.logger
.debug("kdur: {}".format(kdur
))
6635 kdur
.get("helm-version")
6636 and kdur
.get("helm-version") == "v2"
6638 k8s_cluster_type
= "helm-chart"
6639 elif kdur
.get("juju-bundle"):
6640 k8s_cluster_type
= "juju-bundle"
6643 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6644 "juju-bundle. Maybe an old NBI version is running".format(
6645 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6649 min_instance_count
= 0
6650 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6651 min_instance_count
= kdu_profile
["min-number-of-instances"]
6653 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6654 deployed_kdu
, _
= get_deployed_kdu(
6655 nsr_deployed
, kdu_name
, vnf_index
6657 if deployed_kdu
is None:
6659 "KDU '{}' for vnf '{}' not deployed".format(
6663 kdu_instance
= deployed_kdu
.get("kdu-instance")
6664 instance_num
= await self
.k8scluster_map
[
6670 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6671 kdu_model
=deployed_kdu
.get("kdu-model"),
6673 kdu_replica_count
= instance_num
- kdu_delta
.get(
6674 "number-of-instances", 1
6677 if kdu_replica_count
< min_instance_count
< instance_num
:
6678 kdu_replica_count
= min_instance_count
6679 if kdu_replica_count
< min_instance_count
:
6681 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6682 "scaling-group-descriptor '{}'".format(
6683 instance_num
, scaling_group
6687 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6688 vca_scaling_info
.append(
6690 "osm_kdu_id": kdu_name
,
6691 "member-vnf-index": vnf_index
,
6693 "kdu_index": instance_num
- x
- 1,
6696 scaling_info
["kdu-delete"][kdu_name
].append(
6698 "member-vnf-index": vnf_index
,
6700 "k8s-cluster-type": k8s_cluster_type
,
6701 "resource-name": resource_name
,
6702 "scale": kdu_replica_count
,
6706 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6707 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6708 if scaling_info
["scaling_direction"] == "IN":
6709 for vdur
in reversed(db_vnfr
["vdur"]):
6710 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6711 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6712 scaling_info
["vdu"].append(
6714 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6715 "vdu_id": vdur
["vdu-id-ref"],
6719 for interface
in vdur
["interfaces"]:
6720 scaling_info
["vdu"][-1]["interface"].append(
6722 "name": interface
["name"],
6723 "ip_address": interface
["ip-address"],
6724 "mac_address": interface
.get("mac-address"),
6727 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6730 step
= "Executing pre-scale vnf-config-primitive"
6731 if scaling_descriptor
.get("scaling-config-action"):
6732 for scaling_config_action
in scaling_descriptor
[
6733 "scaling-config-action"
6736 scaling_config_action
.get("trigger") == "pre-scale-in"
6737 and scaling_type
== "SCALE_IN"
6739 scaling_config_action
.get("trigger") == "pre-scale-out"
6740 and scaling_type
== "SCALE_OUT"
6742 vnf_config_primitive
= scaling_config_action
[
6743 "vnf-config-primitive-name-ref"
6745 step
= db_nslcmop_update
[
6747 ] = "executing pre-scale scaling-config-action '{}'".format(
6748 vnf_config_primitive
6751 # look for primitive
6752 for config_primitive
in (
6753 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6754 ).get("config-primitive", ()):
6755 if config_primitive
["name"] == vnf_config_primitive
:
6759 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6760 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6761 "primitive".format(scaling_group
, vnf_config_primitive
)
6764 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6765 if db_vnfr
.get("additionalParamsForVnf"):
6766 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6768 scale_process
= "VCA"
6769 db_nsr_update
["config-status"] = "configuring pre-scaling"
6770 primitive_params
= self
._map
_primitive
_params
(
6771 config_primitive
, {}, vnfr_params
6774 # Pre-scale retry check: Check if this sub-operation has been executed before
6775 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6778 vnf_config_primitive
,
6782 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6783 # Skip sub-operation
6784 result
= "COMPLETED"
6785 result_detail
= "Done"
6788 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6789 vnf_config_primitive
, result
, result_detail
6793 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6794 # New sub-operation: Get index of this sub-operation
6796 len(db_nslcmop
.get("_admin", {}).get("operations"))
6801 + "vnf_config_primitive={} New sub-operation".format(
6802 vnf_config_primitive
6806 # retry: Get registered params for this existing sub-operation
6807 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6810 vnf_index
= op
.get("member_vnf_index")
6811 vnf_config_primitive
= op
.get("primitive")
6812 primitive_params
= op
.get("primitive_params")
6815 + "vnf_config_primitive={} Sub-operation retry".format(
6816 vnf_config_primitive
6819 # Execute the primitive, either with new (first-time) or registered (reintent) args
6820 ee_descriptor_id
= config_primitive
.get(
6821 "execution-environment-ref"
6823 primitive_name
= config_primitive
.get(
6824 "execution-environment-primitive", vnf_config_primitive
6826 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6827 nsr_deployed
["VCA"],
6828 member_vnf_index
=vnf_index
,
6830 vdu_count_index
=None,
6831 ee_descriptor_id
=ee_descriptor_id
,
6833 result
, result_detail
= await self
._ns
_execute
_primitive
(
6842 + "vnf_config_primitive={} Done with result {} {}".format(
6843 vnf_config_primitive
, result
, result_detail
6846 # Update operationState = COMPLETED | FAILED
6847 self
._update
_suboperation
_status
(
6848 db_nslcmop
, op_index
, result
, result_detail
6851 if result
== "FAILED":
6852 raise LcmException(result_detail
)
6853 db_nsr_update
["config-status"] = old_config_status
6854 scale_process
= None
6858 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6861 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6864 # SCALE-IN VCA - BEGIN
6865 if vca_scaling_info
:
6866 step
= db_nslcmop_update
[
6868 ] = "Deleting the execution environments"
6869 scale_process
= "VCA"
6870 for vca_info
in vca_scaling_info
:
6871 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6872 member_vnf_index
= str(vca_info
["member-vnf-index"])
6874 logging_text
+ "vdu info: {}".format(vca_info
)
6876 if vca_info
.get("osm_vdu_id"):
6877 vdu_id
= vca_info
["osm_vdu_id"]
6878 vdu_index
= int(vca_info
["vdu_index"])
6881 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6882 member_vnf_index
, vdu_id
, vdu_index
6884 stage
[2] = step
= "Scaling in VCA"
6885 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6886 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6887 config_update
= db_nsr
["configurationStatus"]
6888 for vca_index
, vca
in enumerate(vca_update
):
6890 (vca
or vca
.get("ee_id"))
6891 and vca
["member-vnf-index"] == member_vnf_index
6892 and vca
["vdu_count_index"] == vdu_index
6894 if vca
.get("vdu_id"):
6895 config_descriptor
= get_configuration(
6896 db_vnfd
, vca
.get("vdu_id")
6898 elif vca
.get("kdu_name"):
6899 config_descriptor
= get_configuration(
6900 db_vnfd
, vca
.get("kdu_name")
6903 config_descriptor
= get_configuration(
6904 db_vnfd
, db_vnfd
["id"]
6906 operation_params
= (
6907 db_nslcmop
.get("operationParams") or {}
6909 exec_terminate_primitives
= not operation_params
.get(
6910 "skip_terminate_primitives"
6911 ) and vca
.get("needed_terminate")
6912 task
= asyncio
.ensure_future(
6921 exec_primitives
=exec_terminate_primitives
,
6925 timeout
=self
.timeout_charm_delete
,
6928 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6931 del vca_update
[vca_index
]
6932 del config_update
[vca_index
]
6933 # wait for pending tasks of terminate primitives
6937 + "Waiting for tasks {}".format(
6938 list(tasks_dict_info
.keys())
6941 error_list
= await self
._wait
_for
_tasks
(
6945 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6950 tasks_dict_info
.clear()
6952 raise LcmException("; ".join(error_list
))
6954 db_vca_and_config_update
= {
6955 "_admin.deployed.VCA": vca_update
,
6956 "configurationStatus": config_update
,
6959 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6961 scale_process
= None
6962 # SCALE-IN VCA - END
6965 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6966 scale_process
= "RO"
6967 if self
.ro_config
.get("ng"):
6968 await self
._scale
_ng
_ro
(
6969 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6971 scaling_info
.pop("vdu-create", None)
6972 scaling_info
.pop("vdu-delete", None)
6974 scale_process
= None
6978 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6979 scale_process
= "KDU"
6980 await self
._scale
_kdu
(
6981 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6983 scaling_info
.pop("kdu-create", None)
6984 scaling_info
.pop("kdu-delete", None)
6986 scale_process
= None
6990 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6992 # SCALE-UP VCA - BEGIN
6993 if vca_scaling_info
:
6994 step
= db_nslcmop_update
[
6996 ] = "Creating new execution environments"
6997 scale_process
= "VCA"
6998 for vca_info
in vca_scaling_info
:
6999 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
7000 member_vnf_index
= str(vca_info
["member-vnf-index"])
7002 logging_text
+ "vdu info: {}".format(vca_info
)
7004 vnfd_id
= db_vnfr
["vnfd-ref"]
7005 if vca_info
.get("osm_vdu_id"):
7006 vdu_index
= int(vca_info
["vdu_index"])
7007 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
7008 if db_vnfr
.get("additionalParamsForVnf"):
7009 deploy_params
.update(
7011 db_vnfr
["additionalParamsForVnf"].copy()
7014 descriptor_config
= get_configuration(
7015 db_vnfd
, db_vnfd
["id"]
7017 if descriptor_config
:
7022 logging_text
=logging_text
7023 + "member_vnf_index={} ".format(member_vnf_index
),
7026 nslcmop_id
=nslcmop_id
,
7032 member_vnf_index
=member_vnf_index
,
7033 vdu_index
=vdu_index
,
7035 deploy_params
=deploy_params
,
7036 descriptor_config
=descriptor_config
,
7037 base_folder
=base_folder
,
7038 task_instantiation_info
=tasks_dict_info
,
7041 vdu_id
= vca_info
["osm_vdu_id"]
7042 vdur
= find_in_list(
7043 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
7045 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
7046 if vdur
.get("additionalParams"):
7047 deploy_params_vdu
= parse_yaml_strings(
7048 vdur
["additionalParams"]
7051 deploy_params_vdu
= deploy_params
7052 deploy_params_vdu
["OSM"] = get_osm_params(
7053 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
7055 if descriptor_config
:
7060 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7061 member_vnf_index
, vdu_id
, vdu_index
7063 stage
[2] = step
= "Scaling out VCA"
7064 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
7066 logging_text
=logging_text
7067 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
7068 member_vnf_index
, vdu_id
, vdu_index
7072 nslcmop_id
=nslcmop_id
,
7078 member_vnf_index
=member_vnf_index
,
7079 vdu_index
=vdu_index
,
7081 deploy_params
=deploy_params_vdu
,
7082 descriptor_config
=descriptor_config
,
7083 base_folder
=base_folder
,
7084 task_instantiation_info
=tasks_dict_info
,
7087 # SCALE-UP VCA - END
7088 scale_process
= None
7091 # execute primitive service POST-SCALING
7092 step
= "Executing post-scale vnf-config-primitive"
7093 if scaling_descriptor
.get("scaling-config-action"):
7094 for scaling_config_action
in scaling_descriptor
[
7095 "scaling-config-action"
7098 scaling_config_action
.get("trigger") == "post-scale-in"
7099 and scaling_type
== "SCALE_IN"
7101 scaling_config_action
.get("trigger") == "post-scale-out"
7102 and scaling_type
== "SCALE_OUT"
7104 vnf_config_primitive
= scaling_config_action
[
7105 "vnf-config-primitive-name-ref"
7107 step
= db_nslcmop_update
[
7109 ] = "executing post-scale scaling-config-action '{}'".format(
7110 vnf_config_primitive
7113 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
7114 if db_vnfr
.get("additionalParamsForVnf"):
7115 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
7117 # look for primitive
7118 for config_primitive
in (
7119 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
7120 ).get("config-primitive", ()):
7121 if config_primitive
["name"] == vnf_config_primitive
:
7125 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
7126 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
7127 "config-primitive".format(
7128 scaling_group
, vnf_config_primitive
7131 scale_process
= "VCA"
7132 db_nsr_update
["config-status"] = "configuring post-scaling"
7133 primitive_params
= self
._map
_primitive
_params
(
7134 config_primitive
, {}, vnfr_params
7137 # Post-scale retry check: Check if this sub-operation has been executed before
7138 op_index
= self
._check
_or
_add
_scale
_suboperation
(
7141 vnf_config_primitive
,
7145 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
7146 # Skip sub-operation
7147 result
= "COMPLETED"
7148 result_detail
= "Done"
7151 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
7152 vnf_config_primitive
, result
, result_detail
7156 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7157 # New sub-operation: Get index of this sub-operation
7159 len(db_nslcmop
.get("_admin", {}).get("operations"))
7164 + "vnf_config_primitive={} New sub-operation".format(
7165 vnf_config_primitive
7169 # retry: Get registered params for this existing sub-operation
7170 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7173 vnf_index
= op
.get("member_vnf_index")
7174 vnf_config_primitive
= op
.get("primitive")
7175 primitive_params
= op
.get("primitive_params")
7178 + "vnf_config_primitive={} Sub-operation retry".format(
7179 vnf_config_primitive
7182 # Execute the primitive, either with new (first-time) or registered (reintent) args
7183 ee_descriptor_id
= config_primitive
.get(
7184 "execution-environment-ref"
7186 primitive_name
= config_primitive
.get(
7187 "execution-environment-primitive", vnf_config_primitive
7189 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7190 nsr_deployed
["VCA"],
7191 member_vnf_index
=vnf_index
,
7193 vdu_count_index
=None,
7194 ee_descriptor_id
=ee_descriptor_id
,
7196 result
, result_detail
= await self
._ns
_execute
_primitive
(
7205 + "vnf_config_primitive={} Done with result {} {}".format(
7206 vnf_config_primitive
, result
, result_detail
7209 # Update operationState = COMPLETED | FAILED
7210 self
._update
_suboperation
_status
(
7211 db_nslcmop
, op_index
, result
, result_detail
7214 if result
== "FAILED":
7215 raise LcmException(result_detail
)
7216 db_nsr_update
["config-status"] = old_config_status
7217 scale_process
= None
7222 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7223 db_nsr_update
["operational-status"] = (
7225 if old_operational_status
== "failed"
7226 else old_operational_status
7228 db_nsr_update
["config-status"] = old_config_status
7231 ROclient
.ROClientException
,
7236 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7238 except asyncio
.CancelledError
:
7240 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7242 exc
= "Operation was cancelled"
7243 except Exception as e
:
7244 exc
= traceback
.format_exc()
7245 self
.logger
.critical(
7246 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7250 self
._write
_ns
_status
(
7253 current_operation
="IDLE",
7254 current_operation_id
=None,
7257 stage
[1] = "Waiting for instantiate pending tasks."
7258 self
.logger
.debug(logging_text
+ stage
[1])
7259 exc
= await self
._wait
_for
_tasks
(
7262 self
.timeout_ns_deploy
,
7270 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7271 nslcmop_operation_state
= "FAILED"
7273 db_nsr_update
["operational-status"] = old_operational_status
7274 db_nsr_update
["config-status"] = old_config_status
7275 db_nsr_update
["detailed-status"] = ""
7277 if "VCA" in scale_process
:
7278 db_nsr_update
["config-status"] = "failed"
7279 if "RO" in scale_process
:
7280 db_nsr_update
["operational-status"] = "failed"
7283 ] = "FAILED scaling nslcmop={} {}: {}".format(
7284 nslcmop_id
, step
, exc
7287 error_description_nslcmop
= None
7288 nslcmop_operation_state
= "COMPLETED"
7289 db_nslcmop_update
["detailed-status"] = "Done"
7291 self
._write
_op
_status
(
7294 error_message
=error_description_nslcmop
,
7295 operation_state
=nslcmop_operation_state
,
7296 other_update
=db_nslcmop_update
,
7299 self
._write
_ns
_status
(
7302 current_operation
="IDLE",
7303 current_operation_id
=None,
7304 other_update
=db_nsr_update
,
7307 if nslcmop_operation_state
:
7311 "nslcmop_id": nslcmop_id
,
7312 "operationState": nslcmop_operation_state
,
7314 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7315 except Exception as e
:
7317 logging_text
+ "kafka_write notification Exception {}".format(e
)
7319 self
.logger
.debug(logging_text
+ "Exit")
7320 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7322 async def _scale_kdu(
7323 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7325 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7326 for kdu_name
in _scaling_info
:
7327 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7328 deployed_kdu
, index
= get_deployed_kdu(
7329 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7331 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7332 kdu_instance
= deployed_kdu
["kdu-instance"]
7333 kdu_model
= deployed_kdu
.get("kdu-model")
7334 scale
= int(kdu_scaling_info
["scale"])
7335 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7338 "collection": "nsrs",
7339 "filter": {"_id": nsr_id
},
7340 "path": "_admin.deployed.K8s.{}".format(index
),
7343 step
= "scaling application {}".format(
7344 kdu_scaling_info
["resource-name"]
7346 self
.logger
.debug(logging_text
+ step
)
7348 if kdu_scaling_info
["type"] == "delete":
7349 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7352 and kdu_config
.get("terminate-config-primitive")
7353 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7355 terminate_config_primitive_list
= kdu_config
.get(
7356 "terminate-config-primitive"
7358 terminate_config_primitive_list
.sort(
7359 key
=lambda val
: int(val
["seq"])
7363 terminate_config_primitive
7364 ) in terminate_config_primitive_list
:
7365 primitive_params_
= self
._map
_primitive
_params
(
7366 terminate_config_primitive
, {}, {}
7368 step
= "execute terminate config primitive"
7369 self
.logger
.debug(logging_text
+ step
)
7370 await asyncio
.wait_for(
7371 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7372 cluster_uuid
=cluster_uuid
,
7373 kdu_instance
=kdu_instance
,
7374 primitive_name
=terminate_config_primitive
["name"],
7375 params
=primitive_params_
,
7377 total_timeout
=self
.timeout_primitive
,
7380 timeout
=self
.timeout_primitive
7381 * self
.timeout_primitive_outer_factor
,
7384 await asyncio
.wait_for(
7385 self
.k8scluster_map
[k8s_cluster_type
].scale(
7386 kdu_instance
=kdu_instance
,
7388 resource_name
=kdu_scaling_info
["resource-name"],
7389 total_timeout
=self
.timeout_scale_on_error
,
7391 cluster_uuid
=cluster_uuid
,
7392 kdu_model
=kdu_model
,
7396 timeout
=self
.timeout_scale_on_error
7397 * self
.timeout_scale_on_error_outer_factor
,
7400 if kdu_scaling_info
["type"] == "create":
7401 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7404 and kdu_config
.get("initial-config-primitive")
7405 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7407 initial_config_primitive_list
= kdu_config
.get(
7408 "initial-config-primitive"
7410 initial_config_primitive_list
.sort(
7411 key
=lambda val
: int(val
["seq"])
7414 for initial_config_primitive
in initial_config_primitive_list
:
7415 primitive_params_
= self
._map
_primitive
_params
(
7416 initial_config_primitive
, {}, {}
7418 step
= "execute initial config primitive"
7419 self
.logger
.debug(logging_text
+ step
)
7420 await asyncio
.wait_for(
7421 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7422 cluster_uuid
=cluster_uuid
,
7423 kdu_instance
=kdu_instance
,
7424 primitive_name
=initial_config_primitive
["name"],
7425 params
=primitive_params_
,
7432 async def _scale_ng_ro(
7433 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7435 nsr_id
= db_nslcmop
["nsInstanceId"]
7436 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7439 # read from db: vnfd's for every vnf
7442 # for each vnf in ns, read vnfd
7443 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7444 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7445 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7446 # if we haven't this vnfd, read it from db
7447 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7449 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7450 db_vnfds
.append(vnfd
)
7451 n2vc_key
= self
.n2vc
.get_public_key()
7452 n2vc_key_list
= [n2vc_key
]
7455 vdu_scaling_info
.get("vdu-create"),
7456 vdu_scaling_info
.get("vdu-delete"),
7459 # db_vnfr has been updated, update db_vnfrs to use it
7460 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7461 await self
._instantiate
_ng
_ro
(
7471 start_deploy
=time(),
7472 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7474 if vdu_scaling_info
.get("vdu-delete"):
7476 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7479 async def extract_prometheus_scrape_jobs(
7480 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7482 # look if exist a file called 'prometheus*.j2' and
7483 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7487 for f
in artifact_content
7488 if f
.startswith("prometheus") and f
.endswith(".j2")
7494 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7498 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7499 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7501 vnfr_id
= vnfr_id
.replace("-", "")
7503 "JOB_NAME": vnfr_id
,
7504 "TARGET_IP": target_ip
,
7505 "EXPORTER_POD_IP": host_name
,
7506 "EXPORTER_POD_PORT": host_port
,
7508 job_list
= parse_job(job_data
, variables
)
7509 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7510 for job
in job_list
:
7512 not isinstance(job
.get("job_name"), str)
7513 or vnfr_id
not in job
["job_name"]
7515 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7516 job
["nsr_id"] = nsr_id
7517 job
["vnfr_id"] = vnfr_id
7520 async def rebuild_start_stop(
7521 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7523 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7524 self
.logger
.info(logging_text
+ "Enter")
7525 stage
= ["Preparing the environment", ""]
7526 # database nsrs record
7530 # in case of error, indicates what part of scale was failed to put nsr at error status
7531 start_deploy
= time()
7533 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7534 vim_account_id
= db_vnfr
.get("vim-account-id")
7535 vim_info_key
= "vim:" + vim_account_id
7536 vdu_id
= additional_param
["vdu_id"]
7537 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7538 vdur
= find_in_list(
7539 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7542 vdu_vim_name
= vdur
["name"]
7543 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7544 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7546 raise LcmException("Target vdu is not found")
7547 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7548 # wait for any previous tasks in process
7549 stage
[1] = "Waiting for previous operations to terminate"
7550 self
.logger
.info(stage
[1])
7551 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7553 stage
[1] = "Reading from database."
7554 self
.logger
.info(stage
[1])
7555 self
._write
_ns
_status
(
7558 current_operation
=operation_type
.upper(),
7559 current_operation_id
=nslcmop_id
,
7561 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7564 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7565 db_nsr_update
["operational-status"] = operation_type
7566 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7570 "vim_vm_id": vim_vm_id
,
7572 "vdu_index": additional_param
["count-index"],
7573 "vdu_id": vdur
["id"],
7574 "target_vim": target_vim
,
7575 "vim_account_id": vim_account_id
,
7578 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7579 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7580 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7581 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7582 self
.logger
.info("response from RO: {}".format(result_dict
))
7583 action_id
= result_dict
["action_id"]
7584 await self
._wait
_ng
_ro
(
7589 self
.timeout_operate
,
7591 "start_stop_rebuild",
7593 return "COMPLETED", "Done"
7594 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7595 self
.logger
.error("Exit Exception {}".format(e
))
7597 except asyncio
.CancelledError
:
7598 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7599 exc
= "Operation was cancelled"
7600 except Exception as e
:
7601 exc
= traceback
.format_exc()
7602 self
.logger
.critical(
7603 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7605 return "FAILED", "Error in operate VNF {}".format(exc
)
7607 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7609 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7611 :param: vim_account_id: VIM Account ID
7613 :return: (cloud_name, cloud_credential)
7615 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7616 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7618 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7620 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7622 :param: vim_account_id: VIM Account ID
7624 :return: (cloud_name, cloud_credential)
7626 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7627 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7629 async def migrate(self
, nsr_id
, nslcmop_id
):
7631 Migrate VNFs and VDUs instances in a NS
7633 :param: nsr_id: NS Instance ID
7634 :param: nslcmop_id: nslcmop ID of migrate
7637 # Try to lock HA task here
7638 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7639 if not task_is_locked_by_me
:
7641 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7642 self
.logger
.debug(logging_text
+ "Enter")
7643 # get all needed from database
7645 db_nslcmop_update
= {}
7646 nslcmop_operation_state
= None
7650 # in case of error, indicates what part of scale was failed to put nsr at error status
7651 start_deploy
= time()
7654 # wait for any previous tasks in process
7655 step
= "Waiting for previous operations to terminate"
7656 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7658 self
._write
_ns
_status
(
7661 current_operation
="MIGRATING",
7662 current_operation_id
=nslcmop_id
,
7664 step
= "Getting nslcmop from database"
7666 step
+ " after having waited for previous tasks to be completed"
7668 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7669 migrate_params
= db_nslcmop
.get("operationParams")
7672 target
.update(migrate_params
)
7673 desc
= await self
.RO
.migrate(nsr_id
, target
)
7674 self
.logger
.debug("RO return > {}".format(desc
))
7675 action_id
= desc
["action_id"]
7676 await self
._wait
_ng
_ro
(
7681 self
.timeout_migrate
,
7682 operation
="migrate",
7684 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7685 self
.logger
.error("Exit Exception {}".format(e
))
7687 except asyncio
.CancelledError
:
7688 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7689 exc
= "Operation was cancelled"
7690 except Exception as e
:
7691 exc
= traceback
.format_exc()
7692 self
.logger
.critical(
7693 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7696 self
._write
_ns
_status
(
7699 current_operation
="IDLE",
7700 current_operation_id
=None,
7703 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7704 nslcmop_operation_state
= "FAILED"
7706 nslcmop_operation_state
= "COMPLETED"
7707 db_nslcmop_update
["detailed-status"] = "Done"
7708 db_nsr_update
["detailed-status"] = "Done"
7710 self
._write
_op
_status
(
7714 operation_state
=nslcmop_operation_state
,
7715 other_update
=db_nslcmop_update
,
7717 if nslcmop_operation_state
:
7721 "nslcmop_id": nslcmop_id
,
7722 "operationState": nslcmop_operation_state
,
7724 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7725 except Exception as e
:
7727 logging_text
+ "kafka_write notification Exception {}".format(e
)
7729 self
.logger
.debug(logging_text
+ "Exit")
7730 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7732 async def heal(self
, nsr_id
, nslcmop_id
):
7736 :param nsr_id: ns instance to heal
7737 :param nslcmop_id: operation to run
7741 # Try to lock HA task here
7742 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7743 if not task_is_locked_by_me
:
7746 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7747 stage
= ["", "", ""]
7748 tasks_dict_info
= {}
7749 # ^ stage, step, VIM progress
7750 self
.logger
.debug(logging_text
+ "Enter")
7751 # get all needed from database
7753 db_nslcmop_update
= {}
7755 db_vnfrs
= {} # vnf's info indexed by _id
7757 old_operational_status
= ""
7758 old_config_status
= ""
7761 # wait for any previous tasks in process
7762 step
= "Waiting for previous operations to terminate"
7763 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7764 self
._write
_ns
_status
(
7767 current_operation
="HEALING",
7768 current_operation_id
=nslcmop_id
,
7771 step
= "Getting nslcmop from database"
7773 step
+ " after having waited for previous tasks to be completed"
7775 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7777 step
= "Getting nsr from database"
7778 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7779 old_operational_status
= db_nsr
["operational-status"]
7780 old_config_status
= db_nsr
["config-status"]
7783 "_admin.deployed.RO.operational-status": "healing",
7785 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7787 step
= "Sending heal order to VIM"
7788 task_ro
= asyncio
.ensure_future(
7790 logging_text
=logging_text
,
7792 db_nslcmop
=db_nslcmop
,
7796 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7797 tasks_dict_info
[task_ro
] = "Healing at VIM"
7801 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7802 self
.logger
.debug(logging_text
+ stage
[1])
7803 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7804 self
.fs
.sync(db_nsr
["nsd-id"])
7806 # read from db: vnfr's of this ns
7807 step
= "Getting vnfrs from db"
7808 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7809 for vnfr
in db_vnfrs_list
:
7810 db_vnfrs
[vnfr
["_id"]] = vnfr
7811 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7813 # Check for each target VNF
7814 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7815 for target_vnf
in target_list
:
7816 # Find this VNF in the list from DB
7817 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7819 db_vnfr
= db_vnfrs
[vnfr_id
]
7820 vnfd_id
= db_vnfr
.get("vnfd-id")
7821 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7822 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7823 base_folder
= vnfd
["_admin"]["storage"]
7828 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7829 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7831 # Check each target VDU and deploy N2VC
7832 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7835 if not target_vdu_list
:
7836 # Codigo nuevo para crear diccionario
7837 target_vdu_list
= []
7838 for existing_vdu
in db_vnfr
.get("vdur"):
7839 vdu_name
= existing_vdu
.get("vdu-name", None)
7840 vdu_index
= existing_vdu
.get("count-index", 0)
7841 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7844 vdu_to_be_healed
= {
7846 "count-index": vdu_index
,
7847 "run-day1": vdu_run_day1
,
7849 target_vdu_list
.append(vdu_to_be_healed
)
7850 for target_vdu
in target_vdu_list
:
7851 deploy_params_vdu
= target_vdu
7852 # Set run-day1 vnf level value if not vdu level value exists
7853 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7856 deploy_params_vdu
["run-day1"] = target_vnf
[
7859 vdu_name
= target_vdu
.get("vdu-id", None)
7860 # TODO: Get vdu_id from vdud.
7862 # For multi instance VDU count-index is mandatory
7863 # For single session VDU count-indes is 0
7864 vdu_index
= target_vdu
.get("count-index", 0)
7866 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7867 stage
[1] = "Deploying Execution Environments."
7868 self
.logger
.debug(logging_text
+ stage
[1])
7870 # VNF Level charm. Normal case when proxy charms.
7871 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7872 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7873 if descriptor_config
:
7874 # Continue if healed machine is management machine
7875 vnf_ip_address
= db_vnfr
.get("ip-address")
7876 target_instance
= None
7877 for instance
in db_vnfr
.get("vdur", None):
7879 instance
["vdu-name"] == vdu_name
7880 and instance
["count-index"] == vdu_index
7882 target_instance
= instance
7884 if vnf_ip_address
== target_instance
.get("ip-address"):
7886 logging_text
=logging_text
7887 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7888 member_vnf_index
, vdu_name
, vdu_index
7892 nslcmop_id
=nslcmop_id
,
7898 member_vnf_index
=member_vnf_index
,
7901 deploy_params
=deploy_params_vdu
,
7902 descriptor_config
=descriptor_config
,
7903 base_folder
=base_folder
,
7904 task_instantiation_info
=tasks_dict_info
,
7908 # VDU Level charm. Normal case with native charms.
7909 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7910 if descriptor_config
:
7912 logging_text
=logging_text
7913 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7914 member_vnf_index
, vdu_name
, vdu_index
7918 nslcmop_id
=nslcmop_id
,
7924 member_vnf_index
=member_vnf_index
,
7925 vdu_index
=vdu_index
,
7927 deploy_params
=deploy_params_vdu
,
7928 descriptor_config
=descriptor_config
,
7929 base_folder
=base_folder
,
7930 task_instantiation_info
=tasks_dict_info
,
7935 ROclient
.ROClientException
,
7940 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7942 except asyncio
.CancelledError
:
7944 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7946 exc
= "Operation was cancelled"
7947 except Exception as e
:
7948 exc
= traceback
.format_exc()
7949 self
.logger
.critical(
7950 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7955 stage
[1] = "Waiting for healing pending tasks."
7956 self
.logger
.debug(logging_text
+ stage
[1])
7957 exc
= await self
._wait
_for
_tasks
(
7960 self
.timeout_ns_deploy
,
7968 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7969 nslcmop_operation_state
= "FAILED"
7971 db_nsr_update
["operational-status"] = old_operational_status
7972 db_nsr_update
["config-status"] = old_config_status
7975 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7976 for task
, task_name
in tasks_dict_info
.items():
7977 if not task
.done() or task
.cancelled() or task
.exception():
7978 if task_name
.startswith(self
.task_name_deploy_vca
):
7979 # A N2VC task is pending
7980 db_nsr_update
["config-status"] = "failed"
7982 # RO task is pending
7983 db_nsr_update
["operational-status"] = "failed"
7985 error_description_nslcmop
= None
7986 nslcmop_operation_state
= "COMPLETED"
7987 db_nslcmop_update
["detailed-status"] = "Done"
7988 db_nsr_update
["detailed-status"] = "Done"
7989 db_nsr_update
["operational-status"] = "running"
7990 db_nsr_update
["config-status"] = "configured"
7992 self
._write
_op
_status
(
7995 error_message
=error_description_nslcmop
,
7996 operation_state
=nslcmop_operation_state
,
7997 other_update
=db_nslcmop_update
,
8000 self
._write
_ns
_status
(
8003 current_operation
="IDLE",
8004 current_operation_id
=None,
8005 other_update
=db_nsr_update
,
8008 if nslcmop_operation_state
:
8012 "nslcmop_id": nslcmop_id
,
8013 "operationState": nslcmop_operation_state
,
8015 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
8016 except Exception as e
:
8018 logging_text
+ "kafka_write notification Exception {}".format(e
)
8020 self
.logger
.debug(logging_text
+ "Exit")
8021 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
8032 :param logging_text: preffix text to use at logging
8033 :param nsr_id: nsr identity
8034 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
8035 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
8036 :return: None or exception
8039 def get_vim_account(vim_account_id
):
8041 if vim_account_id
in db_vims
:
8042 return db_vims
[vim_account_id
]
8043 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
8044 db_vims
[vim_account_id
] = db_vim
8049 ns_params
= db_nslcmop
.get("operationParams")
8050 if ns_params
and ns_params
.get("timeout_ns_heal"):
8051 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
8053 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
8057 nslcmop_id
= db_nslcmop
["_id"]
8059 "action_id": nslcmop_id
,
8061 self
.logger
.warning(
8062 "db_nslcmop={} and timeout_ns_heal={}".format(
8063 db_nslcmop
, timeout_ns_heal
8066 target
.update(db_nslcmop
.get("operationParams", {}))
8068 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
8069 desc
= await self
.RO
.recreate(nsr_id
, target
)
8070 self
.logger
.debug("RO return > {}".format(desc
))
8071 action_id
= desc
["action_id"]
8072 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
8073 await self
._wait
_ng
_ro
(
8080 operation
="healing",
8085 "_admin.deployed.RO.operational-status": "running",
8086 "detailed-status": " ".join(stage
),
8088 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
8089 self
._write
_op
_status
(nslcmop_id
, stage
)
8091 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
8094 except Exception as e
:
8095 stage
[2] = "ERROR healing at VIM"
8096 # self.set_vnfr_at_error(db_vnfrs, str(e))
8098 "Error healing at VIM {}".format(e
),
8099 exc_info
=not isinstance(
8102 ROclient
.ROClientException
,
8128 task_instantiation_info
,
8131 # launch instantiate_N2VC in a asyncio task and register task object
8132 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
8133 # if not found, create one entry and update database
8134 # fill db_nsr._admin.deployed.VCA.<index>
8137 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
8141 get_charm_name
= False
8142 if "execution-environment-list" in descriptor_config
:
8143 ee_list
= descriptor_config
.get("execution-environment-list", [])
8144 elif "juju" in descriptor_config
:
8145 ee_list
= [descriptor_config
] # ns charms
8146 if "execution-environment-list" not in descriptor_config
:
8147 # charm name is only required for ns charms
8148 get_charm_name
= True
8149 else: # other types as script are not supported
8152 for ee_item
in ee_list
:
8155 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8156 ee_item
.get("juju"), ee_item
.get("helm-chart")
8159 ee_descriptor_id
= ee_item
.get("id")
8160 if ee_item
.get("juju"):
8161 vca_name
= ee_item
["juju"].get("charm")
8163 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8166 if ee_item
["juju"].get("charm") is not None
8169 if ee_item
["juju"].get("cloud") == "k8s":
8170 vca_type
= "k8s_proxy_charm"
8171 elif ee_item
["juju"].get("proxy") is False:
8172 vca_type
= "native_charm"
8173 elif ee_item
.get("helm-chart"):
8174 vca_name
= ee_item
["helm-chart"]
8175 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8178 vca_type
= "helm-v3"
8181 logging_text
+ "skipping non juju neither charm configuration"
8186 for vca_index
, vca_deployed
in enumerate(
8187 db_nsr
["_admin"]["deployed"]["VCA"]
8189 if not vca_deployed
:
8192 vca_deployed
.get("member-vnf-index") == member_vnf_index
8193 and vca_deployed
.get("vdu_id") == vdu_id
8194 and vca_deployed
.get("kdu_name") == kdu_name
8195 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8196 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8200 # not found, create one.
8202 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8205 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8207 target
+= "/kdu/{}".format(kdu_name
)
8209 "target_element": target
,
8210 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8211 "member-vnf-index": member_vnf_index
,
8213 "kdu_name": kdu_name
,
8214 "vdu_count_index": vdu_index
,
8215 "operational-status": "init", # TODO revise
8216 "detailed-status": "", # TODO revise
8217 "step": "initial-deploy", # TODO revise
8219 "vdu_name": vdu_name
,
8221 "ee_descriptor_id": ee_descriptor_id
,
8222 "charm_name": charm_name
,
8226 # create VCA and configurationStatus in db
8228 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8229 "configurationStatus.{}".format(vca_index
): dict(),
8231 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8233 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8235 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8236 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8237 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8240 task_n2vc
= asyncio
.ensure_future(
8242 logging_text
=logging_text
,
8243 vca_index
=vca_index
,
8249 vdu_index
=vdu_index
,
8250 deploy_params
=deploy_params
,
8251 config_descriptor
=descriptor_config
,
8252 base_folder
=base_folder
,
8253 nslcmop_id
=nslcmop_id
,
8257 ee_config_descriptor
=ee_item
,
8260 self
.lcm_tasks
.register(
8264 "instantiate_N2VC-{}".format(vca_index
),
8267 task_instantiation_info
[
8269 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8270 member_vnf_index
or "", vdu_id
or ""
8273 async def heal_N2VC(
8290 ee_config_descriptor
,
8292 nsr_id
= db_nsr
["_id"]
8293 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8294 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8295 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8296 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8298 "collection": "nsrs",
8299 "filter": {"_id": nsr_id
},
8300 "path": db_update_entry
,
8306 element_under_configuration
= nsr_id
8310 vnfr_id
= db_vnfr
["_id"]
8311 osm_config
["osm"]["vnf_id"] = vnfr_id
8313 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8315 if vca_type
== "native_charm":
8318 index_number
= vdu_index
or 0
8321 element_type
= "VNF"
8322 element_under_configuration
= vnfr_id
8323 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8325 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8326 element_type
= "VDU"
8327 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8328 osm_config
["osm"]["vdu_id"] = vdu_id
8330 namespace
+= ".{}".format(kdu_name
)
8331 element_type
= "KDU"
8332 element_under_configuration
= kdu_name
8333 osm_config
["osm"]["kdu_name"] = kdu_name
8336 if base_folder
["pkg-dir"]:
8337 artifact_path
= "{}/{}/{}/{}".format(
8338 base_folder
["folder"],
8339 base_folder
["pkg-dir"],
8342 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8347 artifact_path
= "{}/Scripts/{}/{}/".format(
8348 base_folder
["folder"],
8351 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8356 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8358 # get initial_config_primitive_list that applies to this element
8359 initial_config_primitive_list
= config_descriptor
.get(
8360 "initial-config-primitive"
8364 "Initial config primitive list > {}".format(
8365 initial_config_primitive_list
8369 # add config if not present for NS charm
8370 ee_descriptor_id
= ee_config_descriptor
.get("id")
8371 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8372 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8373 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8377 "Initial config primitive list #2 > {}".format(
8378 initial_config_primitive_list
8381 # n2vc_redesign STEP 3.1
8382 # find old ee_id if exists
8383 ee_id
= vca_deployed
.get("ee_id")
8385 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8386 # create or register execution environment in VCA. Only for native charms when healing
8387 if vca_type
== "native_charm":
8388 step
= "Waiting to VM being up and getting IP address"
8389 self
.logger
.debug(logging_text
+ step
)
8390 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8399 credentials
= {"hostname": rw_mgmt_ip
}
8401 username
= deep_get(
8402 config_descriptor
, ("config-access", "ssh-access", "default-user")
8404 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8405 # merged. Meanwhile let's get username from initial-config-primitive
8406 if not username
and initial_config_primitive_list
:
8407 for config_primitive
in initial_config_primitive_list
:
8408 for param
in config_primitive
.get("parameter", ()):
8409 if param
["name"] == "ssh-username":
8410 username
= param
["value"]
8414 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8415 "'config-access.ssh-access.default-user'"
8417 credentials
["username"] = username
8419 # n2vc_redesign STEP 3.2
8420 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8421 self
._write
_configuration
_status
(
8423 vca_index
=vca_index
,
8424 status
="REGISTERING",
8425 element_under_configuration
=element_under_configuration
,
8426 element_type
=element_type
,
8429 step
= "register execution environment {}".format(credentials
)
8430 self
.logger
.debug(logging_text
+ step
)
8431 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8432 credentials
=credentials
,
8433 namespace
=namespace
,
8438 # update ee_id en db
8440 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8442 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8444 # for compatibility with MON/POL modules, the need model and application name at database
8445 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8446 # Not sure if this need to be done when healing
8448 ee_id_parts = ee_id.split(".")
8449 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8450 if len(ee_id_parts) >= 2:
8451 model_name = ee_id_parts[0]
8452 application_name = ee_id_parts[1]
8453 db_nsr_update[db_update_entry + "model"] = model_name
8454 db_nsr_update[db_update_entry + "application"] = application_name
8457 # n2vc_redesign STEP 3.3
8458 # Install configuration software. Only for native charms.
8459 step
= "Install configuration Software"
8461 self
._write
_configuration
_status
(
8463 vca_index
=vca_index
,
8464 status
="INSTALLING SW",
8465 element_under_configuration
=element_under_configuration
,
8466 element_type
=element_type
,
8467 # other_update=db_nsr_update,
8471 # TODO check if already done
8472 self
.logger
.debug(logging_text
+ step
)
8474 if vca_type
== "native_charm":
8475 config_primitive
= next(
8476 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8479 if config_primitive
:
8480 config
= self
._map
_primitive
_params
(
8481 config_primitive
, {}, deploy_params
8483 await self
.vca_map
[vca_type
].install_configuration_sw(
8485 artifact_path
=artifact_path
,
8493 # write in db flag of configuration_sw already installed
8495 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8498 # Not sure if this need to be done when healing
8500 # add relations for this VCA (wait for other peers related with this VCA)
8501 await self._add_vca_relations(
8502 logging_text=logging_text,
8505 vca_index=vca_index,
8509 # if SSH access is required, then get execution environment SSH public
8510 # if native charm we have waited already to VM be UP
8511 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8514 # self.logger.debug("get ssh key block")
8516 config_descriptor
, ("config-access", "ssh-access", "required")
8518 # self.logger.debug("ssh key needed")
8519 # Needed to inject a ssh key
8522 ("config-access", "ssh-access", "default-user"),
8524 step
= "Install configuration Software, getting public ssh key"
8525 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8526 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8529 step
= "Insert public key into VM user={} ssh_key={}".format(
8533 # self.logger.debug("no need to get ssh key")
8534 step
= "Waiting to VM being up and getting IP address"
8535 self
.logger
.debug(logging_text
+ step
)
8537 # n2vc_redesign STEP 5.1
8538 # wait for RO (ip-address) Insert pub_key into VM
8539 # IMPORTANT: We need do wait for RO to complete healing operation.
8540 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8543 rw_mgmt_ip
= await self
.wait_kdu_up(
8544 logging_text
, nsr_id
, vnfr_id
, kdu_name
8547 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8557 rw_mgmt_ip
= None # This is for a NS configuration
8559 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8561 # store rw_mgmt_ip in deploy params for later replacement
8562 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8565 # get run-day1 operation parameter
8566 runDay1
= deploy_params
.get("run-day1", False)
8568 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8571 # n2vc_redesign STEP 6 Execute initial config primitive
8572 step
= "execute initial config primitive"
8574 # wait for dependent primitives execution (NS -> VNF -> VDU)
8575 if initial_config_primitive_list
:
8576 await self
._wait
_dependent
_n
2vc
(
8577 nsr_id
, vca_deployed_list
, vca_index
8580 # stage, in function of element type: vdu, kdu, vnf or ns
8581 my_vca
= vca_deployed_list
[vca_index
]
8582 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8584 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8585 elif my_vca
.get("member-vnf-index"):
8587 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8590 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8592 self
._write
_configuration
_status
(
8593 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8596 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8598 check_if_terminated_needed
= True
8599 for initial_config_primitive
in initial_config_primitive_list
:
8600 # adding information on the vca_deployed if it is a NS execution environment
8601 if not vca_deployed
["member-vnf-index"]:
8602 deploy_params
["ns_config_info"] = json
.dumps(
8603 self
._get
_ns
_config
_info
(nsr_id
)
8605 # TODO check if already done
8606 primitive_params_
= self
._map
_primitive
_params
(
8607 initial_config_primitive
, {}, deploy_params
8610 step
= "execute primitive '{}' params '{}'".format(
8611 initial_config_primitive
["name"], primitive_params_
8613 self
.logger
.debug(logging_text
+ step
)
8614 await self
.vca_map
[vca_type
].exec_primitive(
8616 primitive_name
=initial_config_primitive
["name"],
8617 params_dict
=primitive_params_
,
8622 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8623 if check_if_terminated_needed
:
8624 if config_descriptor
.get("terminate-config-primitive"):
8628 {db_update_entry
+ "needed_terminate": True},
8630 check_if_terminated_needed
= False
8632 # TODO register in database that primitive is done
8634 # STEP 7 Configure metrics
8635 # Not sure if this need to be done when healing
8637 if vca_type == "helm" or vca_type == "helm-v3":
8638 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8640 artifact_path=artifact_path,
8641 ee_config_descriptor=ee_config_descriptor,
8644 target_ip=rw_mgmt_ip,
8650 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8653 for job in prometheus_jobs:
8656 {"job_name": job["job_name"]},
8659 fail_on_empty=False,
8663 step
= "instantiated at VCA"
8664 self
.logger
.debug(logging_text
+ step
)
8666 self
._write
_configuration
_status
(
8667 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8670 except Exception as e
: # TODO not use Exception but N2VC exception
8671 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8673 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8676 "Exception while {} : {}".format(step
, e
), exc_info
=True
8678 self
._write
_configuration
_status
(
8679 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8681 raise LcmException("{} {}".format(step
, e
)) from e
8683 async def _wait_heal_ro(
8689 while time() <= start_time
+ timeout
:
8690 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8691 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8692 "operational-status"
8694 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8695 if operational_status_ro
!= "healing":
8697 await asyncio
.sleep(15, loop
=self
.loop
)
8698 else: # timeout_ns_deploy
8699 raise NgRoException("Timeout waiting ns to deploy")
8701 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8703 Vertical Scale the VDUs in a NS
8705 :param: nsr_id: NS Instance ID
8706 :param: nslcmop_id: nslcmop ID of migrate
8709 # Try to lock HA task here
8710 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8711 if not task_is_locked_by_me
:
8713 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8714 self
.logger
.debug(logging_text
+ "Enter")
8715 # get all needed from database
8717 db_nslcmop_update
= {}
8718 nslcmop_operation_state
= None
8722 # in case of error, indicates what part of scale was failed to put nsr at error status
8723 start_deploy
= time()
8726 # wait for any previous tasks in process
8727 step
= "Waiting for previous operations to terminate"
8728 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8730 self
._write
_ns
_status
(
8733 current_operation
="VerticalScale",
8734 current_operation_id
=nslcmop_id
,
8736 step
= "Getting nslcmop from database"
8738 step
+ " after having waited for previous tasks to be completed"
8740 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8741 operationParams
= db_nslcmop
.get("operationParams")
8743 target
.update(operationParams
)
8744 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8745 self
.logger
.debug("RO return > {}".format(desc
))
8746 action_id
= desc
["action_id"]
8747 await self
._wait
_ng
_ro
(
8752 self
.timeout_verticalscale
,
8753 operation
="verticalscale",
8755 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8756 self
.logger
.error("Exit Exception {}".format(e
))
8758 except asyncio
.CancelledError
:
8759 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8760 exc
= "Operation was cancelled"
8761 except Exception as e
:
8762 exc
= traceback
.format_exc()
8763 self
.logger
.critical(
8764 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8767 self
._write
_ns
_status
(
8770 current_operation
="IDLE",
8771 current_operation_id
=None,
8774 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8775 nslcmop_operation_state
= "FAILED"
8777 nslcmop_operation_state
= "COMPLETED"
8778 db_nslcmop_update
["detailed-status"] = "Done"
8779 db_nsr_update
["detailed-status"] = "Done"
8781 self
._write
_op
_status
(
8785 operation_state
=nslcmop_operation_state
,
8786 other_update
=db_nslcmop_update
,
8788 if nslcmop_operation_state
:
8792 "nslcmop_id": nslcmop_id
,
8793 "operationState": nslcmop_operation_state
,
8795 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8796 except Exception as e
:
8798 logging_text
+ "kafka_write notification Exception {}".format(e
)
8800 self
.logger
.debug(logging_text
+ "Exit")
8801 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")