1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
228 "verticalscale": self
.RO
.status
,
229 "start_stop_rebuild": self
.RO
.status
,
233 def increment_ip_mac(ip_mac
, vm_index
=1):
234 if not isinstance(ip_mac
, str):
237 # try with ipv4 look for last dot
238 i
= ip_mac
.rfind(".")
241 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
242 # try with ipv6 or mac look for last colon. Operate in hex
243 i
= ip_mac
.rfind(":")
246 # format in hex, len can be 2 for mac or 4 for ipv6
247 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
248 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
254 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
256 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
259 # TODO filter RO descriptor fields...
263 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
264 db_dict
["deploymentStatus"] = ro_descriptor
265 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
267 except Exception as e
:
269 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
272 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
274 # remove last dot from path (if exists)
275 if path
.endswith("."):
278 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
279 # .format(table, filter, path, updated_data))
282 nsr_id
= filter.get("_id")
284 # read ns record from database
285 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
286 current_ns_status
= nsr
.get("nsState")
288 # get vca status for NS
289 status_dict
= await self
.n2vc
.get_status(
290 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
295 db_dict
["vcaStatus"] = status_dict
296 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
404 if cluster_type
in ("juju-bundle", "juju"):
405 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
406 # status in a similar way between Juju Bundles and Helm Charts on this side
407 await self
.k8sclusterjuju
.update_vca_status(
408 db_dict
["vcaStatus"],
414 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
418 self
.update_db_2("nsrs", nsr_id
, db_dict
)
419 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
421 except Exception as e
:
422 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
425 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
427 env
= Environment(undefined
=StrictUndefined
, autoescape
=True)
428 template
= env
.from_string(cloud_init_text
)
429 return template
.render(additional_params
or {})
430 except UndefinedError
as e
:
432 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
433 "file, must be provided in the instantiation parameters inside the "
434 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
436 except (TemplateError
, TemplateNotFound
) as e
:
438 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
443 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
444 cloud_init_content
= cloud_init_file
= None
446 if vdu
.get("cloud-init-file"):
447 base_folder
= vnfd
["_admin"]["storage"]
448 if base_folder
["pkg-dir"]:
449 cloud_init_file
= "{}/{}/cloud_init/{}".format(
450 base_folder
["folder"],
451 base_folder
["pkg-dir"],
452 vdu
["cloud-init-file"],
455 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
456 base_folder
["folder"],
457 vdu
["cloud-init-file"],
459 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
460 cloud_init_content
= ci_file
.read()
461 elif vdu
.get("cloud-init"):
462 cloud_init_content
= vdu
["cloud-init"]
464 return cloud_init_content
465 except FsException
as e
:
467 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
468 vnfd
["id"], vdu
["id"], cloud_init_file
, e
472 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
474 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
476 additional_params
= vdur
.get("additionalParams")
477 return parse_yaml_strings(additional_params
)
479 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
481 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
482 :param vnfd: input vnfd
483 :param new_id: overrides vnf id if provided
484 :param additionalParams: Instantiation params for VNFs provided
485 :param nsrId: Id of the NSR
486 :return: copy of vnfd
488 vnfd_RO
= deepcopy(vnfd
)
489 # remove unused by RO configuration, monitoring, scaling and internal keys
490 vnfd_RO
.pop("_id", None)
491 vnfd_RO
.pop("_admin", None)
492 vnfd_RO
.pop("monitoring-param", None)
493 vnfd_RO
.pop("scaling-group-descriptor", None)
494 vnfd_RO
.pop("kdu", None)
495 vnfd_RO
.pop("k8s-cluster", None)
497 vnfd_RO
["id"] = new_id
499 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
500 for vdu
in get_iterable(vnfd_RO
, "vdu"):
501 vdu
.pop("cloud-init-file", None)
502 vdu
.pop("cloud-init", None)
506 def ip_profile_2_RO(ip_profile
):
507 RO_ip_profile
= deepcopy(ip_profile
)
508 if "dns-server" in RO_ip_profile
:
509 if isinstance(RO_ip_profile
["dns-server"], list):
510 RO_ip_profile
["dns-address"] = []
511 for ds
in RO_ip_profile
.pop("dns-server"):
512 RO_ip_profile
["dns-address"].append(ds
["address"])
514 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
515 if RO_ip_profile
.get("ip-version") == "ipv4":
516 RO_ip_profile
["ip-version"] = "IPv4"
517 if RO_ip_profile
.get("ip-version") == "ipv6":
518 RO_ip_profile
["ip-version"] = "IPv6"
519 if "dhcp-params" in RO_ip_profile
:
520 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
523 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
524 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
525 if db_vim
["_admin"]["operationalState"] != "ENABLED":
527 "VIM={} is not available. operationalState={}".format(
528 vim_account
, db_vim
["_admin"]["operationalState"]
531 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
534 def get_ro_wim_id_for_wim_account(self
, wim_account
):
535 if isinstance(wim_account
, str):
536 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
537 if db_wim
["_admin"]["operationalState"] != "ENABLED":
539 "WIM={} is not available. operationalState={}".format(
540 wim_account
, db_wim
["_admin"]["operationalState"]
543 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
548 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
550 db_vdu_push_list
= []
552 db_update
= {"_admin.modified": time()}
554 for vdu_id
, vdu_count
in vdu_create
.items():
558 for vdur
in reversed(db_vnfr
["vdur"])
559 if vdur
["vdu-id-ref"] == vdu_id
564 # Read the template saved in the db:
566 "No vdur in the database. Using the vdur-template to scale"
568 vdur_template
= db_vnfr
.get("vdur-template")
569 if not vdur_template
:
571 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
575 vdur
= vdur_template
[0]
576 # Delete a template from the database after using it
579 {"_id": db_vnfr
["_id"]},
581 pull
={"vdur-template": {"_id": vdur
["_id"]}},
583 for count
in range(vdu_count
):
584 vdur_copy
= deepcopy(vdur
)
585 vdur_copy
["status"] = "BUILD"
586 vdur_copy
["status-detailed"] = None
587 vdur_copy
["ip-address"] = None
588 vdur_copy
["_id"] = str(uuid4())
589 vdur_copy
["count-index"] += count
+ 1
590 vdur_copy
["id"] = "{}-{}".format(
591 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
593 vdur_copy
.pop("vim_info", None)
594 for iface
in vdur_copy
["interfaces"]:
595 if iface
.get("fixed-ip"):
596 iface
["ip-address"] = self
.increment_ip_mac(
597 iface
["ip-address"], count
+ 1
600 iface
.pop("ip-address", None)
601 if iface
.get("fixed-mac"):
602 iface
["mac-address"] = self
.increment_ip_mac(
603 iface
["mac-address"], count
+ 1
606 iface
.pop("mac-address", None)
610 ) # only first vdu can be managment of vnf
611 db_vdu_push_list
.append(vdur_copy
)
612 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
614 if len(db_vnfr
["vdur"]) == 1:
615 # The scale will move to 0 instances
617 "Scaling to 0 !, creating the template with the last vdur"
619 template_vdur
= [db_vnfr
["vdur"][0]]
620 for vdu_id
, vdu_count
in vdu_delete
.items():
622 indexes_to_delete
= [
624 for iv
in enumerate(db_vnfr
["vdur"])
625 if iv
[1]["vdu-id-ref"] == vdu_id
629 "vdur.{}.status".format(i
): "DELETING"
630 for i
in indexes_to_delete
[-vdu_count
:]
634 # it must be deleted one by one because common.db does not allow otherwise
637 for v
in reversed(db_vnfr
["vdur"])
638 if v
["vdu-id-ref"] == vdu_id
640 for vdu
in vdus_to_delete
[:vdu_count
]:
643 {"_id": db_vnfr
["_id"]},
645 pull
={"vdur": {"_id": vdu
["_id"]}},
649 db_push
["vdur"] = db_vdu_push_list
651 db_push
["vdur-template"] = template_vdur
654 db_vnfr
["vdur-template"] = template_vdur
655 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
656 # modify passed dictionary db_vnfr
657 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
658 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
660 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
662 Updates database nsr with the RO info for the created vld
663 :param ns_update_nsr: dictionary to be filled with the updated info
664 :param db_nsr: content of db_nsr. This is also modified
665 :param nsr_desc_RO: nsr descriptor from RO
666 :return: Nothing, LcmException is raised on errors
669 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
670 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
671 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
673 vld
["vim-id"] = net_RO
.get("vim_net_id")
674 vld
["name"] = net_RO
.get("vim_name")
675 vld
["status"] = net_RO
.get("status")
676 vld
["status-detailed"] = net_RO
.get("error_msg")
677 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
681 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
684 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
686 for db_vnfr
in db_vnfrs
.values():
687 vnfr_update
= {"status": "ERROR"}
688 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
689 if "status" not in vdur
:
690 vdur
["status"] = "ERROR"
691 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
693 vdur
["status-detailed"] = str(error_text
)
695 "vdur.{}.status-detailed".format(vdu_index
)
697 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
698 except DbException
as e
:
699 self
.logger
.error("Cannot update vnf. {}".format(e
))
701 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
703 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
704 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
705 :param nsr_desc_RO: nsr descriptor from RO
706 :return: Nothing, LcmException is raised on errors
708 for vnf_index
, db_vnfr
in db_vnfrs
.items():
709 for vnf_RO
in nsr_desc_RO
["vnfs"]:
710 if vnf_RO
["member_vnf_index"] != vnf_index
:
713 if vnf_RO
.get("ip_address"):
714 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
717 elif not db_vnfr
.get("ip-address"):
718 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
719 raise LcmExceptionNoMgmtIP(
720 "ns member_vnf_index '{}' has no IP address".format(
725 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
726 vdur_RO_count_index
= 0
727 if vdur
.get("pdu-type"):
729 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
730 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
732 if vdur
["count-index"] != vdur_RO_count_index
:
733 vdur_RO_count_index
+= 1
735 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
736 if vdur_RO
.get("ip_address"):
737 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
739 vdur
["ip-address"] = None
740 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
741 vdur
["name"] = vdur_RO
.get("vim_name")
742 vdur
["status"] = vdur_RO
.get("status")
743 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
744 for ifacer
in get_iterable(vdur
, "interfaces"):
745 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
746 if ifacer
["name"] == interface_RO
.get("internal_name"):
747 ifacer
["ip-address"] = interface_RO
.get(
750 ifacer
["mac-address"] = interface_RO
.get(
756 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
757 "from VIM info".format(
758 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
761 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
765 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
767 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
771 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
772 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
773 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
775 vld
["vim-id"] = net_RO
.get("vim_net_id")
776 vld
["name"] = net_RO
.get("vim_name")
777 vld
["status"] = net_RO
.get("status")
778 vld
["status-detailed"] = net_RO
.get("error_msg")
779 vnfr_update
["vld.{}".format(vld_index
)] = vld
783 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
788 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
793 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
798 def _get_ns_config_info(self
, nsr_id
):
800 Generates a mapping between vnf,vdu elements and the N2VC id
801 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
802 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
803 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
804 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
806 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
807 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
809 ns_config_info
= {"osm-config-mapping": mapping
}
810 for vca
in vca_deployed_list
:
811 if not vca
["member-vnf-index"]:
813 if not vca
["vdu_id"]:
814 mapping
[vca
["member-vnf-index"]] = vca
["application"]
818 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
820 ] = vca
["application"]
821 return ns_config_info
823 async def _instantiate_ng_ro(
840 def get_vim_account(vim_account_id
):
842 if vim_account_id
in db_vims
:
843 return db_vims
[vim_account_id
]
844 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
845 db_vims
[vim_account_id
] = db_vim
848 # modify target_vld info with instantiation parameters
849 def parse_vld_instantiation_params(
850 target_vim
, target_vld
, vld_params
, target_sdn
852 if vld_params
.get("ip-profile"):
853 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
856 if vld_params
.get("provider-network"):
857 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
860 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
861 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
864 if vld_params
.get("wimAccountId"):
865 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
866 target_vld
["vim_info"][target_wim
] = {}
867 for param
in ("vim-network-name", "vim-network-id"):
868 if vld_params
.get(param
):
869 if isinstance(vld_params
[param
], dict):
870 for vim
, vim_net
in vld_params
[param
].items():
871 other_target_vim
= "vim:" + vim
873 target_vld
["vim_info"],
874 (other_target_vim
, param
.replace("-", "_")),
877 else: # isinstance str
878 target_vld
["vim_info"][target_vim
][
879 param
.replace("-", "_")
880 ] = vld_params
[param
]
881 if vld_params
.get("common_id"):
882 target_vld
["common_id"] = vld_params
.get("common_id")
884 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
885 def update_ns_vld_target(target
, ns_params
):
886 for vnf_params
in ns_params
.get("vnf", ()):
887 if vnf_params
.get("vimAccountId"):
891 for vnfr
in db_vnfrs
.values()
892 if vnf_params
["member-vnf-index"]
893 == vnfr
["member-vnf-index-ref"]
897 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
898 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
899 target_vld
= find_in_list(
900 get_iterable(vdur
, "interfaces"),
901 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
904 vld_params
= find_in_list(
905 get_iterable(ns_params
, "vld"),
906 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
910 if vnf_params
.get("vimAccountId") not in a_vld
.get(
913 target_vim_network_list
= [
914 v
for _
, v
in a_vld
.get("vim_info").items()
916 target_vim_network_name
= next(
918 item
.get("vim_network_name", "")
919 for item
in target_vim_network_list
924 target
["ns"]["vld"][a_index
].get("vim_info").update(
926 "vim:{}".format(vnf_params
["vimAccountId"]): {
927 "vim_network_name": target_vim_network_name
,
933 for param
in ("vim-network-name", "vim-network-id"):
934 if vld_params
.get(param
) and isinstance(
935 vld_params
[param
], dict
937 for vim
, vim_net
in vld_params
[
940 other_target_vim
= "vim:" + vim
942 target
["ns"]["vld"][a_index
].get(
947 param
.replace("-", "_"),
952 nslcmop_id
= db_nslcmop
["_id"]
954 "name": db_nsr
["name"],
957 "image": deepcopy(db_nsr
["image"]),
958 "flavor": deepcopy(db_nsr
["flavor"]),
959 "action_id": nslcmop_id
,
960 "cloud_init_content": {},
962 for image
in target
["image"]:
963 image
["vim_info"] = {}
964 for flavor
in target
["flavor"]:
965 flavor
["vim_info"] = {}
966 if db_nsr
.get("affinity-or-anti-affinity-group"):
967 target
["affinity-or-anti-affinity-group"] = deepcopy(
968 db_nsr
["affinity-or-anti-affinity-group"]
970 for affinity_or_anti_affinity_group
in target
[
971 "affinity-or-anti-affinity-group"
973 affinity_or_anti_affinity_group
["vim_info"] = {}
975 if db_nslcmop
.get("lcmOperationType") != "instantiate":
976 # get parameters of instantiation:
977 db_nslcmop_instantiate
= self
.db
.get_list(
980 "nsInstanceId": db_nslcmop
["nsInstanceId"],
981 "lcmOperationType": "instantiate",
984 ns_params
= db_nslcmop_instantiate
.get("operationParams")
986 ns_params
= db_nslcmop
.get("operationParams")
987 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
988 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
991 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
992 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
996 "mgmt-network": vld
.get("mgmt-network", False),
997 "type": vld
.get("type"),
1000 "vim_network_name": vld
.get("vim-network-name"),
1001 "vim_account_id": ns_params
["vimAccountId"],
1005 # check if this network needs SDN assist
1006 if vld
.get("pci-interfaces"):
1007 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1008 if vim_config
:= db_vim
.get("config"):
1009 if sdnc_id
:= vim_config
.get("sdn-controller"):
1010 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1011 target_sdn
= "sdn:{}".format(sdnc_id
)
1012 target_vld
["vim_info"][target_sdn
] = {
1014 "target_vim": target_vim
,
1016 "type": vld
.get("type"),
1019 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1020 for nsd_vnf_profile
in nsd_vnf_profiles
:
1021 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1022 if cp
["virtual-link-profile-id"] == vld
["id"]:
1024 "member_vnf:{}.{}".format(
1025 cp
["constituent-cpd-id"][0][
1026 "constituent-base-element-id"
1028 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1030 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1032 # check at nsd descriptor, if there is an ip-profile
1034 nsd_vlp
= find_in_list(
1035 get_virtual_link_profiles(nsd
),
1036 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1041 and nsd_vlp
.get("virtual-link-protocol-data")
1042 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1044 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1047 ip_profile_dest_data
= {}
1048 if "ip-version" in ip_profile_source_data
:
1049 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1052 if "cidr" in ip_profile_source_data
:
1053 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1056 if "gateway-ip" in ip_profile_source_data
:
1057 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1060 if "dhcp-enabled" in ip_profile_source_data
:
1061 ip_profile_dest_data
["dhcp-params"] = {
1062 "enabled": ip_profile_source_data
["dhcp-enabled"]
1064 vld_params
["ip-profile"] = ip_profile_dest_data
1066 # update vld_params with instantiation params
1067 vld_instantiation_params
= find_in_list(
1068 get_iterable(ns_params
, "vld"),
1069 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1071 if vld_instantiation_params
:
1072 vld_params
.update(vld_instantiation_params
)
1073 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1074 target
["ns"]["vld"].append(target_vld
)
1075 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1076 update_ns_vld_target(target
, ns_params
)
1078 for vnfr
in db_vnfrs
.values():
1079 vnfd
= find_in_list(
1080 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1082 vnf_params
= find_in_list(
1083 get_iterable(ns_params
, "vnf"),
1084 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1086 target_vnf
= deepcopy(vnfr
)
1087 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1088 for vld
in target_vnf
.get("vld", ()):
1089 # check if connected to a ns.vld, to fill target'
1090 vnf_cp
= find_in_list(
1091 vnfd
.get("int-virtual-link-desc", ()),
1092 lambda cpd
: cpd
.get("id") == vld
["id"],
1095 ns_cp
= "member_vnf:{}.{}".format(
1096 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1098 if cp2target
.get(ns_cp
):
1099 vld
["target"] = cp2target
[ns_cp
]
1102 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1104 # check if this network needs SDN assist
1106 if vld
.get("pci-interfaces"):
1107 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1108 sdnc_id
= db_vim
["config"].get("sdn-controller")
1110 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1111 target_sdn
= "sdn:{}".format(sdnc_id
)
1112 vld
["vim_info"][target_sdn
] = {
1114 "target_vim": target_vim
,
1116 "type": vld
.get("type"),
1119 # check at vnfd descriptor, if there is an ip-profile
1121 vnfd_vlp
= find_in_list(
1122 get_virtual_link_profiles(vnfd
),
1123 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1127 and vnfd_vlp
.get("virtual-link-protocol-data")
1128 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1130 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1133 ip_profile_dest_data
= {}
1134 if "ip-version" in ip_profile_source_data
:
1135 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1138 if "cidr" in ip_profile_source_data
:
1139 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1142 if "gateway-ip" in ip_profile_source_data
:
1143 ip_profile_dest_data
[
1145 ] = ip_profile_source_data
["gateway-ip"]
1146 if "dhcp-enabled" in ip_profile_source_data
:
1147 ip_profile_dest_data
["dhcp-params"] = {
1148 "enabled": ip_profile_source_data
["dhcp-enabled"]
1151 vld_params
["ip-profile"] = ip_profile_dest_data
1152 # update vld_params with instantiation params
1154 vld_instantiation_params
= find_in_list(
1155 get_iterable(vnf_params
, "internal-vld"),
1156 lambda i_vld
: i_vld
["name"] == vld
["id"],
1158 if vld_instantiation_params
:
1159 vld_params
.update(vld_instantiation_params
)
1160 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1163 for vdur
in target_vnf
.get("vdur", ()):
1164 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1165 continue # This vdu must not be created
1166 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1168 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1171 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1172 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1175 and vdu_configuration
.get("config-access")
1176 and vdu_configuration
.get("config-access").get("ssh-access")
1178 vdur
["ssh-keys"] = ssh_keys_all
1179 vdur
["ssh-access-required"] = vdu_configuration
[
1181 ]["ssh-access"]["required"]
1184 and vnf_configuration
.get("config-access")
1185 and vnf_configuration
.get("config-access").get("ssh-access")
1186 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1188 vdur
["ssh-keys"] = ssh_keys_all
1189 vdur
["ssh-access-required"] = vnf_configuration
[
1191 ]["ssh-access"]["required"]
1192 elif ssh_keys_instantiation
and find_in_list(
1193 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1195 vdur
["ssh-keys"] = ssh_keys_instantiation
1197 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1199 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1201 if vdud
.get("cloud-init-file"):
1202 vdur
["cloud-init"] = "{}:file:{}".format(
1203 vnfd
["_id"], vdud
.get("cloud-init-file")
1205 # read the file and put its content at target.cloud_init_content. This avoids ng_ro having to use the shared package system
1206 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1207 base_folder
= vnfd
["_admin"]["storage"]
1208 if base_folder
["pkg-dir"]:
1209 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1210 base_folder
["folder"],
1211 base_folder
["pkg-dir"],
1212 vdud
.get("cloud-init-file"),
1215 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1216 base_folder
["folder"],
1217 vdud
.get("cloud-init-file"),
1219 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1220 target
["cloud_init_content"][
1223 elif vdud
.get("cloud-init"):
1224 vdur
["cloud-init"] = "{}:vdu:{}".format(
1225 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1227 # put the content at target.cloud_init_content. This avoids ng_ro having to read the vnfd descriptor
1228 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1231 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1232 deploy_params_vdu
= self
._format
_additional
_params
(
1233 vdur
.get("additionalParams") or {}
1235 deploy_params_vdu
["OSM"] = get_osm_params(
1236 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1238 vdur
["additionalParams"] = deploy_params_vdu
1241 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1242 if target_vim
not in ns_flavor
["vim_info"]:
1243 ns_flavor
["vim_info"][target_vim
] = {}
1246 # in case alternative images are provided we must check if they should be applied
1247 # for the vim_type, modify the vim_type taking into account
1248 ns_image_id
= int(vdur
["ns-image-id"])
1249 if vdur
.get("alt-image-ids"):
1250 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1251 vim_type
= db_vim
["vim_type"]
1252 for alt_image_id
in vdur
.get("alt-image-ids"):
1253 ns_alt_image
= target
["image"][int(alt_image_id
)]
1254 if vim_type
== ns_alt_image
.get("vim-type"):
1255 # must use alternative image
1257 "use alternative image id: {}".format(alt_image_id
)
1259 ns_image_id
= alt_image_id
1260 vdur
["ns-image-id"] = ns_image_id
1262 ns_image
= target
["image"][int(ns_image_id
)]
1263 if target_vim
not in ns_image
["vim_info"]:
1264 ns_image
["vim_info"][target_vim
] = {}
1267 if vdur
.get("affinity-or-anti-affinity-group-id"):
1268 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1269 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1270 if target_vim
not in ns_ags
["vim_info"]:
1271 ns_ags
["vim_info"][target_vim
] = {}
1273 vdur
["vim_info"] = {target_vim
: {}}
1274 # instantiation parameters
1276 vdu_instantiation_params
= find_in_list(
1277 get_iterable(vnf_params
, "vdu"),
1278 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1280 if vdu_instantiation_params
:
1281 # Parse the vdu_volumes from the instantiation params
1282 vdu_volumes
= get_volumes_from_instantiation_params(
1283 vdu_instantiation_params
, vdud
1285 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1286 vdur_list
.append(vdur
)
1287 target_vnf
["vdur"] = vdur_list
1288 target
["vnf"].append(target_vnf
)
1290 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1291 desc
= await self
.RO
.deploy(nsr_id
, target
)
1292 self
.logger
.debug("RO return > {}".format(desc
))
1293 action_id
= desc
["action_id"]
1294 await self
._wait
_ng
_ro
(
1295 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1296 operation
="instantiation"
1301 "_admin.deployed.RO.operational-status": "running",
1302 "detailed-status": " ".join(stage
),
1304 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1305 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1306 self
._write
_op
_status
(nslcmop_id
, stage
)
1308 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1312 async def _wait_ng_ro(
1322 detailed_status_old
= None
1324 start_time
= start_time
or time()
1325 while time() <= start_time
+ timeout
:
1326 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1327 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1328 if desc_status
["status"] == "FAILED":
1329 raise NgRoException(desc_status
["details"])
1330 elif desc_status
["status"] == "BUILD":
1332 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1333 elif desc_status
["status"] == "DONE":
1335 stage
[2] = "Deployed at VIM"
1338 assert False, "ROclient.check_ns_status returns unknown {}".format(
1339 desc_status
["status"]
1341 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1342 detailed_status_old
= stage
[2]
1343 db_nsr_update
["detailed-status"] = " ".join(stage
)
1344 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1345 self
._write
_op
_status
(nslcmop_id
, stage
)
1346 await asyncio
.sleep(15, loop
=self
.loop
)
1347 else: # timeout_ns_deploy
1348 raise NgRoException("Timeout waiting ns to deploy")
1350 async def _terminate_ng_ro(
1351 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1356 start_deploy
= time()
1363 "action_id": nslcmop_id
,
1365 desc
= await self
.RO
.deploy(nsr_id
, target
)
1366 action_id
= desc
["action_id"]
1367 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1368 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1371 + "ns terminate action at RO. action_id={}".format(action_id
)
1375 delete_timeout
= 20 * 60 # 20 minutes
1376 await self
._wait
_ng
_ro
(
1377 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1378 operation
="termination"
1381 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1382 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1384 await self
.RO
.delete(nsr_id
)
1385 except Exception as e
:
1386 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1387 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1388 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1389 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1391 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1393 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1394 failed_detail
.append("delete conflict: {}".format(e
))
1397 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1400 failed_detail
.append("delete error: {}".format(e
))
1403 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1407 stage
[2] = "Error deleting from VIM"
1409 stage
[2] = "Deleted from VIM"
1410 db_nsr_update
["detailed-status"] = " ".join(stage
)
1411 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1412 self
._write
_op
_status
(nslcmop_id
, stage
)
1415 raise LcmException("; ".join(failed_detail
))
1418 async def instantiate_RO(
1432 :param logging_text: prefix text to use at logging
1433 :param nsr_id: nsr identity
1434 :param nsd: database content of ns descriptor
1435 :param db_nsr: database content of ns record
1436 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1438 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1439 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1440 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1441 :return: None or exception
1444 start_deploy
= time()
1445 ns_params
= db_nslcmop
.get("operationParams")
1446 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1447 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1449 timeout_ns_deploy
= self
.timeout
.get(
1450 "ns_deploy", self
.timeout_ns_deploy
1453 # Check for and optionally request placement optimization. Database will be updated if placement activated
1454 stage
[2] = "Waiting for Placement."
1455 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
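1456 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr. NOTE(review): the second 'ns_params["vimAccountId"] == vnfr["vim-account-id"]' below is a bare comparison with no effect; an assignment looks intended - confirm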
1457 for vnfr
in db_vnfrs
.values():
1458 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1461 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1463 return await self
._instantiate
_ng
_ro
(
1476 except Exception as e
:
1477 stage
[2] = "ERROR deploying at VIM"
1478 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1480 "Error deploying at VIM {}".format(e
),
1481 exc_info
=not isinstance(
1484 ROclient
.ROClientException
,
1493 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1495 Wait for kdu to be up, get ip address
1496 :param logging_text: prefix use for logging
1500 :return: IP address, K8s services
1503 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1506 while nb_tries
< 360:
1507 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1511 for x
in get_iterable(db_vnfr
, "kdur")
1512 if x
.get("kdu-name") == kdu_name
1518 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1520 if kdur
.get("status"):
1521 if kdur
["status"] in ("READY", "ENABLED"):
1522 return kdur
.get("ip-address"), kdur
.get("services")
1525 "target KDU={} is in error state".format(kdu_name
)
1528 await asyncio
.sleep(10, loop
=self
.loop
)
1530 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1532 async def wait_vm_up_insert_key_ro(
1533 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1536 Wait for the IP address at RO and, optionally, insert the public key in the virtual machine
1537 :param logging_text: prefix use for logging
1542 :param pub_key: public ssh key to inject, None to skip
1543 :param user: user to apply the public ssh key
1547 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1551 target_vdu_id
= None
1557 if ro_retries
>= 360: # 1 hour
1559 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1562 await asyncio
.sleep(10, loop
=self
.loop
)
1565 if not target_vdu_id
:
1566 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1568 if not vdu_id
: # for the VNF case
1569 if db_vnfr
.get("status") == "ERROR":
1571 "Cannot inject ssh-key because target VNF is in error state"
1573 ip_address
= db_vnfr
.get("ip-address")
1579 for x
in get_iterable(db_vnfr
, "vdur")
1580 if x
.get("ip-address") == ip_address
1588 for x
in get_iterable(db_vnfr
, "vdur")
1589 if x
.get("vdu-id-ref") == vdu_id
1590 and x
.get("count-index") == vdu_index
1596 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1597 ): # If only one, this should be the target vdu
1598 vdur
= db_vnfr
["vdur"][0]
1601 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1602 vnfr_id
, vdu_id
, vdu_index
1605 # New generation RO stores information at "vim_info"
1608 if vdur
.get("vim_info"):
1610 t
for t
in vdur
["vim_info"]
1611 ) # there should be only one key
1612 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1614 vdur
.get("pdu-type")
1615 or vdur
.get("status") == "ACTIVE"
1616 or ng_ro_status
== "ACTIVE"
1618 ip_address
= vdur
.get("ip-address")
1621 target_vdu_id
= vdur
["vdu-id-ref"]
1622 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1624 "Cannot inject ssh-key because target VM is in error state"
1627 if not target_vdu_id
:
1630 # inject public key into machine
1631 if pub_key
and user
:
1632 self
.logger
.debug(logging_text
+ "Inserting RO key")
1633 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1634 if vdur
.get("pdu-type"):
1635 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1638 ro_vm_id
= "{}-{}".format(
1639 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1640 ) # TODO add vdu_index
1644 "action": "inject_ssh_key",
1648 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1650 desc
= await self
.RO
.deploy(nsr_id
, target
)
1651 action_id
= desc
["action_id"]
1652 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1655 # wait until NS is deployed at RO
1657 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1658 ro_nsr_id
= deep_get(
1659 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1663 result_dict
= await self
.RO
.create_action(
1665 item_id_name
=ro_nsr_id
,
1667 "add_public_key": pub_key
,
1672 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1673 if not result_dict
or not isinstance(result_dict
, dict):
1675 "Unknown response from RO when injecting key"
1677 for result
in result_dict
.values():
1678 if result
.get("vim_result") == 200:
1681 raise ROclient
.ROClientException(
1682 "error injecting key: {}".format(
1683 result
.get("description")
1687 except NgRoException
as e
:
1689 "Reaching max tries injecting key. Error: {}".format(e
)
1691 except ROclient
.ROClientException
as e
:
1695 + "error injecting key: {}. Retrying until {} seconds".format(
1702 "Reaching max tries injecting key. Error: {}".format(e
)
1709 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1711 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1713 my_vca
= vca_deployed_list
[vca_index
]
1714 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1715 # vdu or kdu: no dependencies
1719 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1720 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1721 configuration_status_list
= db_nsr
["configurationStatus"]
1722 for index
, vca_deployed
in enumerate(configuration_status_list
):
1723 if index
== vca_index
:
1726 if not my_vca
.get("member-vnf-index") or (
1727 vca_deployed
.get("member-vnf-index")
1728 == my_vca
.get("member-vnf-index")
1730 internal_status
= configuration_status_list
[index
].get("status")
1731 if internal_status
== "READY":
1733 elif internal_status
== "BROKEN":
1735 "Configuration aborted because dependent charm/s has failed"
1740 # no dependencies, return
1742 await asyncio
.sleep(10)
1745 raise LcmException("Configuration aborted because dependent charm/s timeout")
1747 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1750 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1752 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1753 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1756 async def instantiate_N2VC(
1773 ee_config_descriptor
,
1775 nsr_id
= db_nsr
["_id"]
1776 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1777 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1778 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1779 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1781 "collection": "nsrs",
1782 "filter": {"_id": nsr_id
},
1783 "path": db_update_entry
,
1789 element_under_configuration
= nsr_id
1793 vnfr_id
= db_vnfr
["_id"]
1794 osm_config
["osm"]["vnf_id"] = vnfr_id
1796 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1798 if vca_type
== "native_charm":
1801 index_number
= vdu_index
or 0
1804 element_type
= "VNF"
1805 element_under_configuration
= vnfr_id
1806 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1808 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1809 element_type
= "VDU"
1810 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1811 osm_config
["osm"]["vdu_id"] = vdu_id
1813 namespace
+= ".{}".format(kdu_name
)
1814 element_type
= "KDU"
1815 element_under_configuration
= kdu_name
1816 osm_config
["osm"]["kdu_name"] = kdu_name
1819 if base_folder
["pkg-dir"]:
1820 artifact_path
= "{}/{}/{}/{}".format(
1821 base_folder
["folder"],
1822 base_folder
["pkg-dir"],
1825 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1830 artifact_path
= "{}/Scripts/{}/{}/".format(
1831 base_folder
["folder"],
1834 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1839 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1841 # get initial_config_primitive_list that applies to this element
1842 initial_config_primitive_list
= config_descriptor
.get(
1843 "initial-config-primitive"
1847 "Initial config primitive list > {}".format(
1848 initial_config_primitive_list
1852 # add config if not present for NS charm
1853 ee_descriptor_id
= ee_config_descriptor
.get("id")
1854 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1855 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1856 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1860 "Initial config primitive list #2 > {}".format(
1861 initial_config_primitive_list
1864 # n2vc_redesign STEP 3.1
1865 # find old ee_id if exists
1866 ee_id
= vca_deployed
.get("ee_id")
1868 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1869 # create or register execution environment in VCA
1870 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1872 self
._write
_configuration
_status
(
1874 vca_index
=vca_index
,
1876 element_under_configuration
=element_under_configuration
,
1877 element_type
=element_type
,
1880 step
= "create execution environment"
1881 self
.logger
.debug(logging_text
+ step
)
1885 if vca_type
== "k8s_proxy_charm":
1886 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1887 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1888 namespace
=namespace
,
1889 artifact_path
=artifact_path
,
1893 elif vca_type
== "helm" or vca_type
== "helm-v3":
1894 ee_id
, credentials
= await self
.vca_map
[
1896 ].create_execution_environment(
1897 namespace
=namespace
,
1901 artifact_path
=artifact_path
,
1905 ee_id
, credentials
= await self
.vca_map
[
1907 ].create_execution_environment(
1908 namespace
=namespace
,
1914 elif vca_type
== "native_charm":
1915 step
= "Waiting to VM being up and getting IP address"
1916 self
.logger
.debug(logging_text
+ step
)
1917 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1926 credentials
= {"hostname": rw_mgmt_ip
}
1928 username
= deep_get(
1929 config_descriptor
, ("config-access", "ssh-access", "default-user")
1931 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1932 # merged. Meanwhile let's get username from initial-config-primitive
1933 if not username
and initial_config_primitive_list
:
1934 for config_primitive
in initial_config_primitive_list
:
1935 for param
in config_primitive
.get("parameter", ()):
1936 if param
["name"] == "ssh-username":
1937 username
= param
["value"]
1941 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1942 "'config-access.ssh-access.default-user'"
1944 credentials
["username"] = username
1945 # n2vc_redesign STEP 3.2
1947 self
._write
_configuration
_status
(
1949 vca_index
=vca_index
,
1950 status
="REGISTERING",
1951 element_under_configuration
=element_under_configuration
,
1952 element_type
=element_type
,
1955 step
= "register execution environment {}".format(credentials
)
1956 self
.logger
.debug(logging_text
+ step
)
1957 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1958 credentials
=credentials
,
1959 namespace
=namespace
,
1964 # for compatibility with MON/POL modules, they need the model and application name in the database
1965 # TODO ask MON/POL whether it is still necessary to assume the format "model_name.application_name"
1966 ee_id_parts
= ee_id
.split(".")
1967 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1968 if len(ee_id_parts
) >= 2:
1969 model_name
= ee_id_parts
[0]
1970 application_name
= ee_id_parts
[1]
1971 db_nsr_update
[db_update_entry
+ "model"] = model_name
1972 db_nsr_update
[db_update_entry
+ "application"] = application_name
1974 # n2vc_redesign STEP 3.3
1975 step
= "Install configuration Software"
1977 self
._write
_configuration
_status
(
1979 vca_index
=vca_index
,
1980 status
="INSTALLING SW",
1981 element_under_configuration
=element_under_configuration
,
1982 element_type
=element_type
,
1983 other_update
=db_nsr_update
,
1986 # TODO check if already done
1987 self
.logger
.debug(logging_text
+ step
)
1989 if vca_type
== "native_charm":
1990 config_primitive
= next(
1991 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1994 if config_primitive
:
1995 config
= self
._map
_primitive
_params
(
1996 config_primitive
, {}, deploy_params
1999 if vca_type
== "lxc_proxy_charm":
2000 if element_type
== "NS":
2001 num_units
= db_nsr
.get("config-units") or 1
2002 elif element_type
== "VNF":
2003 num_units
= db_vnfr
.get("config-units") or 1
2004 elif element_type
== "VDU":
2005 for v
in db_vnfr
["vdur"]:
2006 if vdu_id
== v
["vdu-id-ref"]:
2007 num_units
= v
.get("config-units") or 1
2009 if vca_type
!= "k8s_proxy_charm":
2010 await self
.vca_map
[vca_type
].install_configuration_sw(
2012 artifact_path
=artifact_path
,
2015 num_units
=num_units
,
2020 # write in db flag of configuration_sw already installed
2022 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2025 # add relations for this VCA (wait for other peers related with this VCA)
2026 await self
._add
_vca
_relations
(
2027 logging_text
=logging_text
,
2030 vca_index
=vca_index
,
2033 # if SSH access is required, then get the execution environment SSH public key
2034 # for a native charm we have already waited for the VM to be up
2035 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2038 # self.logger.debug("get ssh key block")
2040 config_descriptor
, ("config-access", "ssh-access", "required")
2042 # self.logger.debug("ssh key needed")
2043 # Needed to inject a ssh key
2046 ("config-access", "ssh-access", "default-user"),
2048 step
= "Install configuration Software, getting public ssh key"
2049 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2050 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2053 step
= "Insert public key into VM user={} ssh_key={}".format(
2057 # self.logger.debug("no need to get ssh key")
2058 step
= "Waiting to VM being up and getting IP address"
2059 self
.logger
.debug(logging_text
+ step
)
2061 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2064 # n2vc_redesign STEP 5.1
2065 # wait for RO (ip-address) Insert pub_key into VM
2068 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2069 logging_text
, nsr_id
, vnfr_id
, kdu_name
2071 vnfd
= self
.db
.get_one(
2073 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2075 kdu
= get_kdu(vnfd
, kdu_name
)
2077 service
["name"] for service
in get_kdu_services(kdu
)
2079 exposed_services
= []
2080 for service
in services
:
2081 if any(s
in service
["name"] for s
in kdu_services
):
2082 exposed_services
.append(service
)
2083 await self
.vca_map
[vca_type
].exec_primitive(
2085 primitive_name
="config",
2087 "osm-config": json
.dumps(
2089 k8s
={"services": exposed_services
}
2096 # This verification is needed in order to avoid trying to add a public key
2097 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2098 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2099 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2101 elif db_vnfr
.get('vdur'):
2102 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2112 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2114 # store rw_mgmt_ip in deploy params for later replacement
2115 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2117 # n2vc_redesign STEP 6 Execute initial config primitive
2118 step
= "execute initial config primitive"
2120 # wait for dependent primitives execution (NS -> VNF -> VDU)
2121 if initial_config_primitive_list
:
2122 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2124 # stage, in function of element type: vdu, kdu, vnf or ns
2125 my_vca
= vca_deployed_list
[vca_index
]
2126 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2128 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2129 elif my_vca
.get("member-vnf-index"):
2131 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2134 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2136 self
._write
_configuration
_status
(
2137 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2140 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2142 check_if_terminated_needed
= True
2143 for initial_config_primitive
in initial_config_primitive_list
:
2144 # adding information on the vca_deployed if it is a NS execution environment
2145 if not vca_deployed
["member-vnf-index"]:
2146 deploy_params
["ns_config_info"] = json
.dumps(
2147 self
._get
_ns
_config
_info
(nsr_id
)
2149 # TODO check if already done
2150 primitive_params_
= self
._map
_primitive
_params
(
2151 initial_config_primitive
, {}, deploy_params
2154 step
= "execute primitive '{}' params '{}'".format(
2155 initial_config_primitive
["name"], primitive_params_
2157 self
.logger
.debug(logging_text
+ step
)
2158 await self
.vca_map
[vca_type
].exec_primitive(
2160 primitive_name
=initial_config_primitive
["name"],
2161 params_dict
=primitive_params_
,
2166 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
2167 if check_if_terminated_needed
:
2168 if config_descriptor
.get("terminate-config-primitive"):
2170 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2172 check_if_terminated_needed
= False
2174 # TODO register in database that primitive is done
2176 # STEP 7 Configure metrics
2177 if vca_type
== "helm" or vca_type
== "helm-v3":
2178 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2180 artifact_path
=artifact_path
,
2181 ee_config_descriptor
=ee_config_descriptor
,
2184 target_ip
=rw_mgmt_ip
,
2190 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2193 for job
in prometheus_jobs
:
2196 {"job_name": job
["job_name"]},
2199 fail_on_empty
=False,
2202 step
= "instantiated at VCA"
2203 self
.logger
.debug(logging_text
+ step
)
2205 self
._write
_configuration
_status
(
2206 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2209 except Exception as e
: # TODO not use Exception but N2VC exception
2210 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2212 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2215 "Exception while {} : {}".format(step
, e
), exc_info
=True
2217 self
._write
_configuration
_status
(
2218 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2220 raise LcmException("{} {}".format(step
, e
)) from e
2222 def _write_ns_status(
2226 current_operation
: str,
2227 current_operation_id
: str,
2228 error_description
: str = None,
2229 error_detail
: str = None,
2230 other_update
: dict = None,
2233 Update db_nsr fields.
2236 :param current_operation:
2237 :param current_operation_id:
2238 :param error_description:
2239 :param error_detail:
2240 :param other_update: Other required changes at database if provided, will be cleared
2244 db_dict
= other_update
or {}
2247 ] = current_operation_id
# for backward compatibility
2248 db_dict
["_admin.current-operation"] = current_operation_id
2249 db_dict
["_admin.operation-type"] = (
2250 current_operation
if current_operation
!= "IDLE" else None
2252 db_dict
["currentOperation"] = current_operation
2253 db_dict
["currentOperationID"] = current_operation_id
2254 db_dict
["errorDescription"] = error_description
2255 db_dict
["errorDetail"] = error_detail
2258 db_dict
["nsState"] = ns_state
2259 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2260 except DbException
as e
:
2261 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2263 def _write_op_status(
2267 error_message
: str = None,
2268 queuePosition
: int = 0,
2269 operation_state
: str = None,
2270 other_update
: dict = None,
2273 db_dict
= other_update
or {}
2274 db_dict
["queuePosition"] = queuePosition
2275 if isinstance(stage
, list):
2276 db_dict
["stage"] = stage
[0]
2277 db_dict
["detailed-status"] = " ".join(stage
)
2278 elif stage
is not None:
2279 db_dict
["stage"] = str(stage
)
2281 if error_message
is not None:
2282 db_dict
["errorMessage"] = error_message
2283 if operation_state
is not None:
2284 db_dict
["operationState"] = operation_state
2285 db_dict
["statusEnteredTime"] = time()
2286 self
.update_db_2("nslcmops", op_id
, db_dict
)
2287 except DbException
as e
:
2289 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2292 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2294 nsr_id
= db_nsr
["_id"]
2295 # configurationStatus
2296 config_status
= db_nsr
.get("configurationStatus")
2299 "configurationStatus.{}.status".format(index
): status
2300 for index
, v
in enumerate(config_status
)
2304 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2306 except DbException
as e
:
2308 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2311 def _write_configuration_status(
2316 element_under_configuration
: str = None,
2317 element_type
: str = None,
2318 other_update
: dict = None,
2321 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2322 # .format(vca_index, status))
2325 db_path
= "configurationStatus.{}.".format(vca_index
)
2326 db_dict
= other_update
or {}
2328 db_dict
[db_path
+ "status"] = status
2329 if element_under_configuration
:
2331 db_path
+ "elementUnderConfiguration"
2332 ] = element_under_configuration
2334 db_dict
[db_path
+ "elementType"] = element_type
2335 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2336 except DbException
as e
:
2338 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2339 status
, nsr_id
, vca_index
, e
2343 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2345 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2346 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2347 The database is used because the result can be obtained from a different LCM worker in case of HA.
2348 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2349 :param db_nslcmop: database content of nslcmop
2350 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2351 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2352 computed 'vim-account-id'
2355 nslcmop_id
= db_nslcmop
["_id"]
2356 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2357 if placement_engine
== "PLA":
2359 logging_text
+ "Invoke and wait for placement optimization"
2361 await self
.msg
.aiowrite(
2362 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2364 db_poll_interval
= 5
2365 wait
= db_poll_interval
* 10
2367 while not pla_result
and wait
>= 0:
2368 await asyncio
.sleep(db_poll_interval
)
2369 wait
-= db_poll_interval
2370 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2371 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2375 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2378 for pla_vnf
in pla_result
["vnf"]:
2379 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2380 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2385 {"_id": vnfr
["_id"]},
2386 {"vim-account-id": pla_vnf
["vimAccountId"]},
2389 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at the nslcmops record, under '_admin.pla'.

    Any failure is logged as a warning and not propagated.

    :param params: content with a 'placement' entry that contains the 'nslcmopId' and the result
    :return: None
    """
    # bound before the try so that the except clause can always report it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the instantiate operation of a ns: read the records from database, launch the deployment of
    KDUs, the deployment at VIM (instantiate_RO) and the deployment of execution environments
    (_deploy_n2vc) for VNF, VDU, KDU and NS level, wait for the launched tasks and finally write the
    resulting status at database (nsrs and nslcmops) and notify it through kafka.

    Errors are not propagated: they are collected and reported at the database records.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug(
            "instantiate() task is not locked by me, ns={}".format(nsr_id)
        )
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = [
        "Stage 1/5: preparation of the environment.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
            db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                db_nslcmop["operationParams"]["additionalParamsForVnf"]
            )
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.logger.debug(logging_text + stage[1])
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data
        # _id of the vnfds already stored at db_vnfds. db_vnfds holds the whole
        # descriptors, so the membership test must be done over the identifiers
        loaded_vnfd_ids = set()

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            if vnfr.get("kdur"):
                kdur_list = []
                for kdur in vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                vnfr["kdur"] = kdur_list

            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]
            self.fs.sync(vnfd_id)

            # if we haven't this vnfd, read it from db
            if vnfd_id not in loaded_vnfd_ids:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(
                    vnfd_id, vnfd_ref
                )
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)
                loaded_vnfd_ids.add(vnfd_id)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(
            deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
        ):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list(
            "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
        )

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
        self._write_op_status(op_id=nslcmop_id, stage=stage)

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(
                    parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                )

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text
                    + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(
                    db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                )

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    # NOTE: same object as deploy_params, so the "OSM" entry set
                    # below is also seen through deploy_params
                    deploy_params_vdu = deploy_params
                deploy_params_vdu["OSM"] = get_osm_params(
                    db_vnfr, vdu_id, vdu_count_index=0
                )
                vdud_count = get_number_of_instances(vnfd, vdu_id)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug(
                    "Descriptor config > {}".format(descriptor_config)
                )
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(
                        x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                    )
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        deploy_params_kdu.update(
                            parse_yaml_strings(kdur["additionalParams"].copy())
                        )

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(
                    parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                )
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage,
            )

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
        )
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_deploy,
                    stage,
                    nslcmop_id,
                    nsr_id=nsr_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as wait_exc:
            error_list.append(str(wait_exc))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsr record is only updated when it could be read from database
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "instantiated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2887 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2888 if vnfd_id
not in cached_vnfds
:
2889 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
})
2890 return cached_vnfds
[vnfd_id
]
2892 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2893 if vnf_profile_id
not in cached_vnfrs
:
2894 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2897 "member-vnf-index-ref": vnf_profile_id
,
2898 "nsr-id-ref": nsr_id
,
2901 return cached_vnfrs
[vnf_profile_id
]
def _is_deployed_vca_in_relation(
    self, vca: DeployedVCA, relation: Relation
) -> bool:
    """
    Check if a deployed VCA is one of the two endpoints (provider or requirer) of a relation.

    Endpoints with a 'kdu-resource-profile-id' are never matched. The rest match when the vnf
    profile id, the vdu profile id and the execution environment ref are the same as the VCA ones.

    :param vca: deployed VCA to look for
    :param relation: relation whose provider and requirer are inspected
    :return: True if the VCA matches the provider or the requirer
    """
    return any(
        vca.vnf_profile_id == endpoint.vnf_profile_id
        and vca.vdu_profile_id == endpoint.vdu_profile_id
        and vca.execution_environment_ref == endpoint.execution_environment_ref
        for endpoint in (relation.provider, relation.requirer)
        if not endpoint["kdu-resource-profile-id"]
    )
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """
    Normalize the data of a relation endpoint and fill the execution environment when it is implicit.

    For VNF and VDU level endpoints without 'execution-environment-ref', the juju execution
    environment is obtained from the vnfd (get_juju_ee_ref) and its id is stored at the endpoint data.

    :param nsr_id: ns record id, passed to safe_get_ee_relation
    :param nsd: ns descriptor, used to get the vnf profile and the project
    :param ee_relation_data: endpoint data as found at the descriptor
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param vnf_profile_id: vnf profile id, passed to safe_get_ee_relation
    :return: the endpoint data returned by safe_get_ee_relation, completed if needed
    :raises Exception: when no execution environment is found for the endpoint
    """
    relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    level = EELevel.get_level(relation_data)
    if level not in (EELevel.VNF, EELevel.VDU):
        return relation_data
    if relation_data["execution-environment-ref"]:
        # explicitly provided, nothing to infer
        return relation_data

    vnfd_id = get_vnf_profile(nsd, relation_data["vnf-profile-id"])["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    if level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = relation_data["vdu-profile-id"]
    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {relation_data}"
        )
    relation_data["execution-environment-ref"] = ee["id"]
    return relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared at the ns configuration in which a deployed VCA takes part.

    Two descriptor formats are accepted: 'provider' + 'requirer' entries, or a list of two
    'entities' (the first one is the provider, the second one the requirer).

    :param nsr_id: ns record id
    :param nsd: ns descriptor
    :param vca: deployed VCA that must be one of the endpoints
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of Relation where the VCA is provider or requirer
    :raises Exception: when a relation contains neither provider/requirer nor entities
    """
    ns_id = nsd["id"] if False else None  # placeholder overwritten below; see NOTE
    matching = []
    for r in get_ns_configuration_relation_list(nsd):
        if "provider" in r and "requirer" in r:
            endpoints_data = [r["provider"], r["requirer"]]
        elif "entities" in r:
            endpoints_data = []
            for position in (0, 1):
                entity_id = r["entities"][position]["id"]
                endpoint_data = {
                    "nsr-id": nsr_id,
                    "endpoint": r["entities"][position]["endpoint"],
                }
                # an entity whose id is the nsd id refers to the ns itself
                if entity_id != nsd["id"]:
                    endpoint_data["vnf-profile-id"] = entity_id
                endpoints_data.append(endpoint_data)
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider_data, requirer_data = endpoints_data
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds
        )
        relation = Relation(
            r["name"], EERelation(relation_provider), EERelation(relation_requirer)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            matching.append(relation)
    return matching
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """
    Get the relations declared at the vnfd of a deployed VCA in which that VCA takes part.

    Two descriptor formats are accepted: 'provider' + 'requirer' entries, or a list of two
    'entities' (the first one is the provider, the second one the requirer).

    :param nsr_id: ns record id
    :param nsd: ns descriptor, used to locate the vnf profile of the VCA
    :param vca: deployed VCA that must be one of the endpoints
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: list of Relation where the VCA is provider or requirer
    :raises Exception: when a relation contains neither provider/requirer nor entities
    """
    matching = []
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    for r in get_relation_list(db_vnfd, vnfd_id):
        if "provider" in r and "requirer" in r:
            endpoints_data = [r["provider"], r["requirer"]]
        elif "entities" in r:
            endpoints_data = []
            for position in (0, 1):
                entity_id = r["entities"][position]["id"]
                endpoint_data = {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": vnf_profile_id,
                    "endpoint": r["entities"][position]["endpoint"],
                }
                # an entity whose id is the vnfd id refers to the vnf itself
                if entity_id != vnfd_id:
                    endpoint_data["vdu-profile-id"] = entity_id
                endpoints_data.append(endpoint_data)
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider_data, requirer_data = endpoints_data
        relation_provider = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, provider_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation_requirer = self._update_ee_relation_data_with_implicit_data(
            nsr_id, nsd, requirer_data, cached_vnfds, vnf_profile_id=vnf_profile_id
        )
        relation = Relation(
            r["name"], EERelation(relation_provider), EERelation(relation_requirer)
        )
        if self._is_deployed_vca_in_relation(vca, relation):
            matching.append(relation)
    return matching
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """
    Get the deployed KDU data that corresponds to a KDU level relation endpoint.

    The kdu resource profile referenced by the endpoint is read from the vnfd; the deployed KDU
    is then obtained with get_deployed_kdu and completed with the 'resource-name' of the profile.

    :param ee_relation: relation endpoint, with vnf_profile_id and kdu_resource_profile_id
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: deployed KDU data, with the additional 'resource-name' key
    """
    nsd = get_nsd(db_nsr)
    vnf_profiles = get_vnf_profiles(nsd)
    vnfd_id = find_in_list(
        vnf_profiles,
        lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
    )["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    kdu_name = kdu_resource_profile["kdu-name"]
    # the default for "_admin" must be a dict: a tuple has no .get()
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", {}).get("deployed", ()),
        kdu_name,
        ee_relation.vnf_profile_id,
    )
    # NOTE(review): presumably get_deployed_kdu can return None when the KDU is not
    # deployed yet, in which case this update raises - confirm against its implementation
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """
    Get the deployed component (VCA or K8s resource) referenced by a relation endpoint.

    NS, VNF and VDU level endpoints are looked up with get_deployed_vca; KDU level endpoints
    with _get_kdu_resource_data.

    :param ee_relation: relation endpoint
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :return: DeployedVCA, DeployedK8sResource, or None when nothing is found
    """
    nsr_id = db_nsr["_id"]
    ee_level = EELevel.get_level(ee_relation)

    # filter for get_deployed_vca, only for the levels backed by a VCA
    vca_filter = None
    if ee_level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif ee_level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.KDU:
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        if kdu_resource_data:
            return DeployedK8sResource(kdu_resource_data)
        return None

    if vca_filter is None:
        return None
    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Add a relation between its provider and its requirer, if both of them are ready.

    Both endpoints are ready when their deployed component is found and its config_sw_installed
    is set. The relation is added through the connector self.vca_map[vca_type].

    :param relation: relation to add
    :param vca_type: key of self.vca_map that selects the connector
    :param db_nsr: ns record
    :param cached_vnfds: cache of vnfds indexed by vnfd id
    :param cached_vnfrs: cache of vnfrs indexed by vnf profile id
    :return: True if the relation has been added, False if some endpoint is not ready yet
    """
    provider_ep = relation.provider
    requirer_ep = relation.requirer
    deployed_provider = self._get_deployed_component(
        provider_ep, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        requirer_ep, db_nsr, cached_vnfds
    )
    if not (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    ):
        return False

    # the vnfr is only needed (and read) for endpoints that belong to a vnf
    provider_db_vnfr = None
    if relation.provider.vnf_profile_id:
        provider_db_vnfr = self._get_vnfr(
            relation.provider.nsr_id,
            relation.provider.vnf_profile_id,
            cached_vnfrs,
        )
    requirer_db_vnfr = None
    if relation.requirer.vnf_profile_id:
        requirer_db_vnfr = self._get_vnfr(
            relation.requirer.nsr_id,
            relation.requirer.vnf_profile_id,
            cached_vnfrs,
        )
    provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
    requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
    provider_relation_endpoint = RelationEndpoint(
        deployed_provider.ee_id,
        provider_vca_id,
        relation.provider.endpoint,
    )
    requirer_relation_endpoint = RelationEndpoint(
        deployed_requirer.ee_id,
        requirer_vca_id,
        relation.requirer.endpoint,
    )
    await self.vca_map[vca_type].add_relation(
        provider=provider_relation_endpoint,
        requirer=requirer_relation_endpoint,
    )
    # the caller removes the entry from its relations list
    return True
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Add all the relations (ns level and vnf level) in which a deployed VCA takes part.

    The relations are retried every 5 seconds, reloading the ns record each time, until all of
    them are added or the timeout expires. Errors are logged and reported as a False result.

    :param logging_text: prefix for logging
    :param nsr_id: ns record id
    :param vca_type: key of self.vca_map that selects the connector used to add relations
    :param vca_index: index of the VCA inside the deployed VCA list of the ns record
    :param timeout: maximum time in seconds to wait for the peers
    :return: True if there are no relations or all of them are added; False on timeout or error
    """

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)

        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True

        self.logger.debug(logging_text + " adding relations {}".format(relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each relation, find the VCA's related
            # (iterate over a copy because added relations are removed from the list)
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)

            if not relations:
                self.logger.debug("Relations added")
                break
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning
        self.logger.warning(logging_text + " ERROR adding relations: {}".format(e))
        return False
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """
    Install a KDU at its k8s cluster and update the ns and vnf records accordingly.

    Steps: obtain the kdu instance name and store it at the nsr; for juju based KDUs, optionally
    replace the namespace by the kdu instance name; install the KDU; read its services to get the
    management ip; mark the kdur as READY; and run the initial config primitives when the KDU
    has no juju execution environment. On any error, the detailed-status of the nsr entry and
    the kdur status ("ERROR") are updated, and the original exception is re-raised.

    :param nsr_id: ns record id
    :param nsr_db_path: path inside the nsr where this KDU deployment info is stored
    :param vnfr_data: vnf record that contains the kdur
    :param kdu_index: index of the kdur inside the vnfr
    :param kdud: kdu descriptor
    :param vnfd: vnf descriptor
    :param k8s_instance_info: deployment info: 'k8scluster-type', 'k8scluster-uuid', 'kdu-model',
        'kdu-name', 'namespace' and optionally 'kdu-deployment-name'. 'namespace' can be modified
    :param k8params: parameters for the installation
    :param timeout: timeout in seconds for the installation and for each primitive
    :param vca_id: VCA id passed to the connector
    :return: the kdu instance name
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )

        # Update the nsrs table with the kdu-instance value
        self.update_db_2(
            item="nsrs",
            _id=nsr_id,
            _desc={nsr_db_path + ".kdu-instance": kdu_instance},
        )

        # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
        # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
        # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
        # namespace, this first verification could be removed, and the next step would be done for any kind
        # of KNF.
        # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
        # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
        if k8sclustertype in ("juju", "juju-bundle"):
            # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
            # that the user passed a namespace which he wants its KDU to be deployed in)
            if (
                self.db.count(
                    table="nsrs",
                    q_filter={
                        "_id": nsr_id,
                        "_admin.projects_write": k8s_instance_info["namespace"],
                        "_admin.projects_read": k8s_instance_info["namespace"],
                    },
                )
                > 0
            ):
                self.logger.debug(
                    f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                )
                self.update_db_2(
                    item="nsrs",
                    _id=nsr_id,
                    _desc={f"{nsr_db_path}.namespace": kdu_instance},
                )
                k8s_instance_info["namespace"] = kdu_instance

        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # for/else: no deployed service matches this mgmt service name.
                    # Logger.warn is a deprecated alias; use Logger.warning
                    self.logger.warning(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
3447 async def deploy_kdus(
3454 task_instantiation_info
,
3456 # Launch kdus if present in the descriptor
3458 k8scluster_id_2_uuic
= {
3459 "helm-chart-v3": {},
3464 async def _get_cluster_id(cluster_id
, cluster_type
):
3465 nonlocal k8scluster_id_2_uuic
3466 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3467 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3469 # check if K8scluster is creating and wait look if previous tasks in process
3470 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3471 "k8scluster", cluster_id
3474 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3475 task_name
, cluster_id
3477 self
.logger
.debug(logging_text
+ text
)
3478 await asyncio
.wait(task_dependency
, timeout
=3600)
3480 db_k8scluster
= self
.db
.get_one(
3481 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3483 if not db_k8scluster
:
3484 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3486 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3488 if cluster_type
== "helm-chart-v3":
3490 # backward compatibility for existing clusters that have not been initialized for helm v3
3491 k8s_credentials
= yaml
.safe_dump(
3492 db_k8scluster
.get("credentials")
3494 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3495 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3497 db_k8scluster_update
= {}
3498 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3499 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3500 db_k8scluster_update
[
3501 "_admin.helm-chart-v3.created"
3503 db_k8scluster_update
[
3504 "_admin.helm-chart-v3.operationalState"
3507 "k8sclusters", cluster_id
, db_k8scluster_update
3509 except Exception as e
:
3512 + "error initializing helm-v3 cluster: {}".format(str(e
))
3515 "K8s cluster '{}' has not been initialized for '{}'".format(
3516 cluster_id
, cluster_type
3521 "K8s cluster '{}' has not been initialized for '{}'".format(
3522 cluster_id
, cluster_type
3525 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3528 logging_text
+= "Deploy kdus: "
3531 db_nsr_update
= {"_admin.deployed.K8s": []}
3532 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3535 updated_cluster_list
= []
3536 updated_v3_cluster_list
= []
3538 for vnfr_data
in db_vnfrs
.values():
3539 vca_id
= self
.get_vca_id(vnfr_data
, {})
3540 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3541 # Step 0: Prepare and set parameters
3542 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3543 vnfd_id
= vnfr_data
.get("vnfd-id")
3544 vnfd_with_id
= find_in_list(
3545 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3549 for kdud
in vnfd_with_id
["kdu"]
3550 if kdud
["name"] == kdur
["kdu-name"]
3552 namespace
= kdur
.get("k8s-namespace")
3553 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3554 if kdur
.get("helm-chart"):
3555 kdumodel
= kdur
["helm-chart"]
3556 # Default version: helm3, if helm-version is v2 assign v2
3557 k8sclustertype
= "helm-chart-v3"
3558 self
.logger
.debug("kdur: {}".format(kdur
))
3560 kdur
.get("helm-version")
3561 and kdur
.get("helm-version") == "v2"
3563 k8sclustertype
= "helm-chart"
3564 elif kdur
.get("juju-bundle"):
3565 kdumodel
= kdur
["juju-bundle"]
3566 k8sclustertype
= "juju-bundle"
3569 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3570 "juju-bundle. Maybe an old NBI version is running".format(
3571 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3574 # check if kdumodel is a file and exists
3576 vnfd_with_id
= find_in_list(
3577 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3579 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3580 if storage
: # may be not present if vnfd has not artifacts
3581 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3582 if storage
["pkg-dir"]:
3583 filename
= "{}/{}/{}s/{}".format(
3590 filename
= "{}/Scripts/{}s/{}".format(
3595 if self
.fs
.file_exists(
3596 filename
, mode
="file"
3597 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3598 kdumodel
= self
.fs
.path
+ filename
3599 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3601 except Exception: # it is not a file
3604 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3605 step
= "Synchronize repos for k8s cluster '{}'".format(
3608 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3612 k8sclustertype
== "helm-chart"
3613 and cluster_uuid
not in updated_cluster_list
3615 k8sclustertype
== "helm-chart-v3"
3616 and cluster_uuid
not in updated_v3_cluster_list
3618 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3619 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3620 cluster_uuid
=cluster_uuid
3623 if del_repo_list
or added_repo_dict
:
3624 if k8sclustertype
== "helm-chart":
3626 "_admin.helm_charts_added." + item
: None
3627 for item
in del_repo_list
3630 "_admin.helm_charts_added." + item
: name
3631 for item
, name
in added_repo_dict
.items()
3633 updated_cluster_list
.append(cluster_uuid
)
3634 elif k8sclustertype
== "helm-chart-v3":
3636 "_admin.helm_charts_v3_added." + item
: None
3637 for item
in del_repo_list
3640 "_admin.helm_charts_v3_added." + item
: name
3641 for item
, name
in added_repo_dict
.items()
3643 updated_v3_cluster_list
.append(cluster_uuid
)
3645 logging_text
+ "repos synchronized on k8s cluster "
3646 "'{}' to_delete: {}, to_add: {}".format(
3647 k8s_cluster_id
, del_repo_list
, added_repo_dict
3652 {"_id": k8s_cluster_id
},
3658 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3659 vnfr_data
["member-vnf-index-ref"],
3663 k8s_instance_info
= {
3664 "kdu-instance": None,
3665 "k8scluster-uuid": cluster_uuid
,
3666 "k8scluster-type": k8sclustertype
,
3667 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3668 "kdu-name": kdur
["kdu-name"],
3669 "kdu-model": kdumodel
,
3670 "namespace": namespace
,
3671 "kdu-deployment-name": kdu_deployment_name
,
3673 db_path
= "_admin.deployed.K8s.{}".format(index
)
3674 db_nsr_update
[db_path
] = k8s_instance_info
3675 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3676 vnfd_with_id
= find_in_list(
3677 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3679 task
= asyncio
.ensure_future(
3688 k8params
=desc_params
,
3693 self
.lcm_tasks
.register(
3697 "instantiate_KDU-{}".format(index
),
3700 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3706 except (LcmException
, asyncio
.CancelledError
):
3708 except Exception as e
:
3709 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3710 if isinstance(e
, (N2VCException
, DbException
)):
3711 self
.logger
.error(logging_text
+ msg
)
3713 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3714 raise LcmException(msg
)
3717 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3736 task_instantiation_info
,
3739 # launch instantiate_N2VC in a asyncio task and register task object
3740 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3741 # if not found, create one entry and update database
3742 # fill db_nsr._admin.deployed.VCA.<index>
3745 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3747 if "execution-environment-list" in descriptor_config
:
3748 ee_list
= descriptor_config
.get("execution-environment-list", [])
3749 elif "juju" in descriptor_config
:
3750 ee_list
= [descriptor_config
] # ns charms
3751 else: # other types as script are not supported
3754 for ee_item
in ee_list
:
3757 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3758 ee_item
.get("juju"), ee_item
.get("helm-chart")
3761 ee_descriptor_id
= ee_item
.get("id")
3762 if ee_item
.get("juju"):
3763 vca_name
= ee_item
["juju"].get("charm")
3766 if ee_item
["juju"].get("charm") is not None
3769 if ee_item
["juju"].get("cloud") == "k8s":
3770 vca_type
= "k8s_proxy_charm"
3771 elif ee_item
["juju"].get("proxy") is False:
3772 vca_type
= "native_charm"
3773 elif ee_item
.get("helm-chart"):
3774 vca_name
= ee_item
["helm-chart"]
3775 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3778 vca_type
= "helm-v3"
3781 logging_text
+ "skipping non juju neither charm configuration"
3786 for vca_index
, vca_deployed
in enumerate(
3787 db_nsr
["_admin"]["deployed"]["VCA"]
3789 if not vca_deployed
:
3792 vca_deployed
.get("member-vnf-index") == member_vnf_index
3793 and vca_deployed
.get("vdu_id") == vdu_id
3794 and vca_deployed
.get("kdu_name") == kdu_name
3795 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3796 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3800 # not found, create one.
3802 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3805 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3807 target
+= "/kdu/{}".format(kdu_name
)
3809 "target_element": target
,
3810 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3811 "member-vnf-index": member_vnf_index
,
3813 "kdu_name": kdu_name
,
3814 "vdu_count_index": vdu_index
,
3815 "operational-status": "init", # TODO revise
3816 "detailed-status": "", # TODO revise
3817 "step": "initial-deploy", # TODO revise
3819 "vdu_name": vdu_name
,
3821 "ee_descriptor_id": ee_descriptor_id
,
3825 # create VCA and configurationStatus in db
3827 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3828 "configurationStatus.{}".format(vca_index
): dict(),
3830 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3832 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3834 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3835 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3836 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3839 task_n2vc
= asyncio
.ensure_future(
3840 self
.instantiate_N2VC(
3841 logging_text
=logging_text
,
3842 vca_index
=vca_index
,
3848 vdu_index
=vdu_index
,
3849 deploy_params
=deploy_params
,
3850 config_descriptor
=descriptor_config
,
3851 base_folder
=base_folder
,
3852 nslcmop_id
=nslcmop_id
,
3856 ee_config_descriptor
=ee_item
,
3859 self
.lcm_tasks
.register(
3863 "instantiate_N2VC-{}".format(vca_index
),
3866 task_instantiation_info
[
3868 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3869 member_vnf_index
or "", vdu_id
or ""
3873 def _create_nslcmop(nsr_id
, operation
, params
):
3875 Creates a ns-lcm-opp content to be stored at database.
3876 :param nsr_id: internal id of the instance
3877 :param operation: instantiate, terminate, scale, action, ...
3878 :param params: user parameters for the operation
3879 :return: dictionary following SOL005 format
3881 # Raise exception if invalid arguments
3882 if not (nsr_id
and operation
and params
):
3884 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3891 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3892 "operationState": "PROCESSING",
3893 "statusEnteredTime": now
,
3894 "nsInstanceId": nsr_id
,
3895 "lcmOperationType": operation
,
3897 "isAutomaticInvocation": False,
3898 "operationParams": params
,
3899 "isCancelPending": False,
3901 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3902 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3907 def _format_additional_params(self
, params
):
3908 params
= params
or {}
3909 for key
, value
in params
.items():
3910 if str(value
).startswith("!!yaml "):
3911 params
[key
] = yaml
.safe_load(value
[7:])
3914 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3915 primitive
= seq
.get("name")
3916 primitive_params
= {}
3918 "member_vnf_index": vnf_index
,
3919 "primitive": primitive
,
3920 "primitive_params": primitive_params
,
3923 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: nslcmop database content, sub-operations are at _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is COMPLETED; otherwise op_index,
        after writing operationState 'PROCESSING' for that sub-operation
    """
    op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if op.get("operationState") == "COMPLETED":
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    operationState = "PROCESSING"
    detailed_status = "In progress"
    self._update_suboperation_status(
        db_nslcmop, op_index, operationState, detailed_status
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3947 # Find a sub-operation where all keys in a matching dictionary must match
3948 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3949 def _find_suboperation(self
, db_nslcmop
, match
):
3950 if db_nslcmop
and match
:
3951 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3952 for i
, op
in enumerate(op_list
):
3953 if all(op
.get(k
) == match
[k
] for k
in match
):
3955 return self
.SUBOPERATION_STATUS_NOT_FOUND
3957 # Update status for a sub-operation given its index
3958 def _update_suboperation_status(
3959 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3961 # Update DB for HA tasks
3962 q_filter
= {"_id": db_nslcmop
["_id"]}
3964 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3965 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3968 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3971 # Add sub-operation, return the index of the added sub-operation
3972 # Optionally, set operationState, detailed-status, and operationType
3973 # Status and type are currently set for 'scale' sub-operations:
3974 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3975 # 'detailed-status' : status message
3976 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3977 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3978 def _add_suboperation(
3986 mapped_primitive_params
,
3987 operationState
=None,
3988 detailed_status
=None,
3991 RO_scaling_info
=None,
3994 return self
.SUBOPERATION_STATUS_NOT_FOUND
3995 # Get the "_admin.operations" list, if it exists
3996 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3997 op_list
= db_nslcmop_admin
.get("operations")
3998 # Create or append to the "_admin.operations" list
4000 "member_vnf_index": vnf_index
,
4002 "vdu_count_index": vdu_count_index
,
4003 "primitive": primitive
,
4004 "primitive_params": mapped_primitive_params
,
4007 new_op
["operationState"] = operationState
4009 new_op
["detailed-status"] = detailed_status
4011 new_op
["lcmOperationType"] = operationType
4013 new_op
["RO_nsr_id"] = RO_nsr_id
4015 new_op
["RO_scaling_info"] = RO_scaling_info
4017 # No existing operations, create key 'operations' with current operation as first list element
4018 db_nslcmop_admin
.update({"operations": [new_op
]})
4019 op_list
= db_nslcmop_admin
.get("operations")
4021 # Existing operations, append operation to list
4022 op_list
.append(new_op
)
4024 db_nslcmop_update
= {"_admin.operations": op_list
}
4025 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4026 op_index
= len(op_list
) - 1
4029 # Helper methods for scale() sub-operations
4031 # pre-scale/post-scale:
4032 # Check for 3 different cases:
4033 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4034 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4035 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4036 def _check_or_add_scale_suboperation(
4040 vnf_config_primitive
,
4044 RO_scaling_info
=None,
4046 # Find this sub-operation
4047 if RO_nsr_id
and RO_scaling_info
:
4048 operationType
= "SCALE-RO"
4050 "member_vnf_index": vnf_index
,
4051 "RO_nsr_id": RO_nsr_id
,
4052 "RO_scaling_info": RO_scaling_info
,
4056 "member_vnf_index": vnf_index
,
4057 "primitive": vnf_config_primitive
,
4058 "primitive_params": primitive_params
,
4059 "lcmOperationType": operationType
,
4061 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4062 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4063 # a. New sub-operation
4064 # The sub-operation does not exist, add it.
4065 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4066 # The following parameters are set to None for all kind of scaling:
4068 vdu_count_index
= None
4070 if RO_nsr_id
and RO_scaling_info
:
4071 vnf_config_primitive
= None
4072 primitive_params
= None
4075 RO_scaling_info
= None
4076 # Initial status for sub-operation
4077 operationState
= "PROCESSING"
4078 detailed_status
= "In progress"
4079 # Add sub-operation for pre/post-scaling (zero or more operations)
4080 self
._add
_suboperation
(
4086 vnf_config_primitive
,
4094 return self
.SUBOPERATION_STATUS_NEW
4096 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4097 # or op_index (operationState != 'COMPLETED')
4098 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4100 # Function to return execution_environment id
4102 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4103 # TODO vdu_index_count
4104 for vca
in vca_deployed_list
:
4105 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4108 async def destroy_N2VC(
4116 exec_primitives
=True,
4121 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4122 :param logging_text:
4124 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4125 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4126 :param vca_index: index in the database _admin.deployed.VCA
4127 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4128 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4129 not executed properly
4130 :param scaling_in: True destroys the application, False destroys the model
4131 :return: None or exception
4136 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4137 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4141 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4143 # execute terminate_primitives
4145 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4146 config_descriptor
.get("terminate-config-primitive"),
4147 vca_deployed
.get("ee_descriptor_id"),
4149 vdu_id
= vca_deployed
.get("vdu_id")
4150 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4151 vdu_name
= vca_deployed
.get("vdu_name")
4152 vnf_index
= vca_deployed
.get("member-vnf-index")
4153 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4154 for seq
in terminate_primitives
:
4155 # For each sequence in list, get primitive and call _ns_execute_primitive()
4156 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4157 vnf_index
, seq
.get("name")
4159 self
.logger
.debug(logging_text
+ step
)
4160 # Create the primitive for each sequence, i.e. "primitive": "touch"
4161 primitive
= seq
.get("name")
4162 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4167 self
._add
_suboperation
(
4174 mapped_primitive_params
,
4176 # Sub-operations: Call _ns_execute_primitive() instead of action()
4178 result
, result_detail
= await self
._ns
_execute
_primitive
(
4179 vca_deployed
["ee_id"],
4181 mapped_primitive_params
,
4185 except LcmException
:
4186 # this happens when VCA is not deployed. In this case it is not needed to terminate
4188 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4189 if result
not in result_ok
:
4191 "terminate_primitive {} for vnf_member_index={} fails with "
4192 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4194 # set that this VCA do not need terminated
4195 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4199 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4202 # Delete Prometheus Jobs if any
4203 # This uses NSR_ID, so it will destroy any jobs under this index
4204 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4207 await self
.vca_map
[vca_type
].delete_execution_environment(
4208 vca_deployed
["ee_id"],
4209 scaling_in
=scaling_in
,
4214 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4215 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4216 namespace
= "." + db_nsr
["_id"]
4218 await self
.n2vc
.delete_namespace(
4219 namespace
=namespace
,
4220 total_timeout
=self
.timeout_charm_delete
,
4223 except N2VCNotFound
: # already deleted. Skip
4225 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4227 async def _terminate_RO(
4228 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4231 Terminates a deployment from RO
4232 :param logging_text:
4233 :param nsr_deployed: db_nsr._admin.deployed
4236 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4237 this method will update only the index 2, but it will write on database the concatenated content of the list
4242 ro_nsr_id
= ro_delete_action
= None
4243 if nsr_deployed
and nsr_deployed
.get("RO"):
4244 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4245 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4248 stage
[2] = "Deleting ns from VIM."
4249 db_nsr_update
["detailed-status"] = " ".join(stage
)
4250 self
._write
_op
_status
(nslcmop_id
, stage
)
4251 self
.logger
.debug(logging_text
+ stage
[2])
4252 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4253 self
._write
_op
_status
(nslcmop_id
, stage
)
4254 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4255 ro_delete_action
= desc
["action_id"]
4257 "_admin.deployed.RO.nsr_delete_action_id"
4258 ] = ro_delete_action
4259 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4260 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4261 if ro_delete_action
:
4262 # wait until NS is deleted from VIM
4263 stage
[2] = "Waiting ns deleted from VIM."
4264 detailed_status_old
= None
4268 + " RO_id={} ro_delete_action={}".format(
4269 ro_nsr_id
, ro_delete_action
4272 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4273 self
._write
_op
_status
(nslcmop_id
, stage
)
4275 delete_timeout
= 20 * 60 # 20 minutes
4276 while delete_timeout
> 0:
4277 desc
= await self
.RO
.show(
4279 item_id_name
=ro_nsr_id
,
4280 extra_item
="action",
4281 extra_item_id
=ro_delete_action
,
4285 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4287 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4288 if ns_status
== "ERROR":
4289 raise ROclient
.ROClientException(ns_status_info
)
4290 elif ns_status
== "BUILD":
4291 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4292 elif ns_status
== "ACTIVE":
4293 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4294 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4299 ), "ROclient.check_action_status returns unknown {}".format(
4302 if stage
[2] != detailed_status_old
:
4303 detailed_status_old
= stage
[2]
4304 db_nsr_update
["detailed-status"] = " ".join(stage
)
4305 self
._write
_op
_status
(nslcmop_id
, stage
)
4306 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4307 await asyncio
.sleep(5, loop
=self
.loop
)
4309 else: # delete_timeout <= 0:
4310 raise ROclient
.ROClientException(
4311 "Timeout waiting ns deleted from VIM"
4314 except Exception as e
:
4315 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4317 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4319 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4320 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4321 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4323 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4326 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4328 failed_detail
.append("delete conflict: {}".format(e
))
4331 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4334 failed_detail
.append("delete error: {}".format(e
))
4336 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4340 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4341 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4343 stage
[2] = "Deleting nsd from RO."
4344 db_nsr_update
["detailed-status"] = " ".join(stage
)
4345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4346 self
._write
_op
_status
(nslcmop_id
, stage
)
4347 await self
.RO
.delete("nsd", ro_nsd_id
)
4349 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4351 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4352 except Exception as e
:
4354 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4356 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4358 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4361 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4363 failed_detail
.append(
4364 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4366 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4368 failed_detail
.append(
4369 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4371 self
.logger
.error(logging_text
+ failed_detail
[-1])
4373 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4374 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4375 if not vnf_deployed
or not vnf_deployed
["id"]:
4378 ro_vnfd_id
= vnf_deployed
["id"]
4381 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4382 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4384 db_nsr_update
["detailed-status"] = " ".join(stage
)
4385 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4386 self
._write
_op
_status
(nslcmop_id
, stage
)
4387 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4389 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4391 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4392 except Exception as e
:
4394 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4397 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4401 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4404 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4406 failed_detail
.append(
4407 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4409 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4411 failed_detail
.append(
4412 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4414 self
.logger
.error(logging_text
+ failed_detail
[-1])
4417 stage
[2] = "Error deleting from VIM"
4419 stage
[2] = "Deleted from VIM"
4420 db_nsr_update
["detailed-status"] = " ".join(stage
)
4421 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4422 self
._write
_op
_status
(nslcmop_id
, stage
)
4425 raise LcmException("; ".join(failed_detail
))
4427 async def terminate(self
, nsr_id
, nslcmop_id
):
4428 # Try to lock HA task here
4429 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4430 if not task_is_locked_by_me
:
4433 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4434 self
.logger
.debug(logging_text
+ "Enter")
4435 timeout_ns_terminate
= self
.timeout_ns_terminate
4438 operation_params
= None
4440 error_list
= [] # annotates all failed error messages
4441 db_nslcmop_update
= {}
4442 autoremove
= False # autoremove after terminated
4443 tasks_dict_info
= {}
4446 "Stage 1/3: Preparing task.",
4447 "Waiting for previous operations to terminate.",
4450 # ^ contains [stage, step, VIM-status]
4452 # wait for any previous tasks in process
4453 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4455 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4456 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4457 operation_params
= db_nslcmop
.get("operationParams") or {}
4458 if operation_params
.get("timeout_ns_terminate"):
4459 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4460 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4461 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4463 db_nsr_update
["operational-status"] = "terminating"
4464 db_nsr_update
["config-status"] = "terminating"
4465 self
._write
_ns
_status
(
4467 ns_state
="TERMINATING",
4468 current_operation
="TERMINATING",
4469 current_operation_id
=nslcmop_id
,
4470 other_update
=db_nsr_update
,
4472 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4473 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4474 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4477 stage
[1] = "Getting vnf descriptors from db."
4478 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4480 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4482 db_vnfds_from_id
= {}
4483 db_vnfds_from_member_index
= {}
4485 for vnfr
in db_vnfrs_list
:
4486 vnfd_id
= vnfr
["vnfd-id"]
4487 if vnfd_id
not in db_vnfds_from_id
:
4488 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4489 db_vnfds_from_id
[vnfd_id
] = vnfd
4490 db_vnfds_from_member_index
[
4491 vnfr
["member-vnf-index-ref"]
4492 ] = db_vnfds_from_id
[vnfd_id
]
4494 # Destroy individual execution environments when there are terminating primitives.
4495 # Rest of EE will be deleted at once
4496 # TODO - check before calling _destroy_N2VC
4497 # if not operation_params.get("skip_terminate_primitives"):#
4498 # or not vca.get("needed_terminate"):
4499 stage
[0] = "Stage 2/3 execute terminating primitives."
4500 self
.logger
.debug(logging_text
+ stage
[0])
4501 stage
[1] = "Looking execution environment that needs terminate."
4502 self
.logger
.debug(logging_text
+ stage
[1])
4504 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4505 config_descriptor
= None
4506 vca_member_vnf_index
= vca
.get("member-vnf-index")
4507 vca_id
= self
.get_vca_id(
4508 db_vnfrs_dict
.get(vca_member_vnf_index
)
4509 if vca_member_vnf_index
4513 if not vca
or not vca
.get("ee_id"):
4515 if not vca
.get("member-vnf-index"):
4517 config_descriptor
= db_nsr
.get("ns-configuration")
4518 elif vca
.get("vdu_id"):
4519 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4520 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4521 elif vca
.get("kdu_name"):
4522 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4523 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4525 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4526 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4527 vca_type
= vca
.get("type")
4528 exec_terminate_primitives
= not operation_params
.get(
4529 "skip_terminate_primitives"
4530 ) and vca
.get("needed_terminate")
4531 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4532 # pending native charms
4534 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4536 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4537 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4538 task
= asyncio
.ensure_future(
4546 exec_terminate_primitives
,
4550 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4552 # wait for pending tasks of terminate primitives
4556 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4558 error_list
= await self
._wait
_for
_tasks
(
4561 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4565 tasks_dict_info
.clear()
4567 return # raise LcmException("; ".join(error_list))
4569 # remove All execution environments at once
4570 stage
[0] = "Stage 3/3 delete all."
4572 if nsr_deployed
.get("VCA"):
4573 stage
[1] = "Deleting all execution environments."
4574 self
.logger
.debug(logging_text
+ stage
[1])
4575 vca_id
= self
.get_vca_id({}, db_nsr
)
4576 task_delete_ee
= asyncio
.ensure_future(
4578 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4579 timeout
=self
.timeout_charm_delete
,
4582 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4583 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4585 # Delete from k8scluster
4586 stage
[1] = "Deleting KDUs."
4587 self
.logger
.debug(logging_text
+ stage
[1])
4588 # print(nsr_deployed)
4589 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4590 if not kdu
or not kdu
.get("kdu-instance"):
4592 kdu_instance
= kdu
.get("kdu-instance")
4593 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4594 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4595 vca_id
= self
.get_vca_id({}, db_nsr
)
4596 task_delete_kdu_instance
= asyncio
.ensure_future(
4597 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4598 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4599 kdu_instance
=kdu_instance
,
4601 namespace
=kdu
.get("namespace"),
4607 + "Unknown k8s deployment type {}".format(
4608 kdu
.get("k8scluster-type")
4613 task_delete_kdu_instance
4614 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4617 stage
[1] = "Deleting ns from VIM."
4619 task_delete_ro
= asyncio
.ensure_future(
4620 self
._terminate
_ng
_ro
(
4621 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4625 task_delete_ro
= asyncio
.ensure_future(
4627 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4630 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4632 # rest of stuff will be done in the finally block
4635 ROclient
.ROClientException
,
4640 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4642 except asyncio
.CancelledError
:
4644 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4646 exc
= "Operation was cancelled"
4647 except Exception as e
:
4648 exc
= traceback
.format_exc()
4649 self
.logger
.critical(
4650 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4655 error_list
.append(str(exc
))
4657 # wait for pending tasks
4659 stage
[1] = "Waiting for terminate pending tasks."
4660 self
.logger
.debug(logging_text
+ stage
[1])
4661 error_list
+= await self
._wait
_for
_tasks
(
4664 timeout_ns_terminate
,
4668 stage
[1] = stage
[2] = ""
4669 except asyncio
.CancelledError
:
4670 error_list
.append("Cancelled")
4671 # TODO cancel all tasks
4672 except Exception as exc
:
4673 error_list
.append(str(exc
))
4674 # update status at database
4676 error_detail
= "; ".join(error_list
)
4677 # self.logger.error(logging_text + error_detail)
4678 error_description_nslcmop
= "{} Detail: {}".format(
4679 stage
[0], error_detail
4681 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4682 nslcmop_id
, stage
[0]
4685 db_nsr_update
["operational-status"] = "failed"
4686 db_nsr_update
["detailed-status"] = (
4687 error_description_nsr
+ " Detail: " + error_detail
4689 db_nslcmop_update
["detailed-status"] = error_detail
4690 nslcmop_operation_state
= "FAILED"
4694 error_description_nsr
= error_description_nslcmop
= None
4695 ns_state
= "NOT_INSTANTIATED"
4696 db_nsr_update
["operational-status"] = "terminated"
4697 db_nsr_update
["detailed-status"] = "Done"
4698 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4699 db_nslcmop_update
["detailed-status"] = "Done"
4700 nslcmop_operation_state
= "COMPLETED"
4703 self
._write
_ns
_status
(
4706 current_operation
="IDLE",
4707 current_operation_id
=None,
4708 error_description
=error_description_nsr
,
4709 error_detail
=error_detail
,
4710 other_update
=db_nsr_update
,
4712 self
._write
_op
_status
(
4715 error_message
=error_description_nslcmop
,
4716 operation_state
=nslcmop_operation_state
,
4717 other_update
=db_nslcmop_update
,
4719 if ns_state
== "NOT_INSTANTIATED":
4723 {"nsr-id-ref": nsr_id
},
4724 {"_admin.nsState": "NOT_INSTANTIATED"},
4726 except DbException
as e
:
4729 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4733 if operation_params
:
4734 autoremove
= operation_params
.get("autoremove", False)
4735 if nslcmop_operation_state
:
4737 await self
.msg
.aiowrite(
4742 "nslcmop_id": nslcmop_id
,
4743 "operationState": nslcmop_operation_state
,
4744 "autoremove": autoremove
,
4748 except Exception as e
:
4750 logging_text
+ "kafka_write notification Exception {}".format(e
)
4753 self
.logger
.debug(logging_text
+ "Exit")
4754 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4756 async def _wait_for_tasks(
4757 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4760 error_detail_list
= []
4762 pending_tasks
= list(created_tasks_info
.keys())
4763 num_tasks
= len(pending_tasks
)
4765 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4766 self
._write
_op
_status
(nslcmop_id
, stage
)
4767 while pending_tasks
:
4769 _timeout
= timeout
+ time_start
- time()
4770 done
, pending_tasks
= await asyncio
.wait(
4771 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4773 num_done
+= len(done
)
4774 if not done
: # Timeout
4775 for task
in pending_tasks
:
4776 new_error
= created_tasks_info
[task
] + ": Timeout"
4777 error_detail_list
.append(new_error
)
4778 error_list
.append(new_error
)
4781 if task
.cancelled():
4784 exc
= task
.exception()
4786 if isinstance(exc
, asyncio
.TimeoutError
):
4788 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4789 error_list
.append(created_tasks_info
[task
])
4790 error_detail_list
.append(new_error
)
4797 ROclient
.ROClientException
,
4803 self
.logger
.error(logging_text
+ new_error
)
4805 exc_traceback
= "".join(
4806 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4810 + created_tasks_info
[task
]
4816 logging_text
+ created_tasks_info
[task
] + ": Done"
4818 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4820 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4821 if nsr_id
: # update also nsr
4826 "errorDescription": "Error at: " + ", ".join(error_list
),
4827 "errorDetail": ". ".join(error_detail_list
),
4830 self
._write
_op
_status
(nslcmop_id
, stage
)
4831 return error_detail_list
4834 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4836 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4837 The default-value is used. If it is between < > it look for a value at instantiation_params
4838 :param primitive_desc: portion of VNFD/NSD that describes primitive
4839 :param params: Params provided by user
4840 :param instantiation_params: Instantiation params provided by user
4841 :return: a dictionary with the calculated params
4843 calculated_params
= {}
4844 for parameter
in primitive_desc
.get("parameter", ()):
4845 param_name
= parameter
["name"]
4846 if param_name
in params
:
4847 calculated_params
[param_name
] = params
[param_name
]
4848 elif "default-value" in parameter
or "value" in parameter
:
4849 if "value" in parameter
:
4850 calculated_params
[param_name
] = parameter
["value"]
4852 calculated_params
[param_name
] = parameter
["default-value"]
4854 isinstance(calculated_params
[param_name
], str)
4855 and calculated_params
[param_name
].startswith("<")
4856 and calculated_params
[param_name
].endswith(">")
4858 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4859 calculated_params
[param_name
] = instantiation_params
[
4860 calculated_params
[param_name
][1:-1]
4864 "Parameter {} needed to execute primitive {} not provided".format(
4865 calculated_params
[param_name
], primitive_desc
["name"]
4870 "Parameter {} needed to execute primitive {} not provided".format(
4871 param_name
, primitive_desc
["name"]
4875 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4876 calculated_params
[param_name
] = yaml
.safe_dump(
4877 calculated_params
[param_name
], default_flow_style
=True, width
=256
4879 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4881 ].startswith("!!yaml "):
4882 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4883 if parameter
.get("data-type") == "INTEGER":
4885 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4886 except ValueError: # error converting string to int
4888 "Parameter {} of primitive {} must be integer".format(
4889 param_name
, primitive_desc
["name"]
4892 elif parameter
.get("data-type") == "BOOLEAN":
4893 calculated_params
[param_name
] = not (
4894 (str(calculated_params
[param_name
])).lower() == "false"
4897 # add always ns_config_info if primitive name is config
4898 if primitive_desc
["name"] == "config":
4899 if "ns_config_info" in instantiation_params
:
4900 calculated_params
["ns_config_info"] = instantiation_params
[
4903 return calculated_params
4905 def _look_for_deployed_vca(
4912 ee_descriptor_id
=None,
4914 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4915 for vca
in deployed_vca
:
4918 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4921 vdu_count_index
is not None
4922 and vdu_count_index
!= vca
["vdu_count_index"]
4925 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4927 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4931 # vca_deployed not found
4933 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4934 " is not deployed".format(
4943 ee_id
= vca
.get("ee_id")
4945 "type", "lxc_proxy_charm"
4946 ) # default value for backward compatibility - proxy charm
4949 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4950 "execution environment".format(
4951 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4954 return ee_id
, vca_type
4956 async def _ns_execute_primitive(
4962 retries_interval
=30,
4969 if primitive
== "config":
4970 primitive_params
= {"params": primitive_params
}
4972 vca_type
= vca_type
or "lxc_proxy_charm"
4976 output
= await asyncio
.wait_for(
4977 self
.vca_map
[vca_type
].exec_primitive(
4979 primitive_name
=primitive
,
4980 params_dict
=primitive_params
,
4981 progress_timeout
=self
.timeout_progress_primitive
,
4982 total_timeout
=self
.timeout_primitive
,
4987 timeout
=timeout
or self
.timeout_primitive
,
4991 except asyncio
.CancelledError
:
4993 except Exception as e
: # asyncio.TimeoutError
4994 if isinstance(e
, asyncio
.TimeoutError
):
4999 "Error executing action {} on {} -> {}".format(
5004 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5006 return "FAILED", str(e
)
5008 return "COMPLETED", output
5010 except (LcmException
, asyncio
.CancelledError
):
5012 except Exception as e
:
5013 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_id = self.get_vca_id({}, db_nsr)
        deployed = db_nsr["_admin"]["deployed"]
        if deployed["K8s"]:
            for k8s in deployed["K8s"]:
                await self._on_update_k8s_db(
                    cluster_uuid=k8s["k8scluster-uuid"],
                    kdu_instance=k8s["kdu-instance"],
                    filter={"_id": nsr_id},
                    vca_id=vca_id,
                    cluster_type=k8s["k8scluster-type"],
                )
        else:
            # without KDUs, every deployed VCA entry is refreshed by its index
            for vca_index in range(len(deployed["VCA"])):
                path = "_admin.deployed.VCA.{}.".format(vca_index)
                await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, path, {})

        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    finally:
        # the task must leave the registry also when the refresh fails
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a ns, vnf, vdu or kdu.

    The request is read from the "nslcmops" record (operationParams). KDU primitives
    "upgrade", "rollback" and "status", and KDU actions of a non helm KDU, are sent to
    the k8s cluster connector; any other primitive is sent to the deployed execution
    environment. The result is written at the nsr and nslcmop records and notified by
    kafka ("ns", "actioned").

    :param nsr_id: Id of the nsr
    :param nslcmop_id: Id of the nslcmop
    :return: tuple (operation state, detailed status). None if the HA lock is not obtained
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    detailed_status = ""
    # These are assigned only at some branches below. Bind them here so that a ns
    # level action (without member_vnf_index) does not hit an unbound variable
    db_vnfr = {}
    db_vnfd = None
    db_nsd = None
    db_dict = None
    kdu_action = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            # primitive_params is stored as a json text
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        # additionalParams of a kdur is stored as a json text
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only these kdu primitives can be run without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    raise LcmException(
                        "VDU '{}' for vnf '{}' not found".format(vdu_id, vnf_index)
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not found".format(kdu_name, vnf_index)
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            # names of all the primitives declared for this kdu. The loop variable
            # must not be called "primitive": that is the requested primitive name
            actions = set()
            for kdu_primitive in kdu_configuration.get(
                "initial-config-primitive", []
            ):
                actions.add(kdu_primitive["name"])
            for kdu_primitive in kdu_configuration.get("config-primitive", []):
                actions.add(kdu_primitive["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda kdu: kdu_name == kdu["kdu-name"]
                and kdu["member-vnf-index"] == vnf_index,
            )
            kdu_action = primitive_name in actions and kdu[
                "k8scluster-type"
            ] not in ("helm-chart", "helm-chart-v3")

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # "model:version" is upgraded by its model name only
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]
                if desc_params.get("kdu_atomic_upgrade"):
                    # str() because the flag may arrive as a boolean, not as a text
                    atomic_upgrade = str(
                        desc_params.get("kdu_atomic_upgrade")
                    ).lower() in ("yes", "true", "1")
                    del desc_params["kdu_atomic_upgrade"]
                else:
                    atomic_upgrade = True

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=atomic_upgrade,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty result of the connector is taken as a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                # the notification is best effort: the result is already at database
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare "return" of the try block
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: list; position 2 is set to "Terminating VDUs"
        logging_text: text passed to the RO scale call
    """
    scaling_info = {
        "scaling_group_name": "vdu_autoscale",
        "vdu": [],
        "kdu": [],
        "scaling_direction": "IN",
        "vdu-delete": {},
        "kdu-delete": {},
    }
    count_index = 0  # constant: every VDU is registered with index 0
    for vdu in copy(db_vnfr.get("vdur")):
        vdu_ref = vdu["vdu-id-ref"]
        scaling_info["vdu-delete"][vdu_ref] = count_index
        vdu_entry = {
            "name": vdu.get("name") or vdu.get("vdu-name"),
            "vdu_id": vdu_ref,
            "interface": [],
        }
        scaling_info["vdu"].append(vdu_entry)
        for interface in vdu["interfaces"]:
            vdu_entry["interface"].append(
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
            )
        # NOTE(review): the RO request is sent at each iteration, with the info
        # accumulated so far - confirm that this is intended
        self.logger.info("NS update scaling info{}".format(scaling_info))
        stage[2] = "Terminating VDUs"
        # scale_process = "RO"
        if scaling_info.get("vdu-delete") and self.ro_config.get("ng"):
            await self._scale_ng_ro(
                logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
            )
async def remove_vnf(
    self, nsr_id, nslcmop_id, vnf_instance_id
):
    """This method is to Remove VNF instances from NS.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: if the VNF is the last one of the NS
    """
    try:
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            # a NS cannot be left without any VNF
            raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
                vnf_instance_id))

        stage = ["", "", ""]
        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)

        # the list of db_nsr is modified in place, and then written at database
        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        constituent_vnfr.remove(db_vnfr.get("_id"))
        db_nsr_update = {"constituent-vnfr-ref": constituent_vnfr}
        # written only once: a second identical write does not change anything
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
5517 async def _ns_redeploy_vnf(
5518 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5520 """This method updates and redeploys VNF instances
5523 nsr_id: NS instance id
5524 nslcmop_id: nslcmop id
5525 db_vnfd: VNF descriptor
5526 db_vnfr: VNF instance record
5527 db_nsr: NS instance record
5530 result: (str, str) COMPLETED/FAILED, details
5534 stage
= ["", "", ""]
5535 logging_text
= "Task ns={} update ".format(nsr_id
)
5536 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5537 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5539 # Terminate old VNF resources
5540 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5541 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5543 # old_vnfd_id = db_vnfr["vnfd-id"]
5544 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5545 new_db_vnfd
= db_vnfd
5546 # new_vnfd_ref = new_db_vnfd["id"]
5547 # new_vnfd_id = vnfd_id
5551 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5553 "name": cp
.get("id"),
5554 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5555 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5558 new_vnfr_cp
.append(vnf_cp
)
5559 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5560 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5561 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5562 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5563 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5564 updated_db_vnfr
= self
.db
.get_one(
5565 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5568 # Instantiate new VNF resources
5569 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5570 vca_scaling_info
= []
5571 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5572 scaling_info
["scaling_direction"] = "OUT"
5573 scaling_info
["vdu-create"] = {}
5574 scaling_info
["kdu-create"] = {}
5575 vdud_instantiate_list
= db_vnfd
["vdu"]
5576 for index
, vdud
in enumerate(vdud_instantiate_list
):
5577 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5581 additional_params
= (
5582 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5585 cloud_init_list
= []
5587 # TODO Information of its own ip is not available because db_vnfr is not updated.
5588 additional_params
["OSM"] = get_osm_params(
5589 updated_db_vnfr
, vdud
["id"], 1
5591 cloud_init_list
.append(
5592 self
._parse
_cloud
_init
(
5599 vca_scaling_info
.append(
5601 "osm_vdu_id": vdud
["id"],
5602 "member-vnf-index": member_vnf_index
,
5604 "vdu_index": count_index
,
5607 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5608 if self
.ro_config
.get("ng"):
5610 "New Resources to be deployed: {}".format(scaling_info
))
5611 await self
._scale
_ng
_ro
(
5612 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5614 return "COMPLETED", "Done"
5615 except (LcmException
, asyncio
.CancelledError
):
5617 except Exception as e
:
5618 self
.logger
.debug("Error updating VNF {}".format(e
))
5619 return "FAILED", "Error updating VNF {}".format(e
)
5621 async def _ns_charm_upgrade(
5627 timeout
: float = None,
5629 """This method upgrade charms in VNF instances
5632 ee_id: Execution environment id
5633 path: Local path to the charm
5635 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5636 timeout: (Float) Timeout for the ns update operation
5639 result: (str, str) COMPLETED/FAILED, details
5642 charm_type
= charm_type
or "lxc_proxy_charm"
5643 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5647 charm_type
=charm_type
,
5648 timeout
=timeout
or self
.timeout_ns_update
,
5652 return "COMPLETED", output
5654 except (LcmException
, asyncio
.CancelledError
):
5657 except Exception as e
:
5659 self
.logger
.debug("Error upgrading charm {}".format(path
))
5661 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5663 async def update(self
, nsr_id
, nslcmop_id
):
5664 """Update NS according to different update types
5666 This method performs upgrade of VNF instances then updates the revision
5667 number in VNF record
5670 nsr_id: Network service will be updated
5671 nslcmop_id: ns lcm operation id
5674 It may raise DbException, LcmException, N2VCException, K8sException
5677 # Try to lock HA task here
5678 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5679 if not task_is_locked_by_me
:
5682 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5683 self
.logger
.debug(logging_text
+ "Enter")
5685 # Set the required variables to be filled up later
5687 db_nslcmop_update
= {}
5689 nslcmop_operation_state
= None
5691 error_description_nslcmop
= ""
5693 change_type
= "updated"
5694 detailed_status
= ""
5697 # wait for any previous tasks in process
5698 step
= "Waiting for previous operations to terminate"
5699 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5700 self
._write
_ns
_status
(
5703 current_operation
="UPDATING",
5704 current_operation_id
=nslcmop_id
,
5707 step
= "Getting nslcmop from database"
5708 db_nslcmop
= self
.db
.get_one(
5709 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5711 update_type
= db_nslcmop
["operationParams"]["updateType"]
5713 step
= "Getting nsr from database"
5714 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5715 old_operational_status
= db_nsr
["operational-status"]
5716 db_nsr_update
["operational-status"] = "updating"
5717 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5718 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5720 if update_type
== "CHANGE_VNFPKG":
5722 # Get the input parameters given through update request
5723 vnf_instance_id
= db_nslcmop
["operationParams"][
5724 "changeVnfPackageData"
5725 ].get("vnfInstanceId")
5727 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5730 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5732 step
= "Getting vnfr from database"
5733 db_vnfr
= self
.db
.get_one(
5734 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5737 step
= "Getting vnfds from database"
5739 latest_vnfd
= self
.db
.get_one(
5740 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5742 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5745 current_vnf_revision
= db_vnfr
.get("revision", 1)
5746 current_vnfd
= self
.db
.get_one(
5748 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5749 fail_on_empty
=False,
5751 # Charm artifact paths will be filled up later
5753 current_charm_artifact_path
,
5754 target_charm_artifact_path
,
5755 charm_artifact_paths
,
5758 step
= "Checking if revision has changed in VNFD"
5759 if current_vnf_revision
!= latest_vnfd_revision
:
5761 change_type
= "policy_updated"
5763 # There is new revision of VNFD, update operation is required
5764 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5765 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5767 step
= "Removing the VNFD packages if they exist in the local path"
5768 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5769 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5771 step
= "Get the VNFD packages from FSMongo"
5772 self
.fs
.sync(from_path
=latest_vnfd_path
)
5773 self
.fs
.sync(from_path
=current_vnfd_path
)
5776 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5778 base_folder
= latest_vnfd
["_admin"]["storage"]
5780 for charm_index
, charm_deployed
in enumerate(
5781 get_iterable(nsr_deployed
, "VCA")
5783 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5785 # Getting charm-id and charm-type
5786 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5787 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5788 charm_type
= charm_deployed
.get("type")
5791 ee_id
= charm_deployed
.get("ee_id")
5793 step
= "Getting descriptor config"
5794 descriptor_config
= get_configuration(
5795 current_vnfd
, current_vnfd
["id"]
5798 if "execution-environment-list" in descriptor_config
:
5799 ee_list
= descriptor_config
.get(
5800 "execution-environment-list", []
5805 # There could be several charm used in the same VNF
5806 for ee_item
in ee_list
:
5807 if ee_item
.get("juju"):
5809 step
= "Getting charm name"
5810 charm_name
= ee_item
["juju"].get("charm")
5812 step
= "Setting Charm artifact paths"
5813 current_charm_artifact_path
.append(
5814 get_charm_artifact_path(
5818 current_vnf_revision
,
5821 target_charm_artifact_path
.append(
5822 get_charm_artifact_path(
5826 latest_vnfd_revision
,
5830 charm_artifact_paths
= zip(
5831 current_charm_artifact_path
, target_charm_artifact_path
5834 step
= "Checking if software version has changed in VNFD"
5835 if find_software_version(current_vnfd
) != find_software_version(
5839 step
= "Checking if existing VNF has charm"
5840 for current_charm_path
, target_charm_path
in list(
5841 charm_artifact_paths
5843 if current_charm_path
:
5845 "Software version change is not supported as VNF instance {} has charm.".format(
5850 # There is no change in the charm package, then redeploy the VNF
5851 # based on new descriptor
5852 step
= "Redeploying VNF"
5853 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5857 ) = await self
._ns
_redeploy
_vnf
(
5864 if result
== "FAILED":
5865 nslcmop_operation_state
= result
5866 error_description_nslcmop
= detailed_status
5867 db_nslcmop_update
["detailed-status"] = detailed_status
5870 + " step {} Done with result {} {}".format(
5871 step
, nslcmop_operation_state
, detailed_status
5876 step
= "Checking if any charm package has changed or not"
5877 for current_charm_path
, target_charm_path
in list(
5878 charm_artifact_paths
5882 and target_charm_path
5883 and self
.check_charm_hash_changed(
5884 current_charm_path
, target_charm_path
5888 step
= "Checking whether VNF uses juju bundle"
5889 if check_juju_bundle_existence(current_vnfd
):
5892 "Charm upgrade is not supported for the instance which"
5893 " uses juju-bundle: {}".format(
5894 check_juju_bundle_existence(current_vnfd
)
5898 step
= "Upgrading Charm"
5902 ) = await self
._ns
_charm
_upgrade
(
5905 charm_type
=charm_type
,
5906 path
=self
.fs
.path
+ target_charm_path
,
5907 timeout
=timeout_seconds
,
5910 if result
== "FAILED":
5911 nslcmop_operation_state
= result
5912 error_description_nslcmop
= detailed_status
5914 db_nslcmop_update
["detailed-status"] = detailed_status
5917 + " step {} Done with result {} {}".format(
5918 step
, nslcmop_operation_state
, detailed_status
5922 step
= "Updating policies"
5923 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5924 result
= "COMPLETED"
5925 detailed_status
= "Done"
5926 db_nslcmop_update
["detailed-status"] = "Done"
5928 # If nslcmop_operation_state is None, so any operation is not failed.
5929 if not nslcmop_operation_state
:
5930 nslcmop_operation_state
= "COMPLETED"
5932 # If update CHANGE_VNFPKG nslcmop_operation is successful
5933 # vnf revision need to be updated
5934 vnfr_update
["revision"] = latest_vnfd_revision
5935 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5939 + " task Done with result {} {}".format(
5940 nslcmop_operation_state
, detailed_status
5943 elif update_type
== "REMOVE_VNF":
5944 # This part is included in https://osm.etsi.org/gerrit/11876
5945 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5946 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5947 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5948 step
= "Removing VNF"
5949 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5950 if result
== "FAILED":
5951 nslcmop_operation_state
= result
5952 error_description_nslcmop
= detailed_status
5953 db_nslcmop_update
["detailed-status"] = detailed_status
5954 change_type
= "vnf_terminated"
5955 if not nslcmop_operation_state
:
5956 nslcmop_operation_state
= "COMPLETED"
5959 + " task Done with result {} {}".format(
5960 nslcmop_operation_state
, detailed_status
5964 elif update_type
== "OPERATE_VNF":
5965 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5966 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5967 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5968 (result
, detailed_status
) = await self
.rebuild_start_stop(
5969 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5971 if result
== "FAILED":
5972 nslcmop_operation_state
= result
5973 error_description_nslcmop
= detailed_status
5974 db_nslcmop_update
["detailed-status"] = detailed_status
5975 if not nslcmop_operation_state
:
5976 nslcmop_operation_state
= "COMPLETED"
5979 + " task Done with result {} {}".format(
5980 nslcmop_operation_state
, detailed_status
5984 # If nslcmop_operation_state is None, so any operation is not failed.
5985 # All operations are executed in overall.
5986 if not nslcmop_operation_state
:
5987 nslcmop_operation_state
= "COMPLETED"
5988 db_nsr_update
["operational-status"] = old_operational_status
5990 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5991 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5993 except asyncio
.CancelledError
:
5995 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5997 exc
= "Operation was cancelled"
5998 except asyncio
.TimeoutError
:
5999 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6001 except Exception as e
:
6002 exc
= traceback
.format_exc()
6003 self
.logger
.critical(
6004 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6013 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6014 nslcmop_operation_state
= "FAILED"
6015 db_nsr_update
["operational-status"] = old_operational_status
6017 self
._write
_ns
_status
(
6019 ns_state
=db_nsr
["nsState"],
6020 current_operation
="IDLE",
6021 current_operation_id
=None,
6022 other_update
=db_nsr_update
,
6025 self
._write
_op
_status
(
6028 error_message
=error_description_nslcmop
,
6029 operation_state
=nslcmop_operation_state
,
6030 other_update
=db_nslcmop_update
,
6033 if nslcmop_operation_state
:
6037 "nslcmop_id": nslcmop_id
,
6038 "operationState": nslcmop_operation_state
,
6040 if change_type
in ("vnf_terminated", "policy_updated"):
6041 msg
.update({"vnf_member_index": member_vnf_index
})
6042 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6043 except Exception as e
:
6045 logging_text
+ "kafka_write notification Exception {}".format(e
)
6047 self
.logger
.debug(logging_text
+ "Exit")
6048 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6049 return nslcmop_operation_state
, detailed_status
6051 async def scale(self
, nsr_id
, nslcmop_id
):
# Coroutine that runs one NS scaling operation (SCALE_OUT or SCALE_IN).
#   nsr_id:     "_id" of the record read from the "nsrs" collection.
#   nslcmop_id: "_id" of the operation record read from the "nslcmops" collection.
# Visible flow: take the HA lock, mark the NS as SCALING, load nslcmop / nsr /
# vnfr / vnfd, work out the VDU and KDU deltas of the requested scaling group,
# run pre-scale config primitives, delete execution environments (scale-in),
# scale through NG-RO and/or the K8s connector, create execution environments
# (scale-out), run post-scale config primitives, then write the operation and
# NS status and publish a "scaled" message.
# NOTE(review): the embedded line numbering of this listing has gaps, so some
# statements (try/else/raise heads, closing brackets) are not visible here.
6052 # Try to lock HA task here
6053 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6054 if not task_is_locked_by_me
:
6057 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6058 stage
= ["", "", ""]
6059 tasks_dict_info
= {}
6060 # ^ stage, step, VIM progress
6061 self
.logger
.debug(logging_text
+ "Enter")
6062 # get all needed from database
6064 db_nslcmop_update
= {}
6067 # in case of error, indicates what part of scale was failed to put nsr at error status
6068 scale_process
= None
6069 old_operational_status
= ""
6070 old_config_status
= ""
6073 # wait for any previous tasks in process
6074 step
= "Waiting for previous operations to terminate"
6075 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6076 self
._write
_ns
_status
(
6079 current_operation
="SCALING",
6080 current_operation_id
=nslcmop_id
,
6083 step
= "Getting nslcmop from database"
6085 step
+ " after having waited for previous tasks to be completed"
6087 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6089 step
= "Getting nsr from database"
6090 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6091 old_operational_status
= db_nsr
["operational-status"]
6092 old_config_status
= db_nsr
["config-status"]
6094 step
= "Parsing scaling parameters"
6095 db_nsr_update
["operational-status"] = "scaling"
6096 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6097 nsr_deployed
= db_nsr
["_admin"].get("deployed")
# The request is read from operationParams.scaleVnfData: the target
# member-vnf-index, the scaling-group-descriptor name and the scaleVnfType.
6099 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6101 ]["member-vnf-index"]
6102 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6104 ]["scaling-group-descriptor"]
6105 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6106 # for backward compatibility
6107 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6108 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6109 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6110 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6112 step
= "Getting vnfr from database"
6113 db_vnfr
= self
.db
.get_one(
6114 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6117 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6119 step
= "Getting vnfd from database"
6120 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6122 base_folder
= db_vnfd
["_admin"]["storage"]
6124 step
= "Getting scaling-group-descriptor"
6125 scaling_descriptor
= find_in_list(
6126 get_scaling_aspect(db_vnfd
),
6127 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6129 if not scaling_descriptor
:
6131 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6132 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6135 step
= "Sending scale order to VIM"
6136 # TODO check if ns is in a proper status
# Bookkeeping kept in the nsr under _admin.scaling-group: one entry per group
# name, whose "nb-scale-op" value is loaded into nb_scale_op below and written
# back later through the "_admin.scaling-group.<index>.nb-scale-op" key.
6138 if not db_nsr
["_admin"].get("scaling-group"):
6143 "_admin.scaling-group": [
6144 {"name": scaling_group
, "nb-scale-op": 0}
6148 admin_scale_index
= 0
6150 for admin_scale_index
, admin_scale_info
in enumerate(
6151 db_nsr
["_admin"]["scaling-group"]
6153 if admin_scale_info
["name"] == scaling_group
:
6154 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6156 else: # not found, set index one plus last element and add new entry with the name
6157 admin_scale_index
+= 1
6159 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6162 vca_scaling_info
= []
6163 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6164 if scaling_type
== "SCALE_OUT":
6165 if "aspect-delta-details" not in scaling_descriptor
:
6167 "Aspect delta details not fount in scaling descriptor {}".format(
6168 scaling_descriptor
["name"]
6171 # count if max-instance-count is reached
6172 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6174 scaling_info
["scaling_direction"] = "OUT"
6175 scaling_info
["vdu-create"] = {}
6176 scaling_info
["kdu-create"] = {}
6177 for delta
in deltas
:
6178 for vdu_delta
in delta
.get("vdu-delta", {}):
6179 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6180 # vdu_index also provides the number of instance of the targeted vdu
6181 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6182 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6186 additional_params
= (
6187 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6190 cloud_init_list
= []
6192 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
# Ceiling defaults to 10 instances unless the VDU profile carries
# "max-number-of-instances".
6193 max_instance_count
= 10
6194 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6195 max_instance_count
= vdu_profile
.get(
6196 "max-number-of-instances", 10
6199 default_instance_num
= get_number_of_instances(
6202 instances_number
= vdu_delta
.get("number-of-instances", 1)
6203 nb_scale_op
+= instances_number
6205 new_instance_count
= nb_scale_op
+ default_instance_num
6206 # Control if new count is over max and vdu count is less than max.
6207 # Then assign new instance count
6208 if new_instance_count
> max_instance_count
> vdu_count
:
6209 instances_number
= new_instance_count
- max_instance_count
# NOTE(review): the next statement assigns instances_number to itself and has
# no effect.
6211 instances_number
= instances_number
6213 if new_instance_count
> max_instance_count
:
6215 "reached the limit of {} (max-instance-count) "
6216 "scaling-out operations for the "
6217 "scaling-group-descriptor '{}'".format(
6218 nb_scale_op
, scaling_group
6221 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6223 # TODO Information of its own ip is not available because db_vnfr is not updated.
6224 additional_params
["OSM"] = get_osm_params(
6225 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6227 cloud_init_list
.append(
6228 self
._parse
_cloud
_init
(
6235 vca_scaling_info
.append(
6237 "osm_vdu_id": vdu_delta
["id"],
6238 "member-vnf-index": vnf_index
,
6240 "vdu_index": vdu_index
+ x
,
6243 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6244 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6245 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6246 kdu_name
= kdu_profile
["kdu-name"]
6247 resource_name
= kdu_profile
.get("resource-name", "")
6249 # Might have different kdus in the same delta
6250 # Should have list for each kdu
6251 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6252 scaling_info
["kdu-create"][kdu_name
] = []
6254 kdur
= get_kdur(db_vnfr
, kdu_name
)
6255 if kdur
.get("helm-chart"):
6256 k8s_cluster_type
= "helm-chart-v3"
6257 self
.logger
.debug("kdur: {}".format(kdur
))
6259 kdur
.get("helm-version")
6260 and kdur
.get("helm-version") == "v2"
6262 k8s_cluster_type
= "helm-chart"
6263 elif kdur
.get("juju-bundle"):
6264 k8s_cluster_type
= "juju-bundle"
6267 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6268 "juju-bundle. Maybe an old NBI version is running".format(
6269 db_vnfr
["member-vnf-index-ref"], kdu_name
# Same default ceiling of 10 for KDU replicas unless the KDU resource profile
# carries "max-number-of-instances".
6273 max_instance_count
= 10
6274 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6275 max_instance_count
= kdu_profile
.get(
6276 "max-number-of-instances", 10
6279 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6280 deployed_kdu
, _
= get_deployed_kdu(
6281 nsr_deployed
, kdu_name
, vnf_index
6283 if deployed_kdu
is None:
6285 "KDU '{}' for vnf '{}' not deployed".format(
6289 kdu_instance
= deployed_kdu
.get("kdu-instance")
6290 instance_num
= await self
.k8scluster_map
[
6296 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6297 kdu_model
=deployed_kdu
.get("kdu-model"),
6299 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6300 "number-of-instances", 1
6303 # Control if new count is over max and instance_num is less than max.
6304 # Then assign max instance number to kdu replica count
6305 if kdu_replica_count
> max_instance_count
> instance_num
:
6306 kdu_replica_count
= max_instance_count
6307 if kdu_replica_count
> max_instance_count
:
6309 "reached the limit of {} (max-instance-count) "
6310 "scaling-out operations for the "
6311 "scaling-group-descriptor '{}'".format(
6312 instance_num
, scaling_group
6316 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6317 vca_scaling_info
.append(
6319 "osm_kdu_id": kdu_name
,
6320 "member-vnf-index": vnf_index
,
6322 "kdu_index": instance_num
+ x
- 1,
6325 scaling_info
["kdu-create"][kdu_name
].append(
6327 "member-vnf-index": vnf_index
,
6329 "k8s-cluster-type": k8s_cluster_type
,
6330 "resource-name": resource_name
,
6331 "scale": kdu_replica_count
,
6334 elif scaling_type
== "SCALE_IN":
6335 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6337 scaling_info
["scaling_direction"] = "IN"
6338 scaling_info
["vdu-delete"] = {}
6339 scaling_info
["kdu-delete"] = {}
6341 for delta
in deltas
:
6342 for vdu_delta
in delta
.get("vdu-delta", {}):
6343 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
# Floor defaults to 0 instances unless the VDU profile carries
# "min-number-of-instances".
6344 min_instance_count
= 0
6345 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6346 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6347 min_instance_count
= vdu_profile
["min-number-of-instances"]
6349 default_instance_num
= get_number_of_instances(
6350 db_vnfd
, vdu_delta
["id"]
6352 instance_num
= vdu_delta
.get("number-of-instances", 1)
6353 nb_scale_op
-= instance_num
6355 new_instance_count
= nb_scale_op
+ default_instance_num
6357 if new_instance_count
< min_instance_count
< vdu_count
:
6358 instances_number
= min_instance_count
- new_instance_count
6360 instances_number
= instance_num
6362 if new_instance_count
< min_instance_count
:
6364 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6365 "scaling-group-descriptor '{}'".format(
6366 nb_scale_op
, scaling_group
6369 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6370 vca_scaling_info
.append(
6372 "osm_vdu_id": vdu_delta
["id"],
6373 "member-vnf-index": vnf_index
,
6375 "vdu_index": vdu_index
- 1 - x
,
6378 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6379 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6380 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6381 kdu_name
= kdu_profile
["kdu-name"]
6382 resource_name
= kdu_profile
.get("resource-name", "")
6384 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6385 scaling_info
["kdu-delete"][kdu_name
] = []
6387 kdur
= get_kdur(db_vnfr
, kdu_name
)
6388 if kdur
.get("helm-chart"):
6389 k8s_cluster_type
= "helm-chart-v3"
6390 self
.logger
.debug("kdur: {}".format(kdur
))
6392 kdur
.get("helm-version")
6393 and kdur
.get("helm-version") == "v2"
6395 k8s_cluster_type
= "helm-chart"
6396 elif kdur
.get("juju-bundle"):
6397 k8s_cluster_type
= "juju-bundle"
6400 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6401 "juju-bundle. Maybe an old NBI version is running".format(
6402 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6406 min_instance_count
= 0
6407 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6408 min_instance_count
= kdu_profile
["min-number-of-instances"]
6410 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6411 deployed_kdu
, _
= get_deployed_kdu(
6412 nsr_deployed
, kdu_name
, vnf_index
6414 if deployed_kdu
is None:
6416 "KDU '{}' for vnf '{}' not deployed".format(
6420 kdu_instance
= deployed_kdu
.get("kdu-instance")
6421 instance_num
= await self
.k8scluster_map
[
6427 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6428 kdu_model
=deployed_kdu
.get("kdu-model"),
6430 kdu_replica_count
= instance_num
- kdu_delta
.get(
6431 "number-of-instances", 1
6434 if kdu_replica_count
< min_instance_count
< instance_num
:
6435 kdu_replica_count
= min_instance_count
6436 if kdu_replica_count
< min_instance_count
:
6438 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6439 "scaling-group-descriptor '{}'".format(
6440 instance_num
, scaling_group
6444 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6445 vca_scaling_info
.append(
6447 "osm_kdu_id": kdu_name
,
6448 "member-vnf-index": vnf_index
,
6450 "kdu_index": instance_num
- x
- 1,
6453 scaling_info
["kdu-delete"][kdu_name
].append(
6455 "member-vnf-index": vnf_index
,
6457 "k8s-cluster-type": k8s_cluster_type
,
6458 "resource-name": resource_name
,
6459 "scale": kdu_replica_count
,
6463 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
# The per-VDU counters are decremented below on a copy, so that
# scaling_info["vdu-delete"] keeps the requested numbers.
6464 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6465 if scaling_info
["scaling_direction"] == "IN":
# Walk the vdur list from the end; for each vdu-id-ref that still has pending
# deletions, consume one and record the vdur name, vdu id and interfaces.
6466 for vdur
in reversed(db_vnfr
["vdur"]):
6467 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6468 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6469 scaling_info
["vdu"].append(
6471 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6472 "vdu_id": vdur
["vdu-id-ref"],
6476 for interface
in vdur
["interfaces"]:
6477 scaling_info
["vdu"][-1]["interface"].append(
6479 "name": interface
["name"],
6480 "ip_address": interface
["ip-address"],
6481 "mac_address": interface
.get("mac-address"),
6484 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6487 step
= "Executing pre-scale vnf-config-primitive"
6488 if scaling_descriptor
.get("scaling-config-action"):
6489 for scaling_config_action
in scaling_descriptor
[
6490 "scaling-config-action"
6493 scaling_config_action
.get("trigger") == "pre-scale-in"
6494 and scaling_type
== "SCALE_IN"
6496 scaling_config_action
.get("trigger") == "pre-scale-out"
6497 and scaling_type
== "SCALE_OUT"
6499 vnf_config_primitive
= scaling_config_action
[
6500 "vnf-config-primitive-name-ref"
6502 step
= db_nslcmop_update
[
6504 ] = "executing pre-scale scaling-config-action '{}'".format(
6505 vnf_config_primitive
6508 # look for primitive
6509 for config_primitive
in (
6510 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6511 ).get("config-primitive", ()):
6512 if config_primitive
["name"] == vnf_config_primitive
:
6516 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6517 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6518 "primitive".format(scaling_group
, vnf_config_primitive
)
6521 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6522 if db_vnfr
.get("additionalParamsForVnf"):
6523 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6525 scale_process
= "VCA"
6526 db_nsr_update
["config-status"] = "configuring pre-scaling"
6527 primitive_params
= self
._map
_primitive
_params
(
6528 config_primitive
, {}, vnfr_params
6531 # Pre-scale retry check: Check if this sub-operation has been executed before
6532 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6535 vnf_config_primitive
,
6539 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6540 # Skip sub-operation
6541 result
= "COMPLETED"
6542 result_detail
= "Done"
6545 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6546 vnf_config_primitive
, result
, result_detail
6550 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6551 # New sub-operation: Get index of this sub-operation
6553 len(db_nslcmop
.get("_admin", {}).get("operations"))
6558 + "vnf_config_primitive={} New sub-operation".format(
6559 vnf_config_primitive
6563 # retry: Get registered params for this existing sub-operation
6564 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6567 vnf_index
= op
.get("member_vnf_index")
6568 vnf_config_primitive
= op
.get("primitive")
6569 primitive_params
= op
.get("primitive_params")
6572 + "vnf_config_primitive={} Sub-operation retry".format(
6573 vnf_config_primitive
6576 # Execute the primitive, either with new (first-time) or registered (reintent) args
6577 ee_descriptor_id
= config_primitive
.get(
6578 "execution-environment-ref"
6580 primitive_name
= config_primitive
.get(
6581 "execution-environment-primitive", vnf_config_primitive
6583 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6584 nsr_deployed
["VCA"],
6585 member_vnf_index
=vnf_index
,
6587 vdu_count_index
=None,
6588 ee_descriptor_id
=ee_descriptor_id
,
6590 result
, result_detail
= await self
._ns
_execute
_primitive
(
6599 + "vnf_config_primitive={} Done with result {} {}".format(
6600 vnf_config_primitive
, result
, result_detail
6603 # Update operationState = COMPLETED | FAILED
6604 self
._update
_suboperation
_status
(
6605 db_nslcmop
, op_index
, result
, result_detail
6608 if result
== "FAILED":
6609 raise LcmException(result_detail
)
6610 db_nsr_update
["config-status"] = old_config_status
6611 scale_process
= None
6615 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6618 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6621 # SCALE-IN VCA - BEGIN
6622 if vca_scaling_info
:
6623 step
= db_nslcmop_update
[
6625 ] = "Deleting the execution environments"
6626 scale_process
= "VCA"
6627 for vca_info
in vca_scaling_info
:
6628 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6629 member_vnf_index
= str(vca_info
["member-vnf-index"])
6631 logging_text
+ "vdu info: {}".format(vca_info
)
6633 if vca_info
.get("osm_vdu_id"):
6634 vdu_id
= vca_info
["osm_vdu_id"]
6635 vdu_index
= int(vca_info
["vdu_index"])
6638 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6639 member_vnf_index
, vdu_id
, vdu_index
6641 stage
[2] = step
= "Scaling in VCA"
6642 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6643 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6644 config_update
= db_nsr
["configurationStatus"]
6645 for vca_index
, vca
in enumerate(vca_update
):
# NOTE(review): `vca or vca.get("ee_id")` is truthy for any non-empty vca
# without looking at "ee_id", and would raise if vca were None; `and` may have
# been intended -- confirm.
6647 (vca
or vca
.get("ee_id"))
6648 and vca
["member-vnf-index"] == member_vnf_index
6649 and vca
["vdu_count_index"] == vdu_index
6651 if vca
.get("vdu_id"):
6652 config_descriptor
= get_configuration(
6653 db_vnfd
, vca
.get("vdu_id")
6655 elif vca
.get("kdu_name"):
6656 config_descriptor
= get_configuration(
6657 db_vnfd
, vca
.get("kdu_name")
6660 config_descriptor
= get_configuration(
6661 db_vnfd
, db_vnfd
["id"]
6663 operation_params
= (
6664 db_nslcmop
.get("operationParams") or {}
6666 exec_terminate_primitives
= not operation_params
.get(
6667 "skip_terminate_primitives"
6668 ) and vca
.get("needed_terminate")
6669 task
= asyncio
.ensure_future(
6678 exec_primitives
=exec_terminate_primitives
,
6682 timeout
=self
.timeout_charm_delete
,
6685 tasks_dict_info
[task
] = "Terminating VCA {}".format(
# NOTE(review): these deletions use the index of the enumerate(vca_update) loop
# above; if they run inside that loop, later entries shift down by one while it
# is still iterating -- confirm this is intended.
6688 del vca_update
[vca_index
]
6689 del config_update
[vca_index
]
6690 # wait for pending tasks of terminate primitives
6694 + "Waiting for tasks {}".format(
6695 list(tasks_dict_info
.keys())
6698 error_list
= await self
._wait
_for
_tasks
(
6702 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6707 tasks_dict_info
.clear()
6709 raise LcmException("; ".join(error_list
))
6711 db_vca_and_config_update
= {
6712 "_admin.deployed.VCA": vca_update
,
6713 "configurationStatus": config_update
,
6716 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6718 scale_process
= None
6719 # SCALE-IN VCA - END
# VDU scaling: only entered when there are VDUs to create or delete; the
# NG-RO path is taken when ro_config has a truthy "ng" entry.
6722 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6723 scale_process
= "RO"
6724 if self
.ro_config
.get("ng"):
6725 await self
._scale
_ng
_ro
(
6726 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6728 scaling_info
.pop("vdu-create", None)
6729 scaling_info
.pop("vdu-delete", None)
6731 scale_process
= None
# KDU scaling: delegated to _scale_kdu when there are KDU entries to create
# or delete.
6735 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6736 scale_process
= "KDU"
6737 await self
._scale
_kdu
(
6738 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6740 scaling_info
.pop("kdu-create", None)
6741 scaling_info
.pop("kdu-delete", None)
6743 scale_process
= None
6747 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6749 # SCALE-UP VCA - BEGIN
6750 if vca_scaling_info
:
6751 step
= db_nslcmop_update
[
6753 ] = "Creating new execution environments"
6754 scale_process
= "VCA"
6755 for vca_info
in vca_scaling_info
:
6756 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6757 member_vnf_index
= str(vca_info
["member-vnf-index"])
6759 logging_text
+ "vdu info: {}".format(vca_info
)
6761 vnfd_id
= db_vnfr
["vnfd-ref"]
6762 if vca_info
.get("osm_vdu_id"):
6763 vdu_index
= int(vca_info
["vdu_index"])
6764 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6765 if db_vnfr
.get("additionalParamsForVnf"):
6766 deploy_params
.update(
6768 db_vnfr
["additionalParamsForVnf"].copy()
6771 descriptor_config
= get_configuration(
6772 db_vnfd
, db_vnfd
["id"]
6774 if descriptor_config
:
6779 logging_text
=logging_text
6780 + "member_vnf_index={} ".format(member_vnf_index
),
6783 nslcmop_id
=nslcmop_id
,
6789 member_vnf_index
=member_vnf_index
,
6790 vdu_index
=vdu_index
,
6792 deploy_params
=deploy_params
,
6793 descriptor_config
=descriptor_config
,
6794 base_folder
=base_folder
,
6795 task_instantiation_info
=tasks_dict_info
,
6798 vdu_id
= vca_info
["osm_vdu_id"]
6799 vdur
= find_in_list(
6800 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6802 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6803 if vdur
.get("additionalParams"):
6804 deploy_params_vdu
= parse_yaml_strings(
6805 vdur
["additionalParams"]
6808 deploy_params_vdu
= deploy_params
6809 deploy_params_vdu
["OSM"] = get_osm_params(
6810 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6812 if descriptor_config
:
6817 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6818 member_vnf_index
, vdu_id
, vdu_index
6820 stage
[2] = step
= "Scaling out VCA"
6821 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6823 logging_text
=logging_text
6824 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6825 member_vnf_index
, vdu_id
, vdu_index
6829 nslcmop_id
=nslcmop_id
,
6835 member_vnf_index
=member_vnf_index
,
6836 vdu_index
=vdu_index
,
6838 deploy_params
=deploy_params_vdu
,
6839 descriptor_config
=descriptor_config
,
6840 base_folder
=base_folder
,
6841 task_instantiation_info
=tasks_dict_info
,
6844 # SCALE-UP VCA - END
6845 scale_process
= None
6848 # execute primitive service POST-SCALING
6849 step
= "Executing post-scale vnf-config-primitive"
6850 if scaling_descriptor
.get("scaling-config-action"):
6851 for scaling_config_action
in scaling_descriptor
[
6852 "scaling-config-action"
6855 scaling_config_action
.get("trigger") == "post-scale-in"
6856 and scaling_type
== "SCALE_IN"
6858 scaling_config_action
.get("trigger") == "post-scale-out"
6859 and scaling_type
== "SCALE_OUT"
6861 vnf_config_primitive
= scaling_config_action
[
6862 "vnf-config-primitive-name-ref"
6864 step
= db_nslcmop_update
[
6866 ] = "executing post-scale scaling-config-action '{}'".format(
6867 vnf_config_primitive
6870 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6871 if db_vnfr
.get("additionalParamsForVnf"):
6872 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6874 # look for primitive
6875 for config_primitive
in (
6876 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6877 ).get("config-primitive", ()):
6878 if config_primitive
["name"] == vnf_config_primitive
:
6882 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6883 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6884 "config-primitive".format(
6885 scaling_group
, vnf_config_primitive
6888 scale_process
= "VCA"
6889 db_nsr_update
["config-status"] = "configuring post-scaling"
6890 primitive_params
= self
._map
_primitive
_params
(
6891 config_primitive
, {}, vnfr_params
6894 # Post-scale retry check: Check if this sub-operation has been executed before
6895 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6898 vnf_config_primitive
,
6902 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6903 # Skip sub-operation
6904 result
= "COMPLETED"
6905 result_detail
= "Done"
6908 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6909 vnf_config_primitive
, result
, result_detail
6913 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6914 # New sub-operation: Get index of this sub-operation
6916 len(db_nslcmop
.get("_admin", {}).get("operations"))
6921 + "vnf_config_primitive={} New sub-operation".format(
6922 vnf_config_primitive
6926 # retry: Get registered params for this existing sub-operation
6927 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6930 vnf_index
= op
.get("member_vnf_index")
6931 vnf_config_primitive
= op
.get("primitive")
6932 primitive_params
= op
.get("primitive_params")
6935 + "vnf_config_primitive={} Sub-operation retry".format(
6936 vnf_config_primitive
6939 # Execute the primitive, either with new (first-time) or registered (reintent) args
6940 ee_descriptor_id
= config_primitive
.get(
6941 "execution-environment-ref"
6943 primitive_name
= config_primitive
.get(
6944 "execution-environment-primitive", vnf_config_primitive
6946 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6947 nsr_deployed
["VCA"],
6948 member_vnf_index
=vnf_index
,
6950 vdu_count_index
=None,
6951 ee_descriptor_id
=ee_descriptor_id
,
6953 result
, result_detail
= await self
._ns
_execute
_primitive
(
6962 + "vnf_config_primitive={} Done with result {} {}".format(
6963 vnf_config_primitive
, result
, result_detail
6966 # Update operationState = COMPLETED | FAILED
6967 self
._update
_suboperation
_status
(
6968 db_nslcmop
, op_index
, result
, result_detail
6971 if result
== "FAILED":
6972 raise LcmException(result_detail
)
6973 db_nsr_update
["config-status"] = old_config_status
6974 scale_process
= None
6979 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6980 db_nsr_update
["operational-status"] = (
6982 if old_operational_status
== "failed"
6983 else old_operational_status
6985 db_nsr_update
["config-status"] = old_config_status
6988 ROclient
.ROClientException
,
6993 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6995 except asyncio
.CancelledError
:
6997 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6999 exc
= "Operation was cancelled"
7000 except Exception as e
:
7001 exc
= traceback
.format_exc()
7002 self
.logger
.critical(
7003 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7007 self
._write
_ns
_status
(
7010 current_operation
="IDLE",
7011 current_operation_id
=None,
7014 stage
[1] = "Waiting for instantiate pending tasks."
7015 self
.logger
.debug(logging_text
+ stage
[1])
7016 exc
= await self
._wait
_for
_tasks
(
7019 self
.timeout_ns_deploy
,
7027 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7028 nslcmop_operation_state
= "FAILED"
7030 db_nsr_update
["operational-status"] = old_operational_status
7031 db_nsr_update
["config-status"] = old_config_status
7032 db_nsr_update
["detailed-status"] = ""
# scale_process holds the phase that was running when the failure happened
# ("VCA", "RO", "KDU" or None); it selects which NS status is set to "failed".
7034 if "VCA" in scale_process
:
7035 db_nsr_update
["config-status"] = "failed"
7036 if "RO" in scale_process
:
7037 db_nsr_update
["operational-status"] = "failed"
7040 ] = "FAILED scaling nslcmop={} {}: {}".format(
7041 nslcmop_id
, step
, exc
7044 error_description_nslcmop
= None
7045 nslcmop_operation_state
= "COMPLETED"
7046 db_nslcmop_update
["detailed-status"] = "Done"
7048 self
._write
_op
_status
(
7051 error_message
=error_description_nslcmop
,
7052 operation_state
=nslcmop_operation_state
,
7053 other_update
=db_nslcmop_update
,
7056 self
._write
_ns
_status
(
7059 current_operation
="IDLE",
7060 current_operation_id
=None,
7061 other_update
=db_nsr_update
,
7064 if nslcmop_operation_state
:
7068 "nslcmop_id": nslcmop_id
,
7069 "operationState": nslcmop_operation_state
,
# Publish the outcome on the "ns" topic with key "scaled"; the handler below
# reports any exception raised while writing.
7071 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7072 except Exception as e
:
7074 logging_text
+ "kafka_write notification Exception {}".format(e
)
7076 self
.logger
.debug(logging_text
+ "Exit")
7077 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """
    Scale the KDU resources described in scaling_info.

    The entries under "kdu-create" are processed or, when that key is missing
    or empty, the ones under "kdu-delete". For every entry the deployed KDU is
    scaled through the K8s connector selected by its "k8s-cluster-type".
    Entries whose "type" is "delete" run the KDU terminate-config-primitives
    before scaling, and entries whose "type" is "create" run the
    initial-config-primitives after scaling. Primitives are only executed when
    get_juju_ee_ref() returns None for the KDU.

    :param logging_text: prefix prepended to every log message
    :param nsr_id: NS record id, used to build the db_dict given to the connector
    :param nsr_deployed: deployed information where the KDU is looked up
    :param db_vnfd: VNF descriptor holding the KDU configuration
    :param vca_id: VCA id forwarded to the connector calls
    :param scaling_info: dict with "kdu-create" and/or "kdu-delete"; each one maps
        a kdu name to a list of scaling items
    :return: None. Connector errors and asyncio.TimeoutError propagate to the caller
    """
    # Timeout, in seconds, applied to each config primitive execution
    primitive_timeout = 600

    async def run_config_primitives(
        primitive_key, step, kdu_name, k8s_cluster_type, cluster_uuid, kdu_instance, db_dict
    ):
        """Run, ordered by "seq", the KDU primitives stored under primitive_key."""
        kdu_config = get_configuration(db_vnfd, kdu_name)
        if not (
            kdu_config
            and kdu_config.get(primitive_key)
            and get_juju_ee_ref(db_vnfd, kdu_name) is None
        ):
            return
        # sorted() instead of list.sort(): the descriptor must not be modified
        ordered_primitives = sorted(
            kdu_config.get(primitive_key), key=lambda val: int(val["seq"])
        )
        for primitive in ordered_primitives:
            primitive_params_ = self._map_primitive_params(primitive, {}, {})
            self.logger.debug(logging_text + step)
            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    primitive_name=primitive["name"],
                    params=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                ),
                timeout=primitive_timeout,
            )

    # Nothing to do (instead of a TypeError) when neither key carries data
    _scaling_info = (
        scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") or {}
    )
    for kdu_name in _scaling_info:
        for kdu_scaling_info in _scaling_info[kdu_name]:
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }

            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)

            if kdu_scaling_info["type"] == "delete":
                await run_config_primitives(
                    "terminate-config-primitive",
                    "execute terminate config primitive",
                    kdu_name,
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )

            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance,
                    scale,
                    kdu_scaling_info["resource-name"],
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout_vca_on_error,
            )

            if kdu_scaling_info["type"] == "create":
                await run_config_primitives(
                    "initial-config-primitive",
                    "execute initial config primitive",
                    kdu_name,
                    k8s_cluster_type,
                    cluster_uuid,
                    kdu_instance,
                    db_dict,
                )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling operation through NG-RO.

    The vnfr is first updated with the VDUs to create and the VDUs to delete
    (the latter only marked for deletion), then the whole NS is sent again to
    RO with _instantiate_ng_ro(). Once RO has finished, the deleted VDUs are
    removed from the vnfr with a second scale_vnfr() call.

    :param logging_text: prefix prepended to every log message
    :param db_nsr: NS record; its "nsd-id" selects the NSD read from database
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the VNF being scaled; updated by scale_vnfr()
    :param vdu_scaling_info: dict with optional "vdu-create" / "vdu-delete"
    :param stage: stage list forwarded to _instantiate_ng_ro()
    :return: None. Exceptions of the called methods propagate
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db.
        # vnfd_id is the database "_id" (it is the key used by get_one below), so
        # the cache must be checked against "_id"; comparing with the descriptor
        # "id" never matched and every vnfd was read and stored once per vnfr
        if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
            # read from db
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )
    # RO is done: the VDUs marked for deletion can now be removed from the vnfr
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Build the prometheus scrape jobs declared by an execution environment.

    The first file of artifact_path named 'prometheus*.j2' is used as job
    template and rendered with parse_job().

    :param ee_id: execution environment id; the text after the first "." is
        used as service name
    :param artifact_path: folder, in the LCM file system, to look for the template
    :param ee_config_descriptor: EE descriptor; "metric-service" is read from it
    :param vnfr_id: VNF record id; its dashes are removed to build the job name
    :param nsr_id: NS record id stored in every job
    :param target_ip: value of the TARGET_IP template variable
    :return: list of jobs, or None when there is no template file
    """
    # look if exist a file called 'prometheus*.j2'
    template_name = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            template_name = file_name
            break
    if not template_name:
        return

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        job_data = template_file.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": "{}-{}".format(
            service, ee_config_descriptor["metric-service"]
        ),
        "EXPORTER_POD_PORT": "80",
    }
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        job_name = job.get("job_name")
        name_is_valid = isinstance(job_name, str) and vnfr_id in job_name
        if not name_is_valid:
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type):
    """
    Send a rebuild/start/stop order for one VDU instance to RO and wait for it.

    :param nsr_id: NS record id
    :param nslcmop_id: id of the NS operation being executed
    :param vnf_id: "_id" of the vnfr that owns the VDU
    :param additional_param: dict with "vdu_id" and "count-index" of the target VDU
    :param operation_type: operation name; used as key of the RO payload, as NS
        "operational-status" and, upper-cased, as NS current operation
    :return: ("COMPLETED", "Done") on success, ("FAILED", <error text>) otherwise.
        No exception is raised, asyncio.CancelledError included
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # reference time for the RO timeout
    start_deploy = time()
    try:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        # instances of the requested VDU; the wanted one is selected by count-index
        candidates = [
            vdur_item
            for vdur_item in db_vnfr["vdur"]
            if vdur_item["vdu-id-ref"] == vdu_id
        ]
        vdur = find_in_list(
            candidates,
            lambda vdu: vdu["count-index"] == additional_param["count-index"],
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # first key of vim_info
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))

        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.update_db_2("nsrs", nsr_id, {"operational-status": operation_type})

        # Payload for RO
        operation_params = {
            "vim_vm_id": vim_vm_id,
            "vnf_id": vnf_id,
            "vdu_index": additional_param["count-index"],
            "vdu_id": vdur["id"],
            "target_vim": target_vim,
            "vim_account_id": vim_account_id,
        }
        desc = {operation_type: operation_params}
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        await self._wait_ng_ro(
            nsr_id,
            result_dict["action_id"],
            nslcmop_id,
            start_deploy,
            self.timeout_operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    # only reached through one of the handlers above
    return "FAILED", "Error in operate VNF {}".format(exc)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and the VCA cloud credential configured in a VIM account

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when the VIM
             account "config" does not contain it
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and the VCA K8s cloud credential configured in a VIM account

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an item is None when the VIM
             account "config" does not contain it
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    The operation parameters stored in the nslcmop are sent to RO as migrate
    target and the resulting RO action is awaited. The operation result is
    always written to the nslcmop, the NS is set back to IDLE and a "migrated"
    message is published; no exception is propagated to the caller.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # reference time for the RO timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        migrate_params = db_nslcmop.get("operationParams")

        # work on a copy so the nslcmop content is not shared with the RO client
        target = {}
        target.update(migrate_params)
        desc = await self.RO.migrate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate,
            operation="migrate"
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"

        # The NS status is written once the result is known, so that
        # db_nsr_update is really stored (it was computed but never written
        # before). On failure it is empty and only the IDLE state is written.
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
            other_update=db_nsr_update,
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
            except Exception as e:
                # the notification is best effort: the operation result is already stored
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal NS

    First the heal order is sent to RO (heal_RO) and awaited. Then, for every
    target VNF in operationParams.healVnfData and every target VDU of it, the
    execution environments are re-deployed with _heal_n2vc(): the VNF level
    one only when the healed VDU instance holds the VNF management ip-address,
    and the VDU level one whenever the VDU has a configuration. When no VDU is
    given for a VNF, all the VDU instances of its vnfr are healed.

    The result is always written to the nslcmop and nsr records and notified
    with a "healed" message; no exception is propagated to the caller.

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return:
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    # ^ stage, step, VIM progress
    tasks_dict_info = {}
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )

        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Sending heal order to VIM"
        # RO is awaited here, it is not launched as a background task
        await self.heal_RO(
            logging_text=logging_text,
            nsr_id=nsr_id,
            db_nslcmop=db_nslcmop,
            stage=stage,
        )

        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))

        # Check for each target VNF
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", [])
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if not vnfr_id:
                continue
            db_vnfr = db_vnfrs[vnfr_id]
            vnfd_id = db_vnfr.get("vnfd-id")
            vnfd_ref = db_vnfr.get("vnfd-ref")
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            base_folder = vnfd["_admin"]["storage"]
            kdu_name = None
            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            member_vnf_index = db_vnfr.get("member-vnf-index-ref")
            # "additionalParams" is optional: read it once, in a safe way, and
            # use the same value everywhere (a direct target_vnf[...] access
            # raised KeyError when it was not provided)
            additional_params = target_vnf.get("additionalParams") or {}
            vnf_run_day1 = additional_params.get("run-day1")
            vnf_vdurs = db_vnfr.get("vdur") or ()

            # Check each target VDU and deploy N2VC
            target_vdu_list = additional_params.get("vdu", [])
            if not target_vdu_list:
                # No VDU provided: heal every VDU instance of the vnfr
                target_vdu_list = [
                    {
                        "vdu-id": existing_vdu.get("vdu-name", None),
                        "count-index": existing_vdu.get("count-index", 0),
                        "run-day1": additional_params.get("run-day1", False),
                    }
                    for existing_vdu in vnf_vdurs
                ]
            for target_vdu in target_vdu_list:
                deploy_params_vdu = target_vdu
                # Set run-day1 vnf level value if not vdu level value exists
                if not deploy_params_vdu.get("run-day1") and vnf_run_day1:
                    deploy_params_vdu["run-day1"] = vnf_run_day1
                vdu_name = target_vdu.get("vdu-id", None)
                # TODO: Get vdu_id from vdud.
                vdu_id = vdu_name
                # For multi instance VDU count-index is mandatory
                # For single session VDU count-index is 0
                vdu_index = target_vdu.get("count-index", 0)

                # n2vc_redesign STEP 3 to 6 Deploy N2VC
                stage[1] = "Deploying Execution Environments."
                self.logger.debug(logging_text + stage[1])

                # VNF Level charm. Normal case when proxy charms.
                # If target instance is management machine continue with actions:
                # recreate EE for native charms or reinject juju key for proxy charms.
                descriptor_config = get_configuration(vnfd, vnfd_ref)
                if descriptor_config:
                    # Continue if healed machine is management machine
                    vnf_ip_address = db_vnfr.get("ip-address")
                    target_instance = next(
                        (
                            instance
                            for instance in vnf_vdurs
                            if instance["vdu-name"] == vdu_name
                            and instance["count-index"] == vdu_index
                        ),
                        None,
                    )
                    if target_instance is None:
                        # The operation failed here before too, but with an
                        # AttributeError traceback instead of a clear message
                        raise LcmException(
                            "Target vdu '{}' with count-index '{}' not found at "
                            "vnfr '{}'".format(vdu_name, vdu_index, vnfr_id)
                        )
                    if vnf_ip_address == target_instance.get("ip-address"):
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=None,
                            kdu_name=None,
                            member_vnf_index=member_vnf_index,
                            vdu_index=0,
                            vdu_name=None,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

                # VDU Level charm. Normal case with native charms.
                descriptor_config = get_configuration(vnfd, vdu_name)
                if descriptor_config:
                    self._heal_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                            member_vnf_index, vdu_name, vdu_index
                        ),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_ref,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_vdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if tasks_dict_info:
            stage[1] = "Waiting for healing pending tasks."
            self.logger.debug(logging_text + stage[1])
            tasks_exc = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
            # Keep an error already captured above: assigning the result
            # directly to exc reported the operation as COMPLETED when the
            # pending tasks ended without error
            exc = exc or tasks_exc
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(
                    nslcmop_id, step, exc
                )
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )

        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
            except Exception as e:
                # the notification is best effort: the operation result is already stored
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
7765 :param logging_text: preffix text to use at logging
7766 :param nsr_id: nsr identity
7767 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7768 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7769 :return: None or exception
7771 def get_vim_account(vim_account_id
):
7773 if vim_account_id
in db_vims
:
7774 return db_vims
[vim_account_id
]
7775 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7776 db_vims
[vim_account_id
] = db_vim
7781 ns_params
= db_nslcmop
.get("operationParams")
7782 if ns_params
and ns_params
.get("timeout_ns_heal"):
7783 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7785 timeout_ns_heal
= self
.timeout
.get(
7786 "ns_heal", self
.timeout_ns_heal
7791 nslcmop_id
= db_nslcmop
["_id"]
7793 "action_id": nslcmop_id
,
7795 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7796 target
.update(db_nslcmop
.get("operationParams", {}))
7798 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7799 desc
= await self
.RO
.recreate(nsr_id
, target
)
7800 self
.logger
.debug("RO return > {}".format(desc
))
7801 action_id
= desc
["action_id"]
7802 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7803 await self
._wait
_ng
_ro
(
7804 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7810 "_admin.deployed.RO.operational-status": "running",
7811 "detailed-status": " ".join(stage
),
7813 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7814 self
._write
_op
_status
(nslcmop_id
, stage
)
7816 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7819 except Exception as e
:
7820 stage
[2] = "ERROR healing at VIM"
7821 #self.set_vnfr_at_error(db_vnfrs, str(e))
7823 "Error healing at VIM {}".format(e
),
7824 exc_info
=not isinstance(
7827 ROclient
.ROClientException
,
7853 task_instantiation_info
,
7856 # launch instantiate_N2VC in a asyncio task and register task object
7857 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7858 # if not found, create one entry and update database
7859 # fill db_nsr._admin.deployed.VCA.<index>
7862 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7864 if "execution-environment-list" in descriptor_config
:
7865 ee_list
= descriptor_config
.get("execution-environment-list", [])
7866 elif "juju" in descriptor_config
:
7867 ee_list
= [descriptor_config
] # ns charms
7868 else: # other types as script are not supported
7871 for ee_item
in ee_list
:
7874 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7875 ee_item
.get("juju"), ee_item
.get("helm-chart")
7878 ee_descriptor_id
= ee_item
.get("id")
7879 if ee_item
.get("juju"):
7880 vca_name
= ee_item
["juju"].get("charm")
7883 if ee_item
["juju"].get("charm") is not None
7886 if ee_item
["juju"].get("cloud") == "k8s":
7887 vca_type
= "k8s_proxy_charm"
7888 elif ee_item
["juju"].get("proxy") is False:
7889 vca_type
= "native_charm"
7890 elif ee_item
.get("helm-chart"):
7891 vca_name
= ee_item
["helm-chart"]
7892 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7895 vca_type
= "helm-v3"
7898 logging_text
+ "skipping non juju neither charm configuration"
7903 for vca_index
, vca_deployed
in enumerate(
7904 db_nsr
["_admin"]["deployed"]["VCA"]
7906 if not vca_deployed
:
7909 vca_deployed
.get("member-vnf-index") == member_vnf_index
7910 and vca_deployed
.get("vdu_id") == vdu_id
7911 and vca_deployed
.get("kdu_name") == kdu_name
7912 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7913 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7917 # not found, create one.
7919 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7922 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7924 target
+= "/kdu/{}".format(kdu_name
)
7926 "target_element": target
,
7927 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7928 "member-vnf-index": member_vnf_index
,
7930 "kdu_name": kdu_name
,
7931 "vdu_count_index": vdu_index
,
7932 "operational-status": "init", # TODO revise
7933 "detailed-status": "", # TODO revise
7934 "step": "initial-deploy", # TODO revise
7936 "vdu_name": vdu_name
,
7938 "ee_descriptor_id": ee_descriptor_id
,
7942 # create VCA and configurationStatus in db
7944 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7945 "configurationStatus.{}".format(vca_index
): dict(),
7947 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7949 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7951 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7952 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7953 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7956 task_n2vc
= asyncio
.ensure_future(
7958 logging_text
=logging_text
,
7959 vca_index
=vca_index
,
7965 vdu_index
=vdu_index
,
7966 deploy_params
=deploy_params
,
7967 config_descriptor
=descriptor_config
,
7968 base_folder
=base_folder
,
7969 nslcmop_id
=nslcmop_id
,
7973 ee_config_descriptor
=ee_item
,
7976 self
.lcm_tasks
.register(
7980 "instantiate_N2VC-{}".format(vca_index
),
7983 task_instantiation_info
[
7985 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7986 member_vnf_index
or "", vdu_id
or ""
7989 async def heal_N2VC(
8006 ee_config_descriptor
,
8008 nsr_id
= db_nsr
["_id"]
8009 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8010 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8011 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8012 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8014 "collection": "nsrs",
8015 "filter": {"_id": nsr_id
},
8016 "path": db_update_entry
,
8022 element_under_configuration
= nsr_id
8026 vnfr_id
= db_vnfr
["_id"]
8027 osm_config
["osm"]["vnf_id"] = vnfr_id
8029 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8031 if vca_type
== "native_charm":
8034 index_number
= vdu_index
or 0
8037 element_type
= "VNF"
8038 element_under_configuration
= vnfr_id
8039 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8041 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8042 element_type
= "VDU"
8043 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8044 osm_config
["osm"]["vdu_id"] = vdu_id
8046 namespace
+= ".{}".format(kdu_name
)
8047 element_type
= "KDU"
8048 element_under_configuration
= kdu_name
8049 osm_config
["osm"]["kdu_name"] = kdu_name
8052 if base_folder
["pkg-dir"]:
8053 artifact_path
= "{}/{}/{}/{}".format(
8054 base_folder
["folder"],
8055 base_folder
["pkg-dir"],
8058 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8063 artifact_path
= "{}/Scripts/{}/{}/".format(
8064 base_folder
["folder"],
8067 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8072 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8074 # get initial_config_primitive_list that applies to this element
8075 initial_config_primitive_list
= config_descriptor
.get(
8076 "initial-config-primitive"
8080 "Initial config primitive list > {}".format(
8081 initial_config_primitive_list
8085 # add config if not present for NS charm
8086 ee_descriptor_id
= ee_config_descriptor
.get("id")
8087 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8088 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8089 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8093 "Initial config primitive list #2 > {}".format(
8094 initial_config_primitive_list
8097 # n2vc_redesign STEP 3.1
8098 # find old ee_id if exists
8099 ee_id
= vca_deployed
.get("ee_id")
8101 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8102 # create or register execution environment in VCA. Only for native charms when healing
8103 if vca_type
== "native_charm":
8104 step
= "Waiting to VM being up and getting IP address"
8105 self
.logger
.debug(logging_text
+ step
)
8106 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8115 credentials
= {"hostname": rw_mgmt_ip
}
8117 username
= deep_get(
8118 config_descriptor
, ("config-access", "ssh-access", "default-user")
8120 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8121 # merged. Meanwhile let's get username from initial-config-primitive
8122 if not username
and initial_config_primitive_list
:
8123 for config_primitive
in initial_config_primitive_list
:
8124 for param
in config_primitive
.get("parameter", ()):
8125 if param
["name"] == "ssh-username":
8126 username
= param
["value"]
8130 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8131 "'config-access.ssh-access.default-user'"
8133 credentials
["username"] = username
8135 # n2vc_redesign STEP 3.2
8136 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8137 self
._write
_configuration
_status
(
8139 vca_index
=vca_index
,
8140 status
="REGISTERING",
8141 element_under_configuration
=element_under_configuration
,
8142 element_type
=element_type
,
8145 step
= "register execution environment {}".format(credentials
)
8146 self
.logger
.debug(logging_text
+ step
)
8147 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8148 credentials
=credentials
,
8149 namespace
=namespace
,
8154 # update ee_id en db
8156 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8158 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8160 # for compatibility with MON/POL modules, the need model and application name at database
8161 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8162 # Not sure if this need to be done when healing
8164 ee_id_parts = ee_id.split(".")
8165 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8166 if len(ee_id_parts) >= 2:
8167 model_name = ee_id_parts[0]
8168 application_name = ee_id_parts[1]
8169 db_nsr_update[db_update_entry + "model"] = model_name
8170 db_nsr_update[db_update_entry + "application"] = application_name
8173 # n2vc_redesign STEP 3.3
8174 # Install configuration software. Only for native charms.
8175 step
= "Install configuration Software"
8177 self
._write
_configuration
_status
(
8179 vca_index
=vca_index
,
8180 status
="INSTALLING SW",
8181 element_under_configuration
=element_under_configuration
,
8182 element_type
=element_type
,
8183 #other_update=db_nsr_update,
8187 # TODO check if already done
8188 self
.logger
.debug(logging_text
+ step
)
8190 if vca_type
== "native_charm":
8191 config_primitive
= next(
8192 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8195 if config_primitive
:
8196 config
= self
._map
_primitive
_params
(
8197 config_primitive
, {}, deploy_params
8199 await self
.vca_map
[vca_type
].install_configuration_sw(
8201 artifact_path
=artifact_path
,
8209 # write in db flag of configuration_sw already installed
8211 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8214 # Not sure if this need to be done when healing
8216 # add relations for this VCA (wait for other peers related with this VCA)
8217 await self._add_vca_relations(
8218 logging_text=logging_text,
8221 vca_index=vca_index,
8225 # if SSH access is required, then get execution environment SSH public
8226 # if native charm we have waited already to VM be UP
8227 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8230 # self.logger.debug("get ssh key block")
8232 config_descriptor
, ("config-access", "ssh-access", "required")
8234 # self.logger.debug("ssh key needed")
8235 # Needed to inject a ssh key
8238 ("config-access", "ssh-access", "default-user"),
8240 step
= "Install configuration Software, getting public ssh key"
8241 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8242 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8245 step
= "Insert public key into VM user={} ssh_key={}".format(
8249 # self.logger.debug("no need to get ssh key")
8250 step
= "Waiting to VM being up and getting IP address"
8251 self
.logger
.debug(logging_text
+ step
)
8253 # n2vc_redesign STEP 5.1
8254 # wait for RO (ip-address) Insert pub_key into VM
8255 # IMPORTANT: We need do wait for RO to complete healing operation.
8256 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8259 rw_mgmt_ip
= await self
.wait_kdu_up(
8260 logging_text
, nsr_id
, vnfr_id
, kdu_name
8263 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8273 rw_mgmt_ip
= None # This is for a NS configuration
8275 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8277 # store rw_mgmt_ip in deploy params for later replacement
8278 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8281 # get run-day1 operation parameter
8282 runDay1
= deploy_params
.get("run-day1",False)
8283 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8285 # n2vc_redesign STEP 6 Execute initial config primitive
8286 step
= "execute initial config primitive"
8288 # wait for dependent primitives execution (NS -> VNF -> VDU)
8289 if initial_config_primitive_list
:
8290 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8292 # stage, in function of element type: vdu, kdu, vnf or ns
8293 my_vca
= vca_deployed_list
[vca_index
]
8294 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8296 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8297 elif my_vca
.get("member-vnf-index"):
8299 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8302 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8304 self
._write
_configuration
_status
(
8305 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8308 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8310 check_if_terminated_needed
= True
8311 for initial_config_primitive
in initial_config_primitive_list
:
8312 # adding information on the vca_deployed if it is a NS execution environment
8313 if not vca_deployed
["member-vnf-index"]:
8314 deploy_params
["ns_config_info"] = json
.dumps(
8315 self
._get
_ns
_config
_info
(nsr_id
)
8317 # TODO check if already done
8318 primitive_params_
= self
._map
_primitive
_params
(
8319 initial_config_primitive
, {}, deploy_params
8322 step
= "execute primitive '{}' params '{}'".format(
8323 initial_config_primitive
["name"], primitive_params_
8325 self
.logger
.debug(logging_text
+ step
)
8326 await self
.vca_map
[vca_type
].exec_primitive(
8328 primitive_name
=initial_config_primitive
["name"],
8329 params_dict
=primitive_params_
,
8334 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8335 if check_if_terminated_needed
:
8336 if config_descriptor
.get("terminate-config-primitive"):
8338 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8340 check_if_terminated_needed
= False
8342 # TODO register in database that primitive is done
8344 # STEP 7 Configure metrics
8345 # Not sure if this need to be done when healing
8347 if vca_type == "helm" or vca_type == "helm-v3":
8348 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8350 artifact_path=artifact_path,
8351 ee_config_descriptor=ee_config_descriptor,
8354 target_ip=rw_mgmt_ip,
8360 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8363 for job in prometheus_jobs:
8366 {"job_name": job["job_name"]},
8369 fail_on_empty=False,
8373 step
= "instantiated at VCA"
8374 self
.logger
.debug(logging_text
+ step
)
8376 self
._write
_configuration
_status
(
8377 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8380 except Exception as e
: # TODO not use Exception but N2VC exception
8381 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8383 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8386 "Exception while {} : {}".format(step
, e
), exc_info
=True
8388 self
._write
_configuration
_status
(
8389 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8391 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """
    Wait until RO stops reporting the NS as being healed.

    The "nsrs" record is read every 15 seconds and the value of
    _admin.deployed.RO.operational-status is checked. The wait finishes as
    soon as that value is anything other than "healing".

    :param: nsr_id: NS Instance ID
    :param: timeout: maximum time to wait, in seconds
    :return: None
    :raises NgRoException: when the status is still "healing" after timeout
    """
    # NOTE(review): the default for 'timeout' is not visible in this chunk;
    # the visible caller always passes it explicitly - confirm the value.
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # The 'loop' argument of asyncio.sleep() was removed in Python 3.10;
        # the running event loop is always the one used.
        await asyncio.sleep(15)
    else:
        # Only reached when the loop ends without 'break', i.e. on timeout.
        # NOTE(review): the message says "deploy" although this waits for a
        # heal operation; text kept unchanged for callers/log parsers.
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the "operationParams" of the nslcmop record, passes them to
    self.RO.vertical_scale() and waits for the returned RO action with
    self._wait_ng_ro(). Whatever the outcome, the NS status is set back to
    IDLE, the operation result is reported with _write_op_status() and a
    notification is sent through self.msg.aiowrite("ns", "verticalscaled").

    Errors are not propagated to the caller: they are logged and recorded
    as a "FAILED" operation state.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")

    db_nslcmop_update = {}
    # None on success; otherwise the exception (or a text) describing the failure
    exc = None
    # reference time handed to _wait_ng_ro together with the timeout
    start_deploy = time()
    # in case of error, indicates what part of scale was failed
    step = "Waiting for previous operations to terminate"

    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # work on a copy so the database record content is not shared with RO
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures (NgRoException is what the NG-RO client raises):
        # report the plain message, no traceback.
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        # Unexpected failure: keep the whole traceback for the status field
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc is not None:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # Best-effort notification: a failure here must not hide the result
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")